kopach.go raw
1 package kopach
2
3 import (
4 "context"
5 "crypto/rand"
6 "fmt"
7 "net"
8 "os"
9 "runtime"
10 "time"
11
12 "github.com/niubaoshu/gotiny"
13
14 "github.com/p9c/p9/pkg/log"
15 "github.com/p9c/p9/pkg/chainrpc/p2padvt"
16 "github.com/p9c/p9/pkg/chainrpc/templates"
17 "github.com/p9c/p9/pkg/constant"
18 "github.com/p9c/p9/pkg/pipe"
19 "github.com/p9c/p9/pod/state"
20
21 "github.com/p9c/p9/pkg/qu"
22
23 "github.com/VividCortex/ewma"
24 "go.uber.org/atomic"
25
26 "github.com/p9c/p9/pkg/interrupt"
27
28 "github.com/p9c/p9/cmd/kopach/client"
29 "github.com/p9c/p9/pkg/chainhash"
30 "github.com/p9c/p9/pkg/chainrpc/hashrate"
31 "github.com/p9c/p9/pkg/chainrpc/job"
32 "github.com/p9c/p9/pkg/chainrpc/pause"
33 rav "github.com/p9c/p9/pkg/ring"
34 "github.com/p9c/p9/pkg/transport"
35 )
36
// maxThreads is the number of logical CPUs reported by the runtime, converted
// to float32 once at package initialisation.
var maxThreads = float32(runtime.NumCPU())
38
// HashCount pairs an unsigned counter (the embedded uint64) with a timestamp.
// It is the element type of the Worker.HashTick channel.
type HashCount struct {
	// uint64 is the counter value itself, embedded without a field name.
	uint64
	// Time is the timestamp associated with the counter value.
	Time time.Time
}
43
// SolutionData is a flat record describing one solved block. It is the
// element type of Worker.solutions.
type SolutionData struct {
	// time is a timestamp kept separately from the header timestamp below.
	time time.Time
	// height is the block height of the solution.
	height int
	// algo, hash and indexHash are stored as plain strings.
	algo, hash, indexHash string
	// version is the block header version.
	version int32
	// prevBlock and merkleRoot are the header hashes in string form.
	prevBlock, merkleRoot string
	// timestamp is the block header timestamp.
	timestamp time.Time
	// bits and nonce are the remaining header fields.
	bits, nonce uint32
}
57
58 type Worker struct {
59 id string
60 cx *state.State
61 height int32
62 active atomic.Bool
63 conn *transport.Channel
64 ctx context.Context
65 quit qu.C
66 sendAddresses []*net.UDPAddr
67 clients []*client.Client
68 workers []*pipe.Worker
69 FirstSender atomic.Uint64
70 lastSent atomic.Int64
71 Status atomic.String
72 HashTick chan HashCount
73 LastHash *chainhash.Hash
74 StartChan, StopChan qu.C
75 // SetThreads chan int
76 PassChan chan string
77 solutions []SolutionData
78 solutionCount int
79 Update qu.C
80 hashCount atomic.Uint64
81 hashSampleBuf *rav.BufferUint64
82 hashrate float64
83 lastNonce uint64
84 }
85
86 func (w *Worker) Start() {
87 D.Ln("starting up kopach workers")
88 w.workers = []*pipe.Worker{}
89 w.clients = []*client.Client{}
90 for i := 0; i < w.cx.Config.GenThreads.V(); i++ {
91 D.Ln("starting worker", i)
92 cmd, _ := pipe.Spawn(w.quit, os.Args[0], "worker", w.id, w.cx.ActiveNet.Name, w.cx.Config.LogLevel.V())
93 w.workers = append(w.workers, cmd)
94 w.clients = append(w.clients, client.New(cmd.StdConn))
95 }
96 for i := range w.clients {
97 T.Ln("sending pass to worker", i)
98 e := w.clients[i].SendPass(w.cx.Config.MulticastPass.Bytes())
99 if e != nil {
100 }
101 }
102 D.Ln("setting workers to active")
103 w.active.Store(true)
104
105 }
106
107 func (w *Worker) Stop() {
108 var e error
109 for i := range w.clients {
110 if e = w.clients[i].Pause(); E.Chk(e) {
111 }
112 if e = w.clients[i].Stop(); E.Chk(e) {
113 }
114 if e = w.clients[i].Close(); E.Chk(e) {
115 }
116 }
117 for i := range w.workers {
118 // if e = w.workers[i].Interrupt(); !E.Chk(e) {
119 // }
120 if e = w.workers[i].Kill(); !E.Chk(e) {
121 }
122 D.Ln("stopped worker", i)
123 }
124 w.active.Store(false)
125 w.quit.Q()
126 }
127
128 // Run the miner
129 func Run(cx *state.State) (e error) {
130 D.Ln("miner starting")
131 randomBytes := make([]byte, 4)
132 if _, e = rand.Read(randomBytes); E.Chk(e) {
133 }
134 w := &Worker{
135 id: fmt.Sprintf("%x", randomBytes),
136 cx: cx,
137 quit: cx.KillAll,
138 sendAddresses: []*net.UDPAddr{},
139 StartChan: qu.T(),
140 StopChan: qu.T(),
141 // SetThreads: make(chan int),
142 solutions: make([]SolutionData, 0, 2048),
143 Update: qu.T(),
144 hashSampleBuf: rav.NewBufferUint64(1000),
145 }
146 w.lastSent.Store(time.Now().UnixNano())
147 w.active.Store(false)
148 D.Ln("opening broadcast channel listener")
149 w.conn, e = transport.NewBroadcastChannel(
150 "kopachmain", w, cx.Config.MulticastPass.Bytes(),
151 transport.DefaultPort, constant.MaxDatagramSize, handlers,
152 w.quit,
153 )
154 if e != nil {
155 return
156 }
157 // start up the workers
158 // if cx.Config.Generate.True() {
159 I.Ln("starting up miner workers")
160 w.Start()
161 interrupt.AddHandler(
162 func() {
163 w.Stop()
164 },
165 )
166 // }
167 // controller watcher thread
168 go func() {
169 D.Ln("starting controller watcher")
170 ticker := time.NewTicker(time.Second)
171 logger := time.NewTicker(time.Second)
172 out:
173 for {
174 select {
175 case <-ticker.C:
176 W.Ln("controller watcher ticker")
177 // if the last message sent was 3 seconds ago the server is almost certainly disconnected or crashed
178 // so clear FirstSender
179 since := time.Now().Sub(time.Unix(0, w.lastSent.Load()))
180 wasSending := since > time.Second*6 && w.FirstSender.Load() != 0
181 if wasSending {
182 D.Ln("previous current controller has stopped broadcasting", since, w.FirstSender.Load())
183 // when this string is clear other broadcasts will be listened to
184 w.FirstSender.Store(0)
185 // pause the workers
186 for i := range w.clients {
187 D.Ln("sending pause to worker", i)
188 e := w.clients[i].Pause()
189 if e != nil {
190 }
191 }
192 }
193 // if interrupt.Requested() {
194 // w.StopChan <- struct{}{}
195 // w.quit.Q()
196 // }
197 case <-logger.C:
198 W.Ln("hash report ticker")
199 w.hashrate = w.HashReport()
200 // if interrupt.Requested() {
201 // w.StopChan <- struct{}{}
202 // w.quit.Q()
203 // }
204 case <-w.StartChan.Wait():
205 D.Ln("received signal on StartChan")
206 cx.Config.Generate.T()
207 // if e = cx.Config.WriteToFile(cx.Config.ConfigFile.V()); E.Chk(e) {
208 // }
209 w.Start()
210 case <-w.StopChan.Wait():
211 D.Ln("received signal on StopChan")
212 cx.Config.Generate.F()
213 // if e = cx.Config.WriteToFile(cx.Config.ConfigFile.V()); E.Chk(e) {
214 // }
215 w.Stop()
216 case s := <-w.PassChan:
217 F.Ln("received signal on PassChan", s)
218 cx.Config.MulticastPass.Set(s)
219 // if e = cx.Config.WriteToFile(cx.Config.ConfigFile.V()); E.Chk(e) {
220 // }
221 w.Stop()
222 w.Start()
223 // case n := <-w.SetThreads:
224 // D.Ln("received signal on SetThreads", n)
225 // cx.Config.GenThreads.Set(n)
226 // // if e = cx.Config.WriteToFile(cx.Config.ConfigFile.V()); E.Chk(e) {
227 // // }
228 // if cx.Config.Generate.True() {
229 // // always sanitise
230 // if n < 0 {
231 // n = int(maxThreads)
232 // }
233 // if n > int(maxThreads) {
234 // n = int(maxThreads)
235 // }
236 // w.Stop()
237 // w.Start()
238 // }
239 case <-w.quit.Wait():
240 D.Ln("stopping from quit")
241 interrupt.Request()
242 break out
243 }
244 }
245 D.Ln("finished kopach miner work loop")
246 log.LogChanDisabled.Store(true)
247 }()
248 D.Ln("listening on", constant.UDP4MulticastAddress)
249 <-w.quit
250 I.Ln("kopach shutting down") // , interrupt.GoroutineDump())
251 // <-interrupt.HandlersDone
252 I.Ln("kopach finished shutdown")
253 return
254 }
255
256 // these are the handlers for specific message types.
257 var handlers = transport.Handlers{
258 string(hashrate.Magic): func(
259 ctx interface{}, src net.Addr, dst string, b []byte,
260 ) (e error) {
261 c := ctx.(*Worker)
262 if !c.active.Load() {
263 D.Ln("not active")
264 return
265 }
266 var hr hashrate.Hashrate
267 gotiny.Unmarshal(b, &hr)
268 // if this is not one of our workers reports ignore it
269 if hr.ID != c.id {
270 return
271 }
272 count := hr.Count
273 hc := c.hashCount.Load() + uint64(count)
274 c.hashCount.Store(hc)
275 return
276 },
277 string(job.Magic): func(
278 ctx interface{}, src net.Addr, dst string,
279 b []byte,
280 ) (e error) {
281 w := ctx.(*Worker)
282 if !w.active.Load() {
283 T.Ln("not active")
284 return
285 }
286 jr := templates.Message{}
287 gotiny.Unmarshal(b, &jr)
288 w.height = jr.Height
289 cN := jr.UUID
290 firstSender := w.FirstSender.Load()
291 otherSent := firstSender != cN && firstSender != 0
292 if otherSent {
293 T.Ln("ignoring other controller job", jr.Nonce, jr.UUID)
294 // ignore other controllers while one is active and received first
295 return
296 }
297 // if jr.Nonce == w.lastNonce {
298 // I.Ln("same job again, ignoring (NOT)")
299 // // return
300 // }
301 // w.lastNonce = jr.Nonce
302 // w.FirstSender.Store(cN)
303 T.Ln("received job, starting workers on it", jr.Nonce, jr.UUID)
304 w.lastSent.Store(time.Now().UnixNano())
305 for i := range w.clients {
306 if e = w.clients[i].NewJob(&jr); E.Chk(e) {
307 }
308 }
309 return
310 },
311 string(pause.Magic): func(
312 ctx interface{}, src net.Addr, dst string, b []byte,
313 ) (e error) {
314 w := ctx.(*Worker)
315 var advt p2padvt.Advertisment
316 gotiny.Unmarshal(b, &advt)
317 // p := pause.LoadPauseContainer(b)
318 fs := w.FirstSender.Load()
319 ni := advt.IPs
320 // ni := p.GetIPs()[0].String()
321 np := advt.UUID
322 // np := p.GetControllerListenerPort()
323 // ns := net.JoinHostPort(strings.Split(ni.String(), ":")[0], fmt.Sprint(np))
324 D.Ln("received pause from server at", ni, np, "stopping", len(w.clients), "workers stopping")
325 if fs == np {
326 for i := range w.clients {
327 // D.Ln("sending pause to worker", i, fs, np)
328 e := w.clients[i].Pause()
329 if e != nil {
330 }
331 }
332 }
333 w.FirstSender.Store(0)
334 return
335 },
336 // string(sol.Magic): func(
337 // ctx interface{}, src net.Addr, dst string,
338 // b []byte,
339 // ) (e error) {
340 // // w := ctx.(*Worker)
341 // // I.Ln("shuffling work due to solution on network")
342 // // w.FirstSender.Store(0)
343 // // D.Ln("solution detected from miner at", src)
344 // // portSlice := strings.Split(w.FirstSender.Load(), ":")
345 // // if len(portSlice) < 2 {
346 // // D.Ln("error with solution", w.FirstSender.Load(), portSlice)
347 // // return
348 // // }
349 // // // port := portSlice[1]
350 // // // j := sol.LoadSolContainer(b)
351 // // // senderPort := j.GetSenderPort()
352 // // // if fmt.Sprint(senderPort) == port {
353 // // // // W.Ln("we found a solution")
354 // // // // prepend to list of solutions for GUI display if enabled
355 // // // if *w.cx.Config.KopachGUI {
356 // // // // D.Ln("length solutions", len(w.solutions))
357 // // // blok := j.GetMsgBlock()
358 // // // w.solutions = append(
359 // // // w.solutions, []SolutionData{
360 // // // {
361 // // // time: time.Now(),
362 // // // height: int(w.height),
363 // // // algo: fmt.Sprint(
364 // // // fork.GetAlgoName(blok.Header.Version, w.height),
365 // // // ),
366 // // // hash: blok.Header.BlockHashWithAlgos(w.height).String(),
367 // // // indexHash: blok.Header.BlockHash().String(),
368 // // // version: blok.Header.Version,
369 // // // prevBlock: blok.Header.PrevBlock.String(),
370 // // // merkleRoot: blok.Header.MerkleRoot.String(),
371 // // // timestamp: blok.Header.Timestamp,
372 // // // bits: blok.Header.Bits,
373 // // // nonce: blok.Header.Nonce,
374 // // // },
375 // // // }...,
376 // // // )
377 // // // if len(w.solutions) > 2047 {
378 // // // w.solutions = w.solutions[len(w.solutions)-2047:]
379 // // // }
380 // // // w.solutionCount = len(w.solutions)
381 // // // w.Update <- struct{}{}
382 // // // }
383 // // // }
384 // // // D.Ln("no longer listening to", w.FirstSender.Load())
385 // // // w.FirstSender.Store("")
386 // return
387 // },
388 }
389
390 func (w *Worker) HashReport() float64 {
391 W.Ln("generating hash report")
392 w.hashSampleBuf.Add(w.hashCount.Load())
393 av := ewma.NewMovingAverage()
394 var i int
395 var prev uint64
396 if e := w.hashSampleBuf.ForEach(
397 func(v uint64) (e error) {
398 if i < 1 {
399 prev = v
400 } else {
401 interval := v - prev
402 av.Add(float64(interval))
403 prev = v
404 }
405 i++
406 return nil
407 },
408 ); E.Chk(e) {
409 }
410 average := av.Value()
411 W.Ln("hashrate average", average)
412 // panic("aaargh")
413 return average
414 }
415