1 package chainclient
2 3 import (
4 "bytes"
5 "fmt"
6 "net"
7 "sync"
8 "sync/atomic"
9 "time"
10 11 "github.com/p9c/p9/pkg/qu"
12 13 "github.com/tstranex/gozmq"
14 15 "github.com/p9c/p9/pkg/chaincfg"
16 "github.com/p9c/p9/pkg/chainhash"
17 "github.com/p9c/p9/pkg/rpcclient"
18 "github.com/p9c/p9/pkg/wire"
19 )
20 21 // BitcoindConn represents a persistent client connection to a bitcoind node that listens for events read from a ZMQ
22 // connection.
23 type BitcoindConn struct {
24 started int32 // To be used atomically.
25 stopped int32 // To be used atomically.
26 // rescanClientCounter is an atomic counter that assigns a unique ID to each new bitcoind rescan client using the
27 // current bitcoind connection.
28 rescanClientCounter uint64
29 // chainParams identifies the current network the bitcoind node is running on.
30 chainParams *chaincfg.Params
31 // client is the RPC client to the bitcoind node.
32 client *rpcclient.Client
33 // zmqBlockHost is the host listening for ZMQ connections that will be responsible for delivering raw transaction
34 // events.
35 zmqBlockHost string
36 // zmqTxHost is the host listening for ZMQ connections that will be responsible for delivering raw transaction
37 // events.
38 zmqTxHost string
39 // zmqPollInterval is the interval at which we'll attempt to retrieve an event from the ZMQ connection.
40 zmqPollInterval time.Duration
41 // rescanClients is the set of active bitcoind rescan clients to which ZMQ event notfications will be sent to.
42 rescanClientsMtx sync.Mutex
43 rescanClients map[uint64]*BitcoindClient
44 quit qu.C
45 wg sync.WaitGroup
46 }
47 48 // NewBitcoindConn creates a client connection to the node described by the host string. The connection is not
49 // established immediately, but must be done using the Start method. If the remote node does not operate on the same
50 // bitcoin network as described by the passed chain parameters, the connection will be disconnected.
51 func NewBitcoindConn(
52 chainParams *chaincfg.Params,
53 host, user, pass, zmqBlockHost, zmqTxHost string,
54 zmqPollInterval time.Duration,
55 ) (*BitcoindConn, error) {
56 clientCfg := &rpcclient.ConnConfig{
57 Host: host,
58 User: user,
59 Pass: pass,
60 DisableAutoReconnect: false,
61 DisableConnectOnNew: true,
62 TLS: false,
63 HTTPPostMode: true,
64 }
65 client, e := rpcclient.New(clientCfg, nil, qu.T())
66 if e != nil {
67 return nil, e
68 }
69 conn := &BitcoindConn{
70 chainParams: chainParams,
71 client: client,
72 zmqBlockHost: zmqBlockHost,
73 zmqTxHost: zmqTxHost,
74 zmqPollInterval: zmqPollInterval,
75 rescanClients: make(map[uint64]*BitcoindClient),
76 quit: qu.T(),
77 }
78 return conn, nil
79 }
80 81 // Start attempts to establish a RPC and ZMQ connection to a bitcoind node. If successful, a goroutine is spawned to
82 // read events from the ZMQ connection. It's possible for this function to fail due to a limited number of connection
83 // attempts. This is done to prevent waiting forever on the connection to be established in the case that the node is
84 // down.
85 func (c *BitcoindConn) Start() (e error) {
86 if !atomic.CompareAndSwapInt32(&c.started, 0, 1) {
87 return nil
88 }
89 // Verify that the node is running on the expected network.
90 netw, e := c.getCurrentNet()
91 if e != nil {
92 c.client.Disconnect()
93 return e
94 }
95 if netw != c.chainParams.Net {
96 c.client.Disconnect()
97 return fmt.Errorf(
98 "expected network %v, got %v",
99 c.chainParams.Net, netw,
100 )
101 }
102 // Establish two different ZMQ connections to bitcoind to retrieve block and transaction event notifications. We'll
103 // use two as a separation of concern to ensure one type of event isn't dropped from the connection queue due to
104 // another type of event filling it up.
105 zmqBlockConn, e := gozmq.Subscribe(
106 c.zmqBlockHost, []string{"rawblock"},
107 )
108 if e != nil {
109 c.client.Disconnect()
110 return fmt.Errorf(
111 "unable to subscribe for zmq block events: "+
112 "%v", e,
113 )
114 }
115 zmqTxConn, e := gozmq.Subscribe(
116 c.zmqTxHost, []string{"rawtx"},
117 )
118 if e != nil {
119 c.client.Disconnect()
120 return fmt.Errorf(
121 "unable to subscribe for zmq tx events: %v",
122 e,
123 )
124 }
125 c.wg.Add(2)
126 go c.blockEventHandler(zmqBlockConn)
127 go c.txEventHandler(zmqTxConn)
128 return nil
129 }
130 131 // Stop terminates the RPC and ZMQ connection to a bitcoind node and removes any active rescan clients.
132 func (c *BitcoindConn) Stop() {
133 if !atomic.CompareAndSwapInt32(&c.stopped, 0, 1) {
134 return
135 }
136 for _, client := range c.rescanClients {
137 client.Stop()
138 }
139 c.quit.Q()
140 c.client.Shutdown()
141 c.client.WaitForShutdown()
142 c.wg.Wait()
143 }
144 145 // blockEventHandler reads raw blocks events from the ZMQ block socket and forwards them along to the current rescan
146 // clients.
147 //
148 // NOTE: This must be run as a goroutine.
149 func (c *BitcoindConn) blockEventHandler(conn *gozmq.Conn) {
150 defer c.wg.Done()
151 defer func() {
152 if e := conn.Close(); E.Chk(e) {
153 }
154 }()
155 I.Ln(
156 "started listening for bitcoind block notifications via ZMQ on", c.zmqBlockHost,
157 )
158 for {
159 // Before attempting to read from the ZMQ socket, we'll make sure to check if we've been requested to shut down.
160 select {
161 case <-c.quit.Wait():
162 return
163 default:
164 }
165 // Poll an event from the ZMQ socket.
166 msgBytes, e := conn.Receive()
167 if e != nil {
168 // It's possible that the connection to the socket continuously times out, so we'll prevent logging this
169 // error to prevent spamming the logs.
170 netErr, ok := e.(net.Error)
171 if ok && netErr.Timeout() {
172 continue
173 }
174 E.Ln(
175 "unable to receive ZMQ rawblock message:", e,
176 )
177 continue
178 }
179 // We have an event! We'll now ensure it is a block event, deserialize it, and report it to the different rescan
180 // clients.
181 eventType := string(msgBytes[0])
182 switch eventType {
183 case "rawblock":
184 block := &wire.Block{}
185 r := bytes.NewReader(msgBytes[1])
186 if e := block.Deserialize(r); E.Chk(e) {
187 E.Ln(
188 "unable to deserialize block:", e,
189 )
190 continue
191 }
192 c.rescanClientsMtx.Lock()
193 for _, client := range c.rescanClients {
194 select {
195 case client.zmqBlockNtfns <- block:
196 case <-client.quit.Wait():
197 case <-c.quit.Wait():
198 c.rescanClientsMtx.Unlock()
199 return
200 }
201 }
202 c.rescanClientsMtx.Unlock()
203 default:
204 // It's possible that the message wasn't fully read if bitcoind shuts down, which will produce an unreadable
205 // event type. To prevent from logging it, we'll make sure it conforms to the ASCII standard.
206 if eventType == "" || !isASCII(eventType) {
207 continue
208 }
209 W.Ln(
210 "received unexpected event type from rawblock subscription:",
211 eventType,
212 )
213 }
214 }
215 }
216 217 // txEventHandler reads raw blocks events from the ZMQ block socket and forwards them along to the current rescan
218 // clients.
219 //
220 // NOTE: This must be run as a goroutine.
221 func (c *BitcoindConn) txEventHandler(conn *gozmq.Conn) {
222 defer c.wg.Done()
223 defer func() {
224 if e := conn.Close(); E.Chk(e) {
225 }
226 }()
227 I.Ln(
228 "started listening for bitcoind transaction notifications via ZMQ on",
229 c.zmqTxHost,
230 )
231 for {
232 // Before attempting to read from the ZMQ socket, we'll make sure to check if we've been requested to shut down.
233 select {
234 case <-c.quit.Wait():
235 return
236 default:
237 }
238 // Poll an event from the ZMQ socket.
239 msgBytes, e := conn.Receive()
240 if e != nil {
241 // It's possible that the connection to the socket continuously times out, so we'll prevent logging this
242 // error to prevent spamming the logs.
243 netErr, ok := e.(net.Error)
244 if ok && netErr.Timeout() {
245 continue
246 }
247 E.Ln(
248 "unable to receive ZMQ rawtx message:", e,
249 )
250 continue
251 }
252 // We have an event! We'll now ensure it is a transaction event, deserialize it, and report it to the different
253 // rescan clients.
254 eventType := string(msgBytes[0])
255 switch eventType {
256 case "rawtx":
257 tx := &wire.MsgTx{}
258 r := bytes.NewReader(msgBytes[1])
259 if e := tx.Deserialize(r); E.Chk(e) {
260 E.Ln(
261 "unable to deserialize transaction:", e,
262 )
263 continue
264 }
265 c.rescanClientsMtx.Lock()
266 for _, client := range c.rescanClients {
267 select {
268 case client.zmqTxNtfns <- tx:
269 case <-client.quit.Wait():
270 case <-c.quit.Wait():
271 c.rescanClientsMtx.Unlock()
272 return
273 }
274 }
275 c.rescanClientsMtx.Unlock()
276 default:
277 // It's possible that the message wasn't fully read if bitcoind shuts down, which will produce an unreadable
278 // event type. To prevent from logging it, we'll make sure it conforms to the ASCII standard.
279 if eventType == "" || !isASCII(eventType) {
280 continue
281 }
282 W.Ln(
283 "received unexpected event type from rawtx subscription:",
284 eventType,
285 )
286 }
287 }
288 }
289 290 // getCurrentNet returns the network on which the bitcoind node is running.
291 func (c *BitcoindConn) getCurrentNet() (wire.BitcoinNet, error) {
292 hash, e := c.client.GetBlockHash(0)
293 if e != nil {
294 return 0, e
295 }
296 switch *hash {
297 case *chaincfg.TestNet3Params.GenesisHash:
298 return chaincfg.TestNet3Params.Net, nil
299 case *chaincfg.RegressionTestParams.GenesisHash:
300 return chaincfg.RegressionTestParams.Net, nil
301 case *chaincfg.MainNetParams.GenesisHash:
302 return chaincfg.MainNetParams.Net, nil
303 default:
304 return 0, fmt.Errorf("unknown network with genesis hash %v", hash)
305 }
306 }
307 308 // NewBitcoindClient returns a bitcoind client using the current bitcoind connection. This allows us to share the same
309 // connection using multiple clients.
310 func (c *BitcoindConn) NewBitcoindClient() *BitcoindClient {
311 return &BitcoindClient{
312 quit: qu.T(),
313 id: atomic.AddUint64(&c.rescanClientCounter, 1),
314 chainParams: c.chainParams,
315 chainConn: c,
316 rescanUpdate: make(chan interface{}),
317 watchedAddresses: make(map[string]struct{}),
318 watchedOutPoints: make(map[wire.OutPoint]struct{}),
319 watchedTxs: make(map[chainhash.Hash]struct{}),
320 notificationQueue: NewConcurrentQueue(20),
321 zmqTxNtfns: make(chan *wire.MsgTx),
322 zmqBlockNtfns: make(chan *wire.Block),
323 mempool: make(map[chainhash.Hash]struct{}),
324 expiredMempool: make(map[int32]map[chainhash.Hash]struct{}),
325 }
326 }
327 328 // AddClient adds a client to the set of active rescan clients of the current chain connection. This allows the
329 // connection to include the specified client in its notification delivery.
330 //
331 // NOTE: This function is safe for concurrent access.
332 func (c *BitcoindConn) AddClient(client *BitcoindClient) {
333 c.rescanClientsMtx.Lock()
334 defer c.rescanClientsMtx.Unlock()
335 c.rescanClients[client.id] = client
336 }
337 338 // RemoveClient removes the client with the given ID from the set of active rescan clients. Once removed, the client
339 // will no longer receive block and transaction notifications from the chain connection.
340 //
341 // NOTE: This function is safe for concurrent access.
342 func (c *BitcoindConn) RemoveClient(id uint64) {
343 c.rescanClientsMtx.Lock()
344 defer c.rescanClientsMtx.Unlock()
345 delete(c.rescanClients, id)
346 }
// isASCII reports whether s consists solely of printable ASCII characters, i.e. characters in the range from the
// space (32) up to and including the tilde (126). The empty string is reported as ASCII.
func isASCII(s string) bool {
	// Walking the bytes is sufficient: every byte of a multi-byte or invalid UTF-8 sequence is above 126.
	for i := 0; i < len(s); i++ {
		if b := s[i]; b < ' ' || b > '~' {
			return false
		}
	}
	return true
}
358