databuffer.go raw

   1  // Copyright 2014 The Go Authors. All rights reserved.
   2  // Use of this source code is governed by a BSD-style
   3  // license that can be found in the LICENSE file.
   4  
   5  package http2
   6  
   7  import (
   8  	"errors"
   9  	"fmt"
  10  	"sync"
  11  )
  12  
  13  // Buffer chunks are allocated from a pool to reduce pressure on GC.
  14  // The maximum wasted space per dataBuffer is 2x the largest size class,
  15  // which happens when the dataBuffer has multiple chunks and there is
  16  // one unread byte in both the first and last chunks. We use a few size
  17  // classes to minimize overheads for servers that typically receive very
  18  // small request bodies.
  19  //
  20  // TODO: Benchmark to determine if the pools are necessary. The GC may have
  21  // improved enough that we can instead allocate chunks like this:
  22  // make([]byte, max(16<<10, expectedBytesRemaining))
  23  var dataChunkPools = [...]sync.Pool{
  24  	{New: func() interface{} { return new([1 << 10]byte) }},
  25  	{New: func() interface{} { return new([2 << 10]byte) }},
  26  	{New: func() interface{} { return new([4 << 10]byte) }},
  27  	{New: func() interface{} { return new([8 << 10]byte) }},
  28  	{New: func() interface{} { return new([16 << 10]byte) }},
  29  }
  30  
  31  func getDataBufferChunk(size int64) []byte {
  32  	switch {
  33  	case size <= 1<<10:
  34  		return dataChunkPools[0].Get().(*[1 << 10]byte)[:]
  35  	case size <= 2<<10:
  36  		return dataChunkPools[1].Get().(*[2 << 10]byte)[:]
  37  	case size <= 4<<10:
  38  		return dataChunkPools[2].Get().(*[4 << 10]byte)[:]
  39  	case size <= 8<<10:
  40  		return dataChunkPools[3].Get().(*[8 << 10]byte)[:]
  41  	default:
  42  		return dataChunkPools[4].Get().(*[16 << 10]byte)[:]
  43  	}
  44  }
  45  
  46  func putDataBufferChunk(p []byte) {
  47  	switch len(p) {
  48  	case 1 << 10:
  49  		dataChunkPools[0].Put((*[1 << 10]byte)(p))
  50  	case 2 << 10:
  51  		dataChunkPools[1].Put((*[2 << 10]byte)(p))
  52  	case 4 << 10:
  53  		dataChunkPools[2].Put((*[4 << 10]byte)(p))
  54  	case 8 << 10:
  55  		dataChunkPools[3].Put((*[8 << 10]byte)(p))
  56  	case 16 << 10:
  57  		dataChunkPools[4].Put((*[16 << 10]byte)(p))
  58  	default:
  59  		panic(fmt.Sprintf("unexpected buffer len=%v", len(p)))
  60  	}
  61  }
  62  
  63  // dataBuffer is an io.ReadWriter backed by a list of data chunks.
  64  // Each dataBuffer is used to read DATA frames on a single stream.
  65  // The buffer is divided into chunks so the server can limit the
  66  // total memory used by a single connection without limiting the
  67  // request body size on any single stream.
  68  type dataBuffer struct {
  69  	chunks   [][]byte
  70  	r        int   // next byte to read is chunks[0][r]
  71  	w        int   // next byte to write is chunks[len(chunks)-1][w]
  72  	size     int   // total buffered bytes
  73  	expected int64 // we expect at least this many bytes in future Write calls (ignored if <= 0)
  74  }
  75  
  76  var errReadEmpty = errors.New("read from empty dataBuffer")
  77  
  78  // Read copies bytes from the buffer into p.
  79  // It is an error to read when no data is available.
  80  func (b *dataBuffer) Read(p []byte) (int, error) {
  81  	if b.size == 0 {
  82  		return 0, errReadEmpty
  83  	}
  84  	var ntotal int
  85  	for len(p) > 0 && b.size > 0 {
  86  		readFrom := b.bytesFromFirstChunk()
  87  		n := copy(p, readFrom)
  88  		p = p[n:]
  89  		ntotal += n
  90  		b.r += n
  91  		b.size -= n
  92  		// If the first chunk has been consumed, advance to the next chunk.
  93  		if b.r == len(b.chunks[0]) {
  94  			putDataBufferChunk(b.chunks[0])
  95  			end := len(b.chunks) - 1
  96  			copy(b.chunks[:end], b.chunks[1:])
  97  			b.chunks[end] = nil
  98  			b.chunks = b.chunks[:end]
  99  			b.r = 0
 100  		}
 101  	}
 102  	return ntotal, nil
 103  }
 104  
 105  func (b *dataBuffer) bytesFromFirstChunk() []byte {
 106  	if len(b.chunks) == 1 {
 107  		return b.chunks[0][b.r:b.w]
 108  	}
 109  	return b.chunks[0][b.r:]
 110  }
 111  
 112  // Len returns the number of bytes of the unread portion of the buffer.
 113  func (b *dataBuffer) Len() int {
 114  	return b.size
 115  }
 116  
 117  // Write appends p to the buffer.
 118  func (b *dataBuffer) Write(p []byte) (int, error) {
 119  	ntotal := len(p)
 120  	for len(p) > 0 {
 121  		// If the last chunk is empty, allocate a new chunk. Try to allocate
 122  		// enough to fully copy p plus any additional bytes we expect to
 123  		// receive. However, this may allocate less than len(p).
 124  		want := int64(len(p))
 125  		if b.expected > want {
 126  			want = b.expected
 127  		}
 128  		chunk := b.lastChunkOrAlloc(want)
 129  		n := copy(chunk[b.w:], p)
 130  		p = p[n:]
 131  		b.w += n
 132  		b.size += n
 133  		b.expected -= int64(n)
 134  	}
 135  	return ntotal, nil
 136  }
 137  
 138  func (b *dataBuffer) lastChunkOrAlloc(want int64) []byte {
 139  	if len(b.chunks) != 0 {
 140  		last := b.chunks[len(b.chunks)-1]
 141  		if b.w < len(last) {
 142  			return last
 143  		}
 144  	}
 145  	chunk := getDataBufferChunk(want)
 146  	b.chunks = append(b.chunks, chunk)
 147  	b.w = 0
 148  	return chunk
 149  }
 150