// crawler.go
// Package crawler provides an automated corpus crawler that discovers relays
// via kind 10002 hop-expansion and then syncs all events from each discovered
// relay using NIP-77 negentropy set reconciliation.
//
// Architecture:
//
// Discovery loop expands relay URLs from seed pubkeys via kind 10002 →
// Crawler maintains a persistent frontier of relay entries with per-relay
// state → Sync loop picks relays due for sync and runs negentropy
// reconciliation with bounded concurrency → Frontier state is persisted
// to database markers so it survives restarts.
12 package crawler
13
14 import (
15 "context"
16 "encoding/json"
17 "fmt"
18 gosync "sync"
19 "sync/atomic"
20 "time"
21
22 "next.orly.dev/pkg/database"
23 "next.orly.dev/pkg/interfaces/publisher"
24 "next.orly.dev/pkg/lol/log"
25 "next.orly.dev/pkg/nostr/crypto/keys"
26 "next.orly.dev/pkg/nostr/encoders/filter"
27 "next.orly.dev/pkg/nostr/encoders/kind"
28 "next.orly.dev/pkg/nostr/encoders/tag"
29 "next.orly.dev/pkg/nostr/utils/normalize"
30 "next.orly.dev/pkg/nostr/ws"
31 dsync "next.orly.dev/pkg/sync"
32 "next.orly.dev/pkg/sync/negentropy"
33 )
34
const (
	// DefaultDiscoveryInterval is how often the hop-expansion discovery cycle runs.
	DefaultDiscoveryInterval = 4 * time.Hour
	// DefaultSyncInterval is the sync loop's wake period and the minimum
	// time between syncs of the same relay.
	DefaultSyncInterval = 30 * time.Minute
	// DefaultMaxHops bounds kind 10002 hop expansion from the seed pubkeys.
	DefaultMaxHops = 5
	// DefaultConcurrency is the number of relays synced in parallel.
	DefaultConcurrency = 3
	// DefaultRelayTimeout bounds a single relay connection attempt.
	DefaultRelayTimeout = 30 * time.Second
	// QueryTimeout bounds one kind 10002 subscription query to a relay.
	QueryTimeout = 60 * time.Second
	// DefaultSyncTimeout bounds one full negentropy sync with one relay.
	DefaultSyncTimeout = 10 * time.Minute
	// DefaultRelayDelay is the politeness pause between relay queries
	// during discovery.
	DefaultRelayDelay = 500 * time.Millisecond
	// DefaultMaxFailures is the consecutive-failure count after which a
	// relay is blacklisted.
	DefaultMaxFailures = 5
	// DefaultBlacklistDuration is how long a blacklisted relay is skipped.
	DefaultBlacklistDuration = 24 * time.Hour
	// DefaultMaxEventsPerSync caps events accepted in one negentropy sync.
	DefaultMaxEventsPerSync = 1_000_000
	// MaxEventsPerQuery caps the limit on a single relay-list subscription.
	MaxEventsPerQuery = 5000

	// Database marker keys under which frontier and stats are persisted.
	markerFrontierKey = "crawler:frontier"
	markerStatsKey    = "crawler:stats"
)
52
// Config holds configuration for the corpus crawler.
type Config struct {
	// DiscoveryInterval is how often relay discovery runs.
	DiscoveryInterval time.Duration
	// MaxHops bounds kind 10002 hop expansion from the seed pubkeys.
	MaxHops int

	// SyncInterval is both the sync loop's wake period and the minimum
	// time between syncs of the same relay (see RelayState.needsSync).
	SyncInterval time.Duration
	// Concurrency is the number of relays synced in parallel.
	Concurrency int
	// SyncTimeout bounds one negentropy sync with one relay.
	SyncTimeout time.Duration
	// MaxEventsPerSync caps events accepted in one sync.
	MaxEventsPerSync uint

	// RelayDelay is the politeness pause between relay queries during discovery.
	RelayDelay time.Duration

	// MaxFailures is the consecutive-failure count that triggers
	// blacklisting; BlacklistDuration is how long the blacklist lasts.
	MaxFailures int
	BlacklistDuration time.Duration
}
68
69 // DefaultConfig returns a Config with sensible defaults.
70 func DefaultConfig() *Config {
71 return &Config{
72 DiscoveryInterval: DefaultDiscoveryInterval,
73 MaxHops: DefaultMaxHops,
74 SyncInterval: DefaultSyncInterval,
75 Concurrency: DefaultConcurrency,
76 SyncTimeout: DefaultSyncTimeout,
77 MaxEventsPerSync: DefaultMaxEventsPerSync,
78 RelayDelay: DefaultRelayDelay,
79 MaxFailures: DefaultMaxFailures,
80 BlacklistDuration: DefaultBlacklistDuration,
81 }
82 }
83
// RelayState tracks the crawl state of a single relay. It is persisted as
// JSON inside the frontier marker, so the field tags must stay stable.
type RelayState struct {
	URL              string    `json:"url"`          // normalized websocket URL (frontier map key)
	HopDistance      int       `json:"hop_distance"` // shortest hop count seen from a seed pubkey
	FirstSeen        time.Time `json:"first_seen"`
	LastSync         time.Time `json:"last_sync"`
	LastDiscovery    time.Time `json:"last_discovery"`
	EventsSynced     int64     `json:"events_synced"`   // cumulative events received from this relay
	TotalSyncs       int64     `json:"total_syncs"`     // total sync attempts, successful or not
	ConsecFailures   int       `json:"consec_failures"` // reset to 0 on any successful sync
	LastError        string    `json:"last_error,omitempty"`
	BlacklistedUntil time.Time `json:"blacklisted_until,omitempty"` // zero value means not blacklisted
	IsSelf           bool      `json:"is_self,omitempty"`           // true when this URL is our own relay
}
98
99 func (rs *RelayState) isBlacklisted() bool {
100 return !rs.BlacklistedUntil.IsZero() && time.Now().Before(rs.BlacklistedUntil)
101 }
102
103 func (rs *RelayState) needsSync(interval time.Duration) bool {
104 if rs.IsSelf || rs.isBlacklisted() {
105 return false
106 }
107 return rs.LastSync.IsZero() || time.Since(rs.LastSync) >= interval
108 }
109
// Stats tracks aggregate crawler statistics. Persisted as JSON under the
// stats marker so counters survive restarts.
type Stats struct {
	TotalRelaysDiscovered int64     `json:"total_relays_discovered"` // set to the frontier size after each discovery run
	TotalRelaysSynced     int64     `json:"total_relays_synced"`     // cumulative successful relay syncs
	TotalEventsSynced     int64     `json:"total_events_synced"`
	TotalSyncErrors       int64     `json:"total_sync_errors"`
	LastDiscoveryRun      time.Time `json:"last_discovery_run"`
	LastSyncRun           time.Time `json:"last_sync_run"`
	BlacklistedRelays     int64     `json:"blacklisted_relays"` // cumulative blacklist events; never decremented
}
120
// Crawler orchestrates relay discovery and corpus sync.
type Crawler struct {
	// ctx bounds all network and database work; cancel is invoked by Stop().
	ctx    context.Context
	cancel context.CancelFunc

	db     database.Database
	pub    publisher.I // optional; fetched events are delivered to it when non-nil
	config *Config

	// mu protects frontier, stats, and selfURLs.
	mu       gosync.RWMutex
	frontier map[string]*RelayState // normalized relay URL -> per-relay state
	stats    Stats
	selfURLs map[string]bool // URLs confirmed via NIP-11 to be this relay itself

	relayIdentityPubkey string // our own pubkey; empty disables self-relay detection
	nip11Cache          *dsync.NIP11Cache

	// seedMu protects getSeedPubkeys independently from frontier lock
	// to avoid holding mu during the callback (which may do its own locking).
	seedMu         gosync.RWMutex
	getSeedPubkeys func() [][]byte

	// Lifecycle: running gates Start/Stop, stopChan signals the loops to
	// exit (closed once via stopOnce), and wg waits for both loops.
	running  atomic.Bool
	stopOnce gosync.Once
	stopChan chan struct{}
	wg       gosync.WaitGroup
}
149
150 // New creates a new Crawler instance.
151 func New(ctx context.Context, db database.Database, pub publisher.I, cfg *Config) (*Crawler, error) {
152 if db == nil {
153 return nil, fmt.Errorf("database cannot be nil")
154 }
155 if cfg == nil {
156 cfg = DefaultConfig()
157 }
158
159 ctx, cancel := context.WithCancel(ctx)
160
161 var relayPubkey string
162 if skb, err := db.GetRelayIdentitySecret(); err == nil && len(skb) == 32 {
163 pk, _ := keys.SecretBytesToPubKeyHex(skb)
164 relayPubkey = pk
165 }
166
167 c := &Crawler{
168 ctx: ctx,
169 cancel: cancel,
170 db: db,
171 pub: pub,
172 config: cfg,
173 frontier: make(map[string]*RelayState),
174 selfURLs: make(map[string]bool),
175 nip11Cache: dsync.NewNIP11Cache(30 * time.Minute),
176 relayIdentityPubkey: relayPubkey,
177 stopChan: make(chan struct{}),
178 }
179
180 if err := c.loadFrontier(); err != nil {
181 log.W.F("crawler: failed to load frontier: %v (starting fresh)", err)
182 }
183
184 return c, nil
185 }
186
187 // SetSeedCallback sets the callback for getting seed pubkeys used in discovery.
188 func (c *Crawler) SetSeedCallback(fn func() [][]byte) {
189 c.seedMu.Lock()
190 defer c.seedMu.Unlock()
191 c.getSeedPubkeys = fn
192 }
193
194 func (c *Crawler) callSeedCallback() [][]byte {
195 c.seedMu.RLock()
196 fn := c.getSeedPubkeys
197 c.seedMu.RUnlock()
198 if fn == nil {
199 return nil
200 }
201 return fn()
202 }
203
204 // Start begins the crawler's discovery and sync loops.
205 func (c *Crawler) Start() error {
206 if c.running.Load() {
207 return fmt.Errorf("crawler already running")
208 }
209
210 c.seedMu.RLock()
211 hasSeed := c.getSeedPubkeys != nil
212 c.seedMu.RUnlock()
213 if !hasSeed {
214 return fmt.Errorf("seed callback must be set before starting")
215 }
216
217 c.running.Store(true)
218
219 c.wg.Add(2)
220 go c.discoveryLoop()
221 go c.syncLoop()
222
223 log.I.F("crawler: started (discovery: %v, sync: %v, hops: %d, concurrency: %d)",
224 c.config.DiscoveryInterval, c.config.SyncInterval,
225 c.config.MaxHops, c.config.Concurrency)
226
227 return nil
228 }
229
230 // Stop stops the crawler gracefully.
231 func (c *Crawler) Stop() {
232 if !c.running.Load() {
233 return
234 }
235 c.running.Store(false)
236 c.stopOnce.Do(func() { close(c.stopChan) })
237 c.cancel()
238 c.wg.Wait()
239
240 if err := c.saveFrontier(); err != nil {
241 log.W.F("crawler: failed to save frontier on stop: %v", err)
242 }
243
244 c.mu.RLock()
245 frontierSize := len(c.frontier)
246 totalEvents := c.stats.TotalEventsSynced
247 c.mu.RUnlock()
248
249 log.I.F("crawler: stopped (frontier: %d relays, total events synced: %d)",
250 frontierSize, totalEvents)
251 }
252
253 // GetStats returns a snapshot of crawler statistics.
254 func (c *Crawler) GetStats() Stats {
255 c.mu.RLock()
256 defer c.mu.RUnlock()
257 return c.stats
258 }
259
260 // GetFrontierSize returns the number of relays in the frontier.
261 func (c *Crawler) GetFrontierSize() int {
262 c.mu.RLock()
263 defer c.mu.RUnlock()
264 return len(c.frontier)
265 }
266
267 func (c *Crawler) discoveryLoop() {
268 defer c.wg.Done()
269
270 c.runDiscovery()
271
272 ticker := time.NewTicker(c.config.DiscoveryInterval)
273 defer ticker.Stop()
274
275 for {
276 select {
277 case <-c.stopChan:
278 return
279 case <-ticker.C:
280 c.runDiscovery()
281 }
282 }
283 }
284
285 func (c *Crawler) syncLoop() {
286 defer c.wg.Done()
287
288 select {
289 case <-c.stopChan:
290 return
291 case <-time.After(30 * time.Second):
292 }
293
294 c.runSyncCycle()
295
296 ticker := time.NewTicker(c.config.SyncInterval)
297 defer ticker.Stop()
298
299 for {
300 select {
301 case <-c.stopChan:
302 return
303 case <-ticker.C:
304 c.runSyncCycle()
305 }
306 }
307 }
308
// runDiscovery performs one relay discovery cycle using kind 10002 hop
// expansion: hop 0 comes from relay lists already stored locally for the
// seed pubkeys; each subsequent hop queries the previous hop's relays for
// more relay-list events. Results are merged into the frontier under mu
// and the frontier is persisted at the end.
func (c *Crawler) runDiscovery() {
	log.I.F("crawler: starting relay discovery (max hops: %d)", c.config.MaxHops)

	seeds := c.callSeedCallback()
	if len(seeds) == 0 {
		log.W.F("crawler: no seed pubkeys, skipping discovery")
		return
	}

	// discovered maps relay URL -> the hop at which it was first seen.
	discovered := make(map[string]int) // URL -> hop distance

	// Hop 0: relay lists from the local database (already normalized URLs).
	localRelays := c.getRelaysFromLocalDB(seeds)
	for url := range localRelays {
		discovered[url] = 0
	}

	log.I.F("crawler: hop 0 discovered %d relays from %d seed pubkeys", len(discovered), len(seeds))

	for hop := 1; hop <= c.config.MaxHops; hop++ {
		// Abort promptly between hops if the crawler is stopping.
		select {
		case <-c.stopChan:
			return
		default:
		}

		// The relays first seen at the previous hop are this hop's sources.
		var prevHopRelays []string
		for url, h := range discovered {
			if h == hop-1 {
				prevHopRelays = append(prevHopRelays, url)
			}
		}

		if len(prevHopRelays) == 0 {
			log.I.F("crawler: no relays at hop %d, stopping expansion", hop-1)
			break
		}

		newCount := 0
		for _, relayURL := range prevHopRelays {
			select {
			case <-c.stopChan:
				return
			default:
			}

			// isSelfRelay does network IO (NIP-11) — must NOT hold mu.
			if c.isSelfRelay(relayURL) {
				continue
			}

			relays, err := c.fetchRelayListsFromRelay(relayURL)
			if err != nil {
				log.D.F("crawler: hop %d fetch from %s failed: %v", hop, relayURL, err)
				continue
			}

			// Record only URLs not seen at any earlier hop, so each URL
			// keeps its shortest hop distance.
			for _, newURL := range relays {
				if _, exists := discovered[newURL]; !exists {
					discovered[newURL] = hop
					newCount++
				}
			}

			// Politeness delay between relay queries.
			time.Sleep(c.config.RelayDelay)
		}

		log.I.F("crawler: hop %d discovered %d new relays from %d sources",
			hop, newCount, len(prevHopRelays))
	}

	// Filter self-relays before taking the lock (isSelfRelay does network IO).
	filtered := make(map[string]int, len(discovered))
	for url, hopDist := range discovered {
		normURL := string(normalize.URL(url))
		if normURL == "" {
			continue
		}
		if c.isSelfRelay(normURL) {
			continue
		}
		filtered[normURL] = hopDist
	}

	// Merge into frontier under lock.
	c.mu.Lock()
	added := 0
	for normURL, hopDist := range filtered {
		if existing, ok := c.frontier[normURL]; ok {
			// Known relay: refresh discovery time and keep the shortest
			// hop distance observed so far.
			existing.LastDiscovery = time.Now()
			if hopDist < existing.HopDistance {
				existing.HopDistance = hopDist
			}
		} else {
			c.frontier[normURL] = &RelayState{
				URL:           normURL,
				HopDistance:   hopDist,
				FirstSeen:     time.Now(),
				LastDiscovery: time.Now(),
			}
			added++
		}
	}
	c.stats.TotalRelaysDiscovered = int64(len(c.frontier))
	c.stats.LastDiscoveryRun = time.Now()
	frontierSize := len(c.frontier)
	c.mu.Unlock()

	log.I.F("crawler: discovery complete — %d new relays added, frontier size: %d",
		added, frontierSize)

	// Persist so discovery progress survives a restart.
	if err := c.saveFrontier(); err != nil {
		log.W.F("crawler: failed to save frontier: %v", err)
	}
}
424
425 // runSyncCycle syncs events from relays that are due for a sync.
426 func (c *Crawler) runSyncCycle() {
427 // Snapshot relay URLs and hop distances under lock.
428 type syncTarget struct {
429 url string
430 hopDistance int
431 totalSyncs int64
432 }
433 c.mu.RLock()
434 var due []syncTarget
435 for _, rs := range c.frontier {
436 if rs.needsSync(c.config.SyncInterval) {
437 due = append(due, syncTarget{
438 url: rs.URL,
439 hopDistance: rs.HopDistance,
440 totalSyncs: rs.TotalSyncs,
441 })
442 }
443 }
444 c.mu.RUnlock()
445
446 if len(due) == 0 {
447 log.D.F("crawler: no relays due for sync")
448 return
449 }
450
451 log.I.F("crawler: starting sync cycle — %d relays due", len(due))
452
453 sem := make(chan struct{}, c.config.Concurrency)
454 var syncWg gosync.WaitGroup
455
456 for _, target := range due {
457 select {
458 case <-c.stopChan:
459 syncWg.Wait()
460 return
461 default:
462 }
463
464 sem <- struct{}{}
465 syncWg.Add(1)
466
467 go func(t syncTarget) {
468 defer syncWg.Done()
469 defer func() { <-sem }()
470 c.syncRelay(t.url, t.hopDistance, t.totalSyncs)
471 }(target)
472 }
473
474 syncWg.Wait()
475
476 c.mu.Lock()
477 c.stats.LastSyncRun = time.Now()
478 c.mu.Unlock()
479
480 if err := c.saveFrontier(); err != nil {
481 log.W.F("crawler: failed to save frontier: %v", err)
482 }
483
484 log.I.F("crawler: sync cycle complete")
485 }
486
// syncRelay performs a negentropy sync with a single relay. The url, hopDistance,
// and totalSyncs are snapshot values read under the lock by the caller; the
// relay state in the frontier is updated under the lock after sync completes.
func (c *Crawler) syncRelay(url string, hopDistance int, totalSyncs int64) {
	// Bound the entire sync; also cancelled when c.ctx is (i.e. on Stop).
	ctx, cancel := context.WithTimeout(c.ctx, c.config.SyncTimeout)
	defer cancel()

	log.I.F("crawler: syncing %s (hop %d, syncs: %d)", url, hopDistance, totalSyncs)

	// One-shot manager configured for this single peer. NOTE(review):
	// SyncInterval here is the manager's own periodic interval, presumably
	// irrelevant for a single TriggerSync call — confirm against the
	// negentropy package.
	negCfg := &negentropy.Config{
		Peers:        []string{url},
		SyncInterval: 60 * time.Second,
		FrameSize:    128 * 1024,
		IDSize:       16,
		MaxEvents:    c.config.MaxEventsPerSync,
	}

	negMgr := negentropy.NewManager(c.db, negCfg)
	negMgr.TriggerSync(ctx, url)
	peerState, ok := negMgr.GetPeerState(url)

	// Fold the outcome back into the frontier entry under the lock.
	c.mu.Lock()
	rs, exists := c.frontier[url]
	if !exists {
		// Relay was removed from the frontier while the sync was running.
		c.mu.Unlock()
		return
	}

	rs.LastSync = time.Now()
	rs.TotalSyncs++

	if !ok || peerState.Status == "error" {
		// Failure: record the error and blacklist after too many in a row.
		rs.ConsecFailures++
		if ok && peerState.LastError != "" {
			rs.LastError = peerState.LastError
		} else {
			rs.LastError = "sync failed"
		}

		if rs.ConsecFailures >= c.config.MaxFailures {
			rs.BlacklistedUntil = time.Now().Add(c.config.BlacklistDuration)
			c.stats.BlacklistedRelays++
			log.W.F("crawler: blacklisted %s after %d failures (until %v)",
				rs.URL, rs.ConsecFailures, rs.BlacklistedUntil)
		}

		c.stats.TotalSyncErrors++
		log.D.F("crawler: sync %s failed (%d consecutive): %s",
			rs.URL, rs.ConsecFailures, rs.LastError)
	} else {
		// Success: clear failure state and accumulate event counters.
		eventsSynced := peerState.EventsSynced
		rs.ConsecFailures = 0
		rs.LastError = ""
		rs.EventsSynced += eventsSynced
		c.stats.TotalEventsSynced += eventsSynced
		c.stats.TotalRelaysSynced++

		if eventsSynced > 0 {
			log.I.F("crawler: synced %d events from %s", eventsSynced, rs.URL)
		}
	}
	c.mu.Unlock()
}
550
551 func (c *Crawler) getRelaysFromLocalDB(seeds [][]byte) map[string]bool {
552 relays := make(map[string]bool)
553
554 f := &filter.F{
555 Authors: tag.NewFromBytesSlice(seeds...),
556 Kinds: kind.NewS(kind.New(kind.RelayListMetadata.K)),
557 }
558
559 events, err := c.db.QueryEvents(c.ctx, f)
560 if err != nil {
561 log.W.F("crawler: failed to query local relay lists: %v", err)
562 return relays
563 }
564
565 for _, ev := range events {
566 rTags := ev.Tags.GetAll([]byte("r"))
567 for _, rTag := range rTags {
568 if rTag.Len() < 2 {
569 continue
570 }
571 urlBytes := rTag.Value()
572 if len(urlBytes) == 0 {
573 continue
574 }
575 normURL := string(normalize.URL(string(urlBytes)))
576 if normURL != "" {
577 relays[normURL] = true
578 }
579 }
580 }
581
582 return relays
583 }
584
// fetchRelayListsFromRelay connects to relayURL, subscribes to kind 10002
// (relay list metadata) events, persists and re-delivers each event locally,
// and returns the deduplicated, normalized relay URLs found in their "r"
// tags. Timeouts are not errors: whatever was collected so far is returned.
func (c *Crawler) fetchRelayListsFromRelay(relayURL string) ([]string, error) {
	ctx, cancel := context.WithTimeout(c.ctx, DefaultRelayTimeout)
	defer cancel()

	client, err := ws.RelayConnect(ctx, relayURL)
	if err != nil {
		return nil, fmt.Errorf("connect failed: %w", err)
	}
	defer client.Close()

	limit := uint(MaxEventsPerQuery)
	ff := filter.NewS(&filter.F{
		Kinds: kind.NewS(kind.New(kind.RelayListMetadata.K)),
		Limit: &limit,
	})

	sub, err := client.Subscribe(ctx, ff)
	if err != nil {
		return nil, fmt.Errorf("subscribe failed: %w", err)
	}
	defer sub.Unsub()

	var relays []string
	seen := make(map[string]bool) // dedupe normalized URLs

	// NOTE(review): queryCtx derives from ctx, whose DefaultRelayTimeout
	// (30s) is shorter than QueryTimeout (60s) — the effective query
	// deadline is 30s. Confirm whether queryCtx should derive from c.ctx
	// instead.
	queryCtx, queryCancel := context.WithTimeout(ctx, QueryTimeout)
	defer queryCancel()

	for {
		select {
		case <-queryCtx.Done():
			// Deadline reached: return what we have, without error.
			return relays, nil
		case ev := <-sub.Events:
			if ev == nil {
				// Subscription channel closed; stop collecting.
				return relays, nil
			}

			// Opportunistically store and re-deliver the relay-list event.
			if _, err := c.db.SaveEvent(c.ctx, ev); err != nil {
				log.D.F("crawler: save event failed: %v", err)
			}
			if c.pub != nil {
				c.pub.Deliver(ev)
			}

			// Harvest relay URLs from "r" tags.
			rTags := ev.Tags.GetAll([]byte("r"))
			for _, rTag := range rTags {
				if rTag.Len() < 2 {
					continue
				}
				normURL := string(normalize.URL(string(rTag.Value())))
				if normURL != "" && !seen[normURL] {
					seen[normURL] = true
					relays = append(relays, normURL)
				}
			}
		case <-sub.EndOfStoredEvents:
			// EOSE: all stored events have been received.
			return relays, nil
		}
	}
}
645
646 // isSelfRelay checks if a relay URL belongs to this relay instance by comparing
647 // NIP-11 pubkeys. This does network IO and must NOT be called while holding mu.
648 func (c *Crawler) isSelfRelay(relayURL string) bool {
649 c.mu.RLock()
650 cached := c.selfURLs[relayURL]
651 c.mu.RUnlock()
652 if cached {
653 return true
654 }
655 if c.relayIdentityPubkey == "" {
656 return false
657 }
658
659 pubkey, err := c.nip11Cache.GetPubkey(c.ctx, relayURL)
660 if err != nil || pubkey == "" {
661 return false
662 }
663
664 if pubkey == c.relayIdentityPubkey {
665 c.mu.Lock()
666 c.selfURLs[relayURL] = true
667 c.mu.Unlock()
668 return true
669 }
670 return false
671 }
672
673 // loadFrontier loads persisted frontier state from database markers.
674 // Only called during New() before Start(), so no lock needed.
675 func (c *Crawler) loadFrontier() error {
676 data, err := c.db.GetMarker(markerFrontierKey)
677 if err != nil {
678 return err
679 }
680 if len(data) == 0 {
681 return nil
682 }
683
684 var frontier map[string]*RelayState
685 if err := json.Unmarshal(data, &frontier); err != nil {
686 return fmt.Errorf("unmarshal frontier: %w", err)
687 }
688
689 c.frontier = frontier
690 log.I.F("crawler: loaded frontier with %d relays", len(frontier))
691
692 statsData, err := c.db.GetMarker(markerStatsKey)
693 if err == nil && len(statsData) > 0 {
694 json.Unmarshal(statsData, &c.stats)
695 }
696
697 return nil
698 }
699
700 func (c *Crawler) saveFrontier() error {
701 c.mu.RLock()
702 data, err := json.Marshal(c.frontier)
703 if err != nil {
704 c.mu.RUnlock()
705 return fmt.Errorf("marshal frontier: %w", err)
706 }
707 statsData, err := json.Marshal(c.stats)
708 c.mu.RUnlock()
709 if err != nil {
710 return fmt.Errorf("marshal stats: %w", err)
711 }
712
713 if err := c.db.SetMarker(markerFrontierKey, data); err != nil {
714 return fmt.Errorf("save frontier: %w", err)
715 }
716 if err := c.db.SetMarker(markerStatsKey, statsData); err != nil {
717 return fmt.Errorf("save stats: %w", err)
718 }
719
720 return nil
721 }
722