implementation.go raw
1 package worker
2
3 import (
4 "crypto/cipher"
5 "math/rand"
6 "net"
7 "os"
8 "sync"
9 "time"
10
11 "github.com/p9c/p9/pkg/bits"
12 "github.com/p9c/p9/pkg/chainrpc/templates"
13 "github.com/p9c/p9/pkg/constant"
14 "github.com/p9c/p9/pkg/fork"
15 "github.com/p9c/p9/pkg/pipe"
16
17 "github.com/p9c/p9/pkg/qu"
18
19 "github.com/p9c/p9/pkg/blockchain"
20 "github.com/p9c/p9/pkg/chainrpc/hashrate"
21 "github.com/p9c/p9/pkg/chainrpc/sol"
22
23 "go.uber.org/atomic"
24
25 "github.com/p9c/p9/pkg/interrupt"
26
27 "github.com/p9c/p9/pkg/ring"
28 "github.com/p9c/p9/pkg/transport"
29 )
30
31 const CountPerRound = 81
32
33 type Worker struct {
34 mx sync.Mutex
35 id string
36 pipeConn *pipe.StdConn
37 dispatchConn *transport.Channel
38 dispatchReady atomic.Bool
39 ciph cipher.AEAD
40 quit qu.C
41 templatesMessage *templates.Message
42 uuid atomic.Uint64
43 roller *Counter
44 startNonce uint32
45 startChan qu.C
46 stopChan qu.C
47 running atomic.Bool
48 hashCount atomic.Uint64
49 hashSampleBuf *ring.BufferUint64
50 }
51
52 type Counter struct {
53 rpa int32
54 C atomic.Int32
55 Algos atomic.Value // []int32
56 RoundsPerAlgo atomic.Int32
57 }
58
59 // NewCounter returns an initialized algorithm rolling counter that ensures each
60 // miner does equal amounts of every algorithm
61 func NewCounter(countPerRound int32) (c *Counter) {
62 // these will be populated when work arrives
63 var algos []int32
64 // Start the counter at a random position
65 rand.Seed(time.Now().UnixNano())
66 c = &Counter{}
67 c.C.Store(int32(rand.Intn(int(countPerRound)+1) + 1))
68 c.Algos.Store(algos)
69 c.RoundsPerAlgo.Store(countPerRound)
70 c.rpa = countPerRound
71 return
72 }
73
74 // GetAlgoVer returns the next algo version based on the current configuration
75 func (c *Counter) GetAlgoVer(height int32) (ver int32) {
76 // the formula below rolls through versions with blocks roundsPerAlgo long for each algorithm by its index
77 algs := fork.GetAlgoVerSlice(height)
78 // D.Ln(algs)
79 if c.RoundsPerAlgo.Load() < 1 {
80 D.Ln("CountPerRound is", c.RoundsPerAlgo.Load(), len(algs))
81 return 0
82 }
83 if len(algs) > 0 {
84 ver = algs[c.C.Load()%int32(len(algs))]
85 // ver = algs[(c.C.Load()/
86 // c.CountPerRound.Load())%
87 // int32(len(algs))]
88 c.C.Add(1)
89 }
90 return
91 }
92
93 //
94 // func (w *Worker) hashReport() {
95 // w.hashSampleBuf.Add(w.hashCount.Load())
96 // av := ewma.NewMovingAverage(15)
97 // var i int
98 // var prev uint64
99 // if e := w.hashSampleBuf.ForEach(
100 // func(v uint64) (e error) {
101 // if i < 1 {
102 // prev = v
103 // } else {
104 // interval := v - prev
105 // av.Add(float64(interval))
106 // prev = v
107 // }
108 // i++
109 // return nil
110 // },
111 // ); E.Chk(e) {
112 // }
113 // // I.Ln("kopach",w.hashSampleBuf.Cursor, w.hashSampleBuf.Buf)
114 // Tracef("average hashrate %.2f", av.Value())
115 // }
116
117 // NewWithConnAndSemaphore is exposed to enable use an actual network connection while retaining the same RPC API to
118 // allow a worker to be configured to run on a bare metal system with a different launcher main
119 func NewWithConnAndSemaphore(id string, conn *pipe.StdConn, quit qu.C, uuid uint64) *Worker {
120 T.Ln("creating new worker")
121 // msgBlock := wire.WireBlock{Header: wire.BlockHeader{}}
122 w := &Worker{
123 id: id,
124 pipeConn: conn,
125 quit: quit,
126 roller: NewCounter(CountPerRound),
127 startChan: qu.T(),
128 stopChan: qu.T(),
129 hashSampleBuf: ring.NewBufferUint64(1000),
130 }
131 w.uuid.Store(uuid)
132 w.dispatchReady.Store(false)
133 // with this we can report cumulative hash counts as well as using it to distribute algorithms evenly
134 w.startNonce = uint32(w.roller.C.Load())
135 interrupt.AddHandler(
136 func() {
137 D.Ln("worker", id, "quitting")
138 w.stopChan <- struct{}{}
139 // _ = w.pipeConn.Close()
140 w.dispatchReady.Store(false)
141 },
142 )
143 go worker(w)
144 return w
145 }
146
147 func worker(w *Worker) {
148 D.Ln("main work loop starting")
149 // sampleTicker := time.NewTicker(time.Second)
150 var nonce uint32
151 out:
152 for {
153 // Pause state
154 T.Ln("worker pausing")
155 pausing:
156 for {
157 select {
158 // case <-sampleTicker.C:
159 // // w.hashReport()
160 // break
161 case <-w.stopChan.Wait():
162 D.Ln("received pause signal while paused")
163 // drain stop channel in pause
164 break
165 case <-w.startChan.Wait():
166 D.Ln("received start signal")
167 break pausing
168 case <-w.quit.Wait():
169 D.Ln("quitting")
170 break out
171 }
172 }
173 // Run state
174 T.Ln("worker running")
175 running:
176 for {
177 select {
178 // case <-sampleTicker.C:
179 // // w.hashReport()
180 // break
181 case <-w.startChan.Wait():
182 D.Ln("received start signal while running")
183 // drain start channel in run mode
184 break
185 case <-w.stopChan.Wait():
186 D.Ln("received pause signal while running")
187 break running
188 case <-w.quit.Wait():
189 D.Ln("worker stopping while running")
190 break out
191 default:
192 if w.templatesMessage == nil || !w.dispatchReady.Load() {
193 D.Ln("not ready to work")
194 } else {
195 // I.Ln("starting mining round")
196 newHeight := w.templatesMessage.Height
197 vers := w.roller.GetAlgoVer(newHeight)
198 nonce++
199 tn := time.Now().Round(time.Second)
200 if tn.After(w.templatesMessage.Timestamp.Round(time.Second)) {
201 w.templatesMessage.Timestamp = tn
202 }
203 if w.roller.C.Load()%w.roller.RoundsPerAlgo.Load() == 0 {
204 D.Ln("switching algorithms", w.roller.C.Load())
205 // send out broadcast containing worker nonce and algorithm and count of blocks
206 w.hashCount.Store(w.hashCount.Load() + uint64(w.roller.RoundsPerAlgo.Load()))
207 hashReport := hashrate.Get(w.roller.RoundsPerAlgo.Load(), vers, newHeight, w.id)
208 e := w.dispatchConn.SendMany(
209 hashrate.Magic,
210 transport.GetShards(hashReport),
211 )
212 if e != nil {
213 }
214 // reseed the nonce
215 rand.Seed(time.Now().UnixNano())
216 nonce = rand.Uint32()
217 select {
218 case <-w.quit.Wait():
219 D.Ln("breaking out of work loop")
220 break out
221 case <-w.stopChan.Wait():
222 D.Ln("received pause signal while running")
223 break running
224 default:
225 }
226 }
227 blockHeader := w.templatesMessage.GenBlockHeader(vers)
228 blockHeader.Nonce = nonce
229 // D.S(w.templatesMessage)
230 // D.S(blockHeader)
231 hash := blockHeader.BlockHashWithAlgos(newHeight)
232 bigHash := blockchain.HashToBig(&hash)
233 if bigHash.Cmp(bits.CompactToBig(blockHeader.Bits)) <= 0 {
234 D.Ln("found solution", newHeight, w.templatesMessage.Nonce, w.templatesMessage.UUID)
235 srs := sol.Encode(w.templatesMessage.Nonce, w.templatesMessage.UUID, blockHeader)
236 e := w.dispatchConn.SendMany(
237 sol.Magic,
238 transport.GetShards(srs),
239 )
240 if e != nil {
241 }
242 D.Ln("sent solution")
243 w.templatesMessage = nil
244 select {
245 case <-w.quit.Wait():
246 D.Ln("breaking out of work loop")
247 break out
248 default:
249 }
250 break running
251 }
252 // D.Ln("completed mining round")
253 }
254 }
255 }
256 }
257 D.Ln("worker finished")
258 interrupt.Request()
259 }
260
261 // New initialises the state for a worker, loading the work function handler that runs a round of processing between
262 // checking quit signal and work semaphore
263 func New(id string, quit qu.C, uuid uint64) (w *Worker, conn net.Conn) {
264 // log.L.SetLevel("trace", true)
265 sc := pipe.New(os.Stdin, os.Stdout, quit)
266
267 return NewWithConnAndSemaphore(id, sc, quit, uuid), sc
268 }
269
270 // NewJob is a delivery of a new job for the worker, this makes the miner start
271 // mining from pause or pause, prepare the work and restart
272 func (w *Worker) NewJob(j *templates.Message, reply *bool) (e error) {
273 // T.Ln("received new job")
274 if !w.dispatchReady.Load() {
275 D.Ln("dispatch not ready")
276 *reply = true
277 return
278 }
279 if w.templatesMessage != nil {
280 if j.PrevBlock == w.templatesMessage.PrevBlock {
281 // T.Ln("not a new job")
282 *reply = true
283 return
284 }
285 }
286 // D.S(j)
287 *reply = true
288 D.Ln("halting current work")
289 w.stopChan <- struct{}{}
290 D.Ln("halt signal sent")
291 // load the job into the template
292 if w.templatesMessage == nil {
293 w.templatesMessage = j
294 } else {
295 *w.templatesMessage = *j
296 }
297 D.Ln("switching to new job")
298 w.startChan <- struct{}{}
299 D.Ln("start signal sent")
300 return
301 }
302
303 // Pause signals the worker to stop working, releases its semaphore and the worker is then idle
304 func (w *Worker) Pause(_ int, reply *bool) (e error) {
305 T.Ln("pausing from IPC")
306 w.running.Store(false)
307 w.stopChan <- struct{}{}
308 *reply = true
309 return
310 }
311
312 // Stop signals the worker to quit
313 func (w *Worker) Stop(_ int, reply *bool) (e error) {
314 D.Ln("stopping from IPC")
315 w.stopChan <- struct{}{}
316 defer w.quit.Q()
317 *reply = true
318 // time.Sleep(time.Second * 3)
319 // os.Exit(0)
320 return
321 }
322
323 // SendPass gives the encryption key configured in the kopach controller ( pod) configuration to allow workers to
324 // dispatch their solutions
325 func (w *Worker) SendPass(pass []byte, reply *bool) (e error) {
326 D.Ln("receiving dispatch password", pass)
327 rand.Seed(time.Now().UnixNano())
328 // sp := fmt.Sprint(rand.Intn(32767) + 1025)
329 // rp := fmt.Sprint(rand.Intn(32767) + 1025)
330 var conn *transport.Channel
331 conn, e = transport.NewBroadcastChannel(
332 "kopachworker",
333 w,
334 pass,
335 transport.DefaultPort,
336 constant.MaxDatagramSize,
337 transport.Handlers{},
338 w.quit,
339 )
340 if e != nil {
341 }
342 w.dispatchConn = conn
343 w.dispatchReady.Store(true)
344 *reply = true
345 return
346 }
347