package main import ( "smesh.lol/web/common/helpers" "smesh.lol/web/common/jsbridge/sw" "smesh.lol/web/common/nostr" ) // Subscription router: central dispatcher. var ( clientSubs map[string]*clientSub proxySubs map[string]*proxySub ) type clientSub struct { filter *nostr.Filter filterRaw string clientID string } type proxySub struct { remoteIDs map[string]bool relayCount int eoseCount int timer sw.Timer done bool } func initRouter() { clientSubs = map[string]*clientSub{} proxySubs = map[string]*proxySub{} } func routeMessage(clientID string, w *mw, msgType string) { switch msgType { case "REQ": subID := w.str() filterRaw := w.raw() routerReq(clientID, subID, filterRaw) case "CLOSE": subID := w.str() routerClose(subID) case "EVENT": eventRaw := w.raw() routerPublish(clientID, eventRaw) case "PROXY": subID := w.str() filterRaw := w.raw() relayURLs := w.strs() routerProxy(clientID, subID, filterRaw, relayURLs) case "RELAY_INFO": relayURL := w.str() handleRelayInfo(clientID, relayURL) case "SET_KEY": hexKey := w.str() identitySetKey(hexKey) sendToClient(clientID, "[\"KEY_SET\"]") case "SET_PUBKEY": identitySetPubkey(w.str()) case "CLEAR_KEY": identityClearKey() writeRelays = nil case "SET_WRITE_RELAYS": all := w.strs() writeRelays = nil for _, u := range all { if isAllowedRelay(u) { writeRelays = append(writeRelays, u) } } case "SIGN": requestID := w.str() eventRaw := w.raw() routerSign(clientID, requestID, eventRaw) case "BROADCAST": pubkey := w.str() relayURLs := w.strs() routerBroadcast(clientID, pubkey, relayURLs) case "SEND_DM": recipientPubkey := w.str() content := w.str() relayURLs := w.strs() routerSendDM(clientID, recipientPubkey, content, relayURLs) case "DM_SUB": relayURLs := w.strs() routerDMSub(clientID, relayURLs) case "DM_LIST": routerDMList(clientID) case "DM_HISTORY": peer := w.str() limit := int(w.num()) if limit <= 0 || limit > 500 { limit = 50 } until := w.num() routerDMHistory(clientID, peer, limit, until) case "CRYPTO_RESULT": id := int(w.num()) result := w.str() errMsg := w.str() if fn, ok := cryptoCBs[id]; ok { delete(cryptoCBs, id) fn(result, errMsg) } } } // --- REQ / CLOSE / EVENT --- func routerReq(clientID, subID, filterRaw string) { f := nostr.ParseFilter(filterRaw) if f == nil { return } clientSubs[subID] = &clientSub{filter: f, filterRaw: filterRaw, clientID: clientID} cacheQuery(filterRaw, func(eventsJSON string) { events := nostr.ParseEventsJSON(eventsJSON) for _, ev := range events { sendToClient(clientID, "[\"EVENT\","+jstr(subID)+","+ev.ToJSON()+"]") } sendToClient(clientID, "[\"EOSE\","+jstr(subID)+"]") }) } func routerClose(subID string) { delete(clientSubs, subID) routerCleanupProxy(subID) } func routerPublish(clientID, eventRaw string) { ev := nostr.ParseEvent(eventRaw) if ev == nil { return } cacheStore(eventRaw, func(saved bool) { if saved { pushToMatchingSubs(ev) } }) relayPublish(ev) sendToClient(clientID, "[\"OK\","+jstr(ev.ID)+",true,\"\"]") } // --- PROXY subscriptions --- func isAllowedRelay(url string) bool { if len(url) >= 6 && url[:6] == "wss://" { return true } if len(url) >= 16 && url[:16] == "ws://localhost:" { return true } if len(url) >= 15 && url[:15] == "ws://127.0.0.1:" { return true } return false } func isHex(s string) bool { for i := 0; i < len(s); i++ { c := s[i] if !((c >= '0' && c <= '9') || (c >= 'a' && c <= 'f')) { return false } } return true } func routerProxy(clientID, subID, filterRaw string, relayURLs []string) { routerCleanupProxy(subID) f := nostr.ParseFilter(filterRaw) if f == nil { return } clientSubs[subID] = &clientSub{filter: f, filterRaw: filterRaw, clientID: clientID} remoteIDs := map[string]bool{} base := "p_" + subID + "_" proxySubs[subID] = &proxySub{ remoteIDs: remoteIDs, relayCount: len(relayURLs), } for _, url := range relayURLs { if !isAllowedRelay(url) { sw.Log("rejected relay URL: " + url) continue } suffix := urlSuffix(url) rSubID := base + suffix remoteIDs[rSubID] = true c := getConn(url) c.Subscribe(rSubID, []*nostr.Filter{f}) } proxyID := subID proxySubs[subID].timer = sw.SetTimeout(15000, func() { info, ok := proxySubs[proxyID] if ok && !info.done { info.done = true if cs, ok := clientSubs[proxyID]; ok { sendToClient(cs.clientID, "[\"EOSE\","+jstr(proxyID)+"]") } } }) } func routerCleanupProxy(proxyID string) { info, ok := proxySubs[proxyID] if !ok { return } sw.ClearTimeout(info.timer) if !info.done { if cs, ok := clientSubs[proxyID]; ok { sendToClient(cs.clientID, "[\"EOSE\","+jstr(proxyID)+"]") } } for rSubID := range info.remoteIDs { for _, url := range rpool.URLs() { c := rpool.Get(url) if c != nil && c.IsOpen() { c.CloseSubscription(rSubID) } } } delete(proxySubs, proxyID) delete(clientSubs, proxyID) } // --- Relay event callbacks --- func routerOnRelayEvent(relayURL string, remoteSubID string, ev *nostr.Event) { // Verify event ID hash only — signature verification happens in the signer extension. if !ev.CheckID() { return } // Reject events with timestamps too far in the future (>1 hour). now := sw.NowSeconds() if ev.CreatedAt > now+3600 { return } evJSON := ev.ToJSON() pushToProxySub(remoteSubID, relayURL, ev) // Cache only — never re-publish received events to other relays. // Relays exchange events through their own connections; the client // is not a transport. The user's own publishes already fan out via // routerPublish → relayPublish. Re-publishing here created an echo // loop that leaked profile-sub events back into the broad feed sub. cacheStore(evJSON, func(saved bool) { if saved && (ev.Kind == 4 || ev.Kind == 1059) && myPubkey != "" { routerDecryptDM(ev) } }) } func routerDecryptDM(ev *nostr.Event) { // Determine peer pubkey. var peer string if ev.PubKey == myPubkey { // We sent this DM — peer is the "p" tag recipient. pTag := ev.Tags.GetFirst("p") if pTag == nil { return } peer = pTag.Value() } else { peer = ev.PubKey } if len(peer) != 64 || !isHex(peer) { return } method := "nip04.decrypt" if ev.Kind == 1059 { method = "nip44.decrypt" } requestCrypto("", method, peer, ev.Content, func(plaintext, errMsg string) { if errMsg != "" || plaintext == "" { return } protocol := "nip04" if ev.Kind == 1059 { protocol = "nip44" } dm := makeDMRecord(peer, ev.PubKey, plaintext, ev.CreatedAt, protocol, ev.ID) routerSaveDMRecordJSON(dm.ToJSON()) }) } func routerOnRelayEOSE(subID string) { for proxyID, info := range proxySubs { if info.remoteIDs[subID] { info.eoseCount++ if info.eoseCount >= info.relayCount && !info.done { info.done = true sw.ClearTimeout(info.timer) if cs, ok := clientSubs[proxyID]; ok { sendToClient(cs.clientID, "[\"EOSE\","+jstr(proxyID)+"]") } } } } } // pushToProxySub dispatches a relay event only to the clientSub that owns // the remote subscription. Prevents profile events leaking into the feed. func pushToProxySub(remoteSubID string, relayURL string, ev *nostr.Event) { for proxyID, info := range proxySubs { if info.remoteIDs[remoteSubID] { if cs, ok := clientSubs[proxyID]; ok && cs.filter.Matches(ev) { sendToClient(cs.clientID, "[\"EVENT\","+jstr(proxyID)+","+ev.ToJSON()+"]") sendToClient(cs.clientID, "[\"SEEN_ON\","+jstr(ev.ID)+","+jstr(relayURL)+"]") } return } } } func pushToMatchingSubs(ev *nostr.Event) { for subID, cs := range clientSubs { if cs.filter.Matches(ev) { sendToClient(cs.clientID, "[\"EVENT\","+jstr(subID)+","+ev.ToJSON()+"]") } } } // --- Signing --- func routerSign(clientID, requestID, eventRaw string) { if !hasKey { sendToClient(clientID, "[\"SIGN_ERROR\","+jstr(requestID)+",\"no key\"]") return } ev := nostr.ParseEvent(eventRaw) if ev == nil { sendToClient(clientID, "[\"SIGN_ERROR\","+jstr(requestID)+",\"parse error\"]") return } if identitySignEvent(ev) { sendToClient(clientID, "[\"SIGNED\","+jstr(requestID)+","+ev.ToJSON()+"]") } else { sendToClient(clientID, "[\"SIGN_ERROR\","+jstr(requestID)+",\"sign failed\"]") } } // --- DM routing --- func routerSaveDMRecordJSON(dmJSON string) { cacheSaveDM(dmJSON, func(result string) { if result != "duplicate" { broadcastToClients("[\"DM_RECEIVED\"," + dmJSON + "]") } }) } func routerSendDM(clientID, recipientPubkey, content string, relayURLs []string) { if myPubkey == "" || !hasKey { return } requestCrypto(clientID, "nip04.encrypt", recipientPubkey, content, func(ciphertext, errMsg string) { if errMsg != "" { sendToClient(clientID, "[\"DM_SENT\","+jstr(recipientPubkey)+",false,"+jstr(errMsg)+"]") return } ev := &nostr.Event{ Kind: 4, Content: ciphertext, CreatedAt: sw.NowSeconds(), Tags: nostr.Tags{nostr.Tag{"p", recipientPubkey}}, } if !identitySignEvent(ev) { sendToClient(clientID, "[\"DM_SENT\","+jstr(recipientPubkey)+",false,\"sign failed\"]") return } for _, url := range relayURLs { if isAllowedRelay(url) { getConn(url).Publish(ev) } } sendToClient(clientID, "[\"DM_SENT\","+jstr(recipientPubkey)+",true,\"\"]") }) } func routerDMSub(_ string, relayURLs []string) { if myPubkey == "" || len(relayURLs) == 0 { return } dmRelayURLs = relayURLs for rSubID := range dmSubIDs { for _, url := range rpool.URLs() { c := rpool.Get(url) if c != nil && c.IsOpen() { c.CloseSubscription(rSubID) } } } dmSubIDs = map[string]bool{} for _, url := range relayURLs { if !isAllowedRelay(url) { continue } suffix := urlSuffix(url) id1 := "dm4in_" + suffix id2 := "dm4out_" + suffix id3 := "dm17_" + suffix dmSubIDs[id1] = true dmSubIDs[id2] = true dmSubIDs[id3] = true c := getConn(url) c.Subscribe(id1, []*nostr.Filter{{Kinds: []int{4}, Tags: map[string][]string{"#p": {myPubkey}}, Limit: 100}}) c.Subscribe(id2, []*nostr.Filter{{Kinds: []int{4}, Authors: []string{myPubkey}, Limit: 100}}) c.Subscribe(id3, []*nostr.Filter{{Kinds: []int{1059}, Tags: map[string][]string{"#p": {myPubkey}}, Limit: 100}}) } } func routerDMList(clientID string) { cacheGetConversationList(func(listJSON string) { sendToClient(clientID, "[\"DM_LIST\","+listJSON+"]") }) } func routerDMHistory(clientID, peer string, limit int, until int64) { if limit <= 0 { limit = 50 } cacheQueryDMs(peer, limit, until, func(msgsJSON string) { sendToClient(clientID, "[\"DM_HISTORY\","+jstr(peer)+","+msgsJSON+"]") }) } // --- Broadcast --- func routerBroadcast(clientID, pubkey string, relayURLs []string) { filterJSON := "{\"authors\":[" + jstr(pubkey) + "],\"kinds\":[0,3,10002,10050,10051]}" cacheQuery(filterJSON, func(eventsJSON string) { events := nostr.ParseEventsJSON(eventsJSON) byKind := map[int]*nostr.Event{} for _, ev := range events { if prev, ok := byKind[ev.Kind]; !ok || ev.CreatedAt > prev.CreatedAt { byKind[ev.Kind] = ev } } userRelays := relayURLs if relayEv, ok := byKind[10002]; ok { userRelays = nil for _, t := range relayEv.Tags.GetAll("r") { userRelays = append(userRelays, t.Value()) } } if len(userRelays) == 0 { userRelays = writeRelays } if _, ok := byKind[10050]; !ok && hasKey && len(userRelays) > 0 { ev := createRelayListEvent(10050, pubkey, userRelays) if ev != nil { cacheStore(ev.ToJSON(), func(_ bool) {}) byKind[10050] = ev } } if _, ok := byKind[10051]; !ok && hasKey && len(userRelays) > 0 { ev := createRelayListEvent(10051, pubkey, userRelays) if ev != nil { cacheStore(ev.ToJSON(), func(_ bool) {}) byKind[10051] = ev } } count := 0 for _, ev := range byKind { for _, url := range relayURLs { if !isAllowedRelay(url) { continue } getConn(url).Publish(ev) } count++ } sendToClient(clientID, "[\"BROADCAST_DONE\","+helpers.Itoa(int64(count))+","+helpers.Itoa(int64(len(relayURLs)))+"]") }) } func createRelayListEvent(kind int, _ string, relays []string) *nostr.Event { tagKey := "relay" var tags nostr.Tags for _, r := range relays { tags = append(tags, nostr.Tag{tagKey, r}) } ev := &nostr.Event{ Kind: kind, Content: "", Tags: tags, CreatedAt: sw.NowSeconds(), } if !identitySignEvent(ev) { return nil } return ev } func stringsToJSON(ss []string) string { if len(ss) == 0 { return "[]" } b := "[" for i, s := range ss { if i > 0 { b += "," } b += jstr(s) } return b + "]" }