bitcoind_conn.go raw

   1  package chainclient
   2  
   3  import (
   4  	"bytes"
   5  	"fmt"
   6  	"net"
   7  	"sync"
   8  	"sync/atomic"
   9  	"time"
  10  	
  11  	"github.com/p9c/p9/pkg/qu"
  12  	
  13  	"github.com/tstranex/gozmq"
  14  	
  15  	"github.com/p9c/p9/pkg/chaincfg"
  16  	"github.com/p9c/p9/pkg/chainhash"
  17  	"github.com/p9c/p9/pkg/rpcclient"
  18  	"github.com/p9c/p9/pkg/wire"
  19  )
  20  
  21  // BitcoindConn represents a persistent client connection to a bitcoind node that listens for events read from a ZMQ
  22  // connection.
  23  type BitcoindConn struct {
  24  	started int32 // To be used atomically.
  25  	stopped int32 // To be used atomically.
  26  	// rescanClientCounter is an atomic counter that assigns a unique ID to each new bitcoind rescan client using the
  27  	// current bitcoind connection.
  28  	rescanClientCounter uint64
  29  	// chainParams identifies the current network the bitcoind node is running on.
  30  	chainParams *chaincfg.Params
  31  	// client is the RPC client to the bitcoind node.
  32  	client *rpcclient.Client
  33  	// zmqBlockHost is the host listening for ZMQ connections that will be responsible for delivering raw transaction
  34  	// events.
  35  	zmqBlockHost string
  36  	// zmqTxHost is the host listening for ZMQ connections that will be responsible for delivering raw transaction
  37  	// events.
  38  	zmqTxHost string
  39  	// zmqPollInterval is the interval at which we'll attempt to retrieve an event from the ZMQ connection.
  40  	zmqPollInterval time.Duration
  41  	// rescanClients is the set of active bitcoind rescan clients to which ZMQ event notfications will be sent to.
  42  	rescanClientsMtx sync.Mutex
  43  	rescanClients    map[uint64]*BitcoindClient
  44  	quit             qu.C
  45  	wg               sync.WaitGroup
  46  }
  47  
  48  // NewBitcoindConn creates a client connection to the node described by the host string. The connection is not
  49  // established immediately, but must be done using the Start method. If the remote node does not operate on the same
  50  // bitcoin network as described by the passed chain parameters, the connection will be disconnected.
  51  func NewBitcoindConn(
  52  	chainParams *chaincfg.Params,
  53  	host, user, pass, zmqBlockHost, zmqTxHost string,
  54  	zmqPollInterval time.Duration,
  55  ) (*BitcoindConn, error) {
  56  	clientCfg := &rpcclient.ConnConfig{
  57  		Host:                 host,
  58  		User:                 user,
  59  		Pass:                 pass,
  60  		DisableAutoReconnect: false,
  61  		DisableConnectOnNew:  true,
  62  		TLS:                  false,
  63  		HTTPPostMode:         true,
  64  	}
  65  	client, e := rpcclient.New(clientCfg, nil, qu.T())
  66  	if e != nil {
  67  		return nil, e
  68  	}
  69  	conn := &BitcoindConn{
  70  		chainParams:     chainParams,
  71  		client:          client,
  72  		zmqBlockHost:    zmqBlockHost,
  73  		zmqTxHost:       zmqTxHost,
  74  		zmqPollInterval: zmqPollInterval,
  75  		rescanClients:   make(map[uint64]*BitcoindClient),
  76  		quit:            qu.T(),
  77  	}
  78  	return conn, nil
  79  }
  80  
  81  // Start attempts to establish a RPC and ZMQ connection to a bitcoind node. If successful, a goroutine is spawned to
  82  // read events from the ZMQ connection. It's possible for this function to fail due to a limited number of connection
  83  // attempts. This is done to prevent waiting forever on the connection to be established in the case that the node is
  84  // down.
  85  func (c *BitcoindConn) Start() (e error) {
  86  	if !atomic.CompareAndSwapInt32(&c.started, 0, 1) {
  87  		return nil
  88  	}
  89  	// Verify that the node is running on the expected network.
  90  	netw, e := c.getCurrentNet()
  91  	if e != nil {
  92  		c.client.Disconnect()
  93  		return e
  94  	}
  95  	if netw != c.chainParams.Net {
  96  		c.client.Disconnect()
  97  		return fmt.Errorf(
  98  			"expected network %v, got %v",
  99  			c.chainParams.Net, netw,
 100  		)
 101  	}
 102  	// Establish two different ZMQ connections to bitcoind to retrieve block and transaction event notifications. We'll
 103  	// use two as a separation of concern to ensure one type of event isn't dropped from the connection queue due to
 104  	// another type of event filling it up.
 105  	zmqBlockConn, e := gozmq.Subscribe(
 106  		c.zmqBlockHost, []string{"rawblock"},
 107  	)
 108  	if e != nil {
 109  		c.client.Disconnect()
 110  		return fmt.Errorf(
 111  			"unable to subscribe for zmq block events: "+
 112  				"%v", e,
 113  		)
 114  	}
 115  	zmqTxConn, e := gozmq.Subscribe(
 116  		c.zmqTxHost, []string{"rawtx"},
 117  	)
 118  	if e != nil {
 119  		c.client.Disconnect()
 120  		return fmt.Errorf(
 121  			"unable to subscribe for zmq tx events: %v",
 122  			e,
 123  		)
 124  	}
 125  	c.wg.Add(2)
 126  	go c.blockEventHandler(zmqBlockConn)
 127  	go c.txEventHandler(zmqTxConn)
 128  	return nil
 129  }
 130  
 131  // Stop terminates the RPC and ZMQ connection to a bitcoind node and removes any active rescan clients.
 132  func (c *BitcoindConn) Stop() {
 133  	if !atomic.CompareAndSwapInt32(&c.stopped, 0, 1) {
 134  		return
 135  	}
 136  	for _, client := range c.rescanClients {
 137  		client.Stop()
 138  	}
 139  	c.quit.Q()
 140  	c.client.Shutdown()
 141  	c.client.WaitForShutdown()
 142  	c.wg.Wait()
 143  }
 144  
 145  // blockEventHandler reads raw blocks events from the ZMQ block socket and forwards them along to the current rescan
 146  // clients.
 147  //
 148  // NOTE: This must be run as a goroutine.
 149  func (c *BitcoindConn) blockEventHandler(conn *gozmq.Conn) {
 150  	defer c.wg.Done()
 151  	defer func() {
 152  		if e := conn.Close(); E.Chk(e) {
 153  		}
 154  	}()
 155  	I.Ln(
 156  		"started listening for bitcoind block notifications via ZMQ on", c.zmqBlockHost,
 157  	)
 158  	for {
 159  		// Before attempting to read from the ZMQ socket, we'll make sure to check if we've been requested to shut down.
 160  		select {
 161  		case <-c.quit.Wait():
 162  			return
 163  		default:
 164  		}
 165  		// Poll an event from the ZMQ socket.
 166  		msgBytes, e := conn.Receive()
 167  		if e != nil {
 168  			// It's possible that the connection to the socket continuously times out, so we'll prevent logging this
 169  			// error to prevent spamming the logs.
 170  			netErr, ok := e.(net.Error)
 171  			if ok && netErr.Timeout() {
 172  				continue
 173  			}
 174  			E.Ln(
 175  				"unable to receive ZMQ rawblock message:", e,
 176  			)
 177  			continue
 178  		}
 179  		// We have an event! We'll now ensure it is a block event, deserialize it, and report it to the different rescan
 180  		// clients.
 181  		eventType := string(msgBytes[0])
 182  		switch eventType {
 183  		case "rawblock":
 184  			block := &wire.Block{}
 185  			r := bytes.NewReader(msgBytes[1])
 186  			if e := block.Deserialize(r); E.Chk(e) {
 187  				E.Ln(
 188  					"unable to deserialize block:", e,
 189  				)
 190  				continue
 191  			}
 192  			c.rescanClientsMtx.Lock()
 193  			for _, client := range c.rescanClients {
 194  				select {
 195  				case client.zmqBlockNtfns <- block:
 196  				case <-client.quit.Wait():
 197  				case <-c.quit.Wait():
 198  					c.rescanClientsMtx.Unlock()
 199  					return
 200  				}
 201  			}
 202  			c.rescanClientsMtx.Unlock()
 203  		default:
 204  			// It's possible that the message wasn't fully read if bitcoind shuts down, which will produce an unreadable
 205  			// event type. To prevent from logging it, we'll make sure it conforms to the ASCII standard.
 206  			if eventType == "" || !isASCII(eventType) {
 207  				continue
 208  			}
 209  			W.Ln(
 210  				"received unexpected event type from rawblock subscription:",
 211  				eventType,
 212  			)
 213  		}
 214  	}
 215  }
 216  
 217  // txEventHandler reads raw blocks events from the ZMQ block socket and forwards them along to the current rescan
 218  // clients.
 219  //
 220  // NOTE: This must be run as a goroutine.
 221  func (c *BitcoindConn) txEventHandler(conn *gozmq.Conn) {
 222  	defer c.wg.Done()
 223  	defer func() {
 224  		if e := conn.Close(); E.Chk(e) {
 225  		}
 226  	}()
 227  	I.Ln(
 228  		"started listening for bitcoind transaction notifications via ZMQ on",
 229  		c.zmqTxHost,
 230  	)
 231  	for {
 232  		// Before attempting to read from the ZMQ socket, we'll make sure to check if we've been requested to shut down.
 233  		select {
 234  		case <-c.quit.Wait():
 235  			return
 236  		default:
 237  		}
 238  		// Poll an event from the ZMQ socket.
 239  		msgBytes, e := conn.Receive()
 240  		if e != nil {
 241  			// It's possible that the connection to the socket continuously times out, so we'll prevent logging this
 242  			// error to prevent spamming the logs.
 243  			netErr, ok := e.(net.Error)
 244  			if ok && netErr.Timeout() {
 245  				continue
 246  			}
 247  			E.Ln(
 248  				"unable to receive ZMQ rawtx message:", e,
 249  			)
 250  			continue
 251  		}
 252  		// We have an event! We'll now ensure it is a transaction event, deserialize it, and report it to the different
 253  		// rescan clients.
 254  		eventType := string(msgBytes[0])
 255  		switch eventType {
 256  		case "rawtx":
 257  			tx := &wire.MsgTx{}
 258  			r := bytes.NewReader(msgBytes[1])
 259  			if e := tx.Deserialize(r); E.Chk(e) {
 260  				E.Ln(
 261  					"unable to deserialize transaction:", e,
 262  				)
 263  				continue
 264  			}
 265  			c.rescanClientsMtx.Lock()
 266  			for _, client := range c.rescanClients {
 267  				select {
 268  				case client.zmqTxNtfns <- tx:
 269  				case <-client.quit.Wait():
 270  				case <-c.quit.Wait():
 271  					c.rescanClientsMtx.Unlock()
 272  					return
 273  				}
 274  			}
 275  			c.rescanClientsMtx.Unlock()
 276  		default:
 277  			// It's possible that the message wasn't fully read if bitcoind shuts down, which will produce an unreadable
 278  			// event type. To prevent from logging it, we'll make sure it conforms to the ASCII standard.
 279  			if eventType == "" || !isASCII(eventType) {
 280  				continue
 281  			}
 282  			W.Ln(
 283  				"received unexpected event type from rawtx subscription:",
 284  				eventType,
 285  			)
 286  		}
 287  	}
 288  }
 289  
 290  // getCurrentNet returns the network on which the bitcoind node is running.
 291  func (c *BitcoindConn) getCurrentNet() (wire.BitcoinNet, error) {
 292  	hash, e := c.client.GetBlockHash(0)
 293  	if e != nil {
 294  		return 0, e
 295  	}
 296  	switch *hash {
 297  	case *chaincfg.TestNet3Params.GenesisHash:
 298  		return chaincfg.TestNet3Params.Net, nil
 299  	case *chaincfg.RegressionTestParams.GenesisHash:
 300  		return chaincfg.RegressionTestParams.Net, nil
 301  	case *chaincfg.MainNetParams.GenesisHash:
 302  		return chaincfg.MainNetParams.Net, nil
 303  	default:
 304  		return 0, fmt.Errorf("unknown network with genesis hash %v", hash)
 305  	}
 306  }
 307  
 308  // NewBitcoindClient returns a bitcoind client using the current bitcoind connection. This allows us to share the same
 309  // connection using multiple clients.
 310  func (c *BitcoindConn) NewBitcoindClient() *BitcoindClient {
 311  	return &BitcoindClient{
 312  		quit:              qu.T(),
 313  		id:                atomic.AddUint64(&c.rescanClientCounter, 1),
 314  		chainParams:       c.chainParams,
 315  		chainConn:         c,
 316  		rescanUpdate:      make(chan interface{}),
 317  		watchedAddresses:  make(map[string]struct{}),
 318  		watchedOutPoints:  make(map[wire.OutPoint]struct{}),
 319  		watchedTxs:        make(map[chainhash.Hash]struct{}),
 320  		notificationQueue: NewConcurrentQueue(20),
 321  		zmqTxNtfns:        make(chan *wire.MsgTx),
 322  		zmqBlockNtfns:     make(chan *wire.Block),
 323  		mempool:           make(map[chainhash.Hash]struct{}),
 324  		expiredMempool:    make(map[int32]map[chainhash.Hash]struct{}),
 325  	}
 326  }
 327  
 328  // AddClient adds a client to the set of active rescan clients of the current chain connection. This allows the
 329  // connection to include the specified client in its notification delivery.
 330  //
 331  // NOTE: This function is safe for concurrent access.
 332  func (c *BitcoindConn) AddClient(client *BitcoindClient) {
 333  	c.rescanClientsMtx.Lock()
 334  	defer c.rescanClientsMtx.Unlock()
 335  	c.rescanClients[client.id] = client
 336  }
 337  
 338  // RemoveClient removes the client with the given ID from the set of active rescan clients. Once removed, the client
 339  // will no longer receive block and transaction notifications from the chain connection.
 340  //
 341  // NOTE: This function is safe for concurrent access.
 342  func (c *BitcoindConn) RemoveClient(id uint64) {
 343  	c.rescanClientsMtx.Lock()
 344  	defer c.rescanClientsMtx.Unlock()
 345  	delete(c.rescanClients, id)
 346  }
 347  
 348  // isASCII is a helper method that checks whether all bytes in `data` would be printable ASCII characters if interpreted
 349  // as a string.
 350  func isASCII(s string) bool {
 351  	for _, c := range s {
 352  		if c < 32 || c > 126 {
 353  			return false
 354  		}
 355  	}
 356  	return true
 357  }
 358