// badger_monitor.go
1 //go:build !(js && wasm)
2
3 package ratelimit
4
5 import (
6 "sync"
7 "sync/atomic"
8 "time"
9
10 "github.com/dgraph-io/badger/v4"
11 "next.orly.dev/pkg/lol/log"
12 "next.orly.dev/pkg/interfaces/loadmonitor"
13 )
14
// BadgerMonitor implements loadmonitor.Monitor for the Badger database.
// It collects metrics from Badger's LSM tree, caches, and actual process memory.
// It also implements CompactableMonitor and EmergencyModeMonitor interfaces.
//
// All mutable state is held in atomics or guarded by metricsLock, so the
// monitor is safe for concurrent use by multiple goroutines.
type BadgerMonitor struct {
	// db is the Badger instance being observed; may be nil or closed, in
	// which case collection methods are no-ops.
	db *badger.DB

	// Target memory for pressure calculation (bytes). Pressure is
	// physical memory / target, so 1.0 means "at target".
	targetMemoryBytes atomic.Uint64

	// Emergency mode configuration.
	emergencyThreshold atomic.Uint64 // stored as threshold * 1000 (e.g., 1500 = 1.5) — fixed-point to fit an atomic integer
	emergencyModeUntil atomic.Int64  // Unix nano when forced emergency mode ends
	inEmergencyMode    atomic.Bool   // last published emergency state; used to log only on transitions

	// Compaction state: true while TriggerCompaction is running.
	isCompacting atomic.Bool

	// Latency tracking with exponential moving average, stored as
	// nanoseconds so they can live in atomics.
	queryLatencyNs atomic.Int64
	writeLatencyNs atomic.Int64
	latencyAlpha   float64 // EMA coefficient (default 0.1); set once in the constructor, read-only afterwards

	// Cached metrics (updated by background goroutine, read via GetMetrics).
	metricsLock   sync.RWMutex
	cachedMetrics loadmonitor.Metrics
	lastL0Tables  int
	lastL0Score   float64

	// Background collection lifecycle: stopChan signals the collector to
	// exit; stopped is closed once it has fully exited.
	stopChan chan struct{}
	stopped  chan struct{}
	interval time.Duration
}
48
49 // Compile-time checks for interface implementation
50 var _ loadmonitor.Monitor = (*BadgerMonitor)(nil)
51 var _ loadmonitor.CompactableMonitor = (*BadgerMonitor)(nil)
52 var _ loadmonitor.EmergencyModeMonitor = (*BadgerMonitor)(nil)
53
54 // NewBadgerMonitor creates a new Badger load monitor.
55 // The updateInterval controls how often metrics are collected (default 100ms).
56 func NewBadgerMonitor(db *badger.DB, updateInterval time.Duration) *BadgerMonitor {
57 if updateInterval <= 0 {
58 updateInterval = 100 * time.Millisecond
59 }
60
61 m := &BadgerMonitor{
62 db: db,
63 latencyAlpha: 0.1, // 10% new, 90% old for smooth EMA
64 stopChan: make(chan struct{}),
65 stopped: make(chan struct{}),
66 interval: updateInterval,
67 }
68
69 // Set a default target (1.5GB)
70 m.targetMemoryBytes.Store(1500 * 1024 * 1024)
71
72 // Default emergency threshold: 150% of target
73 m.emergencyThreshold.Store(1500)
74
75 return m
76 }
77
78 // SetEmergencyThreshold sets the memory threshold above which emergency mode is triggered.
79 // threshold is a fraction, e.g., 1.5 = 150% of target memory.
80 func (m *BadgerMonitor) SetEmergencyThreshold(threshold float64) {
81 m.emergencyThreshold.Store(uint64(threshold * 1000))
82 }
83
84 // GetEmergencyThreshold returns the current emergency threshold as a fraction.
85 func (m *BadgerMonitor) GetEmergencyThreshold() float64 {
86 return float64(m.emergencyThreshold.Load()) / 1000.0
87 }
88
89 // ForceEmergencyMode manually triggers emergency mode for a duration.
90 func (m *BadgerMonitor) ForceEmergencyMode(duration time.Duration) {
91 m.emergencyModeUntil.Store(time.Now().Add(duration).UnixNano())
92 m.inEmergencyMode.Store(true)
93 log.W.F("⚠️ emergency mode forced for %v", duration)
94 }
95
96 // TriggerCompaction initiates a Badger Flatten operation to compact all levels.
97 // This should be called when memory pressure is high and the database needs to
98 // reclaim space. It runs synchronously and may take significant time.
99 func (m *BadgerMonitor) TriggerCompaction() error {
100 if m.db == nil || m.db.IsClosed() {
101 return nil
102 }
103
104 if m.isCompacting.Load() {
105 log.D.Ln("compaction already in progress, skipping")
106 return nil
107 }
108
109 m.isCompacting.Store(true)
110 defer m.isCompacting.Store(false)
111
112 log.I.Ln("🗜️ triggering Badger compaction (Flatten)")
113 start := time.Now()
114
115 // Flatten with 4 workers (matches NumCompactors default)
116 err := m.db.Flatten(4)
117 if err != nil {
118 log.E.F("compaction failed: %v", err)
119 return err
120 }
121
122 // Also run value log GC to reclaim space
123 for {
124 err := m.db.RunValueLogGC(0.5)
125 if err != nil {
126 break // No more GC needed
127 }
128 }
129
130 log.I.F("🗜️ compaction completed in %v", time.Since(start))
131 return nil
132 }
133
134 // IsCompacting returns true if a compaction is currently in progress.
135 func (m *BadgerMonitor) IsCompacting() bool {
136 return m.isCompacting.Load()
137 }
138
139 // GetMetrics returns the current load metrics.
140 func (m *BadgerMonitor) GetMetrics() loadmonitor.Metrics {
141 m.metricsLock.RLock()
142 defer m.metricsLock.RUnlock()
143 return m.cachedMetrics
144 }
145
146 // RecordQueryLatency records a query latency sample using exponential moving average.
147 func (m *BadgerMonitor) RecordQueryLatency(latency time.Duration) {
148 ns := latency.Nanoseconds()
149 for {
150 old := m.queryLatencyNs.Load()
151 if old == 0 {
152 if m.queryLatencyNs.CompareAndSwap(0, ns) {
153 return
154 }
155 continue
156 }
157 // EMA: new = alpha * sample + (1-alpha) * old
158 newVal := int64(m.latencyAlpha*float64(ns) + (1-m.latencyAlpha)*float64(old))
159 if m.queryLatencyNs.CompareAndSwap(old, newVal) {
160 return
161 }
162 }
163 }
164
165 // RecordWriteLatency records a write latency sample using exponential moving average.
166 func (m *BadgerMonitor) RecordWriteLatency(latency time.Duration) {
167 ns := latency.Nanoseconds()
168 for {
169 old := m.writeLatencyNs.Load()
170 if old == 0 {
171 if m.writeLatencyNs.CompareAndSwap(0, ns) {
172 return
173 }
174 continue
175 }
176 // EMA: new = alpha * sample + (1-alpha) * old
177 newVal := int64(m.latencyAlpha*float64(ns) + (1-m.latencyAlpha)*float64(old))
178 if m.writeLatencyNs.CompareAndSwap(old, newVal) {
179 return
180 }
181 }
182 }
183
184 // SetMemoryTarget sets the target memory limit in bytes.
185 func (m *BadgerMonitor) SetMemoryTarget(bytes uint64) {
186 m.targetMemoryBytes.Store(bytes)
187 }
188
189 // Start begins background metric collection.
190 func (m *BadgerMonitor) Start() <-chan struct{} {
191 go m.collectLoop()
192 return m.stopped
193 }
194
// Stop halts background metric collection and blocks until the collection
// goroutine has exited.
//
// NOTE(review): Stop must be called at most once, and only after Start — a
// second call panics closing the already-closed stopChan, and calling it
// without Start blocks forever waiting on stopped.
func (m *BadgerMonitor) Stop() {
	close(m.stopChan)
	<-m.stopped
}
200
201 // collectLoop periodically collects metrics from Badger.
202 func (m *BadgerMonitor) collectLoop() {
203 defer close(m.stopped)
204
205 ticker := time.NewTicker(m.interval)
206 defer ticker.Stop()
207
208 for {
209 select {
210 case <-m.stopChan:
211 return
212 case <-ticker.C:
213 m.updateMetrics()
214 }
215 }
216 }
217
// updateMetrics collects current metrics from Badger and actual process
// memory, then publishes the snapshot atomically into cachedMetrics.
// It is called periodically by collectLoop.
func (m *BadgerMonitor) updateMetrics() {
	// Nothing to measure without an open database.
	if m.db == nil || m.db.IsClosed() {
		return
	}

	metrics := loadmonitor.Metrics{
		Timestamp: time.Now(),
	}

	// Use RSS-based memory pressure (actual physical memory, not Go runtime).
	// NOTE(review): ReadProcessMemoryStats is defined elsewhere in this
	// package; presumably it reads OS-level process stats — confirm.
	procMem := ReadProcessMemoryStats()
	physicalMemBytes := procMem.PhysicalMemoryBytes()
	metrics.PhysicalMemoryMB = physicalMemBytes / (1024 * 1024)

	targetBytes := m.targetMemoryBytes.Load()
	if targetBytes > 0 {
		// Use actual physical memory (RSS - shared) for pressure calculation:
		// 1.0 means "at target". Left at 0 when no target is configured.
		metrics.MemoryPressure = float64(physicalMemBytes) / float64(targetBytes)
	}

	// Check emergency mode: either forced via ForceEmergencyMode, or memory
	// pressure above the configured threshold (decoded from *1000 fixed-point).
	emergencyThreshold := float64(m.emergencyThreshold.Load()) / 1000.0
	forcedUntil := m.emergencyModeUntil.Load()
	now := time.Now().UnixNano()

	if forcedUntil > now {
		// Still in forced emergency mode
		metrics.InEmergencyMode = true
	} else if metrics.MemoryPressure >= emergencyThreshold {
		// Memory pressure exceeds emergency threshold
		metrics.InEmergencyMode = true
		// Log only on the transition into emergency mode (previous state false).
		if !m.inEmergencyMode.Load() {
			log.W.F("⚠️ entering emergency mode: memory pressure %.1f%% >= threshold %.1f%%",
				metrics.MemoryPressure*100, emergencyThreshold*100)
		}
	} else {
		// Log only on the transition out of emergency mode (previous state true).
		if m.inEmergencyMode.Load() {
			log.I.F("✅ exiting emergency mode: memory pressure %.1f%% < threshold %.1f%%",
				metrics.MemoryPressure*100, emergencyThreshold*100)
		}
	}
	// Persist the new state so the next pass can detect transitions.
	m.inEmergencyMode.Store(metrics.InEmergencyMode)

	// Get Badger LSM tree information for write load: L0 table count and the
	// maximum compaction score across all levels.
	levels := m.db.Levels()
	var l0Tables int
	var maxScore float64

	for _, level := range levels {
		if level.Level == 0 {
			l0Tables = level.NumTables
		}
		if level.Score > maxScore {
			maxScore = level.Score
		}
	}

	// Calculate write load based on L0 tables and compaction score.
	// L0 tables stall at NumLevelZeroTablesStall (default 16).
	// We consider write pressure high when approaching that limit.
	const l0StallThreshold = 16
	l0Load := float64(l0Tables) / float64(l0StallThreshold)
	if l0Load > 1.0 {
		l0Load = 1.0
	}

	// Compaction score > 1.0 means compaction is needed.
	// We blend L0 tables and compaction score for write load.
	compactionLoad := maxScore / 2.0 // Score of 2.0 = fully loaded
	if compactionLoad > 1.0 {
		compactionLoad = 1.0
	}

	// Mark compaction as pending if score is high
	metrics.CompactionPending = maxScore > 1.5 || l0Tables > 10

	// Blend: 60% L0 (immediate backpressure), 40% compaction score
	metrics.WriteLoad = 0.6*l0Load + 0.4*compactionLoad

	// Calculate read load from Badger cache hit ratios.
	blockMetrics := m.db.BlockCacheMetrics()
	indexMetrics := m.db.IndexCacheMetrics()

	var blockHitRatio, indexHitRatio float64
	if blockMetrics != nil {
		blockHitRatio = blockMetrics.Ratio()
	}
	if indexMetrics != nil {
		indexHitRatio = indexMetrics.Ratio()
	}

	// Average cache hit ratio (0 = no hits = high load, 1 = all hits = low load)
	avgHitRatio := (blockHitRatio + indexHitRatio) / 2.0

	// Invert: low hit ratio = high read load.
	// Use 0.5 as the threshold (below 50% hit ratio is concerning).
	if avgHitRatio < 0.5 {
		metrics.ReadLoad = 1.0 - avgHitRatio*2 // 0% hits = 1.0 load, 50% hits = 0.0 load
	} else {
		metrics.ReadLoad = 0 // Above 50% hit ratio = minimal load
	}

	// Store latencies (EMA values maintained by RecordQueryLatency/RecordWriteLatency).
	metrics.QueryLatency = time.Duration(m.queryLatencyNs.Load())
	metrics.WriteLatency = time.Duration(m.writeLatencyNs.Load())

	// Publish the snapshot and L0 debug stats under the write lock.
	m.metricsLock.Lock()
	m.cachedMetrics = metrics
	m.lastL0Tables = l0Tables
	m.lastL0Score = maxScore
	m.metricsLock.Unlock()
}
332
333 // GetL0Stats returns L0-specific statistics for debugging.
334 func (m *BadgerMonitor) GetL0Stats() (tables int, score float64) {
335 m.metricsLock.RLock()
336 defer m.metricsLock.RUnlock()
337 return m.lastL0Tables, m.lastL0Score
338 }
339