package ffldb import ( "bytes" "encoding/binary" "fmt" "github.com/p9c/p9/pkg/block" "os" "path/filepath" "runtime" "sort" "sync" "github.com/btcsuite/goleveldb/leveldb" "github.com/btcsuite/goleveldb/leveldb/comparer" ldberrors "github.com/btcsuite/goleveldb/leveldb/errors" "github.com/btcsuite/goleveldb/leveldb/filter" "github.com/btcsuite/goleveldb/leveldb/iterator" "github.com/btcsuite/goleveldb/leveldb/opt" "github.com/btcsuite/goleveldb/leveldb/util" "github.com/p9c/p9/pkg/chainhash" "github.com/p9c/p9/pkg/database" "github.com/p9c/p9/pkg/util/treap" "github.com/p9c/p9/pkg/wire" ) const ( // metadataDbName is the name used for the metadata database. metadataDbName = "metadata" // blockHdrSize is the size of a block header. // // This is simply the constant from wire and is only provided here for convenience since wire.MaxBlockHeaderPayload // is quite long. blockHdrSize = wire.MaxBlockHeaderPayload // blockHdrOffset defines the offsets into a block index row for the block header. // // The serialized block index row format is: // // blockHdrOffset = blockLocSize ) var ( // byteOrder is the preferred byte order used through the database and block files. Sometimes big endian will be // used to allow ordered byte sortable integer values. byteOrder = binary.LittleEndian // bucketIndexPrefix is the prefix used for all entries in the bucket index. bucketIndexPrefix = []byte("bidx") // curBucketIDKeyName is the name of the key used to keep track of the current bucket ID counter. curBucketIDKeyName = []byte("bidx-cbid") // metadataBucketID is the ID of the top-level metadata bucket. It is the value 0 encoded as an unsigned big-endian // uint32. metadataBucketID = [4]byte{} // blockIdxBucketID is the ID of the internal block metadata bucket. It is the value 1 encoded as an unsigned // big-endian uint32. blockIdxBucketID = [4]byte{0x00, 0x00, 0x00, 0x01} // blockIdxBucketName is the bucket used internally to track block metadata. 
blockIdxBucketName = []byte("ffldb-blockidx") // writeLocKeyName is the key used to store the current write file location. writeLocKeyName = []byte("ffldb-writeloc") ) // Common error strings. const ( // errDbNotOpenStr is the text to use for the database.ErrDbNotOpen error code. errDbNotOpenStr = "database is not open" // errTxClosedStr is the text to use for the database.ErrTxClosed error code. errTxClosedStr = "database tx is closed" ) // bulkFetchData is allows a block location to be specified along with the index it was requested from. // // This in turn allows the bulk data loading functions to txsort the data accesses based on the location to improve // performance while keeping track of which result the data is for. type bulkFetchData struct { *blockLocation replyIndex int } // bulkFetchDataSorter implements txsort. // // Interface to allow a slice of bulkFetchData to be sorted. In particular it sorts by file and then offset so that // reads from files are grouped and linear. type bulkFetchDataSorter []bulkFetchData // Len returns the number of items in the slice. // // It is part of the sort.Interface implementation. func (s bulkFetchDataSorter) Len() int { return len(s) } // Swap swaps the items at the passed indices. // // It is part of the sort.Interface implementation. func (s bulkFetchDataSorter) Swap(i, j int) { s[i], s[j] = s[j], s[i] } // Less returns whether the item with index i should txsort before the item with index j. // // It is part of the sort.Interface implementation. func (s bulkFetchDataSorter) Less(i, j int) bool { if s[i].blockFileNum < s[j].blockFileNum { return true } if s[i].blockFileNum > s[j].blockFileNum { return false } return s[i].fileOffset < s[j].fileOffset } // makeDbErr creates a database.DBError given a set of arguments. 
func makeDbErr(c database.ErrorCode, desc string, e error) database.DBError { return database.DBError{ErrorCode: c, Description: desc, Err: e} } // convertErr converts the passed leveldb error into a database error with an equivalent error code and the passed // description. // // It also sets the passed error as the underlying error. func convertErr(desc string, ldbErr error) database.DBError { // Use the driver-specific error code by default. // // The code below will update this with the converted error if it's recognized. var code = database.ErrDriverSpecific switch { // Database corruption errors. case ldberrors.IsCorrupted(ldbErr): code = database.ErrCorruption // Database open/create errors. case ldbErr == leveldb.ErrClosed: code = database.ErrDbNotOpen // Transaction errors. case ldbErr == leveldb.ErrSnapshotReleased: code = database.ErrTxClosed case ldbErr == leveldb.ErrIterReleased: code = database.ErrTxClosed } return database.DBError{ErrorCode: code, Description: desc, Err: ldbErr} } // copySlice returns a copy of the passed slice. // // This is mostly used to copy leveldb iterator keys and values since they are only valid until the iterator is moved // instead of during the entirety of the transaction. func copySlice(slice []byte) []byte { ret := make([]byte, len(slice)) copy(ret, slice) return ret } // cursor is an internal type used to represent a cursor over key/value pairs and nested buckets of a bucket and // implements the database.Cursor interface. type cursor struct { bucket *bucket dbIter iterator.Iterator pendingIter iterator.Iterator currentIter iterator.Iterator } // Enforce cursor implements the database.Cursor interface. var _ database.Cursor = (*cursor)(nil) // Bucket returns the bucket the cursor was created for. // // This function is part of the database.Cursor interface implementation. func (c *cursor) Bucket() database.Bucket { // Ensure transaction state is valid. 
if e := c.bucket.tx.checkClosed(); E.Chk(e) { return nil } return c.bucket } // Delete removes the current key/value pair the cursor is at without invalidating the cursor. // // Returns the following errors as required by the interface contract: // // - ErrIncompatibleValue if attempted when the cursor points to a nested bucket // // - ErrTxNotWritable if attempted against a read-only transaction // // - ErrTxClosed if the transaction has already been closed // // This function is part of the database.Cursor interface implementation. func (c *cursor) Delete() (e error) { // Ensure transaction state is valid. if e := c.bucket.tx.checkClosed(); E.Chk(e) { return e } // DBError if the cursor is exhausted. if c.currentIter == nil { str := "cursor is exhausted" return makeDbErr(database.ErrIncompatibleValue, str, nil) } // Do not allow buckets to be deleted via the cursor. key := c.currentIter.Key() if bytes.HasPrefix(key, bucketIndexPrefix) { str := "buckets may not be deleted from a cursor" return makeDbErr(database.ErrIncompatibleValue, str, nil) } c.bucket.tx.deleteKey(copySlice(key), true) return nil } // skipPendingUpdates skips any keys at the current database iterator position that are being updated by the // transaction. // // The forwards flag indicates the direction the cursor is moving. func (c *cursor) skipPendingUpdates(forwards bool) { for c.dbIter.Valid() { var skip bool key := c.dbIter.Key() if c.bucket.tx.pendingRemove.Has(key) { skip = true } else if c.bucket.tx.pendingKeys.Has(key) { skip = true } if !skip { break } if forwards { c.dbIter.Next() } else { c.dbIter.Prev() } } } // chooseIterator first skips any entries in the database iterator that are being updated by the transaction and sets // the current iterator to the appropriate iterator depending on their validity and the order they compare in while // taking into account the direction flag. 
// // When the cursor is being moved forwards and both iterators are valid, the iterator with the smaller key is chosen and // vice versa when the cursor is being moved backwards. func (c *cursor) chooseIterator(forwards bool) bool { // Skip any keys at the current database iterator position that are being updated by the transaction. c.skipPendingUpdates(forwards) // When both iterators are exhausted, the cursor is exhausted too. if !c.dbIter.Valid() && !c.pendingIter.Valid() { c.currentIter = nil return false } // Choose the database iterator when the pending keys iterator is exhausted. if !c.pendingIter.Valid() { c.currentIter = c.dbIter return true } // Choose the pending keys iterator when the database iterator is exhausted. if !c.dbIter.Valid() { c.currentIter = c.pendingIter return true } // Both iterators are valid, so choose the iterator with either the smaller or larger key depending on the forwards // flag. compare := bytes.Compare(c.dbIter.Key(), c.pendingIter.Key()) if (forwards && compare > 0) || (!forwards && compare < 0) { c.currentIter = c.pendingIter } else { c.currentIter = c.dbIter } return true } // First positions the cursor at the first key/value pair and returns whether or not the pair exists. This function is // part of the database. // // Cursor interface implementation. func (c *cursor) First() bool { // Ensure transaction state is valid. if e := c.bucket.tx.checkClosed(); E.Chk(e) { return false } // Seek to the first key in both the database and pending iterators and choose the iterator that is both valid and // has the smaller key. c.dbIter.First() c.pendingIter.First() return c.chooseIterator(true) } // Last positions the cursor at the last key/value pair and returns whether or not the pair exists. This function is // part of the database. // // Cursor interface implementation. func (c *cursor) Last() bool { // Ensure transaction state is valid. 
if e := c.bucket.tx.checkClosed(); E.Chk(e) { return false } // Seek to the last key in both the database and pending iterators and choose the iterator that is both valid and // has the larger key. c.dbIter.Last() c.pendingIter.Last() return c.chooseIterator(false) } // Next moves the cursor one key/value pair forward and returns whether or not the pair exists. This function is part of // the database. // // Cursor interface implementation. func (c *cursor) Next() bool { // Ensure transaction state is valid. if e := c.bucket.tx.checkClosed(); E.Chk(e) { return false } // Nothing to return if cursor is exhausted. if c.currentIter == nil { return false } // Move the current iterator to the next entry and choose the iterator that is both valid and has the smaller key. c.currentIter.Next() return c.chooseIterator(true) } // Prev moves the cursor one key/value pair backward and returns whether or not the pair exists. This function is part // of the database. // // Cursor interface implementation. func (c *cursor) Prev() bool { // Ensure transaction state is valid. if e := c.bucket.tx.checkClosed(); E.Chk(e) { return false } // Nothing to return if cursor is exhausted. if c.currentIter == nil { return false } // Move the current iterator to the previous entry and choose the iterator that is both valid and has the larger // key. c.currentIter.Prev() return c.chooseIterator(false) } // Seek positions the cursor at the first key/value pair that is greater than or equal to the passed seek key. // // Returns false if no suitable key was found. // // This function is part of the database.Cursor interface implementation. func (c *cursor) Seek(seek []byte) bool { // Ensure transaction state is valid. if e := c.bucket.tx.checkClosed(); E.Chk(e) { return false } // Seek to the provided key in both the database and pending iterators then choose the iterator that is both valid // and has the larger key. 
seekKey := bucketizedKey(c.bucket.id, seek) c.dbIter.Seek(seekKey) c.pendingIter.Seek(seekKey) return c.chooseIterator(true) } // rawKey returns the current key the cursor is pointing to without stripping the current bucket prefix or bucket index // prefix. func (c *cursor) rawKey() []byte { // Nothing to return if cursor is exhausted. if c.currentIter == nil { return nil } return copySlice(c.currentIter.Key()) } // Key returns the current key the cursor is pointing to. // // This function is part of the database.Cursor interface implementation. func (c *cursor) Key() []byte { // Ensure transaction state is valid. if e := c.bucket.tx.checkClosed(); E.Chk(e) { return nil } // Nothing to return if cursor is exhausted. if c.currentIter == nil { return nil } // Slice out the actual key name and make a copy since it is no longer valid after iterating to the next item. // // The key is after the bucket index prefix and parent ID when the cursor is pointing to a nested bucket. key := c.currentIter.Key() if bytes.HasPrefix(key, bucketIndexPrefix) { key = key[len(bucketIndexPrefix)+4:] return copySlice(key) } // The key is after the bucket ID when the cursor is pointing to a normal entry. key = key[len(c.bucket.id):] return copySlice(key) } // rawValue returns the current value the cursor is pointing to without stripping without filtering bucket index values. func (c *cursor) rawValue() []byte { // Nothing to return if cursor is exhausted. if c.currentIter == nil { return nil } return copySlice(c.currentIter.Value()) } // Value returns the current value the cursor is pointing to. // // This will be nil for nested buckets. // // This function is part of the database.Cursor interface implementation. func (c *cursor) Value() []byte { // Ensure transaction state is valid. if e := c.bucket.tx.checkClosed(); E.Chk(e) { return nil } // Nothing to return if cursor is exhausted. 
if c.currentIter == nil { return nil } // Return nil for the value when the cursor is pointing to a nested // bucket. if bytes.HasPrefix(c.currentIter.Key(), bucketIndexPrefix) { return nil } return copySlice(c.currentIter.Value()) } // cursorType defines the type of cursor to create. type cursorType int // The following constants define the allowed cursor types. const ( // ctKeys iterates through all of the keys in a given bucket. ctKeys cursorType = iota // ctBuckets iterates through all directly nested buckets in a given bucket. ctBuckets // ctFull iterates through both the keys and the directly nested buckets // in a given bucket. ctFull ) // cursorFinalizer is either invoked when a cursor is being garbage collected or called manually to ensure the // underlying cursor iterators are released. func cursorFinalizer(c *cursor) { c.dbIter.Release() c.pendingIter.Release() } // newCursor returns a new cursor for the given bucket, bucket ID, and cursor type. // // NOTE: The caller is responsible for calling the cursorFinalizer function on the returned cursor. func newCursor(b *bucket, bucketID []byte, cursorTyp cursorType) *cursor { var dbIter, pendingIter iterator.Iterator switch cursorTyp { case ctKeys: keyRange := util.BytesPrefix(bucketID) dbIter = b.tx.snapshot.NewIterator(keyRange) pendingKeyIter := newLdbTreapIter(b.tx, keyRange) pendingIter = pendingKeyIter case ctBuckets: // The serialized bucket index key format is: // // // // Create an iterator for the both the database and the pending keys which are prefixed by the bucket index // identifier and the provided bucket ID. 
prefix := make([]byte, len(bucketIndexPrefix)+4) copy(prefix, bucketIndexPrefix) copy(prefix[len(bucketIndexPrefix):], bucketID) bucketRange := util.BytesPrefix(prefix) dbIter = b.tx.snapshot.NewIterator(bucketRange) pendingBucketIter := newLdbTreapIter(b.tx, bucketRange) pendingIter = pendingBucketIter case ctFull: fallthrough default: // The serialized bucket index key format is: // // prefix := make([]byte, len(bucketIndexPrefix)+4) copy(prefix, bucketIndexPrefix) copy(prefix[len(bucketIndexPrefix):], bucketID) bucketRange := util.BytesPrefix(prefix) keyRange := util.BytesPrefix(bucketID) // Since both keys and buckets are needed from the database, create an individual iterator for each prefix and // then create a merged iterator from them. dbKeyIter := b.tx.snapshot.NewIterator(keyRange) dbBucketIter := b.tx.snapshot.NewIterator(bucketRange) iters := []iterator.Iterator{dbKeyIter, dbBucketIter} dbIter = iterator.NewMergedIterator( iters, comparer.DefaultComparer, true, ) // Since both keys and buckets are needed from the pending keys, create an individual iterator for each prefix // and then create a merged iterator from them. pendingKeyIter := newLdbTreapIter(b.tx, keyRange) pendingBucketIter := newLdbTreapIter(b.tx, bucketRange) iters = []iterator.Iterator{pendingKeyIter, pendingBucketIter} pendingIter = iterator.NewMergedIterator( iters, comparer.DefaultComparer, true, ) } // Create the cursor using the iterators. return &cursor{bucket: b, dbIter: dbIter, pendingIter: pendingIter} } // bucket is an internal type used to represent a collection of key/value pairs and implements the database.Bucket // interface. type bucket struct { tx *transaction id [4]byte } // Enforce bucket implements the database.Bucket interface. var _ database.Bucket = (*bucket)(nil) // bucketIndexKey returns the actual key to use for storing and retrieving a child bucket in the bucket index. 
// // This is required because additional information is needed to distinguish nested buckets with the same name. func bucketIndexKey(parentID [4]byte, key []byte) []byte { // The serialized bucket index key format is: // // indexKey := make([]byte, len(bucketIndexPrefix)+4+len(key)) copy(indexKey, bucketIndexPrefix) copy(indexKey[len(bucketIndexPrefix):], parentID[:]) copy(indexKey[len(bucketIndexPrefix)+4:], key) return indexKey } // bucketizedKey returns the actual key to use for storing and retrieving a key for the provided bucket ID. // // This is required because bucketizing is handled through the use of a unique prefix per bucket. func bucketizedKey(bucketID [4]byte, key []byte) []byte { // The serialized block index key format is: // bKey := make([]byte, 4+len(key)) copy(bKey, bucketID[:]) copy(bKey[4:], key) return bKey } // Bucket retrieves a nested bucket with the given key. // // Returns nil if the bucket does not exist. // // This function is part of the database.Bucket interface implementation. func (b *bucket) Bucket(key []byte) database.Bucket { // Ensure transaction state is valid. if e := b.tx.checkClosed(); E.Chk(e) { return nil } // Attempt to fetch the ID for the child bucket. The bucket does not exist if the bucket index entry does not exist. childID := b.tx.fetchKey(bucketIndexKey(b.id, key)) if childID == nil { return nil } childBucket := &bucket{tx: b.tx} copy(childBucket.id[:], childID) return childBucket } // CreateBucket creates and returns a new nested bucket with the given key. 
// // Returns the following errors as required by the interface contract: // // - ErrBucketExists if the bucket already exists // // - ErrBucketNameRequired if the key is empty // // - ErrIncompatibleValue if the key is otherwise invalid for the particular implementation // // - ErrTxNotWritable if attempted against a read-only transaction // // - ErrTxClosed if the transaction has already been closed // // This function is part of the database.Bucket interface implementation. func (b *bucket) CreateBucket(key []byte) (database.Bucket, error) { // Ensure transaction state is valid. if e := b.tx.checkClosed(); E.Chk(e) { return nil, e } // Ensure the transaction is writable. if !b.tx.writable { str := "create bucket requires a writable database transaction" return nil, makeDbErr(database.ErrTxNotWritable, str, nil) } // Ensure a key was provided. if len(key) == 0 { str := "create bucket requires a key" return nil, makeDbErr(database.ErrBucketNameRequired, str, nil) } // Ensure bucket does not already exist. bidxKey := bucketIndexKey(b.id, key) if b.tx.hasKey(bidxKey) { str := "bucket already exists" return nil, makeDbErr(database.ErrBucketExists, str, nil) } // Find the appropriate next bucket ID to use for the new bucket. In the case of the special internal block index, // keep the fixed ID. var childID [4]byte if b.id == metadataBucketID && bytes.Equal(key, blockIdxBucketName) { childID = blockIdxBucketID } else { var e error childID, e = b.tx.nextBucketID() if e != nil { return nil, e } } // Add the new bucket to the bucket index. if e := b.tx.putKey(bidxKey, childID[:]); E.Chk(e) { str := fmt.Sprintf("failed to create bucket with key %q", key) return nil, convertErr(str, e) } return &bucket{tx: b.tx, id: childID}, nil } // CreateBucketIfNotExists creates and returns a new nested bucket with the given key if it does not already exist. 
// // Returns the following errors as required by the interface contract: // // - ErrBucketNameRequired if the key is empty // // - ErrIncompatibleValue if the key is otherwise invalid for the particular implementation // // - ErrTxNotWritable if attempted against a read-only transaction // // - ErrTxClosed if the transaction has already been closed // // This function is part of the database.Bucket interface implementation. func (b *bucket) CreateBucketIfNotExists(key []byte) (database.Bucket, error) { // Ensure transaction state is valid. if e := b.tx.checkClosed(); E.Chk(e) { return nil, e } // Ensure the transaction is writable. if !b.tx.writable { str := "create bucket requires a writable database transaction" return nil, makeDbErr(database.ErrTxNotWritable, str, nil) } // Return existing bucket if it already exists, otherwise create it. if bucket := b.Bucket(key); bucket != nil { return bucket, nil } return b.CreateBucket(key) } // DeleteBucket removes a nested bucket with the given key. // // Returns the following errors as required by the interface contract: // // - ErrBucketNotFound if the specified bucket does not exist // // - ErrTxNotWritable if attempted against a read-only transaction // // - ErrTxClosed if the transaction has already been closed // // This function is part of the database.Bucket interface implementation. func (b *bucket) DeleteBucket(key []byte) (e error) { // Ensure transaction state is valid. if e := b.tx.checkClosed(); E.Chk(e) { return e } // Ensure the transaction is writable. if !b.tx.writable { str := "delete bucket requires a writable database transaction" return makeDbErr(database.ErrTxNotWritable, str, nil) } // Attempt to fetch the ID for the child bucket. // // The bucket does not exist if the bucket index entry does not exist. // // In the case of the special internal block index, keep the fixed ID. 
bidxKey := bucketIndexKey(b.id, key) childID := b.tx.fetchKey(bidxKey) if childID == nil { str := fmt.Sprintf("bucket %q does not exist", key) return makeDbErr(database.ErrBucketNotFound, str, nil) } // Remove all nested buckets and their keys. childIDs := [][]byte{childID} for len(childIDs) > 0 { childID = childIDs[len(childIDs)-1] childIDs = childIDs[:len(childIDs)-1] // Delete all keys in the nested bucket. keyCursor := newCursor(b, childID, ctKeys) for ok := keyCursor.First(); ok; ok = keyCursor.Next() { b.tx.deleteKey(keyCursor.rawKey(), false) } cursorFinalizer(keyCursor) // Iterate through all nested buckets. bucketCursor := newCursor(b, childID, ctBuckets) for ok := bucketCursor.First(); ok; ok = bucketCursor.Next() { // Push the id of the nested bucket onto the stack for the next // iteration. childID := bucketCursor.rawValue() childIDs = append(childIDs, childID) // Remove the nested bucket from the bucket index. b.tx.deleteKey(bucketCursor.rawKey(), false) } cursorFinalizer(bucketCursor) } // Remove the nested bucket from the bucket index. Any buckets nested under it were already removed above. b.tx.deleteKey(bidxKey, true) return nil } // Cursor returns a new cursor, allowing for iteration over the bucket's key/value pairs and nested buckets in forward // or backward order. // // You must seek to a position using the First, Last, or Seek functions before calling the Next, Prev, Key, or value // functions. Failure to do so will result in the same return values as an exhausted cursor, which is false for the Prev // and Next functions and nil for Key and value functions. This function is part of the database. // // Bucket interface implementation. func (b *bucket) Cursor() database.Cursor { // Ensure transaction state is valid. if e := b.tx.checkClosed(); E.Chk(e) { return &cursor{bucket: b} } // Create the cursor and setup a runtime finalizer to ensure the iterators are released when the cursor is garbage // collected. 
c := newCursor(b, b.id[:], ctFull) runtime.SetFinalizer(c, cursorFinalizer) return c } // ForEach invokes the passed function with every key/value pair in the bucket. This does not include nested buckets or // the key/value pairs within those nested buckets. // // WARNING: It is not safe to mutate data while iterating with this method. // // Doing so may cause the underlying cursor to be invalidated and return // unexpected keys and/or values. // // Returns the following errors as required by the interface contract: // // - ErrTxClosed if the transaction has already been closed // // NOTE: The values returned by this function are only valid during a transaction. Attempting to access them after a // transaction has ended will likely result in an access violation. This function is part of the database.Bucket // interface implementation. func (b *bucket) ForEach(fn func(k, v []byte) error) (e error) { // Ensure transaction state is valid. if e := b.tx.checkClosed(); E.Chk(e) { return e } // Invoke the callback for each cursor item. Return the error returned from the callback when it is non-nil. c := newCursor(b, b.id[:], ctKeys) defer cursorFinalizer(c) for ok := c.First(); ok; ok = c.Next() { e := fn(c.Key(), c.Value()) if e != nil { return e } } return nil } // ForEachBucket invokes the passed function with the key of every nested bucket in the current bucket. // // This does not include any nested buckets within those nested buckets. // // WARNING: It is not safe to mutate data while iterating with this method. // // Doing so may cause the underlying cursor to be invalidated and return unexpected keys. // // Returns the following errors as required by the interface contract: // // - ErrTxClosed if the transaction has already been closed // // NOTE: The values returned by this function are only valid during a transaction. Attempting to access them after a // transaction has ended will likely result in an access violation. 
// // This function is part of the database.Bucket interface implementation. func (b *bucket) ForEachBucket(fn func(k []byte) error) (e error) { // Ensure transaction state is valid. if e := b.tx.checkClosed(); E.Chk(e) { return e } // Invoke the callback for each cursor item. Return the error returned from the callback when it is non-nil. c := newCursor(b, b.id[:], ctBuckets) defer cursorFinalizer(c) for ok := c.First(); ok; ok = c.Next() { e := fn(c.Key()) if e != nil { return e } } return nil } // Writable returns whether or not the bucket is writable. // // This function is part of the database.Bucket interface implementation. func (b *bucket) Writable() bool { return b.tx.writable } // Put saves the specified key/value pair to the bucket. // // Keys that do not already exist are added and keys that already exist are overwritten. // // Returns the following errors as required by the interface contract: // // - ErrKeyRequired if the key is empty // - ErrIncompatibleValue if the key is the same as an existing bucket // - ErrTxNotWritable if attempted against a read-only transaction // - ErrTxClosed if the transaction has already been closed // // This function is part of the database.Bucket interface implementation. func (b *bucket) Put(key, value []byte) (e error) { // Ensure transaction state is valid. if e := b.tx.checkClosed(); E.Chk(e) { return e } // Ensure the transaction is writable. if !b.tx.writable { str := "setting a key requires a writable database transaction" return makeDbErr(database.ErrTxNotWritable, str, nil) } // Ensure a key was provided. if len(key) == 0 { str := "put requires a key" return makeDbErr(database.ErrKeyRequired, str, nil) } return b.tx.putKey(bucketizedKey(b.id, key), value) } // Get returns the value for the given key. // // Returns nil if the key does not exist in this bucket. // // An empty slice is returned for keys that exist but have no value assigned. 
// // NOTE: The value returned by this function is only valid during a transaction. Attempting to access it after a // transaction has ended results in undefined behavior. Additionally, the value must NOT be modified by the caller. // // This function is part of the database.Bucket interface implementation. func (b *bucket) Get(key []byte) []byte { // Ensure transaction state is valid. if e := b.tx.checkClosed(); E.Chk(e) { return nil } // Nothing to return if there is no key. if len(key) == 0 { return nil } return b.tx.fetchKey(bucketizedKey(b.id, key)) } // Delete removes the specified key from the bucket. // // Deleting a key that does not exist does not return an error. // // Returns the following errors as required by the interface contract: // // - ErrKeyRequired if the key is empty // - ErrIncompatibleValue if the key is the same as an existing bucket // - ErrTxNotWritable if attempted against a read-only transaction // - ErrTxClosed if the transaction has already been closed // // This function is part of the database.Bucket interface implementation. func (b *bucket) Delete(key []byte) (e error) { // Ensure transaction state is valid. if e := b.tx.checkClosed(); E.Chk(e) { return e } // Ensure the transaction is writable. if !b.tx.writable { str := "deleting a value requires a writable database transaction" return makeDbErr(database.ErrTxNotWritable, str, nil) } // Nothing to do if there is no key. if len(key) == 0 { return nil } b.tx.deleteKey(bucketizedKey(b.id, key), true) return nil } // pendingBlock houses a block that will be written to disk when the database transaction is committed. type pendingBlock struct { hash *chainhash.Hash bytes []byte } // transaction represents a database transaction. // // It can either be read-only or read-write and implements the database. // // Bucket interface. The transaction provides a root bucket against which all read and writes occur. type transaction struct { managed bool // Is the transaction managed? 
closed bool // Is the transaction closed? writable bool // Is the transaction writable? db *db // DB instance the tx was created from. snapshot *dbCacheSnapshot // Underlying snapshot for txns. metaBucket *bucket // The root metadata bucket. blockIdxBucket *bucket // The block index bucket. // Blocks that need to be stored on commit. // // The pendingBlocks map is kept to allow quick lookups of pending data by block hash. pendingBlocks map[chainhash.Hash]int pendingBlockData []pendingBlock // Keys that need to be stored or deleted on commit. pendingKeys *treap.Mutable pendingRemove *treap.Mutable // Active iterators that need to be notified when the pending keys have been updated so the cursors can properly // handle updates to the transaction state. activeIterLock sync.RWMutex activeIters []*treap.Iterator } // Enforce transaction implements the database.Tx interface. var _ database.Tx = (*transaction)(nil) // removeActiveIter removes the passed iterator from the list of active iterators against the pending keys treap. func (tx *transaction) removeActiveIter(iter *treap.Iterator) { // An indexing for loop is intentionally used over a range here as range does not reevaluate the slice on each // iteration nor does it adjust the index for the modified slice. tx.activeIterLock.Lock() for i := 0; i < len(tx.activeIters); i++ { if tx.activeIters[i] == iter { copy(tx.activeIters[i:], tx.activeIters[i+1:]) tx.activeIters[len(tx.activeIters)-1] = nil tx.activeIters = tx.activeIters[:len(tx.activeIters)-1] } } tx.activeIterLock.Unlock() } // addActiveIter adds the passed iterator to the list of active iterators for the pending keys treap. func (tx *transaction) addActiveIter(iter *treap.Iterator) { tx.activeIterLock.Lock() tx.activeIters = append(tx.activeIters, iter) tx.activeIterLock.Unlock() } // notifyActiveIters notifies all of the active iterators for the pending keys treap that it has been updated. 
func (tx *transaction) notifyActiveIters() { tx.activeIterLock.RLock() for _, iter := range tx.activeIters { iter.ForceReseek() } tx.activeIterLock.RUnlock() } // checkClosed returns an error if the the database or transaction is closed. func (tx *transaction) checkClosed() (e error) { // The transaction is no longer valid if it has been closed. if tx.closed { return makeDbErr(database.ErrTxClosed, errTxClosedStr, nil) } return nil } // hasKey returns whether or not the provided key exists in the database while taking into account the current // transaction state. func (tx *transaction) hasKey(key []byte) bool { // When the transaction is writable, check the pending transaction state first. if tx.writable { if tx.pendingRemove.Has(key) { return false } if tx.pendingKeys.Has(key) { return true } } // Consult the database cache and underlying database. return tx.snapshot.Has(key) } // putKey adds the provided key to the list of keys to be updated in the database when the transaction is committed. // // NOTE: This function must only be called on a writable transaction. // // Since it is an internal helper function, it does not check. func (tx *transaction) putKey(key, value []byte) (e error) { // Prevent the key from being deleted if it was previously scheduled to // be deleted on transaction commit. tx.pendingRemove.Delete(key) // Add the key/value pair to the list to be written on transaction commit. tx.pendingKeys.Put(key, value) tx.notifyActiveIters() return nil } // fetchKey attempts to fetch the provided key from the database cache ( and hence underlying database) while taking // into account the current transaction state. Returns nil if the key does not exist. func (tx *transaction) fetchKey(key []byte) []byte { // When the transaction is writable, check the pending transaction state first. 
if tx.writable { if tx.pendingRemove.Has(key) { return nil } if value := tx.pendingKeys.Get(key); value != nil { return value } } // Consult the database cache and underlying database. return tx.snapshot.Get(key) } // deleteKey adds the provided key to the list of keys to be deleted from the database when the transaction is // committed. // // The notify iterators flag is useful to delay notifying iterators about the changes during bulk deletes. // // NOTE: This function must only be called on a writable transaction. // // Since it is an internal helper function, it does not check. func (tx *transaction) deleteKey(key []byte, notifyIterators bool) { // Remove the key from the list of pendings keys to be written on transaction commit if needed. tx.pendingKeys.Delete(key) // Add the key to the list to be deleted on transaction commit. tx.pendingRemove.Put(key, nil) // Notify the active iterators about the change if the flag is set. if notifyIterators { tx.notifyActiveIters() } } // nextBucketID returns the next bucket ID to use for creating a new bucket. // // NOTE: This function must only be called on a writable transaction. // // Since it is an internal helper function, it does not check. func (tx *transaction) nextBucketID() ([4]byte, error) { // Load the currently highest used bucket ID. curIDBytes := tx.fetchKey(curBucketIDKeyName) curBucketNum := binary.BigEndian.Uint32(curIDBytes) // Increment and update the current bucket ID and return it. var nextBucketID [4]byte binary.BigEndian.PutUint32(nextBucketID[:], curBucketNum+1) if e := tx.putKey(curBucketIDKeyName, nextBucketID[:]); E.Chk(e) { return [4]byte{}, e } return nextBucketID, nil } // Metadata returns the top-most bucket for all metadata storage. // // This function is part of the database.Tx interface implementation. func (tx *transaction) Metadata() database.Bucket { return tx.metaBucket } // hasBlock returns whether or not a block with the given hash exists. 
func (tx *transaction) hasBlock(hash *chainhash.Hash) bool { // Return true if the block is pending to be written on commit since it exists from the viewpoint of this // transaction. if _, exists := tx.pendingBlocks[*hash]; exists { return true } return tx.hasKey(bucketizedKey(blockIdxBucketID, hash[:])) } // StoreBlock stores the provided block into the database. // // There are no checks to ensure the block connects to a previous block, contains double spends, or any additional // functionality such as transaction indexing. // // It simply stores the block in the database. // // Returns the following errors as required by the interface contract: // // - ErrBlockExists when the block hash already exists // // - ErrTxNotWritable if attempted against a read-only transaction // // - ErrTxClosed if the transaction has already been closed // // This function is part of the database.Tx interface implementation. func (tx *transaction) StoreBlock(block *block.Block) (e error) { // Ensure transaction state is valid. if e = tx.checkClosed(); E.Chk(e) { return e } // Ensure the transaction is writable. if !tx.writable { str := "store block requires a writable database transaction" return makeDbErr(database.ErrTxNotWritable, str, nil) } // Reject the block if it already exists. blockHash := block.Hash() if tx.hasBlock(blockHash) { str := fmt.Sprintf("block %s already exists", blockHash) return makeDbErr(database.ErrBlockExists, str, nil) } blockBytes, e := block.Bytes() if e != nil { str := fmt.Sprintf( "failed to get serialized bytes for block %s", blockHash, ) return makeDbErr(database.ErrDriverSpecific, str, e) } // Add the block to be stored to the list of pending blocks to store when the transaction is committed. // // Also add it to pending blocks map so it is easy to determine the block is pending based on the block hash. 
if tx.pendingBlocks == nil { tx.pendingBlocks = make(map[chainhash.Hash]int) } tx.pendingBlocks[*blockHash] = len(tx.pendingBlockData) tx.pendingBlockData = append( tx.pendingBlockData, pendingBlock{ hash: blockHash, bytes: blockBytes, }, ) // Tracef("added block %s to pending blocks", blockHash) return nil } // HasBlock returns whether or not a block with the given hash exists in the database. // // Returns the following errors as required by the interface contract: // // - ErrTxClosed if the transaction has already been closed // // This function is part of the database.Tx interface implementation. func (tx *transaction) HasBlock(hash *chainhash.Hash) (bool, error) { // Ensure transaction state is valid. if e := tx.checkClosed(); E.Chk(e) { return false, e } return tx.hasBlock(hash), nil } // HasBlocks returns whether or not the blocks with the provided hashes exist in the database. // // Returns the following errors as required by the interface contract: // // - ErrTxClosed if the transaction has already been closed // // This function is part of the database.Tx interface implementation. func (tx *transaction) HasBlocks(hashes []chainhash.Hash) ([]bool, error) { // Ensure transaction state is valid. if e := tx.checkClosed(); E.Chk(e) { return nil, e } results := make([]bool, len(hashes)) for i := range hashes { results[i] = tx.hasBlock(&hashes[i]) } return results, nil } // fetchBlockRow fetches the metadata stored in the block index for the provided hash. It will return ErrBlockNotFound // if there is no entry. func (tx *transaction) fetchBlockRow(hash *chainhash.Hash) ([]byte, error) { blockRow := tx.blockIdxBucket.Get(hash[:]) if blockRow == nil { str := fmt.Sprintf("block %s does not exist", hash) return nil, makeDbErr(database.ErrBlockNotFound, str, nil) } return blockRow, nil } // FetchBlockHeader returns the raw serialized bytes for the block header identified by the given hash. 
// // The raw bytes are in the format returned by Serialize on a wire.BlockHeader. // // Returns the following errors as required by the interface contract: // // - ErrBlockNotFound if the requested block hash does not exist // // - ErrTxClosed if the transaction has already been closed // // - ErrCorruption if the database has somehow become corrupted // // NOTE: The data returned by this function is only valid during a database transaction. Attempting to access it after a // transaction has ended results in undefined behavior. // // This constraint prevents additional data copies and allows support for memory-mapped database implementations. // // This function is part of the database.Tx interface implementation. func (tx *transaction) FetchBlockHeader(hash *chainhash.Hash) ([]byte, error) { return tx.FetchBlockRegion( &database.BlockRegion{ Hash: hash, Offset: 0, Len: blockHdrSize, }, ) } // FetchBlockHeaders returns the raw serialized bytes for the block headers identified by the given hashes. // // The raw bytes are in the format returned by Serialize on a wire.BlockHeader. // // Returns the following errors as required by the interface contract: // // - ErrBlockNotFound if the any of the requested block hashes do not exist // // - ErrTxClosed if the transaction has already been closed // // - ErrCorruption if the database has somehow become corrupted // // NOTE: The data returned by this function is only valid during a database transaction. Attempting to access it after a // transaction has ended results in undefined behavior. // // This constraint prevents additional data copies and allows support for memory-mapped database implementations. // // This function is part of the database.Tx interface implementation. 
func (tx *transaction) FetchBlockHeaders(hashes []chainhash.Hash) ([][]byte, error) { regions := make([]database.BlockRegion, len(hashes)) for i := range hashes { regions[i].Hash = &hashes[i] regions[i].Offset = 0 regions[i].Len = blockHdrSize } return tx.FetchBlockRegions(regions) } // FetchBlock returns the raw serialized bytes for the block identified by the given hash. The raw bytes are in the // format returned by Serialize on a wire.Block. // // Returns the following errors as required by the interface contract: // // - ErrBlockNotFound if the requested block hash does not exist // // - ErrTxClosed if the transaction has already been closed // // - ErrCorruption if the database has somehow become corrupted // // In addition, returns ErrDriverSpecific if any failures occur when reading the block files. // // NOTE: The data returned by this function is only valid during a database transaction. Attempting to access it after a // transaction has ended results in undefined behavior. // // This constraint prevents additional data copies and allows support for memory-mapped database implementations. // // This function is part of the database.Tx interface implementation. func (tx *transaction) FetchBlock(hash *chainhash.Hash) ([]byte, error) { // Ensure transaction state is valid. if e := tx.checkClosed(); E.Chk(e) { return nil, e } // When the block is pending to be written on commit return the bytes from there. if idx, exists := tx.pendingBlocks[*hash]; exists { return tx.pendingBlockData[idx].bytes, nil } // Lookup the location of the block in the files from the block index. blockRow, e := tx.fetchBlockRow(hash) if e != nil { return nil, e } location := deserializeBlockLoc(blockRow) // Read the block from the appropriate location. The function also performs a checksum over the data to detect data // corruption. 
blockBytes, e := tx.db.store.readBlock(hash, location) if e != nil { return nil, e } return blockBytes, nil } // FetchBlocks returns the raw serialized bytes for the blocks identified by the given hashes. // // The raw bytes are in the format returned by Serialize on a wire.Block. // // Returns the following errors as required by the interface contract: // // - ErrBlockNotFound if any of the requested block hashed do not exist // // - ErrTxClosed if the transaction has already been closed // // - ErrCorruption if the database has somehow become corrupted // // In addition, returns ErrDriverSpecific if any failures occur when reading the block files. // // NOTE: The data returned by this function is only valid during a database transaction. Attempting to access it after a // transaction has ended results in undefined behavior. // // This constraint prevents additional data copies and allows support for memory-mapped database implementations. // // This function is part of the database.Tx interface implementation. func (tx *transaction) FetchBlocks(hashes []chainhash.Hash) ([][]byte, error) { // Ensure transaction state is valid. if e := tx.checkClosed(); E.Chk(e) { return nil, e } // NOTE: This could check for the existence of all blocks before loading any of them which would be faster in the // failure case, however callers will not typically be calling this function with invalid values, so optimize for // the common case. Load the blocks. blocks := make([][]byte, len(hashes)) for i := range hashes { var e error blocks[i], e = tx.FetchBlock(&hashes[i]) if e != nil { return nil, e } } return blocks, nil } // fetchPendingRegion attempts to fetch the provided region from any block which are pending to be written on commit. // // It will return nil for the byte slice when the region references a block which is not pending. When the region does // reference a pending block, it is bounds checked and returns ErrBlockRegionInvalid if invalid. 
func (tx *transaction) fetchPendingRegion(region *database.BlockRegion) ([]byte, error) { // Nothing to do if the block is not pending to be written on commit. idx, exists := tx.pendingBlocks[*region.Hash] if !exists { return nil, nil } // Ensure the region is within the bounds of the block. blockBytes := tx.pendingBlockData[idx].bytes blockLen := uint32(len(blockBytes)) endOffset := region.Offset + region.Len if endOffset < region.Offset || endOffset > blockLen { str := fmt.Sprintf( "block %s region offset %d, length %d "+ "exceeds block length of %d", region.Hash, region.Offset, region.Len, blockLen, ) return nil, makeDbErr(database.ErrBlockRegionInvalid, str, nil) } // Return the bytes from the pending block. return blockBytes[region.Offset:endOffset:endOffset], nil } // FetchBlockRegion returns the raw serialized bytes for the given block region. // // For example, it is possible to directly extract Bitcoin transactions and/or scripts from a block with this function. // Depending on the backend implementation, this can provide significant savings by avoiding the need to load entire // blocks. // // The raw bytes are in the format returned by Serialize on a wire.Block and the Offset field in the provided // BlockRegion is zero-based and relative to the start of the block (byte 0). // // Returns the following errors as required by the interface contract: // // - ErrBlockNotFound if the requested block hash does not exist // // - ErrBlockRegionInvalid if the region exceeds the bounds of the associated block // // - ErrTxClosed if the transaction has already been closed // // - ErrCorruption if the database has somehow become corrupted // // In addition, returns ErrDriverSpecific if any failures occur when reading the block files. // // NOTE: The data returned by this function is only valid during a database transaction. Attempting to access it after a // transaction has ended results in undefined behavior. 
This constraint prevents additional data copies and allows // support for memory-mapped database implementations. This function is part of the database.Tx interface // implementation. func (tx *transaction) FetchBlockRegion(region *database.BlockRegion) ([]byte, error) { // Ensure transaction state is valid. if e := tx.checkClosed(); E.Chk(e) { return nil, e } // When the block is pending to be written on commit return the bytes from there. if tx.pendingBlocks != nil { regionBytes, e := tx.fetchPendingRegion(region) if e != nil { return nil, e } if regionBytes != nil { return regionBytes, nil } } // Lookup the location of the block in the files from the block index. blockRow, e := tx.fetchBlockRow(region.Hash) if e != nil { return nil, e } location := deserializeBlockLoc(blockRow) // Ensure the region is within the bounds of the block. endOffset := region.Offset + region.Len if endOffset < region.Offset || endOffset > location.blockLen { str := fmt.Sprintf( "block %s region offset %d, length %d "+ "exceeds block length of %d", region.Hash, region.Offset, region.Len, location.blockLen, ) return nil, makeDbErr(database.ErrBlockRegionInvalid, str, nil) } // Read the region from the appropriate disk block file. regionBytes, e := tx.db.store.readBlockRegion( location, region.Offset, region.Len, ) if e != nil { return nil, e } return regionBytes, nil } // FetchBlockRegions returns the raw serialized bytes for the given block regions. // // For example, it is possible to directly extract Bitcoin transactions and/or scripts from various blocks with this // function. Depending on the backend implementation, this can provide significant savings by avoiding the need to load // entire blocks. // // The raw bytes are in the format returned by Serialize on a wire.Block and the Offset fields in the provided // BlockRegions are zero-based and relative to the start of the block (byte 0). 
// // Returns the following errors as required by the interface contract: // // - ErrBlockNotFound if any of the request block hashes do not exist // // - ErrBlockRegionInvalid if one or more region exceed the bounds of the associated block // // - ErrTxClosed if the transaction has already been closed // // - ErrCorruption if the database has somehow become corrupted // // In addition, returns ErrDriverSpecific if any failures occur when reading the block files. // // NOTE: The data returned by this function is only valid during a database transaction. Attempting to access it after a // transaction has ended results in undefined behavior. This constraint prevents additional data copies and allows // support for memory-mapped database implementations. This function is part of the database.Tx interface // implementation. func (tx *transaction) FetchBlockRegions(regions []database.BlockRegion) ([][]byte, error) { // Ensure transaction state is valid. if e := tx.checkClosed(); E.Chk(e) { return nil, e } // NOTE: This could check for the existence of all blocks before deserializing the locations and building up the // fetch list which would be faster in the failure case, however callers will not typically be calling this function // with invalid values, so optimize for the common case. // // NOTE: A potential optimization here would be to combine adjacent regions to reduce the number of reads. // // In order to improve efficiency of loading the bulk data, first grab the block location for all of the requested // block hashes and txsort the reads by filenum:offset so that all reads are grouped by file and linear within each // file. // // This can result in quite a significant performance increase depending on how spread out the requested hashes are // by reducing the number of file open/closes and random accesses needed. 
// // The fetchList is intentionally allocated with a cap because some of the regions might be fetched from the pending // blocks and hence there is no need to fetch those from disk. blockRegions := make([][]byte, len(regions)) fetchList := make([]bulkFetchData, 0, len(regions)) for i := range regions { region := ®ions[i] // When the block is pending to be written on commit grab the bytes from there. if tx.pendingBlocks != nil { regionBytes, e := tx.fetchPendingRegion(region) if e != nil { return nil, e } if regionBytes != nil { blockRegions[i] = regionBytes continue } } // Lookup the location of the block in the files from the block index. blockRow, e := tx.fetchBlockRow(region.Hash) if e != nil { return nil, e } location := deserializeBlockLoc(blockRow) // Ensure the region is within the bounds of the block. endOffset := region.Offset + region.Len if endOffset < region.Offset || endOffset > location.blockLen { str := fmt.Sprintf( "block %s region offset %d, length "+ "%d exceeds block length of %d", region.Hash, region.Offset, region.Len, location.blockLen, ) return nil, makeDbErr(database.ErrBlockRegionInvalid, str, nil) } fetchList = append(fetchList, bulkFetchData{&location, i}) } sort.Sort(bulkFetchDataSorter(fetchList)) // Read all of the regions in the fetch list and set the results. for i := range fetchList { fetchData := &fetchList[i] ri := fetchData.replyIndex region := ®ions[ri] location := fetchData.blockLocation regionBytes, e := tx.db.store.readBlockRegion( *location, region.Offset, region.Len, ) if e != nil { return nil, e } blockRegions[ri] = regionBytes } return blockRegions, nil } // close marks the transaction closed then releases any pending data, the underlying snapshot, the transaction read // lock, and the write lock when the transaction is writable. func (tx *transaction) close() { tx.closed = true // Clear pending blocks that would have been written on commit. 
tx.pendingBlocks = nil tx.pendingBlockData = nil // Clear pending keys that would have been written or deleted on commit. tx.pendingKeys = nil tx.pendingRemove = nil // Release the snapshot. if tx.snapshot != nil { tx.snapshot.Release() tx.snapshot = nil } tx.db.closeLock.RUnlock() // Release the writer lock for writable transactions to unblock any other write transaction which are possibly // waiting. if tx.writable { tx.db.writeLock.Unlock() } } // writePendingAndCommit writes pending block data to the flat block files, updates the metadata with their locations as // well as the new current write location, and commits the metadata to the memory database cache. // // It also properly handles rollback in the case of failures. // // This function MUST only be called when there is pending data to be written. func (tx *transaction) writePendingAndCommit() (e error) { // Save the current block store write position for potential rollback. // // These variables are only updated here in this function and there can only be one write transaction active at a // time, so it's safe to store them for potential rollback. wc := tx.db.store.writeCursor wc.RLock() oldBlkFileNum := wc.curFileNum oldBlkOffset := wc.curOffset wc.RUnlock() // rollback is a closure that is used to rollback all writes to the block files. rollback := func() { // Rollback any modifications made to the block files if needed. tx.db.store.handleRollback(oldBlkFileNum, oldBlkOffset) } // Loop through all of the pending blocks to store and write them. for _, blockData := range tx.pendingBlockData { // Tracef("storing block %s", blockData.hash) location, e := tx.db.store.writeBlock(blockData.bytes) if e != nil { rollback() return e } // Add a record in the block index for the block. The record includes the location information needed to locate // the block on the filesystem as well as the block header since they are so commonly needed. 
blockRow := serializeBlockLoc(location) e = tx.blockIdxBucket.Put(blockData.hash[:], blockRow) if e != nil { rollback() return e } } // Update the metadata for the current write file and offset. writeRow := serializeWriteRow(wc.curFileNum, wc.curOffset) if e := tx.metaBucket.Put(writeLocKeyName, writeRow); E.Chk(e) { rollback() return convertErr("failed to store write cursor", e) } // Atomically update the database cache. // // The cache automatically handles flushing to the underlying persistent storage database. return tx.db.cache.commitTx(tx) } // Commit commits all changes that have been made to the root metadata bucket and all of its sub-buckets to the database // cache which is periodically synced to persistent storage. In addition, it commits all new blocks directly to // persistent storage bypassing the db cache. // // Blocks can be rather large so this help increase the amount of cache available for the metadata updates and is safe // since blocks are immutable. // // This function is part of the database.Tx interface implementation. func (tx *transaction) Commit() (e error) { // Prevent commits on managed transactions. if tx.managed { tx.close() panic("managed transaction commit not allowed") } // Ensure transaction state is valid. if e := tx.checkClosed(); E.Chk(e) { return e } // Regardless of whether the commit succeeds, the transaction is closed on return. defer tx.close() // Ensure the transaction is writable. if !tx.writable { str := "Commit requires a writable database transaction" return makeDbErr(database.ErrTxNotWritable, str, nil) } // Write pending data. The function will rollback if any errors occur. return tx.writePendingAndCommit() } // Rollback undoes all changes that have been made to the root bucket and all of its sub-buckets. // // This function is part of the database.Tx interface implementation. func (tx *transaction) Rollback() (e error) { // Prevent rollbacks on managed transactions. 
if tx.managed {
		tx.close()
		panic("managed transaction rollback not allowed")
	}
	// Ensure transaction state is valid.
	if e := tx.checkClosed(); E.Chk(e) {
		return e
	}
	tx.close()
	return nil
}

// db represents a collection of namespaces which are persisted and implements the database.DB interface. All database
// access is performed through transactions which are obtained through the specific Namespace.
type db struct {
	writeLock sync.Mutex   // Limit to one write transaction at a time.
	closeLock sync.RWMutex // Make database close block while txns active.
	closed    bool         // Is the database closed?
	store     *blockStore  // Handles read/writing blocks to flat files.
	cache     *dbCache     // Cache layer which wraps underlying leveldb DB.
}

// Enforce db implements the database.DB interface.
var _ database.DB = (*db)(nil)

// Type returns the database driver type the current database instance was created with.
//
// This function is part of the database.DB interface implementation.
func (db *db) Type() string {
	return dbType
}

// begin is the implementation function for the Begin database method.
//
// See its documentation for more details.
//
// This function is only separate because it returns the internal transaction which is used by the managed transaction
// code while the database method returns the interface.
//
// On failure every lock acquired here is released again before returning, and the returned transaction is nil.
func (db *db) begin(writable bool) (*transaction, error) {
	// Whenever a new writable transaction is started, grab the write lock to ensure only a single write transaction can
	// be active at the same time.
	//
	// This lock will not be released until the transaction is closed (via Rollback or Commit).
	if writable {
		db.writeLock.Lock()
	}
	// Whenever a new transaction is started, grab a read lock against the database to ensure Close will wait for the
	// transaction to finish.
	//
	// This lock will not be released until the transaction is closed (via Rollback or Commit).
	db.closeLock.RLock()
	if db.closed {
		db.closeLock.RUnlock()
		if writable {
			db.writeLock.Unlock()
		}
		return nil, makeDbErr(
			database.ErrDbNotOpen, errDbNotOpenStr,
			nil,
		)
	}
	// Grab a snapshot of the database cache (which in turn also handles the underlying database).
	snapshot, e := db.cache.Snapshot()
	if e != nil {
		db.closeLock.RUnlock()
		if writable {
			db.writeLock.Unlock()
		}
		return nil, e
	}
	// The metadata and block index buckets are internal-only buckets, so they have defined IDs.
	tx := &transaction{
		writable:      writable,
		db:            db,
		snapshot:      snapshot,
		pendingKeys:   treap.NewMutable(),
		pendingRemove: treap.NewMutable(),
	}
	tx.metaBucket = &bucket{tx: tx, id: metadataBucketID}
	tx.blockIdxBucket = &bucket{tx: tx, id: blockIdxBucketID}
	return tx, nil
}

// Begin starts a transaction which is either read-only or read-write depending
// on the specified flag.
//
// Multiple read-only transactions can be started simultaneously while only a
// single read-write transaction can be started at a time.
//
// The call will block when starting a read-write transaction when one is
// already open.
//
// NOTE: The transaction must be closed by calling Rollback or Commit on it when
// it is no longer needed.
//
// Failure to do so will result in unclaimed memory.
//
// On failure the returned database.Tx is a true nil interface value.
//
// This function is part of the database.DB interface implementation.
func (db *db) Begin(writable bool) (database.Tx, error) {
	tx, e := db.begin(writable)
	if e != nil {
		// Return a literal nil rather than the nil *transaction: converting a nil pointer to the database.Tx
		// interface would yield a non-nil interface value and defeat any `tx == nil` check made by the caller.
		return nil, e
	}
	return tx, nil
}

// rollbackOnPanic rolls the passed transaction back if the code in the calling
// function panics. This is needed since the mutex on a transaction must be
// released and a panic in called code would prevent that from happening.
//
// It must be invoked directly via defer so that recover observes the panic. After
// rolling back, the original panic value is re-raised.
//
// NOTE: This can only be handled manually for managed transactions since they
// control the life-cycle of the transaction.
//
// As the documentation on Begin calls out, callers opting to use manual
// transactions will have to ensure the transaction is rolled back on panic if
// they desire that functionality as well, or the database will fail to close
// since the read-lock will never be released.
func rollbackOnPanic(tx *transaction) {
	if err := recover(); err != nil {
		// Clear the managed flag first; Rollback panics on a transaction that is still marked as managed.
		tx.managed = false
		_ = tx.Rollback()
		panic(err)
	}
}

// View invokes the passed function in the context of a managed read-only
// transaction with the root bucket for the namespace.
//
// Any errors returned from the user-supplied function are returned from this
// function.
//
// This function is part of the database.DB interface implementation.
func (db *db) View(fn func(database.Tx) error) (e error) {
	// Start a read-only transaction.
	tx, e := db.begin(false)
	if e != nil {
		return e
	}
	// Since the user-provided function might panic, ensure the transaction releases
	// all mutexes and resources.
	//
	// There is no guarantee the caller won't use recover and keep going.
	//
	// Thus, the database must still be in a usable state on panics due to caller
	// issues.
	defer rollbackOnPanic(tx)
	// The managed flag is only set while the user function runs so that it cannot commit or roll back itself.
	tx.managed = true
	e = fn(tx)
	tx.managed = false
	if e != nil {
		// The error is ignored here because nothing was written yet and regardless of a rollback failure, the tx is
		// closed now anyways.
		_ = tx.Rollback()
		return e
	}
	return tx.Rollback()
}

// Update invokes the passed function in the context of a managed read-write transaction with the root bucket for the
// namespace.
//
// Any errors returned from the user-supplied function will cause the transaction to be rolled back and are returned
// from this function.
//
// Otherwise, the transaction is committed when the user-supplied function returns a nil error.
//
// This function is part of the database.DB interface implementation.
func (db *db) Update(fn func(database.Tx) error) (e error) {
	// Start a read-write transaction.
	tx, e := db.begin(true)
	if e != nil {
		return e
	}
	// Since the user-provided function might panic, ensure the transaction releases all mutexes and resources.
	//
	// There is no guarantee the caller won't use recover and keep going.
	//
	// Thus, the database must still be in a usable state on panics due to caller issues.
	defer rollbackOnPanic(tx)
	// The managed flag is only set while the user function runs so that it cannot commit or roll back itself.
	tx.managed = true
	e = fn(tx)
	tx.managed = false
	if e != nil {
		// The error is ignored here because nothing was written yet and regardless of a rollback failure, the tx is
		// closed now anyways.
		_ = tx.Rollback()
		return e
	}
	return tx.Commit()
}

// Close cleanly shuts down the database and syncs all data.
//
// It will block until all database transactions have been finalized (rolled back or committed). Calling Close on an
// already closed database returns a database.ErrDbNotOpen error.
//
// This function is part of the database.DB interface implementation.
func (db *db) Close() (e error) {
	// Since all transactions have a read lock on this mutex, this will cause Close to wait for all readers to complete.
	db.closeLock.Lock()
	defer db.closeLock.Unlock()
	if db.closed {
		return makeDbErr(database.ErrDbNotOpen, errDbNotOpenStr, nil)
	}
	db.closed = true
	// NOTE: Since the above lock waits for all transactions to finish and prevents any new ones from being started, it
	// is safe to flush the cache and clear all state without the individual locks.
	//
	// Close the database cache which will flush any existing entries to disk and close the underlying leveldb
	// database.
	//
	// Any error is saved and returned at the end after the remaining cleanup since the database will be marked closed
	// even if this fails given there is no good way for the caller to recover from a failure here anyways.
	closeErr := db.cache.Close()
	// Close any open flat files that house the blocks. Errors from closing the individual files are deliberately
	// ignored; only the cache close error is reported.
	wc := db.store.writeCursor
	if wc.curFile.file != nil {
		_ = wc.curFile.file.Close()
		wc.curFile.file = nil
	}
	for _, blockFile := range db.store.openBlockFiles {
		_ = blockFile.file.Close()
	}
	db.store.openBlockFiles = nil
	db.store.openBlocksLRU.Init()
	db.store.fileNumToLRUElem = nil
	return closeErr
}

// fileExists reports whether the named file or directory exists.
//
// Only a "does not exist" error from os.Stat yields false; any other Stat failure (for example a permission error) is
// still reported as existing.
func fileExists(name string) bool {
	var e error
	if _, e = os.Stat(name); E.Chk(e) {
		if os.IsNotExist(e) {
			return false
		}
	}
	return true
}

// initDB creates the initial buckets and values used by the package.
//
// All entries are written to ldb as a single batch; a write failure is converted with convertErr and returned.
//
// This is mainly in a separate function for testing purposes.
func initDB(ldb *leveldb.DB) (e error) {
	// The starting block file write cursor location is file num 0, offset 0.
	batch := new(leveldb.Batch)
	batch.Put(
		bucketizedKey(metadataBucketID, writeLocKeyName),
		serializeWriteRow(0, 0),
	)
	// Create block index bucket and set the current bucket id.
	//
	// NOTE: Since buckets are virtualized through the use of prefixes, there is no need to store the bucket index data
	// for the metadata bucket in the database. However, the first bucket ID to use does need to account for it to
	// ensure there are no key collisions.
	batch.Put(
		bucketIndexKey(metadataBucketID, blockIdxBucketName),
		blockIdxBucketID[:],
	)
	batch.Put(curBucketIDKeyName, blockIdxBucketID[:])
	// Write everything as a single batch. The named result is assigned (not shadowed) so there is a single error
	// variable in the function.
	if e = ldb.Write(batch, nil); E.Chk(e) {
		str := fmt.Sprintf(
			"failed to initialize metadata database: %v",
			e,
		)
		return convertErr(str, e)
	}
	return nil
}

// openDB opens the database at the provided path.
//
// A database.ErrDbDoesNotExist error is returned if the database doesn't exist and the create flag is not set.
func openDB(dbPath string, network wire.BitcoinNet, create bool) (database.DB, error) {
	// Return an error if the database doesn't exist and the create flag is not set.
	metadataDbPath := filepath.Join(dbPath, metadataDbName)
	dbExists := fileExists(metadataDbPath)
	if !create && !dbExists {
		str := fmt.Sprintf("database %q does not exist", metadataDbPath)
		return nil, makeDbErr(database.ErrDbDoesNotExist, str, nil)
	}
	// Ensure the full path to the database exists.
	if !dbExists {
		// The error can be ignored here since the call to leveldb.OpenFile will fail if the directory couldn't be
		// created.
		_ = os.MkdirAll(dbPath, 0700)
	}
	// Open the metadata database (will create it if needed). ErrorIfExist mirrors the create flag, so a create
	// request is set up to fail when a metadata database is already present.
	opts := opt.Options{
		ErrorIfExist: create,
		Strict:       opt.DefaultStrict,
		Compression:  opt.NoCompression,
		Filter:       filter.NewBloomFilter(10),
	}
	ldb, e := leveldb.OpenFile(metadataDbPath, &opts)
	if e != nil {
		return nil, convertErr(e.Error(), e)
	}
	// Create the block store which includes scanning the existing flat block files to find what the current write
	// cursor position is according to the data that is actually on disk.
	//
	// Also create the database cache which wraps the underlying leveldb database to provide write caching.
	store := newBlockStore(dbPath, network)
	cache := newDbCache(ldb, store, defaultCacheSize, defaultFlushSecs)
	pdb := &db{store: store, cache: cache}
	// Perform any reconciliation needed between the block and metadata as well as database initialization, if needed.
	return reconcileDB(pdb, create)
}