broadcast_worker.mx raw

   1  package wire
   2  
   3  import (
   4  	"smesh.lol/pkg/access"
   5  	"smesh.lol/pkg/nostr/envelope"
   6  	"smesh.lol/pkg/nostr/filter"
   7  )
   8  
   9  // bcastConnState holds the broadcast domain's view of one WS connection.
  10  type bcastConnState struct {
  11  	whitelisted  bool
  12  	authed       bool
  13  	authedPubkey [32]byte
  14  	subs         map[string]filter.S
  15  }
  16  
  17  // BroadcastWorker is the spawn target for the broadcast domain. Owns a
  18  // registry of connection subscriptions and performs fan-out filter matching
  19  // so the net domain's epoll loop isn't blocked by O(N*M*F) work per event.
  20  //
  21  // All parent→child messages go through a single `in` channel to avoid the
  22  // Moxie pipe channel select limitation. SubOpBcast carries broadcast events;
  23  // other SubOp* values carry conn/sub lifecycle updates.
  24  func BroadcastWorker(in chan SubCommand, frames chan BroadcastFrame) {
  25  	conns := map[int32]*bcastConnState{}
  26  	for {
  27  		cmd, ok := <-in
  28  		if !ok {
  29  			return
  30  		}
  31  		if cmd.Op == SubOpBcast {
  32  			bcastHandleReq(cmd, conns, frames)
  33  		} else {
  34  			bcastHandleCmd(cmd, conns)
  35  		}
  36  	}
  37  }
  38  
  39  func bcastHandleCmd(cmd SubCommand, conns map[int32]*bcastConnState) {
  40  	switch cmd.Op {
  41  	case SubOpNew:
  42  		conns[cmd.ConnFD] = &bcastConnState{
  43  			whitelisted: cmd.Flags&1 != 0,
  44  			subs:        map[string]filter.S{},
  45  		}
  46  	case SubOpAdd:
  47  		c := conns[cmd.ConnFD]
  48  		if c == nil {
  49  			return
  50  		}
  51  		_, rem, _ := envelope.Identify(cmd.Bytes)
  52  		var req envelope.Req
  53  		if _, err := req.Unmarshal(rem); err != nil || req.Filters == nil {
  54  			return
  55  		}
  56  		c.subs[string(cmd.SubID)] = filter.S(*req.Filters)
  57  	case SubOpRemove:
  58  		c := conns[cmd.ConnFD]
  59  		if c == nil {
  60  			return
  61  		}
  62  		delete(c.subs, string(cmd.SubID))
  63  	case SubOpClose:
  64  		delete(conns, cmd.ConnFD)
  65  	case SubOpAuth:
  66  		c := conns[cmd.ConnFD]
  67  		if c == nil {
  68  			return
  69  		}
  70  		c.authed = true
  71  		if len(cmd.Bytes) == 32 {
  72  			copy(c.authedPubkey[:], cmd.Bytes)
  73  		}
  74  	}
  75  }
  76  
  77  // bcastHandleReq fans out an accepted event to all matching subscriptions.
  78  // ConnFD = senderFD to exclude, Flags = filter flags, Bytes = raw EVENT JSON.
  79  func bcastHandleReq(cmd SubCommand, conns map[int32]*bcastConnState, frames chan BroadcastFrame) {
  80  	senderFD := cmd.ConnFD
  81  	needFilter := cmd.Flags&1 != 0
  82  	nip70 := cmd.Flags&2 != 0
  83  	marmotOpen := cmd.Flags&4 != 0
  84  
  85  	_, rem, _ := envelope.Identify(cmd.Bytes)
  86  	var es envelope.EventSubmission
  87  	if _, err := es.Unmarshal(rem); err != nil || es.E == nil {
  88  		return
  89  	}
  90  	ev := es.E
  91  
  92  	scratch := []byte{:0:ev.EstimateSize() + 128}
  93  	er := &envelope.EventResult{Event: ev}
  94  	for connFD, c := range conns {
  95  		if connFD == senderFD {
  96  			continue
  97  		}
  98  		if needFilter && !c.whitelisted && !access.CanSee(c.authed, c.authedPubkey[:], ev, nip70, marmotOpen) {
  99  			continue
 100  		}
 101  		for subID, filters := range c.subs {
 102  			if filters.Match(ev) {
 103  				er.Subscription = []byte(subID)
 104  				marshaled := er.Marshal(scratch[:0])
 105  				out := []byte{:len(marshaled)}
 106  				copy(out, marshaled)
 107  				frames <- BroadcastFrame{ConnFD: connFD, Bytes: out}
 108  			}
 109  		}
 110  	}
 111  }
 112