1 package chainrpc
2 3 import (
4 "bytes"
5 "container/list"
6 "crypto/sha256"
7 "crypto/subtle"
8 "encoding/base64"
9 "encoding/hex"
10 "encoding/json"
11 "errors"
12 "fmt"
13 "io"
14 "math"
15 "sync"
16 "time"
17 18 amount2 "github.com/p9c/p9/pkg/amt"
19 "github.com/p9c/p9/pkg/block"
20 "github.com/p9c/p9/pkg/btcaddr"
21 "github.com/p9c/p9/pkg/chaincfg"
22 23 "github.com/p9c/p9/pkg/qu"
24 25 "github.com/btcsuite/websocket"
26 "golang.org/x/crypto/ripemd160"
27 28 "github.com/p9c/p9/pkg/blockchain"
29 "github.com/p9c/p9/pkg/btcjson"
30 "github.com/p9c/p9/pkg/chainhash"
31 "github.com/p9c/p9/pkg/database"
32 "github.com/p9c/p9/pkg/txscript"
33 "github.com/p9c/p9/pkg/util"
34 "github.com/p9c/p9/pkg/wire"
35 )
// Notification types
//
// Each type below is a message placed on WSNtfnMgr.QueueNotification. The types that are defined directly on
// WSClient or block.Block exist only so the receiver can tell requests apart by their dynamic type; senders convert
// a pointer to the underlying value into a pointer to the named type.

// NotificationBlockConnected is queued by SendNotifyBlockConnected for a block newly connected to the best chain.
type NotificationBlockConnected block.Block

// NotificationBlockDisconnected is queued by SendNotifyBlockDisconnected for a block disconnected from the best
// chain.
type NotificationBlockDisconnected block.Block

// NotificationRegisterAddr is queued by RegisterTxOutAddressRequests. It pairs the requesting client with the
// address strings it wants to be notified about.
type NotificationRegisterAddr struct {
	WSC   *WSClient
	Addrs []string
}

// NotificationRegisterBlocks is queued by RegisterBlockUpdates for the client requesting block notifications.
type NotificationRegisterBlocks WSClient

// Notification control requests

// NotificationRegisterClient is queued by AddClient when a new websocket client connects.
type NotificationRegisterClient WSClient

// NotificationRegisterNewMempoolTxs is queued by RegisterNewMempoolTxsUpdates for the client requesting mempool
// transaction notifications.
type NotificationRegisterNewMempoolTxs WSClient

// NotificationRegisterSpent is queued by RegisterSpentRequests. It pairs the requesting client with the outpoints
// it wants spend notifications for.
type NotificationRegisterSpent struct {
	WSC *WSClient
	OPs []*wire.OutPoint
}

// NotificationTxAcceptedByMempool is queued by SendNotifyMempoolTx. IsNew distinguishes a new transaction from one
// re-added to the mempool during a reorg.
type NotificationTxAcceptedByMempool struct {
	IsNew bool
	Tx    *util.Tx
}

// NotificationUnregisterAddr names a single address to stop notifying the client about.
// NOTE(review): presumably the counterpart of NotificationRegisterAddr; its sender is not in this part of the file.
type NotificationUnregisterAddr struct {
	WSC  *WSClient
	Addr string
}

// NotificationUnregisterBlocks is queued by UnregisterBlockUpdates for the client cancelling block notifications.
type NotificationUnregisterBlocks WSClient

// NotificationUnregisterClient is queued by RemoveClient when a websocket client has shut down.
type NotificationUnregisterClient WSClient

// NotificationUnregisterNewMempoolTxs is queued by UnregisterNewMempoolTxsUpdates for the client cancelling mempool
// transaction notifications.
type NotificationUnregisterNewMempoolTxs WSClient

// NotificationUnregisterSpent names a single outpoint to stop notifying the client about.
// NOTE(review): presumably the counterpart of NotificationRegisterSpent; its sender is not in this part of the file.
type NotificationUnregisterSpent struct {
	WSC *WSClient
	OP  *wire.OutPoint
}
// RescanKeys holds the lookup sets consulted during a rescan. Every field is a set (map to empty struct).
//
// The keyed-array maps mirror the fast paths of WSClientFilter: 20-byte hashes for pay-to-pubkey-hash and
// pay-to-script-hash, and 33/65-byte arrays for compressed/uncompressed public keys. Fallbacks is keyed by string
// (presumably the encoded address, as with WSClientFilter.OtherAddresses — confirm against the rescan code).
// Unspent is the set of outpoints returned by UnspentSlice.
type RescanKeys struct {
	Fallbacks           map[string]struct{}
	PubKeyHashes        map[[ripemd160.Size]byte]struct{}
	ScriptHashes        map[[ripemd160.Size]byte]struct{}
	CompressedPubKeys   map[[33]byte]struct{}
	UncompressedPubKeys map[[65]byte]struct{}
	Unspent             map[wire.OutPoint]struct{}
}
// Semaphore is a qu.C used as a counting semaphore. WSClient.InHandler calls Acquire before servicing each request
// and Release once the request has been handled; both methods are defined outside this part of the file.
type Semaphore qu.C
// WSClient provides an abstraction for handling a websocket client. The overall data flow is split into 3 main
// goroutines, a possible 4th goroutine for long-running operations (only started if request is made), and a websocket
// manager which is used to allow things such as broadcasting requested notifications to all connected websocket
// clients.
//
// Inbound messages are read via the InHandler goroutine and generally dispatched to their own handler. However, certain
// potentially long-running operations such as rescans, are sent to the asyncHandler goroutine and are limited to one at
// a time.
//
// There are two outbound message types - one for responding to client requests and another for async notifications.
// Responses to client requests use SendMessage which employs a buffered channel thereby limiting the number of
// outstanding requests that can be made.
//
// Notifications are sent via QueueNotification which implements a queue via NotificationQueueHandler to ensure sending
// notifications from other subsystems can't block.
//
// Ultimately, all messages are sent via the OutHandler.
type WSClient struct {
	// The embedded mutex guards the Disconnected flag (see Disconnect and IsDisconnected).
	sync.Mutex
	// Server is the RPC Server that is servicing the client.
	Server *Server
	// Conn is the underlying websocket connection.
	Conn *websocket.Conn
	// Addr is the remote address of the client.
	Addr string
	// SessionID is a random ID generated for each client when connected. These IDs may be queried by a client using the
	// session RPC. A change to the session ID indicates that the client reconnected.
	SessionID uint64
	// AddrRequests is a set of addresses the caller has requested to be notified about. It is maintained here so all
	// requests can be removed when a wallet disconnects. Owned by the notification manager.
	AddrRequests map[string]struct{}
	// SpentRequests is a set of unspent Outpoints a wallet has requested notifications for when they are spent by a
	// processed transaction. Owned by the notification manager.
	SpentRequests map[wire.OutPoint]struct{}
	// FilterData is the new generation transaction filter backported from github.com/decred/dcrd for the new backported
	// `loadtxfilter` and `rescanblocks` methods.
	FilterData *WSClientFilter
	// Networking infrastructure.
	//
	// ServiceRequestSem is acquired by InHandler before each request is serviced and released afterwards.
	ServiceRequestSem Semaphore
	// NtfnChan is written by QueueNotification and read by NotificationQueueHandler.
	NtfnChan chan []byte
	// SendChan is written by SendMessage and read by OutHandler.
	SendChan chan WSResponse
	// Quit is signalled by Disconnect; the handler goroutines watch it to know when to stop.
	Quit qu.C
	// WG tracks the three handler goroutines launched by Start; WaitForShutdown waits on it.
	WG sync.WaitGroup
	// Disconnected indicates whether or not the websocket client is Disconnected.
	Disconnected bool
	// Authenticated specifies whether a client has been Authenticated and therefore is allowed to communicate over the
	// websocket.
	Authenticated bool
	// IsAdmin specifies whether a client may change the state of the Server; false means its access is only to the
	// limited set of RPC calls.
	IsAdmin bool
	// VerboseTxUpdates specifies whether a client has requested verbose information about all new transactions.
	VerboseTxUpdates bool
}
// WSClientFilter tracks relevant addresses for each websocket client for the `rescanblocks` extension. It is modified
// by the `loadtxfilter` command. NOTE: This extension was ported from github.com/decred/dcrd
//
// None of the methods visible in this file take mu themselves.
// NOTE(review): presumably callers are expected to hold mu around method calls — confirm against the handlers.
type WSClientFilter struct {
	mu sync.Mutex
	// Implemented fast paths for address lookup. Keys are the raw 20-byte hash or the serialized public key.
	PubKeyHashes        map[[ripemd160.Size]byte]struct{}
	ScriptHashes        map[[ripemd160.Size]byte]struct{}
	CompressedPubKeys   map[[33]byte]struct{}
	UncompressedPubKeys map[[65]byte]struct{}
	// A fallback address lookup map in case a fast path doesn't exist. Only exists for completeness. If using this
	// shows up in a profile, there's a good chance a fast path should be added. Keyed by the encoded address string.
	OtherAddresses map[string]struct{}
	// Outpoints of Unspent outputs.
	Unspent map[wire.OutPoint]struct{}
}
// WSCommandHandler describes a callback function used to handle a specific command. It receives the client that
// issued the command and the parsed command value, and returns the result to marshal into the reply or an error.
type WSCommandHandler func(*WSClient, interface{}) (interface{}, error)
// WSNtfnMgr is a connection and notification manager used for websockets. It allows websocket clients to register for
// notifications they are interested in.
//
// When an event happens elsewhere in the code such as transactions being added to the memory pool or block
// connects/disconnects, the notification manager is provided with the relevant details needed to figure out which
// websocket clients need to be notified based on what they have registered for and notifies them accordingly.
//
// It is also used to keep track of all connected websocket clients.
type WSNtfnMgr struct {
	// Server is the RPC Server the notification manager is associated with.
	Server *Server
	// QueueNotification queues a notification for handling. All Register*/Unregister*/SendNotify* methods write to it.
	QueueNotification chan interface{}
	// NotificationMsgs feeds notificationHandler with notifications and client (un)registration requests from a queue
	// as well as registration and unregistering requests from clients.
	NotificationMsgs chan interface{}
	// Access channel for current number of connected clients. Read by GetNumClients.
	NumClients chan int
	// Shutdown handling. Quit is signalled by Shutdown.
	WG   sync.WaitGroup
	Quit qu.C
}
// WSResponse houses a message to send to a connected websocket client as well as a channel to reply on when the message
// is sent.
//
// DoneChan may be nil, in which case no completion signal is delivered. Otherwise OutHandler sends true after a
// successful write and false for messages it discards while shutting down; SendMessage sends false when the client is
// already disconnected.
type WSResponse struct {
	Msg      []byte
	DoneChan chan bool
}
const (
	// WebsocketSendBufferSize is the number of elements the send channel can queue before blocking. Note that this only
	// applies to requests handled directly in the websocket client input handler or the async handler since
	// notifications have their own queuing mechanism independent of the send channel buffer.
	WebsocketSendBufferSize = 50
)
// ErrClientQuit describes the error where a client send is not processed due to the client having already been
// disconnected or dropped. It is returned by WSClient.QueueNotification.
var ErrClientQuit = errors.New("client quit")

// ErrRescanReorg defines the error that is returned when an unrecoverable reorganize is detected during a rescan. It
// is a btcjson.RPCError value carrying the ErrRPCDatabase code.
var ErrRescanReorg = btcjson.RPCError{
	Code:    btcjson.ErrRPCDatabase,
	Message: "Reorganize",
}

// TimeZeroVal is simply the zero value for a time.Time and is used to avoid creating multiple instances.
// WebsocketHandler passes it to SetReadDeadline to clear the connection's read deadline.
var TimeZeroVal = time.Time{}
// WSHandlers maps RPC command strings to appropriate websocket handler functions. This is set by init because help
// references WSHandlers and thus causes a dependency loop. ServiceRequest consults it before falling back to the
// standard (non-websocket) command handlers.
var WSHandlers map[string]WSCommandHandler

// WSHandlersBeforeInit is the static table of websocket-only commands.
// NOTE(review): presumably copied into WSHandlers by an init function defined elsewhere — confirm.
var WSHandlersBeforeInit = map[string]WSCommandHandler{
	"loadtxfilter":              HandleLoadTxFilter,
	"help":                      HandleWebsocketHelp,
	"notifyblocks":              HandleNotifyBlocks,
	"notifynewtransactions":     HandleNotifyNewTransactions,
	"notifyreceived":            HandleNotifyReceived,
	"notifyspent":               HandleNotifySpent,
	"session":                   HandleSession,
	"stopnotifyblocks":          HandleStopNotifyBlocks,
	"stopnotifynewtransactions": HandleStopNotifyNewTransactions,
	"stopnotifyspent":           HandleStopNotifySpent,
	"stopnotifyreceived":        HandleStopNotifyReceived,
	"rescan":                    HandleRescan,
	"rescanblocks":              HandleRescanBlocks,
}
224 225 // UnspentSlice returns a slice of currently-unspent outpoints for the rescan lookup keys. This is primarily intended to
226 // be used to register outpoints for continuous notifications after a rescan has completed.
227 func (r *RescanKeys) UnspentSlice() []*wire.OutPoint {
228 ops := make([]*wire.OutPoint, 0, len(r.Unspent))
229 for op := range r.Unspent {
230 opCopy := op
231 ops = append(ops, &opCopy)
232 }
233 return ops
234 }
235 236 // WebsocketHandler handles a new websocket client by creating a new wsClient, starting it, and blocking until the
237 // connection closes. Since it blocks, it must be run in a separate goroutine.
238 //
239 // It should be invoked from the websocket Server handler which runs each new connection in a new goroutine thereby
240 // satisfying the requirement.
241 func (s *Server) WebsocketHandler(
242 conn *websocket.Conn, remoteAddr string,
243 authenticated bool, isAdmin bool,
244 ) {
245 // Clear the read deadline that was set before the websocket hijacked the connection.
246 e := conn.SetReadDeadline(TimeZeroVal)
247 if e != nil {
248 D.Ln(e)
249 }
250 // Limit max number of websocket clients.
251 T.Ln("new websocket client", remoteAddr)
252 if s.NtfnMgr.GetNumClients()+1 > s.Config.RPCMaxWebsockets.V() {
253 I.F(
254 "max websocket clients exceeded [%d] - disconnecting client"+
255 " %s",
256 s.Config.RPCMaxWebsockets,
257 remoteAddr,
258 )
259 if e = conn.Close(); E.Chk(e) {
260 }
261 return
262 }
263 // Create a new websocket client to handle the new websocket connection and wait for it to shutdown.
264 //
265 // Once it has shutdown (and hence disconnected), remove it and any notifications it registered for.
266 client, e := NewWebsocketClient(s, conn, remoteAddr, authenticated, isAdmin)
267 if e != nil {
268 E.F("failed to serve client %s: %v %s", remoteAddr, e)
269 if e := conn.Close(); E.Chk(e) {
270 }
271 return
272 }
273 s.NtfnMgr.AddClient(client)
274 client.Start()
275 client.WaitForShutdown()
276 s.NtfnMgr.RemoveClient(client)
277 T.Ln("disconnected websocket client", remoteAddr)
278 }
279 280 // Disconnect disconnects the websocket client.
281 func (c *WSClient) Disconnect() {
282 c.Lock()
283 defer c.Unlock()
284 // Nothing to do if already disconnected.
285 if c.Disconnected {
286 return
287 }
288 T.Ln("disconnecting websocket client", c.Addr)
289 c.Quit.Q()
290 if e := c.Conn.Close(); E.Chk(e) {
291 }
292 c.Disconnected = true
293 }
294 295 // IsDisconnected returns whether or not the websocket client is disconnected.
296 func (c *WSClient) IsDisconnected() bool {
297 c.Lock()
298 isDisconnected := c.Disconnected
299 c.Unlock()
300 return isDisconnected
301 }
302 303 // QueueNotification queues the passed notification to be sent to the websocket client.
304 //
305 // This function, as the name implies, is only intended for notifications since it has additional logic to prevent other
306 // subsystems, such as the memory pool and block manager, from blocking even when the send channel is full.
307 //
308 // If the client is in the process of shutting down, this function returns ErrClientQuit. This is intended to be checked
309 // by long-running notification handlers to stop processing if there is no more work needed to be done.
310 func (c *WSClient) QueueNotification(marshalledJSON []byte) (e error) {
311 // Don't queue the message if disconnected.
312 if c.IsDisconnected() {
313 return ErrClientQuit
314 }
315 c.NtfnChan <- marshalledJSON
316 return nil
317 }
318 319 // SendMessage sends the passed json to the websocket client. It is backed by a buffered channel, so it will not block
320 // until the send channel is full.
321 //
322 // Note however that QueueNotification must be used for sending async notifications instead of the this function. This
323 // approach allows a limit to the number of outstanding requests a client can make without preventing or blocking on
324 // async notifications.
325 func (c *WSClient) SendMessage(marshalledJSON []byte, doneChan chan bool) {
326 // Don't send the message if disconnected.
327 if c.IsDisconnected() {
328 if doneChan != nil {
329 doneChan <- false
330 }
331 return
332 }
333 c.SendChan <- WSResponse{Msg: marshalledJSON, DoneChan: doneChan}
334 }
335 336 // Start begins processing input and output messages.
337 func (c *WSClient) Start() {
338 T.Ln("starting websocket client", c.Addr)
339 // Start processing input and output.
340 c.WG.Add(3)
341 go c.InHandler()
342 go c.NotificationQueueHandler()
343 go c.OutHandler()
344 }
// WaitForShutdown blocks until the websocket client goroutines are stopped and the connection is closed. It waits on
// c.WG, which Start increments by three and each handler goroutine decrements once on exit.
func (c *WSClient) WaitForShutdown() {
	c.WG.Wait()
}
350 351 // InHandler handles all incoming messages for the websocket connection. It must be run as a goroutine.
352 func (c *WSClient) InHandler() {
353 out:
354 for {
355 // Break out of the loop once the quit channel has been closed. Use a non-blocking select here so we fall
356 // through otherwise.
357 select {
358 case <-c.Quit.Wait():
359 break out
360 default:
361 }
362 _, msg, e := c.Conn.ReadMessage()
363 if e != nil {
364 // Log the error if it's not due to disconnecting.
365 if e != io.EOF && e != io.ErrUnexpectedEOF {
366 T.F(
367 "websocket receive error from %s: %v",
368 c.Addr, e,
369 )
370 }
371 break out
372 }
373 var request btcjson.Request
374 e = json.Unmarshal(msg, &request)
375 if e != nil {
376 if !c.Authenticated {
377 break out
378 }
379 jsonErr := &btcjson.RPCError{
380 Code: btcjson.ErrRPCParse.Code,
381 Message: "Failed to parse request: " + e.Error(),
382 }
383 reply, e := CreateMarshalledReply(nil, nil, jsonErr)
384 if e != nil {
385 E.Ln(
386 "failed to marshal parse failure reply:", e,
387 )
388 continue
389 }
390 c.SendMessage(reply, nil)
391 continue
392 }
393 // The JSON-RPC 1.0 spec defines that notifications must have their "id" set to null and states that
394 // notifications do not have a response.
395 //
396 // A JSON-RPC 2.0 notification is a request with "json-rpc":"2.0", and without an "id" member. The specification
397 // states that notifications must not be responded to. JSON-RPC 2.0 permits the null value as a valid request
398 // id, therefore such requests are not notifications.
399 //
400 // Bitcoin Core serves requests with "id":null or even an absent "id", and responds to such requests with
401 // "id":null in the response.
402 //
403 // Pod does not respond to any request without and "id" or "id":null, regardless the indicated JSON-RPC protocol
404 // version unless RPC quirks are enabled. With RPC quirks enabled, such requests will be responded to if the
405 // reqeust does not indicate JSON-RPC version.
406 //
407 // RPC quirks can be enabled by the user to avoid compatibility issues with software relying on Core's behavior.
408 if request.ID == nil && !(c.Server.Config.RPCQuirks.True() && request.Jsonrpc == "") {
409 if !c.Authenticated {
410 break out
411 }
412 continue
413 }
414 cmd := ParseCmd(&request)
415 if cmd.Err != nil {
416 if !c.Authenticated {
417 break out
418 }
419 reply, e := CreateMarshalledReply(cmd.ID, nil, cmd.Err)
420 if e != nil {
421 E.F("failed to marshal parse failure reply:", e)
422 continue
423 }
424 c.SendMessage(reply, nil)
425 continue
426 }
427 T.F("received command <%s> from %s", cmd.Method, c.Addr)
428 // T.Ln(interrupt.GoroutineDump())
429 // Chk auth. The client is immediately disconnected if the first request of an unauthenticated websocket
430 // client is not the authenticate request, an authenticate request is received when the client is already
431 // authenticated, or incorrect authentication credentials are provided in the request.
432 switch authCmd, ok := cmd.Cmd.(*btcjson.AuthenticateCmd); {
433 case c.Authenticated && ok:
434 W.F(
435 "websocket client %s is already authenticated", c.Addr,
436 )
437 break out
438 case !c.Authenticated && !ok:
439 W.F("unauthenticated websocket message received")
440 break out
441 case !c.Authenticated:
442 // Chk credentials.
443 login := authCmd.Username + ":" + authCmd.Passphrase
444 auth := "Basic " + base64.StdEncoding.EncodeToString([]byte(login))
445 authSha := sha256.Sum256([]byte(auth))
446 cmp := subtle.ConstantTimeCompare(authSha[:], c.Server.AuthSHA[:])
447 limitcmp := subtle.ConstantTimeCompare(authSha[:], c.Server.LimitAuthSHA[:])
448 if cmp != 1 && limitcmp != 1 {
449 W.Ln("authentication failure from", c.Addr)
450 break out
451 }
452 c.Authenticated = true
453 c.IsAdmin = cmp == 1
454 // Marshal and send response.
455 reply, e := CreateMarshalledReply(cmd.ID, nil, nil)
456 if e != nil {
457 E.Ln("failed to marshal authenticate reply:", e)
458 continue
459 }
460 c.SendMessage(reply, nil)
461 continue
462 }
463 // Chk if the client is using limited RPC credentials and error when not authorized to call this RPC.
464 if !c.IsAdmin {
465 if _, ok := RPCLimited[request.Method]; !ok {
466 jsonErr := &btcjson.RPCError{
467 Code: btcjson.ErrRPCInvalidParams.Code,
468 Message: "limited user not authorized for this method",
469 }
470 // Marshal and send response.
471 reply, e := CreateMarshalledReply(request.ID, nil, jsonErr)
472 if e != nil {
473 E.Ln("failed to marshal parse failure reply:", e)
474 continue
475 }
476 c.SendMessage(reply, nil)
477 continue
478 }
479 }
480 // Asynchronously handle the request. A semaphore is used to limit the number of concurrent requests currently
481 // being serviced. If the semaphore can not be acquired, simply wait until a request finished before reading the
482 // next RPC request from the websocket client.
483 //
484 // This could be a little fancier by timing out and failing when it takes too long to service the request, but
485 // if that is done, the read of the next request should not be blocked by this semaphore, otherwise the next
486 // request will be read and will probably sit here for another few seconds before timing out as well. This will
487 // cause the total timeout duration for later requests to be much longer than the check here would imply.
488 //
489 // If a timeout is added, the semaphore acquiring should be moved inside of the new goroutine with a select
490 // statement that also reads a time.After channel. This will unblock the read of the next request from the
491 // websocket client and allow many requests to be waited on concurrently.
492 c.ServiceRequestSem.Acquire()
493 go func() {
494 c.ServiceRequest(cmd)
495 c.ServiceRequestSem.Release()
496 }()
497 }
498 // Ensure the connection is closed.
499 c.Disconnect()
500 c.WG.Done()
501 T.Ln("websocket client input handler done for", c.Addr)
502 }
503 504 // NotificationQueueHandler handles the queuing of outgoing notifications for the websocket client. This runs as a muxer
505 // for various sources of input to ensure that queuing up notifications to be sent will not block. Otherwise, slow
506 // clients could bog down the other systems (such as the mempool or block manager) which are queuing the data. The data
507 // is passed on to outHandler to actually be written. It must be run as a goroutine.
508 func (c *WSClient) NotificationQueueHandler() {
509 ntfnSentChan := make(chan bool, 1) // nonblocking sync
510 // pendingNtfns is used as a queue for notifications that are ready to be sent once there are no outstanding
511 // notifications currently being sent.
512 //
513 // The waiting flag is used over simply checking for items in the pending list to ensure cleanup knows what has and
514 // hasn't been sent to the outHandler.
515 //
516 // Currently no special cleanup is needed, however if something like a done channel is added to notifications in the
517 // future, not knowing what has and hasn't been sent to the outHandler (and thus who should respond to the done
518 // channel) would be problematic without using this approach.
519 pendingNtfns := list.New()
520 waiting := false
521 out:
522 for {
523 select {
524 // This channel is notified when a message is being queued to be sent across the network socket.
525 //
526 // It will either send the message immediately if a send is not already in progress, or queue the message to
527 // be sent once the other pending messages are sent.
528 case msg := <-c.NtfnChan:
529 if !waiting {
530 c.SendMessage(msg, ntfnSentChan)
531 } else {
532 pendingNtfns.PushBack(msg)
533 }
534 waiting = true
535 // This channel is notified when a notification has been sent across the
536 // network socket.
537 case <-ntfnSentChan:
538 // No longer waiting if there are no more messages in the pending
539 // messages queue.
540 next := pendingNtfns.Front()
541 if next == nil {
542 waiting = false
543 continue
544 }
545 // Notify the outHandler about the next item to asynchronously send.
546 msg := pendingNtfns.Remove(next).([]byte)
547 c.SendMessage(msg, ntfnSentChan)
548 case <-c.Quit.Wait():
549 break out
550 }
551 }
552 // Drain any wait channels before exiting so nothing is left waiting around to send.
553 cleanup:
554 for {
555 select {
556 case <-c.NtfnChan:
557 case <-ntfnSentChan:
558 default:
559 break cleanup
560 }
561 }
562 c.WG.Done()
563 T.Ln("websocket client notification queue handler done for", c.Addr)
564 }
565 566 // OutHandler handles all outgoing messages for the websocket connection. It must be run as a goroutine.
567 //
568 // It uses a buffered channel to serialize output messages while allowing the sender to continue running asynchronously.
569 //
570 // It must be run as a goroutine.
571 func (c *WSClient) OutHandler() {
572 out:
573 for {
574 // Send any messages ready for send until the quit channel is closed.
575 select {
576 case r := <-c.SendChan:
577 e := c.Conn.WriteMessage(websocket.TextMessage, r.Msg)
578 if e != nil {
579 c.Disconnect()
580 break out
581 }
582 if r.DoneChan != nil {
583 r.DoneChan <- true
584 }
585 case <-c.Quit.Wait():
586 break out
587 }
588 }
589 // Drain any wait channels before exiting so nothing is left waiting around to send.
590 cleanup:
591 for {
592 select {
593 case r := <-c.SendChan:
594 if r.DoneChan != nil {
595 r.DoneChan <- false
596 }
597 default:
598 break cleanup
599 }
600 }
601 c.WG.Done()
602 T.Ln("websocket client output handler done for", c.Addr)
603 }
604 605 // ServiceRequest services a parsed RPC request by looking up and executing the appropriate RPC handler. The response is
606 // marshalled and sent to the websocket client.
607 func (c *WSClient) ServiceRequest(r *ParsedRPCCmd) {
608 var (
609 result interface{}
610 e error
611 )
612 // Lookup the websocket extension for the command and if it doesn't exist fallback to handling the command as a
613 // standard command.
614 wsHandler, ok := WSHandlers[r.Method]
615 if ok {
616 result, e = wsHandler(c, r.Cmd)
617 } else {
618 result, e = c.Server.StandardCmdResult(r, nil)
619 }
620 reply, e := CreateMarshalledReply(r.ID, result, e)
621 if e != nil {
622 E.F("failed to marshal reply for <%s> command: %v", r.Method, e)
623 return
624 }
625 c.SendMessage(reply, nil)
626 }
627 628 // AddAddress adds an address to a wsClientFilter, treating it correctly based on the type of address passed as an
629 // argument. NOTE: This extension was ported from github.com/decred/dcrd
630 func (f *WSClientFilter) AddAddress(a btcaddr.Address) {
631 switch a := a.(type) {
632 case *btcaddr.PubKeyHash:
633 f.PubKeyHashes[*a.Hash160()] = struct{}{}
634 return
635 case *btcaddr.ScriptHash:
636 f.ScriptHashes[*a.Hash160()] = struct{}{}
637 return
638 case *btcaddr.PubKey:
639 serializedPubKey := a.ScriptAddress()
640 switch len(serializedPubKey) {
641 case 33: // compressed
642 var compressedPubKey [33]byte
643 copy(compressedPubKey[:], serializedPubKey)
644 f.CompressedPubKeys[compressedPubKey] = struct{}{}
645 return
646 case 65: // uncompressed
647 var uncompressedPubKey [65]byte
648 copy(uncompressedPubKey[:], serializedPubKey)
649 f.UncompressedPubKeys[uncompressedPubKey] = struct{}{}
650 return
651 }
652 }
653 f.OtherAddresses[a.EncodeAddress()] = struct{}{}
654 }
655 656 // AddAddressStr parses an address from a string and then adds it to the wsClientFilter using addAddress.
657 //
658 // NOTE: This extension was ported from github.com/decred/dcrd
659 func (f *WSClientFilter) AddAddressStr(s string, params *chaincfg.Params) {
660 // If address can't be decoded, no point in saving it since it should also impossible to create the address from an
661 // inspected transaction output script.
662 a, e := btcaddr.Decode(s, params)
663 if e != nil {
664 return
665 }
666 f.AddAddress(a)
667 }
668 669 // AddUnspentOutPoint adds an outpoint to the wsClientFilter.
670 //
671 // NOTE: This extension was ported from github.com/decred/dcrd
672 func (f *WSClientFilter) AddUnspentOutPoint(op *wire.OutPoint) {
673 f.Unspent[*op] = struct{}{}
674 }
675 676 // ExistsAddress returns true if the passed address has been added to the wsClientFilter. NOTE: This extension was
677 // ported from github.com/decred/dcrd
678 func (f *WSClientFilter) ExistsAddress(a btcaddr.Address) bool {
679 switch a := a.(type) {
680 case *btcaddr.PubKeyHash:
681 _, ok := f.PubKeyHashes[*a.Hash160()]
682 return ok
683 case *btcaddr.ScriptHash:
684 _, ok := f.ScriptHashes[*a.Hash160()]
685 return ok
686 case *btcaddr.PubKey:
687 serializedPubKey := a.ScriptAddress()
688 switch len(serializedPubKey) {
689 case 33: // compressed
690 var compressedPubKey [33]byte
691 copy(compressedPubKey[:], serializedPubKey)
692 _, ok := f.CompressedPubKeys[compressedPubKey]
693 if !ok {
694 _, ok = f.PubKeyHashes[*a.PubKeyHash().Hash160()]
695 }
696 return ok
697 case 65: // uncompressed
698 var uncompressedPubKey [65]byte
699 copy(uncompressedPubKey[:], serializedPubKey)
700 _, ok := f.UncompressedPubKeys[uncompressedPubKey]
701 if !ok {
702 _, ok = f.PubKeyHashes[*a.PubKeyHash().Hash160()]
703 }
704 return ok
705 }
706 }
707 _, ok := f.OtherAddresses[a.EncodeAddress()]
708 return ok
709 }
710 711 // ExistsUnspentOutPoint returns true if the passed outpoint has been added to the wsClientFilter.
712 //
713 // NOTE: This extension was ported from github.com/decred/dcrd
714 func (f *WSClientFilter) ExistsUnspentOutPoint(op *wire.OutPoint) bool {
715 _, ok := f.Unspent[*op]
716 return ok
717 }
718 719 // // removeAddress removes the passed address, if it exists, from the
720 // wsClientFilter. NOTE: This extension was ported from github.com/decred/dcrd
721 // func (f *wsClientFilter) removeAddress(a util.Address) {
722 // switch a := a.(type) {
723 // case *util.PubKeyHash:
724 // delete(f.pubKeyHashes, *a.Hash160())
725 // return
726 // case *util.ScriptHash:
727 // delete(f.scriptHashes, *a.Hash160())
728 // return
729 // case *util.PubKey:
730 // serializedPubKey := a.ScriptAddress()
731 // switch len(serializedPubKey) {
732 // case 33: // compressed
733 // var compressedPubKey [33]byte
734 // copy(compressedPubKey[:], serializedPubKey)
735 // delete(f.compressedPubKeys, compressedPubKey)
736 // return
737 // case 65: // uncompressed
738 // var uncompressedPubKey [65]byte
739 // copy(uncompressedPubKey[:], serializedPubKey)
740 // delete(f.uncompressedPubKeys, uncompressedPubKey)
741 // return
742 // }
743 // }
744 // delete(f.otherAddresses, a.EncodeAddress())
745 // }
746 // // removeAddressStr parses an address from a string and then removes it from
747 // // the wsClientFilter using removeAddress. NOTE: This extension was ported
748 // // from github.com/decred/dcrd
749 // func (f *wsClientFilter) removeAddressStr(s string, netparams *chaincfg.Params) {
750 // a, e := util.Decode(s, netparams)
751 // if e == nil {
752 // f.removeAddress(a)
753 // } else {
754 // delete(f.otherAddresses, s)
755 // }
756 // }
757 // // removeUnspentOutPoint removes the passed outpoint, if it exists, from the
758 // wsClientFilter. NOTE: This extension was ported from github.com/decred/dcrd
759 // func (f *wsClientFilter) removeUnspentOutPoint(op *wire.OutPoint) {
760 // delete(f.unspent, *op)
761 // }
762 763 // AddClient adds the passed websocket client to the notification manager.
764 func (m *WSNtfnMgr) AddClient(wsc *WSClient) {
765 m.QueueNotification <- (*NotificationRegisterClient)(wsc)
766 }
767 768 // SendNotifyBlockConnected passes a block newly-connected to the best chain to the notification manager for block and
769 // transaction notification processing.
770 func (m *WSNtfnMgr) SendNotifyBlockConnected(block *block.Block) {
771 // As NotifyBlockConnected will be called by the block manager and the RPC Server may no longer be running, use a
772 // select statement to unblock enqueuing the notification once the RPC Server has begun shutting down.
773 select {
774 case m.QueueNotification <- (*NotificationBlockConnected)(block):
775 case <-m.Quit.Wait():
776 }
777 }
778 779 // SendNotifyBlockDisconnected passes a block disconnected from the best chain to the notification manager for block
780 // notification processing.
781 func (m *WSNtfnMgr) SendNotifyBlockDisconnected(block *block.Block) {
782 // As NotifyBlockDisconnected will be called by the block manager and the RPC Server may no longer be running, use a
783 // select statement to unblock enqueuing the notification once the RPC Server has begun shutting down.
784 select {
785 case m.QueueNotification <- (*NotificationBlockDisconnected)(block):
786 case <-m.Quit.Wait():
787 }
788 }
789 790 // SendNotifyMempoolTx passes a transaction accepted by mempool to the notification manager for transaction notification
791 // processing. If isNew is true, the tx is is a new transaction, rather than one added to the mempool during a reorg.
792 func (m *WSNtfnMgr) SendNotifyMempoolTx(tx *util.Tx, isNew bool) {
793 n := &NotificationTxAcceptedByMempool{
794 IsNew: isNew,
795 Tx: tx,
796 }
797 // As NotifyMempoolTx will be called by mempool and the RPC Server may no longer be running, use a select statement
798 // to unblock enqueuing the notification once the RPC Server has begun shutting down.
799 select {
800 case m.QueueNotification <- n:
801 case <-m.Quit.Wait():
802 }
803 }
804 805 // GetNumClients returns the number of clients actively being served.
806 func (m *WSNtfnMgr) GetNumClients() (n int) {
807 select {
808 case n = <-m.NumClients:
809 case <-m.Quit.Wait(): // Use default n (0) if Server has shut down.
810 }
811 return
812 }
813 814 // RegisterBlockUpdates requests block update notifications to the passed websocket client.
815 func (m *WSNtfnMgr) RegisterBlockUpdates(wsc *WSClient) {
816 m.QueueNotification <- (*NotificationRegisterBlocks)(wsc)
817 }
818 819 // RegisterNewMempoolTxsUpdates requests notifications to the passed websocket client when new transactions are added to
820 // the memory pool.
821 func (m *WSNtfnMgr) RegisterNewMempoolTxsUpdates(wsc *WSClient) {
822 m.QueueNotification <- (*NotificationRegisterNewMempoolTxs)(wsc)
823 }
824 825 // RegisterSpentRequests requests a notification when each of the passed outpoints is confirmed spent (contained in a
826 // block connected to the main chain) for the passed websocket client. The request is automatically removed once the
827 // notification has been sent.
828 func (m *WSNtfnMgr) RegisterSpentRequests(wsc *WSClient, ops []*wire.OutPoint) {
829 m.QueueNotification <- &NotificationRegisterSpent{
830 WSC: wsc,
831 OPs: ops,
832 }
833 }
834 835 // RegisterTxOutAddressRequests requests notifications to the passed websocket client when a transaction output spends
836 // to the passed address.
837 func (m *WSNtfnMgr) RegisterTxOutAddressRequests(
838 wsc *WSClient, addrs []string,
839 ) {
840 m.QueueNotification <- &NotificationRegisterAddr{
841 WSC: wsc,
842 Addrs: addrs,
843 }
844 }
845 846 // RemoveClient removes the passed websocket client and all notifications registered for it.
847 func (m *WSNtfnMgr) RemoveClient(wsc *WSClient) {
848 select {
849 case m.QueueNotification <- (*NotificationUnregisterClient)(wsc):
850 case <-m.Quit.Wait():
851 }
852 }
// Shutdown shuts down the manager, stopping the notification queue and notification handler goroutines.
//
// It does so by invoking Q on the manager's Quit signal; the handler goroutines and the enqueueing helpers in this file
// all select on m.Quit.Wait() and stop waiting once it fires.
func (m *WSNtfnMgr) Shutdown() {
	m.Quit.Q()
}
// Start starts the goroutines required for the manager to queue and process websocket client notifications: one
// running QueueHandler and one running NotificationHandler.
//
// NOTE(review): both goroutines call m.WG.Done() when they return, but no matching m.WG.Add is visible in this method.
// Confirm the wait group counter is incremented elsewhere before Start is called.
func (m *WSNtfnMgr) Start() {
	go m.QueueHandler()
	go m.NotificationHandler()
}
864 865 // UnregisterBlockUpdates removes block update notifications for the passed websocket client.
866 func (m *WSNtfnMgr) UnregisterBlockUpdates(wsc *WSClient) {
867 m.QueueNotification <- (*NotificationUnregisterBlocks)(wsc)
868 }
869 870 // UnregisterNewMempoolTxsUpdates removes notifications to the passed websocket client when new transaction are added to
871 // the memory pool.
872 func (m *WSNtfnMgr) UnregisterNewMempoolTxsUpdates(wsc *WSClient) {
873 m.QueueNotification <- (*NotificationUnregisterNewMempoolTxs)(wsc)
874 }
875 876 // UnregisterSpentRequest removes a request from the passed websocket client to be notified when the passed outpoint is
877 // confirmed spent (contained in a block connected to the main chain).
878 func (m *WSNtfnMgr) UnregisterSpentRequest(
879 wsc *WSClient,
880 op *wire.OutPoint,
881 ) {
882 m.QueueNotification <- &NotificationUnregisterSpent{
883 WSC: wsc,
884 OP: op,
885 }
886 }
887 888 // UnregisterTxOutAddressRequest removes a request from the passed websocket client to be notified when a transaction
889 // spends to the passed address.
890 func (m *WSNtfnMgr) UnregisterTxOutAddressRequest(wsc *WSClient, addr string) {
891 m.QueueNotification <- &NotificationUnregisterAddr{
892 WSC: wsc,
893 Addr: addr,
894 }
895 }
// WaitForShutdown blocks until all notification manager goroutines have finished, by waiting on the manager's wait
// group. QueueHandler and NotificationHandler each call m.WG.Done() as their last statement.
func (m *WSNtfnMgr) WaitForShutdown() {
	m.WG.Wait()
}
901 902 // AddAddrRequests adds the websocket client wsc to the address to client set addrMap so wsc will be notified for any
903 // mempool or block transaction outputs spending to any of the addresses in addrs.
904 func (*WSNtfnMgr) AddAddrRequests(
905 addrMap map[string]map[qu.C]*WSClient, wsc *WSClient, addrs []string,
906 ) {
907 for _, addr := range addrs {
908 // Track the request in the client as well so it can be quickly be removed on disconnect.
909 wsc.AddrRequests[addr] = struct{}{}
910 // Add the client to the set of clients to notify when the outpoint is seen. Create map as needed.
911 cmap, ok := addrMap[addr]
912 if !ok {
913 cmap = make(map[qu.C]*WSClient)
914 addrMap[addr] = cmap
915 }
916 cmap[wsc.Quit] = wsc
917 }
918 }
919 920 // AddSpentRequests modifies a map of watched outpoints to sets of websocket clients to add a new request watch all of
921 // the outpoints in ops and create and send a notification when spent to the websocket client wsc.
922 func (m *WSNtfnMgr) AddSpentRequests(
923 opMap map[wire.
924 OutPoint]map[qu.C]*WSClient, wsc *WSClient, ops []*wire.OutPoint,
925 ) {
926 for _, op := range ops {
927 // Track the request in the client as well so it can be quickly be removed on disconnect.
928 wsc.SpentRequests[*op] = struct{}{}
929 // Add the client to the list to notify when the outpoint is seen. Create the list as needed.
930 cmap, ok := opMap[*op]
931 if !ok {
932 cmap = make(map[qu.C]*WSClient)
933 opMap[*op] = cmap
934 }
935 cmap[wsc.Quit] = wsc
936 }
937 // Chk if any transactions spending these outputs already exists in the mempool, if so send the notification
938 // immediately.
939 spends := make(map[chainhash.Hash]*util.Tx)
940 for _, op := range ops {
941 spend := m.Server.Cfg.TxMemPool.CheckSpend(*op)
942 if spend != nil {
943 D.F(
944 "found existing mempool spend for outpoint<%v>: %v",
945 op, spend.Hash(),
946 )
947 spends[*spend.Hash()] = spend
948 }
949 }
950 for _, spend := range spends {
951 m.NotifyForTx(opMap, nil, spend, nil)
952 }
953 }
954 955 // NotificationHandler reads notifications and control messages from the queue handler and processes one at a time.
956 func (m *WSNtfnMgr) NotificationHandler() {
957 // clients is a map of all currently connected websocket clients.
958 clients := make(map[qu.C]*WSClient)
959 // Maps used to hold lists of websocket clients to be notified on certain events. Each websocket client also keeps
960 // maps for the events which have multiple triggers to make removal from these lists on connection close less
961 // horrendously.
962 //
963 // Where possible, the quit channel is used as the unique id for a client since it is quite a bit more efficient
964 // than using the entire struct.
965 blockNotifications := make(map[qu.C]*WSClient)
966 txNotifications := make(map[qu.C]*WSClient)
967 watchedOutPoints := make(map[wire.OutPoint]map[qu.C]*WSClient)
968 watchedAddrs := make(map[string]map[qu.C]*WSClient)
969 out:
970 for {
971 select {
972 case n, ok := <-m.NotificationMsgs:
973 if !ok {
974 // queueHandler quit.
975 break out
976 }
977 switch n := n.(type) {
978 case *NotificationBlockConnected:
979 block := (*block.Block)(n)
980 // Skip iterating through all txs if no tx notification requests exist.
981 if len(watchedOutPoints) != 0 || len(watchedAddrs) != 0 {
982 for _, tx := range block.Transactions() {
983 m.NotifyForTx(
984 watchedOutPoints,
985 watchedAddrs, tx, block,
986 )
987 }
988 }
989 if len(blockNotifications) != 0 {
990 m.NotifyBlockConnected(
991 blockNotifications,
992 block,
993 )
994 m.NotifyFilteredBlockConnected(
995 blockNotifications,
996 block,
997 )
998 }
999 case *NotificationBlockDisconnected:
1000 block := (*block.Block)(n)
1001 if len(blockNotifications) != 0 {
1002 m.NotifyBlockDisconnected(
1003 blockNotifications,
1004 block,
1005 )
1006 m.NotifyFilteredBlockDisconnected(
1007 blockNotifications,
1008 block,
1009 )
1010 }
1011 case *NotificationTxAcceptedByMempool:
1012 if n.IsNew && len(txNotifications) != 0 {
1013 m.NotifyForNewTx(txNotifications, n.Tx)
1014 }
1015 m.NotifyForTx(watchedOutPoints, watchedAddrs, n.Tx, nil)
1016 m.NotifyRelevantTxAccepted(n.Tx, clients)
1017 case *NotificationRegisterBlocks:
1018 wsc := (*WSClient)(n)
1019 blockNotifications[wsc.Quit] = wsc
1020 case *NotificationUnregisterBlocks:
1021 wsc := (*WSClient)(n)
1022 delete(blockNotifications, wsc.Quit)
1023 case *NotificationRegisterClient:
1024 wsc := (*WSClient)(n)
1025 clients[wsc.Quit] = wsc
1026 case *NotificationUnregisterClient:
1027 wsc := (*WSClient)(n)
1028 // Remove any requests made by the client as well as the client itself.
1029 delete(blockNotifications, wsc.Quit)
1030 delete(txNotifications, wsc.Quit)
1031 for k := range wsc.SpentRequests {
1032 op := k
1033 m.RemoveSpentRequest(watchedOutPoints, wsc, &op)
1034 }
1035 for addr := range wsc.AddrRequests {
1036 m.RemoveAddrRequest(watchedAddrs, wsc, addr)
1037 }
1038 delete(clients, wsc.Quit)
1039 case *NotificationRegisterSpent:
1040 m.AddSpentRequests(watchedOutPoints, n.WSC, n.OPs)
1041 case *NotificationUnregisterSpent:
1042 m.RemoveSpentRequest(watchedOutPoints, n.WSC, n.OP)
1043 case *NotificationRegisterAddr:
1044 m.AddAddrRequests(watchedAddrs, n.WSC, n.Addrs)
1045 case *NotificationUnregisterAddr:
1046 m.RemoveAddrRequest(watchedAddrs, n.WSC, n.Addr)
1047 case *NotificationRegisterNewMempoolTxs:
1048 wsc := (*WSClient)(n)
1049 txNotifications[wsc.Quit] = wsc
1050 case *NotificationUnregisterNewMempoolTxs:
1051 wsc := (*WSClient)(n)
1052 delete(txNotifications, wsc.Quit)
1053 default:
1054 W.Ln("unhandled notification type")
1055 }
1056 case m.NumClients <- len(clients):
1057 case <-m.Quit.Wait():
1058 // RPC Server shutting down.
1059 break out
1060 }
1061 }
1062 for _, c := range clients {
1063 c.Disconnect()
1064 }
1065 m.WG.Done()
1066 }
1067 1068 // NotifyBlockConnected notifies websocket clients that have registered for block updates when a block is connected to
1069 // the main chain.
1070 func (*WSNtfnMgr) NotifyBlockConnected(
1071 clients map[qu.C]*WSClient, block *block.Block,
1072 ) {
1073 // Notify interested websocket clients about the connected block.
1074 ntfn := btcjson.NewBlockConnectedNtfn(
1075 block.Hash().String(), block.Height(),
1076 block.WireBlock().Header.Timestamp.Unix(),
1077 )
1078 marshalledJSON, e := btcjson.MarshalCmd(nil, ntfn)
1079 if e != nil {
1080 E.Ln("failed to marshal block connected notification:", e)
1081 return
1082 }
1083 for _, wsc := range clients {
1084 e := wsc.QueueNotification(marshalledJSON)
1085 if e != nil {
1086 }
1087 }
1088 }
1089 1090 // NotifyBlockDisconnected notifies websocket clients that have registered for block updates when a block is
1091 // disconnected from the main chain (due to a reorganize).
1092 func (*WSNtfnMgr) NotifyBlockDisconnected(
1093 clients map[qu.C]*WSClient, block *block.Block,
1094 ) {
1095 // Skip notification creation if no clients have requested block connected/ disconnected notifications.
1096 if len(clients) == 0 {
1097 return
1098 }
1099 // Notify interested websocket clients about the disconnected block.
1100 ntfn := btcjson.NewBlockDisconnectedNtfn(
1101 block.Hash().String(),
1102 block.Height(), block.WireBlock().Header.Timestamp.Unix(),
1103 )
1104 marshalledJSON, e := btcjson.MarshalCmd(nil, ntfn)
1105 if e != nil {
1106 E.Ln(
1107 "failed to marshal block disconnected notification:",
1108 e,
1109 )
1110 return
1111 }
1112 for _, wsc := range clients {
1113 e := wsc.QueueNotification(marshalledJSON)
1114 if e != nil {
1115 }
1116 }
1117 }
1118 1119 // NotifyFilteredBlockConnected notifies websocket clients that have registered for block updates when a block is
1120 // connected to the main chain.
1121 func (m *WSNtfnMgr) NotifyFilteredBlockConnected(
1122 clients map[qu.C]*WSClient, block *block.Block,
1123 ) {
1124 // Create the common portion of the notification that is the same for every client.
1125 var w bytes.Buffer
1126 e := block.WireBlock().Header.Serialize(&w)
1127 if e != nil {
1128 E.Ln(
1129 "failed to serialize header for filtered block connected"+
1130 " notification:", e,
1131 )
1132 return
1133 }
1134 ntfn := btcjson.NewFilteredBlockConnectedNtfn(
1135 block.Height(),
1136 hex.EncodeToString(w.Bytes()), nil,
1137 )
1138 // Search for relevant transactions for each client and save them serialized in hex encoding for the notification.
1139 subscribedTxs := make(map[qu.C][]string)
1140 for _, tx := range block.Transactions() {
1141 var txHex string
1142 for quitChan := range m.GetSubscribedClients(tx, clients) {
1143 if txHex == "" {
1144 txHex = TxHexString(tx.MsgTx())
1145 }
1146 subscribedTxs[quitChan] = append(subscribedTxs[quitChan], txHex)
1147 }
1148 }
1149 for quitChan, wsc := range clients {
1150 // Add all discovered transactions for this client. For clients that have no new-style filter, add the empty
1151 // string slice.
1152 ntfn.SubscribedTxs = subscribedTxs[quitChan]
1153 // Marshal and queue notification.
1154 marshalledJSON, e := btcjson.MarshalCmd(nil, ntfn)
1155 if e != nil {
1156 E.F("failed to marshal filtered block connected notification:", e)
1157 return
1158 }
1159 e = wsc.QueueNotification(marshalledJSON)
1160 if e != nil {
1161 D.Ln(e)
1162 }
1163 }
1164 }
1165 1166 // NotifyFilteredBlockDisconnected notifies websocket clients that have registered for block updates when a block is
1167 // disconnected from the main chain (due to a reorganize).
1168 func (*WSNtfnMgr) NotifyFilteredBlockDisconnected(
1169 clients map[qu.C]*WSClient, block *block.Block,
1170 ) {
1171 // Skip notification creation if no clients have requested block connected/ disconnected notifications.
1172 if len(clients) == 0 {
1173 return
1174 }
1175 // Notify interested websocket clients about the disconnected block.
1176 var w bytes.Buffer
1177 e := block.WireBlock().Header.Serialize(&w)
1178 if e != nil {
1179 E.Ln("failed to serialize header for filtered block disconnected notification:", e)
1180 return
1181 }
1182 ntfn := btcjson.NewFilteredBlockDisconnectedNtfn(block.Height(), hex.EncodeToString(w.Bytes()))
1183 marshalledJSON, e := btcjson.MarshalCmd(nil, ntfn)
1184 if e != nil {
1185 E.Ln("failed to marshal filtered block disconnected notification:", e)
1186 return
1187 }
1188 for _, wsc := range clients {
1189 e := wsc.QueueNotification(marshalledJSON)
1190 if e != nil {
1191 D.Ln(e)
1192 }
1193 }
1194 }
1195 1196 // NotifyForNewTx notifies websocket clients that have registered for updates when a new transaction is added to the
1197 // memory pool.
1198 func (m *WSNtfnMgr) NotifyForNewTx(
1199 clients map[qu.C]*WSClient,
1200 tx *util.Tx,
1201 ) {
1202 txHashStr := tx.Hash().String()
1203 mtx := tx.MsgTx()
1204 var amount int64
1205 for _, txOut := range mtx.TxOut {
1206 amount += txOut.Value
1207 }
1208 ntfn := btcjson.NewTxAcceptedNtfn(txHashStr, amount2.Amount(amount).ToDUO())
1209 marshalledJSON, e := btcjson.MarshalCmd(nil, ntfn)
1210 if e != nil {
1211 E.Ln("failed to marshal tx notification:", e)
1212 return
1213 }
1214 var verboseNtfn *btcjson.TxAcceptedVerboseNtfn
1215 var marshalledJSONVerbose []byte
1216 for _, wsc := range clients {
1217 if wsc.VerboseTxUpdates {
1218 net := m.Server.Cfg.ChainParams
1219 var rawTx *btcjson.TxRawResult
1220 rawTx, e = CreateTxRawResult(
1221 net, mtx, txHashStr, nil,
1222 "", 0, 0,
1223 )
1224 if e != nil {
1225 return
1226 }
1227 verboseNtfn = btcjson.NewTxAcceptedVerboseNtfn(*rawTx)
1228 marshalledJSONVerbose, e = btcjson.MarshalCmd(
1229 nil,
1230 verboseNtfn,
1231 )
1232 if e != nil {
1233 E.Ln("failed to marshal verbose tx notification:", e)
1234 }
1235 if marshalledJSONVerbose != nil {
1236 e = wsc.QueueNotification(marshalledJSONVerbose)
1237 if e != nil {
1238 D.Ln(e)
1239 }
1240 continue
1241 }
1242 return
1243 }
1244 e = wsc.QueueNotification(marshalledJSONVerbose)
1245 if e != nil {
1246 } else {
1247 e := wsc.QueueNotification(marshalledJSON)
1248 if e != nil {
1249 }
1250 }
1251 }
1252 }
1253 1254 // NotifyForTx examines the inputs and outputs of the passed transaction, notifying websocket clients of outputs
1255 // spending to a watched address and inputs spending a watched outpoint.
1256 func (m *WSNtfnMgr) NotifyForTx(
1257 ops map[wire.OutPoint]map[qu.C]*WSClient,
1258 addrs map[string]map[qu.C]*WSClient, tx *util.Tx, block *block.Block,
1259 ) {
1260 if len(ops) != 0 {
1261 m.NotifyForTxIns(ops, tx, block)
1262 }
1263 if len(addrs) != 0 {
1264 m.NotifyForTxOuts(ops, addrs, tx, block)
1265 }
1266 }
1267 1268 // NotifyForTxIns examines the inputs of the passed transaction and sends interested websocket clients a redeemingtx
1269 // notification if any inputs spend a watched output. If block is non-nil, any matching spent requests are removed.
1270 func (m *WSNtfnMgr) NotifyForTxIns(
1271 ops map[wire.
1272 OutPoint]map[qu.C]*WSClient, tx *util.Tx, block *block.Block,
1273 ) {
1274 // Nothing to do if nobody is watching outpoints.
1275 if len(ops) == 0 {
1276 return
1277 }
1278 txHex := ""
1279 wscNotified := make(map[qu.C]struct{})
1280 for _, txIn := range tx.MsgTx().TxIn {
1281 prevOut := &txIn.PreviousOutPoint
1282 if cmap, ok := ops[*prevOut]; ok {
1283 if txHex == "" {
1284 txHex = TxHexString(tx.MsgTx())
1285 }
1286 marshalledJSON, e := NewRedeemingTxNotification(txHex, tx.Index(), block)
1287 if e != nil {
1288 E.Ln(
1289 "failed to marshal redeemingtx notification:", e,
1290 )
1291 continue
1292 }
1293 for wscQuit, wsc := range cmap {
1294 if block != nil {
1295 m.RemoveSpentRequest(ops, wsc, prevOut)
1296 }
1297 if _, ok := wscNotified[wscQuit]; !ok {
1298 wscNotified[wscQuit] = struct{}{}
1299 e := wsc.QueueNotification(marshalledJSON)
1300 if e != nil {
1301 D.Ln(e)
1302 }
1303 }
1304 }
1305 }
1306 }
1307 }
1308 1309 // NotifyForTxOuts examines each transaction output, notifying interested websocket clients of the transaction if an
1310 // output spends to a watched address. A spent notification request is automatically registered for the client for each
1311 // matching output.
1312 func (m *WSNtfnMgr) NotifyForTxOuts(
1313 ops map[wire.OutPoint]map[qu.C]*WSClient,
1314 addrs map[string]map[qu.C]*WSClient, tx *util.Tx, block *block.Block,
1315 ) {
1316 // Nothing to do if nobody is listening for address notifications.
1317 if len(addrs) == 0 {
1318 return
1319 }
1320 txHex := ""
1321 wscNotified := make(map[qu.C]struct{})
1322 for i, txOut := range tx.MsgTx().TxOut {
1323 var txAddrs []btcaddr.Address
1324 var e error
1325 _, txAddrs, _, e = txscript.ExtractPkScriptAddrs(
1326 txOut.PkScript, m.Server.Cfg.ChainParams,
1327 )
1328 if e != nil {
1329 continue
1330 }
1331 for _, txAddr := range txAddrs {
1332 cmap, ok := addrs[txAddr.EncodeAddress()]
1333 if !ok {
1334 continue
1335 }
1336 if txHex == "" {
1337 txHex = TxHexString(tx.MsgTx())
1338 }
1339 ntfn := btcjson.NewRecvTxNtfn(
1340 txHex, BlockDetails(
1341 block,
1342 tx.Index(),
1343 ),
1344 )
1345 marshalledJSON, e := btcjson.MarshalCmd(nil, ntfn)
1346 if e != nil {
1347 E.Ln("Failed to marshal processedtx notification:", e)
1348 continue
1349 }
1350 op := []*wire.OutPoint{wire.NewOutPoint(tx.Hash(), uint32(i))}
1351 for wscQuit, wsc := range cmap {
1352 m.AddSpentRequests(ops, wsc, op)
1353 if _, ok := wscNotified[wscQuit]; !ok {
1354 wscNotified[wscQuit] = struct{}{}
1355 e := wsc.QueueNotification(marshalledJSON)
1356 if e != nil {
1357 }
1358 }
1359 }
1360 }
1361 }
1362 }
1363 1364 // NotifyRelevantTxAccepted examines the inputs and outputs of the passed transaction, notifying websocket clients of
1365 // outputs spending to a watched address and inputs spending a watched outpoint.
1366 //
1367 // Any outputs paying to a watched address result in the output being watched as well for future notifications.
1368 func (m *WSNtfnMgr) NotifyRelevantTxAccepted(
1369 tx *util.Tx, clients map[qu.C]*WSClient,
1370 ) {
1371 clientsToNotify := m.GetSubscribedClients(tx, clients)
1372 if len(clientsToNotify) != 0 {
1373 n := btcjson.NewRelevantTxAcceptedNtfn(TxHexString(tx.MsgTx()))
1374 marshalled, e := btcjson.MarshalCmd(nil, n)
1375 if e != nil {
1376 E.Ln("failed to marshal notification:", e)
1377 return
1378 }
1379 for quitChan := range clientsToNotify {
1380 e := clients[quitChan].QueueNotification(marshalled)
1381 if e != nil {
1382 }
1383 }
1384 }
1385 }
// QueueHandler maintains a queue of notifications and notification handler control messages.
//
// It runs the package-level QueueHandler with the manager's QueueNotification channel, NotificationMsgs channel and
// Quit signal, and marks itself done on the manager's wait group once that call returns.
func (m *WSNtfnMgr) QueueHandler() {
	QueueHandler(m.QueueNotification, m.NotificationMsgs, m.Quit)
	m.WG.Done()
}
1392 1393 // RemoveAddrRequest removes the websocket client wsc from the address to client set addrs so it will no longer receive
1394 // notification updates for any transaction outputs send to addr.
1395 func (*WSNtfnMgr) RemoveAddrRequest(
1396 addrs map[string]map[qu.C]*WSClient, wsc *WSClient, addr string,
1397 ) {
1398 // Remove the request tracking from the client.
1399 delete(wsc.AddrRequests, addr)
1400 // Remove the client from the list to notify.
1401 cmap, ok := addrs[addr]
1402 if !ok {
1403 W.F(
1404 "attempt to remove nonexistent addr request <%s> for websocket client %s",
1405 addr, wsc.Addr,
1406 )
1407 return
1408 }
1409 delete(cmap, wsc.Quit)
1410 // Remove the map entry altogether if there are no more clients interested in it.
1411 if len(cmap) == 0 {
1412 delete(addrs, addr)
1413 }
1414 }
1415 1416 // RemoveSpentRequest modifies a map of watched outpoints to remove the websocket client wsc from the set of clients to
1417 // be notified when a watched outpoint is spent. If wsc is the last client, the outpoint key is removed from the map.
1418 func (*WSNtfnMgr) RemoveSpentRequest(
1419 ops map[wire.
1420 OutPoint]map[qu.C]*WSClient, wsc *WSClient, op *wire.OutPoint,
1421 ) {
1422 // Remove the request tracking from the client.
1423 delete(wsc.SpentRequests, *op)
1424 // Remove the client from the list to notify.
1425 notifyMap, ok := ops[*op]
1426 if !ok {
1427 W.Ln(
1428 "attempt to remove nonexistent spent request for"+
1429 " websocket client", wsc.Addr,
1430 )
1431 return
1432 }
1433 delete(notifyMap, wsc.Quit)
1434 // Remove the map entry altogether if there are no more clients interested in it.
1435 if len(notifyMap) == 0 {
1436 delete(ops, *op)
1437 }
1438 }
1439 1440 // GetSubscribedClients returns the set of all websocket client quit channels that are registered to receive
1441 // notifications regarding tx, either due to tx spending a watched output or outputting to a watched address.
1442 //
1443 // Matching client's filters are updated based on this transaction's outputs and output addresses that may be relevant
1444 // for a client.
1445 func (m *WSNtfnMgr) GetSubscribedClients(
1446 tx *util.Tx,
1447 clients map[qu.C]*WSClient,
1448 ) map[qu.C]struct{} {
1449 // Use a map of client quit channels as keys to prevent duplicates when multiple inputs and/or outputs are relevant
1450 // to the client.
1451 subscribed := make(map[qu.C]struct{})
1452 msgTx := tx.MsgTx()
1453 for _, input := range msgTx.TxIn {
1454 for quitChan, wsc := range clients {
1455 wsc.Lock()
1456 filter := wsc.FilterData
1457 wsc.Unlock()
1458 if filter == nil {
1459 continue
1460 }
1461 filter.mu.Lock()
1462 if filter.ExistsUnspentOutPoint(&input.PreviousOutPoint) {
1463 subscribed[quitChan] = struct{}{}
1464 }
1465 filter.mu.Unlock()
1466 }
1467 }
1468 var e error
1469 for i, output := range msgTx.TxOut {
1470 var addrs []btcaddr.Address
1471 _, addrs, _, e = txscript.ExtractPkScriptAddrs(
1472 output.PkScript, m.Server.Cfg.ChainParams,
1473 )
1474 if e != nil {
1475 // Clients are not able to subscribe to nonstandard or non-address outputs.
1476 continue
1477 }
1478 for quitChan, wsc := range clients {
1479 wsc.Lock()
1480 filter := wsc.FilterData
1481 wsc.Unlock()
1482 if filter == nil {
1483 continue
1484 }
1485 filter.mu.Lock()
1486 for _, a := range addrs {
1487 if filter.ExistsAddress(a) {
1488 subscribed[quitChan] = struct{}{}
1489 op := wire.OutPoint{
1490 Hash: *tx.Hash(),
1491 Index: uint32(i),
1492 }
1493 filter.AddUnspentOutPoint(&op)
1494 }
1495 }
1496 filter.mu.Unlock()
1497 }
1498 }
1499 return subscribed
1500 }
// Acquire takes a slot of the semaphore by sending an empty struct on the underlying channel. The send blocks until the
// channel can accept the value.
func (s Semaphore) Acquire() {
	s <- struct{}{}
}
// Release frees a slot of the semaphore by receiving one value from the underlying channel. The receive blocks until a
// value is available.
func (s Semaphore) Release() {
	<-s
}
1507 1508 // BlockDetails creates a BlockDetails struct to include in btcws notifications from a block and a transaction's block
1509 // index.
1510 func BlockDetails(block *block.Block, txIndex int) *btcjson.BlockDetails {
1511 if block == nil {
1512 return nil
1513 }
1514 return &btcjson.BlockDetails{
1515 Height: block.Height(),
1516 Hash: block.Hash().String(),
1517 Index: txIndex,
1518 Time: block.WireBlock().Header.Timestamp.Unix(),
1519 }
1520 }
1521 1522 // CheckAddressValidity checks the validity of each address in the passed string slice. It does this by attempting to
1523 // decode each address using the current active network parameters.
1524 //
1525 // If any single address fails to decode properly, the function returns an error. Otherwise, nil is returned.
1526 func CheckAddressValidity(addrs []string, params *chaincfg.Params) (e error) {
1527 for _, addr := range addrs {
1528 _, e := btcaddr.Decode(addr, params)
1529 if e != nil {
1530 return &btcjson.RPCError{
1531 Code: btcjson.ErrRPCInvalidAddressOrKey,
1532 Message: fmt.Sprintf(
1533 "Invalid address or key: %v",
1534 addr,
1535 ),
1536 }
1537 }
1538 }
1539 return nil
1540 }
1541 1542 // DescendantBlock returns the appropriate JSON-RPC error if a current block fetched during a reorganize is not a direct
1543 // child of the parent block hash.
1544 func DescendantBlock(
1545 prevHash *chainhash.Hash, curBlock *block.Block,
1546 ) (e error) {
1547 curHash := &curBlock.WireBlock().Header.PrevBlock
1548 if !prevHash.IsEqual(curHash) {
1549 E.F(
1550 "stopping rescan for reorged block %v (replaced by block %v)",
1551 prevHash, curHash,
1552 )
1553 return &ErrRescanReorg
1554 }
1555 return nil
1556 }
1557 1558 // DeserializeOutpoints deserializes each serialized outpoint.
1559 func DeserializeOutpoints(serializedOuts []btcjson.OutPoint) (
1560 []*wire.OutPoint,
1561 error,
1562 ) {
1563 outpoints := make([]*wire.OutPoint, 0, len(serializedOuts))
1564 for i := range serializedOuts {
1565 blockHash, e := chainhash.NewHashFromStr(serializedOuts[i].Hash)
1566 if e != nil {
1567 return nil, DecodeHexError(serializedOuts[i].Hash)
1568 }
1569 index := serializedOuts[i].Index
1570 outpoints = append(outpoints, wire.NewOutPoint(blockHash, index))
1571 }
1572 return outpoints, nil
1573 }
1574 1575 // HandleLoadTxFilter implements the loadtxfilter command extension for websocket connections. NOTE: This extension is
1576 // ported from github.com/decred/dcrd
1577 func HandleLoadTxFilter(wsc *WSClient, icmd interface{}) (interface{}, error) {
1578 cmd := icmd.(*btcjson.LoadTxFilterCmd)
1579 outPoints := make([]wire.OutPoint, len(cmd.OutPoints))
1580 for i := range cmd.OutPoints {
1581 hash, e := chainhash.NewHashFromStr(cmd.OutPoints[i].Hash)
1582 if e != nil {
1583 return nil, &btcjson.RPCError{
1584 Code: btcjson.ErrRPCInvalidParameter,
1585 Message: e.Error(),
1586 }
1587 }
1588 outPoints[i] = wire.OutPoint{
1589 Hash: *hash,
1590 Index: cmd.OutPoints[i].Index,
1591 }
1592 }
1593 params := wsc.Server.Cfg.ChainParams
1594 wsc.Lock()
1595 if cmd.Reload || wsc.FilterData == nil {
1596 wsc.FilterData = NewWSClientFilter(
1597 cmd.Addresses, outPoints,
1598 params,
1599 )
1600 wsc.Unlock()
1601 } else {
1602 wsc.Unlock()
1603 wsc.FilterData.mu.Lock()
1604 for _, a := range cmd.Addresses {
1605 wsc.FilterData.AddAddressStr(a, params)
1606 }
1607 for i := range outPoints {
1608 wsc.FilterData.AddUnspentOutPoint(&outPoints[i])
1609 }
1610 wsc.FilterData.mu.Unlock()
1611 }
1612 return nil, nil
1613 }
// HandleNotifyBlocks implements the notifyblocks command extension for websocket connections.
//
// It registers wsc for block updates with the server's notification manager. icmd is not inspected, and the function
// always returns a nil result and a nil error.
func HandleNotifyBlocks(wsc *WSClient, icmd interface{}) (interface{}, error) {
	wsc.Server.NtfnMgr.RegisterBlockUpdates(wsc)
	return nil, nil
}
1620 1621 // HandleNotifyNewTransactions implements the notifynewtransactions command
1622 // extension for websocket connections.
1623 func HandleNotifyNewTransactions(
1624 wsc *WSClient,
1625 icmd interface{},
1626 ) (interface{}, error) {
1627 cmd, ok := icmd.(*btcjson.NotifyNewTransactionsCmd)
1628 if !ok {
1629 return nil, btcjson.ErrRPCInternal
1630 }
1631 wsc.VerboseTxUpdates = cmd.Verbose != nil && *cmd.Verbose
1632 wsc.Server.NtfnMgr.RegisterNewMempoolTxsUpdates(wsc)
1633 return nil, nil
1634 }
1635 1636 // HandleNotifyReceived implements the notifyreceived command extension for websocket connections.
1637 func HandleNotifyReceived(wsc *WSClient, icmd interface{}) (
1638 interface{},
1639 error,
1640 ) {
1641 cmd, ok := icmd.(*btcjson.NotifyReceivedCmd)
1642 if !ok {
1643 return nil, btcjson.ErrRPCInternal
1644 }
1645 // Decode addresses to validate input, but the strings slice is used directly if these are all ok.
1646 e := CheckAddressValidity(cmd.Addresses, wsc.Server.Cfg.ChainParams)
1647 if e != nil {
1648 return nil, e
1649 }
1650 wsc.Server.NtfnMgr.RegisterTxOutAddressRequests(wsc, cmd.Addresses)
1651 return nil, nil
1652 }
1653 1654 // HandleNotifySpent implements the notifyspent command extension for websocket connections.
1655 func HandleNotifySpent(wsc *WSClient, icmd interface{}) (interface{}, error) {
1656 cmd, ok := icmd.(*btcjson.NotifySpentCmd)
1657 if !ok {
1658 return nil, btcjson.ErrRPCInternal
1659 }
1660 outpoints, e := DeserializeOutpoints(cmd.OutPoints)
1661 if e != nil {
1662 return nil, e
1663 }
1664 wsc.Server.NtfnMgr.RegisterSpentRequests(wsc, outpoints)
1665 return nil, nil
1666 }
1667 1668 // HandleRescan implements the rescan command extension for websocket connections.
1669 //
1670 // NOTE: This does not smartly handle reorgs, and fixing requires database changes (for safe, concurrent access to full
1671 // block ranges, and support for other chains than the best chain).
1672 //
1673 // It will, however, detect whether a reorg removed a block that was previously processed, and result in the handler
1674 // erroring.
1675 //
1676 // Clients must handle this by finding a block still in the chain (perhaps from a rescanprogress notification) to resume
1677 // their rescan. TODO: simplify, modularise this
1678 func HandleRescan(wsc *WSClient, icmd interface{}) (interface{}, error) {
1679 cmd, ok := icmd.(*btcjson.RescanCmd)
1680 if !ok {
1681 return nil, btcjson.ErrRPCInternal
1682 }
1683 outpoints := make([]*wire.OutPoint, 0, len(cmd.OutPoints))
1684 for i := range cmd.OutPoints {
1685 cmdOutpoint := &cmd.OutPoints[i]
1686 blockHash, e := chainhash.NewHashFromStr(cmdOutpoint.Hash)
1687 if e != nil {
1688 return nil, DecodeHexError(cmdOutpoint.Hash)
1689 }
1690 outpoint := wire.NewOutPoint(blockHash, cmdOutpoint.Index)
1691 outpoints = append(outpoints, outpoint)
1692 }
1693 numAddrs := len(cmd.Addresses)
1694 if numAddrs == 1 {
1695 D.Ln("beginning rescan for 1 address")
1696 } else {
1697 D.F("beginning rescan for %d addresses", numAddrs)
1698 }
1699 // Build lookup maps.
1700 lookups := RescanKeys{
1701 Fallbacks: map[string]struct{}{},
1702 PubKeyHashes: map[[ripemd160.Size]byte]struct{}{},
1703 ScriptHashes: map[[ripemd160.Size]byte]struct{}{},
1704 CompressedPubKeys: map[[33]byte]struct{}{},
1705 UncompressedPubKeys: map[[65]byte]struct{}{},
1706 Unspent: map[wire.OutPoint]struct{}{},
1707 }
1708 var compressedPubkey [33]byte
1709 var uncompressedPubkey [65]byte
1710 params := wsc.Server.Cfg.ChainParams
1711 for _, addrStr := range cmd.Addresses {
1712 addr, e := btcaddr.Decode(addrStr, params)
1713 if e != nil {
1714 jsonErr := btcjson.RPCError{
1715 Code: btcjson.ErrRPCInvalidAddressOrKey,
1716 Message: "Rescan address " + addrStr + ": " +
1717 e.Error(),
1718 }
1719 return nil, &jsonErr
1720 }
1721 switch a := addr.(type) {
1722 case *btcaddr.PubKeyHash:
1723 lookups.PubKeyHashes[*a.Hash160()] = struct{}{}
1724 case *btcaddr.ScriptHash:
1725 lookups.ScriptHashes[*a.Hash160()] = struct{}{}
1726 case *btcaddr.PubKey:
1727 pubkeyBytes := a.ScriptAddress()
1728 switch len(pubkeyBytes) {
1729 case 33: // Compressed
1730 copy(compressedPubkey[:], pubkeyBytes)
1731 lookups.CompressedPubKeys[compressedPubkey] = struct{}{}
1732 case 65: // Uncompressed
1733 copy(uncompressedPubkey[:], pubkeyBytes)
1734 lookups.UncompressedPubKeys[uncompressedPubkey] = struct{}{}
1735 default:
1736 jsonErr := btcjson.RPCError{
1737 Code: btcjson.ErrRPCInvalidAddressOrKey,
1738 Message: "Pubkey " + addrStr + " is of unknown length",
1739 }
1740 return nil, &jsonErr
1741 }
1742 default:
1743 // A new address type must have been added. Use encoded payment address string as a fallback until a fast
1744 // path is added.
1745 lookups.Fallbacks[addrStr] = struct{}{}
1746 }
1747 }
1748 for _, outpoint := range outpoints {
1749 lookups.Unspent[*outpoint] = struct{}{}
1750 }
1751 chain := wsc.Server.Cfg.Chain
1752 minBlockHash, e := chainhash.NewHashFromStr(cmd.BeginBlock)
1753 if e != nil {
1754 return nil, DecodeHexError(cmd.BeginBlock)
1755 }
1756 minBlock, e := chain.BlockHeightByHash(minBlockHash)
1757 if e != nil {
1758 return nil, &btcjson.RPCError{
1759 Code: btcjson.ErrRPCBlockNotFound,
1760 Message: "ScriptError getting block: " + e.Error(),
1761 }
1762 }
1763 maxBlock := int32(math.MaxInt32)
1764 if cmd.EndBlock != nil {
1765 maxBlockHash, e := chainhash.NewHashFromStr(*cmd.EndBlock)
1766 if e != nil {
1767 return nil, DecodeHexError(*cmd.EndBlock)
1768 }
1769 maxBlock, e = chain.BlockHeightByHash(maxBlockHash)
1770 if e != nil {
1771 return nil, &btcjson.RPCError{
1772 Code: btcjson.ErrRPCBlockNotFound,
1773 Message: "ScriptError getting block: " + e.Error(),
1774 }
1775 }
1776 }
1777 // lastBlock and lastBlockHash track the previously-rescanned block. They equal nil when no previous blocks have
1778 // been rescanned.
1779 var lastBlock *block.Block
1780 var lastBlockHash *chainhash.Hash
1781 // A ticker is created to wait at least 10 seconds before notifying the websocket client of the current progress
1782 // completed by the rescan.
1783 ticker := time.NewTicker(10 * time.Second)
1784 defer ticker.Stop()
1785 // Instead of fetching all block shas at once, fetch in smaller chunks to ensure large rescans consume a limited
1786 // amount of memory.
1787 fetchRange:
1788 for minBlock < maxBlock {
1789 // Limit the max number of hashes to fetch at once to the maximum number of items allowed in a single inventory.
1790 // This value could be higher since it's not creating inventory messages, but this mirrors the limiting logic
1791 // used in the peer-to-peer protocol.
1792 maxLoopBlock := maxBlock
1793 if maxLoopBlock-minBlock > wire.MaxInvPerMsg {
1794 maxLoopBlock = minBlock + wire.MaxInvPerMsg
1795 }
1796 hashList, e := chain.HeightRange(minBlock, maxLoopBlock)
1797 if e != nil {
1798 E.Ln("error looking up block range:", e)
1799 return nil, &btcjson.RPCError{
1800 Code: btcjson.ErrRPCDatabase,
1801 Message: "Database error: " + e.Error(),
1802 }
1803 }
1804 if len(hashList) == 0 {
1805 // The rescan is finished if no blocks hashes for this range were successfully fetched and a stop block was
1806 // provided.
1807 if maxBlock != math.MaxInt32 {
1808 break
1809 }
1810 // If the rescan is through the current block, set up the client to continue to receive notifications
1811 // regarding all rescanned addresses and the current set of unspent outputs.
1812 //
1813 // This is done safely by temporarily grabbing exclusive access of the block manager. If no more blocks have
1814 // been attached between this pause and the fetch above, then it is safe to register the websocket client
1815 // for continuous notifications if necessary.
1816 //
1817 // Otherwise, continue the fetch loop again to rescan the new blocks (or error due to an irrecoverable
1818 // reorganize).
1819 pauseGuard := wsc.Server.Cfg.SyncMgr.Pause()
1820 best := wsc.Server.Cfg.Chain.BestSnapshot()
1821 curHash := &best.Hash
1822 again := true
1823 if lastBlockHash == nil || *lastBlockHash == *curHash {
1824 again = false
1825 n := wsc.Server.NtfnMgr
1826 n.RegisterSpentRequests(wsc, lookups.UnspentSlice())
1827 n.RegisterTxOutAddressRequests(wsc, cmd.Addresses)
1828 }
1829 close(pauseGuard)
1830 // this err value is nil if it got to here from above at 1577
1831 // if e != nil {
1832 // Errorf(
1833 // "ScriptError fetching best block hash:", e,
1834 // }
1835 // return nil, &json.RPCError{
1836 // Code: json.ErrRPCDatabase,
1837 // Message: "Database error: " + err.ScriptError(),
1838 // }
1839 // }
1840 if again {
1841 continue
1842 }
1843 break
1844 }
1845 loopHashList:
1846 for i := range hashList {
1847 var blk *block.Block
1848 blk, e = chain.BlockByHash(&hashList[i])
1849 if e != nil {
1850 // Only handle reorgs if a block could not be found for the hash.
1851 if dbErr, ok := e.(database.DBError); !ok ||
1852 dbErr.ErrorCode != database.ErrBlockNotFound {
1853 E.Ln("error looking up block:", e)
1854 return nil, &btcjson.RPCError{
1855 Code: btcjson.ErrRPCDatabase,
1856 Message: "Database error: " +
1857 e.Error(),
1858 }
1859 }
1860 // If an absolute max block was specified, don't attempt to handle the reorg.
1861 if maxBlock != math.MaxInt32 {
1862 E.Ln("stopping rescan for reorged block", cmd.EndBlock)
1863 return nil, &ErrRescanReorg
1864 }
1865 // If the lookup for the previously valid block hash failed, there may have been a reorg. Fetch a new
1866 // range of block hashes and verify that the previously processed block (if there was any) still exists
1867 // in the database. If it doesn't, we error.
1868 //
1869 // A goto is used to branch execution back to before the range was evaluated, as it must be
1870 // reevaluated for the new hashList.
1871 minBlock += int32(i)
1872 hashList, e = RecoverFromReorg(
1873 chain,
1874 minBlock, maxBlock, lastBlockHash,
1875 )
1876 if e != nil {
1877 return nil, e
1878 }
1879 if len(hashList) == 0 {
1880 break fetchRange
1881 }
1882 goto loopHashList
1883 }
1884 if i == 0 && lastBlockHash != nil {
1885 // Ensure the new hashList is on the same fork as the last block from the old hashList.
1886 jsonErr := DescendantBlock(lastBlockHash, blk)
1887 if jsonErr != nil {
1888 return nil, jsonErr
1889 }
1890 }
1891 // A select statement is used to stop rescans if the client requesting the rescan has disconnected.
1892 select {
1893 case <-wsc.Quit.Wait():
1894 D.F("stopped rescan at height %v for disconnected client", blk.Height())
1895 return nil, nil
1896 default:
1897 RescanBlock(wsc, &lookups, blk)
1898 lastBlock = blk
1899 lastBlockHash = blk.Hash()
1900 }
1901 // Periodically notify the client of the progress completed. Continue with next block if no progress
1902 // notification is needed yet.
1903 select {
1904 case <-ticker.C: // fallthrough
1905 default:
1906 continue
1907 }
1908 n := btcjson.NewRescanProgressNtfn(
1909 hashList[i].String(),
1910 blk.Height(), blk.WireBlock().Header.Timestamp.Unix(),
1911 )
1912 mn, e := btcjson.MarshalCmd(nil, n)
1913 if e != nil {
1914 E.F(
1915 "failed to marshal rescan progress notification: %v",
1916 e,
1917 )
1918 continue
1919 }
1920 if e = wsc.QueueNotification(mn); e == ErrClientQuit {
1921 // Finished if the client disconnected.
1922 D.F(
1923 "stopped rescan at height %v for disconnected client",
1924 blk.Height(),
1925 )
1926 return nil, nil
1927 }
1928 }
1929 minBlock += int32(len(hashList))
1930 }
1931 // Notify websocket client of the finished rescan. Due to how pod asynchronously queues notifications to not block
1932 // calling code, there is no guarantee that any of the notifications created during rescan (such as rescanprogress,
1933 // recvtx and redeemingtx) will be received before the rescan RPC returns.
1934 //
1935 // Therefore, another method is needed to safely inform clients that all rescan notifications have been sent.
1936 n := btcjson.NewRescanFinishedNtfn(
1937 lastBlockHash.String(),
1938 lastBlock.Height(),
1939 lastBlock.WireBlock().Header.Timestamp.Unix(),
1940 )
1941 if mn, e := btcjson.MarshalCmd(nil, n); E.Chk(e) {
1942 E.F(
1943 "failed to marshal rescan finished notification: %v", e,
1944 )
1945 } else {
1946 // The rescan is finished, so we don't care whether the client has disconnected at this point, so discard error.
1947 _ = wsc.QueueNotification(mn)
1948 }
1949 I.Ln("finished rescan")
1950 return nil, nil
1951 }
1952 1953 // HandleRescanBlocks implements the rescanblocks command extension for websocket connections.
1954 //
1955 // NOTE: This extension is ported from github.com/decred/dcrd
1956 func HandleRescanBlocks(wsc *WSClient, icmd interface{}) (interface{}, error) {
1957 cmd, ok := icmd.(*btcjson.RescanBlocksCmd)
1958 if !ok {
1959 return nil, btcjson.ErrRPCInternal
1960 }
1961 // Load client's transaction filter. Must exist in order to continue.
1962 wsc.Lock()
1963 filter := wsc.FilterData
1964 wsc.Unlock()
1965 if filter == nil {
1966 return nil, &btcjson.RPCError{
1967 Code: btcjson.ErrRPCMisc,
1968 Message: "Transaction filter must be loaded before rescanning",
1969 }
1970 }
1971 blockHashes := make([]*chainhash.Hash, len(cmd.BlockHashes))
1972 for i := range cmd.BlockHashes {
1973 hash, e := chainhash.NewHashFromStr(cmd.BlockHashes[i])
1974 if e != nil {
1975 return nil, e
1976 }
1977 blockHashes[i] = hash
1978 }
1979 discoveredData := make([]btcjson.RescannedBlock, 0, len(blockHashes))
1980 // Iterate over each block in the request and rescan. When a block contains relevant transactions, add it to the
1981 // response.
1982 bc := wsc.Server.Cfg.Chain
1983 params := wsc.Server.Cfg.ChainParams
1984 var lastBlockHash *chainhash.Hash
1985 for i := range blockHashes {
1986 block, e := bc.BlockByHash(blockHashes[i])
1987 if e != nil {
1988 return nil, &btcjson.RPCError{
1989 Code: btcjson.ErrRPCBlockNotFound,
1990 Message: "Failed to fetch block: " + e.Error(),
1991 }
1992 }
1993 if lastBlockHash != nil && block.WireBlock().Header.
1994 PrevBlock != *lastBlockHash {
1995 return nil, &btcjson.RPCError{
1996 Code: btcjson.ErrRPCInvalidParameter,
1997 Message: fmt.Sprintf(
1998 "Block %v is not a child of %v",
1999 blockHashes[i], lastBlockHash,
2000 ),
2001 }
2002 }
2003 lastBlockHash = blockHashes[i]
2004 transactions := RescanBlockFilter(filter, block, params)
2005 if len(transactions) != 0 {
2006 discoveredData = append(
2007 discoveredData, btcjson.RescannedBlock{
2008 Hash: cmd.BlockHashes[i],
2009 Transactions: transactions,
2010 },
2011 )
2012 }
2013 }
2014 return &discoveredData, nil
2015 }
2016 2017 // HandleSession implements the session command extension for websocket connections.
2018 func HandleSession(wsc *WSClient, icmd interface{}) (interface{}, error) {
2019 return &btcjson.SessionResult{SessionID: wsc.SessionID}, nil
2020 }
2021 2022 // HandleStopNotifyBlocks implements the stopnotifyblocks command extension for websocket connections.
2023 func HandleStopNotifyBlocks(wsc *WSClient, icmd interface{}) (
2024 interface{},
2025 error,
2026 ) {
2027 wsc.Server.NtfnMgr.UnregisterBlockUpdates(wsc)
2028 return nil, nil
2029 }
2030 2031 // HandleStopNotifyNewTransactions implements the stopnotifynewtransactions command extension for websocket connections.
2032 func HandleStopNotifyNewTransactions(
2033 wsc *WSClient, icmd interface{},
2034 ) (interface{}, error) {
2035 wsc.Server.NtfnMgr.UnregisterNewMempoolTxsUpdates(wsc)
2036 return nil, nil
2037 }
2038 2039 // HandleStopNotifyReceived implements the stopnotifyreceived command extension for websocket connections.
2040 func HandleStopNotifyReceived(wsc *WSClient, icmd interface{}) (
2041 interface{},
2042 error,
2043 ) {
2044 cmd, ok := icmd.(*btcjson.StopNotifyReceivedCmd)
2045 if !ok {
2046 return nil, btcjson.ErrRPCInternal
2047 }
2048 // Decode addresses to validate input, but the strings slice is used directly if these are all ok.
2049 e := CheckAddressValidity(cmd.Addresses, wsc.Server.Cfg.ChainParams)
2050 if e != nil {
2051 return nil, e
2052 }
2053 for _, addr := range cmd.Addresses {
2054 wsc.Server.NtfnMgr.UnregisterTxOutAddressRequest(wsc, addr)
2055 }
2056 return nil, nil
2057 }
2058 2059 // HandleStopNotifySpent implements the stopnotifyspent command extension for websocket connections.
2060 func HandleStopNotifySpent(wsc *WSClient, icmd interface{}) (
2061 interface{}, error,
2062 ) {
2063 cmd, ok := icmd.(*btcjson.StopNotifySpentCmd)
2064 if !ok {
2065 return nil, btcjson.ErrRPCInternal
2066 }
2067 outpoints, e := DeserializeOutpoints(cmd.OutPoints)
2068 if e != nil {
2069 return nil, e
2070 }
2071 for _, outpoint := range outpoints {
2072 wsc.Server.NtfnMgr.UnregisterSpentRequest(wsc, outpoint)
2073 }
2074 return nil, nil
2075 }
2076 2077 // HandleWebsocketHelp implements the help command for websocket connections.
2078 func HandleWebsocketHelp(wsc *WSClient, icmd interface{}) (interface{}, error) {
2079 cmd, ok := icmd.(*btcjson.HelpCmd)
2080 if !ok {
2081 return nil, btcjson.ErrRPCInternal
2082 }
2083 // Provide a usage overview of all commands when no specific command was specified.
2084 var command string
2085 if cmd.Command != nil {
2086 command = *cmd.Command
2087 }
2088 if command == "" {
2089 usage, e := wsc.Server.HelpCacher.RPCUsage(true)
2090 if e != nil {
2091 context := "Failed to generate RPC usage"
2092 return nil, InternalRPCError(e.Error(), context)
2093 }
2094 return usage, nil
2095 }
2096 // Chk that the command asked for is supported and implemented. Search the list of websocket handlers as well as
2097 // the main list of handlers since help should only be provided for those cases.
2098 valid := true
2099 if _, ok := RPCHandlers[command]; !ok {
2100 if _, ok := WSHandlers[command]; !ok {
2101 valid = false
2102 }
2103 }
2104 if !valid {
2105 return nil, &btcjson.RPCError{
2106 Code: btcjson.ErrRPCInvalidParameter,
2107 Message: "Unknown command: " + command,
2108 }
2109 }
2110 // Get the help for the command.
2111 help, e := wsc.Server.HelpCacher.RPCMethodHelp(command)
2112 if e != nil {
2113 context := "Failed to generate help"
2114 return nil, InternalRPCError(e.Error(), context)
2115 }
2116 return help, nil
2117 }
2118 2119 func init() {
2120 2121 WSHandlers = WSHandlersBeforeInit
2122 }
2123 func MakeSemaphore(n int) Semaphore {
2124 return Semaphore(qu.Ts(n))
2125 }
2126 2127 // NewRedeemingTxNotification returns a new marshalled redeemingtx notification with the passed parameters.
2128 func NewRedeemingTxNotification(
2129 txHex string, index int,
2130 block *block.Block,
2131 ) ([]byte, error) {
2132 // Create and marshal the notification.
2133 ntfn := btcjson.NewRedeemingTxNtfn(txHex, BlockDetails(block, index))
2134 return btcjson.MarshalCmd(nil, ntfn)
2135 }
2136 2137 // NewWSClientFilter creates a new, empty wsClientFilter struct to be used for a websocket client.
2138 //
2139 // NOTE: This extension was ported from github.com/decred/ dcrd
2140 func NewWSClientFilter(
2141 addresses []string, unspentOutPoints []wire.OutPoint,
2142 params *chaincfg.Params,
2143 ) *WSClientFilter {
2144 filter := &WSClientFilter{
2145 PubKeyHashes: map[[ripemd160.Size]byte]struct{}{},
2146 ScriptHashes: map[[ripemd160.Size]byte]struct{}{},
2147 CompressedPubKeys: map[[33]byte]struct{}{},
2148 UncompressedPubKeys: map[[65]byte]struct{}{},
2149 OtherAddresses: map[string]struct{}{},
2150 Unspent: make(
2151 map[wire.OutPoint]struct{},
2152 len(unspentOutPoints),
2153 ),
2154 }
2155 for _, s := range addresses {
2156 filter.AddAddressStr(s, params)
2157 }
2158 for i := range unspentOutPoints {
2159 filter.AddUnspentOutPoint(&unspentOutPoints[i])
2160 }
2161 return filter
2162 }
2163 2164 // NewWebsocketClient returns a new websocket client given the notification manager, websocket connection, remote
2165 // address, and whether or not the client has already been authenticated (via HTTP Basic access authentication). The
2166 // returned client is ready to start.
2167 //
2168 // Once started, the client will process incoming and outgoing messages in separate goroutines complete with queuing and
2169 // asynchrous handling for long-running operations.
2170 func NewWebsocketClient(
2171 server *Server, conn *websocket.Conn,
2172 remoteAddr string, authenticated bool, isAdmin bool,
2173 ) (*WSClient, error) {
2174 sessionID, e := wire.RandomUint64()
2175 if e != nil {
2176 return nil, e
2177 }
2178 client := &WSClient{
2179 Conn: conn,
2180 Addr: remoteAddr,
2181 Authenticated: authenticated,
2182 IsAdmin: isAdmin,
2183 SessionID: sessionID,
2184 Server: server,
2185 AddrRequests: make(map[string]struct{}),
2186 SpentRequests: make(map[wire.OutPoint]struct{}),
2187 ServiceRequestSem: MakeSemaphore(server.Config.RPCMaxConcurrentReqs.V()),
2188 NtfnChan: make(chan []byte, 1), // nonblocking sync
2189 SendChan: make(chan WSResponse, WebsocketSendBufferSize),
2190 Quit: qu.T(),
2191 }
2192 return client, nil
2193 }
2194 2195 // NewWSNotificationManager returns a new notification manager ready for use. See wsNotificationManager for more
2196 // details.
2197 func NewWSNotificationManager(server *Server) *WSNtfnMgr {
2198 return &WSNtfnMgr{
2199 Server: server,
2200 QueueNotification: make(chan interface{}),
2201 NotificationMsgs: make(chan interface{}),
2202 NumClients: make(chan int),
2203 Quit: qu.T(),
2204 }
2205 }
2206 2207 // QueueHandler manages a queue of empty interfaces, reading from in and sending the oldest unsent to out.
2208 //
2209 // This handler stops when either of the in or quit channels are closed, and closes out before returning, without
2210 // waiting to send any variables still remaining in the queue.
2211 func QueueHandler(in <-chan interface{}, out chan<- interface{}, quit qu.C) {
2212 var q []interface{}
2213 var dequeue chan<- interface{}
2214 skipQueue := out
2215 var next interface{}
2216 out:
2217 for {
2218 select {
2219 case n, ok := <-in:
2220 if !ok {
2221 // Sender closed input channel.
2222 break out
2223 }
2224 // Either send to out immediately if skipQueue is non-nil (queue is empty) and reader is ready, or append to
2225 // the queue and send later.
2226 select {
2227 case skipQueue <- n:
2228 default:
2229 q = append(q, n)
2230 dequeue = out
2231 skipQueue = nil
2232 next = q[0]
2233 }
2234 case dequeue <- next:
2235 copy(q, q[1:])
2236 q[len(q)-1] = nil // avoid leak
2237 q = q[:len(q)-1]
2238 if len(q) == 0 {
2239 dequeue = nil
2240 skipQueue = out
2241 } else {
2242 next = q[0]
2243 }
2244 case <-quit.Wait():
2245 break out
2246 }
2247 }
2248 close(out)
2249 }
2250 2251 // RecoverFromReorg attempts to recover from a detected reorganize during a rescan.
2252 //
2253 // It fetches a new range of block shas from the database and verifies that the new range of blocks is on the same fork
2254 // as a previous range of blocks.
2255 //
2256 // If this condition does not hold true, the JSON-RPC error for an unrecoverable reorganize is returned.
2257 func RecoverFromReorg(
2258 chain *blockchain.BlockChain, minBlock, maxBlock int32,
2259 lastBlock *chainhash.Hash,
2260 ) ([]chainhash.Hash, error) {
2261 hashList, e := chain.HeightRange(minBlock, maxBlock)
2262 if e != nil {
2263 E.Ln("error looking up block range:", e)
2264 return nil, &btcjson.RPCError{
2265 Code: btcjson.ErrRPCDatabase,
2266 Message: "Database error: " + e.Error(),
2267 }
2268 }
2269 if lastBlock == nil || len(hashList) == 0 {
2270 return hashList, nil
2271 }
2272 blk, e := chain.BlockByHash(&hashList[0])
2273 if e != nil {
2274 E.Ln("error looking up possibly reorged block:", e)
2275 return nil, &btcjson.RPCError{
2276 Code: btcjson.ErrRPCDatabase,
2277 Message: "Database error: " + e.Error(),
2278 }
2279 }
2280 jsonErr := DescendantBlock(lastBlock, blk)
2281 if jsonErr != nil {
2282 return nil, jsonErr
2283 }
2284 return hashList, nil
2285 }
2286 2287 // RescanBlock rescans all transactions in a single block. This is a helper function for handleRescan.
2288 func RescanBlock(wsc *WSClient, lookups *RescanKeys, blk *block.Block) {
2289 for _, tx := range blk.Transactions() {
2290 // Hexadecimal representation of this tx. Only created if needed, and reused for later notifications if already
2291 // made.
2292 var txHex string
2293 // All inputs and outputs must be iterated through to correctly modify the unspent map, however, just a single
2294 // notification for any matching transaction inputs or outputs should be created and sent.
2295 spentNotified := false
2296 recvNotified := false
2297 for _, txin := range tx.MsgTx().TxIn {
2298 if _, ok := lookups.Unspent[txin.PreviousOutPoint]; ok {
2299 delete(lookups.Unspent, txin.PreviousOutPoint)
2300 if spentNotified {
2301 continue
2302 }
2303 if txHex == "" {
2304 txHex = TxHexString(tx.MsgTx())
2305 }
2306 marshalledJSON, e := NewRedeemingTxNotification(
2307 txHex,
2308 tx.Index(), blk,
2309 )
2310 if e != nil {
2311 E.Ln(
2312 "failed to marshal redeemingtx notification:",
2313 e,
2314 )
2315 continue
2316 }
2317 e = wsc.QueueNotification(marshalledJSON)
2318 // Stop the rescan early if the websocket client disconnected.
2319 if e == ErrClientQuit {
2320 return
2321 }
2322 spentNotified = true
2323 }
2324 }
2325 for txOutIdx, txout := range tx.MsgTx().TxOut {
2326 _, addrs, _, _ := txscript.ExtractPkScriptAddrs(
2327 txout.PkScript, wsc.Server.Cfg.ChainParams,
2328 )
2329 for _, addr := range addrs {
2330 switch a := addr.(type) {
2331 case *btcaddr.PubKeyHash:
2332 if _, ok := lookups.PubKeyHashes[*a.Hash160()]; !ok {
2333 continue
2334 }
2335 case *btcaddr.ScriptHash:
2336 if _, ok := lookups.ScriptHashes[*a.Hash160()]; !ok {
2337 continue
2338 }
2339 case *btcaddr.PubKey:
2340 found := false
2341 switch sa := a.ScriptAddress(); len(sa) {
2342 case 33: // Compressed
2343 var key [33]byte
2344 copy(key[:], sa)
2345 if _, ok := lookups.CompressedPubKeys[key]; ok {
2346 found = true
2347 }
2348 case 65: // Uncompressed
2349 var key [65]byte
2350 copy(key[:], sa)
2351 if _, ok := lookups.UncompressedPubKeys[key]; ok {
2352 found = true
2353 }
2354 default:
2355 W.F("skipping rescanned pubkey of unknown serialized length", len(sa))
2356 continue
2357 }
2358 // If the transaction output pays to the pubkey of a rescanned P2PKH address, include it as well.
2359 if !found {
2360 pkh := a.PubKeyHash()
2361 if _, ok := lookups.PubKeyHashes[*pkh.Hash160()]; !ok {
2362 continue
2363 }
2364 }
2365 default:
2366 // A new address type must have been added. Encode as a payment address string and check the
2367 // fallback map.
2368 addrStr := addr.EncodeAddress()
2369 _, ok := lookups.Fallbacks[addrStr]
2370 if !ok {
2371 continue
2372 }
2373 }
2374 outpoint := wire.OutPoint{
2375 Hash: *tx.Hash(),
2376 Index: uint32(txOutIdx),
2377 }
2378 lookups.Unspent[outpoint] = struct{}{}
2379 if recvNotified {
2380 continue
2381 }
2382 if txHex == "" {
2383 txHex = TxHexString(tx.MsgTx())
2384 }
2385 ntfn := btcjson.NewRecvTxNtfn(
2386 txHex,
2387 BlockDetails(blk, tx.Index()),
2388 )
2389 marshalledJSON, e := btcjson.MarshalCmd(nil, ntfn)
2390 if e != nil {
2391 E.Ln("failed to marshal recvtx notification:", e)
2392 return
2393 }
2394 e = wsc.QueueNotification(marshalledJSON)
2395 // Stop the rescan early if the websocket client disconnected.
2396 if e == ErrClientQuit {
2397 return
2398 }
2399 recvNotified = true
2400 }
2401 }
2402 }
2403 }
2404 2405 // RescanBlockFilter rescans a block for any relevant transactions for the passed lookup keys. Any discovered
2406 // transactions are returned hex encoded as a string slice.
2407 //
2408 // NOTE: This extension is ported from github.com/decred/dcrd
2409 func RescanBlockFilter(
2410 filter *WSClientFilter, block *block.Block,
2411 params *chaincfg.Params,
2412 ) []string {
2413 var transactions []string
2414 filter.mu.Lock()
2415 for _, tx := range block.Transactions() {
2416 msgTx := tx.MsgTx()
2417 // Keep track of whether the transaction has already been added to the result. It shouldn't be added twice.
2418 added := false
2419 // Scan inputs if not a coinbase transaction.
2420 if !blockchain.IsCoinBaseTx(msgTx) {
2421 for _, input := range msgTx.TxIn {
2422 if !filter.ExistsUnspentOutPoint(&input.PreviousOutPoint) {
2423 continue
2424 }
2425 if !added {
2426 transactions = append(
2427 transactions,
2428 TxHexString(msgTx),
2429 )
2430 added = true
2431 }
2432 }
2433 }
2434 var e error
2435 // Scan outputs.
2436 for i, output := range msgTx.TxOut {
2437 var addrs []btcaddr.Address
2438 _, addrs, _, e = txscript.ExtractPkScriptAddrs(
2439 output.PkScript, params,
2440 )
2441 if e != nil {
2442 continue
2443 }
2444 for _, a := range addrs {
2445 if !filter.ExistsAddress(a) {
2446 continue
2447 }
2448 op := wire.OutPoint{
2449 Hash: *tx.Hash(),
2450 Index: uint32(i),
2451 }
2452 filter.AddUnspentOutPoint(&op)
2453 if !added {
2454 transactions = append(
2455 transactions,
2456 TxHexString(msgTx),
2457 )
2458 added = true
2459 }
2460 }
2461 }
2462 }
2463 filter.mu.Unlock()
2464 return transactions
2465 }
2466 2467 // TxHexString returns the serialized transaction encoded in hexadecimal.
2468 func TxHexString(tx *wire.MsgTx) string {
2469 buf := bytes.NewBuffer(make([]byte, 0, tx.SerializeSize()))
2470 // Ignore Serialize's error, as writing to a bytes.buffer cannot fail.
2471 e := tx.Serialize(buf)
2472 if e != nil {
2473 }
2474 return hex.EncodeToString(buf.Bytes())
2475 }
2476