merge_iterator.go raw

   1  /*
   2   * SPDX-FileCopyrightText: © Hypermode Inc. <hello@hypermode.com>
   3   * SPDX-License-Identifier: Apache-2.0
   4   */
   5  
   6  package table
   7  
   8  import (
   9  	"bytes"
  10  
  11  	"github.com/dgraph-io/badger/v4/y"
  12  )
  13  
  14  // MergeIterator merges multiple iterators.
  15  // NOTE: MergeIterator owns the array of iterators and is responsible for closing them.
  16  type MergeIterator struct {
  17  	left  node
  18  	right node
  19  	small *node
  20  
  21  	curKey  []byte
  22  	reverse bool
  23  }
  24  
  25  type node struct {
  26  	valid bool
  27  	key   []byte
  28  	iter  y.Iterator
  29  
  30  	// The two iterators are type asserted from `y.Iterator`, used to inline more function calls.
  31  	// Calling functions on concrete types is much faster (about 25-30%) than calling the
  32  	// interface's function.
  33  	merge  *MergeIterator
  34  	concat *ConcatIterator
  35  }
  36  
  37  func (n *node) setIterator(iter y.Iterator) {
  38  	n.iter = iter
  39  	// It's okay if the type assertion below fails and n.merge/n.concat are set to nil.
  40  	// We handle the nil values of merge and concat in all the methods.
  41  	n.merge, _ = iter.(*MergeIterator)
  42  	n.concat, _ = iter.(*ConcatIterator)
  43  }
  44  
  45  func (n *node) setKey() {
  46  	switch {
  47  	case n.merge != nil:
  48  		n.valid = n.merge.small.valid
  49  		if n.valid {
  50  			n.key = n.merge.small.key
  51  		}
  52  	case n.concat != nil:
  53  		n.valid = n.concat.Valid()
  54  		if n.valid {
  55  			n.key = n.concat.Key()
  56  		}
  57  	default:
  58  		n.valid = n.iter.Valid()
  59  		if n.valid {
  60  			n.key = n.iter.Key()
  61  		}
  62  	}
  63  }
  64  
  65  func (n *node) next() {
  66  	switch {
  67  	case n.merge != nil:
  68  		n.merge.Next()
  69  	case n.concat != nil:
  70  		n.concat.Next()
  71  	default:
  72  		n.iter.Next()
  73  	}
  74  	n.setKey()
  75  }
  76  
  77  func (n *node) rewind() {
  78  	n.iter.Rewind()
  79  	n.setKey()
  80  }
  81  
  82  func (n *node) seek(key []byte) {
  83  	n.iter.Seek(key)
  84  	n.setKey()
  85  }
  86  
  87  func (mi *MergeIterator) fix() {
  88  	if !mi.bigger().valid {
  89  		return
  90  	}
  91  	if !mi.small.valid {
  92  		mi.swapSmall()
  93  		return
  94  	}
  95  	cmp := y.CompareKeys(mi.small.key, mi.bigger().key)
  96  	switch {
  97  	case cmp == 0: // Both the keys are equal.
  98  		// In case of same keys, move the right iterator ahead.
  99  		mi.right.next()
 100  		if &mi.right == mi.small {
 101  			mi.swapSmall()
 102  		}
 103  		return
 104  	case cmp < 0: // Small is less than bigger().
 105  		if mi.reverse {
 106  			mi.swapSmall()
 107  		} else { //nolint:staticcheck
 108  			// we don't need to do anything. Small already points to the smallest.
 109  		}
 110  		return
 111  	default: // bigger() is less than small.
 112  		if mi.reverse {
 113  			// Do nothing since we're iterating in reverse. Small currently points to
 114  			// the bigger key and that's okay in reverse iteration.
 115  		} else {
 116  			mi.swapSmall()
 117  		}
 118  		return
 119  	}
 120  }
 121  
 122  func (mi *MergeIterator) bigger() *node {
 123  	if mi.small == &mi.left {
 124  		return &mi.right
 125  	}
 126  	return &mi.left
 127  }
 128  
 129  func (mi *MergeIterator) swapSmall() {
 130  	if mi.small == &mi.left {
 131  		mi.small = &mi.right
 132  		return
 133  	}
 134  	if mi.small == &mi.right {
 135  		mi.small = &mi.left
 136  		return
 137  	}
 138  }
 139  
 140  // Next returns the next element. If it is the same as the current key, ignore it.
 141  func (mi *MergeIterator) Next() {
 142  	for mi.Valid() {
 143  		if !bytes.Equal(mi.small.key, mi.curKey) {
 144  			break
 145  		}
 146  		mi.small.next()
 147  		mi.fix()
 148  	}
 149  	mi.setCurrent()
 150  }
 151  
 152  func (mi *MergeIterator) setCurrent() {
 153  	mi.curKey = append(mi.curKey[:0], mi.small.key...)
 154  }
 155  
 156  // Rewind seeks to first element (or last element for reverse iterator).
 157  func (mi *MergeIterator) Rewind() {
 158  	mi.left.rewind()
 159  	mi.right.rewind()
 160  	mi.fix()
 161  	mi.setCurrent()
 162  }
 163  
 164  // Seek brings us to element with key >= given key.
 165  func (mi *MergeIterator) Seek(key []byte) {
 166  	mi.left.seek(key)
 167  	mi.right.seek(key)
 168  	mi.fix()
 169  	mi.setCurrent()
 170  }
 171  
 172  // Valid returns whether the MergeIterator is at a valid element.
 173  func (mi *MergeIterator) Valid() bool {
 174  	return mi.small.valid
 175  }
 176  
 177  // Key returns the key associated with the current iterator.
 178  func (mi *MergeIterator) Key() []byte {
 179  	return mi.small.key
 180  }
 181  
 182  // Value returns the value associated with the iterator.
 183  func (mi *MergeIterator) Value() y.ValueStruct {
 184  	return mi.small.iter.Value()
 185  }
 186  
 187  // Close implements y.Iterator.
 188  func (mi *MergeIterator) Close() error {
 189  	err1 := mi.left.iter.Close()
 190  	err2 := mi.right.iter.Close()
 191  	if err1 != nil {
 192  		return y.Wrap(err1, "MergeIterator")
 193  	}
 194  	return y.Wrap(err2, "MergeIterator")
 195  }
 196  
 197  // NewMergeIterator creates a merge iterator.
 198  func NewMergeIterator(iters []y.Iterator, reverse bool) y.Iterator {
 199  	switch len(iters) {
 200  	case 0:
 201  		return nil
 202  	case 1:
 203  		return iters[0]
 204  	case 2:
 205  		mi := &MergeIterator{
 206  			reverse: reverse,
 207  		}
 208  		mi.left.setIterator(iters[0])
 209  		mi.right.setIterator(iters[1])
 210  		// Assign left iterator randomly. This will be fixed when user calls rewind/seek.
 211  		mi.small = &mi.left
 212  		return mi
 213  	}
 214  	mid := len(iters) / 2
 215  	return NewMergeIterator(
 216  		[]y.Iterator{
 217  			NewMergeIterator(iters[:mid], reverse),
 218  			NewMergeIterator(iters[mid:], reverse),
 219  		}, reverse)
 220  }
 221