merge.go raw

   1  /*
   2   * SPDX-FileCopyrightText: © Hypermode Inc. <hello@hypermode.com>
   3   * SPDX-License-Identifier: Apache-2.0
   4   */
   5  
   6  package badger
   7  
   8  import (
   9  	stderrors "errors"
  10  	"sync"
  11  	"time"
  12  
  13  	"github.com/dgraph-io/badger/v4/y"
  14  	"github.com/dgraph-io/ristretto/v2/z"
  15  )
  16  
// MergeOperator represents a Badger merge operator. An operator is bound to a
// single key and is created through DB.GetMergeOperator, which also starts the
// background goroutine that periodically folds the key's versions together.
type MergeOperator struct {
	sync.RWMutex // write-locked by compact, read-locked by Get
	f      MergeFunc // user-supplied function that combines an older value with a newer one
	db     *DB       // database the operator reads from and writes merged results to
	key    []byte    // the single key this operator manages
	closer *z.Closer // used by Stop to signal the compaction goroutine and wait for it to finish
}
  25  
// MergeFunc accepts two byte slices, one representing an existing value, and
// another representing a new value that needs to be "merged" into it. MergeFunc
// contains the logic to perform the "merge" and return an updated value.
// MergeFunc could perform operations like integer addition, list appends etc.
//
// The ordering of the operands is maintained: the operator always passes the
// older stored value as existingVal and the newer value (which may itself be
// the result of earlier merge steps) as newVal.
//
// NOTE(review): existingVal is handed over from inside an Item.Value callback,
// so implementations should presumably copy it rather than retain it. Confirm
// against Item.Value's contract.
type MergeFunc func(existingVal, newVal []byte) []byte
  32  
  33  // GetMergeOperator creates a new MergeOperator for a given key and returns a
  34  // pointer to it. It also fires off a goroutine that performs a compaction using
  35  // the merge function that runs periodically, as specified by dur.
  36  func (db *DB) GetMergeOperator(key []byte,
  37  	f MergeFunc, dur time.Duration) *MergeOperator {
  38  	op := &MergeOperator{
  39  		f:      f,
  40  		db:     db,
  41  		key:    key,
  42  		closer: z.NewCloser(1),
  43  	}
  44  
  45  	go op.runCompactions(dur)
  46  	return op
  47  }
  48  
// errNoMerge is an internal sentinel returned by iterateAndMerge when exactly
// one live version of the key was found, so there is nothing to fold together.
// Both compact and Get treat it as a non-error.
var errNoMerge = stderrors.New("No need for merge")
  50  
  51  func (op *MergeOperator) iterateAndMerge() (newVal []byte, latest uint64, err error) {
  52  	txn := op.db.NewTransaction(false)
  53  	defer txn.Discard()
  54  	opt := DefaultIteratorOptions
  55  	opt.AllVersions = true
  56  	it := txn.NewKeyIterator(op.key, opt)
  57  	defer it.Close()
  58  
  59  	var numVersions int
  60  	for it.Rewind(); it.Valid(); it.Next() {
  61  		item := it.Item()
  62  		if item.IsDeletedOrExpired() {
  63  			break
  64  		}
  65  		numVersions++
  66  		if numVersions == 1 {
  67  			// This should be the newVal, considering this is the latest version.
  68  			newVal, err = item.ValueCopy(newVal)
  69  			if err != nil {
  70  				return nil, 0, err
  71  			}
  72  			latest = item.Version()
  73  		} else {
  74  			if err := item.Value(func(oldVal []byte) error {
  75  				// The merge should always be on the newVal considering it has the merge result of
  76  				// the latest version. The value read should be the oldVal.
  77  				newVal = op.f(oldVal, newVal)
  78  				return nil
  79  			}); err != nil {
  80  				return nil, 0, err
  81  			}
  82  		}
  83  		if item.DiscardEarlierVersions() {
  84  			break
  85  		}
  86  	}
  87  	if numVersions == 0 {
  88  		return nil, latest, ErrKeyNotFound
  89  	} else if numVersions == 1 {
  90  		return newVal, latest, errNoMerge
  91  	}
  92  	return newVal, latest, nil
  93  }
  94  
  95  func (op *MergeOperator) compact() error {
  96  	op.Lock()
  97  	defer op.Unlock()
  98  	val, version, err := op.iterateAndMerge()
  99  	if err == ErrKeyNotFound || err == errNoMerge {
 100  		return nil
 101  	} else if err != nil {
 102  		return err
 103  	}
 104  	entries := []*Entry{
 105  		{
 106  			Key:   y.KeyWithTs(op.key, version),
 107  			Value: val,
 108  			meta:  bitDiscardEarlierVersions,
 109  		},
 110  	}
 111  	// Write value back to the DB. It is important that we do not set the bitMergeEntry bit
 112  	// here. When compaction happens, all the older merged entries will be removed.
 113  	return op.db.batchSetAsync(entries, func(err error) {
 114  		if err != nil {
 115  			op.db.opt.Errorf("failed to insert the result of merge compaction: %s", err)
 116  		}
 117  	})
 118  }
 119  
 120  func (op *MergeOperator) runCompactions(dur time.Duration) {
 121  	ticker := time.NewTicker(dur)
 122  	defer op.closer.Done()
 123  	var stop bool
 124  	for {
 125  		select {
 126  		case <-op.closer.HasBeenClosed():
 127  			stop = true
 128  		case <-ticker.C: // wait for tick
 129  		}
 130  		if err := op.compact(); err != nil {
 131  			op.db.opt.Errorf("failure while running merge operation: %s", err)
 132  		}
 133  		if stop {
 134  			ticker.Stop()
 135  			break
 136  		}
 137  	}
 138  }
 139  
 140  // Add records a value in Badger which will eventually be merged by a background
 141  // routine into the values that were recorded by previous invocations to Add().
 142  func (op *MergeOperator) Add(val []byte) error {
 143  	return op.db.Update(func(txn *Txn) error {
 144  		return txn.SetEntry(NewEntry(op.key, val).withMergeBit())
 145  	})
 146  }
 147  
 148  // Get returns the latest value for the merge operator, which is derived by
 149  // applying the merge function to all the values added so far.
 150  //
 151  // If Add has not been called even once, Get will return ErrKeyNotFound.
 152  func (op *MergeOperator) Get() ([]byte, error) {
 153  	op.RLock()
 154  	defer op.RUnlock()
 155  	var existing []byte
 156  	err := op.db.View(func(txn *Txn) (err error) {
 157  		existing, _, err = op.iterateAndMerge()
 158  		return err
 159  	})
 160  	if err == errNoMerge {
 161  		return existing, nil
 162  	}
 163  	return existing, err
 164  }
 165  
// Stop signals the background compaction goroutine to exit and waits for it to
// finish. After receiving the signal the goroutine runs one last merge
// compaction, so a merge pending at the time of the call is attempted before
// Stop returns.
func (op *MergeOperator) Stop() {
	op.closer.SignalAndWait()
}
 171