// Package broadcast manages the live subscription fan-out domain. // A BroadcastWorker child process owns the subscription registry and filter // matching. The parent communicates via pipe frame IPC. This package hides // the wire protocol behind named methods so the server never deals with // SubCommand encoding or BroadcastFrame decoding directly. package broadcast import ( "bytes" "runtime" "smesh.lol/pkg/pool" "smesh.lol/pkg/relay/wire" ) // Pre-spawn state. Call PreSpawn() before any GC-triggering allocation // to fork the worker safely (Boehm GC's mmap'd lock + fork = deadlock). // Non-relay commands (sync, crawl, outbox) skip PreSpawn and never pay // the child process cost. var preSpawnedIn chan wire.SubCommand var preSpawnedOut chan wire.BroadcastFrame var preSpawnedFD int32 = -1 // PreSpawn forks the broadcast worker early, before heap pressure can // trigger a GC cycle. Must be called from main() before config loading // for relay mode only. func PreSpawn() { preSpawnedIn = chan wire.SubCommand{} preSpawnedOut = chan wire.BroadcastFrame{} spawn(wire.BroadcastWorker, preSpawnedIn, preSpawnedOut) preSpawnedFD = runtime.LastSpawnedParentFd() } // Broadcaster is the parent-side handle to the broadcast worker domain. type Broadcaster struct { in chan wire.SubCommand out chan wire.BroadcastFrame fd int32 reader pool.FrameReader } // New wires the pre-spawned broadcast worker into a Broadcaster. // Falls back to spawning a new worker if called after init() consumed // the pre-spawned instance (e.g. in test binaries). func New() *Broadcaster { if preSpawnedFD >= 0 { b := &Broadcaster{in: preSpawnedIn, out: preSpawnedOut, fd: preSpawnedFD} preSpawnedFD = -1 return b } in := chan wire.SubCommand{} out := chan wire.BroadcastFrame{} spawn(wire.BroadcastWorker, in, out) return &Broadcaster{in: in, out: out, fd: runtime.LastSpawnedParentFd()} } // FD returns the parent-side pipe FD to register with the transport epoll set. 
func (b *Broadcaster) FD() int32 {
	return b.fd
}

// Valid reports whether the worker is still alive, i.e. whether the handle
// still holds a non-negative pipe FD.
func (b *Broadcaster) Valid() bool {
	return !(b.fd < 0)
}

// OnConnect notifies the broadcast domain of a new WS connection.
// A whitelisted connection is announced with Flags set to 1, any other with 0.
func (b *Broadcaster) OnConnect(fd int32, whitelisted bool) {
	cmd := wire.SubCommand{Op: wire.SubOpNew, ConnFD: fd}
	if whitelisted {
		cmd.Flags = 1
	}
	b.send(cmd)
}

// OnDisconnect notifies the broadcast domain that a WS connection closed.
func (b *Broadcaster) OnDisconnect(fd int32) {
	cmd := wire.SubCommand{Op: wire.SubOpClose, ConnFD: fd}
	b.send(cmd)
}

// OnSubscribe registers a REQ subscription for the connection fd.
// subID identifies the subscription and reqMsg is sent as the command payload.
func (b *Broadcaster) OnSubscribe(fd int32, subID []byte, reqMsg []byte) {
	cmd := wire.SubCommand{
		Op:     wire.SubOpAdd,
		ConnFD: fd,
		SubID:  subID,
		Bytes:  reqMsg,
	}
	b.send(cmd)
}

// OnUnsubscribe removes the subscription subID from the connection fd.
func (b *Broadcaster) OnUnsubscribe(fd int32, subID []byte) {
	cmd := wire.SubCommand{
		Op:     wire.SubOpRemove,
		ConnFD: fd,
		SubID:  subID,
	}
	b.send(cmd)
}

// OnAuth updates the broadcast domain with the authed pubkey for a connection.
func (b *Broadcaster) OnAuth(fd int32, pubkey []byte) {
	cmd := wire.SubCommand{
		Op:     wire.SubOpAuth,
		ConnFD: fd,
		Bytes:  pubkey,
	}
	b.send(cmd)
}

// Fanout dispatches an accepted event for fan-out to matching subscribers.
// senderFD is the connection the event arrived on.
// flags: bit0=needFilter, bit1=nip70Enforce, bit2=marmotOpen.
func (b *Broadcaster) Fanout(rawMsg []byte, senderFD int32, flags uint8) {
	cmd := wire.SubCommand{
		Op:     wire.SubOpBcast,
		ConnFD: senderFD,
		Flags:  flags,
		Bytes:  rawMsg,
	}
	b.send(cmd)
}

// ReadFrame reads one BroadcastFrame from the worker using non-blocking I/O.
// It returns (connFD, payload, ok). ok=false means no complete frame is
// available yet (EAGAIN; partial state is preserved for the next call), the
// worker died (b.fd is set to -1), or the frame was empty or failed to decode.
// Non-broadcast frames (chanID != 2) are skipped, reading up to 8 frames per
// call.
func (b *Broadcaster) ReadFrame() (connFD int32, msg []byte, ok bool) { for attempt := 0; attempt < 8; attempt++ { chanID, data, complete := b.reader.Read(b.fd) if !complete { if runtime.PipeClosed(b.fd) { b.fd = -1 } return 0, nil, false } if chanID != 2 { continue } if len(data) == 0 { return 0, nil, false } var frame wire.BroadcastFrame if err := frame.DecodeFrom(bytes.NewReader(data)); err != nil { return 0, nil, false } return frame.ConnFD, frame.Bytes, true } return 0, nil, false } var sendBuf bytes.Buffer func (b *Broadcaster) send(cmd wire.SubCommand) { if b.fd < 0 || !runtime.PipeFDCanSend(b.fd) { return } sendBuf.Reset() if err := cmd.EncodeTo(&sendBuf); err != nil { return } pool.WriteFrame(b.fd, 1, sendBuf.Bytes()) // chanID=1: `in` chan is first arg }