// thread.service.ts

import { ExtendedKind } from '@/constants'
import {
  getEventKey,
  getKeyFromTag,
  getParentTag,
  getReplaceableCoordinateFromEvent,
  getRootTag,
  isReplaceableEvent,
  isReplyNoteEvent
} from '@/lib/event'
import { generateBech32IdFromETag } from '@/lib/tag'
import client from '@/services/client.service'
import graphQueryService from '@/services/graph-query.service'
import dayjs from 'dayjs'
import { Filter, kinds, NostrEvent } from 'nostr-tools'
  16  
  17  type TRootInfo =
  18    | { type: 'E'; id: string; pubkey: string }
  19    | { type: 'A'; id: string; pubkey: string; relay?: string }
  20    | { type: 'I'; id: string }
  21  
  22  class ThreadService {
  23    static instance: ThreadService
  24  
  25    private rootInfoCache = new Map<string, Promise<TRootInfo | undefined>>()
  26    private subscriptions = new Map<
  27      string,
  28      {
  29        promise: Promise<{
  30          closer: () => void
  31          timelineKey: string
  32        }>
  33        count: number
  34        until?: number
  35      }
  36    >()
  37    private threadMap = new Map<string, NostrEvent[]>()
  38    private processedReplyKeys = new Set<string>()
  39    private parentKeyMap = new Map<string, string>()
  40    private descendantCache = new Map<string, Map<string, NostrEvent[]>>()
  41  
  42    private threadListeners = new Map<string, Set<() => void>>()
  43    private allDescendantThreadsListeners = new Map<string, Set<() => void>>()
  44    private readonly EMPTY_ARRAY: NostrEvent[] = []
  45    private readonly EMPTY_MAP: Map<string, NostrEvent[]> = new Map()
  46  
  47    constructor() {
  48      if (!ThreadService.instance) {
  49        ThreadService.instance = this
  50      }
  51      return ThreadService.instance
  52    }
  53  
  54    async subscribe(stuff: NostrEvent | string, limit = 100) {
  55      const { event } = this.resolveStuff(stuff)
  56      const rootInfo = await this.parseRootInfo(stuff)
  57      if (!rootInfo) return
  58  
  59      const subscription = this.subscriptions.get(rootInfo.id)
  60      if (subscription) {
  61        subscription.count += 1
  62        return
  63      }
  64  
  65      // Try graph query first for E-tag threads (event ID based)
  66      if (rootInfo.type === 'E') {
  67        const graphResult = await this.tryGraphQueryThread(rootInfo.id)
  68        if (graphResult) {
  69          // Graph query succeeded, no need to subscribe
  70          this.subscriptions.set(rootInfo.id, {
  71            promise: Promise.resolve({ closer: () => {}, timelineKey: '' }),
  72            count: 1,
  73            until: undefined // Graph queries return complete results
  74          })
  75          return
  76        }
  77      }
  78  
  79      const _subscribe = async () => {
  80        const relaySet = new Set<string>()
  81        const rootPubkey = (rootInfo as { pubkey?: string }).pubkey ?? event?.pubkey
  82  
  83        // Phase 3: Improved relay selection for thread loading
  84        // 1. Add relay hint from rootInfo (for replaceable events)
  85        if (rootInfo.type === 'A' && rootInfo.relay) {
  86          relaySet.add(rootInfo.relay)
  87        }
  88  
  89        // 2. Add relays where we saw this event
  90        if (event) {
  91          const seenOn = client.getSeenEventRelayUrls(event.id)
  92          seenOn.forEach((url) => relaySet.add(url))
  93        }
  94  
  95        // 3. Add author's relays
  96        if (rootPubkey) {
  97          const relayList = await client.fetchRelayList(rootPubkey)
  98          relayList.read.slice(0, 4).forEach((url) => relaySet.add(url))
  99        }
 100  
 101        // 4. Fall back to current user's relays if needed
 102        if (relaySet.size === 0) {
 103          client.currentRelays.forEach((url) => relaySet.add(url))
 104        }
 105  
 106        let relayUrls = Array.from(relaySet).slice(0, 8)
 107  
 108        const filters: (Omit<Filter, 'since' | 'until'> & {
 109          limit: number
 110        })[] = []
 111        if (rootInfo.type === 'E') {
 112          filters.push({
 113            '#e': [rootInfo.id],
 114            kinds: [kinds.ShortTextNote],
 115            limit
 116          })
 117          if (event?.kind !== kinds.ShortTextNote) {
 118            filters.push({
 119              '#E': [rootInfo.id],
 120              kinds: [ExtendedKind.COMMENT, ExtendedKind.VOICE_COMMENT],
 121              limit
 122            })
 123          }
 124        } else if (rootInfo.type === 'A') {
 125          filters.push(
 126            {
 127              '#a': [rootInfo.id],
 128              kinds: [kinds.ShortTextNote],
 129              limit
 130            },
 131            {
 132              '#A': [rootInfo.id],
 133              kinds: [ExtendedKind.COMMENT, ExtendedKind.VOICE_COMMENT],
 134              limit
 135            }
 136          )
 137        } else {
 138          filters.push({
 139            '#I': [rootInfo.id],
 140            kinds: [ExtendedKind.COMMENT, ExtendedKind.VOICE_COMMENT],
 141            limit
 142          })
 143        }
 144        let resolve: () => void
 145        const _promise = new Promise<void>((res) => {
 146          resolve = res
 147        })
 148        const { closer, timelineKey } = await client.subscribeTimeline(
 149          filters.map((filter) => ({
 150            urls: relayUrls.slice(0, 8),
 151            filter
 152          })),
 153          {
 154            onEvents: (events, eosed) => {
 155              if (events.length > 0) {
 156                this.addRepliesToThread(events)
 157              }
 158              if (eosed) {
 159                const subscription = this.subscriptions.get(rootInfo.id)
 160                if (subscription && events.length > 0) {
 161                  subscription.until = events[events.length - 1].created_at - 1
 162                }
 163                resolve()
 164              }
 165            },
 166            onNew: (evt) => {
 167              this.addRepliesToThread([evt])
 168            }
 169          }
 170        )
 171        await _promise
 172        return { closer, timelineKey }
 173      }
 174  
 175      const promise = _subscribe()
 176      this.subscriptions.set(rootInfo.id, {
 177        promise,
 178        count: 1,
 179        until: dayjs().unix()
 180      })
 181      await promise
 182    }
 183  
 184    async unsubscribe(stuff: NostrEvent | string) {
 185      const rootInfo = await this.parseRootInfo(stuff)
 186      if (!rootInfo) return
 187  
 188      const subscription = this.subscriptions.get(rootInfo.id)
 189      if (!subscription) return
 190  
 191      setTimeout(() => {
 192        subscription.count -= 1
 193        if (subscription.count <= 0) {
 194          this.subscriptions.delete(rootInfo.id)
 195          subscription.promise.then(({ closer }) => {
 196            closer()
 197          })
 198        }
 199      }, 2000)
 200    }
 201  
 202    async loadMore(stuff: NostrEvent | string, limit = 100): Promise<boolean> {
 203      const rootInfo = await this.parseRootInfo(stuff)
 204      if (!rootInfo) return false
 205  
 206      const subscription = this.subscriptions.get(rootInfo.id)
 207      if (!subscription) return false
 208  
 209      const { timelineKey } = await subscription.promise
 210      if (!timelineKey) return false
 211  
 212      if (!subscription.until) return false
 213  
 214      const events = await client.loadMoreTimeline(timelineKey, subscription.until, limit)
 215      this.addRepliesToThread(events)
 216  
 217      const { event } = this.resolveStuff(stuff)
 218      let newUntil = events.length ? events[events.length - 1].created_at - 1 : undefined
 219      if (newUntil && event && !isReplaceableEvent(event.kind) && newUntil < event.created_at) {
 220        newUntil = undefined
 221      }
 222      subscription.until = newUntil
 223      return !!newUntil
 224    }
 225  
 226    addRepliesToThread(replies: NostrEvent[]) {
 227      const newReplyEventMap = new Map<string, NostrEvent[]>()
 228      replies.forEach((reply) => {
 229        const key = getEventKey(reply)
 230        if (this.processedReplyKeys.has(key)) return
 231        this.processedReplyKeys.add(key)
 232  
 233        if (!isReplyNoteEvent(reply)) return
 234  
 235        const parentTag = getParentTag(reply)
 236        if (parentTag) {
 237          const parentKey = getKeyFromTag(parentTag.tag)
 238          if (parentKey) {
 239            const thread = newReplyEventMap.get(parentKey) ?? []
 240            thread.push(reply)
 241            newReplyEventMap.set(parentKey, thread)
 242            this.parentKeyMap.set(key, parentKey)
 243          }
 244        }
 245      })
 246      if (newReplyEventMap.size === 0) return
 247  
 248      for (const [key, newReplyEvents] of newReplyEventMap.entries()) {
 249        const thread = this.threadMap.get(key) ?? []
 250        thread.push(...newReplyEvents)
 251        this.threadMap.set(key, thread)
 252      }
 253  
 254      this.descendantCache.clear()
 255      for (const key of newReplyEventMap.keys()) {
 256        this.notifyThreadUpdate(key)
 257        this.notifyAllDescendantThreadsUpdate(key)
 258      }
 259    }
 260  
 261    getThread(stuffKey: string): NostrEvent[] {
 262      return this.threadMap.get(stuffKey) ?? this.EMPTY_ARRAY
 263    }
 264  
 265    getAllDescendantThreads(stuffKey: string): Map<string, NostrEvent[]> {
 266      const cached = this.descendantCache.get(stuffKey)
 267      if (cached) return cached
 268  
 269      const build = () => {
 270        const thread = this.threadMap.get(stuffKey)
 271        if (!thread || thread.length === 0) {
 272          return this.EMPTY_MAP
 273        }
 274  
 275        const result = new Map<string, NostrEvent[]>()
 276        const keys: string[] = [stuffKey]
 277        while (keys.length > 0) {
 278          const key = keys.pop()!
 279          const thread = this.threadMap.get(key) ?? []
 280          if (thread.length > 0) {
 281            result.set(key, thread)
 282            thread.forEach((reply) => {
 283              const replyKey = getEventKey(reply)
 284              keys.push(replyKey)
 285            })
 286          }
 287        }
 288        return result
 289      }
 290  
 291      const allThreads = build()
 292      this.descendantCache.set(stuffKey, allThreads)
 293      return allThreads
 294    }
 295  
 296    listenThread(key: string, callback: () => void) {
 297      let set = this.threadListeners.get(key)
 298      if (!set) {
 299        set = new Set()
 300        this.threadListeners.set(key, set)
 301      }
 302      set.add(callback)
 303      return () => {
 304        set?.delete(callback)
 305        if (set?.size === 0) this.threadListeners.delete(key)
 306      }
 307    }
 308  
 309    private notifyThreadUpdate(key: string) {
 310      const set = this.threadListeners.get(key)
 311      if (set) {
 312        set.forEach((cb) => cb())
 313      }
 314    }
 315  
 316    listenAllDescendantThreads(key: string, callback: () => void) {
 317      let set = this.allDescendantThreadsListeners.get(key)
 318      if (!set) {
 319        set = new Set()
 320        this.allDescendantThreadsListeners.set(key, set)
 321      }
 322      set.add(callback)
 323      return () => {
 324        set?.delete(callback)
 325        if (set?.size === 0) this.allDescendantThreadsListeners.delete(key)
 326      }
 327    }
 328  
 329    private notifyAllDescendantThreadsUpdate(key: string) {
 330      const notify = (_key: string) => {
 331        const set = this.allDescendantThreadsListeners.get(_key)
 332        if (set) {
 333          set.forEach((cb) => cb())
 334        }
 335      }
 336  
 337      notify(key)
 338      let parentKey = this.parentKeyMap.get(key)
 339      while (parentKey) {
 340        notify(parentKey)
 341        parentKey = this.parentKeyMap.get(parentKey)
 342      }
 343    }
 344  
 345    private async parseRootInfo(stuff: NostrEvent | string): Promise<TRootInfo | undefined> {
 346      const { event, externalContent } = this.resolveStuff(stuff)
 347      if (!event && !externalContent) return
 348  
 349      const cacheKey = event ? getEventKey(event) : externalContent!
 350      const cache = this.rootInfoCache.get(cacheKey)
 351      if (cache) return cache
 352  
 353      const _parseRootInfo = async (): Promise<TRootInfo | undefined> => {
 354        let root: TRootInfo = event
 355          ? isReplaceableEvent(event.kind)
 356            ? {
 357                type: 'A',
 358                id: getReplaceableCoordinateFromEvent(event),
 359                pubkey: event.pubkey,
 360                relay: client.getEventHint(event.id)
 361              }
 362            : { type: 'E', id: event.id, pubkey: event.pubkey }
 363          : { type: 'I', id: externalContent! }
 364  
 365        const rootTag = getRootTag(event)
 366        if (rootTag?.type === 'e') {
 367          const [, rootEventHexId, , , rootEventPubkey] = rootTag.tag
 368          if (rootEventHexId && rootEventPubkey) {
 369            root = { type: 'E', id: rootEventHexId, pubkey: rootEventPubkey }
 370          } else {
 371            const rootEventId = generateBech32IdFromETag(rootTag.tag)
 372            if (rootEventId) {
 373              const rootEvent = await client.fetchEvent(rootEventId)
 374              if (rootEvent) {
 375                root = { type: 'E', id: rootEvent.id, pubkey: rootEvent.pubkey }
 376              }
 377            }
 378          }
 379        } else if (rootTag?.type === 'a') {
 380          const [, coordinate, relay] = rootTag.tag
 381          const [, pubkey] = coordinate.split(':')
 382          root = { type: 'A', id: coordinate, pubkey, relay }
 383        } else if (rootTag?.type === 'i') {
 384          root = { type: 'I', id: rootTag.tag[1] }
 385        }
 386        return root
 387      }
 388  
 389      const promise = _parseRootInfo()
 390      this.rootInfoCache.set(cacheKey, promise)
 391      return promise
 392    }
 393  
 394    /**
 395     * Try to fetch thread events using graph query (NIP-XX).
 396     * Returns true if successful, false otherwise.
 397     */
 398    private async tryGraphQueryThread(eventId: string): Promise<boolean> {
 399      try {
 400        // Use current user's relays for graph queries
 401        const relays = client.currentRelays.length > 0 ? client.currentRelays : []
 402        if (relays.length === 0) {
 403          return false
 404        }
 405  
 406        const graphResult = await graphQueryService.queryThread(
 407          relays,
 408          eventId,
 409          10, // Max depth for threads
 410          {
 411            inboundRefKinds: [7, 9735] // Reactions and zaps
 412          }
 413        )
 414  
 415        if (!graphResult?.events_by_depth?.length) {
 416          return false
 417        }
 418  
 419        // Graph query returns event IDs by depth
 420        // We need to fetch the actual events and add them to the thread
 421        const allEventIds = graphResult.events_by_depth.flat()
 422        if (allEventIds.length === 0) {
 423          return false
 424        }
 425  
 426        // Fetch actual events for the IDs returned by graph query
 427        const events = await client.fetchEvents(relays, {
 428          ids: allEventIds.slice(0, 500), // Limit to prevent huge queries
 429          limit: allEventIds.length
 430        })
 431  
 432        if (events.length > 0) {
 433          this.addRepliesToThread(events)
 434          return true
 435        }
 436  
 437        return false
 438      } catch (error) {
 439        console.error('Graph query for thread failed:', error)
 440        return false
 441      }
 442    }
 443  
 444    private resolveStuff(stuff: NostrEvent | string) {
 445      return typeof stuff === 'string'
 446        ? { event: undefined, externalContent: stuff, stuffKey: stuff }
 447        : { event: stuff, externalContent: undefined, stuffKey: getEventKey(stuff) }
 448    }
 449  }
 450  
 451  const instance = new ThreadService()
 452  
 453  export default instance
 454