stream.go raw
1 /*
2 * SPDX-FileCopyrightText: © Hypermode Inc. <hello@hypermode.com>
3 * SPDX-License-Identifier: Apache-2.0
4 */
5
6 package badger
7
8 import (
9 "bytes"
10 "context"
11 "sort"
12 "sync"
13 "sync/atomic"
14 "time"
15
16 humanize "github.com/dustin/go-humanize"
17 "google.golang.org/protobuf/proto"
18
19 "github.com/dgraph-io/badger/v4/pb"
20 "github.com/dgraph-io/badger/v4/y"
21 "github.com/dgraph-io/ristretto/v2/z"
22 )
23
// batchSize is the buffer fill level (16 MiB) at which a producer hands its encoded KVs to
// the sending goroutine; producer buffers are allocated with twice this capacity.
const batchSize = 16 << 20

// maxStreamSize is the default for Stream.MaxSize: the maximum allowed size (100 MiB) of a
// batch passed to Send. It is a soft limit, because a single buffer that is already over the
// limit cannot be split and is sent as is. The limit keeps the framework from building
// batches so big that sending them causes trouble (e.g. hitting gRPC's maximum message size).
var maxStreamSize uint64 = 100 << 20
31
// Stream provides a framework to concurrently iterate over a snapshot of Badger, pick up
// key-values, batch them up and call Send. Stream does concurrent iteration over many smaller key
// ranges. It does NOT send keys in lexicographical sorted order. To get keys in sorted
// order, use Iterator.
//
// A Stream is obtained from DB.NewStream or DB.NewStreamAt (which bind it to the DB); configure
// the exported fields and then call Orchestrate.
type Stream struct {
	// Prefix to only iterate over certain range of keys. If set to nil (default), Stream would
	// iterate over the entire DB.
	Prefix []byte

	// Number of goroutines to use for iterating over key ranges. Streams created through
	// DB.NewStream / DB.NewStreamAt start with the DB's NumGoroutines option.
	NumGo int

	// Badger would produce log entries in Infof to indicate the progress of Stream. LogPrefix can
	// be used to help differentiate them from other activities. Default is "Badger.Stream".
	LogPrefix string

	// ChooseKey is invoked each time a new key is encountered. Note that this is not called
	// on every version of the value, only the first encountered version (i.e. the highest version
	// of the value a key has). ChooseKey can be left nil to select all keys.
	//
	// Note: Calls to ChooseKey are concurrent.
	ChooseKey func(item *Item) bool

	// MaxSize is the maximum allowed size of a stream batch. This is a soft limit
	// as a single list that is still over the limit will have to be sent as is since it
	// cannot be split further. This limit prevents the framework from creating batches
	// so big that sending them causes issues (e.g. running into the max size gRPC limit).
	// It defaults to maxStreamSize (100 MiB). If necessary, set it before calling
	// Orchestrate; it is read without synchronization, so do not change it while a
	// stream is running.
	MaxSize uint64

	// KeyToList, similar to ChooseKey, is only invoked on the highest version of the value. It
	// is up to the caller to iterate over the versions and generate zero, one or more KVs. It
	// is expected that the user would advance the iterator to go through the versions of the
	// values. However, the user MUST immediately return from this function on the first encounter
	// with a mismatching key. See example usage in ToList function. Can be left nil to use ToList
	// function by default.
	//
	// KeyToList has access to z.Allocator accessible via stream.Allocator(itr.ThreadId). This
	// allocator can be used to allocate KVs, to decrease the memory pressure on Go GC. Stream
	// framework takes care of releasing those resources after calling Send. AllocRef does
	// NOT need to be set in the returned KVList, as Stream framework would ignore that field,
	// instead using the allocator assigned to that thread id.
	//
	// A returned error is logged as a warning and the key is skipped; it does not stop the
	// stream.
	//
	// Note: Calls to KeyToList are concurrent.
	KeyToList func(key []byte, itr *Iterator) (*pb.KVList, error)
	// UseKeyToListWithThreadId is used to indicate that KeyToListWithThreadId should be used
	// instead of KeyToList. This is a new api that can be used to figure out parallelism
	// of the stream. Each threadId would be run serially. KeyToList being concurrent makes you
	// take care of concurrency in KeyToList. Here threadId could be used to do some things serially.
	// FinishThread is then called by the thread as described on that field.
	UseKeyToListWithThreadId bool
	// KeyToListWithThreadId replaces KeyToList when UseKeyToListWithThreadId is true and must
	// then be non-nil. threadId identifies the producer goroutine (0 .. NumGo-1); calls with
	// the same threadId are made serially by that goroutine. Errors are handled as for
	// KeyToList (logged, key skipped).
	KeyToListWithThreadId func(key []byte, itr *Iterator, threadId int) (*pb.KVList, error)
	// FinishThread is called with the producer's threadId when UseKeyToListWithThreadId is
	// true. The KVs it returns are appended to that key range's output, and a non-nil error
	// makes the stream fail with that error.
	// NOTE(review): produceKVs calls this at the end of every key range the thread iterates,
	// not once when the thread exits — confirm that is the intended contract.
	FinishThread func(threadId int) (*pb.KVList, error)

	// This is the method where Stream sends the final output. All calls to Send are done by a
	// single goroutine, i.e. logic within Send method can expect single threaded execution.
	Send func(buf *z.Buffer) error

	// Read data above the SinceTs. All keys with version <= SinceTs will be ignored.
	SinceTs uint64
	// readTs is the read timestamp for managed-mode streams (set by NewStreamAt). When it is
	// 0, produceKVs reads through db.NewTransaction instead of NewTransactionAt.
	readTs uint64
	db     *DB
	// rangeCh carries key ranges from produceRanges to the produceKVs goroutines.
	rangeCh chan keyRange
	// kvChan carries encoded KV buffers from produceKVs to the single streamKVs goroutine.
	kvChan chan *z.Buffer
	// nextStreamId hands out a distinct StreamId to every key range that is iterated.
	nextStreamId atomic.Uint32
	// doneMarkers is set via SendDoneMarkers; when true, each range's output ends with a
	// KV whose StreamDone field is set.
	doneMarkers  bool
	scanned      atomic.Uint64 // used to estimate the ETA for data scan.
	numProducers atomic.Int32  // number of produceKVs goroutines currently running.
}
102
103 // SendDoneMarkers when true would send out done markers on the stream. False by default.
104 func (st *Stream) SendDoneMarkers(done bool) {
105 st.doneMarkers = done
106 }
107
108 // ToList is a default implementation of KeyToList. It picks up all valid versions of the key,
109 // skipping over deleted or expired keys.
110 func (st *Stream) ToList(key []byte, itr *Iterator) (*pb.KVList, error) {
111 a := itr.Alloc
112 ka := a.Copy(key)
113
114 list := &pb.KVList{}
115 for ; itr.Valid(); itr.Next() {
116 item := itr.Item()
117 if item.IsDeletedOrExpired() {
118 break
119 }
120 if !bytes.Equal(key, item.Key()) {
121 // Break out on the first encounter with another key.
122 break
123 }
124
125 kv := y.NewKV(a)
126 kv.Key = ka
127
128 if err := item.Value(func(val []byte) error {
129 kv.Value = a.Copy(val)
130 return nil
131
132 }); err != nil {
133 return nil, err
134 }
135 kv.Version = item.Version()
136 kv.ExpiresAt = item.ExpiresAt()
137 kv.UserMeta = a.Copy([]byte{item.UserMeta()})
138
139 list.Kv = append(list.Kv, kv)
140 if st.db.opt.NumVersionsToKeep == 1 {
141 break
142 }
143
144 if item.DiscardEarlierVersions() {
145 break
146 }
147 }
148 return list, nil
149 }
150
151 // keyRange is [start, end), including start, excluding end. Do ensure that the start,
152 // end byte slices are owned by keyRange struct.
153 func (st *Stream) produceRanges(ctx context.Context) {
154 ranges := st.db.Ranges(st.Prefix, st.NumGo)
155 y.AssertTrue(len(ranges) > 0)
156 y.AssertTrue(ranges[0].left == nil)
157 y.AssertTrue(ranges[len(ranges)-1].right == nil)
158 st.db.opt.Infof("Number of ranges found: %d\n", len(ranges))
159
160 // Sort in descending order of size.
161 sort.Slice(ranges, func(i, j int) bool {
162 return ranges[i].size > ranges[j].size
163 })
164 for i, r := range ranges {
165 st.rangeCh <- *r
166 st.db.opt.Infof("Sent range %d for iteration: [%x, %x) of size: %s\n",
167 i, r.left, r.right, humanize.IBytes(uint64(r.size)))
168 }
169 close(st.rangeCh)
170 }
171
172 // produceKVs picks up ranges from rangeCh, generates KV lists and sends them to kvChan.
173 func (st *Stream) produceKVs(ctx context.Context, threadId int) error {
174 st.numProducers.Add(1)
175 defer st.numProducers.Add(-1)
176
177 var txn *Txn
178 if st.readTs > 0 {
179 txn = st.db.NewTransactionAt(st.readTs, false)
180 } else {
181 txn = st.db.NewTransaction(false)
182 }
183 defer txn.Discard()
184
185 // produceKVs is running iterate serially. So, we can define the outList here.
186 outList := z.NewBuffer(2*batchSize, "Stream.ProduceKVs")
187 defer func() {
188 // The outList variable changes. So, we need to evaluate the variable in the defer. DO NOT
189 // call `defer outList.Release()`.
190 _ = outList.Release()
191 }()
192
193 iterate := func(kr keyRange) error {
194 iterOpts := DefaultIteratorOptions
195 iterOpts.AllVersions = true
196 iterOpts.Prefix = st.Prefix
197 iterOpts.PrefetchValues = true
198 iterOpts.SinceTs = st.SinceTs
199 itr := txn.NewIterator(iterOpts)
200 itr.ThreadId = threadId
201 defer itr.Close()
202
203 itr.Alloc = z.NewAllocator(1<<20, "Stream.Iterate")
204 defer itr.Alloc.Release()
205
206 // This unique stream id is used to identify all the keys from this iteration.
207 streamId := st.nextStreamId.Add(1)
208 var scanned int
209
210 sendIt := func() error {
211 select {
212 case st.kvChan <- outList:
213 outList = z.NewBuffer(2*batchSize, "Stream.ProduceKVs")
214 st.scanned.Add(uint64(itr.scanned - scanned))
215 scanned = itr.scanned
216 case <-ctx.Done():
217 return ctx.Err()
218 }
219 return nil
220 }
221
222 var prevKey []byte
223 for itr.Seek(kr.left); itr.Valid(); {
224 // it.Valid would only return true for keys with the provided Prefix in iterOpts.
225 item := itr.Item()
226 if bytes.Equal(item.Key(), prevKey) {
227 itr.Next()
228 continue
229 }
230 prevKey = append(prevKey[:0], item.Key()...)
231
232 // Check if we reached the end of the key range.
233 if len(kr.right) > 0 && bytes.Compare(item.Key(), kr.right) >= 0 {
234 break
235 }
236
237 // Check if we should pick this key.
238 if st.ChooseKey != nil && !st.ChooseKey(item) {
239 continue
240 }
241
242 // Now convert to key value.
243 itr.Alloc.Reset()
244 var list *pb.KVList
245 var err error
246 if st.UseKeyToListWithThreadId {
247 list, err = st.KeyToListWithThreadId(item.KeyCopy(nil), itr, threadId)
248 } else {
249 list, err = st.KeyToList(item.KeyCopy(nil), itr)
250 }
251 if err != nil {
252 st.db.opt.Warningf("While reading key: %x, got error: %v", item.Key(), err)
253 continue
254 }
255 if list == nil || len(list.Kv) == 0 {
256 continue
257 }
258 for _, kv := range list.Kv {
259 kv.StreamId = streamId
260 KVToBuffer(kv, outList)
261 if outList.LenNoPadding() < batchSize {
262 continue
263 }
264 if err := sendIt(); err != nil {
265 return err
266 }
267 }
268 }
269
270 if st.UseKeyToListWithThreadId {
271 if kvs, err := st.FinishThread(threadId); err != nil {
272 return err
273 } else {
274 for _, kv := range kvs.Kv {
275 kv.StreamId = streamId
276 KVToBuffer(kv, outList)
277 if outList.LenNoPadding() < batchSize {
278 continue
279 }
280 if err := sendIt(); err != nil {
281 return err
282 }
283 }
284 }
285 }
286 // Mark the stream as done.
287 if st.doneMarkers {
288 kv := &pb.KV{
289 StreamId: streamId,
290 StreamDone: true,
291 }
292 KVToBuffer(kv, outList)
293 }
294 return sendIt()
295 }
296
297 for {
298 select {
299 case kr, ok := <-st.rangeCh:
300 if !ok {
301 // Done with the keys.
302 return nil
303 }
304 if err := iterate(kr); err != nil {
305 return err
306 }
307 case <-ctx.Done():
308 return ctx.Err()
309 }
310 }
311 }
312
313 func (st *Stream) streamKVs(ctx context.Context) error {
314 onDiskSize, uncompressedSize := st.db.EstimateSize(st.Prefix)
315 // Manish has seen uncompressed size to be in 20% error margin.
316 uncompressedSize = uint64(float64(uncompressedSize) * 1.2)
317 st.db.opt.Infof("%s Streaming about %s of uncompressed data (%s on disk)\n",
318 st.LogPrefix, humanize.IBytes(uncompressedSize), humanize.IBytes(onDiskSize))
319
320 tickerDur := 5 * time.Second
321 var bytesSent uint64
322 t := time.NewTicker(tickerDur)
323 defer t.Stop()
324 now := time.Now()
325
326 sendBatch := func(batch *z.Buffer) error {
327 defer func() { _ = batch.Release() }()
328 sz := uint64(batch.LenNoPadding())
329 if sz == 0 {
330 return nil
331 }
332 bytesSent += sz
333 // st.db.opt.Infof("%s Sending batch of size: %s.\n", st.LogPrefix, humanize.IBytes(sz))
334 if err := st.Send(batch); err != nil {
335 st.db.opt.Warningf("Error while sending: %v\n", err)
336 return err
337 }
338 return nil
339 }
340
341 slurp := func(batch *z.Buffer) error {
342 loop:
343 for {
344 // Send the batch immediately if it already exceeds the maximum allowed size.
345 // If the size of the batch exceeds maxStreamSize, break from the loop to
346 // avoid creating a batch that is so big that certain limits are reached.
347 if uint64(batch.LenNoPadding()) > st.MaxSize {
348 break loop
349 }
350 select {
351 case kvs, ok := <-st.kvChan:
352 if !ok {
353 break loop
354 }
355 y.AssertTrue(kvs != nil)
356 y.Check2(batch.Write(kvs.Bytes()))
357 y.Check(kvs.Release())
358
359 default:
360 break loop
361 }
362 }
363 return sendBatch(batch)
364 } // end of slurp.
365
366 writeRate := y.NewRateMonitor(20)
367 scanRate := y.NewRateMonitor(20)
368 outer:
369 for {
370 var batch *z.Buffer
371 select {
372 case <-ctx.Done():
373 return ctx.Err()
374
375 case <-t.C:
376 // Instead of calculating speed over the entire lifetime, we average the speed over
377 // ticker duration.
378 writeRate.Capture(bytesSent)
379 scanned := st.scanned.Load()
380 scanRate.Capture(scanned)
381 numProducers := st.numProducers.Load()
382
383 st.db.opt.Infof("%s [%s] Scan (%d): ~%s/%s at %s/sec. Sent: %s at %s/sec."+
384 " jemalloc: %s\n",
385 st.LogPrefix, y.FixedDuration(time.Since(now)), numProducers,
386 y.IBytesToString(scanned, 1), humanize.IBytes(uncompressedSize),
387 humanize.IBytes(scanRate.Rate()),
388 y.IBytesToString(bytesSent, 1), humanize.IBytes(writeRate.Rate()),
389 humanize.IBytes(uint64(z.NumAllocBytes())))
390
391 case kvs, ok := <-st.kvChan:
392 if !ok {
393 break outer
394 }
395 y.AssertTrue(kvs != nil)
396 batch = kvs
397
398 // Otherwise, slurp more keys into this batch.
399 if err := slurp(batch); err != nil {
400 return err
401 }
402 }
403 }
404
405 st.db.opt.Infof("%s Sent data of size %s\n", st.LogPrefix, humanize.IBytes(bytesSent))
406 return nil
407 }
408
409 // Orchestrate runs Stream. It picks up ranges from the SSTables, then runs NumGo number of
410 // goroutines to iterate over these ranges and batch up KVs in lists. It concurrently runs a single
411 // goroutine to pick these lists, batch them up further and send to Output.Send. Orchestrate also
412 // spits logs out to Infof, using provided LogPrefix. Note that all calls to Output.Send
413 // are serial. In case any of these steps encounter an error, Orchestrate would stop execution and
414 // return that error. Orchestrate can be called multiple times, but in serial order.
415 func (st *Stream) Orchestrate(ctx context.Context) error {
416 ctx, cancel := context.WithCancel(ctx)
417 defer cancel()
418 st.rangeCh = make(chan keyRange, 3) // Contains keys for posting lists.
419
420 // kvChan should only have a small capacity to ensure that we don't buffer up too much data if
421 // sending is slow. Page size is set to 4MB, which is used to lazily cap the size of each
422 // KVList. To get 128MB buffer, we can set the channel size to 32.
423 st.kvChan = make(chan *z.Buffer, 32)
424
425 if st.KeyToList == nil {
426 st.KeyToList = st.ToList
427 }
428
429 // Picks up ranges from Badger, and sends them to rangeCh.
430 go st.produceRanges(ctx)
431
432 errCh := make(chan error, st.NumGo) // Stores error by consumeKeys.
433 var wg sync.WaitGroup
434 for i := 0; i < st.NumGo; i++ {
435 wg.Add(1)
436
437 go func(threadId int) {
438 defer wg.Done()
439 // Picks up ranges from rangeCh, generates KV lists, and sends them to kvChan.
440 if err := st.produceKVs(ctx, threadId); err != nil {
441 select {
442 case errCh <- err:
443 default:
444 }
445 }
446 }(i)
447 }
448
449 // Pick up key-values from kvChan and send to stream.
450 kvErr := make(chan error, 1)
451 go func() {
452 // Picks up KV lists from kvChan, and sends them to Output.
453 err := st.streamKVs(ctx)
454 if err != nil {
455 cancel() // Stop all the go routines.
456 }
457 kvErr <- err
458 }()
459 wg.Wait() // Wait for produceKVs to be over.
460 close(st.kvChan) // Now we can close kvChan.
461 defer func() {
462 // If due to some error, we have buffers left in kvChan, we should release them.
463 for buf := range st.kvChan {
464 _ = buf.Release()
465 }
466 }()
467
468 select {
469 case err := <-errCh: // Check error from produceKVs.
470 return err
471 default:
472 }
473
474 // Wait for key streaming to be over.
475 err := <-kvErr
476 return err
477 }
478
479 func (db *DB) newStream() *Stream {
480 return &Stream{
481 db: db,
482 NumGo: db.opt.NumGoroutines,
483 LogPrefix: "Badger.Stream",
484 MaxSize: maxStreamSize,
485 }
486 }
487
488 // NewStream creates a new Stream.
489 func (db *DB) NewStream() *Stream {
490 if db.opt.managedTxns {
491 panic("This API can not be called in managed mode.")
492 }
493 return db.newStream()
494 }
495
496 // NewStreamAt creates a new Stream at a particular timestamp. Should only be used with managed DB.
497 func (db *DB) NewStreamAt(readTs uint64) *Stream {
498 if !db.opt.managedTxns {
499 panic("This API can only be called in managed mode.")
500 }
501 stream := db.newStream()
502 stream.readTs = readTs
503 return stream
504 }
505
506 func BufferToKVList(buf *z.Buffer) (*pb.KVList, error) {
507 var list pb.KVList
508 err := buf.SliceIterate(func(s []byte) error {
509 kv := new(pb.KV)
510 if err := proto.Unmarshal(s, kv); err != nil {
511 return err
512 }
513 list.Kv = append(list.Kv, kv)
514 return nil
515 })
516 return &list, err
517 }
518
519 func KVToBuffer(kv *pb.KV, buf *z.Buffer) {
520 in := buf.SliceAllocate(proto.Size(kv))[:0]
521 _, err := proto.MarshalOptions{}.MarshalAppend(in, kv)
522 y.AssertTrue(err == nil)
523 }
524