bitcoind_client.go raw

   1  package chainclient
   2  
   3  import (
   4  	"container/list"
   5  	"encoding/hex"
   6  	"errors"
   7  	"fmt"
   8  	"github.com/p9c/p9/pkg/btcaddr"
   9  	"github.com/p9c/p9/pkg/chaincfg"
  10  	"sync"
  11  	"sync/atomic"
  12  	"time"
  13  	
  14  	"github.com/p9c/p9/pkg/qu"
  15  	
  16  	"github.com/p9c/p9/pkg/btcjson"
  17  	"github.com/p9c/p9/pkg/chainhash"
  18  	"github.com/p9c/p9/pkg/txscript"
  19  	"github.com/p9c/p9/pkg/util"
  20  	am "github.com/p9c/p9/pkg/waddrmgr"
  21  	"github.com/p9c/p9/pkg/wire"
  22  	tm "github.com/p9c/p9/pkg/wtxmgr"
  23  )
  24  
var (
	// ErrBitcoindClientShuttingDown is the sentinel error returned by the client's request methods (the Notify*
	// family, LoadTxFilter and Rescan) when the quit signal fires before the request could be handed to the rescan
	// handler, i.e. the bitcoind client is in the middle of shutting down.
	ErrBitcoindClientShuttingDown = errors.New("client is shutting down")
)
  30  
// BitcoindClient represents a persistent client connection to a bitcoind server for information regarding the current
// best block chain.
//
// Lifecycle: Start launches the rescanHandler and ntfnHandler goroutines (tracked by wg), Stop signals quit so that
// both handlers and any blocked senders return, and WaitForShutdown waits on wg.
type BitcoindClient struct {
	started int32 // To be used atomically.
	stopped int32 // To be used atomically.
	// birthday is the earliest time for which we should begin scanning the chain.
	birthday time.Time
	// chainParams are the parameters of the current chain this client is active under.
	chainParams *chaincfg.Params
	// id is the unique ID of this client assigned by the backing bitcoind connection.
	id uint64
	// chainConn is the backing client to our rescan client that contains the RPC and ZMQ connections to a bitcoind
	// node.
	chainConn *BitcoindConn
	// bestBlock keeps track of the tip of the current best chain. It is guarded by bestBlockMtx.
	bestBlockMtx sync.RWMutex
	bestBlock    am.BlockStamp
	// notifyBlocks signals whether the client is sending block notifications to the caller. It is read and written
	// atomically; a value of 1 means notifications are enabled.
	notifyBlocks uint32
	// rescanUpdate is a channel that will be sent the items we should match transactions against while processing a
	// chain rescan to determine if they are relevant to the client. A struct{}{} value resets the filters and a
	// chainhash.Hash value triggers a rescan starting from that block.
	rescanUpdate chan interface{}
	// watchedAddresses, watchedOutPoints, and watchedTxs are the set of items we should match transactions against
	// while processing a chain rescan to determine if they are relevant to the client. They are guarded by watchMtx.
	watchMtx         sync.RWMutex
	watchedAddresses map[string]struct{}
	watchedOutPoints map[wire.OutPoint]struct{}
	watchedTxs       map[chainhash.Hash]struct{}
	// mempool keeps track of all relevant transactions that have yet to be confirmed. This is used to shortcut the
	// filtering process of a transaction when a new confirmed transaction notification is received.
	//
	// NOTE: This requires the watchMtx to be held.
	mempool map[chainhash.Hash]struct{}
	// expiredMempool keeps track of a set of confirmed transactions along with the height at which they were included
	// in a block. These transactions will then be removed from the mempool after a period of 288 blocks. This is done
	// to ensure the transactions are safe from a reorg in the chain.
	//
	// NOTE: This requires the watchMtx to be held.
	expiredMempool map[int32]map[chainhash.Hash]struct{}
	// notificationQueue is a concurrent unbounded queue that handles dispatching notifications to the subscriber of
	// this client.
	//
	// TODO: Rather than leaving this as an unbounded queue for all types of notifications, try dropping ones where a
	//  later enqueued notification can fully invalidate one waiting to be processed. For example, BlockConnected
	//  notifications for greater block heights can remove the need to process earlier notifications still waiting to be
	//  processed.
	notificationQueue *ConcurrentQueue
	// zmqTxNtfns is a channel through which ZMQ transaction events will be retrieved from the backing bitcoind
	// connection.
	zmqTxNtfns chan *wire.MsgTx
	// zmqBlockNtfns is a channel through which ZMQ block events will be retrieved from the backing bitcoind connection.
	zmqBlockNtfns chan *wire.Block
	quit          qu.C
	wg            sync.WaitGroup
}
  86  
// Compile-time assertion that *BitcoindClient implements the chainclient Interface; the build fails here if a method
// required by Interface is missing or has the wrong signature.
var _ Interface = (*BitcoindClient)(nil)
  89  
  90  // BackEnd returns the name of the driver.
  91  func (c *BitcoindClient) BackEnd() string {
  92  	return "bitcoind"
  93  }
  94  
  95  // GetBestBlock returns the highest block known to bitcoind.
  96  func (c *BitcoindClient) GetBestBlock() (*chainhash.Hash, int32, error) {
  97  	bcInfo, e := c.chainConn.client.GetBlockChainInfo()
  98  	if e != nil {
  99  		return nil, 0, e
 100  	}
 101  	hash, e := chainhash.NewHashFromStr(bcInfo.BestBlockHash)
 102  	if e != nil {
 103  		return nil, 0, e
 104  	}
 105  	return hash, bcInfo.Blocks, nil
 106  }
 107  
 108  // GetBlockHeight returns the height for the hash, if known, or returns an error.
 109  func (c *BitcoindClient) GetBlockHeight(hash *chainhash.Hash) (int32, error) {
 110  	header, e := c.chainConn.client.GetBlockHeaderVerbose(hash)
 111  	if e != nil {
 112  		return 0, e
 113  	}
 114  	return header.Height, nil
 115  }
 116  
 117  // GetBlock returns a block from the hash.
 118  func (c *BitcoindClient) GetBlock(hash *chainhash.Hash) (*wire.Block, error) {
 119  	return c.chainConn.client.GetBlock(hash)
 120  }
 121  
 122  // GetBlockVerbose returns a verbose block from the hash.
 123  func (c *BitcoindClient) GetBlockVerbose(
 124  	hash *chainhash.Hash,
 125  ) (*btcjson.GetBlockVerboseResult, error) {
 126  	return c.chainConn.client.GetBlockVerbose(hash)
 127  }
 128  
 129  // GetBlockHash returns a block hash from the height.
 130  func (c *BitcoindClient) GetBlockHash(height int64) (*chainhash.Hash, error) {
 131  	return c.chainConn.client.GetBlockHash(height)
 132  }
 133  
 134  // GetBlockHeader returns a block header from the hash.
 135  func (c *BitcoindClient) GetBlockHeader(
 136  	hash *chainhash.Hash,
 137  ) (*wire.BlockHeader, error) {
 138  	return c.chainConn.client.GetBlockHeader(hash)
 139  }
 140  
 141  // GetBlockHeaderVerbose returns a block header from the hash.
 142  func (c *BitcoindClient) GetBlockHeaderVerbose(
 143  	hash *chainhash.Hash,
 144  ) (*btcjson.GetBlockHeaderVerboseResult, error) {
 145  	return c.chainConn.client.GetBlockHeaderVerbose(hash)
 146  }
 147  
 148  // GetRawTransactionVerbose returns a transaction from the tx hash.
 149  func (c *BitcoindClient) GetRawTransactionVerbose(
 150  	hash *chainhash.Hash,
 151  ) (*btcjson.TxRawResult, error) {
 152  	return c.chainConn.client.GetRawTransactionVerbose(hash)
 153  }
 154  
 155  // GetTxOut returns a txout from the outpoint info provided.
 156  func (c *BitcoindClient) GetTxOut(
 157  	txHash *chainhash.Hash, index uint32,
 158  	mempool bool,
 159  ) (*btcjson.GetTxOutResult, error) {
 160  	return c.chainConn.client.GetTxOut(txHash, index, mempool)
 161  }
 162  
 163  // SendRawTransaction sends a raw transaction via bitcoind.
 164  func (c *BitcoindClient) SendRawTransaction(
 165  	tx *wire.MsgTx,
 166  	allowHighFees bool,
 167  ) (*chainhash.Hash, error) {
 168  	return c.chainConn.client.SendRawTransaction(tx, allowHighFees)
 169  }
 170  
 171  // Notifications returns a channel to retrieve notifications from.
 172  //
 173  // NOTE: This is part of the chainclient.Interface interface.
 174  func (c *BitcoindClient) Notifications() <-chan interface{} {
 175  	return c.notificationQueue.ChanOut()
 176  }
 177  
 178  // NotifyReceived allows the chain backend to notify the caller whenever a transaction pays to any of the given
 179  // addresses.
 180  //
 181  // NOTE: This is part of the chainclient.Interface interface.
 182  func (c *BitcoindClient) NotifyReceived(addrs []btcaddr.Address) (e error) {
 183  	if e = c.NotifyBlocks(); E.Chk(e) {
 184  	}
 185  	select {
 186  	case c.rescanUpdate <- addrs:
 187  	case <-c.quit.Wait():
 188  		return ErrBitcoindClientShuttingDown
 189  	}
 190  	return nil
 191  }
 192  
 193  // NotifySpent allows the chain backend to notify the caller whenever a transaction spends any of the given outpoints.
 194  func (c *BitcoindClient) NotifySpent(outPoints []*wire.OutPoint) (e error) {
 195  	if e = c.NotifyBlocks(); E.Chk(e) {
 196  	}
 197  	select {
 198  	case c.rescanUpdate <- outPoints:
 199  	case <-c.quit.Wait():
 200  		return ErrBitcoindClientShuttingDown
 201  	}
 202  	return nil
 203  }
 204  
 205  // NotifyTx allows the chain backend to notify the caller whenever any of the given transactions confirm within the
 206  // chain.
 207  func (c *BitcoindClient) NotifyTx(txids []chainhash.Hash) (e error) {
 208  	if e = c.NotifyBlocks(); E.Chk(e) {
 209  	}
 210  	select {
 211  	case c.rescanUpdate <- txids:
 212  	case <-c.quit.Wait():
 213  		return ErrBitcoindClientShuttingDown
 214  	}
 215  	return nil
 216  }
 217  
 218  // NotifyBlocks allows the chain backend to notify the caller whenever a block is connected or disconnected.
 219  //
 220  // NOTE: This is part of the chainclient.Interface interface.
 221  func (c *BitcoindClient) NotifyBlocks() (e error) {
 222  	atomic.StoreUint32(&c.notifyBlocks, 1)
 223  	return nil
 224  }
 225  
 226  // shouldNotifyBlocks determines whether the client should send block notifications to the caller.
 227  func (c *BitcoindClient) shouldNotifyBlocks() bool {
 228  	return atomic.LoadUint32(&c.notifyBlocks) == 1
 229  }
 230  
 231  // LoadTxFilter uses the given filters to what we should match transactions against to determine if they are relevant to
 232  // the client. The reset argument is used to reset the current filters.
 233  //
 234  // The current filters supported are of the following types:
 235  //	[]util.Address
 236  //	[]wire.OutPoint
 237  //	[]*wire.OutPoint
 238  //	map[wire.OutPoint]util.Address
 239  //	[]chainhash.Hash
 240  //	[]*chainhash.Hash
 241  func (c *BitcoindClient) LoadTxFilter(reset bool, filters ...interface{}) (e error) {
 242  	if reset {
 243  		select {
 244  		case c.rescanUpdate <- struct{}{}:
 245  		case <-c.quit.Wait():
 246  			return ErrBitcoindClientShuttingDown
 247  		}
 248  	}
 249  	updateFilter := func(filter interface{}) (e error) {
 250  		select {
 251  		case c.rescanUpdate <- filter:
 252  		case <-c.quit.Wait():
 253  			return ErrBitcoindClientShuttingDown
 254  		}
 255  		return nil
 256  	}
 257  	// In order to make this operation atomic, we'll iterate through the filters twice: the first to ensure there aren't
 258  	// any unsupported filter types, and the second to actually update our filters.
 259  	for _, filter := range filters {
 260  		switch filter := filter.(type) {
 261  		case []btcaddr.Address, []wire.OutPoint, []*wire.OutPoint,
 262  			map[wire.OutPoint]btcaddr.Address, []chainhash.Hash,
 263  			[]*chainhash.Hash:
 264  			// Proceed to check the next filter type.
 265  		default:
 266  			return fmt.Errorf("unsupported filter type %Ter", filter)
 267  		}
 268  	}
 269  	for _, filter := range filters {
 270  		if e := updateFilter(filter); E.Chk(e) {
 271  			return e
 272  		}
 273  	}
 274  	return nil
 275  }
 276  
 277  // RescanBlocks rescans any blocks passed, returning only the blocks that matched as []json.BlockDetails.
 278  func (c *BitcoindClient) RescanBlocks(
 279  	blockHashes []chainhash.Hash,
 280  ) ([]btcjson.RescannedBlock, error) {
 281  	rescannedBlocks := make([]btcjson.RescannedBlock, 0, len(blockHashes))
 282  	for _, hash := range blockHashes {
 283  		header, e := c.GetBlockHeaderVerbose(&hash)
 284  		if e != nil {
 285  			E.F(
 286  				"unable to get header %s from bitcoind: %s",
 287  				hash, e,
 288  			)
 289  			continue
 290  		}
 291  		block, e := c.GetBlock(&hash)
 292  		if e != nil {
 293  			E.F(
 294  				"unable to get block %s from bitcoind: %s",
 295  				hash, e,
 296  			)
 297  			continue
 298  		}
 299  		relevantTxs, e := c.filterBlock(block, header.Height, false)
 300  		if e != nil {
 301  		}
 302  		if len(relevantTxs) > 0 {
 303  			rescannedBlock := btcjson.RescannedBlock{
 304  				Hash: hash.String(),
 305  			}
 306  			for _, tx := range relevantTxs {
 307  				rescannedBlock.Transactions = append(
 308  					rescannedBlock.Transactions,
 309  					hex.EncodeToString(tx.SerializedTx),
 310  				)
 311  			}
 312  			rescannedBlocks = append(rescannedBlocks, rescannedBlock)
 313  		}
 314  	}
 315  	return rescannedBlocks, nil
 316  }
 317  
 318  // Rescan rescans from the block with the given hash until the current block, after adding the passed addresses and
 319  // outpoints to the client's watch list.
 320  func (c *BitcoindClient) Rescan(
 321  	blockHash *chainhash.Hash,
 322  	addresses []btcaddr.Address, outPoints map[wire.OutPoint]btcaddr.Address,
 323  ) (e error) {
 324  	// A block hash is required to use as the starting point of the rescan.
 325  	if blockHash == nil {
 326  		return errors.New("rescan requires a starting block hash")
 327  	}
 328  	// We'll then update our filters with the given outpoints and addresses.
 329  	select {
 330  	case c.rescanUpdate <- addresses:
 331  	case <-c.quit.Wait():
 332  		return ErrBitcoindClientShuttingDown
 333  	}
 334  	select {
 335  	case c.rescanUpdate <- outPoints:
 336  	case <-c.quit.Wait():
 337  		return ErrBitcoindClientShuttingDown
 338  	}
 339  	// Once the filters have been updated, we can begin the rescan.
 340  	select {
 341  	case c.rescanUpdate <- *blockHash:
 342  	case <-c.quit.Wait():
 343  		return ErrBitcoindClientShuttingDown
 344  	}
 345  	return nil
 346  }
 347  
 348  // Start initializes the bitcoind rescan client using the backing bitcoind connection and starts all goroutines
 349  // necessary in order to process rescans and ZMQ notifications.
 350  //
 351  // NOTE: This is part of the chainclient.Interface interface.
 352  func (c *BitcoindClient) Start() (e error) {
 353  	if !atomic.CompareAndSwapInt32(&c.started, 0, 1) {
 354  		return nil
 355  	}
 356  	// Start the notification queue and immediately dispatch a ClientConnected notification to the caller. This is
 357  	// needed as some of the callers will require this notification before proceeding.
 358  	c.notificationQueue.Start()
 359  	c.notificationQueue.ChanIn() <- ClientConnected{}
 360  	// Retrieve the best block of the chain.
 361  	bestHash, bestHeight, e := c.GetBestBlock()
 362  	if e != nil {
 363  		return fmt.Errorf("unable to retrieve best block: %v", e)
 364  	}
 365  	bestHeader, e := c.GetBlockHeaderVerbose(bestHash)
 366  	if e != nil {
 367  		return fmt.Errorf(
 368  			"unable to retrieve header for best block: "+
 369  				"%v", e,
 370  		)
 371  	}
 372  	c.bestBlockMtx.Lock()
 373  	c.bestBlock = am.BlockStamp{
 374  		Hash:      *bestHash,
 375  		Height:    bestHeight,
 376  		Timestamp: time.Unix(bestHeader.Time, 0),
 377  	}
 378  	c.bestBlockMtx.Unlock()
 379  	// Once the client has started successfully, we'll include it in the set of rescan clients of the backing bitcoind
 380  	// connection in order to received ZMQ event notifications.
 381  	c.chainConn.AddClient(c)
 382  	c.wg.Add(2)
 383  	go c.rescanHandler()
 384  	go c.ntfnHandler()
 385  	return nil
 386  }
 387  
 388  // Stop stops the bitcoind rescan client from processing rescans and ZMQ notifications.
 389  //
 390  // NOTE: This is part of the chainclient.Interface interface.
 391  func (c *BitcoindClient) Stop() {
 392  	if !atomic.CompareAndSwapInt32(&c.stopped, 0, 1) {
 393  		return
 394  	}
 395  	c.quit.Q()
 396  	// Remove this client's reference from the bitcoind connection to prevent sending notifications to it after it's
 397  	// been stopped.
 398  	c.chainConn.RemoveClient(c.id)
 399  	c.notificationQueue.Stop()
 400  }
 401  
 402  // WaitForShutdown blocks until the client has finished disconnecting and all handlers have exited.
 403  //
 404  // NOTE: This is part of the chainclient.Interface interface.
 405  func (c *BitcoindClient) WaitForShutdown() {
 406  	c.wg.Wait()
 407  }
 408  
 409  // rescanHandler handles the logic needed for the caller to trigger a chain rescan.
 410  //
 411  // NOTE: This must be called as a goroutine.
 412  func (c *BitcoindClient) rescanHandler() {
 413  	defer c.wg.Done()
 414  	for {
 415  		select {
 416  		case update := <-c.rescanUpdate:
 417  			switch update := update.(type) {
 418  			// We're clearing the filters.
 419  			case struct{}:
 420  				c.watchMtx.Lock()
 421  				c.watchedOutPoints = make(map[wire.OutPoint]struct{})
 422  				c.watchedAddresses = make(map[string]struct{})
 423  				c.watchedTxs = make(map[chainhash.Hash]struct{})
 424  				c.watchMtx.Unlock()
 425  			// We're adding the addresses to our filter.
 426  			case []btcaddr.Address:
 427  				c.watchMtx.Lock()
 428  				for _, addr := range update {
 429  					c.watchedAddresses[addr.String()] = struct{}{}
 430  				}
 431  				c.watchMtx.Unlock()
 432  			// We're adding the outpoints to our filter.
 433  			case []wire.OutPoint:
 434  				c.watchMtx.Lock()
 435  				for _, op := range update {
 436  					c.watchedOutPoints[op] = struct{}{}
 437  				}
 438  				c.watchMtx.Unlock()
 439  			case []*wire.OutPoint:
 440  				c.watchMtx.Lock()
 441  				for _, op := range update {
 442  					c.watchedOutPoints[*op] = struct{}{}
 443  				}
 444  				c.watchMtx.Unlock()
 445  			// We're adding the outpoints that map to the scripts that we should scan for to our filter.
 446  			case map[wire.OutPoint]btcaddr.Address:
 447  				c.watchMtx.Lock()
 448  				for op := range update {
 449  					c.watchedOutPoints[op] = struct{}{}
 450  				}
 451  				c.watchMtx.Unlock()
 452  			// We're adding the transactions to our filter.
 453  			case []chainhash.Hash:
 454  				c.watchMtx.Lock()
 455  				for _, txid := range update {
 456  					c.watchedTxs[txid] = struct{}{}
 457  				}
 458  				c.watchMtx.Unlock()
 459  			case []*chainhash.Hash:
 460  				c.watchMtx.Lock()
 461  				for _, txid := range update {
 462  					c.watchedTxs[*txid] = struct{}{}
 463  				}
 464  				c.watchMtx.Unlock()
 465  			// We're starting a rescan from the hash.
 466  			case chainhash.Hash:
 467  				if e := c.rescan(update); E.Chk(e) {
 468  					E.Ln(
 469  						"unable to complete chain rescan:", e,
 470  					)
 471  				}
 472  			default:
 473  				W.F(
 474  					"received unexpected filter type %Ter", update,
 475  				)
 476  			}
 477  		case <-c.quit.Wait():
 478  			return
 479  		}
 480  	}
 481  }
 482  
 483  // ntfnHandler handles the logic to retrieve ZMQ notifications from the backing bitcoind connection.
 484  //
 485  // NOTE: This must be called as a goroutine.
 486  func (c *BitcoindClient) ntfnHandler() {
 487  	defer c.wg.Done()
 488  	for {
 489  		select {
 490  		case tx := <-c.zmqTxNtfns:
 491  			var e error
 492  			if _, _, e = c.filterTx(tx, nil, true); E.Chk(e) {
 493  				E.F(
 494  					"unable to filter transaction %v: %v %s",
 495  					tx.TxHash(), e,
 496  				)
 497  			}
 498  		case newBlock := <-c.zmqBlockNtfns:
 499  			// If the new block's previous hash matches the best hash known to us, then the new block is the next
 500  			// successor, so we'll update our best block to reflect this and determine if this new block matches any of
 501  			// our existing filters.
 502  			c.bestBlockMtx.Lock()
 503  			bestBlock := c.bestBlock
 504  			c.bestBlockMtx.Unlock()
 505  			if newBlock.Header.PrevBlock == bestBlock.Hash {
 506  				newBlockHeight := bestBlock.Height + 1
 507  				_, e := c.filterBlock(
 508  					newBlock, newBlockHeight, true,
 509  				)
 510  				if e != nil {
 511  					E.F(
 512  						"unable to filter block %v: %v",
 513  						newBlock.BlockHash(), e,
 514  					)
 515  					continue
 516  				}
 517  				// With the block successfully filtered, we'll make it our new best block.
 518  				bestBlock.Hash = newBlock.BlockHash()
 519  				bestBlock.Height = newBlockHeight
 520  				bestBlock.Timestamp = newBlock.Header.Timestamp
 521  				c.bestBlockMtx.Lock()
 522  				c.bestBlock = bestBlock
 523  				c.bestBlockMtx.Unlock()
 524  				continue
 525  			}
 526  			// Otherwise, we've encountered a reorg.
 527  			if e := c.reorg(bestBlock, newBlock); E.Chk(e) {
 528  				E.F(
 529  					"unable to process chain reorg:", e,
 530  				)
 531  			}
 532  		case <-c.quit.Wait():
 533  			return
 534  		}
 535  	}
 536  }
 537  
 538  // SetBirthday sets the birthday of the bitcoind rescan client.
 539  //
 540  // NOTE: This should be done before the client has been started in order for it to properly carry its duties.
 541  func (c *BitcoindClient) SetBirthday(t time.Time) {
 542  	c.birthday = t
 543  }
 544  
 545  // BlockStamp returns the latest block notified by the client, or an error if the client has been shut down.
 546  func (c *BitcoindClient) BlockStamp() (*am.BlockStamp, error) {
 547  	c.bestBlockMtx.RLock()
 548  	bestBlock := c.bestBlock
 549  	c.bestBlockMtx.RUnlock()
 550  	return &bestBlock, nil
 551  }
 552  
 553  // onBlockConnected is a callback that's executed whenever a new block has been detected. This will queue a
 554  // BlockConnected notification to the caller.
 555  func (c *BitcoindClient) onBlockConnected(
 556  	hash *chainhash.Hash, height int32,
 557  	timestamp time.Time,
 558  ) {
 559  	if c.shouldNotifyBlocks() {
 560  		select {
 561  		case c.notificationQueue.ChanIn() <- BlockConnected{
 562  			Block: tm.Block{
 563  				Hash:   *hash,
 564  				Height: height,
 565  			},
 566  			Time: timestamp,
 567  		}:
 568  		case <-c.quit.Wait():
 569  		}
 570  	}
 571  }
 572  
 573  // onFilteredBlockConnected is an alternative callback that's executed whenever a new block has been detected. It serves
 574  // the same purpose as onBlockConnected, but it also includes a list of the relevant transactions found within the block
 575  // being connected. This will queue a FilteredBlockConnected notification to the caller.
 576  func (c *BitcoindClient) onFilteredBlockConnected(
 577  	height int32,
 578  	header *wire.BlockHeader, relevantTxs []*tm.TxRecord,
 579  ) {
 580  	if c.shouldNotifyBlocks() {
 581  		select {
 582  		case c.notificationQueue.ChanIn() <- FilteredBlockConnected{
 583  			Block: &tm.BlockMeta{
 584  				Block: tm.Block{
 585  					Hash:   header.BlockHash(),
 586  					Height: height,
 587  				},
 588  				Time: header.Timestamp,
 589  			},
 590  			RelevantTxs: relevantTxs,
 591  		}:
 592  		case <-c.quit.Wait():
 593  		}
 594  	}
 595  }
 596  
 597  // onBlockDisconnected is a callback that's executed whenever a block has been disconnected. This will queue a
 598  // BlockDisconnected notification to the caller with the details of the block being disconnected.
 599  func (c *BitcoindClient) onBlockDisconnected(
 600  	hash *chainhash.Hash, height int32,
 601  	timestamp time.Time,
 602  ) {
 603  	if c.shouldNotifyBlocks() {
 604  		select {
 605  		case c.notificationQueue.ChanIn() <- BlockDisconnected{
 606  			Block: tm.Block{
 607  				Hash:   *hash,
 608  				Height: height,
 609  			},
 610  			Time: timestamp,
 611  		}:
 612  		case <-c.quit.Wait():
 613  		}
 614  	}
 615  }
 616  
 617  // onRelevantTx is a callback that's executed whenever a transaction is relevant to the caller. This means that the
 618  // transaction matched a specific item in the client's different filters. This will queue a RelevantTx notification to
 619  // the caller.
 620  func (c *BitcoindClient) onRelevantTx(
 621  	tx *tm.TxRecord,
 622  	blockDetails *btcjson.BlockDetails,
 623  ) {
 624  	block, e := parseBlock(blockDetails)
 625  	if e != nil {
 626  		E.Ln(
 627  			"unable to send onRelevantTx notification, failed parse block:",
 628  			e,
 629  		)
 630  		return
 631  	}
 632  	select {
 633  	case c.notificationQueue.ChanIn() <- RelevantTx{
 634  		TxRecord: tx,
 635  		Block:    block,
 636  	}:
 637  	case <-c.quit.Wait():
 638  	}
 639  }
 640  
 641  // onRescanProgress is a callback that's executed whenever a rescan is in progress. This will queue a RescanProgress
 642  // notification to the caller with the current rescan progress details.
 643  func (c *BitcoindClient) onRescanProgress(
 644  	hash *chainhash.Hash, height int32,
 645  	timestamp time.Time,
 646  ) {
 647  	select {
 648  	case c.notificationQueue.ChanIn() <- &RescanProgress{
 649  		Hash:   hash,
 650  		Height: height,
 651  		Time:   timestamp,
 652  	}:
 653  	case <-c.quit.Wait():
 654  	}
 655  }
 656  
 657  // onRescanFinished is a callback that's executed whenever a rescan has finished. This will queue a RescanFinished
 658  // notification to the caller with the details of the last block in the range of the rescan.
 659  func (c *BitcoindClient) onRescanFinished(
 660  	hash *chainhash.Hash, height int32,
 661  	timestamp time.Time,
 662  ) {
 663  	I.F(
 664  		"rescan finished at %d (%s)",
 665  		height, hash,
 666  	)
 667  	select {
 668  	case c.notificationQueue.ChanIn() <- &RescanFinished{
 669  		Hash:   hash,
 670  		Height: height,
 671  		Time:   timestamp,
 672  	}:
 673  	case <-c.quit.Wait():
 674  	}
 675  }
 676  
 677  // reorg processes a reorganization during chain synchronization. This is separate from a rescan's handling of a reorg.
 678  // This will rewind back until it finds a common ancestor and notify all the new blocks since then.
 679  func (c *BitcoindClient) reorg(
 680  	currentBlock am.BlockStamp,
 681  	reorgBlock *wire.Block,
 682  ) (e error) {
 683  	D.Ln("possible reorg at block", reorgBlock.BlockHash())
 684  	// Retrieve the best known height based on the block which caused the reorg. This way, we can preserve the chain of
 685  	// blocks we need to retrieve.
 686  	bestHash := reorgBlock.BlockHash()
 687  	var bestHeight int32
 688  	bestHeight, e = c.GetBlockHeight(&bestHash)
 689  	if e != nil {
 690  		return e
 691  	}
 692  	if bestHeight < currentBlock.Height {
 693  		D.Ln("detected multiple reorgs")
 694  		return nil
 695  	}
 696  	// We'll now keep track of all the blocks known to the *chain*, starting from the best block known to us until the
 697  	// best block in the chain. This will let us fast-forward despite any future reorgs.
 698  	blocksToNotify := list.New()
 699  	blocksToNotify.PushFront(reorgBlock)
 700  	previousBlock := reorgBlock.Header.PrevBlock
 701  	for i := bestHeight - 1; i >= currentBlock.Height; i-- {
 702  		var block *wire.Block
 703  		block, e = c.GetBlock(&previousBlock)
 704  		if e != nil {
 705  			return e
 706  		}
 707  		blocksToNotify.PushFront(block)
 708  		previousBlock = block.Header.PrevBlock
 709  	}
 710  	// Rewind back to the last common ancestor block using the previous block hash from each header to avoid any race
 711  	// conditions. If we encounter more reorgs, they'll be queued and we'll repeat the cycle.
 712  	//
 713  	// We'll start by retrieving the header to the best block known to us.
 714  	currentHeader, e := c.GetBlockHeader(&currentBlock.Hash)
 715  	if e != nil {
 716  		return e
 717  	}
 718  	// Then, we'll walk backwards in the chain until we find our common ancestor.
 719  	for previousBlock != currentHeader.PrevBlock {
 720  		// Since the previous hashes don't match, the current block has been reorged out of the chain, so we should send
 721  		// a BlockDisconnected notification for it.
 722  		D.F(
 723  			"disconnecting block %d (%v) %s",
 724  			currentBlock.Height,
 725  			currentBlock.Hash,
 726  		)
 727  		c.onBlockDisconnected(
 728  			&currentBlock.Hash, currentBlock.Height,
 729  			currentBlock.Timestamp,
 730  		)
 731  		// Our current block should now reflect the previous one to continue the common ancestor search.
 732  		currentHeader, e = c.GetBlockHeader(&currentHeader.PrevBlock)
 733  		if e != nil {
 734  			return e
 735  		}
 736  		currentBlock.Height--
 737  		currentBlock.Hash = currentHeader.PrevBlock
 738  		currentBlock.Timestamp = currentHeader.Timestamp
 739  		// Store the correct block in our list in order to notify it once we've found our common ancestor.
 740  		block, e := c.GetBlock(&previousBlock)
 741  		if e != nil {
 742  			return e
 743  		}
 744  		blocksToNotify.PushFront(block)
 745  		previousBlock = block.Header.PrevBlock
 746  	}
 747  	// Disconnect the last block from the old chain. Since the previous block remains the same between the old and new
 748  	// chains, the tip will now be the last common ancestor.
 749  	D.F(
 750  		"disconnecting block %d (%v) %s",
 751  		currentBlock.Height, currentBlock.Hash,
 752  	)
 753  	c.onBlockDisconnected(
 754  		&currentBlock.Hash, currentBlock.Height, currentHeader.Timestamp,
 755  	)
 756  	currentBlock.Height--
 757  	// Now we fast-forward to the new block, notifying along the way.
 758  	for blocksToNotify.Front() != nil {
 759  		nextBlock := blocksToNotify.Front().Value.(*wire.Block)
 760  		nextHeight := currentBlock.Height + 1
 761  		nextHash := nextBlock.BlockHash()
 762  		nextHeader, e := c.GetBlockHeader(&nextHash)
 763  		if e != nil {
 764  			return e
 765  		}
 766  		_, e = c.filterBlock(nextBlock, nextHeight, true)
 767  		if e != nil {
 768  			return e
 769  		}
 770  		currentBlock.Height = nextHeight
 771  		currentBlock.Hash = nextHash
 772  		currentBlock.Timestamp = nextHeader.Timestamp
 773  		blocksToNotify.Remove(blocksToNotify.Front())
 774  	}
 775  	c.bestBlockMtx.Lock()
 776  	c.bestBlock = currentBlock
 777  	c.bestBlockMtx.Unlock()
 778  	return nil
 779  }
 780  
 781  // FilterBlocks scans the blocks contained in the FilterBlocksRequest for any addresses of interest. Each block will be
 782  // fetched and filtered sequentially, returning a FilterBlocksResponse for the first block containing a matching
 783  // address. If no matches are found in the range of blocks requested, the returned response will be nil.
 784  //
 785  // NOTE: This is part of the chainclient.Interface interface.
 786  func (c *BitcoindClient) FilterBlocks(
 787  	req *FilterBlocksRequest,
 788  ) (*FilterBlocksResponse, error) {
 789  	blockFilterer := NewBlockFilterer(c.chainParams, req)
 790  	// Iterate over the requested blocks, fetching each from the rpc client. Each block will scanned using the reverse
 791  	// addresses indexes generated above, breaking out early if any addresses are found.
 792  	for i, block := range req.Blocks {
 793  		// TODO(conner): add prefetching, since we already know we'll be
 794  		// fetching *every* block
 795  		rawBlock, e := c.GetBlock(&block.Hash)
 796  		if e != nil {
 797  			return nil, e
 798  		}
 799  		if !blockFilterer.FilterBlock(rawBlock) {
 800  			continue
 801  		}
 802  		// If any external or internal addresses were detected in this block, we return them to the caller so that the
 803  		// rescan windows can widened with subsequent addresses. The `BatchIndex` is returned so that the caller can
 804  		// compute the *next* block from which to begin again.
 805  		resp := &FilterBlocksResponse{
 806  			BatchIndex:         uint32(i),
 807  			BlockMeta:          block,
 808  			FoundExternalAddrs: blockFilterer.FoundExternal,
 809  			FoundInternalAddrs: blockFilterer.FoundInternal,
 810  			FoundOutPoints:     blockFilterer.FoundOutPoints,
 811  			RelevantTxns:       blockFilterer.RelevantTxns,
 812  		}
 813  		return resp, nil
 814  	}
 815  	// No addresses were found for this range.
 816  	return nil, nil
 817  }
 818  
 819  // rescan performs a rescan of the chain using a bitcoind backend, from the specified hash to the best known hash, while
 820  // watching out for reorgs that happen during the rescan. It uses the addresses and outputs being tracked by the client
 821  // in the watch list. This is called only within a queue processing loop.
 822  func (c *BitcoindClient) rescan(start chainhash.Hash) (e error) {
 823  	I.Ln("starting rescan from block", start)
 824  	// We start by getting the best already processed block. We only use the height, as the hash can change during a
 825  	// reorganization, which we catch by testing connectivity from known blocks to the previous block.
 826  	bestHash, bestHeight, e := c.GetBestBlock()
 827  	if e != nil {
 828  		return e
 829  	}
 830  	bestHeader, e := c.GetBlockHeaderVerbose(bestHash)
 831  	if e != nil {
 832  		return e
 833  	}
 834  	bestBlock := am.BlockStamp{
 835  		Hash:      *bestHash,
 836  		Height:    bestHeight,
 837  		Timestamp: time.Unix(bestHeader.Time, 0),
 838  	}
 839  	// Create a list of headers sorted in forward order. We'll use this in the event that we need to backtrack due to a
 840  	// chain reorg.
 841  	headers := list.New()
 842  	previousHeader, e := c.GetBlockHeaderVerbose(&start)
 843  	if e != nil {
 844  		return e
 845  	}
 846  	previousHash, e := chainhash.NewHashFromStr(previousHeader.Hash)
 847  	if e != nil {
 848  		return e
 849  	}
 850  	headers.PushBack(previousHeader)
 851  	// Queue a RescanFinished notification to the caller with the last block processed throughout the rescan once done.
 852  	defer c.onRescanFinished(
 853  		previousHash, previousHeader.Height,
 854  		time.Unix(previousHeader.Time, 0),
 855  	)
 856  	// Cycle through all of the blocks known to bitcoind, being mindful of reorgs.
 857  	for i := previousHeader.Height + 1; i <= bestBlock.Height; i++ {
 858  		var hash *chainhash.Hash
 859  		hash, e = c.GetBlockHash(int64(i))
 860  		if e != nil {
 861  			return e
 862  		}
 863  		// If the previous header is before the wallet birthday, fetch the current header and construct a dummy block,
 864  		// rather than fetching the whole block itself. This speeds things up as we no longer have to fetch the whole
 865  		// block when we know it won't match any of our filters.
 866  		var block *wire.Block
 867  		afterBirthday := previousHeader.Time >= c.birthday.Unix()
 868  		if !afterBirthday {
 869  			var header *wire.BlockHeader
 870  			header, e = c.GetBlockHeader(hash)
 871  			if e != nil {
 872  				return e
 873  			}
 874  			block = &wire.Block{
 875  				Header: *header,
 876  			}
 877  			afterBirthday = c.birthday.Before(header.Timestamp)
 878  			if afterBirthday {
 879  				c.onRescanProgress(
 880  					previousHash, i,
 881  					block.Header.Timestamp,
 882  				)
 883  			}
 884  		}
 885  		if afterBirthday {
 886  			block, e = c.GetBlock(hash)
 887  			if e != nil {
 888  				return e
 889  			}
 890  		}
 891  		for block.Header.PrevBlock.String() != previousHeader.Hash {
 892  			// If we're in this for loop, it looks like we've been reorganized. We now walk backwards to the common
 893  			// ancestor between the best chain and the known chain.
 894  			//
 895  			// First, we signal a disconnected block to rewind the rescan state.
 896  			c.onBlockDisconnected(
 897  				previousHash, previousHeader.Height,
 898  				time.Unix(previousHeader.Time, 0),
 899  			)
 900  			// Get the previous block of the best chain.
 901  			hash, e = c.GetBlockHash(int64(i - 1))
 902  			if e != nil {
 903  				return e
 904  			}
 905  			block, e = c.GetBlock(hash)
 906  			if e != nil {
 907  				return e
 908  			}
 909  			// Then, we'll the get the header of this previous block.
 910  			if headers.Back() != nil {
 911  				// If it's already in the headers list, we can just get it from there and remove the current hash.
 912  				headers.Remove(headers.Back())
 913  				if headers.Back() != nil {
 914  					previousHeader = headers.Back().
 915  						Value.(*btcjson.GetBlockHeaderVerboseResult)
 916  					previousHash, e = chainhash.NewHashFromStr(
 917  						previousHeader.Hash,
 918  					)
 919  					if e != nil {
 920  						return e
 921  					}
 922  				}
 923  			} else {
 924  				// Otherwise, we get it from bitcoind.
 925  				previousHash, e = chainhash.NewHashFromStr(
 926  					previousHeader.PreviousHash,
 927  				)
 928  				if e != nil {
 929  					return e
 930  				}
 931  				previousHeader, e = c.GetBlockHeaderVerbose(
 932  					previousHash,
 933  				)
 934  				if e != nil {
 935  					return e
 936  				}
 937  			}
 938  		}
 939  		// Now that we've ensured we haven't come across a reorg, we'll add the current block header to our list of
 940  		// headers.
 941  		blockHash := block.BlockHash()
 942  		previousHash = &blockHash
 943  		previousHeader = &btcjson.GetBlockHeaderVerboseResult{
 944  			Hash:         blockHash.String(),
 945  			Height:       i,
 946  			PreviousHash: block.Header.PrevBlock.String(),
 947  			Time:         block.Header.Timestamp.Unix(),
 948  		}
 949  		headers.PushBack(previousHeader)
 950  		// Notify the block and any of its relevant transactions.
 951  		if _, e = c.filterBlock(block, i, true); E.Chk(e) {
 952  			return e
 953  		}
 954  		if i%10000 == 0 {
 955  			c.onRescanProgress(
 956  				previousHash, i, block.Header.Timestamp,
 957  			)
 958  		}
 959  		// If we've reached the previously best known block, check to make sure the underlying node hasn't synchronized
 960  		// additional blocks. If it has, update the best known block and continue to rescan to that point.
 961  		if i == bestBlock.Height {
 962  			bestHash, bestHeight, e = c.GetBestBlock()
 963  			if e != nil {
 964  				return e
 965  			}
 966  			bestHeader, e = c.GetBlockHeaderVerbose(bestHash)
 967  			if e != nil {
 968  				return e
 969  			}
 970  			bestBlock.Hash = *bestHash
 971  			bestBlock.Height = bestHeight
 972  			bestBlock.Timestamp = time.Unix(bestHeader.Time, 0)
 973  		}
 974  	}
 975  	return nil
 976  }
 977  
 978  // filterBlock filters a block for watched outpoints and addresses, and returns any matching transactions, sending
 979  // notifications along the way.
 980  func (c *BitcoindClient) filterBlock(
 981  	block *wire.Block, height int32,
 982  	notify bool,
 983  ) ([]*tm.TxRecord, error) {
 984  	// If this block happened before the client's birthday, then we'll skip it entirely.
 985  	if block.Header.Timestamp.Before(c.birthday) {
 986  		return nil, nil
 987  	}
 988  	if c.shouldNotifyBlocks() {
 989  		D.F(
 990  			"filtering block %d (%s) with %d transactions %s",
 991  			height, block.BlockHash(), len(block.Transactions),
 992  		)
 993  	}
 994  	// Create a block details template to use for all of the confirmed transactions found within this block.
 995  	blockHash := block.BlockHash()
 996  	blockDetails := &btcjson.BlockDetails{
 997  		Hash:   blockHash.String(),
 998  		Height: height,
 999  		Time:   block.Header.Timestamp.Unix(),
1000  	}
1001  	// Now, we'll through all of the transactions in the block keeping track of any relevant to the caller.
1002  	var relevantTxs []*tm.TxRecord
1003  	confirmedTxs := make(map[chainhash.Hash]struct{})
1004  	for i, tx := range block.Transactions {
1005  		// Update the index in the block details with the index of this
1006  		// transaction.
1007  		blockDetails.Index = i
1008  		isRelevant, rec, e := c.filterTx(tx, blockDetails, notify)
1009  		if e != nil {
1010  			W.F(
1011  				"Unable to filter transaction %v: %v",
1012  				tx.TxHash(), e,
1013  			)
1014  			continue
1015  		}
1016  		if isRelevant {
1017  			relevantTxs = append(relevantTxs, rec)
1018  			confirmedTxs[tx.TxHash()] = struct{}{}
1019  		}
1020  	}
1021  	// Update the expiration map by setting the block's confirmed transactions and deleting any in the mempool that were
1022  	// confirmed over 288 blocks ago.
1023  	c.watchMtx.Lock()
1024  	c.expiredMempool[height] = confirmedTxs
1025  	if oldBlock, ok := c.expiredMempool[height-288]; ok {
1026  		for txHash := range oldBlock {
1027  			delete(c.mempool, txHash)
1028  		}
1029  		delete(c.expiredMempool, height-288)
1030  	}
1031  	c.watchMtx.Unlock()
1032  	if notify {
1033  		c.onFilteredBlockConnected(height, &block.Header, relevantTxs)
1034  		c.onBlockConnected(&blockHash, height, block.Header.Timestamp)
1035  	}
1036  	return relevantTxs, nil
1037  }
1038  
1039  // filterTx determines whether a transaction is relevant to the client by inspecting the client's different filters.
1040  func (c *BitcoindClient) filterTx(
1041  	tx *wire.MsgTx,
1042  	blockDetails *btcjson.BlockDetails,
1043  	notify bool,
1044  ) (bool, *tm.TxRecord, error) {
1045  	txDetails := util.NewTx(tx)
1046  	if blockDetails != nil {
1047  		txDetails.SetIndex(blockDetails.Index)
1048  	}
1049  	rec, e := tm.NewTxRecordFromMsgTx(txDetails.MsgTx(), time.Now())
1050  	if e != nil {
1051  		E.Ln(
1052  			"Cannot create transaction record for relevant tx:", e,
1053  		)
1054  		return false, nil, e
1055  	}
1056  	if blockDetails != nil {
1057  		rec.Received = time.Unix(blockDetails.Time, 0)
1058  	}
1059  	// We'll begin the filtering process by holding the lock to ensure we match exactly against what's currently in the
1060  	// filters.
1061  	c.watchMtx.Lock()
1062  	defer c.watchMtx.Unlock()
1063  	// If we've already seen this transaction and it's now been confirmed, then we'll shortcut the filter process by
1064  	// immediately sending a notification to the caller that the filter matches.
1065  	if _, ok := c.mempool[tx.TxHash()]; ok {
1066  		if notify && blockDetails != nil {
1067  			c.onRelevantTx(rec, blockDetails)
1068  		}
1069  		return true, rec, nil
1070  	}
1071  	// Otherwise, this is a new transaction we have yet to see. We'll need to determine if this transaction is somehow
1072  	// relevant to the caller.
1073  	var isRelevant bool
1074  	// We'll start by cycling through its outputs to determine if it pays to any of the currently watched addresses. If
1075  	// an output matches, we'll add it to our watch list.
1076  	for i, out := range tx.TxOut {
1077  		var addrs []btcaddr.Address
1078  		_, addrs, _, e = txscript.ExtractPkScriptAddrs(
1079  			out.PkScript, c.chainParams,
1080  		)
1081  		if e != nil {
1082  			D.F(
1083  				"Unable to parse output script in %s:%d: %v %s",
1084  				tx.TxHash(), i, e,
1085  			)
1086  			continue
1087  		}
1088  		for _, addr := range addrs {
1089  			if _, ok := c.watchedAddresses[addr.String()]; ok {
1090  				isRelevant = true
1091  				op := wire.OutPoint{
1092  					Hash:  tx.TxHash(),
1093  					Index: uint32(i),
1094  				}
1095  				c.watchedOutPoints[op] = struct{}{}
1096  			}
1097  		}
1098  	}
1099  	// If the transaction didn't pay to any of our watched addresses, we'll check if we're currently watching for the
1100  	// hash of this transaction.
1101  	if !isRelevant {
1102  		if _, ok := c.watchedTxs[tx.TxHash()]; ok {
1103  			isRelevant = true
1104  		}
1105  	}
1106  	// If the transaction didn't pay to any of our watched hashes, we'll check if it spends any of our watched
1107  	// outpoints.
1108  	if !isRelevant {
1109  		for _, in := range tx.TxIn {
1110  			if _, ok := c.watchedOutPoints[in.PreviousOutPoint]; ok {
1111  				isRelevant = true
1112  				break
1113  			}
1114  		}
1115  	}
1116  	// If the transaction is not relevant to us, we can simply exit.
1117  	if !isRelevant {
1118  		return false, rec, nil
1119  	}
1120  	// Otherwise, the transaction matched our filters, so we should dispatch a notification for it. If it's still
1121  	// unconfirmed, we'll include it in our mempool so that it can also be notified as part of FilteredBlockConnected
1122  	// once it confirms.
1123  	if blockDetails == nil {
1124  		c.mempool[tx.TxHash()] = struct{}{}
1125  	}
1126  	c.onRelevantTx(rec, blockDetails)
1127  	return true, rec, nil
1128  }
1129