access_tracker.go raw

   1  //go:build !windows
   2  
   3  package storage
   4  
   5  import (
   6  	"container/list"
   7  	"context"
   8  	"sort"
   9  	"sync"
  10  
  11  	"next.orly.dev/pkg/lol/log"
  12  
  13  	"next.orly.dev/pkg/database/indexes/types"
  14  )
  15  
  16  // AccessTrackerDatabase defines the interface for the underlying database
  17  // that stores access tracking information.
  18  type AccessTrackerDatabase interface {
  19  	RecordEventAccess(serial uint64, connectionID string) error
  20  	GetEventAccessInfo(serial uint64) (lastAccess int64, accessCount uint32, err error)
  21  	GetLeastAccessedEvents(limit int, minAgeSec int64) (serials []uint64, err error)
  22  }
  23  
  24  // accessKey is the composite key for deduplication: serial + connectionID
  25  type accessKey struct {
  26  	Serial       uint64
  27  	ConnectionID string
  28  }
  29  
  30  // AccessTracker tracks event access patterns with session deduplication.
  31  // It maintains an in-memory cache to deduplicate accesses from the same
  32  // connection, reducing database writes while ensuring unique session counting.
  33  type AccessTracker struct {
  34  	db AccessTrackerDatabase
  35  
  36  	// Deduplication cache: tracks which (serial, connectionID) pairs
  37  	// have already been recorded in this session window
  38  	mu           sync.RWMutex
  39  	seen         map[accessKey]struct{}
  40  	seenOrder    *list.List // LRU order for eviction
  41  	seenElements map[accessKey]*list.Element
  42  	maxSeen      int // Maximum entries in dedup cache
  43  
  44  	// Flush interval for stats
  45  	ctx    context.Context
  46  	cancel context.CancelFunc
  47  }
  48  
  49  // NewAccessTracker creates a new access tracker.
  50  // maxSeenEntries controls the size of the deduplication cache.
  51  func NewAccessTracker(db AccessTrackerDatabase, maxSeenEntries int) *AccessTracker {
  52  	if maxSeenEntries <= 0 {
  53  		maxSeenEntries = 100000 // Default: 100k entries
  54  	}
  55  
  56  	ctx, cancel := context.WithCancel(context.Background())
  57  
  58  	return &AccessTracker{
  59  		db:           db,
  60  		seen:         make(map[accessKey]struct{}),
  61  		seenOrder:    list.New(),
  62  		seenElements: make(map[accessKey]*list.Element),
  63  		maxSeen:      maxSeenEntries,
  64  		ctx:          ctx,
  65  		cancel:       cancel,
  66  	}
  67  }
  68  
  69  // RecordAccess records an access to an event by a connection.
  70  // Deduplicates accesses from the same connection within the cache window.
  71  // Returns true if this was a new access, false if deduplicated.
  72  func (t *AccessTracker) RecordAccess(serial uint64, connectionID string) (bool, error) {
  73  	key := accessKey{Serial: serial, ConnectionID: connectionID}
  74  
  75  	t.mu.Lock()
  76  	// Check if already seen
  77  	if _, exists := t.seen[key]; exists {
  78  		// Move to front (most recent)
  79  		if elem, ok := t.seenElements[key]; ok {
  80  			t.seenOrder.MoveToFront(elem)
  81  		}
  82  		t.mu.Unlock()
  83  		return false, nil // Deduplicated
  84  	}
  85  
  86  	// Evict oldest if at capacity
  87  	if len(t.seen) >= t.maxSeen {
  88  		oldest := t.seenOrder.Back()
  89  		if oldest != nil {
  90  			oldKey := oldest.Value.(accessKey)
  91  			delete(t.seen, oldKey)
  92  			delete(t.seenElements, oldKey)
  93  			t.seenOrder.Remove(oldest)
  94  		}
  95  	}
  96  
  97  	// Add to cache
  98  	t.seen[key] = struct{}{}
  99  	elem := t.seenOrder.PushFront(key)
 100  	t.seenElements[key] = elem
 101  	t.mu.Unlock()
 102  
 103  	// Record to database
 104  	if err := t.db.RecordEventAccess(serial, connectionID); err != nil {
 105  		return true, err
 106  	}
 107  
 108  	return true, nil
 109  }
 110  
 111  // GetAccessInfo returns the access information for an event.
 112  func (t *AccessTracker) GetAccessInfo(serial uint64) (lastAccess int64, accessCount uint32, err error) {
 113  	return t.db.GetEventAccessInfo(serial)
 114  }
 115  
 116  // GetColdestEvents returns event serials sorted by coldness.
 117  // limit: max events to return
 118  // minAgeSec: minimum age in seconds since last access
 119  func (t *AccessTracker) GetColdestEvents(limit int, minAgeSec int64) ([]uint64, error) {
 120  	return t.db.GetLeastAccessedEvents(limit, minAgeSec)
 121  }
 122  
 123  // GetColdestEventsWithWoT returns event serials sorted by WoT-weighted coldness.
 124  // It overfetches candidates from the database, resolves each event's author to a
 125  // WoT depth, applies a depth bonus to the coldness score, filters out immune kinds,
 126  // re-sorts, and returns the coldest `limit` events.
 127  //
 128  // candidateMultiplier controls overfetch ratio (e.g., 5 = fetch 5x limit candidates).
 129  func (t *AccessTracker) GetColdestEventsWithWoT(
 130  	limit, candidateMultiplier int,
 131  	minAgeSec int64,
 132  	wot WoTProvider,
 133  	authorDB AuthorLookup,
 134  ) ([]uint64, error) {
 135  	if candidateMultiplier <= 1 {
 136  		candidateMultiplier = 5
 137  	}
 138  
 139  	// Overfetch raw candidates
 140  	candidates, err := t.db.GetLeastAccessedEvents(limit*candidateMultiplier, minAgeSec)
 141  	if err != nil {
 142  		return nil, err
 143  	}
 144  
 145  	type scored struct {
 146  		serial uint64
 147  		score  int64
 148  	}
 149  
 150  	var results []scored
 151  	for _, serial := range candidates {
 152  		ser := &types.Uint40{}
 153  		if err := ser.Set(serial); err != nil {
 154  			continue
 155  		}
 156  
 157  		ev, err := authorDB.FetchEventBySerial(ser)
 158  		if err != nil || ev == nil {
 159  			continue
 160  		}
 161  
 162  		// Skip immune kinds
 163  		if isImmuneKind(ev.Kind) {
 164  			continue
 165  		}
 166  
 167  		// Get access info for scoring
 168  		lastAccess, accessCount, err := t.db.GetEventAccessInfo(serial)
 169  		if err != nil {
 170  			continue
 171  		}
 172  
 173  		// Resolve author WoT depth
 174  		depth := 0
 175  		if pkSerial, err := authorDB.GetPubkeySerial(ev.Pubkey); err == nil {
 176  			depth = wot.GetDepthForGC(pkSerial.Get())
 177  		}
 178  
 179  		// WoT-weighted coldness: higher score = warmer = keep longer
 180  		score := lastAccess + int64(accessCount)*3600 + wotBonus(depth)
 181  		results = append(results, scored{serial, score})
 182  	}
 183  
 184  	// Sort by score ascending (coldest first)
 185  	sort.Slice(results, func(i, j int) bool {
 186  		return results[i].score < results[j].score
 187  	})
 188  
 189  	// Return up to limit
 190  	out := make([]uint64, 0, limit)
 191  	for i := 0; i < len(results) && i < limit; i++ {
 192  		out = append(out, results[i].serial)
 193  	}
 194  	return out, nil
 195  }
 196  
 197  // ClearConnection removes all dedup entries for a specific connection.
 198  // Call this when a connection closes to free up cache space.
 199  func (t *AccessTracker) ClearConnection(connectionID string) {
 200  	t.mu.Lock()
 201  	defer t.mu.Unlock()
 202  
 203  	// Find and remove all entries for this connection
 204  	for key, elem := range t.seenElements {
 205  		if key.ConnectionID == connectionID {
 206  			delete(t.seen, key)
 207  			delete(t.seenElements, key)
 208  			t.seenOrder.Remove(elem)
 209  		}
 210  	}
 211  }
 212  
 213  // Stats returns current cache statistics.
 214  func (t *AccessTracker) Stats() AccessTrackerStats {
 215  	t.mu.RLock()
 216  	defer t.mu.RUnlock()
 217  
 218  	return AccessTrackerStats{
 219  		CachedEntries: len(t.seen),
 220  		MaxEntries:    t.maxSeen,
 221  	}
 222  }
 223  
 224  // AccessTrackerStats holds access tracker statistics.
 225  type AccessTrackerStats struct {
 226  	CachedEntries int
 227  	MaxEntries    int
 228  }
 229  
 230  // Start starts any background goroutines for the tracker.
 231  // Currently a no-op but provided for future use.
 232  func (t *AccessTracker) Start() {
 233  	log.I.F("access tracker started with %d max dedup entries", t.maxSeen)
 234  }
 235  
 236  // Stop stops the access tracker and releases resources.
 237  func (t *AccessTracker) Stop() {
 238  	t.cancel()
 239  	log.I.F("access tracker stopped")
 240  }
 241