memory_monitor.go raw

   1  //go:build !(js && wasm)
   2  
   3  package ratelimit
   4  
   5  import (
   6  	"sync"
   7  	"sync/atomic"
   8  	"time"
   9  
  10  	"next.orly.dev/pkg/interfaces/loadmonitor"
  11  )
  12  
  13  // MemoryMonitor is a simple load monitor that only tracks process memory.
  14  // Used for database backends that don't have their own load metrics.
  15  type MemoryMonitor struct {
  16  	// Configuration
  17  	pollInterval time.Duration
  18  	targetBytes  atomic.Uint64
  19  
  20  	// State
  21  	running  atomic.Bool
  22  	stopChan chan struct{}
  23  	doneChan chan struct{}
  24  
  25  	// Metrics (protected by mutex)
  26  	mu             sync.RWMutex
  27  	currentMetrics loadmonitor.Metrics
  28  
  29  	// Latency tracking
  30  	queryLatencies []time.Duration
  31  	writeLatencies []time.Duration
  32  	latencyMu      sync.Mutex
  33  
  34  	// Emergency mode
  35  	emergencyThreshold float64 // e.g., 1.167 (target + 1/6)
  36  	recoveryThreshold  float64 // e.g., 0.833 (target - 1/6)
  37  	inEmergency        atomic.Bool
  38  }
  39  
  40  // NewMemoryMonitor creates a memory-only load monitor.
  41  // pollInterval controls how often memory is sampled (recommended: 100ms).
  42  func NewMemoryMonitor(pollInterval time.Duration) *MemoryMonitor {
  43  	m := &MemoryMonitor{
  44  		pollInterval:       pollInterval,
  45  		stopChan:           make(chan struct{}),
  46  		doneChan:           make(chan struct{}),
  47  		queryLatencies:     make([]time.Duration, 0, 100),
  48  		writeLatencies:     make([]time.Duration, 0, 100),
  49  		emergencyThreshold: 1.167, // Default: target + 1/6
  50  		recoveryThreshold:  0.833, // Default: target - 1/6
  51  	}
  52  	return m
  53  }
  54  
  55  // GetMetrics returns the current load metrics.
  56  func (m *MemoryMonitor) GetMetrics() loadmonitor.Metrics {
  57  	m.mu.RLock()
  58  	defer m.mu.RUnlock()
  59  	return m.currentMetrics
  60  }
  61  
  62  // RecordQueryLatency records a query latency sample.
  63  func (m *MemoryMonitor) RecordQueryLatency(latency time.Duration) {
  64  	m.latencyMu.Lock()
  65  	defer m.latencyMu.Unlock()
  66  
  67  	m.queryLatencies = append(m.queryLatencies, latency)
  68  	if len(m.queryLatencies) > 100 {
  69  		m.queryLatencies = m.queryLatencies[1:]
  70  	}
  71  }
  72  
  73  // RecordWriteLatency records a write latency sample.
  74  func (m *MemoryMonitor) RecordWriteLatency(latency time.Duration) {
  75  	m.latencyMu.Lock()
  76  	defer m.latencyMu.Unlock()
  77  
  78  	m.writeLatencies = append(m.writeLatencies, latency)
  79  	if len(m.writeLatencies) > 100 {
  80  		m.writeLatencies = m.writeLatencies[1:]
  81  	}
  82  }
  83  
  84  // SetMemoryTarget sets the target memory limit in bytes.
  85  func (m *MemoryMonitor) SetMemoryTarget(bytes uint64) {
  86  	m.targetBytes.Store(bytes)
  87  }
  88  
  89  // SetEmergencyThreshold sets the memory threshold for emergency mode.
  90  func (m *MemoryMonitor) SetEmergencyThreshold(threshold float64) {
  91  	m.mu.Lock()
  92  	defer m.mu.Unlock()
  93  	m.emergencyThreshold = threshold
  94  }
  95  
  96  // GetEmergencyThreshold returns the current emergency threshold.
  97  func (m *MemoryMonitor) GetEmergencyThreshold() float64 {
  98  	m.mu.RLock()
  99  	defer m.mu.RUnlock()
 100  	return m.emergencyThreshold
 101  }
 102  
 103  // ForceEmergencyMode manually triggers emergency mode for a duration.
 104  func (m *MemoryMonitor) ForceEmergencyMode(duration time.Duration) {
 105  	m.inEmergency.Store(true)
 106  	go func() {
 107  		time.Sleep(duration)
 108  		m.inEmergency.Store(false)
 109  	}()
 110  }
 111  
 112  // Start begins background metric collection.
 113  func (m *MemoryMonitor) Start() <-chan struct{} {
 114  	if m.running.Swap(true) {
 115  		// Already running
 116  		return m.doneChan
 117  	}
 118  
 119  	go m.pollLoop()
 120  	return m.doneChan
 121  }
 122  
 123  // Stop halts background metric collection.
 124  func (m *MemoryMonitor) Stop() {
 125  	if !m.running.Swap(false) {
 126  		return
 127  	}
 128  	close(m.stopChan)
 129  	<-m.doneChan
 130  }
 131  
 132  // pollLoop continuously samples memory and updates metrics.
 133  func (m *MemoryMonitor) pollLoop() {
 134  	defer close(m.doneChan)
 135  
 136  	ticker := time.NewTicker(m.pollInterval)
 137  	defer ticker.Stop()
 138  
 139  	for {
 140  		select {
 141  		case <-m.stopChan:
 142  			return
 143  		case <-ticker.C:
 144  			m.updateMetrics()
 145  		}
 146  	}
 147  }
 148  
 149  // updateMetrics samples current memory and updates the metrics.
 150  func (m *MemoryMonitor) updateMetrics() {
 151  	target := m.targetBytes.Load()
 152  	if target == 0 {
 153  		target = 1 // Avoid division by zero
 154  	}
 155  
 156  	// Get physical memory using the same method as other monitors
 157  	procMem := ReadProcessMemoryStats()
 158  	physicalMemBytes := procMem.PhysicalMemoryBytes()
 159  	physicalMemMB := physicalMemBytes / (1024 * 1024)
 160  
 161  	// Calculate memory pressure
 162  	memPressure := float64(physicalMemBytes) / float64(target)
 163  
 164  	// Check emergency mode thresholds
 165  	m.mu.RLock()
 166  	emergencyThreshold := m.emergencyThreshold
 167  	recoveryThreshold := m.recoveryThreshold
 168  	m.mu.RUnlock()
 169  
 170  	wasEmergency := m.inEmergency.Load()
 171  	if memPressure > emergencyThreshold {
 172  		m.inEmergency.Store(true)
 173  	} else if memPressure < recoveryThreshold && wasEmergency {
 174  		m.inEmergency.Store(false)
 175  	}
 176  
 177  	// Calculate average latencies
 178  	m.latencyMu.Lock()
 179  	var avgQuery, avgWrite time.Duration
 180  	if len(m.queryLatencies) > 0 {
 181  		var total time.Duration
 182  		for _, l := range m.queryLatencies {
 183  			total += l
 184  		}
 185  		avgQuery = total / time.Duration(len(m.queryLatencies))
 186  	}
 187  	if len(m.writeLatencies) > 0 {
 188  		var total time.Duration
 189  		for _, l := range m.writeLatencies {
 190  			total += l
 191  		}
 192  		avgWrite = total / time.Duration(len(m.writeLatencies))
 193  	}
 194  	m.latencyMu.Unlock()
 195  
 196  	// Update metrics
 197  	m.mu.Lock()
 198  	m.currentMetrics = loadmonitor.Metrics{
 199  		MemoryPressure:    memPressure,
 200  		WriteLoad:         0,    // No database-specific load metric
 201  		ReadLoad:          0,    // No database-specific load metric
 202  		QueryLatency:      avgQuery,
 203  		WriteLatency:      avgWrite,
 204  		Timestamp:         time.Now(),
 205  		InEmergencyMode:   m.inEmergency.Load(),
 206  		CompactionPending: false, // memory-only monitor doesn't track compaction
 207  		PhysicalMemoryMB:  physicalMemMB,
 208  	}
 209  	m.mu.Unlock()
 210  }
 211  
 212  // Ensure MemoryMonitor implements the required interfaces
 213  var _ loadmonitor.Monitor = (*MemoryMonitor)(nil)
 214  var _ loadmonitor.EmergencyModeMonitor = (*MemoryMonitor)(nil)
 215