y.go raw

   1  /*
   2   * SPDX-FileCopyrightText: © Hypermode Inc. <hello@hypermode.com>
   3   * SPDX-License-Identifier: Apache-2.0
   4   */
   5  
   6  package y
   7  
   8  import (
   9  	"bytes"
  10  	"encoding/binary"
  11  	stderrors "errors"
  12  	"fmt"
  13  	"hash/crc32"
  14  	"io"
  15  	"math"
  16  	"os"
  17  	"reflect"
  18  	"strconv"
  19  	"sync"
  20  	"time"
  21  	"unsafe"
  22  
  23  	"github.com/dgraph-io/badger/v4/pb"
  24  	"github.com/dgraph-io/ristretto/v2/z"
  25  )
  26  
// Sentinel errors exported by this package. They are plain values created
// with errors.New, so callers can compare against them directly.
var (
	// ErrEOF indicates an end of file when trying to read from a memory mapped file
	// and encountering the end of slice.
	ErrEOF = stderrors.New("ErrEOF: End of file")

	// ErrCommitAfterFinish indicates that a write batch commit was called after
	// the batch had already been finished.
	ErrCommitAfterFinish = stderrors.New("Batch commit not permitted after finish")
)
  36  
// Flags is a bit set of file-opening options. Individual flags are combined
// with bitwise OR and are consumed by OpenExistingFile.
type Flags int

const (
	// Sync indicates that O_DSYNC should be set on the underlying file,
	// ensuring that data writes do not return until the data is flushed
	// to disk.
	Sync Flags = 1 << iota
	// ReadOnly opens the underlying file on a read-only basis.
	ReadOnly
)
  47  
var (
	// datasyncFileFlag is OR-ed into the os.OpenFile flags whenever a caller
	// asks for synchronous writes. It defaults to 0x0 (no extra flag) here.
	// This is O_DSYNC (datasync) on platforms that support it -- see file_unix.go
	datasyncFileFlag = 0x0

	// CastagnoliCrcTable is the CRC32 table built from the Castagnoli polynomial.
	CastagnoliCrcTable = crc32.MakeTable(crc32.Castagnoli)
)
  55  
  56  // OpenExistingFile opens an existing file, errors if it doesn't exist.
  57  func OpenExistingFile(filename string, flags Flags) (*os.File, error) {
  58  	openFlags := os.O_RDWR
  59  	if flags&ReadOnly != 0 {
  60  		openFlags = os.O_RDONLY
  61  	}
  62  
  63  	if flags&Sync != 0 {
  64  		openFlags |= datasyncFileFlag
  65  	}
  66  	return os.OpenFile(filename, openFlags, 0)
  67  }
  68  
  69  // CreateSyncedFile creates a new file (using O_EXCL), errors if it already existed.
  70  func CreateSyncedFile(filename string, sync bool) (*os.File, error) {
  71  	flags := os.O_RDWR | os.O_CREATE | os.O_EXCL
  72  	if sync {
  73  		flags |= datasyncFileFlag
  74  	}
  75  	return os.OpenFile(filename, flags, 0600)
  76  }
  77  
  78  // OpenSyncedFile creates the file if one doesn't exist.
  79  func OpenSyncedFile(filename string, sync bool) (*os.File, error) {
  80  	flags := os.O_RDWR | os.O_CREATE
  81  	if sync {
  82  		flags |= datasyncFileFlag
  83  	}
  84  	return os.OpenFile(filename, flags, 0600)
  85  }
  86  
  87  // OpenTruncFile opens the file with O_RDWR | O_CREATE | O_TRUNC
  88  func OpenTruncFile(filename string, sync bool) (*os.File, error) {
  89  	flags := os.O_RDWR | os.O_CREATE | os.O_TRUNC
  90  	if sync {
  91  		flags |= datasyncFileFlag
  92  	}
  93  	return os.OpenFile(filename, flags, 0600)
  94  }
  95  
  96  // SafeCopy does append(a[:0], src...).
  97  func SafeCopy(a, src []byte) []byte {
  98  	return append(a[:0], src...)
  99  }
 100  
 101  // Copy copies a byte slice and returns the copied slice.
 102  func Copy(a []byte) []byte {
 103  	b := make([]byte, len(a))
 104  	copy(b, a)
 105  	return b
 106  }
 107  
 108  // KeyWithTs generates a new key by appending ts to key.
 109  func KeyWithTs(key []byte, ts uint64) []byte {
 110  	out := make([]byte, len(key)+8)
 111  	copy(out, key)
 112  	binary.BigEndian.PutUint64(out[len(key):], math.MaxUint64-ts)
 113  	return out
 114  }
 115  
 116  // ParseTs parses the timestamp from the key bytes.
 117  func ParseTs(key []byte) uint64 {
 118  	if len(key) <= 8 {
 119  		return 0
 120  	}
 121  	return math.MaxUint64 - binary.BigEndian.Uint64(key[len(key)-8:])
 122  }
 123  
 124  // CompareKeys checks the key without timestamp and checks the timestamp if keyNoTs
 125  // is same.
 126  // a<timestamp> would be sorted higher than aa<timestamp> if we use bytes.compare
 127  // All keys should have timestamp.
 128  func CompareKeys(key1, key2 []byte) int {
 129  	if cmp := bytes.Compare(key1[:len(key1)-8], key2[:len(key2)-8]); cmp != 0 {
 130  		return cmp
 131  	}
 132  	return bytes.Compare(key1[len(key1)-8:], key2[len(key2)-8:])
 133  }
 134  
 135  // ParseKey parses the actual key from the key bytes.
 136  func ParseKey(key []byte) []byte {
 137  	if key == nil {
 138  		return nil
 139  	}
 140  
 141  	return key[:len(key)-8]
 142  }
 143  
 144  // SameKey checks for key equality ignoring the version timestamp suffix.
 145  func SameKey(src, dst []byte) bool {
 146  	if len(src) != len(dst) {
 147  		return false
 148  	}
 149  	return bytes.Equal(ParseKey(src), ParseKey(dst))
 150  }
 151  
 152  // Slice holds a reusable buf, will reallocate if you request a larger size than ever before.
 153  // One problem is with n distinct sizes in random order it'll reallocate log(n) times.
 154  type Slice struct {
 155  	buf []byte
 156  }
 157  
 158  // Resize reuses the Slice's buffer (or makes a new one) and returns a slice in that buffer of
 159  // length sz.
 160  func (s *Slice) Resize(sz int) []byte {
 161  	if cap(s.buf) < sz {
 162  		s.buf = make([]byte, sz)
 163  	}
 164  	return s.buf[0:sz]
 165  }
 166  
 167  // FixedDuration returns a string representation of the given duration with the
 168  // hours, minutes, and seconds.
 169  func FixedDuration(d time.Duration) string {
 170  	str := fmt.Sprintf("%02ds", int(d.Seconds())%60)
 171  	if d >= time.Minute {
 172  		str = fmt.Sprintf("%02dm", int(d.Minutes())%60) + str
 173  	}
 174  	if d >= time.Hour {
 175  		str = fmt.Sprintf("%02dh", int(d.Hours())) + str
 176  	}
 177  	return str
 178  }
 179  
 180  // Throttle allows a limited number of workers to run at a time. It also
 181  // provides a mechanism to check for errors encountered by workers and wait for
 182  // them to finish.
 183  type Throttle struct {
 184  	once      sync.Once
 185  	wg        sync.WaitGroup
 186  	ch        chan struct{}
 187  	errCh     chan error
 188  	finishErr error
 189  }
 190  
 191  // NewThrottle creates a new throttle with a max number of workers.
 192  func NewThrottle(max int) *Throttle {
 193  	return &Throttle{
 194  		ch:    make(chan struct{}, max),
 195  		errCh: make(chan error, max),
 196  	}
 197  }
 198  
 199  // Do should be called by workers before they start working. It blocks if there
 200  // are already maximum number of workers working. If it detects an error from
 201  // previously Done workers, it would return it.
 202  func (t *Throttle) Do() error {
 203  	for {
 204  		select {
 205  		case t.ch <- struct{}{}:
 206  			t.wg.Add(1)
 207  			return nil
 208  		case err := <-t.errCh:
 209  			if err != nil {
 210  				return err
 211  			}
 212  		}
 213  	}
 214  }
 215  
 216  // Done should be called by workers when they finish working. They can also
 217  // pass the error status of work done.
 218  func (t *Throttle) Done(err error) {
 219  	if err != nil {
 220  		t.errCh <- err
 221  	}
 222  	select {
 223  	case <-t.ch:
 224  	default:
 225  		panic("Throttle Do Done mismatch")
 226  	}
 227  	t.wg.Done()
 228  }
 229  
 230  // Finish waits until all workers have finished working. It would return any error passed by Done.
 231  // If Finish is called multiple time, it will wait for workers to finish only once(first time).
 232  // From next calls, it will return same error as found on first call.
 233  func (t *Throttle) Finish() error {
 234  	t.once.Do(func() {
 235  		t.wg.Wait()
 236  		close(t.ch)
 237  		close(t.errCh)
 238  		for err := range t.errCh {
 239  			if err != nil {
 240  				t.finishErr = err
 241  				return
 242  			}
 243  		}
 244  	})
 245  
 246  	return t.finishErr
 247  }
 248  
 249  // U16ToBytes converts the given Uint16 to bytes
 250  func U16ToBytes(v uint16) []byte {
 251  	var uBuf [2]byte
 252  	binary.BigEndian.PutUint16(uBuf[:], v)
 253  	return uBuf[:]
 254  }
 255  
// BytesToU16 decodes b as a big-endian uint16 via binary.BigEndian.Uint16.
// b needs at least 2 bytes; encoding/binary panics on shorter input.
func BytesToU16(b []byte) uint16 {
	return binary.BigEndian.Uint16(b)
}
 260  
 261  // U32ToBytes converts the given Uint32 to bytes
 262  func U32ToBytes(v uint32) []byte {
 263  	var uBuf [4]byte
 264  	binary.BigEndian.PutUint32(uBuf[:], v)
 265  	return uBuf[:]
 266  }
 267  
// BytesToU32 decodes b as a big-endian uint32 via binary.BigEndian.Uint32.
// b needs at least 4 bytes; encoding/binary panics on shorter input.
func BytesToU32(b []byte) uint32 {
	return binary.BigEndian.Uint32(b)
}
 272  
 273  // U32SliceToBytes converts the given Uint32 slice to byte slice
 274  func U32SliceToBytes(u32s []uint32) []byte {
 275  	if len(u32s) == 0 {
 276  		return nil
 277  	}
 278  	var b []byte
 279  	hdr := (*reflect.SliceHeader)(unsafe.Pointer(&b))
 280  	hdr.Len = len(u32s) * 4
 281  	hdr.Cap = hdr.Len
 282  	hdr.Data = uintptr(unsafe.Pointer(&u32s[0]))
 283  	return b
 284  }
 285  
 286  // BytesToU32Slice converts the given byte slice to uint32 slice
 287  func BytesToU32Slice(b []byte) []uint32 {
 288  	if len(b) == 0 {
 289  		return nil
 290  	}
 291  	var u32s []uint32
 292  	hdr := (*reflect.SliceHeader)(unsafe.Pointer(&u32s))
 293  	hdr.Len = len(b) / 4
 294  	hdr.Cap = hdr.Len
 295  	hdr.Data = uintptr(unsafe.Pointer(&b[0]))
 296  	return u32s
 297  }
 298  
 299  // U64ToBytes converts the given Uint64 to bytes
 300  func U64ToBytes(v uint64) []byte {
 301  	var uBuf [8]byte
 302  	binary.BigEndian.PutUint64(uBuf[:], v)
 303  	return uBuf[:]
 304  }
 305  
// BytesToU64 decodes b as a big-endian uint64 via binary.BigEndian.Uint64.
// b needs at least 8 bytes; encoding/binary panics on shorter input.
func BytesToU64(b []byte) uint64 {
	return binary.BigEndian.Uint64(b)
}
 310  
 311  // U64SliceToBytes converts the given Uint64 slice to byte slice
 312  func U64SliceToBytes(u64s []uint64) []byte {
 313  	if len(u64s) == 0 {
 314  		return nil
 315  	}
 316  	var b []byte
 317  	hdr := (*reflect.SliceHeader)(unsafe.Pointer(&b))
 318  	hdr.Len = len(u64s) * 8
 319  	hdr.Cap = hdr.Len
 320  	hdr.Data = uintptr(unsafe.Pointer(&u64s[0]))
 321  	return b
 322  }
 323  
 324  // BytesToU64Slice converts the given byte slice to uint64 slice
 325  func BytesToU64Slice(b []byte) []uint64 {
 326  	if len(b) == 0 {
 327  		return nil
 328  	}
 329  	var u64s []uint64
 330  	hdr := (*reflect.SliceHeader)(unsafe.Pointer(&u64s))
 331  	hdr.Len = len(b) / 8
 332  	hdr.Cap = hdr.Len
 333  	hdr.Data = uintptr(unsafe.Pointer(&b[0]))
 334  	return u64s
 335  }
 336  
 337  // page struct contains one underlying buffer.
 338  type page struct {
 339  	buf []byte
 340  }
 341  
 342  // PageBuffer consists of many pages. A page is a wrapper over []byte. PageBuffer can act as a
 343  // replacement of bytes.Buffer. Instead of having single underlying buffer, it has multiple
 344  // underlying buffers. Hence it avoids any copy during relocation(as happens in bytes.Buffer).
 345  // PageBuffer allocates memory in pages. Once a page is full, it will allocate page with double the
 346  // size of previous page. Its function are not thread safe.
 347  type PageBuffer struct {
 348  	pages []*page
 349  
 350  	length       int // Length of PageBuffer.
 351  	nextPageSize int // Size of next page to be allocated.
 352  }
 353  
 354  // NewPageBuffer returns a new PageBuffer with first page having size pageSize.
 355  func NewPageBuffer(pageSize int) *PageBuffer {
 356  	b := &PageBuffer{}
 357  	b.pages = append(b.pages, &page{buf: make([]byte, 0, pageSize)})
 358  	b.nextPageSize = pageSize * 2
 359  	return b
 360  }
 361  
 362  // Write writes data to PageBuffer b. It returns number of bytes written and any error encountered.
 363  func (b *PageBuffer) Write(data []byte) (int, error) {
 364  	dataLen := len(data)
 365  	for {
 366  		cp := b.pages[len(b.pages)-1] // Current page.
 367  
 368  		n := copy(cp.buf[len(cp.buf):cap(cp.buf)], data)
 369  		cp.buf = cp.buf[:len(cp.buf)+n]
 370  		b.length += n
 371  
 372  		if len(data) == n {
 373  			break
 374  		}
 375  		data = data[n:]
 376  
 377  		b.pages = append(b.pages, &page{buf: make([]byte, 0, b.nextPageSize)})
 378  		b.nextPageSize *= 2
 379  	}
 380  
 381  	return dataLen, nil
 382  }
 383  
 384  // WriteByte writes data byte to PageBuffer and returns any encountered error.
 385  func (b *PageBuffer) WriteByte(data byte) error {
 386  	_, err := b.Write([]byte{data})
 387  	return err
 388  }
 389  
// Len returns the length of the PageBuffer, i.e. the value of its length
// field, which Write grows by the bytes it copies and Truncate sets directly.
func (b *PageBuffer) Len() int {
	return b.length
}
 394  
 395  // pageForOffset returns pageIdx and startIdx for the offset.
 396  func (b *PageBuffer) pageForOffset(offset int) (int, int) {
 397  	AssertTrue(offset < b.length)
 398  
 399  	var pageIdx, startIdx, sizeNow int
 400  	for i := 0; i < len(b.pages); i++ {
 401  		cp := b.pages[i]
 402  
 403  		if sizeNow+len(cp.buf)-1 < offset {
 404  			sizeNow += len(cp.buf)
 405  		} else {
 406  			pageIdx = i
 407  			startIdx = offset - sizeNow
 408  			break
 409  		}
 410  	}
 411  
 412  	return pageIdx, startIdx
 413  }
 414  
 415  // Truncate truncates PageBuffer to length n.
 416  func (b *PageBuffer) Truncate(n int) {
 417  	pageIdx, startIdx := b.pageForOffset(n)
 418  	// For simplicity of the code reject extra pages. These pages can be kept.
 419  	b.pages = b.pages[:pageIdx+1]
 420  	cp := b.pages[len(b.pages)-1]
 421  	cp.buf = cp.buf[:startIdx]
 422  	b.length = n
 423  }
 424  
 425  // Bytes returns whole Buffer data as single []byte.
 426  func (b *PageBuffer) Bytes() []byte {
 427  	buf := make([]byte, b.length)
 428  	written := 0
 429  	for i := 0; i < len(b.pages); i++ {
 430  		written += copy(buf[written:], b.pages[i].buf)
 431  	}
 432  
 433  	return buf
 434  }
 435  
 436  // WriteTo writes whole buffer to w. It returns number of bytes written and any error encountered.
 437  func (b *PageBuffer) WriteTo(w io.Writer) (int64, error) {
 438  	written := int64(0)
 439  	for i := 0; i < len(b.pages); i++ {
 440  		n, err := w.Write(b.pages[i].buf)
 441  		written += int64(n)
 442  		if err != nil {
 443  			return written, err
 444  		}
 445  	}
 446  
 447  	return written, nil
 448  }
 449  
 450  // NewReaderAt returns a reader which starts reading from offset in page buffer.
 451  func (b *PageBuffer) NewReaderAt(offset int) *PageBufferReader {
 452  	pageIdx, startIdx := b.pageForOffset(offset)
 453  
 454  	return &PageBufferReader{
 455  		buf:      b,
 456  		pageIdx:  pageIdx,
 457  		startIdx: startIdx,
 458  	}
 459  }
 460  
// PageBufferReader is a reader for PageBuffer. It keeps a cursor made of a
// page index and a position inside that page; Read advances the cursor.
type PageBufferReader struct {
	buf      *PageBuffer // Underlying page buffer.
	pageIdx  int         // Idx of page from where it will start reading.
	startIdx int         // Idx inside page - buf.pages[pageIdx] from where it will start reading.
}
 467  
// Read reads up to len(p) bytes into p, starting at the reader's current
// position and advancing it. It returns the number of bytes read. The error is
// io.EOF only when nothing could be read and len(p) > 0; otherwise it is nil.
func (r *PageBufferReader) Read(p []byte) (int, error) {
	// Snapshot the page count once; the loop below works against this value.
	pc := len(r.buf.pages)

	read := 0
	for r.pageIdx < pc && read < len(p) {
		cp := r.buf.pages[r.pageIdx] // Current Page.
		endIdx := len(cp.buf)        // Last Idx up to which we can read from this page.

		// Copy as much of the remaining page content as still fits into p.
		n := copy(p[read:], cp.buf[r.startIdx:endIdx])
		read += n
		r.startIdx += n

		// Instead of len(cp.buf), we compare with cap(cp.buf). This ensures that we move to the
		// next page only when we have read all data. Reading from the last page is an edge case:
		// we don't want to move to the next page until the last page is full to its capacity.
		if r.startIdx >= cap(cp.buf) {
			// We should move to next page.
			r.pageIdx++
			r.startIdx = 0
			continue
		}

		// When the last page is not full to its capacity and we have read all data up to its
		// length, just break out of the loop.
		if r.pageIdx == pc-1 {
			break
		}
	}

	// Report EOF only when the caller asked for data and none was available.
	if read == 0 && len(p) > 0 {
		return read, io.EOF
	}

	return read, nil
}
 505  
 506  const kvsz = int(unsafe.Sizeof(pb.KV{}))
 507  
 508  func NewKV(alloc *z.Allocator) *pb.KV {
 509  	if alloc == nil {
 510  		return &pb.KV{}
 511  	}
 512  	b := alloc.AllocateAligned(kvsz)
 513  	return (*pb.KV)(unsafe.Pointer(&b[0]))
 514  }
 515  
 516  // IBytesToString converts size in bytes to human readable format.
 517  // The code is taken from humanize library and changed to provide
 518  // value upto custom decimal precision.
 519  // IBytesToString(12312412, 1) -> 11.7 MiB
 520  func IBytesToString(size uint64, precision int) string {
 521  	sizes := []string{"B", "KiB", "MiB", "GiB", "TiB", "PiB", "EiB"}
 522  	base := float64(1024)
 523  	if size < 10 {
 524  		return fmt.Sprintf("%d B", size)
 525  	}
 526  	e := math.Floor(math.Log(float64(size)) / math.Log(base))
 527  	suffix := sizes[int(e)]
 528  	val := float64(size) / math.Pow(base, e)
 529  	f := "%." + strconv.Itoa(precision) + "f %s"
 530  
 531  	return fmt.Sprintf(f, val, suffix)
 532  }
 533  
 534  type RateMonitor struct {
 535  	start       time.Time
 536  	lastSent    uint64
 537  	lastCapture time.Time
 538  	rates       []float64
 539  	idx         int
 540  }
 541  
 542  func NewRateMonitor(numSamples int) *RateMonitor {
 543  	return &RateMonitor{
 544  		start: time.Now(),
 545  		rates: make([]float64, numSamples),
 546  	}
 547  }
 548  
 549  const minRate = 0.0001
 550  
 551  // Capture captures the current number of sent bytes. This number should be monotonically
 552  // increasing.
 553  func (rm *RateMonitor) Capture(sent uint64) {
 554  	diff := sent - rm.lastSent
 555  	dur := time.Since(rm.lastCapture)
 556  	rm.lastCapture, rm.lastSent = time.Now(), sent
 557  
 558  	rate := float64(diff) / dur.Seconds()
 559  	if rate < minRate {
 560  		rate = minRate
 561  	}
 562  	rm.rates[rm.idx] = rate
 563  	rm.idx = (rm.idx + 1) % len(rm.rates)
 564  }
 565  
 566  // Rate returns the average rate of transmission smoothed out by the number of samples.
 567  func (rm *RateMonitor) Rate() uint64 {
 568  	var total float64
 569  	var den float64
 570  	for _, r := range rm.rates {
 571  		if r < minRate {
 572  			// Ignore this. We always set minRate, so this is a zero.
 573  			// Typically at the start of the rate monitor, we'd have zeros.
 574  			continue
 575  		}
 576  		total += r
 577  		den += 1.0
 578  	}
 579  	if den < minRate {
 580  		return 0
 581  	}
 582  	return uint64(total / den)
 583  }
 584