buffer_pool.go raw

   1  /*
   2   *
   3   * Copyright 2024 gRPC authors.
   4   *
   5   * Licensed under the Apache License, Version 2.0 (the "License");
   6   * you may not use this file except in compliance with the License.
   7   * You may obtain a copy of the License at
   8   *
   9   *     http://www.apache.org/licenses/LICENSE-2.0
  10   *
  11   * Unless required by applicable law or agreed to in writing, software
  12   * distributed under the License is distributed on an "AS IS" BASIS,
  13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14   * See the License for the specific language governing permissions and
  15   * limitations under the License.
  16   *
  17   */
  18  
  19  package mem
  20  
  21  import (
  22  	"sort"
  23  	"sync"
  24  
  25  	"google.golang.org/grpc/internal"
  26  )
  27  
  28  // BufferPool is a pool of buffers that can be shared and reused, resulting in
  29  // decreased memory allocation.
  30  type BufferPool interface {
  31  	// Get returns a buffer with specified length from the pool.
  32  	Get(length int) *[]byte
  33  
  34  	// Put returns a buffer to the pool.
  35  	//
  36  	// The provided pointer must hold a prefix of the buffer obtained via
  37  	// BufferPool.Get to ensure the buffer's entire capacity can be re-used.
  38  	Put(*[]byte)
  39  }
  40  
  41  const goPageSize = 4 << 10 // 4KiB. N.B. this must be a power of 2.
  42  
  43  var defaultBufferPoolSizes = []int{
  44  	256,
  45  	goPageSize,
  46  	16 << 10, // 16KB (max HTTP/2 frame size used by gRPC)
  47  	32 << 10, // 32KB (default buffer size for io.Copy)
  48  	1 << 20,  // 1MB
  49  }
  50  
  51  var defaultBufferPool BufferPool
  52  
  53  func init() {
  54  	defaultBufferPool = NewTieredBufferPool(defaultBufferPoolSizes...)
  55  
  56  	internal.SetDefaultBufferPoolForTesting = func(pool BufferPool) {
  57  		defaultBufferPool = pool
  58  	}
  59  
  60  	internal.SetBufferPoolingThresholdForTesting = func(threshold int) {
  61  		bufferPoolingThreshold = threshold
  62  	}
  63  }
  64  
  65  // DefaultBufferPool returns the current default buffer pool. It is a BufferPool
  66  // created with NewBufferPool that uses a set of default sizes optimized for
  67  // expected workflows.
  68  func DefaultBufferPool() BufferPool {
  69  	return defaultBufferPool
  70  }
  71  
  72  // NewTieredBufferPool returns a BufferPool implementation that uses multiple
  73  // underlying pools of the given pool sizes.
  74  func NewTieredBufferPool(poolSizes ...int) BufferPool {
  75  	sort.Ints(poolSizes)
  76  	pools := make([]*sizedBufferPool, len(poolSizes))
  77  	for i, s := range poolSizes {
  78  		pools[i] = newSizedBufferPool(s)
  79  	}
  80  	return &tieredBufferPool{
  81  		sizedPools: pools,
  82  	}
  83  }
  84  
  85  // tieredBufferPool implements the BufferPool interface with multiple tiers of
  86  // buffer pools for different sizes of buffers.
  87  type tieredBufferPool struct {
  88  	sizedPools   []*sizedBufferPool
  89  	fallbackPool simpleBufferPool
  90  }
  91  
  92  func (p *tieredBufferPool) Get(size int) *[]byte {
  93  	return p.getPool(size).Get(size)
  94  }
  95  
  96  func (p *tieredBufferPool) Put(buf *[]byte) {
  97  	p.getPool(cap(*buf)).Put(buf)
  98  }
  99  
 100  func (p *tieredBufferPool) getPool(size int) BufferPool {
 101  	poolIdx := sort.Search(len(p.sizedPools), func(i int) bool {
 102  		return p.sizedPools[i].defaultSize >= size
 103  	})
 104  
 105  	if poolIdx == len(p.sizedPools) {
 106  		return &p.fallbackPool
 107  	}
 108  
 109  	return p.sizedPools[poolIdx]
 110  }
 111  
 112  // sizedBufferPool is a BufferPool implementation that is optimized for specific
 113  // buffer sizes. For example, HTTP/2 frames within gRPC have a default max size
 114  // of 16kb and a sizedBufferPool can be configured to only return buffers with a
 115  // capacity of 16kb. Note that however it does not support returning larger
 116  // buffers and in fact panics if such a buffer is requested. Because of this,
 117  // this BufferPool implementation is not meant to be used on its own and rather
 118  // is intended to be embedded in a tieredBufferPool such that Get is only
 119  // invoked when the required size is smaller than or equal to defaultSize.
 120  type sizedBufferPool struct {
 121  	pool        sync.Pool
 122  	defaultSize int
 123  }
 124  
 125  func (p *sizedBufferPool) Get(size int) *[]byte {
 126  	buf, ok := p.pool.Get().(*[]byte)
 127  	if !ok {
 128  		buf := make([]byte, size, p.defaultSize)
 129  		return &buf
 130  	}
 131  	b := *buf
 132  	clear(b[:cap(b)])
 133  	*buf = b[:size]
 134  	return buf
 135  }
 136  
 137  func (p *sizedBufferPool) Put(buf *[]byte) {
 138  	if cap(*buf) < p.defaultSize {
 139  		// Ignore buffers that are too small to fit in the pool. Otherwise, when
 140  		// Get is called it will panic as it tries to index outside the bounds
 141  		// of the buffer.
 142  		return
 143  	}
 144  	p.pool.Put(buf)
 145  }
 146  
 147  func newSizedBufferPool(size int) *sizedBufferPool {
 148  	return &sizedBufferPool{
 149  		defaultSize: size,
 150  	}
 151  }
 152  
 153  var _ BufferPool = (*simpleBufferPool)(nil)
 154  
 155  // simpleBufferPool is an implementation of the BufferPool interface that
 156  // attempts to pool buffers with a sync.Pool. When Get is invoked, it tries to
 157  // acquire a buffer from the pool but if that buffer is too small, it returns it
 158  // to the pool and creates a new one.
 159  type simpleBufferPool struct {
 160  	pool sync.Pool
 161  }
 162  
 163  func (p *simpleBufferPool) Get(size int) *[]byte {
 164  	bs, ok := p.pool.Get().(*[]byte)
 165  	if ok && cap(*bs) >= size {
 166  		clear((*bs)[:cap(*bs)])
 167  		*bs = (*bs)[:size]
 168  		return bs
 169  	}
 170  
 171  	// A buffer was pulled from the pool, but it is too small. Put it back in
 172  	// the pool and create one large enough.
 173  	if ok {
 174  		p.pool.Put(bs)
 175  	}
 176  
 177  	// If we're going to allocate, round up to the nearest page. This way if
 178  	// requests frequently arrive with small variation we don't allocate
 179  	// repeatedly if we get unlucky and they increase over time. By default we
 180  	// only allocate here if size > 1MiB. Because goPageSize is a power of 2, we
 181  	// can round up efficiently.
 182  	allocSize := (size + goPageSize - 1) & ^(goPageSize - 1)
 183  
 184  	b := make([]byte, size, allocSize)
 185  	return &b
 186  }
 187  
 188  func (p *simpleBufferPool) Put(buf *[]byte) {
 189  	p.pool.Put(buf)
 190  }
 191  
 192  var _ BufferPool = NopBufferPool{}
 193  
 194  // NopBufferPool is a buffer pool that returns new buffers without pooling.
 195  type NopBufferPool struct{}
 196  
 197  // Get returns a buffer with specified length from the pool.
 198  func (NopBufferPool) Get(length int) *[]byte {
 199  	b := make([]byte, length)
 200  	return &b
 201  }
 202  
 203  // Put returns a buffer to the pool.
 204  func (NopBufferPool) Put(*[]byte) {
 205  }
 206