broadcast.mx raw

   1  // Package broadcast manages the live subscription fan-out domain.
   2  // A BroadcastWorker child process owns the subscription registry and filter
   3  // matching. The parent communicates via pipe frame IPC. This package hides
   4  // the wire protocol behind named methods so the server never deals with
   5  // SubCommand encoding or BroadcastFrame decoding directly.
   6  package broadcast
   7  
   8  import (
   9  	"bytes"
  10  	"runtime"
  11  
  12  	"smesh.lol/pkg/pool"
  13  	"smesh.lol/pkg/relay/wire"
  14  )
  15  
  16  // Pre-spawn state. Call PreSpawn() before any GC-triggering allocation
  17  // to fork the worker safely (Boehm GC's mmap'd lock + fork = deadlock).
  18  // Non-relay commands (sync, crawl, outbox) skip PreSpawn and never pay
  19  // the child process cost.
  20  var preSpawnedIn  chan wire.SubCommand
  21  var preSpawnedOut chan wire.BroadcastFrame
  22  var preSpawnedFD  int32 = -1
  23  
  24  // PreSpawn forks the broadcast worker early, before heap pressure can
  25  // trigger a GC cycle. Must be called from main() before config loading
  26  // for relay mode only.
  27  func PreSpawn() {
  28  	preSpawnedIn = chan wire.SubCommand{}
  29  	preSpawnedOut = chan wire.BroadcastFrame{}
  30  	spawn(wire.BroadcastWorker, preSpawnedIn, preSpawnedOut)
  31  	preSpawnedFD = runtime.LastSpawnedParentFd()
  32  }
  33  
  34  // Broadcaster is the parent-side handle to the broadcast worker domain.
  35  type Broadcaster struct {
  36  	in     chan wire.SubCommand
  37  	out    chan wire.BroadcastFrame
  38  	fd     int32
  39  	reader pool.FrameReader
  40  }
  41  
  42  // New wires the pre-spawned broadcast worker into a Broadcaster.
  43  // Falls back to spawning a new worker if called after init() consumed
  44  // the pre-spawned instance (e.g. in test binaries).
  45  func New() *Broadcaster {
  46  	if preSpawnedFD >= 0 {
  47  		b := &Broadcaster{in: preSpawnedIn, out: preSpawnedOut, fd: preSpawnedFD}
  48  		preSpawnedFD = -1
  49  		return b
  50  	}
  51  	in := chan wire.SubCommand{}
  52  	out := chan wire.BroadcastFrame{}
  53  	spawn(wire.BroadcastWorker, in, out)
  54  	return &Broadcaster{in: in, out: out, fd: runtime.LastSpawnedParentFd()}
  55  }
  56  
  57  // FD returns the parent-side pipe FD to register with the transport epoll set.
  58  func (b *Broadcaster) FD() int32 { return b.fd }
  59  
  60  // Valid reports whether the worker is still alive.
  61  func (b *Broadcaster) Valid() bool { return b.fd >= 0 }
  62  
  63  // OnConnect notifies the broadcast domain of a new WS connection.
  64  func (b *Broadcaster) OnConnect(fd int32, whitelisted bool) {
  65  	var flags uint8
  66  	if whitelisted {
  67  		flags = 1
  68  	}
  69  	b.send(wire.SubCommand{Op: wire.SubOpNew, ConnFD: fd, Flags: flags})
  70  }
  71  
  72  // OnDisconnect notifies the broadcast domain that a WS connection closed.
  73  func (b *Broadcaster) OnDisconnect(fd int32) {
  74  	b.send(wire.SubCommand{Op: wire.SubOpClose, ConnFD: fd})
  75  }
  76  
  77  // OnSubscribe registers a REQ subscription.
  78  func (b *Broadcaster) OnSubscribe(fd int32, subID []byte, reqMsg []byte) {
  79  	b.send(wire.SubCommand{Op: wire.SubOpAdd, ConnFD: fd, SubID: subID, Bytes: reqMsg})
  80  }
  81  
  82  // OnUnsubscribe removes a subscription.
  83  func (b *Broadcaster) OnUnsubscribe(fd int32, subID []byte) {
  84  	b.send(wire.SubCommand{Op: wire.SubOpRemove, ConnFD: fd, SubID: subID})
  85  }
  86  
  87  // OnAuth updates the broadcast domain with the authed pubkey for a connection.
  88  func (b *Broadcaster) OnAuth(fd int32, pubkey []byte) {
  89  	b.send(wire.SubCommand{Op: wire.SubOpAuth, ConnFD: fd, Bytes: pubkey})
  90  }
  91  
  92  // Fanout dispatches an accepted event for fan-out to matching subscribers.
  93  // flags: bit0=needFilter, bit1=nip70Enforce, bit2=marmotOpen.
  94  func (b *Broadcaster) Fanout(rawMsg []byte, senderFD int32, flags uint8) {
  95  	b.send(wire.SubCommand{Op: wire.SubOpBcast, ConnFD: senderFD, Flags: flags, Bytes: rawMsg})
  96  }
  97  
  98  // ReadFrame reads one BroadcastFrame from the worker using non-blocking I/O.
  99  // Returns (connFD, payload, ok). ok=false means EAGAIN (partial state preserved
 100  // for next call) or worker died (b.fd set to -1). Skips non-broadcast frames
 101  // (chanID != 2), reading up to 8 frames per call.
 102  func (b *Broadcaster) ReadFrame() (connFD int32, msg []byte, ok bool) {
 103  	for attempt := 0; attempt < 8; attempt++ {
 104  		chanID, data, complete := b.reader.Read(b.fd)
 105  		if !complete {
 106  			if runtime.PipeClosed(b.fd) {
 107  				b.fd = -1
 108  			}
 109  			return 0, nil, false
 110  		}
 111  		if chanID != 2 {
 112  			continue
 113  		}
 114  		if len(data) == 0 {
 115  			return 0, nil, false
 116  		}
 117  		var frame wire.BroadcastFrame
 118  		if err := frame.DecodeFrom(bytes.NewReader(data)); err != nil {
 119  			return 0, nil, false
 120  		}
 121  		return frame.ConnFD, frame.Bytes, true
 122  	}
 123  	return 0, nil, false
 124  }
 125  
 126  var sendBuf bytes.Buffer
 127  
 128  func (b *Broadcaster) send(cmd wire.SubCommand) {
 129  	if b.fd < 0 || !runtime.PipeFDCanSend(b.fd) {
 130  		return
 131  	}
 132  	sendBuf.Reset()
 133  	if err := cmd.EncodeTo(&sendBuf); err != nil {
 134  		return
 135  	}
 136  	pool.WriteFrame(b.fd, 1, sendBuf.Bytes()) // chanID=1: `in` chan is first arg
 137  }
 138