// limiter.go
1 package ratelimit
2
3 import (
4 "context"
5 "runtime"
6 "sync"
7 "sync/atomic"
8 "time"
9
10 "next.orly.dev/pkg/lol/log"
11 "next.orly.dev/pkg/interfaces/loadmonitor"
12 pidif "next.orly.dev/pkg/interfaces/pid"
13 "next.orly.dev/pkg/pid"
14 )
15
// OperationType distinguishes between read and write operations
// so that different rate limiting strategies can be applied to each.
type OperationType int

const (
	// Read is for read operations (REQ queries).
	Read OperationType = iota
	// Write is for write operations (EVENT saves, imports).
	Write
)
26
27 // String returns a human-readable name for the operation type.
28 func (o OperationType) String() string {
29 switch o {
30 case Read:
31 return "read"
32 case Write:
33 return "write"
34 default:
35 return "unknown"
36 }
37 }
38
// Config holds configuration for the adaptive rate limiter.
type Config struct {
	// Enabled controls whether rate limiting is active.
	Enabled bool

	// TargetMemoryMB is the target memory limit in megabytes.
	// Memory pressure is calculated relative to this target.
	TargetMemoryMB int

	// WriteSetpoint is the target process variable for writes (0.0-1.0).
	// Default: 0.85 (throttle when load exceeds 85%).
	WriteSetpoint float64

	// ReadSetpoint is the target process variable for reads (0.0-1.0).
	// Default: 0.90 (more tolerant for reads).
	ReadSetpoint float64

	// PID gains for writes (proportional, integral, derivative).
	WriteKp float64
	WriteKi float64
	WriteKd float64

	// PID gains for reads (proportional, integral, derivative).
	ReadKp float64
	ReadKi float64
	ReadKd float64

	// MaxWriteDelayMs is the maximum delay for write operations in milliseconds.
	MaxWriteDelayMs int

	// MaxReadDelayMs is the maximum delay for read operations in milliseconds.
	MaxReadDelayMs int

	// MetricUpdateInterval is how often to poll the load monitor.
	// Must be positive; a zero value would break the polling ticker.
	MetricUpdateInterval time.Duration

	// MemoryWeight is the weight given to memory pressure in the process
	// variable (0.0-1.0). The remaining weight is given to the load metric.
	// Default: 0.7 (70% memory, 30% load).
	MemoryWeight float64

	// EmergencyThreshold is the memory pressure level (fraction of target)
	// that triggers emergency mode.
	// Default: 1.167 (116.7% = target + 1/6th).
	// When exceeded, writes are aggressively throttled until memory drops
	// below RecoveryThreshold.
	EmergencyThreshold float64

	// RecoveryThreshold is the memory pressure level below which we exit
	// emergency mode.
	// Default: 0.833 (83.3% = target - 1/6th).
	// Hysteresis prevents rapid oscillation between normal and emergency modes.
	RecoveryThreshold float64

	// EmergencyMaxDelayMs is the maximum delay for writes during emergency mode.
	// Default: 5000 (5 seconds) - much longer than normal MaxWriteDelayMs.
	EmergencyMaxDelayMs int

	// CompactionCheckInterval controls how often to check if compaction
	// should be triggered.
	// Default: 10 seconds.
	CompactionCheckInterval time.Duration
}
98
99 // DefaultConfig returns a default configuration for the rate limiter.
100 func DefaultConfig() Config {
101 return Config{
102 Enabled: true,
103 TargetMemoryMB: 1500, // 1.5GB target
104 WriteSetpoint: 0.85,
105 ReadSetpoint: 0.90,
106 WriteKp: 0.5,
107 WriteKi: 0.1,
108 WriteKd: 0.05,
109 ReadKp: 0.3,
110 ReadKi: 0.05,
111 ReadKd: 0.02,
112 MaxWriteDelayMs: 1000, // 1 second max
113 MaxReadDelayMs: 500, // 500ms max
114 MetricUpdateInterval: 100 * time.Millisecond,
115 MemoryWeight: 0.7,
116 EmergencyThreshold: 1.167, // Target + 1/6th (~1.75GB for 1.5GB target)
117 RecoveryThreshold: 0.833, // Target - 1/6th (~1.25GB for 1.5GB target)
118 EmergencyMaxDelayMs: 5000, // 5 seconds max in emergency mode
119 CompactionCheckInterval: 10 * time.Second,
120 }
121 }
122
123 // NewConfigFromValues creates a Config from individual configuration values.
124 // This is useful when loading configuration from environment variables.
125 func NewConfigFromValues(
126 enabled bool,
127 targetMB int,
128 writeKp, writeKi, writeKd float64,
129 readKp, readKi, readKd float64,
130 maxWriteMs, maxReadMs int,
131 writeTarget, readTarget float64,
132 emergencyThreshold, recoveryThreshold float64,
133 emergencyMaxMs int,
134 ) Config {
135 // Apply defaults for zero values
136 if emergencyThreshold == 0 {
137 emergencyThreshold = 1.167 // Target + 1/6th
138 }
139 if recoveryThreshold == 0 {
140 recoveryThreshold = 0.833 // Target - 1/6th
141 }
142 if emergencyMaxMs == 0 {
143 emergencyMaxMs = 5000 // 5 seconds
144 }
145
146 return Config{
147 Enabled: enabled,
148 TargetMemoryMB: targetMB,
149 WriteSetpoint: writeTarget,
150 ReadSetpoint: readTarget,
151 WriteKp: writeKp,
152 WriteKi: writeKi,
153 WriteKd: writeKd,
154 ReadKp: readKp,
155 ReadKi: readKi,
156 ReadKd: readKd,
157 MaxWriteDelayMs: maxWriteMs,
158 MaxReadDelayMs: maxReadMs,
159 MetricUpdateInterval: 100 * time.Millisecond,
160 MemoryWeight: 0.7,
161 EmergencyThreshold: emergencyThreshold,
162 RecoveryThreshold: recoveryThreshold,
163 EmergencyMaxDelayMs: emergencyMaxMs,
164 CompactionCheckInterval: 10 * time.Second,
165 }
166 }
167
// Limiter implements adaptive rate limiting using PID control.
// It monitors database load metrics and computes appropriate delays
// to keep the system within its target operating range.
type Limiter struct {
	config  Config
	monitor loadmonitor.Monitor

	// PID controllers for reads and writes (using generic pid.Controller).
	writePID pidif.Controller
	readPID  pidif.Controller

	// Cached metrics, refreshed by updateLoop every MetricUpdateInterval.
	metricsLock    sync.RWMutex
	currentMetrics loadmonitor.Metrics

	// Emergency mode tracking with hysteresis.
	inEmergencyMode     atomic.Bool
	lastEmergencyCheck  atomic.Int64 // Unix nano timestamp
	compactionTriggered atomic.Bool

	// Connection-level metrics for adaptive connection acceptance.
	activeConnections atomic.Int64
	goroutineCount    atomic.Int64

	// Connection storm config (set via SetConnectionLimits).
	// NOTE(review): plain ints read by other goroutines; appears intended
	// to be configured once before Start - confirm callers do so.
	maxGlobalConns   int
	connDelayMaxMs   int
	goroutineWarning int
	goroutineMax     int

	// Statistics (monotonic counters, reported via GetStats).
	totalWriteDelayMs atomic.Int64
	totalReadDelayMs  atomic.Int64
	writeThrottles    atomic.Int64
	readThrottles     atomic.Int64
	emergencyEvents   atomic.Int64
	droppedConns      atomic.Int64

	// Lifecycle: ctx cancels updateLoop; stopped closes once Stop completes.
	ctx      context.Context
	cancel   context.CancelFunc
	stopOnce sync.Once
	stopped  chan struct{}
	wg       sync.WaitGroup
}
213
214 // NewLimiter creates a new adaptive rate limiter.
215 // If monitor is nil, the limiter will be disabled.
216 func NewLimiter(config Config, monitor loadmonitor.Monitor) *Limiter {
217 ctx, cancel := context.WithCancel(context.Background())
218
219 // Apply defaults for zero values
220 if config.EmergencyThreshold == 0 {
221 config.EmergencyThreshold = 1.167 // Target + 1/6th
222 }
223 if config.RecoveryThreshold == 0 {
224 config.RecoveryThreshold = 0.833 // Target - 1/6th
225 }
226 if config.EmergencyMaxDelayMs == 0 {
227 config.EmergencyMaxDelayMs = 5000 // 5 seconds
228 }
229 if config.CompactionCheckInterval == 0 {
230 config.CompactionCheckInterval = 10 * time.Second
231 }
232
233 l := &Limiter{
234 config: config,
235 monitor: monitor,
236 ctx: ctx,
237 cancel: cancel,
238 stopped: make(chan struct{}),
239 }
240
241 // Create PID controllers with configured gains using the generic pid package
242 l.writePID = pid.New(pidif.Tuning{
243 Kp: config.WriteKp,
244 Ki: config.WriteKi,
245 Kd: config.WriteKd,
246 Setpoint: config.WriteSetpoint,
247 DerivativeFilterAlpha: 0.2, // Strong filtering for writes
248 IntegralMin: -2.0,
249 IntegralMax: float64(config.MaxWriteDelayMs) / 1000.0 * 2, // Anti-windup limits
250 OutputMin: 0,
251 OutputMax: float64(config.MaxWriteDelayMs) / 1000.0,
252 })
253
254 l.readPID = pid.New(pidif.Tuning{
255 Kp: config.ReadKp,
256 Ki: config.ReadKi,
257 Kd: config.ReadKd,
258 Setpoint: config.ReadSetpoint,
259 DerivativeFilterAlpha: 0.15, // Very strong filtering for reads
260 IntegralMin: -1.0,
261 IntegralMax: float64(config.MaxReadDelayMs) / 1000.0 * 2,
262 OutputMin: 0,
263 OutputMax: float64(config.MaxReadDelayMs) / 1000.0,
264 })
265
266 // Set memory target on monitor
267 if monitor != nil && config.TargetMemoryMB > 0 {
268 monitor.SetMemoryTarget(uint64(config.TargetMemoryMB) * 1024 * 1024)
269 }
270
271 // Configure emergency threshold if monitor supports it
272 if emMon, ok := monitor.(loadmonitor.EmergencyModeMonitor); ok {
273 emMon.SetEmergencyThreshold(config.EmergencyThreshold)
274 }
275
276 return l
277 }
278
279 // Start begins the rate limiter's background metric collection.
280 func (l *Limiter) Start() {
281 if l.monitor == nil || !l.config.Enabled {
282 return
283 }
284
285 // Start the monitor
286 l.monitor.Start()
287
288 // Start metric update loop
289 l.wg.Add(1)
290 go l.updateLoop()
291 }
292
293 // updateLoop periodically fetches metrics from the monitor.
294 func (l *Limiter) updateLoop() {
295 defer l.wg.Done()
296
297 ticker := time.NewTicker(l.config.MetricUpdateInterval)
298 defer ticker.Stop()
299
300 for {
301 select {
302 case <-l.ctx.Done():
303 return
304 case <-ticker.C:
305 if l.monitor != nil {
306 metrics := l.monitor.GetMetrics()
307 l.metricsLock.Lock()
308 l.currentMetrics = metrics
309 l.metricsLock.Unlock()
310 }
311 // Sample goroutine count for connection storm detection
312 l.goroutineCount.Store(int64(runtime.NumGoroutine()))
313 }
314 }
315 }
316
317 // Stop halts the rate limiter.
318 func (l *Limiter) Stop() {
319 l.stopOnce.Do(func() {
320 l.cancel()
321 if l.monitor != nil {
322 l.monitor.Stop()
323 }
324 l.wg.Wait()
325 close(l.stopped)
326 })
327 }
328
329 // Stopped returns a channel that closes when the limiter has stopped.
330 func (l *Limiter) Stopped() <-chan struct{} {
331 return l.stopped
332 }
333
334 // Wait blocks until the rate limiter permits the operation to proceed.
335 // It returns the delay that was applied, or 0 if no delay was needed.
336 // If the context is cancelled, it returns immediately.
337 // opType accepts int for interface compatibility (0=Read, 1=Write)
338 func (l *Limiter) Wait(ctx context.Context, opType int) time.Duration {
339 if !l.config.Enabled || l.monitor == nil {
340 return 0
341 }
342
343 delay := l.ComputeDelay(OperationType(opType))
344 if delay <= 0 {
345 return 0
346 }
347
348 // Apply the delay
349 select {
350 case <-ctx.Done():
351 return 0
352 case <-time.After(delay):
353 return delay
354 }
355 }
356
// ComputeDelay calculates the recommended delay for an operation without
// actually waiting. It feeds the current process variable through the
// operation's PID controller and, for writes in emergency mode, applies an
// additional progressive multiplier. Note the PID update is stateful, so
// each call advances the controller.
func (l *Limiter) ComputeDelay(opType OperationType) time.Duration {
	if !l.config.Enabled || l.monitor == nil {
		return 0
	}

	// Snapshot the cached metrics (refreshed by updateLoop).
	l.metricsLock.RLock()
	metrics := l.currentMetrics
	l.metricsLock.RUnlock()

	// Check emergency mode with hysteresis.
	inEmergency := l.checkEmergencyMode(metrics.MemoryPressure)

	// Pick the load metric matching the operation type.
	var loadMetric float64
	switch opType {
	case Write:
		loadMetric = metrics.WriteLoad
	case Read:
		loadMetric = metrics.ReadLoad
	}

	// Process variable = memoryWeight * memoryPressure + (1-memoryWeight) * loadMetric.
	pv := l.config.MemoryWeight*metrics.MemoryPressure + (1-l.config.MemoryWeight)*loadMetric

	// Run the appropriate PID controller; its output is a delay in seconds.
	var delaySec float64
	switch opType {
	case Write:
		out := l.writePID.UpdateValue(pv)
		delaySec = out.Value()

		// In emergency mode, apply progressive throttling for writes.
		if inEmergency {
			// Calculate how far above emergency threshold we are.
			// Linear scaling: multiplier = 1 + (excess * 5)
			// At emergency threshold: 1x, at +20% above: 2x, at +40% above: 3x.
			excessPressure := metrics.MemoryPressure - l.config.EmergencyThreshold
			if excessPressure < 0 {
				excessPressure = 0
			}
			multiplier := 1.0 + excessPressure*5.0

			emergencyDelaySec := delaySec * multiplier
			maxEmergencySec := float64(l.config.EmergencyMaxDelayMs) / 1000.0

			// Clamp to the emergency ceiling.
			if emergencyDelaySec > maxEmergencySec {
				emergencyDelaySec = maxEmergencySec
			}
			// Minimum emergency delay of 100ms to allow other operations.
			if emergencyDelaySec < 0.1 {
				emergencyDelaySec = 0.1
			}
			delaySec = emergencyDelaySec
		}

		// Record throttle statistics whenever a non-zero delay is imposed.
		if delaySec > 0 {
			l.writeThrottles.Add(1)
			l.totalWriteDelayMs.Add(int64(delaySec * 1000))
		}
	case Read:
		out := l.readPID.UpdateValue(pv)
		delaySec = out.Value()
		if delaySec > 0 {
			l.readThrottles.Add(1)
			l.totalReadDelayMs.Add(int64(delaySec * 1000))
		}
	}

	if delaySec <= 0 {
		return 0
	}

	return time.Duration(delaySec * float64(time.Second))
}
435
436 // checkEmergencyMode implements hysteresis-based emergency mode detection.
437 // Enters emergency mode when memory pressure >= EmergencyThreshold.
438 // Exits emergency mode when memory pressure <= RecoveryThreshold.
439 func (l *Limiter) checkEmergencyMode(memoryPressure float64) bool {
440 wasInEmergency := l.inEmergencyMode.Load()
441
442 if wasInEmergency {
443 // To exit, must drop below recovery threshold
444 if memoryPressure <= l.config.RecoveryThreshold {
445 l.inEmergencyMode.Store(false)
446 log.I.F("✅ exiting emergency mode: memory %.1f%% <= recovery threshold %.1f%%",
447 memoryPressure*100, l.config.RecoveryThreshold*100)
448 return false
449 }
450 return true
451 }
452
453 // To enter, must exceed emergency threshold
454 if memoryPressure >= l.config.EmergencyThreshold {
455 l.inEmergencyMode.Store(true)
456 l.emergencyEvents.Add(1)
457 log.W.F("⚠️ entering emergency mode: memory %.1f%% >= threshold %.1f%%",
458 memoryPressure*100, l.config.EmergencyThreshold*100)
459
460 // Trigger compaction if supported
461 l.triggerCompactionIfNeeded()
462 return true
463 }
464
465 return false
466 }
467
468 // triggerCompactionIfNeeded triggers database compaction if the monitor supports it
469 // and compaction isn't already in progress.
470 func (l *Limiter) triggerCompactionIfNeeded() {
471 if l.compactionTriggered.Load() {
472 return // Already triggered
473 }
474
475 compactMon, ok := l.monitor.(loadmonitor.CompactableMonitor)
476 if !ok {
477 return // Monitor doesn't support compaction
478 }
479
480 if compactMon.IsCompacting() {
481 return // Already compacting
482 }
483
484 l.compactionTriggered.Store(true)
485 go func() {
486 defer l.compactionTriggered.Store(false)
487 if err := compactMon.TriggerCompaction(); err != nil {
488 log.E.F("compaction failed: %v", err)
489 }
490 }()
491 }
492
493 // InEmergencyMode returns true if the limiter is currently in emergency mode.
494 func (l *Limiter) InEmergencyMode() bool {
495 return l.inEmergencyMode.Load()
496 }
497
498 // RecordLatency records an operation latency for the monitor.
499 func (l *Limiter) RecordLatency(opType OperationType, latency time.Duration) {
500 if l.monitor == nil {
501 return
502 }
503
504 switch opType {
505 case Write:
506 l.monitor.RecordWriteLatency(latency)
507 case Read:
508 l.monitor.RecordQueryLatency(latency)
509 }
510 }
511
// Stats is a point-in-time snapshot of rate limiter statistics,
// as returned by GetStats.
type Stats struct {
	WriteThrottles    int64
	ReadThrottles     int64
	TotalWriteDelayMs int64
	TotalReadDelayMs  int64
	EmergencyEvents   int64
	InEmergencyMode   bool
	CurrentMetrics    loadmonitor.Metrics
	WritePIDState     PIDState
	ReadPIDState      PIDState
}

// PIDState contains the internal state of a PID controller,
// exposed for monitoring and debugging.
type PIDState struct {
	Integral          float64
	PrevError         float64
	PrevFilteredError float64
}
531
532 // GetStats returns current rate limiter statistics.
533 func (l *Limiter) GetStats() Stats {
534 l.metricsLock.RLock()
535 metrics := l.currentMetrics
536 l.metricsLock.RUnlock()
537
538 stats := Stats{
539 WriteThrottles: l.writeThrottles.Load(),
540 ReadThrottles: l.readThrottles.Load(),
541 TotalWriteDelayMs: l.totalWriteDelayMs.Load(),
542 TotalReadDelayMs: l.totalReadDelayMs.Load(),
543 EmergencyEvents: l.emergencyEvents.Load(),
544 InEmergencyMode: l.inEmergencyMode.Load(),
545 CurrentMetrics: metrics,
546 }
547
548 // Type assert to concrete pid.Controller to access State() method
549 // This is for monitoring/debugging only
550 if wCtrl, ok := l.writePID.(*pid.Controller); ok {
551 integral, prevErr, prevFiltered, _ := wCtrl.State()
552 stats.WritePIDState = PIDState{
553 Integral: integral,
554 PrevError: prevErr,
555 PrevFilteredError: prevFiltered,
556 }
557 }
558 if rCtrl, ok := l.readPID.(*pid.Controller); ok {
559 integral, prevErr, prevFiltered, _ := rCtrl.State()
560 stats.ReadPIDState = PIDState{
561 Integral: integral,
562 PrevError: prevErr,
563 PrevFilteredError: prevFiltered,
564 }
565 }
566
567 return stats
568 }
569
570 // Reset clears all PID controller state and statistics.
571 func (l *Limiter) Reset() {
572 l.writePID.Reset()
573 l.readPID.Reset()
574 l.writeThrottles.Store(0)
575 l.readThrottles.Store(0)
576 l.totalWriteDelayMs.Store(0)
577 l.totalReadDelayMs.Store(0)
578 }
579
580 // IsEnabled returns whether rate limiting is active.
581 func (l *Limiter) IsEnabled() bool {
582 return l.config.Enabled && l.monitor != nil
583 }
584
// SetConnectionLimits configures the connection storm mitigation parameters:
// the hard cap on concurrent connections, the maximum accept delay, and the
// goroutine warning/ceiling levels used by systemLoadScore and
// ShouldAcceptConnection.
//
// NOTE(review): these fields are plain ints written without synchronization
// while other goroutines read them; this looks like it is meant to be called
// once before Start - confirm no caller invokes it concurrently.
func (l *Limiter) SetConnectionLimits(maxGlobal, delayMaxMs, goroutineWarn, goroutineMax int) {
	l.maxGlobalConns = maxGlobal
	l.connDelayMaxMs = delayMaxMs
	l.goroutineWarning = goroutineWarn
	l.goroutineMax = goroutineMax
}
592
593 // SetActiveConnections updates the current connection count metric.
594 func (l *Limiter) SetActiveConnections(n int64) {
595 l.activeConnections.Store(n)
596 }
597
598 // ActiveConnections returns the current connection count.
599 func (l *Limiter) ActiveConnections() int64 {
600 return l.activeConnections.Load()
601 }
602
603 // DroppedConnections returns the total number of connections dropped due to overload.
604 func (l *Limiter) DroppedConnections() int64 {
605 return l.droppedConns.Load()
606 }
607
608 // systemLoadScore computes a composite load score from 0.0 (idle) to 1.0+ (overloaded).
609 // It combines memory pressure, goroutine count, and connection count.
610 func (l *Limiter) systemLoadScore() float64 {
611 l.metricsLock.RLock()
612 memPressure := l.currentMetrics.MemoryPressure
613 l.metricsLock.RUnlock()
614
615 goroutines := l.goroutineCount.Load()
616 conns := l.activeConnections.Load()
617
618 // Memory component (0-1, already normalized)
619 memScore := memPressure
620
621 // Goroutine component: linear from warning to max
622 var goroutineScore float64
623 if l.goroutineWarning > 0 && goroutines > int64(l.goroutineWarning) {
624 goroutineScore = float64(goroutines-int64(l.goroutineWarning)) /
625 float64(l.goroutineMax-l.goroutineWarning)
626 if goroutineScore > 1.0 {
627 goroutineScore = 1.0
628 }
629 }
630
631 // Connection component: linear from 50% to 100% of max
632 var connScore float64
633 if l.maxGlobalConns > 0 && conns > int64(l.maxGlobalConns/2) {
634 connScore = float64(conns-int64(l.maxGlobalConns/2)) /
635 float64(l.maxGlobalConns/2)
636 if connScore > 1.0 {
637 connScore = 1.0
638 }
639 }
640
641 // Weighted combination: memory 50%, goroutines 30%, connections 20%
642 return memScore*0.5 + goroutineScore*0.3 + connScore*0.2
643 }
644
645 // ShouldAcceptConnection returns false if the system is too overloaded to accept
646 // new connections. It checks memory pressure, goroutine count, and connection count.
647 func (l *Limiter) ShouldAcceptConnection() bool {
648 if !l.config.Enabled || l.monitor == nil {
649 return true
650 }
651
652 // Hard limits: refuse immediately
653 goroutines := l.goroutineCount.Load()
654 if l.goroutineMax > 0 && goroutines >= int64(l.goroutineMax) {
655 l.droppedConns.Add(1)
656 log.W.F("refusing connection: goroutine count %d >= max %d", goroutines, l.goroutineMax)
657 return false
658 }
659
660 conns := l.activeConnections.Load()
661 if l.maxGlobalConns > 0 && conns >= int64(l.maxGlobalConns) {
662 l.droppedConns.Add(1)
663 log.W.F("refusing connection: active connections %d >= max %d", conns, l.maxGlobalConns)
664 return false
665 }
666
667 // Emergency mode: refuse
668 if l.inEmergencyMode.Load() {
669 l.droppedConns.Add(1)
670 log.W.F("refusing connection: emergency mode active")
671 return false
672 }
673
674 return true
675 }
676
677 // ConnectionDelay returns a delay to apply before accepting a new connection.
678 // Returns 0 if no delay is needed. The delay is proportional to system load.
679 func (l *Limiter) ConnectionDelay() time.Duration {
680 if !l.config.Enabled || l.monitor == nil || l.connDelayMaxMs <= 0 {
681 return 0
682 }
683
684 score := l.systemLoadScore()
685
686 // No delay below 0.5 load
687 if score < 0.5 {
688 return 0
689 }
690
691 // Linear delay from 0.5 to 1.0 load
692 fraction := (score - 0.5) * 2.0 // 0.0 at 0.5, 1.0 at 1.0
693 if fraction > 1.0 {
694 fraction = 1.0
695 }
696
697 delayMs := fraction * float64(l.connDelayMaxMs)
698 return time.Duration(delayMs) * time.Millisecond
699 }
700
// UpdateConfig updates the rate limiter configuration at runtime.
// This is useful for dynamic tuning.
//
// NOTE(review): l.config is replaced without synchronization while
// ComputeDelay and the accept path read it from other goroutines; confirm
// callers only invoke this during quiescent periods, or that torn reads of
// Config fields are acceptable here.
func (l *Limiter) UpdateConfig(config Config) {
	l.config = config

	// Update PID controllers - use interface methods for setpoint and gains.
	l.writePID.SetSetpoint(config.WriteSetpoint)
	l.writePID.SetGains(config.WriteKp, config.WriteKi, config.WriteKd)
	// Type assert to set output limits (not part of the base interface).
	if wCtrl, ok := l.writePID.(*pid.Controller); ok {
		wCtrl.SetOutputLimits(0, float64(config.MaxWriteDelayMs)/1000.0)
	}

	l.readPID.SetSetpoint(config.ReadSetpoint)
	l.readPID.SetGains(config.ReadKp, config.ReadKi, config.ReadKd)
	if rCtrl, ok := l.readPID.(*pid.Controller); ok {
		rCtrl.SetOutputLimits(0, float64(config.MaxReadDelayMs)/1000.0)
	}

	// Update memory target on the monitor, if one is attached.
	if l.monitor != nil && config.TargetMemoryMB > 0 {
		l.monitor.SetMemoryTarget(uint64(config.TargetMemoryMB) * 1024 * 1024)
	}
}
725