1 package indexers
2 3 import (
4 "fmt"
5 "github.com/p9c/p9/pkg/block"
6 7 "github.com/p9c/p9/pkg/blockchain"
8 "github.com/p9c/p9/pkg/chainhash"
9 "github.com/p9c/p9/pkg/database"
10 )
// indexTipsBucketName is the key of the metadata bucket that stores the current tip of every index.
var indexTipsBucketName = []byte("idxtips")
// The index manager tracks the current tip of each index by using a parent bucket that contains an entry for each
// index.
//
// The serialized format for an index tip is:
//
//	[<block hash><block height>],...
//
//	Field           Type             Size
//	block hash      chainhash.Hash   chainhash.HashSize
//	block height    uint32           4 bytes
25 26 // dbPutIndexerTip uses an existing database transaction to update or add the current tip for the given index to the
27 // provided values.
28 func dbPutIndexerTip(dbTx database.Tx, idxKey []byte, hash *chainhash.Hash, height int32) (e error) {
29 serialized := make([]byte, chainhash.HashSize+4)
30 copy(serialized, hash[:])
31 byteOrder.PutUint32(serialized[chainhash.HashSize:], uint32(height))
32 indexesBucket := dbTx.Metadata().Bucket(indexTipsBucketName)
33 return indexesBucket.Put(idxKey, serialized)
34 }
35 36 // dbFetchIndexerTip uses an existing database transaction to retrieve the hash and height of the current tip for the
37 // provided index.
38 func dbFetchIndexerTip(dbTx database.Tx, idxKey []byte) (*chainhash.Hash, int32, error) {
39 indexesBucket := dbTx.Metadata().Bucket(indexTipsBucketName)
40 serialized := indexesBucket.Get(idxKey)
41 if len(serialized) < chainhash.HashSize+4 {
42 return nil, 0, database.DBError{
43 ErrorCode: database.ErrCorruption,
44 Description: fmt.Sprintf(
45 "unexpected end of data for "+
46 "index %q tip", string(idxKey),
47 ),
48 }
49 }
50 var hash chainhash.Hash
51 copy(hash[:], serialized[:chainhash.HashSize])
52 height := int32(byteOrder.Uint32(serialized[chainhash.HashSize:]))
53 return &hash, height, nil
54 }
55 56 // dbIndexConnectBlock adds all of the index entries associated with the given block using the provided indexer and
57 // updates the tip of the indexer accordingly. An error will be returned if the current tip for the indexer is not the
58 // previous block for the passed block.
59 func dbIndexConnectBlock(
60 dbTx database.Tx, indexer Indexer, block *block.Block,
61 stxo []blockchain.SpentTxOut,
62 ) (e error) {
63 // Assert that the block being connected properly connects to the current tip of the index.
64 idxKey := indexer.Key()
65 var curTipHash *chainhash.Hash
66 if curTipHash, _, e = dbFetchIndexerTip(dbTx, idxKey); E.Chk(e) {
67 return e
68 }
69 if !curTipHash.IsEqual(&block.WireBlock().Header.PrevBlock) {
70 return AssertError(
71 fmt.Sprintf(
72 "dbIndexConnectBlock must be "+
73 "called with a block that extends the current index "+
74 "tip (%s, tip %s, block %s)", indexer.Name(),
75 curTipHash, block.Hash(),
76 ),
77 )
78 }
79 // Notify the indexer with the connected block so it can index it.
80 if e := indexer.ConnectBlock(dbTx, block, stxo); E.Chk(e) {
81 return e
82 }
83 // Update the current index tip.
84 return dbPutIndexerTip(dbTx, idxKey, block.Hash(), block.Height())
85 }
86 87 // dbIndexDisconnectBlock removes all of the index entries associated with the given block using the provided indexer
88 // and updates the tip of the indexer accordingly. An error will be returned if the current tip for the indexer is not
89 // the passed block.
90 func dbIndexDisconnectBlock(
91 dbTx database.Tx, indexer Indexer, block *block.Block,
92 stxo []blockchain.SpentTxOut,
93 ) (e error) {
94 // Assert that the block being disconnected is the current tip of the index.
95 idxKey := indexer.Key()
96 var curTipHash *chainhash.Hash
97 if curTipHash, _, e = dbFetchIndexerTip(dbTx, idxKey); E.Chk(e) {
98 return e
99 }
100 if !curTipHash.IsEqual(block.Hash()) {
101 return AssertError(
102 fmt.Sprintf(
103 "dbIndexDisconnectBlock must "+
104 "be called with the block at the current index tip "+
105 "(%s, tip %s, block %s)", indexer.Name(),
106 curTipHash, block.Hash(),
107 ),
108 )
109 }
110 // Notify the indexer with the disconnected block so it can remove all of
111 // the appropriate entries.
112 if e := indexer.DisconnectBlock(dbTx, block, stxo); E.Chk(e) {
113 return e
114 }
115 // Update the current index tip.
116 prevHash := &block.WireBlock().Header.PrevBlock
117 return dbPutIndexerTip(dbTx, idxKey, prevHash, block.Height()-1)
118 }
119 120 // Manager defines an index manager that manages multiple optional indexes and implements the blockchain. IndexManager
121 // interface so it can be seamlessly plugged into normal chain processing.
122 type Manager struct {
123 db database.DB
124 enabledIndexes []Indexer
125 }
126 127 // Ensure the Manager type implements the blockchain.IndexManager interface.
128 var _ blockchain.IndexManager = (*Manager)(nil)
// indexDropKey derives the marker key that flags an index as being in the process of being dropped: the byte 'd'
// followed by the index key. The result is a newly allocated slice that does not alias idxKey.
func indexDropKey(idxKey []byte) []byte {
	marker := make([]byte, 0, len(idxKey)+1)
	marker = append(marker, 'd')
	return append(marker, idxKey...)
}
137 138 // maybeFinishDrops determines if each of the enabled indexes are in the middle of being dropped and finishes dropping
139 // them when the are. This is necessary because dropping and index has to be done in several atomic steps rather than
140 // one big atomic step due to the massive number of entries.
141 func (m *Manager) maybeFinishDrops(interrupt <-chan struct{}) (e error) {
142 indexNeedsDrop := make([]bool, len(m.enabledIndexes))
143 if e = m.db.View(
144 func(dbTx database.Tx) (e error) {
145 // None of the indexes needs to be dropped if the index tips bucket hasn't been created yet.
146 indexesBucket := dbTx.Metadata().Bucket(indexTipsBucketName)
147 if indexesBucket == nil {
148 return nil
149 }
150 // Mark the indexer as requiring a drop if one is already in progress.
151 for i, indexer := range m.enabledIndexes {
152 dropKey := indexDropKey(indexer.Key())
153 if indexesBucket.Get(dropKey) != nil {
154 indexNeedsDrop[i] = true
155 }
156 }
157 return nil
158 },
159 ); E.Chk(e) {
160 return e
161 }
162 if interruptRequested(interrupt) {
163 return errInterruptRequested
164 }
165 // Finish dropping any of the enabled indexes that are already in the
166 // middle of being dropped.
167 for i, indexer := range m.enabledIndexes {
168 if !indexNeedsDrop[i] {
169 continue
170 }
171 I.C(
172 func() string {
173 return fmt.Sprintf("Resuming %s drop", indexer.Name())
174 },
175 )
176 if e = dropIndex(m.db, indexer.Key(), indexer.Name(), interrupt); E.Chk(e) {
177 return e
178 }
179 }
180 return nil
181 }
182 183 // maybeCreateIndexes determines if each of the enabled indexes have already been created and creates them if not.
184 func (m *Manager) maybeCreateIndexes(dbTx database.Tx) (e error) {
185 indexesBucket := dbTx.Metadata().Bucket(indexTipsBucketName)
186 for _, indexer := range m.enabledIndexes {
187 // Nothing to do if the index tip already exists.
188 idxKey := indexer.Key()
189 if indexesBucket.Get(idxKey) != nil {
190 continue
191 }
192 // The tip for the index does not exist, so create it and invoke the create callback for the index so it can
193 // perform any one-time initialization it requires.
194 if e := indexer.Create(dbTx); E.Chk(e) {
195 return e
196 }
197 // Set the tip for the index to values which represent an uninitialized index.
198 e := dbPutIndexerTip(dbTx, idxKey, &chainhash.Hash{}, -1)
199 if e != nil {
200 return e
201 }
202 }
203 return nil
204 }
205 206 // Init initializes the enabled indexes. This is called during chain initialization and primarily consists of catching
207 // up all indexes to the current best chain tip. This is necessary since each index can be disabled and re-enabled at
208 // any time and attempting to catch-up indexes at the same time new blocks are being downloaded would lead to an overall
209 // longer time to catch up due to the I/O contention. This is part of the blockchain.IndexManager interface.
210 func (m *Manager) Init(chain *blockchain.BlockChain, interrupt <-chan struct{}) (e error) {
211 // Nothing to do when no indexes are enabled.
212 if len(m.enabledIndexes) == 0 {
213 return nil
214 }
215 if interruptRequested(interrupt) {
216 return errInterruptRequested
217 }
218 // Finish and drops that were previously interrupted.
219 if e = m.maybeFinishDrops(interrupt); E.Chk(e) {
220 return e
221 }
222 // Create the initial state for the indexes as needed.
223 e = m.db.Update(
224 func(dbTx database.Tx) (e error) {
225 // Create the bucket for the current tips as needed.
226 meta := dbTx.Metadata()
227 if _, e = meta.CreateBucketIfNotExists(indexTipsBucketName); E.Chk(e) {
228 return e
229 }
230 return m.maybeCreateIndexes(dbTx)
231 },
232 )
233 if E.Chk(e) {
234 return e
235 }
236 // Initialize each of the enabled indexes.
237 for _, indexer := range m.enabledIndexes {
238 if e = indexer.Init(); E.Chk(e) {
239 return e
240 }
241 }
242 // Rollback indexes to the main chain if their tip is an orphaned fork. This is fairly unlikely, but it can happen
243 // if the chain is reorganized while the index is disabled. This has to be done in reverse order because later
244 // indexes can depend on earlier ones.
245 var height int32
246 var hash *chainhash.Hash
247 for i := len(m.enabledIndexes); i > 0; i-- {
248 indexer := m.enabledIndexes[i-1]
249 // Fetch the current tip for the index.
250 e = m.db.View(
251 func(dbTx database.Tx) (e error) {
252 idxKey := indexer.Key()
253 hash, height, e = dbFetchIndexerTip(dbTx, idxKey)
254 return e
255 },
256 )
257 if e != nil {
258 return e
259 }
260 // Nothing to do if the index does not have any entries yet.
261 if height == -1 {
262 continue
263 }
264 // Loop until the tip is a block that exists in the main chain.
265 initialHeight := height
266 for !chain.MainChainHasBlock(hash) {
267 // At this point the index tip is orphaned, so load the orphaned block from the database directly and
268 // disconnect it from the index. The block has to be loaded directly since it is no longer in the main chain
269 // and thus the chain. BlockByHash function would error.
270 var blk *block.Block
271 e = m.db.View(
272 func(dbTx database.Tx) (e error) {
273 var blockBytes []byte
274 blockBytes, e = dbTx.FetchBlock(hash)
275 if e != nil {
276 return e
277 }
278 blk, e = block.NewFromBytes(blockBytes)
279 if e != nil {
280 return e
281 }
282 blk.SetHeight(height)
283 return e
284 },
285 )
286 if e != nil {
287 return e
288 }
289 // We'll also grab the set of outputs spent by this block so we can remove them from the index.
290 var spentTxos []blockchain.SpentTxOut
291 spentTxos, e = chain.FetchSpendJournal(blk)
292 if e != nil {
293 return e
294 }
295 // With the block and stxo set for that block retrieved, we can now update the index itself.
296 e = m.db.Update(
297 func(dbTx database.Tx) (e error) {
298 // Remove all of the index entries associated with the block and update the indexer tip.
299 e = dbIndexDisconnectBlock(
300 dbTx, indexer, blk, spentTxos,
301 )
302 if e != nil {
303 return e
304 }
305 // Update the tip to the previous block.
306 hash = &blk.WireBlock().Header.PrevBlock
307 height--
308 return nil
309 },
310 )
311 if e != nil {
312 return e
313 }
314 if interruptRequested(interrupt) {
315 return errInterruptRequested
316 }
317 }
318 if initialHeight != height {
319 I.F(
320 "removed %d orphaned blocks from %s (heights %d to %d)",
321 initialHeight-height,
322 indexer.Name(),
323 height+1,
324 initialHeight,
325 )
326 }
327 }
328 // Fetch the current tip heights for each index along with tracking the lowest one so the catchup code only needs to
329 // start at the earliest block and is able to skip connecting the block for the indexes that don't need it.
330 bestHeight := chain.BestSnapshot().Height
331 lowestHeight := bestHeight
332 indexerHeights := make([]int32, len(m.enabledIndexes))
333 e = m.db.View(
334 func(dbTx database.Tx) (e error) {
335 for i, indexer := range m.enabledIndexes {
336 idxKey := indexer.Key()
337 _, height, e := dbFetchIndexerTip(dbTx, idxKey)
338 if e != nil {
339 return e
340 }
341 T.F(
342 "current %s tip (height %d, hash %v)",
343 indexer.Name(),
344 height,
345 hash,
346 )
347 indexerHeights[i] = height
348 if height < lowestHeight {
349 lowestHeight = height
350 }
351 }
352 return nil
353 },
354 )
355 if e != nil {
356 return e
357 }
358 // Nothing to index if all of the indexes are caught up.
359 if lowestHeight == bestHeight {
360 return nil
361 }
362 // Create a progress logger for the indexing process below.
363 progressLogger := newBlockProgressLogger(
364 "Indexed",
365 // log.L,
366 )
367 // At this point, one or more indexes are behind the current best chain tip and need to be caught up, so log the
368 // details and loop through each block that needs to be indexed.
369 I.F(
370 "catching up indexes from height %d to %d",
371 lowestHeight,
372 bestHeight,
373 )
374 for height := lowestHeight + 1; height <= bestHeight; height++ {
375 // Load the block for the height since it is required to index it.
376 block, e := chain.BlockByHeight(height)
377 if e != nil {
378 return e
379 }
380 if interruptRequested(interrupt) {
381 return errInterruptRequested
382 }
383 // Connect the block for all indexes that need it.
384 var spentTxos []blockchain.SpentTxOut
385 for i, indexer := range m.enabledIndexes {
386 // Skip indexes that don't need to be updated with this block.
387 if indexerHeights[i] >= height {
388 continue
389 }
390 // When the index requires all of the referenced txouts and they haven't been loaded yet, they need to be
391 // retrieved from the spend journal.
392 if spentTxos == nil && indexNeedsInputs(indexer) {
393 spentTxos, e = chain.FetchSpendJournal(block)
394 if e != nil {
395 return e
396 }
397 }
398 e := m.db.Update(
399 func(dbTx database.Tx) (e error) {
400 return dbIndexConnectBlock(
401 dbTx, indexer, block, spentTxos,
402 )
403 },
404 )
405 if e != nil {
406 return e
407 }
408 indexerHeights[i] = height
409 }
410 // Log indexing progress.
411 progressLogger.LogBlockHeight(block)
412 if interruptRequested(interrupt) {
413 return errInterruptRequested
414 }
415 }
416 I.Ln("indexes caught up to height", bestHeight)
417 return nil
418 }
419 420 // indexNeedsInputs returns whether or not the index needs access to the txouts referenced by the transaction inputs
421 // being indexed.
422 func indexNeedsInputs(index Indexer) bool {
423 if idx, ok := index.(NeedsInputser); ok {
424 return idx.NeedsInputs()
425 }
426 return false
427 }
// // dbFetchTx looks up the passed transaction hash in the transaction index and loads it from the database.
// func dbFetchTx(dbTx database.Tx, hash *chainhash.Hash) (*wire.MsgTx, error) {
// 	// Look up the location of the transaction.
// 	blockRegion, e := dbFetchTxIndexEntry(dbTx, hash)
// 	if e != nil {
// 		return nil, e
// 	}
// 	if blockRegion == nil {
// 		return nil, fmt.Errorf("transaction %v not found", hash)
// 	}
// 	// Load the raw transaction bytes from the database.
// 	txBytes, e := dbTx.FetchBlockRegion(blockRegion)
// 	if e != nil {
// 		return nil, e
// 	}
// 	// Deserialize the transaction.
// 	var msgTx wire.MsgTx
// 	e = msgTx.Deserialize(bytes.NewReader(txBytes))
// 	if e != nil {
// 		return nil, e
// 	}
// 	return &msgTx, nil
// }
454 455 // ConnectBlock must be invoked when a block is extending the main chain. It keeps track of the state of each index it
456 // is managing, performs some sanity checks, and invokes each indexer. This is part of the blockchain.IndexManager
457 // interface.
458 func (m *Manager) ConnectBlock(
459 dbTx database.Tx, block *block.Block,
460 stxos []blockchain.SpentTxOut,
461 ) (e error) {
462 // Call each of the currently active optional indexes with the block being connected so they can update accordingly.
463 for _, index := range m.enabledIndexes {
464 e := dbIndexConnectBlock(dbTx, index, block, stxos)
465 if e != nil {
466 return e
467 }
468 }
469 return nil
470 }
471 472 // DisconnectBlock must be invoked when a block is being disconnected from the end of the main chain. It keeps track of
473 // the state of each index it is managing, performs some sanity checks, and invokes each indexer to remove the index
474 // entries associated with the block. This is part of the blockchain.IndexManager interface.
475 func (m *Manager) DisconnectBlock(
476 dbTx database.Tx, block *block.Block,
477 stxo []blockchain.SpentTxOut,
478 ) (e error) {
479 // Call each of the currently active optional indexes with the block being disconnected so they can update
480 // accordingly.
481 for _, index := range m.enabledIndexes {
482 e := dbIndexDisconnectBlock(dbTx, index, block, stxo)
483 if e != nil {
484 return e
485 }
486 }
487 return nil
488 }
489 490 // NewManager returns a new index manager with the provided indexes enabled. The manager returned satisfies the
491 // blockchain. IndexManager interface and thus cleanly plugs into the normal blockchain processing path.
492 func NewManager(db database.DB, enabledIndexes []Indexer) *Manager {
493 return &Manager{
494 db: db,
495 enabledIndexes: enabledIndexes,
496 }
497 }
498 499 // dropIndex drops the passed index from the database. Since indexes can be massive, it deletes the index in multiple
500 // database transactions in order to keep memory usage to reasonable levels. It also marks the drop in progress so the
501 // drop can be resumed if it is stopped before it is done before the index can be used again.
502 func dropIndex(db database.DB, idxKey []byte, idxName string, interrupt <-chan struct{}) (e error) {
503 // Nothing to do if the index doesn't already exist.
504 var needsDelete bool
505 e = db.View(
506 func(dbTx database.Tx) (e error) {
507 indexesBucket := dbTx.Metadata().Bucket(indexTipsBucketName)
508 if indexesBucket != nil && indexesBucket.Get(idxKey) != nil {
509 needsDelete = true
510 }
511 return nil
512 },
513 )
514 if E.Chk(e) {
515 return
516 }
517 if !needsDelete {
518 W.F("not dropping %s because it does not exist", idxName)
519 return
520 }
521 // Mark that the index is in the process of being dropped so that it can be resumed on the next start if interrupted
522 // before the process is complete.
523 I.F("dropping all %s entries. This might take a while...", idxName)
524 e = db.Update(
525 func(dbTx database.Tx) (e error) {
526 indexesBucket := dbTx.Metadata().Bucket(indexTipsBucketName)
527 return indexesBucket.Put(indexDropKey(idxKey), idxKey)
528 },
529 )
530 if e != nil {
531 return e
532 }
533 // Since the indexes can be so large, attempting to simply delete the bucket in a single database transaction would
534 // result in massive memory usage and likely crash many systems due to ulimits. In order to avoid this, use a cursor
535 // to delete a maximum number of entries out of the bucket at a time. Recurse buckets depth-first to delete any
536 // sub-buckets.
537 const maxDeletions = 2000000
538 var totalDeleted uint64
539 // Recurse through all buckets in the index, cataloging each for later deletion.
540 var subBuckets [][][]byte
541 var subBucketClosure func(database.Tx, []byte, [][]byte) error
542 subBucketClosure = func(
543 dbTx database.Tx,
544 subBucket []byte, tlBucket [][]byte,
545 ) (e error) {
546 // Get full bucket name and append to subBuckets for later deletion.
547 var bucketName [][]byte
548 if (tlBucket == nil) || (len(tlBucket) == 0) {
549 bucketName = append(bucketName, subBucket)
550 } else {
551 bucketName = append(tlBucket, subBucket)
552 }
553 subBuckets = append(subBuckets, bucketName)
554 // Recurse sub-buckets to append to subBuckets slice.
555 bucket := dbTx.Metadata()
556 for _, subBucketName := range bucketName {
557 bucket = bucket.Bucket(subBucketName)
558 }
559 return bucket.ForEachBucket(
560 func(k []byte) (e error) {
561 return subBucketClosure(dbTx, k, bucketName)
562 },
563 )
564 }
565 // Call subBucketClosure with top-level bucket.
566 e = db.View(
567 func(dbTx database.Tx) (e error) {
568 return subBucketClosure(dbTx, idxKey, nil)
569 },
570 )
571 if e != nil {
572 return nil
573 }
574 // Iterate through each sub-bucket in reverse, deepest-first, deleting all keys inside them and then dropping the
575 // buckets themselves.
576 for i := range subBuckets {
577 bucketName := subBuckets[len(subBuckets)-1-i]
578 // Delete maxDeletions key/value pairs at a time.
579 for numDeleted := maxDeletions; numDeleted == maxDeletions; {
580 numDeleted = 0
581 e = db.Update(
582 func(dbTx database.Tx) (e error) {
583 subBucket := dbTx.Metadata()
584 for _, subBucketName := range bucketName {
585 subBucket = subBucket.Bucket(subBucketName)
586 }
587 cursor := subBucket.Cursor()
588 for ok := cursor.First(); ok; ok = cursor.Next() &&
589 numDeleted < maxDeletions {
590 if e := cursor.Delete(); E.Chk(e) {
591 return e
592 }
593 numDeleted++
594 }
595 return nil
596 },
597 )
598 if e != nil {
599 return e
600 }
601 if numDeleted > 0 {
602 totalDeleted += uint64(numDeleted)
603 I.F(
604 "deleted %d keys (%d total) from %s", numDeleted,
605 totalDeleted, idxName,
606 )
607 }
608 }
609 if interruptRequested(interrupt) {
610 return errInterruptRequested
611 }
612 // Drop the bucket itself.
613 e = db.Update(
614 func(dbTx database.Tx) (e error) {
615 bucket := dbTx.Metadata()
616 for j := 0; j < len(bucketName)-1; j++ {
617 bucket = bucket.Bucket(bucketName[j])
618 }
619 return bucket.DeleteBucket(bucketName[len(bucketName)-1])
620 },
621 )
622 if e != nil {
623 }
624 }
625 // Call extra index specific deinitialization for the transaction index.
626 if idxName == txIndexName {
627 if e = dropBlockIDIndex(db); E.Chk(e) {
628 return e
629 }
630 }
631 // Remove the index tip, index bucket, and in-progress drop flag now that all index entries have been removed.
632 e = db.Update(
633 func(dbTx database.Tx) (e error) {
634 meta := dbTx.Metadata()
635 indexesBucket := meta.Bucket(indexTipsBucketName)
636 if e := indexesBucket.Delete(idxKey); E.Chk(e) {
637 return e
638 }
639 return indexesBucket.Delete(indexDropKey(idxKey))
640 },
641 )
642 if e != nil {
643 return e
644 }
645 I.Ln("dropped", idxName)
646 return nil
647 }
648