ctrl.go raw

   1  package ctrl
   2  
   3  import (
   4  	"errors"
   5  	"fmt"
   6  	"math/rand"
   7  	"net"
   8  	"sync"
   9  	"time"
  10  
  11  	"github.com/VividCortex/ewma"
  12  	"github.com/niubaoshu/gotiny"
  13  	"go.uber.org/atomic"
  14  
  15  	"github.com/p9c/p9/pkg/qu"
  16  
  17  	"github.com/p9c/p9/cmd/node/active"
  18  	"github.com/p9c/p9/pkg/amt"
  19  	"github.com/p9c/p9/pkg/block"
  20  	"github.com/p9c/p9/pkg/blockchain"
  21  	"github.com/p9c/p9/pkg/btcaddr"
  22  	"github.com/p9c/p9/pkg/chainrpc"
  23  	"github.com/p9c/p9/pkg/chainrpc/hashrate"
  24  	"github.com/p9c/p9/pkg/chainrpc/job"
  25  	"github.com/p9c/p9/pkg/chainrpc/p2padvt"
  26  	"github.com/p9c/p9/pkg/chainrpc/pause"
  27  	"github.com/p9c/p9/pkg/chainrpc/sol"
  28  	"github.com/p9c/p9/pkg/chainrpc/templates"
  29  	"github.com/p9c/p9/pkg/constant"
  30  	"github.com/p9c/p9/pkg/fork"
  31  	"github.com/p9c/p9/pkg/mining"
  32  	rav "github.com/p9c/p9/pkg/ring"
  33  	"github.com/p9c/p9/pkg/rpcclient"
  34  	"github.com/p9c/p9/pkg/transport"
  35  	"github.com/p9c/p9/pkg/wire"
  36  	"github.com/p9c/p9/pod/config"
  37  )
  38  
  39  const (
  40  	BufferSize = 4096
  41  )
  42  
  43  // State stores the state of the controller
  44  type State struct {
  45  	sync.Mutex
  46  	cfg               *config.Config
  47  	node              *chainrpc.Node
  48  	connMgr           chainrpc.ServerConnManager
  49  	stateCfg          *active.Config
  50  	mempoolUpdateChan qu.C
  51  	uuid              uint64
  52  	start, stop, quit qu.C
  53  	blockUpdate       chan *block.Block
  54  	generator         *mining.BlkTmplGenerator
  55  	nextAddress       btcaddr.Address
  56  	walletClient      *rpcclient.Client
  57  	msgBlockTemplates *templates.RecentMessages
  58  	templateShards    [][]byte
  59  	multiConn         *transport.Channel
  60  	otherNodes        map[uint64]*nodeSpec
  61  	hashSampleBuf     *rav.BufferUint64
  62  	hashCount         atomic.Uint64
  63  	lastNonce         int32
  64  	lastBlockUpdate   atomic.Int64
  65  	certs             []byte
  66  }
  67  
  68  type nodeSpec struct {
  69  	time.Time
  70  	addr string
  71  }
  72  
  73  // New creates a new controller
  74  func New(
  75  	syncing *atomic.Bool,
  76  	cfg *config.Config,
  77  	stateCfg *active.Config,
  78  	node *chainrpc.Node,
  79  	connMgr chainrpc.ServerConnManager,
  80  	mempoolUpdateChan qu.C,
  81  	uuid uint64,
  82  	killall qu.C,
  83  	start, stop qu.C,
  84  ) (s *State, e error) {
  85  	quit := qu.T()
  86  	I.Ln("creating othernodes map")
  87  	I.Ln("getting configured TLS certificates")
  88  	certs := cfg.ReadCAFile()
  89  	s = &State{
  90  		cfg:               cfg,
  91  		node:              node,
  92  		connMgr:           connMgr,
  93  		stateCfg:          stateCfg,
  94  		mempoolUpdateChan: mempoolUpdateChan,
  95  		otherNodes:        make(map[uint64]*nodeSpec),
  96  		quit:              quit,
  97  		uuid:              uuid,
  98  		start:             start,
  99  		stop:              stop,
 100  		blockUpdate:       make(chan *block.Block, 1),
 101  		hashSampleBuf:     rav.NewBufferUint64(100),
 102  		msgBlockTemplates: templates.NewRecentMessages(),
 103  		certs:             certs,
 104  	}
 105  	s.lastBlockUpdate.Store(time.Now().Add(-time.Second * 3).Unix())
 106  	s.generator = chainrpc.GetBlkTemplateGenerator(node, cfg, stateCfg)
 107  	var mc *transport.Channel
 108  	I.S(cfg.MulticastPass.V(), cfg.MulticastPass.Bytes())
 109  	if mc, e = transport.NewBroadcastChannel(
 110  		"controller",
 111  		s,
 112  		cfg.MulticastPass.Bytes(),
 113  		transport.DefaultPort,
 114  		constant.MaxDatagramSize,
 115  		handlersMulticast,
 116  		quit,
 117  	); E.Chk(e) {
 118  		return
 119  	}
 120  	s.multiConn = mc
 121  	go func() {
 122  		I.Ln("starting shutdown signal watcher")
 123  		select {
 124  		case <-killall:
 125  			I.Ln("received killall signal, signalling to quit controller")
 126  			s.Shutdown()
 127  		case <-s.quit:
 128  			I.Ln("received quit signal, breaking out of shutdown signal watcher")
 129  		}
 130  	}()
 131  	node.Chain.Subscribe(
 132  		func(n *blockchain.Notification) {
 133  			switch n.Type {
 134  			case blockchain.NTBlockConnected:
 135  				T.Ln("received block connected notification")
 136  				if b, ok := n.Data.(*block.Block); !ok {
 137  					W.Ln("block notification is not a block")
 138  					break
 139  				} else {
 140  					s.blockUpdate <- b
 141  				}
 142  			}
 143  		},
 144  	)
 145  	return
 146  }
 147  
 148  // todo: the stop
 149  
 150  // Start up the controller
 151  func (s *State) Start() {
 152  	I.Ln("calling start controller")
 153  	s.start.Signal()
 154  }
 155  
 156  // Stop the controller
 157  func (s *State) Stop() {
 158  	I.Ln("calling stop controller")
 159  	s.stop.Signal()
 160  }
 161  
 162  // Shutdown the controller
 163  func (s *State) Shutdown() {
 164  	I.Ln("sending shutdown signal to controller")
 165  	s.quit.Q()
 166  }
 167  
 168  func (s *State) startWallet() (e error) {
 169  	if s.walletClient, e = rpcclient.New(
 170  		&rpcclient.ConnConfig{
 171  			Host:         s.cfg.WalletServer.V(),
 172  			Endpoint:     "ws",
 173  			User:         s.cfg.Username.V(),
 174  			Pass:         s.cfg.Password.V(),
 175  			TLS:          s.cfg.ServerTLS.True(),
 176  			Certificates: s.certs,
 177  		}, nil, s.quit,
 178  	); T.Chk(e) {
 179  		return
 180  	}
 181  	I.Ln("established wallet connection")
 182  	return
 183  }
 184  
 185  func (s *State) updateBlockTemplate() (e error) {
 186  	I.Ln("getting current chain tip")
 187  	// s.node.Chain.ChainLock.Lock() // previously this was done before the above, it might be jumping the gun on a new block
 188  	h := s.node.Chain.BestSnapshot().Hash
 189  	var blk *block.Block
 190  	if blk, e = s.node.Chain.BlockByHash(&h); E.Chk(e) {
 191  		return
 192  	}
 193  	// s.node.Chain.ChainLock.Unlock()
 194  	I.Ln("updating block from chain tip")
 195  	// D.S(blk)
 196  	s.doBlockUpdate(blk)
 197  	I.Ln("sending out templates...")
 198  	if e = s.multiConn.SendMany(job.Magic, s.templateShards); E.Chk(e) {
 199  		return
 200  	}
 201  	return
 202  }
 203  
 204  // Run must be started as a goroutine, central routing for the business of the
 205  // controller
 206  //
 207  // For increased simplicity, every type of work runs in one thread, only signalling
 208  // from background goroutines to trigger state changes.
 209  func (s *State) Run() {
 210  	I.Ln("starting controller")
 211  	var e error
 212  	ticker := time.NewTicker(time.Second)
 213  out:
 214  	for {
 215  		// if !s.Syncing.Load() {
 216  		// 	if s.walletClient.Disconnected() {
 217  		// 		I.Ln("wallet client is disconnected, retrying")
 218  		// 		if e = s.startWallet(); !E.Chk(e) {
 219  		// 			continue
 220  		// 		}
 221  		// 		select {
 222  		// 		case <-time.After(time.Second):
 223  		// 			continue
 224  		// 		case <-s.quit:
 225  		// 			break out
 226  		// 		}
 227  		// 	}
 228  		// } else {
 229  		// 	select {
 230  		// 	case <-time.After(time.Second):
 231  		// 		continue
 232  		// 	case <-s.quit:
 233  		// 		break out
 234  		// 	}
 235  		// }
 236  		// // I.Ln("wallet client is connected, switching to running")
 237  		// // if e = s.updateBlockTemplate(); E.Chk(e) {
 238  		// // }
 239  		I.Ln("controller now pausing")
 240  	pausing:
 241  		for {
 242  			select {
 243  			case <-s.mempoolUpdateChan:
 244  				// I.Ln("mempool update chan signal")
 245  				// if e = s.updateBlockTemplate(); E.Chk(e) {
 246  				// }
 247  			case /* bu :=*/ <-s.blockUpdate:
 248  				// I.Ln("received new block update while paused")
 249  				// if e = s.doBlockUpdate(bu); E.Chk(e) {
 250  				// }
 251  				// // s.updateBlockTemplate()
 252  			case <-ticker.C:
 253  				// D.Ln("controller ticker running")
 254  				// s.Advertise()
 255  				// s.checkConnected()
 256  				if s.walletClient.Disconnected() {
 257  					I.Ln("wallet client is disconnected, retrying")
 258  					if e = s.startWallet(); e != nil { // T.Chk(e) {
 259  						// s.updateBlockTemplate()
 260  						break
 261  					}
 262  				}
 263  				I.Ln("wallet client is connected, switching to running")
 264  				break pausing
 265  			case <-s.start.Wait():
 266  				I.Ln("received start signal while paused")
 267  				if !s.checkConnected() {
 268  					I.Ln("not connected")
 269  					break
 270  				}
 271  				break pausing
 272  			case <-s.stop.Wait():
 273  				I.Ln("received stop signal while paused")
 274  			case <-s.quit.Wait():
 275  				I.Ln("received quit signal while paused")
 276  				break out
 277  			}
 278  		}
 279  		// if s.templateShards == nil || len(s.templateShards) < 1 {
 280  		// }
 281  		I.Ln("controller now running")
 282  		if e = s.updateBlockTemplate(); E.Chk(e) {
 283  		}
 284  	running:
 285  		for {
 286  			select {
 287  			case <-s.mempoolUpdateChan:
 288  				I.Ln("mempoolUpdateChan updating block templates")
 289  				if e = s.updateBlockTemplate(); E.Chk(e) {
 290  					break
 291  				}
 292  				I.Ln("sending out templates...")
 293  				if e = s.multiConn.SendMany(job.Magic, s.templateShards); E.Chk(e) {
 294  				}
 295  			case bu := <-s.blockUpdate:
 296  				// _ = bu
 297  				go func(){
 298  
 299  					I.Ln("received new block update while running")
 300  					s.doBlockUpdate(bu)
 301  					I.Ln("sending out templates...")
 302  					if e = s.multiConn.SendMany(job.Magic, s.templateShards); E.Chk(e) {
 303  						return
 304  					}
 305  				}()
 306  			case <-ticker.C:
 307  				// T.Ln("checking if wallet is connected")
 308  				if !s.checkConnected() {
 309  					break running
 310  				}
 311  				// I.Ln("resending current templates...")
 312  				if e = s.multiConn.SendMany(job.Magic, s.templateShards); E.Chk(e) {
 313  					break
 314  				}
 315  				if s.walletClient.Disconnected() {
 316  					I.Ln("wallet client has disconnected, switching to pausing")
 317  					break running
 318  				}
 319  			case <-s.start.Wait():
 320  				I.Ln("received start signal while running")
 321  			case <-s.stop.Wait():
 322  				I.Ln("received stop signal while running")
 323  				break running
 324  			case <-s.quit.Wait():
 325  				I.Ln("received quit signal while running")
 326  				break out
 327  			}
 328  		}
 329  	}
 330  }
 331  
 332  func (s *State) checkConnected() (connected bool) {
 333  	return true
 334  	// if !*s.cfg.Generate || *s.cfg.GenThreads == 0 {
 335  	// 	I.Ln("no need to check connectivity if we aren't mining")
 336  	// 	return
 337  	// }
 338  	// if s.cfg.Solo.True() {
 339  	// 	// I.Ln("in solo mode, mining anyway")
 340  	// 	// s.Start()
 341  	// 	return true
 342  	// }
 343  	// T.Ln("checking connectivity state")
 344  	// ps := make(chan peersummary.PeerSummaries, 1)
 345  	// s.node.PeerState <- ps
 346  	// T.Ln("sent peer list query")
 347  	// var lanPeers int
 348  	// var totalPeers int
 349  	// select {
 350  	// case connState := <-ps:
 351  	// 	T.Ln("received peer list query response")
 352  	// 	totalPeers = len(connState)
 353  	// 	for i := range connState {
 354  	// 		if routeable.IPNet.Contains(connState[i].IP) {
 355  	// 			lanPeers++
 356  	// 		}
 357  	// 	}
 358  	// 	if s.cfg.LAN.True() {
 359  	// 		// if there is no peers on lan and solo was not set, stop mining
 360  	// 		if lanPeers == 0 {
 361  	// 			T.Ln("no lan peers while in lan mode, stopping mining")
 362  	// 			s.Stop()
 363  	// 		} else {
 364  	// 			s.Start()
 365  	// 			connected = true
 366  	// 		}
 367  	// 	} else {
 368  	// 		if totalPeers-lanPeers == 0 {
 369  	// 			// we have no peers on the internet, stop mining
 370  	// 			T.Ln("no internet peers, stopping mining")
 371  	// 			s.Stop()
 372  	// 		} else {
 373  	// 			s.Start()
 374  	// 			connected = true
 375  	// 		}
 376  	// 	}
 377  	// 	break
 378  	// 	// quit waiting if we are shutting down
 379  	// case <-s.quit:
 380  	// 	break
 381  	// }
 382  	// T.Ln(totalPeers, "total peers", lanPeers, "lan peers solo:",
 383  	// 	s.cfg.Solo.True(), "lan:", s.cfg.LAN.True())
 384  	// return
 385  }
 386  
 387  //
 388  // func (s *State) Advertise() {
 389  // 	if !*s.cfg.Discovery {
 390  // 		return
 391  // 	}
 392  // 	T.Ln("sending out advertisment")
 393  // 	var e error
 394  // 	if e = s.multiConn.SendMany(
 395  // 		p2padvt.Magic,
 396  // 		transport.GetShards(p2padvt.Get(s.uuid, s.cfg)),
 397  // 	); E.Chk(e) {
 398  // 	}
 399  // }
 400  
 401  func (s *State) doBlockUpdate(prev *block.Block) (e error) {
 402  	I.Ln("do block update")
 403  	if s.nextAddress == nil {
 404  		I.Ln("getting new address for templates")
 405  		// if s.nextAddress, e = s.GetNewAddressFromMiningAddrs(); T.Chk(e) {
 406  		if s.nextAddress, e = s.GetNewAddressFromWallet(); T.Chk(e) {
 407  			s.Stop()
 408  			return
 409  		}
 410  		// }
 411  	}
 412  	I.Ln("getting templates...", prev.WireBlock().Header.Timestamp)
 413  	var tpl *templates.Message
 414  	if tpl, e = s.GetMsgBlockTemplate(prev, s.nextAddress); E.Chk(e) {
 415  		s.Stop()
 416  		return
 417  	}
 418  	// I.S(tpl)
 419  	s.msgBlockTemplates.Add(tpl)
 420  	// I.Ln(tpl.Timestamp)
 421  	I.Ln("caching error corrected message shards...")
 422  	srl := tpl.Serialize()
 423  	// I.S(srl)
 424  	s.templateShards = transport.GetShards(srl)
 425  	// var dt []byte
 426  	// if _, e = fec.Decode(s.templateShards); E.Chk(e) {
 427  	// }
 428  	// I.S(dt)
 429  	return
 430  }
 431  
 432  // GetMsgBlockTemplate gets a Message building on given block paying to a given
 433  // address
 434  func (s *State) GetMsgBlockTemplate(
 435  	prev *block.Block, addr btcaddr.Address,
 436  ) (mbt *templates.Message, e error) {
 437  	T.Ln("GetMsgBlockTemplate")
 438  	rand.Seed(time.Now().Unix())
 439  	mbt = &templates.Message{
 440  		Nonce:     rand.Uint64(),
 441  		UUID:      s.uuid,
 442  		PrevBlock: prev.WireBlock().BlockHash(),
 443  		Height:    prev.Height() + 1,
 444  		Bits:      make(templates.Diffs),
 445  		Merkles:   make(templates.Merkles),
 446  	}
 447  	for next, curr, more := fork.AlgoVerIterator(mbt.Height); more(); next() {
 448  		// I.Ln("creating template for", curr())
 449  		var templateX *mining.BlockTemplate
 450  		if templateX, e = s.generator.NewBlockTemplate(
 451  			addr,
 452  			fork.GetAlgoName(curr(), mbt.Height),
 453  		); D.Chk(e) || templateX == nil {
 454  		} else {
 455  			// I.S(templateX)
 456  			newB := templateX.Block
 457  			newH := newB.Header
 458  			mbt.Timestamp = newH.Timestamp
 459  			mbt.Bits[curr()] = newH.Bits
 460  			mbt.Merkles[curr()] = newH.MerkleRoot
 461  			// I.Ln("merkle for", curr(), mbt.Merkles[curr()])
 462  			mbt.SetTxs(curr(), newB.Transactions)
 463  		}
 464  	}
 465  	return
 466  }
 467  
 468  // GetNewAddressFromWallet gets a new address from the wallet if it is
 469  // connected, or returns an error
 470  func (s *State) GetNewAddressFromWallet() (addr btcaddr.Address, e error) {
 471  	if s.walletClient != nil {
 472  		if !s.walletClient.Disconnected() {
 473  			I.Ln("have access to a wallet, generating address")
 474  			if addr, e = s.walletClient.GetNewAddress("default"); E.Chk(e) {
 475  			} else {
 476  				I.Ln("-------- found address", addr)
 477  			}
 478  		}
 479  	} else {
 480  		e = errors.New("no wallet available for new address")
 481  		I.Ln(e)
 482  	}
 483  	return
 484  }
 485  
 486  //
 487  // // GetNewAddressFromMiningAddrs tries to get an address from the mining
 488  // // addresses list in the configuration file
 489  // func (s *State) GetNewAddressFromMiningAddrs() (addr btcaddr.Address, e error) {
 490  // 	if s.cfg.MiningAddrs == nil {
 491  // 		e = errors.New("mining addresses is nil")
 492  // 		I.Ln(e)
 493  // 		return
 494  // 	}
 495  // 	if len(s.cfg.MiningAddrs.S()) < 1 {
 496  // 		e = errors.New("no mining addresses")
 497  // 		I.Ln(e)
 498  // 		return
 499  // 	}
 500  // 	// Choose a payment address at random.
 501  // 	rand.Seed(time.Now().UnixNano())
 502  // 	p2a := rand.Intn(len(s.cfg.MiningAddrs.S()))
 503  // 	addr = s.stateCfg.ActiveMiningAddrs[p2a]
 504  // 	// remove the address from the state
 505  // 	if p2a == 0 {
 506  // 		s.stateCfg.ActiveMiningAddrs = s.stateCfg.ActiveMiningAddrs[1:]
 507  // 	} else {
 508  // 		s.stateCfg.ActiveMiningAddrs = append(
 509  // 			s.stateCfg.ActiveMiningAddrs[:p2a],
 510  // 			s.stateCfg.ActiveMiningAddrs[p2a+1:]...,
 511  // 		)
 512  // 	}
 513  // 	// update the config
 514  // 	var ma cli.StringSlice
 515  // 	for i := range s.stateCfg.ActiveMiningAddrs {
 516  // 		ma = append(ma, s.stateCfg.ActiveMiningAddrs[i].String())
 517  // 	}
 518  // 	s.cfg.MiningAddrs.Set(ma)
 519  // 	podcfg.Save(s.cfg)
 520  // 	return
 521  // }
 522  
 523  var handlersMulticast = transport.Handlers{
 524  	string(sol.Magic): processSolMsg,
 525  	// string(p2padvt.Magic):  processAdvtMsg,
 526  	string(hashrate.Magic): processHashrateMsg,
 527  }
 528  
 529  func processAdvtMsg(
 530  	ctx interface{}, src net.Addr, dst string, b []byte,
 531  ) (e error) {
 532  	I.Ln("processing advertisment message", src, dst)
 533  	s := ctx.(*State)
 534  	var j p2padvt.Advertisment
 535  	gotiny.Unmarshal(b, &j)
 536  	var uuid uint64
 537  	uuid = j.UUID
 538  	// I.Ln("uuid of advertisment", uuid, s.otherNodes)
 539  	if uuid == s.uuid {
 540  		I.Ln("ignoring own advertisment message")
 541  		return
 542  	}
 543  	if _, ok := s.otherNodes[uuid]; !ok {
 544  		// if we haven't already added it to the permanent peer list, we can add it now
 545  		I.Ln("connecting to lan peer with same PSK", j.IPs, uuid)
 546  		s.otherNodes[uuid] = &nodeSpec{}
 547  		s.otherNodes[uuid].Time = time.Now()
 548  		// try all IPs
 549  		// if s.cfg.AutoListen.True() {
 550  		// 	s.cfg.P2PConnect.Set(cli.StringSlice{})
 551  		// }
 552  		for addr := range j.IPs {
 553  			peerIP := net.JoinHostPort(addr, fmt.Sprint(j.P2P))
 554  			if e = s.connMgr.Connect(
 555  				peerIP,
 556  				true,
 557  			); E.Chk(e) {
 558  				continue
 559  			}
 560  			I.Ln("connected to peer via address", peerIP)
 561  			s.otherNodes[uuid].addr = peerIP
 562  			break
 563  		}
 564  		I.Ln("otherNodes", s.otherNodes)
 565  	} else {
 566  		// update last seen time for uuid for garbage collection of stale disconnected
 567  		// nodes
 568  		s.otherNodes[uuid].Time = time.Now()
 569  	}
 570  	// If we lose connection for more than 9 seconds we delete and if the node
 571  	// reappears it can be reconnected
 572  	for i := range s.otherNodes {
 573  		if time.Now().Sub(s.otherNodes[i].Time) > time.Second*6 {
 574  			// also remove from connection manager
 575  			if e = s.connMgr.RemoveByAddr(s.otherNodes[i].addr); E.Chk(e) {
 576  			}
 577  			I.Ln("deleting", s.otherNodes[i])
 578  			delete(s.otherNodes, i)
 579  		}
 580  	}
 581  	// on := int32(len(s.otherNodes))
 582  	// s.otherNodeCount.Store(on)
 583  	return
 584  }
 585  
 586  // Solutions submitted by workers
 587  func processSolMsg(
 588  	ctx interface{}, src net.Addr, dst string, b []byte,
 589  ) (e error) {
 590  	I.Ln("received solution", src, dst)
 591  	s := ctx.(*State)
 592  	var so sol.Solution
 593  	gotiny.Unmarshal(b, &so)
 594  	tpl := s.msgBlockTemplates.Find(so.Nonce)
 595  	if tpl == nil {
 596  		I.Ln("solution nonce", so.Nonce, "is not known by this controller")
 597  		return
 598  	}
 599  	if so.UUID != s.uuid {
 600  		I.Ln("solution is for another controller")
 601  		return
 602  	}
 603  	var newHeader *wire.BlockHeader
 604  	if newHeader, e = so.Decode(); E.Chk(e) {
 605  		return
 606  	}
 607  	if newHeader.PrevBlock != tpl.PrevBlock {
 608  		I.Ln("blk submitted by kopach miner worker is stale")
 609  		return
 610  	}
 611  	var msgBlock *wire.Block
 612  	if msgBlock, e = tpl.Reconstruct(newHeader); E.Chk(e) {
 613  		I.Ln("failed to construct new header")
 614  		return
 615  	}
 616  
 617  	I.Ln("sending pause to workers")
 618  	if e = s.multiConn.SendMany(pause.Magic, transport.GetShards(p2padvt.Get(s.uuid, (s.cfg.P2PListeners.S())[0])),
 619  	); E.Chk(e) {
 620  		return
 621  	}
 622  	I.Ln("signalling controller to enter pause mode")
 623  	s.Stop()
 624  	defer s.Start()
 625  	blk := block.NewBlock(msgBlock)
 626  	blk.SetHeight(tpl.Height)
 627  	var isOrphan bool
 628  	I.Ln("submitting blk for processing")
 629  	if isOrphan, e = s.node.SyncManager.ProcessBlock(blk, blockchain.BFNone); E.Chk(e) {
 630  		// Anything other than a rule violation is an unexpected error, so log that
 631  		// error as an internal error.
 632  		if _, ok := e.(blockchain.RuleError); !ok {
 633  			W.F(
 634  				"Unexpected error while processing blk submitted via kopach miner:", e,
 635  			)
 636  			return
 637  		} else {
 638  			W.Ln("blk submitted via kopach miner rejected:", e)
 639  			if isOrphan {
 640  				W.Ln("blk is an orphan")
 641  				return
 642  			}
 643  			return
 644  		}
 645  	}
 646  	I.Ln("the blk was accepted, new height", blk.Height())
 647  	I.C(
 648  		func() string {
 649  			bmb := blk.WireBlock()
 650  			coinbaseTx := bmb.Transactions[0].TxOut[0]
 651  			prevHeight := blk.Height() - 1
 652  			var prevBlock *block.Block
 653  			if prevBlock, e = s.node.Chain.BlockByHeight(prevHeight); E.Chk(e) {
 654  			}
 655  			if prevBlock == nil {
 656  				return "prevblock nil while generating log"
 657  			}
 658  			prevTime := prevBlock.WireBlock().Header.Timestamp.Unix()
 659  			since := bmb.Header.Timestamp.Unix() - prevTime
 660  			bHash := bmb.BlockHashWithAlgos(blk.Height())
 661  			return fmt.Sprintf(
 662  				"new blk height %d %08x %s%10d %08x %v %s %ds since prev",
 663  				blk.Height(),
 664  				prevBlock.WireBlock().Header.Bits,
 665  				bHash,
 666  				bmb.Header.Timestamp.Unix(),
 667  				bmb.Header.Bits,
 668  				amt.Amount(coinbaseTx.Value),
 669  				fork.GetAlgoName(
 670  					bmb.Header.Version,
 671  					blk.Height(),
 672  				), since,
 673  			)
 674  		},
 675  	)
 676  	I.Ln("clearing address used for blk")
 677  	s.nextAddress = nil
 678  	return
 679  }
 680  
 681  // hashrate reports from workers
 682  func processHashrateMsg(
 683  	ctx interface{}, src net.Addr, dst string, b []byte,
 684  ) (e error) {
 685  	s := ctx.(*State)
 686  	var hr hashrate.Hashrate
 687  	gotiny.Unmarshal(b, &hr)
 688  	// only count each one once
 689  	if s.lastNonce == hr.Nonce {
 690  		return
 691  	}
 692  	s.lastNonce = hr.Nonce
 693  	// add to total hash counts
 694  	s.hashCount.Add(uint64(hr.Count))
 695  	return
 696  }
 697  
 698  func (s *State) hashReport() float64 {
 699  	s.hashSampleBuf.Add(s.hashCount.Load())
 700  	av := ewma.NewMovingAverage()
 701  	var i int
 702  	var prev uint64
 703  	if e := s.hashSampleBuf.ForEach(
 704  		func(v uint64) (e error) {
 705  			if i < 1 {
 706  				prev = v
 707  			} else {
 708  				interval := v - prev
 709  				av.Add(float64(interval))
 710  				prev = v
 711  			}
 712  			i++
 713  			return nil
 714  		},
 715  	); E.Chk(e) {
 716  	}
 717  	return av.Value()
 718  }
 719