value.go raw

   1  /*
   2   * SPDX-FileCopyrightText: © Hypermode Inc. <hello@hypermode.com>
   3   * SPDX-License-Identifier: Apache-2.0
   4   */
   5  
   6  package badger
   7  
   8  import (
   9  	"bytes"
  10  	"context"
  11  	"errors"
  12  	"fmt"
  13  	"hash"
  14  	"hash/crc32"
  15  	"io"
  16  	"math"
  17  	"os"
  18  	"sort"
  19  	"strconv"
  20  	"strings"
  21  	"sync"
  22  	"sync/atomic"
  23  
  24  	"go.opentelemetry.io/otel"
  25  	"go.opentelemetry.io/otel/attribute"
  26  
  27  	"github.com/dgraph-io/badger/v4/y"
  28  	"github.com/dgraph-io/ristretto/v2/z"
  29  )
  30  
  31  // maxVlogFileSize is the maximum size of the vlog file which can be created. Vlog Offset is of
  32  // uint32, so limiting at max uint32.
  33  var maxVlogFileSize uint32 = math.MaxUint32
  34  
  35  // Values have their first byte being byteData or byteDelete. This helps us distinguish between
  36  // a key that has never been seen and a key that has been explicitly deleted.
  37  const (
  38  	bitDelete                 byte = 1 << 0 // Set if the key has been deleted.
  39  	bitValuePointer           byte = 1 << 1 // Set if the value is NOT stored directly next to key.
  40  	bitDiscardEarlierVersions byte = 1 << 2 // Set if earlier versions can be discarded.
  41  	// Set if item shouldn't be discarded via compactions (used by merge operator)
  42  	bitMergeEntry byte = 1 << 3
  43  	// The MSB 2 bits are for transactions.
  44  	bitTxn    byte = 1 << 6 // Set if the entry is part of a txn.
  45  	bitFinTxn byte = 1 << 7 // Set if the entry is to indicate end of txn in value log.
  46  
  47  	mi int64 = 1 << 20 //nolint:unused
  48  
  49  	// size of vlog header.
  50  	// +----------------+------------------+
  51  	// | keyID(8 bytes) |  baseIV(12 bytes)|
  52  	// +----------------+------------------+
  53  	vlogHeaderSize = 20
  54  )
  55  
  56  var errStop = errors.New("Stop iteration")
  57  var errTruncate = errors.New("Do truncate")
  58  
  59  type logEntry func(e Entry, vp valuePointer) error
  60  
  61  type safeRead struct {
  62  	k []byte
  63  	v []byte
  64  
  65  	recordOffset uint32
  66  	lf           *logFile
  67  }
  68  
  69  // hashReader implements io.Reader, io.ByteReader interfaces. It also keeps track of the number
  70  // bytes read. The hashReader writes to h (hash) what it reads from r.
  71  type hashReader struct {
  72  	r         io.Reader
  73  	h         hash.Hash32
  74  	bytesRead int // Number of bytes read.
  75  }
  76  
  77  func newHashReader(r io.Reader) *hashReader {
  78  	hash := crc32.New(y.CastagnoliCrcTable)
  79  	return &hashReader{
  80  		r: r,
  81  		h: hash,
  82  	}
  83  }
  84  
  85  // Read reads len(p) bytes from the reader. Returns the number of bytes read, error on failure.
  86  func (t *hashReader) Read(p []byte) (int, error) {
  87  	n, err := t.r.Read(p)
  88  	if err != nil {
  89  		return n, err
  90  	}
  91  	t.bytesRead += n
  92  	return t.h.Write(p[:n])
  93  }
  94  
  95  // ReadByte reads exactly one byte from the reader. Returns error on failure.
  96  func (t *hashReader) ReadByte() (byte, error) {
  97  	b := make([]byte, 1)
  98  	_, err := t.Read(b)
  99  	return b[0], err
 100  }
 101  
 102  // Sum32 returns the sum32 of the underlying hash.
 103  func (t *hashReader) Sum32() uint32 {
 104  	return t.h.Sum32()
 105  }
 106  
 107  // Entry reads an entry from the provided reader. It also validates the checksum for every entry
 108  // read. Returns error on failure.
 109  func (r *safeRead) Entry(reader io.Reader) (*Entry, error) {
 110  	tee := newHashReader(reader)
 111  	var h header
 112  	hlen, err := h.DecodeFrom(tee)
 113  	if err != nil {
 114  		return nil, err
 115  	}
 116  	if h.klen > uint32(1<<16) { // Key length must be below uint16.
 117  		return nil, errTruncate
 118  	}
 119  	kl := int(h.klen)
 120  	if cap(r.k) < kl {
 121  		r.k = make([]byte, 2*kl)
 122  	}
 123  	vl := int(h.vlen)
 124  	if cap(r.v) < vl {
 125  		r.v = make([]byte, 2*vl)
 126  	}
 127  
 128  	e := &Entry{}
 129  	e.offset = r.recordOffset
 130  	e.hlen = hlen
 131  	buf := make([]byte, h.klen+h.vlen)
 132  	if _, err := io.ReadFull(tee, buf[:]); err != nil {
 133  		if err == io.EOF {
 134  			err = errTruncate
 135  		}
 136  		return nil, err
 137  	}
 138  	if r.lf.encryptionEnabled() {
 139  		if buf, err = r.lf.decryptKV(buf[:], r.recordOffset); err != nil {
 140  			return nil, err
 141  		}
 142  	}
 143  	e.Key = buf[:h.klen]
 144  	e.Value = buf[h.klen:]
 145  	var crcBuf [crc32.Size]byte
 146  	if _, err := io.ReadFull(reader, crcBuf[:]); err != nil {
 147  		if err == io.EOF {
 148  			err = errTruncate
 149  		}
 150  		return nil, err
 151  	}
 152  	crc := y.BytesToU32(crcBuf[:])
 153  	if crc != tee.Sum32() {
 154  		return nil, errTruncate
 155  	}
 156  	e.meta = h.meta
 157  	e.UserMeta = h.userMeta
 158  	e.ExpiresAt = h.expiresAt
 159  	return e, nil
 160  }
 161  
// rewrite moves the still-live entries of vlog file f back into the DB (via
// db.batchSet) so that f can be removed.
//
// For every entry in f it looks up the key's current value. It skips the entry
// if discardEntry says so, or if the current value pointer points to a newer
// file or a higher offset. It re-inserts the entry only when the current pointer
// is exactly (f.fid, e.offset).
//
// Once every batch is written, f is deleted right away if no iterators are
// active. Otherwise it is queued in filesToBeDeleted, and decrIteratorCount
// deletes it later.
//
// It returns an error in these cases:
//   - f is already queued for deletion;
//   - f is missing from filesMap at the end;
//   - a lookup or write fails.
//
// It returns ErrNoRewrite if the batch size shrinks to zero while retrying
// ErrTxnTooBig. It asserts (via y.AssertTruef) that f is not the newest file.
func (vlog *valueLog) rewrite(f *logFile) error {
	vlog.filesLock.RLock()
	for _, fid := range vlog.filesToBeDeleted {
		if fid == f.fid {
			vlog.filesLock.RUnlock()
			return fmt.Errorf("value log file already marked for deletion fid: %d", fid)
		}
	}
	maxFid := vlog.maxFid
	// The newest file is the one being written to; it must never be rewritten.
	y.AssertTruef(f.fid < maxFid, "fid to move: %d. Current max fid: %d", f.fid, maxFid)
	vlog.filesLock.RUnlock()

	vlog.opt.Infof("Rewriting fid: %d", f.fid)
	// wb accumulates entries to re-insert; size is its estimated byte size.
	wb := make([]*Entry, 0, 1000)
	var size int64

	y.AssertTrue(vlog.db != nil)
	// count: entries seen in f; moved: entries re-inserted into the DB.
	var count, moved int
	// fe handles one entry read from f. It either ignores the entry or appends a
	// copy of it to wb, flushing wb first if adding it would exceed the batch limits.
	fe := func(e Entry) error {
		count++
		if count%100000 == 0 {
			vlog.opt.Debugf("Processing entry %d", count)
		}

		vs, err := vlog.db.get(e.Key)
		if err != nil {
			return err
		}
		if discardEntry(e, vs, vlog.db) {
			return nil
		}

		// Value is still present in value log.
		if len(vs.Value) == 0 {
			return fmt.Errorf("Empty value: %+v", vs)
		}
		var vp valuePointer
		vp.Decode(vs.Value)

		// If the entry found from the LSM Tree points to a newer vlog file, don't do anything.
		if vp.Fid > f.fid {
			return nil
		}
		// If the entry found from the LSM Tree points to an offset greater than the one
		// read from vlog, don't do anything.
		if vp.Offset > e.offset {
			return nil
		}
		// If the entry read from LSM Tree and vlog file point to the same vlog file and offset,
		// insert them back into the DB.
		// NOTE: It might be possible that the entry read from the LSM Tree points to
		// an older vlog file. See the comments in the else part.
		if vp.Fid == f.fid && vp.Offset == e.offset {
			moved++
			// The new entry carries copies of the key and of the full value read
			// from the vlog (not a pointer to it). The copies are needed because e's
			// slices belong to the reader.
			ne := new(Entry)
			// Remove only the bitValuePointer and transaction markers. We
			// should keep the other bits.
			ne.meta = e.meta &^ (bitValuePointer | bitTxn | bitFinTxn)
			ne.UserMeta = e.UserMeta
			ne.ExpiresAt = e.ExpiresAt
			ne.Key = append([]byte{}, e.Key...)
			ne.Value = append([]byte{}, e.Value...)
			es := ne.estimateSizeAndSetThreshold(vlog.db.valueThreshold())
			// Consider size of value as well while considering the total size
			// of the batch. There have been reports of high memory usage in
			// rewrite because we don't consider the value size. See #1292.
			es += int64(len(e.Value))

			// Ensure length and size of wb is within transaction limits.
			// Flush before appending so that no single batchSet call exceeds them.
			// wb is reused after the flush (batchSet is waited on inline).
			if int64(len(wb)+1) >= vlog.opt.maxBatchCount ||
				size+es >= vlog.opt.maxBatchSize {
				if err := vlog.db.batchSet(wb); err != nil {
					return err
				}
				size = 0
				wb = wb[:0]
			}
			wb = append(wb, ne)
			size += es
		} else { //nolint:staticcheck
			// It might be possible that the entry read from LSM Tree points to
			// an older vlog file. This can happen in the following situation.
			// Assume DB is opened with
			// numberOfVersionsToKeep=1
			//
			// Now, if we have ONLY one key in the system "FOO" which has been
			// updated 3 times and the same key has been garbage collected 3
			// times, we'll have 3 versions of the movekey
			// for the same key "FOO".
			//
			// NOTE: moveKeyi is the gc'ed version of the original key with version i
			// We're calling the gc'ed keys as moveKey to simplify the
			// explanation. We used to add move keys but we no longer do that.
			//
			// Assume we have 3 move keys in L0.
			// - moveKey1 (points to vlog file 10),
			// - moveKey2 (points to vlog file 14) and
			// - moveKey3 (points to vlog file 15).
			//
			// Also, assume there is another move key "moveKey1" (points to
			// vlog file 6) (this is also a move Key for key "FOO" ) on upper
			// levels (let's say 3). The move key "moveKey1" on level 0 was
			// inserted because vlog file 6 was GCed.
			//
			// Here's what the arrangement looks like
			// L0 => (moveKey1 => vlog10), (moveKey2 => vlog14), (moveKey3 => vlog15)
			// L1 => ....
			// L2 => ....
			// L3 => (moveKey1 => vlog6)
			//
			// When L0 compaction runs, it keeps only moveKey3 because the number of versions
			// to keep is set to 1. (we've dropped moveKey1's latest version)
			//
			// The new arrangement of keys is
			// L0 => ....
			// L1 => (moveKey3 => vlog15)
			// L2 => ....
			// L3 => (moveKey1 => vlog6)
			//
			// Now if we try to GC vlog file 10, the entry read from vlog file
			// will point to vlog10 but the entry read from LSM Tree will point
			// to vlog6. The move key read from LSM tree will point to vlog6
			// because we've asked for version 1 of the move key.
			//
			// This might seem like an issue but it's not really an issue
			// because the user has set the number of versions to keep to 1 and
			// the latest version of moveKey points to the correct vlog file
			// and offset. The stale move key on L3 will be eventually dropped
			// by compaction because there is a newer version in the upper
			// levels.
		}
		return nil
	}

	_, err := f.iterate(vlog.opt.ReadOnly, 0, func(e Entry, vp valuePointer) error {
		return fe(e)
	})
	if err != nil {
		return err
	}

	// Write out whatever is left in wb. On ErrTxnTooBig, retry the same range
	// with half the batch size. Give up with ErrNoRewrite once it reaches zero.
	batchSize := 1024
	var loops int
	for i := 0; i < len(wb); {
		loops++
		if batchSize == 0 {
			vlog.db.opt.Warningf("We shouldn't reach batch size of zero.")
			return ErrNoRewrite
		}
		end := i + batchSize
		if end > len(wb) {
			end = len(wb)
		}
		if err := vlog.db.batchSet(wb[i:end]); err != nil {
			if err == ErrTxnTooBig {
				// Decrease the batch size to half.
				batchSize = batchSize / 2
				continue
			}
			return err
		}
		// May overshoot len(wb) on the last batch; the loop condition handles it.
		i += batchSize
	}
	vlog.opt.Infof("Processed %d entries in %d loops", len(wb), loops)
	vlog.opt.Infof("Total entries: %d. Moved: %d", count, moved)
	vlog.opt.Infof("Removing fid: %d", f.fid)
	var deleteFileNow bool
	// Entries written to LSM. Remove the older file now.
	// Deletion is postponed while iterators are active, since they may still read from f.
	{
		vlog.filesLock.Lock()
		// Just a sanity-check.
		if _, ok := vlog.filesMap[f.fid]; !ok {
			vlog.filesLock.Unlock()
			return fmt.Errorf("Unable to find fid: %d", f.fid)
		}
		if vlog.iteratorCount() == 0 {
			delete(vlog.filesMap, f.fid)
			deleteFileNow = true
		} else {
			vlog.filesToBeDeleted = append(vlog.filesToBeDeleted, f.fid)
		}
		vlog.filesLock.Unlock()
	}

	// The file itself is deleted outside filesLock; deleteLogFile takes f's own lock.
	if deleteFileNow {
		if err := vlog.deleteLogFile(f); err != nil {
			return err
		}
	}
	return nil
}
 354  
 355  func (vlog *valueLog) incrIteratorCount() {
 356  	vlog.numActiveIterators.Add(1)
 357  }
 358  
 359  func (vlog *valueLog) iteratorCount() int {
 360  	return int(vlog.numActiveIterators.Load())
 361  }
 362  
 363  func (vlog *valueLog) decrIteratorCount() error {
 364  	num := vlog.numActiveIterators.Add(-1)
 365  	if num != 0 {
 366  		return nil
 367  	}
 368  
 369  	vlog.filesLock.Lock()
 370  	lfs := make([]*logFile, 0, len(vlog.filesToBeDeleted))
 371  	for _, id := range vlog.filesToBeDeleted {
 372  		lfs = append(lfs, vlog.filesMap[id])
 373  		delete(vlog.filesMap, id)
 374  	}
 375  	vlog.filesToBeDeleted = nil
 376  	vlog.filesLock.Unlock()
 377  
 378  	for _, lf := range lfs {
 379  		if err := vlog.deleteLogFile(lf); err != nil {
 380  			return err
 381  		}
 382  	}
 383  	return nil
 384  }
 385  
 386  func (vlog *valueLog) deleteLogFile(lf *logFile) error {
 387  	if lf == nil {
 388  		return nil
 389  	}
 390  	lf.lock.Lock()
 391  	defer lf.lock.Unlock()
 392  	// Delete fid from discard stats as well.
 393  	vlog.discardStats.Update(lf.fid, -1)
 394  
 395  	return lf.Delete()
 396  }
 397  
 398  func (vlog *valueLog) dropAll() (int, error) {
 399  	// If db is opened in InMemory mode, we don't need to do anything since there are no vlog files.
 400  	if vlog.db.opt.InMemory {
 401  		return 0, nil
 402  	}
 403  	// We don't want to block dropAll on any pending transactions. So, don't worry about iterator
 404  	// count.
 405  	var count int
 406  	deleteAll := func() error {
 407  		vlog.filesLock.Lock()
 408  		defer vlog.filesLock.Unlock()
 409  		for _, lf := range vlog.filesMap {
 410  			if err := vlog.deleteLogFile(lf); err != nil {
 411  				return err
 412  			}
 413  			count++
 414  		}
 415  		vlog.filesMap = make(map[uint32]*logFile)
 416  		vlog.maxFid = 0
 417  		return nil
 418  	}
 419  	if err := deleteAll(); err != nil {
 420  		return count, err
 421  	}
 422  
 423  	vlog.db.opt.Infof("Value logs deleted. Creating value log file: 1")
 424  	if _, err := vlog.createVlogFile(); err != nil { // Called while writes are stopped.
 425  		return count, err
 426  	}
 427  	return count, nil
 428  }
 429  
 430  func (db *DB) valueThreshold() int64 {
 431  	return db.threshold.valueThreshold.Load()
 432  }
 433  
 434  type valueLog struct {
 435  	dirPath string
 436  
 437  	// guards our view of which files exist, which to be deleted, how many active iterators
 438  	filesLock        sync.RWMutex
 439  	filesMap         map[uint32]*logFile
 440  	maxFid           uint32
 441  	filesToBeDeleted []uint32
 442  	// A refcount of iterators -- when this hits zero, we can delete the filesToBeDeleted.
 443  	numActiveIterators atomic.Int32
 444  
 445  	db                *DB
 446  	writableLogOffset atomic.Uint32 // read by read, written by write
 447  	numEntriesWritten uint32
 448  	opt               Options
 449  
 450  	garbageCh    chan struct{}
 451  	discardStats *discardStats
 452  }
 453  
 454  func vlogFilePath(dirPath string, fid uint32) string {
 455  	return fmt.Sprintf("%s%s%06d.vlog", dirPath, string(os.PathSeparator), fid)
 456  }
 457  
 458  func (vlog *valueLog) fpath(fid uint32) string {
 459  	return vlogFilePath(vlog.dirPath, fid)
 460  }
 461  
 462  func (vlog *valueLog) populateFilesMap() error {
 463  	vlog.filesMap = make(map[uint32]*logFile)
 464  
 465  	files, err := os.ReadDir(vlog.dirPath)
 466  	if err != nil {
 467  		return errFile(err, vlog.dirPath, "Unable to open log dir.")
 468  	}
 469  
 470  	found := make(map[uint64]struct{})
 471  	for _, file := range files {
 472  		if !strings.HasSuffix(file.Name(), ".vlog") {
 473  			continue
 474  		}
 475  		fsz := len(file.Name())
 476  		fid, err := strconv.ParseUint(file.Name()[:fsz-5], 10, 32)
 477  		if err != nil {
 478  			return errFile(err, file.Name(), "Unable to parse log id.")
 479  		}
 480  		if _, ok := found[fid]; ok {
 481  			return errFile(err, file.Name(), "Duplicate file found. Please delete one.")
 482  		}
 483  		found[fid] = struct{}{}
 484  
 485  		lf := &logFile{
 486  			fid:      uint32(fid),
 487  			path:     vlog.fpath(uint32(fid)),
 488  			registry: vlog.db.registry,
 489  		}
 490  		vlog.filesMap[uint32(fid)] = lf
 491  		if vlog.maxFid < uint32(fid) {
 492  			vlog.maxFid = uint32(fid)
 493  		}
 494  	}
 495  	return nil
 496  }
 497  
 498  func (vlog *valueLog) createVlogFile() (*logFile, error) {
 499  	fid := vlog.maxFid + 1
 500  	path := vlog.fpath(fid)
 501  	lf := &logFile{
 502  		fid:      fid,
 503  		path:     path,
 504  		registry: vlog.db.registry,
 505  		writeAt:  vlogHeaderSize,
 506  		opt:      vlog.opt,
 507  	}
 508  	err := lf.open(path, os.O_RDWR|os.O_CREATE|os.O_EXCL, 2*vlog.opt.ValueLogFileSize)
 509  	if err != z.NewFile && err != nil {
 510  		return nil, err
 511  	}
 512  
 513  	vlog.filesLock.Lock()
 514  	vlog.filesMap[fid] = lf
 515  	y.AssertTrue(vlog.maxFid < fid)
 516  	vlog.maxFid = fid
 517  	// writableLogOffset is only written by write func, by read by Read func.
 518  	// To avoid a race condition, all reads and updates to this variable must be
 519  	// done via atomics.
 520  	vlog.writableLogOffset.Store(vlogHeaderSize)
 521  	vlog.numEntriesWritten = 0
 522  	vlog.filesLock.Unlock()
 523  
 524  	return lf, nil
 525  }
 526  
 527  func errFile(err error, path string, msg string) error {
 528  	return fmt.Errorf("%s. Path=%s. Error=%v", msg, path, err)
 529  }
 530  
 531  // init initializes the value log struct. This initialization needs to happen
 532  // before compactions start.
 533  func (vlog *valueLog) init(db *DB) {
 534  	vlog.opt = db.opt
 535  	vlog.db = db
 536  	// We don't need to open any vlog files or collect stats for GC if DB is opened
 537  	// in InMemory mode. InMemory mode doesn't create any files/directories on disk.
 538  	if vlog.opt.InMemory {
 539  		return
 540  	}
 541  	vlog.dirPath = vlog.opt.ValueDir
 542  
 543  	vlog.garbageCh = make(chan struct{}, 1) // Only allow one GC at a time.
 544  	lf, err := InitDiscardStats(vlog.opt)
 545  	y.Check(err)
 546  	vlog.discardStats = lf
 547  	// See TestPersistLFDiscardStats for purpose of statement below.
 548  	db.logToSyncChan(endVLogInitMsg)
 549  }
 550  
 551  func (vlog *valueLog) open(db *DB) error {
 552  	// We don't need to open any vlog files or collect stats for GC if DB is opened
 553  	// in InMemory mode. InMemory mode doesn't create any files/directories on disk.
 554  	if db.opt.InMemory {
 555  		return nil
 556  	}
 557  
 558  	if err := vlog.populateFilesMap(); err != nil {
 559  		return err
 560  	}
 561  	// If no files are found, then create a new file.
 562  	if len(vlog.filesMap) == 0 {
 563  		if vlog.opt.ReadOnly {
 564  			return nil
 565  		}
 566  		_, err := vlog.createVlogFile()
 567  		return y.Wrapf(err, "Error while creating log file in valueLog.open")
 568  	}
 569  	fids := vlog.sortedFids()
 570  	for _, fid := range fids {
 571  		lf, ok := vlog.filesMap[fid]
 572  		y.AssertTrue(ok)
 573  
 574  		// Just open in RDWR mode. This should not create a new log file.
 575  		lf.opt = vlog.opt
 576  		if err := lf.open(vlog.fpath(fid), os.O_RDWR,
 577  			2*vlog.opt.ValueLogFileSize); err != nil {
 578  			return y.Wrapf(err, "Open existing file: %q", lf.path)
 579  		}
 580  		// We shouldn't delete the maxFid file.
 581  		if lf.size.Load() == vlogHeaderSize && fid != vlog.maxFid {
 582  			vlog.opt.Infof("Deleting empty file: %s", lf.path)
 583  			if err := lf.Delete(); err != nil {
 584  				return y.Wrapf(err, "while trying to delete empty file: %s", lf.path)
 585  			}
 586  			delete(vlog.filesMap, fid)
 587  		}
 588  	}
 589  
 590  	if vlog.opt.ReadOnly {
 591  		return nil
 592  	}
 593  	// Now we can read the latest value log file, and see if it needs truncation. We could
 594  	// technically do this over all the value log files, but that would mean slowing down the value
 595  	// log open.
 596  	last, ok := vlog.filesMap[vlog.maxFid]
 597  	y.AssertTrue(ok)
 598  	lastOff, err := last.iterate(vlog.opt.ReadOnly, vlogHeaderSize,
 599  		func(_ Entry, vp valuePointer) error {
 600  			return nil
 601  		})
 602  	if err != nil {
 603  		return y.Wrapf(err, "while iterating over: %s", last.path)
 604  	}
 605  	if err := last.Truncate(int64(lastOff)); err != nil {
 606  		return y.Wrapf(err, "while truncating last value log file: %s", last.path)
 607  	}
 608  
 609  	// Don't write to the old log file. Always create a new one.
 610  	if _, err := vlog.createVlogFile(); err != nil {
 611  		return y.Wrapf(err, "Error while creating log file in valueLog.open")
 612  	}
 613  	return nil
 614  }
 615  
 616  func (vlog *valueLog) Close() error {
 617  	if vlog == nil || vlog.db == nil || vlog.db.opt.InMemory {
 618  		return nil
 619  	}
 620  
 621  	vlog.opt.Debugf("Stopping garbage collection of values.")
 622  	var err error
 623  	for id, lf := range vlog.filesMap {
 624  		lf.lock.Lock() // We won’t release the lock.
 625  		offset := int64(-1)
 626  
 627  		if !vlog.opt.ReadOnly && id == vlog.maxFid {
 628  			offset = int64(vlog.woffset())
 629  		}
 630  		if terr := lf.Close(offset); terr != nil && err == nil {
 631  			err = terr
 632  		}
 633  	}
 634  	if vlog.discardStats != nil {
 635  		vlog.db.captureDiscardStats()
 636  		if terr := vlog.discardStats.Close(-1); terr != nil && err == nil {
 637  			err = terr
 638  		}
 639  	}
 640  	return err
 641  }
 642  
 643  // sortedFids returns the file id's not pending deletion, sorted.  Assumes we have shared access to
 644  // filesMap.
 645  func (vlog *valueLog) sortedFids() []uint32 {
 646  	toBeDeleted := make(map[uint32]struct{})
 647  	for _, fid := range vlog.filesToBeDeleted {
 648  		toBeDeleted[fid] = struct{}{}
 649  	}
 650  	ret := make([]uint32, 0, len(vlog.filesMap))
 651  	for fid := range vlog.filesMap {
 652  		if _, ok := toBeDeleted[fid]; !ok {
 653  			ret = append(ret, fid)
 654  		}
 655  	}
 656  	sort.Slice(ret, func(i, j int) bool {
 657  		return ret[i] < ret[j]
 658  	})
 659  	return ret
 660  }
 661  
 662  type request struct {
 663  	// Input values
 664  	Entries []*Entry
 665  	// Output values and wait group stuff below
 666  	Ptrs []valuePointer
 667  	Wg   sync.WaitGroup
 668  	Err  error
 669  	ref  atomic.Int32
 670  }
 671  
 672  func (req *request) reset() {
 673  	req.Entries = req.Entries[:0]
 674  	req.Ptrs = req.Ptrs[:0]
 675  	req.Wg = sync.WaitGroup{}
 676  	req.Err = nil
 677  	req.ref.Store(0)
 678  }
 679  
 680  func (req *request) IncrRef() {
 681  	req.ref.Add(1)
 682  }
 683  
 684  func (req *request) DecrRef() {
 685  	nRef := req.ref.Add(-1)
 686  	if nRef > 0 {
 687  		return
 688  	}
 689  	req.Entries = nil
 690  	requestPool.Put(req)
 691  }
 692  
 693  func (req *request) Wait() error {
 694  	req.Wg.Wait()
 695  	err := req.Err
 696  	req.DecrRef() // DecrRef after writing to DB.
 697  	return err
 698  }
 699  
 700  type requests []*request
 701  
 702  func (reqs requests) DecrRef() {
 703  	for _, req := range reqs {
 704  		req.DecrRef()
 705  	}
 706  }
 707  
 708  func (reqs requests) IncrRef() {
 709  	for _, req := range reqs {
 710  		req.IncrRef()
 711  	}
 712  }
 713  
 714  // sync function syncs content of latest value log file to disk. Syncing of value log directory is
 715  // not required here as it happens every time a value log file rotation happens(check createVlogFile
 716  // function). During rotation, previous value log file also gets synced to disk. It only syncs file
 717  // if fid >= vlog.maxFid. In some cases such as replay(while opening db), it might be called with
 718  // fid < vlog.maxFid. To sync irrespective of file id just call it with math.MaxUint32.
 719  func (vlog *valueLog) sync() error {
 720  	if vlog.opt.SyncWrites || vlog.opt.InMemory {
 721  		return nil
 722  	}
 723  
 724  	vlog.filesLock.RLock()
 725  	maxFid := vlog.maxFid
 726  	curlf := vlog.filesMap[maxFid]
 727  	// Sometimes it is possible that vlog.maxFid has been increased but file creation
 728  	// with same id is still in progress and this function is called. In those cases
 729  	// entry for the file might not be present in vlog.filesMap.
 730  	if curlf == nil {
 731  		vlog.filesLock.RUnlock()
 732  		return nil
 733  	}
 734  	curlf.lock.RLock()
 735  	vlog.filesLock.RUnlock()
 736  
 737  	err := curlf.Sync()
 738  	curlf.lock.RUnlock()
 739  	return err
 740  }
 741  
 742  func (vlog *valueLog) woffset() uint32 {
 743  	return vlog.writableLogOffset.Load()
 744  }
 745  
 746  // validateWrites will check whether the given requests can fit into 4GB vlog file.
 747  // NOTE: 4GB is the maximum size we can create for vlog because value pointer offset is of type
 748  // uint32. If we create more than 4GB, it will overflow uint32. So, limiting the size to 4GB.
 749  func (vlog *valueLog) validateWrites(reqs []*request) error {
 750  	vlogOffset := uint64(vlog.woffset())
 751  	for _, req := range reqs {
 752  		// calculate size of the request.
 753  		size := estimateRequestSize(req)
 754  		estimatedVlogOffset := vlogOffset + size
 755  		if estimatedVlogOffset > uint64(maxVlogFileSize) {
 756  			return fmt.Errorf("Request size offset %d is bigger than maximum offset %d",
 757  				estimatedVlogOffset, maxVlogFileSize)
 758  		}
 759  
 760  		if estimatedVlogOffset >= uint64(vlog.opt.ValueLogFileSize) {
 761  			// We'll create a new vlog file if the estimated offset is greater or equal to
 762  			// max vlog size. So, resetting the vlogOffset.
 763  			vlogOffset = 0
 764  			continue
 765  		}
 766  		// Estimated vlog offset will become current vlog offset if the vlog is not rotated.
 767  		vlogOffset = estimatedVlogOffset
 768  	}
 769  	return nil
 770  }
 771  
 772  // estimateRequestSize returns the size that needed to be written for the given request.
 773  func estimateRequestSize(req *request) uint64 {
 774  	size := uint64(0)
 775  	for _, e := range req.Entries {
 776  		size += uint64(maxHeaderSize + len(e.Key) + len(e.Value) + crc32.Size)
 777  	}
 778  	return size
 779  }
 780  
 781  // write is thread-unsafe by design and should not be called concurrently.
 782  func (vlog *valueLog) write(reqs []*request) error {
 783  	if vlog.db.opt.InMemory {
 784  		return nil
 785  	}
 786  	// Validate writes before writing to vlog. Because, we don't want to partially write and return
 787  	// an error.
 788  	if err := vlog.validateWrites(reqs); err != nil {
 789  		return y.Wrapf(err, "while validating writes")
 790  	}
 791  
 792  	vlog.filesLock.RLock()
 793  	maxFid := vlog.maxFid
 794  	curlf := vlog.filesMap[maxFid]
 795  	vlog.filesLock.RUnlock()
 796  
 797  	defer func() {
 798  		if vlog.opt.SyncWrites {
 799  			if err := curlf.Sync(); err != nil {
 800  				vlog.opt.Errorf("Error while curlf sync: %v\n", err)
 801  			}
 802  		}
 803  	}()
 804  
 805  	write := func(buf *bytes.Buffer) error {
 806  		if buf.Len() == 0 {
 807  			return nil
 808  		}
 809  
 810  		n := uint32(buf.Len())
 811  		endOffset := vlog.writableLogOffset.Add(n)
 812  		// Increase the file size if we cannot accommodate this entry.
 813  		// [Aman] Should this be >= or just >? Doesn't make sense to extend the file if it big enough already.
 814  		if int(endOffset) >= len(curlf.Data) {
 815  			if err := curlf.Truncate(int64(endOffset)); err != nil {
 816  				return err
 817  			}
 818  		}
 819  
 820  		start := int(endOffset - n)
 821  		y.AssertTrue(copy(curlf.Data[start:], buf.Bytes()) == int(n))
 822  
 823  		curlf.size.Store(endOffset)
 824  		return nil
 825  	}
 826  
 827  	toDisk := func() error {
 828  		if vlog.woffset() > uint32(vlog.opt.ValueLogFileSize) ||
 829  			vlog.numEntriesWritten > vlog.opt.ValueLogMaxEntries {
 830  			if err := curlf.doneWriting(vlog.woffset()); err != nil {
 831  				return err
 832  			}
 833  
 834  			newlf, err := vlog.createVlogFile()
 835  			if err != nil {
 836  				return err
 837  			}
 838  			curlf = newlf
 839  		}
 840  		return nil
 841  	}
 842  
 843  	buf := new(bytes.Buffer)
 844  	for i := range reqs {
 845  		b := reqs[i]
 846  		b.Ptrs = b.Ptrs[:0]
 847  		var written, bytesWritten int
 848  		valueSizes := make([]int64, 0, len(b.Entries))
 849  		for j := range b.Entries {
 850  			buf.Reset()
 851  
 852  			e := b.Entries[j]
 853  			valueSizes = append(valueSizes, int64(len(e.Value)))
 854  			if e.skipVlogAndSetThreshold(vlog.db.valueThreshold()) {
 855  				b.Ptrs = append(b.Ptrs, valuePointer{})
 856  				continue
 857  			}
 858  			var p valuePointer
 859  
 860  			p.Fid = curlf.fid
 861  			p.Offset = vlog.woffset()
 862  
 863  			// We should not store transaction marks in the vlog file because it will never have all
 864  			// the entries in a transaction. If we store entries with transaction marks then value
 865  			// GC will not be able to iterate on the entire vlog file.
 866  			// But, we still want the entry to stay intact for the memTable WAL. So, store the meta
 867  			// in a temporary variable and reassign it after writing to the value log.
 868  			tmpMeta := e.meta
 869  			e.meta = e.meta &^ (bitTxn | bitFinTxn)
 870  			plen, err := curlf.encodeEntry(buf, e, p.Offset) // Now encode the entry into buffer.
 871  			if err != nil {
 872  				return err
 873  			}
 874  			// Restore the meta.
 875  			e.meta = tmpMeta
 876  
 877  			p.Len = uint32(plen)
 878  			b.Ptrs = append(b.Ptrs, p)
 879  			if err := write(buf); err != nil {
 880  				return err
 881  			}
 882  			written++
 883  			bytesWritten += buf.Len()
 884  			// No need to flush anything, we write to file directly via mmap.
 885  		}
 886  		y.NumWritesVlogAdd(vlog.opt.MetricsEnabled, int64(written))
 887  		y.NumBytesWrittenVlogAdd(vlog.opt.MetricsEnabled, int64(bytesWritten))
 888  
 889  		vlog.numEntriesWritten += uint32(written)
 890  		vlog.db.threshold.update(valueSizes)
 891  		// We write to disk here so that all entries that are part of the same transaction are
 892  		// written to the same vlog file.
 893  		if err := toDisk(); err != nil {
 894  			return err
 895  		}
 896  	}
 897  	return toDisk()
 898  }
 899  
 900  // Gets the logFile and acquires and RLock() for the mmap. You must call RUnlock on the file
 901  // (if non-nil)
 902  func (vlog *valueLog) getFileRLocked(vp valuePointer) (*logFile, error) {
 903  	vlog.filesLock.RLock()
 904  	defer vlog.filesLock.RUnlock()
 905  	ret, ok := vlog.filesMap[vp.Fid]
 906  	if !ok {
 907  		// log file has gone away, we can't do anything. Return.
 908  		return nil, fmt.Errorf("file with ID: %d not found", vp.Fid)
 909  	}
 910  
 911  	// Check for valid offset if we are reading from writable log.
 912  	maxFid := vlog.maxFid
 913  	// In read-only mode we don't need to check for writable offset as we are not writing anything.
 914  	// Moreover, this offset is not set in readonly mode.
 915  	if !vlog.opt.ReadOnly && vp.Fid == maxFid {
 916  		currentOffset := vlog.woffset()
 917  		if vp.Offset >= currentOffset {
 918  			return nil, fmt.Errorf(
 919  				"Invalid value pointer offset: %d greater than current offset: %d",
 920  				vp.Offset, currentOffset)
 921  		}
 922  	}
 923  
 924  	ret.lock.RLock()
 925  	return ret, nil
 926  }
 927  
 928  // Read reads the value log at a given location.
 929  // TODO: Make this read private.
 930  func (vlog *valueLog) Read(vp valuePointer, _ *y.Slice) ([]byte, func(), error) {
 931  	buf, lf, err := vlog.readValueBytes(vp)
 932  	// log file is locked so, decide whether to lock immediately or let the caller to
 933  	// unlock it, after caller uses it.
 934  	cb := vlog.getUnlockCallback(lf)
 935  	if err != nil {
 936  		return nil, cb, err
 937  	}
 938  
 939  	if vlog.opt.VerifyValueChecksum {
 940  		hash := crc32.New(y.CastagnoliCrcTable)
 941  		if _, err := hash.Write(buf[:len(buf)-crc32.Size]); err != nil {
 942  			runCallback(cb)
 943  			return nil, nil, y.Wrapf(err, "failed to write hash for vp %+v", vp)
 944  		}
 945  		// Fetch checksum from the end of the buffer.
 946  		checksum := buf[len(buf)-crc32.Size:]
 947  		if hash.Sum32() != y.BytesToU32(checksum) {
 948  			runCallback(cb)
 949  			return nil, nil, y.Wrapf(y.ErrChecksumMismatch, "value corrupted for vp: %+v", vp)
 950  		}
 951  	}
 952  	var h header
 953  	headerLen := h.Decode(buf)
 954  	kv := buf[headerLen:]
 955  	if lf.encryptionEnabled() {
 956  		kv, err = lf.decryptKV(kv, vp.Offset)
 957  		if err != nil {
 958  			return nil, cb, err
 959  		}
 960  	}
 961  	if uint32(len(kv)) < h.klen+h.vlen {
 962  		vlog.db.opt.Errorf("Invalid read: vp: %+v", vp)
 963  		return nil, nil, fmt.Errorf("Invalid read: Len: %d read at:[%d:%d]",
 964  			len(kv), h.klen, h.klen+h.vlen)
 965  	}
 966  	return kv[h.klen : h.klen+h.vlen], cb, nil
 967  }
 968  
 969  // getUnlockCallback will returns a function which unlock the logfile if the logfile is mmaped.
 970  // otherwise, it unlock the logfile and return nil.
 971  func (vlog *valueLog) getUnlockCallback(lf *logFile) func() {
 972  	if lf == nil {
 973  		return nil
 974  	}
 975  	return lf.lock.RUnlock
 976  }
 977  
 978  // readValueBytes return vlog entry slice and read locked log file. Caller should take care of
 979  // logFile unlocking.
 980  func (vlog *valueLog) readValueBytes(vp valuePointer) ([]byte, *logFile, error) {
 981  	lf, err := vlog.getFileRLocked(vp)
 982  	if err != nil {
 983  		return nil, nil, err
 984  	}
 985  
 986  	buf, err := lf.read(vp)
 987  	y.NumReadsVlogAdd(vlog.db.opt.MetricsEnabled, 1)
 988  	y.NumBytesReadsVlogAdd(vlog.db.opt.MetricsEnabled, int64(len(buf)))
 989  	return buf, lf, err
 990  }
 991  
 992  func (vlog *valueLog) pickLog(discardRatio float64) *logFile {
 993  	vlog.filesLock.RLock()
 994  	defer vlog.filesLock.RUnlock()
 995  
 996  LOOP:
 997  	// Pick a candidate that contains the largest amount of discardable data
 998  	fid, discard := vlog.discardStats.MaxDiscard()
 999  
1000  	// MaxDiscard will return fid=0 if it doesn't have any discard data. The
1001  	// vlog files start from 1.
1002  	if fid == 0 {
1003  		vlog.opt.Debugf("No file with discard stats")
1004  		return nil
1005  	}
1006  	lf, ok := vlog.filesMap[fid]
1007  	// This file was deleted but it's discard stats increased because of compactions. The file
1008  	// doesn't exist so we don't need to do anything. Skip it and retry.
1009  	if !ok {
1010  		vlog.discardStats.Update(fid, -1)
1011  		goto LOOP
1012  	}
1013  	// We have a valid file.
1014  	fi, err := lf.Fd.Stat()
1015  	if err != nil {
1016  		vlog.opt.Errorf("Unable to get stats for value log fid: %d err: %+v", fi, err)
1017  		return nil
1018  	}
1019  	if thr := discardRatio * float64(fi.Size()); float64(discard) < thr {
1020  		vlog.opt.Debugf("Discard: %d less than threshold: %.0f for file: %s",
1021  			discard, thr, fi.Name())
1022  		return nil
1023  	}
1024  	if fid < vlog.maxFid {
1025  		vlog.opt.Infof("Found value log max discard fid: %d discard: %d\n", fid, discard)
1026  		lf, ok := vlog.filesMap[fid]
1027  		y.AssertTrue(ok)
1028  		return lf
1029  	}
1030  
1031  	// Don't randomly pick any value log file.
1032  	return nil
1033  }
1034  
1035  func discardEntry(e Entry, vs y.ValueStruct, db *DB) bool {
1036  	if vs.Version != y.ParseTs(e.Key) {
1037  		// Version not found. Discard.
1038  		return true
1039  	}
1040  	if isDeletedOrExpired(vs.Meta, vs.ExpiresAt) {
1041  		return true
1042  	}
1043  	if (vs.Meta & bitValuePointer) == 0 {
1044  		// Key also stores the value in LSM. Discard.
1045  		return true
1046  	}
1047  	if (vs.Meta & bitFinTxn) > 0 {
1048  		// Just a txn finish entry. Discard.
1049  		return true
1050  	}
1051  	return false
1052  }
1053  
1054  func (vlog *valueLog) doRunGC(lf *logFile) error {
1055  	_, span := otel.Tracer("").Start(context.TODO(), "Badger.GC")
1056  	span.SetAttributes(attribute.String("GC rewrite for", lf.path))
1057  	defer span.End()
1058  	if err := vlog.rewrite(lf); err != nil {
1059  		return err
1060  	}
1061  	// Remove the file from discardStats.
1062  	vlog.discardStats.Update(lf.fid, -1)
1063  	return nil
1064  }
1065  
1066  func (vlog *valueLog) waitOnGC(lc *z.Closer) {
1067  	defer lc.Done()
1068  
1069  	<-lc.HasBeenClosed() // Wait for lc to be closed.
1070  
1071  	// Block any GC in progress to finish, and don't allow any more writes to runGC by filling up
1072  	// the channel of size 1.
1073  	vlog.garbageCh <- struct{}{}
1074  }
1075  
1076  func (vlog *valueLog) runGC(discardRatio float64) error {
1077  	select {
1078  	case vlog.garbageCh <- struct{}{}:
1079  		// Pick a log file for GC.
1080  		defer func() {
1081  			<-vlog.garbageCh
1082  		}()
1083  
1084  		lf := vlog.pickLog(discardRatio)
1085  		if lf == nil {
1086  			return ErrNoRewrite
1087  		}
1088  		return vlog.doRunGC(lf)
1089  	default:
1090  		return ErrRejected
1091  	}
1092  }
1093  
1094  func (vlog *valueLog) updateDiscardStats(stats map[uint32]int64) {
1095  	if vlog.opt.InMemory {
1096  		return
1097  	}
1098  	for fid, discard := range stats {
1099  		vlog.discardStats.Update(fid, discard)
1100  	}
1101  	// The following is to coordinate with some test cases where we want to
1102  	// verify that at least one iteration of updateDiscardStats has been completed.
1103  	vlog.db.logToSyncChan(updateDiscardStatsMsg)
1104  }
1105  
// vlogThreshold adjusts the value threshold from observed value sizes.
// listenForValueThresholdUpdate feeds batches of sizes into the vlMetrics
// histogram and stores the configured percentile of it in valueThreshold.
type vlogThreshold struct {
	logger         Logger       // optional; threshold-change logging is skipped when nil
	percentile     float64      // percentile of vlMetrics published as valueThreshold
	valueThreshold atomic.Int64 // current value threshold, accessed atomically
	valueCh        chan []int64 // batches of value sizes queued by update
	clearCh        chan bool    // histogram-clear requests queued by Clear
	closer         *z.Closer    // stops listenForValueThresholdUpdate and waits for it
	// vlMetrics is the histogram of observed value sizes; it is updated and
	// cleared by listenForValueThresholdUpdate.
	vlMetrics *z.HistogramData
}
1116  
1117  func initVlogThreshold(opt *Options) *vlogThreshold {
1118  	getBounds := func() []float64 {
1119  		mxbd := opt.maxValueThreshold
1120  		mnbd := float64(opt.ValueThreshold)
1121  		y.AssertTruef(mxbd >= mnbd, "maximum threshold bound is less than the min threshold")
1122  		size := math.Min(mxbd-mnbd+1, 1024.0)
1123  		bdstp := (mxbd - mnbd) / size
1124  		bounds := make([]float64, int64(size))
1125  		for i := range bounds {
1126  			if i == 0 {
1127  				bounds[0] = mnbd
1128  				continue
1129  			}
1130  			if i == int(size-1) {
1131  				bounds[i] = mxbd
1132  				continue
1133  			}
1134  			bounds[i] = bounds[i-1] + bdstp
1135  		}
1136  		return bounds
1137  	}
1138  	lt := &vlogThreshold{
1139  		logger:     opt.Logger,
1140  		percentile: opt.VLogPercentile,
1141  		valueCh:    make(chan []int64, 1000),
1142  		clearCh:    make(chan bool, 1),
1143  		closer:     z.NewCloser(1),
1144  		vlMetrics:  z.NewHistogramData(getBounds()),
1145  	}
1146  	lt.valueThreshold.Store(opt.ValueThreshold)
1147  	return lt
1148  }
1149  
1150  func (v *vlogThreshold) Clear(opt Options) {
1151  	v.valueThreshold.Store(opt.ValueThreshold)
1152  	v.clearCh <- true
1153  }
1154  
1155  func (v *vlogThreshold) update(sizes []int64) {
1156  	v.valueCh <- sizes
1157  }
1158  
1159  func (v *vlogThreshold) close() {
1160  	v.closer.SignalAndWait()
1161  }
1162  
1163  func (v *vlogThreshold) listenForValueThresholdUpdate() {
1164  	defer v.closer.Done()
1165  	for {
1166  		select {
1167  		case <-v.closer.HasBeenClosed():
1168  			return
1169  		case val := <-v.valueCh:
1170  			for _, e := range val {
1171  				v.vlMetrics.Update(e)
1172  			}
1173  			// we are making it to get Options.VlogPercentile so that values with sizes
1174  			// in range of Options.VlogPercentile will make it to the LSM tree and rest to the
1175  			// value log file.
1176  			p := int64(v.vlMetrics.Percentile(v.percentile))
1177  			if v.valueThreshold.Load() != p {
1178  				if v.logger != nil {
1179  					v.logger.Infof("updating value of threshold to: %d", p)
1180  				}
1181  				v.valueThreshold.Store(p)
1182  			}
1183  		case <-v.clearCh:
1184  			v.vlMetrics.Clear()
1185  		}
1186  	}
1187  }
1188