levels.go raw
1 /*
2 * SPDX-FileCopyrightText: © Hypermode Inc. <hello@hypermode.com>
3 * SPDX-License-Identifier: Apache-2.0
4 */
5
6 package badger
7
8 import (
9 "bytes"
10 "context"
11 "encoding/hex"
12 "errors"
13 "fmt"
14 "math"
15 "math/rand"
16 "os"
17 "sort"
18 "strings"
19 "sync"
20 "sync/atomic"
21 "time"
22
23 "go.opentelemetry.io/otel"
24 "go.opentelemetry.io/otel/attribute"
25
26 "github.com/dgraph-io/badger/v4/pb"
27 "github.com/dgraph-io/badger/v4/table"
28 "github.com/dgraph-io/badger/v4/y"
29 "github.com/dgraph-io/ristretto/v2/z"
30 )
31
32 type levelsController struct {
33 nextFileID atomic.Uint64
34 l0stallsMs atomic.Int64
35
36 // The following are initialized once and const.
37 levels []*levelHandler
38 kv *DB
39
40 cstatus compactStatus
41 }
42
43 // revertToManifest checks that all necessary table files exist and removes all table files not
44 // referenced by the manifest. idMap is a set of table file id's that were read from the directory
45 // listing.
46 func revertToManifest(kv *DB, mf *Manifest, idMap map[uint64]struct{}) error {
47 // 1. Check all files in manifest exist.
48 for id := range mf.Tables {
49 if _, ok := idMap[id]; !ok {
50 return fmt.Errorf("file does not exist for table %d", id)
51 }
52 }
53
54 // 2. Delete files that shouldn't exist.
55 for id := range idMap {
56 if _, ok := mf.Tables[id]; !ok {
57 kv.opt.Debugf("Table file %d not referenced in MANIFEST\n", id)
58 filename := table.NewFilename(id, kv.opt.Dir)
59 if err := os.Remove(filename); err != nil {
60 return y.Wrapf(err, "While removing table %d", id)
61 }
62 }
63 }
64
65 return nil
66 }
67
68 func newLevelsController(db *DB, mf *Manifest) (*levelsController, error) {
69 y.AssertTrue(db.opt.NumLevelZeroTablesStall > db.opt.NumLevelZeroTables)
70 s := &levelsController{
71 kv: db,
72 levels: make([]*levelHandler, db.opt.MaxLevels),
73 }
74 s.cstatus.tables = make(map[uint64]struct{})
75 s.cstatus.levels = make([]*levelCompactStatus, db.opt.MaxLevels)
76
77 for i := 0; i < db.opt.MaxLevels; i++ {
78 s.levels[i] = newLevelHandler(db, i)
79 s.cstatus.levels[i] = new(levelCompactStatus)
80 }
81
82 if db.opt.InMemory {
83 return s, nil
84 }
85 // Compare manifest against directory, check for existent/non-existent files, and remove.
86 if err := revertToManifest(db, mf, getIDMap(db.opt.Dir)); err != nil {
87 return nil, err
88 }
89
90 var mu sync.Mutex
91 tables := make([][]*table.Table, db.opt.MaxLevels)
92 var maxFileID uint64
93
94 // We found that using 3 goroutines allows disk throughput to be utilized to its max.
95 // Disk utilization is the main thing we should focus on, while trying to read the data. That's
96 // the one factor that remains constant between HDD and SSD.
97 throttle := y.NewThrottle(3)
98
99 start := time.Now()
100 var numOpened atomic.Int32
101 tick := time.NewTicker(3 * time.Second)
102 defer tick.Stop()
103
104 for fileID, tf := range mf.Tables {
105 fname := table.NewFilename(fileID, db.opt.Dir)
106 select {
107 case <-tick.C:
108 db.opt.Infof("%d tables out of %d opened in %s\n", numOpened.Load(),
109 len(mf.Tables), time.Since(start).Round(time.Millisecond))
110 default:
111 }
112 if err := throttle.Do(); err != nil {
113 closeAllTables(tables)
114 return nil, err
115 }
116 if fileID > maxFileID {
117 maxFileID = fileID
118 }
119 go func(fname string, tf TableManifest) {
120 var rerr error
121 defer func() {
122 throttle.Done(rerr)
123 numOpened.Add(1)
124 }()
125 dk, err := db.registry.DataKey(tf.KeyID)
126 if err != nil {
127 rerr = y.Wrapf(err, "Error while reading datakey")
128 return
129 }
130 topt := buildTableOptions(db)
131 // Explicitly set Compression and DataKey based on how the table was generated.
132 topt.Compression = tf.Compression
133 topt.DataKey = dk
134
135 mf, err := z.OpenMmapFile(fname, db.opt.getFileFlags(), 0)
136 if err != nil {
137 rerr = y.Wrapf(err, "Opening file: %q", fname)
138 return
139 }
140 t, err := table.OpenTable(mf, topt)
141 if err != nil {
142 if strings.HasPrefix(err.Error(), "CHECKSUM_MISMATCH:") {
143 db.opt.Errorf(err.Error())
144 db.opt.Errorf("Ignoring table %s", mf.Fd.Name())
145 // Do not set rerr. We will continue without this table.
146 } else {
147 rerr = y.Wrapf(err, "Opening table: %q", fname)
148 }
149 return
150 }
151
152 mu.Lock()
153 tables[tf.Level] = append(tables[tf.Level], t)
154 mu.Unlock()
155 }(fname, tf)
156 }
157 if err := throttle.Finish(); err != nil {
158 closeAllTables(tables)
159 return nil, err
160 }
161 db.opt.Infof("All %d tables opened in %s\n", numOpened.Load(),
162 time.Since(start).Round(time.Millisecond))
163 s.nextFileID.Store(maxFileID + 1)
164 for i, tbls := range tables {
165 s.levels[i].initTables(tbls)
166 }
167
168 // Make sure key ranges do not overlap etc.
169 if err := s.validate(); err != nil {
170 _ = s.cleanupLevels()
171 return nil, y.Wrap(err, "Level validation")
172 }
173
174 // Sync directory (because we have at least removed some files, or previously created the
175 // manifest file).
176 if err := syncDir(db.opt.Dir); err != nil {
177 _ = s.close()
178 return nil, err
179 }
180
181 return s, nil
182 }
183
184 // Closes the tables, for cleanup in newLevelsController. (We Close() instead of using DecrRef()
185 // because that would delete the underlying files.) We ignore errors, which is OK because tables
186 // are read-only.
187 func closeAllTables(tables [][]*table.Table) {
188 for _, tableSlice := range tables {
189 for _, table := range tableSlice {
190 _ = table.Close(-1)
191 }
192 }
193 }
194
195 func (s *levelsController) cleanupLevels() error {
196 var firstErr error
197 for _, l := range s.levels {
198 if err := l.close(); err != nil && firstErr == nil {
199 firstErr = err
200 }
201 }
202 return firstErr
203 }
204
205 // dropTree picks all tables from all levels, creates a manifest changeset,
206 // applies it, and then decrements the refs of these tables, which would result
207 // in their deletion.
208 func (s *levelsController) dropTree() (int, error) {
209 // First pick all tables, so we can create a manifest changelog.
210 var all []*table.Table
211 for _, l := range s.levels {
212 l.RLock()
213 all = append(all, l.tables...)
214 l.RUnlock()
215 }
216 if len(all) == 0 {
217 return 0, nil
218 }
219
220 // Generate the manifest changes.
221 changes := []*pb.ManifestChange{}
222 for _, table := range all {
223 // Add a delete change only if the table is not in memory.
224 if !table.IsInmemory {
225 changes = append(changes, newDeleteChange(table.ID()))
226 }
227 }
228 changeSet := pb.ManifestChangeSet{Changes: changes}
229 if err := s.kv.manifest.addChanges(changeSet.Changes, s.kv.opt); err != nil {
230 return 0, err
231 }
232
233 // Now that manifest has been successfully written, we can delete the tables.
234 for _, l := range s.levels {
235 l.Lock()
236 l.totalSize = 0
237 l.tables = l.tables[:0]
238 l.Unlock()
239 }
240 for _, table := range all {
241 if err := table.DecrRef(); err != nil {
242 return 0, err
243 }
244 }
245 return len(all), nil
246 }
247
248 // dropPrefix runs a L0->L1 compaction, and then runs same level compaction on the rest of the
249 // levels. For L0->L1 compaction, it runs compactions normally, but skips over
250 // all the keys with the provided prefix.
251 // For Li->Li compactions, it picks up the tables which would have the prefix. The
252 // tables who only have keys with this prefix are quickly dropped. The ones which have other keys
253 // are run through MergeIterator and compacted to create new tables. All the mechanisms of
254 // compactions apply, i.e. level sizes and MANIFEST are updated as in the normal flow.
255 func (s *levelsController) dropPrefixes(prefixes [][]byte) error {
256 opt := s.kv.opt
257 // Iterate levels in the reverse order because if we were to iterate from
258 // lower level (say level 0) to a higher level (say level 3) we could have
259 // a state in which level 0 is compacted and an older version of a key exists in lower level.
260 // At this point, if someone creates an iterator, they would see an old
261 // value for a key from lower levels. Iterating in reverse order ensures we
262 // drop the oldest data first so that lookups never return stale data.
263 for i := len(s.levels) - 1; i >= 0; i-- {
264 l := s.levels[i]
265
266 l.RLock()
267 if l.level == 0 {
268 size := len(l.tables)
269 l.RUnlock()
270
271 if size > 0 {
272 cp := compactionPriority{
273 level: 0,
274 score: 1.74,
275 // A unique number greater than 1.0 does two things. Helps identify this
276 // function in logs, and forces a compaction.
277 dropPrefixes: prefixes,
278 }
279 if err := s.doCompact(174, cp); err != nil {
280 opt.Warningf("While compacting level 0: %v", err)
281 return nil
282 }
283 }
284 continue
285 }
286
287 // Build a list of compaction tableGroups affecting all the prefixes we
288 // need to drop. We need to build tableGroups that satisfy the invariant that
289 // bottom tables are consecutive.
290 // tableGroup contains groups of consecutive tables.
291 var tableGroups [][]*table.Table
292 var tableGroup []*table.Table
293
294 finishGroup := func() {
295 if len(tableGroup) > 0 {
296 tableGroups = append(tableGroups, tableGroup)
297 tableGroup = nil
298 }
299 }
300
301 for _, table := range l.tables {
302 if containsAnyPrefixes(table, prefixes) {
303 tableGroup = append(tableGroup, table)
304 } else {
305 finishGroup()
306 }
307 }
308 finishGroup()
309
310 l.RUnlock()
311
312 if len(tableGroups) == 0 {
313 continue
314 }
315 opt.Infof("Dropping prefix at level %d (%d tableGroups)", l.level, len(tableGroups))
316 for _, operation := range tableGroups {
317 cd := compactDef{
318 thisLevel: l,
319 nextLevel: l,
320 top: nil,
321 bot: operation,
322 dropPrefixes: prefixes,
323 t: s.levelTargets(),
324 }
325 _, span := otel.Tracer("").Start(context.TODO(), "Badger.Compaction")
326 span.SetAttributes(attribute.Int("Compaction level", l.level))
327 span.SetAttributes(attribute.String("Drop Prefixes", fmt.Sprintf("%v", prefixes)))
328 cd.t.baseLevel = l.level
329 if err := s.runCompactDef(-1, l.level, cd); err != nil {
330 opt.Warningf("While running compact def: %+v. Error: %v", cd, err)
331 span.End()
332 return err
333 }
334 span.SetAttributes(
335 attribute.Int("Top tables count", len(cd.top)),
336 attribute.Int("Bottom tables count", len(cd.bot)))
337 span.End()
338 }
339
340 }
341 return nil
342 }
343
344 func (s *levelsController) startCompact(lc *z.Closer) {
345 n := s.kv.opt.NumCompactors
346 lc.AddRunning(n - 1)
347 for i := 0; i < n; i++ {
348 go s.runCompactor(i, lc)
349 }
350 }
351
// targets holds the per-level sizing computed by levelTargets.
type targets struct {
	baseLevel int     // Lbase: the level L0 is compacted into
	targetSz  []int64 // target total size of each level, indexed by level
	fileSz    []int64 // target size of each table (file) written to a level, indexed by level
}
357
358 // levelTargets calculates the targets for levels in the LSM tree. The idea comes from Dynamic Level
359 // Sizes ( https://rocksdb.org/blog/2015/07/23/dynamic-level.html ) in RocksDB. The sizes of levels
360 // are calculated based on the size of the lowest level, typically L6. So, if L6 size is 1GB, then
361 // L5 target size is 100MB, L4 target size is 10MB and so on.
362 //
363 // L0 files don't automatically go to L1. Instead, they get compacted to Lbase, where Lbase is
364 // chosen based on the first level which is non-empty from top (check L1 through L6). For an empty
365 // DB, that would be L6. So, L0 compactions go to L6, then L5, L4 and so on.
366 //
367 // Lbase is advanced to the upper levels when its target size exceeds BaseLevelSize. For
368 // example, when L6 reaches 1.1GB, then L4 target sizes becomes 11MB, thus exceeding the
369 // BaseLevelSize of 10MB. L3 would then become the new Lbase, with a target size of 1MB <
370 // BaseLevelSize.
371 func (s *levelsController) levelTargets() targets {
372 adjust := func(sz int64) int64 {
373 if sz < s.kv.opt.BaseLevelSize {
374 return s.kv.opt.BaseLevelSize
375 }
376 return sz
377 }
378
379 t := targets{
380 targetSz: make([]int64, len(s.levels)),
381 fileSz: make([]int64, len(s.levels)),
382 }
383 // DB size is the size of the last level.
384 dbSize := s.lastLevel().getTotalSize()
385 for i := len(s.levels) - 1; i > 0; i-- {
386 ltarget := adjust(dbSize)
387 t.targetSz[i] = ltarget
388 if t.baseLevel == 0 && ltarget <= s.kv.opt.BaseLevelSize {
389 t.baseLevel = i
390 }
391 dbSize /= int64(s.kv.opt.LevelSizeMultiplier)
392 }
393
394 tsz := s.kv.opt.BaseTableSize
395 for i := 0; i < len(s.levels); i++ {
396 if i == 0 {
397 // Use MemTableSize for Level 0. Because at Level 0, we stop compactions based on the
398 // number of tables, not the size of the level. So, having a 1:1 size ratio between
399 // memtable size and the size of L0 files is better than churning out 32 files per
400 // memtable (assuming 64MB MemTableSize and 2MB BaseTableSize).
401 t.fileSz[i] = s.kv.opt.MemTableSize
402 } else if i <= t.baseLevel {
403 t.fileSz[i] = tsz
404 } else {
405 tsz *= int64(s.kv.opt.TableSizeMultiplier)
406 t.fileSz[i] = tsz
407 }
408 }
409
410 // Bring the base level down to the last empty level.
411 for i := t.baseLevel + 1; i < len(s.levels)-1; i++ {
412 if s.levels[i].getTotalSize() > 0 {
413 break
414 }
415 t.baseLevel = i
416 }
417
418 // If the base level is empty and the next level size is less than the
419 // target size, pick the next level as the base level.
420 b := t.baseLevel
421 lvl := s.levels
422 if b < len(lvl)-1 && lvl[b].getTotalSize() == 0 && lvl[b+1].getTotalSize() < t.targetSz[b+1] {
423 t.baseLevel++
424 }
425 return t
426 }
427
428 func (s *levelsController) runCompactor(id int, lc *z.Closer) {
429 defer lc.Done()
430
431 randomDelay := time.NewTimer(time.Duration(rand.Int31n(1000)) * time.Millisecond)
432 select {
433 case <-randomDelay.C:
434 case <-lc.HasBeenClosed():
435 randomDelay.Stop()
436 return
437 }
438
439 moveL0toFront := func(prios []compactionPriority) []compactionPriority {
440 idx := -1
441 for i, p := range prios {
442 if p.level == 0 {
443 idx = i
444 break
445 }
446 }
447 // If idx == -1, we didn't find L0.
448 // If idx == 0, then we don't need to do anything. L0 is already at the front.
449 if idx > 0 {
450 out := append([]compactionPriority{}, prios[idx])
451 out = append(out, prios[:idx]...)
452 out = append(out, prios[idx+1:]...)
453 return out
454 }
455 return prios
456 }
457
458 run := func(p compactionPriority) bool {
459 err := s.doCompact(id, p)
460 switch err {
461 case nil:
462 return true
463 case errFillTables:
464 // pass
465 default:
466 s.kv.opt.Warningf("While running doCompact: %v\n", err)
467 }
468 return false
469 }
470
471 var priosBuffer []compactionPriority
472 runOnce := func() bool {
473 prios := s.pickCompactLevels(priosBuffer)
474 defer func() {
475 priosBuffer = prios
476 }()
477 if id == 0 {
478 // Worker ID zero prefers to compact L0 always.
479 prios = moveL0toFront(prios)
480 }
481 for _, p := range prios {
482 if id == 0 && p.level == 0 {
483 // Allow worker zero to run level 0, irrespective of its adjusted score.
484 } else if p.adjusted < 1.0 {
485 break
486 }
487 if run(p) {
488 return true
489 }
490 }
491
492 return false
493 }
494
495 tryLmaxToLmaxCompaction := func() {
496 p := compactionPriority{
497 level: s.lastLevel().level,
498 t: s.levelTargets(),
499 }
500 run(p)
501
502 }
503 count := 0
504 ticker := time.NewTicker(50 * time.Millisecond)
505 defer ticker.Stop()
506 for {
507 select {
508 // Can add a done channel or other stuff.
509 case <-ticker.C:
510 count++
511 // Each ticker is 50ms so 50*200=10seconds.
512 if s.kv.opt.LmaxCompaction && id == 2 && count >= 200 {
513 tryLmaxToLmaxCompaction()
514 count = 0
515 } else {
516 runOnce()
517 }
518 case <-lc.HasBeenClosed():
519 return
520 }
521 }
522 }
523
524 type compactionPriority struct {
525 level int
526 score float64
527 adjusted float64
528 dropPrefixes [][]byte
529 t targets
530 }
531
532 func (s *levelsController) lastLevel() *levelHandler {
533 return s.levels[len(s.levels)-1]
534 }
535
536 // pickCompactLevel determines which level to compact.
537 // Based on: https://github.com/facebook/rocksdb/wiki/Leveled-Compaction
538 // It tries to reuse priosBuffer to reduce memory allocation,
539 // passing nil is acceptable, then new memory will be allocated.
540 func (s *levelsController) pickCompactLevels(priosBuffer []compactionPriority) (prios []compactionPriority) {
541 t := s.levelTargets()
542 addPriority := func(level int, score float64) {
543 pri := compactionPriority{
544 level: level,
545 score: score,
546 adjusted: score,
547 t: t,
548 }
549 prios = append(prios, pri)
550 }
551
552 // Grow buffer to fit all levels.
553 if cap(priosBuffer) < len(s.levels) {
554 priosBuffer = make([]compactionPriority, 0, len(s.levels))
555 }
556 prios = priosBuffer[:0]
557
558 // Add L0 priority based on the number of tables.
559 addPriority(0, float64(s.levels[0].numTables())/float64(s.kv.opt.NumLevelZeroTables))
560
561 // All other levels use size to calculate priority.
562 for i := 1; i < len(s.levels); i++ {
563 // Don't consider those tables that are already being compacted right now.
564 delSize := s.cstatus.delSize(i)
565
566 l := s.levels[i]
567 sz := l.getTotalSize() - delSize
568 addPriority(i, float64(sz)/float64(t.targetSz[i]))
569 }
570 y.AssertTrue(len(prios) == len(s.levels))
571
572 // The following code is borrowed from PebbleDB and results in healthier LSM tree structure.
573 // If Li-1 has score > 1.0, then we'll divide Li-1 score by Li. If Li score is >= 1.0, then Li-1
574 // score is reduced, which means we'll prioritize the compaction of lower levels (L5, L4 and so
575 // on) over the higher levels (L0, L1 and so on). On the other hand, if Li score is < 1.0, then
576 // we'll increase the priority of Li-1.
577 // Overall what this means is, if the bottom level is already overflowing, then de-prioritize
578 // compaction of the above level. If the bottom level is not full, then increase the priority of
579 // above level.
580 var prevLevel int
581 for level := t.baseLevel; level < len(s.levels); level++ {
582 if prios[prevLevel].adjusted >= 1 {
583 // Avoid absurdly large scores by placing a floor on the score that we'll
584 // adjust a level by. The value of 0.01 was chosen somewhat arbitrarily
585 const minScore = 0.01
586 if prios[level].score >= minScore {
587 prios[prevLevel].adjusted /= prios[level].adjusted
588 } else {
589 prios[prevLevel].adjusted /= minScore
590 }
591 }
592 prevLevel = level
593 }
594
595 // Pick all the levels whose original score is >= 1.0, irrespective of their adjusted score.
596 // We'll still sort them by their adjusted score below. Having both these scores allows us to
597 // make better decisions about compacting L0. If we see a score >= 1.0, we can do L0->L0
598 // compactions. If the adjusted score >= 1.0, then we can do L0->Lbase compactions.
599 out := prios[:0]
600 for _, p := range prios[:len(prios)-1] {
601 if p.score >= 1.0 {
602 out = append(out, p)
603 }
604 }
605 prios = out
606
607 // Sort by the adjusted score.
608 sort.Slice(prios, func(i, j int) bool {
609 return prios[i].adjusted > prios[j].adjusted
610 })
611 return prios
612 }
613
614 // checkOverlap checks if the given tables overlap with any level from the given "lev" onwards.
615 func (s *levelsController) checkOverlap(tables []*table.Table, lev int) bool {
616 kr := getKeyRange(tables...)
617 for i, lh := range s.levels {
618 if i < lev { // Skip upper levels.
619 continue
620 }
621 lh.RLock()
622 left, right := lh.overlappingTables(levelHandlerRLocked{}, kr)
623 lh.RUnlock()
624 if right-left > 0 {
625 return true
626 }
627 }
628 return false
629 }
630
631 // subcompact runs a single sub-compaction, iterating over the specified key-range only.
632 //
633 // We use splits to do a single compaction concurrently. If we have >= 3 tables
634 // involved in the bottom level during compaction, we choose key ranges to
635 // split the main compaction up into sub-compactions. Each sub-compaction runs
636 // concurrently, only iterating over the provided key range, generating tables.
637 // This speeds up the compaction significantly.
638 func (s *levelsController) subcompact(it y.Iterator, kr keyRange, cd compactDef,
639 inflightBuilders *y.Throttle, res chan<- *table.Table) {
640
641 // Check overlap of the top level with the levels which are not being
642 // compacted in this compaction.
643 hasOverlap := s.checkOverlap(cd.allTables(), cd.nextLevel.level+1)
644
645 // Pick a discard ts, so we can discard versions below this ts. We should
646 // never discard any versions starting from above this timestamp, because
647 // that would affect the snapshot view guarantee provided by transactions.
648 discardTs := s.kv.orc.discardAtOrBelow()
649
650 // Try to collect stats so that we can inform value log about GC. That would help us find which
651 // value log file should be GCed.
652 discardStats := make(map[uint32]int64)
653 updateStats := func(vs y.ValueStruct) {
654 // We don't need to store/update discard stats when badger is running in Disk-less mode.
655 if s.kv.opt.InMemory {
656 return
657 }
658 if vs.Meta&bitValuePointer > 0 {
659 var vp valuePointer
660 vp.Decode(vs.Value)
661 discardStats[vp.Fid] += int64(vp.Len)
662 }
663 }
664
665 // exceedsAllowedOverlap returns true if the given key range would overlap with more than 10
666 // tables from level below nextLevel (nextLevel+1). This helps avoid generating tables at Li
667 // with huge overlaps with Li+1.
668 exceedsAllowedOverlap := func(kr keyRange) bool {
669 n2n := cd.nextLevel.level + 1
670 if n2n <= 1 || n2n >= len(s.levels) {
671 return false
672 }
673 n2nl := s.levels[n2n]
674 n2nl.RLock()
675 defer n2nl.RUnlock()
676
677 l, r := n2nl.overlappingTables(levelHandlerRLocked{}, kr)
678 return r-l >= 10
679 }
680
681 var (
682 lastKey, skipKey []byte
683 numBuilds, numVersions int
684 // Denotes if the first key is a series of duplicate keys had
685 // "DiscardEarlierVersions" set
686 firstKeyHasDiscardSet bool
687 )
688
689 addKeys := func(builder *table.Builder) {
690 timeStart := time.Now()
691 var numKeys, numSkips uint64
692 var rangeCheck int
693 var tableKr keyRange
694 for ; it.Valid(); it.Next() {
695 // See if we need to skip the prefix.
696 if len(cd.dropPrefixes) > 0 && hasAnyPrefixes(it.Key(), cd.dropPrefixes) {
697 numSkips++
698 updateStats(it.Value())
699 continue
700 }
701
702 // See if we need to skip this key.
703 if len(skipKey) > 0 {
704 if y.SameKey(it.Key(), skipKey) {
705 numSkips++
706 updateStats(it.Value())
707 continue
708 } else {
709 skipKey = skipKey[:0]
710 }
711 }
712
713 if !y.SameKey(it.Key(), lastKey) {
714 firstKeyHasDiscardSet = false
715 if len(kr.right) > 0 && y.CompareKeys(it.Key(), kr.right) >= 0 {
716 break
717 }
718 if builder.ReachedCapacity() {
719 // Only break if we are on a different key, and have reached capacity. We want
720 // to ensure that all versions of the key are stored in the same sstable, and
721 // not divided across multiple tables at the same level.
722 break
723 }
724 lastKey = y.SafeCopy(lastKey, it.Key())
725 numVersions = 0
726 firstKeyHasDiscardSet = it.Value().Meta&bitDiscardEarlierVersions > 0
727
728 if len(tableKr.left) == 0 {
729 tableKr.left = y.SafeCopy(tableKr.left, it.Key())
730 }
731 tableKr.right = lastKey
732
733 rangeCheck++
734 if rangeCheck%5000 == 0 {
735 // This table's range exceeds the allowed range overlap with the level after
736 // next. So, we stop writing to this table. If we don't do this, then we end up
737 // doing very expensive compactions involving too many tables. To amortize the
738 // cost of this check, we do it only every N keys.
739 if exceedsAllowedOverlap(tableKr) {
740 // s.kv.opt.Debugf("L%d -> L%d Breaking due to exceedsAllowedOverlap with
741 // kr: %s\n", cd.thisLevel.level, cd.nextLevel.level, tableKr)
742 break
743 }
744 }
745 }
746
747 vs := it.Value()
748 version := y.ParseTs(it.Key())
749
750 isExpired := isDeletedOrExpired(vs.Meta, vs.ExpiresAt)
751
752 // Do not discard entries inserted by merge operator. These entries will be
753 // discarded once they're merged
754 if version <= discardTs && vs.Meta&bitMergeEntry == 0 {
755 // Keep track of the number of versions encountered for this key. Only consider the
756 // versions which are below the minReadTs, otherwise, we might end up discarding the
757 // only valid version for a running transaction.
758 numVersions++
759 // Keep the current version and discard all the next versions if
760 // - The `discardEarlierVersions` bit is set OR
761 // - We've already processed `NumVersionsToKeep` number of versions
762 // (including the current item being processed)
763 lastValidVersion := vs.Meta&bitDiscardEarlierVersions > 0 ||
764 numVersions == s.kv.opt.NumVersionsToKeep
765
766 if isExpired || lastValidVersion {
767 // If this version of the key is deleted or expired, skip all the rest of the
768 // versions. Ensure that we're only removing versions below readTs.
769 skipKey = y.SafeCopy(skipKey, it.Key())
770
771 switch {
772 // Add the key to the table only if it has not expired.
773 // We don't want to add the deleted/expired keys.
774 case !isExpired && lastValidVersion:
775 // Add this key. We have set skipKey, so the following key versions
776 // would be skipped.
777 case hasOverlap:
778 // If this key range has overlap with lower levels, then keep the deletion
779 // marker with the latest version, discarding the rest. We have set skipKey,
780 // so the following key versions would be skipped.
781 default:
782 // If no overlap, we can skip all the versions, by continuing here.
783 numSkips++
784 updateStats(vs)
785 continue // Skip adding this key.
786 }
787 }
788 }
789 numKeys++
790 var vp valuePointer
791 if vs.Meta&bitValuePointer > 0 {
792 vp.Decode(vs.Value)
793 }
794 switch {
795 case firstKeyHasDiscardSet:
796 // This key is same as the last key which had "DiscardEarlierVersions" set. The
797 // the next compactions will drop this key if its ts >
798 // discardTs (of the next compaction).
799 builder.AddStaleKey(it.Key(), vs, vp.Len)
800 case isExpired:
801 // If the key is expired, the next compaction will drop it if
802 // its ts > discardTs (of the next compaction).
803 builder.AddStaleKey(it.Key(), vs, vp.Len)
804 default:
805 builder.Add(it.Key(), vs, vp.Len)
806 }
807 }
808 s.kv.opt.Debugf("[%d] LOG Compact. Added %d keys. Skipped %d keys. Iteration took: %v",
809 cd.compactorId, numKeys, numSkips, time.Since(timeStart).Round(time.Millisecond))
810 } // End of function: addKeys
811
812 if len(kr.left) > 0 {
813 it.Seek(kr.left)
814 } else {
815 it.Rewind()
816 }
817 for it.Valid() {
818 if len(kr.right) > 0 && y.CompareKeys(it.Key(), kr.right) >= 0 {
819 break
820 }
821
822 bopts := buildTableOptions(s.kv)
823 // Set TableSize to the target file size for that level.
824 bopts.TableSize = uint64(cd.t.fileSz[cd.nextLevel.level])
825 builder := table.NewTableBuilder(bopts)
826
827 // This would do the iteration and add keys to builder.
828 addKeys(builder)
829
830 // It was true that it.Valid() at least once in the loop above, which means we
831 // called Add() at least once, and builder is not Empty().
832 if builder.Empty() {
833 // Cleanup builder resources:
834 builder.Finish()
835 builder.Close()
836 continue
837 }
838 numBuilds++
839 if err := inflightBuilders.Do(); err != nil {
840 // Can't return from here, until I decrRef all the tables that I built so far.
841 break
842 }
843 go func(builder *table.Builder, fileID uint64) {
844 var err error
845 defer inflightBuilders.Done(err)
846 defer builder.Close()
847
848 var tbl *table.Table
849 if s.kv.opt.InMemory {
850 tbl, err = table.OpenInMemoryTable(builder.Finish(), fileID, &bopts)
851 } else {
852 fname := table.NewFilename(fileID, s.kv.opt.Dir)
853 tbl, err = table.CreateTable(fname, builder)
854 }
855
856 // If we couldn't build the table, return fast.
857 if err != nil {
858 return
859 }
860 res <- tbl
861 }(builder, s.reserveFileID())
862 }
863 s.kv.vlog.updateDiscardStats(discardStats)
864 s.kv.opt.Debugf("Discard stats: %v", discardStats)
865 }
866
867 // compactBuildTables merges topTables and botTables to form a list of new tables.
868 func (s *levelsController) compactBuildTables(
869 lev int, cd compactDef) ([]*table.Table, func() error, error) {
870
871 topTables := cd.top
872 botTables := cd.bot
873
874 numTables := int64(len(topTables) + len(botTables))
875 y.NumCompactionTablesAdd(s.kv.opt.MetricsEnabled, numTables)
876 defer y.NumCompactionTablesAdd(s.kv.opt.MetricsEnabled, -numTables)
877
878 keepTable := func(t *table.Table) bool {
879 for _, prefix := range cd.dropPrefixes {
880 if bytes.HasPrefix(t.Smallest(), prefix) &&
881 bytes.HasPrefix(t.Biggest(), prefix) {
882 // All the keys in this table have the dropPrefix. So, this
883 // table does not need to be in the iterator and can be
884 // dropped immediately.
885 return false
886 }
887 }
888 return true
889 }
890 var valid []*table.Table
891 for _, table := range botTables {
892 if keepTable(table) {
893 valid = append(valid, table)
894 }
895 }
896
897 newIterator := func() []y.Iterator {
898 // Create iterators across all the tables involved first.
899 var iters []y.Iterator
900 switch {
901 case lev == 0:
902 iters = appendIteratorsReversed(iters, topTables, table.NOCACHE)
903 case len(topTables) > 0:
904 y.AssertTrue(len(topTables) == 1)
905 iters = []y.Iterator{topTables[0].NewIterator(table.NOCACHE)}
906 }
907 // Next level has level>=1 and we can use ConcatIterator as key ranges do not overlap.
908 return append(iters, table.NewConcatIterator(valid, table.NOCACHE))
909 }
910
911 res := make(chan *table.Table, 3)
912 inflightBuilders := y.NewThrottle(8 + len(cd.splits))
913 for _, kr := range cd.splits {
914 // Initiate Do here so we can register the goroutines for buildTables too.
915 if err := inflightBuilders.Do(); err != nil {
916 s.kv.opt.Errorf("cannot start subcompaction: %+v", err)
917 return nil, nil, err
918 }
919 go func(kr keyRange) {
920 defer inflightBuilders.Done(nil)
921 it := table.NewMergeIterator(newIterator(), false)
922 defer it.Close()
923 s.subcompact(it, kr, cd, inflightBuilders, res)
924 }(kr)
925 }
926
927 var newTables []*table.Table
928 var wg sync.WaitGroup
929 wg.Add(1)
930 go func() {
931 defer wg.Done()
932 for t := range res {
933 newTables = append(newTables, t)
934 }
935 }()
936
937 // Wait for all table builders to finish and also for newTables accumulator to finish.
938 err := inflightBuilders.Finish()
939 close(res)
940 wg.Wait() // Wait for all tables to be picked up.
941
942 if err == nil {
943 // Ensure created files' directory entries are visible. We don't mind the extra latency
944 // from not doing this ASAP after all file creation has finished because this is a
945 // background operation.
946 err = s.kv.syncDir(s.kv.opt.Dir)
947 }
948
949 if err != nil {
950 // An error happened. Delete all the newly created table files (by calling DecrRef
951 // -- we're the only holders of a ref).
952 _ = decrRefs(newTables)
953 return nil, nil, y.Wrapf(err, "while running compactions for: %+v", cd)
954 }
955
956 sort.Slice(newTables, func(i, j int) bool {
957 return y.CompareKeys(newTables[i].Biggest(), newTables[j].Biggest()) < 0
958 })
959 return newTables, func() error { return decrRefs(newTables) }, nil
960 }
961
962 func buildChangeSet(cd *compactDef, newTables []*table.Table) pb.ManifestChangeSet {
963 changes := []*pb.ManifestChange{}
964 for _, table := range newTables {
965 changes = append(changes,
966 newCreateChange(table.ID(), cd.nextLevel.level, table.KeyID(), table.CompressionType()))
967 }
968 for _, table := range cd.top {
969 // Add a delete change only if the table is not in memory.
970 if !table.IsInmemory {
971 changes = append(changes, newDeleteChange(table.ID()))
972 }
973 }
974 for _, table := range cd.bot {
975 changes = append(changes, newDeleteChange(table.ID()))
976 }
977 return pb.ManifestChangeSet{Changes: changes}
978 }
979
// hasAnyPrefixes reports whether s starts with at least one of listOfPrefixes.
// An empty list never matches; an empty prefix matches any s.
func hasAnyPrefixes(s []byte, listOfPrefixes [][]byte) bool {
	matched := false
	for _, p := range listOfPrefixes {
		if matched = bytes.HasPrefix(s, p); matched {
			break
		}
	}
	return matched
}
989
990 func containsPrefix(table *table.Table, prefix []byte) bool {
991 smallValue := table.Smallest()
992 largeValue := table.Biggest()
993 if bytes.HasPrefix(smallValue, prefix) {
994 return true
995 }
996 if bytes.HasPrefix(largeValue, prefix) {
997 return true
998 }
999 isPresent := func() bool {
1000 ti := table.NewIterator(0)
1001 defer ti.Close()
1002 // In table iterator's Seek, we assume that key has version in last 8 bytes. We set
1003 // version=0 (ts=math.MaxUint64), so that we don't skip the key prefixed with prefix.
1004 ti.Seek(y.KeyWithTs(prefix, math.MaxUint64))
1005 return bytes.HasPrefix(ti.Key(), prefix)
1006 }
1007
1008 if bytes.Compare(prefix, smallValue) > 0 &&
1009 bytes.Compare(prefix, largeValue) < 0 {
1010 // There may be a case when table contains [0x0000,...., 0xffff]. If we are searching for
1011 // k=0x0011, we should not directly infer that k is present. It may not be present.
1012 return isPresent()
1013 }
1014
1015 return false
1016 }
1017
1018 func containsAnyPrefixes(table *table.Table, listOfPrefixes [][]byte) bool {
1019 for _, prefix := range listOfPrefixes {
1020 if containsPrefix(table, prefix) {
1021 return true
1022 }
1023 }
1024
1025 return false
1026 }
1027
// compactDef describes one compaction: which tables are taken from thisLevel,
// which overlapping tables of nextLevel are rewritten with them, and the key
// ranges reserved in the compaction status while it runs.
type compactDef struct {
	// ID of the compactor running this compaction. Only compactor 0 may run
	// L0 -> L0 compactions.
	compactorId int
	// Per-level targets; fileSz provides the target output table size per level.
	t targets
	// The priority that triggered this compaction (copied from doCompact's input).
	p compactionPriority
	// Source level.
	thisLevel *levelHandler
	// Destination level. Equal to thisLevel for L0 -> L0 and Lmax -> Lmax.
	nextLevel *levelHandler

	// Tables taken from thisLevel.
	top []*table.Table
	// Tables from nextLevel that are rewritten together with top.
	bot []*table.Table

	// Key range covered by top.
	thisRange keyRange
	// Key range reserved in nextLevel: the range of bot, or thisRange when bot
	// is empty (Lmax -> Lmax extends thisRange by the collected bot tables).
	nextRange keyRange
	// Sub-ranges compacted in parallel; filled by addSplits in runCompactDef.
	splits []keyRange

	// Size of the top table chosen by fillTables / fillMaxLevelTables. The L0
	// fill paths do not set it.
	thisSize int64

	// Key prefixes to drop, copied from the priority. When non-empty, an
	// L0 -> Lbase compaction takes every L0 table instead of a sub-range.
	dropPrefixes [][]byte
}
1046
1047 // addSplits can allow us to run multiple sub-compactions in parallel across the split key ranges.
1048 func (s *levelsController) addSplits(cd *compactDef) {
1049 cd.splits = cd.splits[:0]
1050
1051 // Let's say we have 10 tables in cd.bot and min width = 3. Then, we'll pick
1052 // 0, 1, 2 (pick), 3, 4, 5 (pick), 6, 7, 8 (pick), 9 (pick, because last table).
1053 // This gives us 4 picks for 10 tables.
1054 // In an edge case, 142 tables in bottom led to 48 splits. That's too many splits, because it
1055 // then uses up a lot of memory for table builder.
1056 // We should keep it so we have at max 5 splits.
1057 width := int(math.Ceil(float64(len(cd.bot)) / 5.0))
1058 if width < 3 {
1059 width = 3
1060 }
1061 skr := cd.thisRange
1062 skr.extend(cd.nextRange)
1063
1064 addRange := func(right []byte) {
1065 skr.right = y.Copy(right)
1066 cd.splits = append(cd.splits, skr)
1067
1068 skr.left = skr.right
1069 }
1070
1071 for i, t := range cd.bot {
1072 // last entry in bottom table.
1073 if i == len(cd.bot)-1 {
1074 addRange([]byte{})
1075 return
1076 }
1077 if i%width == width-1 {
1078 // Right is assigned ts=0. The encoding ts bytes take MaxUint64-ts,
1079 // so, those with smaller TS will be considered larger for the same key.
1080 // Consider the following.
1081 // Top table is [A1...C3(deleted)]
1082 // bot table is [B1....C2]
1083 // It will generate a split [A1 ... C0], including any records of Key C.
1084 right := y.KeyWithTs(y.ParseKey(t.Biggest()), 0)
1085 addRange(right)
1086 }
1087 }
1088 }
1089
// lockLevels read-locks thisLevel and then nextLevel. Pair it with
// unlockLevels, which releases them in reverse order.
//
// Do not use this when both levels are the same (L0 -> L0). Read-locking the
// same RWMutex twice can deadlock if a writer is waiting; see
// fillTablesL0ToL0, which locks level 0 only once.
func (cd *compactDef) lockLevels() {
	cd.thisLevel.RLock()
	cd.nextLevel.RLock()
}
1094
// unlockLevels releases the read locks taken by lockLevels, in reverse
// acquisition order (nextLevel first, then thisLevel).
func (cd *compactDef) unlockLevels() {
	cd.nextLevel.RUnlock()
	cd.thisLevel.RUnlock()
}
1099
1100 func (cd *compactDef) allTables() []*table.Table {
1101 ret := make([]*table.Table, 0, len(cd.top)+len(cd.bot))
1102 ret = append(ret, cd.top...)
1103 ret = append(ret, cd.bot...)
1104 return ret
1105 }
1106
1107 func (s *levelsController) fillTablesL0ToL0(cd *compactDef) bool {
1108 if cd.compactorId != 0 {
1109 // Only compactor zero can work on this.
1110 return false
1111 }
1112
1113 cd.nextLevel = s.levels[0]
1114 cd.nextRange = keyRange{}
1115 cd.bot = nil
1116
1117 // Because this level and next level are both level 0, we should NOT acquire
1118 // the read lock twice, because it can result in a deadlock. So, we don't
1119 // call compactDef.lockLevels, instead locking the level only once and
1120 // directly here.
1121 //
1122 // As per godocs on RWMutex:
1123 // If a goroutine holds a RWMutex for reading and another goroutine might
1124 // call Lock, no goroutine should expect to be able to acquire a read lock
1125 // until the initial read lock is released. In particular, this prohibits
1126 // recursive read locking. This is to ensure that the lock eventually
1127 // becomes available; a blocked Lock call excludes new readers from
1128 // acquiring the lock.
1129 y.AssertTrue(cd.thisLevel.level == 0)
1130 y.AssertTrue(cd.nextLevel.level == 0)
1131 s.levels[0].RLock()
1132 defer s.levels[0].RUnlock()
1133
1134 s.cstatus.Lock()
1135 defer s.cstatus.Unlock()
1136
1137 top := cd.thisLevel.tables
1138 var out []*table.Table
1139 now := time.Now()
1140 for _, t := range top {
1141 if t.Size() >= 2*cd.t.fileSz[0] {
1142 // This file is already big, don't include it.
1143 continue
1144 }
1145 if now.Sub(t.CreatedAt) < 10*time.Second {
1146 // Just created it 10s ago. Don't pick for compaction.
1147 continue
1148 }
1149 if _, beingCompacted := s.cstatus.tables[t.ID()]; beingCompacted {
1150 continue
1151 }
1152 out = append(out, t)
1153 }
1154
1155 if len(out) < 4 {
1156 // If we don't have enough tables to merge in L0, don't do it.
1157 return false
1158 }
1159 cd.thisRange = infRange
1160 cd.top = out
1161
1162 // Avoid any other L0 -> Lbase from happening, while this is going on.
1163 thisLevel := s.cstatus.levels[cd.thisLevel.level]
1164 thisLevel.ranges = append(thisLevel.ranges, infRange)
1165 for _, t := range out {
1166 s.cstatus.tables[t.ID()] = struct{}{}
1167 }
1168
1169 // For L0->L0 compaction, we set the target file size to max, so the output is always one file.
1170 // This significantly decreases the L0 table stalls and improves the performance.
1171 cd.t.fileSz[0] = math.MaxUint32
1172 return true
1173 }
1174
1175 func (s *levelsController) fillTablesL0ToLbase(cd *compactDef) bool {
1176 if cd.nextLevel.level == 0 {
1177 panic("Base level can't be zero.")
1178 }
1179 // We keep cd.p.adjusted > 0.0 here to allow functions in db.go to artificially trigger
1180 // L0->Lbase compactions. Those functions wouldn't be setting the adjusted score.
1181 if cd.p.adjusted > 0.0 && cd.p.adjusted < 1.0 {
1182 // Do not compact to Lbase if adjusted score is less than 1.0.
1183 return false
1184 }
1185 cd.lockLevels()
1186 defer cd.unlockLevels()
1187
1188 top := cd.thisLevel.tables
1189 if len(top) == 0 {
1190 return false
1191 }
1192
1193 var out []*table.Table
1194 if len(cd.dropPrefixes) > 0 {
1195 // Use all tables if drop prefix is set. We don't want to compact only a
1196 // sub-range. We want to compact all the tables.
1197 out = top
1198
1199 } else {
1200 var kr keyRange
1201 // cd.top[0] is the oldest file. So we start from the oldest file first.
1202 for _, t := range top {
1203 dkr := getKeyRange(t)
1204 if kr.overlapsWith(dkr) {
1205 out = append(out, t)
1206 kr.extend(dkr)
1207 } else {
1208 break
1209 }
1210 }
1211 }
1212 cd.thisRange = getKeyRange(out...)
1213 cd.top = out
1214
1215 left, right := cd.nextLevel.overlappingTables(levelHandlerRLocked{}, cd.thisRange)
1216 cd.bot = make([]*table.Table, right-left)
1217 copy(cd.bot, cd.nextLevel.tables[left:right])
1218
1219 if len(cd.bot) == 0 {
1220 cd.nextRange = cd.thisRange
1221 } else {
1222 cd.nextRange = getKeyRange(cd.bot...)
1223 }
1224 return s.cstatus.compareAndAdd(thisAndNextLevelRLocked{}, *cd)
1225 }
1226
1227 // fillTablesL0 would try to fill tables from L0 to be compacted with Lbase. If
1228 // it can not do that, it would try to compact tables from L0 -> L0.
1229 //
1230 // Say L0 has 10 tables.
1231 // fillTablesL0ToLbase picks up 5 tables to compact from L0 -> L5.
1232 // Next call to fillTablesL0 would run L0ToLbase again, which fails this time.
1233 // So, instead, we run fillTablesL0ToL0, which picks up rest of the 5 tables to
1234 // be compacted within L0. Additionally, it would set the compaction range in
1235 // cstatus to inf, so no other L0 -> Lbase compactions can happen.
1236 // Thus, L0 -> L0 must finish for the next L0 -> Lbase to begin.
1237 func (s *levelsController) fillTablesL0(cd *compactDef) bool {
1238 if ok := s.fillTablesL0ToLbase(cd); ok {
1239 return true
1240 }
1241 return s.fillTablesL0ToL0(cd)
1242 }
1243
1244 // sortByStaleData sorts tables based on the amount of stale data they have.
1245 // This is useful in removing tombstones.
1246 func (s *levelsController) sortByStaleDataSize(tables []*table.Table, cd *compactDef) {
1247 if len(tables) == 0 || cd.nextLevel == nil {
1248 return
1249 }
1250
1251 sort.Slice(tables, func(i, j int) bool {
1252 return tables[i].StaleDataSize() > tables[j].StaleDataSize()
1253 })
1254 }
1255
1256 // sortByHeuristic sorts tables in increasing order of MaxVersion, so we
1257 // compact older tables first.
1258 func (s *levelsController) sortByHeuristic(tables []*table.Table, cd *compactDef) {
1259 if len(tables) == 0 || cd.nextLevel == nil {
1260 return
1261 }
1262
1263 // Sort tables by max version. This is what RocksDB does.
1264 sort.Slice(tables, func(i, j int) bool {
1265 return tables[i].MaxVersion() < tables[j].MaxVersion()
1266 })
1267 }
1268
1269 // This function should be called with lock on levels.
1270 func (s *levelsController) fillMaxLevelTables(tables []*table.Table, cd *compactDef) bool {
1271 sortedTables := make([]*table.Table, len(tables))
1272 copy(sortedTables, tables)
1273 s.sortByStaleDataSize(sortedTables, cd)
1274
1275 if len(sortedTables) > 0 && sortedTables[0].StaleDataSize() == 0 {
1276 // This is a maxLevel to maxLevel compaction and we don't have any stale data.
1277 return false
1278 }
1279 cd.bot = []*table.Table{}
1280 collectBotTables := func(t *table.Table, needSz int64) {
1281 totalSize := t.Size()
1282
1283 j := sort.Search(len(tables), func(i int) bool {
1284 return y.CompareKeys(tables[i].Smallest(), t.Smallest()) >= 0
1285 })
1286 y.AssertTrue(tables[j].ID() == t.ID())
1287 j++
1288 // Collect tables until we reach the the required size.
1289 for j < len(tables) {
1290 newT := tables[j]
1291 totalSize += newT.Size()
1292
1293 if totalSize >= needSz {
1294 break
1295 }
1296 cd.bot = append(cd.bot, newT)
1297 cd.nextRange.extend(getKeyRange(newT))
1298 j++
1299 }
1300 }
1301 now := time.Now()
1302 for _, t := range sortedTables {
1303 // If the maxVersion is above the discardTs, we won't clean anything in
1304 // the compaction. So skip this table.
1305 if t.MaxVersion() > s.kv.orc.discardAtOrBelow() {
1306 continue
1307 }
1308 if now.Sub(t.CreatedAt) < time.Hour {
1309 // Just created it an hour ago. Don't pick for compaction.
1310 continue
1311 }
1312 // If the stale data size is less than 10 MB, it might not be worth
1313 // rewriting the table. Skip it.
1314 if t.StaleDataSize() < 10<<20 {
1315 continue
1316 }
1317
1318 cd.thisSize = t.Size()
1319 cd.thisRange = getKeyRange(t)
1320 // Set the next range as the same as the current range. If we don't do
1321 // this, we won't be able to run more than one max level compactions.
1322 cd.nextRange = cd.thisRange
1323 // If we're already compacting this range, don't do anything.
1324 if s.cstatus.overlapsWith(cd.thisLevel.level, cd.thisRange) {
1325 continue
1326 }
1327
1328 // Found a valid table!
1329 cd.top = []*table.Table{t}
1330
1331 needFileSz := cd.t.fileSz[cd.thisLevel.level]
1332 // The table size is what we want so no need to collect more tables.
1333 if t.Size() >= needFileSz {
1334 break
1335 }
1336 // TableSize is less than what we want. Collect more tables for compaction.
1337 // If the level has multiple small tables, we collect all of them
1338 // together to form a bigger table.
1339 collectBotTables(t, needFileSz)
1340 if !s.cstatus.compareAndAdd(thisAndNextLevelRLocked{}, *cd) {
1341 cd.bot = cd.bot[:0]
1342 cd.nextRange = keyRange{}
1343 continue
1344 }
1345 return true
1346 }
1347 if len(cd.top) == 0 {
1348 return false
1349 }
1350
1351 return s.cstatus.compareAndAdd(thisAndNextLevelRLocked{}, *cd)
1352 }
1353
1354 func (s *levelsController) fillTables(cd *compactDef) bool {
1355 cd.lockLevels()
1356 defer cd.unlockLevels()
1357
1358 tables := make([]*table.Table, len(cd.thisLevel.tables))
1359 copy(tables, cd.thisLevel.tables)
1360 if len(tables) == 0 {
1361 return false
1362 }
1363 // We're doing a maxLevel to maxLevel compaction. Pick tables based on the stale data size.
1364 if cd.thisLevel.isLastLevel() {
1365 return s.fillMaxLevelTables(tables, cd)
1366 }
1367 // We pick tables, so we compact older tables first. This is similar to
1368 // kOldestLargestSeqFirst in RocksDB.
1369 s.sortByHeuristic(tables, cd)
1370
1371 for _, t := range tables {
1372 cd.thisSize = t.Size()
1373 cd.thisRange = getKeyRange(t)
1374 // If we're already compacting this range, don't do anything.
1375 if s.cstatus.overlapsWith(cd.thisLevel.level, cd.thisRange) {
1376 continue
1377 }
1378 cd.top = []*table.Table{t}
1379 left, right := cd.nextLevel.overlappingTables(levelHandlerRLocked{}, cd.thisRange)
1380
1381 cd.bot = make([]*table.Table, right-left)
1382 copy(cd.bot, cd.nextLevel.tables[left:right])
1383
1384 if len(cd.bot) == 0 {
1385 cd.bot = []*table.Table{}
1386 cd.nextRange = cd.thisRange
1387 if !s.cstatus.compareAndAdd(thisAndNextLevelRLocked{}, *cd) {
1388 continue
1389 }
1390 return true
1391 }
1392 cd.nextRange = getKeyRange(cd.bot...)
1393
1394 if s.cstatus.overlapsWith(cd.nextLevel.level, cd.nextRange) {
1395 continue
1396 }
1397 if !s.cstatus.compareAndAdd(thisAndNextLevelRLocked{}, *cd) {
1398 continue
1399 }
1400 return true
1401 }
1402 return false
1403 }
1404
1405 func (s *levelsController) runCompactDef(id, l int, cd compactDef) (err error) {
1406 if len(cd.t.fileSz) == 0 {
1407 return errors.New("Filesizes cannot be zero. Targets are not set")
1408 }
1409 timeStart := time.Now()
1410
1411 thisLevel := cd.thisLevel
1412 nextLevel := cd.nextLevel
1413
1414 y.AssertTrue(len(cd.splits) == 0)
1415 if thisLevel.level == nextLevel.level {
1416 // don't do anything for L0 -> L0 and Lmax -> Lmax.
1417 } else {
1418 s.addSplits(&cd)
1419 }
1420 if len(cd.splits) == 0 {
1421 cd.splits = append(cd.splits, keyRange{})
1422 }
1423
1424 // Table should never be moved directly between levels,
1425 // always be rewritten to allow discarding invalid versions.
1426
1427 newTables, decr, err := s.compactBuildTables(l, cd)
1428 if err != nil {
1429 return err
1430 }
1431 defer func() {
1432 // Only assign to err, if it's not already nil.
1433 if decErr := decr(); err == nil {
1434 err = decErr
1435 }
1436 }()
1437 changeSet := buildChangeSet(&cd, newTables)
1438
1439 // We write to the manifest _before_ we delete files (and after we created files)
1440 if err := s.kv.manifest.addChanges(changeSet.Changes, s.kv.opt); err != nil {
1441 return err
1442 }
1443
1444 getSizes := func(tables []*table.Table) int64 {
1445 size := int64(0)
1446 for _, i := range tables {
1447 size += i.Size()
1448 }
1449 return size
1450 }
1451
1452 sizeNewTables := int64(0)
1453 sizeOldTables := int64(0)
1454 if s.kv.opt.MetricsEnabled {
1455 sizeNewTables = getSizes(newTables)
1456 sizeOldTables = getSizes(cd.bot) + getSizes(cd.top)
1457 y.NumBytesCompactionWrittenAdd(s.kv.opt.MetricsEnabled, nextLevel.strLevel, sizeNewTables)
1458 }
1459
1460 // See comment earlier in this function about the ordering of these ops, and the order in which
1461 // we access levels when reading.
1462 if err := nextLevel.replaceTables(cd.bot, newTables); err != nil {
1463 return err
1464 }
1465 if err := thisLevel.deleteTables(cd.top); err != nil {
1466 return err
1467 }
1468
1469 // Note: For level 0, while doCompact is running, it is possible that new tables are added.
1470 // However, the tables are added only to the end, so it is ok to just delete the first table.
1471
1472 from := append(tablesToString(cd.top), tablesToString(cd.bot)...)
1473 to := tablesToString(newTables)
1474 if dur := time.Since(timeStart); dur > 2*time.Second {
1475 var expensive string
1476 if dur > time.Second {
1477 expensive = " [E]"
1478 }
1479 s.kv.opt.Infof("[%d]%s LOG Compact %d->%d (%d, %d -> %d tables with %d splits)."+
1480 " [%s] -> [%s], took %v\n, deleted %d bytes",
1481 id, expensive, thisLevel.level, nextLevel.level, len(cd.top), len(cd.bot),
1482 len(newTables), len(cd.splits), strings.Join(from, " "), strings.Join(to, " "),
1483 dur.Round(time.Millisecond), sizeOldTables-sizeNewTables)
1484 }
1485
1486 if cd.thisLevel.level != 0 && len(newTables) > 2*s.kv.opt.LevelSizeMultiplier {
1487 s.kv.opt.Infof("This Range (numTables: %d)\nLeft:\n%s\nRight:\n%s\n",
1488 len(cd.top), hex.Dump(cd.thisRange.left), hex.Dump(cd.thisRange.right))
1489 s.kv.opt.Infof("Next Range (numTables: %d)\nLeft:\n%s\nRight:\n%s\n",
1490 len(cd.bot), hex.Dump(cd.nextRange.left), hex.Dump(cd.nextRange.right))
1491 }
1492 return nil
1493 }
1494
1495 func tablesToString(tables []*table.Table) []string {
1496 var res []string
1497 for _, t := range tables {
1498 res = append(res, fmt.Sprintf("%05d", t.ID()))
1499 }
1500 res = append(res, ".")
1501 return res
1502 }
1503
// errFillTables is returned by doCompact when no tables could be picked for
// the requested compaction.
var errFillTables = errors.New("Unable to fill tables")
1505
1506 // doCompact picks some table on level l and compacts it away to the next level.
1507 func (s *levelsController) doCompact(id int, p compactionPriority) error {
1508 l := p.level
1509 y.AssertTrue(l < s.kv.opt.MaxLevels) // Sanity check.
1510 if p.t.baseLevel == 0 {
1511 p.t = s.levelTargets()
1512 }
1513
1514 _, span := otel.Tracer("").Start(context.TODO(), "Badger.Compaction")
1515 defer span.End()
1516
1517 cd := compactDef{
1518 compactorId: id,
1519 p: p,
1520 t: p.t,
1521 thisLevel: s.levels[l],
1522 dropPrefixes: p.dropPrefixes,
1523 }
1524
1525 // While picking tables to be compacted, both levels' tables are expected to
1526 // remain unchanged.
1527 if l == 0 {
1528 cd.nextLevel = s.levels[p.t.baseLevel]
1529 if !s.fillTablesL0(&cd) {
1530 return errFillTables
1531 }
1532 } else {
1533 cd.nextLevel = cd.thisLevel
1534 // We're not compacting the last level so pick the next level.
1535 if !cd.thisLevel.isLastLevel() {
1536 cd.nextLevel = s.levels[l+1]
1537 }
1538 if !s.fillTables(&cd) {
1539 return errFillTables
1540 }
1541 }
1542 defer s.cstatus.delete(cd) // Remove the ranges from compaction status.
1543
1544 span.SetAttributes(attribute.String("Compaction", fmt.Sprintf("%+v", cd)))
1545 if err := s.runCompactDef(id, l, cd); err != nil {
1546 // This compaction couldn't be done successfully.
1547 s.kv.opt.Warningf("[Compactor: %d] LOG Compact FAILED with error: %+v: %+v", id, err, cd)
1548 return err
1549 }
1550
1551 span.SetAttributes(
1552 attribute.Int("Top tables count", len(cd.top)),
1553 attribute.Int("Bottom tables count", len(cd.bot)))
1554
1555 s.kv.opt.Debugf("[Compactor: %d] Compaction for level: %d DONE", id, cd.thisLevel.level)
1556 return nil
1557 }
1558
1559 func (s *levelsController) addLevel0Table(t *table.Table) error {
1560 // Add table to manifest file only if it is not opened in memory. We don't want to add a table
1561 // to the manifest file if it exists only in memory.
1562 if !t.IsInmemory {
1563 // We update the manifest _before_ the table becomes part of a levelHandler, because at that
1564 // point it could get used in some compaction. This ensures the manifest file gets updated in
1565 // the proper order. (That means this update happens before that of some compaction which
1566 // deletes the table.)
1567 err := s.kv.manifest.addChanges([]*pb.ManifestChange{
1568 newCreateChange(t.ID(), 0, t.KeyID(), t.CompressionType()),
1569 }, s.kv.opt)
1570 if err != nil {
1571 return err
1572 }
1573 }
1574
1575 for !s.levels[0].tryAddLevel0Table(t) {
1576 // Before we unstall, we need to make sure that level 0 is healthy.
1577 timeStart := time.Now()
1578 for s.levels[0].numTables() >= s.kv.opt.NumLevelZeroTablesStall {
1579 time.Sleep(10 * time.Millisecond)
1580 }
1581 dur := time.Since(timeStart)
1582 if dur > time.Second {
1583 s.kv.opt.Infof("L0 was stalled for %s\n", dur.Round(time.Millisecond))
1584 }
1585 s.l0stallsMs.Add(int64(dur.Round(time.Millisecond)))
1586 }
1587
1588 return nil
1589 }
1590
1591 func (s *levelsController) close() error {
1592 err := s.cleanupLevels()
1593 return y.Wrap(err, "levelsController.Close")
1594 }
1595
1596 // get searches for a given key in all the levels of the LSM tree. It returns
1597 // key version <= the expected version (version in key). If not found,
1598 // it returns an empty y.ValueStruct.
1599 func (s *levelsController) get(key []byte, maxVs y.ValueStruct, startLevel int) (
1600 y.ValueStruct, error) {
1601 if s.kv.IsClosed() {
1602 return y.ValueStruct{}, ErrDBClosed
1603 }
1604 // It's important that we iterate the levels from 0 on upward. The reason is, if we iterated
1605 // in opposite order, or in parallel (naively calling all the h.RLock() in some order) we could
1606 // read level L's tables post-compaction and level L+1's tables pre-compaction. (If we do
1607 // parallelize this, we will need to call the h.RLock() function by increasing order of level
1608 // number.)
1609 version := y.ParseTs(key)
1610 for _, h := range s.levels {
1611 // Ignore all levels below startLevel. This is useful for GC when L0 is kept in memory.
1612 if h.level < startLevel {
1613 continue
1614 }
1615 vs, err := h.get(key) // Calls h.RLock() and h.RUnlock().
1616 if err != nil {
1617 return y.ValueStruct{}, y.Wrapf(err, "get key: %q", key)
1618 }
1619 if vs.Value == nil && vs.Meta == 0 {
1620 continue
1621 }
1622 y.NumBytesReadsLSMAdd(s.kv.opt.MetricsEnabled, int64(len(vs.Value)))
1623 if vs.Version == version {
1624 return vs, nil
1625 }
1626 if maxVs.Version < vs.Version {
1627 maxVs = vs
1628 }
1629 }
1630 if len(maxVs.Value) > 0 {
1631 y.NumGetsWithResultsAdd(s.kv.opt.MetricsEnabled, 1)
1632 }
1633 return maxVs, nil
1634 }
1635
1636 func appendIteratorsReversed(out []y.Iterator, th []*table.Table, opt int) []y.Iterator {
1637 for i := len(th) - 1; i >= 0; i-- {
1638 // This will increment the reference of the table handler.
1639 out = append(out, th[i].NewIterator(opt))
1640 }
1641 return out
1642 }
1643
1644 // appendIterators appends iterators to an array of iterators, for merging.
1645 // Note: This obtains references for the table handlers. Remember to close these iterators.
1646 func (s *levelsController) appendIterators(
1647 iters []y.Iterator, opt *IteratorOptions) []y.Iterator {
1648 // Just like with get, it's important we iterate the levels from 0 on upward, to avoid missing
1649 // data when there's a compaction.
1650 for _, level := range s.levels {
1651 iters = level.appendIterators(iters, opt)
1652 }
1653 return iters
1654 }
1655
// TableInfo represents the information about a table. getTableInfo produces
// one for every table in every level.
type TableInfo struct {
	// ID is the table's ID.
	ID               uint64
	// Level is the LSM level that holds the table.
	Level            int
	// Left is the table's smallest key.
	Left             []byte
	// Right is the table's biggest key.
	Right            []byte
	KeyCount         uint32 // Number of keys in the table
	// OnDiskSize is the table's size on disk.
	OnDiskSize       uint32
	// StaleDataSize is the amount of stale data in the table, in bytes.
	StaleDataSize    uint32
	// UncompressedSize is the table's uncompressed size.
	UncompressedSize uint32
	// MaxVersion is the highest key version stored in the table.
	MaxVersion       uint64
	// IndexSz is the size of the table's index.
	IndexSz          int
	// BloomFilterSize is the size of the table's bloom filter.
	BloomFilterSize  int
}
1670
1671 func (s *levelsController) getTableInfo() (result []TableInfo) {
1672 for _, l := range s.levels {
1673 l.RLock()
1674 for _, t := range l.tables {
1675 info := TableInfo{
1676 ID: t.ID(),
1677 Level: l.level,
1678 Left: t.Smallest(),
1679 Right: t.Biggest(),
1680 KeyCount: t.KeyCount(),
1681 OnDiskSize: t.OnDiskSize(),
1682 StaleDataSize: t.StaleDataSize(),
1683 IndexSz: t.IndexSize(),
1684 BloomFilterSize: t.BloomFilterSize(),
1685 UncompressedSize: t.UncompressedSize(),
1686 MaxVersion: t.MaxVersion(),
1687 }
1688 result = append(result, info)
1689 }
1690 l.RUnlock()
1691 }
1692 sort.Slice(result, func(i, j int) bool {
1693 if result[i].Level != result[j].Level {
1694 return result[i].Level < result[j].Level
1695 }
1696 return result[i].ID < result[j].ID
1697 })
1698 return
1699 }
1700
// LevelInfo summarizes one LSM level. getLevelInfo produces one per level.
type LevelInfo struct {
	// Level is the level number.
	Level          int
	// NumTables is the number of tables currently in the level.
	NumTables      int
	// Size is the level's total size.
	Size           int64
	// TargetSize is the level's target size from levelTargets.
	TargetSize     int64
	// TargetFileSize is the target size of tables written to this level.
	TargetFileSize int64
	// IsBaseLevel reports whether this is the current base level.
	IsBaseLevel    bool
	// Score is the compaction score from pickCompactLevels (0 if the level
	// did not get a priority).
	Score          float64
	// Adjusted is the adjusted compaction score from pickCompactLevels.
	Adjusted       float64
	// StaleDatSize is the level's total stale data size.
	// NOTE(review): the name is missing an "a" (StaleDataSize), but the field
	// is exported, so renaming it would break callers.
	StaleDatSize   int64
}
1712
1713 func (s *levelsController) getLevelInfo() []LevelInfo {
1714 t := s.levelTargets()
1715 prios := s.pickCompactLevels(nil)
1716 result := make([]LevelInfo, len(s.levels))
1717 for i, l := range s.levels {
1718 l.RLock()
1719 result[i].Level = i
1720 result[i].Size = l.totalSize
1721 result[i].NumTables = len(l.tables)
1722 result[i].StaleDatSize = l.totalStaleSize
1723
1724 l.RUnlock()
1725
1726 result[i].TargetSize = t.targetSz[i]
1727 result[i].TargetFileSize = t.fileSz[i]
1728 result[i].IsBaseLevel = t.baseLevel == i
1729 }
1730 for _, p := range prios {
1731 result[p.level].Score = p.score
1732 result[p.level].Adjusted = p.adjusted
1733 }
1734 return result
1735 }
1736
1737 // verifyChecksum verifies checksum for all tables on all levels.
1738 func (s *levelsController) verifyChecksum() error {
1739 var tables []*table.Table
1740 for _, l := range s.levels {
1741 l.RLock()
1742 tables = tables[:0]
1743 for _, t := range l.tables {
1744 tables = append(tables, t)
1745 t.IncrRef()
1746 }
1747 l.RUnlock()
1748
1749 for _, t := range tables {
1750 errChkVerify := t.VerifyChecksum()
1751 if err := t.DecrRef(); err != nil {
1752 s.kv.opt.Errorf("unable to decrease reference of table: %s while "+
1753 "verifying checksum with error: %s", t.Filename(), err)
1754 }
1755
1756 if errChkVerify != nil {
1757 return errChkVerify
1758 }
1759 }
1760 }
1761
1762 return nil
1763 }
1764
1765 // Returns the sorted list of splits for all the levels and tables based
1766 // on the block offsets.
1767 func (s *levelsController) keySplits(numPerTable int, prefix []byte) []string {
1768 splits := make([]string, 0)
1769 for _, l := range s.levels {
1770 l.RLock()
1771 for _, t := range l.tables {
1772 tableSplits := t.KeySplits(numPerTable, prefix)
1773 splits = append(splits, tableSplits...)
1774 }
1775 l.RUnlock()
1776 }
1777 sort.Strings(splits)
1778 return splits
1779 }
1780