broadcast_worker.mx raw
1 package wire
2
3 import (
4 "smesh.lol/pkg/access"
5 "smesh.lol/pkg/nostr/envelope"
6 "smesh.lol/pkg/nostr/filter"
7 )
8
9 // bcastConnState holds the broadcast domain's view of one WS connection.
10 type bcastConnState struct {
11 whitelisted bool
12 authed bool
13 authedPubkey [32]byte
14 subs map[string]filter.S
15 }
16
17 // BroadcastWorker is the spawn target for the broadcast domain. Owns a
18 // registry of connection subscriptions and performs fan-out filter matching
19 // so the net domain's epoll loop isn't blocked by O(N*M*F) work per event.
20 //
21 // All parent→child messages go through a single `in` channel to avoid the
22 // Moxie pipe channel select limitation. SubOpBcast carries broadcast events;
23 // other SubOp* values carry conn/sub lifecycle updates.
24 func BroadcastWorker(in chan SubCommand, frames chan BroadcastFrame) {
25 conns := map[int32]*bcastConnState{}
26 for {
27 cmd, ok := <-in
28 if !ok {
29 return
30 }
31 if cmd.Op == SubOpBcast {
32 bcastHandleReq(cmd, conns, frames)
33 } else {
34 bcastHandleCmd(cmd, conns)
35 }
36 }
37 }
38
39 func bcastHandleCmd(cmd SubCommand, conns map[int32]*bcastConnState) {
40 switch cmd.Op {
41 case SubOpNew:
42 conns[cmd.ConnFD] = &bcastConnState{
43 whitelisted: cmd.Flags&1 != 0,
44 subs: map[string]filter.S{},
45 }
46 case SubOpAdd:
47 c := conns[cmd.ConnFD]
48 if c == nil {
49 return
50 }
51 _, rem, _ := envelope.Identify(cmd.Bytes)
52 var req envelope.Req
53 if _, err := req.Unmarshal(rem); err != nil || req.Filters == nil {
54 return
55 }
56 c.subs[string(cmd.SubID)] = filter.S(*req.Filters)
57 case SubOpRemove:
58 c := conns[cmd.ConnFD]
59 if c == nil {
60 return
61 }
62 delete(c.subs, string(cmd.SubID))
63 case SubOpClose:
64 delete(conns, cmd.ConnFD)
65 case SubOpAuth:
66 c := conns[cmd.ConnFD]
67 if c == nil {
68 return
69 }
70 c.authed = true
71 if len(cmd.Bytes) == 32 {
72 copy(c.authedPubkey[:], cmd.Bytes)
73 }
74 }
75 }
76
77 // bcastHandleReq fans out an accepted event to all matching subscriptions.
78 // ConnFD = senderFD to exclude, Flags = filter flags, Bytes = raw EVENT JSON.
79 func bcastHandleReq(cmd SubCommand, conns map[int32]*bcastConnState, frames chan BroadcastFrame) {
80 senderFD := cmd.ConnFD
81 needFilter := cmd.Flags&1 != 0
82 nip70 := cmd.Flags&2 != 0
83 marmotOpen := cmd.Flags&4 != 0
84
85 _, rem, _ := envelope.Identify(cmd.Bytes)
86 var es envelope.EventSubmission
87 if _, err := es.Unmarshal(rem); err != nil || es.E == nil {
88 return
89 }
90 ev := es.E
91
92 scratch := []byte{:0:ev.EstimateSize() + 128}
93 er := &envelope.EventResult{Event: ev}
94 for connFD, c := range conns {
95 if connFD == senderFD {
96 continue
97 }
98 if needFilter && !c.whitelisted && !access.CanSee(c.authed, c.authedPubkey[:], ev, nip70, marmotOpen) {
99 continue
100 }
101 for subID, filters := range c.subs {
102 if filters.Match(ev) {
103 er.Subscription = []byte(subID)
104 marshaled := er.Marshal(scratch[:0])
105 out := []byte{:len(marshaled)}
106 copy(out, marshaled)
107 frames <- BroadcastFrame{ConnFD: connFD, Bytes: out}
108 }
109 }
110 }
111 }
112