manifest.go raw

   1  /*
   2   * SPDX-FileCopyrightText: © Hypermode Inc. <hello@hypermode.com>
   3   * SPDX-License-Identifier: Apache-2.0
   4   */
   5  
   6  package badger
   7  
   8  import (
   9  	"bufio"
  10  	"bytes"
  11  	"encoding/binary"
  12  	"errors"
  13  	"fmt"
  14  	"hash/crc32"
  15  	"io"
  16  	"math"
  17  	"os"
  18  	"path/filepath"
  19  	"sync"
  20  
  21  	"google.golang.org/protobuf/proto"
  22  
  23  	"github.com/dgraph-io/badger/v4/options"
  24  	"github.com/dgraph-io/badger/v4/pb"
  25  	"github.com/dgraph-io/badger/v4/y"
  26  )
  27  
  28  // Manifest represents the contents of the MANIFEST file in a Badger store.
  29  //
  30  // The MANIFEST file describes the startup state of the db -- all LSM files and what level they're
  31  // at.
  32  //
  33  // It consists of a sequence of ManifestChangeSet objects.  Each of these is treated atomically,
  34  // and contains a sequence of ManifestChange's (file creations/deletions) which we use to
  35  // reconstruct the manifest at startup.
  36  type Manifest struct {
  37  	Levels []levelManifest
  38  	Tables map[uint64]TableManifest
  39  
  40  	// Contains total number of creation and deletion changes in the manifest -- used to compute
  41  	// whether it'd be useful to rewrite the manifest.
  42  	Creations int
  43  	Deletions int
  44  }
  45  
  46  func createManifest() Manifest {
  47  	levels := make([]levelManifest, 0)
  48  	return Manifest{
  49  		Levels: levels,
  50  		Tables: make(map[uint64]TableManifest),
  51  	}
  52  }
  53  
  54  // levelManifest contains information about LSM tree levels
  55  // in the MANIFEST file.
  56  type levelManifest struct {
  57  	Tables map[uint64]struct{} // Set of table id's
  58  }
  59  
  60  // TableManifest contains information about a specific table
  61  // in the LSM tree.
  62  type TableManifest struct {
  63  	Level       uint8
  64  	KeyID       uint64
  65  	Compression options.CompressionType
  66  }
  67  
  68  // manifestFile holds the file pointer (and other info) about the manifest file, which is a log
  69  // file we append to.
  70  type manifestFile struct {
  71  	fp        *os.File
  72  	directory string
  73  
  74  	// The external magic number used by the application running badger.
  75  	externalMagic uint16
  76  
  77  	// We make this configurable so that unit tests can hit rewrite() code quickly
  78  	deletionsRewriteThreshold int
  79  
  80  	// Guards appends, which includes access to the manifest field.
  81  	appendLock sync.Mutex
  82  
  83  	// Used to track the current state of the manifest, used when rewriting.
  84  	manifest Manifest
  85  
  86  	// Used to indicate if badger was opened in InMemory mode.
  87  	inMemory bool
  88  }
  89  
  90  const (
  91  	// ManifestFilename is the filename for the manifest file.
  92  	ManifestFilename                  = "MANIFEST"
  93  	manifestRewriteFilename           = "MANIFEST-REWRITE"
  94  	manifestDeletionsRewriteThreshold = 10000
  95  	manifestDeletionsRatio            = 10
  96  )
  97  
  98  // asChanges returns a sequence of changes that could be used to recreate the Manifest in its
  99  // present state.
 100  func (m *Manifest) asChanges() []*pb.ManifestChange {
 101  	changes := make([]*pb.ManifestChange, 0, len(m.Tables))
 102  	for id, tm := range m.Tables {
 103  		changes = append(changes, newCreateChange(id, int(tm.Level), tm.KeyID, tm.Compression))
 104  	}
 105  	return changes
 106  }
 107  
 108  func (m *Manifest) clone(opt Options) Manifest {
 109  	changeSet := pb.ManifestChangeSet{Changes: m.asChanges()}
 110  	ret := createManifest()
 111  	y.Check(applyChangeSet(&ret, &changeSet, opt))
 112  	return ret
 113  }
 114  
 115  // openOrCreateManifestFile opens a Badger manifest file if it exists, or creates one if
 116  // doesn’t exists.
 117  func openOrCreateManifestFile(opt Options) (
 118  	ret *manifestFile, result Manifest, err error) {
 119  	if opt.InMemory {
 120  		return &manifestFile{inMemory: true}, Manifest{}, nil
 121  	}
 122  	return helpOpenOrCreateManifestFile(opt.Dir, opt.ReadOnly, opt.ExternalMagicVersion,
 123  		manifestDeletionsRewriteThreshold, opt)
 124  }
 125  
 126  func helpOpenOrCreateManifestFile(dir string, readOnly bool, extMagic uint16,
 127  	deletionsThreshold int, opt Options) (*manifestFile, Manifest, error) {
 128  
 129  	path := filepath.Join(dir, ManifestFilename)
 130  	var flags y.Flags
 131  	if readOnly {
 132  		flags |= y.ReadOnly
 133  	}
 134  	fp, err := y.OpenExistingFile(path, flags) // We explicitly sync in addChanges, outside the lock.
 135  	if err != nil {
 136  		if !os.IsNotExist(err) {
 137  			return nil, Manifest{}, err
 138  		}
 139  		if readOnly {
 140  			return nil, Manifest{}, fmt.Errorf("no manifest found, required for read-only db")
 141  		}
 142  		m := createManifest()
 143  		fp, netCreations, err := helpRewrite(dir, &m, extMagic)
 144  		if err != nil {
 145  			return nil, Manifest{}, err
 146  		}
 147  		y.AssertTrue(netCreations == 0)
 148  		mf := &manifestFile{
 149  			fp:                        fp,
 150  			directory:                 dir,
 151  			externalMagic:             extMagic,
 152  			manifest:                  m.clone(opt),
 153  			deletionsRewriteThreshold: deletionsThreshold,
 154  		}
 155  		return mf, m, nil
 156  	}
 157  
 158  	manifest, truncOffset, err := ReplayManifestFile(fp, extMagic, opt)
 159  	if err != nil {
 160  		_ = fp.Close()
 161  		return nil, Manifest{}, err
 162  	}
 163  
 164  	if !readOnly {
 165  		// Truncate file so we don't have a half-written entry at the end.
 166  		if err := fp.Truncate(truncOffset); err != nil {
 167  			_ = fp.Close()
 168  			return nil, Manifest{}, err
 169  		}
 170  	}
 171  	if _, err = fp.Seek(0, io.SeekEnd); err != nil {
 172  		_ = fp.Close()
 173  		return nil, Manifest{}, err
 174  	}
 175  
 176  	mf := &manifestFile{
 177  		fp:                        fp,
 178  		directory:                 dir,
 179  		externalMagic:             extMagic,
 180  		manifest:                  manifest.clone(opt),
 181  		deletionsRewriteThreshold: deletionsThreshold,
 182  	}
 183  	return mf, manifest, nil
 184  }
 185  
 186  func (mf *manifestFile) close() error {
 187  	if mf.inMemory {
 188  		return nil
 189  	}
 190  	return mf.fp.Close()
 191  }
 192  
 193  // addChanges writes a batch of changes, atomically, to the file.  By "atomically" that means when
 194  // we replay the MANIFEST file, we'll either replay all the changes or none of them.  (The truth of
 195  // this depends on the filesystem -- some might append garbage data if a system crash happens at
 196  // the wrong time.)
 197  func (mf *manifestFile) addChanges(changesParam []*pb.ManifestChange, opt Options) error {
 198  	if mf.inMemory {
 199  		return nil
 200  	}
 201  	changes := pb.ManifestChangeSet{Changes: changesParam}
 202  	buf, err := proto.Marshal(&changes)
 203  	if err != nil {
 204  		return err
 205  	}
 206  
 207  	// Maybe we could use O_APPEND instead (on certain file systems)
 208  	mf.appendLock.Lock()
 209  	defer mf.appendLock.Unlock()
 210  	if err := applyChangeSet(&mf.manifest, &changes, opt); err != nil {
 211  		return err
 212  	}
 213  	// Rewrite manifest if it'd shrink by 1/10 and it's big enough to care
 214  	if mf.manifest.Deletions > mf.deletionsRewriteThreshold &&
 215  		mf.manifest.Deletions > manifestDeletionsRatio*(mf.manifest.Creations-mf.manifest.Deletions) {
 216  		if err := mf.rewrite(); err != nil {
 217  			return err
 218  		}
 219  	} else {
 220  		var lenCrcBuf [8]byte
 221  		binary.BigEndian.PutUint32(lenCrcBuf[0:4], uint32(len(buf)))
 222  		binary.BigEndian.PutUint32(lenCrcBuf[4:8], crc32.Checksum(buf, y.CastagnoliCrcTable))
 223  		buf = append(lenCrcBuf[:], buf...)
 224  		if _, err := mf.fp.Write(buf); err != nil {
 225  			return err
 226  		}
 227  	}
 228  
 229  	return syncFunc(mf.fp)
 230  }
 231  
 232  // this function is saved here to allow injection of fake filesystem latency at test time.
 233  var syncFunc = func(f *os.File) error { return f.Sync() }
 234  
 235  // Has to be 4 bytes.  The value can never change, ever, anyway.
 236  var magicText = [4]byte{'B', 'd', 'g', 'r'}
 237  
 238  // The magic version number. It is allocated 2 bytes, so it's value must be <= math.MaxUint16
 239  const badgerMagicVersion = 8
 240  
 241  func helpRewrite(dir string, m *Manifest, extMagic uint16) (*os.File, int, error) {
 242  	rewritePath := filepath.Join(dir, manifestRewriteFilename)
 243  	// We explicitly sync.
 244  	fp, err := y.OpenTruncFile(rewritePath, false)
 245  	if err != nil {
 246  		return nil, 0, err
 247  	}
 248  
 249  	// magic bytes are structured as
 250  	// +---------------------+-------------------------+-----------------------+
 251  	// | magicText (4 bytes) | externalMagic (2 bytes) | badgerMagic (2 bytes) |
 252  	// +---------------------+-------------------------+-----------------------+
 253  
 254  	y.AssertTrue(badgerMagicVersion <= math.MaxUint16)
 255  	buf := make([]byte, 8)
 256  	copy(buf[0:4], magicText[:])
 257  	binary.BigEndian.PutUint16(buf[4:6], extMagic)
 258  	binary.BigEndian.PutUint16(buf[6:8], badgerMagicVersion)
 259  
 260  	netCreations := len(m.Tables)
 261  	changes := m.asChanges()
 262  	set := pb.ManifestChangeSet{Changes: changes}
 263  
 264  	changeBuf, err := proto.Marshal(&set)
 265  	if err != nil {
 266  		fp.Close()
 267  		return nil, 0, err
 268  	}
 269  	var lenCrcBuf [8]byte
 270  	binary.BigEndian.PutUint32(lenCrcBuf[0:4], uint32(len(changeBuf)))
 271  	binary.BigEndian.PutUint32(lenCrcBuf[4:8], crc32.Checksum(changeBuf, y.CastagnoliCrcTable))
 272  	buf = append(buf, lenCrcBuf[:]...)
 273  	buf = append(buf, changeBuf...)
 274  	if _, err := fp.Write(buf); err != nil {
 275  		fp.Close()
 276  		return nil, 0, err
 277  	}
 278  	if err := fp.Sync(); err != nil {
 279  		fp.Close()
 280  		return nil, 0, err
 281  	}
 282  
 283  	// In Windows the files should be closed before doing a Rename.
 284  	if err = fp.Close(); err != nil {
 285  		return nil, 0, err
 286  	}
 287  	manifestPath := filepath.Join(dir, ManifestFilename)
 288  	if err := os.Rename(rewritePath, manifestPath); err != nil {
 289  		return nil, 0, err
 290  	}
 291  	fp, err = y.OpenExistingFile(manifestPath, 0)
 292  	if err != nil {
 293  		return nil, 0, err
 294  	}
 295  	if _, err := fp.Seek(0, io.SeekEnd); err != nil {
 296  		fp.Close()
 297  		return nil, 0, err
 298  	}
 299  	if err := syncDir(dir); err != nil {
 300  		fp.Close()
 301  		return nil, 0, err
 302  	}
 303  
 304  	return fp, netCreations, nil
 305  }
 306  
 307  // Must be called while appendLock is held.
 308  func (mf *manifestFile) rewrite() error {
 309  	// In Windows the files should be closed before doing a Rename.
 310  	if err := mf.fp.Close(); err != nil {
 311  		return err
 312  	}
 313  	fp, netCreations, err := helpRewrite(mf.directory, &mf.manifest, mf.externalMagic)
 314  	if err != nil {
 315  		return err
 316  	}
 317  	mf.fp = fp
 318  	mf.manifest.Creations = netCreations
 319  	mf.manifest.Deletions = 0
 320  
 321  	return nil
 322  }
 323  
 324  type countingReader struct {
 325  	wrapped *bufio.Reader
 326  	count   int64
 327  }
 328  
 329  func (r *countingReader) Read(p []byte) (n int, err error) {
 330  	n, err = r.wrapped.Read(p)
 331  	r.count += int64(n)
 332  	return
 333  }
 334  
 335  func (r *countingReader) ReadByte() (b byte, err error) {
 336  	b, err = r.wrapped.ReadByte()
 337  	if err == nil {
 338  		r.count++
 339  	}
 340  	return
 341  }
 342  
 343  var (
 344  	errBadMagic    = errors.New("manifest has bad magic")
 345  	errBadChecksum = errors.New("manifest has checksum mismatch")
 346  )
 347  
 348  // ReplayManifestFile reads the manifest file and constructs two manifest objects.  (We need one
 349  // immutable copy and one mutable copy of the manifest.  Easiest way is to construct two of them.)
 350  // Also, returns the last offset after a completely read manifest entry -- the file must be
 351  // truncated at that point before further appends are made (if there is a partial entry after
 352  // that).  In normal conditions, truncOffset is the file size.
 353  func ReplayManifestFile(fp *os.File, extMagic uint16, opt Options) (Manifest, int64, error) {
 354  	r := countingReader{wrapped: bufio.NewReader(fp)}
 355  
 356  	var magicBuf [8]byte
 357  	if _, err := io.ReadFull(&r, magicBuf[:]); err != nil {
 358  		return Manifest{}, 0, errBadMagic
 359  	}
 360  	if !bytes.Equal(magicBuf[0:4], magicText[:]) {
 361  		return Manifest{}, 0, errBadMagic
 362  	}
 363  
 364  	extVersion := y.BytesToU16(magicBuf[4:6])
 365  	version := y.BytesToU16(magicBuf[6:8])
 366  
 367  	if version != badgerMagicVersion {
 368  		return Manifest{}, 0,
 369  			//nolint:lll
 370  			fmt.Errorf("manifest has unsupported version: %d (we support %d).\n"+
 371  				"Please see https://docs.hypermode.com/badger/troubleshooting#i-see-manifest-has-unsupported-version-x-we-support-y-error"+
 372  				" on how to fix this.",
 373  				version, badgerMagicVersion)
 374  	}
 375  	if extVersion != extMagic {
 376  		return Manifest{}, 0,
 377  			fmt.Errorf("Cannot open DB because the external magic number doesn't match. "+
 378  				"Expected: %d, version present in manifest: %d\n", extMagic, extVersion)
 379  	}
 380  
 381  	stat, err := fp.Stat()
 382  	if err != nil {
 383  		return Manifest{}, 0, err
 384  	}
 385  
 386  	build := createManifest()
 387  	var offset int64
 388  	for {
 389  		offset = r.count
 390  		var lenCrcBuf [8]byte
 391  		_, err := io.ReadFull(&r, lenCrcBuf[:])
 392  		if err != nil {
 393  			if err == io.EOF || err == io.ErrUnexpectedEOF {
 394  				break
 395  			}
 396  			return Manifest{}, 0, err
 397  		}
 398  		length := y.BytesToU32(lenCrcBuf[0:4])
 399  		// Sanity check to ensure we don't over-allocate memory.
 400  		if length > uint32(stat.Size()) {
 401  			return Manifest{}, 0, fmt.Errorf(
 402  				"Buffer length: %d greater than file size: %d. Manifest file might be corrupted",
 403  				length, stat.Size())
 404  		}
 405  		var buf = make([]byte, length)
 406  		if _, err := io.ReadFull(&r, buf); err != nil {
 407  			if err == io.EOF || err == io.ErrUnexpectedEOF {
 408  				break
 409  			}
 410  			return Manifest{}, 0, err
 411  		}
 412  		if crc32.Checksum(buf, y.CastagnoliCrcTable) != y.BytesToU32(lenCrcBuf[4:8]) {
 413  			return Manifest{}, 0, errBadChecksum
 414  		}
 415  
 416  		var changeSet pb.ManifestChangeSet
 417  		if err := proto.Unmarshal(buf, &changeSet); err != nil {
 418  			return Manifest{}, 0, err
 419  		}
 420  
 421  		if err := applyChangeSet(&build, &changeSet, opt); err != nil {
 422  			return Manifest{}, 0, err
 423  		}
 424  	}
 425  
 426  	return build, offset, nil
 427  }
 428  
 429  func applyManifestChange(build *Manifest, tc *pb.ManifestChange, opt Options) error {
 430  	switch tc.Op {
 431  	case pb.ManifestChange_CREATE:
 432  		if _, ok := build.Tables[tc.Id]; ok {
 433  			return fmt.Errorf("MANIFEST invalid, table %d exists", tc.Id)
 434  		}
 435  		build.Tables[tc.Id] = TableManifest{
 436  			Level:       uint8(tc.Level),
 437  			KeyID:       tc.KeyId,
 438  			Compression: options.CompressionType(tc.Compression),
 439  		}
 440  		for len(build.Levels) <= int(tc.Level) {
 441  			build.Levels = append(build.Levels, levelManifest{make(map[uint64]struct{})})
 442  		}
 443  		build.Levels[tc.Level].Tables[tc.Id] = struct{}{}
 444  		build.Creations++
 445  	case pb.ManifestChange_DELETE:
 446  		tm, ok := build.Tables[tc.Id]
 447  		if !ok {
 448  			opt.Warningf("MANIFEST delete: table %d has already been removed", tc.Id)
 449  			for _, level := range build.Levels {
 450  				delete(level.Tables, tc.Id)
 451  			}
 452  		} else {
 453  			delete(build.Levels[tm.Level].Tables, tc.Id)
 454  			delete(build.Tables, tc.Id)
 455  		}
 456  		build.Deletions++
 457  	default:
 458  		return fmt.Errorf("MANIFEST file has invalid manifestChange op")
 459  	}
 460  	return nil
 461  }
 462  
 463  // This is not a "recoverable" error -- opening the KV store fails because the MANIFEST file is
 464  // just plain broken.
 465  func applyChangeSet(build *Manifest, changeSet *pb.ManifestChangeSet, opt Options) error {
 466  	for _, change := range changeSet.Changes {
 467  		if err := applyManifestChange(build, change, opt); err != nil {
 468  			return err
 469  		}
 470  	}
 471  	return nil
 472  }
 473  
 474  func newCreateChange(
 475  	id uint64, level int, keyID uint64, c options.CompressionType) *pb.ManifestChange {
 476  	return &pb.ManifestChange{
 477  		Id:    id,
 478  		Op:    pb.ManifestChange_CREATE,
 479  		Level: uint32(level),
 480  		KeyId: keyID,
 481  		// Hard coding it, since we're supporting only AES for now.
 482  		EncryptionAlgo: pb.EncryptionAlgo_aes,
 483  		Compression:    uint32(c),
 484  	}
 485  }
 486  
 487  func newDeleteChange(id uint64) *pb.ManifestChange {
 488  	return &pb.ManifestChange{
 489  		Id: id,
 490  		Op: pb.ManifestChange_DELETE,
 491  	}
 492  }
 493