propagation-queue.service.ts raw

   1  import { Event, kinds } from 'nostr-tools'
   2  
   3  /**
   4   * PropagationQueueService
   5   *
   6   * Manages a queue of events to republish to the user's relays.
   7   * This helps:
   8   * 1. Cache frequently-accessed profiles locally
   9   * 2. Spread data across the decentralized network
  10   * 3. Reduce latency for future fetches
  11   *
  12   * Uses IndexedDB for persistence so the queue survives page refreshes.
  13   */
  14  
  15  const DB_NAME = 'smesh-propagation'
  16  const DB_VERSION = 1
  17  const QUEUE_STORE = 'propagation-queue'
  18  const PROPAGATED_STORE = 'propagated-events'
  19  
  20  interface PropagationJob {
  21    id: string
  22    event: Event
  23    targetRelays: string[]
  24    addedAt: number
  25    attempts: number
  26  }
  27  
  28  interface PropagatedEntry {
  29    id: string
  30    timestamp: number
  31  }
  32  
  33  class PropagationQueueService {
  34    private db: IDBDatabase | null = null
  35    private initPromise: Promise<void> | null = null
  36    private isProcessing = false
  37    private publishFn: ((event: Event, relays: string[]) => Promise<boolean>) | null = null
  38  
  39    // Kinds that should be propagated
  40    private propagatableKinds = [kinds.Metadata, kinds.RelayList]
  41  
  42    // How long to remember propagated events (24 hours)
  43    private propagationCooldownMs = 24 * 60 * 60 * 1000
  44  
  45    // Max retry attempts per job
  46    private maxAttempts = 3
  47  
  48    /**
  49     * Initialize the IndexedDB database
  50     */
  51    private init(): Promise<void> {
  52      if (this.initPromise) {
  53        return this.initPromise
  54      }
  55  
  56      this.initPromise = new Promise((resolve, reject) => {
  57        const request = indexedDB.open(DB_NAME, DB_VERSION)
  58  
  59        request.onerror = () => {
  60          console.error('Failed to open propagation queue DB:', request.error)
  61          reject(request.error)
  62        }
  63  
  64        request.onsuccess = () => {
  65          this.db = request.result
  66          resolve()
  67        }
  68  
  69        request.onupgradeneeded = () => {
  70          const db = request.result
  71  
  72          if (!db.objectStoreNames.contains(QUEUE_STORE)) {
  73            db.createObjectStore(QUEUE_STORE, { keyPath: 'id' })
  74          }
  75  
  76          if (!db.objectStoreNames.contains(PROPAGATED_STORE)) {
  77            const store = db.createObjectStore(PROPAGATED_STORE, { keyPath: 'id' })
  78            store.createIndex('timestamp', 'timestamp', { unique: false })
  79          }
  80        }
  81      })
  82  
  83      return this.initPromise
  84    }
  85  
  86    /**
  87     * Set the publish function to use for propagation
  88     * Must be called before queue processing can work
  89     */
  90    setPublishFunction(fn: (event: Event, relays: string[]) => Promise<boolean>): void {
  91      this.publishFn = fn
  92    }
  93  
  94    /**
  95     * Queue an event for propagation to the user's relays
  96     */
  97    async queueForPropagation(
  98      event: Event,
  99      targetRelays: string[],
 100      currentUserPubkey: string
 101    ): Promise<void> {
 102      // Skip if not a propagatable kind
 103      if (!this.propagatableKinds.includes(event.kind)) {
 104        return
 105      }
 106  
 107      // Skip own events (they're already on user's relays)
 108      if (event.pubkey === currentUserPubkey) {
 109        return
 110      }
 111  
 112      // Skip if no target relays
 113      if (targetRelays.length === 0) {
 114        return
 115      }
 116  
 117      await this.init()
 118  
 119      // Check if recently propagated
 120      const recentlyPropagated = await this.getPropagatedEntry(event.id)
 121      if (recentlyPropagated) {
 122        const age = Date.now() - recentlyPropagated.timestamp
 123        if (age < this.propagationCooldownMs) {
 124          return
 125        }
 126      }
 127  
 128      // Add to queue
 129      const job: PropagationJob = {
 130        id: event.id,
 131        event,
 132        targetRelays,
 133        addedAt: Date.now(),
 134        attempts: 0
 135      }
 136  
 137      await this.putJob(job)
 138  
 139      // Trigger processing (don't await - let it run in background)
 140      this.processQueue().catch(console.error)
 141    }
 142  
 143    /**
 144     * Process the propagation queue
 145     */
 146    async processQueue(): Promise<void> {
 147      if (this.isProcessing || !this.publishFn) {
 148        return
 149      }
 150  
 151      this.isProcessing = true
 152  
 153      try {
 154        await this.init()
 155        const jobs = await this.getAllJobs()
 156  
 157        for (const job of jobs) {
 158          try {
 159            // Try to publish
 160            const success = await this.publishFn(job.event, job.targetRelays)
 161  
 162            if (success) {
 163              // Mark as propagated and remove from queue
 164              await this.markAsPropagated(job.id)
 165              await this.deleteJob(job.id)
 166            } else {
 167              // Increment attempts
 168              job.attempts++
 169              if (job.attempts >= this.maxAttempts) {
 170                // Give up after max attempts
 171                await this.deleteJob(job.id)
 172              } else {
 173                await this.putJob(job)
 174              }
 175            }
 176          } catch (error) {
 177            console.warn('Propagation failed for event:', job.id, error)
 178            job.attempts++
 179            if (job.attempts >= this.maxAttempts) {
 180              await this.deleteJob(job.id)
 181            } else {
 182              await this.putJob(job)
 183            }
 184          }
 185        }
 186  
 187        // Clean up old propagated entries
 188        await this.cleanupOldEntries()
 189      } finally {
 190        this.isProcessing = false
 191      }
 192    }
 193  
 194    /**
 195     * Get queue size for debugging/monitoring
 196     */
 197    async getQueueSize(): Promise<number> {
 198      await this.init()
 199      return new Promise((resolve, reject) => {
 200        if (!this.db) {
 201          return resolve(0)
 202        }
 203  
 204        const transaction = this.db.transaction(QUEUE_STORE, 'readonly')
 205        const store = transaction.objectStore(QUEUE_STORE)
 206        const request = store.count()
 207  
 208        request.onsuccess = () => resolve(request.result)
 209        request.onerror = () => reject(request.error)
 210      })
 211    }
 212  
 213    // IndexedDB helpers
 214  
 215    private putJob(job: PropagationJob): Promise<void> {
 216      return new Promise((resolve, reject) => {
 217        if (!this.db) {
 218          return reject(new Error('Database not initialized'))
 219        }
 220  
 221        const transaction = this.db.transaction(QUEUE_STORE, 'readwrite')
 222        const store = transaction.objectStore(QUEUE_STORE)
 223        const request = store.put(job)
 224  
 225        request.onsuccess = () => resolve()
 226        request.onerror = () => reject(request.error)
 227      })
 228    }
 229  
 230    private deleteJob(id: string): Promise<void> {
 231      return new Promise((resolve, reject) => {
 232        if (!this.db) {
 233          return reject(new Error('Database not initialized'))
 234        }
 235  
 236        const transaction = this.db.transaction(QUEUE_STORE, 'readwrite')
 237        const store = transaction.objectStore(QUEUE_STORE)
 238        const request = store.delete(id)
 239  
 240        request.onsuccess = () => resolve()
 241        request.onerror = () => reject(request.error)
 242      })
 243    }
 244  
 245    private getAllJobs(): Promise<PropagationJob[]> {
 246      return new Promise((resolve, reject) => {
 247        if (!this.db) {
 248          return resolve([])
 249        }
 250  
 251        const transaction = this.db.transaction(QUEUE_STORE, 'readonly')
 252        const store = transaction.objectStore(QUEUE_STORE)
 253        const request = store.getAll()
 254  
 255        request.onsuccess = () => resolve(request.result || [])
 256        request.onerror = () => reject(request.error)
 257      })
 258    }
 259  
 260    private getPropagatedEntry(id: string): Promise<PropagatedEntry | null> {
 261      return new Promise((resolve, reject) => {
 262        if (!this.db) {
 263          return resolve(null)
 264        }
 265  
 266        const transaction = this.db.transaction(PROPAGATED_STORE, 'readonly')
 267        const store = transaction.objectStore(PROPAGATED_STORE)
 268        const request = store.get(id)
 269  
 270        request.onsuccess = () => resolve(request.result || null)
 271        request.onerror = () => reject(request.error)
 272      })
 273    }
 274  
 275    private markAsPropagated(id: string): Promise<void> {
 276      return new Promise((resolve, reject) => {
 277        if (!this.db) {
 278          return reject(new Error('Database not initialized'))
 279        }
 280  
 281        const transaction = this.db.transaction(PROPAGATED_STORE, 'readwrite')
 282        const store = transaction.objectStore(PROPAGATED_STORE)
 283        const entry: PropagatedEntry = { id, timestamp: Date.now() }
 284        const request = store.put(entry)
 285  
 286        request.onsuccess = () => resolve()
 287        request.onerror = () => reject(request.error)
 288      })
 289    }
 290  
 291    private async cleanupOldEntries(): Promise<void> {
 292      if (!this.db) {
 293        return
 294      }
 295  
 296      const weekAgo = Date.now() - 7 * 24 * 60 * 60 * 1000
 297  
 298      return new Promise((resolve) => {
 299        const transaction = this.db!.transaction(PROPAGATED_STORE, 'readwrite')
 300        const store = transaction.objectStore(PROPAGATED_STORE)
 301        const index = store.index('timestamp')
 302        const range = IDBKeyRange.upperBound(weekAgo)
 303        const request = index.openCursor(range)
 304  
 305        request.onsuccess = (event) => {
 306          const cursor = (event.target as IDBRequest).result
 307          if (cursor) {
 308            cursor.delete()
 309            cursor.continue()
 310          } else {
 311            resolve()
 312          }
 313        }
 314  
 315        request.onerror = () => resolve()
 316      })
 317    }
 318  }
 319  
 320  const propagationQueueService = new PropagationQueueService()
 321  export default propagationQueueService
 322