1 package chainclient
2 3 import (
4 "container/list"
5 "encoding/hex"
6 "errors"
7 "fmt"
8 "github.com/p9c/p9/pkg/btcaddr"
9 "github.com/p9c/p9/pkg/chaincfg"
10 "sync"
11 "sync/atomic"
12 "time"
13 14 "github.com/p9c/p9/pkg/qu"
15 16 "github.com/p9c/p9/pkg/btcjson"
17 "github.com/p9c/p9/pkg/chainhash"
18 "github.com/p9c/p9/pkg/txscript"
19 "github.com/p9c/p9/pkg/util"
20 am "github.com/p9c/p9/pkg/waddrmgr"
21 "github.com/p9c/p9/pkg/wire"
22 tm "github.com/p9c/p9/pkg/wtxmgr"
23 )
// ErrBitcoindClientShuttingDown is the sentinel error returned by client methods when the quit signal is observed
// before a request could be handed over to the client's handlers.
var ErrBitcoindClientShuttingDown = errors.New("client is shutting down")
// BitcoindClient represents a persistent client connection to a bitcoind server for information regarding the current
// best block chain.
type BitcoindClient struct {
	// started and stopped are flipped with an atomic compare-and-swap by Start and Stop respectively, so that each of
	// those methods runs its body at most once.
	started int32 // To be used atomically.
	stopped int32 // To be used atomically.
	// birthday is the earliest time for which we should begin scanning the chain. It is written by SetBirthday without
	// any locking.
	birthday time.Time
	// chainParams are the parameters of the current chain this client is active under.
	chainParams *chaincfg.Params
	// id is the unique ID of this client assigned by the backing bitcoind connection. Stop passes it to RemoveClient.
	id uint64
	// chainConn is the backing client to our rescan client that contains the RPC and ZMQ connections to a bitcoind
	// node.
	chainConn *BitcoindConn
	// bestBlockMtx guards bestBlock.
	bestBlockMtx sync.RWMutex
	// bestBlock keeps track of the tip of the current best chain.
	bestBlock am.BlockStamp
	// notifyBlocks signals whether the client is sending block notifications to the caller. It is accessed atomically;
	// NotifyBlocks stores 1 and shouldNotifyBlocks compares the loaded value against 1.
	notifyBlocks uint32
	// rescanUpdate is a channel that is sent the items we should match transactions against while processing a chain
	// rescan to determine if they are relevant to the client. An empty struct{} value resets the filters and a
	// chainhash.Hash value starts a rescan from that block.
	rescanUpdate chan interface{}
	// watchMtx guards the watched* sets below as well as mempool and expiredMempool.
	watchMtx sync.RWMutex
	// watchedAddresses, watchedOutPoints, and watchedTxs are the set of items we should match transactions against
	// while processing a chain rescan to determine if they are relevant to the client. Addresses are keyed by their
	// String() form.
	watchedAddresses map[string]struct{}
	watchedOutPoints map[wire.OutPoint]struct{}
	watchedTxs map[chainhash.Hash]struct{}
	// mempool keeps track of all relevant transactions that have yet to be confirmed. This is used to shortcut the
	// filtering process of a transaction when a new confirmed transaction notification is received.
	//
	// NOTE: This requires the watchMtx to be held.
	mempool map[chainhash.Hash]struct{}
	// expiredMempool keeps track of a set of confirmed transactions along with the height at which they were included
	// in a block. These transactions will then be removed from the mempool after a period of 288 blocks. This is done
	// to ensure the transactions are safe from a reorg in the chain.
	//
	// NOTE: This requires the watchMtx to be held.
	expiredMempool map[int32]map[chainhash.Hash]struct{}
	// notificationQueue is a concurrent unbounded queue that handles dispatching notifications to the subscriber of
	// this client.
	//
	// TODO: Rather than leaving this as an unbounded queue for all types of notifications, try dropping ones where a
	// later enqueued notification can fully invalidate one waiting to be processed. For example, BlockConnected
	// notifications for greater block heights can remove the need to process earlier notifications still waiting to be
	// processed.
	notificationQueue *ConcurrentQueue
	// zmqTxNtfns is a channel through which ZMQ transaction events will be retrieved from the backing bitcoind
	// connection.
	zmqTxNtfns chan *wire.MsgTx
	// zmqBlockNtfns is a channel through which ZMQ block events will be retrieved from the backing bitcoind connection.
	zmqBlockNtfns chan *wire.Block
	// quit is signalled by Stop (via Q) and is observed through Wait by the handler goroutines and by every blocking
	// send made on behalf of a caller.
	quit qu.C
	// wg tracks the rescanHandler and ntfnHandler goroutines launched by Start; WaitForShutdown waits on it.
	wg sync.WaitGroup
}
// A compile-time check to ensure that *BitcoindClient satisfies the chainclient.Interface interface. The build fails on
// this line if a required method is missing or has the wrong signature.
var _ Interface = (*BitcoindClient)(nil)
89 90 // BackEnd returns the name of the driver.
91 func (c *BitcoindClient) BackEnd() string {
92 return "bitcoind"
93 }
94 95 // GetBestBlock returns the highest block known to bitcoind.
96 func (c *BitcoindClient) GetBestBlock() (*chainhash.Hash, int32, error) {
97 bcInfo, e := c.chainConn.client.GetBlockChainInfo()
98 if e != nil {
99 return nil, 0, e
100 }
101 hash, e := chainhash.NewHashFromStr(bcInfo.BestBlockHash)
102 if e != nil {
103 return nil, 0, e
104 }
105 return hash, bcInfo.Blocks, nil
106 }
107 108 // GetBlockHeight returns the height for the hash, if known, or returns an error.
109 func (c *BitcoindClient) GetBlockHeight(hash *chainhash.Hash) (int32, error) {
110 header, e := c.chainConn.client.GetBlockHeaderVerbose(hash)
111 if e != nil {
112 return 0, e
113 }
114 return header.Height, nil
115 }
116 117 // GetBlock returns a block from the hash.
118 func (c *BitcoindClient) GetBlock(hash *chainhash.Hash) (*wire.Block, error) {
119 return c.chainConn.client.GetBlock(hash)
120 }
121 122 // GetBlockVerbose returns a verbose block from the hash.
123 func (c *BitcoindClient) GetBlockVerbose(
124 hash *chainhash.Hash,
125 ) (*btcjson.GetBlockVerboseResult, error) {
126 return c.chainConn.client.GetBlockVerbose(hash)
127 }
128 129 // GetBlockHash returns a block hash from the height.
130 func (c *BitcoindClient) GetBlockHash(height int64) (*chainhash.Hash, error) {
131 return c.chainConn.client.GetBlockHash(height)
132 }
133 134 // GetBlockHeader returns a block header from the hash.
135 func (c *BitcoindClient) GetBlockHeader(
136 hash *chainhash.Hash,
137 ) (*wire.BlockHeader, error) {
138 return c.chainConn.client.GetBlockHeader(hash)
139 }
140 141 // GetBlockHeaderVerbose returns a block header from the hash.
142 func (c *BitcoindClient) GetBlockHeaderVerbose(
143 hash *chainhash.Hash,
144 ) (*btcjson.GetBlockHeaderVerboseResult, error) {
145 return c.chainConn.client.GetBlockHeaderVerbose(hash)
146 }
147 148 // GetRawTransactionVerbose returns a transaction from the tx hash.
149 func (c *BitcoindClient) GetRawTransactionVerbose(
150 hash *chainhash.Hash,
151 ) (*btcjson.TxRawResult, error) {
152 return c.chainConn.client.GetRawTransactionVerbose(hash)
153 }
154 155 // GetTxOut returns a txout from the outpoint info provided.
156 func (c *BitcoindClient) GetTxOut(
157 txHash *chainhash.Hash, index uint32,
158 mempool bool,
159 ) (*btcjson.GetTxOutResult, error) {
160 return c.chainConn.client.GetTxOut(txHash, index, mempool)
161 }
162 163 // SendRawTransaction sends a raw transaction via bitcoind.
164 func (c *BitcoindClient) SendRawTransaction(
165 tx *wire.MsgTx,
166 allowHighFees bool,
167 ) (*chainhash.Hash, error) {
168 return c.chainConn.client.SendRawTransaction(tx, allowHighFees)
169 }
170 171 // Notifications returns a channel to retrieve notifications from.
172 //
173 // NOTE: This is part of the chainclient.Interface interface.
174 func (c *BitcoindClient) Notifications() <-chan interface{} {
175 return c.notificationQueue.ChanOut()
176 }
177 178 // NotifyReceived allows the chain backend to notify the caller whenever a transaction pays to any of the given
179 // addresses.
180 //
181 // NOTE: This is part of the chainclient.Interface interface.
182 func (c *BitcoindClient) NotifyReceived(addrs []btcaddr.Address) (e error) {
183 if e = c.NotifyBlocks(); E.Chk(e) {
184 }
185 select {
186 case c.rescanUpdate <- addrs:
187 case <-c.quit.Wait():
188 return ErrBitcoindClientShuttingDown
189 }
190 return nil
191 }
192 193 // NotifySpent allows the chain backend to notify the caller whenever a transaction spends any of the given outpoints.
194 func (c *BitcoindClient) NotifySpent(outPoints []*wire.OutPoint) (e error) {
195 if e = c.NotifyBlocks(); E.Chk(e) {
196 }
197 select {
198 case c.rescanUpdate <- outPoints:
199 case <-c.quit.Wait():
200 return ErrBitcoindClientShuttingDown
201 }
202 return nil
203 }
204 205 // NotifyTx allows the chain backend to notify the caller whenever any of the given transactions confirm within the
206 // chain.
207 func (c *BitcoindClient) NotifyTx(txids []chainhash.Hash) (e error) {
208 if e = c.NotifyBlocks(); E.Chk(e) {
209 }
210 select {
211 case c.rescanUpdate <- txids:
212 case <-c.quit.Wait():
213 return ErrBitcoindClientShuttingDown
214 }
215 return nil
216 }
217 218 // NotifyBlocks allows the chain backend to notify the caller whenever a block is connected or disconnected.
219 //
220 // NOTE: This is part of the chainclient.Interface interface.
221 func (c *BitcoindClient) NotifyBlocks() (e error) {
222 atomic.StoreUint32(&c.notifyBlocks, 1)
223 return nil
224 }
225 226 // shouldNotifyBlocks determines whether the client should send block notifications to the caller.
227 func (c *BitcoindClient) shouldNotifyBlocks() bool {
228 return atomic.LoadUint32(&c.notifyBlocks) == 1
229 }
230 231 // LoadTxFilter uses the given filters to what we should match transactions against to determine if they are relevant to
232 // the client. The reset argument is used to reset the current filters.
233 //
234 // The current filters supported are of the following types:
235 // []util.Address
236 // []wire.OutPoint
237 // []*wire.OutPoint
238 // map[wire.OutPoint]util.Address
239 // []chainhash.Hash
240 // []*chainhash.Hash
241 func (c *BitcoindClient) LoadTxFilter(reset bool, filters ...interface{}) (e error) {
242 if reset {
243 select {
244 case c.rescanUpdate <- struct{}{}:
245 case <-c.quit.Wait():
246 return ErrBitcoindClientShuttingDown
247 }
248 }
249 updateFilter := func(filter interface{}) (e error) {
250 select {
251 case c.rescanUpdate <- filter:
252 case <-c.quit.Wait():
253 return ErrBitcoindClientShuttingDown
254 }
255 return nil
256 }
257 // In order to make this operation atomic, we'll iterate through the filters twice: the first to ensure there aren't
258 // any unsupported filter types, and the second to actually update our filters.
259 for _, filter := range filters {
260 switch filter := filter.(type) {
261 case []btcaddr.Address, []wire.OutPoint, []*wire.OutPoint,
262 map[wire.OutPoint]btcaddr.Address, []chainhash.Hash,
263 []*chainhash.Hash:
264 // Proceed to check the next filter type.
265 default:
266 return fmt.Errorf("unsupported filter type %Ter", filter)
267 }
268 }
269 for _, filter := range filters {
270 if e := updateFilter(filter); E.Chk(e) {
271 return e
272 }
273 }
274 return nil
275 }
276 277 // RescanBlocks rescans any blocks passed, returning only the blocks that matched as []json.BlockDetails.
278 func (c *BitcoindClient) RescanBlocks(
279 blockHashes []chainhash.Hash,
280 ) ([]btcjson.RescannedBlock, error) {
281 rescannedBlocks := make([]btcjson.RescannedBlock, 0, len(blockHashes))
282 for _, hash := range blockHashes {
283 header, e := c.GetBlockHeaderVerbose(&hash)
284 if e != nil {
285 E.F(
286 "unable to get header %s from bitcoind: %s",
287 hash, e,
288 )
289 continue
290 }
291 block, e := c.GetBlock(&hash)
292 if e != nil {
293 E.F(
294 "unable to get block %s from bitcoind: %s",
295 hash, e,
296 )
297 continue
298 }
299 relevantTxs, e := c.filterBlock(block, header.Height, false)
300 if e != nil {
301 }
302 if len(relevantTxs) > 0 {
303 rescannedBlock := btcjson.RescannedBlock{
304 Hash: hash.String(),
305 }
306 for _, tx := range relevantTxs {
307 rescannedBlock.Transactions = append(
308 rescannedBlock.Transactions,
309 hex.EncodeToString(tx.SerializedTx),
310 )
311 }
312 rescannedBlocks = append(rescannedBlocks, rescannedBlock)
313 }
314 }
315 return rescannedBlocks, nil
316 }
317 318 // Rescan rescans from the block with the given hash until the current block, after adding the passed addresses and
319 // outpoints to the client's watch list.
320 func (c *BitcoindClient) Rescan(
321 blockHash *chainhash.Hash,
322 addresses []btcaddr.Address, outPoints map[wire.OutPoint]btcaddr.Address,
323 ) (e error) {
324 // A block hash is required to use as the starting point of the rescan.
325 if blockHash == nil {
326 return errors.New("rescan requires a starting block hash")
327 }
328 // We'll then update our filters with the given outpoints and addresses.
329 select {
330 case c.rescanUpdate <- addresses:
331 case <-c.quit.Wait():
332 return ErrBitcoindClientShuttingDown
333 }
334 select {
335 case c.rescanUpdate <- outPoints:
336 case <-c.quit.Wait():
337 return ErrBitcoindClientShuttingDown
338 }
339 // Once the filters have been updated, we can begin the rescan.
340 select {
341 case c.rescanUpdate <- *blockHash:
342 case <-c.quit.Wait():
343 return ErrBitcoindClientShuttingDown
344 }
345 return nil
346 }
347 348 // Start initializes the bitcoind rescan client using the backing bitcoind connection and starts all goroutines
349 // necessary in order to process rescans and ZMQ notifications.
350 //
351 // NOTE: This is part of the chainclient.Interface interface.
352 func (c *BitcoindClient) Start() (e error) {
353 if !atomic.CompareAndSwapInt32(&c.started, 0, 1) {
354 return nil
355 }
356 // Start the notification queue and immediately dispatch a ClientConnected notification to the caller. This is
357 // needed as some of the callers will require this notification before proceeding.
358 c.notificationQueue.Start()
359 c.notificationQueue.ChanIn() <- ClientConnected{}
360 // Retrieve the best block of the chain.
361 bestHash, bestHeight, e := c.GetBestBlock()
362 if e != nil {
363 return fmt.Errorf("unable to retrieve best block: %v", e)
364 }
365 bestHeader, e := c.GetBlockHeaderVerbose(bestHash)
366 if e != nil {
367 return fmt.Errorf(
368 "unable to retrieve header for best block: "+
369 "%v", e,
370 )
371 }
372 c.bestBlockMtx.Lock()
373 c.bestBlock = am.BlockStamp{
374 Hash: *bestHash,
375 Height: bestHeight,
376 Timestamp: time.Unix(bestHeader.Time, 0),
377 }
378 c.bestBlockMtx.Unlock()
379 // Once the client has started successfully, we'll include it in the set of rescan clients of the backing bitcoind
380 // connection in order to received ZMQ event notifications.
381 c.chainConn.AddClient(c)
382 c.wg.Add(2)
383 go c.rescanHandler()
384 go c.ntfnHandler()
385 return nil
386 }
387 388 // Stop stops the bitcoind rescan client from processing rescans and ZMQ notifications.
389 //
390 // NOTE: This is part of the chainclient.Interface interface.
391 func (c *BitcoindClient) Stop() {
392 if !atomic.CompareAndSwapInt32(&c.stopped, 0, 1) {
393 return
394 }
395 c.quit.Q()
396 // Remove this client's reference from the bitcoind connection to prevent sending notifications to it after it's
397 // been stopped.
398 c.chainConn.RemoveClient(c.id)
399 c.notificationQueue.Stop()
400 }
401 402 // WaitForShutdown blocks until the client has finished disconnecting and all handlers have exited.
403 //
404 // NOTE: This is part of the chainclient.Interface interface.
405 func (c *BitcoindClient) WaitForShutdown() {
406 c.wg.Wait()
407 }
408 409 // rescanHandler handles the logic needed for the caller to trigger a chain rescan.
410 //
411 // NOTE: This must be called as a goroutine.
412 func (c *BitcoindClient) rescanHandler() {
413 defer c.wg.Done()
414 for {
415 select {
416 case update := <-c.rescanUpdate:
417 switch update := update.(type) {
418 // We're clearing the filters.
419 case struct{}:
420 c.watchMtx.Lock()
421 c.watchedOutPoints = make(map[wire.OutPoint]struct{})
422 c.watchedAddresses = make(map[string]struct{})
423 c.watchedTxs = make(map[chainhash.Hash]struct{})
424 c.watchMtx.Unlock()
425 // We're adding the addresses to our filter.
426 case []btcaddr.Address:
427 c.watchMtx.Lock()
428 for _, addr := range update {
429 c.watchedAddresses[addr.String()] = struct{}{}
430 }
431 c.watchMtx.Unlock()
432 // We're adding the outpoints to our filter.
433 case []wire.OutPoint:
434 c.watchMtx.Lock()
435 for _, op := range update {
436 c.watchedOutPoints[op] = struct{}{}
437 }
438 c.watchMtx.Unlock()
439 case []*wire.OutPoint:
440 c.watchMtx.Lock()
441 for _, op := range update {
442 c.watchedOutPoints[*op] = struct{}{}
443 }
444 c.watchMtx.Unlock()
445 // We're adding the outpoints that map to the scripts that we should scan for to our filter.
446 case map[wire.OutPoint]btcaddr.Address:
447 c.watchMtx.Lock()
448 for op := range update {
449 c.watchedOutPoints[op] = struct{}{}
450 }
451 c.watchMtx.Unlock()
452 // We're adding the transactions to our filter.
453 case []chainhash.Hash:
454 c.watchMtx.Lock()
455 for _, txid := range update {
456 c.watchedTxs[txid] = struct{}{}
457 }
458 c.watchMtx.Unlock()
459 case []*chainhash.Hash:
460 c.watchMtx.Lock()
461 for _, txid := range update {
462 c.watchedTxs[*txid] = struct{}{}
463 }
464 c.watchMtx.Unlock()
465 // We're starting a rescan from the hash.
466 case chainhash.Hash:
467 if e := c.rescan(update); E.Chk(e) {
468 E.Ln(
469 "unable to complete chain rescan:", e,
470 )
471 }
472 default:
473 W.F(
474 "received unexpected filter type %Ter", update,
475 )
476 }
477 case <-c.quit.Wait():
478 return
479 }
480 }
481 }
482 483 // ntfnHandler handles the logic to retrieve ZMQ notifications from the backing bitcoind connection.
484 //
485 // NOTE: This must be called as a goroutine.
486 func (c *BitcoindClient) ntfnHandler() {
487 defer c.wg.Done()
488 for {
489 select {
490 case tx := <-c.zmqTxNtfns:
491 var e error
492 if _, _, e = c.filterTx(tx, nil, true); E.Chk(e) {
493 E.F(
494 "unable to filter transaction %v: %v %s",
495 tx.TxHash(), e,
496 )
497 }
498 case newBlock := <-c.zmqBlockNtfns:
499 // If the new block's previous hash matches the best hash known to us, then the new block is the next
500 // successor, so we'll update our best block to reflect this and determine if this new block matches any of
501 // our existing filters.
502 c.bestBlockMtx.Lock()
503 bestBlock := c.bestBlock
504 c.bestBlockMtx.Unlock()
505 if newBlock.Header.PrevBlock == bestBlock.Hash {
506 newBlockHeight := bestBlock.Height + 1
507 _, e := c.filterBlock(
508 newBlock, newBlockHeight, true,
509 )
510 if e != nil {
511 E.F(
512 "unable to filter block %v: %v",
513 newBlock.BlockHash(), e,
514 )
515 continue
516 }
517 // With the block successfully filtered, we'll make it our new best block.
518 bestBlock.Hash = newBlock.BlockHash()
519 bestBlock.Height = newBlockHeight
520 bestBlock.Timestamp = newBlock.Header.Timestamp
521 c.bestBlockMtx.Lock()
522 c.bestBlock = bestBlock
523 c.bestBlockMtx.Unlock()
524 continue
525 }
526 // Otherwise, we've encountered a reorg.
527 if e := c.reorg(bestBlock, newBlock); E.Chk(e) {
528 E.F(
529 "unable to process chain reorg:", e,
530 )
531 }
532 case <-c.quit.Wait():
533 return
534 }
535 }
536 }
// SetBirthday sets the birthday of the bitcoind rescan client, i.e. the earliest time from which the chain should be
// scanned.
//
// NOTE: This should be done before the client has been started in order for it to properly carry out its duties. The
// field is assigned without any locking.
func (c *BitcoindClient) SetBirthday(t time.Time) {
	c.birthday = t
}
544 545 // BlockStamp returns the latest block notified by the client, or an error if the client has been shut down.
546 func (c *BitcoindClient) BlockStamp() (*am.BlockStamp, error) {
547 c.bestBlockMtx.RLock()
548 bestBlock := c.bestBlock
549 c.bestBlockMtx.RUnlock()
550 return &bestBlock, nil
551 }
552 553 // onBlockConnected is a callback that's executed whenever a new block has been detected. This will queue a
554 // BlockConnected notification to the caller.
555 func (c *BitcoindClient) onBlockConnected(
556 hash *chainhash.Hash, height int32,
557 timestamp time.Time,
558 ) {
559 if c.shouldNotifyBlocks() {
560 select {
561 case c.notificationQueue.ChanIn() <- BlockConnected{
562 Block: tm.Block{
563 Hash: *hash,
564 Height: height,
565 },
566 Time: timestamp,
567 }:
568 case <-c.quit.Wait():
569 }
570 }
571 }
572 573 // onFilteredBlockConnected is an alternative callback that's executed whenever a new block has been detected. It serves
574 // the same purpose as onBlockConnected, but it also includes a list of the relevant transactions found within the block
575 // being connected. This will queue a FilteredBlockConnected notification to the caller.
576 func (c *BitcoindClient) onFilteredBlockConnected(
577 height int32,
578 header *wire.BlockHeader, relevantTxs []*tm.TxRecord,
579 ) {
580 if c.shouldNotifyBlocks() {
581 select {
582 case c.notificationQueue.ChanIn() <- FilteredBlockConnected{
583 Block: &tm.BlockMeta{
584 Block: tm.Block{
585 Hash: header.BlockHash(),
586 Height: height,
587 },
588 Time: header.Timestamp,
589 },
590 RelevantTxs: relevantTxs,
591 }:
592 case <-c.quit.Wait():
593 }
594 }
595 }
596 597 // onBlockDisconnected is a callback that's executed whenever a block has been disconnected. This will queue a
598 // BlockDisconnected notification to the caller with the details of the block being disconnected.
599 func (c *BitcoindClient) onBlockDisconnected(
600 hash *chainhash.Hash, height int32,
601 timestamp time.Time,
602 ) {
603 if c.shouldNotifyBlocks() {
604 select {
605 case c.notificationQueue.ChanIn() <- BlockDisconnected{
606 Block: tm.Block{
607 Hash: *hash,
608 Height: height,
609 },
610 Time: timestamp,
611 }:
612 case <-c.quit.Wait():
613 }
614 }
615 }
616 617 // onRelevantTx is a callback that's executed whenever a transaction is relevant to the caller. This means that the
618 // transaction matched a specific item in the client's different filters. This will queue a RelevantTx notification to
619 // the caller.
620 func (c *BitcoindClient) onRelevantTx(
621 tx *tm.TxRecord,
622 blockDetails *btcjson.BlockDetails,
623 ) {
624 block, e := parseBlock(blockDetails)
625 if e != nil {
626 E.Ln(
627 "unable to send onRelevantTx notification, failed parse block:",
628 e,
629 )
630 return
631 }
632 select {
633 case c.notificationQueue.ChanIn() <- RelevantTx{
634 TxRecord: tx,
635 Block: block,
636 }:
637 case <-c.quit.Wait():
638 }
639 }
640 641 // onRescanProgress is a callback that's executed whenever a rescan is in progress. This will queue a RescanProgress
642 // notification to the caller with the current rescan progress details.
643 func (c *BitcoindClient) onRescanProgress(
644 hash *chainhash.Hash, height int32,
645 timestamp time.Time,
646 ) {
647 select {
648 case c.notificationQueue.ChanIn() <- &RescanProgress{
649 Hash: hash,
650 Height: height,
651 Time: timestamp,
652 }:
653 case <-c.quit.Wait():
654 }
655 }
656 657 // onRescanFinished is a callback that's executed whenever a rescan has finished. This will queue a RescanFinished
658 // notification to the caller with the details of the last block in the range of the rescan.
659 func (c *BitcoindClient) onRescanFinished(
660 hash *chainhash.Hash, height int32,
661 timestamp time.Time,
662 ) {
663 I.F(
664 "rescan finished at %d (%s)",
665 height, hash,
666 )
667 select {
668 case c.notificationQueue.ChanIn() <- &RescanFinished{
669 Hash: hash,
670 Height: height,
671 Time: timestamp,
672 }:
673 case <-c.quit.Wait():
674 }
675 }
676 677 // reorg processes a reorganization during chain synchronization. This is separate from a rescan's handling of a reorg.
678 // This will rewind back until it finds a common ancestor and notify all the new blocks since then.
679 func (c *BitcoindClient) reorg(
680 currentBlock am.BlockStamp,
681 reorgBlock *wire.Block,
682 ) (e error) {
683 D.Ln("possible reorg at block", reorgBlock.BlockHash())
684 // Retrieve the best known height based on the block which caused the reorg. This way, we can preserve the chain of
685 // blocks we need to retrieve.
686 bestHash := reorgBlock.BlockHash()
687 var bestHeight int32
688 bestHeight, e = c.GetBlockHeight(&bestHash)
689 if e != nil {
690 return e
691 }
692 if bestHeight < currentBlock.Height {
693 D.Ln("detected multiple reorgs")
694 return nil
695 }
696 // We'll now keep track of all the blocks known to the *chain*, starting from the best block known to us until the
697 // best block in the chain. This will let us fast-forward despite any future reorgs.
698 blocksToNotify := list.New()
699 blocksToNotify.PushFront(reorgBlock)
700 previousBlock := reorgBlock.Header.PrevBlock
701 for i := bestHeight - 1; i >= currentBlock.Height; i-- {
702 var block *wire.Block
703 block, e = c.GetBlock(&previousBlock)
704 if e != nil {
705 return e
706 }
707 blocksToNotify.PushFront(block)
708 previousBlock = block.Header.PrevBlock
709 }
710 // Rewind back to the last common ancestor block using the previous block hash from each header to avoid any race
711 // conditions. If we encounter more reorgs, they'll be queued and we'll repeat the cycle.
712 //
713 // We'll start by retrieving the header to the best block known to us.
714 currentHeader, e := c.GetBlockHeader(¤tBlock.Hash)
715 if e != nil {
716 return e
717 }
718 // Then, we'll walk backwards in the chain until we find our common ancestor.
719 for previousBlock != currentHeader.PrevBlock {
720 // Since the previous hashes don't match, the current block has been reorged out of the chain, so we should send
721 // a BlockDisconnected notification for it.
722 D.F(
723 "disconnecting block %d (%v) %s",
724 currentBlock.Height,
725 currentBlock.Hash,
726 )
727 c.onBlockDisconnected(
728 ¤tBlock.Hash, currentBlock.Height,
729 currentBlock.Timestamp,
730 )
731 // Our current block should now reflect the previous one to continue the common ancestor search.
732 currentHeader, e = c.GetBlockHeader(¤tHeader.PrevBlock)
733 if e != nil {
734 return e
735 }
736 currentBlock.Height--
737 currentBlock.Hash = currentHeader.PrevBlock
738 currentBlock.Timestamp = currentHeader.Timestamp
739 // Store the correct block in our list in order to notify it once we've found our common ancestor.
740 block, e := c.GetBlock(&previousBlock)
741 if e != nil {
742 return e
743 }
744 blocksToNotify.PushFront(block)
745 previousBlock = block.Header.PrevBlock
746 }
747 // Disconnect the last block from the old chain. Since the previous block remains the same between the old and new
748 // chains, the tip will now be the last common ancestor.
749 D.F(
750 "disconnecting block %d (%v) %s",
751 currentBlock.Height, currentBlock.Hash,
752 )
753 c.onBlockDisconnected(
754 ¤tBlock.Hash, currentBlock.Height, currentHeader.Timestamp,
755 )
756 currentBlock.Height--
757 // Now we fast-forward to the new block, notifying along the way.
758 for blocksToNotify.Front() != nil {
759 nextBlock := blocksToNotify.Front().Value.(*wire.Block)
760 nextHeight := currentBlock.Height + 1
761 nextHash := nextBlock.BlockHash()
762 nextHeader, e := c.GetBlockHeader(&nextHash)
763 if e != nil {
764 return e
765 }
766 _, e = c.filterBlock(nextBlock, nextHeight, true)
767 if e != nil {
768 return e
769 }
770 currentBlock.Height = nextHeight
771 currentBlock.Hash = nextHash
772 currentBlock.Timestamp = nextHeader.Timestamp
773 blocksToNotify.Remove(blocksToNotify.Front())
774 }
775 c.bestBlockMtx.Lock()
776 c.bestBlock = currentBlock
777 c.bestBlockMtx.Unlock()
778 return nil
779 }
780 781 // FilterBlocks scans the blocks contained in the FilterBlocksRequest for any addresses of interest. Each block will be
782 // fetched and filtered sequentially, returning a FilterBlocksResponse for the first block containing a matching
783 // address. If no matches are found in the range of blocks requested, the returned response will be nil.
784 //
785 // NOTE: This is part of the chainclient.Interface interface.
786 func (c *BitcoindClient) FilterBlocks(
787 req *FilterBlocksRequest,
788 ) (*FilterBlocksResponse, error) {
789 blockFilterer := NewBlockFilterer(c.chainParams, req)
790 // Iterate over the requested blocks, fetching each from the rpc client. Each block will scanned using the reverse
791 // addresses indexes generated above, breaking out early if any addresses are found.
792 for i, block := range req.Blocks {
793 // TODO(conner): add prefetching, since we already know we'll be
794 // fetching *every* block
795 rawBlock, e := c.GetBlock(&block.Hash)
796 if e != nil {
797 return nil, e
798 }
799 if !blockFilterer.FilterBlock(rawBlock) {
800 continue
801 }
802 // If any external or internal addresses were detected in this block, we return them to the caller so that the
803 // rescan windows can widened with subsequent addresses. The `BatchIndex` is returned so that the caller can
804 // compute the *next* block from which to begin again.
805 resp := &FilterBlocksResponse{
806 BatchIndex: uint32(i),
807 BlockMeta: block,
808 FoundExternalAddrs: blockFilterer.FoundExternal,
809 FoundInternalAddrs: blockFilterer.FoundInternal,
810 FoundOutPoints: blockFilterer.FoundOutPoints,
811 RelevantTxns: blockFilterer.RelevantTxns,
812 }
813 return resp, nil
814 }
815 // No addresses were found for this range.
816 return nil, nil
817 }
// rescan performs a rescan of the chain using a bitcoind backend, from the specified hash to the best known hash, while
// watching out for reorgs that happen during the rescan. It uses the addresses and outputs being tracked by the client
// in the watch list. This is called only within a queue processing loop.
//
// Every block after start is passed to filterBlock with notifications enabled. A RescanProgress notification is queued
// when the scan first crosses the client's birthday and at every height divisible by 10000, and a RescanFinished
// notification is queued when the function returns after the start header has been resolved, whether or not an error
// occurred. The first RPC, hash-parsing or filtering error aborts the rescan and is returned.
func (c *BitcoindClient) rescan(start chainhash.Hash) (e error) {
	I.Ln("starting rescan from block", start)
	// We start by getting the best already processed block. We only use the height, as the hash can change during a
	// reorganization, which we catch by testing connectivity from known blocks to the previous block.
	bestHash, bestHeight, e := c.GetBestBlock()
	if e != nil {
		return e
	}
	bestHeader, e := c.GetBlockHeaderVerbose(bestHash)
	if e != nil {
		return e
	}
	bestBlock := am.BlockStamp{
		Hash:      *bestHash,
		Height:    bestHeight,
		Timestamp: time.Unix(bestHeader.Time, 0),
	}
	// Create a list of headers sorted in forward order. We'll use this in the event that we need to backtrack due to a
	// chain reorg.
	headers := list.New()
	previousHeader, e := c.GetBlockHeaderVerbose(&start)
	if e != nil {
		return e
	}
	previousHash, e := chainhash.NewHashFromStr(previousHeader.Hash)
	if e != nil {
		return e
	}
	headers.PushBack(previousHeader)
	// Queue a RescanFinished notification to the caller once done.
	//
	// NOTE(review): the arguments of a deferred call are evaluated when the defer statement executes, so the
	// notification carries the hash, height and time of the starting block as they are at this point, not those of
	// the last block processed. Confirm whether that is intended.
	defer c.onRescanFinished(
		previousHash, previousHeader.Height,
		time.Unix(previousHeader.Time, 0),
	)
	// Cycle through all of the blocks known to bitcoind, being mindful of reorgs.
	for i := previousHeader.Height + 1; i <= bestBlock.Height; i++ {
		var hash *chainhash.Hash
		hash, e = c.GetBlockHash(int64(i))
		if e != nil {
			return e
		}
		// If the previous header is before the wallet birthday, fetch the current header and construct a dummy block,
		// rather than fetching the whole block itself. This speeds things up as we no longer have to fetch the whole
		// block when we know it won't match any of our filters.
		var block *wire.Block
		afterBirthday := previousHeader.Time >= c.birthday.Unix()
		if !afterBirthday {
			var header *wire.BlockHeader
			header, e = c.GetBlockHeader(hash)
			if e != nil {
				return e
			}
			// Header-only placeholder; it carries no transactions.
			block = &wire.Block{
				Header: *header,
			}
			// This block is the first one past the birthday, so report progress and fall through to fetch the full
			// block below.
			afterBirthday = c.birthday.Before(header.Timestamp)
			if afterBirthday {
				c.onRescanProgress(
					previousHash, i,
					block.Header.Timestamp,
				)
			}
		}
		if afterBirthday {
			block, e = c.GetBlock(hash)
			if e != nil {
				return e
			}
		}
		for block.Header.PrevBlock.String() != previousHeader.Hash {
			// If we're in this for loop, it looks like we've been reorganized. We now walk backwards to the common
			// ancestor between the best chain and the known chain.
			//
			// First, we signal a disconnected block to rewind the rescan state.
			c.onBlockDisconnected(
				previousHash, previousHeader.Height,
				time.Unix(previousHeader.Time, 0),
			)
			// Get the previous block of the best chain.
			//
			// NOTE(review): i is not modified inside this loop, so every pass fetches the block at height i-1 again
			// and, once the loop exits, that block is recorded and filtered below under height i. Confirm this is
			// the intended behaviour for reorgs, in particular ones deeper than a single block.
			hash, e = c.GetBlockHash(int64(i - 1))
			if e != nil {
				return e
			}
			block, e = c.GetBlock(hash)
			if e != nil {
				return e
			}
			// Then, we'll get the header of this previous block.
			if headers.Back() != nil {
				// If it's already in the headers list, we can just get it from there and remove the current hash.
				headers.Remove(headers.Back())
				if headers.Back() != nil {
					previousHeader = headers.Back().
						Value.(*btcjson.GetBlockHeaderVerboseResult)
					previousHash, e = chainhash.NewHashFromStr(
						previousHeader.Hash,
					)
					if e != nil {
						return e
					}
				}
			} else {
				// Otherwise, we get it from bitcoind by following the previous header's parent hash.
				previousHash, e = chainhash.NewHashFromStr(
					previousHeader.PreviousHash,
				)
				if e != nil {
					return e
				}
				previousHeader, e = c.GetBlockHeaderVerbose(
					previousHash,
				)
				if e != nil {
					return e
				}
			}
		}
		// Now that we've ensured we haven't come across a reorg, we'll add the current block header to our list of
		// headers. Only the fields this function reads back later are populated.
		blockHash := block.BlockHash()
		previousHash = &blockHash
		previousHeader = &btcjson.GetBlockHeaderVerboseResult{
			Hash:         blockHash.String(),
			Height:       i,
			PreviousHash: block.Header.PrevBlock.String(),
			Time:         block.Header.Timestamp.Unix(),
		}
		headers.PushBack(previousHeader)
		// Notify the block and any of its relevant transactions.
		if _, e = c.filterBlock(block, i, true); E.Chk(e) {
			return e
		}
		// Periodic progress report, once every 10000 heights.
		if i%10000 == 0 {
			c.onRescanProgress(
				previousHash, i, block.Header.Timestamp,
			)
		}
		// If we've reached the previously best known block, check to make sure the underlying node hasn't synchronized
		// additional blocks. If it has, update the best known block and continue to rescan to that point.
		if i == bestBlock.Height {
			bestHash, bestHeight, e = c.GetBestBlock()
			if e != nil {
				return e
			}
			bestHeader, e = c.GetBlockHeaderVerbose(bestHash)
			if e != nil {
				return e
			}
			bestBlock.Hash = *bestHash
			bestBlock.Height = bestHeight
			bestBlock.Timestamp = time.Unix(bestHeader.Time, 0)
		}
	}
	return nil
}
977 978 // filterBlock filters a block for watched outpoints and addresses, and returns any matching transactions, sending
979 // notifications along the way.
980 func (c *BitcoindClient) filterBlock(
981 block *wire.Block, height int32,
982 notify bool,
983 ) ([]*tm.TxRecord, error) {
984 // If this block happened before the client's birthday, then we'll skip it entirely.
985 if block.Header.Timestamp.Before(c.birthday) {
986 return nil, nil
987 }
988 if c.shouldNotifyBlocks() {
989 D.F(
990 "filtering block %d (%s) with %d transactions %s",
991 height, block.BlockHash(), len(block.Transactions),
992 )
993 }
994 // Create a block details template to use for all of the confirmed transactions found within this block.
995 blockHash := block.BlockHash()
996 blockDetails := &btcjson.BlockDetails{
997 Hash: blockHash.String(),
998 Height: height,
999 Time: block.Header.Timestamp.Unix(),
1000 }
1001 // Now, we'll through all of the transactions in the block keeping track of any relevant to the caller.
1002 var relevantTxs []*tm.TxRecord
1003 confirmedTxs := make(map[chainhash.Hash]struct{})
1004 for i, tx := range block.Transactions {
1005 // Update the index in the block details with the index of this
1006 // transaction.
1007 blockDetails.Index = i
1008 isRelevant, rec, e := c.filterTx(tx, blockDetails, notify)
1009 if e != nil {
1010 W.F(
1011 "Unable to filter transaction %v: %v",
1012 tx.TxHash(), e,
1013 )
1014 continue
1015 }
1016 if isRelevant {
1017 relevantTxs = append(relevantTxs, rec)
1018 confirmedTxs[tx.TxHash()] = struct{}{}
1019 }
1020 }
1021 // Update the expiration map by setting the block's confirmed transactions and deleting any in the mempool that were
1022 // confirmed over 288 blocks ago.
1023 c.watchMtx.Lock()
1024 c.expiredMempool[height] = confirmedTxs
1025 if oldBlock, ok := c.expiredMempool[height-288]; ok {
1026 for txHash := range oldBlock {
1027 delete(c.mempool, txHash)
1028 }
1029 delete(c.expiredMempool, height-288)
1030 }
1031 c.watchMtx.Unlock()
1032 if notify {
1033 c.onFilteredBlockConnected(height, &block.Header, relevantTxs)
1034 c.onBlockConnected(&blockHash, height, block.Header.Timestamp)
1035 }
1036 return relevantTxs, nil
1037 }
1038 1039 // filterTx determines whether a transaction is relevant to the client by inspecting the client's different filters.
1040 func (c *BitcoindClient) filterTx(
1041 tx *wire.MsgTx,
1042 blockDetails *btcjson.BlockDetails,
1043 notify bool,
1044 ) (bool, *tm.TxRecord, error) {
1045 txDetails := util.NewTx(tx)
1046 if blockDetails != nil {
1047 txDetails.SetIndex(blockDetails.Index)
1048 }
1049 rec, e := tm.NewTxRecordFromMsgTx(txDetails.MsgTx(), time.Now())
1050 if e != nil {
1051 E.Ln(
1052 "Cannot create transaction record for relevant tx:", e,
1053 )
1054 return false, nil, e
1055 }
1056 if blockDetails != nil {
1057 rec.Received = time.Unix(blockDetails.Time, 0)
1058 }
1059 // We'll begin the filtering process by holding the lock to ensure we match exactly against what's currently in the
1060 // filters.
1061 c.watchMtx.Lock()
1062 defer c.watchMtx.Unlock()
1063 // If we've already seen this transaction and it's now been confirmed, then we'll shortcut the filter process by
1064 // immediately sending a notification to the caller that the filter matches.
1065 if _, ok := c.mempool[tx.TxHash()]; ok {
1066 if notify && blockDetails != nil {
1067 c.onRelevantTx(rec, blockDetails)
1068 }
1069 return true, rec, nil
1070 }
1071 // Otherwise, this is a new transaction we have yet to see. We'll need to determine if this transaction is somehow
1072 // relevant to the caller.
1073 var isRelevant bool
1074 // We'll start by cycling through its outputs to determine if it pays to any of the currently watched addresses. If
1075 // an output matches, we'll add it to our watch list.
1076 for i, out := range tx.TxOut {
1077 var addrs []btcaddr.Address
1078 _, addrs, _, e = txscript.ExtractPkScriptAddrs(
1079 out.PkScript, c.chainParams,
1080 )
1081 if e != nil {
1082 D.F(
1083 "Unable to parse output script in %s:%d: %v %s",
1084 tx.TxHash(), i, e,
1085 )
1086 continue
1087 }
1088 for _, addr := range addrs {
1089 if _, ok := c.watchedAddresses[addr.String()]; ok {
1090 isRelevant = true
1091 op := wire.OutPoint{
1092 Hash: tx.TxHash(),
1093 Index: uint32(i),
1094 }
1095 c.watchedOutPoints[op] = struct{}{}
1096 }
1097 }
1098 }
1099 // If the transaction didn't pay to any of our watched addresses, we'll check if we're currently watching for the
1100 // hash of this transaction.
1101 if !isRelevant {
1102 if _, ok := c.watchedTxs[tx.TxHash()]; ok {
1103 isRelevant = true
1104 }
1105 }
1106 // If the transaction didn't pay to any of our watched hashes, we'll check if it spends any of our watched
1107 // outpoints.
1108 if !isRelevant {
1109 for _, in := range tx.TxIn {
1110 if _, ok := c.watchedOutPoints[in.PreviousOutPoint]; ok {
1111 isRelevant = true
1112 break
1113 }
1114 }
1115 }
1116 // If the transaction is not relevant to us, we can simply exit.
1117 if !isRelevant {
1118 return false, rec, nil
1119 }
1120 // Otherwise, the transaction matched our filters, so we should dispatch a notification for it. If it's still
1121 // unconfirmed, we'll include it in our mempool so that it can also be notified as part of FilteredBlockConnected
1122 // once it confirms.
1123 if blockDetails == nil {
1124 c.mempool[tx.TxHash()] = struct{}{}
1125 }
1126 c.onRelevantTx(rec, blockDetails)
1127 return true, rec, nil
1128 }
1129