memtable.go raw
1 /*
2 * SPDX-FileCopyrightText: © Hypermode Inc. <hello@hypermode.com>
3 * SPDX-License-Identifier: Apache-2.0
4 */
5
6 package badger
7
8 import (
9 "bufio"
10 "bytes"
11 "crypto/aes"
12 cryptorand "crypto/rand"
13 "encoding/binary"
14 "fmt"
15 "hash/crc32"
16 "io"
17 "os"
18 "path/filepath"
19 "sort"
20 "strconv"
21 "strings"
22 "sync"
23 "sync/atomic"
24
25 "github.com/dgraph-io/badger/v4/pb"
26 "github.com/dgraph-io/badger/v4/skl"
27 "github.com/dgraph-io/badger/v4/y"
28 "github.com/dgraph-io/ristretto/v2/z"
29 )
30
31 // memTable structure stores a skiplist and a corresponding WAL. Writes to memTable are written
32 // both to the WAL and the skiplist. On a crash, the WAL is replayed to bring the skiplist back to
33 // its pre-crash form.
34 type memTable struct {
35 // TODO: Give skiplist z.Calloc'd []byte.
36 sl *skl.Skiplist
37 wal *logFile
38 maxVersion uint64
39 opt Options
40 buf *bytes.Buffer
41 }
42
43 func (db *DB) openMemTables(opt Options) error {
44 // We don't need to open any tables in in-memory mode.
45 if db.opt.InMemory {
46 return nil
47 }
48 files, err := os.ReadDir(db.opt.Dir)
49 if err != nil {
50 return errFile(err, db.opt.Dir, "Unable to open mem dir.")
51 }
52
53 var fids []int
54 for _, file := range files {
55 if !strings.HasSuffix(file.Name(), memFileExt) {
56 continue
57 }
58 fsz := len(file.Name())
59 fid, err := strconv.ParseInt(file.Name()[:fsz-len(memFileExt)], 10, 64)
60 if err != nil {
61 return errFile(err, file.Name(), "Unable to parse log id.")
62 }
63 fids = append(fids, int(fid))
64 }
65
66 // Sort in ascending order.
67 sort.Slice(fids, func(i, j int) bool {
68 return fids[i] < fids[j]
69 })
70 for _, fid := range fids {
71 flags := os.O_RDWR
72 if db.opt.ReadOnly {
73 flags = os.O_RDONLY
74 }
75 mt, err := db.openMemTable(fid, flags)
76 if err != nil {
77 return y.Wrapf(err, "while opening fid: %d", fid)
78 }
79 // If this memtable is empty we don't need to add it. This is a
80 // memtable that was completely truncated.
81 if mt.sl.Empty() {
82 mt.DecrRef()
83 continue
84 }
85 // These should no longer be written to. So, make them part of the imm.
86 db.imm = append(db.imm, mt)
87 }
88 if len(fids) != 0 {
89 db.nextMemFid = fids[len(fids)-1]
90 }
91 db.nextMemFid++
92 return nil
93 }
94
// memFileExt is the file-name suffix of memtable WAL files.
const memFileExt = ".mem"
96
97 func (db *DB) openMemTable(fid, flags int) (*memTable, error) {
98 filepath := db.mtFilePath(fid)
99 s := skl.NewSkiplist(arenaSize(db.opt))
100 mt := &memTable{
101 sl: s,
102 opt: db.opt,
103 buf: &bytes.Buffer{},
104 }
105 // We don't need to create the wal for the skiplist in in-memory mode so return the mt.
106 if db.opt.InMemory {
107 return mt, z.NewFile
108 }
109
110 mt.wal = &logFile{
111 fid: uint32(fid),
112 path: filepath,
113 registry: db.registry,
114 writeAt: vlogHeaderSize,
115 opt: db.opt,
116 }
117 lerr := mt.wal.open(filepath, flags, 2*db.opt.MemTableSize)
118 if lerr != z.NewFile && lerr != nil {
119 return nil, y.Wrapf(lerr, "While opening memtable: %s", filepath)
120 }
121
122 // Have a callback set to delete WAL when skiplist reference count goes down to zero. That is,
123 // when it gets flushed to L0.
124 s.OnClose = func() {
125 if err := mt.wal.Delete(); err != nil {
126 db.opt.Errorf("while deleting file: %s, err: %v", filepath, err)
127 }
128 }
129
130 if lerr == z.NewFile {
131 return mt, lerr
132 }
133 err := mt.UpdateSkipList()
134 return mt, y.Wrapf(err, "while updating skiplist")
135 }
136
137 func (db *DB) newMemTable() (*memTable, error) {
138 mt, err := db.openMemTable(db.nextMemFid, os.O_CREATE|os.O_RDWR)
139 if err == z.NewFile {
140 db.nextMemFid++
141 return mt, nil
142 }
143
144 if err != nil {
145 db.opt.Errorf("Got error: %v for id: %d\n", err, db.nextMemFid)
146 return nil, y.Wrapf(err, "newMemTable")
147 }
148 return nil, fmt.Errorf("File %s already exists", mt.wal.Fd.Name())
149 }
150
151 func (db *DB) mtFilePath(fid int) string {
152 return filepath.Join(db.opt.Dir, fmt.Sprintf("%05d%s", fid, memFileExt))
153 }
154
155 func (mt *memTable) SyncWAL() error {
156 return mt.wal.Sync()
157 }
158
159 func (mt *memTable) isFull() bool {
160 if mt.sl.MemSize() >= mt.opt.MemTableSize {
161 return true
162 }
163 if mt.opt.InMemory {
164 // InMemory mode doesn't have any WAL.
165 return false
166 }
167 return int64(mt.wal.writeAt) >= mt.opt.MemTableSize
168 }
169
170 func (mt *memTable) Put(key []byte, value y.ValueStruct) error {
171 entry := &Entry{
172 Key: key,
173 Value: value.Value,
174 UserMeta: value.UserMeta,
175 meta: value.Meta,
176 ExpiresAt: value.ExpiresAt,
177 }
178
179 // wal is nil only when badger in running in in-memory mode and we don't need the wal.
180 if mt.wal != nil {
181 // If WAL exceeds opt.ValueLogFileSize, we'll force flush the memTable. See logic in
182 // ensureRoomForWrite.
183 if err := mt.wal.writeEntry(mt.buf, entry, mt.opt); err != nil {
184 return y.Wrapf(err, "cannot write entry to WAL file")
185 }
186 }
187 // We insert the finish marker in the WAL but not in the memtable.
188 if entry.meta&bitFinTxn > 0 {
189 return nil
190 }
191
192 // Write to skiplist and update maxVersion encountered.
193 mt.sl.Put(key, value)
194 if ts := y.ParseTs(entry.Key); ts > mt.maxVersion {
195 mt.maxVersion = ts
196 }
197 y.NumBytesWrittenToL0Add(mt.opt.MetricsEnabled, entry.estimateSizeAndSetThreshold(mt.opt.ValueThreshold))
198 return nil
199 }
200
201 func (mt *memTable) UpdateSkipList() error {
202 if mt.wal == nil || mt.sl == nil {
203 return nil
204 }
205 endOff, err := mt.wal.iterate(true, 0, mt.replayFunction(mt.opt))
206 if err != nil {
207 return y.Wrapf(err, "while iterating wal: %s", mt.wal.Fd.Name())
208 }
209 if endOff < mt.wal.size.Load() && mt.opt.ReadOnly {
210 return y.Wrapf(ErrTruncateNeeded, "end offset: %d < size: %d", endOff, mt.wal.size.Load())
211 }
212 return mt.wal.Truncate(int64(endOff))
213 }
214
215 // IncrRef increases the refcount
216 func (mt *memTable) IncrRef() {
217 mt.sl.IncrRef()
218 }
219
220 // DecrRef decrements the refcount, deallocating the Skiplist when done using it
221 func (mt *memTable) DecrRef() {
222 mt.sl.DecrRef()
223 }
224
225 func (mt *memTable) replayFunction(opt Options) func(Entry, valuePointer) error {
226 first := true
227 return func(e Entry, _ valuePointer) error { // Function for replaying.
228 if first {
229 opt.Debugf("First key=%q\n", e.Key)
230 }
231 first = false
232 if ts := y.ParseTs(e.Key); ts > mt.maxVersion {
233 mt.maxVersion = ts
234 }
235 v := y.ValueStruct{
236 Value: e.Value,
237 Meta: e.meta,
238 UserMeta: e.UserMeta,
239 ExpiresAt: e.ExpiresAt,
240 }
241 // This is already encoded correctly. Value would be either a vptr, or a full value
242 // depending upon how big the original value was. Skiplist makes a copy of the key and
243 // value.
244 mt.sl.Put(e.Key, v)
245 return nil
246 }
247 }
248
249 type logFile struct {
250 *z.MmapFile
251 path string
252 // This is a lock on the log file. It guards the fd’s value, the file’s
253 // existence and the file’s memory map.
254 //
255 // Use shared ownership when reading/writing the file or memory map, use
256 // exclusive ownership to open/close the descriptor, unmap or remove the file.
257 lock sync.RWMutex
258 fid uint32
259 size atomic.Uint32
260 dataKey *pb.DataKey
261 baseIV []byte
262 registry *KeyRegistry
263 writeAt uint32
264 opt Options
265 }
266
267 func (lf *logFile) Truncate(end int64) error {
268 if fi, err := lf.Fd.Stat(); err != nil {
269 return fmt.Errorf("while file.stat on file: %s, error: %v\n", lf.Fd.Name(), err)
270 } else if fi.Size() == end {
271 return nil
272 }
273 y.AssertTrue(!lf.opt.ReadOnly)
274 lf.size.Store(uint32(end))
275 return lf.MmapFile.Truncate(end)
276 }
277
278 // encodeEntry will encode entry to the buf
279 // layout of entry
280 // +--------+-----+-------+-------+
281 // | header | key | value | crc32 |
282 // +--------+-----+-------+-------+
283 func (lf *logFile) encodeEntry(buf *bytes.Buffer, e *Entry, offset uint32) (int, error) {
284 h := header{
285 klen: uint32(len(e.Key)),
286 vlen: uint32(len(e.Value)),
287 expiresAt: e.ExpiresAt,
288 meta: e.meta,
289 userMeta: e.UserMeta,
290 }
291
292 hash := crc32.New(y.CastagnoliCrcTable)
293 writer := io.MultiWriter(buf, hash)
294
295 // encode header.
296 var headerEnc [maxHeaderSize]byte
297 sz := h.Encode(headerEnc[:])
298 y.Check2(writer.Write(headerEnc[:sz]))
299 // we'll encrypt only key and value.
300 if lf.encryptionEnabled() {
301 // TODO: no need to allocate the bytes. we can calculate the encrypted buf one by one
302 // since we're using ctr mode of AES encryption. Ordering won't changed. Need some
303 // refactoring in XORBlock which will work like stream cipher.
304 eBuf := make([]byte, 0, len(e.Key)+len(e.Value))
305 eBuf = append(eBuf, e.Key...)
306 eBuf = append(eBuf, e.Value...)
307 if err := y.XORBlockStream(
308 writer, eBuf, lf.dataKey.Data, lf.generateIV(offset)); err != nil {
309 return 0, y.Wrapf(err, "Error while encoding entry for vlog.")
310 }
311 } else {
312 // Encryption is disabled so writing directly to the buffer.
313 y.Check2(writer.Write(e.Key))
314 y.Check2(writer.Write(e.Value))
315 }
316 // write crc32 hash.
317 var crcBuf [crc32.Size]byte
318 binary.BigEndian.PutUint32(crcBuf[:], hash.Sum32())
319 y.Check2(buf.Write(crcBuf[:]))
320 // return encoded length.
321 return len(headerEnc[:sz]) + len(e.Key) + len(e.Value) + len(crcBuf), nil
322 }
323
324 func (lf *logFile) writeEntry(buf *bytes.Buffer, e *Entry, opt Options) error {
325 buf.Reset()
326 plen, err := lf.encodeEntry(buf, e, lf.writeAt)
327 if err != nil {
328 return err
329 }
330 y.AssertTrue(plen == copy(lf.Data[lf.writeAt:], buf.Bytes()))
331 lf.writeAt += uint32(plen)
332
333 lf.zeroNextEntry()
334 return nil
335 }
336
337 func (lf *logFile) decodeEntry(buf []byte, offset uint32) (*Entry, error) {
338 var h header
339 hlen := h.Decode(buf)
340 kv := buf[hlen:]
341 if lf.encryptionEnabled() {
342 var err error
343 // No need to worry about mmap. because, XORBlock allocates a byte array to do the
344 // xor. So, the given slice is not being mutated.
345 if kv, err = lf.decryptKV(kv, offset); err != nil {
346 return nil, err
347 }
348 }
349 e := &Entry{
350 meta: h.meta,
351 UserMeta: h.userMeta,
352 ExpiresAt: h.expiresAt,
353 offset: offset,
354 Key: kv[:h.klen],
355 Value: kv[h.klen : h.klen+h.vlen],
356 }
357 return e, nil
358 }
359
360 func (lf *logFile) decryptKV(buf []byte, offset uint32) ([]byte, error) {
361 return y.XORBlockAllocate(buf, lf.dataKey.Data, lf.generateIV(offset))
362 }
363
364 // KeyID returns datakey's ID.
365 func (lf *logFile) keyID() uint64 {
366 if lf.dataKey == nil {
367 // If there is no datakey, then we'll return 0. Which means no encryption.
368 return 0
369 }
370 return lf.dataKey.KeyId
371 }
372
373 func (lf *logFile) encryptionEnabled() bool {
374 return lf.dataKey != nil
375 }
376
377 // Acquire lock on mmap/file if you are calling this
378 func (lf *logFile) read(p valuePointer) (buf []byte, err error) {
379 offset := p.Offset
380 // Do not convert size to uint32, because the lf.Data can be of size
381 // 4GB, which overflows the uint32 during conversion to make the size 0,
382 // causing the read to fail with ErrEOF. See issue #585.
383 size := int64(len(lf.Data))
384 valsz := p.Len
385 lfsz := lf.size.Load()
386 if int64(offset) >= size || int64(offset+valsz) > size ||
387 // Ensure that the read is within the file's actual size. It might be possible that
388 // the offset+valsz length is beyond the file's actual size. This could happen when
389 // dropAll and iterations are running simultaneously.
390 int64(offset+valsz) > int64(lfsz) {
391 err = y.ErrEOF
392 } else {
393 buf = lf.Data[offset : offset+valsz]
394 }
395 return buf, err
396 }
397
398 // generateIV will generate IV by appending given offset with the base IV.
399 func (lf *logFile) generateIV(offset uint32) []byte {
400 iv := make([]byte, aes.BlockSize)
401 // baseIV is of 12 bytes.
402 y.AssertTrue(12 == copy(iv[:12], lf.baseIV))
403 // remaining 4 bytes is obtained from offset.
404 binary.BigEndian.PutUint32(iv[12:], offset)
405 return iv
406 }
407
408 func (lf *logFile) doneWriting(offset uint32) error {
409 if lf.opt.SyncWrites {
410 if err := lf.Sync(); err != nil {
411 return y.Wrapf(err, "Unable to sync value log: %q", lf.path)
412 }
413 }
414
415 // Before we were acquiring a lock here on lf.lock, because we were invalidating the file
416 // descriptor due to reopening it as read-only. Now, we don't invalidate the fd, but unmap it,
417 // truncate it and remap it. That creates a window where we have segfaults because the mmap is
418 // no longer valid, while someone might be reading it. Therefore, we need a lock here again.
419 lf.lock.Lock()
420 defer lf.lock.Unlock()
421
422 if err := lf.Truncate(int64(offset)); err != nil {
423 return y.Wrapf(err, "Unable to truncate file: %q", lf.path)
424 }
425
426 // Previously we used to close the file after it was written and reopen it in read-only mode.
427 // We no longer open files in read-only mode. We keep all vlog files open in read-write mode.
428 return nil
429 }
430
431 // iterate iterates over log file. It doesn't not allocate new memory for every kv pair.
432 // Therefore, the kv pair is only valid for the duration of fn call.
433 func (lf *logFile) iterate(readOnly bool, offset uint32, fn logEntry) (uint32, error) {
434 if offset == 0 {
435 // If offset is set to zero, let's advance past the encryption key header.
436 offset = vlogHeaderSize
437 }
438
439 // For now, read directly from file, because it allows
440 reader := bufio.NewReader(lf.NewReader(int(offset)))
441 read := &safeRead{
442 k: make([]byte, 10),
443 v: make([]byte, 10),
444 recordOffset: offset,
445 lf: lf,
446 }
447
448 var lastCommit uint64
449 var validEndOffset uint32 = offset
450
451 var entries []*Entry
452 var vptrs []valuePointer
453
454 loop:
455 for {
456 e, err := read.Entry(reader)
457 switch {
458 // We have not reached the end of the file but the entry we read is
459 // zero. This happens because we have truncated the file and
460 // zero'ed it out.
461 case err == io.EOF:
462 break loop
463 case err == io.ErrUnexpectedEOF || err == errTruncate:
464 break loop
465 case err != nil:
466 return 0, err
467 case e == nil:
468 continue
469 case e.isZero():
470 break loop
471 }
472
473 var vp valuePointer
474 vp.Len = uint32(e.hlen + len(e.Key) + len(e.Value) + crc32.Size)
475 read.recordOffset += vp.Len
476
477 vp.Offset = e.offset
478 vp.Fid = lf.fid
479
480 switch {
481 case e.meta&bitTxn > 0:
482 txnTs := y.ParseTs(e.Key)
483 if lastCommit == 0 {
484 lastCommit = txnTs
485 }
486 if lastCommit != txnTs {
487 break loop
488 }
489 entries = append(entries, e)
490 vptrs = append(vptrs, vp)
491
492 case e.meta&bitFinTxn > 0:
493 txnTs, err := strconv.ParseUint(string(e.Value), 10, 64)
494 if err != nil || lastCommit != txnTs {
495 break loop
496 }
497 // Got the end of txn. Now we can store them.
498 lastCommit = 0
499 validEndOffset = read.recordOffset
500
501 for i, e := range entries {
502 vp := vptrs[i]
503 if err := fn(*e, vp); err != nil {
504 if err == errStop {
505 break
506 }
507 return 0, errFile(err, lf.path, "Iteration function")
508 }
509 }
510 entries = entries[:0]
511 vptrs = vptrs[:0]
512
513 default:
514 if lastCommit != 0 {
515 // This is most likely an entry which was moved as part of GC.
516 // We shouldn't get this entry in the middle of a transaction.
517 break loop
518 }
519 validEndOffset = read.recordOffset
520
521 if err := fn(*e, vp); err != nil {
522 if err == errStop {
523 break
524 }
525 return 0, errFile(err, lf.path, "Iteration function")
526 }
527 }
528 }
529 return validEndOffset, nil
530 }
531
532 // Zero out the next entry to deal with any crashes.
533 func (lf *logFile) zeroNextEntry() {
534 z.ZeroOut(lf.Data, int(lf.writeAt), int(lf.writeAt+maxHeaderSize))
535 }
536
537 func (lf *logFile) open(path string, flags int, fsize int64) error {
538 mf, ferr := z.OpenMmapFile(path, flags, int(fsize))
539 lf.MmapFile = mf
540
541 if ferr == z.NewFile {
542 if err := lf.bootstrap(); err != nil {
543 os.Remove(path)
544 return err
545 }
546 lf.size.Store(vlogHeaderSize)
547
548 } else if ferr != nil {
549 return y.Wrapf(ferr, "while opening file: %s", path)
550 }
551 lf.size.Store(uint32(len(lf.Data)))
552
553 if lf.size.Load() < vlogHeaderSize {
554 // Every vlog file should have at least vlogHeaderSize. If it is less than vlogHeaderSize
555 // then it must have been corrupted. But no need to handle here. log replayer will truncate
556 // and bootstrap the logfile. So ignoring here.
557 return nil
558 }
559
560 // Copy over the encryption registry data.
561 buf := make([]byte, vlogHeaderSize)
562
563 y.AssertTruef(vlogHeaderSize == copy(buf, lf.Data),
564 "Unable to copy from %s, size %d", path, lf.size.Load())
565 keyID := binary.BigEndian.Uint64(buf[:8])
566 // retrieve datakey.
567 if dk, err := lf.registry.DataKey(keyID); err != nil {
568 return y.Wrapf(err, "While opening vlog file %d", lf.fid)
569 } else {
570 lf.dataKey = dk
571 }
572 lf.baseIV = buf[8:]
573 y.AssertTrue(len(lf.baseIV) == 12)
574
575 // Preserved ferr so we can return if this was a new file.
576 return ferr
577 }
578
579 // bootstrap will initialize the log file with key id and baseIV.
580 // The below figure shows the layout of log file.
581 // +----------------+------------------+------------------+
582 // | keyID(8 bytes) | baseIV(12 bytes)| entry... |
583 // +----------------+------------------+------------------+
584 func (lf *logFile) bootstrap() error {
585 var err error
586
587 // generate data key for the log file.
588 var dk *pb.DataKey
589 if dk, err = lf.registry.LatestDataKey(); err != nil {
590 return y.Wrapf(err, "Error while retrieving datakey in logFile.bootstarp")
591 }
592 lf.dataKey = dk
593
594 // We'll always preserve vlogHeaderSize for key id and baseIV.
595 buf := make([]byte, vlogHeaderSize)
596
597 // write key id to the buf.
598 // key id will be zero if the logfile is in plain text.
599 binary.BigEndian.PutUint64(buf[:8], lf.keyID())
600 // generate base IV. It'll be used with offset of the vptr to encrypt the entry.
601 if _, err := cryptorand.Read(buf[8:]); err != nil {
602 return y.Wrapf(err, "Error while creating base IV, while creating logfile")
603 }
604
605 // Initialize base IV.
606 lf.baseIV = buf[8:]
607 y.AssertTrue(len(lf.baseIV) == 12)
608
609 // Copy over to the logFile.
610 y.AssertTrue(vlogHeaderSize == copy(lf.Data[0:], buf))
611
612 // Zero out the next entry.
613 lf.zeroNextEntry()
614 return nil
615 }
616