bus.go raw

   1  package main
   2  
   3  import (
   4  	"common/helpers"
   5  	"common/jsbridge/bc"
   6  	"common/jsbridge/sw"
   7  )
   8  
   9  // Bus — BroadcastChannel connecting shell and relay SWs.
  10  // All client-local, no server involvement.
  11  //
  12  // Satellite SWs may be terminated by the browser at any time.
  13  // BroadcastChannel messages don't wake terminated SWs. We queue messages for
  14  // each destination until it sends a READY announcement, then flush the queue.
  15  
// bus is the channel handle assigned from bc.Open in connectBus; every
// bc.Send in this file goes through it.
var bus bc.BC

// Per-destination readiness tracking and message queue.
var (
	// relayReady is set to true by flushQueue("relay") and is never reset in
	// this file. While it is false, busSend queues relay-bound envelopes
	// instead of sending them.
	relayReady bool
	// relayQueue holds the fully built envelopes deferred by busSend, oldest
	// first. busSend caps it at 256 entries; flushQueue sends and clears it.
	relayQueue []string
)
  23  
  24  func connectBus() {
  25  	broadcastToClients("[\"SW_LOG\",\"shell\",\"bus connecting v" + version + "\"]")
  26  	bus = bc.Open("smesh-bus", func(msg string) { onBusMessage(msg) })
  27  	bc.Send(bus, "{\"from\":\"shell\",\"to\":\"*\",\"msg\":[\"PING\","+jstr(version)+"]}")
  28  	if myPubkey == "" {
  29  		broadcastToClients("[\"NEED_IDENTITY\"]")
  30  	}
  31  	sw.SetTimeout(30000, func() {
  32  		if !relayReady {
  33  			sw.Log("shell: relay SW not responding after 30s")
  34  			broadcastToClients("[\"SW_STATUS\",\"relay-timeout\"]")
  35  		}
  36  	})
  37  }
  38  
  39  func busSend(to, msg string) {
  40  	envelope := "{\"from\":\"shell\",\"to\":" + jstr(to) + ",\"msg\":" + msg + "}"
  41  	if to == "relay" && !relayReady {
  42  		// Fix 1f: cap queue to prevent unbounded growth if relay SW never starts.
  43  		if len(relayQueue) >= 256 {
  44  			relayQueue = relayQueue[1:]
  45  		}
  46  		relayQueue = append(relayQueue, envelope)
  47  		return
  48  	}
  49  	bc.Send(bus, envelope)
  50  }
  51  
  52  func flushQueue(to string) {
  53  	if to != "relay" {
  54  		return
  55  	}
  56  	hadQueued := len(relayQueue) > 0
  57  	relayReady = true
  58  	for _, msg := range relayQueue {
  59  		bc.Send(bus, msg)
  60  	}
  61  	relayQueue = nil
  62  	// Only RESUB if queue was empty — means page's subs went to a dead shell
  63  	// SW instance (thread eviction) and need re-sending. If queue had messages,
  64  	// the flush already delivered them; RESUB would cause a double-subscribe
  65  	// race that tears down in-flight subscriptions.
  66  	if !hadQueued {
  67  		broadcastToClients("[\"RESUB\"]")
  68  	}
  69  }
  70  
  71  func onBusMessage(raw string) {
  72  	from := jsonField(raw, "from")
  73  	if from == "shell" {
  74  		return
  75  	}
  76  	to := jsonField(raw, "to")
  77  	if to != "shell" && to != "*" {
  78  		return
  79  	}
  80  	msg := jsonFieldRaw(raw, "msg")
  81  	if msg == "" {
  82  		return
  83  	}
  84  	w := newMW(msg)
  85  	msgType := w.str()
  86  
  87  	// READY handshake — satellite SW just connected to bus.
  88  	if msgType == "READY" {
  89  		satVer := w.str()
  90  		broadcastToClients("[\"SW_LOG\",\"shell\",\"READY from " + from + " v=" + satVer + "\"]")
  91  		if satVer != "" && satVer != version {
  92  			sw.Log("shell: version mismatch from " + from + ": " + satVer + " != " + version)
  93  			broadcastToClients("[\"FORCE_UPDATE_SW\"," + jstr(from) + "]")
  94  			return
  95  		}
  96  		flushQueue(from)
  97  		return
  98  	}
  99  
 100  	// Any message from relay SW also confirms it's alive.
 101  	if from == "relay" && !relayReady {
 102  		flushQueue("relay")
 103  	}
 104  
 105  	if msgType != "LOG" && msgType != "FWD" && msgType != "FWD_ALL" && msgType != "FWD_BATCH" {
 106  		sw.Log("shell: bus " + from + "→" + msgType)
 107  	}
 108  
 109  	switch msgType {
 110  	case "LOG":
 111  		origin := w.str()
 112  		logMsg := w.str()
 113  		broadcastToClients("[\"SW_LOG\"," + jstr(origin) + "," + jstr(logMsg) + "]")
 114  		return
 115  	case "FWD":
 116  		dispatchFwd(w.str(), w.raw())
 117  	case "FWD_ALL":
 118  		broadcastToClients(w.raw())
 119  	case "FWD_BATCH":
 120  		dispatchFwdBatch(msg)
 121  	case "CRYPTO_REQ":
 122  		from := w.str()
 123  		id := int(w.num())
 124  		method := w.str()
 125  		peerPubkey := w.str()
 126  		data := w.str()
 127  		cryptoProxy(method, peerPubkey, data, func(result, errMsg string) {
 128  			busSend(from, "[\"CRYPTO_RESULT\","+helpers.Itoa(int64(id))+","+jstr(result)+","+jstr(errMsg)+"]")
 129  		})
 130  	case "DM_HISTORY_CLEARED":
 131  		peer := w.str()
 132  		broadcastToClients("[\"DM_HISTORY_CLEARED\"," + jstr(peer) + "]")
 133  	}
 134  }
 135  
 136  func dispatchFwd(clientID, innerMsg string) {
 137  	if clientID == "" {
 138  		if hasMarmotSub(innerMsg) {
 139  			deliverMarmotEvent(innerMsg)
 140  		} else {
 141  			broadcastToClients(innerMsg)
 142  		}
 143  	} else {
 144  		sendToClient(clientID, innerMsg)
 145  	}
 146  }
 147  
 148  // dispatchFwdBatch unpacks a FWD_BATCH message and dispatches each item.
 149  // Format: ["FWD_BATCH", [ ["FWD",clientID,msg], ["FWD_ALL",msg], ... ] ]
 150  func dispatchFwdBatch(raw string) {
 151  	// Find the outer array (second element of the batch message).
 152  	bw := newMW(raw)
 153  	_ = bw.str() // "FWD_BATCH"
 154  	// Now we're at the inner array. Walk it as raw values.
 155  	bw.sep()
 156  	if bw.i >= len(bw.s) || bw.s[bw.i] != '[' {
 157  		return
 158  	}
 159  	bw.i++ // skip '['
 160  	for {
 161  		bw.sep()
 162  		if bw.i >= len(bw.s) || bw.s[bw.i] == ']' {
 163  			break
 164  		}
 165  		// Each item is a full message array like ["FWD","cid",...] or ["FWD_ALL",...]
 166  		itemStart := bw.i
 167  		bw.i = skipval(bw.s, bw.i)
 168  		if bw.i < 0 {
 169  			break
 170  		}
 171  		item := bw.s[itemStart:bw.i]
 172  		iw := newMW(item)
 173  		t := iw.str()
 174  		switch t {
 175  		case "FWD":
 176  			dispatchFwd(iw.str(), iw.raw())
 177  		case "FWD_ALL":
 178  			broadcastToClients(iw.raw())
 179  		}
 180  	}
 181  }
 182  
 183  // hasMarmotSub checks if a FWD inner message is a marmot MLS subscription event.
 184  func hasMarmotSub(msg string) bool {
 185  	const prefix = "[\"EVENT\",\"marmot-sub-"
 186  	return len(msg) > len(prefix) && msg[:len(prefix)] == prefix
 187  }
 188  
 189  // deliverMarmotEvent converts a relay EVENT for a marmot subscription
 190  // into an MLS_PROXY deliverEvent message for the page.
 191  func deliverMarmotEvent(msg string) {
 192  	iw := newMW(msg)
 193  	_ = iw.str()            // "EVENT"
 194  	fullSubID := iw.str()   // "marmot-sub-<n>"
 195  	eventJSON := iw.raw()   // the event object
 196  	subID := fullSubID[11:] // strip "marmot-sub-" (11 chars)
 197  	// Event must be a JSON string (not raw object) — Go WASM calls args[1].String().
 198  	broadcastToClients("[\"MLS_PROXY\",\"deliverEvent\"," + subID + "," + jstr(eventJSON) + "]")
 199  }
 200  
 201  // jsonField extracts a string value (unquoted) for a key from a JSON object string.
 202  func jsonField(json, key string) string {
 203  	v := jsonFieldRaw(json, key)
 204  	if len(v) >= 2 && v[0] == '"' && v[len(v)-1] == '"' {
 205  		return jsonUnescape(v[1 : len(v)-1])
 206  	}
 207  	return v
 208  }
 209  
 210  // jsonUnescape handles JSON string escape sequences: \n \t \\ \" \/ \r
 211  func jsonUnescape(s string) string {
 212  	if len(s) == 0 {
 213  		return s
 214  	}
 215  	// Fast path: no escapes.
 216  	hasEscape := false
 217  	for i := 0; i < len(s); i++ {
 218  		if s[i] == '\\' {
 219  			hasEscape = true
 220  			break
 221  		}
 222  	}
 223  	if !hasEscape {
 224  		return s
 225  	}
 226  	out := make([]byte, 0, len(s))
 227  	for i := 0; i < len(s); i++ {
 228  		if s[i] == '\\' && i+1 < len(s) {
 229  			switch s[i+1] {
 230  			case 'n':
 231  				out = append(out, '\n')
 232  			case 't':
 233  				out = append(out, '\t')
 234  			case 'r':
 235  				out = append(out, '\r')
 236  			case '\\':
 237  				out = append(out, '\\')
 238  			case '"':
 239  				out = append(out, '"')
 240  			case '/':
 241  				out = append(out, '/')
 242  			default:
 243  				out = append(out, s[i], s[i+1])
 244  			}
 245  			i++
 246  		} else {
 247  			out = append(out, s[i])
 248  		}
 249  	}
 250  	return string(out)
 251  }
 252  
 253  // jsonFieldRaw extracts a raw JSON value for a key from a JSON object string.
 254  func jsonFieldRaw(json, key string) string {
 255  	needle := "\"" + key + "\":"
 256  	idx := -1
 257  	for i := 0; i <= len(json)-len(needle); i++ {
 258  		if json[i:i+len(needle)] == needle {
 259  			idx = i + len(needle)
 260  			break
 261  		}
 262  	}
 263  	if idx < 0 {
 264  		return ""
 265  	}
 266  	for idx < len(json) && (json[idx] == ' ' || json[idx] == '\t') {
 267  		idx++
 268  	}
 269  	if idx >= len(json) {
 270  		return ""
 271  	}
 272  	end := skipval(json, idx)
 273  	if end < 0 {
 274  		return ""
 275  	}
 276  	return json[idx:end]
 277  }
 278  
 279  // parseTS parses a numeric string (from JSON) to int64.
 280  func parseTS(s string) int64 {
 281  	var n int64
 282  	for i := 0; i < len(s); i++ {
 283  		if s[i] >= '0' && s[i] <= '9' {
 284  			n = n*10 + int64(s[i]-'0')
 285  		}
 286  	}
 287  	return n
 288  }
 289  
 290  // strsJSON serializes a string slice to a JSON array.
 291  func strsJSON(ss []string) string {
 292  	if len(ss) == 0 {
 293  		return "[]"
 294  	}
 295  	out := "["
 296  	for i, s := range ss {
 297  		if i > 0 {
 298  			out += ","
 299  		}
 300  		out += jstr(s)
 301  	}
 302  	return out + "]"
 303  }
 304