iterator.go raw
1 /*
2 * SPDX-FileCopyrightText: © Hypermode Inc. <hello@hypermode.com>
3 * SPDX-License-Identifier: Apache-2.0
4 */
5
6 package table
7
8 import (
9 "bytes"
10 "fmt"
11 "io"
12 "sort"
13
14 "github.com/dgraph-io/badger/v4/fb"
15 "github.com/dgraph-io/badger/v4/y"
16 )
17
// blockIterator walks the entries of a single Block. The current key is rebuilt into the
// iterator's own buffer (key); the current value (val) is a sub-slice of the block data.
type blockIterator struct {
	data         []byte   // entry region of the current block; setBlock slices off the entry index
	idx          int      // Idx of the entry inside a block
	err          error    // io.EOF once idx is out of range, nil otherwise
	baseKey      []byte   // key of entry 0, decoded lazily by setIdx; aliases data
	key          []byte   // current key, rebuilt by setIdx in a buffer owned by the iterator
	val          []byte   // current value bytes; aliases data
	entryOffsets []uint32 // start offset of each entry within data
	block        *Block   // block currently held; setBlock and Close call decrRef on it

	tableID uint64 // reported by setIdx's panic diagnostics
	blockID int    // reported by setIdx's panic diagnostics
	// prevOverlap stores the overlap of the previous key with the base key.
	// This avoids unnecessary copy of base key when the overlap is same for multiple keys.
	prevOverlap uint16
}
34
35 func (itr *blockIterator) setBlock(b *Block) {
36 // Decrement the ref for the old block. If the old block was compressed, we
37 // might be able to reuse it.
38 itr.block.decrRef()
39
40 itr.block = b
41 itr.err = nil
42 itr.idx = 0
43 itr.baseKey = itr.baseKey[:0]
44 itr.prevOverlap = 0
45 itr.key = itr.key[:0]
46 itr.val = itr.val[:0]
47 // Drop the index from the block. We don't need it anymore.
48 itr.data = b.data[:b.entriesIndexStart]
49 itr.entryOffsets = b.entryOffsets
50 }
51
52 // setIdx sets the iterator to the entry at index i and set it's key and value.
53 func (itr *blockIterator) setIdx(i int) {
54 itr.idx = i
55 if i >= len(itr.entryOffsets) || i < 0 {
56 itr.err = io.EOF
57 return
58 }
59 itr.err = nil
60 startOffset := int(itr.entryOffsets[i])
61
62 // Set base key.
63 if len(itr.baseKey) == 0 {
64 var baseHeader header
65 baseHeader.Decode(itr.data)
66 itr.baseKey = itr.data[headerSize : headerSize+baseHeader.diff]
67 }
68
69 var endOffset int
70 // idx points to the last entry in the block.
71 if itr.idx+1 == len(itr.entryOffsets) {
72 endOffset = len(itr.data)
73 } else {
74 // idx point to some entry other than the last one in the block.
75 // EndOffset of the current entry is the start offset of the next entry.
76 endOffset = int(itr.entryOffsets[itr.idx+1])
77 }
78 defer func() {
79 if r := recover(); r != nil {
80 var debugBuf bytes.Buffer
81 fmt.Fprintf(&debugBuf, "==== Recovered====\n")
82 fmt.Fprintf(&debugBuf, "Table ID: %d\nBlock ID: %d\nEntry Idx: %d\nData len: %d\n"+
83 "StartOffset: %d\nEndOffset: %d\nEntryOffsets len: %d\nEntryOffsets: %v\n",
84 itr.tableID, itr.blockID, itr.idx, len(itr.data), startOffset, endOffset,
85 len(itr.entryOffsets), itr.entryOffsets)
86 panic(debugBuf.String())
87 }
88 }()
89
90 entryData := itr.data[startOffset:endOffset]
91 var h header
92 h.Decode(entryData)
93 // Header contains the length of key overlap and difference compared to the base key. If the key
94 // before this one had the same or better key overlap, we can avoid copying that part into
95 // itr.key. But, if the overlap was lesser, we could copy over just that portion.
96 if h.overlap > itr.prevOverlap {
97 itr.key = append(itr.key[:itr.prevOverlap], itr.baseKey[itr.prevOverlap:h.overlap]...)
98 }
99 itr.prevOverlap = h.overlap
100 valueOff := headerSize + h.diff
101 diffKey := entryData[headerSize:valueOff]
102 itr.key = append(itr.key[:h.overlap], diffKey...)
103 itr.val = entryData[valueOff:]
104 }
105
106 func (itr *blockIterator) Valid() bool {
107 return itr != nil && itr.err == nil
108 }
109
110 func (itr *blockIterator) Error() error {
111 return itr.err
112 }
113
114 func (itr *blockIterator) Close() {
115 itr.block.decrRef()
116 }
117
// whence values accepted by blockIterator.seek and Iterator.seekFrom. They are fixed
// enumerators, so they are constants rather than mutable package variables.
const (
	origin  = 0 // start the search from the beginning (resetting position state)
	current = 1 // start the search from the current position
)
122
123 // seek brings us to the first block element that is >= input key.
124 func (itr *blockIterator) seek(key []byte, whence int) {
125 itr.err = nil
126 startIndex := 0 // This tells from which index we should start binary search.
127
128 switch whence {
129 case origin:
130 // We don't need to do anything. startIndex is already at 0
131 case current:
132 startIndex = itr.idx
133 }
134
135 foundEntryIdx := sort.Search(len(itr.entryOffsets), func(idx int) bool {
136 // If idx is less than start index then just return false.
137 if idx < startIndex {
138 return false
139 }
140 itr.setIdx(idx)
141 return y.CompareKeys(itr.key, key) >= 0
142 })
143 itr.setIdx(foundEntryIdx)
144 }
145
146 // seekToFirst brings us to the first element.
147 func (itr *blockIterator) seekToFirst() {
148 itr.setIdx(0)
149 }
150
151 // seekToLast brings us to the last element.
152 func (itr *blockIterator) seekToLast() {
153 itr.setIdx(len(itr.entryOffsets) - 1)
154 }
155
156 func (itr *blockIterator) next() {
157 itr.setIdx(itr.idx + 1)
158 }
159
160 func (itr *blockIterator) prev() {
161 itr.setIdx(itr.idx - 1)
162 }
163
// Iterator is an iterator for a Table. It walks the table block by block and delegates
// iteration inside the current block to bi.
type Iterator struct {
	t    *Table        // table being iterated; NewIterator took a reference that Close releases
	bpos int           // index of the block that bi is (or is about to be) positioned in
	bi   blockIterator // cursor within block bpos
	err  error         // nil while positioned; io.EOF when exhausted, or a block-load error

	// Internally, Iterator is bidirectional. However, we only expose the
	// unidirectional functionality for now.
	opt int // Valid options are REVERSED and NOCACHE.
}
175
176 // NewIterator returns a new iterator of the Table
177 func (t *Table) NewIterator(opt int) *Iterator {
178 t.IncrRef() // Important.
179 ti := &Iterator{t: t, opt: opt}
180 return ti
181 }
182
183 // Close closes the iterator (and it must be called).
184 func (itr *Iterator) Close() error {
185 itr.bi.Close()
186 return itr.t.DecrRef()
187 }
188
189 func (itr *Iterator) reset() {
190 itr.bpos = 0
191 itr.err = nil
192 }
193
194 // Valid follows the y.Iterator interface
195 func (itr *Iterator) Valid() bool {
196 return itr.err == nil
197 }
198
199 func (itr *Iterator) useCache() bool {
200 return itr.opt&NOCACHE == 0
201 }
202
203 func (itr *Iterator) seekToFirst() {
204 numBlocks := itr.t.offsetsLength()
205 if numBlocks == 0 {
206 itr.err = io.EOF
207 return
208 }
209 itr.bpos = 0
210 block, err := itr.t.block(itr.bpos, itr.useCache())
211 if err != nil {
212 itr.err = err
213 return
214 }
215 itr.bi.tableID = itr.t.id
216 itr.bi.blockID = itr.bpos
217 itr.bi.setBlock(block)
218 itr.bi.seekToFirst()
219 itr.err = itr.bi.Error()
220 }
221
222 func (itr *Iterator) seekToLast() {
223 numBlocks := itr.t.offsetsLength()
224 if numBlocks == 0 {
225 itr.err = io.EOF
226 return
227 }
228 itr.bpos = numBlocks - 1
229 block, err := itr.t.block(itr.bpos, itr.useCache())
230 if err != nil {
231 itr.err = err
232 return
233 }
234 itr.bi.tableID = itr.t.id
235 itr.bi.blockID = itr.bpos
236 itr.bi.setBlock(block)
237 itr.bi.seekToLast()
238 itr.err = itr.bi.Error()
239 }
240
241 func (itr *Iterator) seekHelper(blockIdx int, key []byte) {
242 itr.bpos = blockIdx
243 block, err := itr.t.block(blockIdx, itr.useCache())
244 if err != nil {
245 itr.err = err
246 return
247 }
248 itr.bi.tableID = itr.t.id
249 itr.bi.blockID = itr.bpos
250 itr.bi.setBlock(block)
251 itr.bi.seek(key, origin)
252 itr.err = itr.bi.Error()
253 }
254
255 // seekFrom brings us to a key that is >= input key.
256 func (itr *Iterator) seekFrom(key []byte, whence int) {
257 itr.err = nil
258 switch whence {
259 case origin:
260 itr.reset()
261 case current:
262 }
263
264 var ko fb.BlockOffset
265 idx := sort.Search(itr.t.offsetsLength(), func(idx int) bool {
266 // Offsets should never return false since we're iterating within the OffsetsLength.
267 y.AssertTrue(itr.t.offsets(&ko, idx))
268 return y.CompareKeys(ko.KeyBytes(), key) > 0
269 })
270 if idx == 0 {
271 // The smallest key in our table is already strictly > key. We can return that.
272 // This is like a SeekToFirst.
273 itr.seekHelper(0, key)
274 return
275 }
276
277 // block[idx].smallest is > key.
278 // Since idx>0, we know block[idx-1].smallest is <= key.
279 // There are two cases.
280 // 1) Everything in block[idx-1] is strictly < key. In this case, we should go to the first
281 // element of block[idx].
282 // 2) Some element in block[idx-1] is >= key. We should go to that element.
283 itr.seekHelper(idx-1, key)
284 if itr.err == io.EOF {
285 // Case 1. Need to visit block[idx].
286 if idx == itr.t.offsetsLength() {
287 // If idx == len(itr.t.blockIndex), then input key is greater than ANY element of table.
288 // There's nothing we can do. Valid() should return false as we seek to end of table.
289 return
290 }
291 // Since block[idx].smallest is > key. This is essentially a block[idx].SeekToFirst.
292 itr.seekHelper(idx, key)
293 }
294 // Case 2: No need to do anything. We already did the seek in block[idx-1].
295 }
296
297 // seek will reset iterator and seek to >= key.
298 func (itr *Iterator) seek(key []byte) {
299 itr.seekFrom(key, origin)
300 }
301
302 // seekForPrev will reset iterator and seek to <= key.
303 func (itr *Iterator) seekForPrev(key []byte) {
304 // TODO: Optimize this. We shouldn't have to take a Prev step.
305 itr.seekFrom(key, origin)
306 if !bytes.Equal(itr.Key(), key) {
307 itr.prev()
308 }
309 }
310
311 func (itr *Iterator) next() {
312 itr.err = nil
313
314 if itr.bpos >= itr.t.offsetsLength() {
315 itr.err = io.EOF
316 return
317 }
318
319 if len(itr.bi.data) == 0 {
320 block, err := itr.t.block(itr.bpos, itr.useCache())
321 if err != nil {
322 itr.err = err
323 return
324 }
325 itr.bi.tableID = itr.t.id
326 itr.bi.blockID = itr.bpos
327 itr.bi.setBlock(block)
328 itr.bi.seekToFirst()
329 itr.err = itr.bi.Error()
330 return
331 }
332
333 itr.bi.next()
334 if !itr.bi.Valid() {
335 itr.bpos++
336 itr.bi.data = nil
337 itr.next()
338 return
339 }
340 }
341
342 func (itr *Iterator) prev() {
343 itr.err = nil
344 if itr.bpos < 0 {
345 itr.err = io.EOF
346 return
347 }
348
349 if len(itr.bi.data) == 0 {
350 block, err := itr.t.block(itr.bpos, itr.useCache())
351 if err != nil {
352 itr.err = err
353 return
354 }
355 itr.bi.tableID = itr.t.id
356 itr.bi.blockID = itr.bpos
357 itr.bi.setBlock(block)
358 itr.bi.seekToLast()
359 itr.err = itr.bi.Error()
360 return
361 }
362
363 itr.bi.prev()
364 if !itr.bi.Valid() {
365 itr.bpos--
366 itr.bi.data = nil
367 itr.prev()
368 return
369 }
370 }
371
372 // Key follows the y.Iterator interface.
373 // Returns the key with timestamp.
374 func (itr *Iterator) Key() []byte {
375 return itr.bi.key
376 }
377
378 // Value follows the y.Iterator interface
379 func (itr *Iterator) Value() (ret y.ValueStruct) {
380 ret.Decode(itr.bi.val)
381 return
382 }
383
384 // ValueCopy copies the current value and returns it as decoded
385 // ValueStruct.
386 func (itr *Iterator) ValueCopy() (ret y.ValueStruct) {
387 dst := y.Copy(itr.bi.val)
388 ret.Decode(dst)
389 return
390 }
391
392 // Next follows the y.Iterator interface
393 func (itr *Iterator) Next() {
394 if itr.opt&REVERSED == 0 {
395 itr.next()
396 } else {
397 itr.prev()
398 }
399 }
400
401 // Rewind follows the y.Iterator interface
402 func (itr *Iterator) Rewind() {
403 if itr.opt&REVERSED == 0 {
404 itr.seekToFirst()
405 } else {
406 itr.seekToLast()
407 }
408 }
409
410 // Seek follows the y.Iterator interface
411 func (itr *Iterator) Seek(key []byte) {
412 if itr.opt&REVERSED == 0 {
413 itr.seek(key)
414 } else {
415 itr.seekForPrev(key)
416 }
417 }
418
// Iterator option bit flags, passed as opt to NewIterator and NewConcatIterator. They
// occupy distinct bits and may be OR-ed together.
var (
	// REVERSED makes Next walk keys in descending order and Seek find the largest
	// key <= the target.
	REVERSED = 2
	// NOCACHE turns off the cache flag that iterators pass to Table.block.
	NOCACHE = 4
)
423
// ConcatIterator concatenates the sequences defined by several iterators. (It only works with
// TableIterators, probably just because it's faster to not be so generic.)
//
// It visits tables one at a time in order (back to front with REVERSED). Seek picks a
// single table by key range, which presumably requires the tables' key ranges not to
// overlap; confirm with callers.
type ConcatIterator struct {
	idx     int         // Which iterator is active now; -1 initially and when positioned off the ends.
	cur     *Iterator   // iters[idx], or nil when idx is out of range (Valid then reports false).
	iters   []*Iterator // Corresponds to tables. Entries stay nil until setIdx first visits them.
	tables  []*Table    // Disregarding reversed, this is in ascending order.
	options int         // Valid options are REVERSED and NOCACHE.
}
433
434 // NewConcatIterator creates a new concatenated iterator
435 func NewConcatIterator(tbls []*Table, opt int) *ConcatIterator {
436 iters := make([]*Iterator, len(tbls))
437 for i := 0; i < len(tbls); i++ {
438 // Increment the reference count. Since, we're not creating the iterator right now.
439 // Here, We'll hold the reference of the tables, till the lifecycle of the iterator.
440 tbls[i].IncrRef()
441
442 // Save cycles by not initializing the iterators until needed.
443 // iters[i] = tbls[i].NewIterator(reversed)
444 }
445 return &ConcatIterator{
446 options: opt,
447 iters: iters,
448 tables: tbls,
449 idx: -1, // Not really necessary because s.it.Valid()=false, but good to have.
450 }
451 }
452
453 func (s *ConcatIterator) setIdx(idx int) {
454 s.idx = idx
455 if idx < 0 || idx >= len(s.iters) {
456 s.cur = nil
457 return
458 }
459 if s.iters[idx] == nil {
460 s.iters[idx] = s.tables[idx].NewIterator(s.options)
461 }
462 s.cur = s.iters[s.idx]
463 }
464
465 // Rewind implements y.Interface
466 func (s *ConcatIterator) Rewind() {
467 if len(s.iters) == 0 {
468 return
469 }
470 if s.options&REVERSED == 0 {
471 s.setIdx(0)
472 } else {
473 s.setIdx(len(s.iters) - 1)
474 }
475 s.cur.Rewind()
476 }
477
478 // Valid implements y.Interface
479 func (s *ConcatIterator) Valid() bool {
480 return s.cur != nil && s.cur.Valid()
481 }
482
483 // Key implements y.Interface
484 func (s *ConcatIterator) Key() []byte {
485 return s.cur.Key()
486 }
487
488 // Value implements y.Interface
489 func (s *ConcatIterator) Value() y.ValueStruct {
490 return s.cur.Value()
491 }
492
493 // Seek brings us to element >= key if reversed is false. Otherwise, <= key.
494 func (s *ConcatIterator) Seek(key []byte) {
495 var idx int
496 if s.options&REVERSED == 0 {
497 idx = sort.Search(len(s.tables), func(i int) bool {
498 return y.CompareKeys(s.tables[i].Biggest(), key) >= 0
499 })
500 } else {
501 n := len(s.tables)
502 idx = n - 1 - sort.Search(n, func(i int) bool {
503 return y.CompareKeys(s.tables[n-1-i].Smallest(), key) <= 0
504 })
505 }
506 if idx >= len(s.tables) || idx < 0 {
507 s.setIdx(-1)
508 return
509 }
510 // For reversed=false, we know s.tables[i-1].Biggest() < key. Thus, the
511 // previous table cannot possibly contain key.
512 s.setIdx(idx)
513 s.cur.Seek(key)
514 }
515
516 // Next advances our concat iterator.
517 func (s *ConcatIterator) Next() {
518 s.cur.Next()
519 if s.cur.Valid() {
520 // Nothing to do. Just stay with the current table.
521 return
522 }
523 for { // In case there are empty tables.
524 if s.options&REVERSED == 0 {
525 s.setIdx(s.idx + 1)
526 } else {
527 s.setIdx(s.idx - 1)
528 }
529 if s.cur == nil {
530 // End of list. Valid will become false.
531 return
532 }
533 s.cur.Rewind()
534 if s.cur.Valid() {
535 break
536 }
537 }
538 }
539
540 // Close implements y.Interface.
541 func (s *ConcatIterator) Close() error {
542 for _, t := range s.tables {
543 // DeReference the tables while closing the iterator.
544 if err := t.DecrRef(); err != nil {
545 return err
546 }
547 }
548 for _, it := range s.iters {
549 if it == nil {
550 continue
551 }
552 if err := it.Close(); err != nil {
553 return y.Wrap(err, "ConcatIterator")
554 }
555 }
556 return nil
557 }
558