relay-stats.service.ts raw

   1  import type {
   2    TRelayEntry,
   3    TNetworkRelayStats,
   4    TRelayDirection,
   5  } from '@/types/relay-management'
   6  import {
   7    AUTO_DISABLE_FAILURE_RATE,
   8    AUTO_DISABLE_MIN_ATTEMPTS,
   9    DIRECTION_BITS,
  10    STATUS_BITS,
  11    DIRECTION_FROM_BITS,
  12    STATUS_FROM_BITS,
  13  } from '@/types/relay-management'
  14  import { normalizeUrl } from '@/lib/url'
  15  import networkIdentityService from './network-identity.service'
  16  import indexedDb from './indexed-db.service'
  17  import { ipToBytes, bytesToIp, ipToHex } from './network-identity.service'
  18  
  19  const FLUSH_INTERVAL_MS = 10_000
  20  
  21  interface StoredNetworkStats {
  22    publishSuccess: number
  23    publishFailure: number
  24    fetchSuccess: number
  25    fetchFailure: number
  26    lastSeen: number
  27  }
  28  
  29  interface StoredRelayEntry {
  30    url: string
  31    direction: TRelayDirection
  32    relayIp: string | null
  33    networkStats: Array<[string, StoredNetworkStats]>
  34    manualExclude: boolean
  35    status: 'pending' | 'approved' | 'rejected'
  36    reason: string
  37    addedAt: number
  38    updatedAt: number
  39  }
  40  
  41  class RelayStatsService {
  42    private entries: Map<string, TRelayEntry> = new Map()
  43    private dirtyUrls: Set<string> = new Set()
  44    private flushTimer: ReturnType<typeof setTimeout> | null = null
  45    private initPromise: Promise<void> | null = null
  46  
  47    async init(): Promise<void> {
  48      if (this.initPromise) return this.initPromise
  49      this.initPromise = this.loadFromDb()
  50      return this.initPromise
  51    }
  52  
  53    private async loadFromDb(): Promise<void> {
  54      const rows = await indexedDb.getAllRelayStats()
  55      for (const row of rows) {
  56        const stored = row.value as StoredRelayEntry
  57        if (!stored || !stored.url) continue
  58        const networkStats = new Map<string, TNetworkRelayStats>()
  59        if (Array.isArray(stored.networkStats)) {
  60          for (const [key, val] of stored.networkStats) {
  61            networkStats.set(key, { ...val })
  62          }
  63        }
  64        this.entries.set(stored.url, {
  65          url: stored.url,
  66          direction: stored.direction ?? 'outbox',
  67          relayIp: stored.relayIp ?? null,
  68          networkStats,
  69          manualExclude: stored.manualExclude ?? false,
  70          status: stored.status ?? 'pending',
  71          reason: stored.reason ?? '',
  72          addedAt: stored.addedAt ?? Date.now(),
  73          updatedAt: stored.updatedAt ?? Date.now(),
  74        })
  75      }
  76    }
  77  
  78    getOrCreateEntry(url: string): TRelayEntry {
  79      url = normalizeUrl(url)
  80      let entry = this.entries.get(url)
  81      if (!entry) {
  82        entry = {
  83          url,
  84          direction: 'outbox',
  85          relayIp: null,
  86          networkStats: new Map(),
  87          manualExclude: false,
  88          status: 'pending',
  89          reason: '',
  90          addedAt: Date.now(),
  91          updatedAt: Date.now(),
  92        }
  93        this.entries.set(url, entry)
  94      }
  95      return entry
  96    }
  97  
  98    recordPublishSuccess(url: string): void {
  99      this.recordStat(url, 'publishSuccess')
 100    }
 101  
 102    recordPublishFailure(url: string): void {
 103      this.recordStat(url, 'publishFailure')
 104    }
 105  
 106    recordFetchSuccess(url: string): void {
 107      this.recordStat(url, 'fetchSuccess')
 108    }
 109  
 110    recordFetchFailure(url: string): void {
 111      this.recordStat(url, 'fetchFailure')
 112    }
 113  
 114    private recordStat(
 115      url: string,
 116      field: keyof Pick<TNetworkRelayStats, 'publishSuccess' | 'publishFailure' | 'fetchSuccess' | 'fetchFailure'>
 117    ): void {
 118      const entry = this.getOrCreateEntry(url)
 119      const ipHex = networkIdentityService.getCurrentIpHex() ?? 'unknown'
 120      let stats = entry.networkStats.get(ipHex)
 121      if (!stats) {
 122        stats = {
 123          publishSuccess: 0,
 124          publishFailure: 0,
 125          fetchSuccess: 0,
 126          fetchFailure: 0,
 127          lastSeen: Date.now(),
 128        }
 129        entry.networkStats.set(ipHex, stats)
 130      }
 131      stats[field]++
 132      stats.lastSeen = Date.now()
 133      entry.updatedAt = Date.now()
 134      this.dirtyUrls.add(url)
 135      this.scheduleFlush()
 136    }
 137  
 138    getFailureRate(url: string): number {
 139      const entry = this.entries.get(normalizeUrl(url))
 140      if (!entry) return 0
 141      const ipHex = networkIdentityService.getCurrentIpHex() ?? 'unknown'
 142      const stats = entry.networkStats.get(ipHex)
 143      if (!stats) return 0
 144      const total =
 145        stats.publishSuccess + stats.publishFailure + stats.fetchSuccess + stats.fetchFailure
 146      if (total === 0) return 0
 147      return (stats.publishFailure + stats.fetchFailure) / total
 148    }
 149  
 150    isAutoDisabled(url: string): boolean {
 151      const entry = this.entries.get(normalizeUrl(url))
 152      if (!entry) return false
 153      const ipHex = networkIdentityService.getCurrentIpHex() ?? 'unknown'
 154      const stats = entry.networkStats.get(ipHex)
 155      if (!stats) return false
 156      const total =
 157        stats.publishSuccess + stats.publishFailure + stats.fetchSuccess + stats.fetchFailure
 158      if (total <= AUTO_DISABLE_MIN_ATTEMPTS) return false
 159      const failureRate = (stats.publishFailure + stats.fetchFailure) / total
 160      return failureRate >= AUTO_DISABLE_FAILURE_RATE
 161    }
 162  
 163    getEntry(url: string): TRelayEntry | undefined {
 164      return this.entries.get(normalizeUrl(url))
 165    }
 166  
 167    getAllEntries(): TRelayEntry[] {
 168      return Array.from(this.entries.values())
 169    }
 170  
 171    setRelayIp(url: string, ip: string): void {
 172      url = normalizeUrl(url)
 173      const entry = this.getOrCreateEntry(url)
 174      entry.relayIp = ip
 175      entry.updatedAt = Date.now()
 176      this.dirtyUrls.add(url)
 177      this.scheduleFlush()
 178    }
 179  
 180    updateEntry(
 181      url: string,
 182      updates: Partial<Pick<TRelayEntry, 'status' | 'direction' | 'manualExclude' | 'reason'>>
 183    ): void {
 184      const entry = this.getOrCreateEntry(url)
 185      if (updates.status !== undefined) entry.status = updates.status
 186      if (updates.direction !== undefined) entry.direction = updates.direction
 187      if (updates.manualExclude !== undefined) entry.manualExclude = updates.manualExclude
 188      if (updates.reason !== undefined) entry.reason = updates.reason
 189      entry.updatedAt = Date.now()
 190      this.dirtyUrls.add(url)
 191      this.scheduleFlush()
 192    }
 193  
 194    private scheduleFlush(): void {
 195      if (this.flushTimer) return
 196      this.flushTimer = setTimeout(() => {
 197        this.flushTimer = null
 198        this.flush()
 199      }, FLUSH_INTERVAL_MS)
 200    }
 201  
 202    private async flush(): Promise<void> {
 203      if (this.dirtyUrls.size === 0) return
 204      const urls = Array.from(this.dirtyUrls)
 205      this.dirtyUrls.clear()
 206  
 207      for (const url of urls) {
 208        const entry = this.entries.get(url)
 209        if (!entry) continue
 210        const serialized: StoredRelayEntry = {
 211          url: entry.url,
 212          direction: entry.direction,
 213          relayIp: entry.relayIp,
 214          networkStats: Array.from(entry.networkStats.entries()),
 215          manualExclude: entry.manualExclude,
 216          status: entry.status,
 217          reason: entry.reason,
 218          addedAt: entry.addedAt,
 219          updatedAt: entry.updatedAt,
 220        }
 221        await indexedDb.putRelayStats(url, serialized)
 222      }
 223    }
 224  
 225    encodeBinary(): Uint8Array {
 226      const entries = this.getAllEntries()
 227      const encoder = new TextEncoder()
 228  
 229      // Calculate total size
 230      let totalSize = 0
 231      const encodedUrls: Uint8Array[] = []
 232      for (const entry of entries) {
 233        const urlBytes = encoder.encode(entry.url)
 234        encodedUrls.push(urlBytes)
 235        // 1 (url len) + N (url) + 1 (flags) + 4 (relay ip) + 1 (stats count) + K*12 (stats)
 236        totalSize += 1 + urlBytes.length + 1 + 4 + 1 + entry.networkStats.size * 12
 237      }
 238  
 239      const buffer = new ArrayBuffer(totalSize)
 240      const view = new DataView(buffer)
 241      const bytes = new Uint8Array(buffer)
 242      let offset = 0
 243  
 244      for (let i = 0; i < entries.length; i++) {
 245        const entry = entries[i]
 246        const urlBytes = encodedUrls[i]
 247  
 248        // URL length + URL bytes
 249        view.setUint8(offset, urlBytes.length)
 250        offset += 1
 251        bytes.set(urlBytes, offset)
 252        offset += urlBytes.length
 253  
 254        // Flags: direction(2 bits) | status(2 bits) | manualExclude(1 bit) | reserved(3 bits)
 255        const dirBits = DIRECTION_BITS[entry.direction] ?? 0
 256        const stsBits = STATUS_BITS[entry.status] ?? 0
 257        const exclBit = entry.manualExclude ? 1 : 0
 258        const flags = ((dirBits & 0x3) << 6) | ((stsBits & 0x3) << 4) | ((exclBit & 0x1) << 3)
 259        view.setUint8(offset, flags)
 260        offset += 1
 261  
 262        // Relay IPv4 (4 bytes)
 263        const relayIpBytes = ipToBytes(entry.relayIp ?? '0.0.0.0')
 264        bytes.set(relayIpBytes, offset)
 265        offset += 4
 266  
 267        // Network stats count
 268        view.setUint8(offset, Math.min(entry.networkStats.size, 255))
 269        offset += 1
 270  
 271        // Network stats entries
 272        let statsWritten = 0
 273        for (const [ipHex, stats] of entry.networkStats) {
 274          if (statsWritten >= 255) break
 275          // Client external IPv4 from hex (4 bytes)
 276          const clientIpBytes = ipHexToBytes(ipHex)
 277          bytes.set(clientIpBytes, offset)
 278          offset += 4
 279  
 280          view.setUint16(offset, Math.min(stats.publishSuccess, 65535))
 281          offset += 2
 282          view.setUint16(offset, Math.min(stats.publishFailure, 65535))
 283          offset += 2
 284          view.setUint16(offset, Math.min(stats.fetchSuccess, 65535))
 285          offset += 2
 286          view.setUint16(offset, Math.min(stats.fetchFailure, 65535))
 287          offset += 2
 288          statsWritten++
 289        }
 290      }
 291  
 292      return new Uint8Array(buffer, 0, offset)
 293    }
 294  
 295    decodeBinary(data: Uint8Array): void {
 296      const view = new DataView(data.buffer, data.byteOffset, data.byteLength)
 297      const decoder = new TextDecoder()
 298      let offset = 0
 299  
 300      while (offset < data.length) {
 301        if (offset + 1 > data.length) break
 302  
 303        // URL length + URL
 304        const urlLen = view.getUint8(offset)
 305        offset += 1
 306        if (offset + urlLen > data.length) break
 307        const url = decoder.decode(data.subarray(offset, offset + urlLen))
 308        offset += urlLen
 309  
 310        if (offset + 6 > data.length) break
 311  
 312        // Flags
 313        const flags = view.getUint8(offset)
 314        offset += 1
 315        const dirBits = (flags >> 6) & 0x3
 316        const stsBits = (flags >> 4) & 0x3
 317        const exclBit = (flags >> 3) & 0x1
 318        const direction = DIRECTION_FROM_BITS[dirBits]
 319        const status = STATUS_FROM_BITS[stsBits]
 320        const manualExclude = exclBit === 1
 321  
 322        // Relay IPv4
 323        const relayIpRaw = bytesToIp(data.subarray(offset, offset + 4))
 324        const relayIp = relayIpRaw === '0.0.0.0' ? null : relayIpRaw
 325        offset += 4
 326  
 327        // Network stats count
 328        const statsCount = view.getUint8(offset)
 329        offset += 1
 330  
 331        const incomingStats = new Map<string, TNetworkRelayStats>()
 332        for (let s = 0; s < statsCount; s++) {
 333          if (offset + 12 > data.length) break
 334  
 335          const clientIpBytes = data.subarray(offset, offset + 4)
 336          const clientIpHex = ipToHex(bytesToIp(clientIpBytes))
 337          offset += 4
 338  
 339          const publishSuccess = view.getUint16(offset)
 340          offset += 2
 341          const publishFailure = view.getUint16(offset)
 342          offset += 2
 343          const fetchSuccess = view.getUint16(offset)
 344          offset += 2
 345          const fetchFailure = view.getUint16(offset)
 346          offset += 2
 347  
 348          incomingStats.set(clientIpHex, {
 349            publishSuccess,
 350            publishFailure,
 351            fetchSuccess,
 352            fetchFailure,
 353            lastSeen: Date.now(),
 354          })
 355        }
 356  
 357        // Merge into existing entries
 358        const existing = this.entries.get(url)
 359        if (existing) {
 360          // Merge network stats with monotonic max
 361          for (const [ipHex, incoming] of incomingStats) {
 362            const local = existing.networkStats.get(ipHex)
 363            if (local) {
 364              local.publishSuccess = Math.max(local.publishSuccess, incoming.publishSuccess)
 365              local.publishFailure = Math.max(local.publishFailure, incoming.publishFailure)
 366              local.fetchSuccess = Math.max(local.fetchSuccess, incoming.fetchSuccess)
 367              local.fetchFailure = Math.max(local.fetchFailure, incoming.fetchFailure)
 368              local.lastSeen = Math.max(local.lastSeen, incoming.lastSeen)
 369            } else {
 370              existing.networkStats.set(ipHex, incoming)
 371            }
 372          }
 373          // Keep local flags for existing entries
 374          if (relayIp && !existing.relayIp) {
 375            existing.relayIp = relayIp
 376          }
 377          existing.updatedAt = Date.now()
 378        } else {
 379          this.entries.set(url, {
 380            url,
 381            direction,
 382            relayIp,
 383            networkStats: incomingStats,
 384            manualExclude,
 385            status,
 386            reason: '',
 387            addedAt: Date.now(),
 388            updatedAt: Date.now(),
 389          })
 390        }
 391  
 392        this.dirtyUrls.add(url)
 393      }
 394  
 395      if (this.dirtyUrls.size > 0) {
 396        this.scheduleFlush()
 397      }
 398    }
 399  }
 400  
 401  /** Convert 8-char hex IP to 4-byte Uint8Array. Handles 'unknown' and malformed input. */
 402  function ipHexToBytes(hex: string): Uint8Array {
 403    const bytes = new Uint8Array(4)
 404    if (hex.length !== 8) return bytes
 405    for (let i = 0; i < 4; i++) {
 406      const n = parseInt(hex.slice(i * 2, i * 2 + 2), 16)
 407      bytes[i] = isNaN(n) ? 0 : n & 0xff
 408    }
 409    return bytes
 410  }
 411  
 412  const relayStatsService = new RelayStatsService()
 413  export default relayStatsService
 414