table.go raw

   1  /*
   2   * SPDX-FileCopyrightText: © Hypermode Inc. <hello@hypermode.com>
   3   * SPDX-License-Identifier: Apache-2.0
   4   */
   5  
   6  package table
   7  
   8  import (
   9  	"bytes"
  10  	"crypto/aes"
  11  	"encoding/binary"
  12  	"errors"
  13  	"fmt"
  14  	"math"
  15  	"os"
  16  	"path/filepath"
  17  	"strconv"
  18  	"strings"
  19  	"sync"
  20  	"sync/atomic"
  21  	"time"
  22  	"unsafe"
  23  
  24  	"github.com/klauspost/compress/snappy"
  25  	"github.com/klauspost/compress/zstd"
  26  	"google.golang.org/protobuf/proto"
  27  
  28  	"github.com/dgraph-io/badger/v4/fb"
  29  	"github.com/dgraph-io/badger/v4/options"
  30  	"github.com/dgraph-io/badger/v4/pb"
  31  	"github.com/dgraph-io/badger/v4/y"
  32  	"github.com/dgraph-io/ristretto/v2"
  33  	"github.com/dgraph-io/ristretto/v2/z"
  34  )
  35  
// fileSuffix is the extension ParseFileID requires and IDToFilename appends.
const fileSuffix = ".sst"

// intSize is the size in bytes of an int on the current platform; Block.size uses it.
const intSize = int(unsafe.Sizeof(int(0)))
  38  
// Options contains configurable options for Table/Builder.
type Options struct {
	// Options for Opening/Building Table.

	// ReadOnly opens tables in read only mode.
	// MetricsEnabled is forwarded to the bloom-filter hit counters in DoesNotHave.
	ReadOnly       bool
	MetricsEnabled bool

	// Maximum size of the table.
	TableSize     uint64
	tableCapacity uint64 // 0.9x TableSize.

	// ChkMode is the checksum verification mode for Table. OpenTable verifies the whole table
	// for OnTableRead/OnTableAndBlockRead; Table.block verifies each block for
	// OnBlockRead/OnTableAndBlockRead.
	ChkMode options.ChecksumVerificationMode

	// Options for Table builder.

	// BloomFalsePositive is the false positive probability of bloom filter.
	BloomFalsePositive float64

	// BlockSize is the size of each block inside SSTable in bytes. OpenTable rejects a zero
	// value when compression is enabled, and decompress uses it to size the ZSTD output buffer.
	BlockSize int

	// DataKey is the key used to decrypt the encrypted text. A nil DataKey means the table is
	// treated as plain text (see shouldDecrypt).
	DataKey *pb.DataKey

	// Compression indicates the compression algorithm used for block compression.
	Compression options.CompressionType

	// Block cache is used to cache decompressed and decrypted blocks.
	// IndexCache holds decrypted table indexes keyed by table id; fetchIndex panics if it is
	// nil while DataKey is set.
	BlockCache *ristretto.Cache[[]byte, *Block]
	IndexCache *ristretto.Cache[uint64, *fb.TableIndex]

	// AllocPool is not referenced in this file.
	// NOTE(review): presumably consumed by the Builder — confirm.
	AllocPool *z.AllocatorPool

	// ZSTDCompressionLevel is the ZSTD compression level used for compressing blocks.
	ZSTDCompressionLevel int
}
  77  
// TableInterface is useful for testing. It lists the key-range, bloom-filter and version
// accessors that *Table provides.
type TableInterface interface {
	// Smallest returns the smallest key in the table.
	Smallest() []byte
	// Biggest returns the biggest key in the table.
	Biggest() []byte
	// DoesNotHave reports whether the key hash is definitely absent from the table.
	DoesNotHave(hash uint32) bool
	// MaxVersion returns the maximum version across all keys in the table.
	MaxVersion() uint64
}
  85  
// Table represents a loaded table file with the info we have about it.
//
// It embeds sync.Mutex and *z.MmapFile, so the table bytes are reachable as t.Data and the
// file handle as t.Fd (Fd is nil for tables created by OpenInMemoryTable). Lifetime is managed
// through IncrRef/DecrRef.
type Table struct {
	sync.Mutex
	*z.MmapFile

	tableSize int // Initialized in OpenTable, using fd.Stat(); len(data) in OpenInMemoryTable.

	_index *fb.TableIndex // Nil if encryption is enabled. Use fetchIndex to access.
	_cheap *cheapIndex    // Scalar index fields copied by initIndex. Use cheapIndex() to access.
	ref    atomic.Int32   // For file garbage collection

	// The following are initialized once and const.
	smallest, biggest []byte // Smallest and largest keys (with timestamps).
	id                uint64 // file id, part of filename

	// Checksum is not assigned anywhere in this file.
	// NOTE(review): confirm where (or whether) it is populated.
	Checksum []byte
	// CreatedAt is the file's modification time as seen by OpenTable; it stays zero for
	// in-memory tables.
	CreatedAt time.Time
	// indexStart and indexLen locate the serialized index inside the table bytes; both are set
	// by initIndex.
	indexStart int
	indexLen   int
	// hasBloomFilter is set by initIndex when the index carries a non-empty bloom filter.
	hasBloomFilter bool

	IsInmemory bool // Set to true if the table is on level 0 and opened in memory.
	opt        *Options
}
 110  
// cheapIndex holds scalar values copied out of the table index by initIndex, so that they can
// be read without going through fetchIndex.
type cheapIndex struct {
	MaxVersion        uint64 // index.MaxVersion()
	KeyCount          uint32 // index.KeyCount()
	UncompressedSize  uint32 // index.UncompressedSize()
	OnDiskSize        uint32 // index.OnDiskSize()
	BloomFilterLength int    // index.BloomFilterLength()
	OffsetsLength     int    // index.OffsetsLength(), i.e. the number of blocks
}
 119  
// cheapIndex returns the scalar index summary stored in t._cheap by initIndex.
func (t *Table) cheapIndex() *cheapIndex {
	return t._cheap
}
// offsetsLength returns the number of block offsets recorded in the table index.
func (t *Table) offsetsLength() int { return t.cheapIndex().OffsetsLength }
 124  
// MaxVersion returns the maximum version across all keys stored in this table, as recorded in
// the table index when initIndex ran.
func (t *Table) MaxVersion() uint64 { return t.cheapIndex().MaxVersion }
 127  
// BloomFilterSize returns the size of the bloom filter in bytes, as reported by the table
// index (BloomFilterLength).
func (t *Table) BloomFilterSize() int { return t.cheapIndex().BloomFilterLength }
 130  
// UncompressedSize is the size of the uncompressed data stored in this file, as recorded in
// the table index.
func (t *Table) UncompressedSize() uint32 { return t.cheapIndex().UncompressedSize }
 133  
// KeyCount is the total number of keys in this table, as recorded in the table index.
func (t *Table) KeyCount() uint32 { return t.cheapIndex().KeyCount }
 136  
// OnDiskSize returns the total size of key-values stored in this table (including the
// disk space occupied on the value log). The value is read from the table index.
func (t *Table) OnDiskSize() uint32 { return t.cheapIndex().OnDiskSize }
 140  
// CompressionType returns the compression algorithm used for block compression. It reports
// the value from the Options the table was opened with.
func (t *Table) CompressionType() options.CompressionType {
	return t.opt.Compression
}
 145  
// IncrRef increments the refcount (having to do with whether the file should be deleted).
// Each call should be paired with a later DecrRef.
func (t *Table) IncrRef() {
	t.ref.Add(1)
}
 150  
 151  // DecrRef decrements the refcount and possibly deletes the table
 152  func (t *Table) DecrRef() error {
 153  	newRef := t.ref.Add(-1)
 154  	if newRef == 0 {
 155  		// We can safely delete this file, because for all the current files, we always have
 156  		// at least one reference pointing to them.
 157  
 158  		// Delete all blocks from the cache.
 159  		for i := 0; i < t.offsetsLength(); i++ {
 160  			t.opt.BlockCache.Del(t.blockCacheKey(i))
 161  		}
 162  		if err := t.Delete(); err != nil {
 163  			return err
 164  		}
 165  	}
 166  	return nil
 167  }
 168  
// BlockEvictHandler is used to reuse the byte slice stored in the block on cache eviction.
// It releases one reference on b; decrRef tolerates a nil block.
func BlockEvictHandler(b *Block) {
	b.decrRef()
}
 173  
// Block is one decoded data block of a table, as produced by Table.block. It is reference
// counted through incrRef/decrRef.
type Block struct {
	offset            int      // offset of the block in the table, taken from the table index.
	data              []byte   // block contents up to and including the entry count (checksum stripped).
	checksum          []byte   // serialized pb.Checksum for data; a sub-slice of the block bytes.
	entriesIndexStart int      // start index of entryOffsets list
	entryOffsets      []uint32 // used to binary search an entry in the block.
	chkLen            int      // checksum length.
	freeMe            bool     // used to determine if data must be released with z.Free.
	ref               atomic.Int32
}
 184  
// NumBlocks counts live blocks: Table.block increments it for every block it creates and
// Block.decrRef decrements it when a block's reference count reaches zero.
var NumBlocks atomic.Int32
 186  
 187  // incrRef increments the ref of a block and return a bool indicating if the
 188  // increment was successful. A true value indicates that the block can be used.
 189  func (b *Block) incrRef() bool {
 190  	for {
 191  		// We can't blindly add 1 to ref. We need to check whether it has
 192  		// reached zero first, because if it did, then we should absolutely not
 193  		// use this block.
 194  		ref := b.ref.Load()
 195  		// The ref would not be equal to 0 unless the existing
 196  		// block get evicted before this line. If the ref is zero, it means that
 197  		// the block is already added the the blockPool and cannot be used
 198  		// anymore. The ref of a new block is 1 so the following condition will
 199  		// be true only if the block got reused before we could increment its
 200  		// ref.
 201  		if ref == 0 {
 202  			return false
 203  		}
 204  		// Increment the ref only if it is not zero and has not changed between
 205  		// the time we read it and we're updating it.
 206  		//
 207  		if b.ref.CompareAndSwap(ref, ref+1) {
 208  			return true
 209  		}
 210  	}
 211  }
 212  func (b *Block) decrRef() {
 213  	if b == nil {
 214  		return
 215  	}
 216  
 217  	// Insert the []byte into pool only if the block is resuable. When a block
 218  	// is reusable a new []byte is used for decompression and this []byte can
 219  	// be reused.
 220  	// In case of an uncompressed block, the []byte is a reference to the
 221  	// table.mmap []byte slice. Any attempt to write data to the mmap []byte
 222  	// will lead to SEGFAULT.
 223  	if b.ref.Add(-1) == 0 {
 224  		if b.freeMe {
 225  			z.Free(b.data)
 226  		}
 227  		NumBlocks.Add(-1)
 228  		// blockPool.Put(&b.data)
 229  	}
 230  	y.AssertTrue(b.ref.Load() >= 0)
 231  }
 232  func (b *Block) size() int64 {
 233  	return int64(3*intSize /* Size of the offset, entriesIndexStart and chkLen */ +
 234  		cap(b.data) + cap(b.checksum) + cap(b.entryOffsets)*4)
 235  }
 236  
 237  func (b *Block) verifyCheckSum() error {
 238  	cs := &pb.Checksum{}
 239  	if err := proto.Unmarshal(b.checksum, cs); err != nil {
 240  		return y.Wrapf(err, "unable to unmarshal checksum for block")
 241  	}
 242  	return y.VerifyChecksum(b.data, cs)
 243  }
 244  
 245  func CreateTable(fname string, builder *Builder) (*Table, error) {
 246  	bd := builder.Done()
 247  	mf, err := z.OpenMmapFile(fname, os.O_CREATE|os.O_RDWR|os.O_EXCL, bd.Size)
 248  	if err == z.NewFile {
 249  		// Expected.
 250  	} else if err != nil {
 251  		return nil, y.Wrapf(err, "while creating table: %s", fname)
 252  	} else {
 253  		return nil, fmt.Errorf("file already exists: %s", fname)
 254  	}
 255  
 256  	written := bd.Copy(mf.Data)
 257  	y.AssertTrue(written == len(mf.Data))
 258  	if err := z.Msync(mf.Data); err != nil {
 259  		return nil, y.Wrapf(err, "while calling msync on %s", fname)
 260  	}
 261  	return OpenTable(mf, *builder.opts)
 262  }
 263  
 264  // OpenTable assumes file has only one table and opens it. Takes ownership of fd upon function
 265  // entry. Returns a table with one reference count on it (decrementing which may delete the file!
 266  // -- consider t.Close() instead). The fd has to writeable because we call Truncate on it before
 267  // deleting. Checksum for all blocks of table is verified based on value of chkMode.
 268  func OpenTable(mf *z.MmapFile, opts Options) (*Table, error) {
 269  	// BlockSize is used to compute the approximate size of the decompressed
 270  	// block. It should not be zero if the table is compressed.
 271  	if opts.BlockSize == 0 && opts.Compression != options.None {
 272  		return nil, errors.New("Block size cannot be zero")
 273  	}
 274  	fileInfo, err := mf.Fd.Stat()
 275  	if err != nil {
 276  		mf.Close(-1)
 277  		return nil, y.Wrap(err, "")
 278  	}
 279  
 280  	filename := fileInfo.Name()
 281  	id, ok := ParseFileID(filename)
 282  	if !ok {
 283  		mf.Close(-1)
 284  		return nil, fmt.Errorf("Invalid filename: %s", filename)
 285  	}
 286  	t := &Table{
 287  		MmapFile:   mf,
 288  		id:         id,
 289  		opt:        &opts,
 290  		IsInmemory: false,
 291  		tableSize:  int(fileInfo.Size()),
 292  		CreatedAt:  fileInfo.ModTime(),
 293  	}
 294  	// Caller is given one reference.
 295  	t.ref.Store(1)
 296  
 297  	if err := t.initBiggestAndSmallest(); err != nil {
 298  		return nil, y.Wrapf(err, "failed to initialize table")
 299  	}
 300  
 301  	if opts.ChkMode == options.OnTableRead || opts.ChkMode == options.OnTableAndBlockRead {
 302  		if err := t.VerifyChecksum(); err != nil {
 303  			mf.Close(-1)
 304  			return nil, y.Wrapf(err, "failed to verify checksum")
 305  		}
 306  	}
 307  
 308  	return t, nil
 309  }
 310  
 311  // OpenInMemoryTable is similar to OpenTable but it opens a new table from the provided data.
 312  // OpenInMemoryTable is used for L0 tables.
 313  func OpenInMemoryTable(data []byte, id uint64, opt *Options) (*Table, error) {
 314  	mf := &z.MmapFile{
 315  		Data: data,
 316  		Fd:   nil,
 317  	}
 318  	t := &Table{
 319  		MmapFile:   mf,
 320  		opt:        opt,
 321  		tableSize:  len(data),
 322  		IsInmemory: true,
 323  		id:         id, // It is important that each table gets a unique ID.
 324  	}
 325  	// Caller is given one reference.
 326  	t.ref.Store(1)
 327  
 328  	if err := t.initBiggestAndSmallest(); err != nil {
 329  		return nil, err
 330  	}
 331  	return t, nil
 332  }
 333  
// initBiggestAndSmallest loads the table index via initIndex and fills in t.smallest and
// t.biggest. The smallest key is the key of the first block offset in the index; the biggest
// key is found by rewinding a reversed, non-caching iterator.
//
// If anything in the function panics, the deferred handler collects a debug dump of the file
// footer (size, trailing zero count, checksum, index length, raw index bytes) and re-panics
// with that dump as the message.
func (t *Table) initBiggestAndSmallest() error {
	// This defer will help gathering debugging info in case initIndex crashes.
	defer func() {
		if r := recover(); r != nil {
			// Use defer for printing info because there may be an intermediate panic.
			// Whatever has been written to debugBuf by then is included in the re-panic.
			var debugBuf bytes.Buffer
			defer func() {
				panic(fmt.Sprintf("%s\n== Recovered ==\n", debugBuf.String()))
			}()

			// Get the count of null bytes at the end of file. This is to make sure if there was an
			// issue with mmap sync or file copy.
			count := 0
			for i := len(t.Data) - 1; i >= 0; i-- {
				if t.Data[i] != 0 {
					break
				}
				count++
			}

			fmt.Fprintf(&debugBuf, "\n== Recovering from initIndex crash ==\n")
			fmt.Fprintf(&debugBuf, "File Info: [ID: %d, Size: %d, Zeros: %d]\n",
				t.id, t.tableSize, count)

			fmt.Fprintf(&debugBuf, "isEnrypted: %v ", t.shouldDecrypt())

			// Walk the footer backwards from the end of the file, mirroring initIndex.
			readPos := t.tableSize

			// Read checksum size.
			readPos -= 4
			buf := t.readNoFail(readPos, 4)
			checksumLen := int(y.BytesToU32(buf))
			fmt.Fprintf(&debugBuf, "checksumLen: %d ", checksumLen)

			// Read checksum.
			checksum := &pb.Checksum{}
			readPos -= checksumLen
			buf = t.readNoFail(readPos, checksumLen)
			// Best effort: an unmarshal failure just yields an empty checksum in the dump.
			_ = proto.Unmarshal(buf, checksum)
			fmt.Fprintf(&debugBuf, "checksum: %+v ", checksum)

			// Read index size from the footer.
			readPos -= 4
			buf = t.readNoFail(readPos, 4)
			indexLen := int(y.BytesToU32(buf))
			fmt.Fprintf(&debugBuf, "indexLen: %d ", indexLen)

			// Read index.
			// NOTE(review): this uses t.indexLen (whatever initIndex managed to store before
			// the panic, possibly 0) rather than the indexLen just read above — confirm that
			// is intended.
			readPos -= t.indexLen
			t.indexStart = readPos
			indexData := t.readNoFail(readPos, t.indexLen)
			fmt.Fprintf(&debugBuf, "index: %v ", indexData)
		}
	}()

	var err error
	var ko *fb.BlockOffset
	if ko, err = t.initIndex(); err != nil {
		return y.Wrapf(err, "failed to read index.")
	}

	// ko is the first block offset; its key is the smallest key in the table. Copy it so it
	// does not alias the index buffer.
	t.smallest = y.Copy(ko.KeyBytes())

	// The first entry of a reversed iterator is the biggest key.
	it2 := t.NewIterator(REVERSED | NOCACHE)
	defer it2.Close()
	it2.Rewind()
	if !it2.Valid() {
		return y.Wrapf(it2.err, "failed to initialize biggest for table %s", t.Filename())
	}
	t.biggest = y.Copy(it2.Key())
	return nil
}
 406  
// read returns sz bytes of the table starting at offset off, by delegating to t.Bytes.
func (t *Table) read(off, sz int) ([]byte, error) {
	return t.Bytes(off, sz)
}
 410  
// readNoFail is like read but does not return an error: any error from read is handed to
// y.Check instead (its failure behaviour is defined in package y).
func (t *Table) readNoFail(off, sz int) []byte {
	res, err := t.read(off, sz)
	y.Check(err)
	return res
}
 416  
 417  // initIndex reads the index and populate the necessary table fields and returns
 418  // first block offset
 419  func (t *Table) initIndex() (*fb.BlockOffset, error) {
 420  	readPos := t.tableSize
 421  
 422  	// Read checksum len from the last 4 bytes.
 423  	readPos -= 4
 424  	buf := t.readNoFail(readPos, 4)
 425  	checksumLen := int(y.BytesToU32(buf))
 426  	if checksumLen < 0 {
 427  		return nil, errors.New("checksum length less than zero. Data corrupted")
 428  	}
 429  
 430  	// Read checksum.
 431  	expectedChk := &pb.Checksum{}
 432  	readPos -= checksumLen
 433  	buf = t.readNoFail(readPos, checksumLen)
 434  	if err := proto.Unmarshal(buf, expectedChk); err != nil {
 435  		return nil, err
 436  	}
 437  
 438  	// Read index size from the footer.
 439  	readPos -= 4
 440  	buf = t.readNoFail(readPos, 4)
 441  	t.indexLen = int(y.BytesToU32(buf))
 442  
 443  	// Read index.
 444  	readPos -= t.indexLen
 445  	t.indexStart = readPos
 446  	data := t.readNoFail(readPos, t.indexLen)
 447  
 448  	if err := y.VerifyChecksum(data, expectedChk); err != nil {
 449  		return nil, y.Wrapf(err, "failed to verify checksum for table: %s", t.Filename())
 450  	}
 451  
 452  	index, err := t.readTableIndex()
 453  	if err != nil {
 454  		return nil, err
 455  	}
 456  	if !t.shouldDecrypt() {
 457  		// If there's no encryption, this points to the mmap'ed buffer.
 458  		t._index = index
 459  	}
 460  	t._cheap = &cheapIndex{
 461  		MaxVersion:        index.MaxVersion(),
 462  		KeyCount:          index.KeyCount(),
 463  		UncompressedSize:  index.UncompressedSize(),
 464  		OnDiskSize:        index.OnDiskSize(),
 465  		OffsetsLength:     index.OffsetsLength(),
 466  		BloomFilterLength: index.BloomFilterLength(),
 467  	}
 468  
 469  	t.hasBloomFilter = len(index.BloomFilterBytes()) > 0
 470  
 471  	var bo fb.BlockOffset
 472  	y.AssertTrue(index.Offsets(&bo, 0))
 473  	return &bo, nil
 474  }
 475  
 476  // KeySplits splits the table into at least n ranges based on the block offsets.
 477  func (t *Table) KeySplits(n int, prefix []byte) []string {
 478  	if n == 0 {
 479  		return nil
 480  	}
 481  
 482  	oLen := t.offsetsLength()
 483  	jump := oLen / n
 484  	if jump == 0 {
 485  		jump = 1
 486  	}
 487  
 488  	var bo fb.BlockOffset
 489  	var res []string
 490  	for i := 0; i < oLen; i += jump {
 491  		if i >= oLen {
 492  			i = oLen - 1
 493  		}
 494  		y.AssertTrue(t.offsets(&bo, i))
 495  		if bytes.HasPrefix(bo.KeyBytes(), prefix) {
 496  			res = append(res, string(bo.KeyBytes()))
 497  		}
 498  	}
 499  	return res
 500  }
 501  
 502  func (t *Table) fetchIndex() *fb.TableIndex {
 503  	if !t.shouldDecrypt() {
 504  		return t._index
 505  	}
 506  
 507  	if t.opt.IndexCache == nil {
 508  		panic("Index Cache must be set for encrypted workloads")
 509  	}
 510  	if val, ok := t.opt.IndexCache.Get(t.indexKey()); ok && val != nil {
 511  		return val
 512  	}
 513  
 514  	index, err := t.readTableIndex()
 515  	y.Check(err)
 516  	t.opt.IndexCache.Set(t.indexKey(), index, int64(t.indexLen))
 517  	return index
 518  }
 519  
// offsets loads the i-th block offset of the table index into ko and returns the result of
// the index's Offsets call. The index is obtained through fetchIndex.
func (t *Table) offsets(ko *fb.BlockOffset, i int) bool {
	return t.fetchIndex().Offsets(ko, i)
}
 523  
// block function return a new block. Each block holds a ref and the byte
// slice stored in the block will be reused when the ref becomes zero. The
// caller should release the block by calling block.decrRef() on it.
//
// idx is the position of the block in the table index; an idx past the last block yields an
// error, a negative idx fails an assertion. If a block cache is configured it is consulted
// first. On a miss the block is read from the table, decrypted and decompressed as needed,
// split into data / entry offsets / checksum, optionally checksum-verified (depending on
// ChkMode), and, when useCache is set, offered to the block cache.
func (t *Table) block(idx int, useCache bool) (*Block, error) {
	y.AssertTruef(idx >= 0, "idx=%d", idx)
	if idx >= t.offsetsLength() {
		return nil, errors.New("block out of index")
	}
	if t.opt.BlockCache != nil {
		key := t.blockCacheKey(idx)
		blk, ok := t.opt.BlockCache.Get(key)
		if ok && blk != nil {
			// Use the block only if the increment was successful. The block
			// could get evicted from the cache between the Get() call and the
			// incrRef() call.
			if blk.incrRef() {
				return blk, nil
			}
		}
	}

	var ko fb.BlockOffset
	y.AssertTrue(t.offsets(&ko, idx))
	blk := &Block{offset: int(ko.Offset())}
	// The new block starts with one reference, which the deferred decrRef below gives up on
	// every return path. The success path takes extra references before returning.
	blk.ref.Store(1)
	defer blk.decrRef() // Deal with any errors, where blk would not be returned.
	NumBlocks.Add(1)

	// NOTE(review): t.Fd is nil for tables created by OpenInMemoryTable — confirm that
	// t.Fd.Name() in the error paths below is safe for such tables.
	var err error
	if blk.data, err = t.read(blk.offset, int(ko.Len())); err != nil {
		return nil, y.Wrapf(err,
			"failed to read from file: %s at offset: %d, len: %d",
			t.Fd.Name(), blk.offset, ko.Len())
	}

	if t.shouldDecrypt() {
		// Decrypt the block if it is encrypted.
		if blk.data, err = t.decrypt(blk.data, true); err != nil {
			return nil, err
		}
		// blk.data is allocated via Calloc. So, do free.
		blk.freeMe = true
	}

	if err = t.decompress(blk); err != nil {
		return nil, y.Wrapf(err,
			"failed to decode compressed data in file: %s at offset: %d, len: %d",
			t.Fd.Name(), blk.offset, ko.Len())
	}

	// Read meta data related to block. The block trailer is, from the end backwards:
	// checksum length (4 bytes), checksum, entry count (4 bytes), entry offsets (4 bytes each).
	// NOTE(review): the slice expressions below assume len(blk.data) >= 4 and that the
	// checksum and entry table fit in front of readPos; only chkLen > len(blk.data) is
	// rejected explicitly.
	readPos := len(blk.data) - 4 // First read checksum length.
	blk.chkLen = int(y.BytesToU32(blk.data[readPos : readPos+4]))

	// Checksum length greater than block size could happen if the table was compressed and
	// it was opened with an incorrect compression algorithm (or the data was corrupted).
	if blk.chkLen > len(blk.data) {
		return nil, errors.New("invalid checksum length. Either the data is " +
			"corrupted or the table options are incorrectly set")
	}

	// Read checksum and store it
	readPos -= blk.chkLen
	blk.checksum = blk.data[readPos : readPos+blk.chkLen]
	// Move back and read numEntries in the block.
	readPos -= 4
	numEntries := int(y.BytesToU32(blk.data[readPos : readPos+4]))
	entriesIndexStart := readPos - (numEntries * 4)
	entriesIndexEnd := entriesIndexStart + numEntries*4

	blk.entryOffsets = y.BytesToU32Slice(blk.data[entriesIndexStart:entriesIndexEnd])

	blk.entriesIndexStart = entriesIndexStart

	// Drop checksum and checksum length.
	// The checksum is calculated for actual data + entry index + index length
	blk.data = blk.data[:readPos+4]

	// Verify the checksum if the verification mode is OnBlockRead or OnTableAndBlockRead.
	if t.opt.ChkMode == options.OnBlockRead || t.opt.ChkMode == options.OnTableAndBlockRead {
		if err = blk.verifyCheckSum(); err != nil {
			return nil, err
		}
	}

	// This reference is the one the caller ends up holding once the deferred decrRef has run.
	blk.incrRef()
	if useCache && t.opt.BlockCache != nil {
		key := t.blockCacheKey(idx)
		// incrRef should never return false here because we're calling it on a
		// new block with ref=1.
		y.AssertTrue(blk.incrRef())

		// Decrement the block ref if we could not insert it in the cache.
		if !t.opt.BlockCache.Set(key, blk, blk.size()) {
			blk.decrRef()
		}
		// We have added an OnReject func in our cache, which gets called in case the block is not
		// admitted to the cache. So, every block would be accounted for.
	}
	return blk, nil
}
 625  
 626  // blockCacheKey is used to store blocks in the block cache.
 627  func (t *Table) blockCacheKey(idx int) []byte {
 628  	y.AssertTrue(t.id < math.MaxUint32)
 629  	y.AssertTrue(uint32(idx) < math.MaxUint32)
 630  
 631  	buf := make([]byte, 8)
 632  	// Assume t.ID does not overflow uint32.
 633  	binary.BigEndian.PutUint32(buf[:4], uint32(t.ID()))
 634  	binary.BigEndian.PutUint32(buf[4:], uint32(idx))
 635  	return buf
 636  }
 637  
// indexKey returns the cache key under which fetchIndex stores this table's index in the
// index cache. The key is the table id.
func (t *Table) indexKey() uint64 {
	return t.id
}
 643  
// IndexSize is the size of table index in bytes, as read from the table footer by initIndex.
func (t *Table) IndexSize() int {
	return t.indexLen
}
 648  
// Size is its file size in bytes (the length of the data for in-memory tables).
func (t *Table) Size() int64 { return int64(t.tableSize) }
 651  
// StaleDataSize is the amount of stale data (that can be dropped by a compaction) in this SST.
// The value is read from the table index, which is obtained through fetchIndex.
func (t *Table) StaleDataSize() uint32 { return t.fetchIndex().StaleDataSize() }
 654  
// Smallest is its smallest key, or nil if there are none. The returned slice is the table's
// own copy and is not cloned on each call.
func (t *Table) Smallest() []byte { return t.smallest }
 657  
// Biggest is its biggest key, or nil if there are none. The returned slice is the table's
// own copy and is not cloned on each call.
func (t *Table) Biggest() []byte { return t.biggest }
 660  
// Filename returns the name of the underlying file, as reported by t.Fd.Name().
// NOTE(review): Fd is nil for tables created by OpenInMemoryTable — confirm callers never
// reach this for in-memory tables.
func (t *Table) Filename() string { return t.Fd.Name() }
 663  
// ID is the table's ID number (used to make the file name).
func (t *Table) ID() uint64 { return t.id }
 666  
 667  // DoesNotHave returns true if and only if the table does not have the key hash.
 668  // It does a bloom filter lookup.
 669  func (t *Table) DoesNotHave(hash uint32) bool {
 670  	if !t.hasBloomFilter {
 671  		return false
 672  	}
 673  
 674  	y.NumLSMBloomHitsAdd(t.opt.MetricsEnabled, "DoesNotHave_ALL", 1)
 675  	index := t.fetchIndex()
 676  	bf := index.BloomFilterBytes()
 677  	mayContain := y.Filter(bf).MayContain(hash)
 678  	if !mayContain {
 679  		y.NumLSMBloomHitsAdd(t.opt.MetricsEnabled, "DoesNotHave_HIT", 1)
 680  	}
 681  	return !mayContain
 682  }
 683  
 684  // readTableIndex reads table index from the sst and returns its pb format.
 685  func (t *Table) readTableIndex() (*fb.TableIndex, error) {
 686  	data := t.readNoFail(t.indexStart, t.indexLen)
 687  	var err error
 688  	// Decrypt the table index if it is encrypted.
 689  	if t.shouldDecrypt() {
 690  		if data, err = t.decrypt(data, false); err != nil {
 691  			return nil, y.Wrapf(err,
 692  				"Error while decrypting table index for the table %d in readTableIndex", t.id)
 693  		}
 694  	}
 695  	return fb.GetRootAsTableIndex(data, 0), nil
 696  }
 697  
 698  // VerifyChecksum verifies checksum for all blocks of table. This function is called by
 699  // OpenTable() function. This function is also called inside levelsController.VerifyChecksum().
 700  func (t *Table) VerifyChecksum() error {
 701  	ti := t.fetchIndex()
 702  	for i := 0; i < ti.OffsetsLength(); i++ {
 703  		b, err := t.block(i, true)
 704  		if err != nil {
 705  			return y.Wrapf(err, "checksum validation failed for table: %s, block: %d, offset:%d",
 706  				t.Filename(), i, b.offset)
 707  		}
 708  		// We should not call incrRef here, because the block already has one ref when created.
 709  		defer b.decrRef()
 710  		// OnBlockRead or OnTableAndBlockRead, we don't need to call verify checksum
 711  		// on block, verification would be done while reading block itself.
 712  		if !(t.opt.ChkMode == options.OnBlockRead || t.opt.ChkMode == options.OnTableAndBlockRead) {
 713  			if err = b.verifyCheckSum(); err != nil {
 714  				return y.Wrapf(err,
 715  					"checksum validation failed for table: %s, block: %d, offset:%d",
 716  					t.Filename(), i, b.offset)
 717  			}
 718  		}
 719  	}
 720  	return nil
 721  }
 722  
// shouldDecrypt tells whether to decrypt or not. We decrypt only if the datakey exist
// for the table, i.e. when the table's options carry a non-nil DataKey.
func (t *Table) shouldDecrypt() bool {
	return t.opt.DataKey != nil
}
 728  
 729  // KeyID returns data key id.
 730  func (t *Table) KeyID() uint64 {
 731  	if t.opt.DataKey != nil {
 732  		return t.opt.DataKey.KeyId
 733  	}
 734  	// By default it's 0, if it is plain text.
 735  	return 0
 736  }
 737  
 738  // decrypt decrypts the given data. It should be called only after checking shouldDecrypt.
 739  func (t *Table) decrypt(data []byte, viaCalloc bool) ([]byte, error) {
 740  	// Last BlockSize bytes of the data is the IV.
 741  	iv := data[len(data)-aes.BlockSize:]
 742  	// Rest all bytes are data.
 743  	data = data[:len(data)-aes.BlockSize]
 744  
 745  	var dst []byte
 746  	if viaCalloc {
 747  		dst = z.Calloc(len(data), "Table.Decrypt")
 748  	} else {
 749  		dst = make([]byte, len(data))
 750  	}
 751  	if err := y.XORBlock(dst, data, t.opt.DataKey.Data, iv); err != nil {
 752  		return nil, y.Wrapf(err, "while decrypt")
 753  	}
 754  	return dst, nil
 755  }
 756  
 757  // ParseFileID reads the file id out of a filename.
 758  func ParseFileID(name string) (uint64, bool) {
 759  	name = filepath.Base(name)
 760  	if !strings.HasSuffix(name, fileSuffix) {
 761  		return 0, false
 762  	}
 763  	//	suffix := name[len(fileSuffix):]
 764  	name = strings.TrimSuffix(name, fileSuffix)
 765  	id, err := strconv.Atoi(name)
 766  	if err != nil {
 767  		return 0, false
 768  	}
 769  	y.AssertTrue(id >= 0)
 770  	return uint64(id), true
 771  }
 772  
 773  // IDToFilename does the inverse of ParseFileID
 774  func IDToFilename(id uint64) string {
 775  	return fmt.Sprintf("%06d", id) + fileSuffix
 776  }
 777  
 778  // NewFilename should be named TableFilepath -- it combines the dir with the ID to make a table
 779  // filepath.
 780  func NewFilename(id uint64, dir string) string {
 781  	return filepath.Join(dir, IDToFilename(id))
 782  }
 783  
 784  // decompress decompresses the data stored in a block.
 785  func (t *Table) decompress(b *Block) error {
 786  	var dst []byte
 787  	var err error
 788  
 789  	// Point to the original b.data
 790  	src := b.data
 791  
 792  	switch t.opt.Compression {
 793  	case options.None:
 794  		// Nothing to be done here.
 795  		return nil
 796  	case options.Snappy:
 797  		if sz, err := snappy.DecodedLen(b.data); err == nil {
 798  			dst = z.Calloc(sz, "Table.Decompress")
 799  		} else {
 800  			dst = z.Calloc(len(b.data)*4, "Table.Decompress") // Take a guess.
 801  		}
 802  		b.data, err = snappy.Decode(dst, b.data)
 803  		if err != nil {
 804  			z.Free(dst)
 805  			return y.Wrap(err, "failed to decompress")
 806  		}
 807  	case options.ZSTD:
 808  		sz := int(float64(t.opt.BlockSize) * 1.2)
 809  		// Get frame content size from header.
 810  		var hdr zstd.Header
 811  		if err := hdr.Decode(b.data); err == nil && hdr.HasFCS && hdr.FrameContentSize < uint64(t.opt.BlockSize*2) {
 812  			sz = int(hdr.FrameContentSize)
 813  		}
 814  		dst = z.Calloc(sz, "Table.Decompress")
 815  		b.data, err = y.ZSTDDecompress(dst, b.data)
 816  		if err != nil {
 817  			z.Free(dst)
 818  			return y.Wrap(err, "failed to decompress")
 819  		}
 820  	default:
 821  		return errors.New("Unsupported compression type")
 822  	}
 823  
 824  	if b.freeMe {
 825  		z.Free(src)
 826  		b.freeMe = false
 827  	}
 828  
 829  	if len(b.data) > 0 && len(dst) > 0 && &dst[0] != &b.data[0] {
 830  		z.Free(dst)
 831  	} else {
 832  		b.freeMe = true
 833  	}
 834  	return nil
 835  }
 836