dbcache.go raw

   1  package ffldb
   2  
   3  import (
   4  	"bytes"
   5  	"fmt"
   6  	"sync"
   7  	"time"
   8  	
   9  	"github.com/btcsuite/goleveldb/leveldb"
  10  	"github.com/btcsuite/goleveldb/leveldb/iterator"
  11  	"github.com/btcsuite/goleveldb/leveldb/util"
  12  	
  13  	"github.com/p9c/p9/pkg/util/treap"
  14  )
  15  
const (
	// defaultCacheSize is the default size for the database cache (64 MiB).
	defaultCacheSize = 64 * 1024 * 1024
	// defaultFlushSecs is the default number of seconds to use as a threshold in between database cache flushes when
	// the cache size has not been exceeded.
	defaultFlushSecs = 36
	// ldbBatchHeaderSize and ldbRecordIKeySize are used to help preallocate space needed for a batch in one allocation
	// instead of letting leveldb itself constantly grow it.
	//
	// This results in far less pressure on the GC and consequently helps prevent the GC from allocating a lot of extra
	// unneeded space.
	//
	// ldbBatchHeaderSize is the size of a leveldb batch header which includes the sequence header and record counter.
	ldbBatchHeaderSize = 12
	// ldbRecordIKeySize is the size of the ikey used internally by leveldb when appending a record to a batch.
	ldbRecordIKeySize = 8
)
  32  
// ldbCacheIter wraps a treap iterator to provide the additional functionality needed to satisfy the
// leveldb iterator.Iterator interface.
type ldbCacheIter struct {
	// The embedded treap iterator supplies the positioning and accessor methods; the methods declared on ldbCacheIter
	// in this file (Error, SetReleaser, Release) fill in the rest of the interface.
	*treap.Iterator
}

// Enforce ldbCacheIter implements the leveldb iterator.Iterator interface.
var _ iterator.Iterator = (*ldbCacheIter)(nil)
  41  
// Error is only provided to satisfy the iterator interface as there are no errors for this memory-only structure.
// It always returns nil.
//
// This is part of the leveldb iterator.Iterator interface implementation.
func (iter *ldbCacheIter) Error() (e error) {
	return nil
}
  48  
// SetReleaser is only provided to satisfy the iterator interface as there is no need to override it. The passed
// releaser is ignored.
//
// This is part of the leveldb iterator.Iterator interface implementation.
func (iter *ldbCacheIter) SetReleaser(releaser util.Releaser) {
}
  55  
// Release is only provided to satisfy the iterator interface. It is a no-op.
//
// This is part of the leveldb iterator.Iterator interface implementation.
func (iter *ldbCacheIter) Release() {
}
  61  
  62  // newLdbCacheIter creates a new treap iterator for the given slice against the pending keys for the passed cache
  63  // snapshot and returns it wrapped in an ldbCacheIter so it can be used as a leveldb iterator.
  64  func newLdbCacheIter(snap *dbCacheSnapshot, slice *util.Range) *ldbCacheIter {
  65  	iter := snap.pendingKeys.Iterator(slice.Start, slice.Limit)
  66  	return &ldbCacheIter{Iterator: iter}
  67  }
  68  
// dbCacheIterator defines an iterator over the key/value pairs in the database cache and underlying database.
type dbCacheIterator struct {
	// cacheSnapshot is the snapshot whose pending removals and pending keys are consulted when deciding which database
	// entries to skip.
	cacheSnapshot *dbCacheSnapshot
	// dbIter iterates the underlying database and cacheIter iterates the cached pending keys.
	dbIter        iterator.Iterator
	cacheIter     iterator.Iterator
	// currentIter is whichever of dbIter and cacheIter was most recently chosen; it is nil while the iterator is
	// unpositioned, exhausted, or released.
	currentIter   iterator.Iterator
	// released records that Release has already run so that repeated calls do nothing.
	released      bool
}

// Enforce dbCacheIterator implements the leveldb iterator.Iterator interface.
var _ iterator.Iterator = (*dbCacheIterator)(nil)
  80  
  81  // skipPendingUpdates skips any keys at the current database iterator position that are being updated by the cache.
  82  //
  83  // The forwards flag indicates the direction the iterator is moving.
  84  func (iter *dbCacheIterator) skipPendingUpdates(forwards bool) {
  85  	for iter.dbIter.Valid() {
  86  		var skip bool
  87  		key := iter.dbIter.Key()
  88  		if iter.cacheSnapshot.pendingRemove.Has(key) {
  89  			skip = true
  90  		} else if iter.cacheSnapshot.pendingKeys.Has(key) {
  91  			skip = true
  92  		}
  93  		if !skip {
  94  			break
  95  		}
  96  		if forwards {
  97  			iter.dbIter.Next()
  98  		} else {
  99  			iter.dbIter.Prev()
 100  		}
 101  	}
 102  }
 103  
 104  // chooseIterator first skips any entries in the database iterator that are being updated by the cache and sets the
 105  // current iterator to the appropriate iterator depending on their validity and the order they compare in while taking
 106  // into account the direction flag.
 107  //
 108  // When the iterator is being moved forwards and both iterators are valid, the iterator with the smaller key is chosen
 109  // and vice versa when the iterator is being moved backwards.
 110  func (iter *dbCacheIterator) chooseIterator(forwards bool) bool {
 111  	// Skip any keys at the current database iterator position that are being updated by the cache.
 112  	iter.skipPendingUpdates(forwards)
 113  	// When both iterators are exhausted, the iterator is exhausted too.
 114  	if !iter.dbIter.Valid() && !iter.cacheIter.Valid() {
 115  		iter.currentIter = nil
 116  		return false
 117  	}
 118  	// Choose the database iterator when the cache iterator is exhausted.
 119  	if !iter.cacheIter.Valid() {
 120  		iter.currentIter = iter.dbIter
 121  		return true
 122  	}
 123  	// Choose the cache iterator when the database iterator is exhausted.
 124  	if !iter.dbIter.Valid() {
 125  		iter.currentIter = iter.cacheIter
 126  		return true
 127  	}
 128  	// Both iterators are valid, so choose the iterator with either the smaller or larger key depending on the forwards
 129  	// flag.
 130  	compare := bytes.Compare(iter.dbIter.Key(), iter.cacheIter.Key())
 131  	if (forwards && compare > 0) || (!forwards && compare < 0) {
 132  		iter.currentIter = iter.cacheIter
 133  	} else {
 134  		iter.currentIter = iter.dbIter
 135  	}
 136  	return true
 137  }
 138  
 139  // First positions the iterator at the first key/value pair and returns whether or not the pair exists.
 140  //
 141  // This is part of the leveldb iterator. Iterator interface implementation.
 142  func (iter *dbCacheIterator) First() bool {
 143  	// Seek to the first key in both the database and cache iterators and choose the iterator that is both valid and has
 144  	// the smaller key.
 145  	iter.dbIter.First()
 146  	iter.cacheIter.First()
 147  	return iter.chooseIterator(true)
 148  }
 149  
 150  // Last positions the iterator at the last key/value pair and returns whether or not the pair exists.
 151  //
 152  // This is part of the leveldb iterator. Iterator interface implementation.
 153  func (iter *dbCacheIterator) Last() bool {
 154  	// Seek to the last key in both the database and cache iterators and choose the iterator that is both valid and has
 155  	// the larger key.
 156  	iter.dbIter.Last()
 157  	iter.cacheIter.Last()
 158  	return iter.chooseIterator(false)
 159  }
 160  
 161  // Next moves the iterator one key/value pair forward and returns whether or not the pair exists.
 162  //
 163  // This is part of the leveldb iterator. Iterator interface implementation.
 164  func (iter *dbCacheIterator) Next() bool {
 165  	// Nothing to return if cursor is exhausted.
 166  	if iter.currentIter == nil {
 167  		return false
 168  	}
 169  	// Move the current iterator to the next entry and choose the iterator that is both valid and has the smaller key.
 170  	iter.currentIter.Next()
 171  	return iter.chooseIterator(true)
 172  }
 173  
 174  // Prev moves the iterator one key/value pair backward and returns whether or not the pair exists.
 175  //
 176  // This is part of the leveldb iterator. Iterator interface implementation.
 177  func (iter *dbCacheIterator) Prev() bool {
 178  	// Nothing to return if cursor is exhausted.
 179  	if iter.currentIter == nil {
 180  		return false
 181  	}
 182  	// Move the current iterator to the previous entry and choose the iterator that is both valid and has the larger
 183  	// key.
 184  	iter.currentIter.Prev()
 185  	return iter.chooseIterator(false)
 186  }
 187  
 188  // Seek positions the iterator at the first key/value pair that is greater than or equal to the passed seek key.
 189  //
 190  // Returns false if no suitable key was found.
 191  // This is part of the leveldb iterator.Iterator interface implementation.
 192  func (iter *dbCacheIterator) Seek(key []byte) bool {
 193  	// Seek to the provided key in both the database and cache iterators then choose the iterator that is both valid and
 194  	// has the larger key.
 195  	iter.dbIter.Seek(key)
 196  	iter.cacheIter.Seek(key)
 197  	return iter.chooseIterator(true)
 198  }
 199  
// Valid indicates whether the iterator is positioned at a valid key/value pair. It will be considered invalid when the
// iterator is newly created, exhausted, or released, since those are the states in which no current iterator is set.
//
// This is part of the leveldb iterator.Iterator interface implementation.
func (iter *dbCacheIterator) Valid() bool {
	return iter.currentIter != nil
}
 207  
 208  // Key returns the current key the iterator is pointing to.
 209  //
 210  // This is part of the leveldb iterator.Iterator interface implementation.
 211  func (iter *dbCacheIterator) Key() []byte {
 212  	// Nothing to return if iterator is exhausted.
 213  	if iter.currentIter == nil {
 214  		return nil
 215  	}
 216  	return iter.currentIter.Key()
 217  }
 218  
 219  // Value returns the current value the iterator is pointing to.
 220  //
 221  // This is part of the leveldb iterator.Iterator interface implementation.
 222  func (iter *dbCacheIterator) Value() []byte {
 223  	// Nothing to return if iterator is exhausted.
 224  	if iter.currentIter == nil {
 225  		return nil
 226  	}
 227  	return iter.currentIter.Value()
 228  }
 229  
// SetReleaser is only provided to satisfy the iterator interface as there is no need to override it. The passed
// releaser is ignored.
//
// This is part of the leveldb iterator.Iterator interface implementation.
func (iter *dbCacheIterator) SetReleaser(releaser util.Releaser) {
}
 236  
 237  // Release releases the iterator by removing the underlying treap iterator from the list of active iterators against the
 238  // pending keys treap.
 239  //
 240  // This is part of the leveldb iterator.Iterator interface implementation.
 241  func (iter *dbCacheIterator) Release() {
 242  	if !iter.released {
 243  		iter.dbIter.Release()
 244  		iter.cacheIter.Release()
 245  		iter.currentIter = nil
 246  		iter.released = true
 247  	}
 248  }
 249  
// Error is only provided to satisfy the iterator interface. It always returns nil.
//
// NOTE(review): this iterator also wraps a database iterator, whose own Error result is not surfaced here — confirm
// that dropping it is intentional.
//
// This is part of the leveldb iterator.Iterator interface implementation.
func (iter *dbCacheIterator) Error() (e error) {
	return nil
}
 256  
// dbCacheSnapshot defines a snapshot of the database cache and underlying database at a particular point in time.
type dbCacheSnapshot struct {
	// dbSnapshot is the snapshot of the underlying leveldb database.
	dbSnapshot    *leveldb.Snapshot
	// pendingKeys holds the cached keys that have not yet been written to the database, and pendingRemove holds the
	// cached keys that have not yet been deleted from it. Both are consulted before the database snapshot.
	pendingKeys   *treap.Immutable
	pendingRemove *treap.Immutable
}
 263  
 264  // Has returns whether or not the passed key exists.
 265  func (snap *dbCacheSnapshot) Has(key []byte) bool {
 266  	// Chk the cached entries first.
 267  	if snap.pendingRemove.Has(key) {
 268  		return false
 269  	}
 270  	if snap.pendingKeys.Has(key) {
 271  		return true
 272  	}
 273  	// Consult the database.
 274  	hasKey, _ := snap.dbSnapshot.Has(key, nil)
 275  	return hasKey
 276  }
 277  
 278  // Get returns the value for the passed key. The function will return nil when the key does not exist.
 279  func (snap *dbCacheSnapshot) Get(key []byte) []byte {
 280  	// Chk the cached entries first.
 281  	if snap.pendingRemove.Has(key) {
 282  		return nil
 283  	}
 284  	if value := snap.pendingKeys.Get(key); value != nil {
 285  		return value
 286  	}
 287  	// Consult the database.
 288  	value, e := snap.dbSnapshot.Get(key, nil)
 289  	if e != nil {
 290  		// F.Ln(err)
 291  		return nil
 292  	}
 293  	return value
 294  }
 295  
 296  // Release releases the snapshot.
 297  func (snap *dbCacheSnapshot) Release() {
 298  	snap.dbSnapshot.Release()
 299  	snap.pendingKeys = nil
 300  	snap.pendingRemove = nil
 301  }
 302  
 303  // NewIterator returns a new iterator for the snapshot. The newly returned iterator is not pointing to a valid item
 304  // until a call to one of the methods to position it is made.
 305  //
 306  // The slice parameter allows the iterator to be limited to a range of keys.
 307  //
 308  // The start key is inclusive and the limit key is exclusive. Either or both can be nil if the functionality is not
 309  // desired.
 310  func (snap *dbCacheSnapshot) NewIterator(slice *util.Range) *dbCacheIterator {
 311  	return &dbCacheIterator{
 312  		dbIter:        snap.dbSnapshot.NewIterator(slice, nil),
 313  		cacheIter:     newLdbCacheIter(snap, slice),
 314  		cacheSnapshot: snap,
 315  	}
 316  }
 317  
// dbCache provides a database cache layer backed by an underlying database.
//
// It allows a maximum cache size and flush interval to be specified such that the cache is flushed to the database when
// the cache size exceeds the maximum configured value or it has been longer than the configured interval since the last
// flush.
//
// This effectively provides transaction batching so that callers can commit transactions at will without incurring
// large performance hits due to frequent disk syncs.
type dbCache struct {
	// ldb is the underlying leveldb DB for metadata.
	ldb *leveldb.DB
	// store is used to sync blocks to flat files.
	store *blockStore
	// The following fields are related to flushing the cache to persistent storage.
	//
	// Note that all flushing is performed in an opportunistic fashion. This means that it is only flushed during a
	// transaction or when the database cache is closed.
	//
	// maxSize is the maximum size threshold the cache can grow to before it is flushed.
	//
	// flushInterval is the threshold interval of time that is allowed to pass before the cache is flushed.
	//
	// lastFlush is the time the cache was last flushed. It is used in conjunction with the current time and the flush
	// interval.
	//
	// NOTE: These flush related fields are protected by the database write lock.
	maxSize       uint64
	flushInterval time.Duration
	lastFlush     time.Time
	// The following fields hold the keys that need to be stored or deleted from the underlying database once the cache
	// is full, enough time has passed, or when the database is shutting down.
	//
	// Note that these are stored using immutable treaps to support O(1) MVCC snapshots against the cached data.
	//
	// The cacheLock is used to protect concurrent access for cache updates and snapshots.
	cacheLock    sync.RWMutex
	cachedKeys   *treap.Immutable
	cachedRemove *treap.Immutable
}
 358  
 359  // Snapshot returns a snapshot of the database cache and underlying database at a particular point in time.
 360  //
 361  // The snapshot must be released after use by calling Release.
 362  func (c *dbCache) Snapshot() (*dbCacheSnapshot, error) {
 363  	dbSnapshot, e := c.ldb.GetSnapshot()
 364  	if e != nil {
 365  		str := "failed to open transaction"
 366  		return nil, convertErr(str, e)
 367  	}
 368  	// Since the cached keys to be added and removed use an immutable treap, a snapshot is simply obtaining the root of
 369  	// the tree under the lock which is used to atomically swap the root.
 370  	c.cacheLock.RLock()
 371  	cacheSnapshot := &dbCacheSnapshot{
 372  		dbSnapshot:    dbSnapshot,
 373  		pendingKeys:   c.cachedKeys,
 374  		pendingRemove: c.cachedRemove,
 375  	}
 376  	c.cacheLock.RUnlock()
 377  	return cacheSnapshot, nil
 378  }
 379  
 380  // updateDB invokes the passed function in the context of a managed leveldb transaction. Any errors returned from the
 381  // user-supplied function will cause the transaction to be rolled back and are returned from this function. Otherwise,
 382  // the transaction is committed when the user-supplied function returns a nil error.
 383  func (c *dbCache) updateDB(fn func(ldbTx *leveldb.Transaction) error) (e error) {
 384  	// Start a leveldb transaction.
 385  	ldbTx, e := c.ldb.OpenTransaction()
 386  	if e != nil {
 387  		return convertErr("failed to open ldb transaction", e)
 388  	}
 389  	if e := fn(ldbTx); E.Chk(e) {
 390  		ldbTx.Discard()
 391  		return e
 392  	}
 393  	// Commit the leveldb transaction and convert any errors as needed.
 394  	if e := ldbTx.Commit(); E.Chk(e) {
 395  		return convertErr("failed to commit leveldb transaction", e)
 396  	}
 397  	return nil
 398  }
 399  
// TreapForEacher is an interface which allows iteration of a treap in ascending order using a user-supplied callback
// for each key/value pair.
//
// It mainly exists so both mutable and immutable treaps can be atomically committed to the database with the same
// function.
type TreapForEacher interface {
	// ForEach invokes the callback with each key/value pair.
	//
	// NOTE(review): callers in this file return false from the callback after recording an error, presumably to stop
	// the iteration early — confirm against the treap implementations.
	ForEach(func(k, v []byte) bool)
}
 408  
 409  // commitTreaps atomically commits all of the passed pending add/update/remove updates to the underlying database.
 410  func (c *dbCache) commitTreaps(pendingKeys, pendingRemove TreapForEacher) (e error) {
 411  	// Perform all leveldb updates using an atomic transaction.
 412  	return c.updateDB(
 413  		func(ldbTx *leveldb.Transaction) (e error) {
 414  			var innerErr error
 415  			pendingKeys.ForEach(
 416  				func(k, v []byte) bool {
 417  					if dbErr := ldbTx.Put(k, v, nil); dbErr != nil {
 418  						str := fmt.Sprintf(
 419  							"failed to put key %q to "+
 420  								"ldb transaction", k,
 421  						)
 422  						innerErr = convertErr(str, dbErr)
 423  						return false
 424  					}
 425  					return true
 426  				},
 427  			)
 428  			if innerErr != nil {
 429  				return innerErr
 430  			}
 431  			pendingRemove.ForEach(
 432  				func(k, v []byte) bool {
 433  					if dbErr := ldbTx.Delete(k, nil); dbErr != nil {
 434  						str := fmt.Sprintf(
 435  							"failed to delete "+
 436  								"key %q from ldb transaction",
 437  							k,
 438  						)
 439  						innerErr = convertErr(str, dbErr)
 440  						return false
 441  					}
 442  					return true
 443  				},
 444  			)
 445  			return innerErr
 446  		},
 447  	)
 448  }
 449  
 450  // flush flushes the database cache to persistent storage.
 451  //
 452  // This involes syncing the block store and replaying all transactions that have been applied to the cache to the
 453  // underlying database.
 454  //
 455  // This function MUST be called with the database write lock held.
 456  func (c *dbCache) flush() (e error) {
 457  	c.lastFlush = time.Now()
 458  	// Sync the current write file associated with the block store.
 459  	//
 460  	// This is necessary before writing the metadata to prevent the case where the metadata contains information about a
 461  	// block which actually hasn't been written yet in unexpected shutdown scenarios.
 462  	if e := c.store.syncBlocks(); E.Chk(e) {
 463  		return e
 464  	}
 465  	// Since the cached keys to be added and removed use an immutable treap, a snapshot is simply obtaining the root of
 466  	// the tree under the lock which is used to atomically swap the root.
 467  	c.cacheLock.RLock()
 468  	cachedKeys := c.cachedKeys
 469  	cachedRemove := c.cachedRemove
 470  	c.cacheLock.RUnlock()
 471  	// Nothing to do if there is no data to flush.
 472  	if cachedKeys.Len() == 0 && cachedRemove.Len() == 0 {
 473  		return nil
 474  	}
 475  	// Perform all leveldb updates using an atomic transaction.
 476  	if e := c.commitTreaps(cachedKeys, cachedRemove); E.Chk(e) {
 477  		return e
 478  	}
 479  	// Clear the cache since it has been flushed.
 480  	c.cacheLock.Lock()
 481  	c.cachedKeys = treap.NewImmutable()
 482  	c.cachedRemove = treap.NewImmutable()
 483  	c.cacheLock.Unlock()
 484  	D.Ln("synced database to disk")
 485  	return nil
 486  }
 487  
 488  // needsFlush returns whether or not the database cache needs to be flushed to persistent storage based on its current
 489  // size, whether or not adding all of the entries in the passed database transaction would cause it to exceed the
 490  // configured limit, and how much time has elapsed since the last time the cache was flushed.
 491  //
 492  // This function MUST be called with the database write lock held.
 493  func (c *dbCache) needsFlush(tx *transaction) bool {
 494  	// A flush is needed when more time has elapsed than the configured flush interval.
 495  	if time.Since(c.lastFlush) > c.flushInterval {
 496  		return true
 497  	}
 498  	// A flush is needed when the size of the database cache exceeds the specified max cache size.
 499  	//
 500  	// The total calculated size is multiplied by 1.
 501  	//
 502  	// 5 here to account for additional memory consumption that will be needed during the flush as well as old nodes in
 503  	// the cache that are referenced by the snapshot used by the transaction.
 504  	snap := tx.snapshot
 505  	totalSize := snap.pendingKeys.Size() + snap.pendingRemove.Size()
 506  	totalSize = uint64(float64(totalSize) * 1.5)
 507  	return totalSize > c.maxSize
 508  }
 509  
 510  // commitTx atomically adds all of the pending keys to add and remove into the database cache.
 511  //
 512  // When adding the pending keys would cause the size of the cache to exceed the max cache size, or the time since the
 513  // last flush exceeds the configured flush interval, the cache will be flushed to the underlying persistent database.
 514  //
 515  // This is an atomic operation with respect to the cache in that either all of the pending keys to add and remove in the
 516  // transaction will be applied or none of them will. The database cache itself might be flushed to the underlying
 517  // persistent database even if the transaction fails to apply, but it will only be the state of the cache without the
 518  // transaction applied.
 519  //
 520  // This function MUST be called during a database write transaction which in turn implies the database write lock will
 521  // be held.
 522  func (c *dbCache) commitTx(tx *transaction) (e error) {
 523  	// Flush the cache and write the current transaction directly to the database if a flush is needed.
 524  	if c.needsFlush(tx) {
 525  		if e := c.flush(); E.Chk(e) {
 526  			return e
 527  		}
 528  		// Perform all leveldb updates using an atomic transaction.
 529  		e := c.commitTreaps(tx.pendingKeys, tx.pendingRemove)
 530  		if e != nil {
 531  			return e
 532  		}
 533  		// Clear the transaction entries since they have been committed.
 534  		tx.pendingKeys = nil
 535  		tx.pendingRemove = nil
 536  		return nil
 537  	}
 538  	// At this point a database flush is not needed, so atomically commit the transaction to the cache.
 539  	//
 540  	// Since the cached keys to be added and removed use an immutable treap, a snapshot is simply obtaining the root of
 541  	// the tree under the lock which is used to atomically swap the root.
 542  	c.cacheLock.RLock()
 543  	newCachedKeys := c.cachedKeys
 544  	newCachedRemove := c.cachedRemove
 545  	c.cacheLock.RUnlock()
 546  	// Apply every key to add in the database transaction to the cache.
 547  	tx.pendingKeys.ForEach(
 548  		func(k, v []byte) bool {
 549  			newCachedRemove = newCachedRemove.Delete(k)
 550  			newCachedKeys = newCachedKeys.Put(k, v)
 551  			return true
 552  		},
 553  	)
 554  	tx.pendingKeys = nil
 555  	// Apply every key to remove in the database transaction to the cache.
 556  	tx.pendingRemove.ForEach(
 557  		func(k, v []byte) bool {
 558  			newCachedKeys = newCachedKeys.Delete(k)
 559  			newCachedRemove = newCachedRemove.Put(k, nil)
 560  			return true
 561  		},
 562  	)
 563  	tx.pendingRemove = nil
 564  	// Atomically replace the immutable treaps which hold the cached keys to add and delete.
 565  	c.cacheLock.Lock()
 566  	c.cachedKeys = newCachedKeys
 567  	c.cachedRemove = newCachedRemove
 568  	c.cacheLock.Unlock()
 569  	return nil
 570  }
 571  
 572  // Close cleanly shuts down the database cache by syncing all data and closing the underlying leveldb database.
 573  //
 574  // This function MUST be called with the database write lock held.
 575  func (c *dbCache) Close() (e error) {
 576  	// Flush any outstanding cached entries to disk.
 577  	if e := c.flush(); E.Chk(e) {
 578  		// Even if there is an error while flushing, attempt to close the underlying database. The error is ignored
 579  		// since it would mask the flush error.
 580  		_ = c.ldb.Close()
 581  		return e
 582  	}
 583  	// Close the underlying leveldb database.
 584  	if e := c.ldb.Close(); E.Chk(e) {
 585  		str := "failed to close underlying leveldb database"
 586  		return convertErr(str, e)
 587  	}
 588  	return nil
 589  }
 590  
 591  // newDbCache returns a new database cache instance backed by the provided leveldb instance.
 592  //
 593  // The cache will be flushed to leveldb when the max size exceeds the provided value or it has been longer than the
 594  // provided interval since the last flush.
 595  func newDbCache(ldb *leveldb.DB, store *blockStore, maxSize uint64, flushIntervalSecs uint32) *dbCache {
 596  	return &dbCache{
 597  		ldb:           ldb,
 598  		store:         store,
 599  		maxSize:       maxSize,
 600  		flushInterval: time.Second * time.Duration(flushIntervalSecs),
 601  		lastFlush:     time.Now(),
 602  		cachedKeys:    treap.NewImmutable(),
 603  		cachedRemove:  treap.NewImmutable(),
 604  	}
 605  }
 606