manifest.go raw
1 /*
2 * SPDX-FileCopyrightText: © Hypermode Inc. <hello@hypermode.com>
3 * SPDX-License-Identifier: Apache-2.0
4 */
5
6 package badger
7
8 import (
9 "bufio"
10 "bytes"
11 "encoding/binary"
12 "errors"
13 "fmt"
14 "hash/crc32"
15 "io"
16 "math"
17 "os"
18 "path/filepath"
19 "sync"
20
21 "google.golang.org/protobuf/proto"
22
23 "github.com/dgraph-io/badger/v4/options"
24 "github.com/dgraph-io/badger/v4/pb"
25 "github.com/dgraph-io/badger/v4/y"
26 )
27
28 // Manifest represents the contents of the MANIFEST file in a Badger store.
29 //
30 // The MANIFEST file describes the startup state of the db -- all LSM files and what level they're
31 // at.
32 //
33 // It consists of a sequence of ManifestChangeSet objects. Each of these is treated atomically,
34 // and contains a sequence of ManifestChange's (file creations/deletions) which we use to
35 // reconstruct the manifest at startup.
36 type Manifest struct {
37 Levels []levelManifest
38 Tables map[uint64]TableManifest
39
40 // Contains total number of creation and deletion changes in the manifest -- used to compute
41 // whether it'd be useful to rewrite the manifest.
42 Creations int
43 Deletions int
44 }
45
46 func createManifest() Manifest {
47 levels := make([]levelManifest, 0)
48 return Manifest{
49 Levels: levels,
50 Tables: make(map[uint64]TableManifest),
51 }
52 }
53
54 // levelManifest contains information about LSM tree levels
55 // in the MANIFEST file.
56 type levelManifest struct {
57 Tables map[uint64]struct{} // Set of table id's
58 }
59
60 // TableManifest contains information about a specific table
61 // in the LSM tree.
62 type TableManifest struct {
63 Level uint8
64 KeyID uint64
65 Compression options.CompressionType
66 }
67
68 // manifestFile holds the file pointer (and other info) about the manifest file, which is a log
69 // file we append to.
70 type manifestFile struct {
71 fp *os.File
72 directory string
73
74 // The external magic number used by the application running badger.
75 externalMagic uint16
76
77 // We make this configurable so that unit tests can hit rewrite() code quickly
78 deletionsRewriteThreshold int
79
80 // Guards appends, which includes access to the manifest field.
81 appendLock sync.Mutex
82
83 // Used to track the current state of the manifest, used when rewriting.
84 manifest Manifest
85
86 // Used to indicate if badger was opened in InMemory mode.
87 inMemory bool
88 }
89
90 const (
91 // ManifestFilename is the filename for the manifest file.
92 ManifestFilename = "MANIFEST"
93 manifestRewriteFilename = "MANIFEST-REWRITE"
94 manifestDeletionsRewriteThreshold = 10000
95 manifestDeletionsRatio = 10
96 )
97
98 // asChanges returns a sequence of changes that could be used to recreate the Manifest in its
99 // present state.
100 func (m *Manifest) asChanges() []*pb.ManifestChange {
101 changes := make([]*pb.ManifestChange, 0, len(m.Tables))
102 for id, tm := range m.Tables {
103 changes = append(changes, newCreateChange(id, int(tm.Level), tm.KeyID, tm.Compression))
104 }
105 return changes
106 }
107
108 func (m *Manifest) clone(opt Options) Manifest {
109 changeSet := pb.ManifestChangeSet{Changes: m.asChanges()}
110 ret := createManifest()
111 y.Check(applyChangeSet(&ret, &changeSet, opt))
112 return ret
113 }
114
115 // openOrCreateManifestFile opens a Badger manifest file if it exists, or creates one if
116 // doesn’t exists.
117 func openOrCreateManifestFile(opt Options) (
118 ret *manifestFile, result Manifest, err error) {
119 if opt.InMemory {
120 return &manifestFile{inMemory: true}, Manifest{}, nil
121 }
122 return helpOpenOrCreateManifestFile(opt.Dir, opt.ReadOnly, opt.ExternalMagicVersion,
123 manifestDeletionsRewriteThreshold, opt)
124 }
125
126 func helpOpenOrCreateManifestFile(dir string, readOnly bool, extMagic uint16,
127 deletionsThreshold int, opt Options) (*manifestFile, Manifest, error) {
128
129 path := filepath.Join(dir, ManifestFilename)
130 var flags y.Flags
131 if readOnly {
132 flags |= y.ReadOnly
133 }
134 fp, err := y.OpenExistingFile(path, flags) // We explicitly sync in addChanges, outside the lock.
135 if err != nil {
136 if !os.IsNotExist(err) {
137 return nil, Manifest{}, err
138 }
139 if readOnly {
140 return nil, Manifest{}, fmt.Errorf("no manifest found, required for read-only db")
141 }
142 m := createManifest()
143 fp, netCreations, err := helpRewrite(dir, &m, extMagic)
144 if err != nil {
145 return nil, Manifest{}, err
146 }
147 y.AssertTrue(netCreations == 0)
148 mf := &manifestFile{
149 fp: fp,
150 directory: dir,
151 externalMagic: extMagic,
152 manifest: m.clone(opt),
153 deletionsRewriteThreshold: deletionsThreshold,
154 }
155 return mf, m, nil
156 }
157
158 manifest, truncOffset, err := ReplayManifestFile(fp, extMagic, opt)
159 if err != nil {
160 _ = fp.Close()
161 return nil, Manifest{}, err
162 }
163
164 if !readOnly {
165 // Truncate file so we don't have a half-written entry at the end.
166 if err := fp.Truncate(truncOffset); err != nil {
167 _ = fp.Close()
168 return nil, Manifest{}, err
169 }
170 }
171 if _, err = fp.Seek(0, io.SeekEnd); err != nil {
172 _ = fp.Close()
173 return nil, Manifest{}, err
174 }
175
176 mf := &manifestFile{
177 fp: fp,
178 directory: dir,
179 externalMagic: extMagic,
180 manifest: manifest.clone(opt),
181 deletionsRewriteThreshold: deletionsThreshold,
182 }
183 return mf, manifest, nil
184 }
185
186 func (mf *manifestFile) close() error {
187 if mf.inMemory {
188 return nil
189 }
190 return mf.fp.Close()
191 }
192
193 // addChanges writes a batch of changes, atomically, to the file. By "atomically" that means when
194 // we replay the MANIFEST file, we'll either replay all the changes or none of them. (The truth of
195 // this depends on the filesystem -- some might append garbage data if a system crash happens at
196 // the wrong time.)
197 func (mf *manifestFile) addChanges(changesParam []*pb.ManifestChange, opt Options) error {
198 if mf.inMemory {
199 return nil
200 }
201 changes := pb.ManifestChangeSet{Changes: changesParam}
202 buf, err := proto.Marshal(&changes)
203 if err != nil {
204 return err
205 }
206
207 // Maybe we could use O_APPEND instead (on certain file systems)
208 mf.appendLock.Lock()
209 defer mf.appendLock.Unlock()
210 if err := applyChangeSet(&mf.manifest, &changes, opt); err != nil {
211 return err
212 }
213 // Rewrite manifest if it'd shrink by 1/10 and it's big enough to care
214 if mf.manifest.Deletions > mf.deletionsRewriteThreshold &&
215 mf.manifest.Deletions > manifestDeletionsRatio*(mf.manifest.Creations-mf.manifest.Deletions) {
216 if err := mf.rewrite(); err != nil {
217 return err
218 }
219 } else {
220 var lenCrcBuf [8]byte
221 binary.BigEndian.PutUint32(lenCrcBuf[0:4], uint32(len(buf)))
222 binary.BigEndian.PutUint32(lenCrcBuf[4:8], crc32.Checksum(buf, y.CastagnoliCrcTable))
223 buf = append(lenCrcBuf[:], buf...)
224 if _, err := mf.fp.Write(buf); err != nil {
225 return err
226 }
227 }
228
229 return syncFunc(mf.fp)
230 }
231
232 // this function is saved here to allow injection of fake filesystem latency at test time.
233 var syncFunc = func(f *os.File) error { return f.Sync() }
234
235 // Has to be 4 bytes. The value can never change, ever, anyway.
236 var magicText = [4]byte{'B', 'd', 'g', 'r'}
237
238 // The magic version number. It is allocated 2 bytes, so it's value must be <= math.MaxUint16
239 const badgerMagicVersion = 8
240
241 func helpRewrite(dir string, m *Manifest, extMagic uint16) (*os.File, int, error) {
242 rewritePath := filepath.Join(dir, manifestRewriteFilename)
243 // We explicitly sync.
244 fp, err := y.OpenTruncFile(rewritePath, false)
245 if err != nil {
246 return nil, 0, err
247 }
248
249 // magic bytes are structured as
250 // +---------------------+-------------------------+-----------------------+
251 // | magicText (4 bytes) | externalMagic (2 bytes) | badgerMagic (2 bytes) |
252 // +---------------------+-------------------------+-----------------------+
253
254 y.AssertTrue(badgerMagicVersion <= math.MaxUint16)
255 buf := make([]byte, 8)
256 copy(buf[0:4], magicText[:])
257 binary.BigEndian.PutUint16(buf[4:6], extMagic)
258 binary.BigEndian.PutUint16(buf[6:8], badgerMagicVersion)
259
260 netCreations := len(m.Tables)
261 changes := m.asChanges()
262 set := pb.ManifestChangeSet{Changes: changes}
263
264 changeBuf, err := proto.Marshal(&set)
265 if err != nil {
266 fp.Close()
267 return nil, 0, err
268 }
269 var lenCrcBuf [8]byte
270 binary.BigEndian.PutUint32(lenCrcBuf[0:4], uint32(len(changeBuf)))
271 binary.BigEndian.PutUint32(lenCrcBuf[4:8], crc32.Checksum(changeBuf, y.CastagnoliCrcTable))
272 buf = append(buf, lenCrcBuf[:]...)
273 buf = append(buf, changeBuf...)
274 if _, err := fp.Write(buf); err != nil {
275 fp.Close()
276 return nil, 0, err
277 }
278 if err := fp.Sync(); err != nil {
279 fp.Close()
280 return nil, 0, err
281 }
282
283 // In Windows the files should be closed before doing a Rename.
284 if err = fp.Close(); err != nil {
285 return nil, 0, err
286 }
287 manifestPath := filepath.Join(dir, ManifestFilename)
288 if err := os.Rename(rewritePath, manifestPath); err != nil {
289 return nil, 0, err
290 }
291 fp, err = y.OpenExistingFile(manifestPath, 0)
292 if err != nil {
293 return nil, 0, err
294 }
295 if _, err := fp.Seek(0, io.SeekEnd); err != nil {
296 fp.Close()
297 return nil, 0, err
298 }
299 if err := syncDir(dir); err != nil {
300 fp.Close()
301 return nil, 0, err
302 }
303
304 return fp, netCreations, nil
305 }
306
307 // Must be called while appendLock is held.
308 func (mf *manifestFile) rewrite() error {
309 // In Windows the files should be closed before doing a Rename.
310 if err := mf.fp.Close(); err != nil {
311 return err
312 }
313 fp, netCreations, err := helpRewrite(mf.directory, &mf.manifest, mf.externalMagic)
314 if err != nil {
315 return err
316 }
317 mf.fp = fp
318 mf.manifest.Creations = netCreations
319 mf.manifest.Deletions = 0
320
321 return nil
322 }
323
324 type countingReader struct {
325 wrapped *bufio.Reader
326 count int64
327 }
328
329 func (r *countingReader) Read(p []byte) (n int, err error) {
330 n, err = r.wrapped.Read(p)
331 r.count += int64(n)
332 return
333 }
334
335 func (r *countingReader) ReadByte() (b byte, err error) {
336 b, err = r.wrapped.ReadByte()
337 if err == nil {
338 r.count++
339 }
340 return
341 }
342
343 var (
344 errBadMagic = errors.New("manifest has bad magic")
345 errBadChecksum = errors.New("manifest has checksum mismatch")
346 )
347
348 // ReplayManifestFile reads the manifest file and constructs two manifest objects. (We need one
349 // immutable copy and one mutable copy of the manifest. Easiest way is to construct two of them.)
350 // Also, returns the last offset after a completely read manifest entry -- the file must be
351 // truncated at that point before further appends are made (if there is a partial entry after
352 // that). In normal conditions, truncOffset is the file size.
353 func ReplayManifestFile(fp *os.File, extMagic uint16, opt Options) (Manifest, int64, error) {
354 r := countingReader{wrapped: bufio.NewReader(fp)}
355
356 var magicBuf [8]byte
357 if _, err := io.ReadFull(&r, magicBuf[:]); err != nil {
358 return Manifest{}, 0, errBadMagic
359 }
360 if !bytes.Equal(magicBuf[0:4], magicText[:]) {
361 return Manifest{}, 0, errBadMagic
362 }
363
364 extVersion := y.BytesToU16(magicBuf[4:6])
365 version := y.BytesToU16(magicBuf[6:8])
366
367 if version != badgerMagicVersion {
368 return Manifest{}, 0,
369 //nolint:lll
370 fmt.Errorf("manifest has unsupported version: %d (we support %d).\n"+
371 "Please see https://docs.hypermode.com/badger/troubleshooting#i-see-manifest-has-unsupported-version-x-we-support-y-error"+
372 " on how to fix this.",
373 version, badgerMagicVersion)
374 }
375 if extVersion != extMagic {
376 return Manifest{}, 0,
377 fmt.Errorf("Cannot open DB because the external magic number doesn't match. "+
378 "Expected: %d, version present in manifest: %d\n", extMagic, extVersion)
379 }
380
381 stat, err := fp.Stat()
382 if err != nil {
383 return Manifest{}, 0, err
384 }
385
386 build := createManifest()
387 var offset int64
388 for {
389 offset = r.count
390 var lenCrcBuf [8]byte
391 _, err := io.ReadFull(&r, lenCrcBuf[:])
392 if err != nil {
393 if err == io.EOF || err == io.ErrUnexpectedEOF {
394 break
395 }
396 return Manifest{}, 0, err
397 }
398 length := y.BytesToU32(lenCrcBuf[0:4])
399 // Sanity check to ensure we don't over-allocate memory.
400 if length > uint32(stat.Size()) {
401 return Manifest{}, 0, fmt.Errorf(
402 "Buffer length: %d greater than file size: %d. Manifest file might be corrupted",
403 length, stat.Size())
404 }
405 var buf = make([]byte, length)
406 if _, err := io.ReadFull(&r, buf); err != nil {
407 if err == io.EOF || err == io.ErrUnexpectedEOF {
408 break
409 }
410 return Manifest{}, 0, err
411 }
412 if crc32.Checksum(buf, y.CastagnoliCrcTable) != y.BytesToU32(lenCrcBuf[4:8]) {
413 return Manifest{}, 0, errBadChecksum
414 }
415
416 var changeSet pb.ManifestChangeSet
417 if err := proto.Unmarshal(buf, &changeSet); err != nil {
418 return Manifest{}, 0, err
419 }
420
421 if err := applyChangeSet(&build, &changeSet, opt); err != nil {
422 return Manifest{}, 0, err
423 }
424 }
425
426 return build, offset, nil
427 }
428
429 func applyManifestChange(build *Manifest, tc *pb.ManifestChange, opt Options) error {
430 switch tc.Op {
431 case pb.ManifestChange_CREATE:
432 if _, ok := build.Tables[tc.Id]; ok {
433 return fmt.Errorf("MANIFEST invalid, table %d exists", tc.Id)
434 }
435 build.Tables[tc.Id] = TableManifest{
436 Level: uint8(tc.Level),
437 KeyID: tc.KeyId,
438 Compression: options.CompressionType(tc.Compression),
439 }
440 for len(build.Levels) <= int(tc.Level) {
441 build.Levels = append(build.Levels, levelManifest{make(map[uint64]struct{})})
442 }
443 build.Levels[tc.Level].Tables[tc.Id] = struct{}{}
444 build.Creations++
445 case pb.ManifestChange_DELETE:
446 tm, ok := build.Tables[tc.Id]
447 if !ok {
448 opt.Warningf("MANIFEST delete: table %d has already been removed", tc.Id)
449 for _, level := range build.Levels {
450 delete(level.Tables, tc.Id)
451 }
452 } else {
453 delete(build.Levels[tm.Level].Tables, tc.Id)
454 delete(build.Tables, tc.Id)
455 }
456 build.Deletions++
457 default:
458 return fmt.Errorf("MANIFEST file has invalid manifestChange op")
459 }
460 return nil
461 }
462
463 // This is not a "recoverable" error -- opening the KV store fails because the MANIFEST file is
464 // just plain broken.
465 func applyChangeSet(build *Manifest, changeSet *pb.ManifestChangeSet, opt Options) error {
466 for _, change := range changeSet.Changes {
467 if err := applyManifestChange(build, change, opt); err != nil {
468 return err
469 }
470 }
471 return nil
472 }
473
474 func newCreateChange(
475 id uint64, level int, keyID uint64, c options.CompressionType) *pb.ManifestChange {
476 return &pb.ManifestChange{
477 Id: id,
478 Op: pb.ManifestChange_CREATE,
479 Level: uint32(level),
480 KeyId: keyID,
481 // Hard coding it, since we're supporting only AES for now.
482 EncryptionAlgo: pb.EncryptionAlgo_aes,
483 Compression: uint32(c),
484 }
485 }
486
487 func newDeleteChange(id uint64) *pb.ManifestChange {
488 return &pb.ManifestChange{
489 Id: id,
490 Op: pb.ManifestChange_DELETE,
491 }
492 }
493