stream_writer.go raw

   1  /*
   2   * SPDX-FileCopyrightText: © Hypermode Inc. <hello@hypermode.com>
   3   * SPDX-License-Identifier: Apache-2.0
   4   */
   5  
   6  package badger
   7  
   8  import (
   9  	"encoding/hex"
  10  	"fmt"
  11  	"sync"
  12  
  13  	"github.com/dustin/go-humanize"
  14  	"google.golang.org/protobuf/proto"
  15  
  16  	"github.com/dgraph-io/badger/v4/pb"
  17  	"github.com/dgraph-io/badger/v4/table"
  18  	"github.com/dgraph-io/badger/v4/y"
  19  	"github.com/dgraph-io/ristretto/v2/z"
  20  )
  21  
  22  // StreamWriter is used to write data coming from multiple streams. The streams must not have any
  23  // overlapping key ranges. Within each stream, the keys must be sorted. Badger Stream framework is
  24  // capable of generating such an output. So, this StreamWriter can be used at the other end to build
  25  // BadgerDB at a much faster pace by writing SSTables (and value logs) directly to LSM tree levels
  26  // without causing any compactions at all. This is way faster than using batched writer or using
  27  // transactions, but only applicable in situations where the keys are pre-sorted and the DB is being
  28  // bootstrapped. Existing data would get deleted when using this writer. So, this is only useful
  29  // when restoring from backup or replicating DB across servers.
  30  //
  31  // StreamWriter should not be called on in-use DB instances. It is designed only to bootstrap new
  32  // DBs.
  33  type StreamWriter struct {
  34  	writeLock  sync.Mutex
  35  	db         *DB
  36  	done       func()
  37  	throttle   *y.Throttle
  38  	maxVersion uint64
  39  	writers    map[uint32]*sortedWriter
  40  	prevLevel  int
  41  }
  42  
  43  // NewStreamWriter creates a StreamWriter. Right after creating StreamWriter, Prepare must be
  44  // called. The memory usage of a StreamWriter is directly proportional to the number of streams
  45  // possible. So, efforts must be made to keep the number of streams low. Stream framework would
  46  // typically use 16 goroutines and hence create 16 streams.
  47  func (db *DB) NewStreamWriter() *StreamWriter {
  48  	return &StreamWriter{
  49  		db: db,
  50  		// throttle shouldn't make much difference. Memory consumption is based on the number of
  51  		// concurrent streams being processed.
  52  		throttle: y.NewThrottle(16),
  53  		writers:  make(map[uint32]*sortedWriter),
  54  	}
  55  }
  56  
  57  // Prepare should be called before writing any entry to StreamWriter. It deletes all data present in
  58  // existing DB, stops compactions and any writes being done by other means. Be very careful when
  59  // calling Prepare, because it could result in permanent data loss. Not calling Prepare would result
  60  // in a corrupt Badger instance. Use PrepareIncremental to do incremental stream write.
  61  func (sw *StreamWriter) Prepare() error {
  62  	sw.writeLock.Lock()
  63  	defer sw.writeLock.Unlock()
  64  
  65  	done, err := sw.db.dropAll()
  66  	// Ensure that done() is never called more than once.
  67  	var once sync.Once
  68  	sw.done = func() { once.Do(done) }
  69  	return err
  70  }
  71  
  72  // PrepareIncremental should be called before writing any entry to StreamWriter incrementally.
  73  // In incremental stream write, the tables are written at one level above the current base level.
  74  func (sw *StreamWriter) PrepareIncremental() error {
  75  	sw.writeLock.Lock()
  76  	defer sw.writeLock.Unlock()
  77  
  78  	// Ensure that done() is never called more than once.
  79  	var once sync.Once
  80  
  81  	// prepareToDrop will stop all the incoming writes and process any pending flush tasks.
  82  	// Before we start writing, we'll stop the compactions because no one else should be writing to
  83  	// the same level as the stream writer is writing to.
  84  	f, err := sw.db.prepareToDrop()
  85  	if err != nil {
  86  		sw.done = func() { once.Do(f) }
  87  		return err
  88  	}
  89  	sw.db.stopCompactions()
  90  	done := func() {
  91  		sw.db.startCompactions()
  92  		f()
  93  	}
  94  	sw.done = func() { once.Do(done) }
  95  
  96  	mts, decr := sw.db.getMemTables()
  97  	defer decr()
  98  	for _, m := range mts {
  99  		if !m.sl.Empty() {
 100  			return fmt.Errorf("Unable to do incremental writes because MemTable has data")
 101  		}
 102  	}
 103  
 104  	isEmptyDB := true
 105  	for _, level := range sw.db.Levels() {
 106  		if level.NumTables > 0 {
 107  			sw.prevLevel = level.Level
 108  			isEmptyDB = false
 109  			break
 110  		}
 111  	}
 112  	if isEmptyDB {
 113  		// If DB is empty, we should allow doing incremental stream write.
 114  		return nil
 115  	}
 116  	if sw.prevLevel == 0 {
 117  		// It seems that data is present in all levels from Lmax to L0. If we call flatten
 118  		// on the tree, all the data will go to Lmax. All the levels above will be empty
 119  		// after flatten call. Now, we should be able to use incremental stream writer again.
 120  		if err := sw.db.Flatten(3); err != nil {
 121  			return fmt.Errorf("error during flatten in StreamWriter: %w", err)
 122  		}
 123  		sw.prevLevel = len(sw.db.Levels()) - 1
 124  	}
 125  	return nil
 126  }
 127  
 128  // Write writes KVList to DB. Each KV within the list contains the stream id which StreamWriter
 129  // would use to demux the writes. Write is thread safe and can be called concurrently by multiple
 130  // goroutines.
 131  func (sw *StreamWriter) Write(buf *z.Buffer) error {
 132  	if buf.LenNoPadding() == 0 {
 133  		return nil
 134  	}
 135  
 136  	// closedStreams keeps track of all streams which are going to be marked as done. We are
 137  	// keeping track of all streams so that we can close them at the end, after inserting all
 138  	// the valid kvs.
 139  	closedStreams := make(map[uint32]struct{})
 140  	streamReqs := make(map[uint32]*request)
 141  
 142  	err := buf.SliceIterate(func(s []byte) error {
 143  		var kv pb.KV
 144  		if err := proto.Unmarshal(s, &kv); err != nil {
 145  			return err
 146  		}
 147  		if kv.StreamDone {
 148  			closedStreams[kv.StreamId] = struct{}{}
 149  			return nil
 150  		}
 151  
 152  		// Panic if some kv comes after stream has been marked as closed.
 153  		if _, ok := closedStreams[kv.StreamId]; ok {
 154  			panic(fmt.Sprintf("write performed on closed stream: %d", kv.StreamId))
 155  		}
 156  
 157  		sw.writeLock.Lock()
 158  		if sw.maxVersion < kv.Version {
 159  			sw.maxVersion = kv.Version
 160  		}
 161  		if sw.prevLevel == 0 {
 162  			// If prevLevel is 0, that means that we have not written anything yet.
 163  			// So, we can write to the maxLevel. newWriter writes to prevLevel - 1,
 164  			// so we can set prevLevel to len(levels).
 165  			sw.prevLevel = len(sw.db.lc.levels)
 166  		}
 167  		sw.writeLock.Unlock()
 168  
 169  		var meta, userMeta byte
 170  		if len(kv.Meta) > 0 {
 171  			meta = kv.Meta[0]
 172  		}
 173  		if len(kv.UserMeta) > 0 {
 174  			userMeta = kv.UserMeta[0]
 175  		}
 176  		e := &Entry{
 177  			Key:       y.KeyWithTs(kv.Key, kv.Version),
 178  			Value:     y.Copy(kv.Value),
 179  			UserMeta:  userMeta,
 180  			ExpiresAt: kv.ExpiresAt,
 181  			meta:      meta,
 182  		}
 183  		// If the value can be collocated with the key in LSM tree, we can skip
 184  		// writing the value to value log.
 185  		req := streamReqs[kv.StreamId]
 186  		if req == nil {
 187  			req = &request{}
 188  			streamReqs[kv.StreamId] = req
 189  		}
 190  		req.Entries = append(req.Entries, e)
 191  		return nil
 192  	})
 193  	if err != nil {
 194  		return err
 195  	}
 196  
 197  	all := make([]*request, 0, len(streamReqs))
 198  	for _, req := range streamReqs {
 199  		all = append(all, req)
 200  	}
 201  
 202  	sw.writeLock.Lock()
 203  	defer sw.writeLock.Unlock()
 204  
 205  	// We are writing all requests to vlog even if some request belongs to already closed stream.
 206  	// It is safe to do because we are panicking while writing to sorted writer, which will be nil
 207  	// for closed stream. At restart, stream writer will drop all the data in Prepare function.
 208  	if err := sw.db.vlog.write(all); err != nil {
 209  		return err
 210  	}
 211  
 212  	for streamID, req := range streamReqs {
 213  		writer, ok := sw.writers[streamID]
 214  		if !ok {
 215  			var err error
 216  			writer, err = sw.newWriter(streamID)
 217  			if err != nil {
 218  				return y.Wrapf(err, "failed to create writer with ID %d", streamID)
 219  			}
 220  			sw.writers[streamID] = writer
 221  		}
 222  
 223  		if writer == nil {
 224  			panic(fmt.Sprintf("write performed on closed stream: %d", streamID))
 225  		}
 226  
 227  		writer.reqCh <- req
 228  	}
 229  
 230  	// Now we can close any streams if required. We will make writer for
 231  	// the closed streams as nil.
 232  	for streamId := range closedStreams {
 233  		writer, ok := sw.writers[streamId]
 234  		if !ok {
 235  			sw.db.opt.Warningf("Trying to close stream: %d, but no sorted "+
 236  				"writer found for it", streamId)
 237  			continue
 238  		}
 239  
 240  		writer.closer.SignalAndWait()
 241  		if err := writer.Done(); err != nil {
 242  			return err
 243  		}
 244  
 245  		sw.writers[streamId] = nil
 246  	}
 247  	return nil
 248  }
 249  
 250  // Flush is called once we are done writing all the entries. It syncs DB directories. It also
 251  // updates Oracle with maxVersion found in all entries (if DB is not managed).
 252  func (sw *StreamWriter) Flush() error {
 253  	sw.writeLock.Lock()
 254  	defer sw.writeLock.Unlock()
 255  
 256  	defer sw.done()
 257  
 258  	for _, writer := range sw.writers {
 259  		if writer != nil {
 260  			writer.closer.SignalAndWait()
 261  		}
 262  	}
 263  
 264  	for _, writer := range sw.writers {
 265  		if writer == nil {
 266  			continue
 267  		}
 268  		if err := writer.Done(); err != nil {
 269  			return err
 270  		}
 271  	}
 272  
 273  	if !sw.db.opt.managedTxns {
 274  		if sw.db.orc != nil {
 275  			sw.db.orc.Stop()
 276  		}
 277  
 278  		if curMax := sw.db.orc.readTs(); curMax >= sw.maxVersion {
 279  			sw.maxVersion = curMax
 280  		}
 281  
 282  		sw.db.orc = newOracle(sw.db.opt)
 283  		sw.db.orc.nextTxnTs = sw.maxVersion
 284  		sw.db.orc.txnMark.Done(sw.maxVersion)
 285  		sw.db.orc.readMark.Done(sw.maxVersion)
 286  		sw.db.orc.incrementNextTs()
 287  	}
 288  
 289  	// Wait for all files to be written.
 290  	if err := sw.throttle.Finish(); err != nil {
 291  		return err
 292  	}
 293  
 294  	// Sort tables at the end.
 295  	for _, l := range sw.db.lc.levels {
 296  		l.sortTables()
 297  	}
 298  
 299  	// Now sync the directories, so all the files are registered.
 300  	if sw.db.opt.ValueDir != sw.db.opt.Dir {
 301  		if err := sw.db.syncDir(sw.db.opt.ValueDir); err != nil {
 302  			return err
 303  		}
 304  	}
 305  	if err := sw.db.syncDir(sw.db.opt.Dir); err != nil {
 306  		return err
 307  	}
 308  	return sw.db.lc.validate()
 309  }
 310  
 311  // Cancel signals all goroutines to exit. Calling defer sw.Cancel() immediately after creating a new StreamWriter
 312  // ensures that writes are unblocked even upon early return. Note that dropAll() is not called here, so any
 313  // partially written data will not be erased until a new StreamWriter is initialized.
 314  func (sw *StreamWriter) Cancel() {
 315  	sw.writeLock.Lock()
 316  	defer sw.writeLock.Unlock()
 317  
 318  	for _, writer := range sw.writers {
 319  		if writer != nil {
 320  			writer.closer.Signal()
 321  		}
 322  	}
 323  	for _, writer := range sw.writers {
 324  		if writer != nil {
 325  			writer.closer.Wait()
 326  		}
 327  	}
 328  
 329  	if err := sw.throttle.Finish(); err != nil {
 330  		sw.db.opt.Errorf("error in throttle.Finish: %+v", err)
 331  	}
 332  
 333  	// Handle Cancel() being called before Prepare().
 334  	if sw.done != nil {
 335  		sw.done()
 336  	}
 337  }
 338  
 339  type sortedWriter struct {
 340  	db       *DB
 341  	throttle *y.Throttle
 342  	opts     table.Options
 343  
 344  	builder  *table.Builder
 345  	lastKey  []byte
 346  	level    int
 347  	streamID uint32
 348  	reqCh    chan *request
 349  	// Have separate closer for each writer, as it can be closed at any time.
 350  	closer *z.Closer
 351  }
 352  
 353  func (sw *StreamWriter) newWriter(streamID uint32) (*sortedWriter, error) {
 354  	bopts := buildTableOptions(sw.db)
 355  	for i := 2; i < sw.db.opt.MaxLevels; i++ {
 356  		bopts.TableSize *= uint64(sw.db.opt.TableSizeMultiplier)
 357  	}
 358  	w := &sortedWriter{
 359  		db:       sw.db,
 360  		opts:     bopts,
 361  		streamID: streamID,
 362  		throttle: sw.throttle,
 363  		builder:  table.NewTableBuilder(bopts),
 364  		reqCh:    make(chan *request, 3),
 365  		closer:   z.NewCloser(1),
 366  		level:    sw.prevLevel - 1, // Write at the level just above the one we were writing to.
 367  	}
 368  
 369  	go w.handleRequests()
 370  	return w, nil
 371  }
 372  
 373  func (w *sortedWriter) handleRequests() {
 374  	defer w.closer.Done()
 375  
 376  	process := func(req *request) {
 377  		for i, e := range req.Entries {
 378  			// If badger is running in InMemory mode, len(req.Ptrs) == 0.
 379  			var vs y.ValueStruct
 380  			if e.skipVlogAndSetThreshold(w.db.valueThreshold()) {
 381  				vs = y.ValueStruct{
 382  					Value:     e.Value,
 383  					Meta:      e.meta,
 384  					UserMeta:  e.UserMeta,
 385  					ExpiresAt: e.ExpiresAt,
 386  				}
 387  			} else {
 388  				vptr := req.Ptrs[i]
 389  				vs = y.ValueStruct{
 390  					Value:     vptr.Encode(),
 391  					Meta:      e.meta | bitValuePointer,
 392  					UserMeta:  e.UserMeta,
 393  					ExpiresAt: e.ExpiresAt,
 394  				}
 395  			}
 396  			if err := w.Add(e.Key, vs); err != nil {
 397  				panic(err)
 398  			}
 399  		}
 400  	}
 401  
 402  	for {
 403  		select {
 404  		case req := <-w.reqCh:
 405  			process(req)
 406  		case <-w.closer.HasBeenClosed():
 407  			close(w.reqCh)
 408  			for req := range w.reqCh {
 409  				process(req)
 410  			}
 411  			return
 412  		}
 413  	}
 414  }
 415  
 416  // Add adds key and vs to sortedWriter.
 417  func (w *sortedWriter) Add(key []byte, vs y.ValueStruct) error {
 418  	if len(w.lastKey) > 0 && y.CompareKeys(key, w.lastKey) <= 0 {
 419  		return fmt.Errorf("keys not in sorted order (last key: %s, key: %s)",
 420  			hex.Dump(w.lastKey), hex.Dump(key))
 421  	}
 422  
 423  	sameKey := y.SameKey(key, w.lastKey)
 424  
 425  	// Same keys should go into the same SSTable.
 426  	if !sameKey && w.builder.ReachedCapacity() {
 427  		if err := w.send(false); err != nil {
 428  			return err
 429  		}
 430  	}
 431  
 432  	w.lastKey = y.SafeCopy(w.lastKey, key)
 433  	var vp valuePointer
 434  	if vs.Meta&bitValuePointer > 0 {
 435  		vp.Decode(vs.Value)
 436  	}
 437  
 438  	w.builder.Add(key, vs, vp.Len)
 439  	return nil
 440  }
 441  
 442  func (w *sortedWriter) send(done bool) error {
 443  	if err := w.throttle.Do(); err != nil {
 444  		return err
 445  	}
 446  	go func(builder *table.Builder) {
 447  		err := w.createTable(builder)
 448  		w.throttle.Done(err)
 449  	}(w.builder)
 450  	// If done is true, this indicates we can close the writer.
 451  	// No need to allocate underlying TableBuilder now.
 452  	if done {
 453  		w.builder = nil
 454  		return nil
 455  	}
 456  
 457  	w.builder = table.NewTableBuilder(w.opts)
 458  	return nil
 459  }
 460  
 461  // Done is called once we are done writing all keys and valueStructs
 462  // to sortedWriter. It completes writing current SST to disk.
 463  func (w *sortedWriter) Done() error {
 464  	if w.builder.Empty() {
 465  		w.builder.Close()
 466  		// Assign builder as nil, so that underlying memory can be garbage collected.
 467  		w.builder = nil
 468  		return nil
 469  	}
 470  
 471  	return w.send(true)
 472  }
 473  
 474  func (w *sortedWriter) createTable(builder *table.Builder) error {
 475  	defer builder.Close()
 476  	if builder.Empty() {
 477  		builder.Finish()
 478  		return nil
 479  	}
 480  
 481  	fileID := w.db.lc.reserveFileID()
 482  	var tbl *table.Table
 483  	if w.db.opt.InMemory {
 484  		data := builder.Finish()
 485  		var err error
 486  		if tbl, err = table.OpenInMemoryTable(data, fileID, builder.Opts()); err != nil {
 487  			return err
 488  		}
 489  	} else {
 490  		var err error
 491  		fname := table.NewFilename(fileID, w.db.opt.Dir)
 492  		if tbl, err = table.CreateTable(fname, builder); err != nil {
 493  			return err
 494  		}
 495  	}
 496  	lc := w.db.lc
 497  
 498  	lhandler := lc.levels[w.level]
 499  	// Now that table can be opened successfully, let's add this to the MANIFEST.
 500  	change := &pb.ManifestChange{
 501  		Id:          tbl.ID(),
 502  		KeyId:       tbl.KeyID(),
 503  		Op:          pb.ManifestChange_CREATE,
 504  		Level:       uint32(lhandler.level),
 505  		Compression: uint32(tbl.CompressionType()),
 506  	}
 507  	if err := w.db.manifest.addChanges([]*pb.ManifestChange{change}, w.db.opt); err != nil {
 508  		return err
 509  	}
 510  
 511  	// We are not calling lhandler.replaceTables() here, as it sorts tables on every addition.
 512  	// We can sort all tables only once during Flush() call.
 513  	lhandler.addTable(tbl)
 514  
 515  	// Release the ref held by OpenTable.
 516  	_ = tbl.DecrRef()
 517  	w.db.opt.Infof("Table created: %d at level: %d for stream: %d. Size: %s\n",
 518  		fileID, lhandler.level, w.streamID, humanize.IBytes(uint64(tbl.Size())))
 519  	return nil
 520  }
 521