backup.go raw

   1  /*
   2   * SPDX-FileCopyrightText: © Hypermode Inc. <hello@hypermode.com>
   3   * SPDX-License-Identifier: Apache-2.0
   4   */
   5  
   6  package badger
   7  
   8  import (
   9  	"bufio"
  10  	"bytes"
  11  	"context"
  12  	"encoding/binary"
  13  	"fmt"
  14  	"io"
  15  
  16  	"google.golang.org/protobuf/proto"
  17  
  18  	"github.com/dgraph-io/badger/v4/pb"
  19  	"github.com/dgraph-io/badger/v4/y"
  20  	"github.com/dgraph-io/ristretto/v2/z"
  21  )
  22  
  23  // flushThreshold determines when a buffer will be flushed. When performing a
  24  // backup/restore, the entries will be batched up until the total size of batch
  25  // is more than flushThreshold or entry size (without the value size) is more
  26  // than the maxBatchSize.
  27  const flushThreshold = 100 << 20
  28  
  29  // Backup dumps a protobuf-encoded list of all entries in the database into the
  30  // given writer, that are newer than or equal to the specified version. It
  31  // returns a timestamp (version) indicating the version of last entry that is
  32  // dumped, which after incrementing by 1 can be passed into later invocation to
  33  // generate incremental backup of entries that have been added/modified since
  34  // the last invocation of DB.Backup().
  35  // DB.Backup is a wrapper function over Stream.Backup to generate full and
  36  // incremental backups of the DB. For more control over how many goroutines are
  37  // used to generate the backup, or if you wish to backup only a certain range
  38  // of keys, use Stream.Backup directly.
  39  func (db *DB) Backup(w io.Writer, since uint64) (uint64, error) {
  40  	stream := db.NewStream()
  41  	stream.LogPrefix = "DB.Backup"
  42  	stream.SinceTs = since
  43  	return stream.Backup(w, since)
  44  }
  45  
  46  // Backup dumps a protobuf-encoded list of all entries in the database into the
  47  // given writer, that are newer than or equal to the specified version. It returns a
  48  // timestamp(version) indicating the version of last entry that was dumped, which
  49  // after incrementing by 1 can be passed into a later invocation to generate an
  50  // incremental dump of entries that have been added/modified since the last
  51  // invocation of Stream.Backup().
  52  //
  53  // This can be used to backup the data in a database at a given point in time.
  54  func (stream *Stream) Backup(w io.Writer, since uint64) (uint64, error) {
  55  	stream.KeyToList = func(key []byte, itr *Iterator) (*pb.KVList, error) {
  56  		list := &pb.KVList{}
  57  		a := itr.Alloc
  58  		for ; itr.Valid(); itr.Next() {
  59  			item := itr.Item()
  60  			if !bytes.Equal(item.Key(), key) {
  61  				return list, nil
  62  			}
  63  			if item.Version() < since {
  64  				return nil, fmt.Errorf("Backup: Item Version: %d less than sinceTs: %d",
  65  					item.Version(), since)
  66  			}
  67  
  68  			var valCopy []byte
  69  			if !item.IsDeletedOrExpired() {
  70  				// No need to copy value, if item is deleted or expired.
  71  				err := item.Value(func(val []byte) error {
  72  					valCopy = a.Copy(val)
  73  					return nil
  74  				})
  75  				if err != nil {
  76  					stream.db.opt.Errorf("Key [%x, %d]. Error while fetching value [%v]\n",
  77  						item.Key(), item.Version(), err)
  78  					return nil, err
  79  				}
  80  			}
  81  
  82  			// clear txn bits
  83  			meta := item.meta &^ (bitTxn | bitFinTxn)
  84  			kv := y.NewKV(a)
  85  			*kv = pb.KV{
  86  				Key:       a.Copy(item.Key()),
  87  				Value:     valCopy,
  88  				UserMeta:  a.Copy([]byte{item.UserMeta()}),
  89  				Version:   item.Version(),
  90  				ExpiresAt: item.ExpiresAt(),
  91  				Meta:      a.Copy([]byte{meta}),
  92  			}
  93  			list.Kv = append(list.Kv, kv)
  94  
  95  			switch {
  96  			case item.DiscardEarlierVersions():
  97  				// If we need to discard earlier versions of this item, add a delete
  98  				// marker just below the current version.
  99  				list.Kv = append(list.Kv, &pb.KV{
 100  					Key:     item.KeyCopy(nil),
 101  					Version: item.Version() - 1,
 102  					Meta:    []byte{bitDelete},
 103  				})
 104  				return list, nil
 105  
 106  			case item.IsDeletedOrExpired():
 107  				return list, nil
 108  			}
 109  		}
 110  		return list, nil
 111  	}
 112  
 113  	var maxVersion uint64
 114  	stream.Send = func(buf *z.Buffer) error {
 115  		list, err := BufferToKVList(buf)
 116  		if err != nil {
 117  			return err
 118  		}
 119  		out := list.Kv[:0]
 120  		for _, kv := range list.Kv {
 121  			if maxVersion < kv.Version {
 122  				maxVersion = kv.Version
 123  			}
 124  			if !kv.StreamDone {
 125  				// Don't pick stream done changes.
 126  				out = append(out, kv)
 127  			}
 128  		}
 129  		list.Kv = out
 130  		return writeTo(list, w)
 131  	}
 132  
 133  	if err := stream.Orchestrate(context.Background()); err != nil {
 134  		return 0, err
 135  	}
 136  	return maxVersion, nil
 137  }
 138  
 139  func writeTo(list *pb.KVList, w io.Writer) error {
 140  	if err := binary.Write(w, binary.LittleEndian, uint64(proto.Size(list))); err != nil {
 141  		return err
 142  	}
 143  	buf, err := proto.Marshal(list)
 144  	if err != nil {
 145  		return err
 146  	}
 147  	_, err = w.Write(buf)
 148  	return err
 149  }
 150  
 151  // KVLoader is used to write KVList objects in to badger. It can be used to restore a backup.
 152  type KVLoader struct {
 153  	db          *DB
 154  	throttle    *y.Throttle
 155  	entries     []*Entry
 156  	entriesSize int64
 157  	totalSize   int64
 158  }
 159  
 160  // NewKVLoader returns a new instance of KVLoader.
 161  func (db *DB) NewKVLoader(maxPendingWrites int) *KVLoader {
 162  	return &KVLoader{
 163  		db:       db,
 164  		throttle: y.NewThrottle(maxPendingWrites),
 165  		entries:  make([]*Entry, 0, db.opt.maxBatchCount),
 166  	}
 167  }
 168  
 169  // Set writes the key-value pair to the database.
 170  func (l *KVLoader) Set(kv *pb.KV) error {
 171  	var userMeta, meta byte
 172  	if len(kv.UserMeta) > 0 {
 173  		userMeta = kv.UserMeta[0]
 174  	}
 175  	if len(kv.Meta) > 0 {
 176  		meta = kv.Meta[0]
 177  	}
 178  	e := &Entry{
 179  		Key:       y.KeyWithTs(kv.Key, kv.Version),
 180  		Value:     kv.Value,
 181  		UserMeta:  userMeta,
 182  		ExpiresAt: kv.ExpiresAt,
 183  		meta:      meta,
 184  	}
 185  	estimatedSize := e.estimateSizeAndSetThreshold(l.db.valueThreshold())
 186  	// Flush entries if inserting the next entry would overflow the transactional limits.
 187  	if int64(len(l.entries))+1 >= l.db.opt.maxBatchCount ||
 188  		l.entriesSize+estimatedSize >= l.db.opt.maxBatchSize ||
 189  		l.totalSize >= flushThreshold {
 190  		if err := l.send(); err != nil {
 191  			return err
 192  		}
 193  	}
 194  	l.entries = append(l.entries, e)
 195  	l.entriesSize += estimatedSize
 196  	l.totalSize += estimatedSize + int64(len(e.Value))
 197  	return nil
 198  }
 199  
 200  func (l *KVLoader) send() error {
 201  	if err := l.throttle.Do(); err != nil {
 202  		return err
 203  	}
 204  	if err := l.db.batchSetAsync(l.entries, func(err error) {
 205  		l.throttle.Done(err)
 206  	}); err != nil {
 207  		return err
 208  	}
 209  
 210  	l.entries = make([]*Entry, 0, l.db.opt.maxBatchCount)
 211  	l.entriesSize = 0
 212  	l.totalSize = 0
 213  	return nil
 214  }
 215  
 216  // Finish is meant to be called after all the key-value pairs have been loaded.
 217  func (l *KVLoader) Finish() error {
 218  	if len(l.entries) > 0 {
 219  		if err := l.send(); err != nil {
 220  			return err
 221  		}
 222  	}
 223  	return l.throttle.Finish()
 224  }
 225  
 226  // Load reads a protobuf-encoded list of all entries from a reader and writes
 227  // them to the database. This can be used to restore the database from a backup
 228  // made by calling DB.Backup(). If more complex logic is needed to restore a badger
 229  // backup, the KVLoader interface should be used instead.
 230  //
 231  // DB.Load() should be called on a database that is not running any other
 232  // concurrent transactions while it is running.
 233  func (db *DB) Load(r io.Reader, maxPendingWrites int) error {
 234  	br := bufio.NewReaderSize(r, 16<<10)
 235  	unmarshalBuf := make([]byte, 1<<10)
 236  
 237  	ldr := db.NewKVLoader(maxPendingWrites)
 238  	for {
 239  		var sz uint64
 240  		err := binary.Read(br, binary.LittleEndian, &sz)
 241  		if err == io.EOF {
 242  			break
 243  		} else if err != nil {
 244  			return err
 245  		}
 246  
 247  		if cap(unmarshalBuf) < int(sz) {
 248  			unmarshalBuf = make([]byte, sz)
 249  		}
 250  
 251  		if _, err = io.ReadFull(br, unmarshalBuf[:sz]); err != nil {
 252  			return err
 253  		}
 254  
 255  		list := &pb.KVList{}
 256  		if err := proto.Unmarshal(unmarshalBuf[:sz], list); err != nil {
 257  			return err
 258  		}
 259  
 260  		for _, kv := range list.Kv {
 261  			if err := ldr.Set(kv); err != nil {
 262  				return err
 263  			}
 264  
 265  			// Update nextTxnTs, memtable stores this
 266  			// timestamp in badger head when flushed.
 267  			if kv.Version >= db.orc.nextTxnTs {
 268  				db.orc.nextTxnTs = kv.Version + 1
 269  			}
 270  		}
 271  	}
 272  
 273  	if err := ldr.Finish(); err != nil {
 274  		return err
 275  	}
 276  	db.orc.txnMark.Done(db.orc.nextTxnTs - 1)
 277  	return nil
 278  }
 279