broadcast.mx raw
1 // Package broadcast manages the live subscription fan-out domain.
2 // A BroadcastWorker child process owns the subscription registry and filter
3 // matching. The parent communicates via pipe frame IPC. This package hides
4 // the wire protocol behind named methods so the server never deals with
5 // SubCommand encoding or BroadcastFrame decoding directly.
6 package broadcast
7
8 import (
9 "bytes"
10 "runtime"
11
12 "smesh.lol/pkg/pool"
13 "smesh.lol/pkg/relay/wire"
14 )
15
16 // Pre-spawn state. Call PreSpawn() before any GC-triggering allocation
17 // to fork the worker safely (Boehm GC's mmap'd lock + fork = deadlock).
18 // Non-relay commands (sync, crawl, outbox) skip PreSpawn and never pay
19 // the child process cost.
20 var preSpawnedIn chan wire.SubCommand
21 var preSpawnedOut chan wire.BroadcastFrame
22 var preSpawnedFD int32 = -1
23
24 // PreSpawn forks the broadcast worker early, before heap pressure can
25 // trigger a GC cycle. Must be called from main() before config loading
26 // for relay mode only.
27 func PreSpawn() {
28 preSpawnedIn = chan wire.SubCommand{}
29 preSpawnedOut = chan wire.BroadcastFrame{}
30 spawn(wire.BroadcastWorker, preSpawnedIn, preSpawnedOut)
31 preSpawnedFD = runtime.LastSpawnedParentFd()
32 }
33
34 // Broadcaster is the parent-side handle to the broadcast worker domain.
35 type Broadcaster struct {
36 in chan wire.SubCommand
37 out chan wire.BroadcastFrame
38 fd int32
39 reader pool.FrameReader
40 }
41
42 // New wires the pre-spawned broadcast worker into a Broadcaster.
43 // Falls back to spawning a new worker if called after init() consumed
44 // the pre-spawned instance (e.g. in test binaries).
45 func New() *Broadcaster {
46 if preSpawnedFD >= 0 {
47 b := &Broadcaster{in: preSpawnedIn, out: preSpawnedOut, fd: preSpawnedFD}
48 preSpawnedFD = -1
49 return b
50 }
51 in := chan wire.SubCommand{}
52 out := chan wire.BroadcastFrame{}
53 spawn(wire.BroadcastWorker, in, out)
54 return &Broadcaster{in: in, out: out, fd: runtime.LastSpawnedParentFd()}
55 }
56
57 // FD returns the parent-side pipe FD to register with the transport epoll set.
58 func (b *Broadcaster) FD() int32 { return b.fd }
59
60 // Valid reports whether the worker is still alive.
61 func (b *Broadcaster) Valid() bool { return b.fd >= 0 }
62
63 // OnConnect notifies the broadcast domain of a new WS connection.
64 func (b *Broadcaster) OnConnect(fd int32, whitelisted bool) {
65 var flags uint8
66 if whitelisted {
67 flags = 1
68 }
69 b.send(wire.SubCommand{Op: wire.SubOpNew, ConnFD: fd, Flags: flags})
70 }
71
72 // OnDisconnect notifies the broadcast domain that a WS connection closed.
73 func (b *Broadcaster) OnDisconnect(fd int32) {
74 b.send(wire.SubCommand{Op: wire.SubOpClose, ConnFD: fd})
75 }
76
77 // OnSubscribe registers a REQ subscription.
78 func (b *Broadcaster) OnSubscribe(fd int32, subID []byte, reqMsg []byte) {
79 b.send(wire.SubCommand{Op: wire.SubOpAdd, ConnFD: fd, SubID: subID, Bytes: reqMsg})
80 }
81
82 // OnUnsubscribe removes a subscription.
83 func (b *Broadcaster) OnUnsubscribe(fd int32, subID []byte) {
84 b.send(wire.SubCommand{Op: wire.SubOpRemove, ConnFD: fd, SubID: subID})
85 }
86
87 // OnAuth updates the broadcast domain with the authed pubkey for a connection.
88 func (b *Broadcaster) OnAuth(fd int32, pubkey []byte) {
89 b.send(wire.SubCommand{Op: wire.SubOpAuth, ConnFD: fd, Bytes: pubkey})
90 }
91
92 // Fanout dispatches an accepted event for fan-out to matching subscribers.
93 // flags: bit0=needFilter, bit1=nip70Enforce, bit2=marmotOpen.
94 func (b *Broadcaster) Fanout(rawMsg []byte, senderFD int32, flags uint8) {
95 b.send(wire.SubCommand{Op: wire.SubOpBcast, ConnFD: senderFD, Flags: flags, Bytes: rawMsg})
96 }
97
98 // ReadFrame reads one BroadcastFrame from the worker using non-blocking I/O.
99 // Returns (connFD, payload, ok). ok=false means EAGAIN (partial state preserved
100 // for next call) or worker died (b.fd set to -1). Skips non-broadcast frames
101 // (chanID != 2), reading up to 8 frames per call.
102 func (b *Broadcaster) ReadFrame() (connFD int32, msg []byte, ok bool) {
103 for attempt := 0; attempt < 8; attempt++ {
104 chanID, data, complete := b.reader.Read(b.fd)
105 if !complete {
106 if runtime.PipeClosed(b.fd) {
107 b.fd = -1
108 }
109 return 0, nil, false
110 }
111 if chanID != 2 {
112 continue
113 }
114 if len(data) == 0 {
115 return 0, nil, false
116 }
117 var frame wire.BroadcastFrame
118 if err := frame.DecodeFrom(bytes.NewReader(data)); err != nil {
119 return 0, nil, false
120 }
121 return frame.ConnFD, frame.Bytes, true
122 }
123 return 0, nil, false
124 }
125
126 var sendBuf bytes.Buffer
127
128 func (b *Broadcaster) send(cmd wire.SubCommand) {
129 if b.fd < 0 || !runtime.PipeFDCanSend(b.fd) {
130 return
131 }
132 sendBuf.Reset()
133 if err := cmd.EncodeTo(&sendBuf); err != nil {
134 return
135 }
136 pool.WriteFrame(b.fd, 1, sendBuf.Bytes()) // chanID=1: `in` chan is first arg
137 }
138