1 package index
2 3 import (
4 "fmt"
5 "github.com/p9c/p9/pkg/block"
6 7 "github.com/p9c/p9/pkg/blockchain"
8 "github.com/p9c/p9/pkg/chainhash"
9 "github.com/p9c/p9/pkg/database"
10 )

// indexTipsBucketName names the metadata bucket that stores the current tip of every index, keyed by each index's
// key.
var indexTipsBucketName = []byte("idxtips")

// The index manager tracks the current tip of each index by using a parent bucket that contains an entry for each
// index, keyed by that index's key.
//
// The serialized format for an index tip (one such value per index entry) is:
//
//   <block hash><block height>
//
//   Field          Type             Size
//   block hash     chainhash.Hash   chainhash.HashSize
//   block height   uint32           4 bytes
25 26 // dbPutIndexerTip uses an existing database transaction to update or add the current tip for the given index to the
27 // provided values.
28 func dbPutIndexerTip(dbTx database.Tx, idxKey []byte, hash *chainhash.Hash, height int32) (e error) {
29 serialized := make([]byte, chainhash.HashSize+4)
30 copy(serialized, hash[:])
31 byteOrder.PutUint32(serialized[chainhash.HashSize:], uint32(height))
32 indexesBucket := dbTx.Metadata().Bucket(indexTipsBucketName)
33 return indexesBucket.Put(idxKey, serialized)
34 }
35 36 // dbFetchIndexerTip uses an existing database transaction to retrieve the hash and height of the current tip for the
37 // provided index.
38 func dbFetchIndexerTip(dbTx database.Tx, idxKey []byte) (*chainhash.Hash, int32, error) {
39 indexesBucket := dbTx.Metadata().Bucket(indexTipsBucketName)
40 serialized := indexesBucket.Get(idxKey)
41 if len(serialized) < chainhash.HashSize+4 {
42 return nil, 0, database.DBError{
43 ErrorCode: database.ErrCorruption,
44 Description: fmt.Sprintf(
45 "unexpected end of data for "+
46 "index %q tip", string(idxKey),
47 ),
48 }
49 }
50 var hash chainhash.Hash
51 copy(hash[:], serialized[:chainhash.HashSize])
52 height := int32(byteOrder.Uint32(serialized[chainhash.HashSize:]))
53 return &hash, height, nil
54 }
55 56 // dbIndexConnectBlock adds all of the index entries associated with the given block using the provided indexer and
57 // updates the tip of the indexer accordingly. An error will be returned if the current tip for the indexer is not the
58 // previous block for the passed block.
59 func dbIndexConnectBlock(
60 dbTx database.Tx, indexer Indexer, block *block.Block,
61 stxo []blockchain.SpentTxOut,
62 ) (e error) {
63 // Assert that the block being connected properly connects to the current tip of the index.
64 idxKey := indexer.Key()
65 var curTipHash *chainhash.Hash
66 if curTipHash, _, e = dbFetchIndexerTip(dbTx, idxKey); E.Chk(e) {
67 return e
68 }
69 if !curTipHash.IsEqual(&block.WireBlock().Header.PrevBlock) {
70 return AssertError(
71 fmt.Sprintf(
72 "dbIndexConnectBlock must be "+
73 "called with a block that extends the current index "+
74 "tip (%s, tip %s, block %s)", indexer.Name(),
75 curTipHash, block.Hash(),
76 ),
77 )
78 }
79 // Notify the indexer with the connected block so it can index it.
80 if e := indexer.ConnectBlock(dbTx, block, stxo); E.Chk(e) {
81 return e
82 }
83 // Update the current index tip.
84 return dbPutIndexerTip(dbTx, idxKey, block.Hash(), block.Height())
85 }
86 87 // dbIndexDisconnectBlock removes all of the index entries associated with the given block using the provided indexer
88 // and updates the tip of the indexer accordingly. An error will be returned if the current tip for the indexer is not
89 // the passed block.
90 func dbIndexDisconnectBlock(
91 dbTx database.Tx, indexer Indexer, block *block.Block,
92 stxo []blockchain.SpentTxOut,
93 ) (e error) {
94 // Assert that the block being disconnected is the current tip of the index.
95 idxKey := indexer.Key()
96 var curTipHash *chainhash.Hash
97 if curTipHash, _, e = dbFetchIndexerTip(dbTx, idxKey); E.Chk(e) {
98 return e
99 }
100 if !curTipHash.IsEqual(block.Hash()) {
101 return AssertError(
102 fmt.Sprintf(
103 "dbIndexDisconnectBlock must "+
104 "be called with the block at the current index tip "+
105 "(%s, tip %s, block %s)", indexer.Name(),
106 curTipHash, block.Hash(),
107 ),
108 )
109 }
110 // Notify the indexer with the disconnected block so it can remove all of
111 // the appropriate entries.
112 if e := indexer.DisconnectBlock(dbTx, block, stxo); E.Chk(e) {
113 return e
114 }
115 // Update the current index tip.
116 prevHash := &block.WireBlock().Header.PrevBlock
117 return dbPutIndexerTip(dbTx, idxKey, prevHash, block.Height()-1)
118 }
119 120 // Manager defines an index manager that manages multiple optional indexes and implements the blockchain. IndexManager
121 // interface so it can be seamlessly plugged into normal chain processing.
122 type Manager struct {
123 db database.DB
124 enabledIndexes []Indexer
125 }
126 127 // Ensure the Manager type implements the blockchain.IndexManager interface.
128 var _ blockchain.IndexManager = (*Manager)(nil)

// indexDropKey builds the marker key written to the index tips bucket while the index identified by idxKey is being
// dropped. The key is the byte 'd' followed by idxKey, in a freshly allocated slice that never aliases idxKey.
func indexDropKey(idxKey []byte) []byte {
	return append([]byte{'d'}, idxKey...)
}
137 138 // maybeFinishDrops determines if each of the enabled indexes are in the middle of being dropped and finishes dropping
139 // them when the are. This is necessary because dropping and index has to be done in several atomic steps rather than
140 // one big atomic step due to the massive number of entries.
141 func (m *Manager) maybeFinishDrops(interrupt <-chan struct{}) (e error) {
142 indexNeedsDrop := make([]bool, len(m.enabledIndexes))
143 if e = m.db.View(
144 func(dbTx database.Tx) (e error) {
145 // None of the indexes needs to be dropped if the index tips bucket hasn't been created yet.
146 indexesBucket := dbTx.Metadata().Bucket(indexTipsBucketName)
147 if indexesBucket == nil {
148 return nil
149 }
150 // Mark the indexer as requiring a drop if one is already in progress.
151 for i, indexer := range m.enabledIndexes {
152 dropKey := indexDropKey(indexer.Key())
153 if indexesBucket.Get(dropKey) != nil {
154 indexNeedsDrop[i] = true
155 }
156 }
157 return nil
158 },
159 ); E.Chk(e) {
160 return e
161 }
162 if interruptRequested(interrupt) {
163 return errInterruptRequested
164 }
165 // Finish dropping any of the enabled indexes that are already in the
166 // middle of being dropped.
167 for i, indexer := range m.enabledIndexes {
168 if !indexNeedsDrop[i] {
169 continue
170 }
171 I.C(
172 func() string {
173 return fmt.Sprintf("Resuming %s drop", indexer.Name())
174 },
175 )
176 if e = dropIndex(m.db, indexer.Key(), indexer.Name(), interrupt); E.Chk(e) {
177 return e
178 }
179 }
180 return nil
181 }
182 183 // maybeCreateIndexes determines if each of the enabled indexes have already been created and creates them if not.
184 func (m *Manager) maybeCreateIndexes(dbTx database.Tx) (e error) {
185 indexesBucket := dbTx.Metadata().Bucket(indexTipsBucketName)
186 for _, indexer := range m.enabledIndexes {
187 // Nothing to do if the index tip already exists.
188 idxKey := indexer.Key()
189 if indexesBucket.Get(idxKey) != nil {
190 continue
191 }
192 // The tip for the index does not exist, so create it and invoke the create callback for the index so it can
193 // perform any one-time initialization it requires.
194 if e := indexer.Create(dbTx); E.Chk(e) {
195 return e
196 }
197 // Set the tip for the index to values which represent an uninitialized index.
198 e := dbPutIndexerTip(dbTx, idxKey, &chainhash.Hash{}, -1)
199 if e != nil {
200 return e
201 }
202 }
203 return nil
204 }
205 206 // Init initializes the enabled indexes. This is called during chain initialization and primarily consists of catching
207 // up all indexes to the current best chain tip. This is necessary since each index can be disabled and re-enabled at
208 // any time and attempting to catch-up indexes at the same time new blocks are being downloaded would lead to an overall
209 // longer time to catch up due to the I/O contention. This is part of the blockchain.IndexManager interface.
210 func (m *Manager) Init(chain *blockchain.BlockChain, interrupt <-chan struct{}) (e error) {
211 // Nothing to do when no indexes are enabled.
212 if len(m.enabledIndexes) == 0 {
213 return nil
214 }
215 if interruptRequested(interrupt) {
216 return errInterruptRequested
217 }
218 // Finish and drops that were previously interrupted.
219 if e = m.maybeFinishDrops(interrupt); E.Chk(e) {
220 return e
221 }
222 // Create the initial state for the indexes as needed.
223 e = m.db.Update(
224 func(dbTx database.Tx) (e error) {
225 // Create the bucket for the current tips as needed.
226 meta := dbTx.Metadata()
227 if _, e = meta.CreateBucketIfNotExists(indexTipsBucketName); E.Chk(e) {
228 return e
229 }
230 return m.maybeCreateIndexes(dbTx)
231 },
232 )
233 if E.Chk(e) {
234 return e
235 }
236 // Initialize each of the enabled indexes.
237 for _, indexer := range m.enabledIndexes {
238 if e = indexer.Init(); E.Chk(e) {
239 return e
240 }
241 }
242 // Rollback indexes to the main chain if their tip is an orphaned fork. This is fairly unlikely, but it can happen
243 // if the chain is reorganized while the index is disabled. This has to be done in reverse order because later
244 // indexes can depend on earlier ones.
245 var height int32
246 var hash *chainhash.Hash
247 for i := len(m.enabledIndexes); i > 0; i-- {
248 indexer := m.enabledIndexes[i-1]
249 // Fetch the current tip for the index.
250 e = m.db.View(
251 func(dbTx database.Tx) (e error) {
252 idxKey := indexer.Key()
253 hash, height, e = dbFetchIndexerTip(dbTx, idxKey)
254 return e
255 },
256 )
257 if e != nil {
258 return e
259 }
260 // Nothing to do if the index does not have any entries yet.
261 if height == -1 {
262 continue
263 }
264 // Loop until the tip is a block that exists in the main chain.
265 initialHeight := height
266 for !chain.MainChainHasBlock(hash) {
267 // At this point the index tip is orphaned, so load the orphaned block from the database directly and
268 // disconnect it from the index. The block has to be loaded directly since it is no longer in the main chain
269 // and thus the chain. BlockByHash function would error.
270 var b *block.Block
271 e = m.db.View(
272 func(dbTx database.Tx) (e error) {
273 blockBytes, e := dbTx.FetchBlock(hash)
274 if e != nil {
275 return e
276 }
277 b, e = block.NewFromBytes(blockBytes)
278 if e != nil {
279 return e
280 }
281 b.SetHeight(height)
282 return e
283 },
284 )
285 if e != nil {
286 return e
287 }
288 // We'll also grab the set of outputs spent by this block so we can remove them from the index.
289 var spentTxos []blockchain.SpentTxOut
290 spentTxos, e = chain.FetchSpendJournal(b)
291 if e != nil {
292 return e
293 }
294 // With the block and stxo set for that block retrieved, we can now update the index itself.
295 e = m.db.Update(
296 func(dbTx database.Tx) (e error) {
297 // Remove all of the index entries associated with the block and update the indexer tip.
298 e = dbIndexDisconnectBlock(
299 dbTx, indexer, b, spentTxos,
300 )
301 if e != nil {
302 return e
303 }
304 // Update the tip to the previous block.
305 hash = &b.WireBlock().Header.PrevBlock
306 height--
307 return nil
308 },
309 )
310 if e != nil {
311 return e
312 }
313 if interruptRequested(interrupt) {
314 return errInterruptRequested
315 }
316 }
317 if initialHeight != height {
318 I.F(
319 "removed %d orphaned blocks from %s (heights %d to %d)",
320 initialHeight-height,
321 indexer.Name(),
322 height+1,
323 initialHeight,
324 )
325 }
326 }
327 // Fetch the current tip heights for each index along with tracking the lowest one so the catchup code only needs to
328 // start at the earliest block and is able to skip connecting the block for the indexes that don't need it.
329 bestHeight := chain.BestSnapshot().Height
330 lowestHeight := bestHeight
331 indexerHeights := make([]int32, len(m.enabledIndexes))
332 e = m.db.View(
333 func(dbTx database.Tx) (e error) {
334 for i, indexer := range m.enabledIndexes {
335 idxKey := indexer.Key()
336 _, height, e := dbFetchIndexerTip(dbTx, idxKey)
337 if e != nil {
338 return e
339 }
340 T.F(
341 "current %s tip (height %d, hash %v)",
342 indexer.Name(),
343 height,
344 hash,
345 )
346 indexerHeights[i] = height
347 if height < lowestHeight {
348 lowestHeight = height
349 }
350 }
351 return nil
352 },
353 )
354 if e != nil {
355 return e
356 }
357 // Nothing to index if all of the indexes are caught up.
358 if lowestHeight == bestHeight {
359 return nil
360 }
361 // Create a progress logger for the indexing process below.
362 progressLogger := newBlockProgressLogger(
363 "Indexed",
364 )
365 // At this point, one or more indexes are behind the current best chain tip and need to be caught up, so log the
366 // details and loop through each block that needs to be indexed.
367 I.F(
368 "catching up indexes from height %d to %d",
369 lowestHeight,
370 bestHeight,
371 )
372 for height := lowestHeight + 1; height <= bestHeight; height++ {
373 // Load the block for the height since it is required to index it.
374 var blk *block.Block
375 blk, e = chain.BlockByHeight(height)
376 if e != nil {
377 return e
378 }
379 if interruptRequested(interrupt) {
380 return errInterruptRequested
381 }
382 // Connect the block for all indexes that need it.
383 var spentTxos []blockchain.SpentTxOut
384 for i, indexer := range m.enabledIndexes {
385 // Skip indexes that don't need to be updated with this block.
386 if indexerHeights[i] >= height {
387 continue
388 }
389 // When the index requires all of the referenced txouts and they haven't been loaded yet, they need to be
390 // retrieved from the spend journal.
391 if spentTxos == nil && indexNeedsInputs(indexer) {
392 spentTxos, e = chain.FetchSpendJournal(blk)
393 if e != nil {
394 return e
395 }
396 }
397 e := m.db.Update(
398 func(dbTx database.Tx) (e error) {
399 return dbIndexConnectBlock(
400 dbTx, indexer, blk, spentTxos,
401 )
402 },
403 )
404 if e != nil {
405 return e
406 }
407 indexerHeights[i] = height
408 }
409 // Log indexing progress.
410 progressLogger.LogBlockHeight(blk)
411 if interruptRequested(interrupt) {
412 return errInterruptRequested
413 }
414 }
415 I.Ln("indexes caught up to height", bestHeight)
416 return nil
417 }
418 419 // indexNeedsInputs returns whether or not the index needs access to the txouts referenced by the transaction inputs
420 // being indexed.
421 func indexNeedsInputs(index Indexer) bool {
422 if idx, ok := index.(NeedsInputser); ok {
423 return idx.NeedsInputs()
424 }
425 return false
426 }

// // dbFetchTx looks up the passed transaction hash in the transaction index
// // and loads it from the database.
// func dbFetchTx(dbTx database.Tx, hash *chainhash.Hash) (*wire.MsgTx,
// 	error) {
// 	// Look up the location of the transaction.
// 	blockRegion, e := dbFetchTxIndexEntry(dbTx, hash)
// 	if e != nil {
// 		return nil, e
// 	}
// 	if blockRegion == nil {
// 		return nil, fmt.Errorf("transaction %v not found", hash)
// 	}
// 	// Load the raw transaction bytes from the database.
// 	txBytes, e := dbTx.FetchBlockRegion(blockRegion)
// 	if e != nil {
// 		return nil, e
// 	}
// 	// Deserialize the transaction.
// 	var msgTx wire.MsgTx
// 	e = msgTx.Deserialize(bytes.NewReader(txBytes))
// 	if e != nil {
// 		return nil, e
// 	}
// 	return &msgTx, nil
// }
453 454 // ConnectBlock must be invoked when a block is extending the main chain. It keeps track of the state of each index it
455 // is managing, performs some sanity checks, and invokes each indexer. This is part of the blockchain.IndexManager
456 // interface.
457 func (m *Manager) ConnectBlock(
458 dbTx database.Tx, block *block.Block,
459 stxos []blockchain.SpentTxOut,
460 ) (e error) {
461 // Call each of the currently active optional indexes with the block being connected so they can update accordingly.
462 for _, index := range m.enabledIndexes {
463 e := dbIndexConnectBlock(dbTx, index, block, stxos)
464 if e != nil {
465 return e
466 }
467 }
468 return nil
469 }
470 471 // DisconnectBlock must be invoked when a block is being disconnected from the end of the main chain. It keeps track of
472 // the state of each index it is managing, performs some sanity checks, and invokes each indexer to remove the index
473 // entries associated with the block. This is part of the blockchain.IndexManager interface.
474 func (m *Manager) DisconnectBlock(
475 dbTx database.Tx, block *block.Block,
476 stxo []blockchain.SpentTxOut,
477 ) (e error) {
478 // Call each of the currently active optional indexes with the block being disconnected so they can update
479 // accordingly.
480 for _, index := range m.enabledIndexes {
481 e := dbIndexDisconnectBlock(dbTx, index, block, stxo)
482 if e != nil {
483 return e
484 }
485 }
486 return nil
487 }
488 489 // NewManager returns a new index manager with the provided indexes enabled. The manager returned satisfies the
490 // blockchain. IndexManager interface and thus cleanly plugs into the normal blockchain processing path.
491 func NewManager(db database.DB, enabledIndexes []Indexer) *Manager {
492 return &Manager{
493 db: db,
494 enabledIndexes: enabledIndexes,
495 }
496 }
497 498 // dropIndex drops the passed index from the database. Since indexes can be massive, it deletes the index in multiple
499 // database transactions in order to keep memory usage to reasonable levels. It also marks the drop in progress so the
500 // drop can be resumed if it is stopped before it is done before the index can be used again.
501 func dropIndex(db database.DB, idxKey []byte, idxName string, interrupt <-chan struct{}) (e error) {
502 // Nothing to do if the index doesn't already exist.
503 var needsDelete bool
504 e = db.View(
505 func(dbTx database.Tx) (e error) {
506 indexesBucket := dbTx.Metadata().Bucket(indexTipsBucketName)
507 if indexesBucket != nil && indexesBucket.Get(idxKey) != nil {
508 needsDelete = true
509 }
510 return nil
511 },
512 )
513 if E.Chk(e) {
514 return
515 }
516 if !needsDelete {
517 W.F("not dropping %s because it does not exist", idxName)
518 return
519 }
520 // Mark that the index is in the process of being dropped so that it can be resumed on the next start if interrupted
521 // before the process is complete.
522 I.F("dropping all %s entries. This might take a while...", idxName)
523 e = db.Update(
524 func(dbTx database.Tx) (e error) {
525 indexesBucket := dbTx.Metadata().Bucket(indexTipsBucketName)
526 return indexesBucket.Put(indexDropKey(idxKey), idxKey)
527 },
528 )
529 if e != nil {
530 return e
531 }
532 // Since the indexes can be so large, attempting to simply delete the bucket in a single database transaction would
533 // result in massive memory usage and likely crash many systems due to ulimits. In order to avoid this, use a cursor
534 // to delete a maximum number of entries out of the bucket at a time. Recurse buckets depth-first to delete any
535 // sub-buckets.
536 const maxDeletions = 2000000
537 var totalDeleted uint64
538 // Recurse through all buckets in the index, cataloging each for later deletion.
539 var subBuckets [][][]byte
540 var subBucketClosure func(database.Tx, []byte, [][]byte) error
541 subBucketClosure = func(
542 dbTx database.Tx,
543 subBucket []byte, tlBucket [][]byte,
544 ) (e error) {
545 // Get full bucket name and append to subBuckets for later deletion.
546 var bucketName [][]byte
547 if (tlBucket == nil) || (len(tlBucket) == 0) {
548 bucketName = append(bucketName, subBucket)
549 } else {
550 bucketName = append(tlBucket, subBucket)
551 }
552 subBuckets = append(subBuckets, bucketName)
553 // Recurse sub-buckets to append to subBuckets slice.
554 bucket := dbTx.Metadata()
555 for _, subBucketName := range bucketName {
556 bucket = bucket.Bucket(subBucketName)
557 }
558 return bucket.ForEachBucket(
559 func(k []byte) (e error) {
560 return subBucketClosure(dbTx, k, bucketName)
561 },
562 )
563 }
564 // Call subBucketClosure with top-level bucket.
565 e = db.View(
566 func(dbTx database.Tx) (e error) {
567 return subBucketClosure(dbTx, idxKey, nil)
568 },
569 )
570 if e != nil {
571 return nil
572 }
573 // Iterate through each sub-bucket in reverse, deepest-first, deleting all keys inside them and then dropping the
574 // buckets themselves.
575 for i := range subBuckets {
576 bucketName := subBuckets[len(subBuckets)-1-i]
577 // Delete maxDeletions key/value pairs at a time.
578 for numDeleted := maxDeletions; numDeleted == maxDeletions; {
579 numDeleted = 0
580 e = db.Update(
581 func(dbTx database.Tx) (e error) {
582 subBucket := dbTx.Metadata()
583 for _, subBucketName := range bucketName {
584 subBucket = subBucket.Bucket(subBucketName)
585 }
586 cursor := subBucket.Cursor()
587 for ok := cursor.First(); ok; ok = cursor.Next() &&
588 numDeleted < maxDeletions {
589 if e := cursor.Delete(); E.Chk(e) {
590 return e
591 }
592 numDeleted++
593 }
594 return nil
595 },
596 )
597 if e != nil {
598 return e
599 }
600 if numDeleted > 0 {
601 totalDeleted += uint64(numDeleted)
602 I.F(
603 "deleted %d keys (%d total) from %s", numDeleted,
604 totalDeleted, idxName,
605 )
606 }
607 }
608 if interruptRequested(interrupt) {
609 return errInterruptRequested
610 }
611 // Drop the bucket itself.
612 e = db.Update(
613 func(dbTx database.Tx) (e error) {
614 bucket := dbTx.Metadata()
615 for j := 0; j < len(bucketName)-1; j++ {
616 bucket = bucket.Bucket(bucketName[j])
617 }
618 return bucket.DeleteBucket(bucketName[len(bucketName)-1])
619 },
620 )
621 if e != nil {
622 }
623 }
624 // Call extra index specific deinitialization for the transaction index.
625 if idxName == txIndexName {
626 if e = dropBlockIDIndex(db); E.Chk(e) {
627 return e
628 }
629 }
630 // Remove the index tip, index bucket, and in-progress drop flag now that all index entries have been removed.
631 e = db.Update(
632 func(dbTx database.Tx) (e error) {
633 meta := dbTx.Metadata()
634 indexesBucket := meta.Bucket(indexTipsBucketName)
635 if e := indexesBucket.Delete(idxKey); E.Chk(e) {
636 return e
637 }
638 return indexesBucket.Delete(indexDropKey(idxKey))
639 },
640 )
641 if e != nil {
642 return e
643 }
644 I.Ln("dropped", idxName)
645 return nil
646 }
647