builder.go raw
1 /*
2 * SPDX-FileCopyrightText: © Hypermode Inc. <hello@hypermode.com>
3 * SPDX-License-Identifier: Apache-2.0
4 */
5
6 package table
7
8 import (
9 "crypto/aes"
10 "errors"
11 "math"
12 "runtime"
13 "sync"
14 "sync/atomic"
15 "unsafe"
16
17 fbs "github.com/google/flatbuffers/go"
18 "github.com/klauspost/compress/s2"
19 "google.golang.org/protobuf/proto"
20
21 "github.com/dgraph-io/badger/v4/fb"
22 "github.com/dgraph-io/badger/v4/options"
23 "github.com/dgraph-io/badger/v4/pb"
24 "github.com/dgraph-io/badger/v4/y"
25 "github.com/dgraph-io/ristretto/v2/z"
26 )
27
const (
	// KB is 1024 bytes.
	KB = 1024
	// MB is 1024 KB.
	MB = 1024 * KB

	// padding is extra room reserved in every block allocation. When a block is
	// encrypted its length increases (the IV is appended), so 256 bytes are
	// reserved to absorb that growth. This is an approximate number.
	padding = 256
)
36
// header is the prefix written in front of every entry inside a block. Keys are
// prefix-compressed against the block's base key: overlap is the number of
// leading bytes the key shares with the base key, and diff is the length of the
// remaining suffix that is stored verbatim after the header.
type header struct {
	overlap uint16 // Overlap with base key.
	diff    uint16 // Length of the diff.
}

// headerSize is the encoded size of a header in bytes.
const headerSize = uint16(unsafe.Sizeof(header{}))

// Encode encodes the header into a new headerSize-byte slice, in the machine's
// native byte order (the same layout Decode reads back).
//
// The bytes are copied out of h instead of storing h through a *header that
// aliases a byte array. A byte array only guarantees 1-byte alignment while
// header needs 2-byte alignment, so that conversion can yield a misaligned
// pointer. This is the same problem Decode works around below.
func (h header) Encode() []byte {
	var b [headerSize]byte
	copy(b[:], (*[headerSize]byte)(unsafe.Pointer(&h))[:])
	return b[:]
}

// Decode decodes the first headerSize bytes of buf into h. It panics if buf is
// shorter than headerSize.
func (h *header) Decode(buf []byte) {
	// Copy over data from buf into h. Using *h=unsafe.pointer(...) leads to
	// pointer alignment issues. See https://github.com/hypermodeinc/badger/issues/1096
	// and comment https://github.com/hypermodeinc/badger/pull/1097#pullrequestreview-307361714
	copy((*[headerSize]byte)(unsafe.Pointer(h))[:], buf[:headerSize])
}
58
// bblock is a single data block of a table. Entries are appended to it while it
// is current; when compression or encryption is enabled, handleBlock later
// replaces data with the processed bytes in the background.
type bblock struct {
	// data is the block's buffer; only data[:end] holds meaningful bytes.
	data []byte
	// baseKey is the first key added to the block; later keys are stored as a
	// header plus the suffix that differs from it.
	baseKey []byte
	// entryOffsets holds the offset within data at which each entry starts.
	entryOffsets []uint32
	// end is the offset one past the last byte written into data.
	end int
}
66
67 // Builder is used in building a table.
68 type Builder struct {
69 // Typically tens or hundreds of meg. This is for one single file.
70 alloc *z.Allocator
71 curBlock *bblock
72 compressedSize atomic.Uint32
73 uncompressedSize atomic.Uint32
74
75 lenOffsets uint32
76 keyHashes []uint32 // Used for building the bloomfilter.
77 opts *Options
78 maxVersion uint64
79 onDiskSize uint32
80 staleDataSize int
81
82 // Used to concurrently compress/encrypt blocks.
83 wg sync.WaitGroup
84 blockChan chan *bblock
85 blockList []*bblock
86 }
87
88 func (b *Builder) allocate(need int) []byte {
89 bb := b.curBlock
90 if len(bb.data[bb.end:]) < need {
91 // We need to reallocate. 1GB is the max size that the allocator can allocate.
92 // While reallocating, if doubling exceeds that limit, then put the upper bound on it.
93 sz := 2 * len(bb.data)
94 if sz > (1 << 30) {
95 sz = 1 << 30
96 }
97 if bb.end+need > sz {
98 sz = bb.end + need
99 }
100 tmp := b.alloc.Allocate(sz)
101 copy(tmp, bb.data)
102 bb.data = tmp
103 }
104 bb.end += need
105 return bb.data[bb.end-need : bb.end]
106 }
107
108 // append appends to curBlock.data
109 func (b *Builder) append(data []byte) {
110 dst := b.allocate(len(data))
111 y.AssertTrue(len(data) == copy(dst, data))
112 }
113
114 const maxAllocatorInitialSz = 256 << 20
115
116 // NewTableBuilder makes a new TableBuilder.
117 func NewTableBuilder(opts Options) *Builder {
118 sz := 2 * int(opts.TableSize)
119 if sz > maxAllocatorInitialSz {
120 sz = maxAllocatorInitialSz
121 }
122 b := &Builder{
123 alloc: opts.AllocPool.Get(sz, "TableBuilder"),
124 opts: &opts,
125 }
126 b.alloc.Tag = "Builder"
127 b.curBlock = &bblock{
128 data: b.alloc.Allocate(opts.BlockSize + padding),
129 }
130 b.opts.tableCapacity = uint64(float64(b.opts.TableSize) * 0.95)
131
132 // If encryption or compression is not enabled, do not start compression/encryption goroutines
133 // and write directly to the buffer.
134 if b.opts.Compression == options.None && b.opts.DataKey == nil {
135 return b
136 }
137
138 count := 2 * runtime.NumCPU()
139 b.blockChan = make(chan *bblock, count*2)
140
141 b.wg.Add(count)
142 for i := 0; i < count; i++ {
143 go b.handleBlock()
144 }
145 return b
146 }
147
148 func maxEncodedLen(ctype options.CompressionType, sz int) int {
149 switch ctype {
150 case options.Snappy:
151 return s2.MaxEncodedLen(sz)
152 case options.ZSTD:
153 return y.ZSTDCompressBound(sz)
154 }
155 return sz
156 }
157
158 func (b *Builder) handleBlock() {
159 defer b.wg.Done()
160
161 doCompress := b.opts.Compression != options.None
162 for item := range b.blockChan {
163 // Extract the block.
164 blockBuf := item.data[:item.end]
165 // Compress the block.
166 if doCompress {
167 out, err := b.compressData(blockBuf)
168 y.Check(err)
169 blockBuf = out
170 }
171 if b.shouldEncrypt() {
172 out, err := b.encrypt(blockBuf)
173 y.Check(y.Wrapf(err, "Error while encrypting block in table builder."))
174 blockBuf = out
175 }
176
177 // BlockBuf should always less than or equal to allocated space. If the blockBuf is greater
178 // than allocated space that means the data from this block cannot be stored in its
179 // existing location.
180 allocatedSpace := maxEncodedLen(b.opts.Compression, (item.end)) + padding + 1
181 y.AssertTrue(len(blockBuf) <= allocatedSpace)
182
183 // blockBuf was allocated on allocator. So, we don't need to copy it over.
184 item.data = blockBuf
185 item.end = len(blockBuf)
186 b.compressedSize.Add(uint32(len(blockBuf)))
187 }
188 }
189
190 // Close closes the TableBuilder.
191 func (b *Builder) Close() {
192 b.opts.AllocPool.Return(b.alloc)
193 }
194
195 // Empty returns whether it's empty.
196 func (b *Builder) Empty() bool { return len(b.keyHashes) == 0 }
197
198 // keyDiff returns a suffix of newKey that is different from b.baseKey.
199 func (b *Builder) keyDiff(newKey []byte) []byte {
200 var i int
201 for i = 0; i < len(newKey) && i < len(b.curBlock.baseKey); i++ {
202 if newKey[i] != b.curBlock.baseKey[i] {
203 break
204 }
205 }
206 return newKey[i:]
207 }
208
209 func (b *Builder) addHelper(key []byte, v y.ValueStruct, vpLen uint32) {
210 b.keyHashes = append(b.keyHashes, y.Hash(y.ParseKey(key)))
211
212 if version := y.ParseTs(key); version > b.maxVersion {
213 b.maxVersion = version
214 }
215
216 // diffKey stores the difference of key with baseKey.
217 var diffKey []byte
218 if len(b.curBlock.baseKey) == 0 {
219 // Make a copy. Builder should not keep references. Otherwise, caller has to be very careful
220 // and will have to make copies of keys every time they add to builder, which is even worse.
221 b.curBlock.baseKey = append(b.curBlock.baseKey[:0], key...)
222 diffKey = key
223 } else {
224 diffKey = b.keyDiff(key)
225 }
226
227 y.AssertTrue(len(key)-len(diffKey) <= math.MaxUint16)
228 y.AssertTrue(len(diffKey) <= math.MaxUint16)
229
230 h := header{
231 overlap: uint16(len(key) - len(diffKey)),
232 diff: uint16(len(diffKey)),
233 }
234
235 // store current entry's offset
236 b.curBlock.entryOffsets = append(b.curBlock.entryOffsets, uint32(b.curBlock.end))
237
238 // Layout: header, diffKey, value.
239 b.append(h.Encode())
240 b.append(diffKey)
241
242 dst := b.allocate(int(v.EncodedSize()))
243 v.Encode(dst)
244
245 // Add the vpLen to the onDisk size. We'll add the size of the block to
246 // onDisk size in Finish() function.
247 b.onDiskSize += vpLen
248 }
249
250 /*
251 Structure of Block.
252 +-------------------+---------------------+--------------------+--------------+------------------+
253 | Entry1 | Entry2 | Entry3 | Entry4 | Entry5 |
254 +-------------------+---------------------+--------------------+--------------+------------------+
255 | Entry6 | ... | ... | ... | EntryN |
256 +-------------------+---------------------+--------------------+--------------+------------------+
257 | Block Meta(contains list of offsets used| Block Meta Size | Block | Checksum Size |
258 | to perform binary search in the block) | (4 Bytes) | Checksum | (4 Bytes) |
259 +-----------------------------------------+--------------------+--------------+------------------+
260 */
261 // In case the data is encrypted, the "IV" is added to the end of the block.
262 func (b *Builder) finishBlock() {
263 if len(b.curBlock.entryOffsets) == 0 {
264 return
265 }
266 // Append the entryOffsets and its length.
267 b.append(y.U32SliceToBytes(b.curBlock.entryOffsets))
268 b.append(y.U32ToBytes(uint32(len(b.curBlock.entryOffsets))))
269
270 checksum := b.calculateChecksum(b.curBlock.data[:b.curBlock.end])
271
272 // Append the block checksum and its length.
273 b.append(checksum)
274 b.append(y.U32ToBytes(uint32(len(checksum))))
275
276 b.blockList = append(b.blockList, b.curBlock)
277 b.uncompressedSize.Add(uint32(b.curBlock.end))
278
279 // Add length of baseKey (rounded to next multiple of 4 because of alignment).
280 // Add another 40 Bytes, these additional 40 bytes consists of
281 // 12 bytes of metadata of flatbuffer
282 // 8 bytes for Key in flat buffer
283 // 8 bytes for offset
284 // 8 bytes for the len
285 // 4 bytes for the size of slice while SliceAllocate
286 b.lenOffsets += uint32(int(math.Ceil(float64(len(b.curBlock.baseKey))/4))*4) + 40
287
288 // If compression/encryption is enabled, we need to send the block to the blockChan.
289 if b.blockChan != nil {
290 b.blockChan <- b.curBlock
291 }
292 }
293
294 func (b *Builder) shouldFinishBlock(key []byte, value y.ValueStruct) bool {
295 // If there is no entry till now, we will return false.
296 if len(b.curBlock.entryOffsets) <= 0 {
297 return false
298 }
299
300 // Integer overflow check for statements below.
301 y.AssertTrue((uint32(len(b.curBlock.entryOffsets))+1)*4+4+8+4 < math.MaxUint32)
302 // We should include current entry also in size, that's why +1 to len(b.entryOffsets).
303 entriesOffsetsSize := uint32((len(b.curBlock.entryOffsets)+1)*4 +
304 4 + // size of list
305 8 + // Sum64 in checksum proto
306 4) // checksum length
307 estimatedSize := uint32(b.curBlock.end) + uint32(6 /*header size for entry*/) +
308 uint32(len(key)) + value.EncodedSize() + entriesOffsetsSize
309
310 if b.shouldEncrypt() {
311 // IV is added at the end of the block, while encrypting.
312 // So, size of IV is added to estimatedSize.
313 estimatedSize += aes.BlockSize
314 }
315
316 // Integer overflow check for table size.
317 y.AssertTrue(uint64(b.curBlock.end)+uint64(estimatedSize) < math.MaxUint32)
318
319 return estimatedSize > uint32(b.opts.BlockSize)
320 }
321
322 // AddStaleKey is same is Add function but it also increments the internal
323 // staleDataSize counter. This value will be used to prioritize this table for
324 // compaction.
325 func (b *Builder) AddStaleKey(key []byte, v y.ValueStruct, valueLen uint32) {
326 // Rough estimate based on how much space it will occupy in the SST.
327 b.staleDataSize += len(key) + len(v.Value) + 4 /* entry offset */ + 4 /* header size */
328 b.addInternal(key, v, valueLen, true)
329 }
330
331 // Add adds a key-value pair to the block.
332 func (b *Builder) Add(key []byte, value y.ValueStruct, valueLen uint32) {
333 b.addInternal(key, value, valueLen, false)
334 }
335
336 func (b *Builder) addInternal(key []byte, value y.ValueStruct, valueLen uint32, isStale bool) {
337 if b.shouldFinishBlock(key, value) {
338 if isStale {
339 // This key will be added to tableIndex and it is stale.
340 b.staleDataSize += len(key) + 4 /* len */ + 4 /* offset */
341 }
342 b.finishBlock()
343 // Create a new block and start writing.
344 b.curBlock = &bblock{
345 data: b.alloc.Allocate(b.opts.BlockSize + padding),
346 }
347 }
348 b.addHelper(key, value, valueLen)
349 }
350
// TODO: The comment below was originally attached to ReachedCapacity.
// FinalSize returns the *rough* final size of the array, counting the header, which is
// not yet written.
// TODO: Look into why there is a discrepancy. I suspect it is because of Write(empty, empty)
// at the end. The diff can vary.
356
357 // ReachedCapacity returns true if we... roughly (?) reached capacity?
358 func (b *Builder) ReachedCapacity() bool {
359 // If encryption/compression is enabled then use the compresssed size.
360 sumBlockSizes := b.compressedSize.Load()
361 if b.opts.Compression == options.None && b.opts.DataKey == nil {
362 sumBlockSizes = b.uncompressedSize.Load()
363 }
364 blocksSize := sumBlockSizes + // actual length of current buffer
365 uint32(len(b.curBlock.entryOffsets)*4) + // all entry offsets size
366 4 + // count of all entry offsets
367 8 + // checksum bytes
368 4 // checksum length
369
370 estimateSz := blocksSize +
371 4 + // Index length
372 b.lenOffsets
373
374 return uint64(estimateSz) > b.opts.tableCapacity
375 }
376
377 // Finish finishes the table by appending the index.
378 /*
379 The table structure looks like
380 +---------+------------+-----------+---------------+
381 | Block 1 | Block 2 | Block 3 | Block 4 |
382 +---------+------------+-----------+---------------+
383 | Block 5 | Block 6 | Block ... | Block N |
384 +---------+------------+-----------+---------------+
385 | Index | Index Size | Checksum | Checksum Size |
386 +---------+------------+-----------+---------------+
387 */
388 // In case the data is encrypted, the "IV" is added to the end of the index.
389 func (b *Builder) Finish() []byte {
390 bd := b.Done()
391 buf := make([]byte, bd.Size)
392 written := bd.Copy(buf)
393 y.AssertTrue(written == len(buf))
394 return buf
395 }
396
397 type buildData struct {
398 blockList []*bblock
399 index []byte
400 checksum []byte
401 Size int
402 alloc *z.Allocator
403 }
404
405 func (bd *buildData) Copy(dst []byte) int {
406 var written int
407 for _, bl := range bd.blockList {
408 written += copy(dst[written:], bl.data[:bl.end])
409 }
410 written += copy(dst[written:], bd.index)
411 written += copy(dst[written:], y.U32ToBytes(uint32(len(bd.index))))
412
413 written += copy(dst[written:], bd.checksum)
414 written += copy(dst[written:], y.U32ToBytes(uint32(len(bd.checksum))))
415 return written
416 }
417
418 func (b *Builder) Done() buildData {
419 b.finishBlock() // This will never start a new block.
420 if b.blockChan != nil {
421 close(b.blockChan)
422 }
423 // Wait for block handler to finish.
424 b.wg.Wait()
425
426 if len(b.blockList) == 0 {
427 return buildData{}
428 }
429 bd := buildData{
430 blockList: b.blockList,
431 alloc: b.alloc,
432 }
433
434 var f y.Filter
435 if b.opts.BloomFalsePositive > 0 {
436 bits := y.BloomBitsPerKey(len(b.keyHashes), b.opts.BloomFalsePositive)
437 f = y.NewFilter(b.keyHashes, bits)
438 }
439 index, dataSize := b.buildIndex(f)
440
441 var err error
442 if b.shouldEncrypt() {
443 index, err = b.encrypt(index)
444 y.Check(err)
445 }
446 checksum := b.calculateChecksum(index)
447
448 bd.index = index
449 bd.checksum = checksum
450 bd.Size = int(dataSize) + len(index) + len(checksum) + 4 + 4
451 return bd
452 }
453
454 func (b *Builder) calculateChecksum(data []byte) []byte {
455 // Build checksum for the index.
456 checksum := pb.Checksum{
457 // TODO: The checksum type should be configurable from the
458 // options.
459 // We chose to use CRC32 as the default option because
460 // it performed better compared to xxHash64.
461 // See the BenchmarkChecksum in table_test.go file
462 // Size => 1024 B 2048 B
463 // CRC32 => 63.7 ns/op 112 ns/op
464 // xxHash64 => 87.5 ns/op 158 ns/op
465 Sum: y.CalculateChecksum(data, pb.Checksum_CRC32C),
466 Algo: pb.Checksum_CRC32C,
467 }
468
469 // Write checksum to the file.
470 chksum, err := proto.Marshal(&checksum)
471 y.Check(err)
472 // Write checksum size.
473 return chksum
474 }
475
476 // DataKey returns datakey of the builder.
477 func (b *Builder) DataKey() *pb.DataKey {
478 return b.opts.DataKey
479 }
480
481 func (b *Builder) Opts() *Options {
482 return b.opts
483 }
484
485 // encrypt will encrypt the given data and appends IV to the end of the encrypted data.
486 // This should be only called only after checking shouldEncrypt method.
487 func (b *Builder) encrypt(data []byte) ([]byte, error) {
488 iv, err := y.GenerateIV()
489 if err != nil {
490 return data, y.Wrapf(err, "Error while generating IV in Builder.encrypt")
491 }
492 needSz := len(data) + len(iv)
493 dst := b.alloc.Allocate(needSz)
494
495 if err = y.XORBlock(dst[:len(data)], data, b.DataKey().Data, iv); err != nil {
496 return data, y.Wrapf(err, "Error while encrypting in Builder.encrypt")
497 }
498
499 y.AssertTrue(len(iv) == copy(dst[len(data):], iv))
500 return dst, nil
501 }
502
503 // shouldEncrypt tells us whether to encrypt the data or not.
504 // We encrypt only if the data key exist. Otherwise, not.
505 func (b *Builder) shouldEncrypt() bool {
506 return b.opts.DataKey != nil
507 }
508
509 // compressData compresses the given data.
510 func (b *Builder) compressData(data []byte) ([]byte, error) {
511 switch b.opts.Compression {
512 case options.None:
513 return data, nil
514 case options.Snappy:
515 sz := s2.MaxEncodedLen(len(data))
516 dst := b.alloc.Allocate(sz)
517 return s2.EncodeSnappy(dst, data), nil
518 case options.ZSTD:
519 sz := y.ZSTDCompressBound(len(data))
520 dst := b.alloc.Allocate(sz)
521 return y.ZSTDCompress(dst, data, b.opts.ZSTDCompressionLevel)
522 }
523 return nil, errors.New("Unsupported compression type")
524 }
525
526 func (b *Builder) buildIndex(bloom []byte) ([]byte, uint32) {
527 builder := fbs.NewBuilder(3 << 20)
528
529 boList, dataSize := b.writeBlockOffsets(builder)
530 // Write block offset vector the the idxBuilder.
531 fb.TableIndexStartOffsetsVector(builder, len(boList))
532
533 // Write individual block offsets in reverse order to work around how Flatbuffers expects it.
534 for i := len(boList) - 1; i >= 0; i-- {
535 builder.PrependUOffsetT(boList[i])
536 }
537 boEnd := builder.EndVector(len(boList))
538
539 var bfoff fbs.UOffsetT
540 // Write the bloom filter.
541 if len(bloom) > 0 {
542 bfoff = builder.CreateByteVector(bloom)
543 }
544 b.onDiskSize += dataSize
545 fb.TableIndexStart(builder)
546 fb.TableIndexAddOffsets(builder, boEnd)
547 fb.TableIndexAddBloomFilter(builder, bfoff)
548 fb.TableIndexAddMaxVersion(builder, b.maxVersion)
549 fb.TableIndexAddUncompressedSize(builder, b.uncompressedSize.Load())
550 fb.TableIndexAddKeyCount(builder, uint32(len(b.keyHashes)))
551 fb.TableIndexAddOnDiskSize(builder, b.onDiskSize)
552 fb.TableIndexAddStaleDataSize(builder, uint32(b.staleDataSize))
553 builder.Finish(fb.TableIndexEnd(builder))
554
555 buf := builder.FinishedBytes()
556 index := fb.GetRootAsTableIndex(buf, 0)
557 // Mutate the ondisk size to include the size of the index as well.
558 y.AssertTrue(index.MutateOnDiskSize(index.OnDiskSize() + uint32(len(buf))))
559 return buf, dataSize
560 }
561
562 // writeBlockOffsets writes all the blockOffets in b.offsets and returns the
563 // offsets for the newly written items.
564 func (b *Builder) writeBlockOffsets(builder *fbs.Builder) ([]fbs.UOffsetT, uint32) {
565 var startOffset uint32
566 var uoffs []fbs.UOffsetT
567 for _, bl := range b.blockList {
568 uoff := b.writeBlockOffset(builder, bl, startOffset)
569 uoffs = append(uoffs, uoff)
570 startOffset += uint32(bl.end)
571 }
572 return uoffs, startOffset
573 }
574
575 // writeBlockOffset writes the given key,offset,len triple to the indexBuilder.
576 // It returns the offset of the newly written blockoffset.
577 func (b *Builder) writeBlockOffset(
578 builder *fbs.Builder, bl *bblock, startOffset uint32) fbs.UOffsetT {
579 // Write the key to the buffer.
580 k := builder.CreateByteVector(bl.baseKey)
581
582 // Build the blockOffset.
583 fb.BlockOffsetStart(builder)
584 fb.BlockOffsetAddKey(builder, k)
585 fb.BlockOffsetAddOffset(builder, startOffset)
586 fb.BlockOffsetAddLen(builder, uint32(bl.end))
587 return fb.BlockOffsetEnd(builder)
588 }
589