1 package ffldb
2 3 import (
4 "bytes"
5 "fmt"
6 "sync"
7 "time"
8 9 "github.com/btcsuite/goleveldb/leveldb"
10 "github.com/btcsuite/goleveldb/leveldb/iterator"
11 "github.com/btcsuite/goleveldb/leveldb/util"
12 13 "github.com/p9c/p9/pkg/util/treap"
14 )
const (
	// defaultCacheSize is the default size for the database cache.
	defaultCacheSize = 64 * 1024 * 1024
	// defaultFlushSecs is the default number of seconds to use as a threshold in between database cache flushes when
	// the cache size has not been exceeded.
	defaultFlushSecs = 36
	// ldbBatchHeaderSize is the size of a leveldb batch header which includes the sequence header and record counter.
	ldbBatchHeaderSize = 12
	// ldbRecordIKeySize is the size of the ikey used internally by leveldb when appending a record to a batch.
	ldbRecordIKeySize = 8
	// NOTE: ldbBatchHeaderSize and ldbRecordIKeySize are used to help preallocate the space needed for a batch in one
	// allocation instead of letting leveldb itself constantly grow it.
	//
	// This results in far less pressure on the GC and consequently helps prevent the GC from allocating a lot of extra
	// unneeded space.
)
// ldbCacheIter wraps a treap iterator to provide the additional functionality needed to satisfy the leveldb
// iterator.Iterator interface.
//
// The positioning and access methods come from the embedded *treap.Iterator; this type only adds the Error,
// SetReleaser and Release methods.
type ldbCacheIter struct {
	*treap.Iterator
}

// Enforce ldbCacheIter implements the leveldb iterator.Iterator interface.
var _ iterator.Iterator = (*ldbCacheIter)(nil)
// Error is only provided to satisfy the iterator interface as there are no errors for this memory-only structure.
// It always returns nil.
//
// This is part of the leveldb iterator.Iterator interface implementation.
func (iter *ldbCacheIter) Error() (e error) {
	return nil
}
// SetReleaser is only provided to satisfy the iterator interface as there is no need to override it. The passed
// releaser is ignored.
//
// This is part of the leveldb iterator.Iterator interface implementation.
func (iter *ldbCacheIter) SetReleaser(releaser util.Releaser) {
}
// Release is only provided to satisfy the iterator interface. It intentionally does nothing.
//
// This is part of the leveldb iterator.Iterator interface implementation.
func (iter *ldbCacheIter) Release() {
}
61 62 // newLdbCacheIter creates a new treap iterator for the given slice against the pending keys for the passed cache
63 // snapshot and returns it wrapped in an ldbCacheIter so it can be used as a leveldb iterator.
64 func newLdbCacheIter(snap *dbCacheSnapshot, slice *util.Range) *ldbCacheIter {
65 iter := snap.pendingKeys.Iterator(slice.Start, slice.Limit)
66 return &ldbCacheIter{Iterator: iter}
67 }
// dbCacheIterator defines an iterator over the key/value pairs in the database cache and underlying database.
type dbCacheIterator struct {
	// cacheSnapshot is the snapshot whose pending keys and pending removals are consulted to decide which database
	// entries must be skipped.
	cacheSnapshot *dbCacheSnapshot
	// dbIter iterates over the underlying database snapshot.
	dbIter iterator.Iterator
	// cacheIter iterates over the pending keys held by the cache snapshot.
	cacheIter iterator.Iterator
	// currentIter is whichever of dbIter or cacheIter currently supplies the key/value pair. It is nil when the
	// iterator has not been positioned, is exhausted, or has been released.
	currentIter iterator.Iterator
	// released records that Release has already run so repeated calls are no-ops.
	released bool
}

// Enforce dbCacheIterator implements the leveldb iterator.Iterator interface.
var _ iterator.Iterator = (*dbCacheIterator)(nil)
80 81 // skipPendingUpdates skips any keys at the current database iterator position that are being updated by the cache.
82 //
83 // The forwards flag indicates the direction the iterator is moving.
84 func (iter *dbCacheIterator) skipPendingUpdates(forwards bool) {
85 for iter.dbIter.Valid() {
86 var skip bool
87 key := iter.dbIter.Key()
88 if iter.cacheSnapshot.pendingRemove.Has(key) {
89 skip = true
90 } else if iter.cacheSnapshot.pendingKeys.Has(key) {
91 skip = true
92 }
93 if !skip {
94 break
95 }
96 if forwards {
97 iter.dbIter.Next()
98 } else {
99 iter.dbIter.Prev()
100 }
101 }
102 }
103 104 // chooseIterator first skips any entries in the database iterator that are being updated by the cache and sets the
105 // current iterator to the appropriate iterator depending on their validity and the order they compare in while taking
106 // into account the direction flag.
107 //
108 // When the iterator is being moved forwards and both iterators are valid, the iterator with the smaller key is chosen
109 // and vice versa when the iterator is being moved backwards.
110 func (iter *dbCacheIterator) chooseIterator(forwards bool) bool {
111 // Skip any keys at the current database iterator position that are being updated by the cache.
112 iter.skipPendingUpdates(forwards)
113 // When both iterators are exhausted, the iterator is exhausted too.
114 if !iter.dbIter.Valid() && !iter.cacheIter.Valid() {
115 iter.currentIter = nil
116 return false
117 }
118 // Choose the database iterator when the cache iterator is exhausted.
119 if !iter.cacheIter.Valid() {
120 iter.currentIter = iter.dbIter
121 return true
122 }
123 // Choose the cache iterator when the database iterator is exhausted.
124 if !iter.dbIter.Valid() {
125 iter.currentIter = iter.cacheIter
126 return true
127 }
128 // Both iterators are valid, so choose the iterator with either the smaller or larger key depending on the forwards
129 // flag.
130 compare := bytes.Compare(iter.dbIter.Key(), iter.cacheIter.Key())
131 if (forwards && compare > 0) || (!forwards && compare < 0) {
132 iter.currentIter = iter.cacheIter
133 } else {
134 iter.currentIter = iter.dbIter
135 }
136 return true
137 }
138 139 // First positions the iterator at the first key/value pair and returns whether or not the pair exists.
140 //
141 // This is part of the leveldb iterator. Iterator interface implementation.
142 func (iter *dbCacheIterator) First() bool {
143 // Seek to the first key in both the database and cache iterators and choose the iterator that is both valid and has
144 // the smaller key.
145 iter.dbIter.First()
146 iter.cacheIter.First()
147 return iter.chooseIterator(true)
148 }
149 150 // Last positions the iterator at the last key/value pair and returns whether or not the pair exists.
151 //
152 // This is part of the leveldb iterator. Iterator interface implementation.
153 func (iter *dbCacheIterator) Last() bool {
154 // Seek to the last key in both the database and cache iterators and choose the iterator that is both valid and has
155 // the larger key.
156 iter.dbIter.Last()
157 iter.cacheIter.Last()
158 return iter.chooseIterator(false)
159 }
160 161 // Next moves the iterator one key/value pair forward and returns whether or not the pair exists.
162 //
163 // This is part of the leveldb iterator. Iterator interface implementation.
164 func (iter *dbCacheIterator) Next() bool {
165 // Nothing to return if cursor is exhausted.
166 if iter.currentIter == nil {
167 return false
168 }
169 // Move the current iterator to the next entry and choose the iterator that is both valid and has the smaller key.
170 iter.currentIter.Next()
171 return iter.chooseIterator(true)
172 }
173 174 // Prev moves the iterator one key/value pair backward and returns whether or not the pair exists.
175 //
176 // This is part of the leveldb iterator. Iterator interface implementation.
177 func (iter *dbCacheIterator) Prev() bool {
178 // Nothing to return if cursor is exhausted.
179 if iter.currentIter == nil {
180 return false
181 }
182 // Move the current iterator to the previous entry and choose the iterator that is both valid and has the larger
183 // key.
184 iter.currentIter.Prev()
185 return iter.chooseIterator(false)
186 }
187 188 // Seek positions the iterator at the first key/value pair that is greater than or equal to the passed seek key.
189 //
190 // Returns false if no suitable key was found.
191 // This is part of the leveldb iterator.Iterator interface implementation.
192 func (iter *dbCacheIterator) Seek(key []byte) bool {
193 // Seek to the provided key in both the database and cache iterators then choose the iterator that is both valid and
194 // has the larger key.
195 iter.dbIter.Seek(key)
196 iter.cacheIter.Seek(key)
197 return iter.chooseIterator(true)
198 }
// Valid indicates whether the iterator is positioned at a valid key/value pair, which is the case exactly when a
// current underlying iterator is selected. It will be considered invalid when the iterator is newly created,
// exhausted, or released.
//
// This is part of the leveldb iterator.Iterator interface implementation.
func (iter *dbCacheIterator) Valid() bool {
	return iter.currentIter != nil
}
207 208 // Key returns the current key the iterator is pointing to.
209 //
210 // This is part of the leveldb iterator.Iterator interface implementation.
211 func (iter *dbCacheIterator) Key() []byte {
212 // Nothing to return if iterator is exhausted.
213 if iter.currentIter == nil {
214 return nil
215 }
216 return iter.currentIter.Key()
217 }
218 219 // Value returns the current value the iterator is pointing to.
220 //
221 // This is part of the leveldb iterator.Iterator interface implementation.
222 func (iter *dbCacheIterator) Value() []byte {
223 // Nothing to return if iterator is exhausted.
224 if iter.currentIter == nil {
225 return nil
226 }
227 return iter.currentIter.Value()
228 }
// SetReleaser is only provided to satisfy the iterator interface as there is no need to override it. The passed
// releaser is ignored.
//
// This is part of the leveldb iterator.Iterator interface implementation.
func (iter *dbCacheIterator) SetReleaser(releaser util.Releaser) {
}
236 237 // Release releases the iterator by removing the underlying treap iterator from the list of active iterators against the
238 // pending keys treap.
239 //
240 // This is part of the leveldb iterator.Iterator interface implementation.
241 func (iter *dbCacheIterator) Release() {
242 if !iter.released {
243 iter.dbIter.Release()
244 iter.cacheIter.Release()
245 iter.currentIter = nil
246 iter.released = true
247 }
248 }
// Error is only provided to satisfy the iterator interface. It always returns nil; errors from the underlying
// iterators are not surfaced through this method.
//
// This is part of the leveldb iterator.Iterator interface implementation.
func (iter *dbCacheIterator) Error() (e error) {
	return nil
}
// dbCacheSnapshot defines a snapshot of the database cache and underlying database at a particular point in time.
type dbCacheSnapshot struct {
	// dbSnapshot is the snapshot of the underlying leveldb database.
	dbSnapshot *leveldb.Snapshot
	// pendingKeys holds the cached keys that are pending being written to the database. It takes precedence over
	// the database contents on lookups.
	pendingKeys *treap.Immutable
	// pendingRemove holds the cached keys that are pending removal from the database. A key present here is
	// reported as missing.
	pendingRemove *treap.Immutable
}
263 264 // Has returns whether or not the passed key exists.
265 func (snap *dbCacheSnapshot) Has(key []byte) bool {
266 // Chk the cached entries first.
267 if snap.pendingRemove.Has(key) {
268 return false
269 }
270 if snap.pendingKeys.Has(key) {
271 return true
272 }
273 // Consult the database.
274 hasKey, _ := snap.dbSnapshot.Has(key, nil)
275 return hasKey
276 }
277 278 // Get returns the value for the passed key. The function will return nil when the key does not exist.
279 func (snap *dbCacheSnapshot) Get(key []byte) []byte {
280 // Chk the cached entries first.
281 if snap.pendingRemove.Has(key) {
282 return nil
283 }
284 if value := snap.pendingKeys.Get(key); value != nil {
285 return value
286 }
287 // Consult the database.
288 value, e := snap.dbSnapshot.Get(key, nil)
289 if e != nil {
290 // F.Ln(err)
291 return nil
292 }
293 return value
294 }
295 296 // Release releases the snapshot.
297 func (snap *dbCacheSnapshot) Release() {
298 snap.dbSnapshot.Release()
299 snap.pendingKeys = nil
300 snap.pendingRemove = nil
301 }
302 303 // NewIterator returns a new iterator for the snapshot. The newly returned iterator is not pointing to a valid item
304 // until a call to one of the methods to position it is made.
305 //
306 // The slice parameter allows the iterator to be limited to a range of keys.
307 //
308 // The start key is inclusive and the limit key is exclusive. Either or both can be nil if the functionality is not
309 // desired.
310 func (snap *dbCacheSnapshot) NewIterator(slice *util.Range) *dbCacheIterator {
311 return &dbCacheIterator{
312 dbIter: snap.dbSnapshot.NewIterator(slice, nil),
313 cacheIter: newLdbCacheIter(snap, slice),
314 cacheSnapshot: snap,
315 }
316 }
// dbCache provides a database cache layer backed by an underlying database.
//
// It allows a maximum cache size and flush interval to be specified such that the cache is flushed to the database when
// the cache size exceeds the maximum configured value or it has been longer than the configured interval since the last
// flush.
//
// This effectively provides transaction batching so that callers can commit transactions at will without incurring
// large performance hits due to frequent disk syncs.
type dbCache struct {
	// ldb is the underlying leveldb DB for metadata.
	ldb *leveldb.DB
	// store is used to sync blocks to flat files.
	store *blockStore
	// The following fields are related to flushing the cache to persistent storage.
	//
	// Note that all flushing is performed in an opportunistic fashion.
	//
	// This means that it is only flushed during a transaction or when the database cache is closed.
	//
	// NOTE: These flush related fields are protected by the database write lock.
	//
	// maxSize is the maximum size threshold the cache can grow to before it is flushed.
	maxSize uint64
	// flushInterval is the threshold interval of time that is allowed to pass before the cache is flushed.
	flushInterval time.Duration
	// lastFlush is the time the cache was last flushed. It is used in conjunction with the current time and the flush
	// interval.
	lastFlush time.Time
	// The following fields hold the keys that need to be stored or deleted from the underlying database once the cache
	// is full, enough time has passed, or when the database is shutting down.
	//
	// Note that these are stored using immutable treaps to support O(1) MVCC snapshots against the cached data.
	//
	// cacheLock is used to protect concurrent access for cache updates and snapshots.
	cacheLock sync.RWMutex
	// cachedKeys holds the keys (and their values) waiting to be written to the underlying database.
	cachedKeys *treap.Immutable
	// cachedRemove holds the keys waiting to be deleted from the underlying database.
	cachedRemove *treap.Immutable
}
358 359 // Snapshot returns a snapshot of the database cache and underlying database at a particular point in time.
360 //
361 // The snapshot must be released after use by calling Release.
362 func (c *dbCache) Snapshot() (*dbCacheSnapshot, error) {
363 dbSnapshot, e := c.ldb.GetSnapshot()
364 if e != nil {
365 str := "failed to open transaction"
366 return nil, convertErr(str, e)
367 }
368 // Since the cached keys to be added and removed use an immutable treap, a snapshot is simply obtaining the root of
369 // the tree under the lock which is used to atomically swap the root.
370 c.cacheLock.RLock()
371 cacheSnapshot := &dbCacheSnapshot{
372 dbSnapshot: dbSnapshot,
373 pendingKeys: c.cachedKeys,
374 pendingRemove: c.cachedRemove,
375 }
376 c.cacheLock.RUnlock()
377 return cacheSnapshot, nil
378 }
379 380 // updateDB invokes the passed function in the context of a managed leveldb transaction. Any errors returned from the
381 // user-supplied function will cause the transaction to be rolled back and are returned from this function. Otherwise,
382 // the transaction is committed when the user-supplied function returns a nil error.
383 func (c *dbCache) updateDB(fn func(ldbTx *leveldb.Transaction) error) (e error) {
384 // Start a leveldb transaction.
385 ldbTx, e := c.ldb.OpenTransaction()
386 if e != nil {
387 return convertErr("failed to open ldb transaction", e)
388 }
389 if e := fn(ldbTx); E.Chk(e) {
390 ldbTx.Discard()
391 return e
392 }
393 // Commit the leveldb transaction and convert any errors as needed.
394 if e := ldbTx.Commit(); E.Chk(e) {
395 return convertErr("failed to commit leveldb transaction", e)
396 }
397 return nil
398 }
// TreapForEacher is an interface which allows iteration of a treap in ascending order using a user-supplied callback
// for each key/value pair.
//
// It mainly exists so both mutable and immutable treaps can be atomically committed to the database with the same
// function.
type TreapForEacher interface {
	// ForEach invokes the callback with each key/value pair. The callbacks in this file return false once an error
	// has occurred, presumably to stop the iteration early (confirm against the treap implementation).
	ForEach(func(k, v []byte) bool)
}
408 409 // commitTreaps atomically commits all of the passed pending add/update/remove updates to the underlying database.
410 func (c *dbCache) commitTreaps(pendingKeys, pendingRemove TreapForEacher) (e error) {
411 // Perform all leveldb updates using an atomic transaction.
412 return c.updateDB(
413 func(ldbTx *leveldb.Transaction) (e error) {
414 var innerErr error
415 pendingKeys.ForEach(
416 func(k, v []byte) bool {
417 if dbErr := ldbTx.Put(k, v, nil); dbErr != nil {
418 str := fmt.Sprintf(
419 "failed to put key %q to "+
420 "ldb transaction", k,
421 )
422 innerErr = convertErr(str, dbErr)
423 return false
424 }
425 return true
426 },
427 )
428 if innerErr != nil {
429 return innerErr
430 }
431 pendingRemove.ForEach(
432 func(k, v []byte) bool {
433 if dbErr := ldbTx.Delete(k, nil); dbErr != nil {
434 str := fmt.Sprintf(
435 "failed to delete "+
436 "key %q from ldb transaction",
437 k,
438 )
439 innerErr = convertErr(str, dbErr)
440 return false
441 }
442 return true
443 },
444 )
445 return innerErr
446 },
447 )
448 }
449 450 // flush flushes the database cache to persistent storage.
451 //
452 // This involes syncing the block store and replaying all transactions that have been applied to the cache to the
453 // underlying database.
454 //
455 // This function MUST be called with the database write lock held.
456 func (c *dbCache) flush() (e error) {
457 c.lastFlush = time.Now()
458 // Sync the current write file associated with the block store.
459 //
460 // This is necessary before writing the metadata to prevent the case where the metadata contains information about a
461 // block which actually hasn't been written yet in unexpected shutdown scenarios.
462 if e := c.store.syncBlocks(); E.Chk(e) {
463 return e
464 }
465 // Since the cached keys to be added and removed use an immutable treap, a snapshot is simply obtaining the root of
466 // the tree under the lock which is used to atomically swap the root.
467 c.cacheLock.RLock()
468 cachedKeys := c.cachedKeys
469 cachedRemove := c.cachedRemove
470 c.cacheLock.RUnlock()
471 // Nothing to do if there is no data to flush.
472 if cachedKeys.Len() == 0 && cachedRemove.Len() == 0 {
473 return nil
474 }
475 // Perform all leveldb updates using an atomic transaction.
476 if e := c.commitTreaps(cachedKeys, cachedRemove); E.Chk(e) {
477 return e
478 }
479 // Clear the cache since it has been flushed.
480 c.cacheLock.Lock()
481 c.cachedKeys = treap.NewImmutable()
482 c.cachedRemove = treap.NewImmutable()
483 c.cacheLock.Unlock()
484 D.Ln("synced database to disk")
485 return nil
486 }
487 488 // needsFlush returns whether or not the database cache needs to be flushed to persistent storage based on its current
489 // size, whether or not adding all of the entries in the passed database transaction would cause it to exceed the
490 // configured limit, and how much time has elapsed since the last time the cache was flushed.
491 //
492 // This function MUST be called with the database write lock held.
493 func (c *dbCache) needsFlush(tx *transaction) bool {
494 // A flush is needed when more time has elapsed than the configured flush interval.
495 if time.Since(c.lastFlush) > c.flushInterval {
496 return true
497 }
498 // A flush is needed when the size of the database cache exceeds the specified max cache size.
499 //
500 // The total calculated size is multiplied by 1.
501 //
502 // 5 here to account for additional memory consumption that will be needed during the flush as well as old nodes in
503 // the cache that are referenced by the snapshot used by the transaction.
504 snap := tx.snapshot
505 totalSize := snap.pendingKeys.Size() + snap.pendingRemove.Size()
506 totalSize = uint64(float64(totalSize) * 1.5)
507 return totalSize > c.maxSize
508 }
509 510 // commitTx atomically adds all of the pending keys to add and remove into the database cache.
511 //
512 // When adding the pending keys would cause the size of the cache to exceed the max cache size, or the time since the
513 // last flush exceeds the configured flush interval, the cache will be flushed to the underlying persistent database.
514 //
515 // This is an atomic operation with respect to the cache in that either all of the pending keys to add and remove in the
516 // transaction will be applied or none of them will. The database cache itself might be flushed to the underlying
517 // persistent database even if the transaction fails to apply, but it will only be the state of the cache without the
518 // transaction applied.
519 //
520 // This function MUST be called during a database write transaction which in turn implies the database write lock will
521 // be held.
522 func (c *dbCache) commitTx(tx *transaction) (e error) {
523 // Flush the cache and write the current transaction directly to the database if a flush is needed.
524 if c.needsFlush(tx) {
525 if e := c.flush(); E.Chk(e) {
526 return e
527 }
528 // Perform all leveldb updates using an atomic transaction.
529 e := c.commitTreaps(tx.pendingKeys, tx.pendingRemove)
530 if e != nil {
531 return e
532 }
533 // Clear the transaction entries since they have been committed.
534 tx.pendingKeys = nil
535 tx.pendingRemove = nil
536 return nil
537 }
538 // At this point a database flush is not needed, so atomically commit the transaction to the cache.
539 //
540 // Since the cached keys to be added and removed use an immutable treap, a snapshot is simply obtaining the root of
541 // the tree under the lock which is used to atomically swap the root.
542 c.cacheLock.RLock()
543 newCachedKeys := c.cachedKeys
544 newCachedRemove := c.cachedRemove
545 c.cacheLock.RUnlock()
546 // Apply every key to add in the database transaction to the cache.
547 tx.pendingKeys.ForEach(
548 func(k, v []byte) bool {
549 newCachedRemove = newCachedRemove.Delete(k)
550 newCachedKeys = newCachedKeys.Put(k, v)
551 return true
552 },
553 )
554 tx.pendingKeys = nil
555 // Apply every key to remove in the database transaction to the cache.
556 tx.pendingRemove.ForEach(
557 func(k, v []byte) bool {
558 newCachedKeys = newCachedKeys.Delete(k)
559 newCachedRemove = newCachedRemove.Put(k, nil)
560 return true
561 },
562 )
563 tx.pendingRemove = nil
564 // Atomically replace the immutable treaps which hold the cached keys to add and delete.
565 c.cacheLock.Lock()
566 c.cachedKeys = newCachedKeys
567 c.cachedRemove = newCachedRemove
568 c.cacheLock.Unlock()
569 return nil
570 }
571 572 // Close cleanly shuts down the database cache by syncing all data and closing the underlying leveldb database.
573 //
574 // This function MUST be called with the database write lock held.
575 func (c *dbCache) Close() (e error) {
576 // Flush any outstanding cached entries to disk.
577 if e := c.flush(); E.Chk(e) {
578 // Even if there is an error while flushing, attempt to close the underlying database. The error is ignored
579 // since it would mask the flush error.
580 _ = c.ldb.Close()
581 return e
582 }
583 // Close the underlying leveldb database.
584 if e := c.ldb.Close(); E.Chk(e) {
585 str := "failed to close underlying leveldb database"
586 return convertErr(str, e)
587 }
588 return nil
589 }
590 591 // newDbCache returns a new database cache instance backed by the provided leveldb instance.
592 //
593 // The cache will be flushed to leveldb when the max size exceeds the provided value or it has been longer than the
594 // provided interval since the last flush.
595 func newDbCache(ldb *leveldb.DB, store *blockStore, maxSize uint64, flushIntervalSecs uint32) *dbCache {
596 return &dbCache{
597 ldb: ldb,
598 store: store,
599 maxSize: maxSize,
600 flushInterval: time.Second * time.Duration(flushIntervalSecs),
601 lastFlush: time.Now(),
602 cachedKeys: treap.NewImmutable(),
603 cachedRemove: treap.NewImmutable(),
604 }
605 }
606