ingest_worker.mx raw

   1  package wire
   2  
   3  import (
   4  	"smesh.lol/pkg/nostr/envelope"
   5  	"smesh.lol/pkg/nostr/event"
   6  	"smesh.lol/pkg/nostr/kind"
   7  	"smesh.lol/pkg/relay/pipeline"
   8  )
   9  
  10  // IngestWorker is the spawn target for a Stage-A ingest worker. Reads
  11  // IngestRequest frames, runs schema validation + sig verify + ephemeral
  12  // fast-reject, writes IngestResponse frames. Loops forever.
  13  //
  14  // Stage A is intentionally narrow: no I/O, no shared state. The parent's
  15  // net domain owns Stage B (dedup, replaceable resolution, WAL append).
  16  // Workers are pure compute, parallelizable across CPUs via fork.
  17  func IngestWorker(in chan IngestRequest, out chan IngestResponse) {
  18  	for {
  19  		req, ok := <-in
  20  		if !ok {
  21  			return
  22  		}
  23  		out <- ingestProcessOne(req)
  24  	}
  25  }
  26  
  27  func ingestProcessOne(req IngestRequest) IngestResponse {
  28  	resp := IngestResponse{
  29  		ReqID: req.ReqID,
  30  		Bytes: req.Bytes,
  31  	}
  32  	label, rem, _ := envelope.Identify(req.Bytes)
  33  	if label != envelope.EventLabel {
  34  		resp.Verdict = VerdictReject
  35  		resp.Reason = []byte("invalid: not an EVENT envelope")
  36  		return resp
  37  	}
  38  	var es envelope.EventSubmission
  39  	if _, err := es.Unmarshal(rem); err != nil || es.E == nil {
  40  		resp.Verdict = VerdictReject
  41  		resp.Reason = []byte("invalid: malformed EVENT")
  42  		return resp
  43  	}
  44  	ev := es.E
  45  
  46  	if r := pipeline.StageA(ev); r != nil {
  47  		resp.Verdict = VerdictReject
  48  		resp.Reason = r.Reason
  49  		return resp
  50  	}
  51  
  52  	resp.Kind = ev.Kind
  53  	resp.CreatedAt = ev.CreatedAt
  54  	copy(resp.Pubkey[:], ev.Pubkey)
  55  	copy(resp.EventID[:], ev.ID)
  56  
  57  	if kind.IsEphemeral(ev.Kind) {
  58  		resp.Verdict = VerdictEphemeral
  59  		return resp
  60  	}
  61  
  62  	resp.Verdict = VerdictAccept
  63  	return resp
  64  }
  65  
  66  // keep event.E imported for type-checking stability
  67  var _ = (*event.E)(nil)
  68