pool.js raw

   1  // pool.js — WebSocket connection pool, relay messaging, AUTH, reconnect
   2  
   3  import { signEvent, schnorr, sha256, bytesToHex } from './crypto.js'
   4  import { saveEvent } from './db.js'
   5  
   6  const MAX_CONNECTIONS = 16
   7  
   8  // url -> { ws, lastUsed, queue, subCount }
   9  export const pool = new Map()
  10  
  11  // module-level state set by sw.js
  12  let _writeRelays = []
  13  let _secretKey = null
  14  let _secretKeyHex = null
  15  let _myPubkey = null
  16  
  17  export function setPoolState({ writeRelays, secretKey, secretKeyHex, myPubkey }) {
  18    if (writeRelays !== undefined) _writeRelays = writeRelays
  19    if (secretKey !== undefined) _secretKey = secretKey
  20    if (secretKeyHex !== undefined) _secretKeyHex = secretKeyHex
  21    if (myPubkey !== undefined) _myPubkey = myPubkey
  22  }
  23  
  24  // ─── reconnect handlers ─────────────────────────────────────────────
  25  
  26  const reconnectHandlers = []
  27  
  28  export function onReconnect(fn) {
  29    reconnectHandlers.push(fn)
  30  }
  31  
  32  function fireReconnect(url) {
  33    for (const fn of reconnectHandlers) {
  34      try { fn(url) } catch (e) { console.warn('reconnect handler error:', e) }
  35    }
  36  }
  37  
  38  // ─── event/message callbacks (set by sw.js) ─────────────────────────
  39  
  40  let _onEvent = null
  41  let _onDMEvent = null
  42  let _broadcastToClients = null
  43  
  44  export function setCallbacks({ onEvent, onDMEvent, broadcastToClients }) {
  45    if (onEvent) _onEvent = onEvent
  46    if (onDMEvent) _onDMEvent = onDMEvent
  47    if (broadcastToClients) _broadcastToClients = broadcastToClients
  48  }
  49  
  50  function broadcast(msg) {
  51    if (_broadcastToClients) _broadcastToClients(msg)
  52  }
  53  
  54  // ─── connection pool ────────────────────────────────────────────────
  55  
  56  function getConnection(url) {
  57    if (pool.has(url)) {
  58      const conn = pool.get(url)
  59      conn.lastUsed = Date.now()
  60      return conn.ws
  61    }
  62  
  63    if (pool.size >= MAX_CONNECTIONS) {
  64      let oldest = null
  65      let oldestTime = Infinity
  66      for (const [u, c] of pool) {
  67        if (c.lastUsed < oldestTime) { oldest = u; oldestTime = c.lastUsed }
  68      }
  69      if (oldest) {
  70        pool.get(oldest).ws.close()
  71        pool.delete(oldest)
  72      }
  73    }
  74  
  75    const ws = new WebSocket(url)
  76    const conn = { ws, lastUsed: Date.now(), queue: null }
  77  
  78    ws.onopen = () => {
  79      if (conn.queue) {
  80        for (const msg of conn.queue) ws.send(msg)
  81        conn.queue = null
  82      }
  83      fireReconnect(url)
  84    }
  85  
  86    ws.onmessage = (e) => {
  87      let msg
  88      try { msg = JSON.parse(e.data) } catch { return }
  89      handleRelayMessage(url, msg)
  90    }
  91  
  92    ws.onclose = () => pool.delete(url)
  93    ws.onerror = () => pool.delete(url)
  94  
  95    pool.set(url, conn)
  96    return ws
  97  }
  98  
  99  // ─── filter sanitizer ───────────────────────────────────────────────
 100  
 101  const HEX_FILTER_KEYS = new Set(['ids', 'authors', '#e', '#p', '#a', '#d'])
 102  
 103  function sanitizeFilter(msg) {
 104    if (msg[0] !== 'REQ') return msg
 105    const out = [msg[0], msg[1]]
 106    for (let i = 2; i < msg.length; i++) {
 107      const filter = msg[i]
 108      if (!filter || typeof filter !== 'object') { out.push(filter); continue }
 109      const clean = {}
 110      let hasRequiredField = false
 111      for (const [k, v] of Object.entries(filter)) {
 112        if (HEX_FILTER_KEYS.has(k) && Array.isArray(v)) {
 113          const valid = v.filter((s) => {
 114            if (typeof s !== 'string') return false
 115            if (s.length % 2 !== 0) { console.warn('dropped odd-len hex', k, s.length, s.slice(0, 16)); return false }
 116            if (!/^[0-9a-f]+$/i.test(s)) { console.warn('dropped non-hex', k, s.slice(0, 16)); return false }
 117            return true
 118          })
 119          if (valid.length) {
 120            clean[k] = valid
 121            hasRequiredField = true
 122          }
 123          // FIX #7: if ALL hex values were invalid, skip this key entirely
 124          // but track that we had a required field that's now empty
 125        } else {
 126          clean[k] = v
 127          if (k === 'kinds' || k === 'since' || k === 'until' || k === 'limit') {
 128            // these are constraints, not required fields
 129          } else if (!k.startsWith('_')) {
 130            hasRequiredField = true
 131          }
 132        }
 133      }
 134      // only include filter if it has at least one field that can match something
 135      if (hasRequiredField || Object.keys(clean).some(k => k === 'kinds')) {
 136        out.push(clean)
 137      } else {
 138        console.warn('dropped empty filter after sanitization')
 139      }
 140    }
 141    // if no filters remain after sanitization, don't send the REQ
 142    if (out.length <= 2) return null
 143    return out
 144  }
 145  
 146  // ─── send to relay ──────────────────────────────────────────────────
 147  
 148  export function sendToRelay(url, msg) {
 149    msg = sanitizeFilter(msg)
 150    if (!msg) return // sanitizer dropped all filters
 151    const ws = getConnection(url)
 152    const data = JSON.stringify(msg)
 153    if (ws.readyState === WebSocket.OPEN) {
 154      ws.send(data)
 155    } else {
 156      const conn = pool.get(url)
 157      if (conn) {
 158        conn.queue = conn.queue || []
 159        conn.queue.push(data)
 160      }
 161    }
 162  }
 163  
 164  // ─── proxy subscriptions ────────────────────────────────────────────
 165  
 166  // proxyId -> { remoteSubIds, relayCount, eoseCount, timeout }
 167  export const proxySubs = new Map()
 168  // sub_id -> { filter, clientId }
 169  export const subs = new Map()
 170  
 171  export async function handleProxy(clientId, subId, filter, ...relayUrls) {
 172    if (proxySubs.has(subId)) cleanupProxy(subId)
 173  
 174    subs.set(subId, { filter, clientId })
 175  
 176    const remoteSubIds = new Set()
 177    const remoteSubId = 'p_' + subId + '_' + Math.random().toString(36).slice(2, 6)
 178  
 179    proxySubs.set(subId, {
 180      remoteSubIds,
 181      relayCount: relayUrls.length,
 182      eoseCount: 0,
 183      timeout: setTimeout(() => cleanupProxy(subId), 10000),
 184    })
 185  
 186    for (const url of relayUrls) {
 187      const rSubId = remoteSubId + '_' + url.replace(/\W/g, '').slice(-8)
 188      remoteSubIds.add(rSubId)
 189      sendToRelay(url, ['REQ', rSubId, filter])
 190    }
 191  }
 192  
 193  export async function cleanupProxy(proxyId) {
 194    const info = proxySubs.get(proxyId)
 195    if (!info) return
 196    clearTimeout(info.timeout)
 197    const sub = subs.get(proxyId)
 198    if (sub) {
 199      const client = await self.clients.get(sub.clientId)
 200      if (client) client.postMessage(['EOSE', proxyId])
 201    }
 202    for (const rSubId of info.remoteSubIds) {
 203      for (const [url, conn] of pool) {
 204        if (conn.ws.readyState === WebSocket.OPEN) {
 205          conn.ws.send(JSON.stringify(['CLOSE', rSubId]))
 206        }
 207      }
 208    }
 209    proxySubs.delete(proxyId)
 210    subs.delete(proxyId)
 211  }
 212  
 213  // ─── relay message handler ──────────────────────────────────────────
 214  
 215  async function handleRelayMessage(relayUrl, msg) {
 216    const [type, ...args] = msg
 217  
 218    if (type === 'EVENT') {
 219      const [subId, event] = args
 220      const saved = await saveEvent(event)
 221      if (saved && _onEvent) await _onEvent(event, subId)
 222      // propagate to write relays (skip source, skip DMs)
 223      // FIX #3: use writeRelays, not all pool connections
 224      if (saved && event.kind !== 4 && event.kind !== 1059) {
 225        for (const wr of _writeRelays) {
 226          if (wr !== relayUrl) sendToRelay(wr, ['EVENT', event])
 227        }
 228      }
 229      // process DMs regardless of saved (might already have event but need to decrypt)
 230      if ((event.kind === 4 || event.kind === 1059) && _onDMEvent) {
 231        _onDMEvent(event)
 232      }
 233    }
 234  
 235    if (type === 'EOSE') {
 236      const [subId] = args
 237      for (const [proxyId, info] of proxySubs) {
 238        if (info.remoteSubIds?.has(subId)) {
 239          info.eoseCount = (info.eoseCount || 0) + 1
 240          if (info.eoseCount >= info.relayCount) {
 241            cleanupProxy(proxyId)
 242          }
 243        }
 244      }
 245    }
 246  
 247    if (type === 'OK') {
 248      const [eventId, success, message] = args
 249      broadcast(['OK', eventId, success, message || ''])
 250    }
 251  
 252    if (type === 'NOTICE') {
 253      const [message] = args
 254      broadcast(['NOTICE', `[${relayUrl}] ${message}`])
 255    }
 256  
 257    // FIX #11: NIP-42 AUTH challenge
 258    if (type === 'AUTH') {
 259      const [challenge] = args
 260      if (_secretKey && _myPubkey) {
 261        const authEvent = signEvent({
 262          kind: 22242,
 263          content: '',
 264          tags: [
 265            ['relay', relayUrl],
 266            ['challenge', challenge],
 267          ],
 268          created_at: Math.floor(Date.now() / 1000),
 269          pubkey: _myPubkey,
 270        }, _secretKey)
 271        const conn = pool.get(relayUrl)
 272        if (conn?.ws?.readyState === WebSocket.OPEN) {
 273          conn.ws.send(JSON.stringify(['AUTH', authEvent]))
 274        }
 275      }
 276    }
 277  }
 278  
 279  // ─── publish event ──────────────────────────────────────────────────
 280  
 281  // FIX #3: publish to writeRelays only, not all connected relays
 282  export async function handleEvent(clientId, event) {
 283    const saved = await saveEvent(event)
 284    if (saved && _onEvent) await _onEvent(event, null)
 285  
 286    for (const url of _writeRelays) {
 287      sendToRelay(url, ['EVENT', event])
 288    }
 289  
 290    const client = await self.clients.get(clientId)
 291    if (client) client.postMessage(['OK', event.id, true, ''])
 292  }
 293  
 294  // ─── relay info (NIP-11) ────────────────────────────────────────────
 295  
 296  const relayInfoCache = new Map()
 297  const RELAY_INFO_TTL = 3600000
 298  
 299  export async function handleRelayInfo(clientId, relayUrl) {
 300    const cached = relayInfoCache.get(relayUrl)
 301    if (cached && Date.now() - cached.ts < RELAY_INFO_TTL) {
 302      const client = await self.clients.get(clientId)
 303      if (client) client.postMessage(['RELAY_INFO', relayUrl, cached.info])
 304      return
 305    }
 306  
 307    try {
 308      const httpUrl = relayUrl.replace('wss://', 'https://').replace('ws://', 'http://')
 309      const resp = await fetch(httpUrl, {
 310        headers: { 'Accept': 'application/nostr+json' }
 311      })
 312      const info = await resp.json()
 313      relayInfoCache.set(relayUrl, { info, ts: Date.now() })
 314      const client = await self.clients.get(clientId)
 315      if (client) client.postMessage(['RELAY_INFO', relayUrl, info])
 316    } catch (err) {
 317      const client = await self.clients.get(clientId)
 318      if (client) client.postMessage(['RELAY_INFO', relayUrl, null])
 319    }
 320  }
 321