buffer_slice.go raw

   1  /*
   2   *
   3   * Copyright 2024 gRPC authors.
   4   *
   5   * Licensed under the Apache License, Version 2.0 (the "License");
   6   * you may not use this file except in compliance with the License.
   7   * You may obtain a copy of the License at
   8   *
   9   *     http://www.apache.org/licenses/LICENSE-2.0
  10   *
  11   * Unless required by applicable law or agreed to in writing, software
  12   * distributed under the License is distributed on an "AS IS" BASIS,
  13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14   * See the License for the specific language governing permissions and
  15   * limitations under the License.
  16   *
  17   */
  18  
  19  package mem
  20  
  21  import (
  22  	"fmt"
  23  	"io"
  24  )
  25  
  26  const (
  27  	// 32 KiB is what io.Copy uses.
  28  	readAllBufSize = 32 * 1024
  29  )
  30  
// BufferSlice offers a means to represent data that spans one or more Buffer
// instances. A BufferSlice is meant to be immutable after creation: methods
// such as Ref and Free act on each of the underlying Buffers but never modify
// the slice itself. This is why all methods have value receivers rather than
// pointer receivers.
//
// Note that any of the methods that read the underlying buffers, such as Ref,
// Len or CopyTo, will panic if any underlying buffers have already been
// freed. It is recommended not to interact with the underlying buffers
// directly; such interactions should instead be mediated through the various
// methods on this type.
//
// By convention, any APIs that return (mem.BufferSlice, error) should reduce
// the burden on the caller by never returning a mem.BufferSlice that needs to
// be freed if the error is non-nil, unless explicitly stated.
type BufferSlice []Buffer
  46  
  47  // Len returns the sum of the length of all the Buffers in this slice.
  48  //
  49  // # Warning
  50  //
  51  // Invoking the built-in len on a BufferSlice will return the number of buffers
  52  // in the slice, and *not* the value returned by this function.
  53  func (s BufferSlice) Len() int {
  54  	var length int
  55  	for _, b := range s {
  56  		length += b.Len()
  57  	}
  58  	return length
  59  }
  60  
  61  // Ref invokes Ref on each buffer in the slice.
  62  func (s BufferSlice) Ref() {
  63  	for _, b := range s {
  64  		b.Ref()
  65  	}
  66  }
  67  
  68  // Free invokes Buffer.Free() on each Buffer in the slice.
  69  func (s BufferSlice) Free() {
  70  	for _, b := range s {
  71  		b.Free()
  72  	}
  73  }
  74  
  75  // CopyTo copies each of the underlying Buffer's data into the given buffer,
  76  // returning the number of bytes copied. Has the same semantics as the copy
  77  // builtin in that it will copy as many bytes as it can, stopping when either dst
  78  // is full or s runs out of data, returning the minimum of s.Len() and len(dst).
  79  func (s BufferSlice) CopyTo(dst []byte) int {
  80  	off := 0
  81  	for _, b := range s {
  82  		off += copy(dst[off:], b.ReadOnlyData())
  83  	}
  84  	return off
  85  }
  86  
  87  // Materialize concatenates all the underlying Buffer's data into a single
  88  // contiguous buffer using CopyTo.
  89  func (s BufferSlice) Materialize() []byte {
  90  	l := s.Len()
  91  	if l == 0 {
  92  		return nil
  93  	}
  94  	out := make([]byte, l)
  95  	s.CopyTo(out)
  96  	return out
  97  }
  98  
  99  // MaterializeToBuffer functions like Materialize except that it writes the data
 100  // to a single Buffer pulled from the given BufferPool.
 101  //
 102  // As a special case, if the input BufferSlice only actually has one Buffer, this
 103  // function simply increases the refcount before returning said Buffer. Freeing this
 104  // buffer won't release it until the BufferSlice is itself released.
 105  func (s BufferSlice) MaterializeToBuffer(pool BufferPool) Buffer {
 106  	if len(s) == 1 {
 107  		s[0].Ref()
 108  		return s[0]
 109  	}
 110  	sLen := s.Len()
 111  	if sLen == 0 {
 112  		return emptyBuffer{}
 113  	}
 114  	buf := pool.Get(sLen)
 115  	s.CopyTo(*buf)
 116  	return NewBuffer(buf, pool)
 117  }
 118  
 119  // Reader returns a new Reader for the input slice after taking references to
 120  // each underlying buffer.
 121  func (s BufferSlice) Reader() *Reader {
 122  	s.Ref()
 123  	return &Reader{
 124  		data: s,
 125  		len:  s.Len(),
 126  	}
 127  }
 128  
// Reader exposes a BufferSlice's data as an io.Reader, allowing it to interface
// with other systems.
//
// Buffers will be freed as they are read.
//
// A Reader can be constructed from a BufferSlice; alternatively the zero value
// of a Reader may be used after calling Reset on it.
type Reader struct {
	// data holds the buffers that have not been fully consumed yet. The Reader
	// owns one reference on each of them; buffers are freed as they are
	// consumed, and whatever remains is released by Close or Reset.
	data BufferSlice
	// len is the number of unread bytes remaining across all of data.
	len int
	// The index into data[0].ReadOnlyData(), i.e. the read offset within the
	// first (partially consumed) buffer.
	bufferIdx int
}
 142  
// Remaining returns the number of unread bytes remaining in the slice, i.e.
// bytes not yet consumed by Read, ReadByte or Discard. Peek does not change
// this value.
func (r *Reader) Remaining() int {
	return r.len
}
 147  
 148  // Reset frees the currently held buffer slice and starts reading from the
 149  // provided slice. This allows reusing the reader object.
 150  func (r *Reader) Reset(s BufferSlice) {
 151  	r.data.Free()
 152  	s.Ref()
 153  	r.data = s
 154  	r.len = s.Len()
 155  	r.bufferIdx = 0
 156  }
 157  
 158  // Close frees the underlying BufferSlice and never returns an error. Subsequent
 159  // calls to Read will return (0, io.EOF).
 160  func (r *Reader) Close() error {
 161  	r.data.Free()
 162  	r.data = nil
 163  	r.len = 0
 164  	return nil
 165  }
 166  
 167  func (r *Reader) freeFirstBufferIfEmpty() bool {
 168  	if len(r.data) == 0 || r.bufferIdx != len(r.data[0].ReadOnlyData()) {
 169  		return false
 170  	}
 171  
 172  	r.data[0].Free()
 173  	r.data = r.data[1:]
 174  	r.bufferIdx = 0
 175  	return true
 176  }
 177  
 178  func (r *Reader) Read(buf []byte) (n int, _ error) {
 179  	if r.len == 0 {
 180  		return 0, io.EOF
 181  	}
 182  
 183  	for len(buf) != 0 && r.len != 0 {
 184  		// Copy as much as possible from the first Buffer in the slice into the
 185  		// given byte slice.
 186  		data := r.data[0].ReadOnlyData()
 187  		copied := copy(buf, data[r.bufferIdx:])
 188  		r.len -= copied       // Reduce len by the number of bytes copied.
 189  		r.bufferIdx += copied // Increment the buffer index.
 190  		n += copied           // Increment the total number of bytes read.
 191  		buf = buf[copied:]    // Shrink the given byte slice.
 192  
 193  		// If we have copied all the data from the first Buffer, free it and advance to
 194  		// the next in the slice.
 195  		r.freeFirstBufferIfEmpty()
 196  	}
 197  
 198  	return n, nil
 199  }
 200  
 201  // ReadByte reads a single byte.
 202  func (r *Reader) ReadByte() (byte, error) {
 203  	if r.len == 0 {
 204  		return 0, io.EOF
 205  	}
 206  
 207  	// There may be any number of empty buffers in the slice, clear them all until a
 208  	// non-empty buffer is reached. This is guaranteed to exit since r.len is not 0.
 209  	for r.freeFirstBufferIfEmpty() {
 210  	}
 211  
 212  	b := r.data[0].ReadOnlyData()[r.bufferIdx]
 213  	r.len--
 214  	r.bufferIdx++
 215  	// Free the first buffer in the slice if the last byte was read
 216  	r.freeFirstBufferIfEmpty()
 217  	return b, nil
 218  }
 219  
 220  var _ io.Writer = (*writer)(nil)
 221  
 222  type writer struct {
 223  	buffers *BufferSlice
 224  	pool    BufferPool
 225  }
 226  
 227  func (w *writer) Write(p []byte) (n int, err error) {
 228  	b := Copy(p, w.pool)
 229  	*w.buffers = append(*w.buffers, b)
 230  	return b.Len(), nil
 231  }
 232  
 233  // NewWriter wraps the given BufferSlice and BufferPool to implement the
 234  // io.Writer interface. Every call to Write copies the contents of the given
 235  // buffer into a new Buffer pulled from the given pool and the Buffer is
 236  // added to the given BufferSlice.
 237  func NewWriter(buffers *BufferSlice, pool BufferPool) io.Writer {
 238  	return &writer{buffers: buffers, pool: pool}
 239  }
 240  
 241  // ReadAll reads from r until an error or EOF and returns the data it read.
 242  // A successful call returns err == nil, not err == EOF. Because ReadAll is
 243  // defined to read from src until EOF, it does not treat an EOF from Read
 244  // as an error to be reported.
 245  //
 246  // Important: A failed call returns a non-nil error and may also return
 247  // partially read buffers. It is the responsibility of the caller to free the
 248  // BufferSlice returned, or its memory will not be reused.
 249  func ReadAll(r io.Reader, pool BufferPool) (BufferSlice, error) {
 250  	var result BufferSlice
 251  	if wt, ok := r.(io.WriterTo); ok {
 252  		// This is more optimal since wt knows the size of chunks it wants to
 253  		// write and, hence, we can allocate buffers of an optimal size to fit
 254  		// them. E.g. might be a single big chunk, and we wouldn't chop it
 255  		// into pieces.
 256  		w := NewWriter(&result, pool)
 257  		_, err := wt.WriteTo(w)
 258  		return result, err
 259  	}
 260  nextBuffer:
 261  	for {
 262  		buf := pool.Get(readAllBufSize)
 263  		// We asked for 32KiB but may have been given a bigger buffer.
 264  		// Use all of it if that's the case.
 265  		*buf = (*buf)[:cap(*buf)]
 266  		usedCap := 0
 267  		for {
 268  			n, err := r.Read((*buf)[usedCap:])
 269  			usedCap += n
 270  			if err != nil {
 271  				if usedCap == 0 {
 272  					// Nothing in this buf, put it back
 273  					pool.Put(buf)
 274  				} else {
 275  					*buf = (*buf)[:usedCap]
 276  					result = append(result, NewBuffer(buf, pool))
 277  				}
 278  				if err == io.EOF {
 279  					err = nil
 280  				}
 281  				return result, err
 282  			}
 283  			if len(*buf) == usedCap {
 284  				result = append(result, NewBuffer(buf, pool))
 285  				continue nextBuffer
 286  			}
 287  		}
 288  	}
 289  }
 290  
 291  // Discard skips the next n bytes, returning the number of bytes discarded.
 292  //
 293  // It frees buffers as they are fully consumed.
 294  //
 295  // If Discard skips fewer than n bytes, it also returns an error.
 296  func (r *Reader) Discard(n int) (discarded int, err error) {
 297  	total := n
 298  	for n > 0 && r.len > 0 {
 299  		curData := r.data[0].ReadOnlyData()
 300  		curSize := min(n, len(curData)-r.bufferIdx)
 301  		n -= curSize
 302  		r.len -= curSize
 303  		r.bufferIdx += curSize
 304  		if r.bufferIdx >= len(curData) {
 305  			r.data[0].Free()
 306  			r.data = r.data[1:]
 307  			r.bufferIdx = 0
 308  		}
 309  	}
 310  	discarded = total - n
 311  	if n > 0 {
 312  		return discarded, fmt.Errorf("insufficient bytes in reader")
 313  	}
 314  	return discarded, nil
 315  }
 316  
 317  // Peek returns the next n bytes without advancing the reader.
 318  //
 319  // Peek appends results to the provided res slice and returns the updated slice.
 320  // This pattern allows re-using the storage of res if it has sufficient
 321  // capacity.
 322  //
 323  // The returned subslices are views into the underlying buffers and are only
 324  // valid until the reader is advanced past the corresponding buffer.
 325  //
 326  // If Peek returns fewer than n bytes, it also returns an error.
 327  func (r *Reader) Peek(n int, res [][]byte) ([][]byte, error) {
 328  	for i := 0; n > 0 && i < len(r.data); i++ {
 329  		curData := r.data[i].ReadOnlyData()
 330  		start := 0
 331  		if i == 0 {
 332  			start = r.bufferIdx
 333  		}
 334  		curSize := min(n, len(curData)-start)
 335  		if curSize == 0 {
 336  			continue
 337  		}
 338  		res = append(res, curData[start:start+curSize])
 339  		n -= curSize
 340  	}
 341  	if n > 0 {
 342  		return nil, fmt.Errorf("insufficient bytes in reader")
 343  	}
 344  	return res, nil
 345  }
 346