ctrl.go raw
1 package ctrl
2
3 import (
4 "errors"
5 "fmt"
6 "math/rand"
7 "net"
8 "sync"
9 "time"
10
11 "github.com/VividCortex/ewma"
12 "github.com/niubaoshu/gotiny"
13 "go.uber.org/atomic"
14
15 "github.com/p9c/p9/pkg/qu"
16
17 "github.com/p9c/p9/cmd/node/active"
18 "github.com/p9c/p9/pkg/amt"
19 "github.com/p9c/p9/pkg/block"
20 "github.com/p9c/p9/pkg/blockchain"
21 "github.com/p9c/p9/pkg/btcaddr"
22 "github.com/p9c/p9/pkg/chainrpc"
23 "github.com/p9c/p9/pkg/chainrpc/hashrate"
24 "github.com/p9c/p9/pkg/chainrpc/job"
25 "github.com/p9c/p9/pkg/chainrpc/p2padvt"
26 "github.com/p9c/p9/pkg/chainrpc/pause"
27 "github.com/p9c/p9/pkg/chainrpc/sol"
28 "github.com/p9c/p9/pkg/chainrpc/templates"
29 "github.com/p9c/p9/pkg/constant"
30 "github.com/p9c/p9/pkg/fork"
31 "github.com/p9c/p9/pkg/mining"
32 rav "github.com/p9c/p9/pkg/ring"
33 "github.com/p9c/p9/pkg/rpcclient"
34 "github.com/p9c/p9/pkg/transport"
35 "github.com/p9c/p9/pkg/wire"
36 "github.com/p9c/p9/pod/config"
37 )
38
// BufferSize is a size constant with the value 4096. Nothing in this file
// references it.
const BufferSize = 4096
42
43 // State stores the state of the controller
44 type State struct {
45 sync.Mutex
46 cfg *config.Config
47 node *chainrpc.Node
48 connMgr chainrpc.ServerConnManager
49 stateCfg *active.Config
50 mempoolUpdateChan qu.C
51 uuid uint64
52 start, stop, quit qu.C
53 blockUpdate chan *block.Block
54 generator *mining.BlkTmplGenerator
55 nextAddress btcaddr.Address
56 walletClient *rpcclient.Client
57 msgBlockTemplates *templates.RecentMessages
58 templateShards [][]byte
59 multiConn *transport.Channel
60 otherNodes map[uint64]*nodeSpec
61 hashSampleBuf *rav.BufferUint64
62 hashCount atomic.Uint64
63 lastNonce int32
64 lastBlockUpdate atomic.Int64
65 certs []byte
66 }
67
// nodeSpec records what the controller knows about another node that
// advertised itself.
type nodeSpec struct {
	// Time is the moment the node was last seen.
	time.Time
	// addr is the host:port the connection manager connected to; it is empty
	// when no connection attempt succeeded.
	addr string
}
72
73 // New creates a new controller
74 func New(
75 syncing *atomic.Bool,
76 cfg *config.Config,
77 stateCfg *active.Config,
78 node *chainrpc.Node,
79 connMgr chainrpc.ServerConnManager,
80 mempoolUpdateChan qu.C,
81 uuid uint64,
82 killall qu.C,
83 start, stop qu.C,
84 ) (s *State, e error) {
85 quit := qu.T()
86 I.Ln("creating othernodes map")
87 I.Ln("getting configured TLS certificates")
88 certs := cfg.ReadCAFile()
89 s = &State{
90 cfg: cfg,
91 node: node,
92 connMgr: connMgr,
93 stateCfg: stateCfg,
94 mempoolUpdateChan: mempoolUpdateChan,
95 otherNodes: make(map[uint64]*nodeSpec),
96 quit: quit,
97 uuid: uuid,
98 start: start,
99 stop: stop,
100 blockUpdate: make(chan *block.Block, 1),
101 hashSampleBuf: rav.NewBufferUint64(100),
102 msgBlockTemplates: templates.NewRecentMessages(),
103 certs: certs,
104 }
105 s.lastBlockUpdate.Store(time.Now().Add(-time.Second * 3).Unix())
106 s.generator = chainrpc.GetBlkTemplateGenerator(node, cfg, stateCfg)
107 var mc *transport.Channel
108 I.S(cfg.MulticastPass.V(), cfg.MulticastPass.Bytes())
109 if mc, e = transport.NewBroadcastChannel(
110 "controller",
111 s,
112 cfg.MulticastPass.Bytes(),
113 transport.DefaultPort,
114 constant.MaxDatagramSize,
115 handlersMulticast,
116 quit,
117 ); E.Chk(e) {
118 return
119 }
120 s.multiConn = mc
121 go func() {
122 I.Ln("starting shutdown signal watcher")
123 select {
124 case <-killall:
125 I.Ln("received killall signal, signalling to quit controller")
126 s.Shutdown()
127 case <-s.quit:
128 I.Ln("received quit signal, breaking out of shutdown signal watcher")
129 }
130 }()
131 node.Chain.Subscribe(
132 func(n *blockchain.Notification) {
133 switch n.Type {
134 case blockchain.NTBlockConnected:
135 T.Ln("received block connected notification")
136 if b, ok := n.Data.(*block.Block); !ok {
137 W.Ln("block notification is not a block")
138 break
139 } else {
140 s.blockUpdate <- b
141 }
142 }
143 },
144 )
145 return
146 }
147
148 // todo: the stop
149
150 // Start up the controller
151 func (s *State) Start() {
152 I.Ln("calling start controller")
153 s.start.Signal()
154 }
155
156 // Stop the controller
157 func (s *State) Stop() {
158 I.Ln("calling stop controller")
159 s.stop.Signal()
160 }
161
162 // Shutdown the controller
163 func (s *State) Shutdown() {
164 I.Ln("sending shutdown signal to controller")
165 s.quit.Q()
166 }
167
168 func (s *State) startWallet() (e error) {
169 if s.walletClient, e = rpcclient.New(
170 &rpcclient.ConnConfig{
171 Host: s.cfg.WalletServer.V(),
172 Endpoint: "ws",
173 User: s.cfg.Username.V(),
174 Pass: s.cfg.Password.V(),
175 TLS: s.cfg.ServerTLS.True(),
176 Certificates: s.certs,
177 }, nil, s.quit,
178 ); T.Chk(e) {
179 return
180 }
181 I.Ln("established wallet connection")
182 return
183 }
184
185 func (s *State) updateBlockTemplate() (e error) {
186 I.Ln("getting current chain tip")
187 // s.node.Chain.ChainLock.Lock() // previously this was done before the above, it might be jumping the gun on a new block
188 h := s.node.Chain.BestSnapshot().Hash
189 var blk *block.Block
190 if blk, e = s.node.Chain.BlockByHash(&h); E.Chk(e) {
191 return
192 }
193 // s.node.Chain.ChainLock.Unlock()
194 I.Ln("updating block from chain tip")
195 // D.S(blk)
196 s.doBlockUpdate(blk)
197 I.Ln("sending out templates...")
198 if e = s.multiConn.SendMany(job.Magic, s.templateShards); E.Chk(e) {
199 return
200 }
201 return
202 }
203
204 // Run must be started as a goroutine, central routing for the business of the
205 // controller
206 //
207 // For increased simplicity, every type of work runs in one thread, only signalling
208 // from background goroutines to trigger state changes.
209 func (s *State) Run() {
210 I.Ln("starting controller")
211 var e error
212 ticker := time.NewTicker(time.Second)
213 out:
214 for {
215 // if !s.Syncing.Load() {
216 // if s.walletClient.Disconnected() {
217 // I.Ln("wallet client is disconnected, retrying")
218 // if e = s.startWallet(); !E.Chk(e) {
219 // continue
220 // }
221 // select {
222 // case <-time.After(time.Second):
223 // continue
224 // case <-s.quit:
225 // break out
226 // }
227 // }
228 // } else {
229 // select {
230 // case <-time.After(time.Second):
231 // continue
232 // case <-s.quit:
233 // break out
234 // }
235 // }
236 // // I.Ln("wallet client is connected, switching to running")
237 // // if e = s.updateBlockTemplate(); E.Chk(e) {
238 // // }
239 I.Ln("controller now pausing")
240 pausing:
241 for {
242 select {
243 case <-s.mempoolUpdateChan:
244 // I.Ln("mempool update chan signal")
245 // if e = s.updateBlockTemplate(); E.Chk(e) {
246 // }
247 case /* bu :=*/ <-s.blockUpdate:
248 // I.Ln("received new block update while paused")
249 // if e = s.doBlockUpdate(bu); E.Chk(e) {
250 // }
251 // // s.updateBlockTemplate()
252 case <-ticker.C:
253 // D.Ln("controller ticker running")
254 // s.Advertise()
255 // s.checkConnected()
256 if s.walletClient.Disconnected() {
257 I.Ln("wallet client is disconnected, retrying")
258 if e = s.startWallet(); e != nil { // T.Chk(e) {
259 // s.updateBlockTemplate()
260 break
261 }
262 }
263 I.Ln("wallet client is connected, switching to running")
264 break pausing
265 case <-s.start.Wait():
266 I.Ln("received start signal while paused")
267 if !s.checkConnected() {
268 I.Ln("not connected")
269 break
270 }
271 break pausing
272 case <-s.stop.Wait():
273 I.Ln("received stop signal while paused")
274 case <-s.quit.Wait():
275 I.Ln("received quit signal while paused")
276 break out
277 }
278 }
279 // if s.templateShards == nil || len(s.templateShards) < 1 {
280 // }
281 I.Ln("controller now running")
282 if e = s.updateBlockTemplate(); E.Chk(e) {
283 }
284 running:
285 for {
286 select {
287 case <-s.mempoolUpdateChan:
288 I.Ln("mempoolUpdateChan updating block templates")
289 if e = s.updateBlockTemplate(); E.Chk(e) {
290 break
291 }
292 I.Ln("sending out templates...")
293 if e = s.multiConn.SendMany(job.Magic, s.templateShards); E.Chk(e) {
294 }
295 case bu := <-s.blockUpdate:
296 // _ = bu
297 go func(){
298
299 I.Ln("received new block update while running")
300 s.doBlockUpdate(bu)
301 I.Ln("sending out templates...")
302 if e = s.multiConn.SendMany(job.Magic, s.templateShards); E.Chk(e) {
303 return
304 }
305 }()
306 case <-ticker.C:
307 // T.Ln("checking if wallet is connected")
308 if !s.checkConnected() {
309 break running
310 }
311 // I.Ln("resending current templates...")
312 if e = s.multiConn.SendMany(job.Magic, s.templateShards); E.Chk(e) {
313 break
314 }
315 if s.walletClient.Disconnected() {
316 I.Ln("wallet client has disconnected, switching to pausing")
317 break running
318 }
319 case <-s.start.Wait():
320 I.Ln("received start signal while running")
321 case <-s.stop.Wait():
322 I.Ln("received stop signal while running")
323 break running
324 case <-s.quit.Wait():
325 I.Ln("received quit signal while running")
326 break out
327 }
328 }
329 }
330 }
331
332 func (s *State) checkConnected() (connected bool) {
333 return true
334 // if !*s.cfg.Generate || *s.cfg.GenThreads == 0 {
335 // I.Ln("no need to check connectivity if we aren't mining")
336 // return
337 // }
338 // if s.cfg.Solo.True() {
339 // // I.Ln("in solo mode, mining anyway")
340 // // s.Start()
341 // return true
342 // }
343 // T.Ln("checking connectivity state")
344 // ps := make(chan peersummary.PeerSummaries, 1)
345 // s.node.PeerState <- ps
346 // T.Ln("sent peer list query")
347 // var lanPeers int
348 // var totalPeers int
349 // select {
350 // case connState := <-ps:
351 // T.Ln("received peer list query response")
352 // totalPeers = len(connState)
353 // for i := range connState {
354 // if routeable.IPNet.Contains(connState[i].IP) {
355 // lanPeers++
356 // }
357 // }
358 // if s.cfg.LAN.True() {
359 // // if there is no peers on lan and solo was not set, stop mining
360 // if lanPeers == 0 {
361 // T.Ln("no lan peers while in lan mode, stopping mining")
362 // s.Stop()
363 // } else {
364 // s.Start()
365 // connected = true
366 // }
367 // } else {
368 // if totalPeers-lanPeers == 0 {
369 // // we have no peers on the internet, stop mining
370 // T.Ln("no internet peers, stopping mining")
371 // s.Stop()
372 // } else {
373 // s.Start()
374 // connected = true
375 // }
376 // }
377 // break
378 // // quit waiting if we are shutting down
379 // case <-s.quit:
380 // break
381 // }
382 // T.Ln(totalPeers, "total peers", lanPeers, "lan peers solo:",
383 // s.cfg.Solo.True(), "lan:", s.cfg.LAN.True())
384 // return
385 }
386
387 //
388 // func (s *State) Advertise() {
389 // if !*s.cfg.Discovery {
390 // return
391 // }
392 // T.Ln("sending out advertisment")
393 // var e error
394 // if e = s.multiConn.SendMany(
395 // p2padvt.Magic,
396 // transport.GetShards(p2padvt.Get(s.uuid, s.cfg)),
397 // ); E.Chk(e) {
398 // }
399 // }
400
401 func (s *State) doBlockUpdate(prev *block.Block) (e error) {
402 I.Ln("do block update")
403 if s.nextAddress == nil {
404 I.Ln("getting new address for templates")
405 // if s.nextAddress, e = s.GetNewAddressFromMiningAddrs(); T.Chk(e) {
406 if s.nextAddress, e = s.GetNewAddressFromWallet(); T.Chk(e) {
407 s.Stop()
408 return
409 }
410 // }
411 }
412 I.Ln("getting templates...", prev.WireBlock().Header.Timestamp)
413 var tpl *templates.Message
414 if tpl, e = s.GetMsgBlockTemplate(prev, s.nextAddress); E.Chk(e) {
415 s.Stop()
416 return
417 }
418 // I.S(tpl)
419 s.msgBlockTemplates.Add(tpl)
420 // I.Ln(tpl.Timestamp)
421 I.Ln("caching error corrected message shards...")
422 srl := tpl.Serialize()
423 // I.S(srl)
424 s.templateShards = transport.GetShards(srl)
425 // var dt []byte
426 // if _, e = fec.Decode(s.templateShards); E.Chk(e) {
427 // }
428 // I.S(dt)
429 return
430 }
431
432 // GetMsgBlockTemplate gets a Message building on given block paying to a given
433 // address
434 func (s *State) GetMsgBlockTemplate(
435 prev *block.Block, addr btcaddr.Address,
436 ) (mbt *templates.Message, e error) {
437 T.Ln("GetMsgBlockTemplate")
438 rand.Seed(time.Now().Unix())
439 mbt = &templates.Message{
440 Nonce: rand.Uint64(),
441 UUID: s.uuid,
442 PrevBlock: prev.WireBlock().BlockHash(),
443 Height: prev.Height() + 1,
444 Bits: make(templates.Diffs),
445 Merkles: make(templates.Merkles),
446 }
447 for next, curr, more := fork.AlgoVerIterator(mbt.Height); more(); next() {
448 // I.Ln("creating template for", curr())
449 var templateX *mining.BlockTemplate
450 if templateX, e = s.generator.NewBlockTemplate(
451 addr,
452 fork.GetAlgoName(curr(), mbt.Height),
453 ); D.Chk(e) || templateX == nil {
454 } else {
455 // I.S(templateX)
456 newB := templateX.Block
457 newH := newB.Header
458 mbt.Timestamp = newH.Timestamp
459 mbt.Bits[curr()] = newH.Bits
460 mbt.Merkles[curr()] = newH.MerkleRoot
461 // I.Ln("merkle for", curr(), mbt.Merkles[curr()])
462 mbt.SetTxs(curr(), newB.Transactions)
463 }
464 }
465 return
466 }
467
468 // GetNewAddressFromWallet gets a new address from the wallet if it is
469 // connected, or returns an error
470 func (s *State) GetNewAddressFromWallet() (addr btcaddr.Address, e error) {
471 if s.walletClient != nil {
472 if !s.walletClient.Disconnected() {
473 I.Ln("have access to a wallet, generating address")
474 if addr, e = s.walletClient.GetNewAddress("default"); E.Chk(e) {
475 } else {
476 I.Ln("-------- found address", addr)
477 }
478 }
479 } else {
480 e = errors.New("no wallet available for new address")
481 I.Ln(e)
482 }
483 return
484 }
485
486 //
487 // // GetNewAddressFromMiningAddrs tries to get an address from the mining
488 // // addresses list in the configuration file
489 // func (s *State) GetNewAddressFromMiningAddrs() (addr btcaddr.Address, e error) {
490 // if s.cfg.MiningAddrs == nil {
491 // e = errors.New("mining addresses is nil")
492 // I.Ln(e)
493 // return
494 // }
495 // if len(s.cfg.MiningAddrs.S()) < 1 {
496 // e = errors.New("no mining addresses")
497 // I.Ln(e)
498 // return
499 // }
500 // // Choose a payment address at random.
501 // rand.Seed(time.Now().UnixNano())
502 // p2a := rand.Intn(len(s.cfg.MiningAddrs.S()))
503 // addr = s.stateCfg.ActiveMiningAddrs[p2a]
504 // // remove the address from the state
505 // if p2a == 0 {
506 // s.stateCfg.ActiveMiningAddrs = s.stateCfg.ActiveMiningAddrs[1:]
507 // } else {
508 // s.stateCfg.ActiveMiningAddrs = append(
509 // s.stateCfg.ActiveMiningAddrs[:p2a],
510 // s.stateCfg.ActiveMiningAddrs[p2a+1:]...,
511 // )
512 // }
513 // // update the config
514 // var ma cli.StringSlice
515 // for i := range s.stateCfg.ActiveMiningAddrs {
516 // ma = append(ma, s.stateCfg.ActiveMiningAddrs[i].String())
517 // }
518 // s.cfg.MiningAddrs.Set(ma)
519 // podcfg.Save(s.cfg)
520 // return
521 // }
522
523 var handlersMulticast = transport.Handlers{
524 string(sol.Magic): processSolMsg,
525 // string(p2padvt.Magic): processAdvtMsg,
526 string(hashrate.Magic): processHashrateMsg,
527 }
528
529 func processAdvtMsg(
530 ctx interface{}, src net.Addr, dst string, b []byte,
531 ) (e error) {
532 I.Ln("processing advertisment message", src, dst)
533 s := ctx.(*State)
534 var j p2padvt.Advertisment
535 gotiny.Unmarshal(b, &j)
536 var uuid uint64
537 uuid = j.UUID
538 // I.Ln("uuid of advertisment", uuid, s.otherNodes)
539 if uuid == s.uuid {
540 I.Ln("ignoring own advertisment message")
541 return
542 }
543 if _, ok := s.otherNodes[uuid]; !ok {
544 // if we haven't already added it to the permanent peer list, we can add it now
545 I.Ln("connecting to lan peer with same PSK", j.IPs, uuid)
546 s.otherNodes[uuid] = &nodeSpec{}
547 s.otherNodes[uuid].Time = time.Now()
548 // try all IPs
549 // if s.cfg.AutoListen.True() {
550 // s.cfg.P2PConnect.Set(cli.StringSlice{})
551 // }
552 for addr := range j.IPs {
553 peerIP := net.JoinHostPort(addr, fmt.Sprint(j.P2P))
554 if e = s.connMgr.Connect(
555 peerIP,
556 true,
557 ); E.Chk(e) {
558 continue
559 }
560 I.Ln("connected to peer via address", peerIP)
561 s.otherNodes[uuid].addr = peerIP
562 break
563 }
564 I.Ln("otherNodes", s.otherNodes)
565 } else {
566 // update last seen time for uuid for garbage collection of stale disconnected
567 // nodes
568 s.otherNodes[uuid].Time = time.Now()
569 }
570 // If we lose connection for more than 9 seconds we delete and if the node
571 // reappears it can be reconnected
572 for i := range s.otherNodes {
573 if time.Now().Sub(s.otherNodes[i].Time) > time.Second*6 {
574 // also remove from connection manager
575 if e = s.connMgr.RemoveByAddr(s.otherNodes[i].addr); E.Chk(e) {
576 }
577 I.Ln("deleting", s.otherNodes[i])
578 delete(s.otherNodes, i)
579 }
580 }
581 // on := int32(len(s.otherNodes))
582 // s.otherNodeCount.Store(on)
583 return
584 }
585
586 // Solutions submitted by workers
587 func processSolMsg(
588 ctx interface{}, src net.Addr, dst string, b []byte,
589 ) (e error) {
590 I.Ln("received solution", src, dst)
591 s := ctx.(*State)
592 var so sol.Solution
593 gotiny.Unmarshal(b, &so)
594 tpl := s.msgBlockTemplates.Find(so.Nonce)
595 if tpl == nil {
596 I.Ln("solution nonce", so.Nonce, "is not known by this controller")
597 return
598 }
599 if so.UUID != s.uuid {
600 I.Ln("solution is for another controller")
601 return
602 }
603 var newHeader *wire.BlockHeader
604 if newHeader, e = so.Decode(); E.Chk(e) {
605 return
606 }
607 if newHeader.PrevBlock != tpl.PrevBlock {
608 I.Ln("blk submitted by kopach miner worker is stale")
609 return
610 }
611 var msgBlock *wire.Block
612 if msgBlock, e = tpl.Reconstruct(newHeader); E.Chk(e) {
613 I.Ln("failed to construct new header")
614 return
615 }
616
617 I.Ln("sending pause to workers")
618 if e = s.multiConn.SendMany(pause.Magic, transport.GetShards(p2padvt.Get(s.uuid, (s.cfg.P2PListeners.S())[0])),
619 ); E.Chk(e) {
620 return
621 }
622 I.Ln("signalling controller to enter pause mode")
623 s.Stop()
624 defer s.Start()
625 blk := block.NewBlock(msgBlock)
626 blk.SetHeight(tpl.Height)
627 var isOrphan bool
628 I.Ln("submitting blk for processing")
629 if isOrphan, e = s.node.SyncManager.ProcessBlock(blk, blockchain.BFNone); E.Chk(e) {
630 // Anything other than a rule violation is an unexpected error, so log that
631 // error as an internal error.
632 if _, ok := e.(blockchain.RuleError); !ok {
633 W.F(
634 "Unexpected error while processing blk submitted via kopach miner:", e,
635 )
636 return
637 } else {
638 W.Ln("blk submitted via kopach miner rejected:", e)
639 if isOrphan {
640 W.Ln("blk is an orphan")
641 return
642 }
643 return
644 }
645 }
646 I.Ln("the blk was accepted, new height", blk.Height())
647 I.C(
648 func() string {
649 bmb := blk.WireBlock()
650 coinbaseTx := bmb.Transactions[0].TxOut[0]
651 prevHeight := blk.Height() - 1
652 var prevBlock *block.Block
653 if prevBlock, e = s.node.Chain.BlockByHeight(prevHeight); E.Chk(e) {
654 }
655 if prevBlock == nil {
656 return "prevblock nil while generating log"
657 }
658 prevTime := prevBlock.WireBlock().Header.Timestamp.Unix()
659 since := bmb.Header.Timestamp.Unix() - prevTime
660 bHash := bmb.BlockHashWithAlgos(blk.Height())
661 return fmt.Sprintf(
662 "new blk height %d %08x %s%10d %08x %v %s %ds since prev",
663 blk.Height(),
664 prevBlock.WireBlock().Header.Bits,
665 bHash,
666 bmb.Header.Timestamp.Unix(),
667 bmb.Header.Bits,
668 amt.Amount(coinbaseTx.Value),
669 fork.GetAlgoName(
670 bmb.Header.Version,
671 blk.Height(),
672 ), since,
673 )
674 },
675 )
676 I.Ln("clearing address used for blk")
677 s.nextAddress = nil
678 return
679 }
680
681 // hashrate reports from workers
682 func processHashrateMsg(
683 ctx interface{}, src net.Addr, dst string, b []byte,
684 ) (e error) {
685 s := ctx.(*State)
686 var hr hashrate.Hashrate
687 gotiny.Unmarshal(b, &hr)
688 // only count each one once
689 if s.lastNonce == hr.Nonce {
690 return
691 }
692 s.lastNonce = hr.Nonce
693 // add to total hash counts
694 s.hashCount.Add(uint64(hr.Count))
695 return
696 }
697
698 func (s *State) hashReport() float64 {
699 s.hashSampleBuf.Add(s.hashCount.Load())
700 av := ewma.NewMovingAverage()
701 var i int
702 var prev uint64
703 if e := s.hashSampleBuf.ForEach(
704 func(v uint64) (e error) {
705 if i < 1 {
706 prev = v
707 } else {
708 interval := v - prev
709 av.Add(float64(interval))
710 prev = v
711 }
712 i++
713 return nil
714 },
715 ); E.Chk(e) {
716 }
717 return av.Value()
718 }
719