stress_metrics.py raw
1 #!/usr/bin/env python3
2 """B.0.7 stress test driver.
3
4 Spawns N parallel WebSocket clients against the local relay, each
5 firing M signed events. Measures wall-clock throughput and OK round-
6 trip latency, then scrapes /metrics for server-side histograms.
7
8 Usage:
9 test/stress_metrics.py --port 18099 --conns 50 --events-per-conn 200
10 """
11 import argparse
12 import asyncio
13 import hashlib
14 import json
15 import secrets
16 import statistics
17 import sys
18 import time
19 import urllib.request
20
21 # Third-party deps, imported at module load: exit with an install hint if one is missing.
22 try:
23 import websockets
24 except ImportError:
25 print("error: pip install websockets coincurve", file=sys.stderr)
26 sys.exit(1)
27 try:
28 from coincurve import PrivateKey
29 except ImportError:
30 print("error: pip install coincurve", file=sys.stderr)
31 sys.exit(1)
32
33
def make_event(privkey, kind=1, content_size=200, ref=None):
    """Construct a signed Nostr event with random hex content.

    The event id is the SHA-256 of the compact JSON serialization
    ``[0, pubkey, created_at, kind, tags, content]``, and ``sig`` is a
    BIP340 Schnorr signature over that 32-byte id.

    Args:
        privkey: coincurve ``PrivateKey`` used to derive the pubkey and sign.
        kind: Nostr event kind.
        content_size: Exact length, in characters, of the random hex content.
        ref: Optional event id; when given, the event carries a single
            ``["e", ref]`` tag.

    Returns:
        Dict with keys id, pubkey, created_at, kind, tags, content, sig.
    """
    # BIP340 keys are x-only: drop the 1-byte parity prefix of the
    # compressed SEC encoding.
    pubkey_hex = privkey.public_key.format(compressed=True)[1:].hex()

    created_at = int(time.time())
    # token_hex(k) yields 2*k characters; round up and trim so that odd
    # sizes produce exactly content_size characters instead of one fewer.
    content = secrets.token_hex((content_size + 1) // 2)[:content_size]
    tags = [] if ref is None else [["e", ref]]

    serialized = json.dumps(
        [0, pubkey_hex, created_at, kind, tags, content],
        separators=(",", ":"),
        ensure_ascii=False,
    ).encode("utf-8")
    digest = hashlib.sha256(serialized).digest()

    # Fresh aux randomness per signature, as BIP340 recommends.
    sig = privkey.sign_schnorr(digest, aux_randomness=secrets.token_bytes(32)).hex()

    return {
        "id": digest.hex(),
        "pubkey": pubkey_hex,
        "created_at": created_at,
        "kind": kind,
        "tags": tags,
        "content": content,
        "sig": sig,
    }
62
63
async def _recv_ok(ws, event_id):
    """Wait for the relay's reply to ``event_id``; return True iff accepted.

    AUTH frames (e.g. a NIP-42 challenge) and OK frames for other event ids
    are skipped, so an unsolicited frame can no longer be mistaken for the
    reply and shift every later reply by one. Any other frame (NOTICE,
    malformed JSON, ...) counts as a rejection of the current event.
    """
    while True:
        raw = await ws.recv()
        try:
            msg = json.loads(raw)
            tag = msg[0]
        except (ValueError, TypeError, IndexError, KeyError):
            return False
        if tag == "AUTH":
            continue
        if tag == "OK" and len(msg) >= 2 and msg[1] != event_id:
            continue  # stray/late OK for a different event
        return tag == "OK" and len(msg) >= 3 and msg[2] is True


async def conn_worker(idx, url, events_per_conn, content_size, results):
    """Send ``events_per_conn`` events over one WebSocket connection.

    Events are sent strictly one at a time. Each EVENT waits for its
    matching OK before the next is sent, so every recorded latency is one
    send -> OK round trip (including any skipped unrelated frames).

    Args:
        idx: Worker index, copied into the result record.
        url: WebSocket URL of the relay.
        events_per_conn: Number of events to send on this connection.
        content_size: Content length passed through to ``make_event``.
        results: Shared list; one summary dict is appended once the
            connection has finished.
    """
    privkey = PrivateKey(secrets.token_bytes(32))
    latencies = []
    rejected = 0
    accepted = 0

    async with websockets.connect(url, max_size=20 * 1024 * 1024) as ws:
        for _ in range(events_per_conn):
            ev = make_event(privkey, content_size=content_size)
            # Encode before starting the clock so client-side JSON work is
            # not counted as round-trip time.
            payload = json.dumps(["EVENT", ev])
            send_t = time.perf_counter_ns()
            await ws.send(payload)
            ok = await _recv_ok(ws, ev["id"])
            recv_t = time.perf_counter_ns()
            latencies.append(recv_t - send_t)
            if ok:
                accepted += 1
            else:
                rejected += 1

    results.append({
        "conn": idx,
        "latencies_ns": latencies,
        "accepted": accepted,
        "rejected": rejected,
    })
94
95
async def run_stress(url, conns, events_per_conn, content_size):
    """Launch ``conns`` concurrent ``conn_worker`` tasks and time them.

    Returns:
        ``(elapsed_seconds, results)``: wall-clock time for the whole batch
        (connection setup included) and the list each worker appended its
        summary dict to.
    """
    print(f"connecting {conns} clients, {events_per_conn} events each, content={content_size}B")
    results = []
    start = time.perf_counter()
    workers = [
        asyncio.create_task(conn_worker(n, url, events_per_conn, content_size, results))
        for n in range(conns)
    ]
    await asyncio.gather(*workers)
    return time.perf_counter() - start, results
108
109
def percentile(latencies, p):
    """Return the ``p``-th percentile of ``latencies`` (nearest-rank method).

    Picks element ``ceil(p / 100 * n) - 1`` of the sorted samples, clamped
    to the valid index range, so p=0 gives the minimum and p=100 the
    maximum. Returns 0 for an empty input.

    Args:
        latencies: Sequence of comparable samples (nanosecond values here).
        p: Percentile in [0, 100].
    """
    import math  # local import keeps this helper self-contained

    if not latencies:
        return 0
    s = sorted(latencies)
    # Nearest rank needs ceil, not floor: flooring biases every percentile
    # low (e.g. p50 of [1, 2, 3] would come out as 1).
    rank = math.ceil(len(s) * p / 100)
    return s[min(max(rank - 1, 0), len(s) - 1)]
116
117
def fmt_ns(n):
    """Render a nanosecond count using ns, µs, ms or s depending on size.

    Below 1000 the value is printed as-is; larger values are scaled with
    1, 2 or 3 decimal places for µs, ms and s respectively.
    """
    if n < 1_000:
        return f"{n}ns"
    # (divisor, decimals, unit) for each scale below one second.
    for divisor, decimals, unit in ((1_000, 1, "µs"), (1_000_000, 2, "ms")):
        if n < divisor * 1_000:
            return f"{n / divisor:.{decimals}f}{unit}"
    return f"{n / 1_000_000_000:.3f}s"
126
127
def _reset_server_metrics(base):
    """Best-effort GET of ``{base}/metrics/reset``; failure only warns."""
    try:
        # Use the response as a context manager so the connection is closed.
        with urllib.request.urlopen(f"{base}/metrics/reset", timeout=2) as resp:
            resp.read()
    except Exception as e:
        print(f"warn: metrics reset failed: {e}", file=sys.stderr)


def _print_client_report(elapsed, results):
    """Aggregate per-connection results and print client-side statistics."""
    all_lat = []
    accepted = 0
    rejected = 0
    for r in results:
        all_lat.extend(r["latencies_ns"])
        accepted += r["accepted"]
        rejected += r["rejected"]
    total_evts = accepted + rejected

    print()
    print("=== client-side ===")
    print(f"wall: {elapsed*1000:.1f} ms")
    print(f"events: {total_evts} (accepted={accepted}, rejected={rejected})")
    print(f"throughput: {total_evts/elapsed:.0f} evt/s")
    if all_lat:
        print(f"OK rtt p50: {fmt_ns(percentile(all_lat, 50))}")
        print(f"OK rtt p95: {fmt_ns(percentile(all_lat, 95))}")
        print(f"OK rtt p99: {fmt_ns(percentile(all_lat, 99))}")
        print(f"OK rtt mean: {fmt_ns(int(statistics.mean(all_lat)))}")
        print(f"OK rtt max: {fmt_ns(max(all_lat))}")


def _print_server_report(base):
    """Scrape ``{base}/metrics`` and print each non-empty histogram entry.

    Best-effort: any fetch/parse failure is reported inline, not raised.
    """
    print()
    print("=== server-side metrics ===")
    try:
        with urllib.request.urlopen(f"{base}/metrics", timeout=2) as f:
            data = json.load(f)
        for name, h in data.items():
            # Skip non-histogram values instead of letting one of them abort
            # the whole report with an AttributeError on .get().
            if not isinstance(h, dict):
                continue
            count = h.get("count", 0)
            if count == 0:
                continue
            print(f"{name}:")
            print(f" count: {count}")
            print(f" avg: {fmt_ns(h.get('avg_ns', 0))}")
            print(f" p50: {fmt_ns(h.get('p50_ns', 0))}")
            print(f" p95: {fmt_ns(h.get('p95_ns', 0))}")
            print(f" p99: {fmt_ns(h.get('p99_ns', 0))}")
    except Exception as e:
        print(f" (scrape failed: {e})")


def main():
    """CLI entry: reset server metrics, run the stress test, print reports."""
    ap = argparse.ArgumentParser()
    ap.add_argument("--port", type=int, default=18099, help="relay port")
    ap.add_argument("--host", default="127.0.0.1", help="relay host")
    ap.add_argument("--conns", type=int, default=50,
                    help="number of concurrent WebSocket connections")
    ap.add_argument("--events-per-conn", type=int, default=200,
                    help="events sent sequentially on each connection")
    ap.add_argument("--content-size", type=int, default=200,
                    help="length of each event's random hex content")
    args = ap.parse_args()

    base = f"http://{args.host}:{args.port}"
    ws_url = f"ws://{args.host}:{args.port}/"

    # Reset server-side metrics so we measure only this run.
    _reset_server_metrics(base)

    elapsed, results = asyncio.run(
        run_stress(ws_url, args.conns, args.events_per_conn, args.content_size)
    )

    _print_client_report(elapsed, results)
    _print_server_report(base)
189
190
if __name__ == "__main__":
    # Script entry point; main() returns None, so the exit status is 0.
    sys.exit(main())
193