1 package ffldb
2 3 import (
4 "bytes"
5 "encoding/binary"
6 "fmt"
7 "github.com/p9c/p9/pkg/block"
8 "os"
9 "path/filepath"
10 "runtime"
11 "sort"
12 "sync"
13 14 "github.com/btcsuite/goleveldb/leveldb"
15 "github.com/btcsuite/goleveldb/leveldb/comparer"
16 ldberrors "github.com/btcsuite/goleveldb/leveldb/errors"
17 "github.com/btcsuite/goleveldb/leveldb/filter"
18 "github.com/btcsuite/goleveldb/leveldb/iterator"
19 "github.com/btcsuite/goleveldb/leveldb/opt"
20 "github.com/btcsuite/goleveldb/leveldb/util"
21 22 "github.com/p9c/p9/pkg/chainhash"
23 "github.com/p9c/p9/pkg/database"
24 "github.com/p9c/p9/pkg/util/treap"
25 "github.com/p9c/p9/pkg/wire"
26 )
27 28 const (
29 // metadataDbName is the name used for the metadata database.
30 metadataDbName = "metadata"
31 // blockHdrSize is the size of a block header.
32 //
33 // This is simply the constant from wire and is only provided here for convenience since wire.MaxBlockHeaderPayload
34 // is quite long.
35 blockHdrSize = wire.MaxBlockHeaderPayload
36 // blockHdrOffset defines the offsets into a block index row for the block header.
37 //
38 // The serialized block index row format is:
39 //
40 // <blocklocation><blockheader>
41 blockHdrOffset = blockLocSize
42 )
43 44 var (
45 // byteOrder is the preferred byte order used through the database and block files. Sometimes big endian will be
46 // used to allow ordered byte sortable integer values.
47 byteOrder = binary.LittleEndian
48 // bucketIndexPrefix is the prefix used for all entries in the bucket index.
49 bucketIndexPrefix = []byte("bidx")
50 // curBucketIDKeyName is the name of the key used to keep track of the current bucket ID counter.
51 curBucketIDKeyName = []byte("bidx-cbid")
52 // metadataBucketID is the ID of the top-level metadata bucket. It is the value 0 encoded as an unsigned big-endian
53 // uint32.
54 metadataBucketID = [4]byte{}
55 // blockIdxBucketID is the ID of the internal block metadata bucket. It is the value 1 encoded as an unsigned
56 // big-endian uint32.
57 blockIdxBucketID = [4]byte{0x00, 0x00, 0x00, 0x01}
58 // blockIdxBucketName is the bucket used internally to track block metadata.
59 blockIdxBucketName = []byte("ffldb-blockidx")
60 // writeLocKeyName is the key used to store the current write file location.
61 writeLocKeyName = []byte("ffldb-writeloc")
62 )
63 64 // Common error strings.
65 const (
66 // errDbNotOpenStr is the text to use for the database.ErrDbNotOpen error code.
67 errDbNotOpenStr = "database is not open"
68 // errTxClosedStr is the text to use for the database.ErrTxClosed error code.
69 errTxClosedStr = "database tx is closed"
70 )
71 72 // bulkFetchData is allows a block location to be specified along with the index it was requested from.
73 //
74 // This in turn allows the bulk data loading functions to txsort the data accesses based on the location to improve
75 // performance while keeping track of which result the data is for.
76 type bulkFetchData struct {
77 *blockLocation
78 replyIndex int
79 }
80 81 // bulkFetchDataSorter implements txsort.
82 //
83 // Interface to allow a slice of bulkFetchData to be sorted. In particular it sorts by file and then offset so that
84 // reads from files are grouped and linear.
85 type bulkFetchDataSorter []bulkFetchData
86 87 // Len returns the number of items in the slice.
88 //
89 // It is part of the sort.Interface implementation.
90 func (s bulkFetchDataSorter) Len() int {
91 return len(s)
92 }
93 94 // Swap swaps the items at the passed indices.
95 //
96 // It is part of the sort.Interface implementation.
97 func (s bulkFetchDataSorter) Swap(i, j int) {
98 s[i], s[j] = s[j], s[i]
99 }
100 101 // Less returns whether the item with index i should txsort before the item with index j.
102 //
103 // It is part of the sort.Interface implementation.
104 func (s bulkFetchDataSorter) Less(i, j int) bool {
105 if s[i].blockFileNum < s[j].blockFileNum {
106 return true
107 }
108 if s[i].blockFileNum > s[j].blockFileNum {
109 return false
110 }
111 return s[i].fileOffset < s[j].fileOffset
112 }
113 114 // makeDbErr creates a database.DBError given a set of arguments.
115 func makeDbErr(c database.ErrorCode, desc string, e error) database.DBError {
116 return database.DBError{ErrorCode: c, Description: desc, Err: e}
117 }
118 119 // convertErr converts the passed leveldb error into a database error with an equivalent error code and the passed
120 // description.
121 //
122 // It also sets the passed error as the underlying error.
123 func convertErr(desc string, ldbErr error) database.DBError {
124 // Use the driver-specific error code by default.
125 //
126 // The code below will update this with the converted error if it's recognized.
127 var code = database.ErrDriverSpecific
128 switch {
129 // Database corruption errors.
130 case ldberrors.IsCorrupted(ldbErr):
131 code = database.ErrCorruption
132 // Database open/create errors.
133 case ldbErr == leveldb.ErrClosed:
134 code = database.ErrDbNotOpen
135 // Transaction errors.
136 case ldbErr == leveldb.ErrSnapshotReleased:
137 code = database.ErrTxClosed
138 case ldbErr == leveldb.ErrIterReleased:
139 code = database.ErrTxClosed
140 }
141 return database.DBError{ErrorCode: code, Description: desc, Err: ldbErr}
142 }
143 144 // copySlice returns a copy of the passed slice.
145 //
146 // This is mostly used to copy leveldb iterator keys and values since they are only valid until the iterator is moved
147 // instead of during the entirety of the transaction.
148 func copySlice(slice []byte) []byte {
149 ret := make([]byte, len(slice))
150 copy(ret, slice)
151 return ret
152 }
153 154 // cursor is an internal type used to represent a cursor over key/value pairs and nested buckets of a bucket and
155 // implements the database.Cursor interface.
156 type cursor struct {
157 bucket *bucket
158 dbIter iterator.Iterator
159 pendingIter iterator.Iterator
160 currentIter iterator.Iterator
161 }
162 163 // Enforce cursor implements the database.Cursor interface.
164 var _ database.Cursor = (*cursor)(nil)
165 166 // Bucket returns the bucket the cursor was created for.
167 //
168 // This function is part of the database.Cursor interface implementation.
169 func (c *cursor) Bucket() database.Bucket {
170 // Ensure transaction state is valid.
171 if e := c.bucket.tx.checkClosed(); E.Chk(e) {
172 return nil
173 }
174 return c.bucket
175 }
176 177 // Delete removes the current key/value pair the cursor is at without invalidating the cursor.
178 //
179 // Returns the following errors as required by the interface contract:
180 //
181 // - ErrIncompatibleValue if attempted when the cursor points to a nested bucket
182 //
183 // - ErrTxNotWritable if attempted against a read-only transaction
184 //
185 // - ErrTxClosed if the transaction has already been closed
186 //
187 // This function is part of the database.Cursor interface implementation.
188 func (c *cursor) Delete() (e error) {
189 // Ensure transaction state is valid.
190 if e := c.bucket.tx.checkClosed(); E.Chk(e) {
191 return e
192 }
193 // DBError if the cursor is exhausted.
194 if c.currentIter == nil {
195 str := "cursor is exhausted"
196 return makeDbErr(database.ErrIncompatibleValue, str, nil)
197 }
198 // Do not allow buckets to be deleted via the cursor.
199 key := c.currentIter.Key()
200 if bytes.HasPrefix(key, bucketIndexPrefix) {
201 str := "buckets may not be deleted from a cursor"
202 return makeDbErr(database.ErrIncompatibleValue, str, nil)
203 }
204 c.bucket.tx.deleteKey(copySlice(key), true)
205 return nil
206 }
207 208 // skipPendingUpdates skips any keys at the current database iterator position that are being updated by the
209 // transaction.
210 //
211 // The forwards flag indicates the direction the cursor is moving.
212 func (c *cursor) skipPendingUpdates(forwards bool) {
213 for c.dbIter.Valid() {
214 var skip bool
215 key := c.dbIter.Key()
216 if c.bucket.tx.pendingRemove.Has(key) {
217 skip = true
218 } else if c.bucket.tx.pendingKeys.Has(key) {
219 skip = true
220 }
221 if !skip {
222 break
223 }
224 if forwards {
225 c.dbIter.Next()
226 } else {
227 c.dbIter.Prev()
228 }
229 }
230 }
231 232 // chooseIterator first skips any entries in the database iterator that are being updated by the transaction and sets
233 // the current iterator to the appropriate iterator depending on their validity and the order they compare in while
234 // taking into account the direction flag.
235 //
236 // When the cursor is being moved forwards and both iterators are valid, the iterator with the smaller key is chosen and
237 // vice versa when the cursor is being moved backwards.
238 func (c *cursor) chooseIterator(forwards bool) bool {
239 // Skip any keys at the current database iterator position that are being updated by the transaction.
240 c.skipPendingUpdates(forwards)
241 // When both iterators are exhausted, the cursor is exhausted too.
242 if !c.dbIter.Valid() && !c.pendingIter.Valid() {
243 c.currentIter = nil
244 return false
245 }
246 // Choose the database iterator when the pending keys iterator is exhausted.
247 if !c.pendingIter.Valid() {
248 c.currentIter = c.dbIter
249 return true
250 }
251 // Choose the pending keys iterator when the database iterator is exhausted.
252 if !c.dbIter.Valid() {
253 c.currentIter = c.pendingIter
254 return true
255 }
256 // Both iterators are valid, so choose the iterator with either the smaller or larger key depending on the forwards
257 // flag.
258 compare := bytes.Compare(c.dbIter.Key(), c.pendingIter.Key())
259 if (forwards && compare > 0) || (!forwards && compare < 0) {
260 c.currentIter = c.pendingIter
261 } else {
262 c.currentIter = c.dbIter
263 }
264 return true
265 }
266 267 // First positions the cursor at the first key/value pair and returns whether or not the pair exists. This function is
268 // part of the database.
269 //
270 // Cursor interface implementation.
271 func (c *cursor) First() bool {
272 // Ensure transaction state is valid.
273 if e := c.bucket.tx.checkClosed(); E.Chk(e) {
274 return false
275 }
276 // Seek to the first key in both the database and pending iterators and choose the iterator that is both valid and
277 // has the smaller key.
278 c.dbIter.First()
279 c.pendingIter.First()
280 return c.chooseIterator(true)
281 }
282 283 // Last positions the cursor at the last key/value pair and returns whether or not the pair exists. This function is
284 // part of the database.
285 //
286 // Cursor interface implementation.
287 func (c *cursor) Last() bool {
288 // Ensure transaction state is valid.
289 if e := c.bucket.tx.checkClosed(); E.Chk(e) {
290 return false
291 }
292 // Seek to the last key in both the database and pending iterators and choose the iterator that is both valid and
293 // has the larger key.
294 c.dbIter.Last()
295 c.pendingIter.Last()
296 return c.chooseIterator(false)
297 }
298 299 // Next moves the cursor one key/value pair forward and returns whether or not the pair exists. This function is part of
300 // the database.
301 //
302 // Cursor interface implementation.
303 func (c *cursor) Next() bool {
304 // Ensure transaction state is valid.
305 if e := c.bucket.tx.checkClosed(); E.Chk(e) {
306 return false
307 }
308 // Nothing to return if cursor is exhausted.
309 if c.currentIter == nil {
310 return false
311 }
312 // Move the current iterator to the next entry and choose the iterator that is both valid and has the smaller key.
313 c.currentIter.Next()
314 return c.chooseIterator(true)
315 }
316 317 // Prev moves the cursor one key/value pair backward and returns whether or not the pair exists. This function is part
318 // of the database.
319 //
320 // Cursor interface implementation.
321 func (c *cursor) Prev() bool {
322 // Ensure transaction state is valid.
323 if e := c.bucket.tx.checkClosed(); E.Chk(e) {
324 return false
325 }
326 // Nothing to return if cursor is exhausted.
327 if c.currentIter == nil {
328 return false
329 }
330 // Move the current iterator to the previous entry and choose the iterator that is both valid and has the larger
331 // key.
332 c.currentIter.Prev()
333 return c.chooseIterator(false)
334 }
335 336 // Seek positions the cursor at the first key/value pair that is greater than or equal to the passed seek key.
337 //
338 // Returns false if no suitable key was found.
339 //
340 // This function is part of the database.Cursor interface implementation.
341 func (c *cursor) Seek(seek []byte) bool {
342 // Ensure transaction state is valid.
343 if e := c.bucket.tx.checkClosed(); E.Chk(e) {
344 return false
345 }
346 // Seek to the provided key in both the database and pending iterators then choose the iterator that is both valid
347 // and has the larger key.
348 seekKey := bucketizedKey(c.bucket.id, seek)
349 c.dbIter.Seek(seekKey)
350 c.pendingIter.Seek(seekKey)
351 return c.chooseIterator(true)
352 }
353 354 // rawKey returns the current key the cursor is pointing to without stripping the current bucket prefix or bucket index
355 // prefix.
356 func (c *cursor) rawKey() []byte {
357 // Nothing to return if cursor is exhausted.
358 if c.currentIter == nil {
359 return nil
360 }
361 return copySlice(c.currentIter.Key())
362 }
363 364 // Key returns the current key the cursor is pointing to.
365 //
366 // This function is part of the database.Cursor interface implementation.
367 func (c *cursor) Key() []byte {
368 // Ensure transaction state is valid.
369 if e := c.bucket.tx.checkClosed(); E.Chk(e) {
370 return nil
371 }
372 // Nothing to return if cursor is exhausted.
373 if c.currentIter == nil {
374 return nil
375 }
376 // Slice out the actual key name and make a copy since it is no longer valid after iterating to the next item.
377 //
378 // The key is after the bucket index prefix and parent ID when the cursor is pointing to a nested bucket.
379 key := c.currentIter.Key()
380 if bytes.HasPrefix(key, bucketIndexPrefix) {
381 key = key[len(bucketIndexPrefix)+4:]
382 return copySlice(key)
383 }
384 // The key is after the bucket ID when the cursor is pointing to a normal entry.
385 key = key[len(c.bucket.id):]
386 return copySlice(key)
387 }
388 389 // rawValue returns the current value the cursor is pointing to without stripping without filtering bucket index values.
390 func (c *cursor) rawValue() []byte {
391 // Nothing to return if cursor is exhausted.
392 if c.currentIter == nil {
393 return nil
394 }
395 return copySlice(c.currentIter.Value())
396 }
397 398 // Value returns the current value the cursor is pointing to.
399 //
400 // This will be nil for nested buckets.
401 //
402 // This function is part of the database.Cursor interface implementation.
403 func (c *cursor) Value() []byte {
404 // Ensure transaction state is valid.
405 if e := c.bucket.tx.checkClosed(); E.Chk(e) {
406 return nil
407 }
408 // Nothing to return if cursor is exhausted.
409 if c.currentIter == nil {
410 return nil
411 }
412 // Return nil for the value when the cursor is pointing to a nested
413 // bucket.
414 if bytes.HasPrefix(c.currentIter.Key(), bucketIndexPrefix) {
415 return nil
416 }
417 return copySlice(c.currentIter.Value())
418 }
419 420 // cursorType defines the type of cursor to create.
421 type cursorType int
422 423 // The following constants define the allowed cursor types.
424 const (
425 // ctKeys iterates through all of the keys in a given bucket.
426 ctKeys cursorType = iota
427 // ctBuckets iterates through all directly nested buckets in a given bucket.
428 ctBuckets
429 // ctFull iterates through both the keys and the directly nested buckets
430 // in a given bucket.
431 ctFull
432 )
433 434 // cursorFinalizer is either invoked when a cursor is being garbage collected or called manually to ensure the
435 // underlying cursor iterators are released.
436 func cursorFinalizer(c *cursor) {
437 c.dbIter.Release()
438 c.pendingIter.Release()
439 }
440 441 // newCursor returns a new cursor for the given bucket, bucket ID, and cursor type.
442 //
443 // NOTE: The caller is responsible for calling the cursorFinalizer function on the returned cursor.
444 func newCursor(b *bucket, bucketID []byte, cursorTyp cursorType) *cursor {
445 var dbIter, pendingIter iterator.Iterator
446 switch cursorTyp {
447 case ctKeys:
448 keyRange := util.BytesPrefix(bucketID)
449 dbIter = b.tx.snapshot.NewIterator(keyRange)
450 pendingKeyIter := newLdbTreapIter(b.tx, keyRange)
451 pendingIter = pendingKeyIter
452 case ctBuckets:
453 // The serialized bucket index key format is:
454 //
455 // <bucketindexprefix><parentbucketid><bucketname>
456 //
457 // Create an iterator for the both the database and the pending keys which are prefixed by the bucket index
458 // identifier and the provided bucket ID.
459 prefix := make([]byte, len(bucketIndexPrefix)+4)
460 copy(prefix, bucketIndexPrefix)
461 copy(prefix[len(bucketIndexPrefix):], bucketID)
462 bucketRange := util.BytesPrefix(prefix)
463 dbIter = b.tx.snapshot.NewIterator(bucketRange)
464 pendingBucketIter := newLdbTreapIter(b.tx, bucketRange)
465 pendingIter = pendingBucketIter
466 case ctFull:
467 fallthrough
468 default:
469 // The serialized bucket index key format is:
470 //
471 // <bucketindexprefix><parentbucketid><bucketname>
472 prefix := make([]byte, len(bucketIndexPrefix)+4)
473 copy(prefix, bucketIndexPrefix)
474 copy(prefix[len(bucketIndexPrefix):], bucketID)
475 bucketRange := util.BytesPrefix(prefix)
476 keyRange := util.BytesPrefix(bucketID)
477 // Since both keys and buckets are needed from the database, create an individual iterator for each prefix and
478 // then create a merged iterator from them.
479 dbKeyIter := b.tx.snapshot.NewIterator(keyRange)
480 dbBucketIter := b.tx.snapshot.NewIterator(bucketRange)
481 iters := []iterator.Iterator{dbKeyIter, dbBucketIter}
482 dbIter = iterator.NewMergedIterator(
483 iters,
484 comparer.DefaultComparer, true,
485 )
486 // Since both keys and buckets are needed from the pending keys, create an individual iterator for each prefix
487 // and then create a merged iterator from them.
488 pendingKeyIter := newLdbTreapIter(b.tx, keyRange)
489 pendingBucketIter := newLdbTreapIter(b.tx, bucketRange)
490 iters = []iterator.Iterator{pendingKeyIter, pendingBucketIter}
491 pendingIter = iterator.NewMergedIterator(
492 iters,
493 comparer.DefaultComparer, true,
494 )
495 }
496 // Create the cursor using the iterators.
497 return &cursor{bucket: b, dbIter: dbIter, pendingIter: pendingIter}
498 }
499 500 // bucket is an internal type used to represent a collection of key/value pairs and implements the database.Bucket
501 // interface.
502 type bucket struct {
503 tx *transaction
504 id [4]byte
505 }
506 507 // Enforce bucket implements the database.Bucket interface.
508 var _ database.Bucket = (*bucket)(nil)
509 510 // bucketIndexKey returns the actual key to use for storing and retrieving a child bucket in the bucket index.
511 //
512 // This is required because additional information is needed to distinguish nested buckets with the same name.
513 func bucketIndexKey(parentID [4]byte, key []byte) []byte {
514 // The serialized bucket index key format is:
515 //
516 // <bucketindexprefix><parentbucketid><bucketname>
517 indexKey := make([]byte, len(bucketIndexPrefix)+4+len(key))
518 copy(indexKey, bucketIndexPrefix)
519 copy(indexKey[len(bucketIndexPrefix):], parentID[:])
520 copy(indexKey[len(bucketIndexPrefix)+4:], key)
521 return indexKey
522 }
523 524 // bucketizedKey returns the actual key to use for storing and retrieving a key for the provided bucket ID.
525 //
526 // This is required because bucketizing is handled through the use of a unique prefix per bucket.
527 func bucketizedKey(bucketID [4]byte, key []byte) []byte {
528 // The serialized block index key format is:
529 // <bucketid><key>
530 bKey := make([]byte, 4+len(key))
531 copy(bKey, bucketID[:])
532 copy(bKey[4:], key)
533 return bKey
534 }
535 536 // Bucket retrieves a nested bucket with the given key.
537 //
538 // Returns nil if the bucket does not exist.
539 //
540 // This function is part of the database.Bucket interface implementation.
541 func (b *bucket) Bucket(key []byte) database.Bucket {
542 // Ensure transaction state is valid.
543 if e := b.tx.checkClosed(); E.Chk(e) {
544 return nil
545 }
546 // Attempt to fetch the ID for the child bucket. The bucket does not exist if the bucket index entry does not exist.
547 childID := b.tx.fetchKey(bucketIndexKey(b.id, key))
548 if childID == nil {
549 return nil
550 }
551 childBucket := &bucket{tx: b.tx}
552 copy(childBucket.id[:], childID)
553 return childBucket
554 }
555 556 // CreateBucket creates and returns a new nested bucket with the given key.
557 //
558 // Returns the following errors as required by the interface contract:
559 //
560 // - ErrBucketExists if the bucket already exists
561 //
562 // - ErrBucketNameRequired if the key is empty
563 //
564 // - ErrIncompatibleValue if the key is otherwise invalid for the particular implementation
565 //
566 // - ErrTxNotWritable if attempted against a read-only transaction
567 //
568 // - ErrTxClosed if the transaction has already been closed
569 //
570 // This function is part of the database.Bucket interface implementation.
571 func (b *bucket) CreateBucket(key []byte) (database.Bucket, error) {
572 // Ensure transaction state is valid.
573 if e := b.tx.checkClosed(); E.Chk(e) {
574 return nil, e
575 }
576 // Ensure the transaction is writable.
577 if !b.tx.writable {
578 str := "create bucket requires a writable database transaction"
579 return nil, makeDbErr(database.ErrTxNotWritable, str, nil)
580 }
581 // Ensure a key was provided.
582 if len(key) == 0 {
583 str := "create bucket requires a key"
584 return nil, makeDbErr(database.ErrBucketNameRequired, str, nil)
585 }
586 // Ensure bucket does not already exist.
587 bidxKey := bucketIndexKey(b.id, key)
588 if b.tx.hasKey(bidxKey) {
589 str := "bucket already exists"
590 return nil, makeDbErr(database.ErrBucketExists, str, nil)
591 }
592 // Find the appropriate next bucket ID to use for the new bucket. In the case of the special internal block index,
593 // keep the fixed ID.
594 var childID [4]byte
595 if b.id == metadataBucketID && bytes.Equal(key, blockIdxBucketName) {
596 childID = blockIdxBucketID
597 } else {
598 var e error
599 childID, e = b.tx.nextBucketID()
600 if e != nil {
601 return nil, e
602 }
603 }
604 // Add the new bucket to the bucket index.
605 if e := b.tx.putKey(bidxKey, childID[:]); E.Chk(e) {
606 str := fmt.Sprintf("failed to create bucket with key %q", key)
607 return nil, convertErr(str, e)
608 }
609 return &bucket{tx: b.tx, id: childID}, nil
610 }
611 612 // CreateBucketIfNotExists creates and returns a new nested bucket with the given key if it does not already exist.
613 //
614 // Returns the following errors as required by the interface contract:
615 //
616 // - ErrBucketNameRequired if the key is empty
617 //
618 // - ErrIncompatibleValue if the key is otherwise invalid for the particular implementation
619 //
620 // - ErrTxNotWritable if attempted against a read-only transaction
621 //
622 // - ErrTxClosed if the transaction has already been closed
623 //
624 // This function is part of the database.Bucket interface implementation.
625 func (b *bucket) CreateBucketIfNotExists(key []byte) (database.Bucket, error) {
626 // Ensure transaction state is valid.
627 if e := b.tx.checkClosed(); E.Chk(e) {
628 return nil, e
629 }
630 // Ensure the transaction is writable.
631 if !b.tx.writable {
632 str := "create bucket requires a writable database transaction"
633 return nil, makeDbErr(database.ErrTxNotWritable, str, nil)
634 }
635 // Return existing bucket if it already exists, otherwise create it.
636 if bucket := b.Bucket(key); bucket != nil {
637 return bucket, nil
638 }
639 return b.CreateBucket(key)
640 }
641 642 // DeleteBucket removes a nested bucket with the given key.
643 //
644 // Returns the following errors as required by the interface contract:
645 //
646 // - ErrBucketNotFound if the specified bucket does not exist
647 //
648 // - ErrTxNotWritable if attempted against a read-only transaction
649 //
650 // - ErrTxClosed if the transaction has already been closed
651 //
652 // This function is part of the database.Bucket interface implementation.
653 func (b *bucket) DeleteBucket(key []byte) (e error) {
654 // Ensure transaction state is valid.
655 if e := b.tx.checkClosed(); E.Chk(e) {
656 return e
657 }
658 // Ensure the transaction is writable.
659 if !b.tx.writable {
660 str := "delete bucket requires a writable database transaction"
661 return makeDbErr(database.ErrTxNotWritable, str, nil)
662 }
663 // Attempt to fetch the ID for the child bucket.
664 //
665 // The bucket does not exist if the bucket index entry does not exist.
666 //
667 // In the case of the special internal block index, keep the fixed ID.
668 bidxKey := bucketIndexKey(b.id, key)
669 childID := b.tx.fetchKey(bidxKey)
670 if childID == nil {
671 str := fmt.Sprintf("bucket %q does not exist", key)
672 return makeDbErr(database.ErrBucketNotFound, str, nil)
673 }
674 // Remove all nested buckets and their keys.
675 childIDs := [][]byte{childID}
676 for len(childIDs) > 0 {
677 childID = childIDs[len(childIDs)-1]
678 childIDs = childIDs[:len(childIDs)-1]
679 // Delete all keys in the nested bucket.
680 keyCursor := newCursor(b, childID, ctKeys)
681 for ok := keyCursor.First(); ok; ok = keyCursor.Next() {
682 b.tx.deleteKey(keyCursor.rawKey(), false)
683 }
684 cursorFinalizer(keyCursor)
685 // Iterate through all nested buckets.
686 bucketCursor := newCursor(b, childID, ctBuckets)
687 for ok := bucketCursor.First(); ok; ok = bucketCursor.Next() {
688 // Push the id of the nested bucket onto the stack for the next
689 // iteration.
690 childID := bucketCursor.rawValue()
691 childIDs = append(childIDs, childID)
692 // Remove the nested bucket from the bucket index.
693 b.tx.deleteKey(bucketCursor.rawKey(), false)
694 }
695 cursorFinalizer(bucketCursor)
696 }
697 // Remove the nested bucket from the bucket index. Any buckets nested under it were already removed above.
698 b.tx.deleteKey(bidxKey, true)
699 return nil
700 }
701 702 // Cursor returns a new cursor, allowing for iteration over the bucket's key/value pairs and nested buckets in forward
703 // or backward order.
704 //
705 // You must seek to a position using the First, Last, or Seek functions before calling the Next, Prev, Key, or value
706 // functions. Failure to do so will result in the same return values as an exhausted cursor, which is false for the Prev
707 // and Next functions and nil for Key and value functions. This function is part of the database.
708 //
709 // Bucket interface implementation.
710 func (b *bucket) Cursor() database.Cursor {
711 // Ensure transaction state is valid.
712 if e := b.tx.checkClosed(); E.Chk(e) {
713 return &cursor{bucket: b}
714 }
715 // Create the cursor and setup a runtime finalizer to ensure the iterators are released when the cursor is garbage
716 // collected.
717 c := newCursor(b, b.id[:], ctFull)
718 runtime.SetFinalizer(c, cursorFinalizer)
719 return c
720 }
721 722 // ForEach invokes the passed function with every key/value pair in the bucket. This does not include nested buckets or
723 // the key/value pairs within those nested buckets.
724 //
725 // WARNING: It is not safe to mutate data while iterating with this method.
726 //
727 // Doing so may cause the underlying cursor to be invalidated and return
728 // unexpected keys and/or values.
729 //
730 // Returns the following errors as required by the interface contract:
731 //
732 // - ErrTxClosed if the transaction has already been closed
733 //
734 // NOTE: The values returned by this function are only valid during a transaction. Attempting to access them after a
735 // transaction has ended will likely result in an access violation. This function is part of the database.Bucket
736 // interface implementation.
737 func (b *bucket) ForEach(fn func(k, v []byte) error) (e error) {
738 // Ensure transaction state is valid.
739 if e := b.tx.checkClosed(); E.Chk(e) {
740 return e
741 }
742 // Invoke the callback for each cursor item. Return the error returned from the callback when it is non-nil.
743 c := newCursor(b, b.id[:], ctKeys)
744 defer cursorFinalizer(c)
745 for ok := c.First(); ok; ok = c.Next() {
746 e := fn(c.Key(), c.Value())
747 if e != nil {
748 return e
749 }
750 }
751 return nil
752 }
753 754 // ForEachBucket invokes the passed function with the key of every nested bucket in the current bucket.
755 //
756 // This does not include any nested buckets within those nested buckets.
757 //
758 // WARNING: It is not safe to mutate data while iterating with this method.
759 //
760 // Doing so may cause the underlying cursor to be invalidated and return unexpected keys.
761 //
762 // Returns the following errors as required by the interface contract:
763 //
764 // - ErrTxClosed if the transaction has already been closed
765 //
766 // NOTE: The values returned by this function are only valid during a transaction. Attempting to access them after a
767 // transaction has ended will likely result in an access violation.
768 //
769 // This function is part of the database.Bucket interface implementation.
770 func (b *bucket) ForEachBucket(fn func(k []byte) error) (e error) {
771 // Ensure transaction state is valid.
772 if e := b.tx.checkClosed(); E.Chk(e) {
773 return e
774 }
775 // Invoke the callback for each cursor item. Return the error returned from the callback when it is non-nil.
776 c := newCursor(b, b.id[:], ctBuckets)
777 defer cursorFinalizer(c)
778 for ok := c.First(); ok; ok = c.Next() {
779 e := fn(c.Key())
780 if e != nil {
781 return e
782 }
783 }
784 return nil
785 }
786 787 // Writable returns whether or not the bucket is writable.
788 //
789 // This function is part of the database.Bucket interface implementation.
790 func (b *bucket) Writable() bool {
791 return b.tx.writable
792 }
793 794 // Put saves the specified key/value pair to the bucket.
795 //
796 // Keys that do not already exist are added and keys that already exist are overwritten.
797 //
798 // Returns the following errors as required by the interface contract:
799 //
800 // - ErrKeyRequired if the key is empty
801 // - ErrIncompatibleValue if the key is the same as an existing bucket
802 // - ErrTxNotWritable if attempted against a read-only transaction
803 // - ErrTxClosed if the transaction has already been closed
804 //
805 // This function is part of the database.Bucket interface implementation.
806 func (b *bucket) Put(key, value []byte) (e error) {
807 // Ensure transaction state is valid.
808 if e := b.tx.checkClosed(); E.Chk(e) {
809 return e
810 }
811 // Ensure the transaction is writable.
812 if !b.tx.writable {
813 str := "setting a key requires a writable database transaction"
814 return makeDbErr(database.ErrTxNotWritable, str, nil)
815 }
816 // Ensure a key was provided.
817 if len(key) == 0 {
818 str := "put requires a key"
819 return makeDbErr(database.ErrKeyRequired, str, nil)
820 }
821 return b.tx.putKey(bucketizedKey(b.id, key), value)
822 }
823 824 // Get returns the value for the given key.
825 //
826 // Returns nil if the key does not exist in this bucket.
827 //
828 // An empty slice is returned for keys that exist but have no value assigned.
829 //
830 // NOTE: The value returned by this function is only valid during a transaction. Attempting to access it after a
831 // transaction has ended results in undefined behavior. Additionally, the value must NOT be modified by the caller.
832 //
833 // This function is part of the database.Bucket interface implementation.
834 func (b *bucket) Get(key []byte) []byte {
835 // Ensure transaction state is valid.
836 if e := b.tx.checkClosed(); E.Chk(e) {
837 return nil
838 }
839 // Nothing to return if there is no key.
840 if len(key) == 0 {
841 return nil
842 }
843 return b.tx.fetchKey(bucketizedKey(b.id, key))
844 }
845 846 // Delete removes the specified key from the bucket.
847 //
848 // Deleting a key that does not exist does not return an error.
849 //
850 // Returns the following errors as required by the interface contract:
851 //
852 // - ErrKeyRequired if the key is empty
853 // - ErrIncompatibleValue if the key is the same as an existing bucket
854 // - ErrTxNotWritable if attempted against a read-only transaction
855 // - ErrTxClosed if the transaction has already been closed
856 //
857 // This function is part of the database.Bucket interface implementation.
858 func (b *bucket) Delete(key []byte) (e error) {
859 // Ensure transaction state is valid.
860 if e := b.tx.checkClosed(); E.Chk(e) {
861 return e
862 }
863 // Ensure the transaction is writable.
864 if !b.tx.writable {
865 str := "deleting a value requires a writable database transaction"
866 return makeDbErr(database.ErrTxNotWritable, str, nil)
867 }
868 // Nothing to do if there is no key.
869 if len(key) == 0 {
870 return nil
871 }
872 b.tx.deleteKey(bucketizedKey(b.id, key), true)
873 return nil
874 }
875 876 // pendingBlock houses a block that will be written to disk when the database transaction is committed.
877 type pendingBlock struct {
878 hash *chainhash.Hash
879 bytes []byte
880 }
881 882 // transaction represents a database transaction.
883 //
884 // It can either be read-only or read-write and implements the database.
885 //
886 // Bucket interface. The transaction provides a root bucket against which all read and writes occur.
887 type transaction struct {
888 managed bool // Is the transaction managed?
889 closed bool // Is the transaction closed?
890 writable bool // Is the transaction writable?
891 db *db // DB instance the tx was created from.
892 snapshot *dbCacheSnapshot // Underlying snapshot for txns.
893 metaBucket *bucket // The root metadata bucket.
894 blockIdxBucket *bucket // The block index bucket.
895 // Blocks that need to be stored on commit.
896 //
897 // The pendingBlocks map is kept to allow quick lookups of pending data by block hash.
898 pendingBlocks map[chainhash.Hash]int
899 pendingBlockData []pendingBlock
900 // Keys that need to be stored or deleted on commit.
901 pendingKeys *treap.Mutable
902 pendingRemove *treap.Mutable
903 // Active iterators that need to be notified when the pending keys have been updated so the cursors can properly
904 // handle updates to the transaction state.
905 activeIterLock sync.RWMutex
906 activeIters []*treap.Iterator
907 }
908 909 // Enforce transaction implements the database.Tx interface.
910 var _ database.Tx = (*transaction)(nil)
911 912 // removeActiveIter removes the passed iterator from the list of active iterators against the pending keys treap.
913 func (tx *transaction) removeActiveIter(iter *treap.Iterator) {
914 // An indexing for loop is intentionally used over a range here as range does not reevaluate the slice on each
915 // iteration nor does it adjust the index for the modified slice.
916 tx.activeIterLock.Lock()
917 for i := 0; i < len(tx.activeIters); i++ {
918 if tx.activeIters[i] == iter {
919 copy(tx.activeIters[i:], tx.activeIters[i+1:])
920 tx.activeIters[len(tx.activeIters)-1] = nil
921 tx.activeIters = tx.activeIters[:len(tx.activeIters)-1]
922 }
923 }
924 tx.activeIterLock.Unlock()
925 }
926 927 // addActiveIter adds the passed iterator to the list of active iterators for the pending keys treap.
928 func (tx *transaction) addActiveIter(iter *treap.Iterator) {
929 tx.activeIterLock.Lock()
930 tx.activeIters = append(tx.activeIters, iter)
931 tx.activeIterLock.Unlock()
932 }
933 934 // notifyActiveIters notifies all of the active iterators for the pending keys treap that it has been updated.
935 func (tx *transaction) notifyActiveIters() {
936 tx.activeIterLock.RLock()
937 for _, iter := range tx.activeIters {
938 iter.ForceReseek()
939 }
940 tx.activeIterLock.RUnlock()
941 }
942 943 // checkClosed returns an error if the the database or transaction is closed.
944 func (tx *transaction) checkClosed() (e error) {
945 // The transaction is no longer valid if it has been closed.
946 if tx.closed {
947 return makeDbErr(database.ErrTxClosed, errTxClosedStr, nil)
948 }
949 return nil
950 }
951 952 // hasKey returns whether or not the provided key exists in the database while taking into account the current
953 // transaction state.
954 func (tx *transaction) hasKey(key []byte) bool {
955 // When the transaction is writable, check the pending transaction state first.
956 if tx.writable {
957 if tx.pendingRemove.Has(key) {
958 return false
959 }
960 if tx.pendingKeys.Has(key) {
961 return true
962 }
963 }
964 // Consult the database cache and underlying database.
965 return tx.snapshot.Has(key)
966 }
967 968 // putKey adds the provided key to the list of keys to be updated in the database when the transaction is committed.
969 //
970 // NOTE: This function must only be called on a writable transaction.
971 //
972 // Since it is an internal helper function, it does not check.
973 func (tx *transaction) putKey(key, value []byte) (e error) {
974 // Prevent the key from being deleted if it was previously scheduled to
975 // be deleted on transaction commit.
976 tx.pendingRemove.Delete(key)
977 // Add the key/value pair to the list to be written on transaction commit.
978 tx.pendingKeys.Put(key, value)
979 tx.notifyActiveIters()
980 return nil
981 }
982 983 // fetchKey attempts to fetch the provided key from the database cache ( and hence underlying database) while taking
984 // into account the current transaction state. Returns nil if the key does not exist.
985 func (tx *transaction) fetchKey(key []byte) []byte {
986 // When the transaction is writable, check the pending transaction state first.
987 if tx.writable {
988 if tx.pendingRemove.Has(key) {
989 return nil
990 }
991 if value := tx.pendingKeys.Get(key); value != nil {
992 return value
993 }
994 }
995 // Consult the database cache and underlying database.
996 return tx.snapshot.Get(key)
997 }
998 999 // deleteKey adds the provided key to the list of keys to be deleted from the database when the transaction is
1000 // committed.
1001 //
1002 // The notify iterators flag is useful to delay notifying iterators about the changes during bulk deletes.
1003 //
1004 // NOTE: This function must only be called on a writable transaction.
1005 //
1006 // Since it is an internal helper function, it does not check.
1007 func (tx *transaction) deleteKey(key []byte, notifyIterators bool) {
1008 // Remove the key from the list of pendings keys to be written on transaction commit if needed.
1009 tx.pendingKeys.Delete(key)
1010 // Add the key to the list to be deleted on transaction commit.
1011 tx.pendingRemove.Put(key, nil)
1012 // Notify the active iterators about the change if the flag is set.
1013 if notifyIterators {
1014 tx.notifyActiveIters()
1015 }
1016 }
1017 1018 // nextBucketID returns the next bucket ID to use for creating a new bucket.
1019 //
1020 // NOTE: This function must only be called on a writable transaction.
1021 //
1022 // Since it is an internal helper function, it does not check.
1023 func (tx *transaction) nextBucketID() ([4]byte, error) {
1024 // Load the currently highest used bucket ID.
1025 curIDBytes := tx.fetchKey(curBucketIDKeyName)
1026 curBucketNum := binary.BigEndian.Uint32(curIDBytes)
1027 // Increment and update the current bucket ID and return it.
1028 var nextBucketID [4]byte
1029 binary.BigEndian.PutUint32(nextBucketID[:], curBucketNum+1)
1030 if e := tx.putKey(curBucketIDKeyName, nextBucketID[:]); E.Chk(e) {
1031 return [4]byte{}, e
1032 }
1033 return nextBucketID, nil
1034 }
1035 1036 // Metadata returns the top-most bucket for all metadata storage.
1037 //
1038 // This function is part of the database.Tx interface implementation.
1039 func (tx *transaction) Metadata() database.Bucket {
1040 return tx.metaBucket
1041 }
1042 1043 // hasBlock returns whether or not a block with the given hash exists.
1044 func (tx *transaction) hasBlock(hash *chainhash.Hash) bool {
1045 // Return true if the block is pending to be written on commit since it exists from the viewpoint of this
1046 // transaction.
1047 if _, exists := tx.pendingBlocks[*hash]; exists {
1048 return true
1049 }
1050 return tx.hasKey(bucketizedKey(blockIdxBucketID, hash[:]))
1051 }
1052 1053 // StoreBlock stores the provided block into the database.
1054 //
1055 // There are no checks to ensure the block connects to a previous block, contains double spends, or any additional
1056 // functionality such as transaction indexing.
1057 //
1058 // It simply stores the block in the database.
1059 //
1060 // Returns the following errors as required by the interface contract:
1061 //
1062 // - ErrBlockExists when the block hash already exists
1063 //
1064 // - ErrTxNotWritable if attempted against a read-only transaction
1065 //
1066 // - ErrTxClosed if the transaction has already been closed
1067 //
1068 // This function is part of the database.Tx interface implementation.
1069 func (tx *transaction) StoreBlock(block *block.Block) (e error) {
1070 // Ensure transaction state is valid.
1071 if e = tx.checkClosed(); E.Chk(e) {
1072 return e
1073 }
1074 // Ensure the transaction is writable.
1075 if !tx.writable {
1076 str := "store block requires a writable database transaction"
1077 return makeDbErr(database.ErrTxNotWritable, str, nil)
1078 }
1079 // Reject the block if it already exists.
1080 blockHash := block.Hash()
1081 if tx.hasBlock(blockHash) {
1082 str := fmt.Sprintf("block %s already exists", blockHash)
1083 return makeDbErr(database.ErrBlockExists, str, nil)
1084 }
1085 blockBytes, e := block.Bytes()
1086 if e != nil {
1087 str := fmt.Sprintf(
1088 "failed to get serialized bytes for block %s",
1089 blockHash,
1090 )
1091 return makeDbErr(database.ErrDriverSpecific, str, e)
1092 }
1093 // Add the block to be stored to the list of pending blocks to store when the transaction is committed.
1094 //
1095 // Also add it to pending blocks map so it is easy to determine the block is pending based on the block hash.
1096 if tx.pendingBlocks == nil {
1097 tx.pendingBlocks = make(map[chainhash.Hash]int)
1098 }
1099 tx.pendingBlocks[*blockHash] = len(tx.pendingBlockData)
1100 tx.pendingBlockData = append(
1101 tx.pendingBlockData, pendingBlock{
1102 hash: blockHash,
1103 bytes: blockBytes,
1104 },
1105 )
1106 // Tracef("added block %s to pending blocks", blockHash)
1107 return nil
1108 }
1109 1110 // HasBlock returns whether or not a block with the given hash exists in the database.
1111 //
1112 // Returns the following errors as required by the interface contract:
1113 //
1114 // - ErrTxClosed if the transaction has already been closed
1115 //
1116 // This function is part of the database.Tx interface implementation.
1117 func (tx *transaction) HasBlock(hash *chainhash.Hash) (bool, error) {
1118 // Ensure transaction state is valid.
1119 if e := tx.checkClosed(); E.Chk(e) {
1120 return false, e
1121 }
1122 return tx.hasBlock(hash), nil
1123 }
1124 1125 // HasBlocks returns whether or not the blocks with the provided hashes exist in the database.
1126 //
1127 // Returns the following errors as required by the interface contract:
1128 //
1129 // - ErrTxClosed if the transaction has already been closed
1130 //
1131 // This function is part of the database.Tx interface implementation.
1132 func (tx *transaction) HasBlocks(hashes []chainhash.Hash) ([]bool, error) {
1133 // Ensure transaction state is valid.
1134 if e := tx.checkClosed(); E.Chk(e) {
1135 return nil, e
1136 }
1137 results := make([]bool, len(hashes))
1138 for i := range hashes {
1139 results[i] = tx.hasBlock(&hashes[i])
1140 }
1141 return results, nil
1142 }
1143 1144 // fetchBlockRow fetches the metadata stored in the block index for the provided hash. It will return ErrBlockNotFound
1145 // if there is no entry.
1146 func (tx *transaction) fetchBlockRow(hash *chainhash.Hash) ([]byte, error) {
1147 blockRow := tx.blockIdxBucket.Get(hash[:])
1148 if blockRow == nil {
1149 str := fmt.Sprintf("block %s does not exist", hash)
1150 return nil, makeDbErr(database.ErrBlockNotFound, str, nil)
1151 }
1152 return blockRow, nil
1153 }
1154 1155 // FetchBlockHeader returns the raw serialized bytes for the block header identified by the given hash.
1156 //
1157 // The raw bytes are in the format returned by Serialize on a wire.BlockHeader.
1158 //
1159 // Returns the following errors as required by the interface contract:
1160 //
1161 // - ErrBlockNotFound if the requested block hash does not exist
1162 //
1163 // - ErrTxClosed if the transaction has already been closed
1164 //
1165 // - ErrCorruption if the database has somehow become corrupted
1166 //
1167 // NOTE: The data returned by this function is only valid during a database transaction. Attempting to access it after a
1168 // transaction has ended results in undefined behavior.
1169 //
1170 // This constraint prevents additional data copies and allows support for memory-mapped database implementations.
1171 //
1172 // This function is part of the database.Tx interface implementation.
1173 func (tx *transaction) FetchBlockHeader(hash *chainhash.Hash) ([]byte, error) {
1174 return tx.FetchBlockRegion(
1175 &database.BlockRegion{
1176 Hash: hash,
1177 Offset: 0,
1178 Len: blockHdrSize,
1179 },
1180 )
1181 }
1182 1183 // FetchBlockHeaders returns the raw serialized bytes for the block headers identified by the given hashes.
1184 //
1185 // The raw bytes are in the format returned by Serialize on a wire.BlockHeader.
1186 //
1187 // Returns the following errors as required by the interface contract:
1188 //
1189 // - ErrBlockNotFound if the any of the requested block hashes do not exist
1190 //
1191 // - ErrTxClosed if the transaction has already been closed
1192 //
1193 // - ErrCorruption if the database has somehow become corrupted
1194 //
1195 // NOTE: The data returned by this function is only valid during a database transaction. Attempting to access it after a
1196 // transaction has ended results in undefined behavior.
1197 //
1198 // This constraint prevents additional data copies and allows support for memory-mapped database implementations.
1199 //
1200 // This function is part of the database.Tx interface implementation.
1201 func (tx *transaction) FetchBlockHeaders(hashes []chainhash.Hash) ([][]byte, error) {
1202 regions := make([]database.BlockRegion, len(hashes))
1203 for i := range hashes {
1204 regions[i].Hash = &hashes[i]
1205 regions[i].Offset = 0
1206 regions[i].Len = blockHdrSize
1207 }
1208 return tx.FetchBlockRegions(regions)
1209 }
1210 1211 // FetchBlock returns the raw serialized bytes for the block identified by the given hash. The raw bytes are in the
1212 // format returned by Serialize on a wire.Block.
1213 //
1214 // Returns the following errors as required by the interface contract:
1215 //
1216 // - ErrBlockNotFound if the requested block hash does not exist
1217 //
1218 // - ErrTxClosed if the transaction has already been closed
1219 //
1220 // - ErrCorruption if the database has somehow become corrupted
1221 //
1222 // In addition, returns ErrDriverSpecific if any failures occur when reading the block files.
1223 //
1224 // NOTE: The data returned by this function is only valid during a database transaction. Attempting to access it after a
1225 // transaction has ended results in undefined behavior.
1226 //
1227 // This constraint prevents additional data copies and allows support for memory-mapped database implementations.
1228 //
1229 // This function is part of the database.Tx interface implementation.
1230 func (tx *transaction) FetchBlock(hash *chainhash.Hash) ([]byte, error) {
1231 // Ensure transaction state is valid.
1232 if e := tx.checkClosed(); E.Chk(e) {
1233 return nil, e
1234 }
1235 // When the block is pending to be written on commit return the bytes from there.
1236 if idx, exists := tx.pendingBlocks[*hash]; exists {
1237 return tx.pendingBlockData[idx].bytes, nil
1238 }
1239 // Lookup the location of the block in the files from the block index.
1240 blockRow, e := tx.fetchBlockRow(hash)
1241 if e != nil {
1242 return nil, e
1243 }
1244 location := deserializeBlockLoc(blockRow)
1245 // Read the block from the appropriate location. The function also performs a checksum over the data to detect data
1246 // corruption.
1247 blockBytes, e := tx.db.store.readBlock(hash, location)
1248 if e != nil {
1249 return nil, e
1250 }
1251 return blockBytes, nil
1252 }
1253 1254 // FetchBlocks returns the raw serialized bytes for the blocks identified by the given hashes.
1255 //
1256 // The raw bytes are in the format returned by Serialize on a wire.Block.
1257 //
1258 // Returns the following errors as required by the interface contract:
1259 //
1260 // - ErrBlockNotFound if any of the requested block hashed do not exist
1261 //
1262 // - ErrTxClosed if the transaction has already been closed
1263 //
1264 // - ErrCorruption if the database has somehow become corrupted
1265 //
1266 // In addition, returns ErrDriverSpecific if any failures occur when reading the block files.
1267 //
1268 // NOTE: The data returned by this function is only valid during a database transaction. Attempting to access it after a
1269 // transaction has ended results in undefined behavior.
1270 //
1271 // This constraint prevents additional data copies and allows support for memory-mapped database implementations.
1272 //
1273 // This function is part of the database.Tx interface implementation.
1274 func (tx *transaction) FetchBlocks(hashes []chainhash.Hash) ([][]byte, error) {
1275 // Ensure transaction state is valid.
1276 if e := tx.checkClosed(); E.Chk(e) {
1277 return nil, e
1278 }
1279 // NOTE: This could check for the existence of all blocks before loading any of them which would be faster in the
1280 // failure case, however callers will not typically be calling this function with invalid values, so optimize for
1281 // the common case. Load the blocks.
1282 blocks := make([][]byte, len(hashes))
1283 for i := range hashes {
1284 var e error
1285 blocks[i], e = tx.FetchBlock(&hashes[i])
1286 if e != nil {
1287 return nil, e
1288 }
1289 }
1290 return blocks, nil
1291 }
1292 1293 // fetchPendingRegion attempts to fetch the provided region from any block which are pending to be written on commit.
1294 //
1295 // It will return nil for the byte slice when the region references a block which is not pending. When the region does
1296 // reference a pending block, it is bounds checked and returns ErrBlockRegionInvalid if invalid.
1297 func (tx *transaction) fetchPendingRegion(region *database.BlockRegion) ([]byte, error) {
1298 // Nothing to do if the block is not pending to be written on commit.
1299 idx, exists := tx.pendingBlocks[*region.Hash]
1300 if !exists {
1301 return nil, nil
1302 }
1303 // Ensure the region is within the bounds of the block.
1304 blockBytes := tx.pendingBlockData[idx].bytes
1305 blockLen := uint32(len(blockBytes))
1306 endOffset := region.Offset + region.Len
1307 if endOffset < region.Offset || endOffset > blockLen {
1308 str := fmt.Sprintf(
1309 "block %s region offset %d, length %d "+
1310 "exceeds block length of %d", region.Hash,
1311 region.Offset, region.Len, blockLen,
1312 )
1313 return nil, makeDbErr(database.ErrBlockRegionInvalid, str, nil)
1314 }
1315 // Return the bytes from the pending block.
1316 return blockBytes[region.Offset:endOffset:endOffset], nil
1317 }
1318 1319 // FetchBlockRegion returns the raw serialized bytes for the given block region.
1320 //
1321 // For example, it is possible to directly extract Bitcoin transactions and/or scripts from a block with this function.
1322 // Depending on the backend implementation, this can provide significant savings by avoiding the need to load entire
1323 // blocks.
1324 //
1325 // The raw bytes are in the format returned by Serialize on a wire.Block and the Offset field in the provided
1326 // BlockRegion is zero-based and relative to the start of the block (byte 0).
1327 //
1328 // Returns the following errors as required by the interface contract:
1329 //
1330 // - ErrBlockNotFound if the requested block hash does not exist
1331 //
1332 // - ErrBlockRegionInvalid if the region exceeds the bounds of the associated block
1333 //
1334 // - ErrTxClosed if the transaction has already been closed
1335 //
1336 // - ErrCorruption if the database has somehow become corrupted
1337 //
1338 // In addition, returns ErrDriverSpecific if any failures occur when reading the block files.
1339 //
1340 // NOTE: The data returned by this function is only valid during a database transaction. Attempting to access it after a
1341 // transaction has ended results in undefined behavior. This constraint prevents additional data copies and allows
1342 // support for memory-mapped database implementations. This function is part of the database.Tx interface
1343 // implementation.
1344 func (tx *transaction) FetchBlockRegion(region *database.BlockRegion) ([]byte, error) {
1345 // Ensure transaction state is valid.
1346 if e := tx.checkClosed(); E.Chk(e) {
1347 return nil, e
1348 }
1349 // When the block is pending to be written on commit return the bytes from there.
1350 if tx.pendingBlocks != nil {
1351 regionBytes, e := tx.fetchPendingRegion(region)
1352 if e != nil {
1353 return nil, e
1354 }
1355 if regionBytes != nil {
1356 return regionBytes, nil
1357 }
1358 }
1359 // Lookup the location of the block in the files from the block index.
1360 blockRow, e := tx.fetchBlockRow(region.Hash)
1361 if e != nil {
1362 return nil, e
1363 }
1364 location := deserializeBlockLoc(blockRow)
1365 // Ensure the region is within the bounds of the block.
1366 endOffset := region.Offset + region.Len
1367 if endOffset < region.Offset || endOffset > location.blockLen {
1368 str := fmt.Sprintf(
1369 "block %s region offset %d, length %d "+
1370 "exceeds block length of %d", region.Hash,
1371 region.Offset, region.Len, location.blockLen,
1372 )
1373 return nil, makeDbErr(database.ErrBlockRegionInvalid, str, nil)
1374 }
1375 // Read the region from the appropriate disk block file.
1376 regionBytes, e := tx.db.store.readBlockRegion(
1377 location, region.Offset,
1378 region.Len,
1379 )
1380 if e != nil {
1381 return nil, e
1382 }
1383 return regionBytes, nil
1384 }
1385 1386 // FetchBlockRegions returns the raw serialized bytes for the given block regions.
1387 //
1388 // For example, it is possible to directly extract Bitcoin transactions and/or scripts from various blocks with this
1389 // function. Depending on the backend implementation, this can provide significant savings by avoiding the need to load
1390 // entire blocks.
1391 //
1392 // The raw bytes are in the format returned by Serialize on a wire.Block and the Offset fields in the provided
1393 // BlockRegions are zero-based and relative to the start of the block (byte 0).
1394 //
1395 // Returns the following errors as required by the interface contract:
1396 //
1397 // - ErrBlockNotFound if any of the request block hashes do not exist
1398 //
1399 // - ErrBlockRegionInvalid if one or more region exceed the bounds of the associated block
1400 //
1401 // - ErrTxClosed if the transaction has already been closed
1402 //
1403 // - ErrCorruption if the database has somehow become corrupted
1404 //
1405 // In addition, returns ErrDriverSpecific if any failures occur when reading the block files.
1406 //
1407 // NOTE: The data returned by this function is only valid during a database transaction. Attempting to access it after a
1408 // transaction has ended results in undefined behavior. This constraint prevents additional data copies and allows
1409 // support for memory-mapped database implementations. This function is part of the database.Tx interface
1410 // implementation.
1411 func (tx *transaction) FetchBlockRegions(regions []database.BlockRegion) ([][]byte, error) {
1412 // Ensure transaction state is valid.
1413 if e := tx.checkClosed(); E.Chk(e) {
1414 return nil, e
1415 }
1416 // NOTE: This could check for the existence of all blocks before deserializing the locations and building up the
1417 // fetch list which would be faster in the failure case, however callers will not typically be calling this function
1418 // with invalid values, so optimize for the common case.
1419 //
1420 // NOTE: A potential optimization here would be to combine adjacent regions to reduce the number of reads.
1421 //
1422 // In order to improve efficiency of loading the bulk data, first grab the block location for all of the requested
1423 // block hashes and txsort the reads by filenum:offset so that all reads are grouped by file and linear within each
1424 // file.
1425 //
1426 // This can result in quite a significant performance increase depending on how spread out the requested hashes are
1427 // by reducing the number of file open/closes and random accesses needed.
1428 //
1429 // The fetchList is intentionally allocated with a cap because some of the regions might be fetched from the pending
1430 // blocks and hence there is no need to fetch those from disk.
1431 blockRegions := make([][]byte, len(regions))
1432 fetchList := make([]bulkFetchData, 0, len(regions))
1433 for i := range regions {
1434 region := ®ions[i]
1435 // When the block is pending to be written on commit grab the bytes from there.
1436 if tx.pendingBlocks != nil {
1437 regionBytes, e := tx.fetchPendingRegion(region)
1438 if e != nil {
1439 return nil, e
1440 }
1441 if regionBytes != nil {
1442 blockRegions[i] = regionBytes
1443 continue
1444 }
1445 }
1446 // Lookup the location of the block in the files from the block index.
1447 blockRow, e := tx.fetchBlockRow(region.Hash)
1448 if e != nil {
1449 return nil, e
1450 }
1451 location := deserializeBlockLoc(blockRow)
1452 // Ensure the region is within the bounds of the block.
1453 endOffset := region.Offset + region.Len
1454 if endOffset < region.Offset || endOffset > location.blockLen {
1455 str := fmt.Sprintf(
1456 "block %s region offset %d, length "+
1457 "%d exceeds block length of %d", region.Hash,
1458 region.Offset, region.Len, location.blockLen,
1459 )
1460 return nil, makeDbErr(database.ErrBlockRegionInvalid, str, nil)
1461 }
1462 fetchList = append(fetchList, bulkFetchData{&location, i})
1463 }
1464 sort.Sort(bulkFetchDataSorter(fetchList))
1465 // Read all of the regions in the fetch list and set the results.
1466 for i := range fetchList {
1467 fetchData := &fetchList[i]
1468 ri := fetchData.replyIndex
1469 region := ®ions[ri]
1470 location := fetchData.blockLocation
1471 regionBytes, e := tx.db.store.readBlockRegion(
1472 *location,
1473 region.Offset, region.Len,
1474 )
1475 if e != nil {
1476 return nil, e
1477 }
1478 blockRegions[ri] = regionBytes
1479 }
1480 return blockRegions, nil
1481 }
1482 1483 // close marks the transaction closed then releases any pending data, the underlying snapshot, the transaction read
1484 // lock, and the write lock when the transaction is writable.
1485 func (tx *transaction) close() {
1486 tx.closed = true
1487 // Clear pending blocks that would have been written on commit.
1488 tx.pendingBlocks = nil
1489 tx.pendingBlockData = nil
1490 // Clear pending keys that would have been written or deleted on commit.
1491 tx.pendingKeys = nil
1492 tx.pendingRemove = nil
1493 // Release the snapshot.
1494 if tx.snapshot != nil {
1495 tx.snapshot.Release()
1496 tx.snapshot = nil
1497 }
1498 tx.db.closeLock.RUnlock()
1499 // Release the writer lock for writable transactions to unblock any other write transaction which are possibly
1500 // waiting.
1501 if tx.writable {
1502 tx.db.writeLock.Unlock()
1503 }
1504 }
1505 1506 // writePendingAndCommit writes pending block data to the flat block files, updates the metadata with their locations as
1507 // well as the new current write location, and commits the metadata to the memory database cache.
1508 //
1509 // It also properly handles rollback in the case of failures.
1510 //
1511 // This function MUST only be called when there is pending data to be written.
1512 func (tx *transaction) writePendingAndCommit() (e error) {
1513 // Save the current block store write position for potential rollback.
1514 //
1515 // These variables are only updated here in this function and there can only be one write transaction active at a
1516 // time, so it's safe to store them for potential rollback.
1517 wc := tx.db.store.writeCursor
1518 wc.RLock()
1519 oldBlkFileNum := wc.curFileNum
1520 oldBlkOffset := wc.curOffset
1521 wc.RUnlock()
1522 // rollback is a closure that is used to rollback all writes to the block files.
1523 rollback := func() {
1524 // Rollback any modifications made to the block files if needed.
1525 tx.db.store.handleRollback(oldBlkFileNum, oldBlkOffset)
1526 }
1527 // Loop through all of the pending blocks to store and write them.
1528 for _, blockData := range tx.pendingBlockData {
1529 // Tracef("storing block %s", blockData.hash)
1530 location, e := tx.db.store.writeBlock(blockData.bytes)
1531 if e != nil {
1532 rollback()
1533 return e
1534 }
1535 // Add a record in the block index for the block. The record includes the location information needed to locate
1536 // the block on the filesystem as well as the block header since they are so commonly needed.
1537 blockRow := serializeBlockLoc(location)
1538 e = tx.blockIdxBucket.Put(blockData.hash[:], blockRow)
1539 if e != nil {
1540 rollback()
1541 return e
1542 }
1543 }
1544 // Update the metadata for the current write file and offset.
1545 writeRow := serializeWriteRow(wc.curFileNum, wc.curOffset)
1546 if e := tx.metaBucket.Put(writeLocKeyName, writeRow); E.Chk(e) {
1547 rollback()
1548 return convertErr("failed to store write cursor", e)
1549 }
1550 // Atomically update the database cache.
1551 //
1552 // The cache automatically handles flushing to the underlying persistent storage database.
1553 return tx.db.cache.commitTx(tx)
1554 }
1555 1556 // Commit commits all changes that have been made to the root metadata bucket and all of its sub-buckets to the database
1557 // cache which is periodically synced to persistent storage. In addition, it commits all new blocks directly to
1558 // persistent storage bypassing the db cache.
1559 //
1560 // Blocks can be rather large so this help increase the amount of cache available for the metadata updates and is safe
1561 // since blocks are immutable.
1562 //
1563 // This function is part of the database.Tx interface implementation.
1564 func (tx *transaction) Commit() (e error) {
1565 // Prevent commits on managed transactions.
1566 if tx.managed {
1567 tx.close()
1568 panic("managed transaction commit not allowed")
1569 }
1570 // Ensure transaction state is valid.
1571 if e := tx.checkClosed(); E.Chk(e) {
1572 return e
1573 }
1574 // Regardless of whether the commit succeeds, the transaction is closed on return.
1575 defer tx.close()
1576 // Ensure the transaction is writable.
1577 if !tx.writable {
1578 str := "Commit requires a writable database transaction"
1579 return makeDbErr(database.ErrTxNotWritable, str, nil)
1580 }
1581 // Write pending data. The function will rollback if any errors occur.
1582 return tx.writePendingAndCommit()
1583 }
1584 1585 // Rollback undoes all changes that have been made to the root bucket and all of its sub-buckets.
1586 //
1587 // This function is part of the database.Tx interface implementation.
1588 func (tx *transaction) Rollback() (e error) {
1589 // Prevent rollbacks on managed transactions.
1590 if tx.managed {
1591 tx.close()
1592 panic("managed transaction rollback not allowed")
1593 }
1594 // Ensure transaction state is valid.
1595 if e := tx.checkClosed(); E.Chk(e) {
1596 return e
1597 }
1598 tx.close()
1599 return nil
1600 }
1601 1602 // db represents a collection of namespaces which are persisted and implements the database.DB interface. All database
1603 // access is performed through transactions which are obtained through the specific Namespace.
1604 type db struct {
1605 writeLock sync.Mutex // Limit to one write transaction at a time.
1606 closeLock sync.RWMutex // Make database close block while txns active.
1607 closed bool // Is the database closed?
1608 store *blockStore // Handles read/writing blocks to flat files.
1609 cache *dbCache // Cache layer which wraps underlying leveldb DB.
1610 }
1611 1612 // Enforce db implements the database.DB interface.
1613 var _ database.DB = (*db)(nil)
1614 1615 // Type returns the database driver type the current database instance was created with. This function is part of the
1616 // database DB interface implementation.
1617 func (db *db) Type() string {
1618 return dbType
1619 }
1620 1621 // begin is the implementation function for the Begin database method.
1622 //
1623 // See its documentation for more details.
1624 //
1625 // This function is only separate because it returns the internal transaction which is used by the managed transaction
1626 // code while the database method returns the interface.
1627 func (db *db) begin(writable bool) (*transaction, error) {
1628 // Whenever a new writable transaction is started, grab the write lock to ensure only a single write transaction can
1629 // be active at the same time.
1630 //
1631 // This lock will not be released until the transaction is closed ( via Rollback or Commit).
1632 if writable {
1633 db.writeLock.Lock()
1634 }
1635 // Whenever a new transaction is started, grab a read lock against the database to ensure Close will wait for the
1636 // transaction to finish.
1637 //
1638 // This lock will not be released until the transaction is closed ( via Rollback or Commit).
1639 db.closeLock.RLock()
1640 if db.closed {
1641 db.closeLock.RUnlock()
1642 if writable {
1643 db.writeLock.Unlock()
1644 }
1645 return nil, makeDbErr(
1646 database.ErrDbNotOpen, errDbNotOpenStr,
1647 nil,
1648 )
1649 }
1650 // Grab a snapshot of the database cache (which in turn also handles the underlying database).
1651 snapshot, e := db.cache.Snapshot()
1652 if e != nil {
1653 db.closeLock.RUnlock()
1654 if writable {
1655 db.writeLock.Unlock()
1656 }
1657 return nil, e
1658 }
1659 // The metadata and block index buckets are internal-only buckets, so they have defined IDs.
1660 tx := &transaction{
1661 writable: writable,
1662 db: db,
1663 snapshot: snapshot,
1664 pendingKeys: treap.NewMutable(),
1665 pendingRemove: treap.NewMutable(),
1666 }
1667 tx.metaBucket = &bucket{tx: tx, id: metadataBucketID}
1668 tx.blockIdxBucket = &bucket{tx: tx, id: blockIdxBucketID}
1669 return tx, nil
1670 }
1671 1672 // Begin starts a transaction which is either read-only or read-write depending
1673 // on the specified flag.
1674 //
1675 // Multiple read-only transactions can be started simultaneously while only a
1676 // single read-write transaction can be started at a time.
1677 //
1678 // The call will block when starting a read-write transaction when one is
1679 // already open.
1680 //
1681 // NOTE: The transaction must be closed by calling Rollback or Commit on it when
1682 // it is no longer needed.
1683 //
1684 // Failure to do so will result in unclaimed memory.
1685 //
1686 // This function is part of the database.DB interface implementation.
1687 func (db *db) Begin(writable bool) (database.Tx, error) {
1688 return db.begin(writable)
1689 }
1690 1691 // rollbackOnPanic rolls the passed transaction back if the code in the calling
1692 // function panics. This is needed since the mutex on a transaction must be
1693 // released and a panic in called code would prevent that from happening.
1694 //
1695 // NOTE: This can only be handled manually for managed transactions since they
1696 // control the life-cycle of the transaction.
1697 //
1698 // As the documentation on Begin calls out, callers opting to use manual
1699 // transactions will have to ensure the transaction is rolled back on panic if
1700 // it desires that functionality as well or the database will fail to close
1701 // since the read-lock will never be released.
1702 func rollbackOnPanic(tx *transaction) {
1703 if err := recover(); err != nil {
1704 tx.managed = false
1705 _ = tx.Rollback()
1706 panic(err)
1707 }
1708 }
1709 1710 // View invokes the passed function in the context of a managed read-only
1711 // transaction with the root bucket for the namespace.
1712 //
1713 // Any errors returned from the user-supplied function are returned from this
1714 // function. This function is part of the database.DB interface implementation.
1715 func (db *db) View(fn func(database.Tx) error) (e error) {
1716 // Start a read-only transaction.
1717 tx, e := db.begin(false)
1718 if e != nil {
1719 return e
1720 }
1721 // Since the user-provided function might panic, ensure the transaction releases
1722 // all mutexes and resources.
1723 //
1724 // There is no guarantee the caller won't use recover and keep going.
1725 //
1726 // Thus, the database must still be in a usable state on panics due to caller
1727 // issues.
1728 defer rollbackOnPanic(tx)
1729 tx.managed = true
1730 e = fn(tx)
1731 tx.managed = false
1732 if e != nil {
1733 // The error is ignored here because nothing was written yet and regardless of a rollback failure, the tx is
1734 // closed now anyways.
1735 _ = tx.Rollback()
1736 return e
1737 }
1738 return tx.Rollback()
1739 }
1740 1741 // Update invokes the passed function in the context of a managed read -write transaction with the root bucket for the
1742 // namespace.
1743 //
1744 // Any errors returned from the user-supplied function will cause the transaction to be rolled back and are returned
1745 // from this function.
1746 //
1747 // Otherwise, the transaction is committed when the user-supplied function returns a nil error. This function is part of
1748 // the database. DB interface implementation.
1749 func (db *db) Update(fn func(database.Tx) error) (e error) {
1750 // Start a read-write transaction.
1751 tx, e := db.begin(true)
1752 if e != nil {
1753 return e
1754 }
1755 // Since the user-provided function might panic, ensure the transaction releases all mutexes and resources.
1756 //
1757 // There is no guarantee the caller won't use recover and keep going.
1758 //
1759 // Thus, the database must still be in a usable state on panics due to caller issues.
1760 defer rollbackOnPanic(tx)
1761 tx.managed = true
1762 e = fn(tx)
1763 tx.managed = false
1764 if e != nil {
1765 // The error is ignored here because nothing was written yet and regardless of a rollback failure, the tx is
1766 // closed now anyways.
1767 _ = tx.Rollback()
1768 return e
1769 }
1770 return tx.Commit()
1771 }
1772 1773 // Close cleanly shuts down the database and syncs all data.
1774 //
1775 // It will block until all database transactions have been finalized ( rolled back or committed). This function is part
1776 // of the database.
1777 //
1778 // DB interface implementation.
1779 func (db *db) Close() (e error) {
1780 // Since all transactions have a read lock on this mutex, this will cause Close to wait for all readers to complete.
1781 db.closeLock.Lock()
1782 defer db.closeLock.Unlock()
1783 if db.closed {
1784 return makeDbErr(database.ErrDbNotOpen, errDbNotOpenStr, nil)
1785 }
1786 db.closed = true
1787 // NOTE: Since the above lock waits for all transactions to finish and prevents any new ones from being started, it
1788 // is safe to flush the cache and clear all state without the individual locks. Close the database cache which will
1789 // flush any existing entries to disk and close the underlying leveldb database.
1790 //
1791 // Any error is saved and returned at the end after the remaining cleanup since the database will be marked closed
1792 // even if this fails given there is no good way for the caller to recover from a failure here anyways.
1793 closeErr := db.cache.Close()
1794 // Close any open flat files that house the blocks.
1795 wc := db.store.writeCursor
1796 if wc.curFile.file != nil {
1797 _ = wc.curFile.file.Close()
1798 wc.curFile.file = nil
1799 }
1800 for _, blockFile := range db.store.openBlockFiles {
1801 _ = blockFile.file.Close()
1802 }
1803 db.store.openBlockFiles = nil
1804 db.store.openBlocksLRU.Init()
1805 db.store.fileNumToLRUElem = nil
1806 return closeErr
1807 }
1808 1809 func // fileExists reports whether the named file or directory exists.
1810 fileExists(name string) bool {
1811 var e error
1812 if _, e = os.Stat(name); E.Chk(e) {
1813 if os.IsNotExist(e) {
1814 return false
1815 }
1816 }
1817 return true
1818 }
1819 1820 // initDB creates the initial buckets and values used by the package.
1821 //
1822 // This is mainly in a separate function for testing purposes.
1823 func initDB(ldb *leveldb.DB) (e error) {
1824 // The starting block file write cursor location is file num 0, offset 0.
1825 batch := new(leveldb.Batch)
1826 batch.Put(
1827 bucketizedKey(metadataBucketID, writeLocKeyName),
1828 serializeWriteRow(0, 0),
1829 )
1830 // Create block index bucket and set the current bucket id.
1831 //
1832 // NOTE: Since buckets are virtualized through the use of prefixes, there is no need to store the bucket index data
1833 // for the metadata bucket in the database. However, the first bucket ID to use does need to account for it to
1834 // ensure there are no key collisions.
1835 batch.Put(
1836 bucketIndexKey(metadataBucketID, blockIdxBucketName),
1837 blockIdxBucketID[:],
1838 )
1839 batch.Put(curBucketIDKeyName, blockIdxBucketID[:])
1840 // Write everything as a single batch.
1841 if e := ldb.Write(batch, nil); E.Chk(e) {
1842 str := fmt.Sprintf(
1843 "failed to initialize metadata database: %v",
1844 e,
1845 )
1846 return convertErr(str, e)
1847 }
1848 return nil
1849 }
1850 1851 // openDB opens the database at the provided path
1852 //
1853 // ErrDbDoesNotExist is returned if the database doesn't exist and the create flag is not set.
1854 func openDB(dbPath string, network wire.BitcoinNet, create bool) (database.DB, error) {
1855 // DBError if the database doesn't exist and the create flag is not set.
1856 metadataDbPath := filepath.Join(dbPath, metadataDbName)
1857 dbExists := fileExists(metadataDbPath)
1858 if !create && !dbExists {
1859 str := fmt.Sprintf("database %q does not exist", metadataDbPath)
1860 return nil, makeDbErr(database.ErrDbDoesNotExist, str, nil)
1861 }
1862 // Ensure the full path to the database exists.
1863 if !dbExists {
1864 // The error can be ignored here since the call to leveldb. OpenFile will fail if the directory couldn't be
1865 // created.
1866 _ = os.MkdirAll(dbPath, 0700)
1867 }
1868 // Open the metadata database (will create it if needed).
1869 opts := opt.Options{
1870 ErrorIfExist: create,
1871 Strict: opt.DefaultStrict,
1872 Compression: opt.NoCompression,
1873 Filter: filter.NewBloomFilter(10),
1874 }
1875 ldb, e := leveldb.OpenFile(metadataDbPath, &opts)
1876 if e != nil {
1877 return nil, convertErr(e.Error(), e)
1878 }
1879 // Create the block store which includes scanning the existing flat block files to find what the current write
1880 // cursor position is according to the data that is actually on disk.
1881 //
1882 // Also create the database cache which wraps the underlying leveldb database to provide write caching.
1883 store := newBlockStore(dbPath, network)
1884 cache := newDbCache(ldb, store, defaultCacheSize, defaultFlushSecs)
1885 pdb := &db{store: store, cache: cache}
1886 // Perform any reconciliation needed between the block and metadata as well as database initialization, if needed.
1887 return reconcileDB(pdb, create)
1888 }
1889