// event_cache.go

   1  package querycache
   2  
   3  import (
   4  	"container/list"
   5  	"sync"
   6  	"time"
   7  
   8  	"github.com/klauspost/compress/zstd"
   9  	"next.orly.dev/pkg/lol/log"
  10  	"next.orly.dev/pkg/nostr/encoders/event"
  11  	"next.orly.dev/pkg/nostr/encoders/filter"
  12  )
  13  
const (
	// DefaultMaxSize is the default maximum cache size in bytes (512 MB).
	// NewEventCache falls back to it when the caller passes maxSize <= 0.
	DefaultMaxSize = 512 * 1024 * 1024
	// DefaultMaxAge is the default maximum age for cache entries.
	// NewEventCache falls back to it when the caller passes maxAge <= 0.
	DefaultMaxAge = 5 * time.Minute
)
  20  
// EventCacheEntry represents a cached set of compressed serialized events for a filter
type EventCacheEntry struct {
	FilterKey        string        // Serialized (sorted) filter; also the map key
	CompressedData   []byte        // ZSTD compressed serialized JSON events
	UncompressedSize int           // Original size before compression (for stats)
	CompressedSize   int           // Actual compressed size in bytes
	EventCount       int           // Number of events in this entry
	LastAccess       time.Time     // Updated on every hit (LRU recency)
	CreatedAt        time.Time     // Used for max-age expiry checks
	listElement      *list.Element // This entry's node in the cache's LRU list
}
  32  
// EventCache caches event.S results from database queries with ZSTD compression
type EventCache struct {
	mu sync.RWMutex // Guards entries, lruList, currentSize, and all metric fields

	entries map[string]*EventCacheEntry // Keyed by serialized (sorted) filter
	lruList *list.List                  // Front = most recently used

	currentSize int64 // Tracks compressed size
	maxSize     int64
	maxAge      time.Duration

	// ZSTD encoder/decoder — encoder is NOT safe for concurrent use,
	// so we protect it with a dedicated mutex. Decoder is safe.
	encoder   *zstd.Encoder
	encoderMu sync.Mutex
	decoder   *zstd.Decoder

	// Compaction tracking. compactionChan has capacity 1 so signaling
	// after an eviction never blocks the writer.
	needsCompaction bool
	compactionChan  chan struct{}

	// Shutdown signal for background goroutines
	stopCh chan struct{}

	// Metrics (all protected by mu)
	hits             uint64
	misses           uint64
	evictions        uint64
	invalidations    uint64
	compressionRatio float64 // Average compression ratio
	compactionRuns   uint64
}
  65  
  66  // NewEventCache creates a new event cache
  67  func NewEventCache(maxSize int64, maxAge time.Duration) *EventCache {
  68  	if maxSize <= 0 {
  69  		maxSize = DefaultMaxSize
  70  	}
  71  	if maxAge <= 0 {
  72  		maxAge = DefaultMaxAge
  73  	}
  74  
  75  	// Create ZSTD encoder at level 9 (best compression)
  76  	encoder, err := zstd.NewWriter(nil, zstd.WithEncoderLevel(zstd.SpeedBestCompression))
  77  	if err != nil {
  78  		log.E.F("failed to create ZSTD encoder: %v", err)
  79  		return nil
  80  	}
  81  
  82  	// Create ZSTD decoder
  83  	decoder, err := zstd.NewReader(nil)
  84  	if err != nil {
  85  		log.E.F("failed to create ZSTD decoder: %v", err)
  86  		return nil
  87  	}
  88  
  89  	c := &EventCache{
  90  		entries:        make(map[string]*EventCacheEntry),
  91  		lruList:        list.New(),
  92  		maxSize:        maxSize,
  93  		maxAge:         maxAge,
  94  		encoder:        encoder,
  95  		decoder:        decoder,
  96  		compactionChan: make(chan struct{}, 1),
  97  		stopCh:         make(chan struct{}),
  98  	}
  99  
 100  	// Start background workers
 101  	go c.cleanupExpired()
 102  	go c.compactionWorker()
 103  
 104  	return c
 105  }
 106  
 107  // Close stops background goroutines. Safe to call multiple times.
 108  func (c *EventCache) Close() {
 109  	select {
 110  	case <-c.stopCh:
 111  		// already closed
 112  	default:
 113  		close(c.stopCh)
 114  	}
 115  }
 116  
 117  // Get retrieves cached serialized events for a filter (decompresses on the fly)
 118  func (c *EventCache) Get(f *filter.F) (serializedJSON [][]byte, found bool) {
 119  	// Normalize filter by sorting to ensure consistent cache keys
 120  	f.Sort()
 121  	filterKey := string(f.Serialize())
 122  
 123  	c.mu.Lock()
 124  	entry, exists := c.entries[filterKey]
 125  	if !exists {
 126  		c.misses++
 127  		c.mu.Unlock()
 128  		return nil, false
 129  	}
 130  
 131  	// Check if expired
 132  	if time.Since(entry.CreatedAt) > c.maxAge {
 133  		c.removeEntry(entry)
 134  		c.misses++
 135  		c.mu.Unlock()
 136  		return nil, false
 137  	}
 138  
 139  	// Copy compressed data under lock so eviction can't free it
 140  	compressedCopy := make([]byte, len(entry.CompressedData))
 141  	copy(compressedCopy, entry.CompressedData)
 142  	eventCount := entry.EventCount
 143  	compressedSize := entry.CompressedSize
 144  	uncompressedSize := entry.UncompressedSize
 145  
 146  	// Update access time and move to front
 147  	entry.LastAccess = time.Now()
 148  	c.lruList.MoveToFront(entry.listElement)
 149  	c.hits++
 150  	c.mu.Unlock()
 151  
 152  	// Decompress outside lock
 153  	decompressed, err := c.decoder.DecodeAll(compressedCopy, nil)
 154  	if err != nil {
 155  		log.E.F("failed to decompress cache entry: %v", err)
 156  		return nil, false
 157  	}
 158  
 159  	// Deserialize the individual JSON events from the decompressed blob
 160  	// Format: each event is newline-delimited JSON
 161  	serializedJSON = make([][]byte, 0, eventCount)
 162  	start := 0
 163  	for i := 0; i < len(decompressed); i++ {
 164  		if decompressed[i] == '\n' {
 165  			if i > start {
 166  				eventJSON := make([]byte, i-start)
 167  				copy(eventJSON, decompressed[start:i])
 168  				serializedJSON = append(serializedJSON, eventJSON)
 169  			}
 170  			start = i + 1
 171  		}
 172  	}
 173  	// Handle last event if no trailing newline
 174  	if start < len(decompressed) {
 175  		eventJSON := make([]byte, len(decompressed)-start)
 176  		copy(eventJSON, decompressed[start:])
 177  		serializedJSON = append(serializedJSON, eventJSON)
 178  	}
 179  
 180  	log.D.F("event cache HIT: filter=%s events=%d compressed=%d uncompressed=%d ratio=%.2f",
 181  		filterKey[:min(50, len(filterKey))], eventCount, compressedSize,
 182  		uncompressedSize, float64(uncompressedSize)/float64(compressedSize))
 183  
 184  	return serializedJSON, true
 185  }
 186  
 187  // PutJSON stores pre-marshaled JSON in the cache with ZSTD compression
 188  // This should be called AFTER events are sent to the client with the marshaled envelopes
 189  func (c *EventCache) PutJSON(f *filter.F, marshaledJSON [][]byte) {
 190  	if len(marshaledJSON) == 0 {
 191  		return
 192  	}
 193  
 194  	// Normalize filter by sorting to ensure consistent cache keys
 195  	f.Sort()
 196  	filterKey := string(f.Serialize())
 197  
 198  	// Concatenate all JSON events with newline delimiters for compression
 199  	totalSize := 0
 200  	for _, jsonData := range marshaledJSON {
 201  		totalSize += len(jsonData) + 1 // +1 for newline
 202  	}
 203  
 204  	uncompressed := make([]byte, 0, totalSize)
 205  	for _, jsonData := range marshaledJSON {
 206  		uncompressed = append(uncompressed, jsonData...)
 207  		uncompressed = append(uncompressed, '\n')
 208  	}
 209  
 210  	// Compress with ZSTD — encoder is not concurrent-safe, use dedicated lock
 211  	c.encoderMu.Lock()
 212  	compressed := c.encoder.EncodeAll(uncompressed, nil)
 213  	c.encoderMu.Unlock()
 214  	compressedSize := len(compressed)
 215  
 216  	// Don't cache if compressed size is still too large
 217  	if int64(compressedSize) > c.maxSize {
 218  		log.W.F("event cache: compressed entry too large: %d bytes", compressedSize)
 219  		return
 220  	}
 221  
 222  	c.mu.Lock()
 223  	defer c.mu.Unlock()
 224  
 225  	// Check if already exists
 226  	if existing, exists := c.entries[filterKey]; exists {
 227  		c.currentSize -= int64(existing.CompressedSize)
 228  		existing.CompressedData = compressed
 229  		existing.UncompressedSize = totalSize
 230  		existing.CompressedSize = compressedSize
 231  		existing.EventCount = len(marshaledJSON)
 232  		existing.LastAccess = time.Now()
 233  		existing.CreatedAt = time.Now()
 234  		c.currentSize += int64(compressedSize)
 235  		c.lruList.MoveToFront(existing.listElement)
 236  		c.updateCompressionRatio(totalSize, compressedSize)
 237  		log.T.F("event cache UPDATE: filter=%s events=%d ratio=%.2f",
 238  			filterKey[:min(50, len(filterKey))], len(marshaledJSON),
 239  			float64(totalSize)/float64(compressedSize))
 240  		return
 241  	}
 242  
 243  	// Evict if necessary
 244  	evictionCount := 0
 245  	for c.currentSize+int64(compressedSize) > c.maxSize && c.lruList.Len() > 0 {
 246  		oldest := c.lruList.Back()
 247  		if oldest != nil {
 248  			oldEntry := oldest.Value.(*EventCacheEntry)
 249  			c.removeEntry(oldEntry)
 250  			c.evictions++
 251  			evictionCount++
 252  		}
 253  	}
 254  
 255  	// Trigger compaction if we evicted entries
 256  	if evictionCount > 0 {
 257  		c.needsCompaction = true
 258  		select {
 259  		case c.compactionChan <- struct{}{}:
 260  		default:
 261  			// Channel already has signal, compaction will run
 262  		}
 263  	}
 264  
 265  	// Create new entry
 266  	entry := &EventCacheEntry{
 267  		FilterKey:        filterKey,
 268  		CompressedData:   compressed,
 269  		UncompressedSize: totalSize,
 270  		CompressedSize:   compressedSize,
 271  		EventCount:       len(marshaledJSON),
 272  		LastAccess:       time.Now(),
 273  		CreatedAt:        time.Now(),
 274  	}
 275  
 276  	entry.listElement = c.lruList.PushFront(entry)
 277  	c.entries[filterKey] = entry
 278  	c.currentSize += int64(compressedSize)
 279  	c.updateCompressionRatio(totalSize, compressedSize)
 280  
 281  	log.D.F("event cache PUT: filter=%s events=%d uncompressed=%d compressed=%d ratio=%.2f total=%d/%d",
 282  		filterKey[:min(50, len(filterKey))], len(marshaledJSON), totalSize, compressedSize,
 283  		float64(totalSize)/float64(compressedSize), c.currentSize, c.maxSize)
 284  }
 285  
 286  // updateCompressionRatio updates the rolling average compression ratio
 287  func (c *EventCache) updateCompressionRatio(uncompressed, compressed int) {
 288  	if compressed == 0 {
 289  		return
 290  	}
 291  	newRatio := float64(uncompressed) / float64(compressed)
 292  	// Use exponential moving average
 293  	if c.compressionRatio == 0 {
 294  		c.compressionRatio = newRatio
 295  	} else {
 296  		c.compressionRatio = 0.9*c.compressionRatio + 0.1*newRatio
 297  	}
 298  }
 299  
 300  // Invalidate clears all entries (called when new events are stored)
 301  func (c *EventCache) Invalidate() {
 302  	c.mu.Lock()
 303  	defer c.mu.Unlock()
 304  
 305  	if len(c.entries) > 0 {
 306  		cleared := len(c.entries)
 307  		c.entries = make(map[string]*EventCacheEntry)
 308  		c.lruList = list.New()
 309  		c.currentSize = 0
 310  		c.invalidations += uint64(cleared)
 311  		log.T.F("event cache INVALIDATE: cleared %d entries", cleared)
 312  	}
 313  }
 314  
 315  // removeEntry removes an entry (must be called with lock held)
 316  func (c *EventCache) removeEntry(entry *EventCacheEntry) {
 317  	delete(c.entries, entry.FilterKey)
 318  	c.lruList.Remove(entry.listElement)
 319  	c.currentSize -= int64(entry.CompressedSize)
 320  }
 321  
 322  // compactionWorker runs in the background and compacts cache entries after evictions
 323  // to reclaim fragmented space and improve cache efficiency
 324  func (c *EventCache) compactionWorker() {
 325  	for {
 326  		select {
 327  		case <-c.stopCh:
 328  			return
 329  		case _, ok := <-c.compactionChan:
 330  			if !ok {
 331  				return
 332  			}
 333  		}
 334  
 335  		c.mu.Lock()
 336  		if !c.needsCompaction {
 337  			c.mu.Unlock()
 338  			continue
 339  		}
 340  
 341  		log.D.F("cache compaction: starting (entries=%d size=%d/%d)",
 342  			len(c.entries), c.currentSize, c.maxSize)
 343  
 344  		c.needsCompaction = false
 345  		c.compactionRuns++
 346  		c.mu.Unlock()
 347  
 348  		log.D.F("cache compaction: completed (runs=%d)", c.compactionRuns)
 349  	}
 350  }
 351  
 352  // cleanupExpired removes expired entries periodically
 353  func (c *EventCache) cleanupExpired() {
 354  	ticker := time.NewTicker(1 * time.Minute)
 355  	defer ticker.Stop()
 356  
 357  	for {
 358  		select {
 359  		case <-c.stopCh:
 360  			return
 361  		case <-ticker.C:
 362  		}
 363  
 364  		c.mu.Lock()
 365  		now := time.Now()
 366  		var toRemove []*EventCacheEntry
 367  
 368  		for _, entry := range c.entries {
 369  			if now.Sub(entry.CreatedAt) > c.maxAge {
 370  				toRemove = append(toRemove, entry)
 371  			}
 372  		}
 373  
 374  		for _, entry := range toRemove {
 375  			c.removeEntry(entry)
 376  		}
 377  
 378  		if len(toRemove) > 0 {
 379  			log.D.F("event cache cleanup: removed %d expired entries", len(toRemove))
 380  		}
 381  
 382  		c.mu.Unlock()
 383  	}
 384  }
 385  
 386  // CacheStats holds cache performance metrics
 387  type CacheStats struct {
 388  	Entries          int
 389  	CurrentSize      int64   // Compressed size
 390  	MaxSize          int64
 391  	Hits             uint64
 392  	Misses           uint64
 393  	HitRate          float64
 394  	Evictions        uint64
 395  	Invalidations    uint64
 396  	CompressionRatio float64 // Average compression ratio
 397  	CompactionRuns   uint64
 398  }
 399  
 400  // Stats returns cache statistics
 401  func (c *EventCache) Stats() CacheStats {
 402  	c.mu.RLock()
 403  	defer c.mu.RUnlock()
 404  
 405  	total := c.hits + c.misses
 406  	hitRate := 0.0
 407  	if total > 0 {
 408  		hitRate = float64(c.hits) / float64(total)
 409  	}
 410  
 411  	return CacheStats{
 412  		Entries:          len(c.entries),
 413  		CurrentSize:      c.currentSize,
 414  		MaxSize:          c.maxSize,
 415  		Hits:             c.hits,
 416  		Misses:           c.misses,
 417  		HitRate:          hitRate,
 418  		Evictions:        c.evictions,
 419  		Invalidations:    c.invalidations,
 420  		CompressionRatio: c.compressionRatio,
 421  		CompactionRuns:   c.compactionRuns,
 422  	}
 423  }
 424  
 425  func min(a, b int) int {
 426  	if a < b {
 427  		return a
 428  	}
 429  	return b
 430  }
 431  
 432  // GetEvents retrieves cached events for a filter (decompresses and deserializes on the fly)
 433  // This is the new method that returns event.E objects instead of marshaled JSON
 434  func (c *EventCache) GetEvents(f *filter.F) (events []*event.E, found bool) {
 435  	// Normalize filter by sorting to ensure consistent cache keys
 436  	f.Sort()
 437  	filterKey := string(f.Serialize())
 438  
 439  	c.mu.Lock()
 440  	entry, exists := c.entries[filterKey]
 441  	if !exists {
 442  		c.misses++
 443  		c.mu.Unlock()
 444  		return nil, false
 445  	}
 446  
 447  	// Check if entry is expired
 448  	if time.Since(entry.CreatedAt) > c.maxAge {
 449  		c.removeEntry(entry)
 450  		c.misses++
 451  		c.mu.Unlock()
 452  		return nil, false
 453  	}
 454  
 455  	// Copy compressed data under lock so eviction can't free it
 456  	compressedCopy := make([]byte, len(entry.CompressedData))
 457  	copy(compressedCopy, entry.CompressedData)
 458  	eventCount := entry.EventCount
 459  	compressedSize := entry.CompressedSize
 460  	uncompressedSize := entry.UncompressedSize
 461  
 462  	// Update access time and move to front
 463  	entry.LastAccess = time.Now()
 464  	c.lruList.MoveToFront(entry.listElement)
 465  	c.hits++
 466  	c.mu.Unlock()
 467  
 468  	// Decompress outside lock — decoder is safe for concurrent use
 469  	decompressed, err := c.decoder.DecodeAll(compressedCopy, nil)
 470  	if err != nil {
 471  		log.E.F("failed to decompress cached events: %v", err)
 472  		return nil, false
 473  	}
 474  
 475  	// Deserialize events from newline-delimited JSON
 476  	events = make([]*event.E, 0, eventCount)
 477  	start := 0
 478  	for i, b := range decompressed {
 479  		if b == '\n' {
 480  			if i > start {
 481  				ev := event.New()
 482  				if _, err := ev.Unmarshal(decompressed[start:i]); err != nil {
 483  					log.E.F("failed to unmarshal cached event: %v", err)
 484  					return nil, false
 485  				}
 486  				events = append(events, ev)
 487  			}
 488  			start = i + 1
 489  		}
 490  	}
 491  
 492  	// Handle last event if no trailing newline
 493  	if start < len(decompressed) {
 494  		ev := event.New()
 495  		if _, err := ev.Unmarshal(decompressed[start:]); err != nil {
 496  			log.E.F("failed to unmarshal cached event: %v", err)
 497  			return nil, false
 498  		}
 499  		events = append(events, ev)
 500  	}
 501  
 502  	log.D.F("event cache HIT: filter=%s events=%d compressed=%d uncompressed=%d ratio=%.2f",
 503  		filterKey[:min(50, len(filterKey))], eventCount, compressedSize,
 504  		uncompressedSize, float64(uncompressedSize)/float64(compressedSize))
 505  
 506  	return events, true
 507  }
 508  
 509  // PutEvents stores events in the cache with ZSTD compression
 510  // This should be called AFTER events are sent to the client
 511  func (c *EventCache) PutEvents(f *filter.F, events []*event.E) {
 512  	if len(events) == 0 {
 513  		return
 514  	}
 515  
 516  	// Normalize filter by sorting to ensure consistent cache keys
 517  	f.Sort()
 518  	filterKey := string(f.Serialize())
 519  
 520  	// Serialize all events as newline-delimited JSON for compression
 521  	totalSize := 0
 522  	for _, ev := range events {
 523  		totalSize += ev.EstimateSize() + 1 // +1 for newline
 524  	}
 525  
 526  	uncompressed := make([]byte, 0, totalSize)
 527  	for _, ev := range events {
 528  		uncompressed = ev.Marshal(uncompressed)
 529  		uncompressed = append(uncompressed, '\n')
 530  	}
 531  
 532  	// Compress with ZSTD — encoder is not concurrent-safe, use dedicated lock
 533  	c.encoderMu.Lock()
 534  	compressed := c.encoder.EncodeAll(uncompressed, nil)
 535  	c.encoderMu.Unlock()
 536  	compressedSize := len(compressed)
 537  
 538  	// Don't cache if compressed size is still too large
 539  	if int64(compressedSize) > c.maxSize {
 540  		log.W.F("event cache: compressed entry too large: %d bytes", compressedSize)
 541  		return
 542  	}
 543  
 544  	c.mu.Lock()
 545  	defer c.mu.Unlock()
 546  
 547  	// Check if already exists
 548  	if existing, exists := c.entries[filterKey]; exists {
 549  		c.currentSize -= int64(existing.CompressedSize)
 550  		existing.CompressedData = compressed
 551  		existing.UncompressedSize = len(uncompressed)
 552  		existing.CompressedSize = compressedSize
 553  		existing.EventCount = len(events)
 554  		existing.LastAccess = time.Now()
 555  		existing.CreatedAt = time.Now()
 556  		c.currentSize += int64(compressedSize)
 557  		c.lruList.MoveToFront(existing.listElement)
 558  		c.updateCompressionRatio(len(uncompressed), compressedSize)
 559  		log.T.F("event cache UPDATE: filter=%s events=%d ratio=%.2f",
 560  			filterKey[:min(50, len(filterKey))], len(events),
 561  			float64(len(uncompressed))/float64(compressedSize))
 562  		return
 563  	}
 564  
 565  	// Evict if necessary
 566  	evictionCount := 0
 567  	for c.currentSize+int64(compressedSize) > c.maxSize && c.lruList.Len() > 0 {
 568  		oldest := c.lruList.Back()
 569  		if oldest != nil {
 570  			oldEntry := oldest.Value.(*EventCacheEntry)
 571  			c.removeEntry(oldEntry)
 572  			c.evictions++
 573  			evictionCount++
 574  		}
 575  	}
 576  
 577  	if evictionCount > 0 {
 578  		c.needsCompaction = true
 579  		select {
 580  		case c.compactionChan <- struct{}{}:
 581  		default:
 582  		}
 583  	}
 584  
 585  	// Create new entry
 586  	entry := &EventCacheEntry{
 587  		FilterKey:        filterKey,
 588  		CompressedData:   compressed,
 589  		UncompressedSize: len(uncompressed),
 590  		CompressedSize:   compressedSize,
 591  		EventCount:       len(events),
 592  		LastAccess:       time.Now(),
 593  		CreatedAt:        time.Now(),
 594  	}
 595  
 596  	entry.listElement = c.lruList.PushFront(entry)
 597  	c.entries[filterKey] = entry
 598  	c.currentSize += int64(compressedSize)
 599  	c.updateCompressionRatio(len(uncompressed), compressedSize)
 600  
 601  	log.D.F("event cache PUT: filter=%s events=%d uncompressed=%d compressed=%d ratio=%.2f total=%d/%d",
 602  		filterKey[:min(50, len(filterKey))], len(events), len(uncompressed), compressedSize,
 603  		float64(len(uncompressed))/float64(compressedSize), c.currentSize, c.maxSize)
 604  }
 605