1 package ffldb
2 3 import (
4 "fmt"
5 "hash/crc32"
6 7 "github.com/p9c/p9/pkg/database"
8 )
9 10 // serializeWriteRow serialize the current block file and offset where new will be written into a format suitable for
11 // storage into the metadata.
12 //
13 // The serialized write cursor location format is:
14 //
15 // [0:4] Block file (4 bytes)
16 //
17 // [4:8] File offset (4 bytes)
18 //
19 // [8:12] Castagnoli CRC-32 checksum (4 bytes)
20 func serializeWriteRow(curBlockFileNum, curFileOffset uint32) []byte {
21 var serializedRow [12]byte
22 byteOrder.PutUint32(serializedRow[0:4], curBlockFileNum)
23 byteOrder.PutUint32(serializedRow[4:8], curFileOffset)
24 checksum := crc32.Checksum(serializedRow[:8], castagnoli)
25 byteOrder.PutUint32(serializedRow[8:12], checksum)
26 return serializedRow[:]
27 }
28 29 // deserializeWriteRow deserializes the write cursor location stored in the metadata. Returns ErrCorruption if the
30 // checksum of the entry doesn't match.
31 func deserializeWriteRow(writeRow []byte) (uint32, uint32, error) {
32 // Ensure the checksum matches. The checksum is at the end.
33 gotChecksum := crc32.Checksum(writeRow[:8], castagnoli)
34 wantChecksumBytes := writeRow[8:12]
35 wantChecksum := byteOrder.Uint32(wantChecksumBytes)
36 if gotChecksum != wantChecksum {
37 str := fmt.Sprintf(
38 "metadata for write cursor does not match "+
39 "the expected checksum - got %d, want %d", gotChecksum,
40 wantChecksum,
41 )
42 return 0, 0, makeDbErr(database.ErrCorruption, str, nil)
43 }
44 fileNum := byteOrder.Uint32(writeRow[0:4])
45 fileOffset := byteOrder.Uint32(writeRow[4:8])
46 return fileNum, fileOffset, nil
47 }
48 49 // reconcileDB reconciles the metadata with the flat block files on disk. It will also initialize the underlying
50 // database if the create flag is set.
51 func reconcileDB(pdb *db, create bool) (database.DB, error) {
52 // Perform initial internal bucket and value creation during database creation.
53 if create {
54 if e := initDB(pdb.cache.ldb); E.Chk(e) {
55 return nil, e
56 }
57 }
58 // Load the current write cursor position from the metadata.
59 var curFileNum, curOffset uint32
60 e := pdb.View(
61 func(tx database.Tx) (e error) {
62 writeRow := tx.Metadata().Get(writeLocKeyName)
63 if writeRow == nil {
64 str := "write cursor does not exist"
65 return makeDbErr(database.ErrCorruption, str, nil)
66 }
67 curFileNum, curOffset, e = deserializeWriteRow(writeRow)
68 return e
69 },
70 )
71 if e != nil {
72 return nil, e
73 }
74 // When the write cursor position found by scanning the block files on disk is AFTER the position the metadata
75 // believes to be true, truncate the files on disk to match the metadata. This can be a fairly common occurrence in
76 // unclean shutdown scenarios while the block files are in the middle of being written. Since the metadata isn't
77 // updated until after the block data is written, this is effectively just a rollback to the known good point before
78 // the unclean shutdown.
79 wc := pdb.store.writeCursor
80 if wc.curFileNum > curFileNum || (wc.curFileNum == curFileNum &&
81 wc.curOffset > curOffset) {
82 D.Ln("detected unclean shutdown - repairing")
83 D.F(
84 "metadata claims file %d, offset %d. block data is at file %d, offset %d",
85 curFileNum, curOffset, wc.curFileNum, wc.curOffset,
86 )
87 pdb.store.handleRollback(curFileNum, curOffset)
88 D.Ln("database sync complete")
89 }
90 91 // When the write cursor position found by scanning the block files on disk is BEFORE the position the metadata
92 // believes to be true, return a corruption error. Since sync is called after each block is written and before the
93 // metadata is updated, this should only happen in the case of missing, deleted, or truncated block files, which
94 // generally is not an easily recoverable scenario.
95 //
96 // todo: In the future, it might be possible to rescan and rebuild the metadata from the block files, however, that
97 // would need to happen with coordination from a higher layer since it could invalidate other metadata.
98 if wc.curFileNum < curFileNum || (wc.curFileNum == curFileNum &&
99 wc.curOffset < curOffset) {
100 str := fmt.Sprintf(
101 "metadata claims file %d, offset %d, but block data is at file %d, offset %d",
102 curFileNum, curOffset, wc.curFileNum, wc.curOffset,
103 )
104 W.Ln("***Database corruption detected***:", str)
105 return nil, makeDbErr(database.ErrCorruption, str, nil)
106 }
107 return pdb, nil
108 }
109