pid.go raw
1 // Package ratelimit provides adaptive rate limiting using PID control.
2 // The PID controller uses proportional, integral, and derivative terms
3 // with a low-pass filter on the derivative to suppress high-frequency noise.
4 package ratelimit
5
6 import (
7 "math"
8 "sync"
9 "time"
10 )
11
// PIDController is a proportional-integral-derivative controller whose
// derivative term is taken on a low-pass-filtered error signal. It is
// intended for rate limiting database operations from a load metric.
//
// Update turns each process-variable sample into a delay recommendation
// built from three terms:
//   - P: Kp times the current error (immediate response).
//   - I: Ki times the time-integrated error, which removes steady-state
//     offset; the integral is clamped to [IntegralMin, IntegralMax].
//   - D: Kd times the rate of change of the filtered error.
//
// Exported fields are configuration. Unexported fields are running state
// guarded by mu; since the struct contains a mutex, use it through a pointer
// and do not copy it.
type PIDController struct {
	// Kp, Ki and Kd are the proportional, integral and derivative gains.
	Kp, Ki, Kd float64

	// Setpoint is the target value for the process variable (for example
	// 0.85 to aim for 85% of a memory limit). The error fed to the three
	// terms is the process variable minus Setpoint.
	Setpoint float64

	// DerivativeFilterAlpha is the weight given to the newest error sample
	// in the exponential moving average used by the derivative term:
	// filtered = alpha*new + (1-alpha)*old. The intended range is 0.0-1.0;
	// smaller values smooth more strongly (0.2 strong, 0.5 moderate).
	DerivativeFilterAlpha float64

	// IntegralMax and IntegralMin bound the accumulated integral
	// (anti-windup).
	IntegralMax, IntegralMin float64

	// OutputMin and OutputMax bound the computed output, a delay in
	// seconds. OutputMin is typically 0, meaning no delay.
	OutputMin, OutputMax float64

	// mu guards every field below.
	mu sync.Mutex
	// integral is the clamped running sum of error*dt.
	integral float64
	// prevError is the raw error from the latest sample; it is recorded
	// for GetState and not used in the control computation.
	prevError float64
	// prevFilteredError is the filtered error from the latest sample.
	prevFilteredError float64
	// lastUpdate is the time of the latest sample, used to compute dt.
	lastUpdate time.Time
	// initialized reports whether a baseline sample has been recorded.
	initialized bool
}
53
54 // DefaultPIDControllerForWrites creates a PID controller tuned for write operations.
55 // Writes benefit from aggressive integral and moderate proportional response.
56 func DefaultPIDControllerForWrites() *PIDController {
57 return &PIDController{
58 Kp: 0.5, // Moderate proportional response
59 Ki: 0.1, // Steady integral to eliminate offset
60 Kd: 0.05, // Small derivative for prediction
61 Setpoint: 0.85, // Target 85% of memory limit
62 DerivativeFilterAlpha: 0.2, // Strong filtering (20% new, 80% old)
63 IntegralMax: 10.0, // Anti-windup: max 10 seconds accumulated
64 IntegralMin: -2.0, // Allow small negative for faster recovery
65 OutputMin: 0.0, // No delay minimum
66 OutputMax: 1.0, // Max 1 second delay per write
67 }
68 }
69
70 // DefaultPIDControllerForReads creates a PID controller tuned for read operations.
71 // Reads should be more responsive but with less aggressive throttling.
72 func DefaultPIDControllerForReads() *PIDController {
73 return &PIDController{
74 Kp: 0.3, // Lower proportional (reads are more important)
75 Ki: 0.05, // Lower integral (don't accumulate as aggressively)
76 Kd: 0.02, // Very small derivative
77 Setpoint: 0.90, // Target 90% (more tolerant of memory use)
78 DerivativeFilterAlpha: 0.15, // Very strong filtering
79 IntegralMax: 5.0, // Lower anti-windup limit
80 IntegralMin: -1.0, // Allow small negative
81 OutputMin: 0.0, // No delay minimum
82 OutputMax: 0.5, // Max 500ms delay per read
83 }
84 }
85
86 // NewPIDController creates a new PID controller with custom parameters.
87 func NewPIDController(
88 kp, ki, kd float64,
89 setpoint float64,
90 derivativeFilterAlpha float64,
91 integralMin, integralMax float64,
92 outputMin, outputMax float64,
93 ) *PIDController {
94 return &PIDController{
95 Kp: kp,
96 Ki: ki,
97 Kd: kd,
98 Setpoint: setpoint,
99 DerivativeFilterAlpha: derivativeFilterAlpha,
100 IntegralMin: integralMin,
101 IntegralMax: integralMax,
102 OutputMin: outputMin,
103 OutputMax: outputMax,
104 }
105 }
106
107 // Update computes the PID output based on the current process variable.
108 // The process variable should be in the range [0.0, 1.0+] representing load level.
109 //
110 // Returns the recommended delay in seconds. A value of 0 means no delay needed.
111 func (p *PIDController) Update(processVariable float64) float64 {
112 p.mu.Lock()
113 defer p.mu.Unlock()
114
115 now := time.Now()
116
117 // Initialize on first call
118 if !p.initialized {
119 p.lastUpdate = now
120 p.prevError = processVariable - p.Setpoint
121 p.prevFilteredError = p.prevError
122 p.initialized = true
123 return 0 // No delay on first call
124 }
125
126 // Calculate time delta
127 dt := now.Sub(p.lastUpdate).Seconds()
128 if dt <= 0 {
129 dt = 0.001 // Minimum 1ms to avoid division by zero
130 }
131 p.lastUpdate = now
132
133 // Calculate current error (positive when above setpoint = need to throttle)
134 error := processVariable - p.Setpoint
135
136 // Proportional term: immediate response to current error
137 pTerm := p.Kp * error
138
139 // Integral term: accumulate error over time
140 // Apply anti-windup by clamping the integral
141 p.integral += error * dt
142 p.integral = clamp(p.integral, p.IntegralMin, p.IntegralMax)
143 iTerm := p.Ki * p.integral
144
145 // Derivative term with low-pass filter
146 // Apply exponential moving average to filter high-frequency noise:
147 // filtered = alpha * new + (1 - alpha) * old
148 // This is equivalent to a first-order low-pass filter
149 filteredError := p.DerivativeFilterAlpha*error + (1-p.DerivativeFilterAlpha)*p.prevFilteredError
150
151 // Derivative of the filtered error
152 var dTerm float64
153 if dt > 0 {
154 dTerm = p.Kd * (filteredError - p.prevFilteredError) / dt
155 }
156
157 // Update previous values for next iteration
158 p.prevError = error
159 p.prevFilteredError = filteredError
160
161 // Compute total output and clamp to limits
162 output := pTerm + iTerm + dTerm
163 output = clamp(output, p.OutputMin, p.OutputMax)
164
165 // Only return positive delays (throttle when above setpoint)
166 if output < 0 {
167 return 0
168 }
169 return output
170 }
171
172 // Reset clears the controller state, useful when conditions change significantly.
173 func (p *PIDController) Reset() {
174 p.mu.Lock()
175 defer p.mu.Unlock()
176
177 p.integral = 0
178 p.prevError = 0
179 p.prevFilteredError = 0
180 p.initialized = false
181 }
182
183 // SetSetpoint updates the target setpoint.
184 func (p *PIDController) SetSetpoint(setpoint float64) {
185 p.mu.Lock()
186 defer p.mu.Unlock()
187 p.Setpoint = setpoint
188 }
189
190 // SetGains updates the PID gains.
191 func (p *PIDController) SetGains(kp, ki, kd float64) {
192 p.mu.Lock()
193 defer p.mu.Unlock()
194 p.Kp = kp
195 p.Ki = ki
196 p.Kd = kd
197 }
198
199 // GetState returns the current internal state for monitoring/debugging.
200 func (p *PIDController) GetState() (integral, prevError, prevFilteredError float64) {
201 p.mu.Lock()
202 defer p.mu.Unlock()
203 return p.integral, p.prevError, p.prevFilteredError
204 }
205
// clamp limits value to the closed interval [lo, hi].
//
// NaN cannot be ordered against the bounds, so a NaN value is mapped to 0
// regardless of lo and hi; note that 0 may lie outside [lo, hi]. The lower
// bound is tested first, so if lo > hi any value below lo yields lo.
//
// The bounds are named lo and hi rather than min and max so they do not
// shadow the built-in functions of the same names.
func clamp(value, lo, hi float64) float64 {
	switch {
	case math.IsNaN(value):
		return 0
	case value < lo:
		return lo
	case value > hi:
		return hi
	default:
		return value
	}
}
219