main.go raw
1 package main
2
import (
	"context"
	"encoding/base64"
	"encoding/binary"
	"encoding/json"
	"flag"
	"fmt"
	"math"
	"math/bits"
	"os"
	"runtime"
	"strconv"
	"strings"
	"sync"
	"sync/atomic"
	"time"

	"github.com/minio/sha256-simd"
	"next.orly.dev/pkg/lol/chk"
	"next.orly.dev/pkg/lol/log"
	"next.orly.dev/pkg/nostr/encoders/bech32encoding"
	"next.orly.dev/pkg/nostr/encoders/event"
	"next.orly.dev/pkg/nostr/encoders/filter"
	"next.orly.dev/pkg/nostr/encoders/hex"
	"next.orly.dev/pkg/nostr/encoders/kind"
	"next.orly.dev/pkg/nostr/encoders/tag"
	"next.orly.dev/pkg/nostr/encoders/timestamp"
	"next.orly.dev/pkg/nostr/interfaces/signer"
	"next.orly.dev/pkg/nostr/interfaces/signer/p8k"
	"next.orly.dev/pkg/nostr/ws"
)
32
const (
	// Bloom filter parameters for ~0.1% false positive rate with 1M events.
	bloomFilterBits      = 14377588 // ~1.75MB for 1M events at 0.1% FPR
	bloomFilterHashFuncs = 10       // Optimal number of hash functions
	maxMemoryMB          = 256      // Maximum memory usage in MB
	memoryCheckInterval  = 30 * time.Second

	// Rate limiting parameters for per-relay exponential backoff.
	baseRetryDelay = 1 * time.Second
	maxRetryDelay  = 60 * time.Second
	maxRetries     = 5
	batchSize      = time.Hour * 24 * 7 // 1 week batches for progressive fetching

	// Timeout parameters governing overall termination.
	maxRunTime           = 30 * time.Minute // Maximum total runtime
	relayTimeout         = 5 * time.Minute  // Timeout per relay
	stuckProgressTimeout = 2 * time.Minute  // Timeout if no progress is made
)
51
// relays is the initial seed set of relay websocket URLs to query; more
// relays are discovered at runtime from kind-10002 relay list events.
var relays = []string{
	"wss://nostr.wine/",
	"wss://nostr.land/",
	"wss://orly-relay.imwald.eu",
	"wss://relay.orly.dev/",
	"wss://relay.damus.io/",
	"wss://nos.lol/",
	"wss://theforest.nostr1.com/",
}
61
// BloomFilter implements a memory-efficient bloom filter for event deduplication
type BloomFilter struct {
	bits     []byte       // bit array, packed 8 positions per byte
	size     uint32       // number of bits (not bytes) in the filter
	hashFunc int          // number of hash functions applied per item
	mutex    sync.RWMutex // guards bits for concurrent Add/Contains
}
69
70 // NewBloomFilter creates a new bloom filter with specified parameters
71 func NewBloomFilter(bits uint32, hashFuncs int) *BloomFilter {
72 return &BloomFilter{
73 bits: make([]byte, (bits+7)/8), // Round up to nearest byte
74 size: bits,
75 hashFunc: hashFuncs,
76 }
77 }
78
79 // hash generates multiple hash values for a given input using SHA256
80 func (bf *BloomFilter) hash(data []byte) []uint32 {
81 hashes := make([]uint32, bf.hashFunc)
82
83 // Use SHA256 as base hash
84 baseHash := sha256.Sum256(data)
85
86 // Generate multiple hash values by combining with different salts
87 for i := 0; i < bf.hashFunc; i++ {
88 // Create salt by appending index
89 saltedData := make([]byte, len(baseHash)+4)
90 copy(saltedData, baseHash[:])
91 binary.LittleEndian.PutUint32(saltedData[len(baseHash):], uint32(i))
92
93 // Hash the salted data
94 h := sha256.Sum256(saltedData)
95
96 // Convert first 4 bytes to uint32 and mod by filter size
97 hashVal := binary.LittleEndian.Uint32(h[:4])
98 hashes[i] = hashVal % bf.size
99 }
100
101 return hashes
102 }
103
104 // Add adds an item to the bloom filter
105 func (bf *BloomFilter) Add(data []byte) {
106 bf.mutex.Lock()
107 defer bf.mutex.Unlock()
108
109 hashes := bf.hash(data)
110 for _, h := range hashes {
111 byteIndex := h / 8
112 bitIndex := h % 8
113 bf.bits[byteIndex] |= 1 << bitIndex
114 }
115 }
116
117 // Contains checks if an item might be in the bloom filter
118 func (bf *BloomFilter) Contains(data []byte) bool {
119 bf.mutex.RLock()
120 defer bf.mutex.RUnlock()
121
122 hashes := bf.hash(data)
123 for _, h := range hashes {
124 byteIndex := h / 8
125 bitIndex := h % 8
126 if bf.bits[byteIndex]&(1<<bitIndex) == 0 {
127 return false
128 }
129 }
130 return true
131 }
132
133 // EstimatedItems estimates the number of items added to the filter
134 func (bf *BloomFilter) EstimatedItems() uint32 {
135 bf.mutex.RLock()
136 defer bf.mutex.RUnlock()
137
138 // Count set bits
139 setBits := uint32(0)
140 for _, b := range bf.bits {
141 for i := 0; i < 8; i++ {
142 if b&(1<<i) != 0 {
143 setBits++
144 }
145 }
146 }
147
148 // Estimate items using bloom filter formula: n ≈ -(m/k) * ln(1 - X/m)
149 // where m = filter size, k = hash functions, X = set bits
150 if setBits == 0 {
151 return 0
152 }
153
154 m := float64(bf.size)
155 k := float64(bf.hashFunc)
156 x := float64(setBits)
157
158 if x >= m {
159 return uint32(m / k) // Saturated filter
160 }
161
162 estimated := -(m / k) * math.Log(1-(x/m))
163 return uint32(estimated)
164 }
165
// MemoryUsage returns the memory used by the bit array in bytes (the
// struct's fixed fields are not counted).
func (bf *BloomFilter) MemoryUsage() int {
	return len(bf.bits)
}
170
171 // ToBase64 serializes the bloom filter to a base64 encoded string
172 func (bf *BloomFilter) ToBase64() string {
173 bf.mutex.RLock()
174 defer bf.mutex.RUnlock()
175
176 // Create a serialization format: [size:4][hashFunc:4][bits:variable]
177 serialized := make([]byte, 8+len(bf.bits))
178
179 // Write size (4 bytes)
180 binary.LittleEndian.PutUint32(serialized[0:4], bf.size)
181
182 // Write hash function count (4 bytes)
183 binary.LittleEndian.PutUint32(serialized[4:8], uint32(bf.hashFunc))
184
185 // Write bits data
186 copy(serialized[8:], bf.bits)
187
188 return base64.StdEncoding.EncodeToString(serialized)
189 }
190
191 // FromBase64 deserializes a bloom filter from a base64 encoded string
192 func FromBase64(encoded string) (*BloomFilter, error) {
193 data, err := base64.StdEncoding.DecodeString(encoded)
194 if err != nil {
195 return nil, fmt.Errorf("failed to decode base64: %w", err)
196 }
197
198 if len(data) < 8 {
199 return nil, fmt.Errorf("invalid bloom filter data: too short")
200 }
201
202 // Read size (4 bytes)
203 size := binary.LittleEndian.Uint32(data[0:4])
204
205 // Read hash function count (4 bytes)
206 hashFunc := int(binary.LittleEndian.Uint32(data[4:8]))
207
208 // Read bits data
209 bits := make([]byte, len(data)-8)
210 copy(bits, data[8:])
211
212 // Validate that the bits length matches the expected size
213 expectedBytesLen := (size + 7) / 8
214 if uint32(len(bits)) != expectedBytesLen {
215 return nil, fmt.Errorf("invalid bloom filter data: bits length mismatch")
216 }
217
218 return &BloomFilter{
219 bits: bits,
220 size: size,
221 hashFunc: hashFunc,
222 }, nil
223 }
224
// RelayState tracks the state and rate limiting for each relay
type RelayState struct {
	url           string       // relay websocket URL
	retryCount    int          // rate-limit retries performed so far
	nextRetryTime time.Time    // earliest time the next retry may run
	rateLimited   bool         // currently backing off after rate limiting
	completed     bool         // all work finished, or permanently failed
	mutex         sync.RWMutex // guards all fields above
}
234
// TimeWindow represents a time range for progressive fetching
type TimeWindow struct {
	since *timestamp.T // lower bound of the window
	until *timestamp.T // upper bound of the window
}
240
// CompletionTracker tracks which relay-time window combinations have been completed
type CompletionTracker struct {
	completed map[string]map[string]bool // relay -> timewindow -> completed
	mutex     sync.RWMutex               // guards the completed map
}
246
247 func NewCompletionTracker() *CompletionTracker {
248 return &CompletionTracker{
249 completed: make(map[string]map[string]bool),
250 }
251 }
252
253 func (ct *CompletionTracker) MarkCompleted(relayURL string, timeWindow string) {
254 ct.mutex.Lock()
255 defer ct.mutex.Unlock()
256
257 if ct.completed[relayURL] == nil {
258 ct.completed[relayURL] = make(map[string]bool)
259 }
260 ct.completed[relayURL][timeWindow] = true
261 }
262
263 func (ct *CompletionTracker) IsCompleted(relayURL string, timeWindow string) bool {
264 ct.mutex.RLock()
265 defer ct.mutex.RUnlock()
266
267 if ct.completed[relayURL] == nil {
268 return false
269 }
270 return ct.completed[relayURL][timeWindow]
271 }
272
// GetCompletionStatus reports how many tracked relay/window entries are
// completed and how many entries exist overall.
//
// NOTE(review): entries are only ever created by MarkCompleted, which
// always stores true, so completed == total here; windows that were never
// attempted are not counted in total. Confirm callers expect this.
func (ct *CompletionTracker) GetCompletionStatus() (completed, total int) {
	ct.mutex.RLock()
	defer ct.mutex.RUnlock()

	for _, windows := range ct.completed {
		for _, isCompleted := range windows {
			total++
			if isCompleted {
				completed++
			}
		}
	}
	return
}
287
// Aggregator fetches all events authored by or mentioning one pubkey from a
// growing set of relays, deduplicates them with a bloom filter, and writes
// them to stdout as JSON lines.
type Aggregator struct {
	npub              string          // original key input (nsec or npub string)
	pubkeyBytes       []byte          // raw public key bytes
	seenEvents        *BloomFilter    // dedup filter over hex event IDs
	seenRelays        map[string]bool // relays already queued (dedup)
	relayQueue        chan string     // pending relay URLs to connect to
	relayMutex        sync.RWMutex    // guards seenRelays
	ctx               context.Context
	cancel            context.CancelFunc
	since             *timestamp.T // requested lower time bound (may be nil)
	until             *timestamp.T // requested upper time bound (may be nil)
	wg                sync.WaitGroup
	progressiveEnd    *timestamp.T // upper bound that fetching walks back from
	memoryTicker      *time.Ticker
	eventCount        uint64
	relayStates       map[string]*RelayState
	relayStatesMutex  sync.RWMutex
	completionTracker *CompletionTracker
	timeWindows       []TimeWindow
	// Track actual time range of processed events
	actualSince *timestamp.T
	actualUntil *timestamp.T
	timeMutex   sync.RWMutex
	// Bloom filter file for loading existing state
	bloomFilterFile string
	appendMode      bool
	// Progress tracking for timeout detection
	startTime        time.Time
	lastProgress     int
	lastProgressTime time.Time
	progressMutex    sync.RWMutex
	// Authentication support
	signer        signer.I // Optional signer for relay authentication
	hasPrivateKey bool     // Whether we have a private key for auth
}
323
324 func NewAggregator(keyInput string, since, until *timestamp.T, bloomFilterFile string) (agg *Aggregator, err error) {
325 var pubkeyBytes []byte
326 var signer signer.I
327 var hasPrivateKey bool
328
329 // Determine if input is nsec (private key) or npub (public key)
330 if strings.HasPrefix(keyInput, "nsec") {
331 // Handle nsec (private key) - derive pubkey and enable authentication
332 var secretBytes []byte
333 if secretBytes, err = bech32encoding.NsecToBytes(keyInput); chk.E(err) {
334 return nil, fmt.Errorf("failed to decode nsec: %w", err)
335 }
336
337 // Create signer from private key
338 var signerErr error
339 if signer, signerErr = p8k.New(); signerErr != nil {
340 return nil, fmt.Errorf("failed to create signer: %w", signerErr)
341 }
342 if err = signer.InitSec(secretBytes); chk.E(err) {
343 return nil, fmt.Errorf("failed to initialize signer: %w", err)
344 }
345
346 // Get public key from signer
347 pubkeyBytes = signer.Pub()
348 hasPrivateKey = true
349
350 log.I.F("using private key (nsec) - authentication enabled")
351 } else if strings.HasPrefix(keyInput, "npub") {
352 // Handle npub (public key only) - no authentication
353 if pubkeyBytes, err = bech32encoding.NpubToBytes(keyInput); chk.E(err) {
354 return nil, fmt.Errorf("failed to decode npub: %w", err)
355 }
356 hasPrivateKey = false
357
358 log.I.F("using public key (npub) - authentication disabled")
359 } else {
360 return nil, fmt.Errorf("key input must start with 'nsec' or 'npub', got: %s", keyInput[:4])
361 }
362
363 ctx, cancel := context.WithCancel(context.Background())
364
365 // Set progressive end to current time if until is not specified
366 progressiveEnd := until
367 if progressiveEnd == nil {
368 progressiveEnd = timestamp.Now()
369 }
370
371 // Initialize bloom filter - either new or loaded from file
372 var bloomFilter *BloomFilter
373 var appendMode bool
374
375 if bloomFilterFile != "" {
376 // Try to load existing bloom filter
377 if bloomFilter, err = loadBloomFilterFromFile(bloomFilterFile); err != nil {
378 log.W.F("failed to load bloom filter from %s: %v, creating new filter", bloomFilterFile, err)
379 bloomFilter = NewBloomFilter(bloomFilterBits, bloomFilterHashFuncs)
380 } else {
381 log.I.F("loaded existing bloom filter from %s", bloomFilterFile)
382 appendMode = true
383 }
384 } else {
385 bloomFilter = NewBloomFilter(bloomFilterBits, bloomFilterHashFuncs)
386 }
387
388 agg = &Aggregator{
389 npub: keyInput,
390 pubkeyBytes: pubkeyBytes,
391 seenEvents: bloomFilter,
392 seenRelays: make(map[string]bool),
393 relayQueue: make(chan string, 100),
394 ctx: ctx,
395 cancel: cancel,
396 since: since,
397 until: until,
398 progressiveEnd: progressiveEnd,
399 memoryTicker: time.NewTicker(memoryCheckInterval),
400 eventCount: 0,
401 relayStates: make(map[string]*RelayState),
402 completionTracker: NewCompletionTracker(),
403 bloomFilterFile: bloomFilterFile,
404 appendMode: appendMode,
405 startTime: time.Now(),
406 lastProgress: 0,
407 lastProgressTime: time.Now(),
408 signer: signer,
409 hasPrivateKey: hasPrivateKey,
410 }
411
412 // Calculate time windows for progressive fetching
413 agg.calculateTimeWindows()
414
415 // Add initial relays to queue
416 for _, relayURL := range relays {
417 agg.addRelay(relayURL)
418 }
419
420 return
421 }
422
423 // loadBloomFilterFromFile loads a bloom filter from a file containing base64 encoded data
424 func loadBloomFilterFromFile(filename string) (*BloomFilter, error) {
425 data, err := os.ReadFile(filename)
426 if err != nil {
427 return nil, fmt.Errorf("failed to read file: %w", err)
428 }
429
430 // Find the base64 data between the markers
431 content := string(data)
432 startMarker := "Bloom filter (base64):\n"
433 endMarker := "\n=== END BLOOM FILTER ==="
434
435 startIdx := strings.Index(content, startMarker)
436 if startIdx == -1 {
437 return nil, fmt.Errorf("bloom filter start marker not found")
438 }
439 startIdx += len(startMarker)
440
441 endIdx := strings.Index(content[startIdx:], endMarker)
442 if endIdx == -1 {
443 return nil, fmt.Errorf("bloom filter end marker not found")
444 }
445
446 base64Data := strings.TrimSpace(content[startIdx : startIdx+endIdx])
447 return FromBase64(base64Data)
448 }
449
450 // updateActualTimeRange updates the actual time range of processed events
451 func (a *Aggregator) updateActualTimeRange(eventTime *timestamp.T) {
452 a.timeMutex.Lock()
453 defer a.timeMutex.Unlock()
454
455 if a.actualSince == nil || eventTime.I64() < a.actualSince.I64() {
456 a.actualSince = eventTime
457 }
458
459 if a.actualUntil == nil || eventTime.I64() > a.actualUntil.I64() {
460 a.actualUntil = eventTime
461 }
462 }
463
// getActualTimeRange returns the observed [oldest, newest] created_at span
// of all processed events; both values are nil if no events were processed.
func (a *Aggregator) getActualTimeRange() (since, until *timestamp.T) {
	a.timeMutex.RLock()
	defer a.timeMutex.RUnlock()
	return a.actualSince, a.actualUntil
}
470
// calculateTimeWindows pre-calculates all time windows for progressive
// fetching by walking backwards from progressiveEnd to a.since in batchSize
// steps; windows are stored newest-first in a.timeWindows. When a.since is
// nil this is a no-op and windows are generated on the fly per relay
// instead (see progressiveFetch).
func (a *Aggregator) calculateTimeWindows() {
	if a.since == nil {
		// If no since time, we'll just work backwards from progressiveEnd
		// We can't pre-calculate windows without a start time
		return
	}

	var windows []TimeWindow
	currentUntil := a.progressiveEnd

	for currentUntil.I64() > a.since.I64() {
		// Each window spans at most batchSize, clamped to a.since.
		currentSince := timestamp.FromUnix(currentUntil.I64() - int64(batchSize.Seconds()))
		if currentSince.I64() < a.since.I64() {
			currentSince = a.since
		}

		windows = append(windows, TimeWindow{
			since: currentSince,
			until: currentUntil,
		})

		currentUntil = currentSince
		// Redundant with the loop condition above; kept as an explicit guard.
		if currentUntil.I64() <= a.since.I64() {
			break
		}
	}

	a.timeWindows = windows
	log.I.F("calculated %d time windows for progressive fetching", len(windows))
}
502
503 // getOrCreateRelayState gets or creates a relay state for rate limiting
504 func (a *Aggregator) getOrCreateRelayState(relayURL string) *RelayState {
505 a.relayStatesMutex.Lock()
506 defer a.relayStatesMutex.Unlock()
507
508 if state, exists := a.relayStates[relayURL]; exists {
509 return state
510 }
511
512 state := &RelayState{
513 url: relayURL,
514 retryCount: 0,
515 nextRetryTime: time.Now(),
516 rateLimited: false,
517 completed: false,
518 }
519 a.relayStates[relayURL] = state
520 return state
521 }
522
523 // shouldRetryRelay checks if a relay should be retried based on rate limiting
524 func (a *Aggregator) shouldRetryRelay(relayURL string) bool {
525 state := a.getOrCreateRelayState(relayURL)
526 state.mutex.RLock()
527 defer state.mutex.RUnlock()
528
529 if state.completed {
530 return false
531 }
532
533 if state.rateLimited && time.Now().Before(state.nextRetryTime) {
534 return false
535 }
536
537 return state.retryCount < maxRetries
538 }
539
// markRelayRateLimited marks a relay as rate limited, bumps its retry
// count, and schedules the next retry with exponential backoff. After
// maxRetries the relay is marked completed and excluded from future work.
func (a *Aggregator) markRelayRateLimited(relayURL string) {
	state := a.getOrCreateRelayState(relayURL)
	state.mutex.Lock()
	defer state.mutex.Unlock()

	state.rateLimited = true
	state.retryCount++

	if state.retryCount >= maxRetries {
		log.W.F("relay %s permanently failed after %d retries", relayURL, maxRetries)
		state.completed = true // Mark as completed to exclude from future attempts
		return
	}

	// Exponential backoff capped at maxRetryDelay. (No jitter is actually
	// applied, despite the earlier comment claiming otherwise.)
	delay := time.Duration(float64(baseRetryDelay) * math.Pow(2, float64(state.retryCount-1)))
	if delay > maxRetryDelay {
		delay = maxRetryDelay
	}

	state.nextRetryTime = time.Now().Add(delay)
	log.W.F("relay %s rate limited, retry %d/%d in %v", relayURL, state.retryCount, maxRetries, delay)
}
564
565 // markRelayCompleted marks a relay as completed for all time windows
566 func (a *Aggregator) markRelayCompleted(relayURL string) {
567 state := a.getOrCreateRelayState(relayURL)
568 state.mutex.Lock()
569 defer state.mutex.Unlock()
570
571 state.completed = true
572 log.I.F("relay %s marked as completed", relayURL)
573 }
574
// checkAllCompleted reports whether every relay/time-window combination has
// been fetched, treating permanently failed relays (retryCount >= maxRetries)
// as not required. It also advances the progress tracker consumed by the
// stuck-progress timeout in shouldTerminate. Returns false when no windows
// were pre-calculated (open-ended time range), since completion cannot be
// determined in that case.
func (a *Aggregator) checkAllCompleted() bool {
	if len(a.timeWindows) == 0 {
		// If no time windows calculated, we can't determine completion
		return false
	}

	// Snapshot the known relay URLs under the read lock.
	a.relayStatesMutex.RLock()
	allRelays := make([]string, 0, len(a.relayStates))
	for relayURL := range a.relayStates {
		allRelays = append(allRelays, relayURL)
	}
	a.relayStatesMutex.RUnlock()

	// Check if all relay-time window combinations are completed
	totalCombinations := len(allRelays) * len(a.timeWindows)
	completedCombinations := 0
	availableCombinations := 0 // Combinations from relays that haven't permanently failed

	for _, relayURL := range allRelays {
		state := a.getOrCreateRelayState(relayURL)
		state.mutex.RLock()
		isRelayFailed := state.retryCount >= maxRetries
		state.mutex.RUnlock()

		for _, window := range a.timeWindows {
			windowKey := fmt.Sprintf("%d-%d", window.since.I64(), window.until.I64())
			if a.completionTracker.IsCompleted(relayURL, windowKey) {
				completedCombinations++
			}

			// Only count combinations from relays that haven't permanently failed
			if !isRelayFailed {
				availableCombinations++
			}
		}
	}

	// Update progress tracking (feeds the stuck-progress timeout).
	a.progressMutex.Lock()
	if completedCombinations > a.lastProgress {
		a.lastProgress = completedCombinations
		a.lastProgressTime = time.Now()
	}
	a.progressMutex.Unlock()

	if totalCombinations > 0 {
		progress := float64(completedCombinations) / float64(totalCombinations) * 100
		log.I.F("completion progress: %d/%d (%.1f%%) - available: %d", completedCombinations, totalCombinations, progress, availableCombinations)

		// Consider complete if we've finished all available combinations (excluding permanently failed relays)
		if availableCombinations > 0 {
			return completedCombinations >= availableCombinations
		}
		return completedCombinations == totalCombinations
	}

	return false
}
634
// isEventSeen reports whether eventID is (probably) already in the bloom
// filter; false positives are possible at the configured ~0.1% rate, in
// which case a genuinely new event is silently skipped.
func (a *Aggregator) isEventSeen(eventID string) (seen bool) {
	return a.seenEvents.Contains([]byte(eventID))
}
638
639 func (a *Aggregator) markEventSeen(eventID string) {
640 a.seenEvents.Add([]byte(eventID))
641 a.eventCount++
642 }
643
644 func (a *Aggregator) addRelay(relayURL string) {
645 a.relayMutex.Lock()
646 defer a.relayMutex.Unlock()
647
648 if !a.seenRelays[relayURL] {
649 a.seenRelays[relayURL] = true
650 select {
651 case a.relayQueue <- relayURL:
652 log.I.F("added new relay to queue: %s", relayURL)
653 default:
654 log.W.F("relay queue full, skipping: %s", relayURL)
655 }
656 }
657 }
658
659 func (a *Aggregator) processRelayListEvent(ev *event.E) {
660 // Extract relay URLs from "r" tags in kind 10002 events
661 if ev.Kind != 10002 { // RelayListMetadata
662 return
663 }
664
665 log.I.F("processing relay list event from %s", hex.Enc(ev.Pubkey))
666
667 for _, tag := range ev.Tags.GetAll([]byte("r")) {
668 if len(tag.T) >= 2 {
669 relayURL := string(tag.T[1])
670 if relayURL != "" {
671 log.I.F("discovered relay from relay list: %s", relayURL)
672 a.addRelay(relayURL)
673 }
674 }
675 }
676 }
677
678 func (a *Aggregator) outputEvent(ev *event.E) (err error) {
679 // Convert event to JSON and output to stdout
680 var jsonBytes []byte
681 if jsonBytes, err = json.Marshal(map[string]interface{}{
682 "id": hex.Enc(ev.ID),
683 "pubkey": hex.Enc(ev.Pubkey),
684 "created_at": ev.CreatedAt,
685 "kind": ev.Kind,
686 "tags": ev.Tags,
687 "content": string(ev.Content),
688 "sig": hex.Enc(ev.Sig),
689 }); chk.E(err) {
690 return fmt.Errorf("failed to marshal event to JSON: %w", err)
691 }
692
693 fmt.Println(string(jsonBytes))
694 return
695 }
696
// connectToRelay dials one relay, optionally authenticates, and runs the
// progressive backward fetch. It always signals a.wg exactly once on exit,
// so the caller must have performed exactly one a.wg.Add(1) per invocation.
func (a *Aggregator) connectToRelay(relayURL string) {
	defer func() {
		log.I.F("relay connection finished: %s", relayURL)
		a.wg.Done()
	}()

	log.I.F("connecting to relay: %s", relayURL)

	// Create context with timeout for connection
	connCtx, connCancel := context.WithTimeout(a.ctx, 10*time.Second)
	defer connCancel()

	// Connect to relay
	var client *ws.Client
	var err error
	if client, err = ws.RelayConnect(connCtx, relayURL); chk.E(err) {
		log.E.F("failed to connect to relay %s: %v", relayURL, err)
		return
	}
	defer client.Close()

	log.I.F("connected to relay: %s", relayURL)

	// Attempt authentication if we have a private key
	if a.hasPrivateKey && a.signer != nil {
		authCtx, authCancel := context.WithTimeout(a.ctx, 5*time.Second)
		// NOTE(review): this defer runs at function exit, not right after
		// auth; harmless, as the auth context simply expires on its own.
		defer authCancel()

		if err = client.Auth(authCtx, a.signer); err != nil {
			log.W.F("authentication failed for relay %s: %v", relayURL, err)
			// Continue without authentication - some relays may not require it
		} else {
			log.I.F("successfully authenticated to relay: %s", relayURL)
		}
	}

	// Perform progressive backward fetching
	a.progressiveFetch(client, relayURL)
}
736
// progressiveFetch walks backwards through time windows on one relay,
// fetching each window in turn. Pre-calculated windows are used when a
// start time was given; otherwise windows are generated on the fly, capped
// at 1000 to bound open-ended ranges. On the first failed window the relay
// is marked rate limited and the fetch aborts; completed windows are
// recorded so a later retry skips them.
func (a *Aggregator) progressiveFetch(client *ws.Client, relayURL string) {
	// Check if relay should be retried
	if !a.shouldRetryRelay(relayURL) {
		log.W.F("skipping relay %s due to rate limiting or max retries", relayURL)
		return
	}

	// Create hex-encoded pubkey for p-tags
	pubkeyHex := hex.Enc(a.pubkeyBytes)

	// Use pre-calculated time windows if available, otherwise calculate on the fly
	var windows []TimeWindow
	if len(a.timeWindows) > 0 {
		windows = a.timeWindows
	} else {
		// Fallback to dynamic calculation for unlimited time ranges
		currentUntil := a.progressiveEnd
		for {
			currentSince := timestamp.FromUnix(currentUntil.I64() - int64(batchSize.Seconds()))
			if a.since != nil && currentSince.I64() < a.since.I64() {
				currentSince = a.since
			}

			windows = append(windows, TimeWindow{
				since: currentSince,
				until: currentUntil,
			})

			currentUntil = currentSince
			if a.since != nil && currentUntil.I64() <= a.since.I64() {
				break
			}

			// Prevent infinite loops for unlimited ranges
			if len(windows) > 1000 {
				log.W.F("limiting to 1000 time windows for relay %s", relayURL)
				break
			}
		}
	}

	// Process each time window
	for _, window := range windows {
		// windowKey must match the format used in checkAllCompleted.
		windowKey := fmt.Sprintf("%d-%d", window.since.I64(), window.until.I64())

		// Skip if already completed
		if a.completionTracker.IsCompleted(relayURL, windowKey) {
			continue
		}

		// Bail out promptly if the aggregator has been cancelled.
		select {
		case <-a.ctx.Done():
			log.I.F("context cancelled, stopping progressive fetch for relay %s", relayURL)
			return
		default:
		}

		log.I.F("fetching batch from %s: %d to %d", relayURL, window.since.I64(), window.until.I64())

		// Try to fetch this time window with retry logic
		success := a.fetchTimeWindow(client, relayURL, window, pubkeyHex)

		if success {
			// Mark this time window as completed for this relay
			a.completionTracker.MarkCompleted(relayURL, windowKey)
		} else {
			// If fetch failed, mark relay as rate limited and return
			a.markRelayRateLimited(relayURL)
			return
		}
	}

	// Mark relay as completed for all time windows
	a.markRelayCompleted(relayURL)
	log.I.F("completed all time windows for relay %s", relayURL)
}
813
// fetchTimeWindow subscribes to one relay for a single time window and
// streams matching events to stdout.
//
// Three filters are combined: events authored by the target pubkey, events
// that p-tag the pubkey, and the author's kind-10002 relay lists (used for
// relay discovery). Returns true when the window completed normally (EOSE),
// false on subscribe error, timeout, cancellation, or rate limiting.
func (a *Aggregator) fetchTimeWindow(client *ws.Client, relayURL string, window TimeWindow, pubkeyHex string) bool {
	// Create filters for this time batch
	f1 := &filter.F{
		Authors: tag.NewFromBytesSlice(a.pubkeyBytes),
		Since:   window.since,
		Until:   window.until,
	}

	f2 := &filter.F{
		Tags:  tag.NewSWithCap(1),
		Since: window.since,
		Until: window.until,
	}
	pTag := tag.NewFromAny("p", pubkeyHex)
	f2.Tags.Append(pTag)

	// Add relay list filter to discover new relays
	f3 := &filter.F{
		Authors: tag.NewFromBytesSlice(a.pubkeyBytes),
		Kinds:   kind.NewS(kind.New(10002)), // RelayListMetadata
		Since:   window.since,
		Until:   window.until,
	}

	// Subscribe to events using all filters with a dedicated context and timeout
	// Use a longer timeout to avoid premature cancellation by completion monitor
	subCtx, subCancel := context.WithTimeout(context.Background(), 10*time.Minute)

	var sub *ws.Subscription
	var err error
	if sub, err = client.Subscribe(subCtx, filter.NewS(f1, f2, f3)); chk.E(err) {
		subCancel() // Cancel context on error
		log.E.F("failed to subscribe to relay %s: %v", relayURL, err)
		return false
	}

	// Ensure subscription is cleaned up when we're done
	defer func() {
		sub.Unsub()
		subCancel()
	}()

	log.I.F("subscribed to batch from %s for pubkey %s (authored by, mentioning, and relay lists)", relayURL, a.npub)

	// Process events for this batch
	batchComplete := false
	rateLimited := false

	for !batchComplete && !rateLimited {
		select {
		case <-a.ctx.Done():
			log.I.F("aggregator context cancelled, stopping batch for relay %s", relayURL)
			return false
		case <-subCtx.Done():
			log.W.F("subscription timeout for relay %s", relayURL)
			return false
		case ev := <-sub.Events:
			if ev == nil {
				log.I.F("event channel closed for relay %s", relayURL)
				return false
			}

			eventID := hex.Enc(ev.ID)

			// Skip events already in the bloom filter (a rare false
			// positive silently drops a new event)
			if a.isEventSeen(eventID) {
				continue
			}

			// Mark event as seen
			a.markEventSeen(eventID)

			// Update actual time range
			a.updateActualTimeRange(timestamp.FromUnix(ev.CreatedAt))

			// Process relay list events to discover new relays
			if ev.Kind == 10002 {
				a.processRelayListEvent(ev)
			}

			// Output event to stdout
			if err = a.outputEvent(ev); chk.E(err) {
				log.E.F("failed to output event: %v", err)
			}
		case <-sub.EndOfStoredEvents:
			log.I.F("end of stored events for batch on relay %s", relayURL)
			batchComplete = true
		case reason := <-sub.ClosedReason:
			reasonStr := string(reason)
			log.W.F("subscription closed for relay %s: %s", relayURL, reasonStr)

			// Check for rate limiting messages
			if a.isRateLimitMessage(reasonStr) {
				log.W.F("detected rate limiting from relay %s", relayURL)
				rateLimited = true
			}

			sub.Unsub()
			return !rateLimited
			// Note: NOTICE messages are handled at the client level, not subscription level
			// Rate limiting detection will primarily rely on CLOSED messages
		}
	}

	sub.Unsub()
	return !rateLimited
}
921
922 // isRateLimitMessage checks if a message indicates rate limiting
923 func (a *Aggregator) isRateLimitMessage(message string) bool {
924 message = strings.ToLower(message)
925 rateLimitIndicators := []string{
926 "too many",
927 "rate limit",
928 "slow down",
929 "concurrent req",
930 "throttle",
931 "backoff",
932 }
933
934 for _, indicator := range rateLimitIndicators {
935 if strings.Contains(message, indicator) {
936 return true
937 }
938 }
939 return false
940 }
941
// Start runs the aggregator: it launches the monitoring and queue-processing
// goroutines, blocks until either every relay connection finishes or the
// completion monitor cancels the context, then emits the bloom filter
// summary to stderr. Always returns nil.
func (a *Aggregator) Start() (err error) {
	log.I.F("starting aggregator for key: %s", a.npub)
	log.I.F("pubkey bytes: %s", hex.Enc(a.pubkeyBytes))
	log.I.F("bloom filter: %d bits (%.2fMB), %d hash functions, ~0.1%% false positive rate",
		bloomFilterBits, float64(a.seenEvents.MemoryUsage())/1024/1024, bloomFilterHashFuncs)

	// Start memory monitoring goroutine
	go a.memoryMonitor()

	// Start relay processor goroutine
	go a.processRelayQueue()

	// Start completion monitoring goroutine
	go a.completionMonitor()

	// Pre-add the initial relay count; processRelayQueue only Adds for
	// relays dequeued beyond this initial set.
	a.wg.Add(len(relays))
	log.I.F("waiting for %d initial relay connections to complete", len(relays))

	// Wait for all relay connections to finish OR completion
	done := make(chan struct{})
	go func() {
		a.wg.Wait()
		close(done)
	}()

	select {
	case <-done:
		log.I.F("all relay connections completed")
	case <-a.ctx.Done():
		log.I.F("aggregator terminated due to completion")
	}

	// Stop memory monitoring
	a.memoryTicker.Stop()

	// Output bloom filter summary
	a.outputBloomFilter()

	log.I.F("aggregator finished")
	return
}
984
985 // completionMonitor periodically checks if all work is completed and terminates if so
986 func (a *Aggregator) completionMonitor() {
987 ticker := time.NewTicker(10 * time.Second) // Check every 10 seconds
988 defer ticker.Stop()
989
990 for {
991 select {
992 case <-a.ctx.Done():
993 return
994 case <-ticker.C:
995 // Check for various termination conditions
996 if a.shouldTerminate() {
997 return
998 }
999
1000 // Also check for rate-limited relays that can be retried
1001 a.retryRateLimitedRelays()
1002 }
1003 }
1004 }
1005
// shouldTerminate checks the conditions that should stop the aggregator —
// all work completed, maximum runtime exceeded, or no progress made for
// stuckProgressTimeout — and cancels the context when any of them fires.
func (a *Aggregator) shouldTerminate() bool {
	now := time.Now()

	// Check if all work is completed
	if a.checkAllCompleted() {
		log.I.F("all relay-time window combinations completed, terminating aggregator")
		a.cancel()
		return true
	}

	// Check for maximum runtime timeout
	if now.Sub(a.startTime) > maxRunTime {
		log.W.F("maximum runtime (%v) exceeded, terminating aggregator", maxRunTime)
		a.cancel()
		return true
	}

	// Check for stuck progress timeout.
	// NOTE(review): lastProgressTime is only advanced inside
	// checkAllCompleted when pre-calculated time windows exist, so with an
	// open-ended range (since == nil) this fires after stuckProgressTimeout
	// regardless of actual activity — confirm that is intended.
	a.progressMutex.RLock()
	timeSinceProgress := now.Sub(a.lastProgressTime)
	a.progressMutex.RUnlock()

	if timeSinceProgress > stuckProgressTimeout {
		log.W.F("no progress made for %v, terminating aggregator", timeSinceProgress)
		a.cancel()
		return true
	}

	return false
}
1037
1038 // retryRateLimitedRelays checks for rate-limited relays that can be retried
1039 func (a *Aggregator) retryRateLimitedRelays() {
1040 a.relayStatesMutex.RLock()
1041 defer a.relayStatesMutex.RUnlock()
1042
1043 for relayURL, state := range a.relayStates {
1044 state.mutex.RLock()
1045 canRetry := state.rateLimited &&
1046 time.Now().After(state.nextRetryTime) &&
1047 state.retryCount < maxRetries &&
1048 !state.completed
1049 state.mutex.RUnlock()
1050
1051 if canRetry {
1052 log.I.F("retrying rate-limited relay: %s", relayURL)
1053
1054 // Reset rate limiting status
1055 state.mutex.Lock()
1056 state.rateLimited = false
1057 state.mutex.Unlock()
1058
1059 // Add back to queue for retry
1060 select {
1061 case a.relayQueue <- relayURL:
1062 a.wg.Add(1)
1063 default:
1064 log.W.F("relay queue full, skipping retry for %s", relayURL)
1065 }
1066 }
1067 }
1068 }
1069
// processRelayQueue consumes relay URLs from the queue and spawns one
// connection goroutine per relay until the context is cancelled.
//
// WaitGroup accounting: Start pre-adds len(relays) for the initial seed
// relays, so only relays dequeued after those first len(relays) ones get a
// wg.Add(1) here; every spawned connectToRelay calls wg.Done exactly once.
func (a *Aggregator) processRelayQueue() {
	initialRelayCount := len(relays)
	processedInitial := 0

	for {
		select {
		case <-a.ctx.Done():
			log.I.F("relay queue processor stopping")
			return
		case relayURL := <-a.relayQueue:
			log.I.F("processing relay from queue: %s", relayURL)

			// For dynamically discovered relays (after initial ones), add to wait group
			if processedInitial >= initialRelayCount {
				a.wg.Add(1)
			} else {
				processedInitial++
			}

			go a.connectToRelay(relayURL)
		}
	}
}
1093
1094 func (a *Aggregator) Stop() {
1095 a.cancel()
1096 if a.memoryTicker != nil {
1097 a.memoryTicker.Stop()
1098 }
1099 }
1100
1101 // outputBloomFilter outputs the bloom filter as base64 to stderr with statistics
1102 func (a *Aggregator) outputBloomFilter() {
1103 base64Filter := a.seenEvents.ToBase64()
1104 estimatedEvents := a.seenEvents.EstimatedItems()
1105 memoryUsage := float64(a.seenEvents.MemoryUsage()) / 1024 / 1024
1106
1107 // Get actual time range of processed events
1108 actualSince, actualUntil := a.getActualTimeRange()
1109
1110 // Output to stderr so it doesn't interfere with JSONL event output to stdout
1111 fmt.Fprintf(os.Stderr, "\n=== BLOOM FILTER SUMMARY ===\n")
1112 fmt.Fprintf(os.Stderr, "Events processed: %d\n", a.eventCount)
1113 fmt.Fprintf(os.Stderr, "Estimated unique events: %d\n", estimatedEvents)
1114 fmt.Fprintf(os.Stderr, "Bloom filter size: %.2f MB\n", memoryUsage)
1115 fmt.Fprintf(os.Stderr, "False positive rate: ~0.1%%\n")
1116 fmt.Fprintf(os.Stderr, "Hash functions: %d\n", bloomFilterHashFuncs)
1117
1118 // Output time range information
1119 if actualSince != nil && actualUntil != nil {
1120 fmt.Fprintf(os.Stderr, "Time range covered: %d to %d\n", actualSince.I64(), actualUntil.I64())
1121 fmt.Fprintf(os.Stderr, "Time range (human): %s to %s\n",
1122 time.Unix(actualSince.I64(), 0).UTC().Format(time.RFC3339),
1123 time.Unix(actualUntil.I64(), 0).UTC().Format(time.RFC3339))
1124 } else if a.since != nil && a.until != nil {
1125 // Fallback to requested range if no events were processed
1126 fmt.Fprintf(os.Stderr, "Requested time range: %d to %d\n", a.since.I64(), a.until.I64())
1127 fmt.Fprintf(os.Stderr, "Requested range (human): %s to %s\n",
1128 time.Unix(a.since.I64(), 0).UTC().Format(time.RFC3339),
1129 time.Unix(a.until.I64(), 0).UTC().Format(time.RFC3339))
1130 } else {
1131 fmt.Fprintf(os.Stderr, "Time range: unbounded\n")
1132 }
1133
1134 fmt.Fprintf(os.Stderr, "\nBloom filter (base64):\n%s\n", base64Filter)
1135 fmt.Fprintf(os.Stderr, "=== END BLOOM FILTER ===\n")
1136 }
1137
1138 // getMemoryUsageMB returns current memory usage in MB
1139 func (a *Aggregator) getMemoryUsageMB() float64 {
1140 var m runtime.MemStats
1141 runtime.ReadMemStats(&m)
1142 return float64(m.Alloc) / 1024 / 1024
1143 }
1144
1145 // memoryMonitor monitors memory usage and logs statistics
1146 func (a *Aggregator) memoryMonitor() {
1147 for {
1148 select {
1149 case <-a.ctx.Done():
1150 log.I.F("memory monitor stopping")
1151 return
1152 case <-a.memoryTicker.C:
1153 memUsage := a.getMemoryUsageMB()
1154 bloomMemMB := float64(a.seenEvents.MemoryUsage()) / 1024 / 1024
1155 estimatedEvents := a.seenEvents.EstimatedItems()
1156
1157 log.I.F("memory stats: total=%.2fMB, bloom=%.2fMB, events=%d, estimated_events=%d",
1158 memUsage, bloomMemMB, a.eventCount, estimatedEvents)
1159
1160 // Check if we're approaching memory limits
1161 if memUsage > maxMemoryMB {
1162 log.W.F("high memory usage detected: %.2fMB (limit: %dMB)", memUsage, maxMemoryMB)
1163
1164 // Force garbage collection
1165 runtime.GC()
1166
1167 // Check again after GC
1168 newMemUsage := a.getMemoryUsageMB()
1169 log.I.F("memory usage after GC: %.2fMB", newMemUsage)
1170
1171 // If still too high, warn but continue (bloom filter has fixed size)
1172 if newMemUsage > maxMemoryMB*1.2 {
1173 log.E.F("critical memory usage: %.2fMB, but continuing with bloom filter", newMemUsage)
1174 }
1175 }
1176 }
1177 }
1178 }
1179
1180 func parseTimestamp(s string) (ts *timestamp.T, err error) {
1181 if s == "" {
1182 return nil, nil
1183 }
1184
1185 var t int64
1186 if t, err = strconv.ParseInt(s, 10, 64); chk.E(err) {
1187 return nil, fmt.Errorf("invalid timestamp format: %w", err)
1188 }
1189
1190 ts = timestamp.FromUnix(t)
1191 return
1192 }
1193
1194 func main() {
1195 var keyInput string
1196 var sinceStr string
1197 var untilStr string
1198 var bloomFilterFile string
1199 var outputFile string
1200
1201 flag.StringVar(&keyInput, "key", "", "nsec (private key) or npub (public key) to search for events")
1202 flag.StringVar(&sinceStr, "since", "", "start timestamp (Unix timestamp) - only events after this time")
1203 flag.StringVar(&untilStr, "until", "", "end timestamp (Unix timestamp) - only events before this time")
1204 flag.StringVar(&bloomFilterFile, "filter", "", "file containing base64 encoded bloom filter to exclude already seen events")
1205 flag.StringVar(&outputFile, "output", "", "output file for events (default: stdout)")
1206 flag.Parse()
1207
1208 if keyInput == "" {
1209 fmt.Fprintf(os.Stderr, "Usage: %s -key <nsec|npub> [-since <timestamp>] [-until <timestamp>] [-filter <file>] [-output <file>]\n", os.Args[0])
1210 fmt.Fprintf(os.Stderr, "Example: %s -key npub1... -since 1640995200 -until 1672531200 -filter bloom.txt -output events.jsonl\n", os.Args[0])
1211 fmt.Fprintf(os.Stderr, "Example: %s -key nsec1... -since 1640995200 -until 1672531200 -output events.jsonl\n", os.Args[0])
1212 fmt.Fprintf(os.Stderr, "\nKey types:\n")
1213 fmt.Fprintf(os.Stderr, " nsec: Private key (enables authentication to relays that require it)\n")
1214 fmt.Fprintf(os.Stderr, " npub: Public key (authentication disabled)\n")
1215 fmt.Fprintf(os.Stderr, "\nTimestamps should be Unix timestamps (seconds since epoch)\n")
1216 fmt.Fprintf(os.Stderr, "If -filter is provided, output will be appended to the output file\n")
1217 os.Exit(1)
1218 }
1219
1220 var since, until *timestamp.T
1221 var err error
1222
1223 if since, err = parseTimestamp(sinceStr); chk.E(err) {
1224 fmt.Fprintf(os.Stderr, "Error parsing since timestamp: %v\n", err)
1225 os.Exit(1)
1226 }
1227
1228 if until, err = parseTimestamp(untilStr); chk.E(err) {
1229 fmt.Fprintf(os.Stderr, "Error parsing until timestamp: %v\n", err)
1230 os.Exit(1)
1231 }
1232
1233 // Validate that since is before until if both are provided
1234 if since != nil && until != nil && since.I64() >= until.I64() {
1235 fmt.Fprintf(os.Stderr, "Error: since timestamp must be before until timestamp\n")
1236 os.Exit(1)
1237 }
1238
1239 // Set up output redirection if needed
1240 if outputFile != "" {
1241 var file *os.File
1242 if bloomFilterFile != "" {
1243 // Append mode if bloom filter is provided
1244 file, err = os.OpenFile(outputFile, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644)
1245 } else {
1246 // Truncate mode if no bloom filter
1247 file, err = os.OpenFile(outputFile, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0644)
1248 }
1249 if err != nil {
1250 fmt.Fprintf(os.Stderr, "Error opening output file: %v\n", err)
1251 os.Exit(1)
1252 }
1253 defer file.Close()
1254
1255 // Redirect stdout to file
1256 os.Stdout = file
1257 }
1258
1259 var agg *Aggregator
1260 if agg, err = NewAggregator(keyInput, since, until, bloomFilterFile); chk.E(err) {
1261 fmt.Fprintf(os.Stderr, "Error creating aggregator: %v\n", err)
1262 os.Exit(1)
1263 }
1264
1265 if err = agg.Start(); chk.E(err) {
1266 fmt.Fprintf(os.Stderr, "Error running aggregator: %v\n", err)
1267 os.Exit(1)
1268 }
1269 }
1270