gc.go raw

   1  //go:build !windows
   2  
   3  package storage
   4  
   5  // TODO: IMPORTANT - This GC implementation is EXPERIMENTAL and may cause crashes under high load.
   6  // The current implementation has the following issues that need to be addressed:
   7  //
   8  // 1. Badger race condition: DeleteEventBySerial runs transactions that can trigger
   9  //    "assignment to entry in nil map" panics in Badger v4.8.0 under concurrent load.
  10  //    This happens when GC deletes events while many REQ queries are being processed.
  11  //
  12  // 2. Batch transaction handling: On large datasets (14+ GB), deletions should be:
  13  //    - Serialized or use a transaction pool to prevent concurrent txn issues
  14  //    - Batched with proper delays between batches to avoid overwhelming Badger
  15  //    - Rate-limited based on current system load
  16  //
// 3. The current 10ms delay every 100 events (in evictEvents) is insufficient for busy relays.
//    Consider adaptive rate limiting based on pending transaction count.
  19  //
  20  // 4. Consider using Badger's WriteBatch API instead of individual Update transactions
  21  //    for bulk deletions, which may be more efficient and avoid some race conditions.
  22  
  23  import (
  24  	"context"
  25  	"sync"
  26  	"sync/atomic"
  27  	"time"
  28  
  29  	"next.orly.dev/pkg/lol/log"
  30  
  31  	"next.orly.dev/pkg/database/indexes/types"
  32  	"next.orly.dev/pkg/nostr/encoders/event"
  33  )
  34  
// WoTProvider supplies WoT depth information for graph-weighted eviction.
// Returns the WoT depth tier for a pubkey serial:
//   - 1 = direct follow (depth 1), 2 = depth 2, 3 = depth 3
//   - 0 = outsider (not in WoT)
type WoTProvider interface {
	// GetDepthForGC maps a pubkey serial to its WoT depth tier (0-3);
	// 0 means the pubkey is outside the web of trust.
	GetDepthForGC(pubkeySerial uint64) int
}
  42  
// GCDatabase defines the interface for database operations needed by the GC.
type GCDatabase interface {
	// Path returns the on-disk data directory, used for storage measurement.
	Path() string
	// FetchEventBySerial loads the event stored under the given serial.
	// NOTE(review): evictEvents treats a nil event with nil error as
	// "already deleted" — confirm implementations follow that convention.
	FetchEventBySerial(ser *types.Uint40) (ev *event.E, err error)
	// DeleteEventBySerial removes the event and its index entries.
	DeleteEventBySerial(ctx context.Context, ser *types.Uint40, ev *event.E) error
}
  49  
// AuthorLookup provides the ability to resolve event author pubkeys to serials.
// It is required when a WoTProvider is configured, since WoT depth is keyed
// by pubkey serial.
type AuthorLookup interface {
	// FetchEventBySerial loads the event stored under the given serial.
	FetchEventBySerial(ser *types.Uint40) (ev *event.E, err error)
	// GetPubkeySerial resolves a raw pubkey to its serial identifier.
	GetPubkeySerial(pubkey []byte) (*types.Uint40, error)
}
  55  
// GarbageCollector manages continuous event eviction based on access patterns.
// It monitors storage usage and evicts the least accessed events when the
// storage limit is exceeded.
//
// When a WoTProvider is configured, eviction scoring incorporates WoT depth:
// events from socially proximate authors survive eviction longer than events
// from outsiders. Kind 0 (profile), kind 3 (follow list), and kind 10002
// (relay list) events are immune from eviction.
type GarbageCollector struct {
	ctx     context.Context    // lifecycle context; cancelled by Stop
	cancel  context.CancelFunc // cancels ctx
	db      GCDatabase         // backend used to fetch and delete events
	tracker *AccessTracker     // supplies coldness ranking of events

	// Configuration
	dataDir   string        // database directory, used for storage measurement
	maxBytes  int64         // storage cap; 0 = auto-calculate from filesystem
	interval  time.Duration // how often runLoop checks storage
	batchSize int           // max events considered per GC run
	minAgeSec int64         // Minimum age before considering for eviction

	// WoT integration (optional, nil = flat scoring)
	wotProvider  WoTProvider  // depth tiers for graph-weighted scoring
	authorLookup AuthorLookup // pubkey -> serial resolution for wotProvider

	// State
	mu           sync.Mutex // guards running and lastRun
	running      bool       // true while the GC loop is active
	evictedCount uint64     // lifetime eviction total; accessed atomically
	lastRun      time.Time  // start time of the most recent cycle
}
  87  
// GCConfig holds configuration for the garbage collector. Zero values for
// Interval, BatchSize, and MinAgeSec are replaced with defaults by
// NewGarbageCollector.
type GCConfig struct {
	MaxStorageBytes int64         // 0 = auto-calculate (80% of filesystem)
	Interval        time.Duration // How often to check storage
	BatchSize       int           // Events to consider per GC run
	MinAgeSec       int64         // Minimum age before eviction (default: 1 hour)
	WoTProvider     WoTProvider   // Optional WoT depth provider for graph-weighted eviction
	AuthorLookup    AuthorLookup  // Required if WoTProvider is set
}
  97  
  98  // DefaultGCConfig returns a default GC configuration.
  99  func DefaultGCConfig() GCConfig {
 100  	return GCConfig{
 101  		MaxStorageBytes: 0,             // Auto-detect
 102  		Interval:        time.Minute,   // Check every minute
 103  		BatchSize:       1000,          // 1000 events per run
 104  		MinAgeSec:       3600,          // 1 hour minimum age
 105  	}
 106  }
 107  
 108  // NewGarbageCollector creates a new garbage collector.
 109  func NewGarbageCollector(
 110  	ctx context.Context,
 111  	db GCDatabase,
 112  	tracker *AccessTracker,
 113  	cfg GCConfig,
 114  ) *GarbageCollector {
 115  	gcCtx, cancel := context.WithCancel(ctx)
 116  
 117  	if cfg.BatchSize <= 0 {
 118  		cfg.BatchSize = 1000
 119  	}
 120  	if cfg.Interval <= 0 {
 121  		cfg.Interval = time.Minute
 122  	}
 123  	if cfg.MinAgeSec <= 0 {
 124  		cfg.MinAgeSec = 3600 // 1 hour
 125  	}
 126  
 127  	return &GarbageCollector{
 128  		ctx:          gcCtx,
 129  		cancel:       cancel,
 130  		db:           db,
 131  		tracker:      tracker,
 132  		dataDir:      db.Path(),
 133  		maxBytes:     cfg.MaxStorageBytes,
 134  		interval:     cfg.Interval,
 135  		batchSize:    cfg.BatchSize,
 136  		minAgeSec:    cfg.MinAgeSec,
 137  		wotProvider:  cfg.WoTProvider,
 138  		authorLookup: cfg.AuthorLookup,
 139  	}
 140  }
 141  
 142  // Start begins the garbage collection loop.
 143  func (gc *GarbageCollector) Start() {
 144  	gc.mu.Lock()
 145  	if gc.running {
 146  		gc.mu.Unlock()
 147  		return
 148  	}
 149  	gc.running = true
 150  	gc.mu.Unlock()
 151  
 152  	go gc.runLoop()
 153  	log.I.F("garbage collector started (interval: %s, batch: %d)", gc.interval, gc.batchSize)
 154  }
 155  
// Stop stops the garbage collector by cancelling its context, clears the
// running flag, and logs the lifetime eviction count.
//
// NOTE(review): Stop does not wait for runLoop to exit, so a cycle in
// flight may still touch the database briefly after Stop returns. Also,
// because the context is cancelled permanently, a later Start would spawn
// a loop that returns immediately while leaving running=true — confirm
// callers never restart a stopped collector.
func (gc *GarbageCollector) Stop() {
	gc.cancel()
	gc.mu.Lock()
	gc.running = false
	gc.mu.Unlock()
	log.I.F("garbage collector stopped (total evicted: %d)", atomic.LoadUint64(&gc.evictedCount))
}
 164  
 165  // runLoop is the main GC loop.
 166  func (gc *GarbageCollector) runLoop() {
 167  	ticker := time.NewTicker(gc.interval)
 168  	defer ticker.Stop()
 169  
 170  	for {
 171  		select {
 172  		case <-gc.ctx.Done():
 173  			return
 174  		case <-ticker.C:
 175  			if err := gc.runCycle(); err != nil {
 176  				log.W.F("GC cycle error: %v", err)
 177  			}
 178  		}
 179  	}
 180  }
 181  
 182  // runCycle executes one garbage collection cycle.
 183  func (gc *GarbageCollector) runCycle() error {
 184  	gc.mu.Lock()
 185  	gc.lastRun = time.Now()
 186  	gc.mu.Unlock()
 187  
 188  	// Check if we need to run GC
 189  	shouldRun, currentBytes, maxBytes, err := gc.shouldRunGC()
 190  	if err != nil {
 191  		return err
 192  	}
 193  
 194  	if !shouldRun {
 195  		return nil
 196  	}
 197  
 198  	log.D.F("GC triggered: current=%d MB, max=%d MB (%.1f%%)",
 199  		currentBytes/(1024*1024),
 200  		maxBytes/(1024*1024),
 201  		float64(currentBytes)/float64(maxBytes)*100)
 202  
 203  	// Get coldest events, using WoT-weighted scoring if available
 204  	var serials []uint64
 205  	if gc.wotProvider != nil && gc.authorLookup != nil {
 206  		serials, err = gc.tracker.GetColdestEventsWithWoT(
 207  			gc.batchSize, 5, gc.minAgeSec, gc.wotProvider, gc.authorLookup)
 208  	} else {
 209  		serials, err = gc.tracker.GetColdestEvents(gc.batchSize, gc.minAgeSec)
 210  	}
 211  	if err != nil {
 212  		return err
 213  	}
 214  
 215  	if len(serials) == 0 {
 216  		log.D.F("GC: no events eligible for eviction")
 217  		return nil
 218  	}
 219  
 220  	// Evict events
 221  	evicted, err := gc.evictEvents(serials)
 222  	if err != nil {
 223  		return err
 224  	}
 225  
 226  	atomic.AddUint64(&gc.evictedCount, uint64(evicted))
 227  	log.I.F("GC: evicted %d events (total: %d)", evicted, atomic.LoadUint64(&gc.evictedCount))
 228  
 229  	return nil
 230  }
 231  
 232  // shouldRunGC checks if storage limit is exceeded.
 233  func (gc *GarbageCollector) shouldRunGC() (bool, int64, int64, error) {
 234  	// Calculate max storage (dynamic based on filesystem)
 235  	maxBytes, err := CalculateMaxStorage(gc.dataDir, gc.maxBytes)
 236  	if err != nil {
 237  		return false, 0, 0, err
 238  	}
 239  
 240  	// Get current usage
 241  	currentBytes, err := GetCurrentStorageUsage(gc.dataDir)
 242  	if err != nil {
 243  		return false, 0, 0, err
 244  	}
 245  
 246  	return currentBytes > maxBytes, currentBytes, maxBytes, nil
 247  }
 248  
 249  // evictEvents evicts the specified events from the database.
 250  func (gc *GarbageCollector) evictEvents(serials []uint64) (int, error) {
 251  	evicted := 0
 252  
 253  	for _, serial := range serials {
 254  		// Check context for cancellation
 255  		select {
 256  		case <-gc.ctx.Done():
 257  			return evicted, gc.ctx.Err()
 258  		default:
 259  		}
 260  
 261  		// Convert serial to Uint40
 262  		ser := &types.Uint40{}
 263  		if err := ser.Set(serial); err != nil {
 264  			log.D.F("GC: invalid serial %d: %v", serial, err)
 265  			continue
 266  		}
 267  
 268  		// Fetch the event
 269  		ev, err := gc.db.FetchEventBySerial(ser)
 270  		if err != nil {
 271  			log.D.F("GC: failed to fetch event %d: %v", serial, err)
 272  			continue
 273  		}
 274  		if ev == nil {
 275  			continue // Already deleted
 276  		}
 277  
 278  		// Never evict structural kinds that define the social graph
 279  		if isImmuneKind(ev.Kind) {
 280  			continue
 281  		}
 282  
 283  		// Delete the event
 284  		if err := gc.db.DeleteEventBySerial(gc.ctx, ser, ev); err != nil {
 285  			log.D.F("GC: failed to delete event %d: %v", serial, err)
 286  			continue
 287  		}
 288  
 289  		evicted++
 290  
 291  		// Rate limit to avoid overwhelming the database
 292  		if evicted%100 == 0 {
 293  			time.Sleep(10 * time.Millisecond)
 294  		}
 295  	}
 296  
 297  	return evicted, nil
 298  }
 299  
 300  // Stats returns current GC statistics.
 301  func (gc *GarbageCollector) Stats() GCStats {
 302  	gc.mu.Lock()
 303  	lastRun := gc.lastRun
 304  	running := gc.running
 305  	gc.mu.Unlock()
 306  
 307  	// Get storage info
 308  	currentBytes, _ := GetCurrentStorageUsage(gc.dataDir)
 309  	maxBytes, _ := CalculateMaxStorage(gc.dataDir, gc.maxBytes)
 310  
 311  	var percentage float64
 312  	if maxBytes > 0 {
 313  		percentage = float64(currentBytes) / float64(maxBytes) * 100
 314  	}
 315  
 316  	return GCStats{
 317  		Running:             running,
 318  		LastRunTime:         lastRun,
 319  		TotalEvicted:        atomic.LoadUint64(&gc.evictedCount),
 320  		CurrentStorageBytes: currentBytes,
 321  		MaxStorageBytes:     maxBytes,
 322  		StoragePercentage:   percentage,
 323  	}
 324  }
 325  
 326  // isImmuneKind returns true for event kinds that must never be evicted.
 327  // These structural kinds define the social graph and relay metadata.
 328  func isImmuneKind(kind uint16) bool {
 329  	switch kind {
 330  	case 0:     // Profile metadata (NIP-01)
 331  		return true
 332  	case 3:     // Follow list (NIP-02)
 333  		return true
 334  	case 10002: // Relay list metadata (NIP-65)
 335  		return true
 336  	}
 337  	return false
 338  }
 339  
 340  // wotBonus returns the coldness score bonus (in seconds) for a WoT depth tier.
 341  // Higher bonus = harder to evict. Depth 1 gets 30 days of protection,
 342  // depth 2 gets 7 days, depth 3 gets 1 day, outsiders get nothing.
 343  func wotBonus(depth int) int64 {
 344  	switch depth {
 345  	case 1:
 346  		return 2592000 // 30 days
 347  	case 2:
 348  		return 604800 // 7 days
 349  	case 3:
 350  		return 86400 // 1 day
 351  	default:
 352  		return 0
 353  	}
 354  }
 355  
// GCStats holds garbage collector statistics, as returned by Stats.
type GCStats struct {
	Running             bool      // whether the GC loop is active
	LastRunTime         time.Time // start time of the most recent cycle
	TotalEvicted        uint64    // lifetime count of evicted events
	CurrentStorageBytes int64     // measured on-disk usage (0 if measurement failed)
	MaxStorageBytes     int64     // effective storage cap (0 if measurement failed)
	StoragePercentage   float64   // current usage as a percentage of the cap
}
 365