server.go raw

   1  package chainrpc
   2  
   3  import (
   4  	"bytes"
   5  	"crypto/rand"
   6  	"crypto/tls"
   7  	"encoding/binary"
   8  	"errors"
   9  	"fmt"
  10  	"math"
  11  	"net"
  12  	"os"
  13  	"os/exec"
  14  	"runtime"
  15  	"sort"
  16  	"strconv"
  17  	"strings"
  18  	"sync"
  19  	"sync/atomic"
  20  	"time"
  21  
  22  	"github.com/p9c/p9/pkg/qu"
  23  
  24  	"github.com/p9c/p9/pkg/log"
  25  	"github.com/p9c/p9/pkg/amt"
  26  	block2 "github.com/p9c/p9/pkg/block"
  27  	"github.com/p9c/p9/pkg/chainrpc/peersummary"
  28  	"github.com/p9c/p9/pkg/fork"
  29  	"github.com/p9c/p9/pkg/interrupt"
  30  	"github.com/p9c/p9/pkg/mining"
  31  	"github.com/p9c/p9/pod/config"
  32  
  33  	uberatomic "go.uber.org/atomic"
  34  
  35  	"github.com/p9c/p9/cmd/node/active"
  36  	"github.com/p9c/p9/pkg/addrmgr"
  37  	"github.com/p9c/p9/pkg/blockchain"
  38  	"github.com/p9c/p9/pkg/bloom"
  39  	"github.com/p9c/p9/pkg/chaincfg"
  40  	"github.com/p9c/p9/pkg/chainhash"
  41  	"github.com/p9c/p9/pkg/connmgr"
  42  	"github.com/p9c/p9/pkg/database"
  43  	"github.com/p9c/p9/pkg/indexers"
  44  	"github.com/p9c/p9/pkg/mempool"
  45  	"github.com/p9c/p9/pkg/netsync"
  46  	"github.com/p9c/p9/pkg/peer"
  47  	"github.com/p9c/p9/pkg/txscript"
  48  	"github.com/p9c/p9/pkg/upnp"
  49  	"github.com/p9c/p9/pkg/util"
  50  	"github.com/p9c/p9/pkg/wire"
  51  	"github.com/p9c/p9/version"
  52  )
  53  
  54  const DefaultMaxOrphanTxSize = 100000
  55  
  56  type (
  57  	// BroadcastInventoryAdd is a type used to declare that the InvVect it contains needs to be added to the rebroadcast
  58  	// map
  59  	BroadcastInventoryAdd RelayMsg
  60  	// BroadcastInventoryDel is a type used to declare that the InvVect it contains needs to be removed from the
  61  	// rebroadcast map
  62  	BroadcastInventoryDel *wire.InvVect
  63  	// BroadcastMsg provides the ability to house a bitcoin message to be broadcast to all connected peers except
  64  	// specified excluded peers.
  65  	BroadcastMsg struct {
  66  		Message      wire.Message
  67  		ExcludePeers []*NodePeer
  68  	}
  69  	// CFHeaderKV is a tuple of a filter header and its associated block hash. The struct is used to cache cfcheckpt
  70  	// responses.
  71  	CFHeaderKV struct {
  72  		BlockHash    chainhash.Hash
  73  		FilterHeader chainhash.Hash
  74  	}
  75  	// CheckpointSorter implements sort.Interface to allow a slice of checkpoints to be sorted.
  76  	CheckpointSorter []chaincfg.Checkpoint
  77  	ConnectNodeMsg   struct {
  78  		Addr      string
  79  		Permanent bool
  80  		Reply     chan error
  81  	}
  82  	DisconnectNodeMsg struct {
  83  		Cmp   func(*NodePeer) bool
  84  		Reply chan error
  85  	}
  86  	GetAddedNodesMsg struct {
  87  		Reply chan []*NodePeer
  88  	}
  89  	GetConnCountMsg struct {
  90  		Reply chan int32
  91  	}
  92  	GetOutboundGroup struct {
  93  		Key   string
  94  		Reply chan int
  95  	}
  96  	GetPeersMsg struct {
  97  		Reply chan []*NodePeer
  98  	}
  99  	// OnionAddr implements the net.Addr interface and represents a tor address.
 100  	OnionAddr struct {
 101  		Addr string
 102  	}
 103  	// PeerState maintains state of inbound, persistent, outbound peers as well as banned peers and outbound groups.
 104  	PeerState struct {
 105  		InboundPeers    map[int32]*NodePeer
 106  		OutboundPeers   map[int32]*NodePeer
 107  		PersistentPeers map[int32]*NodePeer
 108  		Banned          map[string]time.Time
 109  		OutboundGroups  map[string]int
 110  	}
 111  	// RelayMsg packages an inventory vector along with the newly discovered inventory so the relay has access to that
 112  	// information.
 113  	RelayMsg struct {
 114  		InvVect *wire.InvVect
 115  		Data    interface{}
 116  	}
 117  	RemoveNodeMsg struct {
 118  		Cmp   func(*NodePeer) bool
 119  		Reply chan error
 120  	}
 121  	// Node provides a bitcoin Node for handling communications to and from bitcoin peers.
 122  	Node struct {
 123  		// The following variables must only be used atomically. Putting the uint64s first makes them 64-bit aligned for
 124  		// 32-bit systems.
 125  		BytesReceived        uint64 // Total bytes received from all peers since start.
 126  		BytesSent            uint64 // Total bytes sent by all peers since start.
 127  		StartupTime          int64
 128  		ChainParams          *chaincfg.Params
 129  		AddrManager          *addrmgr.AddrManager
 130  		ConnManager          *connmgr.ConnManager
 131  		SigCache             *txscript.SigCache
 132  		HashCache            *txscript.HashCache
 133  		RPCServers           []*Server
 134  		SyncManager          *netsync.SyncManager
 135  		Chain                *blockchain.BlockChain
 136  		TxMemPool            *mempool.TxPool
 137  		CPUMiner             *exec.Cmd
 138  		ModifyRebroadcastInv chan interface{}
 139  		NewPeers             chan *NodePeer
 140  		DonePeers            chan *NodePeer
 141  		BanPeers             chan *NodePeer
 142  		PeerState            chan chan peersummary.PeerSummaries
 143  		Query                chan interface{}
 144  		RelayInv             chan RelayMsg
 145  		Broadcast            chan BroadcastMsg
 146  		PeerHeightsUpdate    chan UpdatePeerHeightsMsg
 147  		WG                   sync.WaitGroup
 148  		Quit                 qu.C
 149  		NAT                  upnp.NAT
 150  		DB                   database.DB
 151  		TimeSource           blockchain.MedianTimeSource
 152  		Services             wire.ServiceFlag
 153  		// The following fields are used for optional indexes. They will be nil if the associated index is not enabled.
 154  		//
 155  		// These fields are set during initial creation of the server and never changed afterwards, so they do not need
 156  		// to be protected for concurrent access.
 157  		TxIndex   *indexers.TxIndex
 158  		AddrIndex *indexers.AddrIndex
 159  		CFIndex   *indexers.CFIndex
 160  		// The fee estimator keeps track of how long transactions are left in the mempool before they are mined into
 161  		// blocks.
 162  		FeeEstimator *mempool.FeeEstimator
 163  		// CFCheckptCaches stores a cached slice of filter headers for cfcheckpt messages for each filter type.
 164  		CFCheckptCaches                 map[wire.FilterType][]CFHeaderKV
 165  		CFCheckptCachesMtx              sync.RWMutex
 166  		Config                          *config.Config
 167  		ActiveNet                       *chaincfg.Params
 168  		StateCfg                        *active.Config
 169  		GenThreads                      uint32
 170  		Started                         int32
 171  		Shutdown                        int32
 172  		ShutdownSched                   int32
 173  		HighestKnown                    uberatomic.Int32
 174  		peerState                       *PeerState
 175  		StartController, StopController qu.C
 176  	}
 177  	// NodePeer extends the peer to maintain state shared by the server and the blockmanager.
 178  	NodePeer struct {
 179  		*peer.Peer
 180  		// The following variables must only be used atomically
 181  		FeeFilter      int64
 182  		ConnReq        *connmgr.ConnReq
 183  		Server         *Node
 184  		ContinueHash   *chainhash.Hash
 185  		RelayMtx       sync.Mutex
 186  		Filter         *bloom.Filter
 187  		KnownAddresses map[string]struct{}
 188  		BanScore       connmgr.DynamicBanScore
 189  		Quit           qu.C
 190  		// The following chans are used to sync blockmanager and server.
 191  		TxProcessed    qu.C
 192  		BlockProcessed qu.C
 193  		SentAddrs      bool
 194  		IsWhitelisted  bool
 195  		Persistent     bool
 196  		DisableRelayTx bool
 197  		IP             net.IP
 198  		Port           uint16
 199  	}
 200  	// SimpleAddr implements the net.Addr interface with two struct fields
 201  	SimpleAddr struct {
 202  		Net, Addr string
 203  	}
 204  	// UpdatePeerHeightsMsg is a message sent from the blockmanager to the server after a new block has been accepted.
 205  	// The purpose of the message is to update the heights of peers that were known to announce the block before we
 206  	// connected it to the main chain or recognized it as an orphan.
 207  	//
 208  	// With these updates, peer heights will be kept up to date, allowing for fresh data when selecting sync peer
 209  	// candidacy.
 210  	UpdatePeerHeightsMsg struct {
 211  		NewHash    *chainhash.Hash
 212  		NewHeight  int32
 213  		OriginPeer *peer.Peer
 214  	}
 215  )
 216  
 217  const (
 218  	// DefaultServices describes the default services that are supported by the server.
 219  	DefaultServices = wire.SFNodeNetwork | wire.SFNodeBloom |
 220  		/*wire.SFNodeWitness |*/ wire.SFNodeCF
 221  	// DefaultRequiredServices describes the default services that are required to be supported by outbound peers.
 222  	DefaultRequiredServices = wire.SFNodeNetwork
 223  	// DefaultTargetOutbound is the default number of outbound peers to target.
 224  	DefaultTargetOutbound = 125
 225  	// ConnectionRetryInterval is the base amount of time to wait in between retries
 226  	// when connecting to persistent peers. It is adjusted by the number of retries
 227  	// such that there is a retry backoff.
 228  	ConnectionRetryInterval = time.Minute
 229  )
 230  
 231  var (
 232  	// Ensure simpleAddr implements the net.Addr interface.
 233  	_ net.Addr = SimpleAddr{}
 234  	// UserAgentName is the user agent name and is used to help identify ourselves
 235  	// to peers.
 236  	UserAgentName = "pod"
 237  	// UserAgentVersion is the user agent version and is used to help identify
 238  	// ourselves to peers.
 239  	UserAgentVersion = version.Tag
 240  	// zeroHash is the zero value hash (all zeros). It is defined as a convenience.
 241  	zeroHash chainhash.Hash
 242  )
 243  
 244  // Network returns "onion". This is part of the net.Addr interface.
 245  func (oa *OnionAddr) Network() string {
 246  	return "onion"
 247  }
 248  
 249  // String returns the onion address. This is part of the net.Addr interface.
 250  func (oa *OnionAddr) String() string {
 251  	return oa.Addr
 252  }
 253  
 254  // Count returns the count of all known peers.
 255  func (ps *PeerState) Count() int {
 256  	return len(ps.InboundPeers) + len(ps.OutboundPeers) +
 257  		len(ps.PersistentPeers)
 258  }
 259  
 260  // ForAllOutboundPeers is a helper function that runs closure on all outbound
 261  // peers known to peerState.
 262  func (ps *PeerState) ForAllOutboundPeers(closure func(sp *NodePeer)) {
 263  	for _, e := range ps.OutboundPeers {
 264  		closure(e)
 265  	}
 266  	for _, e := range ps.PersistentPeers {
 267  		closure(e)
 268  	}
 269  }
 270  
 271  // ForAllPeers is a helper function that runs closure on all peers known to peerState.
 272  func (ps *PeerState) ForAllPeers(closure func(sp *NodePeer)) {
 273  	for _, e := range ps.InboundPeers {
 274  		closure(e)
 275  	}
 276  	ps.ForAllOutboundPeers(closure)
 277  }
 278  
 279  // AddBytesReceived adds the passed number of bytes to the total bytes received counter for the server.
 280  //
 281  // It is safe for concurrent access.
 282  func (n *Node) AddBytesReceived(bytesReceived uint64) {
 283  	atomic.AddUint64(&n.BytesReceived, bytesReceived)
 284  }
 285  
 286  // AddBytesSent adds the passed number of bytes to the total bytes sent counter for the server.
 287  //
 288  // It is safe for concurrent access.
 289  func (n *Node) AddBytesSent(bytesSent uint64) {
 290  	atomic.AddUint64(&n.BytesSent, bytesSent)
 291  }
 292  
 293  // AddPeer adds a new peer that has already been connected to the server.
 294  func (n *Node) AddPeer(sp *NodePeer) {
 295  	n.NewPeers <- sp
 296  }
 297  
 298  // AddRebroadcastInventory adds 'iv' to the list of inventories to be rebroadcasted at random intervals until they show
 299  // up in a block.
 300  func (n *Node) AddRebroadcastInventory(iv *wire.InvVect, data interface{}) {
 301  	// Ignore if shutting down.
 302  	if atomic.LoadInt32(&n.Shutdown) != 0 {
 303  		return
 304  	}
 305  	n.ModifyRebroadcastInv <- BroadcastInventoryAdd{InvVect: iv, Data: data}
 306  }
 307  
 308  // AnnounceNewTransactions generates and relays inventory vectors and notifies both websocket and getblocktemplate long
 309  // poll clients of the passed transactions.
 310  //
 311  // This function should be called whenever new transactions are added to the mempool.
 312  func (n *Node) AnnounceNewTransactions(txns []*mempool.TxDesc) {
 313  	// Generate and relay inventory vectors for all newly accepted transactions.
 314  	n.RelayTransactions(txns)
 315  	// Notify both websocket and getblocktemplate long poll clients of all newly accepted transactions.
 316  	for i := range n.RPCServers {
 317  		if n.RPCServers[i] != nil {
 318  			n.RPCServers[i].NotifyNewTransactions(txns)
 319  		}
 320  	}
 321  }
 322  
 323  // BanPeer bans a peer that has already been connected to the server by ip.
 324  func (n *Node) BanPeer(sp *NodePeer) {
 325  	n.BanPeers <- sp
 326  }
 327  
 328  // BroadcastMessage sends msg to all peers currently connected to the server except those in the passed peers to
 329  // exclude.
 330  func (n *Node) BroadcastMessage(msg wire.Message, exclPeers ...*NodePeer) {
 331  	// XXX: Need to determine if this is an alert that has already been broadcast and refrain from broadcasting again.
 332  	bmsg := BroadcastMsg{Message: msg, ExcludePeers: exclPeers}
 333  	n.Broadcast <- bmsg
 334  }
 335  
 336  // ConnectedCount returns the number of currently connected peers.
 337  func (n *Node) ConnectedCount() int32 {
 338  	replyChan := make(chan int32)
 339  	n.Query <- GetConnCountMsg{Reply: replyChan}
 340  	return <-replyChan
 341  }
 342  
 343  // NetTotals returns the sum of all bytes received and sent across the network for all peers.
 344  //
 345  // It is safe for concurrent access.
 346  func (n *Node) NetTotals() (uint64, uint64) {
 347  	return atomic.LoadUint64(&n.BytesReceived),
 348  		atomic.LoadUint64(&n.BytesSent)
 349  }
 350  
 351  // OutboundGroupCount returns the number of peers connected to the given outbound group key.
 352  func (n *Node) OutboundGroupCount(
 353  	key string,
 354  ) int {
 355  	replyChan := make(chan int)
 356  	n.Query <- GetOutboundGroup{Key: key, Reply: replyChan}
 357  	return <-replyChan
 358  }
 359  
 360  // RelayInventory relays the passed inventory vector to all connected peers that are not already known to have it.
 361  func (n *Node) RelayInventory(invVect *wire.InvVect, data interface{}) {
 362  	n.RelayInv <- RelayMsg{InvVect: invVect, Data: data}
 363  }
 364  
 365  // RemoveRebroadcastInventory removes 'iv' from the list of items to be rebroadcasted if present.
 366  func (n *Node) RemoveRebroadcastInventory(iv *wire.InvVect) {
 367  	// Log<-cl.Debug{emoveBroadcastInventory"
 368  	// Ignore if shutting down.
 369  	if atomic.LoadInt32(&n.Shutdown) != 0 {
 370  		// Log<-cl.Debug{gnoring due to shutdown"
 371  		return
 372  	}
 373  	n.ModifyRebroadcastInv <- BroadcastInventoryDel(iv)
 374  }
 375  
 376  // ScheduleShutdown schedules a server shutdown after the specified duration. It also dynamically adjusts how often to
 377  // warn the server is going down based on remaining duration.
 378  func (n *Node) ScheduleShutdown(duration time.Duration) {
 379  	// Don't schedule shutdown more than once.
 380  	if atomic.AddInt32(&n.ShutdownSched, 1) != 1 {
 381  		return
 382  	}
 383  	W.F("server shutdown in %v", duration)
 384  	go func() {
 385  		remaining := duration
 386  		tickDuration := DynamicTickDuration(remaining)
 387  		done := time.After(remaining)
 388  		ticker := time.NewTicker(tickDuration)
 389  	out:
 390  		for {
 391  			select {
 392  			case <-done:
 393  				ticker.Stop()
 394  				e := n.Stop()
 395  				if e != nil {
 396  				}
 397  				break out
 398  			case <-ticker.C:
 399  				remaining -= -tickDuration
 400  				if remaining < time.Second {
 401  					continue
 402  				}
 403  				// Change tick duration dynamically based on remaining time.
 404  				newDuration := DynamicTickDuration(remaining)
 405  				if tickDuration != newDuration {
 406  					tickDuration = newDuration
 407  					ticker.Stop()
 408  					ticker = time.NewTicker(tickDuration)
 409  				}
 410  				W.F("server shutdown in %v", remaining)
 411  			}
 412  		}
 413  	}()
 414  }
 415  
 416  // Start begins accepting connections from peers.
 417  func (n *Node) Start() {
 418  	// Already started?
 419  	if atomic.AddInt32(&n.Started, 1) != 1 {
 420  		return
 421  	}
 422  	D.Ln("starting server")
 423  	// Server startup time. Used for the uptime command for uptime calculation.
 424  	n.StartupTime = time.Now().Unix()
 425  	// Start the peer handler which in turn starts the address and block managers.
 426  	n.WG.Add(1)
 427  	go n.PeerHandler()
 428  	if n.NAT != nil {
 429  		n.WG.Add(1)
 430  		go n.UPNPUpdateThread()
 431  	}
 432  	if n.Config.DisableRPC.False() {
 433  		n.WG.Add(1)
 434  		// Start the rebroadcastHandler, which ensures user tx received by the RPC server are rebroadcast until being
 435  		// included in a block.
 436  		go n.RebroadcastHandler()
 437  		for i := range n.RPCServers {
 438  			n.RPCServers[i].Start()
 439  		}
 440  	}
 441  	// // Start the CPU miner if generation is enabled.
 442  	// if *n.Config.Generate && *n.Config.GenThreads != 0 {
 443  	// 	D.Ln("starting miner")
 444  	// 	args := []string{os.Args[0], "-D", *n.Config.DataDir}
 445  	// 	if *n.Config.KopachGUI {
 446  	// 		args = append(args, "--kopachgui")
 447  	// 	}
 448  	// 	args = append(args, "kopach")
 449  	// 	// args = apputil.PrependForWindows(args)
 450  	// 	n.StateCfg.Miner = consume.Log(n.Quit, func(ent *log.Entry) (e error) {
 451  	// 		D.Ln(ent.Level, ent.Time, ent.Text, ent.CodeLocation)
 452  	// 		return
 453  	// 	}, func(pkg string) (out bool) {
 454  	// 		return false
 455  	// 	}, args...)
 456  	// 	consume.Start(n.StateCfg.Miner)
 457  	// 	// defer consume.Kill(n.StateCfg.Miner)
 458  	// }
 459  	// interrupt.AddHandler(func() {
 460  	// 	// Stop the CPU miner if needed
 461  	// 	consume.Kill(n.StateCfg.Miner)
 462  	// 	D.Ln("miner has stopped")
 463  	// })
 464  }
 465  
 466  // Stop gracefully shuts down the server by stopping and disconnecting all peers and the main listener.
 467  func (n *Node) Stop() (e error) {
 468  	D.Ln("stopping chain rpc server")
 469  	// Make sure this only happens once.
 470  	if atomic.AddInt32(&n.Shutdown, 1) != 1 {
 471  		D.Ln("server is already in the process of shutting down")
 472  		return nil
 473  	}
 474  	T.Ln("node shutting down")
 475  
 476  	// Shutdown the RPC server if it'n not disabled.
 477  	if !n.Config.DisableRPC.True() {
 478  		for i := range n.RPCServers {
 479  			e = n.RPCServers[i].Stop()
 480  			if e != nil {
 481  			}
 482  		}
 483  	}
 484  	// Save fee estimator state in the database.
 485  	if e = n.DB.Update(
 486  		func(tx database.Tx) (e error) {
 487  			metadata := tx.Metadata()
 488  			if e = metadata.Put(mempool.EstimateFeeDatabaseKey, n.FeeEstimator.Save()); E.Chk(e) {
 489  			}
 490  			return nil
 491  		},
 492  	); E.Chk(e) {
 493  	}
 494  	// Stop the CPU miner if needed
 495  	// consume.Kill(n.StateCfg.Miner)
 496  	// D.Ln("miner has stopped")
 497  	D.Ln("Signal the remaining goroutines to quit.")
 498  	n.Quit.Q()
 499  	return
 500  }
 501  
 502  // TransactionConfirmed has one confirmation on the main chain. Now we can mark it as no longer needing rebroadcasting.
 503  func (n *Node) TransactionConfirmed(tx *util.Tx) {
 504  	// Rebroadcasting is only necessary when the RPC server is active.
 505  	for i := range n.RPCServers {
 506  		if n.RPCServers[i] == nil {
 507  			return
 508  		}
 509  	}
 510  	iv := wire.NewInvVect(wire.InvTypeTx, tx.Hash())
 511  	n.RemoveRebroadcastInventory(iv)
 512  }
 513  
 514  // UpdatePeerHeights updates the heights of all peers who have have announced the latest connected main chain block, or
 515  // a recognized orphan.
 516  //
 517  // These height updates allow us to dynamically refresh peer heights, ensuring sync peer selection has access to the
 518  // latest block heights for each peer.
 519  func (n *Node) UpdatePeerHeights(
 520  	latestBlkHash *chainhash.Hash,
 521  	latestHeight int32, updateSource *peer.Peer,
 522  ) {
 523  	n.PeerHeightsUpdate <- UpdatePeerHeightsMsg{
 524  		NewHash:    latestBlkHash,
 525  		NewHeight:  latestHeight,
 526  		OriginPeer: updateSource,
 527  	}
 528  }
 529  
 530  // WaitForShutdown blocks until the main listener and peer handlers are stopped.
 531  func (n *Node) WaitForShutdown() {
 532  	n.WG.Wait()
 533  }
 534  
 535  // HandleAddPeerMsg deals with adding new peers. It is invoked from the peerHandler goroutine.
 536  func (n *Node) HandleAddPeerMsg(state *PeerState, sp *NodePeer) bool {
 537  	I.Ln("HandleAddPeerMsg")
 538  	if sp == nil {
 539  		return false
 540  	}
 541  	// Ignore new peers if we're shutting down.
 542  	if atomic.LoadInt32(&n.Shutdown) != 0 {
 543  		I.F("new peer %n ignored - server is shutting down", sp)
 544  		sp.Disconnect()
 545  		return false
 546  	}
 547  	// Disconnect banned peers.
 548  	var host string
 549  	var e error
 550  	host, _, e = net.SplitHostPort(sp.Addr())
 551  	if e != nil {
 552  		E.Ln("can't split host/port", e)
 553  		sp.Disconnect()
 554  		return false
 555  	}
 556  	if banEnd, ok := state.Banned[host]; ok {
 557  		if time.Now().Before(banEnd) {
 558  			D.F(
 559  				"peer %n is banned for another %v - disconnecting %n",
 560  				host, time.Until(banEnd),
 561  			)
 562  			sp.Disconnect()
 563  			return false
 564  		}
 565  		I.F("peer %n is no longer banned", host)
 566  		delete(state.Banned, host)
 567  	}
 568  	// TODO: Chk for max peers from a single IP.
 569  
 570  	// Limit max number of total peers.
 571  	if state.Count() >= n.Config.MaxPeers.V() {
 572  		I.F(
 573  			"max peers reached [%d] - disconnecting peer %n",
 574  			n.Config.MaxPeers, sp.Addr(),
 575  		)
 576  		sp.Disconnect()
 577  		// TODO: how to handle permanent peers here? they should be rescheduled.
 578  		return false
 579  	}
 580  	// I.S(state.InboundPeers)
 581  	// I.S(state.OutboundPeers)
 582  	// I.S(state.PersistentPeers)
 583  
 584  	// for i := range state.InboundPeers {
 585  	// 	if state.InboundPeers[i].LocalAddr() == sp.Addr() {
 586  	//
 587  	// 	}
 588  	// }
 589  	// for i := range state.OutboundPeers {
 590  	//
 591  	// }
 592  
 593  	// 		D.Ln(
 594  	// 			state.OutboundPeers[i].UserAgent(),
 595  	// 			state.InboundPeers[j].UserAgent(),
 596  	// 			sp.UserAgent(),
 597  	// 			state.OutboundPeers[i].LocalAddr().String(),
 598  	// 			state.InboundPeers[j].LocalAddr().String(),
 599  	// 			sp.Addr(),
 600  	// 			state.OutboundPeers[i].Addr(),
 601  	// 			state.InboundPeers[j].Addr(),
 602  	// 			sp.Addr(),
 603  	// 		)
 604  	// 		if strings.Contains(sp.UserAgent(), "nonce") &&
 605  	// 			state.OutboundPeers[i].UserAgent() == sp.UserAgent() ||
 606  	// 			state.InboundPeers[j].UserAgent() == sp.UserAgent() ||
 607  	// 			state.OutboundPeers[i].LocalAddr().String() == sp.Addr() ||
 608  	// 			state.InboundPeers[j].LocalAddr().String() == sp.Addr() ||
 609  	// 			state.OutboundPeers[i].Addr() == sp.Addr() ||
 610  	// 			state.InboundPeers[j].Addr() == sp.Addr() {
 611  	// 			D.Ln("already have connection to peer with UAC", sp.UserAgent())
 612  	//
 613  	// 			sp.Disconnect()
 614  	// 		}
 615  	// for i := range state.InboundPeers {
 616  	// 	D.Ln("inbound peer:", state.InboundPeers[i].LocalAddr(), state.InboundPeers[i].Addr())
 617  	// }
 618  	// for i := range state.OutboundPeers {
 619  	// 	D.Ln("outbound peer:", state.OutboundPeers[i].LocalAddr(), state.OutboundPeers[i].Addr())
 620  	// }
 621  	// Add the new peer and start it.
 622  	D.Ln("new peer", sp.UserAgent(), sp.Addr(), sp.LocalAddr(), sp.Inbound())
 623  	if sp.Inbound() {
 624  		state.InboundPeers[sp.ID()] = sp
 625  	} else {
 626  		state.OutboundGroups[addrmgr.GroupKey(sp.NA())]++
 627  		if sp.Persistent {
 628  			state.PersistentPeers[sp.ID()] = sp
 629  		} else {
 630  			state.OutboundPeers[sp.ID()] = sp
 631  		}
 632  	}
 633  	return true
 634  }
 635  
 636  // HandleBanPeerMsg deals with banning peers. It is invoked from the peerHandler goroutine.
 637  func (n *Node) HandleBanPeerMsg(state *PeerState, sp *NodePeer) {
 638  	var host string
 639  	var e error
 640  	host, _, e = net.SplitHostPort(sp.Addr())
 641  	if e != nil {
 642  		E.F("can't split ban peer %n %v %n", sp.Addr(), e)
 643  		return
 644  	}
 645  	direction := log.DirectionString(sp.Inbound())
 646  	I.F("banned peer %n (%n) for %v", host, direction, *n.Config.BanDuration)
 647  	state.Banned[host] = time.Now().Add(n.Config.BanDuration.V())
 648  }
 649  
 650  // HandleBroadcastMsg deals with broadcasting messages to peers. It is invoked from the peerHandler goroutine.
 651  func (n *Node) HandleBroadcastMsg(state *PeerState, bmsg *BroadcastMsg) {
 652  	state.ForAllPeers(
 653  		func(sp *NodePeer) {
 654  			if !sp.Connected() {
 655  				return
 656  			}
 657  			for _, ep := range bmsg.ExcludePeers {
 658  				if sp == ep {
 659  					return
 660  				}
 661  			}
 662  			sp.QueueMessage(bmsg.Message, nil)
 663  		},
 664  	)
 665  }
 666  
 667  // HandleDonePeerMsg deals with peers that have signalled they are done. It is invoked from the peerHandler goroutine.
 668  func (n *Node) HandleDonePeerMsg(state *PeerState, sp *NodePeer) {
 669  	var list map[int32]*NodePeer
 670  	switch {
 671  	case sp.Persistent:
 672  		list = state.PersistentPeers
 673  	case sp.Inbound():
 674  		list = state.InboundPeers
 675  	default:
 676  		list = state.OutboundPeers
 677  	}
 678  	if _, ok := list[sp.ID()]; ok {
 679  		if !sp.Inbound() && sp.VersionKnown() {
 680  			state.OutboundGroups[addrmgr.GroupKey(sp.NA())]--
 681  		}
 682  		if !sp.Inbound() && sp.ConnReq != nil {
 683  			n.ConnManager.Disconnect(sp.ConnReq.ID())
 684  		}
 685  		delete(list, sp.ID())
 686  		T.Ln("removed peer ", sp)
 687  		return
 688  	}
 689  	if sp.ConnReq != nil {
 690  		n.ConnManager.Disconnect(sp.ConnReq.ID())
 691  	}
 692  	// Update the address' last seen time if the peer has acknowledged our version and has sent us its version as well.
 693  	if sp.VerAckReceived() && sp.VersionKnown() && sp.NA() != nil {
 694  		n.AddrManager.Connected(sp.NA())
 695  	}
 696  	// If we get here it means that either we didn't know about the peer or we purposefully deleted it.
 697  }
 698  
 699  // HandleQuery is the central handler for all queries and commands from other goroutines related to peer state.
 700  //
 701  // Previously this counts two if the same node was connected outbound and then connected back inbound. The nonce given
 702  // in a Version message is now added to the Peer struct and then as this iterates the connected peers list, it adds
 703  // nonces from Peers marked connected to a map, thus excluding double-counting, and returns this value.
 704  //
 705  // No idea why it was not written to exclude keeping multiple peers open like this, since a connection is a duplex
 706  // channel, but at least now the ConnectedCount query will provide the correct numbers (this was changed in order to
 707  // allow identifying local area network nodes so a non-internet test environment can be created
 708  func (n *Node) HandleQuery(state *PeerState, querymsg interface{}) {
 709  	switch msg := querymsg.(type) {
 710  	case GetConnCountMsg:
 711  		nonces := make(map[string]struct{})
 712  		nonce := ""
 713  		state.ForAllPeers(
 714  			func(sp *NodePeer) {
 715  				// D.Ln(sp.UserAgent())
 716  				ua := strings.Split(sp.UserAgent(), "nonce")
 717  				if len(ua) < 2 {
 718  					nonce = fmt.Sprintf("%s/%s", sp.Peer.LocalAddr().String(), sp.Peer.Addr())
 719  				} else {
 720  					nonce = fmt.Sprintf(
 721  						"%s/%s", ua[1][:8],
 722  						strings.Split(sp.Peer.LocalAddr().String(), ":")[0],
 723  					)
 724  				}
 725  				_, ok := nonces[nonce]
 726  				if !ok {
 727  					if sp.Connected() {
 728  						nonces[nonce] = struct{}{}
 729  						// nconnected++
 730  					}
 731  				}
 732  			},
 733  		)
 734  		// D.Ln(nonces)
 735  		msg.Reply <- int32(len(nonces))
 736  	case GetPeersMsg:
 737  		peers := make([]*NodePeer, 0, state.Count())
 738  		state.ForAllPeers(
 739  			func(sp *NodePeer) {
 740  				if !sp.Connected() {
 741  					return
 742  				}
 743  				peers = append(peers, sp)
 744  			},
 745  		)
 746  		msg.Reply <- peers
 747  	case ConnectNodeMsg:
 748  		// TODO: duplicate oneshots? Limit max number of total peers.
 749  		if state.Count() >= n.Config.MaxPeers.V() {
 750  			msg.Reply <- errors.New("max peers reached")
 751  			return
 752  		}
 753  		for _, nodePeer := range state.PersistentPeers {
 754  			if nodePeer.Addr() == msg.Addr || nodePeer.Peer.LocalAddr().String() == msg.Addr {
 755  				if msg.Permanent {
 756  					msg.Reply <- errors.New("nodePeer already connected")
 757  				} else {
 758  					msg.Reply <- errors.New("nodePeer exists as a permanent nodePeer")
 759  				}
 760  				return
 761  			}
 762  		}
 763  		for _, nodePeer := range state.InboundPeers {
 764  			if nodePeer.Addr() == msg.Addr || nodePeer.Peer.LocalAddr().String() == msg.Addr {
 765  				if msg.Permanent {
 766  					msg.Reply <- errors.New("nodePeer already connected inbound")
 767  				} else {
 768  					msg.Reply <- errors.New("nodePeer exists as a permanent nodePeer inbound")
 769  				}
 770  				return
 771  			}
 772  		}
 773  		for _, nodePeer := range state.OutboundPeers {
 774  			if nodePeer.Addr() == msg.Addr || nodePeer.Peer.LocalAddr().String() == msg.Addr {
 775  				if msg.Permanent {
 776  					msg.Reply <- errors.New("nodePeer already connected outbound inbound")
 777  				} else {
 778  					msg.Reply <- errors.New("nodePeer exists as a permanent nodePeer")
 779  				}
 780  				return
 781  			}
 782  		}
 783  		netAddr, e := AddrStringToNetAddr(n.Config, n.StateCfg, msg.Addr)
 784  		if e != nil {
 785  			msg.Reply <- e
 786  			return
 787  		}
 788  		// TODO: if too many, nuke a non-perm nodePeer.
 789  		go n.ConnManager.Connect(
 790  			&connmgr.ConnReq{
 791  				Addr:      netAddr,
 792  				Permanent: msg.Permanent,
 793  			},
 794  		)
 795  		msg.Reply <- nil
 796  	case RemoveNodeMsg:
 797  		found := DisconnectPeer(
 798  			state.PersistentPeers, msg.Cmp, func(sp *NodePeer) {
 799  				// Keep group counts ok since we remove from the list now.
 800  				state.OutboundGroups[addrmgr.GroupKey(sp.NA())]--
 801  			},
 802  		)
 803  		if found {
 804  			msg.Reply <- nil
 805  		} else {
 806  			msg.Reply <- errors.New("nodePeer not found")
 807  		}
 808  	case GetOutboundGroup:
 809  		count, ok := state.OutboundGroups[msg.Key]
 810  		if ok {
 811  			msg.Reply <- count
 812  		} else {
 813  			msg.Reply <- 0
 814  		}
 815  	// Request a list of the persistent (added) peers.
 816  	case GetAddedNodesMsg:
 817  		// Respond with a slice of the relevant peers.
 818  		peers := make([]*NodePeer, 0, len(state.PersistentPeers))
 819  		for _, sp := range state.PersistentPeers {
 820  			peers = append(peers, sp)
 821  		}
 822  		msg.Reply <- peers
 823  	case DisconnectNodeMsg:
 824  		// Chk inbound peers. We pass a nil callback since we don't require any additional actions on disconnect for
 825  		// inbound peers.
 826  		found := DisconnectPeer(state.InboundPeers, msg.Cmp, nil)
 827  		if found {
 828  			msg.Reply <- nil
 829  			return
 830  		}
 831  		// Chk outbound peers.
 832  		found = DisconnectPeer(
 833  			state.OutboundPeers, msg.Cmp, func(sp *NodePeer) {
 834  				// Keep group counts ok since we remove from the list now.
 835  				state.OutboundGroups[addrmgr.GroupKey(sp.NA())]--
 836  			},
 837  		)
 838  		if found {
 839  			// If there are multiple outbound connections to the same ip:port, continue disconnecting them all until no
 840  			// such peers are found.
 841  			for found {
 842  				found = DisconnectPeer(
 843  					state.OutboundPeers, msg.Cmp,
 844  					func(sp *NodePeer) {
 845  						state.OutboundGroups[addrmgr.GroupKey(sp.NA())]--
 846  					},
 847  				)
 848  			}
 849  			msg.Reply <- nil
 850  			return
 851  		}
 852  		msg.Reply <- errors.New("nodePeer not found")
 853  	}
 854  }
 855  
 856  // HandleRelayInvMsg deals with relaying inventory to peers that are not already known to have it. It is invoked from
 857  // the peerHandler goroutine.
 858  func (n *Node) HandleRelayInvMsg(state *PeerState, msg RelayMsg) {
 859  	state.ForAllPeers(
 860  		func(sp *NodePeer) {
 861  			if !sp.Connected() {
 862  				return
 863  			}
 864  			// If the inventory is a block and the peer prefers headers, generate and send a headers message instead of an
 865  			// inventory message.
 866  			if msg.InvVect.Type == wire.InvTypeBlock && sp.WantsHeaders() {
 867  				blockHeader, ok := msg.Data.(wire.BlockHeader)
 868  				if !ok {
 869  					W.Ln("underlying data for headers is not a block header")
 870  					return
 871  				}
 872  				msgHeaders := wire.NewMsgHeaders()
 873  				if e := msgHeaders.AddBlockHeader(&blockHeader); E.Chk(e) {
 874  					E.Ln("failed to add block header:", e)
 875  					return
 876  				}
 877  				sp.QueueMessage(msgHeaders, nil)
 878  				return
 879  			}
 880  			if msg.InvVect.Type == wire.InvTypeTx {
 881  				// Don't relay the transaction to the peer when it has transaction relaying disabled.
 882  				if sp.IsRelayTxDisabled() {
 883  					return
 884  				}
 885  				txD, ok := msg.Data.(*mempool.TxDesc)
 886  				if !ok {
 887  					W.F("underlying data for tx inv relay is not a *mempool.TxDesc: %Ter", msg.Data)
 888  					return
 889  				}
 890  				// Don't relay the transaction if the transaction fee-per-kb is less than the peer'n feefilter.
 891  				feeFilter := atomic.LoadInt64(&sp.FeeFilter)
 892  				if feeFilter > 0 && txD.FeePerKB < feeFilter {
 893  					return
 894  				}
 895  				// Don't relay the transaction if there is a bloom filter loaded and the transaction doesn't match it.
 896  				if sp.Filter.IsLoaded() {
 897  					if !sp.Filter.MatchTxAndUpdate(txD.Tx) {
 898  						return
 899  					}
 900  				}
 901  			}
 902  			// Queue the inventory to be relayed with the next batch. It will be ignored if the peer is already known to
 903  			// have the inventory.
 904  			sp.QueueInventory(msg.InvVect)
 905  		},
 906  	)
 907  }
 908  
 909  // HandleUpdatePeerHeights updates the heights of all peers who were known to announce a block we recently accepted.
 910  func (n *Node) HandleUpdatePeerHeights(
 911  	state *PeerState,
 912  	umsg UpdatePeerHeightsMsg,
 913  ) {
 914  	state.ForAllPeers(
 915  		func(sp *NodePeer) {
 916  			// The origin peer should already have the updated height.
 917  			if sp.Peer == umsg.OriginPeer {
 918  				return
 919  			}
 920  			// This is a pointer to the underlying memory which doesn't change.
 921  			latestBlkHash := sp.LastAnnouncedBlock()
 922  			// Skip this peer if it hasn't recently announced any new blocks.
 923  			if latestBlkHash == nil {
 924  				return
 925  			}
 926  			// If the peer has recently announced a block, and this block matches our newly accepted block, then update
 927  			// their block height.
 928  			if *latestBlkHash == *umsg.NewHash {
 929  				sp.UpdateLastBlockHeight(umsg.NewHeight)
 930  				sp.UpdateLastAnnouncedBlock(nil)
 931  			}
 932  		},
 933  	)
 934  }
 935  
 936  // InboundPeerConnected is invoked by the connection manager when a new inbound connection is established. It
 937  // initializes a new inbound server peer instance, associates it with the connection, and starts a goroutine to wait for
 938  // disconnection.
 939  func (n *Node) InboundPeerConnected(conn net.Conn) {
 940  	ca := conn.RemoteAddr().String()
 941  	var h string
 942  	var e error
 943  	if h, _, e = net.SplitHostPort(ca); E.Chk(e) {
 944  	}
 945  	remoteIP := net.ParseIP(h)
 946  	cla := conn.LocalAddr().String()
 947  	var hh string
 948  	if hh, _, e = net.SplitHostPort(cla); E.Chk(e) {
 949  	}
 950  	localIP := net.ParseIP(hh)
 951  	I.Ln("inbound peer connected", ca, cla, remoteIP)
 952  	sp := NewServerPeer(n, localIP, false)
 953  	sp.IsWhitelisted = GetIsWhitelisted(n.StateCfg, conn.RemoteAddr())
 954  	sp.Peer = peer.NewInboundPeer(NewPeerConfig(sp))
 955  	_ = sp.AssociateConnection(conn)
 956  	go n.PeerDoneHandler(sp)
 957  	// go func() {
 958  	// 	msg := <-msgChan
 959  	// 	n.peerState.ForAllPeers(
 960  	// 		func(np *NodePeer) {
 961  	// 			if np.IP.Equal(msg.AddrMe.IP) && np.Port == msg.AddrMe.Port && np.ID() != sp.ID() {
 962  	// 				I.Ln("disconnecting inbound peer we are connected outbound to")
 963  	// 				sp.Disconnect()
 964  	// 			}
 965  	// 			// check also that the address of origin matches the one in the message, if this
 966  	// 			// is wrong and not zero it is being spoofed and is suspicious
 967  	// 			if !msg.AddrMe.IP.Equal(net.IP{0, 0, 0, 0}) && msg.AddrYou.IP.String() != remoteIP.String() {
 968  	// 				W.Ln("disconnecting peer", remoteIP, "who is sending message with non-matching IP", msg.AddrYou.IP)
 969  	// 				sp.Disconnect()
 970  	// 			}
 971  	// 		},
 972  	// 	)
 973  	// }()
 974  }
 975  
 976  // OutboundPeerConnected is invoked by the connection manager when a new
 977  // outbound connection is established. It initializes a new outbound server peer
 978  // instance, associates it with the relevant state such as the connection
 979  // request instance and the connection itself, and finally notifies the address
 980  // manager of the attempt.
 981  //
 982  // TODO: the serverpeer should attach to the same connection as the inbound
 983  //  connection?
 984  func (n *Node) OutboundPeerConnected(c *connmgr.ConnReq, conn net.Conn) {
 985  	ca := conn.RemoteAddr().String()
 986  	cla := conn.LocalAddr().String()
 987  	I.Ln("outbound peer connected", ca, cla)
 988  	var hh string
 989  	var e error
 990  	if hh, _, e = net.SplitHostPort(cla); E.Chk(e) {
 991  	}
 992  	localIP := net.ParseIP(hh)
 993  	sp := NewServerPeer(n, localIP, c.Permanent)
 994  	p, e := peer.NewOutboundPeer(NewPeerConfig(sp), c.Addr.String())
 995  	if e != nil {
 996  		E.F("cannot create outbound peer %n: %v %n", c.Addr, e)
 997  		n.ConnManager.Disconnect(c.ID())
 998  	}
 999  	sp.Peer = p
1000  	sp.ConnReq = c
1001  	sp.IsWhitelisted = GetIsWhitelisted(n.StateCfg, conn.RemoteAddr())
1002  	_ = sp.AssociateConnection(conn)
1003  	go n.PeerDoneHandler(sp)
1004  	// go func() {
1005  	// 	msg := <-msgChan
1006  	// 	n.peerState.ForAllPeers(
1007  	// 		func(np *NodePeer) {
1008  	// 			if np.IP.Equal(msg.AddrMe.IP) && np.Port == msg.AddrMe.Port && np.ID() != sp.ID() {
1009  	// 				I.Ln("disconnecting outbound peer we are connected inbound to")
1010  	// 				sp.Disconnect()
1011  	// 			}
1012  	// 		},
1013  	// 	)
1014  	// }()
1015  	n.AddrManager.Attempt(sp.NA())
1016  }
1017  
1018  // PeerDoneHandler handles peer disconnects by notifiying the server that it's done along with other performing other
1019  // desirable cleanup.
1020  func (n *Node) PeerDoneHandler(sp *NodePeer) {
1021  	sp.WaitForDisconnect()
1022  	n.DonePeers <- sp
1023  	// Only tell sync manager we are gone if we ever told it we existed.
1024  	if sp.VersionKnown() {
1025  		n.SyncManager.DonePeer(sp.Peer)
1026  		// Evict any remaining orphans that were sent by the peer.
1027  		numEvicted := n.TxMemPool.RemoveOrphansByTag(mempool.Tag(sp.ID()))
1028  		if numEvicted > 0 {
1029  			D.F(
1030  				"Evicted %d %n from peer %v (id %d)",
1031  				numEvicted, log.PickNoun(int(numEvicted), "orphan", "orphans"),
1032  				sp, sp.ID(),
1033  			)
1034  		}
1035  	}
1036  	sp.Quit.Q()
1037  }
1038  
1039  // PeerHandler is used to handle peer operations such as adding and removing peers to and from the server, banning
1040  // peers, and broadcasting messages to peers. It must be run in a goroutine.
1041  func (n *Node) PeerHandler() {
1042  	// Start the address manager and sync manager, both of which are needed by peers. This is done here since their
1043  	// lifecycle is closely tied to this handler and rather than adding more channels to synchronize things, it's easier
1044  	// and slightly faster to simply start and stop them in this handler.
1045  	n.AddrManager.Start()
1046  	n.SyncManager.Start()
1047  	T.Ln("starting peer handler")
1048  	n.peerState = &PeerState{
1049  		InboundPeers:    make(map[int32]*NodePeer),
1050  		PersistentPeers: make(map[int32]*NodePeer),
1051  		OutboundPeers:   make(map[int32]*NodePeer),
1052  		Banned:          make(map[string]time.Time),
1053  		OutboundGroups:  make(map[string]int),
1054  	}
1055  	if !n.Config.DisableDNSSeed.True() || len(n.Config.ConnectPeers.S()) < 0 {
1056  		// Add peers discovered through DNS to the address manager.
1057  		connmgr.SeedFromDNS(
1058  			n.ActiveNet, DefaultRequiredServices,
1059  			Lookup(n.StateCfg), func(addrs []*wire.NetAddress) {
1060  				// Bitcoind uses a lookup of the dns seeder here. This is rather strange since the values looked up by
1061  				// the DNS seed lookups will vary quite a lot. To replicate this behaviour we put all addresses as
1062  				// having come from the first one.
1063  				D.Ln("adding addresses")
1064  				n.AddrManager.AddAddresses(addrs, addrs[0])
1065  			},
1066  		)
1067  	}
1068  	T.Ln("starting connmgr")
1069  	go n.ConnManager.Start()
1070  out:
1071  	for {
1072  		select {
1073  		// queries for current peer summary list
1074  		case qc := <-n.PeerState:
1075  			go func() {
1076  				T.Ln("handling peer summary query")
1077  				// flatten the list of
1078  				res := make(map[int32]*NodePeer)
1079  				for i := range n.peerState.InboundPeers {
1080  					res[i] = n.peerState.InboundPeers[i]
1081  				}
1082  				for i := range n.peerState.OutboundPeers {
1083  					res[i] = n.peerState.OutboundPeers[i]
1084  				}
1085  				for i := range n.peerState.PersistentPeers {
1086  					res[i] = n.peerState.PersistentPeers[i]
1087  				}
1088  				var ps peersummary.PeerSummaries
1089  				for i := range res {
1090  					if res[i].Connected() {
1091  						ps = append(
1092  							ps, peersummary.PeerSummary{
1093  								IP:      res[i].Peer.NA().IP,
1094  								Inbound: res[i].Inbound(),
1095  							},
1096  						)
1097  					}
1098  				}
1099  				// send back the answer
1100  				T.Ln("sending back peer summary")
1101  				// D.S(ps)
1102  				qc <- ps
1103  			}()
1104  		// New peers connected to the server.
1105  		case p := <-n.NewPeers:
1106  			n.HandleAddPeerMsg(n.peerState, p)
1107  		// Disconnected peers.
1108  		case p := <-n.DonePeers:
1109  			n.HandleDonePeerMsg(n.peerState, p)
1110  		// Block accepted in mainchain or orphan, update peer height.
1111  		case umsg := <-n.PeerHeightsUpdate:
1112  			n.HandleUpdatePeerHeights(n.peerState, umsg)
1113  		// Peer to ban.
1114  		case p := <-n.BanPeers:
1115  			n.HandleBanPeerMsg(n.peerState, p)
1116  		// New inventory to potentially be relayed to other peers.
1117  		case invMsg := <-n.RelayInv:
1118  			n.HandleRelayInvMsg(n.peerState, invMsg)
1119  		// Message to broadcast to all connected peers except those which are excluded by the message.
1120  		case bmsg := <-n.Broadcast:
1121  			n.HandleBroadcastMsg(n.peerState, &bmsg)
1122  		case qmsg := <-n.Query:
1123  			n.HandleQuery(n.peerState, qmsg)
1124  		case <-n.Quit.Wait():
1125  			D.Ln("chain peer server shutting down")
1126  			// Disconnect all peers on server shutdown.
1127  			n.peerState.ForAllPeers(
1128  				func(sp *NodePeer) {
1129  					T.F("shutdown peer %n", sp.Addr())
1130  					sp.Disconnect()
1131  				},
1132  			)
1133  			break out
1134  		}
1135  	}
1136  	n.ConnManager.Stop()
1137  	e := n.SyncManager.Stop()
1138  	if e != nil {
1139  	}
1140  	e = n.AddrManager.Stop()
1141  	if e != nil {
1142  	}
1143  	// Drain channels before exiting so nothing is left waiting around to send.
1144  cleanup:
1145  	for {
1146  		select {
1147  		case <-n.NewPeers:
1148  		case <-n.DonePeers:
1149  		case <-n.PeerHeightsUpdate:
1150  		case <-n.RelayInv:
1151  		case <-n.Broadcast:
1152  		case <-n.Query:
1153  		default:
1154  			break cleanup
1155  		}
1156  	}
1157  	n.WG.Done()
1158  	T.F("peer handler done")
1159  }
1160  
1161  // PushBlockMsg sends a block message for the provided block hash to the connected peer. An error is returned if the
1162  // block hash is not known.
1163  func (n *Node) PushBlockMsg(
1164  	sp *NodePeer, hash *chainhash.Hash,
1165  	doneChan chan<- struct{}, waitChan qu.C,
1166  	encoding wire.MessageEncoding,
1167  ) (e error) {
1168  	// Fetch the raw block bytes from the database.
1169  	var blockBytes []byte
1170  	e = sp.Server.DB.View(
1171  		func(dbTx database.Tx) (e error) {
1172  			blockBytes, e = dbTx.FetchBlock(hash)
1173  			return e
1174  		},
1175  	)
1176  	if e != nil {
1177  		E.F(
1178  			"unable to fetch requested block hash %v: %v",
1179  			hash, e,
1180  		)
1181  		if doneChan != nil {
1182  			doneChan <- struct{}{}
1183  		}
1184  		return e
1185  	}
1186  	// Deserialize the block.
1187  	var msgBlock wire.Block
1188  	e = msgBlock.Deserialize(bytes.NewReader(blockBytes))
1189  	if e != nil {
1190  		E.F(
1191  			"unable to deserialize requested block hash %v: %v",
1192  			hash, e,
1193  		)
1194  		if doneChan != nil {
1195  			doneChan <- struct{}{}
1196  		}
1197  		return e
1198  	}
1199  	// Once we have fetched data wait for any previous operation to finish.
1200  	if waitChan != nil {
1201  		<-waitChan
1202  	}
1203  	// We only send the channel for this message if we aren't sending an inv straight after.
1204  	var dc chan<- struct{}
1205  	continueHash := sp.ContinueHash
1206  	sendInv := continueHash != nil && continueHash.IsEqual(hash)
1207  	if !sendInv {
1208  		dc = doneChan
1209  	}
1210  	sp.QueueMessageWithEncoding(&msgBlock, dc, encoding)
1211  	// When the peer requests the final block that was advertised in response to a getblocks message which requested
1212  	// more blocks than would fit into a single message, send it a new inventory message to trigger it to issue another
1213  	// getblocks message for the next batch of inventory.
1214  	if sendInv {
1215  		best := sp.Server.Chain.BestSnapshot()
1216  		invMsg := wire.NewMsgInvSizeHint(1)
1217  		iv := wire.NewInvVect(wire.InvTypeBlock, &best.Hash)
1218  		e := invMsg.AddInvVect(iv)
1219  		if e != nil {
1220  		}
1221  		sp.QueueMessage(invMsg, doneChan)
1222  		sp.ContinueHash = nil
1223  	}
1224  	return nil
1225  }
1226  
1227  // PushMerkleBlockMsg sends a merkleblock message for the provided block hash to the connected peer. Since a merkle
1228  // block requires the peer to have a filter loaded, this call will simply be ignored if there is no filter loaded.
1229  //
1230  // An error is returned if the block hash is not known.
1231  func (n *Node) PushMerkleBlockMsg(
1232  	sp *NodePeer, hash *chainhash.Hash,
1233  	doneChan chan<- struct{}, waitChan qu.C,
1234  	encoding wire.MessageEncoding,
1235  ) (e error) {
1236  	// Do not send a response if the peer doesn't have a filter loaded.
1237  	if !sp.Filter.IsLoaded() {
1238  		if doneChan != nil {
1239  			doneChan <- struct{}{}
1240  		}
1241  		return nil
1242  	}
1243  	// Fetch the raw block bytes from the database.
1244  	blk, e := sp.Server.Chain.BlockByHash(hash)
1245  	if e != nil {
1246  		E.F(
1247  			"unable to fetch requested block hash %v: %v",
1248  			hash, e,
1249  		)
1250  		if doneChan != nil {
1251  			doneChan <- struct{}{}
1252  		}
1253  		return e
1254  	}
1255  	// Generate a merkle block by filtering the requested block according to the filter for the peer.
1256  	merkle, matchedTxIndices := bloom.NewMerkleBlock(blk, sp.Filter)
1257  	// Once we have fetched data wait for any previous operation to finish.
1258  	if waitChan != nil {
1259  		<-waitChan
1260  	}
1261  	// Send the merkleblock. Only send the done channel with this message if no transactions will be sent afterwards.
1262  	var dc chan<- struct{}
1263  	if len(matchedTxIndices) == 0 {
1264  		dc = doneChan
1265  	}
1266  	sp.QueueMessage(merkle, dc)
1267  	// Finally, send any matched transactions.
1268  	blkTransactions := blk.WireBlock().Transactions
1269  	for i, txIndex := range matchedTxIndices {
1270  		// Only send the done channel on the final transaction.
1271  		var dc chan<- struct{}
1272  		if i == len(matchedTxIndices)-1 {
1273  			dc = doneChan
1274  		}
1275  		if txIndex < uint32(len(blkTransactions)) {
1276  			sp.QueueMessageWithEncoding(
1277  				blkTransactions[txIndex], dc,
1278  				encoding,
1279  			)
1280  		}
1281  	}
1282  	return nil
1283  }
1284  
1285  // PushTxMsg sends a tx message for the provided transaction hash to the connected peer.
1286  //
1287  // An error is returned if the transaction hash is not known.
1288  func (n *Node) PushTxMsg(
1289  	sp *NodePeer, hash *chainhash.Hash,
1290  	doneChan chan<- struct{}, waitChan qu.C,
1291  	encoding wire.MessageEncoding,
1292  ) (e error) {
1293  	// Attempt to fetch the requested transaction from the pool. A call could be made to check for existence first, but
1294  	// simply trying to fetch a missing transaction results in the same behavior.
1295  	tx, e := n.TxMemPool.FetchTransaction(hash)
1296  	if e != nil {
1297  		E.F("unable to fetch tx %v from transaction pool: %v", hash, e)
1298  		if doneChan != nil {
1299  			doneChan <- struct{}{}
1300  		}
1301  		return e
1302  	}
1303  	// Once we have fetched data wait for any previous operation to finish.
1304  	if waitChan != nil {
1305  		<-waitChan
1306  	}
1307  	sp.QueueMessageWithEncoding(tx.MsgTx(), doneChan, encoding)
1308  	return nil
1309  }
1310  
1311  // RebroadcastHandler keeps track of user submitted inventories that we have sent out but have not yet made it into a
1312  // block. We periodically rebroadcast them in case our peers restarted or otherwise lost track of them.
1313  func (n *Node) RebroadcastHandler() {
1314  	// Wait 5 min before first tx rebroadcast.
1315  	timer := time.NewTimer(5 * time.Minute)
1316  	pendingInvs := make(map[wire.InvVect]interface{})
1317  out:
1318  	for {
1319  		select {
1320  		case riv := <-n.ModifyRebroadcastInv:
1321  			switch msg := riv.(type) {
1322  			// Incoming InvVects are added to our map of RPC txs.
1323  			case BroadcastInventoryAdd:
1324  				pendingInvs[*msg.InvVect] = msg.Data
1325  			// When an InvVect has been added to a block, we can now remove it, if it was present.
1326  			case BroadcastInventoryDel:
1327  				if _, ok := pendingInvs[*msg]; ok {
1328  					delete(pendingInvs, *msg)
1329  				}
1330  			}
1331  		case <-timer.C:
1332  			// Any inventory we have has not made it into a block yet. We periodically resubmit them until they have.
1333  			for iv, data := range pendingInvs {
1334  				ivCopy := iv
1335  				n.RelayInventory(&ivCopy, data)
1336  			}
1337  			// Process at a random time up to 30mins (in seconds) in the future.
1338  			timer.Reset(
1339  				time.Second *
1340  					time.Duration(RandomUint16Number(1800)),
1341  			)
1342  		case <-n.Quit.Wait():
1343  			break out
1344  			// default:
1345  		}
1346  	}
1347  	timer.Stop()
1348  	// Drain channels before exiting so nothing is left waiting around to send.
1349  cleanup:
1350  	for {
1351  		select {
1352  		case <-n.ModifyRebroadcastInv:
1353  		default:
1354  			break cleanup
1355  		}
1356  	}
1357  	n.WG.Done()
1358  }
1359  
1360  // RelayTransactions generates and relays inventory vectors for all of the
1361  // passed transactions to all connected peers.
1362  func (n *Node) RelayTransactions(txns []*mempool.TxDesc) {
1363  	for _, txD := range txns {
1364  		iv := wire.NewInvVect(wire.InvTypeTx, txD.Tx.Hash())
1365  		n.RelayInventory(iv, txD)
1366  	}
1367  }
1368  func (n *Node) UPNPUpdateThread() {
1369  	// Go off immediately to prevent code duplication, thereafter we renew lease
1370  	// every 15 minutes.
1371  	timer := time.NewTimer(0 * time.Second)
1372  	lport, _ := strconv.ParseInt(n.ActiveNet.DefaultPort, 10, 16)
1373  	first := true
1374  out:
1375  	for {
1376  		select {
1377  		case <-timer.C:
1378  			// TODO: pick external port  more cleverly
1379  			// TODO: know which ports we are listening to on an external net.
1380  			// TODO: if specific listen port doesn't work then ask for wildcard
1381  			//  listen port?
1382  			// XXX this assumes timeout is in seconds.
1383  			listenPort, e := n.NAT.AddPortMapping(
1384  				"tcp", int(lport), int(lport), "pod listen port",
1385  				20*60,
1386  			)
1387  			if e != nil {
1388  				E.F("can't add UPnP port mapping: %v %n", e)
1389  			}
1390  			if first && e == nil {
1391  				// TODO: look this up periodically to see if upnp domain changed and so did ip.
1392  				externalip, e := n.NAT.GetExternalAddress()
1393  				if e != nil {
1394  					E.F("UPnP can't get external address: %v", e)
1395  					continue out
1396  				}
1397  				na := wire.NewNetAddressIPPort(externalip, uint16(listenPort), n.Services)
1398  				e = n.AddrManager.AddLocalAddress(na, addrmgr.UpnpPrio)
1399  				if e != nil {
1400  					_ = e
1401  					// XXX DeletePortMapping?
1402  				}
1403  				W.F("successfully bound via UPnP to %n", addrmgr.NetAddressKey(na))
1404  				first = false
1405  			}
1406  			timer.Reset(time.Minute * 15)
1407  		case <-n.Quit.Wait():
1408  			break out
1409  		}
1410  	}
1411  	timer.Stop()
1412  	if e := n.NAT.DeletePortMapping(
1413  		"tcp", int(lport),
1414  		int(lport),
1415  	); E.Chk(e) {
1416  		D.F("unable to remove UPnP port mapping: %v %n", e)
1417  	} else {
1418  		D.Ln("successfully cleared UPnP port mapping")
1419  	}
1420  	n.WG.Done()
1421  }
1422  
1423  // OnAddr is invoked when a peer receives an addr bitcoin message and is used to notify the server about advertised addresses.
1424  func (np *NodePeer) OnAddr(
1425  	_ *peer.Peer,
1426  	msg *wire.MsgAddr,
1427  ) {
1428  	// Ignore addresses when running on the simulation test network. This helps prevent the network from becoming
1429  	// another public test network since it will not be able to learn about other peers that have not specifically been
1430  	// provided.
1431  	if (np.Server.Config.Network.V())[0] == 's' {
1432  		return
1433  	}
1434  	// Ignore old style addresses which don't include a timestamp.
1435  	if np.ProtocolVersion() < wire.NetAddressTimeVersion {
1436  		return
1437  	}
1438  	// A message that has no addresses is invalid.
1439  	if len(msg.AddrList) == 0 {
1440  		E.F(
1441  			"command [%s] from %s does not contain any addresses",
1442  			msg.Command(), np.Peer,
1443  		)
1444  		np.Disconnect()
1445  		return
1446  	}
1447  	for _, na := range msg.AddrList {
1448  		// Don't add more address if we're disconnecting.
1449  		if !np.Connected() {
1450  			return
1451  		}
1452  		// Set the timestamp to 5 days ago if it's more than 24 hours in the future so this address is one of the first
1453  		// to be removed when space is needed.
1454  		now := time.Now()
1455  		if na.Timestamp.After(now.Add(time.Minute * 10)) {
1456  			na.Timestamp = now.Add(-1 * time.Hour * 24 * 5)
1457  		}
1458  		// Add address to known addresses for this peer.
1459  		np.AddKnownAddresses([]*wire.NetAddress{na})
1460  	}
1461  	// Add addresses to server address manager. The address manager handles the details of things such as preventing
1462  	// duplicate addresses, max addresses, and last seen updates. XXX bitcoind gives a 2 hour time penalty here, do we
1463  	// want to do the same?
1464  	np.Server.AddrManager.AddAddresses(msg.AddrList, np.NA())
1465  }
1466  
1467  // OnBlock is invoked when a peer receives a block bitcoin message. It blocks until the bitcoin block has been fully
1468  // processed.
1469  func (np *NodePeer) OnBlock(p *peer.Peer, msg *wire.Block, buf []byte) {
1470  	T.Ln("OnBlock from", p.Addr())
1471  	// Convert the raw Block to a util.Block which provides some convenience
1472  	// methods and things such as hash caching.
1473  	block := block2.NewFromBlockAndBytes(msg, buf)
1474  	// Add the block to the known inventory for the peer.
1475  	iv := wire.NewInvVect(wire.InvTypeBlock, block.Hash())
1476  	np.AddKnownInventory(iv)
1477  	// Queue the block up to be handled by the block manager and intentionally block further receives until the bitcoin
1478  	// block is fully processed and known good or bad.
1479  	//
1480  	// This helps prevent a malicious peer from queuing up a bunch of bad blocks before disconnecting (or being
1481  	// disconnected) and wasting memory.
1482  	//
1483  	// Additionally, this behavior is depended on by at least the block acceptance test tool as the reference
1484  	// implementation processes blocks in the same thread and therefore blocks further messages until the bitcoin block
1485  	// has been fully processed.
1486  	np.Server.SyncManager.QueueBlock(block, np.Peer, np.BlockProcessed)
1487  	<-np.BlockProcessed
1488  }
1489  
1490  // OnFeeFilter is invoked when a peer receives a feefilter bitcoin message and is used by remote peers to request that
1491  // no transactions which have a fee rate lower than provided value are inventoried to them. The peer will be
1492  // disconnected if an invalid fee filter value is provided.
1493  func (np *NodePeer) OnFeeFilter(
1494  	_ *peer.Peer,
1495  	msg *wire.MsgFeeFilter,
1496  ) {
1497  	// Chk that the passed minimum fee is a valid amount.
1498  	if msg.MinFee < 0 || msg.MinFee > int64(amt.MaxSatoshi) {
1499  		D.F(
1500  			"peer %v sent an invalid feefilter '%v' -- disconnecting %s",
1501  			np, amt.Amount(msg.MinFee),
1502  		)
1503  		np.Disconnect()
1504  		return
1505  	}
1506  	atomic.StoreInt64(&np.FeeFilter, msg.MinFee)
1507  }
1508  
1509  // OnFilterAdd is invoked when a peer receives a filteradd bitcoin message and is used by remote peers to add data to an
1510  // already loaded bloom filter. The peer will be disconnected if a filter is not loaded when this message is received or
1511  // the server is not configured to allow bloom filters.
1512  func (np *NodePeer) OnFilterAdd(
1513  	_ *peer.Peer,
1514  	msg *wire.MsgFilterAdd,
1515  ) {
1516  	// Disconnect and/or ban depending on the node bloom services flag and negotiated protocol version.
1517  	if !np.EnforceNodeBloomFlag(msg.Command()) {
1518  		return
1519  	}
1520  	if !np.Filter.IsLoaded() {
1521  		D.F("%s sent a filteradd request with no filter loaded -- disconnecting %s", np)
1522  		np.Disconnect()
1523  		return
1524  	}
1525  	np.Filter.Add(msg.Data)
1526  }
1527  
1528  // OnFilterClear is invoked when a peer receives a filterclear bitcoin message and is used by remote peers to clear an
1529  // already loaded bloom filter. The peer will be disconnected if a filter is not loaded when this message is received or
1530  // the server is not configured to allow bloom filters.
1531  func (np *NodePeer) OnFilterClear(
1532  	_ *peer.Peer,
1533  	msg *wire.MsgFilterClear,
1534  ) {
1535  	// Disconnect and/or ban depending on the node bloom services flag and negotiated protocol version.
1536  	if !np.EnforceNodeBloomFlag(msg.Command()) {
1537  		return
1538  	}
1539  	if !np.Filter.IsLoaded() {
1540  		D.F(
1541  			"%s sent a filterclear request with no filter loaded"+
1542  				" -- disconnecting %s", np,
1543  		)
1544  		np.Disconnect()
1545  		return
1546  	}
1547  	np.Filter.Unload()
1548  }
1549  
1550  // OnFilterLoad is invoked when a peer receives a filterload bitcoin message and it used to load a bloom filter that
1551  // should be used for delivering merkle blocks and associated transactions that match the filter. The peer will be
1552  // disconnected if the server is not configured to allow bloom filters.
1553  func (np *NodePeer) OnFilterLoad(
1554  	_ *peer.Peer,
1555  	msg *wire.MsgFilterLoad,
1556  ) {
1557  	// Disconnect and/or ban depending on the node bloom services flag and negotiated protocol version.
1558  	if !np.EnforceNodeBloomFlag(msg.Command()) {
1559  		return
1560  	}
1561  	np.SetDisableRelayTx(false)
1562  	np.Filter.Reload(msg)
1563  }
1564  
1565  // OnGetAddr is invoked when a peer receives a getaddr bitcoin message and is used to provide the peer with known
1566  // addresses from the address manager.
1567  func (np *NodePeer) OnGetAddr(
1568  	_ *peer.Peer,
1569  	msg *wire.MsgGetAddr,
1570  ) {
1571  	// Don't return any addresses when running on the simulation test network. This helps prevent the network from
1572  	// becoming another public test network since it will not be able to learn about other peers that have not
1573  	// specifically been provided.
1574  	if (np.Server.Config.Network.V())[0] == 's' {
1575  		return
1576  	}
1577  	// Do not accept getaddr requests from outbound peers. This reduces fingerprinting attacks.
1578  	if !np.Inbound() {
1579  		D.Ln("ignoring getaddr request from outbound peer", np)
1580  		return
1581  	}
1582  	// Only allow one getaddr request per connection to discourage address stamping of inv announcements.
1583  	if np.SentAddrs {
1584  		D.F("ignoring repeated getaddr request from peer %s %s", np)
1585  		return
1586  	}
1587  	np.SentAddrs = true
1588  	// Get the current known addresses from the address manager.
1589  	addrCache := np.Server.AddrManager.AddressCache()
1590  	// Push the addresses.
1591  	np.PreparePushAddrMsg(addrCache)
1592  }
1593  
1594  // OnGetBlocks is invoked when a peer receives a getblocks bitcoin message.
1595  func (np *NodePeer) OnGetBlocks(
1596  	_ *peer.Peer,
1597  	msg *wire.MsgGetBlocks,
1598  ) {
1599  	// Find the most recent known block in the best chain based on the block locator and fetch all of the block hashes
1600  	// after it until either wire.MaxBlocksPerMsg have been fetched or the provided stop hash is encountered. Use the
1601  	// block after the genesis block if no other blocks in the provided locator are known.
1602  	//
1603  	// This does mean the client will start over with the genesis block if unknown block locators are provided. This
1604  	// mirrors the behavior in the reference implementation.
1605  	chain := np.Server.Chain
1606  	hashList := chain.LocateBlocks(
1607  		msg.BlockLocatorHashes, &msg.HashStop,
1608  		wire.MaxBlocksPerMsg,
1609  	)
1610  	// Generate inventory message.
1611  	invMsg := wire.NewMsgInv()
1612  	for i := range hashList {
1613  		iv := wire.NewInvVect(wire.InvTypeBlock, &hashList[i])
1614  		e := invMsg.AddInvVect(iv)
1615  		if e != nil {
1616  		}
1617  	}
1618  	// Send the inventory message if there is anything to send.
1619  	if len(invMsg.InvList) > 0 {
1620  		invListLen := len(invMsg.InvList)
1621  		if invListLen == wire.MaxBlocksPerMsg {
1622  			// Intentionally use a copy of the final hash so there is not a reference into the inventory slice which
1623  			// would prevent the entire slice from being eligible for GC as soon as it's sent.
1624  			continueHash := invMsg.InvList[invListLen-1].Hash
1625  			np.ContinueHash = &continueHash
1626  		}
1627  		np.QueueMessage(invMsg, nil)
1628  	}
1629  }
1630  
1631  // OnGetCFCheckpt is invoked when a peer receives a getcfcheckpt bitcoin message.
1632  func (np *NodePeer) OnGetCFCheckpt(
1633  	_ *peer.Peer,
1634  	msg *wire.MsgGetCFCheckpt,
1635  ) {
1636  	// Ignore getcfcheckpt requests if not in sync.
1637  	if !np.Server.SyncManager.IsCurrent() {
1638  		return
1639  	}
1640  	// We'll also ensure that the remote party is requesting a set of checkpoints for filters that we actually currently
1641  	// maintain.
1642  	switch msg.FilterType {
1643  	case wire.GCSFilterRegular:
1644  		break
1645  	default:
1646  		D.Ln(
1647  			"filter request for unknown checkpoints for filter:",
1648  			msg.FilterType,
1649  		)
1650  		return
1651  	}
1652  	// Now that we know the client is fetching a filter that we know of, we'll fetch the block hashes et each check
1653  	// point interval so we can compare against our cache, and create new check points if necessary.
1654  	blockHashes, e := np.Server.Chain.IntervalBlockHashes(
1655  		&msg.StopHash, wire.CFCheckptInterval,
1656  	)
1657  	if e != nil {
1658  		E.Ln("invalid getcfilters request:", e)
1659  		return
1660  	}
1661  	checkptMsg := wire.NewMsgCFCheckpt(
1662  		msg.FilterType, &msg.StopHash, len(blockHashes),
1663  	)
1664  	// Fetch the current existing cache so we can decide if we need to extend it or if its adequate as is.
1665  	np.Server.CFCheckptCachesMtx.RLock()
1666  	checkptCache := np.Server.CFCheckptCaches[msg.FilterType]
1667  	// If the set of block hashes is beyond the current size of the cache, then we'll expand the size of the cache and
1668  	// also retain the write lock.
1669  	var updateCache bool
1670  	if len(blockHashes) > len(checkptCache) {
1671  		// Now that we know we'll need to modify the size of the cache, we'll release the read lock and grab the write
1672  		// lock to possibly expand the cache size.
1673  		np.Server.CFCheckptCachesMtx.RUnlock()
1674  		np.Server.CFCheckptCachesMtx.Lock()
1675  		defer np.Server.CFCheckptCachesMtx.Unlock()
1676  		// Now that we have the write lock, we'll check again as it's possible that the cache has already been expanded.
1677  		checkptCache = np.Server.CFCheckptCaches[msg.FilterType]
1678  		// If we still need to expand the cache, then We'll mark that we need to update the cache for below and also
1679  		// expand the size of the cache in place.
1680  		if len(blockHashes) > len(checkptCache) {
1681  			updateCache = true
1682  			additionalLength := len(blockHashes) - len(checkptCache)
1683  			newEntries := make([]CFHeaderKV, additionalLength)
1684  			I.F(
1685  				"growing size of checkpoint cache from %v to %v block hashes",
1686  				len(checkptCache), len(blockHashes),
1687  			)
1688  			checkptCache = append(np.Server.CFCheckptCaches[msg.FilterType], newEntries...)
1689  		}
1690  	} else {
1691  		// Otherwise, we'll hold onto the read lock for the remainder of this method.
1692  		defer np.Server.CFCheckptCachesMtx.RUnlock()
1693  		T.F("serving stale cache of size %v", len(checkptCache))
1694  	}
1695  	// Now that we know the cache is of an appropriate size, we'll iterate backwards until the find the block hash. We
1696  	// do this as it's possible a re-org has occurred so items in the db are now in the main china while the cache has
1697  	// been partially invalidated.
1698  	var forkIdx int
1699  	for forkIdx = len(blockHashes); forkIdx > 0; forkIdx-- {
1700  		if checkptCache[forkIdx-1].BlockHash == blockHashes[forkIdx-1] {
1701  			break
1702  		}
1703  	}
1704  	// Now that we know the how much of the cache is relevant for this query, we'll populate our check point message
1705  	// with the cache as is. Shortly below, we'll populate the new elements of the cache.
1706  	for i := 0; i < forkIdx; i++ {
1707  		e = checkptMsg.AddCFHeader(&checkptCache[i].FilterHeader)
1708  		if e != nil {
1709  		}
1710  	}
1711  	// We'll now collect the set of hashes that are beyond our cache so we can look up the filter headers to populate
1712  	// the final cache.
1713  	blockHashPtrs := make([]*chainhash.Hash, 0, len(blockHashes)-forkIdx)
1714  	for i := forkIdx; i < len(blockHashes); i++ {
1715  		blockHashPtrs = append(blockHashPtrs, &blockHashes[i])
1716  	}
1717  	filterHeaders, e := np.Server.CFIndex.FilterHeadersByBlockHashes(
1718  		blockHashPtrs, msg.FilterType,
1719  	)
1720  	E.Ln("error retrieving cfilter headers:", e)
1721  	if e != nil {
1722  		return
1723  	}
1724  	// Now that we have the full set of filter headers, we'll add them to the checkpoint message, and also update our
1725  	// cache in line.
1726  	for i, filterHeaderBytes := range filterHeaders {
1727  		if len(filterHeaderBytes) == 0 {
1728  			W.Ln("could not obtain CF header for", blockHashPtrs[i])
1729  			return
1730  		}
1731  		filterHeader, e := chainhash.NewHash(filterHeaderBytes)
1732  		if e != nil {
1733  			E.Ln("committed filter header deserialize failed:", e)
1734  			return
1735  		}
1736  		e = checkptMsg.AddCFHeader(filterHeader)
1737  		if e != nil {
1738  		}
1739  		// If the new main chain is longer than what's in the cache, then we'll override it beyond the fork point.
1740  		if updateCache {
1741  			checkptCache[forkIdx+i] = CFHeaderKV{
1742  				BlockHash:    blockHashes[forkIdx+i],
1743  				FilterHeader: *filterHeader,
1744  			}
1745  		}
1746  	}
1747  	// Finally, we'll update the cache if we need to, and send the final message back to the requesting peer.
1748  	if updateCache {
1749  		np.Server.CFCheckptCaches[msg.FilterType] = checkptCache
1750  	}
1751  	np.QueueMessage(checkptMsg, nil)
1752  }
1753  
1754  // OnGetCFHeaders is invoked when a peer receives a getcfheader bitcoin message.
1755  func (np *NodePeer) OnGetCFHeaders(
1756  	_ *peer.Peer,
1757  	msg *wire.MsgGetCFHeaders,
1758  ) {
1759  	// Ignore getcfilterheader requests if not in sync.
1760  	if !np.Server.SyncManager.IsCurrent() {
1761  		return
1762  	}
1763  	// We'll also ensure that the remote party is requesting a set of headers for filters that we actually currently
1764  	// maintain.
1765  	switch msg.FilterType {
1766  	case wire.GCSFilterRegular:
1767  		break
1768  	default:
1769  		D.Ln("filter request for unknown headers for filter:", msg.FilterType)
1770  		return
1771  	}
1772  	startHeight := int32(msg.StartHeight)
1773  	maxResults := wire.MaxCFHeadersPerMsg
1774  	// If StartHeight is positive, fetch the predecessor block hash so we can populate the PrevFilterHeader field.
1775  	if msg.StartHeight > 0 {
1776  		startHeight--
1777  		maxResults++
1778  	}
1779  	// Fetch the hashes from the block index.
1780  	hashList, e := np.Server.Chain.HeightToHashRange(
1781  		startHeight, &msg.StopHash, maxResults,
1782  	)
1783  	if e != nil {
1784  		E.Ln("invalid getcfheaders request:", e)
1785  	}
1786  	// This is possible if StartHeight is one greater that the height of StopHash, and we pull a valid range of hashes
1787  	// including the previous filter header.
1788  	if len(hashList) == 0 || (msg.StartHeight > 0 && len(hashList) == 1) {
1789  		D.Ln("no results for getcfheaders request")
1790  		return
1791  	}
1792  	// Create []*chainhash.Hash from []chainhash.Hash to pass to FilterHeadersByBlockHashes.
1793  	hashPtrs := make([]*chainhash.Hash, len(hashList))
1794  	for i := range hashList {
1795  		hashPtrs[i] = &hashList[i]
1796  	}
1797  	// Fetch the raw filter hash bytes from the database for all blocks.
1798  	filterHashes, e := np.Server.CFIndex.FilterHashesByBlockHashes(
1799  		hashPtrs, msg.FilterType,
1800  	)
1801  	if e != nil {
1802  		E.Ln("error retrieving cfilter hashes:", e)
1803  		return
1804  	}
1805  	// Generate cfheaders message and send it.
1806  	headersMsg := wire.NewMsgCFHeaders()
1807  	// Populate the PrevFilterHeader field.
1808  	if msg.StartHeight > 0 {
1809  		prevBlockHash := &hashList[0]
1810  		// Fetch the raw committed filter header bytes from the database.
1811  		headerBytes, e := np.Server.CFIndex.FilterHeaderByBlockHash(
1812  			prevBlockHash, msg.FilterType,
1813  		)
1814  		if e != nil {
1815  			E.Ln("error retrieving CF header:", e)
1816  			return
1817  		}
1818  		if len(headerBytes) == 0 {
1819  			W.Ln("could not obtain CF header for", prevBlockHash)
1820  			return
1821  		}
1822  		// Deserialize the hash into PrevFilterHeader.
1823  		e = headersMsg.PrevFilterHeader.SetBytes(headerBytes)
1824  		if e != nil {
1825  			E.Ln("committed filter header deserialize failed:", e)
1826  			return
1827  		}
1828  		hashList = hashList[1:]
1829  		filterHashes = filterHashes[1:]
1830  	}
1831  	// Populate HeaderHashes.
1832  	for i, hashBytes := range filterHashes {
1833  		if len(hashBytes) == 0 {
1834  			W.Ln("could not obtain CF hash for", hashList[i])
1835  			return
1836  		}
1837  		// Deserialize the hash.
1838  		filterHash, e := chainhash.NewHash(hashBytes)
1839  		if e != nil {
1840  			E.Ln("committed filter hash deserialize failed:", e)
1841  			return
1842  		}
1843  		e = headersMsg.AddCFHash(filterHash)
1844  		if e != nil {
1845  		}
1846  	}
1847  	headersMsg.FilterType = msg.FilterType
1848  	headersMsg.StopHash = msg.StopHash
1849  	np.QueueMessage(headersMsg, nil)
1850  }
1851  
1852  // OnGetCFilters is invoked when a peer receives a getcfilters bitcoin message.
1853  func (np *NodePeer) OnGetCFilters(
1854  	_ *peer.Peer,
1855  	msg *wire.MsgGetCFilters,
1856  ) {
1857  	// Ignore getcfilters requests if not in sync.
1858  	if !np.Server.SyncManager.IsCurrent() {
1859  		return
1860  	}
1861  	// We'll also ensure that the remote party is requesting a set of filters that we actually currently maintain.
1862  	switch msg.FilterType {
1863  	case wire.GCSFilterRegular:
1864  		break
1865  	default:
1866  		D.Ln("filter request for unknown filter:", msg.FilterType)
1867  		return
1868  	}
1869  	hashes, e := np.Server.Chain.HeightToHashRange(
1870  		int32(msg.StartHeight), &msg.StopHash, wire.MaxGetCFiltersReqRange,
1871  	)
1872  	if e != nil {
1873  		E.Ln("invalid getcfilters request:", e)
1874  		return
1875  	}
1876  	// Create []*chainhash.Hash from []chainhash.Hash to pass to FiltersByBlockHashes.
1877  	hashPtrs := make([]*chainhash.Hash, len(hashes))
1878  	for i := range hashes {
1879  		hashPtrs[i] = &hashes[i]
1880  	}
1881  	filters, e := np.Server.CFIndex.FiltersByBlockHashes(
1882  		hashPtrs, msg.FilterType,
1883  	)
1884  	if e != nil {
1885  		E.Ln("error retrieving cfilters:", e)
1886  		return
1887  	}
1888  	for i, filterBytes := range filters {
1889  		if len(filterBytes) == 0 {
1890  			W.Ln("could not obtain cfilter for", hashes[i])
1891  			return
1892  		}
1893  		filterMsg := wire.NewMsgCFilter(
1894  			msg.FilterType, &hashes[i], filterBytes,
1895  		)
1896  		np.QueueMessage(filterMsg, nil)
1897  	}
1898  }
1899  
1900  // OnGetData is invoked when a peer receives a getdata bitcoin message and is used to deliver block and transaction
1901  // information.
1902  func (np *NodePeer) OnGetData(_ *peer.Peer, msg *wire.MsgGetData) {
1903  	numAdded := 0
1904  	notFound := wire.NewMsgNotFound()
1905  	length := len(msg.InvList)
1906  	// A decaying ban score increase is applied to prevent exhausting resources with unusually large inventory queries.
1907  	//
1908  	// Requesting more than the maximum inventory vector length within a short period of time yields a score above the
1909  	// default ban threshold.
1910  	//
1911  	// Sustained bursts of small requests are not penalized as that would potentially ban peers performing IBD. This
1912  	// incremental score decays each minute to half of its value.
1913  	if np.AddBanScore(0, uint32(length)*99/wire.MaxInvPerMsg, "getdata") {
1914  		return
1915  	}
1916  	// We wait on this wait channel periodically to prevent queuing far more data than we can send in a reasonable time,
1917  	// wasting memory.
1918  	//
1919  	// The waiting occurs after the database fetch for the next one to provide a little pipelining.
1920  	var waitChan qu.C
1921  	doneChan := qu.Ts(1)
1922  	for i, iv := range msg.InvList {
1923  		var c qu.C
1924  		// If this will be the last message we send.
1925  		if i == length-1 && len(notFound.InvList) == 0 {
1926  			c = doneChan
1927  		} else if (i+1)%3 == 0 {
1928  			// Buffered so as to not make the send goroutine block.
1929  			c = qu.Ts(1)
1930  		}
1931  		var e error
1932  		switch iv.Type {
1933  		// case wire.InvTypeWitnessTx:
1934  		// 	e = np.Server.PushTxMsg(
1935  		// 		np, &iv.Hash, c, waitChan,
1936  		// 		wire.WitnessEncoding,
1937  		// 	)
1938  		case wire.InvTypeTx:
1939  			e = np.Server.PushTxMsg(
1940  				np, &iv.Hash, c, waitChan,
1941  				wire.BaseEncoding,
1942  			)
1943  		// case wire.InvTypeWitnessBlock:
1944  		// 	e = np.Server.PushBlockMsg(
1945  		// 		np, &iv.Hash, c, waitChan,
1946  		// 		wire.WitnessEncoding,
1947  		// 	)
1948  		case wire.InvTypeBlock:
1949  			e = np.Server.PushBlockMsg(
1950  				np, &iv.Hash, c, waitChan,
1951  				wire.BaseEncoding,
1952  			)
1953  		// case wire.InvTypeFilteredWitnessBlock:
1954  		// 	e = np.Server.PushMerkleBlockMsg(
1955  		// 		np, &iv.Hash, c, waitChan,
1956  		// 		wire.WitnessEncoding,
1957  		// 	)
1958  		case wire.InvTypeFilteredBlock:
1959  			e = np.Server.PushMerkleBlockMsg(
1960  				np, &iv.Hash, c, waitChan,
1961  				wire.BaseEncoding,
1962  			)
1963  		default:
1964  			W.Ln("unknown type in inventory request", iv.Type)
1965  			continue
1966  		}
1967  		if e != nil {
1968  			e := notFound.AddInvVect(iv)
1969  			if e != nil {
1970  			}
1971  			// When there is a failure fetching the final entry and the done channel was sent in due to there being no
1972  			// outstanding not found inventory, consume it here because there is now not found inventory that will use
1973  			// the channel momentarily.
1974  			if i == len(msg.InvList)-1 && c != nil {
1975  				<-c
1976  			}
1977  		}
1978  		numAdded++
1979  		waitChan = c
1980  	}
1981  	if len(notFound.InvList) != 0 {
1982  		np.QueueMessage(notFound, doneChan)
1983  	}
1984  	// Wait for messages to be sent. We can send quite a lot of data at this point and this will keep the peer busy for
1985  	// a decent amount of time.
1986  	//
1987  	// We don't process anything else by them in this time so that we have an idea of when we should hear back from them
1988  	// - else the idle timeout could fire when we were only half done sending the blocks.
1989  	if numAdded > 0 {
1990  		<-doneChan
1991  	}
1992  }
1993  
1994  // OnGetHeaders is invoked when a peer receives a getheaders bitcoin message.
1995  func (np *NodePeer) OnGetHeaders(
1996  	_ *peer.Peer,
1997  	msg *wire.MsgGetHeaders,
1998  ) {
1999  	// Ignore getheaders requests if not in sync.
2000  	if !np.Server.SyncManager.IsCurrent() {
2001  		return
2002  	}
2003  	// Find the most recent known block in the best chain based on the block locator and fetch all of the headers after
2004  	// it until either wire.MaxBlockHeadersPerMsg have been fetched or the provided stop hash is encountered. Use the
2005  	// block after the genesis block if no other blocks in the provided locator are known. This does mean the client
2006  	// will start over with the genesis block if unknown block locators are provided. This mirrors the behavior in the
2007  	// reference implementation.
2008  	chain := np.Server.Chain
2009  	headers := chain.LocateHeaders(msg.BlockLocatorHashes, &msg.HashStop)
2010  	// Send found headers to the requesting peer.
2011  	blockHeaders := make([]*wire.BlockHeader, len(headers))
2012  	for i := range headers {
2013  		blockHeaders[i] = &headers[i]
2014  	}
2015  	np.QueueMessage(&wire.MsgHeaders{Headers: blockHeaders}, nil)
2016  }
2017  
2018  // OnHeaders is invoked when a peer receives a headers bitcoin message. The message is passed down to the sync manager.
2019  func (np *NodePeer) OnHeaders(
2020  	_ *peer.Peer,
2021  	msg *wire.MsgHeaders,
2022  ) {
2023  	np.Server.SyncManager.QueueHeaders(msg, np.Peer)
2024  }
2025  
2026  // OnInv is invoked when a peer receives an inv bitcoin message and is used to examine the inventory being advertised by
2027  // the remote peer and react accordingly. We pass the message down to blockmanager which will call QueueMessage with any
2028  // appropriate responses.
2029  func (np *NodePeer) OnInv(
2030  	_ *peer.Peer,
2031  	msg *wire.MsgInv,
2032  ) {
2033  	if !np.Server.Config.BlocksOnly.True() {
2034  		if len(msg.InvList) > 0 {
2035  			np.Server.SyncManager.QueueInv(msg, np.Peer)
2036  		}
2037  		return
2038  	}
2039  	newInv := wire.NewMsgInvSizeHint(uint(len(msg.InvList)))
2040  	for _, invVect := range msg.InvList {
2041  		if invVect.Type == wire.InvTypeTx {
2042  			T.F("ignoring tx %v in inv from %v -- blocksonly enabled", invVect.Hash, np)
2043  			if np.ProtocolVersion() >= wire.BIP0037Version {
2044  				I.F("peer %v is announcing transactions -- disconnecting", np)
2045  				np.Disconnect()
2046  				return
2047  			}
2048  			continue
2049  		}
2050  		e := newInv.AddInvVect(invVect)
2051  		if e != nil {
2052  			E.Ln("failed to add inventory vector:", e)
2053  			break
2054  		}
2055  	}
2056  	if len(newInv.InvList) > 0 {
2057  		np.Server.SyncManager.QueueInv(newInv, np.Peer)
2058  	}
2059  }
2060  
2061  // OnMemPool is invoked when a peer receives a mempool bitcoin message. It creates and sends an inventory message with
2062  // the contents of the memory pool up to the maximum inventory allowed per message. When the peer has a bloom filter
2063  // loaded, the contents are filtered accordingly.
2064  func (np *NodePeer) OnMemPool(
2065  	_ *peer.Peer,
2066  	msg *wire.MsgMemPool,
2067  ) {
2068  	// Only allow mempool requests if the server has bloom filtering enabled.
2069  	if np.Server.Services&wire.SFNodeBloom != wire.SFNodeBloom {
2070  		D.Ln(
2071  			"peer", np, "sent mempool request with bloom filtering disabled"+
2072  				" -- disconnecting",
2073  		)
2074  		np.Disconnect()
2075  		return
2076  	}
2077  	// A decaying ban score increase is applied to prevent flooding. The ban score accumulates and passes the ban
2078  	// threshold if a burst of mempool messages comes from a peer. The score decays each minute to half of its value.
2079  	np.AddBanScore(0, 33, "mempool")
2080  	// Generate inventory message with the available transactions in the transaction memory pool. Limit it to the max
2081  	// allowed inventory per message.
2082  	//
2083  	// The NewMsgInvSizeHint function automatically limits the passed hint to the maximum allowed, so it's safe to pass
2084  	// it without double checking it here.
2085  	txMemPool := np.Server.TxMemPool
2086  	txDescs := txMemPool.TxDescs()
2087  	invMsg := wire.NewMsgInvSizeHint(uint(len(txDescs)))
2088  	for _, txDesc := range txDescs {
2089  		// Either add all transactions when there is no bloom filter, or only the transactions that match the filter
2090  		// when there is one.
2091  		if !np.Filter.IsLoaded() || np.Filter.MatchTxAndUpdate(txDesc.Tx) {
2092  			iv := wire.NewInvVect(wire.InvTypeTx, txDesc.Tx.Hash())
2093  			e := invMsg.AddInvVect(iv)
2094  			if e != nil {
2095  			}
2096  			if len(invMsg.InvList)+1 > wire.MaxInvPerMsg {
2097  				break
2098  			}
2099  		}
2100  	}
2101  	// Send the inventory message if there is anything to send.
2102  	if len(invMsg.InvList) > 0 {
2103  		np.QueueMessage(invMsg, nil)
2104  	}
2105  }
2106  
2107  // OnRead is invoked when a peer receives a message and it is used to update the bytes received by the server.
2108  func (np *NodePeer) OnRead(
2109  	_ *peer.Peer,
2110  	bytesRead int, msg wire.Message, e error,
2111  ) {
2112  	np.Server.AddBytesReceived(uint64(bytesRead))
2113  }
2114  
2115  // OnTx is invoked when a peer receives a tx bitcoin message. It blocks until the bitcoin transaction has been fully
2116  // processed. Unlock the block handler this does not serialize all transactions through a single thread transactions
2117  // don't rely on the previous one in a linear fashion like blocks.
2118  func (np *NodePeer) OnTx(
2119  	_ *peer.Peer,
2120  	msg *wire.MsgTx,
2121  ) {
2122  	if np.Server.Config.BlocksOnly.True() {
2123  		T.F("ignoring tx %v from %v - blocksonly enabled", msg.TxHash(), np)
2124  		return
2125  	}
2126  	// Add the transaction to the known inventory for the peer. Convert the raw MsgTx to a util.Tx which provides some
2127  	// convenience methods and things such as hash caching.
2128  	tx := util.NewTx(msg)
2129  	iv := wire.NewInvVect(wire.InvTypeTx, tx.Hash())
2130  	np.AddKnownInventory(iv)
2131  	// Queue the transaction up to be handled by the sync manager and intentionally block further receives until the
2132  	// transaction is fully processed and known good or bad.
2133  	//
2134  	// This helps prevent a malicious peer from queuing up a bunch of bad transactions before disconnecting (or being
2135  	// disconnected) and wasting memory.
2136  	np.Server.SyncManager.QueueTx(tx, np.Peer, np.TxProcessed)
2137  	<-np.TxProcessed
2138  }
2139  
2140  // OnVersion is invoked when a peer receives a version bitcoin message and is used to negotiate the protocol version
2141  // details as well as kick start the communications.
2142  func (np *NodePeer) OnVersion(
2143  	_ *peer.Peer,
2144  	msg *wire.MsgVersion,
2145  ) *wire.MsgReject {
2146  	// Update the address manager with the advertised services for outbound connections in case they have changed. This
2147  	// is not done for inbound connections to help prevent malicious behavior and is skipped when running on the
2148  	// simulation test network since it is only intended to connect to specified peers and actively avoids advertising
2149  	// and connecting to discovered peers.
2150  	//
2151  	// NOTE: This is done before rejecting peers that are too old to ensure it is updated regardless in the case a new
2152  	// minimum protocol version is enforced and the remote node has not upgraded yet.
2153  	isInbound := np.Inbound()
2154  	remoteAddr := np.NA()
2155  	addrManager := np.Server.AddrManager
2156  	if !((np.Server.Config.Network.V())[0] == 's') && !isInbound {
2157  		addrManager.SetServices(remoteAddr, msg.Services)
2158  	}
2159  	// Ignore peers that have a protocol version that is too old. The peer negotiation logic will disconnect it after
2160  	// this callback returns.
2161  	if msg.ProtocolVersion < int32(peer.MinAcceptableProtocolVersion) {
2162  		return nil
2163  	}
2164  	// Reject outbound peers that are not full nodes.
2165  	wantServices := wire.SFNodeNetwork
2166  	if !isInbound && !GetHasServices(msg.Services, wantServices) {
2167  		missingServices := wantServices & ^msg.Services
2168  		D.F(
2169  			"rejecting peer %s with services %v due to not providing"+
2170  				" desired services %v %s", np.Peer, msg.Services, missingServices,
2171  		)
2172  		reason := fmt.Sprintf(
2173  			"required services %#x not offered",
2174  			uint64(missingServices),
2175  		)
2176  		return wire.NewMsgReject(msg.Command(), wire.RejectNonstandard, reason)
2177  	}
2178  	// Update the address manager and request known addresses from the remote peer
2179  	// for outbound connections.
2180  	//
2181  	// This is skipped when running on the simulation test network since it is only
2182  	// intended to connect to specified peers and actively avoids advertising and
2183  	// connecting to discovered peers.
2184  	if !((np.Server.Config.Network.V())[0] == 's') && !isInbound {
2185  		// After soft-fork activation, only make outbound connection to peers if they
2186  		// flag that they're segwit enabled.
2187  		// chain := np.Server.Chain
2188  		// segwitActive, e := chain.IsDeploymentActive(chaincfg.DeploymentSegwit)
2189  		// if e != nil  {
2190  		// 			// 	Error("unable to query for segwit soft-fork state:", e)
2191  		// 	return nil
2192  		// }
2193  		// if segwitActive && !np.IsWitnessEnabled() {
2194  		// 	I.Ln(
2195  		// 		"disconnecting non-segwit peer", np,
2196  		// 		"as it isn't segwit enabled and we need more segwit enabled peers",
2197  		// 	)
2198  		// 	np.Disconnect()
2199  		// 	return nil
2200  		// }
2201  		// Advertise the local address when the server accepts incoming connections and it believes itself to be close
2202  		// to the best known tip.
2203  		if !np.Server.Config.DisableListen.True() && np.Server.SyncManager.IsCurrent() {
2204  			// Get address that best matches.
2205  			lna := addrManager.GetBestLocalAddress(remoteAddr)
2206  			if addrmgr.IsRoutable(lna) {
2207  				// Filter addresses the peer already knows about.
2208  				addresses := []*wire.NetAddress{lna}
2209  				np.PreparePushAddrMsg(addresses)
2210  			}
2211  		}
2212  		// Request known addresses if the server address manager needs more and the peer has a protocol version new
2213  		// enough to include a timestamp with addresses.
2214  		hasTimestamp := np.ProtocolVersion() >= wire.NetAddressTimeVersion
2215  		if addrManager.NeedMoreAddresses() && hasTimestamp {
2216  			np.QueueMessage(wire.NewMsgGetAddr(), nil)
2217  		}
2218  		// Mark the address as a known good address.
2219  		addrManager.Good(remoteAddr)
2220  	}
2221  	// Add the remote peer time as a sample for creating an offset against the local clock to keep the network time in
2222  	// sync.
2223  	np.Server.TimeSource.AddTimeSample(np.Addr(), msg.Timestamp)
2224  	// Signal the sync manager this peer is a new sync candidate.
2225  	np.Server.SyncManager.NewPeer(np.Peer)
2226  	// Choose whether or not to relay transactions before a filter command is received.
2227  	np.SetDisableRelayTx(msg.DisableRelayTx)
2228  	hn := np.Server.HighestKnown.Load()
2229  	if msg.LastBlock >= hn {
2230  		np.Server.HighestKnown.Store(msg.LastBlock)
2231  	}
2232  	// Add valid peer to the server.
2233  	np.Server.AddPeer(np)
2234  	return nil
2235  }
2236  
2237  // OnWrite is invoked when a peer sends a message and it is used to update the bytes sent by the server.
2238  func (np *NodePeer) OnWrite(
2239  	_ *peer.Peer, bytesWritten int,
2240  	msg wire.Message, e error,
2241  ) {
2242  	np.Server.AddBytesSent(uint64(bytesWritten))
2243  }
2244  
2245  // AddBanScore increases the persistent and decaying ban score fields by the values passed as parameters. If the
2246  // resulting score exceeds half of the ban threshold, a warning is logged including the reason provided. Further, if the
2247  // score is above the ban threshold, the peer will be banned and disconnected.
2248  func (np *NodePeer) AddBanScore(persistent, transient uint32, reason string) bool {
2249  	// No warning is logged and no score is calculated if banning is disabled.
2250  	if np.Server.Config.DisableBanning.True() {
2251  		return false
2252  	}
2253  	if np.IsWhitelisted {
2254  		D.F("misbehaving whitelisted peer %s: %s %s", np, reason)
2255  		return false
2256  	}
2257  	warnThreshold := np.Server.Config.BanThreshold.V() >> 1
2258  	if transient == 0 && persistent == 0 {
2259  		// The score is not being increased, but a warning message is still logged if the score is above the warn
2260  		// threshold.
2261  		score := np.BanScore.Int()
2262  		if int(score) > warnThreshold {
2263  			W.F("misbehaving peer %s: %s -- ban score is %d, it was not increased this time", np, reason, score)
2264  		}
2265  		return false
2266  	}
2267  	score := np.BanScore.Increase(persistent, transient)
2268  	if int(score) > warnThreshold {
2269  		W.F("misbehaving peer %s: %s -- ban score increased to %d", np, reason, score)
2270  		if int(score) > np.Server.Config.BanThreshold.V() {
2271  			W.F("misbehaving peer %s -- banning and disconnecting", np)
2272  			np.Server.BanPeer(np)
2273  			np.Disconnect()
2274  			return true
2275  		}
2276  	}
2277  	return false
2278  }
2279  
2280  // AddKnownAddresses adds the given addresses to the set of known addresses to the peer to prevent sending duplicate
2281  // addresses.
2282  func (np *NodePeer) AddKnownAddresses(addresses []*wire.NetAddress) {
2283  	for _, na := range addresses {
2284  		np.KnownAddresses[addrmgr.NetAddressKey(na)] = struct{}{}
2285  	}
2286  }
2287  
2288  // IsAddressKnown true if the given address is already known to the peer.
2289  func (np *NodePeer) IsAddressKnown(na *wire.NetAddress) bool {
2290  	_, exists := np.KnownAddresses[addrmgr.NetAddressKey(na)]
2291  	return exists
2292  }
2293  
2294  // EnforceNodeBloomFlag disconnects the peer if the server is not configured to allow bloom filters. Additionally, if
2295  // the peer has negotiated to a protocol version that is high enough to observe the bloom filter service support bit, it
2296  // will be banned since it is intentionally violating the protocol.
2297  func (np *NodePeer) EnforceNodeBloomFlag(cmd string) bool {
2298  	if np.Server.Services&wire.SFNodeBloom != wire.SFNodeBloom {
2299  		// Ban the peer if the protocol version is high enough that the peer is knowingly violating the protocol and
2300  		// banning is enabled.
2301  		//
2302  		// NOTE: Even though the addBanScore function already examines whether or not banning is enabled, it is checked
2303  		// here as well to ensure the violation is logged and the peer is disconnected regardless.
2304  		if np.ProtocolVersion() >= wire.BIP0111Version &&
2305  			!np.Server.Config.DisableBanning.True() {
2306  			// Disconnect the peer regardless of whether it was banned.
2307  			np.AddBanScore(100, 0, cmd)
2308  			np.Disconnect()
2309  			return false
2310  		}
2311  		// Disconnect the peer regardless of protocol version or banning state.
2312  		D.F("%s sent an unsupported %s request -- disconnecting %s", np, cmd)
2313  		np.Disconnect()
2314  		return false
2315  	}
2316  	return true
2317  }
2318  
2319  // GetNewestBlock returns the current best block hash and height using the format required by the configuration for the
2320  // peer package.
2321  func (np *NodePeer) GetNewestBlock() (*chainhash.Hash, int32, error) {
2322  	best := np.Server.Chain.BestSnapshot()
2323  	return &best.Hash, best.Height, nil
2324  }
2325  
2326  // PreparePushAddrMsg sends an addr message to the connected peer using the provided addresses.
2327  func (np *NodePeer) PreparePushAddrMsg(addresses []*wire.NetAddress) {
2328  	// Filter addresses already known to the peer.
2329  	addrs := make([]*wire.NetAddress, 0, len(addresses))
2330  	for _, addr := range addresses {
2331  		if !np.IsAddressKnown(addr) {
2332  			addrs = append(addrs, addr)
2333  		}
2334  	}
2335  	known, e := np.PushAddrMsg(addrs)
2336  	if e != nil {
2337  		E.F("can't push address message to %s: %v", np.Peer, e)
2338  		np.Disconnect()
2339  		return
2340  	}
2341  	np.AddKnownAddresses(known)
2342  }
2343  
2344  // IsRelayTxDisabled returns whether or not relaying of transactions for the given peer is disabled.
2345  //
2346  // It is safe for concurrent access.
2347  func (np *NodePeer) IsRelayTxDisabled() bool {
2348  	np.RelayMtx.Lock()
2349  	isDisabled := np.DisableRelayTx
2350  	np.RelayMtx.Unlock()
2351  	return isDisabled
2352  }
2353  
2354  // SetDisableRelayTx toggles relaying of transactions for the given peer. It is safe for concurrent access.
2355  func (np *NodePeer) SetDisableRelayTx(disable bool) {
2356  	np.RelayMtx.Lock()
2357  	np.DisableRelayTx = disable
2358  	np.RelayMtx.Unlock()
2359  }
2360  
2361  // Len returns the number of checkpoints in the slice. It is part of the sort.Interface implementation.
2362  func (s CheckpointSorter) Len() int { return len(s) }
2363  
2364  //	Less returns whether the checkpoint with index i should txsort before the
2365  // checkpoint with index j. It is part of the sort.Interface implementation.
2366  func (s CheckpointSorter) Less(i, j int) bool {
2367  	return s[i].Height < s[j].
2368  		Height
2369  }
2370  
2371  // Swap swaps the checkpoints at the passed indices. It is part of the sort.Interface implementation.
2372  func (s CheckpointSorter) Swap(i, j int) {
2373  	s[i], s[j] = s[j], s[i]
2374  }
2375  
2376  // Network returns the network. This is part of the net.Addr interface.
2377  func (a SimpleAddr) Network() string {
2378  	return a.Net
2379  }
2380  
2381  // String returns the address. This is part of the net.Addr interface.
2382  func (a SimpleAddr) String() string {
2383  	return a.Addr
2384  }
2385  
2386  // AddLocalAddress adds an address that this node is listening on to the address manager so that it may be relayed to
2387  // peers.
2388  //
2389  // TODO: having just essentially rewritten this and only just finding it,
2390  //  this function needs to be split to separate the address manager from the
2391  //  listening address processing, and configuration of listening IP addresses is
2392  //  unnecessary (for the controller's unicast elements)
2393  func AddLocalAddress(addrMgr *addrmgr.AddrManager, addr string, services wire.ServiceFlag) (e error) {
2394  	host, portStr, e := net.SplitHostPort(addr)
2395  	if e != nil {
2396  		return e
2397  	}
2398  	var port uint64
2399  	port, e = strconv.ParseUint(portStr, 10, 16)
2400  	if e != nil {
2401  		return e
2402  	}
2403  	if ip := net.ParseIP(host); ip != nil && ip.IsUnspecified() {
2404  		// If bound to unspecified address, advertise all local interfaces
2405  		var addrs []net.Addr
2406  		addrs, e = net.InterfaceAddrs()
2407  		if e != nil {
2408  			return e
2409  		}
2410  		for _, addr := range addrs {
2411  			var ifaceIP net.IP
2412  			ifaceIP, _, e = net.ParseCIDR(addr.String())
2413  			if e != nil {
2414  				continue
2415  			}
2416  			// If bound to 0.0.0.0, do not add IPv6 interfaces and if bound to ::, do not add IPv4 interfaces.
2417  			if (ip.To4() == nil) != (ifaceIP.To4() == nil) {
2418  				continue
2419  			}
2420  			netAddr := wire.NewNetAddressIPPort(ifaceIP, uint16(port), services)
2421  			e = addrMgr.AddLocalAddress(netAddr, addrmgr.BoundPrio)
2422  			if e != nil {
2423  				T.Ln(e)
2424  			}
2425  		}
2426  	} else {
2427  		var netAddr *wire.NetAddress
2428  		netAddr, e = addrMgr.HostToNetAddress(host, uint16(port), services)
2429  		if e != nil {
2430  			return e
2431  		}
2432  		e = addrMgr.AddLocalAddress(netAddr, addrmgr.BoundPrio)
2433  		if e != nil {
2434  		}
2435  	}
2436  	return nil
2437  }
2438  
2439  // AddrStringToNetAddr takes an address in the form of 'host:port' and returns a net.Addr which maps to the original
2440  // address with any host names resolved to IP addresses. It also handles tor addresses properly by returning a net.Addr
2441  // that encapsulates the address.
2442  func AddrStringToNetAddr(config *config.Config, stateCfg *active.Config, addr string) (net.Addr, error) {
2443  	host, strPort, e := net.SplitHostPort(addr)
2444  	if e != nil {
2445  		return nil, e
2446  	}
2447  	port, e := strconv.Atoi(strPort)
2448  	if e != nil {
2449  		return nil, e
2450  	}
2451  	// Skip if host is already an IP address.
2452  	if ip := net.ParseIP(host); ip != nil {
2453  		return &net.TCPAddr{
2454  				IP:   ip,
2455  				Port: port,
2456  			},
2457  			nil
2458  	}
2459  	// Tor addresses cannot be resolved to an IP, so just return an onion address instead.
2460  	if strings.HasSuffix(host, ".onion") {
2461  		if !config.OnionEnabled.True() {
2462  			return nil, errors.New("tor has been disabled")
2463  		}
2464  		return &OnionAddr{Addr: addr}, nil
2465  	}
2466  	// Attempt to look up an IP address associated with the parsed host.
2467  	ips, e := Lookup(stateCfg)(host)
2468  	if e != nil {
2469  		return nil, e
2470  	}
2471  	if len(ips) == 0 {
2472  		return nil, fmt.Errorf("no addresses found for %s", host)
2473  	}
2474  	return &net.TCPAddr{
2475  			IP:   ips[0],
2476  			Port: port,
2477  		},
2478  		nil
2479  }
2480  
2481  // DisconnectPeer attempts to drop the connection of a targeted peer in the passed peer list. Targets are identified via
2482  // usage of the passed `compareFunc`, which should return `true` if the passed peer is the target peer.
2483  //
2484  // This function returns true on success and false if the peer is unable to be located. If the peer is found, and the
2485  // passed callback: `whenFound' isn't nil, we call it with the peer as the argument before it is removed from the
2486  // peerList, and is disconnected from the server.
2487  func DisconnectPeer(
2488  	peerList map[int32]*NodePeer,
2489  	compareFunc func(*NodePeer) bool, whenFound func(*NodePeer),
2490  ) bool {
2491  	for addr, nodePeer := range peerList {
2492  		if compareFunc(nodePeer) {
2493  			if whenFound != nil {
2494  				whenFound(nodePeer)
2495  			}
2496  			// This is ok because we are not continuing to iterate so won't corrupt the loop.
2497  			delete(peerList, addr)
2498  			nodePeer.Disconnect()
2499  			return true
2500  		}
2501  	}
2502  	return false
2503  }
2504  
2505  //	DynamicTickDuration is a convenience function used to dynamically choose a
2506  // tick duration based on remaining time. It is primarily used during server shutdown to make shutdown warnings more
2507  // frequent as the shutdown time approaches.
2508  func DynamicTickDuration(remaining time.Duration) time.Duration {
2509  	switch {
2510  	case remaining <= time.Second*5:
2511  		return time.Second
2512  	case remaining <= time.Second*15:
2513  		return time.Second * 5
2514  	case remaining <= time.Minute:
2515  		return time.Second * 15
2516  	case remaining <= time.Minute*5:
2517  		return time.Minute
2518  	case remaining <= time.Minute*15:
2519  		return time.Minute * 5
2520  	case remaining <= time.Hour:
2521  		return time.Minute * 15
2522  	}
2523  	return time.Hour
2524  }
2525  
2526  // GetHasServices returns whether or not the provided advertised service flags have all of the provided desired service
2527  // flags set.
2528  func GetHasServices(advertised, desired wire.ServiceFlag) bool {
2529  	return advertised&desired == desired
2530  }
2531  
2532  // InitListeners initializes the configured net listeners and adds any bound addresses to the address manager. Returns
2533  // the listeners and a upnp.NAT interface, which is non-nil if UPnP is in use.
2534  func InitListeners(
2535  	config *config.Config, activeNet *chaincfg.Params,
2536  	aMgr *addrmgr.AddrManager, listenAddrs []string, services wire.ServiceFlag,
2537  ) (listeners []net.Listener, nat upnp.NAT, e error) {
2538  	// Listen for TCP connections at the configured addresses
2539  	T.Ln("listenAddrs ", listenAddrs)
2540  	var netAddrs []net.Addr
2541  	netAddrs, e = ParseListeners(listenAddrs)
2542  	if e != nil {
2543  		return nil, nil, e
2544  	}
2545  	T.Ln("netAddrs ", netAddrs)
2546  	listeners = make([]net.Listener, 0, len(netAddrs))
2547  	for _, addr := range netAddrs {
2548  		T.Ln("addr ", addr, " ", addr.Network(), " ", addr.String())
2549  		listener, e := net.Listen(addr.Network(), addr.String())
2550  		if e != nil {
2551  			W.F("can't listen on %s: %v %s", addr, e)
2552  			continue
2553  		}
2554  		listeners = append(listeners, listener)
2555  	}
2556  	if len(config.ExternalIPs.S()) != 0 {
2557  		defaultPort, e := strconv.ParseUint(activeNet.DefaultPort, 10, 16)
2558  		if e != nil {
2559  			E.F("can not parse default port %s for active chain: %v", activeNet.DefaultPort, e)
2560  			return nil, nil, e
2561  		}
2562  		for _, sip := range config.ExternalIPs.S() {
2563  			eport := uint16(defaultPort)
2564  			host, portstr, e := net.SplitHostPort(sip)
2565  			if e != nil {
2566  				// no port, use default.
2567  				host = sip
2568  			} else {
2569  				var port uint64
2570  				port, e = strconv.ParseUint(portstr, 10, 16)
2571  				if e != nil {
2572  					E.F(
2573  						"can not parse port from %s for externalip: %v",
2574  						sip, e,
2575  					)
2576  					continue
2577  				}
2578  				eport = uint16(port)
2579  			}
2580  			var na *wire.NetAddress
2581  			na, e = aMgr.HostToNetAddress(host, eport, services)
2582  			if e != nil {
2583  				E.F("not adding %s as externalip: %v", sip, e)
2584  				continue
2585  			}
2586  			e = aMgr.AddLocalAddress(na, addrmgr.ManualPrio)
2587  			if e != nil {
2588  				E.F("skipping specified external IP: %v", e)
2589  			}
2590  		}
2591  	} else {
2592  		if config.UPNP.True() {
2593  			var e error
2594  			nat, e = upnp.Discover()
2595  			if e != nil {
2596  				E.F("can't discover upnp: %v", e)
2597  			}
2598  			// nil upnp.nat here is fine, just means no upnp on network.
2599  		}
2600  		// Add bound addresses to address manager to be advertised to peers.
2601  		for _, listener := range listeners {
2602  			addr := listener.Addr().String()
2603  			e := AddLocalAddress(aMgr, addr, services)
2604  			if e != nil {
2605  				E.F("skipping bound address %s: %v", addr, e)
2606  			}
2607  		}
2608  	}
2609  	return listeners, nat, nil
2610  }
2611  
2612  // GetIsWhitelisted returns whether the IP address is included in the whitelisted networks and IPs.
2613  func GetIsWhitelisted(statecfg *active.Config, addr net.Addr) bool {
2614  	if len(statecfg.ActiveWhitelists) == 0 {
2615  		return false
2616  	}
2617  	var host string
2618  	var e error
2619  	host, _, e = net.SplitHostPort(addr.String())
2620  	if e != nil {
2621  		E.F("unable to SplitHostPort on '%s': %v", addr, e)
2622  		return false
2623  	}
2624  	ip := net.ParseIP(host)
2625  	if ip == nil {
2626  		W.F("unable to parse IP '%s'", addr)
2627  		return false
2628  	}
2629  	for _, ipnet := range statecfg.ActiveWhitelists {
2630  		if ipnet.Contains(ip) {
2631  			return true
2632  		}
2633  	}
2634  	return false
2635  }
2636  
2637  // MergeCheckpoints returns two slices of checkpoints merged into one slice such that the checkpoints are sorted by
2638  // height.
2639  //
2640  // In the case the additional checkpoints contain a checkpoint with the same height as a checkpoint in the default
2641  // checkpoints, the additional checkpoint will take precedence and overwrite the default one.
2642  func MergeCheckpoints(defaultCheckpoints, additional []chaincfg.Checkpoint) []chaincfg.Checkpoint {
2643  	//	Create a map of the additional checkpoints to remove duplicates while
2644  	//	leaving the most recently-specified checkpoint.
2645  	extra := make(map[int32]chaincfg.Checkpoint)
2646  	for _, checkpoint := range additional {
2647  		extra[checkpoint.Height] = checkpoint
2648  	}
2649  	// Add all default checkpoints that do not have an override in the additional checkpoints.
2650  	numDefault := len(defaultCheckpoints)
2651  	checkpoints := make([]chaincfg.Checkpoint, 0, numDefault+len(extra))
2652  	for _, checkpoint := range defaultCheckpoints {
2653  		if _, exists := extra[checkpoint.Height]; !exists {
2654  			checkpoints = append(checkpoints, checkpoint)
2655  		}
2656  	}
2657  	// Append the additional checkpoints and return the sorted results.
2658  	for _, checkpoint := range extra {
2659  		checkpoints = append(checkpoints, checkpoint)
2660  	}
2661  	sort.Sort(CheckpointSorter(checkpoints))
2662  	return checkpoints
2663  }
2664  
2665  // NewPeerConfig returns the configuration for the given ServerPeer.
2666  func NewPeerConfig(sp *NodePeer) *peer.Config {
2667  	// to work around the lack of a single identifier in the protocol, for dealing
2668  	// with testing situations with multiple nodes on one IP address (and there is a
2669  	// to-do on this) we generate a random 32 bit value, convert to hex and set it
2670  	// as the first of the user agent comments, which we can then use to count
2671  	// individual connections properly
2672  	return &peer.Config{
2673  		Listeners: peer.MessageListeners{
2674  			OnVersion:      sp.OnVersion,
2675  			OnMemPool:      sp.OnMemPool,
2676  			OnTx:           sp.OnTx,
2677  			OnBlock:        sp.OnBlock,
2678  			OnInv:          sp.OnInv,
2679  			OnHeaders:      sp.OnHeaders,
2680  			OnGetData:      sp.OnGetData,
2681  			OnGetBlocks:    sp.OnGetBlocks,
2682  			OnGetHeaders:   sp.OnGetHeaders,
2683  			OnGetCFilters:  sp.OnGetCFilters,
2684  			OnGetCFHeaders: sp.OnGetCFHeaders,
2685  			OnGetCFCheckpt: sp.OnGetCFCheckpt,
2686  			OnFeeFilter:    sp.OnFeeFilter,
2687  			OnFilterAdd:    sp.OnFilterAdd,
2688  			OnFilterClear:  sp.OnFilterClear,
2689  			OnFilterLoad:   sp.OnFilterLoad,
2690  			OnGetAddr:      sp.OnGetAddr,
2691  			OnAddr:         sp.OnAddr,
2692  			OnRead:         sp.OnRead,
2693  			OnWrite:        sp.OnWrite,
2694  			// Note: The reference client currently bans peers that send alerts not signed with its key. We could verify
2695  			// against their key, but since the reference client is currently unwilling to support other
2696  			// implementations' alert messages, we will not relay theirs.
2697  			OnAlert: nil,
2698  		},
2699  		NewestBlock:       sp.GetNewestBlock,
2700  		HostToNetAddress:  sp.Server.AddrManager.HostToNetAddress,
2701  		Proxy:             sp.Server.Config.ProxyAddress.V(),
2702  		UserAgentName:     UserAgentName,
2703  		UserAgentVersion:  UserAgentVersion,
2704  		UserAgentComments: sp.Server.Config.UserAgentComments.S(),
2705  		ChainParams:       sp.Server.ChainParams,
2706  		Services:          sp.Server.Services,
2707  		DisableRelayTx:    sp.Server.Config.BlocksOnly.True(),
2708  		ProtocolVersion:   peer.MaxProtocolVersion,
2709  		TrickleInterval:   sp.Server.Config.TrickleInterval.V(),
2710  		IP:                sp.IP,
2711  		Port:              sp.Port,
2712  	}
2713  }
2714  
2715  type Context struct {
2716  	// Config is the pod all-in-one server config
2717  	Config *config.Config
2718  	// StateCfg is a reference to the main node state configuration struct
2719  	StateCfg *active.Config
2720  	// ActiveNet is the active net parameters
2721  	ActiveNet *chaincfg.Params
2722  	// Hashrate is the hash counter
2723  	Hashrate uberatomic.Uint64
2724  }
2725  
2726  // NewNode returns a new pod server configured to listen on addr for the bitcoin network type specified by chainParams.
2727  // Use start to begin accepting connections from peers.
2728  //
2729  // TODO: simplify/modularise this
2730  func NewNode(listenAddrs []string, db database.DB, interruptChan qu.C, cx *Context, mempoolUpdateHook func()) (
2731  	*Node,
2732  	error,
2733  ) {
2734  	D.Ln("listenAddrs ", listenAddrs)
2735  	services := DefaultServices
2736  	if cx.Config.NoPeerBloomFilters.True() {
2737  		services &^= wire.SFNodeBloom
2738  	}
2739  	if cx.Config.NoCFilters.True() {
2740  		services &^= wire.SFNodeCF
2741  	}
2742  	aMgr := addrmgr.New(cx.Config.DataDir.V()+string(os.PathSeparator)+cx.ActiveNet.Name, Lookup(cx.StateCfg))
2743  	var lstn []net.Listener
2744  	var nat upnp.NAT
2745  	if cx.Config.DisableListen.False() {
2746  		var e error
2747  		if lstn, nat, e = InitListeners(cx.Config, cx.ActiveNet, aMgr, listenAddrs, services); E.Chk(e) {
2748  			return nil, e
2749  		}
2750  		if len(lstn) == 0 {
2751  			return nil, errors.New("no valid listen address")
2752  		}
2753  	}
2754  	nThreads := runtime.NumCPU()
2755  	var thr int
2756  	if cx.Config.GenThreads.V() == -1 || thr > nThreads {
2757  		thr = nThreads
2758  	} else {
2759  		thr = cx.Config.GenThreads.V()
2760  	}
2761  	T.Ln("set genthreads to ", thr)
2762  	s := Node{
2763  		ChainParams:          cx.ActiveNet,
2764  		AddrManager:          aMgr,
2765  		NewPeers:             make(chan *NodePeer, cx.Config.MaxPeers.V()),
2766  		DonePeers:            make(chan *NodePeer, cx.Config.MaxPeers.V()),
2767  		BanPeers:             make(chan *NodePeer, cx.Config.MaxPeers.V()),
2768  		PeerState:            make(chan chan peersummary.PeerSummaries, 1),
2769  		Query:                make(chan interface{}),
2770  		RelayInv:             make(chan RelayMsg, cx.Config.MaxPeers.V()),
2771  		Broadcast:            make(chan BroadcastMsg, cx.Config.MaxPeers.V()),
2772  		Quit:                 qu.T(),
2773  		ModifyRebroadcastInv: make(chan interface{}),
2774  		PeerHeightsUpdate:    make(chan UpdatePeerHeightsMsg),
2775  		NAT:                  nat,
2776  		DB:                   db,
2777  		TimeSource:           blockchain.NewMedianTime(),
2778  		Services:             services,
2779  		SigCache:             txscript.NewSigCache(uint(cx.Config.SigCacheMaxSize.V())),
2780  		HashCache:            txscript.NewHashCache(uint(cx.Config.SigCacheMaxSize.V())),
2781  		CFCheckptCaches:      make(map[wire.FilterType][]CFHeaderKV),
2782  		GenThreads:           uint32(thr),
2783  		Config:               cx.Config,
2784  		StateCfg:             cx.StateCfg,
2785  		ActiveNet:            cx.ActiveNet,
2786  		StartController:      qu.Ts(2),
2787  		StopController:       qu.Ts(2),
2788  	}
2789  	// Create the transaction and address indexes if needed.
2790  	//
2791  	// CAUTION: the txindex needs to be first in the indexes array because the addrindex uses data from the txindex
2792  	// during catchup.
2793  	//
2794  	// If the addrindex is run first, it may not have the transactions from the current block indexed.
2795  	var indexes []indexers.Indexer
2796  	D.Ln("txindex", cx.Config.TxIndex.True(), "addrindex", cx.Config.AddrIndex.True())
2797  	if cx.Config.TxIndex.True() || cx.Config.AddrIndex.True() {
2798  		// Enable transaction index if address index is enabled since it requires it.
2799  		if !cx.Config.TxIndex.True() {
2800  			I.Ln("transaction index enabled because it is required by the address index")
2801  			cx.Config.TxIndex.T()
2802  		} else {
2803  			I.Ln("transaction index is enabled")
2804  		}
2805  		s.TxIndex = indexers.NewTxIndex(db)
2806  		indexes = append(indexes, s.TxIndex)
2807  	}
2808  	if cx.Config.AddrIndex.True() {
2809  		I.Ln("address index is enabled")
2810  		s.AddrIndex = indexers.NewAddrIndex(db, cx.ActiveNet)
2811  		indexes = append(indexes, s.AddrIndex)
2812  	}
2813  	if !cx.Config.NoCFilters.True() {
2814  		T.Ln("committed filter index is enabled")
2815  		s.CFIndex = indexers.NewCfIndex(db, cx.ActiveNet)
2816  		indexes = append(indexes, s.CFIndex)
2817  	}
2818  	// Create an index manager if any of the optional indexes are enabled.
2819  	var indexManager blockchain.IndexManager
2820  	if len(indexes) > 0 {
2821  		indexManager = indexers.NewManager(db, indexes)
2822  	}
2823  	// Merge given checkpoints with the default ones unless they are disabled.
2824  	var checkpoints []chaincfg.Checkpoint
2825  	if !cx.Config.DisableCheckpoints.True() {
2826  		checkpoints = MergeCheckpoints(
2827  			s.ChainParams.Checkpoints, cx.StateCfg.AddedCheckpoints,
2828  		)
2829  	}
2830  	// Create a new block chain instance with the appropriate configuration.
2831  	var e error
2832  	s.Chain, e = blockchain.New(
2833  		&blockchain.Config{
2834  			DB:           s.DB,
2835  			Interrupt:    interruptChan,
2836  			ChainParams:  s.ChainParams,
2837  			Checkpoints:  checkpoints,
2838  			TimeSource:   s.TimeSource,
2839  			SigCache:     s.SigCache,
2840  			IndexManager: indexManager,
2841  			HashCache:    s.HashCache,
2842  		},
2843  	)
2844  	if e != nil {
2845  		return nil, e
2846  	}
2847  	s.Chain.DifficultyAdjustments = make(map[string]float64)
2848  	s.Chain.DifficultyBits.Store(make(blockchain.Diffs))
2849  	// Search for a FeeEstimator state in the database. If none can be found or if it cannot be loaded, create a new
2850  	// one.
2851  	e = db.Update(
2852  		func(tx database.Tx) (e error) {
2853  			metadata := tx.Metadata()
2854  			feeEstimationData := metadata.Get(mempool.EstimateFeeDatabaseKey)
2855  			if feeEstimationData != nil {
2856  				// delete it from the database so that we don't try to restore the same thing again somehow.
2857  				e = metadata.Delete(mempool.EstimateFeeDatabaseKey)
2858  				if e != nil {
2859  					return e
2860  				}
2861  				// If there is an error, log it and make a new fee estimator.
2862  				var e error
2863  				s.FeeEstimator, e = mempool.RestoreFeeEstimator(feeEstimationData)
2864  				if e != nil {
2865  					return fmt.Errorf("failed to restore fee estimator %v", e)
2866  				}
2867  			}
2868  			return nil
2869  		},
2870  	)
2871  	if e != nil {
2872  		E.Ln(e)
2873  	}
2874  	// If no feeEstimator has been found, or if the one that has been found is behind somehow, create a new one and
2875  	// start over.
2876  	if s.FeeEstimator == nil || s.FeeEstimator.LastKnownHeight() != s.Chain.
2877  		BestSnapshot().Height {
2878  		s.FeeEstimator = mempool.NewFeeEstimator(
2879  			mempool.DefaultEstimateFeeMaxRollback,
2880  			mempool.DefaultEstimateFeeMinRegisteredBlocks,
2881  		)
2882  	}
2883  	txC := mempool.Config{
2884  		Policy: mempool.Policy{
2885  			DisableRelayPriority: cx.Config.NoRelayPriority.True(),
2886  			AcceptNonStd:         cx.Config.RelayNonStd.True(),
2887  			FreeTxRelayLimit:     cx.Config.FreeTxRelayLimit.V(),
2888  			MaxOrphanTxs:         cx.Config.MaxOrphanTxs.V(),
2889  			MaxOrphanTxSize:      DefaultMaxOrphanTxSize,
2890  			MaxSigOpCostPerTx:    blockchain.MaxBlockSigOpsCost / 4,
2891  			MinRelayTxFee:        cx.StateCfg.ActiveMinRelayTxFee,
2892  			MaxTxVersion:         2,
2893  		},
2894  		ChainParams:   cx.ActiveNet,
2895  		FetchUtxoView: s.Chain.FetchUtxoView,
2896  		BestHeight: func() int32 {
2897  			return s.Chain.BestSnapshot().Height
2898  		},
2899  		MedianTimePast: func() time.Time {
2900  			return s.Chain.BestSnapshot().MedianTime
2901  		},
2902  		// CalcSequenceLock: func(tx *util.Tx, view *blockchain.UtxoViewpoint) (
2903  		// 	*blockchain.SequenceLock, error,
2904  		// ) {
2905  		// 	return s.Chain.CalcSequenceLock(tx, view, true)
2906  		// },
2907  		// IsDeploymentActive: s.Chain.IsDeploymentActive,
2908  		SigCache:     s.SigCache,
2909  		HashCache:    s.HashCache,
2910  		AddrIndex:    s.AddrIndex,
2911  		FeeEstimator: s.FeeEstimator,
2912  		UpdateHook:   mempoolUpdateHook,
2913  	}
2914  	s.TxMemPool = mempool.New(&txC)
2915  	s.SyncManager, e =
2916  		netsync.New(
2917  			&netsync.Config{
2918  				PeerNotifier:       &s,
2919  				Chain:              s.Chain,
2920  				TxMemPool:          s.TxMemPool,
2921  				ChainParams:        s.ChainParams,
2922  				DisableCheckpoints: cx.Config.DisableCheckpoints.True(),
2923  				MaxPeers:           cx.Config.MaxPeers.V(),
2924  				FeeEstimator:       s.FeeEstimator,
2925  			},
2926  		)
2927  	if e != nil {
2928  		return nil, e
2929  	}
2930  	// // Create the mining policy and block template generator based on the
2931  	// // configuration options.
2932  	// // NOTE: The CPU miner relies on the mempool, so the mempool has to be
2933  	// // created before calling the function to create the CPU miner.
2934  	// policy := mining.Policy{
2935  	// 	BlockMinWeight:    uint32(*config.BlockMinWeight),
2936  	// 	BlockMaxWeight:    uint32(*config.BlockMaxWeight),
2937  	// 	BlockMinSize:      uint32(*config.BlockMinSize),
2938  	// 	BlockMaxSize:      uint32(*config.BlockMaxSize),
2939  	// 	BlockPrioritySize: uint32(*config.BlockPrioritySize),
2940  	// 	TxMinFreeFee:      stateCfg.ActiveMinRelayTxFee,
2941  	// }
2942  	// blockTemplateGenerator := mining.NewBlkTmplGenerator(&policy,
2943  	// 	s.ChainParams, s.TxMemPool, s.Chain, s.TimeSource,
2944  	// 	s.SigCache, s.HashCache, s.Algo)
2945  	// s.CPUMiner = cpuminer.New(&cpuminer.Config{
2946  	// 	Blockchain:             s.Chain,
2947  	// 	ChainParams:            chainParams,
2948  	// 	BlockTemplateGenerator: blockTemplateGenerator,
2949  	// 	MiningAddrs:            stateCfg.ActiveMiningAddrs,
2950  	// 	ProcessBlock:           s.SyncManager.ProcessBlock,
2951  	// 	ConnectedCount:         s.ConnectedCount,
2952  	// 	IsCurrent:              s.SyncManager.IsCurrent,
2953  	// 	NumThreads:             s.GenThreads,
2954  	// 	Algo:                   s.Algo,
2955  	// 	Solo:                   *config.Solo,
2956  	// })
2957  	// Only setup a function to return new addresses to connect to when
2958  	// not running in connect-only mode.  The simulation network is always
2959  	// in connect-only mode since it is only intended to connect to
2960  	// specified peers and actively avoid advertising and connecting to
2961  	// discovered peers in order to prevent it from becoming a public test
2962  	// network.
2963  	var newAddressFunc func() (net.Addr, error)
2964  	if !((cx.Config.Network.V())[0] == 's') && len(cx.Config.ConnectPeers.S()) == 0 {
2965  		newAddressFunc = func() (net.Addr, error) {
2966  			for tries := 0; tries < 100; tries++ {
2967  				addr := s.AddrManager.GetAddress()
2968  				if addr == nil {
2969  					break
2970  				}
2971  				// Address will not be invalid, local or unrouteable because addrmanager rejects those on addition.
2972  				//
2973  				// Just check that we don't already have an address in the same group so that we are not connecting to
2974  				// the same network segment at the expense of others.
2975  				key := addrmgr.GroupKey(addr.NetAddress())
2976  				if s.OutboundGroupCount(key) != 0 {
2977  					continue
2978  				}
2979  				// only allow recent nodes (10 min) after we failed 30 times
2980  				if tries < 30 && time.Since(addr.LastAttempt()) < 10*time.Minute {
2981  					continue
2982  				}
2983  				// allow non default ports after 50 failed tries.
2984  				if tries < 50 && fmt.Sprintf("%d", addr.NetAddress().Port) !=
2985  					cx.ActiveNet.DefaultPort {
2986  					continue
2987  				}
2988  				addrString := addrmgr.NetAddressKey(addr.NetAddress())
2989  				return AddrStringToNetAddr(cx.Config, cx.StateCfg, addrString)
2990  			}
2991  			return nil, errors.New("no valid connect address")
2992  		}
2993  	}
2994  	// Create a connection manager.
2995  	targetOutbound := DefaultTargetOutbound
2996  	if cx.Config.MaxPeers.V() < targetOutbound {
2997  		targetOutbound = cx.Config.MaxPeers.V()
2998  	}
2999  	cMgr, e :=
3000  		connmgr.New(
3001  			&connmgr.Config{
3002  				Listeners:      lstn,
3003  				OnAccept:       s.InboundPeerConnected,
3004  				RetryDuration:  ConnectionRetryInterval,
3005  				TargetOutbound: uint32(targetOutbound),
3006  				Dial:           Dial(cx.StateCfg),
3007  				OnConnection:   s.OutboundPeerConnected,
3008  				GetNewAddress:  newAddressFunc,
3009  			},
3010  		)
3011  	if e != nil {
3012  		return nil, e
3013  	}
3014  	s.ConnManager = cMgr
3015  	// Start up persistent peers.
3016  	permanentPeers := cx.Config.ConnectPeers.S()
3017  	if len(permanentPeers) == 0 {
3018  		permanentPeers = cx.Config.AddPeers.S()
3019  	}
3020  	for _, addr := range permanentPeers {
3021  		netAddr, e := AddrStringToNetAddr(cx.Config, cx.StateCfg, addr)
3022  		if e != nil {
3023  			return nil, e
3024  		}
3025  		go s.ConnManager.Connect(
3026  			&connmgr.ConnReq{
3027  				Addr:      netAddr,
3028  				Permanent: true,
3029  			},
3030  		)
3031  	}
3032  	if cx.Config.DisableRPC.False() {
3033  		// Setup listeners for the configured RPC listen addresses and TLS settings.
3034  		listeners := map[string][]string{
3035  			fork.SHA256d: cx.Config.RPCListeners.S(),
3036  		}
3037  		for l := range listeners {
3038  			rpcListeners, e := SetupRPCListeners(cx.Config, listeners[l])
3039  			if e != nil {
3040  				return nil, e
3041  			}
3042  			if len(rpcListeners) == 0 {
3043  				return nil, errors.New("RPCS: No valid listen address")
3044  			}
3045  			rp, e := NewRPCServer(
3046  				&ServerConfig{
3047  					Listeners:   rpcListeners,
3048  					StartupTime: s.StartupTime,
3049  					ConnMgr:     &ConnManager{&s},
3050  					SyncMgr:     &SyncManager{&s, s.SyncManager},
3051  					TimeSource:  s.TimeSource,
3052  					Chain:       s.Chain,
3053  					ChainParams: cx.ActiveNet,
3054  					DB:          db,
3055  					TxMemPool:   s.TxMemPool,
3056  					// Generator:    blockTemplateGenerator,
3057  					// CPUMiner:     s.CPUMiner,
3058  					TxIndex:         s.TxIndex,
3059  					AddrIndex:       s.AddrIndex,
3060  					CfIndex:         s.CFIndex,
3061  					FeeEstimator:    s.FeeEstimator,
3062  					Algo:            l,
3063  					Hashrate:        cx.Hashrate,
3064  					Quit:            s.Quit,
3065  					StartController: s.StartController,
3066  					StopController:  s.StopController,
3067  				}, cx.StateCfg, cx.Config,
3068  			)
3069  			if e != nil {
3070  				return nil, e
3071  			}
3072  			s.RPCServers = append(s.RPCServers, rp)
3073  		}
3074  		// Signal process shutdown when the RPC server requests it.
3075  		go func() {
3076  			s.Quit.Wait()
3077  			for i := range s.RPCServers {
3078  				s.RPCServers[i].Quit.Q()
3079  			}
3080  		}()
3081  		go func() {
3082  			for i := range s.RPCServers {
3083  				<-s.RPCServers[i].RequestedProcessShutdown()
3084  			}
3085  			interrupt.Request()
3086  		}()
3087  	}
3088  	return &s, nil
3089  }
3090  
3091  // NewServerPeer returns a new ServerPeer instance. The peer needs to be set by the caller.
3092  //
3093  // note that peers that give different addresses to the sender address are
3094  // disconnected to stop spoofing to disrupt other connections with the fake IP.
3095  // Therefore upnp external address must be found and otherwise zero for proxy,
3096  // empty or explicitly disabled. further external inbound ports as possible to
3097  // configure in externalIPs are left as an exercise for the reader, since there
3098  // can be more than one and the message only anyway allows one IP on both sides,
3099  // so anyhow external ip's also have to send different messages so what are they
3100  // for anyway (spoofing?) - but seriously, todo: remove external ips
3101  func NewServerPeer(s *Node, localIP net.IP, isPersistent bool) *NodePeer {
3102  	var e error
3103  	var host, port string
3104  	var ipa net.IP
3105  	var p uint64
3106  	if s.Config.P2PConnect.V() != nil ||
3107  		len(s.Config.P2PConnect.S()) < 1 ||
3108  		s.Config.DisableListen.True() ||
3109  		s.Config.ProxyAddress.V() != "" ||
3110  		s.Config.OnionProxyAddress.V() != "" {
3111  		// return an empty IP address if we are not listening (this also is done on
3112  		// proxy connections to not leak info)
3113  	} else {
3114  		myAddress := (s.Config.P2PConnect.S())[0]
3115  		if host, port, e = net.SplitHostPort(myAddress); E.Chk(e) {
3116  		}
3117  		if p, e = strconv.ParseUint(port, 10, 16); E.Chk(e) {
3118  		}
3119  		// use the given UPNP external address if in use so version message matches
3120  		// sender
3121  		if s.Config.UPNP.True() {
3122  			var exip net.IP
3123  			if exip, e = s.NAT.GetExternalAddress(); E.Chk(e) {
3124  			} else {
3125  				ipa = exip
3126  			}
3127  		} else {
3128  			ipa = net.ParseIP(host)
3129  		}
3130  		// if the return interface address is given...
3131  		if !localIP.Equal(net.IP{0, 0, 0, 0}) {
3132  			ipa = localIP
3133  		}
3134  		I.Ln(ipa, p)
3135  	}
3136  	return &NodePeer{
3137  		Server:         s,
3138  		Persistent:     isPersistent,
3139  		Filter:         bloom.LoadFilter(nil),
3140  		KnownAddresses: make(map[string]struct{}),
3141  		Quit:           qu.T(),
3142  		TxProcessed:    qu.Ts(1),
3143  		BlockProcessed: qu.Ts(1),
3144  		IP:             ipa,
3145  		Port:           uint16(p),
3146  	}
3147  }
3148  
3149  // ParseListeners determines whether each listen address is IPv4 and IPv6 and returns a slice of appropriate net.Addrs
3150  // to listen on with TCP.
3151  //
3152  // It also properly detects addresses which apply to "all interfaces" and adds the address as both IPv4 and IPv6.
3153  func ParseListeners(addrs []string) ([]net.Addr, error) {
3154  	netAddrs := make([]net.Addr, 0, len(addrs)*2)
3155  	for _, addr := range addrs {
3156  		var host string
3157  		var e error
3158  		host, _, e = net.SplitHostPort(addr)
3159  		if e != nil {
3160  			// Shouldn't happen due to already being normalized.
3161  			return nil, e
3162  		}
3163  		// Empty host or host of * on plan9 is both IPv4 and IPv6.
3164  		if host == "" || (host == "*" && runtime.GOOS == "plan9") {
3165  			netAddrs = append(netAddrs, SimpleAddr{Net: "tcp4", Addr: addr})
3166  			netAddrs = append(netAddrs, SimpleAddr{Net: "tcp6", Addr: addr})
3167  			continue
3168  		}
3169  		// Strip IPv6 zone id if present since net.ParseIP does not handle it.
3170  		zoneIndex := strings.LastIndex(host, "%")
3171  		if zoneIndex > 0 {
3172  			host = host[:zoneIndex]
3173  		}
3174  		// Parse the IP.
3175  		ip := net.ParseIP(host)
3176  		if ip == nil {
3177  			return nil, fmt.Errorf("'%s' is not a valid IP address", host)
3178  		}
3179  		// To4 returns nil when the IP is not an IPv4 address, so use this determine the
3180  		// address type.
3181  		if ip.To4() == nil {
3182  			netAddrs = append(netAddrs, SimpleAddr{Net: "tcp6", Addr: addr})
3183  		} else {
3184  			netAddrs = append(netAddrs, SimpleAddr{Net: "tcp4", Addr: addr})
3185  		}
3186  	}
3187  	return netAddrs, nil
3188  }
3189  
3190  // RandomUint16Number returns a random uint16 in a specified input range. Note
3191  // that the range is in zeroth ordering; if you pass it 1800, you will get
3192  // values from 0 to 1800.
3193  func RandomUint16Number(max uint16) uint16 {
3194  	// In order to avoid modulo bias and ensure every possible outcome in [0, max)
3195  	// has equal probability, the random number must be sampled from a random source
3196  	// that has a range limited to a multiple of the modulus.
3197  	var randomNumber uint16
3198  	var limitRange = (math.MaxUint16 / max) * max
3199  	for {
3200  		e := binary.Read(rand.Reader, binary.LittleEndian, &randomNumber)
3201  		if e != nil {
3202  		}
3203  		if randomNumber < limitRange {
3204  			return randomNumber % max
3205  		}
3206  	}
3207  }
3208  
3209  // SetupRPCListeners returns a slice of listeners that are configured for use with the RPC server depending on the
3210  // configuration settings for listen addresses and TLS.
3211  func SetupRPCListeners(config *config.Config, urls []string) ([]net.Listener, error) {
3212  	// Setup TLS if not disabled.
3213  	listenFunc := net.Listen
3214  	if config.ServerTLS.True() {
3215  		// Generate the TLS cert and key file if both don't already exist.
3216  		if !FileExists(config.RPCKey.V()) && !FileExists(config.RPCCert.V()) {
3217  			e := GenCertPair(config.RPCCert.V(), config.RPCKey.V())
3218  			if e != nil {
3219  				return nil, e
3220  			}
3221  		}
3222  		keyPair, e := tls.LoadX509KeyPair(config.RPCCert.V(), config.RPCKey.V())
3223  		if e != nil {
3224  			return nil, e
3225  		}
3226  		tlsConfig := tls.Config{
3227  			Certificates:       []tls.Certificate{keyPair},
3228  			MinVersion:         tls.VersionTLS12,
3229  			InsecureSkipVerify: config.TLSSkipVerify.True(),
3230  		}
3231  		// Change the standard net.Listen function to the tls one.
3232  		listenFunc = func(net string, laddr string) (net.Listener, error) {
3233  			return tls.Listen(net, laddr, &tlsConfig)
3234  		}
3235  	}
3236  	netAddrs, e := ParseListeners(urls)
3237  	if e != nil {
3238  		return nil, e
3239  	}
3240  	listeners := make([]net.Listener, 0, len(netAddrs))
3241  	for _, addr := range netAddrs {
3242  		listener, e := listenFunc(addr.Network(), addr.String())
3243  		if e != nil {
3244  			E.F("can't listen on %s: %v", addr, e)
3245  			continue
3246  		}
3247  		listeners = append(listeners, listener)
3248  	}
3249  	return listeners, nil
3250  }
3251  
3252  // FileExists reports whether the named file or directory exists.
3253  func FileExists(name string) bool {
3254  	if _, e := os.Stat(name); E.Chk(e) {
3255  		if os.IsNotExist(e) {
3256  			return false
3257  		}
3258  	}
3259  	return true
3260  }
3261  
3262  func GetBlkTemplateGenerator(node *Node, cfg *config.Config, stateCfg *active.Config) *mining.BlkTmplGenerator {
3263  	D.Ln("getting a block template generator")
3264  	return mining.NewBlkTmplGenerator(
3265  		&mining.Policy{
3266  			BlockMinWeight:    uint32(cfg.BlockMinWeight.V()),
3267  			BlockMaxWeight:    uint32(cfg.BlockMaxWeight.V()),
3268  			BlockMinSize:      uint32(cfg.BlockMinSize.V()),
3269  			BlockMaxSize:      uint32(cfg.BlockMaxSize.V()),
3270  			BlockPrioritySize: uint32(cfg.BlockPrioritySize.V()),
3271  			TxMinFreeFee:      stateCfg.ActiveMinRelayTxFee,
3272  		},
3273  		node.ChainParams,
3274  		node.TxMemPool,
3275  		node.Chain,
3276  		node.TimeSource,
3277  		node.SigCache,
3278  		node.HashCache,
3279  	)
3280  }
3281