manager.go raw

   1  package netsync
   2  
   3  import (
   4  	"container/list"
   5  	"fmt"
   6  	"net"
   7  	"sync"
   8  	"sync/atomic"
   9  	"time"
  10  
  11  	block2 "github.com/p9c/p9/pkg/block"
  12  
  13  	"github.com/p9c/p9/pkg/qu"
  14  
  15  	"github.com/p9c/p9/pkg/blockchain"
  16  	"github.com/p9c/p9/pkg/chaincfg"
  17  	"github.com/p9c/p9/pkg/chainhash"
  18  	"github.com/p9c/p9/pkg/database"
  19  	"github.com/p9c/p9/pkg/mempool"
  20  	peerpkg "github.com/p9c/p9/pkg/peer"
  21  	"github.com/p9c/p9/pkg/util"
  22  	"github.com/p9c/p9/pkg/wire"
  23  )
  24  
type (
	// SyncManager is used to communicate block related messages with peers. The
	// SyncManager is started by calling Start(), which launches the block handler
	// goroutine. Once started, it selects peers to sync from and starts the initial
	// block download. Once the chain is in sync, the SyncManager handles incoming
	// block and header notifications and relays announcements of new blocks to
	// peers.
	SyncManager struct {
		// started and shutdown below are flags updated through sync/atomic by
		// Start and Stop; msgChan carries every request to the block handler and
		// quit signals that handler to exit.
		peerNotifier   PeerNotifier
		started        int32
		shutdown       int32
		chain          *blockchain.BlockChain
		txMemPool      *mempool.TxPool
		chainParams    *chaincfg.Params
		progressLogger *blockProgressLogger
		msgChan        chan interface{}
		wg             sync.WaitGroup
		quit           qu.C
		// These fields should only be accessed from the blockHandler goroutine.
		rejectedTxns    map[chainhash.Hash]struct{}
		requestedTxns   map[chainhash.Hash]struct{}
		requestedBlocks map[chainhash.Hash]struct{}
		syncPeer        *peerpkg.Peer
		peerStates      map[*peerpkg.Peer]*peerSyncState
		// The following fields are used for headers-first mode.
		headersFirstMode bool
		headerList       *list.List
		startHeader      *list.Element
		nextCheckpoint   *chaincfg.Checkpoint
		// An optional fee estimator. It may be nil.
		feeEstimator *mempool.FeeEstimator
	}
	// blockMsg packages a bitcoin block message and the peer it came from together
	// so the block handler has access to that information. The block handler
	// sends on reply once the block has been handled.
	blockMsg struct {
		block *block2.Block
		peer  *peerpkg.Peer
		reply qu.C
	}
	// donePeerMsg signifies a newly disconnected peer to the block handler.
	donePeerMsg struct {
		peer *peerpkg.Peer
	}
	// getSyncPeerMsg is a message type to be sent across the message channel for
	// retrieving the current sync peer. The handler answers on reply with the sync
	// peer's ID, or 0 when there is no sync peer.
	getSyncPeerMsg struct {
		reply chan int32
	}
	// headerNode is used as a node in a list of headers that are linked together
	// between checkpoints.
	headerNode struct {
		height int32
		hash   *chainhash.Hash
	}
	// headersMsg packages a bitcoin headers message and the peer it came from
	// together so the block handler has access to that information.
	headersMsg struct {
		headers *wire.MsgHeaders
		peer    *peerpkg.Peer
	}
	// invMsg packages a bitcoin inv message and the peer it came from together so
	// the block handler has access to that information.
	invMsg struct {
		inv  *wire.MsgInv
		peer *peerpkg.Peer
	}
	// isCurrentMsg is a message type to be sent across the message channel for
	// requesting whether or not the sync manager believes it is synced with the
	// currently connected peers.
	isCurrentMsg struct {
		reply chan bool
	}
	// newPeerMsg signifies a newly connected peer to the block handler.
	newPeerMsg struct {
		peer *peerpkg.Peer
	}
	// pauseMsg is a message type to be sent across the message channel for pausing
	// the sync manager. The block handler blocks receiving from the unpause
	// channel, which effectively provides the caller with exclusive access over
	// the manager until the caller sends on or closes that channel.
	pauseMsg struct {
		unpause qu.C
	}
	// peerSyncState stores additional information that the SyncManager tracks about
	// a peer.
	peerSyncState struct {
		syncCandidate   bool
		requestQueue    []*wire.InvVect
		requestedTxns   map[chainhash.Hash]struct{}
		requestedBlocks map[chainhash.Hash]struct{}
	}
	// processBlockMsg is a message type to be sent across the message channel for
	// requesting that a block be processed. Note this differs from blockMsg above
	// in that blockMsg is intended for blocks that came from peers and have extra
	// handling whereas this message essentially is just a concurrent safe way to
	// call ProcessBlock on the internal block chain instance.
	processBlockMsg struct {
		block *block2.Block
		flags blockchain.BehaviorFlags
		reply chan processBlockResponse
	}
	// processBlockResponse is a response sent to the reply channel of a
	// processBlockMsg.
	processBlockResponse struct {
		isOrphan bool
		err      error
	}
	// txMsg packages a bitcoin tx message and the peer it came from together so the
	// block handler has access to that information. The block handler sends on
	// reply once the transaction has been handled.
	txMsg struct {
		tx    *util.Tx
		peer  *peerpkg.Peer
		reply qu.C
	}
)
 137  
const (
	// minInFlightBlocks is the minimum number of blocks that should be in the
	// request queue for headers-first mode before requesting more.
	minInFlightBlocks = 10
	// maxRejectedTxns is the maximum number of rejected transaction hashes to
	// store in memory.
	maxRejectedTxns = 1000
	// maxRequestedBlocks is the maximum number of requested block hashes to store
	// in memory.
	maxRequestedBlocks = wire.MaxInvPerMsg
	// maxRequestedTxns is the maximum number of requested transaction hashes to
	// store in memory.
	maxRequestedTxns = wire.MaxInvPerMsg
)

// zeroHash is the zero value hash (all zeros). It is passed as the stop hash
// when requesting blocks up to the end of the chain.
var zeroHash chainhash.Hash
 155  
 156  // DonePeer informs the blockmanager that a peer has disconnected.
 157  func (sm *SyncManager) DonePeer(peer *peerpkg.Peer) {
 158  	// Ignore if we are shutting down.
 159  	if atomic.LoadInt32(&sm.shutdown) != 0 {
 160  		return
 161  	}
 162  	sm.msgChan <- &donePeerMsg{peer: peer}
 163  }
 164  
 165  // IsCurrent returns whether or not the sync manager believes it is synced with
 166  // the connected peers.
 167  func (sm *SyncManager) IsCurrent() bool {
 168  	reply := make(chan bool)
 169  	sm.msgChan <- isCurrentMsg{reply: reply}
 170  	return <-reply
 171  }
 172  
 173  // NewPeer informs the sync manager of a newly active peer.
 174  func (sm *SyncManager) NewPeer(peer *peerpkg.Peer) {
 175  	// Ignore if we are shutting down.
 176  	if atomic.LoadInt32(&sm.shutdown) != 0 {
 177  		return
 178  	}
 179  	sm.msgChan <- &newPeerMsg{peer: peer}
 180  }
 181  
 182  // Pause pauses the sync manager until the returned channel is closed.
 183  //
 184  // Note that while paused, all peer and block processing is halted. The message
 185  // sender should avoid pausing the sync manager for long durations.
 186  func (sm *SyncManager) Pause() chan<- struct{} {
 187  	c := qu.T()
 188  	sm.msgChan <- pauseMsg{c}
 189  	return c
 190  }
 191  
 192  // ProcessBlock makes use of ProcessBlock on an internal instance of a block
 193  // chain.
 194  func (sm *SyncManager) ProcessBlock(block *block2.Block, flags blockchain.BehaviorFlags) (bool, error) {
 195  	T.Ln("processing block")
 196  	// Traces(block)
 197  	reply := make(chan processBlockResponse, 1)
 198  	T.Ln("sending to msgChan")
 199  	sm.msgChan <- processBlockMsg{block: block, flags: flags, reply: reply}
 200  	T.Ln("waiting on reply")
 201  	response := <-reply
 202  	return response.isOrphan, response.err
 203  }
 204  
 205  // QueueBlock adds the passed block message and peer to the block handling
 206  // queue. Responds to the done channel argument after the block message is
 207  // processed.
 208  func (sm *SyncManager) QueueBlock(block *block2.Block, peer *peerpkg.Peer, done qu.C) {
 209  	// Don't accept more blocks if we're shutting down.
 210  	if atomic.LoadInt32(&sm.shutdown) != 0 {
 211  		done <- struct{}{}
 212  		return
 213  	}
 214  	sm.msgChan <- &blockMsg{block: block, peer: peer, reply: done}
 215  }
 216  
 217  // QueueHeaders adds the passed headers message and peer to the block handling
 218  // queue.
 219  func (sm *SyncManager) QueueHeaders(headers *wire.MsgHeaders, peer *peerpkg.Peer) {
 220  	// No channel handling here because peers do not need to block on headers
 221  	// messages.
 222  	if atomic.LoadInt32(&sm.shutdown) != 0 {
 223  		return
 224  	}
 225  	sm.msgChan <- &headersMsg{headers: headers, peer: peer}
 226  }
 227  
 228  // QueueInv adds the passed inv message and peer to the block handling queue.
 229  func (sm *SyncManager) QueueInv(inv *wire.MsgInv, peer *peerpkg.Peer) {
 230  	// No channel handling here because peers do not need to block on inv messages.
 231  	if atomic.LoadInt32(&sm.shutdown) != 0 {
 232  		return
 233  	}
 234  	sm.msgChan <- &invMsg{inv: inv, peer: peer}
 235  }
 236  
 237  // QueueTx adds the passed transaction message and peer to the block handling
 238  // queue. Responds to the done channel argument after the tx message is
 239  // processed.
 240  func (sm *SyncManager) QueueTx(tx *util.Tx, peer *peerpkg.Peer, done qu.C) {
 241  	// Don't accept more transactions if we're shutting down.
 242  	if atomic.LoadInt32(&sm.shutdown) != 0 {
 243  		done <- struct{}{}
 244  		return
 245  	}
 246  	sm.msgChan <- &txMsg{tx: tx, peer: peer, reply: done}
 247  }
 248  
 249  // Start begins the core block handler which processes block and inv messages.
 250  func (sm *SyncManager) Start() {
 251  	// Already started?
 252  	if atomic.AddInt32(&sm.started, 1) != 1 {
 253  		return
 254  	}
 255  	T.Ln("starting sync manager")
 256  	sm.wg.Add(1)
 257  	go sm.blockHandler(0)
 258  }
 259  
 260  // Stop gracefully shuts down the sync manager by stopping all asynchronous
 261  // handlers and waiting for them to finish.
 262  func (sm *SyncManager) Stop() (e error) {
 263  	if atomic.AddInt32(&sm.shutdown, 1) != 1 {
 264  		D.Ln("sync manager is already in the process of shutting down")
 265  		return nil
 266  	}
 267  	// DEBUG{"sync manager shutting down"}
 268  	sm.quit.Q()
 269  	sm.wg.Wait()
 270  	return nil
 271  }
 272  
 273  // SyncPeerID returns the ID of the current sync peer, or 0 if there is none.
 274  func (sm *SyncManager) SyncPeerID() int32 {
 275  	reply := make(chan int32)
 276  	sm.msgChan <- getSyncPeerMsg{reply: reply}
 277  	return <-reply
 278  }
 279  
 280  // blockHandler is the main handler for the sync manager. It must be run as a
 281  // goroutine. It processes block and inv messages in a separate goroutine from
 282  // the peer handlers so the block (Block) messages are handled by a single
 283  // thread without needing to lock memory data structures. This is important
 284  // because the sync manager controls which blocks are needed and how the
 285  // fetching should proceed.
 286  func (sm *SyncManager) blockHandler(workerNumber uint32) {
 287  out:
 288  	for {
 289  		select {
 290  		case m := <-sm.msgChan:
 291  			switch msg := m.(type) {
 292  			case *newPeerMsg:
 293  				sm.handleNewPeerMsg(msg.peer)
 294  			case *txMsg:
 295  				sm.handleTxMsg(msg)
 296  				msg.reply <- struct{}{}
 297  			case *blockMsg:
 298  				sm.handleBlockMsg(0, msg)
 299  				msg.reply <- struct{}{}
 300  			case *invMsg:
 301  				sm.handleInvMsg(msg)
 302  			case *headersMsg:
 303  				sm.handleHeadersMsg(msg)
 304  			case *donePeerMsg:
 305  				sm.handleDonePeerMsg(msg.peer)
 306  			case getSyncPeerMsg:
 307  				var peerID int32
 308  				if sm.syncPeer != nil {
 309  					peerID = sm.syncPeer.ID()
 310  				}
 311  				msg.reply <- peerID
 312  			case processBlockMsg:
 313  				T.Ln("received processBlockMsg")
 314  				var heightUpdate int32
 315  				header := &msg.block.WireBlock().Header
 316  				T.Ln("checking if have should have serialized block height")
 317  				if blockchain.ShouldHaveSerializedBlockHeight(header) {
 318  					T.Ln("reading coinbase transaction")
 319  					mbt := msg.block.Transactions()
 320  					if len(mbt) > 0 {
 321  						coinbaseTx := mbt[len(mbt)-1]
 322  						T.Ln("extracting coinbase height")
 323  						var e error
 324  						var cbHeight int32
 325  						if cbHeight, e = blockchain.ExtractCoinbaseHeight(coinbaseTx); E.Chk(e) {
 326  							W.Ln("unable to extract height from coinbase tx:", e)
 327  						} else {
 328  							heightUpdate = cbHeight
 329  						}
 330  					} else {
 331  						D.Ln("no transactions in block??")
 332  					}
 333  				}
 334  				T.Ln("passing to chain.ProcessBlock")
 335  				var isOrphan bool
 336  				var e error
 337  				if _, isOrphan, e = sm.chain.ProcessBlock(
 338  					workerNumber,
 339  					msg.block,
 340  					msg.flags,
 341  					heightUpdate,
 342  				); D.Chk(e) {
 343  					D.Ln("error processing new block ", e)
 344  					msg.reply <- processBlockResponse{
 345  						isOrphan: false,
 346  						err:      e,
 347  					}
 348  				}
 349  				T.Ln("sending back message on reply channel")
 350  				msg.reply <- processBlockResponse{
 351  					isOrphan: isOrphan,
 352  					err:      nil,
 353  				}
 354  				T.Ln("sent reply")
 355  			case isCurrentMsg:
 356  				msg.reply <- sm.current()
 357  			case pauseMsg:
 358  				// Wait until the sender unpauses the manager.
 359  				<-msg.unpause
 360  			default:
 361  				T.F("invalid message type in block handler: %Ter", msg)
 362  			}
 363  		case <-sm.quit.Wait():
 364  			break out
 365  		}
 366  	}
 367  	sm.wg.Done()
 368  }
 369  
 370  // current returns true if we believe we are synced with our peers, false if we
 371  // still have blocks to check
 372  func (sm *SyncManager) current() bool {
 373  	if !sm.chain.IsCurrent() {
 374  		return false
 375  	}
 376  	// if blockChain thinks we are current and we have no syncPeer it is probably
 377  	// right.
 378  	if sm.syncPeer == nil {
 379  		return true
 380  	}
 381  	// No matter what chain thinks, if we are below the block we are syncing to we
 382  	// are not current.
 383  	if sm.chain.BestSnapshot().Height < sm.syncPeer.LastBlock() {
 384  		return false
 385  	}
 386  	return true
 387  }
 388  
 389  // fetchHeaderBlocks creates and sends a request to the syncPeer for the next
 390  // list of blocks to be downloaded based on the current list of headers.
 391  func (sm *SyncManager) fetchHeaderBlocks() {
 392  	// Nothing to do if there is no start header.
 393  	if sm.startHeader == nil {
 394  		D.Ln("fetchHeaderBlocks called with no start header")
 395  		return
 396  	}
 397  	// Build up a getdata request for the list of blocks the headers describe. The
 398  	// size hint will be limited to wire.MaxInvPerMsg by the function, so no need to
 399  	// double check it here.
 400  	gdmsg := wire.NewMsgGetDataSizeHint(uint(sm.headerList.Len()))
 401  	numRequested := 0
 402  	var sh *list.Element
 403  	for sh = sm.startHeader; sh != nil; sh = sh.Next() {
 404  		node, ok := sh.Value.(*headerNode)
 405  		if !ok {
 406  			D.Ln("header list node type is not a headerNode")
 407  			continue
 408  		}
 409  		iv := wire.NewInvVect(wire.InvTypeBlock, node.hash)
 410  		haveInv, e := sm.haveInventory(iv)
 411  		if e != nil {
 412  			T.Ln(
 413  				"unexpected failure when checking for existing inventory during header block fetch:",
 414  				e,
 415  			)
 416  		}
 417  		var ee error
 418  		if !haveInv {
 419  			syncPeerState := sm.peerStates[sm.syncPeer]
 420  			sm.requestedBlocks[*node.hash] = struct{}{}
 421  			syncPeerState.requestedBlocks[*node.hash] = struct{}{}
 422  			// If we're fetching from a witness enabled peer post-fork, then ensure that we
 423  			// receive all the witness data in the blocks.
 424  			// if sm.syncPeer.IsWitnessEnabled() {
 425  			// 	iv.Type = wire.InvTypeWitnessBlock
 426  			// }
 427  			ee = gdmsg.AddInvVect(iv)
 428  			if ee != nil {
 429  				D.Ln(ee)
 430  			}
 431  			numRequested++
 432  		}
 433  		sm.startHeader = sh.Next()
 434  		if numRequested >= wire.MaxInvPerMsg {
 435  			break
 436  		}
 437  	}
 438  	if len(gdmsg.InvList) > 0 {
 439  		sm.syncPeer.QueueMessage(gdmsg, nil)
 440  	}
 441  }
 442  
 443  // findNextHeaderCheckpoint returns the next checkpoint after the passed height.
 444  // It returns nil when there is not one either because the height is already
 445  // later than the final checkpoint or some other reason such as disabled
 446  // checkpoints.
 447  func (sm *SyncManager) findNextHeaderCheckpoint(height int32) *chaincfg.Checkpoint {
 448  	checkpoints := sm.chain.Checkpoints()
 449  	if len(checkpoints) == 0 {
 450  		return nil
 451  	}
 452  	// There is no next checkpoint if the height is already after the final checkpoint.
 453  	finalCheckpoint := &checkpoints[len(checkpoints)-1]
 454  	if height >= finalCheckpoint.Height {
 455  		return nil
 456  	}
 457  	// Find the next checkpoint.
 458  	nextCheckpoint := finalCheckpoint
 459  	for i := len(checkpoints) - 2; i >= 0; i-- {
 460  		if height >= checkpoints[i].Height {
 461  			break
 462  		}
 463  		nextCheckpoint = &checkpoints[i]
 464  	}
 465  	return nextCheckpoint
 466  }
 467  
 468  // handleBlockMsg handles block messages from all peers.
 469  func (sm *SyncManager) handleBlockMsg(workerNumber uint32, bmsg *blockMsg) {
 470  	pp := bmsg.peer
 471  	state, exists := sm.peerStates[pp]
 472  	if !exists {
 473  		T.Ln(
 474  			"received block message from unknown peer", pp,
 475  		)
 476  		return
 477  	}
 478  	// If we didn't ask for this block then the peer is misbehaving.
 479  	blockHash := bmsg.block.Hash()
 480  	if _, exists = state.requestedBlocks[*blockHash]; !exists {
 481  		// The regression test intentionally sends some blocks twice to test duplicate
 482  		// block insertion fails. Don't disconnect the peer or ignore the block when
 483  		// we're in regression test mode in this case so the chain code is actually fed
 484  		// the duplicate blocks.
 485  		if sm.chainParams != &chaincfg.RegressionTestParams {
 486  			W.C(
 487  				func() string {
 488  					return fmt.Sprintf(
 489  						"got unrequested block %v from %s -- disconnecting",
 490  						blockHash,
 491  						pp.Addr(),
 492  					)
 493  				},
 494  			)
 495  			pp.Disconnect()
 496  			return
 497  		}
 498  	}
 499  	// When in headers-first mode, if the block matches the hash of the first header
 500  	// in the list of headers that are being fetched, it's eligible for less
 501  	// validation since the headers have already been verified to link together and
 502  	// are valid up to the next checkpoint. Also, remove the list entry for all
 503  	// blocks except the checkpoint since it is needed to verify the next round of
 504  	// headers links properly.
 505  	isCheckpointBlock := false
 506  	behaviorFlags := blockchain.BFNone
 507  	if sm.headersFirstMode {
 508  		firstNodeEl := sm.headerList.Front()
 509  		if firstNodeEl != nil {
 510  			firstNode := firstNodeEl.Value.(*headerNode)
 511  			if blockHash.IsEqual(firstNode.hash) {
 512  				behaviorFlags |= blockchain.BFFastAdd
 513  				if firstNode.hash.IsEqual(sm.nextCheckpoint.Hash) {
 514  					isCheckpointBlock = true
 515  				} else {
 516  					sm.headerList.Remove(firstNodeEl)
 517  				}
 518  			}
 519  		}
 520  	}
 521  	// Remove block from request maps. Either chain will know about it and so we
 522  	// shouldn't have any more instances of trying to fetch it, or we will fail the
 523  	// insert and thus we'll retry next time we get an inv.
 524  	delete(state.requestedBlocks, *blockHash)
 525  	delete(sm.requestedBlocks, *blockHash)
 526  	var heightUpdate int32
 527  	var blkHashUpdate *chainhash.Hash
 528  	header := &bmsg.block.WireBlock().Header
 529  	if blockchain.ShouldHaveSerializedBlockHeight(header) {
 530  		coinbaseTx := bmsg.block.Transactions()[0]
 531  		cbHeight, e := blockchain.ExtractCoinbaseHeight(coinbaseTx)
 532  		if e != nil {
 533  			T.F(
 534  				"unable to extract height from coinbase tx: %v",
 535  				e,
 536  			)
 537  		} else {
 538  			heightUpdate = cbHeight
 539  			blkHashUpdate = blockHash
 540  		}
 541  	}
 542  	D.Ln("current best height", sm.chain.BestChain.Height())
 543  	_, isOrphan, e := sm.chain.ProcessBlock(
 544  		workerNumber, bmsg.block,
 545  		behaviorFlags, heightUpdate,
 546  	)
 547  	if e != nil {
 548  		if heightUpdate+1 <= sm.chain.BestChain.Height() {
 549  			// Process the block to include validation, best chain selection, orphan handling, etc.
 550  			// When the error is a rule error, it means the block was simply rejected as
 551  			// opposed to something actually going wrong, so log it as such. Otherwise,
 552  			// something really did go wrong, so log it as an actual error.
 553  			// Convert the error into an appropriate reject message and send it.
 554  			if _, ok := e.(blockchain.RuleError); ok {
 555  				E.F(
 556  					"rejected block %v from %s: %v",
 557  					blockHash, pp, e,
 558  				)
 559  			} else {
 560  				E.F("failed to process block %v: %v", blockHash, e)
 561  			}
 562  			if dbErr, ok := e.(database.DBError); ok && dbErr.ErrorCode ==
 563  				database.ErrCorruption {
 564  				panic(dbErr)
 565  			}
 566  			code, reason := mempool.ErrToRejectErr(e)
 567  			pp.PushRejectMsg(wire.CmdBlock, code, reason, blockHash, false)
 568  			return
 569  		} else {
 570  			isOrphan=true
 571  		}
 572  	}
 573  	// Meta-data about the new block this peer is reporting. We use this below to
 574  	// update this peer's lastest block height and the heights of other peers based
 575  	// on their last announced block hash. This allows us to dynamically update the
 576  	// block heights of peers, avoiding stale heights when looking for a new sync
 577  	// peer. Upon acceptance of a block or recognition of an orphan, we also use
 578  	// this information to update the block heights over other peers who's invs may
 579  	// have been ignored if we are actively syncing while the chain is not yet
 580  	// current or who may have lost the lock announcment race. Request the parents
 581  	// for the orphan block from the peer that sent it.
 582  	if isOrphan {
 583  		// We've just received an orphan block from a peer. In order to update the
 584  		// height of the peer, we try to extract the block height from the scriptSig of
 585  		// the coinbase transaction. Extraction is only attempted if the block's version
 586  		// is high enough (ver 2+).
 587  		header := &bmsg.block.WireBlock().Header
 588  		if blockchain.ShouldHaveSerializedBlockHeight(header) {
 589  			coinbaseTx := bmsg.block.Transactions()[0]
 590  			var cbHeight int32
 591  			cbHeight, e = blockchain.ExtractCoinbaseHeight(coinbaseTx)
 592  			if e != nil {
 593  				E.F("unable to extract height from coinbase tx: %v", e)
 594  			} else {
 595  				D.F(
 596  					"extracted height of %v from orphan block",
 597  					cbHeight,
 598  				)
 599  				heightUpdate = cbHeight
 600  				blkHashUpdate = blockHash
 601  			}
 602  		}
 603  		orphanRoot := sm.chain.GetOrphanRoot(blockHash)
 604  		var locator blockchain.BlockLocator
 605  		locator, e = sm.chain.LatestBlockLocator()
 606  		if e != nil {
 607  			E.F(
 608  				"failed to get block locator for the latest block: %v",
 609  				e,
 610  			)
 611  		} else {
 612  			e = pp.PushGetBlocksMsg(locator, orphanRoot)
 613  			if e != nil {
 614  			}
 615  		}
 616  	} else {
 617  		// When the block is not an orphan, log information about it and update the
 618  		// chain state.
 619  		sm.progressLogger.LogBlockHeight(bmsg.block)
 620  		// Update this peer's latest block height, for future potential sync node
 621  		// candidacy.
 622  		best := sm.chain.BestSnapshot()
 623  		heightUpdate = best.Height
 624  		blkHashUpdate = &best.Hash
 625  		// Clear the rejected transactions.
 626  		sm.rejectedTxns = make(map[chainhash.Hash]struct{})
 627  	}
 628  	// Update the block height for this peer. But only send a message to the server
 629  	// for updating peer heights if this is an orphan or our chain is "current".
 630  	// This avoids sending a spammy amount of messages if we're syncing the chain
 631  	// from scratch.
 632  	if blkHashUpdate != nil && heightUpdate != 0 {
 633  		pp.UpdateLastBlockHeight(heightUpdate)
 634  		if isOrphan || sm.current() {
 635  			go sm.peerNotifier.UpdatePeerHeights(
 636  				blkHashUpdate, heightUpdate,
 637  				pp,
 638  			)
 639  		}
 640  	}
 641  	// Nothing more to do if we aren't in headers-first mode.
 642  	if !sm.headersFirstMode {
 643  		return
 644  	}
 645  	// This is headers-first mode, so if the block is not a checkpoint request more
 646  	// blocks using the header list when the request queue is getting short.
 647  	if !isCheckpointBlock {
 648  		if sm.startHeader != nil &&
 649  			len(state.requestedBlocks) < minInFlightBlocks {
 650  			sm.fetchHeaderBlocks()
 651  		}
 652  		return
 653  	}
 654  	// This is headers-first mode and the block is a checkpoint. When there is a
 655  	// next checkpoint, get the next round of headers by asking for headers starting
 656  	// from the block after this one up to the next checkpoint.
 657  	prevHeight := sm.nextCheckpoint.Height
 658  	prevHash := sm.nextCheckpoint.Hash
 659  	sm.nextCheckpoint = sm.findNextHeaderCheckpoint(prevHeight)
 660  	if sm.nextCheckpoint != nil {
 661  		locator := blockchain.BlockLocator([]*chainhash.Hash{prevHash})
 662  		e = pp.PushGetHeadersMsg(locator, sm.nextCheckpoint.Hash)
 663  		if e != nil {
 664  			E.F(
 665  				"failed to send getheaders message to peer %s: %v",
 666  				pp.Addr(), e,
 667  			)
 668  			return
 669  		}
 670  		I.F(
 671  			"downloading headers for blocks %d to %d from peer %s",
 672  			prevHeight+1, sm.nextCheckpoint.Height, sm.syncPeer.Addr(),
 673  		)
 674  		return
 675  	}
 676  	// This is headers-first mode, the block is a checkpoint, and there are no more
 677  	// checkpoints, so switch to normal mode by requesting blocks from the block
 678  	// after this one up to the end of the chain (zero hash).
 679  	sm.headersFirstMode = false
 680  	sm.headerList.Init()
 681  	I.Ln(
 682  		"reached the final checkpoint -- switching to normal mode",
 683  	)
 684  	locator := blockchain.BlockLocator([]*chainhash.Hash{blockHash})
 685  	e = pp.PushGetBlocksMsg(locator, &zeroHash)
 686  	if e != nil {
 687  		E.Ln(
 688  			"failed to send getblocks message to peer", pp, ":", e,
 689  		)
 690  		return
 691  	}
 692  }
 693  
 694  // handleBlockchainNotification handles notifications from blockchain. It does
 695  // things such as request orphan block parents and relay accepted blocks to
 696  // connected peers.
 697  func (sm *SyncManager) handleBlockchainNotification(notification *blockchain.Notification) {
 698  	switch notification.Type {
 699  	// A block has been accepted into the block chain. Relay it to other peers.
 700  	case blockchain.NTBlockAccepted:
 701  		// Don't relay if we are not current. Other peers that are current should
 702  		// already know about it.
 703  		if !sm.current() {
 704  			return
 705  		}
 706  		block, ok := notification.Data.(*block2.Block)
 707  		if !ok {
 708  			D.Ln("chain accepted notification is not a block")
 709  			break
 710  		}
 711  		// Generate the inventory vector and relay it.
 712  		iv := wire.NewInvVect(wire.InvTypeBlock, block.Hash())
 713  		sm.peerNotifier.RelayInventory(iv, block.WireBlock().Header)
 714  	// A block has been connected to the main block chain.
 715  	case blockchain.NTBlockConnected:
 716  		block, ok := notification.Data.(*block2.Block)
 717  		if !ok {
 718  			D.Ln("chain connected notification is not a block")
 719  			break
 720  		}
 721  		// Remove all of the transactions (except the coinbase) in the connected block
 722  		// from the transaction pool. Secondly, remove any transactions which are now
 723  		// double spends as a result of these new transactions. Finally, remove any
 724  		// transaction that is no longer an orphan. Transactions which depend on a
 725  		// confirmed transaction are NOT removed recursively because they are still
 726  		// valid.
 727  		for _, tx := range block.Transactions()[1:] {
 728  			sm.txMemPool.RemoveTransaction(tx, false)
 729  			sm.txMemPool.RemoveDoubleSpends(tx)
 730  			sm.txMemPool.RemoveOrphan(tx)
 731  			sm.peerNotifier.TransactionConfirmed(tx)
 732  			acceptedTxs := sm.txMemPool.ProcessOrphans(sm.chain, tx)
 733  			sm.peerNotifier.AnnounceNewTransactions(acceptedTxs)
 734  		}
 735  		// Register block with the fee estimator, if it exists.
 736  		if sm.feeEstimator != nil {
 737  			e := sm.feeEstimator.RegisterBlock(block)
 738  			// If an error is somehow generated then the fee estimator has entered an
 739  			// invalid state. Since it doesn't know how to recover, create a new one.
 740  			if e != nil {
 741  				sm.feeEstimator = mempool.NewFeeEstimator(
 742  					mempool.DefaultEstimateFeeMaxRollback,
 743  					mempool.DefaultEstimateFeeMinRegisteredBlocks,
 744  				)
 745  			}
 746  		}
 747  	// A block has been disconnected from the main block chain.
 748  	case blockchain.NTBlockDisconnected:
 749  		block, ok := notification.Data.(*block2.Block)
 750  		if !ok {
 751  			D.Ln("chain disconnected notification is not a block.")
 752  			break
 753  		}
 754  		// Reinsert all of the transactions (except the coinbase) into the transaction pool.
 755  		for _, tx := range block.Transactions()[1:] {
 756  			var ee error
 757  			_, _, ee = sm.txMemPool.MaybeAcceptTransaction(
 758  				sm.chain, tx,
 759  				false, false,
 760  			)
 761  			if ee != nil {
 762  				// Remove the transaction and all transactions that depend on it if it wasn't
 763  				// accepted into the transaction pool.
 764  				sm.txMemPool.RemoveTransaction(tx, true)
 765  			}
 766  		}
 767  		// Rollback previous block recorded by the fee estimator.
 768  		if sm.feeEstimator != nil {
 769  			e := sm.feeEstimator.Rollback(block.Hash())
 770  			if e != nil {
 771  			}
 772  		}
 773  	}
 774  }
 775  
 776  // handleDonePeerMsg deals with peers that have signalled they are done. It
 777  // removes the peer as a candidate for syncing and in the case where it was the
 778  // current sync peer, attempts to select a new best peer to sync from. It is
 779  // invoked from the syncHandler goroutine.
 780  func (sm *SyncManager) handleDonePeerMsg(peer *peerpkg.Peer) {
 781  	state, exists := sm.peerStates[peer]
 782  	if !exists {
 783  		T.Ln("received done peer message for unknown peer", peer)
 784  		return
 785  	}
 786  	// Remove the peer from the list of candidate peers.
 787  	delete(sm.peerStates, peer)
 788  	T.Ln("lost peer ", peer)
 789  	// Remove requested transactions from the global map so that they will be
 790  	// fetched from elsewhere next time we get an inv.
 791  	for txHash := range state.requestedTxns {
 792  		delete(sm.requestedTxns, txHash)
 793  	}
 794  	// Remove requested blocks from the global map so that they will be fetched from
 795  	// elsewhere next time we get an inv.
 796  	//
 797  	// TODO: we could possibly here check which peers have these blocks and request them now to speed things up a little.
 798  	for blockHash := range state.requestedBlocks {
 799  		delete(sm.requestedBlocks, blockHash)
 800  	}
 801  	// Attempt to find a new peer to sync from if the quitting peer is the sync
 802  	// peer. Also, reset the headers-first state if in headers-first mode so
 803  	if sm.syncPeer == peer {
 804  		sm.syncPeer = nil
 805  		if sm.headersFirstMode {
 806  			best := sm.chain.BestSnapshot()
 807  			sm.resetHeaderState(&best.Hash, best.Height)
 808  		}
 809  		sm.startSync()
 810  	}
 811  }
 812  
 813  // handleHeadersMsg handles block header messages from all peers. Headers are
 814  // requested when performing a headers-first sync.
 815  func (sm *SyncManager) handleHeadersMsg(hmsg *headersMsg) {
 816  	peer := hmsg.peer
 817  	_, exists := sm.peerStates[peer]
 818  	if !exists {
 819  		T.Ln("received headers message from unknown peer", peer)
 820  		return
 821  	}
 822  	// The remote peer is misbehaving if we didn't request headers.
 823  	msg := hmsg.headers
 824  	numHeaders := len(msg.Headers)
 825  	if !sm.headersFirstMode {
 826  		T.F(
 827  			"got %d unrequested headers from %s -- disconnecting",
 828  			numHeaders, peer,
 829  		)
 830  		peer.Disconnect()
 831  		return
 832  	}
 833  	// Nothing to do for an empty headers message.
 834  	if numHeaders == 0 {
 835  		return
 836  	}
 837  	// Process all of the received headers ensuring each one connects to the
 838  	// previous and that checkpoints match.
 839  	receivedCheckpoint := false
 840  	var finalHash *chainhash.Hash
 841  	for _, blockHeader := range msg.Headers {
 842  		blockHash := blockHeader.BlockHash()
 843  		finalHash = &blockHash
 844  		// Ensure there is a previous header to compare against.
 845  		prevNodeEl := sm.headerList.Back()
 846  		if prevNodeEl == nil {
 847  			W.Ln(
 848  				"header list does not contain a previous element as expected -- disconnecting peer",
 849  			)
 850  			peer.Disconnect()
 851  			return
 852  		}
 853  		// Ensure the header properly connects to the previous one and add it to the
 854  		// list of headers.
 855  		node := headerNode{hash: &blockHash}
 856  		prevNode := prevNodeEl.Value.(*headerNode)
 857  		if prevNode.hash.IsEqual(&blockHeader.PrevBlock) {
 858  			node.height = prevNode.height + 1
 859  			e := sm.headerList.PushBack(&node)
 860  			if sm.startHeader == nil {
 861  				sm.startHeader = e
 862  			}
 863  		} else {
 864  			T.Ln(
 865  				"received block header that does not properly connect to the chain from peer",
 866  				peer,
 867  				"-- disconnecting",
 868  			)
 869  			peer.Disconnect()
 870  			return
 871  		}
 872  		// Verify the header at the next checkpoint height matches.
 873  		if node.height == sm.nextCheckpoint.Height {
 874  			if node.hash.IsEqual(sm.nextCheckpoint.Hash) {
 875  				receivedCheckpoint = true
 876  				I.F(
 877  					"verified downloaded block header against checkpoint at height %d/hash %s",
 878  					node.height,
 879  					node.hash,
 880  				)
 881  			} else {
 882  				T.F(
 883  					"block header at height %d/hash %s from peer %s does NOT match expected checkpoint hash of"+
 884  						" %s -- disconnecting",
 885  					node.height,
 886  					node.hash,
 887  					peer,
 888  					sm.nextCheckpoint.Hash,
 889  				)
 890  				peer.Disconnect()
 891  				return
 892  			}
 893  			break
 894  		}
 895  	}
 896  	// When this header is a checkpoint, switch to fetching the blocks for all of
 897  	// the headers since the last checkpoint.
 898  	if receivedCheckpoint {
 899  		// Since the first entry of the list is always the final block that is already
 900  		// in the database and is only used to ensure the next header links properly, it
 901  		// must be removed before fetching the blocks.
 902  		sm.headerList.Remove(sm.headerList.Front())
 903  		I.F(
 904  			"received %v block headers: Fetching blocks",
 905  			sm.headerList.Len(),
 906  		)
 907  		sm.progressLogger.SetLastLogTime(time.Now())
 908  		sm.fetchHeaderBlocks()
 909  		return
 910  	}
 911  	// This header is not a checkpoint, so request the next batch of headers
 912  	// starting from the latest known header and ending with the next checkpoint.
 913  	locator := blockchain.BlockLocator([]*chainhash.Hash{finalHash})
 914  	e := peer.PushGetHeadersMsg(locator, sm.nextCheckpoint.Hash)
 915  	if e != nil {
 916  		E.F(
 917  			"failed to send getheaders message to peer %s: %v", peer,
 918  			e,
 919  		)
 920  		return
 921  	}
 922  }
 923  
 924  // handleInvMsg handles inv messages from all peers. We examine the inventory
 925  // advertised by the remote peer and act accordingly.
 926  func (sm *SyncManager) handleInvMsg(imsg *invMsg) {
 927  	peer := imsg.peer
 928  	state, exists := sm.peerStates[peer]
 929  	if !exists {
 930  		T.Ln("received inv message from unknown peer", peer)
 931  		return
 932  	}
 933  	// Attempt to find the final block in the inventory list.  There may not be one.
 934  	lastBlock := -1
 935  	invVects := imsg.inv.InvList
 936  	for i := len(invVects) - 1; i >= 0; i-- {
 937  		if invVects[i].Type == wire.InvTypeBlock {
 938  			lastBlock = i
 939  			break
 940  		}
 941  	}
 942  	// If this inv contains a block announcement, and this isn't coming from our
 943  	// current sync peer or we're current, then update the last announced block for
 944  	// this peer. We'll use this information later to update the heights of peers
 945  	// based on blocks we've accepted that they previously announced.
 946  	if lastBlock != -1 && (peer != sm.syncPeer || sm.current()) {
 947  		peer.UpdateLastAnnouncedBlock(&invVects[lastBlock].Hash)
 948  	}
 949  	// Ignore invs from peers that aren't the sync if we are not current. Helps prevent fetching a mass of orphans.
 950  	if peer != sm.syncPeer && !sm.current() {
 951  		return
 952  	}
 953  	// If our chain is current and a peer announces a block we already know of, then update their current block height.
 954  	if lastBlock != -1 && sm.current() {
 955  		blkHeight, e := sm.chain.BlockHeightByHash(&invVects[lastBlock].Hash)
 956  		if e == nil {
 957  			peer.UpdateLastBlockHeight(blkHeight)
 958  		}
 959  	}
 960  	// Request the advertised inventory if we don't already have it. Also, request
 961  	// parent blocks of orphans if we receive one we already have. Finally, attempt
 962  	// to detect potential stalls due to long side chains we already have and
 963  	// request more blocks to prevent them.
 964  	for i, iv := range invVects {
 965  		// Ignore unsupported inventory types.
 966  		switch iv.Type {
 967  		case wire.InvTypeBlock:
 968  		case wire.InvTypeTx:
 969  		// case wire.InvTypeWitnessBlock:
 970  		// case wire.InvTypeWitnessTx:
 971  		default:
 972  			continue
 973  		}
 974  		// Add the inventory to the cache of known inventory for the peer.
 975  		peer.AddKnownInventory(iv)
 976  		// Ignore inventory when we're in headers-first mode.
 977  		if sm.headersFirstMode {
 978  			continue
 979  		}
 980  		// Request the inventory if we don't already have it.
 981  		haveInv, e := sm.haveInventory(iv)
 982  		if e != nil {
 983  			E.Ln("unexpected failure when checking for existing inventory during inv message processing:", e)
 984  			continue
 985  		}
 986  		if !haveInv {
 987  			if iv.Type == wire.InvTypeTx {
 988  				// Skip the transaction if it has already been rejected.
 989  				if _, exists := sm.rejectedTxns[iv.Hash]; exists {
 990  					continue
 991  				}
 992  			}
 993  			// Ignore invs block invs from non-witness enabled peers, as after segwit
 994  			// activation we only want to download from peers that can provide us full
 995  			// witness data for blocks. PARALLELCOIN HAS NO WITNESS STUFF if
 996  			// !peer.IsWitnessEnabled() && iv.Type == wire.InvTypeBlock {
 997  			// 	continue
 998  			// }
 999  			// Add it to the request queue.
1000  			state.requestQueue = append(state.requestQueue, iv)
1001  			continue
1002  		}
1003  		if iv.Type == wire.InvTypeBlock {
1004  			// The block is an orphan block that we already have. When the existing orphan
1005  			// was processed, it requested the missing parent blocks. When this scenario
1006  			// happens, it means there were more blocks missing than are allowed into a
1007  			// single inventory message. As a result, once this peer requested the final
1008  			// advertised block, the remote peer noticed and is now resending the orphan
1009  			// block as an available block to signal there are more missing blocks that need
1010  			// to be requested.
1011  			if sm.chain.IsKnownOrphan(&iv.Hash) {
1012  				// Request blocks starting at the latest known up to the root of the orphan that just came in.
1013  				orphanRoot := sm.chain.GetOrphanRoot(&iv.Hash)
1014  				locator, e := sm.chain.LatestBlockLocator()
1015  				if e != nil {
1016  					E.Ln("failed to get block locator for the latest block:", e)
1017  					continue
1018  				}
1019  				e = peer.PushGetBlocksMsg(locator, orphanRoot)
1020  				if e != nil {
1021  				}
1022  				continue
1023  			}
1024  			// We already have the final block advertised by this inventory message, so
1025  			// force a request for more. This should only happen if we're on a really long
1026  			// side chain.
1027  			if i == lastBlock {
1028  				// Request blocks after this one up to the final one the remote peer knows about
1029  				// (zero stop hash).
1030  				locator := sm.chain.BlockLocatorFromHash(&iv.Hash)
1031  				e := peer.PushGetBlocksMsg(locator, &zeroHash)
1032  				if e != nil {
1033  				}
1034  			}
1035  		}
1036  	}
1037  	// Request as much as possible at once. Anything that won't fit into request
1038  	// will be requested on the next inv message.
1039  	numRequested := 0
1040  	gdmsg := wire.NewMsgGetData()
1041  	requestQueue := state.requestQueue
1042  	for len(requestQueue) != 0 {
1043  		iv := requestQueue[0]
1044  		requestQueue[0] = nil
1045  		requestQueue = requestQueue[1:]
1046  		switch iv.Type {
1047  		// case wire.InvTypeWitnessBlock:
1048  		// 	fallthrough
1049  		case wire.InvTypeBlock:
1050  			// Request the block if there is not already a pending request.
1051  			if _, exists := sm.requestedBlocks[iv.Hash]; !exists {
1052  				sm.requestedBlocks[iv.Hash] = struct{}{}
1053  				sm.limitMap(sm.requestedBlocks, maxRequestedBlocks)
1054  				state.requestedBlocks[iv.Hash] = struct{}{}
1055  				// if peer.IsWitnessEnabled() {
1056  				// 	iv.Type = wire.InvTypeWitnessBlock
1057  				// }
1058  				e := gdmsg.AddInvVect(iv)
1059  				if e != nil {
1060  				}
1061  				numRequested++
1062  			}
1063  		// case wire.InvTypeWitnessTx:
1064  		// 	fallthrough
1065  		case wire.InvTypeTx:
1066  			// Request the transaction if there is not already a pending request.
1067  			if _, exists := sm.requestedTxns[iv.Hash]; !exists {
1068  				sm.requestedTxns[iv.Hash] = struct{}{}
1069  				sm.limitMap(sm.requestedTxns, maxRequestedTxns)
1070  				state.requestedTxns[iv.Hash] = struct{}{}
1071  				// If the peer is capable, request the txn including all witness
1072  				// data.
1073  				// if peer.IsWitnessEnabled() {
1074  				// 	iv.Type = wire.InvTypeWitnessTx
1075  				// }
1076  				e := gdmsg.AddInvVect(iv)
1077  				if e != nil {
1078  				}
1079  				numRequested++
1080  			}
1081  		}
1082  		if numRequested >= wire.MaxInvPerMsg {
1083  			break
1084  		}
1085  	}
1086  	state.requestQueue = requestQueue
1087  	if len(gdmsg.InvList) > 0 {
1088  		peer.QueueMessage(gdmsg, nil)
1089  	}
1090  }
1091  
1092  // handleNewPeerMsg deals with new peers that have signalled they may be
1093  // considered as a sync peer (they have already successfully negotiated). It
1094  // also starts syncing if needed. It is invoked from the syncHandler goroutine.
1095  func (sm *SyncManager) handleNewPeerMsg(peer *peerpkg.Peer) {
1096  	// Ignore if in the process of shutting down.
1097  	if atomic.LoadInt32(&sm.shutdown) != 0 {
1098  		return
1099  	}
1100  	T.F("new valid peer %s (%s)", peer, peer.UserAgent())
1101  	// Initialize the peer state
1102  	isSyncCandidate := sm.isSyncCandidate(peer)
1103  	if isSyncCandidate {
1104  		I.Ln(peer, "is a sync candidate")
1105  	}
1106  	sm.peerStates[peer] = &peerSyncState{
1107  		syncCandidate:   isSyncCandidate,
1108  		requestedTxns:   make(map[chainhash.Hash]struct{}),
1109  		requestedBlocks: make(map[chainhash.Hash]struct{}),
1110  	}
1111  	// Start syncing by choosing the best candidate if needed.
1112  	if isSyncCandidate && sm.syncPeer == nil {
1113  		sm.startSync()
1114  	}
1115  }
1116  
1117  // handleTxMsg handles transaction messages from all peers.
1118  func (sm *SyncManager) handleTxMsg(tmsg *txMsg) {
1119  	peer := tmsg.peer
1120  	state, exists := sm.peerStates[peer]
1121  	if !exists {
1122  		W.C(
1123  			func() string {
1124  				return "received tx message from unknown peer " +
1125  					peer.String()
1126  			},
1127  		)
1128  		return
1129  	}
1130  	// NOTE: BitcoinJ, and possibly other wallets, don't follow the spec of sending
1131  	// an inventory message and allowing the remote peer to decide whether or not
1132  	// they want to request the transaction via a getdata message. Unfortunately,
1133  	// the reference implementation permits unrequested data, so it has allowed
1134  	// wallets that don't follow the spec to proliferate. While this is not ideal,
1135  	// there is no check here to disconnect peers for sending unsolicited
1136  	// transactions to provide interoperability.
1137  	txHash := tmsg.tx.Hash()
1138  	// Ignore transactions that we have already rejected. Do not send a reject
1139  	// message here because if the transaction was already rejected, the transaction
1140  	// was unsolicited.
1141  	if _, exists = sm.rejectedTxns[*txHash]; exists {
1142  		D.C(
1143  			func() string {
1144  				return "ignoring unsolicited previously rejected transaction " +
1145  					txHash.String() + " from " + peer.String()
1146  			},
1147  		)
1148  		return
1149  	}
1150  	// Process the transaction to include validation, insertion in the memory pool,
1151  	// orphan handling, etc.
1152  	acceptedTxs, e := sm.txMemPool.ProcessTransaction(
1153  		sm.chain, tmsg.tx,
1154  		true, true, mempool.Tag(peer.ID()),
1155  	)
1156  	// Remove transaction from request maps. Either the mempool/chain already knows
1157  	// about it and as such we shouldn't have any more instances of trying to fetch
1158  	// it, or we failed to insert and thus we'll retry next time we get an inv.
1159  	delete(state.requestedTxns, *txHash)
1160  	delete(sm.requestedTxns, *txHash)
1161  	if e != nil {
1162  		// Do not request this transaction again until a new block has been processed.
1163  		sm.rejectedTxns[*txHash] = struct{}{}
1164  		sm.limitMap(sm.rejectedTxns, maxRejectedTxns)
1165  		// When the error is a rule error, it means the transaction was simply rejected
1166  		// as opposed to something actually going wrong, so log it as such. Otherwise,
1167  		// something really did go wrong, so log it as an actual error.
1168  		if _, ok := e.(mempool.RuleError); ok {
1169  			D.F(
1170  				"rejected transaction %v from %s: %v",
1171  				txHash,
1172  				peer,
1173  				e,
1174  			)
1175  		} else {
1176  			E.F(
1177  				"failed to process transaction %v: %v",
1178  				txHash,
1179  				e,
1180  			)
1181  		}
1182  		// Convert the error into an appropriate reject message and send it.
1183  		code, reason := mempool.ErrToRejectErr(e)
1184  		peer.PushRejectMsg(wire.CmdTx, code, reason, txHash, false)
1185  		return
1186  	}
1187  	sm.peerNotifier.AnnounceNewTransactions(acceptedTxs)
1188  }
1189  
1190  // haveInventory returns whether or not the inventory represented by the passed
1191  // inventory vector is known. This includes checking all of the various places
1192  // inventory can be when it is in different states such as blocks that are part
1193  // of the main chain, on a side chain, in the orphan pool, and transactions that
1194  // are in the memory pool (either the main pool or orphan pool).
1195  func (sm *SyncManager) haveInventory(invVect *wire.InvVect) (bool, error) {
1196  	switch invVect.Type {
1197  	// case wire.InvTypeWitnessBlock:
1198  	// 	fallthrough
1199  	case wire.InvTypeBlock:
1200  		// Ask chain if the block is known to it in any form (main chain, side chain, or
1201  		// orphan).
1202  		return sm.chain.HaveBlock(&invVect.Hash)
1203  	// case wire.InvTypeWitnessTx:
1204  	// 	fallthrough
1205  	case wire.InvTypeTx:
1206  		// Ask the transaction memory pool if the transaction is known to it in any form
1207  		// (main pool or orphan).
1208  		if sm.txMemPool.HaveTransaction(&invVect.Hash) {
1209  			return true, nil
1210  		}
1211  		// Chk if the transaction exists from the point of view of the end of the main
1212  		// chain. Note that this is only a best effort since it is expensive to check
1213  		// existence of every output and the only purpose of this check is to avoid
1214  		// downloading already known transactions. Only the first two outputs are
1215  		// checked because the vast majority of transactions consist of two outputs
1216  		// where one is some form of "pay-to-somebody-else" and the other is a change
1217  		// output.
1218  		prevOut := wire.OutPoint{Hash: invVect.Hash}
1219  		for i := uint32(0); i < 2; i++ {
1220  			prevOut.Index = i
1221  			entry, e := sm.chain.FetchUtxoEntry(prevOut)
1222  			if e != nil {
1223  				return false, e
1224  			}
1225  			if entry != nil && !entry.IsSpent() {
1226  				return true, nil
1227  			}
1228  		}
1229  		return false, nil
1230  	}
1231  	// The requested inventory is is an unsupported type, so just claim it is known
1232  	// to avoid requesting it.
1233  	return true, nil
1234  }
1235  
1236  // isSyncCandidate returns whether or not the peer is a candidate to consider
1237  // syncing from.
1238  func (sm *SyncManager) isSyncCandidate(peer *peerpkg.Peer) bool {
1239  	// Typically a peer is not a candidate for sync if it's not a full node, however
1240  	// regression test is special in that the regression tool is not a full node and
1241  	// still needs to be considered a sync candidate.
1242  	if sm.chainParams == &chaincfg.RegressionTestParams {
1243  		// The peer is not a candidate if it's not coming from localhost or the hostname
1244  		// can't be determined for some reason.
1245  		var host string
1246  		var e error
1247  		host, _, e = net.SplitHostPort(peer.Addr())
1248  		if e != nil {
1249  			return false
1250  		}
1251  		if host != "127.0.0.1" && host != "localhost" {
1252  			return false
1253  		}
1254  		// } else {
1255  		// // The peer is not a candidate for sync if it's not a full node. Additionally, if the segwit soft-fork package
1256  		// // has activated, then the peer must also be upgraded.
1257  		// segwitActive, e := sm.chain.IsDeploymentActive(chaincfg.DeploymentSegwit)
1258  		// if e != nil  {
1259  		// 			// 	Error("unable to query for segwit soft-fork state:", e)
1260  		// }
1261  		// nodeServices := peer.Services()
1262  		// if nodeServices&wire.SFNodeNetwork != wire.SFNodeNetwork ||
1263  		// 	(segwitActive && !peer.IsWitnessEnabled()) {
1264  		// 	return false
1265  		// }
1266  	}
1267  	// Candidate if all checks passed.
1268  	return true
1269  }
1270  
1271  // limitMap is a helper function for maps that require a maximum limit by
1272  // evicting a random transaction if adding a new value would cause it to
1273  // overflow the maximum allowed.
1274  func (sm *SyncManager) limitMap(m map[chainhash.Hash]struct{}, limit int) {
1275  	if len(m)+1 > limit {
1276  		// Remove a random entry from the map. For most compilers, Go's range statement
1277  		// iterates starting at a random item although that is not 100% guaranteed by
1278  		// the spec. The iteration order is not important here because an adversary
1279  		// would have to be able to pull off preimage attacks on the hashing function in
1280  		// order to target eviction of specific entries anyways.
1281  		for txHash := range m {
1282  			delete(m, txHash)
1283  			return
1284  		}
1285  	}
1286  }
1287  
1288  // resetHeaderState sets the headers-first mode state to values appropriate for
1289  // syncing from a new peer.
1290  func (sm *SyncManager) resetHeaderState(newestHash *chainhash.Hash, newestHeight int32) {
1291  	sm.headersFirstMode = false
1292  	sm.headerList.Init()
1293  	sm.startHeader = nil
1294  	// When there is a next checkpoint, add an entry for the latest known block into
1295  	// the header pool. This allows the next downloaded header to prove it links to
1296  	// the chain properly.
1297  	if sm.nextCheckpoint != nil {
1298  		node := headerNode{height: newestHeight, hash: newestHash}
1299  		sm.headerList.PushBack(&node)
1300  	}
1301  }
1302  
1303  // startSync will choose the best peer among the available candidate peers to
1304  // download/sync the blockchain from. When syncing is already running, it simply
1305  // returns. It also examines the candidates for any which are no longer
1306  // candidates and removes them as needed.
1307  func (sm *SyncManager) startSync() {
1308  	// Return now if we're already syncing.
1309  	if sm.syncPeer != nil {
1310  		return
1311  	}
1312  	// Once the segwit soft-fork package has activated, we only want to sync from
1313  	// peers which are witness enabled to ensure that we fully validate all
1314  	// blockchain data.
1315  	// var e error
1316  	// var segwitActive bool
1317  	// segwitActive, e = sm.chain.IsDeploymentActive(chaincfg.DeploymentSegwit)
1318  	// if e != nil  {
1319  	// 	Error("unable to query for segwit soft-fork state:", e)
1320  	// 	return
1321  	// }
1322  	best := sm.chain.BestSnapshot()
1323  	var bestPeer *peerpkg.Peer
1324  	for peer, state := range sm.peerStates {
1325  		if !state.syncCandidate {
1326  			continue
1327  		}
1328  		// if segwitActive && !peer.IsWitnessEnabled() {
1329  		// 	D.Ln("peer", peer, "not witness enabled, skipping")
1330  		// 	continue
1331  		// } Remove sync candidate peers that are no longer candidates due to passing
1332  		// their latest known block.
1333  		//
1334  		// NOTE: The < is intentional as opposed to <=. While technically the peer
1335  		// doesn't have a later block when it's equal, it will likely have one soon so
1336  		// it is a reasonable choice. It also allows the case where both are at 0 such
1337  		// as during regression test.
1338  		if peer.LastBlock() < best.Height {
1339  			// state.syncCandidate = false
1340  			continue
1341  		}
1342  		// TODO(davec): Use a better algorithm to choose the best peer. For now, just pick the first available candidate.
1343  		bestPeer = peer
1344  	}
1345  	// Start syncing from the best peer if one was selected.
1346  	if bestPeer != nil {
1347  		// Clear the requestedBlocks if the sync peer changes, otherwise we may ignore blocks we need that the last sync
1348  		// peer failed to send.
1349  		sm.requestedBlocks = make(map[chainhash.Hash]struct{})
1350  		locator, e := sm.chain.LatestBlockLocator()
1351  		if e != nil {
1352  			E.Ln("failed to get block locator for the latest block:", e)
1353  			return
1354  		}
1355  		T.C(
1356  			func() string {
1357  				return fmt.Sprintf("syncing to block height %d from peer %v", bestPeer.LastBlock(), bestPeer.Addr())
1358  			},
1359  		)
1360  		// When the current height is less than a known checkpoint we can use block
1361  		// headers to learn about which blocks comprise the chain up to the checkpoint
1362  		// and perform less validation for them. This is possible since each header
1363  		// contains the hash of the previous header and a merkle root.
1364  		//
1365  		// Therefore if we validate all of the received headers link together properly
1366  		// and the checkpoint hashes match, we can be sure the hashes for the blocks in
1367  		// between are accurate. Further, once the full blocks are downloaded, the
1368  		// merkle root is computed and compared against the value in the header which
1369  		// proves the full block hasn't been tampered with. Once we have passed the
1370  		// final checkpoint, or checkpoints are disabled, use standard inv messages
1371  		// learn about the blocks and fully validate them. Finally, regression test mode
1372  		// does not support the headers-first approach so do normal block downloads when
1373  		// in regression test mode.
1374  		if sm.nextCheckpoint != nil &&
1375  			best.Height < sm.nextCheckpoint.Height &&
1376  			sm.chainParams != &chaincfg.RegressionTestParams {
1377  			e := bestPeer.PushGetHeadersMsg(locator, sm.nextCheckpoint.Hash)
1378  			if e != nil {
1379  			}
1380  			sm.headersFirstMode = true
1381  			I.F(
1382  				"downloading headers for blocks %d to %d from peer %s",
1383  				best.Height+1,
1384  				sm.nextCheckpoint.Height,
1385  				bestPeer.Addr(),
1386  			)
1387  		} else {
1388  			e := bestPeer.PushGetBlocksMsg(locator, &zeroHash)
1389  			if e != nil {
1390  			}
1391  		}
1392  		sm.syncPeer = bestPeer
1393  	} else {
1394  		T.Ln("no sync peer candidates available")
1395  	}
1396  }
1397  
1398  // New constructs a new SyncManager. Use Start to begin processing asynchronous
1399  // block, tx, and inv updates.
1400  func New(config *Config) (*SyncManager, error) {
1401  	sm := SyncManager{
1402  		peerNotifier:    config.PeerNotifier,
1403  		chain:           config.Chain,
1404  		txMemPool:       config.TxMemPool,
1405  		chainParams:     config.ChainParams,
1406  		rejectedTxns:    make(map[chainhash.Hash]struct{}),
1407  		requestedTxns:   make(map[chainhash.Hash]struct{}),
1408  		requestedBlocks: make(map[chainhash.Hash]struct{}),
1409  		peerStates:      make(map[*peerpkg.Peer]*peerSyncState),
1410  		progressLogger:  newBlockProgressLogger("processed"),
1411  		msgChan:         make(chan interface{}, config.MaxPeers*3),
1412  		headerList:      list.New(),
1413  		quit:            qu.T(),
1414  		feeEstimator:    config.FeeEstimator,
1415  	}
1416  	best := sm.chain.BestSnapshot()
1417  	if !config.DisableCheckpoints {
1418  		// Initialize the next checkpoint based on the current height.
1419  		sm.nextCheckpoint = sm.findNextHeaderCheckpoint(best.Height)
1420  		if sm.nextCheckpoint != nil {
1421  			sm.resetHeaderState(&best.Hash, best.Height)
1422  		}
1423  	} else {
1424  		I.Ln("checkpoints are disabled")
1425  	}
1426  	sm.chain.Subscribe(sm.handleBlockchainNotification)
1427  	return &sm, nil
1428  }
1429