1 // Copyright 2015 The Go Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style
3 // license that can be found in the LICENSE file.
4 5 // Package rate provides a rate limiter.
6 package rate
7 8 import (
9 "context"
10 "fmt"
11 "math"
12 "sync"
13 "time"
14 )
// Limit is the maximum rate at which events may occur, expressed as a
// number of events per second.
// A Limit of zero permits no events at all.
type Limit float64
20 21 // Inf is the infinite rate limit; it allows all events (even if burst is zero).
22 const Inf = Limit(math.MaxFloat64)
23 24 // Every converts a minimum time interval between events to a Limit.
25 func Every(interval time.Duration) Limit {
26 if interval <= 0 {
27 return Inf
28 }
29 return 1 / Limit(interval.Seconds())
30 }
31 32 // A Limiter controls how frequently events are allowed to happen.
33 // It implements a "token bucket" of size b, initially full and refilled
34 // at rate r tokens per second.
35 // Informally, in any large enough time interval, the Limiter limits the
36 // rate to r tokens per second, with a maximum burst size of b events.
37 // As a special case, if r == Inf (the infinite rate), b is ignored.
38 // See https://en.wikipedia.org/wiki/Token_bucket for more about token buckets.
39 //
40 // The zero value is a valid Limiter, but it will reject all events.
41 // Use NewLimiter to create non-zero Limiters.
42 //
43 // Limiter has three main methods, Allow, Reserve, and Wait.
44 // Most callers should use Wait.
45 //
46 // Each of the three methods consumes a single token.
47 // They differ in their behavior when no token is available.
48 // If no token is available, Allow returns false.
49 // If no token is available, Reserve returns a reservation for a future token
50 // and the amount of time the caller must wait before using it.
51 // If no token is available, Wait blocks until one can be obtained
52 // or its associated context.Context is canceled.
53 //
54 // The methods AllowN, ReserveN, and WaitN consume n tokens.
55 //
56 // Limiter is safe for simultaneous use by multiple goroutines.
57 type Limiter struct {
58 mu sync.Mutex
59 limit Limit
60 burst int
61 tokens float64
62 // last is the last time the limiter's tokens field was updated
63 last time.Time
64 // lastEvent is the latest time of a rate-limited event (past or future)
65 lastEvent time.Time
66 }
67 68 // Limit returns the maximum overall event rate.
69 func (lim *Limiter) Limit() Limit {
70 lim.mu.Lock()
71 defer lim.mu.Unlock()
72 return lim.limit
73 }
74 75 // Burst returns the maximum burst size. Burst is the maximum number of tokens
76 // that can be consumed in a single call to Allow, Reserve, or Wait, so higher
77 // Burst values allow more events to happen at once.
78 // A zero Burst allows no events, unless limit == Inf.
79 func (lim *Limiter) Burst() int {
80 lim.mu.Lock()
81 defer lim.mu.Unlock()
82 return lim.burst
83 }
84 85 // TokensAt returns the number of tokens available at time t.
86 func (lim *Limiter) TokensAt(t time.Time) float64 {
87 lim.mu.Lock()
88 tokens := lim.advance(t) // does not mutate lim
89 lim.mu.Unlock()
90 return tokens
91 }
92 93 // Tokens returns the number of tokens available now.
94 func (lim *Limiter) Tokens() float64 {
95 return lim.TokensAt(time.Now())
96 }
97 98 // NewLimiter returns a new Limiter that allows events up to rate r and permits
99 // bursts of at most b tokens.
100 func NewLimiter(r Limit, b int) *Limiter {
101 return &Limiter{
102 limit: r,
103 burst: b,
104 tokens: float64(b),
105 }
106 }
107 108 // Allow reports whether an event may happen now.
109 func (lim *Limiter) Allow() bool {
110 return lim.AllowN(time.Now(), 1)
111 }
112 113 // AllowN reports whether n events may happen at time t.
114 // Use this method if you intend to drop / skip events that exceed the rate limit.
115 // Otherwise use Reserve or Wait.
116 func (lim *Limiter) AllowN(t time.Time, n int) bool {
117 return lim.reserveN(t, n, 0).ok
118 }
119 120 // A Reservation holds information about events that are permitted by a Limiter to happen after a delay.
121 // A Reservation may be canceled, which may enable the Limiter to permit additional events.
122 type Reservation struct {
123 ok bool
124 lim *Limiter
125 tokens int
126 timeToAct time.Time
127 // This is the Limit at reservation time, it can change later.
128 limit Limit
129 }
130 131 // OK returns whether the limiter can provide the requested number of tokens
132 // within the maximum wait time. If OK is false, Delay returns InfDuration, and
133 // Cancel does nothing.
134 func (r *Reservation) OK() bool {
135 return r.ok
136 }
137 138 // Delay is shorthand for DelayFrom(time.Now()).
139 func (r *Reservation) Delay() time.Duration {
140 return r.DelayFrom(time.Now())
141 }
// InfDuration is what Delay returns for a Reservation that is not OK.
// It is the largest representable time.Duration.
const InfDuration time.Duration = math.MaxInt64
145 146 // DelayFrom returns the duration for which the reservation holder must wait
147 // before taking the reserved action. Zero duration means act immediately.
148 // InfDuration means the limiter cannot grant the tokens requested in this
149 // Reservation within the maximum wait time.
150 func (r *Reservation) DelayFrom(t time.Time) time.Duration {
151 if !r.ok {
152 return InfDuration
153 }
154 delay := r.timeToAct.Sub(t)
155 if delay < 0 {
156 return 0
157 }
158 return delay
159 }
160 161 // Cancel is shorthand for CancelAt(time.Now()).
162 func (r *Reservation) Cancel() {
163 r.CancelAt(time.Now())
164 }
165 166 // CancelAt indicates that the reservation holder will not perform the reserved action
167 // and reverses the effects of this Reservation on the rate limit as much as possible,
168 // considering that other reservations may have already been made.
169 func (r *Reservation) CancelAt(t time.Time) {
170 if !r.ok {
171 return
172 }
173 174 r.lim.mu.Lock()
175 defer r.lim.mu.Unlock()
176 177 if r.lim.limit == Inf || r.tokens == 0 || r.timeToAct.Before(t) {
178 return
179 }
180 181 // calculate tokens to restore
182 // The duration between lim.lastEvent and r.timeToAct tells us how many tokens were reserved
183 // after r was obtained. These tokens should not be restored.
184 restoreTokens := float64(r.tokens) - r.limit.tokensFromDuration(r.lim.lastEvent.Sub(r.timeToAct))
185 if restoreTokens <= 0 {
186 return
187 }
188 // advance time to now
189 tokens := r.lim.advance(t)
190 // calculate new number of tokens
191 tokens += restoreTokens
192 if burst := float64(r.lim.burst); tokens > burst {
193 tokens = burst
194 }
195 // update state
196 r.lim.last = t
197 r.lim.tokens = tokens
198 if r.timeToAct.Equal(r.lim.lastEvent) {
199 prevEvent := r.timeToAct.Add(r.limit.durationFromTokens(float64(-r.tokens)))
200 if !prevEvent.Before(t) {
201 r.lim.lastEvent = prevEvent
202 }
203 }
204 }
205 206 // Reserve is shorthand for ReserveN(time.Now(), 1).
207 func (lim *Limiter) Reserve() *Reservation {
208 return lim.ReserveN(time.Now(), 1)
209 }
210 211 // ReserveN returns a Reservation that indicates how long the caller must wait before n events happen.
212 // The Limiter takes this Reservation into account when allowing future events.
213 // The returned Reservation’s OK() method returns false if n exceeds the Limiter's burst size.
214 // Usage example:
215 //
216 // r := lim.ReserveN(time.Now(), 1)
217 // if !r.OK() {
218 // // Not allowed to act! Did you remember to set lim.burst to be > 0 ?
219 // return
220 // }
221 // time.Sleep(r.Delay())
222 // Act()
223 //
224 // Use this method if you wish to wait and slow down in accordance with the rate limit without dropping events.
225 // If you need to respect a deadline or cancel the delay, use Wait instead.
226 // To drop or skip events exceeding rate limit, use Allow instead.
227 func (lim *Limiter) ReserveN(t time.Time, n int) *Reservation {
228 r := lim.reserveN(t, n, InfDuration)
229 return &r
230 }
231 232 // Wait is shorthand for WaitN(ctx, 1).
233 func (lim *Limiter) Wait(ctx context.Context) (err error) {
234 return lim.WaitN(ctx, 1)
235 }
236 237 // WaitN blocks until lim permits n events to happen.
238 // It returns an error if n exceeds the Limiter's burst size, the Context is
239 // canceled, or the expected wait time exceeds the Context's Deadline.
240 // The burst limit is ignored if the rate limit is Inf.
241 func (lim *Limiter) WaitN(ctx context.Context, n int) (err error) {
242 // The test code calls lim.wait with a fake timer generator.
243 // This is the real timer generator.
244 newTimer := func(d time.Duration) (<-chan time.Time, func() bool, func()) {
245 timer := time.NewTimer(d)
246 return timer.C, timer.Stop, func() {}
247 }
248 249 return lim.wait(ctx, n, time.Now(), newTimer)
250 }
251 252 // wait is the internal implementation of WaitN.
253 func (lim *Limiter) wait(ctx context.Context, n int, t time.Time, newTimer func(d time.Duration) (<-chan time.Time, func() bool, func())) error {
254 lim.mu.Lock()
255 burst := lim.burst
256 limit := lim.limit
257 lim.mu.Unlock()
258 259 if n > burst && limit != Inf {
260 return fmt.Errorf("rate: Wait(n=%d) exceeds limiter's burst %d", n, burst)
261 }
262 // Check if ctx is already cancelled
263 select {
264 case <-ctx.Done():
265 return ctx.Err()
266 default:
267 }
268 // Determine wait limit
269 waitLimit := InfDuration
270 if deadline, ok := ctx.Deadline(); ok {
271 waitLimit = deadline.Sub(t)
272 }
273 // Reserve
274 r := lim.reserveN(t, n, waitLimit)
275 if !r.ok {
276 return fmt.Errorf("rate: Wait(n=%d) would exceed context deadline", n)
277 }
278 // Wait if necessary
279 delay := r.DelayFrom(t)
280 if delay == 0 {
281 return nil
282 }
283 ch, stop, advance := newTimer(delay)
284 defer stop()
285 advance() // only has an effect when testing
286 select {
287 case <-ch:
288 // We can proceed.
289 return nil
290 case <-ctx.Done():
291 // Context was canceled before we could proceed. Cancel the
292 // reservation, which may permit other events to proceed sooner.
293 r.Cancel()
294 return ctx.Err()
295 }
296 }
297 298 // SetLimit is shorthand for SetLimitAt(time.Now(), newLimit).
299 func (lim *Limiter) SetLimit(newLimit Limit) {
300 lim.SetLimitAt(time.Now(), newLimit)
301 }
302 303 // SetLimitAt sets a new Limit for the limiter. The new Limit, and Burst, may be violated
304 // or underutilized by those which reserved (using Reserve or Wait) but did not yet act
305 // before SetLimitAt was called.
306 func (lim *Limiter) SetLimitAt(t time.Time, newLimit Limit) {
307 lim.mu.Lock()
308 defer lim.mu.Unlock()
309 310 tokens := lim.advance(t)
311 312 lim.last = t
313 lim.tokens = tokens
314 lim.limit = newLimit
315 }
316 317 // SetBurst is shorthand for SetBurstAt(time.Now(), newBurst).
318 func (lim *Limiter) SetBurst(newBurst int) {
319 lim.SetBurstAt(time.Now(), newBurst)
320 }
321 322 // SetBurstAt sets a new burst size for the limiter.
323 func (lim *Limiter) SetBurstAt(t time.Time, newBurst int) {
324 lim.mu.Lock()
325 defer lim.mu.Unlock()
326 327 tokens := lim.advance(t)
328 329 lim.last = t
330 lim.tokens = tokens
331 lim.burst = newBurst
332 }
333 334 // reserveN is a helper method for AllowN, ReserveN, and WaitN.
335 // maxFutureReserve specifies the maximum reservation wait duration allowed.
336 // reserveN returns Reservation, not *Reservation, to avoid allocation in AllowN and WaitN.
337 func (lim *Limiter) reserveN(t time.Time, n int, maxFutureReserve time.Duration) Reservation {
338 lim.mu.Lock()
339 defer lim.mu.Unlock()
340 341 if lim.limit == Inf {
342 return Reservation{
343 ok: true,
344 lim: lim,
345 tokens: n,
346 timeToAct: t,
347 }
348 }
349 350 tokens := lim.advance(t)
351 352 // Calculate the remaining number of tokens resulting from the request.
353 tokens -= float64(n)
354 355 // Calculate the wait duration
356 var waitDuration time.Duration
357 if tokens < 0 {
358 waitDuration = lim.limit.durationFromTokens(-tokens)
359 }
360 361 // Decide result
362 ok := n <= lim.burst && waitDuration <= maxFutureReserve
363 364 // Prepare reservation
365 r := Reservation{
366 ok: ok,
367 lim: lim,
368 limit: lim.limit,
369 }
370 if ok {
371 r.tokens = n
372 r.timeToAct = t.Add(waitDuration)
373 374 // Update state
375 lim.last = t
376 lim.tokens = tokens
377 lim.lastEvent = r.timeToAct
378 }
379 380 return r
381 }
382 383 // advance calculates and returns an updated number of tokens for lim
384 // resulting from the passage of time.
385 // lim is not changed.
386 // advance requires that lim.mu is held.
387 func (lim *Limiter) advance(t time.Time) (newTokens float64) {
388 last := lim.last
389 if t.Before(last) {
390 last = t
391 }
392 393 // Calculate the new number of tokens, due to time that passed.
394 elapsed := t.Sub(last)
395 delta := lim.limit.tokensFromDuration(elapsed)
396 tokens := lim.tokens + delta
397 if burst := float64(lim.burst); tokens > burst {
398 tokens = burst
399 }
400 return tokens
401 }
402 403 // durationFromTokens is a unit conversion function from the number of tokens to the duration
404 // of time it takes to accumulate them at a rate of limit tokens per second.
405 func (limit Limit) durationFromTokens(tokens float64) time.Duration {
406 if limit <= 0 {
407 return InfDuration
408 }
409 410 duration := (tokens / float64(limit)) * float64(time.Second)
411 412 // Cap the duration to the maximum representable int64 value, to avoid overflow.
413 if duration > float64(math.MaxInt64) {
414 return InfDuration
415 }
416 417 return time.Duration(duration)
418 }
419 420 // tokensFromDuration is a unit conversion function from a time duration to the number of tokens
421 // which could be accumulated during that duration at a rate of limit tokens per second.
422 func (limit Limit) tokensFromDuration(d time.Duration) float64 {
423 if limit <= 0 {
424 return 0
425 }
426 return d.Seconds() * float64(limit)
427 }
428