access_tracker.go raw
1 //go:build !windows
2
3 package storage
4
5 import (
6 "container/list"
7 "context"
8 "sort"
9 "sync"
10
11 "next.orly.dev/pkg/lol/log"
12
13 "next.orly.dev/pkg/database/indexes/types"
14 )
15
// AccessTrackerDatabase defines the interface for the underlying database
// that stores access tracking information. AccessTracker delegates all
// persistence and all queries to it.
type AccessTrackerDatabase interface {
	// RecordEventAccess persists one access to the event identified by
	// serial, attributed to connectionID.
	RecordEventAccess(serial uint64, connectionID string) error
	// GetEventAccessInfo returns the last access time and the access count
	// stored for the event identified by serial.
	// NOTE(review): lastAccess looks like a Unix timestamp in seconds — confirm.
	GetEventAccessInfo(serial uint64) (lastAccess int64, accessCount uint32, err error)
	// GetLeastAccessedEvents returns at most limit event serials, restricted
	// to events whose last access is at least minAgeSec seconds old.
	// NOTE(review): presumably ordered coldest first — confirm against the implementation.
	GetLeastAccessedEvents(limit int, minAgeSec int64) (serials []uint64, err error)
}
23
// accessKey is the composite key for deduplication: serial + connectionID.
// It is used as the key of the dedup maps and as the value stored in the
// LRU list elements.
type accessKey struct {
	Serial       uint64 // serial of the event that was accessed
	ConnectionID string // identifier of the connection that accessed it
}
29
// AccessTracker tracks event access patterns with session deduplication.
// It maintains an in-memory cache to deduplicate accesses from the same
// connection, reducing database writes while ensuring unique session counting.
//
// An AccessTracker contains a mutex and must not be copied; use the pointer
// returned by NewAccessTracker.
type AccessTracker struct {
	// db is the backing store that receives every non-deduplicated access
	// and answers all access-info queries.
	db AccessTrackerDatabase

	// Deduplication cache: tracks which (serial, connectionID) pairs
	// have already been recorded in this session window.
	// mu guards seen, seenOrder and seenElements.
	mu sync.RWMutex
	// seen is the membership set of cached pairs.
	seen map[accessKey]struct{}
	// seenOrder holds the cached keys in LRU order: front is the most
	// recently used, back is the next eviction candidate.
	seenOrder *list.List
	// seenElements maps each cached key to its element in seenOrder; it
	// always holds the same set of keys as seen.
	seenElements map[accessKey]*list.Element
	// maxSeen is the maximum number of entries kept in the dedup cache.
	maxSeen int

	// Lifecycle context: created in NewAccessTracker and cancelled by Stop.
	// No method visible in this file reads ctx yet.
	ctx    context.Context
	cancel context.CancelFunc
}
48
49 // NewAccessTracker creates a new access tracker.
50 // maxSeenEntries controls the size of the deduplication cache.
51 func NewAccessTracker(db AccessTrackerDatabase, maxSeenEntries int) *AccessTracker {
52 if maxSeenEntries <= 0 {
53 maxSeenEntries = 100000 // Default: 100k entries
54 }
55
56 ctx, cancel := context.WithCancel(context.Background())
57
58 return &AccessTracker{
59 db: db,
60 seen: make(map[accessKey]struct{}),
61 seenOrder: list.New(),
62 seenElements: make(map[accessKey]*list.Element),
63 maxSeen: maxSeenEntries,
64 ctx: ctx,
65 cancel: cancel,
66 }
67 }
68
69 // RecordAccess records an access to an event by a connection.
70 // Deduplicates accesses from the same connection within the cache window.
71 // Returns true if this was a new access, false if deduplicated.
72 func (t *AccessTracker) RecordAccess(serial uint64, connectionID string) (bool, error) {
73 key := accessKey{Serial: serial, ConnectionID: connectionID}
74
75 t.mu.Lock()
76 // Check if already seen
77 if _, exists := t.seen[key]; exists {
78 // Move to front (most recent)
79 if elem, ok := t.seenElements[key]; ok {
80 t.seenOrder.MoveToFront(elem)
81 }
82 t.mu.Unlock()
83 return false, nil // Deduplicated
84 }
85
86 // Evict oldest if at capacity
87 if len(t.seen) >= t.maxSeen {
88 oldest := t.seenOrder.Back()
89 if oldest != nil {
90 oldKey := oldest.Value.(accessKey)
91 delete(t.seen, oldKey)
92 delete(t.seenElements, oldKey)
93 t.seenOrder.Remove(oldest)
94 }
95 }
96
97 // Add to cache
98 t.seen[key] = struct{}{}
99 elem := t.seenOrder.PushFront(key)
100 t.seenElements[key] = elem
101 t.mu.Unlock()
102
103 // Record to database
104 if err := t.db.RecordEventAccess(serial, connectionID); err != nil {
105 return true, err
106 }
107
108 return true, nil
109 }
110
111 // GetAccessInfo returns the access information for an event.
112 func (t *AccessTracker) GetAccessInfo(serial uint64) (lastAccess int64, accessCount uint32, err error) {
113 return t.db.GetEventAccessInfo(serial)
114 }
115
116 // GetColdestEvents returns event serials sorted by coldness.
117 // limit: max events to return
118 // minAgeSec: minimum age in seconds since last access
119 func (t *AccessTracker) GetColdestEvents(limit int, minAgeSec int64) ([]uint64, error) {
120 return t.db.GetLeastAccessedEvents(limit, minAgeSec)
121 }
122
123 // GetColdestEventsWithWoT returns event serials sorted by WoT-weighted coldness.
124 // It overfetches candidates from the database, resolves each event's author to a
125 // WoT depth, applies a depth bonus to the coldness score, filters out immune kinds,
126 // re-sorts, and returns the coldest `limit` events.
127 //
128 // candidateMultiplier controls overfetch ratio (e.g., 5 = fetch 5x limit candidates).
129 func (t *AccessTracker) GetColdestEventsWithWoT(
130 limit, candidateMultiplier int,
131 minAgeSec int64,
132 wot WoTProvider,
133 authorDB AuthorLookup,
134 ) ([]uint64, error) {
135 if candidateMultiplier <= 1 {
136 candidateMultiplier = 5
137 }
138
139 // Overfetch raw candidates
140 candidates, err := t.db.GetLeastAccessedEvents(limit*candidateMultiplier, minAgeSec)
141 if err != nil {
142 return nil, err
143 }
144
145 type scored struct {
146 serial uint64
147 score int64
148 }
149
150 var results []scored
151 for _, serial := range candidates {
152 ser := &types.Uint40{}
153 if err := ser.Set(serial); err != nil {
154 continue
155 }
156
157 ev, err := authorDB.FetchEventBySerial(ser)
158 if err != nil || ev == nil {
159 continue
160 }
161
162 // Skip immune kinds
163 if isImmuneKind(ev.Kind) {
164 continue
165 }
166
167 // Get access info for scoring
168 lastAccess, accessCount, err := t.db.GetEventAccessInfo(serial)
169 if err != nil {
170 continue
171 }
172
173 // Resolve author WoT depth
174 depth := 0
175 if pkSerial, err := authorDB.GetPubkeySerial(ev.Pubkey); err == nil {
176 depth = wot.GetDepthForGC(pkSerial.Get())
177 }
178
179 // WoT-weighted coldness: higher score = warmer = keep longer
180 score := lastAccess + int64(accessCount)*3600 + wotBonus(depth)
181 results = append(results, scored{serial, score})
182 }
183
184 // Sort by score ascending (coldest first)
185 sort.Slice(results, func(i, j int) bool {
186 return results[i].score < results[j].score
187 })
188
189 // Return up to limit
190 out := make([]uint64, 0, limit)
191 for i := 0; i < len(results) && i < limit; i++ {
192 out = append(out, results[i].serial)
193 }
194 return out, nil
195 }
196
197 // ClearConnection removes all dedup entries for a specific connection.
198 // Call this when a connection closes to free up cache space.
199 func (t *AccessTracker) ClearConnection(connectionID string) {
200 t.mu.Lock()
201 defer t.mu.Unlock()
202
203 // Find and remove all entries for this connection
204 for key, elem := range t.seenElements {
205 if key.ConnectionID == connectionID {
206 delete(t.seen, key)
207 delete(t.seenElements, key)
208 t.seenOrder.Remove(elem)
209 }
210 }
211 }
212
213 // Stats returns current cache statistics.
214 func (t *AccessTracker) Stats() AccessTrackerStats {
215 t.mu.RLock()
216 defer t.mu.RUnlock()
217
218 return AccessTrackerStats{
219 CachedEntries: len(t.seen),
220 MaxEntries: t.maxSeen,
221 }
222 }
223
// AccessTrackerStats holds access tracker statistics, as returned by
// AccessTracker.Stats.
type AccessTrackerStats struct {
	CachedEntries int // number of (serial, connectionID) pairs currently in the dedup cache
	MaxEntries    int // configured maximum size of the dedup cache
}
229
// Start starts any background goroutines for the tracker.
// At present it launches nothing and only logs the configured dedup cache
// capacity; it is provided for future use.
func (t *AccessTracker) Start() {
	log.I.F("access tracker started with %d max dedup entries", t.maxSeen)
}
235
// Stop stops the access tracker and releases resources.
// It cancels the tracker's context (created in NewAccessTracker) and then
// logs the shutdown. The dedup cache is left untouched.
func (t *AccessTracker) Stop() {
	t.cancel()
	log.I.F("access tracker stopped")
}
241