// directory.go
1 package spider
2
3 import (
4 "context"
5 "sync"
6 "sync/atomic"
7 "time"
8
9 "next.orly.dev/pkg/nostr/crypto/keys"
10 "next.orly.dev/pkg/nostr/encoders/event"
11 "next.orly.dev/pkg/nostr/encoders/filter"
12 "next.orly.dev/pkg/nostr/encoders/kind"
13 "next.orly.dev/pkg/nostr/encoders/tag"
14 "next.orly.dev/pkg/nostr/utils/normalize"
15 "next.orly.dev/pkg/nostr/ws"
16 "next.orly.dev/pkg/lol/chk"
17 "next.orly.dev/pkg/lol/errorf"
18 "next.orly.dev/pkg/lol/log"
19 "next.orly.dev/pkg/database"
20 "next.orly.dev/pkg/interfaces/publisher"
21 dsync "next.orly.dev/pkg/sync"
22 )
23
const (
	// DirectorySpiderDefaultInterval is how often the directory spider runs
	// when no explicit interval is configured.
	DirectorySpiderDefaultInterval = 24 * time.Hour
	// DirectorySpiderDefaultMaxHops is the maximum hop distance for relay
	// discovery when no explicit limit is configured.
	DirectorySpiderDefaultMaxHops = 3
	// DirectorySpiderRelayTimeout is the timeout for connecting to and querying a relay.
	DirectorySpiderRelayTimeout = 30 * time.Second
	// DirectorySpiderQueryTimeout is the timeout for waiting for EOSE on a query.
	DirectorySpiderQueryTimeout = 60 * time.Second
	// DirectorySpiderRelayDelay is the delay between processing relays (rate limiting).
	DirectorySpiderRelayDelay = 500 * time.Millisecond
	// DirectorySpiderMaxEventsPerQuery is the per-filter limit for each query.
	DirectorySpiderMaxEventsPerQuery = 5000
)
38
// DirectorySpider manages periodic relay discovery and metadata synchronization.
// It discovers relays by crawling kind 10002 (relay list) events, expanding outward
// in hops from seed pubkeys (whitelisted users), then fetches essential metadata
// events (kinds 0, 3, 10000, 10002) from all discovered relays.
type DirectorySpider struct {
	ctx    context.Context    // lifetime of the spider; cancelled by Stop
	cancel context.CancelFunc // cancels ctx
	db     *database.D        // local event store queried and written by the spider
	pub    publisher.I        // optional: newly saved events are delivered here

	// Configuration
	interval time.Duration // time between scheduled runs
	maxHops  int           // maximum hop distance for relay discovery

	// State
	running atomic.Bool // true while the spider's main loop is active
	lastRun time.Time   // completion time of the last run; guarded by mu

	// Relay discovery state (reset each run); guarded by mu
	discoveredRelays map[string]int  // URL -> hop distance
	processedRelays  map[string]bool // Already fetched metadata from
	mu               sync.Mutex

	// Self-detection
	relayIdentityPubkey string          // this relay's NIP-11 pubkey; set once at construction
	selfURLs            map[string]bool // URLs confirmed to point at ourselves; guarded by mu
	nip11Cache          *dsync.NIP11Cache

	// Callback for getting seed pubkeys (whitelisted users); guarded by mu
	getSeedPubkeys func() [][]byte

	// Trigger channel for manual runs (buffered, size 1: at most one
	// pending trigger is kept).
	triggerChan chan struct{}
}
73
74 // NewDirectorySpider creates a new DirectorySpider instance.
75 func NewDirectorySpider(
76 ctx context.Context,
77 db *database.D,
78 pub publisher.I,
79 interval time.Duration,
80 maxHops int,
81 ) (ds *DirectorySpider, err error) {
82 if db == nil {
83 err = errorf.E("database cannot be nil")
84 return
85 }
86
87 if interval <= 0 {
88 interval = DirectorySpiderDefaultInterval
89 }
90 if maxHops <= 0 {
91 maxHops = DirectorySpiderDefaultMaxHops
92 }
93
94 ctx, cancel := context.WithCancel(ctx)
95
96 // Get relay identity pubkey for self-detection
97 var relayPubkey string
98 if skb, err := db.GetRelayIdentitySecret(); err == nil && len(skb) == 32 {
99 pk, _ := keys.SecretBytesToPubKeyHex(skb)
100 relayPubkey = pk
101 }
102
103 ds = &DirectorySpider{
104 ctx: ctx,
105 cancel: cancel,
106 db: db,
107 pub: pub,
108 interval: interval,
109 maxHops: maxHops,
110 discoveredRelays: make(map[string]int),
111 processedRelays: make(map[string]bool),
112 relayIdentityPubkey: relayPubkey,
113 selfURLs: make(map[string]bool),
114 nip11Cache: dsync.NewNIP11Cache(30 * time.Minute),
115 triggerChan: make(chan struct{}, 1),
116 }
117
118 return
119 }
120
121 // SetSeedCallback sets the callback function for getting seed pubkeys (whitelisted users).
122 func (ds *DirectorySpider) SetSeedCallback(getSeedPubkeys func() [][]byte) {
123 ds.mu.Lock()
124 defer ds.mu.Unlock()
125 ds.getSeedPubkeys = getSeedPubkeys
126 }
127
128 // Start begins the directory spider operation.
129 func (ds *DirectorySpider) Start() (err error) {
130 if ds.running.Load() {
131 err = errorf.E("directory spider already running")
132 return
133 }
134
135 if ds.getSeedPubkeys == nil {
136 err = errorf.E("seed callback must be set before starting")
137 return
138 }
139
140 ds.running.Store(true)
141 go ds.mainLoop()
142
143 log.I.F("directory spider: started (interval: %v, max hops: %d)", ds.interval, ds.maxHops)
144 return
145 }
146
147 // Stop stops the directory spider operation.
148 func (ds *DirectorySpider) Stop() {
149 if !ds.running.Load() {
150 return
151 }
152
153 ds.running.Store(false)
154 ds.cancel()
155
156 log.I.F("directory spider: stopped")
157 }
158
// TriggerNow forces an immediate run of the directory spider.
//
// Non-blocking: triggerChan has capacity 1, so if a trigger is already
// pending this call is a no-op (the pending trigger covers it).
func (ds *DirectorySpider) TriggerNow() {
	select {
	case ds.triggerChan <- struct{}{}:
		log.D.F("directory spider: manual trigger sent")
	default:
		// Channel full: a run is already queued.
		log.D.F("directory spider: trigger already pending")
	}
}
168
169 // LastRun returns the time of the last completed run.
170 func (ds *DirectorySpider) LastRun() time.Time {
171 ds.mu.Lock()
172 defer ds.mu.Unlock()
173 return ds.lastRun
174 }
175
176 // mainLoop is the main spider loop that runs periodically.
177 func (ds *DirectorySpider) mainLoop() {
178 // Run immediately on start
179 ds.runOnce()
180
181 ticker := time.NewTicker(ds.interval)
182 defer ticker.Stop()
183
184 log.D.F("directory spider: main loop started, running every %v", ds.interval)
185
186 for {
187 select {
188 case <-ds.ctx.Done():
189 return
190 case <-ds.triggerChan:
191 log.D.F("directory spider: manual trigger received")
192 ds.runOnce()
193 case <-ticker.C:
194 log.D.F("directory spider: scheduled run triggered")
195 ds.runOnce()
196 }
197 }
198 }
199
200 // runOnce performs a single directory spider run.
201 func (ds *DirectorySpider) runOnce() {
202 if !ds.running.Load() {
203 return
204 }
205
206 log.D.F("directory spider: starting run")
207 start := time.Now()
208
209 // Reset state for this run
210 ds.mu.Lock()
211 ds.discoveredRelays = make(map[string]int)
212 ds.processedRelays = make(map[string]bool)
213 ds.mu.Unlock()
214
215 // Phase 1: Discover relays via hop expansion
216 if err := ds.discoverRelays(); err != nil {
217 log.E.F("directory spider: relay discovery failed: %v", err)
218 return
219 }
220
221 ds.mu.Lock()
222 relayCount := len(ds.discoveredRelays)
223 ds.mu.Unlock()
224
225 log.D.F("directory spider: discovered %d relays", relayCount)
226
227 // Phase 2: Fetch metadata from all discovered relays
228 if err := ds.fetchMetadataFromRelays(); err != nil {
229 log.E.F("directory spider: metadata fetch failed: %v", err)
230 return
231 }
232
233 ds.mu.Lock()
234 ds.lastRun = time.Now()
235 ds.mu.Unlock()
236
237 log.D.F("directory spider: completed run in %v", time.Since(start))
238 }
239
// discoverRelays performs the multi-hop relay discovery.
//
// Round 0 seeds discoveredRelays from kind 10002 events already in the
// local database for the seed pubkeys. Rounds 1..maxHops then fetch relay
// lists from each relay found in the previous round, adding any new URLs
// at the current hop distance. Relays are processed serially with a rate
// limiting delay between them. Self-relays (URLs that resolve to this
// relay's own NIP-11 pubkey) are excluded.
//
// Locking note: isSelfRelay acquires ds.mu internally, so it must never
// be called while mu is held — hence the filter/lock/filter interleaving
// below.
func (ds *DirectorySpider) discoverRelays() error {
	// Get seed pubkeys from callback.
	seedPubkeys := ds.getSeedPubkeys()
	if len(seedPubkeys) == 0 {
		log.W.F("directory spider: no seed pubkeys available")
		return nil
	}

	log.D.F("directory spider: starting relay discovery with %d seed pubkeys", len(seedPubkeys))

	// Round 0: Get relay lists from seed pubkeys in local database.
	seedRelays, err := ds.getRelaysFromLocalDB(seedPubkeys)
	if err != nil {
		return errorf.W("failed to get relays from local DB: %v", err)
	}

	// Filter out self-relays WITHOUT holding mu — isSelfRelay takes mu internally.
	var nonSelfRelays []string
	for _, url := range seedRelays {
		if !ds.isSelfRelay(url) {
			nonSelfRelays = append(nonSelfRelays, url)
		}
	}

	// Add seed relays at hop 0.
	ds.mu.Lock()
	for _, url := range nonSelfRelays {
		ds.discoveredRelays[url] = 0
	}
	ds.mu.Unlock()

	log.D.F("directory spider: found %d seed relays from local database", len(seedRelays))

	// Rounds 1 to maxHops: Expand outward. Each round crawls the relays
	// discovered at the previous hop level.
	for hop := 1; hop <= ds.maxHops; hop++ {
		// Bail out promptly on shutdown.
		select {
		case <-ds.ctx.Done():
			return ds.ctx.Err()
		default:
		}

		// Get relays at previous hop level that haven't been processed.
		ds.mu.Lock()
		var relaysToProcess []string
		for url, hopLevel := range ds.discoveredRelays {
			if hopLevel == hop-1 && !ds.processedRelays[url] {
				relaysToProcess = append(relaysToProcess, url)
			}
		}
		ds.mu.Unlock()

		if len(relaysToProcess) == 0 {
			// Frontier is empty: no further expansion is possible.
			log.D.F("directory spider: no relays to process at hop %d", hop)
			break
		}

		log.D.F("directory spider: hop %d - processing %d relays", hop, len(relaysToProcess))

		newRelaysThisHop := 0

		// Process each relay serially (deliberate: keeps load and
		// connection count low).
		for _, relayURL := range relaysToProcess {
			select {
			case <-ds.ctx.Done():
				return ds.ctx.Err()
			default:
			}

			// Fetch kind 10002 events from this relay.
			events, err := ds.fetchRelayListsFromRelay(relayURL)
			if err != nil {
				log.W.F("directory spider: failed to fetch from %s: %v", relayURL, err)
				// Mark as processed even on failure to avoid retrying.
				ds.mu.Lock()
				ds.processedRelays[relayURL] = true
				ds.mu.Unlock()
				continue
			}

			// Extract new relay URLs.
			newRelays := ds.extractRelaysFromEvents(events)

			// First pass under the lock: collect URLs not yet known.
			// Self-relay filtering happens outside the lock afterwards
			// (isSelfRelay takes ds.mu internally — calling it here
			// would deadlock).
			ds.mu.Lock()
			var unknownRelays []string
			for _, newURL := range newRelays {
				if _, exists := ds.discoveredRelays[newURL]; !exists {
					unknownRelays = append(unknownRelays, newURL)
				}
			}
			ds.processedRelays[relayURL] = true
			ds.mu.Unlock()

			// Second pass, unlocked: drop URLs that point back at us.
			// May hit the network (NIP-11 fetch) so must not hold mu.
			var nonSelfNew []string
			for _, newURL := range unknownRelays {
				if !ds.isSelfRelay(newURL) {
					nonSelfNew = append(nonSelfNew, newURL)
				}
			}

			// Third pass under the lock: record survivors at this hop.
			// The existence re-check guards against another URL having
			// been added while the lock was released above.
			ds.mu.Lock()
			for _, newURL := range nonSelfNew {
				if _, exists := ds.discoveredRelays[newURL]; !exists {
					ds.discoveredRelays[newURL] = hop
					newRelaysThisHop++
				}
			}
			ds.mu.Unlock()

			// Rate limiting delay between relays.
			time.Sleep(DirectorySpiderRelayDelay)
		}

		log.D.F("directory spider: hop %d - discovered %d new relays", hop, newRelaysThisHop)
	}

	return nil
}
360
361 // getRelaysFromLocalDB queries the local database for kind 10002 events from seed pubkeys.
362 func (ds *DirectorySpider) getRelaysFromLocalDB(seedPubkeys [][]byte) ([]string, error) {
363 ctx, cancel := context.WithTimeout(ds.ctx, 30*time.Second)
364 defer cancel()
365
366 // Query for kind 10002 from seed pubkeys
367 f := &filter.F{
368 Authors: tag.NewFromBytesSlice(seedPubkeys...),
369 Kinds: kind.NewS(kind.New(kind.RelayListMetadata.K)),
370 }
371
372 events, err := ds.db.QueryEvents(ctx, f)
373 if err != nil {
374 return nil, err
375 }
376
377 return ds.extractRelaysFromEvents(events), nil
378 }
379
380 // fetchRelayListsFromRelay connects to a relay and fetches all kind 10002 events.
381 func (ds *DirectorySpider) fetchRelayListsFromRelay(relayURL string) ([]*event.E, error) {
382 ctx, cancel := context.WithTimeout(ds.ctx, DirectorySpiderRelayTimeout)
383 defer cancel()
384
385 log.D.F("directory spider: connecting to %s", relayURL)
386
387 client, err := ws.RelayConnect(ctx, relayURL)
388 if err != nil {
389 return nil, errorf.W("failed to connect: %v", err)
390 }
391 defer client.Close()
392
393 // Query for all kind 10002 events
394 limit := uint(DirectorySpiderMaxEventsPerQuery)
395 f := filter.NewS(&filter.F{
396 Kinds: kind.NewS(kind.New(kind.RelayListMetadata.K)),
397 Limit: &limit,
398 })
399
400 sub, err := client.Subscribe(ctx, f)
401 if err != nil {
402 return nil, errorf.W("failed to subscribe: %v", err)
403 }
404 defer sub.Unsub()
405
406 var events []*event.E
407 queryCtx, queryCancel := context.WithTimeout(ctx, DirectorySpiderQueryTimeout)
408 defer queryCancel()
409
410 // Collect events until EOSE or timeout
411 for {
412 select {
413 case <-queryCtx.Done():
414 log.D.F("directory spider: query timeout for %s, got %d events", relayURL, len(events))
415 return events, nil
416 case <-sub.EndOfStoredEvents:
417 log.D.F("directory spider: EOSE from %s, got %d events", relayURL, len(events))
418 return events, nil
419 case ev := <-sub.Events:
420 if ev == nil {
421 return events, nil
422 }
423 events = append(events, ev)
424 }
425 }
426 }
427
428 // extractRelaysFromEvents parses kind 10002 events and extracts relay URLs from "r" tags.
429 func (ds *DirectorySpider) extractRelaysFromEvents(events []*event.E) []string {
430 seen := make(map[string]bool)
431 var relays []string
432
433 for _, ev := range events {
434 // Get all "r" tags
435 rTags := ev.Tags.GetAll([]byte("r"))
436 for _, rTag := range rTags {
437 if len(rTag.T) < 2 {
438 continue
439 }
440 urlBytes := rTag.Value()
441 if len(urlBytes) == 0 {
442 continue
443 }
444
445 // Normalize the URL
446 normalized := string(normalize.URL(string(urlBytes)))
447 if normalized == "" {
448 continue
449 }
450
451 if !seen[normalized] {
452 seen[normalized] = true
453 relays = append(relays, normalized)
454 }
455 }
456 }
457
458 return relays
459 }
460
// fetchMetadataFromRelays iterates through all discovered relays and
// fetches metadata events (kinds 0, 3, 10000, 10002) from each, saving
// them to the local database. Relays already marked in processedRelays
// are skipped. Processing is serial with a rate-limiting delay between
// relays. Returns an error only on context cancellation; per-relay and
// per-kind failures are logged and skipped.
func (ds *DirectorySpider) fetchMetadataFromRelays() error {
	ds.mu.Lock()
	// Copy relay list to avoid holding lock during network operations.
	var relays []string
	for url := range ds.discoveredRelays {
		relays = append(relays, url)
	}
	ds.mu.Unlock()

	log.D.F("directory spider: fetching metadata from %d relays", len(relays))

	// Kinds to fetch: 0 (profile), 3 (follow list), 10000 (mute list), 10002 (relay list).
	kindsToFetch := []uint16{
		kind.ProfileMetadata.K,   // 0
		kind.FollowList.K,        // 3
		kind.MuteList.K,          // 10000
		kind.RelayListMetadata.K, // 10002
	}

	totalSaved := 0
	totalDuplicates := 0

	for _, relayURL := range relays {
		// Bail out promptly on shutdown.
		select {
		case <-ds.ctx.Done():
			return ds.ctx.Err()
		default:
		}

		// Skip relays already marked as processed.
		ds.mu.Lock()
		alreadyProcessed := ds.processedRelays[relayURL]
		ds.mu.Unlock()

		if alreadyProcessed {
			continue
		}

		log.D.F("directory spider: fetching metadata from %s", relayURL)

		// One query per kind; a failure for one kind does not abort the
		// remaining kinds for this relay.
		for _, k := range kindsToFetch {
			select {
			case <-ds.ctx.Done():
				return ds.ctx.Err()
			default:
			}

			events, err := ds.fetchKindFromRelay(relayURL, k)
			if err != nil {
				log.W.F("directory spider: failed to fetch kind %d from %s: %v", k, relayURL, err)
				continue
			}

			saved, duplicates := ds.storeEvents(events)
			totalSaved += saved
			totalDuplicates += duplicates

			log.D.F("directory spider: kind %d from %s: %d saved, %d duplicates",
				k, relayURL, saved, duplicates)
		}

		// Mark processed even if some kinds failed, so this run does not
		// revisit the relay.
		ds.mu.Lock()
		ds.processedRelays[relayURL] = true
		ds.mu.Unlock()

		// Rate limiting delay between relays.
		time.Sleep(DirectorySpiderRelayDelay)
	}

	log.D.F("directory spider: metadata fetch complete - %d events saved, %d duplicates",
		totalSaved, totalDuplicates)

	return nil
}
535
536 // fetchKindFromRelay connects to a relay and fetches events of a specific kind.
537 func (ds *DirectorySpider) fetchKindFromRelay(relayURL string, k uint16) ([]*event.E, error) {
538 ctx, cancel := context.WithTimeout(ds.ctx, DirectorySpiderRelayTimeout)
539 defer cancel()
540
541 client, err := ws.RelayConnect(ctx, relayURL)
542 if err != nil {
543 return nil, errorf.W("failed to connect: %v", err)
544 }
545 defer client.Close()
546
547 // Query for events of this kind
548 limit := uint(DirectorySpiderMaxEventsPerQuery)
549 f := filter.NewS(&filter.F{
550 Kinds: kind.NewS(kind.New(k)),
551 Limit: &limit,
552 })
553
554 sub, err := client.Subscribe(ctx, f)
555 if err != nil {
556 return nil, errorf.W("failed to subscribe: %v", err)
557 }
558 defer sub.Unsub()
559
560 var events []*event.E
561 queryCtx, queryCancel := context.WithTimeout(ctx, DirectorySpiderQueryTimeout)
562 defer queryCancel()
563
564 for {
565 select {
566 case <-queryCtx.Done():
567 return events, nil
568 case <-sub.EndOfStoredEvents:
569 return events, nil
570 case ev := <-sub.Events:
571 if ev == nil {
572 return events, nil
573 }
574 events = append(events, ev)
575 }
576 }
577 }
578
579 // storeEvents saves events to the database and publishes new ones.
580 func (ds *DirectorySpider) storeEvents(events []*event.E) (saved, duplicates int) {
581 for _, ev := range events {
582 _, err := ds.db.SaveEvent(ds.ctx, ev)
583 if err != nil {
584 if chk.T(err) {
585 // Most errors are duplicates, which is expected
586 duplicates++
587 }
588 continue
589 }
590 saved++
591
592 // Publish event to active subscribers
593 if ds.pub != nil {
594 go ds.pub.Deliver(ev)
595 }
596 }
597 return
598 }
599
600 // isSelfRelay checks if a relay URL is ourselves by comparing NIP-11 pubkeys.
601 func (ds *DirectorySpider) isSelfRelay(relayURL string) bool {
602 // If we don't have a relay identity pubkey, can't compare
603 if ds.relayIdentityPubkey == "" {
604 return false
605 }
606
607 ds.mu.Lock()
608 // Fast path: check if we already know this URL is ours
609 if ds.selfURLs[relayURL] {
610 ds.mu.Unlock()
611 return true
612 }
613 ds.mu.Unlock()
614
615 // Slow path: check via NIP-11 pubkey
616 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
617 defer cancel()
618
619 peerPubkey, err := ds.nip11Cache.GetPubkey(ctx, relayURL)
620 if err != nil {
621 // Can't determine, assume not self
622 return false
623 }
624
625 if peerPubkey == ds.relayIdentityPubkey {
626 log.D.F("directory spider: discovered self-relay: %s", relayURL)
627 ds.mu.Lock()
628 ds.selfURLs[relayURL] = true
629 ds.mu.Unlock()
630 return true
631 }
632
633 return false
634 }
635