test_relay_policy.py raw
1 """Relay policy tests — connection limits, auth gating, HTTP guard, CORS, health.
2
3 Each test class starts its own relay with the specific env vars it needs.
4 Uses raw sockets for WebSocket to avoid test framework dependencies.
5 """
6
7 import json
8 import os
9 import signal
10 import socket
11 import struct
12 import subprocess
13 import time
14 import urllib.request
15 import urllib.error
16
17 import pytest
18
19 from nostr_helpers import make_event, TEST_SECKEY, TEST_PUBKEY
20
21 PROJECT_ROOT = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
22 RELAY_BIN = os.path.join(PROJECT_ROOT, "smesh")
23 STATIC_DIR = os.path.join(PROJECT_ROOT, "web", "static")
24 HOST = "127.0.0.1"
25
26
27 def _fresh_data_dir(name):
28 d = f"/tmp/smesh-test-{name}-{os.getpid()}"
29 os.makedirs(d, exist_ok=True)
30 return d
31
32
33 def _wait_port_free(port, timeout=5):
34 """Wait until nothing is listening on port."""
35 deadline = time.time() + timeout
36 while time.time() < deadline:
37 try:
38 s = socket.create_connection((HOST, port), timeout=0.2)
39 s.close()
40 time.sleep(0.1)
41 except (ConnectionRefusedError, OSError, socket.timeout):
42 return
43 raise RuntimeError(f"port {port} still in use after {timeout}s")
44
45
46 def _start_relay(port, extra_env=None, data_dir_name=None):
47 """Start a relay and wait for it to accept connections."""
48 if not os.path.exists(RELAY_BIN):
49 pytest.skip("Relay binary not found")
50 _wait_port_free(port)
51 name = data_dir_name or f"policy-{port}"
52 data_dir = _fresh_data_dir(name)
53 env = os.environ.copy()
54 env["ORLY_DATA_DIR"] = data_dir
55 env["ORLY_LISTEN"] = HOST
56 env["ORLY_PORT"] = str(port)
57 env["ORLY_STATIC_DIR"] = STATIC_DIR
58 env["ORLY_LOG_TO_STDOUT"] = "true"
59 if extra_env:
60 env.update(extra_env)
61 proc = subprocess.Popen(
62 [RELAY_BIN],
63 env=env,
64 stdout=subprocess.PIPE,
65 stderr=subprocess.PIPE,
66 )
67 for _ in range(40):
68 try:
69 s = socket.create_connection((HOST, port), timeout=0.5)
70 s.close()
71 break
72 except Exception:
73 if proc.poll() is not None:
74 out = proc.stderr.read().decode(errors="replace")
75 pytest.fail(f"Relay on :{port} exited early: {out[:500]}")
76 time.sleep(0.15)
77 else:
78 proc.kill()
79 out = proc.stderr.read().decode(errors="replace")
80 pytest.fail(f"Relay on :{port} failed to start: {out[:500]}")
81 time.sleep(0.05)
82 return proc, data_dir
83
84
85 def _stop_relay(proc):
86 proc.send_signal(signal.SIGTERM)
87 try:
88 proc.wait(timeout=3)
89 except subprocess.TimeoutExpired:
90 proc.kill()
91 proc.wait(timeout=2)
92
93
94 # ── WebSocket helpers ────────────────────────────────
95
96
97 def _ws_handshake(host, port):
98 """Open a raw TCP socket and perform a WS upgrade. Returns the socket."""
99 sock = socket.create_connection((host, port), timeout=5)
100 import base64
101 ws_key = base64.b64encode(os.urandom(16)).decode()
102 handshake = (
103 f"GET / HTTP/1.1\r\n"
104 f"Host: {host}:{port}\r\n"
105 f"Upgrade: websocket\r\n"
106 f"Connection: Upgrade\r\n"
107 f"Sec-WebSocket-Key: {ws_key}\r\n"
108 f"Sec-WebSocket-Version: 13\r\n"
109 f"\r\n"
110 )
111 sock.sendall(handshake.encode())
112 resp = b""
113 while b"\r\n\r\n" not in resp:
114 resp += sock.recv(4096)
115 if b"101" not in resp:
116 sock.close()
117 raise ConnectionError(f"WS upgrade failed: {resp[:200]}")
118 return sock
119
120
121 def _ws_send(sock, payload):
122 """Send a masked WS text frame."""
123 mask = os.urandom(4)
124 header = bytearray()
125 header.append(0x81)
126 length = len(payload)
127 if length < 126:
128 header.append(0x80 | length)
129 elif length < 65536:
130 header.append(0x80 | 126)
131 header.extend(struct.pack(">H", length))
132 else:
133 header.append(0x80 | 127)
134 header.extend(struct.pack(">Q", length))
135 header.extend(mask)
136 masked = bytearray(payload[i] ^ mask[i % 4] for i in range(len(payload)))
137 sock.sendall(bytes(header) + bytes(masked))
138
139
140 def _ws_recv(sock, timeout=5):
141 """Read one WS frame (unmasked from server). Returns payload bytes."""
142 sock.settimeout(timeout)
143 hdr = _recv_exact(sock, 2)
144 length = hdr[1] & 0x7F
145 if length == 126:
146 length = struct.unpack(">H", _recv_exact(sock, 2))[0]
147 elif length == 127:
148 length = struct.unpack(">Q", _recv_exact(sock, 8))[0]
149 return _recv_exact(sock, length)
150
151
152 def _recv_exact(sock, n):
153 buf = bytearray()
154 while len(buf) < n:
155 chunk = sock.recv(n - len(buf))
156 if not chunk:
157 raise ConnectionError("socket closed")
158 buf.extend(chunk)
159 return bytes(buf)
160
161
162 def _ws_send_json(sock, obj):
163 _ws_send(sock, json.dumps(obj).encode())
164
165
166 def _ws_recv_json(sock, timeout=5):
167 return json.loads(_ws_recv(sock, timeout))
168
169
170 def _drain_until(sock, label, timeout=5):
171 """Read WS messages until one with msg[0]==label is found. Returns it."""
172 deadline = time.time() + timeout
173 while time.time() < deadline:
174 msg = _ws_recv_json(sock, timeout=max(0.5, deadline - time.time()))
175 if msg[0] == label:
176 return msg
177 raise TimeoutError(f"never received {label}")
178
179
180 # ── Tests ────────────────────────────────────────────
181
182
183 class TestHealthEndpoint:
184 """GET /health returns 200 ok."""
185
186 def test_health(self):
187 port = 24001
188 proc, _ = _start_relay(port)
189 try:
190 resp = urllib.request.urlopen(f"http://{HOST}:{port}/health", timeout=3)
191 assert resp.status == 200
192 assert resp.read() == b"ok"
193 finally:
194 _stop_relay(proc)
195
196
197 class TestNIP11:
198 """NIP-11 relay info respects ORLY_APP_NAME."""
199
200 def test_custom_name(self):
201 port = 24002
202 proc, _ = _start_relay(port, {"ORLY_APP_NAME": "TestRelay"})
203 try:
204 req = urllib.request.Request(
205 f"http://{HOST}:{port}/",
206 headers={"Accept": "application/nostr+json"},
207 )
208 resp = urllib.request.urlopen(req, timeout=3)
209 data = json.loads(resp.read())
210 assert data["name"] == "TestRelay"
211 assert 42 in data["supported_nips"]
212 assert 50 in data["supported_nips"]
213 finally:
214 _stop_relay(proc)
215
216 def test_default_name(self):
217 port = 24003
218 proc, _ = _start_relay(port)
219 try:
220 req = urllib.request.Request(
221 f"http://{HOST}:{port}/",
222 headers={"Accept": "application/nostr+json"},
223 )
224 resp = urllib.request.urlopen(req, timeout=3)
225 data = json.loads(resp.read())
226 assert data["name"] == "ORLY"
227 finally:
228 _stop_relay(proc)
229
230
231 class TestCORS:
232 """CORS preflight responses."""
233
234 def test_options_returns_cors(self):
235 port = 24004
236 proc, _ = _start_relay(port, {
237 "ORLY_CORS_ENABLED": "true",
238 })
239 try:
240 req = urllib.request.Request(
241 f"http://{HOST}:{port}/health",
242 method="OPTIONS",
243 headers={"Origin": "http://example.com"},
244 )
245 resp = urllib.request.urlopen(req, timeout=3)
246 assert resp.status == 204
247 assert "access-control-allow-origin" in {
248 k.lower(): v for k, v in resp.headers.items()
249 }
250 finally:
251 _stop_relay(proc)
252
253
254 class TestGlobalConnLimit:
255 """ORLY_MAX_GLOBAL_CONNECTIONS limits total connections."""
256
257 def test_excess_connection_rejected(self):
258 port = 24005
259 proc, _ = _start_relay(port, {
260 "ORLY_MAX_GLOBAL_CONNECTIONS": "2",
261 })
262 try:
263 socks = []
264 for _ in range(2):
265 s = _ws_handshake(HOST, port)
266 socks.append(s)
267 # Third connection should be rejected (TCP close).
268 try:
269 s3 = socket.create_connection((HOST, port), timeout=2)
270 # Send HTTP request — if global limit works, server closes this.
271 s3.sendall(b"GET / HTTP/1.1\r\nHost: x\r\n\r\n")
272 s3.settimeout(2)
273 data = s3.recv(4096)
274 # Connection either closed or gets no WS upgrade
275 s3.close()
276 except (ConnectionError, socket.timeout, OSError):
277 pass # expected — rejected
278 for s in socks:
279 s.close()
280 finally:
281 _stop_relay(proc)
282
283
284 class TestPerIPConnLimit:
285 """ORLY_MAX_CONN_PER_IP limits connections per IP."""
286
287 def test_excess_per_ip_rejected(self):
288 port = 24006
289 proc, _ = _start_relay(port, {
290 "ORLY_MAX_CONN_PER_IP": "2",
291 })
292 try:
293 socks = []
294 for _ in range(2):
295 s = _ws_handshake(HOST, port)
296 socks.append(s)
297 # Third from same IP should fail.
298 try:
299 s3 = socket.create_connection((HOST, port), timeout=2)
300 s3.sendall(b"GET / HTTP/1.1\r\nHost: x\r\n\r\n")
301 s3.settimeout(2)
302 data = s3.recv(4096)
303 s3.close()
304 except (ConnectionError, socket.timeout, OSError):
305 pass # expected
306 for s in socks:
307 s.close()
308 finally:
309 _stop_relay(proc)
310
311
312 class TestHTTPGuard:
313 """HTTP guard rate limiting and bot blocking."""
314
315 def test_bot_user_agent_blocked(self):
316 port = 24007
317 proc, _ = _start_relay(port, {
318 "ORLY_HTTP_GUARD_ENABLED": "true",
319 "ORLY_HTTP_GUARD_BOT_BLOCK": "true",
320 })
321 try:
322 req = urllib.request.Request(
323 f"http://{HOST}:{port}/health",
324 headers={"User-Agent": "Mozilla/5.0 (compatible; AhrefsBot/7.0)"},
325 )
326 try:
327 resp = urllib.request.urlopen(req, timeout=3)
328 # Should get 403 forbidden
329 assert resp.status == 403
330 except urllib.error.HTTPError as e:
331 assert e.code == 403
332 finally:
333 _stop_relay(proc)
334
335 def test_normal_user_agent_allowed(self):
336 port = 24008
337 proc, _ = _start_relay(port, {
338 "ORLY_HTTP_GUARD_ENABLED": "true",
339 "ORLY_HTTP_GUARD_BOT_BLOCK": "true",
340 })
341 try:
342 req = urllib.request.Request(
343 f"http://{HOST}:{port}/health",
344 headers={"User-Agent": "Mozilla/5.0 Firefox"},
345 )
346 resp = urllib.request.urlopen(req, timeout=3)
347 assert resp.status == 200
348 finally:
349 _stop_relay(proc)
350
351
352 class TestOpenRelay:
353 """Default open relay — no auth required, events flow freely."""
354
355 def test_publish_and_query(self):
356 port = 24010
357 proc, _ = _start_relay(port)
358 try:
359 ws = _ws_handshake(HOST, port)
360 # Drain any AUTH challenge (only sent if RelayURL is set).
361 time.sleep(0.2)
362
363 ev = make_event(TEST_SECKEY, "open-relay-test", kind=1)
364 _ws_send_json(ws, ["EVENT", ev])
365 ok = _drain_until(ws, "OK")
366 assert ok[2] is True, f"rejected: {ok}"
367
368 # Query it back.
369 _ws_send_json(ws, ["REQ", "q1", {"ids": [ev["id"]]}])
370 msg = _drain_until(ws, "EVENT")
371 assert msg[2]["id"] == ev["id"]
372 _drain_until(ws, "EOSE")
373 _ws_send_json(ws, ["CLOSE", "q1"])
374 ws.close()
375 finally:
376 _stop_relay(proc)
377
378
379 class TestAuthToWrite:
380 """ORLY_AUTH_TO_WRITE=true requires NIP-42 auth before EVENT."""
381
382 def test_unauthed_event_rejected(self):
383 port = 24011
384 proc, _ = _start_relay(port, {
385 "ORLY_RELAY_URL": f"ws://{HOST}:{port}",
386 "ORLY_AUTH_TO_WRITE": "true",
387 "ORLY_FREE_WRITE_LIMIT": "0",
388 })
389 try:
390 ws = _ws_handshake(HOST, port)
391 # Drain AUTH challenge.
392 auth_msg = _drain_until(ws, "AUTH")
393 assert auth_msg[0] == "AUTH"
394
395 # Send event without authenticating.
396 ev = make_event(TEST_SECKEY, "should-be-rejected", kind=1)
397 _ws_send_json(ws, ["EVENT", ev])
398 ok = _drain_until(ws, "OK")
399 assert ok[2] is False, "unauthed event should be rejected"
400 assert "auth-required" in ok[3]
401 ws.close()
402 finally:
403 _stop_relay(proc)
404
405
406 class TestAuthRequired:
407 """ORLY_AUTH_REQUIRED=true requires auth for REQ/COUNT."""
408
409 def test_unauthed_req_closed(self):
410 port = 24012
411 proc, _ = _start_relay(port, {
412 "ORLY_RELAY_URL": f"ws://{HOST}:{port}",
413 "ORLY_AUTH_REQUIRED": "true",
414 })
415 try:
416 ws = _ws_handshake(HOST, port)
417 _drain_until(ws, "AUTH")
418
419 _ws_send_json(ws, ["REQ", "test-sub", {"limit": 1}])
420 msg = _drain_until(ws, "CLOSED", timeout=10)
421 assert "auth-required" in msg[2]
422 ws.close()
423 finally:
424 _stop_relay(proc)
425
426
427 class TestFreeWriteLimit:
428 """ORLY_FREE_WRITE_LIMIT caps unauthed writes per IP."""
429
430 def test_free_writes_then_rate_limit(self):
431 port = 24013
432 proc, _ = _start_relay(port, {
433 "ORLY_RELAY_URL": f"ws://{HOST}:{port}",
434 "ORLY_AUTH_TO_WRITE": "true",
435 "ORLY_FREE_WRITE_LIMIT": "3",
436 "ORLY_FREE_WRITE_WINDOW": "3600",
437 })
438 try:
439 ws = _ws_handshake(HOST, port)
440 _drain_until(ws, "AUTH")
441
442 # First 3 should succeed (free write bucket).
443 for i in range(3):
444 ev = make_event(TEST_SECKEY, f"free-{i}", kind=1)
445 _ws_send_json(ws, ["EVENT", ev])
446 ok = _drain_until(ws, "OK")
447 assert ok[2] is True, f"event {i} rejected: {ok}"
448
449 # 4th should be rate-limited.
450 ev = make_event(TEST_SECKEY, "free-overflow", kind=1)
451 _ws_send_json(ws, ["EVENT", ev])
452 ok = _drain_until(ws, "OK")
453 assert ok[2] is False
454 assert "rate-limited" in ok[3]
455 ws.close()
456 finally:
457 _stop_relay(proc)
458
459
460 class TestSubscriptionLimit:
461 """ORLY_MAX_SUBSCRIPTIONS limits active subscriptions per connection."""
462
463 def test_excess_sub_closed(self):
464 port = 24014
465 proc, _ = _start_relay(port, {
466 "ORLY_MAX_SUBSCRIPTIONS": "3",
467 })
468 try:
469 ws = _ws_handshake(HOST, port)
470 time.sleep(0.2)
471
472 # Open 3 subscriptions.
473 for i in range(3):
474 _ws_send_json(ws, ["REQ", f"sub-{i}", {"limit": 1}])
475 _drain_until(ws, "EOSE")
476
477 # 4th should be rejected.
478 _ws_send_json(ws, ["REQ", "sub-overflow", {"limit": 1}])
479 msg = _drain_until(ws, "CLOSED")
480 assert "too many" in msg[2]
481 ws.close()
482 finally:
483 _stop_relay(proc)
484
485
486 class TestQueryResultLimit:
487 """ORLY_QUERY_RESULT_LIMIT caps events returned per REQ."""
488
489 def test_result_capped(self):
490 port = 24015
491 proc, _ = _start_relay(port, {
492 "ORLY_QUERY_RESULT_LIMIT": "5",
493 })
494 try:
495 ws = _ws_handshake(HOST, port)
496 time.sleep(0.2)
497
498 # Publish 10 events.
499 for i in range(10):
500 ev = make_event(TEST_SECKEY, f"limit-test-{i}", kind=1,
501 created_at=int(time.time()) - 100 + i)
502 _ws_send_json(ws, ["EVENT", ev])
503 _drain_until(ws, "OK")
504
505 # Query all — should get at most 5.
506 _ws_send_json(ws, ["REQ", "q-limit", {"authors": [TEST_PUBKEY], "kinds": [1]}])
507 count = 0
508 while True:
509 msg = _ws_recv_json(ws)
510 if msg[0] == "EOSE":
511 break
512 if msg[0] == "EVENT":
513 count += 1
514 assert count <= 5, f"got {count} events, expected at most 5"
515 _ws_send_json(ws, ["CLOSE", "q-limit"])
516 ws.close()
517 finally:
518 _stop_relay(proc)
519
520
521 class TestLiveSubscription:
522 """Live subscription receives broadcast events."""
523
524 def test_live_delivery(self):
525 port = 24016
526 proc, _ = _start_relay(port)
527 try:
528 # Subscriber connects and subscribes.
529 sub_ws = _ws_handshake(HOST, port)
530 time.sleep(0.1)
531 _ws_send_json(sub_ws, ["REQ", "live", {"authors": [TEST_PUBKEY], "kinds": [1]}])
532 _drain_until(sub_ws, "EOSE")
533
534 # Publisher sends an event.
535 pub_ws = _ws_handshake(HOST, port)
536 time.sleep(0.1)
537 ev = make_event(TEST_SECKEY, "live-broadcast-test", kind=1)
538 _ws_send_json(pub_ws, ["EVENT", ev])
539 ok = _drain_until(pub_ws, "OK")
540 assert ok[2] is True
541
542 # Subscriber should get it.
543 msg = _ws_recv_json(sub_ws, timeout=5)
544 assert msg[0] == "EVENT"
545 assert msg[2]["content"] == "live-broadcast-test"
546
547 _ws_send_json(sub_ws, ["CLOSE", "live"])
548 sub_ws.close()
549 pub_ws.close()
550 finally:
551 _stop_relay(proc)
552
553
554 class TestReplaceableEvents:
555 """NIP-01 replaceable event handling — newer replaces older."""
556
557 def test_replaceable_kind_overwrites(self):
558 port = 24017
559 proc, _ = _start_relay(port)
560 try:
561 ws = _ws_handshake(HOST, port)
562 time.sleep(0.1)
563
564 now = int(time.time())
565 # Kind 0 (metadata) is replaceable.
566 ev1 = make_event(TEST_SECKEY, '{"name":"old"}', kind=0, created_at=now - 10)
567 _ws_send_json(ws, ["EVENT", ev1])
568 ok1 = _drain_until(ws, "OK")
569 assert ok1[2] is True
570
571 ev2 = make_event(TEST_SECKEY, '{"name":"new"}', kind=0, created_at=now)
572 _ws_send_json(ws, ["EVENT", ev2])
573 ok2 = _drain_until(ws, "OK")
574 assert ok2[2] is True
575
576 # Query — should only get the newer one.
577 _ws_send_json(ws, ["REQ", "rep", {"authors": [TEST_PUBKEY], "kinds": [0]}])
578 events = []
579 while True:
580 msg = _ws_recv_json(ws)
581 if msg[0] == "EOSE":
582 break
583 if msg[0] == "EVENT":
584 events.append(msg[2])
585 assert len(events) == 1
586 assert events[0]["content"] == '{"name":"new"}'
587
588 _ws_send_json(ws, ["CLOSE", "rep"])
589 ws.close()
590 finally:
591 _stop_relay(proc)
592
593
594 class TestEphemeralEvents:
595 """NIP-16 ephemeral events are accepted but not stored."""
596
597 def test_ephemeral_not_stored(self):
598 port = 24018
599 proc, _ = _start_relay(port)
600 try:
601 ws = _ws_handshake(HOST, port)
602 time.sleep(0.1)
603
604 # Kind 20000-29999 are ephemeral.
605 ev = make_event(TEST_SECKEY, "ephemeral-test", kind=20001)
606 _ws_send_json(ws, ["EVENT", ev])
607 ok = _drain_until(ws, "OK")
608 assert ok[2] is True
609
610 # Query — should not find it.
611 _ws_send_json(ws, ["REQ", "eph", {"ids": [ev["id"]]}])
612 msg = _drain_until(ws, "EOSE")
613 # No EVENT should precede the EOSE for this ID.
614 ws.close()
615 finally:
616 _stop_relay(proc)
617
618
619 class TestDeletion:
620 """NIP-09 event deletion."""
621
622 def test_delete_own_event(self):
623 port = 24019
624 proc, _ = _start_relay(port)
625 try:
626 ws = _ws_handshake(HOST, port)
627 time.sleep(0.1)
628
629 ev = make_event(TEST_SECKEY, "to-be-deleted", kind=1)
630 _ws_send_json(ws, ["EVENT", ev])
631 ok = _drain_until(ws, "OK")
632 assert ok[2] is True
633
634 # Delete it — kind 5, with "e" tag pointing to the event.
635 del_ev = make_event(TEST_SECKEY, "", kind=5, tags=[["e", ev["id"]]])
636 _ws_send_json(ws, ["EVENT", del_ev])
637 ok2 = _drain_until(ws, "OK")
638 assert ok2[2] is True
639
640 # Query — should not find the original event.
641 _ws_send_json(ws, ["REQ", "del", {"ids": [ev["id"]]}])
642 msg = _drain_until(ws, "EOSE")
643 _ws_send_json(ws, ["CLOSE", "del"])
644 ws.close()
645 finally:
646 _stop_relay(proc)
647
648
649 class TestNIP42AuthChallenge:
650 """NIP-42 AUTH challenge is sent when RelayURL is configured."""
651
652 def test_auth_challenge_sent(self):
653 port = 24020
654 proc, _ = _start_relay(port, {
655 "ORLY_RELAY_URL": f"ws://{HOST}:{port}",
656 })
657 try:
658 ws = _ws_handshake(HOST, port)
659 msg = _drain_until(ws, "AUTH")
660 assert msg[0] == "AUTH"
661 assert isinstance(msg[1], str)
662 assert len(msg[1]) == 64 # 32 bytes hex
663 ws.close()
664 finally:
665 _stop_relay(proc)
666
667 def test_no_challenge_without_relay_url(self):
668 port = 24021
669 proc, _ = _start_relay(port)
670 try:
671 ws = _ws_handshake(HOST, port)
672 # With no RelayURL, no AUTH challenge. Send REQ instead.
673 _ws_send_json(ws, ["REQ", "nochall", {"limit": 1}])
674 msg = _ws_recv_json(ws)
675 # Should get EOSE, not AUTH.
676 assert msg[0] in ("EVENT", "EOSE"), f"unexpected: {msg[0]}"
677 ws.close()
678 finally:
679 _stop_relay(proc)
680
681
682 class TestEventValidation:
683 """Pipeline rejects malformed events."""
684
685 def test_bad_signature_rejected(self):
686 port = 24022
687 proc, _ = _start_relay(port)
688 try:
689 ws = _ws_handshake(HOST, port)
690 time.sleep(0.1)
691
692 ev = make_event(TEST_SECKEY, "bad-sig-test", kind=1)
693 # Corrupt the signature.
694 ev["sig"] = "00" * 64
695 _ws_send_json(ws, ["EVENT", ev])
696 ok = _drain_until(ws, "OK")
697 assert ok[2] is False
698 assert "invalid" in ok[3] or "bad" in ok[3].lower() or "signature" in ok[3].lower()
699 ws.close()
700 finally:
701 _stop_relay(proc)
702
703 def test_future_timestamp_rejected(self):
704 port = 24023
705 proc, _ = _start_relay(port)
706 try:
707 ws = _ws_handshake(HOST, port)
708 time.sleep(0.1)
709
710 far_future = int(time.time()) + 86400
711 ev = make_event(TEST_SECKEY, "future-test", kind=1, created_at=far_future)
712 _ws_send_json(ws, ["EVENT", ev])
713 ok = _drain_until(ws, "OK")
714 assert ok[2] is False
715 assert "future" in ok[3]
716 ws.close()
717 finally:
718 _stop_relay(proc)
719
720
721 class TestCloseSubscription:
722 """CLOSE unsubscribes — no more events after CLOSE."""
723
724 def test_close_stops_delivery(self):
725 port = 24024
726 proc, _ = _start_relay(port)
727 try:
728 ws = _ws_handshake(HOST, port)
729 time.sleep(0.1)
730
731 _ws_send_json(ws, ["REQ", "closeme", {"authors": [TEST_PUBKEY], "kinds": [1]}])
732 _drain_until(ws, "EOSE")
733 _ws_send_json(ws, ["CLOSE", "closeme"])
734 time.sleep(0.2)
735
736 # Publish an event — should NOT arrive on the closed sub.
737 ev = make_event(TEST_SECKEY, "after-close", kind=1)
738 pub_ws = _ws_handshake(HOST, port)
739 _ws_send_json(pub_ws, ["EVENT", ev])
740 _drain_until(pub_ws, "OK")
741 pub_ws.close()
742
743 # Try to read — should timeout (no event delivered).
744 try:
745 msg = _ws_recv_json(ws, timeout=1)
746 # If we get something, it should not be an EVENT for closeme.
747 if msg[0] == "EVENT":
748 assert msg[1] != "closeme", "received event on closed subscription"
749 except (socket.timeout, ConnectionError):
750 pass # expected — no message
751 ws.close()
752 finally:
753 _stop_relay(proc)
754
755
756 class TestCountEnvelope:
757 """NIP-45 COUNT returns event counts."""
758
759 def test_count_returns_number(self):
760 port = 24025
761 proc, _ = _start_relay(port)
762 try:
763 ws = _ws_handshake(HOST, port)
764 time.sleep(0.1)
765
766 # Publish a few events.
767 for i in range(3):
768 ev = make_event(TEST_SECKEY, f"count-{i}", kind=1,
769 created_at=int(time.time()) + i)
770 _ws_send_json(ws, ["EVENT", ev])
771 _drain_until(ws, "OK")
772
773 _ws_send_json(ws, ["COUNT", "c1", {"authors": [TEST_PUBKEY], "kinds": [1]}])
774 msg = _drain_until(ws, "COUNT")
775 assert msg[1] == "c1"
776 count = msg[2] if isinstance(msg[2], int) else msg[2].get("count", 0)
777 assert count >= 3
778 ws.close()
779 finally:
780 _stop_relay(proc)
781