stream.go raw

   1  /*
   2   * SPDX-FileCopyrightText: © Hypermode Inc. <hello@hypermode.com>
   3   * SPDX-License-Identifier: Apache-2.0
   4   */
   5  
   6  package badger
   7  
   8  import (
   9  	"bytes"
  10  	"context"
  11  	"sort"
  12  	"sync"
  13  	"sync/atomic"
  14  	"time"
  15  
  16  	humanize "github.com/dustin/go-humanize"
  17  	"google.golang.org/protobuf/proto"
  18  
  19  	"github.com/dgraph-io/badger/v4/pb"
  20  	"github.com/dgraph-io/badger/v4/y"
  21  	"github.com/dgraph-io/ristretto/v2/z"
  22  )
  23  
  24  const batchSize = 16 << 20 // 16 MB
  25  
  26  // maxStreamSize is the maximum allowed size of a stream batch. This is a soft limit
  27  // as a single list that is still over the limit will have to be sent as is since it
  28  // cannot be split further. This limit prevents the framework from creating batches
  29  // so big that sending them causes issues (e.g running into the max size gRPC limit).
  30  var maxStreamSize = uint64(100 << 20) // 100MB
  31  
// Stream provides a framework to concurrently iterate over a snapshot of Badger, pick up
// key-values, batch them up and call Send. Stream does concurrent iteration over many smaller key
// ranges. It does NOT send keys in lexicographical sorted order. To get keys in sorted
// order, use Iterator.
//
// Obtain a Stream from DB.NewStream or DB.NewStreamAt, set the exported fields you need, and
// then call Orchestrate.
type Stream struct {
	// Prefix to only iterate over certain range of keys. If set to nil (default), Stream would
	// iterate over the entire DB.
	Prefix []byte

	// Number of goroutines to use for iterating over key ranges. NewStream and NewStreamAt
	// initialize it from the DB's NumGoroutines option.
	NumGo int

	// Badger would produce log entries in Infof to indicate the progress of Stream. LogPrefix can
	// be used to help differentiate them from other activities. Default is "Badger.Stream".
	LogPrefix string

	// ChooseKey is invoked each time a new key is encountered. Note that this is not called
	// on every version of the value, only the first encountered version (i.e. the highest version
	// of the value a key has). Keys for which it returns false are skipped entirely. ChooseKey can
	// be left nil to select all keys.
	//
	// Note: Calls to ChooseKey are concurrent.
	ChooseKey func(item *Item) bool

	// MaxSize is the maximum allowed size of a batch passed to Send. This is a soft limit:
	// Stream stops merging queued buffers into a batch only once the batch already exceeds
	// MaxSize, so a batch can overshoot it, and a single list that is still over the limit is
	// sent as is since it cannot be split further. This limit prevents the framework from
	// creating batches so big that sending them causes issues (e.g. running into the max size
	// gRPC limit). NewStream and NewStreamAt set it to maxStreamSize (100 MB).
	// If necessary, set it before calling Orchestrate; it is read without synchronization.
	MaxSize uint64

	// KeyToList, similar to ChooseKey, is only invoked on the highest version of the value. It
	// is up to the caller to iterate over the versions and generate zero, one or more KVs. It
	// is expected that the user would advance the iterator to go through the versions of the
	// values. However, the user MUST immediately return from this function on the first encounter
	// with a mismatching key. See example usage in ToList function. Can be left nil; Orchestrate
	// then uses ToList.
	//
	// KeyToList can allocate KVs from itr.Alloc, a z.Allocator that Stream creates for every key
	// range it iterates, resets before each KeyToList call and releases when the range is done;
	// this decreases the memory pressure on Go GC. The returned KVs are serialized into Stream's
	// output buffer before the allocator is reset for the next key, so they must not be retained
	// beyond that. AllocRef does NOT need to be set in the returned KVList; Stream does not use it.
	//
	// If KeyToList returns an error, Stream logs a warning and skips that key.
	//
	// Note: Calls to KeyToList are concurrent.
	KeyToList func(key []byte, itr *Iterator) (*pb.KVList, error)
	// UseKeyToListWithThreadId is used to indicate that KeyToListWithThreadId should be used
	// instead of KeyToList; KeyToListWithThreadId and FinishThread must then both be set.
	// threadId identifies the producer goroutine (0 to NumGo-1). Each goroutine processes its key
	// ranges one after another, so Stream never calls these functions concurrently for the same
	// threadId, which lets callers keep per-threadId state. After a goroutine has iterated each
	// key range, FinishThread(threadId) is called and the KVs it returns are appended to that
	// range's output.
	UseKeyToListWithThreadId bool
	KeyToListWithThreadId    func(key []byte, itr *Iterator, threadId int) (*pb.KVList, error)
	FinishThread             func(threadId int) (*pb.KVList, error)

	// This is the method where Stream sends the final output. All calls to Send are done by a
	// single goroutine, i.e. logic within Send method can expect single threaded execution.
	// Stream releases buf as soon as Send returns, so Send must not retain it. Returning an
	// error aborts Orchestrate.
	Send func(buf *z.Buffer) error

	// Read data above the sinceTs. All keys with version <= sinceTs will be ignored.
	SinceTs      uint64
	readTs       uint64         // Read timestamp set by NewStreamAt; 0 means NewTransaction is used.
	db           *DB
	rangeCh      chan keyRange  // Key ranges from produceRanges to the produceKVs goroutines.
	kvChan       chan *z.Buffer // Filled output buffers from produceKVs to streamKVs.
	nextStreamId atomic.Uint32  // Source of the StreamId given to each iterated key range.
	doneMarkers  bool           // Set by SendDoneMarkers.
	scanned      atomic.Uint64  // used to estimate the ETA for data scan.
	numProducers atomic.Int32   // Number of produceKVs goroutines currently running (logged).
}
 102  
 103  // SendDoneMarkers when true would send out done markers on the stream. False by default.
 104  func (st *Stream) SendDoneMarkers(done bool) {
 105  	st.doneMarkers = done
 106  }
 107  
 108  // ToList is a default implementation of KeyToList. It picks up all valid versions of the key,
 109  // skipping over deleted or expired keys.
 110  func (st *Stream) ToList(key []byte, itr *Iterator) (*pb.KVList, error) {
 111  	a := itr.Alloc
 112  	ka := a.Copy(key)
 113  
 114  	list := &pb.KVList{}
 115  	for ; itr.Valid(); itr.Next() {
 116  		item := itr.Item()
 117  		if item.IsDeletedOrExpired() {
 118  			break
 119  		}
 120  		if !bytes.Equal(key, item.Key()) {
 121  			// Break out on the first encounter with another key.
 122  			break
 123  		}
 124  
 125  		kv := y.NewKV(a)
 126  		kv.Key = ka
 127  
 128  		if err := item.Value(func(val []byte) error {
 129  			kv.Value = a.Copy(val)
 130  			return nil
 131  
 132  		}); err != nil {
 133  			return nil, err
 134  		}
 135  		kv.Version = item.Version()
 136  		kv.ExpiresAt = item.ExpiresAt()
 137  		kv.UserMeta = a.Copy([]byte{item.UserMeta()})
 138  
 139  		list.Kv = append(list.Kv, kv)
 140  		if st.db.opt.NumVersionsToKeep == 1 {
 141  			break
 142  		}
 143  
 144  		if item.DiscardEarlierVersions() {
 145  			break
 146  		}
 147  	}
 148  	return list, nil
 149  }
 150  
 151  // keyRange is [start, end), including start, excluding end. Do ensure that the start,
 152  // end byte slices are owned by keyRange struct.
 153  func (st *Stream) produceRanges(ctx context.Context) {
 154  	ranges := st.db.Ranges(st.Prefix, st.NumGo)
 155  	y.AssertTrue(len(ranges) > 0)
 156  	y.AssertTrue(ranges[0].left == nil)
 157  	y.AssertTrue(ranges[len(ranges)-1].right == nil)
 158  	st.db.opt.Infof("Number of ranges found: %d\n", len(ranges))
 159  
 160  	// Sort in descending order of size.
 161  	sort.Slice(ranges, func(i, j int) bool {
 162  		return ranges[i].size > ranges[j].size
 163  	})
 164  	for i, r := range ranges {
 165  		st.rangeCh <- *r
 166  		st.db.opt.Infof("Sent range %d for iteration: [%x, %x) of size: %s\n",
 167  			i, r.left, r.right, humanize.IBytes(uint64(r.size)))
 168  	}
 169  	close(st.rangeCh)
 170  }
 171  
 172  // produceKVs picks up ranges from rangeCh, generates KV lists and sends them to kvChan.
 173  func (st *Stream) produceKVs(ctx context.Context, threadId int) error {
 174  	st.numProducers.Add(1)
 175  	defer st.numProducers.Add(-1)
 176  
 177  	var txn *Txn
 178  	if st.readTs > 0 {
 179  		txn = st.db.NewTransactionAt(st.readTs, false)
 180  	} else {
 181  		txn = st.db.NewTransaction(false)
 182  	}
 183  	defer txn.Discard()
 184  
 185  	// produceKVs is running iterate serially. So, we can define the outList here.
 186  	outList := z.NewBuffer(2*batchSize, "Stream.ProduceKVs")
 187  	defer func() {
 188  		// The outList variable changes. So, we need to evaluate the variable in the defer. DO NOT
 189  		// call `defer outList.Release()`.
 190  		_ = outList.Release()
 191  	}()
 192  
 193  	iterate := func(kr keyRange) error {
 194  		iterOpts := DefaultIteratorOptions
 195  		iterOpts.AllVersions = true
 196  		iterOpts.Prefix = st.Prefix
 197  		iterOpts.PrefetchValues = true
 198  		iterOpts.SinceTs = st.SinceTs
 199  		itr := txn.NewIterator(iterOpts)
 200  		itr.ThreadId = threadId
 201  		defer itr.Close()
 202  
 203  		itr.Alloc = z.NewAllocator(1<<20, "Stream.Iterate")
 204  		defer itr.Alloc.Release()
 205  
 206  		// This unique stream id is used to identify all the keys from this iteration.
 207  		streamId := st.nextStreamId.Add(1)
 208  		var scanned int
 209  
 210  		sendIt := func() error {
 211  			select {
 212  			case st.kvChan <- outList:
 213  				outList = z.NewBuffer(2*batchSize, "Stream.ProduceKVs")
 214  				st.scanned.Add(uint64(itr.scanned - scanned))
 215  				scanned = itr.scanned
 216  			case <-ctx.Done():
 217  				return ctx.Err()
 218  			}
 219  			return nil
 220  		}
 221  
 222  		var prevKey []byte
 223  		for itr.Seek(kr.left); itr.Valid(); {
 224  			// it.Valid would only return true for keys with the provided Prefix in iterOpts.
 225  			item := itr.Item()
 226  			if bytes.Equal(item.Key(), prevKey) {
 227  				itr.Next()
 228  				continue
 229  			}
 230  			prevKey = append(prevKey[:0], item.Key()...)
 231  
 232  			// Check if we reached the end of the key range.
 233  			if len(kr.right) > 0 && bytes.Compare(item.Key(), kr.right) >= 0 {
 234  				break
 235  			}
 236  
 237  			// Check if we should pick this key.
 238  			if st.ChooseKey != nil && !st.ChooseKey(item) {
 239  				continue
 240  			}
 241  
 242  			// Now convert to key value.
 243  			itr.Alloc.Reset()
 244  			var list *pb.KVList
 245  			var err error
 246  			if st.UseKeyToListWithThreadId {
 247  				list, err = st.KeyToListWithThreadId(item.KeyCopy(nil), itr, threadId)
 248  			} else {
 249  				list, err = st.KeyToList(item.KeyCopy(nil), itr)
 250  			}
 251  			if err != nil {
 252  				st.db.opt.Warningf("While reading key: %x, got error: %v", item.Key(), err)
 253  				continue
 254  			}
 255  			if list == nil || len(list.Kv) == 0 {
 256  				continue
 257  			}
 258  			for _, kv := range list.Kv {
 259  				kv.StreamId = streamId
 260  				KVToBuffer(kv, outList)
 261  				if outList.LenNoPadding() < batchSize {
 262  					continue
 263  				}
 264  				if err := sendIt(); err != nil {
 265  					return err
 266  				}
 267  			}
 268  		}
 269  
 270  		if st.UseKeyToListWithThreadId {
 271  			if kvs, err := st.FinishThread(threadId); err != nil {
 272  				return err
 273  			} else {
 274  				for _, kv := range kvs.Kv {
 275  					kv.StreamId = streamId
 276  					KVToBuffer(kv, outList)
 277  					if outList.LenNoPadding() < batchSize {
 278  						continue
 279  					}
 280  					if err := sendIt(); err != nil {
 281  						return err
 282  					}
 283  				}
 284  			}
 285  		}
 286  		// Mark the stream as done.
 287  		if st.doneMarkers {
 288  			kv := &pb.KV{
 289  				StreamId:   streamId,
 290  				StreamDone: true,
 291  			}
 292  			KVToBuffer(kv, outList)
 293  		}
 294  		return sendIt()
 295  	}
 296  
 297  	for {
 298  		select {
 299  		case kr, ok := <-st.rangeCh:
 300  			if !ok {
 301  				// Done with the keys.
 302  				return nil
 303  			}
 304  			if err := iterate(kr); err != nil {
 305  				return err
 306  			}
 307  		case <-ctx.Done():
 308  			return ctx.Err()
 309  		}
 310  	}
 311  }
 312  
 313  func (st *Stream) streamKVs(ctx context.Context) error {
 314  	onDiskSize, uncompressedSize := st.db.EstimateSize(st.Prefix)
 315  	// Manish has seen uncompressed size to be in 20% error margin.
 316  	uncompressedSize = uint64(float64(uncompressedSize) * 1.2)
 317  	st.db.opt.Infof("%s Streaming about %s of uncompressed data (%s on disk)\n",
 318  		st.LogPrefix, humanize.IBytes(uncompressedSize), humanize.IBytes(onDiskSize))
 319  
 320  	tickerDur := 5 * time.Second
 321  	var bytesSent uint64
 322  	t := time.NewTicker(tickerDur)
 323  	defer t.Stop()
 324  	now := time.Now()
 325  
 326  	sendBatch := func(batch *z.Buffer) error {
 327  		defer func() { _ = batch.Release() }()
 328  		sz := uint64(batch.LenNoPadding())
 329  		if sz == 0 {
 330  			return nil
 331  		}
 332  		bytesSent += sz
 333  		// st.db.opt.Infof("%s Sending batch of size: %s.\n", st.LogPrefix, humanize.IBytes(sz))
 334  		if err := st.Send(batch); err != nil {
 335  			st.db.opt.Warningf("Error while sending: %v\n", err)
 336  			return err
 337  		}
 338  		return nil
 339  	}
 340  
 341  	slurp := func(batch *z.Buffer) error {
 342  	loop:
 343  		for {
 344  			// Send the batch immediately if it already exceeds the maximum allowed size.
 345  			// If the size of the batch exceeds maxStreamSize, break from the loop to
 346  			// avoid creating a batch that is so big that certain limits are reached.
 347  			if uint64(batch.LenNoPadding()) > st.MaxSize {
 348  				break loop
 349  			}
 350  			select {
 351  			case kvs, ok := <-st.kvChan:
 352  				if !ok {
 353  					break loop
 354  				}
 355  				y.AssertTrue(kvs != nil)
 356  				y.Check2(batch.Write(kvs.Bytes()))
 357  				y.Check(kvs.Release())
 358  
 359  			default:
 360  				break loop
 361  			}
 362  		}
 363  		return sendBatch(batch)
 364  	} // end of slurp.
 365  
 366  	writeRate := y.NewRateMonitor(20)
 367  	scanRate := y.NewRateMonitor(20)
 368  outer:
 369  	for {
 370  		var batch *z.Buffer
 371  		select {
 372  		case <-ctx.Done():
 373  			return ctx.Err()
 374  
 375  		case <-t.C:
 376  			// Instead of calculating speed over the entire lifetime, we average the speed over
 377  			// ticker duration.
 378  			writeRate.Capture(bytesSent)
 379  			scanned := st.scanned.Load()
 380  			scanRate.Capture(scanned)
 381  			numProducers := st.numProducers.Load()
 382  
 383  			st.db.opt.Infof("%s [%s] Scan (%d): ~%s/%s at %s/sec. Sent: %s at %s/sec."+
 384  				" jemalloc: %s\n",
 385  				st.LogPrefix, y.FixedDuration(time.Since(now)), numProducers,
 386  				y.IBytesToString(scanned, 1), humanize.IBytes(uncompressedSize),
 387  				humanize.IBytes(scanRate.Rate()),
 388  				y.IBytesToString(bytesSent, 1), humanize.IBytes(writeRate.Rate()),
 389  				humanize.IBytes(uint64(z.NumAllocBytes())))
 390  
 391  		case kvs, ok := <-st.kvChan:
 392  			if !ok {
 393  				break outer
 394  			}
 395  			y.AssertTrue(kvs != nil)
 396  			batch = kvs
 397  
 398  			// Otherwise, slurp more keys into this batch.
 399  			if err := slurp(batch); err != nil {
 400  				return err
 401  			}
 402  		}
 403  	}
 404  
 405  	st.db.opt.Infof("%s Sent data of size %s\n", st.LogPrefix, humanize.IBytes(bytesSent))
 406  	return nil
 407  }
 408  
 409  // Orchestrate runs Stream. It picks up ranges from the SSTables, then runs NumGo number of
 410  // goroutines to iterate over these ranges and batch up KVs in lists. It concurrently runs a single
 411  // goroutine to pick these lists, batch them up further and send to Output.Send. Orchestrate also
 412  // spits logs out to Infof, using provided LogPrefix. Note that all calls to Output.Send
 413  // are serial. In case any of these steps encounter an error, Orchestrate would stop execution and
 414  // return that error. Orchestrate can be called multiple times, but in serial order.
 415  func (st *Stream) Orchestrate(ctx context.Context) error {
 416  	ctx, cancel := context.WithCancel(ctx)
 417  	defer cancel()
 418  	st.rangeCh = make(chan keyRange, 3) // Contains keys for posting lists.
 419  
 420  	// kvChan should only have a small capacity to ensure that we don't buffer up too much data if
 421  	// sending is slow. Page size is set to 4MB, which is used to lazily cap the size of each
 422  	// KVList. To get 128MB buffer, we can set the channel size to 32.
 423  	st.kvChan = make(chan *z.Buffer, 32)
 424  
 425  	if st.KeyToList == nil {
 426  		st.KeyToList = st.ToList
 427  	}
 428  
 429  	// Picks up ranges from Badger, and sends them to rangeCh.
 430  	go st.produceRanges(ctx)
 431  
 432  	errCh := make(chan error, st.NumGo) // Stores error by consumeKeys.
 433  	var wg sync.WaitGroup
 434  	for i := 0; i < st.NumGo; i++ {
 435  		wg.Add(1)
 436  
 437  		go func(threadId int) {
 438  			defer wg.Done()
 439  			// Picks up ranges from rangeCh, generates KV lists, and sends them to kvChan.
 440  			if err := st.produceKVs(ctx, threadId); err != nil {
 441  				select {
 442  				case errCh <- err:
 443  				default:
 444  				}
 445  			}
 446  		}(i)
 447  	}
 448  
 449  	// Pick up key-values from kvChan and send to stream.
 450  	kvErr := make(chan error, 1)
 451  	go func() {
 452  		// Picks up KV lists from kvChan, and sends them to Output.
 453  		err := st.streamKVs(ctx)
 454  		if err != nil {
 455  			cancel() // Stop all the go routines.
 456  		}
 457  		kvErr <- err
 458  	}()
 459  	wg.Wait()        // Wait for produceKVs to be over.
 460  	close(st.kvChan) // Now we can close kvChan.
 461  	defer func() {
 462  		// If due to some error, we have buffers left in kvChan, we should release them.
 463  		for buf := range st.kvChan {
 464  			_ = buf.Release()
 465  		}
 466  	}()
 467  
 468  	select {
 469  	case err := <-errCh: // Check error from produceKVs.
 470  		return err
 471  	default:
 472  	}
 473  
 474  	// Wait for key streaming to be over.
 475  	err := <-kvErr
 476  	return err
 477  }
 478  
 479  func (db *DB) newStream() *Stream {
 480  	return &Stream{
 481  		db:        db,
 482  		NumGo:     db.opt.NumGoroutines,
 483  		LogPrefix: "Badger.Stream",
 484  		MaxSize:   maxStreamSize,
 485  	}
 486  }
 487  
 488  // NewStream creates a new Stream.
 489  func (db *DB) NewStream() *Stream {
 490  	if db.opt.managedTxns {
 491  		panic("This API can not be called in managed mode.")
 492  	}
 493  	return db.newStream()
 494  }
 495  
 496  // NewStreamAt creates a new Stream at a particular timestamp. Should only be used with managed DB.
 497  func (db *DB) NewStreamAt(readTs uint64) *Stream {
 498  	if !db.opt.managedTxns {
 499  		panic("This API can only be called in managed mode.")
 500  	}
 501  	stream := db.newStream()
 502  	stream.readTs = readTs
 503  	return stream
 504  }
 505  
 506  func BufferToKVList(buf *z.Buffer) (*pb.KVList, error) {
 507  	var list pb.KVList
 508  	err := buf.SliceIterate(func(s []byte) error {
 509  		kv := new(pb.KV)
 510  		if err := proto.Unmarshal(s, kv); err != nil {
 511  			return err
 512  		}
 513  		list.Kv = append(list.Kv, kv)
 514  		return nil
 515  	})
 516  	return &list, err
 517  }
 518  
 519  func KVToBuffer(kv *pb.KV, buf *z.Buffer) {
 520  	in := buf.SliceAllocate(proto.Size(kv))[:0]
 521  	_, err := proto.MarshalOptions{}.MarshalAppend(in, kv)
 522  	y.AssertTrue(err == nil)
 523  }
 524