// bench-workers measures the parallel ingest worker pool throughput. // Single-thread Stage A baseline vs N-worker pool with epoll-driven // collection. Worker count = NumWorkers (default 8 — matches thread // count on the test machine). package main import ( "encoding/binary" "fmt" "os" "runtime" "syscall" "time" "smesh.lol/pkg/nostr/envelope" "smesh.lol/pkg/nostr/event" "smesh.lol/pkg/nostr/signer/p8k" "smesh.lol/pkg/relay/wire" ) const NumWorkers = 8 func epollAdd(epfd int, fd int32) error { ev := syscall.EpollEvent{Events: syscall.EPOLLIN, Fd: fd} return syscall.EpollCtl(epfd, syscall.EPOLL_CTL_ADD, int(fd), &ev) } func main() { n := 1000 if len(os.Args) > 1 { fmt.Sscanf(string(os.Args[1]), "%d", &n) } fmt.Fprintf(os.Stderr, "generating %d signed events...\n", n) events := generateEvents(n) fmt.Fprintf(os.Stderr, "generated %d events, sample size=%d bytes\n", len(events), len(events[0])) // Phase 1: single-thread baseline. fmt.Fprintln(os.Stderr, "\n=== single-thread baseline ===") t0 := time.Now() rejects := 0 for i, ev := range events { req := wire.IngestRequest{ReqID: uint32(i), Bytes: ev} resp := processOneInline(req) if resp.Verdict == wire.VerdictReject { rejects++ } } elapsed1 := time.Since(t0) fmt.Fprintf(os.Stderr, "single-thread: %d events in %v, %.0f evt/s, %d rejects\n", len(events), elapsed1, float64(len(events))/elapsed1.Seconds(), rejects) // Phase 2: N-worker pool. Channels and per-worker state in arrays — // the moxie compiler now routes Codec-typed chan ops through the // runtime ChanSendDual / ChanRecvDual dispatch, which checks // ch.pipeBoundFd at runtime, so chans accessed via index work. fmt.Fprintf(os.Stderr, "\n=== %d-worker pool ===\n", NumWorkers) in := [NumWorkers]chan wire.IngestRequest{} out := [NumWorkers]chan wire.IngestResponse{} fd := [NumWorkers]int32{} busy := [NumWorkers]bool{} for i := 0; i < NumWorkers; i++ { in[i] = chan wire.IngestRequest{} out[i] = chan wire.IngestResponse{} spawn(workerLoop, in[i], out[i]) fd[i] = runtime.LastSpawnedParentFd() } epfd, err := syscall.EpollCreate1(0) if err != nil { fmt.Fprintf(os.Stderr, "epoll_create1: %v\n", err) os.Exit(1) } for i := 0; i < NumWorkers; i++ { if err := epollAdd(epfd, fd[i]); err != nil { fmt.Fprintf(os.Stderr, "epoll add: %v\n", err) os.Exit(1) } } epev := []syscall.EpollEvent{:NumWorkers * 2} t0 = time.Now() dispatched := 0 collected := 0 rejects = 0 for collected < n { // Dispatch as many as fit on idle workers. for dispatched < n { placed := false for i := 0; i < NumWorkers; i++ { if !busy[i] { in[i] <- wire.IngestRequest{ReqID: uint32(dispatched), Bytes: events[dispatched]} busy[i] = true dispatched++ placed = true break } } if !placed { break // all busy } } // Block in the kernel until any worker has a response. Interrupt- // driven wake-up; zero CPU spent waiting. nev, err := syscall.EpollWait(epfd, epev, -1) if err != nil { if err == syscall.EINTR { continue } fmt.Fprintf(os.Stderr, "epoll_wait: %v\n", err) os.Exit(1) } for j := 0; j < nev; j++ { rfd := epev[j].Fd for i := 0; i < NumWorkers; i++ { if fd[i] == rfd { resp := <-out[i] if resp.Verdict == wire.VerdictReject { rejects++ } busy[i] = false collected++ break } } } } elapsed2 := time.Since(t0) fmt.Fprintf(os.Stderr, "%d-worker: %d events in %v, %.0f evt/s, %d rejects\n", NumWorkers, len(events), elapsed2, float64(len(events))/elapsed2.Seconds(), rejects) speedup := elapsed1.Seconds() / elapsed2.Seconds() fmt.Fprintf(os.Stderr, "\nspeedup: %.2fx\n", speedup) } func workerLoop(in chan wire.IngestRequest, out chan wire.IngestResponse) { for { req, ok := <-in if !ok { return // parent shut down } resp := processOneInline(req) out <- resp } } func processOneInline(req wire.IngestRequest) wire.IngestResponse { resp := wire.IngestResponse{ReqID: req.ReqID, Bytes: req.Bytes} label, rem, _ := envelope.Identify(req.Bytes) if label != envelope.EventLabel { resp.Verdict = wire.VerdictReject return resp } var es envelope.EventSubmission if _, err := es.Unmarshal(rem); err != nil || es.E == nil { resp.Verdict = wire.VerdictReject return resp } ev := es.E if len(ev.ID) != 32 || len(ev.Pubkey) != 32 || len(ev.Sig) != 64 { resp.Verdict = wire.VerdictReject return resp } canonical := ev.GetIDBytes() for i := 0; i < 32; i++ { if canonical[i] != ev.ID[i] { resp.Verdict = wire.VerdictReject return resp } } valid, _ := ev.Verify() if !valid { resp.Verdict = wire.VerdictReject return resp } resp.Kind = ev.Kind resp.CreatedAt = ev.CreatedAt copy(resp.Pubkey[:], ev.Pubkey) copy(resp.EventID[:], ev.ID) resp.Verdict = wire.VerdictAccept return resp } func generateEvents(n int) [][]byte { s := p8k.MustNew() if err := s.Generate(); err != nil { fmt.Fprintf(os.Stderr, "signer init: %v\n", err) os.Exit(1) } out := [][]byte{:0:n} now := int64(time.Now().Unix()) for i := 0; i < n; i++ { ev := event.New() ev.Kind = 1 ev.CreatedAt = now + int64(i) var contentBuf [200]byte binary.LittleEndian.PutUint32(contentBuf[0:4], uint32(i)) for j := 4; j < 200; j++ { contentBuf[j] = byte((i + j) & 0x7f) } ev.Content = contentBuf[:] if err := ev.Sign(s); err != nil { fmt.Fprintf(os.Stderr, "sign %d: %v\n", i, err) os.Exit(1) } es := envelope.EventSubmission{E: ev} out = append(out, es.Marshal(nil)) } return out }