bus.go raw
1 package main
2
3 import (
4 "common/helpers"
5 "common/jsbridge/bc"
6 "common/jsbridge/sw"
7 )
8
9 // Bus — BroadcastChannel connecting shell and relay SWs.
10 // All client-local, no server involvement.
11 //
12 // Satellite SWs may be terminated by the browser at any time.
13 // BroadcastChannel messages don't wake terminated SWs. We queue messages for
14 // each destination until it sends a READY announcement, then flush the queue.
15
16 var bus bc.BC
17
18 // Per-destination readiness tracking and message queue.
19 var (
20 relayReady bool
21 relayQueue []string
22 )
23
24 func connectBus() {
25 broadcastToClients("[\"SW_LOG\",\"shell\",\"bus connecting v" + version + "\"]")
26 bus = bc.Open("smesh-bus", func(msg string) { onBusMessage(msg) })
27 bc.Send(bus, "{\"from\":\"shell\",\"to\":\"*\",\"msg\":[\"PING\","+jstr(version)+"]}")
28 if myPubkey == "" {
29 broadcastToClients("[\"NEED_IDENTITY\"]")
30 }
31 sw.SetTimeout(30000, func() {
32 if !relayReady {
33 sw.Log("shell: relay SW not responding after 30s")
34 broadcastToClients("[\"SW_STATUS\",\"relay-timeout\"]")
35 }
36 })
37 }
38
39 func busSend(to, msg string) {
40 envelope := "{\"from\":\"shell\",\"to\":" + jstr(to) + ",\"msg\":" + msg + "}"
41 if to == "relay" && !relayReady {
42 // Fix 1f: cap queue to prevent unbounded growth if relay SW never starts.
43 if len(relayQueue) >= 256 {
44 relayQueue = relayQueue[1:]
45 }
46 relayQueue = append(relayQueue, envelope)
47 return
48 }
49 bc.Send(bus, envelope)
50 }
51
52 func flushQueue(to string) {
53 if to != "relay" {
54 return
55 }
56 hadQueued := len(relayQueue) > 0
57 relayReady = true
58 for _, msg := range relayQueue {
59 bc.Send(bus, msg)
60 }
61 relayQueue = nil
62 // Only RESUB if queue was empty — means page's subs went to a dead shell
63 // SW instance (thread eviction) and need re-sending. If queue had messages,
64 // the flush already delivered them; RESUB would cause a double-subscribe
65 // race that tears down in-flight subscriptions.
66 if !hadQueued {
67 broadcastToClients("[\"RESUB\"]")
68 }
69 }
70
71 func onBusMessage(raw string) {
72 from := jsonField(raw, "from")
73 if from == "shell" {
74 return
75 }
76 to := jsonField(raw, "to")
77 if to != "shell" && to != "*" {
78 return
79 }
80 msg := jsonFieldRaw(raw, "msg")
81 if msg == "" {
82 return
83 }
84 w := newMW(msg)
85 msgType := w.str()
86
87 // READY handshake — satellite SW just connected to bus.
88 if msgType == "READY" {
89 satVer := w.str()
90 broadcastToClients("[\"SW_LOG\",\"shell\",\"READY from " + from + " v=" + satVer + "\"]")
91 if satVer != "" && satVer != version {
92 sw.Log("shell: version mismatch from " + from + ": " + satVer + " != " + version)
93 broadcastToClients("[\"FORCE_UPDATE_SW\"," + jstr(from) + "]")
94 return
95 }
96 flushQueue(from)
97 return
98 }
99
100 // Any message from relay SW also confirms it's alive.
101 if from == "relay" && !relayReady {
102 flushQueue("relay")
103 }
104
105 if msgType != "LOG" && msgType != "FWD" && msgType != "FWD_ALL" && msgType != "FWD_BATCH" {
106 sw.Log("shell: bus " + from + "→" + msgType)
107 }
108
109 switch msgType {
110 case "LOG":
111 origin := w.str()
112 logMsg := w.str()
113 broadcastToClients("[\"SW_LOG\"," + jstr(origin) + "," + jstr(logMsg) + "]")
114 return
115 case "FWD":
116 dispatchFwd(w.str(), w.raw())
117 case "FWD_ALL":
118 broadcastToClients(w.raw())
119 case "FWD_BATCH":
120 dispatchFwdBatch(msg)
121 case "CRYPTO_REQ":
122 from := w.str()
123 id := int(w.num())
124 method := w.str()
125 peerPubkey := w.str()
126 data := w.str()
127 cryptoProxy(method, peerPubkey, data, func(result, errMsg string) {
128 busSend(from, "[\"CRYPTO_RESULT\","+helpers.Itoa(int64(id))+","+jstr(result)+","+jstr(errMsg)+"]")
129 })
130 case "DM_HISTORY_CLEARED":
131 peer := w.str()
132 broadcastToClients("[\"DM_HISTORY_CLEARED\"," + jstr(peer) + "]")
133 }
134 }
135
136 func dispatchFwd(clientID, innerMsg string) {
137 if clientID == "" {
138 if hasMarmotSub(innerMsg) {
139 deliverMarmotEvent(innerMsg)
140 } else {
141 broadcastToClients(innerMsg)
142 }
143 } else {
144 sendToClient(clientID, innerMsg)
145 }
146 }
147
148 // dispatchFwdBatch unpacks a FWD_BATCH message and dispatches each item.
149 // Format: ["FWD_BATCH", [ ["FWD",clientID,msg], ["FWD_ALL",msg], ... ] ]
150 func dispatchFwdBatch(raw string) {
151 // Find the outer array (second element of the batch message).
152 bw := newMW(raw)
153 _ = bw.str() // "FWD_BATCH"
154 // Now we're at the inner array. Walk it as raw values.
155 bw.sep()
156 if bw.i >= len(bw.s) || bw.s[bw.i] != '[' {
157 return
158 }
159 bw.i++ // skip '['
160 for {
161 bw.sep()
162 if bw.i >= len(bw.s) || bw.s[bw.i] == ']' {
163 break
164 }
165 // Each item is a full message array like ["FWD","cid",...] or ["FWD_ALL",...]
166 itemStart := bw.i
167 bw.i = skipval(bw.s, bw.i)
168 if bw.i < 0 {
169 break
170 }
171 item := bw.s[itemStart:bw.i]
172 iw := newMW(item)
173 t := iw.str()
174 switch t {
175 case "FWD":
176 dispatchFwd(iw.str(), iw.raw())
177 case "FWD_ALL":
178 broadcastToClients(iw.raw())
179 }
180 }
181 }
182
// hasMarmotSub reports whether msg starts with the serialized head of an
// EVENT for a "marmot-sub-" subscription and has at least one more byte
// after it. The match is byte-exact, so any extra whitespace defeats it.
func hasMarmotSub(msg string) bool {
	const marker = `["EVENT","marmot-sub-`
	if len(msg) <= len(marker) {
		return false
	}
	return msg[:len(marker)] == marker
}
188
189 // deliverMarmotEvent converts a relay EVENT for a marmot subscription
190 // into an MLS_PROXY deliverEvent message for the page.
191 func deliverMarmotEvent(msg string) {
192 iw := newMW(msg)
193 _ = iw.str() // "EVENT"
194 fullSubID := iw.str() // "marmot-sub-<n>"
195 eventJSON := iw.raw() // the event object
196 subID := fullSubID[11:] // strip "marmot-sub-" (11 chars)
197 // Event must be a JSON string (not raw object) — Go WASM calls args[1].String().
198 broadcastToClients("[\"MLS_PROXY\",\"deliverEvent\"," + subID + "," + jstr(eventJSON) + "]")
199 }
200
201 // jsonField extracts a string value (unquoted) for a key from a JSON object string.
202 func jsonField(json, key string) string {
203 v := jsonFieldRaw(json, key)
204 if len(v) >= 2 && v[0] == '"' && v[len(v)-1] == '"' {
205 return jsonUnescape(v[1 : len(v)-1])
206 }
207 return v
208 }
209
// jsonUnescape decodes the escape sequences of a JSON string body (the text
// between the quotes): \n \t \r \b \f \\ \" \/ and \uXXXX. A \uXXXX high
// surrogate followed by a \uXXXX low surrogate is combined into a single
// code point; an unpaired surrogate becomes U+FFFD.
//
// Anything that is not a valid escape (unknown letter, truncated \u, lone
// trailing backslash) is copied through unchanged. The input is returned
// as-is when it contains no backslash.
func jsonUnescape(s string) string {
	// Fast path: locate the first backslash, if any.
	first := -1
	for i := 0; i < len(s); i++ {
		if s[i] == '\\' {
			first = i
			break
		}
	}
	if first < 0 {
		return s
	}

	out := make([]byte, 0, len(s))
	out = append(out, s[:first]...)
	for i := first; i < len(s); i++ {
		c := s[i]
		if c != '\\' || i+1 >= len(s) {
			out = append(out, c)
			continue
		}
		i++ // i now indexes the character after the backslash
		switch s[i] {
		case 'n':
			out = append(out, '\n')
		case 't':
			out = append(out, '\t')
		case 'r':
			out = append(out, '\r')
		case 'b':
			out = append(out, '\b')
		case 'f':
			out = append(out, '\f')
		case '\\':
			out = append(out, '\\')
		case '"':
			out = append(out, '"')
		case '/':
			out = append(out, '/')
		case 'u':
			r, ok := jsonHex4(s, i+1)
			if !ok {
				// Malformed \u: keep it verbatim, like any unknown escape.
				out = append(out, '\\', 'u')
				break
			}
			i += 4 // i now indexes the last hex digit
			if r >= 0xD800 && r <= 0xDBFF && i+2 < len(s) && s[i+1] == '\\' && s[i+2] == 'u' {
				if lo, ok := jsonHex4(s, i+3); ok && lo >= 0xDC00 && lo <= 0xDFFF {
					r = 0x10000 + ((r - 0xD800) << 10) + (lo - 0xDC00)
					i += 6 // consume the low surrogate escape as well
				}
			}
			// Converting a rune to string yields its UTF-8 encoding.
			out = append(out, string(r)...)
		default:
			out = append(out, '\\', s[i])
		}
	}
	return string(out)
}

// jsonHex4 parses exactly four hexadecimal digits starting at s[at] and
// reports whether they were all present and valid.
func jsonHex4(s string, at int) (rune, bool) {
	if at < 0 || at+4 > len(s) {
		return 0, false
	}
	var r rune
	for i := at; i < at+4; i++ {
		c := s[i]
		r <<= 4
		switch {
		case c >= '0' && c <= '9':
			r |= rune(c - '0')
		case c >= 'a' && c <= 'f':
			r |= rune(c-'a') + 10
		case c >= 'A' && c <= 'F':
			r |= rune(c-'A') + 10
		default:
			return 0, false
		}
	}
	return r, true
}
252
253 // jsonFieldRaw extracts a raw JSON value for a key from a JSON object string.
254 func jsonFieldRaw(json, key string) string {
255 needle := "\"" + key + "\":"
256 idx := -1
257 for i := 0; i <= len(json)-len(needle); i++ {
258 if json[i:i+len(needle)] == needle {
259 idx = i + len(needle)
260 break
261 }
262 }
263 if idx < 0 {
264 return ""
265 }
266 for idx < len(json) && (json[idx] == ' ' || json[idx] == '\t') {
267 idx++
268 }
269 if idx >= len(json) {
270 return ""
271 }
272 end := skipval(json, idx)
273 if end < 0 {
274 return ""
275 }
276 return json[idx:end]
277 }
278
// parseTS parses a numeric string (from JSON) to int64.
//
// Leading non-digit bytes (quotes, blanks, a sign) are skipped, then the
// first contiguous run of decimal digits is read; parsing stops at the
// first byte after that run, so a fraction or exponent is not folded into
// the integer ("12.5" yields 12, not 125). A sign is ignored, input without
// digits yields 0, and values too large for int64 saturate at the maximum
// instead of wrapping.
func parseTS(s string) int64 {
	const maxInt64 = int64(1<<63 - 1)

	i := 0
	for i < len(s) && (s[i] < '0' || s[i] > '9') {
		i++
	}

	var n int64
	for ; i < len(s) && s[i] >= '0' && s[i] <= '9'; i++ {
		d := int64(s[i] - '0')
		if n > (maxInt64-d)/10 {
			return maxInt64
		}
		n = n*10 + d
	}
	return n
}
289
290 // strsJSON serializes a string slice to a JSON array.
291 func strsJSON(ss []string) string {
292 if len(ss) == 0 {
293 return "[]"
294 }
295 out := "["
296 for i, s := range ss {
297 if i > 0 {
298 out += ","
299 }
300 out += jstr(s)
301 }
302 return out + "]"
303 }
304