pool.go raw

   1  // Package buffer implements a buffer for serialization, consisting of a chain of []byte-s to
   2  // reduce copying and to allow reuse of individual chunks.
   3  package buffer
   4  
   5  import (
   6  	"io"
   7  	"net"
   8  	"sync"
   9  )
  10  
// PoolConfig contains configuration for the allocation and reuse strategy.
// All three values are chunk capacities in bytes.
type PoolConfig struct {
	StartSize  int // Capacity requested for a new chunk when the buffer's current chunk is empty.
	PooledSize int // Minimum chunk size that is reused, reusing chunks too small will result in overhead.
	MaxSize    int // Upper bound on the capacity requested for a new chunk.
}
  17  
// config is the package-wide pooling configuration. It holds these defaults until Init
// replaces it.
var config = PoolConfig{
	StartSize:  128,
	PooledSize: 512,
	MaxSize:    32768,
}
  23  
  24  // Reuse pool: chunk size -> pool.
  25  var buffers = map[int]*sync.Pool{}
  26  
  27  func initBuffers() {
  28  	for l := config.PooledSize; l <= config.MaxSize; l *= 2 {
  29  		buffers[l] = new(sync.Pool)
  30  	}
  31  }
  32  
// init creates the reuse pools for the default configuration when the package is loaded.
func init() {
	initBuffers()
}
  36  
// Init sets up a non-default pooling and allocation strategy: it replaces the package-wide
// configuration with cfg and creates the reuse pools for it. Should be run before
// serialization is done.
func Init(cfg PoolConfig) {
	config = cfg
	initBuffers()
}
  42  
  43  // putBuf puts a chunk to reuse pool if it can be reused.
  44  func putBuf(buf []byte) {
  45  	size := cap(buf)
  46  	if size < config.PooledSize {
  47  		return
  48  	}
  49  	if c := buffers[size]; c != nil {
  50  		c.Put(buf[:0])
  51  	}
  52  }
  53  
  54  // getBuf gets a chunk from reuse pool or creates a new one if reuse failed.
  55  func getBuf(size int) []byte {
  56  	if size >= config.PooledSize {
  57  		if c := buffers[size]; c != nil {
  58  			v := c.Get()
  59  			if v != nil {
  60  				return v.([]byte)
  61  			}
  62  		}
  63  	}
  64  	return make([]byte, 0, size)
  65  }
  66  
// Buffer is a buffer optimized for serialization without extra copying.
//
// Data is kept as a chain of chunks: the chunks already filled are stored in bufs and the
// chunk currently being written is Buf.
type Buffer struct {

	// Buf is the current chunk that can be used for serialization.
	Buf []byte

	// toPool is the chunk obtained from getBuf when the current chunk was started. Its
	// capacity is compared with cap(Buf) to detect that Buf has been reallocated since.
	toPool []byte
	// bufs holds the chunks that were filled before Buf, in write order.
	bufs   [][]byte
}
  76  
  77  // EnsureSpace makes sure that the current chunk contains at least s free bytes,
  78  // possibly creating a new chunk.
  79  func (b *Buffer) EnsureSpace(s int) {
  80  	if cap(b.Buf)-len(b.Buf) < s {
  81  		b.ensureSpaceSlow(s)
  82  	}
  83  }
  84  
  85  func (b *Buffer) ensureSpaceSlow(s int) {
  86  	l := len(b.Buf)
  87  	if l > 0 {
  88  		if cap(b.toPool) != cap(b.Buf) {
  89  			// Chunk was reallocated, toPool can be pooled.
  90  			putBuf(b.toPool)
  91  		}
  92  		if cap(b.bufs) == 0 {
  93  			b.bufs = make([][]byte, 0, 8)
  94  		}
  95  		b.bufs = append(b.bufs, b.Buf)
  96  		l = cap(b.toPool) * 2
  97  	} else {
  98  		l = config.StartSize
  99  	}
 100  
 101  	if l > config.MaxSize {
 102  		l = config.MaxSize
 103  	}
 104  	b.Buf = getBuf(l)
 105  	b.toPool = b.Buf
 106  }
 107  
 108  // AppendByte appends a single byte to buffer.
 109  func (b *Buffer) AppendByte(data byte) {
 110  	b.EnsureSpace(1)
 111  	b.Buf = append(b.Buf, data)
 112  }
 113  
 114  // AppendBytes appends a byte slice to buffer.
 115  func (b *Buffer) AppendBytes(data []byte) {
 116  	if len(data) <= cap(b.Buf)-len(b.Buf) {
 117  		b.Buf = append(b.Buf, data...) // fast path
 118  	} else {
 119  		b.appendBytesSlow(data)
 120  	}
 121  }
 122  
 123  func (b *Buffer) appendBytesSlow(data []byte) {
 124  	for len(data) > 0 {
 125  		b.EnsureSpace(1)
 126  
 127  		sz := cap(b.Buf) - len(b.Buf)
 128  		if sz > len(data) {
 129  			sz = len(data)
 130  		}
 131  
 132  		b.Buf = append(b.Buf, data[:sz]...)
 133  		data = data[sz:]
 134  	}
 135  }
 136  
 137  // AppendString appends a string to buffer.
 138  func (b *Buffer) AppendString(data string) {
 139  	if len(data) <= cap(b.Buf)-len(b.Buf) {
 140  		b.Buf = append(b.Buf, data...) // fast path
 141  	} else {
 142  		b.appendStringSlow(data)
 143  	}
 144  }
 145  
 146  func (b *Buffer) appendStringSlow(data string) {
 147  	for len(data) > 0 {
 148  		b.EnsureSpace(1)
 149  
 150  		sz := cap(b.Buf) - len(b.Buf)
 151  		if sz > len(data) {
 152  			sz = len(data)
 153  		}
 154  
 155  		b.Buf = append(b.Buf, data[:sz]...)
 156  		data = data[sz:]
 157  	}
 158  }
 159  
 160  // Size computes the size of a buffer by adding sizes of every chunk.
 161  func (b *Buffer) Size() int {
 162  	size := len(b.Buf)
 163  	for _, buf := range b.bufs {
 164  		size += len(buf)
 165  	}
 166  	return size
 167  }
 168  
 169  // DumpTo outputs the contents of a buffer to a writer and resets the buffer.
 170  func (b *Buffer) DumpTo(w io.Writer) (written int, err error) {
 171  	bufs := net.Buffers(b.bufs)
 172  	if len(b.Buf) > 0 {
 173  		bufs = append(bufs, b.Buf)
 174  	}
 175  	n, err := bufs.WriteTo(w)
 176  
 177  	for _, buf := range b.bufs {
 178  		putBuf(buf)
 179  	}
 180  	putBuf(b.toPool)
 181  
 182  	b.bufs = nil
 183  	b.Buf = nil
 184  	b.toPool = nil
 185  
 186  	return int(n), err
 187  }
 188  
 189  // BuildBytes creates a single byte slice with all the contents of the buffer. Data is
 190  // copied if it does not fit in a single chunk. You can optionally provide one byte
 191  // slice as argument that it will try to reuse.
 192  func (b *Buffer) BuildBytes(reuse ...[]byte) []byte {
 193  	if len(b.bufs) == 0 {
 194  		ret := b.Buf
 195  		b.toPool = nil
 196  		b.Buf = nil
 197  		return ret
 198  	}
 199  
 200  	var ret []byte
 201  	size := b.Size()
 202  
 203  	// If we got a buffer as argument and it is big enough, reuse it.
 204  	if len(reuse) == 1 && cap(reuse[0]) >= size {
 205  		ret = reuse[0][:0]
 206  	} else {
 207  		ret = make([]byte, 0, size)
 208  	}
 209  	for _, buf := range b.bufs {
 210  		ret = append(ret, buf...)
 211  		putBuf(buf)
 212  	}
 213  
 214  	ret = append(ret, b.Buf...)
 215  	putBuf(b.toPool)
 216  
 217  	b.bufs = nil
 218  	b.toPool = nil
 219  	b.Buf = nil
 220  
 221  	return ret
 222  }
 223  
// readCloser reads sequentially through a list of chunks, handing each chunk to putBuf
// once it has been read completely.
type readCloser struct {
	offset int      // Read position inside bufs[0].
	bufs   [][]byte // Chunks that have not been fully read yet, in order.
}
 228  
 229  func (r *readCloser) Read(p []byte) (n int, err error) {
 230  	for _, buf := range r.bufs {
 231  		// Copy as much as we can.
 232  		x := copy(p[n:], buf[r.offset:])
 233  		n += x // Increment how much we filled.
 234  
 235  		// Did we empty the whole buffer?
 236  		if r.offset+x == len(buf) {
 237  			// On to the next buffer.
 238  			r.offset = 0
 239  			r.bufs = r.bufs[1:]
 240  
 241  			// We can release this buffer.
 242  			putBuf(buf)
 243  		} else {
 244  			r.offset += x
 245  		}
 246  
 247  		if n == len(p) {
 248  			break
 249  		}
 250  	}
 251  	// No buffers left or nothing read?
 252  	if len(r.bufs) == 0 {
 253  		err = io.EOF
 254  	}
 255  	return
 256  }
 257  
 258  func (r *readCloser) Close() error {
 259  	// Release all remaining buffers.
 260  	for _, buf := range r.bufs {
 261  		putBuf(buf)
 262  	}
 263  	// In case Close gets called multiple times.
 264  	r.bufs = nil
 265  
 266  	return nil
 267  }
 268  
 269  // ReadCloser creates an io.ReadCloser with all the contents of the buffer.
 270  func (b *Buffer) ReadCloser() io.ReadCloser {
 271  	ret := &readCloser{0, append(b.bufs, b.Buf)}
 272  
 273  	b.bufs = nil
 274  	b.toPool = nil
 275  	b.Buf = nil
 276  
 277  	return ret
 278  }
 279