table.go raw
1 /*
2 * SPDX-FileCopyrightText: © Hypermode Inc. <hello@hypermode.com>
3 * SPDX-License-Identifier: Apache-2.0
4 */
5
6 package table
7
8 import (
9 "bytes"
10 "crypto/aes"
11 "encoding/binary"
12 "errors"
13 "fmt"
14 "math"
15 "os"
16 "path/filepath"
17 "strconv"
18 "strings"
19 "sync"
20 "sync/atomic"
21 "time"
22 "unsafe"
23
24 "github.com/klauspost/compress/snappy"
25 "github.com/klauspost/compress/zstd"
26 "google.golang.org/protobuf/proto"
27
28 "github.com/dgraph-io/badger/v4/fb"
29 "github.com/dgraph-io/badger/v4/options"
30 "github.com/dgraph-io/badger/v4/pb"
31 "github.com/dgraph-io/badger/v4/y"
32 "github.com/dgraph-io/ristretto/v2"
33 "github.com/dgraph-io/ristretto/v2/z"
34 )
35
const (
	// fileSuffix is the extension shared by every SSTable file name.
	fileSuffix = ".sst"
	// intSize is the width of a Go int in bytes on this platform; Block.size
	// uses it to account for the block's int-typed fields.
	intSize = int(unsafe.Sizeof(int(0)))
)
38
// Options contains configurable options for Table/Builder.
type Options struct {
	// Options for Opening/Building Table.

	// Open tables in read only mode.
	ReadOnly bool
	// MetricsEnabled controls whether bloom-filter lookup counters are recorded
	// (see Table.DoesNotHave).
	MetricsEnabled bool

	// Maximum size of the table.
	TableSize     uint64
	tableCapacity uint64 // 0.9x TableSize.

	// ChkMode is the checksum verification mode for Table.
	ChkMode options.ChecksumVerificationMode

	// Options for Table builder.

	// BloomFalsePositive is the false positive probability of bloom filter.
	BloomFalsePositive float64

	// BlockSize is the size of each block inside SSTable in bytes. When reading,
	// it is also used to estimate the decompressed size of ZSTD blocks, so it
	// must be non-zero whenever Compression is not options.None.
	BlockSize int

	// DataKey is the key used to decrypt the encrypted text. A non-nil DataKey
	// marks the table as encrypted.
	DataKey *pb.DataKey

	// Compression indicates the compression algorithm used for block compression.
	Compression options.CompressionType

	// Block cache is used to cache decompressed and decrypted blocks.
	BlockCache *ristretto.Cache[[]byte, *Block]
	// IndexCache caches table indexes keyed by table ID. It must be set for
	// encrypted tables, whose decrypted index is not kept on the Table.
	IndexCache *ristretto.Cache[uint64, *fb.TableIndex]

	// NOTE(review): AllocPool is not used in this file; presumably it supplies
	// allocators to the table Builder — confirm.
	AllocPool *z.AllocatorPool

	// ZSTDCompressionLevel is the ZSTD compression level used for compressing blocks.
	ZSTDCompressionLevel int
}
77
// TableInterface is the small read-only view of a table that callers depend
// on. It exists so tests can substitute lightweight fakes for *Table.
type TableInterface interface {
	// MaxVersion returns the highest key version stored in the table.
	MaxVersion() uint64
	// Smallest returns the smallest key of the table.
	Smallest() []byte
	// Biggest returns the biggest key of the table.
	Biggest() []byte
	// DoesNotHave reports whether the table definitely lacks the key hash.
	DoesNotHave(hash uint32) bool
}
85
// Table represents a loaded table file with the info we have about it.
// Tables are created by OpenTable (file-backed) or OpenInMemoryTable.
type Table struct {
	// NOTE(review): this mutex is not used in this file; presumably it guards
	// state touched elsewhere in the package — confirm.
	sync.Mutex
	// MmapFile supplies the table bytes (Data) and, for file-backed tables, the
	// file descriptor (Fd). OpenInMemoryTable leaves Fd nil.
	*z.MmapFile

	tableSize int // Initialized in OpenTable, using fd.Stat(); len(data) for in-memory tables.

	_index *fb.TableIndex // Nil if encryption is enabled. Use fetchIndex to access.
	_cheap *cheapIndex    // Scalar index fields copied by initIndex; set even when _index is nil.
	ref    atomic.Int32   // For file garbage collection

	// The following are initialized once and const.
	smallest, biggest []byte // Smallest and largest keys (with timestamps).
	id                uint64 // file id, part of filename

	// NOTE(review): Checksum is not populated in this file — confirm where it is set.
	Checksum       []byte
	CreatedAt      time.Time // File modification time at open; zero for in-memory tables.
	indexStart     int       // Offset of the (possibly encrypted) index; set by initIndex.
	indexLen       int       // Stored length of the index; set by initIndex.
	hasBloomFilter bool      // Whether the index carries bloom filter bytes; set by initIndex.

	IsInmemory bool // Set to true if the table is on level 0 and opened in memory.
	opt        *Options
}
110
// cheapIndex holds the scalar fields of the table index. initIndex copies them
// out of the flatbuffer so that they can be read without fetchIndex, which for
// encrypted tables may have to re-read and decrypt the whole index.
type cheapIndex struct {
	MaxVersion        uint64
	KeyCount          uint32
	UncompressedSize  uint32
	OnDiskSize        uint32
	BloomFilterLength int
	OffsetsLength     int // Number of block offsets, i.e. blocks, in the table.
}
119
120 func (t *Table) cheapIndex() *cheapIndex {
121 return t._cheap
122 }
123 func (t *Table) offsetsLength() int { return t.cheapIndex().OffsetsLength }
124
125 // MaxVersion returns the maximum version across all keys stored in this table.
126 func (t *Table) MaxVersion() uint64 { return t.cheapIndex().MaxVersion }
127
128 // BloomFilterSize returns the size of the bloom filter in bytes stored in memory.
129 func (t *Table) BloomFilterSize() int { return t.cheapIndex().BloomFilterLength }
130
131 // UncompressedSize is the size uncompressed data stored in this file.
132 func (t *Table) UncompressedSize() uint32 { return t.cheapIndex().UncompressedSize }
133
134 // KeyCount is the total number of keys in this table.
135 func (t *Table) KeyCount() uint32 { return t.cheapIndex().KeyCount }
136
137 // OnDiskSize returns the total size of key-values stored in this table (including the
138 // disk space occupied on the value log).
139 func (t *Table) OnDiskSize() uint32 { return t.cheapIndex().OnDiskSize }
140
141 // CompressionType returns the compression algorithm used for block compression.
142 func (t *Table) CompressionType() options.CompressionType {
143 return t.opt.Compression
144 }
145
146 // IncrRef increments the refcount (having to do with whether the file should be deleted)
147 func (t *Table) IncrRef() {
148 t.ref.Add(1)
149 }
150
151 // DecrRef decrements the refcount and possibly deletes the table
152 func (t *Table) DecrRef() error {
153 newRef := t.ref.Add(-1)
154 if newRef == 0 {
155 // We can safely delete this file, because for all the current files, we always have
156 // at least one reference pointing to them.
157
158 // Delete all blocks from the cache.
159 for i := 0; i < t.offsetsLength(); i++ {
160 t.opt.BlockCache.Del(t.blockCacheKey(i))
161 }
162 if err := t.Delete(); err != nil {
163 return err
164 }
165 }
166 return nil
167 }
168
169 // BlockEvictHandler is used to reuse the byte slice stored in the block on cache eviction.
170 func BlockEvictHandler(b *Block) {
171 b.decrRef()
172 }
173
// Block is one decoded block of a table, as produced by Table.block. Blocks are
// reference counted via incrRef/decrRef.
type Block struct {
	offset            int      // Offset of the block in the table, taken from the index.
	data              []byte   // Decrypted/decompressed contents; Table.block trims the checksum trailer off.
	checksum          []byte   // Protobuf-encoded checksum over data.
	entriesIndexStart int      // start index of entryOffsets list
	entryOffsets      []uint32 // used to binary search an entry in the block.
	chkLen            int      // checksum length.
	freeMe            bool     // data was allocated with z.Calloc and must be z.Free'd when ref reaches zero.
	ref               atomic.Int32
}

// NumBlocks counts live blocks: Table.block increments it when it creates a
// block, and decrRef decrements it when a block's refcount reaches zero.
var NumBlocks atomic.Int32
186
187 // incrRef increments the ref of a block and return a bool indicating if the
188 // increment was successful. A true value indicates that the block can be used.
189 func (b *Block) incrRef() bool {
190 for {
191 // We can't blindly add 1 to ref. We need to check whether it has
192 // reached zero first, because if it did, then we should absolutely not
193 // use this block.
194 ref := b.ref.Load()
195 // The ref would not be equal to 0 unless the existing
196 // block get evicted before this line. If the ref is zero, it means that
197 // the block is already added the the blockPool and cannot be used
198 // anymore. The ref of a new block is 1 so the following condition will
199 // be true only if the block got reused before we could increment its
200 // ref.
201 if ref == 0 {
202 return false
203 }
204 // Increment the ref only if it is not zero and has not changed between
205 // the time we read it and we're updating it.
206 //
207 if b.ref.CompareAndSwap(ref, ref+1) {
208 return true
209 }
210 }
211 }
212 func (b *Block) decrRef() {
213 if b == nil {
214 return
215 }
216
217 // Insert the []byte into pool only if the block is resuable. When a block
218 // is reusable a new []byte is used for decompression and this []byte can
219 // be reused.
220 // In case of an uncompressed block, the []byte is a reference to the
221 // table.mmap []byte slice. Any attempt to write data to the mmap []byte
222 // will lead to SEGFAULT.
223 if b.ref.Add(-1) == 0 {
224 if b.freeMe {
225 z.Free(b.data)
226 }
227 NumBlocks.Add(-1)
228 // blockPool.Put(&b.data)
229 }
230 y.AssertTrue(b.ref.Load() >= 0)
231 }
232 func (b *Block) size() int64 {
233 return int64(3*intSize /* Size of the offset, entriesIndexStart and chkLen */ +
234 cap(b.data) + cap(b.checksum) + cap(b.entryOffsets)*4)
235 }
236
237 func (b *Block) verifyCheckSum() error {
238 cs := &pb.Checksum{}
239 if err := proto.Unmarshal(b.checksum, cs); err != nil {
240 return y.Wrapf(err, "unable to unmarshal checksum for block")
241 }
242 return y.VerifyChecksum(b.data, cs)
243 }
244
245 func CreateTable(fname string, builder *Builder) (*Table, error) {
246 bd := builder.Done()
247 mf, err := z.OpenMmapFile(fname, os.O_CREATE|os.O_RDWR|os.O_EXCL, bd.Size)
248 if err == z.NewFile {
249 // Expected.
250 } else if err != nil {
251 return nil, y.Wrapf(err, "while creating table: %s", fname)
252 } else {
253 return nil, fmt.Errorf("file already exists: %s", fname)
254 }
255
256 written := bd.Copy(mf.Data)
257 y.AssertTrue(written == len(mf.Data))
258 if err := z.Msync(mf.Data); err != nil {
259 return nil, y.Wrapf(err, "while calling msync on %s", fname)
260 }
261 return OpenTable(mf, *builder.opts)
262 }
263
264 // OpenTable assumes file has only one table and opens it. Takes ownership of fd upon function
265 // entry. Returns a table with one reference count on it (decrementing which may delete the file!
266 // -- consider t.Close() instead). The fd has to writeable because we call Truncate on it before
267 // deleting. Checksum for all blocks of table is verified based on value of chkMode.
268 func OpenTable(mf *z.MmapFile, opts Options) (*Table, error) {
269 // BlockSize is used to compute the approximate size of the decompressed
270 // block. It should not be zero if the table is compressed.
271 if opts.BlockSize == 0 && opts.Compression != options.None {
272 return nil, errors.New("Block size cannot be zero")
273 }
274 fileInfo, err := mf.Fd.Stat()
275 if err != nil {
276 mf.Close(-1)
277 return nil, y.Wrap(err, "")
278 }
279
280 filename := fileInfo.Name()
281 id, ok := ParseFileID(filename)
282 if !ok {
283 mf.Close(-1)
284 return nil, fmt.Errorf("Invalid filename: %s", filename)
285 }
286 t := &Table{
287 MmapFile: mf,
288 id: id,
289 opt: &opts,
290 IsInmemory: false,
291 tableSize: int(fileInfo.Size()),
292 CreatedAt: fileInfo.ModTime(),
293 }
294 // Caller is given one reference.
295 t.ref.Store(1)
296
297 if err := t.initBiggestAndSmallest(); err != nil {
298 return nil, y.Wrapf(err, "failed to initialize table")
299 }
300
301 if opts.ChkMode == options.OnTableRead || opts.ChkMode == options.OnTableAndBlockRead {
302 if err := t.VerifyChecksum(); err != nil {
303 mf.Close(-1)
304 return nil, y.Wrapf(err, "failed to verify checksum")
305 }
306 }
307
308 return t, nil
309 }
310
311 // OpenInMemoryTable is similar to OpenTable but it opens a new table from the provided data.
312 // OpenInMemoryTable is used for L0 tables.
313 func OpenInMemoryTable(data []byte, id uint64, opt *Options) (*Table, error) {
314 mf := &z.MmapFile{
315 Data: data,
316 Fd: nil,
317 }
318 t := &Table{
319 MmapFile: mf,
320 opt: opt,
321 tableSize: len(data),
322 IsInmemory: true,
323 id: id, // It is important that each table gets a unique ID.
324 }
325 // Caller is given one reference.
326 t.ref.Store(1)
327
328 if err := t.initBiggestAndSmallest(); err != nil {
329 return nil, err
330 }
331 return t, nil
332 }
333
334 func (t *Table) initBiggestAndSmallest() error {
335 // This defer will help gathering debugging info incase initIndex crashes.
336 defer func() {
337 if r := recover(); r != nil {
338 // Use defer for printing info because there may be an intermediate panic.
339 var debugBuf bytes.Buffer
340 defer func() {
341 panic(fmt.Sprintf("%s\n== Recovered ==\n", debugBuf.String()))
342 }()
343
344 // Get the count of null bytes at the end of file. This is to make sure if there was an
345 // issue with mmap sync or file copy.
346 count := 0
347 for i := len(t.Data) - 1; i >= 0; i-- {
348 if t.Data[i] != 0 {
349 break
350 }
351 count++
352 }
353
354 fmt.Fprintf(&debugBuf, "\n== Recovering from initIndex crash ==\n")
355 fmt.Fprintf(&debugBuf, "File Info: [ID: %d, Size: %d, Zeros: %d]\n",
356 t.id, t.tableSize, count)
357
358 fmt.Fprintf(&debugBuf, "isEnrypted: %v ", t.shouldDecrypt())
359
360 readPos := t.tableSize
361
362 // Read checksum size.
363 readPos -= 4
364 buf := t.readNoFail(readPos, 4)
365 checksumLen := int(y.BytesToU32(buf))
366 fmt.Fprintf(&debugBuf, "checksumLen: %d ", checksumLen)
367
368 // Read checksum.
369 checksum := &pb.Checksum{}
370 readPos -= checksumLen
371 buf = t.readNoFail(readPos, checksumLen)
372 _ = proto.Unmarshal(buf, checksum)
373 fmt.Fprintf(&debugBuf, "checksum: %+v ", checksum)
374
375 // Read index size from the footer.
376 readPos -= 4
377 buf = t.readNoFail(readPos, 4)
378 indexLen := int(y.BytesToU32(buf))
379 fmt.Fprintf(&debugBuf, "indexLen: %d ", indexLen)
380
381 // Read index.
382 readPos -= t.indexLen
383 t.indexStart = readPos
384 indexData := t.readNoFail(readPos, t.indexLen)
385 fmt.Fprintf(&debugBuf, "index: %v ", indexData)
386 }
387 }()
388
389 var err error
390 var ko *fb.BlockOffset
391 if ko, err = t.initIndex(); err != nil {
392 return y.Wrapf(err, "failed to read index.")
393 }
394
395 t.smallest = y.Copy(ko.KeyBytes())
396
397 it2 := t.NewIterator(REVERSED | NOCACHE)
398 defer it2.Close()
399 it2.Rewind()
400 if !it2.Valid() {
401 return y.Wrapf(it2.err, "failed to initialize biggest for table %s", t.Filename())
402 }
403 t.biggest = y.Copy(it2.Key())
404 return nil
405 }
406
407 func (t *Table) read(off, sz int) ([]byte, error) {
408 return t.Bytes(off, sz)
409 }
410
411 func (t *Table) readNoFail(off, sz int) []byte {
412 res, err := t.read(off, sz)
413 y.Check(err)
414 return res
415 }
416
417 // initIndex reads the index and populate the necessary table fields and returns
418 // first block offset
419 func (t *Table) initIndex() (*fb.BlockOffset, error) {
420 readPos := t.tableSize
421
422 // Read checksum len from the last 4 bytes.
423 readPos -= 4
424 buf := t.readNoFail(readPos, 4)
425 checksumLen := int(y.BytesToU32(buf))
426 if checksumLen < 0 {
427 return nil, errors.New("checksum length less than zero. Data corrupted")
428 }
429
430 // Read checksum.
431 expectedChk := &pb.Checksum{}
432 readPos -= checksumLen
433 buf = t.readNoFail(readPos, checksumLen)
434 if err := proto.Unmarshal(buf, expectedChk); err != nil {
435 return nil, err
436 }
437
438 // Read index size from the footer.
439 readPos -= 4
440 buf = t.readNoFail(readPos, 4)
441 t.indexLen = int(y.BytesToU32(buf))
442
443 // Read index.
444 readPos -= t.indexLen
445 t.indexStart = readPos
446 data := t.readNoFail(readPos, t.indexLen)
447
448 if err := y.VerifyChecksum(data, expectedChk); err != nil {
449 return nil, y.Wrapf(err, "failed to verify checksum for table: %s", t.Filename())
450 }
451
452 index, err := t.readTableIndex()
453 if err != nil {
454 return nil, err
455 }
456 if !t.shouldDecrypt() {
457 // If there's no encryption, this points to the mmap'ed buffer.
458 t._index = index
459 }
460 t._cheap = &cheapIndex{
461 MaxVersion: index.MaxVersion(),
462 KeyCount: index.KeyCount(),
463 UncompressedSize: index.UncompressedSize(),
464 OnDiskSize: index.OnDiskSize(),
465 OffsetsLength: index.OffsetsLength(),
466 BloomFilterLength: index.BloomFilterLength(),
467 }
468
469 t.hasBloomFilter = len(index.BloomFilterBytes()) > 0
470
471 var bo fb.BlockOffset
472 y.AssertTrue(index.Offsets(&bo, 0))
473 return &bo, nil
474 }
475
476 // KeySplits splits the table into at least n ranges based on the block offsets.
477 func (t *Table) KeySplits(n int, prefix []byte) []string {
478 if n == 0 {
479 return nil
480 }
481
482 oLen := t.offsetsLength()
483 jump := oLen / n
484 if jump == 0 {
485 jump = 1
486 }
487
488 var bo fb.BlockOffset
489 var res []string
490 for i := 0; i < oLen; i += jump {
491 if i >= oLen {
492 i = oLen - 1
493 }
494 y.AssertTrue(t.offsets(&bo, i))
495 if bytes.HasPrefix(bo.KeyBytes(), prefix) {
496 res = append(res, string(bo.KeyBytes()))
497 }
498 }
499 return res
500 }
501
502 func (t *Table) fetchIndex() *fb.TableIndex {
503 if !t.shouldDecrypt() {
504 return t._index
505 }
506
507 if t.opt.IndexCache == nil {
508 panic("Index Cache must be set for encrypted workloads")
509 }
510 if val, ok := t.opt.IndexCache.Get(t.indexKey()); ok && val != nil {
511 return val
512 }
513
514 index, err := t.readTableIndex()
515 y.Check(err)
516 t.opt.IndexCache.Set(t.indexKey(), index, int64(t.indexLen))
517 return index
518 }
519
520 func (t *Table) offsets(ko *fb.BlockOffset, i int) bool {
521 return t.fetchIndex().Offsets(ko, i)
522 }
523
524 // block function return a new block. Each block holds a ref and the byte
525 // slice stored in the block will be reused when the ref becomes zero. The
526 // caller should release the block by calling block.decrRef() on it.
527 func (t *Table) block(idx int, useCache bool) (*Block, error) {
528 y.AssertTruef(idx >= 0, "idx=%d", idx)
529 if idx >= t.offsetsLength() {
530 return nil, errors.New("block out of index")
531 }
532 if t.opt.BlockCache != nil {
533 key := t.blockCacheKey(idx)
534 blk, ok := t.opt.BlockCache.Get(key)
535 if ok && blk != nil {
536 // Use the block only if the increment was successful. The block
537 // could get evicted from the cache between the Get() call and the
538 // incrRef() call.
539 if blk.incrRef() {
540 return blk, nil
541 }
542 }
543 }
544
545 var ko fb.BlockOffset
546 y.AssertTrue(t.offsets(&ko, idx))
547 blk := &Block{offset: int(ko.Offset())}
548 blk.ref.Store(1)
549 defer blk.decrRef() // Deal with any errors, where blk would not be returned.
550 NumBlocks.Add(1)
551
552 var err error
553 if blk.data, err = t.read(blk.offset, int(ko.Len())); err != nil {
554 return nil, y.Wrapf(err,
555 "failed to read from file: %s at offset: %d, len: %d",
556 t.Fd.Name(), blk.offset, ko.Len())
557 }
558
559 if t.shouldDecrypt() {
560 // Decrypt the block if it is encrypted.
561 if blk.data, err = t.decrypt(blk.data, true); err != nil {
562 return nil, err
563 }
564 // blk.data is allocated via Calloc. So, do free.
565 blk.freeMe = true
566 }
567
568 if err = t.decompress(blk); err != nil {
569 return nil, y.Wrapf(err,
570 "failed to decode compressed data in file: %s at offset: %d, len: %d",
571 t.Fd.Name(), blk.offset, ko.Len())
572 }
573
574 // Read meta data related to block.
575 readPos := len(blk.data) - 4 // First read checksum length.
576 blk.chkLen = int(y.BytesToU32(blk.data[readPos : readPos+4]))
577
578 // Checksum length greater than block size could happen if the table was compressed and
579 // it was opened with an incorrect compression algorithm (or the data was corrupted).
580 if blk.chkLen > len(blk.data) {
581 return nil, errors.New("invalid checksum length. Either the data is " +
582 "corrupted or the table options are incorrectly set")
583 }
584
585 // Read checksum and store it
586 readPos -= blk.chkLen
587 blk.checksum = blk.data[readPos : readPos+blk.chkLen]
588 // Move back and read numEntries in the block.
589 readPos -= 4
590 numEntries := int(y.BytesToU32(blk.data[readPos : readPos+4]))
591 entriesIndexStart := readPos - (numEntries * 4)
592 entriesIndexEnd := entriesIndexStart + numEntries*4
593
594 blk.entryOffsets = y.BytesToU32Slice(blk.data[entriesIndexStart:entriesIndexEnd])
595
596 blk.entriesIndexStart = entriesIndexStart
597
598 // Drop checksum and checksum length.
599 // The checksum is calculated for actual data + entry index + index length
600 blk.data = blk.data[:readPos+4]
601
602 // Verify checksum on if checksum verification mode is OnRead on OnStartAndRead.
603 if t.opt.ChkMode == options.OnBlockRead || t.opt.ChkMode == options.OnTableAndBlockRead {
604 if err = blk.verifyCheckSum(); err != nil {
605 return nil, err
606 }
607 }
608
609 blk.incrRef()
610 if useCache && t.opt.BlockCache != nil {
611 key := t.blockCacheKey(idx)
612 // incrRef should never return false here because we're calling it on a
613 // new block with ref=1.
614 y.AssertTrue(blk.incrRef())
615
616 // Decrement the block ref if we could not insert it in the cache.
617 if !t.opt.BlockCache.Set(key, blk, blk.size()) {
618 blk.decrRef()
619 }
620 // We have added an OnReject func in our cache, which gets called in case the block is not
621 // admitted to the cache. So, every block would be accounted for.
622 }
623 return blk, nil
624 }
625
626 // blockCacheKey is used to store blocks in the block cache.
627 func (t *Table) blockCacheKey(idx int) []byte {
628 y.AssertTrue(t.id < math.MaxUint32)
629 y.AssertTrue(uint32(idx) < math.MaxUint32)
630
631 buf := make([]byte, 8)
632 // Assume t.ID does not overflow uint32.
633 binary.BigEndian.PutUint32(buf[:4], uint32(t.ID()))
634 binary.BigEndian.PutUint32(buf[4:], uint32(idx))
635 return buf
636 }
637
638 // indexKey returns the cache key for block offsets. blockOffsets
639 // are stored in the index cache.
640 func (t *Table) indexKey() uint64 {
641 return t.id
642 }
643
644 // IndexSize is the size of table index in bytes.
645 func (t *Table) IndexSize() int {
646 return t.indexLen
647 }
648
649 // Size is its file size in bytes
650 func (t *Table) Size() int64 { return int64(t.tableSize) }
651
652 // StaleDataSize is the amount of stale data (that can be dropped by a compaction )in this SST.
653 func (t *Table) StaleDataSize() uint32 { return t.fetchIndex().StaleDataSize() }
654
655 // Smallest is its smallest key, or nil if there are none
656 func (t *Table) Smallest() []byte { return t.smallest }
657
658 // Biggest is its biggest key, or nil if there are none
659 func (t *Table) Biggest() []byte { return t.biggest }
660
661 // Filename is NOT the file name. Just kidding, it is.
662 func (t *Table) Filename() string { return t.Fd.Name() }
663
664 // ID is the table's ID number (used to make the file name).
665 func (t *Table) ID() uint64 { return t.id }
666
667 // DoesNotHave returns true if and only if the table does not have the key hash.
668 // It does a bloom filter lookup.
669 func (t *Table) DoesNotHave(hash uint32) bool {
670 if !t.hasBloomFilter {
671 return false
672 }
673
674 y.NumLSMBloomHitsAdd(t.opt.MetricsEnabled, "DoesNotHave_ALL", 1)
675 index := t.fetchIndex()
676 bf := index.BloomFilterBytes()
677 mayContain := y.Filter(bf).MayContain(hash)
678 if !mayContain {
679 y.NumLSMBloomHitsAdd(t.opt.MetricsEnabled, "DoesNotHave_HIT", 1)
680 }
681 return !mayContain
682 }
683
684 // readTableIndex reads table index from the sst and returns its pb format.
685 func (t *Table) readTableIndex() (*fb.TableIndex, error) {
686 data := t.readNoFail(t.indexStart, t.indexLen)
687 var err error
688 // Decrypt the table index if it is encrypted.
689 if t.shouldDecrypt() {
690 if data, err = t.decrypt(data, false); err != nil {
691 return nil, y.Wrapf(err,
692 "Error while decrypting table index for the table %d in readTableIndex", t.id)
693 }
694 }
695 return fb.GetRootAsTableIndex(data, 0), nil
696 }
697
698 // VerifyChecksum verifies checksum for all blocks of table. This function is called by
699 // OpenTable() function. This function is also called inside levelsController.VerifyChecksum().
700 func (t *Table) VerifyChecksum() error {
701 ti := t.fetchIndex()
702 for i := 0; i < ti.OffsetsLength(); i++ {
703 b, err := t.block(i, true)
704 if err != nil {
705 return y.Wrapf(err, "checksum validation failed for table: %s, block: %d, offset:%d",
706 t.Filename(), i, b.offset)
707 }
708 // We should not call incrRef here, because the block already has one ref when created.
709 defer b.decrRef()
710 // OnBlockRead or OnTableAndBlockRead, we don't need to call verify checksum
711 // on block, verification would be done while reading block itself.
712 if !(t.opt.ChkMode == options.OnBlockRead || t.opt.ChkMode == options.OnTableAndBlockRead) {
713 if err = b.verifyCheckSum(); err != nil {
714 return y.Wrapf(err,
715 "checksum validation failed for table: %s, block: %d, offset:%d",
716 t.Filename(), i, b.offset)
717 }
718 }
719 }
720 return nil
721 }
722
723 // shouldDecrypt tells whether to decrypt or not. We decrypt only if the datakey exist
724 // for the table.
725 func (t *Table) shouldDecrypt() bool {
726 return t.opt.DataKey != nil
727 }
728
729 // KeyID returns data key id.
730 func (t *Table) KeyID() uint64 {
731 if t.opt.DataKey != nil {
732 return t.opt.DataKey.KeyId
733 }
734 // By default it's 0, if it is plain text.
735 return 0
736 }
737
738 // decrypt decrypts the given data. It should be called only after checking shouldDecrypt.
739 func (t *Table) decrypt(data []byte, viaCalloc bool) ([]byte, error) {
740 // Last BlockSize bytes of the data is the IV.
741 iv := data[len(data)-aes.BlockSize:]
742 // Rest all bytes are data.
743 data = data[:len(data)-aes.BlockSize]
744
745 var dst []byte
746 if viaCalloc {
747 dst = z.Calloc(len(data), "Table.Decrypt")
748 } else {
749 dst = make([]byte, len(data))
750 }
751 if err := y.XORBlock(dst, data, t.opt.DataKey.Data, iv); err != nil {
752 return nil, y.Wrapf(err, "while decrypt")
753 }
754 return dst, nil
755 }
756
757 // ParseFileID reads the file id out of a filename.
758 func ParseFileID(name string) (uint64, bool) {
759 name = filepath.Base(name)
760 if !strings.HasSuffix(name, fileSuffix) {
761 return 0, false
762 }
763 // suffix := name[len(fileSuffix):]
764 name = strings.TrimSuffix(name, fileSuffix)
765 id, err := strconv.Atoi(name)
766 if err != nil {
767 return 0, false
768 }
769 y.AssertTrue(id >= 0)
770 return uint64(id), true
771 }
772
773 // IDToFilename does the inverse of ParseFileID
774 func IDToFilename(id uint64) string {
775 return fmt.Sprintf("%06d", id) + fileSuffix
776 }
777
778 // NewFilename should be named TableFilepath -- it combines the dir with the ID to make a table
779 // filepath.
780 func NewFilename(id uint64, dir string) string {
781 return filepath.Join(dir, IDToFilename(id))
782 }
783
784 // decompress decompresses the data stored in a block.
785 func (t *Table) decompress(b *Block) error {
786 var dst []byte
787 var err error
788
789 // Point to the original b.data
790 src := b.data
791
792 switch t.opt.Compression {
793 case options.None:
794 // Nothing to be done here.
795 return nil
796 case options.Snappy:
797 if sz, err := snappy.DecodedLen(b.data); err == nil {
798 dst = z.Calloc(sz, "Table.Decompress")
799 } else {
800 dst = z.Calloc(len(b.data)*4, "Table.Decompress") // Take a guess.
801 }
802 b.data, err = snappy.Decode(dst, b.data)
803 if err != nil {
804 z.Free(dst)
805 return y.Wrap(err, "failed to decompress")
806 }
807 case options.ZSTD:
808 sz := int(float64(t.opt.BlockSize) * 1.2)
809 // Get frame content size from header.
810 var hdr zstd.Header
811 if err := hdr.Decode(b.data); err == nil && hdr.HasFCS && hdr.FrameContentSize < uint64(t.opt.BlockSize*2) {
812 sz = int(hdr.FrameContentSize)
813 }
814 dst = z.Calloc(sz, "Table.Decompress")
815 b.data, err = y.ZSTDDecompress(dst, b.data)
816 if err != nil {
817 z.Free(dst)
818 return y.Wrap(err, "failed to decompress")
819 }
820 default:
821 return errors.New("Unsupported compression type")
822 }
823
824 if b.freeMe {
825 z.Free(src)
826 b.freeMe = false
827 }
828
829 if len(b.data) > 0 && len(dst) > 0 && &dst[0] != &b.data[0] {
830 z.Free(dst)
831 } else {
832 b.freeMe = true
833 }
834 return nil
835 }
836