import { ExtendedKind } from '@/constants' import { getEventKey, getKeyFromTag, getParentTag, getReplaceableCoordinateFromEvent, getRootTag, isReplaceableEvent, isReplyNoteEvent } from '@/lib/event' import { generateBech32IdFromETag } from '@/lib/tag' import client from '@/services/client.service' import graphQueryService from '@/services/graph-query.service' import dayjs from 'dayjs' import { Filter, kinds, NostrEvent } from 'nostr-tools' type TRootInfo = | { type: 'E'; id: string; pubkey: string } | { type: 'A'; id: string; pubkey: string; relay?: string } | { type: 'I'; id: string } class ThreadService { static instance: ThreadService private rootInfoCache = new Map>() private subscriptions = new Map< string, { promise: Promise<{ closer: () => void timelineKey: string }> count: number until?: number } >() private threadMap = new Map() private processedReplyKeys = new Set() private parentKeyMap = new Map() private descendantCache = new Map>() private threadListeners = new Map void>>() private allDescendantThreadsListeners = new Map void>>() private readonly EMPTY_ARRAY: NostrEvent[] = [] private readonly EMPTY_MAP: Map = new Map() constructor() { if (!ThreadService.instance) { ThreadService.instance = this } return ThreadService.instance } async subscribe(stuff: NostrEvent | string, limit = 100) { const { event } = this.resolveStuff(stuff) const rootInfo = await this.parseRootInfo(stuff) if (!rootInfo) return const subscription = this.subscriptions.get(rootInfo.id) if (subscription) { subscription.count += 1 return } // Try graph query first for E-tag threads (event ID based) if (rootInfo.type === 'E') { const graphResult = await this.tryGraphQueryThread(rootInfo.id) if (graphResult) { // Graph query succeeded, no need to subscribe this.subscriptions.set(rootInfo.id, { promise: Promise.resolve({ closer: () => {}, timelineKey: '' }), count: 1, until: undefined // Graph queries return complete results }) return } } const _subscribe = async () => { const relaySet = new Set() const rootPubkey = (rootInfo as { pubkey?: string }).pubkey ?? event?.pubkey // Phase 3: Improved relay selection for thread loading // 1. Add relay hint from rootInfo (for replaceable events) if (rootInfo.type === 'A' && rootInfo.relay) { relaySet.add(rootInfo.relay) } // 2. Add relays where we saw this event if (event) { const seenOn = client.getSeenEventRelayUrls(event.id) seenOn.forEach((url) => relaySet.add(url)) } // 3. Add author's relays if (rootPubkey) { const relayList = await client.fetchRelayList(rootPubkey) relayList.read.slice(0, 4).forEach((url) => relaySet.add(url)) } // 4. Fall back to current user's relays if needed if (relaySet.size === 0) { client.currentRelays.forEach((url) => relaySet.add(url)) } let relayUrls = Array.from(relaySet).slice(0, 8) const filters: (Omit & { limit: number })[] = [] if (rootInfo.type === 'E') { filters.push({ '#e': [rootInfo.id], kinds: [kinds.ShortTextNote], limit }) if (event?.kind !== kinds.ShortTextNote) { filters.push({ '#E': [rootInfo.id], kinds: [ExtendedKind.COMMENT, ExtendedKind.VOICE_COMMENT], limit }) } } else if (rootInfo.type === 'A') { filters.push( { '#a': [rootInfo.id], kinds: [kinds.ShortTextNote], limit }, { '#A': [rootInfo.id], kinds: [ExtendedKind.COMMENT, ExtendedKind.VOICE_COMMENT], limit } ) } else { filters.push({ '#I': [rootInfo.id], kinds: [ExtendedKind.COMMENT, ExtendedKind.VOICE_COMMENT], limit }) } let resolve: () => void const _promise = new Promise((res) => { resolve = res }) const { closer, timelineKey } = await client.subscribeTimeline( filters.map((filter) => ({ urls: relayUrls.slice(0, 8), filter })), { onEvents: (events, eosed) => { if (events.length > 0) { this.addRepliesToThread(events) } if (eosed) { const subscription = this.subscriptions.get(rootInfo.id) if (subscription && events.length > 0) { subscription.until = events[events.length - 1].created_at - 1 } resolve() } }, onNew: (evt) => { this.addRepliesToThread([evt]) } } ) await _promise return { closer, timelineKey } } const promise = _subscribe() this.subscriptions.set(rootInfo.id, { promise, count: 1, until: dayjs().unix() }) await promise } async unsubscribe(stuff: NostrEvent | string) { const rootInfo = await this.parseRootInfo(stuff) if (!rootInfo) return const subscription = this.subscriptions.get(rootInfo.id) if (!subscription) return setTimeout(() => { subscription.count -= 1 if (subscription.count <= 0) { this.subscriptions.delete(rootInfo.id) subscription.promise.then(({ closer }) => { closer() }) } }, 2000) } async loadMore(stuff: NostrEvent | string, limit = 100): Promise { const rootInfo = await this.parseRootInfo(stuff) if (!rootInfo) return false const subscription = this.subscriptions.get(rootInfo.id) if (!subscription) return false const { timelineKey } = await subscription.promise if (!timelineKey) return false if (!subscription.until) return false const events = await client.loadMoreTimeline(timelineKey, subscription.until, limit) this.addRepliesToThread(events) const { event } = this.resolveStuff(stuff) let newUntil = events.length ? events[events.length - 1].created_at - 1 : undefined if (newUntil && event && !isReplaceableEvent(event.kind) && newUntil < event.created_at) { newUntil = undefined } subscription.until = newUntil return !!newUntil } addRepliesToThread(replies: NostrEvent[]) { const newReplyEventMap = new Map() replies.forEach((reply) => { const key = getEventKey(reply) if (this.processedReplyKeys.has(key)) return this.processedReplyKeys.add(key) if (!isReplyNoteEvent(reply)) return const parentTag = getParentTag(reply) if (parentTag) { const parentKey = getKeyFromTag(parentTag.tag) if (parentKey) { const thread = newReplyEventMap.get(parentKey) ?? [] thread.push(reply) newReplyEventMap.set(parentKey, thread) this.parentKeyMap.set(key, parentKey) } } }) if (newReplyEventMap.size === 0) return for (const [key, newReplyEvents] of newReplyEventMap.entries()) { const thread = this.threadMap.get(key) ?? [] thread.push(...newReplyEvents) this.threadMap.set(key, thread) } this.descendantCache.clear() for (const key of newReplyEventMap.keys()) { this.notifyThreadUpdate(key) this.notifyAllDescendantThreadsUpdate(key) } } getThread(stuffKey: string): NostrEvent[] { return this.threadMap.get(stuffKey) ?? this.EMPTY_ARRAY } getAllDescendantThreads(stuffKey: string): Map { const cached = this.descendantCache.get(stuffKey) if (cached) return cached const build = () => { const thread = this.threadMap.get(stuffKey) if (!thread || thread.length === 0) { return this.EMPTY_MAP } const result = new Map() const keys: string[] = [stuffKey] while (keys.length > 0) { const key = keys.pop()! const thread = this.threadMap.get(key) ?? [] if (thread.length > 0) { result.set(key, thread) thread.forEach((reply) => { const replyKey = getEventKey(reply) keys.push(replyKey) }) } } return result } const allThreads = build() this.descendantCache.set(stuffKey, allThreads) return allThreads } listenThread(key: string, callback: () => void) { let set = this.threadListeners.get(key) if (!set) { set = new Set() this.threadListeners.set(key, set) } set.add(callback) return () => { set?.delete(callback) if (set?.size === 0) this.threadListeners.delete(key) } } private notifyThreadUpdate(key: string) { const set = this.threadListeners.get(key) if (set) { set.forEach((cb) => cb()) } } listenAllDescendantThreads(key: string, callback: () => void) { let set = this.allDescendantThreadsListeners.get(key) if (!set) { set = new Set() this.allDescendantThreadsListeners.set(key, set) } set.add(callback) return () => { set?.delete(callback) if (set?.size === 0) this.allDescendantThreadsListeners.delete(key) } } private notifyAllDescendantThreadsUpdate(key: string) { const notify = (_key: string) => { const set = this.allDescendantThreadsListeners.get(_key) if (set) { set.forEach((cb) => cb()) } } notify(key) let parentKey = this.parentKeyMap.get(key) while (parentKey) { notify(parentKey) parentKey = this.parentKeyMap.get(parentKey) } } private async parseRootInfo(stuff: NostrEvent | string): Promise { const { event, externalContent } = this.resolveStuff(stuff) if (!event && !externalContent) return const cacheKey = event ? getEventKey(event) : externalContent! const cache = this.rootInfoCache.get(cacheKey) if (cache) return cache const _parseRootInfo = async (): Promise => { let root: TRootInfo = event ? isReplaceableEvent(event.kind) ? { type: 'A', id: getReplaceableCoordinateFromEvent(event), pubkey: event.pubkey, relay: client.getEventHint(event.id) } : { type: 'E', id: event.id, pubkey: event.pubkey } : { type: 'I', id: externalContent! } const rootTag = getRootTag(event) if (rootTag?.type === 'e') { const [, rootEventHexId, , , rootEventPubkey] = rootTag.tag if (rootEventHexId && rootEventPubkey) { root = { type: 'E', id: rootEventHexId, pubkey: rootEventPubkey } } else { const rootEventId = generateBech32IdFromETag(rootTag.tag) if (rootEventId) { const rootEvent = await client.fetchEvent(rootEventId) if (rootEvent) { root = { type: 'E', id: rootEvent.id, pubkey: rootEvent.pubkey } } } } } else if (rootTag?.type === 'a') { const [, coordinate, relay] = rootTag.tag const [, pubkey] = coordinate.split(':') root = { type: 'A', id: coordinate, pubkey, relay } } else if (rootTag?.type === 'i') { root = { type: 'I', id: rootTag.tag[1] } } return root } const promise = _parseRootInfo() this.rootInfoCache.set(cacheKey, promise) return promise } /** * Try to fetch thread events using graph query (NIP-XX). * Returns true if successful, false otherwise. */ private async tryGraphQueryThread(eventId: string): Promise { try { // Use current user's relays for graph queries const relays = client.currentRelays.length > 0 ? client.currentRelays : [] if (relays.length === 0) { return false } const graphResult = await graphQueryService.queryThread( relays, eventId, 10, // Max depth for threads { inboundRefKinds: [7, 9735] // Reactions and zaps } ) if (!graphResult?.events_by_depth?.length) { return false } // Graph query returns event IDs by depth // We need to fetch the actual events and add them to the thread const allEventIds = graphResult.events_by_depth.flat() if (allEventIds.length === 0) { return false } // Fetch actual events for the IDs returned by graph query const events = await client.fetchEvents(relays, { ids: allEventIds.slice(0, 500), // Limit to prevent huge queries limit: allEventIds.length }) if (events.length > 0) { this.addRepliesToThread(events) return true } return false } catch (error) { console.error('Graph query for thread failed:', error) return false } } private resolveStuff(stuff: NostrEvent | string) { return typeof stuff === 'string' ? { event: undefined, externalContent: stuff, stuffKey: stuff } : { event: stuff, externalContent: undefined, stuffKey: getEventKey(stuff) } } } const instance = new ThreadService() export default instance