"""Relay policy tests — connection limits, auth gating, HTTP guard, CORS, health. Each test class starts its own relay with the specific env vars it needs. Uses raw sockets for WebSocket to avoid test framework dependencies. """ import json import os import signal import socket import struct import subprocess import time import urllib.request import urllib.error import pytest from nostr_helpers import make_event, TEST_SECKEY, TEST_PUBKEY PROJECT_ROOT = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) RELAY_BIN = os.path.join(PROJECT_ROOT, "smesh") STATIC_DIR = os.path.join(PROJECT_ROOT, "web", "static") HOST = "127.0.0.1" def _fresh_data_dir(name): d = f"/tmp/smesh-test-{name}-{os.getpid()}" os.makedirs(d, exist_ok=True) return d def _wait_port_free(port, timeout=5): """Wait until nothing is listening on port.""" deadline = time.time() + timeout while time.time() < deadline: try: s = socket.create_connection((HOST, port), timeout=0.2) s.close() time.sleep(0.1) except (ConnectionRefusedError, OSError, socket.timeout): return raise RuntimeError(f"port {port} still in use after {timeout}s") def _start_relay(port, extra_env=None, data_dir_name=None): """Start a relay and wait for it to accept connections.""" if not os.path.exists(RELAY_BIN): pytest.skip("Relay binary not found") _wait_port_free(port) name = data_dir_name or f"policy-{port}" data_dir = _fresh_data_dir(name) env = os.environ.copy() env["ORLY_DATA_DIR"] = data_dir env["ORLY_LISTEN"] = HOST env["ORLY_PORT"] = str(port) env["ORLY_STATIC_DIR"] = STATIC_DIR env["ORLY_LOG_TO_STDOUT"] = "true" if extra_env: env.update(extra_env) proc = subprocess.Popen( [RELAY_BIN], env=env, stdout=subprocess.PIPE, stderr=subprocess.PIPE, ) for _ in range(40): try: s = socket.create_connection((HOST, port), timeout=0.5) s.close() break except Exception: if proc.poll() is not None: out = proc.stderr.read().decode(errors="replace") pytest.fail(f"Relay on :{port} exited early: {out[:500]}") time.sleep(0.15) else: proc.kill() out = proc.stderr.read().decode(errors="replace") pytest.fail(f"Relay on :{port} failed to start: {out[:500]}") time.sleep(0.05) return proc, data_dir def _stop_relay(proc): proc.send_signal(signal.SIGTERM) try: proc.wait(timeout=3) except subprocess.TimeoutExpired: proc.kill() proc.wait(timeout=2) # ── WebSocket helpers ──────────────────────────────── def _ws_handshake(host, port): """Open a raw TCP socket and perform a WS upgrade. Returns the socket.""" sock = socket.create_connection((host, port), timeout=5) import base64 ws_key = base64.b64encode(os.urandom(16)).decode() handshake = ( f"GET / HTTP/1.1\r\n" f"Host: {host}:{port}\r\n" f"Upgrade: websocket\r\n" f"Connection: Upgrade\r\n" f"Sec-WebSocket-Key: {ws_key}\r\n" f"Sec-WebSocket-Version: 13\r\n" f"\r\n" ) sock.sendall(handshake.encode()) resp = b"" while b"\r\n\r\n" not in resp: resp += sock.recv(4096) if b"101" not in resp: sock.close() raise ConnectionError(f"WS upgrade failed: {resp[:200]}") return sock def _ws_send(sock, payload): """Send a masked WS text frame.""" mask = os.urandom(4) header = bytearray() header.append(0x81) length = len(payload) if length < 126: header.append(0x80 | length) elif length < 65536: header.append(0x80 | 126) header.extend(struct.pack(">H", length)) else: header.append(0x80 | 127) header.extend(struct.pack(">Q", length)) header.extend(mask) masked = bytearray(payload[i] ^ mask[i % 4] for i in range(len(payload))) sock.sendall(bytes(header) + bytes(masked)) def _ws_recv(sock, timeout=5): """Read one WS frame (unmasked from server). Returns payload bytes.""" sock.settimeout(timeout) hdr = _recv_exact(sock, 2) length = hdr[1] & 0x7F if length == 126: length = struct.unpack(">H", _recv_exact(sock, 2))[0] elif length == 127: length = struct.unpack(">Q", _recv_exact(sock, 8))[0] return _recv_exact(sock, length) def _recv_exact(sock, n): buf = bytearray() while len(buf) < n: chunk = sock.recv(n - len(buf)) if not chunk: raise ConnectionError("socket closed") buf.extend(chunk) return bytes(buf) def _ws_send_json(sock, obj): _ws_send(sock, json.dumps(obj).encode()) def _ws_recv_json(sock, timeout=5): return json.loads(_ws_recv(sock, timeout)) def _drain_until(sock, label, timeout=5): """Read WS messages until one with msg[0]==label is found. Returns it.""" deadline = time.time() + timeout while time.time() < deadline: msg = _ws_recv_json(sock, timeout=max(0.5, deadline - time.time())) if msg[0] == label: return msg raise TimeoutError(f"never received {label}") # ── Tests ──────────────────────────────────────────── class TestHealthEndpoint: """GET /health returns 200 ok.""" def test_health(self): port = 24001 proc, _ = _start_relay(port) try: resp = urllib.request.urlopen(f"http://{HOST}:{port}/health", timeout=3) assert resp.status == 200 assert resp.read() == b"ok" finally: _stop_relay(proc) class TestNIP11: """NIP-11 relay info respects ORLY_APP_NAME.""" def test_custom_name(self): port = 24002 proc, _ = _start_relay(port, {"ORLY_APP_NAME": "TestRelay"}) try: req = urllib.request.Request( f"http://{HOST}:{port}/", headers={"Accept": "application/nostr+json"}, ) resp = urllib.request.urlopen(req, timeout=3) data = json.loads(resp.read()) assert data["name"] == "TestRelay" assert 42 in data["supported_nips"] assert 50 in data["supported_nips"] finally: _stop_relay(proc) def test_default_name(self): port = 24003 proc, _ = _start_relay(port) try: req = urllib.request.Request( f"http://{HOST}:{port}/", headers={"Accept": "application/nostr+json"}, ) resp = urllib.request.urlopen(req, timeout=3) data = json.loads(resp.read()) assert data["name"] == "ORLY" finally: _stop_relay(proc) class TestCORS: """CORS preflight responses.""" def test_options_returns_cors(self): port = 24004 proc, _ = _start_relay(port, { "ORLY_CORS_ENABLED": "true", }) try: req = urllib.request.Request( f"http://{HOST}:{port}/health", method="OPTIONS", headers={"Origin": "http://example.com"}, ) resp = urllib.request.urlopen(req, timeout=3) assert resp.status == 204 assert "access-control-allow-origin" in { k.lower(): v for k, v in resp.headers.items() } finally: _stop_relay(proc) class TestGlobalConnLimit: """ORLY_MAX_GLOBAL_CONNECTIONS limits total connections.""" def test_excess_connection_rejected(self): port = 24005 proc, _ = _start_relay(port, { "ORLY_MAX_GLOBAL_CONNECTIONS": "2", }) try: socks = [] for _ in range(2): s = _ws_handshake(HOST, port) socks.append(s) # Third connection should be rejected (TCP close). try: s3 = socket.create_connection((HOST, port), timeout=2) # Send HTTP request — if global limit works, server closes this. s3.sendall(b"GET / HTTP/1.1\r\nHost: x\r\n\r\n") s3.settimeout(2) data = s3.recv(4096) # Connection either closed or gets no WS upgrade s3.close() except (ConnectionError, socket.timeout, OSError): pass # expected — rejected for s in socks: s.close() finally: _stop_relay(proc) class TestPerIPConnLimit: """ORLY_MAX_CONN_PER_IP limits connections per IP.""" def test_excess_per_ip_rejected(self): port = 24006 proc, _ = _start_relay(port, { "ORLY_MAX_CONN_PER_IP": "2", }) try: socks = [] for _ in range(2): s = _ws_handshake(HOST, port) socks.append(s) # Third from same IP should fail. try: s3 = socket.create_connection((HOST, port), timeout=2) s3.sendall(b"GET / HTTP/1.1\r\nHost: x\r\n\r\n") s3.settimeout(2) data = s3.recv(4096) s3.close() except (ConnectionError, socket.timeout, OSError): pass # expected for s in socks: s.close() finally: _stop_relay(proc) class TestHTTPGuard: """HTTP guard rate limiting and bot blocking.""" def test_bot_user_agent_blocked(self): port = 24007 proc, _ = _start_relay(port, { "ORLY_HTTP_GUARD_ENABLED": "true", "ORLY_HTTP_GUARD_BOT_BLOCK": "true", }) try: req = urllib.request.Request( f"http://{HOST}:{port}/health", headers={"User-Agent": "Mozilla/5.0 (compatible; AhrefsBot/7.0)"}, ) try: resp = urllib.request.urlopen(req, timeout=3) # Should get 403 forbidden assert resp.status == 403 except urllib.error.HTTPError as e: assert e.code == 403 finally: _stop_relay(proc) def test_normal_user_agent_allowed(self): port = 24008 proc, _ = _start_relay(port, { "ORLY_HTTP_GUARD_ENABLED": "true", "ORLY_HTTP_GUARD_BOT_BLOCK": "true", }) try: req = urllib.request.Request( f"http://{HOST}:{port}/health", headers={"User-Agent": "Mozilla/5.0 Firefox"}, ) resp = urllib.request.urlopen(req, timeout=3) assert resp.status == 200 finally: _stop_relay(proc) class TestOpenRelay: """Default open relay — no auth required, events flow freely.""" def test_publish_and_query(self): port = 24010 proc, _ = _start_relay(port) try: ws = _ws_handshake(HOST, port) # Drain any AUTH challenge (only sent if RelayURL is set). time.sleep(0.2) ev = make_event(TEST_SECKEY, "open-relay-test", kind=1) _ws_send_json(ws, ["EVENT", ev]) ok = _drain_until(ws, "OK") assert ok[2] is True, f"rejected: {ok}" # Query it back. _ws_send_json(ws, ["REQ", "q1", {"ids": [ev["id"]]}]) msg = _drain_until(ws, "EVENT") assert msg[2]["id"] == ev["id"] _drain_until(ws, "EOSE") _ws_send_json(ws, ["CLOSE", "q1"]) ws.close() finally: _stop_relay(proc) class TestAuthToWrite: """ORLY_AUTH_TO_WRITE=true requires NIP-42 auth before EVENT.""" def test_unauthed_event_rejected(self): port = 24011 proc, _ = _start_relay(port, { "ORLY_RELAY_URL": f"ws://{HOST}:{port}", "ORLY_AUTH_TO_WRITE": "true", "ORLY_FREE_WRITE_LIMIT": "0", }) try: ws = _ws_handshake(HOST, port) # Drain AUTH challenge. auth_msg = _drain_until(ws, "AUTH") assert auth_msg[0] == "AUTH" # Send event without authenticating. ev = make_event(TEST_SECKEY, "should-be-rejected", kind=1) _ws_send_json(ws, ["EVENT", ev]) ok = _drain_until(ws, "OK") assert ok[2] is False, "unauthed event should be rejected" assert "auth-required" in ok[3] ws.close() finally: _stop_relay(proc) class TestAuthRequired: """ORLY_AUTH_REQUIRED=true requires auth for REQ/COUNT.""" def test_unauthed_req_closed(self): port = 24012 proc, _ = _start_relay(port, { "ORLY_RELAY_URL": f"ws://{HOST}:{port}", "ORLY_AUTH_REQUIRED": "true", }) try: ws = _ws_handshake(HOST, port) _drain_until(ws, "AUTH") _ws_send_json(ws, ["REQ", "test-sub", {"limit": 1}]) msg = _drain_until(ws, "CLOSED", timeout=10) assert "auth-required" in msg[2] ws.close() finally: _stop_relay(proc) class TestFreeWriteLimit: """ORLY_FREE_WRITE_LIMIT caps unauthed writes per IP.""" def test_free_writes_then_rate_limit(self): port = 24013 proc, _ = _start_relay(port, { "ORLY_RELAY_URL": f"ws://{HOST}:{port}", "ORLY_AUTH_TO_WRITE": "true", "ORLY_FREE_WRITE_LIMIT": "3", "ORLY_FREE_WRITE_WINDOW": "3600", }) try: ws = _ws_handshake(HOST, port) _drain_until(ws, "AUTH") # First 3 should succeed (free write bucket). for i in range(3): ev = make_event(TEST_SECKEY, f"free-{i}", kind=1) _ws_send_json(ws, ["EVENT", ev]) ok = _drain_until(ws, "OK") assert ok[2] is True, f"event {i} rejected: {ok}" # 4th should be rate-limited. ev = make_event(TEST_SECKEY, "free-overflow", kind=1) _ws_send_json(ws, ["EVENT", ev]) ok = _drain_until(ws, "OK") assert ok[2] is False assert "rate-limited" in ok[3] ws.close() finally: _stop_relay(proc) class TestSubscriptionLimit: """ORLY_MAX_SUBSCRIPTIONS limits active subscriptions per connection.""" def test_excess_sub_closed(self): port = 24014 proc, _ = _start_relay(port, { "ORLY_MAX_SUBSCRIPTIONS": "3", }) try: ws = _ws_handshake(HOST, port) time.sleep(0.2) # Open 3 subscriptions. for i in range(3): _ws_send_json(ws, ["REQ", f"sub-{i}", {"limit": 1}]) _drain_until(ws, "EOSE") # 4th should be rejected. _ws_send_json(ws, ["REQ", "sub-overflow", {"limit": 1}]) msg = _drain_until(ws, "CLOSED") assert "too many" in msg[2] ws.close() finally: _stop_relay(proc) class TestQueryResultLimit: """ORLY_QUERY_RESULT_LIMIT caps events returned per REQ.""" def test_result_capped(self): port = 24015 proc, _ = _start_relay(port, { "ORLY_QUERY_RESULT_LIMIT": "5", }) try: ws = _ws_handshake(HOST, port) time.sleep(0.2) # Publish 10 events. for i in range(10): ev = make_event(TEST_SECKEY, f"limit-test-{i}", kind=1, created_at=int(time.time()) - 100 + i) _ws_send_json(ws, ["EVENT", ev]) _drain_until(ws, "OK") # Query all — should get at most 5. _ws_send_json(ws, ["REQ", "q-limit", {"authors": [TEST_PUBKEY], "kinds": [1]}]) count = 0 while True: msg = _ws_recv_json(ws) if msg[0] == "EOSE": break if msg[0] == "EVENT": count += 1 assert count <= 5, f"got {count} events, expected at most 5" _ws_send_json(ws, ["CLOSE", "q-limit"]) ws.close() finally: _stop_relay(proc) class TestLiveSubscription: """Live subscription receives broadcast events.""" def test_live_delivery(self): port = 24016 proc, _ = _start_relay(port) try: # Subscriber connects and subscribes. sub_ws = _ws_handshake(HOST, port) time.sleep(0.1) _ws_send_json(sub_ws, ["REQ", "live", {"authors": [TEST_PUBKEY], "kinds": [1]}]) _drain_until(sub_ws, "EOSE") # Publisher sends an event. pub_ws = _ws_handshake(HOST, port) time.sleep(0.1) ev = make_event(TEST_SECKEY, "live-broadcast-test", kind=1) _ws_send_json(pub_ws, ["EVENT", ev]) ok = _drain_until(pub_ws, "OK") assert ok[2] is True # Subscriber should get it. msg = _ws_recv_json(sub_ws, timeout=5) assert msg[0] == "EVENT" assert msg[2]["content"] == "live-broadcast-test" _ws_send_json(sub_ws, ["CLOSE", "live"]) sub_ws.close() pub_ws.close() finally: _stop_relay(proc) class TestReplaceableEvents: """NIP-01 replaceable event handling — newer replaces older.""" def test_replaceable_kind_overwrites(self): port = 24017 proc, _ = _start_relay(port) try: ws = _ws_handshake(HOST, port) time.sleep(0.1) now = int(time.time()) # Kind 0 (metadata) is replaceable. ev1 = make_event(TEST_SECKEY, '{"name":"old"}', kind=0, created_at=now - 10) _ws_send_json(ws, ["EVENT", ev1]) ok1 = _drain_until(ws, "OK") assert ok1[2] is True ev2 = make_event(TEST_SECKEY, '{"name":"new"}', kind=0, created_at=now) _ws_send_json(ws, ["EVENT", ev2]) ok2 = _drain_until(ws, "OK") assert ok2[2] is True # Query — should only get the newer one. _ws_send_json(ws, ["REQ", "rep", {"authors": [TEST_PUBKEY], "kinds": [0]}]) events = [] while True: msg = _ws_recv_json(ws) if msg[0] == "EOSE": break if msg[0] == "EVENT": events.append(msg[2]) assert len(events) == 1 assert events[0]["content"] == '{"name":"new"}' _ws_send_json(ws, ["CLOSE", "rep"]) ws.close() finally: _stop_relay(proc) class TestEphemeralEvents: """NIP-16 ephemeral events are accepted but not stored.""" def test_ephemeral_not_stored(self): port = 24018 proc, _ = _start_relay(port) try: ws = _ws_handshake(HOST, port) time.sleep(0.1) # Kind 20000-29999 are ephemeral. ev = make_event(TEST_SECKEY, "ephemeral-test", kind=20001) _ws_send_json(ws, ["EVENT", ev]) ok = _drain_until(ws, "OK") assert ok[2] is True # Query — should not find it. _ws_send_json(ws, ["REQ", "eph", {"ids": [ev["id"]]}]) msg = _drain_until(ws, "EOSE") # No EVENT should precede the EOSE for this ID. ws.close() finally: _stop_relay(proc) class TestDeletion: """NIP-09 event deletion.""" def test_delete_own_event(self): port = 24019 proc, _ = _start_relay(port) try: ws = _ws_handshake(HOST, port) time.sleep(0.1) ev = make_event(TEST_SECKEY, "to-be-deleted", kind=1) _ws_send_json(ws, ["EVENT", ev]) ok = _drain_until(ws, "OK") assert ok[2] is True # Delete it — kind 5, with "e" tag pointing to the event. del_ev = make_event(TEST_SECKEY, "", kind=5, tags=[["e", ev["id"]]]) _ws_send_json(ws, ["EVENT", del_ev]) ok2 = _drain_until(ws, "OK") assert ok2[2] is True # Query — should not find the original event. _ws_send_json(ws, ["REQ", "del", {"ids": [ev["id"]]}]) msg = _drain_until(ws, "EOSE") _ws_send_json(ws, ["CLOSE", "del"]) ws.close() finally: _stop_relay(proc) class TestNIP42AuthChallenge: """NIP-42 AUTH challenge is sent when RelayURL is configured.""" def test_auth_challenge_sent(self): port = 24020 proc, _ = _start_relay(port, { "ORLY_RELAY_URL": f"ws://{HOST}:{port}", }) try: ws = _ws_handshake(HOST, port) msg = _drain_until(ws, "AUTH") assert msg[0] == "AUTH" assert isinstance(msg[1], str) assert len(msg[1]) == 64 # 32 bytes hex ws.close() finally: _stop_relay(proc) def test_no_challenge_without_relay_url(self): port = 24021 proc, _ = _start_relay(port) try: ws = _ws_handshake(HOST, port) # With no RelayURL, no AUTH challenge. Send REQ instead. _ws_send_json(ws, ["REQ", "nochall", {"limit": 1}]) msg = _ws_recv_json(ws) # Should get EOSE, not AUTH. assert msg[0] in ("EVENT", "EOSE"), f"unexpected: {msg[0]}" ws.close() finally: _stop_relay(proc) class TestEventValidation: """Pipeline rejects malformed events.""" def test_bad_signature_rejected(self): port = 24022 proc, _ = _start_relay(port) try: ws = _ws_handshake(HOST, port) time.sleep(0.1) ev = make_event(TEST_SECKEY, "bad-sig-test", kind=1) # Corrupt the signature. ev["sig"] = "00" * 64 _ws_send_json(ws, ["EVENT", ev]) ok = _drain_until(ws, "OK") assert ok[2] is False assert "invalid" in ok[3] or "bad" in ok[3].lower() or "signature" in ok[3].lower() ws.close() finally: _stop_relay(proc) def test_future_timestamp_rejected(self): port = 24023 proc, _ = _start_relay(port) try: ws = _ws_handshake(HOST, port) time.sleep(0.1) far_future = int(time.time()) + 86400 ev = make_event(TEST_SECKEY, "future-test", kind=1, created_at=far_future) _ws_send_json(ws, ["EVENT", ev]) ok = _drain_until(ws, "OK") assert ok[2] is False assert "future" in ok[3] ws.close() finally: _stop_relay(proc) class TestCloseSubscription: """CLOSE unsubscribes — no more events after CLOSE.""" def test_close_stops_delivery(self): port = 24024 proc, _ = _start_relay(port) try: ws = _ws_handshake(HOST, port) time.sleep(0.1) _ws_send_json(ws, ["REQ", "closeme", {"authors": [TEST_PUBKEY], "kinds": [1]}]) _drain_until(ws, "EOSE") _ws_send_json(ws, ["CLOSE", "closeme"]) time.sleep(0.2) # Publish an event — should NOT arrive on the closed sub. ev = make_event(TEST_SECKEY, "after-close", kind=1) pub_ws = _ws_handshake(HOST, port) _ws_send_json(pub_ws, ["EVENT", ev]) _drain_until(pub_ws, "OK") pub_ws.close() # Try to read — should timeout (no event delivered). try: msg = _ws_recv_json(ws, timeout=1) # If we get something, it should not be an EVENT for closeme. if msg[0] == "EVENT": assert msg[1] != "closeme", "received event on closed subscription" except (socket.timeout, ConnectionError): pass # expected — no message ws.close() finally: _stop_relay(proc) class TestCountEnvelope: """NIP-45 COUNT returns event counts.""" def test_count_returns_number(self): port = 24025 proc, _ = _start_relay(port) try: ws = _ws_handshake(HOST, port) time.sleep(0.1) # Publish a few events. for i in range(3): ev = make_event(TEST_SECKEY, f"count-{i}", kind=1, created_at=int(time.time()) + i) _ws_send_json(ws, ["EVENT", ev]) _drain_until(ws, "OK") _ws_send_json(ws, ["COUNT", "c1", {"authors": [TEST_PUBKEY], "kinds": [1]}]) msg = _drain_until(ws, "COUNT") assert msg[1] == "c1" count = msg[2] if isinstance(msg[2], int) else msg[2].get("count", 0) assert count >= 3 ws.close() finally: _stop_relay(proc)