package chainclient import ( "errors" "fmt" "github.com/p9c/p9/pkg/btcaddr" "github.com/p9c/p9/pkg/chaincfg" "sync" "time" "github.com/p9c/p9/pkg/qu" sac "github.com/p9c/p9/cmd/spv" "github.com/p9c/p9/pkg/chainhash" "github.com/p9c/p9/pkg/gcs" "github.com/p9c/p9/pkg/gcs/builder" "github.com/p9c/p9/pkg/rpcclient" "github.com/p9c/p9/pkg/txscript" "github.com/p9c/p9/pkg/util" "github.com/p9c/p9/pkg/waddrmgr" "github.com/p9c/p9/pkg/wire" "github.com/p9c/p9/pkg/wtxmgr" ) // NeutrinoClient is an implementation of the btcwallet chainclient.Interface interface. type NeutrinoClient struct { CS *sac.ChainService chainParams *chaincfg.Params // We currently support one rescan/notification goroutine per client rescan *sac.Rescan enqueueNotification chan interface{} dequeueNotification chan interface{} startTime time.Time lastProgressSent bool currentBlock chan *waddrmgr.BlockStamp quit qu.C rescanQuit qu.C rescanErr <-chan error wg sync.WaitGroup started bool scanning bool finished bool isRescan bool clientMtx sync.Mutex } // NewNeutrinoClient creates a new NeutrinoClient struct with a backing ChainService. func NewNeutrinoClient( chainParams *chaincfg.Params, chainService *sac.ChainService, ) *NeutrinoClient { return &NeutrinoClient{ CS: chainService, chainParams: chainParams, } } // BackEnd returns the name of the driver. func (s *NeutrinoClient) BackEnd() string { return "neutrino" } // Start replicates the RPC client's Start method. func (s *NeutrinoClient) Start() (e error) { s.CS.Start() s.clientMtx.Lock() defer s.clientMtx.Unlock() if !s.started { s.enqueueNotification = make(chan interface{}) s.dequeueNotification = make(chan interface{}) s.currentBlock = make(chan *waddrmgr.BlockStamp) s.quit = qu.T() s.started = true s.wg.Add(1) go func() { select { case s.enqueueNotification <- ClientConnected{}: case <-s.quit.Wait(): } }() go s.notificationHandler() } return nil } // Stop replicates the RPC client's Stop method. func (s *NeutrinoClient) Stop() { s.clientMtx.Lock() defer s.clientMtx.Unlock() if !s.started { return } s.quit.Q() s.started = false } // WaitForShutdown replicates the RPC client's WaitForShutdown method. func (s *NeutrinoClient) WaitForShutdown() { s.wg.Wait() } // GetBlock replicates the RPC client's GetBlock command. func (s *NeutrinoClient) GetBlock(hash *chainhash.Hash) (*wire.Block, error) { // TODO(roasbeef): add a block cache? // * which evication strategy? depends on use case // Should the block cache be INSIDE neutrino instead of in btcwallet? block, e := s.CS.GetBlock(*hash) if e != nil { return nil, e } return block.WireBlock(), nil } // GetBlockHeight gets the height of a block by its hash. It serves as a replacement for the use of // GetBlockVerboseTxAsync for the wallet package since we can't actually return a FutureGetBlockVerboseResult because // the underlying type is private to rpcclient. func (s *NeutrinoClient) GetBlockHeight(hash *chainhash.Hash) (int32, error) { return s.CS.GetBlockHeight(hash) } // GetBestBlock replicates the RPC client's GetBestBlock command. func (s *NeutrinoClient) GetBestBlock() (h *chainhash.Hash, height int32, e error) { var chainTip *waddrmgr.BlockStamp chainTip, e = s.CS.BestBlock() if e != nil { return nil, 0, e } return &chainTip.Hash, chainTip.Height, nil } // BlockStamp returns the latest block notified by the client, or an error if the client has been shut down. func (s *NeutrinoClient) BlockStamp() (*waddrmgr.BlockStamp, error) { select { case bs := <-s.currentBlock: return bs, nil case <-s.quit.Wait(): return nil, errors.New("disconnected") } } // GetBlockHash returns the block hash for the given height, or an error if the client has been shut down or the hash at // the block height doesn't exist or is unknown. func (s *NeutrinoClient) GetBlockHash(height int64) (*chainhash.Hash, error) { return s.CS.GetBlockHash(height) } // GetBlockHeader returns the block header for the given block hash, or an error if the client has been shut down or the // hash doesn't exist or is unknown. func (s *NeutrinoClient) GetBlockHeader( blockHash *chainhash.Hash, ) (*wire.BlockHeader, error) { return s.CS.GetBlockHeader(blockHash) } // SendRawTransaction replicates the RPC client's SendRawTransaction command. func (s *NeutrinoClient) SendRawTransaction(tx *wire.MsgTx, allowHighFees bool) ( *chainhash.Hash, error, ) { e := s.CS.SendTransaction(tx) if e != nil { return nil, e } hash := tx.TxHash() return &hash, nil } // FilterBlocks scans the blocks contained in the FilterBlocksRequest for any addresses of interest. For each requested // block, the corresponding compact filter will first be checked for matches, skipping those that do not report // anything. If the filter returns a positive match, the full block will be fetched and filtered. This method returns a // FilterBlocksResponse for the first block containing a matching address. If no matches are found in the range of // blocks requested, the returned response will be nil. func (s *NeutrinoClient) FilterBlocks( req *FilterBlocksRequest, ) (*FilterBlocksResponse, error) { blockFilterer := NewBlockFilterer(s.chainParams, req) // Construct the watchlist using the addresses and outpoints contained in the filter blocks request. watchList, e := buildFilterBlocksWatchList(req) if e != nil { return nil, e } // Iterate over the requested blocks, fetching the compact filter for each one, and matching it against the // watchlist generated above. If the filter returns a positive match, the full block is then requested and scanned // for addresses using the block filterer. for i, blk := range req.Blocks { filter, e := s.pollCFilter(&blk.Hash) if e != nil { return nil, e } // Skip any empty filters. if filter == nil || filter.N() == 0 { continue } key := builder.DeriveKey(&blk.Hash) matched, e := filter.MatchAny(key, watchList) if e != nil { return nil, e } else if !matched { continue } T.F( "fetching block height=%d hash=%v", blk.Height, blk.Hash, ) // TODO(conner): can optimize bandwidth by only fetching // stripped blocks rawBlock, e := s.GetBlock(&blk.Hash) if e != nil { return nil, e } if !blockFilterer.FilterBlock(rawBlock) { continue } // If any external or internal addresses were detected in this block, we return them to the caller so that the // rescan windows can widened with subsequent addresses. The `BatchIndex` is returned so that the caller can // compute the *next* block from which to begin again. resp := &FilterBlocksResponse{ BatchIndex: uint32(i), BlockMeta: blk, FoundExternalAddrs: blockFilterer.FoundExternal, FoundInternalAddrs: blockFilterer.FoundInternal, FoundOutPoints: blockFilterer.FoundOutPoints, RelevantTxns: blockFilterer.RelevantTxns, } return resp, nil } // No addresses were found for this range. return nil, nil } // buildFilterBlocksWatchList constructs a watchlist used for matching against a cfilter from a FilterBlocksRequest. The // watchlist will be populated with all external addresses, internal addresses, and outpoints contained in the request. func buildFilterBlocksWatchList(req *FilterBlocksRequest) ([][]byte, error) { // Construct a watch list containing the script addresses of all internal and external addresses that were // requested, in addition to the set of outpoints currently being watched. watchListSize := len(req.ExternalAddrs) + len(req.InternalAddrs) + len(req.WatchedOutPoints) watchList := make([][]byte, 0, watchListSize) for _, addr := range req.ExternalAddrs { p2shAddr, e := txscript.PayToAddrScript(addr) if e != nil { return nil, e } watchList = append(watchList, p2shAddr) } for _, addr := range req.InternalAddrs { p2shAddr, e := txscript.PayToAddrScript(addr) if e != nil { return nil, e } watchList = append(watchList, p2shAddr) } for _, addr := range req.WatchedOutPoints { addr, e := txscript.PayToAddrScript(addr) if e != nil { return nil, e } watchList = append(watchList, addr) } return watchList, nil } // pollCFilter attempts to fetch a CFilter from the neutrino client. This is used to get around the fact that the filter // headers may lag behind the highest known block header. func (s *NeutrinoClient) pollCFilter(hash *chainhash.Hash) (filter *gcs.Filter, e error) { var ( count int ) const maxFilterRetries = 50 for count < maxFilterRetries { if count > 0 { time.Sleep(100 * time.Millisecond) } filter, e = s.CS.GetCFilter(*hash, wire.GCSFilterRegular) if e != nil { count++ continue } return filter, nil } return nil, e } // Rescan replicates the RPC client's Rescan command. func (s *NeutrinoClient) Rescan( startHash *chainhash.Hash, addrs []btcaddr.Address, outPoints map[wire.OutPoint]btcaddr.Address, ) (e error) { s.clientMtx.Lock() defer s.clientMtx.Unlock() if !s.started { return fmt.Errorf( "can't do a rescan when the chain client " + "is not started", ) } if s.scanning { // Restart the rescan by killing the existing rescan. s.rescanQuit.Q() s.clientMtx.Unlock() s.rescan.WaitForShutdown() s.clientMtx.Lock() s.rescan = nil s.rescanErr = nil } s.rescanQuit = qu.T() s.scanning = true s.finished = false s.lastProgressSent = false s.isRescan = true bestBlock, e := s.CS.BestBlock() if e != nil { return fmt.Errorf("can't get chain service's best block: %s", e) } header, e := s.CS.GetBlockHeader(&bestBlock.Hash) if e != nil { return fmt.Errorf( "can't get block header for hash %v: %s", bestBlock.Hash, e, ) } // If the wallet is already fully caught up, or the rescan has started with state that indicates a "fresh" wallet, // we'll send a notification indicating the rescan has "finished". if header.BlockHash() == *startHash { s.finished = true select { case s.enqueueNotification <- &RescanFinished{ Hash: startHash, Height: bestBlock.Height, Time: header.Timestamp, }: case <-s.quit.Wait(): return nil case <-s.rescanQuit.Wait(): return nil } } var inputsToWatch []sac.InputWithScript for op, addr := range outPoints { addrScript, e := txscript.PayToAddrScript(addr) if e != nil { } inputsToWatch = append( inputsToWatch, sac.InputWithScript{ OutPoint: op, PkScript: addrScript, }, ) } newRescan := s.CS.NewRescan( sac.NotificationHandlers( rpcclient.NotificationHandlers{ OnBlockConnected: s.onBlockConnected, OnFilteredBlockConnected: s.onFilteredBlockConnected, OnBlockDisconnected: s.onBlockDisconnected, }, ), sac.StartBlock(&waddrmgr.BlockStamp{Hash: *startHash}), sac.StartTime(s.startTime), sac.QuitChan(s.rescanQuit), sac.WatchAddrs(addrs...), sac.WatchInputs(inputsToWatch...), ) s.rescan = newRescan s.rescanErr = s.rescan.Start() return nil } // NotifyBlocks replicates the RPC client's NotifyBlocks command. func (s *NeutrinoClient) NotifyBlocks() (e error) { s.clientMtx.Lock() // If we're scanning, we're already notifying on blocks. Otherwise, start a rescan without watching any addresses. if !s.scanning { s.clientMtx.Unlock() return s.NotifyReceived([]btcaddr.Address{}) } s.clientMtx.Unlock() return nil } // NotifyReceived replicates the RPC client's NotifyReceived command. func (s *NeutrinoClient) NotifyReceived(addrs []btcaddr.Address) (e error) { s.clientMtx.Lock() // If we have a rescan running, we just need to add the appropriate addresses to the watch list. if s.scanning { s.clientMtx.Unlock() return s.rescan.Update(sac.AddAddrs(addrs...)) } s.rescanQuit = qu.T() s.scanning = true // Don't need RescanFinished or RescanProgress notifications. s.finished = true s.lastProgressSent = true // Rescan with just the specified addresses. newRescan := s.CS.NewRescan( sac.NotificationHandlers( rpcclient.NotificationHandlers{ OnBlockConnected: s.onBlockConnected, OnFilteredBlockConnected: s.onFilteredBlockConnected, OnBlockDisconnected: s.onBlockDisconnected, }, ), sac.StartTime(s.startTime), sac.QuitChan(s.rescanQuit), sac.WatchAddrs(addrs...), ) s.rescan = newRescan s.rescanErr = s.rescan.Start() s.clientMtx.Unlock() return nil } // Notifications replicates the RPC client's Notifications method. func (s *NeutrinoClient) Notifications() <-chan interface{} { return s.dequeueNotification } // SetStartTime is a non-interface method to set the birthday of the wallet using this object. Since only a single // rescan at a time is currently supported, only one birthday needs to be set. This does not fully restart a running // rescan, so should not be used to update a rescan while it is running. TODO: When factoring out to multiple rescans // per Neutrino client, add a birthday per client. func (s *NeutrinoClient) SetStartTime(startTime time.Time) { s.clientMtx.Lock() defer s.clientMtx.Unlock() s.startTime = startTime } // onFilteredBlockConnected sends appropriate notifications to the notification channel. func (s *NeutrinoClient) onFilteredBlockConnected( height int32, header *wire.BlockHeader, relevantTxs []*util.Tx, ) { ntfn := FilteredBlockConnected{ Block: &wtxmgr.BlockMeta{ Block: wtxmgr.Block{ Hash: header.BlockHash(), Height: height, }, Time: header.Timestamp, }, } for _, tx := range relevantTxs { rec, e := wtxmgr.NewTxRecordFromMsgTx( tx.MsgTx(), header.Timestamp, ) if e != nil { E.Ln( "cannot create transaction record for relevant tx:", e, ) // TODO(aakselrod): Return? continue } ntfn.RelevantTxs = append(ntfn.RelevantTxs, rec) } select { case s.enqueueNotification <- ntfn: case <-s.quit.Wait(): return case <-s.rescanQuit.Wait(): return } // Handle RescanFinished notification if required. bs, e := s.CS.BestBlock() if e != nil { E.Ln("can't get chain service's best block:", e) return } if bs.Hash == header.BlockHash() { // Only send the RescanFinished notification once. s.clientMtx.Lock() if s.finished { s.clientMtx.Unlock() return } // Only send the RescanFinished notification once the underlying chain service sees itself as current. current := s.CS.IsCurrent() && s.lastProgressSent if current { s.finished = true } s.clientMtx.Unlock() if current { select { case s.enqueueNotification <- &RescanFinished{ Hash: &bs.Hash, Height: bs.Height, Time: header.Timestamp, }: case <-s.quit.Wait(): return case <-s.rescanQuit.Wait(): return } } } } // onBlockDisconnected sends appropriate notifications to the notification channel. func (s *NeutrinoClient) onBlockDisconnected( hash *chainhash.Hash, height int32, t time.Time, ) { select { case s.enqueueNotification <- BlockDisconnected{ Block: wtxmgr.Block{ Hash: *hash, Height: height, }, Time: t, }: case <-s.quit.Wait(): case <-s.rescanQuit.Wait(): } } func (s *NeutrinoClient) onBlockConnected( hash *chainhash.Hash, height int32, time time.Time, ) { // TODO: Move this closure out and parameterize it? Is it useful // outside here? sendRescanProgress := func() { select { case s.enqueueNotification <- &RescanProgress{ Hash: hash, Height: height, Time: time, }: case <-s.quit.Wait(): case <-s.rescanQuit.Wait(): } } // Only send BlockConnected notification if we're processing blocks before the birthday. Otherwise, we can just // update using RescanProgress notifications. if time.Before(s.startTime) { // Send a RescanProgress notification every 10K blocks. if height%10000 == 0 { s.clientMtx.Lock() shouldSend := s.isRescan && !s.finished s.clientMtx.Unlock() if shouldSend { sendRescanProgress() } } } else { // Send a RescanProgress notification if we're just going over the boundary between pre-birthday and // post-birthday blocks, and note that we've sent it. s.clientMtx.Lock() if !s.lastProgressSent { shouldSend := s.isRescan && !s.finished if shouldSend { s.clientMtx.Unlock() sendRescanProgress() s.clientMtx.Lock() s.lastProgressSent = true } } s.clientMtx.Unlock() select { case s.enqueueNotification <- BlockConnected{ Block: wtxmgr.Block{ Hash: *hash, Height: height, }, Time: time, }: case <-s.quit.Wait(): case <-s.rescanQuit.Wait(): } } } // notificationHandler queues and dequeues notifications. There are currently no bounds on the queue, so the dequeue // channel should be read continually to avoid running out of memory. func (s *NeutrinoClient) notificationHandler() { hash, height, e := s.GetBestBlock() if e != nil { E.F("failed to get best block from chain service:", e) s.Stop() s.wg.Done() return } bs := &waddrmgr.BlockStamp{Hash: *hash, Height: height} // TODO: Rather than leaving this as an unbounded queue for all types of notifications, try dropping ones where a // later enqueued notification can fully invalidate one waiting to be processed. For example, blockconnected // notifications for greater block heights can remove the need to process earlier blockconnected notifications still // waiting here. var notifications []interface{} enqueue := s.enqueueNotification var dequeue chan interface{} var next interface{} out: for { s.clientMtx.Lock() rescanErr := s.rescanErr s.clientMtx.Unlock() select { case n, ok := <-enqueue: if !ok { // If no notifications are queued for handling, the queue is finished. if len(notifications) == 0 { break out } // nil channel so no more reads can occur. enqueue = nil continue } if len(notifications) == 0 { next = n dequeue = s.dequeueNotification } notifications = append(notifications, n) case dequeue <- next: if n, ok := next.(BlockConnected); ok { bs = &waddrmgr.BlockStamp{ Height: n.Height, Hash: n.Hash, } } notifications[0] = nil notifications = notifications[1:] if len(notifications) != 0 { next = notifications[0] } else { // If no more notifications can be enqueued, the queue is finished. if enqueue == nil { break out } dequeue = nil } case e := <-rescanErr: if e != nil { E.Ln("neutrino rescan ended with error:", e) } case s.currentBlock <- bs: case <-s.quit.Wait(): break out } } s.Stop() close(s.dequeueNotification) s.wg.Done() }