merge.go raw
1 /*
2 * SPDX-FileCopyrightText: © Hypermode Inc. <hello@hypermode.com>
3 * SPDX-License-Identifier: Apache-2.0
4 */
5
6 package badger
7
8 import (
9 stderrors "errors"
10 "sync"
11 "time"
12
13 "github.com/dgraph-io/badger/v4/y"
14 "github.com/dgraph-io/ristretto/v2/z"
15 )
16
// MergeOperator represents a Badger merge operator.
//
// An operator is bound to a single key. Values recorded with Add are folded
// together with the merge function, both lazily when Get is called and
// periodically by a background goroutine started in GetMergeOperator.
type MergeOperator struct {
	// Embedded lock: compact takes the write lock, Get takes the read lock.
	sync.RWMutex
	f      MergeFunc // merge function used to fold an older value into a newer one
	db     *DB       // database holding the versions of key
	key    []byte    // key this operator reads and writes; stored as passed in, not copied
	closer *z.Closer // used by Stop to signal the background goroutine and wait for it
}
25
// MergeFunc accepts two byte slices, one representing an existing value, and
// another representing a new value that needs to be ‘merged’ into it. MergeFunc
// contains the logic to perform the ‘merge’ and return an updated value.
// MergeFunc could perform operations like integer addition, list appends etc.
// Note that the ordering of the operands is maintained.
//
// The operator calls it as f(olderValue, accumulated), where accumulated starts
// as a copy of the most recent version and is replaced by each call's result.
// olderValue is handed over from inside the item's Value callback.
// NOTE(review): presumably the returned slice must not alias existingVal once
// that callback returns — confirm against Item.Value's contract.
type MergeFunc func(existingVal, newVal []byte) []byte
32
33 // GetMergeOperator creates a new MergeOperator for a given key and returns a
34 // pointer to it. It also fires off a goroutine that performs a compaction using
35 // the merge function that runs periodically, as specified by dur.
36 func (db *DB) GetMergeOperator(key []byte,
37 f MergeFunc, dur time.Duration) *MergeOperator {
38 op := &MergeOperator{
39 f: f,
40 db: db,
41 key: key,
42 closer: z.NewCloser(1),
43 }
44
45 go op.runCompactions(dur)
46 return op
47 }
48
// errNoMerge is an internal sentinel returned by iterateAndMerge when the key
// has exactly one live version, so there is nothing to fold together. Both
// compact and Get translate it into a nil error; it is matched by identity,
// never by its message.
var errNoMerge = stderrors.New("no need for merge")
50
51 func (op *MergeOperator) iterateAndMerge() (newVal []byte, latest uint64, err error) {
52 txn := op.db.NewTransaction(false)
53 defer txn.Discard()
54 opt := DefaultIteratorOptions
55 opt.AllVersions = true
56 it := txn.NewKeyIterator(op.key, opt)
57 defer it.Close()
58
59 var numVersions int
60 for it.Rewind(); it.Valid(); it.Next() {
61 item := it.Item()
62 if item.IsDeletedOrExpired() {
63 break
64 }
65 numVersions++
66 if numVersions == 1 {
67 // This should be the newVal, considering this is the latest version.
68 newVal, err = item.ValueCopy(newVal)
69 if err != nil {
70 return nil, 0, err
71 }
72 latest = item.Version()
73 } else {
74 if err := item.Value(func(oldVal []byte) error {
75 // The merge should always be on the newVal considering it has the merge result of
76 // the latest version. The value read should be the oldVal.
77 newVal = op.f(oldVal, newVal)
78 return nil
79 }); err != nil {
80 return nil, 0, err
81 }
82 }
83 if item.DiscardEarlierVersions() {
84 break
85 }
86 }
87 if numVersions == 0 {
88 return nil, latest, ErrKeyNotFound
89 } else if numVersions == 1 {
90 return newVal, latest, errNoMerge
91 }
92 return newVal, latest, nil
93 }
94
95 func (op *MergeOperator) compact() error {
96 op.Lock()
97 defer op.Unlock()
98 val, version, err := op.iterateAndMerge()
99 if err == ErrKeyNotFound || err == errNoMerge {
100 return nil
101 } else if err != nil {
102 return err
103 }
104 entries := []*Entry{
105 {
106 Key: y.KeyWithTs(op.key, version),
107 Value: val,
108 meta: bitDiscardEarlierVersions,
109 },
110 }
111 // Write value back to the DB. It is important that we do not set the bitMergeEntry bit
112 // here. When compaction happens, all the older merged entries will be removed.
113 return op.db.batchSetAsync(entries, func(err error) {
114 if err != nil {
115 op.db.opt.Errorf("failed to insert the result of merge compaction: %s", err)
116 }
117 })
118 }
119
120 func (op *MergeOperator) runCompactions(dur time.Duration) {
121 ticker := time.NewTicker(dur)
122 defer op.closer.Done()
123 var stop bool
124 for {
125 select {
126 case <-op.closer.HasBeenClosed():
127 stop = true
128 case <-ticker.C: // wait for tick
129 }
130 if err := op.compact(); err != nil {
131 op.db.opt.Errorf("failure while running merge operation: %s", err)
132 }
133 if stop {
134 ticker.Stop()
135 break
136 }
137 }
138 }
139
140 // Add records a value in Badger which will eventually be merged by a background
141 // routine into the values that were recorded by previous invocations to Add().
142 func (op *MergeOperator) Add(val []byte) error {
143 return op.db.Update(func(txn *Txn) error {
144 return txn.SetEntry(NewEntry(op.key, val).withMergeBit())
145 })
146 }
147
148 // Get returns the latest value for the merge operator, which is derived by
149 // applying the merge function to all the values added so far.
150 //
151 // If Add has not been called even once, Get will return ErrKeyNotFound.
152 func (op *MergeOperator) Get() ([]byte, error) {
153 op.RLock()
154 defer op.RUnlock()
155 var existing []byte
156 err := op.db.View(func(txn *Txn) (err error) {
157 existing, _, err = op.iterateAndMerge()
158 return err
159 })
160 if err == errNoMerge {
161 return existing, nil
162 }
163 return existing, err
164 }
165
// Stop waits for any pending merge to complete and then stops the background
// goroutine.
//
// Once the closer is signalled, the background loop runs one more compaction
// before it marks the closer as done, so Stop is expected to return only after
// that final pass has finished.
// NOTE(review): calling Stop more than once depends on z.Closer's behaviour —
// confirm before relying on it.
func (op *MergeOperator) Stop() {
	op.closer.SignalAndWait()
}
171