pool.js raw
1 // pool.js — WebSocket connection pool, relay messaging, AUTH, reconnect
2
3 import { signEvent, schnorr, sha256, bytesToHex } from './crypto.js'
4 import { saveEvent } from './db.js'
5
/** Upper bound on the number of relay sockets held in the pool at one time. */
const MAX_CONNECTIONS = 16

/**
 * Relay connections keyed by relay URL.
 * Each value is a connection record: { ws, lastUsed, queue, subCount }.
 */
export const pool = new Map()
10
// Module-level state; sw.js fills these in through setPoolState().
let _writeRelays = []
let _secretKey = null
let _secretKeyHex = null
let _myPubkey = null

/**
 * Update any subset of the module-level pool state.
 *
 * A property that is omitted (or passed as `undefined`) leaves the current
 * value untouched; every other value, including `null`, overwrites it.
 *
 * @param {object} state
 * @param {*} [state.writeRelays]   relay URLs events are published to
 * @param {*} [state.secretKey]     signing key handed to signEvent
 * @param {*} [state.secretKeyHex]  hex form of the signing key
 * @param {*} [state.myPubkey]      public key used when signing
 */
export function setPoolState({ writeRelays, secretKey, secretKeyHex, myPubkey }) {
  _writeRelays = writeRelays === undefined ? _writeRelays : writeRelays
  _secretKey = secretKey === undefined ? _secretKey : secretKey
  _secretKeyHex = secretKeyHex === undefined ? _secretKeyHex : secretKeyHex
  _myPubkey = myPubkey === undefined ? _myPubkey : myPubkey
}
23
24 // ─── reconnect handlers ─────────────────────────────────────────────
25
// Callbacks registered through onReconnect(), in registration order.
const reconnectHandlers = []

/**
 * Register a callback that fireReconnect() will invoke with a relay URL.
 * @param {(url: string) => void} fn
 */
export function onReconnect(fn) {
  reconnectHandlers.push(fn)
}

/**
 * Call every registered handler with `url`.
 * A handler that throws is logged and does not stop the remaining handlers.
 * @param {string} url
 */
function fireReconnect(url) {
  // The length is re-read on each pass so handlers added while firing still run.
  for (let i = 0; i < reconnectHandlers.length; i++) {
    const handler = reconnectHandlers[i]
    try {
      handler(url)
    } catch (e) {
      console.warn('reconnect handler error:', e)
    }
  }
}
37
38 // ─── event/message callbacks (set by sw.js) ─────────────────────────
39
// Callback slots; each stays null until sw.js supplies one via setCallbacks().
let _onEvent = null
let _onDMEvent = null
let _broadcastToClients = null

/**
 * Install callbacks. Only truthy values are stored, so passing a missing,
 * `null`, or otherwise falsy property keeps the callback already installed.
 *
 * @param {object} callbacks
 * @param {Function} [callbacks.onEvent]
 * @param {Function} [callbacks.onDMEvent]
 * @param {Function} [callbacks.broadcastToClients]
 */
export function setCallbacks({ onEvent, onDMEvent, broadcastToClients }) {
  _onEvent = onEvent || _onEvent
  _onDMEvent = onDMEvent || _onDMEvent
  _broadcastToClients = broadcastToClients || _broadcastToClients
}

/**
 * Forward `msg` to the installed broadcastToClients callback.
 * Does nothing while no callback has been installed.
 */
function broadcast(msg) {
  if (!_broadcastToClients) return
  _broadcastToClients(msg)
}
53
54 // ─── connection pool ────────────────────────────────────────────────
55
/**
 * Return the pooled WebSocket for `url`, opening a new one when none exists.
 *
 * When the pool already holds MAX_CONNECTIONS entries, the entry with the
 * smallest `lastUsed` timestamp is closed and removed first.
 *
 * @param {string} url relay WebSocket URL
 * @returns {WebSocket} the socket (it may still be connecting)
 * @throws whatever the WebSocket constructor throws for an unusable URL
 */
function getConnection(url) {
  const existing = pool.get(url)
  if (existing) {
    existing.lastUsed = Date.now()
    return existing.ws
  }

  if (pool.size >= MAX_CONNECTIONS) {
    let oldestUrl = null
    let oldestTime = Infinity
    for (const [u, c] of pool) {
      if (c.lastUsed < oldestTime) { oldestUrl = u; oldestTime = c.lastUsed }
    }
    if (oldestUrl !== null) {
      const evicted = pool.get(oldestUrl)
      pool.delete(oldestUrl)
      evicted.ws.close()
    }
  }

  const ws = new WebSocket(url)
  const conn = { ws, lastUsed: Date.now(), queue: null }

  // Close/error events arrive asynchronously. By the time they fire, this
  // socket may have been evicted and a fresh connection registered under the
  // same URL, so only remove the pool entry if it is still this connection.
  const dropIfCurrent = () => {
    if (pool.get(url) === conn) pool.delete(url)
  }

  ws.onopen = () => {
    // Flush everything sendToRelay buffered while the socket was connecting.
    if (conn.queue) {
      for (const msg of conn.queue) ws.send(msg)
      conn.queue = null
    }
    fireReconnect(url)
  }

  ws.onmessage = (e) => {
    let msg
    try { msg = JSON.parse(e.data) } catch { return }
    // Relay frames are JSON arrays; anything else cannot be dispatched.
    if (!Array.isArray(msg)) return
    // handleRelayMessage is async: attach a handler so a failure while
    // processing one frame is logged instead of becoming an unhandled rejection.
    handleRelayMessage(url, msg).catch((err) => {
      console.warn('relay message error:', url, err)
    })
  }

  ws.onclose = dropIfCurrent
  ws.onerror = dropIfCurrent

  pool.set(url, conn)
  return ws
}
98
99 // ─── filter sanitizer ───────────────────────────────────────────────
100
// Filter keys whose values must be hex strings (event ids and pubkeys).
// '#a' (address coordinates of the form "kind:pubkey:identifier") and '#d'
// (free-form identifiers) are intentionally absent: their values are not hex,
// so hex validation would strip every legitimate value.
const HEX_FILTER_KEYS = new Set(['ids', 'authors', '#e', '#p'])

/**
 * Decide whether one value of a hex filter key may be sent to a relay.
 * Logs a warning describing the reason whenever a string value is rejected.
 */
function keepHexValue(key, value) {
  if (typeof value !== 'string') return false
  if (value.length % 2 !== 0) {
    console.warn('dropped odd-len hex', key, value.length, value.slice(0, 16))
    return false
  }
  if (!/^[0-9a-f]+$/i.test(value)) {
    console.warn('dropped non-hex', key, value.slice(0, 16))
    return false
  }
  return true
}

/**
 * Clean the filters of a REQ message before it is sent to a relay.
 *
 * - Messages other than REQ are returned untouched (same array instance).
 * - Invalid entries are removed from hex-valued keys (ids, authors, #e, #p).
 * - A filter whose hex key had values but none survived is dropped entirely:
 *   sending it without that key would ask the relay for MORE than requested.
 * - A filter is kept only if it still has a selecting field or `kinds`;
 *   since/until/limit and `_`-prefixed keys alone are not enough.
 *
 * @param {Array} msg relay message, e.g. ['REQ', subId, filter, ...]
 * @returns {Array|null} the sanitized message, or null when no filter is left
 */
function sanitizeFilter(msg) {
  if (msg[0] !== 'REQ') return msg
  const out = [msg[0], msg[1]]
  for (let i = 2; i < msg.length; i++) {
    const filter = msg[i]
    if (!filter || typeof filter !== 'object') { out.push(filter); continue }
    const clean = {}
    let hasRequiredField = false
    let unsatisfiable = false
    for (const [k, v] of Object.entries(filter)) {
      if (HEX_FILTER_KEYS.has(k) && Array.isArray(v)) {
        const valid = v.filter((s) => keepHexValue(k, s))
        if (valid.length) {
          clean[k] = valid
          hasRequiredField = true
        } else if (v.length) {
          // Every requested value was invalid, so the filter can match nothing.
          unsatisfiable = true
        }
        // An array that was empty to begin with is simply left out.
      } else {
        clean[k] = v
        const isConstraint = k === 'kinds' || k === 'since' || k === 'until' || k === 'limit'
        if (!isConstraint && !k.startsWith('_')) hasRequiredField = true
      }
    }
    if (unsatisfiable) {
      console.warn('dropped filter with no valid values left in a hex field')
      continue
    }
    // only include filter if it has at least one field that can match something
    if (hasRequiredField || Object.hasOwn(clean, 'kinds')) {
      out.push(clean)
    } else {
      console.warn('dropped empty filter after sanitization')
    }
  }
  // if no filters remain after sanitization, don't send the REQ
  if (out.length <= 2) return null
  return out
}
145
146 // ─── send to relay ──────────────────────────────────────────────────
147
/**
 * Send a message to one relay, opening a pooled connection if needed.
 *
 * The message is passed through sanitizeFilter first; nothing is sent when
 * the sanitizer rejects it. If the socket is not open yet, the serialized
 * message is appended to the connection's queue instead.
 *
 * @param {string} url relay WebSocket URL
 * @param {Array} msg  relay message such as ['REQ', ...] or ['EVENT', ...]
 */
export function sendToRelay(url, msg) {
  const sanitized = sanitizeFilter(msg)
  if (!sanitized) return // sanitizer dropped all filters

  const socket = getConnection(url)
  const payload = JSON.stringify(sanitized)

  if (socket.readyState === WebSocket.OPEN) {
    socket.send(payload)
    return
  }

  // Not open yet: park the frame on the pool entry, if there is one.
  const entry = pool.get(url)
  if (!entry) return
  if (!entry.queue) entry.queue = []
  entry.queue.push(payload)
}
163
164 // ─── proxy subscriptions ────────────────────────────────────────────
165
// proxyId -> { remoteSubIds, relayCount, eoseCount, timeout }
export const proxySubs = new Map()
// sub_id -> { filter, clientId }
export const subs = new Map()

/**
 * Start a proxied subscription: forward `filter` as a REQ to every relay in
 * `relayUrls` and record the bookkeeping needed to finish it later.
 *
 * Each relay gets its own remote subscription id derived from `subId`, a short
 * random token, and the tail of the relay URL. The subscription is handed to
 * cleanupProxy after 10 seconds if it has not been cleaned up before then.
 *
 * @param {string} clientId  id of the client that asked for the subscription
 * @param {string} subId     client-facing subscription id
 * @param {object} filter    filter forwarded to the relays
 * @param {...string} relayUrls relays to query
 * @returns {Promise<void>}
 */
export async function handleProxy(clientId, subId, filter, ...relayUrls) {
  // The previous subscription with this id must be fully torn down BEFORE the
  // new bookkeeping is written: cleanupProxy is async and removes the map
  // entries for `subId`, so letting it finish later would wipe the new ones.
  if (proxySubs.has(subId)) await cleanupProxy(subId)

  subs.set(subId, { filter, clientId })

  const remoteSubIds = new Set()
  const remoteSubId = 'p_' + subId + '_' + Math.random().toString(36).slice(2, 6)

  const entry = {
    remoteSubIds,
    relayCount: relayUrls.length,
    eoseCount: 0,
    timeout: null,
  }
  entry.timeout = setTimeout(() => {
    // Ignore a timer that outlived its subscription, so it cannot tear down
    // a newer subscription registered under the same id.
    if (proxySubs.get(subId) !== entry) return
    cleanupProxy(subId).catch((err) => console.warn('proxy cleanup error:', err))
  }, 10000)
  proxySubs.set(subId, entry)

  for (const url of relayUrls) {
    const rSubId = remoteSubId + '_' + url.replace(/\W/g, '').slice(-8)
    remoteSubIds.add(rSubId)
    sendToRelay(url, ['REQ', rSubId, filter])
  }
}
192
/**
 * Finish a proxied subscription: cancel its timeout, send CLOSE for each of
 * its remote subscription ids on every currently open pooled socket, and
 * tell the owning client the subscription has ended with ['EOSE', proxyId].
 *
 * Calling it for an unknown id is a no-op.
 *
 * @param {string} proxyId client-facing subscription id
 * @returns {Promise<void>}
 */
export async function cleanupProxy(proxyId) {
  const info = proxySubs.get(proxyId)
  if (!info) return
  clearTimeout(info.timeout)
  const sub = subs.get(proxyId)

  // Remove the bookkeeping synchronously, before any await. This makes a
  // concurrent second call a no-op (no duplicate EOSE/CLOSE) and guarantees
  // that a subscription re-registered under the same id while we are waiting
  // below is never deleted by this call.
  proxySubs.delete(proxyId)
  subs.delete(proxyId)

  for (const rSubId of info.remoteSubIds) {
    const frame = JSON.stringify(['CLOSE', rSubId])
    for (const conn of pool.values()) {
      if (conn.ws.readyState === WebSocket.OPEN) {
        conn.ws.send(frame)
      }
    }
  }

  if (!sub) return
  try {
    const client = await self.clients.get(sub.clientId)
    if (client) client.postMessage(['EOSE', proxyId])
  } catch (err) {
    // Notifying the client is best effort; the relay-side cleanup is done.
    console.warn('failed to notify client of EOSE:', proxyId, err)
  }
}
212
213 // ─── relay message handler ──────────────────────────────────────────
214
/**
 * Dispatch one parsed frame received from a relay.
 *
 * - EVENT:  store the event, notify onEvent when it was stored, re-publish it
 *           to the other write relays (never kinds 4/1059, i.e. DMs), and hand
 *           DMs to onDMEvent whether or not they were newly stored.
 * - EOSE:   count it against the owning proxy subscription and clean that
 *           subscription up once every relay has answered.
 * - OK / NOTICE: forwarded to clients through broadcast().
 * - AUTH:   answer the challenge with a signed kind-22242 event when a key
 *           is configured and the socket is still open.
 *
 * Frames that are not arrays, and EVENT frames without an event object, are
 * ignored instead of throwing.
 *
 * @param {string} relayUrl relay the frame came from
 * @param {*} msg parsed JSON frame, expected to be [type, ...args]
 * @returns {Promise<void>}
 */
async function handleRelayMessage(relayUrl, msg) {
  if (!Array.isArray(msg)) return
  const [type, ...args] = msg

  if (type === 'EVENT') {
    const [subId, event] = args
    // A relay can send anything; without an event object there is nothing to do.
    if (!event || typeof event !== 'object') return
    const saved = await saveEvent(event)
    if (saved && _onEvent) await _onEvent(event, subId)
    const isDM = event.kind === 4 || event.kind === 1059
    // propagate to write relays (skip source, skip DMs)
    if (saved && !isDM) {
      for (const wr of _writeRelays) {
        if (wr !== relayUrl) sendToRelay(wr, ['EVENT', event])
      }
    }
    // process DMs regardless of saved (might already have event but need to decrypt)
    if (isDM && _onDMEvent) {
      _onDMEvent(event)
    }
    return
  }

  if (type === 'EOSE') {
    const [subId] = args
    const finished = []
    for (const [proxyId, info] of proxySubs) {
      if (!info.remoteSubIds?.has(subId)) continue
      info.eoseCount = (info.eoseCount || 0) + 1
      if (info.eoseCount >= info.relayCount) finished.push(proxyId)
    }
    // Awaited so a cleanup failure surfaces to the caller of this function.
    await Promise.all(finished.map((proxyId) => cleanupProxy(proxyId)))
    return
  }

  if (type === 'OK') {
    const [eventId, success, message] = args
    broadcast(['OK', eventId, success, message || ''])
    return
  }

  if (type === 'NOTICE') {
    const [message] = args
    broadcast(['NOTICE', `[${relayUrl}] ${message}`])
    return
  }

  // NIP-42 AUTH challenge
  if (type === 'AUTH') {
    const [challenge] = args
    if (typeof challenge !== 'string') return
    if (!_secretKey || !_myPubkey) return
    // `await` is a no-op for a synchronous signEvent and required for an
    // asynchronous one, so the signed event is usable either way.
    const authEvent = await signEvent({
      kind: 22242,
      content: '',
      tags: [
        ['relay', relayUrl],
        ['challenge', challenge],
      ],
      created_at: Math.floor(Date.now() / 1000),
      pubkey: _myPubkey,
    }, _secretKey)
    const conn = pool.get(relayUrl)
    if (conn?.ws?.readyState === WebSocket.OPEN) {
      conn.ws.send(JSON.stringify(['AUTH', authEvent]))
    }
  }
}
278
279 // ─── publish event ──────────────────────────────────────────────────
280
/**
 * Publish an event on behalf of a client.
 *
 * Stores the event, notifies onEvent when it was stored, sends it to every
 * write relay (write relays only, not every pooled connection), and finally
 * acknowledges the client with ['OK', event.id, true, ''].
 *
 * @param {string} clientId id of the client that submitted the event
 * @param {object} event    event to publish
 * @returns {Promise<void>}
 */
export async function handleEvent(clientId, event) {
  const stored = await saveEvent(event)
  if (stored && _onEvent) {
    await _onEvent(event, null)
  }

  for (const relay of _writeRelays) {
    sendToRelay(relay, ['EVENT', event])
  }

  const requester = await self.clients.get(clientId)
  if (!requester) return
  requester.postMessage(['OK', event.id, true, ''])
}
293
294 // ─── relay info (NIP-11) ────────────────────────────────────────────
295
// relayUrl -> { info, ts }; entries are reused for RELAY_INFO_TTL milliseconds.
const relayInfoCache = new Map()
const RELAY_INFO_TTL = 3600000

/**
 * Fetch a relay's information document and post it to the requesting client
 * as ['RELAY_INFO', relayUrl, info]. `info` is null when the document could
 * not be fetched or parsed.
 *
 * Successful results are cached per relay URL for RELAY_INFO_TTL; failures
 * and non-2xx responses are never cached.
 *
 * @param {string} clientId id of the client to answer
 * @param {string} relayUrl relay WebSocket URL (ws:// or wss://)
 * @returns {Promise<void>}
 */
export async function handleRelayInfo(clientId, relayUrl) {
  let info = null

  const cached = relayInfoCache.get(relayUrl)
  if (cached && Date.now() - cached.ts < RELAY_INFO_TTL) {
    info = cached.info
  } else {
    try {
      // Rewrite only the leading scheme (ws -> http, wss -> https); a "ws://"
      // that appears later in the path or query must be left alone.
      const httpUrl = relayUrl.replace(/^ws(s?):\/\//i, 'http$1://')
      const resp = await fetch(httpUrl, {
        headers: { 'Accept': 'application/nostr+json' }
      })
      // An error page that happens to be JSON must not be cached as relay info.
      if (!resp.ok) throw new Error(`relay info request failed with HTTP ${resp.status}`)
      info = await resp.json()
      relayInfoCache.set(relayUrl, { info, ts: Date.now() })
    } catch (err) {
      console.warn('relay info fetch failed:', relayUrl, err)
      info = null
    }
  }

  const client = await self.clients.get(clientId)
  if (client) client.postMessage(['RELAY_INFO', relayUrl, info])
}
321