stress_metrics.py raw

   1  #!/usr/bin/env python3
   2  """B.0.7 stress test driver.
   3  
   4  Spawns N parallel WebSocket clients against the local relay, each
   5  firing M signed events. Measures wall-clock throughput and OK round-
   6  trip latency, then scrapes /metrics for server-side histograms.
   7  
   8  Usage:
   9      test/stress_metrics.py --port 18099 --conns 50 --events-per-conn 200
  10  """
  11  import argparse
  12  import asyncio
  13  import hashlib
  14  import json
  15  import secrets
  16  import statistics
  17  import sys
  18  import time
  19  import urllib.request
  20  
  21  # Third-party dependencies — imported at module load; exit with an install hint if missing.
  22  try:
  23      import websockets
  24  except ImportError:
  25      print("error: pip install websockets coincurve", file=sys.stderr)
  26      sys.exit(1)
  27  try:
  28      from coincurve import PrivateKey
  29  except ImportError:
  30      print("error: pip install coincurve", file=sys.stderr)
  31      sys.exit(1)
  32  
  33  
  34  def make_event(privkey, kind=1, content_size=200, ref=None):
  35      """Construct a signed Nostr event. BIP340 Schnorr signature."""
  36      pubkey_bytes = privkey.public_key.format(compressed=True)[1:]  # x-only
  37      pubkey_hex = pubkey_bytes.hex()
  38  
  39      created_at = int(time.time())
  40      content = secrets.token_hex(content_size // 2)
  41      tags = []
  42      if ref is not None:
  43          tags = [["e", ref]]
  44  
  45      serialized = json.dumps(
  46          [0, pubkey_hex, created_at, kind, tags, content], separators=(",", ":"), ensure_ascii=False,
  47      ).encode("utf-8")
  48      eid = hashlib.sha256(serialized).hexdigest()
  49      eid_bytes = bytes.fromhex(eid)
  50  
  51      sig = privkey.sign_schnorr(eid_bytes, aux_randomness=secrets.token_bytes(32)).hex()
  52  
  53      return {
  54          "id": eid,
  55          "pubkey": pubkey_hex,
  56          "created_at": created_at,
  57          "kind": kind,
  58          "tags": tags,
  59          "content": content,
  60          "sig": sig,
  61      }
  62  
  63  
  64  async def conn_worker(idx, url, events_per_conn, content_size, results):
  65      """One WebSocket connection sending events_per_conn events."""
  66      privkey = PrivateKey(secrets.token_bytes(32))
  67      latencies = []
  68      rejected = 0
  69      accepted = 0
  70  
  71      async with websockets.connect(url, max_size=20 * 1024 * 1024) as ws:
  72          for i in range(events_per_conn):
  73              ev = make_event(privkey, content_size=content_size)
  74              send_t = time.perf_counter_ns()
  75              await ws.send(json.dumps(["EVENT", ev]))
  76              ok = await ws.recv()
  77              recv_t = time.perf_counter_ns()
  78              latencies.append(recv_t - send_t)
  79              try:
  80                  msg = json.loads(ok)
  81                  if msg[0] == "OK" and msg[2] is True:
  82                      accepted += 1
  83                  else:
  84                      rejected += 1
  85              except Exception:
  86                  rejected += 1
  87  
  88      results.append({
  89          "conn": idx,
  90          "latencies_ns": latencies,
  91          "accepted": accepted,
  92          "rejected": rejected,
  93      })
  94  
  95  
  96  async def run_stress(url, conns, events_per_conn, content_size):
  97      print(f"connecting {conns} clients, {events_per_conn} events each, content={content_size}B")
  98      tasks = []
  99      results = []
 100      t0 = time.perf_counter()
 101      for idx in range(conns):
 102          tasks.append(asyncio.create_task(
 103              conn_worker(idx, url, events_per_conn, content_size, results)
 104          ))
 105      await asyncio.gather(*tasks)
 106      elapsed = time.perf_counter() - t0
 107      return elapsed, results
 108  
 109  
 110  def percentile(latencies, p):
 111      if not latencies:
 112          return 0
 113      s = sorted(latencies)
 114      idx = max(0, int(len(s) * p / 100) - 1)
 115      return s[min(idx, len(s) - 1)]
 116  
 117  
 118  def fmt_ns(n):
 119      if n < 1_000:
 120          return f"{n}ns"
 121      if n < 1_000_000:
 122          return f"{n/1_000:.1f}µs"
 123      if n < 1_000_000_000:
 124          return f"{n/1_000_000:.2f}ms"
 125      return f"{n/1_000_000_000:.3f}s"
 126  
 127  
 128  def main():
 129      ap = argparse.ArgumentParser()
 130      ap.add_argument("--port", type=int, default=18099)
 131      ap.add_argument("--host", default="127.0.0.1")
 132      ap.add_argument("--conns", type=int, default=50)
 133      ap.add_argument("--events-per-conn", type=int, default=200)
 134      ap.add_argument("--content-size", type=int, default=200)
 135      args = ap.parse_args()
 136  
 137      base = f"http://{args.host}:{args.port}"
 138      ws_url = f"ws://{args.host}:{args.port}/"
 139  
 140      # Reset server-side metrics so we measure only this run.
 141      try:
 142          urllib.request.urlopen(f"{base}/metrics/reset", timeout=2).read()
 143      except Exception as e:
 144          print(f"warn: metrics reset failed: {e}", file=sys.stderr)
 145  
 146      elapsed, results = asyncio.run(
 147          run_stress(ws_url, args.conns, args.events_per_conn, args.content_size)
 148      )
 149  
 150      all_lat = []
 151      accepted = 0
 152      rejected = 0
 153      for r in results:
 154          all_lat.extend(r["latencies_ns"])
 155          accepted += r["accepted"]
 156          rejected += r["rejected"]
 157      total_evts = accepted + rejected
 158  
 159      print()
 160      print("=== client-side ===")
 161      print(f"wall:        {elapsed*1000:.1f} ms")
 162      print(f"events:      {total_evts} (accepted={accepted}, rejected={rejected})")
 163      print(f"throughput:  {total_evts/elapsed:.0f} evt/s")
 164      if all_lat:
 165          print(f"OK rtt p50:  {fmt_ns(percentile(all_lat, 50))}")
 166          print(f"OK rtt p95:  {fmt_ns(percentile(all_lat, 95))}")
 167          print(f"OK rtt p99:  {fmt_ns(percentile(all_lat, 99))}")
 168          print(f"OK rtt mean: {fmt_ns(int(statistics.mean(all_lat)))}")
 169          print(f"OK rtt max:  {fmt_ns(max(all_lat))}")
 170  
 171      # Scrape server metrics
 172      print()
 173      print("=== server-side metrics ===")
 174      try:
 175          with urllib.request.urlopen(f"{base}/metrics", timeout=2) as f:
 176              data = json.load(f)
 177          for name, h in data.items():
 178              count = h.get("count", 0)
 179              if count == 0:
 180                  continue
 181              print(f"{name}:")
 182              print(f"  count:  {count}")
 183              print(f"  avg:    {fmt_ns(h.get('avg_ns', 0))}")
 184              print(f"  p50:    {fmt_ns(h.get('p50_ns', 0))}")
 185              print(f"  p95:    {fmt_ns(h.get('p95_ns', 0))}")
 186              print(f"  p99:    {fmt_ns(h.get('p99_ns', 0))}")
 187      except Exception as e:
 188          print(f"  (scrape failed: {e})")
 189  
 190  
# Run the stress driver only when this file is executed as a script.
if __name__ == "__main__":
    main()
 193