buffer.go raw

   1  /*
   2   * SPDX-FileCopyrightText: © Hypermode Inc. <hello@hypermode.com>
   3   * SPDX-License-Identifier: Apache-2.0
   4   */
   5  
   6  package z
   7  
   8  import (
   9  	"encoding/binary"
  10  	"errors"
  11  	"fmt"
  12  	"log"
  13  	"os"
  14  	"sort"
  15  	"sync/atomic"
  16  )
  17  
const (
	defaultCapacity = 64       // smallest capacity, in bytes, that NewBuffer and newBufferFile will allocate
	defaultTag      = "buffer" // tag used by NewBuffer when the caller passes an empty one
)
  22  
// Buffer is equivalent of bytes.Buffer without the ability to read. It is NOT thread-safe.
//
// In UseCalloc mode, z.Calloc is used to allocate memory, which depending upon how the code is
// compiled could use jemalloc for allocations.
//
// In UseMmap mode, Buffer uses file mmap to allocate memory. This allows us to store big data
// structures without using physical memory.
//
// WithMaxSize can be used to limit the memory usage; Grow panics once the limit would be
// exceeded.
type Buffer struct {
	padding       uint64     // number of starting bytes used for padding
	offset        uint64     // used length of the buffer
	buf           []byte     // backing slice for the buffer
	bufType       BufferType // type of the underlying buffer
	curSz         int        // capacity of the buffer
	maxSz         int        // causes a panic if the buffer grows beyond this size
	mmapFile      *MmapFile  // optional mmap backing for the buffer
	autoMmapAfter int        // Calloc falls back to an mmaped tmpfile after crossing this size
	autoMmapDir   string     // directory for autoMmap to create a tempfile in
	persistent    bool       // when enabled, Release will not delete the underlying mmap file
	tag           string     // used for jemalloc stats
}
  45  
  46  func NewBuffer(capacity int, tag string) *Buffer {
  47  	if capacity < defaultCapacity {
  48  		capacity = defaultCapacity
  49  	}
  50  	if tag == "" {
  51  		tag = defaultTag
  52  	}
  53  	return &Buffer{
  54  		buf:     Calloc(capacity, tag),
  55  		bufType: UseCalloc,
  56  		curSz:   capacity,
  57  		offset:  8,
  58  		padding: 8,
  59  		tag:     tag,
  60  	}
  61  }
  62  
  63  // It is the caller's responsibility to set offset after this, because Buffer
  64  // doesn't remember what it was.
  65  func NewBufferPersistent(path string, capacity int) (*Buffer, error) {
  66  	file, err := os.OpenFile(path, os.O_RDWR|os.O_CREATE, 0666)
  67  	if err != nil {
  68  		return nil, err
  69  	}
  70  	buffer, err := newBufferFile(file, capacity)
  71  	if err != nil {
  72  		return nil, err
  73  	}
  74  	buffer.persistent = true
  75  	return buffer, nil
  76  }
  77  
  78  func NewBufferTmp(dir string, capacity int) (*Buffer, error) {
  79  	if dir == "" {
  80  		dir = tmpDir
  81  	}
  82  	file, err := os.CreateTemp(dir, "buffer")
  83  	if err != nil {
  84  		return nil, err
  85  	}
  86  	return newBufferFile(file, capacity)
  87  }
  88  
  89  func newBufferFile(file *os.File, capacity int) (*Buffer, error) {
  90  	if capacity < defaultCapacity {
  91  		capacity = defaultCapacity
  92  	}
  93  	mmapFile, err := OpenMmapFileUsing(file, capacity, true)
  94  	if err != nil && err != NewFile {
  95  		return nil, err
  96  	}
  97  	buf := &Buffer{
  98  		buf:      mmapFile.Data,
  99  		bufType:  UseMmap,
 100  		curSz:    len(mmapFile.Data),
 101  		mmapFile: mmapFile,
 102  		offset:   8,
 103  		padding:  8,
 104  	}
 105  	return buf, nil
 106  }
 107  
 108  func NewBufferSlice(slice []byte) *Buffer {
 109  	return &Buffer{
 110  		offset:  uint64(len(slice)),
 111  		buf:     slice,
 112  		bufType: UseInvalid,
 113  	}
 114  }
 115  
 116  func (b *Buffer) WithAutoMmap(threshold int, path string) *Buffer {
 117  	if b.bufType != UseCalloc {
 118  		panic("can only autoMmap with UseCalloc")
 119  	}
 120  	b.autoMmapAfter = threshold
 121  	if path == "" {
 122  		b.autoMmapDir = tmpDir
 123  	} else {
 124  		b.autoMmapDir = path
 125  	}
 126  	return b
 127  }
 128  
// WithMaxSize sets the size limit enforced by Grow: once offset plus the
// requested growth would exceed size, Grow panics. A size of zero or less
// disables the limit. It returns b so that calls can be chained.
func (b *Buffer) WithMaxSize(size int) *Buffer {
	b.maxSz = size
	return b
}
 133  
 134  func (b *Buffer) IsEmpty() bool {
 135  	return int(b.offset) == b.StartOffset()
 136  }
 137  
 138  // LenWithPadding would return the number of bytes written to the buffer so far
 139  // plus the padding at the start of the buffer.
 140  func (b *Buffer) LenWithPadding() int {
 141  	return int(atomic.LoadUint64(&b.offset))
 142  }
 143  
 144  // LenNoPadding would return the number of bytes written to the buffer so far
 145  // (without the padding).
 146  func (b *Buffer) LenNoPadding() int {
 147  	return int(atomic.LoadUint64(&b.offset) - b.padding)
 148  }
 149  
 150  // Bytes would return all the written bytes as a slice.
 151  func (b *Buffer) Bytes() []byte {
 152  	off := atomic.LoadUint64(&b.offset)
 153  	return b.buf[b.padding:off]
 154  }
 155  
// Grow makes sure the buffer has room for at least n more bytes. If
// offset+n is strictly below the current capacity nothing happens. Otherwise
// the capacity is increased by (current capacity + n), capped at 1GB per call
// but never less than n, so that n bytes can be written without a further
// allocation. Note that because the comparison is strict, a request that would
// exactly fill the buffer still triggers a grow.
//
// In UseCalloc mode a new slice is allocated and the written bytes are copied
// over; if WithAutoMmap was configured and the new capacity crosses its
// threshold, the contents move to an mmapped temp file instead and the buffer
// becomes UseMmap. In UseMmap mode the underlying file is truncated to the new
// capacity, which might result in file expansion.
//
// Grow panics if the buffer is uninitialized, if WithMaxSize is set and
// offset+n exceeds it, if creating/mapping/truncating the backing file fails,
// or if the buffer is neither UseCalloc nor UseMmap.
func (b *Buffer) Grow(n int) {
	if b.buf == nil {
		panic("z.Buffer needs to be initialized before using")
	}
	if b.maxSz > 0 && int(b.offset)+n > b.maxSz {
		err := fmt.Errorf(
			"z.Buffer max size exceeded: %d offset: %d grow: %d", b.maxSz, b.offset, n)
		panic(err)
	}
	// Enough room already; nothing to do.
	if int(b.offset)+n < b.curSz {
		return
	}

	// Calculate new capacity.
	growBy := b.curSz + n
	// Don't allocate more than 1GB at a time.
	if growBy > 1<<30 {
		growBy = 1 << 30
	}
	// Allocate at least n, even if it exceeds the 1GB limit above.
	if n > growBy {
		growBy = n
	}
	// curSz is updated before the reallocation below, so it already holds the
	// target capacity in every branch of the switch.
	b.curSz += growBy

	switch b.bufType {
	case UseCalloc:
		// If autoMmap gets triggered, copy the slice over to an mmaped file.
		if b.autoMmapAfter > 0 && b.curSz > b.autoMmapAfter {
			b.bufType = UseMmap
			file, err := os.CreateTemp(b.autoMmapDir, "")
			if err != nil {
				panic(err)
			}
			mmapFile, err := OpenMmapFileUsing(file, b.curSz, true)
			if err != nil && err != NewFile {
				panic(err)
			}
			// Carry over everything written so far (padding included), then
			// release the Calloc'd slice.
			assert(int(b.offset) == copy(mmapFile.Data, b.buf[:b.offset]))
			Free(b.buf)
			b.mmapFile = mmapFile
			b.buf = mmapFile.Data
			break
		}

		// Else, reallocate the slice.
		newBuf := Calloc(b.curSz, b.tag)
		assert(int(b.offset) == copy(newBuf, b.buf[:b.offset]))
		Free(b.buf)
		b.buf = newBuf

	case UseMmap:
		// Truncate and remap the underlying file.
		if err := b.mmapFile.Truncate(int64(b.curSz)); err != nil {
			err = errors.Join(err,
				fmt.Errorf("while trying to truncate file: %s to size: %d", b.mmapFile.Fd.Name(), b.curSz))
			panic(err)
		}
		// Pick up the mapping again after the truncate.
		// NOTE(review): presumably Truncate replaces mmapFile.Data with the
		// remapped region; confirm against MmapFile.Truncate.
		b.buf = b.mmapFile.Data

	default:
		panic("can only use Grow on UseCalloc and UseMmap buffers")
	}
}
 224  
 225  // Allocate is a way to get a slice of size n back from the buffer. This slice can be directly
 226  // written to. Warning: Allocate is not thread-safe. The byte slice returned MUST be used before
 227  // further calls to Buffer.
 228  func (b *Buffer) Allocate(n int) []byte {
 229  	b.Grow(n)
 230  	off := b.offset
 231  	b.offset += uint64(n)
 232  	return b.buf[off:int(b.offset)]
 233  }
 234  
 235  // AllocateOffset works the same way as allocate, but instead of returning a byte slice, it returns
 236  // the offset of the allocation.
 237  func (b *Buffer) AllocateOffset(n int) int {
 238  	b.Grow(n)
 239  	b.offset += uint64(n)
 240  	return int(b.offset) - n
 241  }
 242  
 243  func (b *Buffer) writeLen(sz int) {
 244  	buf := b.Allocate(8)
 245  	binary.BigEndian.PutUint64(buf, uint64(sz))
 246  }
 247  
 248  // SliceAllocate would encode the size provided into the buffer, followed by a call to Allocate,
 249  // hence returning the slice of size sz. This can be used to allocate a lot of small buffers into
 250  // this big buffer.
 251  // Note that SliceAllocate should NOT be mixed with normal calls to Write.
 252  func (b *Buffer) SliceAllocate(sz int) []byte {
 253  	b.Grow(8 + sz)
 254  	b.writeLen(sz)
 255  	return b.Allocate(sz)
 256  }
 257  
// StartOffset returns the offset of the first byte after the padding, which
// is where written data begins.
func (b *Buffer) StartOffset() int {
	return int(b.padding)
}
 261  
 262  func (b *Buffer) WriteSlice(slice []byte) {
 263  	dst := b.SliceAllocate(len(slice))
 264  	assert(len(slice) == copy(dst, slice))
 265  }
 266  
 267  func (b *Buffer) SliceIterate(f func(slice []byte) error) error {
 268  	if b.IsEmpty() {
 269  		return nil
 270  	}
 271  
 272  	next := b.StartOffset()
 273  	var slice []byte
 274  	for next >= 0 {
 275  		slice, next = b.Slice(next)
 276  		if len(slice) == 0 {
 277  			continue
 278  		}
 279  		if err := f(slice); err != nil {
 280  			return err
 281  		}
 282  	}
 283  
 284  	return nil
 285  }
 286  
 287  const (
 288  	UseCalloc BufferType = iota
 289  	UseMmap
 290  	UseInvalid
 291  )
 292  
 293  type BufferType int
 294  
 295  func (t BufferType) String() string {
 296  	switch t {
 297  	case UseCalloc:
 298  		return "UseCalloc"
 299  	case UseMmap:
 300  		return "UseMmap"
 301  	default:
 302  		return "UseInvalid"
 303  	}
 304  }
 305  
// LessFunc reports whether entry a must sort before entry b. Both arguments
// are entry payloads, without their length headers.
type LessFunc func(a, b []byte) bool

// sortHelper holds the state shared by the steps of Buffer.SortSliceBetween:
// sorting fixed-size runs of entries (sortSmall) and then merging neighbouring
// runs (sort / merge).
type sortHelper struct {
	offsets []int    // run boundaries in b.buf; the last element is the end of the sorted range
	b       *Buffer  // buffer whose entries are being sorted in place
	tmp     *Buffer  // scratch buffer, reset and reused by sortSmall and merge
	less    LessFunc // ordering applied to entry payloads
	small   []int    // reusable list of entry offsets within the run being sorted
}
 314  
 315  func (s *sortHelper) sortSmall(start, end int) {
 316  	s.tmp.Reset()
 317  	s.small = s.small[:0]
 318  	next := start
 319  	for next >= 0 && next < end {
 320  		s.small = append(s.small, next)
 321  		_, next = s.b.Slice(next)
 322  	}
 323  
 324  	// We are sorting the slices pointed to by s.small offsets, but only moving the offsets around.
 325  	sort.Slice(s.small, func(i, j int) bool {
 326  		left, _ := s.b.Slice(s.small[i])
 327  		right, _ := s.b.Slice(s.small[j])
 328  		return s.less(left, right)
 329  	})
 330  	// Now we iterate over the s.small offsets and copy over the slices. The result is now in order.
 331  	for _, off := range s.small {
 332  		_, _ = s.tmp.Write(rawSlice(s.b.buf[off:]))
 333  	}
 334  	assert(end-start == copy(s.b.buf[start:end], s.tmp.Bytes()))
 335  }
 336  
 337  func assert(b bool) {
 338  	if !b {
 339  		log.Fatalf("%+v", errors.New("Assertion failure"))
 340  	}
 341  }
 342  func check(err error) {
 343  	if err != nil {
 344  		log.Fatalf("%+v", err)
 345  	}
 346  }
 347  func check2(_ interface{}, err error) {
 348  	check(err)
 349  }
 350  
// merge combines two already-sorted runs of length-prefixed entries, left and
// right, into s.b.buf[start:end] in sorted order. If either run is empty there
// is nothing to merge and the buffer is left untouched.
//
// When s.less does not put the left entry first, the right entry is emitted,
// so entries that compare equal are taken from the right run first.
func (s *sortHelper) merge(left, right []byte, start, end int) {
	if len(left) == 0 || len(right) == 0 {
		return
	}
	// Work on a copy of left held in s.tmp. The merged output is written from
	// start onwards, which (as called from sort) is the region left occupies.
	s.tmp.Reset()
	check2(s.tmp.Write(left))
	left = s.tmp.Bytes()

	// ls and rs are the current head entries (header + payload) of each run.
	var ls, rs []byte

	// copyLeft emits the head entry of left and advances past it.
	copyLeft := func() {
		assert(len(ls) == copy(s.b.buf[start:], ls))
		left = left[len(ls):]
		start += len(ls)
	}
	// copyRight emits the head entry of right and advances past it.
	copyRight := func() {
		assert(len(rs) == copy(s.b.buf[start:], rs))
		right = right[len(rs):]
		start += len(rs)
	}

	for start < end {
		// Once one run is exhausted, the remainder of the other is copied
		// over wholesale.
		if len(left) == 0 {
			assert(len(right) == copy(s.b.buf[start:end], right))
			return
		}
		if len(right) == 0 {
			assert(len(left) == copy(s.b.buf[start:end], left))
			return
		}
		ls = rawSlice(left)
		rs = rawSlice(right)

		// We skip the first 8 bytes in the rawSlice, because that stores the length.
		if s.less(ls[8:], rs[8:]) {
			copyLeft()
		} else {
			copyRight()
		}
	}
}
 392  
 393  func (s *sortHelper) sort(lo, hi int) []byte {
 394  	assert(lo <= hi)
 395  
 396  	mid := lo + (hi-lo)/2
 397  	loff, hoff := s.offsets[lo], s.offsets[hi]
 398  	if lo == mid {
 399  		// No need to sort, just return the buffer.
 400  		return s.b.buf[loff:hoff]
 401  	}
 402  
 403  	// lo, mid would sort from [offset[lo], offset[mid]) .
 404  	left := s.sort(lo, mid)
 405  	// Typically we'd use mid+1, but here mid represents an offset in the buffer. Each offset
 406  	// contains a thousand entries. So, if we do mid+1, we'd skip over those entries.
 407  	right := s.sort(mid, hi)
 408  
 409  	s.merge(left, right, loff, hoff)
 410  	return s.b.buf[loff:hoff]
 411  }
 412  
 413  // SortSlice is like SortSliceBetween but sorting over the entire buffer.
 414  func (b *Buffer) SortSlice(less func(left, right []byte) bool) {
 415  	b.SortSliceBetween(b.StartOffset(), int(b.offset), less)
 416  }
// SortSliceBetween sorts, in place, the length-prefixed entries stored in
// [start, end) according to less, which is called with entry payloads (without
// their length headers). An empty range (start >= end) is a no-op; a start of
// zero panics.
//
// The range is split into runs of up to 1024 entries. Each run is sorted on its
// own, and the sorted runs are then merged pairwise. A temporary buffer of
// roughly half the range (plus 10%) is allocated for the duration of the call
// and released on return.
func (b *Buffer) SortSliceBetween(start, end int, less LessFunc) {
	if start >= end {
		return
	}
	if start == 0 {
		panic("start can never be zero")
	}

	// Record the offset of every 1024th entry; these are the run boundaries.
	var offsets []int
	next, count := start, 0
	for next >= 0 && next < end {
		if count%1024 == 0 {
			offsets = append(offsets, next)
		}
		_, next = b.Slice(next)
		count++
	}
	assert(len(offsets) > 0)
	// Close the last run with the end of the range.
	if offsets[len(offsets)-1] != end {
		offsets = append(offsets, end)
	}

	szTmp := int(float64((end-start)/2) * 1.1)
	s := &sortHelper{
		offsets: offsets,
		b:       b,
		less:    less,
		small:   make([]int, 0, 1024),
		tmp:     NewBuffer(szTmp, b.tag),
	}
	defer func() { _ = s.tmp.Release() }()

	// Sort each run individually.
	left := offsets[0]
	for _, off := range offsets[1:] {
		s.sortSmall(left, off)
		left = off
	}
	// Merge the sorted runs.
	s.sort(0, len(offsets)-1)
}
 456  
 457  func rawSlice(buf []byte) []byte {
 458  	sz := binary.BigEndian.Uint64(buf)
 459  	return buf[:8+int(sz)]
 460  }
 461  
 462  // Slice would return the slice written at offset.
 463  func (b *Buffer) Slice(offset int) ([]byte, int) {
 464  	if offset >= int(b.offset) {
 465  		return nil, -1
 466  	}
 467  
 468  	sz := binary.BigEndian.Uint64(b.buf[offset:])
 469  	start := offset + 8
 470  	next := start + int(sz)
 471  	res := b.buf[start:next]
 472  	if next >= int(b.offset) {
 473  		next = -1
 474  	}
 475  	return res, next
 476  }
 477  
 478  // SliceOffsets is an expensive function. Use sparingly.
 479  func (b *Buffer) SliceOffsets() []int {
 480  	next := b.StartOffset()
 481  	var offsets []int
 482  	for next >= 0 {
 483  		offsets = append(offsets, next)
 484  		_, next = b.Slice(next)
 485  	}
 486  	return offsets
 487  }
 488  
 489  func (b *Buffer) Data(offset int) []byte {
 490  	if offset > b.curSz {
 491  		panic("offset beyond current size")
 492  	}
 493  	return b.buf[offset:b.curSz]
 494  }
 495  
 496  // Write would write p bytes to the buffer.
 497  func (b *Buffer) Write(p []byte) (n int, err error) {
 498  	n = len(p)
 499  	b.Grow(n)
 500  	assert(n == copy(b.buf[b.offset:], p))
 501  	b.offset += uint64(n)
 502  	return n, nil
 503  }
 504  
// Reset rewinds the write offset to StartOffset so the buffer can be reused.
// The backing memory is kept as is; it is neither freed nor cleared.
func (b *Buffer) Reset() {
	b.offset = uint64(b.StartOffset())
}
 509  
 510  // Release would free up the memory allocated by the buffer. Once the usage of buffer is done, it is
 511  // important to call Release, otherwise a memory leak can happen.
 512  func (b *Buffer) Release() error {
 513  	if b == nil {
 514  		return nil
 515  	}
 516  	switch b.bufType {
 517  	case UseCalloc:
 518  		Free(b.buf)
 519  	case UseMmap:
 520  		if b.mmapFile == nil {
 521  			return nil
 522  		}
 523  		path := b.mmapFile.Fd.Name()
 524  		if err := b.mmapFile.Close(-1); err != nil {
 525  			return errors.Join(err, fmt.Errorf("while closing file: %s", path))
 526  		}
 527  		if !b.persistent {
 528  			if err := os.Remove(path); err != nil {
 529  				return errors.Join(err, fmt.Errorf("while deleting file %s", path))
 530  			}
 531  		}
 532  	}
 533  	return nil
 534  }
 535