propagation-queue.service.ts raw
1 import { Event, kinds } from 'nostr-tools'
2
3 /**
4 * PropagationQueueService
5 *
6 * Manages a queue of events to republish to the user's relays.
7 * This helps:
8 * 1. Cache frequently-accessed profiles locally
9 * 2. Spread data across the decentralized network
10 * 3. Reduce latency for future fetches
11 *
12 * Uses IndexedDB for persistence so the queue survives page refreshes.
13 */
14
15 const DB_NAME = 'smesh-propagation'
16 const DB_VERSION = 1
17 const QUEUE_STORE = 'propagation-queue'
18 const PROPAGATED_STORE = 'propagated-events'
19
20 interface PropagationJob {
21 id: string
22 event: Event
23 targetRelays: string[]
24 addedAt: number
25 attempts: number
26 }
27
28 interface PropagatedEntry {
29 id: string
30 timestamp: number
31 }
32
33 class PropagationQueueService {
34 private db: IDBDatabase | null = null
35 private initPromise: Promise<void> | null = null
36 private isProcessing = false
37 private publishFn: ((event: Event, relays: string[]) => Promise<boolean>) | null = null
38
39 // Kinds that should be propagated
40 private propagatableKinds = [kinds.Metadata, kinds.RelayList]
41
42 // How long to remember propagated events (24 hours)
43 private propagationCooldownMs = 24 * 60 * 60 * 1000
44
45 // Max retry attempts per job
46 private maxAttempts = 3
47
48 /**
49 * Initialize the IndexedDB database
50 */
51 private init(): Promise<void> {
52 if (this.initPromise) {
53 return this.initPromise
54 }
55
56 this.initPromise = new Promise((resolve, reject) => {
57 const request = indexedDB.open(DB_NAME, DB_VERSION)
58
59 request.onerror = () => {
60 console.error('Failed to open propagation queue DB:', request.error)
61 reject(request.error)
62 }
63
64 request.onsuccess = () => {
65 this.db = request.result
66 resolve()
67 }
68
69 request.onupgradeneeded = () => {
70 const db = request.result
71
72 if (!db.objectStoreNames.contains(QUEUE_STORE)) {
73 db.createObjectStore(QUEUE_STORE, { keyPath: 'id' })
74 }
75
76 if (!db.objectStoreNames.contains(PROPAGATED_STORE)) {
77 const store = db.createObjectStore(PROPAGATED_STORE, { keyPath: 'id' })
78 store.createIndex('timestamp', 'timestamp', { unique: false })
79 }
80 }
81 })
82
83 return this.initPromise
84 }
85
86 /**
87 * Set the publish function to use for propagation
88 * Must be called before queue processing can work
89 */
90 setPublishFunction(fn: (event: Event, relays: string[]) => Promise<boolean>): void {
91 this.publishFn = fn
92 }
93
94 /**
95 * Queue an event for propagation to the user's relays
96 */
97 async queueForPropagation(
98 event: Event,
99 targetRelays: string[],
100 currentUserPubkey: string
101 ): Promise<void> {
102 // Skip if not a propagatable kind
103 if (!this.propagatableKinds.includes(event.kind)) {
104 return
105 }
106
107 // Skip own events (they're already on user's relays)
108 if (event.pubkey === currentUserPubkey) {
109 return
110 }
111
112 // Skip if no target relays
113 if (targetRelays.length === 0) {
114 return
115 }
116
117 await this.init()
118
119 // Check if recently propagated
120 const recentlyPropagated = await this.getPropagatedEntry(event.id)
121 if (recentlyPropagated) {
122 const age = Date.now() - recentlyPropagated.timestamp
123 if (age < this.propagationCooldownMs) {
124 return
125 }
126 }
127
128 // Add to queue
129 const job: PropagationJob = {
130 id: event.id,
131 event,
132 targetRelays,
133 addedAt: Date.now(),
134 attempts: 0
135 }
136
137 await this.putJob(job)
138
139 // Trigger processing (don't await - let it run in background)
140 this.processQueue().catch(console.error)
141 }
142
143 /**
144 * Process the propagation queue
145 */
146 async processQueue(): Promise<void> {
147 if (this.isProcessing || !this.publishFn) {
148 return
149 }
150
151 this.isProcessing = true
152
153 try {
154 await this.init()
155 const jobs = await this.getAllJobs()
156
157 for (const job of jobs) {
158 try {
159 // Try to publish
160 const success = await this.publishFn(job.event, job.targetRelays)
161
162 if (success) {
163 // Mark as propagated and remove from queue
164 await this.markAsPropagated(job.id)
165 await this.deleteJob(job.id)
166 } else {
167 // Increment attempts
168 job.attempts++
169 if (job.attempts >= this.maxAttempts) {
170 // Give up after max attempts
171 await this.deleteJob(job.id)
172 } else {
173 await this.putJob(job)
174 }
175 }
176 } catch (error) {
177 console.warn('Propagation failed for event:', job.id, error)
178 job.attempts++
179 if (job.attempts >= this.maxAttempts) {
180 await this.deleteJob(job.id)
181 } else {
182 await this.putJob(job)
183 }
184 }
185 }
186
187 // Clean up old propagated entries
188 await this.cleanupOldEntries()
189 } finally {
190 this.isProcessing = false
191 }
192 }
193
194 /**
195 * Get queue size for debugging/monitoring
196 */
197 async getQueueSize(): Promise<number> {
198 await this.init()
199 return new Promise((resolve, reject) => {
200 if (!this.db) {
201 return resolve(0)
202 }
203
204 const transaction = this.db.transaction(QUEUE_STORE, 'readonly')
205 const store = transaction.objectStore(QUEUE_STORE)
206 const request = store.count()
207
208 request.onsuccess = () => resolve(request.result)
209 request.onerror = () => reject(request.error)
210 })
211 }
212
213 // IndexedDB helpers
214
215 private putJob(job: PropagationJob): Promise<void> {
216 return new Promise((resolve, reject) => {
217 if (!this.db) {
218 return reject(new Error('Database not initialized'))
219 }
220
221 const transaction = this.db.transaction(QUEUE_STORE, 'readwrite')
222 const store = transaction.objectStore(QUEUE_STORE)
223 const request = store.put(job)
224
225 request.onsuccess = () => resolve()
226 request.onerror = () => reject(request.error)
227 })
228 }
229
230 private deleteJob(id: string): Promise<void> {
231 return new Promise((resolve, reject) => {
232 if (!this.db) {
233 return reject(new Error('Database not initialized'))
234 }
235
236 const transaction = this.db.transaction(QUEUE_STORE, 'readwrite')
237 const store = transaction.objectStore(QUEUE_STORE)
238 const request = store.delete(id)
239
240 request.onsuccess = () => resolve()
241 request.onerror = () => reject(request.error)
242 })
243 }
244
245 private getAllJobs(): Promise<PropagationJob[]> {
246 return new Promise((resolve, reject) => {
247 if (!this.db) {
248 return resolve([])
249 }
250
251 const transaction = this.db.transaction(QUEUE_STORE, 'readonly')
252 const store = transaction.objectStore(QUEUE_STORE)
253 const request = store.getAll()
254
255 request.onsuccess = () => resolve(request.result || [])
256 request.onerror = () => reject(request.error)
257 })
258 }
259
260 private getPropagatedEntry(id: string): Promise<PropagatedEntry | null> {
261 return new Promise((resolve, reject) => {
262 if (!this.db) {
263 return resolve(null)
264 }
265
266 const transaction = this.db.transaction(PROPAGATED_STORE, 'readonly')
267 const store = transaction.objectStore(PROPAGATED_STORE)
268 const request = store.get(id)
269
270 request.onsuccess = () => resolve(request.result || null)
271 request.onerror = () => reject(request.error)
272 })
273 }
274
275 private markAsPropagated(id: string): Promise<void> {
276 return new Promise((resolve, reject) => {
277 if (!this.db) {
278 return reject(new Error('Database not initialized'))
279 }
280
281 const transaction = this.db.transaction(PROPAGATED_STORE, 'readwrite')
282 const store = transaction.objectStore(PROPAGATED_STORE)
283 const entry: PropagatedEntry = { id, timestamp: Date.now() }
284 const request = store.put(entry)
285
286 request.onsuccess = () => resolve()
287 request.onerror = () => reject(request.error)
288 })
289 }
290
291 private async cleanupOldEntries(): Promise<void> {
292 if (!this.db) {
293 return
294 }
295
296 const weekAgo = Date.now() - 7 * 24 * 60 * 60 * 1000
297
298 return new Promise((resolve) => {
299 const transaction = this.db!.transaction(PROPAGATED_STORE, 'readwrite')
300 const store = transaction.objectStore(PROPAGATED_STORE)
301 const index = store.index('timestamp')
302 const range = IDBKeyRange.upperBound(weekAgo)
303 const request = index.openCursor(range)
304
305 request.onsuccess = (event) => {
306 const cursor = (event.target as IDBRequest).result
307 if (cursor) {
308 cursor.delete()
309 cursor.continue()
310 } else {
311 resolve()
312 }
313 }
314
315 request.onerror = () => resolve()
316 })
317 }
318 }
319
320 const propagationQueueService = new PropagationQueueService()
321 export default propagationQueueService
322