stream_writer.go raw
1 /*
2 * SPDX-FileCopyrightText: © Hypermode Inc. <hello@hypermode.com>
3 * SPDX-License-Identifier: Apache-2.0
4 */
5
6 package badger
7
8 import (
9 "encoding/hex"
10 "fmt"
11 "sync"
12
13 "github.com/dustin/go-humanize"
14 "google.golang.org/protobuf/proto"
15
16 "github.com/dgraph-io/badger/v4/pb"
17 "github.com/dgraph-io/badger/v4/table"
18 "github.com/dgraph-io/badger/v4/y"
19 "github.com/dgraph-io/ristretto/v2/z"
20 )
21
22 // StreamWriter is used to write data coming from multiple streams. The streams must not have any
23 // overlapping key ranges. Within each stream, the keys must be sorted. Badger Stream framework is
24 // capable of generating such an output. So, this StreamWriter can be used at the other end to build
25 // BadgerDB at a much faster pace by writing SSTables (and value logs) directly to LSM tree levels
26 // without causing any compactions at all. This is way faster than using batched writer or using
27 // transactions, but only applicable in situations where the keys are pre-sorted and the DB is being
28 // bootstrapped. Existing data would get deleted when using this writer. So, this is only useful
29 // when restoring from backup or replicating DB across servers.
30 //
31 // StreamWriter should not be called on in-use DB instances. It is designed only to bootstrap new
32 // DBs.
33 type StreamWriter struct {
34 writeLock sync.Mutex
35 db *DB
36 done func()
37 throttle *y.Throttle
38 maxVersion uint64
39 writers map[uint32]*sortedWriter
40 prevLevel int
41 }
42
43 // NewStreamWriter creates a StreamWriter. Right after creating StreamWriter, Prepare must be
44 // called. The memory usage of a StreamWriter is directly proportional to the number of streams
45 // possible. So, efforts must be made to keep the number of streams low. Stream framework would
46 // typically use 16 goroutines and hence create 16 streams.
47 func (db *DB) NewStreamWriter() *StreamWriter {
48 return &StreamWriter{
49 db: db,
50 // throttle shouldn't make much difference. Memory consumption is based on the number of
51 // concurrent streams being processed.
52 throttle: y.NewThrottle(16),
53 writers: make(map[uint32]*sortedWriter),
54 }
55 }
56
57 // Prepare should be called before writing any entry to StreamWriter. It deletes all data present in
58 // existing DB, stops compactions and any writes being done by other means. Be very careful when
59 // calling Prepare, because it could result in permanent data loss. Not calling Prepare would result
60 // in a corrupt Badger instance. Use PrepareIncremental to do incremental stream write.
61 func (sw *StreamWriter) Prepare() error {
62 sw.writeLock.Lock()
63 defer sw.writeLock.Unlock()
64
65 done, err := sw.db.dropAll()
66 // Ensure that done() is never called more than once.
67 var once sync.Once
68 sw.done = func() { once.Do(done) }
69 return err
70 }
71
72 // PrepareIncremental should be called before writing any entry to StreamWriter incrementally.
73 // In incremental stream write, the tables are written at one level above the current base level.
74 func (sw *StreamWriter) PrepareIncremental() error {
75 sw.writeLock.Lock()
76 defer sw.writeLock.Unlock()
77
78 // Ensure that done() is never called more than once.
79 var once sync.Once
80
81 // prepareToDrop will stop all the incoming writes and process any pending flush tasks.
82 // Before we start writing, we'll stop the compactions because no one else should be writing to
83 // the same level as the stream writer is writing to.
84 f, err := sw.db.prepareToDrop()
85 if err != nil {
86 sw.done = func() { once.Do(f) }
87 return err
88 }
89 sw.db.stopCompactions()
90 done := func() {
91 sw.db.startCompactions()
92 f()
93 }
94 sw.done = func() { once.Do(done) }
95
96 mts, decr := sw.db.getMemTables()
97 defer decr()
98 for _, m := range mts {
99 if !m.sl.Empty() {
100 return fmt.Errorf("Unable to do incremental writes because MemTable has data")
101 }
102 }
103
104 isEmptyDB := true
105 for _, level := range sw.db.Levels() {
106 if level.NumTables > 0 {
107 sw.prevLevel = level.Level
108 isEmptyDB = false
109 break
110 }
111 }
112 if isEmptyDB {
113 // If DB is empty, we should allow doing incremental stream write.
114 return nil
115 }
116 if sw.prevLevel == 0 {
117 // It seems that data is present in all levels from Lmax to L0. If we call flatten
118 // on the tree, all the data will go to Lmax. All the levels above will be empty
119 // after flatten call. Now, we should be able to use incremental stream writer again.
120 if err := sw.db.Flatten(3); err != nil {
121 return fmt.Errorf("error during flatten in StreamWriter: %w", err)
122 }
123 sw.prevLevel = len(sw.db.Levels()) - 1
124 }
125 return nil
126 }
127
128 // Write writes KVList to DB. Each KV within the list contains the stream id which StreamWriter
129 // would use to demux the writes. Write is thread safe and can be called concurrently by multiple
130 // goroutines.
131 func (sw *StreamWriter) Write(buf *z.Buffer) error {
132 if buf.LenNoPadding() == 0 {
133 return nil
134 }
135
136 // closedStreams keeps track of all streams which are going to be marked as done. We are
137 // keeping track of all streams so that we can close them at the end, after inserting all
138 // the valid kvs.
139 closedStreams := make(map[uint32]struct{})
140 streamReqs := make(map[uint32]*request)
141
142 err := buf.SliceIterate(func(s []byte) error {
143 var kv pb.KV
144 if err := proto.Unmarshal(s, &kv); err != nil {
145 return err
146 }
147 if kv.StreamDone {
148 closedStreams[kv.StreamId] = struct{}{}
149 return nil
150 }
151
152 // Panic if some kv comes after stream has been marked as closed.
153 if _, ok := closedStreams[kv.StreamId]; ok {
154 panic(fmt.Sprintf("write performed on closed stream: %d", kv.StreamId))
155 }
156
157 sw.writeLock.Lock()
158 if sw.maxVersion < kv.Version {
159 sw.maxVersion = kv.Version
160 }
161 if sw.prevLevel == 0 {
162 // If prevLevel is 0, that means that we have not written anything yet.
163 // So, we can write to the maxLevel. newWriter writes to prevLevel - 1,
164 // so we can set prevLevel to len(levels).
165 sw.prevLevel = len(sw.db.lc.levels)
166 }
167 sw.writeLock.Unlock()
168
169 var meta, userMeta byte
170 if len(kv.Meta) > 0 {
171 meta = kv.Meta[0]
172 }
173 if len(kv.UserMeta) > 0 {
174 userMeta = kv.UserMeta[0]
175 }
176 e := &Entry{
177 Key: y.KeyWithTs(kv.Key, kv.Version),
178 Value: y.Copy(kv.Value),
179 UserMeta: userMeta,
180 ExpiresAt: kv.ExpiresAt,
181 meta: meta,
182 }
183 // If the value can be collocated with the key in LSM tree, we can skip
184 // writing the value to value log.
185 req := streamReqs[kv.StreamId]
186 if req == nil {
187 req = &request{}
188 streamReqs[kv.StreamId] = req
189 }
190 req.Entries = append(req.Entries, e)
191 return nil
192 })
193 if err != nil {
194 return err
195 }
196
197 all := make([]*request, 0, len(streamReqs))
198 for _, req := range streamReqs {
199 all = append(all, req)
200 }
201
202 sw.writeLock.Lock()
203 defer sw.writeLock.Unlock()
204
205 // We are writing all requests to vlog even if some request belongs to already closed stream.
206 // It is safe to do because we are panicking while writing to sorted writer, which will be nil
207 // for closed stream. At restart, stream writer will drop all the data in Prepare function.
208 if err := sw.db.vlog.write(all); err != nil {
209 return err
210 }
211
212 for streamID, req := range streamReqs {
213 writer, ok := sw.writers[streamID]
214 if !ok {
215 var err error
216 writer, err = sw.newWriter(streamID)
217 if err != nil {
218 return y.Wrapf(err, "failed to create writer with ID %d", streamID)
219 }
220 sw.writers[streamID] = writer
221 }
222
223 if writer == nil {
224 panic(fmt.Sprintf("write performed on closed stream: %d", streamID))
225 }
226
227 writer.reqCh <- req
228 }
229
230 // Now we can close any streams if required. We will make writer for
231 // the closed streams as nil.
232 for streamId := range closedStreams {
233 writer, ok := sw.writers[streamId]
234 if !ok {
235 sw.db.opt.Warningf("Trying to close stream: %d, but no sorted "+
236 "writer found for it", streamId)
237 continue
238 }
239
240 writer.closer.SignalAndWait()
241 if err := writer.Done(); err != nil {
242 return err
243 }
244
245 sw.writers[streamId] = nil
246 }
247 return nil
248 }
249
250 // Flush is called once we are done writing all the entries. It syncs DB directories. It also
251 // updates Oracle with maxVersion found in all entries (if DB is not managed).
252 func (sw *StreamWriter) Flush() error {
253 sw.writeLock.Lock()
254 defer sw.writeLock.Unlock()
255
256 defer sw.done()
257
258 for _, writer := range sw.writers {
259 if writer != nil {
260 writer.closer.SignalAndWait()
261 }
262 }
263
264 for _, writer := range sw.writers {
265 if writer == nil {
266 continue
267 }
268 if err := writer.Done(); err != nil {
269 return err
270 }
271 }
272
273 if !sw.db.opt.managedTxns {
274 if sw.db.orc != nil {
275 sw.db.orc.Stop()
276 }
277
278 if curMax := sw.db.orc.readTs(); curMax >= sw.maxVersion {
279 sw.maxVersion = curMax
280 }
281
282 sw.db.orc = newOracle(sw.db.opt)
283 sw.db.orc.nextTxnTs = sw.maxVersion
284 sw.db.orc.txnMark.Done(sw.maxVersion)
285 sw.db.orc.readMark.Done(sw.maxVersion)
286 sw.db.orc.incrementNextTs()
287 }
288
289 // Wait for all files to be written.
290 if err := sw.throttle.Finish(); err != nil {
291 return err
292 }
293
294 // Sort tables at the end.
295 for _, l := range sw.db.lc.levels {
296 l.sortTables()
297 }
298
299 // Now sync the directories, so all the files are registered.
300 if sw.db.opt.ValueDir != sw.db.opt.Dir {
301 if err := sw.db.syncDir(sw.db.opt.ValueDir); err != nil {
302 return err
303 }
304 }
305 if err := sw.db.syncDir(sw.db.opt.Dir); err != nil {
306 return err
307 }
308 return sw.db.lc.validate()
309 }
310
311 // Cancel signals all goroutines to exit. Calling defer sw.Cancel() immediately after creating a new StreamWriter
312 // ensures that writes are unblocked even upon early return. Note that dropAll() is not called here, so any
313 // partially written data will not be erased until a new StreamWriter is initialized.
314 func (sw *StreamWriter) Cancel() {
315 sw.writeLock.Lock()
316 defer sw.writeLock.Unlock()
317
318 for _, writer := range sw.writers {
319 if writer != nil {
320 writer.closer.Signal()
321 }
322 }
323 for _, writer := range sw.writers {
324 if writer != nil {
325 writer.closer.Wait()
326 }
327 }
328
329 if err := sw.throttle.Finish(); err != nil {
330 sw.db.opt.Errorf("error in throttle.Finish: %+v", err)
331 }
332
333 // Handle Cancel() being called before Prepare().
334 if sw.done != nil {
335 sw.done()
336 }
337 }
338
339 type sortedWriter struct {
340 db *DB
341 throttle *y.Throttle
342 opts table.Options
343
344 builder *table.Builder
345 lastKey []byte
346 level int
347 streamID uint32
348 reqCh chan *request
349 // Have separate closer for each writer, as it can be closed at any time.
350 closer *z.Closer
351 }
352
353 func (sw *StreamWriter) newWriter(streamID uint32) (*sortedWriter, error) {
354 bopts := buildTableOptions(sw.db)
355 for i := 2; i < sw.db.opt.MaxLevels; i++ {
356 bopts.TableSize *= uint64(sw.db.opt.TableSizeMultiplier)
357 }
358 w := &sortedWriter{
359 db: sw.db,
360 opts: bopts,
361 streamID: streamID,
362 throttle: sw.throttle,
363 builder: table.NewTableBuilder(bopts),
364 reqCh: make(chan *request, 3),
365 closer: z.NewCloser(1),
366 level: sw.prevLevel - 1, // Write at the level just above the one we were writing to.
367 }
368
369 go w.handleRequests()
370 return w, nil
371 }
372
373 func (w *sortedWriter) handleRequests() {
374 defer w.closer.Done()
375
376 process := func(req *request) {
377 for i, e := range req.Entries {
378 // If badger is running in InMemory mode, len(req.Ptrs) == 0.
379 var vs y.ValueStruct
380 if e.skipVlogAndSetThreshold(w.db.valueThreshold()) {
381 vs = y.ValueStruct{
382 Value: e.Value,
383 Meta: e.meta,
384 UserMeta: e.UserMeta,
385 ExpiresAt: e.ExpiresAt,
386 }
387 } else {
388 vptr := req.Ptrs[i]
389 vs = y.ValueStruct{
390 Value: vptr.Encode(),
391 Meta: e.meta | bitValuePointer,
392 UserMeta: e.UserMeta,
393 ExpiresAt: e.ExpiresAt,
394 }
395 }
396 if err := w.Add(e.Key, vs); err != nil {
397 panic(err)
398 }
399 }
400 }
401
402 for {
403 select {
404 case req := <-w.reqCh:
405 process(req)
406 case <-w.closer.HasBeenClosed():
407 close(w.reqCh)
408 for req := range w.reqCh {
409 process(req)
410 }
411 return
412 }
413 }
414 }
415
416 // Add adds key and vs to sortedWriter.
417 func (w *sortedWriter) Add(key []byte, vs y.ValueStruct) error {
418 if len(w.lastKey) > 0 && y.CompareKeys(key, w.lastKey) <= 0 {
419 return fmt.Errorf("keys not in sorted order (last key: %s, key: %s)",
420 hex.Dump(w.lastKey), hex.Dump(key))
421 }
422
423 sameKey := y.SameKey(key, w.lastKey)
424
425 // Same keys should go into the same SSTable.
426 if !sameKey && w.builder.ReachedCapacity() {
427 if err := w.send(false); err != nil {
428 return err
429 }
430 }
431
432 w.lastKey = y.SafeCopy(w.lastKey, key)
433 var vp valuePointer
434 if vs.Meta&bitValuePointer > 0 {
435 vp.Decode(vs.Value)
436 }
437
438 w.builder.Add(key, vs, vp.Len)
439 return nil
440 }
441
442 func (w *sortedWriter) send(done bool) error {
443 if err := w.throttle.Do(); err != nil {
444 return err
445 }
446 go func(builder *table.Builder) {
447 err := w.createTable(builder)
448 w.throttle.Done(err)
449 }(w.builder)
450 // If done is true, this indicates we can close the writer.
451 // No need to allocate underlying TableBuilder now.
452 if done {
453 w.builder = nil
454 return nil
455 }
456
457 w.builder = table.NewTableBuilder(w.opts)
458 return nil
459 }
460
461 // Done is called once we are done writing all keys and valueStructs
462 // to sortedWriter. It completes writing current SST to disk.
463 func (w *sortedWriter) Done() error {
464 if w.builder.Empty() {
465 w.builder.Close()
466 // Assign builder as nil, so that underlying memory can be garbage collected.
467 w.builder = nil
468 return nil
469 }
470
471 return w.send(true)
472 }
473
474 func (w *sortedWriter) createTable(builder *table.Builder) error {
475 defer builder.Close()
476 if builder.Empty() {
477 builder.Finish()
478 return nil
479 }
480
481 fileID := w.db.lc.reserveFileID()
482 var tbl *table.Table
483 if w.db.opt.InMemory {
484 data := builder.Finish()
485 var err error
486 if tbl, err = table.OpenInMemoryTable(data, fileID, builder.Opts()); err != nil {
487 return err
488 }
489 } else {
490 var err error
491 fname := table.NewFilename(fileID, w.db.opt.Dir)
492 if tbl, err = table.CreateTable(fname, builder); err != nil {
493 return err
494 }
495 }
496 lc := w.db.lc
497
498 lhandler := lc.levels[w.level]
499 // Now that table can be opened successfully, let's add this to the MANIFEST.
500 change := &pb.ManifestChange{
501 Id: tbl.ID(),
502 KeyId: tbl.KeyID(),
503 Op: pb.ManifestChange_CREATE,
504 Level: uint32(lhandler.level),
505 Compression: uint32(tbl.CompressionType()),
506 }
507 if err := w.db.manifest.addChanges([]*pb.ManifestChange{change}, w.db.opt); err != nil {
508 return err
509 }
510
511 // We are not calling lhandler.replaceTables() here, as it sorts tables on every addition.
512 // We can sort all tables only once during Flush() call.
513 lhandler.addTable(tbl)
514
515 // Release the ref held by OpenTable.
516 _ = tbl.DecrRef()
517 w.db.opt.Infof("Table created: %d at level: %d for stream: %d. Size: %s\n",
518 fileID, lhandler.level, w.streamID, humanize.IBytes(uint64(tbl.Size())))
519 return nil
520 }
521