test_relay_policy.py raw

   1  """Relay policy tests — connection limits, auth gating, HTTP guard, CORS, health.
   2  
   3  Each test class starts its own relay with the specific env vars it needs.
   4  Uses raw sockets for WebSocket to avoid test framework dependencies.
   5  """
   6  
   7  import json
   8  import os
   9  import signal
  10  import socket
  11  import struct
  12  import subprocess
  13  import time
  14  import urllib.request
  15  import urllib.error
  16  
  17  import pytest
  18  
  19  from nostr_helpers import make_event, TEST_SECKEY, TEST_PUBKEY
  20  
# Repository root: the parent of the directory that contains this test file.
PROJECT_ROOT = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
# Expected location of the relay executable; _start_relay skips the test when
# this path does not exist.
RELAY_BIN = os.path.join(PROJECT_ROOT, "smesh")
# Directory handed to the relay through ORLY_STATIC_DIR.
STATIC_DIR = os.path.join(PROJECT_ROOT, "web", "static")
# Loopback address used both as the relay's ORLY_LISTEN value and as the
# address the test clients connect to.
HOST = "127.0.0.1"
  25  
  26  
  27  def _fresh_data_dir(name):
  28      d = f"/tmp/smesh-test-{name}-{os.getpid()}"
  29      os.makedirs(d, exist_ok=True)
  30      return d
  31  
  32  
  33  def _wait_port_free(port, timeout=5):
  34      """Wait until nothing is listening on port."""
  35      deadline = time.time() + timeout
  36      while time.time() < deadline:
  37          try:
  38              s = socket.create_connection((HOST, port), timeout=0.2)
  39              s.close()
  40              time.sleep(0.1)
  41          except (ConnectionRefusedError, OSError, socket.timeout):
  42              return
  43      raise RuntimeError(f"port {port} still in use after {timeout}s")
  44  
  45  
  46  def _start_relay(port, extra_env=None, data_dir_name=None):
  47      """Start a relay and wait for it to accept connections."""
  48      if not os.path.exists(RELAY_BIN):
  49          pytest.skip("Relay binary not found")
  50      _wait_port_free(port)
  51      name = data_dir_name or f"policy-{port}"
  52      data_dir = _fresh_data_dir(name)
  53      env = os.environ.copy()
  54      env["ORLY_DATA_DIR"] = data_dir
  55      env["ORLY_LISTEN"] = HOST
  56      env["ORLY_PORT"] = str(port)
  57      env["ORLY_STATIC_DIR"] = STATIC_DIR
  58      env["ORLY_LOG_TO_STDOUT"] = "true"
  59      if extra_env:
  60          env.update(extra_env)
  61      proc = subprocess.Popen(
  62          [RELAY_BIN],
  63          env=env,
  64          stdout=subprocess.PIPE,
  65          stderr=subprocess.PIPE,
  66      )
  67      for _ in range(40):
  68          try:
  69              s = socket.create_connection((HOST, port), timeout=0.5)
  70              s.close()
  71              break
  72          except Exception:
  73              if proc.poll() is not None:
  74                  out = proc.stderr.read().decode(errors="replace")
  75                  pytest.fail(f"Relay on :{port} exited early: {out[:500]}")
  76              time.sleep(0.15)
  77      else:
  78          proc.kill()
  79          out = proc.stderr.read().decode(errors="replace")
  80          pytest.fail(f"Relay on :{port} failed to start: {out[:500]}")
  81      time.sleep(0.05)
  82      return proc, data_dir
  83  
  84  
  85  def _stop_relay(proc):
  86      proc.send_signal(signal.SIGTERM)
  87      try:
  88          proc.wait(timeout=3)
  89      except subprocess.TimeoutExpired:
  90          proc.kill()
  91          proc.wait(timeout=2)
  92  
  93  
  94  # ── WebSocket helpers ────────────────────────────────
  95  
  96  
  97  def _ws_handshake(host, port):
  98      """Open a raw TCP socket and perform a WS upgrade. Returns the socket."""
  99      sock = socket.create_connection((host, port), timeout=5)
 100      import base64
 101      ws_key = base64.b64encode(os.urandom(16)).decode()
 102      handshake = (
 103          f"GET / HTTP/1.1\r\n"
 104          f"Host: {host}:{port}\r\n"
 105          f"Upgrade: websocket\r\n"
 106          f"Connection: Upgrade\r\n"
 107          f"Sec-WebSocket-Key: {ws_key}\r\n"
 108          f"Sec-WebSocket-Version: 13\r\n"
 109          f"\r\n"
 110      )
 111      sock.sendall(handshake.encode())
 112      resp = b""
 113      while b"\r\n\r\n" not in resp:
 114          resp += sock.recv(4096)
 115      if b"101" not in resp:
 116          sock.close()
 117          raise ConnectionError(f"WS upgrade failed: {resp[:200]}")
 118      return sock
 119  
 120  
 121  def _ws_send(sock, payload):
 122      """Send a masked WS text frame."""
 123      mask = os.urandom(4)
 124      header = bytearray()
 125      header.append(0x81)
 126      length = len(payload)
 127      if length < 126:
 128          header.append(0x80 | length)
 129      elif length < 65536:
 130          header.append(0x80 | 126)
 131          header.extend(struct.pack(">H", length))
 132      else:
 133          header.append(0x80 | 127)
 134          header.extend(struct.pack(">Q", length))
 135      header.extend(mask)
 136      masked = bytearray(payload[i] ^ mask[i % 4] for i in range(len(payload)))
 137      sock.sendall(bytes(header) + bytes(masked))
 138  
 139  
 140  def _ws_recv(sock, timeout=5):
 141      """Read one WS frame (unmasked from server). Returns payload bytes."""
 142      sock.settimeout(timeout)
 143      hdr = _recv_exact(sock, 2)
 144      length = hdr[1] & 0x7F
 145      if length == 126:
 146          length = struct.unpack(">H", _recv_exact(sock, 2))[0]
 147      elif length == 127:
 148          length = struct.unpack(">Q", _recv_exact(sock, 8))[0]
 149      return _recv_exact(sock, length)
 150  
 151  
 152  def _recv_exact(sock, n):
 153      buf = bytearray()
 154      while len(buf) < n:
 155          chunk = sock.recv(n - len(buf))
 156          if not chunk:
 157              raise ConnectionError("socket closed")
 158          buf.extend(chunk)
 159      return bytes(buf)
 160  
 161  
 162  def _ws_send_json(sock, obj):
 163      _ws_send(sock, json.dumps(obj).encode())
 164  
 165  
 166  def _ws_recv_json(sock, timeout=5):
 167      return json.loads(_ws_recv(sock, timeout))
 168  
 169  
 170  def _drain_until(sock, label, timeout=5):
 171      """Read WS messages until one with msg[0]==label is found. Returns it."""
 172      deadline = time.time() + timeout
 173      while time.time() < deadline:
 174          msg = _ws_recv_json(sock, timeout=max(0.5, deadline - time.time()))
 175          if msg[0] == label:
 176              return msg
 177      raise TimeoutError(f"never received {label}")
 178  
 179  
 180  # ── Tests ────────────────────────────────────────────
 181  
 182  
 183  class TestHealthEndpoint:
 184      """GET /health returns 200 ok."""
 185  
 186      def test_health(self):
 187          port = 24001
 188          proc, _ = _start_relay(port)
 189          try:
 190              resp = urllib.request.urlopen(f"http://{HOST}:{port}/health", timeout=3)
 191              assert resp.status == 200
 192              assert resp.read() == b"ok"
 193          finally:
 194              _stop_relay(proc)
 195  
 196  
 197  class TestNIP11:
 198      """NIP-11 relay info respects ORLY_APP_NAME."""
 199  
 200      def test_custom_name(self):
 201          port = 24002
 202          proc, _ = _start_relay(port, {"ORLY_APP_NAME": "TestRelay"})
 203          try:
 204              req = urllib.request.Request(
 205                  f"http://{HOST}:{port}/",
 206                  headers={"Accept": "application/nostr+json"},
 207              )
 208              resp = urllib.request.urlopen(req, timeout=3)
 209              data = json.loads(resp.read())
 210              assert data["name"] == "TestRelay"
 211              assert 42 in data["supported_nips"]
 212              assert 50 in data["supported_nips"]
 213          finally:
 214              _stop_relay(proc)
 215  
 216      def test_default_name(self):
 217          port = 24003
 218          proc, _ = _start_relay(port)
 219          try:
 220              req = urllib.request.Request(
 221                  f"http://{HOST}:{port}/",
 222                  headers={"Accept": "application/nostr+json"},
 223              )
 224              resp = urllib.request.urlopen(req, timeout=3)
 225              data = json.loads(resp.read())
 226              assert data["name"] == "ORLY"
 227          finally:
 228              _stop_relay(proc)
 229  
 230  
 231  class TestCORS:
 232      """CORS preflight responses."""
 233  
 234      def test_options_returns_cors(self):
 235          port = 24004
 236          proc, _ = _start_relay(port, {
 237              "ORLY_CORS_ENABLED": "true",
 238          })
 239          try:
 240              req = urllib.request.Request(
 241                  f"http://{HOST}:{port}/health",
 242                  method="OPTIONS",
 243                  headers={"Origin": "http://example.com"},
 244              )
 245              resp = urllib.request.urlopen(req, timeout=3)
 246              assert resp.status == 204
 247              assert "access-control-allow-origin" in {
 248                  k.lower(): v for k, v in resp.headers.items()
 249              }
 250          finally:
 251              _stop_relay(proc)
 252  
 253  
 254  class TestGlobalConnLimit:
 255      """ORLY_MAX_GLOBAL_CONNECTIONS limits total connections."""
 256  
 257      def test_excess_connection_rejected(self):
 258          port = 24005
 259          proc, _ = _start_relay(port, {
 260              "ORLY_MAX_GLOBAL_CONNECTIONS": "2",
 261          })
 262          try:
 263              socks = []
 264              for _ in range(2):
 265                  s = _ws_handshake(HOST, port)
 266                  socks.append(s)
 267              # Third connection should be rejected (TCP close).
 268              try:
 269                  s3 = socket.create_connection((HOST, port), timeout=2)
 270                  # Send HTTP request — if global limit works, server closes this.
 271                  s3.sendall(b"GET / HTTP/1.1\r\nHost: x\r\n\r\n")
 272                  s3.settimeout(2)
 273                  data = s3.recv(4096)
 274                  # Connection either closed or gets no WS upgrade
 275                  s3.close()
 276              except (ConnectionError, socket.timeout, OSError):
 277                  pass  # expected — rejected
 278              for s in socks:
 279                  s.close()
 280          finally:
 281              _stop_relay(proc)
 282  
 283  
 284  class TestPerIPConnLimit:
 285      """ORLY_MAX_CONN_PER_IP limits connections per IP."""
 286  
 287      def test_excess_per_ip_rejected(self):
 288          port = 24006
 289          proc, _ = _start_relay(port, {
 290              "ORLY_MAX_CONN_PER_IP": "2",
 291          })
 292          try:
 293              socks = []
 294              for _ in range(2):
 295                  s = _ws_handshake(HOST, port)
 296                  socks.append(s)
 297              # Third from same IP should fail.
 298              try:
 299                  s3 = socket.create_connection((HOST, port), timeout=2)
 300                  s3.sendall(b"GET / HTTP/1.1\r\nHost: x\r\n\r\n")
 301                  s3.settimeout(2)
 302                  data = s3.recv(4096)
 303                  s3.close()
 304              except (ConnectionError, socket.timeout, OSError):
 305                  pass  # expected
 306              for s in socks:
 307                  s.close()
 308          finally:
 309              _stop_relay(proc)
 310  
 311  
 312  class TestHTTPGuard:
 313      """HTTP guard rate limiting and bot blocking."""
 314  
 315      def test_bot_user_agent_blocked(self):
 316          port = 24007
 317          proc, _ = _start_relay(port, {
 318              "ORLY_HTTP_GUARD_ENABLED": "true",
 319              "ORLY_HTTP_GUARD_BOT_BLOCK": "true",
 320          })
 321          try:
 322              req = urllib.request.Request(
 323                  f"http://{HOST}:{port}/health",
 324                  headers={"User-Agent": "Mozilla/5.0 (compatible; AhrefsBot/7.0)"},
 325              )
 326              try:
 327                  resp = urllib.request.urlopen(req, timeout=3)
 328                  # Should get 403 forbidden
 329                  assert resp.status == 403
 330              except urllib.error.HTTPError as e:
 331                  assert e.code == 403
 332          finally:
 333              _stop_relay(proc)
 334  
 335      def test_normal_user_agent_allowed(self):
 336          port = 24008
 337          proc, _ = _start_relay(port, {
 338              "ORLY_HTTP_GUARD_ENABLED": "true",
 339              "ORLY_HTTP_GUARD_BOT_BLOCK": "true",
 340          })
 341          try:
 342              req = urllib.request.Request(
 343                  f"http://{HOST}:{port}/health",
 344                  headers={"User-Agent": "Mozilla/5.0 Firefox"},
 345              )
 346              resp = urllib.request.urlopen(req, timeout=3)
 347              assert resp.status == 200
 348          finally:
 349              _stop_relay(proc)
 350  
 351  
 352  class TestOpenRelay:
 353      """Default open relay — no auth required, events flow freely."""
 354  
 355      def test_publish_and_query(self):
 356          port = 24010
 357          proc, _ = _start_relay(port)
 358          try:
 359              ws = _ws_handshake(HOST, port)
 360              # Drain any AUTH challenge (only sent if RelayURL is set).
 361              time.sleep(0.2)
 362  
 363              ev = make_event(TEST_SECKEY, "open-relay-test", kind=1)
 364              _ws_send_json(ws, ["EVENT", ev])
 365              ok = _drain_until(ws, "OK")
 366              assert ok[2] is True, f"rejected: {ok}"
 367  
 368              # Query it back.
 369              _ws_send_json(ws, ["REQ", "q1", {"ids": [ev["id"]]}])
 370              msg = _drain_until(ws, "EVENT")
 371              assert msg[2]["id"] == ev["id"]
 372              _drain_until(ws, "EOSE")
 373              _ws_send_json(ws, ["CLOSE", "q1"])
 374              ws.close()
 375          finally:
 376              _stop_relay(proc)
 377  
 378  
 379  class TestAuthToWrite:
 380      """ORLY_AUTH_TO_WRITE=true requires NIP-42 auth before EVENT."""
 381  
 382      def test_unauthed_event_rejected(self):
 383          port = 24011
 384          proc, _ = _start_relay(port, {
 385              "ORLY_RELAY_URL": f"ws://{HOST}:{port}",
 386              "ORLY_AUTH_TO_WRITE": "true",
 387              "ORLY_FREE_WRITE_LIMIT": "0",
 388          })
 389          try:
 390              ws = _ws_handshake(HOST, port)
 391              # Drain AUTH challenge.
 392              auth_msg = _drain_until(ws, "AUTH")
 393              assert auth_msg[0] == "AUTH"
 394  
 395              # Send event without authenticating.
 396              ev = make_event(TEST_SECKEY, "should-be-rejected", kind=1)
 397              _ws_send_json(ws, ["EVENT", ev])
 398              ok = _drain_until(ws, "OK")
 399              assert ok[2] is False, "unauthed event should be rejected"
 400              assert "auth-required" in ok[3]
 401              ws.close()
 402          finally:
 403              _stop_relay(proc)
 404  
 405  
 406  class TestAuthRequired:
 407      """ORLY_AUTH_REQUIRED=true requires auth for REQ/COUNT."""
 408  
 409      def test_unauthed_req_closed(self):
 410          port = 24012
 411          proc, _ = _start_relay(port, {
 412              "ORLY_RELAY_URL": f"ws://{HOST}:{port}",
 413              "ORLY_AUTH_REQUIRED": "true",
 414          })
 415          try:
 416              ws = _ws_handshake(HOST, port)
 417              _drain_until(ws, "AUTH")
 418  
 419              _ws_send_json(ws, ["REQ", "test-sub", {"limit": 1}])
 420              msg = _drain_until(ws, "CLOSED", timeout=10)
 421              assert "auth-required" in msg[2]
 422              ws.close()
 423          finally:
 424              _stop_relay(proc)
 425  
 426  
 427  class TestFreeWriteLimit:
 428      """ORLY_FREE_WRITE_LIMIT caps unauthed writes per IP."""
 429  
 430      def test_free_writes_then_rate_limit(self):
 431          port = 24013
 432          proc, _ = _start_relay(port, {
 433              "ORLY_RELAY_URL": f"ws://{HOST}:{port}",
 434              "ORLY_AUTH_TO_WRITE": "true",
 435              "ORLY_FREE_WRITE_LIMIT": "3",
 436              "ORLY_FREE_WRITE_WINDOW": "3600",
 437          })
 438          try:
 439              ws = _ws_handshake(HOST, port)
 440              _drain_until(ws, "AUTH")
 441  
 442              # First 3 should succeed (free write bucket).
 443              for i in range(3):
 444                  ev = make_event(TEST_SECKEY, f"free-{i}", kind=1)
 445                  _ws_send_json(ws, ["EVENT", ev])
 446                  ok = _drain_until(ws, "OK")
 447                  assert ok[2] is True, f"event {i} rejected: {ok}"
 448  
 449              # 4th should be rate-limited.
 450              ev = make_event(TEST_SECKEY, "free-overflow", kind=1)
 451              _ws_send_json(ws, ["EVENT", ev])
 452              ok = _drain_until(ws, "OK")
 453              assert ok[2] is False
 454              assert "rate-limited" in ok[3]
 455              ws.close()
 456          finally:
 457              _stop_relay(proc)
 458  
 459  
 460  class TestSubscriptionLimit:
 461      """ORLY_MAX_SUBSCRIPTIONS limits active subscriptions per connection."""
 462  
 463      def test_excess_sub_closed(self):
 464          port = 24014
 465          proc, _ = _start_relay(port, {
 466              "ORLY_MAX_SUBSCRIPTIONS": "3",
 467          })
 468          try:
 469              ws = _ws_handshake(HOST, port)
 470              time.sleep(0.2)
 471  
 472              # Open 3 subscriptions.
 473              for i in range(3):
 474                  _ws_send_json(ws, ["REQ", f"sub-{i}", {"limit": 1}])
 475                  _drain_until(ws, "EOSE")
 476  
 477              # 4th should be rejected.
 478              _ws_send_json(ws, ["REQ", "sub-overflow", {"limit": 1}])
 479              msg = _drain_until(ws, "CLOSED")
 480              assert "too many" in msg[2]
 481              ws.close()
 482          finally:
 483              _stop_relay(proc)
 484  
 485  
 486  class TestQueryResultLimit:
 487      """ORLY_QUERY_RESULT_LIMIT caps events returned per REQ."""
 488  
 489      def test_result_capped(self):
 490          port = 24015
 491          proc, _ = _start_relay(port, {
 492              "ORLY_QUERY_RESULT_LIMIT": "5",
 493          })
 494          try:
 495              ws = _ws_handshake(HOST, port)
 496              time.sleep(0.2)
 497  
 498              # Publish 10 events.
 499              for i in range(10):
 500                  ev = make_event(TEST_SECKEY, f"limit-test-{i}", kind=1,
 501                                  created_at=int(time.time()) - 100 + i)
 502                  _ws_send_json(ws, ["EVENT", ev])
 503                  _drain_until(ws, "OK")
 504  
 505              # Query all — should get at most 5.
 506              _ws_send_json(ws, ["REQ", "q-limit", {"authors": [TEST_PUBKEY], "kinds": [1]}])
 507              count = 0
 508              while True:
 509                  msg = _ws_recv_json(ws)
 510                  if msg[0] == "EOSE":
 511                      break
 512                  if msg[0] == "EVENT":
 513                      count += 1
 514              assert count <= 5, f"got {count} events, expected at most 5"
 515              _ws_send_json(ws, ["CLOSE", "q-limit"])
 516              ws.close()
 517          finally:
 518              _stop_relay(proc)
 519  
 520  
 521  class TestLiveSubscription:
 522      """Live subscription receives broadcast events."""
 523  
 524      def test_live_delivery(self):
 525          port = 24016
 526          proc, _ = _start_relay(port)
 527          try:
 528              # Subscriber connects and subscribes.
 529              sub_ws = _ws_handshake(HOST, port)
 530              time.sleep(0.1)
 531              _ws_send_json(sub_ws, ["REQ", "live", {"authors": [TEST_PUBKEY], "kinds": [1]}])
 532              _drain_until(sub_ws, "EOSE")
 533  
 534              # Publisher sends an event.
 535              pub_ws = _ws_handshake(HOST, port)
 536              time.sleep(0.1)
 537              ev = make_event(TEST_SECKEY, "live-broadcast-test", kind=1)
 538              _ws_send_json(pub_ws, ["EVENT", ev])
 539              ok = _drain_until(pub_ws, "OK")
 540              assert ok[2] is True
 541  
 542              # Subscriber should get it.
 543              msg = _ws_recv_json(sub_ws, timeout=5)
 544              assert msg[0] == "EVENT"
 545              assert msg[2]["content"] == "live-broadcast-test"
 546  
 547              _ws_send_json(sub_ws, ["CLOSE", "live"])
 548              sub_ws.close()
 549              pub_ws.close()
 550          finally:
 551              _stop_relay(proc)
 552  
 553  
 554  class TestReplaceableEvents:
 555      """NIP-01 replaceable event handling — newer replaces older."""
 556  
 557      def test_replaceable_kind_overwrites(self):
 558          port = 24017
 559          proc, _ = _start_relay(port)
 560          try:
 561              ws = _ws_handshake(HOST, port)
 562              time.sleep(0.1)
 563  
 564              now = int(time.time())
 565              # Kind 0 (metadata) is replaceable.
 566              ev1 = make_event(TEST_SECKEY, '{"name":"old"}', kind=0, created_at=now - 10)
 567              _ws_send_json(ws, ["EVENT", ev1])
 568              ok1 = _drain_until(ws, "OK")
 569              assert ok1[2] is True
 570  
 571              ev2 = make_event(TEST_SECKEY, '{"name":"new"}', kind=0, created_at=now)
 572              _ws_send_json(ws, ["EVENT", ev2])
 573              ok2 = _drain_until(ws, "OK")
 574              assert ok2[2] is True
 575  
 576              # Query — should only get the newer one.
 577              _ws_send_json(ws, ["REQ", "rep", {"authors": [TEST_PUBKEY], "kinds": [0]}])
 578              events = []
 579              while True:
 580                  msg = _ws_recv_json(ws)
 581                  if msg[0] == "EOSE":
 582                      break
 583                  if msg[0] == "EVENT":
 584                      events.append(msg[2])
 585              assert len(events) == 1
 586              assert events[0]["content"] == '{"name":"new"}'
 587  
 588              _ws_send_json(ws, ["CLOSE", "rep"])
 589              ws.close()
 590          finally:
 591              _stop_relay(proc)
 592  
 593  
 594  class TestEphemeralEvents:
 595      """NIP-16 ephemeral events are accepted but not stored."""
 596  
 597      def test_ephemeral_not_stored(self):
 598          port = 24018
 599          proc, _ = _start_relay(port)
 600          try:
 601              ws = _ws_handshake(HOST, port)
 602              time.sleep(0.1)
 603  
 604              # Kind 20000-29999 are ephemeral.
 605              ev = make_event(TEST_SECKEY, "ephemeral-test", kind=20001)
 606              _ws_send_json(ws, ["EVENT", ev])
 607              ok = _drain_until(ws, "OK")
 608              assert ok[2] is True
 609  
 610              # Query — should not find it.
 611              _ws_send_json(ws, ["REQ", "eph", {"ids": [ev["id"]]}])
 612              msg = _drain_until(ws, "EOSE")
 613              # No EVENT should precede the EOSE for this ID.
 614              ws.close()
 615          finally:
 616              _stop_relay(proc)
 617  
 618  
 619  class TestDeletion:
 620      """NIP-09 event deletion."""
 621  
 622      def test_delete_own_event(self):
 623          port = 24019
 624          proc, _ = _start_relay(port)
 625          try:
 626              ws = _ws_handshake(HOST, port)
 627              time.sleep(0.1)
 628  
 629              ev = make_event(TEST_SECKEY, "to-be-deleted", kind=1)
 630              _ws_send_json(ws, ["EVENT", ev])
 631              ok = _drain_until(ws, "OK")
 632              assert ok[2] is True
 633  
 634              # Delete it — kind 5, with "e" tag pointing to the event.
 635              del_ev = make_event(TEST_SECKEY, "", kind=5, tags=[["e", ev["id"]]])
 636              _ws_send_json(ws, ["EVENT", del_ev])
 637              ok2 = _drain_until(ws, "OK")
 638              assert ok2[2] is True
 639  
 640              # Query — should not find the original event.
 641              _ws_send_json(ws, ["REQ", "del", {"ids": [ev["id"]]}])
 642              msg = _drain_until(ws, "EOSE")
 643              _ws_send_json(ws, ["CLOSE", "del"])
 644              ws.close()
 645          finally:
 646              _stop_relay(proc)
 647  
 648  
 649  class TestNIP42AuthChallenge:
 650      """NIP-42 AUTH challenge is sent when RelayURL is configured."""
 651  
 652      def test_auth_challenge_sent(self):
 653          port = 24020
 654          proc, _ = _start_relay(port, {
 655              "ORLY_RELAY_URL": f"ws://{HOST}:{port}",
 656          })
 657          try:
 658              ws = _ws_handshake(HOST, port)
 659              msg = _drain_until(ws, "AUTH")
 660              assert msg[0] == "AUTH"
 661              assert isinstance(msg[1], str)
 662              assert len(msg[1]) == 64  # 32 bytes hex
 663              ws.close()
 664          finally:
 665              _stop_relay(proc)
 666  
 667      def test_no_challenge_without_relay_url(self):
 668          port = 24021
 669          proc, _ = _start_relay(port)
 670          try:
 671              ws = _ws_handshake(HOST, port)
 672              # With no RelayURL, no AUTH challenge. Send REQ instead.
 673              _ws_send_json(ws, ["REQ", "nochall", {"limit": 1}])
 674              msg = _ws_recv_json(ws)
 675              # Should get EOSE, not AUTH.
 676              assert msg[0] in ("EVENT", "EOSE"), f"unexpected: {msg[0]}"
 677              ws.close()
 678          finally:
 679              _stop_relay(proc)
 680  
 681  
 682  class TestEventValidation:
 683      """Pipeline rejects malformed events."""
 684  
 685      def test_bad_signature_rejected(self):
 686          port = 24022
 687          proc, _ = _start_relay(port)
 688          try:
 689              ws = _ws_handshake(HOST, port)
 690              time.sleep(0.1)
 691  
 692              ev = make_event(TEST_SECKEY, "bad-sig-test", kind=1)
 693              # Corrupt the signature.
 694              ev["sig"] = "00" * 64
 695              _ws_send_json(ws, ["EVENT", ev])
 696              ok = _drain_until(ws, "OK")
 697              assert ok[2] is False
 698              assert "invalid" in ok[3] or "bad" in ok[3].lower() or "signature" in ok[3].lower()
 699              ws.close()
 700          finally:
 701              _stop_relay(proc)
 702  
 703      def test_future_timestamp_rejected(self):
 704          port = 24023
 705          proc, _ = _start_relay(port)
 706          try:
 707              ws = _ws_handshake(HOST, port)
 708              time.sleep(0.1)
 709  
 710              far_future = int(time.time()) + 86400
 711              ev = make_event(TEST_SECKEY, "future-test", kind=1, created_at=far_future)
 712              _ws_send_json(ws, ["EVENT", ev])
 713              ok = _drain_until(ws, "OK")
 714              assert ok[2] is False
 715              assert "future" in ok[3]
 716              ws.close()
 717          finally:
 718              _stop_relay(proc)
 719  
 720  
 721  class TestCloseSubscription:
 722      """CLOSE unsubscribes — no more events after CLOSE."""
 723  
 724      def test_close_stops_delivery(self):
 725          port = 24024
 726          proc, _ = _start_relay(port)
 727          try:
 728              ws = _ws_handshake(HOST, port)
 729              time.sleep(0.1)
 730  
 731              _ws_send_json(ws, ["REQ", "closeme", {"authors": [TEST_PUBKEY], "kinds": [1]}])
 732              _drain_until(ws, "EOSE")
 733              _ws_send_json(ws, ["CLOSE", "closeme"])
 734              time.sleep(0.2)
 735  
 736              # Publish an event — should NOT arrive on the closed sub.
 737              ev = make_event(TEST_SECKEY, "after-close", kind=1)
 738              pub_ws = _ws_handshake(HOST, port)
 739              _ws_send_json(pub_ws, ["EVENT", ev])
 740              _drain_until(pub_ws, "OK")
 741              pub_ws.close()
 742  
 743              # Try to read — should timeout (no event delivered).
 744              try:
 745                  msg = _ws_recv_json(ws, timeout=1)
 746                  # If we get something, it should not be an EVENT for closeme.
 747                  if msg[0] == "EVENT":
 748                      assert msg[1] != "closeme", "received event on closed subscription"
 749              except (socket.timeout, ConnectionError):
 750                  pass  # expected — no message
 751              ws.close()
 752          finally:
 753              _stop_relay(proc)
 754  
 755  
 756  class TestCountEnvelope:
 757      """NIP-45 COUNT returns event counts."""
 758  
 759      def test_count_returns_number(self):
 760          port = 24025
 761          proc, _ = _start_relay(port)
 762          try:
 763              ws = _ws_handshake(HOST, port)
 764              time.sleep(0.1)
 765  
 766              # Publish a few events.
 767              for i in range(3):
 768                  ev = make_event(TEST_SECKEY, f"count-{i}", kind=1,
 769                                  created_at=int(time.time()) + i)
 770                  _ws_send_json(ws, ["EVENT", ev])
 771                  _drain_until(ws, "OK")
 772  
 773              _ws_send_json(ws, ["COUNT", "c1", {"authors": [TEST_PUBKEY], "kinds": [1]}])
 774              msg = _drain_until(ws, "COUNT")
 775              assert msg[1] == "c1"
 776              count = msg[2] if isinstance(msg[2], int) else msg[2].get("count", 0)
 777              assert count >= 3
 778              ws.close()
 779          finally:
 780              _stop_relay(proc)
 781