1 package wallet
2 3 import (
4 "crypto/sha256"
5 "crypto/subtle"
6 "encoding/base64"
7 js "encoding/json"
8 "errors"
9 "io"
10 "io/ioutil"
11 "net"
12 "net/http"
13 "sync"
14 "sync/atomic"
15 "time"
16 17 "github.com/p9c/p9/pkg/qu"
18 19 "github.com/btcsuite/websocket"
20 21 "github.com/p9c/p9/pkg/btcjson"
22 "github.com/p9c/p9/pkg/chainclient"
23 "github.com/p9c/p9/pkg/interrupt"
24 )
// WebsocketClient holds the per-connection state for a single websocket RPC
// client.
type WebsocketClient struct {
	conn          *websocket.Conn // the upgraded websocket connection
	authenticated bool            // set once the client has presented valid credentials
	remoteAddr    string          // remote address, used in log messages
	allRequests   chan []byte     // raw request messages read from conn
	responses     chan []byte     // marshalled responses queued for writing to conn
	quit          qu.C            // closed on disconnect
	wg            sync.WaitGroup  // tracks in-flight request handler goroutines
}
35 36 func NewWebsocketClient(c *websocket.Conn, authenticated bool, remoteAddr string) *WebsocketClient {
37 return &WebsocketClient{
38 conn: c,
39 authenticated: authenticated,
40 remoteAddr: remoteAddr,
41 allRequests: make(chan []byte),
42 responses: make(chan []byte),
43 quit: qu.T(),
44 }
45 }
46 func (c *WebsocketClient) Send(b []byte) (e error) {
47 select {
48 case c.responses <- b:
49 return nil
50 case <-c.quit.Wait():
51 return errors.New("websocket client disconnected")
52 }
53 }
// Server holds the items the RPC server may need to access (auth, config,
// shutdown, etc.)
type Server struct {
	// HTTPServer serves both the POST and the websocket endpoints.
	HTTPServer http.Server
	// Wallet and ChainClient are set after construction via RegisterWallet and
	// SetChainServer; both are read and written under HandlerMutex.
	Wallet       *Wallet
	WalletLoader *Loader
	ChainClient  chainclient.Interface
	// handlerLookup       func(string) (requestHandler, bool)
	HandlerMutex sync.Mutex
	// Listeners are the listeners being served; Stop closes each of them.
	Listeners []net.Listener
	// AuthSHA is the SHA-256 of the HTTP basic auth string, compared in
	// constant time against the hash of client-supplied credentials.
	AuthSHA             [sha256.Size]byte
	Upgrader            websocket.Upgrader
	MaxPostClients      int64 // Max concurrent HTTP POST clients.
	MaxWebsocketClients int64 // Max concurrent websocket clients.
	// WG tracks the serving goroutines, in-flight POST requests and the
	// per-client websocket goroutines; Stop waits on it.
	WG sync.WaitGroup
	// Quit is signalled by Stop; QuitMutex is held by Stop while it checks and
	// signals Quit.
	Quit      qu.C
	QuitMutex sync.Mutex
	// RequestShutdownChan is sent to when a client requests shutdown.
	// NOTE(review): created with qu.Ts(1), presumably a buffer of one — confirm.
	RequestShutdownChan qu.C
}
// JSONAuthFail rejects a request whose HTTP authentication failed: it adds a
// Basic auth challenge header and replies with a 401 status and body.
func JSONAuthFail(w http.ResponseWriter) {
	const (
		challenge = `Basic realm="pod RPC"`
		message   = "401 Unauthorized."
	)
	hdr := w.Header()
	hdr.Add("WWW-Authenticate", challenge)
	http.Error(w, message, http.StatusUnauthorized)
}
80 81 // NewServer creates a new server for serving legacy RPC client connections, both HTTP POST and websocket.
82 func NewServer(opts *Options, walletLoader *Loader, listeners []net.Listener, quit qu.C) *Server {
83 serveMux := http.NewServeMux()
84 const rpcAuthTimeoutSeconds = 10
85 server := &Server{
86 HTTPServer: http.Server{
87 Handler: serveMux,
88 // Timeout connections which don't complete the initial handshake within the
89 // allowed timeframe.
90 ReadTimeout: time.Second * rpcAuthTimeoutSeconds,
91 },
92 WalletLoader: walletLoader,
93 MaxPostClients: opts.MaxPOSTClients,
94 MaxWebsocketClients: opts.MaxWebsocketClients,
95 Listeners: listeners,
96 // A hash of the HTTP basic auth string is used for a constant time comparison.
97 AuthSHA: sha256.Sum256(HTTPBasicAuth(opts.Username, opts.Password)),
98 Upgrader: websocket.Upgrader{
99 // Allow all origins.
100 CheckOrigin: func(r *http.Request) bool { return true },
101 },
102 Quit: quit,
103 RequestShutdownChan: qu.Ts(1),
104 }
105 serveMux.Handle(
106 "/", ThrottledFn(
107 opts.MaxPOSTClients,
108 func(w http.ResponseWriter, r *http.Request) {
109 w.Header().Set("Connection", "close")
110 w.Header().Set("Content-Type", "application/json")
111 r.Close = true
112 if e := server.CheckAuthHeader(r); E.Chk(e) {
113 W.Ln("unauthorized client connection attempt")
114 JSONAuthFail(w)
115 return
116 }
117 server.WG.Add(1)
118 server.POSTClientRPC(w, r)
119 server.WG.Done()
120 },
121 ),
122 )
123 serveMux.Handle(
124 "/ws", ThrottledFn(
125 opts.MaxWebsocketClients,
126 func(w http.ResponseWriter, r *http.Request) {
127 authenticated := false
128 switch server.CheckAuthHeader(r) {
129 case nil:
130 authenticated = true
131 case ErrNoAuth:
132 // nothing
133 default:
134 // If auth was supplied but incorrect, rather than simply being missing,
135 // immediately terminate the connection.
136 W.Ln("disconnecting improperly authorized websocket client")
137 JSONAuthFail(w)
138 return
139 }
140 conn, e := server.Upgrader.Upgrade(w, r, nil)
141 if e != nil {
142 W.F(
143 "cannot websocket upgrade client %s: %v",
144 r.RemoteAddr, e,
145 )
146 return
147 }
148 wsc := NewWebsocketClient(conn, authenticated, r.RemoteAddr)
149 server.WebsocketClientRPC(wsc)
150 },
151 ),
152 )
153 for _, lis := range listeners {
154 server.Serve(lis)
155 }
156 return server
157 }
// HTTPBasicAuth returns the UTF-8 bytes of the HTTP Basic authentication string:
//
//	"Basic " + base64(username + ":" + password)
func HTTPBasicAuth(username, password string) []byte {
	credentials := username + ":" + password
	encoded := base64.StdEncoding.EncodeToString([]byte(credentials))
	return []byte("Basic " + encoded)
}
175 176 // Serve serves HTTP POST and websocket RPC for the legacy JSON-RPC RPC server.
177 // This function does not block on lis.Accept.
178 func (s *Server) Serve(lis net.Listener) {
179 s.WG.Add(1)
180 go func() {
181 I.Ln("wallet RPC server listening on ", lis.Addr())
182 var e error
183 if e = s.HTTPServer.Serve(lis); E.Chk(e) {
184 }
185 D.Ln("finished serving wallet RPC:", e)
186 s.WG.Done()
187 }()
188 }
189 190 // RegisterWallet associates the legacy RPC server with the wallet. This
191 // function must be called before any wallet RPCs can be called by clients.
192 func (s *Server) RegisterWallet(w *Wallet) {
193 s.HandlerMutex.Lock()
194 s.Wallet = w
195 s.HandlerMutex.Unlock()
196 }
197 198 // Stop gracefully shuts down the rpc server by stopping and disconnecting all
199 // clients, disconnecting the chain server connection, and closing the wallet's
200 // account files. This blocks until shutdown completes.
201 func (s *Server) Stop() {
202 s.QuitMutex.Lock()
203 select {
204 case <-s.Quit.Wait():
205 s.QuitMutex.Unlock()
206 return
207 default:
208 }
209 // Stop the connected wllt and chain server, if any.
210 s.HandlerMutex.Lock()
211 wllt := s.Wallet
212 chainClient := s.ChainClient
213 s.HandlerMutex.Unlock()
214 if wllt != nil {
215 wllt.Stop()
216 }
217 if chainClient != nil {
218 chainClient.Stop()
219 }
220 // Stop all the listeners.
221 for _, listener := range s.Listeners {
222 e := listener.Close()
223 if e != nil {
224 E.F(
225 "cannot close listener `%s`: %v %s",
226 listener.Addr(), e,
227 )
228 }
229 }
230 // Signal the remaining goroutines to stop.
231 s.Quit.Q()
232 s.QuitMutex.Unlock()
233 // First wait for the wallet and chain server to stop, if they were ever set.
234 if wllt != nil {
235 wllt.WaitForShutdown()
236 }
237 if chainClient != nil {
238 chainClient.WaitForShutdown()
239 }
240 // Wait for all remaining goroutines to exit.
241 s.WG.Wait()
242 }
243 244 // SetChainServer sets the chain server client component needed to run a fully
245 // functional bitcoin wallet RPC server. This can be called to enable RPC
246 // passthrough even before a loaded wallet is set, but the wallet's RPC client
247 // is preferred.
248 func (s *Server) SetChainServer(chainClient chainclient.Interface) {
249 s.HandlerMutex.Lock()
250 s.ChainClient = chainClient
251 s.HandlerMutex.Unlock()
252 }
253 254 // HandlerClosure creates a closure function for handling requests of the given
255 // method. This may be a request that is handled directly by btcwallet, or a
256 // chain server request that is handled by passing the request down to pod.
257 //
258 // NOTE: These handlers do not handle special cases, such as the authenticate
259 // method. Each of these must be checked beforehand (the method is already
260 // known) and handled accordingly.
261 func (s *Server) HandlerClosure(request *btcjson.Request) LazyHandler {
262 s.HandlerMutex.Lock()
263 // With the lock held, make copies of these pointers for the closure.
264 wllt := s.Wallet
265 chainClient := s.ChainClient
266 if wllt != nil && chainClient == nil {
267 chainClient = wllt.ChainClient()
268 s.ChainClient = chainClient
269 D.Ln("HandlerClosure got the ChainClient")
270 }
271 s.HandlerMutex.Unlock()
272 return LazyApplyHandler(request, wllt, chainClient)
273 }
// ErrNoAuth represents an error where authentication could not succeed due to a
// missing Authorization HTTP header. It is a sentinel value: CheckAuthHeader
// returns it unwrapped, and the websocket route compares against it directly.
var ErrNoAuth = errors.New("no auth")
278 279 // CheckAuthHeader checks the HTTP Basic authentication supplied by a client in
280 // the HTTP request r. It errors with ErrNoAuth if the request does not contain
281 // the Authorization header, or another non-nil error if the authentication was
282 // provided but incorrect.
283 //
284 // This check is time-constant.
285 func (s *Server) CheckAuthHeader(r *http.Request) (e error) {
286 authHdr := r.Header["Authorization"]
287 if len(authHdr) == 0 {
288 return ErrNoAuth
289 }
290 authSHA := sha256.Sum256([]byte(authHdr[0]))
291 cmp := subtle.ConstantTimeCompare(authSHA[:], s.AuthSHA[:])
292 if cmp != 1 {
293 return errors.New("bad auth")
294 }
295 return nil
296 }
297 298 // ThrottledFn wraps an http.HandlerFunc with throttling of concurrent active
299 // clients by responding with an HTTP 429 when the threshold is crossed.
300 func ThrottledFn(threshold int64, f http.HandlerFunc) http.Handler {
301 return Throttled(threshold, f)
302 }
303 304 // Throttled wraps an http.Handler with throttling of concurrent active clients
305 // by responding with an HTTP 429 when the threshold is crossed.
306 func Throttled(threshold int64, h http.Handler) http.Handler {
307 var active int64
308 return http.HandlerFunc(
309 func(w http.ResponseWriter, r *http.Request) {
310 current := atomic.AddInt64(&active, 1)
311 defer atomic.AddInt64(&active, -1)
312 if current-1 >= threshold {
313 W.F(
314 "reached threshold of %d concurrent active clients", threshold,
315 )
316 http.Error(w, "429 Too Many Requests", 429)
317 return
318 }
319 h.ServeHTTP(w, r)
320 },
321 )
322 }
323 324 // // sanitizeRequest returns a sanitized string for the request which may be
325 // // safely logged. It is intended to strip private keys, passphrases, and any
326 // // other secrets from request parameters before they may be saved to a log file.
327 // func sanitizeRequest(// r *json.Request) string {
328 // // These are considered unsafe to log, so sanitize parameters.
329 // switch r.Method {
330 // case "encryptwallet", "importprivkey", "importwallet",
331 // "signrawtransaction", "walletpassphrase",
332 // "walletpassphrasechange":
333 // return fmt.Sprintf(`{"id":%v,"method":"%s","netparams":SANITIZED %d parameters}`,
334 // r.ID, r.Method, len(r.Params))
335 // }
336 // return fmt.Sprintf(`{"id":%v,"method":"%s","netparams":%v}`, r.ID,
337 // r.Method, r.Params)
338 // }
// IDPointer returns a pointer to the passed ID, or nil if the interface is nil.
// Interface pointers are usually a red flag of doing something incorrectly, but
// this is only implemented here to work around an oddity with json, which uses
// empty interface pointers for response IDs.
func IDPointer(id interface{}) (p *interface{}) {
	if id == nil {
		return nil
	}
	return &id
}
349 350 // InvalidAuth checks whether a websocket request is a valid (parsable) authenticate request and checks the supplied
351 // username and passphrase against the server auth.
352 func (s *Server) InvalidAuth(req *btcjson.Request) bool {
353 cmd, e := btcjson.UnmarshalCmd(req)
354 if e != nil {
355 return false
356 }
357 authCmd, ok := cmd.(*btcjson.AuthenticateCmd)
358 if !ok {
359 return false
360 }
361 // Chk credentials.
362 login := authCmd.Username + ":" + authCmd.Passphrase
363 auth := "Basic " + base64.StdEncoding.EncodeToString([]byte(login))
364 authSha := sha256.Sum256([]byte(auth))
365 return subtle.ConstantTimeCompare(authSha[:], s.AuthSHA[:]) != 1
366 }
367 func (s *Server) WebsocketClientRead(wsc *WebsocketClient) {
368 for {
369 _, request, e := wsc.conn.ReadMessage()
370 if e != nil {
371 if e != io.EOF && e != io.ErrUnexpectedEOF {
372 W.F(
373 "websocket receive failed from client %s: %v",
374 wsc.remoteAddr, e,
375 )
376 }
377 close(wsc.allRequests)
378 break
379 }
380 wsc.allRequests <- request
381 }
382 }
383 func (s *Server) WebsocketClientRespond(wsc *WebsocketClient) {
384 // A for-select with a read of the quit channel is used instead of a for-range to provide clean shutdown. This is
385 // necessary due to WebsocketClientRead (which sends to the allRequests chan) not closing allRequests during
386 // shutdown if the remote websocket client is still connected.
387 out:
388 for {
389 select {
390 case reqBytes, ok := <-wsc.allRequests:
391 if !ok {
392 // client disconnected
393 break out
394 }
395 var req btcjson.Request
396 e := js.Unmarshal(reqBytes, &req)
397 if e != nil {
398 if !wsc.authenticated {
399 // Disconnect immediately.
400 break out
401 }
402 resp := MakeResponse(
403 req.ID, nil,
404 btcjson.ErrRPCInvalidRequest,
405 )
406 mResp, e := js.Marshal(resp)
407 // We expect the marshal to succeed. If it doesn't, it indicates some non-marshalable type in the
408 // response.
409 if e != nil {
410 panic(e)
411 }
412 e = wsc.Send(mResp)
413 if e != nil {
414 break out
415 }
416 continue
417 }
418 if req.Method == "authenticate" {
419 if wsc.authenticated || s.InvalidAuth(&req) {
420 // Disconnect immediately.
421 break out
422 }
423 wsc.authenticated = true
424 resp := MakeResponse(req.ID, nil, nil)
425 // Expected to never fail.
426 mResp, e := js.Marshal(resp)
427 if e != nil {
428 panic(e)
429 }
430 e = wsc.Send(mResp)
431 if e != nil {
432 break out
433 }
434 continue
435 }
436 if !wsc.authenticated {
437 // Disconnect immediately.
438 break out
439 }
440 switch req.Method {
441 case "stop":
442 resp := MakeResponse(
443 req.ID,
444 "wallet stopping.", nil,
445 )
446 mResp, e := js.Marshal(resp)
447 // Expected to never fail.
448 if e != nil {
449 panic(e)
450 }
451 e = wsc.Send(mResp)
452 if e != nil {
453 break out
454 }
455 s.RequestProcessShutdown()
456 // break
457 case "restart":
458 resp := MakeResponse(
459 req.ID,
460 "wallet restarting.", nil,
461 )
462 mResp, e := js.Marshal(resp)
463 // Expected to never fail.
464 if e != nil {
465 panic(e)
466 }
467 e = wsc.Send(mResp)
468 if e != nil {
469 break out
470 }
471 interrupt.Restart = true
472 s.RequestProcessShutdown()
473 // break
474 default:
475 req := req // Copy for the closure
476 f := s.HandlerClosure(&req)
477 wsc.wg.Add(1)
478 go func() {
479 resp, jsonErr := f()
480 mResp, e := btcjson.MarshalResponse(req.ID, resp, jsonErr)
481 if e != nil {
482 E.Ln(
483 "unable to marshal response:", e,
484 )
485 } else {
486 _ = wsc.Send(mResp)
487 }
488 wsc.wg.Done()
489 }()
490 }
491 case <-s.Quit.Wait():
492 break out
493 }
494 }
495 // allow client to disconnect after all handler goroutines are done
496 wsc.wg.Wait()
497 close(wsc.responses)
498 s.WG.Done()
499 }
500 func (s *Server) WebsocketClientSend(wsc *WebsocketClient) {
501 const deadline = 2 * time.Second
502 out:
503 for {
504 select {
505 case response, ok := <-wsc.responses:
506 if !ok {
507 // client disconnected
508 break out
509 }
510 e := wsc.conn.SetWriteDeadline(time.Now().Add(deadline))
511 if e != nil {
512 W.F(
513 "cannot set write deadline on client %s: %v",
514 wsc.remoteAddr, e,
515 )
516 }
517 e = wsc.conn.WriteMessage(
518 websocket.TextMessage,
519 response,
520 )
521 if e != nil {
522 W.F(
523 "failed websocket send to client %s: %v", wsc.remoteAddr, e,
524 )
525 break out
526 }
527 case <-s.Quit.Wait():
528 break out
529 }
530 }
531 wsc.quit.Q()
532 I.Ln("disconnected websocket client", wsc.remoteAddr)
533 s.WG.Done()
534 }
535 536 // WebsocketClientRPC starts the goroutines to serve JSON-RPC requests over a websocket connection for a single client.
537 func (s *Server) WebsocketClientRPC(wsc *WebsocketClient) {
538 I.F("new websocket client %s", wsc.remoteAddr)
539 // Clear the read deadline set before the websocket hijacked
540 // the connection.
541 if e := wsc.conn.SetReadDeadline(time.Time{}); E.Chk(e) {
542 W.Ln("cannot remove read deadline:", e)
543 }
544 // WebsocketClientRead is intentionally not run with the waitgroup so it is ignored during shutdown. This is to
545 // prevent a hang during shutdown where the goroutine is blocked on a read of the websocket connection if the client
546 // is still connected.
547 go s.WebsocketClientRead(wsc)
548 s.WG.Add(2)
549 go s.WebsocketClientRespond(wsc)
550 go s.WebsocketClientSend(wsc)
551 <-wsc.quit
552 }
// MaxRequestSize specifies the maximum number of bytes in the request body that may be read from a client. This is
// currently limited to 4MB (4 * 1024 * 1024 bytes). POSTClientRPC enforces it via http.MaxBytesReader.
const MaxRequestSize = 1024 * 1024 * 4
557 558 // POSTClientRPC processes and replies to a JSON-RPC client request.
559 func (s *Server) POSTClientRPC(w http.ResponseWriter, r *http.Request) {
560 body := http.MaxBytesReader(w, r.Body, MaxRequestSize)
561 rpcRequest, e := ioutil.ReadAll(body)
562 if e != nil {
563 // TODO: what if the underlying reader errored?
564 http.Error(
565 w, "413 Request Too Large.",
566 http.StatusRequestEntityTooLarge,
567 )
568 return
569 }
570 // First check whether wallet has a handler for this request's method. If unfound, the request is sent to the chain
571 // server for further processing. While checking the methods, disallow authenticate requests, as they are invalid
572 // for HTTP POST clients.
573 var req btcjson.Request
574 e = js.Unmarshal(rpcRequest, &req)
575 if e != nil {
576 var resp []byte
577 resp, e = btcjson.MarshalResponse(req.ID, nil, btcjson.ErrRPCInvalidRequest)
578 if e != nil {
579 E.Ln(
580 "Unable to marshal response:", e,
581 )
582 http.Error(
583 w, "500 Internal Server BTCJSONError",
584 http.StatusInternalServerError,
585 )
586 return
587 }
588 _, e = w.Write(resp)
589 if e != nil {
590 W.Ln(
591 "cannot write invalid request request to client:", e,
592 )
593 }
594 return
595 }
596 // Create the response and error from the request. Two special cases are handled for the authenticate and stop
597 // request methods.
598 var res interface{}
599 var jsonErr *btcjson.RPCError
600 var stop bool
601 switch req.Method {
602 case "authenticate":
603 // Drop it.
604 return
605 case "stop":
606 stop = true
607 res = "pod/wallet stopping"
608 case "restart":
609 stop = true
610 res = "pod/wallet restarting"
611 default:
612 res, jsonErr = s.HandlerClosure(&req)()
613 }
614 // Marshal and send.
615 mResp, e := btcjson.MarshalResponse(req.ID, res, jsonErr)
616 if e != nil {
617 E.Ln(
618 "unable to marshal response:", e,
619 )
620 http.Error(w, "500 Internal Server BTCJSONError", http.StatusInternalServerError)
621 return
622 }
623 _, e = w.Write(mResp)
624 if e != nil {
625 W.Ln(
626 "unable to respond to client:", e,
627 )
628 }
629 if stop {
630 s.RequestProcessShutdown()
631 }
632 }
// RequestProcessShutdown signals that a client has requested process shutdown.
// The send on RequestShutdownChan is non-blocking: if the channel cannot accept
// the value right now, the request is dropped.
func (s *Server) RequestProcessShutdown() {
	select {
	case s.RequestShutdownChan <- struct{}{}:
	default:
	}
}
// RequestProcessShutdownChan returns a channel that is sent to when an authorized client requests remote shutdown.
// It is the same channel that RequestProcessShutdown sends on.
func (s *Server) RequestProcessShutdownChan() qu.C {
	return s.RequestShutdownChan
}
644