spider.go raw

   1  package spider
   2  
   3  import (
   4  	"context"
   5  	"fmt"
   6  	"strings"
   7  	"sync"
   8  	"time"
   9  
  10  	"next.orly.dev/pkg/nostr/crypto/keys"
  11  	"next.orly.dev/pkg/nostr/encoders/filter"
  12  	"next.orly.dev/pkg/nostr/encoders/hex"
  13  	"next.orly.dev/pkg/nostr/encoders/tag"
  14  	"next.orly.dev/pkg/nostr/encoders/timestamp"
  15  	"next.orly.dev/pkg/nostr/ws"
  16  	"next.orly.dev/pkg/lol/chk"
  17  	"next.orly.dev/pkg/lol/errorf"
  18  	"next.orly.dev/pkg/lol/log"
  19  	"next.orly.dev/pkg/database"
  20  	"next.orly.dev/pkg/interfaces/publisher"
  21  	dsync "next.orly.dev/pkg/sync"
  22  )
  23  
const (
	// BatchSize is the number of pubkeys per subscription batch.
	BatchSize = 20
	// CatchupWindow is the extra time added on both sides of a
	// disconnection period when querying for missed events.
	CatchupWindow = 30 * time.Minute
	// ReconnectDelay is the initial delay between reconnection attempts;
	// it doubles on each failure up to MaxReconnectDelay.
	ReconnectDelay = 10 * time.Second
	// MaxReconnectDelay is the maximum delay before switching to blackout.
	MaxReconnectDelay = 1 * time.Hour
	// BlackoutPeriod is the duration to blacklist a relay after max backoff is reached.
	BlackoutPeriod = 24 * time.Hour
	// BatchCreationDelay is the delay between creating each batch
	// subscription, to avoid hammering the relay with REQs.
	BatchCreationDelay = 500 * time.Millisecond
	// RateLimitBackoffDuration is how long to wait when we get a rate limit error.
	RateLimitBackoffDuration = 1 * time.Minute
	// RateLimitBackoffMultiplier is the factor by which we increase backoff on repeated rate limits.
	RateLimitBackoffMultiplier = 2
	// MaxRateLimitBackoff is the maximum backoff duration for rate limiting.
	MaxRateLimitBackoff = 30 * time.Minute
	// MainLoopInterval is how often the spider checks for updates.
	MainLoopInterval = 5 * time.Minute
	// EventHandlerBufferSize is the buffer size for event channels.
	// NOTE(review): not referenced within this file — confirm it is used
	// elsewhere in the package.
	EventHandlerBufferSize = 100
)
  48  
// Spider manages connections to admin relays and syncs events for followed pubkeys.
type Spider struct {
	ctx    context.Context    // lifetime of the spider; child of the ctx passed to New
	cancel context.CancelFunc // cancels ctx; invoked by Stop
	db     *database.D        // event store that received events are saved into
	pub    publisher.I        // optional; newly saved events are delivered here
	mode   string             // "follows" (active) or "none" (no-op)

	// Configuration
	adminRelays       []string
	followList        [][]byte
	relayIdentityPubkey string          // Our relay's identity pubkey (hex)
	selfURLs          map[string]bool // URLs discovered to be ourselves (for fast lookups)

	// State management; mu guards connections, running, selfURLs and the
	// callback fields below.
	mu          sync.RWMutex
	connections map[string]*RelayConnection
	running     bool

	// Callbacks for getting updated data
	getAdminRelays func() []string
	getFollowList  func() [][]byte

	// Notification channel for follow list updates; buffered size 1 so
	// notifications coalesce instead of blocking the sender.
	followListUpdated chan struct{}
}
  75  
// RelayConnection manages a single relay connection and its subscriptions.
type RelayConnection struct {
	url    string
	client *ws.Client // nil while disconnected
	ctx    context.Context
	cancel context.CancelFunc
	spider *Spider // back-reference for db, publisher, and config access

	// Subscription management; mu guards subscriptions and the
	// rate-limit fields below.
	mu            sync.RWMutex
	subscriptions map[string]*BatchSubscription

	// Disconnection tracking
	lastDisconnect      time.Time
	reconnectDelay      time.Duration // current exponential backoff, starts at ReconnectDelay
	connectionStartTime time.Time     // used to detect quick disconnects (likely bans)

	// Blackout tracking for IP filters; zero value means "not blacked out".
	blackoutUntil time.Time

	// Rate limiting tracking; subscriptions are suppressed until rateLimitUntil.
	rateLimitBackoff time.Duration
	rateLimitUntil   time.Time
}
 100  
// BatchSubscription represents a subscription for a batch of pubkeys.
type BatchSubscription struct {
	id        string    // "batch-N" identifier, unique per connection
	pubkeys   [][]byte  // the pubkeys this batch covers (raw bytes)
	startTime time.Time // when the subscription was created
	sub       *ws.Subscription
	relay     *RelayConnection // owning connection

	// disconnectedAt marks when the subscription lost connectivity;
	// nil means currently connected. Used to drive catch-up queries.
	disconnectedAt *time.Time
}
 112  
// DisconnectionPeriod tracks when a subscription was disconnected and
// when connectivity was restored.
// NOTE(review): not referenced within this file — confirm it is used
// elsewhere in the package before removing.
type DisconnectionPeriod struct {
	Start time.Time // when the disconnection began
	End   time.Time // when the connection was restored
}
 118  
 119  // New creates a new Spider instance
 120  func New(ctx context.Context, db *database.D, pub publisher.I, mode string) (s *Spider, err error) {
 121  	if db == nil {
 122  		err = errorf.E("database cannot be nil")
 123  		return
 124  	}
 125  
 126  	// Validate mode
 127  	switch mode {
 128  	case "follows", "none":
 129  		// Valid modes
 130  	default:
 131  		err = errorf.E("invalid spider mode: %s (valid modes: none, follows)", mode)
 132  		return
 133  	}
 134  
 135  	ctx, cancel := context.WithCancel(ctx)
 136  
 137  	// Get relay identity pubkey for self-detection
 138  	var relayPubkey string
 139  	if skb, err := db.GetRelayIdentitySecret(); err == nil && len(skb) == 32 {
 140  		pk, _ := keys.SecretBytesToPubKeyHex(skb)
 141  		relayPubkey = pk
 142  	}
 143  
 144  	s = &Spider{
 145  		ctx:                 ctx,
 146  		cancel:              cancel,
 147  		db:                  db,
 148  		pub:                 pub,
 149  		mode:                mode,
 150  		relayIdentityPubkey: relayPubkey,
 151  		selfURLs:            make(map[string]bool),
 152  		connections:         make(map[string]*RelayConnection),
 153  		followListUpdated:   make(chan struct{}, 1),
 154  	}
 155  
 156  	return
 157  }
 158  
 159  // SetCallbacks sets the callback functions for getting updated admin relays and follow lists
 160  func (s *Spider) SetCallbacks(getAdminRelays func() []string, getFollowList func() [][]byte) {
 161  	s.mu.Lock()
 162  	defer s.mu.Unlock()
 163  	s.getAdminRelays = getAdminRelays
 164  	s.getFollowList = getFollowList
 165  }
 166  
 167  // NotifyFollowListUpdate signals the spider that the follow list has been updated
 168  func (s *Spider) NotifyFollowListUpdate() {
 169  	if s.followListUpdated != nil {
 170  		select {
 171  		case s.followListUpdated <- struct{}{}:
 172  			log.D.F("spider: follow list update notification sent")
 173  		default:
 174  			// Channel full, update already pending
 175  			log.D.F("spider: follow list update notification already pending")
 176  		}
 177  	}
 178  }
 179  
 180  // Start begins the spider operation
 181  func (s *Spider) Start() (err error) {
 182  	s.mu.Lock()
 183  	defer s.mu.Unlock()
 184  
 185  	if s.running {
 186  		err = errorf.E("spider already running")
 187  		return
 188  	}
 189  
 190  	// Handle 'none' mode - no-op
 191  	if s.mode == "none" {
 192  		log.I.F("spider: mode is 'none', not starting")
 193  		return
 194  	}
 195  
 196  	if s.getAdminRelays == nil || s.getFollowList == nil {
 197  		err = errorf.E("callbacks must be set before starting")
 198  		return
 199  	}
 200  
 201  	s.running = true
 202  
 203  	// Start the main loop
 204  	go s.mainLoop()
 205  
 206  	log.I.F("spider: started in '%s' mode", s.mode)
 207  	return
 208  }
 209  
 210  // Stop stops the spider operation
 211  func (s *Spider) Stop() {
 212  	s.mu.Lock()
 213  	defer s.mu.Unlock()
 214  
 215  	if !s.running {
 216  		return
 217  	}
 218  
 219  	s.running = false
 220  	s.cancel()
 221  
 222  	// Close all connections
 223  	for _, conn := range s.connections {
 224  		conn.close()
 225  	}
 226  	s.connections = make(map[string]*RelayConnection)
 227  
 228  	log.I.F("spider: stopped")
 229  }
 230  
 231  // mainLoop is the main spider loop that manages connections and subscriptions
 232  func (s *Spider) mainLoop() {
 233  	ticker := time.NewTicker(MainLoopInterval)
 234  	defer ticker.Stop()
 235  
 236  	log.D.F("spider: main loop started, checking every %v", MainLoopInterval)
 237  
 238  	for {
 239  		select {
 240  		case <-s.ctx.Done():
 241  			return
 242  		case <-s.followListUpdated:
 243  			log.D.F("spider: follow list updated, refreshing connections")
 244  			s.updateConnections()
 245  		case <-ticker.C:
 246  			log.D.F("spider: periodic check triggered")
 247  			s.updateConnections()
 248  		}
 249  	}
 250  }
 251  
 252  // updateConnections updates relay connections based on current admin relays and follow lists
 253  func (s *Spider) updateConnections() {
 254  	s.mu.Lock()
 255  	if !s.running {
 256  		s.mu.Unlock()
 257  		return
 258  	}
 259  
 260  	// Get current admin relays and follow list
 261  	adminRelays := s.getAdminRelays()
 262  	followList := s.getFollowList()
 263  	s.mu.Unlock()
 264  
 265  	if len(adminRelays) == 0 || len(followList) == 0 {
 266  		log.D.F("spider: no admin relays (%d) or follow list (%d) available",
 267  			len(adminRelays), len(followList))
 268  		return
 269  	}
 270  
 271  	// Filter out self-relays WITHOUT holding mu — isSelfRelay takes mu internally
 272  	var filteredRelays []string
 273  	for _, url := range adminRelays {
 274  		if s.isSelfRelay(url) {
 275  			log.D.F("spider: skipping self-relay: %s", url)
 276  			continue
 277  		}
 278  		filteredRelays = append(filteredRelays, url)
 279  	}
 280  
 281  	// Re-acquire lock for the mutation phase
 282  	s.mu.Lock()
 283  	defer s.mu.Unlock()
 284  
 285  	currentRelays := make(map[string]bool)
 286  	for _, url := range filteredRelays {
 287  		currentRelays[url] = true
 288  
 289  		if conn, exists := s.connections[url]; exists {
 290  			// Update existing connection
 291  			conn.updateSubscriptions(followList)
 292  		} else {
 293  			// Create new connection
 294  			s.createConnection(url, followList)
 295  		}
 296  	}
 297  
 298  	// Remove connections for relays no longer in admin list
 299  	for url, conn := range s.connections {
 300  		if !currentRelays[url] {
 301  			log.D.F("spider: removing connection to %s (no longer in admin relays)", url)
 302  			conn.close()
 303  			delete(s.connections, url)
 304  		}
 305  	}
 306  }
 307  
 308  // createConnection creates a new relay connection
 309  func (s *Spider) createConnection(url string, followList [][]byte) {
 310  	log.D.F("spider: creating connection to %s", url)
 311  
 312  	ctx, cancel := context.WithCancel(s.ctx)
 313  	conn := &RelayConnection{
 314  		url:            url,
 315  		ctx:            ctx,
 316  		cancel:         cancel,
 317  		spider:         s,
 318  		subscriptions:  make(map[string]*BatchSubscription),
 319  		reconnectDelay: ReconnectDelay,
 320  	}
 321  
 322  	s.connections[url] = conn
 323  
 324  	// Start connection in goroutine
 325  	go conn.manage(followList)
 326  }
 327  
// manage handles the lifecycle of a relay connection: connect, subscribe,
// wait for disconnection, back off, repeat — until rc.ctx is cancelled.
// Backoff is exponential with a 24-hour blackout after the cap, and
// quick disconnects (likely bans/IP filters) keep the backoff growing.
func (rc *RelayConnection) manage(followList [][]byte) {
	for {
		// Check context first so cancellation exits promptly.
		select {
		case <-rc.ctx.Done():
			log.D.F("spider: connection manager for %s stopping (context done)", rc.url)
			return
		default:
		}

		// Check if relay is blacked out (max backoff was reached earlier).
		if rc.isBlackedOut() {
			waitDuration := time.Until(rc.blackoutUntil)
			log.D.F("spider: %s is blacked out for %v more", rc.url, waitDuration)

			// Wait for blackout to expire or context cancellation
			select {
			case <-rc.ctx.Done():
				return
			case <-time.After(waitDuration):
				// Blackout expired, reset delay and try again
				rc.reconnectDelay = ReconnectDelay
				log.D.F("spider: blackout period ended for %s, retrying", rc.url)
			}
			continue
		}

		// Attempt to connect
		log.D.F("spider: attempting to connect to %s (backoff: %v)", rc.url, rc.reconnectDelay)
		if err := rc.connect(); chk.E(err) {
			log.W.F("spider: failed to connect to %s: %v", rc.url, err)
			rc.waitBeforeReconnect()
			continue
		}

		log.D.F("spider: connected to %s", rc.url)
		rc.connectionStartTime = time.Now()

		// Only reset reconnect delay on successful connection
		// (don't reset if we had a quick disconnect before)
		if rc.reconnectDelay > ReconnectDelay*8 {
			// Gradual recovery: reduce by half instead of full reset
			rc.reconnectDelay = rc.reconnectDelay / 2
			log.D.F("spider: reducing backoff for %s to %v", rc.url, rc.reconnectDelay)
		} else {
			rc.reconnectDelay = ReconnectDelay
		}
		rc.blackoutUntil = time.Time{} // Clear blackout on successful connection

		// Create subscriptions for follow list
		rc.createSubscriptions(followList)

		// Block until the client's context ends, i.e. the socket drops.
		<-rc.client.Context().Done()

		log.W.F("spider: disconnected from %s: %v", rc.url, rc.client.ConnectionCause())

		// Check if disconnection happened very quickly (likely IP filter or ban)
		connectionDuration := time.Since(rc.connectionStartTime)
		const quickDisconnectThreshold = 2 * time.Minute
		if connectionDuration < quickDisconnectThreshold {
			log.W.F("spider: quick disconnection from %s after %v (likely connection issue/ban)", rc.url, connectionDuration)
			// Don't reset the delay, keep the backoff and increase it
			rc.waitBeforeReconnect()
		} else {
			// Normal disconnection after decent uptime - gentle backoff
			log.D.F("spider: normal disconnection from %s after %v uptime", rc.url, connectionDuration)
			// Small delay before reconnecting
			select {
			case <-rc.ctx.Done():
				return
			case <-time.After(5 * time.Second):
			}
		}

		// Record the disconnect time on every batch for later catch-up.
		rc.handleDisconnection()

		// Clean up before the next connection attempt.
		rc.client = nil
		rc.clearSubscriptions()
	}
}
 411  
 412  // connect establishes a websocket connection to the relay
 413  func (rc *RelayConnection) connect() (err error) {
 414  	connectCtx, cancel := context.WithTimeout(rc.ctx, 10*time.Second)
 415  	defer cancel()
 416  
 417  	// Create client with notice handler to detect rate limiting
 418  	rc.client, err = ws.RelayConnect(connectCtx, rc.url, ws.WithNoticeHandler(rc.handleNotice))
 419  	if chk.E(err) {
 420  		return
 421  	}
 422  
 423  	return
 424  }
 425  
 426  // handleNotice processes NOTICE messages from the relay
 427  func (rc *RelayConnection) handleNotice(notice []byte) {
 428  	noticeStr := string(notice)
 429  	log.D.F("spider: NOTICE from %s: '%s'", rc.url, noticeStr)
 430  
 431  	// Check for rate limiting errors
 432  	if strings.Contains(noticeStr, "too many concurrent REQs") ||
 433  		strings.Contains(noticeStr, "rate limit") ||
 434  		strings.Contains(noticeStr, "slow down") {
 435  		rc.handleRateLimit()
 436  	}
 437  }
 438  
 439  // handleRateLimit applies backoff when rate limiting is detected
 440  func (rc *RelayConnection) handleRateLimit() {
 441  	rc.mu.Lock()
 442  	defer rc.mu.Unlock()
 443  
 444  	// Initialize backoff if not set
 445  	if rc.rateLimitBackoff == 0 {
 446  		rc.rateLimitBackoff = RateLimitBackoffDuration
 447  	} else {
 448  		// Exponential backoff
 449  		rc.rateLimitBackoff *= RateLimitBackoffMultiplier
 450  		if rc.rateLimitBackoff > MaxRateLimitBackoff {
 451  			rc.rateLimitBackoff = MaxRateLimitBackoff
 452  		}
 453  	}
 454  
 455  	rc.rateLimitUntil = time.Now().Add(rc.rateLimitBackoff)
 456  	log.W.F("spider: rate limit detected on %s, backing off for %v until %v",
 457  		rc.url, rc.rateLimitBackoff, rc.rateLimitUntil)
 458  
 459  	// Close all current subscriptions to reduce load
 460  	rc.clearSubscriptionsLocked()
 461  }
 462  
 463  // waitBeforeReconnect waits before attempting to reconnect with exponential backoff
 464  func (rc *RelayConnection) waitBeforeReconnect() {
 465  	log.D.F("spider: waiting %v before reconnecting to %s", rc.reconnectDelay, rc.url)
 466  
 467  	select {
 468  	case <-rc.ctx.Done():
 469  		return
 470  	case <-time.After(rc.reconnectDelay):
 471  	}
 472  
 473  	// Exponential backoff - double every time
 474  	// 10s -> 20s -> 40s -> 80s (1.3m) -> 160s (2.7m) -> 320s (5.3m) -> 640s (10.7m) -> 1280s (21m) -> 2560s (42m) -> 3600s (1h)
 475  	rc.reconnectDelay *= 2
 476  
 477  	// Cap at MaxReconnectDelay (1 hour), then switch to 24-hour blackout
 478  	if rc.reconnectDelay >= MaxReconnectDelay {
 479  		rc.blackoutUntil = time.Now().Add(BlackoutPeriod)
 480  		rc.reconnectDelay = ReconnectDelay // Reset for after blackout
 481  		log.W.F("spider: max reconnect backoff reached for %s, entering 24-hour blackout period", rc.url)
 482  	}
 483  }
 484  
 485  // isBlackedOut returns true if the relay is currently blacked out
 486  func (rc *RelayConnection) isBlackedOut() bool {
 487  	return !rc.blackoutUntil.IsZero() && time.Now().Before(rc.blackoutUntil)
 488  }
 489  
 490  // handleDisconnection records disconnection time for catch-up logic
 491  func (rc *RelayConnection) handleDisconnection() {
 492  	now := time.Now()
 493  	rc.lastDisconnect = now
 494  
 495  	// Mark all subscriptions as disconnected
 496  	rc.mu.Lock()
 497  	defer rc.mu.Unlock()
 498  
 499  	for _, sub := range rc.subscriptions {
 500  		if sub.disconnectedAt == nil {
 501  			sub.disconnectedAt = &now
 502  		}
 503  	}
 504  }
 505  
 506  // createSubscriptions creates batch subscriptions for the follow list
 507  func (rc *RelayConnection) createSubscriptions(followList [][]byte) {
 508  	rc.mu.Lock()
 509  
 510  	// Check if we're in a rate limit backoff period
 511  	if time.Now().Before(rc.rateLimitUntil) {
 512  		remaining := time.Until(rc.rateLimitUntil)
 513  		rc.mu.Unlock()
 514  		log.W.F("spider: skipping subscription creation for %s, rate limited for %v more", rc.url, remaining)
 515  
 516  		// Schedule retry after backoff period
 517  		go func() {
 518  			time.Sleep(remaining)
 519  			rc.createSubscriptions(followList)
 520  		}()
 521  		return
 522  	}
 523  
 524  	// Clear rate limit backoff on successful subscription attempt
 525  	rc.rateLimitBackoff = 0
 526  	rc.rateLimitUntil = time.Time{}
 527  
 528  	// Clear existing subscriptions
 529  	rc.clearSubscriptionsLocked()
 530  
 531  	// Create batches of pubkeys
 532  	batches := rc.createBatches(followList)
 533  
 534  	log.D.F("spider: creating %d subscription batches for %d pubkeys on %s",
 535  		len(batches), len(followList), rc.url)
 536  
 537  	// Release lock before creating subscriptions to avoid holding it during delays
 538  	rc.mu.Unlock()
 539  
 540  	for i, batch := range batches {
 541  		// Check context before creating each batch
 542  		select {
 543  		case <-rc.ctx.Done():
 544  			return
 545  		default:
 546  		}
 547  
 548  		batchID := fmt.Sprintf("batch-%d", i)
 549  
 550  		rc.mu.Lock()
 551  		rc.createBatchSubscription(batchID, batch)
 552  		rc.mu.Unlock()
 553  
 554  		// Add delay between batches to avoid overwhelming the relay
 555  		if i < len(batches)-1 { // Don't delay after the last batch
 556  			time.Sleep(BatchCreationDelay)
 557  		}
 558  	}
 559  }
 560  
 561  // createBatches splits the follow list into batches of BatchSize
 562  func (rc *RelayConnection) createBatches(followList [][]byte) (batches [][][]byte) {
 563  	for i := 0; i < len(followList); i += BatchSize {
 564  		end := i + BatchSize
 565  		if end > len(followList) {
 566  			end = len(followList)
 567  		}
 568  
 569  		batch := make([][]byte, end-i)
 570  		copy(batch, followList[i:end])
 571  		batches = append(batches, batch)
 572  	}
 573  	return
 574  }
 575  
 576  // createBatchSubscription creates a subscription for a batch of pubkeys
 577  func (rc *RelayConnection) createBatchSubscription(batchID string, pubkeys [][]byte) {
 578  	if rc.client == nil {
 579  		return
 580  	}
 581  
 582  	// Create filters: one for authors, one for p tags
 583  	// For #p tag filters, all pubkeys must be in a single tag array as hex-encoded strings
 584  	tagElements := [][]byte{[]byte("p")} // First element is the key
 585  	for _, pk := range pubkeys {
 586  		pkHex := hex.EncAppend(nil, pk)
 587  		tagElements = append(tagElements, pkHex)
 588  	}
 589  	pTags := &tag.S{tag.NewFromBytesSlice(tagElements...)}
 590  
 591  	filters := filter.NewS(
 592  		&filter.F{
 593  			Authors: tag.NewFromBytesSlice(pubkeys...),
 594  		},
 595  		&filter.F{
 596  			Tags: pTags,
 597  		},
 598  	)
 599  
 600  	// Subscribe
 601  	sub, err := rc.client.Subscribe(rc.ctx, filters)
 602  	if chk.E(err) {
 603  		log.E.F("spider: failed to create subscription %s on %s: %v", batchID, rc.url, err)
 604  		return
 605  	}
 606  
 607  	batchSub := &BatchSubscription{
 608  		id:        batchID,
 609  		pubkeys:   pubkeys,
 610  		startTime: time.Now(),
 611  		sub:       sub,
 612  		relay:     rc,
 613  	}
 614  
 615  	rc.subscriptions[batchID] = batchSub
 616  
 617  	// Start event handler
 618  	go batchSub.handleEvents()
 619  
 620  	log.D.F("spider: created subscription %s for %d pubkeys on %s",
 621  		batchID, len(pubkeys), rc.url)
 622  }
 623  
// handleEvents drains the subscription's event channel, persisting each
// event and fanning it out to the publisher, until the relay context is
// cancelled or the subscription channel closes.
func (bs *BatchSubscription) handleEvents() {
	// Throttle event processing to avoid CPU spikes: at most one event
	// per 10ms tick (~100 events/sec per batch).
	ticker := time.NewTicker(10 * time.Millisecond)
	defer ticker.Stop()

	for {
		select {
		case <-bs.relay.ctx.Done():
			return
		case ev := <-bs.sub.Events:
			if ev == nil {
				return // Subscription closed (a closed channel yields nil)
			}

			// Wait for throttle tick to avoid processing events too rapidly
			<-ticker.C

			// Save event to database
			if _, err := bs.relay.spider.db.SaveEvent(bs.relay.ctx, ev); err != nil {
				// Ignore duplicate events and other errors
				log.T.F("spider: failed to save event from %s: %v", bs.relay.url, err)
			} else {
				// Publish event if it was newly saved
				if bs.relay.spider.pub != nil {
					// NOTE(review): one goroutine per delivered event —
					// unbounded if Deliver is slow; confirm Deliver is
					// cheap or internally queued.
					go bs.relay.spider.pub.Deliver(ev)
				}
				log.T.F("spider: saved event from %s", bs.relay.url)
			}
		}
	}
}
 656  
// updateSubscriptions refreshes a live connection's subscriptions with a
// new follow list, first running catch-up queries for any batches that
// were marked disconnected. No-op when the client is disconnected (the
// reconnect path recreates subscriptions) or while rate limited.
func (rc *RelayConnection) updateSubscriptions(followList [][]byte) {
	if rc.client == nil || !rc.client.IsConnected() {
		return // Will be handled on reconnection
	}

	rc.mu.Lock()

	// Check if we're in a rate limit backoff period
	if time.Now().Before(rc.rateLimitUntil) {
		remaining := time.Until(rc.rateLimitUntil)
		rc.mu.Unlock()
		log.D.F("spider: deferring subscription update for %s, rate limited for %v more", rc.url, remaining)
		return
	}

	// Check if we need to perform catch-up for disconnected subscriptions.
	// NOTE(review): performCatchup does network I/O with timeouts up to
	// 60s and runs here while rc.mu is held, blocking handleRateLimit and
	// other lock holders for the duration — consider restructuring.
	now := time.Now()
	needsCatchup := false

	for _, sub := range rc.subscriptions {
		if sub.disconnectedAt != nil {
			needsCatchup = true
			rc.performCatchup(sub, *sub.disconnectedAt, now, followList)
			sub.disconnectedAt = nil // Clear disconnection marker
		}
	}

	if needsCatchup {
		log.D.F("spider: performed catch-up for disconnected subscriptions on %s", rc.url)
	}

	// Recreate subscriptions with updated follow list
	rc.clearSubscriptionsLocked()

	batches := rc.createBatches(followList)

	// Release lock before creating subscriptions
	rc.mu.Unlock()

	for i, batch := range batches {
		// Check context before creating each batch
		select {
		case <-rc.ctx.Done():
			return
		default:
		}

		batchID := fmt.Sprintf("batch-%d", i)

		rc.mu.Lock()
		rc.createBatchSubscription(batchID, batch)
		rc.mu.Unlock()

		// Add delay between batches to avoid overwhelming the relay.
		if i < len(batches)-1 {
			time.Sleep(BatchCreationDelay)
		}
	}
}
 717  
// performCatchup queries the relay for events missed while a batch was
// disconnected, widening the window by CatchupWindow on both sides to
// tolerate clock skew and delayed propagation. Results are saved and
// published like live events. Bounded by a 30s subscription context and
// a 60s overall timeout.
func (rc *RelayConnection) performCatchup(sub *BatchSubscription, disconnectTime, reconnectTime time.Time, followList [][]byte) {
	// Expand time window by CatchupWindow on both sides
	since := disconnectTime.Add(-CatchupWindow)
	until := reconnectTime.Add(CatchupWindow)

	log.D.F("spider: performing catch-up for %s from %v to %v (expanded window)",
		rc.url, since, until)

	// Create catch-up filters with time constraints
	sinceTs := timestamp.T{V: since.Unix()}
	untilTs := timestamp.T{V: until.Unix()}

	// Create filters with hex-encoded pubkeys for #p tags.
	// All pubkeys must be in a single tag array.
	tagElements := [][]byte{[]byte("p")} // First element is the key
	for _, pk := range sub.pubkeys {
		pkHex := hex.EncAppend(nil, pk)
		tagElements = append(tagElements, pkHex)
	}
	pTags := &tag.S{tag.NewFromBytesSlice(tagElements...)}

	// Same author/p-tag pair as the live subscription, time-bounded.
	filters := filter.NewS(
		&filter.F{
			Authors: tag.NewFromBytesSlice(sub.pubkeys...),
			Since:   &sinceTs,
			Until:   &untilTs,
		},
		&filter.F{
			Tags:  pTags,
			Since: &sinceTs,
			Until: &untilTs,
		},
	)

	// Create temporary subscription for catch-up
	catchupCtx, cancel := context.WithTimeout(rc.ctx, 30*time.Second)
	defer cancel()

	catchupSub, err := rc.client.Subscribe(catchupCtx, filters)
	if chk.E(err) {
		log.E.F("spider: failed to create catch-up subscription on %s: %v", rc.url, err)
		return
	}
	defer catchupSub.Unsub()

	// Process catch-up events with throttling (one event per 20ms).
	eventCount := 0
	timeout := time.After(60 * time.Second) // Increased timeout for catch-up
	throttle := time.NewTicker(20 * time.Millisecond)
	defer throttle.Stop()

	for {
		select {
		case <-catchupCtx.Done():
			log.D.F("spider: catch-up completed on %s, processed %d events", rc.url, eventCount)
			return
		case <-timeout:
			log.D.F("spider: catch-up timeout on %s, processed %d events", rc.url, eventCount)
			return
		case <-catchupSub.EndOfStoredEvents:
			// EOSE: the relay has sent everything stored in the window.
			log.D.F("spider: catch-up EOSE on %s, processed %d events", rc.url, eventCount)
			return
		case ev := <-catchupSub.Events:
			if ev == nil {
				return
			}

			// Throttle event processing
			<-throttle.C

			eventCount++

			// Save event to database
			if _, err := rc.spider.db.SaveEvent(rc.ctx, ev); err != nil {
				// Silently ignore errors (mostly duplicates).
				// NOTE(review): intentionally empty branch — staticcheck
				// will flag it; an explicit trace log would be clearer.
			} else {
				// Publish event if it was newly saved
				if rc.spider.pub != nil {
					go rc.spider.pub.Deliver(ev)
				}
				log.T.F("spider: catch-up saved event %s from %s",
					hex.Enc(ev.ID[:]), rc.url)
			}
		}
	}
}
 805  
 806  // clearSubscriptions clears all subscriptions (with lock)
 807  func (rc *RelayConnection) clearSubscriptions() {
 808  	rc.mu.Lock()
 809  	defer rc.mu.Unlock()
 810  	rc.clearSubscriptionsLocked()
 811  }
 812  
 813  // clearSubscriptionsLocked clears all subscriptions (without lock)
 814  func (rc *RelayConnection) clearSubscriptionsLocked() {
 815  	for _, sub := range rc.subscriptions {
 816  		if sub.sub != nil {
 817  			sub.sub.Unsub()
 818  		}
 819  	}
 820  	rc.subscriptions = make(map[string]*BatchSubscription)
 821  }
 822  
 823  // close closes the relay connection
 824  func (rc *RelayConnection) close() {
 825  	rc.clearSubscriptions()
 826  
 827  	if rc.client != nil {
 828  		rc.client.Close()
 829  		rc.client = nil
 830  	}
 831  
 832  	rc.cancel()
 833  }
 834  
 835  // isSelfRelay checks if a relay URL is actually ourselves by comparing NIP-11 pubkeys
 836  func (s *Spider) isSelfRelay(relayURL string) bool {
 837  	// If we don't have a relay identity pubkey, can't compare
 838  	if s.relayIdentityPubkey == "" {
 839  		return false
 840  	}
 841  
 842  	s.mu.RLock()
 843  	// Fast path: check if we already know this URL is ours
 844  	if s.selfURLs[relayURL] {
 845  		s.mu.RUnlock()
 846  		log.D.F("spider: skipping self-relay (known URL): %s", relayURL)
 847  		return true
 848  	}
 849  	s.mu.RUnlock()
 850  
 851  	// Slow path: check via NIP-11 pubkey
 852  	nip11Cache := dsync.NewNIP11Cache(30 * time.Minute)
 853  	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
 854  	defer cancel()
 855  
 856  	peerPubkey, err := nip11Cache.GetPubkey(ctx, relayURL)
 857  	if err != nil {
 858  		log.D.F("spider: couldn't fetch NIP-11 for %s: %v", relayURL, err)
 859  		return false
 860  	}
 861  
 862  	if peerPubkey == s.relayIdentityPubkey {
 863  		log.D.F("spider: discovered self-relay: %s (pubkey: %s)", relayURL, s.relayIdentityPubkey)
 864  		// Cache this URL as ours for future fast lookups
 865  		s.mu.Lock()
 866  		s.selfURLs[relayURL] = true
 867  		s.mu.Unlock()
 868  		return true
 869  	}
 870  
 871  	return false
 872  }
 873