infra.go raw

   1  package rpcclient
   2  
   3  import (
   4  	"bytes"
   5  	"container/list"
   6  	"crypto/tls"
   7  	"crypto/x509"
   8  	"encoding/base64"
   9  	js "encoding/json"
  10  	"errors"
  11  	"fmt"
  12  	"io"
  13  	"io/ioutil"
  14  	"math"
  15  	"net"
  16  	"net/http"
  17  	"net/url"
  18  	"sync"
  19  	"sync/atomic"
  20  	"time"
  21  	
  22  	"github.com/p9c/p9/pkg/qu"
  23  	
  24  	"github.com/btcsuite/go-socks/socks"
  25  	"github.com/btcsuite/websocket"
  26  	
  27  	"github.com/p9c/p9/pkg/btcjson"
  28  )
  29  
  30  var (
  31  	// ErrInvalidAuth is an error to describe the condition where the client is
  32  	// either unable to authenticate or the specified endpoint is incorrect.
  33  	ErrInvalidAuth = errors.New("authentication failure")
  34  	// ErrInvalidEndpoint is an error to describe the condition where the websocket
  35  	// handshake failed with the specified endpoint.
  36  	ErrInvalidEndpoint = errors.New("the endpoint either does not support websockets or does not exist")
  37  	// ErrClientNotConnected is an error to describe the condition where a websocket
  38  	// client has been created, but the connection was never established.
  39  	//
  40  	// This condition differs from ErrClientDisconnect, which represents an
  41  	// established connection that was lost.
  42  	ErrClientNotConnected = errors.New("the client was never connected")
  43  	// ErrClientDisconnect is an error to describe the condition where the client
  44  	// has been disconnected from the RPC server.
  45  	//
  46  	// When the DisableAutoReconnect opt is not set, any outstanding futures when
  47  	// a client disconnect occurs will return this error as will any new requests.
  48  	ErrClientDisconnect = errors.New("the client has been disconnected")
  49  	// ErrClientShutdown is an error to describe the condition where the client is
  50  	// either already shutdown, or in the process of shutting down.
  51  	//
  52  	// Any outstanding futures when a client shutdown occurs will return this error
  53  	// as will any new requests.
  54  	ErrClientShutdown = errors.New("the client has been shutdown")
  55  	// ErrNotWebsocketClient is an error to describe the condition of calling a
  56  	// Client method intended for a websocket client when the client has been
  57  	// configured to run in HTTP POST mode instead.
  58  	ErrNotWebsocketClient = errors.New("client is not configured for websockets")
  59  	// ErrClientAlreadyConnected is an error to describe the condition where a new
  60  	// client connection cannot be established due to a websocket client having
  61  	// already connected to the RPC server.
  62  	ErrClientAlreadyConnected = errors.New("websocket client has already connected")
  63  )
  64  
  65  const (
  66  	// sendBufferSize is the number of elements the websocket send channel can queue
  67  	// before blocking.
  68  	sendBufferSize = 50
  69  	// sendPostBufferSize is the number of elements the HTTP POST send channel can
  70  	// queue before blocking.
  71  	sendPostBufferSize = 100
  72  	// connectionRetryInterval is the amount of time to wait in between retries when
  73  	// automatically reconnecting to an RPC server.
  74  	connectionRetryInterval = time.Second * 5
  75  )
  76  
  77  // sendPostDetails houses an HTTP POST request to send to an RPC server as well
  78  // as the original JSON-RPC command and a channel to reply on when the server
  79  // responds with the result.
  80  type sendPostDetails struct {
  81  	httpRequest *http.Request
  82  	jsonRequest *jsonRequest
  83  }
  84  
  85  // jsonRequest holds information about a json request that is used to properly
  86  // detect, interpret, and deliver a reply to it.
  87  type jsonRequest struct {
  88  	id             uint64
  89  	method         string
  90  	cmd            interface{}
  91  	marshalledJSON []byte
  92  	responseChan   chan *response
  93  }
  94  
  95  // Client represents a Bitcoin RPC client which allows easy access to the
  96  // various RPC methods available on a Bitcoin RPC server.
  97  //
  98  // Each of the wrapper functions handle the details of converting the passed and
  99  // return types to and from the underlying JSON types which are required for the
 100  // JSON-RPC invocations
 101  //
 102  // The client provides each RPC in both synchronous (blocking) and asynchronous
 103  // (non-blocking) forms.
 104  //
 105  // The asynchronous forms are based on the concept of futures where they return
 106  // an instance of a type that promises to deliver the result of the invocation
 107  // at some future time.
 108  //
 109  // Invoking the Receive method on the returned future will block until the
 110  // result is available if it's not already.
 111  type Client struct {
 112  	id uint64 // atomic, so must stay 64-bit aligned
 113  	// config holds the connection configuration assoiated with this client.
 114  	config *ConnConfig
 115  	// wsConn is the underlying websocket connection when not in HTTP POST mode.
 116  	wsConn *websocket.Conn
 117  	// httpClient is the underlying HTTP client to use when running in HTTP
 118  	// POST mode.
 119  	httpClient *http.Client
 120  	// mtx is a mutex to protect access to connection related fields.
 121  	mtx sync.Mutex
 122  	// disconnected indicated whether or not the server is disconnected.
 123  	disconnected bool
 124  	// retryCount holds the number of times the client has tried to reconnect to the
 125  	// RPC server.
 126  	retryCount int64
 127  	// Track command and their response channels by ID.
 128  	requestLock sync.Mutex
 129  	requestMap  map[uint64]*list.Element
 130  	requestList *list.List
 131  	// Notifications.
 132  	ntfnHandlers  *NotificationHandlers
 133  	ntfnStateLock sync.Mutex
 134  	ntfnState     *notificationState
 135  	// Networking infrastructure.
 136  	sendChan        chan []byte
 137  	sendPostChan    chan *sendPostDetails
 138  	connEstablished qu.C
 139  	disconnect      qu.C
 140  	shutdown        qu.C
 141  	wg              sync.WaitGroup
 142  }
 143  
 144  // NextID returns the next id to be used when sending a JSON-RPC message. This
 145  // ID allows responses to be associated with particular requests per the
 146  // JSON-RPC specification.
 147  //
 148  // Typically the consumer of the client does not need to call this function,
 149  // however, if a custom request is being created and used this function should
 150  // be used to ensure the ID is unique amongst all requests being made.
 151  func (c *Client) NextID() uint64 {
 152  	return atomic.AddUint64(&c.id, 1)
 153  }
 154  
 155  // addRequest associates the passed jsonRequest with its id.
 156  //
 157  // This allows the response from the remote server to be unmarshalled to the
 158  // appropriate type and sent to the specified channel when it is received.
 159  //
 160  // If the client has already begun shutting down, ErrClientShutdown is returned
 161  // and the request is not added. This function is safe for concurrent access.
 162  func (c *Client) addRequest(jReq *jsonRequest) (e error) {
 163  	c.requestLock.Lock()
 164  	defer c.requestLock.Unlock()
 165  	// A non-blocking read of the shutdown channel with the request lock held avoids
 166  	// adding the request to the client's internal data structures if the client is
 167  	// in the process of shutting down (and has not yet grabbed the request lock),
 168  	// or has finished shutdown already (responding to each outstanding request with
 169  	// ErrClientShutdown).
 170  	select {
 171  	case <-c.shutdown.Wait():
 172  		return ErrClientShutdown
 173  	default:
 174  	}
 175  	element := c.requestList.PushBack(jReq)
 176  	c.requestMap[jReq.id] = element
 177  	return nil
 178  }
 179  
 180  // removeRequest returns and removes the jsonRequest which contains the response
 181  // channel and original method associated with the passed id or nil if there is
 182  // no association. This function is safe for concurrent access.
 183  func (c *Client) removeRequest(id uint64) *jsonRequest {
 184  	c.requestLock.Lock()
 185  	defer c.requestLock.Unlock()
 186  	element := c.requestMap[id]
 187  	if element != nil {
 188  		delete(c.requestMap, id)
 189  		request := c.requestList.Remove(element).(*jsonRequest)
 190  		return request
 191  	}
 192  	return nil
 193  }
 194  
 195  // removeAllRequests removes all the jsonRequests which contain the response
 196  // channels for outstanding requests.
 197  //
 198  // This function MUST be called with the request lock held.
 199  func (c *Client) removeAllRequests() {
 200  	c.requestMap = make(map[uint64]*list.Element)
 201  	c.requestList.Init()
 202  }
 203  
 204  // trackRegisteredNtfns examines the passed command to see if it is one of the
 205  // notification commands and updates the notification state that is used to
 206  // automatically re-establish registered notifications on reconnects.
 207  func (c *Client) trackRegisteredNtfns(cmd interface{}) {
 208  	// Nothing to do if the caller is not interested in notifications.
 209  	if c.ntfnHandlers == nil {
 210  		return
 211  	}
 212  	c.ntfnStateLock.Lock()
 213  	defer c.ntfnStateLock.Unlock()
 214  	switch bcmd := cmd.(type) {
 215  	case *btcjson.NotifyBlocksCmd:
 216  		c.ntfnState.notifyBlocks = true
 217  	case *btcjson.NotifyNewTransactionsCmd:
 218  		if bcmd.Verbose != nil && *bcmd.Verbose {
 219  			c.ntfnState.notifyNewTxVerbose = true
 220  		} else {
 221  			c.ntfnState.notifyNewTx = true
 222  		}
 223  	case *btcjson.NotifySpentCmd:
 224  		for _, op := range bcmd.OutPoints {
 225  			c.ntfnState.notifySpent[op] = struct{}{}
 226  		}
 227  	case *btcjson.NotifyReceivedCmd:
 228  		for _, addr := range bcmd.Addresses {
 229  			c.ntfnState.notifyReceived[addr] = struct{}{}
 230  		}
 231  	}
 232  }
 233  
 234  type (
 235  	// inMessage is the first type that an incoming message is unmarshalled into. It
 236  	// supports both requests (for notification support) and responses.
 237  	//
 238  	// The partially-unmarshaled message is a notification if the embedded ID (from
 239  	// the response) is nil. Otherwise, it is a response.
 240  	inMessage struct {
 241  		ID *float64 `json:"id"`
 242  		*rawNotification
 243  		*rawResponse
 244  	}
 245  	// rawNotification is a partially-unmarshalled JSON-RPC notification.
 246  	rawNotification struct {
 247  		Method string          `json:"method"`
 248  		Params []js.RawMessage `json:"netparams"`
 249  	}
 250  	// rawResponse is a partially-unmarshalled JSON-RPC response. For this to be
 251  	// valid (according to JSON-RPC 1.0 spec), ID may not be nil.
 252  	rawResponse struct {
 253  		Result js.RawMessage     `json:"result"`
 254  		Error  *btcjson.RPCError `json:"error"`
 255  	}
 256  )
 257  
 258  // response is the raw bytes of a JSON-RPC result, or the error if the response
 259  // error object was non-null.
 260  type response struct {
 261  	result []byte
 262  	err    error
 263  }
 264  
 265  // result checks whether the unmarshalled response contains a non-nil error,
 266  // returning an unmarshalled json.RPCError (or an unmarshaling error) if so.
 267  //
 268  // If the response is not an error, the raw bytes of the request are returned
 269  // for further unmashaling into specific result types.
 270  func (r rawResponse) result() (result []byte, e error) {
 271  	if r.Error != nil {
 272  		return nil, r.Error
 273  	}
 274  	return r.Result, nil
 275  }
 276  
 277  // handleMessage is the main handler for incoming notifications and responses.
 278  func (c *Client) handleMessage(msg []byte) {
 279  	// Attempt to unmarshal the message as either a notification or response.
 280  	var in inMessage
 281  	in.rawResponse = new(rawResponse)
 282  	in.rawNotification = new(rawNotification)
 283  	e := js.Unmarshal(msg, &in)
 284  	if e != nil {
 285  		W.Ln("remote server sent invalid message:", e)
 286  		return
 287  	}
 288  	// JSON-RPC 1.0 notifications are requests with a null id.
 289  	if in.ID == nil {
 290  		ntfn := in.rawNotification
 291  		if ntfn == nil {
 292  			W.Ln("malformed notification: missing method and parameters")
 293  			return
 294  		}
 295  		if ntfn.Method == "" {
 296  			W.Ln("malformed notification: missing method")
 297  			return
 298  		}
 299  		// netparams are not optional: nil isn't valid (but len == 0 is)
 300  		if ntfn.Params == nil {
 301  			W.Ln("malformed notification: missing netparams")
 302  			return
 303  		}
 304  		// Deliver the notification.
 305  		T.Ln("received notification:", in.Method)
 306  		c.handleNotification(in.rawNotification)
 307  		return
 308  	}
 309  	
 310  	// ensure that in.ID can be converted to an integer without loss of precision
 311  	if *in.ID < 0 || *in.ID != math.Trunc(*in.ID) {
 312  		W.Ln("malformed response: invalid identifier")
 313  		return
 314  	}
 315  	if in.rawResponse == nil {
 316  		W.Ln("malformed response: missing result and error")
 317  		return
 318  	}
 319  	id := uint64(*in.ID)
 320  	// Tracef("received response for id %d (result %s)", id, in.Result)
 321  	request := c.removeRequest(id)
 322  	// Nothing more to do if there is no request associated with this reply.
 323  	if request == nil || request.responseChan == nil {
 324  		W.F("received unexpected reply: %s (id %d)", in.Result, id)
 325  		return
 326  	}
 327  	// Since the command was successful, examine it to see if it's a notification,
 328  	// and if is, add it to the notification state so it automatically be
 329  	// re-established on reconnect.
 330  	c.trackRegisteredNtfns(request.cmd)
 331  	// Deliver the response.
 332  	result, e := in.rawResponse.result()
 333  	request.responseChan <- &response{result: result, err: e}
 334  }
 335  
 336  // shouldLogReadError returns whether or not the passed error, which is expected
 337  // to have come from reading from the websocket connection in wsInHandler,
 338  // should be logged.
 339  func (c *Client) shouldLogReadError(e error) bool {
 340  	// No logging when the connection is being forcibly disconnected.
 341  	select {
 342  	case <-c.shutdown.Wait():
 343  		return false
 344  	default:
 345  	}
 346  	// No logging when the connection has been disconnected.
 347  	if e == io.EOF {
 348  		return false
 349  	}
 350  	if opErr, ok := e.(*net.OpError); ok && !opErr.Temporary() {
 351  		return false
 352  	}
 353  	return true
 354  }
 355  
 356  // wsInHandler handles all incoming messages for the websocket connection
 357  // associated with the client. It must be run as a goroutine.
 358  func (c *Client) wsInHandler() {
 359  out:
 360  	for {
 361  		// Break out of the loop once the shutdown channel has been closed. Use a
 362  		// non-blocking select here so we fall through otherwise.
 363  		select {
 364  		case <-c.shutdown.Wait():
 365  			break out
 366  		default:
 367  		}
 368  		_, msg, e := c.wsConn.ReadMessage()
 369  		if e != nil && e != io.ErrUnexpectedEOF && e != io.EOF {
 370  			// Log the error if it's not due to disconnecting.
 371  			if c.shouldLogReadError(e) {
 372  				T.F("websocket receive error from %s: %v %s", c.config.Host, e)
 373  			}
 374  			break out
 375  		}
 376  		c.handleMessage(msg)
 377  	}
 378  	
 379  	// Ensure the connection is closed.
 380  	c.Disconnect()
 381  	c.wg.Done()
 382  	T.Ln("RPC client input handler done for", c.config.Host)
 383  }
 384  
 385  // disconnectChan returns a copy of the current disconnect channel.
 386  //
 387  // The channel is read protected by the client mutex, and is safe to call while
 388  // the channel is being reassigned during a reconnect.
 389  func (c *Client) disconnectChan() <-chan struct{} {
 390  	c.mtx.Lock()
 391  	ch := c.disconnect
 392  	c.mtx.Unlock()
 393  	return ch
 394  }
 395  
 396  // wsOutHandler handles all outgoing messages for the websocket connection.
 397  //
 398  // It uses a buffered channel to serialize output messages while allowing the
 399  // sender to continue running asynchronously. It must be run as a goroutine.
 400  func (c *Client) wsOutHandler() {
 401  out:
 402  	for {
 403  		// Send any messages ready for send until the client is disconnected closed.
 404  		select {
 405  		case msg := <-c.sendChan:
 406  			// T.Ln("### sendChan received message to send")
 407  			var e error
 408  			if e = c.wsConn.WriteMessage(websocket.TextMessage, msg); E.Chk(e) {
 409  				// T.Ln("### sendChan disconnecting client")
 410  				c.Disconnect()
 411  				break out
 412  			}
 413  			// T.Ln("### message sent")
 414  		case <-c.disconnectChan():
 415  			break out
 416  		}
 417  	}
 418  	// Drain any channels before exiting so nothing is left waiting around send.
 419  cleanup:
 420  	for {
 421  		select {
 422  		case <-c.sendChan:
 423  		default:
 424  			break cleanup
 425  		}
 426  	}
 427  	c.wg.Done()
 428  	T.Ln("RPC client output handler done for", c.config.Host)
 429  }
 430  
 431  // sendMessage sends the passed JSON to the connected server using the websocket
 432  // connection. It is backed by a buffered channel, so it will not block until
 433  // the send channel is full.
 434  func (c *Client) sendMessage(marshalledJSON []byte) {
 435  	// T.Ln("### sendMessage")
 436  	// Don't send the message if disconnected.
 437  	select {
 438  	case c.sendChan <- marshalledJSON:
 439  		// T.Ln("### message sent on channel")
 440  	case <-c.disconnectChan():
 441  		// T.Ln("### client is disconnected")
 442  		return
 443  	}
 444  }
 445  
 446  // reregisterNtfns creates and sends commands needed to re-establish the current
 447  // notification state associated with the client.
 448  //
 449  // It should only be called on on reconnect by the resendRequests function.
 450  func (c *Client) reregisterNtfns() (e error) {
 451  	// Nothing to do if the caller is not interested in notifications.
 452  	if c.ntfnHandlers == nil {
 453  		return nil
 454  	}
 455  	// In order to avoid holding the lock on the notification state for the entire
 456  	// time of the potentially long running RPCs issued below, make a copy of it and
 457  	// work from that.
 458  	//
 459  	// Also, other commands will be running concurrently which could modify the
 460  	// notification state (while not under the lock of course) which also register
 461  	// it with the remote RPC server, so this prevents double registrations.
 462  	c.ntfnStateLock.Lock()
 463  	stateCopy := c.ntfnState.Copy()
 464  	c.ntfnStateLock.Unlock()
 465  	// Reregister notifyblocks if needed.
 466  	if stateCopy.notifyBlocks {
 467  		D.Ln("reregistering [notifyblocks]")
 468  		if e := c.NotifyBlocks(); E.Chk(e) {
 469  			return e
 470  		}
 471  	}
 472  	// Reregister notifynewtransactions if needed.
 473  	if stateCopy.notifyNewTx || stateCopy.notifyNewTxVerbose {
 474  		D.F(
 475  			"reregistering [notifynewtransactions] (verbose=%v)",
 476  			stateCopy.notifyNewTxVerbose,
 477  		)
 478  		e := c.NotifyNewTransactions(stateCopy.notifyNewTxVerbose)
 479  		if e != nil {
 480  			return e
 481  		}
 482  	}
 483  	// Reregister the combination of all previously registered notifyspent outpoints
 484  	// in one command if needed.
 485  	nsLen := len(stateCopy.notifySpent)
 486  	if nsLen > 0 {
 487  		outpoints := make([]btcjson.OutPoint, 0, nsLen)
 488  		for op := range stateCopy.notifySpent {
 489  			outpoints = append(outpoints, op)
 490  		}
 491  		D.F("reregistering [notifyspent] outpoints: %v", outpoints)
 492  		if e := c.notifySpentInternal(outpoints).Receive(); E.Chk(e) {
 493  			return e
 494  		}
 495  	}
 496  	// Reregister the combination of all previously registered notifyreceived
 497  	// addresses in one command if needed.
 498  	nrLen := len(stateCopy.notifyReceived)
 499  	if nrLen > 0 {
 500  		addresses := make([]string, 0, nrLen)
 501  		for addr := range stateCopy.notifyReceived {
 502  			addresses = append(addresses, addr)
 503  		}
 504  		D.Ln("reregistering [notifyreceived] addresses:", addresses)
 505  		if e := c.notifyReceivedInternal(addresses).Receive(); E.Chk(e) {
 506  			return e
 507  		}
 508  	}
 509  	return nil
 510  }
 511  
 512  // ignoreResends is a set of all methods for requests that are "long running"
 513  // are not be reissued by the client on reconnect.
 514  var ignoreResends = map[string]struct{}{
 515  	"rescan": {},
 516  }
 517  
 518  // resendRequests resends any requests that had not completed when the client
 519  // disconnected. It is intended to be called once the client has reconnected as
 520  // a separate goroutine.
 521  func (c *Client) resendRequests() {
 522  	// Set the notification state back up. If anything goes wrong, disconnect the client.
 523  	if e := c.reregisterNtfns(); E.Chk(e) {
 524  		W.Ln("unable to re-establish notification state:", e)
 525  		c.Disconnect()
 526  		return
 527  	}
 528  	// Since it's possible to block on send and more requests might be added by the
 529  	// caller while resending, make a copy of all of the requests that need to be
 530  	// resent now and work from the copy. This allows the lock to be released
 531  	// quickly.
 532  	c.requestLock.Lock()
 533  	resendReqs := make([]*jsonRequest, 0, c.requestList.Len())
 534  	var nextElem *list.Element
 535  	for e := c.requestList.Front(); e != nil; e = nextElem {
 536  		nextElem = e.Next()
 537  		jReq := e.Value.(*jsonRequest)
 538  		if _, ok := ignoreResends[jReq.method]; ok {
 539  			// If a request is not sent on reconnect, remove it from the request structures,
 540  			// since no reply is expected.
 541  			delete(c.requestMap, jReq.id)
 542  			c.requestList.Remove(e)
 543  		} else {
 544  			resendReqs = append(resendReqs, jReq)
 545  		}
 546  	}
 547  	c.requestLock.Unlock()
 548  	for _, jReq := range resendReqs {
 549  		// Stop resending commands if the client disconnected again since the next
 550  		// reconnect will handle them.
 551  		if c.Disconnected() {
 552  			return
 553  		}
 554  		T.F("sending command [%s] with id %d %s", jReq.method, jReq.id)
 555  		c.sendMessage(jReq.marshalledJSON)
 556  	}
 557  }
 558  
 559  // wsReconnectHandler listens for client disconnects and automatically tries to
 560  // reconnect with retry interval that scales based on the number of retries.
 561  //
 562  // It also resends any commands that had not completed when the client
 563  // disconnected so the disconnect/reconnect process is largely transparent to
 564  // the caller.
 565  //
 566  // This function is not run when the DisableAutoReconnect config options is set.
 567  //
 568  // This function must be run as a goroutine.
 569  func (c *Client) wsReconnectHandler() {
 570  out:
 571  	for {
 572  		select {
 573  		case <-c.disconnect.Wait():
 574  			// On disconnect, fallthrough to reestablish the connection.
 575  		case <-c.shutdown.Wait():
 576  			break out
 577  		}
 578  	reconnect:
 579  		for {
 580  			select {
 581  			case <-c.shutdown.Wait():
 582  				break out
 583  			default:
 584  			}
 585  			wsConn, e := dial(c.config)
 586  			if e != nil {
 587  				c.retryCount++
 588  				T.F("failed to connect to %s: %v %s", c.config.Host, e)
 589  				// IconScale the retry interval by the number of retries so there is a backoff
 590  				// up to a max of 1 minute.
 591  				scaledInterval := connectionRetryInterval.Nanoseconds() * c.
 592  					retryCount
 593  				scaledDuration := time.Duration(scaledInterval)
 594  				if scaledDuration > time.Minute {
 595  					scaledDuration = time.Minute
 596  				}
 597  				T.F(
 598  					"retrying connection to %s in %s",
 599  					c.config.Host, scaledDuration,
 600  				)
 601  				time.Sleep(scaledDuration)
 602  				continue reconnect
 603  			}
 604  			I.Ln("reestablished connection to RPC server", c.config.Host)
 605  			// Reset the connection state and signal the reconnect happened.
 606  			c.wsConn = wsConn
 607  			c.retryCount = 0
 608  			c.mtx.Lock()
 609  			c.disconnect = qu.T()
 610  			c.disconnected = false
 611  			c.mtx.Unlock()
 612  			// Start processing input and output for the new connection.
 613  			c.start()
 614  			// Reissue pending requests in another goroutine since the send can block.
 615  			go c.resendRequests()
 616  			// Break out of the reconnect loop back to wait for disconnect again.
 617  			break reconnect
 618  		}
 619  	}
 620  	c.wg.Done()
 621  	T.Ln("RPC client reconnect handler done for", c.config.Host)
 622  }
 623  
 624  // handleSendPostMessage handles performing the passed HTTP request, reading the
 625  // result unmarshalling it and delivering the unmarshalled result to the
 626  // provided response channel.
 627  func (c *Client) handleSendPostMessage(details *sendPostDetails) {
 628  	jReq := details.jsonRequest
 629  	// Tracef("sending command [%s] with id %d", jReq.method, jReq.id)
 630  	httpResponse, e := c.httpClient.Do(details.httpRequest)
 631  	if e != nil {
 632  		jReq.responseChan <- &response{err: e}
 633  		return
 634  	}
 635  	// Read the raw bytes and close the response.
 636  	var respBytes []byte
 637  	if respBytes, e = ioutil.ReadAll(httpResponse.Body); E.Chk(e) {
 638  	}
 639  	if e = httpResponse.Body.Close(); E.Chk(e) {
 640  		e = fmt.Errorf("error reading json reply: %v", e)
 641  		jReq.responseChan <- &response{err: e}
 642  		return
 643  	}
 644  	// Try to unmarshal the response as a regular JSON-RPC response.
 645  	var resp rawResponse
 646  	if e = js.Unmarshal(respBytes, &resp); E.Chk(e) {
 647  		// When the response itself isn't a valid JSON-RPC response return an error
 648  		// which includes the HTTP status code and raw response bytes.
 649  		e = fmt.Errorf("status code: %d, response: %q", httpResponse.StatusCode, string(respBytes))
 650  		jReq.responseChan <- &response{err: e}
 651  		return
 652  	}
 653  	var res []byte
 654  	if res, e = resp.result(); E.Chk(e) {
 655  	}
 656  	jReq.responseChan <- &response{result: res, err: e}
 657  }
 658  
 659  // sendPostHandler handles all outgoing messages when the client is running in
 660  // HTTP POST mode.
 661  //
 662  // It uses a buffered channel to serialize output messages while allowing the
 663  // sender to continue running asynchronously. It must be run as a goroutine.
 664  func (c *Client) sendPostHandler() {
 665  out:
 666  	for {
 667  		// Send any messages ready for send until the shutdown channel is closed.
 668  		select {
 669  		case details := <-c.sendPostChan:
 670  			c.handleSendPostMessage(details)
 671  		case <-c.shutdown.Wait():
 672  			break out
 673  		}
 674  	}
 675  	// Drain any wait channels before exiting so nothing is left waiting around to
 676  	// send.
 677  cleanup:
 678  	for {
 679  		select {
 680  		case details := <-c.sendPostChan:
 681  			details.jsonRequest.responseChan <- &response{
 682  				result: nil,
 683  				err:    ErrClientShutdown,
 684  			}
 685  		default:
 686  			break cleanup
 687  		}
 688  	}
 689  	c.wg.Done()
 690  	T.Ln("RPC client send handler done for", c.config.Host)
 691  }
 692  
 693  // sendPostRequest sends the passed HTTP request to the RPC server using the
 694  // HTTP client associated with the client.
 695  //
 696  // It is backed by a buffered channel so it will not block until the send
 697  // channel is full.
 698  func (c *Client) sendPostRequest(httpReq *http.Request, jReq *jsonRequest) {
 699  	// Don't send the message if shutting down.
 700  	select {
 701  	case <-c.shutdown.Wait():
 702  		jReq.responseChan <- &response{result: nil, err: ErrClientShutdown}
 703  	default:
 704  	}
 705  	c.sendPostChan <- &sendPostDetails{
 706  		jsonRequest: jReq,
 707  		httpRequest: httpReq,
 708  	}
 709  }
 710  
 711  // newFutureError returns a new future result channel that already has the
 712  // passed error waiting on the channel with the reply set to nil. This is useful
 713  // to easily return errors from the various Async functions.
 714  func newFutureError(e error) chan *response {
 715  	responseChan := make(chan *response, 1)
 716  	responseChan <- &response{err: e}
 717  	return responseChan
 718  }
 719  
 720  // receiveFuture receives from the passed futureResult channel to extract a
 721  // reply or any errors.
 722  //
 723  // The examined errors include an error in the futureResult and the error in the
 724  // reply from the server. This will block until the result is available on the
 725  // passed channel.
 726  func receiveFuture(f chan *response) ([]byte, error) {
 727  	// Wait for a response on the returned channel.
 728  	r := <-f
 729  	return r.result, r.err
 730  }
 731  
 732  // sendPost sends the passed request to the server by issuing an HTTP POST
 733  // request using the provided response channel for the reply.
 734  //
 735  // Typically a new connection is opened and closed for each command when using
 736  // this method, however, the underlying HTTP client might coalesce multiple
 737  // commands depending on several factors including the remote server
 738  // configuration.
 739  func (c *Client) sendPost(jReq *jsonRequest) {
 740  	// Generate a request to the configured RPC server.
 741  	protocol := "http"
 742  	if c.config.TLS {
 743  		protocol = "https"
 744  	}
 745  	address := protocol + "://" + c.config.Host
 746  	bodyReader := bytes.NewReader(jReq.marshalledJSON)
 747  	httpReq, e := http.NewRequest("POST", address, bodyReader)
 748  	if e != nil {
 749  		jReq.responseChan <- &response{result: nil, err: e}
 750  		return
 751  	}
 752  	httpReq.Close = true
 753  	httpReq.Header.Set("Content-Type", "application/json")
 754  	// Configure basic access authorization.
 755  	httpReq.SetBasicAuth(c.config.User, c.config.Pass)
 756  	// Tracef("sending command [%s] with id %d", jReq.method, jReq.id)
 757  	c.sendPostRequest(httpReq, jReq)
 758  }
 759  
 760  // sendRequest sends the passed json request to the associated server using the
 761  // provided response channel for the reply.
 762  //
 763  // It handles both websocket and HTTP POST mode depending on the configuration
 764  // of the client.
 765  func (c *Client) sendRequest(jReq *jsonRequest) {
 766  	// Choose which marshal and send function to use depending on whether the client
 767  	// running in HTTP POST mode or not.
 768  	//
 769  	// When running in HTTP POST mode, the command is issued via an HTTP client.
 770  	// Otherwise, the command is issued via the asynchronous websocket channels.
 771  	if c.config.HTTPPostMode {
 772  		// T.Ln("### sending via http post mode")
 773  		c.sendPost(jReq)
 774  		return
 775  	}
 776  	// Chk whether the websocket connection has never been established, in which
 777  	// case the handler goroutines are not running.
 778  	// T.Ln("### waiting for connection established")
 779  	select {
 780  	case <-c.connEstablished.Wait():
 781  		// T.Ln("### connEstablished")
 782  	default:
 783  		// T.Ln("### sending back error client not connected")
 784  		jReq.responseChan <- &response{err: ErrClientNotConnected}
 785  		return
 786  	}
 787  	// Add the request to the internal tracking map so the response from the remote
 788  	// server can be properly detected and routed to the response channel. Then send
 789  	// the marshalled request via the websocket connection.
 790  	if e := c.addRequest(jReq); E.Chk(e) {
 791  		// T.Ln("### error", e)
 792  		jReq.responseChan <- &response{err: e}
 793  		return
 794  	}
 795  	// Tracef("### sending command [%s] with id %d", jReq.method, jReq.id)
 796  	c.sendMessage(jReq.marshalledJSON)
 797  }
 798  
 799  // sendCmd sends the passed command to the associated server and returns a
 800  // response channel on which the reply will be delivered at some point in the
 801  // future.
 802  //
 803  // It handles both websocket and HTTP POST mode depending on the configuration
 804  // of the client.
 805  func (c *Client) sendCmd(cmd interface{}) chan *response {
 806  	// T.Ln("### sendCmd")
 807  	// Traces(cmd)
 808  	// Get the method associated with the command.
 809  	var e error
 810  	var method string
 811  	if method, e = btcjson.CmdMethod(cmd); E.Chk(e) {
 812  		// T.Ln("### error", e)
 813  		return newFutureError(e)
 814  	}
 815  	// Marshal the command.
 816  	id := c.NextID()
 817  	var marshalledJSON []byte
 818  	if marshalledJSON, e = btcjson.MarshalCmd(id, cmd); E.Chk(e) {
 819  		return newFutureError(e)
 820  	}
 821  	// Generate the request and send it along with a channel to respond on.
 822  	responseChan := make(chan *response, 1)
 823  	jReq := &jsonRequest{
 824  		id:             id,
 825  		method:         method,
 826  		cmd:            cmd,
 827  		marshalledJSON: marshalledJSON,
 828  		responseChan:   responseChan,
 829  	}
 830  	// T.Ln("### sending request")
 831  	c.sendRequest(jReq)
 832  	return responseChan
 833  }
 834  
 835  // sendCmdAndWait sends the passed command to the associated server, waits for
 836  // the reply, and returns the result from it.
 837  //
 838  // It will return the error field in the reply if there is one.
 839  func (c *Client) sendCmdAndWait(cmd interface{}) (interface{}, error) {
 840  	// Marshal the command to JSON-RPC, send it to the connected server, and wait
 841  	// for a response on the returned channel.
 842  	return receiveFuture(c.sendCmd(cmd))
 843  }
 844  
 845  // Disconnected returns whether or not the server is disconnected. If a
 846  // websocket client was created but never connected, this also returns false.
 847  func (c *Client) Disconnected() bool {
 848  	if c == nil {
 849  		return true
 850  	}
 851  	c.mtx.Lock()
 852  	defer c.mtx.Unlock()
 853  	select {
 854  	case <-c.connEstablished.Wait():
 855  		return c.disconnected
 856  	default:
 857  		return false
 858  	}
 859  }
 860  
 861  // doDisconnect disconnects the websocket associated with the client if it
 862  // hasn't already been disconnected.
 863  //
 864  // It will return false if the disconnect is not needed or the client is running
 865  // in HTTP POST mode. This function is safe for concurrent access.
 866  func (c *Client) doDisconnect() bool {
 867  	if c.config.HTTPPostMode {
 868  		return false
 869  	}
 870  	c.mtx.Lock()
 871  	defer c.mtx.Unlock()
 872  	// Nothing to do if already disconnected.
 873  	if c.disconnected {
 874  		return false
 875  	}
 876  	T.Ln("disconnecting RPC client", c.config.Host)
 877  	c.disconnect.Q()
 878  	if c.wsConn != nil {
 879  		if e := c.wsConn.Close(); E.Chk(e) {
 880  		}
 881  	}
 882  	c.disconnected = true
 883  	return true
 884  }
 885  
 886  // doShutdown closes the shutdown channel and logs the shutdown unless shutdown
 887  // is already in progress.
 888  //
 889  // It will return false if the shutdown is not needed.
 890  //
 891  // This function is safe for concurrent access.
 892  func (c *Client) doShutdown() bool {
 893  	// Ignore the shutdown request if the client is already in the process of
 894  	// shutting down or already shutdown.
 895  	select {
 896  	case <-c.shutdown.Wait():
 897  		return false
 898  	default:
 899  	}
 900  	T.Ln("shutting down RPC client", c.config.Host)
 901  	c.shutdown.Q()
 902  	return true
 903  }
 904  
 905  // Disconnect disconnects the current websocket associated with the client.
 906  //
 907  // The connection will automatically be re-established unless the client was
 908  // created with the DisableAutoReconnect flag. This function has no effect when
 909  // the client is running in HTTP POST mode.
 910  func (c *Client) Disconnect() {
 911  	// Nothing to do if already disconnected or running in HTTP POST mode.
 912  	if !c.doDisconnect() {
 913  		return
 914  	}
 915  	c.requestLock.Lock()
 916  	defer c.requestLock.Unlock()
 917  	// When operating without auto reconnect, send errors to any pending requests
 918  	// and shutdown the client.
 919  	if c.config.DisableAutoReconnect {
 920  		for e := c.requestList.Front(); e != nil; e = e.Next() {
 921  			req := e.Value.(*jsonRequest)
 922  			req.responseChan <- &response{
 923  				result: nil,
 924  				err:    ErrClientDisconnect,
 925  			}
 926  		}
 927  		c.removeAllRequests()
 928  		c.doShutdown()
 929  	}
 930  }
 931  
 932  // Shutdown shuts down the client by disconnecting any connections associated
 933  // with the client and, when automatic reconnect is enabled, preventing future
 934  // attempts to reconnect. It also stops all goroutines.
 935  func (c *Client) Shutdown() {
 936  	// Do the shutdown under the request lock to prevent clients from adding new
 937  	// requests while the client shutdown process is initiated.
 938  	c.requestLock.Lock()
 939  	defer c.requestLock.Unlock()
 940  	// Ignore the shutdown request if the client is already in the process of
 941  	// shutting down or already shutdown.
 942  	if !c.doShutdown() {
 943  		return
 944  	}
 945  	// Send the ErrClientShutdown error to any pending requests.
 946  	for e := c.requestList.Front(); e != nil; e = e.Next() {
 947  		req := e.Value.(*jsonRequest)
 948  		req.responseChan <- &response{
 949  			result: nil,
 950  			err:    ErrClientShutdown,
 951  		}
 952  	}
 953  	c.removeAllRequests()
 954  	// Disconnect the client if needed.
 955  	c.doDisconnect()
 956  }
 957  
 958  // start begins processing input and output messages.
 959  func (c *Client) start() {
 960  	T.Ln("starting RPC client", c.config.Host)
 961  	// Start the I/O processing handlers depending on whether the client is in HTTP
 962  	// POST mode or the default websocket mode.
 963  	if c.config.HTTPPostMode {
 964  		c.wg.Add(1)
 965  		go c.sendPostHandler()
 966  	} else {
 967  		c.wg.Add(3)
 968  		go func() {
 969  			if c.ntfnHandlers != nil {
 970  				if c.ntfnHandlers.OnClientConnected != nil {
 971  					c.ntfnHandlers.OnClientConnected()
 972  				}
 973  			}
 974  			c.wg.Done()
 975  		}()
 976  		go c.wsInHandler()
 977  		go c.wsOutHandler()
 978  	}
 979  }
 980  
 981  // WaitForShutdown blocks until the client goroutines are stopped and the
 982  // connection is closed.
 983  func (c *Client) WaitForShutdown() {
 984  	c.wg.Wait()
 985  }
 986  
 987  // ConnConfig describes the connection configuration parameters for the client.
 988  type ConnConfig struct {
 989  	// Host is the IP address and port of the RPC server you want to connect to.
 990  	Host string
 991  	// Endpoint is the websocket endpoint on the RPC server. This is typically "ws".
 992  	Endpoint string
 993  	// User is the username to use to authenticate to the RPC server.
 994  	User string
 995  	// Pass is the passphrase to use to authenticate to the RPC server.
 996  	Pass string
 997  	// TLS enables transport layer security encryption. It is recommended to always
 998  	// use TLS if the RPC server supports it as otherwise your username and password
 999  	// is sent across the wire in cleartext.
1000  	TLS bool
1001  	// Certificates are the bytes for a PEM-encoded certificate chain used for the
1002  	// TLS connection. It has no effect if the DisableTLS parameter is true.
1003  	Certificates []byte
1004  	// Proxy specifies to connect through a SOCKS 5 proxy server. It may be an empty
1005  	// string if a proxy is not required.
1006  	Proxy string
1007  	// ProxyUser is an optional username to use for the proxy server if it requires
1008  	// authentication. It has no effect if the Proxy parameter is not set.
1009  	ProxyUser string
1010  	// ProxyPass is an optional password to use for the proxy server if it requires
1011  	// authentication. It has no effect if the Proxy parameter is not set.
1012  	ProxyPass string
1013  	// DisableAutoReconnect specifies the client should not automatically try to
1014  	// reconnect to the server when it has been disconnected.
1015  	DisableAutoReconnect bool
1016  	// DisableConnectOnNew specifies that a websocket client connection should not
1017  	// be tried when creating the client with New. Instead, the client is created
1018  	// and returned unconnected, and Connect must be called manually.
1019  	DisableConnectOnNew bool
1020  	// HTTPPostMode instructs the client to run using multiple independent
1021  	// connections issuing HTTP POST requests instead of using the default of
1022  	// websockets.
1023  	//
1024  	// Websockets are generally preferred as some of the features of the client such
1025  	// notifications only work with websockets, however, not all servers support the
1026  	// websocket extensions, so this flag can be set to true to use basic HTTP POST
1027  	// requests instead.
1028  	HTTPPostMode bool
1029  	// EnableBCInfoHacks is an opt provided to enable compatibility hacks when
1030  	// connecting to blockchain.info RPC server
1031  	EnableBCInfoHacks bool
1032  }
1033  
1034  // newHTTPClient returns a new http client that is configured according to the
1035  // proxy and TLS settings in the associated connection configuration.
1036  func newHTTPClient(config *ConnConfig) (*http.Client, error) {
1037  	// Set proxy function if there is a proxy configured.
1038  	var proxyFunc func(*http.Request) (*url.URL, error)
1039  	if config.Proxy != "" {
1040  		proxyURL, e := url.Parse(config.Proxy)
1041  		if e != nil {
1042  			return nil, e
1043  		}
1044  		proxyFunc = http.ProxyURL(proxyURL)
1045  	}
1046  	// Configure TLS if needed.
1047  	var tlsConfig *tls.Config
1048  	if !config.TLS {
1049  		if len(config.Certificates) > 0 {
1050  			pool := x509.NewCertPool()
1051  			pool.AppendCertsFromPEM(config.Certificates)
1052  			tlsConfig = &tls.Config{
1053  				RootCAs: pool,
1054  			}
1055  		}
1056  	}
1057  	client := http.Client{
1058  		Transport: &http.Transport{
1059  			Proxy:           proxyFunc,
1060  			TLSClientConfig: tlsConfig,
1061  		},
1062  	}
1063  	return &client, nil
1064  }
1065  
1066  // dial opens a websocket connection using the passed connection configuration
1067  // details.
1068  func dial(config *ConnConfig) (*websocket.Conn, error) {
1069  	// Setup TLS if not disabled.
1070  	var tlsConfig *tls.Config
1071  	var scheme = "ws"
1072  	if config.TLS {
1073  		tlsConfig = &tls.Config{
1074  			MinVersion: tls.VersionTLS12,
1075  		}
1076  		if len(config.Certificates) > 0 {
1077  			// D.Ln("no certificates for verification")
1078  			pool := x509.NewCertPool()
1079  			pool.AppendCertsFromPEM(config.Certificates)
1080  			tlsConfig.RootCAs = pool
1081  		}
1082  		scheme = "wss"
1083  	}
1084  	// Create a websocket dialer that will be used to make the connection. It is
1085  	// modified by the proxy setting below as needed.
1086  	dialer := websocket.Dialer{TLSClientConfig: tlsConfig}
1087  	// Setup the proxy if one is configured.
1088  	if config.Proxy != "" {
1089  		proxy := &socks.Proxy{
1090  			Addr:     config.Proxy,
1091  			Username: config.ProxyUser,
1092  			Password: config.ProxyPass,
1093  		}
1094  		dialer.NetDial = proxy.Dial
1095  	}
1096  	// The RPC server requires basic authorization, so create a custom request
1097  	// header with the Authorization header set.
1098  	login := config.User + ":" + config.Pass
1099  	auth := "Basic " + base64.StdEncoding.EncodeToString([]byte(login))
1100  	requestHeader := make(http.Header)
1101  	requestHeader.Add("Authorization", auth)
1102  	// Dial the connection.
1103  	address := fmt.Sprintf("%s://%s/%s", scheme, config.Host, config.Endpoint)
1104  	wsConn, resp, e := dialer.Dial(address, requestHeader)
1105  	if e != nil {
1106  		// debug.SetTraceback("all")
1107  		// debug.PrintStack()
1108  		if e != websocket.ErrBadHandshake || resp == nil {
1109  			return nil, e
1110  		}
1111  		// Detect HTTP authentication error status codes.
1112  		if resp.StatusCode == http.StatusUnauthorized ||
1113  			resp.StatusCode == http.StatusForbidden {
1114  			return nil, ErrInvalidAuth
1115  		}
1116  		// The connection was authenticated and the status response was ok, but the
1117  		// websocket handshake still failed, so the endpoint is invalid in some way.
1118  		if resp.StatusCode == http.StatusOK {
1119  			return nil, ErrInvalidEndpoint
1120  		}
1121  		// Return the status text from the server if none of the special cases above
1122  		// apply.
1123  		return nil, errors.New(resp.Status)
1124  	}
1125  	return wsConn, nil
1126  }
1127  
1128  // New creates a new RPC client based on the provided connection configuration
1129  // details.
1130  //
1131  // The notification handlers parameter may be nil if you are not interested in
1132  // receiving notifications and will be ignored if the configuration is set to
1133  // run in HTTP POST mode.
1134  func New(config *ConnConfig, ntfnHandlers *NotificationHandlers, quit qu.C) (*Client, error) {
1135  	// Either open a websocket connection or create an HTTP client depending on the
1136  	// HTTP POST mode. Also, set the notification handlers to nil when running in
1137  	// HTTP POST mode.
1138  	var wsConn *websocket.Conn
1139  	var httpClient *http.Client
1140  	connEstablished := qu.T()
1141  	var start bool
1142  	if config.HTTPPostMode {
1143  		ntfnHandlers = nil
1144  		start = true
1145  		var e error
1146  		httpClient, e = newHTTPClient(config)
1147  		if e != nil {
1148  			return nil, e
1149  		}
1150  	} else {
1151  		if !config.DisableConnectOnNew {
1152  			var e error
1153  			wsConn, e = dial(config)
1154  			if e != nil {
1155  				return nil, e
1156  			}
1157  			start = true
1158  		}
1159  	}
1160  	client := &Client{
1161  		config:          config,
1162  		wsConn:          wsConn,
1163  		httpClient:      httpClient,
1164  		requestMap:      make(map[uint64]*list.Element),
1165  		requestList:     list.New(),
1166  		ntfnHandlers:    ntfnHandlers,
1167  		ntfnState:       newNotificationState(),
1168  		sendChan:        make(chan []byte, sendBufferSize),
1169  		sendPostChan:    make(chan *sendPostDetails, sendPostBufferSize),
1170  		connEstablished: connEstablished,
1171  		disconnect:      qu.T(),
1172  		shutdown:        qu.T(),
1173  	}
1174  	go func() {
1175  	out:
1176  		for {
1177  			select {
1178  			case <-quit.Wait():
1179  				client.disconnect.Q()
1180  				client.shutdown.Q()
1181  				break out
1182  			}
1183  		}
1184  	}()
1185  	if start {
1186  		T.Ln("established connection to RPC server", config.Host)
1187  		connEstablished.Q()
1188  		client.start()
1189  		if !client.config.HTTPPostMode && !client.config.DisableAutoReconnect {
1190  			client.wg.Add(1)
1191  			go client.wsReconnectHandler()
1192  		}
1193  	}
1194  	return client, nil
1195  }
1196  
1197  // Connect establishes the initial websocket connection. This is necessary when
1198  // a client was created after setting the DisableConnectOnNew field of the
1199  // Config struct.
1200  //
1201  // Up to tries number of connections (each after an increasing backoff) will be
1202  // tried if the connection can not be established. The special value of 0
1203  // indicates an unlimited number of connection attempts.
1204  //
1205  // This method will error if the client is not configured for websockets, if the
1206  // connection has already been established, or if none of the connection
1207  // attempts were successful.
1208  func (c *Client) Connect(tries int) (e error) {
1209  	c.mtx.Lock()
1210  	defer c.mtx.Unlock()
1211  	if c.config.HTTPPostMode {
1212  		return ErrNotWebsocketClient
1213  	}
1214  	if c.wsConn != nil {
1215  		return ErrClientAlreadyConnected
1216  	}
1217  	// Begin connection attempts. Increase the backoff after each failed attempt, up
1218  	// to a maximum of one minute.
1219  	var backoff time.Duration
1220  	for i := 0; tries == 0 || i < tries; i++ {
1221  		var wsConn *websocket.Conn
1222  		wsConn, e = dial(c.config)
1223  		if e != nil {
1224  			backoff = connectionRetryInterval * time.Duration(i+1)
1225  			if backoff > time.Minute {
1226  				backoff = time.Minute
1227  			}
1228  			time.Sleep(backoff)
1229  			continue
1230  		}
1231  		// Connection was established. Set the websocket connection member of the client
1232  		// and start the goroutines necessary to run the client.
1233  		D.Ln("established connection to RPC server", c.config.Host)
1234  		c.wsConn = wsConn
1235  		c.connEstablished.Q()
1236  		c.start()
1237  		if !c.config.DisableAutoReconnect {
1238  			c.wg.Add(1)
1239  			go c.wsReconnectHandler()
1240  		}
1241  		return nil
1242  	}
1243  	
1244  	// All connection attempts failed, so return the last error.
1245  	return e
1246  }
1247