iterator.go raw

   1  /*
   2   * SPDX-FileCopyrightText: © Hypermode Inc. <hello@hypermode.com>
   3   * SPDX-License-Identifier: Apache-2.0
   4   */
   5  
   6  package table
   7  
   8  import (
   9  	"bytes"
  10  	"fmt"
  11  	"io"
  12  	"sort"
  13  
  14  	"github.com/dgraph-io/badger/v4/fb"
  15  	"github.com/dgraph-io/badger/v4/y"
  16  )
  17  
// blockIterator iterates over the entries of a single Block.
//
// Keys inside a block are stored relative to baseKey, which is decoded from the
// header at the start of the block data: each entry header records how many
// leading bytes the entry's key shares with baseKey (overlap) and how many
// suffix bytes follow the header (diff). setIdx rebuilds the full key of the
// current entry into key.
type blockIterator struct {
	// data is the block payload without the trailing entry index
	// (b.data[:b.entriesIndexStart], assigned in setBlock).
	data         []byte
	idx          int // Idx of the entry inside a block
	// err is nil while positioned on an entry and io.EOF once idx falls
	// outside [0, len(entryOffsets)) (see setIdx).
	err          error
	// baseKey is lazily decoded from the start of data by setIdx and cached
	// until setBlock truncates it.
	baseKey      []byte
	// key holds the fully reconstructed key of the current entry. The buffer
	// is owned by the iterator and rebuilt in place (appended into) on every move.
	key          []byte
	// val is the current entry's value bytes; it is a subslice of data, not a copy.
	val          []byte
	// entryOffsets[i] is the start offset of entry i within data.
	entryOffsets []uint32
	// block is the Block currently being iterated; setBlock and Close call
	// decrRef on it.
	block        *Block

	// Within this file, tableID and blockID are only used in the diagnostic
	// message produced when decoding an entry panics (see setIdx).
	tableID uint64
	blockID int
	// prevOverlap stores the overlap of the previous key with the base key.
	// This avoids unnecessary copy of base key when the overlap is same for multiple keys.
	prevOverlap uint16
}
  34  
  35  func (itr *blockIterator) setBlock(b *Block) {
  36  	// Decrement the ref for the old block. If the old block was compressed, we
  37  	// might be able to reuse it.
  38  	itr.block.decrRef()
  39  
  40  	itr.block = b
  41  	itr.err = nil
  42  	itr.idx = 0
  43  	itr.baseKey = itr.baseKey[:0]
  44  	itr.prevOverlap = 0
  45  	itr.key = itr.key[:0]
  46  	itr.val = itr.val[:0]
  47  	// Drop the index from the block. We don't need it anymore.
  48  	itr.data = b.data[:b.entriesIndexStart]
  49  	itr.entryOffsets = b.entryOffsets
  50  }
  51  
// setIdx sets the iterator to the entry at index i and sets its key and value.
//
// If i is outside [0, len(itr.entryOffsets)), itr.idx is still updated to i,
// itr.err becomes io.EOF, and key/val keep their previous contents.
//
// Each entry begins with a header: overlap is the number of leading bytes its
// key shares with itr.baseKey, and diff is the length of the key suffix stored
// right after the header. The value is everything from the end of that suffix
// up to the next entry's start offset (or the end of data for the last entry).
// itr.key is an owned buffer rebuilt in place. itr.val is a subslice of
// itr.data (no copy).
func (itr *blockIterator) setIdx(i int) {
	itr.idx = i
	if i >= len(itr.entryOffsets) || i < 0 {
		itr.err = io.EOF
		return
	}
	itr.err = nil
	startOffset := int(itr.entryOffsets[i])

	// Set base key.
	// It is decoded from the first header in data and cached for the rest of
	// this block (setBlock truncates it back to length 0).
	if len(itr.baseKey) == 0 {
		var baseHeader header
		baseHeader.Decode(itr.data)
		itr.baseKey = itr.data[headerSize : headerSize+baseHeader.diff]
	}

	var endOffset int
	// idx points to the last entry in the block.
	if itr.idx+1 == len(itr.entryOffsets) {
		endOffset = len(itr.data)
	} else {
		// idx point to some entry other than the last one in the block.
		// EndOffset of the current entry is the start offset of the next entry.
		endOffset = int(itr.entryOffsets[itr.idx+1])
	}
	// If slicing or decoding the entry below panics (e.g. offsets that do not
	// fit inside data), re-panic with table/block/offset details so the bad
	// block can be diagnosed. Panics raised above this point (such as while
	// decoding the base key) are not annotated.
	defer func() {
		if r := recover(); r != nil {
			var debugBuf bytes.Buffer
			fmt.Fprintf(&debugBuf, "==== Recovered====\n")
			fmt.Fprintf(&debugBuf, "Table ID: %d\nBlock ID: %d\nEntry Idx: %d\nData len: %d\n"+
				"StartOffset: %d\nEndOffset: %d\nEntryOffsets len: %d\nEntryOffsets: %v\n",
				itr.tableID, itr.blockID, itr.idx, len(itr.data), startOffset, endOffset,
				len(itr.entryOffsets), itr.entryOffsets)
			panic(debugBuf.String())
		}
	}()

	entryData := itr.data[startOffset:endOffset]
	var h header
	h.Decode(entryData)
	// Header contains the length of key overlap and difference compared to the base key. If the key
	// before this one had the same or better key overlap, we can avoid copying that part into
	// itr.key. But, if the overlap was lesser, we could copy over just that portion.
	// Invariant relied on here: itr.key[:itr.prevOverlap] already equals
	// itr.baseKey[:itr.prevOverlap].
	if h.overlap > itr.prevOverlap {
		itr.key = append(itr.key[:itr.prevOverlap], itr.baseKey[itr.prevOverlap:h.overlap]...)
	}
	itr.prevOverlap = h.overlap
	valueOff := headerSize + h.diff
	diffKey := entryData[headerSize:valueOff]
	// Keep the shared prefix and append this entry's own suffix.
	itr.key = append(itr.key[:h.overlap], diffKey...)
	// The value aliases the block data; callers must copy it to retain it.
	itr.val = entryData[valueOff:]
}
 105  
 106  func (itr *blockIterator) Valid() bool {
 107  	return itr != nil && itr.err == nil
 108  }
 109  
 110  func (itr *blockIterator) Error() error {
 111  	return itr.err
 112  }
 113  
 114  func (itr *blockIterator) Close() {
 115  	itr.block.decrRef()
 116  }
 117  
 118  var (
 119  	origin  = 0
 120  	current = 1
 121  )
 122  
 123  // seek brings us to the first block element that is >= input key.
 124  func (itr *blockIterator) seek(key []byte, whence int) {
 125  	itr.err = nil
 126  	startIndex := 0 // This tells from which index we should start binary search.
 127  
 128  	switch whence {
 129  	case origin:
 130  		// We don't need to do anything. startIndex is already at 0
 131  	case current:
 132  		startIndex = itr.idx
 133  	}
 134  
 135  	foundEntryIdx := sort.Search(len(itr.entryOffsets), func(idx int) bool {
 136  		// If idx is less than start index then just return false.
 137  		if idx < startIndex {
 138  			return false
 139  		}
 140  		itr.setIdx(idx)
 141  		return y.CompareKeys(itr.key, key) >= 0
 142  	})
 143  	itr.setIdx(foundEntryIdx)
 144  }
 145  
 146  // seekToFirst brings us to the first element.
 147  func (itr *blockIterator) seekToFirst() {
 148  	itr.setIdx(0)
 149  }
 150  
 151  // seekToLast brings us to the last element.
 152  func (itr *blockIterator) seekToLast() {
 153  	itr.setIdx(len(itr.entryOffsets) - 1)
 154  }
 155  
 156  func (itr *blockIterator) next() {
 157  	itr.setIdx(itr.idx + 1)
 158  }
 159  
 160  func (itr *blockIterator) prev() {
 161  	itr.setIdx(itr.idx - 1)
 162  }
 163  
// Iterator is an iterator for a Table.
//
// It walks the table one block at a time. bpos is the index of the current
// block; after running off either end it is offsetsLength() or -1. bi iterates
// within that block. err is nil while positioned on an entry, io.EOF once the
// iterator is exhausted, or the error returned while loading a block.
type Iterator struct {
	t    *Table
	bpos int
	bi   blockIterator
	err  error

	// Internally, Iterator is bidirectional. However, we only expose the
	// unidirectional functionality for now.
	// (Next/Rewind/Seek pick the direction from the REVERSED bit of opt.)
	opt int // Valid options are REVERSED and NOCACHE.
}
 175  
 176  // NewIterator returns a new iterator of the Table
 177  func (t *Table) NewIterator(opt int) *Iterator {
 178  	t.IncrRef() // Important.
 179  	ti := &Iterator{t: t, opt: opt}
 180  	return ti
 181  }
 182  
 183  // Close closes the iterator (and it must be called).
 184  func (itr *Iterator) Close() error {
 185  	itr.bi.Close()
 186  	return itr.t.DecrRef()
 187  }
 188  
 189  func (itr *Iterator) reset() {
 190  	itr.bpos = 0
 191  	itr.err = nil
 192  }
 193  
 194  // Valid follows the y.Iterator interface
 195  func (itr *Iterator) Valid() bool {
 196  	return itr.err == nil
 197  }
 198  
 199  func (itr *Iterator) useCache() bool {
 200  	return itr.opt&NOCACHE == 0
 201  }
 202  
 203  func (itr *Iterator) seekToFirst() {
 204  	numBlocks := itr.t.offsetsLength()
 205  	if numBlocks == 0 {
 206  		itr.err = io.EOF
 207  		return
 208  	}
 209  	itr.bpos = 0
 210  	block, err := itr.t.block(itr.bpos, itr.useCache())
 211  	if err != nil {
 212  		itr.err = err
 213  		return
 214  	}
 215  	itr.bi.tableID = itr.t.id
 216  	itr.bi.blockID = itr.bpos
 217  	itr.bi.setBlock(block)
 218  	itr.bi.seekToFirst()
 219  	itr.err = itr.bi.Error()
 220  }
 221  
 222  func (itr *Iterator) seekToLast() {
 223  	numBlocks := itr.t.offsetsLength()
 224  	if numBlocks == 0 {
 225  		itr.err = io.EOF
 226  		return
 227  	}
 228  	itr.bpos = numBlocks - 1
 229  	block, err := itr.t.block(itr.bpos, itr.useCache())
 230  	if err != nil {
 231  		itr.err = err
 232  		return
 233  	}
 234  	itr.bi.tableID = itr.t.id
 235  	itr.bi.blockID = itr.bpos
 236  	itr.bi.setBlock(block)
 237  	itr.bi.seekToLast()
 238  	itr.err = itr.bi.Error()
 239  }
 240  
 241  func (itr *Iterator) seekHelper(blockIdx int, key []byte) {
 242  	itr.bpos = blockIdx
 243  	block, err := itr.t.block(blockIdx, itr.useCache())
 244  	if err != nil {
 245  		itr.err = err
 246  		return
 247  	}
 248  	itr.bi.tableID = itr.t.id
 249  	itr.bi.blockID = itr.bpos
 250  	itr.bi.setBlock(block)
 251  	itr.bi.seek(key, origin)
 252  	itr.err = itr.bi.Error()
 253  }
 254  
 255  // seekFrom brings us to a key that is >= input key.
 256  func (itr *Iterator) seekFrom(key []byte, whence int) {
 257  	itr.err = nil
 258  	switch whence {
 259  	case origin:
 260  		itr.reset()
 261  	case current:
 262  	}
 263  
 264  	var ko fb.BlockOffset
 265  	idx := sort.Search(itr.t.offsetsLength(), func(idx int) bool {
 266  		// Offsets should never return false since we're iterating within the OffsetsLength.
 267  		y.AssertTrue(itr.t.offsets(&ko, idx))
 268  		return y.CompareKeys(ko.KeyBytes(), key) > 0
 269  	})
 270  	if idx == 0 {
 271  		// The smallest key in our table is already strictly > key. We can return that.
 272  		// This is like a SeekToFirst.
 273  		itr.seekHelper(0, key)
 274  		return
 275  	}
 276  
 277  	// block[idx].smallest is > key.
 278  	// Since idx>0, we know block[idx-1].smallest is <= key.
 279  	// There are two cases.
 280  	// 1) Everything in block[idx-1] is strictly < key. In this case, we should go to the first
 281  	//    element of block[idx].
 282  	// 2) Some element in block[idx-1] is >= key. We should go to that element.
 283  	itr.seekHelper(idx-1, key)
 284  	if itr.err == io.EOF {
 285  		// Case 1. Need to visit block[idx].
 286  		if idx == itr.t.offsetsLength() {
 287  			// If idx == len(itr.t.blockIndex), then input key is greater than ANY element of table.
 288  			// There's nothing we can do. Valid() should return false as we seek to end of table.
 289  			return
 290  		}
 291  		// Since block[idx].smallest is > key. This is essentially a block[idx].SeekToFirst.
 292  		itr.seekHelper(idx, key)
 293  	}
 294  	// Case 2: No need to do anything. We already did the seek in block[idx-1].
 295  }
 296  
 297  // seek will reset iterator and seek to >= key.
 298  func (itr *Iterator) seek(key []byte) {
 299  	itr.seekFrom(key, origin)
 300  }
 301  
 302  // seekForPrev will reset iterator and seek to <= key.
 303  func (itr *Iterator) seekForPrev(key []byte) {
 304  	// TODO: Optimize this. We shouldn't have to take a Prev step.
 305  	itr.seekFrom(key, origin)
 306  	if !bytes.Equal(itr.Key(), key) {
 307  		itr.prev()
 308  	}
 309  }
 310  
 311  func (itr *Iterator) next() {
 312  	itr.err = nil
 313  
 314  	if itr.bpos >= itr.t.offsetsLength() {
 315  		itr.err = io.EOF
 316  		return
 317  	}
 318  
 319  	if len(itr.bi.data) == 0 {
 320  		block, err := itr.t.block(itr.bpos, itr.useCache())
 321  		if err != nil {
 322  			itr.err = err
 323  			return
 324  		}
 325  		itr.bi.tableID = itr.t.id
 326  		itr.bi.blockID = itr.bpos
 327  		itr.bi.setBlock(block)
 328  		itr.bi.seekToFirst()
 329  		itr.err = itr.bi.Error()
 330  		return
 331  	}
 332  
 333  	itr.bi.next()
 334  	if !itr.bi.Valid() {
 335  		itr.bpos++
 336  		itr.bi.data = nil
 337  		itr.next()
 338  		return
 339  	}
 340  }
 341  
 342  func (itr *Iterator) prev() {
 343  	itr.err = nil
 344  	if itr.bpos < 0 {
 345  		itr.err = io.EOF
 346  		return
 347  	}
 348  
 349  	if len(itr.bi.data) == 0 {
 350  		block, err := itr.t.block(itr.bpos, itr.useCache())
 351  		if err != nil {
 352  			itr.err = err
 353  			return
 354  		}
 355  		itr.bi.tableID = itr.t.id
 356  		itr.bi.blockID = itr.bpos
 357  		itr.bi.setBlock(block)
 358  		itr.bi.seekToLast()
 359  		itr.err = itr.bi.Error()
 360  		return
 361  	}
 362  
 363  	itr.bi.prev()
 364  	if !itr.bi.Valid() {
 365  		itr.bpos--
 366  		itr.bi.data = nil
 367  		itr.prev()
 368  		return
 369  	}
 370  }
 371  
 372  // Key follows the y.Iterator interface.
 373  // Returns the key with timestamp.
 374  func (itr *Iterator) Key() []byte {
 375  	return itr.bi.key
 376  }
 377  
 378  // Value follows the y.Iterator interface
 379  func (itr *Iterator) Value() (ret y.ValueStruct) {
 380  	ret.Decode(itr.bi.val)
 381  	return
 382  }
 383  
 384  // ValueCopy copies the current value and returns it as decoded
 385  // ValueStruct.
 386  func (itr *Iterator) ValueCopy() (ret y.ValueStruct) {
 387  	dst := y.Copy(itr.bi.val)
 388  	ret.Decode(dst)
 389  	return
 390  }
 391  
 392  // Next follows the y.Iterator interface
 393  func (itr *Iterator) Next() {
 394  	if itr.opt&REVERSED == 0 {
 395  		itr.next()
 396  	} else {
 397  		itr.prev()
 398  	}
 399  }
 400  
 401  // Rewind follows the y.Iterator interface
 402  func (itr *Iterator) Rewind() {
 403  	if itr.opt&REVERSED == 0 {
 404  		itr.seekToFirst()
 405  	} else {
 406  		itr.seekToLast()
 407  	}
 408  }
 409  
 410  // Seek follows the y.Iterator interface
 411  func (itr *Iterator) Seek(key []byte) {
 412  	if itr.opt&REVERSED == 0 {
 413  		itr.seek(key)
 414  	} else {
 415  		itr.seekForPrev(key)
 416  	}
 417  }
 418  
 419  var (
 420  	REVERSED int = 2
 421  	NOCACHE  int = 4
 422  )
 423  
// ConcatIterator concatenates the sequences defined by several iterators. (It only works with
// TableIterators, probably just because it's faster to not be so generic.)
//
// Seek binary-searches the tables by Biggest() (forward) or Smallest()
// (REVERSED). It therefore assumes the tables are sorted and do not overlap in
// key range. NOTE(review): confirm all callers guarantee this. A per-table
// Iterator is created only when that table is first selected by setIdx. cur is
// nil whenever no table is selected.
type ConcatIterator struct {
	idx     int // Which iterator is active now.
	cur     *Iterator
	iters   []*Iterator // Corresponds to tables.
	tables  []*Table    // Disregarding reversed, this is in ascending order.
	options int         // Valid options are REVERSED and NOCACHE.
}
 433  
 434  // NewConcatIterator creates a new concatenated iterator
 435  func NewConcatIterator(tbls []*Table, opt int) *ConcatIterator {
 436  	iters := make([]*Iterator, len(tbls))
 437  	for i := 0; i < len(tbls); i++ {
 438  		// Increment the reference count. Since, we're not creating the iterator right now.
 439  		// Here, We'll hold the reference of the tables, till the lifecycle of the iterator.
 440  		tbls[i].IncrRef()
 441  
 442  		// Save cycles by not initializing the iterators until needed.
 443  		// iters[i] = tbls[i].NewIterator(reversed)
 444  	}
 445  	return &ConcatIterator{
 446  		options: opt,
 447  		iters:   iters,
 448  		tables:  tbls,
 449  		idx:     -1, // Not really necessary because s.it.Valid()=false, but good to have.
 450  	}
 451  }
 452  
 453  func (s *ConcatIterator) setIdx(idx int) {
 454  	s.idx = idx
 455  	if idx < 0 || idx >= len(s.iters) {
 456  		s.cur = nil
 457  		return
 458  	}
 459  	if s.iters[idx] == nil {
 460  		s.iters[idx] = s.tables[idx].NewIterator(s.options)
 461  	}
 462  	s.cur = s.iters[s.idx]
 463  }
 464  
 465  // Rewind implements y.Interface
 466  func (s *ConcatIterator) Rewind() {
 467  	if len(s.iters) == 0 {
 468  		return
 469  	}
 470  	if s.options&REVERSED == 0 {
 471  		s.setIdx(0)
 472  	} else {
 473  		s.setIdx(len(s.iters) - 1)
 474  	}
 475  	s.cur.Rewind()
 476  }
 477  
 478  // Valid implements y.Interface
 479  func (s *ConcatIterator) Valid() bool {
 480  	return s.cur != nil && s.cur.Valid()
 481  }
 482  
 483  // Key implements y.Interface
 484  func (s *ConcatIterator) Key() []byte {
 485  	return s.cur.Key()
 486  }
 487  
 488  // Value implements y.Interface
 489  func (s *ConcatIterator) Value() y.ValueStruct {
 490  	return s.cur.Value()
 491  }
 492  
 493  // Seek brings us to element >= key if reversed is false. Otherwise, <= key.
 494  func (s *ConcatIterator) Seek(key []byte) {
 495  	var idx int
 496  	if s.options&REVERSED == 0 {
 497  		idx = sort.Search(len(s.tables), func(i int) bool {
 498  			return y.CompareKeys(s.tables[i].Biggest(), key) >= 0
 499  		})
 500  	} else {
 501  		n := len(s.tables)
 502  		idx = n - 1 - sort.Search(n, func(i int) bool {
 503  			return y.CompareKeys(s.tables[n-1-i].Smallest(), key) <= 0
 504  		})
 505  	}
 506  	if idx >= len(s.tables) || idx < 0 {
 507  		s.setIdx(-1)
 508  		return
 509  	}
 510  	// For reversed=false, we know s.tables[i-1].Biggest() < key. Thus, the
 511  	// previous table cannot possibly contain key.
 512  	s.setIdx(idx)
 513  	s.cur.Seek(key)
 514  }
 515  
 516  // Next advances our concat iterator.
 517  func (s *ConcatIterator) Next() {
 518  	s.cur.Next()
 519  	if s.cur.Valid() {
 520  		// Nothing to do. Just stay with the current table.
 521  		return
 522  	}
 523  	for { // In case there are empty tables.
 524  		if s.options&REVERSED == 0 {
 525  			s.setIdx(s.idx + 1)
 526  		} else {
 527  			s.setIdx(s.idx - 1)
 528  		}
 529  		if s.cur == nil {
 530  			// End of list. Valid will become false.
 531  			return
 532  		}
 533  		s.cur.Rewind()
 534  		if s.cur.Valid() {
 535  			break
 536  		}
 537  	}
 538  }
 539  
 540  // Close implements y.Interface.
 541  func (s *ConcatIterator) Close() error {
 542  	for _, t := range s.tables {
 543  		// DeReference the tables while closing the iterator.
 544  		if err := t.DecrRef(); err != nil {
 545  			return err
 546  		}
 547  	}
 548  	for _, it := range s.iters {
 549  		if it == nil {
 550  			continue
 551  		}
 552  		if err := it.Close(); err != nil {
 553  			return y.Wrap(err, "ConcatIterator")
 554  		}
 555  	}
 556  	return nil
 557  }
 558