spider.go raw
1 package spider
2
3 import (
4 "context"
5 "fmt"
6 "strings"
7 "sync"
8 "time"
9
10 "next.orly.dev/pkg/nostr/crypto/keys"
11 "next.orly.dev/pkg/nostr/encoders/filter"
12 "next.orly.dev/pkg/nostr/encoders/hex"
13 "next.orly.dev/pkg/nostr/encoders/tag"
14 "next.orly.dev/pkg/nostr/encoders/timestamp"
15 "next.orly.dev/pkg/nostr/ws"
16 "next.orly.dev/pkg/lol/chk"
17 "next.orly.dev/pkg/lol/errorf"
18 "next.orly.dev/pkg/lol/log"
19 "next.orly.dev/pkg/database"
20 "next.orly.dev/pkg/interfaces/publisher"
21 dsync "next.orly.dev/pkg/sync"
22 )
23
// Tuning parameters for batching, reconnection backoff, blackout, and
// rate-limit handling. Reconnect backoff doubles from ReconnectDelay up to
// MaxReconnectDelay, after which the relay is blacked out for BlackoutPeriod.
const (
	// BatchSize is the number of pubkeys per subscription batch
	BatchSize = 20
	// CatchupWindow is the extra time added to disconnection periods for catch-up
	CatchupWindow = 30 * time.Minute
	// ReconnectDelay is the initial delay between reconnection attempts
	ReconnectDelay = 10 * time.Second
	// MaxReconnectDelay is the maximum delay before switching to blackout
	MaxReconnectDelay = 1 * time.Hour
	// BlackoutPeriod is the duration to blacklist a relay after max backoff is reached
	BlackoutPeriod = 24 * time.Hour
	// BatchCreationDelay is the delay between creating each batch subscription
	BatchCreationDelay = 500 * time.Millisecond
	// RateLimitBackoffDuration is how long to wait when we get a rate limit error
	RateLimitBackoffDuration = 1 * time.Minute
	// RateLimitBackoffMultiplier is the factor by which we increase backoff on repeated rate limits
	RateLimitBackoffMultiplier = 2
	// MaxRateLimitBackoff is the maximum backoff duration for rate limiting
	MaxRateLimitBackoff = 30 * time.Minute
	// MainLoopInterval is how often the spider checks for updates
	MainLoopInterval = 5 * time.Minute
	// EventHandlerBufferSize is the buffer size for event channels
	// NOTE(review): this constant appears unused in this file — confirm a
	// caller elsewhere uses it, or remove.
	EventHandlerBufferSize = 100
)
48
// Spider manages connections to admin relays and syncs events for followed
// pubkeys. Lifecycle: New -> SetCallbacks -> Start -> Stop.
type Spider struct {
	ctx    context.Context    // spider-wide context; cancelled by Stop
	cancel context.CancelFunc // cancels ctx
	db     *database.D        // event store where synced events are saved
	pub    publisher.I        // optional; notified of newly saved events
	mode   string             // "follows" or "none" (validated by New)

	// Configuration
	// NOTE(review): adminRelays and followList appear unused in this file —
	// live values come from the callbacks below; confirm before removing.
	adminRelays         []string
	followList          [][]byte
	relayIdentityPubkey string          // Our relay's identity pubkey (hex)
	selfURLs            map[string]bool // URLs discovered to be ourselves (for fast lookups)

	// State management: mu guards connections, running, the callbacks, and
	// selfURLs.
	mu          sync.RWMutex
	connections map[string]*RelayConnection // live connections keyed by relay URL
	running     bool

	// Callbacks for getting updated data
	getAdminRelays func() []string
	getFollowList  func() [][]byte

	// Notification channel for follow list updates. Buffered with capacity 1
	// so bursts of notifications coalesce into a single pending refresh.
	followListUpdated chan struct{}
}
75
// RelayConnection manages a single relay connection and its subscriptions,
// including reconnect backoff, blackout, and rate-limit state. Its lifecycle
// is driven by the manage goroutine started in Spider.createConnection.
type RelayConnection struct {
	url    string
	client *ws.Client         // nil while disconnected
	ctx    context.Context    // per-connection context, child of the spider's
	cancel context.CancelFunc // cancels ctx; called by close
	spider *Spider            // back-reference for db/pub access

	// Subscription management: mu guards subscriptions and the rate-limit
	// fields below.
	mu            sync.RWMutex
	subscriptions map[string]*BatchSubscription // keyed by batch ID ("batch-N")

	// Disconnection tracking
	lastDisconnect      time.Time
	reconnectDelay      time.Duration // current exponential backoff; starts at ReconnectDelay
	connectionStartTime time.Time     // when the current connection was established

	// Blackout tracking for IP filters: no reconnect attempts until this
	// time (set after backoff reaches MaxReconnectDelay).
	blackoutUntil time.Time

	// Rate limiting tracking, set by handleRateLimit on matching NOTICEs.
	rateLimitBackoff time.Duration
	rateLimitUntil   time.Time
}
100
// BatchSubscription represents a live subscription covering one batch of at
// most BatchSize pubkeys (both as authors and as #p tag targets).
type BatchSubscription struct {
	id        string           // batch ID, "batch-N"
	pubkeys   [][]byte         // raw (binary) pubkeys covered by this batch
	startTime time.Time        // when the subscription was created
	sub       *ws.Subscription // the underlying relay subscription
	relay     *RelayConnection // owning connection

	// disconnectedAt is non-nil while a disconnection is pending catch-up;
	// set by handleDisconnection, consumed/cleared by updateSubscriptions.
	disconnectedAt *time.Time
}
112
// DisconnectionPeriod tracks when a subscription was disconnected.
// NOTE(review): this type appears unused within this file — confirm external
// usage or consider removing it.
type DisconnectionPeriod struct {
	Start time.Time
	End   time.Time
}
118
119 // New creates a new Spider instance
120 func New(ctx context.Context, db *database.D, pub publisher.I, mode string) (s *Spider, err error) {
121 if db == nil {
122 err = errorf.E("database cannot be nil")
123 return
124 }
125
126 // Validate mode
127 switch mode {
128 case "follows", "none":
129 // Valid modes
130 default:
131 err = errorf.E("invalid spider mode: %s (valid modes: none, follows)", mode)
132 return
133 }
134
135 ctx, cancel := context.WithCancel(ctx)
136
137 // Get relay identity pubkey for self-detection
138 var relayPubkey string
139 if skb, err := db.GetRelayIdentitySecret(); err == nil && len(skb) == 32 {
140 pk, _ := keys.SecretBytesToPubKeyHex(skb)
141 relayPubkey = pk
142 }
143
144 s = &Spider{
145 ctx: ctx,
146 cancel: cancel,
147 db: db,
148 pub: pub,
149 mode: mode,
150 relayIdentityPubkey: relayPubkey,
151 selfURLs: make(map[string]bool),
152 connections: make(map[string]*RelayConnection),
153 followListUpdated: make(chan struct{}, 1),
154 }
155
156 return
157 }
158
159 // SetCallbacks sets the callback functions for getting updated admin relays and follow lists
160 func (s *Spider) SetCallbacks(getAdminRelays func() []string, getFollowList func() [][]byte) {
161 s.mu.Lock()
162 defer s.mu.Unlock()
163 s.getAdminRelays = getAdminRelays
164 s.getFollowList = getFollowList
165 }
166
167 // NotifyFollowListUpdate signals the spider that the follow list has been updated
168 func (s *Spider) NotifyFollowListUpdate() {
169 if s.followListUpdated != nil {
170 select {
171 case s.followListUpdated <- struct{}{}:
172 log.D.F("spider: follow list update notification sent")
173 default:
174 // Channel full, update already pending
175 log.D.F("spider: follow list update notification already pending")
176 }
177 }
178 }
179
180 // Start begins the spider operation
181 func (s *Spider) Start() (err error) {
182 s.mu.Lock()
183 defer s.mu.Unlock()
184
185 if s.running {
186 err = errorf.E("spider already running")
187 return
188 }
189
190 // Handle 'none' mode - no-op
191 if s.mode == "none" {
192 log.I.F("spider: mode is 'none', not starting")
193 return
194 }
195
196 if s.getAdminRelays == nil || s.getFollowList == nil {
197 err = errorf.E("callbacks must be set before starting")
198 return
199 }
200
201 s.running = true
202
203 // Start the main loop
204 go s.mainLoop()
205
206 log.I.F("spider: started in '%s' mode", s.mode)
207 return
208 }
209
210 // Stop stops the spider operation
211 func (s *Spider) Stop() {
212 s.mu.Lock()
213 defer s.mu.Unlock()
214
215 if !s.running {
216 return
217 }
218
219 s.running = false
220 s.cancel()
221
222 // Close all connections
223 for _, conn := range s.connections {
224 conn.close()
225 }
226 s.connections = make(map[string]*RelayConnection)
227
228 log.I.F("spider: stopped")
229 }
230
231 // mainLoop is the main spider loop that manages connections and subscriptions
232 func (s *Spider) mainLoop() {
233 ticker := time.NewTicker(MainLoopInterval)
234 defer ticker.Stop()
235
236 log.D.F("spider: main loop started, checking every %v", MainLoopInterval)
237
238 for {
239 select {
240 case <-s.ctx.Done():
241 return
242 case <-s.followListUpdated:
243 log.D.F("spider: follow list updated, refreshing connections")
244 s.updateConnections()
245 case <-ticker.C:
246 log.D.F("spider: periodic check triggered")
247 s.updateConnections()
248 }
249 }
250 }
251
252 // updateConnections updates relay connections based on current admin relays and follow lists
253 func (s *Spider) updateConnections() {
254 s.mu.Lock()
255 if !s.running {
256 s.mu.Unlock()
257 return
258 }
259
260 // Get current admin relays and follow list
261 adminRelays := s.getAdminRelays()
262 followList := s.getFollowList()
263 s.mu.Unlock()
264
265 if len(adminRelays) == 0 || len(followList) == 0 {
266 log.D.F("spider: no admin relays (%d) or follow list (%d) available",
267 len(adminRelays), len(followList))
268 return
269 }
270
271 // Filter out self-relays WITHOUT holding mu — isSelfRelay takes mu internally
272 var filteredRelays []string
273 for _, url := range adminRelays {
274 if s.isSelfRelay(url) {
275 log.D.F("spider: skipping self-relay: %s", url)
276 continue
277 }
278 filteredRelays = append(filteredRelays, url)
279 }
280
281 // Re-acquire lock for the mutation phase
282 s.mu.Lock()
283 defer s.mu.Unlock()
284
285 currentRelays := make(map[string]bool)
286 for _, url := range filteredRelays {
287 currentRelays[url] = true
288
289 if conn, exists := s.connections[url]; exists {
290 // Update existing connection
291 conn.updateSubscriptions(followList)
292 } else {
293 // Create new connection
294 s.createConnection(url, followList)
295 }
296 }
297
298 // Remove connections for relays no longer in admin list
299 for url, conn := range s.connections {
300 if !currentRelays[url] {
301 log.D.F("spider: removing connection to %s (no longer in admin relays)", url)
302 conn.close()
303 delete(s.connections, url)
304 }
305 }
306 }
307
308 // createConnection creates a new relay connection
309 func (s *Spider) createConnection(url string, followList [][]byte) {
310 log.D.F("spider: creating connection to %s", url)
311
312 ctx, cancel := context.WithCancel(s.ctx)
313 conn := &RelayConnection{
314 url: url,
315 ctx: ctx,
316 cancel: cancel,
317 spider: s,
318 subscriptions: make(map[string]*BatchSubscription),
319 reconnectDelay: ReconnectDelay,
320 }
321
322 s.connections[url] = conn
323
324 // Start connection in goroutine
325 go conn.manage(followList)
326 }
327
// manage handles the lifecycle of a relay connection: connect (with
// exponential backoff and blackout), create subscriptions, wait for
// disconnection, record it for catch-up, and repeat until the connection
// context is cancelled.
func (rc *RelayConnection) manage(followList [][]byte) {
	for {
		// Check context first
		select {
		case <-rc.ctx.Done():
			log.D.F("spider: connection manager for %s stopping (context done)", rc.url)
			return
		default:
		}

		// Check if relay is blacked out (set by waitBeforeReconnect after
		// backoff reaches MaxReconnectDelay).
		if rc.isBlackedOut() {
			waitDuration := time.Until(rc.blackoutUntil)
			log.D.F("spider: %s is blacked out for %v more", rc.url, waitDuration)

			// Wait for blackout to expire or context cancellation
			select {
			case <-rc.ctx.Done():
				return
			case <-time.After(waitDuration):
				// Blackout expired, reset delay and try again
				rc.reconnectDelay = ReconnectDelay
				log.D.F("spider: blackout period ended for %s, retrying", rc.url)
			}
			continue
		}

		// Attempt to connect
		log.D.F("spider: attempting to connect to %s (backoff: %v)", rc.url, rc.reconnectDelay)
		if err := rc.connect(); chk.E(err) {
			log.W.F("spider: failed to connect to %s: %v", rc.url, err)
			rc.waitBeforeReconnect()
			continue
		}

		log.D.F("spider: connected to %s", rc.url)
		rc.connectionStartTime = time.Now()

		// Only reset reconnect delay on successful connection
		// (don't reset if we had a quick disconnect before)
		if rc.reconnectDelay > ReconnectDelay*8 {
			// Gradual recovery: reduce by half instead of full reset
			rc.reconnectDelay = rc.reconnectDelay / 2
			log.D.F("spider: reducing backoff for %s to %v", rc.url, rc.reconnectDelay)
		} else {
			rc.reconnectDelay = ReconnectDelay
		}
		rc.blackoutUntil = time.Time{} // Clear blackout on successful connection

		// Create subscriptions for follow list
		rc.createSubscriptions(followList)

		// Wait for disconnection: the client's context ends when the socket
		// drops or the parent context is cancelled.
		<-rc.client.Context().Done()

		log.W.F("spider: disconnected from %s: %v", rc.url, rc.client.ConnectionCause())

		// Check if disconnection happened very quickly (likely IP filter or ban)
		connectionDuration := time.Since(rc.connectionStartTime)
		const quickDisconnectThreshold = 2 * time.Minute
		if connectionDuration < quickDisconnectThreshold {
			log.W.F("spider: quick disconnection from %s after %v (likely connection issue/ban)", rc.url, connectionDuration)
			// Don't reset the delay, keep the backoff and increase it
			rc.waitBeforeReconnect()
		} else {
			// Normal disconnection after decent uptime - gentle backoff
			log.D.F("spider: normal disconnection from %s after %v uptime", rc.url, connectionDuration)
			// Small delay before reconnecting
			select {
			case <-rc.ctx.Done():
				return
			case <-time.After(5 * time.Second):
			}
		}

		// Mark subscriptions as disconnected so the next
		// updateSubscriptions run performs catch-up for the gap.
		rc.handleDisconnection()

		// Clean up.
		// NOTE(review): rc.client is written here without holding rc.mu while
		// other goroutines (e.g. updateSubscriptions) read it — looks racy;
		// confirm ownership or add synchronization.
		rc.client = nil
		rc.clearSubscriptions()
	}
}
411
412 // connect establishes a websocket connection to the relay
413 func (rc *RelayConnection) connect() (err error) {
414 connectCtx, cancel := context.WithTimeout(rc.ctx, 10*time.Second)
415 defer cancel()
416
417 // Create client with notice handler to detect rate limiting
418 rc.client, err = ws.RelayConnect(connectCtx, rc.url, ws.WithNoticeHandler(rc.handleNotice))
419 if chk.E(err) {
420 return
421 }
422
423 return
424 }
425
426 // handleNotice processes NOTICE messages from the relay
427 func (rc *RelayConnection) handleNotice(notice []byte) {
428 noticeStr := string(notice)
429 log.D.F("spider: NOTICE from %s: '%s'", rc.url, noticeStr)
430
431 // Check for rate limiting errors
432 if strings.Contains(noticeStr, "too many concurrent REQs") ||
433 strings.Contains(noticeStr, "rate limit") ||
434 strings.Contains(noticeStr, "slow down") {
435 rc.handleRateLimit()
436 }
437 }
438
439 // handleRateLimit applies backoff when rate limiting is detected
440 func (rc *RelayConnection) handleRateLimit() {
441 rc.mu.Lock()
442 defer rc.mu.Unlock()
443
444 // Initialize backoff if not set
445 if rc.rateLimitBackoff == 0 {
446 rc.rateLimitBackoff = RateLimitBackoffDuration
447 } else {
448 // Exponential backoff
449 rc.rateLimitBackoff *= RateLimitBackoffMultiplier
450 if rc.rateLimitBackoff > MaxRateLimitBackoff {
451 rc.rateLimitBackoff = MaxRateLimitBackoff
452 }
453 }
454
455 rc.rateLimitUntil = time.Now().Add(rc.rateLimitBackoff)
456 log.W.F("spider: rate limit detected on %s, backing off for %v until %v",
457 rc.url, rc.rateLimitBackoff, rc.rateLimitUntil)
458
459 // Close all current subscriptions to reduce load
460 rc.clearSubscriptionsLocked()
461 }
462
463 // waitBeforeReconnect waits before attempting to reconnect with exponential backoff
464 func (rc *RelayConnection) waitBeforeReconnect() {
465 log.D.F("spider: waiting %v before reconnecting to %s", rc.reconnectDelay, rc.url)
466
467 select {
468 case <-rc.ctx.Done():
469 return
470 case <-time.After(rc.reconnectDelay):
471 }
472
473 // Exponential backoff - double every time
474 // 10s -> 20s -> 40s -> 80s (1.3m) -> 160s (2.7m) -> 320s (5.3m) -> 640s (10.7m) -> 1280s (21m) -> 2560s (42m) -> 3600s (1h)
475 rc.reconnectDelay *= 2
476
477 // Cap at MaxReconnectDelay (1 hour), then switch to 24-hour blackout
478 if rc.reconnectDelay >= MaxReconnectDelay {
479 rc.blackoutUntil = time.Now().Add(BlackoutPeriod)
480 rc.reconnectDelay = ReconnectDelay // Reset for after blackout
481 log.W.F("spider: max reconnect backoff reached for %s, entering 24-hour blackout period", rc.url)
482 }
483 }
484
485 // isBlackedOut returns true if the relay is currently blacked out
486 func (rc *RelayConnection) isBlackedOut() bool {
487 return !rc.blackoutUntil.IsZero() && time.Now().Before(rc.blackoutUntil)
488 }
489
490 // handleDisconnection records disconnection time for catch-up logic
491 func (rc *RelayConnection) handleDisconnection() {
492 now := time.Now()
493 rc.lastDisconnect = now
494
495 // Mark all subscriptions as disconnected
496 rc.mu.Lock()
497 defer rc.mu.Unlock()
498
499 for _, sub := range rc.subscriptions {
500 if sub.disconnectedAt == nil {
501 sub.disconnectedAt = &now
502 }
503 }
504 }
505
506 // createSubscriptions creates batch subscriptions for the follow list
507 func (rc *RelayConnection) createSubscriptions(followList [][]byte) {
508 rc.mu.Lock()
509
510 // Check if we're in a rate limit backoff period
511 if time.Now().Before(rc.rateLimitUntil) {
512 remaining := time.Until(rc.rateLimitUntil)
513 rc.mu.Unlock()
514 log.W.F("spider: skipping subscription creation for %s, rate limited for %v more", rc.url, remaining)
515
516 // Schedule retry after backoff period
517 go func() {
518 time.Sleep(remaining)
519 rc.createSubscriptions(followList)
520 }()
521 return
522 }
523
524 // Clear rate limit backoff on successful subscription attempt
525 rc.rateLimitBackoff = 0
526 rc.rateLimitUntil = time.Time{}
527
528 // Clear existing subscriptions
529 rc.clearSubscriptionsLocked()
530
531 // Create batches of pubkeys
532 batches := rc.createBatches(followList)
533
534 log.D.F("spider: creating %d subscription batches for %d pubkeys on %s",
535 len(batches), len(followList), rc.url)
536
537 // Release lock before creating subscriptions to avoid holding it during delays
538 rc.mu.Unlock()
539
540 for i, batch := range batches {
541 // Check context before creating each batch
542 select {
543 case <-rc.ctx.Done():
544 return
545 default:
546 }
547
548 batchID := fmt.Sprintf("batch-%d", i)
549
550 rc.mu.Lock()
551 rc.createBatchSubscription(batchID, batch)
552 rc.mu.Unlock()
553
554 // Add delay between batches to avoid overwhelming the relay
555 if i < len(batches)-1 { // Don't delay after the last batch
556 time.Sleep(BatchCreationDelay)
557 }
558 }
559 }
560
561 // createBatches splits the follow list into batches of BatchSize
562 func (rc *RelayConnection) createBatches(followList [][]byte) (batches [][][]byte) {
563 for i := 0; i < len(followList); i += BatchSize {
564 end := i + BatchSize
565 if end > len(followList) {
566 end = len(followList)
567 }
568
569 batch := make([][]byte, end-i)
570 copy(batch, followList[i:end])
571 batches = append(batches, batch)
572 }
573 return
574 }
575
576 // createBatchSubscription creates a subscription for a batch of pubkeys
577 func (rc *RelayConnection) createBatchSubscription(batchID string, pubkeys [][]byte) {
578 if rc.client == nil {
579 return
580 }
581
582 // Create filters: one for authors, one for p tags
583 // For #p tag filters, all pubkeys must be in a single tag array as hex-encoded strings
584 tagElements := [][]byte{[]byte("p")} // First element is the key
585 for _, pk := range pubkeys {
586 pkHex := hex.EncAppend(nil, pk)
587 tagElements = append(tagElements, pkHex)
588 }
589 pTags := &tag.S{tag.NewFromBytesSlice(tagElements...)}
590
591 filters := filter.NewS(
592 &filter.F{
593 Authors: tag.NewFromBytesSlice(pubkeys...),
594 },
595 &filter.F{
596 Tags: pTags,
597 },
598 )
599
600 // Subscribe
601 sub, err := rc.client.Subscribe(rc.ctx, filters)
602 if chk.E(err) {
603 log.E.F("spider: failed to create subscription %s on %s: %v", batchID, rc.url, err)
604 return
605 }
606
607 batchSub := &BatchSubscription{
608 id: batchID,
609 pubkeys: pubkeys,
610 startTime: time.Now(),
611 sub: sub,
612 relay: rc,
613 }
614
615 rc.subscriptions[batchID] = batchSub
616
617 // Start event handler
618 go batchSub.handleEvents()
619
620 log.D.F("spider: created subscription %s for %d pubkeys on %s",
621 batchID, len(pubkeys), rc.url)
622 }
623
624 // handleEvents processes events from the subscription
625 func (bs *BatchSubscription) handleEvents() {
626 // Throttle event processing to avoid CPU spikes
627 ticker := time.NewTicker(10 * time.Millisecond)
628 defer ticker.Stop()
629
630 for {
631 select {
632 case <-bs.relay.ctx.Done():
633 return
634 case ev := <-bs.sub.Events:
635 if ev == nil {
636 return // Subscription closed
637 }
638
639 // Wait for throttle tick to avoid processing events too rapidly
640 <-ticker.C
641
642 // Save event to database
643 if _, err := bs.relay.spider.db.SaveEvent(bs.relay.ctx, ev); err != nil {
644 // Ignore duplicate events and other errors
645 log.T.F("spider: failed to save event from %s: %v", bs.relay.url, err)
646 } else {
647 // Publish event if it was newly saved
648 if bs.relay.spider.pub != nil {
649 go bs.relay.spider.pub.Deliver(ev)
650 }
651 log.T.F("spider: saved event from %s", bs.relay.url)
652 }
653 }
654 }
655 }
656
657 // updateSubscriptions updates subscriptions for a connection with new follow list
658 func (rc *RelayConnection) updateSubscriptions(followList [][]byte) {
659 if rc.client == nil || !rc.client.IsConnected() {
660 return // Will be handled on reconnection
661 }
662
663 rc.mu.Lock()
664
665 // Check if we're in a rate limit backoff period
666 if time.Now().Before(rc.rateLimitUntil) {
667 remaining := time.Until(rc.rateLimitUntil)
668 rc.mu.Unlock()
669 log.D.F("spider: deferring subscription update for %s, rate limited for %v more", rc.url, remaining)
670 return
671 }
672
673 // Check if we need to perform catch-up for disconnected subscriptions
674 now := time.Now()
675 needsCatchup := false
676
677 for _, sub := range rc.subscriptions {
678 if sub.disconnectedAt != nil {
679 needsCatchup = true
680 rc.performCatchup(sub, *sub.disconnectedAt, now, followList)
681 sub.disconnectedAt = nil // Clear disconnection marker
682 }
683 }
684
685 if needsCatchup {
686 log.D.F("spider: performed catch-up for disconnected subscriptions on %s", rc.url)
687 }
688
689 // Recreate subscriptions with updated follow list
690 rc.clearSubscriptionsLocked()
691
692 batches := rc.createBatches(followList)
693
694 // Release lock before creating subscriptions
695 rc.mu.Unlock()
696
697 for i, batch := range batches {
698 // Check context before creating each batch
699 select {
700 case <-rc.ctx.Done():
701 return
702 default:
703 }
704
705 batchID := fmt.Sprintf("batch-%d", i)
706
707 rc.mu.Lock()
708 rc.createBatchSubscription(batchID, batch)
709 rc.mu.Unlock()
710
711 // Add delay between batches
712 if i < len(batches)-1 {
713 time.Sleep(BatchCreationDelay)
714 }
715 }
716 }
717
// performCatchup queries the relay once for events missed during a
// disconnection window, saving and republishing anything new. The window is
// widened by CatchupWindow on both sides to tolerate clock skew and
// borderline events.
// NOTE(review): the followList parameter is unused here — the catch-up
// filters are built from sub.pubkeys; confirm and drop the parameter.
func (rc *RelayConnection) performCatchup(sub *BatchSubscription, disconnectTime, reconnectTime time.Time, followList [][]byte) {
	// Expand time window by CatchupWindow on both sides
	since := disconnectTime.Add(-CatchupWindow)
	until := reconnectTime.Add(CatchupWindow)

	log.D.F("spider: performing catch-up for %s from %v to %v (expanded window)",
		rc.url, since, until)

	// Create catch-up filters with time constraints
	sinceTs := timestamp.T{V: since.Unix()}
	untilTs := timestamp.T{V: until.Unix()}

	// Create filters with hex-encoded pubkeys for #p tags
	// All pubkeys must be in a single tag array
	tagElements := [][]byte{[]byte("p")} // First element is the key
	for _, pk := range sub.pubkeys {
		pkHex := hex.EncAppend(nil, pk)
		tagElements = append(tagElements, pkHex)
	}
	pTags := &tag.S{tag.NewFromBytesSlice(tagElements...)}

	// Same dual-filter shape as live subscriptions (authored-by and
	// tagged-with), but bounded to the disconnection window.
	filters := filter.NewS(
		&filter.F{
			Authors: tag.NewFromBytesSlice(sub.pubkeys...),
			Since: &sinceTs,
			Until: &untilTs,
		},
		&filter.F{
			Tags: pTags,
			Since: &sinceTs,
			Until: &untilTs,
		},
	)

	// Create temporary subscription for catch-up
	catchupCtx, cancel := context.WithTimeout(rc.ctx, 30*time.Second)
	defer cancel()

	catchupSub, err := rc.client.Subscribe(catchupCtx, filters)
	if chk.E(err) {
		log.E.F("spider: failed to create catch-up subscription on %s: %v", rc.url, err)
		return
	}
	defer catchupSub.Unsub()

	// Process catch-up events with throttling
	eventCount := 0
	// NOTE(review): catchupCtx expires after 30s, so this 60s timeout can
	// never fire first — it looks vestigial; confirm, then remove it or
	// extend the context deadline.
	timeout := time.After(60 * time.Second) // Increased timeout for catch-up
	throttle := time.NewTicker(20 * time.Millisecond)
	defer throttle.Stop()

	for {
		select {
		case <-catchupCtx.Done():
			log.D.F("spider: catch-up completed on %s, processed %d events", rc.url, eventCount)
			return
		case <-timeout:
			log.D.F("spider: catch-up timeout on %s, processed %d events", rc.url, eventCount)
			return
		case <-catchupSub.EndOfStoredEvents:
			// Relay has delivered everything stored for the window.
			log.D.F("spider: catch-up EOSE on %s, processed %d events", rc.url, eventCount)
			return
		case ev := <-catchupSub.Events:
			if ev == nil {
				return
			}

			// Throttle event processing
			<-throttle.C

			eventCount++

			// Save event to database
			if _, err := rc.spider.db.SaveEvent(rc.ctx, ev); err != nil {
				// Silently ignore errors (mostly duplicates)
			} else {
				// Publish event if it was newly saved
				if rc.spider.pub != nil {
					go rc.spider.pub.Deliver(ev)
				}
				log.T.F("spider: catch-up saved event %s from %s",
					hex.Enc(ev.ID[:]), rc.url)
			}
		}
	}
}
805
806 // clearSubscriptions clears all subscriptions (with lock)
807 func (rc *RelayConnection) clearSubscriptions() {
808 rc.mu.Lock()
809 defer rc.mu.Unlock()
810 rc.clearSubscriptionsLocked()
811 }
812
813 // clearSubscriptionsLocked clears all subscriptions (without lock)
814 func (rc *RelayConnection) clearSubscriptionsLocked() {
815 for _, sub := range rc.subscriptions {
816 if sub.sub != nil {
817 sub.sub.Unsub()
818 }
819 }
820 rc.subscriptions = make(map[string]*BatchSubscription)
821 }
822
// close tears down the relay connection: unsubscribes everything, closes the
// websocket client, and cancels the connection context so the manage
// goroutine exits.
func (rc *RelayConnection) close() {
	rc.clearSubscriptions()

	// NOTE(review): rc.client is read and nil'd here without holding rc.mu
	// while the manage goroutine also reads/writes it — looks racy; confirm
	// single-goroutine ownership or add locking.
	if rc.client != nil {
		rc.client.Close()
		rc.client = nil
	}

	rc.cancel()
}
834
835 // isSelfRelay checks if a relay URL is actually ourselves by comparing NIP-11 pubkeys
836 func (s *Spider) isSelfRelay(relayURL string) bool {
837 // If we don't have a relay identity pubkey, can't compare
838 if s.relayIdentityPubkey == "" {
839 return false
840 }
841
842 s.mu.RLock()
843 // Fast path: check if we already know this URL is ours
844 if s.selfURLs[relayURL] {
845 s.mu.RUnlock()
846 log.D.F("spider: skipping self-relay (known URL): %s", relayURL)
847 return true
848 }
849 s.mu.RUnlock()
850
851 // Slow path: check via NIP-11 pubkey
852 nip11Cache := dsync.NewNIP11Cache(30 * time.Minute)
853 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
854 defer cancel()
855
856 peerPubkey, err := nip11Cache.GetPubkey(ctx, relayURL)
857 if err != nil {
858 log.D.F("spider: couldn't fetch NIP-11 for %s: %v", relayURL, err)
859 return false
860 }
861
862 if peerPubkey == s.relayIdentityPubkey {
863 log.D.F("spider: discovered self-relay: %s (pubkey: %s)", relayURL, s.relayIdentityPubkey)
864 // Cache this URL as ours for future fast lookups
865 s.mu.Lock()
866 s.selfURLs[relayURL] = true
867 s.mu.Unlock()
868 return true
869 }
870
871 return false
872 }
873