limiter_mutexbased.go raw

   1  // Copyright (c) 2016,2020 Uber Technologies, Inc.
   2  //
   3  // Permission is hereby granted, free of charge, to any person obtaining a copy
   4  // of this software and associated documentation files (the "Software"), to deal
   5  // in the Software without restriction, including without limitation the rights
   6  // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
   7  // copies of the Software, and to permit persons to whom the Software is
   8  // furnished to do so, subject to the following conditions:
   9  //
  10  // The above copyright notice and this permission notice shall be included in
  11  // all copies or substantial portions of the Software.
  12  //
  13  // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
  14  // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
  15  // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
  16  // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
  17  // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
  18  // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
  19  // THE SOFTWARE.
  20  
  21  package ratelimit // import "go.uber.org/ratelimit"
  22  
  23  import (
  24  	"sync"
  25  	"time"
  26  )
  27  
// mutexLimiter is a rate limiter whose state is guarded by an embedded
// mutex. Take locks the mutex for its whole duration, including any sleep.
type mutexLimiter struct {
	sync.Mutex
	// last is the time recorded by the most recent Take; it stays at the
	// zero value until the first Take has run.
	last time.Time
	// sleepFor is the running balance between the per-request budget and
	// the time actually elapsed between Take calls. It may be negative and
	// is never allowed to drop below maxSlack.
	sleepFor time.Duration
	// perRequest is the time budget of a single request (config.per / rate).
	perRequest time.Duration
	// maxSlack is the lower bound applied to sleepFor. It is computed as
	// -slack * perRequest, so it is presumably non-positive (assumes the
	// configured slack is non-negative — confirm against buildConfig).
	maxSlack time.Duration
	// clock supplies Now and Sleep, letting the time source be substituted.
	clock Clock
}
  36  
  37  // newMutexBased returns a new mutex based limiter.
  38  func newMutexBased(rate int, opts ...Option) *mutexLimiter {
  39  	// TODO consider moving config building to the implementation
  40  	// independent code.
  41  	config := buildConfig(opts)
  42  	perRequest := config.per / time.Duration(rate)
  43  	l := &mutexLimiter{
  44  		perRequest: perRequest,
  45  		maxSlack:   -1 * time.Duration(config.slack) * perRequest,
  46  		clock:      config.clock,
  47  	}
  48  	return l
  49  }
  50  
  51  // Take blocks to ensure that the time spent between multiple
  52  // Take calls is on average per/rate.
  53  func (t *mutexLimiter) Take() time.Time {
  54  	t.Lock()
  55  	defer t.Unlock()
  56  
  57  	now := t.clock.Now()
  58  
  59  	// If this is our first request, then we allow it.
  60  	if t.last.IsZero() {
  61  		t.last = now
  62  		return t.last
  63  	}
  64  
  65  	// sleepFor calculates how much time we should sleep based on
  66  	// the perRequest budget and how long the last request took.
  67  	// Since the request may take longer than the budget, this number
  68  	// can get negative, and is summed across requests.
  69  	t.sleepFor += t.perRequest - now.Sub(t.last)
  70  
  71  	// We shouldn't allow sleepFor to get too negative, since it would mean that
  72  	// a service that slowed down a lot for a short period of time would get
  73  	// a much higher RPS following that.
  74  	if t.sleepFor < t.maxSlack {
  75  		t.sleepFor = t.maxSlack
  76  	}
  77  
  78  	// If sleepFor is positive, then we should sleep now.
  79  	if t.sleepFor > 0 {
  80  		t.clock.Sleep(t.sleepFor)
  81  		t.last = now.Add(t.sleepFor)
  82  		t.sleepFor = 0
  83  	} else {
  84  		t.last = now
  85  	}
  86  
  87  	return t.last
  88  }
  89