directory.go raw

   1  package spider
   2  
   3  import (
   4  	"context"
   5  	"sync"
   6  	"sync/atomic"
   7  	"time"
   8  
   9  	"next.orly.dev/pkg/nostr/crypto/keys"
  10  	"next.orly.dev/pkg/nostr/encoders/event"
  11  	"next.orly.dev/pkg/nostr/encoders/filter"
  12  	"next.orly.dev/pkg/nostr/encoders/kind"
  13  	"next.orly.dev/pkg/nostr/encoders/tag"
  14  	"next.orly.dev/pkg/nostr/utils/normalize"
  15  	"next.orly.dev/pkg/nostr/ws"
  16  	"next.orly.dev/pkg/lol/chk"
  17  	"next.orly.dev/pkg/lol/errorf"
  18  	"next.orly.dev/pkg/lol/log"
  19  	"next.orly.dev/pkg/database"
  20  	"next.orly.dev/pkg/interfaces/publisher"
  21  	dsync "next.orly.dev/pkg/sync"
  22  )
  23  
const (
	// DirectorySpiderDefaultInterval is how often the directory spider runs
	// when the caller passes a non-positive interval to NewDirectorySpider.
	DirectorySpiderDefaultInterval = 24 * time.Hour
	// DirectorySpiderDefaultMaxHops is the maximum hop distance for relay
	// discovery when the caller passes a non-positive maxHops.
	DirectorySpiderDefaultMaxHops = 3
	// DirectorySpiderRelayTimeout is the timeout for connecting to and querying a relay.
	DirectorySpiderRelayTimeout = 30 * time.Second
	// DirectorySpiderQueryTimeout is the timeout for waiting for EOSE on a query;
	// on expiry the events collected so far are returned rather than an error.
	DirectorySpiderQueryTimeout = 60 * time.Second
	// DirectorySpiderRelayDelay is the delay between processing relays (rate limiting).
	DirectorySpiderRelayDelay = 500 * time.Millisecond
	// DirectorySpiderMaxEventsPerQuery is the per-query event limit sent to remote relays.
	DirectorySpiderMaxEventsPerQuery = 5000
)
  38  
// DirectorySpider manages periodic relay discovery and metadata synchronization.
// It discovers relays by crawling kind 10002 (relay list) events, expanding outward
// in hops from seed pubkeys (whitelisted users), then fetches essential metadata
// events (kinds 0, 3, 10000, 10002) from all discovered relays.
type DirectorySpider struct {
	ctx    context.Context    // lifetime of the spider; cancelled by Stop
	cancel context.CancelFunc // cancels ctx
	db     *database.D        // local event store (queried for seeds, saves fetched events)
	pub    publisher.I        // optional; newly saved events are delivered to it

	// Configuration
	interval time.Duration // time between scheduled runs
	maxHops  int           // maximum hop distance from the seed relays

	// State
	running atomic.Bool // true between Start and Stop
	lastRun time.Time   // completion time of the last run; guarded by mu

	// Relay discovery state (reset each run); guarded by mu
	mu               sync.Mutex
	discoveredRelays map[string]int  // URL -> hop distance
	processedRelays  map[string]bool // Already fetched metadata from

	// Self-detection
	relayIdentityPubkey string          // this relay's identity pubkey; empty disables self-detection
	selfURLs            map[string]bool // URLs confirmed to be this relay; guarded by mu
	nip11Cache          *dsync.NIP11Cache

	// Callback for getting seed pubkeys (whitelisted users); guarded by mu
	getSeedPubkeys func() [][]byte

	// Trigger channel for manual runs (buffered with capacity 1 so
	// TriggerNow never blocks and pending triggers coalesce)
	triggerChan chan struct{}
}
  73  
  74  // NewDirectorySpider creates a new DirectorySpider instance.
  75  func NewDirectorySpider(
  76  	ctx context.Context,
  77  	db *database.D,
  78  	pub publisher.I,
  79  	interval time.Duration,
  80  	maxHops int,
  81  ) (ds *DirectorySpider, err error) {
  82  	if db == nil {
  83  		err = errorf.E("database cannot be nil")
  84  		return
  85  	}
  86  
  87  	if interval <= 0 {
  88  		interval = DirectorySpiderDefaultInterval
  89  	}
  90  	if maxHops <= 0 {
  91  		maxHops = DirectorySpiderDefaultMaxHops
  92  	}
  93  
  94  	ctx, cancel := context.WithCancel(ctx)
  95  
  96  	// Get relay identity pubkey for self-detection
  97  	var relayPubkey string
  98  	if skb, err := db.GetRelayIdentitySecret(); err == nil && len(skb) == 32 {
  99  		pk, _ := keys.SecretBytesToPubKeyHex(skb)
 100  		relayPubkey = pk
 101  	}
 102  
 103  	ds = &DirectorySpider{
 104  		ctx:                 ctx,
 105  		cancel:              cancel,
 106  		db:                  db,
 107  		pub:                 pub,
 108  		interval:            interval,
 109  		maxHops:             maxHops,
 110  		discoveredRelays:    make(map[string]int),
 111  		processedRelays:     make(map[string]bool),
 112  		relayIdentityPubkey: relayPubkey,
 113  		selfURLs:            make(map[string]bool),
 114  		nip11Cache:          dsync.NewNIP11Cache(30 * time.Minute),
 115  		triggerChan:         make(chan struct{}, 1),
 116  	}
 117  
 118  	return
 119  }
 120  
 121  // SetSeedCallback sets the callback function for getting seed pubkeys (whitelisted users).
 122  func (ds *DirectorySpider) SetSeedCallback(getSeedPubkeys func() [][]byte) {
 123  	ds.mu.Lock()
 124  	defer ds.mu.Unlock()
 125  	ds.getSeedPubkeys = getSeedPubkeys
 126  }
 127  
 128  // Start begins the directory spider operation.
 129  func (ds *DirectorySpider) Start() (err error) {
 130  	if ds.running.Load() {
 131  		err = errorf.E("directory spider already running")
 132  		return
 133  	}
 134  
 135  	if ds.getSeedPubkeys == nil {
 136  		err = errorf.E("seed callback must be set before starting")
 137  		return
 138  	}
 139  
 140  	ds.running.Store(true)
 141  	go ds.mainLoop()
 142  
 143  	log.I.F("directory spider: started (interval: %v, max hops: %d)", ds.interval, ds.maxHops)
 144  	return
 145  }
 146  
 147  // Stop stops the directory spider operation.
 148  func (ds *DirectorySpider) Stop() {
 149  	if !ds.running.Load() {
 150  		return
 151  	}
 152  
 153  	ds.running.Store(false)
 154  	ds.cancel()
 155  
 156  	log.I.F("directory spider: stopped")
 157  }
 158  
 159  // TriggerNow forces an immediate run of the directory spider.
 160  func (ds *DirectorySpider) TriggerNow() {
 161  	select {
 162  	case ds.triggerChan <- struct{}{}:
 163  		log.D.F("directory spider: manual trigger sent")
 164  	default:
 165  		log.D.F("directory spider: trigger already pending")
 166  	}
 167  }
 168  
 169  // LastRun returns the time of the last completed run.
 170  func (ds *DirectorySpider) LastRun() time.Time {
 171  	ds.mu.Lock()
 172  	defer ds.mu.Unlock()
 173  	return ds.lastRun
 174  }
 175  
 176  // mainLoop is the main spider loop that runs periodically.
 177  func (ds *DirectorySpider) mainLoop() {
 178  	// Run immediately on start
 179  	ds.runOnce()
 180  
 181  	ticker := time.NewTicker(ds.interval)
 182  	defer ticker.Stop()
 183  
 184  	log.D.F("directory spider: main loop started, running every %v", ds.interval)
 185  
 186  	for {
 187  		select {
 188  		case <-ds.ctx.Done():
 189  			return
 190  		case <-ds.triggerChan:
 191  			log.D.F("directory spider: manual trigger received")
 192  			ds.runOnce()
 193  		case <-ticker.C:
 194  			log.D.F("directory spider: scheduled run triggered")
 195  			ds.runOnce()
 196  		}
 197  	}
 198  }
 199  
 200  // runOnce performs a single directory spider run.
 201  func (ds *DirectorySpider) runOnce() {
 202  	if !ds.running.Load() {
 203  		return
 204  	}
 205  
 206  	log.D.F("directory spider: starting run")
 207  	start := time.Now()
 208  
 209  	// Reset state for this run
 210  	ds.mu.Lock()
 211  	ds.discoveredRelays = make(map[string]int)
 212  	ds.processedRelays = make(map[string]bool)
 213  	ds.mu.Unlock()
 214  
 215  	// Phase 1: Discover relays via hop expansion
 216  	if err := ds.discoverRelays(); err != nil {
 217  		log.E.F("directory spider: relay discovery failed: %v", err)
 218  		return
 219  	}
 220  
 221  	ds.mu.Lock()
 222  	relayCount := len(ds.discoveredRelays)
 223  	ds.mu.Unlock()
 224  
 225  	log.D.F("directory spider: discovered %d relays", relayCount)
 226  
 227  	// Phase 2: Fetch metadata from all discovered relays
 228  	if err := ds.fetchMetadataFromRelays(); err != nil {
 229  		log.E.F("directory spider: metadata fetch failed: %v", err)
 230  		return
 231  	}
 232  
 233  	ds.mu.Lock()
 234  	ds.lastRun = time.Now()
 235  	ds.mu.Unlock()
 236  
 237  	log.D.F("directory spider: completed run in %v", time.Since(start))
 238  }
 239  
// discoverRelays performs the multi-hop relay discovery.
//
// Hop 0 relays come from kind 10002 events already in the local database
// for the seed pubkeys. Each subsequent hop connects to every relay found
// at the previous hop, pulls its kind 10002 events, and records any
// previously-unseen relay URLs at the current hop level, up to ds.maxHops.
// Self-relays (this relay's own URLs) are filtered out. Returns nil on a
// normal finish; non-nil only on context cancellation or a local-DB
// query failure.
func (ds *DirectorySpider) discoverRelays() error {
	// Get seed pubkeys from callback
	seedPubkeys := ds.getSeedPubkeys()
	if len(seedPubkeys) == 0 {
		// Nothing to crawl from; treated as a successful (empty) run.
		log.W.F("directory spider: no seed pubkeys available")
		return nil
	}

	log.D.F("directory spider: starting relay discovery with %d seed pubkeys", len(seedPubkeys))

	// Round 0: Get relay lists from seed pubkeys in local database
	seedRelays, err := ds.getRelaysFromLocalDB(seedPubkeys)
	if err != nil {
		return errorf.W("failed to get relays from local DB: %v", err)
	}

	// Filter out self-relays WITHOUT holding mu — isSelfRelay takes mu
	// internally (calling it under mu would deadlock) and may perform a
	// slow NIP-11 network fetch.
	var nonSelfRelays []string
	for _, url := range seedRelays {
		if !ds.isSelfRelay(url) {
			nonSelfRelays = append(nonSelfRelays, url)
		}
	}

	// Add seed relays at hop 0
	ds.mu.Lock()
	for _, url := range nonSelfRelays {
		ds.discoveredRelays[url] = 0
	}
	ds.mu.Unlock()

	log.D.F("directory spider: found %d seed relays from local database", len(seedRelays))

	// Rounds 1 to maxHops: Expand outward
	for hop := 1; hop <= ds.maxHops; hop++ {
		// Bail out promptly if the spider is shutting down.
		select {
		case <-ds.ctx.Done():
			return ds.ctx.Err()
		default:
		}

		// Get relays at previous hop level that haven't been processed
		ds.mu.Lock()
		var relaysToProcess []string
		for url, hopLevel := range ds.discoveredRelays {
			if hopLevel == hop-1 && !ds.processedRelays[url] {
				relaysToProcess = append(relaysToProcess, url)
			}
		}
		ds.mu.Unlock()

		if len(relaysToProcess) == 0 {
			// Frontier is empty: no further expansion possible.
			log.D.F("directory spider: no relays to process at hop %d", hop)
			break
		}

		log.D.F("directory spider: hop %d - processing %d relays", hop, len(relaysToProcess))

		newRelaysThisHop := 0

		// Process each relay serially (rate-limited below).
		for _, relayURL := range relaysToProcess {
			select {
			case <-ds.ctx.Done():
				return ds.ctx.Err()
			default:
			}

			// Fetch kind 10002 events from this relay
			events, err := ds.fetchRelayListsFromRelay(relayURL)
			if err != nil {
				log.W.F("directory spider: failed to fetch from %s: %v", relayURL, err)
				// Mark as processed even on failure to avoid retrying
				ds.mu.Lock()
				ds.processedRelays[relayURL] = true
				ds.mu.Unlock()
				continue
			}

			// Extract new relay URLs
			newRelays := ds.extractRelaysFromEvents(events)

			// First pass under the lock: collect URLs not seen before and
			// mark this relay processed. Self-filtering must happen outside
			// the lock (isSelfRelay takes ds.mu internally).
			ds.mu.Lock()
			var unknownRelays []string
			for _, newURL := range newRelays {
				if _, exists := ds.discoveredRelays[newURL]; !exists {
					unknownRelays = append(unknownRelays, newURL)
				}
			}
			ds.processedRelays[relayURL] = true
			ds.mu.Unlock()

			var nonSelfNew []string
			for _, newURL := range unknownRelays {
				if !ds.isSelfRelay(newURL) {
					nonSelfNew = append(nonSelfNew, newURL)
				}
			}

			// Second pass: re-check existence because the lock was dropped
			// between passes and another iteration may have added the URL.
			ds.mu.Lock()
			for _, newURL := range nonSelfNew {
				if _, exists := ds.discoveredRelays[newURL]; !exists {
					ds.discoveredRelays[newURL] = hop
					newRelaysThisHop++
				}
			}
			ds.mu.Unlock()

			// Rate limiting delay between relays
			time.Sleep(DirectorySpiderRelayDelay)
		}

		log.D.F("directory spider: hop %d - discovered %d new relays", hop, newRelaysThisHop)
	}

	return nil
}
 360  
 361  // getRelaysFromLocalDB queries the local database for kind 10002 events from seed pubkeys.
 362  func (ds *DirectorySpider) getRelaysFromLocalDB(seedPubkeys [][]byte) ([]string, error) {
 363  	ctx, cancel := context.WithTimeout(ds.ctx, 30*time.Second)
 364  	defer cancel()
 365  
 366  	// Query for kind 10002 from seed pubkeys
 367  	f := &filter.F{
 368  		Authors: tag.NewFromBytesSlice(seedPubkeys...),
 369  		Kinds:   kind.NewS(kind.New(kind.RelayListMetadata.K)),
 370  	}
 371  
 372  	events, err := ds.db.QueryEvents(ctx, f)
 373  	if err != nil {
 374  		return nil, err
 375  	}
 376  
 377  	return ds.extractRelaysFromEvents(events), nil
 378  }
 379  
 380  // fetchRelayListsFromRelay connects to a relay and fetches all kind 10002 events.
 381  func (ds *DirectorySpider) fetchRelayListsFromRelay(relayURL string) ([]*event.E, error) {
 382  	ctx, cancel := context.WithTimeout(ds.ctx, DirectorySpiderRelayTimeout)
 383  	defer cancel()
 384  
 385  	log.D.F("directory spider: connecting to %s", relayURL)
 386  
 387  	client, err := ws.RelayConnect(ctx, relayURL)
 388  	if err != nil {
 389  		return nil, errorf.W("failed to connect: %v", err)
 390  	}
 391  	defer client.Close()
 392  
 393  	// Query for all kind 10002 events
 394  	limit := uint(DirectorySpiderMaxEventsPerQuery)
 395  	f := filter.NewS(&filter.F{
 396  		Kinds: kind.NewS(kind.New(kind.RelayListMetadata.K)),
 397  		Limit: &limit,
 398  	})
 399  
 400  	sub, err := client.Subscribe(ctx, f)
 401  	if err != nil {
 402  		return nil, errorf.W("failed to subscribe: %v", err)
 403  	}
 404  	defer sub.Unsub()
 405  
 406  	var events []*event.E
 407  	queryCtx, queryCancel := context.WithTimeout(ctx, DirectorySpiderQueryTimeout)
 408  	defer queryCancel()
 409  
 410  	// Collect events until EOSE or timeout
 411  	for {
 412  		select {
 413  		case <-queryCtx.Done():
 414  			log.D.F("directory spider: query timeout for %s, got %d events", relayURL, len(events))
 415  			return events, nil
 416  		case <-sub.EndOfStoredEvents:
 417  			log.D.F("directory spider: EOSE from %s, got %d events", relayURL, len(events))
 418  			return events, nil
 419  		case ev := <-sub.Events:
 420  			if ev == nil {
 421  				return events, nil
 422  			}
 423  			events = append(events, ev)
 424  		}
 425  	}
 426  }
 427  
 428  // extractRelaysFromEvents parses kind 10002 events and extracts relay URLs from "r" tags.
 429  func (ds *DirectorySpider) extractRelaysFromEvents(events []*event.E) []string {
 430  	seen := make(map[string]bool)
 431  	var relays []string
 432  
 433  	for _, ev := range events {
 434  		// Get all "r" tags
 435  		rTags := ev.Tags.GetAll([]byte("r"))
 436  		for _, rTag := range rTags {
 437  			if len(rTag.T) < 2 {
 438  				continue
 439  			}
 440  			urlBytes := rTag.Value()
 441  			if len(urlBytes) == 0 {
 442  				continue
 443  			}
 444  
 445  			// Normalize the URL
 446  			normalized := string(normalize.URL(string(urlBytes)))
 447  			if normalized == "" {
 448  				continue
 449  			}
 450  
 451  			if !seen[normalized] {
 452  				seen[normalized] = true
 453  				relays = append(relays, normalized)
 454  			}
 455  		}
 456  	}
 457  
 458  	return relays
 459  }
 460  
// fetchMetadataFromRelays iterates through all discovered relays and fetches
// the metadata kinds 0 (profile), 3 (follow list), 10000 (mute list) and
// 10002 (relay list), saving each event to the local database. Relays
// already marked processed in this run are skipped, and each relay is
// marked processed afterwards. Returns non-nil only on context
// cancellation; per-relay/per-kind fetch failures are logged and skipped.
func (ds *DirectorySpider) fetchMetadataFromRelays() error {
	ds.mu.Lock()
	// Copy relay list to avoid holding lock during network operations
	var relays []string
	for url := range ds.discoveredRelays {
		relays = append(relays, url)
	}
	ds.mu.Unlock()

	log.D.F("directory spider: fetching metadata from %d relays", len(relays))

	// Kinds to fetch: 0 (profile), 3 (follow list), 10000 (mute list), 10002 (relay list)
	kindsToFetch := []uint16{
		kind.ProfileMetadata.K,   // 0
		kind.FollowList.K,        // 3
		kind.MuteList.K,          // 10000
		kind.RelayListMetadata.K, // 10002
	}

	totalSaved := 0
	totalDuplicates := 0

	for _, relayURL := range relays {
		// Abort promptly on shutdown.
		select {
		case <-ds.ctx.Done():
			return ds.ctx.Err()
		default:
		}

		ds.mu.Lock()
		alreadyProcessed := ds.processedRelays[relayURL]
		ds.mu.Unlock()

		if alreadyProcessed {
			continue
		}

		log.D.F("directory spider: fetching metadata from %s", relayURL)

		// One query (and one connection) per kind per relay.
		for _, k := range kindsToFetch {
			select {
			case <-ds.ctx.Done():
				return ds.ctx.Err()
			default:
			}

			events, err := ds.fetchKindFromRelay(relayURL, k)
			if err != nil {
				// Best-effort: a failing kind does not fail the relay or the run.
				log.W.F("directory spider: failed to fetch kind %d from %s: %v", k, relayURL, err)
				continue
			}

			saved, duplicates := ds.storeEvents(events)
			totalSaved += saved
			totalDuplicates += duplicates

			log.D.F("directory spider: kind %d from %s: %d saved, %d duplicates",
				k, relayURL, saved, duplicates)
		}

		ds.mu.Lock()
		ds.processedRelays[relayURL] = true
		ds.mu.Unlock()

		// Rate limiting delay between relays
		time.Sleep(DirectorySpiderRelayDelay)
	}

	log.D.F("directory spider: metadata fetch complete - %d events saved, %d duplicates",
		totalSaved, totalDuplicates)

	return nil
}
 535  
// fetchKindFromRelay connects to a relay and fetches events of a specific kind.
//
// It subscribes with a single-kind filter capped at
// DirectorySpiderMaxEventsPerQuery and collects events until EOSE, a nil
// event (subscription closed), or DirectorySpiderQueryTimeout elapses.
// A timeout is not an error: whatever was collected so far is returned.
// Errors are returned only for connect/subscribe failures.
func (ds *DirectorySpider) fetchKindFromRelay(relayURL string, k uint16) ([]*event.E, error) {
	// Overall connection budget for this relay.
	ctx, cancel := context.WithTimeout(ds.ctx, DirectorySpiderRelayTimeout)
	defer cancel()

	client, err := ws.RelayConnect(ctx, relayURL)
	if err != nil {
		return nil, errorf.W("failed to connect: %v", err)
	}
	defer client.Close()

	// Query for events of this kind
	limit := uint(DirectorySpiderMaxEventsPerQuery)
	f := filter.NewS(&filter.F{
		Kinds: kind.NewS(kind.New(k)),
		Limit: &limit,
	})

	sub, err := client.Subscribe(ctx, f)
	if err != nil {
		return nil, errorf.W("failed to subscribe: %v", err)
	}
	defer sub.Unsub()

	var events []*event.E
	// Separate (longer) budget for waiting on stored events / EOSE.
	queryCtx, queryCancel := context.WithTimeout(ctx, DirectorySpiderQueryTimeout)
	defer queryCancel()

	for {
		select {
		case <-queryCtx.Done():
			// Timed out waiting for EOSE; keep what we have.
			return events, nil
		case <-sub.EndOfStoredEvents:
			return events, nil
		case ev := <-sub.Events:
			if ev == nil {
				// Channel drained/closed: subscription is over.
				return events, nil
			}
			events = append(events, ev)
		}
	}
}
 578  
// storeEvents saves events to the database and publishes new ones to
// active subscribers via ds.pub (if set).
//
// Returns the count of newly saved events and of save failures counted as
// duplicates. NOTE(review): every error that chk.T reports truthy is
// counted as a duplicate — presumably duplicate-key errors dominate, but
// other save failures would be lumped in too; verify against
// database.SaveEvent's error semantics.
func (ds *DirectorySpider) storeEvents(events []*event.E) (saved, duplicates int) {
	for _, ev := range events {
		_, err := ds.db.SaveEvent(ds.ctx, ev)
		if err != nil {
			if chk.T(err) {
				// Most errors are duplicates, which is expected
				duplicates++
			}
			continue
		}
		saved++

		// Publish event to active subscribers.
		// NOTE(review): one fire-and-forget goroutine per event; assumes
		// Deliver is cheap/bounded — confirm against the publisher impl.
		if ds.pub != nil {
			go ds.pub.Deliver(ev)
		}
	}
	return
}
 599  
 600  // isSelfRelay checks if a relay URL is ourselves by comparing NIP-11 pubkeys.
 601  func (ds *DirectorySpider) isSelfRelay(relayURL string) bool {
 602  	// If we don't have a relay identity pubkey, can't compare
 603  	if ds.relayIdentityPubkey == "" {
 604  		return false
 605  	}
 606  
 607  	ds.mu.Lock()
 608  	// Fast path: check if we already know this URL is ours
 609  	if ds.selfURLs[relayURL] {
 610  		ds.mu.Unlock()
 611  		return true
 612  	}
 613  	ds.mu.Unlock()
 614  
 615  	// Slow path: check via NIP-11 pubkey
 616  	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
 617  	defer cancel()
 618  
 619  	peerPubkey, err := ds.nip11Cache.GetPubkey(ctx, relayURL)
 620  	if err != nil {
 621  		// Can't determine, assume not self
 622  		return false
 623  	}
 624  
 625  	if peerPubkey == ds.relayIdentityPubkey {
 626  		log.D.F("directory spider: discovered self-relay: %s", relayURL)
 627  		ds.mu.Lock()
 628  		ds.selfURLs[relayURL] = true
 629  		ds.mu.Unlock()
 630  		return true
 631  	}
 632  
 633  	return false
 634  }
 635