peer.go raw

   1  package peer
   2  
   3  import (
   4  	"container/list"
   5  	"errors"
   6  	"fmt"
   7  	"github.com/p9c/p9/pkg/chaincfg"
   8  	"github.com/p9c/p9/pkg/log"
   9  	"io"
  10  	"math/rand"
  11  	"net"
  12  	"strconv"
  13  	"sync"
  14  	"sync/atomic"
  15  	"time"
  16  	
  17  	"github.com/p9c/p9/pkg/qu"
  18  	
  19  	"github.com/btcsuite/go-socks/socks"
  20  	
  21  	"github.com/p9c/p9/pkg/blockchain"
  22  	"github.com/p9c/p9/pkg/chainhash"
  23  	"github.com/p9c/p9/pkg/wire"
  24  )
  25  
const (
	// MaxProtocolVersion is the max protocol version the peer supports.
	MaxProtocolVersion = wire.FeeFilterVersion
	// DefaultTrickleInterval is the min time between attempts to send an inv message to a peer.
	DefaultTrickleInterval = time.Second
	// MinAcceptableProtocolVersion is the lowest protocol version that a connected peer may support.
	MinAcceptableProtocolVersion = 1
	// outputBufferSize is the number of elements the output channels use.
	outputBufferSize = 1000
	// maxInvTrickleSize is the maximum amount of inventory to send in a single message when trickling inventory to
	// remote peers.
	maxInvTrickleSize = 5000
	// maxKnownInventory is the maximum number of items to keep in the known inventory cache.
	maxKnownInventory = 30000
	// pingInterval is the interval of time to wait in between sending ping messages.
	pingInterval = 1 * time.Second
	// negotiateTimeout is the duration of inactivity before we timeout a peer that hasn't completed the initial version
	// negotiation.
	negotiateTimeout = 27 * time.Second
	// idleTimeout is the duration of inactivity before we time out a peer.
	idleTimeout = time.Minute
	// stallTickInterval is the interval of time between each check for stalled peers.
	stallTickInterval = 60 * time.Second
	// stallResponseTimeout is the base maximum amount of time messages that expect a response will wait before
	// disconnecting the peer for stalling. The deadlines are adjusted for callback running times and checked on each
	// stall tick interval.
	stallResponseTimeout = 360 * time.Second
)
  54  
var (
	// nodeCount is the total number of peer connections made since startup and is used to assign an id to a peer.
	nodeCount int32
	// zeroHash is the zero value hash (all zeros). It is defined as a convenience.
	zeroHash chainhash.Hash
	// SentNonces houses the unique nonces that are generated when pushing version messages. They are used to detect
	// self connections. NOTE(review): the argument 50 is presumably the number of nonces retained — confirm against
	// newMruNonceMap.
	SentNonces = newMruNonceMap(50)
	// AllowSelfConns is only used to allow the tests to bypass the self connection detection and disconnect logic,
	// since they intentionally connect to themselves for testing purposes.
	AllowSelfConns bool
)
  67  
// MessageListeners defines callback function pointers to invoke with message listeners for a peer. Any listener which
// is not set to a concrete callback during peer initialization is ignored.
//
// Execution of multiple message listeners occurs serially, so one callback blocks the execution of the next.
//
// NOTE: Unless otherwise documented, these listeners must NOT directly call any blocking calls (such as
// WaitForShutdown) on the peer instance since the input handler goroutine blocks until the callback has completed.
// Doing so will result in a deadlock.
type MessageListeners struct {
	// OnGetAddr is invoked when a peer receives a getaddr bitcoin message.
	OnGetAddr func(p *Peer, msg *wire.MsgGetAddr)
	// OnAddr is invoked when a peer receives an addr bitcoin message.
	OnAddr func(p *Peer, msg *wire.MsgAddr)
	// OnPing is invoked when a peer receives a ping bitcoin message.
	OnPing func(p *Peer, msg *wire.MsgPing)
	// OnPong is invoked when a peer receives a pong bitcoin message.
	OnPong func(p *Peer, msg *wire.MsgPong)
	// OnAlert is invoked when a peer receives an alert bitcoin message.
	OnAlert func(p *Peer, msg *wire.MsgAlert)
	// OnMemPool is invoked when a peer receives a mempool bitcoin message.
	OnMemPool func(p *Peer, msg *wire.MsgMemPool)
	// OnTx is invoked when a peer receives a tx bitcoin message.
	OnTx func(p *Peer, msg *wire.MsgTx)
	// OnBlock is invoked when a peer receives a block bitcoin message. NOTE(review): buf is presumably the raw
	// serialized bytes of the block — confirm at the call site.
	OnBlock func(p *Peer, msg *wire.Block, buf []byte)
	// OnCFilter is invoked when a peer receives a cfilter bitcoin message.
	OnCFilter func(p *Peer, msg *wire.MsgCFilter)
	// OnCFHeaders is invoked when a peer receives a cfheaders bitcoin message.
	OnCFHeaders func(p *Peer, msg *wire.MsgCFHeaders)
	// OnCFCheckpt is invoked when a peer receives a cfcheckpt bitcoin message.
	OnCFCheckpt func(p *Peer, msg *wire.MsgCFCheckpt)
	// OnInv is invoked when a peer receives an inv bitcoin message.
	OnInv func(p *Peer, msg *wire.MsgInv)
	// OnHeaders is invoked when a peer receives a headers bitcoin message.
	OnHeaders func(p *Peer, msg *wire.MsgHeaders)
	// OnNotFound is invoked when a peer receives a notfound bitcoin message.
	OnNotFound func(p *Peer, msg *wire.MsgNotFound)
	// OnGetData is invoked when a peer receives a getdata bitcoin message.
	OnGetData func(p *Peer, msg *wire.MsgGetData)
	// OnGetBlocks is invoked when a peer receives a getblocks bitcoin message.
	OnGetBlocks func(p *Peer, msg *wire.MsgGetBlocks)
	// OnGetHeaders is invoked when a peer receives a getheaders bitcoin message.
	OnGetHeaders func(p *Peer, msg *wire.MsgGetHeaders)
	// OnGetCFilters is invoked when a peer receives a getcfilters bitcoin message.
	OnGetCFilters func(p *Peer, msg *wire.MsgGetCFilters)
	// OnGetCFHeaders is invoked when a peer receives a getcfheaders bitcoin message.
	OnGetCFHeaders func(p *Peer, msg *wire.MsgGetCFHeaders)
	// OnGetCFCheckpt is invoked when a peer receives a getcfcheckpt bitcoin message.
	OnGetCFCheckpt func(p *Peer, msg *wire.MsgGetCFCheckpt)
	// OnFeeFilter is invoked when a peer receives a feefilter bitcoin message.
	OnFeeFilter func(p *Peer, msg *wire.MsgFeeFilter)
	// OnFilterAdd is invoked when a peer receives a filteradd bitcoin message.
	OnFilterAdd func(p *Peer, msg *wire.MsgFilterAdd)
	// OnFilterClear is invoked when a peer receives a filterclear bitcoin message.
	OnFilterClear func(p *Peer, msg *wire.MsgFilterClear)
	// OnFilterLoad is invoked when a peer receives a filterload bitcoin message.
	OnFilterLoad func(p *Peer, msg *wire.MsgFilterLoad)
	// OnMerkleBlock is invoked when a peer receives a merkleblock bitcoin message.
	OnMerkleBlock func(p *Peer, msg *wire.MsgMerkleBlock)
	// OnVersion is invoked when a peer receives a version bitcoin message. The caller may return a reject message in
	// which case the message will be sent to the peer and the peer will be disconnected.
	OnVersion func(p *Peer, msg *wire.MsgVersion) *wire.MsgReject
	// OnVerAck is invoked when a peer receives a verack bitcoin message.
	OnVerAck func(p *Peer, msg *wire.MsgVerAck)
	// OnReject is invoked when a peer receives a reject bitcoin message.
	OnReject func(p *Peer, msg *wire.MsgReject)
	// OnSendHeaders is invoked when a peer receives a sendheaders bitcoin message.
	OnSendHeaders func(p *Peer, msg *wire.MsgSendHeaders)
	// OnRead is invoked when a peer receives a bitcoin message.
	//
	// It consists of the number of bytes read, the message, and whether or not an error in the read occurred.
	// Typically, callers will opt to use the callbacks for the specific message types, however this can be useful for
	// circumstances such as keeping track of server-wide byte counts or working with custom message types for which the
	// peer does not directly provide a callback.
	OnRead func(p *Peer, bytesRead int, msg wire.Message, e error)
	// OnWrite is invoked when we write a bitcoin message to a peer.
	//
	// It consists of the number of bytes written, the message, and whether or not an error in the write occurred. This
	// can be useful for circumstances such as keeping track of server-wide byte counts.
	OnWrite func(p *Peer, bytesWritten int, msg wire.Message, e error)
}
 158  
// Config is the struct to hold configuration options useful to Peer.
type Config struct {
	// NewestBlock specifies a callback which provides the newest block details to the peer as needed.
	//
	// This can be nil in which case the peer will report a block height of 0, however it is good practice for peers to
	// specify this so their currently best known block is accurately reported.
	NewestBlock HashFunc
	// HostToNetAddress returns the netaddress for the given host. This can be nil in which case the host will be parsed
	// as an IP address.
	HostToNetAddress HostToNetAddrFunc
	// Proxy indicates a proxy is being used for connections. The only effect this has is to prevent leaking the tor
	// proxy address, so it only needs to be specified if using a tor proxy.
	Proxy string
	// UserAgentName specifies the user agent name to advertise. It is highly recommended to specify this value.
	UserAgentName string
	// UserAgentVersion specifies the user agent version to advertise. It is highly recommended to specify this value
	// and that it follows the form "major.minor.revision" e.g. "2.6.41".
	UserAgentVersion string
	// UserAgentComments specify the user agent comments to advertise. These values must not contain the illegal
	// characters specified in BIP 14: '/', ':', '(', ')'.
	UserAgentComments []string
	// ChainParams identifies which chain parameters the peer is associated with. It is highly recommended to specify
	// this field, however it can be omitted in which case the test network will be used.
	ChainParams *chaincfg.Params
	// Services specifies which services to advertise as supported by the local peer. This field can be omitted in which
	// case it will be 0 and therefore advertise no supported services.
	Services wire.ServiceFlag
	// ProtocolVersion specifies the maximum protocol version to use and advertise. This field can be omitted in which
	// case peer.MaxProtocolVersion will be used.
	ProtocolVersion uint32
	// DisableRelayTx specifies if the remote peer should be informed to not send inv messages for transactions.
	DisableRelayTx bool
	// Listeners houses callback functions to be invoked on receiving peer messages.
	Listeners MessageListeners
	// TrickleInterval is the duration of the ticker which trickles down the inventory to a peer.
	//
	// NOTE(review): the IP and Port fields below carry no documentation in this file and their consumers are not
	// visible here — presumably an address associated with the peer; confirm against the code that reads them.
	TrickleInterval time.Duration
	IP              net.IP
	Port            uint16
}
 199  
 200  // minUint32 is a helper function to return the minimum of two uint32s. This avoids a math import and the need to cast
 201  // to floats.
 202  func minUint32(a, b uint32) uint32 {
 203  	if a < b {
 204  		return a
 205  	}
 206  	return b
 207  }
 208  
 209  // newNetAddress attempts to extract the IP address and port from the passed net.Addr interface and create a bitcoin
 210  // NetAddress structure using that information.
 211  func newNetAddress(addr net.Addr, services wire.ServiceFlag) (*wire.NetAddress, error) {
 212  	// addr will be a net.TCPAddr when not using a proxy.
 213  	if tcpAddr, ok := addr.(*net.TCPAddr); ok {
 214  		ip := tcpAddr.IP
 215  		port := uint16(tcpAddr.Port)
 216  		na := wire.NewNetAddressIPPort(ip, port, services)
 217  		return na, nil
 218  	}
 219  	// addr will be a socks.ProxiedAddr when using a proxy.
 220  	if proxiedAddr, ok := addr.(*socks.ProxiedAddr); ok {
 221  		ip := net.ParseIP(proxiedAddr.Host)
 222  		if ip == nil {
 223  			ip = net.ParseIP("0.0.0.0")
 224  		}
 225  		port := uint16(proxiedAddr.Port)
 226  		na := wire.NewNetAddressIPPort(ip, port, services)
 227  		return na, nil
 228  	}
 229  	// For the most part, addr should be one of the two above cases, but to be safe, fall back to trying to parse the
 230  	// information from the address string as a last resort.
 231  	host, portStr, e := net.SplitHostPort(addr.String())
 232  	if e != nil {
 233  		return nil, e
 234  	}
 235  	ip := net.ParseIP(host)
 236  	port, e := strconv.ParseUint(portStr, 10, 16)
 237  	if e != nil {
 238  		return nil, e
 239  	}
 240  	na := wire.NewNetAddressIPPort(ip, uint16(port), services)
 241  	return na, nil
 242  }
 243  
// outMsg is used to house a message to be sent along with a channel to signal when the message has been sent (or won't
// be sent due to things such as shutdown).
//
// msg is the message itself, doneChan is the send-only completion signal channel described above, and encoding is the
// wire.MessageEncoding associated with the message.
type outMsg struct {
	msg      wire.Message
	doneChan chan<- struct{}
	encoding wire.MessageEncoding
}
 251  
// stallControlCmd represents the command of a stall control message.
type stallControlCmd uint8

// Constants for the command of a stall control message.
const (
	// sccSendMessage indicates a message is being sent to the remote peer.
	sccSendMessage stallControlCmd = iota
	// sccReceiveMessage indicates a message has been received from the remote peer.
	sccReceiveMessage
	// sccHandlerStart indicates a callback handler is about to be invoked.
	sccHandlerStart
	// sccHandlerDone indicates a callback handler has completed.
	sccHandlerDone
)
 267  
// stallControlMsg is used to signal the stall handler about specific events so it can properly detect and handle
// stalled remote peers.
//
// command identifies the kind of event (one of the scc* constants) and message is the wire message the event refers to.
type stallControlMsg struct {
	command stallControlCmd
	message wire.Message
}
 274  
// StatsSnap is a snapshot of peer stats at a point in time.
//
// Peer.StatsSnapshot fills it in. Note that Version is copied from the protocol version advertised by the remote peer,
// not from the negotiated protocol version, and that LastSend and LastRecv are rebuilt from whole-second Unix
// timestamps.
type StatsSnap struct {
	ID             int32
	Addr           string
	Services       wire.ServiceFlag
	LastSend       time.Time
	LastRecv       time.Time
	BytesSent      uint64
	BytesRecv      uint64
	ConnTime       time.Time
	TimeOffset     int64
	Version        uint32
	UserAgent      string
	Inbound        bool
	StartingHeight int32
	LastBlock      int32
	LastPingNonce  uint64
	LastPingTime   time.Time
	LastPingMicros int64
}
 295  
// HashFunc is a function which returns a block hash, height and error. It is used as a callback to get newest block
// details.
type HashFunc func() (hash *chainhash.Hash, height int32, e error)

// AddrFunc is a func which takes an address and returns a related address.
type AddrFunc func(remoteAddr *wire.NetAddress) *wire.NetAddress

// HostToNetAddrFunc is a func which takes a host, port and services and returns the corresponding netaddress, or an
// error.
type HostToNetAddrFunc func(
	host string, port uint16,
	services wire.ServiceFlag,
) (*wire.NetAddress, error)
 308  
 309  // NOTE: The overall data flow of a peer is split into 3 goroutines.
 310  //
 311  // Inbound messages are read via the inHandler goroutine and generally dispatched to their own handler.
 312  //
 313  // For inbound data-related messages such as blocks, transactions, and inventory, the data is handled by the
 314  // corresponding message handlers.
 315  //
 316  // The data flow for outbound messages is split into 2 goroutines, queueHandler and outHandler. The first, queueHandler,
 317  // is used as a way for external entities to queue messages, by way of the QueueMessage function, quickly regardless of
 318  // whether the peer is currently sending or not. It acts as the traffic cop between the external world and the actual
 319  // goroutine which writes to the network socket.
 320  
// Peer provides a basic concurrent safe bitcoin peer for handling bitcoin communications via the peer-to-peer protocol.
//
// It provides full duplex reading and writing, automatic handling of the initial handshake process, querying of usage
// statistics and other information about the remote peer such as its address, user agent, and protocol version, output
// message queuing, inventory trickling, and the ability to dynamically register and unregister callbacks for handling
// bitcoin protocol messages.
//
// Outbound messages are typically queued via QueueMessage or QueueInventory.
//
// QueueMessage is intended for all messages, including responses to data such as blocks and transactions.
//
// QueueInventory, on the other hand, is only intended for relaying inventory as it employs a trickling mechanism to
// batch the inventory together. However, some helper functions for pushing messages of specific types that typically
// require common special handling are provided as a convenience.
type Peer struct {
	// The following variables must only be used atomically.
	//
	// NOTE(review): conn is laid out in this group but is a net.Conn, not an atomic integer. LocalAddr only touches it
	// after observing a non-zero connected flag — confirm how the remaining accesses are synchronized.
	bytesReceived uint64
	bytesSent     uint64
	lastRecv      int64
	lastSend      int64
	connected     int32
	disconnect    int32
	conn          net.Conn
	// These fields are set at creation time and never modified, so they are safe to read from concurrently without a
	// mutex.
	//
	// NOTE(review): this group also contains flagsMtx with the flags it protects, as well as the prevGetBlocks and
	// prevGetHdrs mutexes and the hashes written under them, so the "never modified" statement presumably covers only
	// the leading fields (Nonce, addr, cfg, inbound) — confirm.
	Nonce                uint64
	addr                 string
	cfg                  Config
	inbound              bool
	flagsMtx             sync.Mutex // protects the peer flags below
	na                   *wire.NetAddress
	id                   int32
	userAgent            string
	services             wire.ServiceFlag
	versionKnown         bool
	advertisedProtoVer   uint32 // protocol version advertised by remote
	protocolVersion      uint32 // negotiated protocol version
	sendHeadersPreferred bool   // peer sent a sendheaders message
	verAckReceived       bool
	witnessEnabled       bool
	wireEncoding         wire.MessageEncoding
	knownInventory       *mruInventoryMap
	prevGetBlocksMtx     sync.Mutex
	prevGetBlocksBegin   *chainhash.Hash
	prevGetBlocksStop    *chainhash.Hash
	prevGetHdrsMtx       sync.Mutex
	prevGetHdrsBegin     *chainhash.Hash
	prevGetHdrsStop      *chainhash.Hash
	// These fields keep track of statistics for the peer and are protected by the statsMtx mutex.
	//
	// NOTE(review): the channel, quit-signal, IP and Port fields at the end of this group are not statistics;
	// presumably they are not governed by statsMtx — confirm.
	statsMtx           sync.RWMutex
	timeOffset         int64
	timeConnected      time.Time
	startingHeight     int32
	lastBlock          int32
	lastAnnouncedBlock *chainhash.Hash
	lastPingNonce      uint64    // Set to Nonce if we have a pending ping.
	lastPingTime       time.Time // Time we sent last ping.
	lastPingMicros     int64     // Time for last ping to return.
	stallControl       chan stallControlMsg
	outputQueue        chan outMsg
	sendQueue          chan outMsg
	sendDoneQueue      qu.C
	outputInvChan      chan *wire.InvVect
	inQuit             qu.C
	queueQuit          qu.C
	outQuit            qu.C
	quit               qu.C
	IP                 net.IP
	Port               uint16
}
 391  
 392  // String returns the peer's address and directionality as a human-readable string.
 393  //
 394  // This function is safe for concurrent access.
 395  func (p *Peer) String() string {
 396  	return fmt.Sprintf("%s (%s)", p.addr, log.DirectionString(p.inbound))
 397  }
 398  
 399  // UpdateLastBlockHeight updates the last known block for the peer.
 400  //
 401  // This function is safe for concurrent access.
 402  func (p *Peer) UpdateLastBlockHeight(newHeight int32) {
 403  	p.statsMtx.Lock()
 404  	T.F(
 405  		"updating last block height of peer %v from %v to %v",
 406  		p.addr,
 407  		p.lastBlock,
 408  		newHeight,
 409  	)
 410  	p.lastBlock = newHeight
 411  	p.statsMtx.Unlock()
 412  }
 413  
 414  // UpdateLastAnnouncedBlock updates meta-data about the last block hash this
 415  // peer is known to have announced.
 416  //
 417  // This function is safe for concurrent access.
 418  func (p *Peer) UpdateLastAnnouncedBlock(blkHash *chainhash.Hash) {
 419  	T.Ln("updating last blk for peer", p.addr, ",", blkHash)
 420  	p.statsMtx.Lock()
 421  	p.lastAnnouncedBlock = blkHash
 422  	p.statsMtx.Unlock()
 423  }
 424  
// AddKnownInventory adds the passed inventory vector to the peer's cache of known inventory by delegating to the
// knownInventory map's Add method.
//
// This function is safe for concurrent access.
func (p *Peer) AddKnownInventory(invVect *wire.InvVect) {
	p.knownInventory.Add(invVect)
}
 431  
 432  // StatsSnapshot returns a snapshot of the current peer flags and statistics.
 433  //
 434  // This function is safe for concurrent access.
 435  func (p *Peer) StatsSnapshot() *StatsSnap {
 436  	p.statsMtx.RLock()
 437  	p.flagsMtx.Lock()
 438  	id := p.id
 439  	addr := p.addr
 440  	userAgent := p.userAgent
 441  	services := p.services
 442  	protocolVersion := p.advertisedProtoVer
 443  	p.flagsMtx.Unlock()
 444  	// Get a copy of all relevant flags and stats.
 445  	statsSnap := &StatsSnap{
 446  		ID:             id,
 447  		Addr:           addr,
 448  		UserAgent:      userAgent,
 449  		Services:       services,
 450  		LastSend:       p.LastSend(),
 451  		LastRecv:       p.LastRecv(),
 452  		BytesSent:      p.BytesSent(),
 453  		BytesRecv:      p.BytesReceived(),
 454  		ConnTime:       p.timeConnected,
 455  		TimeOffset:     p.timeOffset,
 456  		Version:        protocolVersion,
 457  		Inbound:        p.inbound,
 458  		StartingHeight: p.startingHeight,
 459  		LastBlock:      p.lastBlock,
 460  		LastPingNonce:  p.lastPingNonce,
 461  		LastPingMicros: p.lastPingMicros,
 462  		LastPingTime:   p.lastPingTime,
 463  	}
 464  	p.statsMtx.RUnlock()
 465  	return statsSnap
 466  }
 467  
 468  // ID returns the peer id.
 469  //
 470  // This function is safe for concurrent access.
 471  func (p *Peer) ID() int32 {
 472  	p.flagsMtx.Lock()
 473  	id := p.id
 474  	p.flagsMtx.Unlock()
 475  	return id
 476  }
 477  
 478  // NA returns the peer network address.
 479  //
 480  // This function is safe for concurrent access.
 481  func (p *Peer) NA() *wire.NetAddress {
 482  	p.flagsMtx.Lock()
 483  	na := p.na
 484  	p.flagsMtx.Unlock()
 485  	return na
 486  }
 487  
// Addr returns the peer address string.
//
// This function is safe for concurrent access.
func (p *Peer) Addr() string {
	// The address doesn't change after initialization, therefore it is not protected by a mutex.
	return p.addr
}
 495  
// Inbound returns whether the peer is inbound. The flag is read without taking a lock.
//
// This function is safe for concurrent access.
func (p *Peer) Inbound() bool {
	return p.inbound
}
 500  
 501  // Services returns the services flag of the remote peer. This function is safe for concurrent access.
 502  func (p *Peer) Services() wire.ServiceFlag {
 503  	p.flagsMtx.Lock()
 504  	services := p.services
 505  	p.flagsMtx.Unlock()
 506  	return services
 507  }
 508  
 509  // UserAgent returns the user agent of the remote peer.
 510  //
 511  // This function is safe for concurrent access.
 512  func (p *Peer) UserAgent() string {
 513  	p.flagsMtx.Lock()
 514  	userAgent := p.userAgent
 515  	p.flagsMtx.Unlock()
 516  	return userAgent
 517  }
 518  
 519  // LastAnnouncedBlock returns the last announced block of the remote peer.
 520  //
 521  // This function is safe for concurrent access.
 522  func (p *Peer) LastAnnouncedBlock() *chainhash.Hash {
 523  	p.statsMtx.RLock()
 524  	lastAnnouncedBlock := p.lastAnnouncedBlock
 525  	p.statsMtx.RUnlock()
 526  	return lastAnnouncedBlock
 527  }
 528  
 529  // LastPingNonce returns the last ping Nonce of the remote peer.
 530  //
 531  // This function is safe for concurrent access.
 532  func (p *Peer) LastPingNonce() uint64 {
 533  	p.statsMtx.RLock()
 534  	lastPingNonce := p.lastPingNonce
 535  	p.statsMtx.RUnlock()
 536  	return lastPingNonce
 537  }
 538  
 539  // LastPingTime returns the last ping time of the remote peer.
 540  //
 541  // This function is safe for concurrent access.
 542  func (p *Peer) LastPingTime() time.Time {
 543  	p.statsMtx.RLock()
 544  	lastPingTime := p.lastPingTime
 545  	p.statsMtx.RUnlock()
 546  	return lastPingTime
 547  }
 548  
 549  // LastPingMicros returns the last ping micros of the remote peer.
 550  //
 551  // This function is safe for concurrent access.
 552  func (p *Peer) LastPingMicros() int64 {
 553  	p.statsMtx.RLock()
 554  	lastPingMicros := p.lastPingMicros
 555  	p.statsMtx.RUnlock()
 556  	return lastPingMicros
 557  }
 558  
 559  // VersionKnown returns the whether or not the version of a peer is known locally.
 560  //
 561  // This function is safe for concurrent access.
 562  func (p *Peer) VersionKnown() bool {
 563  	p.flagsMtx.Lock()
 564  	versionKnown := p.versionKnown
 565  	p.flagsMtx.Unlock()
 566  	return versionKnown
 567  }
 568  
 569  // VerAckReceived returns whether or not a verack message was received by the peer.
 570  //
 571  // This function is safe for concurrent access.
 572  func (p *Peer) VerAckReceived() bool {
 573  	p.flagsMtx.Lock()
 574  	verAckReceived := p.verAckReceived
 575  	p.flagsMtx.Unlock()
 576  	return verAckReceived
 577  }
 578  
 579  // ProtocolVersion returns the negotiated peer protocol version.
 580  //
 581  // This function is safe for concurrent access.
 582  func (p *Peer) ProtocolVersion() uint32 {
 583  	p.flagsMtx.Lock()
 584  	protocolVersion := p.protocolVersion
 585  	p.flagsMtx.Unlock()
 586  	return protocolVersion
 587  }
 588  
 589  // LastBlock returns the last block of the peer.
 590  //
 591  // This function is safe for concurrent access.
 592  func (p *Peer) LastBlock() int32 {
 593  	p.statsMtx.RLock()
 594  	lastBlock := p.lastBlock
 595  	p.statsMtx.RUnlock()
 596  	return lastBlock
 597  }
 598  
 599  // LastSend returns the last send time of the peer.
 600  //
 601  // This function is safe for concurrent access.
 602  func (p *Peer) LastSend() time.Time {
 603  	return time.Unix(atomic.LoadInt64(&p.lastSend), 0)
 604  }
 605  
 606  // LastRecv returns the last recv time of the peer.
 607  //
 608  // This function is safe for concurrent access.
 609  func (p *Peer) LastRecv() time.Time {
 610  	return time.Unix(atomic.LoadInt64(&p.lastRecv), 0)
 611  }
 612  
 613  // LocalAddr returns the local address of the connection.
 614  //
 615  // This function is safe fo concurrent access.
 616  func (p *Peer) LocalAddr() net.Addr {
 617  	var localAddr net.Addr
 618  	if atomic.LoadInt32(&p.connected) != 0 {
 619  		localAddr = p.conn.LocalAddr()
 620  	}
 621  	return localAddr
 622  }
 623  
// BytesSent returns the total number of bytes sent by the peer. The counter is loaded atomically.
//
// This function is safe for concurrent access.
func (p *Peer) BytesSent() uint64 {
	return atomic.LoadUint64(&p.bytesSent)
}
 630  
// BytesReceived returns the total number of bytes received by the peer. The counter is loaded atomically.
//
// This function is safe for concurrent access.
func (p *Peer) BytesReceived() uint64 {
	return atomic.LoadUint64(&p.bytesReceived)
}
 637  
 638  // TimeConnected returns the time at which the peer connected.
 639  //
 640  // This function is safe for concurrent access.
 641  func (p *Peer) TimeConnected() time.Time {
 642  	p.statsMtx.RLock()
 643  	timeConnected := p.timeConnected
 644  	p.statsMtx.RUnlock()
 645  	return timeConnected
 646  }
 647  
 648  // TimeOffset returns the number of seconds the local time was offset from the time the peer reported during the initial
 649  // negotiation phase.
 650  //
 651  // Negative values indicate the remote peer's time is before the local time.
 652  //
 653  // This function is safe for concurrent access.
 654  func (p *Peer) TimeOffset() int64 {
 655  	p.statsMtx.RLock()
 656  	timeOffset := p.timeOffset
 657  	p.statsMtx.RUnlock()
 658  	return timeOffset
 659  }
 660  
 661  // StartingHeight returns the last known height the peer reported during the initial negotiation phase. This function is
 662  // safe for concurrent access.
 663  func (p *Peer) StartingHeight() int32 {
 664  	p.statsMtx.RLock()
 665  	startingHeight := p.startingHeight
 666  	p.statsMtx.RUnlock()
 667  	return startingHeight
 668  }
 669  
 670  // WantsHeaders returns if the peer wants header messages instead of inventory vectors for blocks. This function is safe
 671  // for concurrent access.
 672  func (p *Peer) WantsHeaders() bool {
 673  	p.flagsMtx.Lock()
 674  	sendHeadersPreferred := p.sendHeadersPreferred
 675  	p.flagsMtx.Unlock()
 676  	return sendHeadersPreferred
 677  }
 678  
 679  // // IsWitnessEnabled returns true if the peer has signalled that it supports
 680  // // segregated witness. This function is safe for concurrent access.
 681  // func (p *Peer) IsWitnessEnabled() bool {
 682  // 	p.flagsMtx.Lock()
 683  // 	witnessEnabled := p.witnessEnabled
 684  // 	p.flagsMtx.Unlock()
 685  // 	return witnessEnabled
 686  // }
 687  
 688  // PushAddrMsg sends an addr message to the connected peer using the provided
 689  // addresses.
 690  //
 691  // This function is useful over manually sending the message via QueueMessage since it automatically limits the
 692  // addresses to the maximum number allowed by the message and randomizes the chosen addresses when there are too many.
 693  //
 694  // It returns the addresses that were actually sent and no message will be sent if there are no entries in the provided
 695  // addresses slice. This function is safe for concurrent access.
 696  func (p *Peer) PushAddrMsg(addresses []*wire.NetAddress) ([]*wire.NetAddress, error) {
 697  	addressCount := len(addresses)
 698  	// Nothing to send.
 699  	if addressCount == 0 {
 700  		return nil, nil
 701  	}
 702  	msg := wire.NewMsgAddr()
 703  	msg.AddrList = make([]*wire.NetAddress, addressCount)
 704  	copy(msg.AddrList, addresses)
 705  	// Randomize the addresses sent if there are more than the maximum allowed.
 706  	if addressCount > wire.MaxAddrPerMsg {
 707  		// Shuffle the address list.
 708  		for i := 0; i < wire.MaxAddrPerMsg; i++ {
 709  			j := i + rand.Intn(addressCount-i)
 710  			msg.AddrList[i], msg.AddrList[j] = msg.AddrList[j], msg.AddrList[i]
 711  		}
 712  		// Truncate it to the maximum size.
 713  		msg.AddrList = msg.AddrList[:wire.MaxAddrPerMsg]
 714  	}
 715  	p.QueueMessage(msg, nil)
 716  	return msg.AddrList, nil
 717  }
 718  
 719  // PushGetBlocksMsg sends a getblocks message for the provided block locator and stop hash. It will ignore back-to-back
 720  // duplicate requests.
 721  //
 722  // This function is safe for concurrent access.
 723  func (p *Peer) PushGetBlocksMsg(locator blockchain.BlockLocator, stopHash *chainhash.Hash) (e error) {
 724  	// Extract the begin hash from the block locator, if one was specified, to use for filtering duplicate getblocks
 725  	// requests.
 726  	var beginHash *chainhash.Hash
 727  	if len(locator) > 0 {
 728  		beginHash = locator[0]
 729  	}
 730  	// Filter duplicate getblocks requests.
 731  	p.prevGetBlocksMtx.Lock()
 732  	isDuplicate := p.prevGetBlocksStop != nil && p.prevGetBlocksBegin != nil &&
 733  		beginHash != nil && stopHash.IsEqual(p.prevGetBlocksStop) &&
 734  		beginHash.IsEqual(p.prevGetBlocksBegin)
 735  	p.prevGetBlocksMtx.Unlock()
 736  	if isDuplicate {
 737  		T.F("filtering duplicate [getblocks] with begin hash %v, stop hash %v", beginHash, stopHash)
 738  		return nil
 739  	}
 740  	// Construct the getblocks request and queue it to be sent.
 741  	msg := wire.NewMsgGetBlocks(stopHash)
 742  	for _, hash := range locator {
 743  		e := msg.AddBlockLocatorHash(hash)
 744  		if e != nil {
 745  			return e
 746  		}
 747  	}
 748  	p.QueueMessage(msg, nil)
 749  	// Update the previous getblocks request information for filtering duplicates.
 750  	p.prevGetBlocksMtx.Lock()
 751  	p.prevGetBlocksBegin = beginHash
 752  	p.prevGetBlocksStop = stopHash
 753  	p.prevGetBlocksMtx.Unlock()
 754  	return nil
 755  }
 756  
 757  // PushGetHeadersMsg sends a getblocks message for the provided block locator and stop hash. It will ignore back-to-back
 758  // duplicate requests.
 759  //
 760  // This function is safe for concurrent access.
 761  func (p *Peer) PushGetHeadersMsg(locator blockchain.BlockLocator, stopHash *chainhash.Hash) (e error) {
 762  	// Extract the begin hash from the block locator, if one was specified, to use for filtering duplicate getheaders
 763  	// requests.
 764  	var beginHash *chainhash.Hash
 765  	if len(locator) > 0 {
 766  		beginHash = locator[0]
 767  	}
 768  	// Filter duplicate getheaders requests.
 769  	p.prevGetHdrsMtx.Lock()
 770  	isDuplicate := p.prevGetHdrsStop != nil && p.prevGetHdrsBegin != nil &&
 771  		beginHash != nil && stopHash.IsEqual(p.prevGetHdrsStop) &&
 772  		beginHash.IsEqual(p.prevGetHdrsBegin)
 773  	p.prevGetHdrsMtx.Unlock()
 774  	if isDuplicate {
 775  		T.Ln(
 776  			"Filtering duplicate [getheaders] with begin hash", beginHash,
 777  		)
 778  		return nil
 779  	}
 780  	// Construct the getheaders request and queue it to be sent.
 781  	msg := wire.NewMsgGetHeaders()
 782  	msg.HashStop = *stopHash
 783  	for _, hash := range locator {
 784  		e := msg.AddBlockLocatorHash(hash)
 785  		if e != nil {
 786  			return e
 787  		}
 788  	}
 789  	p.QueueMessage(msg, nil)
 790  	// Update the previous getheaders request information for filtering
 791  	// duplicates.
 792  	p.prevGetHdrsMtx.Lock()
 793  	p.prevGetHdrsBegin = beginHash
 794  	p.prevGetHdrsStop = stopHash
 795  	p.prevGetHdrsMtx.Unlock()
 796  	return nil
 797  }
 798  
 799  // PushRejectMsg sends a reject message for the provided command, reject code, reject reason, and hash.
 800  //
 801  // The hash will only be used when the command is a tx or block and should be nil in other cases.
 802  //
 803  // The wait parameter will cause the function to block until the reject message has actually been sent. This function is
 804  // safe for concurrent access.
 805  func (p *Peer) PushRejectMsg(command string, code wire.RejectCode, reason string, hash *chainhash.Hash, wait bool) {
 806  	// Don't bother sending the reject message if the protocol version is too
 807  	// low.
 808  	if p.VersionKnown() && p.ProtocolVersion() < wire.RejectVersion {
 809  		return
 810  	}
 811  	msg := wire.NewMsgReject(command, code, reason)
 812  	if command == wire.CmdTx || command == wire.CmdBlock {
 813  		if hash == nil {
 814  			W.Ln(
 815  				"Sending a reject message for command type", command,
 816  				"which should have specified a hash but does not",
 817  			)
 818  			hash = &zeroHash
 819  		}
 820  		msg.Hash = *hash
 821  	}
 822  	// Send the message without waiting if the caller has not requested it.
 823  	if !wait {
 824  		p.QueueMessage(msg, nil)
 825  		return
 826  	}
 827  	// Send the message and block until it has been sent before returning.
 828  	doneChan := qu.Ts(1)
 829  	p.QueueMessage(msg, doneChan)
 830  	<-doneChan
 831  }
 832  
 833  // handlePingMsg is invoked when a peer receives a ping bitcoin message.
 834  // For recent clients (protocol version > BIP0031Version),
 835  // it replies with a pong message.  For older clients,
 836  // it does nothing and anything other than failure is considered a successful
 837  // ping.
 838  func (p *Peer) handlePingMsg(msg *wire.MsgPing) {
 839  	// Only reply with pong if the message is from a new enough client.
 840  	if p.ProtocolVersion() > wire.BIP0031Version {
 841  		// Include Nonce from ping so pong can be identified.
 842  		p.QueueMessage(wire.NewMsgPong(msg.Nonce), nil)
 843  	}
 844  }
 845  
 846  // handlePongMsg is invoked when a peer receives a pong bitcoin message. It updates the ping statistics as required for
 847  // recent clients (protocol version > BIP0031Version). There is no effect for older clients or when a ping was not
 848  // previously sent.
 849  func (p *Peer) handlePongMsg(msg *wire.MsgPong) {
 850  	// Arguably we could use a buffered channel here sending data in a fifo manner whenever we send a ping, or a list
 851  	// keeping track of the times of each ping.
 852  	//
 853  	// For now we just make a best effort and only record stats if it was for the last ping sent. Any preceding and
 854  	// overlapping pings will be ignored. It is unlikely to occur without large usage of the ping rpc call since we ping
 855  	// infrequently enough that if they overlap we would have timed out the peer.
 856  	if p.ProtocolVersion() > wire.BIP0031Version {
 857  		p.statsMtx.Lock()
 858  		if p.lastPingNonce != 0 && msg.Nonce == p.lastPingNonce {
 859  			p.lastPingMicros = time.Since(p.lastPingTime).Nanoseconds()
 860  			p.lastPingMicros /= 1000 // convert to microseconds.
 861  			p.lastPingNonce = 0
 862  		}
 863  		p.statsMtx.Unlock()
 864  	}
 865  }
 866  
 867  // readMessage reads the next bitcoin message from the peer with logging.
 868  func (p *Peer) readMessage(encoding wire.MessageEncoding) (wire.Message, []byte, error) {
 869  	n, msg, buf, e := wire.ReadMessageWithEncodingN(
 870  		p.conn,
 871  		p.ProtocolVersion(), p.cfg.ChainParams.Net, encoding,
 872  	)
 873  	atomic.AddUint64(&p.bytesReceived, uint64(n))
 874  	if p.cfg.Listeners.OnRead != nil {
 875  		p.cfg.Listeners.OnRead(p, n, msg, e)
 876  	}
 877  	if e != nil {
 878  		T.Ln(e)
 879  		return nil, nil, e
 880  	}
 881  	// // Use closures to log expensive operations so they are only run when the logging level requires it.
 882  	T.C(
 883  		func() (o string) {
 884  			// Debug summary of message.
 885  			summary := messageSummary(msg)
 886  			if len(summary) > 0 {
 887  				summary = " (" + summary + ")"
 888  			}
 889  			o = fmt.Sprintf(
 890  				"Received %v%s from %s",
 891  				msg.Command(), summary, p,
 892  			)
 893  			// o += spew.Sdump(msg)
 894  			// o += spew.Sdump(buf)
 895  			return o
 896  		},
 897  	)
 898  	return msg, buf, nil
 899  }
 900  
 901  // writeMessage sends a bitcoin message to the peer with logging.
 902  func (p *Peer) writeMessage(msg wire.Message, enc wire.MessageEncoding) (e error) {
 903  	// Don't do anything if we're disconnecting.
 904  	if atomic.LoadInt32(&p.disconnect) != 0 {
 905  		return nil
 906  	}
 907  	// // Use closures to log expensive operations so they are only run when the logging level requires it.
 908  	T.C(
 909  		func() (o string) {
 910  			// Debug summary of message.
 911  			summary := messageSummary(msg)
 912  			if len(summary) > 0 {
 913  				summary = " (" + summary + ")"
 914  			}
 915  			o = fmt.Sprintf(
 916  				"Sending %v%s to %s", msg.Command(),
 917  				summary, p,
 918  			)
 919  			// o += spew.Sdump(msg)
 920  			// var buf bytes.Buffer
 921  			// _, e := wire.WriteMessageWithEncodingN(
 922  			// 	&buf, msg, p.ProtocolVersion(),
 923  			// 	p.cfg.ChainParams.Net, enc,
 924  			// )
 925  			// if e != nil {
 926  			// 	return e.Error()
 927  			// }
 928  			// o += spew.Sdump(buf.Bytes())
 929  			return
 930  		},
 931  	)
 932  	cmd := msg.Command()
 933  	if cmd != "ping" && cmd != "pong" && cmd != "inv" {
 934  		D.C(
 935  			func() string {
 936  				// Debug summary of message.
 937  				summary := messageSummary(msg)
 938  				if len(summary) > 0 {
 939  					summary = " (" + summary + ")"
 940  				}
 941  				o := fmt.Sprintf("Sending %v%s to %s", msg.Command(), summary, p)
 942  				// o += spew.Sdump(msg)
 943  				// var buf bytes.Buffer
 944  				// _, e = wire.WriteMessageWithEncodingN(&buf, msg, p.ProtocolVersion(), p.cfg.ChainParams.Net, enc)
 945  				// if e != nil {
 946  				// 	// 	return e.Error()
 947  				// }
 948  				return o // + spew.Sdump(buf.Bytes())
 949  			},
 950  		)
 951  	}
 952  	// Write the message to the peer.
 953  	n, e := wire.WriteMessageWithEncodingN(
 954  		p.conn, msg,
 955  		p.ProtocolVersion(), p.cfg.ChainParams.Net, enc,
 956  	)
 957  	atomic.AddUint64(&p.bytesSent, uint64(n))
 958  	if p.cfg.Listeners.OnWrite != nil {
 959  		p.cfg.Listeners.OnWrite(p, n, msg, e)
 960  	}
 961  	return e
 962  }
 963  
 964  // isAllowedReadError returns whether or not the passed error is allowed without disconnecting the peer. In particular,
 965  // regression tests need to be allowed to send malformed messages without the peer being disconnected.
 966  func (p *Peer) isAllowedReadError(e error) bool {
 967  	// Only allow read errors in regression test mode.
 968  	if p.cfg.ChainParams.Net != wire.TestNet {
 969  		return false
 970  	}
 971  	// Don't allow the error if it's not specifically a malformed message error.
 972  	if _, ok := e.(*wire.MessageError); !ok {
 973  		return false
 974  	}
 975  	// Don't allow the error if it's not coming from localhost or the hostname can't be determined for some reason.
 976  	var host string
 977  	host, _, e = net.SplitHostPort(p.addr)
 978  	if e != nil {
 979  		return false
 980  	}
 981  	if host != "127.0.0.1" && host != "localhost" {
 982  		return false
 983  	}
 984  	// Allowed if all checks passed.
 985  	return true
 986  }
 987  
 988  // shouldHandleReadError returns whether or not the passed error, which is expected to have come from reading from the
 989  // remote peer in the inHandler, should be logged and responded to with a reject message.
 990  func (p *Peer) shouldHandleReadError(e error) bool {
 991  	// No logging or reject message when the peer is being forcibly disconnected.
 992  	if atomic.LoadInt32(&p.disconnect) != 0 {
 993  		return false
 994  	}
 995  	// No logging or reject message when the remote peer has been disconnected.
 996  	if e == io.EOF {
 997  		return false
 998  	}
 999  	if opErr, ok := e.(*net.OpError); ok && !opErr.Temporary() {
1000  		return false
1001  	}
1002  	return true
1003  }
1004  
1005  // maybeAddDeadline potentially adds a deadline for the appropriate expected response for the passed wire protocol
1006  // command to the pending responses map.
1007  func (p *Peer) maybeAddDeadline(pendingResponses map[string]time.Time, msgCmd string) {
1008  	// Setup a deadline for each message being sent that expects a response.
1009  	//
1010  	// NOTE: Pings are intentionally ignored here since they are typically sent asynchronously and as a result of a long
1011  	// backlog of messages, such as is typical in the case of initial block download, the response won't be received in
1012  	// time.
1013  	deadline := time.Now().Add(stallResponseTimeout)
1014  	switch msgCmd {
1015  	case wire.CmdVersion:
1016  		// Expects a verack message.
1017  		pendingResponses[wire.CmdVerAck] = deadline
1018  	case wire.CmdMemPool:
1019  		// Expects an inv message.
1020  		pendingResponses[wire.CmdInv] = deadline
1021  	case wire.CmdGetBlocks:
1022  		// Expects an inv message.
1023  		pendingResponses[wire.CmdInv] = deadline
1024  	case wire.CmdGetData:
1025  		// Expects a block, merkleblock, tx, or notfound message.
1026  		pendingResponses[wire.CmdBlock] = deadline
1027  		pendingResponses[wire.CmdMerkleBlock] = deadline
1028  		pendingResponses[wire.CmdTx] = deadline
1029  		pendingResponses[wire.CmdNotFound] = deadline
1030  	case wire.CmdGetHeaders:
1031  		// Expects a headers message.
1032  		//
1033  		// Use a longer deadline since it can take a while for the remote peer to load all of the headers.
1034  		deadline = time.Now().Add(stallResponseTimeout * 3)
1035  		pendingResponses[wire.CmdHeaders] = deadline
1036  	}
1037  }
1038  
1039  // stallHandler handles stall detection for the peer.
1040  //
1041  // This entails keeping track of expected responses and assigning them deadlines while accounting for the time spent in
1042  // callbacks.
1043  //
1044  // It must be run as a goroutine.
1045  func (p *Peer) stallHandler() {
1046  	T.Ln("starting stallHandler for", p.addr)
1047  	// These variables are used to adjust the deadline times forward by the time it takes callbacks to execute.
1048  	//
1049  	// This is done because new messages aren't read until the previous one is finished processing (which includes
1050  	// callbacks), so the deadline for receiving a response for a given message must account for the processing time as
1051  	// well.
1052  	var handlerActive bool
1053  	var handlersStartTime time.Time
1054  	var deadlineOffset time.Duration
1055  	// pendingResponses tracks the expected response deadline times.
1056  	pendingResponses := make(map[string]time.Time)
1057  	// stallTicker is used to periodically check pending responses that have exceeded the expected deadline and
1058  	// disconnect the peer due to stalling.
1059  	stallTicker := time.NewTicker(stallTickInterval)
1060  	defer stallTicker.Stop()
1061  	// ioStopped is used to detect when both the input and output handler goroutines are done.
1062  	var ioStopped bool
1063  out:
1064  	for {
1065  		select {
1066  		case msg := <-p.stallControl:
1067  			switch msg.command {
1068  			case sccSendMessage:
1069  				// Add a deadline for the expected response message if needed.
1070  				p.maybeAddDeadline(
1071  					pendingResponses,
1072  					msg.message.Command(),
1073  				)
1074  			case sccReceiveMessage:
1075  				// Remove received messages from the expected response map.
1076  				//
1077  				// Since certain commands expect one of a group of responses, remove everything in the expected group
1078  				// accordingly.
1079  				switch msgCmd := msg.message.Command(); msgCmd {
1080  				case wire.CmdBlock:
1081  					fallthrough
1082  				case wire.CmdMerkleBlock:
1083  					fallthrough
1084  				case wire.CmdTx:
1085  					fallthrough
1086  				case wire.CmdNotFound:
1087  					delete(pendingResponses, wire.CmdBlock)
1088  					delete(pendingResponses, wire.CmdMerkleBlock)
1089  					delete(pendingResponses, wire.CmdTx)
1090  					delete(pendingResponses, wire.CmdNotFound)
1091  				default:
1092  					delete(pendingResponses, msgCmd)
1093  				}
1094  			case sccHandlerStart:
1095  				// Warn on unbalanced callback signalling.
1096  				if handlerActive {
1097  					W.Ln(
1098  						"Received handler start control command while a handler is already active",
1099  					)
1100  					continue
1101  				}
1102  				handlerActive = true
1103  				handlersStartTime = time.Now()
1104  			case sccHandlerDone:
1105  				// Warn on unbalanced callback signalling.
1106  				if !handlerActive {
1107  					W.Ln(
1108  						"Received handler done control command when a handler is not already active",
1109  					)
1110  					continue
1111  				}
1112  				// Extend active deadlines by the time it took to execute the callback.
1113  				duration := time.Since(handlersStartTime)
1114  				deadlineOffset += duration
1115  				handlerActive = false
1116  			default:
1117  				W.Ln(
1118  					"Unsupported message command", msg.command,
1119  				)
1120  			}
1121  		case <-stallTicker.C:
1122  			// Calculate the offset to apply to the deadline based on how long the handlers have taken to execute since
1123  			// the last tick.
1124  			now := time.Now()
1125  			offset := deadlineOffset
1126  			if handlerActive {
1127  				offset += now.Sub(handlersStartTime)
1128  			}
1129  			// Disconnect the peer if any of the pending responses don't arrive by their adjusted deadline.
1130  			for command, deadline := range pendingResponses {
1131  				if now.Before(deadline.Add(offset)) {
1132  					continue
1133  				}
1134  				D.F(
1135  					"Peer %s appears to be stalled or misbehaving, %s timeout -- disconnecting",
1136  					p,
1137  					command,
1138  				)
1139  				p.Disconnect()
1140  				break
1141  			}
1142  			// Reset the deadline offset for the next tick.
1143  			deadlineOffset = 0
1144  		case <-p.inQuit.Wait():
1145  			// The stall handler can exit once both the input and output handler goroutines are done.
1146  			if ioStopped {
1147  				break out
1148  			}
1149  			ioStopped = true
1150  		case <-p.outQuit.Wait():
1151  			// The stall handler can exit once both the input and output handler goroutines are done.
1152  			if ioStopped {
1153  				break out
1154  			}
1155  			ioStopped = true
1156  		}
1157  	}
1158  	// Drain any wait channels before going away so there is nothing left waiting on this goroutine.
1159  cleanup:
1160  	for {
1161  		select {
1162  		case <-p.stallControl:
1163  		default:
1164  			break cleanup
1165  		}
1166  	}
1167  	T.Ln("peer stall handler done for", p)
1168  }
1169  
1170  // inHandler handles all incoming messages for the peer.
1171  //
1172  // It must be run as a goroutine.
1173  func (p *Peer) inHandler() {
1174  	T.Ln("starting inHandler for", p.addr)
1175  	// The timer is stopped when a new message is received and reset after it is processed.
1176  	idleTimer := time.AfterFunc(
1177  		idleTimeout, func() {
1178  			W.F("peer %s no answer for %s -- disconnecting", p, idleTimeout)
1179  			p.Disconnect()
1180  		},
1181  	)
1182  out:
1183  	for atomic.LoadInt32(&p.disconnect) == 0 {
1184  		// Read a message and stop the idle timer as soon as the read is done. The timer is reset below for the next
1185  		// iteration if needed.
1186  		rMsg, buf, e := p.readMessage(p.wireEncoding)
1187  		idleTimer.Stop()
1188  		if e != nil {
1189  			T.Ln(e)
1190  			// In order to allow regression tests with malformed messages, don't disconnect the peer when we're in
1191  			// regression test mode and the error is one of the allowed errors.
1192  			if p.isAllowedReadError(e) {
1193  				E.F("allowed test error from %s: %v", p, e)
1194  				idleTimer.Reset(idleTimeout)
1195  				continue
1196  			}
1197  			// Only log the error and send reject message if the local peer is not forcibly disconnecting and the remote
1198  			// peer has not disconnected.
1199  			if p.shouldHandleReadError(e) {
1200  				errMsg := fmt.Sprintf("Can't read message from %s: %v", p, e)
1201  				if e != io.ErrUnexpectedEOF {
1202  					E.Ln(errMsg)
1203  				}
1204  				// Push a reject message for the malformed message and wait for the message to be sent before
1205  				// disconnecting.
1206  				//
1207  				// NOTE: Ideally this would include the command in the header if at least that much of the message was
1208  				// valid, but that is not currently exposed by wire, so just used malformed for the command.
1209  				p.PushRejectMsg("malformed", wire.RejectMalformed, errMsg, nil, true)
1210  			}
1211  			break out
1212  		}
1213  		atomic.StoreInt64(&p.lastRecv, time.Now().Unix())
1214  		p.stallControl <- stallControlMsg{sccReceiveMessage, rMsg}
1215  		// Handle each supported message type.
1216  		p.stallControl <- stallControlMsg{sccHandlerStart, rMsg}
1217  		switch msg := rMsg.(type) {
1218  		case *wire.MsgVersion:
1219  			// Limit to one version message per peer.
1220  			p.PushRejectMsg(
1221  				msg.Command(), wire.RejectDuplicate,
1222  				"duplicate version message", nil, true,
1223  			)
1224  			break out
1225  		case *wire.MsgVerAck:
1226  			// No read lock is necessary because verAckReceived is not written to in any other goroutine.
1227  			//
1228  			// Because of the potential for an attacker to use the UAC based node identifiers to cause a peer to
1229  			// disconnect from the attacked node, we have commented this thing out.
1230  			//
1231  			// if p.verAckReceived {
1232  			// 	I.F("already received 'verack' from peer %v"+
1233  			// 		" -- disconnecting", p)
1234  			// 	break out
1235  			// }
1236  			//
1237  			// because of the commented section above, we won't run this if the peer is already marked
1238  			// VerAckReceived. This basically responds to spurious veracks by dropping them
1239  			if !p.verAckReceived {
1240  				p.flagsMtx.Lock()
1241  				p.verAckReceived = true
1242  				p.flagsMtx.Unlock()
1243  				if p.cfg.Listeners.OnVerAck != nil {
1244  					p.cfg.Listeners.OnVerAck(p, msg)
1245  				}
1246  			}
1247  		case *wire.MsgGetAddr:
1248  			if p.cfg.Listeners.OnGetAddr != nil {
1249  				p.cfg.Listeners.OnGetAddr(p, msg)
1250  			}
1251  		case *wire.MsgAddr:
1252  			if p.cfg.Listeners.OnAddr != nil {
1253  				p.cfg.Listeners.OnAddr(p, msg)
1254  			}
1255  		case *wire.MsgPing:
1256  			p.handlePingMsg(msg)
1257  			if p.cfg.Listeners.OnPing != nil {
1258  				p.cfg.Listeners.OnPing(p, msg)
1259  			}
1260  		case *wire.MsgPong:
1261  			p.handlePongMsg(msg)
1262  			if p.cfg.Listeners.OnPong != nil {
1263  				p.cfg.Listeners.OnPong(p, msg)
1264  			}
1265  		case *wire.MsgAlert:
1266  			if p.cfg.Listeners.OnAlert != nil {
1267  				p.cfg.Listeners.OnAlert(p, msg)
1268  			}
1269  		case *wire.MsgMemPool:
1270  			if p.cfg.Listeners.OnMemPool != nil {
1271  				p.cfg.Listeners.OnMemPool(p, msg)
1272  			}
1273  		case *wire.MsgTx:
1274  			if p.cfg.Listeners.OnTx != nil {
1275  				p.cfg.Listeners.OnTx(p, msg)
1276  			}
1277  		case *wire.Block:
1278  			if p.cfg.Listeners.OnBlock != nil {
1279  				p.cfg.Listeners.OnBlock(p, msg, buf)
1280  			}
1281  		case *wire.MsgInv:
1282  			if p.cfg.Listeners.OnInv != nil {
1283  				p.cfg.Listeners.OnInv(p, msg)
1284  			}
1285  		case *wire.MsgHeaders:
1286  			if p.cfg.Listeners.OnHeaders != nil {
1287  				p.cfg.Listeners.OnHeaders(p, msg)
1288  			}
1289  		case *wire.MsgNotFound:
1290  			if p.cfg.Listeners.OnNotFound != nil {
1291  				p.cfg.Listeners.OnNotFound(p, msg)
1292  			}
1293  		case *wire.MsgGetData:
1294  			if p.cfg.Listeners.OnGetData != nil {
1295  				p.cfg.Listeners.OnGetData(p, msg)
1296  			}
1297  		case *wire.MsgGetBlocks:
1298  			if p.cfg.Listeners.OnGetBlocks != nil {
1299  				p.cfg.Listeners.OnGetBlocks(p, msg)
1300  			}
1301  		case *wire.MsgGetHeaders:
1302  			if p.cfg.Listeners.OnGetHeaders != nil {
1303  				p.cfg.Listeners.OnGetHeaders(p, msg)
1304  			}
1305  		case *wire.MsgGetCFilters:
1306  			if p.cfg.Listeners.OnGetCFilters != nil {
1307  				p.cfg.Listeners.OnGetCFilters(p, msg)
1308  			}
1309  		case *wire.MsgGetCFHeaders:
1310  			if p.cfg.Listeners.OnGetCFHeaders != nil {
1311  				p.cfg.Listeners.OnGetCFHeaders(p, msg)
1312  			}
1313  		case *wire.MsgGetCFCheckpt:
1314  			if p.cfg.Listeners.OnGetCFCheckpt != nil {
1315  				p.cfg.Listeners.OnGetCFCheckpt(p, msg)
1316  			}
1317  		case *wire.MsgCFilter:
1318  			if p.cfg.Listeners.OnCFilter != nil {
1319  				p.cfg.Listeners.OnCFilter(p, msg)
1320  			}
1321  		case *wire.MsgCFHeaders:
1322  			if p.cfg.Listeners.OnCFHeaders != nil {
1323  				p.cfg.Listeners.OnCFHeaders(p, msg)
1324  			}
1325  		case *wire.MsgFeeFilter:
1326  			if p.cfg.Listeners.OnFeeFilter != nil {
1327  				p.cfg.Listeners.OnFeeFilter(p, msg)
1328  			}
1329  		case *wire.MsgFilterAdd:
1330  			if p.cfg.Listeners.OnFilterAdd != nil {
1331  				p.cfg.Listeners.OnFilterAdd(p, msg)
1332  			}
1333  		case *wire.MsgFilterClear:
1334  			if p.cfg.Listeners.OnFilterClear != nil {
1335  				p.cfg.Listeners.OnFilterClear(p, msg)
1336  			}
1337  		case *wire.MsgFilterLoad:
1338  			if p.cfg.Listeners.OnFilterLoad != nil {
1339  				p.cfg.Listeners.OnFilterLoad(p, msg)
1340  			}
1341  		case *wire.MsgMerkleBlock:
1342  			if p.cfg.Listeners.OnMerkleBlock != nil {
1343  				p.cfg.Listeners.OnMerkleBlock(p, msg)
1344  			}
1345  		case *wire.MsgReject:
1346  			if p.cfg.Listeners.OnReject != nil {
1347  				p.cfg.Listeners.OnReject(p, msg)
1348  			}
1349  		case *wire.MsgSendHeaders:
1350  			p.flagsMtx.Lock()
1351  			p.sendHeadersPreferred = true
1352  			p.flagsMtx.Unlock()
1353  			if p.cfg.Listeners.OnSendHeaders != nil {
1354  				p.cfg.Listeners.OnSendHeaders(p, msg)
1355  			}
1356  		default:
1357  			D.F(
1358  				"Received unhandled message of type %v from %v %s",
1359  				rMsg.Command(),
1360  				p,
1361  			)
1362  		}
1363  		p.stallControl <- stallControlMsg{sccHandlerDone, rMsg}
1364  		// A message was received so reset the idle timer.
1365  		idleTimer.Reset(idleTimeout)
1366  	}
1367  	// Ensure the idle timer is stopped to avoid leaking the resource.
1368  	idleTimer.Stop()
1369  	// Ensure connection is closed.
1370  	p.Disconnect()
1371  	p.inQuit.Q()
1372  	T.Ln("peer input handler done for", p)
1373  }
1374  
// queueHandler handles the queuing of outgoing data for the peer.
//
// This runs as a muxer for various sources of input so we can ensure that server and peer handlers will not block on us
// sending a message.
//
// It feeds p.sendQueue one message at a time: a message goes straight to sendQueue when nothing is in flight, and is
// otherwise parked in a pending list until a send-done notification arrives. Block inventory is sent immediately, while
// other inventory is batched and flushed on each trickle tick.
//
// That data is then passed on to outHandler to be actually written. On quit it signals the done channel of every
// message it still holds, drains its input channels, and signals queueQuit.
func (p *Peer) queueHandler() {
	T.Ln("starting queueHandler for", p.addr)
	// pendingMsgs holds messages not yet handed to sendQueue; invSendQueue holds inventory awaiting the next trickle.
	pendingMsgs := list.New()
	invSendQueue := list.New()
	trickleTicker := time.NewTicker(p.cfg.TrickleInterval)
	defer trickleTicker.Stop()
	// We keep the waiting flag so that we know if we have a message queued to the outHandler or not.
	//
	// We could use the presence of a head of the list for this but then we have rather racy concerns about whether it
	// has gotten it at cleanup time - and thus who sends on the message's done channel.
	//
	// To avoid such confusion we keep a different flag and pendingMsgs only contains messages that we have not yet
	// passed to outHandler.
	waiting := false
	// queuePacket sends msg to sendQueue when nothing is in flight, otherwise appends it to the given list. It always
	// returns true, which callers store back into waiting. Note its parameters shadow the list package and the outer
	// waiting flag.
	queuePacket := func(msg outMsg, list *list.List, waiting bool) bool {
		if !waiting {
			p.sendQueue <- msg
		} else {
			list.PushBack(msg)
		}
		// we are always waiting now.
		return true
	}
out:
	for {
		select {
		case msg := <-p.outputQueue:
			waiting = queuePacket(msg, pendingMsgs, waiting)
		// This channel is notified when a message has been sent across the network socket.
		case <-p.sendDoneQueue.Wait():
			// No longer waiting if there are no more messages in the pending messages queue.
			next := pendingMsgs.Front()
			if next == nil {
				waiting = false
				continue
			}
			// Notify the outHandler about the next item to asynchronously send.
			val := pendingMsgs.Remove(next)
			p.sendQueue <- val.(outMsg)
		case iv := <-p.outputInvChan:
			// No handshake?  They'll find out soon enough.
			if p.VersionKnown() {
				// If this is a new block, then we'll blast it out immediately, skipping the inv trickle queue.
				if iv.Type == wire.InvTypeBlock {
					invMsg := wire.NewMsgInvSizeHint(1)
					e := invMsg.AddInvVect(iv)
					if e != nil {
						// A failed add is only logged; the message is still queued.
						D.Ln(e)
					}
					waiting = queuePacket(
						outMsg{msg: invMsg},
						pendingMsgs, waiting,
					)
				} else {
					invSendQueue.PushBack(iv)
				}
			}
		case <-trickleTicker.C:
			// Don't send anything if we're disconnecting or there is no queued inventory. version is known if send
			// queue has any entries.
			if atomic.LoadInt32(&p.disconnect) != 0 ||
				invSendQueue.Len() == 0 {
				continue
			}
			// Create and send as many inv messages as needed to drain the inventory send queue.
			invMsg := wire.NewMsgInvSizeHint(uint(invSendQueue.Len()))
			// Each pass removes the front element, so the loop ends when the queue is empty. The e declared in the
			// body is the add error and shadows the list element of the same name.
			for e := invSendQueue.Front(); e != nil; e = invSendQueue.Front() {
				iv := invSendQueue.Remove(e).(*wire.InvVect)
				// Don't send inventory that became known after the initial check.
				if p.knownInventory.Exists(iv) {
					continue
				}
				e := invMsg.AddInvVect(iv)
				if e != nil {
					D.Ln(e)
				}
				// Flush a full message and start a new one sized for what is left in the queue.
				if len(invMsg.InvList) >= maxInvTrickleSize {
					waiting = queuePacket(
						outMsg{msg: invMsg},
						pendingMsgs, waiting,
					)
					invMsg = wire.NewMsgInvSizeHint(uint(invSendQueue.Len()))
				}
				// Add the inventory that is being relayed to the known inventory for the peer.
				p.AddKnownInventory(iv)
			}
			// Send whatever is left in the final, partially filled message.
			if len(invMsg.InvList) > 0 {
				waiting = queuePacket(
					outMsg{msg: invMsg},
					pendingMsgs, waiting,
				)
			}
		case <-p.quit.Wait():
			break out
		}
	}
	// Drain any wait channels before we go away so we don't leave something waiting for us.
	for e := pendingMsgs.Front(); e != nil; e = pendingMsgs.Front() {
		val := pendingMsgs.Remove(e)
		msg := val.(outMsg)
		if msg.doneChan != nil {
			msg.doneChan <- struct{}{}
		}
	}
	// Then empty the input channels without blocking, signalling any done channels found on the way.
cleanup:
	for {
		select {
		case msg := <-p.outputQueue:
			if msg.doneChan != nil {
				msg.doneChan <- struct{}{}
			}
		case <-p.outputInvChan:
			// Just drain channel sendDoneQueue is buffered so doesn't need draining.
		default:
			break cleanup
		}
	}
	p.queueQuit.Q()
	T.Ln("peer queue handler done for", p)
}
1502  
1503  // shouldLogWriteError returns whether or not the passed error, which is expected to have come from writing to the
1504  // remote peer in the outHandler, should be logged.
1505  func (p *Peer) shouldLogWriteError(e error) bool {
1506  	// No logging when the peer is being forcibly disconnected.
1507  	if atomic.LoadInt32(&p.disconnect) != 0 {
1508  		return false
1509  	}
1510  	// No logging when the remote peer has been disconnected.
1511  	if e == io.EOF {
1512  		return false
1513  	}
1514  	if opErr, ok := e.(*net.OpError); ok && !opErr.Temporary() {
1515  		return false
1516  	}
1517  	return true
1518  }
1519  
1520  // outHandler handles all outgoing messages for the peer.
1521  //
1522  // It must be run as a goroutine.
1523  //
1524  // It uses a buffered channel to serialize output messages while allowing the sender to continue running asynchronously.
1525  func (p *Peer) outHandler() {
1526  	T.Ln("starting outHandler for", p.addr)
1527  out:
1528  	for {
1529  		select {
1530  		case msg := <-p.sendQueue:
1531  			switch m := msg.msg.(type) {
1532  			case *wire.MsgPing:
1533  				// Only expects a pong message in later protocol versions. Also set up statistics.
1534  				if p.ProtocolVersion() > wire.BIP0031Version {
1535  					p.statsMtx.Lock()
1536  					p.lastPingNonce = m.Nonce
1537  					p.lastPingTime = time.Now()
1538  					p.statsMtx.Unlock()
1539  				}
1540  			}
1541  			p.stallControl <- stallControlMsg{sccSendMessage, msg.msg}
1542  			e := p.writeMessage(msg.msg, msg.encoding)
1543  			if e != nil {
1544  				p.Disconnect()
1545  				if p.shouldLogWriteError(e) {
1546  					E.F("failed to send message to %s: %v", p, e)
1547  				}
1548  				if msg.doneChan != nil {
1549  					msg.doneChan <- struct{}{}
1550  				}
1551  				continue
1552  			}
1553  			// At this point, the message was successfully sent, so update the last send time, signal the sender of the
1554  			// message that it has been sent ( if requested), and signal the send queue to the deliver the next queued
1555  			// message.
1556  			atomic.StoreInt64(&p.lastSend, time.Now().Unix())
1557  			if msg.doneChan != nil {
1558  				msg.doneChan <- struct{}{}
1559  			}
1560  			p.sendDoneQueue <- struct{}{}
1561  		case <-p.quit.Wait():
1562  			break out
1563  		}
1564  	}
1565  	<-p.queueQuit
1566  	// Drain any wait channels before we go away so we don't leave something waiting for us. We have waited on queueQuit
1567  	// and thus we can be sure that we will not miss anything sent on sendQueue.
1568  cleanup:
1569  	for {
1570  		select {
1571  		case msg := <-p.sendQueue:
1572  			if msg.doneChan != nil {
1573  				msg.doneChan <- struct{}{}
1574  			}
1575  			// no need to send on sendDoneQueue since queueHandler has been waited on and already exited.
1576  		default:
1577  			break cleanup
1578  		}
1579  	}
1580  	p.outQuit.Q()
1581  	T.Ln("peer output handler done for", p)
1582  }
1583  
1584  // pingHandler periodically pings the peer.  It must be run as a goroutine.
1585  func (p *Peer) pingHandler() {
1586  	T.Ln("starting pingHandler for", p.addr)
1587  	pingTicker := time.NewTicker(pingInterval)
1588  	defer pingTicker.Stop()
1589  out:
1590  	for {
1591  		select {
1592  		case <-pingTicker.C:
1593  			nonce, e := wire.RandomUint64()
1594  			if e != nil {
1595  				E.F("not sending ping to %s: %v", p, e)
1596  				continue
1597  			}
1598  			p.QueueMessage(wire.NewMsgPing(nonce), nil)
1599  		case <-p.quit.Wait():
1600  			break out
1601  		}
1602  	}
1603  }
1604  
1605  // QueueMessage adds the passed bitcoin message to the peer send queue. This function is safe for concurrent access.
1606  func (p *Peer) QueueMessage(msg wire.Message, doneChan chan<- struct{}) {
1607  	p.QueueMessageWithEncoding(msg, doneChan, wire.BaseEncoding)
1608  }
1609  
1610  // QueueMessageWithEncoding adds the passed bitcoin message to the peer send queue. This function is identical to
1611  // QueueMessage, however it allows the caller to specify the wire encoding type that should be used when
1612  // encoding/decoding blocks and transactions.
1613  //
1614  // This function is safe for concurrent access.
1615  func (p *Peer) QueueMessageWithEncoding(
1616  	msg wire.Message, doneChan chan<- struct{},
1617  	encoding wire.MessageEncoding,
1618  ) {
1619  	// Avoid risk of deadlock if goroutine already exited. The goroutine we will be sending to hangs around until it
1620  	// knows for a fact that it is marked as disconnected and *then* it drains the channels.
1621  	if !p.Connected() {
1622  		if doneChan != nil {
1623  			go func() {
1624  				doneChan <- struct{}{}
1625  			}()
1626  		}
1627  		return
1628  	}
1629  	p.outputQueue <- outMsg{msg: msg, encoding: encoding, doneChan: doneChan}
1630  }
1631  
1632  // QueueInventory adds the passed inventory to the inventory send queue which might not be sent right away, rather it is
1633  // trickled to the peer in batches.
1634  //
1635  // Inventory that the peer is already known to have is ignored.
1636  //
1637  // This function is safe for concurrent access.
1638  func (p *Peer) QueueInventory(invVect *wire.InvVect) {
1639  	// Don't add the inventory to the send queue if the peer is already known to have it.
1640  	if p.knownInventory.Exists(invVect) {
1641  		return
1642  	}
1643  	// Avoid risk of deadlock if goroutine already exited. The goroutine we will be sending to hangs around until it
1644  	// knows for a fact that it is marked as disconnected and *then* it drains the channels.
1645  	if !p.Connected() {
1646  		return
1647  	}
1648  	p.outputInvChan <- invVect
1649  }
1650  
1651  // Connected returns whether or not the peer is currently connected. This function is safe for concurrent access.
1652  func (p *Peer) Connected() bool {
1653  	return atomic.LoadInt32(&p.connected) != 0 &&
1654  		atomic.LoadInt32(&p.disconnect) == 0
1655  }
1656  
1657  // Disconnect disconnects the peer by closing the connection. Calling this function when the peer is already
1658  // disconnected or in the process of disconnecting will have no effect.
1659  func (p *Peer) Disconnect() {
1660  	if atomic.AddInt32(&p.disconnect, 1) != 1 {
1661  		return
1662  	}
1663  	T.Ln("disconnecting", p, log.Caller("from", 1))
1664  	if atomic.LoadInt32(&p.connected) != 0 {
1665  		_ = p.conn.Close()
1666  	}
1667  	p.quit.Q()
1668  }
1669  
1670  // readRemoteVersionMsg waits for the next message to arrive from the remote peer. If the next message is not a version
1671  // message or the version is not acceptable then return an error.
1672  func (p *Peer) readRemoteVersionMsg() (msg *wire.MsgVersion, e error) {
1673  	if p.versionKnown {
1674  		D.Ln("received version previously, dropping")
1675  		return
1676  	}
1677  	// Read their version message.
1678  	var remoteMsg wire.Message
1679  	remoteMsg, _, e = p.readMessage(wire.LatestEncoding)
1680  	if e != nil {
1681  		if e != io.EOF {
1682  		}
1683  		return
1684  	}
1685  	// Notify and disconnect clients if the first message is not a version message.
1686  	var ok bool
1687  	msg, ok = remoteMsg.(*wire.MsgVersion)
1688  	if !ok {
1689  		reason := "a version message must precede all others"
1690  		rejectMsg := wire.NewMsgReject(
1691  			msg.Command(), wire.RejectMalformed,
1692  			reason,
1693  		)
1694  		_ = p.writeMessage(rejectMsg, wire.LatestEncoding)
1695  		e = errors.New(reason)
1696  		return
1697  	}
1698  	// Detect self connections.
1699  	if !AllowSelfConns && SentNonces.Exists(msg.Nonce) {
1700  		e = errors.New("disconnecting peer connected to self")
1701  		return
1702  	}
1703  	// Negotiate the protocol version and set the services to what the remote peer advertised.
1704  	p.flagsMtx.Lock()
1705  	p.Nonce = msg.Nonce
1706  	p.advertisedProtoVer = uint32(msg.ProtocolVersion)
1707  	p.protocolVersion = minUint32(p.protocolVersion, p.advertisedProtoVer)
1708  	p.versionKnown = true
1709  	p.services = msg.Services
1710  	p.flagsMtx.Unlock()
1711  	T.F(
1712  		"negotiated protocol version %d for peer %s",
1713  		p.protocolVersion, p,
1714  	)
1715  	// Updating a bunch of stats including block based stats, and the peer's time offset.
1716  	p.statsMtx.Lock()
1717  	p.lastBlock = msg.LastBlock
1718  	p.startingHeight = msg.LastBlock
1719  	p.timeOffset = msg.Timestamp.Unix() - time.Now().Unix()
1720  	p.statsMtx.Unlock()
1721  	// Set the peer's ID, user agent, and potentially the flag which specifies the
1722  	// witness support is enabled.
1723  	p.flagsMtx.Lock()
1724  	p.id = atomic.AddInt32(&nodeCount, 1)
1725  	p.userAgent = msg.UserAgent
1726  	// // Determine if the peer would like to receive witness data with transactions,
1727  	// // or not.
1728  	// if p.services&wire.SFNodeWitness == wire.SFNodeWitness {
1729  	// 	p.witnessEnabled = true
1730  	// }
1731  	p.flagsMtx.Unlock()
1732  	// // Once the version message has been exchanged, we're able to determine if this
1733  	// // peer knows how to encode witness data over the wire protocol. If so, then
1734  	// // we'll switch to a decoding mode which is prepared for the new transaction
1735  	// // format introduced as part of BIP0144.
1736  	// if p.services&wire.SFNodeWitness == wire.SFNodeWitness {
1737  	// 	p.wireEncoding = wire.BaseEncoding
1738  	// }
1739  	// Invoke the callback if specified.
1740  	if p.cfg.Listeners.OnVersion != nil {
1741  		I.Ln("writing version message")
1742  		rejectMsg := p.cfg.Listeners.OnVersion(p, msg)
1743  		if rejectMsg != nil {
1744  			_ = p.writeMessage(rejectMsg, wire.LatestEncoding)
1745  			e = errors.New(rejectMsg.Reason)
1746  			return
1747  		}
1748  	}
1749  	// Notify and disconnect clients that have a protocol version that is too old.
1750  	//
1751  	// NOTE: If minAcceptableProtocolVersion is raised to be higher than wire.RejectVersion, this should send a reject
1752  	// packet before disconnecting.
1753  	if uint32(msg.ProtocolVersion) < MinAcceptableProtocolVersion {
1754  		// Send a reject message indicating the protocol version is obsolete
1755  		// and wait for the message to be sent before disconnecting.
1756  		reason := fmt.Sprintf(
1757  			"protocol version must be %d or greater",
1758  			MinAcceptableProtocolVersion,
1759  		)
1760  		rejectMsg := wire.NewMsgReject(
1761  			msg.Command(), wire.RejectObsolete,
1762  			reason,
1763  		)
1764  		_ = p.writeMessage(rejectMsg, wire.LatestEncoding)
1765  		e = errors.New(reason)
1766  		return
1767  	}
1768  	return
1769  }
1770  
1771  // localVersionMsg creates a version message that can be used to send to the remote peer.
1772  func (p *Peer) localVersionMsg() (mv *wire.MsgVersion, e error) {
1773  	var blockNum int32
1774  	if p.cfg.NewestBlock != nil {
1775  		_, blockNum, e = p.cfg.NewestBlock()
1776  		if e != nil {
1777  			return nil, e
1778  		}
1779  	}
1780  	theirNA := p.na
1781  	// If we are behind a proxy and the connection comes from the proxy then we return an non routeable address as their
1782  	// address. This is to prevent leaking the tor proxy address.
1783  	if p.cfg.Proxy != "" {
1784  		var proxyAddress string
1785  		proxyAddress, _, e = net.SplitHostPort(p.cfg.Proxy)
1786  		// invalid proxy means poorly configured, be on the safe side.
1787  		if e != nil || p.na.IP.String() == proxyAddress {
1788  			theirNA = wire.NewNetAddressIPPort(
1789  				[]byte{0, 0, 0, 0}, 0,
1790  				theirNA.Services,
1791  			)
1792  		}
1793  	}
1794  	// Create a wire.NetAddress with only the services set to use as the "addrme" in the version message.
1795  	//
1796  	// Older nodes previously added the IP and port information to the address manager which proved to be unreliable as
1797  	// an inbound connection from a peer didn't necessarily mean the peer itself accepted inbound connections.
1798  	//
1799  	// Also, the timestamp is unused in the version message.
1800  	// I.Ln(p.addr)
1801  	// var h string
1802  	// var port string
1803  	// if h, port, e = net.SplitHostPort(p.addr); E.Chk(e) {
1804  	// }
1805  	// var portN int64
1806  	// if portN, e = strconv.ParseInt(port, 10, 64); E.Chk(e) {
1807  	// }
1808  	// ipAddr := net.ParseIP(h)
1809  	ourNA := &wire.NetAddress{
1810  		Timestamp: time.Now(),
1811  		Services:  p.cfg.Services,
1812  		IP:        p.IP,
1813  		Port:      p.Port,
1814  	}
1815  	// Generate a unique Nonce for this peer so self connections can be detected. This is accomplished by adding it to a
1816  	// size-limited map of recently seen nonces.
1817  	nonce := uint64(rand.Int63())
1818  	SentNonces.Add(nonce)
1819  	// Version message.
1820  	msg := wire.NewMsgVersion(ourNA, theirNA, nonce, blockNum)
1821  	e = msg.AddUserAgent(
1822  		p.cfg.UserAgentName, p.cfg.UserAgentVersion,
1823  		p.cfg.UserAgentComments...,
1824  	)
1825  	if e != nil {
1826  	}
1827  	// Advertise local services.
1828  	msg.Services = p.cfg.Services
1829  	// Advertise our max supported protocol version.
1830  	msg.ProtocolVersion = int32(p.cfg.ProtocolVersion)
1831  	// Advertise if inv messages for transactions are desired.
1832  	msg.DisableRelayTx = p.cfg.DisableRelayTx
1833  	return msg, nil
1834  }
1835  
1836  // writeLocalVersionMsg writes our version message to the remote peer.
1837  func (p *Peer) writeLocalVersionMsg() (msg *wire.MsgVersion, e error) {
1838  	if msg, e = p.localVersionMsg(); E.Chk(e) {
1839  		return
1840  	}
1841  	return msg, p.writeMessage(msg, wire.LatestEncoding)
1842  }
1843  
1844  // negotiateInboundProtocol waits to receive a version message from the peer then sends our version message.
1845  //
1846  // If the events do not occur in that order then it returns an error.
1847  func (p *Peer) negotiateInboundProtocol() (msg *wire.MsgVersion, e error) {
1848  	if msg, e = p.readRemoteVersionMsg(); E.Chk(e) {
1849  		return
1850  	}
1851  	return p.writeLocalVersionMsg()
1852  }
1853  
1854  // negotiateOutboundProtocol sends our version message then waits to receive a version message from the peer.
1855  //
1856  // If the events do not occur in that order then it returns an error.
1857  func (p *Peer) negotiateOutboundProtocol() (msg *wire.MsgVersion, e error) {
1858  	if msg, e = p.writeLocalVersionMsg(); E.Chk(e) {
1859  		return
1860  	}
1861  	return p.readRemoteVersionMsg()
1862  }
1863  
1864  // start begins processing input and output messages.
1865  func (p *Peer) start(msgChan chan *wire.MsgVersion) (e error) {
1866  	T.Ln("starting peer", p, p.LocalAddr())
1867  	negotiateErr := make(chan error, 1)
1868  	go func() {
1869  		var ee error
1870  		var msg *wire.MsgVersion
1871  		if p.inbound {
1872  			if msg, ee = p.negotiateInboundProtocol(); E.Chk(ee) {
1873  				negotiateErr <- ee
1874  			}
1875  		} else {
1876  			if msg, e = p.negotiateOutboundProtocol(); E.Chk(ee) {
1877  				negotiateErr <- ee
1878  			}
1879  		}
1880  		I.Ln("sending version message back")
1881  		msgChan <- msg
1882  		I.Ln("sent version message back")
1883  		negotiateErr <- nil
1884  	}()
1885  	// Negotiate the protocol within the specified negotiateTimeout.
1886  	select {
1887  	case e = <-negotiateErr:
1888  		if e != nil {
1889  			if e != io.EOF {
1890  			}
1891  			p.Disconnect()
1892  			return
1893  		}
1894  	case <-time.After(negotiateTimeout):
1895  		p.Disconnect()
1896  		e = errors.New("protocol negotiation timeout")
1897  		return
1898  	}
1899  	T.Ln("connected to", p)
1900  	// The protocol has been negotiated successfully so start processing input and output messages.
1901  	go p.stallHandler()
1902  	go p.inHandler()
1903  	go p.queueHandler()
1904  	go p.outHandler()
1905  	go p.pingHandler()
1906  	// Send our verack message now that the IO processing machinery has started.
1907  	p.QueueMessage(wire.NewMsgVerAck(), nil)
1908  	return
1909  }
1910  
1911  // AssociateConnection associates the given conn to the peer. Calling this function when the peer is already connected
1912  // will have no effect.
1913  func (p *Peer) AssociateConnection(conn net.Conn) (msgChan chan *wire.MsgVersion) {
1914  	// Already connected?
1915  	if !atomic.CompareAndSwapInt32(&p.connected, 0, 1) {
1916  		I.Ln("already connected to peer", conn.RemoteAddr(), conn.LocalAddr())
1917  		return
1918  	}
1919  	p.conn = conn
1920  	p.timeConnected = time.Now()
1921  	if p.inbound {
1922  		p.addr = p.conn.RemoteAddr().String()
1923  		// Set up a NetAddress for the peer to be used with AddrManager.
1924  		//
1925  		// We only do this inbound because outbound set this up at connection time and no point recomputing.
1926  		na, e := newNetAddress(p.conn.RemoteAddr(), p.services)
1927  		if e != nil {
1928  			E.Ln("cannot create remote net address:", e)
1929  			p.Disconnect()
1930  			return
1931  		}
1932  		p.na = na
1933  	}
1934  	msgChan = make(chan *wire.MsgVersion, 1)
1935  	I.Ln("starting peer", conn.RemoteAddr(), conn.LocalAddr())
1936  	go func() {
1937  		if e := p.start(msgChan); E.Chk(e) {
1938  			D.F("cannot start peer %v: %v", p, e)
1939  			p.Disconnect()
1940  		}
1941  		I.Ln("finished starting peer", conn.RemoteAddr(), conn.LocalAddr())
1942  	}()
1943  	I.Ln("returning meanwhile starting peer", conn.RemoteAddr(), conn.LocalAddr())
1944  	return
1945  }
1946  
1947  // WaitForDisconnect waits until the peer has completely disconnected and all resources are cleaned up. This will happen
1948  // if either the local or remote side has been disconnected or the peer is forcibly disconnected via Disconnect.
1949  func (p *Peer) WaitForDisconnect() {
1950  	<-p.quit
1951  }
1952  
1953  // newPeerBase returns a new base bitcoin peer based on the inbound flag. This is used by the NewInboundPeer and
1954  // NewOutboundPeer functions to perform base setup needed by both types of peers.
1955  func newPeerBase(origCfg *Config, inbound bool) *Peer {
1956  	// Default to the max supported protocol version if not specified by the caller.
1957  	cfg := *origCfg // Copy to avoid mutating caller.
1958  	if cfg.ProtocolVersion == 0 {
1959  		cfg.ProtocolVersion = MaxProtocolVersion
1960  	}
1961  	// Set the chain parameters to testnet if the caller did not specify any.
1962  	if cfg.ChainParams == nil {
1963  		cfg.ChainParams = &chaincfg.TestNet3Params
1964  	}
1965  	// Set the trickle interval if a non-positive value is specified.
1966  	if cfg.TrickleInterval <= 0 {
1967  		cfg.TrickleInterval = DefaultTrickleInterval
1968  	}
1969  	p := Peer{
1970  		inbound:         inbound,
1971  		wireEncoding:    wire.BaseEncoding,
1972  		knownInventory:  newMruInventoryMap(maxKnownInventory),
1973  		stallControl:    make(chan stallControlMsg, 1), // nonblocking sync
1974  		outputQueue:     make(chan outMsg, outputBufferSize),
1975  		sendQueue:       make(chan outMsg, 1), // nonblocking sync
1976  		sendDoneQueue:   qu.Ts(1),             // nonblocking sync
1977  		outputInvChan:   make(chan *wire.InvVect, outputBufferSize),
1978  		inQuit:          qu.T(),
1979  		queueQuit:       qu.T(),
1980  		outQuit:         qu.T(),
1981  		quit:            qu.T(),
1982  		cfg:             cfg, // Copy so caller can't mutate.
1983  		services:        cfg.Services,
1984  		protocolVersion: cfg.ProtocolVersion,
1985  		IP:              origCfg.IP,
1986  		Port:            origCfg.Port,
1987  	}
1988  	return &p
1989  }
1990  
1991  // NewInboundPeer returns a new inbound bitcoin peer. Use Start to begin processing incoming and outgoing messages.
1992  func NewInboundPeer(cfg *Config) *Peer {
1993  	return newPeerBase(cfg, true)
1994  }
1995  
1996  // NewOutboundPeer returns a new outbound bitcoin peer.
1997  func NewOutboundPeer(cfg *Config, addr string) (*Peer, error) {
1998  	p := newPeerBase(cfg, false)
1999  	p.addr = addr
2000  	host, portStr, e := net.SplitHostPort(addr)
2001  	if e != nil {
2002  		return nil, e
2003  	}
2004  	port, e := strconv.ParseUint(portStr, 10, 16)
2005  	if e != nil {
2006  		return nil, e
2007  	}
2008  	if cfg.HostToNetAddress != nil {
2009  		na, e := cfg.HostToNetAddress(host, uint16(port), 0)
2010  		if e != nil {
2011  			return nil, e
2012  		}
2013  		p.na = na
2014  	} else {
2015  		p.na = wire.NewNetAddressIPPort(net.ParseIP(host), uint16(port), 0)
2016  	}
2017  	return p, nil
2018  }
2019  
// init seeds the package-level math/rand source from the current time in nanoseconds.
func init() {
	seed := time.Now().UnixNano()
	rand.Seed(seed)
}
2024