thread.service.ts raw
1 import { ExtendedKind } from '@/constants'
2 import {
3 getEventKey,
4 getKeyFromTag,
5 getParentTag,
6 getReplaceableCoordinateFromEvent,
7 getRootTag,
8 isReplaceableEvent,
9 isReplyNoteEvent
10 } from '@/lib/event'
11 import { generateBech32IdFromETag } from '@/lib/tag'
12 import client from '@/services/client.service'
13 import graphQueryService from '@/services/graph-query.service'
14 import dayjs from 'dayjs'
15 import { Filter, kinds, NostrEvent } from 'nostr-tools'
16
17 type TRootInfo =
18 | { type: 'E'; id: string; pubkey: string }
19 | { type: 'A'; id: string; pubkey: string; relay?: string }
20 | { type: 'I'; id: string }
21
22 class ThreadService {
23 static instance: ThreadService
24
25 private rootInfoCache = new Map<string, Promise<TRootInfo | undefined>>()
26 private subscriptions = new Map<
27 string,
28 {
29 promise: Promise<{
30 closer: () => void
31 timelineKey: string
32 }>
33 count: number
34 until?: number
35 }
36 >()
37 private threadMap = new Map<string, NostrEvent[]>()
38 private processedReplyKeys = new Set<string>()
39 private parentKeyMap = new Map<string, string>()
40 private descendantCache = new Map<string, Map<string, NostrEvent[]>>()
41
42 private threadListeners = new Map<string, Set<() => void>>()
43 private allDescendantThreadsListeners = new Map<string, Set<() => void>>()
44 private readonly EMPTY_ARRAY: NostrEvent[] = []
45 private readonly EMPTY_MAP: Map<string, NostrEvent[]> = new Map()
46
47 constructor() {
48 if (!ThreadService.instance) {
49 ThreadService.instance = this
50 }
51 return ThreadService.instance
52 }
53
54 async subscribe(stuff: NostrEvent | string, limit = 100) {
55 const { event } = this.resolveStuff(stuff)
56 const rootInfo = await this.parseRootInfo(stuff)
57 if (!rootInfo) return
58
59 const subscription = this.subscriptions.get(rootInfo.id)
60 if (subscription) {
61 subscription.count += 1
62 return
63 }
64
65 // Try graph query first for E-tag threads (event ID based)
66 if (rootInfo.type === 'E') {
67 const graphResult = await this.tryGraphQueryThread(rootInfo.id)
68 if (graphResult) {
69 // Graph query succeeded, no need to subscribe
70 this.subscriptions.set(rootInfo.id, {
71 promise: Promise.resolve({ closer: () => {}, timelineKey: '' }),
72 count: 1,
73 until: undefined // Graph queries return complete results
74 })
75 return
76 }
77 }
78
79 const _subscribe = async () => {
80 const relaySet = new Set<string>()
81 const rootPubkey = (rootInfo as { pubkey?: string }).pubkey ?? event?.pubkey
82
83 // Phase 3: Improved relay selection for thread loading
84 // 1. Add relay hint from rootInfo (for replaceable events)
85 if (rootInfo.type === 'A' && rootInfo.relay) {
86 relaySet.add(rootInfo.relay)
87 }
88
89 // 2. Add relays where we saw this event
90 if (event) {
91 const seenOn = client.getSeenEventRelayUrls(event.id)
92 seenOn.forEach((url) => relaySet.add(url))
93 }
94
95 // 3. Add author's relays
96 if (rootPubkey) {
97 const relayList = await client.fetchRelayList(rootPubkey)
98 relayList.read.slice(0, 4).forEach((url) => relaySet.add(url))
99 }
100
101 // 4. Fall back to current user's relays if needed
102 if (relaySet.size === 0) {
103 client.currentRelays.forEach((url) => relaySet.add(url))
104 }
105
106 let relayUrls = Array.from(relaySet).slice(0, 8)
107
108 const filters: (Omit<Filter, 'since' | 'until'> & {
109 limit: number
110 })[] = []
111 if (rootInfo.type === 'E') {
112 filters.push({
113 '#e': [rootInfo.id],
114 kinds: [kinds.ShortTextNote],
115 limit
116 })
117 if (event?.kind !== kinds.ShortTextNote) {
118 filters.push({
119 '#E': [rootInfo.id],
120 kinds: [ExtendedKind.COMMENT, ExtendedKind.VOICE_COMMENT],
121 limit
122 })
123 }
124 } else if (rootInfo.type === 'A') {
125 filters.push(
126 {
127 '#a': [rootInfo.id],
128 kinds: [kinds.ShortTextNote],
129 limit
130 },
131 {
132 '#A': [rootInfo.id],
133 kinds: [ExtendedKind.COMMENT, ExtendedKind.VOICE_COMMENT],
134 limit
135 }
136 )
137 } else {
138 filters.push({
139 '#I': [rootInfo.id],
140 kinds: [ExtendedKind.COMMENT, ExtendedKind.VOICE_COMMENT],
141 limit
142 })
143 }
144 let resolve: () => void
145 const _promise = new Promise<void>((res) => {
146 resolve = res
147 })
148 const { closer, timelineKey } = await client.subscribeTimeline(
149 filters.map((filter) => ({
150 urls: relayUrls.slice(0, 8),
151 filter
152 })),
153 {
154 onEvents: (events, eosed) => {
155 if (events.length > 0) {
156 this.addRepliesToThread(events)
157 }
158 if (eosed) {
159 const subscription = this.subscriptions.get(rootInfo.id)
160 if (subscription && events.length > 0) {
161 subscription.until = events[events.length - 1].created_at - 1
162 }
163 resolve()
164 }
165 },
166 onNew: (evt) => {
167 this.addRepliesToThread([evt])
168 }
169 }
170 )
171 await _promise
172 return { closer, timelineKey }
173 }
174
175 const promise = _subscribe()
176 this.subscriptions.set(rootInfo.id, {
177 promise,
178 count: 1,
179 until: dayjs().unix()
180 })
181 await promise
182 }
183
184 async unsubscribe(stuff: NostrEvent | string) {
185 const rootInfo = await this.parseRootInfo(stuff)
186 if (!rootInfo) return
187
188 const subscription = this.subscriptions.get(rootInfo.id)
189 if (!subscription) return
190
191 setTimeout(() => {
192 subscription.count -= 1
193 if (subscription.count <= 0) {
194 this.subscriptions.delete(rootInfo.id)
195 subscription.promise.then(({ closer }) => {
196 closer()
197 })
198 }
199 }, 2000)
200 }
201
202 async loadMore(stuff: NostrEvent | string, limit = 100): Promise<boolean> {
203 const rootInfo = await this.parseRootInfo(stuff)
204 if (!rootInfo) return false
205
206 const subscription = this.subscriptions.get(rootInfo.id)
207 if (!subscription) return false
208
209 const { timelineKey } = await subscription.promise
210 if (!timelineKey) return false
211
212 if (!subscription.until) return false
213
214 const events = await client.loadMoreTimeline(timelineKey, subscription.until, limit)
215 this.addRepliesToThread(events)
216
217 const { event } = this.resolveStuff(stuff)
218 let newUntil = events.length ? events[events.length - 1].created_at - 1 : undefined
219 if (newUntil && event && !isReplaceableEvent(event.kind) && newUntil < event.created_at) {
220 newUntil = undefined
221 }
222 subscription.until = newUntil
223 return !!newUntil
224 }
225
226 addRepliesToThread(replies: NostrEvent[]) {
227 const newReplyEventMap = new Map<string, NostrEvent[]>()
228 replies.forEach((reply) => {
229 const key = getEventKey(reply)
230 if (this.processedReplyKeys.has(key)) return
231 this.processedReplyKeys.add(key)
232
233 if (!isReplyNoteEvent(reply)) return
234
235 const parentTag = getParentTag(reply)
236 if (parentTag) {
237 const parentKey = getKeyFromTag(parentTag.tag)
238 if (parentKey) {
239 const thread = newReplyEventMap.get(parentKey) ?? []
240 thread.push(reply)
241 newReplyEventMap.set(parentKey, thread)
242 this.parentKeyMap.set(key, parentKey)
243 }
244 }
245 })
246 if (newReplyEventMap.size === 0) return
247
248 for (const [key, newReplyEvents] of newReplyEventMap.entries()) {
249 const thread = this.threadMap.get(key) ?? []
250 thread.push(...newReplyEvents)
251 this.threadMap.set(key, thread)
252 }
253
254 this.descendantCache.clear()
255 for (const key of newReplyEventMap.keys()) {
256 this.notifyThreadUpdate(key)
257 this.notifyAllDescendantThreadsUpdate(key)
258 }
259 }
260
261 getThread(stuffKey: string): NostrEvent[] {
262 return this.threadMap.get(stuffKey) ?? this.EMPTY_ARRAY
263 }
264
265 getAllDescendantThreads(stuffKey: string): Map<string, NostrEvent[]> {
266 const cached = this.descendantCache.get(stuffKey)
267 if (cached) return cached
268
269 const build = () => {
270 const thread = this.threadMap.get(stuffKey)
271 if (!thread || thread.length === 0) {
272 return this.EMPTY_MAP
273 }
274
275 const result = new Map<string, NostrEvent[]>()
276 const keys: string[] = [stuffKey]
277 while (keys.length > 0) {
278 const key = keys.pop()!
279 const thread = this.threadMap.get(key) ?? []
280 if (thread.length > 0) {
281 result.set(key, thread)
282 thread.forEach((reply) => {
283 const replyKey = getEventKey(reply)
284 keys.push(replyKey)
285 })
286 }
287 }
288 return result
289 }
290
291 const allThreads = build()
292 this.descendantCache.set(stuffKey, allThreads)
293 return allThreads
294 }
295
296 listenThread(key: string, callback: () => void) {
297 let set = this.threadListeners.get(key)
298 if (!set) {
299 set = new Set()
300 this.threadListeners.set(key, set)
301 }
302 set.add(callback)
303 return () => {
304 set?.delete(callback)
305 if (set?.size === 0) this.threadListeners.delete(key)
306 }
307 }
308
309 private notifyThreadUpdate(key: string) {
310 const set = this.threadListeners.get(key)
311 if (set) {
312 set.forEach((cb) => cb())
313 }
314 }
315
316 listenAllDescendantThreads(key: string, callback: () => void) {
317 let set = this.allDescendantThreadsListeners.get(key)
318 if (!set) {
319 set = new Set()
320 this.allDescendantThreadsListeners.set(key, set)
321 }
322 set.add(callback)
323 return () => {
324 set?.delete(callback)
325 if (set?.size === 0) this.allDescendantThreadsListeners.delete(key)
326 }
327 }
328
329 private notifyAllDescendantThreadsUpdate(key: string) {
330 const notify = (_key: string) => {
331 const set = this.allDescendantThreadsListeners.get(_key)
332 if (set) {
333 set.forEach((cb) => cb())
334 }
335 }
336
337 notify(key)
338 let parentKey = this.parentKeyMap.get(key)
339 while (parentKey) {
340 notify(parentKey)
341 parentKey = this.parentKeyMap.get(parentKey)
342 }
343 }
344
345 private async parseRootInfo(stuff: NostrEvent | string): Promise<TRootInfo | undefined> {
346 const { event, externalContent } = this.resolveStuff(stuff)
347 if (!event && !externalContent) return
348
349 const cacheKey = event ? getEventKey(event) : externalContent!
350 const cache = this.rootInfoCache.get(cacheKey)
351 if (cache) return cache
352
353 const _parseRootInfo = async (): Promise<TRootInfo | undefined> => {
354 let root: TRootInfo = event
355 ? isReplaceableEvent(event.kind)
356 ? {
357 type: 'A',
358 id: getReplaceableCoordinateFromEvent(event),
359 pubkey: event.pubkey,
360 relay: client.getEventHint(event.id)
361 }
362 : { type: 'E', id: event.id, pubkey: event.pubkey }
363 : { type: 'I', id: externalContent! }
364
365 const rootTag = getRootTag(event)
366 if (rootTag?.type === 'e') {
367 const [, rootEventHexId, , , rootEventPubkey] = rootTag.tag
368 if (rootEventHexId && rootEventPubkey) {
369 root = { type: 'E', id: rootEventHexId, pubkey: rootEventPubkey }
370 } else {
371 const rootEventId = generateBech32IdFromETag(rootTag.tag)
372 if (rootEventId) {
373 const rootEvent = await client.fetchEvent(rootEventId)
374 if (rootEvent) {
375 root = { type: 'E', id: rootEvent.id, pubkey: rootEvent.pubkey }
376 }
377 }
378 }
379 } else if (rootTag?.type === 'a') {
380 const [, coordinate, relay] = rootTag.tag
381 const [, pubkey] = coordinate.split(':')
382 root = { type: 'A', id: coordinate, pubkey, relay }
383 } else if (rootTag?.type === 'i') {
384 root = { type: 'I', id: rootTag.tag[1] }
385 }
386 return root
387 }
388
389 const promise = _parseRootInfo()
390 this.rootInfoCache.set(cacheKey, promise)
391 return promise
392 }
393
394 /**
395 * Try to fetch thread events using graph query (NIP-XX).
396 * Returns true if successful, false otherwise.
397 */
398 private async tryGraphQueryThread(eventId: string): Promise<boolean> {
399 try {
400 // Use current user's relays for graph queries
401 const relays = client.currentRelays.length > 0 ? client.currentRelays : []
402 if (relays.length === 0) {
403 return false
404 }
405
406 const graphResult = await graphQueryService.queryThread(
407 relays,
408 eventId,
409 10, // Max depth for threads
410 {
411 inboundRefKinds: [7, 9735] // Reactions and zaps
412 }
413 )
414
415 if (!graphResult?.events_by_depth?.length) {
416 return false
417 }
418
419 // Graph query returns event IDs by depth
420 // We need to fetch the actual events and add them to the thread
421 const allEventIds = graphResult.events_by_depth.flat()
422 if (allEventIds.length === 0) {
423 return false
424 }
425
426 // Fetch actual events for the IDs returned by graph query
427 const events = await client.fetchEvents(relays, {
428 ids: allEventIds.slice(0, 500), // Limit to prevent huge queries
429 limit: allEventIds.length
430 })
431
432 if (events.length > 0) {
433 this.addRepliesToThread(events)
434 return true
435 }
436
437 return false
438 } catch (error) {
439 console.error('Graph query for thread failed:', error)
440 return false
441 }
442 }
443
444 private resolveStuff(stuff: NostrEvent | string) {
445 return typeof stuff === 'string'
446 ? { event: undefined, externalContent: stuff, stuffKey: stuff }
447 : { event: stuff, externalContent: undefined, stuffKey: getEventKey(stuff) }
448 }
449 }
450
451 const instance = new ThreadService()
452
453 export default instance
454