1 // Copyright (c) 2016,2020 Uber Technologies, Inc.
2 //
3 // Permission is hereby granted, free of charge, to any person obtaining a copy
4 // of this software and associated documentation files (the "Software"), to deal
5 // in the Software without restriction, including without limitation the rights
6 // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
7 // copies of the Software, and to permit persons to whom the Software is
8 // furnished to do so, subject to the following conditions:
9 //
10 // The above copyright notice and this permission notice shall be included in
11 // all copies or substantial portions of the Software.
12 //
13 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
14 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16 // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
18 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
19 // THE SOFTWARE.
20 21 package ratelimit // import "go.uber.org/ratelimit"
22 23 import (
24 "time"
25 26 "sync/atomic"
27 "unsafe"
28 )
29 30 type state struct {
31 last time.Time
32 sleepFor time.Duration
33 }
34 35 type atomicLimiter struct {
36 state unsafe.Pointer
37 //lint:ignore U1000 Padding is unused but it is crucial to maintain performance
38 // of this rate limiter in case of collocation with other frequently accessed memory.
39 padding [56]byte // cache line size - state pointer size = 64 - 8; created to avoid false sharing.
40 41 perRequest time.Duration
42 maxSlack time.Duration
43 clock Clock
44 }
45 46 // newAtomicBased returns a new atomic based limiter.
47 func newAtomicBased(rate int, opts ...Option) *atomicLimiter {
48 // TODO consider moving config building to the implementation
49 // independent code.
50 config := buildConfig(opts)
51 perRequest := config.per / time.Duration(rate)
52 l := &atomicLimiter{
53 perRequest: perRequest,
54 maxSlack: -1 * time.Duration(config.slack) * perRequest,
55 clock: config.clock,
56 }
57 58 initialState := state{
59 last: time.Time{},
60 sleepFor: 0,
61 }
62 atomic.StorePointer(&l.state, unsafe.Pointer(&initialState))
63 return l
64 }
65 66 // Take blocks to ensure that the time spent between multiple
67 // Take calls is on average per/rate.
68 func (t *atomicLimiter) Take() time.Time {
69 var (
70 newState state
71 taken bool
72 interval time.Duration
73 )
74 for !taken {
75 now := t.clock.Now()
76 77 previousStatePointer := atomic.LoadPointer(&t.state)
78 oldState := (*state)(previousStatePointer)
79 80 newState = state{
81 last: now,
82 sleepFor: oldState.sleepFor,
83 }
84 85 // If this is our first request, then we allow it.
86 if oldState.last.IsZero() {
87 taken = atomic.CompareAndSwapPointer(&t.state, previousStatePointer, unsafe.Pointer(&newState))
88 continue
89 }
90 91 // sleepFor calculates how much time we should sleep based on
92 // the perRequest budget and how long the last request took.
93 // Since the request may take longer than the budget, this number
94 // can get negative, and is summed across requests.
95 newState.sleepFor += t.perRequest - now.Sub(oldState.last)
96 // We shouldn't allow sleepFor to get too negative, since it would mean that
97 // a service that slowed down a lot for a short period of time would get
98 // a much higher RPS following that.
99 if newState.sleepFor < t.maxSlack {
100 newState.sleepFor = t.maxSlack
101 }
102 if newState.sleepFor > 0 {
103 newState.last = newState.last.Add(newState.sleepFor)
104 interval, newState.sleepFor = newState.sleepFor, 0
105 }
106 taken = atomic.CompareAndSwapPointer(&t.state, previousStatePointer, unsafe.Pointer(&newState))
107 }
108 t.clock.Sleep(interval)
109 return newState.last
110 }
111