1 package chainclient
2 3 import (
4 "errors"
5 "fmt"
6 "github.com/p9c/p9/pkg/btcaddr"
7 "github.com/p9c/p9/pkg/chaincfg"
8 "sync"
9 "time"
10 11 "github.com/p9c/p9/pkg/qu"
12 13 sac "github.com/p9c/p9/cmd/spv"
14 "github.com/p9c/p9/pkg/chainhash"
15 "github.com/p9c/p9/pkg/gcs"
16 "github.com/p9c/p9/pkg/gcs/builder"
17 "github.com/p9c/p9/pkg/rpcclient"
18 "github.com/p9c/p9/pkg/txscript"
19 "github.com/p9c/p9/pkg/util"
20 "github.com/p9c/p9/pkg/waddrmgr"
21 "github.com/p9c/p9/pkg/wire"
22 "github.com/p9c/p9/pkg/wtxmgr"
23 )
// NeutrinoClient is an implementation of the btcwallet chainclient.Interface interface.
//
// It wraps a ChainService and turns the callbacks of a single rescan into a stream of notifications that is exposed
// through Notifications.
type NeutrinoClient struct {
	// CS is the backing chain service; block, header, filter and rescan requests are delegated to it.
	CS *sac.ChainService
	// chainParams is handed to the block filterer built by FilterBlocks.
	chainParams *chaincfg.Params
	// We currently support one rescan/notification goroutine per client
	rescan *sac.Rescan
	// enqueueNotification is the channel the rescan callbacks and Rescan send notifications on; notificationHandler
	// reads from it.
	enqueueNotification chan interface{}
	// dequeueNotification is the channel returned by Notifications; notificationHandler feeds it and closes it when
	// its loop ends.
	dequeueNotification chan interface{}
	// startTime is the wallet birthday set through SetStartTime and passed to new rescans.
	startTime time.Time
	// lastProgressSent records that the RescanProgress notification for the pre-/post-birthday boundary has been sent
	// (or, after NotifyReceived, that it is not wanted).
	lastProgressSent bool
	// currentBlock is offered the latest block stamp by notificationHandler and is read by BlockStamp.
	currentBlock chan *waddrmgr.BlockStamp
	// quit is created by Start and signalled by Stop.
	quit qu.C
	// rescanQuit is created for every new rescan and passed to it as its quit channel; Rescan signals it to restart a
	// running rescan.
	rescanQuit qu.C
	// rescanErr is the channel returned by the current rescan's Start call; notificationHandler logs errors read
	// from it.
	rescanErr <-chan error
	// wg is incremented once by Start and released by notificationHandler; WaitForShutdown waits on it.
	wg sync.WaitGroup
	// started is set by Start and cleared by Stop.
	started bool
	// scanning is set once Rescan or NotifyReceived has set up a rescan.
	scanning bool
	// finished is set when the RescanFinished notification has been sent (or, after NotifyReceived, is not wanted).
	finished bool
	// isRescan is set by Rescan; RescanProgress notifications are only sent while it is set and finished is not.
	isRescan bool
	// clientMtx is held around most accesses to the rescan state and the flags above.
	clientMtx sync.Mutex
}
46 47 // NewNeutrinoClient creates a new NeutrinoClient struct with a backing ChainService.
48 func NewNeutrinoClient(
49 chainParams *chaincfg.Params,
50 chainService *sac.ChainService,
51 ) *NeutrinoClient {
52 return &NeutrinoClient{
53 CS: chainService,
54 chainParams: chainParams,
55 }
56 }
57 58 // BackEnd returns the name of the driver.
59 func (s *NeutrinoClient) BackEnd() string {
60 return "neutrino"
61 }
62 63 // Start replicates the RPC client's Start method.
64 func (s *NeutrinoClient) Start() (e error) {
65 s.CS.Start()
66 s.clientMtx.Lock()
67 defer s.clientMtx.Unlock()
68 if !s.started {
69 s.enqueueNotification = make(chan interface{})
70 s.dequeueNotification = make(chan interface{})
71 s.currentBlock = make(chan *waddrmgr.BlockStamp)
72 s.quit = qu.T()
73 s.started = true
74 s.wg.Add(1)
75 go func() {
76 select {
77 case s.enqueueNotification <- ClientConnected{}:
78 case <-s.quit.Wait():
79 }
80 }()
81 go s.notificationHandler()
82 }
83 return nil
84 }
85 86 // Stop replicates the RPC client's Stop method.
87 func (s *NeutrinoClient) Stop() {
88 s.clientMtx.Lock()
89 defer s.clientMtx.Unlock()
90 if !s.started {
91 return
92 }
93 s.quit.Q()
94 s.started = false
95 }
96 97 // WaitForShutdown replicates the RPC client's WaitForShutdown method.
98 func (s *NeutrinoClient) WaitForShutdown() {
99 s.wg.Wait()
100 }
101 102 // GetBlock replicates the RPC client's GetBlock command.
103 func (s *NeutrinoClient) GetBlock(hash *chainhash.Hash) (*wire.Block, error) {
104 // TODO(roasbeef): add a block cache?
105 // * which evication strategy? depends on use case
106 // Should the block cache be INSIDE neutrino instead of in btcwallet?
107 block, e := s.CS.GetBlock(*hash)
108 if e != nil {
109 return nil, e
110 }
111 return block.WireBlock(), nil
112 }
113 114 // GetBlockHeight gets the height of a block by its hash. It serves as a replacement for the use of
115 // GetBlockVerboseTxAsync for the wallet package since we can't actually return a FutureGetBlockVerboseResult because
116 // the underlying type is private to rpcclient.
117 func (s *NeutrinoClient) GetBlockHeight(hash *chainhash.Hash) (int32, error) {
118 return s.CS.GetBlockHeight(hash)
119 }
120 121 // GetBestBlock replicates the RPC client's GetBestBlock command.
122 func (s *NeutrinoClient) GetBestBlock() (h *chainhash.Hash, height int32, e error) {
123 var chainTip *waddrmgr.BlockStamp
124 chainTip, e = s.CS.BestBlock()
125 if e != nil {
126 return nil, 0, e
127 }
128 return &chainTip.Hash, chainTip.Height, nil
129 }
130 131 // BlockStamp returns the latest block notified by the client, or an error if the client has been shut down.
132 func (s *NeutrinoClient) BlockStamp() (*waddrmgr.BlockStamp, error) {
133 select {
134 case bs := <-s.currentBlock:
135 return bs, nil
136 case <-s.quit.Wait():
137 return nil, errors.New("disconnected")
138 }
139 }
140 141 // GetBlockHash returns the block hash for the given height, or an error if the client has been shut down or the hash at
142 // the block height doesn't exist or is unknown.
143 func (s *NeutrinoClient) GetBlockHash(height int64) (*chainhash.Hash, error) {
144 return s.CS.GetBlockHash(height)
145 }
146 147 // GetBlockHeader returns the block header for the given block hash, or an error if the client has been shut down or the
148 // hash doesn't exist or is unknown.
149 func (s *NeutrinoClient) GetBlockHeader(
150 blockHash *chainhash.Hash,
151 ) (*wire.BlockHeader, error) {
152 return s.CS.GetBlockHeader(blockHash)
153 }
154 155 // SendRawTransaction replicates the RPC client's SendRawTransaction command.
156 func (s *NeutrinoClient) SendRawTransaction(tx *wire.MsgTx, allowHighFees bool) (
157 *chainhash.Hash, error,
158 ) {
159 e := s.CS.SendTransaction(tx)
160 if e != nil {
161 return nil, e
162 }
163 hash := tx.TxHash()
164 return &hash, nil
165 }
166 167 // FilterBlocks scans the blocks contained in the FilterBlocksRequest for any addresses of interest. For each requested
168 // block, the corresponding compact filter will first be checked for matches, skipping those that do not report
169 // anything. If the filter returns a positive match, the full block will be fetched and filtered. This method returns a
170 // FilterBlocksResponse for the first block containing a matching address. If no matches are found in the range of
171 // blocks requested, the returned response will be nil.
172 func (s *NeutrinoClient) FilterBlocks(
173 req *FilterBlocksRequest,
174 ) (*FilterBlocksResponse, error) {
175 blockFilterer := NewBlockFilterer(s.chainParams, req)
176 // Construct the watchlist using the addresses and outpoints contained in the filter blocks request.
177 watchList, e := buildFilterBlocksWatchList(req)
178 if e != nil {
179 return nil, e
180 }
181 // Iterate over the requested blocks, fetching the compact filter for each one, and matching it against the
182 // watchlist generated above. If the filter returns a positive match, the full block is then requested and scanned
183 // for addresses using the block filterer.
184 for i, blk := range req.Blocks {
185 filter, e := s.pollCFilter(&blk.Hash)
186 if e != nil {
187 return nil, e
188 }
189 // Skip any empty filters.
190 if filter == nil || filter.N() == 0 {
191 continue
192 }
193 key := builder.DeriveKey(&blk.Hash)
194 matched, e := filter.MatchAny(key, watchList)
195 if e != nil {
196 return nil, e
197 } else if !matched {
198 continue
199 }
200 T.F(
201 "fetching block height=%d hash=%v",
202 blk.Height, blk.Hash,
203 )
204 // TODO(conner): can optimize bandwidth by only fetching
205 // stripped blocks
206 rawBlock, e := s.GetBlock(&blk.Hash)
207 if e != nil {
208 return nil, e
209 }
210 if !blockFilterer.FilterBlock(rawBlock) {
211 continue
212 }
213 // If any external or internal addresses were detected in this block, we return them to the caller so that the
214 // rescan windows can widened with subsequent addresses. The `BatchIndex` is returned so that the caller can
215 // compute the *next* block from which to begin again.
216 resp := &FilterBlocksResponse{
217 BatchIndex: uint32(i),
218 BlockMeta: blk,
219 FoundExternalAddrs: blockFilterer.FoundExternal,
220 FoundInternalAddrs: blockFilterer.FoundInternal,
221 FoundOutPoints: blockFilterer.FoundOutPoints,
222 RelevantTxns: blockFilterer.RelevantTxns,
223 }
224 return resp, nil
225 }
226 // No addresses were found for this range.
227 return nil, nil
228 }
229 230 // buildFilterBlocksWatchList constructs a watchlist used for matching against a cfilter from a FilterBlocksRequest. The
231 // watchlist will be populated with all external addresses, internal addresses, and outpoints contained in the request.
232 func buildFilterBlocksWatchList(req *FilterBlocksRequest) ([][]byte, error) {
233 // Construct a watch list containing the script addresses of all internal and external addresses that were
234 // requested, in addition to the set of outpoints currently being watched.
235 watchListSize := len(req.ExternalAddrs) +
236 len(req.InternalAddrs) +
237 len(req.WatchedOutPoints)
238 watchList := make([][]byte, 0, watchListSize)
239 for _, addr := range req.ExternalAddrs {
240 p2shAddr, e := txscript.PayToAddrScript(addr)
241 if e != nil {
242 return nil, e
243 }
244 watchList = append(watchList, p2shAddr)
245 }
246 for _, addr := range req.InternalAddrs {
247 p2shAddr, e := txscript.PayToAddrScript(addr)
248 if e != nil {
249 return nil, e
250 }
251 watchList = append(watchList, p2shAddr)
252 }
253 for _, addr := range req.WatchedOutPoints {
254 addr, e := txscript.PayToAddrScript(addr)
255 if e != nil {
256 return nil, e
257 }
258 watchList = append(watchList, addr)
259 }
260 return watchList, nil
261 }
262 263 // pollCFilter attempts to fetch a CFilter from the neutrino client. This is used to get around the fact that the filter
264 // headers may lag behind the highest known block header.
265 func (s *NeutrinoClient) pollCFilter(hash *chainhash.Hash) (filter *gcs.Filter, e error) {
266 var (
267 count int
268 )
269 const maxFilterRetries = 50
270 for count < maxFilterRetries {
271 if count > 0 {
272 time.Sleep(100 * time.Millisecond)
273 }
274 filter, e = s.CS.GetCFilter(*hash, wire.GCSFilterRegular)
275 if e != nil {
276 count++
277 continue
278 }
279 return filter, nil
280 }
281 return nil, e
282 }
283 284 // Rescan replicates the RPC client's Rescan command.
285 func (s *NeutrinoClient) Rescan(
286 startHash *chainhash.Hash, addrs []btcaddr.Address,
287 outPoints map[wire.OutPoint]btcaddr.Address,
288 ) (e error) {
289 s.clientMtx.Lock()
290 defer s.clientMtx.Unlock()
291 if !s.started {
292 return fmt.Errorf(
293 "can't do a rescan when the chain client " +
294 "is not started",
295 )
296 }
297 if s.scanning {
298 // Restart the rescan by killing the existing rescan.
299 s.rescanQuit.Q()
300 s.clientMtx.Unlock()
301 s.rescan.WaitForShutdown()
302 s.clientMtx.Lock()
303 s.rescan = nil
304 s.rescanErr = nil
305 }
306 s.rescanQuit = qu.T()
307 s.scanning = true
308 s.finished = false
309 s.lastProgressSent = false
310 s.isRescan = true
311 bestBlock, e := s.CS.BestBlock()
312 if e != nil {
313 return fmt.Errorf("can't get chain service's best block: %s", e)
314 }
315 header, e := s.CS.GetBlockHeader(&bestBlock.Hash)
316 if e != nil {
317 return fmt.Errorf(
318 "can't get block header for hash %v: %s",
319 bestBlock.Hash, e,
320 )
321 }
322 // If the wallet is already fully caught up, or the rescan has started with state that indicates a "fresh" wallet,
323 // we'll send a notification indicating the rescan has "finished".
324 if header.BlockHash() == *startHash {
325 s.finished = true
326 select {
327 case s.enqueueNotification <- &RescanFinished{
328 Hash: startHash,
329 Height: bestBlock.Height,
330 Time: header.Timestamp,
331 }:
332 case <-s.quit.Wait():
333 return nil
334 case <-s.rescanQuit.Wait():
335 return nil
336 }
337 }
338 var inputsToWatch []sac.InputWithScript
339 for op, addr := range outPoints {
340 addrScript, e := txscript.PayToAddrScript(addr)
341 if e != nil {
342 }
343 inputsToWatch = append(
344 inputsToWatch, sac.InputWithScript{
345 OutPoint: op,
346 PkScript: addrScript,
347 },
348 )
349 }
350 newRescan := s.CS.NewRescan(
351 sac.NotificationHandlers(
352 rpcclient.NotificationHandlers{
353 OnBlockConnected: s.onBlockConnected,
354 OnFilteredBlockConnected: s.onFilteredBlockConnected,
355 OnBlockDisconnected: s.onBlockDisconnected,
356 },
357 ),
358 sac.StartBlock(&waddrmgr.BlockStamp{Hash: *startHash}),
359 sac.StartTime(s.startTime),
360 sac.QuitChan(s.rescanQuit),
361 sac.WatchAddrs(addrs...),
362 sac.WatchInputs(inputsToWatch...),
363 )
364 s.rescan = newRescan
365 s.rescanErr = s.rescan.Start()
366 return nil
367 }
368 369 // NotifyBlocks replicates the RPC client's NotifyBlocks command.
370 func (s *NeutrinoClient) NotifyBlocks() (e error) {
371 s.clientMtx.Lock()
372 // If we're scanning, we're already notifying on blocks. Otherwise, start a rescan without watching any addresses.
373 if !s.scanning {
374 s.clientMtx.Unlock()
375 return s.NotifyReceived([]btcaddr.Address{})
376 }
377 s.clientMtx.Unlock()
378 return nil
379 }
380 381 // NotifyReceived replicates the RPC client's NotifyReceived command.
382 func (s *NeutrinoClient) NotifyReceived(addrs []btcaddr.Address) (e error) {
383 s.clientMtx.Lock()
384 // If we have a rescan running, we just need to add the appropriate addresses to the watch list.
385 if s.scanning {
386 s.clientMtx.Unlock()
387 return s.rescan.Update(sac.AddAddrs(addrs...))
388 }
389 s.rescanQuit = qu.T()
390 s.scanning = true
391 // Don't need RescanFinished or RescanProgress notifications.
392 s.finished = true
393 s.lastProgressSent = true
394 // Rescan with just the specified addresses.
395 newRescan := s.CS.NewRescan(
396 sac.NotificationHandlers(
397 rpcclient.NotificationHandlers{
398 OnBlockConnected: s.onBlockConnected,
399 OnFilteredBlockConnected: s.onFilteredBlockConnected,
400 OnBlockDisconnected: s.onBlockDisconnected,
401 },
402 ),
403 sac.StartTime(s.startTime),
404 sac.QuitChan(s.rescanQuit),
405 sac.WatchAddrs(addrs...),
406 )
407 s.rescan = newRescan
408 s.rescanErr = s.rescan.Start()
409 s.clientMtx.Unlock()
410 return nil
411 }
412 413 // Notifications replicates the RPC client's Notifications method.
414 func (s *NeutrinoClient) Notifications() <-chan interface{} {
415 return s.dequeueNotification
416 }
417 418 // SetStartTime is a non-interface method to set the birthday of the wallet using this object. Since only a single
419 // rescan at a time is currently supported, only one birthday needs to be set. This does not fully restart a running
420 // rescan, so should not be used to update a rescan while it is running. TODO: When factoring out to multiple rescans
421 // per Neutrino client, add a birthday per client.
422 func (s *NeutrinoClient) SetStartTime(startTime time.Time) {
423 s.clientMtx.Lock()
424 defer s.clientMtx.Unlock()
425 s.startTime = startTime
426 }
427 428 // onFilteredBlockConnected sends appropriate notifications to the notification channel.
429 func (s *NeutrinoClient) onFilteredBlockConnected(
430 height int32,
431 header *wire.BlockHeader, relevantTxs []*util.Tx,
432 ) {
433 ntfn := FilteredBlockConnected{
434 Block: &wtxmgr.BlockMeta{
435 Block: wtxmgr.Block{
436 Hash: header.BlockHash(),
437 Height: height,
438 },
439 Time: header.Timestamp,
440 },
441 }
442 for _, tx := range relevantTxs {
443 rec, e := wtxmgr.NewTxRecordFromMsgTx(
444 tx.MsgTx(),
445 header.Timestamp,
446 )
447 if e != nil {
448 E.Ln(
449 "cannot create transaction record for relevant tx:", e,
450 )
451 // TODO(aakselrod): Return?
452 continue
453 }
454 ntfn.RelevantTxs = append(ntfn.RelevantTxs, rec)
455 }
456 select {
457 case s.enqueueNotification <- ntfn:
458 case <-s.quit.Wait():
459 return
460 case <-s.rescanQuit.Wait():
461 return
462 }
463 // Handle RescanFinished notification if required.
464 bs, e := s.CS.BestBlock()
465 if e != nil {
466 E.Ln("can't get chain service's best block:", e)
467 return
468 }
469 if bs.Hash == header.BlockHash() {
470 // Only send the RescanFinished notification once.
471 s.clientMtx.Lock()
472 if s.finished {
473 s.clientMtx.Unlock()
474 return
475 }
476 // Only send the RescanFinished notification once the underlying chain service sees itself as current.
477 current := s.CS.IsCurrent() && s.lastProgressSent
478 if current {
479 s.finished = true
480 }
481 s.clientMtx.Unlock()
482 if current {
483 select {
484 case s.enqueueNotification <- &RescanFinished{
485 Hash: &bs.Hash,
486 Height: bs.Height,
487 Time: header.Timestamp,
488 }:
489 case <-s.quit.Wait():
490 return
491 case <-s.rescanQuit.Wait():
492 return
493 }
494 }
495 }
496 }
497 498 // onBlockDisconnected sends appropriate notifications to the notification channel.
499 func (s *NeutrinoClient) onBlockDisconnected(
500 hash *chainhash.Hash, height int32,
501 t time.Time,
502 ) {
503 select {
504 case s.enqueueNotification <- BlockDisconnected{
505 Block: wtxmgr.Block{
506 Hash: *hash,
507 Height: height,
508 },
509 Time: t,
510 }:
511 case <-s.quit.Wait():
512 case <-s.rescanQuit.Wait():
513 }
514 }
515 func (s *NeutrinoClient) onBlockConnected(
516 hash *chainhash.Hash, height int32,
517 time time.Time,
518 ) {
519 // TODO: Move this closure out and parameterize it? Is it useful
520 // outside here?
521 sendRescanProgress := func() {
522 select {
523 case s.enqueueNotification <- &RescanProgress{
524 Hash: hash,
525 Height: height,
526 Time: time,
527 }:
528 case <-s.quit.Wait():
529 case <-s.rescanQuit.Wait():
530 }
531 }
532 // Only send BlockConnected notification if we're processing blocks before the birthday. Otherwise, we can just
533 // update using RescanProgress notifications.
534 if time.Before(s.startTime) {
535 // Send a RescanProgress notification every 10K blocks.
536 if height%10000 == 0 {
537 s.clientMtx.Lock()
538 shouldSend := s.isRescan && !s.finished
539 s.clientMtx.Unlock()
540 if shouldSend {
541 sendRescanProgress()
542 }
543 }
544 } else {
545 // Send a RescanProgress notification if we're just going over the boundary between pre-birthday and
546 // post-birthday blocks, and note that we've sent it.
547 s.clientMtx.Lock()
548 if !s.lastProgressSent {
549 shouldSend := s.isRescan && !s.finished
550 if shouldSend {
551 s.clientMtx.Unlock()
552 sendRescanProgress()
553 s.clientMtx.Lock()
554 s.lastProgressSent = true
555 }
556 }
557 s.clientMtx.Unlock()
558 select {
559 case s.enqueueNotification <- BlockConnected{
560 Block: wtxmgr.Block{
561 Hash: *hash,
562 Height: height,
563 },
564 Time: time,
565 }:
566 case <-s.quit.Wait():
567 case <-s.rescanQuit.Wait():
568 }
569 }
570 }
571 572 // notificationHandler queues and dequeues notifications. There are currently no bounds on the queue, so the dequeue
573 // channel should be read continually to avoid running out of memory.
574 func (s *NeutrinoClient) notificationHandler() {
575 hash, height, e := s.GetBestBlock()
576 if e != nil {
577 E.F("failed to get best block from chain service:", e)
578 s.Stop()
579 s.wg.Done()
580 return
581 }
582 bs := &waddrmgr.BlockStamp{Hash: *hash, Height: height}
583 // TODO: Rather than leaving this as an unbounded queue for all types of notifications, try dropping ones where a
584 // later enqueued notification can fully invalidate one waiting to be processed. For example, blockconnected
585 // notifications for greater block heights can remove the need to process earlier blockconnected notifications still
586 // waiting here.
587 var notifications []interface{}
588 enqueue := s.enqueueNotification
589 var dequeue chan interface{}
590 var next interface{}
591 out:
592 for {
593 s.clientMtx.Lock()
594 rescanErr := s.rescanErr
595 s.clientMtx.Unlock()
596 select {
597 case n, ok := <-enqueue:
598 if !ok {
599 // If no notifications are queued for handling, the queue is finished.
600 if len(notifications) == 0 {
601 break out
602 }
603 // nil channel so no more reads can occur.
604 enqueue = nil
605 continue
606 }
607 if len(notifications) == 0 {
608 next = n
609 dequeue = s.dequeueNotification
610 }
611 notifications = append(notifications, n)
612 case dequeue <- next:
613 if n, ok := next.(BlockConnected); ok {
614 bs = &waddrmgr.BlockStamp{
615 Height: n.Height,
616 Hash: n.Hash,
617 }
618 }
619 notifications[0] = nil
620 notifications = notifications[1:]
621 if len(notifications) != 0 {
622 next = notifications[0]
623 } else {
624 // If no more notifications can be enqueued, the queue is finished.
625 if enqueue == nil {
626 break out
627 }
628 dequeue = nil
629 }
630 case e := <-rescanErr:
631 if e != nil {
632 E.Ln("neutrino rescan ended with error:", e)
633 }
634 case s.currentBlock <- bs:
635 case <-s.quit.Wait():
636 break out
637 }
638 }
639 s.Stop()
640 close(s.dequeueNotification)
641 s.wg.Done()
642 }
643