kopach.go raw

   1  package kopach
   2  
   3  import (
   4  	"context"
   5  	"crypto/rand"
   6  	"fmt"
   7  	"net"
   8  	"os"
   9  	"runtime"
  10  	"time"
  11  
  12  	"github.com/niubaoshu/gotiny"
  13  
  14  	"github.com/p9c/p9/pkg/log"
  15  	"github.com/p9c/p9/pkg/chainrpc/p2padvt"
  16  	"github.com/p9c/p9/pkg/chainrpc/templates"
  17  	"github.com/p9c/p9/pkg/constant"
  18  	"github.com/p9c/p9/pkg/pipe"
  19  	"github.com/p9c/p9/pod/state"
  20  
  21  	"github.com/p9c/p9/pkg/qu"
  22  
  23  	"github.com/VividCortex/ewma"
  24  	"go.uber.org/atomic"
  25  
  26  	"github.com/p9c/p9/pkg/interrupt"
  27  
  28  	"github.com/p9c/p9/cmd/kopach/client"
  29  	"github.com/p9c/p9/pkg/chainhash"
  30  	"github.com/p9c/p9/pkg/chainrpc/hashrate"
  31  	"github.com/p9c/p9/pkg/chainrpc/job"
  32  	"github.com/p9c/p9/pkg/chainrpc/pause"
  33  	rav "github.com/p9c/p9/pkg/ring"
  34  	"github.com/p9c/p9/pkg/transport"
  35  )
  36  
// maxThreads is the logical CPU count reported by runtime.NumCPU at package
// initialisation, stored as a float32.
var maxThreads = float32(runtime.NumCPU())
  38  
// HashCount pairs an embedded uint64 count with the time it relates to. It is
// the element type of Worker.HashTick.
type HashCount struct {
	uint64
	Time time.Time
}
  43  
// SolutionData is a record describing one block solution: when it was seen,
// the block height and algorithm name, the block hashes, and the header fields
// (version, previous block, merkle root, timestamp, bits and nonce). Worker
// keeps a slice of these in its solutions field.
type SolutionData struct {
	time       time.Time
	height     int
	algo       string
	hash       string
	indexHash  string
	version    int32
	prevBlock  string
	merkleRoot string
	timestamp  time.Time
	bits       uint32
	nonce      uint32
}
  57  
// Worker is the state of the kopach miner controller: it owns the spawned
// worker subprocesses and the clients used to talk to them, and is the context
// value handed to the broadcast message handlers.
//
// Fields, as used in this file:
//   - id: hex encoding of 4 random bytes generated in Run; passed to spawned
//     workers and compared against the ID in incoming hashrate reports.
//   - cx: the application state passed to Run.
//   - height: Height of the most recently received job message.
//   - active: set true at the end of Start and false in Stop; the hashrate and
//     job handlers return early while it is false.
//   - conn: the broadcast channel opened in Run.
//   - quit: set to cx.KillAll in Run; Run blocks until it is closed.
//   - clients, workers: rebuilt on every Start, one of each per spawned worker.
//   - FirstSender: UUID of the controller being listened to; 0 means none. It
//     is reset to 0 by the watcher in Run and by the pause handler.
//   - lastSent: UnixNano timestamp of the last accepted job message.
//   - StartChan, StopChan, PassChan: signals handled by the watcher goroutine
//     in Run. NOTE(review): PassChan is not initialised in Run, so unless it is
//     assigned elsewhere its select case can never fire.
//   - solutions, solutionCount, Update: only referenced from commented-out
//     code in this file apart from their initialisation in Run.
//   - hashCount: running total of hashes reported by this miner's workers.
//   - hashSampleBuf, hashrate: samples of hashCount and the latest average
//     computed from them by HashReport.
//   - ctx, sendAddresses, Status, HashTick, LastHash, lastNonce: not used by
//     live code in this file.
type Worker struct {
	id                  string
	cx                  *state.State
	height              int32
	active              atomic.Bool
	conn                *transport.Channel
	ctx                 context.Context
	quit                qu.C
	sendAddresses       []*net.UDPAddr
	clients             []*client.Client
	workers             []*pipe.Worker
	FirstSender         atomic.Uint64
	lastSent            atomic.Int64
	Status              atomic.String
	HashTick            chan HashCount
	LastHash            *chainhash.Hash
	StartChan, StopChan qu.C
	// SetThreads          chan int
	PassChan      chan string
	solutions     []SolutionData
	solutionCount int
	Update        qu.C
	hashCount     atomic.Uint64
	hashSampleBuf *rav.BufferUint64
	hashrate      float64
	lastNonce     uint64
}
  85  
  86  func (w *Worker) Start() {
  87  	D.Ln("starting up kopach workers")
  88  	w.workers = []*pipe.Worker{}
  89  	w.clients = []*client.Client{}
  90  	for i := 0; i < w.cx.Config.GenThreads.V(); i++ {
  91  		D.Ln("starting worker", i)
  92  		cmd, _ := pipe.Spawn(w.quit, os.Args[0], "worker", w.id, w.cx.ActiveNet.Name, w.cx.Config.LogLevel.V())
  93  		w.workers = append(w.workers, cmd)
  94  		w.clients = append(w.clients, client.New(cmd.StdConn))
  95  	}
  96  	for i := range w.clients {
  97  		T.Ln("sending pass to worker", i)
  98  		e := w.clients[i].SendPass(w.cx.Config.MulticastPass.Bytes())
  99  		if e != nil {
 100  		}
 101  	}
 102  	D.Ln("setting workers to active")
 103  	w.active.Store(true)
 104  
 105  }
 106  
 107  func (w *Worker) Stop() {
 108  	var e error
 109  	for i := range w.clients {
 110  		if e = w.clients[i].Pause(); E.Chk(e) {
 111  		}
 112  		if e = w.clients[i].Stop(); E.Chk(e) {
 113  		}
 114  		if e = w.clients[i].Close(); E.Chk(e) {
 115  		}
 116  	}
 117  	for i := range w.workers {
 118  		// if e = w.workers[i].Interrupt(); !E.Chk(e) {
 119  		// }
 120  		if e = w.workers[i].Kill(); !E.Chk(e) {
 121  		}
 122  		D.Ln("stopped worker", i)
 123  	}
 124  	w.active.Store(false)
 125  	w.quit.Q()
 126  }
 127  
 128  // Run the miner
 129  func Run(cx *state.State) (e error) {
 130  	D.Ln("miner starting")
 131  	randomBytes := make([]byte, 4)
 132  	if _, e = rand.Read(randomBytes); E.Chk(e) {
 133  	}
 134  	w := &Worker{
 135  		id:            fmt.Sprintf("%x", randomBytes),
 136  		cx:            cx,
 137  		quit:          cx.KillAll,
 138  		sendAddresses: []*net.UDPAddr{},
 139  		StartChan:     qu.T(),
 140  		StopChan:      qu.T(),
 141  		// SetThreads:    make(chan int),
 142  		solutions:     make([]SolutionData, 0, 2048),
 143  		Update:        qu.T(),
 144  		hashSampleBuf: rav.NewBufferUint64(1000),
 145  	}
 146  	w.lastSent.Store(time.Now().UnixNano())
 147  	w.active.Store(false)
 148  	D.Ln("opening broadcast channel listener")
 149  	w.conn, e = transport.NewBroadcastChannel(
 150  		"kopachmain", w, cx.Config.MulticastPass.Bytes(),
 151  		transport.DefaultPort, constant.MaxDatagramSize, handlers,
 152  		w.quit,
 153  	)
 154  	if e != nil {
 155  		return
 156  	}
 157  	// start up the workers
 158  	// if cx.Config.Generate.True() {
 159  	I.Ln("starting up miner workers")
 160  	w.Start()
 161  	interrupt.AddHandler(
 162  		func() {
 163  			w.Stop()
 164  		},
 165  	)
 166  	// }
 167  	// controller watcher thread
 168  	go func() {
 169  		D.Ln("starting controller watcher")
 170  		ticker := time.NewTicker(time.Second)
 171  		logger := time.NewTicker(time.Second)
 172  	out:
 173  		for {
 174  			select {
 175  			case <-ticker.C:
 176  				W.Ln("controller watcher ticker")
 177  				// if the last message sent was 3 seconds ago the server is almost certainly disconnected or crashed
 178  				// so clear FirstSender
 179  				since := time.Now().Sub(time.Unix(0, w.lastSent.Load()))
 180  				wasSending := since > time.Second*6 && w.FirstSender.Load() != 0
 181  				if wasSending {
 182  					D.Ln("previous current controller has stopped broadcasting", since, w.FirstSender.Load())
 183  					// when this string is clear other broadcasts will be listened to
 184  					w.FirstSender.Store(0)
 185  					// pause the workers
 186  					for i := range w.clients {
 187  						D.Ln("sending pause to worker", i)
 188  						e := w.clients[i].Pause()
 189  						if e != nil {
 190  						}
 191  					}
 192  				}
 193  				// if interrupt.Requested() {
 194  				// 	w.StopChan <- struct{}{}
 195  				// 	w.quit.Q()
 196  				// }
 197  			case <-logger.C:
 198  				W.Ln("hash report ticker")
 199  				w.hashrate = w.HashReport()
 200  				// if interrupt.Requested() {
 201  				// 	w.StopChan <- struct{}{}
 202  				// 	w.quit.Q()
 203  				// }
 204  			case <-w.StartChan.Wait():
 205  				D.Ln("received signal on StartChan")
 206  				cx.Config.Generate.T()
 207  				// if e = cx.Config.WriteToFile(cx.Config.ConfigFile.V()); E.Chk(e) {
 208  				// }
 209  				w.Start()
 210  			case <-w.StopChan.Wait():
 211  				D.Ln("received signal on StopChan")
 212  				cx.Config.Generate.F()
 213  				// if e = cx.Config.WriteToFile(cx.Config.ConfigFile.V()); E.Chk(e) {
 214  				// }
 215  				w.Stop()
 216  			case s := <-w.PassChan:
 217  				F.Ln("received signal on PassChan", s)
 218  				cx.Config.MulticastPass.Set(s)
 219  				// if e = cx.Config.WriteToFile(cx.Config.ConfigFile.V()); E.Chk(e) {
 220  				// }
 221  				w.Stop()
 222  				w.Start()
 223  			// case n := <-w.SetThreads:
 224  			// 	D.Ln("received signal on SetThreads", n)
 225  			// 	cx.Config.GenThreads.Set(n)
 226  			// 	// if e = cx.Config.WriteToFile(cx.Config.ConfigFile.V()); E.Chk(e) {
 227  			// 	// }
 228  			// 	if cx.Config.Generate.True() {
 229  			// 		// always sanitise
 230  			// 		if n < 0 {
 231  			// 			n = int(maxThreads)
 232  			// 		}
 233  			// 		if n > int(maxThreads) {
 234  			// 			n = int(maxThreads)
 235  			// 		}
 236  			// 		w.Stop()
 237  			// 		w.Start()
 238  			// 	}
 239  			case <-w.quit.Wait():
 240  				D.Ln("stopping from quit")
 241  				interrupt.Request()
 242  				break out
 243  			}
 244  		}
 245  		D.Ln("finished kopach miner work loop")
 246  		log.LogChanDisabled.Store(true)
 247  	}()
 248  	D.Ln("listening on", constant.UDP4MulticastAddress)
 249  	<-w.quit
 250  	I.Ln("kopach shutting down") // , interrupt.GoroutineDump())
 251  	// <-interrupt.HandlersDone
 252  	I.Ln("kopach finished shutdown")
 253  	return
 254  }
 255  
 256  // these are the handlers for specific message types.
 257  var handlers = transport.Handlers{
 258  	string(hashrate.Magic): func(
 259  		ctx interface{}, src net.Addr, dst string, b []byte,
 260  	) (e error) {
 261  		c := ctx.(*Worker)
 262  		if !c.active.Load() {
 263  			D.Ln("not active")
 264  			return
 265  		}
 266  		var hr hashrate.Hashrate
 267  		gotiny.Unmarshal(b, &hr)
 268  		// if this is not one of our workers reports ignore it
 269  		if hr.ID != c.id {
 270  			return
 271  		}
 272  		count := hr.Count
 273  		hc := c.hashCount.Load() + uint64(count)
 274  		c.hashCount.Store(hc)
 275  		return
 276  	},
 277  	string(job.Magic): func(
 278  		ctx interface{}, src net.Addr, dst string,
 279  		b []byte,
 280  	) (e error) {
 281  		w := ctx.(*Worker)
 282  		if !w.active.Load() {
 283  			T.Ln("not active")
 284  			return
 285  		}
 286  		jr := templates.Message{}
 287  		gotiny.Unmarshal(b, &jr)
 288  		w.height = jr.Height
 289  		cN := jr.UUID
 290  		firstSender := w.FirstSender.Load()
 291  		otherSent := firstSender != cN && firstSender != 0
 292  		if otherSent {
 293  			T.Ln("ignoring other controller job", jr.Nonce, jr.UUID)
 294  			// ignore other controllers while one is active and received first
 295  			return
 296  		}
 297  		// if jr.Nonce == w.lastNonce {
 298  		// 	I.Ln("same job again, ignoring (NOT)")
 299  		// 	// return
 300  		// }
 301  		// w.lastNonce = jr.Nonce
 302  		// w.FirstSender.Store(cN)
 303  		T.Ln("received job, starting workers on it", jr.Nonce, jr.UUID)
 304  		w.lastSent.Store(time.Now().UnixNano())
 305  		for i := range w.clients {
 306  			if e = w.clients[i].NewJob(&jr); E.Chk(e) {
 307  			}
 308  		}
 309  		return
 310  	},
 311  	string(pause.Magic): func(
 312  		ctx interface{}, src net.Addr, dst string, b []byte,
 313  	) (e error) {
 314  		w := ctx.(*Worker)
 315  		var advt p2padvt.Advertisment
 316  		gotiny.Unmarshal(b, &advt)
 317  		// p := pause.LoadPauseContainer(b)
 318  		fs := w.FirstSender.Load()
 319  		ni := advt.IPs
 320  		// ni := p.GetIPs()[0].String()
 321  		np := advt.UUID
 322  		// np := p.GetControllerListenerPort()
 323  		// ns := net.JoinHostPort(strings.Split(ni.String(), ":")[0], fmt.Sprint(np))
 324  		D.Ln("received pause from server at", ni, np, "stopping", len(w.clients), "workers stopping")
 325  		if fs == np {
 326  			for i := range w.clients {
 327  				// D.Ln("sending pause to worker", i, fs, np)
 328  				e := w.clients[i].Pause()
 329  				if e != nil {
 330  				}
 331  			}
 332  		}
 333  		w.FirstSender.Store(0)
 334  		return
 335  	},
 336  	// string(sol.Magic): func(
 337  	// 	ctx interface{}, src net.Addr, dst string,
 338  	// 	b []byte,
 339  	// ) (e error) {
 340  	// 	// w := ctx.(*Worker)
 341  	// 	// I.Ln("shuffling work due to solution on network")
 342  	// 	// w.FirstSender.Store(0)
 343  	// 	// 	D.Ln("solution detected from miner at", src)
 344  	// 	// 	portSlice := strings.Split(w.FirstSender.Load(), ":")
 345  	// 	// 	if len(portSlice) < 2 {
 346  	// 	// 		D.Ln("error with solution", w.FirstSender.Load(), portSlice)
 347  	// 	// 		return
 348  	// 	// 	}
 349  	// 	// 	// port := portSlice[1]
 350  	// 	// 	// j := sol.LoadSolContainer(b)
 351  	// 	// 	// senderPort := j.GetSenderPort()
 352  	// 	// 	// if fmt.Sprint(senderPort) == port {
 353  	// 	// 	// // W.Ln("we found a solution")
 354  	// 	// 	// // prepend to list of solutions for GUI display if enabled
 355  	// 	// 	// if *w.cx.Config.KopachGUI {
 356  	// 	// 	// 	// D.Ln("length solutions", len(w.solutions))
 357  	// 	// 	// 	blok := j.GetMsgBlock()
 358  	// 	// 	// 	w.solutions = append(
 359  	// 	// 	// 		w.solutions, []SolutionData{
 360  	// 	// 	// 			{
 361  	// 	// 	// 				time:   time.Now(),
 362  	// 	// 	// 				height: int(w.height),
 363  	// 	// 	// 				algo: fmt.Sprint(
 364  	// 	// 	// 					fork.GetAlgoName(blok.Header.Version, w.height),
 365  	// 	// 	// 				),
 366  	// 	// 	// 				hash:       blok.Header.BlockHashWithAlgos(w.height).String(),
 367  	// 	// 	// 				indexHash:  blok.Header.BlockHash().String(),
 368  	// 	// 	// 				version:    blok.Header.Version,
 369  	// 	// 	// 				prevBlock:  blok.Header.PrevBlock.String(),
 370  	// 	// 	// 				merkleRoot: blok.Header.MerkleRoot.String(),
 371  	// 	// 	// 				timestamp:  blok.Header.Timestamp,
 372  	// 	// 	// 				bits:       blok.Header.Bits,
 373  	// 	// 	// 				nonce:      blok.Header.Nonce,
 374  	// 	// 	// 			},
 375  	// 	// 	// 		}...,
 376  	// 	// 	// 	)
 377  	// 	// 	// 	if len(w.solutions) > 2047 {
 378  	// 	// 	// 		w.solutions = w.solutions[len(w.solutions)-2047:]
 379  	// 	// 	// 	}
 380  	// 	// 	// 	w.solutionCount = len(w.solutions)
 381  	// 	// 	// 	w.Update <- struct{}{}
 382  	// 	// 	// }
 383  	// 	// 	// }
 384  	// 	// 	// D.Ln("no longer listening to", w.FirstSender.Load())
 385  	// 	// 	// w.FirstSender.Store("")
 386  	// 	return
 387  	// },
 388  }
 389  
 390  func (w *Worker) HashReport() float64 {
 391  	W.Ln("generating hash report")
 392  	w.hashSampleBuf.Add(w.hashCount.Load())
 393  	av := ewma.NewMovingAverage()
 394  	var i int
 395  	var prev uint64
 396  	if e := w.hashSampleBuf.ForEach(
 397  		func(v uint64) (e error) {
 398  			if i < 1 {
 399  				prev = v
 400  			} else {
 401  				interval := v - prev
 402  				av.Add(float64(interval))
 403  				prev = v
 404  			}
 405  			i++
 406  			return nil
 407  		},
 408  	); E.Chk(e) {
 409  	}
 410  	average := av.Value()
 411  	W.Ln("hashrate average", average)
 412  	// panic("aaargh")
 413  	return average
 414  }
 415