levels.go raw

   1  /*
   2   * SPDX-FileCopyrightText: © Hypermode Inc. <hello@hypermode.com>
   3   * SPDX-License-Identifier: Apache-2.0
   4   */
   5  
   6  package badger
   7  
   8  import (
   9  	"bytes"
  10  	"context"
  11  	"encoding/hex"
  12  	"errors"
  13  	"fmt"
  14  	"math"
  15  	"math/rand"
  16  	"os"
  17  	"sort"
  18  	"strings"
  19  	"sync"
  20  	"sync/atomic"
  21  	"time"
  22  
  23  	"go.opentelemetry.io/otel"
  24  	"go.opentelemetry.io/otel/attribute"
  25  
  26  	"github.com/dgraph-io/badger/v4/pb"
  27  	"github.com/dgraph-io/badger/v4/table"
  28  	"github.com/dgraph-io/badger/v4/y"
  29  	"github.com/dgraph-io/ristretto/v2/z"
  30  )
  31  
// levelsController owns the per-level handlers of the LSM tree together with
// the compaction bookkeeping (cstatus) shared by the compactor goroutines.
type levelsController struct {
	// nextFileID is the next table file ID to hand out; newLevelsController
	// seeds it with (largest table ID in the manifest) + 1.
	nextFileID atomic.Uint64
	// l0stallsMs presumably accumulates milliseconds spent stalling on L0 —
	// NOTE(review): it is updated outside this chunk; confirm.
	l0stallsMs atomic.Int64

	// The following are initialized once and const.
	// levels[i] is the handler for level i.
	levels []*levelHandler
	kv     *DB

	// cstatus tracks which tables/levels are currently being compacted.
	cstatus compactStatus
}
  42  
  43  // revertToManifest checks that all necessary table files exist and removes all table files not
  44  // referenced by the manifest. idMap is a set of table file id's that were read from the directory
  45  // listing.
  46  func revertToManifest(kv *DB, mf *Manifest, idMap map[uint64]struct{}) error {
  47  	// 1. Check all files in manifest exist.
  48  	for id := range mf.Tables {
  49  		if _, ok := idMap[id]; !ok {
  50  			return fmt.Errorf("file does not exist for table %d", id)
  51  		}
  52  	}
  53  
  54  	// 2. Delete files that shouldn't exist.
  55  	for id := range idMap {
  56  		if _, ok := mf.Tables[id]; !ok {
  57  			kv.opt.Debugf("Table file %d not referenced in MANIFEST\n", id)
  58  			filename := table.NewFilename(id, kv.opt.Dir)
  59  			if err := os.Remove(filename); err != nil {
  60  				return y.Wrapf(err, "While removing table %d", id)
  61  			}
  62  		}
  63  	}
  64  
  65  	return nil
  66  }
  67  
  68  func newLevelsController(db *DB, mf *Manifest) (*levelsController, error) {
  69  	y.AssertTrue(db.opt.NumLevelZeroTablesStall > db.opt.NumLevelZeroTables)
  70  	s := &levelsController{
  71  		kv:     db,
  72  		levels: make([]*levelHandler, db.opt.MaxLevels),
  73  	}
  74  	s.cstatus.tables = make(map[uint64]struct{})
  75  	s.cstatus.levels = make([]*levelCompactStatus, db.opt.MaxLevels)
  76  
  77  	for i := 0; i < db.opt.MaxLevels; i++ {
  78  		s.levels[i] = newLevelHandler(db, i)
  79  		s.cstatus.levels[i] = new(levelCompactStatus)
  80  	}
  81  
  82  	if db.opt.InMemory {
  83  		return s, nil
  84  	}
  85  	// Compare manifest against directory, check for existent/non-existent files, and remove.
  86  	if err := revertToManifest(db, mf, getIDMap(db.opt.Dir)); err != nil {
  87  		return nil, err
  88  	}
  89  
  90  	var mu sync.Mutex
  91  	tables := make([][]*table.Table, db.opt.MaxLevels)
  92  	var maxFileID uint64
  93  
  94  	// We found that using 3 goroutines allows disk throughput to be utilized to its max.
  95  	// Disk utilization is the main thing we should focus on, while trying to read the data. That's
  96  	// the one factor that remains constant between HDD and SSD.
  97  	throttle := y.NewThrottle(3)
  98  
  99  	start := time.Now()
 100  	var numOpened atomic.Int32
 101  	tick := time.NewTicker(3 * time.Second)
 102  	defer tick.Stop()
 103  
 104  	for fileID, tf := range mf.Tables {
 105  		fname := table.NewFilename(fileID, db.opt.Dir)
 106  		select {
 107  		case <-tick.C:
 108  			db.opt.Infof("%d tables out of %d opened in %s\n", numOpened.Load(),
 109  				len(mf.Tables), time.Since(start).Round(time.Millisecond))
 110  		default:
 111  		}
 112  		if err := throttle.Do(); err != nil {
 113  			closeAllTables(tables)
 114  			return nil, err
 115  		}
 116  		if fileID > maxFileID {
 117  			maxFileID = fileID
 118  		}
 119  		go func(fname string, tf TableManifest) {
 120  			var rerr error
 121  			defer func() {
 122  				throttle.Done(rerr)
 123  				numOpened.Add(1)
 124  			}()
 125  			dk, err := db.registry.DataKey(tf.KeyID)
 126  			if err != nil {
 127  				rerr = y.Wrapf(err, "Error while reading datakey")
 128  				return
 129  			}
 130  			topt := buildTableOptions(db)
 131  			// Explicitly set Compression and DataKey based on how the table was generated.
 132  			topt.Compression = tf.Compression
 133  			topt.DataKey = dk
 134  
 135  			mf, err := z.OpenMmapFile(fname, db.opt.getFileFlags(), 0)
 136  			if err != nil {
 137  				rerr = y.Wrapf(err, "Opening file: %q", fname)
 138  				return
 139  			}
 140  			t, err := table.OpenTable(mf, topt)
 141  			if err != nil {
 142  				if strings.HasPrefix(err.Error(), "CHECKSUM_MISMATCH:") {
 143  					db.opt.Errorf(err.Error())
 144  					db.opt.Errorf("Ignoring table %s", mf.Fd.Name())
 145  					// Do not set rerr. We will continue without this table.
 146  				} else {
 147  					rerr = y.Wrapf(err, "Opening table: %q", fname)
 148  				}
 149  				return
 150  			}
 151  
 152  			mu.Lock()
 153  			tables[tf.Level] = append(tables[tf.Level], t)
 154  			mu.Unlock()
 155  		}(fname, tf)
 156  	}
 157  	if err := throttle.Finish(); err != nil {
 158  		closeAllTables(tables)
 159  		return nil, err
 160  	}
 161  	db.opt.Infof("All %d tables opened in %s\n", numOpened.Load(),
 162  		time.Since(start).Round(time.Millisecond))
 163  	s.nextFileID.Store(maxFileID + 1)
 164  	for i, tbls := range tables {
 165  		s.levels[i].initTables(tbls)
 166  	}
 167  
 168  	// Make sure key ranges do not overlap etc.
 169  	if err := s.validate(); err != nil {
 170  		_ = s.cleanupLevels()
 171  		return nil, y.Wrap(err, "Level validation")
 172  	}
 173  
 174  	// Sync directory (because we have at least removed some files, or previously created the
 175  	// manifest file).
 176  	if err := syncDir(db.opt.Dir); err != nil {
 177  		_ = s.close()
 178  		return nil, err
 179  	}
 180  
 181  	return s, nil
 182  }
 183  
 184  // Closes the tables, for cleanup in newLevelsController.  (We Close() instead of using DecrRef()
 185  // because that would delete the underlying files.)  We ignore errors, which is OK because tables
 186  // are read-only.
 187  func closeAllTables(tables [][]*table.Table) {
 188  	for _, tableSlice := range tables {
 189  		for _, table := range tableSlice {
 190  			_ = table.Close(-1)
 191  		}
 192  	}
 193  }
 194  
 195  func (s *levelsController) cleanupLevels() error {
 196  	var firstErr error
 197  	for _, l := range s.levels {
 198  		if err := l.close(); err != nil && firstErr == nil {
 199  			firstErr = err
 200  		}
 201  	}
 202  	return firstErr
 203  }
 204  
 205  // dropTree picks all tables from all levels, creates a manifest changeset,
 206  // applies it, and then decrements the refs of these tables, which would result
 207  // in their deletion.
 208  func (s *levelsController) dropTree() (int, error) {
 209  	// First pick all tables, so we can create a manifest changelog.
 210  	var all []*table.Table
 211  	for _, l := range s.levels {
 212  		l.RLock()
 213  		all = append(all, l.tables...)
 214  		l.RUnlock()
 215  	}
 216  	if len(all) == 0 {
 217  		return 0, nil
 218  	}
 219  
 220  	// Generate the manifest changes.
 221  	changes := []*pb.ManifestChange{}
 222  	for _, table := range all {
 223  		// Add a delete change only if the table is not in memory.
 224  		if !table.IsInmemory {
 225  			changes = append(changes, newDeleteChange(table.ID()))
 226  		}
 227  	}
 228  	changeSet := pb.ManifestChangeSet{Changes: changes}
 229  	if err := s.kv.manifest.addChanges(changeSet.Changes, s.kv.opt); err != nil {
 230  		return 0, err
 231  	}
 232  
 233  	// Now that manifest has been successfully written, we can delete the tables.
 234  	for _, l := range s.levels {
 235  		l.Lock()
 236  		l.totalSize = 0
 237  		l.tables = l.tables[:0]
 238  		l.Unlock()
 239  	}
 240  	for _, table := range all {
 241  		if err := table.DecrRef(); err != nil {
 242  			return 0, err
 243  		}
 244  	}
 245  	return len(all), nil
 246  }
 247  
 248  // dropPrefix runs a L0->L1 compaction, and then runs same level compaction on the rest of the
 249  // levels. For L0->L1 compaction, it runs compactions normally, but skips over
 250  // all the keys with the provided prefix.
 251  // For Li->Li compactions, it picks up the tables which would have the prefix. The
 252  // tables who only have keys with this prefix are quickly dropped. The ones which have other keys
 253  // are run through MergeIterator and compacted to create new tables. All the mechanisms of
 254  // compactions apply, i.e. level sizes and MANIFEST are updated as in the normal flow.
 255  func (s *levelsController) dropPrefixes(prefixes [][]byte) error {
 256  	opt := s.kv.opt
 257  	// Iterate levels in the reverse order because if we were to iterate from
 258  	// lower level (say level 0) to a higher level (say level 3) we could have
 259  	// a state in which level 0 is compacted and an older version of a key exists in lower level.
 260  	// At this point, if someone creates an iterator, they would see an old
 261  	// value for a key from lower levels. Iterating in reverse order ensures we
 262  	// drop the oldest data first so that lookups never return stale data.
 263  	for i := len(s.levels) - 1; i >= 0; i-- {
 264  		l := s.levels[i]
 265  
 266  		l.RLock()
 267  		if l.level == 0 {
 268  			size := len(l.tables)
 269  			l.RUnlock()
 270  
 271  			if size > 0 {
 272  				cp := compactionPriority{
 273  					level: 0,
 274  					score: 1.74,
 275  					// A unique number greater than 1.0 does two things. Helps identify this
 276  					// function in logs, and forces a compaction.
 277  					dropPrefixes: prefixes,
 278  				}
 279  				if err := s.doCompact(174, cp); err != nil {
 280  					opt.Warningf("While compacting level 0: %v", err)
 281  					return nil
 282  				}
 283  			}
 284  			continue
 285  		}
 286  
 287  		// Build a list of compaction tableGroups affecting all the prefixes we
 288  		// need to drop. We need to build tableGroups that satisfy the invariant that
 289  		// bottom tables are consecutive.
 290  		// tableGroup contains groups of consecutive tables.
 291  		var tableGroups [][]*table.Table
 292  		var tableGroup []*table.Table
 293  
 294  		finishGroup := func() {
 295  			if len(tableGroup) > 0 {
 296  				tableGroups = append(tableGroups, tableGroup)
 297  				tableGroup = nil
 298  			}
 299  		}
 300  
 301  		for _, table := range l.tables {
 302  			if containsAnyPrefixes(table, prefixes) {
 303  				tableGroup = append(tableGroup, table)
 304  			} else {
 305  				finishGroup()
 306  			}
 307  		}
 308  		finishGroup()
 309  
 310  		l.RUnlock()
 311  
 312  		if len(tableGroups) == 0 {
 313  			continue
 314  		}
 315  		opt.Infof("Dropping prefix at level %d (%d tableGroups)", l.level, len(tableGroups))
 316  		for _, operation := range tableGroups {
 317  			cd := compactDef{
 318  				thisLevel:    l,
 319  				nextLevel:    l,
 320  				top:          nil,
 321  				bot:          operation,
 322  				dropPrefixes: prefixes,
 323  				t:            s.levelTargets(),
 324  			}
 325  			_, span := otel.Tracer("").Start(context.TODO(), "Badger.Compaction")
 326  			span.SetAttributes(attribute.Int("Compaction level", l.level))
 327  			span.SetAttributes(attribute.String("Drop Prefixes", fmt.Sprintf("%v", prefixes)))
 328  			cd.t.baseLevel = l.level
 329  			if err := s.runCompactDef(-1, l.level, cd); err != nil {
 330  				opt.Warningf("While running compact def: %+v. Error: %v", cd, err)
 331  				span.End()
 332  				return err
 333  			}
 334  			span.SetAttributes(
 335  				attribute.Int("Top tables count", len(cd.top)),
 336  				attribute.Int("Bottom tables count", len(cd.bot)))
 337  			span.End()
 338  		}
 339  
 340  	}
 341  	return nil
 342  }
 343  
 344  func (s *levelsController) startCompact(lc *z.Closer) {
 345  	n := s.kv.opt.NumCompactors
 346  	lc.AddRunning(n - 1)
 347  	for i := 0; i < n; i++ {
 348  		go s.runCompactor(i, lc)
 349  	}
 350  }
 351  
// targets holds the per-level sizing decisions produced by levelTargets.
type targets struct {
	// baseLevel is Lbase, the level that L0 compacts into.
	baseLevel int
	// targetSz[i] is the target total size of level i; left zero for L0.
	targetSz  []int64
	// fileSz[i] is the target size of each table file built for level i.
	fileSz    []int64
}
 357  
 358  // levelTargets calculates the targets for levels in the LSM tree. The idea comes from Dynamic Level
 359  // Sizes ( https://rocksdb.org/blog/2015/07/23/dynamic-level.html ) in RocksDB. The sizes of levels
 360  // are calculated based on the size of the lowest level, typically L6. So, if L6 size is 1GB, then
 361  // L5 target size is 100MB, L4 target size is 10MB and so on.
 362  //
 363  // L0 files don't automatically go to L1. Instead, they get compacted to Lbase, where Lbase is
 364  // chosen based on the first level which is non-empty from top (check L1 through L6). For an empty
 365  // DB, that would be L6.  So, L0 compactions go to L6, then L5, L4 and so on.
 366  //
 367  // Lbase is advanced to the upper levels when its target size exceeds BaseLevelSize. For
 368  // example, when L6 reaches 1.1GB, then L4 target sizes becomes 11MB, thus exceeding the
 369  // BaseLevelSize of 10MB. L3 would then become the new Lbase, with a target size of 1MB <
 370  // BaseLevelSize.
 371  func (s *levelsController) levelTargets() targets {
 372  	adjust := func(sz int64) int64 {
 373  		if sz < s.kv.opt.BaseLevelSize {
 374  			return s.kv.opt.BaseLevelSize
 375  		}
 376  		return sz
 377  	}
 378  
 379  	t := targets{
 380  		targetSz: make([]int64, len(s.levels)),
 381  		fileSz:   make([]int64, len(s.levels)),
 382  	}
 383  	// DB size is the size of the last level.
 384  	dbSize := s.lastLevel().getTotalSize()
 385  	for i := len(s.levels) - 1; i > 0; i-- {
 386  		ltarget := adjust(dbSize)
 387  		t.targetSz[i] = ltarget
 388  		if t.baseLevel == 0 && ltarget <= s.kv.opt.BaseLevelSize {
 389  			t.baseLevel = i
 390  		}
 391  		dbSize /= int64(s.kv.opt.LevelSizeMultiplier)
 392  	}
 393  
 394  	tsz := s.kv.opt.BaseTableSize
 395  	for i := 0; i < len(s.levels); i++ {
 396  		if i == 0 {
 397  			// Use MemTableSize for Level 0. Because at Level 0, we stop compactions based on the
 398  			// number of tables, not the size of the level. So, having a 1:1 size ratio between
 399  			// memtable size and the size of L0 files is better than churning out 32 files per
 400  			// memtable (assuming 64MB MemTableSize and 2MB BaseTableSize).
 401  			t.fileSz[i] = s.kv.opt.MemTableSize
 402  		} else if i <= t.baseLevel {
 403  			t.fileSz[i] = tsz
 404  		} else {
 405  			tsz *= int64(s.kv.opt.TableSizeMultiplier)
 406  			t.fileSz[i] = tsz
 407  		}
 408  	}
 409  
 410  	// Bring the base level down to the last empty level.
 411  	for i := t.baseLevel + 1; i < len(s.levels)-1; i++ {
 412  		if s.levels[i].getTotalSize() > 0 {
 413  			break
 414  		}
 415  		t.baseLevel = i
 416  	}
 417  
 418  	// If the base level is empty and the next level size is less than the
 419  	// target size, pick the next level as the base level.
 420  	b := t.baseLevel
 421  	lvl := s.levels
 422  	if b < len(lvl)-1 && lvl[b].getTotalSize() == 0 && lvl[b+1].getTotalSize() < t.targetSz[b+1] {
 423  		t.baseLevel++
 424  	}
 425  	return t
 426  }
 427  
 428  func (s *levelsController) runCompactor(id int, lc *z.Closer) {
 429  	defer lc.Done()
 430  
 431  	randomDelay := time.NewTimer(time.Duration(rand.Int31n(1000)) * time.Millisecond)
 432  	select {
 433  	case <-randomDelay.C:
 434  	case <-lc.HasBeenClosed():
 435  		randomDelay.Stop()
 436  		return
 437  	}
 438  
 439  	moveL0toFront := func(prios []compactionPriority) []compactionPriority {
 440  		idx := -1
 441  		for i, p := range prios {
 442  			if p.level == 0 {
 443  				idx = i
 444  				break
 445  			}
 446  		}
 447  		// If idx == -1, we didn't find L0.
 448  		// If idx == 0, then we don't need to do anything. L0 is already at the front.
 449  		if idx > 0 {
 450  			out := append([]compactionPriority{}, prios[idx])
 451  			out = append(out, prios[:idx]...)
 452  			out = append(out, prios[idx+1:]...)
 453  			return out
 454  		}
 455  		return prios
 456  	}
 457  
 458  	run := func(p compactionPriority) bool {
 459  		err := s.doCompact(id, p)
 460  		switch err {
 461  		case nil:
 462  			return true
 463  		case errFillTables:
 464  			// pass
 465  		default:
 466  			s.kv.opt.Warningf("While running doCompact: %v\n", err)
 467  		}
 468  		return false
 469  	}
 470  
 471  	var priosBuffer []compactionPriority
 472  	runOnce := func() bool {
 473  		prios := s.pickCompactLevels(priosBuffer)
 474  		defer func() {
 475  			priosBuffer = prios
 476  		}()
 477  		if id == 0 {
 478  			// Worker ID zero prefers to compact L0 always.
 479  			prios = moveL0toFront(prios)
 480  		}
 481  		for _, p := range prios {
 482  			if id == 0 && p.level == 0 {
 483  				// Allow worker zero to run level 0, irrespective of its adjusted score.
 484  			} else if p.adjusted < 1.0 {
 485  				break
 486  			}
 487  			if run(p) {
 488  				return true
 489  			}
 490  		}
 491  
 492  		return false
 493  	}
 494  
 495  	tryLmaxToLmaxCompaction := func() {
 496  		p := compactionPriority{
 497  			level: s.lastLevel().level,
 498  			t:     s.levelTargets(),
 499  		}
 500  		run(p)
 501  
 502  	}
 503  	count := 0
 504  	ticker := time.NewTicker(50 * time.Millisecond)
 505  	defer ticker.Stop()
 506  	for {
 507  		select {
 508  		// Can add a done channel or other stuff.
 509  		case <-ticker.C:
 510  			count++
 511  			// Each ticker is 50ms so 50*200=10seconds.
 512  			if s.kv.opt.LmaxCompaction && id == 2 && count >= 200 {
 513  				tryLmaxToLmaxCompaction()
 514  				count = 0
 515  			} else {
 516  				runOnce()
 517  			}
 518  		case <-lc.HasBeenClosed():
 519  			return
 520  		}
 521  	}
 522  }
 523  
// compactionPriority describes one candidate compaction.
type compactionPriority struct {
	// level is the level to compact.
	level        int
	// score is the raw score computed by pickCompactLevels. For L0 it is the
	// table count over NumLevelZeroTables; for other levels it is the size
	// (minus bytes already being compacted) over the level's target size.
	score        float64
	// adjusted is score after pickCompactLevels' PebbleDB-style adjustment.
	adjusted     float64
	// dropPrefixes lists key prefixes to discard during this compaction.
	dropPrefixes [][]byte
	// t holds the level targets in effect when this priority was computed.
	t            targets
}
 531  
 532  func (s *levelsController) lastLevel() *levelHandler {
 533  	return s.levels[len(s.levels)-1]
 534  }
 535  
 536  // pickCompactLevel determines which level to compact.
 537  // Based on: https://github.com/facebook/rocksdb/wiki/Leveled-Compaction
 538  // It tries to reuse priosBuffer to reduce memory allocation,
 539  // passing nil is acceptable, then new memory will be allocated.
 540  func (s *levelsController) pickCompactLevels(priosBuffer []compactionPriority) (prios []compactionPriority) {
 541  	t := s.levelTargets()
 542  	addPriority := func(level int, score float64) {
 543  		pri := compactionPriority{
 544  			level:    level,
 545  			score:    score,
 546  			adjusted: score,
 547  			t:        t,
 548  		}
 549  		prios = append(prios, pri)
 550  	}
 551  
 552  	// Grow buffer to fit all levels.
 553  	if cap(priosBuffer) < len(s.levels) {
 554  		priosBuffer = make([]compactionPriority, 0, len(s.levels))
 555  	}
 556  	prios = priosBuffer[:0]
 557  
 558  	// Add L0 priority based on the number of tables.
 559  	addPriority(0, float64(s.levels[0].numTables())/float64(s.kv.opt.NumLevelZeroTables))
 560  
 561  	// All other levels use size to calculate priority.
 562  	for i := 1; i < len(s.levels); i++ {
 563  		// Don't consider those tables that are already being compacted right now.
 564  		delSize := s.cstatus.delSize(i)
 565  
 566  		l := s.levels[i]
 567  		sz := l.getTotalSize() - delSize
 568  		addPriority(i, float64(sz)/float64(t.targetSz[i]))
 569  	}
 570  	y.AssertTrue(len(prios) == len(s.levels))
 571  
 572  	// The following code is borrowed from PebbleDB and results in healthier LSM tree structure.
 573  	// If Li-1 has score > 1.0, then we'll divide Li-1 score by Li. If Li score is >= 1.0, then Li-1
 574  	// score is reduced, which means we'll prioritize the compaction of lower levels (L5, L4 and so
 575  	// on) over the higher levels (L0, L1 and so on). On the other hand, if Li score is < 1.0, then
 576  	// we'll increase the priority of Li-1.
 577  	// Overall what this means is, if the bottom level is already overflowing, then de-prioritize
 578  	// compaction of the above level. If the bottom level is not full, then increase the priority of
 579  	// above level.
 580  	var prevLevel int
 581  	for level := t.baseLevel; level < len(s.levels); level++ {
 582  		if prios[prevLevel].adjusted >= 1 {
 583  			// Avoid absurdly large scores by placing a floor on the score that we'll
 584  			// adjust a level by. The value of 0.01 was chosen somewhat arbitrarily
 585  			const minScore = 0.01
 586  			if prios[level].score >= minScore {
 587  				prios[prevLevel].adjusted /= prios[level].adjusted
 588  			} else {
 589  				prios[prevLevel].adjusted /= minScore
 590  			}
 591  		}
 592  		prevLevel = level
 593  	}
 594  
 595  	// Pick all the levels whose original score is >= 1.0, irrespective of their adjusted score.
 596  	// We'll still sort them by their adjusted score below. Having both these scores allows us to
 597  	// make better decisions about compacting L0. If we see a score >= 1.0, we can do L0->L0
 598  	// compactions. If the adjusted score >= 1.0, then we can do L0->Lbase compactions.
 599  	out := prios[:0]
 600  	for _, p := range prios[:len(prios)-1] {
 601  		if p.score >= 1.0 {
 602  			out = append(out, p)
 603  		}
 604  	}
 605  	prios = out
 606  
 607  	// Sort by the adjusted score.
 608  	sort.Slice(prios, func(i, j int) bool {
 609  		return prios[i].adjusted > prios[j].adjusted
 610  	})
 611  	return prios
 612  }
 613  
 614  // checkOverlap checks if the given tables overlap with any level from the given "lev" onwards.
 615  func (s *levelsController) checkOverlap(tables []*table.Table, lev int) bool {
 616  	kr := getKeyRange(tables...)
 617  	for i, lh := range s.levels {
 618  		if i < lev { // Skip upper levels.
 619  			continue
 620  		}
 621  		lh.RLock()
 622  		left, right := lh.overlappingTables(levelHandlerRLocked{}, kr)
 623  		lh.RUnlock()
 624  		if right-left > 0 {
 625  			return true
 626  		}
 627  	}
 628  	return false
 629  }
 630  
 631  // subcompact runs a single sub-compaction, iterating over the specified key-range only.
 632  //
 633  // We use splits to do a single compaction concurrently. If we have >= 3 tables
 634  // involved in the bottom level during compaction, we choose key ranges to
 635  // split the main compaction up into sub-compactions. Each sub-compaction runs
 636  // concurrently, only iterating over the provided key range, generating tables.
 637  // This speeds up the compaction significantly.
 638  func (s *levelsController) subcompact(it y.Iterator, kr keyRange, cd compactDef,
 639  	inflightBuilders *y.Throttle, res chan<- *table.Table) {
 640  
 641  	// Check overlap of the top level with the levels which are not being
 642  	// compacted in this compaction.
 643  	hasOverlap := s.checkOverlap(cd.allTables(), cd.nextLevel.level+1)
 644  
 645  	// Pick a discard ts, so we can discard versions below this ts. We should
 646  	// never discard any versions starting from above this timestamp, because
 647  	// that would affect the snapshot view guarantee provided by transactions.
 648  	discardTs := s.kv.orc.discardAtOrBelow()
 649  
 650  	// Try to collect stats so that we can inform value log about GC. That would help us find which
 651  	// value log file should be GCed.
 652  	discardStats := make(map[uint32]int64)
 653  	updateStats := func(vs y.ValueStruct) {
 654  		// We don't need to store/update discard stats when badger is running in Disk-less mode.
 655  		if s.kv.opt.InMemory {
 656  			return
 657  		}
 658  		if vs.Meta&bitValuePointer > 0 {
 659  			var vp valuePointer
 660  			vp.Decode(vs.Value)
 661  			discardStats[vp.Fid] += int64(vp.Len)
 662  		}
 663  	}
 664  
 665  	// exceedsAllowedOverlap returns true if the given key range would overlap with more than 10
 666  	// tables from level below nextLevel (nextLevel+1). This helps avoid generating tables at Li
 667  	// with huge overlaps with Li+1.
 668  	exceedsAllowedOverlap := func(kr keyRange) bool {
 669  		n2n := cd.nextLevel.level + 1
 670  		if n2n <= 1 || n2n >= len(s.levels) {
 671  			return false
 672  		}
 673  		n2nl := s.levels[n2n]
 674  		n2nl.RLock()
 675  		defer n2nl.RUnlock()
 676  
 677  		l, r := n2nl.overlappingTables(levelHandlerRLocked{}, kr)
 678  		return r-l >= 10
 679  	}
 680  
 681  	var (
 682  		lastKey, skipKey       []byte
 683  		numBuilds, numVersions int
 684  		// Denotes if the first key is a series of duplicate keys had
 685  		// "DiscardEarlierVersions" set
 686  		firstKeyHasDiscardSet bool
 687  	)
 688  
 689  	addKeys := func(builder *table.Builder) {
 690  		timeStart := time.Now()
 691  		var numKeys, numSkips uint64
 692  		var rangeCheck int
 693  		var tableKr keyRange
 694  		for ; it.Valid(); it.Next() {
 695  			// See if we need to skip the prefix.
 696  			if len(cd.dropPrefixes) > 0 && hasAnyPrefixes(it.Key(), cd.dropPrefixes) {
 697  				numSkips++
 698  				updateStats(it.Value())
 699  				continue
 700  			}
 701  
 702  			// See if we need to skip this key.
 703  			if len(skipKey) > 0 {
 704  				if y.SameKey(it.Key(), skipKey) {
 705  					numSkips++
 706  					updateStats(it.Value())
 707  					continue
 708  				} else {
 709  					skipKey = skipKey[:0]
 710  				}
 711  			}
 712  
 713  			if !y.SameKey(it.Key(), lastKey) {
 714  				firstKeyHasDiscardSet = false
 715  				if len(kr.right) > 0 && y.CompareKeys(it.Key(), kr.right) >= 0 {
 716  					break
 717  				}
 718  				if builder.ReachedCapacity() {
 719  					// Only break if we are on a different key, and have reached capacity. We want
 720  					// to ensure that all versions of the key are stored in the same sstable, and
 721  					// not divided across multiple tables at the same level.
 722  					break
 723  				}
 724  				lastKey = y.SafeCopy(lastKey, it.Key())
 725  				numVersions = 0
 726  				firstKeyHasDiscardSet = it.Value().Meta&bitDiscardEarlierVersions > 0
 727  
 728  				if len(tableKr.left) == 0 {
 729  					tableKr.left = y.SafeCopy(tableKr.left, it.Key())
 730  				}
 731  				tableKr.right = lastKey
 732  
 733  				rangeCheck++
 734  				if rangeCheck%5000 == 0 {
 735  					// This table's range exceeds the allowed range overlap with the level after
 736  					// next. So, we stop writing to this table. If we don't do this, then we end up
 737  					// doing very expensive compactions involving too many tables. To amortize the
 738  					// cost of this check, we do it only every N keys.
 739  					if exceedsAllowedOverlap(tableKr) {
 740  						// s.kv.opt.Debugf("L%d -> L%d Breaking due to exceedsAllowedOverlap with
 741  						// kr: %s\n", cd.thisLevel.level, cd.nextLevel.level, tableKr)
 742  						break
 743  					}
 744  				}
 745  			}
 746  
 747  			vs := it.Value()
 748  			version := y.ParseTs(it.Key())
 749  
 750  			isExpired := isDeletedOrExpired(vs.Meta, vs.ExpiresAt)
 751  
 752  			// Do not discard entries inserted by merge operator. These entries will be
 753  			// discarded once they're merged
 754  			if version <= discardTs && vs.Meta&bitMergeEntry == 0 {
 755  				// Keep track of the number of versions encountered for this key. Only consider the
 756  				// versions which are below the minReadTs, otherwise, we might end up discarding the
 757  				// only valid version for a running transaction.
 758  				numVersions++
 759  				// Keep the current version and discard all the next versions if
 760  				// - The `discardEarlierVersions` bit is set OR
 761  				// - We've already processed `NumVersionsToKeep` number of versions
 762  				// (including the current item being processed)
 763  				lastValidVersion := vs.Meta&bitDiscardEarlierVersions > 0 ||
 764  					numVersions == s.kv.opt.NumVersionsToKeep
 765  
 766  				if isExpired || lastValidVersion {
 767  					// If this version of the key is deleted or expired, skip all the rest of the
 768  					// versions. Ensure that we're only removing versions below readTs.
 769  					skipKey = y.SafeCopy(skipKey, it.Key())
 770  
 771  					switch {
 772  					// Add the key to the table only if it has not expired.
 773  					// We don't want to add the deleted/expired keys.
 774  					case !isExpired && lastValidVersion:
 775  						// Add this key. We have set skipKey, so the following key versions
 776  						// would be skipped.
 777  					case hasOverlap:
 778  						// If this key range has overlap with lower levels, then keep the deletion
 779  						// marker with the latest version, discarding the rest. We have set skipKey,
 780  						// so the following key versions would be skipped.
 781  					default:
 782  						// If no overlap, we can skip all the versions, by continuing here.
 783  						numSkips++
 784  						updateStats(vs)
 785  						continue // Skip adding this key.
 786  					}
 787  				}
 788  			}
 789  			numKeys++
 790  			var vp valuePointer
 791  			if vs.Meta&bitValuePointer > 0 {
 792  				vp.Decode(vs.Value)
 793  			}
 794  			switch {
 795  			case firstKeyHasDiscardSet:
 796  				// This key is same as the last key which had "DiscardEarlierVersions" set. The
 797  				// the next compactions will drop this key if its ts >
 798  				// discardTs (of the next compaction).
 799  				builder.AddStaleKey(it.Key(), vs, vp.Len)
 800  			case isExpired:
 801  				// If the key is expired, the next compaction will drop it if
 802  				// its ts > discardTs (of the next compaction).
 803  				builder.AddStaleKey(it.Key(), vs, vp.Len)
 804  			default:
 805  				builder.Add(it.Key(), vs, vp.Len)
 806  			}
 807  		}
 808  		s.kv.opt.Debugf("[%d] LOG Compact. Added %d keys. Skipped %d keys. Iteration took: %v",
 809  			cd.compactorId, numKeys, numSkips, time.Since(timeStart).Round(time.Millisecond))
 810  	} // End of function: addKeys
 811  
 812  	if len(kr.left) > 0 {
 813  		it.Seek(kr.left)
 814  	} else {
 815  		it.Rewind()
 816  	}
 817  	for it.Valid() {
 818  		if len(kr.right) > 0 && y.CompareKeys(it.Key(), kr.right) >= 0 {
 819  			break
 820  		}
 821  
 822  		bopts := buildTableOptions(s.kv)
 823  		// Set TableSize to the target file size for that level.
 824  		bopts.TableSize = uint64(cd.t.fileSz[cd.nextLevel.level])
 825  		builder := table.NewTableBuilder(bopts)
 826  
 827  		// This would do the iteration and add keys to builder.
 828  		addKeys(builder)
 829  
 830  		// It was true that it.Valid() at least once in the loop above, which means we
 831  		// called Add() at least once, and builder is not Empty().
 832  		if builder.Empty() {
 833  			// Cleanup builder resources:
 834  			builder.Finish()
 835  			builder.Close()
 836  			continue
 837  		}
 838  		numBuilds++
 839  		if err := inflightBuilders.Do(); err != nil {
 840  			// Can't return from here, until I decrRef all the tables that I built so far.
 841  			break
 842  		}
 843  		go func(builder *table.Builder, fileID uint64) {
 844  			var err error
 845  			defer inflightBuilders.Done(err)
 846  			defer builder.Close()
 847  
 848  			var tbl *table.Table
 849  			if s.kv.opt.InMemory {
 850  				tbl, err = table.OpenInMemoryTable(builder.Finish(), fileID, &bopts)
 851  			} else {
 852  				fname := table.NewFilename(fileID, s.kv.opt.Dir)
 853  				tbl, err = table.CreateTable(fname, builder)
 854  			}
 855  
 856  			// If we couldn't build the table, return fast.
 857  			if err != nil {
 858  				return
 859  			}
 860  			res <- tbl
 861  		}(builder, s.reserveFileID())
 862  	}
 863  	s.kv.vlog.updateDiscardStats(discardStats)
 864  	s.kv.opt.Debugf("Discard stats: %v", discardStats)
 865  }
 866  
 867  // compactBuildTables merges topTables and botTables to form a list of new tables.
 868  func (s *levelsController) compactBuildTables(
 869  	lev int, cd compactDef) ([]*table.Table, func() error, error) {
 870  
 871  	topTables := cd.top
 872  	botTables := cd.bot
 873  
 874  	numTables := int64(len(topTables) + len(botTables))
 875  	y.NumCompactionTablesAdd(s.kv.opt.MetricsEnabled, numTables)
 876  	defer y.NumCompactionTablesAdd(s.kv.opt.MetricsEnabled, -numTables)
 877  
 878  	keepTable := func(t *table.Table) bool {
 879  		for _, prefix := range cd.dropPrefixes {
 880  			if bytes.HasPrefix(t.Smallest(), prefix) &&
 881  				bytes.HasPrefix(t.Biggest(), prefix) {
 882  				// All the keys in this table have the dropPrefix. So, this
 883  				// table does not need to be in the iterator and can be
 884  				// dropped immediately.
 885  				return false
 886  			}
 887  		}
 888  		return true
 889  	}
 890  	var valid []*table.Table
 891  	for _, table := range botTables {
 892  		if keepTable(table) {
 893  			valid = append(valid, table)
 894  		}
 895  	}
 896  
 897  	newIterator := func() []y.Iterator {
 898  		// Create iterators across all the tables involved first.
 899  		var iters []y.Iterator
 900  		switch {
 901  		case lev == 0:
 902  			iters = appendIteratorsReversed(iters, topTables, table.NOCACHE)
 903  		case len(topTables) > 0:
 904  			y.AssertTrue(len(topTables) == 1)
 905  			iters = []y.Iterator{topTables[0].NewIterator(table.NOCACHE)}
 906  		}
 907  		// Next level has level>=1 and we can use ConcatIterator as key ranges do not overlap.
 908  		return append(iters, table.NewConcatIterator(valid, table.NOCACHE))
 909  	}
 910  
 911  	res := make(chan *table.Table, 3)
 912  	inflightBuilders := y.NewThrottle(8 + len(cd.splits))
 913  	for _, kr := range cd.splits {
 914  		// Initiate Do here so we can register the goroutines for buildTables too.
 915  		if err := inflightBuilders.Do(); err != nil {
 916  			s.kv.opt.Errorf("cannot start subcompaction: %+v", err)
 917  			return nil, nil, err
 918  		}
 919  		go func(kr keyRange) {
 920  			defer inflightBuilders.Done(nil)
 921  			it := table.NewMergeIterator(newIterator(), false)
 922  			defer it.Close()
 923  			s.subcompact(it, kr, cd, inflightBuilders, res)
 924  		}(kr)
 925  	}
 926  
 927  	var newTables []*table.Table
 928  	var wg sync.WaitGroup
 929  	wg.Add(1)
 930  	go func() {
 931  		defer wg.Done()
 932  		for t := range res {
 933  			newTables = append(newTables, t)
 934  		}
 935  	}()
 936  
 937  	// Wait for all table builders to finish and also for newTables accumulator to finish.
 938  	err := inflightBuilders.Finish()
 939  	close(res)
 940  	wg.Wait() // Wait for all tables to be picked up.
 941  
 942  	if err == nil {
 943  		// Ensure created files' directory entries are visible.  We don't mind the extra latency
 944  		// from not doing this ASAP after all file creation has finished because this is a
 945  		// background operation.
 946  		err = s.kv.syncDir(s.kv.opt.Dir)
 947  	}
 948  
 949  	if err != nil {
 950  		// An error happened.  Delete all the newly created table files (by calling DecrRef
 951  		// -- we're the only holders of a ref).
 952  		_ = decrRefs(newTables)
 953  		return nil, nil, y.Wrapf(err, "while running compactions for: %+v", cd)
 954  	}
 955  
 956  	sort.Slice(newTables, func(i, j int) bool {
 957  		return y.CompareKeys(newTables[i].Biggest(), newTables[j].Biggest()) < 0
 958  	})
 959  	return newTables, func() error { return decrRefs(newTables) }, nil
 960  }
 961  
 962  func buildChangeSet(cd *compactDef, newTables []*table.Table) pb.ManifestChangeSet {
 963  	changes := []*pb.ManifestChange{}
 964  	for _, table := range newTables {
 965  		changes = append(changes,
 966  			newCreateChange(table.ID(), cd.nextLevel.level, table.KeyID(), table.CompressionType()))
 967  	}
 968  	for _, table := range cd.top {
 969  		// Add a delete change only if the table is not in memory.
 970  		if !table.IsInmemory {
 971  			changes = append(changes, newDeleteChange(table.ID()))
 972  		}
 973  	}
 974  	for _, table := range cd.bot {
 975  		changes = append(changes, newDeleteChange(table.ID()))
 976  	}
 977  	return pb.ManifestChangeSet{Changes: changes}
 978  }
 979  
 980  func hasAnyPrefixes(s []byte, listOfPrefixes [][]byte) bool {
 981  	for _, prefix := range listOfPrefixes {
 982  		if bytes.HasPrefix(s, prefix) {
 983  			return true
 984  		}
 985  	}
 986  
 987  	return false
 988  }
 989  
 990  func containsPrefix(table *table.Table, prefix []byte) bool {
 991  	smallValue := table.Smallest()
 992  	largeValue := table.Biggest()
 993  	if bytes.HasPrefix(smallValue, prefix) {
 994  		return true
 995  	}
 996  	if bytes.HasPrefix(largeValue, prefix) {
 997  		return true
 998  	}
 999  	isPresent := func() bool {
1000  		ti := table.NewIterator(0)
1001  		defer ti.Close()
1002  		// In table iterator's Seek, we assume that key has version in last 8 bytes. We set
1003  		// version=0 (ts=math.MaxUint64), so that we don't skip the key prefixed with prefix.
1004  		ti.Seek(y.KeyWithTs(prefix, math.MaxUint64))
1005  		return bytes.HasPrefix(ti.Key(), prefix)
1006  	}
1007  
1008  	if bytes.Compare(prefix, smallValue) > 0 &&
1009  		bytes.Compare(prefix, largeValue) < 0 {
1010  		// There may be a case when table contains [0x0000,...., 0xffff]. If we are searching for
1011  		// k=0x0011, we should not directly infer that k is present. It may not be present.
1012  		return isPresent()
1013  	}
1014  
1015  	return false
1016  }
1017  
1018  func containsAnyPrefixes(table *table.Table, listOfPrefixes [][]byte) bool {
1019  	for _, prefix := range listOfPrefixes {
1020  		if containsPrefix(table, prefix) {
1021  			return true
1022  		}
1023  	}
1024  
1025  	return false
1026  }
1027  
// compactDef describes a single compaction: which tables are merged, from which
// level into which, and the key ranges claimed in the compaction status while
// it runs.
type compactDef struct {
	// compactorId identifies the compactor running this compaction (used in
	// logs); only compactor 0 may run L0 -> L0 compactions.
	compactorId int
	// t holds the level targets; t.fileSz gives the target output file size
	// per level.
	t           targets
	// p is the compaction priority this compaction was created from.
	p           compactionPriority
	// thisLevel is the source level; nextLevel is the destination. They are
	// the same level for L0 -> L0 and last-level compactions.
	thisLevel   *levelHandler
	nextLevel   *levelHandler

	// top are the tables taken from thisLevel; bot are the tables from
	// nextLevel that overlap them (or, for the last level, adjacent tables
	// collected to reach the target size).
	top []*table.Table
	bot []*table.Table

	// thisRange and nextRange are the key ranges of top and bot as registered
	// in the compaction status. splits are the sub-ranges the merge is divided
	// into for parallel sub-compactions.
	thisRange keyRange
	nextRange keyRange
	splits    []keyRange

	// thisSize is the size of the picked top table (set when choosing tables
	// for levels >= 1).
	thisSize int64

	// dropPrefixes: bottom tables whose smallest and biggest keys both carry one
	// of these prefixes are dropped without being iterated.
	dropPrefixes [][]byte
}
1046  
1047  // addSplits can allow us to run multiple sub-compactions in parallel across the split key ranges.
1048  func (s *levelsController) addSplits(cd *compactDef) {
1049  	cd.splits = cd.splits[:0]
1050  
1051  	// Let's say we have 10 tables in cd.bot and min width = 3. Then, we'll pick
1052  	// 0, 1, 2 (pick), 3, 4, 5 (pick), 6, 7, 8 (pick), 9 (pick, because last table).
1053  	// This gives us 4 picks for 10 tables.
1054  	// In an edge case, 142 tables in bottom led to 48 splits. That's too many splits, because it
1055  	// then uses up a lot of memory for table builder.
1056  	// We should keep it so we have at max 5 splits.
1057  	width := int(math.Ceil(float64(len(cd.bot)) / 5.0))
1058  	if width < 3 {
1059  		width = 3
1060  	}
1061  	skr := cd.thisRange
1062  	skr.extend(cd.nextRange)
1063  
1064  	addRange := func(right []byte) {
1065  		skr.right = y.Copy(right)
1066  		cd.splits = append(cd.splits, skr)
1067  
1068  		skr.left = skr.right
1069  	}
1070  
1071  	for i, t := range cd.bot {
1072  		// last entry in bottom table.
1073  		if i == len(cd.bot)-1 {
1074  			addRange([]byte{})
1075  			return
1076  		}
1077  		if i%width == width-1 {
1078  			// Right is assigned ts=0. The encoding ts bytes take MaxUint64-ts,
1079  			// so, those with smaller TS will be considered larger for the same key.
1080  			// Consider the following.
1081  			// Top table is [A1...C3(deleted)]
1082  			// bot table is [B1....C2]
1083  			// It will generate a split [A1 ... C0], including any records of Key C.
1084  			right := y.KeyWithTs(y.ParseKey(t.Biggest()), 0)
1085  			addRange(right)
1086  		}
1087  	}
1088  }
1089  
// lockLevels read-locks cd.thisLevel and then cd.nextLevel; release them with
// unlockLevels. It must not be used when both refer to the same level, because
// recursive read locking of a sync.RWMutex can deadlock. fillTablesL0ToL0
// locks level 0 directly for that reason.
func (cd *compactDef) lockLevels() {
	cd.thisLevel.RLock()
	cd.nextLevel.RLock()
}
1094  
// unlockLevels releases the read locks taken by lockLevels, in reverse order:
// cd.nextLevel first, then cd.thisLevel.
func (cd *compactDef) unlockLevels() {
	cd.nextLevel.RUnlock()
	cd.thisLevel.RUnlock()
}
1099  
1100  func (cd *compactDef) allTables() []*table.Table {
1101  	ret := make([]*table.Table, 0, len(cd.top)+len(cd.bot))
1102  	ret = append(ret, cd.top...)
1103  	ret = append(ret, cd.bot...)
1104  	return ret
1105  }
1106  
1107  func (s *levelsController) fillTablesL0ToL0(cd *compactDef) bool {
1108  	if cd.compactorId != 0 {
1109  		// Only compactor zero can work on this.
1110  		return false
1111  	}
1112  
1113  	cd.nextLevel = s.levels[0]
1114  	cd.nextRange = keyRange{}
1115  	cd.bot = nil
1116  
1117  	// Because this level and next level are both level 0, we should NOT acquire
1118  	// the read lock twice, because it can result in a deadlock. So, we don't
1119  	// call compactDef.lockLevels, instead locking the level only once and
1120  	// directly here.
1121  	//
1122  	// As per godocs on RWMutex:
1123  	// If a goroutine holds a RWMutex for reading and another goroutine might
1124  	// call Lock, no goroutine should expect to be able to acquire a read lock
1125  	// until the initial read lock is released. In particular, this prohibits
1126  	// recursive read locking. This is to ensure that the lock eventually
1127  	// becomes available; a blocked Lock call excludes new readers from
1128  	// acquiring the lock.
1129  	y.AssertTrue(cd.thisLevel.level == 0)
1130  	y.AssertTrue(cd.nextLevel.level == 0)
1131  	s.levels[0].RLock()
1132  	defer s.levels[0].RUnlock()
1133  
1134  	s.cstatus.Lock()
1135  	defer s.cstatus.Unlock()
1136  
1137  	top := cd.thisLevel.tables
1138  	var out []*table.Table
1139  	now := time.Now()
1140  	for _, t := range top {
1141  		if t.Size() >= 2*cd.t.fileSz[0] {
1142  			// This file is already big, don't include it.
1143  			continue
1144  		}
1145  		if now.Sub(t.CreatedAt) < 10*time.Second {
1146  			// Just created it 10s ago. Don't pick for compaction.
1147  			continue
1148  		}
1149  		if _, beingCompacted := s.cstatus.tables[t.ID()]; beingCompacted {
1150  			continue
1151  		}
1152  		out = append(out, t)
1153  	}
1154  
1155  	if len(out) < 4 {
1156  		// If we don't have enough tables to merge in L0, don't do it.
1157  		return false
1158  	}
1159  	cd.thisRange = infRange
1160  	cd.top = out
1161  
1162  	// Avoid any other L0 -> Lbase from happening, while this is going on.
1163  	thisLevel := s.cstatus.levels[cd.thisLevel.level]
1164  	thisLevel.ranges = append(thisLevel.ranges, infRange)
1165  	for _, t := range out {
1166  		s.cstatus.tables[t.ID()] = struct{}{}
1167  	}
1168  
1169  	// For L0->L0 compaction, we set the target file size to max, so the output is always one file.
1170  	// This significantly decreases the L0 table stalls and improves the performance.
1171  	cd.t.fileSz[0] = math.MaxUint32
1172  	return true
1173  }
1174  
1175  func (s *levelsController) fillTablesL0ToLbase(cd *compactDef) bool {
1176  	if cd.nextLevel.level == 0 {
1177  		panic("Base level can't be zero.")
1178  	}
1179  	// We keep cd.p.adjusted > 0.0 here to allow functions in db.go to artificially trigger
1180  	// L0->Lbase compactions. Those functions wouldn't be setting the adjusted score.
1181  	if cd.p.adjusted > 0.0 && cd.p.adjusted < 1.0 {
1182  		// Do not compact to Lbase if adjusted score is less than 1.0.
1183  		return false
1184  	}
1185  	cd.lockLevels()
1186  	defer cd.unlockLevels()
1187  
1188  	top := cd.thisLevel.tables
1189  	if len(top) == 0 {
1190  		return false
1191  	}
1192  
1193  	var out []*table.Table
1194  	if len(cd.dropPrefixes) > 0 {
1195  		// Use all tables if drop prefix is set. We don't want to compact only a
1196  		// sub-range. We want to compact all the tables.
1197  		out = top
1198  
1199  	} else {
1200  		var kr keyRange
1201  		// cd.top[0] is the oldest file. So we start from the oldest file first.
1202  		for _, t := range top {
1203  			dkr := getKeyRange(t)
1204  			if kr.overlapsWith(dkr) {
1205  				out = append(out, t)
1206  				kr.extend(dkr)
1207  			} else {
1208  				break
1209  			}
1210  		}
1211  	}
1212  	cd.thisRange = getKeyRange(out...)
1213  	cd.top = out
1214  
1215  	left, right := cd.nextLevel.overlappingTables(levelHandlerRLocked{}, cd.thisRange)
1216  	cd.bot = make([]*table.Table, right-left)
1217  	copy(cd.bot, cd.nextLevel.tables[left:right])
1218  
1219  	if len(cd.bot) == 0 {
1220  		cd.nextRange = cd.thisRange
1221  	} else {
1222  		cd.nextRange = getKeyRange(cd.bot...)
1223  	}
1224  	return s.cstatus.compareAndAdd(thisAndNextLevelRLocked{}, *cd)
1225  }
1226  
1227  // fillTablesL0 would try to fill tables from L0 to be compacted with Lbase. If
1228  // it can not do that, it would try to compact tables from L0 -> L0.
1229  //
1230  // Say L0 has 10 tables.
1231  // fillTablesL0ToLbase picks up 5 tables to compact from L0 -> L5.
1232  // Next call to fillTablesL0 would run L0ToLbase again, which fails this time.
1233  // So, instead, we run fillTablesL0ToL0, which picks up rest of the 5 tables to
1234  // be compacted within L0. Additionally, it would set the compaction range in
1235  // cstatus to inf, so no other L0 -> Lbase compactions can happen.
1236  // Thus, L0 -> L0 must finish for the next L0 -> Lbase to begin.
1237  func (s *levelsController) fillTablesL0(cd *compactDef) bool {
1238  	if ok := s.fillTablesL0ToLbase(cd); ok {
1239  		return true
1240  	}
1241  	return s.fillTablesL0ToL0(cd)
1242  }
1243  
1244  // sortByStaleData sorts tables based on the amount of stale data they have.
1245  // This is useful in removing tombstones.
1246  func (s *levelsController) sortByStaleDataSize(tables []*table.Table, cd *compactDef) {
1247  	if len(tables) == 0 || cd.nextLevel == nil {
1248  		return
1249  	}
1250  
1251  	sort.Slice(tables, func(i, j int) bool {
1252  		return tables[i].StaleDataSize() > tables[j].StaleDataSize()
1253  	})
1254  }
1255  
1256  // sortByHeuristic sorts tables in increasing order of MaxVersion, so we
1257  // compact older tables first.
1258  func (s *levelsController) sortByHeuristic(tables []*table.Table, cd *compactDef) {
1259  	if len(tables) == 0 || cd.nextLevel == nil {
1260  		return
1261  	}
1262  
1263  	// Sort tables by max version. This is what RocksDB does.
1264  	sort.Slice(tables, func(i, j int) bool {
1265  		return tables[i].MaxVersion() < tables[j].MaxVersion()
1266  	})
1267  }
1268  
// fillMaxLevelTables picks tables for a compaction of the last level into
// itself and registers the choice in the compaction status. In doCompact,
// cd.nextLevel is cd.thisLevel for the last level. It reports whether a
// compaction was set up.
//
// Candidates are visited in order of decreasing stale data size, using a
// sorted copy so that tables itself is not reordered. A candidate is skipped
// if any of the following holds:
//   - its MaxVersion is above the discard timestamp;
//   - it was created less than an hour ago;
//   - it has under 10 MB of stale data;
//   - its range is already being compacted.
//
// NOTE(review): collectBotTables binary-searches tables by smallest key, so
// tables is assumed to be ordered by key — confirm with callers.
//
// This function should be called with lock on levels.
func (s *levelsController) fillMaxLevelTables(tables []*table.Table, cd *compactDef) bool {
	sortedTables := make([]*table.Table, len(tables))
	copy(sortedTables, tables)
	s.sortByStaleDataSize(sortedTables, cd)

	if len(sortedTables) > 0 && sortedTables[0].StaleDataSize() == 0 {
		// This is a maxLevel to maxLevel compaction and we don't have any stale data.
		return false
	}
	cd.bot = []*table.Table{}
	// collectBotTables appends to cd.bot the tables that follow t in key order
	// and extends cd.nextRange with each of them. It stops before the table
	// that would bring the running size (which starts at t.Size()) to needSz.
	collectBotTables := func(t *table.Table, needSz int64) {
		totalSize := t.Size()

		j := sort.Search(len(tables), func(i int) bool {
			return y.CompareKeys(tables[i].Smallest(), t.Smallest()) >= 0
		})
		y.AssertTrue(tables[j].ID() == t.ID())
		j++
		// Collect tables until we reach the required size.
		for j < len(tables) {
			newT := tables[j]
			totalSize += newT.Size()

			if totalSize >= needSz {
				break
			}
			cd.bot = append(cd.bot, newT)
			cd.nextRange.extend(getKeyRange(newT))
			j++
		}
	}
	now := time.Now()
	for _, t := range sortedTables {
		// If the maxVersion is above the discardTs, we won't clean anything in
		// the compaction. So skip this table.
		if t.MaxVersion() > s.kv.orc.discardAtOrBelow() {
			continue
		}
		if now.Sub(t.CreatedAt) < time.Hour {
			// Created less than an hour ago. Don't pick for compaction.
			continue
		}
		// If the stale data size is less than 10 MB, it might not be worth
		// rewriting the table. Skip it.
		if t.StaleDataSize() < 10<<20 {
			continue
		}

		cd.thisSize = t.Size()
		cd.thisRange = getKeyRange(t)
		// Set the next range as the same as the current range. If we don't do
		// this, we won't be able to run more than one max level compactions.
		cd.nextRange = cd.thisRange
		// If we're already compacting this range, don't do anything.
		if s.cstatus.overlapsWith(cd.thisLevel.level, cd.thisRange) {
			continue
		}

		// Found a valid table!
		cd.top = []*table.Table{t}

		needFileSz := cd.t.fileSz[cd.thisLevel.level]
		// The table size is what we want so no need to collect more tables.
		// It is registered by the compareAndAdd after the loop.
		if t.Size() >= needFileSz {
			break
		}
		// TableSize is less than what we want. Collect more tables for compaction.
		// If the level has multiple small tables, we collect all of them
		// together to form a bigger table.
		collectBotTables(t, needFileSz)
		if !s.cstatus.compareAndAdd(thisAndNextLevelRLocked{}, *cd) {
			// Undo the bottom-table collection and try the next candidate.
			// cd.top is left pointing at t.
			cd.bot = cd.bot[:0]
			cd.nextRange = keyRange{}
			continue
		}
		return true
	}
	if len(cd.top) == 0 {
		return false
	}

	// Reached after the loop broke on a table that is already big enough
	// (cd.bot empty, cd.nextRange == cd.thisRange), or after a candidate's
	// compareAndAdd failed and no later candidate succeeded.
	// NOTE(review): in the second case cd.top still holds the last failed
	// candidate with cd.bot cleared, so this retries compacting it alone. If a
	// later table was then skipped by the overlapsWith check, cd.thisRange and
	// cd.thisSize describe that table rather than cd.top. Confirm this
	// fallback is intended.
	return s.cstatus.compareAndAdd(thisAndNextLevelRLocked{}, *cd)
}
1353  
1354  func (s *levelsController) fillTables(cd *compactDef) bool {
1355  	cd.lockLevels()
1356  	defer cd.unlockLevels()
1357  
1358  	tables := make([]*table.Table, len(cd.thisLevel.tables))
1359  	copy(tables, cd.thisLevel.tables)
1360  	if len(tables) == 0 {
1361  		return false
1362  	}
1363  	// We're doing a maxLevel to maxLevel compaction. Pick tables based on the stale data size.
1364  	if cd.thisLevel.isLastLevel() {
1365  		return s.fillMaxLevelTables(tables, cd)
1366  	}
1367  	// We pick tables, so we compact older tables first. This is similar to
1368  	// kOldestLargestSeqFirst in RocksDB.
1369  	s.sortByHeuristic(tables, cd)
1370  
1371  	for _, t := range tables {
1372  		cd.thisSize = t.Size()
1373  		cd.thisRange = getKeyRange(t)
1374  		// If we're already compacting this range, don't do anything.
1375  		if s.cstatus.overlapsWith(cd.thisLevel.level, cd.thisRange) {
1376  			continue
1377  		}
1378  		cd.top = []*table.Table{t}
1379  		left, right := cd.nextLevel.overlappingTables(levelHandlerRLocked{}, cd.thisRange)
1380  
1381  		cd.bot = make([]*table.Table, right-left)
1382  		copy(cd.bot, cd.nextLevel.tables[left:right])
1383  
1384  		if len(cd.bot) == 0 {
1385  			cd.bot = []*table.Table{}
1386  			cd.nextRange = cd.thisRange
1387  			if !s.cstatus.compareAndAdd(thisAndNextLevelRLocked{}, *cd) {
1388  				continue
1389  			}
1390  			return true
1391  		}
1392  		cd.nextRange = getKeyRange(cd.bot...)
1393  
1394  		if s.cstatus.overlapsWith(cd.nextLevel.level, cd.nextRange) {
1395  			continue
1396  		}
1397  		if !s.cstatus.compareAndAdd(thisAndNextLevelRLocked{}, *cd) {
1398  			continue
1399  		}
1400  		return true
1401  	}
1402  	return false
1403  }
1404  
1405  func (s *levelsController) runCompactDef(id, l int, cd compactDef) (err error) {
1406  	if len(cd.t.fileSz) == 0 {
1407  		return errors.New("Filesizes cannot be zero. Targets are not set")
1408  	}
1409  	timeStart := time.Now()
1410  
1411  	thisLevel := cd.thisLevel
1412  	nextLevel := cd.nextLevel
1413  
1414  	y.AssertTrue(len(cd.splits) == 0)
1415  	if thisLevel.level == nextLevel.level {
1416  		// don't do anything for L0 -> L0 and Lmax -> Lmax.
1417  	} else {
1418  		s.addSplits(&cd)
1419  	}
1420  	if len(cd.splits) == 0 {
1421  		cd.splits = append(cd.splits, keyRange{})
1422  	}
1423  
1424  	// Table should never be moved directly between levels,
1425  	// always be rewritten to allow discarding invalid versions.
1426  
1427  	newTables, decr, err := s.compactBuildTables(l, cd)
1428  	if err != nil {
1429  		return err
1430  	}
1431  	defer func() {
1432  		// Only assign to err, if it's not already nil.
1433  		if decErr := decr(); err == nil {
1434  			err = decErr
1435  		}
1436  	}()
1437  	changeSet := buildChangeSet(&cd, newTables)
1438  
1439  	// We write to the manifest _before_ we delete files (and after we created files)
1440  	if err := s.kv.manifest.addChanges(changeSet.Changes, s.kv.opt); err != nil {
1441  		return err
1442  	}
1443  
1444  	getSizes := func(tables []*table.Table) int64 {
1445  		size := int64(0)
1446  		for _, i := range tables {
1447  			size += i.Size()
1448  		}
1449  		return size
1450  	}
1451  
1452  	sizeNewTables := int64(0)
1453  	sizeOldTables := int64(0)
1454  	if s.kv.opt.MetricsEnabled {
1455  		sizeNewTables = getSizes(newTables)
1456  		sizeOldTables = getSizes(cd.bot) + getSizes(cd.top)
1457  		y.NumBytesCompactionWrittenAdd(s.kv.opt.MetricsEnabled, nextLevel.strLevel, sizeNewTables)
1458  	}
1459  
1460  	// See comment earlier in this function about the ordering of these ops, and the order in which
1461  	// we access levels when reading.
1462  	if err := nextLevel.replaceTables(cd.bot, newTables); err != nil {
1463  		return err
1464  	}
1465  	if err := thisLevel.deleteTables(cd.top); err != nil {
1466  		return err
1467  	}
1468  
1469  	// Note: For level 0, while doCompact is running, it is possible that new tables are added.
1470  	// However, the tables are added only to the end, so it is ok to just delete the first table.
1471  
1472  	from := append(tablesToString(cd.top), tablesToString(cd.bot)...)
1473  	to := tablesToString(newTables)
1474  	if dur := time.Since(timeStart); dur > 2*time.Second {
1475  		var expensive string
1476  		if dur > time.Second {
1477  			expensive = " [E]"
1478  		}
1479  		s.kv.opt.Infof("[%d]%s LOG Compact %d->%d (%d, %d -> %d tables with %d splits)."+
1480  			" [%s] -> [%s], took %v\n, deleted %d bytes",
1481  			id, expensive, thisLevel.level, nextLevel.level, len(cd.top), len(cd.bot),
1482  			len(newTables), len(cd.splits), strings.Join(from, " "), strings.Join(to, " "),
1483  			dur.Round(time.Millisecond), sizeOldTables-sizeNewTables)
1484  	}
1485  
1486  	if cd.thisLevel.level != 0 && len(newTables) > 2*s.kv.opt.LevelSizeMultiplier {
1487  		s.kv.opt.Infof("This Range (numTables: %d)\nLeft:\n%s\nRight:\n%s\n",
1488  			len(cd.top), hex.Dump(cd.thisRange.left), hex.Dump(cd.thisRange.right))
1489  		s.kv.opt.Infof("Next Range (numTables: %d)\nLeft:\n%s\nRight:\n%s\n",
1490  			len(cd.bot), hex.Dump(cd.nextRange.left), hex.Dump(cd.nextRange.right))
1491  	}
1492  	return nil
1493  }
1494  
1495  func tablesToString(tables []*table.Table) []string {
1496  	var res []string
1497  	for _, t := range tables {
1498  		res = append(res, fmt.Sprintf("%05d", t.ID()))
1499  	}
1500  	res = append(res, ".")
1501  	return res
1502  }
1503  
// errFillTables is returned by doCompact when no set of tables could be picked
// and registered for compaction at the requested level.
var errFillTables = errors.New("Unable to fill tables")
1505  
1506  // doCompact picks some table on level l and compacts it away to the next level.
1507  func (s *levelsController) doCompact(id int, p compactionPriority) error {
1508  	l := p.level
1509  	y.AssertTrue(l < s.kv.opt.MaxLevels) // Sanity check.
1510  	if p.t.baseLevel == 0 {
1511  		p.t = s.levelTargets()
1512  	}
1513  
1514  	_, span := otel.Tracer("").Start(context.TODO(), "Badger.Compaction")
1515  	defer span.End()
1516  
1517  	cd := compactDef{
1518  		compactorId:  id,
1519  		p:            p,
1520  		t:            p.t,
1521  		thisLevel:    s.levels[l],
1522  		dropPrefixes: p.dropPrefixes,
1523  	}
1524  
1525  	// While picking tables to be compacted, both levels' tables are expected to
1526  	// remain unchanged.
1527  	if l == 0 {
1528  		cd.nextLevel = s.levels[p.t.baseLevel]
1529  		if !s.fillTablesL0(&cd) {
1530  			return errFillTables
1531  		}
1532  	} else {
1533  		cd.nextLevel = cd.thisLevel
1534  		// We're not compacting the last level so pick the next level.
1535  		if !cd.thisLevel.isLastLevel() {
1536  			cd.nextLevel = s.levels[l+1]
1537  		}
1538  		if !s.fillTables(&cd) {
1539  			return errFillTables
1540  		}
1541  	}
1542  	defer s.cstatus.delete(cd) // Remove the ranges from compaction status.
1543  
1544  	span.SetAttributes(attribute.String("Compaction", fmt.Sprintf("%+v", cd)))
1545  	if err := s.runCompactDef(id, l, cd); err != nil {
1546  		// This compaction couldn't be done successfully.
1547  		s.kv.opt.Warningf("[Compactor: %d] LOG Compact FAILED with error: %+v: %+v", id, err, cd)
1548  		return err
1549  	}
1550  
1551  	span.SetAttributes(
1552  		attribute.Int("Top tables count", len(cd.top)),
1553  		attribute.Int("Bottom tables count", len(cd.bot)))
1554  
1555  	s.kv.opt.Debugf("[Compactor: %d] Compaction for level: %d DONE", id, cd.thisLevel.level)
1556  	return nil
1557  }
1558  
1559  func (s *levelsController) addLevel0Table(t *table.Table) error {
1560  	// Add table to manifest file only if it is not opened in memory. We don't want to add a table
1561  	// to the manifest file if it exists only in memory.
1562  	if !t.IsInmemory {
1563  		// We update the manifest _before_ the table becomes part of a levelHandler, because at that
1564  		// point it could get used in some compaction.  This ensures the manifest file gets updated in
1565  		// the proper order. (That means this update happens before that of some compaction which
1566  		// deletes the table.)
1567  		err := s.kv.manifest.addChanges([]*pb.ManifestChange{
1568  			newCreateChange(t.ID(), 0, t.KeyID(), t.CompressionType()),
1569  		}, s.kv.opt)
1570  		if err != nil {
1571  			return err
1572  		}
1573  	}
1574  
1575  	for !s.levels[0].tryAddLevel0Table(t) {
1576  		// Before we unstall, we need to make sure that level 0 is healthy.
1577  		timeStart := time.Now()
1578  		for s.levels[0].numTables() >= s.kv.opt.NumLevelZeroTablesStall {
1579  			time.Sleep(10 * time.Millisecond)
1580  		}
1581  		dur := time.Since(timeStart)
1582  		if dur > time.Second {
1583  			s.kv.opt.Infof("L0 was stalled for %s\n", dur.Round(time.Millisecond))
1584  		}
1585  		s.l0stallsMs.Add(int64(dur.Round(time.Millisecond)))
1586  	}
1587  
1588  	return nil
1589  }
1590  
1591  func (s *levelsController) close() error {
1592  	err := s.cleanupLevels()
1593  	return y.Wrap(err, "levelsController.Close")
1594  }
1595  
1596  // get searches for a given key in all the levels of the LSM tree. It returns
1597  // key version <= the expected version (version in key). If not found,
1598  // it returns an empty y.ValueStruct.
1599  func (s *levelsController) get(key []byte, maxVs y.ValueStruct, startLevel int) (
1600  	y.ValueStruct, error) {
1601  	if s.kv.IsClosed() {
1602  		return y.ValueStruct{}, ErrDBClosed
1603  	}
1604  	// It's important that we iterate the levels from 0 on upward. The reason is, if we iterated
1605  	// in opposite order, or in parallel (naively calling all the h.RLock() in some order) we could
1606  	// read level L's tables post-compaction and level L+1's tables pre-compaction. (If we do
1607  	// parallelize this, we will need to call the h.RLock() function by increasing order of level
1608  	// number.)
1609  	version := y.ParseTs(key)
1610  	for _, h := range s.levels {
1611  		// Ignore all levels below startLevel. This is useful for GC when L0 is kept in memory.
1612  		if h.level < startLevel {
1613  			continue
1614  		}
1615  		vs, err := h.get(key) // Calls h.RLock() and h.RUnlock().
1616  		if err != nil {
1617  			return y.ValueStruct{}, y.Wrapf(err, "get key: %q", key)
1618  		}
1619  		if vs.Value == nil && vs.Meta == 0 {
1620  			continue
1621  		}
1622  		y.NumBytesReadsLSMAdd(s.kv.opt.MetricsEnabled, int64(len(vs.Value)))
1623  		if vs.Version == version {
1624  			return vs, nil
1625  		}
1626  		if maxVs.Version < vs.Version {
1627  			maxVs = vs
1628  		}
1629  	}
1630  	if len(maxVs.Value) > 0 {
1631  		y.NumGetsWithResultsAdd(s.kv.opt.MetricsEnabled, 1)
1632  	}
1633  	return maxVs, nil
1634  }
1635  
1636  func appendIteratorsReversed(out []y.Iterator, th []*table.Table, opt int) []y.Iterator {
1637  	for i := len(th) - 1; i >= 0; i-- {
1638  		// This will increment the reference of the table handler.
1639  		out = append(out, th[i].NewIterator(opt))
1640  	}
1641  	return out
1642  }
1643  
1644  // appendIterators appends iterators to an array of iterators, for merging.
1645  // Note: This obtains references for the table handlers. Remember to close these iterators.
1646  func (s *levelsController) appendIterators(
1647  	iters []y.Iterator, opt *IteratorOptions) []y.Iterator {
1648  	// Just like with get, it's important we iterate the levels from 0 on upward, to avoid missing
1649  	// data when there's a compaction.
1650  	for _, level := range s.levels {
1651  		iters = level.appendIterators(iters, opt)
1652  	}
1653  	return iters
1654  }
1655  
// TableInfo represents the information about a table, as reported by
// levelsController.getTableInfo (one entry per table on every level).
type TableInfo struct {
	ID               uint64
	Level            int
	Left             []byte // Smallest key of the table (t.Smallest()).
	Right            []byte // Biggest key of the table (t.Biggest()).
	KeyCount         uint32 // Number of keys in the table
	OnDiskSize       uint32
	StaleDataSize    uint32
	UncompressedSize uint32
	MaxVersion       uint64
	IndexSz          int
	BloomFilterSize  int
}
1670  
1671  func (s *levelsController) getTableInfo() (result []TableInfo) {
1672  	for _, l := range s.levels {
1673  		l.RLock()
1674  		for _, t := range l.tables {
1675  			info := TableInfo{
1676  				ID:               t.ID(),
1677  				Level:            l.level,
1678  				Left:             t.Smallest(),
1679  				Right:            t.Biggest(),
1680  				KeyCount:         t.KeyCount(),
1681  				OnDiskSize:       t.OnDiskSize(),
1682  				StaleDataSize:    t.StaleDataSize(),
1683  				IndexSz:          t.IndexSize(),
1684  				BloomFilterSize:  t.BloomFilterSize(),
1685  				UncompressedSize: t.UncompressedSize(),
1686  				MaxVersion:       t.MaxVersion(),
1687  			}
1688  			result = append(result, info)
1689  		}
1690  		l.RUnlock()
1691  	}
1692  	sort.Slice(result, func(i, j int) bool {
1693  		if result[i].Level != result[j].Level {
1694  			return result[i].Level < result[j].Level
1695  		}
1696  		return result[i].ID < result[j].ID
1697  	})
1698  	return
1699  }
1700  
// LevelInfo summarizes one LSM level, as reported by
// levelsController.getLevelInfo: current size and table count, targets from
// the level targets, and the compaction score computed for it.
type LevelInfo struct {
	Level          int
	NumTables      int
	Size           int64
	TargetSize     int64
	TargetFileSize int64
	IsBaseLevel    bool
	Score          float64
	Adjusted       float64
	// StaleDatSize is the level's total stale data size. The misspelled name is
	// kept because the field is exported.
	StaleDatSize   int64
}
1712  
1713  func (s *levelsController) getLevelInfo() []LevelInfo {
1714  	t := s.levelTargets()
1715  	prios := s.pickCompactLevels(nil)
1716  	result := make([]LevelInfo, len(s.levels))
1717  	for i, l := range s.levels {
1718  		l.RLock()
1719  		result[i].Level = i
1720  		result[i].Size = l.totalSize
1721  		result[i].NumTables = len(l.tables)
1722  		result[i].StaleDatSize = l.totalStaleSize
1723  
1724  		l.RUnlock()
1725  
1726  		result[i].TargetSize = t.targetSz[i]
1727  		result[i].TargetFileSize = t.fileSz[i]
1728  		result[i].IsBaseLevel = t.baseLevel == i
1729  	}
1730  	for _, p := range prios {
1731  		result[p.level].Score = p.score
1732  		result[p.level].Adjusted = p.adjusted
1733  	}
1734  	return result
1735  }
1736  
1737  // verifyChecksum verifies checksum for all tables on all levels.
1738  func (s *levelsController) verifyChecksum() error {
1739  	var tables []*table.Table
1740  	for _, l := range s.levels {
1741  		l.RLock()
1742  		tables = tables[:0]
1743  		for _, t := range l.tables {
1744  			tables = append(tables, t)
1745  			t.IncrRef()
1746  		}
1747  		l.RUnlock()
1748  
1749  		for _, t := range tables {
1750  			errChkVerify := t.VerifyChecksum()
1751  			if err := t.DecrRef(); err != nil {
1752  				s.kv.opt.Errorf("unable to decrease reference of table: %s while "+
1753  					"verifying checksum with error: %s", t.Filename(), err)
1754  			}
1755  
1756  			if errChkVerify != nil {
1757  				return errChkVerify
1758  			}
1759  		}
1760  	}
1761  
1762  	return nil
1763  }
1764  
1765  // Returns the sorted list of splits for all the levels and tables based
1766  // on the block offsets.
1767  func (s *levelsController) keySplits(numPerTable int, prefix []byte) []string {
1768  	splits := make([]string, 0)
1769  	for _, l := range s.levels {
1770  		l.RLock()
1771  		for _, t := range l.tables {
1772  			tableSplits := t.KeySplits(numPerTable, prefix)
1773  			splits = append(splits, tableSplits...)
1774  		}
1775  		l.RUnlock()
1776  	}
1777  	sort.Strings(splits)
1778  	return splits
1779  }
1780