buffers.go raw

   1  /*
   2   *
   3   * Copyright 2024 gRPC authors.
   4   *
   5   * Licensed under the Apache License, Version 2.0 (the "License");
   6   * you may not use this file except in compliance with the License.
   7   * You may obtain a copy of the License at
   8   *
   9   *     http://www.apache.org/licenses/LICENSE-2.0
  10   *
  11   * Unless required by applicable law or agreed to in writing, software
  12   * distributed under the License is distributed on an "AS IS" BASIS,
  13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14   * See the License for the specific language governing permissions and
  15   * limitations under the License.
  16   *
  17   */
  18  
  19  // Package mem provides utilities that facilitate memory reuse in byte slices
  20  // that are used as buffers.
  21  //
  22  // # Experimental
  23  //
  24  // Notice: All APIs in this package are EXPERIMENTAL and may be changed or
  25  // removed in a later release.
  26  package mem
  27  
  28  import (
  29  	"fmt"
  30  	"sync"
  31  	"sync/atomic"
  32  )
  33  
// A Buffer represents a reference counted piece of data (in bytes) that can be
// acquired by a call to NewBuffer() or Copy(). A reference to a Buffer may be
// released by calling Free(); for pooled Buffers the underlying data is handed
// back to the pool given at creation only after all references are released.
//
// Note that a Buffer is not safe for concurrent access and instead each
// goroutine should use its own reference to the data, which can be acquired via
// a call to Ref().
//
// The underlying data must not be accessed after the reference to the Buffer
// has been released. The pooled implementation panics when it is used after
// its last reference has been released.
type Buffer interface {
	// ReadOnlyData returns the underlying byte slice. Note that it is undefined
	// behavior to modify the contents of this slice in any way.
	ReadOnlyData() []byte
	// Ref increases the reference counter for this Buffer.
	Ref()
	// Free decrements this Buffer's reference counter and frees the underlying
	// byte slice if the counter reaches 0 as a result of this call.
	Free()
	// Len returns the Buffer's size.
	Len() int

	// split divides the Buffer at index n, returning a Buffer for the first n
	// bytes (left) and a Buffer for the remaining bytes (right).
	split(n int) (left, right Buffer)
	// read copies data from the Buffer into buf and returns the number of
	// bytes copied along with a Buffer for the data that has not been read
	// yet. The buffer and SliceBuffer implementations return a nil Buffer once
	// everything has been copied.
	read(buf []byte) (int, Buffer)
}
  60  
var (
	// bufferPoolingThreshold is the size in bytes (1 KiB) at or below which
	// IsBelowBufferPoolingThreshold reports true, i.e. the size up to which
	// data is kept in a plain SliceBuffer rather than a pooled buffer.
	bufferPoolingThreshold = 1 << 10

	// bufferObjectPool and refObjectPool recycle the buffer structs and the
	// reference counters used by pooled Buffers, so that creating and freeing
	// a Buffer does not need a fresh allocation for either of them.
	bufferObjectPool = sync.Pool{New: func() any { return new(buffer) }}
	refObjectPool    = sync.Pool{New: func() any { return new(atomic.Int32) }}
)
  67  
  68  // IsBelowBufferPoolingThreshold returns true if the given size is less than or
  69  // equal to the threshold for buffer pooling. This is used to determine whether
  70  // to pool buffers or allocate them directly.
  71  func IsBelowBufferPoolingThreshold(size int) bool {
  72  	return size <= bufferPoolingThreshold
  73  }
  74  
// buffer is the pooled, reference counted Buffer implementation returned by
// NewBuffer.
type buffer struct {
	// origData is the slice pointer given to NewBuffer. It is what gets handed
	// back to pool when the last reference is freed.
	origData *[]byte
	// data is the view of the bytes exposed by this buffer. split and read
	// narrow it without touching origData.
	data []byte
	// refs is the reference counter, shared with every buffer produced from
	// this one by split. It is nil once this buffer has released the last
	// reference.
	refs *atomic.Int32
	// pool is where origData is returned on the final Free.
	pool BufferPool
}
  81  
  82  func newBuffer() *buffer {
  83  	return bufferObjectPool.Get().(*buffer)
  84  }
  85  
  86  // NewBuffer creates a new Buffer from the given data, initializing the reference
  87  // counter to 1. The data will then be returned to the given pool when all
  88  // references to the returned Buffer are released. As a special case to avoid
  89  // additional allocations, if the given buffer pool is nil, the returned buffer
  90  // will be a "no-op" Buffer where invoking Buffer.Free() does nothing and the
  91  // underlying data is never freed.
  92  //
  93  // Note that the backing array of the given data is not copied.
  94  func NewBuffer(data *[]byte, pool BufferPool) Buffer {
  95  	// Use the buffer's capacity instead of the length, otherwise buffers may
  96  	// not be reused under certain conditions. For example, if a large buffer
  97  	// is acquired from the pool, but fewer bytes than the buffering threshold
  98  	// are written to it, the buffer will not be returned to the pool.
  99  	if pool == nil || IsBelowBufferPoolingThreshold(cap(*data)) {
 100  		return (SliceBuffer)(*data)
 101  	}
 102  	b := newBuffer()
 103  	b.origData = data
 104  	b.data = *data
 105  	b.pool = pool
 106  	b.refs = refObjectPool.Get().(*atomic.Int32)
 107  	b.refs.Add(1)
 108  	return b
 109  }
 110  
 111  // Copy creates a new Buffer from the given data, initializing the reference
 112  // counter to 1.
 113  //
 114  // It acquires a []byte from the given pool and copies over the backing array
 115  // of the given data. The []byte acquired from the pool is returned to the
 116  // pool when all references to the returned Buffer are released.
 117  func Copy(data []byte, pool BufferPool) Buffer {
 118  	if IsBelowBufferPoolingThreshold(len(data)) {
 119  		buf := make(SliceBuffer, len(data))
 120  		copy(buf, data)
 121  		return buf
 122  	}
 123  
 124  	buf := pool.Get(len(data))
 125  	copy(*buf, data)
 126  	return NewBuffer(buf, pool)
 127  }
 128  
 129  func (b *buffer) ReadOnlyData() []byte {
 130  	if b.refs == nil {
 131  		panic("Cannot read freed buffer")
 132  	}
 133  	return b.data
 134  }
 135  
 136  func (b *buffer) Ref() {
 137  	if b.refs == nil {
 138  		panic("Cannot ref freed buffer")
 139  	}
 140  	b.refs.Add(1)
 141  }
 142  
 143  func (b *buffer) Free() {
 144  	if b.refs == nil {
 145  		panic("Cannot free freed buffer")
 146  	}
 147  
 148  	refs := b.refs.Add(-1)
 149  	switch {
 150  	case refs > 0:
 151  		return
 152  	case refs == 0:
 153  		if b.pool != nil {
 154  			b.pool.Put(b.origData)
 155  		}
 156  
 157  		refObjectPool.Put(b.refs)
 158  		b.origData = nil
 159  		b.data = nil
 160  		b.refs = nil
 161  		b.pool = nil
 162  		bufferObjectPool.Put(b)
 163  	default:
 164  		panic("Cannot free freed buffer")
 165  	}
 166  }
 167  
 168  func (b *buffer) Len() int {
 169  	return len(b.ReadOnlyData())
 170  }
 171  
 172  func (b *buffer) split(n int) (Buffer, Buffer) {
 173  	if b.refs == nil {
 174  		panic("Cannot split freed buffer")
 175  	}
 176  
 177  	b.refs.Add(1)
 178  	split := newBuffer()
 179  	split.origData = b.origData
 180  	split.data = b.data[n:]
 181  	split.refs = b.refs
 182  	split.pool = b.pool
 183  
 184  	b.data = b.data[:n]
 185  
 186  	return b, split
 187  }
 188  
 189  func (b *buffer) read(buf []byte) (int, Buffer) {
 190  	if b.refs == nil {
 191  		panic("Cannot read freed buffer")
 192  	}
 193  
 194  	n := copy(buf, b.data)
 195  	if n == len(b.data) {
 196  		b.Free()
 197  		return n, nil
 198  	}
 199  
 200  	b.data = b.data[n:]
 201  	return n, b
 202  }
 203  
 204  func (b *buffer) String() string {
 205  	return fmt.Sprintf("mem.Buffer(%p, data: %p, length: %d)", b, b.ReadOnlyData(), len(b.ReadOnlyData()))
 206  }
 207  
 208  // ReadUnsafe reads bytes from the given Buffer into the provided slice.
 209  // It does not perform safety checks.
 210  func ReadUnsafe(dst []byte, buf Buffer) (int, Buffer) {
 211  	return buf.read(dst)
 212  }
 213  
 214  // SplitUnsafe modifies the receiver to point to the first n bytes while it
 215  // returns a new reference to the remaining bytes. The returned Buffer
 216  // functions just like a normal reference acquired using Ref().
 217  func SplitUnsafe(buf Buffer, n int) (left, right Buffer) {
 218  	return buf.split(n)
 219  }
 220  
 221  type emptyBuffer struct{}
 222  
 223  func (e emptyBuffer) ReadOnlyData() []byte {
 224  	return nil
 225  }
 226  
 227  func (e emptyBuffer) Ref()  {}
 228  func (e emptyBuffer) Free() {}
 229  
 230  func (e emptyBuffer) Len() int {
 231  	return 0
 232  }
 233  
 234  func (e emptyBuffer) split(int) (left, right Buffer) {
 235  	return e, e
 236  }
 237  
 238  func (e emptyBuffer) read([]byte) (int, Buffer) {
 239  	return 0, e
 240  }
 241  
 242  // SliceBuffer is a Buffer implementation that wraps a byte slice. It provides
 243  // methods for reading, splitting, and managing the byte slice.
 244  type SliceBuffer []byte
 245  
 246  // ReadOnlyData returns the byte slice.
 247  func (s SliceBuffer) ReadOnlyData() []byte { return s }
 248  
 249  // Ref is a noop implementation of Ref.
 250  func (s SliceBuffer) Ref() {}
 251  
 252  // Free is a noop implementation of Free.
 253  func (s SliceBuffer) Free() {}
 254  
 255  // Len is a noop implementation of Len.
 256  func (s SliceBuffer) Len() int { return len(s) }
 257  
 258  func (s SliceBuffer) split(n int) (left, right Buffer) {
 259  	return s[:n], s[n:]
 260  }
 261  
 262  func (s SliceBuffer) read(buf []byte) (int, Buffer) {
 263  	n := copy(buf, s)
 264  	if n == len(s) {
 265  		return n, nil
 266  	}
 267  	return n, s[n:]
 268  }
 269