value.go raw
1 /*
2 * SPDX-FileCopyrightText: © Hypermode Inc. <hello@hypermode.com>
3 * SPDX-License-Identifier: Apache-2.0
4 */
5
6 package badger
7
8 import (
9 "bytes"
10 "context"
11 "errors"
12 "fmt"
13 "hash"
14 "hash/crc32"
15 "io"
16 "math"
17 "os"
18 "sort"
19 "strconv"
20 "strings"
21 "sync"
22 "sync/atomic"
23
24 "go.opentelemetry.io/otel"
25 "go.opentelemetry.io/otel/attribute"
26
27 "github.com/dgraph-io/badger/v4/y"
28 "github.com/dgraph-io/ristretto/v2/z"
29 )
30
// maxVlogFileSize is the maximum size of a vlog file that can be created. Value pointer
// offsets are uint32, so a vlog file can never grow past math.MaxUint32 bytes;
// validateWrites rejects any batch whose estimated end offset would exceed it.
// NOTE(review): declared as a var rather than a const — presumably so it can be lowered
// (e.g. by tests); confirm before changing.
var maxVlogFileSize uint32 = math.MaxUint32
34
// Meta bits stored with every entry. They let a reader tell, for example, a key that has
// never been written apart from one that was explicitly deleted (bitDelete), and whether
// the value is stored next to the key or in the value log (bitValuePointer).
const (
	bitDelete                 byte = 1 << 0 // Set if the key has been deleted.
	bitValuePointer           byte = 1 << 1 // Set if the value is NOT stored directly next to key.
	bitDiscardEarlierVersions byte = 1 << 2 // Set if earlier versions can be discarded.
	// Set if item shouldn't be discarded via compactions (used by merge operator)
	bitMergeEntry byte = 1 << 3
	// The MSB 2 bits are for transactions.
	bitTxn    byte = 1 << 6 // Set if the entry is part of a txn.
	bitFinTxn byte = 1 << 7 // Set if the entry is to indicate end of txn in value log.

	// mi is one mebibyte (1 << 20 bytes).
	mi int64 = 1 << 20 //nolint:unused

	// vlogHeaderSize is the size in bytes of the header at the start of every vlog file.
	// Entries are written right after it (createVlogFile starts the write offset here).
	// +----------------+------------------+
	// | keyID(8 bytes) | baseIV(12 bytes)|
	// +----------------+------------------+
	vlogHeaderSize = 20
)
55
var (
	// errStop is a sentinel for ending an iteration early.
	// NOTE(review): its producers/consumers are outside this part of the file.
	errStop = errors.New("Stop iteration")
	// errTruncate reports that the data being read is short, oversized or fails its
	// checksum (see safeRead.Entry), i.e. the rest of the log should be treated as truncated.
	errTruncate = errors.New("Do truncate")
)
58
// logEntry is a callback that receives one entry read from a log file together with the
// value pointer locating it, and returns an error to abort. NOTE(review): this matches the
// callbacks passed to logFile.iterate in rewrite and open; presumably it is iterate's
// parameter type — confirm.
type logEntry func(e Entry, vp valuePointer) error
60
// safeRead decodes entries one at a time from a log file (see its Entry method), verifying
// the checksum that follows each one.
type safeRead struct {
	// k and v are per-reader scratch buffers.
	// NOTE(review): nothing in this file reads them; candidates for removal.
	k []byte
	v []byte

	// recordOffset is the offset of the record being decoded. It is stored into
	// Entry.offset and passed to decryptKV.
	recordOffset uint32
	// lf is the log file the entries come from; used to check for and perform decryption.
	lf *logFile
}
68
// hashReader implements the io.Reader and io.ByteReader interfaces. Everything it reads
// from r is also written to h, so h accumulates a checksum of the consumed bytes, and
// bytesRead counts how many bytes have been consumed.
type hashReader struct {
	r         io.Reader
	h         hash.Hash32
	bytesRead int // Number of bytes read.
}
76
77 func newHashReader(r io.Reader) *hashReader {
78 hash := crc32.New(y.CastagnoliCrcTable)
79 return &hashReader{
80 r: r,
81 h: hash,
82 }
83 }
84
85 // Read reads len(p) bytes from the reader. Returns the number of bytes read, error on failure.
86 func (t *hashReader) Read(p []byte) (int, error) {
87 n, err := t.r.Read(p)
88 if err != nil {
89 return n, err
90 }
91 t.bytesRead += n
92 return t.h.Write(p[:n])
93 }
94
95 // ReadByte reads exactly one byte from the reader. Returns error on failure.
96 func (t *hashReader) ReadByte() (byte, error) {
97 b := make([]byte, 1)
98 _, err := t.Read(b)
99 return b[0], err
100 }
101
102 // Sum32 returns the sum32 of the underlying hash.
103 func (t *hashReader) Sum32() uint32 {
104 return t.h.Sum32()
105 }
106
107 // Entry reads an entry from the provided reader. It also validates the checksum for every entry
108 // read. Returns error on failure.
109 func (r *safeRead) Entry(reader io.Reader) (*Entry, error) {
110 tee := newHashReader(reader)
111 var h header
112 hlen, err := h.DecodeFrom(tee)
113 if err != nil {
114 return nil, err
115 }
116 if h.klen > uint32(1<<16) { // Key length must be below uint16.
117 return nil, errTruncate
118 }
119 kl := int(h.klen)
120 if cap(r.k) < kl {
121 r.k = make([]byte, 2*kl)
122 }
123 vl := int(h.vlen)
124 if cap(r.v) < vl {
125 r.v = make([]byte, 2*vl)
126 }
127
128 e := &Entry{}
129 e.offset = r.recordOffset
130 e.hlen = hlen
131 buf := make([]byte, h.klen+h.vlen)
132 if _, err := io.ReadFull(tee, buf[:]); err != nil {
133 if err == io.EOF {
134 err = errTruncate
135 }
136 return nil, err
137 }
138 if r.lf.encryptionEnabled() {
139 if buf, err = r.lf.decryptKV(buf[:], r.recordOffset); err != nil {
140 return nil, err
141 }
142 }
143 e.Key = buf[:h.klen]
144 e.Value = buf[h.klen:]
145 var crcBuf [crc32.Size]byte
146 if _, err := io.ReadFull(reader, crcBuf[:]); err != nil {
147 if err == io.EOF {
148 err = errTruncate
149 }
150 return nil, err
151 }
152 crc := y.BytesToU32(crcBuf[:])
153 if crc != tee.Sum32() {
154 return nil, errTruncate
155 }
156 e.meta = h.meta
157 e.UserMeta = h.userMeta
158 e.ExpiresAt = h.expiresAt
159 return e, nil
160 }
161
// rewrite garbage-collects the vlog file f.
//
// Every entry of f that the LSM tree still points at (same fid and same offset) is copied
// into a new Entry and re-inserted into the DB via db.batchSet. Entries that are
// discardable or superseded are skipped. Afterwards f is removed from filesMap and deleted
// right away if no iterators are active. Otherwise it is queued in filesToBeDeleted, which
// decrIteratorCount drains.
//
// It returns an error in these cases:
//   - f is already queued for deletion;
//   - an LSM lookup or batch write fails;
//   - f disappeared from filesMap.
//
// It returns ErrNoRewrite if halving the final batch size reaches zero. It asserts that f
// is not the writable file (f.fid < maxFid).
func (vlog *valueLog) rewrite(f *logFile) error {
	// Refuse to rewrite a file that is already waiting to be deleted.
	vlog.filesLock.RLock()
	for _, fid := range vlog.filesToBeDeleted {
		if fid == f.fid {
			vlog.filesLock.RUnlock()
			return fmt.Errorf("value log file already marked for deletion fid: %d", fid)
		}
	}
	maxFid := vlog.maxFid
	y.AssertTruef(f.fid < maxFid, "fid to move: %d. Current max fid: %d", f.fid, maxFid)
	vlog.filesLock.RUnlock()

	vlog.opt.Infof("Rewriting fid: %d", f.fid)
	// wb collects the entries to re-insert; size is their estimated total size in bytes.
	wb := make([]*Entry, 0, 1000)
	var size int64

	y.AssertTrue(vlog.db != nil)
	var count, moved int
	// fe handles one entry read from f. If the entry is still live, it queues a copy in wb,
	// first flushing wb when adding the entry would reach the batch count/size limits.
	fe := func(e Entry) error {
		count++
		if count%100000 == 0 {
			vlog.opt.Debugf("Processing entry %d", count)
		}

		vs, err := vlog.db.get(e.Key)
		if err != nil {
			return err
		}
		if discardEntry(e, vs, vlog.db) {
			return nil
		}

		// Value is still present in value log.
		if len(vs.Value) == 0 {
			return fmt.Errorf("Empty value: %+v", vs)
		}
		var vp valuePointer
		vp.Decode(vs.Value)

		// If the entry found from the LSM Tree points to a newer vlog file, don't do anything.
		if vp.Fid > f.fid {
			return nil
		}
		// If the entry found from the LSM Tree points to an offset greater than the one
		// read from vlog, don't do anything.
		if vp.Offset > e.offset {
			return nil
		}
		// If the entry read from LSM Tree and vlog file point to the same vlog file and offset,
		// insert them back into the DB.
		// NOTE: It might be possible that the entry read from the LSM Tree points to
		// an older vlog file. See the comments in the else part.
		if vp.Fid == f.fid && vp.Offset == e.offset {
			moved++
			// This new entry only contains the key, and a pointer to the value.
			ne := new(Entry)
			// Remove only the bitValuePointer and transaction markers. We
			// should keep the other bits.
			ne.meta = e.meta &^ (bitValuePointer | bitTxn | bitFinTxn)
			ne.UserMeta = e.UserMeta
			ne.ExpiresAt = e.ExpiresAt
			// Copy key and value: e's slices belong to the buffer the iterator read into.
			ne.Key = append([]byte{}, e.Key...)
			ne.Value = append([]byte{}, e.Value...)
			es := ne.estimateSizeAndSetThreshold(vlog.db.valueThreshold())
			// Consider size of value as well while considering the total size
			// of the batch. There have been reports of high memory usage in
			// rewrite because we don't consider the value size. See #1292.
			es += int64(len(e.Value))

			// Ensure length and size of wb is within transaction limits.
			if int64(len(wb)+1) >= vlog.opt.maxBatchCount ||
				size+es >= vlog.opt.maxBatchSize {
				if err := vlog.db.batchSet(wb); err != nil {
					return err
				}
				size = 0
				wb = wb[:0]
			}
			wb = append(wb, ne)
			size += es
		} else { //nolint:staticcheck
			// It might be possible that the entry read from LSM Tree points to
			// an older vlog file. This can happen in the following situation.
			// Assume DB is opened with
			// numberOfVersionsToKeep=1
			//
			// Now, if we have ONLY one key in the system "FOO" which has been
			// updated 3 times and the same key has been garbage collected 3
			// times, we'll have 3 versions of the movekey
			// for the same key "FOO".
			//
			// NOTE: moveKeyi is the gc'ed version of the original key with version i
			// We're calling the gc'ed keys as moveKey to simplify the
			// explanation. We used to add move keys but we no longer do that.
			//
			// Assume we have 3 move keys in L0.
			// - moveKey1 (points to vlog file 10),
			// - moveKey2 (points to vlog file 14) and
			// - moveKey3 (points to vlog file 15).
			//
			// Also, assume there is another move key "moveKey1" (points to
			// vlog file 6) (this is also a move Key for key "FOO" ) on upper
			// levels (let's say 3). The move key "moveKey1" on level 0 was
			// inserted because vlog file 6 was GCed.
			//
			// Here's what the arrangement looks like
			// L0 => (moveKey1 => vlog10), (moveKey2 => vlog14), (moveKey3 => vlog15)
			// L1 => ....
			// L2 => ....
			// L3 => (moveKey1 => vlog6)
			//
			// When L0 compaction runs, it keeps only moveKey3 because the number of versions
			// to keep is set to 1. (we've dropped moveKey1's latest version)
			//
			// The new arrangement of keys is
			// L0 => ....
			// L1 => (moveKey3 => vlog15)
			// L2 => ....
			// L3 => (moveKey1 => vlog6)
			//
			// Now if we try to GC vlog file 10, the entry read from vlog file
			// will point to vlog10 but the entry read from LSM Tree will point
			// to vlog6. The move key read from LSM tree will point to vlog6
			// because we've asked for version 1 of the move key.
			//
			// This might seem like an issue but it's not really an issue
			// because the user has set the number of versions to keep to 1 and
			// the latest version of moveKey points to the correct vlog file
			// and offset. The stale move key on L3 will be eventually dropped
			// by compaction because there is a newer versions in the upper
			// levels.
		}
		return nil
	}

	_, err := f.iterate(vlog.opt.ReadOnly, 0, func(e Entry, vp valuePointer) error {
		return fe(e)
	})
	if err != nil {
		return err
	}

	// Write the remaining entries in chunks of batchSize. Whenever a chunk is rejected
	// with ErrTxnTooBig, halve batchSize and retry the same chunk.
	batchSize := 1024
	var loops int
	for i := 0; i < len(wb); {
		loops++
		if batchSize == 0 {
			vlog.db.opt.Warningf("We shouldn't reach batch size of zero.")
			return ErrNoRewrite
		}
		end := i + batchSize
		if end > len(wb) {
			end = len(wb)
		}
		if err := vlog.db.batchSet(wb[i:end]); err != nil {
			if err == ErrTxnTooBig {
				// Decrease the batch size to half.
				batchSize = batchSize / 2
				continue
			}
			return err
		}
		i += batchSize
	}
	// NOTE(review): len(wb) only counts entries left after the in-iteration flushes above,
	// not every entry that was moved (see the "Moved" count below).
	vlog.opt.Infof("Processed %d entries in %d loops", len(wb), loops)
	vlog.opt.Infof("Total entries: %d. Moved: %d", count, moved)
	vlog.opt.Infof("Removing fid: %d", f.fid)
	var deleteFileNow bool
	// Entries written to LSM. Remove the older file now.
	{
		vlog.filesLock.Lock()
		// Just a sanity-check.
		if _, ok := vlog.filesMap[f.fid]; !ok {
			vlog.filesLock.Unlock()
			return fmt.Errorf("Unable to find fid: %d", f.fid)
		}
		// Active iterators may still read from f, so only delete it immediately when there
		// are none; otherwise the last decrIteratorCount deletes it.
		if vlog.iteratorCount() == 0 {
			delete(vlog.filesMap, f.fid)
			deleteFileNow = true
		} else {
			vlog.filesToBeDeleted = append(vlog.filesToBeDeleted, f.fid)
		}
		vlog.filesLock.Unlock()
	}

	// Delete outside filesLock; deleteLogFile takes f's own lock.
	if deleteFileNow {
		if err := vlog.deleteLogFile(f); err != nil {
			return err
		}
	}
	return nil
}
354
355 func (vlog *valueLog) incrIteratorCount() {
356 vlog.numActiveIterators.Add(1)
357 }
358
359 func (vlog *valueLog) iteratorCount() int {
360 return int(vlog.numActiveIterators.Load())
361 }
362
363 func (vlog *valueLog) decrIteratorCount() error {
364 num := vlog.numActiveIterators.Add(-1)
365 if num != 0 {
366 return nil
367 }
368
369 vlog.filesLock.Lock()
370 lfs := make([]*logFile, 0, len(vlog.filesToBeDeleted))
371 for _, id := range vlog.filesToBeDeleted {
372 lfs = append(lfs, vlog.filesMap[id])
373 delete(vlog.filesMap, id)
374 }
375 vlog.filesToBeDeleted = nil
376 vlog.filesLock.Unlock()
377
378 for _, lf := range lfs {
379 if err := vlog.deleteLogFile(lf); err != nil {
380 return err
381 }
382 }
383 return nil
384 }
385
386 func (vlog *valueLog) deleteLogFile(lf *logFile) error {
387 if lf == nil {
388 return nil
389 }
390 lf.lock.Lock()
391 defer lf.lock.Unlock()
392 // Delete fid from discard stats as well.
393 vlog.discardStats.Update(lf.fid, -1)
394
395 return lf.Delete()
396 }
397
398 func (vlog *valueLog) dropAll() (int, error) {
399 // If db is opened in InMemory mode, we don't need to do anything since there are no vlog files.
400 if vlog.db.opt.InMemory {
401 return 0, nil
402 }
403 // We don't want to block dropAll on any pending transactions. So, don't worry about iterator
404 // count.
405 var count int
406 deleteAll := func() error {
407 vlog.filesLock.Lock()
408 defer vlog.filesLock.Unlock()
409 for _, lf := range vlog.filesMap {
410 if err := vlog.deleteLogFile(lf); err != nil {
411 return err
412 }
413 count++
414 }
415 vlog.filesMap = make(map[uint32]*logFile)
416 vlog.maxFid = 0
417 return nil
418 }
419 if err := deleteAll(); err != nil {
420 return count, err
421 }
422
423 vlog.db.opt.Infof("Value logs deleted. Creating value log file: 1")
424 if _, err := vlog.createVlogFile(); err != nil { // Called while writes are stopped.
425 return count, err
426 }
427 return count, nil
428 }
429
430 func (db *DB) valueThreshold() int64 {
431 return db.threshold.valueThreshold.Load()
432 }
433
// valueLog manages a DB's value log files: which files exist, which one is currently
// being written (maxFid), and which are waiting to be deleted.
type valueLog struct {
	// dirPath is the directory holding the *.vlog files (set from Options.ValueDir).
	dirPath string

	// filesLock guards filesMap, maxFid and filesToBeDeleted. The iterator refcount below
	// is atomic and read separately.
	filesLock sync.RWMutex
	filesMap  map[uint32]*logFile
	// maxFid is the fid of the newest file, the one write appends to.
	maxFid           uint32
	filesToBeDeleted []uint32
	// A refcount of iterators -- when this hits zero, we can delete the filesToBeDeleted.
	numActiveIterators atomic.Int32

	db *DB
	// writableLogOffset is the next write offset in the maxFid file. write advances it;
	// readers (getFileRLocked) load it to reject pointers past the written data.
	writableLogOffset atomic.Uint32
	// numEntriesWritten counts entries written to the current file since it was created;
	// write rotates the file when it exceeds ValueLogMaxEntries.
	numEntriesWritten uint32
	opt               Options

	// garbageCh has capacity 1 and acts as a token: holding the slot means a GC is running.
	garbageCh chan struct{}
	// discardStats tracks discardable bytes per fid; pickLog uses it to choose a GC target.
	discardStats *discardStats
}
453
// vlogFilePath returns the path of the vlog file with the given fid inside dirPath: the fid
// zero-padded to at least six digits plus a ".vlog" suffix, appended to dirPath with the OS
// path separator. The result is not cleaned (an empty dirPath yields a leading separator).
func vlogFilePath(dirPath string, fid uint32) string {
	name := fmt.Sprintf("%06d.vlog", fid)
	return dirPath + string(os.PathSeparator) + name
}
457
458 func (vlog *valueLog) fpath(fid uint32) string {
459 return vlogFilePath(vlog.dirPath, fid)
460 }
461
462 func (vlog *valueLog) populateFilesMap() error {
463 vlog.filesMap = make(map[uint32]*logFile)
464
465 files, err := os.ReadDir(vlog.dirPath)
466 if err != nil {
467 return errFile(err, vlog.dirPath, "Unable to open log dir.")
468 }
469
470 found := make(map[uint64]struct{})
471 for _, file := range files {
472 if !strings.HasSuffix(file.Name(), ".vlog") {
473 continue
474 }
475 fsz := len(file.Name())
476 fid, err := strconv.ParseUint(file.Name()[:fsz-5], 10, 32)
477 if err != nil {
478 return errFile(err, file.Name(), "Unable to parse log id.")
479 }
480 if _, ok := found[fid]; ok {
481 return errFile(err, file.Name(), "Duplicate file found. Please delete one.")
482 }
483 found[fid] = struct{}{}
484
485 lf := &logFile{
486 fid: uint32(fid),
487 path: vlog.fpath(uint32(fid)),
488 registry: vlog.db.registry,
489 }
490 vlog.filesMap[uint32(fid)] = lf
491 if vlog.maxFid < uint32(fid) {
492 vlog.maxFid = uint32(fid)
493 }
494 }
495 return nil
496 }
497
498 func (vlog *valueLog) createVlogFile() (*logFile, error) {
499 fid := vlog.maxFid + 1
500 path := vlog.fpath(fid)
501 lf := &logFile{
502 fid: fid,
503 path: path,
504 registry: vlog.db.registry,
505 writeAt: vlogHeaderSize,
506 opt: vlog.opt,
507 }
508 err := lf.open(path, os.O_RDWR|os.O_CREATE|os.O_EXCL, 2*vlog.opt.ValueLogFileSize)
509 if err != z.NewFile && err != nil {
510 return nil, err
511 }
512
513 vlog.filesLock.Lock()
514 vlog.filesMap[fid] = lf
515 y.AssertTrue(vlog.maxFid < fid)
516 vlog.maxFid = fid
517 // writableLogOffset is only written by write func, by read by Read func.
518 // To avoid a race condition, all reads and updates to this variable must be
519 // done via atomics.
520 vlog.writableLogOffset.Store(vlogHeaderSize)
521 vlog.numEntriesWritten = 0
522 vlog.filesLock.Unlock()
523
524 return lf, nil
525 }
526
// errFile builds an error reading "<msg>. Path=<path>. Error=<err>".
//
// A non-nil err is wrapped with %w, so callers can still match the cause with errors.Is /
// errors.As (e.g. fs.ErrNotExist from os.ReadDir). The message text is unchanged. A nil
// err is still rendered as "<nil>".
func errFile(err error, path string, msg string) error {
	if err == nil {
		// %w with a nil operand would print "%!w(<nil>)"; keep the old rendering.
		return fmt.Errorf("%s. Path=%s. Error=%v", msg, path, err)
	}
	return fmt.Errorf("%s. Path=%s. Error=%w", msg, path, err)
}
530
531 // init initializes the value log struct. This initialization needs to happen
532 // before compactions start.
533 func (vlog *valueLog) init(db *DB) {
534 vlog.opt = db.opt
535 vlog.db = db
536 // We don't need to open any vlog files or collect stats for GC if DB is opened
537 // in InMemory mode. InMemory mode doesn't create any files/directories on disk.
538 if vlog.opt.InMemory {
539 return
540 }
541 vlog.dirPath = vlog.opt.ValueDir
542
543 vlog.garbageCh = make(chan struct{}, 1) // Only allow one GC at a time.
544 lf, err := InitDiscardStats(vlog.opt)
545 y.Check(err)
546 vlog.discardStats = lf
547 // See TestPersistLFDiscardStats for purpose of statement below.
548 db.logToSyncChan(endVLogInitMsg)
549 }
550
// open discovers and opens the existing vlog files. Files holding nothing but the header
// (other than the maxFid file) are deleted. Outside ReadOnly mode, it then iterates the
// last file and truncates it at the offset the iteration stopped at (presumably the end
// of the last valid entry), and creates a brand-new vlog file for subsequent writes.
// When the directory has no vlog files, it just creates one (or returns nil in ReadOnly
// mode). It does nothing in InMemory mode.
func (vlog *valueLog) open(db *DB) error {
	// We don't need to open any vlog files or collect stats for GC if DB is opened
	// in InMemory mode. InMemory mode doesn't create any files/directories on disk.
	if db.opt.InMemory {
		return nil
	}

	if err := vlog.populateFilesMap(); err != nil {
		return err
	}
	// If no files are found, then create a new file.
	if len(vlog.filesMap) == 0 {
		if vlog.opt.ReadOnly {
			return nil
		}
		_, err := vlog.createVlogFile()
		// NOTE(review): relies on y.Wrapf returning nil for a nil err — confirm.
		return y.Wrapf(err, "Error while creating log file in valueLog.open")
	}
	fids := vlog.sortedFids()
	for _, fid := range fids {
		lf, ok := vlog.filesMap[fid]
		y.AssertTrue(ok)

		// Just open in RDWR mode. This should not create a new log file.
		lf.opt = vlog.opt
		if err := lf.open(vlog.fpath(fid), os.O_RDWR,
			2*vlog.opt.ValueLogFileSize); err != nil {
			return y.Wrapf(err, "Open existing file: %q", lf.path)
		}
		// We shouldn't delete the maxFid file.
		// NOTE(review): this deletion also runs when opt.ReadOnly is set.
		if lf.size.Load() == vlogHeaderSize && fid != vlog.maxFid {
			vlog.opt.Infof("Deleting empty file: %s", lf.path)
			if err := lf.Delete(); err != nil {
				return y.Wrapf(err, "while trying to delete empty file: %s", lf.path)
			}
			delete(vlog.filesMap, fid)
		}
	}

	if vlog.opt.ReadOnly {
		return nil
	}
	// Now we can read the latest value log file, and see if it needs truncation. We could
	// technically do this over all the value log files, but that would mean slowing down the value
	// log open.
	last, ok := vlog.filesMap[vlog.maxFid]
	y.AssertTrue(ok)
	// The callback ignores every entry; only the offset where iteration stops is needed.
	lastOff, err := last.iterate(vlog.opt.ReadOnly, vlogHeaderSize,
		func(_ Entry, vp valuePointer) error {
			return nil
		})
	if err != nil {
		return y.Wrapf(err, "while iterating over: %s", last.path)
	}
	if err := last.Truncate(int64(lastOff)); err != nil {
		return y.Wrapf(err, "while truncating last value log file: %s", last.path)
	}

	// Don't write to the old log file. Always create a new one.
	if _, err := vlog.createVlogFile(); err != nil {
		return y.Wrapf(err, "Error while creating log file in valueLog.open")
	}
	return nil
}
615
616 func (vlog *valueLog) Close() error {
617 if vlog == nil || vlog.db == nil || vlog.db.opt.InMemory {
618 return nil
619 }
620
621 vlog.opt.Debugf("Stopping garbage collection of values.")
622 var err error
623 for id, lf := range vlog.filesMap {
624 lf.lock.Lock() // We won’t release the lock.
625 offset := int64(-1)
626
627 if !vlog.opt.ReadOnly && id == vlog.maxFid {
628 offset = int64(vlog.woffset())
629 }
630 if terr := lf.Close(offset); terr != nil && err == nil {
631 err = terr
632 }
633 }
634 if vlog.discardStats != nil {
635 vlog.db.captureDiscardStats()
636 if terr := vlog.discardStats.Close(-1); terr != nil && err == nil {
637 err = terr
638 }
639 }
640 return err
641 }
642
643 // sortedFids returns the file id's not pending deletion, sorted. Assumes we have shared access to
644 // filesMap.
645 func (vlog *valueLog) sortedFids() []uint32 {
646 toBeDeleted := make(map[uint32]struct{})
647 for _, fid := range vlog.filesToBeDeleted {
648 toBeDeleted[fid] = struct{}{}
649 }
650 ret := make([]uint32, 0, len(vlog.filesMap))
651 for fid := range vlog.filesMap {
652 if _, ok := toBeDeleted[fid]; !ok {
653 ret = append(ret, fid)
654 }
655 }
656 sort.Slice(ret, func(i, j int) bool {
657 return ret[i] < ret[j]
658 })
659 return ret
660 }
661
// request is a batch of entries submitted for writing, together with the write's results.
// It is pooled (see DecrRef) and reference counted.
type request struct {
	// Input values
	Entries []*Entry
	// Output values and wait group stuff below
	// Ptrs receives one valuePointer per entry from valueLog.write (a zero value for entries
	// kept next to their key).
	Ptrs []valuePointer
	// Wg is waited on by Wait. NOTE(review): presumably the writer calls Done once the
	// request has been processed — confirm.
	Wg sync.WaitGroup
	// Err is the outcome of the write, returned by Wait.
	Err error
	// ref counts the holders of the request; DecrRef returns it to requestPool at zero.
	ref atomic.Int32
}
671
672 func (req *request) reset() {
673 req.Entries = req.Entries[:0]
674 req.Ptrs = req.Ptrs[:0]
675 req.Wg = sync.WaitGroup{}
676 req.Err = nil
677 req.ref.Store(0)
678 }
679
680 func (req *request) IncrRef() {
681 req.ref.Add(1)
682 }
683
684 func (req *request) DecrRef() {
685 nRef := req.ref.Add(-1)
686 if nRef > 0 {
687 return
688 }
689 req.Entries = nil
690 requestPool.Put(req)
691 }
692
693 func (req *request) Wait() error {
694 req.Wg.Wait()
695 err := req.Err
696 req.DecrRef() // DecrRef after writing to DB.
697 return err
698 }
699
700 type requests []*request
701
702 func (reqs requests) DecrRef() {
703 for _, req := range reqs {
704 req.DecrRef()
705 }
706 }
707
708 func (reqs requests) IncrRef() {
709 for _, req := range reqs {
710 req.IncrRef()
711 }
712 }
713
714 // sync function syncs content of latest value log file to disk. Syncing of value log directory is
715 // not required here as it happens every time a value log file rotation happens(check createVlogFile
716 // function). During rotation, previous value log file also gets synced to disk. It only syncs file
717 // if fid >= vlog.maxFid. In some cases such as replay(while opening db), it might be called with
718 // fid < vlog.maxFid. To sync irrespective of file id just call it with math.MaxUint32.
719 func (vlog *valueLog) sync() error {
720 if vlog.opt.SyncWrites || vlog.opt.InMemory {
721 return nil
722 }
723
724 vlog.filesLock.RLock()
725 maxFid := vlog.maxFid
726 curlf := vlog.filesMap[maxFid]
727 // Sometimes it is possible that vlog.maxFid has been increased but file creation
728 // with same id is still in progress and this function is called. In those cases
729 // entry for the file might not be present in vlog.filesMap.
730 if curlf == nil {
731 vlog.filesLock.RUnlock()
732 return nil
733 }
734 curlf.lock.RLock()
735 vlog.filesLock.RUnlock()
736
737 err := curlf.Sync()
738 curlf.lock.RUnlock()
739 return err
740 }
741
742 func (vlog *valueLog) woffset() uint32 {
743 return vlog.writableLogOffset.Load()
744 }
745
746 // validateWrites will check whether the given requests can fit into 4GB vlog file.
747 // NOTE: 4GB is the maximum size we can create for vlog because value pointer offset is of type
748 // uint32. If we create more than 4GB, it will overflow uint32. So, limiting the size to 4GB.
749 func (vlog *valueLog) validateWrites(reqs []*request) error {
750 vlogOffset := uint64(vlog.woffset())
751 for _, req := range reqs {
752 // calculate size of the request.
753 size := estimateRequestSize(req)
754 estimatedVlogOffset := vlogOffset + size
755 if estimatedVlogOffset > uint64(maxVlogFileSize) {
756 return fmt.Errorf("Request size offset %d is bigger than maximum offset %d",
757 estimatedVlogOffset, maxVlogFileSize)
758 }
759
760 if estimatedVlogOffset >= uint64(vlog.opt.ValueLogFileSize) {
761 // We'll create a new vlog file if the estimated offset is greater or equal to
762 // max vlog size. So, resetting the vlogOffset.
763 vlogOffset = 0
764 continue
765 }
766 // Estimated vlog offset will become current vlog offset if the vlog is not rotated.
767 vlogOffset = estimatedVlogOffset
768 }
769 return nil
770 }
771
772 // estimateRequestSize returns the size that needed to be written for the given request.
773 func estimateRequestSize(req *request) uint64 {
774 size := uint64(0)
775 for _, e := range req.Entries {
776 size += uint64(maxHeaderSize + len(e.Key) + len(e.Value) + crc32.Size)
777 }
778 return size
779 }
780
781 // write is thread-unsafe by design and should not be called concurrently.
782 func (vlog *valueLog) write(reqs []*request) error {
783 if vlog.db.opt.InMemory {
784 return nil
785 }
786 // Validate writes before writing to vlog. Because, we don't want to partially write and return
787 // an error.
788 if err := vlog.validateWrites(reqs); err != nil {
789 return y.Wrapf(err, "while validating writes")
790 }
791
792 vlog.filesLock.RLock()
793 maxFid := vlog.maxFid
794 curlf := vlog.filesMap[maxFid]
795 vlog.filesLock.RUnlock()
796
797 defer func() {
798 if vlog.opt.SyncWrites {
799 if err := curlf.Sync(); err != nil {
800 vlog.opt.Errorf("Error while curlf sync: %v\n", err)
801 }
802 }
803 }()
804
805 write := func(buf *bytes.Buffer) error {
806 if buf.Len() == 0 {
807 return nil
808 }
809
810 n := uint32(buf.Len())
811 endOffset := vlog.writableLogOffset.Add(n)
812 // Increase the file size if we cannot accommodate this entry.
813 // [Aman] Should this be >= or just >? Doesn't make sense to extend the file if it big enough already.
814 if int(endOffset) >= len(curlf.Data) {
815 if err := curlf.Truncate(int64(endOffset)); err != nil {
816 return err
817 }
818 }
819
820 start := int(endOffset - n)
821 y.AssertTrue(copy(curlf.Data[start:], buf.Bytes()) == int(n))
822
823 curlf.size.Store(endOffset)
824 return nil
825 }
826
827 toDisk := func() error {
828 if vlog.woffset() > uint32(vlog.opt.ValueLogFileSize) ||
829 vlog.numEntriesWritten > vlog.opt.ValueLogMaxEntries {
830 if err := curlf.doneWriting(vlog.woffset()); err != nil {
831 return err
832 }
833
834 newlf, err := vlog.createVlogFile()
835 if err != nil {
836 return err
837 }
838 curlf = newlf
839 }
840 return nil
841 }
842
843 buf := new(bytes.Buffer)
844 for i := range reqs {
845 b := reqs[i]
846 b.Ptrs = b.Ptrs[:0]
847 var written, bytesWritten int
848 valueSizes := make([]int64, 0, len(b.Entries))
849 for j := range b.Entries {
850 buf.Reset()
851
852 e := b.Entries[j]
853 valueSizes = append(valueSizes, int64(len(e.Value)))
854 if e.skipVlogAndSetThreshold(vlog.db.valueThreshold()) {
855 b.Ptrs = append(b.Ptrs, valuePointer{})
856 continue
857 }
858 var p valuePointer
859
860 p.Fid = curlf.fid
861 p.Offset = vlog.woffset()
862
863 // We should not store transaction marks in the vlog file because it will never have all
864 // the entries in a transaction. If we store entries with transaction marks then value
865 // GC will not be able to iterate on the entire vlog file.
866 // But, we still want the entry to stay intact for the memTable WAL. So, store the meta
867 // in a temporary variable and reassign it after writing to the value log.
868 tmpMeta := e.meta
869 e.meta = e.meta &^ (bitTxn | bitFinTxn)
870 plen, err := curlf.encodeEntry(buf, e, p.Offset) // Now encode the entry into buffer.
871 if err != nil {
872 return err
873 }
874 // Restore the meta.
875 e.meta = tmpMeta
876
877 p.Len = uint32(plen)
878 b.Ptrs = append(b.Ptrs, p)
879 if err := write(buf); err != nil {
880 return err
881 }
882 written++
883 bytesWritten += buf.Len()
884 // No need to flush anything, we write to file directly via mmap.
885 }
886 y.NumWritesVlogAdd(vlog.opt.MetricsEnabled, int64(written))
887 y.NumBytesWrittenVlogAdd(vlog.opt.MetricsEnabled, int64(bytesWritten))
888
889 vlog.numEntriesWritten += uint32(written)
890 vlog.db.threshold.update(valueSizes)
891 // We write to disk here so that all entries that are part of the same transaction are
892 // written to the same vlog file.
893 if err := toDisk(); err != nil {
894 return err
895 }
896 }
897 return toDisk()
898 }
899
900 // Gets the logFile and acquires and RLock() for the mmap. You must call RUnlock on the file
901 // (if non-nil)
902 func (vlog *valueLog) getFileRLocked(vp valuePointer) (*logFile, error) {
903 vlog.filesLock.RLock()
904 defer vlog.filesLock.RUnlock()
905 ret, ok := vlog.filesMap[vp.Fid]
906 if !ok {
907 // log file has gone away, we can't do anything. Return.
908 return nil, fmt.Errorf("file with ID: %d not found", vp.Fid)
909 }
910
911 // Check for valid offset if we are reading from writable log.
912 maxFid := vlog.maxFid
913 // In read-only mode we don't need to check for writable offset as we are not writing anything.
914 // Moreover, this offset is not set in readonly mode.
915 if !vlog.opt.ReadOnly && vp.Fid == maxFid {
916 currentOffset := vlog.woffset()
917 if vp.Offset >= currentOffset {
918 return nil, fmt.Errorf(
919 "Invalid value pointer offset: %d greater than current offset: %d",
920 vp.Offset, currentOffset)
921 }
922 }
923
924 ret.lock.RLock()
925 return ret, nil
926 }
927
928 // Read reads the value log at a given location.
929 // TODO: Make this read private.
930 func (vlog *valueLog) Read(vp valuePointer, _ *y.Slice) ([]byte, func(), error) {
931 buf, lf, err := vlog.readValueBytes(vp)
932 // log file is locked so, decide whether to lock immediately or let the caller to
933 // unlock it, after caller uses it.
934 cb := vlog.getUnlockCallback(lf)
935 if err != nil {
936 return nil, cb, err
937 }
938
939 if vlog.opt.VerifyValueChecksum {
940 hash := crc32.New(y.CastagnoliCrcTable)
941 if _, err := hash.Write(buf[:len(buf)-crc32.Size]); err != nil {
942 runCallback(cb)
943 return nil, nil, y.Wrapf(err, "failed to write hash for vp %+v", vp)
944 }
945 // Fetch checksum from the end of the buffer.
946 checksum := buf[len(buf)-crc32.Size:]
947 if hash.Sum32() != y.BytesToU32(checksum) {
948 runCallback(cb)
949 return nil, nil, y.Wrapf(y.ErrChecksumMismatch, "value corrupted for vp: %+v", vp)
950 }
951 }
952 var h header
953 headerLen := h.Decode(buf)
954 kv := buf[headerLen:]
955 if lf.encryptionEnabled() {
956 kv, err = lf.decryptKV(kv, vp.Offset)
957 if err != nil {
958 return nil, cb, err
959 }
960 }
961 if uint32(len(kv)) < h.klen+h.vlen {
962 vlog.db.opt.Errorf("Invalid read: vp: %+v", vp)
963 return nil, nil, fmt.Errorf("Invalid read: Len: %d read at:[%d:%d]",
964 len(kv), h.klen, h.klen+h.vlen)
965 }
966 return kv[h.klen : h.klen+h.vlen], cb, nil
967 }
968
969 // getUnlockCallback will returns a function which unlock the logfile if the logfile is mmaped.
970 // otherwise, it unlock the logfile and return nil.
971 func (vlog *valueLog) getUnlockCallback(lf *logFile) func() {
972 if lf == nil {
973 return nil
974 }
975 return lf.lock.RUnlock
976 }
977
978 // readValueBytes return vlog entry slice and read locked log file. Caller should take care of
979 // logFile unlocking.
980 func (vlog *valueLog) readValueBytes(vp valuePointer) ([]byte, *logFile, error) {
981 lf, err := vlog.getFileRLocked(vp)
982 if err != nil {
983 return nil, nil, err
984 }
985
986 buf, err := lf.read(vp)
987 y.NumReadsVlogAdd(vlog.db.opt.MetricsEnabled, 1)
988 y.NumBytesReadsVlogAdd(vlog.db.opt.MetricsEnabled, int64(len(buf)))
989 return buf, lf, err
990 }
991
992 func (vlog *valueLog) pickLog(discardRatio float64) *logFile {
993 vlog.filesLock.RLock()
994 defer vlog.filesLock.RUnlock()
995
996 LOOP:
997 // Pick a candidate that contains the largest amount of discardable data
998 fid, discard := vlog.discardStats.MaxDiscard()
999
1000 // MaxDiscard will return fid=0 if it doesn't have any discard data. The
1001 // vlog files start from 1.
1002 if fid == 0 {
1003 vlog.opt.Debugf("No file with discard stats")
1004 return nil
1005 }
1006 lf, ok := vlog.filesMap[fid]
1007 // This file was deleted but it's discard stats increased because of compactions. The file
1008 // doesn't exist so we don't need to do anything. Skip it and retry.
1009 if !ok {
1010 vlog.discardStats.Update(fid, -1)
1011 goto LOOP
1012 }
1013 // We have a valid file.
1014 fi, err := lf.Fd.Stat()
1015 if err != nil {
1016 vlog.opt.Errorf("Unable to get stats for value log fid: %d err: %+v", fi, err)
1017 return nil
1018 }
1019 if thr := discardRatio * float64(fi.Size()); float64(discard) < thr {
1020 vlog.opt.Debugf("Discard: %d less than threshold: %.0f for file: %s",
1021 discard, thr, fi.Name())
1022 return nil
1023 }
1024 if fid < vlog.maxFid {
1025 vlog.opt.Infof("Found value log max discard fid: %d discard: %d\n", fid, discard)
1026 lf, ok := vlog.filesMap[fid]
1027 y.AssertTrue(ok)
1028 return lf
1029 }
1030
1031 // Don't randomly pick any value log file.
1032 return nil
1033 }
1034
1035 func discardEntry(e Entry, vs y.ValueStruct, db *DB) bool {
1036 if vs.Version != y.ParseTs(e.Key) {
1037 // Version not found. Discard.
1038 return true
1039 }
1040 if isDeletedOrExpired(vs.Meta, vs.ExpiresAt) {
1041 return true
1042 }
1043 if (vs.Meta & bitValuePointer) == 0 {
1044 // Key also stores the value in LSM. Discard.
1045 return true
1046 }
1047 if (vs.Meta & bitFinTxn) > 0 {
1048 // Just a txn finish entry. Discard.
1049 return true
1050 }
1051 return false
1052 }
1053
1054 func (vlog *valueLog) doRunGC(lf *logFile) error {
1055 _, span := otel.Tracer("").Start(context.TODO(), "Badger.GC")
1056 span.SetAttributes(attribute.String("GC rewrite for", lf.path))
1057 defer span.End()
1058 if err := vlog.rewrite(lf); err != nil {
1059 return err
1060 }
1061 // Remove the file from discardStats.
1062 vlog.discardStats.Update(lf.fid, -1)
1063 return nil
1064 }
1065
1066 func (vlog *valueLog) waitOnGC(lc *z.Closer) {
1067 defer lc.Done()
1068
1069 <-lc.HasBeenClosed() // Wait for lc to be closed.
1070
1071 // Block any GC in progress to finish, and don't allow any more writes to runGC by filling up
1072 // the channel of size 1.
1073 vlog.garbageCh <- struct{}{}
1074 }
1075
1076 func (vlog *valueLog) runGC(discardRatio float64) error {
1077 select {
1078 case vlog.garbageCh <- struct{}{}:
1079 // Pick a log file for GC.
1080 defer func() {
1081 <-vlog.garbageCh
1082 }()
1083
1084 lf := vlog.pickLog(discardRatio)
1085 if lf == nil {
1086 return ErrNoRewrite
1087 }
1088 return vlog.doRunGC(lf)
1089 default:
1090 return ErrRejected
1091 }
1092 }
1093
1094 func (vlog *valueLog) updateDiscardStats(stats map[uint32]int64) {
1095 if vlog.opt.InMemory {
1096 return
1097 }
1098 for fid, discard := range stats {
1099 vlog.discardStats.Update(fid, discard)
1100 }
1101 // The following is to coordinate with some test cases where we want to
1102 // verify that at least one iteration of updateDiscardStats has been completed.
1103 vlog.db.logToSyncChan(updateDiscardStatsMsg)
1104 }
1105
// vlogThreshold tracks a dynamically adjusted value-size threshold.
//
// Batches of value sizes sent on valueCh are folded into a histogram (vlMetrics) by
// listenForValueThresholdUpdate. The threshold is then set to the configured percentile of
// that histogram.
type vlogThreshold struct {
	// logger receives a message whenever the threshold changes; it may be nil.
	logger Logger
	// percentile of the size histogram that becomes the new threshold (Options.VLogPercentile).
	percentile float64
	// valueThreshold is the current threshold; read and written atomically.
	valueThreshold atomic.Int64
	// valueCh carries batches of value sizes to the listener goroutine.
	valueCh chan []int64
	// clearCh asks the listener goroutine to reset vlMetrics.
	clearCh chan bool
	// closer stops the listener goroutine and waits for it to finish.
	closer *z.Closer
	// Metrics contains a running log of statistics like amount of data stored etc.
	vlMetrics *z.HistogramData
}
1116
1117 func initVlogThreshold(opt *Options) *vlogThreshold {
1118 getBounds := func() []float64 {
1119 mxbd := opt.maxValueThreshold
1120 mnbd := float64(opt.ValueThreshold)
1121 y.AssertTruef(mxbd >= mnbd, "maximum threshold bound is less than the min threshold")
1122 size := math.Min(mxbd-mnbd+1, 1024.0)
1123 bdstp := (mxbd - mnbd) / size
1124 bounds := make([]float64, int64(size))
1125 for i := range bounds {
1126 if i == 0 {
1127 bounds[0] = mnbd
1128 continue
1129 }
1130 if i == int(size-1) {
1131 bounds[i] = mxbd
1132 continue
1133 }
1134 bounds[i] = bounds[i-1] + bdstp
1135 }
1136 return bounds
1137 }
1138 lt := &vlogThreshold{
1139 logger: opt.Logger,
1140 percentile: opt.VLogPercentile,
1141 valueCh: make(chan []int64, 1000),
1142 clearCh: make(chan bool, 1),
1143 closer: z.NewCloser(1),
1144 vlMetrics: z.NewHistogramData(getBounds()),
1145 }
1146 lt.valueThreshold.Store(opt.ValueThreshold)
1147 return lt
1148 }
1149
1150 func (v *vlogThreshold) Clear(opt Options) {
1151 v.valueThreshold.Store(opt.ValueThreshold)
1152 v.clearCh <- true
1153 }
1154
1155 func (v *vlogThreshold) update(sizes []int64) {
1156 v.valueCh <- sizes
1157 }
1158
1159 func (v *vlogThreshold) close() {
1160 v.closer.SignalAndWait()
1161 }
1162
1163 func (v *vlogThreshold) listenForValueThresholdUpdate() {
1164 defer v.closer.Done()
1165 for {
1166 select {
1167 case <-v.closer.HasBeenClosed():
1168 return
1169 case val := <-v.valueCh:
1170 for _, e := range val {
1171 v.vlMetrics.Update(e)
1172 }
1173 // we are making it to get Options.VlogPercentile so that values with sizes
1174 // in range of Options.VlogPercentile will make it to the LSM tree and rest to the
1175 // value log file.
1176 p := int64(v.vlMetrics.Percentile(v.percentile))
1177 if v.valueThreshold.Load() != p {
1178 if v.logger != nil {
1179 v.logger.Infof("updating value of threshold to: %d", p)
1180 }
1181 v.valueThreshold.Store(p)
1182 }
1183 case <-v.clearCh:
1184 v.vlMetrics.Clear()
1185 }
1186 }
1187 }
1188