1 //go:build !windows
2
3 package storage
4
5 // TODO: IMPORTANT - This GC implementation is EXPERIMENTAL and may cause crashes under high load.
6 // The current implementation has the following issues that need to be addressed:
7 //
8 // 1. Badger race condition: DeleteEventBySerial runs transactions that can trigger
9 // "assignment to entry in nil map" panics in Badger v4.8.0 under concurrent load.
10 // This happens when GC deletes events while many REQ queries are being processed.
11 //
12 // 2. Batch transaction handling: On large datasets (14+ GB), deletions should be:
13 // - Serialized or use a transaction pool to prevent concurrent txn issues
14 // - Batched with proper delays between batches to avoid overwhelming Badger
15 // - Rate-limited based on current system load
16 //
// 3. The fixed 10ms pause every 100 events in evictEvents is insufficient for busy relays.
//    Consider adaptive rate limiting based on pending transaction count.
19 //
20 // 4. Consider using Badger's WriteBatch API instead of individual Update transactions
21 // for bulk deletions, which may be more efficient and avoid some race conditions.
22
23 import (
24 "context"
25 "sync"
26 "sync/atomic"
27 "time"
28
29 "next.orly.dev/pkg/lol/log"
30
31 "next.orly.dev/pkg/database/indexes/types"
32 "next.orly.dev/pkg/nostr/encoders/event"
33 )
34
// WoTProvider reports web-of-trust proximity so the garbage collector can
// weight eviction by social distance.
//
// GetDepthForGC maps a pubkey serial to a WoT depth tier:
//   - 1, 2 or 3: the author is at that follow depth (1 = direct follow)
//   - 0: the author is outside the web of trust
type WoTProvider interface {
	GetDepthForGC(pubkeySerial uint64) int
}
42
43 // GCDatabase defines the interface for database operations needed by the GC.
44 type GCDatabase interface {
45 Path() string
46 FetchEventBySerial(ser *types.Uint40) (ev *event.E, err error)
47 DeleteEventBySerial(ctx context.Context, ser *types.Uint40, ev *event.E) error
48 }
49
50 // AuthorLookup provides the ability to resolve event author pubkeys to serials.
51 type AuthorLookup interface {
52 FetchEventBySerial(ser *types.Uint40) (ev *event.E, err error)
53 GetPubkeySerial(pubkey []byte) (*types.Uint40, error)
54 }
55
56 // GarbageCollector manages continuous event eviction based on access patterns.
57 // It monitors storage usage and evicts the least accessed events when the
58 // storage limit is exceeded.
59 //
60 // When a WoTProvider is configured, eviction scoring incorporates WoT depth:
61 // events from socially proximate authors survive eviction longer than events
62 // from outsiders. Kind 0 (profile), kind 3 (follow list), and kind 10002
63 // (relay list) events are immune from eviction.
64 type GarbageCollector struct {
65 ctx context.Context
66 cancel context.CancelFunc
67 db GCDatabase
68 tracker *AccessTracker
69
70 // Configuration
71 dataDir string
72 maxBytes int64 // 0 = auto-calculate
73 interval time.Duration
74 batchSize int
75 minAgeSec int64 // Minimum age before considering for eviction
76
77 // WoT integration (optional, nil = flat scoring)
78 wotProvider WoTProvider
79 authorLookup AuthorLookup
80
81 // State
82 mu sync.Mutex
83 running bool
84 evictedCount uint64
85 lastRun time.Time
86 }
87
// GCConfig holds configuration for the garbage collector.
//
// NewGarbageCollector replaces non-positive BatchSize, Interval and
// MinAgeSec values with defaults: 1000 events, one minute, and one hour.
// WoT-weighted eviction is only used when both WoTProvider and AuthorLookup
// are non-nil; otherwise eviction uses flat scoring.
type GCConfig struct {
	MaxStorageBytes int64         // 0 = auto-calculate (80% of filesystem)
	Interval        time.Duration // How often to check storage
	BatchSize       int           // Events to consider per GC run
	MinAgeSec       int64         // Minimum age before eviction (default: 1 hour)
	WoTProvider     WoTProvider   // Optional WoT depth provider for graph-weighted eviction
	AuthorLookup    AuthorLookup  // Required if WoTProvider is set
}
97
98 // DefaultGCConfig returns a default GC configuration.
99 func DefaultGCConfig() GCConfig {
100 return GCConfig{
101 MaxStorageBytes: 0, // Auto-detect
102 Interval: time.Minute, // Check every minute
103 BatchSize: 1000, // 1000 events per run
104 MinAgeSec: 3600, // 1 hour minimum age
105 }
106 }
107
108 // NewGarbageCollector creates a new garbage collector.
109 func NewGarbageCollector(
110 ctx context.Context,
111 db GCDatabase,
112 tracker *AccessTracker,
113 cfg GCConfig,
114 ) *GarbageCollector {
115 gcCtx, cancel := context.WithCancel(ctx)
116
117 if cfg.BatchSize <= 0 {
118 cfg.BatchSize = 1000
119 }
120 if cfg.Interval <= 0 {
121 cfg.Interval = time.Minute
122 }
123 if cfg.MinAgeSec <= 0 {
124 cfg.MinAgeSec = 3600 // 1 hour
125 }
126
127 return &GarbageCollector{
128 ctx: gcCtx,
129 cancel: cancel,
130 db: db,
131 tracker: tracker,
132 dataDir: db.Path(),
133 maxBytes: cfg.MaxStorageBytes,
134 interval: cfg.Interval,
135 batchSize: cfg.BatchSize,
136 minAgeSec: cfg.MinAgeSec,
137 wotProvider: cfg.WoTProvider,
138 authorLookup: cfg.AuthorLookup,
139 }
140 }
141
142 // Start begins the garbage collection loop.
143 func (gc *GarbageCollector) Start() {
144 gc.mu.Lock()
145 if gc.running {
146 gc.mu.Unlock()
147 return
148 }
149 gc.running = true
150 gc.mu.Unlock()
151
152 go gc.runLoop()
153 log.I.F("garbage collector started (interval: %s, batch: %d)", gc.interval, gc.batchSize)
154 }
155
156 // Stop stops the garbage collector.
157 func (gc *GarbageCollector) Stop() {
158 gc.cancel()
159 gc.mu.Lock()
160 gc.running = false
161 gc.mu.Unlock()
162 log.I.F("garbage collector stopped (total evicted: %d)", atomic.LoadUint64(&gc.evictedCount))
163 }
164
165 // runLoop is the main GC loop.
166 func (gc *GarbageCollector) runLoop() {
167 ticker := time.NewTicker(gc.interval)
168 defer ticker.Stop()
169
170 for {
171 select {
172 case <-gc.ctx.Done():
173 return
174 case <-ticker.C:
175 if err := gc.runCycle(); err != nil {
176 log.W.F("GC cycle error: %v", err)
177 }
178 }
179 }
180 }
181
182 // runCycle executes one garbage collection cycle.
183 func (gc *GarbageCollector) runCycle() error {
184 gc.mu.Lock()
185 gc.lastRun = time.Now()
186 gc.mu.Unlock()
187
188 // Check if we need to run GC
189 shouldRun, currentBytes, maxBytes, err := gc.shouldRunGC()
190 if err != nil {
191 return err
192 }
193
194 if !shouldRun {
195 return nil
196 }
197
198 log.D.F("GC triggered: current=%d MB, max=%d MB (%.1f%%)",
199 currentBytes/(1024*1024),
200 maxBytes/(1024*1024),
201 float64(currentBytes)/float64(maxBytes)*100)
202
203 // Get coldest events, using WoT-weighted scoring if available
204 var serials []uint64
205 if gc.wotProvider != nil && gc.authorLookup != nil {
206 serials, err = gc.tracker.GetColdestEventsWithWoT(
207 gc.batchSize, 5, gc.minAgeSec, gc.wotProvider, gc.authorLookup)
208 } else {
209 serials, err = gc.tracker.GetColdestEvents(gc.batchSize, gc.minAgeSec)
210 }
211 if err != nil {
212 return err
213 }
214
215 if len(serials) == 0 {
216 log.D.F("GC: no events eligible for eviction")
217 return nil
218 }
219
220 // Evict events
221 evicted, err := gc.evictEvents(serials)
222 if err != nil {
223 return err
224 }
225
226 atomic.AddUint64(&gc.evictedCount, uint64(evicted))
227 log.I.F("GC: evicted %d events (total: %d)", evicted, atomic.LoadUint64(&gc.evictedCount))
228
229 return nil
230 }
231
232 // shouldRunGC checks if storage limit is exceeded.
233 func (gc *GarbageCollector) shouldRunGC() (bool, int64, int64, error) {
234 // Calculate max storage (dynamic based on filesystem)
235 maxBytes, err := CalculateMaxStorage(gc.dataDir, gc.maxBytes)
236 if err != nil {
237 return false, 0, 0, err
238 }
239
240 // Get current usage
241 currentBytes, err := GetCurrentStorageUsage(gc.dataDir)
242 if err != nil {
243 return false, 0, 0, err
244 }
245
246 return currentBytes > maxBytes, currentBytes, maxBytes, nil
247 }
248
249 // evictEvents evicts the specified events from the database.
250 func (gc *GarbageCollector) evictEvents(serials []uint64) (int, error) {
251 evicted := 0
252
253 for _, serial := range serials {
254 // Check context for cancellation
255 select {
256 case <-gc.ctx.Done():
257 return evicted, gc.ctx.Err()
258 default:
259 }
260
261 // Convert serial to Uint40
262 ser := &types.Uint40{}
263 if err := ser.Set(serial); err != nil {
264 log.D.F("GC: invalid serial %d: %v", serial, err)
265 continue
266 }
267
268 // Fetch the event
269 ev, err := gc.db.FetchEventBySerial(ser)
270 if err != nil {
271 log.D.F("GC: failed to fetch event %d: %v", serial, err)
272 continue
273 }
274 if ev == nil {
275 continue // Already deleted
276 }
277
278 // Never evict structural kinds that define the social graph
279 if isImmuneKind(ev.Kind) {
280 continue
281 }
282
283 // Delete the event
284 if err := gc.db.DeleteEventBySerial(gc.ctx, ser, ev); err != nil {
285 log.D.F("GC: failed to delete event %d: %v", serial, err)
286 continue
287 }
288
289 evicted++
290
291 // Rate limit to avoid overwhelming the database
292 if evicted%100 == 0 {
293 time.Sleep(10 * time.Millisecond)
294 }
295 }
296
297 return evicted, nil
298 }
299
300 // Stats returns current GC statistics.
301 func (gc *GarbageCollector) Stats() GCStats {
302 gc.mu.Lock()
303 lastRun := gc.lastRun
304 running := gc.running
305 gc.mu.Unlock()
306
307 // Get storage info
308 currentBytes, _ := GetCurrentStorageUsage(gc.dataDir)
309 maxBytes, _ := CalculateMaxStorage(gc.dataDir, gc.maxBytes)
310
311 var percentage float64
312 if maxBytes > 0 {
313 percentage = float64(currentBytes) / float64(maxBytes) * 100
314 }
315
316 return GCStats{
317 Running: running,
318 LastRunTime: lastRun,
319 TotalEvicted: atomic.LoadUint64(&gc.evictedCount),
320 CurrentStorageBytes: currentBytes,
321 MaxStorageBytes: maxBytes,
322 StoragePercentage: percentage,
323 }
324 }
325
// isImmuneKind reports whether events of the given kind are exempt from
// eviction. The exempt kinds carry the social graph and relay metadata:
//   - kind 0: profile metadata (NIP-01)
//   - kind 3: follow list (NIP-02)
//   - kind 10002: relay list metadata (NIP-65)
func isImmuneKind(kind uint16) bool {
	const (
		kindProfile    uint16 = 0
		kindFollowList uint16 = 3
		kindRelayList  uint16 = 10002
	)
	switch kind {
	case kindProfile, kindFollowList, kindRelayList:
		return true
	default:
		return false
	}
}
339
// wotBonus returns the eviction-resistance bonus, in seconds, for a WoT
// depth tier. A larger bonus makes an event harder to evict.
//
// Bonuses by tier:
//   - depth 1: 30 days
//   - depth 2: 7 days
//   - depth 3: 1 day
//   - outsiders (0) and any other value: nothing
func wotBonus(depth int) int64 {
	const secondsPerDay = int64(24 * time.Hour / time.Second)

	var days int64
	switch depth {
	case 1:
		days = 30
	case 2:
		days = 7
	case 3:
		days = 1
	}
	return days * secondsPerDay
}
355
// GCStats is a point-in-time snapshot of garbage collector state, as
// produced by GarbageCollector.Stats.
type GCStats struct {
	Running      bool      // whether the collector is marked as running
	LastRunTime  time.Time // start of the most recent cycle; zero if none yet
	TotalEvicted uint64    // events evicted since the collector was created

	// Storage usage and limit, in bytes.
	CurrentStorageBytes, MaxStorageBytes int64

	// StoragePercentage is CurrentStorageBytes / MaxStorageBytes * 100,
	// or 0 when MaxStorageBytes is not positive.
	StoragePercentage float64
}
365