db.go raw

   1  /*
   2   * SPDX-FileCopyrightText: © Hypermode Inc. <hello@hypermode.com>
   3   * SPDX-License-Identifier: Apache-2.0
   4   */
   5  
   6  package badger
   7  
   8  import (
   9  	"bytes"
  10  	"context"
  11  	"encoding/binary"
  12  	"errors"
  13  	"expvar"
  14  	"fmt"
  15  	"math"
  16  	"os"
  17  	"path/filepath"
  18  	"sort"
  19  	"strings"
  20  	"sync"
  21  	"sync/atomic"
  22  	"time"
  23  
  24  	humanize "github.com/dustin/go-humanize"
  25  
  26  	"github.com/dgraph-io/badger/v4/fb"
  27  	"github.com/dgraph-io/badger/v4/options"
  28  	"github.com/dgraph-io/badger/v4/pb"
  29  	"github.com/dgraph-io/badger/v4/skl"
  30  	"github.com/dgraph-io/badger/v4/table"
  31  	"github.com/dgraph-io/badger/v4/y"
  32  	"github.com/dgraph-io/ristretto/v2"
  33  	"github.com/dgraph-io/ristretto/v2/z"
  34  )
  35  
  36  var (
  37  	badgerPrefix = []byte("!badger!")       // Prefix for internal keys used by badger.
  38  	txnKey       = []byte("!badger!txn")    // For indicating end of entries in txn.
  39  	bannedNsKey  = []byte("!badger!banned") // For storing the banned namespaces.
  40  )
  41  
  42  type closers struct {
  43  	updateSize  *z.Closer
  44  	compactors  *z.Closer
  45  	memtable    *z.Closer
  46  	writes      *z.Closer
  47  	valueGC     *z.Closer
  48  	pub         *z.Closer
  49  	cacheHealth *z.Closer
  50  }
  51  
  52  type lockedKeys struct {
  53  	sync.RWMutex
  54  	keys map[uint64]struct{}
  55  }
  56  
  57  func (lk *lockedKeys) add(key uint64) {
  58  	lk.Lock()
  59  	defer lk.Unlock()
  60  	lk.keys[key] = struct{}{}
  61  }
  62  
  63  func (lk *lockedKeys) has(key uint64) bool {
  64  	lk.RLock()
  65  	defer lk.RUnlock()
  66  	_, ok := lk.keys[key]
  67  	return ok
  68  }
  69  
  70  func (lk *lockedKeys) all() []uint64 {
  71  	lk.RLock()
  72  	defer lk.RUnlock()
  73  	keys := make([]uint64, 0, len(lk.keys))
  74  	for key := range lk.keys {
  75  		keys = append(keys, key)
  76  	}
  77  	return keys
  78  }
  79  
// DB provides the various functions required to interact with Badger.
// DB is thread-safe.
//
// A DB is created with Open and should be released with Close.
type DB struct {
	// Embedded test-only extensions; Open copies opt.syncChan into its
	// syncChan field.
	testOnlyDBExtensions

	lock sync.RWMutex // Guards list of inmemory tables, not individual reads and writes.

	// Lock on opt.Dir; left nil in InMemory mode or when BypassLockGuard is set.
	dirLockGuard *directoryLockGuard
	// Lock on opt.ValueDir. nil if Dir and ValueDir are the same, and also in
	// every case where dirLockGuard is nil.
	valueDirGuard *directoryLockGuard

	// One closer per group of background goroutines started by Open.
	closers closers

	mt  *memTable   // Our latest (actively written) in-memory table. Not created by Open in read-only mode.
	imm []*memTable // Add here only AFTER pushing to flushChan.

	// Initialized via openMemTables.
	nextMemFid int

	opt       Options
	manifest  *manifestFile
	lc        *levelsController
	vlog      valueLog
	writeCh   chan *request  // Requests queued by sendToWriteCh and consumed by doWrites.
	flushChan chan *memTable // For flushing memtables.
	closeOnce sync.Once      // For closing DB only once.

	blockWrites atomic.Int32  // Set to 1 by close; sendToWriteCh then fails with ErrBlockedWrites.
	isClosed    atomic.Uint32 // Set to 1 by close; read by IsClosed.

	orc              *oracle
	bannedNamespaces *lockedKeys // Banned namespaces, loaded by initBannedNamespaces.
	threshold        *vlogThreshold

	pub        *publisher
	registry   *KeyRegistry
	blockCache *ristretto.Cache[[]byte, *table.Block]   // nil unless opt.BlockCacheSize > 0.
	indexCache *ristretto.Cache[uint64, *fb.TableIndex] // nil unless opt.IndexCacheSize > 0.
	allocPool  *z.AllocatorPool
}
 120  
 121  const (
 122  	kvWriteChCapacity = 1000
 123  )
 124  
 125  func checkAndSetOptions(opt *Options) error {
 126  	// It's okay to have zero compactors which will disable all compactions but
 127  	// we cannot have just one compactor otherwise we will end up with all data
 128  	// on level 2.
 129  	if opt.NumCompactors == 1 {
 130  		return errors.New("Cannot have 1 compactor. Need at least 2")
 131  	}
 132  
 133  	if opt.InMemory && (opt.Dir != "" || opt.ValueDir != "") {
 134  		return errors.New("Cannot use badger in Disk-less mode with Dir or ValueDir set")
 135  	}
 136  	opt.maxBatchSize = (15 * opt.MemTableSize) / 100
 137  	opt.maxBatchCount = opt.maxBatchSize / int64(skl.MaxNodeSize)
 138  
 139  	// This is the maximum value, vlogThreshold can have if dynamic thresholding is enabled.
 140  	opt.maxValueThreshold = math.Min(maxValueThreshold, float64(opt.maxBatchSize))
 141  	if opt.VLogPercentile < 0.0 || opt.VLogPercentile > 1.0 {
 142  		return errors.New("vlogPercentile must be within range of 0.0-1.0")
 143  	}
 144  
 145  	// We are limiting opt.ValueThreshold to maxValueThreshold for now.
 146  	if opt.ValueThreshold > maxValueThreshold {
 147  		return fmt.Errorf("Invalid ValueThreshold, must be less or equal to %d",
 148  			maxValueThreshold)
 149  	}
 150  
 151  	// If ValueThreshold is greater than opt.maxBatchSize, we won't be able to push any data using
 152  	// the transaction APIs. Transaction batches entries into batches of size opt.maxBatchSize.
 153  	if opt.ValueThreshold > opt.maxBatchSize {
 154  		return fmt.Errorf("Valuethreshold %d greater than max batch size of %d. Either "+
 155  			"reduce opt.ValueThreshold or increase opt.BaseTableSize.",
 156  			opt.ValueThreshold, opt.maxBatchSize)
 157  	}
 158  	// ValueLogFileSize should be strictly LESS than 2<<30 otherwise we will
 159  	// overflow the uint32 when we mmap it in OpenMemtable.
 160  	if !(opt.ValueLogFileSize < 2<<30 && opt.ValueLogFileSize >= 1<<20) {
 161  		return ErrValueLogSize
 162  	}
 163  
 164  	if opt.ReadOnly {
 165  		// Do not perform compaction in read only mode.
 166  		opt.CompactL0OnClose = false
 167  	}
 168  
 169  	needCache := (opt.Compression != options.None) || (len(opt.EncryptionKey) > 0)
 170  	if needCache && opt.BlockCacheSize == 0 {
 171  		panic("BlockCacheSize should be set since compression/encryption are enabled")
 172  	}
 173  	return nil
 174  }
 175  
 176  // Open returns a new DB object.
 177  func Open(opt Options) (*DB, error) {
 178  	if err := checkAndSetOptions(&opt); err != nil {
 179  		return nil, err
 180  	}
 181  	var dirLockGuard, valueDirLockGuard *directoryLockGuard
 182  
 183  	// Create directories and acquire lock on it only if badger is not running in InMemory mode.
 184  	// We don't have any directories/files in InMemory mode so we don't need to acquire
 185  	// any locks on them.
 186  	if !opt.InMemory {
 187  		if err := createDirs(opt); err != nil {
 188  			return nil, err
 189  		}
 190  		var err error
 191  		if !opt.BypassLockGuard {
 192  			dirLockGuard, err = acquireDirectoryLock(opt.Dir, lockFile, opt.ReadOnly)
 193  			if err != nil {
 194  				return nil, err
 195  			}
 196  			defer func() {
 197  				if dirLockGuard != nil {
 198  					_ = dirLockGuard.release()
 199  				}
 200  			}()
 201  			absDir, err := filepath.Abs(opt.Dir)
 202  			if err != nil {
 203  				return nil, err
 204  			}
 205  			absValueDir, err := filepath.Abs(opt.ValueDir)
 206  			if err != nil {
 207  				return nil, err
 208  			}
 209  			if absValueDir != absDir {
 210  				valueDirLockGuard, err = acquireDirectoryLock(opt.ValueDir, lockFile, opt.ReadOnly)
 211  				if err != nil {
 212  					return nil, err
 213  				}
 214  				defer func() {
 215  					if valueDirLockGuard != nil {
 216  						_ = valueDirLockGuard.release()
 217  					}
 218  				}()
 219  			}
 220  		}
 221  	}
 222  
 223  	manifestFile, manifest, err := openOrCreateManifestFile(opt)
 224  	if err != nil {
 225  		return nil, err
 226  	}
 227  	defer func() {
 228  		if manifestFile != nil {
 229  			_ = manifestFile.close()
 230  		}
 231  	}()
 232  
 233  	db := &DB{
 234  		imm:              make([]*memTable, 0, opt.NumMemtables),
 235  		flushChan:        make(chan *memTable, opt.NumMemtables),
 236  		writeCh:          make(chan *request, kvWriteChCapacity),
 237  		opt:              opt,
 238  		manifest:         manifestFile,
 239  		dirLockGuard:     dirLockGuard,
 240  		valueDirGuard:    valueDirLockGuard,
 241  		orc:              newOracle(opt),
 242  		pub:              newPublisher(),
 243  		allocPool:        z.NewAllocatorPool(8),
 244  		bannedNamespaces: &lockedKeys{keys: make(map[uint64]struct{})},
 245  		threshold:        initVlogThreshold(&opt),
 246  	}
 247  
 248  	db.syncChan = opt.syncChan
 249  
 250  	// Cleanup all the goroutines started by badger in case of an error.
 251  	defer func() {
 252  		if err != nil {
 253  			opt.Errorf("Received err: %v. Cleaning up...", err)
 254  			db.cleanup()
 255  			db = nil
 256  		}
 257  	}()
 258  
 259  	if opt.BlockCacheSize > 0 {
 260  		numInCache := opt.BlockCacheSize / int64(opt.BlockSize)
 261  		if numInCache == 0 {
 262  			// Make the value of this variable at least one since the cache requires
 263  			// the number of counters to be greater than zero.
 264  			numInCache = 1
 265  		}
 266  
 267  		config := ristretto.Config[[]byte, *table.Block]{
 268  			NumCounters: numInCache * 8,
 269  			MaxCost:     opt.BlockCacheSize,
 270  			BufferItems: 64,
 271  			Metrics:     true,
 272  			OnExit:      table.BlockEvictHandler,
 273  		}
 274  		db.blockCache, err = ristretto.NewCache[[]byte, *table.Block](&config)
 275  		if err != nil {
 276  			return nil, y.Wrap(err, "failed to create data cache")
 277  		}
 278  	}
 279  
 280  	if opt.IndexCacheSize > 0 {
 281  		// Index size is around 5% of the table size.
 282  		indexSz := int64(float64(opt.MemTableSize) * 0.05)
 283  		numInCache := opt.IndexCacheSize / indexSz
 284  		if numInCache == 0 {
 285  			// Make the value of this variable at least one since the cache requires
 286  			// the number of counters to be greater than zero.
 287  			numInCache = 1
 288  		}
 289  
 290  		config := ristretto.Config[uint64, *fb.TableIndex]{
 291  			NumCounters: numInCache * 8,
 292  			MaxCost:     opt.IndexCacheSize,
 293  			BufferItems: 64,
 294  			Metrics:     true,
 295  		}
 296  		db.indexCache, err = ristretto.NewCache(&config)
 297  		if err != nil {
 298  			return nil, y.Wrap(err, "failed to create bf cache")
 299  		}
 300  	}
 301  
 302  	db.closers.cacheHealth = z.NewCloser(1)
 303  	go db.monitorCache(db.closers.cacheHealth)
 304  
 305  	if db.opt.InMemory {
 306  		db.opt.SyncWrites = false
 307  		// If badger is running in memory mode, push everything into the LSM Tree.
 308  		db.opt.ValueThreshold = math.MaxInt32
 309  	}
 310  	krOpt := KeyRegistryOptions{
 311  		ReadOnly:                      opt.ReadOnly,
 312  		Dir:                           opt.Dir,
 313  		EncryptionKey:                 opt.EncryptionKey,
 314  		EncryptionKeyRotationDuration: opt.EncryptionKeyRotationDuration,
 315  		InMemory:                      opt.InMemory,
 316  	}
 317  
 318  	if db.registry, err = OpenKeyRegistry(krOpt); err != nil {
 319  		return db, err
 320  	}
 321  	db.calculateSize()
 322  	db.closers.updateSize = z.NewCloser(1)
 323  	go db.updateSize(db.closers.updateSize)
 324  
 325  	if err := db.openMemTables(db.opt); err != nil {
 326  		return nil, y.Wrapf(err, "while opening memtables")
 327  	}
 328  
 329  	if !db.opt.ReadOnly {
 330  		if db.mt, err = db.newMemTable(); err != nil {
 331  			return nil, y.Wrapf(err, "cannot create memtable")
 332  		}
 333  	}
 334  
 335  	// newLevelsController potentially loads files in directory.
 336  	if db.lc, err = newLevelsController(db, &manifest); err != nil {
 337  		return db, err
 338  	}
 339  
 340  	// Initialize vlog struct.
 341  	db.vlog.init(db)
 342  
 343  	if !opt.ReadOnly {
 344  		db.closers.compactors = z.NewCloser(1)
 345  		db.lc.startCompact(db.closers.compactors)
 346  
 347  		db.closers.memtable = z.NewCloser(1)
 348  		go func() {
 349  			db.flushMemtable(db.closers.memtable) // Need levels controller to be up.
 350  		}()
 351  		// Flush them to disk asap.
 352  		for _, mt := range db.imm {
 353  			db.flushChan <- mt
 354  		}
 355  	}
 356  	// We do increment nextTxnTs below. So, no need to do it here.
 357  	db.orc.nextTxnTs = db.MaxVersion()
 358  	db.opt.Infof("Set nextTxnTs to %d", db.orc.nextTxnTs)
 359  
 360  	if err = db.vlog.open(db); err != nil {
 361  		return db, y.Wrapf(err, "During db.vlog.open")
 362  	}
 363  
 364  	// Let's advance nextTxnTs to one more than whatever we observed via
 365  	// replaying the logs.
 366  	db.orc.txnMark.Done(db.orc.nextTxnTs)
 367  	// In normal mode, we must update readMark so older versions of keys can be removed during
 368  	// compaction when run in offline mode via the flatten tool.
 369  	db.orc.readMark.Done(db.orc.nextTxnTs)
 370  	db.orc.incrementNextTs()
 371  
 372  	go db.threshold.listenForValueThresholdUpdate()
 373  
 374  	if err := db.initBannedNamespaces(); err != nil {
 375  		return db, fmt.Errorf("While setting banned keys: %w", err)
 376  	}
 377  
 378  	db.closers.writes = z.NewCloser(1)
 379  	go db.doWrites(db.closers.writes)
 380  
 381  	if !db.opt.InMemory {
 382  		db.closers.valueGC = z.NewCloser(1)
 383  		go db.vlog.waitOnGC(db.closers.valueGC)
 384  	}
 385  
 386  	db.closers.pub = z.NewCloser(1)
 387  	go db.pub.listenForUpdates(db.closers.pub)
 388  
 389  	valueDirLockGuard = nil
 390  	dirLockGuard = nil
 391  	manifestFile = nil
 392  	return db, nil
 393  }
 394  
 395  // initBannedNamespaces retrieves the banned namespaces from the DB and updates in-memory structure.
 396  func (db *DB) initBannedNamespaces() error {
 397  	if db.opt.NamespaceOffset < 0 {
 398  		return nil
 399  	}
 400  	return db.View(func(txn *Txn) error {
 401  		iopts := DefaultIteratorOptions
 402  		iopts.Prefix = bannedNsKey
 403  		iopts.PrefetchValues = false
 404  		iopts.InternalAccess = true
 405  		itr := txn.NewIterator(iopts)
 406  		defer itr.Close()
 407  		for itr.Rewind(); itr.Valid(); itr.Next() {
 408  			key := y.BytesToU64(itr.Item().Key()[len(bannedNsKey):])
 409  			db.bannedNamespaces.add(key)
 410  		}
 411  		return nil
 412  	})
 413  }
 414  
 415  func (db *DB) MaxVersion() uint64 {
 416  	var maxVersion uint64
 417  	update := func(a uint64) {
 418  		if a > maxVersion {
 419  			maxVersion = a
 420  		}
 421  	}
 422  	db.lock.Lock()
 423  	// In read only mode, we do not create new mem table.
 424  	if !db.opt.ReadOnly {
 425  		update(db.mt.maxVersion)
 426  	}
 427  	for _, mt := range db.imm {
 428  		update(mt.maxVersion)
 429  	}
 430  	db.lock.Unlock()
 431  	for _, ti := range db.Tables() {
 432  		update(ti.MaxVersion)
 433  	}
 434  	return maxVersion
 435  }
 436  
 437  func (db *DB) monitorCache(c *z.Closer) {
 438  	defer c.Done()
 439  	count := 0
 440  	analyze := func(name string, metrics *ristretto.Metrics) {
 441  		// If the mean life expectancy is less than 10 seconds, the cache
 442  		// might be too small.
 443  		le := metrics.LifeExpectancySeconds()
 444  		if le == nil {
 445  			return
 446  		}
 447  		lifeTooShort := le.Count > 0 && float64(le.Sum)/float64(le.Count) < 10
 448  		hitRatioTooLow := metrics.Ratio() > 0 && metrics.Ratio() < 0.4
 449  		if lifeTooShort && hitRatioTooLow {
 450  			db.opt.Warningf("%s might be too small. Metrics: %s\n", name, metrics)
 451  			db.opt.Warningf("Cache life expectancy (in seconds): %+v\n", le)
 452  
 453  		} else if le.Count > 1000 && count%5 == 0 {
 454  			db.opt.Infof("%s metrics: %s\n", name, metrics)
 455  		}
 456  	}
 457  
 458  	ticker := time.NewTicker(1 * time.Minute)
 459  	defer ticker.Stop()
 460  	for {
 461  		select {
 462  		case <-c.HasBeenClosed():
 463  			return
 464  		case <-ticker.C:
 465  		}
 466  
 467  		analyze("Block cache", db.BlockCacheMetrics())
 468  		analyze("Index cache", db.IndexCacheMetrics())
 469  		count++
 470  	}
 471  }
 472  
 473  // cleanup stops all the goroutines started by badger. This is used in open to
 474  // cleanup goroutines in case of an error.
 475  func (db *DB) cleanup() {
 476  	db.stopMemoryFlush()
 477  	db.stopCompactions()
 478  
 479  	db.blockCache.Close()
 480  	db.indexCache.Close()
 481  	if db.closers.updateSize != nil {
 482  		db.closers.updateSize.Signal()
 483  	}
 484  	if db.closers.valueGC != nil {
 485  		db.closers.valueGC.Signal()
 486  	}
 487  	if db.closers.writes != nil {
 488  		db.closers.writes.Signal()
 489  	}
 490  	if db.closers.pub != nil {
 491  		db.closers.pub.Signal()
 492  	}
 493  
 494  	db.orc.Stop()
 495  
 496  	// Do not use vlog.Close() here. vlog.Close truncates the files. We don't
 497  	// want to truncate files unless the user has specified the truncate flag.
 498  }
 499  
 500  // BlockCacheMetrics returns the metrics for the underlying block cache.
 501  func (db *DB) BlockCacheMetrics() *ristretto.Metrics {
 502  	if db.blockCache != nil {
 503  		return db.blockCache.Metrics
 504  	}
 505  	return nil
 506  }
 507  
 508  // IndexCacheMetrics returns the metrics for the underlying index cache.
 509  func (db *DB) IndexCacheMetrics() *ristretto.Metrics {
 510  	if db.indexCache != nil {
 511  		return db.indexCache.Metrics
 512  	}
 513  	return nil
 514  }
 515  
 516  // Close closes a DB. It's crucial to call it to ensure all the pending updates make their way to
 517  // disk. Calling DB.Close() multiple times would still only close the DB once.
 518  func (db *DB) Close() error {
 519  	var err error
 520  	db.closeOnce.Do(func() {
 521  		err = db.close()
 522  	})
 523  	return err
 524  }
 525  
 526  // IsClosed denotes if the badger DB is closed or not. A DB instance should not
 527  // be used after closing it.
 528  func (db *DB) IsClosed() bool {
 529  	return db.isClosed.Load() == 1
 530  }
 531  
 532  func (db *DB) close() (err error) {
 533  	defer db.allocPool.Release()
 534  
 535  	db.opt.Debugf("Closing database")
 536  	db.opt.Infof("Lifetime L0 stalled for: %s\n", time.Duration(db.lc.l0stallsMs.Load()))
 537  
 538  	db.blockWrites.Store(1)
 539  	db.isClosed.Store(1)
 540  
 541  	if !db.opt.InMemory {
 542  		// Stop value GC first.
 543  		db.closers.valueGC.SignalAndWait()
 544  	}
 545  
 546  	// Stop writes next.
 547  	db.closers.writes.SignalAndWait()
 548  
 549  	// Don't accept any more write.
 550  	close(db.writeCh)
 551  
 552  	db.closers.pub.SignalAndWait()
 553  	db.closers.cacheHealth.Signal()
 554  
 555  	// Make sure that block writer is done pushing stuff into memtable!
 556  	// Otherwise, you will have a race condition: we are trying to flush memtables
 557  	// and remove them completely, while the block / memtable writer is still
 558  	// trying to push stuff into the memtable. This will also resolve the value
 559  	// offset problem: as we push into memtable, we update value offsets there.
 560  	if db.mt != nil {
 561  		if db.mt.sl.Empty() {
 562  			// Remove the memtable if empty.
 563  			db.mt.DecrRef()
 564  		} else {
 565  			db.opt.Debugf("Flushing memtable")
 566  			for {
 567  				pushedMemTable := func() bool {
 568  					db.lock.Lock()
 569  					defer db.lock.Unlock()
 570  					y.AssertTrue(db.mt != nil)
 571  					select {
 572  					case db.flushChan <- db.mt:
 573  						db.imm = append(db.imm, db.mt) // Flusher will attempt to remove this from s.imm.
 574  						db.mt = nil                    // Will segfault if we try writing!
 575  						db.opt.Debugf("pushed to flush chan\n")
 576  						return true
 577  					default:
 578  						// If we fail to push, we need to unlock and wait for a short while.
 579  						// The flushing operation needs to update s.imm. Otherwise, we have a
 580  						// deadlock.
 581  						// TODO: Think about how to do this more cleanly, maybe without any locks.
 582  					}
 583  					return false
 584  				}()
 585  				if pushedMemTable {
 586  					break
 587  				}
 588  				time.Sleep(10 * time.Millisecond)
 589  			}
 590  		}
 591  	}
 592  	db.stopMemoryFlush()
 593  	db.stopCompactions()
 594  
 595  	// Force Compact L0
 596  	// We don't need to care about cstatus since no parallel compaction is running.
 597  	if db.opt.CompactL0OnClose {
 598  		err := db.lc.doCompact(173, compactionPriority{level: 0, score: 1.73})
 599  		switch err {
 600  		case errFillTables:
 601  			// This error only means that there might be enough tables to do a compaction. So, we
 602  			// should not report it to the end user to avoid confusing them.
 603  		case nil:
 604  			db.opt.Debugf("Force compaction on level 0 done")
 605  		default:
 606  			db.opt.Warningf("While forcing compaction on level 0: %v", err)
 607  		}
 608  	}
 609  
 610  	// Now close the value log.
 611  	if vlogErr := db.vlog.Close(); vlogErr != nil {
 612  		err = y.Wrap(vlogErr, "DB.Close")
 613  	}
 614  
 615  	db.opt.Infof(db.LevelsToString())
 616  	if lcErr := db.lc.close(); err == nil {
 617  		err = y.Wrap(lcErr, "DB.Close")
 618  	}
 619  	db.opt.Debugf("Waiting for closer")
 620  	db.closers.updateSize.SignalAndWait()
 621  	db.orc.Stop()
 622  	db.blockCache.Close()
 623  	db.indexCache.Close()
 624  
 625  	db.threshold.close()
 626  
 627  	if db.opt.InMemory {
 628  		return
 629  	}
 630  
 631  	if db.dirLockGuard != nil {
 632  		if guardErr := db.dirLockGuard.release(); err == nil {
 633  			err = y.Wrap(guardErr, "DB.Close")
 634  		}
 635  	}
 636  	if db.valueDirGuard != nil {
 637  		if guardErr := db.valueDirGuard.release(); err == nil {
 638  			err = y.Wrap(guardErr, "DB.Close")
 639  		}
 640  	}
 641  	if manifestErr := db.manifest.close(); err == nil {
 642  		err = y.Wrap(manifestErr, "DB.Close")
 643  	}
 644  	if registryErr := db.registry.Close(); err == nil {
 645  		err = y.Wrap(registryErr, "DB.Close")
 646  	}
 647  
 648  	// Fsync directories to ensure that lock file, and any other removed files whose directory
 649  	// we haven't specifically fsynced, are guaranteed to have their directory entry removal
 650  	// persisted to disk.
 651  	if syncErr := db.syncDir(db.opt.Dir); err == nil {
 652  		err = y.Wrap(syncErr, "DB.Close")
 653  	}
 654  	if syncErr := db.syncDir(db.opt.ValueDir); err == nil {
 655  		err = y.Wrap(syncErr, "DB.Close")
 656  	}
 657  
 658  	return err
 659  }
 660  
 661  // VerifyChecksum verifies checksum for all tables on all levels.
 662  // This method can be used to verify checksum, if opt.ChecksumVerificationMode is NoVerification.
 663  func (db *DB) VerifyChecksum() error {
 664  	return db.lc.verifyChecksum()
 665  }
 666  
 667  const (
 668  	lockFile = "LOCK"
 669  )
 670  
 671  // Sync syncs database content to disk. This function provides
 672  // more control to user to sync data whenever required.
 673  func (db *DB) Sync() error {
 674  	/**
 675  	Make an attempt to sync both the logs, the active memtable's WAL and the vLog (1847).
 676  	Cases:
 677  	- All_ok			:: If both the logs sync successfully.
 678  
 679  	- Entry_Lost		:: If an entry with a value pointer was present in the active memtable's WAL,
 680  						:: and the WAL was synced but there was an error in syncing the vLog.
 681  						:: The entry will be considered lost and this case will need to be handled during recovery.
 682  
 683  	- Entries_Lost		:: If there were errors in syncing both the logs, multiple entries would be lost.
 684  
 685  	- Entries_Lost      :: If the active memtable's WAL is not synced but the vLog is synced, it will
 686  						:: result in entries being lost because recovery of the active memtable is done from its WAL.
 687  						:: Check `UpdateSkipList` in memtable.go.
 688  
 689  	- Nothing_lost		:: If an entry with its value was present in the active memtable's WAL, and the WAL was synced,
 690  						:: but there was an error in syncing the vLog.
 691  						:: Nothing is lost for this very specific entry because the entry is completely present in the memtable's WAL.
 692  
 693  	- Partially_lost    :: If entries were written partially in either of the logs,
 694  						:: the logs will be truncated during recovery.
 695  						:: As a result of truncation, some entries might be lost.
 696  					    :: Assume that 4KB of data is to be synced and invoking `Sync` results only in syncing 3KB
 697  	                    :: of data and then the machine shuts down or the disk failure happens,
 698  						:: this will result in partial writes. [[This case needs verification]]
 699  	*/
 700  	db.lock.RLock()
 701  	memtableSyncError := db.mt.SyncWAL()
 702  	db.lock.RUnlock()
 703  
 704  	vLogSyncError := db.vlog.sync()
 705  	return y.CombineErrors(memtableSyncError, vLogSyncError)
 706  }
 707  
 708  // getMemtables returns the current memtables and get references.
 709  func (db *DB) getMemTables() ([]*memTable, func()) {
 710  	db.lock.RLock()
 711  	defer db.lock.RUnlock()
 712  
 713  	var tables []*memTable
 714  
 715  	// Mutable memtable does not exist in read-only mode.
 716  	if !db.opt.ReadOnly {
 717  		// Get mutable memtable.
 718  		tables = append(tables, db.mt)
 719  		db.mt.IncrRef()
 720  	}
 721  
 722  	// Get immutable memtables.
 723  	last := len(db.imm) - 1
 724  	for i := range db.imm {
 725  		tables = append(tables, db.imm[last-i])
 726  		db.imm[last-i].IncrRef()
 727  	}
 728  	return tables, func() {
 729  		for _, tbl := range tables {
 730  			tbl.DecrRef()
 731  		}
 732  	}
 733  }
 734  
 735  // get returns the value in memtable or disk for given key.
 736  // Note that value will include meta byte.
 737  //
 738  // IMPORTANT: We should never write an entry with an older timestamp for the same key, We need to
 739  // maintain this invariant to search for the latest value of a key, or else we need to search in all
 740  // tables and find the max version among them.  To maintain this invariant, we also need to ensure
 741  // that all versions of a key are always present in the same table from level 1, because compaction
 742  // can push any table down.
 743  //
 744  // Update(23/09/2020) - We have dropped the move key implementation. Earlier we
 745  // were inserting move keys to fix the invalid value pointers but we no longer
 746  // do that. For every get("fooX") call where X is the version, we will search
 747  // for "fooX" in all the levels of the LSM tree. This is expensive but it
 748  // removes the overhead of handling move keys completely.
 749  func (db *DB) get(key []byte) (y.ValueStruct, error) {
 750  	if db.IsClosed() {
 751  		return y.ValueStruct{}, ErrDBClosed
 752  	}
 753  	tables, decr := db.getMemTables() // Lock should be released.
 754  	defer decr()
 755  
 756  	var maxVs y.ValueStruct
 757  	version := y.ParseTs(key)
 758  
 759  	y.NumGetsAdd(db.opt.MetricsEnabled, 1)
 760  	for i := 0; i < len(tables); i++ {
 761  		vs := tables[i].sl.Get(key)
 762  		y.NumMemtableGetsAdd(db.opt.MetricsEnabled, 1)
 763  		if vs.Meta == 0 && vs.Value == nil {
 764  			continue
 765  		}
 766  		// Found the required version of the key, return immediately.
 767  		if vs.Version == version {
 768  			y.NumGetsWithResultsAdd(db.opt.MetricsEnabled, 1)
 769  			return vs, nil
 770  		}
 771  		if maxVs.Version < vs.Version {
 772  			maxVs = vs
 773  		}
 774  	}
 775  	return db.lc.get(key, maxVs, 0)
 776  }
 777  
 778  var requestPool = sync.Pool{
 779  	New: func() interface{} {
 780  		return new(request)
 781  	},
 782  }
 783  
 784  func (db *DB) writeToLSM(b *request) error {
 785  	// We should check the length of b.Prts and b.Entries only when badger is not
 786  	// running in InMemory mode. In InMemory mode, we don't write anything to the
 787  	// value log and that's why the length of b.Ptrs will always be zero.
 788  	if !db.opt.InMemory && len(b.Ptrs) != len(b.Entries) {
 789  		return fmt.Errorf("Ptrs and Entries don't match: %+v", b)
 790  	}
 791  
 792  	for i, entry := range b.Entries {
 793  		var err error
 794  		if entry.skipVlogAndSetThreshold(db.valueThreshold()) {
 795  			// Will include deletion / tombstone case.
 796  			err = db.mt.Put(entry.Key,
 797  				y.ValueStruct{
 798  					Value: entry.Value,
 799  					// Ensure value pointer flag is removed. Otherwise, the value will fail
 800  					// to be retrieved during iterator prefetch. `bitValuePointer` is only
 801  					// known to be set in write to LSM when the entry is loaded from a backup
 802  					// with lower ValueThreshold and its value was stored in the value log.
 803  					Meta:      entry.meta &^ bitValuePointer,
 804  					UserMeta:  entry.UserMeta,
 805  					ExpiresAt: entry.ExpiresAt,
 806  				})
 807  		} else {
 808  			// Write pointer to Memtable.
 809  			err = db.mt.Put(entry.Key,
 810  				y.ValueStruct{
 811  					Value:     b.Ptrs[i].Encode(),
 812  					Meta:      entry.meta | bitValuePointer,
 813  					UserMeta:  entry.UserMeta,
 814  					ExpiresAt: entry.ExpiresAt,
 815  				})
 816  		}
 817  		if err != nil {
 818  			return y.Wrapf(err, "while writing to memTable")
 819  		}
 820  	}
 821  	if db.opt.SyncWrites {
 822  		return db.mt.SyncWAL()
 823  	}
 824  	return nil
 825  }
 826  
 827  // writeRequests is called serially by only one goroutine.
 828  func (db *DB) writeRequests(reqs []*request) error {
 829  	if len(reqs) == 0 {
 830  		return nil
 831  	}
 832  
 833  	done := func(err error) {
 834  		for _, r := range reqs {
 835  			r.Err = err
 836  			r.Wg.Done()
 837  		}
 838  	}
 839  	db.opt.Debugf("writeRequests called. Writing to value log")
 840  	err := db.vlog.write(reqs)
 841  	if err != nil {
 842  		done(err)
 843  		return err
 844  	}
 845  
 846  	db.opt.Debugf("Writing to memtable")
 847  	var count int
 848  	for _, b := range reqs {
 849  		if len(b.Entries) == 0 {
 850  			continue
 851  		}
 852  		count += len(b.Entries)
 853  		var i uint64
 854  		var err error
 855  		for err = db.ensureRoomForWrite(); err == errNoRoom; err = db.ensureRoomForWrite() {
 856  			i++
 857  			if i%100 == 0 {
 858  				db.opt.Debugf("Making room for writes")
 859  			}
 860  			// We need to poll a bit because both hasRoomForWrite and the flusher need access to s.imm.
 861  			// When flushChan is full and you are blocked there, and the flusher is trying to update s.imm,
 862  			// you will get a deadlock.
 863  			time.Sleep(10 * time.Millisecond)
 864  		}
 865  		if err != nil {
 866  			done(err)
 867  			return y.Wrap(err, "writeRequests")
 868  		}
 869  		if err := db.writeToLSM(b); err != nil {
 870  			done(err)
 871  			return y.Wrap(err, "writeRequests")
 872  		}
 873  	}
 874  
 875  	db.opt.Debugf("Sending updates to subscribers")
 876  	db.pub.sendUpdates(reqs)
 877  
 878  	done(nil)
 879  	db.opt.Debugf("%d entries written", count)
 880  	return nil
 881  }
 882  
 883  func (db *DB) sendToWriteCh(entries []*Entry) (*request, error) {
 884  	if db.blockWrites.Load() == 1 {
 885  		return nil, ErrBlockedWrites
 886  	}
 887  	var count, size int64
 888  	for _, e := range entries {
 889  		size += e.estimateSizeAndSetThreshold(db.valueThreshold())
 890  		count++
 891  	}
 892  	y.NumBytesWrittenUserAdd(db.opt.MetricsEnabled, size)
 893  	if count >= db.opt.maxBatchCount || size >= db.opt.maxBatchSize {
 894  		return nil, ErrTxnTooBig
 895  	}
 896  
 897  	// We can only service one request because we need each txn to be stored in a contiguous section.
 898  	// Txns should not interleave among other txns or rewrites.
 899  	req := requestPool.Get().(*request)
 900  	req.reset()
 901  	req.Entries = entries
 902  	req.Wg.Add(1)
 903  	req.IncrRef()     // for db write
 904  	db.writeCh <- req // Handled in doWrites.
 905  	y.NumPutsAdd(db.opt.MetricsEnabled, int64(len(entries)))
 906  
 907  	return req, nil
 908  }
 909  
// doWrites is the writer loop started by Open. It gathers requests from
// db.writeCh into batches and hands each batch to writeRequests.
//
// At most one writeRequests call runs at a time. pendingCh (capacity 1) is a
// token that is taken before a batch is written and given back when that
// write finishes. While a write is in flight, incoming requests keep piling
// into the next batch. Once the batch reaches 3*kvWriteChCapacity requests,
// the loop blocks until the token is free.
//
// When lc is closed, requests still queued on writeCh are drained and written
// synchronously before returning.
func (db *DB) doWrites(lc *z.Closer) {
	defer lc.Done()
	// Token channel: its single slot is held while a writeRequests call runs.
	pendingCh := make(chan struct{}, 1)

	// writeRequests writes one batch, logs any error, then releases the token.
	writeRequests := func(reqs []*request) {
		if err := db.writeRequests(reqs); err != nil {
			db.opt.Errorf("writeRequests: %v", err)
		}
		<-pendingCh
	}

	// This variable tracks the number of pending writes.
	reqLen := new(expvar.Int)
	y.PendingWritesSet(db.opt.MetricsEnabled, db.opt.Dir, reqLen)

	reqs := make([]*request, 0, 10)
	for {
		var r *request
		// Wait for the first request of a new batch, or for shutdown.
		select {
		case r = <-db.writeCh:
		case <-lc.HasBeenClosed():
			goto closedCase
		}

		for {
			reqs = append(reqs, r)
			reqLen.Set(int64(len(reqs)))

			// The batch is big enough: block until the in-flight write finishes
			// instead of growing it further.
			if len(reqs) >= 3*kvWriteChCapacity {
				pendingCh <- struct{}{} // blocking.
				goto writeCase
			}

			select {
			// Either push to pending, or continue to pick from writeCh.
			case r = <-db.writeCh:
			case pendingCh <- struct{}{}:
				goto writeCase
			case <-lc.HasBeenClosed():
				goto closedCase
			}
		}

	closedCase:
		// Drain every request still queued, then write them all synchronously.
		// Don't close the writeCh here, because it is used in several places.
		for {
			select {
			case r = <-db.writeCh:
				reqs = append(reqs, r)
			default:
				pendingCh <- struct{}{} // Push to pending before doing a write.
				writeRequests(reqs)
				return
			}
		}

	writeCase:
		// Write in the background so this loop can start collecting the next batch.
		go writeRequests(reqs)
		reqs = make([]*request, 0, 10)
		reqLen.Set(0)
	}
}
 973  
 974  // batchSet applies a list of badger.Entry. If a request level error occurs it
 975  // will be returned.
 976  //
 977  //	Check(kv.BatchSet(entries))
 978  func (db *DB) batchSet(entries []*Entry) error {
 979  	req, err := db.sendToWriteCh(entries)
 980  	if err != nil {
 981  		return err
 982  	}
 983  
 984  	return req.Wait()
 985  }
 986  
 987  // batchSetAsync is the asynchronous version of batchSet. It accepts a callback
 988  // function which is called when all the sets are complete. If a request level
 989  // error occurs, it will be passed back via the callback.
 990  //
 991  //	err := kv.BatchSetAsync(entries, func(err error)) {
 992  //	   Check(err)
 993  //	}
 994  func (db *DB) batchSetAsync(entries []*Entry, f func(error)) error {
 995  	req, err := db.sendToWriteCh(entries)
 996  	if err != nil {
 997  		return err
 998  	}
 999  	go func() {
1000  		err := req.Wait()
1001  		// Write is complete. Let's call the callback function now.
1002  		f(err)
1003  	}()
1004  	return nil
1005  }
1006  
// errNoRoom is returned by ensureRoomForWrite when the active memtable is full
// and flushChan cannot accept it yet. Error strings are lowercase per Go
// convention; callers compare against this sentinel by identity.
var errNoRoom = errors.New("no room for write")
1008  
1009  // ensureRoomForWrite is always called serially.
1010  func (db *DB) ensureRoomForWrite() error {
1011  	var err error
1012  	db.lock.Lock()
1013  	defer db.lock.Unlock()
1014  
1015  	y.AssertTrue(db.mt != nil) // A nil mt indicates that DB is being closed.
1016  	if !db.mt.isFull() {
1017  		return nil
1018  	}
1019  
1020  	select {
1021  	case db.flushChan <- db.mt:
1022  		db.opt.Debugf("Flushing memtable, mt.size=%d size of flushChan: %d\n",
1023  			db.mt.sl.MemSize(), len(db.flushChan))
1024  		// We manage to push this task. Let's modify imm.
1025  		db.imm = append(db.imm, db.mt)
1026  		db.mt, err = db.newMemTable()
1027  		if err != nil {
1028  			return y.Wrapf(err, "cannot create new mem table")
1029  		}
1030  		// New memtable is empty. We certainly have room.
1031  		return nil
1032  	default:
1033  		// We need to do this to unlock and allow the flusher to modify imm.
1034  		return errNoRoom
1035  	}
1036  }
1037  
1038  func arenaSize(opt Options) int64 {
1039  	return opt.MemTableSize + opt.maxBatchSize + opt.maxBatchCount*int64(skl.MaxNodeSize)
1040  }
1041  
1042  // buildL0Table builds a new table from the memtable.
1043  func buildL0Table(iter y.Iterator, dropPrefixes [][]byte, bopts table.Options) *table.Builder {
1044  	defer iter.Close()
1045  
1046  	b := table.NewTableBuilder(bopts)
1047  	for iter.Rewind(); iter.Valid(); iter.Next() {
1048  		if len(dropPrefixes) > 0 && hasAnyPrefixes(iter.Key(), dropPrefixes) {
1049  			continue
1050  		}
1051  		vs := iter.Value()
1052  		var vp valuePointer
1053  		if vs.Meta&bitValuePointer > 0 {
1054  			vp.Decode(vs.Value)
1055  		}
1056  		b.Add(iter.Key(), iter.Value(), vp.Len)
1057  	}
1058  
1059  	return b
1060  }
1061  
1062  // handleMemTableFlush must be run serially.
1063  func (db *DB) handleMemTableFlush(mt *memTable, dropPrefixes [][]byte) error {
1064  	bopts := buildTableOptions(db)
1065  	itr := mt.sl.NewUniIterator(false)
1066  	builder := buildL0Table(itr, nil, bopts)
1067  	defer builder.Close()
1068  
1069  	// buildL0Table can return nil if the none of the items in the skiplist are
1070  	// added to the builder. This can happen when drop prefix is set and all
1071  	// the items are skipped.
1072  	if builder.Empty() {
1073  		builder.Finish()
1074  		return nil
1075  	}
1076  
1077  	fileID := db.lc.reserveFileID()
1078  	var tbl *table.Table
1079  	var err error
1080  	if db.opt.InMemory {
1081  		data := builder.Finish()
1082  		tbl, err = table.OpenInMemoryTable(data, fileID, &bopts)
1083  	} else {
1084  		tbl, err = table.CreateTable(table.NewFilename(fileID, db.opt.Dir), builder)
1085  	}
1086  	if err != nil {
1087  		return y.Wrap(err, "error while creating table")
1088  	}
1089  	// We own a ref on tbl.
1090  	err = db.lc.addLevel0Table(tbl) // This will incrRef
1091  	_ = tbl.DecrRef()               // Releases our ref.
1092  	return err
1093  }
1094  
1095  // flushMemtable must keep running until we send it an empty memtable. If there
1096  // are errors during handling the memtable flush, we'll retry indefinitely.
1097  func (db *DB) flushMemtable(lc *z.Closer) {
1098  	defer lc.Done()
1099  
1100  	for mt := range db.flushChan {
1101  		if mt == nil {
1102  			continue
1103  		}
1104  
1105  		for {
1106  			if err := db.handleMemTableFlush(mt, nil); err != nil {
1107  				// Encountered error. Retry indefinitely.
1108  				db.opt.Errorf("error flushing memtable to disk: %v, retrying", err)
1109  				time.Sleep(time.Second)
1110  				continue
1111  			}
1112  
1113  			// Update s.imm. Need a lock.
1114  			db.lock.Lock()
1115  			// This is a single-threaded operation. mt corresponds to the head of
1116  			// db.imm list. Once we flush it, we advance db.imm. The next mt
1117  			// which would arrive here would match db.imm[0], because we acquire a
1118  			// lock over DB when pushing to flushChan.
1119  			// TODO: This logic is dirty AF. Any change and this could easily break.
1120  			y.AssertTrue(mt == db.imm[0])
1121  			db.imm = db.imm[1:]
1122  			mt.DecrRef() // Return memory.
1123  			// unlock
1124  			db.lock.Unlock()
1125  			break
1126  		}
1127  	}
1128  }
1129  
// exists reports whether path can be stat'ed. A "does not exist" error yields
// (false, nil). Any other stat error is returned together with true, because
// the path may well exist even though it could not be inspected.
func exists(path string) (bool, error) {
	_, err := os.Stat(path)
	switch {
	case err == nil:
		return true, nil
	case os.IsNotExist(err):
		return false, nil
	default:
		return true, err
	}
}
1140  
1141  // This function does a filewalk, calculates the size of vlog and sst files and stores it in
1142  // y.LSMSize and y.VlogSize.
1143  func (db *DB) calculateSize() {
1144  	if db.opt.InMemory {
1145  		return
1146  	}
1147  	newInt := func(val int64) *expvar.Int {
1148  		v := new(expvar.Int)
1149  		v.Add(val)
1150  		return v
1151  	}
1152  
1153  	totalSize := func(dir string) (int64, int64) {
1154  		var lsmSize, vlogSize int64
1155  		err := filepath.Walk(dir, func(path string, info os.FileInfo, err error) error {
1156  			if err != nil {
1157  				return err
1158  			}
1159  			ext := filepath.Ext(path)
1160  			switch ext {
1161  			case ".sst":
1162  				lsmSize += info.Size()
1163  			case ".vlog":
1164  				vlogSize += info.Size()
1165  			}
1166  			return nil
1167  		})
1168  		if err != nil {
1169  			db.opt.Debugf("Got error while calculating total size of directory: %s", dir)
1170  		}
1171  		return lsmSize, vlogSize
1172  	}
1173  
1174  	lsmSize, vlogSize := totalSize(db.opt.Dir)
1175  	y.LSMSizeSet(db.opt.MetricsEnabled, db.opt.Dir, newInt(lsmSize))
1176  	// If valueDir is different from dir, we'd have to do another walk.
1177  	if db.opt.ValueDir != db.opt.Dir {
1178  		_, vlogSize = totalSize(db.opt.ValueDir)
1179  	}
1180  	y.VlogSizeSet(db.opt.MetricsEnabled, db.opt.ValueDir, newInt(vlogSize))
1181  }
1182  
1183  func (db *DB) updateSize(lc *z.Closer) {
1184  	defer lc.Done()
1185  	if db.opt.InMemory {
1186  		return
1187  	}
1188  
1189  	metricsTicker := time.NewTicker(time.Minute)
1190  	defer metricsTicker.Stop()
1191  
1192  	for {
1193  		select {
1194  		case <-metricsTicker.C:
1195  			db.calculateSize()
1196  		case <-lc.HasBeenClosed():
1197  			return
1198  		}
1199  	}
1200  }
1201  
1202  // RunValueLogGC triggers a value log garbage collection.
1203  //
1204  // It picks value log files to perform GC based on statistics that are collected
1205  // during compactions.  If no such statistics are available, then log files are
1206  // picked in random order. The process stops as soon as the first log file is
1207  // encountered which does not result in garbage collection.
1208  //
1209  // When a log file is picked, it is first sampled. If the sample shows that we
1210  // can discard at least discardRatio space of that file, it would be rewritten.
1211  //
1212  // If a call to RunValueLogGC results in no rewrites, then an ErrNoRewrite is
1213  // thrown indicating that the call resulted in no file rewrites.
1214  //
1215  // We recommend setting discardRatio to 0.5, thus indicating that a file be
1216  // rewritten if half the space can be discarded.  This results in a lifetime
1217  // value log write amplification of 2 (1 from original write + 0.5 rewrite +
1218  // 0.25 + 0.125 + ... = 2). Setting it to higher value would result in fewer
1219  // space reclaims, while setting it to a lower value would result in more space
1220  // reclaims at the cost of increased activity on the LSM tree. discardRatio
1221  // must be in the range (0.0, 1.0), both endpoints excluded, otherwise an
1222  // ErrInvalidRequest is returned.
1223  //
1224  // Only one GC is allowed at a time. If another value log GC is running, or DB
1225  // has been closed, this would return an ErrRejected.
1226  //
1227  // Note: Every time GC is run, it would produce a spike of activity on the LSM
1228  // tree.
1229  func (db *DB) RunValueLogGC(discardRatio float64) error {
1230  	if db.opt.InMemory {
1231  		return ErrGCInMemoryMode
1232  	}
1233  	if discardRatio >= 1.0 || discardRatio <= 0.0 {
1234  		return ErrInvalidRequest
1235  	}
1236  
1237  	// Pick a log file and run GC
1238  	return db.vlog.runGC(discardRatio)
1239  }
1240  
1241  // Size returns the size of lsm and value log files in bytes. It can be used to decide how often to
1242  // call RunValueLogGC.
1243  func (db *DB) Size() (lsm, vlog int64) {
1244  	if y.LSMSizeGet(db.opt.MetricsEnabled, db.opt.Dir) == nil {
1245  		lsm, vlog = 0, 0
1246  		return
1247  	}
1248  	lsm = y.LSMSizeGet(db.opt.MetricsEnabled, db.opt.Dir).(*expvar.Int).Value()
1249  	vlog = y.VlogSizeGet(db.opt.MetricsEnabled, db.opt.ValueDir).(*expvar.Int).Value()
1250  	return
1251  }
1252  
// Sequence represents a Badger sequence: a counter persisted under key that
// hands out monotonically increasing integers. Values are served from memory
// out of a lease of bandwidth integers; the lease's upper bound is stored in
// the database, so a new lease is only written when the current one runs out.
type Sequence struct {
	lock      sync.Mutex // held by Next and Release while they touch next/leased
	db        *DB        // database the lease bound is stored in
	key       []byte     // key under which the lease bound is stored
	next      uint64     // value the next call to Next will return
	leased    uint64     // exclusive upper bound of the current lease
	bandwidth uint64     // number of integers reserved per lease
}
1262  
1263  // Next would return the next integer in the sequence, updating the lease by running a transaction
1264  // if needed.
1265  func (seq *Sequence) Next() (uint64, error) {
1266  	seq.lock.Lock()
1267  	defer seq.lock.Unlock()
1268  	if seq.next >= seq.leased {
1269  		if err := seq.updateLease(); err != nil {
1270  			return 0, err
1271  		}
1272  	}
1273  	val := seq.next
1274  	seq.next++
1275  	return val, nil
1276  }
1277  
1278  // Release the leased sequence to avoid wasted integers. This should be done right
1279  // before closing the associated DB. However it is valid to use the sequence after
1280  // it was released, causing a new lease with full bandwidth.
1281  func (seq *Sequence) Release() error {
1282  	seq.lock.Lock()
1283  	defer seq.lock.Unlock()
1284  	err := seq.db.Update(func(txn *Txn) error {
1285  		item, err := txn.Get(seq.key)
1286  		if err != nil {
1287  			return err
1288  		}
1289  
1290  		var num uint64
1291  		if err := item.Value(func(v []byte) error {
1292  			num = binary.BigEndian.Uint64(v)
1293  			return nil
1294  		}); err != nil {
1295  			return err
1296  		}
1297  
1298  		if num == seq.leased {
1299  			var buf [8]byte
1300  			binary.BigEndian.PutUint64(buf[:], seq.next)
1301  			return txn.SetEntry(NewEntry(seq.key, buf[:]))
1302  		}
1303  
1304  		return nil
1305  	})
1306  	if err != nil {
1307  		return err
1308  	}
1309  	seq.leased = seq.next
1310  	return nil
1311  }
1312  
1313  func (seq *Sequence) updateLease() error {
1314  	return seq.db.Update(func(txn *Txn) error {
1315  		item, err := txn.Get(seq.key)
1316  		switch {
1317  		case err == ErrKeyNotFound:
1318  			seq.next = 0
1319  		case err != nil:
1320  			return err
1321  		default:
1322  			var num uint64
1323  			if err := item.Value(func(v []byte) error {
1324  				num = binary.BigEndian.Uint64(v)
1325  				return nil
1326  			}); err != nil {
1327  				return err
1328  			}
1329  			seq.next = num
1330  		}
1331  
1332  		lease := seq.next + seq.bandwidth
1333  		var buf [8]byte
1334  		binary.BigEndian.PutUint64(buf[:], lease)
1335  		if err = txn.SetEntry(NewEntry(seq.key, buf[:])); err != nil {
1336  			return err
1337  		}
1338  		seq.leased = lease
1339  		return nil
1340  	})
1341  }
1342  
1343  // GetSequence would initiate a new sequence object, generating it from the stored lease, if
1344  // available, in the database. Sequence can be used to get a list of monotonically increasing
1345  // integers. Multiple sequences can be created by providing different keys. Bandwidth sets the
1346  // size of the lease, determining how many Next() requests can be served from memory.
1347  //
1348  // GetSequence is not supported on ManagedDB. Calling this would result in a panic.
1349  func (db *DB) GetSequence(key []byte, bandwidth uint64) (*Sequence, error) {
1350  	if db.opt.managedTxns {
1351  		panic("Cannot use GetSequence with managedDB=true.")
1352  	}
1353  
1354  	switch {
1355  	case len(key) == 0:
1356  		return nil, ErrEmptyKey
1357  	case bandwidth == 0:
1358  		return nil, ErrZeroBandwidth
1359  	}
1360  	seq := &Sequence{
1361  		db:        db,
1362  		key:       key,
1363  		next:      0,
1364  		leased:    0,
1365  		bandwidth: bandwidth,
1366  	}
1367  	err := seq.updateLease()
1368  	return seq, err
1369  }
1370  
1371  // Tables gets the TableInfo objects from the level controller. If withKeysCount
1372  // is true, TableInfo objects also contain counts of keys for the tables.
1373  func (db *DB) Tables() []TableInfo {
1374  	return db.lc.getTableInfo()
1375  }
1376  
1377  // Levels gets the LevelInfo.
1378  func (db *DB) Levels() []LevelInfo {
1379  	return db.lc.getLevelInfo()
1380  }
1381  
1382  // EstimateSize can be used to get rough estimate of data size for a given prefix.
1383  func (db *DB) EstimateSize(prefix []byte) (uint64, uint64) {
1384  	var onDiskSize, uncompressedSize uint64
1385  	tables := db.Tables()
1386  	for _, ti := range tables {
1387  		if bytes.HasPrefix(ti.Left, prefix) && bytes.HasPrefix(ti.Right, prefix) {
1388  			onDiskSize += uint64(ti.OnDiskSize)
1389  			uncompressedSize += uint64(ti.UncompressedSize)
1390  		}
1391  	}
1392  	return onDiskSize, uncompressedSize
1393  }
1394  
1395  // Ranges can be used to get rough key ranges to divide up iteration over the DB. The ranges here
1396  // would consider the prefix, but would not necessarily start or end with the prefix. In fact, the
1397  // first range would have nil as left key, and the last range would have nil as the right key.
1398  func (db *DB) Ranges(prefix []byte, numRanges int) []*keyRange {
1399  	var splits []string
1400  	tables := db.Tables()
1401  
1402  	// We just want table ranges here and not keys count.
1403  	for _, ti := range tables {
1404  		// We don't use ti.Left, because that has a tendency to store !badger keys. Skip over tables
1405  		// at upper levels. Only choose tables from the last level.
1406  		if ti.Level != db.opt.MaxLevels-1 {
1407  			continue
1408  		}
1409  		if bytes.HasPrefix(ti.Right, prefix) {
1410  			splits = append(splits, string(ti.Right))
1411  		}
1412  	}
1413  
1414  	// If the number of splits is low, look at the offsets inside the
1415  	// tables to generate more splits.
1416  	if len(splits) < 32 {
1417  		numTables := len(tables)
1418  		if numTables == 0 {
1419  			numTables = 1
1420  		}
1421  		numPerTable := 32 / numTables
1422  		if numPerTable == 0 {
1423  			numPerTable = 1
1424  		}
1425  		splits = db.lc.keySplits(numPerTable, prefix)
1426  	}
1427  
1428  	// If the number of splits is still < 32, then look at the memtables.
1429  	if len(splits) < 32 {
1430  		maxPerSplit := 10000
1431  		mtSplits := func(mt *memTable) {
1432  			if mt == nil {
1433  				return
1434  			}
1435  			count := 0
1436  			iter := mt.sl.NewIterator()
1437  			for iter.SeekToFirst(); iter.Valid(); iter.Next() {
1438  				if count%maxPerSplit == 0 {
1439  					// Add a split every maxPerSplit keys.
1440  					if bytes.HasPrefix(iter.Key(), prefix) {
1441  						splits = append(splits, string(iter.Key()))
1442  					}
1443  				}
1444  				count += 1
1445  			}
1446  			_ = iter.Close()
1447  		}
1448  
1449  		db.lock.Lock()
1450  		defer db.lock.Unlock()
1451  		var memTables []*memTable
1452  		memTables = append(memTables, db.imm...)
1453  		for _, mt := range memTables {
1454  			mtSplits(mt)
1455  		}
1456  		mtSplits(db.mt)
1457  	}
1458  
1459  	// We have our splits now. Let's convert them to ranges.
1460  	sort.Strings(splits)
1461  	var ranges []*keyRange
1462  	var start []byte
1463  	for _, key := range splits {
1464  		ranges = append(ranges, &keyRange{left: start, right: y.SafeCopy(nil, []byte(key))})
1465  		start = y.SafeCopy(nil, []byte(key))
1466  	}
1467  	ranges = append(ranges, &keyRange{left: start})
1468  
1469  	// Figure out the approximate table size this range has to deal with.
1470  	for _, t := range tables {
1471  		tr := keyRange{left: t.Left, right: t.Right}
1472  		for _, r := range ranges {
1473  			if len(r.left) == 0 || len(r.right) == 0 {
1474  				continue
1475  			}
1476  			if r.overlapsWith(tr) {
1477  				r.size += int64(t.UncompressedSize)
1478  			}
1479  		}
1480  	}
1481  
1482  	var total int64
1483  	for _, r := range ranges {
1484  		total += r.size
1485  	}
1486  	if total == 0 {
1487  		return ranges
1488  	}
1489  	// Figure out the average size, so we know how to bin the ranges together.
1490  	avg := total / int64(numRanges)
1491  
1492  	var out []*keyRange
1493  	var i int
1494  	for i < len(ranges) {
1495  		r := ranges[i]
1496  		cur := &keyRange{left: r.left, size: r.size, right: r.right}
1497  		i++
1498  		for ; i < len(ranges); i++ {
1499  			next := ranges[i]
1500  			if cur.size+next.size > avg {
1501  				break
1502  			}
1503  			cur.right = next.right
1504  			cur.size += next.size
1505  		}
1506  		out = append(out, cur)
1507  	}
1508  	return out
1509  }
1510  
1511  // MaxBatchCount returns max possible entries in batch
1512  func (db *DB) MaxBatchCount() int64 {
1513  	return db.opt.maxBatchCount
1514  }
1515  
1516  // MaxBatchSize returns max possible batch size
1517  func (db *DB) MaxBatchSize() int64 {
1518  	return db.opt.maxBatchSize
1519  }
1520  
1521  func (db *DB) stopMemoryFlush() {
1522  	// Stop memtable flushes.
1523  	if db.closers.memtable != nil {
1524  		close(db.flushChan)
1525  		db.closers.memtable.SignalAndWait()
1526  	}
1527  }
1528  
1529  func (db *DB) stopCompactions() {
1530  	// Stop compactions.
1531  	if db.closers.compactors != nil {
1532  		db.closers.compactors.SignalAndWait()
1533  	}
1534  }
1535  
1536  func (db *DB) startCompactions() {
1537  	// Resume compactions.
1538  	if db.closers.compactors != nil {
1539  		db.closers.compactors = z.NewCloser(1)
1540  		db.lc.startCompact(db.closers.compactors)
1541  	}
1542  }
1543  
1544  func (db *DB) startMemoryFlush() {
1545  	// Start memory fluhser.
1546  	if db.closers.memtable != nil {
1547  		db.flushChan = make(chan *memTable, db.opt.NumMemtables)
1548  		db.closers.memtable = z.NewCloser(1)
1549  		go func() {
1550  			db.flushMemtable(db.closers.memtable)
1551  		}()
1552  	}
1553  }
1554  
1555  // Flatten can be used to force compactions on the LSM tree so all the tables fall on the same
1556  // level. This ensures that all the versions of keys are colocated and not split across multiple
1557  // levels, which is necessary after a restore from backup. During Flatten, live compactions are
1558  // stopped. Ideally, no writes are going on during Flatten. Otherwise, it would create competition
1559  // between flattening the tree and new tables being created at level zero.
1560  func (db *DB) Flatten(workers int) error {
1561  
1562  	db.stopCompactions()
1563  	defer db.startCompactions()
1564  
1565  	compactAway := func(cp compactionPriority) error {
1566  		db.opt.Infof("Attempting to compact with %+v\n", cp)
1567  		errCh := make(chan error, 1)
1568  		for i := 0; i < workers; i++ {
1569  			go func() {
1570  				errCh <- db.lc.doCompact(175, cp)
1571  			}()
1572  		}
1573  		var success int
1574  		var rerr error
1575  		for i := 0; i < workers; i++ {
1576  			err := <-errCh
1577  			if err != nil {
1578  				rerr = err
1579  				db.opt.Warningf("While running doCompact with %+v. Error: %v\n", cp, err)
1580  			} else {
1581  				success++
1582  			}
1583  		}
1584  		if success == 0 {
1585  			return rerr
1586  		}
1587  		// We could do at least one successful compaction. So, we'll consider this a success.
1588  		db.opt.Infof("%d compactor(s) succeeded. One or more tables from level %d compacted.\n",
1589  			success, cp.level)
1590  		return nil
1591  	}
1592  
1593  	hbytes := func(sz int64) string {
1594  		return humanize.IBytes(uint64(sz))
1595  	}
1596  
1597  	t := db.lc.levelTargets()
1598  	for {
1599  		db.opt.Infof("\n")
1600  		var levels []int
1601  		for i, l := range db.lc.levels {
1602  			sz := l.getTotalSize()
1603  			db.opt.Infof("Level: %d. %8s Size. %8s Max.\n",
1604  				i, hbytes(l.getTotalSize()), hbytes(t.targetSz[i]))
1605  			if sz > 0 {
1606  				levels = append(levels, i)
1607  			}
1608  		}
1609  		if len(levels) <= 1 {
1610  			prios := db.lc.pickCompactLevels(nil)
1611  			if len(prios) == 0 || prios[0].score <= 1.0 {
1612  				db.opt.Infof("All tables consolidated into one level. Flattening done.\n")
1613  				return nil
1614  			}
1615  			if err := compactAway(prios[0]); err != nil {
1616  				return err
1617  			}
1618  			continue
1619  		}
1620  		// Create an artificial compaction priority, to ensure that we compact the level.
1621  		cp := compactionPriority{level: levels[0], score: 1.71}
1622  		if err := compactAway(cp); err != nil {
1623  			return err
1624  		}
1625  	}
1626  }
1627  
1628  func (db *DB) blockWrite() error {
1629  	// Stop accepting new writes.
1630  	if !db.blockWrites.CompareAndSwap(0, 1) {
1631  		return ErrBlockedWrites
1632  	}
1633  
1634  	// Make all pending writes finish. The following will also close writeCh.
1635  	db.closers.writes.SignalAndWait()
1636  	db.opt.Infof("Writes flushed. Stopping compactions now...")
1637  	return nil
1638  }
1639  
1640  func (db *DB) unblockWrite() {
1641  	db.closers.writes = z.NewCloser(1)
1642  	go db.doWrites(db.closers.writes)
1643  
1644  	// Resume writes.
1645  	db.blockWrites.Store(0)
1646  }
1647  
1648  func (db *DB) prepareToDrop() (func(), error) {
1649  	if db.opt.ReadOnly {
1650  		panic("Attempting to drop data in read-only mode.")
1651  	}
1652  	// In order prepare for drop, we need to block the incoming writes and
1653  	// write it to db. Then, flush all the pending memtable. So that, we
1654  	// don't miss any entries.
1655  	if err := db.blockWrite(); err != nil {
1656  		return func() {}, err
1657  	}
1658  	reqs := make([]*request, 0, 10)
1659  	for {
1660  		select {
1661  		case r := <-db.writeCh:
1662  			reqs = append(reqs, r)
1663  		default:
1664  			if err := db.writeRequests(reqs); err != nil {
1665  				db.opt.Errorf("writeRequests: %v", err)
1666  			}
1667  			db.stopMemoryFlush()
1668  			return func() {
1669  				db.opt.Infof("Resuming writes")
1670  				db.startMemoryFlush()
1671  				db.unblockWrite()
1672  			}, nil
1673  		}
1674  	}
1675  }
1676  
1677  // DropAll would drop all the data stored in Badger. It does this in the following way.
1678  // - Stop accepting new writes.
1679  // - Pause memtable flushes and compactions.
1680  // - Pick all tables from all levels, create a changeset to delete all these
1681  // tables and apply it to manifest.
1682  // - Pick all log files from value log, and delete all of them. Restart value log files from zero.
1683  // - Resume memtable flushes and compactions.
1684  //
1685  // NOTE: DropAll is resilient to concurrent writes, but not to reads. It is up to the user to not do
1686  // any reads while DropAll is going on, otherwise they may result in panics. Ideally, both reads and
1687  // writes are paused before running DropAll, and resumed after it is finished.
1688  func (db *DB) DropAll() error {
1689  	f, err := db.dropAll()
1690  	if f != nil {
1691  		f()
1692  	}
1693  	return err
1694  }
1695  
// dropAll performs the work of DropAll.
//
// It returns a function that the caller must run afterwards, even on error.
// If prepareToDrop fails, that is the function prepareToDrop returned.
// Otherwise it restarts compactions and then runs prepareToDrop's resume
// function. db.lock is released (via defer) before dropAll returns, so the
// returned function runs without holding it.
//
// Under db.lock it:
//   - releases the current and immutable memtables and installs a new one;
//   - deletes all SSTables via db.lc.dropTree;
//   - deletes all value log files via db.vlog.dropAll;
//   - resets the next file ID to 1;
//   - clears the block cache, the index cache and the value threshold state.
func (db *DB) dropAll() (func(), error) {
	db.opt.Infof("DropAll called. Blocking writes...")
	f, err := db.prepareToDrop()
	if err != nil {
		return f, err
	}
	// prepareToDrop will stop all the incoming write and flushes any pending memtables.
	// Before we drop, we'll stop the compaction because anyways all the data is going to
	// be deleted.
	db.stopCompactions()
	resume := func() {
		db.startCompactions()
		f()
	}
	// Block all foreign interactions with memory tables.
	db.lock.Lock()
	defer db.lock.Unlock()

	// Remove inmemory tables. Calling DecrRef for safety. Not sure if they're absolutely needed.
	db.mt.DecrRef()
	for _, mt := range db.imm {
		mt.DecrRef()
	}
	db.imm = db.imm[:0]
	db.mt, err = db.newMemTable() // Set it up for future writes.
	if err != nil {
		return resume, y.Wrapf(err, "cannot open new memtable")
	}

	num, err := db.lc.dropTree()
	if err != nil {
		return resume, err
	}
	db.opt.Infof("Deleted %d SSTables. Now deleting value logs...\n", num)

	num, err = db.vlog.dropAll()
	if err != nil {
		return resume, err
	}
	// File IDs restart from 1 now that every table and value log file is gone.
	db.lc.nextFileID.Store(1)
	db.opt.Infof("Deleted %d value log files. DropAll done.\n", num)
	db.blockCache.Clear()
	db.indexCache.Clear()
	db.threshold.Clear(db.opt)
	return resume, nil
}
1742  
// DropPrefix would drop all the keys with the provided prefix. It does this in the following way:
//   - Stop accepting new writes.
//   - Stop memtable flushes before acquiring lock. Because we're acquiring lock here
//     and memtable flush stalls for lock, which leads to deadlock
//   - Flush out all memtables, skipping over keys with the given prefix, Kp.
//   - Write out the value log header to memtables when flushing, so we don't accidentally bring Kp
//     back after a restart.
//   - Stop compaction.
//   - Compact L0->L1, skipping over Kp.
//   - Compact rest of the levels, Li->Li, picking tables which have Kp.
//   - Resume memtable flushes, compactions and writes.
//
// Prefixes for which no data is currently visible (see filterPrefixesToDrop)
// are ignored; if none remain, nothing is dropped. Deferred calls run in this
// order on return: startCompactions (once reached), db.lock.Unlock, then the
// resume function from prepareToDrop.
func (db *DB) DropPrefix(prefixes ...[]byte) error {
	if len(prefixes) == 0 {
		return nil
	}
	db.opt.Infof("DropPrefix called for %s", prefixes)
	f, err := db.prepareToDrop()
	if err != nil {
		return err
	}
	defer f()

	var filtered [][]byte
	if filtered, err = db.filterPrefixesToDrop(prefixes); err != nil {
		return err
	}
	// If there is no prefix for which the data already exist, do not do anything.
	if len(filtered) == 0 {
		db.opt.Infof("No prefixes to drop")
		return nil
	}
	// Block all foreign interactions with memory tables.
	db.lock.Lock()
	defer db.lock.Unlock()

	// Flush the active memtable together with the immutable ones.
	// NOTE(review): if a flush below fails, the memtables handled before it have
	// already been DecrRef'd yet remain in db.imm, and db.mt is also in db.imm;
	// the DB is left in that state when the error is returned. Confirm this is
	// acceptable.
	db.imm = append(db.imm, db.mt)
	for _, memtable := range db.imm {
		if memtable.sl.Empty() {
			memtable.DecrRef()
			continue
		}
		db.opt.Debugf("Flushing memtable")
		if err := db.handleMemTableFlush(memtable, filtered); err != nil {
			db.opt.Errorf("While trying to flush memtable: %v", err)
			return err
		}
		memtable.DecrRef()
	}
	db.stopCompactions()
	defer db.startCompactions()
	db.imm = db.imm[:0]
	db.mt, err = db.newMemTable()
	if err != nil {
		return y.Wrapf(err, "cannot create new mem table")
	}

	// Drop prefixes from the levels.
	if err := db.lc.dropPrefixes(filtered); err != nil {
		return err
	}
	db.opt.Infof("DropPrefix done")
	return nil
}
1806  
1807  func (db *DB) filterPrefixesToDrop(prefixes [][]byte) ([][]byte, error) {
1808  	var filtered [][]byte
1809  	for _, prefix := range prefixes {
1810  		err := db.View(func(txn *Txn) error {
1811  			iopts := DefaultIteratorOptions
1812  			iopts.Prefix = prefix
1813  			iopts.PrefetchValues = false
1814  			itr := txn.NewIterator(iopts)
1815  			defer itr.Close()
1816  			itr.Rewind()
1817  			if itr.ValidForPrefix(prefix) {
1818  				filtered = append(filtered, prefix)
1819  			}
1820  			return nil
1821  		})
1822  		if err != nil {
1823  			return filtered, err
1824  		}
1825  	}
1826  	return filtered, nil
1827  }
1828  
1829  // Checks if the key is banned. Returns the respective error if the key belongs to any of the banned
1830  // namepspaces. Else it returns nil.
1831  func (db *DB) isBanned(key []byte) error {
1832  	if db.opt.NamespaceOffset < 0 {
1833  		return nil
1834  	}
1835  	if len(key) <= db.opt.NamespaceOffset+8 {
1836  		return nil
1837  	}
1838  	if db.bannedNamespaces.has(y.BytesToU64(key[db.opt.NamespaceOffset:])) {
1839  		return ErrBannedKey
1840  	}
1841  	return nil
1842  }
1843  
1844  // BanNamespace bans a namespace. Read/write to keys belonging to any of such namespace is denied.
1845  func (db *DB) BanNamespace(ns uint64) error {
1846  	if db.opt.NamespaceOffset < 0 {
1847  		return ErrNamespaceMode
1848  	}
1849  	db.opt.Infof("Banning namespace: %d", ns)
1850  	// First set the banned namespaces in DB and then update the in-memory structure.
1851  	key := y.KeyWithTs(append(bannedNsKey, y.U64ToBytes(ns)...), 1)
1852  	entry := []*Entry{{
1853  		Key:   key,
1854  		Value: nil,
1855  	}}
1856  	req, err := db.sendToWriteCh(entry)
1857  	if err != nil {
1858  		return err
1859  	}
1860  	if err := req.Wait(); err != nil {
1861  		return err
1862  	}
1863  	db.bannedNamespaces.add(ns)
1864  	return nil
1865  }
1866  
1867  // BannedNamespaces returns the list of prefixes banned for DB.
1868  func (db *DB) BannedNamespaces() []uint64 {
1869  	return db.bannedNamespaces.all()
1870  }
1871  
// KVList contains a list of key-value pairs. It is an alias for pb.KVList and
// is the type passed to Subscribe callbacks.
type KVList = pb.KVList
1874  
// Subscribe can be used to watch key changes for the given key prefixes and the ignore string.
// At least one prefix should be passed, or an error will be returned.
// You can use an empty prefix to monitor all changes to the DB.
// Ignore string is the byte ranges for which prefix matching will be ignored.
// For example: ignore = "2-3", and prefix = "abc" will match for keys "abxxc", "abdfc" etc.
// This function blocks until the given context is done or an error occurs.
// The given function will be called with a new KVList containing the modified keys and the
// corresponding values.
//
// It returns ErrNilCallback if cb is nil. When ctx is done it returns
// ctx.Err(); when cb returns an error, that error is returned. In both cases
// the subscriber is removed. When the subscriber's closer is closed, any
// buffered updates are delivered to cb once more before returning.
func (db *DB) Subscribe(ctx context.Context, cb func(kv *KVList) error, matches []pb.Match) error {
	if cb == nil {
		return ErrNilCallback
	}

	c := z.NewCloser(1)
	s, err := db.pub.newSubscriber(c, matches)
	if err != nil {
		return y.Wrapf(err, "while creating a new subscriber")
	}
	// slurp appends every batch currently buffered in s.sendCh to batch
	// without blocking, then calls cb once if anything was collected.
	slurp := func(batch *pb.KVList) error {
		for {
			select {
			case kvs := <-s.sendCh:
				batch.Kv = append(batch.Kv, kvs.Kv...)
			default:
				if len(batch.GetKv()) > 0 {
					return cb(batch)
				}
				return nil
			}
		}
	}

	// drain discards whatever is currently buffered in s.sendCh without
	// blocking, stopping early if the channel has been closed.
	drain := func() {
		for {
			select {
			case _, ok := <-s.sendCh:
				if !ok {
					// Channel is closed.
					return
				}
			default:
				return
			}
		}
	}
	for {
		select {
		case <-c.HasBeenClosed():
			// No need to delete here. Closer will be called only while
			// closing DB. Subscriber will be deleted by cleanSubscribers.
			// Deliver any pending updates before returning.
			err := slurp(new(pb.KVList))
			c.Done()
			return err
		case <-ctx.Done():
			c.Done()
			// Mark the subscriber inactive, discard buffered updates, and
			// delete the subscriber to avoid further updates.
			s.active.Store(0)
			drain()
			db.pub.deleteSubscriber(s.id)
			return ctx.Err()
		case batch := <-s.sendCh:
			err := slurp(batch)
			if err != nil {
				c.Done()
				s.active.Store(0)
				drain()
				// Delete the subscriber if there is an error by the callback.
				db.pub.deleteSubscriber(s.id)
				return err
			}
		}
	}
}
1949  
1950  func (db *DB) syncDir(dir string) error {
1951  	if db.opt.InMemory {
1952  		return nil
1953  	}
1954  	return syncDir(dir)
1955  }
1956  
1957  func createDirs(opt Options) error {
1958  	for _, path := range []string{opt.Dir, opt.ValueDir} {
1959  		dirExists, err := exists(path)
1960  		if err != nil {
1961  			return y.Wrapf(err, "Invalid Dir: %q", path)
1962  		}
1963  		if !dirExists {
1964  			if opt.ReadOnly {
1965  				return fmt.Errorf("Cannot find directory %q for read-only open", path)
1966  			}
1967  			// Try to create the directory
1968  			err = os.MkdirAll(path, 0700)
1969  			if err != nil {
1970  				return y.Wrapf(err, "Error Creating Dir: %q", path)
1971  			}
1972  		}
1973  	}
1974  	return nil
1975  }
1976  
1977  // Stream the contents of this DB to a new DB with options outOptions that will be
1978  // created in outDir.
1979  func (db *DB) StreamDB(outOptions Options) error {
1980  	outDir := outOptions.Dir
1981  
1982  	// Open output DB.
1983  	outDB, err := OpenManaged(outOptions)
1984  	if err != nil {
1985  		return y.Wrapf(err, "cannot open out DB at %s", outDir)
1986  	}
1987  	defer outDB.Close()
1988  	writer := outDB.NewStreamWriter()
1989  	if err := writer.Prepare(); err != nil {
1990  		return y.Wrapf(err, "cannot create stream writer in out DB at %s", outDir)
1991  	}
1992  
1993  	// Stream contents of DB to the output DB.
1994  	stream := db.NewStreamAt(math.MaxUint64)
1995  	stream.LogPrefix = fmt.Sprintf("Streaming DB to new DB at %s", outDir)
1996  
1997  	stream.Send = func(buf *z.Buffer) error {
1998  		return writer.Write(buf)
1999  	}
2000  	if err := stream.Orchestrate(context.Background()); err != nil {
2001  		return y.Wrapf(err, "cannot stream DB to out DB at %s", outDir)
2002  	}
2003  	if err := writer.Flush(); err != nil {
2004  		return y.Wrapf(err, "cannot flush writer")
2005  	}
2006  	return nil
2007  }
2008  
2009  // Opts returns a copy of the DB options.
2010  func (db *DB) Opts() Options {
2011  	return db.opt
2012  }
2013  
// CacheType selects which of the DB's caches CacheMaxCost operates on.
type CacheType int

// The values are spelled out explicitly so they stay stable if the list is
// ever reordered.
const (
	// BlockCache selects the block cache.
	BlockCache CacheType = 0
	// IndexCache selects the index cache.
	IndexCache CacheType = 1
)
2020  
2021  // CacheMaxCost updates the max cost of the given cache (either block or index cache).
2022  // The call will have an effect only if the DB was created with the cache. Otherwise it is
2023  // a no-op. If you pass a negative value, the function will return the current value
2024  // without updating it.
2025  func (db *DB) CacheMaxCost(cache CacheType, maxCost int64) (int64, error) {
2026  	if db == nil {
2027  		return 0, nil
2028  	}
2029  
2030  	if maxCost < 0 {
2031  		switch cache {
2032  		case BlockCache:
2033  			return db.blockCache.MaxCost(), nil
2034  		case IndexCache:
2035  			return db.indexCache.MaxCost(), nil
2036  		default:
2037  			return 0, errors.New("invalid cache type")
2038  		}
2039  	}
2040  
2041  	switch cache {
2042  	case BlockCache:
2043  		db.blockCache.UpdateMaxCost(maxCost)
2044  		return maxCost, nil
2045  	case IndexCache:
2046  		db.indexCache.UpdateMaxCost(maxCost)
2047  		return maxCost, nil
2048  	default:
2049  		return 0, errors.New("invalid cache type")
2050  	}
2051  }
2052  
2053  func (db *DB) LevelsToString() string {
2054  	levels := db.Levels()
2055  	h := func(sz int64) string {
2056  		return humanize.IBytes(uint64(sz))
2057  	}
2058  	base := func(b bool) string {
2059  		if b {
2060  			return "B"
2061  		}
2062  		return " "
2063  	}
2064  
2065  	var b strings.Builder
2066  	b.WriteRune('\n')
2067  	for _, li := range levels {
2068  		b.WriteString(fmt.Sprintf(
2069  			"Level %d [%s]: NumTables: %02d. Size: %s of %s. Score: %.2f->%.2f"+
2070  				" StaleData: %s Target FileSize: %s\n",
2071  			li.Level, base(li.IsBaseLevel), li.NumTables,
2072  			h(li.Size), h(li.TargetSize), li.Score, li.Adjusted, h(li.StaleDatSize),
2073  			h(li.TargetFileSize)))
2074  	}
2075  	b.WriteString("Level Done\n")
2076  	return b.String()
2077  }
2078