main.mx raw
1 // bench-workers measures the parallel ingest worker pool throughput.
2 // Single-thread Stage A baseline vs N-worker pool with epoll-driven
3 // collection. Worker count = NumWorkers (default 8 — matches thread
4 // count on the test machine).
5 package main
6
7 import (
8 "encoding/binary"
9 "fmt"
10 "os"
11 "runtime"
12 "syscall"
13 "time"
14
15 "smesh.lol/pkg/nostr/envelope"
16 "smesh.lol/pkg/nostr/event"
17 "smesh.lol/pkg/nostr/signer/p8k"
18 "smesh.lol/pkg/relay/wire"
19 )
20
21 const NumWorkers = 8
22
23 func epollAdd(epfd int, fd int32) error {
24 ev := syscall.EpollEvent{Events: syscall.EPOLLIN, Fd: fd}
25 return syscall.EpollCtl(epfd, syscall.EPOLL_CTL_ADD, int(fd), &ev)
26 }
27
28 func main() {
29 n := 1000
30 if len(os.Args) > 1 {
31 fmt.Sscanf(string(os.Args[1]), "%d", &n)
32 }
33
34 fmt.Fprintf(os.Stderr, "generating %d signed events...\n", n)
35 events := generateEvents(n)
36 fmt.Fprintf(os.Stderr, "generated %d events, sample size=%d bytes\n", len(events), len(events[0]))
37
38 // Phase 1: single-thread baseline.
39 fmt.Fprintln(os.Stderr, "\n=== single-thread baseline ===")
40 t0 := time.Now()
41 rejects := 0
42 for i, ev := range events {
43 req := wire.IngestRequest{ReqID: uint32(i), Bytes: ev}
44 resp := processOneInline(req)
45 if resp.Verdict == wire.VerdictReject {
46 rejects++
47 }
48 }
49 elapsed1 := time.Since(t0)
50 fmt.Fprintf(os.Stderr, "single-thread: %d events in %v, %.0f evt/s, %d rejects\n",
51 len(events), elapsed1, float64(len(events))/elapsed1.Seconds(), rejects)
52
53 // Phase 2: N-worker pool. Channels and per-worker state in arrays —
54 // the moxie compiler now routes Codec-typed chan ops through the
55 // runtime ChanSendDual / ChanRecvDual dispatch, which checks
56 // ch.pipeBoundFd at runtime, so chans accessed via index work.
57 fmt.Fprintf(os.Stderr, "\n=== %d-worker pool ===\n", NumWorkers)
58
59 in := [NumWorkers]chan wire.IngestRequest{}
60 out := [NumWorkers]chan wire.IngestResponse{}
61 fd := [NumWorkers]int32{}
62 busy := [NumWorkers]bool{}
63
64 for i := 0; i < NumWorkers; i++ {
65 in[i] = chan wire.IngestRequest{}
66 out[i] = chan wire.IngestResponse{}
67 spawn(workerLoop, in[i], out[i])
68 fd[i] = runtime.LastSpawnedParentFd()
69 }
70
71 epfd, err := syscall.EpollCreate1(0)
72 if err != nil {
73 fmt.Fprintf(os.Stderr, "epoll_create1: %v\n", err)
74 os.Exit(1)
75 }
76 for i := 0; i < NumWorkers; i++ {
77 if err := epollAdd(epfd, fd[i]); err != nil {
78 fmt.Fprintf(os.Stderr, "epoll add: %v\n", err)
79 os.Exit(1)
80 }
81 }
82 epev := []syscall.EpollEvent{:NumWorkers * 2}
83
84 t0 = time.Now()
85 dispatched := 0
86 collected := 0
87 rejects = 0
88
89 for collected < n {
90 // Dispatch as many as fit on idle workers.
91 for dispatched < n {
92 placed := false
93 for i := 0; i < NumWorkers; i++ {
94 if !busy[i] {
95 in[i] <- wire.IngestRequest{ReqID: uint32(dispatched), Bytes: events[dispatched]}
96 busy[i] = true
97 dispatched++
98 placed = true
99 break
100 }
101 }
102 if !placed {
103 break // all busy
104 }
105 }
106
107 // Block in the kernel until any worker has a response. Interrupt-
108 // driven wake-up; zero CPU spent waiting.
109 nev, err := syscall.EpollWait(epfd, epev, -1)
110 if err != nil {
111 if err == syscall.EINTR {
112 continue
113 }
114 fmt.Fprintf(os.Stderr, "epoll_wait: %v\n", err)
115 os.Exit(1)
116 }
117 for j := 0; j < nev; j++ {
118 rfd := epev[j].Fd
119 for i := 0; i < NumWorkers; i++ {
120 if fd[i] == rfd {
121 resp := <-out[i]
122 if resp.Verdict == wire.VerdictReject {
123 rejects++
124 }
125 busy[i] = false
126 collected++
127 break
128 }
129 }
130 }
131 }
132
133 elapsed2 := time.Since(t0)
134 fmt.Fprintf(os.Stderr, "%d-worker: %d events in %v, %.0f evt/s, %d rejects\n",
135 NumWorkers, len(events), elapsed2, float64(len(events))/elapsed2.Seconds(), rejects)
136
137 speedup := elapsed1.Seconds() / elapsed2.Seconds()
138 fmt.Fprintf(os.Stderr, "\nspeedup: %.2fx\n", speedup)
139 }
140
141 func workerLoop(in chan wire.IngestRequest, out chan wire.IngestResponse) {
142 for {
143 req, ok := <-in
144 if !ok {
145 return // parent shut down
146 }
147 resp := processOneInline(req)
148 out <- resp
149 }
150 }
151
152 func processOneInline(req wire.IngestRequest) wire.IngestResponse {
153 resp := wire.IngestResponse{ReqID: req.ReqID, Bytes: req.Bytes}
154 label, rem, _ := envelope.Identify(req.Bytes)
155 if label != envelope.EventLabel {
156 resp.Verdict = wire.VerdictReject
157 return resp
158 }
159 var es envelope.EventSubmission
160 if _, err := es.Unmarshal(rem); err != nil || es.E == nil {
161 resp.Verdict = wire.VerdictReject
162 return resp
163 }
164 ev := es.E
165 if len(ev.ID) != 32 || len(ev.Pubkey) != 32 || len(ev.Sig) != 64 {
166 resp.Verdict = wire.VerdictReject
167 return resp
168 }
169 canonical := ev.GetIDBytes()
170 for i := 0; i < 32; i++ {
171 if canonical[i] != ev.ID[i] {
172 resp.Verdict = wire.VerdictReject
173 return resp
174 }
175 }
176 valid, _ := ev.Verify()
177 if !valid {
178 resp.Verdict = wire.VerdictReject
179 return resp
180 }
181 resp.Kind = ev.Kind
182 resp.CreatedAt = ev.CreatedAt
183 copy(resp.Pubkey[:], ev.Pubkey)
184 copy(resp.EventID[:], ev.ID)
185 resp.Verdict = wire.VerdictAccept
186 return resp
187 }
188
189 func generateEvents(n int) [][]byte {
190 s := p8k.MustNew()
191 if err := s.Generate(); err != nil {
192 fmt.Fprintf(os.Stderr, "signer init: %v\n", err)
193 os.Exit(1)
194 }
195 out := [][]byte{:0:n}
196 now := int64(time.Now().Unix())
197 for i := 0; i < n; i++ {
198 ev := event.New()
199 ev.Kind = 1
200 ev.CreatedAt = now + int64(i)
201 var contentBuf [200]byte
202 binary.LittleEndian.PutUint32(contentBuf[0:4], uint32(i))
203 for j := 4; j < 200; j++ {
204 contentBuf[j] = byte((i + j) & 0x7f)
205 }
206 ev.Content = contentBuf[:]
207 if err := ev.Sign(s); err != nil {
208 fmt.Fprintf(os.Stderr, "sign %d: %v\n", i, err)
209 os.Exit(1)
210 }
211 es := envelope.EventSubmission{E: ev}
212 out = append(out, es.Marshal(nil))
213 }
214 return out
215 }
216