// badger_monitor.go

   1  //go:build !(js && wasm)
   2  
   3  package ratelimit
   4  
   5  import (
   6  	"sync"
   7  	"sync/atomic"
   8  	"time"
   9  
  10  	"github.com/dgraph-io/badger/v4"
  11  	"next.orly.dev/pkg/lol/log"
  12  	"next.orly.dev/pkg/interfaces/loadmonitor"
  13  )
  14  
// BadgerMonitor implements loadmonitor.Monitor for the Badger database.
// It collects metrics from Badger's LSM tree, caches, and actual process memory.
// It also implements CompactableMonitor and EmergencyModeMonitor interfaces.
//
// All fields are either atomics or guarded by metricsLock, so the public
// methods may be called from multiple goroutines. The struct contains
// channels and a mutex and must not be copied; use it via *BadgerMonitor.
type BadgerMonitor struct {
	db *badger.DB // monitored database; methods check for nil/closed before use

	// Target memory for pressure calculation (bytes). Pressure is
	// computed as physical memory / target in updateMetrics.
	targetMemoryBytes atomic.Uint64

	// Emergency mode configuration
	emergencyThreshold atomic.Uint64 // stored as threshold * 1000 (e.g., 1500 = 1.5)
	emergencyModeUntil atomic.Int64  // Unix nano when forced emergency mode ends
	inEmergencyMode    atomic.Bool   // last published emergency state, used for edge-triggered logging

	// Compaction state
	isCompacting atomic.Bool // true while TriggerCompaction is running

	// Latency tracking with exponential moving average
	queryLatencyNs atomic.Int64
	writeLatencyNs atomic.Int64
	latencyAlpha   float64 // EMA coefficient (default 0.1); set once in NewBadgerMonitor, read-only afterwards

	// Cached metrics (updated by background goroutine)
	metricsLock    sync.RWMutex
	cachedMetrics  loadmonitor.Metrics
	lastL0Tables   int
	lastL0Score    float64

	// Background collection: collectLoop runs until stopChan closes,
	// then closes stopped to signal completion.
	stopChan chan struct{}
	stopped  chan struct{}
	interval time.Duration // metrics collection period
}
  48  
  49  // Compile-time checks for interface implementation
  50  var _ loadmonitor.Monitor = (*BadgerMonitor)(nil)
  51  var _ loadmonitor.CompactableMonitor = (*BadgerMonitor)(nil)
  52  var _ loadmonitor.EmergencyModeMonitor = (*BadgerMonitor)(nil)
  53  
  54  // NewBadgerMonitor creates a new Badger load monitor.
  55  // The updateInterval controls how often metrics are collected (default 100ms).
  56  func NewBadgerMonitor(db *badger.DB, updateInterval time.Duration) *BadgerMonitor {
  57  	if updateInterval <= 0 {
  58  		updateInterval = 100 * time.Millisecond
  59  	}
  60  
  61  	m := &BadgerMonitor{
  62  		db:           db,
  63  		latencyAlpha: 0.1, // 10% new, 90% old for smooth EMA
  64  		stopChan:     make(chan struct{}),
  65  		stopped:      make(chan struct{}),
  66  		interval:     updateInterval,
  67  	}
  68  
  69  	// Set a default target (1.5GB)
  70  	m.targetMemoryBytes.Store(1500 * 1024 * 1024)
  71  
  72  	// Default emergency threshold: 150% of target
  73  	m.emergencyThreshold.Store(1500)
  74  
  75  	return m
  76  }
  77  
  78  // SetEmergencyThreshold sets the memory threshold above which emergency mode is triggered.
  79  // threshold is a fraction, e.g., 1.5 = 150% of target memory.
  80  func (m *BadgerMonitor) SetEmergencyThreshold(threshold float64) {
  81  	m.emergencyThreshold.Store(uint64(threshold * 1000))
  82  }
  83  
  84  // GetEmergencyThreshold returns the current emergency threshold as a fraction.
  85  func (m *BadgerMonitor) GetEmergencyThreshold() float64 {
  86  	return float64(m.emergencyThreshold.Load()) / 1000.0
  87  }
  88  
  89  // ForceEmergencyMode manually triggers emergency mode for a duration.
  90  func (m *BadgerMonitor) ForceEmergencyMode(duration time.Duration) {
  91  	m.emergencyModeUntil.Store(time.Now().Add(duration).UnixNano())
  92  	m.inEmergencyMode.Store(true)
  93  	log.W.F("⚠️  emergency mode forced for %v", duration)
  94  }
  95  
  96  // TriggerCompaction initiates a Badger Flatten operation to compact all levels.
  97  // This should be called when memory pressure is high and the database needs to
  98  // reclaim space. It runs synchronously and may take significant time.
  99  func (m *BadgerMonitor) TriggerCompaction() error {
 100  	if m.db == nil || m.db.IsClosed() {
 101  		return nil
 102  	}
 103  
 104  	if m.isCompacting.Load() {
 105  		log.D.Ln("compaction already in progress, skipping")
 106  		return nil
 107  	}
 108  
 109  	m.isCompacting.Store(true)
 110  	defer m.isCompacting.Store(false)
 111  
 112  	log.I.Ln("🗜️  triggering Badger compaction (Flatten)")
 113  	start := time.Now()
 114  
 115  	// Flatten with 4 workers (matches NumCompactors default)
 116  	err := m.db.Flatten(4)
 117  	if err != nil {
 118  		log.E.F("compaction failed: %v", err)
 119  		return err
 120  	}
 121  
 122  	// Also run value log GC to reclaim space
 123  	for {
 124  		err := m.db.RunValueLogGC(0.5)
 125  		if err != nil {
 126  			break // No more GC needed
 127  		}
 128  	}
 129  
 130  	log.I.F("🗜️  compaction completed in %v", time.Since(start))
 131  	return nil
 132  }
 133  
 134  // IsCompacting returns true if a compaction is currently in progress.
 135  func (m *BadgerMonitor) IsCompacting() bool {
 136  	return m.isCompacting.Load()
 137  }
 138  
 139  // GetMetrics returns the current load metrics.
 140  func (m *BadgerMonitor) GetMetrics() loadmonitor.Metrics {
 141  	m.metricsLock.RLock()
 142  	defer m.metricsLock.RUnlock()
 143  	return m.cachedMetrics
 144  }
 145  
 146  // RecordQueryLatency records a query latency sample using exponential moving average.
 147  func (m *BadgerMonitor) RecordQueryLatency(latency time.Duration) {
 148  	ns := latency.Nanoseconds()
 149  	for {
 150  		old := m.queryLatencyNs.Load()
 151  		if old == 0 {
 152  			if m.queryLatencyNs.CompareAndSwap(0, ns) {
 153  				return
 154  			}
 155  			continue
 156  		}
 157  		// EMA: new = alpha * sample + (1-alpha) * old
 158  		newVal := int64(m.latencyAlpha*float64(ns) + (1-m.latencyAlpha)*float64(old))
 159  		if m.queryLatencyNs.CompareAndSwap(old, newVal) {
 160  			return
 161  		}
 162  	}
 163  }
 164  
 165  // RecordWriteLatency records a write latency sample using exponential moving average.
 166  func (m *BadgerMonitor) RecordWriteLatency(latency time.Duration) {
 167  	ns := latency.Nanoseconds()
 168  	for {
 169  		old := m.writeLatencyNs.Load()
 170  		if old == 0 {
 171  			if m.writeLatencyNs.CompareAndSwap(0, ns) {
 172  				return
 173  			}
 174  			continue
 175  		}
 176  		// EMA: new = alpha * sample + (1-alpha) * old
 177  		newVal := int64(m.latencyAlpha*float64(ns) + (1-m.latencyAlpha)*float64(old))
 178  		if m.writeLatencyNs.CompareAndSwap(old, newVal) {
 179  			return
 180  		}
 181  	}
 182  }
 183  
 184  // SetMemoryTarget sets the target memory limit in bytes.
 185  func (m *BadgerMonitor) SetMemoryTarget(bytes uint64) {
 186  	m.targetMemoryBytes.Store(bytes)
 187  }
 188  
// Start begins background metric collection and returns a channel that
// is closed once the collection goroutine exits (i.e. after Stop).
// Start must be called at most once: a second call would spawn a
// duplicate collection loop sharing the same channels, and the second
// loop's exit would panic closing the already-closed stopped channel.
func (m *BadgerMonitor) Start() <-chan struct{} {
	go m.collectLoop()
	return m.stopped
}
 194  
 195  // Stop halts background metric collection.
 196  func (m *BadgerMonitor) Stop() {
 197  	close(m.stopChan)
 198  	<-m.stopped
 199  }
 200  
 201  // collectLoop periodically collects metrics from Badger.
 202  func (m *BadgerMonitor) collectLoop() {
 203  	defer close(m.stopped)
 204  
 205  	ticker := time.NewTicker(m.interval)
 206  	defer ticker.Stop()
 207  
 208  	for {
 209  		select {
 210  		case <-m.stopChan:
 211  			return
 212  		case <-ticker.C:
 213  			m.updateMetrics()
 214  		}
 215  	}
 216  }
 217  
// updateMetrics collects current metrics from Badger and actual process
// memory, derives the load/pressure figures, and publishes them as the
// cached snapshot returned by GetMetrics. It is called only from
// collectLoop, so there is a single writer; readers go through
// metricsLock.
func (m *BadgerMonitor) updateMetrics() {
	// Skip the cycle entirely if the database is gone; the previous
	// snapshot stays in place.
	if m.db == nil || m.db.IsClosed() {
		return
	}

	metrics := loadmonitor.Metrics{
		Timestamp: time.Now(),
	}

	// Use RSS-based memory pressure (actual physical memory, not Go runtime).
	// NOTE(review): ReadProcessMemoryStats is defined elsewhere in this
	// package — presumably it reads OS-level process stats; confirm there.
	procMem := ReadProcessMemoryStats()
	physicalMemBytes := procMem.PhysicalMemoryBytes()
	metrics.PhysicalMemoryMB = physicalMemBytes / (1024 * 1024)

	targetBytes := m.targetMemoryBytes.Load()
	if targetBytes > 0 {
		// Use actual physical memory (RSS - shared) for pressure calculation;
		// 1.0 means exactly at the configured target.
		metrics.MemoryPressure = float64(physicalMemBytes) / float64(targetBytes)
	}

	// Check emergency mode: either forced (until the stored deadline set
	// by ForceEmergencyMode) or automatic when pressure crosses the
	// threshold (stored scaled by 1000).
	emergencyThreshold := float64(m.emergencyThreshold.Load()) / 1000.0
	forcedUntil := m.emergencyModeUntil.Load()
	now := time.Now().UnixNano()

	if forcedUntil > now {
		// Still in forced emergency mode
		metrics.InEmergencyMode = true
	} else if metrics.MemoryPressure >= emergencyThreshold {
		// Memory pressure exceeds emergency threshold. inEmergencyMode
		// holds the previous cycle's state, so the warning fires only on
		// the transition into emergency mode.
		metrics.InEmergencyMode = true
		if !m.inEmergencyMode.Load() {
			log.W.F("⚠️  entering emergency mode: memory pressure %.1f%% >= threshold %.1f%%",
				metrics.MemoryPressure*100, emergencyThreshold*100)
		}
	} else {
		// Below threshold and not forced: log only on the transition out.
		if m.inEmergencyMode.Load() {
			log.I.F("✅ exiting emergency mode: memory pressure %.1f%% < threshold %.1f%%",
				metrics.MemoryPressure*100, emergencyThreshold*100)
		}
	}
	m.inEmergencyMode.Store(metrics.InEmergencyMode)

	// Get Badger LSM tree information for write load: L0 table count and
	// the worst compaction score across all levels.
	levels := m.db.Levels()
	var l0Tables int
	var maxScore float64

	for _, level := range levels {
		if level.Level == 0 {
			l0Tables = level.NumTables
		}
		if level.Score > maxScore {
			maxScore = level.Score
		}
	}

	// Calculate write load based on L0 tables and compaction score
	// L0 tables stall at NumLevelZeroTablesStall (default 16)
	// We consider write pressure high when approaching that limit
	const l0StallThreshold = 16
	l0Load := float64(l0Tables) / float64(l0StallThreshold)
	if l0Load > 1.0 {
		l0Load = 1.0
	}

	// Compaction score > 1.0 means compaction is needed
	// We blend L0 tables and compaction score for write load
	compactionLoad := maxScore / 2.0 // Score of 2.0 = fully loaded
	if compactionLoad > 1.0 {
		compactionLoad = 1.0
	}

	// Mark compaction as pending if score is high
	metrics.CompactionPending = maxScore > 1.5 || l0Tables > 10

	// Blend: 60% L0 (immediate backpressure), 40% compaction score
	metrics.WriteLoad = 0.6*l0Load + 0.4*compactionLoad

	// Calculate read load from cache metrics
	blockMetrics := m.db.BlockCacheMetrics()
	indexMetrics := m.db.IndexCacheMetrics()

	// Metrics may be nil when the corresponding cache is disabled; a nil
	// cache contributes a 0 hit ratio below.
	var blockHitRatio, indexHitRatio float64
	if blockMetrics != nil {
		blockHitRatio = blockMetrics.Ratio()
	}
	if indexMetrics != nil {
		indexHitRatio = indexMetrics.Ratio()
	}

	// Average cache hit ratio (0 = no hits = high load, 1 = all hits = low load)
	avgHitRatio := (blockHitRatio + indexHitRatio) / 2.0

	// Invert: low hit ratio = high read load
	// Use 0.5 as the threshold (below 50% hit ratio is concerning)
	if avgHitRatio < 0.5 {
		metrics.ReadLoad = 1.0 - avgHitRatio*2 // 0% hits = 1.0 load, 50% hits = 0.0 load
	} else {
		metrics.ReadLoad = 0 // Above 50% hit ratio = minimal load
	}

	// Store latencies (EMA values maintained by RecordQueryLatency /
	// RecordWriteLatency; zero until the first sample arrives).
	metrics.QueryLatency = time.Duration(m.queryLatencyNs.Load())
	metrics.WriteLatency = time.Duration(m.writeLatencyNs.Load())

	// Update cached metrics and the L0 debug stats under the write lock.
	m.metricsLock.Lock()
	m.cachedMetrics = metrics
	m.lastL0Tables = l0Tables
	m.lastL0Score = maxScore
	m.metricsLock.Unlock()
}
 332  
 333  // GetL0Stats returns L0-specific statistics for debugging.
 334  func (m *BadgerMonitor) GetL0Stats() (tables int, score float64) {
 335  	m.metricsLock.RLock()
 336  	defer m.metricsLock.RUnlock()
 337  	return m.lastL0Tables, m.lastL0Score
 338  }
 339