iterator.go raw
1 /*
2 * SPDX-FileCopyrightText: © Hypermode Inc. <hello@hypermode.com>
3 * SPDX-License-Identifier: Apache-2.0
4 */
5
6 package badger
7
8 import (
9 "bytes"
10 "fmt"
11 "hash/crc32"
12 "math"
13 "sort"
14 "sync"
15 "time"
16
17 "github.com/dgraph-io/badger/v4/table"
18 "github.com/dgraph-io/badger/v4/y"
19 "github.com/dgraph-io/ristretto/v2/z"
20 )
21
// prefetchStatus records whether an Item's value has already been loaded by a prefetch.
type prefetchStatus uint8

const (
	// prefetched is the status prefetchValue stores on an Item once it has run. It starts at 1,
	// so the zero value of prefetchStatus never compares equal to it.
	prefetched prefetchStatus = iota + 1
)
27
// Item is returned during iteration. Both the Key() and Value() outputs are only valid until
// iterator.Next() is called.
type Item struct {
	key       []byte // Key without the timestamp suffix (see Iterator.fill).
	vptr      []byte // Raw stored value: inline bytes, or an encoded valuePointer if meta has bitValuePointer.
	val       []byte // Value filled in by prefetchValue; nil until then.
	version   uint64 // Timestamp parsed from the stored key.
	expiresAt uint64 // Unix time at which the item expires; 0 means it never expires.

	slice *y.Slice // Scratch buffer used by yieldItemValue and prefetchValue.
	next  *Item    // Link to the following item when held in a list.
	txn   *Txn     // Transaction the item belongs to; used to reach the DB for value log reads.

	err      error          // Error recorded by prefetchValue.
	wg       sync.WaitGroup // Waited on before the value is read, so an in-flight prefetch can finish.
	status   prefetchStatus // Set to prefetched once prefetchValue has run.
	meta     byte           // We need to store meta to know about bitValuePointer.
	userMeta byte           // Byte set by the user, returned by UserMeta.
}
47
48 // String returns a string representation of Item
49 func (item *Item) String() string {
50 return fmt.Sprintf("key=%q, version=%d, meta=%x", item.Key(), item.Version(), item.meta)
51 }
52
// Key returns the key.
//
// The returned slice is the item's own buffer, not a copy. Key is only valid as long as item is
// valid, or transaction is valid. If you need to use it outside its validity, please use KeyCopy.
func (item *Item) Key() []byte {
	return item.key
}
60
61 // KeyCopy returns a copy of the key of the item, writing it to dst slice.
62 // If nil is passed, or capacity of dst isn't sufficient, a new slice would be allocated and
63 // returned.
64 func (item *Item) KeyCopy(dst []byte) []byte {
65 return y.SafeCopy(dst, item.key)
66 }
67
// Version returns the commit timestamp of the item, i.e. the timestamp that was parsed from the
// stored key when the item was filled.
func (item *Item) Version() uint64 {
	return item.version
}
72
73 // Value retrieves the value of the item from the value log.
74 //
75 // This method must be called within a transaction. Calling it outside a
76 // transaction is considered undefined behavior. If an iterator is being used,
77 // then Item.Value() is defined in the current iteration only, because items are
78 // reused.
79 //
80 // If you need to use a value outside a transaction, please use Item.ValueCopy
81 // instead, or copy it yourself. Value might change once discard or commit is called.
82 // Use ValueCopy if you want to do a Set after Get.
83 func (item *Item) Value(fn func(val []byte) error) error {
84 item.wg.Wait()
85 if item.status == prefetched {
86 if item.err == nil && fn != nil {
87 if err := fn(item.val); err != nil {
88 return err
89 }
90 }
91 return item.err
92 }
93 buf, cb, err := item.yieldItemValue()
94 defer runCallback(cb)
95 if err != nil {
96 return err
97 }
98 if fn != nil {
99 return fn(buf)
100 }
101 return nil
102 }
103
104 // ValueCopy returns a copy of the value of the item from the value log, writing it to dst slice.
105 // If nil is passed, or capacity of dst isn't sufficient, a new slice would be allocated and
106 // returned. Tip: It might make sense to reuse the returned slice as dst argument for the next call.
107 //
108 // This function is useful in long running iterate/update transactions to avoid a write deadlock.
109 // See Github issue: https://github.com/hypermodeinc/badger/issues/315
110 func (item *Item) ValueCopy(dst []byte) ([]byte, error) {
111 item.wg.Wait()
112 if item.status == prefetched {
113 return y.SafeCopy(dst, item.val), item.err
114 }
115 buf, cb, err := item.yieldItemValue()
116 defer runCallback(cb)
117 return y.SafeCopy(dst, buf), err
118 }
119
120 func (item *Item) hasValue() bool {
121 if item.meta == 0 && item.vptr == nil {
122 // key not found
123 return false
124 }
125 return true
126 }
127
// IsDeletedOrExpired returns true if item contains deleted or expired value. The decision is
// delegated to isDeletedOrExpired using the item's meta byte and expiry time.
func (item *Item) IsDeletedOrExpired() bool {
	return isDeletedOrExpired(item.meta, item.expiresAt)
}
132
133 // DiscardEarlierVersions returns whether the item was created with the
134 // option to discard earlier versions of a key when multiple are available.
135 func (item *Item) DiscardEarlierVersions() bool {
136 return item.meta&bitDiscardEarlierVersions > 0
137 }
138
139 func (item *Item) yieldItemValue() ([]byte, func(), error) {
140 key := item.Key() // No need to copy.
141 if !item.hasValue() {
142 return nil, nil, nil
143 }
144
145 if item.slice == nil {
146 item.slice = new(y.Slice)
147 }
148
149 if (item.meta & bitValuePointer) == 0 {
150 val := item.slice.Resize(len(item.vptr))
151 copy(val, item.vptr)
152 return val, nil, nil
153 }
154
155 var vp valuePointer
156 vp.Decode(item.vptr)
157 db := item.txn.db
158 result, cb, err := db.vlog.Read(vp, item.slice)
159 if err != nil {
160 db.opt.Errorf("Unable to read: Key: %v, Version : %v, meta: %v, userMeta: %v"+
161 " Error: %v", key, item.version, item.meta, item.userMeta, err)
162 var txn *Txn
163 if db.opt.managedTxns {
164 txn = db.NewTransactionAt(math.MaxUint64, false)
165 } else {
166 txn = db.NewTransaction(false)
167 }
168 defer txn.Discard()
169
170 iopt := DefaultIteratorOptions
171 iopt.AllVersions = true
172 iopt.InternalAccess = true
173 iopt.PrefetchValues = false
174
175 it := txn.NewKeyIterator(item.Key(), iopt)
176 defer it.Close()
177 for it.Rewind(); it.Valid(); it.Next() {
178 item := it.Item()
179 var vp valuePointer
180 if item.meta&bitValuePointer > 0 {
181 vp.Decode(item.vptr)
182 }
183 db.opt.Errorf("Key: %v, Version : %v, meta: %v, userMeta: %v valuePointer: %+v",
184 item.Key(), item.version, item.meta, item.userMeta, vp)
185 }
186 }
187 // Don't return error if we cannot read the value. Just log the error.
188 return result, cb, nil
189 }
190
// runCallback invokes cb unless it is nil.
func runCallback(cb func()) {
	if cb == nil {
		return
	}
	cb()
}
196
197 func (item *Item) prefetchValue() {
198 val, cb, err := item.yieldItemValue()
199 defer runCallback(cb)
200
201 item.err = err
202 item.status = prefetched
203 if val == nil {
204 return
205 }
206 buf := item.slice.Resize(len(val))
207 copy(buf, val)
208 item.val = buf
209 }
210
211 // EstimatedSize returns the approximate size of the key-value pair.
212 //
213 // This can be called while iterating through a store to quickly estimate the
214 // size of a range of key-value pairs (without fetching the corresponding
215 // values).
216 func (item *Item) EstimatedSize() int64 {
217 if !item.hasValue() {
218 return 0
219 }
220 if (item.meta & bitValuePointer) == 0 {
221 return int64(len(item.key) + len(item.vptr))
222 }
223 var vp valuePointer
224 vp.Decode(item.vptr)
225 return int64(vp.Len) // includes key length.
226 }
227
228 // KeySize returns the size of the key.
229 // Exact size of the key is key + 8 bytes of timestamp
230 func (item *Item) KeySize() int64 {
231 return int64(len(item.key))
232 }
233
234 // ValueSize returns the approximate size of the value.
235 //
236 // This can be called to quickly estimate the size of a value without fetching
237 // it.
238 func (item *Item) ValueSize() int64 {
239 if !item.hasValue() {
240 return 0
241 }
242 if (item.meta & bitValuePointer) == 0 {
243 return int64(len(item.vptr))
244 }
245 var vp valuePointer
246 vp.Decode(item.vptr)
247
248 klen := int64(len(item.key) + 8) // 8 bytes for timestamp.
249 // 6 bytes are for the approximate length of the header. Since header is encoded in varint, we
250 // cannot find the exact length of header without fetching it.
251 return int64(vp.Len) - klen - 6 - crc32.Size
252 }
253
// UserMeta returns the userMeta set by the user. Typically, this byte, optionally set by the
// user, is used to interpret the value.
func (item *Item) UserMeta() byte {
	return item.userMeta
}
259
// ExpiresAt returns a Unix time value (in seconds) indicating when the item will be
// considered expired. 0 indicates that the item will never expire.
func (item *Item) ExpiresAt() uint64 {
	return item.expiresAt
}
265
// list is a minimal singly linked FIFO queue of Items, chained through Item.next.
// An empty list has both head and tail set to nil.
//
// TODO: Switch this to use linked list container in Go.
type list struct {
	head *Item // Next item to be popped.
	tail *Item // Most recently pushed item.
}
271
272 func (l *list) push(i *Item) {
273 i.next = nil
274 if l.tail == nil {
275 l.head = i
276 l.tail = i
277 return
278 }
279 l.tail.next = i
280 l.tail = i
281 }
282
283 func (l *list) pop() *Item {
284 if l.head == nil {
285 return nil
286 }
287 i := l.head
288 if l.head == l.tail {
289 l.tail = nil
290 l.head = nil
291 } else {
292 l.head = i.next
293 }
294 i.next = nil
295 return i
296 }
297
// IteratorOptions is used to set options when iterating over Badger key-value
// stores.
//
// This package provides DefaultIteratorOptions which contains options that
// should work for most applications. Consider using that as a starting point
// before customizing it for your own needs.
type IteratorOptions struct {
	// PrefetchSize is the number of KV pairs to prefetch while iterating.
	// Valid only if PrefetchValues is true. Values of 1 or less fall back to 2.
	PrefetchSize int
	// PrefetchValues Indicates whether we should prefetch values during
	// iteration and store them.
	PrefetchValues bool
	Reverse        bool // Direction of iteration. False is forward, true is backward.
	AllVersions    bool // Fetch all valid versions of the same key.
	InternalAccess bool // Used to allow internal access to badger keys.

	// The following options are used to narrow down the SSTables that iterator
	// picks up. If Prefix is specified, only tables which could have this
	// prefix are picked based on their range of keys.
	prefixIsKey bool   // If set, use the prefix for bloom filter lookup. Set by NewKeyIterator.
	Prefix      []byte // Only iterate over this given prefix.
	SinceTs     uint64 // Only read data that has version > SinceTs.
}
322
323 func (opt *IteratorOptions) compareToPrefix(key []byte) int {
324 // We should compare key without timestamp. For example key - a[TS] might be > "aa" prefix.
325 key = y.ParseKey(key)
326 if len(key) > len(opt.Prefix) {
327 key = key[:len(opt.Prefix)]
328 }
329 return bytes.Compare(key, opt.Prefix)
330 }
331
332 func (opt *IteratorOptions) pickTable(t table.TableInterface) bool {
333 // Ignore this table if its max version is less than the sinceTs.
334 if t.MaxVersion() < opt.SinceTs {
335 return false
336 }
337 if len(opt.Prefix) == 0 {
338 return true
339 }
340 if opt.compareToPrefix(t.Smallest()) > 0 {
341 return false
342 }
343 if opt.compareToPrefix(t.Biggest()) < 0 {
344 return false
345 }
346 // Bloom filter lookup would only work if opt.Prefix does NOT have the read
347 // timestamp as part of the key.
348 if opt.prefixIsKey && t.DoesNotHave(y.Hash(opt.Prefix)) {
349 return false
350 }
351 return true
352 }
353
354 // pickTables picks the necessary table for the iterator. This function also assumes
355 // that the tables are sorted in the right order.
356 func (opt *IteratorOptions) pickTables(all []*table.Table) []*table.Table {
357 filterTables := func(tables []*table.Table) []*table.Table {
358 if opt.SinceTs > 0 {
359 tmp := tables[:0]
360 for _, t := range tables {
361 if t.MaxVersion() < opt.SinceTs {
362 continue
363 }
364 tmp = append(tmp, t)
365 }
366 tables = tmp
367 }
368 return tables
369 }
370
371 if len(opt.Prefix) == 0 {
372 out := make([]*table.Table, len(all))
373 copy(out, all)
374 return filterTables(out)
375 }
376 sIdx := sort.Search(len(all), func(i int) bool {
377 // table.Biggest >= opt.prefix
378 // if opt.Prefix < table.Biggest, then surely it is not in any of the preceding tables.
379 return opt.compareToPrefix(all[i].Biggest()) >= 0
380 })
381 if sIdx == len(all) {
382 // Not found.
383 return []*table.Table{}
384 }
385
386 filtered := all[sIdx:]
387 if !opt.prefixIsKey {
388 eIdx := sort.Search(len(filtered), func(i int) bool {
389 return opt.compareToPrefix(filtered[i].Smallest()) > 0
390 })
391 out := make([]*table.Table, len(filtered[:eIdx]))
392 copy(out, filtered[:eIdx])
393 return filterTables(out)
394 }
395
396 // opt.prefixIsKey == true. This code is optimizing for opt.prefixIsKey part.
397 var out []*table.Table
398 hash := y.Hash(opt.Prefix)
399 for _, t := range filtered {
400 // When we encounter the first table whose smallest key is higher than opt.Prefix, we can
401 // stop. This is an IMPORTANT optimization, just considering how often we call
402 // NewKeyIterator.
403 if opt.compareToPrefix(t.Smallest()) > 0 {
404 // if table.Smallest > opt.Prefix, then this and all tables after this can be ignored.
405 break
406 }
407 // opt.Prefix is actually the key. So, we can run bloom filter checks
408 // as well.
409 if t.DoesNotHave(hash) {
410 continue
411 }
412 out = append(out, t)
413 }
414 return filterTables(out)
415 }
416
417 // DefaultIteratorOptions contains default options when iterating over Badger key-value stores.
418 var DefaultIteratorOptions = IteratorOptions{
419 PrefetchValues: true,
420 PrefetchSize: 100,
421 Reverse: false,
422 AllVersions: false,
423 }
424
// Iterator helps iterating over the KV pairs in a lexicographically sorted order.
type Iterator struct {
	iitr   y.Iterator // Underlying merged iterator built by NewIterator.
	txn    *Txn       // Transaction that created the iterator.
	readTs uint64     // Copy of txn.readTs taken at creation; newer versions are skipped.

	opt   IteratorOptions
	item  *Item // Current item; nil when there is none.
	data  list  // Items already parsed ahead of the current one.
	waste list  // Used items kept around for reuse by newItem.

	lastKey []byte // Used to skip over multiple versions of the same key.

	closed  bool
	scanned int // Used to estimate the size of data scanned by iterator.

	// ThreadId is an optional value that can be set to identify which goroutine created
	// the iterator. It can be used, for example, to uniquely identify each of the
	// iterators created by the stream interface
	ThreadId int

	// Alloc is not referenced by the iterator code in this file.
	// NOTE(review): presumably set and used by callers such as the stream framework - confirm.
	Alloc *z.Allocator
}
448
449 // NewIterator returns a new iterator. Depending upon the options, either only keys, or both
450 // key-value pairs would be fetched. The keys are returned in lexicographically sorted order.
451 // Using prefetch is recommended if you're doing a long running iteration, for performance.
452 //
453 // Multiple Iterators:
454 // For a read-only txn, multiple iterators can be running simultaneously. However, for a read-write
455 // txn, iterators have the nuance of being a snapshot of the writes for the transaction at the time
456 // iterator was created. If writes are performed after an iterator is created, then that iterator
457 // will not be able to see those writes. Only writes performed before an iterator was created can be
458 // viewed.
459 func (txn *Txn) NewIterator(opt IteratorOptions) *Iterator {
460 if txn.discarded {
461 panic(ErrDiscardedTxn)
462 }
463 if txn.db.IsClosed() {
464 panic(ErrDBClosed)
465 }
466
467 y.NumIteratorsCreatedAdd(txn.db.opt.MetricsEnabled, 1)
468
469 // Keep track of the number of active iterators.
470 txn.numIterators.Add(1)
471
472 // TODO: If Prefix is set, only pick those memtables which have keys with the prefix.
473 tables, decr := txn.db.getMemTables()
474 defer decr()
475 txn.db.vlog.incrIteratorCount()
476 var iters []y.Iterator
477 if itr := txn.newPendingWritesIterator(opt.Reverse); itr != nil {
478 iters = append(iters, itr)
479 }
480 for i := 0; i < len(tables); i++ {
481 iters = append(iters, tables[i].sl.NewUniIterator(opt.Reverse))
482 }
483 iters = txn.db.lc.appendIterators(iters, &opt) // This will increment references.
484 res := &Iterator{
485 txn: txn,
486 iitr: table.NewMergeIterator(iters, opt.Reverse),
487 opt: opt,
488 readTs: txn.readTs,
489 }
490 return res
491 }
492
493 // NewKeyIterator is just like NewIterator, but allows the user to iterate over all versions of a
494 // single key. Internally, it sets the Prefix option in provided opt, and uses that prefix to
495 // additionally run bloom filter lookups before picking tables from the LSM tree.
496 func (txn *Txn) NewKeyIterator(key []byte, opt IteratorOptions) *Iterator {
497 if len(opt.Prefix) > 0 {
498 panic("opt.Prefix should be nil for NewKeyIterator.")
499 }
500 opt.Prefix = key // This key must be without the timestamp.
501 opt.prefixIsKey = true
502 opt.AllVersions = true
503 return txn.NewIterator(opt)
504 }
505
506 func (it *Iterator) newItem() *Item {
507 item := it.waste.pop()
508 if item == nil {
509 item = &Item{slice: new(y.Slice), txn: it.txn}
510 }
511 return item
512 }
513
514 // Item returns pointer to the current key-value pair.
515 // This item is only valid until it.Next() gets called.
516 func (it *Iterator) Item() *Item {
517 tx := it.txn
518 tx.addReadKey(it.item.Key())
519 return it.item
520 }
521
522 // Valid returns false when iteration is done.
523 func (it *Iterator) Valid() bool {
524 if it.item == nil {
525 return false
526 }
527 if it.opt.prefixIsKey {
528 return bytes.Equal(it.item.key, it.opt.Prefix)
529 }
530 return bytes.HasPrefix(it.item.key, it.opt.Prefix)
531 }
532
533 // ValidForPrefix returns false when iteration is done
534 // or when the current key is not prefixed by the specified prefix.
535 func (it *Iterator) ValidForPrefix(prefix []byte) bool {
536 return it.Valid() && bytes.HasPrefix(it.item.key, prefix)
537 }
538
539 // Close would close the iterator. It is important to call this when you're done with iteration.
540 func (it *Iterator) Close() {
541 if it.closed {
542 return
543 }
544 it.closed = true
545 if it.iitr == nil {
546 it.txn.numIterators.Add(-1)
547 return
548 }
549
550 it.iitr.Close()
551 // It is important to wait for the fill goroutines to finish. Otherwise, we might leave zombie
552 // goroutines behind, which are waiting to acquire file read locks after DB has been closed.
553 waitFor := func(l list) {
554 item := l.pop()
555 for item != nil {
556 item.wg.Wait()
557 item = l.pop()
558 }
559 }
560 waitFor(it.waste)
561 waitFor(it.data)
562
563 // TODO: We could handle this error.
564 _ = it.txn.db.vlog.decrIteratorCount()
565 it.txn.numIterators.Add(-1)
566 }
567
568 // Next would advance the iterator by one. Always check it.Valid() after a Next()
569 // to ensure you have access to a valid it.Item().
570 func (it *Iterator) Next() {
571 if it.iitr == nil {
572 return
573 }
574 // Reuse current item
575 it.item.wg.Wait() // Just cleaner to wait before pushing to avoid doing ref counting.
576 it.scanned += len(it.item.key) + len(it.item.val) + len(it.item.vptr) + 2
577 it.waste.push(it.item)
578
579 // Set next item to current
580 it.item = it.data.pop()
581 for it.iitr.Valid() && hasPrefix(it) {
582 if it.parseItem() {
583 // parseItem calls one extra next.
584 // This is used to deal with the complexity of reverse iteration.
585 break
586 }
587 }
588 }
589
590 func isDeletedOrExpired(meta byte, expiresAt uint64) bool {
591 if meta&bitDelete > 0 {
592 return true
593 }
594 if expiresAt == 0 {
595 return false
596 }
597 return expiresAt <= uint64(time.Now().Unix())
598 }
599
// parseItem is a complex function because it needs to handle both forward and reverse iteration
// implementation. We store keys such that their versions are sorted in descending order. This makes
// forward iteration efficient, but reverse iteration complicated. This tradeoff is better because
// forward iteration is more common than reverse.
//
// It looks at the entry under the underlying iterator's cursor and either skips it (returning
// false) or turns it into an Item (returning true). A produced item becomes it.item when there
// is no current item, otherwise it is appended to it.data.
//
// This function advances the iterator: every path calls mi.Next() at least once.
func (it *Iterator) parseItem() bool {
	mi := it.iitr
	key := mi.Key()

	// setItem publishes a filled item: as the current item if there is none, else into the
	// look-ahead list.
	setItem := func(item *Item) {
		if it.item == nil {
			it.item = item
		} else {
			it.data.push(item)
		}
	}

	isInternalKey := bytes.HasPrefix(key, badgerPrefix)
	// Skip badger keys.
	if !it.opt.InternalAccess && isInternalKey {
		mi.Next()
		return false
	}

	// Skip any versions which are beyond the readTs.
	version := y.ParseTs(key)
	// Ignore everything that is above the readTs and below or at the sinceTs.
	if version > it.readTs || (it.opt.SinceTs > 0 && version <= it.opt.SinceTs) {
		mi.Next()
		return false
	}

	// Skip banned keys only if it does not have badger internal prefix.
	if !isInternalKey && it.txn.db.isBanned(key) != nil {
		mi.Next()
		return false
	}

	if it.opt.AllVersions {
		// Return deleted or expired values also, otherwise user can't figure out
		// whether the key was deleted.
		item := it.newItem()
		it.fill(item)
		setItem(item)
		mi.Next()
		return true
	}

	// If iterating in forward direction, then just checking the last key against current key would
	// be sufficient.
	if !it.opt.Reverse {
		if y.SameKey(it.lastKey, key) {
			mi.Next()
			return false
		}
		// Only track in forward direction.
		// We should update lastKey as soon as we find a different key in our snapshot.
		// Consider keys: a 5, b 7 (del), b 5. When iterating, lastKey = a.
		// Then we see b 7, which is deleted. If we don't store lastKey = b, we'll then return b 5,
		// which is wrong. Therefore, update lastKey here.
		it.lastKey = y.SafeCopy(it.lastKey, mi.Key())
	}

FILL:
	// If deleted, advance and return.
	vs := mi.Value()
	if isDeletedOrExpired(vs.Meta, vs.ExpiresAt) {
		mi.Next()
		return false
	}

	item := it.newItem()
	it.fill(item)
	// fill item based on current cursor position. All Next calls have returned, so reaching here
	// means no Next was called.

	mi.Next()                           // Advance but no fill item yet.
	if !it.opt.Reverse || !mi.Valid() { // Forward direction, or invalid.
		setItem(item)
		return true
	}

	// Reverse direction. In reverse the cursor moves from older to newer versions of a key, so
	// peek at the next entry: if it is the same key and still visible at readTs, it supersedes
	// the item just filled.
	nextTs := y.ParseTs(mi.Key())
	mik := y.ParseKey(mi.Key())
	if nextTs <= it.readTs && bytes.Equal(mik, item.key) {
		// This is a valid potential candidate.
		// NOTE(review): the item filled on this pass is neither published nor pushed back to
		// it.waste before jumping; a fresh one is taken from newItem after FILL.
		goto FILL
	}
	// Ignore the next candidate. Return the current one.
	setItem(item)
	return true
}
695
696 func (it *Iterator) fill(item *Item) {
697 vs := it.iitr.Value()
698 item.meta = vs.Meta
699 item.userMeta = vs.UserMeta
700 item.expiresAt = vs.ExpiresAt
701
702 item.version = y.ParseTs(it.iitr.Key())
703 item.key = y.SafeCopy(item.key, y.ParseKey(it.iitr.Key()))
704
705 item.vptr = y.SafeCopy(item.vptr, vs.Value)
706 item.val = nil
707 if it.opt.PrefetchValues {
708 item.wg.Add(1)
709 go func() {
710 // FIXME we are not handling errors here.
711 item.prefetchValue()
712 item.wg.Done()
713 }()
714 }
715 }
716
717 func hasPrefix(it *Iterator) bool {
718 // We shouldn't check prefix in case the iterator is going in reverse. Since in reverse we expect
719 // people to append items to the end of prefix.
720 if !it.opt.Reverse && len(it.opt.Prefix) > 0 {
721 return bytes.HasPrefix(y.ParseKey(it.iitr.Key()), it.opt.Prefix)
722 }
723 return true
724 }
725
726 func (it *Iterator) prefetch() {
727 prefetchSize := 2
728 if it.opt.PrefetchValues && it.opt.PrefetchSize > 1 {
729 prefetchSize = it.opt.PrefetchSize
730 }
731
732 i := it.iitr
733 var count int
734 it.item = nil
735 for i.Valid() && hasPrefix(it) {
736 if !it.parseItem() {
737 continue
738 }
739 count++
740 if count == prefetchSize {
741 break
742 }
743 }
744 }
745
746 // Seek would seek to the provided key if present. If absent, it would seek to the next
747 // smallest key greater than the provided key if iterating in the forward direction.
748 // Behavior would be reversed if iterating backwards.
749 func (it *Iterator) Seek(key []byte) {
750 if it.iitr == nil {
751 return
752 }
753 if len(key) > 0 {
754 it.txn.addReadKey(key)
755 }
756 for i := it.data.pop(); i != nil; i = it.data.pop() {
757 i.wg.Wait()
758 it.waste.push(i)
759 }
760
761 it.lastKey = it.lastKey[:0]
762 if len(key) == 0 {
763 key = it.opt.Prefix
764 }
765 if len(key) == 0 {
766 it.iitr.Rewind()
767 it.prefetch()
768 return
769 }
770
771 if !it.opt.Reverse {
772 key = y.KeyWithTs(key, it.txn.readTs)
773 } else {
774 key = y.KeyWithTs(key, 0)
775 }
776 it.iitr.Seek(key)
777 it.prefetch()
778 }
779
// Rewind would rewind the iterator cursor all the way to zero-th position, which would be the
// smallest key if iterating forward, and largest if iterating backward. It does not keep track of
// whether the cursor started with a Seek().
//
// It is implemented as Seek(nil), so when a Prefix is set in the options the iterator is
// positioned at that prefix instead.
func (it *Iterator) Rewind() {
	it.Seek(nil)
}
786