// ratelimit.go
1 package graph
2
3 import (
4 "context"
5 "sync"
6 "time"
7 )
8
// RateLimiter implements a token bucket rate limiter with adaptive throttling
// based on graph query complexity. It allows cooperative scheduling by inserting
// pauses between operations to allow other work to proceed.
//
// All exported methods are safe for concurrent use; mu guards every field
// below, so RateLimiter must not be copied after first use.
type RateLimiter struct {
	mu sync.Mutex

	// Token bucket parameters.
	tokens     float64   // Current available tokens (0..maxTokens)
	maxTokens  float64   // Maximum token capacity; refill is clamped to this
	refillRate float64   // Tokens per second to add
	lastRefill time.Time // Last time tokens were refilled (lazy refill on access)

	// Throttling parameters.
	baseDelay   time.Duration // Minimum delay between operations
	maxDelay    time.Duration // Maximum delay for complex queries
	depthFactor float64       // Cost multiplier per traversal depth level
	limitFactor float64       // Multiplier based on result limit
}
27
// RateLimiterConfig configures the rate limiter behavior.
//
// Zero or negative values are replaced with the documented defaults by
// NewRateLimiter, so the zero value of RateLimiterConfig is usable.
type RateLimiterConfig struct {
	// MaxTokens is the maximum number of tokens in the bucket (default: 100).
	MaxTokens float64

	// RefillRate is tokens added per second (default: 10).
	RefillRate float64

	// BaseDelay is the minimum delay between operations (default: 1ms).
	BaseDelay time.Duration

	// MaxDelay is the maximum delay for complex queries (default: 100ms).
	MaxDelay time.Duration

	// DepthFactor is the cost multiplier per depth level (default: 2.0).
	// A depth-3 query costs 2^3 = 8x more tokens than depth-0.
	DepthFactor float64

	// LimitFactor is additional cost per 100 results requested (default: 0.1).
	LimitFactor float64
}
49
50 // DefaultRateLimiterConfig returns sensible defaults for the rate limiter.
51 func DefaultRateLimiterConfig() RateLimiterConfig {
52 return RateLimiterConfig{
53 MaxTokens: 100.0,
54 RefillRate: 10.0, // Refills fully in 10 seconds
55 BaseDelay: 1 * time.Millisecond,
56 MaxDelay: 100 * time.Millisecond,
57 DepthFactor: 2.0,
58 LimitFactor: 0.1,
59 }
60 }
61
62 // NewRateLimiter creates a new rate limiter with the given configuration.
63 func NewRateLimiter(cfg RateLimiterConfig) *RateLimiter {
64 if cfg.MaxTokens <= 0 {
65 cfg.MaxTokens = DefaultRateLimiterConfig().MaxTokens
66 }
67 if cfg.RefillRate <= 0 {
68 cfg.RefillRate = DefaultRateLimiterConfig().RefillRate
69 }
70 if cfg.BaseDelay <= 0 {
71 cfg.BaseDelay = DefaultRateLimiterConfig().BaseDelay
72 }
73 if cfg.MaxDelay <= 0 {
74 cfg.MaxDelay = DefaultRateLimiterConfig().MaxDelay
75 }
76 if cfg.DepthFactor <= 0 {
77 cfg.DepthFactor = DefaultRateLimiterConfig().DepthFactor
78 }
79 if cfg.LimitFactor <= 0 {
80 cfg.LimitFactor = DefaultRateLimiterConfig().LimitFactor
81 }
82
83 return &RateLimiter{
84 tokens: cfg.MaxTokens,
85 maxTokens: cfg.MaxTokens,
86 refillRate: cfg.RefillRate,
87 lastRefill: time.Now(),
88 baseDelay: cfg.BaseDelay,
89 maxDelay: cfg.MaxDelay,
90 depthFactor: cfg.DepthFactor,
91 limitFactor: cfg.LimitFactor,
92 }
93 }
94
95 // QueryCost calculates the token cost for a graph query based on its complexity.
96 // Higher depths and larger limits cost exponentially more tokens.
97 func (rl *RateLimiter) QueryCost(q *Query) float64 {
98 if q == nil {
99 return 1.0
100 }
101
102 // Base cost is exponential in depth: depthFactor^depth
103 // This models the exponential growth of traversal work
104 cost := 1.0
105 for i := 0; i < q.Depth; i++ {
106 cost *= rl.depthFactor
107 }
108
109 // Add cost for bidirectional queries (traversing both directions)
110 if q.IsBidirectional() {
111 cost *= 1.5
112 }
113
114 return cost
115 }
116
117 // OperationCost calculates the token cost for a single traversal operation.
118 // This is used during query execution for per-operation throttling.
119 func (rl *RateLimiter) OperationCost(depth int, nodesAtDepth int) float64 {
120 // Cost increases with depth and number of nodes to process
121 depthMultiplier := 1.0
122 for i := 0; i < depth; i++ {
123 depthMultiplier *= rl.depthFactor
124 }
125
126 // More nodes at this depth = more work
127 nodeFactor := 1.0 + float64(nodesAtDepth)*0.01
128
129 return depthMultiplier * nodeFactor
130 }
131
132 // refillTokens adds tokens based on elapsed time since last refill.
133 func (rl *RateLimiter) refillTokens() {
134 now := time.Now()
135 elapsed := now.Sub(rl.lastRefill).Seconds()
136 rl.lastRefill = now
137
138 rl.tokens += elapsed * rl.refillRate
139 if rl.tokens > rl.maxTokens {
140 rl.tokens = rl.maxTokens
141 }
142 }
143
144 // Acquire tries to acquire tokens for a query. If not enough tokens are available,
145 // it waits until they become available or the context is cancelled.
146 // Returns the delay that was applied, or an error if context was cancelled.
147 func (rl *RateLimiter) Acquire(ctx context.Context, cost float64) (time.Duration, error) {
148 rl.mu.Lock()
149 defer rl.mu.Unlock()
150
151 rl.refillTokens()
152
153 var totalDelay time.Duration
154
155 // Wait until we have enough tokens
156 for rl.tokens < cost {
157 // Calculate how long we need to wait for tokens to refill
158 tokensNeeded := cost - rl.tokens
159 waitTime := time.Duration(tokensNeeded/rl.refillRate*1000) * time.Millisecond
160
161 // Clamp to max delay
162 if waitTime > rl.maxDelay {
163 waitTime = rl.maxDelay
164 }
165 if waitTime < rl.baseDelay {
166 waitTime = rl.baseDelay
167 }
168
169 // Release lock while waiting
170 rl.mu.Unlock()
171
172 select {
173 case <-ctx.Done():
174 rl.mu.Lock()
175 return totalDelay, ctx.Err()
176 case <-time.After(waitTime):
177 }
178
179 totalDelay += waitTime
180 rl.mu.Lock()
181 rl.refillTokens()
182 }
183
184 // Consume tokens
185 rl.tokens -= cost
186 return totalDelay, nil
187 }
188
189 // TryAcquire attempts to acquire tokens without waiting.
190 // Returns true if successful, false if insufficient tokens.
191 func (rl *RateLimiter) TryAcquire(cost float64) bool {
192 rl.mu.Lock()
193 defer rl.mu.Unlock()
194
195 rl.refillTokens()
196
197 if rl.tokens >= cost {
198 rl.tokens -= cost
199 return true
200 }
201 return false
202 }
203
204 // Pause inserts a cooperative delay to allow other work to proceed.
205 // The delay is proportional to the current depth and load.
206 // This should be called periodically during long-running traversals.
207 func (rl *RateLimiter) Pause(ctx context.Context, depth int, itemsProcessed int) error {
208 // Calculate adaptive delay based on depth and progress
209 // Deeper traversals and more processed items = longer pauses
210 delay := rl.baseDelay
211
212 // Increase delay with depth
213 for i := 0; i < depth; i++ {
214 delay += rl.baseDelay
215 }
216
217 // Add extra delay every N items to allow other work
218 if itemsProcessed > 0 && itemsProcessed%100 == 0 {
219 delay += rl.baseDelay * 5
220 }
221
222 // Cap at max delay
223 if delay > rl.maxDelay {
224 delay = rl.maxDelay
225 }
226
227 select {
228 case <-ctx.Done():
229 return ctx.Err()
230 case <-time.After(delay):
231 return nil
232 }
233 }
234
235 // AvailableTokens returns the current number of available tokens.
236 func (rl *RateLimiter) AvailableTokens() float64 {
237 rl.mu.Lock()
238 defer rl.mu.Unlock()
239 rl.refillTokens()
240 return rl.tokens
241 }
242
// Throttler provides a simple interface for cooperative scheduling during
// traversal. It wraps the rate limiter and provides depth-aware throttling.
//
// A Throttler tracks one traversal operation at a fixed depth; it is not
// guarded by a lock, so each traversal should use its own instance.
type Throttler struct {
	rl             *RateLimiter // underlying limiter used for pauses
	depth          int          // traversal depth this throttler was created for
	itemsProcessed int          // items seen so far; drives pause cadence in Tick
}
250
251 // NewThrottler creates a throttler for a specific traversal operation.
252 func NewThrottler(rl *RateLimiter, depth int) *Throttler {
253 return &Throttler{
254 rl: rl,
255 depth: depth,
256 }
257 }
258
259 // Tick should be called after processing each item.
260 // It tracks progress and inserts pauses as needed.
261 func (t *Throttler) Tick(ctx context.Context) error {
262 t.itemsProcessed++
263
264 // Insert cooperative pause periodically
265 // More frequent pauses at higher depths
266 interval := 50
267 if t.depth >= 2 {
268 interval = 25
269 }
270 if t.depth >= 4 {
271 interval = 10
272 }
273
274 if t.itemsProcessed%interval == 0 {
275 return t.rl.Pause(ctx, t.depth, t.itemsProcessed)
276 }
277 return nil
278 }
279
280 // Complete marks the throttler as complete and returns stats.
281 func (t *Throttler) Complete() (itemsProcessed int) {
282 return t.itemsProcessed
283 }
284