archive.go raw
1 // Package archive provides query augmentation from authoritative archive relays.
2 // It manages connections to archive relays and fetches events that match local
3 // queries, caching them locally for future access.
4 package archive
5
6 import (
7 "context"
8 "sync"
9 "time"
10
11 "next.orly.dev/pkg/lol/log"
12
13 "next.orly.dev/pkg/nostr/encoders/event"
14 "next.orly.dev/pkg/nostr/encoders/filter"
15 )
16
17 // ArchiveDatabase defines the interface for storing fetched events.
18 type ArchiveDatabase interface {
19 SaveEvent(ctx context.Context, ev *event.E) (exists bool, err error)
20 }
21
22 // EventDeliveryChannel defines the interface for streaming results back to clients.
23 type EventDeliveryChannel interface {
24 SendEvent(ev *event.E) error
25 IsConnected() bool
26 }
27
28 // Manager handles connections to archive relays for query augmentation.
29 type Manager struct {
30 ctx context.Context
31 cancel context.CancelFunc
32
33 relays []string
34 timeout time.Duration
35 db ArchiveDatabase
36 queryCache *QueryCache
37
38 // Connection pool
39 mu sync.RWMutex
40 connections map[string]*RelayConnection
41
42 // Configuration
43 enabled bool
44 }
45
// Config carries the settings consumed by New.
type Config struct {
	// Enabled switches archive augmentation on; when false, New returns a
	// disabled manager.
	Enabled bool
	// Relays lists the archive relay URLs; an empty list also yields a
	// disabled manager.
	Relays []string
	// TimeoutSec is the per-query timeout in seconds; values <= 0 select the
	// 30 second default.
	TimeoutSec int
	// CacheTTLHrs is the query cache TTL in hours; values <= 0 select the
	// 24 hour default.
	CacheTTLHrs int
}
53
54 // New creates a new archive manager.
55 func New(ctx context.Context, db ArchiveDatabase, cfg Config) *Manager {
56 if !cfg.Enabled || len(cfg.Relays) == 0 {
57 return &Manager{enabled: false}
58 }
59
60 mgrCtx, cancel := context.WithCancel(ctx)
61
62 timeout := time.Duration(cfg.TimeoutSec) * time.Second
63 if timeout <= 0 {
64 timeout = 30 * time.Second
65 }
66
67 cacheTTL := time.Duration(cfg.CacheTTLHrs) * time.Hour
68 if cacheTTL <= 0 {
69 cacheTTL = 24 * time.Hour
70 }
71
72 m := &Manager{
73 ctx: mgrCtx,
74 cancel: cancel,
75 relays: cfg.Relays,
76 timeout: timeout,
77 db: db,
78 queryCache: NewQueryCache(cacheTTL, 100000), // 100k cached queries
79 connections: make(map[string]*RelayConnection),
80 enabled: true,
81 }
82
83 log.I.F("archive manager initialized with %d relays, %v timeout, %v cache TTL",
84 len(cfg.Relays), timeout, cacheTTL)
85
86 return m
87 }
88
89 // IsEnabled returns whether the archive manager is enabled.
90 func (m *Manager) IsEnabled() bool {
91 return m.enabled
92 }
93
94 // QueryArchive queries archive relays asynchronously and stores/streams results.
95 // This should be called in a goroutine after returning local results.
96 //
97 // Parameters:
98 // - subID: the subscription ID for the query
99 // - connID: the connection ID (for access tracking)
100 // - f: the filter to query
101 // - delivered: map of event IDs already delivered to the client
102 // - listener: optional channel to stream results back (may be nil)
103 func (m *Manager) QueryArchive(
104 subID string,
105 connID string,
106 f *filter.F,
107 delivered map[string]struct{},
108 listener EventDeliveryChannel,
109 ) {
110 if !m.enabled {
111 return
112 }
113
114 // Check if this query was recently executed
115 if m.queryCache.HasQueried(f) {
116 log.D.F("archive: query cache hit, skipping archive query for sub %s", subID)
117 return
118 }
119
120 // Mark query as executed
121 m.queryCache.MarkQueried(f)
122
123 // Create query context with timeout
124 queryCtx, cancel := context.WithTimeout(m.ctx, m.timeout)
125 defer cancel()
126
127 // Query all relays in parallel
128 var wg sync.WaitGroup
129 results := make(chan *event.E, 1000)
130
131 for _, relayURL := range m.relays {
132 wg.Add(1)
133 go func(url string) {
134 defer wg.Done()
135 m.queryRelay(queryCtx, url, f, results)
136 }(relayURL)
137 }
138
139 // Close results channel when all relays are done
140 go func() {
141 wg.Wait()
142 close(results)
143 }()
144
145 // Process results
146 stored := 0
147 streamed := 0
148
149 for ev := range results {
150 // Skip if already delivered
151 evIDStr := string(ev.ID[:])
152 if _, exists := delivered[evIDStr]; exists {
153 continue
154 }
155
156 // Store event
157 exists, err := m.db.SaveEvent(queryCtx, ev)
158 if err != nil {
159 log.D.F("archive: failed to save event: %v", err)
160 continue
161 }
162 if !exists {
163 stored++
164 }
165
166 // Stream to client if still connected
167 if listener != nil && listener.IsConnected() {
168 if err := listener.SendEvent(ev); err == nil {
169 streamed++
170 delivered[evIDStr] = struct{}{}
171 }
172 }
173 }
174
175 if stored > 0 || streamed > 0 {
176 log.D.F("archive: query %s completed - stored: %d, streamed: %d", subID, stored, streamed)
177 }
178 }
179
180 // queryRelay queries a single archive relay and sends results to the channel.
181 func (m *Manager) queryRelay(ctx context.Context, url string, f *filter.F, results chan<- *event.E) {
182 conn, err := m.getOrCreateConnection(url)
183 if err != nil {
184 log.D.F("archive: failed to connect to %s: %v", url, err)
185 return
186 }
187
188 events, err := conn.Query(ctx, f)
189 if err != nil {
190 log.D.F("archive: query failed on %s: %v", url, err)
191 return
192 }
193
194 for _, ev := range events {
195 select {
196 case <-ctx.Done():
197 return
198 case results <- ev:
199 }
200 }
201 }
202
203 // getOrCreateConnection returns an existing connection or creates a new one.
204 func (m *Manager) getOrCreateConnection(url string) (*RelayConnection, error) {
205 m.mu.RLock()
206 conn, exists := m.connections[url]
207 m.mu.RUnlock()
208
209 if exists && conn.IsConnected() {
210 return conn, nil
211 }
212
213 m.mu.Lock()
214 defer m.mu.Unlock()
215
216 // Double-check after acquiring write lock
217 conn, exists = m.connections[url]
218 if exists && conn.IsConnected() {
219 return conn, nil
220 }
221
222 // Create new connection
223 conn = NewRelayConnection(m.ctx, url)
224 if err := conn.Connect(); err != nil {
225 return nil, err
226 }
227
228 m.connections[url] = conn
229 return conn, nil
230 }
231
232 // Stop stops the archive manager and closes all connections.
233 func (m *Manager) Stop() {
234 if !m.enabled {
235 return
236 }
237
238 m.cancel()
239
240 m.mu.Lock()
241 defer m.mu.Unlock()
242
243 for _, conn := range m.connections {
244 conn.Close()
245 }
246 m.connections = make(map[string]*RelayConnection)
247
248 log.I.F("archive manager stopped")
249 }
250
251 // Stats returns current archive manager statistics.
252 func (m *Manager) Stats() ManagerStats {
253 if !m.enabled {
254 return ManagerStats{}
255 }
256
257 m.mu.RLock()
258 defer m.mu.RUnlock()
259
260 connected := 0
261 for _, conn := range m.connections {
262 if conn.IsConnected() {
263 connected++
264 }
265 }
266
267 return ManagerStats{
268 Enabled: m.enabled,
269 TotalRelays: len(m.relays),
270 ConnectedRelays: connected,
271 CachedQueries: m.queryCache.Len(),
272 MaxCachedQueries: m.queryCache.MaxSize(),
273 }
274 }
275
// ManagerStats is the snapshot returned by Manager.Stats.
type ManagerStats struct {
	// Enabled mirrors the manager's enabled flag.
	Enabled bool
	// TotalRelays is the number of configured archive relays.
	TotalRelays int
	// ConnectedRelays is the number of pooled connections reporting connected.
	ConnectedRelays int
	// CachedQueries is the current number of entries in the query cache.
	CachedQueries int
	// MaxCachedQueries is the capacity reported by the query cache.
	MaxCachedQueries int
}
284