reconcile.go raw

   1  package ffldb
   2  
   3  import (
   4  	"fmt"
   5  	"hash/crc32"
   6  	
   7  	"github.com/p9c/p9/pkg/database"
   8  )
   9  
  10  // serializeWriteRow serialize the current block file and offset where new will be written into a format suitable for
  11  // storage into the metadata.
  12  //
  13  // The serialized write cursor location format is:
  14  //
  15  //  [0:4]  Block file (4 bytes)
  16  //
  17  //  [4:8]  File offset (4 bytes)
  18  //
  19  //  [8:12] Castagnoli CRC-32 checksum (4 bytes)
  20  func serializeWriteRow(curBlockFileNum, curFileOffset uint32) []byte {
  21  	var serializedRow [12]byte
  22  	byteOrder.PutUint32(serializedRow[0:4], curBlockFileNum)
  23  	byteOrder.PutUint32(serializedRow[4:8], curFileOffset)
  24  	checksum := crc32.Checksum(serializedRow[:8], castagnoli)
  25  	byteOrder.PutUint32(serializedRow[8:12], checksum)
  26  	return serializedRow[:]
  27  }
  28  
  29  // deserializeWriteRow deserializes the write cursor location stored in the metadata. Returns ErrCorruption if the
  30  // checksum of the entry doesn't match.
  31  func deserializeWriteRow(writeRow []byte) (uint32, uint32, error) {
  32  	// Ensure the checksum matches.  The checksum is at the end.
  33  	gotChecksum := crc32.Checksum(writeRow[:8], castagnoli)
  34  	wantChecksumBytes := writeRow[8:12]
  35  	wantChecksum := byteOrder.Uint32(wantChecksumBytes)
  36  	if gotChecksum != wantChecksum {
  37  		str := fmt.Sprintf(
  38  			"metadata for write cursor does not match "+
  39  				"the expected checksum - got %d, want %d", gotChecksum,
  40  			wantChecksum,
  41  		)
  42  		return 0, 0, makeDbErr(database.ErrCorruption, str, nil)
  43  	}
  44  	fileNum := byteOrder.Uint32(writeRow[0:4])
  45  	fileOffset := byteOrder.Uint32(writeRow[4:8])
  46  	return fileNum, fileOffset, nil
  47  }
  48  
  49  // reconcileDB reconciles the metadata with the flat block files on disk. It will also initialize the underlying
  50  // database if the create flag is set.
  51  func reconcileDB(pdb *db, create bool) (database.DB, error) {
  52  	// Perform initial internal bucket and value creation during database creation.
  53  	if create {
  54  		if e := initDB(pdb.cache.ldb); E.Chk(e) {
  55  			return nil, e
  56  		}
  57  	}
  58  	// Load the current write cursor position from the metadata.
  59  	var curFileNum, curOffset uint32
  60  	e := pdb.View(
  61  		func(tx database.Tx) (e error) {
  62  			writeRow := tx.Metadata().Get(writeLocKeyName)
  63  			if writeRow == nil {
  64  				str := "write cursor does not exist"
  65  				return makeDbErr(database.ErrCorruption, str, nil)
  66  			}
  67  			curFileNum, curOffset, e = deserializeWriteRow(writeRow)
  68  			return e
  69  		},
  70  	)
  71  	if e != nil {
  72  		return nil, e
  73  	}
  74  	// When the write cursor position found by scanning the block files on disk is AFTER the position the metadata
  75  	// believes to be true, truncate the files on disk to match the metadata. This can be a fairly common occurrence in
  76  	// unclean shutdown scenarios while the block files are in the middle of being written. Since the metadata isn't
  77  	// updated until after the block data is written, this is effectively just a rollback to the known good point before
  78  	// the unclean shutdown.
  79  	wc := pdb.store.writeCursor
  80  	if wc.curFileNum > curFileNum || (wc.curFileNum == curFileNum &&
  81  		wc.curOffset > curOffset) {
  82  		D.Ln("detected unclean shutdown - repairing")
  83  		D.F(
  84  			"metadata claims file %d, offset %d. block data is at file %d, offset %d",
  85  			curFileNum, curOffset, wc.curFileNum, wc.curOffset,
  86  		)
  87  		pdb.store.handleRollback(curFileNum, curOffset)
  88  		D.Ln("database sync complete")
  89  	}
  90  	
  91  	// When the write cursor position found by scanning the block files on disk is BEFORE the position the metadata
  92  	// believes to be true, return a corruption error. Since sync is called after each block is written and before the
  93  	// metadata is updated, this should only happen in the case of missing, deleted, or truncated block files, which
  94  	// generally is not an easily recoverable scenario.
  95  	//
  96  	// todo: In the future, it might be possible to rescan and rebuild the metadata from the block files, however, that
  97  	//  would need to happen with coordination from a higher layer since it could invalidate other metadata.
  98  	if wc.curFileNum < curFileNum || (wc.curFileNum == curFileNum &&
  99  		wc.curOffset < curOffset) {
 100  		str := fmt.Sprintf(
 101  			"metadata claims file %d, offset %d, but block data is at file %d, offset %d",
 102  			curFileNum, curOffset, wc.curFileNum, wc.curOffset,
 103  		)
 104  		W.Ln("***Database corruption detected***:", str)
 105  		return nil, makeDbErr(database.ErrCorruption, str, nil)
 106  	}
 107  	return pdb, nil
 108  }
 109