router.go raw
1 package main
2
3 import (
4 "common/helpers"
5 "common/jsbridge/sw"
6 "common/nostr"
7 )
8
// Subscription Router — central dispatcher for relay operations.
// Owns client subscriptions, filter matching, and message dispatch.
11
12 var (
13 clientSubs map[string]*clientSub
14 proxySubs map[string]*proxySub
15 )
16
17 type clientSub struct {
18 filters []*nostr.Filter
19 filterRaw string
20 clientID string
21 }
22
23 type proxySub struct {
24 remoteIDs map[string]bool
25 relayCount int
26 eoseCount int
27 timer sw.Timer
28 done bool
29 }
30
31 func initRouter() {
32 clientSubs = make(map[string]*clientSub)
33 proxySubs = make(map[string]*proxySub)
34 }
35
// --- REQ / CLOSE / EVENT ---
37
38 func routerReq(clientID, subID, filterRaw string) {
39 f := nostr.ParseFilter(filterRaw)
40 if f == nil {
41 sw.Log("relay-sw: REQ parse filter FAILED")
42 return
43 }
44 clientSubs[subID] = &clientSub{filters: []*nostr.Filter{f}, filterRaw: filterRaw, clientID: clientID}
45
46 cacheQuery(filterRaw, func(eventsJSON string) {
47 events := nostr.ParseEventsJSON(eventsJSON)
48 for _, ev := range events {
49 fwd(clientID, "[\"EVENT\","+jstr(subID)+","+ev.ToJSON()+"]")
50 }
51 fwd(clientID, "[\"EOSE\","+jstr(subID)+"]")
52 })
53 }
54
55 func routerClose(subID string) {
56 delete(clientSubs, subID)
57 routerCleanupProxy(subID)
58 }
59
60 func routerPublish(clientID, eventRaw string) {
61 ev := nostr.ParseEvent(eventRaw)
62 if ev == nil {
63 return
64 }
65
66 cacheStore(eventRaw, func(saved bool) {
67 if saved {
68 pushToMatchingSubs(ev)
69 }
70 })
71
72 relayPublish(ev)
73 fwd(clientID, "[\"OK\","+jstr(ev.ID)+",true,\"\"]")
74 }
75
76 func routerPublishToRelays(eventRaw string, relayURLs []string) {
77 ev := nostr.ParseEvent(eventRaw)
78 if ev == nil {
79 sw.Log("relay: publishToRelays PARSE FAILED len=" + helpers.Itoa(int64(len(eventRaw))))
80 return
81 }
82 sw.Log("relay: publishToRelays kind=" + helpers.Itoa(int64(ev.Kind)) + " id=" + ev.ID[:16] + " relays=" + helpers.Itoa(int64(len(relayURLs))))
83 cacheStore(eventRaw, func(saved bool) {
84 if saved {
85 pushToMatchingSubs(ev)
86 }
87 })
88 for _, url := range relayURLs {
89 c := getConn(url)
90 sw.Log("relay: publish to " + url + " open=" + boolStr(c.IsOpen()))
91 c.Publish(ev)
92 }
93 }
94
// --- PROXY subscriptions ---
96
97 func routerProxy(clientID, subID, filterRaw string, relayURLs []string) {
98 routerCleanupProxy(subID)
99
100 filters := parseFilters(filterRaw)
101 if len(filters) == 0 {
102 sw.Log("relay-sw: PROXY parse filter FAILED")
103 return
104 }
105 clientSubs[subID] = &clientSub{filters: filters, filterRaw: filterRaw, clientID: clientID}
106
107 // Skip IDB cache for PROXY subscriptions — marmot needs fresh relay data.
108 // IDB may hold stale key packages from a previous bridge session (bridge
109 // restarts generate a new kpp without a version bump, so the epoch check
110 // doesn't flush them). Serving a stale key package causes the WASM to
111 // create a Welcome with the wrong KeyPackageRef → bridge can't decrypt.
112
113 remoteIDs := make(map[string]bool)
114 base := "p_" + subID + "_"
115
116 proxySubs[subID] = &proxySub{
117 remoteIDs: remoteIDs,
118 relayCount: len(relayURLs),
119 }
120
121 for _, url := range relayURLs {
122 suffix := urlSuffix(url)
123 rSubID := base + suffix
124 remoteIDs[rSubID] = true
125 c := getConn(url)
126 c.Subscribe(rSubID, filters)
127 }
128
129 proxyID := subID
130 proxySubs[subID].timer = sw.SetTimeout(10000, func() {
131 info, ok := proxySubs[proxyID]
132 if ok && !info.done {
133 info.done = true
134 if cs, ok := clientSubs[proxyID]; ok {
135 fwd(cs.clientID, "[\"EOSE\","+jstr(proxyID)+"]")
136 }
137 }
138 })
139 }
140
141 func routerCleanupProxy(proxyID string) {
142 info, ok := proxySubs[proxyID]
143 if !ok {
144 return
145 }
146 sw.ClearTimeout(info.timer)
147
148 if !info.done {
149 if cs, ok := clientSubs[proxyID]; ok {
150 fwd(cs.clientID, "[\"EOSE\","+jstr(proxyID)+"]")
151 }
152 }
153
154 for rSubID := range info.remoteIDs {
155 for _, url := range rpool.URLs() {
156 c := rpool.Get(url)
157 if c != nil && c.IsOpen() {
158 c.CloseSubscription(rSubID)
159 }
160 }
161 }
162 delete(proxySubs, proxyID)
163 delete(clientSubs, proxyID)
164 }
165
// --- Relay event callbacks ---
167
168 func routerOnRelayEvent(relayURL string, ev *nostr.Event) {
169 evJSON := ev.ToJSON()
170
171 pushToMatchingSubs(ev)
172
173 cacheStore(evJSON, func(saved bool) {
174 if saved {
175 relayPublishExcept(ev, relayURL)
176 }
177 })
178 }
179
180 func routerOnRelayEOSE(subID string) {
181 for proxyID, info := range proxySubs {
182 if info.remoteIDs[subID] {
183 info.eoseCount++
184 if !info.done {
185 sw.ClearTimeout(info.timer)
186 pid := proxyID
187 info.timer = sw.SetTimeout(3000, func() {
188 inf, ok := proxySubs[pid]
189 if ok && !inf.done {
190 inf.done = true
191 if cs, ok := clientSubs[pid]; ok {
192 fwd(cs.clientID, "[\"EOSE\","+jstr(pid)+"]")
193 }
194 }
195 })
196 }
197 }
198 }
199 }
200
201 func pushToMatchingSubs(ev *nostr.Event) {
202 matched := 0
203 for subID, cs := range clientSubs {
204 for _, f := range cs.filters {
205 if f.Matches(ev) {
206 matched++
207 fwd(cs.clientID, "[\"EVENT\","+jstr(subID)+","+ev.ToJSON()+"]")
208 break // one match per sub is enough
209 }
210 }
211 }
212 _ = matched
213 }
214
215 // parseFilters parses a JSON filter string that may be a single object
216 // or an array of objects. Returns a slice of parsed filters.
217 func parseFilters(raw string) []*nostr.Filter {
218 // Trim whitespace
219 s := raw
220 for len(s) > 0 && (s[0] == ' ' || s[0] == '\t' || s[0] == '\n') {
221 s = s[1:]
222 }
223 if len(s) == 0 {
224 return nil
225 }
226 // Single filter object
227 if s[0] == '{' {
228 f := nostr.ParseFilter(s)
229 if f == nil {
230 return nil
231 }
232 return []*nostr.Filter{f}
233 }
234 // Array of filter objects
235 if s[0] != '[' {
236 return nil
237 }
238 s = s[1:] // skip '['
239 var filters []*nostr.Filter
240 for {
241 for len(s) > 0 && (s[0] == ' ' || s[0] == ',' || s[0] == '\t' || s[0] == '\n') {
242 s = s[1:]
243 }
244 if len(s) == 0 || s[0] == ']' {
245 break
246 }
247 if s[0] != '{' {
248 break
249 }
250 // Find matching '}'
251 depth := 0
252 end := 0
253 for i := 0; i < len(s); i++ {
254 if s[i] == '{' {
255 depth++
256 } else if s[i] == '}' {
257 depth--
258 if depth == 0 {
259 end = i + 1
260 break
261 }
262 }
263 }
264 if end == 0 {
265 break
266 }
267 f := nostr.ParseFilter(s[:end])
268 if f != nil {
269 filters = append(filters, f)
270 }
271 s = s[end:]
272 }
273 return filters
274 }
275
// --- Signing ---
277
278 func routerSign(clientID, requestID, eventRaw string) {
279 if myPubkey == "" {
280 fwd(clientID, "[\"SIGN_ERROR\","+jstr(requestID)+",\"no key\"]")
281 return
282 }
283 ev := nostr.ParseEvent(eventRaw)
284 if ev == nil {
285 fwd(clientID, "[\"SIGN_ERROR\","+jstr(requestID)+",\"parse error\"]")
286 return
287 }
288 // Proxy signing through crypto SW -> signer extension.
289 cryptoProxy("signEvent", "", eventRaw, func(signedJSON, errMsg string) {
290 if errMsg != "" || signedJSON == "" {
291 fwd(clientID, "[\"SIGN_ERROR\","+jstr(requestID)+","+jstr(errMsg)+"]")
292 return
293 }
294 fwd(clientID, "[\"SIGNED\","+jstr(requestID)+","+signedJSON+"]")
295 })
296 }
297
// --- DM routing ---
299
300 func routerSaveDMRecord(rec *DMRecord) {
301 dmJSON := rec.ToJSON()
302 cacheSaveDM(dmJSON, func(result string) {
303 if result != "duplicate" {
304 fwdAll("[\"DM_RECEIVED\"," + dmJSON + "]")
305 }
306 })
307 }
308
309 func routerDMList(clientID string) {
310 cacheGetConversationList(func(listJSON string) {
311 fwd(clientID, "[\"DM_LIST\","+listJSON+"]")
312 })
313 }
314
315 func routerDMHistory(clientID, peer string, limit int, until int64) {
316 if limit <= 0 {
317 limit = 50
318 }
319 cacheQueryDMs(peer, limit, until, func(msgsJSON string) {
320 fwd(clientID, "[\"DM_HISTORY\","+jstr(peer)+","+msgsJSON+"]")
321 })
322 }
323
// --- Broadcast ---
325
326 func routerBroadcast(clientID, pubkey string, relayURLs []string) {
327 filterJSON := "{\"authors\":[" + jstr(pubkey) + "],\"kinds\":[0,3,10002,10050,10051]}"
328 cacheQuery(filterJSON, func(eventsJSON string) {
329 events := nostr.ParseEventsJSON(eventsJSON)
330 byKind := make(map[int]*nostr.Event)
331 for _, ev := range events {
332 if prev, ok := byKind[ev.Kind]; !ok || ev.CreatedAt > prev.CreatedAt {
333 byKind[ev.Kind] = ev
334 }
335 }
336
337 userRelays := relayURLs
338 if relayEv, ok := byKind[10002]; ok {
339 userRelays = nil
340 for _, t := range relayEv.Tags.GetAll("r") {
341 userRelays = append(userRelays, t.Value())
342 }
343 }
344 if len(userRelays) == 0 {
345 userRelays = writeRelays
346 }
347
348 if _, ok := byKind[10050]; !ok && myPubkey != "" && len(userRelays) > 0 {
349 createRelayListEventAsync(10050, userRelays, func(ev *nostr.Event) {
350 if ev != nil {
351 cacheStore(ev.ToJSON(), func(_ bool) {})
352 for _, url := range relayURLs {
353 getConn(url).Publish(ev)
354 }
355 }
356 })
357 }
358 if _, ok := byKind[10051]; !ok && myPubkey != "" && len(userRelays) > 0 {
359 createRelayListEventAsync(10051, userRelays, func(ev *nostr.Event) {
360 if ev != nil {
361 cacheStore(ev.ToJSON(), func(_ bool) {})
362 for _, url := range relayURLs {
363 getConn(url).Publish(ev)
364 }
365 }
366 })
367 }
368
369 count := 0
370 for _, ev := range byKind {
371 for _, url := range relayURLs {
372 getConn(url).Publish(ev)
373 }
374 count++
375 }
376 fwd(clientID, "[\"BROADCAST_DONE\","+helpers.Itoa(int64(count))+","+helpers.Itoa(int64(len(relayURLs)))+"]")
377 })
378 }
379
380 func createRelayListEventAsync(kind int, relays []string, cb func(ev *nostr.Event)) {
381 tagKey := "relay"
382 var tags nostr.Tags
383 for _, r := range relays {
384 tags = append(tags, nostr.Tag{tagKey, r})
385 }
386 ev := &nostr.Event{
387 Kind: kind,
388 PubKey: myPubkey,
389 Content: "",
390 Tags: tags,
391 CreatedAt: sw.NowSeconds(),
392 }
393 cryptoProxy("signEvent", "", ev.ToJSON(), func(signedJSON, errMsg string) {
394 if errMsg != "" || signedJSON == "" {
395 cb(nil)
396 return
397 }
398 signed := nostr.ParseEvent(signedJSON)
399 cb(signed)
400 })
401 }
402