iterator.go raw

   1  /*
   2   * SPDX-FileCopyrightText: © Hypermode Inc. <hello@hypermode.com>
   3   * SPDX-License-Identifier: Apache-2.0
   4   */
   5  
   6  package badger
   7  
   8  import (
   9  	"bytes"
  10  	"fmt"
  11  	"hash/crc32"
  12  	"math"
  13  	"sort"
  14  	"sync"
  15  	"time"
  16  
  17  	"github.com/dgraph-io/badger/v4/table"
  18  	"github.com/dgraph-io/badger/v4/y"
  19  	"github.com/dgraph-io/ristretto/v2/z"
  20  )
  21  
// prefetchStatus records whether an Item's value has already been loaded by prefetchValue.
type prefetchStatus uint8

const (
	// prefetched is set on an Item once prefetchValue has stored its value and error.
	// It starts at 1, so the zero value of prefetchStatus means "not prefetched".
	prefetched prefetchStatus = iota + 1
)
  27  
// Item is returned during iteration. Both the Key() and Value() output is only valid until
// iterator.Next() is called, because the iterator recycles Item objects.
type Item struct {
	key []byte // User key, without the version timestamp.
	// vptr holds the raw stored value: either the value bytes themselves or, when meta has
	// bitValuePointer set, an encoded valuePointer into the value log.
	vptr      []byte
	val       []byte // Value cached by prefetchValue; nil until then.
	version   uint64 // Version (timestamp) parsed from the stored key.
	expiresAt uint64 // Unix time at which the item expires; 0 means it never expires.

	slice *y.Slice // Scratch buffer reused when reading and prefetching the value.
	next  *Item    // Link used by the iterator's intrusive list.
	txn   *Txn     // Transaction used to reach the DB when reading from the value log.

	err      error          // Error recorded by prefetchValue.
	wg       sync.WaitGroup // Tracks the in-flight prefetch goroutine started by Iterator.fill.
	status   prefetchStatus // Set to prefetched once prefetchValue has run.
	meta     byte           // We need to store meta to know about bitValuePointer.
	userMeta byte           // User-supplied metadata byte, returned by UserMeta.
}
  47  
  48  // String returns a string representation of Item
  49  func (item *Item) String() string {
  50  	return fmt.Sprintf("key=%q, version=%d, meta=%x", item.Key(), item.Version(), item.meta)
  51  }
  52  
// Key returns the key.
//
// The returned slice is the item's internal buffer, not a copy. It is only valid as long as
// the item is valid, or the transaction is valid. If you need to use it outside its validity,
// please use KeyCopy.
func (item *Item) Key() []byte {
	return item.key
}
  60  
// KeyCopy returns a copy of the key of the item, writing it to dst slice.
// If nil is passed, or capacity of dst isn't sufficient, a new slice would be allocated and
// returned. The copy is made with y.SafeCopy, so the result can be kept after the item has
// been recycled.
func (item *Item) KeyCopy(dst []byte) []byte {
	return y.SafeCopy(dst, item.key)
}
  67  
// Version returns the commit timestamp of the item, as parsed from the stored key when the
// iterator filled this item.
func (item *Item) Version() uint64 {
	return item.version
}
  72  
  73  // Value retrieves the value of the item from the value log.
  74  //
  75  // This method must be called within a transaction. Calling it outside a
  76  // transaction is considered undefined behavior. If an iterator is being used,
  77  // then Item.Value() is defined in the current iteration only, because items are
  78  // reused.
  79  //
  80  // If you need to use a value outside a transaction, please use Item.ValueCopy
  81  // instead, or copy it yourself. Value might change once discard or commit is called.
  82  // Use ValueCopy if you want to do a Set after Get.
  83  func (item *Item) Value(fn func(val []byte) error) error {
  84  	item.wg.Wait()
  85  	if item.status == prefetched {
  86  		if item.err == nil && fn != nil {
  87  			if err := fn(item.val); err != nil {
  88  				return err
  89  			}
  90  		}
  91  		return item.err
  92  	}
  93  	buf, cb, err := item.yieldItemValue()
  94  	defer runCallback(cb)
  95  	if err != nil {
  96  		return err
  97  	}
  98  	if fn != nil {
  99  		return fn(buf)
 100  	}
 101  	return nil
 102  }
 103  
 104  // ValueCopy returns a copy of the value of the item from the value log, writing it to dst slice.
 105  // If nil is passed, or capacity of dst isn't sufficient, a new slice would be allocated and
 106  // returned. Tip: It might make sense to reuse the returned slice as dst argument for the next call.
 107  //
 108  // This function is useful in long running iterate/update transactions to avoid a write deadlock.
 109  // See Github issue: https://github.com/hypermodeinc/badger/issues/315
 110  func (item *Item) ValueCopy(dst []byte) ([]byte, error) {
 111  	item.wg.Wait()
 112  	if item.status == prefetched {
 113  		return y.SafeCopy(dst, item.val), item.err
 114  	}
 115  	buf, cb, err := item.yieldItemValue()
 116  	defer runCallback(cb)
 117  	return y.SafeCopy(dst, buf), err
 118  }
 119  
 120  func (item *Item) hasValue() bool {
 121  	if item.meta == 0 && item.vptr == nil {
 122  		// key not found
 123  		return false
 124  	}
 125  	return true
 126  }
 127  
// IsDeletedOrExpired returns true if item contains deleted or expired value. The check is
// delegated to isDeletedOrExpired using the item's meta byte and expiry time.
func (item *Item) IsDeletedOrExpired() bool {
	return isDeletedOrExpired(item.meta, item.expiresAt)
}
 132  
 133  // DiscardEarlierVersions returns whether the item was created with the
 134  // option to discard earlier versions of a key when multiple are available.
 135  func (item *Item) DiscardEarlierVersions() bool {
 136  	return item.meta&bitDiscardEarlierVersions > 0
 137  }
 138  
 139  func (item *Item) yieldItemValue() ([]byte, func(), error) {
 140  	key := item.Key() // No need to copy.
 141  	if !item.hasValue() {
 142  		return nil, nil, nil
 143  	}
 144  
 145  	if item.slice == nil {
 146  		item.slice = new(y.Slice)
 147  	}
 148  
 149  	if (item.meta & bitValuePointer) == 0 {
 150  		val := item.slice.Resize(len(item.vptr))
 151  		copy(val, item.vptr)
 152  		return val, nil, nil
 153  	}
 154  
 155  	var vp valuePointer
 156  	vp.Decode(item.vptr)
 157  	db := item.txn.db
 158  	result, cb, err := db.vlog.Read(vp, item.slice)
 159  	if err != nil {
 160  		db.opt.Errorf("Unable to read: Key: %v, Version : %v, meta: %v, userMeta: %v"+
 161  			" Error: %v", key, item.version, item.meta, item.userMeta, err)
 162  		var txn *Txn
 163  		if db.opt.managedTxns {
 164  			txn = db.NewTransactionAt(math.MaxUint64, false)
 165  		} else {
 166  			txn = db.NewTransaction(false)
 167  		}
 168  		defer txn.Discard()
 169  
 170  		iopt := DefaultIteratorOptions
 171  		iopt.AllVersions = true
 172  		iopt.InternalAccess = true
 173  		iopt.PrefetchValues = false
 174  
 175  		it := txn.NewKeyIterator(item.Key(), iopt)
 176  		defer it.Close()
 177  		for it.Rewind(); it.Valid(); it.Next() {
 178  			item := it.Item()
 179  			var vp valuePointer
 180  			if item.meta&bitValuePointer > 0 {
 181  				vp.Decode(item.vptr)
 182  			}
 183  			db.opt.Errorf("Key: %v, Version : %v, meta: %v, userMeta: %v valuePointer: %+v",
 184  				item.Key(), item.version, item.meta, item.userMeta, vp)
 185  		}
 186  	}
 187  	// Don't return error if we cannot read the value. Just log the error.
 188  	return result, cb, nil
 189  }
 190  
 191  func runCallback(cb func()) {
 192  	if cb != nil {
 193  		cb()
 194  	}
 195  }
 196  
 197  func (item *Item) prefetchValue() {
 198  	val, cb, err := item.yieldItemValue()
 199  	defer runCallback(cb)
 200  
 201  	item.err = err
 202  	item.status = prefetched
 203  	if val == nil {
 204  		return
 205  	}
 206  	buf := item.slice.Resize(len(val))
 207  	copy(buf, val)
 208  	item.val = buf
 209  }
 210  
 211  // EstimatedSize returns the approximate size of the key-value pair.
 212  //
 213  // This can be called while iterating through a store to quickly estimate the
 214  // size of a range of key-value pairs (without fetching the corresponding
 215  // values).
 216  func (item *Item) EstimatedSize() int64 {
 217  	if !item.hasValue() {
 218  		return 0
 219  	}
 220  	if (item.meta & bitValuePointer) == 0 {
 221  		return int64(len(item.key) + len(item.vptr))
 222  	}
 223  	var vp valuePointer
 224  	vp.Decode(item.vptr)
 225  	return int64(vp.Len) // includes key length.
 226  }
 227  
// KeySize returns the size of the key, i.e. the length in bytes of the key held by the item.
// Exact size of the key is key + 8 bytes of timestamp; those 8 bytes are not included here.
func (item *Item) KeySize() int64 {
	return int64(len(item.key))
}
 233  
 234  // ValueSize returns the approximate size of the value.
 235  //
 236  // This can be called to quickly estimate the size of a value without fetching
 237  // it.
 238  func (item *Item) ValueSize() int64 {
 239  	if !item.hasValue() {
 240  		return 0
 241  	}
 242  	if (item.meta & bitValuePointer) == 0 {
 243  		return int64(len(item.vptr))
 244  	}
 245  	var vp valuePointer
 246  	vp.Decode(item.vptr)
 247  
 248  	klen := int64(len(item.key) + 8) // 8 bytes for timestamp.
 249  	// 6 bytes are for the approximate length of the header. Since header is encoded in varint, we
 250  	// cannot find the exact length of header without fetching it.
 251  	return int64(vp.Len) - klen - 6 - crc32.Size
 252  }
 253  
// UserMeta returns the userMeta set by the user. Typically, this byte, optionally set by the
// user, is used to interpret the value.
func (item *Item) UserMeta() byte {
	return item.userMeta
}
 259  
// ExpiresAt returns a Unix time value (in seconds) indicating when the item will be
// considered expired. 0 indicates that the item will never expire.
func (item *Item) ExpiresAt() uint64 {
	return item.expiresAt
}
 265  
// list is a minimal singly linked FIFO queue of Items, chained through Item.next.
// Items are appended at tail (push) and removed from head (pop).
//
// TODO: Switch this to use linked list container in Go.
type list struct {
	head *Item // Oldest item; next to be popped. nil when the list is empty.
	tail *Item // Most recently pushed item. nil when the list is empty.
}
 271  
 272  func (l *list) push(i *Item) {
 273  	i.next = nil
 274  	if l.tail == nil {
 275  		l.head = i
 276  		l.tail = i
 277  		return
 278  	}
 279  	l.tail.next = i
 280  	l.tail = i
 281  }
 282  
 283  func (l *list) pop() *Item {
 284  	if l.head == nil {
 285  		return nil
 286  	}
 287  	i := l.head
 288  	if l.head == l.tail {
 289  		l.tail = nil
 290  		l.head = nil
 291  	} else {
 292  		l.head = i.next
 293  	}
 294  	i.next = nil
 295  	return i
 296  }
 297  
// IteratorOptions is used to set options when iterating over Badger key-value
// stores.
//
// This package provides DefaultIteratorOptions which contains options that
// should work for most applications. Consider using that as a starting point
// before customizing it for your own needs.
type IteratorOptions struct {
	// PrefetchSize is the number of KV pairs to prefetch while iterating.
	// Valid only if PrefetchValues is true.
	PrefetchSize int
	// PrefetchValues indicates whether we should prefetch values during
	// iteration and store them.
	PrefetchValues bool
	Reverse        bool // Direction of iteration. False is forward, true is backward.
	AllVersions    bool // Fetch all valid versions of the same key.
	InternalAccess bool // Used to allow internal access to badger keys.

	// The following options are used to narrow down the SSTables that the iterator
	// picks up. If Prefix is specified, only tables which could have this
	// prefix are picked based on their range of keys.
	prefixIsKey bool   // If set, Prefix is a complete key and is used for bloom filter lookup.
	Prefix      []byte // Only iterate over this given prefix.
	SinceTs     uint64 // Only read data that has version > SinceTs. 0 disables the filter.
}
 322  
 323  func (opt *IteratorOptions) compareToPrefix(key []byte) int {
 324  	// We should compare key without timestamp. For example key - a[TS] might be > "aa" prefix.
 325  	key = y.ParseKey(key)
 326  	if len(key) > len(opt.Prefix) {
 327  		key = key[:len(opt.Prefix)]
 328  	}
 329  	return bytes.Compare(key, opt.Prefix)
 330  }
 331  
 332  func (opt *IteratorOptions) pickTable(t table.TableInterface) bool {
 333  	// Ignore this table if its max version is less than the sinceTs.
 334  	if t.MaxVersion() < opt.SinceTs {
 335  		return false
 336  	}
 337  	if len(opt.Prefix) == 0 {
 338  		return true
 339  	}
 340  	if opt.compareToPrefix(t.Smallest()) > 0 {
 341  		return false
 342  	}
 343  	if opt.compareToPrefix(t.Biggest()) < 0 {
 344  		return false
 345  	}
 346  	// Bloom filter lookup would only work if opt.Prefix does NOT have the read
 347  	// timestamp as part of the key.
 348  	if opt.prefixIsKey && t.DoesNotHave(y.Hash(opt.Prefix)) {
 349  		return false
 350  	}
 351  	return true
 352  }
 353  
 354  // pickTables picks the necessary table for the iterator. This function also assumes
 355  // that the tables are sorted in the right order.
 356  func (opt *IteratorOptions) pickTables(all []*table.Table) []*table.Table {
 357  	filterTables := func(tables []*table.Table) []*table.Table {
 358  		if opt.SinceTs > 0 {
 359  			tmp := tables[:0]
 360  			for _, t := range tables {
 361  				if t.MaxVersion() < opt.SinceTs {
 362  					continue
 363  				}
 364  				tmp = append(tmp, t)
 365  			}
 366  			tables = tmp
 367  		}
 368  		return tables
 369  	}
 370  
 371  	if len(opt.Prefix) == 0 {
 372  		out := make([]*table.Table, len(all))
 373  		copy(out, all)
 374  		return filterTables(out)
 375  	}
 376  	sIdx := sort.Search(len(all), func(i int) bool {
 377  		// table.Biggest >= opt.prefix
 378  		// if opt.Prefix < table.Biggest, then surely it is not in any of the preceding tables.
 379  		return opt.compareToPrefix(all[i].Biggest()) >= 0
 380  	})
 381  	if sIdx == len(all) {
 382  		// Not found.
 383  		return []*table.Table{}
 384  	}
 385  
 386  	filtered := all[sIdx:]
 387  	if !opt.prefixIsKey {
 388  		eIdx := sort.Search(len(filtered), func(i int) bool {
 389  			return opt.compareToPrefix(filtered[i].Smallest()) > 0
 390  		})
 391  		out := make([]*table.Table, len(filtered[:eIdx]))
 392  		copy(out, filtered[:eIdx])
 393  		return filterTables(out)
 394  	}
 395  
 396  	// opt.prefixIsKey == true. This code is optimizing for opt.prefixIsKey part.
 397  	var out []*table.Table
 398  	hash := y.Hash(opt.Prefix)
 399  	for _, t := range filtered {
 400  		// When we encounter the first table whose smallest key is higher than opt.Prefix, we can
 401  		// stop. This is an IMPORTANT optimization, just considering how often we call
 402  		// NewKeyIterator.
 403  		if opt.compareToPrefix(t.Smallest()) > 0 {
 404  			// if table.Smallest > opt.Prefix, then this and all tables after this can be ignored.
 405  			break
 406  		}
 407  		// opt.Prefix is actually the key. So, we can run bloom filter checks
 408  		// as well.
 409  		if t.DoesNotHave(hash) {
 410  			continue
 411  		}
 412  		out = append(out, t)
 413  	}
 414  	return filterTables(out)
 415  }
 416  
// DefaultIteratorOptions contains default options when iterating over Badger key-value stores:
// forward iteration over the latest version of each key, with values prefetched 100 at a time.
var DefaultIteratorOptions = IteratorOptions{
	PrefetchValues: true,
	PrefetchSize:   100,
	Reverse:        false,
	AllVersions:    false,
}
 424  
// Iterator helps iterating over the KV pairs in a lexicographically sorted order.
type Iterator struct {
	iitr   y.Iterator // Underlying merged iterator over pending writes, memtables and tables.
	txn    *Txn       // Transaction that created this iterator.
	readTs uint64     // Versions above this timestamp are skipped.

	opt   IteratorOptions
	item  *Item // Current item; nil when the iterator is not positioned on anything.
	data  list  // Items already parsed (and possibly being prefetched) after the current one.
	waste list  // Items that have been consumed and can be reused by newItem.

	lastKey []byte // Used to skip over multiple versions of the same key.

	closed  bool
	scanned int // Used to estimate the size of data scanned by iterator.

	// ThreadId is an optional value that can be set to identify which goroutine created
	// the iterator. It can be used, for example, to uniquely identify each of the
	// iterators created by the stream interface
	ThreadId int

	// Alloc is not referenced anywhere in this file.
	// NOTE(review): presumably set and used by callers such as the stream framework — confirm.
	Alloc *z.Allocator
}
 448  
 449  // NewIterator returns a new iterator. Depending upon the options, either only keys, or both
 450  // key-value pairs would be fetched. The keys are returned in lexicographically sorted order.
 451  // Using prefetch is recommended if you're doing a long running iteration, for performance.
 452  //
 453  // Multiple Iterators:
 454  // For a read-only txn, multiple iterators can be running simultaneously. However, for a read-write
 455  // txn, iterators have the nuance of being a snapshot of the writes for the transaction at the time
 456  // iterator was created. If writes are performed after an iterator is created, then that iterator
 457  // will not be able to see those writes. Only writes performed before an iterator was created can be
 458  // viewed.
 459  func (txn *Txn) NewIterator(opt IteratorOptions) *Iterator {
 460  	if txn.discarded {
 461  		panic(ErrDiscardedTxn)
 462  	}
 463  	if txn.db.IsClosed() {
 464  		panic(ErrDBClosed)
 465  	}
 466  
 467  	y.NumIteratorsCreatedAdd(txn.db.opt.MetricsEnabled, 1)
 468  
 469  	// Keep track of the number of active iterators.
 470  	txn.numIterators.Add(1)
 471  
 472  	// TODO: If Prefix is set, only pick those memtables which have keys with the prefix.
 473  	tables, decr := txn.db.getMemTables()
 474  	defer decr()
 475  	txn.db.vlog.incrIteratorCount()
 476  	var iters []y.Iterator
 477  	if itr := txn.newPendingWritesIterator(opt.Reverse); itr != nil {
 478  		iters = append(iters, itr)
 479  	}
 480  	for i := 0; i < len(tables); i++ {
 481  		iters = append(iters, tables[i].sl.NewUniIterator(opt.Reverse))
 482  	}
 483  	iters = txn.db.lc.appendIterators(iters, &opt) // This will increment references.
 484  	res := &Iterator{
 485  		txn:    txn,
 486  		iitr:   table.NewMergeIterator(iters, opt.Reverse),
 487  		opt:    opt,
 488  		readTs: txn.readTs,
 489  	}
 490  	return res
 491  }
 492  
 493  // NewKeyIterator is just like NewIterator, but allows the user to iterate over all versions of a
 494  // single key. Internally, it sets the Prefix option in provided opt, and uses that prefix to
 495  // additionally run bloom filter lookups before picking tables from the LSM tree.
 496  func (txn *Txn) NewKeyIterator(key []byte, opt IteratorOptions) *Iterator {
 497  	if len(opt.Prefix) > 0 {
 498  		panic("opt.Prefix should be nil for NewKeyIterator.")
 499  	}
 500  	opt.Prefix = key // This key must be without the timestamp.
 501  	opt.prefixIsKey = true
 502  	opt.AllVersions = true
 503  	return txn.NewIterator(opt)
 504  }
 505  
 506  func (it *Iterator) newItem() *Item {
 507  	item := it.waste.pop()
 508  	if item == nil {
 509  		item = &Item{slice: new(y.Slice), txn: it.txn}
 510  	}
 511  	return item
 512  }
 513  
 514  // Item returns pointer to the current key-value pair.
 515  // This item is only valid until it.Next() gets called.
 516  func (it *Iterator) Item() *Item {
 517  	tx := it.txn
 518  	tx.addReadKey(it.item.Key())
 519  	return it.item
 520  }
 521  
 522  // Valid returns false when iteration is done.
 523  func (it *Iterator) Valid() bool {
 524  	if it.item == nil {
 525  		return false
 526  	}
 527  	if it.opt.prefixIsKey {
 528  		return bytes.Equal(it.item.key, it.opt.Prefix)
 529  	}
 530  	return bytes.HasPrefix(it.item.key, it.opt.Prefix)
 531  }
 532  
 533  // ValidForPrefix returns false when iteration is done
 534  // or when the current key is not prefixed by the specified prefix.
 535  func (it *Iterator) ValidForPrefix(prefix []byte) bool {
 536  	return it.Valid() && bytes.HasPrefix(it.item.key, prefix)
 537  }
 538  
 539  // Close would close the iterator. It is important to call this when you're done with iteration.
 540  func (it *Iterator) Close() {
 541  	if it.closed {
 542  		return
 543  	}
 544  	it.closed = true
 545  	if it.iitr == nil {
 546  		it.txn.numIterators.Add(-1)
 547  		return
 548  	}
 549  
 550  	it.iitr.Close()
 551  	// It is important to wait for the fill goroutines to finish. Otherwise, we might leave zombie
 552  	// goroutines behind, which are waiting to acquire file read locks after DB has been closed.
 553  	waitFor := func(l list) {
 554  		item := l.pop()
 555  		for item != nil {
 556  			item.wg.Wait()
 557  			item = l.pop()
 558  		}
 559  	}
 560  	waitFor(it.waste)
 561  	waitFor(it.data)
 562  
 563  	// TODO: We could handle this error.
 564  	_ = it.txn.db.vlog.decrIteratorCount()
 565  	it.txn.numIterators.Add(-1)
 566  }
 567  
 568  // Next would advance the iterator by one. Always check it.Valid() after a Next()
 569  // to ensure you have access to a valid it.Item().
 570  func (it *Iterator) Next() {
 571  	if it.iitr == nil {
 572  		return
 573  	}
 574  	// Reuse current item
 575  	it.item.wg.Wait() // Just cleaner to wait before pushing to avoid doing ref counting.
 576  	it.scanned += len(it.item.key) + len(it.item.val) + len(it.item.vptr) + 2
 577  	it.waste.push(it.item)
 578  
 579  	// Set next item to current
 580  	it.item = it.data.pop()
 581  	for it.iitr.Valid() && hasPrefix(it) {
 582  		if it.parseItem() {
 583  			// parseItem calls one extra next.
 584  			// This is used to deal with the complexity of reverse iteration.
 585  			break
 586  		}
 587  	}
 588  }
 589  
 590  func isDeletedOrExpired(meta byte, expiresAt uint64) bool {
 591  	if meta&bitDelete > 0 {
 592  		return true
 593  	}
 594  	if expiresAt == 0 {
 595  		return false
 596  	}
 597  	return expiresAt <= uint64(time.Now().Unix())
 598  }
 599  
// parseItem is a complex function because it needs to handle both forward and reverse iteration
// implementation. We store keys such that their versions are sorted in descending order. This makes
// forward iteration efficient, but reverse iteration complicated. This tradeoff is better because
// forward iteration is more common than reverse.
//
// It examines the entry under the underlying iterator's cursor and returns true once an item
// has been stored via setItem (as it.item if that slot is empty, otherwise appended to
// it.data). It returns false when the entry was skipped. The caller is expected to have
// checked that the underlying iterator is valid.
//
// This function advances the iterator: every path calls mi.Next() at least once.
func (it *Iterator) parseItem() bool {
	mi := it.iitr
	key := mi.Key()

	// setItem stores item as the current item when there is none, else queues it in it.data.
	setItem := func(item *Item) {
		if it.item == nil {
			it.item = item
		} else {
			it.data.push(item)
		}
	}

	isInternalKey := bytes.HasPrefix(key, badgerPrefix)
	// Skip badger keys.
	if !it.opt.InternalAccess && isInternalKey {
		mi.Next()
		return false
	}

	// Skip any versions which are beyond the readTs.
	version := y.ParseTs(key)
	// Ignore everything that is above the readTs and below or at the sinceTs.
	if version > it.readTs || (it.opt.SinceTs > 0 && version <= it.opt.SinceTs) {
		mi.Next()
		return false
	}

	// Skip banned keys only if it does not have badger internal prefix.
	if !isInternalKey && it.txn.db.isBanned(key) != nil {
		mi.Next()
		return false
	}

	if it.opt.AllVersions {
		// Return deleted or expired values also, otherwise user can't figure out
		// whether the key was deleted.
		item := it.newItem()
		it.fill(item)
		setItem(item)
		mi.Next()
		return true
	}

	// If iterating in forward direction, then just checking the last key against current key would
	// be sufficient.
	if !it.opt.Reverse {
		if y.SameKey(it.lastKey, key) {
			mi.Next()
			return false
		}
		// Only track in forward direction.
		// We should update lastKey as soon as we find a different key in our snapshot.
		// Consider keys: a 5, b 7 (del), b 5. When iterating, lastKey = a.
		// Then we see b 7, which is deleted. If we don't store lastKey = b, we'll then return b 5,
		// which is wrong. Therefore, update lastKey here.
		it.lastKey = y.SafeCopy(it.lastKey, mi.Key())
	}

FILL:
	// If deleted, advance and return.
	vs := mi.Value()
	if isDeletedOrExpired(vs.Meta, vs.ExpiresAt) {
		mi.Next()
		return false
	}

	item := it.newItem()
	it.fill(item)
	// fill item based on current cursor position. All Next calls have returned, so reaching here
	// means no Next was called.

	mi.Next()                           // Advance but no fill item yet.
	if !it.opt.Reverse || !mi.Valid() { // Forward direction, or invalid.
		setItem(item)
		return true
	}

	// Reverse direction: the following entry may be a newer version of the same key, in which
	// case it supersedes the item just filled.
	nextTs := y.ParseTs(mi.Key())
	mik := y.ParseKey(mi.Key())
	if nextTs <= it.readTs && bytes.Equal(mik, item.key) {
		// This is a valid potential candidate.
		// NOTE(review): the item filled above is dropped on this jump without being returned to
		// it.waste or waited on; confirm this is intended when PrefetchValues is enabled.
		goto FILL
	}
	// Ignore the next candidate. Return the current one.
	setItem(item)
	return true
}
 695  
 696  func (it *Iterator) fill(item *Item) {
 697  	vs := it.iitr.Value()
 698  	item.meta = vs.Meta
 699  	item.userMeta = vs.UserMeta
 700  	item.expiresAt = vs.ExpiresAt
 701  
 702  	item.version = y.ParseTs(it.iitr.Key())
 703  	item.key = y.SafeCopy(item.key, y.ParseKey(it.iitr.Key()))
 704  
 705  	item.vptr = y.SafeCopy(item.vptr, vs.Value)
 706  	item.val = nil
 707  	if it.opt.PrefetchValues {
 708  		item.wg.Add(1)
 709  		go func() {
 710  			// FIXME we are not handling errors here.
 711  			item.prefetchValue()
 712  			item.wg.Done()
 713  		}()
 714  	}
 715  }
 716  
 717  func hasPrefix(it *Iterator) bool {
 718  	// We shouldn't check prefix in case the iterator is going in reverse. Since in reverse we expect
 719  	// people to append items to the end of prefix.
 720  	if !it.opt.Reverse && len(it.opt.Prefix) > 0 {
 721  		return bytes.HasPrefix(y.ParseKey(it.iitr.Key()), it.opt.Prefix)
 722  	}
 723  	return true
 724  }
 725  
 726  func (it *Iterator) prefetch() {
 727  	prefetchSize := 2
 728  	if it.opt.PrefetchValues && it.opt.PrefetchSize > 1 {
 729  		prefetchSize = it.opt.PrefetchSize
 730  	}
 731  
 732  	i := it.iitr
 733  	var count int
 734  	it.item = nil
 735  	for i.Valid() && hasPrefix(it) {
 736  		if !it.parseItem() {
 737  			continue
 738  		}
 739  		count++
 740  		if count == prefetchSize {
 741  			break
 742  		}
 743  	}
 744  }
 745  
 746  // Seek would seek to the provided key if present. If absent, it would seek to the next
 747  // smallest key greater than the provided key if iterating in the forward direction.
 748  // Behavior would be reversed if iterating backwards.
 749  func (it *Iterator) Seek(key []byte) {
 750  	if it.iitr == nil {
 751  		return
 752  	}
 753  	if len(key) > 0 {
 754  		it.txn.addReadKey(key)
 755  	}
 756  	for i := it.data.pop(); i != nil; i = it.data.pop() {
 757  		i.wg.Wait()
 758  		it.waste.push(i)
 759  	}
 760  
 761  	it.lastKey = it.lastKey[:0]
 762  	if len(key) == 0 {
 763  		key = it.opt.Prefix
 764  	}
 765  	if len(key) == 0 {
 766  		it.iitr.Rewind()
 767  		it.prefetch()
 768  		return
 769  	}
 770  
 771  	if !it.opt.Reverse {
 772  		key = y.KeyWithTs(key, it.txn.readTs)
 773  	} else {
 774  		key = y.KeyWithTs(key, 0)
 775  	}
 776  	it.iitr.Seek(key)
 777  	it.prefetch()
 778  }
 779  
// Rewind would rewind the iterator cursor all the way to zero-th position, which would be the
// smallest key if iterating forward, and largest if iterating backward. It does not keep track of
// whether the cursor started with a Seek().
//
// It is implemented as Seek(nil), so when opt.Prefix is set the cursor is positioned at the
// prefix rather than at the very first key.
func (it *Iterator) Rewind() {
	it.Seek(nil)
}
 786