#!/usr/bin/env python3
"""B.0.7 stress test driver.

Spawns N parallel WebSocket clients against the local relay, each firing M
signed events. Measures wall-clock throughput and OK round-trip latency, then
scrapes /metrics for server-side histograms.

Usage:
    test/stress_metrics.py --port 18099 --conns 50 --events-per-conn 200
"""

import argparse
import asyncio
import hashlib
import json
import math
import secrets
import statistics
import sys
import time
import urllib.request

# Lazy dependency check: the third-party packages are only required when the
# driver actually runs. A failed import is remembered here and reported by
# main(), so the pure helpers (percentile, fmt_ns) stay importable without them.
try:
    import websockets
except ImportError:
    websockets = None

try:
    from coincurve import PrivateKey
except ImportError:
    PrivateKey = None


def _require_deps():
    """Exit with status 1 and an install hint if a third-party package is missing."""
    if websockets is None:
        print("error: pip install websockets coincurve", file=sys.stderr)
        sys.exit(1)
    if PrivateKey is None:
        print("error: pip install coincurve", file=sys.stderr)
        sys.exit(1)


def make_event(privkey, kind=1, content_size=200, ref=None):
    """Construct a signed Nostr event. BIP340 Schnorr signature.

    Args:
        privkey: key object providing ``public_key.format(compressed=True)``
            and ``sign_schnorr(msg, aux_randomness=...)`` (coincurve PrivateKey).
        kind: event kind number placed in the event.
        content_size: approximate content length; the content is
            ``content_size // 2`` random bytes rendered as hex, so odd sizes
            round down by one character.
        ref: optional event id; when given, a single ``["e", ref]`` tag is added.

    Returns:
        dict with keys id, pubkey, created_at, kind, tags, content, sig.
    """
    # Drop the one-byte prefix of the compressed encoding to get the x-only key.
    pubkey_bytes = privkey.public_key.format(compressed=True)[1:]
    pubkey_hex = pubkey_bytes.hex()
    created_at = int(time.time())
    content = secrets.token_hex(content_size // 2)
    tags = [["e", ref]] if ref is not None else []

    # The event id is the SHA-256 of this compact JSON array.
    serialized = json.dumps(
        [0, pubkey_hex, created_at, kind, tags, content],
        separators=(",", ":"),
        ensure_ascii=False,
    ).encode("utf-8")
    eid = hashlib.sha256(serialized).hexdigest()
    eid_bytes = bytes.fromhex(eid)
    sig = privkey.sign_schnorr(
        eid_bytes, aux_randomness=secrets.token_bytes(32)
    ).hex()

    return {
        "id": eid,
        "pubkey": pubkey_hex,
        "created_at": created_at,
        "kind": kind,
        "tags": tags,
        "content": content,
        "sig": sig,
    }


async def conn_worker(idx, url, events_per_conn, content_size, results):
    """One WebSocket connection sending events_per_conn events.

    Each event is sent and the next received frame is taken as its reply; the
    send-to-receive time is recorded in nanoseconds. A reply counts as accepted
    only if it parses as JSON with ``msg[0] == "OK"`` and ``msg[2] is True``;
    anything else (including unparseable frames) counts as rejected.

    A summary dict (conn, latencies_ns, accepted, rejected) is appended to
    ``results`` even when the connection fails part-way, so measurements taken
    before the failure are kept. The exception itself still propagates.
    """
    privkey = PrivateKey(secrets.token_bytes(32))
    latencies = []
    rejected = 0
    accepted = 0

    try:
        async with websockets.connect(url, max_size=20 * 1024 * 1024) as ws:
            for _ in range(events_per_conn):
                ev = make_event(privkey, content_size=content_size)
                send_t = time.perf_counter_ns()
                await ws.send(json.dumps(["EVENT", ev]))
                ok = await ws.recv()
                recv_t = time.perf_counter_ns()
                latencies.append(recv_t - send_t)
                try:
                    msg = json.loads(ok)
                    is_accepted = msg[0] == "OK" and msg[2] is True
                except (ValueError, LookupError, TypeError):
                    # Malformed JSON or an unexpected reply shape.
                    is_accepted = False
                if is_accepted:
                    accepted += 1
                else:
                    rejected += 1
    finally:
        results.append({
            "conn": idx,
            "latencies_ns": latencies,
            "accepted": accepted,
            "rejected": rejected,
        })


async def run_stress(url, conns, events_per_conn, content_size):
    """Run ``conns`` concurrent conn_worker tasks and time the whole batch.

    A failing connection no longer aborts the run: its exception is reported
    as a warning on stderr and the remaining workers run to completion.

    Returns:
        (elapsed_seconds, results) where results holds one summary dict per
        worker, in completion order.
    """
    print(f"connecting {conns} clients, {events_per_conn} events each, content={content_size}B")
    results = []
    t0 = time.perf_counter()
    tasks = [
        asyncio.create_task(
            conn_worker(idx, url, events_per_conn, content_size, results)
        )
        for idx in range(conns)
    ]
    outcomes = await asyncio.gather(*tasks, return_exceptions=True)
    elapsed = time.perf_counter() - t0

    for idx, outcome in enumerate(outcomes):
        if isinstance(outcome, BaseException):
            print(f"warn: conn {idx} failed: {outcome!r}", file=sys.stderr)
    return elapsed, results


def percentile(latencies, p):
    """Return the p-th percentile of ``latencies`` using the nearest-rank method.

    The result is always one of the input samples: the value at 1-based rank
    ``ceil(len * p / 100)`` of the sorted data, clamped to the valid range.
    Returns 0 for empty input.
    """
    if not latencies:
        return 0
    s = sorted(latencies)
    # Ceil, not floor: flooring picks one rank too low whenever len * p / 100
    # is not a whole number (e.g. p50 of three samples would return the minimum).
    rank = math.ceil(len(s) * p / 100)
    idx = max(0, rank - 1)
    return s[min(idx, len(s) - 1)]


def fmt_ns(n):
    """Format a nanosecond count with the largest fitting unit (ns, µs, ms, s)."""
    if n < 1_000:
        return f"{n}ns"
    if n < 1_000_000:
        return f"{n/1_000:.1f}µs"
    if n < 1_000_000_000:
        return f"{n/1_000_000:.2f}ms"
    return f"{n/1_000_000_000:.3f}s"


def main():
    """Parse arguments, run the stress test, and print client and server stats."""
    _require_deps()

    ap = argparse.ArgumentParser()
    ap.add_argument("--port", type=int, default=18099)
    ap.add_argument("--host", default="127.0.0.1")
    ap.add_argument("--conns", type=int, default=50)
    ap.add_argument("--events-per-conn", type=int, default=200)
    ap.add_argument("--content-size", type=int, default=200)
    args = ap.parse_args()

    base = f"http://{args.host}:{args.port}"
    ws_url = f"ws://{args.host}:{args.port}/"

    # Reset server-side metrics so we measure only this run. Best effort: a
    # failure is only warned about.
    try:
        with urllib.request.urlopen(f"{base}/metrics/reset", timeout=2) as resp:
            resp.read()
    except Exception as e:
        print(f"warn: metrics reset failed: {e}", file=sys.stderr)

    elapsed, results = asyncio.run(
        run_stress(ws_url, args.conns, args.events_per_conn, args.content_size)
    )

    all_lat = []
    accepted = 0
    rejected = 0
    for r in results:
        all_lat.extend(r["latencies_ns"])
        accepted += r["accepted"]
        rejected += r["rejected"]

    total_evts = accepted + rejected
    # Guard the division in case the timer reports no elapsed time.
    throughput = total_evts / elapsed if elapsed > 0 else 0
    print()
    print("=== client-side ===")
    print(f"wall: {elapsed*1000:.1f} ms")
    print(f"events: {total_evts} (accepted={accepted}, rejected={rejected})")
    print(f"throughput: {throughput:.0f} evt/s")
    if all_lat:
        print(f"OK rtt p50: {fmt_ns(percentile(all_lat, 50))}")
        print(f"OK rtt p95: {fmt_ns(percentile(all_lat, 95))}")
        print(f"OK rtt p99: {fmt_ns(percentile(all_lat, 99))}")
        print(f"OK rtt mean: {fmt_ns(int(statistics.mean(all_lat)))}")
        print(f"OK rtt max: {fmt_ns(max(all_lat))}")

    # Scrape server metrics
    print()
    print("=== server-side metrics ===")
    try:
        with urllib.request.urlopen(f"{base}/metrics", timeout=2) as f:
            data = json.load(f)
        for name, h in data.items():
            count = h.get("count", 0)
            if count == 0:
                continue
            print(f"{name}:")
            print(f" count: {count}")
            print(f" avg: {fmt_ns(h.get('avg_ns', 0))}")
            print(f" p50: {fmt_ns(h.get('p50_ns', 0))}")
            print(f" p95: {fmt_ns(h.get('p95_ns', 0))}")
            print(f" p99: {fmt_ns(h.get('p99_ns', 0))}")
    except Exception as e:
        print(f" (scrape failed: {e})")


if __name__ == "__main__":
    main()