1 package rpcclient
2 3 import (
4 "bytes"
5 "container/list"
6 "crypto/tls"
7 "crypto/x509"
8 "encoding/base64"
9 js "encoding/json"
10 "errors"
11 "fmt"
12 "io"
13 "io/ioutil"
14 "math"
15 "net"
16 "net/http"
17 "net/url"
18 "sync"
19 "sync/atomic"
20 "time"
21 22 "github.com/p9c/p9/pkg/qu"
23 24 "github.com/btcsuite/go-socks/socks"
25 "github.com/btcsuite/websocket"
26 27 "github.com/p9c/p9/pkg/btcjson"
28 )
29 30 var (
31 // ErrInvalidAuth is an error to describe the condition where the client is
32 // either unable to authenticate or the specified endpoint is incorrect.
33 ErrInvalidAuth = errors.New("authentication failure")
34 // ErrInvalidEndpoint is an error to describe the condition where the websocket
35 // handshake failed with the specified endpoint.
36 ErrInvalidEndpoint = errors.New("the endpoint either does not support websockets or does not exist")
37 // ErrClientNotConnected is an error to describe the condition where a websocket
38 // client has been created, but the connection was never established.
39 //
40 // This condition differs from ErrClientDisconnect, which represents an
41 // established connection that was lost.
42 ErrClientNotConnected = errors.New("the client was never connected")
43 // ErrClientDisconnect is an error to describe the condition where the client
44 // has been disconnected from the RPC server.
45 //
46 // When the DisableAutoReconnect opt is not set, any outstanding futures when
47 // a client disconnect occurs will return this error as will any new requests.
48 ErrClientDisconnect = errors.New("the client has been disconnected")
49 // ErrClientShutdown is an error to describe the condition where the client is
50 // either already shutdown, or in the process of shutting down.
51 //
52 // Any outstanding futures when a client shutdown occurs will return this error
53 // as will any new requests.
54 ErrClientShutdown = errors.New("the client has been shutdown")
55 // ErrNotWebsocketClient is an error to describe the condition of calling a
56 // Client method intended for a websocket client when the client has been
57 // configured to run in HTTP POST mode instead.
58 ErrNotWebsocketClient = errors.New("client is not configured for websockets")
59 // ErrClientAlreadyConnected is an error to describe the condition where a new
60 // client connection cannot be established due to a websocket client having
61 // already connected to the RPC server.
62 ErrClientAlreadyConnected = errors.New("websocket client has already connected")
63 )
64 65 const (
66 // sendBufferSize is the number of elements the websocket send channel can queue
67 // before blocking.
68 sendBufferSize = 50
69 // sendPostBufferSize is the number of elements the HTTP POST send channel can
70 // queue before blocking.
71 sendPostBufferSize = 100
72 // connectionRetryInterval is the amount of time to wait in between retries when
73 // automatically reconnecting to an RPC server.
74 connectionRetryInterval = time.Second * 5
75 )
76 77 // sendPostDetails houses an HTTP POST request to send to an RPC server as well
78 // as the original JSON-RPC command and a channel to reply on when the server
79 // responds with the result.
80 type sendPostDetails struct {
81 httpRequest *http.Request
82 jsonRequest *jsonRequest
83 }
84 85 // jsonRequest holds information about a json request that is used to properly
86 // detect, interpret, and deliver a reply to it.
87 type jsonRequest struct {
88 id uint64
89 method string
90 cmd interface{}
91 marshalledJSON []byte
92 responseChan chan *response
93 }
94 95 // Client represents a Bitcoin RPC client which allows easy access to the
96 // various RPC methods available on a Bitcoin RPC server.
97 //
98 // Each of the wrapper functions handle the details of converting the passed and
99 // return types to and from the underlying JSON types which are required for the
100 // JSON-RPC invocations
101 //
102 // The client provides each RPC in both synchronous (blocking) and asynchronous
103 // (non-blocking) forms.
104 //
105 // The asynchronous forms are based on the concept of futures where they return
106 // an instance of a type that promises to deliver the result of the invocation
107 // at some future time.
108 //
109 // Invoking the Receive method on the returned future will block until the
110 // result is available if it's not already.
111 type Client struct {
112 id uint64 // atomic, so must stay 64-bit aligned
113 // config holds the connection configuration assoiated with this client.
114 config *ConnConfig
115 // wsConn is the underlying websocket connection when not in HTTP POST mode.
116 wsConn *websocket.Conn
117 // httpClient is the underlying HTTP client to use when running in HTTP
118 // POST mode.
119 httpClient *http.Client
120 // mtx is a mutex to protect access to connection related fields.
121 mtx sync.Mutex
122 // disconnected indicated whether or not the server is disconnected.
123 disconnected bool
124 // retryCount holds the number of times the client has tried to reconnect to the
125 // RPC server.
126 retryCount int64
127 // Track command and their response channels by ID.
128 requestLock sync.Mutex
129 requestMap map[uint64]*list.Element
130 requestList *list.List
131 // Notifications.
132 ntfnHandlers *NotificationHandlers
133 ntfnStateLock sync.Mutex
134 ntfnState *notificationState
135 // Networking infrastructure.
136 sendChan chan []byte
137 sendPostChan chan *sendPostDetails
138 connEstablished qu.C
139 disconnect qu.C
140 shutdown qu.C
141 wg sync.WaitGroup
142 }
143 144 // NextID returns the next id to be used when sending a JSON-RPC message. This
145 // ID allows responses to be associated with particular requests per the
146 // JSON-RPC specification.
147 //
148 // Typically the consumer of the client does not need to call this function,
149 // however, if a custom request is being created and used this function should
150 // be used to ensure the ID is unique amongst all requests being made.
151 func (c *Client) NextID() uint64 {
152 return atomic.AddUint64(&c.id, 1)
153 }
154 155 // addRequest associates the passed jsonRequest with its id.
156 //
157 // This allows the response from the remote server to be unmarshalled to the
158 // appropriate type and sent to the specified channel when it is received.
159 //
160 // If the client has already begun shutting down, ErrClientShutdown is returned
161 // and the request is not added. This function is safe for concurrent access.
162 func (c *Client) addRequest(jReq *jsonRequest) (e error) {
163 c.requestLock.Lock()
164 defer c.requestLock.Unlock()
165 // A non-blocking read of the shutdown channel with the request lock held avoids
166 // adding the request to the client's internal data structures if the client is
167 // in the process of shutting down (and has not yet grabbed the request lock),
168 // or has finished shutdown already (responding to each outstanding request with
169 // ErrClientShutdown).
170 select {
171 case <-c.shutdown.Wait():
172 return ErrClientShutdown
173 default:
174 }
175 element := c.requestList.PushBack(jReq)
176 c.requestMap[jReq.id] = element
177 return nil
178 }
179 180 // removeRequest returns and removes the jsonRequest which contains the response
181 // channel and original method associated with the passed id or nil if there is
182 // no association. This function is safe for concurrent access.
183 func (c *Client) removeRequest(id uint64) *jsonRequest {
184 c.requestLock.Lock()
185 defer c.requestLock.Unlock()
186 element := c.requestMap[id]
187 if element != nil {
188 delete(c.requestMap, id)
189 request := c.requestList.Remove(element).(*jsonRequest)
190 return request
191 }
192 return nil
193 }
194 195 // removeAllRequests removes all the jsonRequests which contain the response
196 // channels for outstanding requests.
197 //
198 // This function MUST be called with the request lock held.
199 func (c *Client) removeAllRequests() {
200 c.requestMap = make(map[uint64]*list.Element)
201 c.requestList.Init()
202 }
203 204 // trackRegisteredNtfns examines the passed command to see if it is one of the
205 // notification commands and updates the notification state that is used to
206 // automatically re-establish registered notifications on reconnects.
207 func (c *Client) trackRegisteredNtfns(cmd interface{}) {
208 // Nothing to do if the caller is not interested in notifications.
209 if c.ntfnHandlers == nil {
210 return
211 }
212 c.ntfnStateLock.Lock()
213 defer c.ntfnStateLock.Unlock()
214 switch bcmd := cmd.(type) {
215 case *btcjson.NotifyBlocksCmd:
216 c.ntfnState.notifyBlocks = true
217 case *btcjson.NotifyNewTransactionsCmd:
218 if bcmd.Verbose != nil && *bcmd.Verbose {
219 c.ntfnState.notifyNewTxVerbose = true
220 } else {
221 c.ntfnState.notifyNewTx = true
222 }
223 case *btcjson.NotifySpentCmd:
224 for _, op := range bcmd.OutPoints {
225 c.ntfnState.notifySpent[op] = struct{}{}
226 }
227 case *btcjson.NotifyReceivedCmd:
228 for _, addr := range bcmd.Addresses {
229 c.ntfnState.notifyReceived[addr] = struct{}{}
230 }
231 }
232 }
233 234 type (
235 // inMessage is the first type that an incoming message is unmarshalled into. It
236 // supports both requests (for notification support) and responses.
237 //
238 // The partially-unmarshaled message is a notification if the embedded ID (from
239 // the response) is nil. Otherwise, it is a response.
240 inMessage struct {
241 ID *float64 `json:"id"`
242 *rawNotification
243 *rawResponse
244 }
245 // rawNotification is a partially-unmarshalled JSON-RPC notification.
246 rawNotification struct {
247 Method string `json:"method"`
248 Params []js.RawMessage `json:"netparams"`
249 }
250 // rawResponse is a partially-unmarshalled JSON-RPC response. For this to be
251 // valid (according to JSON-RPC 1.0 spec), ID may not be nil.
252 rawResponse struct {
253 Result js.RawMessage `json:"result"`
254 Error *btcjson.RPCError `json:"error"`
255 }
256 )
257 258 // response is the raw bytes of a JSON-RPC result, or the error if the response
259 // error object was non-null.
260 type response struct {
261 result []byte
262 err error
263 }
264 265 // result checks whether the unmarshalled response contains a non-nil error,
266 // returning an unmarshalled json.RPCError (or an unmarshaling error) if so.
267 //
268 // If the response is not an error, the raw bytes of the request are returned
269 // for further unmashaling into specific result types.
270 func (r rawResponse) result() (result []byte, e error) {
271 if r.Error != nil {
272 return nil, r.Error
273 }
274 return r.Result, nil
275 }
276 277 // handleMessage is the main handler for incoming notifications and responses.
278 func (c *Client) handleMessage(msg []byte) {
279 // Attempt to unmarshal the message as either a notification or response.
280 var in inMessage
281 in.rawResponse = new(rawResponse)
282 in.rawNotification = new(rawNotification)
283 e := js.Unmarshal(msg, &in)
284 if e != nil {
285 W.Ln("remote server sent invalid message:", e)
286 return
287 }
288 // JSON-RPC 1.0 notifications are requests with a null id.
289 if in.ID == nil {
290 ntfn := in.rawNotification
291 if ntfn == nil {
292 W.Ln("malformed notification: missing method and parameters")
293 return
294 }
295 if ntfn.Method == "" {
296 W.Ln("malformed notification: missing method")
297 return
298 }
299 // netparams are not optional: nil isn't valid (but len == 0 is)
300 if ntfn.Params == nil {
301 W.Ln("malformed notification: missing netparams")
302 return
303 }
304 // Deliver the notification.
305 T.Ln("received notification:", in.Method)
306 c.handleNotification(in.rawNotification)
307 return
308 }
309 310 // ensure that in.ID can be converted to an integer without loss of precision
311 if *in.ID < 0 || *in.ID != math.Trunc(*in.ID) {
312 W.Ln("malformed response: invalid identifier")
313 return
314 }
315 if in.rawResponse == nil {
316 W.Ln("malformed response: missing result and error")
317 return
318 }
319 id := uint64(*in.ID)
320 // Tracef("received response for id %d (result %s)", id, in.Result)
321 request := c.removeRequest(id)
322 // Nothing more to do if there is no request associated with this reply.
323 if request == nil || request.responseChan == nil {
324 W.F("received unexpected reply: %s (id %d)", in.Result, id)
325 return
326 }
327 // Since the command was successful, examine it to see if it's a notification,
328 // and if is, add it to the notification state so it automatically be
329 // re-established on reconnect.
330 c.trackRegisteredNtfns(request.cmd)
331 // Deliver the response.
332 result, e := in.rawResponse.result()
333 request.responseChan <- &response{result: result, err: e}
334 }
335 336 // shouldLogReadError returns whether or not the passed error, which is expected
337 // to have come from reading from the websocket connection in wsInHandler,
338 // should be logged.
339 func (c *Client) shouldLogReadError(e error) bool {
340 // No logging when the connection is being forcibly disconnected.
341 select {
342 case <-c.shutdown.Wait():
343 return false
344 default:
345 }
346 // No logging when the connection has been disconnected.
347 if e == io.EOF {
348 return false
349 }
350 if opErr, ok := e.(*net.OpError); ok && !opErr.Temporary() {
351 return false
352 }
353 return true
354 }
355 356 // wsInHandler handles all incoming messages for the websocket connection
357 // associated with the client. It must be run as a goroutine.
358 func (c *Client) wsInHandler() {
359 out:
360 for {
361 // Break out of the loop once the shutdown channel has been closed. Use a
362 // non-blocking select here so we fall through otherwise.
363 select {
364 case <-c.shutdown.Wait():
365 break out
366 default:
367 }
368 _, msg, e := c.wsConn.ReadMessage()
369 if e != nil && e != io.ErrUnexpectedEOF && e != io.EOF {
370 // Log the error if it's not due to disconnecting.
371 if c.shouldLogReadError(e) {
372 T.F("websocket receive error from %s: %v %s", c.config.Host, e)
373 }
374 break out
375 }
376 c.handleMessage(msg)
377 }
378 379 // Ensure the connection is closed.
380 c.Disconnect()
381 c.wg.Done()
382 T.Ln("RPC client input handler done for", c.config.Host)
383 }
384 385 // disconnectChan returns a copy of the current disconnect channel.
386 //
387 // The channel is read protected by the client mutex, and is safe to call while
388 // the channel is being reassigned during a reconnect.
389 func (c *Client) disconnectChan() <-chan struct{} {
390 c.mtx.Lock()
391 ch := c.disconnect
392 c.mtx.Unlock()
393 return ch
394 }
395 396 // wsOutHandler handles all outgoing messages for the websocket connection.
397 //
398 // It uses a buffered channel to serialize output messages while allowing the
399 // sender to continue running asynchronously. It must be run as a goroutine.
400 func (c *Client) wsOutHandler() {
401 out:
402 for {
403 // Send any messages ready for send until the client is disconnected closed.
404 select {
405 case msg := <-c.sendChan:
406 // T.Ln("### sendChan received message to send")
407 var e error
408 if e = c.wsConn.WriteMessage(websocket.TextMessage, msg); E.Chk(e) {
409 // T.Ln("### sendChan disconnecting client")
410 c.Disconnect()
411 break out
412 }
413 // T.Ln("### message sent")
414 case <-c.disconnectChan():
415 break out
416 }
417 }
418 // Drain any channels before exiting so nothing is left waiting around send.
419 cleanup:
420 for {
421 select {
422 case <-c.sendChan:
423 default:
424 break cleanup
425 }
426 }
427 c.wg.Done()
428 T.Ln("RPC client output handler done for", c.config.Host)
429 }
430 431 // sendMessage sends the passed JSON to the connected server using the websocket
432 // connection. It is backed by a buffered channel, so it will not block until
433 // the send channel is full.
434 func (c *Client) sendMessage(marshalledJSON []byte) {
435 // T.Ln("### sendMessage")
436 // Don't send the message if disconnected.
437 select {
438 case c.sendChan <- marshalledJSON:
439 // T.Ln("### message sent on channel")
440 case <-c.disconnectChan():
441 // T.Ln("### client is disconnected")
442 return
443 }
444 }
445 446 // reregisterNtfns creates and sends commands needed to re-establish the current
447 // notification state associated with the client.
448 //
449 // It should only be called on on reconnect by the resendRequests function.
450 func (c *Client) reregisterNtfns() (e error) {
451 // Nothing to do if the caller is not interested in notifications.
452 if c.ntfnHandlers == nil {
453 return nil
454 }
455 // In order to avoid holding the lock on the notification state for the entire
456 // time of the potentially long running RPCs issued below, make a copy of it and
457 // work from that.
458 //
459 // Also, other commands will be running concurrently which could modify the
460 // notification state (while not under the lock of course) which also register
461 // it with the remote RPC server, so this prevents double registrations.
462 c.ntfnStateLock.Lock()
463 stateCopy := c.ntfnState.Copy()
464 c.ntfnStateLock.Unlock()
465 // Reregister notifyblocks if needed.
466 if stateCopy.notifyBlocks {
467 D.Ln("reregistering [notifyblocks]")
468 if e := c.NotifyBlocks(); E.Chk(e) {
469 return e
470 }
471 }
472 // Reregister notifynewtransactions if needed.
473 if stateCopy.notifyNewTx || stateCopy.notifyNewTxVerbose {
474 D.F(
475 "reregistering [notifynewtransactions] (verbose=%v)",
476 stateCopy.notifyNewTxVerbose,
477 )
478 e := c.NotifyNewTransactions(stateCopy.notifyNewTxVerbose)
479 if e != nil {
480 return e
481 }
482 }
483 // Reregister the combination of all previously registered notifyspent outpoints
484 // in one command if needed.
485 nsLen := len(stateCopy.notifySpent)
486 if nsLen > 0 {
487 outpoints := make([]btcjson.OutPoint, 0, nsLen)
488 for op := range stateCopy.notifySpent {
489 outpoints = append(outpoints, op)
490 }
491 D.F("reregistering [notifyspent] outpoints: %v", outpoints)
492 if e := c.notifySpentInternal(outpoints).Receive(); E.Chk(e) {
493 return e
494 }
495 }
496 // Reregister the combination of all previously registered notifyreceived
497 // addresses in one command if needed.
498 nrLen := len(stateCopy.notifyReceived)
499 if nrLen > 0 {
500 addresses := make([]string, 0, nrLen)
501 for addr := range stateCopy.notifyReceived {
502 addresses = append(addresses, addr)
503 }
504 D.Ln("reregistering [notifyreceived] addresses:", addresses)
505 if e := c.notifyReceivedInternal(addresses).Receive(); E.Chk(e) {
506 return e
507 }
508 }
509 return nil
510 }
511 512 // ignoreResends is a set of all methods for requests that are "long running"
513 // are not be reissued by the client on reconnect.
514 var ignoreResends = map[string]struct{}{
515 "rescan": {},
516 }
517 518 // resendRequests resends any requests that had not completed when the client
519 // disconnected. It is intended to be called once the client has reconnected as
520 // a separate goroutine.
521 func (c *Client) resendRequests() {
522 // Set the notification state back up. If anything goes wrong, disconnect the client.
523 if e := c.reregisterNtfns(); E.Chk(e) {
524 W.Ln("unable to re-establish notification state:", e)
525 c.Disconnect()
526 return
527 }
528 // Since it's possible to block on send and more requests might be added by the
529 // caller while resending, make a copy of all of the requests that need to be
530 // resent now and work from the copy. This allows the lock to be released
531 // quickly.
532 c.requestLock.Lock()
533 resendReqs := make([]*jsonRequest, 0, c.requestList.Len())
534 var nextElem *list.Element
535 for e := c.requestList.Front(); e != nil; e = nextElem {
536 nextElem = e.Next()
537 jReq := e.Value.(*jsonRequest)
538 if _, ok := ignoreResends[jReq.method]; ok {
539 // If a request is not sent on reconnect, remove it from the request structures,
540 // since no reply is expected.
541 delete(c.requestMap, jReq.id)
542 c.requestList.Remove(e)
543 } else {
544 resendReqs = append(resendReqs, jReq)
545 }
546 }
547 c.requestLock.Unlock()
548 for _, jReq := range resendReqs {
549 // Stop resending commands if the client disconnected again since the next
550 // reconnect will handle them.
551 if c.Disconnected() {
552 return
553 }
554 T.F("sending command [%s] with id %d %s", jReq.method, jReq.id)
555 c.sendMessage(jReq.marshalledJSON)
556 }
557 }
558 559 // wsReconnectHandler listens for client disconnects and automatically tries to
560 // reconnect with retry interval that scales based on the number of retries.
561 //
562 // It also resends any commands that had not completed when the client
563 // disconnected so the disconnect/reconnect process is largely transparent to
564 // the caller.
565 //
566 // This function is not run when the DisableAutoReconnect config options is set.
567 //
568 // This function must be run as a goroutine.
569 func (c *Client) wsReconnectHandler() {
570 out:
571 for {
572 select {
573 case <-c.disconnect.Wait():
574 // On disconnect, fallthrough to reestablish the connection.
575 case <-c.shutdown.Wait():
576 break out
577 }
578 reconnect:
579 for {
580 select {
581 case <-c.shutdown.Wait():
582 break out
583 default:
584 }
585 wsConn, e := dial(c.config)
586 if e != nil {
587 c.retryCount++
588 T.F("failed to connect to %s: %v %s", c.config.Host, e)
589 // IconScale the retry interval by the number of retries so there is a backoff
590 // up to a max of 1 minute.
591 scaledInterval := connectionRetryInterval.Nanoseconds() * c.
592 retryCount
593 scaledDuration := time.Duration(scaledInterval)
594 if scaledDuration > time.Minute {
595 scaledDuration = time.Minute
596 }
597 T.F(
598 "retrying connection to %s in %s",
599 c.config.Host, scaledDuration,
600 )
601 time.Sleep(scaledDuration)
602 continue reconnect
603 }
604 I.Ln("reestablished connection to RPC server", c.config.Host)
605 // Reset the connection state and signal the reconnect happened.
606 c.wsConn = wsConn
607 c.retryCount = 0
608 c.mtx.Lock()
609 c.disconnect = qu.T()
610 c.disconnected = false
611 c.mtx.Unlock()
612 // Start processing input and output for the new connection.
613 c.start()
614 // Reissue pending requests in another goroutine since the send can block.
615 go c.resendRequests()
616 // Break out of the reconnect loop back to wait for disconnect again.
617 break reconnect
618 }
619 }
620 c.wg.Done()
621 T.Ln("RPC client reconnect handler done for", c.config.Host)
622 }
623 624 // handleSendPostMessage handles performing the passed HTTP request, reading the
625 // result unmarshalling it and delivering the unmarshalled result to the
626 // provided response channel.
627 func (c *Client) handleSendPostMessage(details *sendPostDetails) {
628 jReq := details.jsonRequest
629 // Tracef("sending command [%s] with id %d", jReq.method, jReq.id)
630 httpResponse, e := c.httpClient.Do(details.httpRequest)
631 if e != nil {
632 jReq.responseChan <- &response{err: e}
633 return
634 }
635 // Read the raw bytes and close the response.
636 var respBytes []byte
637 if respBytes, e = ioutil.ReadAll(httpResponse.Body); E.Chk(e) {
638 }
639 if e = httpResponse.Body.Close(); E.Chk(e) {
640 e = fmt.Errorf("error reading json reply: %v", e)
641 jReq.responseChan <- &response{err: e}
642 return
643 }
644 // Try to unmarshal the response as a regular JSON-RPC response.
645 var resp rawResponse
646 if e = js.Unmarshal(respBytes, &resp); E.Chk(e) {
647 // When the response itself isn't a valid JSON-RPC response return an error
648 // which includes the HTTP status code and raw response bytes.
649 e = fmt.Errorf("status code: %d, response: %q", httpResponse.StatusCode, string(respBytes))
650 jReq.responseChan <- &response{err: e}
651 return
652 }
653 var res []byte
654 if res, e = resp.result(); E.Chk(e) {
655 }
656 jReq.responseChan <- &response{result: res, err: e}
657 }
658 659 // sendPostHandler handles all outgoing messages when the client is running in
660 // HTTP POST mode.
661 //
662 // It uses a buffered channel to serialize output messages while allowing the
663 // sender to continue running asynchronously. It must be run as a goroutine.
664 func (c *Client) sendPostHandler() {
665 out:
666 for {
667 // Send any messages ready for send until the shutdown channel is closed.
668 select {
669 case details := <-c.sendPostChan:
670 c.handleSendPostMessage(details)
671 case <-c.shutdown.Wait():
672 break out
673 }
674 }
675 // Drain any wait channels before exiting so nothing is left waiting around to
676 // send.
677 cleanup:
678 for {
679 select {
680 case details := <-c.sendPostChan:
681 details.jsonRequest.responseChan <- &response{
682 result: nil,
683 err: ErrClientShutdown,
684 }
685 default:
686 break cleanup
687 }
688 }
689 c.wg.Done()
690 T.Ln("RPC client send handler done for", c.config.Host)
691 }
692 693 // sendPostRequest sends the passed HTTP request to the RPC server using the
694 // HTTP client associated with the client.
695 //
696 // It is backed by a buffered channel so it will not block until the send
697 // channel is full.
698 func (c *Client) sendPostRequest(httpReq *http.Request, jReq *jsonRequest) {
699 // Don't send the message if shutting down.
700 select {
701 case <-c.shutdown.Wait():
702 jReq.responseChan <- &response{result: nil, err: ErrClientShutdown}
703 default:
704 }
705 c.sendPostChan <- &sendPostDetails{
706 jsonRequest: jReq,
707 httpRequest: httpReq,
708 }
709 }
710 711 // newFutureError returns a new future result channel that already has the
712 // passed error waiting on the channel with the reply set to nil. This is useful
713 // to easily return errors from the various Async functions.
714 func newFutureError(e error) chan *response {
715 responseChan := make(chan *response, 1)
716 responseChan <- &response{err: e}
717 return responseChan
718 }
719 720 // receiveFuture receives from the passed futureResult channel to extract a
721 // reply or any errors.
722 //
723 // The examined errors include an error in the futureResult and the error in the
724 // reply from the server. This will block until the result is available on the
725 // passed channel.
726 func receiveFuture(f chan *response) ([]byte, error) {
727 // Wait for a response on the returned channel.
728 r := <-f
729 return r.result, r.err
730 }
731 732 // sendPost sends the passed request to the server by issuing an HTTP POST
733 // request using the provided response channel for the reply.
734 //
735 // Typically a new connection is opened and closed for each command when using
736 // this method, however, the underlying HTTP client might coalesce multiple
737 // commands depending on several factors including the remote server
738 // configuration.
739 func (c *Client) sendPost(jReq *jsonRequest) {
740 // Generate a request to the configured RPC server.
741 protocol := "http"
742 if c.config.TLS {
743 protocol = "https"
744 }
745 address := protocol + "://" + c.config.Host
746 bodyReader := bytes.NewReader(jReq.marshalledJSON)
747 httpReq, e := http.NewRequest("POST", address, bodyReader)
748 if e != nil {
749 jReq.responseChan <- &response{result: nil, err: e}
750 return
751 }
752 httpReq.Close = true
753 httpReq.Header.Set("Content-Type", "application/json")
754 // Configure basic access authorization.
755 httpReq.SetBasicAuth(c.config.User, c.config.Pass)
756 // Tracef("sending command [%s] with id %d", jReq.method, jReq.id)
757 c.sendPostRequest(httpReq, jReq)
758 }
759 760 // sendRequest sends the passed json request to the associated server using the
761 // provided response channel for the reply.
762 //
763 // It handles both websocket and HTTP POST mode depending on the configuration
764 // of the client.
765 func (c *Client) sendRequest(jReq *jsonRequest) {
766 // Choose which marshal and send function to use depending on whether the client
767 // running in HTTP POST mode or not.
768 //
769 // When running in HTTP POST mode, the command is issued via an HTTP client.
770 // Otherwise, the command is issued via the asynchronous websocket channels.
771 if c.config.HTTPPostMode {
772 // T.Ln("### sending via http post mode")
773 c.sendPost(jReq)
774 return
775 }
776 // Chk whether the websocket connection has never been established, in which
777 // case the handler goroutines are not running.
778 // T.Ln("### waiting for connection established")
779 select {
780 case <-c.connEstablished.Wait():
781 // T.Ln("### connEstablished")
782 default:
783 // T.Ln("### sending back error client not connected")
784 jReq.responseChan <- &response{err: ErrClientNotConnected}
785 return
786 }
787 // Add the request to the internal tracking map so the response from the remote
788 // server can be properly detected and routed to the response channel. Then send
789 // the marshalled request via the websocket connection.
790 if e := c.addRequest(jReq); E.Chk(e) {
791 // T.Ln("### error", e)
792 jReq.responseChan <- &response{err: e}
793 return
794 }
795 // Tracef("### sending command [%s] with id %d", jReq.method, jReq.id)
796 c.sendMessage(jReq.marshalledJSON)
797 }
798 799 // sendCmd sends the passed command to the associated server and returns a
800 // response channel on which the reply will be delivered at some point in the
801 // future.
802 //
803 // It handles both websocket and HTTP POST mode depending on the configuration
804 // of the client.
805 func (c *Client) sendCmd(cmd interface{}) chan *response {
806 // T.Ln("### sendCmd")
807 // Traces(cmd)
808 // Get the method associated with the command.
809 var e error
810 var method string
811 if method, e = btcjson.CmdMethod(cmd); E.Chk(e) {
812 // T.Ln("### error", e)
813 return newFutureError(e)
814 }
815 // Marshal the command.
816 id := c.NextID()
817 var marshalledJSON []byte
818 if marshalledJSON, e = btcjson.MarshalCmd(id, cmd); E.Chk(e) {
819 return newFutureError(e)
820 }
821 // Generate the request and send it along with a channel to respond on.
822 responseChan := make(chan *response, 1)
823 jReq := &jsonRequest{
824 id: id,
825 method: method,
826 cmd: cmd,
827 marshalledJSON: marshalledJSON,
828 responseChan: responseChan,
829 }
830 // T.Ln("### sending request")
831 c.sendRequest(jReq)
832 return responseChan
833 }
834 835 // sendCmdAndWait sends the passed command to the associated server, waits for
836 // the reply, and returns the result from it.
837 //
838 // It will return the error field in the reply if there is one.
839 func (c *Client) sendCmdAndWait(cmd interface{}) (interface{}, error) {
840 // Marshal the command to JSON-RPC, send it to the connected server, and wait
841 // for a response on the returned channel.
842 return receiveFuture(c.sendCmd(cmd))
843 }
844 845 // Disconnected returns whether or not the server is disconnected. If a
846 // websocket client was created but never connected, this also returns false.
847 func (c *Client) Disconnected() bool {
848 if c == nil {
849 return true
850 }
851 c.mtx.Lock()
852 defer c.mtx.Unlock()
853 select {
854 case <-c.connEstablished.Wait():
855 return c.disconnected
856 default:
857 return false
858 }
859 }
860 861 // doDisconnect disconnects the websocket associated with the client if it
862 // hasn't already been disconnected.
863 //
864 // It will return false if the disconnect is not needed or the client is running
865 // in HTTP POST mode. This function is safe for concurrent access.
866 func (c *Client) doDisconnect() bool {
867 if c.config.HTTPPostMode {
868 return false
869 }
870 c.mtx.Lock()
871 defer c.mtx.Unlock()
872 // Nothing to do if already disconnected.
873 if c.disconnected {
874 return false
875 }
876 T.Ln("disconnecting RPC client", c.config.Host)
877 c.disconnect.Q()
878 if c.wsConn != nil {
879 if e := c.wsConn.Close(); E.Chk(e) {
880 }
881 }
882 c.disconnected = true
883 return true
884 }
885 886 // doShutdown closes the shutdown channel and logs the shutdown unless shutdown
887 // is already in progress.
888 //
889 // It will return false if the shutdown is not needed.
890 //
891 // This function is safe for concurrent access.
892 func (c *Client) doShutdown() bool {
893 // Ignore the shutdown request if the client is already in the process of
894 // shutting down or already shutdown.
895 select {
896 case <-c.shutdown.Wait():
897 return false
898 default:
899 }
900 T.Ln("shutting down RPC client", c.config.Host)
901 c.shutdown.Q()
902 return true
903 }
904 905 // Disconnect disconnects the current websocket associated with the client.
906 //
907 // The connection will automatically be re-established unless the client was
908 // created with the DisableAutoReconnect flag. This function has no effect when
909 // the client is running in HTTP POST mode.
910 func (c *Client) Disconnect() {
911 // Nothing to do if already disconnected or running in HTTP POST mode.
912 if !c.doDisconnect() {
913 return
914 }
915 c.requestLock.Lock()
916 defer c.requestLock.Unlock()
917 // When operating without auto reconnect, send errors to any pending requests
918 // and shutdown the client.
919 if c.config.DisableAutoReconnect {
920 for e := c.requestList.Front(); e != nil; e = e.Next() {
921 req := e.Value.(*jsonRequest)
922 req.responseChan <- &response{
923 result: nil,
924 err: ErrClientDisconnect,
925 }
926 }
927 c.removeAllRequests()
928 c.doShutdown()
929 }
930 }
931 932 // Shutdown shuts down the client by disconnecting any connections associated
933 // with the client and, when automatic reconnect is enabled, preventing future
934 // attempts to reconnect. It also stops all goroutines.
935 func (c *Client) Shutdown() {
936 // Do the shutdown under the request lock to prevent clients from adding new
937 // requests while the client shutdown process is initiated.
938 c.requestLock.Lock()
939 defer c.requestLock.Unlock()
940 // Ignore the shutdown request if the client is already in the process of
941 // shutting down or already shutdown.
942 if !c.doShutdown() {
943 return
944 }
945 // Send the ErrClientShutdown error to any pending requests.
946 for e := c.requestList.Front(); e != nil; e = e.Next() {
947 req := e.Value.(*jsonRequest)
948 req.responseChan <- &response{
949 result: nil,
950 err: ErrClientShutdown,
951 }
952 }
953 c.removeAllRequests()
954 // Disconnect the client if needed.
955 c.doDisconnect()
956 }
957 958 // start begins processing input and output messages.
959 func (c *Client) start() {
960 T.Ln("starting RPC client", c.config.Host)
961 // Start the I/O processing handlers depending on whether the client is in HTTP
962 // POST mode or the default websocket mode.
963 if c.config.HTTPPostMode {
964 c.wg.Add(1)
965 go c.sendPostHandler()
966 } else {
967 c.wg.Add(3)
968 go func() {
969 if c.ntfnHandlers != nil {
970 if c.ntfnHandlers.OnClientConnected != nil {
971 c.ntfnHandlers.OnClientConnected()
972 }
973 }
974 c.wg.Done()
975 }()
976 go c.wsInHandler()
977 go c.wsOutHandler()
978 }
979 }
980 981 // WaitForShutdown blocks until the client goroutines are stopped and the
982 // connection is closed.
983 func (c *Client) WaitForShutdown() {
984 c.wg.Wait()
985 }
986 987 // ConnConfig describes the connection configuration parameters for the client.
988 type ConnConfig struct {
989 // Host is the IP address and port of the RPC server you want to connect to.
990 Host string
991 // Endpoint is the websocket endpoint on the RPC server. This is typically "ws".
992 Endpoint string
993 // User is the username to use to authenticate to the RPC server.
994 User string
995 // Pass is the passphrase to use to authenticate to the RPC server.
996 Pass string
997 // TLS enables transport layer security encryption. It is recommended to always
998 // use TLS if the RPC server supports it as otherwise your username and password
999 // is sent across the wire in cleartext.
1000 TLS bool
1001 // Certificates are the bytes for a PEM-encoded certificate chain used for the
1002 // TLS connection. It has no effect if the DisableTLS parameter is true.
1003 Certificates []byte
1004 // Proxy specifies to connect through a SOCKS 5 proxy server. It may be an empty
1005 // string if a proxy is not required.
1006 Proxy string
1007 // ProxyUser is an optional username to use for the proxy server if it requires
1008 // authentication. It has no effect if the Proxy parameter is not set.
1009 ProxyUser string
1010 // ProxyPass is an optional password to use for the proxy server if it requires
1011 // authentication. It has no effect if the Proxy parameter is not set.
1012 ProxyPass string
1013 // DisableAutoReconnect specifies the client should not automatically try to
1014 // reconnect to the server when it has been disconnected.
1015 DisableAutoReconnect bool
1016 // DisableConnectOnNew specifies that a websocket client connection should not
1017 // be tried when creating the client with New. Instead, the client is created
1018 // and returned unconnected, and Connect must be called manually.
1019 DisableConnectOnNew bool
1020 // HTTPPostMode instructs the client to run using multiple independent
1021 // connections issuing HTTP POST requests instead of using the default of
1022 // websockets.
1023 //
1024 // Websockets are generally preferred as some of the features of the client such
1025 // notifications only work with websockets, however, not all servers support the
1026 // websocket extensions, so this flag can be set to true to use basic HTTP POST
1027 // requests instead.
1028 HTTPPostMode bool
1029 // EnableBCInfoHacks is an opt provided to enable compatibility hacks when
1030 // connecting to blockchain.info RPC server
1031 EnableBCInfoHacks bool
1032 }
1033 1034 // newHTTPClient returns a new http client that is configured according to the
1035 // proxy and TLS settings in the associated connection configuration.
1036 func newHTTPClient(config *ConnConfig) (*http.Client, error) {
1037 // Set proxy function if there is a proxy configured.
1038 var proxyFunc func(*http.Request) (*url.URL, error)
1039 if config.Proxy != "" {
1040 proxyURL, e := url.Parse(config.Proxy)
1041 if e != nil {
1042 return nil, e
1043 }
1044 proxyFunc = http.ProxyURL(proxyURL)
1045 }
1046 // Configure TLS if needed.
1047 var tlsConfig *tls.Config
1048 if !config.TLS {
1049 if len(config.Certificates) > 0 {
1050 pool := x509.NewCertPool()
1051 pool.AppendCertsFromPEM(config.Certificates)
1052 tlsConfig = &tls.Config{
1053 RootCAs: pool,
1054 }
1055 }
1056 }
1057 client := http.Client{
1058 Transport: &http.Transport{
1059 Proxy: proxyFunc,
1060 TLSClientConfig: tlsConfig,
1061 },
1062 }
1063 return &client, nil
1064 }
1065 1066 // dial opens a websocket connection using the passed connection configuration
1067 // details.
1068 func dial(config *ConnConfig) (*websocket.Conn, error) {
1069 // Setup TLS if not disabled.
1070 var tlsConfig *tls.Config
1071 var scheme = "ws"
1072 if config.TLS {
1073 tlsConfig = &tls.Config{
1074 MinVersion: tls.VersionTLS12,
1075 }
1076 if len(config.Certificates) > 0 {
1077 // D.Ln("no certificates for verification")
1078 pool := x509.NewCertPool()
1079 pool.AppendCertsFromPEM(config.Certificates)
1080 tlsConfig.RootCAs = pool
1081 }
1082 scheme = "wss"
1083 }
1084 // Create a websocket dialer that will be used to make the connection. It is
1085 // modified by the proxy setting below as needed.
1086 dialer := websocket.Dialer{TLSClientConfig: tlsConfig}
1087 // Setup the proxy if one is configured.
1088 if config.Proxy != "" {
1089 proxy := &socks.Proxy{
1090 Addr: config.Proxy,
1091 Username: config.ProxyUser,
1092 Password: config.ProxyPass,
1093 }
1094 dialer.NetDial = proxy.Dial
1095 }
1096 // The RPC server requires basic authorization, so create a custom request
1097 // header with the Authorization header set.
1098 login := config.User + ":" + config.Pass
1099 auth := "Basic " + base64.StdEncoding.EncodeToString([]byte(login))
1100 requestHeader := make(http.Header)
1101 requestHeader.Add("Authorization", auth)
1102 // Dial the connection.
1103 address := fmt.Sprintf("%s://%s/%s", scheme, config.Host, config.Endpoint)
1104 wsConn, resp, e := dialer.Dial(address, requestHeader)
1105 if e != nil {
1106 // debug.SetTraceback("all")
1107 // debug.PrintStack()
1108 if e != websocket.ErrBadHandshake || resp == nil {
1109 return nil, e
1110 }
1111 // Detect HTTP authentication error status codes.
1112 if resp.StatusCode == http.StatusUnauthorized ||
1113 resp.StatusCode == http.StatusForbidden {
1114 return nil, ErrInvalidAuth
1115 }
1116 // The connection was authenticated and the status response was ok, but the
1117 // websocket handshake still failed, so the endpoint is invalid in some way.
1118 if resp.StatusCode == http.StatusOK {
1119 return nil, ErrInvalidEndpoint
1120 }
1121 // Return the status text from the server if none of the special cases above
1122 // apply.
1123 return nil, errors.New(resp.Status)
1124 }
1125 return wsConn, nil
1126 }
1127 1128 // New creates a new RPC client based on the provided connection configuration
1129 // details.
1130 //
1131 // The notification handlers parameter may be nil if you are not interested in
1132 // receiving notifications and will be ignored if the configuration is set to
1133 // run in HTTP POST mode.
1134 func New(config *ConnConfig, ntfnHandlers *NotificationHandlers, quit qu.C) (*Client, error) {
1135 // Either open a websocket connection or create an HTTP client depending on the
1136 // HTTP POST mode. Also, set the notification handlers to nil when running in
1137 // HTTP POST mode.
1138 var wsConn *websocket.Conn
1139 var httpClient *http.Client
1140 connEstablished := qu.T()
1141 var start bool
1142 if config.HTTPPostMode {
1143 ntfnHandlers = nil
1144 start = true
1145 var e error
1146 httpClient, e = newHTTPClient(config)
1147 if e != nil {
1148 return nil, e
1149 }
1150 } else {
1151 if !config.DisableConnectOnNew {
1152 var e error
1153 wsConn, e = dial(config)
1154 if e != nil {
1155 return nil, e
1156 }
1157 start = true
1158 }
1159 }
1160 client := &Client{
1161 config: config,
1162 wsConn: wsConn,
1163 httpClient: httpClient,
1164 requestMap: make(map[uint64]*list.Element),
1165 requestList: list.New(),
1166 ntfnHandlers: ntfnHandlers,
1167 ntfnState: newNotificationState(),
1168 sendChan: make(chan []byte, sendBufferSize),
1169 sendPostChan: make(chan *sendPostDetails, sendPostBufferSize),
1170 connEstablished: connEstablished,
1171 disconnect: qu.T(),
1172 shutdown: qu.T(),
1173 }
1174 go func() {
1175 out:
1176 for {
1177 select {
1178 case <-quit.Wait():
1179 client.disconnect.Q()
1180 client.shutdown.Q()
1181 break out
1182 }
1183 }
1184 }()
1185 if start {
1186 T.Ln("established connection to RPC server", config.Host)
1187 connEstablished.Q()
1188 client.start()
1189 if !client.config.HTTPPostMode && !client.config.DisableAutoReconnect {
1190 client.wg.Add(1)
1191 go client.wsReconnectHandler()
1192 }
1193 }
1194 return client, nil
1195 }
1196 1197 // Connect establishes the initial websocket connection. This is necessary when
1198 // a client was created after setting the DisableConnectOnNew field of the
1199 // Config struct.
1200 //
1201 // Up to tries number of connections (each after an increasing backoff) will be
1202 // tried if the connection can not be established. The special value of 0
1203 // indicates an unlimited number of connection attempts.
1204 //
1205 // This method will error if the client is not configured for websockets, if the
1206 // connection has already been established, or if none of the connection
1207 // attempts were successful.
1208 func (c *Client) Connect(tries int) (e error) {
1209 c.mtx.Lock()
1210 defer c.mtx.Unlock()
1211 if c.config.HTTPPostMode {
1212 return ErrNotWebsocketClient
1213 }
1214 if c.wsConn != nil {
1215 return ErrClientAlreadyConnected
1216 }
1217 // Begin connection attempts. Increase the backoff after each failed attempt, up
1218 // to a maximum of one minute.
1219 var backoff time.Duration
1220 for i := 0; tries == 0 || i < tries; i++ {
1221 var wsConn *websocket.Conn
1222 wsConn, e = dial(c.config)
1223 if e != nil {
1224 backoff = connectionRetryInterval * time.Duration(i+1)
1225 if backoff > time.Minute {
1226 backoff = time.Minute
1227 }
1228 time.Sleep(backoff)
1229 continue
1230 }
1231 // Connection was established. Set the websocket connection member of the client
1232 // and start the goroutines necessary to run the client.
1233 D.Ln("established connection to RPC server", c.config.Host)
1234 c.wsConn = wsConn
1235 c.connEstablished.Q()
1236 c.start()
1237 if !c.config.DisableAutoReconnect {
1238 c.wg.Add(1)
1239 go c.wsReconnectHandler()
1240 }
1241 return nil
1242 }
1243 1244 // All connection attempts failed, so return the last error.
1245 return e
1246 }
1247