implementation.go raw

   1  package worker
   2  
   3  import (
   4  	"crypto/cipher"
   5  	"math/rand"
   6  	"net"
   7  	"os"
   8  	"sync"
   9  	"time"
  10  
  11  	"github.com/p9c/p9/pkg/bits"
  12  	"github.com/p9c/p9/pkg/chainrpc/templates"
  13  	"github.com/p9c/p9/pkg/constant"
  14  	"github.com/p9c/p9/pkg/fork"
  15  	"github.com/p9c/p9/pkg/pipe"
  16  
  17  	"github.com/p9c/p9/pkg/qu"
  18  
  19  	"github.com/p9c/p9/pkg/blockchain"
  20  	"github.com/p9c/p9/pkg/chainrpc/hashrate"
  21  	"github.com/p9c/p9/pkg/chainrpc/sol"
  22  
  23  	"go.uber.org/atomic"
  24  
  25  	"github.com/p9c/p9/pkg/interrupt"
  26  
  27  	"github.com/p9c/p9/pkg/ring"
  28  	"github.com/p9c/p9/pkg/transport"
  29  )
  30  
// CountPerRound is the rounds-per-algorithm value handed to NewCounter when a
// Worker is constructed.
const CountPerRound = 81
  32  
// Worker is the state of one mining worker: its identity, its connections, the
// block template it is currently working on and the channels that switch the
// work loop between its paused and running states.
type Worker struct {
	mx               sync.Mutex         // not locked anywhere in the visible code
	id               string             // identifier passed to the constructor; included in hashrate reports
	pipeConn         *pipe.StdConn      // connection handed to the constructor
	dispatchConn     *transport.Channel // broadcast channel created by SendPass; carries hashrate reports and solutions
	dispatchReady    atomic.Bool        // set true by SendPass, set false by the constructor and the interrupt handler
	ciph             cipher.AEAD        // not referenced in the visible code
	quit             qu.C               // makes the work loop exit when signalled
	templatesMessage *templates.Message // current job; nil until NewJob delivers one and again after a solution is sent
	uuid             atomic.Uint64      // value supplied to the constructor
	roller           *Counter           // rolling counter that selects the algorithm version for each round
	startNonce       uint32             // value of roller.C captured at construction
	startChan        qu.C               // moves the work loop from paused to running
	stopChan         qu.C               // moves the work loop from running to paused
	running          atomic.Bool        // only written (to false, by Pause) in the visible code
	hashCount        atomic.Uint64      // cumulative count, advanced by RoundsPerAlgo at each algorithm switch
	hashSampleBuf    *ring.BufferUint64 // sample buffer; only used by the commented-out hashReport
}
  51  
// Counter is the rolling counter a worker uses to choose which algorithm
// version to mine in each round.
type Counter struct {
	rpa           int32        // copy of the value given to NewCounter; not read in the visible code
	C             atomic.Int32 // running round count; GetAlgoVer indexes with it and then increments it
	Algos         atomic.Value // []int32; initialised to a nil slice by NewCounter
	RoundsPerAlgo atomic.Int32 // rounds between algorithm switches; GetAlgoVer returns 0 while this is < 1
}
  58  
  59  // NewCounter returns an initialized algorithm rolling counter that ensures each
  60  // miner does equal amounts of every algorithm
  61  func NewCounter(countPerRound int32) (c *Counter) {
  62  	// these will be populated when work arrives
  63  	var algos []int32
  64  	// Start the counter at a random position
  65  	rand.Seed(time.Now().UnixNano())
  66  	c = &Counter{}
  67  	c.C.Store(int32(rand.Intn(int(countPerRound)+1) + 1))
  68  	c.Algos.Store(algos)
  69  	c.RoundsPerAlgo.Store(countPerRound)
  70  	c.rpa = countPerRound
  71  	return
  72  }
  73  
  74  // GetAlgoVer returns the next algo version based on the current configuration
  75  func (c *Counter) GetAlgoVer(height int32) (ver int32) {
  76  	// the formula below rolls through versions with blocks roundsPerAlgo long for each algorithm by its index
  77  	algs := fork.GetAlgoVerSlice(height)
  78  	// D.Ln(algs)
  79  	if c.RoundsPerAlgo.Load() < 1 {
  80  		D.Ln("CountPerRound is", c.RoundsPerAlgo.Load(), len(algs))
  81  		return 0
  82  	}
  83  	if len(algs) > 0 {
  84  		ver = algs[c.C.Load()%int32(len(algs))]
  85  		// ver = algs[(c.C.Load()/
  86  		// 	c.CountPerRound.Load())%
  87  		// 	int32(len(algs))]
  88  		c.C.Add(1)
  89  	}
  90  	return
  91  }
  92  
  93  //
  94  // func (w *Worker) hashReport() {
  95  // 	w.hashSampleBuf.Add(w.hashCount.Load())
  96  // 	av := ewma.NewMovingAverage(15)
  97  // 	var i int
  98  // 	var prev uint64
  99  // 	if e := w.hashSampleBuf.ForEach(
 100  // 		func(v uint64) (e error) {
 101  // 			if i < 1 {
 102  // 				prev = v
 103  // 			} else {
 104  // 				interval := v - prev
 105  // 				av.Add(float64(interval))
 106  // 				prev = v
 107  // 			}
 108  // 			i++
 109  // 			return nil
 110  // 		},
 111  // 	); E.Chk(e) {
 112  // 	}
 113  // 	// I.Ln("kopach",w.hashSampleBuf.Cursor, w.hashSampleBuf.Buf)
 114  // 	Tracef("average hashrate %.2f", av.Value())
 115  // }
 116  
 117  // NewWithConnAndSemaphore is exposed to enable use an actual network connection while retaining the same RPC API to
 118  // allow a worker to be configured to run on a bare metal system with a different launcher main
 119  func NewWithConnAndSemaphore(id string, conn *pipe.StdConn, quit qu.C, uuid uint64) *Worker {
 120  	T.Ln("creating new worker")
 121  	// msgBlock := wire.WireBlock{Header: wire.BlockHeader{}}
 122  	w := &Worker{
 123  		id:            id,
 124  		pipeConn:      conn,
 125  		quit:          quit,
 126  		roller:        NewCounter(CountPerRound),
 127  		startChan:     qu.T(),
 128  		stopChan:      qu.T(),
 129  		hashSampleBuf: ring.NewBufferUint64(1000),
 130  	}
 131  	w.uuid.Store(uuid)
 132  	w.dispatchReady.Store(false)
 133  	// with this we can report cumulative hash counts as well as using it to distribute algorithms evenly
 134  	w.startNonce = uint32(w.roller.C.Load())
 135  	interrupt.AddHandler(
 136  		func() {
 137  			D.Ln("worker", id, "quitting")
 138  			w.stopChan <- struct{}{}
 139  			// _ = w.pipeConn.Close()
 140  			w.dispatchReady.Store(false)
 141  		},
 142  	)
 143  	go worker(w)
 144  	return w
 145  }
 146  
 147  func worker(w *Worker) {
 148  	D.Ln("main work loop starting")
 149  	// sampleTicker := time.NewTicker(time.Second)
 150  	var nonce uint32
 151  out:
 152  	for {
 153  		// Pause state
 154  		T.Ln("worker pausing")
 155  	pausing:
 156  		for {
 157  			select {
 158  			// case <-sampleTicker.C:
 159  			// 	// w.hashReport()
 160  			// 	break
 161  			case <-w.stopChan.Wait():
 162  				D.Ln("received pause signal while paused")
 163  				// drain stop channel in pause
 164  				break
 165  			case <-w.startChan.Wait():
 166  				D.Ln("received start signal")
 167  				break pausing
 168  			case <-w.quit.Wait():
 169  				D.Ln("quitting")
 170  				break out
 171  			}
 172  		}
 173  		// Run state
 174  		T.Ln("worker running")
 175  	running:
 176  		for {
 177  			select {
 178  			// case <-sampleTicker.C:
 179  			// 	// w.hashReport()
 180  			// 	break
 181  			case <-w.startChan.Wait():
 182  				D.Ln("received start signal while running")
 183  				// drain start channel in run mode
 184  				break
 185  			case <-w.stopChan.Wait():
 186  				D.Ln("received pause signal while running")
 187  				break running
 188  			case <-w.quit.Wait():
 189  				D.Ln("worker stopping while running")
 190  				break out
 191  			default:
 192  				if w.templatesMessage == nil || !w.dispatchReady.Load() {
 193  					D.Ln("not ready to work")
 194  				} else {
 195  					// I.Ln("starting mining round")
 196  					newHeight := w.templatesMessage.Height
 197  					vers := w.roller.GetAlgoVer(newHeight)
 198  					nonce++
 199  					tn := time.Now().Round(time.Second)
 200  					if tn.After(w.templatesMessage.Timestamp.Round(time.Second)) {
 201  						w.templatesMessage.Timestamp = tn
 202  					}
 203  					if w.roller.C.Load()%w.roller.RoundsPerAlgo.Load() == 0 {
 204  						D.Ln("switching algorithms", w.roller.C.Load())
 205  						// send out broadcast containing worker nonce and algorithm and count of blocks
 206  						w.hashCount.Store(w.hashCount.Load() + uint64(w.roller.RoundsPerAlgo.Load()))
 207  						hashReport := hashrate.Get(w.roller.RoundsPerAlgo.Load(), vers, newHeight, w.id)
 208  						e := w.dispatchConn.SendMany(
 209  							hashrate.Magic,
 210  							transport.GetShards(hashReport),
 211  						)
 212  						if e != nil {
 213  						}
 214  						// reseed the nonce
 215  						rand.Seed(time.Now().UnixNano())
 216  						nonce = rand.Uint32()
 217  						select {
 218  						case <-w.quit.Wait():
 219  							D.Ln("breaking out of work loop")
 220  							break out
 221  						case <-w.stopChan.Wait():
 222  							D.Ln("received pause signal while running")
 223  							break running
 224  						default:
 225  						}
 226  					}
 227  					blockHeader := w.templatesMessage.GenBlockHeader(vers)
 228  					blockHeader.Nonce = nonce
 229  					// D.S(w.templatesMessage)
 230  					// D.S(blockHeader)
 231  					hash := blockHeader.BlockHashWithAlgos(newHeight)
 232  					bigHash := blockchain.HashToBig(&hash)
 233  					if bigHash.Cmp(bits.CompactToBig(blockHeader.Bits)) <= 0 {
 234  						D.Ln("found solution", newHeight, w.templatesMessage.Nonce, w.templatesMessage.UUID)
 235  						srs := sol.Encode(w.templatesMessage.Nonce, w.templatesMessage.UUID, blockHeader)
 236  						e := w.dispatchConn.SendMany(
 237  							sol.Magic,
 238  							transport.GetShards(srs),
 239  						)
 240  						if e != nil {
 241  						}
 242  						D.Ln("sent solution")
 243  						w.templatesMessage = nil
 244  						select {
 245  						case <-w.quit.Wait():
 246  							D.Ln("breaking out of work loop")
 247  							break out
 248  						default:
 249  						}
 250  						break running
 251  					}
 252  					// D.Ln("completed mining round")
 253  				}
 254  			}
 255  		}
 256  	}
 257  	D.Ln("worker finished")
 258  	interrupt.Request()
 259  }
 260  
 261  // New initialises the state for a worker, loading the work function handler that runs a round of processing between
 262  // checking quit signal and work semaphore
 263  func New(id string, quit qu.C, uuid uint64) (w *Worker, conn net.Conn) {
 264  	// log.L.SetLevel("trace", true)
 265  	sc := pipe.New(os.Stdin, os.Stdout, quit)
 266  	
 267  	return NewWithConnAndSemaphore(id, sc, quit, uuid), sc
 268  }
 269  
 270  // NewJob is a delivery of a new job for the worker, this makes the miner start
 271  // mining from pause or pause, prepare the work and restart
 272  func (w *Worker) NewJob(j *templates.Message, reply *bool) (e error) {
 273  	// T.Ln("received new job")
 274  	if !w.dispatchReady.Load() {
 275  		D.Ln("dispatch not ready")
 276  		*reply = true
 277  		return
 278  	}
 279  	if w.templatesMessage != nil {
 280  		if j.PrevBlock == w.templatesMessage.PrevBlock {
 281  			// T.Ln("not a new job")
 282  			*reply = true
 283  			return
 284  		}
 285  	}
 286  	// D.S(j)
 287  	*reply = true
 288  	D.Ln("halting current work")
 289  	w.stopChan <- struct{}{}
 290  	D.Ln("halt signal sent")
 291  	// load the job into the template
 292  	if w.templatesMessage == nil {
 293  		w.templatesMessage = j
 294  	} else {
 295  		*w.templatesMessage = *j
 296  	}
 297  	D.Ln("switching to new job")
 298  	w.startChan <- struct{}{}
 299  	D.Ln("start signal sent")
 300  	return
 301  }
 302  
 303  // Pause signals the worker to stop working, releases its semaphore and the worker is then idle
 304  func (w *Worker) Pause(_ int, reply *bool) (e error) {
 305  	T.Ln("pausing from IPC")
 306  	w.running.Store(false)
 307  	w.stopChan <- struct{}{}
 308  	*reply = true
 309  	return
 310  }
 311  
 312  // Stop signals the worker to quit
 313  func (w *Worker) Stop(_ int, reply *bool) (e error) {
 314  	D.Ln("stopping from IPC")
 315  	w.stopChan <- struct{}{}
 316  	defer w.quit.Q()
 317  	*reply = true
 318  	// time.Sleep(time.Second * 3)
 319  	// os.Exit(0)
 320  	return
 321  }
 322  
 323  // SendPass gives the encryption key configured in the kopach controller ( pod) configuration to allow workers to
 324  // dispatch their solutions
 325  func (w *Worker) SendPass(pass []byte, reply *bool) (e error) {
 326  	D.Ln("receiving dispatch password", pass)
 327  	rand.Seed(time.Now().UnixNano())
 328  	// sp := fmt.Sprint(rand.Intn(32767) + 1025)
 329  	// rp := fmt.Sprint(rand.Intn(32767) + 1025)
 330  	var conn *transport.Channel
 331  	conn, e = transport.NewBroadcastChannel(
 332  		"kopachworker",
 333  		w,
 334  		pass,
 335  		transport.DefaultPort,
 336  		constant.MaxDatagramSize,
 337  		transport.Handlers{},
 338  		w.quit,
 339  	)
 340  	if e != nil {
 341  	}
 342  	w.dispatchConn = conn
 343  	w.dispatchReady.Store(true)
 344  	*reply = true
 345  	return
 346  }
 347