bdp_estimator.go raw

   1  /*
   2   *
   3   * Copyright 2017 gRPC authors.
   4   *
   5   * Licensed under the Apache License, Version 2.0 (the "License");
   6   * you may not use this file except in compliance with the License.
   7   * You may obtain a copy of the License at
   8   *
   9   *     http://www.apache.org/licenses/LICENSE-2.0
  10   *
  11   * Unless required by applicable law or agreed to in writing, software
  12   * distributed under the License is distributed on an "AS IS" BASIS,
  13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14   * See the License for the specific language governing permissions and
  15   * limitations under the License.
  16   *
  17   */
  18  
  19  package transport
  20  
  21  import (
  22  	"sync"
  23  	"time"
  24  )
  25  
  26  const (
  27  	// bdpLimit is the maximum value the flow control windows will be increased
  28  	// to.  TCP typically limits this to 4MB, but some systems go up to 16MB.
  29  	// Since this is only a limit, it is safe to make it optimistic.
  30  	bdpLimit = (1 << 20) * 16
  31  	// alpha is a constant factor used to keep a moving average
  32  	// of RTTs.
  33  	alpha = 0.9
  34  	// If the current bdp sample is greater than or equal to
  35  	// our beta * our estimated bdp and the current bandwidth
  36  	// sample is the maximum bandwidth observed so far, we
  37  	// increase our bbp estimate by a factor of gamma.
  38  	beta = 0.66
  39  	// To put our bdp to be smaller than or equal to twice the real BDP,
  40  	// we should multiply our current sample with 4/3, however to round things out
  41  	// we use 2 as the multiplication factor.
  42  	gamma = 2
  43  )
  44  
  45  // Adding arbitrary data to ping so that its ack can be identified.
  46  // Easter-egg: what does the ping message say?
  47  var bdpPing = &ping{data: [8]byte{2, 4, 16, 16, 9, 14, 7, 7}}
  48  
  49  type bdpEstimator struct {
  50  	// sentAt is the time when the ping was sent.
  51  	sentAt time.Time
  52  
  53  	mu sync.Mutex
  54  	// bdp is the current bdp estimate.
  55  	bdp uint32
  56  	// sample is the number of bytes received in one measurement cycle.
  57  	sample uint32
  58  	// bwMax is the maximum bandwidth noted so far (bytes/sec).
  59  	bwMax float64
  60  	// bool to keep track of the beginning of a new measurement cycle.
  61  	isSent bool
  62  	// Callback to update the window sizes.
  63  	updateFlowControl func(n uint32)
  64  	// sampleCount is the number of samples taken so far.
  65  	sampleCount uint64
  66  	// round trip time (seconds)
  67  	rtt float64
  68  }
  69  
  70  // timesnap registers the time bdp ping was sent out so that
  71  // network rtt can be calculated when its ack is received.
  72  // It is called (by controller) when the bdpPing is
  73  // being written on the wire.
  74  func (b *bdpEstimator) timesnap(d [8]byte) {
  75  	if bdpPing.data != d {
  76  		return
  77  	}
  78  	b.sentAt = time.Now()
  79  }
  80  
  81  // add adds bytes to the current sample for calculating bdp.
  82  // It returns true only if a ping must be sent. This can be used
  83  // by the caller (handleData) to make decision about batching
  84  // a window update with it.
  85  func (b *bdpEstimator) add(n uint32) bool {
  86  	b.mu.Lock()
  87  	defer b.mu.Unlock()
  88  	if b.bdp == bdpLimit {
  89  		return false
  90  	}
  91  	if !b.isSent {
  92  		b.isSent = true
  93  		b.sample = n
  94  		b.sentAt = time.Time{}
  95  		b.sampleCount++
  96  		return true
  97  	}
  98  	b.sample += n
  99  	return false
 100  }
 101  
 102  // calculate is called when an ack for a bdp ping is received.
 103  // Here we calculate the current bdp and bandwidth sample and
 104  // decide if the flow control windows should go up.
 105  func (b *bdpEstimator) calculate(d [8]byte) {
 106  	// Check if the ping acked for was the bdp ping.
 107  	if bdpPing.data != d {
 108  		return
 109  	}
 110  	b.mu.Lock()
 111  	rttSample := time.Since(b.sentAt).Seconds()
 112  	if b.sampleCount < 10 {
 113  		// Bootstrap rtt with an average of first 10 rtt samples.
 114  		b.rtt += (rttSample - b.rtt) / float64(b.sampleCount)
 115  	} else {
 116  		// Heed to the recent past more.
 117  		b.rtt += (rttSample - b.rtt) * float64(alpha)
 118  	}
 119  	b.isSent = false
 120  	// The number of bytes accumulated so far in the sample is smaller
 121  	// than or equal to 1.5 times the real BDP on a saturated connection.
 122  	bwCurrent := float64(b.sample) / (b.rtt * float64(1.5))
 123  	if bwCurrent > b.bwMax {
 124  		b.bwMax = bwCurrent
 125  	}
 126  	// If the current sample (which is smaller than or equal to the 1.5 times the real BDP) is
 127  	// greater than or equal to 2/3rd our perceived bdp AND this is the maximum bandwidth seen so far, we
 128  	// should update our perception of the network BDP.
 129  	if float64(b.sample) >= beta*float64(b.bdp) && bwCurrent == b.bwMax && b.bdp != bdpLimit {
 130  		sampleFloat := float64(b.sample)
 131  		b.bdp = uint32(gamma * sampleFloat)
 132  		if b.bdp > bdpLimit {
 133  			b.bdp = bdpLimit
 134  		}
 135  		bdp := b.bdp
 136  		b.mu.Unlock()
 137  		b.updateFlowControl(bdp)
 138  		return
 139  	}
 140  	b.mu.Unlock()
 141  }
 142