package wire import ( "smesh.lol/pkg/nostr/envelope" "smesh.lol/pkg/nostr/event" "smesh.lol/pkg/nostr/kind" "smesh.lol/pkg/relay/pipeline" ) // IngestWorker is the spawn target for a Stage-A ingest worker. Reads // IngestRequest frames, runs schema validation + sig verify + ephemeral // fast-reject, writes IngestResponse frames. Loops forever. // // Stage A is intentionally narrow: no I/O, no shared state. The parent's // net domain owns Stage B (dedup, replaceable resolution, WAL append). // Workers are pure compute, parallelizable across CPUs via fork. func IngestWorker(in chan IngestRequest, out chan IngestResponse) { for { req, ok := <-in if !ok { return } out <- ingestProcessOne(req) } } func ingestProcessOne(req IngestRequest) IngestResponse { resp := IngestResponse{ ReqID: req.ReqID, Bytes: req.Bytes, } label, rem, _ := envelope.Identify(req.Bytes) if label != envelope.EventLabel { resp.Verdict = VerdictReject resp.Reason = []byte("invalid: not an EVENT envelope") return resp } var es envelope.EventSubmission if _, err := es.Unmarshal(rem); err != nil || es.E == nil { resp.Verdict = VerdictReject resp.Reason = []byte("invalid: malformed EVENT") return resp } ev := es.E if r := pipeline.StageA(ev); r != nil { resp.Verdict = VerdictReject resp.Reason = r.Reason return resp } resp.Kind = ev.Kind resp.CreatedAt = ev.CreatedAt copy(resp.Pubkey[:], ev.Pubkey) copy(resp.EventID[:], ev.ID) if kind.IsEphemeral(ev.Kind) { resp.Verdict = VerdictEphemeral return resp } resp.Verdict = VerdictAccept return resp } // keep event.E imported for type-checking stability var _ = (*event.E)(nil)