1 package netsync
2 3 import (
4 "container/list"
5 "fmt"
6 "net"
7 "sync"
8 "sync/atomic"
9 "time"
10 11 block2 "github.com/p9c/p9/pkg/block"
12 13 "github.com/p9c/p9/pkg/qu"
14 15 "github.com/p9c/p9/pkg/blockchain"
16 "github.com/p9c/p9/pkg/chaincfg"
17 "github.com/p9c/p9/pkg/chainhash"
18 "github.com/p9c/p9/pkg/database"
19 "github.com/p9c/p9/pkg/mempool"
20 peerpkg "github.com/p9c/p9/pkg/peer"
21 "github.com/p9c/p9/pkg/util"
22 "github.com/p9c/p9/pkg/wire"
23 )
type (
	// SyncManager is used to communicate block related messages with peers. The
	// SyncManager is started by calling Start(), which launches the block handler
	// goroutine. Once started, it selects peers to sync from and starts the
	// initial block download. Once the chain is in sync, the SyncManager handles
	// incoming block and header notifications and relays announcements of new
	// blocks to peers.
	SyncManager struct {
		peerNotifier PeerNotifier
		// started and shutdown are flags manipulated with sync/atomic by Start,
		// Stop and the public queueing methods.
		started        int32
		shutdown       int32
		chain          *blockchain.BlockChain
		txMemPool      *mempool.TxPool
		chainParams    *chaincfg.Params
		progressLogger *blockProgressLogger
		// msgChan carries every request to the blockHandler goroutine.
		msgChan chan interface{}
		// wg tracks the blockHandler goroutine so Stop can wait for it.
		wg sync.WaitGroup
		// quit is signalled by Stop and watched by blockHandler.
		quit qu.C
		// These fields should only be accessed from the blockHandler thread
		rejectedTxns    map[chainhash.Hash]struct{}
		requestedTxns   map[chainhash.Hash]struct{}
		requestedBlocks map[chainhash.Hash]struct{}
		syncPeer        *peerpkg.Peer
		peerStates      map[*peerpkg.Peer]*peerSyncState
		// The following fields are used for headers-first mode.
		headersFirstMode bool
		headerList       *list.List
		startHeader      *list.Element
		nextCheckpoint   *chaincfg.Checkpoint
		// An optional fee estimator.
		feeEstimator *mempool.FeeEstimator
	}
	// blockMsg packages a bitcoin block message and the peer it came from together
	// so the block handler has access to that information. The block handler sends
	// on reply once the block has been handled.
	blockMsg struct {
		block *block2.Block
		peer  *peerpkg.Peer
		reply qu.C
	}
	// donePeerMsg signifies a newly disconnected peer to the block handler.
	donePeerMsg struct {
		peer *peerpkg.Peer
	}
	// getSyncPeerMsg is a message type to be sent across the message channel for
	// retrieving the current sync peer. The peer ID (0 when there is no sync peer)
	// is delivered on reply.
	getSyncPeerMsg struct {
		reply chan int32
	}
	// headerNode is used as a node in a list of headers that are linked together
	// between checkpoints.
	headerNode struct {
		height int32
		hash   *chainhash.Hash
	}
	// headersMsg packages a bitcoin headers message and the peer it came from
	// together so the block handler has access to that information.
	headersMsg struct {
		headers *wire.MsgHeaders
		peer    *peerpkg.Peer
	}
	// invMsg packages a bitcoin inv message and the peer it came from together so
	// the block handler has access to that information.
	invMsg struct {
		inv  *wire.MsgInv
		peer *peerpkg.Peer
	}
	// isCurrentMsg is a message type to be sent across the message channel for
	// requesting whether or not the sync manager believes it is synced with the
	// currently connected peers.
	isCurrentMsg struct {
		reply chan bool
	}
	// newPeerMsg signifies a newly connected peer to the block handler.
	newPeerMsg struct {
		peer *peerpkg.Peer
	}
	// pauseMsg is a message type to be sent across the message channel for pausing
	// the sync manager. This effectively provides the caller with exclusive access
	// over the manager: the block handler blocks receiving from the unpause
	// channel and resumes only once that receive completes.
	pauseMsg struct {
		unpause qu.C
	}
	// peerSyncState stores additional information that the SyncManager tracks about
	// a peer.
	peerSyncState struct {
		syncCandidate   bool
		requestQueue    []*wire.InvVect
		requestedTxns   map[chainhash.Hash]struct{}
		requestedBlocks map[chainhash.Hash]struct{}
	}
	// processBlockMsg is a message type to be sent across the message channel for
	// requesting that a block be processed. Note this differs from blockMsg above
	// in that blockMsg is intended for blocks that came from peers and have extra
	// handling, whereas this message essentially is just a concurrent safe way to
	// call ProcessBlock on the internal block chain instance.
	processBlockMsg struct {
		block *block2.Block
		flags blockchain.BehaviorFlags
		reply chan processBlockResponse
	}
	// processBlockResponse is a response sent to the reply channel of a
	// processBlockMsg.
	processBlockResponse struct {
		isOrphan bool
		err      error
	}
	// txMsg packages a bitcoin tx message and the peer it came from together so the
	// block handler has access to that information. The block handler sends on
	// reply once the transaction has been handled.
	txMsg struct {
		tx    *util.Tx
		peer  *peerpkg.Peer
		reply qu.C
	}
)
const (
	// minInFlightBlocks is the minimum number of blocks that should be in the
	// request queue for headers-first mode before requesting more.
	minInFlightBlocks = 10
	// maxRejectedTxns is the maximum number of rejected transaction hashes to
	// store in memory.
	maxRejectedTxns = 1000
	// maxRequestedBlocks is the maximum number of requested block hashes to store
	// in memory.
	maxRequestedBlocks = wire.MaxInvPerMsg
	// maxRequestedTxns is the maximum number of requested transaction hashes to
	// store in memory.
	maxRequestedTxns = wire.MaxInvPerMsg
)
// zeroHash is the zero value hash (all zeros). It is passed as the stop hash
// when requesting blocks up to the end of the chain.
var zeroHash chainhash.Hash
155 156 // DonePeer informs the blockmanager that a peer has disconnected.
157 func (sm *SyncManager) DonePeer(peer *peerpkg.Peer) {
158 // Ignore if we are shutting down.
159 if atomic.LoadInt32(&sm.shutdown) != 0 {
160 return
161 }
162 sm.msgChan <- &donePeerMsg{peer: peer}
163 }
164 165 // IsCurrent returns whether or not the sync manager believes it is synced with
166 // the connected peers.
167 func (sm *SyncManager) IsCurrent() bool {
168 reply := make(chan bool)
169 sm.msgChan <- isCurrentMsg{reply: reply}
170 return <-reply
171 }
172 173 // NewPeer informs the sync manager of a newly active peer.
174 func (sm *SyncManager) NewPeer(peer *peerpkg.Peer) {
175 // Ignore if we are shutting down.
176 if atomic.LoadInt32(&sm.shutdown) != 0 {
177 return
178 }
179 sm.msgChan <- &newPeerMsg{peer: peer}
180 }
181 182 // Pause pauses the sync manager until the returned channel is closed.
183 //
184 // Note that while paused, all peer and block processing is halted. The message
185 // sender should avoid pausing the sync manager for long durations.
186 func (sm *SyncManager) Pause() chan<- struct{} {
187 c := qu.T()
188 sm.msgChan <- pauseMsg{c}
189 return c
190 }
191 192 // ProcessBlock makes use of ProcessBlock on an internal instance of a block
193 // chain.
194 func (sm *SyncManager) ProcessBlock(block *block2.Block, flags blockchain.BehaviorFlags) (bool, error) {
195 T.Ln("processing block")
196 // Traces(block)
197 reply := make(chan processBlockResponse, 1)
198 T.Ln("sending to msgChan")
199 sm.msgChan <- processBlockMsg{block: block, flags: flags, reply: reply}
200 T.Ln("waiting on reply")
201 response := <-reply
202 return response.isOrphan, response.err
203 }
204 205 // QueueBlock adds the passed block message and peer to the block handling
206 // queue. Responds to the done channel argument after the block message is
207 // processed.
208 func (sm *SyncManager) QueueBlock(block *block2.Block, peer *peerpkg.Peer, done qu.C) {
209 // Don't accept more blocks if we're shutting down.
210 if atomic.LoadInt32(&sm.shutdown) != 0 {
211 done <- struct{}{}
212 return
213 }
214 sm.msgChan <- &blockMsg{block: block, peer: peer, reply: done}
215 }
216 217 // QueueHeaders adds the passed headers message and peer to the block handling
218 // queue.
219 func (sm *SyncManager) QueueHeaders(headers *wire.MsgHeaders, peer *peerpkg.Peer) {
220 // No channel handling here because peers do not need to block on headers
221 // messages.
222 if atomic.LoadInt32(&sm.shutdown) != 0 {
223 return
224 }
225 sm.msgChan <- &headersMsg{headers: headers, peer: peer}
226 }
227 228 // QueueInv adds the passed inv message and peer to the block handling queue.
229 func (sm *SyncManager) QueueInv(inv *wire.MsgInv, peer *peerpkg.Peer) {
230 // No channel handling here because peers do not need to block on inv messages.
231 if atomic.LoadInt32(&sm.shutdown) != 0 {
232 return
233 }
234 sm.msgChan <- &invMsg{inv: inv, peer: peer}
235 }
236 237 // QueueTx adds the passed transaction message and peer to the block handling
238 // queue. Responds to the done channel argument after the tx message is
239 // processed.
240 func (sm *SyncManager) QueueTx(tx *util.Tx, peer *peerpkg.Peer, done qu.C) {
241 // Don't accept more transactions if we're shutting down.
242 if atomic.LoadInt32(&sm.shutdown) != 0 {
243 done <- struct{}{}
244 return
245 }
246 sm.msgChan <- &txMsg{tx: tx, peer: peer, reply: done}
247 }
248 249 // Start begins the core block handler which processes block and inv messages.
250 func (sm *SyncManager) Start() {
251 // Already started?
252 if atomic.AddInt32(&sm.started, 1) != 1 {
253 return
254 }
255 T.Ln("starting sync manager")
256 sm.wg.Add(1)
257 go sm.blockHandler(0)
258 }
259 260 // Stop gracefully shuts down the sync manager by stopping all asynchronous
261 // handlers and waiting for them to finish.
262 func (sm *SyncManager) Stop() (e error) {
263 if atomic.AddInt32(&sm.shutdown, 1) != 1 {
264 D.Ln("sync manager is already in the process of shutting down")
265 return nil
266 }
267 // DEBUG{"sync manager shutting down"}
268 sm.quit.Q()
269 sm.wg.Wait()
270 return nil
271 }
272 273 // SyncPeerID returns the ID of the current sync peer, or 0 if there is none.
274 func (sm *SyncManager) SyncPeerID() int32 {
275 reply := make(chan int32)
276 sm.msgChan <- getSyncPeerMsg{reply: reply}
277 return <-reply
278 }
279 280 // blockHandler is the main handler for the sync manager. It must be run as a
281 // goroutine. It processes block and inv messages in a separate goroutine from
282 // the peer handlers so the block (Block) messages are handled by a single
283 // thread without needing to lock memory data structures. This is important
284 // because the sync manager controls which blocks are needed and how the
285 // fetching should proceed.
286 func (sm *SyncManager) blockHandler(workerNumber uint32) {
287 out:
288 for {
289 select {
290 case m := <-sm.msgChan:
291 switch msg := m.(type) {
292 case *newPeerMsg:
293 sm.handleNewPeerMsg(msg.peer)
294 case *txMsg:
295 sm.handleTxMsg(msg)
296 msg.reply <- struct{}{}
297 case *blockMsg:
298 sm.handleBlockMsg(0, msg)
299 msg.reply <- struct{}{}
300 case *invMsg:
301 sm.handleInvMsg(msg)
302 case *headersMsg:
303 sm.handleHeadersMsg(msg)
304 case *donePeerMsg:
305 sm.handleDonePeerMsg(msg.peer)
306 case getSyncPeerMsg:
307 var peerID int32
308 if sm.syncPeer != nil {
309 peerID = sm.syncPeer.ID()
310 }
311 msg.reply <- peerID
312 case processBlockMsg:
313 T.Ln("received processBlockMsg")
314 var heightUpdate int32
315 header := &msg.block.WireBlock().Header
316 T.Ln("checking if have should have serialized block height")
317 if blockchain.ShouldHaveSerializedBlockHeight(header) {
318 T.Ln("reading coinbase transaction")
319 mbt := msg.block.Transactions()
320 if len(mbt) > 0 {
321 coinbaseTx := mbt[len(mbt)-1]
322 T.Ln("extracting coinbase height")
323 var e error
324 var cbHeight int32
325 if cbHeight, e = blockchain.ExtractCoinbaseHeight(coinbaseTx); E.Chk(e) {
326 W.Ln("unable to extract height from coinbase tx:", e)
327 } else {
328 heightUpdate = cbHeight
329 }
330 } else {
331 D.Ln("no transactions in block??")
332 }
333 }
334 T.Ln("passing to chain.ProcessBlock")
335 var isOrphan bool
336 var e error
337 if _, isOrphan, e = sm.chain.ProcessBlock(
338 workerNumber,
339 msg.block,
340 msg.flags,
341 heightUpdate,
342 ); D.Chk(e) {
343 D.Ln("error processing new block ", e)
344 msg.reply <- processBlockResponse{
345 isOrphan: false,
346 err: e,
347 }
348 }
349 T.Ln("sending back message on reply channel")
350 msg.reply <- processBlockResponse{
351 isOrphan: isOrphan,
352 err: nil,
353 }
354 T.Ln("sent reply")
355 case isCurrentMsg:
356 msg.reply <- sm.current()
357 case pauseMsg:
358 // Wait until the sender unpauses the manager.
359 <-msg.unpause
360 default:
361 T.F("invalid message type in block handler: %Ter", msg)
362 }
363 case <-sm.quit.Wait():
364 break out
365 }
366 }
367 sm.wg.Done()
368 }
369 370 // current returns true if we believe we are synced with our peers, false if we
371 // still have blocks to check
372 func (sm *SyncManager) current() bool {
373 if !sm.chain.IsCurrent() {
374 return false
375 }
376 // if blockChain thinks we are current and we have no syncPeer it is probably
377 // right.
378 if sm.syncPeer == nil {
379 return true
380 }
381 // No matter what chain thinks, if we are below the block we are syncing to we
382 // are not current.
383 if sm.chain.BestSnapshot().Height < sm.syncPeer.LastBlock() {
384 return false
385 }
386 return true
387 }
388 389 // fetchHeaderBlocks creates and sends a request to the syncPeer for the next
390 // list of blocks to be downloaded based on the current list of headers.
391 func (sm *SyncManager) fetchHeaderBlocks() {
392 // Nothing to do if there is no start header.
393 if sm.startHeader == nil {
394 D.Ln("fetchHeaderBlocks called with no start header")
395 return
396 }
397 // Build up a getdata request for the list of blocks the headers describe. The
398 // size hint will be limited to wire.MaxInvPerMsg by the function, so no need to
399 // double check it here.
400 gdmsg := wire.NewMsgGetDataSizeHint(uint(sm.headerList.Len()))
401 numRequested := 0
402 var sh *list.Element
403 for sh = sm.startHeader; sh != nil; sh = sh.Next() {
404 node, ok := sh.Value.(*headerNode)
405 if !ok {
406 D.Ln("header list node type is not a headerNode")
407 continue
408 }
409 iv := wire.NewInvVect(wire.InvTypeBlock, node.hash)
410 haveInv, e := sm.haveInventory(iv)
411 if e != nil {
412 T.Ln(
413 "unexpected failure when checking for existing inventory during header block fetch:",
414 e,
415 )
416 }
417 var ee error
418 if !haveInv {
419 syncPeerState := sm.peerStates[sm.syncPeer]
420 sm.requestedBlocks[*node.hash] = struct{}{}
421 syncPeerState.requestedBlocks[*node.hash] = struct{}{}
422 // If we're fetching from a witness enabled peer post-fork, then ensure that we
423 // receive all the witness data in the blocks.
424 // if sm.syncPeer.IsWitnessEnabled() {
425 // iv.Type = wire.InvTypeWitnessBlock
426 // }
427 ee = gdmsg.AddInvVect(iv)
428 if ee != nil {
429 D.Ln(ee)
430 }
431 numRequested++
432 }
433 sm.startHeader = sh.Next()
434 if numRequested >= wire.MaxInvPerMsg {
435 break
436 }
437 }
438 if len(gdmsg.InvList) > 0 {
439 sm.syncPeer.QueueMessage(gdmsg, nil)
440 }
441 }
442 443 // findNextHeaderCheckpoint returns the next checkpoint after the passed height.
444 // It returns nil when there is not one either because the height is already
445 // later than the final checkpoint or some other reason such as disabled
446 // checkpoints.
447 func (sm *SyncManager) findNextHeaderCheckpoint(height int32) *chaincfg.Checkpoint {
448 checkpoints := sm.chain.Checkpoints()
449 if len(checkpoints) == 0 {
450 return nil
451 }
452 // There is no next checkpoint if the height is already after the final checkpoint.
453 finalCheckpoint := &checkpoints[len(checkpoints)-1]
454 if height >= finalCheckpoint.Height {
455 return nil
456 }
457 // Find the next checkpoint.
458 nextCheckpoint := finalCheckpoint
459 for i := len(checkpoints) - 2; i >= 0; i-- {
460 if height >= checkpoints[i].Height {
461 break
462 }
463 nextCheckpoint = &checkpoints[i]
464 }
465 return nextCheckpoint
466 }
467 468 // handleBlockMsg handles block messages from all peers.
469 func (sm *SyncManager) handleBlockMsg(workerNumber uint32, bmsg *blockMsg) {
470 pp := bmsg.peer
471 state, exists := sm.peerStates[pp]
472 if !exists {
473 T.Ln(
474 "received block message from unknown peer", pp,
475 )
476 return
477 }
478 // If we didn't ask for this block then the peer is misbehaving.
479 blockHash := bmsg.block.Hash()
480 if _, exists = state.requestedBlocks[*blockHash]; !exists {
481 // The regression test intentionally sends some blocks twice to test duplicate
482 // block insertion fails. Don't disconnect the peer or ignore the block when
483 // we're in regression test mode in this case so the chain code is actually fed
484 // the duplicate blocks.
485 if sm.chainParams != &chaincfg.RegressionTestParams {
486 W.C(
487 func() string {
488 return fmt.Sprintf(
489 "got unrequested block %v from %s -- disconnecting",
490 blockHash,
491 pp.Addr(),
492 )
493 },
494 )
495 pp.Disconnect()
496 return
497 }
498 }
499 // When in headers-first mode, if the block matches the hash of the first header
500 // in the list of headers that are being fetched, it's eligible for less
501 // validation since the headers have already been verified to link together and
502 // are valid up to the next checkpoint. Also, remove the list entry for all
503 // blocks except the checkpoint since it is needed to verify the next round of
504 // headers links properly.
505 isCheckpointBlock := false
506 behaviorFlags := blockchain.BFNone
507 if sm.headersFirstMode {
508 firstNodeEl := sm.headerList.Front()
509 if firstNodeEl != nil {
510 firstNode := firstNodeEl.Value.(*headerNode)
511 if blockHash.IsEqual(firstNode.hash) {
512 behaviorFlags |= blockchain.BFFastAdd
513 if firstNode.hash.IsEqual(sm.nextCheckpoint.Hash) {
514 isCheckpointBlock = true
515 } else {
516 sm.headerList.Remove(firstNodeEl)
517 }
518 }
519 }
520 }
521 // Remove block from request maps. Either chain will know about it and so we
522 // shouldn't have any more instances of trying to fetch it, or we will fail the
523 // insert and thus we'll retry next time we get an inv.
524 delete(state.requestedBlocks, *blockHash)
525 delete(sm.requestedBlocks, *blockHash)
526 var heightUpdate int32
527 var blkHashUpdate *chainhash.Hash
528 header := &bmsg.block.WireBlock().Header
529 if blockchain.ShouldHaveSerializedBlockHeight(header) {
530 coinbaseTx := bmsg.block.Transactions()[0]
531 cbHeight, e := blockchain.ExtractCoinbaseHeight(coinbaseTx)
532 if e != nil {
533 T.F(
534 "unable to extract height from coinbase tx: %v",
535 e,
536 )
537 } else {
538 heightUpdate = cbHeight
539 blkHashUpdate = blockHash
540 }
541 }
542 D.Ln("current best height", sm.chain.BestChain.Height())
543 _, isOrphan, e := sm.chain.ProcessBlock(
544 workerNumber, bmsg.block,
545 behaviorFlags, heightUpdate,
546 )
547 if e != nil {
548 if heightUpdate+1 <= sm.chain.BestChain.Height() {
549 // Process the block to include validation, best chain selection, orphan handling, etc.
550 // When the error is a rule error, it means the block was simply rejected as
551 // opposed to something actually going wrong, so log it as such. Otherwise,
552 // something really did go wrong, so log it as an actual error.
553 // Convert the error into an appropriate reject message and send it.
554 if _, ok := e.(blockchain.RuleError); ok {
555 E.F(
556 "rejected block %v from %s: %v",
557 blockHash, pp, e,
558 )
559 } else {
560 E.F("failed to process block %v: %v", blockHash, e)
561 }
562 if dbErr, ok := e.(database.DBError); ok && dbErr.ErrorCode ==
563 database.ErrCorruption {
564 panic(dbErr)
565 }
566 code, reason := mempool.ErrToRejectErr(e)
567 pp.PushRejectMsg(wire.CmdBlock, code, reason, blockHash, false)
568 return
569 } else {
570 isOrphan=true
571 }
572 }
573 // Meta-data about the new block this peer is reporting. We use this below to
574 // update this peer's lastest block height and the heights of other peers based
575 // on their last announced block hash. This allows us to dynamically update the
576 // block heights of peers, avoiding stale heights when looking for a new sync
577 // peer. Upon acceptance of a block or recognition of an orphan, we also use
578 // this information to update the block heights over other peers who's invs may
579 // have been ignored if we are actively syncing while the chain is not yet
580 // current or who may have lost the lock announcment race. Request the parents
581 // for the orphan block from the peer that sent it.
582 if isOrphan {
583 // We've just received an orphan block from a peer. In order to update the
584 // height of the peer, we try to extract the block height from the scriptSig of
585 // the coinbase transaction. Extraction is only attempted if the block's version
586 // is high enough (ver 2+).
587 header := &bmsg.block.WireBlock().Header
588 if blockchain.ShouldHaveSerializedBlockHeight(header) {
589 coinbaseTx := bmsg.block.Transactions()[0]
590 var cbHeight int32
591 cbHeight, e = blockchain.ExtractCoinbaseHeight(coinbaseTx)
592 if e != nil {
593 E.F("unable to extract height from coinbase tx: %v", e)
594 } else {
595 D.F(
596 "extracted height of %v from orphan block",
597 cbHeight,
598 )
599 heightUpdate = cbHeight
600 blkHashUpdate = blockHash
601 }
602 }
603 orphanRoot := sm.chain.GetOrphanRoot(blockHash)
604 var locator blockchain.BlockLocator
605 locator, e = sm.chain.LatestBlockLocator()
606 if e != nil {
607 E.F(
608 "failed to get block locator for the latest block: %v",
609 e,
610 )
611 } else {
612 e = pp.PushGetBlocksMsg(locator, orphanRoot)
613 if e != nil {
614 }
615 }
616 } else {
617 // When the block is not an orphan, log information about it and update the
618 // chain state.
619 sm.progressLogger.LogBlockHeight(bmsg.block)
620 // Update this peer's latest block height, for future potential sync node
621 // candidacy.
622 best := sm.chain.BestSnapshot()
623 heightUpdate = best.Height
624 blkHashUpdate = &best.Hash
625 // Clear the rejected transactions.
626 sm.rejectedTxns = make(map[chainhash.Hash]struct{})
627 }
628 // Update the block height for this peer. But only send a message to the server
629 // for updating peer heights if this is an orphan or our chain is "current".
630 // This avoids sending a spammy amount of messages if we're syncing the chain
631 // from scratch.
632 if blkHashUpdate != nil && heightUpdate != 0 {
633 pp.UpdateLastBlockHeight(heightUpdate)
634 if isOrphan || sm.current() {
635 go sm.peerNotifier.UpdatePeerHeights(
636 blkHashUpdate, heightUpdate,
637 pp,
638 )
639 }
640 }
641 // Nothing more to do if we aren't in headers-first mode.
642 if !sm.headersFirstMode {
643 return
644 }
645 // This is headers-first mode, so if the block is not a checkpoint request more
646 // blocks using the header list when the request queue is getting short.
647 if !isCheckpointBlock {
648 if sm.startHeader != nil &&
649 len(state.requestedBlocks) < minInFlightBlocks {
650 sm.fetchHeaderBlocks()
651 }
652 return
653 }
654 // This is headers-first mode and the block is a checkpoint. When there is a
655 // next checkpoint, get the next round of headers by asking for headers starting
656 // from the block after this one up to the next checkpoint.
657 prevHeight := sm.nextCheckpoint.Height
658 prevHash := sm.nextCheckpoint.Hash
659 sm.nextCheckpoint = sm.findNextHeaderCheckpoint(prevHeight)
660 if sm.nextCheckpoint != nil {
661 locator := blockchain.BlockLocator([]*chainhash.Hash{prevHash})
662 e = pp.PushGetHeadersMsg(locator, sm.nextCheckpoint.Hash)
663 if e != nil {
664 E.F(
665 "failed to send getheaders message to peer %s: %v",
666 pp.Addr(), e,
667 )
668 return
669 }
670 I.F(
671 "downloading headers for blocks %d to %d from peer %s",
672 prevHeight+1, sm.nextCheckpoint.Height, sm.syncPeer.Addr(),
673 )
674 return
675 }
676 // This is headers-first mode, the block is a checkpoint, and there are no more
677 // checkpoints, so switch to normal mode by requesting blocks from the block
678 // after this one up to the end of the chain (zero hash).
679 sm.headersFirstMode = false
680 sm.headerList.Init()
681 I.Ln(
682 "reached the final checkpoint -- switching to normal mode",
683 )
684 locator := blockchain.BlockLocator([]*chainhash.Hash{blockHash})
685 e = pp.PushGetBlocksMsg(locator, &zeroHash)
686 if e != nil {
687 E.Ln(
688 "failed to send getblocks message to peer", pp, ":", e,
689 )
690 return
691 }
692 }
693 694 // handleBlockchainNotification handles notifications from blockchain. It does
695 // things such as request orphan block parents and relay accepted blocks to
696 // connected peers.
697 func (sm *SyncManager) handleBlockchainNotification(notification *blockchain.Notification) {
698 switch notification.Type {
699 // A block has been accepted into the block chain. Relay it to other peers.
700 case blockchain.NTBlockAccepted:
701 // Don't relay if we are not current. Other peers that are current should
702 // already know about it.
703 if !sm.current() {
704 return
705 }
706 block, ok := notification.Data.(*block2.Block)
707 if !ok {
708 D.Ln("chain accepted notification is not a block")
709 break
710 }
711 // Generate the inventory vector and relay it.
712 iv := wire.NewInvVect(wire.InvTypeBlock, block.Hash())
713 sm.peerNotifier.RelayInventory(iv, block.WireBlock().Header)
714 // A block has been connected to the main block chain.
715 case blockchain.NTBlockConnected:
716 block, ok := notification.Data.(*block2.Block)
717 if !ok {
718 D.Ln("chain connected notification is not a block")
719 break
720 }
721 // Remove all of the transactions (except the coinbase) in the connected block
722 // from the transaction pool. Secondly, remove any transactions which are now
723 // double spends as a result of these new transactions. Finally, remove any
724 // transaction that is no longer an orphan. Transactions which depend on a
725 // confirmed transaction are NOT removed recursively because they are still
726 // valid.
727 for _, tx := range block.Transactions()[1:] {
728 sm.txMemPool.RemoveTransaction(tx, false)
729 sm.txMemPool.RemoveDoubleSpends(tx)
730 sm.txMemPool.RemoveOrphan(tx)
731 sm.peerNotifier.TransactionConfirmed(tx)
732 acceptedTxs := sm.txMemPool.ProcessOrphans(sm.chain, tx)
733 sm.peerNotifier.AnnounceNewTransactions(acceptedTxs)
734 }
735 // Register block with the fee estimator, if it exists.
736 if sm.feeEstimator != nil {
737 e := sm.feeEstimator.RegisterBlock(block)
738 // If an error is somehow generated then the fee estimator has entered an
739 // invalid state. Since it doesn't know how to recover, create a new one.
740 if e != nil {
741 sm.feeEstimator = mempool.NewFeeEstimator(
742 mempool.DefaultEstimateFeeMaxRollback,
743 mempool.DefaultEstimateFeeMinRegisteredBlocks,
744 )
745 }
746 }
747 // A block has been disconnected from the main block chain.
748 case blockchain.NTBlockDisconnected:
749 block, ok := notification.Data.(*block2.Block)
750 if !ok {
751 D.Ln("chain disconnected notification is not a block.")
752 break
753 }
754 // Reinsert all of the transactions (except the coinbase) into the transaction pool.
755 for _, tx := range block.Transactions()[1:] {
756 var ee error
757 _, _, ee = sm.txMemPool.MaybeAcceptTransaction(
758 sm.chain, tx,
759 false, false,
760 )
761 if ee != nil {
762 // Remove the transaction and all transactions that depend on it if it wasn't
763 // accepted into the transaction pool.
764 sm.txMemPool.RemoveTransaction(tx, true)
765 }
766 }
767 // Rollback previous block recorded by the fee estimator.
768 if sm.feeEstimator != nil {
769 e := sm.feeEstimator.Rollback(block.Hash())
770 if e != nil {
771 }
772 }
773 }
774 }
775 776 // handleDonePeerMsg deals with peers that have signalled they are done. It
777 // removes the peer as a candidate for syncing and in the case where it was the
778 // current sync peer, attempts to select a new best peer to sync from. It is
779 // invoked from the syncHandler goroutine.
780 func (sm *SyncManager) handleDonePeerMsg(peer *peerpkg.Peer) {
781 state, exists := sm.peerStates[peer]
782 if !exists {
783 T.Ln("received done peer message for unknown peer", peer)
784 return
785 }
786 // Remove the peer from the list of candidate peers.
787 delete(sm.peerStates, peer)
788 T.Ln("lost peer ", peer)
789 // Remove requested transactions from the global map so that they will be
790 // fetched from elsewhere next time we get an inv.
791 for txHash := range state.requestedTxns {
792 delete(sm.requestedTxns, txHash)
793 }
794 // Remove requested blocks from the global map so that they will be fetched from
795 // elsewhere next time we get an inv.
796 //
797 // TODO: we could possibly here check which peers have these blocks and request them now to speed things up a little.
798 for blockHash := range state.requestedBlocks {
799 delete(sm.requestedBlocks, blockHash)
800 }
801 // Attempt to find a new peer to sync from if the quitting peer is the sync
802 // peer. Also, reset the headers-first state if in headers-first mode so
803 if sm.syncPeer == peer {
804 sm.syncPeer = nil
805 if sm.headersFirstMode {
806 best := sm.chain.BestSnapshot()
807 sm.resetHeaderState(&best.Hash, best.Height)
808 }
809 sm.startSync()
810 }
811 }
812 813 // handleHeadersMsg handles block header messages from all peers. Headers are
814 // requested when performing a headers-first sync.
815 func (sm *SyncManager) handleHeadersMsg(hmsg *headersMsg) {
816 peer := hmsg.peer
817 _, exists := sm.peerStates[peer]
818 if !exists {
819 T.Ln("received headers message from unknown peer", peer)
820 return
821 }
822 // The remote peer is misbehaving if we didn't request headers.
823 msg := hmsg.headers
824 numHeaders := len(msg.Headers)
825 if !sm.headersFirstMode {
826 T.F(
827 "got %d unrequested headers from %s -- disconnecting",
828 numHeaders, peer,
829 )
830 peer.Disconnect()
831 return
832 }
833 // Nothing to do for an empty headers message.
834 if numHeaders == 0 {
835 return
836 }
837 // Process all of the received headers ensuring each one connects to the
838 // previous and that checkpoints match.
839 receivedCheckpoint := false
840 var finalHash *chainhash.Hash
841 for _, blockHeader := range msg.Headers {
842 blockHash := blockHeader.BlockHash()
843 finalHash = &blockHash
844 // Ensure there is a previous header to compare against.
845 prevNodeEl := sm.headerList.Back()
846 if prevNodeEl == nil {
847 W.Ln(
848 "header list does not contain a previous element as expected -- disconnecting peer",
849 )
850 peer.Disconnect()
851 return
852 }
853 // Ensure the header properly connects to the previous one and add it to the
854 // list of headers.
855 node := headerNode{hash: &blockHash}
856 prevNode := prevNodeEl.Value.(*headerNode)
857 if prevNode.hash.IsEqual(&blockHeader.PrevBlock) {
858 node.height = prevNode.height + 1
859 e := sm.headerList.PushBack(&node)
860 if sm.startHeader == nil {
861 sm.startHeader = e
862 }
863 } else {
864 T.Ln(
865 "received block header that does not properly connect to the chain from peer",
866 peer,
867 "-- disconnecting",
868 )
869 peer.Disconnect()
870 return
871 }
872 // Verify the header at the next checkpoint height matches.
873 if node.height == sm.nextCheckpoint.Height {
874 if node.hash.IsEqual(sm.nextCheckpoint.Hash) {
875 receivedCheckpoint = true
876 I.F(
877 "verified downloaded block header against checkpoint at height %d/hash %s",
878 node.height,
879 node.hash,
880 )
881 } else {
882 T.F(
883 "block header at height %d/hash %s from peer %s does NOT match expected checkpoint hash of"+
884 " %s -- disconnecting",
885 node.height,
886 node.hash,
887 peer,
888 sm.nextCheckpoint.Hash,
889 )
890 peer.Disconnect()
891 return
892 }
893 break
894 }
895 }
896 // When this header is a checkpoint, switch to fetching the blocks for all of
897 // the headers since the last checkpoint.
898 if receivedCheckpoint {
899 // Since the first entry of the list is always the final block that is already
900 // in the database and is only used to ensure the next header links properly, it
901 // must be removed before fetching the blocks.
902 sm.headerList.Remove(sm.headerList.Front())
903 I.F(
904 "received %v block headers: Fetching blocks",
905 sm.headerList.Len(),
906 )
907 sm.progressLogger.SetLastLogTime(time.Now())
908 sm.fetchHeaderBlocks()
909 return
910 }
911 // This header is not a checkpoint, so request the next batch of headers
912 // starting from the latest known header and ending with the next checkpoint.
913 locator := blockchain.BlockLocator([]*chainhash.Hash{finalHash})
914 e := peer.PushGetHeadersMsg(locator, sm.nextCheckpoint.Hash)
915 if e != nil {
916 E.F(
917 "failed to send getheaders message to peer %s: %v", peer,
918 e,
919 )
920 return
921 }
922 }
923 924 // handleInvMsg handles inv messages from all peers. We examine the inventory
925 // advertised by the remote peer and act accordingly.
926 func (sm *SyncManager) handleInvMsg(imsg *invMsg) {
927 peer := imsg.peer
928 state, exists := sm.peerStates[peer]
929 if !exists {
930 T.Ln("received inv message from unknown peer", peer)
931 return
932 }
933 // Attempt to find the final block in the inventory list. There may not be one.
934 lastBlock := -1
935 invVects := imsg.inv.InvList
936 for i := len(invVects) - 1; i >= 0; i-- {
937 if invVects[i].Type == wire.InvTypeBlock {
938 lastBlock = i
939 break
940 }
941 }
942 // If this inv contains a block announcement, and this isn't coming from our
943 // current sync peer or we're current, then update the last announced block for
944 // this peer. We'll use this information later to update the heights of peers
945 // based on blocks we've accepted that they previously announced.
946 if lastBlock != -1 && (peer != sm.syncPeer || sm.current()) {
947 peer.UpdateLastAnnouncedBlock(&invVects[lastBlock].Hash)
948 }
949 // Ignore invs from peers that aren't the sync if we are not current. Helps prevent fetching a mass of orphans.
950 if peer != sm.syncPeer && !sm.current() {
951 return
952 }
953 // If our chain is current and a peer announces a block we already know of, then update their current block height.
954 if lastBlock != -1 && sm.current() {
955 blkHeight, e := sm.chain.BlockHeightByHash(&invVects[lastBlock].Hash)
956 if e == nil {
957 peer.UpdateLastBlockHeight(blkHeight)
958 }
959 }
960 // Request the advertised inventory if we don't already have it. Also, request
961 // parent blocks of orphans if we receive one we already have. Finally, attempt
962 // to detect potential stalls due to long side chains we already have and
963 // request more blocks to prevent them.
964 for i, iv := range invVects {
965 // Ignore unsupported inventory types.
966 switch iv.Type {
967 case wire.InvTypeBlock:
968 case wire.InvTypeTx:
969 // case wire.InvTypeWitnessBlock:
970 // case wire.InvTypeWitnessTx:
971 default:
972 continue
973 }
974 // Add the inventory to the cache of known inventory for the peer.
975 peer.AddKnownInventory(iv)
976 // Ignore inventory when we're in headers-first mode.
977 if sm.headersFirstMode {
978 continue
979 }
980 // Request the inventory if we don't already have it.
981 haveInv, e := sm.haveInventory(iv)
982 if e != nil {
983 E.Ln("unexpected failure when checking for existing inventory during inv message processing:", e)
984 continue
985 }
986 if !haveInv {
987 if iv.Type == wire.InvTypeTx {
988 // Skip the transaction if it has already been rejected.
989 if _, exists := sm.rejectedTxns[iv.Hash]; exists {
990 continue
991 }
992 }
993 // Ignore invs block invs from non-witness enabled peers, as after segwit
994 // activation we only want to download from peers that can provide us full
995 // witness data for blocks. PARALLELCOIN HAS NO WITNESS STUFF if
996 // !peer.IsWitnessEnabled() && iv.Type == wire.InvTypeBlock {
997 // continue
998 // }
999 // Add it to the request queue.
1000 state.requestQueue = append(state.requestQueue, iv)
1001 continue
1002 }
1003 if iv.Type == wire.InvTypeBlock {
1004 // The block is an orphan block that we already have. When the existing orphan
1005 // was processed, it requested the missing parent blocks. When this scenario
1006 // happens, it means there were more blocks missing than are allowed into a
1007 // single inventory message. As a result, once this peer requested the final
1008 // advertised block, the remote peer noticed and is now resending the orphan
1009 // block as an available block to signal there are more missing blocks that need
1010 // to be requested.
1011 if sm.chain.IsKnownOrphan(&iv.Hash) {
1012 // Request blocks starting at the latest known up to the root of the orphan that just came in.
1013 orphanRoot := sm.chain.GetOrphanRoot(&iv.Hash)
1014 locator, e := sm.chain.LatestBlockLocator()
1015 if e != nil {
1016 E.Ln("failed to get block locator for the latest block:", e)
1017 continue
1018 }
1019 e = peer.PushGetBlocksMsg(locator, orphanRoot)
1020 if e != nil {
1021 }
1022 continue
1023 }
1024 // We already have the final block advertised by this inventory message, so
1025 // force a request for more. This should only happen if we're on a really long
1026 // side chain.
1027 if i == lastBlock {
1028 // Request blocks after this one up to the final one the remote peer knows about
1029 // (zero stop hash).
1030 locator := sm.chain.BlockLocatorFromHash(&iv.Hash)
1031 e := peer.PushGetBlocksMsg(locator, &zeroHash)
1032 if e != nil {
1033 }
1034 }
1035 }
1036 }
1037 // Request as much as possible at once. Anything that won't fit into request
1038 // will be requested on the next inv message.
1039 numRequested := 0
1040 gdmsg := wire.NewMsgGetData()
1041 requestQueue := state.requestQueue
1042 for len(requestQueue) != 0 {
1043 iv := requestQueue[0]
1044 requestQueue[0] = nil
1045 requestQueue = requestQueue[1:]
1046 switch iv.Type {
1047 // case wire.InvTypeWitnessBlock:
1048 // fallthrough
1049 case wire.InvTypeBlock:
1050 // Request the block if there is not already a pending request.
1051 if _, exists := sm.requestedBlocks[iv.Hash]; !exists {
1052 sm.requestedBlocks[iv.Hash] = struct{}{}
1053 sm.limitMap(sm.requestedBlocks, maxRequestedBlocks)
1054 state.requestedBlocks[iv.Hash] = struct{}{}
1055 // if peer.IsWitnessEnabled() {
1056 // iv.Type = wire.InvTypeWitnessBlock
1057 // }
1058 e := gdmsg.AddInvVect(iv)
1059 if e != nil {
1060 }
1061 numRequested++
1062 }
1063 // case wire.InvTypeWitnessTx:
1064 // fallthrough
1065 case wire.InvTypeTx:
1066 // Request the transaction if there is not already a pending request.
1067 if _, exists := sm.requestedTxns[iv.Hash]; !exists {
1068 sm.requestedTxns[iv.Hash] = struct{}{}
1069 sm.limitMap(sm.requestedTxns, maxRequestedTxns)
1070 state.requestedTxns[iv.Hash] = struct{}{}
1071 // If the peer is capable, request the txn including all witness
1072 // data.
1073 // if peer.IsWitnessEnabled() {
1074 // iv.Type = wire.InvTypeWitnessTx
1075 // }
1076 e := gdmsg.AddInvVect(iv)
1077 if e != nil {
1078 }
1079 numRequested++
1080 }
1081 }
1082 if numRequested >= wire.MaxInvPerMsg {
1083 break
1084 }
1085 }
1086 state.requestQueue = requestQueue
1087 if len(gdmsg.InvList) > 0 {
1088 peer.QueueMessage(gdmsg, nil)
1089 }
1090 }
1091 1092 // handleNewPeerMsg deals with new peers that have signalled they may be
1093 // considered as a sync peer (they have already successfully negotiated). It
1094 // also starts syncing if needed. It is invoked from the syncHandler goroutine.
1095 func (sm *SyncManager) handleNewPeerMsg(peer *peerpkg.Peer) {
1096 // Ignore if in the process of shutting down.
1097 if atomic.LoadInt32(&sm.shutdown) != 0 {
1098 return
1099 }
1100 T.F("new valid peer %s (%s)", peer, peer.UserAgent())
1101 // Initialize the peer state
1102 isSyncCandidate := sm.isSyncCandidate(peer)
1103 if isSyncCandidate {
1104 I.Ln(peer, "is a sync candidate")
1105 }
1106 sm.peerStates[peer] = &peerSyncState{
1107 syncCandidate: isSyncCandidate,
1108 requestedTxns: make(map[chainhash.Hash]struct{}),
1109 requestedBlocks: make(map[chainhash.Hash]struct{}),
1110 }
1111 // Start syncing by choosing the best candidate if needed.
1112 if isSyncCandidate && sm.syncPeer == nil {
1113 sm.startSync()
1114 }
1115 }
1116 1117 // handleTxMsg handles transaction messages from all peers.
1118 func (sm *SyncManager) handleTxMsg(tmsg *txMsg) {
1119 peer := tmsg.peer
1120 state, exists := sm.peerStates[peer]
1121 if !exists {
1122 W.C(
1123 func() string {
1124 return "received tx message from unknown peer " +
1125 peer.String()
1126 },
1127 )
1128 return
1129 }
1130 // NOTE: BitcoinJ, and possibly other wallets, don't follow the spec of sending
1131 // an inventory message and allowing the remote peer to decide whether or not
1132 // they want to request the transaction via a getdata message. Unfortunately,
1133 // the reference implementation permits unrequested data, so it has allowed
1134 // wallets that don't follow the spec to proliferate. While this is not ideal,
1135 // there is no check here to disconnect peers for sending unsolicited
1136 // transactions to provide interoperability.
1137 txHash := tmsg.tx.Hash()
1138 // Ignore transactions that we have already rejected. Do not send a reject
1139 // message here because if the transaction was already rejected, the transaction
1140 // was unsolicited.
1141 if _, exists = sm.rejectedTxns[*txHash]; exists {
1142 D.C(
1143 func() string {
1144 return "ignoring unsolicited previously rejected transaction " +
1145 txHash.String() + " from " + peer.String()
1146 },
1147 )
1148 return
1149 }
1150 // Process the transaction to include validation, insertion in the memory pool,
1151 // orphan handling, etc.
1152 acceptedTxs, e := sm.txMemPool.ProcessTransaction(
1153 sm.chain, tmsg.tx,
1154 true, true, mempool.Tag(peer.ID()),
1155 )
1156 // Remove transaction from request maps. Either the mempool/chain already knows
1157 // about it and as such we shouldn't have any more instances of trying to fetch
1158 // it, or we failed to insert and thus we'll retry next time we get an inv.
1159 delete(state.requestedTxns, *txHash)
1160 delete(sm.requestedTxns, *txHash)
1161 if e != nil {
1162 // Do not request this transaction again until a new block has been processed.
1163 sm.rejectedTxns[*txHash] = struct{}{}
1164 sm.limitMap(sm.rejectedTxns, maxRejectedTxns)
1165 // When the error is a rule error, it means the transaction was simply rejected
1166 // as opposed to something actually going wrong, so log it as such. Otherwise,
1167 // something really did go wrong, so log it as an actual error.
1168 if _, ok := e.(mempool.RuleError); ok {
1169 D.F(
1170 "rejected transaction %v from %s: %v",
1171 txHash,
1172 peer,
1173 e,
1174 )
1175 } else {
1176 E.F(
1177 "failed to process transaction %v: %v",
1178 txHash,
1179 e,
1180 )
1181 }
1182 // Convert the error into an appropriate reject message and send it.
1183 code, reason := mempool.ErrToRejectErr(e)
1184 peer.PushRejectMsg(wire.CmdTx, code, reason, txHash, false)
1185 return
1186 }
1187 sm.peerNotifier.AnnounceNewTransactions(acceptedTxs)
1188 }
1189 1190 // haveInventory returns whether or not the inventory represented by the passed
1191 // inventory vector is known. This includes checking all of the various places
1192 // inventory can be when it is in different states such as blocks that are part
1193 // of the main chain, on a side chain, in the orphan pool, and transactions that
1194 // are in the memory pool (either the main pool or orphan pool).
1195 func (sm *SyncManager) haveInventory(invVect *wire.InvVect) (bool, error) {
1196 switch invVect.Type {
1197 // case wire.InvTypeWitnessBlock:
1198 // fallthrough
1199 case wire.InvTypeBlock:
1200 // Ask chain if the block is known to it in any form (main chain, side chain, or
1201 // orphan).
1202 return sm.chain.HaveBlock(&invVect.Hash)
1203 // case wire.InvTypeWitnessTx:
1204 // fallthrough
1205 case wire.InvTypeTx:
1206 // Ask the transaction memory pool if the transaction is known to it in any form
1207 // (main pool or orphan).
1208 if sm.txMemPool.HaveTransaction(&invVect.Hash) {
1209 return true, nil
1210 }
1211 // Chk if the transaction exists from the point of view of the end of the main
1212 // chain. Note that this is only a best effort since it is expensive to check
1213 // existence of every output and the only purpose of this check is to avoid
1214 // downloading already known transactions. Only the first two outputs are
1215 // checked because the vast majority of transactions consist of two outputs
1216 // where one is some form of "pay-to-somebody-else" and the other is a change
1217 // output.
1218 prevOut := wire.OutPoint{Hash: invVect.Hash}
1219 for i := uint32(0); i < 2; i++ {
1220 prevOut.Index = i
1221 entry, e := sm.chain.FetchUtxoEntry(prevOut)
1222 if e != nil {
1223 return false, e
1224 }
1225 if entry != nil && !entry.IsSpent() {
1226 return true, nil
1227 }
1228 }
1229 return false, nil
1230 }
1231 // The requested inventory is is an unsupported type, so just claim it is known
1232 // to avoid requesting it.
1233 return true, nil
1234 }
1235 1236 // isSyncCandidate returns whether or not the peer is a candidate to consider
1237 // syncing from.
1238 func (sm *SyncManager) isSyncCandidate(peer *peerpkg.Peer) bool {
1239 // Typically a peer is not a candidate for sync if it's not a full node, however
1240 // regression test is special in that the regression tool is not a full node and
1241 // still needs to be considered a sync candidate.
1242 if sm.chainParams == &chaincfg.RegressionTestParams {
1243 // The peer is not a candidate if it's not coming from localhost or the hostname
1244 // can't be determined for some reason.
1245 var host string
1246 var e error
1247 host, _, e = net.SplitHostPort(peer.Addr())
1248 if e != nil {
1249 return false
1250 }
1251 if host != "127.0.0.1" && host != "localhost" {
1252 return false
1253 }
1254 // } else {
1255 // // The peer is not a candidate for sync if it's not a full node. Additionally, if the segwit soft-fork package
1256 // // has activated, then the peer must also be upgraded.
1257 // segwitActive, e := sm.chain.IsDeploymentActive(chaincfg.DeploymentSegwit)
1258 // if e != nil {
1259 // // Error("unable to query for segwit soft-fork state:", e)
1260 // }
1261 // nodeServices := peer.Services()
1262 // if nodeServices&wire.SFNodeNetwork != wire.SFNodeNetwork ||
1263 // (segwitActive && !peer.IsWitnessEnabled()) {
1264 // return false
1265 // }
1266 }
1267 // Candidate if all checks passed.
1268 return true
1269 }
1270 1271 // limitMap is a helper function for maps that require a maximum limit by
1272 // evicting a random transaction if adding a new value would cause it to
1273 // overflow the maximum allowed.
1274 func (sm *SyncManager) limitMap(m map[chainhash.Hash]struct{}, limit int) {
1275 if len(m)+1 > limit {
1276 // Remove a random entry from the map. For most compilers, Go's range statement
1277 // iterates starting at a random item although that is not 100% guaranteed by
1278 // the spec. The iteration order is not important here because an adversary
1279 // would have to be able to pull off preimage attacks on the hashing function in
1280 // order to target eviction of specific entries anyways.
1281 for txHash := range m {
1282 delete(m, txHash)
1283 return
1284 }
1285 }
1286 }
1287 1288 // resetHeaderState sets the headers-first mode state to values appropriate for
1289 // syncing from a new peer.
1290 func (sm *SyncManager) resetHeaderState(newestHash *chainhash.Hash, newestHeight int32) {
1291 sm.headersFirstMode = false
1292 sm.headerList.Init()
1293 sm.startHeader = nil
1294 // When there is a next checkpoint, add an entry for the latest known block into
1295 // the header pool. This allows the next downloaded header to prove it links to
1296 // the chain properly.
1297 if sm.nextCheckpoint != nil {
1298 node := headerNode{height: newestHeight, hash: newestHash}
1299 sm.headerList.PushBack(&node)
1300 }
1301 }
1302 1303 // startSync will choose the best peer among the available candidate peers to
1304 // download/sync the blockchain from. When syncing is already running, it simply
1305 // returns. It also examines the candidates for any which are no longer
1306 // candidates and removes them as needed.
1307 func (sm *SyncManager) startSync() {
1308 // Return now if we're already syncing.
1309 if sm.syncPeer != nil {
1310 return
1311 }
1312 // Once the segwit soft-fork package has activated, we only want to sync from
1313 // peers which are witness enabled to ensure that we fully validate all
1314 // blockchain data.
1315 // var e error
1316 // var segwitActive bool
1317 // segwitActive, e = sm.chain.IsDeploymentActive(chaincfg.DeploymentSegwit)
1318 // if e != nil {
1319 // Error("unable to query for segwit soft-fork state:", e)
1320 // return
1321 // }
1322 best := sm.chain.BestSnapshot()
1323 var bestPeer *peerpkg.Peer
1324 for peer, state := range sm.peerStates {
1325 if !state.syncCandidate {
1326 continue
1327 }
1328 // if segwitActive && !peer.IsWitnessEnabled() {
1329 // D.Ln("peer", peer, "not witness enabled, skipping")
1330 // continue
1331 // } Remove sync candidate peers that are no longer candidates due to passing
1332 // their latest known block.
1333 //
1334 // NOTE: The < is intentional as opposed to <=. While technically the peer
1335 // doesn't have a later block when it's equal, it will likely have one soon so
1336 // it is a reasonable choice. It also allows the case where both are at 0 such
1337 // as during regression test.
1338 if peer.LastBlock() < best.Height {
1339 // state.syncCandidate = false
1340 continue
1341 }
1342 // TODO(davec): Use a better algorithm to choose the best peer. For now, just pick the first available candidate.
1343 bestPeer = peer
1344 }
1345 // Start syncing from the best peer if one was selected.
1346 if bestPeer != nil {
1347 // Clear the requestedBlocks if the sync peer changes, otherwise we may ignore blocks we need that the last sync
1348 // peer failed to send.
1349 sm.requestedBlocks = make(map[chainhash.Hash]struct{})
1350 locator, e := sm.chain.LatestBlockLocator()
1351 if e != nil {
1352 E.Ln("failed to get block locator for the latest block:", e)
1353 return
1354 }
1355 T.C(
1356 func() string {
1357 return fmt.Sprintf("syncing to block height %d from peer %v", bestPeer.LastBlock(), bestPeer.Addr())
1358 },
1359 )
1360 // When the current height is less than a known checkpoint we can use block
1361 // headers to learn about which blocks comprise the chain up to the checkpoint
1362 // and perform less validation for them. This is possible since each header
1363 // contains the hash of the previous header and a merkle root.
1364 //
1365 // Therefore if we validate all of the received headers link together properly
1366 // and the checkpoint hashes match, we can be sure the hashes for the blocks in
1367 // between are accurate. Further, once the full blocks are downloaded, the
1368 // merkle root is computed and compared against the value in the header which
1369 // proves the full block hasn't been tampered with. Once we have passed the
1370 // final checkpoint, or checkpoints are disabled, use standard inv messages
1371 // learn about the blocks and fully validate them. Finally, regression test mode
1372 // does not support the headers-first approach so do normal block downloads when
1373 // in regression test mode.
1374 if sm.nextCheckpoint != nil &&
1375 best.Height < sm.nextCheckpoint.Height &&
1376 sm.chainParams != &chaincfg.RegressionTestParams {
1377 e := bestPeer.PushGetHeadersMsg(locator, sm.nextCheckpoint.Hash)
1378 if e != nil {
1379 }
1380 sm.headersFirstMode = true
1381 I.F(
1382 "downloading headers for blocks %d to %d from peer %s",
1383 best.Height+1,
1384 sm.nextCheckpoint.Height,
1385 bestPeer.Addr(),
1386 )
1387 } else {
1388 e := bestPeer.PushGetBlocksMsg(locator, &zeroHash)
1389 if e != nil {
1390 }
1391 }
1392 sm.syncPeer = bestPeer
1393 } else {
1394 T.Ln("no sync peer candidates available")
1395 }
1396 }
1397 1398 // New constructs a new SyncManager. Use Start to begin processing asynchronous
1399 // block, tx, and inv updates.
1400 func New(config *Config) (*SyncManager, error) {
1401 sm := SyncManager{
1402 peerNotifier: config.PeerNotifier,
1403 chain: config.Chain,
1404 txMemPool: config.TxMemPool,
1405 chainParams: config.ChainParams,
1406 rejectedTxns: make(map[chainhash.Hash]struct{}),
1407 requestedTxns: make(map[chainhash.Hash]struct{}),
1408 requestedBlocks: make(map[chainhash.Hash]struct{}),
1409 peerStates: make(map[*peerpkg.Peer]*peerSyncState),
1410 progressLogger: newBlockProgressLogger("processed"),
1411 msgChan: make(chan interface{}, config.MaxPeers*3),
1412 headerList: list.New(),
1413 quit: qu.T(),
1414 feeEstimator: config.FeeEstimator,
1415 }
1416 best := sm.chain.BestSnapshot()
1417 if !config.DisableCheckpoints {
1418 // Initialize the next checkpoint based on the current height.
1419 sm.nextCheckpoint = sm.findNextHeaderCheckpoint(best.Height)
1420 if sm.nextCheckpoint != nil {
1421 sm.resetHeaderState(&best.Hash, best.Height)
1422 }
1423 } else {
1424 I.Ln("checkpoints are disabled")
1425 }
1426 sm.chain.Subscribe(sm.handleBlockchainNotification)
1427 return &sm, nil
1428 }
1429