// memory_monitor.go
1 //go:build !(js && wasm)
2
3 package ratelimit
4
5 import (
6 "sync"
7 "sync/atomic"
8 "time"
9
10 "next.orly.dev/pkg/interfaces/loadmonitor"
11 )
12
// MemoryMonitor is a simple load monitor that only tracks process memory.
// Used for database backends that don't have their own load metrics.
// It samples memory on a fixed interval in a background goroutine (see
// Start/pollLoop) and publishes snapshots readable via GetMetrics.
type MemoryMonitor struct {
// Configuration
pollInterval time.Duration // how often pollLoop samples memory
targetBytes atomic.Uint64 // target memory limit in bytes; 0 means unset

// State
running atomic.Bool // true while the poll loop is active
stopChan chan struct{} // closed by Stop to terminate pollLoop
doneChan chan struct{} // closed by pollLoop on exit; returned by Start

// Metrics (protected by mutex)
mu sync.RWMutex // guards currentMetrics and the two thresholds below
currentMetrics loadmonitor.Metrics

// Latency tracking: rolling windows of the most recent 100 samples.
queryLatencies []time.Duration
writeLatencies []time.Duration
latencyMu sync.Mutex // guards both latency slices

// Emergency mode (hysteresis around the memory target; see updateMetrics)
emergencyThreshold float64 // e.g., 1.167 (target + 1/6); guarded by mu
recoveryThreshold float64 // e.g., 0.833 (target - 1/6); guarded by mu
inEmergency atomic.Bool // current emergency-mode flag
}
39
40 // NewMemoryMonitor creates a memory-only load monitor.
41 // pollInterval controls how often memory is sampled (recommended: 100ms).
42 func NewMemoryMonitor(pollInterval time.Duration) *MemoryMonitor {
43 m := &MemoryMonitor{
44 pollInterval: pollInterval,
45 stopChan: make(chan struct{}),
46 doneChan: make(chan struct{}),
47 queryLatencies: make([]time.Duration, 0, 100),
48 writeLatencies: make([]time.Duration, 0, 100),
49 emergencyThreshold: 1.167, // Default: target + 1/6
50 recoveryThreshold: 0.833, // Default: target - 1/6
51 }
52 return m
53 }
54
55 // GetMetrics returns the current load metrics.
56 func (m *MemoryMonitor) GetMetrics() loadmonitor.Metrics {
57 m.mu.RLock()
58 defer m.mu.RUnlock()
59 return m.currentMetrics
60 }
61
62 // RecordQueryLatency records a query latency sample.
63 func (m *MemoryMonitor) RecordQueryLatency(latency time.Duration) {
64 m.latencyMu.Lock()
65 defer m.latencyMu.Unlock()
66
67 m.queryLatencies = append(m.queryLatencies, latency)
68 if len(m.queryLatencies) > 100 {
69 m.queryLatencies = m.queryLatencies[1:]
70 }
71 }
72
73 // RecordWriteLatency records a write latency sample.
74 func (m *MemoryMonitor) RecordWriteLatency(latency time.Duration) {
75 m.latencyMu.Lock()
76 defer m.latencyMu.Unlock()
77
78 m.writeLatencies = append(m.writeLatencies, latency)
79 if len(m.writeLatencies) > 100 {
80 m.writeLatencies = m.writeLatencies[1:]
81 }
82 }
83
// SetMemoryTarget sets the target memory limit in bytes. A value of 0 is
// treated by the poll loop as "no target" (it substitutes 1 to avoid a
// division by zero, yielding an enormous pressure value). Safe for
// concurrent use.
func (m *MemoryMonitor) SetMemoryTarget(bytes uint64) {
m.targetBytes.Store(bytes)
}
88
89 // SetEmergencyThreshold sets the memory threshold for emergency mode.
90 func (m *MemoryMonitor) SetEmergencyThreshold(threshold float64) {
91 m.mu.Lock()
92 defer m.mu.Unlock()
93 m.emergencyThreshold = threshold
94 }
95
96 // GetEmergencyThreshold returns the current emergency threshold.
97 func (m *MemoryMonitor) GetEmergencyThreshold() float64 {
98 m.mu.RLock()
99 defer m.mu.RUnlock()
100 return m.emergencyThreshold
101 }
102
103 // ForceEmergencyMode manually triggers emergency mode for a duration.
104 func (m *MemoryMonitor) ForceEmergencyMode(duration time.Duration) {
105 m.inEmergency.Store(true)
106 go func() {
107 time.Sleep(duration)
108 m.inEmergency.Store(false)
109 }()
110 }
111
112 // Start begins background metric collection.
113 func (m *MemoryMonitor) Start() <-chan struct{} {
114 if m.running.Swap(true) {
115 // Already running
116 return m.doneChan
117 }
118
119 go m.pollLoop()
120 return m.doneChan
121 }
122
// Stop halts background metric collection and blocks until the poll loop
// has fully exited (signalled by doneChan). Calling Stop when the monitor
// is not running is a no-op.
//
// NOTE(review): stopChan is closed here and never recreated by Stop, so a
// plain Stop→Start cycle depends on Start handling the closed channel —
// verify restart is either supported there or never needed by callers.
func (m *MemoryMonitor) Stop() {
if !m.running.Swap(false) {
return
}
close(m.stopChan)
<-m.doneChan
}
131
132 // pollLoop continuously samples memory and updates metrics.
133 func (m *MemoryMonitor) pollLoop() {
134 defer close(m.doneChan)
135
136 ticker := time.NewTicker(m.pollInterval)
137 defer ticker.Stop()
138
139 for {
140 select {
141 case <-m.stopChan:
142 return
143 case <-ticker.C:
144 m.updateMetrics()
145 }
146 }
147 }
148
149 // updateMetrics samples current memory and updates the metrics.
150 func (m *MemoryMonitor) updateMetrics() {
151 target := m.targetBytes.Load()
152 if target == 0 {
153 target = 1 // Avoid division by zero
154 }
155
156 // Get physical memory using the same method as other monitors
157 procMem := ReadProcessMemoryStats()
158 physicalMemBytes := procMem.PhysicalMemoryBytes()
159 physicalMemMB := physicalMemBytes / (1024 * 1024)
160
161 // Calculate memory pressure
162 memPressure := float64(physicalMemBytes) / float64(target)
163
164 // Check emergency mode thresholds
165 m.mu.RLock()
166 emergencyThreshold := m.emergencyThreshold
167 recoveryThreshold := m.recoveryThreshold
168 m.mu.RUnlock()
169
170 wasEmergency := m.inEmergency.Load()
171 if memPressure > emergencyThreshold {
172 m.inEmergency.Store(true)
173 } else if memPressure < recoveryThreshold && wasEmergency {
174 m.inEmergency.Store(false)
175 }
176
177 // Calculate average latencies
178 m.latencyMu.Lock()
179 var avgQuery, avgWrite time.Duration
180 if len(m.queryLatencies) > 0 {
181 var total time.Duration
182 for _, l := range m.queryLatencies {
183 total += l
184 }
185 avgQuery = total / time.Duration(len(m.queryLatencies))
186 }
187 if len(m.writeLatencies) > 0 {
188 var total time.Duration
189 for _, l := range m.writeLatencies {
190 total += l
191 }
192 avgWrite = total / time.Duration(len(m.writeLatencies))
193 }
194 m.latencyMu.Unlock()
195
196 // Update metrics
197 m.mu.Lock()
198 m.currentMetrics = loadmonitor.Metrics{
199 MemoryPressure: memPressure,
200 WriteLoad: 0, // No database-specific load metric
201 ReadLoad: 0, // No database-specific load metric
202 QueryLatency: avgQuery,
203 WriteLatency: avgWrite,
204 Timestamp: time.Now(),
205 InEmergencyMode: m.inEmergency.Load(),
206 CompactionPending: false, // memory-only monitor doesn't track compaction
207 PhysicalMemoryMB: physicalMemMB,
208 }
209 m.mu.Unlock()
210 }
211
// Compile-time checks that MemoryMonitor satisfies the load-monitor
// interfaces callers use it through.
var _ loadmonitor.Monitor = (*MemoryMonitor)(nil)
var _ loadmonitor.EmergencyModeMonitor = (*MemoryMonitor)(nil)
215