db.go raw

   1  package ffldb
   2  
   3  import (
   4  	"bytes"
   5  	"encoding/binary"
   6  	"fmt"
   7  	"github.com/p9c/p9/pkg/block"
   8  	"os"
   9  	"path/filepath"
  10  	"runtime"
  11  	"sort"
  12  	"sync"
  13  	
  14  	"github.com/btcsuite/goleveldb/leveldb"
  15  	"github.com/btcsuite/goleveldb/leveldb/comparer"
  16  	ldberrors "github.com/btcsuite/goleveldb/leveldb/errors"
  17  	"github.com/btcsuite/goleveldb/leveldb/filter"
  18  	"github.com/btcsuite/goleveldb/leveldb/iterator"
  19  	"github.com/btcsuite/goleveldb/leveldb/opt"
  20  	"github.com/btcsuite/goleveldb/leveldb/util"
  21  	
  22  	"github.com/p9c/p9/pkg/chainhash"
  23  	"github.com/p9c/p9/pkg/database"
  24  	"github.com/p9c/p9/pkg/util/treap"
  25  	"github.com/p9c/p9/pkg/wire"
  26  )
  27  
  28  const (
  29  	// metadataDbName is the name used for the metadata database.
  30  	metadataDbName = "metadata"
  31  	// blockHdrSize is the size of a block header.
  32  	//
  33  	// This is simply the constant from wire and is only provided here for convenience since wire.MaxBlockHeaderPayload
  34  	// is quite long.
  35  	blockHdrSize = wire.MaxBlockHeaderPayload
  36  	// blockHdrOffset defines the offsets into a block index row for the block header.
  37  	//
  38  	// The serialized block index row format is:
  39  	//
  40  	//   <blocklocation><blockheader>
  41  	blockHdrOffset = blockLocSize
  42  )
  43  
  44  var (
  45  	// byteOrder is the preferred byte order used through the database and block files. Sometimes big endian will be
  46  	// used to allow ordered byte sortable integer values.
  47  	byteOrder = binary.LittleEndian
  48  	// bucketIndexPrefix is the prefix used for all entries in the bucket index.
  49  	bucketIndexPrefix = []byte("bidx")
  50  	// curBucketIDKeyName is the name of the key used to keep track of the current bucket ID counter.
  51  	curBucketIDKeyName = []byte("bidx-cbid")
  52  	// metadataBucketID is the ID of the top-level metadata bucket. It is the value 0 encoded as an unsigned big-endian
  53  	// uint32.
  54  	metadataBucketID = [4]byte{}
  55  	// blockIdxBucketID is the ID of the internal block metadata bucket. It is the value 1 encoded as an unsigned
  56  	// big-endian uint32.
  57  	blockIdxBucketID = [4]byte{0x00, 0x00, 0x00, 0x01}
  58  	// blockIdxBucketName is the bucket used internally to track block metadata.
  59  	blockIdxBucketName = []byte("ffldb-blockidx")
  60  	// writeLocKeyName is the key used to store the current write file location.
  61  	writeLocKeyName = []byte("ffldb-writeloc")
  62  )
  63  
  64  // Common error strings.
  65  const (
  66  	// errDbNotOpenStr is the text to use for the database.ErrDbNotOpen error code.
  67  	errDbNotOpenStr = "database is not open"
  68  	// errTxClosedStr is the text to use for the database.ErrTxClosed error code.
  69  	errTxClosedStr = "database tx is closed"
  70  )
  71  
  72  // bulkFetchData is allows a block location to be specified along with the index it was requested from.
  73  //
  74  // This in turn allows the bulk data loading functions to txsort the data accesses based on the location to improve
  75  // performance while keeping track of which result the data is for.
  76  type bulkFetchData struct {
  77  	*blockLocation
  78  	replyIndex int
  79  }
  80  
  81  // bulkFetchDataSorter implements txsort.
  82  //
  83  // Interface to allow a slice of bulkFetchData to be sorted. In particular it sorts by file and then offset so that
  84  // reads from files are grouped and linear.
  85  type bulkFetchDataSorter []bulkFetchData
  86  
  87  // Len returns the number of items in the slice.
  88  //
  89  // It is part of the sort.Interface implementation.
  90  func (s bulkFetchDataSorter) Len() int {
  91  	return len(s)
  92  }
  93  
  94  // Swap swaps the items at the passed indices.
  95  //
  96  // It is part of the sort.Interface implementation.
  97  func (s bulkFetchDataSorter) Swap(i, j int) {
  98  	s[i], s[j] = s[j], s[i]
  99  }
 100  
 101  // Less returns whether the item with index i should txsort before the item with index j.
 102  //
 103  // It is part of the sort.Interface implementation.
 104  func (s bulkFetchDataSorter) Less(i, j int) bool {
 105  	if s[i].blockFileNum < s[j].blockFileNum {
 106  		return true
 107  	}
 108  	if s[i].blockFileNum > s[j].blockFileNum {
 109  		return false
 110  	}
 111  	return s[i].fileOffset < s[j].fileOffset
 112  }
 113  
 114  // makeDbErr creates a database.DBError given a set of arguments.
 115  func makeDbErr(c database.ErrorCode, desc string, e error) database.DBError {
 116  	return database.DBError{ErrorCode: c, Description: desc, Err: e}
 117  }
 118  
 119  // convertErr converts the passed leveldb error into a database error with an equivalent error code and the passed
 120  // description.
 121  //
 122  // It also sets the passed error as the underlying error.
 123  func convertErr(desc string, ldbErr error) database.DBError {
 124  	// Use the driver-specific error code by default.
 125  	//
 126  	// The code below will update this with the converted error if it's recognized.
 127  	var code = database.ErrDriverSpecific
 128  	switch {
 129  	// Database corruption errors.
 130  	case ldberrors.IsCorrupted(ldbErr):
 131  		code = database.ErrCorruption
 132  	// Database open/create errors.
 133  	case ldbErr == leveldb.ErrClosed:
 134  		code = database.ErrDbNotOpen
 135  	// Transaction errors.
 136  	case ldbErr == leveldb.ErrSnapshotReleased:
 137  		code = database.ErrTxClosed
 138  	case ldbErr == leveldb.ErrIterReleased:
 139  		code = database.ErrTxClosed
 140  	}
 141  	return database.DBError{ErrorCode: code, Description: desc, Err: ldbErr}
 142  }
 143  
 144  // copySlice returns a copy of the passed slice.
 145  //
 146  // This is mostly used to copy leveldb iterator keys and values since they are only valid until the iterator is moved
 147  // instead of during the entirety of the transaction.
 148  func copySlice(slice []byte) []byte {
 149  	ret := make([]byte, len(slice))
 150  	copy(ret, slice)
 151  	return ret
 152  }
 153  
 154  // cursor is an internal type used to represent a cursor over key/value pairs and nested buckets of a bucket and
 155  // implements the database.Cursor interface.
 156  type cursor struct {
 157  	bucket      *bucket
 158  	dbIter      iterator.Iterator
 159  	pendingIter iterator.Iterator
 160  	currentIter iterator.Iterator
 161  }
 162  
 163  // Enforce cursor implements the database.Cursor interface.
 164  var _ database.Cursor = (*cursor)(nil)
 165  
 166  // Bucket returns the bucket the cursor was created for.
 167  //
 168  // This function is part of the database.Cursor interface implementation.
 169  func (c *cursor) Bucket() database.Bucket {
 170  	// Ensure transaction state is valid.
 171  	if e := c.bucket.tx.checkClosed(); E.Chk(e) {
 172  		return nil
 173  	}
 174  	return c.bucket
 175  }
 176  
 177  // Delete removes the current key/value pair the cursor is at without invalidating the cursor.
 178  //
 179  // Returns the following errors as required by the interface contract:
 180  //
 181  //   - ErrIncompatibleValue if attempted when the cursor points to a nested bucket
 182  //
 183  //   - ErrTxNotWritable if attempted against a read-only transaction
 184  //
 185  //   - ErrTxClosed if the transaction has already been closed
 186  //
 187  // This function is part of the database.Cursor interface implementation.
 188  func (c *cursor) Delete() (e error) {
 189  	// Ensure transaction state is valid.
 190  	if e := c.bucket.tx.checkClosed(); E.Chk(e) {
 191  		return e
 192  	}
 193  	// DBError if the cursor is exhausted.
 194  	if c.currentIter == nil {
 195  		str := "cursor is exhausted"
 196  		return makeDbErr(database.ErrIncompatibleValue, str, nil)
 197  	}
 198  	// Do not allow buckets to be deleted via the cursor.
 199  	key := c.currentIter.Key()
 200  	if bytes.HasPrefix(key, bucketIndexPrefix) {
 201  		str := "buckets may not be deleted from a cursor"
 202  		return makeDbErr(database.ErrIncompatibleValue, str, nil)
 203  	}
 204  	c.bucket.tx.deleteKey(copySlice(key), true)
 205  	return nil
 206  }
 207  
 208  // skipPendingUpdates skips any keys at the current database iterator position that are being updated by the
 209  // transaction.
 210  //
 211  // The forwards flag indicates the direction the cursor is moving.
 212  func (c *cursor) skipPendingUpdates(forwards bool) {
 213  	for c.dbIter.Valid() {
 214  		var skip bool
 215  		key := c.dbIter.Key()
 216  		if c.bucket.tx.pendingRemove.Has(key) {
 217  			skip = true
 218  		} else if c.bucket.tx.pendingKeys.Has(key) {
 219  			skip = true
 220  		}
 221  		if !skip {
 222  			break
 223  		}
 224  		if forwards {
 225  			c.dbIter.Next()
 226  		} else {
 227  			c.dbIter.Prev()
 228  		}
 229  	}
 230  }
 231  
 232  // chooseIterator first skips any entries in the database iterator that are being updated by the transaction and sets
 233  // the current iterator to the appropriate iterator depending on their validity and the order they compare in while
 234  // taking into account the direction flag.
 235  //
 236  // When the cursor is being moved forwards and both iterators are valid, the iterator with the smaller key is chosen and
 237  // vice versa when the cursor is being moved backwards.
 238  func (c *cursor) chooseIterator(forwards bool) bool {
 239  	// Skip any keys at the current database iterator position that are being updated by the transaction.
 240  	c.skipPendingUpdates(forwards)
 241  	// When both iterators are exhausted, the cursor is exhausted too.
 242  	if !c.dbIter.Valid() && !c.pendingIter.Valid() {
 243  		c.currentIter = nil
 244  		return false
 245  	}
 246  	// Choose the database iterator when the pending keys iterator is exhausted.
 247  	if !c.pendingIter.Valid() {
 248  		c.currentIter = c.dbIter
 249  		return true
 250  	}
 251  	// Choose the pending keys iterator when the database iterator is exhausted.
 252  	if !c.dbIter.Valid() {
 253  		c.currentIter = c.pendingIter
 254  		return true
 255  	}
 256  	// Both iterators are valid, so choose the iterator with either the smaller or larger key depending on the forwards
 257  	// flag.
 258  	compare := bytes.Compare(c.dbIter.Key(), c.pendingIter.Key())
 259  	if (forwards && compare > 0) || (!forwards && compare < 0) {
 260  		c.currentIter = c.pendingIter
 261  	} else {
 262  		c.currentIter = c.dbIter
 263  	}
 264  	return true
 265  }
 266  
 267  // First positions the cursor at the first key/value pair and returns whether or not the pair exists. This function is
 268  // part of the database.
 269  //
 270  // Cursor interface implementation.
 271  func (c *cursor) First() bool {
 272  	// Ensure transaction state is valid.
 273  	if e := c.bucket.tx.checkClosed(); E.Chk(e) {
 274  		return false
 275  	}
 276  	// Seek to the first key in both the database and pending iterators and choose the iterator that is both valid and
 277  	// has the smaller key.
 278  	c.dbIter.First()
 279  	c.pendingIter.First()
 280  	return c.chooseIterator(true)
 281  }
 282  
 283  // Last positions the cursor at the last key/value pair and returns whether or not the pair exists. This function is
 284  // part of the database.
 285  //
 286  // Cursor interface implementation.
 287  func (c *cursor) Last() bool {
 288  	// Ensure transaction state is valid.
 289  	if e := c.bucket.tx.checkClosed(); E.Chk(e) {
 290  		return false
 291  	}
 292  	// Seek to the last key in both the database and pending iterators and choose the iterator that is both valid and
 293  	// has the larger key.
 294  	c.dbIter.Last()
 295  	c.pendingIter.Last()
 296  	return c.chooseIterator(false)
 297  }
 298  
 299  // Next moves the cursor one key/value pair forward and returns whether or not the pair exists. This function is part of
 300  // the database.
 301  //
 302  // Cursor interface implementation.
 303  func (c *cursor) Next() bool {
 304  	// Ensure transaction state is valid.
 305  	if e := c.bucket.tx.checkClosed(); E.Chk(e) {
 306  		return false
 307  	}
 308  	// Nothing to return if cursor is exhausted.
 309  	if c.currentIter == nil {
 310  		return false
 311  	}
 312  	// Move the current iterator to the next entry and choose the iterator that is both valid and has the smaller key.
 313  	c.currentIter.Next()
 314  	return c.chooseIterator(true)
 315  }
 316  
 317  // Prev moves the cursor one key/value pair backward and returns whether or not the pair exists. This function is part
 318  // of the database.
 319  //
 320  // Cursor interface implementation.
 321  func (c *cursor) Prev() bool {
 322  	// Ensure transaction state is valid.
 323  	if e := c.bucket.tx.checkClosed(); E.Chk(e) {
 324  		return false
 325  	}
 326  	// Nothing to return if cursor is exhausted.
 327  	if c.currentIter == nil {
 328  		return false
 329  	}
 330  	// Move the current iterator to the previous entry and choose the iterator that is both valid and has the larger
 331  	// key.
 332  	c.currentIter.Prev()
 333  	return c.chooseIterator(false)
 334  }
 335  
 336  // Seek positions the cursor at the first key/value pair that is greater than or equal to the passed seek key.
 337  //
 338  // Returns false if no suitable key was found.
 339  //
 340  // This function is part of the database.Cursor interface implementation.
 341  func (c *cursor) Seek(seek []byte) bool {
 342  	// Ensure transaction state is valid.
 343  	if e := c.bucket.tx.checkClosed(); E.Chk(e) {
 344  		return false
 345  	}
 346  	// Seek to the provided key in both the database and pending iterators then choose the iterator that is both valid
 347  	// and has the larger key.
 348  	seekKey := bucketizedKey(c.bucket.id, seek)
 349  	c.dbIter.Seek(seekKey)
 350  	c.pendingIter.Seek(seekKey)
 351  	return c.chooseIterator(true)
 352  }
 353  
 354  // rawKey returns the current key the cursor is pointing to without stripping the current bucket prefix or bucket index
 355  // prefix.
 356  func (c *cursor) rawKey() []byte {
 357  	// Nothing to return if cursor is exhausted.
 358  	if c.currentIter == nil {
 359  		return nil
 360  	}
 361  	return copySlice(c.currentIter.Key())
 362  }
 363  
 364  // Key returns the current key the cursor is pointing to.
 365  //
 366  // This function is part of the database.Cursor interface implementation.
 367  func (c *cursor) Key() []byte {
 368  	// Ensure transaction state is valid.
 369  	if e := c.bucket.tx.checkClosed(); E.Chk(e) {
 370  		return nil
 371  	}
 372  	// Nothing to return if cursor is exhausted.
 373  	if c.currentIter == nil {
 374  		return nil
 375  	}
 376  	// Slice out the actual key name and make a copy since it is no longer valid after iterating to the next item.
 377  	//
 378  	// The key is after the bucket index prefix and parent ID when the cursor is pointing to a nested bucket.
 379  	key := c.currentIter.Key()
 380  	if bytes.HasPrefix(key, bucketIndexPrefix) {
 381  		key = key[len(bucketIndexPrefix)+4:]
 382  		return copySlice(key)
 383  	}
 384  	// The key is after the bucket ID when the cursor is pointing to a normal entry.
 385  	key = key[len(c.bucket.id):]
 386  	return copySlice(key)
 387  }
 388  
 389  // rawValue returns the current value the cursor is pointing to without stripping without filtering bucket index values.
 390  func (c *cursor) rawValue() []byte {
 391  	// Nothing to return if cursor is exhausted.
 392  	if c.currentIter == nil {
 393  		return nil
 394  	}
 395  	return copySlice(c.currentIter.Value())
 396  }
 397  
 398  // Value returns the current value the cursor is pointing to.
 399  //
 400  // This will be nil for nested buckets.
 401  //
 402  // This function is part of the database.Cursor interface implementation.
 403  func (c *cursor) Value() []byte {
 404  	// Ensure transaction state is valid.
 405  	if e := c.bucket.tx.checkClosed(); E.Chk(e) {
 406  		return nil
 407  	}
 408  	// Nothing to return if cursor is exhausted.
 409  	if c.currentIter == nil {
 410  		return nil
 411  	}
 412  	// Return nil for the value when the cursor is pointing to a nested
 413  	// bucket.
 414  	if bytes.HasPrefix(c.currentIter.Key(), bucketIndexPrefix) {
 415  		return nil
 416  	}
 417  	return copySlice(c.currentIter.Value())
 418  }
 419  
 420  // cursorType defines the type of cursor to create.
 421  type cursorType int
 422  
 423  // The following constants define the allowed cursor types.
 424  const (
 425  	// ctKeys iterates through all of the keys in a given bucket.
 426  	ctKeys cursorType = iota
 427  	// ctBuckets iterates through all directly nested buckets in a given bucket.
 428  	ctBuckets
 429  	// ctFull iterates through both the keys and the directly nested buckets
 430  	// in a given bucket.
 431  	ctFull
 432  )
 433  
 434  // cursorFinalizer is either invoked when a cursor is being garbage collected or called manually to ensure the
 435  // underlying cursor iterators are released.
 436  func cursorFinalizer(c *cursor) {
 437  	c.dbIter.Release()
 438  	c.pendingIter.Release()
 439  }
 440  
 441  // newCursor returns a new cursor for the given bucket, bucket ID, and cursor type.
 442  //
 443  // NOTE: The caller is responsible for calling the cursorFinalizer function on the returned cursor.
 444  func newCursor(b *bucket, bucketID []byte, cursorTyp cursorType) *cursor {
 445  	var dbIter, pendingIter iterator.Iterator
 446  	switch cursorTyp {
 447  	case ctKeys:
 448  		keyRange := util.BytesPrefix(bucketID)
 449  		dbIter = b.tx.snapshot.NewIterator(keyRange)
 450  		pendingKeyIter := newLdbTreapIter(b.tx, keyRange)
 451  		pendingIter = pendingKeyIter
 452  	case ctBuckets:
 453  		// The serialized bucket index key format is:
 454  		//
 455  		//   <bucketindexprefix><parentbucketid><bucketname>
 456  		//
 457  		// Create an iterator for the both the database and the pending keys which are prefixed by the bucket index
 458  		// identifier and the provided bucket ID.
 459  		prefix := make([]byte, len(bucketIndexPrefix)+4)
 460  		copy(prefix, bucketIndexPrefix)
 461  		copy(prefix[len(bucketIndexPrefix):], bucketID)
 462  		bucketRange := util.BytesPrefix(prefix)
 463  		dbIter = b.tx.snapshot.NewIterator(bucketRange)
 464  		pendingBucketIter := newLdbTreapIter(b.tx, bucketRange)
 465  		pendingIter = pendingBucketIter
 466  	case ctFull:
 467  		fallthrough
 468  	default:
 469  		// The serialized bucket index key format is:
 470  		//
 471  		//   <bucketindexprefix><parentbucketid><bucketname>
 472  		prefix := make([]byte, len(bucketIndexPrefix)+4)
 473  		copy(prefix, bucketIndexPrefix)
 474  		copy(prefix[len(bucketIndexPrefix):], bucketID)
 475  		bucketRange := util.BytesPrefix(prefix)
 476  		keyRange := util.BytesPrefix(bucketID)
 477  		// Since both keys and buckets are needed from the database, create an individual iterator for each prefix and
 478  		// then create a merged iterator from them.
 479  		dbKeyIter := b.tx.snapshot.NewIterator(keyRange)
 480  		dbBucketIter := b.tx.snapshot.NewIterator(bucketRange)
 481  		iters := []iterator.Iterator{dbKeyIter, dbBucketIter}
 482  		dbIter = iterator.NewMergedIterator(
 483  			iters,
 484  			comparer.DefaultComparer, true,
 485  		)
 486  		// Since both keys and buckets are needed from the pending keys, create an individual iterator for each prefix
 487  		// and then create a merged iterator from them.
 488  		pendingKeyIter := newLdbTreapIter(b.tx, keyRange)
 489  		pendingBucketIter := newLdbTreapIter(b.tx, bucketRange)
 490  		iters = []iterator.Iterator{pendingKeyIter, pendingBucketIter}
 491  		pendingIter = iterator.NewMergedIterator(
 492  			iters,
 493  			comparer.DefaultComparer, true,
 494  		)
 495  	}
 496  	// Create the cursor using the iterators.
 497  	return &cursor{bucket: b, dbIter: dbIter, pendingIter: pendingIter}
 498  }
 499  
 500  // bucket is an internal type used to represent a collection of key/value pairs and implements the database.Bucket
 501  // interface.
 502  type bucket struct {
 503  	tx *transaction
 504  	id [4]byte
 505  }
 506  
 507  // Enforce bucket implements the database.Bucket interface.
 508  var _ database.Bucket = (*bucket)(nil)
 509  
 510  // bucketIndexKey returns the actual key to use for storing and retrieving a child bucket in the bucket index.
 511  //
 512  // This is required because additional information is needed to distinguish nested buckets with the same name.
 513  func bucketIndexKey(parentID [4]byte, key []byte) []byte {
 514  	// The serialized bucket index key format is:
 515  	//
 516  	//   <bucketindexprefix><parentbucketid><bucketname>
 517  	indexKey := make([]byte, len(bucketIndexPrefix)+4+len(key))
 518  	copy(indexKey, bucketIndexPrefix)
 519  	copy(indexKey[len(bucketIndexPrefix):], parentID[:])
 520  	copy(indexKey[len(bucketIndexPrefix)+4:], key)
 521  	return indexKey
 522  }
 523  
 524  // bucketizedKey returns the actual key to use for storing and retrieving a key for the provided bucket ID.
 525  //
 526  // This is required because bucketizing is handled through the use of a unique prefix per bucket.
 527  func bucketizedKey(bucketID [4]byte, key []byte) []byte {
 528  	// The serialized block index key format is:
 529  	//   <bucketid><key>
 530  	bKey := make([]byte, 4+len(key))
 531  	copy(bKey, bucketID[:])
 532  	copy(bKey[4:], key)
 533  	return bKey
 534  }
 535  
 536  // Bucket retrieves a nested bucket with the given key.
 537  //
 538  // Returns nil if the bucket does not exist.
 539  //
 540  // This function is part of the database.Bucket interface implementation.
 541  func (b *bucket) Bucket(key []byte) database.Bucket {
 542  	// Ensure transaction state is valid.
 543  	if e := b.tx.checkClosed(); E.Chk(e) {
 544  		return nil
 545  	}
 546  	// Attempt to fetch the ID for the child bucket. The bucket does not exist if the bucket index entry does not exist.
 547  	childID := b.tx.fetchKey(bucketIndexKey(b.id, key))
 548  	if childID == nil {
 549  		return nil
 550  	}
 551  	childBucket := &bucket{tx: b.tx}
 552  	copy(childBucket.id[:], childID)
 553  	return childBucket
 554  }
 555  
 556  // CreateBucket creates and returns a new nested bucket with the given key.
 557  //
 558  // Returns the following errors as required by the interface contract:
 559  //
 560  //   - ErrBucketExists if the bucket already exists
 561  //
 562  //   - ErrBucketNameRequired if the key is empty
 563  //
 564  //   - ErrIncompatibleValue if the key is otherwise invalid for the particular implementation
 565  //
 566  //   - ErrTxNotWritable if attempted against a read-only transaction
 567  //
 568  //   - ErrTxClosed if the transaction has already been closed
 569  //
 570  // This function is part of the database.Bucket interface implementation.
 571  func (b *bucket) CreateBucket(key []byte) (database.Bucket, error) {
 572  	// Ensure transaction state is valid.
 573  	if e := b.tx.checkClosed(); E.Chk(e) {
 574  		return nil, e
 575  	}
 576  	// Ensure the transaction is writable.
 577  	if !b.tx.writable {
 578  		str := "create bucket requires a writable database transaction"
 579  		return nil, makeDbErr(database.ErrTxNotWritable, str, nil)
 580  	}
 581  	// Ensure a key was provided.
 582  	if len(key) == 0 {
 583  		str := "create bucket requires a key"
 584  		return nil, makeDbErr(database.ErrBucketNameRequired, str, nil)
 585  	}
 586  	// Ensure bucket does not already exist.
 587  	bidxKey := bucketIndexKey(b.id, key)
 588  	if b.tx.hasKey(bidxKey) {
 589  		str := "bucket already exists"
 590  		return nil, makeDbErr(database.ErrBucketExists, str, nil)
 591  	}
 592  	// Find the appropriate next bucket ID to use for the new bucket. In the case of the special internal block index,
 593  	// keep the fixed ID.
 594  	var childID [4]byte
 595  	if b.id == metadataBucketID && bytes.Equal(key, blockIdxBucketName) {
 596  		childID = blockIdxBucketID
 597  	} else {
 598  		var e error
 599  		childID, e = b.tx.nextBucketID()
 600  		if e != nil {
 601  			return nil, e
 602  		}
 603  	}
 604  	// Add the new bucket to the bucket index.
 605  	if e := b.tx.putKey(bidxKey, childID[:]); E.Chk(e) {
 606  		str := fmt.Sprintf("failed to create bucket with key %q", key)
 607  		return nil, convertErr(str, e)
 608  	}
 609  	return &bucket{tx: b.tx, id: childID}, nil
 610  }
 611  
 612  // CreateBucketIfNotExists creates and returns a new nested bucket with the given key if it does not already exist.
 613  //
 614  // Returns the following errors as required by the interface contract:
 615  //
 616  //   - ErrBucketNameRequired if the key is empty
 617  //
 618  //   - ErrIncompatibleValue if the key is otherwise invalid for the particular implementation
 619  //
 620  //   - ErrTxNotWritable if attempted against a read-only transaction
 621  //
 622  //   - ErrTxClosed if the transaction has already been closed
 623  //
 624  // This function is part of the database.Bucket interface implementation.
 625  func (b *bucket) CreateBucketIfNotExists(key []byte) (database.Bucket, error) {
 626  	// Ensure transaction state is valid.
 627  	if e := b.tx.checkClosed(); E.Chk(e) {
 628  		return nil, e
 629  	}
 630  	// Ensure the transaction is writable.
 631  	if !b.tx.writable {
 632  		str := "create bucket requires a writable database transaction"
 633  		return nil, makeDbErr(database.ErrTxNotWritable, str, nil)
 634  	}
 635  	// Return existing bucket if it already exists, otherwise create it.
 636  	if bucket := b.Bucket(key); bucket != nil {
 637  		return bucket, nil
 638  	}
 639  	return b.CreateBucket(key)
 640  }
 641  
 642  // DeleteBucket removes a nested bucket with the given key.
 643  //
 644  // Returns the following errors as required by the interface contract:
 645  //
 646  //   - ErrBucketNotFound if the specified bucket does not exist
 647  //
 648  //   - ErrTxNotWritable if attempted against a read-only transaction
 649  //
 650  //   - ErrTxClosed if the transaction has already been closed
 651  //
 652  // This function is part of the database.Bucket interface implementation.
 653  func (b *bucket) DeleteBucket(key []byte) (e error) {
 654  	// Ensure transaction state is valid.
 655  	if e := b.tx.checkClosed(); E.Chk(e) {
 656  		return e
 657  	}
 658  	// Ensure the transaction is writable.
 659  	if !b.tx.writable {
 660  		str := "delete bucket requires a writable database transaction"
 661  		return makeDbErr(database.ErrTxNotWritable, str, nil)
 662  	}
 663  	// Attempt to fetch the ID for the child bucket.
 664  	//
 665  	// The bucket does not exist if the bucket index entry does not exist.
 666  	//
 667  	// In the case of the special internal block index, keep the fixed ID.
 668  	bidxKey := bucketIndexKey(b.id, key)
 669  	childID := b.tx.fetchKey(bidxKey)
 670  	if childID == nil {
 671  		str := fmt.Sprintf("bucket %q does not exist", key)
 672  		return makeDbErr(database.ErrBucketNotFound, str, nil)
 673  	}
 674  	// Remove all nested buckets and their keys.
 675  	childIDs := [][]byte{childID}
 676  	for len(childIDs) > 0 {
 677  		childID = childIDs[len(childIDs)-1]
 678  		childIDs = childIDs[:len(childIDs)-1]
 679  		// Delete all keys in the nested bucket.
 680  		keyCursor := newCursor(b, childID, ctKeys)
 681  		for ok := keyCursor.First(); ok; ok = keyCursor.Next() {
 682  			b.tx.deleteKey(keyCursor.rawKey(), false)
 683  		}
 684  		cursorFinalizer(keyCursor)
 685  		// Iterate through all nested buckets.
 686  		bucketCursor := newCursor(b, childID, ctBuckets)
 687  		for ok := bucketCursor.First(); ok; ok = bucketCursor.Next() {
 688  			// Push the id of the nested bucket onto the stack for the next
 689  			// iteration.
 690  			childID := bucketCursor.rawValue()
 691  			childIDs = append(childIDs, childID)
 692  			// Remove the nested bucket from the bucket index.
 693  			b.tx.deleteKey(bucketCursor.rawKey(), false)
 694  		}
 695  		cursorFinalizer(bucketCursor)
 696  	}
 697  	// Remove the nested bucket from the bucket index. Any buckets nested under it were already removed above.
 698  	b.tx.deleteKey(bidxKey, true)
 699  	return nil
 700  }
 701  
 702  // Cursor returns a new cursor, allowing for iteration over the bucket's key/value pairs and nested buckets in forward
 703  // or backward order.
 704  //
 705  // You must seek to a position using the First, Last, or Seek functions before calling the Next, Prev, Key, or value
 706  // functions. Failure to do so will result in the same return values as an exhausted cursor, which is false for the Prev
 707  // and Next functions and nil for Key and value functions. This function is part of the database.
 708  //
 709  // Bucket interface implementation.
 710  func (b *bucket) Cursor() database.Cursor {
 711  	// Ensure transaction state is valid.
 712  	if e := b.tx.checkClosed(); E.Chk(e) {
 713  		return &cursor{bucket: b}
 714  	}
 715  	// Create the cursor and setup a runtime finalizer to ensure the iterators are released when the cursor is garbage
 716  	// collected.
 717  	c := newCursor(b, b.id[:], ctFull)
 718  	runtime.SetFinalizer(c, cursorFinalizer)
 719  	return c
 720  }
 721  
 722  // ForEach invokes the passed function with every key/value pair in the bucket. This does not include nested buckets or
 723  // the key/value pairs within those nested buckets.
 724  //
 725  // WARNING: It is not safe to mutate data while iterating with this method.
 726  //
 727  // Doing so may cause the underlying cursor to be invalidated and return
 728  // unexpected keys and/or values.
 729  //
 730  // Returns the following errors as required by the interface contract:
 731  //
 732  //   - ErrTxClosed if the transaction has already been closed
 733  //
 734  // NOTE: The values returned by this function are only valid during a transaction. Attempting to access them after a
 735  // transaction has ended will likely result in an access violation. This function is part of the database.Bucket
 736  // interface implementation.
 737  func (b *bucket) ForEach(fn func(k, v []byte) error) (e error) {
 738  	// Ensure transaction state is valid.
 739  	if e := b.tx.checkClosed(); E.Chk(e) {
 740  		return e
 741  	}
 742  	// Invoke the callback for each cursor item.  Return the error returned from the callback when it is non-nil.
 743  	c := newCursor(b, b.id[:], ctKeys)
 744  	defer cursorFinalizer(c)
 745  	for ok := c.First(); ok; ok = c.Next() {
 746  		e := fn(c.Key(), c.Value())
 747  		if e != nil {
 748  			return e
 749  		}
 750  	}
 751  	return nil
 752  }
 753  
 754  // ForEachBucket invokes the passed function with the key of every nested bucket in the current bucket.
 755  //
 756  // This does not include any nested buckets within those nested buckets.
 757  //
 758  // WARNING: It is not safe to mutate data while iterating with this method.
 759  //
 760  // Doing so may cause the underlying cursor to be invalidated and return unexpected keys.
 761  //
 762  // Returns the following errors as required by the interface contract:
 763  //
 764  //   - ErrTxClosed if the transaction has already been closed
 765  //
 766  // NOTE: The values returned by this function are only valid during a transaction. Attempting to access them after a
 767  // transaction has ended will likely result in an access violation.
 768  //
 769  // This function is part of the database.Bucket interface implementation.
 770  func (b *bucket) ForEachBucket(fn func(k []byte) error) (e error) {
 771  	// Ensure transaction state is valid.
 772  	if e := b.tx.checkClosed(); E.Chk(e) {
 773  		return e
 774  	}
 775  	// Invoke the callback for each cursor item. Return the error returned from the callback when it is non-nil.
 776  	c := newCursor(b, b.id[:], ctBuckets)
 777  	defer cursorFinalizer(c)
 778  	for ok := c.First(); ok; ok = c.Next() {
 779  		e := fn(c.Key())
 780  		if e != nil {
 781  			return e
 782  		}
 783  	}
 784  	return nil
 785  }
 786  
 787  // Writable returns whether or not the bucket is writable.
 788  //
 789  // This function is part of the database.Bucket interface implementation.
 790  func (b *bucket) Writable() bool {
 791  	return b.tx.writable
 792  }
 793  
 794  // Put saves the specified key/value pair to the bucket.
 795  //
 796  // Keys that do not already exist are added and keys that already exist are overwritten.
 797  //
 798  // Returns the following errors as required by the interface contract:
 799  //
 800  //   - ErrKeyRequired if the key is empty
 801  //   - ErrIncompatibleValue if the key is the same as an existing bucket
 802  //   - ErrTxNotWritable if attempted against a read-only transaction
 803  //   - ErrTxClosed if the transaction has already been closed
 804  //
 805  // This function is part of the database.Bucket interface implementation.
 806  func (b *bucket) Put(key, value []byte) (e error) {
 807  	// Ensure transaction state is valid.
 808  	if e := b.tx.checkClosed(); E.Chk(e) {
 809  		return e
 810  	}
 811  	// Ensure the transaction is writable.
 812  	if !b.tx.writable {
 813  		str := "setting a key requires a writable database transaction"
 814  		return makeDbErr(database.ErrTxNotWritable, str, nil)
 815  	}
 816  	// Ensure a key was provided.
 817  	if len(key) == 0 {
 818  		str := "put requires a key"
 819  		return makeDbErr(database.ErrKeyRequired, str, nil)
 820  	}
 821  	return b.tx.putKey(bucketizedKey(b.id, key), value)
 822  }
 823  
 824  // Get returns the value for the given key.
 825  //
 826  // Returns nil if the key does not exist in this bucket.
 827  //
 828  // An empty slice is returned for keys that exist but have no value assigned.
 829  //
 830  // NOTE: The value returned by this function is only valid during a transaction. Attempting to access it after a
 831  // transaction has ended results in undefined behavior. Additionally, the value must NOT be modified by the caller.
 832  //
 833  // This function is part of the database.Bucket interface implementation.
 834  func (b *bucket) Get(key []byte) []byte {
 835  	// Ensure transaction state is valid.
 836  	if e := b.tx.checkClosed(); E.Chk(e) {
 837  		return nil
 838  	}
 839  	// Nothing to return if there is no key.
 840  	if len(key) == 0 {
 841  		return nil
 842  	}
 843  	return b.tx.fetchKey(bucketizedKey(b.id, key))
 844  }
 845  
 846  // Delete removes the specified key from the bucket.
 847  //
 848  // Deleting a key that does not exist does not return an error.
 849  //
 850  // Returns the following errors as required by the interface contract:
 851  //
 852  //   - ErrKeyRequired if the key is empty
 853  //   - ErrIncompatibleValue if the key is the same as an existing bucket
 854  //   - ErrTxNotWritable if attempted against a read-only transaction
 855  //   - ErrTxClosed if the transaction has already been closed
 856  //
 857  // This function is part of the database.Bucket interface implementation.
 858  func (b *bucket) Delete(key []byte) (e error) {
 859  	// Ensure transaction state is valid.
 860  	if e := b.tx.checkClosed(); E.Chk(e) {
 861  		return e
 862  	}
 863  	// Ensure the transaction is writable.
 864  	if !b.tx.writable {
 865  		str := "deleting a value requires a writable database transaction"
 866  		return makeDbErr(database.ErrTxNotWritable, str, nil)
 867  	}
 868  	// Nothing to do if there is no key.
 869  	if len(key) == 0 {
 870  		return nil
 871  	}
 872  	b.tx.deleteKey(bucketizedKey(b.id, key), true)
 873  	return nil
 874  }
 875  
 876  // pendingBlock houses a block that will be written to disk when the database transaction is committed.
 877  type pendingBlock struct {
 878  	hash  *chainhash.Hash
 879  	bytes []byte
 880  }
 881  
 882  // transaction represents a database transaction.
 883  //
 884  // It can either be read-only or read-write and implements the database.
 885  //
 886  // Bucket interface. The transaction provides a root bucket against which all read and writes occur.
 887  type transaction struct {
 888  	managed        bool             // Is the transaction managed?
 889  	closed         bool             // Is the transaction closed?
 890  	writable       bool             // Is the transaction writable?
 891  	db             *db              // DB instance the tx was created from.
 892  	snapshot       *dbCacheSnapshot // Underlying snapshot for txns.
 893  	metaBucket     *bucket          // The root metadata bucket.
 894  	blockIdxBucket *bucket          // The block index bucket.
 895  	// Blocks that need to be stored on commit.
 896  	//
 897  	// The pendingBlocks map is kept to allow quick lookups of pending data by block hash.
 898  	pendingBlocks    map[chainhash.Hash]int
 899  	pendingBlockData []pendingBlock
 900  	// Keys that need to be stored or deleted on commit.
 901  	pendingKeys   *treap.Mutable
 902  	pendingRemove *treap.Mutable
 903  	// Active iterators that need to be notified when the pending keys have been updated so the cursors can properly
 904  	// handle updates to the transaction state.
 905  	activeIterLock sync.RWMutex
 906  	activeIters    []*treap.Iterator
 907  }
 908  
 909  // Enforce transaction implements the database.Tx interface.
 910  var _ database.Tx = (*transaction)(nil)
 911  
 912  // removeActiveIter removes the passed iterator from the list of active iterators against the pending keys treap.
 913  func (tx *transaction) removeActiveIter(iter *treap.Iterator) {
 914  	// An indexing for loop is intentionally used over a range here as range does not reevaluate the slice on each
 915  	// iteration nor does it adjust the index for the modified slice.
 916  	tx.activeIterLock.Lock()
 917  	for i := 0; i < len(tx.activeIters); i++ {
 918  		if tx.activeIters[i] == iter {
 919  			copy(tx.activeIters[i:], tx.activeIters[i+1:])
 920  			tx.activeIters[len(tx.activeIters)-1] = nil
 921  			tx.activeIters = tx.activeIters[:len(tx.activeIters)-1]
 922  		}
 923  	}
 924  	tx.activeIterLock.Unlock()
 925  }
 926  
 927  // addActiveIter adds the passed iterator to the list of active iterators for the pending keys treap.
 928  func (tx *transaction) addActiveIter(iter *treap.Iterator) {
 929  	tx.activeIterLock.Lock()
 930  	tx.activeIters = append(tx.activeIters, iter)
 931  	tx.activeIterLock.Unlock()
 932  }
 933  
 934  // notifyActiveIters notifies all of the active iterators for the pending keys treap that it has been updated.
 935  func (tx *transaction) notifyActiveIters() {
 936  	tx.activeIterLock.RLock()
 937  	for _, iter := range tx.activeIters {
 938  		iter.ForceReseek()
 939  	}
 940  	tx.activeIterLock.RUnlock()
 941  }
 942  
 943  // checkClosed returns an error if the the database or transaction is closed.
 944  func (tx *transaction) checkClosed() (e error) {
 945  	// The transaction is no longer valid if it has been closed.
 946  	if tx.closed {
 947  		return makeDbErr(database.ErrTxClosed, errTxClosedStr, nil)
 948  	}
 949  	return nil
 950  }
 951  
 952  // hasKey returns whether or not the provided key exists in the database while taking into account the current
 953  // transaction state.
 954  func (tx *transaction) hasKey(key []byte) bool {
 955  	// When the transaction is writable, check the pending transaction state first.
 956  	if tx.writable {
 957  		if tx.pendingRemove.Has(key) {
 958  			return false
 959  		}
 960  		if tx.pendingKeys.Has(key) {
 961  			return true
 962  		}
 963  	}
 964  	// Consult the database cache and underlying database.
 965  	return tx.snapshot.Has(key)
 966  }
 967  
 968  // putKey adds the provided key to the list of keys to be updated in the database when the transaction is committed.
 969  //
 970  // NOTE: This function must only be called on a writable transaction.
 971  //
 972  // Since it is an internal helper function, it does not check.
 973  func (tx *transaction) putKey(key, value []byte) (e error) {
 974  	// Prevent the key from being deleted if it was previously scheduled to
 975  	// be deleted on transaction commit.
 976  	tx.pendingRemove.Delete(key)
 977  	// Add the key/value pair to the list to be written on transaction commit.
 978  	tx.pendingKeys.Put(key, value)
 979  	tx.notifyActiveIters()
 980  	return nil
 981  }
 982  
 983  // fetchKey attempts to fetch the provided key from the database cache ( and hence underlying database) while taking
 984  // into account the current transaction state. Returns nil if the key does not exist.
 985  func (tx *transaction) fetchKey(key []byte) []byte {
 986  	// When the transaction is writable, check the pending transaction state first.
 987  	if tx.writable {
 988  		if tx.pendingRemove.Has(key) {
 989  			return nil
 990  		}
 991  		if value := tx.pendingKeys.Get(key); value != nil {
 992  			return value
 993  		}
 994  	}
 995  	// Consult the database cache and underlying database.
 996  	return tx.snapshot.Get(key)
 997  }
 998  
 999  // deleteKey adds the provided key to the list of keys to be deleted from the database when the transaction is
1000  // committed.
1001  //
1002  // The notify iterators flag is useful to delay notifying iterators about the changes during bulk deletes.
1003  //
1004  // NOTE: This function must only be called on a writable transaction.
1005  //
1006  // Since it is an internal helper function, it does not check.
1007  func (tx *transaction) deleteKey(key []byte, notifyIterators bool) {
1008  	// Remove the key from the list of pendings keys to be written on transaction commit if needed.
1009  	tx.pendingKeys.Delete(key)
1010  	// Add the key to the list to be deleted on transaction	commit.
1011  	tx.pendingRemove.Put(key, nil)
1012  	// Notify the active iterators about the change if the flag is set.
1013  	if notifyIterators {
1014  		tx.notifyActiveIters()
1015  	}
1016  }
1017  
1018  // nextBucketID returns the next bucket ID to use for creating a new bucket.
1019  //
1020  // NOTE: This function must only be called on a writable transaction.
1021  //
1022  // Since it is an internal helper function, it does not check.
1023  func (tx *transaction) nextBucketID() ([4]byte, error) {
1024  	// Load the currently highest used bucket ID.
1025  	curIDBytes := tx.fetchKey(curBucketIDKeyName)
1026  	curBucketNum := binary.BigEndian.Uint32(curIDBytes)
1027  	// Increment and update the current bucket ID and return it.
1028  	var nextBucketID [4]byte
1029  	binary.BigEndian.PutUint32(nextBucketID[:], curBucketNum+1)
1030  	if e := tx.putKey(curBucketIDKeyName, nextBucketID[:]); E.Chk(e) {
1031  		return [4]byte{}, e
1032  	}
1033  	return nextBucketID, nil
1034  }
1035  
1036  // Metadata returns the top-most bucket for all metadata storage.
1037  //
1038  // This function is part of the database.Tx interface implementation.
1039  func (tx *transaction) Metadata() database.Bucket {
1040  	return tx.metaBucket
1041  }
1042  
1043  // hasBlock returns whether or not a block with the given hash exists.
1044  func (tx *transaction) hasBlock(hash *chainhash.Hash) bool {
1045  	// Return true if the block is pending to be written on commit since it exists from the viewpoint of this
1046  	// transaction.
1047  	if _, exists := tx.pendingBlocks[*hash]; exists {
1048  		return true
1049  	}
1050  	return tx.hasKey(bucketizedKey(blockIdxBucketID, hash[:]))
1051  }
1052  
1053  // StoreBlock stores the provided block into the database.
1054  //
1055  // There are no checks to ensure the block connects to a previous block, contains double spends, or any additional
1056  // functionality such as transaction indexing.
1057  //
1058  // It simply stores the block in the database.
1059  //
1060  // Returns the following errors as required by the interface contract:
1061  //
1062  //   - ErrBlockExists when the block hash already exists
1063  //
1064  //   - ErrTxNotWritable if attempted against a read-only transaction
1065  //
1066  //   - ErrTxClosed if the transaction has already been closed
1067  //
1068  // This function is part of the database.Tx interface implementation.
1069  func (tx *transaction) StoreBlock(block *block.Block) (e error) {
1070  	// Ensure transaction state is valid.
1071  	if e = tx.checkClosed(); E.Chk(e) {
1072  		return e
1073  	}
1074  	// Ensure the transaction is writable.
1075  	if !tx.writable {
1076  		str := "store block requires a writable database transaction"
1077  		return makeDbErr(database.ErrTxNotWritable, str, nil)
1078  	}
1079  	// Reject the block if it already exists.
1080  	blockHash := block.Hash()
1081  	if tx.hasBlock(blockHash) {
1082  		str := fmt.Sprintf("block %s already exists", blockHash)
1083  		return makeDbErr(database.ErrBlockExists, str, nil)
1084  	}
1085  	blockBytes, e := block.Bytes()
1086  	if e != nil {
1087  		str := fmt.Sprintf(
1088  			"failed to get serialized bytes for block %s",
1089  			blockHash,
1090  		)
1091  		return makeDbErr(database.ErrDriverSpecific, str, e)
1092  	}
1093  	// Add the block to be stored to the list of pending blocks to store when the transaction is committed.
1094  	//
1095  	// Also add it to pending blocks map so it is easy to determine the block is pending based on the block hash.
1096  	if tx.pendingBlocks == nil {
1097  		tx.pendingBlocks = make(map[chainhash.Hash]int)
1098  	}
1099  	tx.pendingBlocks[*blockHash] = len(tx.pendingBlockData)
1100  	tx.pendingBlockData = append(
1101  		tx.pendingBlockData, pendingBlock{
1102  			hash:  blockHash,
1103  			bytes: blockBytes,
1104  		},
1105  	)
1106  	// Tracef("added block %s to pending blocks", blockHash)
1107  	return nil
1108  }
1109  
1110  // HasBlock returns whether or not a block with the given hash exists in the database.
1111  //
1112  // Returns the following errors as required by the interface contract:
1113  //
1114  //   - ErrTxClosed if the transaction has already been closed
1115  //
1116  // This function is part of the database.Tx interface implementation.
1117  func (tx *transaction) HasBlock(hash *chainhash.Hash) (bool, error) {
1118  	// Ensure transaction state is valid.
1119  	if e := tx.checkClosed(); E.Chk(e) {
1120  		return false, e
1121  	}
1122  	return tx.hasBlock(hash), nil
1123  }
1124  
1125  // HasBlocks returns whether or not the blocks with the provided hashes exist in the database.
1126  //
1127  // Returns the following errors as required by the interface contract:
1128  //
1129  //   - ErrTxClosed if the transaction has already been closed
1130  //
1131  // This function is part of the database.Tx interface implementation.
1132  func (tx *transaction) HasBlocks(hashes []chainhash.Hash) ([]bool, error) {
1133  	// Ensure transaction state is valid.
1134  	if e := tx.checkClosed(); E.Chk(e) {
1135  		return nil, e
1136  	}
1137  	results := make([]bool, len(hashes))
1138  	for i := range hashes {
1139  		results[i] = tx.hasBlock(&hashes[i])
1140  	}
1141  	return results, nil
1142  }
1143  
1144  // fetchBlockRow fetches the metadata stored in the block index for the provided hash. It will return ErrBlockNotFound
1145  // if there is no entry.
1146  func (tx *transaction) fetchBlockRow(hash *chainhash.Hash) ([]byte, error) {
1147  	blockRow := tx.blockIdxBucket.Get(hash[:])
1148  	if blockRow == nil {
1149  		str := fmt.Sprintf("block %s does not exist", hash)
1150  		return nil, makeDbErr(database.ErrBlockNotFound, str, nil)
1151  	}
1152  	return blockRow, nil
1153  }
1154  
1155  // FetchBlockHeader returns the raw serialized bytes for the block header identified by the given hash.
1156  //
1157  // The raw bytes are in the format returned by Serialize on a wire.BlockHeader.
1158  //
1159  // Returns the following errors as required by the interface contract:
1160  //
1161  //   - ErrBlockNotFound if the requested block hash does not exist
1162  //
1163  //   - ErrTxClosed if the transaction has already been closed
1164  //
1165  //   - ErrCorruption if the database has somehow become corrupted
1166  //
1167  // NOTE: The data returned by this function is only valid during a database transaction. Attempting to access it after a
1168  // transaction has ended results in undefined behavior.
1169  //
1170  // This constraint prevents additional data copies and allows support for memory-mapped database implementations.
1171  //
1172  // This function is part of the database.Tx interface implementation.
1173  func (tx *transaction) FetchBlockHeader(hash *chainhash.Hash) ([]byte, error) {
1174  	return tx.FetchBlockRegion(
1175  		&database.BlockRegion{
1176  			Hash:   hash,
1177  			Offset: 0,
1178  			Len:    blockHdrSize,
1179  		},
1180  	)
1181  }
1182  
1183  // FetchBlockHeaders returns the raw serialized bytes for the block headers identified by the given hashes.
1184  //
1185  // The raw bytes are in the format returned by Serialize on a wire.BlockHeader.
1186  //
1187  // Returns the following errors as required by the interface contract:
1188  //
1189  //   - ErrBlockNotFound if the any of the requested block hashes do not exist
1190  //
1191  //   - ErrTxClosed if the transaction has already been closed
1192  //
1193  //   - ErrCorruption if the database has somehow become corrupted
1194  //
1195  // NOTE: The data returned by this function is only valid during a database transaction. Attempting to access it after a
1196  // transaction has ended results in undefined behavior.
1197  //
1198  // This constraint prevents additional data copies and allows support for memory-mapped database implementations.
1199  //
1200  // This function is part of the database.Tx interface implementation.
1201  func (tx *transaction) FetchBlockHeaders(hashes []chainhash.Hash) ([][]byte, error) {
1202  	regions := make([]database.BlockRegion, len(hashes))
1203  	for i := range hashes {
1204  		regions[i].Hash = &hashes[i]
1205  		regions[i].Offset = 0
1206  		regions[i].Len = blockHdrSize
1207  	}
1208  	return tx.FetchBlockRegions(regions)
1209  }
1210  
1211  // FetchBlock returns the raw serialized bytes for the block identified by the given hash. The raw bytes are in the
1212  // format returned by Serialize on a wire.Block.
1213  //
1214  // Returns the following errors as required by the interface contract:
1215  //
1216  //   - ErrBlockNotFound if the requested block hash does not exist
1217  //
1218  //   - ErrTxClosed if the transaction has already been closed
1219  //
1220  //   - ErrCorruption if the database has somehow become corrupted
1221  //
1222  // In addition, returns ErrDriverSpecific if any failures occur when reading the block files.
1223  //
1224  // NOTE: The data returned by this function is only valid during a database transaction. Attempting to access it after a
1225  // transaction has ended results in undefined behavior.
1226  //
1227  // This constraint prevents additional data copies and allows support for memory-mapped database implementations.
1228  //
1229  // This function is part of the database.Tx interface implementation.
1230  func (tx *transaction) FetchBlock(hash *chainhash.Hash) ([]byte, error) {
1231  	// Ensure transaction state is valid.
1232  	if e := tx.checkClosed(); E.Chk(e) {
1233  		return nil, e
1234  	}
1235  	// When the block is pending to be written on commit return the bytes from there.
1236  	if idx, exists := tx.pendingBlocks[*hash]; exists {
1237  		return tx.pendingBlockData[idx].bytes, nil
1238  	}
1239  	// Lookup the location of the block in the files from the block index.
1240  	blockRow, e := tx.fetchBlockRow(hash)
1241  	if e != nil {
1242  		return nil, e
1243  	}
1244  	location := deserializeBlockLoc(blockRow)
1245  	// Read the block from the appropriate location. The function also performs a checksum over the data to detect data
1246  	// corruption.
1247  	blockBytes, e := tx.db.store.readBlock(hash, location)
1248  	if e != nil {
1249  		return nil, e
1250  	}
1251  	return blockBytes, nil
1252  }
1253  
1254  // FetchBlocks returns the raw serialized bytes for the blocks identified by the given hashes.
1255  //
1256  // The raw bytes are in the format returned by Serialize on a wire.Block.
1257  //
1258  // Returns the following errors as required by the interface contract:
1259  //
1260  //   - ErrBlockNotFound if any of the requested block hashed do not exist
1261  //
1262  //   - ErrTxClosed if the transaction has already been closed
1263  //
1264  //   - ErrCorruption if the database has somehow become corrupted
1265  //
1266  // In addition, returns ErrDriverSpecific if any failures occur when reading the block files.
1267  //
1268  // NOTE: The data returned by this function is only valid during a database transaction. Attempting to access it after a
1269  // transaction has ended results in undefined behavior.
1270  //
1271  // This constraint prevents additional data copies and allows support for memory-mapped database implementations.
1272  //
1273  // This function is part of the database.Tx interface implementation.
1274  func (tx *transaction) FetchBlocks(hashes []chainhash.Hash) ([][]byte, error) {
1275  	// Ensure transaction state is valid.
1276  	if e := tx.checkClosed(); E.Chk(e) {
1277  		return nil, e
1278  	}
1279  	// NOTE: This could check for the existence of all blocks before loading any of them which would be faster in the
1280  	// failure case, however callers will not typically be calling this function with invalid values, so optimize for
1281  	// the common case. Load the blocks.
1282  	blocks := make([][]byte, len(hashes))
1283  	for i := range hashes {
1284  		var e error
1285  		blocks[i], e = tx.FetchBlock(&hashes[i])
1286  		if e != nil {
1287  			return nil, e
1288  		}
1289  	}
1290  	return blocks, nil
1291  }
1292  
1293  // fetchPendingRegion attempts to fetch the provided region from any block which are pending to be written on commit.
1294  //
1295  // It will return nil for the byte slice when the region references a block which is not pending. When the region does
1296  // reference a pending block, it is bounds checked and returns ErrBlockRegionInvalid if invalid.
1297  func (tx *transaction) fetchPendingRegion(region *database.BlockRegion) ([]byte, error) {
1298  	// Nothing to do if the block is not pending to be written on commit.
1299  	idx, exists := tx.pendingBlocks[*region.Hash]
1300  	if !exists {
1301  		return nil, nil
1302  	}
1303  	// Ensure the region is within the bounds of the block.
1304  	blockBytes := tx.pendingBlockData[idx].bytes
1305  	blockLen := uint32(len(blockBytes))
1306  	endOffset := region.Offset + region.Len
1307  	if endOffset < region.Offset || endOffset > blockLen {
1308  		str := fmt.Sprintf(
1309  			"block %s region offset %d, length %d "+
1310  				"exceeds block length of %d", region.Hash,
1311  			region.Offset, region.Len, blockLen,
1312  		)
1313  		return nil, makeDbErr(database.ErrBlockRegionInvalid, str, nil)
1314  	}
1315  	// Return the bytes from the pending block.
1316  	return blockBytes[region.Offset:endOffset:endOffset], nil
1317  }
1318  
1319  // FetchBlockRegion returns the raw serialized bytes for the given block region.
1320  //
1321  // For example, it is possible to directly extract Bitcoin transactions and/or scripts from a block with this function.
1322  // Depending on the backend implementation, this can provide significant savings by avoiding the need to load entire
1323  // blocks.
1324  //
1325  // The raw bytes are in the format returned by Serialize on a wire.Block and the Offset field in the provided
1326  // BlockRegion is zero-based and relative to the start of the block (byte 0).
1327  //
1328  // Returns the following errors as required by the interface contract:
1329  //
1330  //   - ErrBlockNotFound if the requested block hash does not exist
1331  //
1332  //   - ErrBlockRegionInvalid if the region exceeds the bounds of the associated block
1333  //
1334  //   - ErrTxClosed if the transaction has already been closed
1335  //
1336  //   - ErrCorruption if the database has somehow become corrupted
1337  //
1338  // In addition, returns ErrDriverSpecific if any failures occur when reading the block files.
1339  //
1340  // NOTE: The data returned by this function is only valid during a database transaction. Attempting to access it after a
1341  // transaction has ended results in undefined behavior. This constraint prevents additional data copies and allows
1342  // support for memory-mapped database implementations. This function is part of the database.Tx interface
1343  // implementation.
1344  func (tx *transaction) FetchBlockRegion(region *database.BlockRegion) ([]byte, error) {
1345  	// Ensure transaction state is valid.
1346  	if e := tx.checkClosed(); E.Chk(e) {
1347  		return nil, e
1348  	}
1349  	// When the block is pending to be written on commit return the bytes from there.
1350  	if tx.pendingBlocks != nil {
1351  		regionBytes, e := tx.fetchPendingRegion(region)
1352  		if e != nil {
1353  			return nil, e
1354  		}
1355  		if regionBytes != nil {
1356  			return regionBytes, nil
1357  		}
1358  	}
1359  	// Lookup the location of the block in the files from the block index.
1360  	blockRow, e := tx.fetchBlockRow(region.Hash)
1361  	if e != nil {
1362  		return nil, e
1363  	}
1364  	location := deserializeBlockLoc(blockRow)
1365  	// Ensure the region is within the bounds of the block.
1366  	endOffset := region.Offset + region.Len
1367  	if endOffset < region.Offset || endOffset > location.blockLen {
1368  		str := fmt.Sprintf(
1369  			"block %s region offset %d, length %d "+
1370  				"exceeds block length of %d", region.Hash,
1371  			region.Offset, region.Len, location.blockLen,
1372  		)
1373  		return nil, makeDbErr(database.ErrBlockRegionInvalid, str, nil)
1374  	}
1375  	// Read the region from the appropriate disk block file.
1376  	regionBytes, e := tx.db.store.readBlockRegion(
1377  		location, region.Offset,
1378  		region.Len,
1379  	)
1380  	if e != nil {
1381  		return nil, e
1382  	}
1383  	return regionBytes, nil
1384  }
1385  
1386  // FetchBlockRegions returns the raw serialized bytes for the given block regions.
1387  //
1388  // For example, it is possible to directly extract Bitcoin transactions and/or scripts from various blocks with this
1389  // function. Depending on the backend implementation, this can provide significant savings by avoiding the need to load
1390  // entire blocks.
1391  //
1392  // The raw bytes are in the format returned by Serialize on a wire.Block and the Offset fields in the provided
1393  // BlockRegions are zero-based and relative to the start of the block (byte 0).
1394  //
1395  // Returns the following errors as required by the interface contract:
1396  //
1397  //   - ErrBlockNotFound if any of the request block hashes do not exist
1398  //
1399  //   - ErrBlockRegionInvalid if one or more region exceed the bounds of the associated block
1400  //
1401  //   - ErrTxClosed if the transaction has already been closed
1402  //
1403  //   - ErrCorruption if the database has somehow become corrupted
1404  //
1405  // In addition, returns ErrDriverSpecific if any failures occur when reading the block files.
1406  //
1407  // NOTE: The data returned by this function is only valid during a database transaction. Attempting to access it after a
1408  // transaction has ended results in undefined behavior. This constraint prevents additional data copies and allows
1409  // support for memory-mapped database implementations. This function is part of the database.Tx interface
1410  // implementation.
1411  func (tx *transaction) FetchBlockRegions(regions []database.BlockRegion) ([][]byte, error) {
1412  	// Ensure transaction state is valid.
1413  	if e := tx.checkClosed(); E.Chk(e) {
1414  		return nil, e
1415  	}
1416  	// NOTE: This could check for the existence of all blocks before deserializing the locations and building up the
1417  	// fetch list which would be faster in the failure case, however callers will not typically be calling this function
1418  	// with invalid values, so optimize for the common case.
1419  	//
1420  	// NOTE: A potential optimization here would be to combine adjacent regions to reduce the number of reads.
1421  	//
1422  	// In order to improve efficiency of loading the bulk data, first grab the block location for all of the requested
1423  	// block hashes and txsort the reads by filenum:offset so that all reads are grouped by file and linear within each
1424  	// file.
1425  	//
1426  	// This can result in quite a significant performance increase depending on how spread out the requested hashes are
1427  	// by reducing the number of file open/closes and random accesses needed.
1428  	//
1429  	// The fetchList is intentionally allocated with a cap because some of the regions might be fetched from the pending
1430  	// blocks and hence there is no need to fetch those from disk.
1431  	blockRegions := make([][]byte, len(regions))
1432  	fetchList := make([]bulkFetchData, 0, len(regions))
1433  	for i := range regions {
1434  		region := &regions[i]
1435  		// When the block is pending to be written on commit grab the bytes from there.
1436  		if tx.pendingBlocks != nil {
1437  			regionBytes, e := tx.fetchPendingRegion(region)
1438  			if e != nil {
1439  				return nil, e
1440  			}
1441  			if regionBytes != nil {
1442  				blockRegions[i] = regionBytes
1443  				continue
1444  			}
1445  		}
1446  		// Lookup the location of the block in the files from the block index.
1447  		blockRow, e := tx.fetchBlockRow(region.Hash)
1448  		if e != nil {
1449  			return nil, e
1450  		}
1451  		location := deserializeBlockLoc(blockRow)
1452  		// Ensure the region is within the bounds of the block.
1453  		endOffset := region.Offset + region.Len
1454  		if endOffset < region.Offset || endOffset > location.blockLen {
1455  			str := fmt.Sprintf(
1456  				"block %s region offset %d, length "+
1457  					"%d exceeds block length of %d", region.Hash,
1458  				region.Offset, region.Len, location.blockLen,
1459  			)
1460  			return nil, makeDbErr(database.ErrBlockRegionInvalid, str, nil)
1461  		}
1462  		fetchList = append(fetchList, bulkFetchData{&location, i})
1463  	}
1464  	sort.Sort(bulkFetchDataSorter(fetchList))
1465  	// Read all of the regions in the fetch list and set the results.
1466  	for i := range fetchList {
1467  		fetchData := &fetchList[i]
1468  		ri := fetchData.replyIndex
1469  		region := &regions[ri]
1470  		location := fetchData.blockLocation
1471  		regionBytes, e := tx.db.store.readBlockRegion(
1472  			*location,
1473  			region.Offset, region.Len,
1474  		)
1475  		if e != nil {
1476  			return nil, e
1477  		}
1478  		blockRegions[ri] = regionBytes
1479  	}
1480  	return blockRegions, nil
1481  }
1482  
1483  // close marks the transaction closed then releases any pending data, the underlying snapshot, the transaction read
1484  // lock, and the write lock when the transaction is writable.
1485  func (tx *transaction) close() {
1486  	tx.closed = true
1487  	// Clear pending blocks that would have been written on commit.
1488  	tx.pendingBlocks = nil
1489  	tx.pendingBlockData = nil
1490  	// Clear pending keys that would have been written or deleted on commit.
1491  	tx.pendingKeys = nil
1492  	tx.pendingRemove = nil
1493  	// Release the snapshot.
1494  	if tx.snapshot != nil {
1495  		tx.snapshot.Release()
1496  		tx.snapshot = nil
1497  	}
1498  	tx.db.closeLock.RUnlock()
1499  	// Release the writer lock for writable transactions to unblock any other write transaction which are possibly
1500  	// waiting.
1501  	if tx.writable {
1502  		tx.db.writeLock.Unlock()
1503  	}
1504  }
1505  
1506  // writePendingAndCommit writes pending block data to the flat block files, updates the metadata with their locations as
1507  // well as the new current write location, and commits the metadata to the memory database cache.
1508  //
1509  // It also properly handles rollback in the case of failures.
1510  //
1511  // This function MUST only be called when there is pending data to be written.
1512  func (tx *transaction) writePendingAndCommit() (e error) {
1513  	// Save the current block store write position for potential rollback.
1514  	//
1515  	// These variables are only updated here in this function and there can only be one write transaction active at a
1516  	// time, so it's safe to store them for potential rollback.
1517  	wc := tx.db.store.writeCursor
1518  	wc.RLock()
1519  	oldBlkFileNum := wc.curFileNum
1520  	oldBlkOffset := wc.curOffset
1521  	wc.RUnlock()
1522  	// rollback is a closure that is used to rollback all writes to the block files.
1523  	rollback := func() {
1524  		// Rollback any modifications made to the block files if needed.
1525  		tx.db.store.handleRollback(oldBlkFileNum, oldBlkOffset)
1526  	}
1527  	// Loop through all of the pending blocks to store and write them.
1528  	for _, blockData := range tx.pendingBlockData {
1529  		// Tracef("storing block %s", blockData.hash)
1530  		location, e := tx.db.store.writeBlock(blockData.bytes)
1531  		if e != nil {
1532  			rollback()
1533  			return e
1534  		}
1535  		// Add a record in the block index for the block. The record includes the location information needed to locate
1536  		// the block on the filesystem as well as the block header since they are so commonly needed.
1537  		blockRow := serializeBlockLoc(location)
1538  		e = tx.blockIdxBucket.Put(blockData.hash[:], blockRow)
1539  		if e != nil {
1540  			rollback()
1541  			return e
1542  		}
1543  	}
1544  	// Update the metadata for the current write file and offset.
1545  	writeRow := serializeWriteRow(wc.curFileNum, wc.curOffset)
1546  	if e := tx.metaBucket.Put(writeLocKeyName, writeRow); E.Chk(e) {
1547  		rollback()
1548  		return convertErr("failed to store write cursor", e)
1549  	}
1550  	// Atomically update the database cache.
1551  	//
1552  	// The cache automatically handles flushing to the underlying persistent storage database.
1553  	return tx.db.cache.commitTx(tx)
1554  }
1555  
1556  // Commit commits all changes that have been made to the root metadata bucket and all of its sub-buckets to the database
1557  // cache which is periodically synced to persistent storage. In addition, it commits all new blocks directly to
1558  // persistent storage bypassing the db cache.
1559  //
1560  // Blocks can be rather large so this help increase the amount of cache available for the metadata updates and is safe
1561  // since blocks are immutable.
1562  //
1563  // This function is part of the database.Tx interface implementation.
1564  func (tx *transaction) Commit() (e error) {
1565  	// Prevent commits on managed transactions.
1566  	if tx.managed {
1567  		tx.close()
1568  		panic("managed transaction commit not allowed")
1569  	}
1570  	// Ensure transaction state is valid.
1571  	if e := tx.checkClosed(); E.Chk(e) {
1572  		return e
1573  	}
1574  	// Regardless of whether the commit succeeds, the transaction is closed on return.
1575  	defer tx.close()
1576  	// Ensure the transaction is writable.
1577  	if !tx.writable {
1578  		str := "Commit requires a writable database transaction"
1579  		return makeDbErr(database.ErrTxNotWritable, str, nil)
1580  	}
1581  	// Write pending data.  The function will rollback if any errors occur.
1582  	return tx.writePendingAndCommit()
1583  }
1584  
1585  // Rollback undoes all changes that have been made to the root bucket and all of its sub-buckets.
1586  //
1587  // This function is part of the database.Tx interface implementation.
1588  func (tx *transaction) Rollback() (e error) {
1589  	// Prevent rollbacks on managed transactions.
1590  	if tx.managed {
1591  		tx.close()
1592  		panic("managed transaction rollback not allowed")
1593  	}
1594  	// Ensure transaction state is valid.
1595  	if e := tx.checkClosed(); E.Chk(e) {
1596  		return e
1597  	}
1598  	tx.close()
1599  	return nil
1600  }
1601  
1602  // db represents a collection of namespaces which are persisted and implements the database.DB interface. All database
1603  // access is performed through transactions which are obtained through the specific Namespace.
1604  type db struct {
1605  	writeLock sync.Mutex   // Limit to one write transaction at a time.
1606  	closeLock sync.RWMutex // Make database close block while txns active.
1607  	closed    bool         // Is the database closed?
1608  	store     *blockStore  // Handles read/writing blocks to flat files.
1609  	cache     *dbCache     // Cache layer which wraps underlying leveldb DB.
1610  }
1611  
1612  // Enforce db implements the database.DB interface.
1613  var _ database.DB = (*db)(nil)
1614  
1615  // Type returns the database driver type the current database instance was created with. This function is part of the
1616  // database DB interface implementation.
1617  func (db *db) Type() string {
1618  	return dbType
1619  }
1620  
1621  // begin is the implementation function for the Begin database method.
1622  //
1623  // See its documentation for more details.
1624  //
1625  // This function is only separate because it returns the internal transaction which is used by the managed transaction
1626  // code while the database method returns the interface.
1627  func (db *db) begin(writable bool) (*transaction, error) {
1628  	// Whenever a new writable transaction is started, grab the write lock to ensure only a single write transaction can
1629  	// be active at the same time.
1630  	//
1631  	// This lock will not be released until the transaction is closed ( via Rollback or Commit).
1632  	if writable {
1633  		db.writeLock.Lock()
1634  	}
1635  	// Whenever a new transaction is started, grab a read lock against the database to ensure Close will wait for the
1636  	// transaction to finish.
1637  	//
1638  	// This lock will not be released until the transaction is closed ( via Rollback or Commit).
1639  	db.closeLock.RLock()
1640  	if db.closed {
1641  		db.closeLock.RUnlock()
1642  		if writable {
1643  			db.writeLock.Unlock()
1644  		}
1645  		return nil, makeDbErr(
1646  			database.ErrDbNotOpen, errDbNotOpenStr,
1647  			nil,
1648  		)
1649  	}
1650  	// Grab a snapshot of the database cache (which in turn also handles the underlying database).
1651  	snapshot, e := db.cache.Snapshot()
1652  	if e != nil {
1653  		db.closeLock.RUnlock()
1654  		if writable {
1655  			db.writeLock.Unlock()
1656  		}
1657  		return nil, e
1658  	}
1659  	// The metadata and block index buckets are internal-only buckets, so they have defined IDs.
1660  	tx := &transaction{
1661  		writable:      writable,
1662  		db:            db,
1663  		snapshot:      snapshot,
1664  		pendingKeys:   treap.NewMutable(),
1665  		pendingRemove: treap.NewMutable(),
1666  	}
1667  	tx.metaBucket = &bucket{tx: tx, id: metadataBucketID}
1668  	tx.blockIdxBucket = &bucket{tx: tx, id: blockIdxBucketID}
1669  	return tx, nil
1670  }
1671  
1672  // Begin starts a transaction which is either read-only or read-write depending
1673  // on the specified flag.
1674  //
1675  // Multiple read-only transactions can be started simultaneously while only a
1676  // single read-write transaction can be started at a time.
1677  //
1678  // The call will block when starting a read-write transaction when one is
1679  // already open.
1680  //
1681  // NOTE: The transaction must be closed by calling Rollback or Commit on it when
1682  // it is no longer needed.
1683  //
1684  // Failure to do so will result in unclaimed memory.
1685  //
1686  // This function is part of the database.DB interface implementation.
1687  func (db *db) Begin(writable bool) (database.Tx, error) {
1688  	return db.begin(writable)
1689  }
1690  
1691  // rollbackOnPanic rolls the passed transaction back if the code in the calling
1692  // function panics. This is needed since the mutex on a transaction must be
1693  // released and a panic in called code would prevent that from happening.
1694  //
1695  // NOTE: This can only be handled manually for managed transactions since they
1696  // control the life-cycle of the transaction.
1697  //
1698  // As the documentation on Begin calls out, callers opting to use manual
1699  // transactions will have to ensure the transaction is rolled back on panic if
1700  // it desires that functionality as well or the database will fail to close
1701  // since the read-lock will never be released.
1702  func rollbackOnPanic(tx *transaction) {
1703  	if err := recover(); err != nil {
1704  		tx.managed = false
1705  		_ = tx.Rollback()
1706  		panic(err)
1707  	}
1708  }
1709  
1710  // View invokes the passed function in the context of a managed read-only
1711  // transaction with the root bucket for the namespace.
1712  //
1713  // Any errors returned from the user-supplied function are returned from this
1714  // function. This function is part of the database.DB interface implementation.
1715  func (db *db) View(fn func(database.Tx) error) (e error) {
1716  	// Start a read-only transaction.
1717  	tx, e := db.begin(false)
1718  	if e != nil {
1719  		return e
1720  	}
1721  	// Since the user-provided function might panic, ensure the transaction releases
1722  	// all mutexes and resources.
1723  	//
1724  	// There is no guarantee the caller won't use recover and keep going.
1725  	//
1726  	// Thus, the database must still be in a usable state on panics due to caller
1727  	// issues.
1728  	defer rollbackOnPanic(tx)
1729  	tx.managed = true
1730  	e = fn(tx)
1731  	tx.managed = false
1732  	if e != nil {
1733  		// The error is ignored here because nothing was written yet and regardless of a rollback failure, the tx is
1734  		// closed now anyways.
1735  		_ = tx.Rollback()
1736  		return e
1737  	}
1738  	return tx.Rollback()
1739  }
1740  
1741  // Update invokes the passed function in the context of a managed read -write transaction with the root bucket for the
1742  // namespace.
1743  //
1744  // Any errors returned from the user-supplied function will cause the transaction to be rolled back and are returned
1745  // from this function.
1746  //
1747  // Otherwise, the transaction is committed when the user-supplied function returns a nil error. This function is part of
1748  // the database. DB interface implementation.
1749  func (db *db) Update(fn func(database.Tx) error) (e error) {
1750  	// Start a read-write transaction.
1751  	tx, e := db.begin(true)
1752  	if e != nil {
1753  		return e
1754  	}
1755  	// Since the user-provided function might panic, ensure the transaction releases all mutexes and resources.
1756  	//
1757  	// There is no guarantee the caller won't use recover and keep going.
1758  	//
1759  	// Thus, the database must still be in a usable state on panics due to caller issues.
1760  	defer rollbackOnPanic(tx)
1761  	tx.managed = true
1762  	e = fn(tx)
1763  	tx.managed = false
1764  	if e != nil {
1765  		// The error is ignored here because nothing was written yet and regardless of a rollback failure, the tx is
1766  		// closed now anyways.
1767  		_ = tx.Rollback()
1768  		return e
1769  	}
1770  	return tx.Commit()
1771  }
1772  
1773  // Close cleanly shuts down the database and syncs all data.
1774  //
1775  // It will block until all database transactions have been finalized ( rolled back or committed). This function is part
1776  // of the database.
1777  //
1778  // DB interface implementation.
1779  func (db *db) Close() (e error) {
1780  	// Since all transactions have a read lock on this mutex, this will cause Close to wait for all readers to complete.
1781  	db.closeLock.Lock()
1782  	defer db.closeLock.Unlock()
1783  	if db.closed {
1784  		return makeDbErr(database.ErrDbNotOpen, errDbNotOpenStr, nil)
1785  	}
1786  	db.closed = true
1787  	// NOTE: Since the above lock waits for all transactions to finish and prevents any new ones from being started, it
1788  	// is safe to flush the cache and clear all state without the individual locks. Close the database cache which will
1789  	// flush any existing entries to disk and close the underlying leveldb database.
1790  	//
1791  	// Any error is saved and returned at the end after the remaining cleanup since the database will be marked closed
1792  	// even if this fails given there is no good way for the caller to recover from a failure here anyways.
1793  	closeErr := db.cache.Close()
1794  	// Close any open flat files that house the blocks.
1795  	wc := db.store.writeCursor
1796  	if wc.curFile.file != nil {
1797  		_ = wc.curFile.file.Close()
1798  		wc.curFile.file = nil
1799  	}
1800  	for _, blockFile := range db.store.openBlockFiles {
1801  		_ = blockFile.file.Close()
1802  	}
1803  	db.store.openBlockFiles = nil
1804  	db.store.openBlocksLRU.Init()
1805  	db.store.fileNumToLRUElem = nil
1806  	return closeErr
1807  }
1808  
1809  func // fileExists reports whether the named file or directory exists.
1810  fileExists(name string) bool {
1811  	var e error
1812  	if _, e = os.Stat(name); E.Chk(e) {
1813  		if os.IsNotExist(e) {
1814  			return false
1815  		}
1816  	}
1817  	return true
1818  }
1819  
1820  // initDB creates the initial buckets and values used by the package.
1821  //
1822  // This is mainly in a separate function for testing purposes.
1823  func initDB(ldb *leveldb.DB) (e error) {
1824  	// The starting block file write cursor location is file num 0, offset 0.
1825  	batch := new(leveldb.Batch)
1826  	batch.Put(
1827  		bucketizedKey(metadataBucketID, writeLocKeyName),
1828  		serializeWriteRow(0, 0),
1829  	)
1830  	// Create block index bucket and set the current bucket id.
1831  	//
1832  	// NOTE: Since buckets are virtualized through the use of prefixes, there is no need to store the bucket index data
1833  	// for the metadata bucket in the database. However, the first bucket ID to use does need to account for it to
1834  	// ensure there are no key collisions.
1835  	batch.Put(
1836  		bucketIndexKey(metadataBucketID, blockIdxBucketName),
1837  		blockIdxBucketID[:],
1838  	)
1839  	batch.Put(curBucketIDKeyName, blockIdxBucketID[:])
1840  	// Write everything as a single batch.
1841  	if e := ldb.Write(batch, nil); E.Chk(e) {
1842  		str := fmt.Sprintf(
1843  			"failed to initialize metadata database: %v",
1844  			e,
1845  		)
1846  		return convertErr(str, e)
1847  	}
1848  	return nil
1849  }
1850  
1851  // openDB opens the database at the provided path
1852  //
1853  // ErrDbDoesNotExist is returned if the database doesn't exist and the create flag is not set.
1854  func openDB(dbPath string, network wire.BitcoinNet, create bool) (database.DB, error) {
1855  	// DBError if the database doesn't exist and the create flag is not set.
1856  	metadataDbPath := filepath.Join(dbPath, metadataDbName)
1857  	dbExists := fileExists(metadataDbPath)
1858  	if !create && !dbExists {
1859  		str := fmt.Sprintf("database %q does not exist", metadataDbPath)
1860  		return nil, makeDbErr(database.ErrDbDoesNotExist, str, nil)
1861  	}
1862  	// Ensure the full path to the database exists.
1863  	if !dbExists {
1864  		// The error can be ignored here since the call to leveldb. OpenFile will fail if the directory couldn't be
1865  		// created.
1866  		_ = os.MkdirAll(dbPath, 0700)
1867  	}
1868  	// Open the metadata database (will create it if needed).
1869  	opts := opt.Options{
1870  		ErrorIfExist: create,
1871  		Strict:       opt.DefaultStrict,
1872  		Compression:  opt.NoCompression,
1873  		Filter:       filter.NewBloomFilter(10),
1874  	}
1875  	ldb, e := leveldb.OpenFile(metadataDbPath, &opts)
1876  	if e != nil {
1877  		return nil, convertErr(e.Error(), e)
1878  	}
1879  	// Create the block store which includes scanning the existing flat block files to find what the current write
1880  	// cursor position is according to the data that is actually on disk.
1881  	//
1882  	// Also create the database cache which wraps the underlying leveldb database to provide write caching.
1883  	store := newBlockStore(dbPath, network)
1884  	cache := newDbCache(ldb, store, defaultCacheSize, defaultFlushSecs)
1885  	pdb := &db{store: store, cache: cache}
1886  	// Perform any reconciliation needed between the block and metadata as well as database initialization, if needed.
1887  	return reconcileDB(pdb, create)
1888  }
1889