rpcws.go raw

   1  package chainrpc
   2  
   3  import (
   4  	"bytes"
   5  	"container/list"
   6  	"crypto/sha256"
   7  	"crypto/subtle"
   8  	"encoding/base64"
   9  	"encoding/hex"
  10  	"encoding/json"
  11  	"errors"
  12  	"fmt"
  13  	"io"
  14  	"math"
  15  	"sync"
  16  	"time"
  17  
  18  	amount2 "github.com/p9c/p9/pkg/amt"
  19  	"github.com/p9c/p9/pkg/block"
  20  	"github.com/p9c/p9/pkg/btcaddr"
  21  	"github.com/p9c/p9/pkg/chaincfg"
  22  
  23  	"github.com/p9c/p9/pkg/qu"
  24  
  25  	"github.com/btcsuite/websocket"
  26  	"golang.org/x/crypto/ripemd160"
  27  
  28  	"github.com/p9c/p9/pkg/blockchain"
  29  	"github.com/p9c/p9/pkg/btcjson"
  30  	"github.com/p9c/p9/pkg/chainhash"
  31  	"github.com/p9c/p9/pkg/database"
  32  	"github.com/p9c/p9/pkg/txscript"
  33  	"github.com/p9c/p9/pkg/util"
  34  	"github.com/p9c/p9/pkg/wire"
  35  )
  36  
  37  // Notification types
  38  
  39  type NotificationBlockConnected block.Block
  40  type NotificationBlockDisconnected block.Block
  41  type NotificationRegisterAddr struct {
  42  	WSC   *WSClient
  43  	Addrs []string
  44  }
  45  type NotificationRegisterBlocks WSClient
  46  
  47  // Notification control requests
  48  
  49  type NotificationRegisterClient WSClient
  50  type NotificationRegisterNewMempoolTxs WSClient
  51  type NotificationRegisterSpent struct {
  52  	WSC *WSClient
  53  	OPs []*wire.OutPoint
  54  }
  55  type NotificationTxAcceptedByMempool struct {
  56  	IsNew bool
  57  	Tx    *util.Tx
  58  }
  59  type NotificationUnregisterAddr struct {
  60  	WSC  *WSClient
  61  	Addr string
  62  }
  63  type NotificationUnregisterBlocks WSClient
  64  type NotificationUnregisterClient WSClient
  65  type NotificationUnregisterNewMempoolTxs WSClient
  66  type NotificationUnregisterSpent struct {
  67  	WSC *WSClient
  68  	OP  *wire.OutPoint
  69  }
  70  type RescanKeys struct {
  71  	Fallbacks           map[string]struct{}
  72  	PubKeyHashes        map[[ripemd160.Size]byte]struct{}
  73  	ScriptHashes        map[[ripemd160.Size]byte]struct{}
  74  	CompressedPubKeys   map[[33]byte]struct{}
  75  	UncompressedPubKeys map[[65]byte]struct{}
  76  	Unspent             map[wire.OutPoint]struct{}
  77  }
  78  type Semaphore qu.C
  79  
  80  // WSClient provides an abstraction for handling a websocket client. The overall data flow is split into 3 main
  81  // goroutines, a possible 4th goroutine for long-running operations (only started if request is made), and a websocket
  82  // manager which is used to allow things such as broadcasting requested notifications to all connected websocket
  83  // clients.
  84  //
  85  // Inbound messages are read via the inHandler goroutine and generally dispatched to their own handler. However, certain
  86  // potentially long-running operations such as rescans, are sent to the asyncHander goroutine and are limited to one at
  87  // a time.
  88  //
  89  // There are two outbound message types - one for responding to client requests and another for async notifications.
  90  // Responses to client requests use SendMessage which employs a buffered channel thereby limiting the number of
  91  // outstanding requests that can be made.
  92  //
  93  // Notifications are sent via QueueNotification which implements a queue via notificationQueueHandler to ensure sending
  94  // notifications from other subsystems can't block.
  95  //
  96  // Ultimately, all messages are sent via the outHandler.
  97  type WSClient struct {
  98  	// ws client requires a mutex lock for access
  99  	sync.Mutex
 100  	// Server is the RPC Server that is servicing the client.
 101  	Server *Server
 102  	// Conn is the underlying websocket connection.
 103  	Conn *websocket.Conn
 104  	// Addr is the remote address of the client.
 105  	Addr string
 106  	// SessionID is a random ID generated for each client when connected. These IDs may be queried by a client using the
 107  	// session RPC. A change to the session ID indicates that the client reconnected.
 108  	SessionID    uint64
 109  	AddrRequests map[string]struct{}
 110  	// SpentRequests is a set of unspent Outpoints a wallet has requested notifications for when they are spent by a
 111  	// processed transaction. Owned by the notification manager.
 112  	SpentRequests map[wire.OutPoint]struct{}
 113  	// FilterData is the new generation transaction filter backported from github.com/decred/dcrd for the new backported
 114  	// `loadtxfilter` and `rescanblocks` methods.
 115  	FilterData *WSClientFilter
 116  	// Networking infrastructure.
 117  	ServiceRequestSem Semaphore
 118  	NtfnChan          chan []byte
 119  	SendChan          chan WSResponse
 120  	Quit              qu.C
 121  	WG                sync.WaitGroup
 122  	// Disconnected indicated whether or not the websocket client is Disconnected.
 123  	Disconnected bool
 124  	// Authenticated specifies whether a client has been Authenticated and therefore is allowed to communicated over the
 125  	// websocket.
 126  	Authenticated bool
 127  	// IsAdmin specifies whether a client may change the state of the Server; false means its access is only to the
 128  	// limited set of RPC calls.
 129  	IsAdmin bool
 130  	// VerboseTxUpdates specifies whether a client has requested verbose information about all new transactions.
 131  	VerboseTxUpdates bool
 132  	// AddrRequests is a set of addresses the caller has requested to be notified about. It is maintained here so all
 133  	// requests can be removed when a wallet disconnects. Owned by the notification manager.
 134  }
 135  
 136  // WSClientFilter tracks relevant addresses for each websocket client for the `rescanblocks` extension. It is modified
 137  // by the `loadtxfilter` command. NOTE: This extension was ported from github.com/decred/dcrd
 138  type WSClientFilter struct {
 139  	mu sync.Mutex
 140  	// Implemented fast paths for address lookup.
 141  	PubKeyHashes        map[[ripemd160.Size]byte]struct{}
 142  	ScriptHashes        map[[ripemd160.Size]byte]struct{}
 143  	CompressedPubKeys   map[[33]byte]struct{}
 144  	UncompressedPubKeys map[[65]byte]struct{}
 145  	// A fallback address lookup map in case a fast path doesn't exist. Only exists for completeness. If using this
 146  	// shows up in a profile, there's a good chance a fast path should be added.
 147  	OtherAddresses map[string]struct{}
 148  	// Outpoints of Unspent outputs.
 149  	Unspent map[wire.OutPoint]struct{}
 150  }
 151  
 152  // WSCommandHandler describes a callback function used to handle a specific command.
 153  type WSCommandHandler func(*WSClient, interface{}) (interface{}, error)
 154  
 155  // WSNtfnMgr is a connection and notification manager used for websockets. It allows websocket clients to register for
 156  // notifications they are interested in.
 157  //
 158  // When an event happens elsewhere in the code such as transactions being added to the memory pool or block
 159  // connects/disconnects, the notification manager is provided with the relevant details needed to figure out which
 160  // websocket clients need to be notified based on what they have registered for and notifies them accordingly.
 161  //
 162  // It is also used to keep track of all connected websocket clients.
 163  type WSNtfnMgr struct {
 164  	// Server is the RPC Server the notification manager is associated with.
 165  	Server *Server
 166  	// QueueNotification queues a notification for handling.
 167  	QueueNotification chan interface{}
 168  	// NotificationMsgs feeds notificationHandler with notifications and client (un)registration requests from a queue
 169  	// as well as registration and unregistering requests from clients.
 170  	NotificationMsgs chan interface{}
 171  	// Access channel for current number of connected clients.
 172  	NumClients chan int
 173  	// Shutdown handling
 174  	WG   sync.WaitGroup
 175  	Quit qu.C
 176  }
 177  
 178  // WSResponse houses a message to send to a connected websocket client as well as a channel to reply on when the message
 179  // is sent.
 180  type WSResponse struct {
 181  	Msg      []byte
 182  	DoneChan chan bool
 183  }
 184  
 185  const (
 186  	// WebsocketSendBufferSize is the number of elements the send channel can queue before blocking. Note that this only
 187  	// applies to requests handled directly in the websocket client input handler or the async handler since
 188  	// notifications have their own queuing mechanism independent of the send channel buffer.
 189  	WebsocketSendBufferSize = 50
 190  )
 191  
 192  // ErrClientQuit describes the error where a client send is not processed due to the client having already been
 193  // disconnected or dropped.
 194  var ErrClientQuit = errors.New("client quit")
 195  
 196  // ErrRescanReorg defines the error that is returned when an unrecoverable reorganize is detected during a rescan.
 197  var ErrRescanReorg = btcjson.RPCError{
 198  	Code:    btcjson.ErrRPCDatabase,
 199  	Message: "Reorganize",
 200  }
 201  
 202  // TimeZeroVal is simply the zero value for a time.Time and is used to avoid creating multiple instances.
 203  var TimeZeroVal = time.Time{}
 204  
 205  // WSHandlers maps RPC command strings to appropriate websocket handler functions. This is set by init because help
 206  // references WSHandlers and thus causes a dependency loop.
 207  var WSHandlers map[string]WSCommandHandler
 208  
 209  var WSHandlersBeforeInit = map[string]WSCommandHandler{
 210  	"loadtxfilter":              HandleLoadTxFilter,
 211  	"help":                      HandleWebsocketHelp,
 212  	"notifyblocks":              HandleNotifyBlocks,
 213  	"notifynewtransactions":     HandleNotifyNewTransactions,
 214  	"notifyreceived":            HandleNotifyReceived,
 215  	"notifyspent":               HandleNotifySpent,
 216  	"session":                   HandleSession,
 217  	"stopnotifyblocks":          HandleStopNotifyBlocks,
 218  	"stopnotifynewtransactions": HandleStopNotifyNewTransactions,
 219  	"stopnotifyspent":           HandleStopNotifySpent,
 220  	"stopnotifyreceived":        HandleStopNotifyReceived,
 221  	"rescan":                    HandleRescan,
 222  	"rescanblocks":              HandleRescanBlocks,
 223  }
 224  
 225  // UnspentSlice returns a slice of currently-unspent outpoints for the rescan lookup keys. This is primarily intended to
 226  // be used to register outpoints for continuous notifications after a rescan has completed.
 227  func (r *RescanKeys) UnspentSlice() []*wire.OutPoint {
 228  	ops := make([]*wire.OutPoint, 0, len(r.Unspent))
 229  	for op := range r.Unspent {
 230  		opCopy := op
 231  		ops = append(ops, &opCopy)
 232  	}
 233  	return ops
 234  }
 235  
 236  // WebsocketHandler handles a new websocket client by creating a new wsClient, starting it, and blocking until the
 237  // connection closes. Since it blocks, it must be run in a separate goroutine.
 238  //
 239  // It should be invoked from the websocket Server handler which runs each new connection in a new goroutine thereby
 240  // satisfying the requirement.
 241  func (s *Server) WebsocketHandler(
 242  	conn *websocket.Conn, remoteAddr string,
 243  	authenticated bool, isAdmin bool,
 244  ) {
 245  	// Clear the read deadline that was set before the websocket hijacked the connection.
 246  	e := conn.SetReadDeadline(TimeZeroVal)
 247  	if e != nil {
 248  		D.Ln(e)
 249  	}
 250  	// Limit max number of websocket clients.
 251  	T.Ln("new websocket client", remoteAddr)
 252  	if s.NtfnMgr.GetNumClients()+1 > s.Config.RPCMaxWebsockets.V() {
 253  		I.F(
 254  			"max websocket clients exceeded [%d] - disconnecting client"+
 255  				" %s",
 256  			s.Config.RPCMaxWebsockets,
 257  			remoteAddr,
 258  		)
 259  		if e = conn.Close(); E.Chk(e) {
 260  		}
 261  		return
 262  	}
 263  	// Create a new websocket client to handle the new websocket connection and wait for it to shutdown.
 264  	//
 265  	// Once it has shutdown (and hence disconnected), remove it and any notifications it registered for.
 266  	client, e := NewWebsocketClient(s, conn, remoteAddr, authenticated, isAdmin)
 267  	if e != nil {
 268  		E.F("failed to serve client %s: %v %s", remoteAddr, e)
 269  		if e := conn.Close(); E.Chk(e) {
 270  		}
 271  		return
 272  	}
 273  	s.NtfnMgr.AddClient(client)
 274  	client.Start()
 275  	client.WaitForShutdown()
 276  	s.NtfnMgr.RemoveClient(client)
 277  	T.Ln("disconnected websocket client", remoteAddr)
 278  }
 279  
 280  // Disconnect disconnects the websocket client.
 281  func (c *WSClient) Disconnect() {
 282  	c.Lock()
 283  	defer c.Unlock()
 284  	// Nothing to do if already disconnected.
 285  	if c.Disconnected {
 286  		return
 287  	}
 288  	T.Ln("disconnecting websocket client", c.Addr)
 289  	c.Quit.Q()
 290  	if e := c.Conn.Close(); E.Chk(e) {
 291  	}
 292  	c.Disconnected = true
 293  }
 294  
 295  // IsDisconnected returns whether or not the websocket client is disconnected.
 296  func (c *WSClient) IsDisconnected() bool {
 297  	c.Lock()
 298  	isDisconnected := c.Disconnected
 299  	c.Unlock()
 300  	return isDisconnected
 301  }
 302  
 303  // QueueNotification queues the passed notification to be sent to the websocket client.
 304  //
 305  // This function, as the name implies, is only intended for notifications since it has additional logic to prevent other
 306  // subsystems, such as the memory pool and block manager, from blocking even when the send channel is full.
 307  //
 308  // If the client is in the process of shutting down, this function returns ErrClientQuit. This is intended to be checked
 309  // by long-running notification handlers to stop processing if there is no more work needed to be done.
 310  func (c *WSClient) QueueNotification(marshalledJSON []byte) (e error) {
 311  	// Don't queue the message if disconnected.
 312  	if c.IsDisconnected() {
 313  		return ErrClientQuit
 314  	}
 315  	c.NtfnChan <- marshalledJSON
 316  	return nil
 317  }
 318  
 319  // SendMessage sends the passed json to the websocket client. It is backed by a buffered channel, so it will not block
 320  // until the send channel is full.
 321  //
 322  // Note however that QueueNotification must be used for sending async notifications instead of the this function. This
 323  // approach allows a limit to the number of outstanding requests a client can make without preventing or blocking on
 324  // async notifications.
 325  func (c *WSClient) SendMessage(marshalledJSON []byte, doneChan chan bool) {
 326  	// Don't send the message if disconnected.
 327  	if c.IsDisconnected() {
 328  		if doneChan != nil {
 329  			doneChan <- false
 330  		}
 331  		return
 332  	}
 333  	c.SendChan <- WSResponse{Msg: marshalledJSON, DoneChan: doneChan}
 334  }
 335  
 336  // Start begins processing input and output messages.
 337  func (c *WSClient) Start() {
 338  	T.Ln("starting websocket client", c.Addr)
 339  	// Start processing input and output.
 340  	c.WG.Add(3)
 341  	go c.InHandler()
 342  	go c.NotificationQueueHandler()
 343  	go c.OutHandler()
 344  }
 345  
 346  // WaitForShutdown blocks until the websocket client goroutines are stopped and the connection is closed.
 347  func (c *WSClient) WaitForShutdown() {
 348  	c.WG.Wait()
 349  }
 350  
 351  // InHandler handles all incoming messages for the websocket connection. It must be run as a goroutine.
 352  func (c *WSClient) InHandler() {
 353  out:
 354  	for {
 355  		// Break out of the loop once the quit channel has been closed. Use a non-blocking select here so we fall
 356  		// through otherwise.
 357  		select {
 358  		case <-c.Quit.Wait():
 359  			break out
 360  		default:
 361  		}
 362  		_, msg, e := c.Conn.ReadMessage()
 363  		if e != nil {
 364  			// Log the error if it's not due to disconnecting.
 365  			if e != io.EOF && e != io.ErrUnexpectedEOF {
 366  				T.F(
 367  					"websocket receive error from %s: %v",
 368  					c.Addr, e,
 369  				)
 370  			}
 371  			break out
 372  		}
 373  		var request btcjson.Request
 374  		e = json.Unmarshal(msg, &request)
 375  		if e != nil {
 376  			if !c.Authenticated {
 377  				break out
 378  			}
 379  			jsonErr := &btcjson.RPCError{
 380  				Code:    btcjson.ErrRPCParse.Code,
 381  				Message: "Failed to parse request: " + e.Error(),
 382  			}
 383  			reply, e := CreateMarshalledReply(nil, nil, jsonErr)
 384  			if e != nil {
 385  				E.Ln(
 386  					"failed to marshal parse failure reply:", e,
 387  				)
 388  				continue
 389  			}
 390  			c.SendMessage(reply, nil)
 391  			continue
 392  		}
 393  		// The JSON-RPC 1.0 spec defines that notifications must have their "id" set to null and states that
 394  		// notifications do not have a response.
 395  		//
 396  		// A JSON-RPC 2.0 notification is a request with "json-rpc":"2.0", and without an "id" member. The specification
 397  		// states that notifications must not be responded to. JSON-RPC 2.0 permits the null value as a valid request
 398  		// id, therefore such requests are not notifications.
 399  		//
 400  		// Bitcoin Core serves requests with "id":null or even an absent "id", and responds to such requests with
 401  		// "id":null in the response.
 402  		//
 403  		// Pod does not respond to any request without and "id" or "id":null, regardless the indicated JSON-RPC protocol
 404  		// version unless RPC quirks are enabled. With RPC quirks enabled, such requests will be responded to if the
 405  		// reqeust does not indicate JSON-RPC version.
 406  		//
 407  		// RPC quirks can be enabled by the user to avoid compatibility issues with software relying on Core's behavior.
 408  		if request.ID == nil && !(c.Server.Config.RPCQuirks.True() && request.Jsonrpc == "") {
 409  			if !c.Authenticated {
 410  				break out
 411  			}
 412  			continue
 413  		}
 414  		cmd := ParseCmd(&request)
 415  		if cmd.Err != nil {
 416  			if !c.Authenticated {
 417  				break out
 418  			}
 419  			reply, e := CreateMarshalledReply(cmd.ID, nil, cmd.Err)
 420  			if e != nil {
 421  				E.F("failed to marshal parse failure reply:", e)
 422  				continue
 423  			}
 424  			c.SendMessage(reply, nil)
 425  			continue
 426  		}
 427  		T.F("received command <%s> from %s", cmd.Method, c.Addr)
 428  		// T.Ln(interrupt.GoroutineDump())
 429  		// Chk auth. The client is immediately disconnected if the first request of an unauthenticated websocket
 430  		// client is not the authenticate request, an authenticate request is received when the client is already
 431  		// authenticated, or incorrect authentication credentials are provided in the request.
 432  		switch authCmd, ok := cmd.Cmd.(*btcjson.AuthenticateCmd); {
 433  		case c.Authenticated && ok:
 434  			W.F(
 435  				"websocket client %s is already authenticated", c.Addr,
 436  			)
 437  			break out
 438  		case !c.Authenticated && !ok:
 439  			W.F("unauthenticated websocket message received")
 440  			break out
 441  		case !c.Authenticated:
 442  			// Chk credentials.
 443  			login := authCmd.Username + ":" + authCmd.Passphrase
 444  			auth := "Basic " + base64.StdEncoding.EncodeToString([]byte(login))
 445  			authSha := sha256.Sum256([]byte(auth))
 446  			cmp := subtle.ConstantTimeCompare(authSha[:], c.Server.AuthSHA[:])
 447  			limitcmp := subtle.ConstantTimeCompare(authSha[:], c.Server.LimitAuthSHA[:])
 448  			if cmp != 1 && limitcmp != 1 {
 449  				W.Ln("authentication failure from", c.Addr)
 450  				break out
 451  			}
 452  			c.Authenticated = true
 453  			c.IsAdmin = cmp == 1
 454  			// Marshal and send response.
 455  			reply, e := CreateMarshalledReply(cmd.ID, nil, nil)
 456  			if e != nil {
 457  				E.Ln("failed to marshal authenticate reply:", e)
 458  				continue
 459  			}
 460  			c.SendMessage(reply, nil)
 461  			continue
 462  		}
 463  		// Chk if the client is using limited RPC credentials and error when not authorized to call this RPC.
 464  		if !c.IsAdmin {
 465  			if _, ok := RPCLimited[request.Method]; !ok {
 466  				jsonErr := &btcjson.RPCError{
 467  					Code:    btcjson.ErrRPCInvalidParams.Code,
 468  					Message: "limited user not authorized for this method",
 469  				}
 470  				// Marshal and send response.
 471  				reply, e := CreateMarshalledReply(request.ID, nil, jsonErr)
 472  				if e != nil {
 473  					E.Ln("failed to marshal parse failure reply:", e)
 474  					continue
 475  				}
 476  				c.SendMessage(reply, nil)
 477  				continue
 478  			}
 479  		}
 480  		// Asynchronously handle the request. A semaphore is used to limit the number of concurrent requests currently
 481  		// being serviced. If the semaphore can not be acquired, simply wait until a request finished before reading the
 482  		// next RPC request from the websocket client.
 483  		//
 484  		// This could be a little fancier by timing out and failing when it takes too long to service the request, but
 485  		// if that is done, the read of the next request should not be blocked by this semaphore, otherwise the next
 486  		// request will be read and will probably sit here for another few seconds before timing out as well. This will
 487  		// cause the total timeout duration for later requests to be much longer than the check here would imply.
 488  		//
 489  		// If a timeout is added, the semaphore acquiring should be moved inside of the new goroutine with a select
 490  		// statement that also reads a time.After channel. This will unblock the read of the next request from the
 491  		// websocket client and allow many requests to be waited on concurrently.
 492  		c.ServiceRequestSem.Acquire()
 493  		go func() {
 494  			c.ServiceRequest(cmd)
 495  			c.ServiceRequestSem.Release()
 496  		}()
 497  	}
 498  	// Ensure the connection is closed.
 499  	c.Disconnect()
 500  	c.WG.Done()
 501  	T.Ln("websocket client input handler done for", c.Addr)
 502  }
 503  
 504  // NotificationQueueHandler handles the queuing of outgoing notifications for the websocket client. This runs as a muxer
 505  // for various sources of input to ensure that queuing up notifications to be sent will not block. Otherwise, slow
 506  // clients could bog down the other systems (such as the mempool or block manager) which are queuing the data. The data
 507  // is passed on to outHandler to actually be written. It must be run as a goroutine.
 508  func (c *WSClient) NotificationQueueHandler() {
 509  	ntfnSentChan := make(chan bool, 1) // nonblocking sync
 510  	// pendingNtfns is used as a queue for notifications that are ready to be sent once there are no outstanding
 511  	// notifications currently being sent.
 512  	//
 513  	// The waiting flag is used over simply checking for items in the pending list to ensure cleanup knows what has and
 514  	// hasn't been sent to the outHandler.
 515  	//
 516  	// Currently no special cleanup is needed, however if something like a done channel is added to notifications in the
 517  	// future, not knowing what has and hasn't been sent to the outHandler (and thus who should respond to the done
 518  	// channel) would be problematic without using this approach.
 519  	pendingNtfns := list.New()
 520  	waiting := false
 521  out:
 522  	for {
 523  		select {
 524  		// This channel is notified when a message is being queued to be sent across the network socket.
 525  		//
 526  		// It will either send the message immediately if a send is not already in progress, or queue the message to
 527  		// be sent once the other pending messages are sent.
 528  		case msg := <-c.NtfnChan:
 529  			if !waiting {
 530  				c.SendMessage(msg, ntfnSentChan)
 531  			} else {
 532  				pendingNtfns.PushBack(msg)
 533  			}
 534  			waiting = true
 535  			// This channel is notified when a notification has been sent across the
 536  			// network socket.
 537  		case <-ntfnSentChan:
 538  			// No longer waiting if there are no more messages in the pending
 539  			// messages queue.
 540  			next := pendingNtfns.Front()
 541  			if next == nil {
 542  				waiting = false
 543  				continue
 544  			}
 545  			// Notify the outHandler about the next item to asynchronously send.
 546  			msg := pendingNtfns.Remove(next).([]byte)
 547  			c.SendMessage(msg, ntfnSentChan)
 548  		case <-c.Quit.Wait():
 549  			break out
 550  		}
 551  	}
 552  	// Drain any wait channels before exiting so nothing is left waiting around to send.
 553  cleanup:
 554  	for {
 555  		select {
 556  		case <-c.NtfnChan:
 557  		case <-ntfnSentChan:
 558  		default:
 559  			break cleanup
 560  		}
 561  	}
 562  	c.WG.Done()
 563  	T.Ln("websocket client notification queue handler done for", c.Addr)
 564  }
 565  
 566  // OutHandler handles all outgoing messages for the websocket connection. It must be run as a goroutine.
 567  //
 568  // It uses a buffered channel to serialize output messages while allowing the sender to continue running asynchronously.
 569  //
 570  // It must be run as a goroutine.
 571  func (c *WSClient) OutHandler() {
 572  out:
 573  	for {
 574  		// Send any messages ready for send until the quit channel is closed.
 575  		select {
 576  		case r := <-c.SendChan:
 577  			e := c.Conn.WriteMessage(websocket.TextMessage, r.Msg)
 578  			if e != nil {
 579  				c.Disconnect()
 580  				break out
 581  			}
 582  			if r.DoneChan != nil {
 583  				r.DoneChan <- true
 584  			}
 585  		case <-c.Quit.Wait():
 586  			break out
 587  		}
 588  	}
 589  	// Drain any wait channels before exiting so nothing is left waiting around to send.
 590  cleanup:
 591  	for {
 592  		select {
 593  		case r := <-c.SendChan:
 594  			if r.DoneChan != nil {
 595  				r.DoneChan <- false
 596  			}
 597  		default:
 598  			break cleanup
 599  		}
 600  	}
 601  	c.WG.Done()
 602  	T.Ln("websocket client output handler done for", c.Addr)
 603  }
 604  
 605  // ServiceRequest services a parsed RPC request by looking up and executing the appropriate RPC handler. The response is
 606  // marshalled and sent to the websocket client.
 607  func (c *WSClient) ServiceRequest(r *ParsedRPCCmd) {
 608  	var (
 609  		result interface{}
 610  		e      error
 611  	)
 612  	// Lookup the websocket extension for the command and if it doesn't exist fallback to handling the command as a
 613  	// standard command.
 614  	wsHandler, ok := WSHandlers[r.Method]
 615  	if ok {
 616  		result, e = wsHandler(c, r.Cmd)
 617  	} else {
 618  		result, e = c.Server.StandardCmdResult(r, nil)
 619  	}
 620  	reply, e := CreateMarshalledReply(r.ID, result, e)
 621  	if e != nil {
 622  		E.F("failed to marshal reply for <%s> command: %v", r.Method, e)
 623  		return
 624  	}
 625  	c.SendMessage(reply, nil)
 626  }
 627  
 628  // AddAddress adds an address to a wsClientFilter, treating it correctly based on the type of address passed as an
 629  // argument. NOTE: This extension was ported from github.com/decred/dcrd
 630  func (f *WSClientFilter) AddAddress(a btcaddr.Address) {
 631  	switch a := a.(type) {
 632  	case *btcaddr.PubKeyHash:
 633  		f.PubKeyHashes[*a.Hash160()] = struct{}{}
 634  		return
 635  	case *btcaddr.ScriptHash:
 636  		f.ScriptHashes[*a.Hash160()] = struct{}{}
 637  		return
 638  	case *btcaddr.PubKey:
 639  		serializedPubKey := a.ScriptAddress()
 640  		switch len(serializedPubKey) {
 641  		case 33: // compressed
 642  			var compressedPubKey [33]byte
 643  			copy(compressedPubKey[:], serializedPubKey)
 644  			f.CompressedPubKeys[compressedPubKey] = struct{}{}
 645  			return
 646  		case 65: // uncompressed
 647  			var uncompressedPubKey [65]byte
 648  			copy(uncompressedPubKey[:], serializedPubKey)
 649  			f.UncompressedPubKeys[uncompressedPubKey] = struct{}{}
 650  			return
 651  		}
 652  	}
 653  	f.OtherAddresses[a.EncodeAddress()] = struct{}{}
 654  }
 655  
 656  // AddAddressStr parses an address from a string and then adds it to the wsClientFilter using addAddress.
 657  //
 658  // NOTE: This extension was ported from github.com/decred/dcrd
 659  func (f *WSClientFilter) AddAddressStr(s string, params *chaincfg.Params) {
 660  	// If address can't be decoded, no point in saving it since it should also impossible to create the address from an
 661  	// inspected transaction output script.
 662  	a, e := btcaddr.Decode(s, params)
 663  	if e != nil {
 664  		return
 665  	}
 666  	f.AddAddress(a)
 667  }
 668  
 669  // AddUnspentOutPoint adds an outpoint to the wsClientFilter.
 670  //
 671  // NOTE: This extension was ported from github.com/decred/dcrd
 672  func (f *WSClientFilter) AddUnspentOutPoint(op *wire.OutPoint) {
 673  	f.Unspent[*op] = struct{}{}
 674  }
 675  
 676  // ExistsAddress returns true if the passed address has been added to the wsClientFilter. NOTE: This extension was
 677  // ported from github.com/decred/dcrd
 678  func (f *WSClientFilter) ExistsAddress(a btcaddr.Address) bool {
 679  	switch a := a.(type) {
 680  	case *btcaddr.PubKeyHash:
 681  		_, ok := f.PubKeyHashes[*a.Hash160()]
 682  		return ok
 683  	case *btcaddr.ScriptHash:
 684  		_, ok := f.ScriptHashes[*a.Hash160()]
 685  		return ok
 686  	case *btcaddr.PubKey:
 687  		serializedPubKey := a.ScriptAddress()
 688  		switch len(serializedPubKey) {
 689  		case 33: // compressed
 690  			var compressedPubKey [33]byte
 691  			copy(compressedPubKey[:], serializedPubKey)
 692  			_, ok := f.CompressedPubKeys[compressedPubKey]
 693  			if !ok {
 694  				_, ok = f.PubKeyHashes[*a.PubKeyHash().Hash160()]
 695  			}
 696  			return ok
 697  		case 65: // uncompressed
 698  			var uncompressedPubKey [65]byte
 699  			copy(uncompressedPubKey[:], serializedPubKey)
 700  			_, ok := f.UncompressedPubKeys[uncompressedPubKey]
 701  			if !ok {
 702  				_, ok = f.PubKeyHashes[*a.PubKeyHash().Hash160()]
 703  			}
 704  			return ok
 705  		}
 706  	}
 707  	_, ok := f.OtherAddresses[a.EncodeAddress()]
 708  	return ok
 709  }
 710  
 711  // ExistsUnspentOutPoint returns true if the passed outpoint has been added to the wsClientFilter.
 712  //
 713  // NOTE: This extension was ported from github.com/decred/dcrd
 714  func (f *WSClientFilter) ExistsUnspentOutPoint(op *wire.OutPoint) bool {
 715  	_, ok := f.Unspent[*op]
 716  	return ok
 717  }
 718  
 719  // // removeAddress removes the passed address, if it exists, from the
 720  // wsClientFilter. NOTE: This extension was ported from github.com/decred/dcrd
 721  // func (f *wsClientFilter) removeAddress(a util.Address) {
 722  // 	switch a := a.(type) {
 723  // 	case *util.PubKeyHash:
 724  // 		delete(f.pubKeyHashes, *a.Hash160())
 725  // 		return
 726  // 	case *util.ScriptHash:
 727  // 		delete(f.scriptHashes, *a.Hash160())
 728  // 		return
 729  // 	case *util.PubKey:
 730  // 		serializedPubKey := a.ScriptAddress()
 731  // 		switch len(serializedPubKey) {
 732  // 		case 33: // compressed
 733  // 			var compressedPubKey [33]byte
 734  // 			copy(compressedPubKey[:], serializedPubKey)
 735  // 			delete(f.compressedPubKeys, compressedPubKey)
 736  // 			return
 737  // 		case 65: // uncompressed
 738  // 			var uncompressedPubKey [65]byte
 739  // 			copy(uncompressedPubKey[:], serializedPubKey)
 740  // 			delete(f.uncompressedPubKeys, uncompressedPubKey)
 741  // 			return
 742  // 		}
 743  // 	}
 744  // 	delete(f.otherAddresses, a.EncodeAddress())
 745  // }
 746  // // removeAddressStr parses an address from a string and then removes it from
 747  // // the wsClientFilter using removeAddress. NOTE: This extension was ported
 748  // // from github.com/decred/dcrd
 749  // func (f *wsClientFilter) removeAddressStr(s string, netparams *chaincfg.Params) {
 750  // 	a, e := util.Decode(s, netparams)
 751  // 	if e ==  nil {
 752  // 		f.removeAddress(a)
 753  // 	} else {
 754  // 		delete(f.otherAddresses, s)
 755  // 	}
 756  // }
 757  // // removeUnspentOutPoint removes the passed outpoint, if it exists, from the
 758  // wsClientFilter. NOTE: This extension was ported from github.com/decred/dcrd
 759  // func (f *wsClientFilter) removeUnspentOutPoint(op *wire.OutPoint) {
 760  // 	delete(f.unspent, *op)
 761  // }
 762  
 763  // AddClient adds the passed websocket client to the notification manager.
 764  func (m *WSNtfnMgr) AddClient(wsc *WSClient) {
 765  	m.QueueNotification <- (*NotificationRegisterClient)(wsc)
 766  }
 767  
 768  // SendNotifyBlockConnected passes a block newly-connected to the best chain to the notification manager for block and
 769  // transaction notification processing.
 770  func (m *WSNtfnMgr) SendNotifyBlockConnected(block *block.Block) {
 771  	// As NotifyBlockConnected will be called by the block manager and the RPC Server may no longer be running, use a
 772  	// select statement to unblock enqueuing the notification once the RPC Server has begun shutting down.
 773  	select {
 774  	case m.QueueNotification <- (*NotificationBlockConnected)(block):
 775  	case <-m.Quit.Wait():
 776  	}
 777  }
 778  
 779  // SendNotifyBlockDisconnected passes a block disconnected from the best chain to the notification manager for block
 780  // notification processing.
 781  func (m *WSNtfnMgr) SendNotifyBlockDisconnected(block *block.Block) {
 782  	// As NotifyBlockDisconnected will be called by the block manager and the RPC Server may no longer be running, use a
 783  	// select statement to unblock enqueuing the notification once the RPC Server has begun shutting down.
 784  	select {
 785  	case m.QueueNotification <- (*NotificationBlockDisconnected)(block):
 786  	case <-m.Quit.Wait():
 787  	}
 788  }
 789  
 790  // SendNotifyMempoolTx passes a transaction accepted by mempool to the notification manager for transaction notification
 791  // processing. If isNew is true, the tx is is a new transaction, rather than one added to the mempool during a reorg.
 792  func (m *WSNtfnMgr) SendNotifyMempoolTx(tx *util.Tx, isNew bool) {
 793  	n := &NotificationTxAcceptedByMempool{
 794  		IsNew: isNew,
 795  		Tx:    tx,
 796  	}
 797  	// As NotifyMempoolTx will be called by mempool and the RPC Server may no longer be running, use a select statement
 798  	// to unblock enqueuing the notification once the RPC Server has begun shutting down.
 799  	select {
 800  	case m.QueueNotification <- n:
 801  	case <-m.Quit.Wait():
 802  	}
 803  }
 804  
 805  // GetNumClients returns the number of clients actively being served.
 806  func (m *WSNtfnMgr) GetNumClients() (n int) {
 807  	select {
 808  	case n = <-m.NumClients:
 809  	case <-m.Quit.Wait(): // Use default n (0) if Server has shut down.
 810  	}
 811  	return
 812  }
 813  
 814  // RegisterBlockUpdates requests block update notifications to the passed websocket client.
 815  func (m *WSNtfnMgr) RegisterBlockUpdates(wsc *WSClient) {
 816  	m.QueueNotification <- (*NotificationRegisterBlocks)(wsc)
 817  }
 818  
 819  // RegisterNewMempoolTxsUpdates requests notifications to the passed websocket client when new transactions are added to
 820  // the memory pool.
 821  func (m *WSNtfnMgr) RegisterNewMempoolTxsUpdates(wsc *WSClient) {
 822  	m.QueueNotification <- (*NotificationRegisterNewMempoolTxs)(wsc)
 823  }
 824  
 825  // RegisterSpentRequests requests a notification when each of the passed outpoints is confirmed spent (contained in a
 826  // block connected to the main chain) for the passed websocket client. The request is automatically removed once the
 827  // notification has been sent.
 828  func (m *WSNtfnMgr) RegisterSpentRequests(wsc *WSClient, ops []*wire.OutPoint) {
 829  	m.QueueNotification <- &NotificationRegisterSpent{
 830  		WSC: wsc,
 831  		OPs: ops,
 832  	}
 833  }
 834  
 835  // RegisterTxOutAddressRequests requests notifications to the passed websocket client when a transaction output spends
 836  // to the passed address.
 837  func (m *WSNtfnMgr) RegisterTxOutAddressRequests(
 838  	wsc *WSClient, addrs []string,
 839  ) {
 840  	m.QueueNotification <- &NotificationRegisterAddr{
 841  		WSC:   wsc,
 842  		Addrs: addrs,
 843  	}
 844  }
 845  
 846  // RemoveClient removes the passed websocket client and all notifications registered for it.
 847  func (m *WSNtfnMgr) RemoveClient(wsc *WSClient) {
 848  	select {
 849  	case m.QueueNotification <- (*NotificationUnregisterClient)(wsc):
 850  	case <-m.Quit.Wait():
 851  	}
 852  }
 853  
 854  // Shutdown shuts down the manager, stopping the notification queue and notification handler goroutines.
 855  func (m *WSNtfnMgr) Shutdown() {
 856  	m.Quit.Q()
 857  }
 858  
 859  // Start starts the goroutines required for the manager to queue and process websocket client notifications.
 860  func (m *WSNtfnMgr) Start() {
 861  	go m.QueueHandler()
 862  	go m.NotificationHandler()
 863  }
 864  
 865  // UnregisterBlockUpdates removes block update notifications for the passed websocket client.
 866  func (m *WSNtfnMgr) UnregisterBlockUpdates(wsc *WSClient) {
 867  	m.QueueNotification <- (*NotificationUnregisterBlocks)(wsc)
 868  }
 869  
 870  // UnregisterNewMempoolTxsUpdates removes notifications to the passed websocket client when new transaction are added to
 871  // the memory pool.
 872  func (m *WSNtfnMgr) UnregisterNewMempoolTxsUpdates(wsc *WSClient) {
 873  	m.QueueNotification <- (*NotificationUnregisterNewMempoolTxs)(wsc)
 874  }
 875  
 876  // UnregisterSpentRequest removes a request from the passed websocket client to be notified when the passed outpoint is
 877  // confirmed spent (contained in a block connected to the main chain).
 878  func (m *WSNtfnMgr) UnregisterSpentRequest(
 879  	wsc *WSClient,
 880  	op *wire.OutPoint,
 881  ) {
 882  	m.QueueNotification <- &NotificationUnregisterSpent{
 883  		WSC: wsc,
 884  		OP:  op,
 885  	}
 886  }
 887  
 888  // UnregisterTxOutAddressRequest removes a request from the passed websocket client to be notified when a transaction
 889  // spends to the passed address.
 890  func (m *WSNtfnMgr) UnregisterTxOutAddressRequest(wsc *WSClient, addr string) {
 891  	m.QueueNotification <- &NotificationUnregisterAddr{
 892  		WSC:  wsc,
 893  		Addr: addr,
 894  	}
 895  }
 896  
 897  // WaitForShutdown blocks until all notification manager goroutines have finished.
 898  func (m *WSNtfnMgr) WaitForShutdown() {
 899  	m.WG.Wait()
 900  }
 901  
 902  // AddAddrRequests adds the websocket client wsc to the address to client set addrMap so wsc will be notified for any
 903  // mempool or block transaction outputs spending to any of the addresses in addrs.
 904  func (*WSNtfnMgr) AddAddrRequests(
 905  	addrMap map[string]map[qu.C]*WSClient, wsc *WSClient, addrs []string,
 906  ) {
 907  	for _, addr := range addrs {
 908  		// Track the request in the client as well so it can be quickly be removed on disconnect.
 909  		wsc.AddrRequests[addr] = struct{}{}
 910  		// Add the client to the set of clients to notify when the outpoint is seen. Create map as needed.
 911  		cmap, ok := addrMap[addr]
 912  		if !ok {
 913  			cmap = make(map[qu.C]*WSClient)
 914  			addrMap[addr] = cmap
 915  		}
 916  		cmap[wsc.Quit] = wsc
 917  	}
 918  }
 919  
 920  // AddSpentRequests modifies a map of watched outpoints to sets of websocket clients to add a new request watch all of
 921  // the outpoints in ops and create and send a notification when spent to the websocket client wsc.
 922  func (m *WSNtfnMgr) AddSpentRequests(
 923  	opMap map[wire.
 924  OutPoint]map[qu.C]*WSClient, wsc *WSClient, ops []*wire.OutPoint,
 925  ) {
 926  	for _, op := range ops {
 927  		// Track the request in the client as well so it can be quickly be removed on disconnect.
 928  		wsc.SpentRequests[*op] = struct{}{}
 929  		// Add the client to the list to notify when the outpoint is seen. Create the list as needed.
 930  		cmap, ok := opMap[*op]
 931  		if !ok {
 932  			cmap = make(map[qu.C]*WSClient)
 933  			opMap[*op] = cmap
 934  		}
 935  		cmap[wsc.Quit] = wsc
 936  	}
 937  	// Chk if any transactions spending these outputs already exists in the mempool, if so send the notification
 938  	// immediately.
 939  	spends := make(map[chainhash.Hash]*util.Tx)
 940  	for _, op := range ops {
 941  		spend := m.Server.Cfg.TxMemPool.CheckSpend(*op)
 942  		if spend != nil {
 943  			D.F(
 944  				"found existing mempool spend for outpoint<%v>: %v",
 945  				op, spend.Hash(),
 946  			)
 947  			spends[*spend.Hash()] = spend
 948  		}
 949  	}
 950  	for _, spend := range spends {
 951  		m.NotifyForTx(opMap, nil, spend, nil)
 952  	}
 953  }
 954  
 955  // NotificationHandler reads notifications and control messages from the queue handler and processes one at a time.
 956  func (m *WSNtfnMgr) NotificationHandler() {
 957  	// clients is a map of all currently connected websocket clients.
 958  	clients := make(map[qu.C]*WSClient)
 959  	// Maps used to hold lists of websocket clients to be notified on certain events. Each websocket client also keeps
 960  	// maps for the events which have multiple triggers to make removal from these lists on connection close less
 961  	// horrendously.
 962  	//
 963  	// Where possible, the quit channel is used as the unique id for a client since it is quite a bit more efficient
 964  	// than using the entire struct.
 965  	blockNotifications := make(map[qu.C]*WSClient)
 966  	txNotifications := make(map[qu.C]*WSClient)
 967  	watchedOutPoints := make(map[wire.OutPoint]map[qu.C]*WSClient)
 968  	watchedAddrs := make(map[string]map[qu.C]*WSClient)
 969  out:
 970  	for {
 971  		select {
 972  		case n, ok := <-m.NotificationMsgs:
 973  			if !ok {
 974  				// queueHandler quit.
 975  				break out
 976  			}
 977  			switch n := n.(type) {
 978  			case *NotificationBlockConnected:
 979  				block := (*block.Block)(n)
 980  				// Skip iterating through all txs if no tx notification requests exist.
 981  				if len(watchedOutPoints) != 0 || len(watchedAddrs) != 0 {
 982  					for _, tx := range block.Transactions() {
 983  						m.NotifyForTx(
 984  							watchedOutPoints,
 985  							watchedAddrs, tx, block,
 986  						)
 987  					}
 988  				}
 989  				if len(blockNotifications) != 0 {
 990  					m.NotifyBlockConnected(
 991  						blockNotifications,
 992  						block,
 993  					)
 994  					m.NotifyFilteredBlockConnected(
 995  						blockNotifications,
 996  						block,
 997  					)
 998  				}
 999  			case *NotificationBlockDisconnected:
1000  				block := (*block.Block)(n)
1001  				if len(blockNotifications) != 0 {
1002  					m.NotifyBlockDisconnected(
1003  						blockNotifications,
1004  						block,
1005  					)
1006  					m.NotifyFilteredBlockDisconnected(
1007  						blockNotifications,
1008  						block,
1009  					)
1010  				}
1011  			case *NotificationTxAcceptedByMempool:
1012  				if n.IsNew && len(txNotifications) != 0 {
1013  					m.NotifyForNewTx(txNotifications, n.Tx)
1014  				}
1015  				m.NotifyForTx(watchedOutPoints, watchedAddrs, n.Tx, nil)
1016  				m.NotifyRelevantTxAccepted(n.Tx, clients)
1017  			case *NotificationRegisterBlocks:
1018  				wsc := (*WSClient)(n)
1019  				blockNotifications[wsc.Quit] = wsc
1020  			case *NotificationUnregisterBlocks:
1021  				wsc := (*WSClient)(n)
1022  				delete(blockNotifications, wsc.Quit)
1023  			case *NotificationRegisterClient:
1024  				wsc := (*WSClient)(n)
1025  				clients[wsc.Quit] = wsc
1026  			case *NotificationUnregisterClient:
1027  				wsc := (*WSClient)(n)
1028  				// Remove any requests made by the client as well as the client itself.
1029  				delete(blockNotifications, wsc.Quit)
1030  				delete(txNotifications, wsc.Quit)
1031  				for k := range wsc.SpentRequests {
1032  					op := k
1033  					m.RemoveSpentRequest(watchedOutPoints, wsc, &op)
1034  				}
1035  				for addr := range wsc.AddrRequests {
1036  					m.RemoveAddrRequest(watchedAddrs, wsc, addr)
1037  				}
1038  				delete(clients, wsc.Quit)
1039  			case *NotificationRegisterSpent:
1040  				m.AddSpentRequests(watchedOutPoints, n.WSC, n.OPs)
1041  			case *NotificationUnregisterSpent:
1042  				m.RemoveSpentRequest(watchedOutPoints, n.WSC, n.OP)
1043  			case *NotificationRegisterAddr:
1044  				m.AddAddrRequests(watchedAddrs, n.WSC, n.Addrs)
1045  			case *NotificationUnregisterAddr:
1046  				m.RemoveAddrRequest(watchedAddrs, n.WSC, n.Addr)
1047  			case *NotificationRegisterNewMempoolTxs:
1048  				wsc := (*WSClient)(n)
1049  				txNotifications[wsc.Quit] = wsc
1050  			case *NotificationUnregisterNewMempoolTxs:
1051  				wsc := (*WSClient)(n)
1052  				delete(txNotifications, wsc.Quit)
1053  			default:
1054  				W.Ln("unhandled notification type")
1055  			}
1056  		case m.NumClients <- len(clients):
1057  		case <-m.Quit.Wait():
1058  			// RPC Server shutting down.
1059  			break out
1060  		}
1061  	}
1062  	for _, c := range clients {
1063  		c.Disconnect()
1064  	}
1065  	m.WG.Done()
1066  }
1067  
1068  // NotifyBlockConnected notifies websocket clients that have registered for block updates when a block is connected to
1069  // the main chain.
1070  func (*WSNtfnMgr) NotifyBlockConnected(
1071  	clients map[qu.C]*WSClient, block *block.Block,
1072  ) {
1073  	// Notify interested websocket clients about the connected block.
1074  	ntfn := btcjson.NewBlockConnectedNtfn(
1075  		block.Hash().String(), block.Height(),
1076  		block.WireBlock().Header.Timestamp.Unix(),
1077  	)
1078  	marshalledJSON, e := btcjson.MarshalCmd(nil, ntfn)
1079  	if e != nil {
1080  		E.Ln("failed to marshal block connected notification:", e)
1081  		return
1082  	}
1083  	for _, wsc := range clients {
1084  		e := wsc.QueueNotification(marshalledJSON)
1085  		if e != nil {
1086  		}
1087  	}
1088  }
1089  
1090  // NotifyBlockDisconnected notifies websocket clients that have registered for block updates when a block is
1091  // disconnected from the main chain (due to a reorganize).
1092  func (*WSNtfnMgr) NotifyBlockDisconnected(
1093  	clients map[qu.C]*WSClient, block *block.Block,
1094  ) {
1095  	// Skip notification creation if no clients have requested block connected/ disconnected notifications.
1096  	if len(clients) == 0 {
1097  		return
1098  	}
1099  	// Notify interested websocket clients about the disconnected block.
1100  	ntfn := btcjson.NewBlockDisconnectedNtfn(
1101  		block.Hash().String(),
1102  		block.Height(), block.WireBlock().Header.Timestamp.Unix(),
1103  	)
1104  	marshalledJSON, e := btcjson.MarshalCmd(nil, ntfn)
1105  	if e != nil {
1106  		E.Ln(
1107  			"failed to marshal block disconnected notification:",
1108  			e,
1109  		)
1110  		return
1111  	}
1112  	for _, wsc := range clients {
1113  		e := wsc.QueueNotification(marshalledJSON)
1114  		if e != nil {
1115  		}
1116  	}
1117  }
1118  
1119  // NotifyFilteredBlockConnected notifies websocket clients that have registered for block updates when a block is
1120  // connected to the main chain.
1121  func (m *WSNtfnMgr) NotifyFilteredBlockConnected(
1122  	clients map[qu.C]*WSClient, block *block.Block,
1123  ) {
1124  	// Create the common portion of the notification that is the same for every client.
1125  	var w bytes.Buffer
1126  	e := block.WireBlock().Header.Serialize(&w)
1127  	if e != nil {
1128  		E.Ln(
1129  			"failed to serialize header for filtered block connected"+
1130  				" notification:", e,
1131  		)
1132  		return
1133  	}
1134  	ntfn := btcjson.NewFilteredBlockConnectedNtfn(
1135  		block.Height(),
1136  		hex.EncodeToString(w.Bytes()), nil,
1137  	)
1138  	// Search for relevant transactions for each client and save them serialized in hex encoding for the notification.
1139  	subscribedTxs := make(map[qu.C][]string)
1140  	for _, tx := range block.Transactions() {
1141  		var txHex string
1142  		for quitChan := range m.GetSubscribedClients(tx, clients) {
1143  			if txHex == "" {
1144  				txHex = TxHexString(tx.MsgTx())
1145  			}
1146  			subscribedTxs[quitChan] = append(subscribedTxs[quitChan], txHex)
1147  		}
1148  	}
1149  	for quitChan, wsc := range clients {
1150  		// Add all discovered transactions for this client. For clients that have no new-style filter, add the empty
1151  		// string slice.
1152  		ntfn.SubscribedTxs = subscribedTxs[quitChan]
1153  		// Marshal and queue notification.
1154  		marshalledJSON, e := btcjson.MarshalCmd(nil, ntfn)
1155  		if e != nil {
1156  			E.F("failed to marshal filtered block connected notification:", e)
1157  			return
1158  		}
1159  		e = wsc.QueueNotification(marshalledJSON)
1160  		if e != nil {
1161  			D.Ln(e)
1162  		}
1163  	}
1164  }
1165  
1166  // NotifyFilteredBlockDisconnected notifies websocket clients that have registered for block updates when a block is
1167  // disconnected from the main chain (due to a reorganize).
1168  func (*WSNtfnMgr) NotifyFilteredBlockDisconnected(
1169  	clients map[qu.C]*WSClient, block *block.Block,
1170  ) {
1171  	// Skip notification creation if no clients have requested block connected/ disconnected notifications.
1172  	if len(clients) == 0 {
1173  		return
1174  	}
1175  	// Notify interested websocket clients about the disconnected block.
1176  	var w bytes.Buffer
1177  	e := block.WireBlock().Header.Serialize(&w)
1178  	if e != nil {
1179  		E.Ln("failed to serialize header for filtered block disconnected notification:", e)
1180  		return
1181  	}
1182  	ntfn := btcjson.NewFilteredBlockDisconnectedNtfn(block.Height(), hex.EncodeToString(w.Bytes()))
1183  	marshalledJSON, e := btcjson.MarshalCmd(nil, ntfn)
1184  	if e != nil {
1185  		E.Ln("failed to marshal filtered block disconnected notification:", e)
1186  		return
1187  	}
1188  	for _, wsc := range clients {
1189  		e := wsc.QueueNotification(marshalledJSON)
1190  		if e != nil {
1191  			D.Ln(e)
1192  		}
1193  	}
1194  }
1195  
1196  // NotifyForNewTx notifies websocket clients that have registered for updates when a new transaction is added to the
1197  // memory pool.
1198  func (m *WSNtfnMgr) NotifyForNewTx(
1199  	clients map[qu.C]*WSClient,
1200  	tx *util.Tx,
1201  ) {
1202  	txHashStr := tx.Hash().String()
1203  	mtx := tx.MsgTx()
1204  	var amount int64
1205  	for _, txOut := range mtx.TxOut {
1206  		amount += txOut.Value
1207  	}
1208  	ntfn := btcjson.NewTxAcceptedNtfn(txHashStr, amount2.Amount(amount).ToDUO())
1209  	marshalledJSON, e := btcjson.MarshalCmd(nil, ntfn)
1210  	if e != nil {
1211  		E.Ln("failed to marshal tx notification:", e)
1212  		return
1213  	}
1214  	var verboseNtfn *btcjson.TxAcceptedVerboseNtfn
1215  	var marshalledJSONVerbose []byte
1216  	for _, wsc := range clients {
1217  		if wsc.VerboseTxUpdates {
1218  			net := m.Server.Cfg.ChainParams
1219  			var rawTx *btcjson.TxRawResult
1220  			rawTx, e = CreateTxRawResult(
1221  				net, mtx, txHashStr, nil,
1222  				"", 0, 0,
1223  			)
1224  			if e != nil {
1225  				return
1226  			}
1227  			verboseNtfn = btcjson.NewTxAcceptedVerboseNtfn(*rawTx)
1228  			marshalledJSONVerbose, e = btcjson.MarshalCmd(
1229  				nil,
1230  				verboseNtfn,
1231  			)
1232  			if e != nil {
1233  				E.Ln("failed to marshal verbose tx notification:", e)
1234  			}
1235  			if marshalledJSONVerbose != nil {
1236  				e = wsc.QueueNotification(marshalledJSONVerbose)
1237  				if e != nil {
1238  					D.Ln(e)
1239  				}
1240  				continue
1241  			}
1242  			return
1243  		}
1244  		e = wsc.QueueNotification(marshalledJSONVerbose)
1245  		if e != nil {
1246  		} else {
1247  			e := wsc.QueueNotification(marshalledJSON)
1248  			if e != nil {
1249  			}
1250  		}
1251  	}
1252  }
1253  
1254  // NotifyForTx examines the inputs and outputs of the passed transaction, notifying websocket clients of outputs
1255  // spending to a watched address and inputs spending a watched outpoint.
1256  func (m *WSNtfnMgr) NotifyForTx(
1257  	ops map[wire.OutPoint]map[qu.C]*WSClient,
1258  	addrs map[string]map[qu.C]*WSClient, tx *util.Tx, block *block.Block,
1259  ) {
1260  	if len(ops) != 0 {
1261  		m.NotifyForTxIns(ops, tx, block)
1262  	}
1263  	if len(addrs) != 0 {
1264  		m.NotifyForTxOuts(ops, addrs, tx, block)
1265  	}
1266  }
1267  
1268  // NotifyForTxIns examines the inputs of the passed transaction and sends interested websocket clients a redeemingtx
1269  // notification if any inputs spend a watched output. If block is non-nil, any matching spent requests are removed.
1270  func (m *WSNtfnMgr) NotifyForTxIns(
1271  	ops map[wire.
1272  OutPoint]map[qu.C]*WSClient, tx *util.Tx, block *block.Block,
1273  ) {
1274  	// Nothing to do if nobody is watching outpoints.
1275  	if len(ops) == 0 {
1276  		return
1277  	}
1278  	txHex := ""
1279  	wscNotified := make(map[qu.C]struct{})
1280  	for _, txIn := range tx.MsgTx().TxIn {
1281  		prevOut := &txIn.PreviousOutPoint
1282  		if cmap, ok := ops[*prevOut]; ok {
1283  			if txHex == "" {
1284  				txHex = TxHexString(tx.MsgTx())
1285  			}
1286  			marshalledJSON, e := NewRedeemingTxNotification(txHex, tx.Index(), block)
1287  			if e != nil {
1288  				E.Ln(
1289  					"failed to marshal redeemingtx notification:", e,
1290  				)
1291  				continue
1292  			}
1293  			for wscQuit, wsc := range cmap {
1294  				if block != nil {
1295  					m.RemoveSpentRequest(ops, wsc, prevOut)
1296  				}
1297  				if _, ok := wscNotified[wscQuit]; !ok {
1298  					wscNotified[wscQuit] = struct{}{}
1299  					e := wsc.QueueNotification(marshalledJSON)
1300  					if e != nil {
1301  						D.Ln(e)
1302  					}
1303  				}
1304  			}
1305  		}
1306  	}
1307  }
1308  
1309  // NotifyForTxOuts examines each transaction output, notifying interested websocket clients of the transaction if an
1310  // output spends to a watched address. A spent notification request is automatically registered for the client for each
1311  // matching output.
1312  func (m *WSNtfnMgr) NotifyForTxOuts(
1313  	ops map[wire.OutPoint]map[qu.C]*WSClient,
1314  	addrs map[string]map[qu.C]*WSClient, tx *util.Tx, block *block.Block,
1315  ) {
1316  	// Nothing to do if nobody is listening for address notifications.
1317  	if len(addrs) == 0 {
1318  		return
1319  	}
1320  	txHex := ""
1321  	wscNotified := make(map[qu.C]struct{})
1322  	for i, txOut := range tx.MsgTx().TxOut {
1323  		var txAddrs []btcaddr.Address
1324  		var e error
1325  		_, txAddrs, _, e = txscript.ExtractPkScriptAddrs(
1326  			txOut.PkScript, m.Server.Cfg.ChainParams,
1327  		)
1328  		if e != nil {
1329  			continue
1330  		}
1331  		for _, txAddr := range txAddrs {
1332  			cmap, ok := addrs[txAddr.EncodeAddress()]
1333  			if !ok {
1334  				continue
1335  			}
1336  			if txHex == "" {
1337  				txHex = TxHexString(tx.MsgTx())
1338  			}
1339  			ntfn := btcjson.NewRecvTxNtfn(
1340  				txHex, BlockDetails(
1341  					block,
1342  					tx.Index(),
1343  				),
1344  			)
1345  			marshalledJSON, e := btcjson.MarshalCmd(nil, ntfn)
1346  			if e != nil {
1347  				E.Ln("Failed to marshal processedtx notification:", e)
1348  				continue
1349  			}
1350  			op := []*wire.OutPoint{wire.NewOutPoint(tx.Hash(), uint32(i))}
1351  			for wscQuit, wsc := range cmap {
1352  				m.AddSpentRequests(ops, wsc, op)
1353  				if _, ok := wscNotified[wscQuit]; !ok {
1354  					wscNotified[wscQuit] = struct{}{}
1355  					e := wsc.QueueNotification(marshalledJSON)
1356  					if e != nil {
1357  					}
1358  				}
1359  			}
1360  		}
1361  	}
1362  }
1363  
1364  // NotifyRelevantTxAccepted examines the inputs and outputs of the passed transaction, notifying websocket clients of
1365  // outputs spending to a watched address and inputs spending a watched outpoint.
1366  //
1367  // Any outputs paying to a watched address result in the output being watched as well for future notifications.
1368  func (m *WSNtfnMgr) NotifyRelevantTxAccepted(
1369  	tx *util.Tx, clients map[qu.C]*WSClient,
1370  ) {
1371  	clientsToNotify := m.GetSubscribedClients(tx, clients)
1372  	if len(clientsToNotify) != 0 {
1373  		n := btcjson.NewRelevantTxAcceptedNtfn(TxHexString(tx.MsgTx()))
1374  		marshalled, e := btcjson.MarshalCmd(nil, n)
1375  		if e != nil {
1376  			E.Ln("failed to marshal notification:", e)
1377  			return
1378  		}
1379  		for quitChan := range clientsToNotify {
1380  			e := clients[quitChan].QueueNotification(marshalled)
1381  			if e != nil {
1382  			}
1383  		}
1384  	}
1385  }
1386  
1387  // QueueHandler maintains a queue of notifications and notification handler control messages.
1388  func (m *WSNtfnMgr) QueueHandler() {
1389  	QueueHandler(m.QueueNotification, m.NotificationMsgs, m.Quit)
1390  	m.WG.Done()
1391  }
1392  
1393  // RemoveAddrRequest removes the websocket client wsc from the address to client set addrs so it will no longer receive
1394  // notification updates for any transaction outputs send to addr.
1395  func (*WSNtfnMgr) RemoveAddrRequest(
1396  	addrs map[string]map[qu.C]*WSClient, wsc *WSClient, addr string,
1397  ) {
1398  	// Remove the request tracking from the client.
1399  	delete(wsc.AddrRequests, addr)
1400  	// Remove the client from the list to notify.
1401  	cmap, ok := addrs[addr]
1402  	if !ok {
1403  		W.F(
1404  			"attempt to remove nonexistent addr request <%s> for websocket client %s",
1405  			addr, wsc.Addr,
1406  		)
1407  		return
1408  	}
1409  	delete(cmap, wsc.Quit)
1410  	// Remove the map entry altogether if there are no more clients interested in it.
1411  	if len(cmap) == 0 {
1412  		delete(addrs, addr)
1413  	}
1414  }
1415  
1416  // RemoveSpentRequest modifies a map of watched outpoints to remove the websocket client wsc from the set of clients to
1417  // be notified when a watched outpoint is spent. If wsc is the last client, the outpoint key is removed from the map.
1418  func (*WSNtfnMgr) RemoveSpentRequest(
1419  	ops map[wire.
1420  OutPoint]map[qu.C]*WSClient, wsc *WSClient, op *wire.OutPoint,
1421  ) {
1422  	// Remove the request tracking from the client.
1423  	delete(wsc.SpentRequests, *op)
1424  	// Remove the client from the list to notify.
1425  	notifyMap, ok := ops[*op]
1426  	if !ok {
1427  		W.Ln(
1428  			"attempt to remove nonexistent spent request for"+
1429  				" websocket client", wsc.Addr,
1430  		)
1431  		return
1432  	}
1433  	delete(notifyMap, wsc.Quit)
1434  	// Remove the map entry altogether if there are no more clients interested in it.
1435  	if len(notifyMap) == 0 {
1436  		delete(ops, *op)
1437  	}
1438  }
1439  
1440  // GetSubscribedClients returns the set of all websocket client quit channels that are registered to receive
1441  // notifications regarding tx, either due to tx spending a watched output or outputting to a watched address.
1442  //
1443  // Matching client's filters are updated based on this transaction's outputs and output addresses that may be relevant
1444  // for a client.
1445  func (m *WSNtfnMgr) GetSubscribedClients(
1446  	tx *util.Tx,
1447  	clients map[qu.C]*WSClient,
1448  ) map[qu.C]struct{} {
1449  	// Use a map of client quit channels as keys to prevent duplicates when multiple inputs and/or outputs are relevant
1450  	// to the client.
1451  	subscribed := make(map[qu.C]struct{})
1452  	msgTx := tx.MsgTx()
1453  	for _, input := range msgTx.TxIn {
1454  		for quitChan, wsc := range clients {
1455  			wsc.Lock()
1456  			filter := wsc.FilterData
1457  			wsc.Unlock()
1458  			if filter == nil {
1459  				continue
1460  			}
1461  			filter.mu.Lock()
1462  			if filter.ExistsUnspentOutPoint(&input.PreviousOutPoint) {
1463  				subscribed[quitChan] = struct{}{}
1464  			}
1465  			filter.mu.Unlock()
1466  		}
1467  	}
1468  	var e error
1469  	for i, output := range msgTx.TxOut {
1470  		var addrs []btcaddr.Address
1471  		_, addrs, _, e = txscript.ExtractPkScriptAddrs(
1472  			output.PkScript, m.Server.Cfg.ChainParams,
1473  		)
1474  		if e != nil {
1475  			// Clients are not able to subscribe to nonstandard or non-address outputs.
1476  			continue
1477  		}
1478  		for quitChan, wsc := range clients {
1479  			wsc.Lock()
1480  			filter := wsc.FilterData
1481  			wsc.Unlock()
1482  			if filter == nil {
1483  				continue
1484  			}
1485  			filter.mu.Lock()
1486  			for _, a := range addrs {
1487  				if filter.ExistsAddress(a) {
1488  					subscribed[quitChan] = struct{}{}
1489  					op := wire.OutPoint{
1490  						Hash:  *tx.Hash(),
1491  						Index: uint32(i),
1492  					}
1493  					filter.AddUnspentOutPoint(&op)
1494  				}
1495  			}
1496  			filter.mu.Unlock()
1497  		}
1498  	}
1499  	return subscribed
1500  }
1501  func (s Semaphore) Acquire() {
1502  	s <- struct{}{}
1503  }
1504  func (s Semaphore) Release() {
1505  	<-s
1506  }
1507  
1508  // BlockDetails creates a BlockDetails struct to include in btcws notifications from a block and a transaction's block
1509  // index.
1510  func BlockDetails(block *block.Block, txIndex int) *btcjson.BlockDetails {
1511  	if block == nil {
1512  		return nil
1513  	}
1514  	return &btcjson.BlockDetails{
1515  		Height: block.Height(),
1516  		Hash:   block.Hash().String(),
1517  		Index:  txIndex,
1518  		Time:   block.WireBlock().Header.Timestamp.Unix(),
1519  	}
1520  }
1521  
1522  // CheckAddressValidity checks the validity of each address in the passed string slice. It does this by attempting to
1523  // decode each address using the current active network parameters.
1524  //
1525  // If any single address fails to decode properly, the function returns an error. Otherwise, nil is returned.
1526  func CheckAddressValidity(addrs []string, params *chaincfg.Params) (e error) {
1527  	for _, addr := range addrs {
1528  		_, e := btcaddr.Decode(addr, params)
1529  		if e != nil {
1530  			return &btcjson.RPCError{
1531  				Code: btcjson.ErrRPCInvalidAddressOrKey,
1532  				Message: fmt.Sprintf(
1533  					"Invalid address or key: %v",
1534  					addr,
1535  				),
1536  			}
1537  		}
1538  	}
1539  	return nil
1540  }
1541  
1542  // DescendantBlock returns the appropriate JSON-RPC error if a current block fetched during a reorganize is not a direct
1543  // child of the parent block hash.
1544  func DescendantBlock(
1545  	prevHash *chainhash.Hash, curBlock *block.Block,
1546  ) (e error) {
1547  	curHash := &curBlock.WireBlock().Header.PrevBlock
1548  	if !prevHash.IsEqual(curHash) {
1549  		E.F(
1550  			"stopping rescan for reorged block %v (replaced by block %v)",
1551  			prevHash, curHash,
1552  		)
1553  		return &ErrRescanReorg
1554  	}
1555  	return nil
1556  }
1557  
1558  // DeserializeOutpoints deserializes each serialized outpoint.
1559  func DeserializeOutpoints(serializedOuts []btcjson.OutPoint) (
1560  	[]*wire.OutPoint,
1561  	error,
1562  ) {
1563  	outpoints := make([]*wire.OutPoint, 0, len(serializedOuts))
1564  	for i := range serializedOuts {
1565  		blockHash, e := chainhash.NewHashFromStr(serializedOuts[i].Hash)
1566  		if e != nil {
1567  			return nil, DecodeHexError(serializedOuts[i].Hash)
1568  		}
1569  		index := serializedOuts[i].Index
1570  		outpoints = append(outpoints, wire.NewOutPoint(blockHash, index))
1571  	}
1572  	return outpoints, nil
1573  }
1574  
1575  // HandleLoadTxFilter implements the loadtxfilter command extension for websocket connections. NOTE: This extension is
1576  // ported from github.com/decred/dcrd
1577  func HandleLoadTxFilter(wsc *WSClient, icmd interface{}) (interface{}, error) {
1578  	cmd := icmd.(*btcjson.LoadTxFilterCmd)
1579  	outPoints := make([]wire.OutPoint, len(cmd.OutPoints))
1580  	for i := range cmd.OutPoints {
1581  		hash, e := chainhash.NewHashFromStr(cmd.OutPoints[i].Hash)
1582  		if e != nil {
1583  			return nil, &btcjson.RPCError{
1584  				Code:    btcjson.ErrRPCInvalidParameter,
1585  				Message: e.Error(),
1586  			}
1587  		}
1588  		outPoints[i] = wire.OutPoint{
1589  			Hash:  *hash,
1590  			Index: cmd.OutPoints[i].Index,
1591  		}
1592  	}
1593  	params := wsc.Server.Cfg.ChainParams
1594  	wsc.Lock()
1595  	if cmd.Reload || wsc.FilterData == nil {
1596  		wsc.FilterData = NewWSClientFilter(
1597  			cmd.Addresses, outPoints,
1598  			params,
1599  		)
1600  		wsc.Unlock()
1601  	} else {
1602  		wsc.Unlock()
1603  		wsc.FilterData.mu.Lock()
1604  		for _, a := range cmd.Addresses {
1605  			wsc.FilterData.AddAddressStr(a, params)
1606  		}
1607  		for i := range outPoints {
1608  			wsc.FilterData.AddUnspentOutPoint(&outPoints[i])
1609  		}
1610  		wsc.FilterData.mu.Unlock()
1611  	}
1612  	return nil, nil
1613  }
1614  
1615  // HandleNotifyBlocks implements the notifyblocks command extension for websocket connections.
1616  func HandleNotifyBlocks(wsc *WSClient, icmd interface{}) (interface{}, error) {
1617  	wsc.Server.NtfnMgr.RegisterBlockUpdates(wsc)
1618  	return nil, nil
1619  }
1620  
1621  // HandleNotifyNewTransactions implements the notifynewtransactions command
1622  // extension for websocket connections.
1623  func HandleNotifyNewTransactions(
1624  	wsc *WSClient,
1625  	icmd interface{},
1626  ) (interface{}, error) {
1627  	cmd, ok := icmd.(*btcjson.NotifyNewTransactionsCmd)
1628  	if !ok {
1629  		return nil, btcjson.ErrRPCInternal
1630  	}
1631  	wsc.VerboseTxUpdates = cmd.Verbose != nil && *cmd.Verbose
1632  	wsc.Server.NtfnMgr.RegisterNewMempoolTxsUpdates(wsc)
1633  	return nil, nil
1634  }
1635  
1636  // HandleNotifyReceived implements the notifyreceived command extension for websocket connections.
1637  func HandleNotifyReceived(wsc *WSClient, icmd interface{}) (
1638  	interface{},
1639  	error,
1640  ) {
1641  	cmd, ok := icmd.(*btcjson.NotifyReceivedCmd)
1642  	if !ok {
1643  		return nil, btcjson.ErrRPCInternal
1644  	}
1645  	// Decode addresses to validate input, but the strings slice is used directly if these are all ok.
1646  	e := CheckAddressValidity(cmd.Addresses, wsc.Server.Cfg.ChainParams)
1647  	if e != nil {
1648  		return nil, e
1649  	}
1650  	wsc.Server.NtfnMgr.RegisterTxOutAddressRequests(wsc, cmd.Addresses)
1651  	return nil, nil
1652  }
1653  
1654  // HandleNotifySpent implements the notifyspent command extension for websocket connections.
1655  func HandleNotifySpent(wsc *WSClient, icmd interface{}) (interface{}, error) {
1656  	cmd, ok := icmd.(*btcjson.NotifySpentCmd)
1657  	if !ok {
1658  		return nil, btcjson.ErrRPCInternal
1659  	}
1660  	outpoints, e := DeserializeOutpoints(cmd.OutPoints)
1661  	if e != nil {
1662  		return nil, e
1663  	}
1664  	wsc.Server.NtfnMgr.RegisterSpentRequests(wsc, outpoints)
1665  	return nil, nil
1666  }
1667  
1668  // HandleRescan implements the rescan command extension for websocket connections.
1669  //
1670  // NOTE: This does not smartly handle reorgs, and fixing requires database changes (for safe, concurrent access to full
1671  // block ranges, and support for other chains than the best chain).
1672  //
1673  // It will, however, detect whether a reorg removed a block that was previously processed, and result in the handler
1674  // erroring.
1675  //
1676  // Clients must handle this by finding a block still in the chain (perhaps from a rescanprogress notification) to resume
1677  // their rescan. TODO: simplify, modularise this
1678  func HandleRescan(wsc *WSClient, icmd interface{}) (interface{}, error) {
1679  	cmd, ok := icmd.(*btcjson.RescanCmd)
1680  	if !ok {
1681  		return nil, btcjson.ErrRPCInternal
1682  	}
1683  	outpoints := make([]*wire.OutPoint, 0, len(cmd.OutPoints))
1684  	for i := range cmd.OutPoints {
1685  		cmdOutpoint := &cmd.OutPoints[i]
1686  		blockHash, e := chainhash.NewHashFromStr(cmdOutpoint.Hash)
1687  		if e != nil {
1688  			return nil, DecodeHexError(cmdOutpoint.Hash)
1689  		}
1690  		outpoint := wire.NewOutPoint(blockHash, cmdOutpoint.Index)
1691  		outpoints = append(outpoints, outpoint)
1692  	}
1693  	numAddrs := len(cmd.Addresses)
1694  	if numAddrs == 1 {
1695  		D.Ln("beginning rescan for 1 address")
1696  	} else {
1697  		D.F("beginning rescan for %d addresses", numAddrs)
1698  	}
1699  	// Build lookup maps.
1700  	lookups := RescanKeys{
1701  		Fallbacks:           map[string]struct{}{},
1702  		PubKeyHashes:        map[[ripemd160.Size]byte]struct{}{},
1703  		ScriptHashes:        map[[ripemd160.Size]byte]struct{}{},
1704  		CompressedPubKeys:   map[[33]byte]struct{}{},
1705  		UncompressedPubKeys: map[[65]byte]struct{}{},
1706  		Unspent:             map[wire.OutPoint]struct{}{},
1707  	}
1708  	var compressedPubkey [33]byte
1709  	var uncompressedPubkey [65]byte
1710  	params := wsc.Server.Cfg.ChainParams
1711  	for _, addrStr := range cmd.Addresses {
1712  		addr, e := btcaddr.Decode(addrStr, params)
1713  		if e != nil {
1714  			jsonErr := btcjson.RPCError{
1715  				Code: btcjson.ErrRPCInvalidAddressOrKey,
1716  				Message: "Rescan address " + addrStr + ": " +
1717  					e.Error(),
1718  			}
1719  			return nil, &jsonErr
1720  		}
1721  		switch a := addr.(type) {
1722  		case *btcaddr.PubKeyHash:
1723  			lookups.PubKeyHashes[*a.Hash160()] = struct{}{}
1724  		case *btcaddr.ScriptHash:
1725  			lookups.ScriptHashes[*a.Hash160()] = struct{}{}
1726  		case *btcaddr.PubKey:
1727  			pubkeyBytes := a.ScriptAddress()
1728  			switch len(pubkeyBytes) {
1729  			case 33: // Compressed
1730  				copy(compressedPubkey[:], pubkeyBytes)
1731  				lookups.CompressedPubKeys[compressedPubkey] = struct{}{}
1732  			case 65: // Uncompressed
1733  				copy(uncompressedPubkey[:], pubkeyBytes)
1734  				lookups.UncompressedPubKeys[uncompressedPubkey] = struct{}{}
1735  			default:
1736  				jsonErr := btcjson.RPCError{
1737  					Code:    btcjson.ErrRPCInvalidAddressOrKey,
1738  					Message: "Pubkey " + addrStr + " is of unknown length",
1739  				}
1740  				return nil, &jsonErr
1741  			}
1742  		default:
1743  			// A new address type must have been added. Use encoded payment address string as a fallback until a fast
1744  			// path is added.
1745  			lookups.Fallbacks[addrStr] = struct{}{}
1746  		}
1747  	}
1748  	for _, outpoint := range outpoints {
1749  		lookups.Unspent[*outpoint] = struct{}{}
1750  	}
1751  	chain := wsc.Server.Cfg.Chain
1752  	minBlockHash, e := chainhash.NewHashFromStr(cmd.BeginBlock)
1753  	if e != nil {
1754  		return nil, DecodeHexError(cmd.BeginBlock)
1755  	}
1756  	minBlock, e := chain.BlockHeightByHash(minBlockHash)
1757  	if e != nil {
1758  		return nil, &btcjson.RPCError{
1759  			Code:    btcjson.ErrRPCBlockNotFound,
1760  			Message: "ScriptError getting block: " + e.Error(),
1761  		}
1762  	}
1763  	maxBlock := int32(math.MaxInt32)
1764  	if cmd.EndBlock != nil {
1765  		maxBlockHash, e := chainhash.NewHashFromStr(*cmd.EndBlock)
1766  		if e != nil {
1767  			return nil, DecodeHexError(*cmd.EndBlock)
1768  		}
1769  		maxBlock, e = chain.BlockHeightByHash(maxBlockHash)
1770  		if e != nil {
1771  			return nil, &btcjson.RPCError{
1772  				Code:    btcjson.ErrRPCBlockNotFound,
1773  				Message: "ScriptError getting block: " + e.Error(),
1774  			}
1775  		}
1776  	}
1777  	// lastBlock and lastBlockHash track the previously-rescanned block. They equal nil when no previous blocks have
1778  	// been rescanned.
1779  	var lastBlock *block.Block
1780  	var lastBlockHash *chainhash.Hash
1781  	// A ticker is created to wait at least 10 seconds before notifying the websocket client of the current progress
1782  	// completed by the rescan.
1783  	ticker := time.NewTicker(10 * time.Second)
1784  	defer ticker.Stop()
1785  	// Instead of fetching all block shas at once, fetch in smaller chunks to ensure large rescans consume a limited
1786  	// amount of memory.
1787  fetchRange:
1788  	for minBlock < maxBlock {
1789  		// Limit the max number of hashes to fetch at once to the maximum number of items allowed in a single inventory.
1790  		// This value could be higher since it's not creating inventory messages, but this mirrors the limiting logic
1791  		// used in the peer-to-peer protocol.
1792  		maxLoopBlock := maxBlock
1793  		if maxLoopBlock-minBlock > wire.MaxInvPerMsg {
1794  			maxLoopBlock = minBlock + wire.MaxInvPerMsg
1795  		}
1796  		hashList, e := chain.HeightRange(minBlock, maxLoopBlock)
1797  		if e != nil {
1798  			E.Ln("error looking up block range:", e)
1799  			return nil, &btcjson.RPCError{
1800  				Code:    btcjson.ErrRPCDatabase,
1801  				Message: "Database error: " + e.Error(),
1802  			}
1803  		}
1804  		if len(hashList) == 0 {
1805  			// The rescan is finished if no blocks hashes for this range were successfully fetched and a stop block was
1806  			// provided.
1807  			if maxBlock != math.MaxInt32 {
1808  				break
1809  			}
1810  			// If the rescan is through the current block, set up the client to continue to receive notifications
1811  			// regarding all rescanned addresses and the current set of unspent outputs.
1812  			//
1813  			// This is done safely by temporarily grabbing exclusive access of the block manager. If no more blocks have
1814  			// been attached between this pause and the fetch above, then it is safe to register the websocket client
1815  			// for continuous notifications if necessary.
1816  			//
1817  			// Otherwise, continue the fetch loop again to rescan the new blocks (or error due to an irrecoverable
1818  			// reorganize).
1819  			pauseGuard := wsc.Server.Cfg.SyncMgr.Pause()
1820  			best := wsc.Server.Cfg.Chain.BestSnapshot()
1821  			curHash := &best.Hash
1822  			again := true
1823  			if lastBlockHash == nil || *lastBlockHash == *curHash {
1824  				again = false
1825  				n := wsc.Server.NtfnMgr
1826  				n.RegisterSpentRequests(wsc, lookups.UnspentSlice())
1827  				n.RegisterTxOutAddressRequests(wsc, cmd.Addresses)
1828  			}
1829  			close(pauseGuard)
1830  			// this err value is nil if it got to here from above at 1577
1831  			// if e != nil  {
1832  			//  Errorf(
1833  			// 		"ScriptError fetching best block hash:", e,
1834  			// 	}
1835  			// 	return nil, &json.RPCError{
1836  			// 		Code:    json.ErrRPCDatabase,
1837  			// 		Message: "Database error: " + err.ScriptError(),
1838  			// 	}
1839  			// }
1840  			if again {
1841  				continue
1842  			}
1843  			break
1844  		}
1845  	loopHashList:
1846  		for i := range hashList {
1847  			var blk *block.Block
1848  			blk, e = chain.BlockByHash(&hashList[i])
1849  			if e != nil {
1850  				// Only handle reorgs if a block could not be found for the hash.
1851  				if dbErr, ok := e.(database.DBError); !ok ||
1852  					dbErr.ErrorCode != database.ErrBlockNotFound {
1853  					E.Ln("error looking up block:", e)
1854  					return nil, &btcjson.RPCError{
1855  						Code: btcjson.ErrRPCDatabase,
1856  						Message: "Database error: " +
1857  							e.Error(),
1858  					}
1859  				}
1860  				// If an absolute max block was specified, don't attempt to handle the reorg.
1861  				if maxBlock != math.MaxInt32 {
1862  					E.Ln("stopping rescan for reorged block", cmd.EndBlock)
1863  					return nil, &ErrRescanReorg
1864  				}
1865  				// If the lookup for the previously valid block hash failed, there may have been a reorg. Fetch a new
1866  				// range of block hashes and verify that the previously processed block (if there was any) still exists
1867  				// in the database. If it doesn't, we error.
1868  				//
1869  				// A goto is used to branch execution back to before the range was evaluated, as it must be
1870  				// reevaluated for the new hashList.
1871  				minBlock += int32(i)
1872  				hashList, e = RecoverFromReorg(
1873  					chain,
1874  					minBlock, maxBlock, lastBlockHash,
1875  				)
1876  				if e != nil {
1877  					return nil, e
1878  				}
1879  				if len(hashList) == 0 {
1880  					break fetchRange
1881  				}
1882  				goto loopHashList
1883  			}
1884  			if i == 0 && lastBlockHash != nil {
1885  				// Ensure the new hashList is on the same fork as the last block from the old hashList.
1886  				jsonErr := DescendantBlock(lastBlockHash, blk)
1887  				if jsonErr != nil {
1888  					return nil, jsonErr
1889  				}
1890  			}
1891  			// A select statement is used to stop rescans if the client requesting the rescan has disconnected.
1892  			select {
1893  			case <-wsc.Quit.Wait():
1894  				D.F("stopped rescan at height %v for disconnected client", blk.Height())
1895  				return nil, nil
1896  			default:
1897  				RescanBlock(wsc, &lookups, blk)
1898  				lastBlock = blk
1899  				lastBlockHash = blk.Hash()
1900  			}
1901  			// Periodically notify the client of the progress completed. Continue with next block if no progress
1902  			// notification is needed yet.
1903  			select {
1904  			case <-ticker.C: // fallthrough
1905  			default:
1906  				continue
1907  			}
1908  			n := btcjson.NewRescanProgressNtfn(
1909  				hashList[i].String(),
1910  				blk.Height(), blk.WireBlock().Header.Timestamp.Unix(),
1911  			)
1912  			mn, e := btcjson.MarshalCmd(nil, n)
1913  			if e != nil {
1914  				E.F(
1915  					"failed to marshal rescan progress notification: %v",
1916  					e,
1917  				)
1918  				continue
1919  			}
1920  			if e = wsc.QueueNotification(mn); e == ErrClientQuit {
1921  				// Finished if the client disconnected.
1922  				D.F(
1923  					"stopped rescan at height %v for disconnected client",
1924  					blk.Height(),
1925  				)
1926  				return nil, nil
1927  			}
1928  		}
1929  		minBlock += int32(len(hashList))
1930  	}
1931  	// Notify websocket client of the finished rescan. Due to how pod asynchronously queues notifications to not block
1932  	// calling code, there is no guarantee that any of the notifications created during rescan (such as rescanprogress,
1933  	// recvtx and redeemingtx) will be received before the rescan RPC returns.
1934  	//
1935  	// Therefore, another method is needed to safely inform clients that all rescan notifications have been sent.
1936  	n := btcjson.NewRescanFinishedNtfn(
1937  		lastBlockHash.String(),
1938  		lastBlock.Height(),
1939  		lastBlock.WireBlock().Header.Timestamp.Unix(),
1940  	)
1941  	if mn, e := btcjson.MarshalCmd(nil, n); E.Chk(e) {
1942  		E.F(
1943  			"failed to marshal rescan finished notification: %v", e,
1944  		)
1945  	} else {
1946  		// The rescan is finished, so we don't care whether the client has disconnected at this point, so discard error.
1947  		_ = wsc.QueueNotification(mn)
1948  	}
1949  	I.Ln("finished rescan")
1950  	return nil, nil
1951  }
1952  
1953  // HandleRescanBlocks implements the rescanblocks command extension for websocket connections.
1954  //
1955  // NOTE: This extension is ported from github.com/decred/dcrd
1956  func HandleRescanBlocks(wsc *WSClient, icmd interface{}) (interface{}, error) {
1957  	cmd, ok := icmd.(*btcjson.RescanBlocksCmd)
1958  	if !ok {
1959  		return nil, btcjson.ErrRPCInternal
1960  	}
1961  	// Load client's transaction filter.  Must exist in order to continue.
1962  	wsc.Lock()
1963  	filter := wsc.FilterData
1964  	wsc.Unlock()
1965  	if filter == nil {
1966  		return nil, &btcjson.RPCError{
1967  			Code:    btcjson.ErrRPCMisc,
1968  			Message: "Transaction filter must be loaded before rescanning",
1969  		}
1970  	}
1971  	blockHashes := make([]*chainhash.Hash, len(cmd.BlockHashes))
1972  	for i := range cmd.BlockHashes {
1973  		hash, e := chainhash.NewHashFromStr(cmd.BlockHashes[i])
1974  		if e != nil {
1975  			return nil, e
1976  		}
1977  		blockHashes[i] = hash
1978  	}
1979  	discoveredData := make([]btcjson.RescannedBlock, 0, len(blockHashes))
1980  	// Iterate over each block in the request and rescan. When a block contains relevant transactions, add it to the
1981  	// response.
1982  	bc := wsc.Server.Cfg.Chain
1983  	params := wsc.Server.Cfg.ChainParams
1984  	var lastBlockHash *chainhash.Hash
1985  	for i := range blockHashes {
1986  		block, e := bc.BlockByHash(blockHashes[i])
1987  		if e != nil {
1988  			return nil, &btcjson.RPCError{
1989  				Code:    btcjson.ErrRPCBlockNotFound,
1990  				Message: "Failed to fetch block: " + e.Error(),
1991  			}
1992  		}
1993  		if lastBlockHash != nil && block.WireBlock().Header.
1994  			PrevBlock != *lastBlockHash {
1995  			return nil, &btcjson.RPCError{
1996  				Code: btcjson.ErrRPCInvalidParameter,
1997  				Message: fmt.Sprintf(
1998  					"Block %v is not a child of %v",
1999  					blockHashes[i], lastBlockHash,
2000  				),
2001  			}
2002  		}
2003  		lastBlockHash = blockHashes[i]
2004  		transactions := RescanBlockFilter(filter, block, params)
2005  		if len(transactions) != 0 {
2006  			discoveredData = append(
2007  				discoveredData, btcjson.RescannedBlock{
2008  					Hash:         cmd.BlockHashes[i],
2009  					Transactions: transactions,
2010  				},
2011  			)
2012  		}
2013  	}
2014  	return &discoveredData, nil
2015  }
2016  
2017  // HandleSession implements the session command extension for websocket connections.
2018  func HandleSession(wsc *WSClient, icmd interface{}) (interface{}, error) {
2019  	return &btcjson.SessionResult{SessionID: wsc.SessionID}, nil
2020  }
2021  
2022  // HandleStopNotifyBlocks implements the stopnotifyblocks command extension for websocket connections.
2023  func HandleStopNotifyBlocks(wsc *WSClient, icmd interface{}) (
2024  	interface{},
2025  	error,
2026  ) {
2027  	wsc.Server.NtfnMgr.UnregisterBlockUpdates(wsc)
2028  	return nil, nil
2029  }
2030  
2031  // HandleStopNotifyNewTransactions implements the stopnotifynewtransactions command extension for websocket connections.
2032  func HandleStopNotifyNewTransactions(
2033  	wsc *WSClient, icmd interface{},
2034  ) (interface{}, error) {
2035  	wsc.Server.NtfnMgr.UnregisterNewMempoolTxsUpdates(wsc)
2036  	return nil, nil
2037  }
2038  
2039  // HandleStopNotifyReceived implements the stopnotifyreceived command extension for websocket connections.
2040  func HandleStopNotifyReceived(wsc *WSClient, icmd interface{}) (
2041  	interface{},
2042  	error,
2043  ) {
2044  	cmd, ok := icmd.(*btcjson.StopNotifyReceivedCmd)
2045  	if !ok {
2046  		return nil, btcjson.ErrRPCInternal
2047  	}
2048  	// Decode addresses to validate input, but the strings slice is used directly if these are all ok.
2049  	e := CheckAddressValidity(cmd.Addresses, wsc.Server.Cfg.ChainParams)
2050  	if e != nil {
2051  		return nil, e
2052  	}
2053  	for _, addr := range cmd.Addresses {
2054  		wsc.Server.NtfnMgr.UnregisterTxOutAddressRequest(wsc, addr)
2055  	}
2056  	return nil, nil
2057  }
2058  
2059  // HandleStopNotifySpent implements the stopnotifyspent command extension for websocket connections.
2060  func HandleStopNotifySpent(wsc *WSClient, icmd interface{}) (
2061  	interface{}, error,
2062  ) {
2063  	cmd, ok := icmd.(*btcjson.StopNotifySpentCmd)
2064  	if !ok {
2065  		return nil, btcjson.ErrRPCInternal
2066  	}
2067  	outpoints, e := DeserializeOutpoints(cmd.OutPoints)
2068  	if e != nil {
2069  		return nil, e
2070  	}
2071  	for _, outpoint := range outpoints {
2072  		wsc.Server.NtfnMgr.UnregisterSpentRequest(wsc, outpoint)
2073  	}
2074  	return nil, nil
2075  }
2076  
2077  // HandleWebsocketHelp implements the help command for websocket connections.
2078  func HandleWebsocketHelp(wsc *WSClient, icmd interface{}) (interface{}, error) {
2079  	cmd, ok := icmd.(*btcjson.HelpCmd)
2080  	if !ok {
2081  		return nil, btcjson.ErrRPCInternal
2082  	}
2083  	// Provide a usage overview of all commands when no specific command was specified.
2084  	var command string
2085  	if cmd.Command != nil {
2086  		command = *cmd.Command
2087  	}
2088  	if command == "" {
2089  		usage, e := wsc.Server.HelpCacher.RPCUsage(true)
2090  		if e != nil {
2091  			context := "Failed to generate RPC usage"
2092  			return nil, InternalRPCError(e.Error(), context)
2093  		}
2094  		return usage, nil
2095  	}
2096  	// Chk that the command asked for is supported and implemented. Search the list of websocket handlers as well as
2097  	// the main list of handlers since help should only be provided for those cases.
2098  	valid := true
2099  	if _, ok := RPCHandlers[command]; !ok {
2100  		if _, ok := WSHandlers[command]; !ok {
2101  			valid = false
2102  		}
2103  	}
2104  	if !valid {
2105  		return nil, &btcjson.RPCError{
2106  			Code:    btcjson.ErrRPCInvalidParameter,
2107  			Message: "Unknown command: " + command,
2108  		}
2109  	}
2110  	// Get the help for the command.
2111  	help, e := wsc.Server.HelpCacher.RPCMethodHelp(command)
2112  	if e != nil {
2113  		context := "Failed to generate help"
2114  		return nil, InternalRPCError(e.Error(), context)
2115  	}
2116  	return help, nil
2117  }
2118  
2119  func init() {
2120  
2121  	WSHandlers = WSHandlersBeforeInit
2122  }
2123  func MakeSemaphore(n int) Semaphore {
2124  	return Semaphore(qu.Ts(n))
2125  }
2126  
2127  // NewRedeemingTxNotification returns a new marshalled redeemingtx notification with the passed parameters.
2128  func NewRedeemingTxNotification(
2129  	txHex string, index int,
2130  	block *block.Block,
2131  ) ([]byte, error) {
2132  	// Create and marshal the notification.
2133  	ntfn := btcjson.NewRedeemingTxNtfn(txHex, BlockDetails(block, index))
2134  	return btcjson.MarshalCmd(nil, ntfn)
2135  }
2136  
2137  // NewWSClientFilter creates a new, empty wsClientFilter struct to be used for a websocket client.
2138  //
2139  // NOTE: This extension was ported from github.com/decred/ dcrd
2140  func NewWSClientFilter(
2141  	addresses []string, unspentOutPoints []wire.OutPoint,
2142  	params *chaincfg.Params,
2143  ) *WSClientFilter {
2144  	filter := &WSClientFilter{
2145  		PubKeyHashes:        map[[ripemd160.Size]byte]struct{}{},
2146  		ScriptHashes:        map[[ripemd160.Size]byte]struct{}{},
2147  		CompressedPubKeys:   map[[33]byte]struct{}{},
2148  		UncompressedPubKeys: map[[65]byte]struct{}{},
2149  		OtherAddresses:      map[string]struct{}{},
2150  		Unspent: make(
2151  			map[wire.OutPoint]struct{},
2152  			len(unspentOutPoints),
2153  		),
2154  	}
2155  	for _, s := range addresses {
2156  		filter.AddAddressStr(s, params)
2157  	}
2158  	for i := range unspentOutPoints {
2159  		filter.AddUnspentOutPoint(&unspentOutPoints[i])
2160  	}
2161  	return filter
2162  }
2163  
2164  // NewWebsocketClient returns a new websocket client given the notification manager, websocket connection, remote
2165  // address, and whether or not the client has already been authenticated (via HTTP Basic access authentication). The
2166  // returned client is ready to start.
2167  //
2168  // Once started, the client will process incoming and outgoing messages in separate goroutines complete with queuing and
2169  // asynchrous handling for long-running operations.
2170  func NewWebsocketClient(
2171  	server *Server, conn *websocket.Conn,
2172  	remoteAddr string, authenticated bool, isAdmin bool,
2173  ) (*WSClient, error) {
2174  	sessionID, e := wire.RandomUint64()
2175  	if e != nil {
2176  		return nil, e
2177  	}
2178  	client := &WSClient{
2179  		Conn:              conn,
2180  		Addr:              remoteAddr,
2181  		Authenticated:     authenticated,
2182  		IsAdmin:           isAdmin,
2183  		SessionID:         sessionID,
2184  		Server:            server,
2185  		AddrRequests:      make(map[string]struct{}),
2186  		SpentRequests:     make(map[wire.OutPoint]struct{}),
2187  		ServiceRequestSem: MakeSemaphore(server.Config.RPCMaxConcurrentReqs.V()),
2188  		NtfnChan:          make(chan []byte, 1), // nonblocking sync
2189  		SendChan:          make(chan WSResponse, WebsocketSendBufferSize),
2190  		Quit:              qu.T(),
2191  	}
2192  	return client, nil
2193  }
2194  
2195  // NewWSNotificationManager returns a new notification manager ready for use. See wsNotificationManager for more
2196  // details.
2197  func NewWSNotificationManager(server *Server) *WSNtfnMgr {
2198  	return &WSNtfnMgr{
2199  		Server:            server,
2200  		QueueNotification: make(chan interface{}),
2201  		NotificationMsgs:  make(chan interface{}),
2202  		NumClients:        make(chan int),
2203  		Quit:              qu.T(),
2204  	}
2205  }
2206  
2207  // QueueHandler manages a queue of empty interfaces, reading from in and sending the oldest unsent to out.
2208  //
2209  // This handler stops when either of the in or quit channels are closed, and closes out before returning, without
2210  // waiting to send any variables still remaining in the queue.
2211  func QueueHandler(in <-chan interface{}, out chan<- interface{}, quit qu.C) {
2212  	var q []interface{}
2213  	var dequeue chan<- interface{}
2214  	skipQueue := out
2215  	var next interface{}
2216  out:
2217  	for {
2218  		select {
2219  		case n, ok := <-in:
2220  			if !ok {
2221  				// Sender closed input channel.
2222  				break out
2223  			}
2224  			// Either send to out immediately if skipQueue is non-nil (queue is empty) and reader is ready, or append to
2225  			// the queue and send later.
2226  			select {
2227  			case skipQueue <- n:
2228  			default:
2229  				q = append(q, n)
2230  				dequeue = out
2231  				skipQueue = nil
2232  				next = q[0]
2233  			}
2234  		case dequeue <- next:
2235  			copy(q, q[1:])
2236  			q[len(q)-1] = nil // avoid leak
2237  			q = q[:len(q)-1]
2238  			if len(q) == 0 {
2239  				dequeue = nil
2240  				skipQueue = out
2241  			} else {
2242  				next = q[0]
2243  			}
2244  		case <-quit.Wait():
2245  			break out
2246  		}
2247  	}
2248  	close(out)
2249  }
2250  
2251  // RecoverFromReorg attempts to recover from a detected reorganize during a rescan.
2252  //
2253  // It fetches a new range of block shas from the database and verifies that the new range of blocks is on the same fork
2254  // as a previous range of blocks.
2255  //
2256  // If this condition does not hold true, the JSON-RPC error for an unrecoverable reorganize is returned.
2257  func RecoverFromReorg(
2258  	chain *blockchain.BlockChain, minBlock, maxBlock int32,
2259  	lastBlock *chainhash.Hash,
2260  ) ([]chainhash.Hash, error) {
2261  	hashList, e := chain.HeightRange(minBlock, maxBlock)
2262  	if e != nil {
2263  		E.Ln("error looking up block range:", e)
2264  		return nil, &btcjson.RPCError{
2265  			Code:    btcjson.ErrRPCDatabase,
2266  			Message: "Database error: " + e.Error(),
2267  		}
2268  	}
2269  	if lastBlock == nil || len(hashList) == 0 {
2270  		return hashList, nil
2271  	}
2272  	blk, e := chain.BlockByHash(&hashList[0])
2273  	if e != nil {
2274  		E.Ln("error looking up possibly reorged block:", e)
2275  		return nil, &btcjson.RPCError{
2276  			Code:    btcjson.ErrRPCDatabase,
2277  			Message: "Database error: " + e.Error(),
2278  		}
2279  	}
2280  	jsonErr := DescendantBlock(lastBlock, blk)
2281  	if jsonErr != nil {
2282  		return nil, jsonErr
2283  	}
2284  	return hashList, nil
2285  }
2286  
2287  // RescanBlock rescans all transactions in a single block. This is a helper function for handleRescan.
2288  func RescanBlock(wsc *WSClient, lookups *RescanKeys, blk *block.Block) {
2289  	for _, tx := range blk.Transactions() {
2290  		// Hexadecimal representation of this tx. Only created if needed, and reused for later notifications if already
2291  		// made.
2292  		var txHex string
2293  		// All inputs and outputs must be iterated through to correctly modify the unspent map, however, just a single
2294  		// notification for any matching transaction inputs or outputs should be created and sent.
2295  		spentNotified := false
2296  		recvNotified := false
2297  		for _, txin := range tx.MsgTx().TxIn {
2298  			if _, ok := lookups.Unspent[txin.PreviousOutPoint]; ok {
2299  				delete(lookups.Unspent, txin.PreviousOutPoint)
2300  				if spentNotified {
2301  					continue
2302  				}
2303  				if txHex == "" {
2304  					txHex = TxHexString(tx.MsgTx())
2305  				}
2306  				marshalledJSON, e := NewRedeemingTxNotification(
2307  					txHex,
2308  					tx.Index(), blk,
2309  				)
2310  				if e != nil {
2311  					E.Ln(
2312  						"failed to marshal redeemingtx notification:",
2313  						e,
2314  					)
2315  					continue
2316  				}
2317  				e = wsc.QueueNotification(marshalledJSON)
2318  				// Stop the rescan early if the websocket client disconnected.
2319  				if e == ErrClientQuit {
2320  					return
2321  				}
2322  				spentNotified = true
2323  			}
2324  		}
2325  		for txOutIdx, txout := range tx.MsgTx().TxOut {
2326  			_, addrs, _, _ := txscript.ExtractPkScriptAddrs(
2327  				txout.PkScript, wsc.Server.Cfg.ChainParams,
2328  			)
2329  			for _, addr := range addrs {
2330  				switch a := addr.(type) {
2331  				case *btcaddr.PubKeyHash:
2332  					if _, ok := lookups.PubKeyHashes[*a.Hash160()]; !ok {
2333  						continue
2334  					}
2335  				case *btcaddr.ScriptHash:
2336  					if _, ok := lookups.ScriptHashes[*a.Hash160()]; !ok {
2337  						continue
2338  					}
2339  				case *btcaddr.PubKey:
2340  					found := false
2341  					switch sa := a.ScriptAddress(); len(sa) {
2342  					case 33: // Compressed
2343  						var key [33]byte
2344  						copy(key[:], sa)
2345  						if _, ok := lookups.CompressedPubKeys[key]; ok {
2346  							found = true
2347  						}
2348  					case 65: // Uncompressed
2349  						var key [65]byte
2350  						copy(key[:], sa)
2351  						if _, ok := lookups.UncompressedPubKeys[key]; ok {
2352  							found = true
2353  						}
2354  					default:
2355  						W.F("skipping rescanned pubkey of unknown serialized length", len(sa))
2356  						continue
2357  					}
2358  					// If the transaction output pays to the pubkey of a rescanned P2PKH address, include it as well.
2359  					if !found {
2360  						pkh := a.PubKeyHash()
2361  						if _, ok := lookups.PubKeyHashes[*pkh.Hash160()]; !ok {
2362  							continue
2363  						}
2364  					}
2365  				default:
2366  					// A new address type must have been added. Encode as a payment address string and check the
2367  					// fallback map.
2368  					addrStr := addr.EncodeAddress()
2369  					_, ok := lookups.Fallbacks[addrStr]
2370  					if !ok {
2371  						continue
2372  					}
2373  				}
2374  				outpoint := wire.OutPoint{
2375  					Hash:  *tx.Hash(),
2376  					Index: uint32(txOutIdx),
2377  				}
2378  				lookups.Unspent[outpoint] = struct{}{}
2379  				if recvNotified {
2380  					continue
2381  				}
2382  				if txHex == "" {
2383  					txHex = TxHexString(tx.MsgTx())
2384  				}
2385  				ntfn := btcjson.NewRecvTxNtfn(
2386  					txHex,
2387  					BlockDetails(blk, tx.Index()),
2388  				)
2389  				marshalledJSON, e := btcjson.MarshalCmd(nil, ntfn)
2390  				if e != nil {
2391  					E.Ln("failed to marshal recvtx notification:", e)
2392  					return
2393  				}
2394  				e = wsc.QueueNotification(marshalledJSON)
2395  				// Stop the rescan early if the websocket client disconnected.
2396  				if e == ErrClientQuit {
2397  					return
2398  				}
2399  				recvNotified = true
2400  			}
2401  		}
2402  	}
2403  }
2404  
2405  // RescanBlockFilter rescans a block for any relevant transactions for the passed lookup keys. Any discovered
2406  // transactions are returned hex encoded as a string slice.
2407  //
2408  // NOTE: This extension is ported from github.com/decred/dcrd
2409  func RescanBlockFilter(
2410  	filter *WSClientFilter, block *block.Block,
2411  	params *chaincfg.Params,
2412  ) []string {
2413  	var transactions []string
2414  	filter.mu.Lock()
2415  	for _, tx := range block.Transactions() {
2416  		msgTx := tx.MsgTx()
2417  		// Keep track of whether the transaction has already been added to the result. It shouldn't be added twice.
2418  		added := false
2419  		// Scan inputs if not a coinbase transaction.
2420  		if !blockchain.IsCoinBaseTx(msgTx) {
2421  			for _, input := range msgTx.TxIn {
2422  				if !filter.ExistsUnspentOutPoint(&input.PreviousOutPoint) {
2423  					continue
2424  				}
2425  				if !added {
2426  					transactions = append(
2427  						transactions,
2428  						TxHexString(msgTx),
2429  					)
2430  					added = true
2431  				}
2432  			}
2433  		}
2434  		var e error
2435  		// Scan outputs.
2436  		for i, output := range msgTx.TxOut {
2437  			var addrs []btcaddr.Address
2438  			_, addrs, _, e = txscript.ExtractPkScriptAddrs(
2439  				output.PkScript, params,
2440  			)
2441  			if e != nil {
2442  				continue
2443  			}
2444  			for _, a := range addrs {
2445  				if !filter.ExistsAddress(a) {
2446  					continue
2447  				}
2448  				op := wire.OutPoint{
2449  					Hash:  *tx.Hash(),
2450  					Index: uint32(i),
2451  				}
2452  				filter.AddUnspentOutPoint(&op)
2453  				if !added {
2454  					transactions = append(
2455  						transactions,
2456  						TxHexString(msgTx),
2457  					)
2458  					added = true
2459  				}
2460  			}
2461  		}
2462  	}
2463  	filter.mu.Unlock()
2464  	return transactions
2465  }
2466  
2467  // TxHexString returns the serialized transaction encoded in hexadecimal.
2468  func TxHexString(tx *wire.MsgTx) string {
2469  	buf := bytes.NewBuffer(make([]byte, 0, tx.SerializeSize()))
2470  	// Ignore Serialize's error, as writing to a bytes.buffer cannot fail.
2471  	e := tx.Serialize(buf)
2472  	if e != nil {
2473  	}
2474  	return hex.EncodeToString(buf.Bytes())
2475  }
2476