relay-stats.service.ts raw
1 import type {
2 TRelayEntry,
3 TNetworkRelayStats,
4 TRelayDirection,
5 } from '@/types/relay-management'
6 import {
7 AUTO_DISABLE_FAILURE_RATE,
8 AUTO_DISABLE_MIN_ATTEMPTS,
9 DIRECTION_BITS,
10 STATUS_BITS,
11 DIRECTION_FROM_BITS,
12 STATUS_FROM_BITS,
13 } from '@/types/relay-management'
14 import { normalizeUrl } from '@/lib/url'
15 import networkIdentityService from './network-identity.service'
16 import indexedDb from './indexed-db.service'
17 import { ipToBytes, bytesToIp, ipToHex } from './network-identity.service'
18
19 const FLUSH_INTERVAL_MS = 10_000
20
21 interface StoredNetworkStats {
22 publishSuccess: number
23 publishFailure: number
24 fetchSuccess: number
25 fetchFailure: number
26 lastSeen: number
27 }
28
29 interface StoredRelayEntry {
30 url: string
31 direction: TRelayDirection
32 relayIp: string | null
33 networkStats: Array<[string, StoredNetworkStats]>
34 manualExclude: boolean
35 status: 'pending' | 'approved' | 'rejected'
36 reason: string
37 addedAt: number
38 updatedAt: number
39 }
40
41 class RelayStatsService {
42 private entries: Map<string, TRelayEntry> = new Map()
43 private dirtyUrls: Set<string> = new Set()
44 private flushTimer: ReturnType<typeof setTimeout> | null = null
45 private initPromise: Promise<void> | null = null
46
47 async init(): Promise<void> {
48 if (this.initPromise) return this.initPromise
49 this.initPromise = this.loadFromDb()
50 return this.initPromise
51 }
52
53 private async loadFromDb(): Promise<void> {
54 const rows = await indexedDb.getAllRelayStats()
55 for (const row of rows) {
56 const stored = row.value as StoredRelayEntry
57 if (!stored || !stored.url) continue
58 const networkStats = new Map<string, TNetworkRelayStats>()
59 if (Array.isArray(stored.networkStats)) {
60 for (const [key, val] of stored.networkStats) {
61 networkStats.set(key, { ...val })
62 }
63 }
64 this.entries.set(stored.url, {
65 url: stored.url,
66 direction: stored.direction ?? 'outbox',
67 relayIp: stored.relayIp ?? null,
68 networkStats,
69 manualExclude: stored.manualExclude ?? false,
70 status: stored.status ?? 'pending',
71 reason: stored.reason ?? '',
72 addedAt: stored.addedAt ?? Date.now(),
73 updatedAt: stored.updatedAt ?? Date.now(),
74 })
75 }
76 }
77
78 getOrCreateEntry(url: string): TRelayEntry {
79 url = normalizeUrl(url)
80 let entry = this.entries.get(url)
81 if (!entry) {
82 entry = {
83 url,
84 direction: 'outbox',
85 relayIp: null,
86 networkStats: new Map(),
87 manualExclude: false,
88 status: 'pending',
89 reason: '',
90 addedAt: Date.now(),
91 updatedAt: Date.now(),
92 }
93 this.entries.set(url, entry)
94 }
95 return entry
96 }
97
98 recordPublishSuccess(url: string): void {
99 this.recordStat(url, 'publishSuccess')
100 }
101
102 recordPublishFailure(url: string): void {
103 this.recordStat(url, 'publishFailure')
104 }
105
106 recordFetchSuccess(url: string): void {
107 this.recordStat(url, 'fetchSuccess')
108 }
109
110 recordFetchFailure(url: string): void {
111 this.recordStat(url, 'fetchFailure')
112 }
113
114 private recordStat(
115 url: string,
116 field: keyof Pick<TNetworkRelayStats, 'publishSuccess' | 'publishFailure' | 'fetchSuccess' | 'fetchFailure'>
117 ): void {
118 const entry = this.getOrCreateEntry(url)
119 const ipHex = networkIdentityService.getCurrentIpHex() ?? 'unknown'
120 let stats = entry.networkStats.get(ipHex)
121 if (!stats) {
122 stats = {
123 publishSuccess: 0,
124 publishFailure: 0,
125 fetchSuccess: 0,
126 fetchFailure: 0,
127 lastSeen: Date.now(),
128 }
129 entry.networkStats.set(ipHex, stats)
130 }
131 stats[field]++
132 stats.lastSeen = Date.now()
133 entry.updatedAt = Date.now()
134 this.dirtyUrls.add(url)
135 this.scheduleFlush()
136 }
137
138 getFailureRate(url: string): number {
139 const entry = this.entries.get(normalizeUrl(url))
140 if (!entry) return 0
141 const ipHex = networkIdentityService.getCurrentIpHex() ?? 'unknown'
142 const stats = entry.networkStats.get(ipHex)
143 if (!stats) return 0
144 const total =
145 stats.publishSuccess + stats.publishFailure + stats.fetchSuccess + stats.fetchFailure
146 if (total === 0) return 0
147 return (stats.publishFailure + stats.fetchFailure) / total
148 }
149
150 isAutoDisabled(url: string): boolean {
151 const entry = this.entries.get(normalizeUrl(url))
152 if (!entry) return false
153 const ipHex = networkIdentityService.getCurrentIpHex() ?? 'unknown'
154 const stats = entry.networkStats.get(ipHex)
155 if (!stats) return false
156 const total =
157 stats.publishSuccess + stats.publishFailure + stats.fetchSuccess + stats.fetchFailure
158 if (total <= AUTO_DISABLE_MIN_ATTEMPTS) return false
159 const failureRate = (stats.publishFailure + stats.fetchFailure) / total
160 return failureRate >= AUTO_DISABLE_FAILURE_RATE
161 }
162
163 getEntry(url: string): TRelayEntry | undefined {
164 return this.entries.get(normalizeUrl(url))
165 }
166
167 getAllEntries(): TRelayEntry[] {
168 return Array.from(this.entries.values())
169 }
170
171 setRelayIp(url: string, ip: string): void {
172 url = normalizeUrl(url)
173 const entry = this.getOrCreateEntry(url)
174 entry.relayIp = ip
175 entry.updatedAt = Date.now()
176 this.dirtyUrls.add(url)
177 this.scheduleFlush()
178 }
179
180 updateEntry(
181 url: string,
182 updates: Partial<Pick<TRelayEntry, 'status' | 'direction' | 'manualExclude' | 'reason'>>
183 ): void {
184 const entry = this.getOrCreateEntry(url)
185 if (updates.status !== undefined) entry.status = updates.status
186 if (updates.direction !== undefined) entry.direction = updates.direction
187 if (updates.manualExclude !== undefined) entry.manualExclude = updates.manualExclude
188 if (updates.reason !== undefined) entry.reason = updates.reason
189 entry.updatedAt = Date.now()
190 this.dirtyUrls.add(url)
191 this.scheduleFlush()
192 }
193
194 private scheduleFlush(): void {
195 if (this.flushTimer) return
196 this.flushTimer = setTimeout(() => {
197 this.flushTimer = null
198 this.flush()
199 }, FLUSH_INTERVAL_MS)
200 }
201
202 private async flush(): Promise<void> {
203 if (this.dirtyUrls.size === 0) return
204 const urls = Array.from(this.dirtyUrls)
205 this.dirtyUrls.clear()
206
207 for (const url of urls) {
208 const entry = this.entries.get(url)
209 if (!entry) continue
210 const serialized: StoredRelayEntry = {
211 url: entry.url,
212 direction: entry.direction,
213 relayIp: entry.relayIp,
214 networkStats: Array.from(entry.networkStats.entries()),
215 manualExclude: entry.manualExclude,
216 status: entry.status,
217 reason: entry.reason,
218 addedAt: entry.addedAt,
219 updatedAt: entry.updatedAt,
220 }
221 await indexedDb.putRelayStats(url, serialized)
222 }
223 }
224
225 encodeBinary(): Uint8Array {
226 const entries = this.getAllEntries()
227 const encoder = new TextEncoder()
228
229 // Calculate total size
230 let totalSize = 0
231 const encodedUrls: Uint8Array[] = []
232 for (const entry of entries) {
233 const urlBytes = encoder.encode(entry.url)
234 encodedUrls.push(urlBytes)
235 // 1 (url len) + N (url) + 1 (flags) + 4 (relay ip) + 1 (stats count) + K*12 (stats)
236 totalSize += 1 + urlBytes.length + 1 + 4 + 1 + entry.networkStats.size * 12
237 }
238
239 const buffer = new ArrayBuffer(totalSize)
240 const view = new DataView(buffer)
241 const bytes = new Uint8Array(buffer)
242 let offset = 0
243
244 for (let i = 0; i < entries.length; i++) {
245 const entry = entries[i]
246 const urlBytes = encodedUrls[i]
247
248 // URL length + URL bytes
249 view.setUint8(offset, urlBytes.length)
250 offset += 1
251 bytes.set(urlBytes, offset)
252 offset += urlBytes.length
253
254 // Flags: direction(2 bits) | status(2 bits) | manualExclude(1 bit) | reserved(3 bits)
255 const dirBits = DIRECTION_BITS[entry.direction] ?? 0
256 const stsBits = STATUS_BITS[entry.status] ?? 0
257 const exclBit = entry.manualExclude ? 1 : 0
258 const flags = ((dirBits & 0x3) << 6) | ((stsBits & 0x3) << 4) | ((exclBit & 0x1) << 3)
259 view.setUint8(offset, flags)
260 offset += 1
261
262 // Relay IPv4 (4 bytes)
263 const relayIpBytes = ipToBytes(entry.relayIp ?? '0.0.0.0')
264 bytes.set(relayIpBytes, offset)
265 offset += 4
266
267 // Network stats count
268 view.setUint8(offset, Math.min(entry.networkStats.size, 255))
269 offset += 1
270
271 // Network stats entries
272 let statsWritten = 0
273 for (const [ipHex, stats] of entry.networkStats) {
274 if (statsWritten >= 255) break
275 // Client external IPv4 from hex (4 bytes)
276 const clientIpBytes = ipHexToBytes(ipHex)
277 bytes.set(clientIpBytes, offset)
278 offset += 4
279
280 view.setUint16(offset, Math.min(stats.publishSuccess, 65535))
281 offset += 2
282 view.setUint16(offset, Math.min(stats.publishFailure, 65535))
283 offset += 2
284 view.setUint16(offset, Math.min(stats.fetchSuccess, 65535))
285 offset += 2
286 view.setUint16(offset, Math.min(stats.fetchFailure, 65535))
287 offset += 2
288 statsWritten++
289 }
290 }
291
292 return new Uint8Array(buffer, 0, offset)
293 }
294
295 decodeBinary(data: Uint8Array): void {
296 const view = new DataView(data.buffer, data.byteOffset, data.byteLength)
297 const decoder = new TextDecoder()
298 let offset = 0
299
300 while (offset < data.length) {
301 if (offset + 1 > data.length) break
302
303 // URL length + URL
304 const urlLen = view.getUint8(offset)
305 offset += 1
306 if (offset + urlLen > data.length) break
307 const url = decoder.decode(data.subarray(offset, offset + urlLen))
308 offset += urlLen
309
310 if (offset + 6 > data.length) break
311
312 // Flags
313 const flags = view.getUint8(offset)
314 offset += 1
315 const dirBits = (flags >> 6) & 0x3
316 const stsBits = (flags >> 4) & 0x3
317 const exclBit = (flags >> 3) & 0x1
318 const direction = DIRECTION_FROM_BITS[dirBits]
319 const status = STATUS_FROM_BITS[stsBits]
320 const manualExclude = exclBit === 1
321
322 // Relay IPv4
323 const relayIpRaw = bytesToIp(data.subarray(offset, offset + 4))
324 const relayIp = relayIpRaw === '0.0.0.0' ? null : relayIpRaw
325 offset += 4
326
327 // Network stats count
328 const statsCount = view.getUint8(offset)
329 offset += 1
330
331 const incomingStats = new Map<string, TNetworkRelayStats>()
332 for (let s = 0; s < statsCount; s++) {
333 if (offset + 12 > data.length) break
334
335 const clientIpBytes = data.subarray(offset, offset + 4)
336 const clientIpHex = ipToHex(bytesToIp(clientIpBytes))
337 offset += 4
338
339 const publishSuccess = view.getUint16(offset)
340 offset += 2
341 const publishFailure = view.getUint16(offset)
342 offset += 2
343 const fetchSuccess = view.getUint16(offset)
344 offset += 2
345 const fetchFailure = view.getUint16(offset)
346 offset += 2
347
348 incomingStats.set(clientIpHex, {
349 publishSuccess,
350 publishFailure,
351 fetchSuccess,
352 fetchFailure,
353 lastSeen: Date.now(),
354 })
355 }
356
357 // Merge into existing entries
358 const existing = this.entries.get(url)
359 if (existing) {
360 // Merge network stats with monotonic max
361 for (const [ipHex, incoming] of incomingStats) {
362 const local = existing.networkStats.get(ipHex)
363 if (local) {
364 local.publishSuccess = Math.max(local.publishSuccess, incoming.publishSuccess)
365 local.publishFailure = Math.max(local.publishFailure, incoming.publishFailure)
366 local.fetchSuccess = Math.max(local.fetchSuccess, incoming.fetchSuccess)
367 local.fetchFailure = Math.max(local.fetchFailure, incoming.fetchFailure)
368 local.lastSeen = Math.max(local.lastSeen, incoming.lastSeen)
369 } else {
370 existing.networkStats.set(ipHex, incoming)
371 }
372 }
373 // Keep local flags for existing entries
374 if (relayIp && !existing.relayIp) {
375 existing.relayIp = relayIp
376 }
377 existing.updatedAt = Date.now()
378 } else {
379 this.entries.set(url, {
380 url,
381 direction,
382 relayIp,
383 networkStats: incomingStats,
384 manualExclude,
385 status,
386 reason: '',
387 addedAt: Date.now(),
388 updatedAt: Date.now(),
389 })
390 }
391
392 this.dirtyUrls.add(url)
393 }
394
395 if (this.dirtyUrls.size > 0) {
396 this.scheduleFlush()
397 }
398 }
399 }
400
401 /** Convert 8-char hex IP to 4-byte Uint8Array. Handles 'unknown' and malformed input. */
402 function ipHexToBytes(hex: string): Uint8Array {
403 const bytes = new Uint8Array(4)
404 if (hex.length !== 8) return bytes
405 for (let i = 0; i < 4; i++) {
406 const n = parseInt(hex.slice(i * 2, i * 2 + 2), 16)
407 bytes[i] = isNaN(n) ? 0 : n & 0xff
408 }
409 return bytes
410 }
411
412 const relayStatsService = new RelayStatsService()
413 export default relayStatsService
414