1 package chainrpc
2 3 import (
4 "bytes"
5 "crypto/rand"
6 "crypto/tls"
7 "encoding/binary"
8 "errors"
9 "fmt"
10 "math"
11 "net"
12 "os"
13 "os/exec"
14 "runtime"
15 "sort"
16 "strconv"
17 "strings"
18 "sync"
19 "sync/atomic"
20 "time"
21 22 "github.com/p9c/p9/pkg/qu"
23 24 "github.com/p9c/p9/pkg/log"
25 "github.com/p9c/p9/pkg/amt"
26 block2 "github.com/p9c/p9/pkg/block"
27 "github.com/p9c/p9/pkg/chainrpc/peersummary"
28 "github.com/p9c/p9/pkg/fork"
29 "github.com/p9c/p9/pkg/interrupt"
30 "github.com/p9c/p9/pkg/mining"
31 "github.com/p9c/p9/pod/config"
32 33 uberatomic "go.uber.org/atomic"
34 35 "github.com/p9c/p9/cmd/node/active"
36 "github.com/p9c/p9/pkg/addrmgr"
37 "github.com/p9c/p9/pkg/blockchain"
38 "github.com/p9c/p9/pkg/bloom"
39 "github.com/p9c/p9/pkg/chaincfg"
40 "github.com/p9c/p9/pkg/chainhash"
41 "github.com/p9c/p9/pkg/connmgr"
42 "github.com/p9c/p9/pkg/database"
43 "github.com/p9c/p9/pkg/indexers"
44 "github.com/p9c/p9/pkg/mempool"
45 "github.com/p9c/p9/pkg/netsync"
46 "github.com/p9c/p9/pkg/peer"
47 "github.com/p9c/p9/pkg/txscript"
48 "github.com/p9c/p9/pkg/upnp"
49 "github.com/p9c/p9/pkg/util"
50 "github.com/p9c/p9/pkg/wire"
51 "github.com/p9c/p9/version"
52 )
53 54 const DefaultMaxOrphanTxSize = 100000
55 56 type (
57 // BroadcastInventoryAdd is a type used to declare that the InvVect it contains needs to be added to the rebroadcast
58 // map
59 BroadcastInventoryAdd RelayMsg
60 // BroadcastInventoryDel is a type used to declare that the InvVect it contains needs to be removed from the
61 // rebroadcast map
62 BroadcastInventoryDel *wire.InvVect
63 // BroadcastMsg provides the ability to house a bitcoin message to be broadcast to all connected peers except
64 // specified excluded peers.
65 BroadcastMsg struct {
66 Message wire.Message
67 ExcludePeers []*NodePeer
68 }
69 // CFHeaderKV is a tuple of a filter header and its associated block hash. The struct is used to cache cfcheckpt
70 // responses.
71 CFHeaderKV struct {
72 BlockHash chainhash.Hash
73 FilterHeader chainhash.Hash
74 }
75 // CheckpointSorter implements sort.Interface to allow a slice of checkpoints to be sorted.
76 CheckpointSorter []chaincfg.Checkpoint
77 ConnectNodeMsg struct {
78 Addr string
79 Permanent bool
80 Reply chan error
81 }
82 DisconnectNodeMsg struct {
83 Cmp func(*NodePeer) bool
84 Reply chan error
85 }
86 GetAddedNodesMsg struct {
87 Reply chan []*NodePeer
88 }
89 GetConnCountMsg struct {
90 Reply chan int32
91 }
92 GetOutboundGroup struct {
93 Key string
94 Reply chan int
95 }
96 GetPeersMsg struct {
97 Reply chan []*NodePeer
98 }
99 // OnionAddr implements the net.Addr interface and represents a tor address.
100 OnionAddr struct {
101 Addr string
102 }
103 // PeerState maintains state of inbound, persistent, outbound peers as well as banned peers and outbound groups.
104 PeerState struct {
105 InboundPeers map[int32]*NodePeer
106 OutboundPeers map[int32]*NodePeer
107 PersistentPeers map[int32]*NodePeer
108 Banned map[string]time.Time
109 OutboundGroups map[string]int
110 }
111 // RelayMsg packages an inventory vector along with the newly discovered inventory so the relay has access to that
112 // information.
113 RelayMsg struct {
114 InvVect *wire.InvVect
115 Data interface{}
116 }
117 RemoveNodeMsg struct {
118 Cmp func(*NodePeer) bool
119 Reply chan error
120 }
121 // Node provides a bitcoin Node for handling communications to and from bitcoin peers.
122 Node struct {
123 // The following variables must only be used atomically. Putting the uint64s first makes them 64-bit aligned for
124 // 32-bit systems.
125 BytesReceived uint64 // Total bytes received from all peers since start.
126 BytesSent uint64 // Total bytes sent by all peers since start.
127 StartupTime int64
128 ChainParams *chaincfg.Params
129 AddrManager *addrmgr.AddrManager
130 ConnManager *connmgr.ConnManager
131 SigCache *txscript.SigCache
132 HashCache *txscript.HashCache
133 RPCServers []*Server
134 SyncManager *netsync.SyncManager
135 Chain *blockchain.BlockChain
136 TxMemPool *mempool.TxPool
137 CPUMiner *exec.Cmd
138 ModifyRebroadcastInv chan interface{}
139 NewPeers chan *NodePeer
140 DonePeers chan *NodePeer
141 BanPeers chan *NodePeer
142 PeerState chan chan peersummary.PeerSummaries
143 Query chan interface{}
144 RelayInv chan RelayMsg
145 Broadcast chan BroadcastMsg
146 PeerHeightsUpdate chan UpdatePeerHeightsMsg
147 WG sync.WaitGroup
148 Quit qu.C
149 NAT upnp.NAT
150 DB database.DB
151 TimeSource blockchain.MedianTimeSource
152 Services wire.ServiceFlag
153 // The following fields are used for optional indexes. They will be nil if the associated index is not enabled.
154 //
155 // These fields are set during initial creation of the server and never changed afterwards, so they do not need
156 // to be protected for concurrent access.
157 TxIndex *indexers.TxIndex
158 AddrIndex *indexers.AddrIndex
159 CFIndex *indexers.CFIndex
160 // The fee estimator keeps track of how long transactions are left in the mempool before they are mined into
161 // blocks.
162 FeeEstimator *mempool.FeeEstimator
163 // CFCheckptCaches stores a cached slice of filter headers for cfcheckpt messages for each filter type.
164 CFCheckptCaches map[wire.FilterType][]CFHeaderKV
165 CFCheckptCachesMtx sync.RWMutex
166 Config *config.Config
167 ActiveNet *chaincfg.Params
168 StateCfg *active.Config
169 GenThreads uint32
170 Started int32
171 Shutdown int32
172 ShutdownSched int32
173 HighestKnown uberatomic.Int32
174 peerState *PeerState
175 StartController, StopController qu.C
176 }
177 // NodePeer extends the peer to maintain state shared by the server and the blockmanager.
178 NodePeer struct {
179 *peer.Peer
180 // The following variables must only be used atomically
181 FeeFilter int64
182 ConnReq *connmgr.ConnReq
183 Server *Node
184 ContinueHash *chainhash.Hash
185 RelayMtx sync.Mutex
186 Filter *bloom.Filter
187 KnownAddresses map[string]struct{}
188 BanScore connmgr.DynamicBanScore
189 Quit qu.C
190 // The following chans are used to sync blockmanager and server.
191 TxProcessed qu.C
192 BlockProcessed qu.C
193 SentAddrs bool
194 IsWhitelisted bool
195 Persistent bool
196 DisableRelayTx bool
197 IP net.IP
198 Port uint16
199 }
200 // SimpleAddr implements the net.Addr interface with two struct fields
201 SimpleAddr struct {
202 Net, Addr string
203 }
204 // UpdatePeerHeightsMsg is a message sent from the blockmanager to the server after a new block has been accepted.
205 // The purpose of the message is to update the heights of peers that were known to announce the block before we
206 // connected it to the main chain or recognized it as an orphan.
207 //
208 // With these updates, peer heights will be kept up to date, allowing for fresh data when selecting sync peer
209 // candidacy.
210 UpdatePeerHeightsMsg struct {
211 NewHash *chainhash.Hash
212 NewHeight int32
213 OriginPeer *peer.Peer
214 }
215 )
216 217 const (
218 // DefaultServices describes the default services that are supported by the server.
219 DefaultServices = wire.SFNodeNetwork | wire.SFNodeBloom |
220 /*wire.SFNodeWitness |*/ wire.SFNodeCF
221 // DefaultRequiredServices describes the default services that are required to be supported by outbound peers.
222 DefaultRequiredServices = wire.SFNodeNetwork
223 // DefaultTargetOutbound is the default number of outbound peers to target.
224 DefaultTargetOutbound = 125
225 // ConnectionRetryInterval is the base amount of time to wait in between retries
226 // when connecting to persistent peers. It is adjusted by the number of retries
227 // such that there is a retry backoff.
228 ConnectionRetryInterval = time.Minute
229 )
230 231 var (
232 // Ensure simpleAddr implements the net.Addr interface.
233 _ net.Addr = SimpleAddr{}
234 // UserAgentName is the user agent name and is used to help identify ourselves
235 // to peers.
236 UserAgentName = "pod"
237 // UserAgentVersion is the user agent version and is used to help identify
238 // ourselves to peers.
239 UserAgentVersion = version.Tag
240 // zeroHash is the zero value hash (all zeros). It is defined as a convenience.
241 zeroHash chainhash.Hash
242 )
243 244 // Network returns "onion". This is part of the net.Addr interface.
245 func (oa *OnionAddr) Network() string {
246 return "onion"
247 }
248 249 // String returns the onion address. This is part of the net.Addr interface.
250 func (oa *OnionAddr) String() string {
251 return oa.Addr
252 }
253 254 // Count returns the count of all known peers.
255 func (ps *PeerState) Count() int {
256 return len(ps.InboundPeers) + len(ps.OutboundPeers) +
257 len(ps.PersistentPeers)
258 }
259 260 // ForAllOutboundPeers is a helper function that runs closure on all outbound
261 // peers known to peerState.
262 func (ps *PeerState) ForAllOutboundPeers(closure func(sp *NodePeer)) {
263 for _, e := range ps.OutboundPeers {
264 closure(e)
265 }
266 for _, e := range ps.PersistentPeers {
267 closure(e)
268 }
269 }
270 271 // ForAllPeers is a helper function that runs closure on all peers known to peerState.
272 func (ps *PeerState) ForAllPeers(closure func(sp *NodePeer)) {
273 for _, e := range ps.InboundPeers {
274 closure(e)
275 }
276 ps.ForAllOutboundPeers(closure)
277 }
278 279 // AddBytesReceived adds the passed number of bytes to the total bytes received counter for the server.
280 //
281 // It is safe for concurrent access.
282 func (n *Node) AddBytesReceived(bytesReceived uint64) {
283 atomic.AddUint64(&n.BytesReceived, bytesReceived)
284 }
285 286 // AddBytesSent adds the passed number of bytes to the total bytes sent counter for the server.
287 //
288 // It is safe for concurrent access.
289 func (n *Node) AddBytesSent(bytesSent uint64) {
290 atomic.AddUint64(&n.BytesSent, bytesSent)
291 }
292 293 // AddPeer adds a new peer that has already been connected to the server.
294 func (n *Node) AddPeer(sp *NodePeer) {
295 n.NewPeers <- sp
296 }
297 298 // AddRebroadcastInventory adds 'iv' to the list of inventories to be rebroadcasted at random intervals until they show
299 // up in a block.
300 func (n *Node) AddRebroadcastInventory(iv *wire.InvVect, data interface{}) {
301 // Ignore if shutting down.
302 if atomic.LoadInt32(&n.Shutdown) != 0 {
303 return
304 }
305 n.ModifyRebroadcastInv <- BroadcastInventoryAdd{InvVect: iv, Data: data}
306 }
307 308 // AnnounceNewTransactions generates and relays inventory vectors and notifies both websocket and getblocktemplate long
309 // poll clients of the passed transactions.
310 //
311 // This function should be called whenever new transactions are added to the mempool.
312 func (n *Node) AnnounceNewTransactions(txns []*mempool.TxDesc) {
313 // Generate and relay inventory vectors for all newly accepted transactions.
314 n.RelayTransactions(txns)
315 // Notify both websocket and getblocktemplate long poll clients of all newly accepted transactions.
316 for i := range n.RPCServers {
317 if n.RPCServers[i] != nil {
318 n.RPCServers[i].NotifyNewTransactions(txns)
319 }
320 }
321 }
322 323 // BanPeer bans a peer that has already been connected to the server by ip.
324 func (n *Node) BanPeer(sp *NodePeer) {
325 n.BanPeers <- sp
326 }
327 328 // BroadcastMessage sends msg to all peers currently connected to the server except those in the passed peers to
329 // exclude.
330 func (n *Node) BroadcastMessage(msg wire.Message, exclPeers ...*NodePeer) {
331 // XXX: Need to determine if this is an alert that has already been broadcast and refrain from broadcasting again.
332 bmsg := BroadcastMsg{Message: msg, ExcludePeers: exclPeers}
333 n.Broadcast <- bmsg
334 }
335 336 // ConnectedCount returns the number of currently connected peers.
337 func (n *Node) ConnectedCount() int32 {
338 replyChan := make(chan int32)
339 n.Query <- GetConnCountMsg{Reply: replyChan}
340 return <-replyChan
341 }
342 343 // NetTotals returns the sum of all bytes received and sent across the network for all peers.
344 //
345 // It is safe for concurrent access.
346 func (n *Node) NetTotals() (uint64, uint64) {
347 return atomic.LoadUint64(&n.BytesReceived),
348 atomic.LoadUint64(&n.BytesSent)
349 }
350 351 // OutboundGroupCount returns the number of peers connected to the given outbound group key.
352 func (n *Node) OutboundGroupCount(
353 key string,
354 ) int {
355 replyChan := make(chan int)
356 n.Query <- GetOutboundGroup{Key: key, Reply: replyChan}
357 return <-replyChan
358 }
359 360 // RelayInventory relays the passed inventory vector to all connected peers that are not already known to have it.
361 func (n *Node) RelayInventory(invVect *wire.InvVect, data interface{}) {
362 n.RelayInv <- RelayMsg{InvVect: invVect, Data: data}
363 }
364 365 // RemoveRebroadcastInventory removes 'iv' from the list of items to be rebroadcasted if present.
366 func (n *Node) RemoveRebroadcastInventory(iv *wire.InvVect) {
367 // Log<-cl.Debug{emoveBroadcastInventory"
368 // Ignore if shutting down.
369 if atomic.LoadInt32(&n.Shutdown) != 0 {
370 // Log<-cl.Debug{gnoring due to shutdown"
371 return
372 }
373 n.ModifyRebroadcastInv <- BroadcastInventoryDel(iv)
374 }
375 376 // ScheduleShutdown schedules a server shutdown after the specified duration. It also dynamically adjusts how often to
377 // warn the server is going down based on remaining duration.
378 func (n *Node) ScheduleShutdown(duration time.Duration) {
379 // Don't schedule shutdown more than once.
380 if atomic.AddInt32(&n.ShutdownSched, 1) != 1 {
381 return
382 }
383 W.F("server shutdown in %v", duration)
384 go func() {
385 remaining := duration
386 tickDuration := DynamicTickDuration(remaining)
387 done := time.After(remaining)
388 ticker := time.NewTicker(tickDuration)
389 out:
390 for {
391 select {
392 case <-done:
393 ticker.Stop()
394 e := n.Stop()
395 if e != nil {
396 }
397 break out
398 case <-ticker.C:
399 remaining -= -tickDuration
400 if remaining < time.Second {
401 continue
402 }
403 // Change tick duration dynamically based on remaining time.
404 newDuration := DynamicTickDuration(remaining)
405 if tickDuration != newDuration {
406 tickDuration = newDuration
407 ticker.Stop()
408 ticker = time.NewTicker(tickDuration)
409 }
410 W.F("server shutdown in %v", remaining)
411 }
412 }
413 }()
414 }
415 416 // Start begins accepting connections from peers.
417 func (n *Node) Start() {
418 // Already started?
419 if atomic.AddInt32(&n.Started, 1) != 1 {
420 return
421 }
422 D.Ln("starting server")
423 // Server startup time. Used for the uptime command for uptime calculation.
424 n.StartupTime = time.Now().Unix()
425 // Start the peer handler which in turn starts the address and block managers.
426 n.WG.Add(1)
427 go n.PeerHandler()
428 if n.NAT != nil {
429 n.WG.Add(1)
430 go n.UPNPUpdateThread()
431 }
432 if n.Config.DisableRPC.False() {
433 n.WG.Add(1)
434 // Start the rebroadcastHandler, which ensures user tx received by the RPC server are rebroadcast until being
435 // included in a block.
436 go n.RebroadcastHandler()
437 for i := range n.RPCServers {
438 n.RPCServers[i].Start()
439 }
440 }
441 // // Start the CPU miner if generation is enabled.
442 // if *n.Config.Generate && *n.Config.GenThreads != 0 {
443 // D.Ln("starting miner")
444 // args := []string{os.Args[0], "-D", *n.Config.DataDir}
445 // if *n.Config.KopachGUI {
446 // args = append(args, "--kopachgui")
447 // }
448 // args = append(args, "kopach")
449 // // args = apputil.PrependForWindows(args)
450 // n.StateCfg.Miner = consume.Log(n.Quit, func(ent *log.Entry) (e error) {
451 // D.Ln(ent.Level, ent.Time, ent.Text, ent.CodeLocation)
452 // return
453 // }, func(pkg string) (out bool) {
454 // return false
455 // }, args...)
456 // consume.Start(n.StateCfg.Miner)
457 // // defer consume.Kill(n.StateCfg.Miner)
458 // }
459 // interrupt.AddHandler(func() {
460 // // Stop the CPU miner if needed
461 // consume.Kill(n.StateCfg.Miner)
462 // D.Ln("miner has stopped")
463 // })
464 }
465 466 // Stop gracefully shuts down the server by stopping and disconnecting all peers and the main listener.
467 func (n *Node) Stop() (e error) {
468 D.Ln("stopping chain rpc server")
469 // Make sure this only happens once.
470 if atomic.AddInt32(&n.Shutdown, 1) != 1 {
471 D.Ln("server is already in the process of shutting down")
472 return nil
473 }
474 T.Ln("node shutting down")
475 476 // Shutdown the RPC server if it'n not disabled.
477 if !n.Config.DisableRPC.True() {
478 for i := range n.RPCServers {
479 e = n.RPCServers[i].Stop()
480 if e != nil {
481 }
482 }
483 }
484 // Save fee estimator state in the database.
485 if e = n.DB.Update(
486 func(tx database.Tx) (e error) {
487 metadata := tx.Metadata()
488 if e = metadata.Put(mempool.EstimateFeeDatabaseKey, n.FeeEstimator.Save()); E.Chk(e) {
489 }
490 return nil
491 },
492 ); E.Chk(e) {
493 }
494 // Stop the CPU miner if needed
495 // consume.Kill(n.StateCfg.Miner)
496 // D.Ln("miner has stopped")
497 D.Ln("Signal the remaining goroutines to quit.")
498 n.Quit.Q()
499 return
500 }
501 502 // TransactionConfirmed has one confirmation on the main chain. Now we can mark it as no longer needing rebroadcasting.
503 func (n *Node) TransactionConfirmed(tx *util.Tx) {
504 // Rebroadcasting is only necessary when the RPC server is active.
505 for i := range n.RPCServers {
506 if n.RPCServers[i] == nil {
507 return
508 }
509 }
510 iv := wire.NewInvVect(wire.InvTypeTx, tx.Hash())
511 n.RemoveRebroadcastInventory(iv)
512 }
513 514 // UpdatePeerHeights updates the heights of all peers who have have announced the latest connected main chain block, or
515 // a recognized orphan.
516 //
517 // These height updates allow us to dynamically refresh peer heights, ensuring sync peer selection has access to the
518 // latest block heights for each peer.
519 func (n *Node) UpdatePeerHeights(
520 latestBlkHash *chainhash.Hash,
521 latestHeight int32, updateSource *peer.Peer,
522 ) {
523 n.PeerHeightsUpdate <- UpdatePeerHeightsMsg{
524 NewHash: latestBlkHash,
525 NewHeight: latestHeight,
526 OriginPeer: updateSource,
527 }
528 }
529 530 // WaitForShutdown blocks until the main listener and peer handlers are stopped.
531 func (n *Node) WaitForShutdown() {
532 n.WG.Wait()
533 }
534 535 // HandleAddPeerMsg deals with adding new peers. It is invoked from the peerHandler goroutine.
536 func (n *Node) HandleAddPeerMsg(state *PeerState, sp *NodePeer) bool {
537 I.Ln("HandleAddPeerMsg")
538 if sp == nil {
539 return false
540 }
541 // Ignore new peers if we're shutting down.
542 if atomic.LoadInt32(&n.Shutdown) != 0 {
543 I.F("new peer %n ignored - server is shutting down", sp)
544 sp.Disconnect()
545 return false
546 }
547 // Disconnect banned peers.
548 var host string
549 var e error
550 host, _, e = net.SplitHostPort(sp.Addr())
551 if e != nil {
552 E.Ln("can't split host/port", e)
553 sp.Disconnect()
554 return false
555 }
556 if banEnd, ok := state.Banned[host]; ok {
557 if time.Now().Before(banEnd) {
558 D.F(
559 "peer %n is banned for another %v - disconnecting %n",
560 host, time.Until(banEnd),
561 )
562 sp.Disconnect()
563 return false
564 }
565 I.F("peer %n is no longer banned", host)
566 delete(state.Banned, host)
567 }
568 // TODO: Chk for max peers from a single IP.
569 570 // Limit max number of total peers.
571 if state.Count() >= n.Config.MaxPeers.V() {
572 I.F(
573 "max peers reached [%d] - disconnecting peer %n",
574 n.Config.MaxPeers, sp.Addr(),
575 )
576 sp.Disconnect()
577 // TODO: how to handle permanent peers here? they should be rescheduled.
578 return false
579 }
580 // I.S(state.InboundPeers)
581 // I.S(state.OutboundPeers)
582 // I.S(state.PersistentPeers)
583 584 // for i := range state.InboundPeers {
585 // if state.InboundPeers[i].LocalAddr() == sp.Addr() {
586 //
587 // }
588 // }
589 // for i := range state.OutboundPeers {
590 //
591 // }
592 593 // D.Ln(
594 // state.OutboundPeers[i].UserAgent(),
595 // state.InboundPeers[j].UserAgent(),
596 // sp.UserAgent(),
597 // state.OutboundPeers[i].LocalAddr().String(),
598 // state.InboundPeers[j].LocalAddr().String(),
599 // sp.Addr(),
600 // state.OutboundPeers[i].Addr(),
601 // state.InboundPeers[j].Addr(),
602 // sp.Addr(),
603 // )
604 // if strings.Contains(sp.UserAgent(), "nonce") &&
605 // state.OutboundPeers[i].UserAgent() == sp.UserAgent() ||
606 // state.InboundPeers[j].UserAgent() == sp.UserAgent() ||
607 // state.OutboundPeers[i].LocalAddr().String() == sp.Addr() ||
608 // state.InboundPeers[j].LocalAddr().String() == sp.Addr() ||
609 // state.OutboundPeers[i].Addr() == sp.Addr() ||
610 // state.InboundPeers[j].Addr() == sp.Addr() {
611 // D.Ln("already have connection to peer with UAC", sp.UserAgent())
612 //
613 // sp.Disconnect()
614 // }
615 // for i := range state.InboundPeers {
616 // D.Ln("inbound peer:", state.InboundPeers[i].LocalAddr(), state.InboundPeers[i].Addr())
617 // }
618 // for i := range state.OutboundPeers {
619 // D.Ln("outbound peer:", state.OutboundPeers[i].LocalAddr(), state.OutboundPeers[i].Addr())
620 // }
621 // Add the new peer and start it.
622 D.Ln("new peer", sp.UserAgent(), sp.Addr(), sp.LocalAddr(), sp.Inbound())
623 if sp.Inbound() {
624 state.InboundPeers[sp.ID()] = sp
625 } else {
626 state.OutboundGroups[addrmgr.GroupKey(sp.NA())]++
627 if sp.Persistent {
628 state.PersistentPeers[sp.ID()] = sp
629 } else {
630 state.OutboundPeers[sp.ID()] = sp
631 }
632 }
633 return true
634 }
635 636 // HandleBanPeerMsg deals with banning peers. It is invoked from the peerHandler goroutine.
637 func (n *Node) HandleBanPeerMsg(state *PeerState, sp *NodePeer) {
638 var host string
639 var e error
640 host, _, e = net.SplitHostPort(sp.Addr())
641 if e != nil {
642 E.F("can't split ban peer %n %v %n", sp.Addr(), e)
643 return
644 }
645 direction := log.DirectionString(sp.Inbound())
646 I.F("banned peer %n (%n) for %v", host, direction, *n.Config.BanDuration)
647 state.Banned[host] = time.Now().Add(n.Config.BanDuration.V())
648 }
649 650 // HandleBroadcastMsg deals with broadcasting messages to peers. It is invoked from the peerHandler goroutine.
651 func (n *Node) HandleBroadcastMsg(state *PeerState, bmsg *BroadcastMsg) {
652 state.ForAllPeers(
653 func(sp *NodePeer) {
654 if !sp.Connected() {
655 return
656 }
657 for _, ep := range bmsg.ExcludePeers {
658 if sp == ep {
659 return
660 }
661 }
662 sp.QueueMessage(bmsg.Message, nil)
663 },
664 )
665 }
666 667 // HandleDonePeerMsg deals with peers that have signalled they are done. It is invoked from the peerHandler goroutine.
668 func (n *Node) HandleDonePeerMsg(state *PeerState, sp *NodePeer) {
669 var list map[int32]*NodePeer
670 switch {
671 case sp.Persistent:
672 list = state.PersistentPeers
673 case sp.Inbound():
674 list = state.InboundPeers
675 default:
676 list = state.OutboundPeers
677 }
678 if _, ok := list[sp.ID()]; ok {
679 if !sp.Inbound() && sp.VersionKnown() {
680 state.OutboundGroups[addrmgr.GroupKey(sp.NA())]--
681 }
682 if !sp.Inbound() && sp.ConnReq != nil {
683 n.ConnManager.Disconnect(sp.ConnReq.ID())
684 }
685 delete(list, sp.ID())
686 T.Ln("removed peer ", sp)
687 return
688 }
689 if sp.ConnReq != nil {
690 n.ConnManager.Disconnect(sp.ConnReq.ID())
691 }
692 // Update the address' last seen time if the peer has acknowledged our version and has sent us its version as well.
693 if sp.VerAckReceived() && sp.VersionKnown() && sp.NA() != nil {
694 n.AddrManager.Connected(sp.NA())
695 }
696 // If we get here it means that either we didn't know about the peer or we purposefully deleted it.
697 }
698 699 // HandleQuery is the central handler for all queries and commands from other goroutines related to peer state.
700 //
701 // Previously this counts two if the same node was connected outbound and then connected back inbound. The nonce given
702 // in a Version message is now added to the Peer struct and then as this iterates the connected peers list, it adds
703 // nonces from Peers marked connected to a map, thus excluding double-counting, and returns this value.
704 //
705 // No idea why it was not written to exclude keeping multiple peers open like this, since a connection is a duplex
706 // channel, but at least now the ConnectedCount query will provide the correct numbers (this was changed in order to
707 // allow identifying local area network nodes so a non-internet test environment can be created
708 func (n *Node) HandleQuery(state *PeerState, querymsg interface{}) {
709 switch msg := querymsg.(type) {
710 case GetConnCountMsg:
711 nonces := make(map[string]struct{})
712 nonce := ""
713 state.ForAllPeers(
714 func(sp *NodePeer) {
715 // D.Ln(sp.UserAgent())
716 ua := strings.Split(sp.UserAgent(), "nonce")
717 if len(ua) < 2 {
718 nonce = fmt.Sprintf("%s/%s", sp.Peer.LocalAddr().String(), sp.Peer.Addr())
719 } else {
720 nonce = fmt.Sprintf(
721 "%s/%s", ua[1][:8],
722 strings.Split(sp.Peer.LocalAddr().String(), ":")[0],
723 )
724 }
725 _, ok := nonces[nonce]
726 if !ok {
727 if sp.Connected() {
728 nonces[nonce] = struct{}{}
729 // nconnected++
730 }
731 }
732 },
733 )
734 // D.Ln(nonces)
735 msg.Reply <- int32(len(nonces))
736 case GetPeersMsg:
737 peers := make([]*NodePeer, 0, state.Count())
738 state.ForAllPeers(
739 func(sp *NodePeer) {
740 if !sp.Connected() {
741 return
742 }
743 peers = append(peers, sp)
744 },
745 )
746 msg.Reply <- peers
747 case ConnectNodeMsg:
748 // TODO: duplicate oneshots? Limit max number of total peers.
749 if state.Count() >= n.Config.MaxPeers.V() {
750 msg.Reply <- errors.New("max peers reached")
751 return
752 }
753 for _, nodePeer := range state.PersistentPeers {
754 if nodePeer.Addr() == msg.Addr || nodePeer.Peer.LocalAddr().String() == msg.Addr {
755 if msg.Permanent {
756 msg.Reply <- errors.New("nodePeer already connected")
757 } else {
758 msg.Reply <- errors.New("nodePeer exists as a permanent nodePeer")
759 }
760 return
761 }
762 }
763 for _, nodePeer := range state.InboundPeers {
764 if nodePeer.Addr() == msg.Addr || nodePeer.Peer.LocalAddr().String() == msg.Addr {
765 if msg.Permanent {
766 msg.Reply <- errors.New("nodePeer already connected inbound")
767 } else {
768 msg.Reply <- errors.New("nodePeer exists as a permanent nodePeer inbound")
769 }
770 return
771 }
772 }
773 for _, nodePeer := range state.OutboundPeers {
774 if nodePeer.Addr() == msg.Addr || nodePeer.Peer.LocalAddr().String() == msg.Addr {
775 if msg.Permanent {
776 msg.Reply <- errors.New("nodePeer already connected outbound inbound")
777 } else {
778 msg.Reply <- errors.New("nodePeer exists as a permanent nodePeer")
779 }
780 return
781 }
782 }
783 netAddr, e := AddrStringToNetAddr(n.Config, n.StateCfg, msg.Addr)
784 if e != nil {
785 msg.Reply <- e
786 return
787 }
788 // TODO: if too many, nuke a non-perm nodePeer.
789 go n.ConnManager.Connect(
790 &connmgr.ConnReq{
791 Addr: netAddr,
792 Permanent: msg.Permanent,
793 },
794 )
795 msg.Reply <- nil
796 case RemoveNodeMsg:
797 found := DisconnectPeer(
798 state.PersistentPeers, msg.Cmp, func(sp *NodePeer) {
799 // Keep group counts ok since we remove from the list now.
800 state.OutboundGroups[addrmgr.GroupKey(sp.NA())]--
801 },
802 )
803 if found {
804 msg.Reply <- nil
805 } else {
806 msg.Reply <- errors.New("nodePeer not found")
807 }
808 case GetOutboundGroup:
809 count, ok := state.OutboundGroups[msg.Key]
810 if ok {
811 msg.Reply <- count
812 } else {
813 msg.Reply <- 0
814 }
815 // Request a list of the persistent (added) peers.
816 case GetAddedNodesMsg:
817 // Respond with a slice of the relevant peers.
818 peers := make([]*NodePeer, 0, len(state.PersistentPeers))
819 for _, sp := range state.PersistentPeers {
820 peers = append(peers, sp)
821 }
822 msg.Reply <- peers
823 case DisconnectNodeMsg:
824 // Chk inbound peers. We pass a nil callback since we don't require any additional actions on disconnect for
825 // inbound peers.
826 found := DisconnectPeer(state.InboundPeers, msg.Cmp, nil)
827 if found {
828 msg.Reply <- nil
829 return
830 }
831 // Chk outbound peers.
832 found = DisconnectPeer(
833 state.OutboundPeers, msg.Cmp, func(sp *NodePeer) {
834 // Keep group counts ok since we remove from the list now.
835 state.OutboundGroups[addrmgr.GroupKey(sp.NA())]--
836 },
837 )
838 if found {
839 // If there are multiple outbound connections to the same ip:port, continue disconnecting them all until no
840 // such peers are found.
841 for found {
842 found = DisconnectPeer(
843 state.OutboundPeers, msg.Cmp,
844 func(sp *NodePeer) {
845 state.OutboundGroups[addrmgr.GroupKey(sp.NA())]--
846 },
847 )
848 }
849 msg.Reply <- nil
850 return
851 }
852 msg.Reply <- errors.New("nodePeer not found")
853 }
854 }
855 856 // HandleRelayInvMsg deals with relaying inventory to peers that are not already known to have it. It is invoked from
857 // the peerHandler goroutine.
858 func (n *Node) HandleRelayInvMsg(state *PeerState, msg RelayMsg) {
859 state.ForAllPeers(
860 func(sp *NodePeer) {
861 if !sp.Connected() {
862 return
863 }
864 // If the inventory is a block and the peer prefers headers, generate and send a headers message instead of an
865 // inventory message.
866 if msg.InvVect.Type == wire.InvTypeBlock && sp.WantsHeaders() {
867 blockHeader, ok := msg.Data.(wire.BlockHeader)
868 if !ok {
869 W.Ln("underlying data for headers is not a block header")
870 return
871 }
872 msgHeaders := wire.NewMsgHeaders()
873 if e := msgHeaders.AddBlockHeader(&blockHeader); E.Chk(e) {
874 E.Ln("failed to add block header:", e)
875 return
876 }
877 sp.QueueMessage(msgHeaders, nil)
878 return
879 }
880 if msg.InvVect.Type == wire.InvTypeTx {
881 // Don't relay the transaction to the peer when it has transaction relaying disabled.
882 if sp.IsRelayTxDisabled() {
883 return
884 }
885 txD, ok := msg.Data.(*mempool.TxDesc)
886 if !ok {
887 W.F("underlying data for tx inv relay is not a *mempool.TxDesc: %Ter", msg.Data)
888 return
889 }
890 // Don't relay the transaction if the transaction fee-per-kb is less than the peer'n feefilter.
891 feeFilter := atomic.LoadInt64(&sp.FeeFilter)
892 if feeFilter > 0 && txD.FeePerKB < feeFilter {
893 return
894 }
895 // Don't relay the transaction if there is a bloom filter loaded and the transaction doesn't match it.
896 if sp.Filter.IsLoaded() {
897 if !sp.Filter.MatchTxAndUpdate(txD.Tx) {
898 return
899 }
900 }
901 }
902 // Queue the inventory to be relayed with the next batch. It will be ignored if the peer is already known to
903 // have the inventory.
904 sp.QueueInventory(msg.InvVect)
905 },
906 )
907 }
908 909 // HandleUpdatePeerHeights updates the heights of all peers who were known to announce a block we recently accepted.
910 func (n *Node) HandleUpdatePeerHeights(
911 state *PeerState,
912 umsg UpdatePeerHeightsMsg,
913 ) {
914 state.ForAllPeers(
915 func(sp *NodePeer) {
916 // The origin peer should already have the updated height.
917 if sp.Peer == umsg.OriginPeer {
918 return
919 }
920 // This is a pointer to the underlying memory which doesn't change.
921 latestBlkHash := sp.LastAnnouncedBlock()
922 // Skip this peer if it hasn't recently announced any new blocks.
923 if latestBlkHash == nil {
924 return
925 }
926 // If the peer has recently announced a block, and this block matches our newly accepted block, then update
927 // their block height.
928 if *latestBlkHash == *umsg.NewHash {
929 sp.UpdateLastBlockHeight(umsg.NewHeight)
930 sp.UpdateLastAnnouncedBlock(nil)
931 }
932 },
933 )
934 }
935 936 // InboundPeerConnected is invoked by the connection manager when a new inbound connection is established. It
937 // initializes a new inbound server peer instance, associates it with the connection, and starts a goroutine to wait for
938 // disconnection.
939 func (n *Node) InboundPeerConnected(conn net.Conn) {
940 ca := conn.RemoteAddr().String()
941 var h string
942 var e error
943 if h, _, e = net.SplitHostPort(ca); E.Chk(e) {
944 }
945 remoteIP := net.ParseIP(h)
946 cla := conn.LocalAddr().String()
947 var hh string
948 if hh, _, e = net.SplitHostPort(cla); E.Chk(e) {
949 }
950 localIP := net.ParseIP(hh)
951 I.Ln("inbound peer connected", ca, cla, remoteIP)
952 sp := NewServerPeer(n, localIP, false)
953 sp.IsWhitelisted = GetIsWhitelisted(n.StateCfg, conn.RemoteAddr())
954 sp.Peer = peer.NewInboundPeer(NewPeerConfig(sp))
955 _ = sp.AssociateConnection(conn)
956 go n.PeerDoneHandler(sp)
957 // go func() {
958 // msg := <-msgChan
959 // n.peerState.ForAllPeers(
960 // func(np *NodePeer) {
961 // if np.IP.Equal(msg.AddrMe.IP) && np.Port == msg.AddrMe.Port && np.ID() != sp.ID() {
962 // I.Ln("disconnecting inbound peer we are connected outbound to")
963 // sp.Disconnect()
964 // }
965 // // check also that the address of origin matches the one in the message, if this
966 // // is wrong and not zero it is being spoofed and is suspicious
967 // if !msg.AddrMe.IP.Equal(net.IP{0, 0, 0, 0}) && msg.AddrYou.IP.String() != remoteIP.String() {
968 // W.Ln("disconnecting peer", remoteIP, "who is sending message with non-matching IP", msg.AddrYou.IP)
969 // sp.Disconnect()
970 // }
971 // },
972 // )
973 // }()
974 }
975 976 // OutboundPeerConnected is invoked by the connection manager when a new
977 // outbound connection is established. It initializes a new outbound server peer
978 // instance, associates it with the relevant state such as the connection
979 // request instance and the connection itself, and finally notifies the address
980 // manager of the attempt.
981 //
982 // TODO: the serverpeer should attach to the same connection as the inbound
983 // connection?
984 func (n *Node) OutboundPeerConnected(c *connmgr.ConnReq, conn net.Conn) {
985 ca := conn.RemoteAddr().String()
986 cla := conn.LocalAddr().String()
987 I.Ln("outbound peer connected", ca, cla)
988 var hh string
989 var e error
990 if hh, _, e = net.SplitHostPort(cla); E.Chk(e) {
991 }
992 localIP := net.ParseIP(hh)
993 sp := NewServerPeer(n, localIP, c.Permanent)
994 p, e := peer.NewOutboundPeer(NewPeerConfig(sp), c.Addr.String())
995 if e != nil {
996 E.F("cannot create outbound peer %n: %v %n", c.Addr, e)
997 n.ConnManager.Disconnect(c.ID())
998 }
999 sp.Peer = p
1000 sp.ConnReq = c
1001 sp.IsWhitelisted = GetIsWhitelisted(n.StateCfg, conn.RemoteAddr())
1002 _ = sp.AssociateConnection(conn)
1003 go n.PeerDoneHandler(sp)
1004 // go func() {
1005 // msg := <-msgChan
1006 // n.peerState.ForAllPeers(
1007 // func(np *NodePeer) {
1008 // if np.IP.Equal(msg.AddrMe.IP) && np.Port == msg.AddrMe.Port && np.ID() != sp.ID() {
1009 // I.Ln("disconnecting outbound peer we are connected inbound to")
1010 // sp.Disconnect()
1011 // }
1012 // },
1013 // )
1014 // }()
1015 n.AddrManager.Attempt(sp.NA())
1016 }
1017 1018 // PeerDoneHandler handles peer disconnects by notifiying the server that it's done along with other performing other
1019 // desirable cleanup.
1020 func (n *Node) PeerDoneHandler(sp *NodePeer) {
1021 sp.WaitForDisconnect()
1022 n.DonePeers <- sp
1023 // Only tell sync manager we are gone if we ever told it we existed.
1024 if sp.VersionKnown() {
1025 n.SyncManager.DonePeer(sp.Peer)
1026 // Evict any remaining orphans that were sent by the peer.
1027 numEvicted := n.TxMemPool.RemoveOrphansByTag(mempool.Tag(sp.ID()))
1028 if numEvicted > 0 {
1029 D.F(
1030 "Evicted %d %n from peer %v (id %d)",
1031 numEvicted, log.PickNoun(int(numEvicted), "orphan", "orphans"),
1032 sp, sp.ID(),
1033 )
1034 }
1035 }
1036 sp.Quit.Q()
1037 }
1038 1039 // PeerHandler is used to handle peer operations such as adding and removing peers to and from the server, banning
1040 // peers, and broadcasting messages to peers. It must be run in a goroutine.
1041 func (n *Node) PeerHandler() {
1042 // Start the address manager and sync manager, both of which are needed by peers. This is done here since their
1043 // lifecycle is closely tied to this handler and rather than adding more channels to synchronize things, it's easier
1044 // and slightly faster to simply start and stop them in this handler.
1045 n.AddrManager.Start()
1046 n.SyncManager.Start()
1047 T.Ln("starting peer handler")
1048 n.peerState = &PeerState{
1049 InboundPeers: make(map[int32]*NodePeer),
1050 PersistentPeers: make(map[int32]*NodePeer),
1051 OutboundPeers: make(map[int32]*NodePeer),
1052 Banned: make(map[string]time.Time),
1053 OutboundGroups: make(map[string]int),
1054 }
1055 if !n.Config.DisableDNSSeed.True() || len(n.Config.ConnectPeers.S()) < 0 {
1056 // Add peers discovered through DNS to the address manager.
1057 connmgr.SeedFromDNS(
1058 n.ActiveNet, DefaultRequiredServices,
1059 Lookup(n.StateCfg), func(addrs []*wire.NetAddress) {
1060 // Bitcoind uses a lookup of the dns seeder here. This is rather strange since the values looked up by
1061 // the DNS seed lookups will vary quite a lot. To replicate this behaviour we put all addresses as
1062 // having come from the first one.
1063 D.Ln("adding addresses")
1064 n.AddrManager.AddAddresses(addrs, addrs[0])
1065 },
1066 )
1067 }
1068 T.Ln("starting connmgr")
1069 go n.ConnManager.Start()
1070 out:
1071 for {
1072 select {
1073 // queries for current peer summary list
1074 case qc := <-n.PeerState:
1075 go func() {
1076 T.Ln("handling peer summary query")
1077 // flatten the list of
1078 res := make(map[int32]*NodePeer)
1079 for i := range n.peerState.InboundPeers {
1080 res[i] = n.peerState.InboundPeers[i]
1081 }
1082 for i := range n.peerState.OutboundPeers {
1083 res[i] = n.peerState.OutboundPeers[i]
1084 }
1085 for i := range n.peerState.PersistentPeers {
1086 res[i] = n.peerState.PersistentPeers[i]
1087 }
1088 var ps peersummary.PeerSummaries
1089 for i := range res {
1090 if res[i].Connected() {
1091 ps = append(
1092 ps, peersummary.PeerSummary{
1093 IP: res[i].Peer.NA().IP,
1094 Inbound: res[i].Inbound(),
1095 },
1096 )
1097 }
1098 }
1099 // send back the answer
1100 T.Ln("sending back peer summary")
1101 // D.S(ps)
1102 qc <- ps
1103 }()
1104 // New peers connected to the server.
1105 case p := <-n.NewPeers:
1106 n.HandleAddPeerMsg(n.peerState, p)
1107 // Disconnected peers.
1108 case p := <-n.DonePeers:
1109 n.HandleDonePeerMsg(n.peerState, p)
1110 // Block accepted in mainchain or orphan, update peer height.
1111 case umsg := <-n.PeerHeightsUpdate:
1112 n.HandleUpdatePeerHeights(n.peerState, umsg)
1113 // Peer to ban.
1114 case p := <-n.BanPeers:
1115 n.HandleBanPeerMsg(n.peerState, p)
1116 // New inventory to potentially be relayed to other peers.
1117 case invMsg := <-n.RelayInv:
1118 n.HandleRelayInvMsg(n.peerState, invMsg)
1119 // Message to broadcast to all connected peers except those which are excluded by the message.
1120 case bmsg := <-n.Broadcast:
1121 n.HandleBroadcastMsg(n.peerState, &bmsg)
1122 case qmsg := <-n.Query:
1123 n.HandleQuery(n.peerState, qmsg)
1124 case <-n.Quit.Wait():
1125 D.Ln("chain peer server shutting down")
1126 // Disconnect all peers on server shutdown.
1127 n.peerState.ForAllPeers(
1128 func(sp *NodePeer) {
1129 T.F("shutdown peer %n", sp.Addr())
1130 sp.Disconnect()
1131 },
1132 )
1133 break out
1134 }
1135 }
1136 n.ConnManager.Stop()
1137 e := n.SyncManager.Stop()
1138 if e != nil {
1139 }
1140 e = n.AddrManager.Stop()
1141 if e != nil {
1142 }
1143 // Drain channels before exiting so nothing is left waiting around to send.
1144 cleanup:
1145 for {
1146 select {
1147 case <-n.NewPeers:
1148 case <-n.DonePeers:
1149 case <-n.PeerHeightsUpdate:
1150 case <-n.RelayInv:
1151 case <-n.Broadcast:
1152 case <-n.Query:
1153 default:
1154 break cleanup
1155 }
1156 }
1157 n.WG.Done()
1158 T.F("peer handler done")
1159 }
1160 1161 // PushBlockMsg sends a block message for the provided block hash to the connected peer. An error is returned if the
1162 // block hash is not known.
1163 func (n *Node) PushBlockMsg(
1164 sp *NodePeer, hash *chainhash.Hash,
1165 doneChan chan<- struct{}, waitChan qu.C,
1166 encoding wire.MessageEncoding,
1167 ) (e error) {
1168 // Fetch the raw block bytes from the database.
1169 var blockBytes []byte
1170 e = sp.Server.DB.View(
1171 func(dbTx database.Tx) (e error) {
1172 blockBytes, e = dbTx.FetchBlock(hash)
1173 return e
1174 },
1175 )
1176 if e != nil {
1177 E.F(
1178 "unable to fetch requested block hash %v: %v",
1179 hash, e,
1180 )
1181 if doneChan != nil {
1182 doneChan <- struct{}{}
1183 }
1184 return e
1185 }
1186 // Deserialize the block.
1187 var msgBlock wire.Block
1188 e = msgBlock.Deserialize(bytes.NewReader(blockBytes))
1189 if e != nil {
1190 E.F(
1191 "unable to deserialize requested block hash %v: %v",
1192 hash, e,
1193 )
1194 if doneChan != nil {
1195 doneChan <- struct{}{}
1196 }
1197 return e
1198 }
1199 // Once we have fetched data wait for any previous operation to finish.
1200 if waitChan != nil {
1201 <-waitChan
1202 }
1203 // We only send the channel for this message if we aren't sending an inv straight after.
1204 var dc chan<- struct{}
1205 continueHash := sp.ContinueHash
1206 sendInv := continueHash != nil && continueHash.IsEqual(hash)
1207 if !sendInv {
1208 dc = doneChan
1209 }
1210 sp.QueueMessageWithEncoding(&msgBlock, dc, encoding)
1211 // When the peer requests the final block that was advertised in response to a getblocks message which requested
1212 // more blocks than would fit into a single message, send it a new inventory message to trigger it to issue another
1213 // getblocks message for the next batch of inventory.
1214 if sendInv {
1215 best := sp.Server.Chain.BestSnapshot()
1216 invMsg := wire.NewMsgInvSizeHint(1)
1217 iv := wire.NewInvVect(wire.InvTypeBlock, &best.Hash)
1218 e := invMsg.AddInvVect(iv)
1219 if e != nil {
1220 }
1221 sp.QueueMessage(invMsg, doneChan)
1222 sp.ContinueHash = nil
1223 }
1224 return nil
1225 }
1226 1227 // PushMerkleBlockMsg sends a merkleblock message for the provided block hash to the connected peer. Since a merkle
1228 // block requires the peer to have a filter loaded, this call will simply be ignored if there is no filter loaded.
1229 //
1230 // An error is returned if the block hash is not known.
1231 func (n *Node) PushMerkleBlockMsg(
1232 sp *NodePeer, hash *chainhash.Hash,
1233 doneChan chan<- struct{}, waitChan qu.C,
1234 encoding wire.MessageEncoding,
1235 ) (e error) {
1236 // Do not send a response if the peer doesn't have a filter loaded.
1237 if !sp.Filter.IsLoaded() {
1238 if doneChan != nil {
1239 doneChan <- struct{}{}
1240 }
1241 return nil
1242 }
1243 // Fetch the raw block bytes from the database.
1244 blk, e := sp.Server.Chain.BlockByHash(hash)
1245 if e != nil {
1246 E.F(
1247 "unable to fetch requested block hash %v: %v",
1248 hash, e,
1249 )
1250 if doneChan != nil {
1251 doneChan <- struct{}{}
1252 }
1253 return e
1254 }
1255 // Generate a merkle block by filtering the requested block according to the filter for the peer.
1256 merkle, matchedTxIndices := bloom.NewMerkleBlock(blk, sp.Filter)
1257 // Once we have fetched data wait for any previous operation to finish.
1258 if waitChan != nil {
1259 <-waitChan
1260 }
1261 // Send the merkleblock. Only send the done channel with this message if no transactions will be sent afterwards.
1262 var dc chan<- struct{}
1263 if len(matchedTxIndices) == 0 {
1264 dc = doneChan
1265 }
1266 sp.QueueMessage(merkle, dc)
1267 // Finally, send any matched transactions.
1268 blkTransactions := blk.WireBlock().Transactions
1269 for i, txIndex := range matchedTxIndices {
1270 // Only send the done channel on the final transaction.
1271 var dc chan<- struct{}
1272 if i == len(matchedTxIndices)-1 {
1273 dc = doneChan
1274 }
1275 if txIndex < uint32(len(blkTransactions)) {
1276 sp.QueueMessageWithEncoding(
1277 blkTransactions[txIndex], dc,
1278 encoding,
1279 )
1280 }
1281 }
1282 return nil
1283 }
1284 1285 // PushTxMsg sends a tx message for the provided transaction hash to the connected peer.
1286 //
1287 // An error is returned if the transaction hash is not known.
1288 func (n *Node) PushTxMsg(
1289 sp *NodePeer, hash *chainhash.Hash,
1290 doneChan chan<- struct{}, waitChan qu.C,
1291 encoding wire.MessageEncoding,
1292 ) (e error) {
1293 // Attempt to fetch the requested transaction from the pool. A call could be made to check for existence first, but
1294 // simply trying to fetch a missing transaction results in the same behavior.
1295 tx, e := n.TxMemPool.FetchTransaction(hash)
1296 if e != nil {
1297 E.F("unable to fetch tx %v from transaction pool: %v", hash, e)
1298 if doneChan != nil {
1299 doneChan <- struct{}{}
1300 }
1301 return e
1302 }
1303 // Once we have fetched data wait for any previous operation to finish.
1304 if waitChan != nil {
1305 <-waitChan
1306 }
1307 sp.QueueMessageWithEncoding(tx.MsgTx(), doneChan, encoding)
1308 return nil
1309 }
1310 1311 // RebroadcastHandler keeps track of user submitted inventories that we have sent out but have not yet made it into a
1312 // block. We periodically rebroadcast them in case our peers restarted or otherwise lost track of them.
1313 func (n *Node) RebroadcastHandler() {
1314 // Wait 5 min before first tx rebroadcast.
1315 timer := time.NewTimer(5 * time.Minute)
1316 pendingInvs := make(map[wire.InvVect]interface{})
1317 out:
1318 for {
1319 select {
1320 case riv := <-n.ModifyRebroadcastInv:
1321 switch msg := riv.(type) {
1322 // Incoming InvVects are added to our map of RPC txs.
1323 case BroadcastInventoryAdd:
1324 pendingInvs[*msg.InvVect] = msg.Data
1325 // When an InvVect has been added to a block, we can now remove it, if it was present.
1326 case BroadcastInventoryDel:
1327 if _, ok := pendingInvs[*msg]; ok {
1328 delete(pendingInvs, *msg)
1329 }
1330 }
1331 case <-timer.C:
1332 // Any inventory we have has not made it into a block yet. We periodically resubmit them until they have.
1333 for iv, data := range pendingInvs {
1334 ivCopy := iv
1335 n.RelayInventory(&ivCopy, data)
1336 }
1337 // Process at a random time up to 30mins (in seconds) in the future.
1338 timer.Reset(
1339 time.Second *
1340 time.Duration(RandomUint16Number(1800)),
1341 )
1342 case <-n.Quit.Wait():
1343 break out
1344 // default:
1345 }
1346 }
1347 timer.Stop()
1348 // Drain channels before exiting so nothing is left waiting around to send.
1349 cleanup:
1350 for {
1351 select {
1352 case <-n.ModifyRebroadcastInv:
1353 default:
1354 break cleanup
1355 }
1356 }
1357 n.WG.Done()
1358 }
1359 1360 // RelayTransactions generates and relays inventory vectors for all of the
1361 // passed transactions to all connected peers.
1362 func (n *Node) RelayTransactions(txns []*mempool.TxDesc) {
1363 for _, txD := range txns {
1364 iv := wire.NewInvVect(wire.InvTypeTx, txD.Tx.Hash())
1365 n.RelayInventory(iv, txD)
1366 }
1367 }
1368 func (n *Node) UPNPUpdateThread() {
1369 // Go off immediately to prevent code duplication, thereafter we renew lease
1370 // every 15 minutes.
1371 timer := time.NewTimer(0 * time.Second)
1372 lport, _ := strconv.ParseInt(n.ActiveNet.DefaultPort, 10, 16)
1373 first := true
1374 out:
1375 for {
1376 select {
1377 case <-timer.C:
1378 // TODO: pick external port more cleverly
1379 // TODO: know which ports we are listening to on an external net.
1380 // TODO: if specific listen port doesn't work then ask for wildcard
1381 // listen port?
1382 // XXX this assumes timeout is in seconds.
1383 listenPort, e := n.NAT.AddPortMapping(
1384 "tcp", int(lport), int(lport), "pod listen port",
1385 20*60,
1386 )
1387 if e != nil {
1388 E.F("can't add UPnP port mapping: %v %n", e)
1389 }
1390 if first && e == nil {
1391 // TODO: look this up periodically to see if upnp domain changed and so did ip.
1392 externalip, e := n.NAT.GetExternalAddress()
1393 if e != nil {
1394 E.F("UPnP can't get external address: %v", e)
1395 continue out
1396 }
1397 na := wire.NewNetAddressIPPort(externalip, uint16(listenPort), n.Services)
1398 e = n.AddrManager.AddLocalAddress(na, addrmgr.UpnpPrio)
1399 if e != nil {
1400 _ = e
1401 // XXX DeletePortMapping?
1402 }
1403 W.F("successfully bound via UPnP to %n", addrmgr.NetAddressKey(na))
1404 first = false
1405 }
1406 timer.Reset(time.Minute * 15)
1407 case <-n.Quit.Wait():
1408 break out
1409 }
1410 }
1411 timer.Stop()
1412 if e := n.NAT.DeletePortMapping(
1413 "tcp", int(lport),
1414 int(lport),
1415 ); E.Chk(e) {
1416 D.F("unable to remove UPnP port mapping: %v %n", e)
1417 } else {
1418 D.Ln("successfully cleared UPnP port mapping")
1419 }
1420 n.WG.Done()
1421 }
1422 1423 // OnAddr is invoked when a peer receives an addr bitcoin message and is used to notify the server about advertised addresses.
1424 func (np *NodePeer) OnAddr(
1425 _ *peer.Peer,
1426 msg *wire.MsgAddr,
1427 ) {
1428 // Ignore addresses when running on the simulation test network. This helps prevent the network from becoming
1429 // another public test network since it will not be able to learn about other peers that have not specifically been
1430 // provided.
1431 if (np.Server.Config.Network.V())[0] == 's' {
1432 return
1433 }
1434 // Ignore old style addresses which don't include a timestamp.
1435 if np.ProtocolVersion() < wire.NetAddressTimeVersion {
1436 return
1437 }
1438 // A message that has no addresses is invalid.
1439 if len(msg.AddrList) == 0 {
1440 E.F(
1441 "command [%s] from %s does not contain any addresses",
1442 msg.Command(), np.Peer,
1443 )
1444 np.Disconnect()
1445 return
1446 }
1447 for _, na := range msg.AddrList {
1448 // Don't add more address if we're disconnecting.
1449 if !np.Connected() {
1450 return
1451 }
1452 // Set the timestamp to 5 days ago if it's more than 24 hours in the future so this address is one of the first
1453 // to be removed when space is needed.
1454 now := time.Now()
1455 if na.Timestamp.After(now.Add(time.Minute * 10)) {
1456 na.Timestamp = now.Add(-1 * time.Hour * 24 * 5)
1457 }
1458 // Add address to known addresses for this peer.
1459 np.AddKnownAddresses([]*wire.NetAddress{na})
1460 }
1461 // Add addresses to server address manager. The address manager handles the details of things such as preventing
1462 // duplicate addresses, max addresses, and last seen updates. XXX bitcoind gives a 2 hour time penalty here, do we
1463 // want to do the same?
1464 np.Server.AddrManager.AddAddresses(msg.AddrList, np.NA())
1465 }
1466 1467 // OnBlock is invoked when a peer receives a block bitcoin message. It blocks until the bitcoin block has been fully
1468 // processed.
1469 func (np *NodePeer) OnBlock(p *peer.Peer, msg *wire.Block, buf []byte) {
1470 T.Ln("OnBlock from", p.Addr())
1471 // Convert the raw Block to a util.Block which provides some convenience
1472 // methods and things such as hash caching.
1473 block := block2.NewFromBlockAndBytes(msg, buf)
1474 // Add the block to the known inventory for the peer.
1475 iv := wire.NewInvVect(wire.InvTypeBlock, block.Hash())
1476 np.AddKnownInventory(iv)
1477 // Queue the block up to be handled by the block manager and intentionally block further receives until the bitcoin
1478 // block is fully processed and known good or bad.
1479 //
1480 // This helps prevent a malicious peer from queuing up a bunch of bad blocks before disconnecting (or being
1481 // disconnected) and wasting memory.
1482 //
1483 // Additionally, this behavior is depended on by at least the block acceptance test tool as the reference
1484 // implementation processes blocks in the same thread and therefore blocks further messages until the bitcoin block
1485 // has been fully processed.
1486 np.Server.SyncManager.QueueBlock(block, np.Peer, np.BlockProcessed)
1487 <-np.BlockProcessed
1488 }
1489 1490 // OnFeeFilter is invoked when a peer receives a feefilter bitcoin message and is used by remote peers to request that
1491 // no transactions which have a fee rate lower than provided value are inventoried to them. The peer will be
1492 // disconnected if an invalid fee filter value is provided.
1493 func (np *NodePeer) OnFeeFilter(
1494 _ *peer.Peer,
1495 msg *wire.MsgFeeFilter,
1496 ) {
1497 // Chk that the passed minimum fee is a valid amount.
1498 if msg.MinFee < 0 || msg.MinFee > int64(amt.MaxSatoshi) {
1499 D.F(
1500 "peer %v sent an invalid feefilter '%v' -- disconnecting %s",
1501 np, amt.Amount(msg.MinFee),
1502 )
1503 np.Disconnect()
1504 return
1505 }
1506 atomic.StoreInt64(&np.FeeFilter, msg.MinFee)
1507 }
1508 1509 // OnFilterAdd is invoked when a peer receives a filteradd bitcoin message and is used by remote peers to add data to an
1510 // already loaded bloom filter. The peer will be disconnected if a filter is not loaded when this message is received or
1511 // the server is not configured to allow bloom filters.
1512 func (np *NodePeer) OnFilterAdd(
1513 _ *peer.Peer,
1514 msg *wire.MsgFilterAdd,
1515 ) {
1516 // Disconnect and/or ban depending on the node bloom services flag and negotiated protocol version.
1517 if !np.EnforceNodeBloomFlag(msg.Command()) {
1518 return
1519 }
1520 if !np.Filter.IsLoaded() {
1521 D.F("%s sent a filteradd request with no filter loaded -- disconnecting %s", np)
1522 np.Disconnect()
1523 return
1524 }
1525 np.Filter.Add(msg.Data)
1526 }
1527 1528 // OnFilterClear is invoked when a peer receives a filterclear bitcoin message and is used by remote peers to clear an
1529 // already loaded bloom filter. The peer will be disconnected if a filter is not loaded when this message is received or
1530 // the server is not configured to allow bloom filters.
1531 func (np *NodePeer) OnFilterClear(
1532 _ *peer.Peer,
1533 msg *wire.MsgFilterClear,
1534 ) {
1535 // Disconnect and/or ban depending on the node bloom services flag and negotiated protocol version.
1536 if !np.EnforceNodeBloomFlag(msg.Command()) {
1537 return
1538 }
1539 if !np.Filter.IsLoaded() {
1540 D.F(
1541 "%s sent a filterclear request with no filter loaded"+
1542 " -- disconnecting %s", np,
1543 )
1544 np.Disconnect()
1545 return
1546 }
1547 np.Filter.Unload()
1548 }
1549 1550 // OnFilterLoad is invoked when a peer receives a filterload bitcoin message and it used to load a bloom filter that
1551 // should be used for delivering merkle blocks and associated transactions that match the filter. The peer will be
1552 // disconnected if the server is not configured to allow bloom filters.
1553 func (np *NodePeer) OnFilterLoad(
1554 _ *peer.Peer,
1555 msg *wire.MsgFilterLoad,
1556 ) {
1557 // Disconnect and/or ban depending on the node bloom services flag and negotiated protocol version.
1558 if !np.EnforceNodeBloomFlag(msg.Command()) {
1559 return
1560 }
1561 np.SetDisableRelayTx(false)
1562 np.Filter.Reload(msg)
1563 }
1564 1565 // OnGetAddr is invoked when a peer receives a getaddr bitcoin message and is used to provide the peer with known
1566 // addresses from the address manager.
1567 func (np *NodePeer) OnGetAddr(
1568 _ *peer.Peer,
1569 msg *wire.MsgGetAddr,
1570 ) {
1571 // Don't return any addresses when running on the simulation test network. This helps prevent the network from
1572 // becoming another public test network since it will not be able to learn about other peers that have not
1573 // specifically been provided.
1574 if (np.Server.Config.Network.V())[0] == 's' {
1575 return
1576 }
1577 // Do not accept getaddr requests from outbound peers. This reduces fingerprinting attacks.
1578 if !np.Inbound() {
1579 D.Ln("ignoring getaddr request from outbound peer", np)
1580 return
1581 }
1582 // Only allow one getaddr request per connection to discourage address stamping of inv announcements.
1583 if np.SentAddrs {
1584 D.F("ignoring repeated getaddr request from peer %s %s", np)
1585 return
1586 }
1587 np.SentAddrs = true
1588 // Get the current known addresses from the address manager.
1589 addrCache := np.Server.AddrManager.AddressCache()
1590 // Push the addresses.
1591 np.PreparePushAddrMsg(addrCache)
1592 }
1593 1594 // OnGetBlocks is invoked when a peer receives a getblocks bitcoin message.
1595 func (np *NodePeer) OnGetBlocks(
1596 _ *peer.Peer,
1597 msg *wire.MsgGetBlocks,
1598 ) {
1599 // Find the most recent known block in the best chain based on the block locator and fetch all of the block hashes
1600 // after it until either wire.MaxBlocksPerMsg have been fetched or the provided stop hash is encountered. Use the
1601 // block after the genesis block if no other blocks in the provided locator are known.
1602 //
1603 // This does mean the client will start over with the genesis block if unknown block locators are provided. This
1604 // mirrors the behavior in the reference implementation.
1605 chain := np.Server.Chain
1606 hashList := chain.LocateBlocks(
1607 msg.BlockLocatorHashes, &msg.HashStop,
1608 wire.MaxBlocksPerMsg,
1609 )
1610 // Generate inventory message.
1611 invMsg := wire.NewMsgInv()
1612 for i := range hashList {
1613 iv := wire.NewInvVect(wire.InvTypeBlock, &hashList[i])
1614 e := invMsg.AddInvVect(iv)
1615 if e != nil {
1616 }
1617 }
1618 // Send the inventory message if there is anything to send.
1619 if len(invMsg.InvList) > 0 {
1620 invListLen := len(invMsg.InvList)
1621 if invListLen == wire.MaxBlocksPerMsg {
1622 // Intentionally use a copy of the final hash so there is not a reference into the inventory slice which
1623 // would prevent the entire slice from being eligible for GC as soon as it's sent.
1624 continueHash := invMsg.InvList[invListLen-1].Hash
1625 np.ContinueHash = &continueHash
1626 }
1627 np.QueueMessage(invMsg, nil)
1628 }
1629 }
1630 1631 // OnGetCFCheckpt is invoked when a peer receives a getcfcheckpt bitcoin message.
1632 func (np *NodePeer) OnGetCFCheckpt(
1633 _ *peer.Peer,
1634 msg *wire.MsgGetCFCheckpt,
1635 ) {
1636 // Ignore getcfcheckpt requests if not in sync.
1637 if !np.Server.SyncManager.IsCurrent() {
1638 return
1639 }
1640 // We'll also ensure that the remote party is requesting a set of checkpoints for filters that we actually currently
1641 // maintain.
1642 switch msg.FilterType {
1643 case wire.GCSFilterRegular:
1644 break
1645 default:
1646 D.Ln(
1647 "filter request for unknown checkpoints for filter:",
1648 msg.FilterType,
1649 )
1650 return
1651 }
1652 // Now that we know the client is fetching a filter that we know of, we'll fetch the block hashes et each check
1653 // point interval so we can compare against our cache, and create new check points if necessary.
1654 blockHashes, e := np.Server.Chain.IntervalBlockHashes(
1655 &msg.StopHash, wire.CFCheckptInterval,
1656 )
1657 if e != nil {
1658 E.Ln("invalid getcfilters request:", e)
1659 return
1660 }
1661 checkptMsg := wire.NewMsgCFCheckpt(
1662 msg.FilterType, &msg.StopHash, len(blockHashes),
1663 )
1664 // Fetch the current existing cache so we can decide if we need to extend it or if its adequate as is.
1665 np.Server.CFCheckptCachesMtx.RLock()
1666 checkptCache := np.Server.CFCheckptCaches[msg.FilterType]
1667 // If the set of block hashes is beyond the current size of the cache, then we'll expand the size of the cache and
1668 // also retain the write lock.
1669 var updateCache bool
1670 if len(blockHashes) > len(checkptCache) {
1671 // Now that we know we'll need to modify the size of the cache, we'll release the read lock and grab the write
1672 // lock to possibly expand the cache size.
1673 np.Server.CFCheckptCachesMtx.RUnlock()
1674 np.Server.CFCheckptCachesMtx.Lock()
1675 defer np.Server.CFCheckptCachesMtx.Unlock()
1676 // Now that we have the write lock, we'll check again as it's possible that the cache has already been expanded.
1677 checkptCache = np.Server.CFCheckptCaches[msg.FilterType]
1678 // If we still need to expand the cache, then We'll mark that we need to update the cache for below and also
1679 // expand the size of the cache in place.
1680 if len(blockHashes) > len(checkptCache) {
1681 updateCache = true
1682 additionalLength := len(blockHashes) - len(checkptCache)
1683 newEntries := make([]CFHeaderKV, additionalLength)
1684 I.F(
1685 "growing size of checkpoint cache from %v to %v block hashes",
1686 len(checkptCache), len(blockHashes),
1687 )
1688 checkptCache = append(np.Server.CFCheckptCaches[msg.FilterType], newEntries...)
1689 }
1690 } else {
1691 // Otherwise, we'll hold onto the read lock for the remainder of this method.
1692 defer np.Server.CFCheckptCachesMtx.RUnlock()
1693 T.F("serving stale cache of size %v", len(checkptCache))
1694 }
1695 // Now that we know the cache is of an appropriate size, we'll iterate backwards until the find the block hash. We
1696 // do this as it's possible a re-org has occurred so items in the db are now in the main china while the cache has
1697 // been partially invalidated.
1698 var forkIdx int
1699 for forkIdx = len(blockHashes); forkIdx > 0; forkIdx-- {
1700 if checkptCache[forkIdx-1].BlockHash == blockHashes[forkIdx-1] {
1701 break
1702 }
1703 }
1704 // Now that we know the how much of the cache is relevant for this query, we'll populate our check point message
1705 // with the cache as is. Shortly below, we'll populate the new elements of the cache.
1706 for i := 0; i < forkIdx; i++ {
1707 e = checkptMsg.AddCFHeader(&checkptCache[i].FilterHeader)
1708 if e != nil {
1709 }
1710 }
1711 // We'll now collect the set of hashes that are beyond our cache so we can look up the filter headers to populate
1712 // the final cache.
1713 blockHashPtrs := make([]*chainhash.Hash, 0, len(blockHashes)-forkIdx)
1714 for i := forkIdx; i < len(blockHashes); i++ {
1715 blockHashPtrs = append(blockHashPtrs, &blockHashes[i])
1716 }
1717 filterHeaders, e := np.Server.CFIndex.FilterHeadersByBlockHashes(
1718 blockHashPtrs, msg.FilterType,
1719 )
1720 E.Ln("error retrieving cfilter headers:", e)
1721 if e != nil {
1722 return
1723 }
1724 // Now that we have the full set of filter headers, we'll add them to the checkpoint message, and also update our
1725 // cache in line.
1726 for i, filterHeaderBytes := range filterHeaders {
1727 if len(filterHeaderBytes) == 0 {
1728 W.Ln("could not obtain CF header for", blockHashPtrs[i])
1729 return
1730 }
1731 filterHeader, e := chainhash.NewHash(filterHeaderBytes)
1732 if e != nil {
1733 E.Ln("committed filter header deserialize failed:", e)
1734 return
1735 }
1736 e = checkptMsg.AddCFHeader(filterHeader)
1737 if e != nil {
1738 }
1739 // If the new main chain is longer than what's in the cache, then we'll override it beyond the fork point.
1740 if updateCache {
1741 checkptCache[forkIdx+i] = CFHeaderKV{
1742 BlockHash: blockHashes[forkIdx+i],
1743 FilterHeader: *filterHeader,
1744 }
1745 }
1746 }
1747 // Finally, we'll update the cache if we need to, and send the final message back to the requesting peer.
1748 if updateCache {
1749 np.Server.CFCheckptCaches[msg.FilterType] = checkptCache
1750 }
1751 np.QueueMessage(checkptMsg, nil)
1752 }
1753 1754 // OnGetCFHeaders is invoked when a peer receives a getcfheader bitcoin message.
1755 func (np *NodePeer) OnGetCFHeaders(
1756 _ *peer.Peer,
1757 msg *wire.MsgGetCFHeaders,
1758 ) {
1759 // Ignore getcfilterheader requests if not in sync.
1760 if !np.Server.SyncManager.IsCurrent() {
1761 return
1762 }
1763 // We'll also ensure that the remote party is requesting a set of headers for filters that we actually currently
1764 // maintain.
1765 switch msg.FilterType {
1766 case wire.GCSFilterRegular:
1767 break
1768 default:
1769 D.Ln("filter request for unknown headers for filter:", msg.FilterType)
1770 return
1771 }
1772 startHeight := int32(msg.StartHeight)
1773 maxResults := wire.MaxCFHeadersPerMsg
1774 // If StartHeight is positive, fetch the predecessor block hash so we can populate the PrevFilterHeader field.
1775 if msg.StartHeight > 0 {
1776 startHeight--
1777 maxResults++
1778 }
1779 // Fetch the hashes from the block index.
1780 hashList, e := np.Server.Chain.HeightToHashRange(
1781 startHeight, &msg.StopHash, maxResults,
1782 )
1783 if e != nil {
1784 E.Ln("invalid getcfheaders request:", e)
1785 }
1786 // This is possible if StartHeight is one greater that the height of StopHash, and we pull a valid range of hashes
1787 // including the previous filter header.
1788 if len(hashList) == 0 || (msg.StartHeight > 0 && len(hashList) == 1) {
1789 D.Ln("no results for getcfheaders request")
1790 return
1791 }
1792 // Create []*chainhash.Hash from []chainhash.Hash to pass to FilterHeadersByBlockHashes.
1793 hashPtrs := make([]*chainhash.Hash, len(hashList))
1794 for i := range hashList {
1795 hashPtrs[i] = &hashList[i]
1796 }
1797 // Fetch the raw filter hash bytes from the database for all blocks.
1798 filterHashes, e := np.Server.CFIndex.FilterHashesByBlockHashes(
1799 hashPtrs, msg.FilterType,
1800 )
1801 if e != nil {
1802 E.Ln("error retrieving cfilter hashes:", e)
1803 return
1804 }
1805 // Generate cfheaders message and send it.
1806 headersMsg := wire.NewMsgCFHeaders()
1807 // Populate the PrevFilterHeader field.
1808 if msg.StartHeight > 0 {
1809 prevBlockHash := &hashList[0]
1810 // Fetch the raw committed filter header bytes from the database.
1811 headerBytes, e := np.Server.CFIndex.FilterHeaderByBlockHash(
1812 prevBlockHash, msg.FilterType,
1813 )
1814 if e != nil {
1815 E.Ln("error retrieving CF header:", e)
1816 return
1817 }
1818 if len(headerBytes) == 0 {
1819 W.Ln("could not obtain CF header for", prevBlockHash)
1820 return
1821 }
1822 // Deserialize the hash into PrevFilterHeader.
1823 e = headersMsg.PrevFilterHeader.SetBytes(headerBytes)
1824 if e != nil {
1825 E.Ln("committed filter header deserialize failed:", e)
1826 return
1827 }
1828 hashList = hashList[1:]
1829 filterHashes = filterHashes[1:]
1830 }
1831 // Populate HeaderHashes.
1832 for i, hashBytes := range filterHashes {
1833 if len(hashBytes) == 0 {
1834 W.Ln("could not obtain CF hash for", hashList[i])
1835 return
1836 }
1837 // Deserialize the hash.
1838 filterHash, e := chainhash.NewHash(hashBytes)
1839 if e != nil {
1840 E.Ln("committed filter hash deserialize failed:", e)
1841 return
1842 }
1843 e = headersMsg.AddCFHash(filterHash)
1844 if e != nil {
1845 }
1846 }
1847 headersMsg.FilterType = msg.FilterType
1848 headersMsg.StopHash = msg.StopHash
1849 np.QueueMessage(headersMsg, nil)
1850 }
1851 1852 // OnGetCFilters is invoked when a peer receives a getcfilters bitcoin message.
1853 func (np *NodePeer) OnGetCFilters(
1854 _ *peer.Peer,
1855 msg *wire.MsgGetCFilters,
1856 ) {
1857 // Ignore getcfilters requests if not in sync.
1858 if !np.Server.SyncManager.IsCurrent() {
1859 return
1860 }
1861 // We'll also ensure that the remote party is requesting a set of filters that we actually currently maintain.
1862 switch msg.FilterType {
1863 case wire.GCSFilterRegular:
1864 break
1865 default:
1866 D.Ln("filter request for unknown filter:", msg.FilterType)
1867 return
1868 }
1869 hashes, e := np.Server.Chain.HeightToHashRange(
1870 int32(msg.StartHeight), &msg.StopHash, wire.MaxGetCFiltersReqRange,
1871 )
1872 if e != nil {
1873 E.Ln("invalid getcfilters request:", e)
1874 return
1875 }
1876 // Create []*chainhash.Hash from []chainhash.Hash to pass to FiltersByBlockHashes.
1877 hashPtrs := make([]*chainhash.Hash, len(hashes))
1878 for i := range hashes {
1879 hashPtrs[i] = &hashes[i]
1880 }
1881 filters, e := np.Server.CFIndex.FiltersByBlockHashes(
1882 hashPtrs, msg.FilterType,
1883 )
1884 if e != nil {
1885 E.Ln("error retrieving cfilters:", e)
1886 return
1887 }
1888 for i, filterBytes := range filters {
1889 if len(filterBytes) == 0 {
1890 W.Ln("could not obtain cfilter for", hashes[i])
1891 return
1892 }
1893 filterMsg := wire.NewMsgCFilter(
1894 msg.FilterType, &hashes[i], filterBytes,
1895 )
1896 np.QueueMessage(filterMsg, nil)
1897 }
1898 }
1899 1900 // OnGetData is invoked when a peer receives a getdata bitcoin message and is used to deliver block and transaction
1901 // information.
1902 func (np *NodePeer) OnGetData(_ *peer.Peer, msg *wire.MsgGetData) {
1903 numAdded := 0
1904 notFound := wire.NewMsgNotFound()
1905 length := len(msg.InvList)
1906 // A decaying ban score increase is applied to prevent exhausting resources with unusually large inventory queries.
1907 //
1908 // Requesting more than the maximum inventory vector length within a short period of time yields a score above the
1909 // default ban threshold.
1910 //
1911 // Sustained bursts of small requests are not penalized as that would potentially ban peers performing IBD. This
1912 // incremental score decays each minute to half of its value.
1913 if np.AddBanScore(0, uint32(length)*99/wire.MaxInvPerMsg, "getdata") {
1914 return
1915 }
1916 // We wait on this wait channel periodically to prevent queuing far more data than we can send in a reasonable time,
1917 // wasting memory.
1918 //
1919 // The waiting occurs after the database fetch for the next one to provide a little pipelining.
1920 var waitChan qu.C
1921 doneChan := qu.Ts(1)
1922 for i, iv := range msg.InvList {
1923 var c qu.C
1924 // If this will be the last message we send.
1925 if i == length-1 && len(notFound.InvList) == 0 {
1926 c = doneChan
1927 } else if (i+1)%3 == 0 {
1928 // Buffered so as to not make the send goroutine block.
1929 c = qu.Ts(1)
1930 }
1931 var e error
1932 switch iv.Type {
1933 // case wire.InvTypeWitnessTx:
1934 // e = np.Server.PushTxMsg(
1935 // np, &iv.Hash, c, waitChan,
1936 // wire.WitnessEncoding,
1937 // )
1938 case wire.InvTypeTx:
1939 e = np.Server.PushTxMsg(
1940 np, &iv.Hash, c, waitChan,
1941 wire.BaseEncoding,
1942 )
1943 // case wire.InvTypeWitnessBlock:
1944 // e = np.Server.PushBlockMsg(
1945 // np, &iv.Hash, c, waitChan,
1946 // wire.WitnessEncoding,
1947 // )
1948 case wire.InvTypeBlock:
1949 e = np.Server.PushBlockMsg(
1950 np, &iv.Hash, c, waitChan,
1951 wire.BaseEncoding,
1952 )
1953 // case wire.InvTypeFilteredWitnessBlock:
1954 // e = np.Server.PushMerkleBlockMsg(
1955 // np, &iv.Hash, c, waitChan,
1956 // wire.WitnessEncoding,
1957 // )
1958 case wire.InvTypeFilteredBlock:
1959 e = np.Server.PushMerkleBlockMsg(
1960 np, &iv.Hash, c, waitChan,
1961 wire.BaseEncoding,
1962 )
1963 default:
1964 W.Ln("unknown type in inventory request", iv.Type)
1965 continue
1966 }
1967 if e != nil {
1968 e := notFound.AddInvVect(iv)
1969 if e != nil {
1970 }
1971 // When there is a failure fetching the final entry and the done channel was sent in due to there being no
1972 // outstanding not found inventory, consume it here because there is now not found inventory that will use
1973 // the channel momentarily.
1974 if i == len(msg.InvList)-1 && c != nil {
1975 <-c
1976 }
1977 }
1978 numAdded++
1979 waitChan = c
1980 }
1981 if len(notFound.InvList) != 0 {
1982 np.QueueMessage(notFound, doneChan)
1983 }
1984 // Wait for messages to be sent. We can send quite a lot of data at this point and this will keep the peer busy for
1985 // a decent amount of time.
1986 //
1987 // We don't process anything else by them in this time so that we have an idea of when we should hear back from them
1988 // - else the idle timeout could fire when we were only half done sending the blocks.
1989 if numAdded > 0 {
1990 <-doneChan
1991 }
1992 }
1993 1994 // OnGetHeaders is invoked when a peer receives a getheaders bitcoin message.
1995 func (np *NodePeer) OnGetHeaders(
1996 _ *peer.Peer,
1997 msg *wire.MsgGetHeaders,
1998 ) {
1999 // Ignore getheaders requests if not in sync.
2000 if !np.Server.SyncManager.IsCurrent() {
2001 return
2002 }
2003 // Find the most recent known block in the best chain based on the block locator and fetch all of the headers after
2004 // it until either wire.MaxBlockHeadersPerMsg have been fetched or the provided stop hash is encountered. Use the
2005 // block after the genesis block if no other blocks in the provided locator are known. This does mean the client
2006 // will start over with the genesis block if unknown block locators are provided. This mirrors the behavior in the
2007 // reference implementation.
2008 chain := np.Server.Chain
2009 headers := chain.LocateHeaders(msg.BlockLocatorHashes, &msg.HashStop)
2010 // Send found headers to the requesting peer.
2011 blockHeaders := make([]*wire.BlockHeader, len(headers))
2012 for i := range headers {
2013 blockHeaders[i] = &headers[i]
2014 }
2015 np.QueueMessage(&wire.MsgHeaders{Headers: blockHeaders}, nil)
2016 }
2017 2018 // OnHeaders is invoked when a peer receives a headers bitcoin message. The message is passed down to the sync manager.
2019 func (np *NodePeer) OnHeaders(
2020 _ *peer.Peer,
2021 msg *wire.MsgHeaders,
2022 ) {
2023 np.Server.SyncManager.QueueHeaders(msg, np.Peer)
2024 }
2025 2026 // OnInv is invoked when a peer receives an inv bitcoin message and is used to examine the inventory being advertised by
2027 // the remote peer and react accordingly. We pass the message down to blockmanager which will call QueueMessage with any
2028 // appropriate responses.
2029 func (np *NodePeer) OnInv(
2030 _ *peer.Peer,
2031 msg *wire.MsgInv,
2032 ) {
2033 if !np.Server.Config.BlocksOnly.True() {
2034 if len(msg.InvList) > 0 {
2035 np.Server.SyncManager.QueueInv(msg, np.Peer)
2036 }
2037 return
2038 }
2039 newInv := wire.NewMsgInvSizeHint(uint(len(msg.InvList)))
2040 for _, invVect := range msg.InvList {
2041 if invVect.Type == wire.InvTypeTx {
2042 T.F("ignoring tx %v in inv from %v -- blocksonly enabled", invVect.Hash, np)
2043 if np.ProtocolVersion() >= wire.BIP0037Version {
2044 I.F("peer %v is announcing transactions -- disconnecting", np)
2045 np.Disconnect()
2046 return
2047 }
2048 continue
2049 }
2050 e := newInv.AddInvVect(invVect)
2051 if e != nil {
2052 E.Ln("failed to add inventory vector:", e)
2053 break
2054 }
2055 }
2056 if len(newInv.InvList) > 0 {
2057 np.Server.SyncManager.QueueInv(newInv, np.Peer)
2058 }
2059 }
2060 2061 // OnMemPool is invoked when a peer receives a mempool bitcoin message. It creates and sends an inventory message with
2062 // the contents of the memory pool up to the maximum inventory allowed per message. When the peer has a bloom filter
2063 // loaded, the contents are filtered accordingly.
2064 func (np *NodePeer) OnMemPool(
2065 _ *peer.Peer,
2066 msg *wire.MsgMemPool,
2067 ) {
2068 // Only allow mempool requests if the server has bloom filtering enabled.
2069 if np.Server.Services&wire.SFNodeBloom != wire.SFNodeBloom {
2070 D.Ln(
2071 "peer", np, "sent mempool request with bloom filtering disabled"+
2072 " -- disconnecting",
2073 )
2074 np.Disconnect()
2075 return
2076 }
2077 // A decaying ban score increase is applied to prevent flooding. The ban score accumulates and passes the ban
2078 // threshold if a burst of mempool messages comes from a peer. The score decays each minute to half of its value.
2079 np.AddBanScore(0, 33, "mempool")
2080 // Generate inventory message with the available transactions in the transaction memory pool. Limit it to the max
2081 // allowed inventory per message.
2082 //
2083 // The NewMsgInvSizeHint function automatically limits the passed hint to the maximum allowed, so it's safe to pass
2084 // it without double checking it here.
2085 txMemPool := np.Server.TxMemPool
2086 txDescs := txMemPool.TxDescs()
2087 invMsg := wire.NewMsgInvSizeHint(uint(len(txDescs)))
2088 for _, txDesc := range txDescs {
2089 // Either add all transactions when there is no bloom filter, or only the transactions that match the filter
2090 // when there is one.
2091 if !np.Filter.IsLoaded() || np.Filter.MatchTxAndUpdate(txDesc.Tx) {
2092 iv := wire.NewInvVect(wire.InvTypeTx, txDesc.Tx.Hash())
2093 e := invMsg.AddInvVect(iv)
2094 if e != nil {
2095 }
2096 if len(invMsg.InvList)+1 > wire.MaxInvPerMsg {
2097 break
2098 }
2099 }
2100 }
2101 // Send the inventory message if there is anything to send.
2102 if len(invMsg.InvList) > 0 {
2103 np.QueueMessage(invMsg, nil)
2104 }
2105 }
2106 2107 // OnRead is invoked when a peer receives a message and it is used to update the bytes received by the server.
2108 func (np *NodePeer) OnRead(
2109 _ *peer.Peer,
2110 bytesRead int, msg wire.Message, e error,
2111 ) {
2112 np.Server.AddBytesReceived(uint64(bytesRead))
2113 }
2114 2115 // OnTx is invoked when a peer receives a tx bitcoin message. It blocks until the bitcoin transaction has been fully
2116 // processed. Unlock the block handler this does not serialize all transactions through a single thread transactions
2117 // don't rely on the previous one in a linear fashion like blocks.
2118 func (np *NodePeer) OnTx(
2119 _ *peer.Peer,
2120 msg *wire.MsgTx,
2121 ) {
2122 if np.Server.Config.BlocksOnly.True() {
2123 T.F("ignoring tx %v from %v - blocksonly enabled", msg.TxHash(), np)
2124 return
2125 }
2126 // Add the transaction to the known inventory for the peer. Convert the raw MsgTx to a util.Tx which provides some
2127 // convenience methods and things such as hash caching.
2128 tx := util.NewTx(msg)
2129 iv := wire.NewInvVect(wire.InvTypeTx, tx.Hash())
2130 np.AddKnownInventory(iv)
2131 // Queue the transaction up to be handled by the sync manager and intentionally block further receives until the
2132 // transaction is fully processed and known good or bad.
2133 //
2134 // This helps prevent a malicious peer from queuing up a bunch of bad transactions before disconnecting (or being
2135 // disconnected) and wasting memory.
2136 np.Server.SyncManager.QueueTx(tx, np.Peer, np.TxProcessed)
2137 <-np.TxProcessed
2138 }
2139 2140 // OnVersion is invoked when a peer receives a version bitcoin message and is used to negotiate the protocol version
2141 // details as well as kick start the communications.
2142 func (np *NodePeer) OnVersion(
2143 _ *peer.Peer,
2144 msg *wire.MsgVersion,
2145 ) *wire.MsgReject {
2146 // Update the address manager with the advertised services for outbound connections in case they have changed. This
2147 // is not done for inbound connections to help prevent malicious behavior and is skipped when running on the
2148 // simulation test network since it is only intended to connect to specified peers and actively avoids advertising
2149 // and connecting to discovered peers.
2150 //
2151 // NOTE: This is done before rejecting peers that are too old to ensure it is updated regardless in the case a new
2152 // minimum protocol version is enforced and the remote node has not upgraded yet.
2153 isInbound := np.Inbound()
2154 remoteAddr := np.NA()
2155 addrManager := np.Server.AddrManager
2156 if !((np.Server.Config.Network.V())[0] == 's') && !isInbound {
2157 addrManager.SetServices(remoteAddr, msg.Services)
2158 }
2159 // Ignore peers that have a protocol version that is too old. The peer negotiation logic will disconnect it after
2160 // this callback returns.
2161 if msg.ProtocolVersion < int32(peer.MinAcceptableProtocolVersion) {
2162 return nil
2163 }
2164 // Reject outbound peers that are not full nodes.
2165 wantServices := wire.SFNodeNetwork
2166 if !isInbound && !GetHasServices(msg.Services, wantServices) {
2167 missingServices := wantServices & ^msg.Services
2168 D.F(
2169 "rejecting peer %s with services %v due to not providing"+
2170 " desired services %v %s", np.Peer, msg.Services, missingServices,
2171 )
2172 reason := fmt.Sprintf(
2173 "required services %#x not offered",
2174 uint64(missingServices),
2175 )
2176 return wire.NewMsgReject(msg.Command(), wire.RejectNonstandard, reason)
2177 }
2178 // Update the address manager and request known addresses from the remote peer
2179 // for outbound connections.
2180 //
2181 // This is skipped when running on the simulation test network since it is only
2182 // intended to connect to specified peers and actively avoids advertising and
2183 // connecting to discovered peers.
2184 if !((np.Server.Config.Network.V())[0] == 's') && !isInbound {
2185 // After soft-fork activation, only make outbound connection to peers if they
2186 // flag that they're segwit enabled.
2187 // chain := np.Server.Chain
2188 // segwitActive, e := chain.IsDeploymentActive(chaincfg.DeploymentSegwit)
2189 // if e != nil {
2190 // // Error("unable to query for segwit soft-fork state:", e)
2191 // return nil
2192 // }
2193 // if segwitActive && !np.IsWitnessEnabled() {
2194 // I.Ln(
2195 // "disconnecting non-segwit peer", np,
2196 // "as it isn't segwit enabled and we need more segwit enabled peers",
2197 // )
2198 // np.Disconnect()
2199 // return nil
2200 // }
2201 // Advertise the local address when the server accepts incoming connections and it believes itself to be close
2202 // to the best known tip.
2203 if !np.Server.Config.DisableListen.True() && np.Server.SyncManager.IsCurrent() {
2204 // Get address that best matches.
2205 lna := addrManager.GetBestLocalAddress(remoteAddr)
2206 if addrmgr.IsRoutable(lna) {
2207 // Filter addresses the peer already knows about.
2208 addresses := []*wire.NetAddress{lna}
2209 np.PreparePushAddrMsg(addresses)
2210 }
2211 }
2212 // Request known addresses if the server address manager needs more and the peer has a protocol version new
2213 // enough to include a timestamp with addresses.
2214 hasTimestamp := np.ProtocolVersion() >= wire.NetAddressTimeVersion
2215 if addrManager.NeedMoreAddresses() && hasTimestamp {
2216 np.QueueMessage(wire.NewMsgGetAddr(), nil)
2217 }
2218 // Mark the address as a known good address.
2219 addrManager.Good(remoteAddr)
2220 }
2221 // Add the remote peer time as a sample for creating an offset against the local clock to keep the network time in
2222 // sync.
2223 np.Server.TimeSource.AddTimeSample(np.Addr(), msg.Timestamp)
2224 // Signal the sync manager this peer is a new sync candidate.
2225 np.Server.SyncManager.NewPeer(np.Peer)
2226 // Choose whether or not to relay transactions before a filter command is received.
2227 np.SetDisableRelayTx(msg.DisableRelayTx)
2228 hn := np.Server.HighestKnown.Load()
2229 if msg.LastBlock >= hn {
2230 np.Server.HighestKnown.Store(msg.LastBlock)
2231 }
2232 // Add valid peer to the server.
2233 np.Server.AddPeer(np)
2234 return nil
2235 }
2236 2237 // OnWrite is invoked when a peer sends a message and it is used to update the bytes sent by the server.
2238 func (np *NodePeer) OnWrite(
2239 _ *peer.Peer, bytesWritten int,
2240 msg wire.Message, e error,
2241 ) {
2242 np.Server.AddBytesSent(uint64(bytesWritten))
2243 }
2244 2245 // AddBanScore increases the persistent and decaying ban score fields by the values passed as parameters. If the
2246 // resulting score exceeds half of the ban threshold, a warning is logged including the reason provided. Further, if the
2247 // score is above the ban threshold, the peer will be banned and disconnected.
2248 func (np *NodePeer) AddBanScore(persistent, transient uint32, reason string) bool {
2249 // No warning is logged and no score is calculated if banning is disabled.
2250 if np.Server.Config.DisableBanning.True() {
2251 return false
2252 }
2253 if np.IsWhitelisted {
2254 D.F("misbehaving whitelisted peer %s: %s %s", np, reason)
2255 return false
2256 }
2257 warnThreshold := np.Server.Config.BanThreshold.V() >> 1
2258 if transient == 0 && persistent == 0 {
2259 // The score is not being increased, but a warning message is still logged if the score is above the warn
2260 // threshold.
2261 score := np.BanScore.Int()
2262 if int(score) > warnThreshold {
2263 W.F("misbehaving peer %s: %s -- ban score is %d, it was not increased this time", np, reason, score)
2264 }
2265 return false
2266 }
2267 score := np.BanScore.Increase(persistent, transient)
2268 if int(score) > warnThreshold {
2269 W.F("misbehaving peer %s: %s -- ban score increased to %d", np, reason, score)
2270 if int(score) > np.Server.Config.BanThreshold.V() {
2271 W.F("misbehaving peer %s -- banning and disconnecting", np)
2272 np.Server.BanPeer(np)
2273 np.Disconnect()
2274 return true
2275 }
2276 }
2277 return false
2278 }
2279 2280 // AddKnownAddresses adds the given addresses to the set of known addresses to the peer to prevent sending duplicate
2281 // addresses.
2282 func (np *NodePeer) AddKnownAddresses(addresses []*wire.NetAddress) {
2283 for _, na := range addresses {
2284 np.KnownAddresses[addrmgr.NetAddressKey(na)] = struct{}{}
2285 }
2286 }
2287 2288 // IsAddressKnown true if the given address is already known to the peer.
2289 func (np *NodePeer) IsAddressKnown(na *wire.NetAddress) bool {
2290 _, exists := np.KnownAddresses[addrmgr.NetAddressKey(na)]
2291 return exists
2292 }
2293 2294 // EnforceNodeBloomFlag disconnects the peer if the server is not configured to allow bloom filters. Additionally, if
2295 // the peer has negotiated to a protocol version that is high enough to observe the bloom filter service support bit, it
2296 // will be banned since it is intentionally violating the protocol.
2297 func (np *NodePeer) EnforceNodeBloomFlag(cmd string) bool {
2298 if np.Server.Services&wire.SFNodeBloom != wire.SFNodeBloom {
2299 // Ban the peer if the protocol version is high enough that the peer is knowingly violating the protocol and
2300 // banning is enabled.
2301 //
2302 // NOTE: Even though the addBanScore function already examines whether or not banning is enabled, it is checked
2303 // here as well to ensure the violation is logged and the peer is disconnected regardless.
2304 if np.ProtocolVersion() >= wire.BIP0111Version &&
2305 !np.Server.Config.DisableBanning.True() {
2306 // Disconnect the peer regardless of whether it was banned.
2307 np.AddBanScore(100, 0, cmd)
2308 np.Disconnect()
2309 return false
2310 }
2311 // Disconnect the peer regardless of protocol version or banning state.
2312 D.F("%s sent an unsupported %s request -- disconnecting %s", np, cmd)
2313 np.Disconnect()
2314 return false
2315 }
2316 return true
2317 }
2318 2319 // GetNewestBlock returns the current best block hash and height using the format required by the configuration for the
2320 // peer package.
2321 func (np *NodePeer) GetNewestBlock() (*chainhash.Hash, int32, error) {
2322 best := np.Server.Chain.BestSnapshot()
2323 return &best.Hash, best.Height, nil
2324 }
2325 2326 // PreparePushAddrMsg sends an addr message to the connected peer using the provided addresses.
2327 func (np *NodePeer) PreparePushAddrMsg(addresses []*wire.NetAddress) {
2328 // Filter addresses already known to the peer.
2329 addrs := make([]*wire.NetAddress, 0, len(addresses))
2330 for _, addr := range addresses {
2331 if !np.IsAddressKnown(addr) {
2332 addrs = append(addrs, addr)
2333 }
2334 }
2335 known, e := np.PushAddrMsg(addrs)
2336 if e != nil {
2337 E.F("can't push address message to %s: %v", np.Peer, e)
2338 np.Disconnect()
2339 return
2340 }
2341 np.AddKnownAddresses(known)
2342 }
2343 2344 // IsRelayTxDisabled returns whether or not relaying of transactions for the given peer is disabled.
2345 //
2346 // It is safe for concurrent access.
2347 func (np *NodePeer) IsRelayTxDisabled() bool {
2348 np.RelayMtx.Lock()
2349 isDisabled := np.DisableRelayTx
2350 np.RelayMtx.Unlock()
2351 return isDisabled
2352 }
2353 2354 // SetDisableRelayTx toggles relaying of transactions for the given peer. It is safe for concurrent access.
2355 func (np *NodePeer) SetDisableRelayTx(disable bool) {
2356 np.RelayMtx.Lock()
2357 np.DisableRelayTx = disable
2358 np.RelayMtx.Unlock()
2359 }
2360 2361 // Len returns the number of checkpoints in the slice. It is part of the sort.Interface implementation.
2362 func (s CheckpointSorter) Len() int { return len(s) }
2363 2364 // Less returns whether the checkpoint with index i should txsort before the
2365 // checkpoint with index j. It is part of the sort.Interface implementation.
2366 func (s CheckpointSorter) Less(i, j int) bool {
2367 return s[i].Height < s[j].
2368 Height
2369 }
2370 2371 // Swap swaps the checkpoints at the passed indices. It is part of the sort.Interface implementation.
2372 func (s CheckpointSorter) Swap(i, j int) {
2373 s[i], s[j] = s[j], s[i]
2374 }
2375 2376 // Network returns the network. This is part of the net.Addr interface.
2377 func (a SimpleAddr) Network() string {
2378 return a.Net
2379 }
2380 2381 // String returns the address. This is part of the net.Addr interface.
2382 func (a SimpleAddr) String() string {
2383 return a.Addr
2384 }
2385 2386 // AddLocalAddress adds an address that this node is listening on to the address manager so that it may be relayed to
2387 // peers.
2388 //
2389 // TODO: having just essentially rewritten this and only just finding it,
2390 // this function needs to be split to separate the address manager from the
2391 // listening address processing, and configuration of listening IP addresses is
2392 // unnecessary (for the controller's unicast elements)
2393 func AddLocalAddress(addrMgr *addrmgr.AddrManager, addr string, services wire.ServiceFlag) (e error) {
2394 host, portStr, e := net.SplitHostPort(addr)
2395 if e != nil {
2396 return e
2397 }
2398 var port uint64
2399 port, e = strconv.ParseUint(portStr, 10, 16)
2400 if e != nil {
2401 return e
2402 }
2403 if ip := net.ParseIP(host); ip != nil && ip.IsUnspecified() {
2404 // If bound to unspecified address, advertise all local interfaces
2405 var addrs []net.Addr
2406 addrs, e = net.InterfaceAddrs()
2407 if e != nil {
2408 return e
2409 }
2410 for _, addr := range addrs {
2411 var ifaceIP net.IP
2412 ifaceIP, _, e = net.ParseCIDR(addr.String())
2413 if e != nil {
2414 continue
2415 }
2416 // If bound to 0.0.0.0, do not add IPv6 interfaces and if bound to ::, do not add IPv4 interfaces.
2417 if (ip.To4() == nil) != (ifaceIP.To4() == nil) {
2418 continue
2419 }
2420 netAddr := wire.NewNetAddressIPPort(ifaceIP, uint16(port), services)
2421 e = addrMgr.AddLocalAddress(netAddr, addrmgr.BoundPrio)
2422 if e != nil {
2423 T.Ln(e)
2424 }
2425 }
2426 } else {
2427 var netAddr *wire.NetAddress
2428 netAddr, e = addrMgr.HostToNetAddress(host, uint16(port), services)
2429 if e != nil {
2430 return e
2431 }
2432 e = addrMgr.AddLocalAddress(netAddr, addrmgr.BoundPrio)
2433 if e != nil {
2434 }
2435 }
2436 return nil
2437 }
2438 2439 // AddrStringToNetAddr takes an address in the form of 'host:port' and returns a net.Addr which maps to the original
2440 // address with any host names resolved to IP addresses. It also handles tor addresses properly by returning a net.Addr
2441 // that encapsulates the address.
2442 func AddrStringToNetAddr(config *config.Config, stateCfg *active.Config, addr string) (net.Addr, error) {
2443 host, strPort, e := net.SplitHostPort(addr)
2444 if e != nil {
2445 return nil, e
2446 }
2447 port, e := strconv.Atoi(strPort)
2448 if e != nil {
2449 return nil, e
2450 }
2451 // Skip if host is already an IP address.
2452 if ip := net.ParseIP(host); ip != nil {
2453 return &net.TCPAddr{
2454 IP: ip,
2455 Port: port,
2456 },
2457 nil
2458 }
2459 // Tor addresses cannot be resolved to an IP, so just return an onion address instead.
2460 if strings.HasSuffix(host, ".onion") {
2461 if !config.OnionEnabled.True() {
2462 return nil, errors.New("tor has been disabled")
2463 }
2464 return &OnionAddr{Addr: addr}, nil
2465 }
2466 // Attempt to look up an IP address associated with the parsed host.
2467 ips, e := Lookup(stateCfg)(host)
2468 if e != nil {
2469 return nil, e
2470 }
2471 if len(ips) == 0 {
2472 return nil, fmt.Errorf("no addresses found for %s", host)
2473 }
2474 return &net.TCPAddr{
2475 IP: ips[0],
2476 Port: port,
2477 },
2478 nil
2479 }
2480 2481 // DisconnectPeer attempts to drop the connection of a targeted peer in the passed peer list. Targets are identified via
2482 // usage of the passed `compareFunc`, which should return `true` if the passed peer is the target peer.
2483 //
2484 // This function returns true on success and false if the peer is unable to be located. If the peer is found, and the
2485 // passed callback: `whenFound' isn't nil, we call it with the peer as the argument before it is removed from the
2486 // peerList, and is disconnected from the server.
2487 func DisconnectPeer(
2488 peerList map[int32]*NodePeer,
2489 compareFunc func(*NodePeer) bool, whenFound func(*NodePeer),
2490 ) bool {
2491 for addr, nodePeer := range peerList {
2492 if compareFunc(nodePeer) {
2493 if whenFound != nil {
2494 whenFound(nodePeer)
2495 }
2496 // This is ok because we are not continuing to iterate so won't corrupt the loop.
2497 delete(peerList, addr)
2498 nodePeer.Disconnect()
2499 return true
2500 }
2501 }
2502 return false
2503 }
2504 2505 // DynamicTickDuration is a convenience function used to dynamically choose a
2506 // tick duration based on remaining time. It is primarily used during server shutdown to make shutdown warnings more
2507 // frequent as the shutdown time approaches.
2508 func DynamicTickDuration(remaining time.Duration) time.Duration {
2509 switch {
2510 case remaining <= time.Second*5:
2511 return time.Second
2512 case remaining <= time.Second*15:
2513 return time.Second * 5
2514 case remaining <= time.Minute:
2515 return time.Second * 15
2516 case remaining <= time.Minute*5:
2517 return time.Minute
2518 case remaining <= time.Minute*15:
2519 return time.Minute * 5
2520 case remaining <= time.Hour:
2521 return time.Minute * 15
2522 }
2523 return time.Hour
2524 }
2525 2526 // GetHasServices returns whether or not the provided advertised service flags have all of the provided desired service
2527 // flags set.
2528 func GetHasServices(advertised, desired wire.ServiceFlag) bool {
2529 return advertised&desired == desired
2530 }
2531 2532 // InitListeners initializes the configured net listeners and adds any bound addresses to the address manager. Returns
2533 // the listeners and a upnp.NAT interface, which is non-nil if UPnP is in use.
2534 func InitListeners(
2535 config *config.Config, activeNet *chaincfg.Params,
2536 aMgr *addrmgr.AddrManager, listenAddrs []string, services wire.ServiceFlag,
2537 ) (listeners []net.Listener, nat upnp.NAT, e error) {
2538 // Listen for TCP connections at the configured addresses
2539 T.Ln("listenAddrs ", listenAddrs)
2540 var netAddrs []net.Addr
2541 netAddrs, e = ParseListeners(listenAddrs)
2542 if e != nil {
2543 return nil, nil, e
2544 }
2545 T.Ln("netAddrs ", netAddrs)
2546 listeners = make([]net.Listener, 0, len(netAddrs))
2547 for _, addr := range netAddrs {
2548 T.Ln("addr ", addr, " ", addr.Network(), " ", addr.String())
2549 listener, e := net.Listen(addr.Network(), addr.String())
2550 if e != nil {
2551 W.F("can't listen on %s: %v %s", addr, e)
2552 continue
2553 }
2554 listeners = append(listeners, listener)
2555 }
2556 if len(config.ExternalIPs.S()) != 0 {
2557 defaultPort, e := strconv.ParseUint(activeNet.DefaultPort, 10, 16)
2558 if e != nil {
2559 E.F("can not parse default port %s for active chain: %v", activeNet.DefaultPort, e)
2560 return nil, nil, e
2561 }
2562 for _, sip := range config.ExternalIPs.S() {
2563 eport := uint16(defaultPort)
2564 host, portstr, e := net.SplitHostPort(sip)
2565 if e != nil {
2566 // no port, use default.
2567 host = sip
2568 } else {
2569 var port uint64
2570 port, e = strconv.ParseUint(portstr, 10, 16)
2571 if e != nil {
2572 E.F(
2573 "can not parse port from %s for externalip: %v",
2574 sip, e,
2575 )
2576 continue
2577 }
2578 eport = uint16(port)
2579 }
2580 var na *wire.NetAddress
2581 na, e = aMgr.HostToNetAddress(host, eport, services)
2582 if e != nil {
2583 E.F("not adding %s as externalip: %v", sip, e)
2584 continue
2585 }
2586 e = aMgr.AddLocalAddress(na, addrmgr.ManualPrio)
2587 if e != nil {
2588 E.F("skipping specified external IP: %v", e)
2589 }
2590 }
2591 } else {
2592 if config.UPNP.True() {
2593 var e error
2594 nat, e = upnp.Discover()
2595 if e != nil {
2596 E.F("can't discover upnp: %v", e)
2597 }
2598 // nil upnp.nat here is fine, just means no upnp on network.
2599 }
2600 // Add bound addresses to address manager to be advertised to peers.
2601 for _, listener := range listeners {
2602 addr := listener.Addr().String()
2603 e := AddLocalAddress(aMgr, addr, services)
2604 if e != nil {
2605 E.F("skipping bound address %s: %v", addr, e)
2606 }
2607 }
2608 }
2609 return listeners, nat, nil
2610 }
2611 2612 // GetIsWhitelisted returns whether the IP address is included in the whitelisted networks and IPs.
2613 func GetIsWhitelisted(statecfg *active.Config, addr net.Addr) bool {
2614 if len(statecfg.ActiveWhitelists) == 0 {
2615 return false
2616 }
2617 var host string
2618 var e error
2619 host, _, e = net.SplitHostPort(addr.String())
2620 if e != nil {
2621 E.F("unable to SplitHostPort on '%s': %v", addr, e)
2622 return false
2623 }
2624 ip := net.ParseIP(host)
2625 if ip == nil {
2626 W.F("unable to parse IP '%s'", addr)
2627 return false
2628 }
2629 for _, ipnet := range statecfg.ActiveWhitelists {
2630 if ipnet.Contains(ip) {
2631 return true
2632 }
2633 }
2634 return false
2635 }
2636 2637 // MergeCheckpoints returns two slices of checkpoints merged into one slice such that the checkpoints are sorted by
2638 // height.
2639 //
2640 // In the case the additional checkpoints contain a checkpoint with the same height as a checkpoint in the default
2641 // checkpoints, the additional checkpoint will take precedence and overwrite the default one.
2642 func MergeCheckpoints(defaultCheckpoints, additional []chaincfg.Checkpoint) []chaincfg.Checkpoint {
2643 // Create a map of the additional checkpoints to remove duplicates while
2644 // leaving the most recently-specified checkpoint.
2645 extra := make(map[int32]chaincfg.Checkpoint)
2646 for _, checkpoint := range additional {
2647 extra[checkpoint.Height] = checkpoint
2648 }
2649 // Add all default checkpoints that do not have an override in the additional checkpoints.
2650 numDefault := len(defaultCheckpoints)
2651 checkpoints := make([]chaincfg.Checkpoint, 0, numDefault+len(extra))
2652 for _, checkpoint := range defaultCheckpoints {
2653 if _, exists := extra[checkpoint.Height]; !exists {
2654 checkpoints = append(checkpoints, checkpoint)
2655 }
2656 }
2657 // Append the additional checkpoints and return the sorted results.
2658 for _, checkpoint := range extra {
2659 checkpoints = append(checkpoints, checkpoint)
2660 }
2661 sort.Sort(CheckpointSorter(checkpoints))
2662 return checkpoints
2663 }
2664 2665 // NewPeerConfig returns the configuration for the given ServerPeer.
2666 func NewPeerConfig(sp *NodePeer) *peer.Config {
2667 // to work around the lack of a single identifier in the protocol, for dealing
2668 // with testing situations with multiple nodes on one IP address (and there is a
2669 // to-do on this) we generate a random 32 bit value, convert to hex and set it
2670 // as the first of the user agent comments, which we can then use to count
2671 // individual connections properly
2672 return &peer.Config{
2673 Listeners: peer.MessageListeners{
2674 OnVersion: sp.OnVersion,
2675 OnMemPool: sp.OnMemPool,
2676 OnTx: sp.OnTx,
2677 OnBlock: sp.OnBlock,
2678 OnInv: sp.OnInv,
2679 OnHeaders: sp.OnHeaders,
2680 OnGetData: sp.OnGetData,
2681 OnGetBlocks: sp.OnGetBlocks,
2682 OnGetHeaders: sp.OnGetHeaders,
2683 OnGetCFilters: sp.OnGetCFilters,
2684 OnGetCFHeaders: sp.OnGetCFHeaders,
2685 OnGetCFCheckpt: sp.OnGetCFCheckpt,
2686 OnFeeFilter: sp.OnFeeFilter,
2687 OnFilterAdd: sp.OnFilterAdd,
2688 OnFilterClear: sp.OnFilterClear,
2689 OnFilterLoad: sp.OnFilterLoad,
2690 OnGetAddr: sp.OnGetAddr,
2691 OnAddr: sp.OnAddr,
2692 OnRead: sp.OnRead,
2693 OnWrite: sp.OnWrite,
2694 // Note: The reference client currently bans peers that send alerts not signed with its key. We could verify
2695 // against their key, but since the reference client is currently unwilling to support other
2696 // implementations' alert messages, we will not relay theirs.
2697 OnAlert: nil,
2698 },
2699 NewestBlock: sp.GetNewestBlock,
2700 HostToNetAddress: sp.Server.AddrManager.HostToNetAddress,
2701 Proxy: sp.Server.Config.ProxyAddress.V(),
2702 UserAgentName: UserAgentName,
2703 UserAgentVersion: UserAgentVersion,
2704 UserAgentComments: sp.Server.Config.UserAgentComments.S(),
2705 ChainParams: sp.Server.ChainParams,
2706 Services: sp.Server.Services,
2707 DisableRelayTx: sp.Server.Config.BlocksOnly.True(),
2708 ProtocolVersion: peer.MaxProtocolVersion,
2709 TrickleInterval: sp.Server.Config.TrickleInterval.V(),
2710 IP: sp.IP,
2711 Port: sp.Port,
2712 }
2713 }
2714 2715 type Context struct {
2716 // Config is the pod all-in-one server config
2717 Config *config.Config
2718 // StateCfg is a reference to the main node state configuration struct
2719 StateCfg *active.Config
2720 // ActiveNet is the active net parameters
2721 ActiveNet *chaincfg.Params
2722 // Hashrate is the hash counter
2723 Hashrate uberatomic.Uint64
2724 }
2725 2726 // NewNode returns a new pod server configured to listen on addr for the bitcoin network type specified by chainParams.
2727 // Use start to begin accepting connections from peers.
2728 //
2729 // TODO: simplify/modularise this
2730 func NewNode(listenAddrs []string, db database.DB, interruptChan qu.C, cx *Context, mempoolUpdateHook func()) (
2731 *Node,
2732 error,
2733 ) {
2734 D.Ln("listenAddrs ", listenAddrs)
2735 services := DefaultServices
2736 if cx.Config.NoPeerBloomFilters.True() {
2737 services &^= wire.SFNodeBloom
2738 }
2739 if cx.Config.NoCFilters.True() {
2740 services &^= wire.SFNodeCF
2741 }
2742 aMgr := addrmgr.New(cx.Config.DataDir.V()+string(os.PathSeparator)+cx.ActiveNet.Name, Lookup(cx.StateCfg))
2743 var lstn []net.Listener
2744 var nat upnp.NAT
2745 if cx.Config.DisableListen.False() {
2746 var e error
2747 if lstn, nat, e = InitListeners(cx.Config, cx.ActiveNet, aMgr, listenAddrs, services); E.Chk(e) {
2748 return nil, e
2749 }
2750 if len(lstn) == 0 {
2751 return nil, errors.New("no valid listen address")
2752 }
2753 }
2754 nThreads := runtime.NumCPU()
2755 var thr int
2756 if cx.Config.GenThreads.V() == -1 || thr > nThreads {
2757 thr = nThreads
2758 } else {
2759 thr = cx.Config.GenThreads.V()
2760 }
2761 T.Ln("set genthreads to ", thr)
2762 s := Node{
2763 ChainParams: cx.ActiveNet,
2764 AddrManager: aMgr,
2765 NewPeers: make(chan *NodePeer, cx.Config.MaxPeers.V()),
2766 DonePeers: make(chan *NodePeer, cx.Config.MaxPeers.V()),
2767 BanPeers: make(chan *NodePeer, cx.Config.MaxPeers.V()),
2768 PeerState: make(chan chan peersummary.PeerSummaries, 1),
2769 Query: make(chan interface{}),
2770 RelayInv: make(chan RelayMsg, cx.Config.MaxPeers.V()),
2771 Broadcast: make(chan BroadcastMsg, cx.Config.MaxPeers.V()),
2772 Quit: qu.T(),
2773 ModifyRebroadcastInv: make(chan interface{}),
2774 PeerHeightsUpdate: make(chan UpdatePeerHeightsMsg),
2775 NAT: nat,
2776 DB: db,
2777 TimeSource: blockchain.NewMedianTime(),
2778 Services: services,
2779 SigCache: txscript.NewSigCache(uint(cx.Config.SigCacheMaxSize.V())),
2780 HashCache: txscript.NewHashCache(uint(cx.Config.SigCacheMaxSize.V())),
2781 CFCheckptCaches: make(map[wire.FilterType][]CFHeaderKV),
2782 GenThreads: uint32(thr),
2783 Config: cx.Config,
2784 StateCfg: cx.StateCfg,
2785 ActiveNet: cx.ActiveNet,
2786 StartController: qu.Ts(2),
2787 StopController: qu.Ts(2),
2788 }
2789 // Create the transaction and address indexes if needed.
2790 //
2791 // CAUTION: the txindex needs to be first in the indexes array because the addrindex uses data from the txindex
2792 // during catchup.
2793 //
2794 // If the addrindex is run first, it may not have the transactions from the current block indexed.
2795 var indexes []indexers.Indexer
2796 D.Ln("txindex", cx.Config.TxIndex.True(), "addrindex", cx.Config.AddrIndex.True())
2797 if cx.Config.TxIndex.True() || cx.Config.AddrIndex.True() {
2798 // Enable transaction index if address index is enabled since it requires it.
2799 if !cx.Config.TxIndex.True() {
2800 I.Ln("transaction index enabled because it is required by the address index")
2801 cx.Config.TxIndex.T()
2802 } else {
2803 I.Ln("transaction index is enabled")
2804 }
2805 s.TxIndex = indexers.NewTxIndex(db)
2806 indexes = append(indexes, s.TxIndex)
2807 }
2808 if cx.Config.AddrIndex.True() {
2809 I.Ln("address index is enabled")
2810 s.AddrIndex = indexers.NewAddrIndex(db, cx.ActiveNet)
2811 indexes = append(indexes, s.AddrIndex)
2812 }
2813 if !cx.Config.NoCFilters.True() {
2814 T.Ln("committed filter index is enabled")
2815 s.CFIndex = indexers.NewCfIndex(db, cx.ActiveNet)
2816 indexes = append(indexes, s.CFIndex)
2817 }
2818 // Create an index manager if any of the optional indexes are enabled.
2819 var indexManager blockchain.IndexManager
2820 if len(indexes) > 0 {
2821 indexManager = indexers.NewManager(db, indexes)
2822 }
2823 // Merge given checkpoints with the default ones unless they are disabled.
2824 var checkpoints []chaincfg.Checkpoint
2825 if !cx.Config.DisableCheckpoints.True() {
2826 checkpoints = MergeCheckpoints(
2827 s.ChainParams.Checkpoints, cx.StateCfg.AddedCheckpoints,
2828 )
2829 }
2830 // Create a new block chain instance with the appropriate configuration.
2831 var e error
2832 s.Chain, e = blockchain.New(
2833 &blockchain.Config{
2834 DB: s.DB,
2835 Interrupt: interruptChan,
2836 ChainParams: s.ChainParams,
2837 Checkpoints: checkpoints,
2838 TimeSource: s.TimeSource,
2839 SigCache: s.SigCache,
2840 IndexManager: indexManager,
2841 HashCache: s.HashCache,
2842 },
2843 )
2844 if e != nil {
2845 return nil, e
2846 }
2847 s.Chain.DifficultyAdjustments = make(map[string]float64)
2848 s.Chain.DifficultyBits.Store(make(blockchain.Diffs))
2849 // Search for a FeeEstimator state in the database. If none can be found or if it cannot be loaded, create a new
2850 // one.
2851 e = db.Update(
2852 func(tx database.Tx) (e error) {
2853 metadata := tx.Metadata()
2854 feeEstimationData := metadata.Get(mempool.EstimateFeeDatabaseKey)
2855 if feeEstimationData != nil {
2856 // delete it from the database so that we don't try to restore the same thing again somehow.
2857 e = metadata.Delete(mempool.EstimateFeeDatabaseKey)
2858 if e != nil {
2859 return e
2860 }
2861 // If there is an error, log it and make a new fee estimator.
2862 var e error
2863 s.FeeEstimator, e = mempool.RestoreFeeEstimator(feeEstimationData)
2864 if e != nil {
2865 return fmt.Errorf("failed to restore fee estimator %v", e)
2866 }
2867 }
2868 return nil
2869 },
2870 )
2871 if e != nil {
2872 E.Ln(e)
2873 }
2874 // If no feeEstimator has been found, or if the one that has been found is behind somehow, create a new one and
2875 // start over.
2876 if s.FeeEstimator == nil || s.FeeEstimator.LastKnownHeight() != s.Chain.
2877 BestSnapshot().Height {
2878 s.FeeEstimator = mempool.NewFeeEstimator(
2879 mempool.DefaultEstimateFeeMaxRollback,
2880 mempool.DefaultEstimateFeeMinRegisteredBlocks,
2881 )
2882 }
2883 txC := mempool.Config{
2884 Policy: mempool.Policy{
2885 DisableRelayPriority: cx.Config.NoRelayPriority.True(),
2886 AcceptNonStd: cx.Config.RelayNonStd.True(),
2887 FreeTxRelayLimit: cx.Config.FreeTxRelayLimit.V(),
2888 MaxOrphanTxs: cx.Config.MaxOrphanTxs.V(),
2889 MaxOrphanTxSize: DefaultMaxOrphanTxSize,
2890 MaxSigOpCostPerTx: blockchain.MaxBlockSigOpsCost / 4,
2891 MinRelayTxFee: cx.StateCfg.ActiveMinRelayTxFee,
2892 MaxTxVersion: 2,
2893 },
2894 ChainParams: cx.ActiveNet,
2895 FetchUtxoView: s.Chain.FetchUtxoView,
2896 BestHeight: func() int32 {
2897 return s.Chain.BestSnapshot().Height
2898 },
2899 MedianTimePast: func() time.Time {
2900 return s.Chain.BestSnapshot().MedianTime
2901 },
2902 // CalcSequenceLock: func(tx *util.Tx, view *blockchain.UtxoViewpoint) (
2903 // *blockchain.SequenceLock, error,
2904 // ) {
2905 // return s.Chain.CalcSequenceLock(tx, view, true)
2906 // },
2907 // IsDeploymentActive: s.Chain.IsDeploymentActive,
2908 SigCache: s.SigCache,
2909 HashCache: s.HashCache,
2910 AddrIndex: s.AddrIndex,
2911 FeeEstimator: s.FeeEstimator,
2912 UpdateHook: mempoolUpdateHook,
2913 }
2914 s.TxMemPool = mempool.New(&txC)
2915 s.SyncManager, e =
2916 netsync.New(
2917 &netsync.Config{
2918 PeerNotifier: &s,
2919 Chain: s.Chain,
2920 TxMemPool: s.TxMemPool,
2921 ChainParams: s.ChainParams,
2922 DisableCheckpoints: cx.Config.DisableCheckpoints.True(),
2923 MaxPeers: cx.Config.MaxPeers.V(),
2924 FeeEstimator: s.FeeEstimator,
2925 },
2926 )
2927 if e != nil {
2928 return nil, e
2929 }
2930 // // Create the mining policy and block template generator based on the
2931 // // configuration options.
2932 // // NOTE: The CPU miner relies on the mempool, so the mempool has to be
2933 // // created before calling the function to create the CPU miner.
2934 // policy := mining.Policy{
2935 // BlockMinWeight: uint32(*config.BlockMinWeight),
2936 // BlockMaxWeight: uint32(*config.BlockMaxWeight),
2937 // BlockMinSize: uint32(*config.BlockMinSize),
2938 // BlockMaxSize: uint32(*config.BlockMaxSize),
2939 // BlockPrioritySize: uint32(*config.BlockPrioritySize),
2940 // TxMinFreeFee: stateCfg.ActiveMinRelayTxFee,
2941 // }
2942 // blockTemplateGenerator := mining.NewBlkTmplGenerator(&policy,
2943 // s.ChainParams, s.TxMemPool, s.Chain, s.TimeSource,
2944 // s.SigCache, s.HashCache, s.Algo)
2945 // s.CPUMiner = cpuminer.New(&cpuminer.Config{
2946 // Blockchain: s.Chain,
2947 // ChainParams: chainParams,
2948 // BlockTemplateGenerator: blockTemplateGenerator,
2949 // MiningAddrs: stateCfg.ActiveMiningAddrs,
2950 // ProcessBlock: s.SyncManager.ProcessBlock,
2951 // ConnectedCount: s.ConnectedCount,
2952 // IsCurrent: s.SyncManager.IsCurrent,
2953 // NumThreads: s.GenThreads,
2954 // Algo: s.Algo,
2955 // Solo: *config.Solo,
2956 // })
2957 // Only setup a function to return new addresses to connect to when
2958 // not running in connect-only mode. The simulation network is always
2959 // in connect-only mode since it is only intended to connect to
2960 // specified peers and actively avoid advertising and connecting to
2961 // discovered peers in order to prevent it from becoming a public test
2962 // network.
2963 var newAddressFunc func() (net.Addr, error)
2964 if !((cx.Config.Network.V())[0] == 's') && len(cx.Config.ConnectPeers.S()) == 0 {
2965 newAddressFunc = func() (net.Addr, error) {
2966 for tries := 0; tries < 100; tries++ {
2967 addr := s.AddrManager.GetAddress()
2968 if addr == nil {
2969 break
2970 }
2971 // Address will not be invalid, local or unrouteable because addrmanager rejects those on addition.
2972 //
2973 // Just check that we don't already have an address in the same group so that we are not connecting to
2974 // the same network segment at the expense of others.
2975 key := addrmgr.GroupKey(addr.NetAddress())
2976 if s.OutboundGroupCount(key) != 0 {
2977 continue
2978 }
2979 // only allow recent nodes (10 min) after we failed 30 times
2980 if tries < 30 && time.Since(addr.LastAttempt()) < 10*time.Minute {
2981 continue
2982 }
2983 // allow non default ports after 50 failed tries.
2984 if tries < 50 && fmt.Sprintf("%d", addr.NetAddress().Port) !=
2985 cx.ActiveNet.DefaultPort {
2986 continue
2987 }
2988 addrString := addrmgr.NetAddressKey(addr.NetAddress())
2989 return AddrStringToNetAddr(cx.Config, cx.StateCfg, addrString)
2990 }
2991 return nil, errors.New("no valid connect address")
2992 }
2993 }
2994 // Create a connection manager.
2995 targetOutbound := DefaultTargetOutbound
2996 if cx.Config.MaxPeers.V() < targetOutbound {
2997 targetOutbound = cx.Config.MaxPeers.V()
2998 }
2999 cMgr, e :=
3000 connmgr.New(
3001 &connmgr.Config{
3002 Listeners: lstn,
3003 OnAccept: s.InboundPeerConnected,
3004 RetryDuration: ConnectionRetryInterval,
3005 TargetOutbound: uint32(targetOutbound),
3006 Dial: Dial(cx.StateCfg),
3007 OnConnection: s.OutboundPeerConnected,
3008 GetNewAddress: newAddressFunc,
3009 },
3010 )
3011 if e != nil {
3012 return nil, e
3013 }
3014 s.ConnManager = cMgr
3015 // Start up persistent peers.
3016 permanentPeers := cx.Config.ConnectPeers.S()
3017 if len(permanentPeers) == 0 {
3018 permanentPeers = cx.Config.AddPeers.S()
3019 }
3020 for _, addr := range permanentPeers {
3021 netAddr, e := AddrStringToNetAddr(cx.Config, cx.StateCfg, addr)
3022 if e != nil {
3023 return nil, e
3024 }
3025 go s.ConnManager.Connect(
3026 &connmgr.ConnReq{
3027 Addr: netAddr,
3028 Permanent: true,
3029 },
3030 )
3031 }
3032 if cx.Config.DisableRPC.False() {
3033 // Setup listeners for the configured RPC listen addresses and TLS settings.
3034 listeners := map[string][]string{
3035 fork.SHA256d: cx.Config.RPCListeners.S(),
3036 }
3037 for l := range listeners {
3038 rpcListeners, e := SetupRPCListeners(cx.Config, listeners[l])
3039 if e != nil {
3040 return nil, e
3041 }
3042 if len(rpcListeners) == 0 {
3043 return nil, errors.New("RPCS: No valid listen address")
3044 }
3045 rp, e := NewRPCServer(
3046 &ServerConfig{
3047 Listeners: rpcListeners,
3048 StartupTime: s.StartupTime,
3049 ConnMgr: &ConnManager{&s},
3050 SyncMgr: &SyncManager{&s, s.SyncManager},
3051 TimeSource: s.TimeSource,
3052 Chain: s.Chain,
3053 ChainParams: cx.ActiveNet,
3054 DB: db,
3055 TxMemPool: s.TxMemPool,
3056 // Generator: blockTemplateGenerator,
3057 // CPUMiner: s.CPUMiner,
3058 TxIndex: s.TxIndex,
3059 AddrIndex: s.AddrIndex,
3060 CfIndex: s.CFIndex,
3061 FeeEstimator: s.FeeEstimator,
3062 Algo: l,
3063 Hashrate: cx.Hashrate,
3064 Quit: s.Quit,
3065 StartController: s.StartController,
3066 StopController: s.StopController,
3067 }, cx.StateCfg, cx.Config,
3068 )
3069 if e != nil {
3070 return nil, e
3071 }
3072 s.RPCServers = append(s.RPCServers, rp)
3073 }
3074 // Signal process shutdown when the RPC server requests it.
3075 go func() {
3076 s.Quit.Wait()
3077 for i := range s.RPCServers {
3078 s.RPCServers[i].Quit.Q()
3079 }
3080 }()
3081 go func() {
3082 for i := range s.RPCServers {
3083 <-s.RPCServers[i].RequestedProcessShutdown()
3084 }
3085 interrupt.Request()
3086 }()
3087 }
3088 return &s, nil
3089 }
3090 3091 // NewServerPeer returns a new ServerPeer instance. The peer needs to be set by the caller.
3092 //
3093 // note that peers that give different addresses to the sender address are
3094 // disconnected to stop spoofing to disrupt other connections with the fake IP.
3095 // Therefore upnp external address must be found and otherwise zero for proxy,
3096 // empty or explicitly disabled. further external inbound ports as possible to
3097 // configure in externalIPs are left as an exercise for the reader, since there
3098 // can be more than one and the message only anyway allows one IP on both sides,
3099 // so anyhow external ip's also have to send different messages so what are they
3100 // for anyway (spoofing?) - but seriously, todo: remove external ips
3101 func NewServerPeer(s *Node, localIP net.IP, isPersistent bool) *NodePeer {
3102 var e error
3103 var host, port string
3104 var ipa net.IP
3105 var p uint64
3106 if s.Config.P2PConnect.V() != nil ||
3107 len(s.Config.P2PConnect.S()) < 1 ||
3108 s.Config.DisableListen.True() ||
3109 s.Config.ProxyAddress.V() != "" ||
3110 s.Config.OnionProxyAddress.V() != "" {
3111 // return an empty IP address if we are not listening (this also is done on
3112 // proxy connections to not leak info)
3113 } else {
3114 myAddress := (s.Config.P2PConnect.S())[0]
3115 if host, port, e = net.SplitHostPort(myAddress); E.Chk(e) {
3116 }
3117 if p, e = strconv.ParseUint(port, 10, 16); E.Chk(e) {
3118 }
3119 // use the given UPNP external address if in use so version message matches
3120 // sender
3121 if s.Config.UPNP.True() {
3122 var exip net.IP
3123 if exip, e = s.NAT.GetExternalAddress(); E.Chk(e) {
3124 } else {
3125 ipa = exip
3126 }
3127 } else {
3128 ipa = net.ParseIP(host)
3129 }
3130 // if the return interface address is given...
3131 if !localIP.Equal(net.IP{0, 0, 0, 0}) {
3132 ipa = localIP
3133 }
3134 I.Ln(ipa, p)
3135 }
3136 return &NodePeer{
3137 Server: s,
3138 Persistent: isPersistent,
3139 Filter: bloom.LoadFilter(nil),
3140 KnownAddresses: make(map[string]struct{}),
3141 Quit: qu.T(),
3142 TxProcessed: qu.Ts(1),
3143 BlockProcessed: qu.Ts(1),
3144 IP: ipa,
3145 Port: uint16(p),
3146 }
3147 }
3148 3149 // ParseListeners determines whether each listen address is IPv4 and IPv6 and returns a slice of appropriate net.Addrs
3150 // to listen on with TCP.
3151 //
3152 // It also properly detects addresses which apply to "all interfaces" and adds the address as both IPv4 and IPv6.
3153 func ParseListeners(addrs []string) ([]net.Addr, error) {
3154 netAddrs := make([]net.Addr, 0, len(addrs)*2)
3155 for _, addr := range addrs {
3156 var host string
3157 var e error
3158 host, _, e = net.SplitHostPort(addr)
3159 if e != nil {
3160 // Shouldn't happen due to already being normalized.
3161 return nil, e
3162 }
3163 // Empty host or host of * on plan9 is both IPv4 and IPv6.
3164 if host == "" || (host == "*" && runtime.GOOS == "plan9") {
3165 netAddrs = append(netAddrs, SimpleAddr{Net: "tcp4", Addr: addr})
3166 netAddrs = append(netAddrs, SimpleAddr{Net: "tcp6", Addr: addr})
3167 continue
3168 }
3169 // Strip IPv6 zone id if present since net.ParseIP does not handle it.
3170 zoneIndex := strings.LastIndex(host, "%")
3171 if zoneIndex > 0 {
3172 host = host[:zoneIndex]
3173 }
3174 // Parse the IP.
3175 ip := net.ParseIP(host)
3176 if ip == nil {
3177 return nil, fmt.Errorf("'%s' is not a valid IP address", host)
3178 }
3179 // To4 returns nil when the IP is not an IPv4 address, so use this determine the
3180 // address type.
3181 if ip.To4() == nil {
3182 netAddrs = append(netAddrs, SimpleAddr{Net: "tcp6", Addr: addr})
3183 } else {
3184 netAddrs = append(netAddrs, SimpleAddr{Net: "tcp4", Addr: addr})
3185 }
3186 }
3187 return netAddrs, nil
3188 }
3189 3190 // RandomUint16Number returns a random uint16 in a specified input range. Note
3191 // that the range is in zeroth ordering; if you pass it 1800, you will get
3192 // values from 0 to 1800.
3193 func RandomUint16Number(max uint16) uint16 {
3194 // In order to avoid modulo bias and ensure every possible outcome in [0, max)
3195 // has equal probability, the random number must be sampled from a random source
3196 // that has a range limited to a multiple of the modulus.
3197 var randomNumber uint16
3198 var limitRange = (math.MaxUint16 / max) * max
3199 for {
3200 e := binary.Read(rand.Reader, binary.LittleEndian, &randomNumber)
3201 if e != nil {
3202 }
3203 if randomNumber < limitRange {
3204 return randomNumber % max
3205 }
3206 }
3207 }
3208 3209 // SetupRPCListeners returns a slice of listeners that are configured for use with the RPC server depending on the
3210 // configuration settings for listen addresses and TLS.
3211 func SetupRPCListeners(config *config.Config, urls []string) ([]net.Listener, error) {
3212 // Setup TLS if not disabled.
3213 listenFunc := net.Listen
3214 if config.ServerTLS.True() {
3215 // Generate the TLS cert and key file if both don't already exist.
3216 if !FileExists(config.RPCKey.V()) && !FileExists(config.RPCCert.V()) {
3217 e := GenCertPair(config.RPCCert.V(), config.RPCKey.V())
3218 if e != nil {
3219 return nil, e
3220 }
3221 }
3222 keyPair, e := tls.LoadX509KeyPair(config.RPCCert.V(), config.RPCKey.V())
3223 if e != nil {
3224 return nil, e
3225 }
3226 tlsConfig := tls.Config{
3227 Certificates: []tls.Certificate{keyPair},
3228 MinVersion: tls.VersionTLS12,
3229 InsecureSkipVerify: config.TLSSkipVerify.True(),
3230 }
3231 // Change the standard net.Listen function to the tls one.
3232 listenFunc = func(net string, laddr string) (net.Listener, error) {
3233 return tls.Listen(net, laddr, &tlsConfig)
3234 }
3235 }
3236 netAddrs, e := ParseListeners(urls)
3237 if e != nil {
3238 return nil, e
3239 }
3240 listeners := make([]net.Listener, 0, len(netAddrs))
3241 for _, addr := range netAddrs {
3242 listener, e := listenFunc(addr.Network(), addr.String())
3243 if e != nil {
3244 E.F("can't listen on %s: %v", addr, e)
3245 continue
3246 }
3247 listeners = append(listeners, listener)
3248 }
3249 return listeners, nil
3250 }
3251 3252 // FileExists reports whether the named file or directory exists.
3253 func FileExists(name string) bool {
3254 if _, e := os.Stat(name); E.Chk(e) {
3255 if os.IsNotExist(e) {
3256 return false
3257 }
3258 }
3259 return true
3260 }
3261 3262 func GetBlkTemplateGenerator(node *Node, cfg *config.Config, stateCfg *active.Config) *mining.BlkTmplGenerator {
3263 D.Ln("getting a block template generator")
3264 return mining.NewBlkTmplGenerator(
3265 &mining.Policy{
3266 BlockMinWeight: uint32(cfg.BlockMinWeight.V()),
3267 BlockMaxWeight: uint32(cfg.BlockMaxWeight.V()),
3268 BlockMinSize: uint32(cfg.BlockMinSize.V()),
3269 BlockMaxSize: uint32(cfg.BlockMaxSize.V()),
3270 BlockPrioritySize: uint32(cfg.BlockPrioritySize.V()),
3271 TxMinFreeFee: stateCfg.ActiveMinRelayTxFee,
3272 },
3273 node.ChainParams,
3274 node.TxMemPool,
3275 node.Chain,
3276 node.TimeSource,
3277 node.SigCache,
3278 node.HashCache,
3279 )
3280 }
3281