rpc.go raw

   1  package chainclient
   2  
   3  import (
   4  	"errors"
   5  	"github.com/p9c/p9/pkg/btcaddr"
   6  	"github.com/p9c/p9/pkg/chaincfg"
   7  	"github.com/p9c/p9/pkg/txscript"
   8  	"sync"
   9  	"time"
  10  	
  11  	"github.com/p9c/p9/pkg/qu"
  12  	
  13  	"github.com/p9c/p9/pkg/btcjson"
  14  	"github.com/p9c/p9/pkg/chainhash"
  15  	"github.com/p9c/p9/pkg/gcs"
  16  	"github.com/p9c/p9/pkg/gcs/builder"
  17  	"github.com/p9c/p9/pkg/rpcclient"
  18  	"github.com/p9c/p9/pkg/util"
  19  	"github.com/p9c/p9/pkg/waddrmgr"
  20  	"github.com/p9c/p9/pkg/wire"
  21  	"github.com/p9c/p9/pkg/wtxmgr"
  22  )
  23  
  24  // RPCClient represents a persistent client connection to a bitcoin RPC server for information regarding the current
  25  // best block chain.
  26  type RPCClient struct {
  27  	*rpcclient.Client
  28  	connConfig          *rpcclient.ConnConfig // Work around unexported field
  29  	chainParams         *chaincfg.Params
  30  	reconnectAttempts   int
  31  	enqueueNotification chan interface{}
  32  	dequeueNotification chan interface{}
  33  	currentBlock        chan *waddrmgr.BlockStamp
  34  	quit                qu.C
  35  	wg                  sync.WaitGroup
  36  	started             bool
  37  	quitMtx             sync.Mutex
  38  }
  39  
  40  // NewRPCClient creates a client connection to the server described by the connect string. If disableTLS is false, the
  41  // remote RPC certificate must be provided in the certs slice. The connection is not established immediately, but must
  42  // be done using the Start method. If the remote server does not operate on the same bitcoin network as described by the
  43  // passed chain parameters, the connection will be disconnected.
  44  func NewRPCClient(
  45  	chainParams *chaincfg.Params,
  46  	connect, user, pass string,
  47  	certs []byte,
  48  	tls bool,
  49  	reconnectAttempts int,
  50  	quit qu.C,
  51  ) (*RPCClient, error) {
  52  	W.Ln("creating new RPC client")
  53  	if reconnectAttempts < 0 {
  54  		return nil, errors.New("reconnectAttempts must be positive")
  55  	}
  56  	client := &RPCClient{
  57  		connConfig: &rpcclient.ConnConfig{
  58  			Host:                 connect,
  59  			Endpoint:             "ws",
  60  			User:                 user,
  61  			Pass:                 pass,
  62  			Certificates:         certs,
  63  			DisableAutoReconnect: false,
  64  			DisableConnectOnNew:  true,
  65  			TLS:                  tls,
  66  		},
  67  		chainParams:         chainParams,
  68  		reconnectAttempts:   reconnectAttempts,
  69  		enqueueNotification: make(chan interface{}),
  70  		dequeueNotification: make(chan interface{}),
  71  		currentBlock:        make(chan *waddrmgr.BlockStamp),
  72  		quit:                quit,
  73  	}
  74  	ntfnCallbacks := &rpcclient.NotificationHandlers{
  75  		OnClientConnected:   client.onClientConnect,
  76  		OnBlockConnected:    client.onBlockConnected,
  77  		OnBlockDisconnected: client.onBlockDisconnected,
  78  		OnRecvTx:            client.onRecvTx,
  79  		OnRedeemingTx:       client.onRedeemingTx,
  80  		OnRescanFinished:    client.onRescanFinished,
  81  		OnRescanProgress:    client.onRescanProgress,
  82  	}
  83  	W.Ln("*actually* creating rpc client")
  84  	rpcClient, e := rpcclient.New(client.connConfig, ntfnCallbacks, client.quit)
  85  	if e != nil {
  86  		return nil, e
  87  	}
  88  	// defer W.Ln("*succeeded* in making rpc client")
  89  	client.Client = rpcClient
  90  	return client, nil
  91  }
  92  
  93  // BackEnd returns the name of the driver.
  94  func (c *RPCClient) BackEnd() string {
  95  	return "pod"
  96  }
  97  
  98  // Start attempts to establish a client connection with the remote server. If successful, handler goroutines are started
  99  // to process notifications sent by the server. After a limited number of connection attempts, this function gives up,
 100  // and therefore will not block forever waiting for the connection to be established to a server that may not exist.
 101  func (c *RPCClient) Start() (e error) {
 102  	// D.Ln(c.connConfig)
 103  	e = c.Connect(c.reconnectAttempts)
 104  	if e != nil {
 105  		return e
 106  	}
 107  	// Verify that the server is running on the expected network.
 108  	net, e := c.GetCurrentNet()
 109  	if e != nil {
 110  		c.Disconnect()
 111  		return e
 112  	}
 113  	if net != c.chainParams.Net {
 114  		c.Disconnect()
 115  		return errors.New("mismatched networks")
 116  	}
 117  	c.quitMtx.Lock()
 118  	c.started = true
 119  	c.quitMtx.Unlock()
 120  	c.wg.Add(1)
 121  	go c.handler()
 122  	return nil
 123  }
 124  
 125  // Stop disconnects the client and signals the shutdown of all goroutines started by Start.
 126  func (c *RPCClient) Stop() {
 127  	c.quitMtx.Lock()
 128  	select {
 129  	case <-c.quit.Wait():
 130  	default:
 131  		c.quit.Q()
 132  		c.Client.Shutdown()
 133  		if !c.started {
 134  			close(c.dequeueNotification)
 135  		}
 136  	}
 137  	c.quitMtx.Unlock()
 138  }
 139  
 140  // Rescan wraps the normal Rescan command with an additional parameter that allows us to map an outpoint to the address
 141  // in the chain that it pays to. This is useful when using BIP 158 filters as they include the prev pkScript rather than
 142  // the full outpoint.
 143  func (c *RPCClient) Rescan(
 144  	startHash *chainhash.Hash, addrs []btcaddr.Address,
 145  	outPoints map[wire.OutPoint]btcaddr.Address,
 146  ) (e error) {
 147  	flatOutpoints := make([]*wire.OutPoint, 0, len(outPoints))
 148  	for ops := range outPoints {
 149  		flatOutpoints = append(flatOutpoints, &ops)
 150  	}
 151  	return c.Client.Rescan(startHash, addrs, flatOutpoints)
 152  }
 153  
 154  // WaitForShutdown blocks until both the client has finished disconnecting and all handlers have exited.
 155  func (c *RPCClient) WaitForShutdown() {
 156  	c.Client.WaitForShutdown()
 157  	c.wg.Wait()
 158  }
 159  
 160  // Notifications returns a channel of parsed notifications sent by the remote bitcoin RPC server. This channel must be
 161  // continually read or the process may abort for running out memory, as unread notifications are queued for later reads.
 162  func (c *RPCClient) Notifications() <-chan interface{} {
 163  	return c.dequeueNotification
 164  }
 165  
 166  // BlockStamp returns the latest block notified by the client, or an error if the client has been shut down.
 167  func (c *RPCClient) BlockStamp() (*waddrmgr.BlockStamp, error) {
 168  	select {
 169  	case bs := <-c.currentBlock:
 170  		return bs, nil
 171  	case <-c.quit.Wait():
 172  		return nil, errors.New("disconnected")
 173  	}
 174  }
 175  
 176  // buildFilterBlocksWatchList constructs a watchlist used for matching against a cfilter from a FilterBlocksRequest. The
 177  // watchlist will be populated with all external addresses, internal addresses, and outpoints contained in the request.
 178  func buildFilterBlocksWatchList(req *FilterBlocksRequest) ([][]byte, error) {
 179  	// Construct a watch list containing the script addresses of all internal and external addresses that were
 180  	// requested, in addition to the set of outpoints currently being watched.
 181  	watchListSize := len(req.ExternalAddrs) +
 182  		len(req.InternalAddrs) +
 183  		len(req.WatchedOutPoints)
 184  	watchList := make([][]byte, 0, watchListSize)
 185  	for _, addr := range req.ExternalAddrs {
 186  		p2shAddr, e := txscript.PayToAddrScript(addr)
 187  		if e != nil {
 188  			return nil, e
 189  		}
 190  		watchList = append(watchList, p2shAddr)
 191  	}
 192  	for _, addr := range req.InternalAddrs {
 193  		p2shAddr, e := txscript.PayToAddrScript(addr)
 194  		if e != nil {
 195  			return nil, e
 196  		}
 197  		watchList = append(watchList, p2shAddr)
 198  	}
 199  	for _, addr := range req.WatchedOutPoints {
 200  		addr, e := txscript.PayToAddrScript(addr)
 201  		if e != nil {
 202  			return nil, e
 203  		}
 204  		watchList = append(watchList, addr)
 205  	}
 206  	return watchList, nil
 207  }
 208  
 209  // FilterBlocks scans the blocks contained in the FilterBlocksRequest for any addresses of interest. For each requested
 210  // block, the corresponding compact filter will first be checked for matches, skipping those that do not report
 211  // anything. If the filter returns a positive match, the full block will be fetched and filtered. This method returns a
 212  // FilterBlocksResponse for the first block containing a matching address. If no matches are found in the range of
 213  // blocks requested, the returned response will be nil.
 214  func (c *RPCClient) FilterBlocks(req *FilterBlocksRequest,) (*FilterBlocksResponse, error) {
 215  	blockFilterer := NewBlockFilterer(c.chainParams, req)
 216  	// Construct the watchlist using the addresses and outpoints contained in the filter blocks request.
 217  	watchList, e := buildFilterBlocksWatchList(req)
 218  	if e != nil {
 219  		return nil, e
 220  	}
 221  	// Iterate over the requested blocks, fetching the compact filter for each one, and matching it against the
 222  	// watchlist generated above. If the filter returns a positive match, the full block is then requested and scanned
 223  	// for addresses using the block filterer.
 224  	for i, blk := range req.Blocks {
 225  		rawFilter, e := c.GetCFilter(&blk.Hash, wire.GCSFilterRegular)
 226  		if e != nil {
 227  			return nil, e
 228  		}
 229  		// Ensure the filter is large enough to be deserialized.
 230  		if len(rawFilter.Data) < 4 {
 231  			continue
 232  		}
 233  		filter, e := gcs.FromNBytes(
 234  			builder.DefaultP, builder.DefaultM, rawFilter.Data,
 235  		)
 236  		if e != nil {
 237  			return nil, e
 238  		}
 239  		// Skip any empty filters.
 240  		if filter.N() == 0 {
 241  			continue
 242  		}
 243  		key := builder.DeriveKey(&blk.Hash)
 244  		matched, e := filter.MatchAny(key, watchList)
 245  		if e != nil {
 246  			return nil, e
 247  		} else if !matched {
 248  			continue
 249  		}
 250  		T.F(
 251  			"fetching block height=%d hash=%v",
 252  			blk.Height, blk.Hash,
 253  		)
 254  		rawBlock, e := c.GetBlock(&blk.Hash)
 255  		if e != nil {
 256  			return nil, e
 257  		}
 258  		if !blockFilterer.FilterBlock(rawBlock) {
 259  			continue
 260  		}
 261  		// If any external or internal addresses were detected in this block, we return them to the caller so that the
 262  		// rescan windows can widened with subsequent addresses. The `BatchIndex` is returned so that the caller can
 263  		// compute the *next* block from which to begin again.
 264  		resp := &FilterBlocksResponse{
 265  			BatchIndex:         uint32(i),
 266  			BlockMeta:          blk,
 267  			FoundExternalAddrs: blockFilterer.FoundExternal,
 268  			FoundInternalAddrs: blockFilterer.FoundInternal,
 269  			FoundOutPoints:     blockFilterer.FoundOutPoints,
 270  			RelevantTxns:       blockFilterer.RelevantTxns,
 271  		}
 272  		return resp, nil
 273  	}
 274  	// No addresses were found for this range.
 275  	return nil, nil
 276  }
 277  
 278  // parseBlock parses a btcws definition of the block a tx is mined it to the Block structure of the wtxmgr package, and the
 279  // block index. This is done here since rpcclient doesn't parse this nicely for us.
 280  func parseBlock(block *btcjson.BlockDetails) (*wtxmgr.BlockMeta, error) {
 281  	if block == nil {
 282  		return nil, nil
 283  	}
 284  	blkHash, e := chainhash.NewHashFromStr(block.Hash)
 285  	if e != nil {
 286  		return nil, e
 287  	}
 288  	blk := &wtxmgr.BlockMeta{
 289  		Block: wtxmgr.Block{
 290  			Height: block.Height,
 291  			Hash:   *blkHash,
 292  		},
 293  		Time: time.Unix(block.Time, 0),
 294  	}
 295  	return blk, nil
 296  }
 297  func (c *RPCClient) onClientConnect() {
 298  	select {
 299  	case c.enqueueNotification <- ClientConnected{}:
 300  	case <-c.quit.Wait():
 301  	}
 302  }
 303  func (c *RPCClient) onBlockConnected(hash *chainhash.Hash, height int32, time time.Time) {
 304  	select {
 305  	case c.enqueueNotification <- BlockConnected{
 306  		Block: wtxmgr.Block{
 307  			Hash:   *hash,
 308  			Height: height,
 309  		},
 310  		Time: time,
 311  	}:
 312  	case <-c.quit.Wait():
 313  	}
 314  }
 315  func (c *RPCClient) onBlockDisconnected(hash *chainhash.Hash, height int32, time time.Time) {
 316  	select {
 317  	case c.enqueueNotification <- BlockDisconnected{
 318  		Block: wtxmgr.Block{
 319  			Hash:   *hash,
 320  			Height: height,
 321  		},
 322  		Time: time,
 323  	}:
 324  	case <-c.quit.Wait():
 325  	}
 326  }
 327  func (c *RPCClient) onRecvTx(tx *util.Tx, block *btcjson.BlockDetails) {
 328  	blk, e := parseBlock(block)
 329  	if e != nil {
 330  		// Log and drop improper notification.
 331  		E.Ln(
 332  			"recvtx notification bad block:", e,
 333  		)
 334  		return
 335  	}
 336  	rec, e := wtxmgr.NewTxRecordFromMsgTx(tx.MsgTx(), time.Now())
 337  	if e != nil {
 338  		E.Ln("cannot create transaction record for relevant tx:", e)
 339  		return
 340  	}
 341  	select {
 342  	case c.enqueueNotification <- RelevantTx{rec, blk}:
 343  	case <-c.quit.Wait():
 344  	}
 345  }
 346  func (c *RPCClient) onRedeemingTx(tx *util.Tx, block *btcjson.BlockDetails) {
 347  	// Handled exactly like recvtx notifications.
 348  	c.onRecvTx(tx, block)
 349  }
 350  func (c *RPCClient) onRescanProgress(hash *chainhash.Hash, height int32, blkTime time.Time) {
 351  	select {
 352  	case c.enqueueNotification <- &RescanProgress{hash, height, blkTime}:
 353  	case <-c.quit.Wait():
 354  	}
 355  }
 356  func (c *RPCClient) onRescanFinished(hash *chainhash.Hash, height int32, blkTime time.Time) {
 357  	select {
 358  	case c.enqueueNotification <- &RescanFinished{hash, height, blkTime}:
 359  	case <-c.quit.Wait():
 360  	}
 361  }
 362  
 363  // handler maintains a queue of notifications and the current state (best block) of the chain.
 364  func (c *RPCClient) handler() {
 365  	hash, height, e := c.GetBestBlock()
 366  	if e != nil {
 367  		E.Ln("failed to receive best block from chain server:", e)
 368  		c.Stop()
 369  		c.wg.Done()
 370  		return
 371  	}
 372  	bs := &waddrmgr.BlockStamp{Hash: *hash, Height: height}
 373  	// TODO: Rather than leaving this as an unbounded queue for all types of notifications, try dropping ones where a
 374  	//  later enqueued notification can fully invalidate one waiting to be processed. For example, blockconnected
 375  	//  notifications for greater block heights can remove the need to process earlier blockconnected notifications still
 376  	//  waiting here.
 377  	var notifications []interface{}
 378  	enqueue := c.enqueueNotification
 379  	var dequeue chan interface{}
 380  	var next interface{}
 381  out:
 382  	for {
 383  		select {
 384  		case n, ok := <-enqueue:
 385  			if !ok {
 386  				// If no notifications are queued for handling, the queue is finished.
 387  				if len(notifications) == 0 {
 388  					break out
 389  				}
 390  				// nil channel so no more reads can occur.
 391  				enqueue = nil
 392  				continue
 393  			}
 394  			if len(notifications) == 0 {
 395  				next = n
 396  				dequeue = c.dequeueNotification
 397  			}
 398  			notifications = append(notifications, n)
 399  		case dequeue <- next:
 400  			if n, ok := next.(BlockConnected); ok {
 401  				bs = &waddrmgr.BlockStamp{
 402  					Height: n.Height,
 403  					Hash:   n.Hash,
 404  				}
 405  			}
 406  			notifications[0] = nil
 407  			notifications = notifications[1:]
 408  			if len(notifications) != 0 {
 409  				next = notifications[0]
 410  			} else {
 411  				// If no more notifications can be enqueued, the queue is finished.
 412  				if enqueue == nil {
 413  					break out
 414  				}
 415  				dequeue = nil
 416  			}
 417  		case c.currentBlock <- bs:
 418  		case <-c.quit.Wait():
 419  			D.Ln("legacy rpc handler stopping on quit channel close")
 420  			break out
 421  		}
 422  	}
 423  	c.Stop()
 424  	close(c.dequeueNotification)
 425  	c.wg.Done()
 426  }
 427  
 428  // POSTClient creates the equivalent HTTP POST rpcclient.Client.
 429  func (c *RPCClient) POSTClient() (*rpcclient.Client, error) {
 430  	configCopy := *c.connConfig
 431  	configCopy.HTTPPostMode = true
 432  	return rpcclient.New(&configCopy, nil, qu.T())
 433  }
 434