event_cache.go raw
1 package querycache
2
3 import (
4 "container/list"
5 "sync"
6 "time"
7
8 "github.com/klauspost/compress/zstd"
9 "next.orly.dev/pkg/lol/log"
10 "next.orly.dev/pkg/nostr/encoders/event"
11 "next.orly.dev/pkg/nostr/encoders/filter"
12 )
13
const (
	// DefaultMaxSize is the default maximum cache size in bytes (512 MB).
	// This budget is measured against the COMPRESSED size of entries.
	DefaultMaxSize = 512 * 1024 * 1024
	// DefaultMaxAge is the default maximum age for cache entries before
	// they are considered expired and eligible for removal.
	DefaultMaxAge = 5 * time.Minute
)
20
// EventCacheEntry represents a cached set of compressed serialized events for
// a single filter. Entries are linked into the cache's LRU list via
// listElement and keyed in the entries map by FilterKey.
type EventCacheEntry struct {
	FilterKey        string    // serialized (sorted) filter used as the map key
	CompressedData   []byte    // ZSTD compressed newline-delimited JSON events
	UncompressedSize int       // original size before compression (for stats)
	CompressedSize   int       // actual compressed size in bytes (counts toward cache budget)
	EventCount       int       // number of events in this entry
	LastAccess       time.Time // updated on every cache hit
	CreatedAt        time.Time // used for TTL expiry checks
	listElement      *list.Element // position in the LRU list; nil only before insertion
}
32
// EventCache caches event.S results from database queries with ZSTD
// compression. All map/list/counter state is guarded by mu; the ZSTD encoder
// has its own lock so compression can proceed without holding the cache lock.
type EventCache struct {
	mu sync.RWMutex

	entries map[string]*EventCacheEntry // keyed by serialized filter
	lruList *list.List                  // front = most recently used

	currentSize int64 // tracks total compressed size of all entries
	maxSize     int64 // eviction threshold for currentSize
	maxAge      time.Duration // TTL for entries, measured from CreatedAt

	// ZSTD encoder/decoder — encoder is NOT safe for concurrent use,
	// so we protect it with a dedicated mutex. Decoder is safe.
	encoder   *zstd.Encoder
	encoderMu sync.Mutex
	decoder   *zstd.Decoder

	// Compaction tracking: needsCompaction is set under mu after evictions;
	// compactionChan (capacity 1) wakes the compaction worker.
	needsCompaction bool
	compactionChan  chan struct{}

	// Shutdown signal for background goroutines; closed by Close.
	stopCh chan struct{}

	// Metrics (all guarded by mu).
	hits             uint64
	misses           uint64
	evictions        uint64
	invalidations    uint64
	compressionRatio float64 // exponential moving average of uncompressed/compressed
	compactionRuns   uint64
}
65
66 // NewEventCache creates a new event cache
67 func NewEventCache(maxSize int64, maxAge time.Duration) *EventCache {
68 if maxSize <= 0 {
69 maxSize = DefaultMaxSize
70 }
71 if maxAge <= 0 {
72 maxAge = DefaultMaxAge
73 }
74
75 // Create ZSTD encoder at level 9 (best compression)
76 encoder, err := zstd.NewWriter(nil, zstd.WithEncoderLevel(zstd.SpeedBestCompression))
77 if err != nil {
78 log.E.F("failed to create ZSTD encoder: %v", err)
79 return nil
80 }
81
82 // Create ZSTD decoder
83 decoder, err := zstd.NewReader(nil)
84 if err != nil {
85 log.E.F("failed to create ZSTD decoder: %v", err)
86 return nil
87 }
88
89 c := &EventCache{
90 entries: make(map[string]*EventCacheEntry),
91 lruList: list.New(),
92 maxSize: maxSize,
93 maxAge: maxAge,
94 encoder: encoder,
95 decoder: decoder,
96 compactionChan: make(chan struct{}, 1),
97 stopCh: make(chan struct{}),
98 }
99
100 // Start background workers
101 go c.cleanupExpired()
102 go c.compactionWorker()
103
104 return c
105 }
106
107 // Close stops background goroutines. Safe to call multiple times.
108 func (c *EventCache) Close() {
109 select {
110 case <-c.stopCh:
111 // already closed
112 default:
113 close(c.stopCh)
114 }
115 }
116
117 // Get retrieves cached serialized events for a filter (decompresses on the fly)
118 func (c *EventCache) Get(f *filter.F) (serializedJSON [][]byte, found bool) {
119 // Normalize filter by sorting to ensure consistent cache keys
120 f.Sort()
121 filterKey := string(f.Serialize())
122
123 c.mu.Lock()
124 entry, exists := c.entries[filterKey]
125 if !exists {
126 c.misses++
127 c.mu.Unlock()
128 return nil, false
129 }
130
131 // Check if expired
132 if time.Since(entry.CreatedAt) > c.maxAge {
133 c.removeEntry(entry)
134 c.misses++
135 c.mu.Unlock()
136 return nil, false
137 }
138
139 // Copy compressed data under lock so eviction can't free it
140 compressedCopy := make([]byte, len(entry.CompressedData))
141 copy(compressedCopy, entry.CompressedData)
142 eventCount := entry.EventCount
143 compressedSize := entry.CompressedSize
144 uncompressedSize := entry.UncompressedSize
145
146 // Update access time and move to front
147 entry.LastAccess = time.Now()
148 c.lruList.MoveToFront(entry.listElement)
149 c.hits++
150 c.mu.Unlock()
151
152 // Decompress outside lock
153 decompressed, err := c.decoder.DecodeAll(compressedCopy, nil)
154 if err != nil {
155 log.E.F("failed to decompress cache entry: %v", err)
156 return nil, false
157 }
158
159 // Deserialize the individual JSON events from the decompressed blob
160 // Format: each event is newline-delimited JSON
161 serializedJSON = make([][]byte, 0, eventCount)
162 start := 0
163 for i := 0; i < len(decompressed); i++ {
164 if decompressed[i] == '\n' {
165 if i > start {
166 eventJSON := make([]byte, i-start)
167 copy(eventJSON, decompressed[start:i])
168 serializedJSON = append(serializedJSON, eventJSON)
169 }
170 start = i + 1
171 }
172 }
173 // Handle last event if no trailing newline
174 if start < len(decompressed) {
175 eventJSON := make([]byte, len(decompressed)-start)
176 copy(eventJSON, decompressed[start:])
177 serializedJSON = append(serializedJSON, eventJSON)
178 }
179
180 log.D.F("event cache HIT: filter=%s events=%d compressed=%d uncompressed=%d ratio=%.2f",
181 filterKey[:min(50, len(filterKey))], eventCount, compressedSize,
182 uncompressedSize, float64(uncompressedSize)/float64(compressedSize))
183
184 return serializedJSON, true
185 }
186
// PutJSON stores pre-marshaled JSON events in the cache under the given
// filter, compressed with ZSTD.
// This should be called AFTER events are sent to the client with the
// marshaled envelopes, so the marshaling cost is already paid.
// If an entry for the same (sorted) filter already exists it is replaced in
// place and its TTL restarts; otherwise LRU entries may be evicted to make
// room under maxSize.
func (c *EventCache) PutJSON(f *filter.F, marshaledJSON [][]byte) {
	if len(marshaledJSON) == 0 {
		return
	}

	// Normalize filter by sorting to ensure consistent cache keys.
	f.Sort()
	filterKey := string(f.Serialize())

	// Concatenate all JSON events with newline delimiters for compression;
	// Get splits the decompressed blob back apart on '\n'.
	totalSize := 0
	for _, jsonData := range marshaledJSON {
		totalSize += len(jsonData) + 1 // +1 for newline
	}

	uncompressed := make([]byte, 0, totalSize)
	for _, jsonData := range marshaledJSON {
		uncompressed = append(uncompressed, jsonData...)
		uncompressed = append(uncompressed, '\n')
	}

	// Compress with ZSTD — encoder is not concurrent-safe, use dedicated lock.
	// Compression happens before taking c.mu to keep the cache lock short.
	c.encoderMu.Lock()
	compressed := c.encoder.EncodeAll(uncompressed, nil)
	c.encoderMu.Unlock()
	compressedSize := len(compressed)

	// Don't cache if the compressed size alone already exceeds the budget.
	if int64(compressedSize) > c.maxSize {
		log.W.F("event cache: compressed entry too large: %d bytes", compressedSize)
		return
	}

	c.mu.Lock()
	defer c.mu.Unlock()

	// Check if already exists — refresh in place and move to LRU front.
	if existing, exists := c.entries[filterKey]; exists {
		c.currentSize -= int64(existing.CompressedSize)
		existing.CompressedData = compressed
		existing.UncompressedSize = totalSize
		existing.CompressedSize = compressedSize
		existing.EventCount = len(marshaledJSON)
		existing.LastAccess = time.Now()
		existing.CreatedAt = time.Now()
		c.currentSize += int64(compressedSize)
		c.lruList.MoveToFront(existing.listElement)
		c.updateCompressionRatio(totalSize, compressedSize)
		log.T.F("event cache UPDATE: filter=%s events=%d ratio=%.2f",
			filterKey[:min(50, len(filterKey))], len(marshaledJSON),
			float64(totalSize)/float64(compressedSize))
		return
	}

	// Evict least-recently-used entries until the new entry fits.
	evictionCount := 0
	for c.currentSize+int64(compressedSize) > c.maxSize && c.lruList.Len() > 0 {
		oldest := c.lruList.Back()
		if oldest != nil {
			oldEntry := oldest.Value.(*EventCacheEntry)
			c.removeEntry(oldEntry)
			c.evictions++
			evictionCount++
		}
	}

	// Trigger compaction if we evicted entries; non-blocking send because a
	// pending signal already guarantees the worker will run.
	if evictionCount > 0 {
		c.needsCompaction = true
		select {
		case c.compactionChan <- struct{}{}:
		default:
			// Channel already has signal, compaction will run
		}
	}

	// Create new entry and link it at the front of the LRU list.
	entry := &EventCacheEntry{
		FilterKey:        filterKey,
		CompressedData:   compressed,
		UncompressedSize: totalSize,
		CompressedSize:   compressedSize,
		EventCount:       len(marshaledJSON),
		LastAccess:       time.Now(),
		CreatedAt:        time.Now(),
	}

	entry.listElement = c.lruList.PushFront(entry)
	c.entries[filterKey] = entry
	c.currentSize += int64(compressedSize)
	c.updateCompressionRatio(totalSize, compressedSize)

	log.D.F("event cache PUT: filter=%s events=%d uncompressed=%d compressed=%d ratio=%.2f total=%d/%d",
		filterKey[:min(50, len(filterKey))], len(marshaledJSON), totalSize, compressedSize,
		float64(totalSize)/float64(compressedSize), c.currentSize, c.maxSize)
}
285
286 // updateCompressionRatio updates the rolling average compression ratio
287 func (c *EventCache) updateCompressionRatio(uncompressed, compressed int) {
288 if compressed == 0 {
289 return
290 }
291 newRatio := float64(uncompressed) / float64(compressed)
292 // Use exponential moving average
293 if c.compressionRatio == 0 {
294 c.compressionRatio = newRatio
295 } else {
296 c.compressionRatio = 0.9*c.compressionRatio + 0.1*newRatio
297 }
298 }
299
300 // Invalidate clears all entries (called when new events are stored)
301 func (c *EventCache) Invalidate() {
302 c.mu.Lock()
303 defer c.mu.Unlock()
304
305 if len(c.entries) > 0 {
306 cleared := len(c.entries)
307 c.entries = make(map[string]*EventCacheEntry)
308 c.lruList = list.New()
309 c.currentSize = 0
310 c.invalidations += uint64(cleared)
311 log.T.F("event cache INVALIDATE: cleared %d entries", cleared)
312 }
313 }
314
315 // removeEntry removes an entry (must be called with lock held)
316 func (c *EventCache) removeEntry(entry *EventCacheEntry) {
317 delete(c.entries, entry.FilterKey)
318 c.lruList.Remove(entry.listElement)
319 c.currentSize -= int64(entry.CompressedSize)
320 }
321
322 // compactionWorker runs in the background and compacts cache entries after evictions
323 // to reclaim fragmented space and improve cache efficiency
324 func (c *EventCache) compactionWorker() {
325 for {
326 select {
327 case <-c.stopCh:
328 return
329 case _, ok := <-c.compactionChan:
330 if !ok {
331 return
332 }
333 }
334
335 c.mu.Lock()
336 if !c.needsCompaction {
337 c.mu.Unlock()
338 continue
339 }
340
341 log.D.F("cache compaction: starting (entries=%d size=%d/%d)",
342 len(c.entries), c.currentSize, c.maxSize)
343
344 c.needsCompaction = false
345 c.compactionRuns++
346 c.mu.Unlock()
347
348 log.D.F("cache compaction: completed (runs=%d)", c.compactionRuns)
349 }
350 }
351
352 // cleanupExpired removes expired entries periodically
353 func (c *EventCache) cleanupExpired() {
354 ticker := time.NewTicker(1 * time.Minute)
355 defer ticker.Stop()
356
357 for {
358 select {
359 case <-c.stopCh:
360 return
361 case <-ticker.C:
362 }
363
364 c.mu.Lock()
365 now := time.Now()
366 var toRemove []*EventCacheEntry
367
368 for _, entry := range c.entries {
369 if now.Sub(entry.CreatedAt) > c.maxAge {
370 toRemove = append(toRemove, entry)
371 }
372 }
373
374 for _, entry := range toRemove {
375 c.removeEntry(entry)
376 }
377
378 if len(toRemove) > 0 {
379 log.D.F("event cache cleanup: removed %d expired entries", len(toRemove))
380 }
381
382 c.mu.Unlock()
383 }
384 }
385
// CacheStats holds a point-in-time snapshot of cache performance metrics,
// as returned by Stats.
type CacheStats struct {
	Entries          int
	CurrentSize      int64 // total compressed size of all entries, in bytes
	MaxSize          int64 // configured size cap
	Hits             uint64
	Misses           uint64
	HitRate          float64 // hits / (hits + misses); 0 when no lookups yet
	Evictions        uint64
	Invalidations    uint64
	CompressionRatio float64 // moving average of uncompressed/compressed ratio
	CompactionRuns   uint64
}
399
400 // Stats returns cache statistics
401 func (c *EventCache) Stats() CacheStats {
402 c.mu.RLock()
403 defer c.mu.RUnlock()
404
405 total := c.hits + c.misses
406 hitRate := 0.0
407 if total > 0 {
408 hitRate = float64(c.hits) / float64(total)
409 }
410
411 return CacheStats{
412 Entries: len(c.entries),
413 CurrentSize: c.currentSize,
414 MaxSize: c.maxSize,
415 Hits: c.hits,
416 Misses: c.misses,
417 HitRate: hitRate,
418 Evictions: c.evictions,
419 Invalidations: c.invalidations,
420 CompressionRatio: c.compressionRatio,
421 CompactionRuns: c.compactionRuns,
422 }
423 }
424
// min returns the smaller of two ints. Kept as a package-local helper so the
// file builds on Go toolchains predating the min builtin (Go 1.21).
func min(a, b int) int {
	if a > b {
		return b
	}
	return a
}
431
// GetEvents retrieves cached events for a filter (decompresses and
// deserializes on the fly).
// This is the new method that returns event.E objects instead of the
// marshaled JSON that Get returns. It reports (nil, false) on a miss, an
// expired entry, or any decompress/unmarshal failure. The filter is sorted
// in place to normalize the cache key.
func (c *EventCache) GetEvents(f *filter.F) (events []*event.E, found bool) {
	// Normalize filter by sorting to ensure consistent cache keys.
	f.Sort()
	filterKey := string(f.Serialize())

	c.mu.Lock()
	entry, exists := c.entries[filterKey]
	if !exists {
		c.misses++
		c.mu.Unlock()
		return nil, false
	}

	// Check if entry is expired; expired entries are removed eagerly and
	// counted as misses.
	if time.Since(entry.CreatedAt) > c.maxAge {
		c.removeEntry(entry)
		c.misses++
		c.mu.Unlock()
		return nil, false
	}

	// Copy compressed data under lock so eviction can't free it while we
	// decompress after releasing the lock.
	compressedCopy := make([]byte, len(entry.CompressedData))
	copy(compressedCopy, entry.CompressedData)
	eventCount := entry.EventCount
	compressedSize := entry.CompressedSize
	uncompressedSize := entry.UncompressedSize

	// Update access time and move to front of the LRU list.
	entry.LastAccess = time.Now()
	c.lruList.MoveToFront(entry.listElement)
	c.hits++
	c.mu.Unlock()

	// Decompress outside lock — decoder is safe for concurrent use.
	decompressed, err := c.decoder.DecodeAll(compressedCopy, nil)
	if err != nil {
		log.E.F("failed to decompress cached events: %v", err)
		return nil, false
	}

	// Deserialize events from newline-delimited JSON. Empty segments
	// (consecutive newlines) are skipped; any unmarshal error aborts the
	// whole lookup and reports a miss.
	events = make([]*event.E, 0, eventCount)
	start := 0
	for i, b := range decompressed {
		if b == '\n' {
			if i > start {
				ev := event.New()
				if _, err := ev.Unmarshal(decompressed[start:i]); err != nil {
					log.E.F("failed to unmarshal cached event: %v", err)
					return nil, false
				}
				events = append(events, ev)
			}
			start = i + 1
		}
	}

	// Handle last event if the blob has no trailing newline.
	if start < len(decompressed) {
		ev := event.New()
		if _, err := ev.Unmarshal(decompressed[start:]); err != nil {
			log.E.F("failed to unmarshal cached event: %v", err)
			return nil, false
		}
		events = append(events, ev)
	}

	log.D.F("event cache HIT: filter=%s events=%d compressed=%d uncompressed=%d ratio=%.2f",
		filterKey[:min(50, len(filterKey))], eventCount, compressedSize,
		uncompressedSize, float64(uncompressedSize)/float64(compressedSize))

	return events, true
}
508
// PutEvents stores events in the cache with ZSTD compression.
// This should be called AFTER events are sent to the client, so serialization
// work here does not delay delivery. Events are marshaled to
// newline-delimited JSON, compressed, and stored under the serialized
// (sorted) filter key. An existing entry for the same filter is replaced in
// place with a fresh TTL; otherwise LRU entries may be evicted to make room.
func (c *EventCache) PutEvents(f *filter.F, events []*event.E) {
	if len(events) == 0 {
		return
	}

	// Normalize filter by sorting to ensure consistent cache keys.
	f.Sort()
	filterKey := string(f.Serialize())

	// Serialize all events as newline-delimited JSON for compression.
	// totalSize is only a capacity hint (EstimateSize may differ from the
	// actual marshaled length); real sizes use len(uncompressed) below.
	totalSize := 0
	for _, ev := range events {
		totalSize += ev.EstimateSize() + 1 // +1 for newline
	}

	uncompressed := make([]byte, 0, totalSize)
	for _, ev := range events {
		uncompressed = ev.Marshal(uncompressed)
		uncompressed = append(uncompressed, '\n')
	}

	// Compress with ZSTD — encoder is not concurrent-safe, use dedicated lock.
	// Compression happens before taking c.mu to keep the cache lock short.
	c.encoderMu.Lock()
	compressed := c.encoder.EncodeAll(uncompressed, nil)
	c.encoderMu.Unlock()
	compressedSize := len(compressed)

	// Don't cache if the compressed size alone already exceeds the budget.
	if int64(compressedSize) > c.maxSize {
		log.W.F("event cache: compressed entry too large: %d bytes", compressedSize)
		return
	}

	c.mu.Lock()
	defer c.mu.Unlock()

	// Check if already exists — refresh in place and move to LRU front.
	if existing, exists := c.entries[filterKey]; exists {
		c.currentSize -= int64(existing.CompressedSize)
		existing.CompressedData = compressed
		existing.UncompressedSize = len(uncompressed)
		existing.CompressedSize = compressedSize
		existing.EventCount = len(events)
		existing.LastAccess = time.Now()
		existing.CreatedAt = time.Now()
		c.currentSize += int64(compressedSize)
		c.lruList.MoveToFront(existing.listElement)
		c.updateCompressionRatio(len(uncompressed), compressedSize)
		log.T.F("event cache UPDATE: filter=%s events=%d ratio=%.2f",
			filterKey[:min(50, len(filterKey))], len(events),
			float64(len(uncompressed))/float64(compressedSize))
		return
	}

	// Evict least-recently-used entries until the new entry fits.
	evictionCount := 0
	for c.currentSize+int64(compressedSize) > c.maxSize && c.lruList.Len() > 0 {
		oldest := c.lruList.Back()
		if oldest != nil {
			oldEntry := oldest.Value.(*EventCacheEntry)
			c.removeEntry(oldEntry)
			c.evictions++
			evictionCount++
		}
	}

	// Signal the compaction worker; non-blocking send because a pending
	// signal already guarantees a run.
	if evictionCount > 0 {
		c.needsCompaction = true
		select {
		case c.compactionChan <- struct{}{}:
		default:
		}
	}

	// Create new entry and link it at the front of the LRU list.
	entry := &EventCacheEntry{
		FilterKey:        filterKey,
		CompressedData:   compressed,
		UncompressedSize: len(uncompressed),
		CompressedSize:   compressedSize,
		EventCount:       len(events),
		LastAccess:       time.Now(),
		CreatedAt:        time.Now(),
	}

	entry.listElement = c.lruList.PushFront(entry)
	c.entries[filterKey] = entry
	c.currentSize += int64(compressedSize)
	c.updateCompressionRatio(len(uncompressed), compressedSize)

	log.D.F("event cache PUT: filter=%s events=%d uncompressed=%d compressed=%d ratio=%.2f total=%d/%d",
		filterKey[:min(50, len(filterKey))], len(events), len(uncompressed), compressedSize,
		float64(len(uncompressed))/float64(compressedSize), c.currentSize, c.maxSize)
}
605