"""Smoke tests — relay serves static files, app loads, basic WebSocket.""" import json import time import urllib.request import pytest from conftest import RELAY_URL, WS_URL class TestRelayHTTP: """Verify the relay serves static files and NIP-11.""" def test_index_html(self, relay): resp = urllib.request.urlopen(f"{relay['url']}/") html = resp.read().decode() assert "" in html assert 'id="app-root"' in html def test_entry_mjs(self, relay): resp = urllib.request.urlopen(f"{relay['url']}/$entry.mjs") assert resp.status == 200 body = resp.read().decode() assert "$start" in body or "import" in body def test_style_css(self, relay): resp = urllib.request.urlopen(f"{relay['url']}/style.css") assert resp.status == 200 body = resp.read().decode() assert "--bg" in body def test_sw_register(self, relay): resp = urllib.request.urlopen(f"{relay['url']}/sw-register.js") assert resp.status == 200 def test_sw_entry(self, relay): resp = urllib.request.urlopen(f"{relay['url']}/$sw/$entry.mjs") assert resp.status == 200 def test_nip11(self, relay): req = urllib.request.Request( relay["url"], headers={"Accept": "application/nostr+json"}, ) resp = urllib.request.urlopen(req) data = json.loads(resp.read()) assert data["name"] == "smesh" assert 1 in data["supported_nips"] def test_spa_fallback(self, relay): """Unknown paths serve index.html (SPA fallback) instead of 404.""" resp = urllib.request.urlopen(f"{relay['url']}/nonexistent") body = resp.read().decode() assert "app-root" in body, "SPA fallback should serve index.html" class TestRelayWebSocket: """Basic WebSocket connectivity (REQ → EOSE).""" def test_req_eose(self, relay): """Connect via WebSocket, send REQ, expect EOSE back.""" import socket import hashlib import base64 import os host = relay["host"] port = relay["port"] sock = socket.create_connection((host, port), timeout=5) # WebSocket handshake. ws_key = base64.b64encode(os.urandom(16)).decode() handshake = ( f"GET / HTTP/1.1\r\n" f"Host: {host}:{port}\r\n" f"Upgrade: websocket\r\n" f"Connection: Upgrade\r\n" f"Sec-WebSocket-Key: {ws_key}\r\n" f"Sec-WebSocket-Version: 13\r\n" f"\r\n" ) sock.sendall(handshake.encode()) # Read upgrade response. resp = b"" while b"\r\n\r\n" not in resp: resp += sock.recv(4096) assert b"101" in resp # Send REQ frame. req = json.dumps(["REQ", "smoke-test", {"limit": 1}]).encode() _ws_send_text(sock, req) # Read EOSE. data = _ws_read_frame(sock) msg = json.loads(data) # Could be EVENT or EOSE — we want at least EOSE eventually. while msg[0] != "EOSE": data = _ws_read_frame(sock) msg = json.loads(data) assert msg[0] == "EOSE" assert msg[1] == "smoke-test" # Send CLOSE subscription before disconnecting. close_msg = json.dumps(["CLOSE", "smoke-test"]).encode() _ws_send_text(sock, close_msg) time.sleep(0.1) # Drop connection (no WS close frame — moxie netpoller bug with close frames). sock.close() def _ws_send_close(sock): """Send a WebSocket close frame (client-masked).""" mask = os.urandom(4) # Close frame: FIN + opcode 0x8, masked, 2-byte status code 1000. import struct payload = struct.pack(">H", 1000) header = bytearray([0x88, 0x80 | len(payload)]) header.extend(mask) masked = bytearray(payload[i] ^ mask[i % 4] for i in range(len(payload))) try: sock.sendall(bytes(header) + bytes(masked)) except Exception: pass def _ws_send_text(sock, payload): """Send a WebSocket text frame (client-masked).""" import struct mask = os.urandom(4) header = bytearray() header.append(0x81) # FIN + text length = len(payload) if length < 126: header.append(0x80 | length) elif length < 65536: header.append(0x80 | 126) header.extend(struct.pack(">H", length)) else: header.append(0x80 | 127) header.extend(struct.pack(">Q", length)) header.extend(mask) masked = bytearray(payload[i] ^ mask[i % 4] for i in range(len(payload))) sock.sendall(bytes(header) + bytes(masked)) def _ws_read_frame(sock): """Read a single WebSocket text frame (unmasked from server).""" import struct hdr = _recv_exact(sock, 2) length = hdr[1] & 0x7F if length == 126: length = struct.unpack(">H", _recv_exact(sock, 2))[0] elif length == 127: length = struct.unpack(">Q", _recv_exact(sock, 8))[0] return _recv_exact(sock, length) def _recv_exact(sock, n): buf = bytearray() while len(buf) < n: chunk = sock.recv(n - len(buf)) if not chunk: raise ConnectionError("socket closed") buf.extend(chunk) return bytes(buf) import os