limiter_atomic.go raw

   1  // Copyright (c) 2016,2020 Uber Technologies, Inc.
   2  //
   3  // Permission is hereby granted, free of charge, to any person obtaining a copy
   4  // of this software and associated documentation files (the "Software"), to deal
   5  // in the Software without restriction, including without limitation the rights
   6  // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
   7  // copies of the Software, and to permit persons to whom the Software is
   8  // furnished to do so, subject to the following conditions:
   9  //
  10  // The above copyright notice and this permission notice shall be included in
  11  // all copies or substantial portions of the Software.
  12  //
  13  // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
  14  // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
  15  // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
  16  // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
  17  // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
  18  // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
  19  // THE SOFTWARE.
  20  
  21  package ratelimit // import "go.uber.org/ratelimit"
  22  
  23  import (
  24  	"time"
  25  
  26  	"sync/atomic"
  27  	"unsafe"
  28  )
  29  
  30  type state struct {
  31  	last     time.Time
  32  	sleepFor time.Duration
  33  }
  34  
  35  type atomicLimiter struct {
  36  	state unsafe.Pointer
  37  	//lint:ignore U1000 Padding is unused but it is crucial to maintain performance
  38  	// of this rate limiter in case of collocation with other frequently accessed memory.
  39  	padding [56]byte // cache line size - state pointer size = 64 - 8; created to avoid false sharing.
  40  
  41  	perRequest time.Duration
  42  	maxSlack   time.Duration
  43  	clock      Clock
  44  }
  45  
  46  // newAtomicBased returns a new atomic based limiter.
  47  func newAtomicBased(rate int, opts ...Option) *atomicLimiter {
  48  	// TODO consider moving config building to the implementation
  49  	// independent code.
  50  	config := buildConfig(opts)
  51  	perRequest := config.per / time.Duration(rate)
  52  	l := &atomicLimiter{
  53  		perRequest: perRequest,
  54  		maxSlack:   -1 * time.Duration(config.slack) * perRequest,
  55  		clock:      config.clock,
  56  	}
  57  
  58  	initialState := state{
  59  		last:     time.Time{},
  60  		sleepFor: 0,
  61  	}
  62  	atomic.StorePointer(&l.state, unsafe.Pointer(&initialState))
  63  	return l
  64  }
  65  
  66  // Take blocks to ensure that the time spent between multiple
  67  // Take calls is on average per/rate.
  68  func (t *atomicLimiter) Take() time.Time {
  69  	var (
  70  		newState state
  71  		taken    bool
  72  		interval time.Duration
  73  	)
  74  	for !taken {
  75  		now := t.clock.Now()
  76  
  77  		previousStatePointer := atomic.LoadPointer(&t.state)
  78  		oldState := (*state)(previousStatePointer)
  79  
  80  		newState = state{
  81  			last:     now,
  82  			sleepFor: oldState.sleepFor,
  83  		}
  84  
  85  		// If this is our first request, then we allow it.
  86  		if oldState.last.IsZero() {
  87  			taken = atomic.CompareAndSwapPointer(&t.state, previousStatePointer, unsafe.Pointer(&newState))
  88  			continue
  89  		}
  90  
  91  		// sleepFor calculates how much time we should sleep based on
  92  		// the perRequest budget and how long the last request took.
  93  		// Since the request may take longer than the budget, this number
  94  		// can get negative, and is summed across requests.
  95  		newState.sleepFor += t.perRequest - now.Sub(oldState.last)
  96  		// We shouldn't allow sleepFor to get too negative, since it would mean that
  97  		// a service that slowed down a lot for a short period of time would get
  98  		// a much higher RPS following that.
  99  		if newState.sleepFor < t.maxSlack {
 100  			newState.sleepFor = t.maxSlack
 101  		}
 102  		if newState.sleepFor > 0 {
 103  			newState.last = newState.last.Add(newState.sleepFor)
 104  			interval, newState.sleepFor = newState.sleepFor, 0
 105  		}
 106  		taken = atomic.CompareAndSwapPointer(&t.state, previousStatePointer, unsafe.Pointer(&newState))
 107  	}
 108  	t.clock.Sleep(interval)
 109  	return newState.last
 110  }
 111