1 package peer
2 3 import (
4 "container/list"
5 "errors"
6 "fmt"
7 "github.com/p9c/p9/pkg/chaincfg"
8 "github.com/p9c/p9/pkg/log"
9 "io"
10 "math/rand"
11 "net"
12 "strconv"
13 "sync"
14 "sync/atomic"
15 "time"
16 17 "github.com/p9c/p9/pkg/qu"
18 19 "github.com/btcsuite/go-socks/socks"
20 21 "github.com/p9c/p9/pkg/blockchain"
22 "github.com/p9c/p9/pkg/chainhash"
23 "github.com/p9c/p9/pkg/wire"
24 )
// Protocol limits, buffer sizes and timing parameters used by the peer.
const (
	// MaxProtocolVersion is the max protocol version the peer supports.
	MaxProtocolVersion = wire.FeeFilterVersion
	// DefaultTrickleInterval is the min time between attempts to send an inv message to a peer.
	DefaultTrickleInterval = time.Second
	// MinAcceptableProtocolVersion is the lowest protocol version that a connected peer may support.
	MinAcceptableProtocolVersion = 1
	// outputBufferSize is the number of elements the output channels use.
	outputBufferSize = 1000
	// maxInvTrickleSize is the maximum amount of inventory to send in a single message when trickling inventory to
	// remote peers.
	maxInvTrickleSize = 5000
	// maxKnownInventory is the maximum number of items to keep in the known inventory cache.
	maxKnownInventory = 30000
	// pingInterval is the interval of time to wait in between sending ping messages.
	pingInterval = 1 * time.Second
	// negotiateTimeout is the duration of inactivity before we timeout a peer that hasn't completed the initial version
	// negotiation.
	negotiateTimeout = 27 * time.Second
	// idleTimeout is the duration of inactivity before we time out a peer.
	idleTimeout = time.Minute
	// stallTickInterval is the interval of time between each check for stalled peers.
	stallTickInterval = 60 * time.Second
	// stallResponseTimeout is the base maximum amount of time messages that expect a response will wait before
	// disconnecting the peer for stalling. The deadlines are adjusted for callback running times and checked on each
	// stall tick interval.
	stallResponseTimeout = 360 * time.Second
)
// Package-level state shared by all peers.
var (
	// nodeCount is the total number of peer connections made since startup and is used to assign an id to a peer.
	nodeCount int32
	// zeroHash is the zero value hash (all zeros). It is defined as a convenience.
	zeroHash chainhash.Hash
	// SentNonces houses the unique nonces that are generated when pushing version messages that are used to detect
	// self connections. The cache is created with a limit of 50 entries.
	SentNonces = newMruNonceMap(50)
	// AllowSelfConns is only used to allow the tests to bypass the self connection detecting and disconnect logic
	// since they intentionally do so for testing purposes.
	AllowSelfConns bool
)
// MessageListeners defines callback function pointers to invoke with message listeners for a peer. Any listener which
// is not set to a concrete callback during peer initialization is ignored.
//
// Execution of multiple message listeners occurs serially, so one callback blocks the execution of the next.
//
// NOTE: Unless otherwise documented, these listeners must NOT directly call any blocking calls (such as
// WaitForShutdown) on the peer instance since the input handler goroutine blocks until the callback has completed.
// Doing so will result in a deadlock.
type MessageListeners struct {
	// OnGetAddr is invoked when a peer receives a getaddr bitcoin message.
	OnGetAddr func(p *Peer, msg *wire.MsgGetAddr)
	// OnAddr is invoked when a peer receives an addr bitcoin message.
	OnAddr func(p *Peer, msg *wire.MsgAddr)
	// OnPing is invoked when a peer receives a ping bitcoin message.
	OnPing func(p *Peer, msg *wire.MsgPing)
	// OnPong is invoked when a peer receives a pong bitcoin message.
	OnPong func(p *Peer, msg *wire.MsgPong)
	// OnAlert is invoked when a peer receives an alert bitcoin message.
	OnAlert func(p *Peer, msg *wire.MsgAlert)
	// OnMemPool is invoked when a peer receives a mempool bitcoin message.
	OnMemPool func(p *Peer, msg *wire.MsgMemPool)
	// OnTx is invoked when a peer receives a tx bitcoin message.
	OnTx func(p *Peer, msg *wire.MsgTx)
	// OnBlock is invoked when a peer receives a block bitcoin message. The callback additionally receives a byte
	// slice alongside the decoded block.
	OnBlock func(p *Peer, msg *wire.Block, buf []byte)
	// OnCFilter is invoked when a peer receives a cfilter bitcoin message.
	OnCFilter func(p *Peer, msg *wire.MsgCFilter)
	// OnCFHeaders is invoked when a peer receives a cfheaders bitcoin message.
	OnCFHeaders func(p *Peer, msg *wire.MsgCFHeaders)
	// OnCFCheckpt is invoked when a peer receives a cfcheckpt bitcoin message.
	OnCFCheckpt func(p *Peer, msg *wire.MsgCFCheckpt)
	// OnInv is invoked when a peer receives an inv bitcoin message.
	OnInv func(p *Peer, msg *wire.MsgInv)
	// OnHeaders is invoked when a peer receives a headers bitcoin message.
	OnHeaders func(p *Peer, msg *wire.MsgHeaders)
	// OnNotFound is invoked when a peer receives a notfound bitcoin message.
	OnNotFound func(p *Peer, msg *wire.MsgNotFound)
	// OnGetData is invoked when a peer receives a getdata bitcoin message.
	OnGetData func(p *Peer, msg *wire.MsgGetData)
	// OnGetBlocks is invoked when a peer receives a getblocks bitcoin message.
	OnGetBlocks func(p *Peer, msg *wire.MsgGetBlocks)
	// OnGetHeaders is invoked when a peer receives a getheaders bitcoin message.
	OnGetHeaders func(p *Peer, msg *wire.MsgGetHeaders)
	// OnGetCFilters is invoked when a peer receives a getcfilters bitcoin message.
	OnGetCFilters func(p *Peer, msg *wire.MsgGetCFilters)
	// OnGetCFHeaders is invoked when a peer receives a getcfheaders bitcoin message.
	OnGetCFHeaders func(p *Peer, msg *wire.MsgGetCFHeaders)
	// OnGetCFCheckpt is invoked when a peer receives a getcfcheckpt bitcoin message.
	OnGetCFCheckpt func(p *Peer, msg *wire.MsgGetCFCheckpt)
	// OnFeeFilter is invoked when a peer receives a feefilter bitcoin message.
	OnFeeFilter func(p *Peer, msg *wire.MsgFeeFilter)
	// OnFilterAdd is invoked when a peer receives a filteradd bitcoin message.
	OnFilterAdd func(p *Peer, msg *wire.MsgFilterAdd)
	// OnFilterClear is invoked when a peer receives a filterclear bitcoin message.
	OnFilterClear func(p *Peer, msg *wire.MsgFilterClear)
	// OnFilterLoad is invoked when a peer receives a filterload bitcoin message.
	OnFilterLoad func(p *Peer, msg *wire.MsgFilterLoad)
	// OnMerkleBlock is invoked when a peer receives a merkleblock bitcoin message.
	OnMerkleBlock func(p *Peer, msg *wire.MsgMerkleBlock)
	// OnVersion is invoked when a peer receives a version bitcoin message. The caller may return a reject message in
	// which case the message will be sent to the peer and the peer will be disconnected.
	OnVersion func(p *Peer, msg *wire.MsgVersion) *wire.MsgReject
	// OnVerAck is invoked when a peer receives a verack bitcoin message.
	OnVerAck func(p *Peer, msg *wire.MsgVerAck)
	// OnReject is invoked when a peer receives a reject bitcoin message.
	OnReject func(p *Peer, msg *wire.MsgReject)
	// OnSendHeaders is invoked when a peer receives a sendheaders bitcoin message.
	OnSendHeaders func(p *Peer, msg *wire.MsgSendHeaders)
	// OnRead is invoked when a peer receives a bitcoin message.
	//
	// It consists of the number of bytes read, the message, and whether or not an error in the read occurred.
	// Typically, callers will opt to use the callbacks for the specific message types, however this can be useful for
	// circumstances such as keeping track of server-wide byte counts or working with custom message types for which
	// the peer does not directly provide a callback.
	OnRead func(p *Peer, bytesRead int, msg wire.Message, e error)
	// OnWrite is invoked when we write a bitcoin message to a peer.
	//
	// It consists of the number of bytes written, the message, and whether or not an error in the write occurred.
	// This can be useful for circumstances such as keeping track of server-wide byte counts.
	OnWrite func(p *Peer, bytesWritten int, msg wire.Message, e error)
}
// Config is the struct to hold configuration options useful to Peer.
type Config struct {
	// NewestBlock specifies a callback which provides the newest block details to the peer as needed.
	//
	// This can be nil in which case the peer will report a block height of 0, however it is good practice for peers
	// to specify this so their currently best known is accurately reported.
	NewestBlock HashFunc
	// HostToNetAddress returns the netaddress for the given host. This can be nil in which case the host will be
	// parsed as an IP address.
	HostToNetAddress HostToNetAddrFunc
	// Proxy indicates a proxy is being used for connections. The only effect this has is to prevent leaking the tor
	// proxy address, so it only needs to be specified if using a tor proxy.
	Proxy string
	// UserAgentName specifies the user agent name to advertise. It is highly recommended to specify this value.
	UserAgentName string
	// UserAgentVersion specifies the user agent version to advertise. It is highly recommended to specify this value
	// and that it follows the form "major.minor.revision" e.g. "2.6.41".
	UserAgentVersion string
	// UserAgentComments specify the user agent comments to advertise. These values must not contain the illegal
	// characters specified in BIP 14: '/', ':', '(', ')'.
	UserAgentComments []string
	// ChainParams identifies which chain parameters the peer is associated with. It is highly recommended to specify
	// this field, however it can be omitted in which case the test network will be used.
	ChainParams *chaincfg.Params
	// Services specifies which services to advertise as supported by the local peer. This field can be omitted in
	// which case it will be 0 and therefore advertise no supported services.
	Services wire.ServiceFlag
	// ProtocolVersion specifies the maximum protocol version to use and advertise. This field can be omitted in which
	// case peer.MaxProtocolVersion will be used.
	ProtocolVersion uint32
	// DisableRelayTx specifies if the remote peer should be informed to not send inv messages for transactions.
	DisableRelayTx bool
	// Listeners houses callback functions to be invoked on receiving peer messages.
	Listeners MessageListeners
	// TrickleInterval is the duration of the ticker which trickles down the inventory to a peer.
	TrickleInterval time.Duration
	// IP and Port are undocumented in the original source.
	// NOTE(review): presumably the address and port associated with this peer configuration - confirm against the
	// code that populates Config.
	IP   net.IP
	Port uint16
}
// minUint32 returns the smaller of the two supplied uint32 values. Keeping the comparison in integer space avoids
// pulling in the math package and converting through floats.
func minUint32(a, b uint32) uint32 {
	smallest := b
	if a < smallest {
		smallest = a
	}
	return smallest
}
208 209 // newNetAddress attempts to extract the IP address and port from the passed net.Addr interface and create a bitcoin
210 // NetAddress structure using that information.
211 func newNetAddress(addr net.Addr, services wire.ServiceFlag) (*wire.NetAddress, error) {
212 // addr will be a net.TCPAddr when not using a proxy.
213 if tcpAddr, ok := addr.(*net.TCPAddr); ok {
214 ip := tcpAddr.IP
215 port := uint16(tcpAddr.Port)
216 na := wire.NewNetAddressIPPort(ip, port, services)
217 return na, nil
218 }
219 // addr will be a socks.ProxiedAddr when using a proxy.
220 if proxiedAddr, ok := addr.(*socks.ProxiedAddr); ok {
221 ip := net.ParseIP(proxiedAddr.Host)
222 if ip == nil {
223 ip = net.ParseIP("0.0.0.0")
224 }
225 port := uint16(proxiedAddr.Port)
226 na := wire.NewNetAddressIPPort(ip, port, services)
227 return na, nil
228 }
229 // For the most part, addr should be one of the two above cases, but to be safe, fall back to trying to parse the
230 // information from the address string as a last resort.
231 host, portStr, e := net.SplitHostPort(addr.String())
232 if e != nil {
233 return nil, e
234 }
235 ip := net.ParseIP(host)
236 port, e := strconv.ParseUint(portStr, 10, 16)
237 if e != nil {
238 return nil, e
239 }
240 na := wire.NewNetAddressIPPort(ip, uint16(port), services)
241 return na, nil
242 }
// outMsg is used to house a message to be sent along with a channel to signal when the message has been sent (or won't
// be sent due to things such as shutdown).
type outMsg struct {
	// msg is the wire message to send.
	msg wire.Message
	// doneChan is a send-only channel used to signal that handling of msg has finished.
	doneChan chan<- struct{}
	// encoding is the wire encoding associated with msg.
	encoding wire.MessageEncoding
}
// stallControlCmd represents the command of a stall control message. Its valid values are the scc* constants.
type stallControlCmd uint8
// Constants for the command of a stall control message.
const (
	// sccSendMessage indicates a message is being sent to the remote peer.
	sccSendMessage stallControlCmd = iota
	// sccReceiveMessage indicates a message has been received from the remote peer.
	sccReceiveMessage
	// sccHandlerStart indicates a callback handler is about to be invoked.
	sccHandlerStart
	// sccHandlerDone indicates a callback handler has completed.
	sccHandlerDone
)
// stallControlMsg is used to signal the stall handler about specific events so it can properly detect and handle
// stalled remote peers.
type stallControlMsg struct {
	// command identifies which kind of event is being reported.
	command stallControlCmd
	// message is the wire message the event relates to.
	message wire.Message
}
// StatsSnap is a snapshot of peer stats at a point in time. It is produced by Peer.StatsSnapshot.
type StatsSnap struct {
	ID       int32
	Addr     string
	Services wire.ServiceFlag
	LastSend time.Time
	LastRecv time.Time
	// BytesSent and BytesRecv are the byte counters of the peer at the time of the snapshot.
	BytesSent  uint64
	BytesRecv  uint64
	ConnTime   time.Time
	TimeOffset int64
	// Version is filled from the protocol version advertised by the remote peer, not the negotiated version.
	Version        uint32
	UserAgent      string
	Inbound        bool
	StartingHeight int32
	LastBlock      int32
	LastPingNonce  uint64
	LastPingTime   time.Time
	LastPingMicros int64
}
// HashFunc is a function which returns a block hash, height and error. It is used as a callback to get newest block
// details.
type HashFunc func() (hash *chainhash.Hash, height int32, e error)
// AddrFunc is a func which takes the network address of a remote peer and returns a related network address.
type AddrFunc func(remoteAddr *wire.NetAddress) *wire.NetAddress
// HostToNetAddrFunc is a func which takes a host, port, services and returns the netaddress, or an error when the
// conversion fails.
type HostToNetAddrFunc func(
	host string, port uint16,
	services wire.ServiceFlag,
) (*wire.NetAddress, error)
308 309 // NOTE: The overall data flow of a peer is split into 3 goroutines.
310 //
311 // Inbound messages are read via the inHandler goroutine and generally dispatched to their own handler.
312 //
313 // For inbound data-related messages such as blocks, transactions, and inventory, the data is handled by the
314 // corresponding message handlers.
315 //
316 // The data flow for outbound messages is split into 2 goroutines, queueHandler and outHandler. The first, queueHandler,
317 // is used as a way for external entities to queue messages, by way of the QueueMessage function, quickly regardless of
318 // whether the peer is currently sending or not. It acts as the traffic cop between the external world and the actual
319 // goroutine which writes to the network socket.
// Peer provides a basic concurrent safe bitcoin peer for handling bitcoin communications via the peer-to-peer protocol.
//
// It provides full duplex reading and writing, automatic handling of the initial handshake process, querying of usage
// statistics and other information about the remote peer such as its address, user agent, and protocol version, output
// message queuing, inventory trickling, and the ability to dynamically register and unregister callbacks for handling
// bitcoin protocol messages.
//
// Outbound messages are typically queued via QueueMessage or QueueInventory.
//
// QueueMessage is intended for all messages, including responses to data such as blocks and transactions.
//
// QueueInventory, on the other hand, is only intended for relaying inventory as it employs a trickling mechanism to
// batch the inventory together. However, some helper functions for pushing messages of specific types that typically
// require common special handling are provided as a convenience.
type Peer struct {
	// The following variables must only be used atomically.
	bytesReceived uint64
	bytesSent     uint64
	lastRecv      int64 // read back as Unix seconds by LastRecv
	lastSend      int64 // read back as Unix seconds by LastSend
	connected     int32
	disconnect    int32
	// conn is the underlying connection. LocalAddr only dereferences it after observing a non-zero connected flag.
	conn net.Conn
	// These fields are set at creation time and never modified, so they are safe to read from concurrently without a
	// mutex.
	Nonce                uint64
	addr                 string
	cfg                  Config
	inbound              bool
	flagsMtx             sync.Mutex // protects the peer flags below
	na                   *wire.NetAddress
	id                   int32
	userAgent            string
	services             wire.ServiceFlag
	versionKnown         bool
	advertisedProtoVer   uint32 // protocol version advertised by remote
	protocolVersion      uint32 // negotiated protocol version
	sendHeadersPreferred bool   // peer sent a sendheaders message
	verAckReceived       bool
	witnessEnabled       bool
	wireEncoding         wire.MessageEncoding
	// knownInventory is the cache of inventory known to the peer; see AddKnownInventory.
	knownInventory *mruInventoryMap
	// prevGetBlocksMtx guards the begin/stop hashes of the previous getblocks request, which PushGetBlocksMsg uses
	// to filter back-to-back duplicates.
	prevGetBlocksMtx   sync.Mutex
	prevGetBlocksBegin *chainhash.Hash
	prevGetBlocksStop  *chainhash.Hash
	// prevGetHdrsMtx guards the begin/stop hashes of the previous getheaders request, which PushGetHeadersMsg uses
	// to filter back-to-back duplicates.
	prevGetHdrsMtx   sync.Mutex
	prevGetHdrsBegin *chainhash.Hash
	prevGetHdrsStop  *chainhash.Hash
	// These fields keep track of statistics for the peer and are protected by the statsMtx mutex.
	statsMtx           sync.RWMutex
	timeOffset         int64
	timeConnected      time.Time
	startingHeight     int32
	lastBlock          int32
	lastAnnouncedBlock *chainhash.Hash
	lastPingNonce      uint64    // Set to Nonce if we have a pending ping.
	lastPingTime       time.Time // Time we sent last ping.
	lastPingMicros     int64     // Time for last ping to return.
	// Channels used to pass messages and shutdown signals between the peer's goroutines.
	// NOTE(review): the exact producer/consumer of each channel is not visible in this part of the file - confirm
	// against the handler implementations.
	stallControl  chan stallControlMsg
	outputQueue   chan outMsg
	sendQueue     chan outMsg
	sendDoneQueue qu.C
	outputInvChan chan *wire.InvVect
	inQuit        qu.C
	queueQuit     qu.C
	outQuit       qu.C
	quit          qu.C
	// IP and Port are undocumented in the original source.
	// NOTE(review): presumably mirror Config.IP and Config.Port - confirm against the constructor.
	IP   net.IP
	Port uint16
}
391 392 // String returns the peer's address and directionality as a human-readable string.
393 //
394 // This function is safe for concurrent access.
395 func (p *Peer) String() string {
396 return fmt.Sprintf("%s (%s)", p.addr, log.DirectionString(p.inbound))
397 }
398 399 // UpdateLastBlockHeight updates the last known block for the peer.
400 //
401 // This function is safe for concurrent access.
402 func (p *Peer) UpdateLastBlockHeight(newHeight int32) {
403 p.statsMtx.Lock()
404 T.F(
405 "updating last block height of peer %v from %v to %v",
406 p.addr,
407 p.lastBlock,
408 newHeight,
409 )
410 p.lastBlock = newHeight
411 p.statsMtx.Unlock()
412 }
413 414 // UpdateLastAnnouncedBlock updates meta-data about the last block hash this
415 // peer is known to have announced.
416 //
417 // This function is safe for concurrent access.
418 func (p *Peer) UpdateLastAnnouncedBlock(blkHash *chainhash.Hash) {
419 T.Ln("updating last blk for peer", p.addr, ",", blkHash)
420 p.statsMtx.Lock()
421 p.lastAnnouncedBlock = blkHash
422 p.statsMtx.Unlock()
423 }
// AddKnownInventory adds the passed inventory to the cache of known inventory for the peer by delegating to the
// peer's knownInventory map.
//
// This function is safe for concurrent access.
func (p *Peer) AddKnownInventory(invVect *wire.InvVect) {
	p.knownInventory.Add(invVect)
}
431 432 // StatsSnapshot returns a snapshot of the current peer flags and statistics.
433 //
434 // This function is safe for concurrent access.
435 func (p *Peer) StatsSnapshot() *StatsSnap {
436 p.statsMtx.RLock()
437 p.flagsMtx.Lock()
438 id := p.id
439 addr := p.addr
440 userAgent := p.userAgent
441 services := p.services
442 protocolVersion := p.advertisedProtoVer
443 p.flagsMtx.Unlock()
444 // Get a copy of all relevant flags and stats.
445 statsSnap := &StatsSnap{
446 ID: id,
447 Addr: addr,
448 UserAgent: userAgent,
449 Services: services,
450 LastSend: p.LastSend(),
451 LastRecv: p.LastRecv(),
452 BytesSent: p.BytesSent(),
453 BytesRecv: p.BytesReceived(),
454 ConnTime: p.timeConnected,
455 TimeOffset: p.timeOffset,
456 Version: protocolVersion,
457 Inbound: p.inbound,
458 StartingHeight: p.startingHeight,
459 LastBlock: p.lastBlock,
460 LastPingNonce: p.lastPingNonce,
461 LastPingMicros: p.lastPingMicros,
462 LastPingTime: p.lastPingTime,
463 }
464 p.statsMtx.RUnlock()
465 return statsSnap
466 }
467 468 // ID returns the peer id.
469 //
470 // This function is safe for concurrent access.
471 func (p *Peer) ID() int32 {
472 p.flagsMtx.Lock()
473 id := p.id
474 p.flagsMtx.Unlock()
475 return id
476 }
477 478 // NA returns the peer network address.
479 //
480 // This function is safe for concurrent access.
481 func (p *Peer) NA() *wire.NetAddress {
482 p.flagsMtx.Lock()
483 na := p.na
484 p.flagsMtx.Unlock()
485 return na
486 }
// Addr returns the peer address as a string.
//
// This function is safe for concurrent access.
func (p *Peer) Addr() string {
	// The address doesn't change after initialization, therefore it is not protected by a mutex.
	return p.addr
}
// Inbound returns whether the peer is inbound. The flag is read without locking.
//
// This function is safe for concurrent access.
func (p *Peer) Inbound() bool {
	return p.inbound
}
500 501 // Services returns the services flag of the remote peer. This function is safe for concurrent access.
502 func (p *Peer) Services() wire.ServiceFlag {
503 p.flagsMtx.Lock()
504 services := p.services
505 p.flagsMtx.Unlock()
506 return services
507 }
508 509 // UserAgent returns the user agent of the remote peer.
510 //
511 // This function is safe for concurrent access.
512 func (p *Peer) UserAgent() string {
513 p.flagsMtx.Lock()
514 userAgent := p.userAgent
515 p.flagsMtx.Unlock()
516 return userAgent
517 }
518 519 // LastAnnouncedBlock returns the last announced block of the remote peer.
520 //
521 // This function is safe for concurrent access.
522 func (p *Peer) LastAnnouncedBlock() *chainhash.Hash {
523 p.statsMtx.RLock()
524 lastAnnouncedBlock := p.lastAnnouncedBlock
525 p.statsMtx.RUnlock()
526 return lastAnnouncedBlock
527 }
528 529 // LastPingNonce returns the last ping Nonce of the remote peer.
530 //
531 // This function is safe for concurrent access.
532 func (p *Peer) LastPingNonce() uint64 {
533 p.statsMtx.RLock()
534 lastPingNonce := p.lastPingNonce
535 p.statsMtx.RUnlock()
536 return lastPingNonce
537 }
538 539 // LastPingTime returns the last ping time of the remote peer.
540 //
541 // This function is safe for concurrent access.
542 func (p *Peer) LastPingTime() time.Time {
543 p.statsMtx.RLock()
544 lastPingTime := p.lastPingTime
545 p.statsMtx.RUnlock()
546 return lastPingTime
547 }
548 549 // LastPingMicros returns the last ping micros of the remote peer.
550 //
551 // This function is safe for concurrent access.
552 func (p *Peer) LastPingMicros() int64 {
553 p.statsMtx.RLock()
554 lastPingMicros := p.lastPingMicros
555 p.statsMtx.RUnlock()
556 return lastPingMicros
557 }
558 559 // VersionKnown returns the whether or not the version of a peer is known locally.
560 //
561 // This function is safe for concurrent access.
562 func (p *Peer) VersionKnown() bool {
563 p.flagsMtx.Lock()
564 versionKnown := p.versionKnown
565 p.flagsMtx.Unlock()
566 return versionKnown
567 }
568 569 // VerAckReceived returns whether or not a verack message was received by the peer.
570 //
571 // This function is safe for concurrent access.
572 func (p *Peer) VerAckReceived() bool {
573 p.flagsMtx.Lock()
574 verAckReceived := p.verAckReceived
575 p.flagsMtx.Unlock()
576 return verAckReceived
577 }
578 579 // ProtocolVersion returns the negotiated peer protocol version.
580 //
581 // This function is safe for concurrent access.
582 func (p *Peer) ProtocolVersion() uint32 {
583 p.flagsMtx.Lock()
584 protocolVersion := p.protocolVersion
585 p.flagsMtx.Unlock()
586 return protocolVersion
587 }
588 589 // LastBlock returns the last block of the peer.
590 //
591 // This function is safe for concurrent access.
592 func (p *Peer) LastBlock() int32 {
593 p.statsMtx.RLock()
594 lastBlock := p.lastBlock
595 p.statsMtx.RUnlock()
596 return lastBlock
597 }
598 599 // LastSend returns the last send time of the peer.
600 //
601 // This function is safe for concurrent access.
602 func (p *Peer) LastSend() time.Time {
603 return time.Unix(atomic.LoadInt64(&p.lastSend), 0)
604 }
605 606 // LastRecv returns the last recv time of the peer.
607 //
608 // This function is safe for concurrent access.
609 func (p *Peer) LastRecv() time.Time {
610 return time.Unix(atomic.LoadInt64(&p.lastRecv), 0)
611 }
612 613 // LocalAddr returns the local address of the connection.
614 //
615 // This function is safe fo concurrent access.
616 func (p *Peer) LocalAddr() net.Addr {
617 var localAddr net.Addr
618 if atomic.LoadInt32(&p.connected) != 0 {
619 localAddr = p.conn.LocalAddr()
620 }
621 return localAddr
622 }
623 624 // BytesSent returns the total number of bytes sent by the peer.
625 //
626 // This function is safe for concurrent access.
627 func (p *Peer) BytesSent() uint64 {
628 return atomic.LoadUint64(&p.bytesSent)
629 }
630 631 // BytesReceived returns the total number of bytes received by the peer.
632 //
633 // This function is safe for concurrent access.
634 func (p *Peer) BytesReceived() uint64 {
635 return atomic.LoadUint64(&p.bytesReceived)
636 }
637 638 // TimeConnected returns the time at which the peer connected.
639 //
640 // This function is safe for concurrent access.
641 func (p *Peer) TimeConnected() time.Time {
642 p.statsMtx.RLock()
643 timeConnected := p.timeConnected
644 p.statsMtx.RUnlock()
645 return timeConnected
646 }
647 648 // TimeOffset returns the number of seconds the local time was offset from the time the peer reported during the initial
649 // negotiation phase.
650 //
651 // Negative values indicate the remote peer's time is before the local time.
652 //
653 // This function is safe for concurrent access.
654 func (p *Peer) TimeOffset() int64 {
655 p.statsMtx.RLock()
656 timeOffset := p.timeOffset
657 p.statsMtx.RUnlock()
658 return timeOffset
659 }
660 661 // StartingHeight returns the last known height the peer reported during the initial negotiation phase. This function is
662 // safe for concurrent access.
663 func (p *Peer) StartingHeight() int32 {
664 p.statsMtx.RLock()
665 startingHeight := p.startingHeight
666 p.statsMtx.RUnlock()
667 return startingHeight
668 }
669 670 // WantsHeaders returns if the peer wants header messages instead of inventory vectors for blocks. This function is safe
671 // for concurrent access.
672 func (p *Peer) WantsHeaders() bool {
673 p.flagsMtx.Lock()
674 sendHeadersPreferred := p.sendHeadersPreferred
675 p.flagsMtx.Unlock()
676 return sendHeadersPreferred
677 }
678 679 // // IsWitnessEnabled returns true if the peer has signalled that it supports
680 // // segregated witness. This function is safe for concurrent access.
681 // func (p *Peer) IsWitnessEnabled() bool {
682 // p.flagsMtx.Lock()
683 // witnessEnabled := p.witnessEnabled
684 // p.flagsMtx.Unlock()
685 // return witnessEnabled
686 // }
687 688 // PushAddrMsg sends an addr message to the connected peer using the provided
689 // addresses.
690 //
691 // This function is useful over manually sending the message via QueueMessage since it automatically limits the
692 // addresses to the maximum number allowed by the message and randomizes the chosen addresses when there are too many.
693 //
694 // It returns the addresses that were actually sent and no message will be sent if there are no entries in the provided
695 // addresses slice. This function is safe for concurrent access.
696 func (p *Peer) PushAddrMsg(addresses []*wire.NetAddress) ([]*wire.NetAddress, error) {
697 addressCount := len(addresses)
698 // Nothing to send.
699 if addressCount == 0 {
700 return nil, nil
701 }
702 msg := wire.NewMsgAddr()
703 msg.AddrList = make([]*wire.NetAddress, addressCount)
704 copy(msg.AddrList, addresses)
705 // Randomize the addresses sent if there are more than the maximum allowed.
706 if addressCount > wire.MaxAddrPerMsg {
707 // Shuffle the address list.
708 for i := 0; i < wire.MaxAddrPerMsg; i++ {
709 j := i + rand.Intn(addressCount-i)
710 msg.AddrList[i], msg.AddrList[j] = msg.AddrList[j], msg.AddrList[i]
711 }
712 // Truncate it to the maximum size.
713 msg.AddrList = msg.AddrList[:wire.MaxAddrPerMsg]
714 }
715 p.QueueMessage(msg, nil)
716 return msg.AddrList, nil
717 }
718 719 // PushGetBlocksMsg sends a getblocks message for the provided block locator and stop hash. It will ignore back-to-back
720 // duplicate requests.
721 //
722 // This function is safe for concurrent access.
723 func (p *Peer) PushGetBlocksMsg(locator blockchain.BlockLocator, stopHash *chainhash.Hash) (e error) {
724 // Extract the begin hash from the block locator, if one was specified, to use for filtering duplicate getblocks
725 // requests.
726 var beginHash *chainhash.Hash
727 if len(locator) > 0 {
728 beginHash = locator[0]
729 }
730 // Filter duplicate getblocks requests.
731 p.prevGetBlocksMtx.Lock()
732 isDuplicate := p.prevGetBlocksStop != nil && p.prevGetBlocksBegin != nil &&
733 beginHash != nil && stopHash.IsEqual(p.prevGetBlocksStop) &&
734 beginHash.IsEqual(p.prevGetBlocksBegin)
735 p.prevGetBlocksMtx.Unlock()
736 if isDuplicate {
737 T.F("filtering duplicate [getblocks] with begin hash %v, stop hash %v", beginHash, stopHash)
738 return nil
739 }
740 // Construct the getblocks request and queue it to be sent.
741 msg := wire.NewMsgGetBlocks(stopHash)
742 for _, hash := range locator {
743 e := msg.AddBlockLocatorHash(hash)
744 if e != nil {
745 return e
746 }
747 }
748 p.QueueMessage(msg, nil)
749 // Update the previous getblocks request information for filtering duplicates.
750 p.prevGetBlocksMtx.Lock()
751 p.prevGetBlocksBegin = beginHash
752 p.prevGetBlocksStop = stopHash
753 p.prevGetBlocksMtx.Unlock()
754 return nil
755 }
756 757 // PushGetHeadersMsg sends a getblocks message for the provided block locator and stop hash. It will ignore back-to-back
758 // duplicate requests.
759 //
760 // This function is safe for concurrent access.
761 func (p *Peer) PushGetHeadersMsg(locator blockchain.BlockLocator, stopHash *chainhash.Hash) (e error) {
762 // Extract the begin hash from the block locator, if one was specified, to use for filtering duplicate getheaders
763 // requests.
764 var beginHash *chainhash.Hash
765 if len(locator) > 0 {
766 beginHash = locator[0]
767 }
768 // Filter duplicate getheaders requests.
769 p.prevGetHdrsMtx.Lock()
770 isDuplicate := p.prevGetHdrsStop != nil && p.prevGetHdrsBegin != nil &&
771 beginHash != nil && stopHash.IsEqual(p.prevGetHdrsStop) &&
772 beginHash.IsEqual(p.prevGetHdrsBegin)
773 p.prevGetHdrsMtx.Unlock()
774 if isDuplicate {
775 T.Ln(
776 "Filtering duplicate [getheaders] with begin hash", beginHash,
777 )
778 return nil
779 }
780 // Construct the getheaders request and queue it to be sent.
781 msg := wire.NewMsgGetHeaders()
782 msg.HashStop = *stopHash
783 for _, hash := range locator {
784 e := msg.AddBlockLocatorHash(hash)
785 if e != nil {
786 return e
787 }
788 }
789 p.QueueMessage(msg, nil)
790 // Update the previous getheaders request information for filtering
791 // duplicates.
792 p.prevGetHdrsMtx.Lock()
793 p.prevGetHdrsBegin = beginHash
794 p.prevGetHdrsStop = stopHash
795 p.prevGetHdrsMtx.Unlock()
796 return nil
797 }
798 799 // PushRejectMsg sends a reject message for the provided command, reject code, reject reason, and hash.
800 //
801 // The hash will only be used when the command is a tx or block and should be nil in other cases.
802 //
803 // The wait parameter will cause the function to block until the reject message has actually been sent. This function is
804 // safe for concurrent access.
805 func (p *Peer) PushRejectMsg(command string, code wire.RejectCode, reason string, hash *chainhash.Hash, wait bool) {
806 // Don't bother sending the reject message if the protocol version is too
807 // low.
808 if p.VersionKnown() && p.ProtocolVersion() < wire.RejectVersion {
809 return
810 }
811 msg := wire.NewMsgReject(command, code, reason)
812 if command == wire.CmdTx || command == wire.CmdBlock {
813 if hash == nil {
814 W.Ln(
815 "Sending a reject message for command type", command,
816 "which should have specified a hash but does not",
817 )
818 hash = &zeroHash
819 }
820 msg.Hash = *hash
821 }
822 // Send the message without waiting if the caller has not requested it.
823 if !wait {
824 p.QueueMessage(msg, nil)
825 return
826 }
827 // Send the message and block until it has been sent before returning.
828 doneChan := qu.Ts(1)
829 p.QueueMessage(msg, doneChan)
830 <-doneChan
831 }
832 833 // handlePingMsg is invoked when a peer receives a ping bitcoin message.
834 // For recent clients (protocol version > BIP0031Version),
835 // it replies with a pong message. For older clients,
836 // it does nothing and anything other than failure is considered a successful
837 // ping.
838 func (p *Peer) handlePingMsg(msg *wire.MsgPing) {
839 // Only reply with pong if the message is from a new enough client.
840 if p.ProtocolVersion() > wire.BIP0031Version {
841 // Include Nonce from ping so pong can be identified.
842 p.QueueMessage(wire.NewMsgPong(msg.Nonce), nil)
843 }
844 }
845 846 // handlePongMsg is invoked when a peer receives a pong bitcoin message. It updates the ping statistics as required for
847 // recent clients (protocol version > BIP0031Version). There is no effect for older clients or when a ping was not
848 // previously sent.
849 func (p *Peer) handlePongMsg(msg *wire.MsgPong) {
850 // Arguably we could use a buffered channel here sending data in a fifo manner whenever we send a ping, or a list
851 // keeping track of the times of each ping.
852 //
853 // For now we just make a best effort and only record stats if it was for the last ping sent. Any preceding and
854 // overlapping pings will be ignored. It is unlikely to occur without large usage of the ping rpc call since we ping
855 // infrequently enough that if they overlap we would have timed out the peer.
856 if p.ProtocolVersion() > wire.BIP0031Version {
857 p.statsMtx.Lock()
858 if p.lastPingNonce != 0 && msg.Nonce == p.lastPingNonce {
859 p.lastPingMicros = time.Since(p.lastPingTime).Nanoseconds()
860 p.lastPingMicros /= 1000 // convert to microseconds.
861 p.lastPingNonce = 0
862 }
863 p.statsMtx.Unlock()
864 }
865 }
866 867 // readMessage reads the next bitcoin message from the peer with logging.
868 func (p *Peer) readMessage(encoding wire.MessageEncoding) (wire.Message, []byte, error) {
869 n, msg, buf, e := wire.ReadMessageWithEncodingN(
870 p.conn,
871 p.ProtocolVersion(), p.cfg.ChainParams.Net, encoding,
872 )
873 atomic.AddUint64(&p.bytesReceived, uint64(n))
874 if p.cfg.Listeners.OnRead != nil {
875 p.cfg.Listeners.OnRead(p, n, msg, e)
876 }
877 if e != nil {
878 T.Ln(e)
879 return nil, nil, e
880 }
881 // // Use closures to log expensive operations so they are only run when the logging level requires it.
882 T.C(
883 func() (o string) {
884 // Debug summary of message.
885 summary := messageSummary(msg)
886 if len(summary) > 0 {
887 summary = " (" + summary + ")"
888 }
889 o = fmt.Sprintf(
890 "Received %v%s from %s",
891 msg.Command(), summary, p,
892 )
893 // o += spew.Sdump(msg)
894 // o += spew.Sdump(buf)
895 return o
896 },
897 )
898 return msg, buf, nil
899 }
900 901 // writeMessage sends a bitcoin message to the peer with logging.
902 func (p *Peer) writeMessage(msg wire.Message, enc wire.MessageEncoding) (e error) {
903 // Don't do anything if we're disconnecting.
904 if atomic.LoadInt32(&p.disconnect) != 0 {
905 return nil
906 }
907 // // Use closures to log expensive operations so they are only run when the logging level requires it.
908 T.C(
909 func() (o string) {
910 // Debug summary of message.
911 summary := messageSummary(msg)
912 if len(summary) > 0 {
913 summary = " (" + summary + ")"
914 }
915 o = fmt.Sprintf(
916 "Sending %v%s to %s", msg.Command(),
917 summary, p,
918 )
919 // o += spew.Sdump(msg)
920 // var buf bytes.Buffer
921 // _, e := wire.WriteMessageWithEncodingN(
922 // &buf, msg, p.ProtocolVersion(),
923 // p.cfg.ChainParams.Net, enc,
924 // )
925 // if e != nil {
926 // return e.Error()
927 // }
928 // o += spew.Sdump(buf.Bytes())
929 return
930 },
931 )
932 cmd := msg.Command()
933 if cmd != "ping" && cmd != "pong" && cmd != "inv" {
934 D.C(
935 func() string {
936 // Debug summary of message.
937 summary := messageSummary(msg)
938 if len(summary) > 0 {
939 summary = " (" + summary + ")"
940 }
941 o := fmt.Sprintf("Sending %v%s to %s", msg.Command(), summary, p)
942 // o += spew.Sdump(msg)
943 // var buf bytes.Buffer
944 // _, e = wire.WriteMessageWithEncodingN(&buf, msg, p.ProtocolVersion(), p.cfg.ChainParams.Net, enc)
945 // if e != nil {
946 // // return e.Error()
947 // }
948 return o // + spew.Sdump(buf.Bytes())
949 },
950 )
951 }
952 // Write the message to the peer.
953 n, e := wire.WriteMessageWithEncodingN(
954 p.conn, msg,
955 p.ProtocolVersion(), p.cfg.ChainParams.Net, enc,
956 )
957 atomic.AddUint64(&p.bytesSent, uint64(n))
958 if p.cfg.Listeners.OnWrite != nil {
959 p.cfg.Listeners.OnWrite(p, n, msg, e)
960 }
961 return e
962 }
963 964 // isAllowedReadError returns whether or not the passed error is allowed without disconnecting the peer. In particular,
965 // regression tests need to be allowed to send malformed messages without the peer being disconnected.
966 func (p *Peer) isAllowedReadError(e error) bool {
967 // Only allow read errors in regression test mode.
968 if p.cfg.ChainParams.Net != wire.TestNet {
969 return false
970 }
971 // Don't allow the error if it's not specifically a malformed message error.
972 if _, ok := e.(*wire.MessageError); !ok {
973 return false
974 }
975 // Don't allow the error if it's not coming from localhost or the hostname can't be determined for some reason.
976 var host string
977 host, _, e = net.SplitHostPort(p.addr)
978 if e != nil {
979 return false
980 }
981 if host != "127.0.0.1" && host != "localhost" {
982 return false
983 }
984 // Allowed if all checks passed.
985 return true
986 }
987 988 // shouldHandleReadError returns whether or not the passed error, which is expected to have come from reading from the
989 // remote peer in the inHandler, should be logged and responded to with a reject message.
990 func (p *Peer) shouldHandleReadError(e error) bool {
991 // No logging or reject message when the peer is being forcibly disconnected.
992 if atomic.LoadInt32(&p.disconnect) != 0 {
993 return false
994 }
995 // No logging or reject message when the remote peer has been disconnected.
996 if e == io.EOF {
997 return false
998 }
999 if opErr, ok := e.(*net.OpError); ok && !opErr.Temporary() {
1000 return false
1001 }
1002 return true
1003 }
1004 1005 // maybeAddDeadline potentially adds a deadline for the appropriate expected response for the passed wire protocol
1006 // command to the pending responses map.
1007 func (p *Peer) maybeAddDeadline(pendingResponses map[string]time.Time, msgCmd string) {
1008 // Setup a deadline for each message being sent that expects a response.
1009 //
1010 // NOTE: Pings are intentionally ignored here since they are typically sent asynchronously and as a result of a long
1011 // backlog of messages, such as is typical in the case of initial block download, the response won't be received in
1012 // time.
1013 deadline := time.Now().Add(stallResponseTimeout)
1014 switch msgCmd {
1015 case wire.CmdVersion:
1016 // Expects a verack message.
1017 pendingResponses[wire.CmdVerAck] = deadline
1018 case wire.CmdMemPool:
1019 // Expects an inv message.
1020 pendingResponses[wire.CmdInv] = deadline
1021 case wire.CmdGetBlocks:
1022 // Expects an inv message.
1023 pendingResponses[wire.CmdInv] = deadline
1024 case wire.CmdGetData:
1025 // Expects a block, merkleblock, tx, or notfound message.
1026 pendingResponses[wire.CmdBlock] = deadline
1027 pendingResponses[wire.CmdMerkleBlock] = deadline
1028 pendingResponses[wire.CmdTx] = deadline
1029 pendingResponses[wire.CmdNotFound] = deadline
1030 case wire.CmdGetHeaders:
1031 // Expects a headers message.
1032 //
1033 // Use a longer deadline since it can take a while for the remote peer to load all of the headers.
1034 deadline = time.Now().Add(stallResponseTimeout * 3)
1035 pendingResponses[wire.CmdHeaders] = deadline
1036 }
1037 }
1038 1039 // stallHandler handles stall detection for the peer.
1040 //
1041 // This entails keeping track of expected responses and assigning them deadlines while accounting for the time spent in
1042 // callbacks.
1043 //
1044 // It must be run as a goroutine.
1045 func (p *Peer) stallHandler() {
1046 T.Ln("starting stallHandler for", p.addr)
1047 // These variables are used to adjust the deadline times forward by the time it takes callbacks to execute.
1048 //
1049 // This is done because new messages aren't read until the previous one is finished processing (which includes
1050 // callbacks), so the deadline for receiving a response for a given message must account for the processing time as
1051 // well.
1052 var handlerActive bool
1053 var handlersStartTime time.Time
1054 var deadlineOffset time.Duration
1055 // pendingResponses tracks the expected response deadline times.
1056 pendingResponses := make(map[string]time.Time)
1057 // stallTicker is used to periodically check pending responses that have exceeded the expected deadline and
1058 // disconnect the peer due to stalling.
1059 stallTicker := time.NewTicker(stallTickInterval)
1060 defer stallTicker.Stop()
1061 // ioStopped is used to detect when both the input and output handler goroutines are done.
1062 var ioStopped bool
1063 out:
1064 for {
1065 select {
1066 case msg := <-p.stallControl:
1067 switch msg.command {
1068 case sccSendMessage:
1069 // Add a deadline for the expected response message if needed.
1070 p.maybeAddDeadline(
1071 pendingResponses,
1072 msg.message.Command(),
1073 )
1074 case sccReceiveMessage:
1075 // Remove received messages from the expected response map.
1076 //
1077 // Since certain commands expect one of a group of responses, remove everything in the expected group
1078 // accordingly.
1079 switch msgCmd := msg.message.Command(); msgCmd {
1080 case wire.CmdBlock:
1081 fallthrough
1082 case wire.CmdMerkleBlock:
1083 fallthrough
1084 case wire.CmdTx:
1085 fallthrough
1086 case wire.CmdNotFound:
1087 delete(pendingResponses, wire.CmdBlock)
1088 delete(pendingResponses, wire.CmdMerkleBlock)
1089 delete(pendingResponses, wire.CmdTx)
1090 delete(pendingResponses, wire.CmdNotFound)
1091 default:
1092 delete(pendingResponses, msgCmd)
1093 }
1094 case sccHandlerStart:
1095 // Warn on unbalanced callback signalling.
1096 if handlerActive {
1097 W.Ln(
1098 "Received handler start control command while a handler is already active",
1099 )
1100 continue
1101 }
1102 handlerActive = true
1103 handlersStartTime = time.Now()
1104 case sccHandlerDone:
1105 // Warn on unbalanced callback signalling.
1106 if !handlerActive {
1107 W.Ln(
1108 "Received handler done control command when a handler is not already active",
1109 )
1110 continue
1111 }
1112 // Extend active deadlines by the time it took to execute the callback.
1113 duration := time.Since(handlersStartTime)
1114 deadlineOffset += duration
1115 handlerActive = false
1116 default:
1117 W.Ln(
1118 "Unsupported message command", msg.command,
1119 )
1120 }
1121 case <-stallTicker.C:
1122 // Calculate the offset to apply to the deadline based on how long the handlers have taken to execute since
1123 // the last tick.
1124 now := time.Now()
1125 offset := deadlineOffset
1126 if handlerActive {
1127 offset += now.Sub(handlersStartTime)
1128 }
1129 // Disconnect the peer if any of the pending responses don't arrive by their adjusted deadline.
1130 for command, deadline := range pendingResponses {
1131 if now.Before(deadline.Add(offset)) {
1132 continue
1133 }
1134 D.F(
1135 "Peer %s appears to be stalled or misbehaving, %s timeout -- disconnecting",
1136 p,
1137 command,
1138 )
1139 p.Disconnect()
1140 break
1141 }
1142 // Reset the deadline offset for the next tick.
1143 deadlineOffset = 0
1144 case <-p.inQuit.Wait():
1145 // The stall handler can exit once both the input and output handler goroutines are done.
1146 if ioStopped {
1147 break out
1148 }
1149 ioStopped = true
1150 case <-p.outQuit.Wait():
1151 // The stall handler can exit once both the input and output handler goroutines are done.
1152 if ioStopped {
1153 break out
1154 }
1155 ioStopped = true
1156 }
1157 }
1158 // Drain any wait channels before going away so there is nothing left waiting on this goroutine.
1159 cleanup:
1160 for {
1161 select {
1162 case <-p.stallControl:
1163 default:
1164 break cleanup
1165 }
1166 }
1167 T.Ln("peer stall handler done for", p)
1168 }
1169 1170 // inHandler handles all incoming messages for the peer.
1171 //
1172 // It must be run as a goroutine.
1173 func (p *Peer) inHandler() {
1174 T.Ln("starting inHandler for", p.addr)
1175 // The timer is stopped when a new message is received and reset after it is processed.
1176 idleTimer := time.AfterFunc(
1177 idleTimeout, func() {
1178 W.F("peer %s no answer for %s -- disconnecting", p, idleTimeout)
1179 p.Disconnect()
1180 },
1181 )
1182 out:
1183 for atomic.LoadInt32(&p.disconnect) == 0 {
1184 // Read a message and stop the idle timer as soon as the read is done. The timer is reset below for the next
1185 // iteration if needed.
1186 rMsg, buf, e := p.readMessage(p.wireEncoding)
1187 idleTimer.Stop()
1188 if e != nil {
1189 T.Ln(e)
1190 // In order to allow regression tests with malformed messages, don't disconnect the peer when we're in
1191 // regression test mode and the error is one of the allowed errors.
1192 if p.isAllowedReadError(e) {
1193 E.F("allowed test error from %s: %v", p, e)
1194 idleTimer.Reset(idleTimeout)
1195 continue
1196 }
1197 // Only log the error and send reject message if the local peer is not forcibly disconnecting and the remote
1198 // peer has not disconnected.
1199 if p.shouldHandleReadError(e) {
1200 errMsg := fmt.Sprintf("Can't read message from %s: %v", p, e)
1201 if e != io.ErrUnexpectedEOF {
1202 E.Ln(errMsg)
1203 }
1204 // Push a reject message for the malformed message and wait for the message to be sent before
1205 // disconnecting.
1206 //
1207 // NOTE: Ideally this would include the command in the header if at least that much of the message was
1208 // valid, but that is not currently exposed by wire, so just used malformed for the command.
1209 p.PushRejectMsg("malformed", wire.RejectMalformed, errMsg, nil, true)
1210 }
1211 break out
1212 }
1213 atomic.StoreInt64(&p.lastRecv, time.Now().Unix())
1214 p.stallControl <- stallControlMsg{sccReceiveMessage, rMsg}
1215 // Handle each supported message type.
1216 p.stallControl <- stallControlMsg{sccHandlerStart, rMsg}
1217 switch msg := rMsg.(type) {
1218 case *wire.MsgVersion:
1219 // Limit to one version message per peer.
1220 p.PushRejectMsg(
1221 msg.Command(), wire.RejectDuplicate,
1222 "duplicate version message", nil, true,
1223 )
1224 break out
1225 case *wire.MsgVerAck:
1226 // No read lock is necessary because verAckReceived is not written to in any other goroutine.
1227 //
1228 // Because of the potential for an attacker to use the UAC based node identifiers to cause a peer to
1229 // disconnect from the attacked node, we have commented this thing out.
1230 //
1231 // if p.verAckReceived {
1232 // I.F("already received 'verack' from peer %v"+
1233 // " -- disconnecting", p)
1234 // break out
1235 // }
1236 //
1237 // because of the commented section above, we won't run this if the peer is already marked
1238 // VerAckReceived. This basically responds to spurious veracks by dropping them
1239 if !p.verAckReceived {
1240 p.flagsMtx.Lock()
1241 p.verAckReceived = true
1242 p.flagsMtx.Unlock()
1243 if p.cfg.Listeners.OnVerAck != nil {
1244 p.cfg.Listeners.OnVerAck(p, msg)
1245 }
1246 }
1247 case *wire.MsgGetAddr:
1248 if p.cfg.Listeners.OnGetAddr != nil {
1249 p.cfg.Listeners.OnGetAddr(p, msg)
1250 }
1251 case *wire.MsgAddr:
1252 if p.cfg.Listeners.OnAddr != nil {
1253 p.cfg.Listeners.OnAddr(p, msg)
1254 }
1255 case *wire.MsgPing:
1256 p.handlePingMsg(msg)
1257 if p.cfg.Listeners.OnPing != nil {
1258 p.cfg.Listeners.OnPing(p, msg)
1259 }
1260 case *wire.MsgPong:
1261 p.handlePongMsg(msg)
1262 if p.cfg.Listeners.OnPong != nil {
1263 p.cfg.Listeners.OnPong(p, msg)
1264 }
1265 case *wire.MsgAlert:
1266 if p.cfg.Listeners.OnAlert != nil {
1267 p.cfg.Listeners.OnAlert(p, msg)
1268 }
1269 case *wire.MsgMemPool:
1270 if p.cfg.Listeners.OnMemPool != nil {
1271 p.cfg.Listeners.OnMemPool(p, msg)
1272 }
1273 case *wire.MsgTx:
1274 if p.cfg.Listeners.OnTx != nil {
1275 p.cfg.Listeners.OnTx(p, msg)
1276 }
1277 case *wire.Block:
1278 if p.cfg.Listeners.OnBlock != nil {
1279 p.cfg.Listeners.OnBlock(p, msg, buf)
1280 }
1281 case *wire.MsgInv:
1282 if p.cfg.Listeners.OnInv != nil {
1283 p.cfg.Listeners.OnInv(p, msg)
1284 }
1285 case *wire.MsgHeaders:
1286 if p.cfg.Listeners.OnHeaders != nil {
1287 p.cfg.Listeners.OnHeaders(p, msg)
1288 }
1289 case *wire.MsgNotFound:
1290 if p.cfg.Listeners.OnNotFound != nil {
1291 p.cfg.Listeners.OnNotFound(p, msg)
1292 }
1293 case *wire.MsgGetData:
1294 if p.cfg.Listeners.OnGetData != nil {
1295 p.cfg.Listeners.OnGetData(p, msg)
1296 }
1297 case *wire.MsgGetBlocks:
1298 if p.cfg.Listeners.OnGetBlocks != nil {
1299 p.cfg.Listeners.OnGetBlocks(p, msg)
1300 }
1301 case *wire.MsgGetHeaders:
1302 if p.cfg.Listeners.OnGetHeaders != nil {
1303 p.cfg.Listeners.OnGetHeaders(p, msg)
1304 }
1305 case *wire.MsgGetCFilters:
1306 if p.cfg.Listeners.OnGetCFilters != nil {
1307 p.cfg.Listeners.OnGetCFilters(p, msg)
1308 }
1309 case *wire.MsgGetCFHeaders:
1310 if p.cfg.Listeners.OnGetCFHeaders != nil {
1311 p.cfg.Listeners.OnGetCFHeaders(p, msg)
1312 }
1313 case *wire.MsgGetCFCheckpt:
1314 if p.cfg.Listeners.OnGetCFCheckpt != nil {
1315 p.cfg.Listeners.OnGetCFCheckpt(p, msg)
1316 }
1317 case *wire.MsgCFilter:
1318 if p.cfg.Listeners.OnCFilter != nil {
1319 p.cfg.Listeners.OnCFilter(p, msg)
1320 }
1321 case *wire.MsgCFHeaders:
1322 if p.cfg.Listeners.OnCFHeaders != nil {
1323 p.cfg.Listeners.OnCFHeaders(p, msg)
1324 }
1325 case *wire.MsgFeeFilter:
1326 if p.cfg.Listeners.OnFeeFilter != nil {
1327 p.cfg.Listeners.OnFeeFilter(p, msg)
1328 }
1329 case *wire.MsgFilterAdd:
1330 if p.cfg.Listeners.OnFilterAdd != nil {
1331 p.cfg.Listeners.OnFilterAdd(p, msg)
1332 }
1333 case *wire.MsgFilterClear:
1334 if p.cfg.Listeners.OnFilterClear != nil {
1335 p.cfg.Listeners.OnFilterClear(p, msg)
1336 }
1337 case *wire.MsgFilterLoad:
1338 if p.cfg.Listeners.OnFilterLoad != nil {
1339 p.cfg.Listeners.OnFilterLoad(p, msg)
1340 }
1341 case *wire.MsgMerkleBlock:
1342 if p.cfg.Listeners.OnMerkleBlock != nil {
1343 p.cfg.Listeners.OnMerkleBlock(p, msg)
1344 }
1345 case *wire.MsgReject:
1346 if p.cfg.Listeners.OnReject != nil {
1347 p.cfg.Listeners.OnReject(p, msg)
1348 }
1349 case *wire.MsgSendHeaders:
1350 p.flagsMtx.Lock()
1351 p.sendHeadersPreferred = true
1352 p.flagsMtx.Unlock()
1353 if p.cfg.Listeners.OnSendHeaders != nil {
1354 p.cfg.Listeners.OnSendHeaders(p, msg)
1355 }
1356 default:
1357 D.F(
1358 "Received unhandled message of type %v from %v %s",
1359 rMsg.Command(),
1360 p,
1361 )
1362 }
1363 p.stallControl <- stallControlMsg{sccHandlerDone, rMsg}
1364 // A message was received so reset the idle timer.
1365 idleTimer.Reset(idleTimeout)
1366 }
1367 // Ensure the idle timer is stopped to avoid leaking the resource.
1368 idleTimer.Stop()
1369 // Ensure connection is closed.
1370 p.Disconnect()
1371 p.inQuit.Q()
1372 T.Ln("peer input handler done for", p)
1373 }
1374 1375 // queueHandler handles the queuing of outgoing data for the peer.
1376 //
1377 // This runs as a muxer for various sources of input so we can ensure that server and peer handlers will not block on us
1378 // sending a message.
1379 //
1380 // That data is then passed on outHandler to be actually written.
1381 func (p *Peer) queueHandler() {
1382 T.Ln("starting queueHandler for", p.addr)
1383 pendingMsgs := list.New()
1384 invSendQueue := list.New()
1385 trickleTicker := time.NewTicker(p.cfg.TrickleInterval)
1386 defer trickleTicker.Stop()
1387 // We keep the waiting flag so that we know if we have a message queued to the outHandler or not.
1388 //
1389 // We could use the presence of a head of the list for this but then we have rather racy concerns about whether it
1390 // has gotten it at cleanup time - and thus who sends on the message's done channel.
1391 //
1392 // To avoid such confusion we keep a different flag and pendingMsgs only contains messages that we have not yet
1393 // passed to outHandler.
1394 waiting := false
1395 // To avoid duplication below.
1396 queuePacket := func(msg outMsg, list *list.List, waiting bool) bool {
1397 if !waiting {
1398 p.sendQueue <- msg
1399 } else {
1400 list.PushBack(msg)
1401 }
1402 // we are always waiting now.
1403 return true
1404 }
1405 out:
1406 for {
1407 select {
1408 case msg := <-p.outputQueue:
1409 waiting = queuePacket(msg, pendingMsgs, waiting)
1410 // This channel is notified when a message has been sent across the network socket.
1411 case <-p.sendDoneQueue.Wait():
1412 // No longer waiting if there are no more messages in the pending messages queue.
1413 next := pendingMsgs.Front()
1414 if next == nil {
1415 waiting = false
1416 continue
1417 }
1418 // Notify the outHandler about the next item to asynchronously send.
1419 val := pendingMsgs.Remove(next)
1420 p.sendQueue <- val.(outMsg)
1421 case iv := <-p.outputInvChan:
1422 // No handshake? They'll find out soon enough.
1423 if p.VersionKnown() {
1424 // If this is a new block, then we'll blast it out immediately, sipping the inv trickle queue.
1425 if iv.Type == wire.InvTypeBlock {
1426 invMsg := wire.NewMsgInvSizeHint(1)
1427 e := invMsg.AddInvVect(iv)
1428 if e != nil {
1429 D.Ln(e)
1430 }
1431 waiting = queuePacket(
1432 outMsg{msg: invMsg},
1433 pendingMsgs, waiting,
1434 )
1435 } else {
1436 invSendQueue.PushBack(iv)
1437 }
1438 }
1439 case <-trickleTicker.C:
1440 // Don't send anything if we're disconnecting or there is no queued inventory. version is known if send
1441 // queue has any entries.
1442 if atomic.LoadInt32(&p.disconnect) != 0 ||
1443 invSendQueue.Len() == 0 {
1444 continue
1445 }
1446 // Create and send as many inv messages as needed to drain the inventory send queue.
1447 invMsg := wire.NewMsgInvSizeHint(uint(invSendQueue.Len()))
1448 for e := invSendQueue.Front(); e != nil; e = invSendQueue.Front() {
1449 iv := invSendQueue.Remove(e).(*wire.InvVect)
1450 // Don't send inventory that became known after the initial check.
1451 if p.knownInventory.Exists(iv) {
1452 continue
1453 }
1454 e := invMsg.AddInvVect(iv)
1455 if e != nil {
1456 D.Ln(e)
1457 }
1458 if len(invMsg.InvList) >= maxInvTrickleSize {
1459 waiting = queuePacket(
1460 outMsg{msg: invMsg},
1461 pendingMsgs, waiting,
1462 )
1463 invMsg = wire.NewMsgInvSizeHint(uint(invSendQueue.Len()))
1464 }
1465 // Add the inventory that is being relayed to the known inventory for the peer.
1466 p.AddKnownInventory(iv)
1467 }
1468 if len(invMsg.InvList) > 0 {
1469 waiting = queuePacket(
1470 outMsg{msg: invMsg},
1471 pendingMsgs, waiting,
1472 )
1473 }
1474 case <-p.quit.Wait():
1475 break out
1476 }
1477 }
1478 // Drain any wait channels before we go away so we don't leave something waiting for us.
1479 for e := pendingMsgs.Front(); e != nil; e = pendingMsgs.Front() {
1480 val := pendingMsgs.Remove(e)
1481 msg := val.(outMsg)
1482 if msg.doneChan != nil {
1483 msg.doneChan <- struct{}{}
1484 }
1485 }
1486 cleanup:
1487 for {
1488 select {
1489 case msg := <-p.outputQueue:
1490 if msg.doneChan != nil {
1491 msg.doneChan <- struct{}{}
1492 }
1493 case <-p.outputInvChan:
1494 // Just drain channel sendDoneQueue is buffered so doesn't need draining.
1495 default:
1496 break cleanup
1497 }
1498 }
1499 p.queueQuit.Q()
1500 T.Ln("peer queue handler done for", p)
1501 }
1502 1503 // shouldLogWriteError returns whether or not the passed error, which is expected to have come from writing to the
1504 // remote peer in the outHandler, should be logged.
1505 func (p *Peer) shouldLogWriteError(e error) bool {
1506 // No logging when the peer is being forcibly disconnected.
1507 if atomic.LoadInt32(&p.disconnect) != 0 {
1508 return false
1509 }
1510 // No logging when the remote peer has been disconnected.
1511 if e == io.EOF {
1512 return false
1513 }
1514 if opErr, ok := e.(*net.OpError); ok && !opErr.Temporary() {
1515 return false
1516 }
1517 return true
1518 }
1519 1520 // outHandler handles all outgoing messages for the peer.
1521 //
1522 // It must be run as a goroutine.
1523 //
1524 // It uses a buffered channel to serialize output messages while allowing the sender to continue running asynchronously.
1525 func (p *Peer) outHandler() {
1526 T.Ln("starting outHandler for", p.addr)
1527 out:
1528 for {
1529 select {
1530 case msg := <-p.sendQueue:
1531 switch m := msg.msg.(type) {
1532 case *wire.MsgPing:
1533 // Only expects a pong message in later protocol versions. Also set up statistics.
1534 if p.ProtocolVersion() > wire.BIP0031Version {
1535 p.statsMtx.Lock()
1536 p.lastPingNonce = m.Nonce
1537 p.lastPingTime = time.Now()
1538 p.statsMtx.Unlock()
1539 }
1540 }
1541 p.stallControl <- stallControlMsg{sccSendMessage, msg.msg}
1542 e := p.writeMessage(msg.msg, msg.encoding)
1543 if e != nil {
1544 p.Disconnect()
1545 if p.shouldLogWriteError(e) {
1546 E.F("failed to send message to %s: %v", p, e)
1547 }
1548 if msg.doneChan != nil {
1549 msg.doneChan <- struct{}{}
1550 }
1551 continue
1552 }
1553 // At this point, the message was successfully sent, so update the last send time, signal the sender of the
1554 // message that it has been sent ( if requested), and signal the send queue to the deliver the next queued
1555 // message.
1556 atomic.StoreInt64(&p.lastSend, time.Now().Unix())
1557 if msg.doneChan != nil {
1558 msg.doneChan <- struct{}{}
1559 }
1560 p.sendDoneQueue <- struct{}{}
1561 case <-p.quit.Wait():
1562 break out
1563 }
1564 }
1565 <-p.queueQuit
1566 // Drain any wait channels before we go away so we don't leave something waiting for us. We have waited on queueQuit
1567 // and thus we can be sure that we will not miss anything sent on sendQueue.
1568 cleanup:
1569 for {
1570 select {
1571 case msg := <-p.sendQueue:
1572 if msg.doneChan != nil {
1573 msg.doneChan <- struct{}{}
1574 }
1575 // no need to send on sendDoneQueue since queueHandler has been waited on and already exited.
1576 default:
1577 break cleanup
1578 }
1579 }
1580 p.outQuit.Q()
1581 T.Ln("peer output handler done for", p)
1582 }
1583 1584 // pingHandler periodically pings the peer. It must be run as a goroutine.
1585 func (p *Peer) pingHandler() {
1586 T.Ln("starting pingHandler for", p.addr)
1587 pingTicker := time.NewTicker(pingInterval)
1588 defer pingTicker.Stop()
1589 out:
1590 for {
1591 select {
1592 case <-pingTicker.C:
1593 nonce, e := wire.RandomUint64()
1594 if e != nil {
1595 E.F("not sending ping to %s: %v", p, e)
1596 continue
1597 }
1598 p.QueueMessage(wire.NewMsgPing(nonce), nil)
1599 case <-p.quit.Wait():
1600 break out
1601 }
1602 }
1603 }
1604 1605 // QueueMessage adds the passed bitcoin message to the peer send queue. This function is safe for concurrent access.
1606 func (p *Peer) QueueMessage(msg wire.Message, doneChan chan<- struct{}) {
1607 p.QueueMessageWithEncoding(msg, doneChan, wire.BaseEncoding)
1608 }
1609 1610 // QueueMessageWithEncoding adds the passed bitcoin message to the peer send queue. This function is identical to
1611 // QueueMessage, however it allows the caller to specify the wire encoding type that should be used when
1612 // encoding/decoding blocks and transactions.
1613 //
1614 // This function is safe for concurrent access.
1615 func (p *Peer) QueueMessageWithEncoding(
1616 msg wire.Message, doneChan chan<- struct{},
1617 encoding wire.MessageEncoding,
1618 ) {
1619 // Avoid risk of deadlock if goroutine already exited. The goroutine we will be sending to hangs around until it
1620 // knows for a fact that it is marked as disconnected and *then* it drains the channels.
1621 if !p.Connected() {
1622 if doneChan != nil {
1623 go func() {
1624 doneChan <- struct{}{}
1625 }()
1626 }
1627 return
1628 }
1629 p.outputQueue <- outMsg{msg: msg, encoding: encoding, doneChan: doneChan}
1630 }
1631 1632 // QueueInventory adds the passed inventory to the inventory send queue which might not be sent right away, rather it is
1633 // trickled to the peer in batches.
1634 //
1635 // Inventory that the peer is already known to have is ignored.
1636 //
1637 // This function is safe for concurrent access.
1638 func (p *Peer) QueueInventory(invVect *wire.InvVect) {
1639 // Don't add the inventory to the send queue if the peer is already known to have it.
1640 if p.knownInventory.Exists(invVect) {
1641 return
1642 }
1643 // Avoid risk of deadlock if goroutine already exited. The goroutine we will be sending to hangs around until it
1644 // knows for a fact that it is marked as disconnected and *then* it drains the channels.
1645 if !p.Connected() {
1646 return
1647 }
1648 p.outputInvChan <- invVect
1649 }
1650 1651 // Connected returns whether or not the peer is currently connected. This function is safe for concurrent access.
1652 func (p *Peer) Connected() bool {
1653 return atomic.LoadInt32(&p.connected) != 0 &&
1654 atomic.LoadInt32(&p.disconnect) == 0
1655 }
1656 1657 // Disconnect disconnects the peer by closing the connection. Calling this function when the peer is already
1658 // disconnected or in the process of disconnecting will have no effect.
1659 func (p *Peer) Disconnect() {
1660 if atomic.AddInt32(&p.disconnect, 1) != 1 {
1661 return
1662 }
1663 T.Ln("disconnecting", p, log.Caller("from", 1))
1664 if atomic.LoadInt32(&p.connected) != 0 {
1665 _ = p.conn.Close()
1666 }
1667 p.quit.Q()
1668 }
1669 1670 // readRemoteVersionMsg waits for the next message to arrive from the remote peer. If the next message is not a version
1671 // message or the version is not acceptable then return an error.
1672 func (p *Peer) readRemoteVersionMsg() (msg *wire.MsgVersion, e error) {
1673 if p.versionKnown {
1674 D.Ln("received version previously, dropping")
1675 return
1676 }
1677 // Read their version message.
1678 var remoteMsg wire.Message
1679 remoteMsg, _, e = p.readMessage(wire.LatestEncoding)
1680 if e != nil {
1681 if e != io.EOF {
1682 }
1683 return
1684 }
1685 // Notify and disconnect clients if the first message is not a version message.
1686 var ok bool
1687 msg, ok = remoteMsg.(*wire.MsgVersion)
1688 if !ok {
1689 reason := "a version message must precede all others"
1690 rejectMsg := wire.NewMsgReject(
1691 msg.Command(), wire.RejectMalformed,
1692 reason,
1693 )
1694 _ = p.writeMessage(rejectMsg, wire.LatestEncoding)
1695 e = errors.New(reason)
1696 return
1697 }
1698 // Detect self connections.
1699 if !AllowSelfConns && SentNonces.Exists(msg.Nonce) {
1700 e = errors.New("disconnecting peer connected to self")
1701 return
1702 }
1703 // Negotiate the protocol version and set the services to what the remote peer advertised.
1704 p.flagsMtx.Lock()
1705 p.Nonce = msg.Nonce
1706 p.advertisedProtoVer = uint32(msg.ProtocolVersion)
1707 p.protocolVersion = minUint32(p.protocolVersion, p.advertisedProtoVer)
1708 p.versionKnown = true
1709 p.services = msg.Services
1710 p.flagsMtx.Unlock()
1711 T.F(
1712 "negotiated protocol version %d for peer %s",
1713 p.protocolVersion, p,
1714 )
1715 // Updating a bunch of stats including block based stats, and the peer's time offset.
1716 p.statsMtx.Lock()
1717 p.lastBlock = msg.LastBlock
1718 p.startingHeight = msg.LastBlock
1719 p.timeOffset = msg.Timestamp.Unix() - time.Now().Unix()
1720 p.statsMtx.Unlock()
1721 // Set the peer's ID, user agent, and potentially the flag which specifies the
1722 // witness support is enabled.
1723 p.flagsMtx.Lock()
1724 p.id = atomic.AddInt32(&nodeCount, 1)
1725 p.userAgent = msg.UserAgent
1726 // // Determine if the peer would like to receive witness data with transactions,
1727 // // or not.
1728 // if p.services&wire.SFNodeWitness == wire.SFNodeWitness {
1729 // p.witnessEnabled = true
1730 // }
1731 p.flagsMtx.Unlock()
1732 // // Once the version message has been exchanged, we're able to determine if this
1733 // // peer knows how to encode witness data over the wire protocol. If so, then
1734 // // we'll switch to a decoding mode which is prepared for the new transaction
1735 // // format introduced as part of BIP0144.
1736 // if p.services&wire.SFNodeWitness == wire.SFNodeWitness {
1737 // p.wireEncoding = wire.BaseEncoding
1738 // }
1739 // Invoke the callback if specified.
1740 if p.cfg.Listeners.OnVersion != nil {
1741 I.Ln("writing version message")
1742 rejectMsg := p.cfg.Listeners.OnVersion(p, msg)
1743 if rejectMsg != nil {
1744 _ = p.writeMessage(rejectMsg, wire.LatestEncoding)
1745 e = errors.New(rejectMsg.Reason)
1746 return
1747 }
1748 }
1749 // Notify and disconnect clients that have a protocol version that is too old.
1750 //
1751 // NOTE: If minAcceptableProtocolVersion is raised to be higher than wire.RejectVersion, this should send a reject
1752 // packet before disconnecting.
1753 if uint32(msg.ProtocolVersion) < MinAcceptableProtocolVersion {
1754 // Send a reject message indicating the protocol version is obsolete
1755 // and wait for the message to be sent before disconnecting.
1756 reason := fmt.Sprintf(
1757 "protocol version must be %d or greater",
1758 MinAcceptableProtocolVersion,
1759 )
1760 rejectMsg := wire.NewMsgReject(
1761 msg.Command(), wire.RejectObsolete,
1762 reason,
1763 )
1764 _ = p.writeMessage(rejectMsg, wire.LatestEncoding)
1765 e = errors.New(reason)
1766 return
1767 }
1768 return
1769 }
1770 1771 // localVersionMsg creates a version message that can be used to send to the remote peer.
1772 func (p *Peer) localVersionMsg() (mv *wire.MsgVersion, e error) {
1773 var blockNum int32
1774 if p.cfg.NewestBlock != nil {
1775 _, blockNum, e = p.cfg.NewestBlock()
1776 if e != nil {
1777 return nil, e
1778 }
1779 }
1780 theirNA := p.na
1781 // If we are behind a proxy and the connection comes from the proxy then we return an non routeable address as their
1782 // address. This is to prevent leaking the tor proxy address.
1783 if p.cfg.Proxy != "" {
1784 var proxyAddress string
1785 proxyAddress, _, e = net.SplitHostPort(p.cfg.Proxy)
1786 // invalid proxy means poorly configured, be on the safe side.
1787 if e != nil || p.na.IP.String() == proxyAddress {
1788 theirNA = wire.NewNetAddressIPPort(
1789 []byte{0, 0, 0, 0}, 0,
1790 theirNA.Services,
1791 )
1792 }
1793 }
1794 // Create a wire.NetAddress with only the services set to use as the "addrme" in the version message.
1795 //
1796 // Older nodes previously added the IP and port information to the address manager which proved to be unreliable as
1797 // an inbound connection from a peer didn't necessarily mean the peer itself accepted inbound connections.
1798 //
1799 // Also, the timestamp is unused in the version message.
1800 // I.Ln(p.addr)
1801 // var h string
1802 // var port string
1803 // if h, port, e = net.SplitHostPort(p.addr); E.Chk(e) {
1804 // }
1805 // var portN int64
1806 // if portN, e = strconv.ParseInt(port, 10, 64); E.Chk(e) {
1807 // }
1808 // ipAddr := net.ParseIP(h)
1809 ourNA := &wire.NetAddress{
1810 Timestamp: time.Now(),
1811 Services: p.cfg.Services,
1812 IP: p.IP,
1813 Port: p.Port,
1814 }
1815 // Generate a unique Nonce for this peer so self connections can be detected. This is accomplished by adding it to a
1816 // size-limited map of recently seen nonces.
1817 nonce := uint64(rand.Int63())
1818 SentNonces.Add(nonce)
1819 // Version message.
1820 msg := wire.NewMsgVersion(ourNA, theirNA, nonce, blockNum)
1821 e = msg.AddUserAgent(
1822 p.cfg.UserAgentName, p.cfg.UserAgentVersion,
1823 p.cfg.UserAgentComments...,
1824 )
1825 if e != nil {
1826 }
1827 // Advertise local services.
1828 msg.Services = p.cfg.Services
1829 // Advertise our max supported protocol version.
1830 msg.ProtocolVersion = int32(p.cfg.ProtocolVersion)
1831 // Advertise if inv messages for transactions are desired.
1832 msg.DisableRelayTx = p.cfg.DisableRelayTx
1833 return msg, nil
1834 }
1835 1836 // writeLocalVersionMsg writes our version message to the remote peer.
1837 func (p *Peer) writeLocalVersionMsg() (msg *wire.MsgVersion, e error) {
1838 if msg, e = p.localVersionMsg(); E.Chk(e) {
1839 return
1840 }
1841 return msg, p.writeMessage(msg, wire.LatestEncoding)
1842 }
1843 1844 // negotiateInboundProtocol waits to receive a version message from the peer then sends our version message.
1845 //
1846 // If the events do not occur in that order then it returns an error.
1847 func (p *Peer) negotiateInboundProtocol() (msg *wire.MsgVersion, e error) {
1848 if msg, e = p.readRemoteVersionMsg(); E.Chk(e) {
1849 return
1850 }
1851 return p.writeLocalVersionMsg()
1852 }
1853 1854 // negotiateOutboundProtocol sends our version message then waits to receive a version message from the peer.
1855 //
1856 // If the events do not occur in that order then it returns an error.
1857 func (p *Peer) negotiateOutboundProtocol() (msg *wire.MsgVersion, e error) {
1858 if msg, e = p.writeLocalVersionMsg(); E.Chk(e) {
1859 return
1860 }
1861 return p.readRemoteVersionMsg()
1862 }
1863 1864 // start begins processing input and output messages.
1865 func (p *Peer) start(msgChan chan *wire.MsgVersion) (e error) {
1866 T.Ln("starting peer", p, p.LocalAddr())
1867 negotiateErr := make(chan error, 1)
1868 go func() {
1869 var ee error
1870 var msg *wire.MsgVersion
1871 if p.inbound {
1872 if msg, ee = p.negotiateInboundProtocol(); E.Chk(ee) {
1873 negotiateErr <- ee
1874 }
1875 } else {
1876 if msg, e = p.negotiateOutboundProtocol(); E.Chk(ee) {
1877 negotiateErr <- ee
1878 }
1879 }
1880 I.Ln("sending version message back")
1881 msgChan <- msg
1882 I.Ln("sent version message back")
1883 negotiateErr <- nil
1884 }()
1885 // Negotiate the protocol within the specified negotiateTimeout.
1886 select {
1887 case e = <-negotiateErr:
1888 if e != nil {
1889 if e != io.EOF {
1890 }
1891 p.Disconnect()
1892 return
1893 }
1894 case <-time.After(negotiateTimeout):
1895 p.Disconnect()
1896 e = errors.New("protocol negotiation timeout")
1897 return
1898 }
1899 T.Ln("connected to", p)
1900 // The protocol has been negotiated successfully so start processing input and output messages.
1901 go p.stallHandler()
1902 go p.inHandler()
1903 go p.queueHandler()
1904 go p.outHandler()
1905 go p.pingHandler()
1906 // Send our verack message now that the IO processing machinery has started.
1907 p.QueueMessage(wire.NewMsgVerAck(), nil)
1908 return
1909 }
1910 1911 // AssociateConnection associates the given conn to the peer. Calling this function when the peer is already connected
1912 // will have no effect.
1913 func (p *Peer) AssociateConnection(conn net.Conn) (msgChan chan *wire.MsgVersion) {
1914 // Already connected?
1915 if !atomic.CompareAndSwapInt32(&p.connected, 0, 1) {
1916 I.Ln("already connected to peer", conn.RemoteAddr(), conn.LocalAddr())
1917 return
1918 }
1919 p.conn = conn
1920 p.timeConnected = time.Now()
1921 if p.inbound {
1922 p.addr = p.conn.RemoteAddr().String()
1923 // Set up a NetAddress for the peer to be used with AddrManager.
1924 //
1925 // We only do this inbound because outbound set this up at connection time and no point recomputing.
1926 na, e := newNetAddress(p.conn.RemoteAddr(), p.services)
1927 if e != nil {
1928 E.Ln("cannot create remote net address:", e)
1929 p.Disconnect()
1930 return
1931 }
1932 p.na = na
1933 }
1934 msgChan = make(chan *wire.MsgVersion, 1)
1935 I.Ln("starting peer", conn.RemoteAddr(), conn.LocalAddr())
1936 go func() {
1937 if e := p.start(msgChan); E.Chk(e) {
1938 D.F("cannot start peer %v: %v", p, e)
1939 p.Disconnect()
1940 }
1941 I.Ln("finished starting peer", conn.RemoteAddr(), conn.LocalAddr())
1942 }()
1943 I.Ln("returning meanwhile starting peer", conn.RemoteAddr(), conn.LocalAddr())
1944 return
1945 }
// WaitForDisconnect waits until the peer has completely disconnected and all resources are cleaned up. This will happen
// if either the local or remote side has been disconnected or the peer is forcibly disconnected via Disconnect.
//
// It takes no arguments and returns nothing; the caller is simply held until a receive on the peer's quit channel
// completes.
func (p *Peer) WaitForDisconnect() {
	// Block on the quit channel. The received value itself is not used.
	<-p.quit
}
1952 1953 // newPeerBase returns a new base bitcoin peer based on the inbound flag. This is used by the NewInboundPeer and
1954 // NewOutboundPeer functions to perform base setup needed by both types of peers.
1955 func newPeerBase(origCfg *Config, inbound bool) *Peer {
1956 // Default to the max supported protocol version if not specified by the caller.
1957 cfg := *origCfg // Copy to avoid mutating caller.
1958 if cfg.ProtocolVersion == 0 {
1959 cfg.ProtocolVersion = MaxProtocolVersion
1960 }
1961 // Set the chain parameters to testnet if the caller did not specify any.
1962 if cfg.ChainParams == nil {
1963 cfg.ChainParams = &chaincfg.TestNet3Params
1964 }
1965 // Set the trickle interval if a non-positive value is specified.
1966 if cfg.TrickleInterval <= 0 {
1967 cfg.TrickleInterval = DefaultTrickleInterval
1968 }
1969 p := Peer{
1970 inbound: inbound,
1971 wireEncoding: wire.BaseEncoding,
1972 knownInventory: newMruInventoryMap(maxKnownInventory),
1973 stallControl: make(chan stallControlMsg, 1), // nonblocking sync
1974 outputQueue: make(chan outMsg, outputBufferSize),
1975 sendQueue: make(chan outMsg, 1), // nonblocking sync
1976 sendDoneQueue: qu.Ts(1), // nonblocking sync
1977 outputInvChan: make(chan *wire.InvVect, outputBufferSize),
1978 inQuit: qu.T(),
1979 queueQuit: qu.T(),
1980 outQuit: qu.T(),
1981 quit: qu.T(),
1982 cfg: cfg, // Copy so caller can't mutate.
1983 services: cfg.Services,
1984 protocolVersion: cfg.ProtocolVersion,
1985 IP: origCfg.IP,
1986 Port: origCfg.Port,
1987 }
1988 return &p
1989 }
1990 1991 // NewInboundPeer returns a new inbound bitcoin peer. Use Start to begin processing incoming and outgoing messages.
1992 func NewInboundPeer(cfg *Config) *Peer {
1993 return newPeerBase(cfg, true)
1994 }
1995 1996 // NewOutboundPeer returns a new outbound bitcoin peer.
1997 func NewOutboundPeer(cfg *Config, addr string) (*Peer, error) {
1998 p := newPeerBase(cfg, false)
1999 p.addr = addr
2000 host, portStr, e := net.SplitHostPort(addr)
2001 if e != nil {
2002 return nil, e
2003 }
2004 port, e := strconv.ParseUint(portStr, 10, 16)
2005 if e != nil {
2006 return nil, e
2007 }
2008 if cfg.HostToNetAddress != nil {
2009 na, e := cfg.HostToNetAddress(host, uint16(port), 0)
2010 if e != nil {
2011 return nil, e
2012 }
2013 p.na = na
2014 } else {
2015 p.na = wire.NewNetAddressIPPort(net.ParseIP(host), uint16(port), 0)
2016 }
2017 return p, nil
2018 }
// init seeds the package-level math/rand source from the current wall-clock time in nanoseconds.
func init() {
	seed := time.Now().UnixNano()
	rand.Seed(seed)
}
2024