1 /*
2 * SPDX-FileCopyrightText: © Hypermode Inc. <hello@hypermode.com>
3 * SPDX-License-Identifier: Apache-2.0
4 */
5
6 package badger
7
8 import (
9 "bytes"
10 "context"
11 "encoding/binary"
12 "errors"
13 "expvar"
14 "fmt"
15 "math"
16 "os"
17 "path/filepath"
18 "sort"
19 "strings"
20 "sync"
21 "sync/atomic"
22 "time"
23
24 humanize "github.com/dustin/go-humanize"
25
26 "github.com/dgraph-io/badger/v4/fb"
27 "github.com/dgraph-io/badger/v4/options"
28 "github.com/dgraph-io/badger/v4/pb"
29 "github.com/dgraph-io/badger/v4/skl"
30 "github.com/dgraph-io/badger/v4/table"
31 "github.com/dgraph-io/badger/v4/y"
32 "github.com/dgraph-io/ristretto/v2"
33 "github.com/dgraph-io/ristretto/v2/z"
34 )
35
// badgerPrefix is the prefix shared by every key badger reserves for its own
// internal bookkeeping.
var badgerPrefix = []byte("!badger!")

// txnKey is the internal key used to mark the end of a transaction's entries.
var txnKey = []byte("!badger!txn")

// bannedNsKey is the internal key prefix under which banned namespaces are
// stored (see initBannedNamespaces, which strips this prefix to decode them).
var bannedNsKey = []byte("!badger!banned")
41
42 type closers struct {
43 updateSize *z.Closer
44 compactors *z.Closer
45 memtable *z.Closer
46 writes *z.Closer
47 valueGC *z.Closer
48 pub *z.Closer
49 cacheHealth *z.Closer
50 }
51
// lockedKeys is a set of uint64 keys guarded by an embedded RWMutex, so it is
// safe for concurrent use. The zero value is an empty set ready to use.
type lockedKeys struct {
	sync.RWMutex
	keys map[uint64]struct{}
}

// add inserts key into the set. Adding a key that is already present is a no-op.
func (lk *lockedKeys) add(key uint64) {
	lk.Lock()
	defer lk.Unlock()
	if lk.keys == nil {
		// Allocate lazily: writing to a nil map panics, and this keeps the
		// zero value of lockedKeys usable.
		lk.keys = make(map[uint64]struct{})
	}
	lk.keys[key] = struct{}{}
}

// has reports whether key is in the set.
func (lk *lockedKeys) has(key uint64) bool {
	lk.RLock()
	defer lk.RUnlock()
	_, ok := lk.keys[key]
	return ok
}

// all returns a snapshot copy of the keys in the set, in no particular order.
func (lk *lockedKeys) all() []uint64 {
	lk.RLock()
	defer lk.RUnlock()
	keys := make([]uint64, 0, len(lk.keys))
	for key := range lk.keys {
		keys = append(keys, key)
	}
	return keys
}
79
// DB provides the various functions required to interact with Badger.
// DB is thread-safe.
type DB struct {
	// NOTE(review): presumably carries test-only hooks; Open copies
	// opt.syncChan into its syncChan field — confirm against its definition.
	testOnlyDBExtensions

	lock sync.RWMutex // Guards list of inmemory tables, not individual reads and writes.

	// Lock on opt.Dir; nil when running in-memory or with BypassLockGuard.
	dirLockGuard *directoryLockGuard
	// nil if Dir and ValueDir are the same (or when no directory lock is taken).
	valueDirGuard *directoryLockGuard

	// Closers for the background goroutines started by Open.
	closers closers

	// Our latest (actively written) in-memory table. nil in read-only mode;
	// set to nil by close once it has been handed to the flusher.
	mt *memTable
	// Immutable memtables awaiting flush, oldest first: flushMemtable removes
	// imm[0] after writing it to L0.
	imm []*memTable // Add here only AFTER pushing to flushChan.

	// Initialized via openMemTables.
	nextMemFid int

	opt       Options
	manifest  *manifestFile
	lc        *levelsController
	vlog      valueLog
	writeCh   chan *request  // Write requests; consumed by doWrites.
	flushChan chan *memTable // For flushing memtables.
	closeOnce sync.Once      // For closing DB only once.

	// Set to 1 by close; sendToWriteCh then rejects writes with ErrBlockedWrites.
	blockWrites atomic.Int32
	// Set to 1 by close; reported by IsClosed.
	isClosed atomic.Uint32

	orc *oracle
	// Namespaces loaded by initBannedNamespaces.
	bannedNamespaces *lockedKeys
	threshold        *vlogThreshold

	// Delivers write notifications to subscribers (see writeRequests).
	pub      *publisher
	registry *KeyRegistry
	// nil when opt.BlockCacheSize is 0.
	blockCache *ristretto.Cache[[]byte, *table.Block]
	// nil when opt.IndexCacheSize is 0.
	indexCache *ristretto.Cache[uint64, *fb.TableIndex]
	// Released when the DB is closed.
	allocPool *z.AllocatorPool
}
120
// kvWriteChCapacity is the buffer size of DB.writeCh. doWrites also uses it to
// bound a single write batch to three times this many requests.
const kvWriteChCapacity = 1000
124
125 func checkAndSetOptions(opt *Options) error {
126 // It's okay to have zero compactors which will disable all compactions but
127 // we cannot have just one compactor otherwise we will end up with all data
128 // on level 2.
129 if opt.NumCompactors == 1 {
130 return errors.New("Cannot have 1 compactor. Need at least 2")
131 }
132
133 if opt.InMemory && (opt.Dir != "" || opt.ValueDir != "") {
134 return errors.New("Cannot use badger in Disk-less mode with Dir or ValueDir set")
135 }
136 opt.maxBatchSize = (15 * opt.MemTableSize) / 100
137 opt.maxBatchCount = opt.maxBatchSize / int64(skl.MaxNodeSize)
138
139 // This is the maximum value, vlogThreshold can have if dynamic thresholding is enabled.
140 opt.maxValueThreshold = math.Min(maxValueThreshold, float64(opt.maxBatchSize))
141 if opt.VLogPercentile < 0.0 || opt.VLogPercentile > 1.0 {
142 return errors.New("vlogPercentile must be within range of 0.0-1.0")
143 }
144
145 // We are limiting opt.ValueThreshold to maxValueThreshold for now.
146 if opt.ValueThreshold > maxValueThreshold {
147 return fmt.Errorf("Invalid ValueThreshold, must be less or equal to %d",
148 maxValueThreshold)
149 }
150
151 // If ValueThreshold is greater than opt.maxBatchSize, we won't be able to push any data using
152 // the transaction APIs. Transaction batches entries into batches of size opt.maxBatchSize.
153 if opt.ValueThreshold > opt.maxBatchSize {
154 return fmt.Errorf("Valuethreshold %d greater than max batch size of %d. Either "+
155 "reduce opt.ValueThreshold or increase opt.BaseTableSize.",
156 opt.ValueThreshold, opt.maxBatchSize)
157 }
158 // ValueLogFileSize should be strictly LESS than 2<<30 otherwise we will
159 // overflow the uint32 when we mmap it in OpenMemtable.
160 if !(opt.ValueLogFileSize < 2<<30 && opt.ValueLogFileSize >= 1<<20) {
161 return ErrValueLogSize
162 }
163
164 if opt.ReadOnly {
165 // Do not perform compaction in read only mode.
166 opt.CompactL0OnClose = false
167 }
168
169 needCache := (opt.Compression != options.None) || (len(opt.EncryptionKey) > 0)
170 if needCache && opt.BlockCacheSize == 0 {
171 panic("BlockCacheSize should be set since compression/encryption are enabled")
172 }
173 return nil
174 }
175
176 // Open returns a new DB object.
177 func Open(opt Options) (*DB, error) {
178 if err := checkAndSetOptions(&opt); err != nil {
179 return nil, err
180 }
181 var dirLockGuard, valueDirLockGuard *directoryLockGuard
182
183 // Create directories and acquire lock on it only if badger is not running in InMemory mode.
184 // We don't have any directories/files in InMemory mode so we don't need to acquire
185 // any locks on them.
186 if !opt.InMemory {
187 if err := createDirs(opt); err != nil {
188 return nil, err
189 }
190 var err error
191 if !opt.BypassLockGuard {
192 dirLockGuard, err = acquireDirectoryLock(opt.Dir, lockFile, opt.ReadOnly)
193 if err != nil {
194 return nil, err
195 }
196 defer func() {
197 if dirLockGuard != nil {
198 _ = dirLockGuard.release()
199 }
200 }()
201 absDir, err := filepath.Abs(opt.Dir)
202 if err != nil {
203 return nil, err
204 }
205 absValueDir, err := filepath.Abs(opt.ValueDir)
206 if err != nil {
207 return nil, err
208 }
209 if absValueDir != absDir {
210 valueDirLockGuard, err = acquireDirectoryLock(opt.ValueDir, lockFile, opt.ReadOnly)
211 if err != nil {
212 return nil, err
213 }
214 defer func() {
215 if valueDirLockGuard != nil {
216 _ = valueDirLockGuard.release()
217 }
218 }()
219 }
220 }
221 }
222
223 manifestFile, manifest, err := openOrCreateManifestFile(opt)
224 if err != nil {
225 return nil, err
226 }
227 defer func() {
228 if manifestFile != nil {
229 _ = manifestFile.close()
230 }
231 }()
232
233 db := &DB{
234 imm: make([]*memTable, 0, opt.NumMemtables),
235 flushChan: make(chan *memTable, opt.NumMemtables),
236 writeCh: make(chan *request, kvWriteChCapacity),
237 opt: opt,
238 manifest: manifestFile,
239 dirLockGuard: dirLockGuard,
240 valueDirGuard: valueDirLockGuard,
241 orc: newOracle(opt),
242 pub: newPublisher(),
243 allocPool: z.NewAllocatorPool(8),
244 bannedNamespaces: &lockedKeys{keys: make(map[uint64]struct{})},
245 threshold: initVlogThreshold(&opt),
246 }
247
248 db.syncChan = opt.syncChan
249
250 // Cleanup all the goroutines started by badger in case of an error.
251 defer func() {
252 if err != nil {
253 opt.Errorf("Received err: %v. Cleaning up...", err)
254 db.cleanup()
255 db = nil
256 }
257 }()
258
259 if opt.BlockCacheSize > 0 {
260 numInCache := opt.BlockCacheSize / int64(opt.BlockSize)
261 if numInCache == 0 {
262 // Make the value of this variable at least one since the cache requires
263 // the number of counters to be greater than zero.
264 numInCache = 1
265 }
266
267 config := ristretto.Config[[]byte, *table.Block]{
268 NumCounters: numInCache * 8,
269 MaxCost: opt.BlockCacheSize,
270 BufferItems: 64,
271 Metrics: true,
272 OnExit: table.BlockEvictHandler,
273 }
274 db.blockCache, err = ristretto.NewCache[[]byte, *table.Block](&config)
275 if err != nil {
276 return nil, y.Wrap(err, "failed to create data cache")
277 }
278 }
279
280 if opt.IndexCacheSize > 0 {
281 // Index size is around 5% of the table size.
282 indexSz := int64(float64(opt.MemTableSize) * 0.05)
283 numInCache := opt.IndexCacheSize / indexSz
284 if numInCache == 0 {
285 // Make the value of this variable at least one since the cache requires
286 // the number of counters to be greater than zero.
287 numInCache = 1
288 }
289
290 config := ristretto.Config[uint64, *fb.TableIndex]{
291 NumCounters: numInCache * 8,
292 MaxCost: opt.IndexCacheSize,
293 BufferItems: 64,
294 Metrics: true,
295 }
296 db.indexCache, err = ristretto.NewCache(&config)
297 if err != nil {
298 return nil, y.Wrap(err, "failed to create bf cache")
299 }
300 }
301
302 db.closers.cacheHealth = z.NewCloser(1)
303 go db.monitorCache(db.closers.cacheHealth)
304
305 if db.opt.InMemory {
306 db.opt.SyncWrites = false
307 // If badger is running in memory mode, push everything into the LSM Tree.
308 db.opt.ValueThreshold = math.MaxInt32
309 }
310 krOpt := KeyRegistryOptions{
311 ReadOnly: opt.ReadOnly,
312 Dir: opt.Dir,
313 EncryptionKey: opt.EncryptionKey,
314 EncryptionKeyRotationDuration: opt.EncryptionKeyRotationDuration,
315 InMemory: opt.InMemory,
316 }
317
318 if db.registry, err = OpenKeyRegistry(krOpt); err != nil {
319 return db, err
320 }
321 db.calculateSize()
322 db.closers.updateSize = z.NewCloser(1)
323 go db.updateSize(db.closers.updateSize)
324
325 if err := db.openMemTables(db.opt); err != nil {
326 return nil, y.Wrapf(err, "while opening memtables")
327 }
328
329 if !db.opt.ReadOnly {
330 if db.mt, err = db.newMemTable(); err != nil {
331 return nil, y.Wrapf(err, "cannot create memtable")
332 }
333 }
334
335 // newLevelsController potentially loads files in directory.
336 if db.lc, err = newLevelsController(db, &manifest); err != nil {
337 return db, err
338 }
339
340 // Initialize vlog struct.
341 db.vlog.init(db)
342
343 if !opt.ReadOnly {
344 db.closers.compactors = z.NewCloser(1)
345 db.lc.startCompact(db.closers.compactors)
346
347 db.closers.memtable = z.NewCloser(1)
348 go func() {
349 db.flushMemtable(db.closers.memtable) // Need levels controller to be up.
350 }()
351 // Flush them to disk asap.
352 for _, mt := range db.imm {
353 db.flushChan <- mt
354 }
355 }
356 // We do increment nextTxnTs below. So, no need to do it here.
357 db.orc.nextTxnTs = db.MaxVersion()
358 db.opt.Infof("Set nextTxnTs to %d", db.orc.nextTxnTs)
359
360 if err = db.vlog.open(db); err != nil {
361 return db, y.Wrapf(err, "During db.vlog.open")
362 }
363
364 // Let's advance nextTxnTs to one more than whatever we observed via
365 // replaying the logs.
366 db.orc.txnMark.Done(db.orc.nextTxnTs)
367 // In normal mode, we must update readMark so older versions of keys can be removed during
368 // compaction when run in offline mode via the flatten tool.
369 db.orc.readMark.Done(db.orc.nextTxnTs)
370 db.orc.incrementNextTs()
371
372 go db.threshold.listenForValueThresholdUpdate()
373
374 if err := db.initBannedNamespaces(); err != nil {
375 return db, fmt.Errorf("While setting banned keys: %w", err)
376 }
377
378 db.closers.writes = z.NewCloser(1)
379 go db.doWrites(db.closers.writes)
380
381 if !db.opt.InMemory {
382 db.closers.valueGC = z.NewCloser(1)
383 go db.vlog.waitOnGC(db.closers.valueGC)
384 }
385
386 db.closers.pub = z.NewCloser(1)
387 go db.pub.listenForUpdates(db.closers.pub)
388
389 valueDirLockGuard = nil
390 dirLockGuard = nil
391 manifestFile = nil
392 return db, nil
393 }
394
395 // initBannedNamespaces retrieves the banned namespaces from the DB and updates in-memory structure.
396 func (db *DB) initBannedNamespaces() error {
397 if db.opt.NamespaceOffset < 0 {
398 return nil
399 }
400 return db.View(func(txn *Txn) error {
401 iopts := DefaultIteratorOptions
402 iopts.Prefix = bannedNsKey
403 iopts.PrefetchValues = false
404 iopts.InternalAccess = true
405 itr := txn.NewIterator(iopts)
406 defer itr.Close()
407 for itr.Rewind(); itr.Valid(); itr.Next() {
408 key := y.BytesToU64(itr.Item().Key()[len(bannedNsKey):])
409 db.bannedNamespaces.add(key)
410 }
411 return nil
412 })
413 }
414
415 func (db *DB) MaxVersion() uint64 {
416 var maxVersion uint64
417 update := func(a uint64) {
418 if a > maxVersion {
419 maxVersion = a
420 }
421 }
422 db.lock.Lock()
423 // In read only mode, we do not create new mem table.
424 if !db.opt.ReadOnly {
425 update(db.mt.maxVersion)
426 }
427 for _, mt := range db.imm {
428 update(mt.maxVersion)
429 }
430 db.lock.Unlock()
431 for _, ti := range db.Tables() {
432 update(ti.MaxVersion)
433 }
434 return maxVersion
435 }
436
437 func (db *DB) monitorCache(c *z.Closer) {
438 defer c.Done()
439 count := 0
440 analyze := func(name string, metrics *ristretto.Metrics) {
441 // If the mean life expectancy is less than 10 seconds, the cache
442 // might be too small.
443 le := metrics.LifeExpectancySeconds()
444 if le == nil {
445 return
446 }
447 lifeTooShort := le.Count > 0 && float64(le.Sum)/float64(le.Count) < 10
448 hitRatioTooLow := metrics.Ratio() > 0 && metrics.Ratio() < 0.4
449 if lifeTooShort && hitRatioTooLow {
450 db.opt.Warningf("%s might be too small. Metrics: %s\n", name, metrics)
451 db.opt.Warningf("Cache life expectancy (in seconds): %+v\n", le)
452
453 } else if le.Count > 1000 && count%5 == 0 {
454 db.opt.Infof("%s metrics: %s\n", name, metrics)
455 }
456 }
457
458 ticker := time.NewTicker(1 * time.Minute)
459 defer ticker.Stop()
460 for {
461 select {
462 case <-c.HasBeenClosed():
463 return
464 case <-ticker.C:
465 }
466
467 analyze("Block cache", db.BlockCacheMetrics())
468 analyze("Index cache", db.IndexCacheMetrics())
469 count++
470 }
471 }
472
473 // cleanup stops all the goroutines started by badger. This is used in open to
474 // cleanup goroutines in case of an error.
475 func (db *DB) cleanup() {
476 db.stopMemoryFlush()
477 db.stopCompactions()
478
479 db.blockCache.Close()
480 db.indexCache.Close()
481 if db.closers.updateSize != nil {
482 db.closers.updateSize.Signal()
483 }
484 if db.closers.valueGC != nil {
485 db.closers.valueGC.Signal()
486 }
487 if db.closers.writes != nil {
488 db.closers.writes.Signal()
489 }
490 if db.closers.pub != nil {
491 db.closers.pub.Signal()
492 }
493
494 db.orc.Stop()
495
496 // Do not use vlog.Close() here. vlog.Close truncates the files. We don't
497 // want to truncate files unless the user has specified the truncate flag.
498 }
499
500 // BlockCacheMetrics returns the metrics for the underlying block cache.
501 func (db *DB) BlockCacheMetrics() *ristretto.Metrics {
502 if db.blockCache != nil {
503 return db.blockCache.Metrics
504 }
505 return nil
506 }
507
508 // IndexCacheMetrics returns the metrics for the underlying index cache.
509 func (db *DB) IndexCacheMetrics() *ristretto.Metrics {
510 if db.indexCache != nil {
511 return db.indexCache.Metrics
512 }
513 return nil
514 }
515
516 // Close closes a DB. It's crucial to call it to ensure all the pending updates make their way to
517 // disk. Calling DB.Close() multiple times would still only close the DB once.
518 func (db *DB) Close() error {
519 var err error
520 db.closeOnce.Do(func() {
521 err = db.close()
522 })
523 return err
524 }
525
526 // IsClosed denotes if the badger DB is closed or not. A DB instance should not
527 // be used after closing it.
528 func (db *DB) IsClosed() bool {
529 return db.isClosed.Load() == 1
530 }
531
532 func (db *DB) close() (err error) {
533 defer db.allocPool.Release()
534
535 db.opt.Debugf("Closing database")
536 db.opt.Infof("Lifetime L0 stalled for: %s\n", time.Duration(db.lc.l0stallsMs.Load()))
537
538 db.blockWrites.Store(1)
539 db.isClosed.Store(1)
540
541 if !db.opt.InMemory {
542 // Stop value GC first.
543 db.closers.valueGC.SignalAndWait()
544 }
545
546 // Stop writes next.
547 db.closers.writes.SignalAndWait()
548
549 // Don't accept any more write.
550 close(db.writeCh)
551
552 db.closers.pub.SignalAndWait()
553 db.closers.cacheHealth.Signal()
554
555 // Make sure that block writer is done pushing stuff into memtable!
556 // Otherwise, you will have a race condition: we are trying to flush memtables
557 // and remove them completely, while the block / memtable writer is still
558 // trying to push stuff into the memtable. This will also resolve the value
559 // offset problem: as we push into memtable, we update value offsets there.
560 if db.mt != nil {
561 if db.mt.sl.Empty() {
562 // Remove the memtable if empty.
563 db.mt.DecrRef()
564 } else {
565 db.opt.Debugf("Flushing memtable")
566 for {
567 pushedMemTable := func() bool {
568 db.lock.Lock()
569 defer db.lock.Unlock()
570 y.AssertTrue(db.mt != nil)
571 select {
572 case db.flushChan <- db.mt:
573 db.imm = append(db.imm, db.mt) // Flusher will attempt to remove this from s.imm.
574 db.mt = nil // Will segfault if we try writing!
575 db.opt.Debugf("pushed to flush chan\n")
576 return true
577 default:
578 // If we fail to push, we need to unlock and wait for a short while.
579 // The flushing operation needs to update s.imm. Otherwise, we have a
580 // deadlock.
581 // TODO: Think about how to do this more cleanly, maybe without any locks.
582 }
583 return false
584 }()
585 if pushedMemTable {
586 break
587 }
588 time.Sleep(10 * time.Millisecond)
589 }
590 }
591 }
592 db.stopMemoryFlush()
593 db.stopCompactions()
594
595 // Force Compact L0
596 // We don't need to care about cstatus since no parallel compaction is running.
597 if db.opt.CompactL0OnClose {
598 err := db.lc.doCompact(173, compactionPriority{level: 0, score: 1.73})
599 switch err {
600 case errFillTables:
601 // This error only means that there might be enough tables to do a compaction. So, we
602 // should not report it to the end user to avoid confusing them.
603 case nil:
604 db.opt.Debugf("Force compaction on level 0 done")
605 default:
606 db.opt.Warningf("While forcing compaction on level 0: %v", err)
607 }
608 }
609
610 // Now close the value log.
611 if vlogErr := db.vlog.Close(); vlogErr != nil {
612 err = y.Wrap(vlogErr, "DB.Close")
613 }
614
615 db.opt.Infof(db.LevelsToString())
616 if lcErr := db.lc.close(); err == nil {
617 err = y.Wrap(lcErr, "DB.Close")
618 }
619 db.opt.Debugf("Waiting for closer")
620 db.closers.updateSize.SignalAndWait()
621 db.orc.Stop()
622 db.blockCache.Close()
623 db.indexCache.Close()
624
625 db.threshold.close()
626
627 if db.opt.InMemory {
628 return
629 }
630
631 if db.dirLockGuard != nil {
632 if guardErr := db.dirLockGuard.release(); err == nil {
633 err = y.Wrap(guardErr, "DB.Close")
634 }
635 }
636 if db.valueDirGuard != nil {
637 if guardErr := db.valueDirGuard.release(); err == nil {
638 err = y.Wrap(guardErr, "DB.Close")
639 }
640 }
641 if manifestErr := db.manifest.close(); err == nil {
642 err = y.Wrap(manifestErr, "DB.Close")
643 }
644 if registryErr := db.registry.Close(); err == nil {
645 err = y.Wrap(registryErr, "DB.Close")
646 }
647
648 // Fsync directories to ensure that lock file, and any other removed files whose directory
649 // we haven't specifically fsynced, are guaranteed to have their directory entry removal
650 // persisted to disk.
651 if syncErr := db.syncDir(db.opt.Dir); err == nil {
652 err = y.Wrap(syncErr, "DB.Close")
653 }
654 if syncErr := db.syncDir(db.opt.ValueDir); err == nil {
655 err = y.Wrap(syncErr, "DB.Close")
656 }
657
658 return err
659 }
660
661 // VerifyChecksum verifies checksum for all tables on all levels.
662 // This method can be used to verify checksum, if opt.ChecksumVerificationMode is NoVerification.
663 func (db *DB) VerifyChecksum() error {
664 return db.lc.verifyChecksum()
665 }
666
// lockFile is the name of the lock file acquired in Dir (and in ValueDir when
// it differs) to keep other processes out of the database directories.
const lockFile = "LOCK"
670
671 // Sync syncs database content to disk. This function provides
672 // more control to user to sync data whenever required.
673 func (db *DB) Sync() error {
674 /**
675 Make an attempt to sync both the logs, the active memtable's WAL and the vLog (1847).
676 Cases:
677 - All_ok :: If both the logs sync successfully.
678
679 - Entry_Lost :: If an entry with a value pointer was present in the active memtable's WAL,
680 :: and the WAL was synced but there was an error in syncing the vLog.
681 :: The entry will be considered lost and this case will need to be handled during recovery.
682
683 - Entries_Lost :: If there were errors in syncing both the logs, multiple entries would be lost.
684
685 - Entries_Lost :: If the active memtable's WAL is not synced but the vLog is synced, it will
686 :: result in entries being lost because recovery of the active memtable is done from its WAL.
687 :: Check `UpdateSkipList` in memtable.go.
688
689 - Nothing_lost :: If an entry with its value was present in the active memtable's WAL, and the WAL was synced,
690 :: but there was an error in syncing the vLog.
691 :: Nothing is lost for this very specific entry because the entry is completely present in the memtable's WAL.
692
693 - Partially_lost :: If entries were written partially in either of the logs,
694 :: the logs will be truncated during recovery.
695 :: As a result of truncation, some entries might be lost.
696 :: Assume that 4KB of data is to be synced and invoking `Sync` results only in syncing 3KB
697 :: of data and then the machine shuts down or the disk failure happens,
698 :: this will result in partial writes. [[This case needs verification]]
699 */
700 db.lock.RLock()
701 memtableSyncError := db.mt.SyncWAL()
702 db.lock.RUnlock()
703
704 vLogSyncError := db.vlog.sync()
705 return y.CombineErrors(memtableSyncError, vLogSyncError)
706 }
707
708 // getMemtables returns the current memtables and get references.
709 func (db *DB) getMemTables() ([]*memTable, func()) {
710 db.lock.RLock()
711 defer db.lock.RUnlock()
712
713 var tables []*memTable
714
715 // Mutable memtable does not exist in read-only mode.
716 if !db.opt.ReadOnly {
717 // Get mutable memtable.
718 tables = append(tables, db.mt)
719 db.mt.IncrRef()
720 }
721
722 // Get immutable memtables.
723 last := len(db.imm) - 1
724 for i := range db.imm {
725 tables = append(tables, db.imm[last-i])
726 db.imm[last-i].IncrRef()
727 }
728 return tables, func() {
729 for _, tbl := range tables {
730 tbl.DecrRef()
731 }
732 }
733 }
734
735 // get returns the value in memtable or disk for given key.
736 // Note that value will include meta byte.
737 //
738 // IMPORTANT: We should never write an entry with an older timestamp for the same key, We need to
739 // maintain this invariant to search for the latest value of a key, or else we need to search in all
740 // tables and find the max version among them. To maintain this invariant, we also need to ensure
741 // that all versions of a key are always present in the same table from level 1, because compaction
742 // can push any table down.
743 //
744 // Update(23/09/2020) - We have dropped the move key implementation. Earlier we
745 // were inserting move keys to fix the invalid value pointers but we no longer
746 // do that. For every get("fooX") call where X is the version, we will search
747 // for "fooX" in all the levels of the LSM tree. This is expensive but it
748 // removes the overhead of handling move keys completely.
749 func (db *DB) get(key []byte) (y.ValueStruct, error) {
750 if db.IsClosed() {
751 return y.ValueStruct{}, ErrDBClosed
752 }
753 tables, decr := db.getMemTables() // Lock should be released.
754 defer decr()
755
756 var maxVs y.ValueStruct
757 version := y.ParseTs(key)
758
759 y.NumGetsAdd(db.opt.MetricsEnabled, 1)
760 for i := 0; i < len(tables); i++ {
761 vs := tables[i].sl.Get(key)
762 y.NumMemtableGetsAdd(db.opt.MetricsEnabled, 1)
763 if vs.Meta == 0 && vs.Value == nil {
764 continue
765 }
766 // Found the required version of the key, return immediately.
767 if vs.Version == version {
768 y.NumGetsWithResultsAdd(db.opt.MetricsEnabled, 1)
769 return vs, nil
770 }
771 if maxVs.Version < vs.Version {
772 maxVs = vs
773 }
774 }
775 return db.lc.get(key, maxVs, 0)
776 }
777
778 var requestPool = sync.Pool{
779 New: func() interface{} {
780 return new(request)
781 },
782 }
783
784 func (db *DB) writeToLSM(b *request) error {
785 // We should check the length of b.Prts and b.Entries only when badger is not
786 // running in InMemory mode. In InMemory mode, we don't write anything to the
787 // value log and that's why the length of b.Ptrs will always be zero.
788 if !db.opt.InMemory && len(b.Ptrs) != len(b.Entries) {
789 return fmt.Errorf("Ptrs and Entries don't match: %+v", b)
790 }
791
792 for i, entry := range b.Entries {
793 var err error
794 if entry.skipVlogAndSetThreshold(db.valueThreshold()) {
795 // Will include deletion / tombstone case.
796 err = db.mt.Put(entry.Key,
797 y.ValueStruct{
798 Value: entry.Value,
799 // Ensure value pointer flag is removed. Otherwise, the value will fail
800 // to be retrieved during iterator prefetch. `bitValuePointer` is only
801 // known to be set in write to LSM when the entry is loaded from a backup
802 // with lower ValueThreshold and its value was stored in the value log.
803 Meta: entry.meta &^ bitValuePointer,
804 UserMeta: entry.UserMeta,
805 ExpiresAt: entry.ExpiresAt,
806 })
807 } else {
808 // Write pointer to Memtable.
809 err = db.mt.Put(entry.Key,
810 y.ValueStruct{
811 Value: b.Ptrs[i].Encode(),
812 Meta: entry.meta | bitValuePointer,
813 UserMeta: entry.UserMeta,
814 ExpiresAt: entry.ExpiresAt,
815 })
816 }
817 if err != nil {
818 return y.Wrapf(err, "while writing to memTable")
819 }
820 }
821 if db.opt.SyncWrites {
822 return db.mt.SyncWAL()
823 }
824 return nil
825 }
826
827 // writeRequests is called serially by only one goroutine.
828 func (db *DB) writeRequests(reqs []*request) error {
829 if len(reqs) == 0 {
830 return nil
831 }
832
833 done := func(err error) {
834 for _, r := range reqs {
835 r.Err = err
836 r.Wg.Done()
837 }
838 }
839 db.opt.Debugf("writeRequests called. Writing to value log")
840 err := db.vlog.write(reqs)
841 if err != nil {
842 done(err)
843 return err
844 }
845
846 db.opt.Debugf("Writing to memtable")
847 var count int
848 for _, b := range reqs {
849 if len(b.Entries) == 0 {
850 continue
851 }
852 count += len(b.Entries)
853 var i uint64
854 var err error
855 for err = db.ensureRoomForWrite(); err == errNoRoom; err = db.ensureRoomForWrite() {
856 i++
857 if i%100 == 0 {
858 db.opt.Debugf("Making room for writes")
859 }
860 // We need to poll a bit because both hasRoomForWrite and the flusher need access to s.imm.
861 // When flushChan is full and you are blocked there, and the flusher is trying to update s.imm,
862 // you will get a deadlock.
863 time.Sleep(10 * time.Millisecond)
864 }
865 if err != nil {
866 done(err)
867 return y.Wrap(err, "writeRequests")
868 }
869 if err := db.writeToLSM(b); err != nil {
870 done(err)
871 return y.Wrap(err, "writeRequests")
872 }
873 }
874
875 db.opt.Debugf("Sending updates to subscribers")
876 db.pub.sendUpdates(reqs)
877
878 done(nil)
879 db.opt.Debugf("%d entries written", count)
880 return nil
881 }
882
883 func (db *DB) sendToWriteCh(entries []*Entry) (*request, error) {
884 if db.blockWrites.Load() == 1 {
885 return nil, ErrBlockedWrites
886 }
887 var count, size int64
888 for _, e := range entries {
889 size += e.estimateSizeAndSetThreshold(db.valueThreshold())
890 count++
891 }
892 y.NumBytesWrittenUserAdd(db.opt.MetricsEnabled, size)
893 if count >= db.opt.maxBatchCount || size >= db.opt.maxBatchSize {
894 return nil, ErrTxnTooBig
895 }
896
897 // We can only service one request because we need each txn to be stored in a contiguous section.
898 // Txns should not interleave among other txns or rewrites.
899 req := requestPool.Get().(*request)
900 req.reset()
901 req.Entries = entries
902 req.Wg.Add(1)
903 req.IncrRef() // for db write
904 db.writeCh <- req // Handled in doWrites.
905 y.NumPutsAdd(db.opt.MetricsEnabled, int64(len(entries)))
906
907 return req, nil
908 }
909
// doWrites is the consumer of db.writeCh. It gathers incoming requests into
// batches and hands each batch to writeRequests on its own goroutine. At most
// one batch is in flight at a time, enforced by the one-slot pendingCh. Once lc
// is signalled it drains whatever is still queued in writeCh, writes that final
// batch synchronously, and returns.
func (db *DB) doWrites(lc *z.Closer) {
	defer lc.Done()
	// One-slot semaphore: sending acquires the right to run a batch, and the
	// writeRequests wrapper below releases it by receiving when the batch is done.
	pendingCh := make(chan struct{}, 1)

	writeRequests := func(reqs []*request) {
		if err := db.writeRequests(reqs); err != nil {
			db.opt.Errorf("writeRequests: %v", err)
		}
		<-pendingCh
	}

	// This variable tracks the number of pending writes.
	reqLen := new(expvar.Int)
	y.PendingWritesSet(db.opt.MetricsEnabled, db.opt.Dir, reqLen)

	reqs := make([]*request, 0, 10)
	for {
		var r *request
		// Wait for the first request of a new batch (or for shutdown).
		select {
		case r = <-db.writeCh:
		case <-lc.HasBeenClosed():
			goto closedCase
		}

		for {
			reqs = append(reqs, r)
			reqLen.Set(int64(len(reqs)))

			// The batch is large enough: wait for the in-flight batch to finish
			// (blocking acquire of pendingCh), then write this one.
			if len(reqs) >= 3*kvWriteChCapacity {
				pendingCh <- struct{}{} // blocking.
				goto writeCase
			}

			select {
			// Either push to pending, or continue to pick from writeCh.
			case r = <-db.writeCh:
			case pendingCh <- struct{}{}:
				goto writeCase
			case <-lc.HasBeenClosed():
				goto closedCase
			}
		}

	closedCase:
		// Drain any requests still queued in writeCh into the final batch.
		// Don't close writeCh here, because it is used in several places.
		for {
			select {
			case r = <-db.writeCh:
				reqs = append(reqs, r)
			default:
				pendingCh <- struct{}{} // Push to pending before doing a write.
				writeRequests(reqs)
				return
			}
		}

	writeCase:
		// pendingCh is held at this point; the goroutine releases it when done.
		go writeRequests(reqs)
		reqs = make([]*request, 0, 10)
		reqLen.Set(0)
	}
}
973
974 // batchSet applies a list of badger.Entry. If a request level error occurs it
975 // will be returned.
976 //
977 // Check(kv.BatchSet(entries))
978 func (db *DB) batchSet(entries []*Entry) error {
979 req, err := db.sendToWriteCh(entries)
980 if err != nil {
981 return err
982 }
983
984 return req.Wait()
985 }
986
987 // batchSetAsync is the asynchronous version of batchSet. It accepts a callback
988 // function which is called when all the sets are complete. If a request level
989 // error occurs, it will be passed back via the callback.
990 //
991 // err := kv.BatchSetAsync(entries, func(err error)) {
992 // Check(err)
993 // }
994 func (db *DB) batchSetAsync(entries []*Entry, f func(error)) error {
995 req, err := db.sendToWriteCh(entries)
996 if err != nil {
997 return err
998 }
999 go func() {
1000 err := req.Wait()
1001 // Write is complete. Let's call the callback function now.
1002 f(err)
1003 }()
1004 return nil
1005 }
1006
// errNoRoom is returned by ensureRoomForWrite when the active memtable is full
// and flushChan cannot take it yet; writeRequests backs off and retries.
var errNoRoom error = errors.New("No room for write")
1008
1009 // ensureRoomForWrite is always called serially.
1010 func (db *DB) ensureRoomForWrite() error {
1011 var err error
1012 db.lock.Lock()
1013 defer db.lock.Unlock()
1014
1015 y.AssertTrue(db.mt != nil) // A nil mt indicates that DB is being closed.
1016 if !db.mt.isFull() {
1017 return nil
1018 }
1019
1020 select {
1021 case db.flushChan <- db.mt:
1022 db.opt.Debugf("Flushing memtable, mt.size=%d size of flushChan: %d\n",
1023 db.mt.sl.MemSize(), len(db.flushChan))
1024 // We manage to push this task. Let's modify imm.
1025 db.imm = append(db.imm, db.mt)
1026 db.mt, err = db.newMemTable()
1027 if err != nil {
1028 return y.Wrapf(err, "cannot create new mem table")
1029 }
1030 // New memtable is empty. We certainly have room.
1031 return nil
1032 default:
1033 // We need to do this to unlock and allow the flusher to modify imm.
1034 return errNoRoom
1035 }
1036 }
1037
1038 func arenaSize(opt Options) int64 {
1039 return opt.MemTableSize + opt.maxBatchSize + opt.maxBatchCount*int64(skl.MaxNodeSize)
1040 }
1041
1042 // buildL0Table builds a new table from the memtable.
1043 func buildL0Table(iter y.Iterator, dropPrefixes [][]byte, bopts table.Options) *table.Builder {
1044 defer iter.Close()
1045
1046 b := table.NewTableBuilder(bopts)
1047 for iter.Rewind(); iter.Valid(); iter.Next() {
1048 if len(dropPrefixes) > 0 && hasAnyPrefixes(iter.Key(), dropPrefixes) {
1049 continue
1050 }
1051 vs := iter.Value()
1052 var vp valuePointer
1053 if vs.Meta&bitValuePointer > 0 {
1054 vp.Decode(vs.Value)
1055 }
1056 b.Add(iter.Key(), iter.Value(), vp.Len)
1057 }
1058
1059 return b
1060 }
1061
1062 // handleMemTableFlush must be run serially.
1063 func (db *DB) handleMemTableFlush(mt *memTable, dropPrefixes [][]byte) error {
1064 bopts := buildTableOptions(db)
1065 itr := mt.sl.NewUniIterator(false)
1066 builder := buildL0Table(itr, nil, bopts)
1067 defer builder.Close()
1068
1069 // buildL0Table can return nil if the none of the items in the skiplist are
1070 // added to the builder. This can happen when drop prefix is set and all
1071 // the items are skipped.
1072 if builder.Empty() {
1073 builder.Finish()
1074 return nil
1075 }
1076
1077 fileID := db.lc.reserveFileID()
1078 var tbl *table.Table
1079 var err error
1080 if db.opt.InMemory {
1081 data := builder.Finish()
1082 tbl, err = table.OpenInMemoryTable(data, fileID, &bopts)
1083 } else {
1084 tbl, err = table.CreateTable(table.NewFilename(fileID, db.opt.Dir), builder)
1085 }
1086 if err != nil {
1087 return y.Wrap(err, "error while creating table")
1088 }
1089 // We own a ref on tbl.
1090 err = db.lc.addLevel0Table(tbl) // This will incrRef
1091 _ = tbl.DecrRef() // Releases our ref.
1092 return err
1093 }
1094
// flushMemtable drains db.flushChan until it is closed, writing each received
// memtable to level 0. Nil entries are ignored. A failed flush is logged and
// retried once per second, indefinitely, until it succeeds.
//
// After a successful flush the memtable is removed from the head of db.imm and
// its reference released, under db.lock.
func (db *DB) flushMemtable(lc *z.Closer) {
	defer lc.Done()

	for mt := range db.flushChan {
		if mt == nil {
			continue
		}

		for {
			err := db.handleMemTableFlush(mt, nil)
			if err == nil {
				break
			}
			db.opt.Errorf("error flushing memtable to disk: %v, retrying", err)
			time.Sleep(time.Second)
		}

		db.lock.Lock()
		// Memtables are pushed onto flushChan and appended to db.imm together
		// under db.lock (see ensureRoomForWrite), and this loop is the only
		// consumer. So the one just flushed must be the oldest entry of db.imm.
		// This invariant is fragile: any change to that pairing breaks it.
		y.AssertTrue(mt == db.imm[0])
		db.imm = db.imm[1:]
		mt.DecrRef() // Return memory.
		db.lock.Unlock()
	}
}
1129
// exists reports whether path can be stat'ed. A "does not exist" error yields
// (false, nil). Any other stat error is returned together with true, so callers
// must check the error before trusting the boolean.
func exists(path string) (bool, error) {
	_, statErr := os.Stat(path)
	switch {
	case statErr == nil:
		return true, nil
	case os.IsNotExist(statErr):
		return false, nil
	default:
		return true, statErr
	}
}
1140
1141 // This function does a filewalk, calculates the size of vlog and sst files and stores it in
1142 // y.LSMSize and y.VlogSize.
1143 func (db *DB) calculateSize() {
1144 if db.opt.InMemory {
1145 return
1146 }
1147 newInt := func(val int64) *expvar.Int {
1148 v := new(expvar.Int)
1149 v.Add(val)
1150 return v
1151 }
1152
1153 totalSize := func(dir string) (int64, int64) {
1154 var lsmSize, vlogSize int64
1155 err := filepath.Walk(dir, func(path string, info os.FileInfo, err error) error {
1156 if err != nil {
1157 return err
1158 }
1159 ext := filepath.Ext(path)
1160 switch ext {
1161 case ".sst":
1162 lsmSize += info.Size()
1163 case ".vlog":
1164 vlogSize += info.Size()
1165 }
1166 return nil
1167 })
1168 if err != nil {
1169 db.opt.Debugf("Got error while calculating total size of directory: %s", dir)
1170 }
1171 return lsmSize, vlogSize
1172 }
1173
1174 lsmSize, vlogSize := totalSize(db.opt.Dir)
1175 y.LSMSizeSet(db.opt.MetricsEnabled, db.opt.Dir, newInt(lsmSize))
1176 // If valueDir is different from dir, we'd have to do another walk.
1177 if db.opt.ValueDir != db.opt.Dir {
1178 _, vlogSize = totalSize(db.opt.ValueDir)
1179 }
1180 y.VlogSizeSet(db.opt.MetricsEnabled, db.opt.ValueDir, newInt(vlogSize))
1181 }
1182
// updateSize recomputes the on-disk size metrics once a minute until lc is
// closed. It returns immediately in InMemory mode. lc.Done is always called.
func (db *DB) updateSize(lc *z.Closer) {
	defer lc.Done()
	if db.opt.InMemory {
		return
	}

	ticker := time.NewTicker(time.Minute)
	defer ticker.Stop()

	for {
		select {
		case <-lc.HasBeenClosed():
			return
		case <-ticker.C:
			db.calculateSize()
		}
	}
}
1201
// RunValueLogGC triggers a value log garbage collection.
//
// It picks value log files to perform GC based on statistics that are collected
// during compactions. If no such statistics are available, then log files are
// picked in random order. The process stops as soon as the first log file is
// encountered which does not result in garbage collection.
//
// When a log file is picked, it is first sampled. If the sample shows that we
// can discard at least discardRatio space of that file, it would be rewritten.
//
// If a call to RunValueLogGC results in no rewrites, then an ErrNoRewrite is
// thrown indicating that the call resulted in no file rewrites.
//
// We recommend setting discardRatio to 0.5, thus indicating that a file be
// rewritten if half the space can be discarded. This results in a lifetime
// value log write amplification of 2 (1 from original write + 0.5 rewrite +
// 0.25 + 0.125 + ... = 2). Setting it to higher value would result in fewer
// space reclaims, while setting it to a lower value would result in more space
// reclaims at the cost of increased activity on the LSM tree. discardRatio
// must be in the range (0.0, 1.0), both endpoints excluded, otherwise an
// ErrInvalidRequest is returned. A NaN discardRatio is also rejected.
//
// Only one GC is allowed at a time. If another value log GC is running, or DB
// has been closed, this would return an ErrRejected.
//
// In InMemory mode ErrGCInMemoryMode is returned.
//
// Note: Every time GC is run, it would produce a spike of activity on the LSM
// tree.
func (db *DB) RunValueLogGC(discardRatio float64) error {
	if db.opt.InMemory {
		return ErrGCInMemoryMode
	}
	// NaN fails every ordered comparison, so it must be rejected explicitly;
	// otherwise it would slip past the bounds checks into runGC.
	if math.IsNaN(discardRatio) || discardRatio >= 1.0 || discardRatio <= 0.0 {
		return ErrInvalidRequest
	}

	// Pick a log file and run GC
	return db.vlog.runGC(discardRatio)
}
1240
// Size returns the size of lsm and value log files in bytes. It can be used to decide how often to
// call RunValueLogGC.
//
// The values are the ones last recorded by calculateSize. Both are 0 while no LSM size has been
// recorded for db.opt.Dir (presumably also whenever metrics are disabled). If the LSM size is
// present but the value-log size is not, vlog is reported as 0.
func (db *DB) Size() (lsm, vlog int64) {
	lsmVar, ok := y.LSMSizeGet(db.opt.MetricsEnabled, db.opt.Dir).(*expvar.Int)
	if !ok {
		return 0, 0
	}
	lsm = lsmVar.Value()

	// calculateSize records the LSM size before the vlog size, so a concurrent caller can see the
	// former without the latter. Use the comma-ok form so that window cannot cause a panic.
	if vlogVar, ok := y.VlogSizeGet(db.opt.MetricsEnabled, db.opt.ValueDir).(*expvar.Int); ok {
		vlog = vlogVar.Value()
	}
	return lsm, vlog
}
1252
1253 // Sequence represents a Badger sequence.
1254 type Sequence struct {
1255 lock sync.Mutex
1256 db *DB
1257 key []byte
1258 next uint64
1259 leased uint64
1260 bandwidth uint64
1261 }
1262
1263 // Next would return the next integer in the sequence, updating the lease by running a transaction
1264 // if needed.
1265 func (seq *Sequence) Next() (uint64, error) {
1266 seq.lock.Lock()
1267 defer seq.lock.Unlock()
1268 if seq.next >= seq.leased {
1269 if err := seq.updateLease(); err != nil {
1270 return 0, err
1271 }
1272 }
1273 val := seq.next
1274 seq.next++
1275 return val, nil
1276 }
1277
// Release gives back the unused part of the current lease, to avoid wasted
// integers. If the value stored under the sequence key still equals our lease
// bound, it is rewritten to seq.next. If it differs, it is left untouched.
// On success the in-memory lease is shrunk to seq.next.
//
// This should be done right before closing the associated DB. However it is
// valid to use the sequence after it was released, causing a new lease with
// full bandwidth.
func (seq *Sequence) Release() error {
	seq.lock.Lock()
	defer seq.lock.Unlock()

	giveBack := func(txn *Txn) error {
		item, err := txn.Get(seq.key)
		if err != nil {
			return err
		}

		var stored uint64
		err = item.Value(func(v []byte) error {
			stored = binary.BigEndian.Uint64(v)
			return nil
		})
		if err != nil {
			return err
		}

		if stored != seq.leased {
			// The persisted bound is no longer ours; do not overwrite it.
			return nil
		}
		var buf [8]byte
		binary.BigEndian.PutUint64(buf[:], seq.next)
		return txn.SetEntry(NewEntry(seq.key, buf[:]))
	}

	if err := seq.db.Update(giveBack); err != nil {
		return err
	}
	seq.leased = seq.next
	return nil
}
1312
// updateLease reserves the next seq.bandwidth values for this sequence.
//
// It reads the lease bound persisted under seq.key (0 if the key is absent) and
// writes back bound+bandwidth. Only once db.Update has returned successfully
// does it set seq.next to the old bound and seq.leased to the new one. Next
// calls it with seq.lock held.
//
// The in-memory fields are deliberately not touched inside the callback. If
// Update fails after the callback ran (for example because the commit fails),
// the sequence must not believe it holds a lease that was never persisted.
// Otherwise Next would hand out values that another Sequence on the same key,
// or this one after a restart, could hand out again.
func (seq *Sequence) updateLease() error {
	var next, lease uint64
	err := seq.db.Update(func(txn *Txn) error {
		item, err := txn.Get(seq.key)
		switch {
		case err == ErrKeyNotFound:
			next = 0
		case err != nil:
			return err
		default:
			if err := item.Value(func(v []byte) error {
				next = binary.BigEndian.Uint64(v)
				return nil
			}); err != nil {
				return err
			}
		}

		lease = next + seq.bandwidth
		var buf [8]byte
		binary.BigEndian.PutUint64(buf[:], lease)
		return txn.SetEntry(NewEntry(seq.key, buf[:]))
	})
	if err != nil {
		return err
	}
	seq.next = next
	seq.leased = lease
	return nil
}
1342
1343 // GetSequence would initiate a new sequence object, generating it from the stored lease, if
1344 // available, in the database. Sequence can be used to get a list of monotonically increasing
1345 // integers. Multiple sequences can be created by providing different keys. Bandwidth sets the
1346 // size of the lease, determining how many Next() requests can be served from memory.
1347 //
1348 // GetSequence is not supported on ManagedDB. Calling this would result in a panic.
1349 func (db *DB) GetSequence(key []byte, bandwidth uint64) (*Sequence, error) {
1350 if db.opt.managedTxns {
1351 panic("Cannot use GetSequence with managedDB=true.")
1352 }
1353
1354 switch {
1355 case len(key) == 0:
1356 return nil, ErrEmptyKey
1357 case bandwidth == 0:
1358 return nil, ErrZeroBandwidth
1359 }
1360 seq := &Sequence{
1361 db: db,
1362 key: key,
1363 next: 0,
1364 leased: 0,
1365 bandwidth: bandwidth,
1366 }
1367 err := seq.updateLease()
1368 return seq, err
1369 }
1370
1371 // Tables gets the TableInfo objects from the level controller. If withKeysCount
1372 // is true, TableInfo objects also contain counts of keys for the tables.
1373 func (db *DB) Tables() []TableInfo {
1374 return db.lc.getTableInfo()
1375 }
1376
1377 // Levels gets the LevelInfo.
1378 func (db *DB) Levels() []LevelInfo {
1379 return db.lc.getLevelInfo()
1380 }
1381
// EstimateSize returns a rough (on-disk, uncompressed) size estimate for data
// under prefix. Only tables whose smallest and largest keys both start with
// prefix are counted. Tables that merely overlap the prefix are ignored, and
// memtables are not included.
func (db *DB) EstimateSize(prefix []byte) (uint64, uint64) {
	var onDisk, uncompressed uint64
	for _, ti := range db.Tables() {
		fullyInside := bytes.HasPrefix(ti.Left, prefix) && bytes.HasPrefix(ti.Right, prefix)
		if !fullyInside {
			continue
		}
		onDisk += uint64(ti.OnDiskSize)
		uncompressed += uint64(ti.UncompressedSize)
	}
	return onDisk, uncompressed
}
1394
// Ranges can be used to get rough key ranges to divide up iteration over the DB. The ranges here
// would consider the prefix, but would not necessarily start or end with the prefix. In fact, the
// first range would have nil as left key, and the last range would have nil as the right key.
//
// Split points are gathered in up to three stages:
//  1. from the Right keys of last-level tables;
//  2. if fewer than 32 were found, they are replaced by db.lc.keySplits;
//  3. if still fewer than 32, keys sampled from the memtables are added.
//
// Adjacent ranges are then merged so that each output range covers roughly 1/numRanges of the
// estimated uncompressed table size. A numRanges below 1 is treated as 1.
func (db *DB) Ranges(prefix []byte, numRanges int) []*keyRange {
	// numRanges is a divisor below. Zero would panic and a negative value would produce a
	// negative average.
	if numRanges < 1 {
		numRanges = 1
	}

	var splits []string
	tables := db.Tables()

	// We just want table ranges here and not keys count.
	for _, ti := range tables {
		// We don't use ti.Left, because that has a tendency to store !badger keys. Skip over tables
		// at upper levels. Only choose tables from the last level.
		if ti.Level != db.opt.MaxLevels-1 {
			continue
		}
		if bytes.HasPrefix(ti.Right, prefix) {
			splits = append(splits, string(ti.Right))
		}
	}

	// If the number of splits is low, look at the offsets inside the
	// tables to generate more splits.
	if len(splits) < 32 {
		numTables := len(tables)
		if numTables == 0 {
			numTables = 1
		}
		numPerTable := 32 / numTables
		if numPerTable == 0 {
			numPerTable = 1
		}
		splits = db.lc.keySplits(numPerTable, prefix)
	}

	// If the number of splits is still < 32, then look at the memtables.
	if len(splits) < 32 {
		const maxPerSplit = 10000
		mtSplits := func(mt *memTable) {
			if mt == nil {
				return
			}
			count := 0
			iter := mt.sl.NewIterator()
			for iter.SeekToFirst(); iter.Valid(); iter.Next() {
				// Add a split every maxPerSplit keys.
				if count%maxPerSplit == 0 && bytes.HasPrefix(iter.Key(), prefix) {
					splits = append(splits, string(iter.Key()))
				}
				count++
			}
			_ = iter.Close()
		}

		// Hold db.lock only while walking the memtables, so db.mt cannot be swapped
		// (ensureRoomForWrite) and db.imm entries cannot be released (flushMemtable) underneath
		// us. The splits are copied into strings, so the sorting and grouping below need no lock.
		func() {
			db.lock.Lock()
			defer db.lock.Unlock()
			for _, mt := range db.imm {
				mtSplits(mt)
			}
			mtSplits(db.mt)
		}()
	}

	// We have our splits now. Let's convert them to ranges.
	sort.Strings(splits)
	var ranges []*keyRange
	var start []byte
	for _, key := range splits {
		ranges = append(ranges, &keyRange{left: start, right: y.SafeCopy(nil, []byte(key))})
		start = y.SafeCopy(nil, []byte(key))
	}
	ranges = append(ranges, &keyRange{left: start})

	// Figure out the approximate table size this range has to deal with. The open-ended first and
	// last ranges are not sized.
	for _, t := range tables {
		tr := keyRange{left: t.Left, right: t.Right}
		for _, r := range ranges {
			if len(r.left) == 0 || len(r.right) == 0 {
				continue
			}
			if r.overlapsWith(tr) {
				r.size += int64(t.UncompressedSize)
			}
		}
	}

	var total int64
	for _, r := range ranges {
		total += r.size
	}
	if total == 0 {
		return ranges
	}
	// Figure out the average size, so we know how to bin the ranges together.
	avg := total / int64(numRanges)

	// Greedily merge consecutive ranges while the merged size stays within avg.
	var out []*keyRange
	var i int
	for i < len(ranges) {
		r := ranges[i]
		cur := &keyRange{left: r.left, size: r.size, right: r.right}
		i++
		for ; i < len(ranges); i++ {
			next := ranges[i]
			if cur.size+next.size > avg {
				break
			}
			cur.right = next.right
			cur.size += next.size
		}
		out = append(out, cur)
	}
	return out
}
1510
1511 // MaxBatchCount returns max possible entries in batch
1512 func (db *DB) MaxBatchCount() int64 {
1513 return db.opt.maxBatchCount
1514 }
1515
1516 // MaxBatchSize returns max possible batch size
1517 func (db *DB) MaxBatchSize() int64 {
1518 return db.opt.maxBatchSize
1519 }
1520
1521 func (db *DB) stopMemoryFlush() {
1522 // Stop memtable flushes.
1523 if db.closers.memtable != nil {
1524 close(db.flushChan)
1525 db.closers.memtable.SignalAndWait()
1526 }
1527 }
1528
1529 func (db *DB) stopCompactions() {
1530 // Stop compactions.
1531 if db.closers.compactors != nil {
1532 db.closers.compactors.SignalAndWait()
1533 }
1534 }
1535
1536 func (db *DB) startCompactions() {
1537 // Resume compactions.
1538 if db.closers.compactors != nil {
1539 db.closers.compactors = z.NewCloser(1)
1540 db.lc.startCompact(db.closers.compactors)
1541 }
1542 }
1543
1544 func (db *DB) startMemoryFlush() {
1545 // Start memory fluhser.
1546 if db.closers.memtable != nil {
1547 db.flushChan = make(chan *memTable, db.opt.NumMemtables)
1548 db.closers.memtable = z.NewCloser(1)
1549 go func() {
1550 db.flushMemtable(db.closers.memtable)
1551 }()
1552 }
1553 }
1554
1555 // Flatten can be used to force compactions on the LSM tree so all the tables fall on the same
1556 // level. This ensures that all the versions of keys are colocated and not split across multiple
1557 // levels, which is necessary after a restore from backup. During Flatten, live compactions are
1558 // stopped. Ideally, no writes are going on during Flatten. Otherwise, it would create competition
1559 // between flattening the tree and new tables being created at level zero.
1560 func (db *DB) Flatten(workers int) error {
1561
1562 db.stopCompactions()
1563 defer db.startCompactions()
1564
1565 compactAway := func(cp compactionPriority) error {
1566 db.opt.Infof("Attempting to compact with %+v\n", cp)
1567 errCh := make(chan error, 1)
1568 for i := 0; i < workers; i++ {
1569 go func() {
1570 errCh <- db.lc.doCompact(175, cp)
1571 }()
1572 }
1573 var success int
1574 var rerr error
1575 for i := 0; i < workers; i++ {
1576 err := <-errCh
1577 if err != nil {
1578 rerr = err
1579 db.opt.Warningf("While running doCompact with %+v. Error: %v\n", cp, err)
1580 } else {
1581 success++
1582 }
1583 }
1584 if success == 0 {
1585 return rerr
1586 }
1587 // We could do at least one successful compaction. So, we'll consider this a success.
1588 db.opt.Infof("%d compactor(s) succeeded. One or more tables from level %d compacted.\n",
1589 success, cp.level)
1590 return nil
1591 }
1592
1593 hbytes := func(sz int64) string {
1594 return humanize.IBytes(uint64(sz))
1595 }
1596
1597 t := db.lc.levelTargets()
1598 for {
1599 db.opt.Infof("\n")
1600 var levels []int
1601 for i, l := range db.lc.levels {
1602 sz := l.getTotalSize()
1603 db.opt.Infof("Level: %d. %8s Size. %8s Max.\n",
1604 i, hbytes(l.getTotalSize()), hbytes(t.targetSz[i]))
1605 if sz > 0 {
1606 levels = append(levels, i)
1607 }
1608 }
1609 if len(levels) <= 1 {
1610 prios := db.lc.pickCompactLevels(nil)
1611 if len(prios) == 0 || prios[0].score <= 1.0 {
1612 db.opt.Infof("All tables consolidated into one level. Flattening done.\n")
1613 return nil
1614 }
1615 if err := compactAway(prios[0]); err != nil {
1616 return err
1617 }
1618 continue
1619 }
1620 // Create an artificial compaction priority, to ensure that we compact the level.
1621 cp := compactionPriority{level: levels[0], score: 1.71}
1622 if err := compactAway(cp); err != nil {
1623 return err
1624 }
1625 }
1626 }
1627
1628 func (db *DB) blockWrite() error {
1629 // Stop accepting new writes.
1630 if !db.blockWrites.CompareAndSwap(0, 1) {
1631 return ErrBlockedWrites
1632 }
1633
1634 // Make all pending writes finish. The following will also close writeCh.
1635 db.closers.writes.SignalAndWait()
1636 db.opt.Infof("Writes flushed. Stopping compactions now...")
1637 return nil
1638 }
1639
1640 func (db *DB) unblockWrite() {
1641 db.closers.writes = z.NewCloser(1)
1642 go db.doWrites(db.closers.writes)
1643
1644 // Resume writes.
1645 db.blockWrites.Store(0)
1646 }
1647
1648 func (db *DB) prepareToDrop() (func(), error) {
1649 if db.opt.ReadOnly {
1650 panic("Attempting to drop data in read-only mode.")
1651 }
1652 // In order prepare for drop, we need to block the incoming writes and
1653 // write it to db. Then, flush all the pending memtable. So that, we
1654 // don't miss any entries.
1655 if err := db.blockWrite(); err != nil {
1656 return func() {}, err
1657 }
1658 reqs := make([]*request, 0, 10)
1659 for {
1660 select {
1661 case r := <-db.writeCh:
1662 reqs = append(reqs, r)
1663 default:
1664 if err := db.writeRequests(reqs); err != nil {
1665 db.opt.Errorf("writeRequests: %v", err)
1666 }
1667 db.stopMemoryFlush()
1668 return func() {
1669 db.opt.Infof("Resuming writes")
1670 db.startMemoryFlush()
1671 db.unblockWrite()
1672 }, nil
1673 }
1674 }
1675 }
1676
1677 // DropAll would drop all the data stored in Badger. It does this in the following way.
1678 // - Stop accepting new writes.
1679 // - Pause memtable flushes and compactions.
1680 // - Pick all tables from all levels, create a changeset to delete all these
1681 // tables and apply it to manifest.
1682 // - Pick all log files from value log, and delete all of them. Restart value log files from zero.
1683 // - Resume memtable flushes and compactions.
1684 //
1685 // NOTE: DropAll is resilient to concurrent writes, but not to reads. It is up to the user to not do
1686 // any reads while DropAll is going on, otherwise they may result in panics. Ideally, both reads and
1687 // writes are paused before running DropAll, and resumed after it is finished.
1688 func (db *DB) DropAll() error {
1689 f, err := db.dropAll()
1690 if f != nil {
1691 f()
1692 }
1693 return err
1694 }
1695
// dropAll does the work behind DropAll and returns a resume function alongside
// any error. DropAll invokes the function whenever it is non-nil, including on
// error.
//
// Order of operations:
//  1. prepareToDrop blocks writes, writes out queued requests and stops the
//     memtable flusher. If it fails, its own resume func is returned as-is.
//  2. Compactions are stopped. The returned resume restarts them and then
//     calls prepareToDrop's resume.
//  3. Under db.lock, the mutable and immutable memtables are released and a
//     fresh mutable memtable is installed.
//  4. All SSTables (lc.dropTree) and value log files (vlog.dropAll) are
//     deleted. The next table file ID is reset to 1, and the block cache,
//     index cache and db.threshold are cleared.
func (db *DB) dropAll() (func(), error) {
	db.opt.Infof("DropAll called. Blocking writes...")
	f, err := db.prepareToDrop()
	if err != nil {
		return f, err
	}
	// prepareToDrop has blocked incoming writes, written out queued requests and
	// stopped the memtable flusher. Compactions are stopped too, since all the
	// data is about to be deleted anyway.
	db.stopCompactions()
	resume := func() {
		db.startCompactions()
		f()
	}
	// Block all foreign interactions with memory tables.
	db.lock.Lock()
	defer db.lock.Unlock()

	// Remove inmemory tables. Calling DecrRef for safety. Not sure if they're absolutely needed.
	db.mt.DecrRef()
	for _, mt := range db.imm {
		mt.DecrRef()
	}
	db.imm = db.imm[:0]
	// NOTE(review): if newMemTable fails, db.mt holds whatever it returned
	// (possibly nil) while the old memtable has already been DecrRef'd above.
	db.mt, err = db.newMemTable() // Set it up for future writes.
	if err != nil {
		return resume, y.Wrapf(err, "cannot open new memtable")
	}

	num, err := db.lc.dropTree()
	if err != nil {
		return resume, err
	}
	db.opt.Infof("Deleted %d SSTables. Now deleting value logs...\n", num)

	num, err = db.vlog.dropAll()
	if err != nil {
		return resume, err
	}
	// With every table gone, table file IDs restart from 1.
	db.lc.nextFileID.Store(1)
	db.opt.Infof("Deleted %d value log files. DropAll done.\n", num)
	// Cached blocks/indexes belong to deleted tables; drop them as well.
	db.blockCache.Clear()
	db.indexCache.Clear()
	db.threshold.Clear(db.opt)
	return resume, nil
}
1742
// DropPrefix would drop all the keys with the provided prefix. It does this in the following way:
//   - Stop accepting new writes.
//   - Stop memtable flushes before acquiring lock. This is required because we acquire
//     db.lock here and the memtable flusher also needs it, which would deadlock.
//   - Flush out all memtables, skipping over keys with the given prefix, Kp.
//   - Write out the value log header to memtables when flushing, so we don't accidentally bring Kp
//     back after a restart.
//   - Stop compaction.
//   - Compact L0->L1, skipping over Kp.
//   - Compact rest of the levels, Li->Li, picking tables which have Kp.
//   - Resume memtable flushes, compactions and writes.
//
// Prefixes for which filterPrefixesToDrop finds no keys are ignored. If none
// remain, nothing is flushed or dropped. Passing no prefixes is a no-op.
func (db *DB) DropPrefix(prefixes ...[]byte) error {
	if len(prefixes) == 0 {
		return nil
	}
	db.opt.Infof("DropPrefix called for %s", prefixes)
	f, err := db.prepareToDrop()
	if err != nil {
		return err
	}
	// Restart the memtable flusher and unblock writes on every return path.
	defer f()

	var filtered [][]byte
	if filtered, err = db.filterPrefixesToDrop(prefixes); err != nil {
		return err
	}
	// If there is no prefix for which the data already exist, do not do anything.
	if len(filtered) == 0 {
		db.opt.Infof("No prefixes to drop")
		return nil
	}
	// Block all foreign interactions with memory tables.
	db.lock.Lock()
	defer db.lock.Unlock()

	// Treat the mutable memtable as one more immutable one so it is flushed too.
	// NOTE(review): if a flush below fails, db.imm still holds the memtables that
	// were already DecrRef'd in this loop, and db.mt is also its last element.
	// Confirm the DB is not expected to remain usable after that error.
	db.imm = append(db.imm, db.mt)
	for _, memtable := range db.imm {
		if memtable.sl.Empty() {
			memtable.DecrRef()
			continue
		}
		db.opt.Debugf("Flushing memtable")
		if err := db.handleMemTableFlush(memtable, filtered); err != nil {
			db.opt.Errorf("While trying to flush memtable: %v", err)
			return err
		}
		memtable.DecrRef()
	}
	// Compactions stay stopped while the prefixes are dropped from the levels.
	// Defers run LIFO, so they restart before db.lock is released.
	db.stopCompactions()
	defer db.startCompactions()
	db.imm = db.imm[:0]
	db.mt, err = db.newMemTable()
	if err != nil {
		return y.Wrapf(err, "cannot create new mem table")
	}

	// Drop prefixes from the levels.
	if err := db.lc.dropPrefixes(filtered); err != nil {
		return err
	}
	db.opt.Infof("DropPrefix done")
	return nil
}
1806
1807 func (db *DB) filterPrefixesToDrop(prefixes [][]byte) ([][]byte, error) {
1808 var filtered [][]byte
1809 for _, prefix := range prefixes {
1810 err := db.View(func(txn *Txn) error {
1811 iopts := DefaultIteratorOptions
1812 iopts.Prefix = prefix
1813 iopts.PrefetchValues = false
1814 itr := txn.NewIterator(iopts)
1815 defer itr.Close()
1816 itr.Rewind()
1817 if itr.ValidForPrefix(prefix) {
1818 filtered = append(filtered, prefix)
1819 }
1820 return nil
1821 })
1822 if err != nil {
1823 return filtered, err
1824 }
1825 }
1826 return filtered, nil
1827 }
1828
1829 // Checks if the key is banned. Returns the respective error if the key belongs to any of the banned
1830 // namepspaces. Else it returns nil.
1831 func (db *DB) isBanned(key []byte) error {
1832 if db.opt.NamespaceOffset < 0 {
1833 return nil
1834 }
1835 if len(key) <= db.opt.NamespaceOffset+8 {
1836 return nil
1837 }
1838 if db.bannedNamespaces.has(y.BytesToU64(key[db.opt.NamespaceOffset:])) {
1839 return ErrBannedKey
1840 }
1841 return nil
1842 }
1843
1844 // BanNamespace bans a namespace. Read/write to keys belonging to any of such namespace is denied.
1845 func (db *DB) BanNamespace(ns uint64) error {
1846 if db.opt.NamespaceOffset < 0 {
1847 return ErrNamespaceMode
1848 }
1849 db.opt.Infof("Banning namespace: %d", ns)
1850 // First set the banned namespaces in DB and then update the in-memory structure.
1851 key := y.KeyWithTs(append(bannedNsKey, y.U64ToBytes(ns)...), 1)
1852 entry := []*Entry{{
1853 Key: key,
1854 Value: nil,
1855 }}
1856 req, err := db.sendToWriteCh(entry)
1857 if err != nil {
1858 return err
1859 }
1860 if err := req.Wait(); err != nil {
1861 return err
1862 }
1863 db.bannedNamespaces.add(ns)
1864 return nil
1865 }
1866
1867 // BannedNamespaces returns the list of prefixes banned for DB.
1868 func (db *DB) BannedNamespaces() []uint64 {
1869 return db.bannedNamespaces.all()
1870 }
1871
// KVList contains a list of key-value pairs. It is a type alias of pb.KVList,
// so the two names denote the same type and are fully interchangeable.
type KVList = pb.KVList
1874
// Subscribe can be used to watch key changes for the given key prefixes and the ignore string.
// At least one prefix should be passed, or an error will be returned.
// You can use an empty prefix to monitor all changes to the DB.
// Ignore string is the byte ranges for which prefix matching will be ignored.
// For example: ignore = "2-3", and prefix = "abc" will match for keys "abxxc", "abdfc" etc.
// This function blocks until the given context is done or an error occurs.
// The given function will be called with a new KVList containing the modified keys and the
// corresponding values.
//
// Return values:
//   - ErrNilCallback if cb is nil;
//   - a wrapped error if the subscriber cannot be created;
//   - ctx.Err() once ctx is done;
//   - the first non-nil error returned by cb;
//   - when the subscriber's closer is signalled (per the comment below, only while closing the
//     DB), the result of delivering whatever updates are still queued.
func (db *DB) Subscribe(ctx context.Context, cb func(kv *KVList) error, matches []pb.Match) error {
	if cb == nil {
		return ErrNilCallback
	}

	c := z.NewCloser(1)
	s, err := db.pub.newSubscriber(c, matches)
	if err != nil {
		return y.Wrapf(err, "while creating a new subscriber")
	}
	// slurp merges every batch already queued on s.sendCh into batch without
	// blocking, then calls cb once if anything was collected.
	// NOTE(review): if s.sendCh can be closed while this runs, kvs would be nil
	// and kvs.Kv would panic. Confirm sendCh is never closed here.
	slurp := func(batch *pb.KVList) error {
		for {
			select {
			case kvs := <-s.sendCh:
				batch.Kv = append(batch.Kv, kvs.Kv...)
			default:
				if len(batch.GetKv()) > 0 {
					return cb(batch)
				}
				return nil
			}
		}
	}

	// drain discards queued batches without blocking. It stops when the channel
	// is empty or closed.
	drain := func() {
		for {
			select {
			case _, ok := <-s.sendCh:
				if !ok {
					// Channel is closed.
					return
				}
			default:
				return
			}
		}
	}
	for {
		select {
		case <-c.HasBeenClosed():
			// No need to delete here. Closer will be called only while
			// closing DB. Subscriber will be deleted by cleanSubscribers.
			// Deliver any pending updates before returning.
			err := slurp(new(pb.KVList))
			c.Done()
			return err
		case <-ctx.Done():
			// Mark the subscriber inactive (presumably so the publisher stops
			// sending to it; confirm), discard queued updates, and unregister
			// it to avoid further updates.
			c.Done()
			s.active.Store(0)
			drain()
			db.pub.deleteSubscriber(s.id)
			return ctx.Err()
		case batch := <-s.sendCh:
			err := slurp(batch)
			if err != nil {
				c.Done()
				s.active.Store(0)
				drain()
				// Delete the subscriber if there is an error by the callback.
				db.pub.deleteSubscriber(s.id)
				return err
			}
		}
	}
}
1949
1950 func (db *DB) syncDir(dir string) error {
1951 if db.opt.InMemory {
1952 return nil
1953 }
1954 return syncDir(dir)
1955 }
1956
// createDirs makes sure opt.Dir and opt.ValueDir exist, creating missing ones
// with mode 0700. In read-only mode a missing directory is an error instead. A
// failure to stat a directory is reported as an invalid directory.
func createDirs(opt Options) error {
	dirs := []string{opt.Dir, opt.ValueDir}
	for _, dir := range dirs {
		found, err := exists(dir)
		switch {
		case err != nil:
			return y.Wrapf(err, "Invalid Dir: %q", dir)
		case found:
			continue
		case opt.ReadOnly:
			return fmt.Errorf("Cannot find directory %q for read-only open", dir)
		}
		if err := os.MkdirAll(dir, 0700); err != nil {
			return y.Wrapf(err, "Error Creating Dir: %q", dir)
		}
	}
	return nil
}
1976
// StreamDB streams the contents of this DB, read at the maximum timestamp, into
// a new managed DB opened with outOptions (its data lives in outOptions.Dir).
//
// If closing the output DB fails and nothing failed earlier, that close error
// is returned. A failed Close may mean the streamed data was not fully
// persisted, so it must not be silently dropped.
func (db *DB) StreamDB(outOptions Options) (rerr error) {
	outDir := outOptions.Dir

	// Open output DB.
	outDB, err := OpenManaged(outOptions)
	if err != nil {
		return y.Wrapf(err, "cannot open out DB at %s", outDir)
	}
	defer func() {
		if cerr := outDB.Close(); cerr != nil && rerr == nil {
			rerr = y.Wrapf(cerr, "cannot close out DB at %s", outDir)
		}
	}()
	writer := outDB.NewStreamWriter()
	if err := writer.Prepare(); err != nil {
		return y.Wrapf(err, "cannot create stream writer in out DB at %s", outDir)
	}

	// Stream contents of DB to the output DB.
	stream := db.NewStreamAt(math.MaxUint64)
	stream.LogPrefix = fmt.Sprintf("Streaming DB to new DB at %s", outDir)

	stream.Send = func(buf *z.Buffer) error {
		return writer.Write(buf)
	}
	if err := stream.Orchestrate(context.Background()); err != nil {
		return y.Wrapf(err, "cannot stream DB to out DB at %s", outDir)
	}
	if err := writer.Flush(); err != nil {
		return y.Wrapf(err, "cannot flush writer")
	}
	return nil
}
2008
2009 // Opts returns a copy of the DB options.
2010 func (db *DB) Opts() Options {
2011 return db.opt
2012 }
2013
// CacheType selects which DB cache an operation such as CacheMaxCost applies to.
type CacheType int

const (
	// BlockCache selects the block cache.
	BlockCache CacheType = 0
	// IndexCache selects the table-index cache.
	IndexCache CacheType = 1
)
2020
2021 // CacheMaxCost updates the max cost of the given cache (either block or index cache).
2022 // The call will have an effect only if the DB was created with the cache. Otherwise it is
2023 // a no-op. If you pass a negative value, the function will return the current value
2024 // without updating it.
2025 func (db *DB) CacheMaxCost(cache CacheType, maxCost int64) (int64, error) {
2026 if db == nil {
2027 return 0, nil
2028 }
2029
2030 if maxCost < 0 {
2031 switch cache {
2032 case BlockCache:
2033 return db.blockCache.MaxCost(), nil
2034 case IndexCache:
2035 return db.indexCache.MaxCost(), nil
2036 default:
2037 return 0, errors.New("invalid cache type")
2038 }
2039 }
2040
2041 switch cache {
2042 case BlockCache:
2043 db.blockCache.UpdateMaxCost(maxCost)
2044 return maxCost, nil
2045 case IndexCache:
2046 db.indexCache.UpdateMaxCost(maxCost)
2047 return maxCost, nil
2048 default:
2049 return 0, errors.New("invalid cache type")
2050 }
2051 }
2052
// LevelsToString returns a human-readable summary of every LSM level, one line
// per level, between a leading newline and a trailing "Level Done" line.
// Sizes are formatted with humanize.IBytes, and the base level is marked "B".
func (db *DB) LevelsToString() string {
	human := func(sz int64) string {
		return humanize.IBytes(uint64(sz))
	}

	var sb strings.Builder
	sb.WriteRune('\n')
	for _, li := range db.Levels() {
		marker := " "
		if li.IsBaseLevel {
			marker = "B"
		}
		fmt.Fprintf(&sb,
			"Level %d [%s]: NumTables: %02d. Size: %s of %s. Score: %.2f->%.2f"+
				" StaleData: %s Target FileSize: %s\n",
			li.Level, marker, li.NumTables,
			human(li.Size), human(li.TargetSize), li.Score, li.Adjusted, human(li.StaleDatSize),
			human(li.TargetFileSize))
	}
	sb.WriteString("Level Done\n")
	return sb.String()
}
2078