crawler.go raw

   1  // Package crawler provides an automated corpus crawler that discovers relays
   2  // via kind 10002 hop-expansion and then syncs all events from each discovered
   3  // relay using NIP-77 negentropy set reconciliation.
   4  //
   5  // Architecture:
   6  //
   7  //	Discovery loop expands relay URLs from seed pubkeys via kind 10002 →
   8  //	Crawler maintains a persistent frontier of relay entries with per-relay
   9  //	state → Sync loop picks relays due for sync and runs negentropy
  10  //	reconciliation with bounded concurrency → Frontier state is persisted
  11  //	to database markers so it survives restarts.
  12  package crawler
  13  
  14  import (
  15  	"context"
  16  	"encoding/json"
  17  	"fmt"
  18  	gosync "sync"
  19  	"sync/atomic"
  20  	"time"
  21  
  22  	"next.orly.dev/pkg/database"
  23  	"next.orly.dev/pkg/interfaces/publisher"
  24  	"next.orly.dev/pkg/lol/log"
  25  	"next.orly.dev/pkg/nostr/crypto/keys"
  26  	"next.orly.dev/pkg/nostr/encoders/filter"
  27  	"next.orly.dev/pkg/nostr/encoders/kind"
  28  	"next.orly.dev/pkg/nostr/encoders/tag"
  29  	"next.orly.dev/pkg/nostr/utils/normalize"
  30  	"next.orly.dev/pkg/nostr/ws"
  31  	dsync "next.orly.dev/pkg/sync"
  32  	"next.orly.dev/pkg/sync/negentropy"
  33  )
  34  
const (
	DefaultDiscoveryInterval = 4 * time.Hour          // cadence of the hop-expansion discovery loop
	DefaultSyncInterval      = 30 * time.Minute       // cadence of the sync loop and minimum per-relay sync spacing
	DefaultMaxHops           = 5                      // depth of kind 10002 hop expansion
	DefaultConcurrency       = 3                      // simultaneous relay syncs
	DefaultRelayTimeout      = 30 * time.Second       // per-relay connect/fetch deadline during discovery
	QueryTimeout             = 60 * time.Second       // cap on draining a single subscription's results
	DefaultSyncTimeout       = 10 * time.Minute       // per-relay negentropy sync deadline
	DefaultRelayDelay        = 500 * time.Millisecond // pause between successive discovery queries
	DefaultMaxFailures       = 5                      // consecutive failures before a relay is blacklisted
	DefaultBlacklistDuration = 24 * time.Hour         // how long a blacklisted relay is skipped
	DefaultMaxEventsPerSync  = 1_000_000              // cap on events pulled in one sync
	MaxEventsPerQuery        = 5000                   // limit applied to kind 10002 discovery queries

	// Database marker keys under which frontier and stats snapshots persist.
	markerFrontierKey = "crawler:frontier"
	markerStatsKey    = "crawler:stats"
)
  52  
// Config holds configuration for the corpus crawler.
// DefaultConfig supplies working values for every field.
type Config struct {
	// Discovery settings.
	DiscoveryInterval time.Duration // how often to run relay discovery
	MaxHops           int           // maximum kind 10002 hop-expansion depth

	// Sync settings.
	SyncInterval     time.Duration // minimum time between syncs of the same relay
	Concurrency      int           // maximum simultaneous relay syncs
	SyncTimeout      time.Duration // per-relay sync deadline
	MaxEventsPerSync uint          // cap on events pulled in one sync

	// RelayDelay paces successive discovery queries to different relays.
	RelayDelay time.Duration

	// Failure handling.
	MaxFailures       int           // consecutive failures before blacklisting
	BlacklistDuration time.Duration // how long a blacklisted relay is skipped
}
  68  
  69  // DefaultConfig returns a Config with sensible defaults.
  70  func DefaultConfig() *Config {
  71  	return &Config{
  72  		DiscoveryInterval: DefaultDiscoveryInterval,
  73  		MaxHops:           DefaultMaxHops,
  74  		SyncInterval:      DefaultSyncInterval,
  75  		Concurrency:       DefaultConcurrency,
  76  		SyncTimeout:       DefaultSyncTimeout,
  77  		MaxEventsPerSync:  DefaultMaxEventsPerSync,
  78  		RelayDelay:        DefaultRelayDelay,
  79  		MaxFailures:       DefaultMaxFailures,
  80  		BlacklistDuration: DefaultBlacklistDuration,
  81  	}
  82  }
  83  
// RelayState tracks the crawl state of a single relay. Values are persisted
// as JSON in the frontier marker, so the field tags must remain stable.
type RelayState struct {
	URL              string    `json:"url"`                         // normalized relay URL (frontier map key)
	HopDistance      int       `json:"hop_distance"`                // shortest observed hop count from the seed set
	FirstSeen        time.Time `json:"first_seen"`                  // when the relay first entered the frontier
	LastSync         time.Time `json:"last_sync"`                   // start time of the most recent sync attempt
	LastDiscovery    time.Time `json:"last_discovery"`              // last time discovery re-observed this relay
	EventsSynced     int64     `json:"events_synced"`               // cumulative events pulled from this relay
	TotalSyncs       int64     `json:"total_syncs"`                 // cumulative sync attempts (success or failure)
	ConsecFailures   int       `json:"consec_failures"`             // consecutive failed syncs; reset on success
	LastError        string    `json:"last_error,omitempty"`        // message from the most recent failure
	BlacklistedUntil time.Time `json:"blacklisted_until,omitempty"` // skip syncing until this instant
	IsSelf           bool      `json:"is_self,omitempty"`           // relay confirmed to be this instance; never synced
}
  98  
  99  func (rs *RelayState) isBlacklisted() bool {
 100  	return !rs.BlacklistedUntil.IsZero() && time.Now().Before(rs.BlacklistedUntil)
 101  }
 102  
 103  func (rs *RelayState) needsSync(interval time.Duration) bool {
 104  	if rs.IsSelf || rs.isBlacklisted() {
 105  		return false
 106  	}
 107  	return rs.LastSync.IsZero() || time.Since(rs.LastSync) >= interval
 108  }
 109  
// Stats tracks aggregate crawler statistics. Counters are cumulative across
// runs: they are persisted to the stats marker and reloaded on startup.
type Stats struct {
	TotalRelaysDiscovered int64     `json:"total_relays_discovered"` // current frontier size after the last discovery
	TotalRelaysSynced     int64     `json:"total_relays_synced"`     // successful sync completions (counts repeats)
	TotalEventsSynced     int64     `json:"total_events_synced"`     // events pulled across all syncs
	TotalSyncErrors       int64     `json:"total_sync_errors"`       // failed sync attempts
	LastDiscoveryRun      time.Time `json:"last_discovery_run"`      // completion time of the last discovery cycle
	LastSyncRun           time.Time `json:"last_sync_run"`           // completion time of the last sync cycle
	BlacklistedRelays     int64     `json:"blacklisted_relays"`      // blacklist events recorded (never decremented)
}
 120  
// Crawler orchestrates relay discovery and corpus sync.
//
// Two long-lived goroutines (discoveryLoop and syncLoop) are launched by
// Start and signalled to exit via stopChan plus ctx cancellation in Stop;
// wg tracks both so Stop can wait for a clean shutdown.
type Crawler struct {
	ctx    context.Context // cancelled by Stop; parents all network operations
	cancel context.CancelFunc

	db     database.Database // event storage and frontier/stats marker persistence
	pub    publisher.I       // optional; events fetched during discovery are delivered to it
	config *Config

	// mu protects frontier, stats, and selfURLs.
	mu       gosync.RWMutex
	frontier map[string]*RelayState // keyed by normalized relay URL
	stats    Stats
	selfURLs map[string]bool // URLs confirmed via NIP-11 to belong to this relay

	relayIdentityPubkey string // hex pubkey of this relay; "" disables self-relay detection
	nip11Cache          *dsync.NIP11Cache

	// seedMu protects getSeedPubkeys independently from frontier lock
	// to avoid holding mu during the callback (which may do its own locking).
	seedMu         gosync.RWMutex
	getSeedPubkeys func() [][]byte

	running  atomic.Bool   // true between Start and Stop
	stopOnce gosync.Once   // guarantees stopChan is closed at most once
	stopChan chan struct{} // closed to signal both loops to exit
	wg       gosync.WaitGroup
}
 149  
 150  // New creates a new Crawler instance.
 151  func New(ctx context.Context, db database.Database, pub publisher.I, cfg *Config) (*Crawler, error) {
 152  	if db == nil {
 153  		return nil, fmt.Errorf("database cannot be nil")
 154  	}
 155  	if cfg == nil {
 156  		cfg = DefaultConfig()
 157  	}
 158  
 159  	ctx, cancel := context.WithCancel(ctx)
 160  
 161  	var relayPubkey string
 162  	if skb, err := db.GetRelayIdentitySecret(); err == nil && len(skb) == 32 {
 163  		pk, _ := keys.SecretBytesToPubKeyHex(skb)
 164  		relayPubkey = pk
 165  	}
 166  
 167  	c := &Crawler{
 168  		ctx:                 ctx,
 169  		cancel:              cancel,
 170  		db:                  db,
 171  		pub:                 pub,
 172  		config:              cfg,
 173  		frontier:            make(map[string]*RelayState),
 174  		selfURLs:            make(map[string]bool),
 175  		nip11Cache:          dsync.NewNIP11Cache(30 * time.Minute),
 176  		relayIdentityPubkey: relayPubkey,
 177  		stopChan:            make(chan struct{}),
 178  	}
 179  
 180  	if err := c.loadFrontier(); err != nil {
 181  		log.W.F("crawler: failed to load frontier: %v (starting fresh)", err)
 182  	}
 183  
 184  	return c, nil
 185  }
 186  
 187  // SetSeedCallback sets the callback for getting seed pubkeys used in discovery.
 188  func (c *Crawler) SetSeedCallback(fn func() [][]byte) {
 189  	c.seedMu.Lock()
 190  	defer c.seedMu.Unlock()
 191  	c.getSeedPubkeys = fn
 192  }
 193  
 194  func (c *Crawler) callSeedCallback() [][]byte {
 195  	c.seedMu.RLock()
 196  	fn := c.getSeedPubkeys
 197  	c.seedMu.RUnlock()
 198  	if fn == nil {
 199  		return nil
 200  	}
 201  	return fn()
 202  }
 203  
 204  // Start begins the crawler's discovery and sync loops.
 205  func (c *Crawler) Start() error {
 206  	if c.running.Load() {
 207  		return fmt.Errorf("crawler already running")
 208  	}
 209  
 210  	c.seedMu.RLock()
 211  	hasSeed := c.getSeedPubkeys != nil
 212  	c.seedMu.RUnlock()
 213  	if !hasSeed {
 214  		return fmt.Errorf("seed callback must be set before starting")
 215  	}
 216  
 217  	c.running.Store(true)
 218  
 219  	c.wg.Add(2)
 220  	go c.discoveryLoop()
 221  	go c.syncLoop()
 222  
 223  	log.I.F("crawler: started (discovery: %v, sync: %v, hops: %d, concurrency: %d)",
 224  		c.config.DiscoveryInterval, c.config.SyncInterval,
 225  		c.config.MaxHops, c.config.Concurrency)
 226  
 227  	return nil
 228  }
 229  
 230  // Stop stops the crawler gracefully.
 231  func (c *Crawler) Stop() {
 232  	if !c.running.Load() {
 233  		return
 234  	}
 235  	c.running.Store(false)
 236  	c.stopOnce.Do(func() { close(c.stopChan) })
 237  	c.cancel()
 238  	c.wg.Wait()
 239  
 240  	if err := c.saveFrontier(); err != nil {
 241  		log.W.F("crawler: failed to save frontier on stop: %v", err)
 242  	}
 243  
 244  	c.mu.RLock()
 245  	frontierSize := len(c.frontier)
 246  	totalEvents := c.stats.TotalEventsSynced
 247  	c.mu.RUnlock()
 248  
 249  	log.I.F("crawler: stopped (frontier: %d relays, total events synced: %d)",
 250  		frontierSize, totalEvents)
 251  }
 252  
 253  // GetStats returns a snapshot of crawler statistics.
 254  func (c *Crawler) GetStats() Stats {
 255  	c.mu.RLock()
 256  	defer c.mu.RUnlock()
 257  	return c.stats
 258  }
 259  
 260  // GetFrontierSize returns the number of relays in the frontier.
 261  func (c *Crawler) GetFrontierSize() int {
 262  	c.mu.RLock()
 263  	defer c.mu.RUnlock()
 264  	return len(c.frontier)
 265  }
 266  
 267  func (c *Crawler) discoveryLoop() {
 268  	defer c.wg.Done()
 269  
 270  	c.runDiscovery()
 271  
 272  	ticker := time.NewTicker(c.config.DiscoveryInterval)
 273  	defer ticker.Stop()
 274  
 275  	for {
 276  		select {
 277  		case <-c.stopChan:
 278  			return
 279  		case <-ticker.C:
 280  			c.runDiscovery()
 281  		}
 282  	}
 283  }
 284  
 285  func (c *Crawler) syncLoop() {
 286  	defer c.wg.Done()
 287  
 288  	select {
 289  	case <-c.stopChan:
 290  		return
 291  	case <-time.After(30 * time.Second):
 292  	}
 293  
 294  	c.runSyncCycle()
 295  
 296  	ticker := time.NewTicker(c.config.SyncInterval)
 297  	defer ticker.Stop()
 298  
 299  	for {
 300  		select {
 301  		case <-c.stopChan:
 302  			return
 303  		case <-ticker.C:
 304  			c.runSyncCycle()
 305  		}
 306  	}
 307  }
 308  
 309  // runDiscovery performs one relay discovery cycle using kind 10002 hop expansion.
 310  func (c *Crawler) runDiscovery() {
 311  	log.I.F("crawler: starting relay discovery (max hops: %d)", c.config.MaxHops)
 312  
 313  	seeds := c.callSeedCallback()
 314  	if len(seeds) == 0 {
 315  		log.W.F("crawler: no seed pubkeys, skipping discovery")
 316  		return
 317  	}
 318  
 319  	discovered := make(map[string]int) // URL -> hop distance
 320  
 321  	localRelays := c.getRelaysFromLocalDB(seeds)
 322  	for url := range localRelays {
 323  		discovered[url] = 0
 324  	}
 325  
 326  	log.I.F("crawler: hop 0 discovered %d relays from %d seed pubkeys", len(discovered), len(seeds))
 327  
 328  	for hop := 1; hop <= c.config.MaxHops; hop++ {
 329  		select {
 330  		case <-c.stopChan:
 331  			return
 332  		default:
 333  		}
 334  
 335  		var prevHopRelays []string
 336  		for url, h := range discovered {
 337  			if h == hop-1 {
 338  				prevHopRelays = append(prevHopRelays, url)
 339  			}
 340  		}
 341  
 342  		if len(prevHopRelays) == 0 {
 343  			log.I.F("crawler: no relays at hop %d, stopping expansion", hop-1)
 344  			break
 345  		}
 346  
 347  		newCount := 0
 348  		for _, relayURL := range prevHopRelays {
 349  			select {
 350  			case <-c.stopChan:
 351  				return
 352  			default:
 353  			}
 354  
 355  			// isSelfRelay does network IO (NIP-11) — must NOT hold mu.
 356  			if c.isSelfRelay(relayURL) {
 357  				continue
 358  			}
 359  
 360  			relays, err := c.fetchRelayListsFromRelay(relayURL)
 361  			if err != nil {
 362  				log.D.F("crawler: hop %d fetch from %s failed: %v", hop, relayURL, err)
 363  				continue
 364  			}
 365  
 366  			for _, newURL := range relays {
 367  				if _, exists := discovered[newURL]; !exists {
 368  					discovered[newURL] = hop
 369  					newCount++
 370  				}
 371  			}
 372  
 373  			time.Sleep(c.config.RelayDelay)
 374  		}
 375  
 376  		log.I.F("crawler: hop %d discovered %d new relays from %d sources",
 377  			hop, newCount, len(prevHopRelays))
 378  	}
 379  
 380  	// Filter self-relays before taking the lock (isSelfRelay does network IO).
 381  	filtered := make(map[string]int, len(discovered))
 382  	for url, hopDist := range discovered {
 383  		normURL := string(normalize.URL(url))
 384  		if normURL == "" {
 385  			continue
 386  		}
 387  		if c.isSelfRelay(normURL) {
 388  			continue
 389  		}
 390  		filtered[normURL] = hopDist
 391  	}
 392  
 393  	// Merge into frontier under lock.
 394  	c.mu.Lock()
 395  	added := 0
 396  	for normURL, hopDist := range filtered {
 397  		if existing, ok := c.frontier[normURL]; ok {
 398  			existing.LastDiscovery = time.Now()
 399  			if hopDist < existing.HopDistance {
 400  				existing.HopDistance = hopDist
 401  			}
 402  		} else {
 403  			c.frontier[normURL] = &RelayState{
 404  				URL:           normURL,
 405  				HopDistance:    hopDist,
 406  				FirstSeen:     time.Now(),
 407  				LastDiscovery: time.Now(),
 408  			}
 409  			added++
 410  		}
 411  	}
 412  	c.stats.TotalRelaysDiscovered = int64(len(c.frontier))
 413  	c.stats.LastDiscoveryRun = time.Now()
 414  	frontierSize := len(c.frontier)
 415  	c.mu.Unlock()
 416  
 417  	log.I.F("crawler: discovery complete — %d new relays added, frontier size: %d",
 418  		added, frontierSize)
 419  
 420  	if err := c.saveFrontier(); err != nil {
 421  		log.W.F("crawler: failed to save frontier: %v", err)
 422  	}
 423  }
 424  
 425  // runSyncCycle syncs events from relays that are due for a sync.
 426  func (c *Crawler) runSyncCycle() {
 427  	// Snapshot relay URLs and hop distances under lock.
 428  	type syncTarget struct {
 429  		url         string
 430  		hopDistance  int
 431  		totalSyncs  int64
 432  	}
 433  	c.mu.RLock()
 434  	var due []syncTarget
 435  	for _, rs := range c.frontier {
 436  		if rs.needsSync(c.config.SyncInterval) {
 437  			due = append(due, syncTarget{
 438  				url:        rs.URL,
 439  				hopDistance: rs.HopDistance,
 440  				totalSyncs: rs.TotalSyncs,
 441  			})
 442  		}
 443  	}
 444  	c.mu.RUnlock()
 445  
 446  	if len(due) == 0 {
 447  		log.D.F("crawler: no relays due for sync")
 448  		return
 449  	}
 450  
 451  	log.I.F("crawler: starting sync cycle — %d relays due", len(due))
 452  
 453  	sem := make(chan struct{}, c.config.Concurrency)
 454  	var syncWg gosync.WaitGroup
 455  
 456  	for _, target := range due {
 457  		select {
 458  		case <-c.stopChan:
 459  			syncWg.Wait()
 460  			return
 461  		default:
 462  		}
 463  
 464  		sem <- struct{}{}
 465  		syncWg.Add(1)
 466  
 467  		go func(t syncTarget) {
 468  			defer syncWg.Done()
 469  			defer func() { <-sem }()
 470  			c.syncRelay(t.url, t.hopDistance, t.totalSyncs)
 471  		}(target)
 472  	}
 473  
 474  	syncWg.Wait()
 475  
 476  	c.mu.Lock()
 477  	c.stats.LastSyncRun = time.Now()
 478  	c.mu.Unlock()
 479  
 480  	if err := c.saveFrontier(); err != nil {
 481  		log.W.F("crawler: failed to save frontier: %v", err)
 482  	}
 483  
 484  	log.I.F("crawler: sync cycle complete")
 485  }
 486  
// syncRelay performs a negentropy sync with a single relay. The url, hopDistance,
// and totalSyncs are snapshot values read under the lock by the caller; the
// relay state in the frontier is updated under the lock after sync completes.
func (c *Crawler) syncRelay(url string, hopDistance int, totalSyncs int64) {
	// Bound the whole sync by SyncTimeout; deriving from c.ctx means Stop
	// (which cancels c.ctx) also aborts in-flight syncs.
	ctx, cancel := context.WithTimeout(c.ctx, c.config.SyncTimeout)
	defer cancel()

	log.I.F("crawler: syncing %s (hop %d, syncs: %d)", url, hopDistance, totalSyncs)

	// Single-peer manager configured for just this relay.
	negCfg := &negentropy.Config{
		Peers:        []string{url},
		SyncInterval: 60 * time.Second,
		FrameSize:    128 * 1024,
		IDSize:       16,
		MaxEvents:    c.config.MaxEventsPerSync,
	}

	negMgr := negentropy.NewManager(c.db, negCfg)
	negMgr.TriggerSync(ctx, url)
	peerState, ok := negMgr.GetPeerState(url)

	c.mu.Lock()
	rs, exists := c.frontier[url]
	if !exists {
		// The relay was removed from the frontier while the sync ran;
		// nothing to record.
		c.mu.Unlock()
		return
	}

	rs.LastSync = time.Now()
	rs.TotalSyncs++

	// NOTE(review): failure detection compares Status against the literal
	// string "error" — confirm this matches negentropy's status values.
	if !ok || peerState.Status == "error" {
		rs.ConsecFailures++
		if ok && peerState.LastError != "" {
			rs.LastError = peerState.LastError
		} else {
			rs.LastError = "sync failed"
		}

		if rs.ConsecFailures >= c.config.MaxFailures {
			// NOTE(review): after a blacklist expires, ConsecFailures is
			// still >= MaxFailures, so the next failure re-blacklists and
			// increments BlacklistedRelays again — the stat counts blacklist
			// events rather than distinct relays. Confirm intended.
			rs.BlacklistedUntil = time.Now().Add(c.config.BlacklistDuration)
			c.stats.BlacklistedRelays++
			log.W.F("crawler: blacklisted %s after %d failures (until %v)",
				rs.URL, rs.ConsecFailures, rs.BlacklistedUntil)
		}

		c.stats.TotalSyncErrors++
		log.D.F("crawler: sync %s failed (%d consecutive): %s",
			rs.URL, rs.ConsecFailures, rs.LastError)
	} else {
		// Success: reset the failure streak and accumulate event counts.
		eventsSynced := peerState.EventsSynced
		rs.ConsecFailures = 0
		rs.LastError = ""
		rs.EventsSynced += eventsSynced
		c.stats.TotalEventsSynced += eventsSynced
		c.stats.TotalRelaysSynced++

		if eventsSynced > 0 {
			log.I.F("crawler: synced %d events from %s", eventsSynced, rs.URL)
		}
	}
	c.mu.Unlock()
}
 550  
 551  func (c *Crawler) getRelaysFromLocalDB(seeds [][]byte) map[string]bool {
 552  	relays := make(map[string]bool)
 553  
 554  	f := &filter.F{
 555  		Authors: tag.NewFromBytesSlice(seeds...),
 556  		Kinds:   kind.NewS(kind.New(kind.RelayListMetadata.K)),
 557  	}
 558  
 559  	events, err := c.db.QueryEvents(c.ctx, f)
 560  	if err != nil {
 561  		log.W.F("crawler: failed to query local relay lists: %v", err)
 562  		return relays
 563  	}
 564  
 565  	for _, ev := range events {
 566  		rTags := ev.Tags.GetAll([]byte("r"))
 567  		for _, rTag := range rTags {
 568  			if rTag.Len() < 2 {
 569  				continue
 570  			}
 571  			urlBytes := rTag.Value()
 572  			if len(urlBytes) == 0 {
 573  				continue
 574  			}
 575  			normURL := string(normalize.URL(string(urlBytes)))
 576  			if normURL != "" {
 577  				relays[normURL] = true
 578  			}
 579  		}
 580  	}
 581  
 582  	return relays
 583  }
 584  
// fetchRelayListsFromRelay connects to relayURL, subscribes for kind 10002
// (relay list metadata) events, saves and republishes each received event
// locally, and returns the deduplicated normalized relay URLs found in their
// "r" tags. Timeouts and end-of-stored-events both end the read normally.
func (c *Crawler) fetchRelayListsFromRelay(relayURL string) ([]string, error) {
	// Parent deadline bounds the whole operation: connect, subscribe, read.
	ctx, cancel := context.WithTimeout(c.ctx, DefaultRelayTimeout)
	defer cancel()

	client, err := ws.RelayConnect(ctx, relayURL)
	if err != nil {
		return nil, fmt.Errorf("connect failed: %w", err)
	}
	defer client.Close()

	limit := uint(MaxEventsPerQuery)
	ff := filter.NewS(&filter.F{
		Kinds: kind.NewS(kind.New(kind.RelayListMetadata.K)),
		Limit: &limit,
	})

	sub, err := client.Subscribe(ctx, ff)
	if err != nil {
		return nil, fmt.Errorf("subscribe failed: %w", err)
	}
	defer sub.Unsub()

	var relays []string
	seen := make(map[string]bool) // dedupe normalized URLs across events

	// NOTE(review): queryCtx (QueryTimeout, 60s) is derived from ctx
	// (DefaultRelayTimeout, 30s), so the effective read deadline is 30s —
	// confirm the longer QueryTimeout is intentional here.
	queryCtx, queryCancel := context.WithTimeout(ctx, QueryTimeout)
	defer queryCancel()

	for {
		select {
		case <-queryCtx.Done():
			// Deadline hit: return whatever was collected so far.
			return relays, nil
		case ev := <-sub.Events:
			if ev == nil {
				// nil event signals the subscription channel was closed.
				return relays, nil
			}

			// Store the event locally (best-effort) and hand it to the
			// publisher, if any, so live subscribers see it too.
			if _, err := c.db.SaveEvent(c.ctx, ev); err != nil {
				log.D.F("crawler: save event failed: %v", err)
			}
			if c.pub != nil {
				c.pub.Deliver(ev)
			}

			rTags := ev.Tags.GetAll([]byte("r"))
			for _, rTag := range rTags {
				if rTag.Len() < 2 {
					continue
				}
				normURL := string(normalize.URL(string(rTag.Value())))
				if normURL != "" && !seen[normURL] {
					seen[normURL] = true
					relays = append(relays, normURL)
				}
			}
		case <-sub.EndOfStoredEvents:
			// Relay finished replaying stored events; no need to wait longer.
			return relays, nil
		}
	}
}
 645  
 646  // isSelfRelay checks if a relay URL belongs to this relay instance by comparing
 647  // NIP-11 pubkeys. This does network IO and must NOT be called while holding mu.
 648  func (c *Crawler) isSelfRelay(relayURL string) bool {
 649  	c.mu.RLock()
 650  	cached := c.selfURLs[relayURL]
 651  	c.mu.RUnlock()
 652  	if cached {
 653  		return true
 654  	}
 655  	if c.relayIdentityPubkey == "" {
 656  		return false
 657  	}
 658  
 659  	pubkey, err := c.nip11Cache.GetPubkey(c.ctx, relayURL)
 660  	if err != nil || pubkey == "" {
 661  		return false
 662  	}
 663  
 664  	if pubkey == c.relayIdentityPubkey {
 665  		c.mu.Lock()
 666  		c.selfURLs[relayURL] = true
 667  		c.mu.Unlock()
 668  		return true
 669  	}
 670  	return false
 671  }
 672  
 673  // loadFrontier loads persisted frontier state from database markers.
 674  // Only called during New() before Start(), so no lock needed.
 675  func (c *Crawler) loadFrontier() error {
 676  	data, err := c.db.GetMarker(markerFrontierKey)
 677  	if err != nil {
 678  		return err
 679  	}
 680  	if len(data) == 0 {
 681  		return nil
 682  	}
 683  
 684  	var frontier map[string]*RelayState
 685  	if err := json.Unmarshal(data, &frontier); err != nil {
 686  		return fmt.Errorf("unmarshal frontier: %w", err)
 687  	}
 688  
 689  	c.frontier = frontier
 690  	log.I.F("crawler: loaded frontier with %d relays", len(frontier))
 691  
 692  	statsData, err := c.db.GetMarker(markerStatsKey)
 693  	if err == nil && len(statsData) > 0 {
 694  		json.Unmarshal(statsData, &c.stats)
 695  	}
 696  
 697  	return nil
 698  }
 699  
 700  func (c *Crawler) saveFrontier() error {
 701  	c.mu.RLock()
 702  	data, err := json.Marshal(c.frontier)
 703  	if err != nil {
 704  		c.mu.RUnlock()
 705  		return fmt.Errorf("marshal frontier: %w", err)
 706  	}
 707  	statsData, err := json.Marshal(c.stats)
 708  	c.mu.RUnlock()
 709  	if err != nil {
 710  		return fmt.Errorf("marshal stats: %w", err)
 711  	}
 712  
 713  	if err := c.db.SetMarker(markerFrontierKey, data); err != nil {
 714  		return fmt.Errorf("save frontier: %w", err)
 715  	}
 716  	if err := c.db.SetMarker(markerStatsKey, statsData); err != nil {
 717  		return fmt.Errorf("save stats: %w", err)
 718  	}
 719  
 720  	return nil
 721  }
 722