1 package chainclient
2 3 import (
4 "errors"
5 "github.com/p9c/p9/pkg/btcaddr"
6 "github.com/p9c/p9/pkg/chaincfg"
7 "github.com/p9c/p9/pkg/txscript"
8 "sync"
9 "time"
10 11 "github.com/p9c/p9/pkg/qu"
12 13 "github.com/p9c/p9/pkg/btcjson"
14 "github.com/p9c/p9/pkg/chainhash"
15 "github.com/p9c/p9/pkg/gcs"
16 "github.com/p9c/p9/pkg/gcs/builder"
17 "github.com/p9c/p9/pkg/rpcclient"
18 "github.com/p9c/p9/pkg/util"
19 "github.com/p9c/p9/pkg/waddrmgr"
20 "github.com/p9c/p9/pkg/wire"
21 "github.com/p9c/p9/pkg/wtxmgr"
22 )
23 24 // RPCClient represents a persistent client connection to a bitcoin RPC server for information regarding the current
25 // best block chain.
26 type RPCClient struct {
27 *rpcclient.Client
28 connConfig *rpcclient.ConnConfig // Work around unexported field
29 chainParams *chaincfg.Params
30 reconnectAttempts int
31 enqueueNotification chan interface{}
32 dequeueNotification chan interface{}
33 currentBlock chan *waddrmgr.BlockStamp
34 quit qu.C
35 wg sync.WaitGroup
36 started bool
37 quitMtx sync.Mutex
38 }
39 40 // NewRPCClient creates a client connection to the server described by the connect string. If disableTLS is false, the
41 // remote RPC certificate must be provided in the certs slice. The connection is not established immediately, but must
42 // be done using the Start method. If the remote server does not operate on the same bitcoin network as described by the
43 // passed chain parameters, the connection will be disconnected.
44 func NewRPCClient(
45 chainParams *chaincfg.Params,
46 connect, user, pass string,
47 certs []byte,
48 tls bool,
49 reconnectAttempts int,
50 quit qu.C,
51 ) (*RPCClient, error) {
52 W.Ln("creating new RPC client")
53 if reconnectAttempts < 0 {
54 return nil, errors.New("reconnectAttempts must be positive")
55 }
56 client := &RPCClient{
57 connConfig: &rpcclient.ConnConfig{
58 Host: connect,
59 Endpoint: "ws",
60 User: user,
61 Pass: pass,
62 Certificates: certs,
63 DisableAutoReconnect: false,
64 DisableConnectOnNew: true,
65 TLS: tls,
66 },
67 chainParams: chainParams,
68 reconnectAttempts: reconnectAttempts,
69 enqueueNotification: make(chan interface{}),
70 dequeueNotification: make(chan interface{}),
71 currentBlock: make(chan *waddrmgr.BlockStamp),
72 quit: quit,
73 }
74 ntfnCallbacks := &rpcclient.NotificationHandlers{
75 OnClientConnected: client.onClientConnect,
76 OnBlockConnected: client.onBlockConnected,
77 OnBlockDisconnected: client.onBlockDisconnected,
78 OnRecvTx: client.onRecvTx,
79 OnRedeemingTx: client.onRedeemingTx,
80 OnRescanFinished: client.onRescanFinished,
81 OnRescanProgress: client.onRescanProgress,
82 }
83 W.Ln("*actually* creating rpc client")
84 rpcClient, e := rpcclient.New(client.connConfig, ntfnCallbacks, client.quit)
85 if e != nil {
86 return nil, e
87 }
88 // defer W.Ln("*succeeded* in making rpc client")
89 client.Client = rpcClient
90 return client, nil
91 }
92 93 // BackEnd returns the name of the driver.
94 func (c *RPCClient) BackEnd() string {
95 return "pod"
96 }
97 98 // Start attempts to establish a client connection with the remote server. If successful, handler goroutines are started
99 // to process notifications sent by the server. After a limited number of connection attempts, this function gives up,
100 // and therefore will not block forever waiting for the connection to be established to a server that may not exist.
101 func (c *RPCClient) Start() (e error) {
102 // D.Ln(c.connConfig)
103 e = c.Connect(c.reconnectAttempts)
104 if e != nil {
105 return e
106 }
107 // Verify that the server is running on the expected network.
108 net, e := c.GetCurrentNet()
109 if e != nil {
110 c.Disconnect()
111 return e
112 }
113 if net != c.chainParams.Net {
114 c.Disconnect()
115 return errors.New("mismatched networks")
116 }
117 c.quitMtx.Lock()
118 c.started = true
119 c.quitMtx.Unlock()
120 c.wg.Add(1)
121 go c.handler()
122 return nil
123 }
124 125 // Stop disconnects the client and signals the shutdown of all goroutines started by Start.
126 func (c *RPCClient) Stop() {
127 c.quitMtx.Lock()
128 select {
129 case <-c.quit.Wait():
130 default:
131 c.quit.Q()
132 c.Client.Shutdown()
133 if !c.started {
134 close(c.dequeueNotification)
135 }
136 }
137 c.quitMtx.Unlock()
138 }
139 140 // Rescan wraps the normal Rescan command with an additional parameter that allows us to map an outpoint to the address
141 // in the chain that it pays to. This is useful when using BIP 158 filters as they include the prev pkScript rather than
142 // the full outpoint.
143 func (c *RPCClient) Rescan(
144 startHash *chainhash.Hash, addrs []btcaddr.Address,
145 outPoints map[wire.OutPoint]btcaddr.Address,
146 ) (e error) {
147 flatOutpoints := make([]*wire.OutPoint, 0, len(outPoints))
148 for ops := range outPoints {
149 flatOutpoints = append(flatOutpoints, &ops)
150 }
151 return c.Client.Rescan(startHash, addrs, flatOutpoints)
152 }
153 154 // WaitForShutdown blocks until both the client has finished disconnecting and all handlers have exited.
155 func (c *RPCClient) WaitForShutdown() {
156 c.Client.WaitForShutdown()
157 c.wg.Wait()
158 }
159 160 // Notifications returns a channel of parsed notifications sent by the remote bitcoin RPC server. This channel must be
161 // continually read or the process may abort for running out memory, as unread notifications are queued for later reads.
162 func (c *RPCClient) Notifications() <-chan interface{} {
163 return c.dequeueNotification
164 }
165 166 // BlockStamp returns the latest block notified by the client, or an error if the client has been shut down.
167 func (c *RPCClient) BlockStamp() (*waddrmgr.BlockStamp, error) {
168 select {
169 case bs := <-c.currentBlock:
170 return bs, nil
171 case <-c.quit.Wait():
172 return nil, errors.New("disconnected")
173 }
174 }
175 176 // buildFilterBlocksWatchList constructs a watchlist used for matching against a cfilter from a FilterBlocksRequest. The
177 // watchlist will be populated with all external addresses, internal addresses, and outpoints contained in the request.
178 func buildFilterBlocksWatchList(req *FilterBlocksRequest) ([][]byte, error) {
179 // Construct a watch list containing the script addresses of all internal and external addresses that were
180 // requested, in addition to the set of outpoints currently being watched.
181 watchListSize := len(req.ExternalAddrs) +
182 len(req.InternalAddrs) +
183 len(req.WatchedOutPoints)
184 watchList := make([][]byte, 0, watchListSize)
185 for _, addr := range req.ExternalAddrs {
186 p2shAddr, e := txscript.PayToAddrScript(addr)
187 if e != nil {
188 return nil, e
189 }
190 watchList = append(watchList, p2shAddr)
191 }
192 for _, addr := range req.InternalAddrs {
193 p2shAddr, e := txscript.PayToAddrScript(addr)
194 if e != nil {
195 return nil, e
196 }
197 watchList = append(watchList, p2shAddr)
198 }
199 for _, addr := range req.WatchedOutPoints {
200 addr, e := txscript.PayToAddrScript(addr)
201 if e != nil {
202 return nil, e
203 }
204 watchList = append(watchList, addr)
205 }
206 return watchList, nil
207 }
208 209 // FilterBlocks scans the blocks contained in the FilterBlocksRequest for any addresses of interest. For each requested
210 // block, the corresponding compact filter will first be checked for matches, skipping those that do not report
211 // anything. If the filter returns a positive match, the full block will be fetched and filtered. This method returns a
212 // FilterBlocksResponse for the first block containing a matching address. If no matches are found in the range of
213 // blocks requested, the returned response will be nil.
214 func (c *RPCClient) FilterBlocks(req *FilterBlocksRequest,) (*FilterBlocksResponse, error) {
215 blockFilterer := NewBlockFilterer(c.chainParams, req)
216 // Construct the watchlist using the addresses and outpoints contained in the filter blocks request.
217 watchList, e := buildFilterBlocksWatchList(req)
218 if e != nil {
219 return nil, e
220 }
221 // Iterate over the requested blocks, fetching the compact filter for each one, and matching it against the
222 // watchlist generated above. If the filter returns a positive match, the full block is then requested and scanned
223 // for addresses using the block filterer.
224 for i, blk := range req.Blocks {
225 rawFilter, e := c.GetCFilter(&blk.Hash, wire.GCSFilterRegular)
226 if e != nil {
227 return nil, e
228 }
229 // Ensure the filter is large enough to be deserialized.
230 if len(rawFilter.Data) < 4 {
231 continue
232 }
233 filter, e := gcs.FromNBytes(
234 builder.DefaultP, builder.DefaultM, rawFilter.Data,
235 )
236 if e != nil {
237 return nil, e
238 }
239 // Skip any empty filters.
240 if filter.N() == 0 {
241 continue
242 }
243 key := builder.DeriveKey(&blk.Hash)
244 matched, e := filter.MatchAny(key, watchList)
245 if e != nil {
246 return nil, e
247 } else if !matched {
248 continue
249 }
250 T.F(
251 "fetching block height=%d hash=%v",
252 blk.Height, blk.Hash,
253 )
254 rawBlock, e := c.GetBlock(&blk.Hash)
255 if e != nil {
256 return nil, e
257 }
258 if !blockFilterer.FilterBlock(rawBlock) {
259 continue
260 }
261 // If any external or internal addresses were detected in this block, we return them to the caller so that the
262 // rescan windows can widened with subsequent addresses. The `BatchIndex` is returned so that the caller can
263 // compute the *next* block from which to begin again.
264 resp := &FilterBlocksResponse{
265 BatchIndex: uint32(i),
266 BlockMeta: blk,
267 FoundExternalAddrs: blockFilterer.FoundExternal,
268 FoundInternalAddrs: blockFilterer.FoundInternal,
269 FoundOutPoints: blockFilterer.FoundOutPoints,
270 RelevantTxns: blockFilterer.RelevantTxns,
271 }
272 return resp, nil
273 }
274 // No addresses were found for this range.
275 return nil, nil
276 }
277 278 // parseBlock parses a btcws definition of the block a tx is mined it to the Block structure of the wtxmgr package, and the
279 // block index. This is done here since rpcclient doesn't parse this nicely for us.
280 func parseBlock(block *btcjson.BlockDetails) (*wtxmgr.BlockMeta, error) {
281 if block == nil {
282 return nil, nil
283 }
284 blkHash, e := chainhash.NewHashFromStr(block.Hash)
285 if e != nil {
286 return nil, e
287 }
288 blk := &wtxmgr.BlockMeta{
289 Block: wtxmgr.Block{
290 Height: block.Height,
291 Hash: *blkHash,
292 },
293 Time: time.Unix(block.Time, 0),
294 }
295 return blk, nil
296 }
297 func (c *RPCClient) onClientConnect() {
298 select {
299 case c.enqueueNotification <- ClientConnected{}:
300 case <-c.quit.Wait():
301 }
302 }
303 func (c *RPCClient) onBlockConnected(hash *chainhash.Hash, height int32, time time.Time) {
304 select {
305 case c.enqueueNotification <- BlockConnected{
306 Block: wtxmgr.Block{
307 Hash: *hash,
308 Height: height,
309 },
310 Time: time,
311 }:
312 case <-c.quit.Wait():
313 }
314 }
315 func (c *RPCClient) onBlockDisconnected(hash *chainhash.Hash, height int32, time time.Time) {
316 select {
317 case c.enqueueNotification <- BlockDisconnected{
318 Block: wtxmgr.Block{
319 Hash: *hash,
320 Height: height,
321 },
322 Time: time,
323 }:
324 case <-c.quit.Wait():
325 }
326 }
327 func (c *RPCClient) onRecvTx(tx *util.Tx, block *btcjson.BlockDetails) {
328 blk, e := parseBlock(block)
329 if e != nil {
330 // Log and drop improper notification.
331 E.Ln(
332 "recvtx notification bad block:", e,
333 )
334 return
335 }
336 rec, e := wtxmgr.NewTxRecordFromMsgTx(tx.MsgTx(), time.Now())
337 if e != nil {
338 E.Ln("cannot create transaction record for relevant tx:", e)
339 return
340 }
341 select {
342 case c.enqueueNotification <- RelevantTx{rec, blk}:
343 case <-c.quit.Wait():
344 }
345 }
346 func (c *RPCClient) onRedeemingTx(tx *util.Tx, block *btcjson.BlockDetails) {
347 // Handled exactly like recvtx notifications.
348 c.onRecvTx(tx, block)
349 }
350 func (c *RPCClient) onRescanProgress(hash *chainhash.Hash, height int32, blkTime time.Time) {
351 select {
352 case c.enqueueNotification <- &RescanProgress{hash, height, blkTime}:
353 case <-c.quit.Wait():
354 }
355 }
356 func (c *RPCClient) onRescanFinished(hash *chainhash.Hash, height int32, blkTime time.Time) {
357 select {
358 case c.enqueueNotification <- &RescanFinished{hash, height, blkTime}:
359 case <-c.quit.Wait():
360 }
361 }
362 363 // handler maintains a queue of notifications and the current state (best block) of the chain.
364 func (c *RPCClient) handler() {
365 hash, height, e := c.GetBestBlock()
366 if e != nil {
367 E.Ln("failed to receive best block from chain server:", e)
368 c.Stop()
369 c.wg.Done()
370 return
371 }
372 bs := &waddrmgr.BlockStamp{Hash: *hash, Height: height}
373 // TODO: Rather than leaving this as an unbounded queue for all types of notifications, try dropping ones where a
374 // later enqueued notification can fully invalidate one waiting to be processed. For example, blockconnected
375 // notifications for greater block heights can remove the need to process earlier blockconnected notifications still
376 // waiting here.
377 var notifications []interface{}
378 enqueue := c.enqueueNotification
379 var dequeue chan interface{}
380 var next interface{}
381 out:
382 for {
383 select {
384 case n, ok := <-enqueue:
385 if !ok {
386 // If no notifications are queued for handling, the queue is finished.
387 if len(notifications) == 0 {
388 break out
389 }
390 // nil channel so no more reads can occur.
391 enqueue = nil
392 continue
393 }
394 if len(notifications) == 0 {
395 next = n
396 dequeue = c.dequeueNotification
397 }
398 notifications = append(notifications, n)
399 case dequeue <- next:
400 if n, ok := next.(BlockConnected); ok {
401 bs = &waddrmgr.BlockStamp{
402 Height: n.Height,
403 Hash: n.Hash,
404 }
405 }
406 notifications[0] = nil
407 notifications = notifications[1:]
408 if len(notifications) != 0 {
409 next = notifications[0]
410 } else {
411 // If no more notifications can be enqueued, the queue is finished.
412 if enqueue == nil {
413 break out
414 }
415 dequeue = nil
416 }
417 case c.currentBlock <- bs:
418 case <-c.quit.Wait():
419 D.Ln("legacy rpc handler stopping on quit channel close")
420 break out
421 }
422 }
423 c.Stop()
424 close(c.dequeueNotification)
425 c.wg.Done()
426 }
427 428 // POSTClient creates the equivalent HTTP POST rpcclient.Client.
429 func (c *RPCClient) POSTClient() (*rpcclient.Client, error) {
430 configCopy := *c.connConfig
431 configCopy.HTTPPostMode = true
432 return rpcclient.New(&configCopy, nil, qu.T())
433 }
434