"""End-to-end test: two users exchange MLS DMs through a local relay without auth. Requires: relay binary, WASM app built, geckodriver in PATH. Run: ./test/run.sh -k mls_dm --headed (visible) or without --headed (headless) """ import json import os import signal import subprocess import time import pytest from selenium import webdriver from selenium.webdriver.common.by import By from selenium.webdriver.common.keys import Keys from selenium.webdriver.firefox.options import Options from selenium.webdriver.firefox.service import Service from selenium.webdriver.support.ui import WebDriverWait from conftest import Helpers, RELAY_BIN, STATIC_DIR MLS_DATA_DIR = "/tmp/smesh-mls-test" MLS_PORT = 23335 def _bech32_encode_nsec(seckey_bytes): """Encode 32 bytes as bech32 nsec1...""" CHARSET = "qpzry9x8gf2tvdw0s3jn54khce6mua7l" def _convertbits(data, frombits, tobits, pad=True): acc, bits, ret = 0, 0, [] maxv = (1 << tobits) - 1 for value in data: acc = (acc << frombits) | value bits += frombits while bits >= tobits: bits -= tobits ret.append((acc >> bits) & maxv) if pad and bits: ret.append((acc << (tobits - bits)) & maxv) return ret def _bech32_polymod(values): GEN = [0x3b6a57b2, 0x26508e6d, 0x1ea119fa, 0x3d4233dd, 0x2a1462b3] chk = 1 for v in values: b = chk >> 25 chk = ((chk & 0x1ffffff) << 5) ^ v for i in range(5): chk ^= GEN[i] if ((b >> i) & 1) else 0 return chk def _bech32_create_checksum(hrp, data): values = [ord(c) >> 5 for c in hrp] + [0] + [ord(c) & 31 for c in hrp] + data polymod = _bech32_polymod(values + [0, 0, 0, 0, 0, 0]) ^ 1 return [(polymod >> 5 * (5 - i)) & 31 for i in range(6)] hrp = "nsec" data5 = _convertbits(list(seckey_bytes), 8, 5) checksum = _bech32_create_checksum(hrp, data5) return hrp + "1" + "".join(CHARSET[d] for d in data5 + checksum) def _start_relay(): """Start relay with ORLY_MARMOT_OPEN=true, return info dict.""" if not os.path.exists(RELAY_BIN): pytest.skip(f"Relay binary not found: {RELAY_BIN}") if os.path.exists(MLS_DATA_DIR): subprocess.run(["rm", "-rf", MLS_DATA_DIR], check=True) os.makedirs(MLS_DATA_DIR, exist_ok=True) env = os.environ.copy() env["ORLY_DATA_DIR"] = MLS_DATA_DIR env["ORLY_LISTEN"] = "127.0.0.1" env["ORLY_PORT"] = str(MLS_PORT) env["ORLY_STATIC_DIR"] = STATIC_DIR env["ORLY_HTTP_GUARD_ENABLED"] = "false" env["ORLY_MARMOT_OPEN"] = "true" proc = subprocess.Popen( [RELAY_BIN], env=env, stdout=subprocess.PIPE, stderr=subprocess.PIPE ) import urllib.request url = f"http://127.0.0.1:{MLS_PORT}" for _ in range(30): try: urllib.request.urlopen(url, timeout=1) break except Exception: time.sleep(0.2) else: proc.kill() pytest.fail("MLS test relay failed to start") return { "proc": proc, "url": url, "ws": f"ws://127.0.0.1:{MLS_PORT}", } def _stop_relay(info): proc = info["proc"] if proc.poll() is not None: stderr = proc.stderr.read().decode(errors="replace") if stderr: print(f"\n=== MLS RELAY STDERR ===\n{stderr[-2000:]}\n=== END ===\n") return proc.send_signal(signal.SIGTERM) try: proc.wait(timeout=5) except subprocess.TimeoutExpired: proc.kill() def _make_browser(headed): opts = Options() if not headed: opts.add_argument("--headless") opts.set_preference("xpinstall.signatures.required", False) opts.set_preference("extensions.autoDisableScopes", 0) opts.set_preference("devtools.console.stdout.content", True) svc = Service(log_output="/dev/null") driver = webdriver.Firefox(options=opts, service=svc) driver.set_page_load_timeout(60) driver.set_script_timeout(30) driver.implicitly_wait(3) return driver def _setup_user(driver, relay_url, nsec): """Load app, create vault with given nsec, return hex pubkey.""" driver.get(relay_url) h = Helpers(driver) h.wait_wasm_ready() h.wait_signer_ready() status = h.vault_status() if status == "none": h.js_async(""" var cb = arguments[arguments.length - 1]; window.__smesh_create_vault(arguments[0]) .then(function(ok) { cb(ok); }) .catch(function() { cb(false); }); """, "testpass123", timeout=30) elif status == "locked": h.js_async(""" var cb = arguments[arguments.length - 1]; window.__smesh_unlock_vault(arguments[0]) .then(function(ok) { cb(ok); }) .catch(function() { cb(false); }); """, "testpass123", timeout=30) h.js_async(""" var cb = arguments[arguments.length - 1]; window.__smesh_add_identity(arguments[0]) .then(function(r) { cb(!!r); }) .catch(function() { cb(false); }); """, nsec, timeout=10) pk = h.get_public_key() assert pk and len(pk) == 64, f"Failed to get pubkey after adding identity: {pk}" return pk def _query_relay(ws_url, filt, timeout=5): """Query relay via WebSocket, return list of events.""" import websocket ws = websocket.create_connection(ws_url, timeout=timeout) ws.send(json.dumps(["REQ", "diag", filt])) events = [] deadline = time.time() + timeout while time.time() < deadline: try: raw = ws.recv() except Exception: break msg = json.loads(raw) if msg[0] == "EOSE": break if msg[0] == "EVENT": events.append(msg[2]) ws.close() return events def _poll_relay(ws_url, filt, min_count, timeout=15): """Poll relay until at least min_count events match filter.""" deadline = time.time() + timeout while time.time() < deadline: evs = _query_relay(ws_url, filt, timeout=3) if len(evs) >= min_count: return evs time.sleep(1) return _query_relay(ws_url, filt, timeout=3) def _wait_for_message(driver, text, timeout=30): """Wait for a message bubble containing text to appear.""" def _check(d): bubbles = d.find_elements( By.XPATH, "//div[contains(@style, 'inline-block') and contains(@style, 'borderRadius')]" ) for b in bubbles: if text in b.text: return True return False try: WebDriverWait(driver, timeout).until(_check) return True except Exception: return False def _navigate_to_dm(driver, relay_url, peer_hex): """Navigate to DM thread with peer using URL path.""" from nostr_helpers import _convertbits_for_bech32, _bech32_encode # Convert hex to npub peer_bytes = bytes.fromhex(peer_hex) npub = _hex_to_npub(peer_hex) driver.get(f"{relay_url}/msg/{npub}") time.sleep(1) def _hex_to_npub(hex_pubkey): """Convert hex pubkey to npub bech32.""" CHARSET = "qpzry9x8gf2tvdw0s3jn54khce6mua7l" def convertbits(data, frombits, tobits, pad=True): acc, bits, ret = 0, 0, [] maxv = (1 << tobits) - 1 for value in data: acc = (acc << frombits) | value bits += frombits while bits >= tobits: bits -= tobits ret.append((acc >> bits) & maxv) if pad and bits: ret.append((acc << (tobits - bits)) & maxv) return ret def polymod(values): GEN = [0x3b6a57b2, 0x26508e6d, 0x1ea119fa, 0x3d4233dd, 0x2a1462b3] chk = 1 for v in values: b = chk >> 25 chk = ((chk & 0x1ffffff) << 5) ^ v for i in range(5): chk ^= GEN[i] if ((b >> i) & 1) else 0 return chk def create_checksum(hrp, data): values = [ord(c) >> 5 for c in hrp] + [0] + [ord(c) & 31 for c in hrp] + data p = polymod(values + [0, 0, 0, 0, 0, 0]) ^ 1 return [(p >> 5 * (5 - i)) & 31 for i in range(6)] hrp = "npub" data5 = convertbits(list(bytes.fromhex(hex_pubkey)), 8, 5) checksum = create_checksum(hrp, data5) return hrp + "1" + "".join(CHARSET[d] for d in data5 + checksum) def _send_message(driver, text): """Type into compose textarea and send.""" textarea = WebDriverWait(driver, 10).until( lambda d: d.find_element(By.TAG_NAME, "textarea") ) textarea.clear() textarea.send_keys(text) textarea.send_keys(Keys.ENTER) # ── The Test ────────────────────────────────────── @pytest.fixture def mls_env(request): """Spin up relay + two browsers, yield everything, tear down.""" headed = request.config.getoption("--headed", default=False) relay = _start_relay() alice_driver = _make_browser(headed) bob_driver = _make_browser(headed) # Generate two distinct keypairs import hashlib alice_sec = hashlib.sha256(b"mls-test-alice-key").digest() bob_sec = hashlib.sha256(b"mls-test-bob-key").digest() alice_nsec = _bech32_encode_nsec(alice_sec) bob_nsec = _bech32_encode_nsec(bob_sec) yield { "relay": relay, "alice": alice_driver, "bob": bob_driver, "alice_nsec": alice_nsec, "bob_nsec": bob_nsec, } alice_driver.quit() bob_driver.quit() _stop_relay(relay) def test_mls_dm_bidirectional(mls_env): """Two users exchange MLS DMs through local relay without NIP-42 auth.""" relay = mls_env["relay"] alice = mls_env["alice"] bob = mls_env["bob"] url = relay["url"] ws = relay["ws"] # 1. Setup identities pk_alice = _setup_user(alice, url, mls_env["alice_nsec"]) pk_bob = _setup_user(bob, url, mls_env["bob_nsec"]) print(f"\nAlice pubkey: {pk_alice}") print(f"Bob pubkey: {pk_bob}") assert pk_alice != pk_bob, "Alice and Bob must have distinct keys" # 2. Both navigate to messages with each other (triggers MLS_INIT + KP publish) _navigate_to_dm(alice, url, pk_bob) _navigate_to_dm(bob, url, pk_alice) # 3. Poll for KeyPackages on relay (both should publish kind 443) kps = _poll_relay(ws, {"kinds": [443], "authors": [pk_alice, pk_bob]}, 2, timeout=20) print(f"KeyPackages on relay: {len(kps)}") if len(kps) < 2: # Diagnostic: check individually kp_a = _query_relay(ws, {"kinds": [443], "authors": [pk_alice]}) kp_b = _query_relay(ws, {"kinds": [443], "authors": [pk_bob]}) pytest.fail( f"Expected 2 KeyPackages, got {len(kps)}. " f"Alice KPs: {len(kp_a)}, Bob KPs: {len(kp_b)}" ) # 4. Alice sends message _send_message(alice, "hello from alice") print("Alice sent message") # 5. Poll for gift-wrap (Welcome) to Bob wraps = _poll_relay(ws, {"kinds": [1059], "#p": [pk_bob]}, 1, timeout=15) print(f"Gift-wraps to Bob: {len(wraps)}") if not wraps: pytest.fail("No gift-wrap (Welcome) delivered to Bob - group creation failed") # 6. Poll for kind 445 (encrypted message) msgs445 = _poll_relay(ws, {"kinds": [445]}, 1, timeout=15) print(f"Kind 445 messages: {len(msgs445)}") if not msgs445: pytest.fail("No kind 445 (MLS message) published - send failed after group creation") # 7. Bob should see the message assert _wait_for_message(bob, "hello from alice", timeout=30), \ "Bob did not receive 'hello from alice'" print("Bob received Alice's message") # 8. Bob replies _send_message(bob, "hello from bob") print("Bob sent reply") # 9. Poll for Bob's reply on relay msgs445_2 = _poll_relay(ws, {"kinds": [445]}, 2, timeout=15) print(f"Kind 445 messages after reply: {len(msgs445_2)}") # 10. Alice should see Bob's reply assert _wait_for_message(alice, "hello from bob", timeout=30), \ "Alice did not receive 'hello from bob'" print("Alice received Bob's reply") print("\nMLS DM bidirectional exchange: PASSED")