package wire import ( "smesh.lol/pkg/access" "smesh.lol/pkg/nostr/envelope" "smesh.lol/pkg/nostr/filter" ) // bcastConnState holds the broadcast domain's view of one WS connection. type bcastConnState struct { whitelisted bool authed bool authedPubkey [32]byte subs map[string]filter.S } // BroadcastWorker is the spawn target for the broadcast domain. Owns a // registry of connection subscriptions and performs fan-out filter matching // so the net domain's epoll loop isn't blocked by O(N*M*F) work per event. // // All parent→child messages go through a single `in` channel to avoid the // Moxie pipe channel select limitation. SubOpBcast carries broadcast events; // other SubOp* values carry conn/sub lifecycle updates. func BroadcastWorker(in chan SubCommand, frames chan BroadcastFrame) { conns := map[int32]*bcastConnState{} for { cmd, ok := <-in if !ok { return } if cmd.Op == SubOpBcast { bcastHandleReq(cmd, conns, frames) } else { bcastHandleCmd(cmd, conns) } } } func bcastHandleCmd(cmd SubCommand, conns map[int32]*bcastConnState) { switch cmd.Op { case SubOpNew: conns[cmd.ConnFD] = &bcastConnState{ whitelisted: cmd.Flags&1 != 0, subs: map[string]filter.S{}, } case SubOpAdd: c := conns[cmd.ConnFD] if c == nil { return } _, rem, _ := envelope.Identify(cmd.Bytes) var req envelope.Req if _, err := req.Unmarshal(rem); err != nil || req.Filters == nil { return } c.subs[string(cmd.SubID)] = filter.S(*req.Filters) case SubOpRemove: c := conns[cmd.ConnFD] if c == nil { return } delete(c.subs, string(cmd.SubID)) case SubOpClose: delete(conns, cmd.ConnFD) case SubOpAuth: c := conns[cmd.ConnFD] if c == nil { return } c.authed = true if len(cmd.Bytes) == 32 { copy(c.authedPubkey[:], cmd.Bytes) } } } // bcastHandleReq fans out an accepted event to all matching subscriptions. // ConnFD = senderFD to exclude, Flags = filter flags, Bytes = raw EVENT JSON. func bcastHandleReq(cmd SubCommand, conns map[int32]*bcastConnState, frames chan BroadcastFrame) { senderFD := cmd.ConnFD needFilter := cmd.Flags&1 != 0 nip70 := cmd.Flags&2 != 0 marmotOpen := cmd.Flags&4 != 0 _, rem, _ := envelope.Identify(cmd.Bytes) var es envelope.EventSubmission if _, err := es.Unmarshal(rem); err != nil || es.E == nil { return } ev := es.E scratch := []byte{:0:ev.EstimateSize() + 128} er := &envelope.EventResult{Event: ev} for connFD, c := range conns { if connFD == senderFD { continue } if needFilter && !c.whitelisted && !access.CanSee(c.authed, c.authedPubkey[:], ev, nip70, marmotOpen) { continue } for subID, filters := range c.subs { if filters.Match(ev) { er.Subscription = []byte(subID) marshaled := er.Marshal(scratch[:0]) out := []byte{:len(marshaled)} copy(out, marshaled) frames <- BroadcastFrame{ConnFD: connFD, Bytes: out} } } } }