ingest_worker.mx raw
1 package wire
2
3 import (
4 "smesh.lol/pkg/nostr/envelope"
5 "smesh.lol/pkg/nostr/event"
6 "smesh.lol/pkg/nostr/kind"
7 "smesh.lol/pkg/relay/pipeline"
8 )
9
10 // IngestWorker is the spawn target for a Stage-A ingest worker. Reads
11 // IngestRequest frames, runs schema validation + sig verify + ephemeral
12 // fast-reject, writes IngestResponse frames. Loops forever.
13 //
14 // Stage A is intentionally narrow: no I/O, no shared state. The parent's
15 // net domain owns Stage B (dedup, replaceable resolution, WAL append).
16 // Workers are pure compute, parallelizable across CPUs via fork.
17 func IngestWorker(in chan IngestRequest, out chan IngestResponse) {
18 for {
19 req, ok := <-in
20 if !ok {
21 return
22 }
23 out <- ingestProcessOne(req)
24 }
25 }
26
27 func ingestProcessOne(req IngestRequest) IngestResponse {
28 resp := IngestResponse{
29 ReqID: req.ReqID,
30 Bytes: req.Bytes,
31 }
32 label, rem, _ := envelope.Identify(req.Bytes)
33 if label != envelope.EventLabel {
34 resp.Verdict = VerdictReject
35 resp.Reason = []byte("invalid: not an EVENT envelope")
36 return resp
37 }
38 var es envelope.EventSubmission
39 if _, err := es.Unmarshal(rem); err != nil || es.E == nil {
40 resp.Verdict = VerdictReject
41 resp.Reason = []byte("invalid: malformed EVENT")
42 return resp
43 }
44 ev := es.E
45
46 if r := pipeline.StageA(ev); r != nil {
47 resp.Verdict = VerdictReject
48 resp.Reason = r.Reason
49 return resp
50 }
51
52 resp.Kind = ev.Kind
53 resp.CreatedAt = ev.CreatedAt
54 copy(resp.Pubkey[:], ev.Pubkey)
55 copy(resp.EventID[:], ev.ID)
56
57 if kind.IsEphemeral(ev.Kind) {
58 resp.Verdict = VerdictEphemeral
59 return resp
60 }
61
62 resp.Verdict = VerdictAccept
63 return resp
64 }
65
// Blank declaration that keeps the event package import in use: nothing
// else in this file refers to the package by name, and an unused import
// would fail to compile. It has no runtime effect.
var _ = (*event.E)(nil)
68