package ffldb

import (
	"container/list"
	"encoding/binary"
	"fmt"
	"hash/crc32"
	"io"
	"os"
	"path/filepath"
	"sync"

	"github.com/p9c/p9/pkg/chainhash"
	"github.com/p9c/p9/pkg/database"
	"github.com/p9c/p9/pkg/wire"
)

// blockFilenameTemplate is the fmt template used to turn a block file number into a file name.
//
// The Bitcoin protocol encodes block height as int32, so the max number of blocks is 2^31, and the max block size per
// the protocol is 32MiB. The theoretical maximum amount of block data at the time this comment was written is
// therefore 64PiB (pebibytes).
//
// With files of 512MiB each that requires at most 134,217,728 files, hence 9 digits in the file name. Nine digits
// allow 10^9 files of 512MiB each, roughly 476.84PiB in total (about 7.4 times the current theoretical max), which
// leaves room for the max block size to grow in the future.
const blockFilenameTemplate = "%09d.fdb"

// maxOpenFiles is the max number of open files to maintain in the open blocks cache. The current write file is not
// counted, so there will typically be one more file open than this value.
const maxOpenFiles = 25

// maxBlockFileSize is the maximum size for each file used to store blocks.
//
// NOTE: All offsets in this code are uint32, so this value must be less than 2^32 (4 GiB). That is also the reason it
// is a typed constant.
const maxBlockFileSize uint32 = 512 * 1024 * 1024 // 512 MiB

// blockLocSize is the number of bytes in the serialized block location that is stored in the block index.
//
// The serialized block location format is:
//
//	[0:4]  Block file (4 bytes)
//	[4:8]  File offset (4 bytes)
//	[8:12] Block length (4 bytes)
const blockLocSize = 12

// castagnoli houses the Castagnoli polynomial table used for CRC-32 checksums.
var castagnoli = crc32.MakeTable(crc32.Castagnoli)

// filer is an interface which acts very similar to a *os.File and is typically implemented by it. It exists so the
// test code can provide mock files for properly testing corruption and file system issues.
type filer interface {
	io.Closer
	io.WriterAt
	io.ReaderAt
	Truncate(size int64) error
	Sync() error
}

// lockableFile pairs a block file that has been opened for read or read/write access with a read-write mutex so
// several readers can use the handle concurrently.
type lockableFile struct {
	sync.RWMutex
	file filer
}

// writeCursor tracks the file and offset at which all writes are performed. Its read-write mutex lets concurrent
// readers reuse the write file handle.
type writeCursor struct {
	sync.RWMutex
	// curFile is the current block file that new blocks are appended to.
	curFile *lockableFile
	// curFileNum is the number of the current block file; readers compare against it to reuse the open handle.
	curFileNum uint32
	// curOffset is the offset within the current write file where the next new block will be written.
	curOffset uint32
}

// blockStore houses the information used to read and write blocks (and parts of blocks) in flat files with support
// for multiple concurrent readers.
type blockStore struct {
	// network is the specific network to use in the flat files for each block.
	network wire.BitcoinNet
	// basePath is the base path used for the flat block files and metadata.
	basePath string
	// maxBlockFileSize is the maximum size for each file used to store blocks. It lives on the store so the whitebox
	// tests can override the value.
	maxBlockFileSize uint32

	// The fields below manage the flat files holding the actual blocks. The number of open files is limited by
	// maxOpenFiles.
	//
	// obfMutex protects concurrent access to the openBlockFiles map. It is a RWMutex so multiple readers can access
	// open files simultaneously.
	//
	// openBlockFiles houses the handles of existing block files that have been opened read-only, each with its own
	// RWMutex. This allows multiple concurrent readers of the same file while preventing the file from being closed
	// out from under them.
	//
	// lruMutex protects concurrent access to the least recently used list and its lookup map.
	//
	// openBlocksLRU tracks how the open files are referenced: the most recently used files are pushed to the front
	// of the list, so the least recently used ones trickle to the end. When a file has to be closed because the max
	// number of allowed open files would be exceeded, the one at the end of the list is closed.
	//
	// fileNumToLRUElem maps a block file number to its element on the least recently used list.
	//
	// Together these fields give concurrent non-blocking reads across multiple and individual files while limiting
	// the number of open file handles by closing the least recently used files as needed.
	//
	// NOTE: The locking order is well-defined and MUST be followed; failure to do so could lead to deadlocks. The
	// order is:
	//
	//	1) obfMutex
	//	2) lruMutex
	//	3) writeCursor mutex
	//	4) specific file mutexes
	//
	// None of the mutexes are required to be locked at the same time, and often aren't. However, whenever they are
	// held simultaneously they MUST be acquired in the order above.
	//
	// Due to the high performance and multi-read concurrency requirements, write locks should only be held for the
	// minimum time necessary.
	obfMutex         sync.RWMutex
	lruMutex         sync.Mutex
	openBlocksLRU    *list.List // Contains uint32 block file numbers.
	fileNumToLRUElem map[uint32]*list.Element
	openBlockFiles   map[uint32]*lockableFile

	// writeCursor houses the state for the current file and location that new blocks are written to.
	writeCursor *writeCursor

	// These hooks default to openFile, openWriteFile, and deleteFile, and are exposed so the whitebox tests can
	// replace them when working with mock files.
	openFileFunc      func(fileNum uint32) (*lockableFile, error)
	openWriteFileFunc func(fileNum uint32) (filer, error)
	deleteFileFunc    func(fileNum uint32) error
}

// blockLocation identifies a particular block file and a location within it.
type blockLocation struct {
	blockFileNum uint32
	fileOffset   uint32
	blockLen     uint32
}

// deserializeBlockLoc decodes a serialized block location as stored in the block index metadata for each block.
//
// The passed slice MUST be at least blockLocSize bytes long or this function panics. No error is returned because
// the data always comes from the block index, which carries a checksum to detect corruption.
func deserializeBlockLoc(serializedLoc []byte) blockLocation {
	// Layout: [0:4] block file, [4:8] file offset, [8:12] block length.
	var loc blockLocation
	loc.blockFileNum = byteOrder.Uint32(serializedLoc[0:4])
	loc.fileOffset = byteOrder.Uint32(serializedLoc[4:8])
	loc.blockLen = byteOrder.Uint32(serializedLoc[8:12])
	return loc
}

// serializeBlockLoc encodes the passed block location into the blockLocSize-byte form that is stored in the block
// index metadata for each block.
func serializeBlockLoc(loc blockLocation) []byte {
	// Layout: [0:4] block file, [4:8] file offset, [8:12] block length.
	encoded := make([]byte, blockLocSize)
	byteOrder.PutUint32(encoded[0:4], loc.blockFileNum)
	byteOrder.PutUint32(encoded[4:8], loc.fileOffset)
	byteOrder.PutUint32(encoded[8:12], loc.blockLen)
	return encoded
}

// blockFilePath returns the file path for the provided block file number.
func blockFilePath(dbPath string, fileNum uint32) string { fileName := fmt.Sprintf(blockFilenameTemplate, fileNum) return filepath.Join(dbPath, fileName) } // openWriteFile returns a file handle for the passed flat file number in read/write mode. The file will be created if // needed. It is typically used for the current file that will have all new data appended. Unlike openFile, this // function does not keep track of the open file and it is not subject to the maxOpenFiles limit. func (s *blockStore) openWriteFile(fileNum uint32) (filer, error) { // The current block file needs to be read-write so it is possible to append to it. Also, it shouldn't be part of // the least recently used file. filePath := blockFilePath(s.basePath, fileNum) file, e := os.OpenFile(filePath, os.O_RDWR|os.O_CREATE, 0666) if e != nil { str := fmt.Sprintf("failed to open file %q: %v", filePath, e) return nil, makeDbErr(database.ErrDriverSpecific, str, e) } return file, nil } // openFile returns a read-only file handle for the passed flat file number. The function also keeps track of the open // files, performs least recently used tracking, and limits the number of open files to maxOpenFiles by closing the // least recently used file as needed. // // This function MUST be called with the overall files mutex (s.obfMutex) locked for WRITES. func (s *blockStore) openFile(fileNum uint32) (*lockableFile, error) { // Open the appropriate file as read-only. filePath := blockFilePath(s.basePath, fileNum) file, e := os.Open(filePath) if e != nil { return nil, makeDbErr( database.ErrDriverSpecific, e.Error(), e, ) } blockFile := &lockableFile{file: file} // Close the least recently used file if the file exceeds the max allowed open files. This is not done until after // the file open in case the file fails to open, there is no need to close any files. 
// // A write lock is required on the LRU list here to protect against modifications happening as already open files // are read from and shuffled to the front of the list. // // Also, add the file that was just opened to the front of the least recently used list to indicate it is the most // recently used file and therefore should be closed last. s.lruMutex.Lock() lruList := s.openBlocksLRU if lruList.Len() >= maxOpenFiles { lruFileNum := lruList.Remove(lruList.Back()).(uint32) oldBlockFile := s.openBlockFiles[lruFileNum] // Close the old file under the write lock for the file in case any readers are currently reading from it so // it's not closed out from under them. oldBlockFile.Lock() _ = oldBlockFile.file.Close() oldBlockFile.Unlock() delete(s.openBlockFiles, lruFileNum) delete(s.fileNumToLRUElem, lruFileNum) } s.fileNumToLRUElem[fileNum] = lruList.PushFront(fileNum) s.lruMutex.Unlock() // Store a reference to it in the open block files map. s.openBlockFiles[fileNum] = blockFile return blockFile, nil } // deleteFile removes the block file for the passed flat file number. The file // must already be closed and it is the responsibility of the caller to do any // other state cleanup necessary. func (s *blockStore) deleteFile(fileNum uint32) (e error) { filePath := blockFilePath(s.basePath, fileNum) if e := os.Remove(filePath); E.Chk(e) { return makeDbErr(database.ErrDriverSpecific, e.Error(), e) } return nil } // blockFile attempts to return an existing file handle for the passed flat file // number if it is already open as well as marking it as most recently used. It // will also open the file when it's not already open subject to the rules // described in openFile. // // NOTE: The returned block file will already have the read lock acquired and // the caller MUST call .RUnlock() to release it once it has finished all read // operations. 
// This is necessary because otherwise it would be possible for a separate goroutine to close the file after it is
// returned from here, but before the caller has acquired a read lock.
//
// The lookup proceeds in three stages: the current write file, the already open read-only files (under the overall
// files read lock), and finally opening the file through s.openFileFunc under the overall files write lock. An error
// is only returned when s.openFileFunc fails.
func (s *blockStore) blockFile(fileNum uint32) (*lockableFile, error) {
	// When the requested block file is open for writes, return it. The file's read lock is taken before the write
	// cursor lock is released so the handle is never exposed unlocked.
	wc := s.writeCursor
	wc.RLock()
	if fileNum == wc.curFileNum && wc.curFile.file != nil {
		obf := wc.curFile
		obf.RLock()
		wc.RUnlock()
		return obf, nil
	}
	wc.RUnlock()

	// Try to return an open file under the overall files read lock. A hit is also moved to the front of the least
	// recently used list, which requires the LRU mutex.
	s.obfMutex.RLock()
	if obf, ok := s.openBlockFiles[fileNum]; ok {
		s.lruMutex.Lock()
		s.openBlocksLRU.MoveToFront(s.fileNumToLRUElem[fileNum])
		s.lruMutex.Unlock()
		obf.RLock()
		s.obfMutex.RUnlock()
		return obf, nil
	}
	s.obfMutex.RUnlock()

	// Since the file isn't open already, need to check the open block files map again under write lock in case multiple
	// readers got here and a separate one is already opening the file. Note that a hit on this path returns the file
	// without touching the least recently used list.
	s.obfMutex.Lock()
	if obf, ok := s.openBlockFiles[fileNum]; ok {
		obf.RLock()
		s.obfMutex.Unlock()
		return obf, nil
	}

	// The file isn't open, so open it while potentially closing the least recently used one as needed. The overall
	// files write lock is still held here, as openFile requires.
	obf, e := s.openFileFunc(fileNum)
	if e != nil {
		s.obfMutex.Unlock()
		return nil, e
	}
	obf.RLock()
	s.obfMutex.Unlock()
	return obf, nil
}

// writeData is a helper function for writeBlock which writes the provided data at the current write offset and updates
// the write cursor accordingly. The field name parameter is only used when there is an error to provide a nicer error
// message.
//
// The write cursor will be advanced the number of bytes actually written in the event of failure.
//
// NOTE: This function MUST be called with the write cursor current file lock held and must only be called during a
// write transaction so it is effectively locked for writes. Also, the write cursor current file must NOT be nil.
func (s *blockStore) writeData(data []byte, fieldName string) (e error) { wc := s.writeCursor n, e := wc.curFile.file.WriteAt(data, int64(wc.curOffset)) wc.curOffset += uint32(n) if e != nil { str := fmt.Sprintf( "failed to write %s to file %d at "+ "offset %d: %v", fieldName, wc.curFileNum, wc.curOffset-uint32(n), e, ) return makeDbErr(database.ErrDriverSpecific, str, e) } return nil } // writeBlock appends the specified raw block bytes to the store's write cursor location and increments it accordingly. // When the block would exceed the max file size for the current flat file, this function will close the current file, // create the next file, update the write cursor, and write the block to the new file. // // The write cursor will also be advanced the number of bytes actually written in the event of failure. Format: // func (s *blockStore) writeBlock(rawBlock []byte) (blockLocation, error) { // Compute how many bytes will be written. // // 4 bytes each for block network + 4 bytes for block length + length of raw block + 4 bytes for checksum. blockLen := uint32(len(rawBlock)) fullLen := blockLen + 12 // Move to the next block file if adding the new block would exceed the max allowed size for the current block file. // Also detect overflow to be paranoid, even though it isn't possible currently, numbers might change in the future // to make it possible. // // NOTE: The writeCursor.offset field isn't protected by the mutex since it's only read/changed during this function // which can only be called during a write transaction, of which there can be only one at a time. wc := s.writeCursor finalOffset := wc.curOffset + fullLen if finalOffset < wc.curOffset || finalOffset > s.maxBlockFileSize { // This is done under the write cursor lock since the curFileNum field is accessed elsewhere by readers. // // Close the current write file to force a read-only reopen with LRU tracking. 
The close is done under the write // lock for the file to prevent it from being closed out from under any readers currently reading from it. wc.Lock() wc.curFile.Lock() if wc.curFile.file != nil { _ = wc.curFile.file.Close() wc.curFile.file = nil } wc.curFile.Unlock() // Start writes into next file. wc.curFileNum++ wc.curOffset = 0 wc.Unlock() } // All writes are done under the write lock for the file to ensure any readers are finished and blocked first. wc.curFile.Lock() defer wc.curFile.Unlock() // Open the current file if needed. // // This will typically only be the case when moving to the next file to write to or on initial database load. // // However, it might also be the case if rollbacks happened after file writes started during a transaction commit. if wc.curFile.file == nil { file, e := s.openWriteFileFunc(wc.curFileNum) if e != nil { return blockLocation{}, e } wc.curFile.file = file } // Bitcoin network. origOffset := wc.curOffset hasher := crc32.New(castagnoli) var scratch [4]byte byteOrder.PutUint32(scratch[:], uint32(s.network)) if e := s.writeData(scratch[:], "network"); E.Chk(e) { return blockLocation{}, e } _, _ = hasher.Write(scratch[:]) // Block length. byteOrder.PutUint32(scratch[:], blockLen) if e := s.writeData(scratch[:], "block length"); E.Chk(e) { return blockLocation{}, e } _, _ = hasher.Write(scratch[:]) // Serialized block. if e := s.writeData(rawBlock[:], "block"); E.Chk(e) { return blockLocation{}, e } _, _ = hasher.Write(rawBlock) // Castagnoli CRC-32 as a checksum of all the previous. if e := s.writeData(hasher.Sum(nil), "checksum"); E.Chk(e) { return blockLocation{}, e } loc := blockLocation{ blockFileNum: wc.curFileNum, fileOffset: origOffset, blockLen: fullLen, } return loc, nil } // readBlock reads the specified block record and returns the serialized block. 
It ensures the integrity of the block // data by checking that the serialized network matches the current network associated with the block store and // comparing the calculated checksum against the one stored in the flat file. // // This function also automatically handles all file management such as opening and closing files as necessary to stay // within the maximum allowed open files limit. // // Returns ErrDriverSpecific if the data fails to read for any reason and ErrCorruption if the checksum of the read data // doesn't match the checksum read from the file. Format: func (s *blockStore) readBlock(hash *chainhash.Hash, loc blockLocation) ([]byte, error) { // Get the referenced block file handle opening the file as needed. The function also handles closing files as // needed to avoid going over the max allowed open files. blockFile, e := s.blockFile(loc.blockFileNum) if e != nil { return nil, e } serializedData := make([]byte, loc.blockLen) n, e := blockFile.file.ReadAt(serializedData, int64(loc.fileOffset)) blockFile.RUnlock() if e != nil { str := fmt.Sprintf( "failed to read block %s from file %d, "+ "offset %d: %v", hash, loc.blockFileNum, loc.fileOffset, e, ) return nil, makeDbErr(database.ErrDriverSpecific, str, e) } // Calculate the checksum of the read data and ensure it matches the serialized checksum. This will detect any data // corruption in the flat file without having to do much more expensive merkle root calculations on the loaded // block. 
serializedChecksum := binary.BigEndian.Uint32(serializedData[n-4:]) calculatedChecksum := crc32.Checksum(serializedData[:n-4], castagnoli) if serializedChecksum != calculatedChecksum { str := fmt.Sprintf( "block data for block %s checksum "+ "does not match - got %x, want %x", hash, calculatedChecksum, serializedChecksum, ) return nil, makeDbErr(database.ErrCorruption, str, nil) } // The network associated with the block must match the current active network, otherwise somebody probably put the // block files for the wrong network in the directory. serializedNet := byteOrder.Uint32(serializedData[:4]) if serializedNet != uint32(s.network) { str := fmt.Sprintf( "block data for block %s is for the "+ "wrong network - got %d, want %d", hash, serializedNet, uint32(s.network), ) return nil, makeDbErr(database.ErrDriverSpecific, str, nil) } // The raw block excludes the network, length of the block, and checksum. return serializedData[8 : n-4], nil } // readBlockRegion reads the specified amount of data at the provided offset for a given block location. The offset is // relative to the start of the serialized block (as opposed to the beginning of the block record). // // This function automatically handles all file management such as opening and closing files as necessary to stay within // the maximum allowed open files limit. // // Returns ErrDriverSpecific if the data fails to read for any reason. func (s *blockStore) readBlockRegion(loc blockLocation, offset, numBytes uint32) ([]byte, error) { // Get the referenced block file handle opening the file as needed. The function also handles closing files as // needed to avoid going over the max allowed open files. blockFile, e := s.blockFile(loc.blockFileNum) if e != nil { return nil, e } // Regions are offsets into the actual block, however the serialized data for a block includes an initial 4 bytes // for network + 4 bytes for block length. Thus, add 8 bytes to adjust. 
readOffset := loc.fileOffset + 8 + offset serializedData := make([]byte, numBytes) _, e = blockFile.file.ReadAt(serializedData, int64(readOffset)) blockFile.RUnlock() if e != nil { str := fmt.Sprintf( "failed to read region from block file %d, "+ "offset %d, len %d: %v", loc.blockFileNum, readOffset, numBytes, e, ) return nil, makeDbErr(database.ErrDriverSpecific, str, e) } return serializedData, nil } // syncBlocks performs a file system sync on the flat file associated with the store's current write cursor. It is safe // to call even when there is not a current write file in which case it will have no effect. // // This is used when flushing cached metadata updates to disk to ensure all the block data is fully written before // updating the metadata. This ensures the metadata and block data can be properly reconciled in failure scenarios. func (s *blockStore) syncBlocks() (e error) { wc := s.writeCursor wc.RLock() defer wc.RUnlock() // Nothing to do if there is no current file associated with the write cursor. wc.curFile.RLock() defer wc.curFile.RUnlock() if wc.curFile.file == nil { return nil } // Sync the file to disk. if e := wc.curFile.file.Sync(); E.Chk(e) { str := fmt.Sprintf( "failed to sync file %d: %v", wc.curFileNum, e, ) return makeDbErr(database.ErrDriverSpecific, str, e) } return nil } // handleRollback rolls the block files on disk back to the provided file number and offset. This involves potentially // deleting and truncating the files that were partially written. // // There are effectively two scenarios to consider here: // // 1) Transient write failures from which recovery is possible // // 2) More permanent failures such as hard disk death and/or removal // // In either case, the write cursor will be repositioned to the old block file offset regardless of any other errors // that occur while attempting to undo writes. 
// // For the first scenario, this will lead to any data which failed to be undone being overwritten and thus behaves as // desired as the system continues to run. For the second scenario, the metadata which stores the current write cursor // position within the block files will not have been updated yet and thus if the system eventually recovers (perhaps // the hard drive is reconnected), it will also lead to any data which failed to be undone being overwritten and thus // behaves as desired. // // Therefore, any errors are simply logged at a warning level rather than being returned since there is nothing more // that could be done about it anyways. func (s *blockStore) handleRollback(oldBlockFileNum, oldBlockOffset uint32) { // Grab the write cursor mutex since it is modified throughout this function. wc := s.writeCursor wc.Lock() defer wc.Unlock() // Nothing to do if the rollback point is the same as the current write cursor. if wc.curFileNum == oldBlockFileNum && wc.curOffset == oldBlockOffset { return } // Regardless of any failures that happen below, reposition the write cursor to the old block file and offset. defer func() { wc.curFileNum = oldBlockFileNum wc.curOffset = oldBlockOffset }() D.F( "ROLLBACK: Rolling back to file %d, offset %d", oldBlockFileNum, oldBlockOffset, ) // Close the current write file if it needs to be deleted. Then delete all files that are newer than the provided // rollback file while also moving the write cursor file backwards accordingly. if wc.curFileNum > oldBlockFileNum { wc.curFile.Lock() if wc.curFile.file != nil { _ = wc.curFile.file.Close() wc.curFile.file = nil } wc.curFile.Unlock() } for ; wc.curFileNum > oldBlockFileNum; wc.curFileNum-- { if e := s.deleteFileFunc(wc.curFileNum); E.Chk(e) { W.Ln( "ROLLBACK: Failed to delete block file number %d: %v %s", wc.curFileNum, e, ) return } } // Open the file for the current write cursor if needed. 
wc.curFile.Lock() if wc.curFile.file == nil { obf, e := s.openWriteFileFunc(wc.curFileNum) if e != nil { wc.curFile.Unlock() D.Ln("ROLLBACK:", e) return } wc.curFile.file = obf } // Truncate the to the provided rollback offset. if e := wc.curFile.file.Truncate(int64(oldBlockOffset)); E.Chk(e) { wc.curFile.Unlock() W.Ln( "ROLLBACK: Failed to truncate file %d: %v %s", wc.curFileNum, e, ) return } // Sync the file to disk. e := wc.curFile.file.Sync() wc.curFile.Unlock() if e != nil { W.Ln( "ROLLBACK: Failed to sync file %d: %v %s", wc.curFileNum, e, ) return } } // scanBlockFiles searches the database directory for all flat block files to find the end of the most recent file. // // This position is considered the current write cursor which is also stored in the metadata. // // Thus, it is used to detect unexpected shutdowns in the middle of writes so the block files can be reconciled. func scanBlockFiles(dbPath string) (int, uint32) { lastFile := -1 fileLen := uint32(0) for i := 0; ; i++ { filePath := blockFilePath(dbPath, uint32(i)) st, e := os.Stat(filePath) if e != nil { T.Ln(e) break } lastFile = i fileLen = uint32(st.Size()) } T.F("Scan found latest block file #%d with length %d", lastFile, fileLen) return lastFile, fileLen } // newBlockStore returns a new block store with the current block file number and offset set and all fields initialized. func newBlockStore(basePath string, network wire.BitcoinNet) *blockStore { // Look for the end of the latest block to file to determine what the write cursor position is from the viewpoint of // the block files on disk. 
fileNum, fileOff := scanBlockFiles(basePath) if fileNum == -1 { fileNum = 0 fileOff = 0 } store := &blockStore{ network: network, basePath: basePath, maxBlockFileSize: maxBlockFileSize, openBlockFiles: make(map[uint32]*lockableFile), openBlocksLRU: list.New(), fileNumToLRUElem: make(map[uint32]*list.Element), writeCursor: &writeCursor{ curFile: &lockableFile{}, curFileNum: uint32(fileNum), curOffset: fileOff, }, } store.openFileFunc = store.openFile store.openWriteFileFunc = store.openWriteFile store.deleteFileFunc = store.deleteFile return store }