archive.go raw

   1  // Package archive provides query augmentation from authoritative archive relays.
   2  // It manages connections to archive relays and fetches events that match local
   3  // queries, caching them locally for future access.
   4  package archive
   5  
   6  import (
   7  	"context"
   8  	"sync"
   9  	"time"
  10  
  11  	"next.orly.dev/pkg/lol/log"
  12  
  13  	"next.orly.dev/pkg/nostr/encoders/event"
  14  	"next.orly.dev/pkg/nostr/encoders/filter"
  15  )
  16  
// ArchiveDatabase defines the interface for storing fetched events.
//
// It is the only persistence dependency of the archive manager: every event
// received from an archive relay that has not already been delivered to the
// client is passed to SaveEvent.
type ArchiveDatabase interface {
	// SaveEvent persists ev. The manager treats a false exists result as
	// "newly stored" when counting; a non-nil err causes the event to be
	// logged and skipped, so it is not streamed to the client either.
	SaveEvent(ctx context.Context, ev *event.E) (exists bool, err error)
}
  21  
// EventDeliveryChannel defines the interface for streaming results back to clients.
//
// The manager calls IsConnected before every SendEvent and only records an
// event as delivered when SendEvent returns a nil error.
type EventDeliveryChannel interface {
	// SendEvent delivers a single event to the client.
	SendEvent(ev *event.E) error
	// IsConnected reports whether the client can still receive events.
	IsConnected() bool
}
  27  
// Manager handles connections to archive relays for query augmentation.
//
// New returns either a fully initialised manager or, when archiving is
// disabled or no relays are configured, a Manager in which only enabled is
// set (to false). The exported methods check enabled before touching any
// other field.
type Manager struct {
	// ctx is derived from the context given to New and is cancelled through
	// cancel by Stop. It is the parent of every query context and is passed
	// to newly created relay connections.
	ctx    context.Context
	cancel context.CancelFunc

	// relays holds the relay URLs from Config.Relays, timeout bounds a single
	// QueryArchive call, db receives fetched events, and queryCache remembers
	// which filters have already been sent to the archive relays.
	relays     []string
	timeout    time.Duration
	db         ArchiveDatabase
	queryCache *QueryCache

	// Connection pool: connections is keyed by relay URL and guarded by mu.
	mu          sync.RWMutex
	connections map[string]*RelayConnection

	// Configuration: enabled is set once in New and read by the other methods.
	enabled bool
}
  45  
// Config holds the configuration for the archive manager.
type Config struct {
	// Enabled switches query augmentation on. New returns a disabled manager
	// when it is false.
	Enabled bool
	// Relays lists the archive relay URLs to query. An empty list also yields
	// a disabled manager.
	Relays []string
	// TimeoutSec is the per-query timeout in seconds. A value that does not
	// produce a positive duration is replaced with 30 seconds.
	TimeoutSec int
	// CacheTTLHrs is the lifetime, in hours, passed to the query cache. A
	// value that does not produce a positive duration is replaced with 24 hours.
	CacheTTLHrs int
}
  53  
  54  // New creates a new archive manager.
  55  func New(ctx context.Context, db ArchiveDatabase, cfg Config) *Manager {
  56  	if !cfg.Enabled || len(cfg.Relays) == 0 {
  57  		return &Manager{enabled: false}
  58  	}
  59  
  60  	mgrCtx, cancel := context.WithCancel(ctx)
  61  
  62  	timeout := time.Duration(cfg.TimeoutSec) * time.Second
  63  	if timeout <= 0 {
  64  		timeout = 30 * time.Second
  65  	}
  66  
  67  	cacheTTL := time.Duration(cfg.CacheTTLHrs) * time.Hour
  68  	if cacheTTL <= 0 {
  69  		cacheTTL = 24 * time.Hour
  70  	}
  71  
  72  	m := &Manager{
  73  		ctx:         mgrCtx,
  74  		cancel:      cancel,
  75  		relays:      cfg.Relays,
  76  		timeout:     timeout,
  77  		db:          db,
  78  		queryCache:  NewQueryCache(cacheTTL, 100000), // 100k cached queries
  79  		connections: make(map[string]*RelayConnection),
  80  		enabled:     true,
  81  	}
  82  
  83  	log.I.F("archive manager initialized with %d relays, %v timeout, %v cache TTL",
  84  		len(cfg.Relays), timeout, cacheTTL)
  85  
  86  	return m
  87  }
  88  
// IsEnabled returns whether the archive manager is enabled.
//
// It is false for the placeholder manager that New returns when archiving is
// switched off or no relays are configured; in that state QueryArchive, Stop
// and Stats return immediately.
func (m *Manager) IsEnabled() bool {
	return m.enabled
}
  93  
  94  // QueryArchive queries archive relays asynchronously and stores/streams results.
  95  // This should be called in a goroutine after returning local results.
  96  //
  97  // Parameters:
  98  //   - subID: the subscription ID for the query
  99  //   - connID: the connection ID (for access tracking)
 100  //   - f: the filter to query
 101  //   - delivered: map of event IDs already delivered to the client
 102  //   - listener: optional channel to stream results back (may be nil)
 103  func (m *Manager) QueryArchive(
 104  	subID string,
 105  	connID string,
 106  	f *filter.F,
 107  	delivered map[string]struct{},
 108  	listener EventDeliveryChannel,
 109  ) {
 110  	if !m.enabled {
 111  		return
 112  	}
 113  
 114  	// Check if this query was recently executed
 115  	if m.queryCache.HasQueried(f) {
 116  		log.D.F("archive: query cache hit, skipping archive query for sub %s", subID)
 117  		return
 118  	}
 119  
 120  	// Mark query as executed
 121  	m.queryCache.MarkQueried(f)
 122  
 123  	// Create query context with timeout
 124  	queryCtx, cancel := context.WithTimeout(m.ctx, m.timeout)
 125  	defer cancel()
 126  
 127  	// Query all relays in parallel
 128  	var wg sync.WaitGroup
 129  	results := make(chan *event.E, 1000)
 130  
 131  	for _, relayURL := range m.relays {
 132  		wg.Add(1)
 133  		go func(url string) {
 134  			defer wg.Done()
 135  			m.queryRelay(queryCtx, url, f, results)
 136  		}(relayURL)
 137  	}
 138  
 139  	// Close results channel when all relays are done
 140  	go func() {
 141  		wg.Wait()
 142  		close(results)
 143  	}()
 144  
 145  	// Process results
 146  	stored := 0
 147  	streamed := 0
 148  
 149  	for ev := range results {
 150  		// Skip if already delivered
 151  		evIDStr := string(ev.ID[:])
 152  		if _, exists := delivered[evIDStr]; exists {
 153  			continue
 154  		}
 155  
 156  		// Store event
 157  		exists, err := m.db.SaveEvent(queryCtx, ev)
 158  		if err != nil {
 159  			log.D.F("archive: failed to save event: %v", err)
 160  			continue
 161  		}
 162  		if !exists {
 163  			stored++
 164  		}
 165  
 166  		// Stream to client if still connected
 167  		if listener != nil && listener.IsConnected() {
 168  			if err := listener.SendEvent(ev); err == nil {
 169  				streamed++
 170  				delivered[evIDStr] = struct{}{}
 171  			}
 172  		}
 173  	}
 174  
 175  	if stored > 0 || streamed > 0 {
 176  		log.D.F("archive: query %s completed - stored: %d, streamed: %d", subID, stored, streamed)
 177  	}
 178  }
 179  
 180  // queryRelay queries a single archive relay and sends results to the channel.
 181  func (m *Manager) queryRelay(ctx context.Context, url string, f *filter.F, results chan<- *event.E) {
 182  	conn, err := m.getOrCreateConnection(url)
 183  	if err != nil {
 184  		log.D.F("archive: failed to connect to %s: %v", url, err)
 185  		return
 186  	}
 187  
 188  	events, err := conn.Query(ctx, f)
 189  	if err != nil {
 190  		log.D.F("archive: query failed on %s: %v", url, err)
 191  		return
 192  	}
 193  
 194  	for _, ev := range events {
 195  		select {
 196  		case <-ctx.Done():
 197  			return
 198  		case results <- ev:
 199  		}
 200  	}
 201  }
 202  
 203  // getOrCreateConnection returns an existing connection or creates a new one.
 204  func (m *Manager) getOrCreateConnection(url string) (*RelayConnection, error) {
 205  	m.mu.RLock()
 206  	conn, exists := m.connections[url]
 207  	m.mu.RUnlock()
 208  
 209  	if exists && conn.IsConnected() {
 210  		return conn, nil
 211  	}
 212  
 213  	m.mu.Lock()
 214  	defer m.mu.Unlock()
 215  
 216  	// Double-check after acquiring write lock
 217  	conn, exists = m.connections[url]
 218  	if exists && conn.IsConnected() {
 219  		return conn, nil
 220  	}
 221  
 222  	// Create new connection
 223  	conn = NewRelayConnection(m.ctx, url)
 224  	if err := conn.Connect(); err != nil {
 225  		return nil, err
 226  	}
 227  
 228  	m.connections[url] = conn
 229  	return conn, nil
 230  }
 231  
 232  // Stop stops the archive manager and closes all connections.
 233  func (m *Manager) Stop() {
 234  	if !m.enabled {
 235  		return
 236  	}
 237  
 238  	m.cancel()
 239  
 240  	m.mu.Lock()
 241  	defer m.mu.Unlock()
 242  
 243  	for _, conn := range m.connections {
 244  		conn.Close()
 245  	}
 246  	m.connections = make(map[string]*RelayConnection)
 247  
 248  	log.I.F("archive manager stopped")
 249  }
 250  
 251  // Stats returns current archive manager statistics.
 252  func (m *Manager) Stats() ManagerStats {
 253  	if !m.enabled {
 254  		return ManagerStats{}
 255  	}
 256  
 257  	m.mu.RLock()
 258  	defer m.mu.RUnlock()
 259  
 260  	connected := 0
 261  	for _, conn := range m.connections {
 262  		if conn.IsConnected() {
 263  			connected++
 264  		}
 265  	}
 266  
 267  	return ManagerStats{
 268  		Enabled:          m.enabled,
 269  		TotalRelays:      len(m.relays),
 270  		ConnectedRelays:  connected,
 271  		CachedQueries:    m.queryCache.Len(),
 272  		MaxCachedQueries: m.queryCache.MaxSize(),
 273  	}
 274  }
 275  
// ManagerStats holds archive manager statistics.
//
// Stats returns the zero value for a disabled manager, so every field is
// zero/false in that case.
type ManagerStats struct {
	// Enabled mirrors the manager's enabled flag.
	Enabled bool
	// TotalRelays is the number of configured relay URLs.
	TotalRelays int
	// ConnectedRelays is the number of pooled connections that reported
	// IsConnected when the snapshot was taken.
	ConnectedRelays int
	// CachedQueries is the value of the query cache's Len method.
	CachedQueries int
	// MaxCachedQueries is the value of the query cache's MaxSize method.
	MaxCachedQueries int
}
 284