main.mx raw

   1  // bench-workers measures the parallel ingest worker pool throughput.
   2  // Single-thread Stage A baseline vs N-worker pool with epoll-driven
   3  // collection. Worker count = NumWorkers (default 8 — matches thread
   4  // count on the test machine).
   5  package main
   6  
   7  import (
   8  	"encoding/binary"
   9  	"fmt"
  10  	"os"
  11  	"runtime"
  12  	"syscall"
  13  	"time"
  14  
  15  	"smesh.lol/pkg/nostr/envelope"
  16  	"smesh.lol/pkg/nostr/event"
  17  	"smesh.lol/pkg/nostr/signer/p8k"
  18  	"smesh.lol/pkg/relay/wire"
  19  )
  20  
// NumWorkers is the number of workers spawned for the pool phase of the
// benchmark. It also fixes the length of the per-worker channel, fd and
// busy arrays declared in main.
const NumWorkers = 8
  22  
  23  func epollAdd(epfd int, fd int32) error {
  24  	ev := syscall.EpollEvent{Events: syscall.EPOLLIN, Fd: fd}
  25  	return syscall.EpollCtl(epfd, syscall.EPOLL_CTL_ADD, int(fd), &ev)
  26  }
  27  
  28  func main() {
  29  	n := 1000
  30  	if len(os.Args) > 1 {
  31  		fmt.Sscanf(string(os.Args[1]), "%d", &n)
  32  	}
  33  
  34  	fmt.Fprintf(os.Stderr, "generating %d signed events...\n", n)
  35  	events := generateEvents(n)
  36  	fmt.Fprintf(os.Stderr, "generated %d events, sample size=%d bytes\n", len(events), len(events[0]))
  37  
  38  	// Phase 1: single-thread baseline.
  39  	fmt.Fprintln(os.Stderr, "\n=== single-thread baseline ===")
  40  	t0 := time.Now()
  41  	rejects := 0
  42  	for i, ev := range events {
  43  		req := wire.IngestRequest{ReqID: uint32(i), Bytes: ev}
  44  		resp := processOneInline(req)
  45  		if resp.Verdict == wire.VerdictReject {
  46  			rejects++
  47  		}
  48  	}
  49  	elapsed1 := time.Since(t0)
  50  	fmt.Fprintf(os.Stderr, "single-thread: %d events in %v, %.0f evt/s, %d rejects\n",
  51  		len(events), elapsed1, float64(len(events))/elapsed1.Seconds(), rejects)
  52  
  53  	// Phase 2: N-worker pool. Channels and per-worker state in arrays —
  54  	// the moxie compiler now routes Codec-typed chan ops through the
  55  	// runtime ChanSendDual / ChanRecvDual dispatch, which checks
  56  	// ch.pipeBoundFd at runtime, so chans accessed via index work.
  57  	fmt.Fprintf(os.Stderr, "\n=== %d-worker pool ===\n", NumWorkers)
  58  
  59  	in := [NumWorkers]chan wire.IngestRequest{}
  60  	out := [NumWorkers]chan wire.IngestResponse{}
  61  	fd := [NumWorkers]int32{}
  62  	busy := [NumWorkers]bool{}
  63  
  64  	for i := 0; i < NumWorkers; i++ {
  65  		in[i] = chan wire.IngestRequest{}
  66  		out[i] = chan wire.IngestResponse{}
  67  		spawn(workerLoop, in[i], out[i])
  68  		fd[i] = runtime.LastSpawnedParentFd()
  69  	}
  70  
  71  	epfd, err := syscall.EpollCreate1(0)
  72  	if err != nil {
  73  		fmt.Fprintf(os.Stderr, "epoll_create1: %v\n", err)
  74  		os.Exit(1)
  75  	}
  76  	for i := 0; i < NumWorkers; i++ {
  77  		if err := epollAdd(epfd, fd[i]); err != nil {
  78  			fmt.Fprintf(os.Stderr, "epoll add: %v\n", err)
  79  			os.Exit(1)
  80  		}
  81  	}
  82  	epev := []syscall.EpollEvent{:NumWorkers * 2}
  83  
  84  	t0 = time.Now()
  85  	dispatched := 0
  86  	collected := 0
  87  	rejects = 0
  88  
  89  	for collected < n {
  90  		// Dispatch as many as fit on idle workers.
  91  		for dispatched < n {
  92  			placed := false
  93  			for i := 0; i < NumWorkers; i++ {
  94  				if !busy[i] {
  95  					in[i] <- wire.IngestRequest{ReqID: uint32(dispatched), Bytes: events[dispatched]}
  96  					busy[i] = true
  97  					dispatched++
  98  					placed = true
  99  					break
 100  				}
 101  			}
 102  			if !placed {
 103  				break // all busy
 104  			}
 105  		}
 106  
 107  		// Block in the kernel until any worker has a response. Interrupt-
 108  		// driven wake-up; zero CPU spent waiting.
 109  		nev, err := syscall.EpollWait(epfd, epev, -1)
 110  		if err != nil {
 111  			if err == syscall.EINTR {
 112  				continue
 113  			}
 114  			fmt.Fprintf(os.Stderr, "epoll_wait: %v\n", err)
 115  			os.Exit(1)
 116  		}
 117  		for j := 0; j < nev; j++ {
 118  			rfd := epev[j].Fd
 119  			for i := 0; i < NumWorkers; i++ {
 120  				if fd[i] == rfd {
 121  					resp := <-out[i]
 122  					if resp.Verdict == wire.VerdictReject {
 123  						rejects++
 124  					}
 125  					busy[i] = false
 126  					collected++
 127  					break
 128  				}
 129  			}
 130  		}
 131  	}
 132  
 133  	elapsed2 := time.Since(t0)
 134  	fmt.Fprintf(os.Stderr, "%d-worker:    %d events in %v, %.0f evt/s, %d rejects\n",
 135  		NumWorkers, len(events), elapsed2, float64(len(events))/elapsed2.Seconds(), rejects)
 136  
 137  	speedup := elapsed1.Seconds() / elapsed2.Seconds()
 138  	fmt.Fprintf(os.Stderr, "\nspeedup: %.2fx\n", speedup)
 139  }
 140  
 141  func workerLoop(in chan wire.IngestRequest, out chan wire.IngestResponse) {
 142  	for {
 143  		req, ok := <-in
 144  		if !ok {
 145  			return // parent shut down
 146  		}
 147  		resp := processOneInline(req)
 148  		out <- resp
 149  	}
 150  }
 151  
 152  func processOneInline(req wire.IngestRequest) wire.IngestResponse {
 153  	resp := wire.IngestResponse{ReqID: req.ReqID, Bytes: req.Bytes}
 154  	label, rem, _ := envelope.Identify(req.Bytes)
 155  	if label != envelope.EventLabel {
 156  		resp.Verdict = wire.VerdictReject
 157  		return resp
 158  	}
 159  	var es envelope.EventSubmission
 160  	if _, err := es.Unmarshal(rem); err != nil || es.E == nil {
 161  		resp.Verdict = wire.VerdictReject
 162  		return resp
 163  	}
 164  	ev := es.E
 165  	if len(ev.ID) != 32 || len(ev.Pubkey) != 32 || len(ev.Sig) != 64 {
 166  		resp.Verdict = wire.VerdictReject
 167  		return resp
 168  	}
 169  	canonical := ev.GetIDBytes()
 170  	for i := 0; i < 32; i++ {
 171  		if canonical[i] != ev.ID[i] {
 172  			resp.Verdict = wire.VerdictReject
 173  			return resp
 174  		}
 175  	}
 176  	valid, _ := ev.Verify()
 177  	if !valid {
 178  		resp.Verdict = wire.VerdictReject
 179  		return resp
 180  	}
 181  	resp.Kind = ev.Kind
 182  	resp.CreatedAt = ev.CreatedAt
 183  	copy(resp.Pubkey[:], ev.Pubkey)
 184  	copy(resp.EventID[:], ev.ID)
 185  	resp.Verdict = wire.VerdictAccept
 186  	return resp
 187  }
 188  
 189  func generateEvents(n int) [][]byte {
 190  	s := p8k.MustNew()
 191  	if err := s.Generate(); err != nil {
 192  		fmt.Fprintf(os.Stderr, "signer init: %v\n", err)
 193  		os.Exit(1)
 194  	}
 195  	out := [][]byte{:0:n}
 196  	now := int64(time.Now().Unix())
 197  	for i := 0; i < n; i++ {
 198  		ev := event.New()
 199  		ev.Kind = 1
 200  		ev.CreatedAt = now + int64(i)
 201  		var contentBuf [200]byte
 202  		binary.LittleEndian.PutUint32(contentBuf[0:4], uint32(i))
 203  		for j := 4; j < 200; j++ {
 204  			contentBuf[j] = byte((i + j) & 0x7f)
 205  		}
 206  		ev.Content = contentBuf[:]
 207  		if err := ev.Sign(s); err != nil {
 208  			fmt.Fprintf(os.Stderr, "sign %d: %v\n", i, err)
 209  			os.Exit(1)
 210  		}
 211  		es := envelope.EventSubmission{E: ev}
 212  		out = append(out, es.Marshal(nil))
 213  	}
 214  	return out
 215  }
 216