main.go raw

   1  package main
   2  
   3  import (
   4  	"context"
   5  	"encoding/base64"
   6  	"encoding/binary"
   7  	"encoding/json"
   8  	"flag"
   9  	"fmt"
  10  	"math"
  11  	"os"
  12  	"runtime"
  13  	"strconv"
  14  	"strings"
  15  	"sync"
  16  	"time"
  17  
  18  	"next.orly.dev/pkg/lol/chk"
  19  	"next.orly.dev/pkg/lol/log"
  20  	"next.orly.dev/pkg/nostr/interfaces/signer/p8k"
  21  	"github.com/minio/sha256-simd"
  22  	"next.orly.dev/pkg/nostr/encoders/bech32encoding"
  23  	"next.orly.dev/pkg/nostr/encoders/event"
  24  	"next.orly.dev/pkg/nostr/encoders/filter"
  25  	"next.orly.dev/pkg/nostr/encoders/hex"
  26  	"next.orly.dev/pkg/nostr/encoders/kind"
  27  	"next.orly.dev/pkg/nostr/encoders/tag"
  28  	"next.orly.dev/pkg/nostr/encoders/timestamp"
  29  	"next.orly.dev/pkg/nostr/interfaces/signer"
  30  	"next.orly.dev/pkg/nostr/ws"
  31  )
  32  
  33  const (
  34  	// Bloom filter parameters for ~0.1% false positive rate with 1M events
  35  	bloomFilterBits      = 14377588 // ~1.75MB for 1M events at 0.1% FPR
  36  	bloomFilterHashFuncs = 10       // Optimal number of hash functions
  37  	maxMemoryMB          = 256      // Maximum memory usage in MB
  38  	memoryCheckInterval  = 30 * time.Second
  39  
  40  	// Rate limiting parameters
  41  	baseRetryDelay = 1 * time.Second
  42  	maxRetryDelay  = 60 * time.Second
  43  	maxRetries     = 5
  44  	batchSize      = time.Hour * 24 * 7 // 1 week batches
  45  
  46  	// Timeout parameters
  47  	maxRunTime           = 30 * time.Minute // Maximum total runtime
  48  	relayTimeout         = 5 * time.Minute  // Timeout per relay
  49  	stuckProgressTimeout = 2 * time.Minute  // Timeout if no progress is made
  50  )
  51  
  52  var relays = []string{
  53  	"wss://nostr.wine/",
  54  	"wss://nostr.land/",
  55  	"wss://orly-relay.imwald.eu",
  56  	"wss://relay.orly.dev/",
  57  	"wss://relay.damus.io/",
  58  	"wss://nos.lol/",
  59  	"wss://theforest.nostr1.com/",
  60  }
  61  
  62  // BloomFilter implements a memory-efficient bloom filter for event deduplication
  63  type BloomFilter struct {
  64  	bits     []byte
  65  	size     uint32
  66  	hashFunc int
  67  	mutex    sync.RWMutex
  68  }
  69  
  70  // NewBloomFilter creates a new bloom filter with specified parameters
  71  func NewBloomFilter(bits uint32, hashFuncs int) *BloomFilter {
  72  	return &BloomFilter{
  73  		bits:     make([]byte, (bits+7)/8), // Round up to nearest byte
  74  		size:     bits,
  75  		hashFunc: hashFuncs,
  76  	}
  77  }
  78  
  79  // hash generates multiple hash values for a given input using SHA256
  80  func (bf *BloomFilter) hash(data []byte) []uint32 {
  81  	hashes := make([]uint32, bf.hashFunc)
  82  
  83  	// Use SHA256 as base hash
  84  	baseHash := sha256.Sum256(data)
  85  
  86  	// Generate multiple hash values by combining with different salts
  87  	for i := 0; i < bf.hashFunc; i++ {
  88  		// Create salt by appending index
  89  		saltedData := make([]byte, len(baseHash)+4)
  90  		copy(saltedData, baseHash[:])
  91  		binary.LittleEndian.PutUint32(saltedData[len(baseHash):], uint32(i))
  92  
  93  		// Hash the salted data
  94  		h := sha256.Sum256(saltedData)
  95  
  96  		// Convert first 4 bytes to uint32 and mod by filter size
  97  		hashVal := binary.LittleEndian.Uint32(h[:4])
  98  		hashes[i] = hashVal % bf.size
  99  	}
 100  
 101  	return hashes
 102  }
 103  
 104  // Add adds an item to the bloom filter
 105  func (bf *BloomFilter) Add(data []byte) {
 106  	bf.mutex.Lock()
 107  	defer bf.mutex.Unlock()
 108  
 109  	hashes := bf.hash(data)
 110  	for _, h := range hashes {
 111  		byteIndex := h / 8
 112  		bitIndex := h % 8
 113  		bf.bits[byteIndex] |= 1 << bitIndex
 114  	}
 115  }
 116  
 117  // Contains checks if an item might be in the bloom filter
 118  func (bf *BloomFilter) Contains(data []byte) bool {
 119  	bf.mutex.RLock()
 120  	defer bf.mutex.RUnlock()
 121  
 122  	hashes := bf.hash(data)
 123  	for _, h := range hashes {
 124  		byteIndex := h / 8
 125  		bitIndex := h % 8
 126  		if bf.bits[byteIndex]&(1<<bitIndex) == 0 {
 127  			return false
 128  		}
 129  	}
 130  	return true
 131  }
 132  
 133  // EstimatedItems estimates the number of items added to the filter
 134  func (bf *BloomFilter) EstimatedItems() uint32 {
 135  	bf.mutex.RLock()
 136  	defer bf.mutex.RUnlock()
 137  
 138  	// Count set bits
 139  	setBits := uint32(0)
 140  	for _, b := range bf.bits {
 141  		for i := 0; i < 8; i++ {
 142  			if b&(1<<i) != 0 {
 143  				setBits++
 144  			}
 145  		}
 146  	}
 147  
 148  	// Estimate items using bloom filter formula: n ≈ -(m/k) * ln(1 - X/m)
 149  	// where m = filter size, k = hash functions, X = set bits
 150  	if setBits == 0 {
 151  		return 0
 152  	}
 153  
 154  	m := float64(bf.size)
 155  	k := float64(bf.hashFunc)
 156  	x := float64(setBits)
 157  
 158  	if x >= m {
 159  		return uint32(m / k) // Saturated filter
 160  	}
 161  
 162  	estimated := -(m / k) * math.Log(1-(x/m))
 163  	return uint32(estimated)
 164  }
 165  
 166  // MemoryUsage returns the memory usage in bytes
 167  func (bf *BloomFilter) MemoryUsage() int {
 168  	return len(bf.bits)
 169  }
 170  
 171  // ToBase64 serializes the bloom filter to a base64 encoded string
 172  func (bf *BloomFilter) ToBase64() string {
 173  	bf.mutex.RLock()
 174  	defer bf.mutex.RUnlock()
 175  
 176  	// Create a serialization format: [size:4][hashFunc:4][bits:variable]
 177  	serialized := make([]byte, 8+len(bf.bits))
 178  
 179  	// Write size (4 bytes)
 180  	binary.LittleEndian.PutUint32(serialized[0:4], bf.size)
 181  
 182  	// Write hash function count (4 bytes)
 183  	binary.LittleEndian.PutUint32(serialized[4:8], uint32(bf.hashFunc))
 184  
 185  	// Write bits data
 186  	copy(serialized[8:], bf.bits)
 187  
 188  	return base64.StdEncoding.EncodeToString(serialized)
 189  }
 190  
 191  // FromBase64 deserializes a bloom filter from a base64 encoded string
 192  func FromBase64(encoded string) (*BloomFilter, error) {
 193  	data, err := base64.StdEncoding.DecodeString(encoded)
 194  	if err != nil {
 195  		return nil, fmt.Errorf("failed to decode base64: %w", err)
 196  	}
 197  
 198  	if len(data) < 8 {
 199  		return nil, fmt.Errorf("invalid bloom filter data: too short")
 200  	}
 201  
 202  	// Read size (4 bytes)
 203  	size := binary.LittleEndian.Uint32(data[0:4])
 204  
 205  	// Read hash function count (4 bytes)
 206  	hashFunc := int(binary.LittleEndian.Uint32(data[4:8]))
 207  
 208  	// Read bits data
 209  	bits := make([]byte, len(data)-8)
 210  	copy(bits, data[8:])
 211  
 212  	// Validate that the bits length matches the expected size
 213  	expectedBytesLen := (size + 7) / 8
 214  	if uint32(len(bits)) != expectedBytesLen {
 215  		return nil, fmt.Errorf("invalid bloom filter data: bits length mismatch")
 216  	}
 217  
 218  	return &BloomFilter{
 219  		bits:     bits,
 220  		size:     size,
 221  		hashFunc: hashFunc,
 222  	}, nil
 223  }
 224  
 225  // RelayState tracks the state and rate limiting for each relay
 226  type RelayState struct {
 227  	url           string
 228  	retryCount    int
 229  	nextRetryTime time.Time
 230  	rateLimited   bool
 231  	completed     bool
 232  	mutex         sync.RWMutex
 233  }
 234  
 235  // TimeWindow represents a time range for progressive fetching
 236  type TimeWindow struct {
 237  	since *timestamp.T
 238  	until *timestamp.T
 239  }
 240  
 241  // CompletionTracker tracks which relay-time window combinations have been completed
 242  type CompletionTracker struct {
 243  	completed map[string]map[string]bool // relay -> timewindow -> completed
 244  	mutex     sync.RWMutex
 245  }
 246  
 247  func NewCompletionTracker() *CompletionTracker {
 248  	return &CompletionTracker{
 249  		completed: make(map[string]map[string]bool),
 250  	}
 251  }
 252  
 253  func (ct *CompletionTracker) MarkCompleted(relayURL string, timeWindow string) {
 254  	ct.mutex.Lock()
 255  	defer ct.mutex.Unlock()
 256  
 257  	if ct.completed[relayURL] == nil {
 258  		ct.completed[relayURL] = make(map[string]bool)
 259  	}
 260  	ct.completed[relayURL][timeWindow] = true
 261  }
 262  
 263  func (ct *CompletionTracker) IsCompleted(relayURL string, timeWindow string) bool {
 264  	ct.mutex.RLock()
 265  	defer ct.mutex.RUnlock()
 266  
 267  	if ct.completed[relayURL] == nil {
 268  		return false
 269  	}
 270  	return ct.completed[relayURL][timeWindow]
 271  }
 272  
 273  func (ct *CompletionTracker) GetCompletionStatus() (completed, total int) {
 274  	ct.mutex.RLock()
 275  	defer ct.mutex.RUnlock()
 276  
 277  	for _, windows := range ct.completed {
 278  		for _, isCompleted := range windows {
 279  			total++
 280  			if isCompleted {
 281  				completed++
 282  			}
 283  		}
 284  	}
 285  	return
 286  }
 287  
 288  type Aggregator struct {
 289  	npub              string
 290  	pubkeyBytes       []byte
 291  	seenEvents        *BloomFilter
 292  	seenRelays        map[string]bool
 293  	relayQueue        chan string
 294  	relayMutex        sync.RWMutex
 295  	ctx               context.Context
 296  	cancel            context.CancelFunc
 297  	since             *timestamp.T
 298  	until             *timestamp.T
 299  	wg                sync.WaitGroup
 300  	progressiveEnd    *timestamp.T
 301  	memoryTicker      *time.Ticker
 302  	eventCount        uint64
 303  	relayStates       map[string]*RelayState
 304  	relayStatesMutex  sync.RWMutex
 305  	completionTracker *CompletionTracker
 306  	timeWindows       []TimeWindow
 307  	// Track actual time range of processed events
 308  	actualSince *timestamp.T
 309  	actualUntil *timestamp.T
 310  	timeMutex   sync.RWMutex
 311  	// Bloom filter file for loading existing state
 312  	bloomFilterFile string
 313  	appendMode      bool
 314  	// Progress tracking for timeout detection
 315  	startTime        time.Time
 316  	lastProgress     int
 317  	lastProgressTime time.Time
 318  	progressMutex    sync.RWMutex
 319  	// Authentication support
 320  	signer        signer.I // Optional signer for relay authentication
 321  	hasPrivateKey bool     // Whether we have a private key for auth
 322  }
 323  
 324  func NewAggregator(keyInput string, since, until *timestamp.T, bloomFilterFile string) (agg *Aggregator, err error) {
 325  	var pubkeyBytes []byte
 326  	var signer signer.I
 327  	var hasPrivateKey bool
 328  
 329  	// Determine if input is nsec (private key) or npub (public key)
 330  	if strings.HasPrefix(keyInput, "nsec") {
 331  		// Handle nsec (private key) - derive pubkey and enable authentication
 332  		var secretBytes []byte
 333  		if secretBytes, err = bech32encoding.NsecToBytes(keyInput); chk.E(err) {
 334  			return nil, fmt.Errorf("failed to decode nsec: %w", err)
 335  		}
 336  
 337  		// Create signer from private key
 338  		var signerErr error
 339  		if signer, signerErr = p8k.New(); signerErr != nil {
 340  			return nil, fmt.Errorf("failed to create signer: %w", signerErr)
 341  		}
 342  		if err = signer.InitSec(secretBytes); chk.E(err) {
 343  			return nil, fmt.Errorf("failed to initialize signer: %w", err)
 344  		}
 345  
 346  		// Get public key from signer
 347  		pubkeyBytes = signer.Pub()
 348  		hasPrivateKey = true
 349  
 350  		log.I.F("using private key (nsec) - authentication enabled")
 351  	} else if strings.HasPrefix(keyInput, "npub") {
 352  		// Handle npub (public key only) - no authentication
 353  		if pubkeyBytes, err = bech32encoding.NpubToBytes(keyInput); chk.E(err) {
 354  			return nil, fmt.Errorf("failed to decode npub: %w", err)
 355  		}
 356  		hasPrivateKey = false
 357  
 358  		log.I.F("using public key (npub) - authentication disabled")
 359  	} else {
 360  		return nil, fmt.Errorf("key input must start with 'nsec' or 'npub', got: %s", keyInput[:4])
 361  	}
 362  
 363  	ctx, cancel := context.WithCancel(context.Background())
 364  
 365  	// Set progressive end to current time if until is not specified
 366  	progressiveEnd := until
 367  	if progressiveEnd == nil {
 368  		progressiveEnd = timestamp.Now()
 369  	}
 370  
 371  	// Initialize bloom filter - either new or loaded from file
 372  	var bloomFilter *BloomFilter
 373  	var appendMode bool
 374  
 375  	if bloomFilterFile != "" {
 376  		// Try to load existing bloom filter
 377  		if bloomFilter, err = loadBloomFilterFromFile(bloomFilterFile); err != nil {
 378  			log.W.F("failed to load bloom filter from %s: %v, creating new filter", bloomFilterFile, err)
 379  			bloomFilter = NewBloomFilter(bloomFilterBits, bloomFilterHashFuncs)
 380  		} else {
 381  			log.I.F("loaded existing bloom filter from %s", bloomFilterFile)
 382  			appendMode = true
 383  		}
 384  	} else {
 385  		bloomFilter = NewBloomFilter(bloomFilterBits, bloomFilterHashFuncs)
 386  	}
 387  
 388  	agg = &Aggregator{
 389  		npub:              keyInput,
 390  		pubkeyBytes:       pubkeyBytes,
 391  		seenEvents:        bloomFilter,
 392  		seenRelays:        make(map[string]bool),
 393  		relayQueue:        make(chan string, 100),
 394  		ctx:               ctx,
 395  		cancel:            cancel,
 396  		since:             since,
 397  		until:             until,
 398  		progressiveEnd:    progressiveEnd,
 399  		memoryTicker:      time.NewTicker(memoryCheckInterval),
 400  		eventCount:        0,
 401  		relayStates:       make(map[string]*RelayState),
 402  		completionTracker: NewCompletionTracker(),
 403  		bloomFilterFile:   bloomFilterFile,
 404  		appendMode:        appendMode,
 405  		startTime:         time.Now(),
 406  		lastProgress:      0,
 407  		lastProgressTime:  time.Now(),
 408  		signer:            signer,
 409  		hasPrivateKey:     hasPrivateKey,
 410  	}
 411  
 412  	// Calculate time windows for progressive fetching
 413  	agg.calculateTimeWindows()
 414  
 415  	// Add initial relays to queue
 416  	for _, relayURL := range relays {
 417  		agg.addRelay(relayURL)
 418  	}
 419  
 420  	return
 421  }
 422  
 423  // loadBloomFilterFromFile loads a bloom filter from a file containing base64 encoded data
 424  func loadBloomFilterFromFile(filename string) (*BloomFilter, error) {
 425  	data, err := os.ReadFile(filename)
 426  	if err != nil {
 427  		return nil, fmt.Errorf("failed to read file: %w", err)
 428  	}
 429  
 430  	// Find the base64 data between the markers
 431  	content := string(data)
 432  	startMarker := "Bloom filter (base64):\n"
 433  	endMarker := "\n=== END BLOOM FILTER ==="
 434  
 435  	startIdx := strings.Index(content, startMarker)
 436  	if startIdx == -1 {
 437  		return nil, fmt.Errorf("bloom filter start marker not found")
 438  	}
 439  	startIdx += len(startMarker)
 440  
 441  	endIdx := strings.Index(content[startIdx:], endMarker)
 442  	if endIdx == -1 {
 443  		return nil, fmt.Errorf("bloom filter end marker not found")
 444  	}
 445  
 446  	base64Data := strings.TrimSpace(content[startIdx : startIdx+endIdx])
 447  	return FromBase64(base64Data)
 448  }
 449  
 450  // updateActualTimeRange updates the actual time range of processed events
 451  func (a *Aggregator) updateActualTimeRange(eventTime *timestamp.T) {
 452  	a.timeMutex.Lock()
 453  	defer a.timeMutex.Unlock()
 454  
 455  	if a.actualSince == nil || eventTime.I64() < a.actualSince.I64() {
 456  		a.actualSince = eventTime
 457  	}
 458  
 459  	if a.actualUntil == nil || eventTime.I64() > a.actualUntil.I64() {
 460  		a.actualUntil = eventTime
 461  	}
 462  }
 463  
 464  // getActualTimeRange returns the actual time range of processed events
 465  func (a *Aggregator) getActualTimeRange() (since, until *timestamp.T) {
 466  	a.timeMutex.RLock()
 467  	defer a.timeMutex.RUnlock()
 468  	return a.actualSince, a.actualUntil
 469  }
 470  
 471  // calculateTimeWindows pre-calculates all time windows for progressive fetching
 472  func (a *Aggregator) calculateTimeWindows() {
 473  	if a.since == nil {
 474  		// If no since time, we'll just work backwards from progressiveEnd
 475  		// We can't pre-calculate windows without a start time
 476  		return
 477  	}
 478  
 479  	var windows []TimeWindow
 480  	currentUntil := a.progressiveEnd
 481  
 482  	for currentUntil.I64() > a.since.I64() {
 483  		currentSince := timestamp.FromUnix(currentUntil.I64() - int64(batchSize.Seconds()))
 484  		if currentSince.I64() < a.since.I64() {
 485  			currentSince = a.since
 486  		}
 487  
 488  		windows = append(windows, TimeWindow{
 489  			since: currentSince,
 490  			until: currentUntil,
 491  		})
 492  
 493  		currentUntil = currentSince
 494  		if currentUntil.I64() <= a.since.I64() {
 495  			break
 496  		}
 497  	}
 498  
 499  	a.timeWindows = windows
 500  	log.I.F("calculated %d time windows for progressive fetching", len(windows))
 501  }
 502  
 503  // getOrCreateRelayState gets or creates a relay state for rate limiting
 504  func (a *Aggregator) getOrCreateRelayState(relayURL string) *RelayState {
 505  	a.relayStatesMutex.Lock()
 506  	defer a.relayStatesMutex.Unlock()
 507  
 508  	if state, exists := a.relayStates[relayURL]; exists {
 509  		return state
 510  	}
 511  
 512  	state := &RelayState{
 513  		url:           relayURL,
 514  		retryCount:    0,
 515  		nextRetryTime: time.Now(),
 516  		rateLimited:   false,
 517  		completed:     false,
 518  	}
 519  	a.relayStates[relayURL] = state
 520  	return state
 521  }
 522  
 523  // shouldRetryRelay checks if a relay should be retried based on rate limiting
 524  func (a *Aggregator) shouldRetryRelay(relayURL string) bool {
 525  	state := a.getOrCreateRelayState(relayURL)
 526  	state.mutex.RLock()
 527  	defer state.mutex.RUnlock()
 528  
 529  	if state.completed {
 530  		return false
 531  	}
 532  
 533  	if state.rateLimited && time.Now().Before(state.nextRetryTime) {
 534  		return false
 535  	}
 536  
 537  	return state.retryCount < maxRetries
 538  }
 539  
 540  // markRelayRateLimited marks a relay as rate limited and sets retry time
 541  func (a *Aggregator) markRelayRateLimited(relayURL string) {
 542  	state := a.getOrCreateRelayState(relayURL)
 543  	state.mutex.Lock()
 544  	defer state.mutex.Unlock()
 545  
 546  	state.rateLimited = true
 547  	state.retryCount++
 548  
 549  	if state.retryCount >= maxRetries {
 550  		log.W.F("relay %s permanently failed after %d retries", relayURL, maxRetries)
 551  		state.completed = true // Mark as completed to exclude from future attempts
 552  		return
 553  	}
 554  
 555  	// Exponential backoff with jitter
 556  	delay := time.Duration(float64(baseRetryDelay) * math.Pow(2, float64(state.retryCount-1)))
 557  	if delay > maxRetryDelay {
 558  		delay = maxRetryDelay
 559  	}
 560  
 561  	state.nextRetryTime = time.Now().Add(delay)
 562  	log.W.F("relay %s rate limited, retry %d/%d in %v", relayURL, state.retryCount, maxRetries, delay)
 563  }
 564  
 565  // markRelayCompleted marks a relay as completed for all time windows
 566  func (a *Aggregator) markRelayCompleted(relayURL string) {
 567  	state := a.getOrCreateRelayState(relayURL)
 568  	state.mutex.Lock()
 569  	defer state.mutex.Unlock()
 570  
 571  	state.completed = true
 572  	log.I.F("relay %s marked as completed", relayURL)
 573  }
 574  
 575  // checkAllCompleted checks if all relay-time window combinations are completed
 576  func (a *Aggregator) checkAllCompleted() bool {
 577  	if len(a.timeWindows) == 0 {
 578  		// If no time windows calculated, we can't determine completion
 579  		return false
 580  	}
 581  
 582  	a.relayStatesMutex.RLock()
 583  	allRelays := make([]string, 0, len(a.relayStates))
 584  	for relayURL := range a.relayStates {
 585  		allRelays = append(allRelays, relayURL)
 586  	}
 587  	a.relayStatesMutex.RUnlock()
 588  
 589  	// Check if all relay-time window combinations are completed
 590  	totalCombinations := len(allRelays) * len(a.timeWindows)
 591  	completedCombinations := 0
 592  	availableCombinations := 0 // Combinations from relays that haven't permanently failed
 593  
 594  	for _, relayURL := range allRelays {
 595  		state := a.getOrCreateRelayState(relayURL)
 596  		state.mutex.RLock()
 597  		isRelayFailed := state.retryCount >= maxRetries
 598  		state.mutex.RUnlock()
 599  
 600  		for _, window := range a.timeWindows {
 601  			windowKey := fmt.Sprintf("%d-%d", window.since.I64(), window.until.I64())
 602  			if a.completionTracker.IsCompleted(relayURL, windowKey) {
 603  				completedCombinations++
 604  			}
 605  
 606  			// Only count combinations from relays that haven't permanently failed
 607  			if !isRelayFailed {
 608  				availableCombinations++
 609  			}
 610  		}
 611  	}
 612  
 613  	// Update progress tracking
 614  	a.progressMutex.Lock()
 615  	if completedCombinations > a.lastProgress {
 616  		a.lastProgress = completedCombinations
 617  		a.lastProgressTime = time.Now()
 618  	}
 619  	a.progressMutex.Unlock()
 620  
 621  	if totalCombinations > 0 {
 622  		progress := float64(completedCombinations) / float64(totalCombinations) * 100
 623  		log.I.F("completion progress: %d/%d (%.1f%%) - available: %d", completedCombinations, totalCombinations, progress, availableCombinations)
 624  
 625  		// Consider complete if we've finished all available combinations (excluding permanently failed relays)
 626  		if availableCombinations > 0 {
 627  			return completedCombinations >= availableCombinations
 628  		}
 629  		return completedCombinations == totalCombinations
 630  	}
 631  
 632  	return false
 633  }
 634  
 635  func (a *Aggregator) isEventSeen(eventID string) (seen bool) {
 636  	return a.seenEvents.Contains([]byte(eventID))
 637  }
 638  
 639  func (a *Aggregator) markEventSeen(eventID string) {
 640  	a.seenEvents.Add([]byte(eventID))
 641  	a.eventCount++
 642  }
 643  
 644  func (a *Aggregator) addRelay(relayURL string) {
 645  	a.relayMutex.Lock()
 646  	defer a.relayMutex.Unlock()
 647  
 648  	if !a.seenRelays[relayURL] {
 649  		a.seenRelays[relayURL] = true
 650  		select {
 651  		case a.relayQueue <- relayURL:
 652  			log.I.F("added new relay to queue: %s", relayURL)
 653  		default:
 654  			log.W.F("relay queue full, skipping: %s", relayURL)
 655  		}
 656  	}
 657  }
 658  
 659  func (a *Aggregator) processRelayListEvent(ev *event.E) {
 660  	// Extract relay URLs from "r" tags in kind 10002 events
 661  	if ev.Kind != 10002 { // RelayListMetadata
 662  		return
 663  	}
 664  
 665  	log.I.F("processing relay list event from %s", hex.Enc(ev.Pubkey))
 666  
 667  	for _, tag := range ev.Tags.GetAll([]byte("r")) {
 668  		if len(tag.T) >= 2 {
 669  			relayURL := string(tag.T[1])
 670  			if relayURL != "" {
 671  				log.I.F("discovered relay from relay list: %s", relayURL)
 672  				a.addRelay(relayURL)
 673  			}
 674  		}
 675  	}
 676  }
 677  
 678  func (a *Aggregator) outputEvent(ev *event.E) (err error) {
 679  	// Convert event to JSON and output to stdout
 680  	var jsonBytes []byte
 681  	if jsonBytes, err = json.Marshal(map[string]interface{}{
 682  		"id":         hex.Enc(ev.ID),
 683  		"pubkey":     hex.Enc(ev.Pubkey),
 684  		"created_at": ev.CreatedAt,
 685  		"kind":       ev.Kind,
 686  		"tags":       ev.Tags,
 687  		"content":    string(ev.Content),
 688  		"sig":        hex.Enc(ev.Sig),
 689  	}); chk.E(err) {
 690  		return fmt.Errorf("failed to marshal event to JSON: %w", err)
 691  	}
 692  
 693  	fmt.Println(string(jsonBytes))
 694  	return
 695  }
 696  
 697  func (a *Aggregator) connectToRelay(relayURL string) {
 698  	defer func() {
 699  		log.I.F("relay connection finished: %s", relayURL)
 700  		a.wg.Done()
 701  	}()
 702  
 703  	log.I.F("connecting to relay: %s", relayURL)
 704  
 705  	// Create context with timeout for connection
 706  	connCtx, connCancel := context.WithTimeout(a.ctx, 10*time.Second)
 707  	defer connCancel()
 708  
 709  	// Connect to relay
 710  	var client *ws.Client
 711  	var err error
 712  	if client, err = ws.RelayConnect(connCtx, relayURL); chk.E(err) {
 713  		log.E.F("failed to connect to relay %s: %v", relayURL, err)
 714  		return
 715  	}
 716  	defer client.Close()
 717  
 718  	log.I.F("connected to relay: %s", relayURL)
 719  
 720  	// Attempt authentication if we have a private key
 721  	if a.hasPrivateKey && a.signer != nil {
 722  		authCtx, authCancel := context.WithTimeout(a.ctx, 5*time.Second)
 723  		defer authCancel()
 724  
 725  		if err = client.Auth(authCtx, a.signer); err != nil {
 726  			log.W.F("authentication failed for relay %s: %v", relayURL, err)
 727  			// Continue without authentication - some relays may not require it
 728  		} else {
 729  			log.I.F("successfully authenticated to relay: %s", relayURL)
 730  		}
 731  	}
 732  
 733  	// Perform progressive backward fetching
 734  	a.progressiveFetch(client, relayURL)
 735  }
 736  
 737  func (a *Aggregator) progressiveFetch(client *ws.Client, relayURL string) {
 738  	// Check if relay should be retried
 739  	if !a.shouldRetryRelay(relayURL) {
 740  		log.W.F("skipping relay %s due to rate limiting or max retries", relayURL)
 741  		return
 742  	}
 743  
 744  	// Create hex-encoded pubkey for p-tags
 745  	pubkeyHex := hex.Enc(a.pubkeyBytes)
 746  
 747  	// Use pre-calculated time windows if available, otherwise calculate on the fly
 748  	var windows []TimeWindow
 749  	if len(a.timeWindows) > 0 {
 750  		windows = a.timeWindows
 751  	} else {
 752  		// Fallback to dynamic calculation for unlimited time ranges
 753  		currentUntil := a.progressiveEnd
 754  		for {
 755  			currentSince := timestamp.FromUnix(currentUntil.I64() - int64(batchSize.Seconds()))
 756  			if a.since != nil && currentSince.I64() < a.since.I64() {
 757  				currentSince = a.since
 758  			}
 759  
 760  			windows = append(windows, TimeWindow{
 761  				since: currentSince,
 762  				until: currentUntil,
 763  			})
 764  
 765  			currentUntil = currentSince
 766  			if a.since != nil && currentUntil.I64() <= a.since.I64() {
 767  				break
 768  			}
 769  
 770  			// Prevent infinite loops for unlimited ranges
 771  			if len(windows) > 1000 {
 772  				log.W.F("limiting to 1000 time windows for relay %s", relayURL)
 773  				break
 774  			}
 775  		}
 776  	}
 777  
 778  	// Process each time window
 779  	for _, window := range windows {
 780  		windowKey := fmt.Sprintf("%d-%d", window.since.I64(), window.until.I64())
 781  
 782  		// Skip if already completed
 783  		if a.completionTracker.IsCompleted(relayURL, windowKey) {
 784  			continue
 785  		}
 786  
 787  		select {
 788  		case <-a.ctx.Done():
 789  			log.I.F("context cancelled, stopping progressive fetch for relay %s", relayURL)
 790  			return
 791  		default:
 792  		}
 793  
 794  		log.I.F("fetching batch from %s: %d to %d", relayURL, window.since.I64(), window.until.I64())
 795  
 796  		// Try to fetch this time window with retry logic
 797  		success := a.fetchTimeWindow(client, relayURL, window, pubkeyHex)
 798  
 799  		if success {
 800  			// Mark this time window as completed for this relay
 801  			a.completionTracker.MarkCompleted(relayURL, windowKey)
 802  		} else {
 803  			// If fetch failed, mark relay as rate limited and return
 804  			a.markRelayRateLimited(relayURL)
 805  			return
 806  		}
 807  	}
 808  
 809  	// Mark relay as completed for all time windows
 810  	a.markRelayCompleted(relayURL)
 811  	log.I.F("completed all time windows for relay %s", relayURL)
 812  }
 813  
 814  func (a *Aggregator) fetchTimeWindow(client *ws.Client, relayURL string, window TimeWindow, pubkeyHex string) bool {
 815  	// Create filters for this time batch
 816  	f1 := &filter.F{
 817  		Authors: tag.NewFromBytesSlice(a.pubkeyBytes),
 818  		Since:   window.since,
 819  		Until:   window.until,
 820  	}
 821  
 822  	f2 := &filter.F{
 823  		Tags:  tag.NewSWithCap(1),
 824  		Since: window.since,
 825  		Until: window.until,
 826  	}
 827  	pTag := tag.NewFromAny("p", pubkeyHex)
 828  	f2.Tags.Append(pTag)
 829  
 830  	// Add relay list filter to discover new relays
 831  	f3 := &filter.F{
 832  		Authors: tag.NewFromBytesSlice(a.pubkeyBytes),
 833  		Kinds:   kind.NewS(kind.New(10002)), // RelayListMetadata
 834  		Since:   window.since,
 835  		Until:   window.until,
 836  	}
 837  
 838  	// Subscribe to events using all filters with a dedicated context and timeout
 839  	// Use a longer timeout to avoid premature cancellation by completion monitor
 840  	subCtx, subCancel := context.WithTimeout(context.Background(), 10*time.Minute)
 841  
 842  	var sub *ws.Subscription
 843  	var err error
 844  	if sub, err = client.Subscribe(subCtx, filter.NewS(f1, f2, f3)); chk.E(err) {
 845  		subCancel() // Cancel context on error
 846  		log.E.F("failed to subscribe to relay %s: %v", relayURL, err)
 847  		return false
 848  	}
 849  
 850  	// Ensure subscription is cleaned up when we're done
 851  	defer func() {
 852  		sub.Unsub()
 853  		subCancel()
 854  	}()
 855  
 856  	log.I.F("subscribed to batch from %s for pubkey %s (authored by, mentioning, and relay lists)", relayURL, a.npub)
 857  
 858  	// Process events for this batch
 859  	batchComplete := false
 860  	rateLimited := false
 861  
 862  	for !batchComplete && !rateLimited {
 863  		select {
 864  		case <-a.ctx.Done():
 865  			log.I.F("aggregator context cancelled, stopping batch for relay %s", relayURL)
 866  			return false
 867  		case <-subCtx.Done():
 868  			log.W.F("subscription timeout for relay %s", relayURL)
 869  			return false
 870  		case ev := <-sub.Events:
 871  			if ev == nil {
 872  				log.I.F("event channel closed for relay %s", relayURL)
 873  				return false
 874  			}
 875  
 876  			eventID := hex.Enc(ev.ID)
 877  
 878  			// Check if we've already seen this event
 879  			if a.isEventSeen(eventID) {
 880  				continue
 881  			}
 882  
 883  			// Mark event as seen
 884  			a.markEventSeen(eventID)
 885  
 886  			// Update actual time range
 887  			a.updateActualTimeRange(timestamp.FromUnix(ev.CreatedAt))
 888  
 889  			// Process relay list events to discover new relays
 890  			if ev.Kind == 10002 {
 891  				a.processRelayListEvent(ev)
 892  			}
 893  
 894  			// Output event to stdout
 895  			if err = a.outputEvent(ev); chk.E(err) {
 896  				log.E.F("failed to output event: %v", err)
 897  			}
 898  		case <-sub.EndOfStoredEvents:
 899  			log.I.F("end of stored events for batch on relay %s", relayURL)
 900  			batchComplete = true
 901  		case reason := <-sub.ClosedReason:
 902  			reasonStr := string(reason)
 903  			log.W.F("subscription closed for relay %s: %s", relayURL, reasonStr)
 904  
 905  			// Check for rate limiting messages
 906  			if a.isRateLimitMessage(reasonStr) {
 907  				log.W.F("detected rate limiting from relay %s", relayURL)
 908  				rateLimited = true
 909  			}
 910  
 911  			sub.Unsub()
 912  			return !rateLimited
 913  			// Note: NOTICE messages are handled at the client level, not subscription level
 914  			// Rate limiting detection will primarily rely on CLOSED messages
 915  		}
 916  	}
 917  
 918  	sub.Unsub()
 919  	return !rateLimited
 920  }
 921  
 922  // isRateLimitMessage checks if a message indicates rate limiting
 923  func (a *Aggregator) isRateLimitMessage(message string) bool {
 924  	message = strings.ToLower(message)
 925  	rateLimitIndicators := []string{
 926  		"too many",
 927  		"rate limit",
 928  		"slow down",
 929  		"concurrent req",
 930  		"throttle",
 931  		"backoff",
 932  	}
 933  
 934  	for _, indicator := range rateLimitIndicators {
 935  		if strings.Contains(message, indicator) {
 936  			return true
 937  		}
 938  	}
 939  	return false
 940  }
 941  
 942  func (a *Aggregator) Start() (err error) {
 943  	log.I.F("starting aggregator for key: %s", a.npub)
 944  	log.I.F("pubkey bytes: %s", hex.Enc(a.pubkeyBytes))
 945  	log.I.F("bloom filter: %d bits (%.2fMB), %d hash functions, ~0.1%% false positive rate",
 946  		bloomFilterBits, float64(a.seenEvents.MemoryUsage())/1024/1024, bloomFilterHashFuncs)
 947  
 948  	// Start memory monitoring goroutine
 949  	go a.memoryMonitor()
 950  
 951  	// Start relay processor goroutine
 952  	go a.processRelayQueue()
 953  
 954  	// Start completion monitoring goroutine
 955  	go a.completionMonitor()
 956  
 957  	// Add initial relay count to wait group
 958  	a.wg.Add(len(relays))
 959  	log.I.F("waiting for %d initial relay connections to complete", len(relays))
 960  
 961  	// Wait for all relay connections to finish OR completion
 962  	done := make(chan struct{})
 963  	go func() {
 964  		a.wg.Wait()
 965  		close(done)
 966  	}()
 967  
 968  	select {
 969  	case <-done:
 970  		log.I.F("all relay connections completed")
 971  	case <-a.ctx.Done():
 972  		log.I.F("aggregator terminated due to completion")
 973  	}
 974  
 975  	// Stop memory monitoring
 976  	a.memoryTicker.Stop()
 977  
 978  	// Output bloom filter summary
 979  	a.outputBloomFilter()
 980  
 981  	log.I.F("aggregator finished")
 982  	return
 983  }
 984  
 985  // completionMonitor periodically checks if all work is completed and terminates if so
 986  func (a *Aggregator) completionMonitor() {
 987  	ticker := time.NewTicker(10 * time.Second) // Check every 10 seconds
 988  	defer ticker.Stop()
 989  
 990  	for {
 991  		select {
 992  		case <-a.ctx.Done():
 993  			return
 994  		case <-ticker.C:
 995  			// Check for various termination conditions
 996  			if a.shouldTerminate() {
 997  				return
 998  			}
 999  
1000  			// Also check for rate-limited relays that can be retried
1001  			a.retryRateLimitedRelays()
1002  		}
1003  	}
1004  }
1005  
1006  // shouldTerminate checks various conditions that should cause the aggregator to terminate
1007  func (a *Aggregator) shouldTerminate() bool {
1008  	now := time.Now()
1009  
1010  	// Check if all work is completed
1011  	if a.checkAllCompleted() {
1012  		log.I.F("all relay-time window combinations completed, terminating aggregator")
1013  		a.cancel()
1014  		return true
1015  	}
1016  
1017  	// Check for maximum runtime timeout
1018  	if now.Sub(a.startTime) > maxRunTime {
1019  		log.W.F("maximum runtime (%v) exceeded, terminating aggregator", maxRunTime)
1020  		a.cancel()
1021  		return true
1022  	}
1023  
1024  	// Check for stuck progress timeout
1025  	a.progressMutex.RLock()
1026  	timeSinceProgress := now.Sub(a.lastProgressTime)
1027  	a.progressMutex.RUnlock()
1028  
1029  	if timeSinceProgress > stuckProgressTimeout {
1030  		log.W.F("no progress made for %v, terminating aggregator", timeSinceProgress)
1031  		a.cancel()
1032  		return true
1033  	}
1034  
1035  	return false
1036  }
1037  
1038  // retryRateLimitedRelays checks for rate-limited relays that can be retried
1039  func (a *Aggregator) retryRateLimitedRelays() {
1040  	a.relayStatesMutex.RLock()
1041  	defer a.relayStatesMutex.RUnlock()
1042  
1043  	for relayURL, state := range a.relayStates {
1044  		state.mutex.RLock()
1045  		canRetry := state.rateLimited &&
1046  			time.Now().After(state.nextRetryTime) &&
1047  			state.retryCount < maxRetries &&
1048  			!state.completed
1049  		state.mutex.RUnlock()
1050  
1051  		if canRetry {
1052  			log.I.F("retrying rate-limited relay: %s", relayURL)
1053  
1054  			// Reset rate limiting status
1055  			state.mutex.Lock()
1056  			state.rateLimited = false
1057  			state.mutex.Unlock()
1058  
1059  			// Add back to queue for retry
1060  			select {
1061  			case a.relayQueue <- relayURL:
1062  				a.wg.Add(1)
1063  			default:
1064  				log.W.F("relay queue full, skipping retry for %s", relayURL)
1065  			}
1066  		}
1067  	}
1068  }
1069  
1070  func (a *Aggregator) processRelayQueue() {
1071  	initialRelayCount := len(relays)
1072  	processedInitial := 0
1073  
1074  	for {
1075  		select {
1076  		case <-a.ctx.Done():
1077  			log.I.F("relay queue processor stopping")
1078  			return
1079  		case relayURL := <-a.relayQueue:
1080  			log.I.F("processing relay from queue: %s", relayURL)
1081  
1082  			// For dynamically discovered relays (after initial ones), add to wait group
1083  			if processedInitial >= initialRelayCount {
1084  				a.wg.Add(1)
1085  			} else {
1086  				processedInitial++
1087  			}
1088  
1089  			go a.connectToRelay(relayURL)
1090  		}
1091  	}
1092  }
1093  
1094  func (a *Aggregator) Stop() {
1095  	a.cancel()
1096  	if a.memoryTicker != nil {
1097  		a.memoryTicker.Stop()
1098  	}
1099  }
1100  
1101  // outputBloomFilter outputs the bloom filter as base64 to stderr with statistics
1102  func (a *Aggregator) outputBloomFilter() {
1103  	base64Filter := a.seenEvents.ToBase64()
1104  	estimatedEvents := a.seenEvents.EstimatedItems()
1105  	memoryUsage := float64(a.seenEvents.MemoryUsage()) / 1024 / 1024
1106  
1107  	// Get actual time range of processed events
1108  	actualSince, actualUntil := a.getActualTimeRange()
1109  
1110  	// Output to stderr so it doesn't interfere with JSONL event output to stdout
1111  	fmt.Fprintf(os.Stderr, "\n=== BLOOM FILTER SUMMARY ===\n")
1112  	fmt.Fprintf(os.Stderr, "Events processed: %d\n", a.eventCount)
1113  	fmt.Fprintf(os.Stderr, "Estimated unique events: %d\n", estimatedEvents)
1114  	fmt.Fprintf(os.Stderr, "Bloom filter size: %.2f MB\n", memoryUsage)
1115  	fmt.Fprintf(os.Stderr, "False positive rate: ~0.1%%\n")
1116  	fmt.Fprintf(os.Stderr, "Hash functions: %d\n", bloomFilterHashFuncs)
1117  
1118  	// Output time range information
1119  	if actualSince != nil && actualUntil != nil {
1120  		fmt.Fprintf(os.Stderr, "Time range covered: %d to %d\n", actualSince.I64(), actualUntil.I64())
1121  		fmt.Fprintf(os.Stderr, "Time range (human): %s to %s\n",
1122  			time.Unix(actualSince.I64(), 0).UTC().Format(time.RFC3339),
1123  			time.Unix(actualUntil.I64(), 0).UTC().Format(time.RFC3339))
1124  	} else if a.since != nil && a.until != nil {
1125  		// Fallback to requested range if no events were processed
1126  		fmt.Fprintf(os.Stderr, "Requested time range: %d to %d\n", a.since.I64(), a.until.I64())
1127  		fmt.Fprintf(os.Stderr, "Requested range (human): %s to %s\n",
1128  			time.Unix(a.since.I64(), 0).UTC().Format(time.RFC3339),
1129  			time.Unix(a.until.I64(), 0).UTC().Format(time.RFC3339))
1130  	} else {
1131  		fmt.Fprintf(os.Stderr, "Time range: unbounded\n")
1132  	}
1133  
1134  	fmt.Fprintf(os.Stderr, "\nBloom filter (base64):\n%s\n", base64Filter)
1135  	fmt.Fprintf(os.Stderr, "=== END BLOOM FILTER ===\n")
1136  }
1137  
1138  // getMemoryUsageMB returns current memory usage in MB
1139  func (a *Aggregator) getMemoryUsageMB() float64 {
1140  	var m runtime.MemStats
1141  	runtime.ReadMemStats(&m)
1142  	return float64(m.Alloc) / 1024 / 1024
1143  }
1144  
1145  // memoryMonitor monitors memory usage and logs statistics
1146  func (a *Aggregator) memoryMonitor() {
1147  	for {
1148  		select {
1149  		case <-a.ctx.Done():
1150  			log.I.F("memory monitor stopping")
1151  			return
1152  		case <-a.memoryTicker.C:
1153  			memUsage := a.getMemoryUsageMB()
1154  			bloomMemMB := float64(a.seenEvents.MemoryUsage()) / 1024 / 1024
1155  			estimatedEvents := a.seenEvents.EstimatedItems()
1156  
1157  			log.I.F("memory stats: total=%.2fMB, bloom=%.2fMB, events=%d, estimated_events=%d",
1158  				memUsage, bloomMemMB, a.eventCount, estimatedEvents)
1159  
1160  			// Check if we're approaching memory limits
1161  			if memUsage > maxMemoryMB {
1162  				log.W.F("high memory usage detected: %.2fMB (limit: %dMB)", memUsage, maxMemoryMB)
1163  
1164  				// Force garbage collection
1165  				runtime.GC()
1166  
1167  				// Check again after GC
1168  				newMemUsage := a.getMemoryUsageMB()
1169  				log.I.F("memory usage after GC: %.2fMB", newMemUsage)
1170  
1171  				// If still too high, warn but continue (bloom filter has fixed size)
1172  				if newMemUsage > maxMemoryMB*1.2 {
1173  					log.E.F("critical memory usage: %.2fMB, but continuing with bloom filter", newMemUsage)
1174  				}
1175  			}
1176  		}
1177  	}
1178  }
1179  
1180  func parseTimestamp(s string) (ts *timestamp.T, err error) {
1181  	if s == "" {
1182  		return nil, nil
1183  	}
1184  
1185  	var t int64
1186  	if t, err = strconv.ParseInt(s, 10, 64); chk.E(err) {
1187  		return nil, fmt.Errorf("invalid timestamp format: %w", err)
1188  	}
1189  
1190  	ts = timestamp.FromUnix(t)
1191  	return
1192  }
1193  
1194  func main() {
1195  	var keyInput string
1196  	var sinceStr string
1197  	var untilStr string
1198  	var bloomFilterFile string
1199  	var outputFile string
1200  
1201  	flag.StringVar(&keyInput, "key", "", "nsec (private key) or npub (public key) to search for events")
1202  	flag.StringVar(&sinceStr, "since", "", "start timestamp (Unix timestamp) - only events after this time")
1203  	flag.StringVar(&untilStr, "until", "", "end timestamp (Unix timestamp) - only events before this time")
1204  	flag.StringVar(&bloomFilterFile, "filter", "", "file containing base64 encoded bloom filter to exclude already seen events")
1205  	flag.StringVar(&outputFile, "output", "", "output file for events (default: stdout)")
1206  	flag.Parse()
1207  
1208  	if keyInput == "" {
1209  		fmt.Fprintf(os.Stderr, "Usage: %s -key <nsec|npub> [-since <timestamp>] [-until <timestamp>] [-filter <file>] [-output <file>]\n", os.Args[0])
1210  		fmt.Fprintf(os.Stderr, "Example: %s -key npub1... -since 1640995200 -until 1672531200 -filter bloom.txt -output events.jsonl\n", os.Args[0])
1211  		fmt.Fprintf(os.Stderr, "Example: %s -key nsec1... -since 1640995200 -until 1672531200 -output events.jsonl\n", os.Args[0])
1212  		fmt.Fprintf(os.Stderr, "\nKey types:\n")
1213  		fmt.Fprintf(os.Stderr, "  nsec: Private key (enables authentication to relays that require it)\n")
1214  		fmt.Fprintf(os.Stderr, "  npub: Public key (authentication disabled)\n")
1215  		fmt.Fprintf(os.Stderr, "\nTimestamps should be Unix timestamps (seconds since epoch)\n")
1216  		fmt.Fprintf(os.Stderr, "If -filter is provided, output will be appended to the output file\n")
1217  		os.Exit(1)
1218  	}
1219  
1220  	var since, until *timestamp.T
1221  	var err error
1222  
1223  	if since, err = parseTimestamp(sinceStr); chk.E(err) {
1224  		fmt.Fprintf(os.Stderr, "Error parsing since timestamp: %v\n", err)
1225  		os.Exit(1)
1226  	}
1227  
1228  	if until, err = parseTimestamp(untilStr); chk.E(err) {
1229  		fmt.Fprintf(os.Stderr, "Error parsing until timestamp: %v\n", err)
1230  		os.Exit(1)
1231  	}
1232  
1233  	// Validate that since is before until if both are provided
1234  	if since != nil && until != nil && since.I64() >= until.I64() {
1235  		fmt.Fprintf(os.Stderr, "Error: since timestamp must be before until timestamp\n")
1236  		os.Exit(1)
1237  	}
1238  
1239  	// Set up output redirection if needed
1240  	if outputFile != "" {
1241  		var file *os.File
1242  		if bloomFilterFile != "" {
1243  			// Append mode if bloom filter is provided
1244  			file, err = os.OpenFile(outputFile, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644)
1245  		} else {
1246  			// Truncate mode if no bloom filter
1247  			file, err = os.OpenFile(outputFile, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0644)
1248  		}
1249  		if err != nil {
1250  			fmt.Fprintf(os.Stderr, "Error opening output file: %v\n", err)
1251  			os.Exit(1)
1252  		}
1253  		defer file.Close()
1254  
1255  		// Redirect stdout to file
1256  		os.Stdout = file
1257  	}
1258  
1259  	var agg *Aggregator
1260  	if agg, err = NewAggregator(keyInput, since, until, bloomFilterFile); chk.E(err) {
1261  		fmt.Fprintf(os.Stderr, "Error creating aggregator: %v\n", err)
1262  		os.Exit(1)
1263  	}
1264  
1265  	if err = agg.Start(); chk.E(err) {
1266  		fmt.Fprintf(os.Stderr, "Error running aggregator: %v\n", err)
1267  		os.Exit(1)
1268  	}
1269  }
1270