// Package wire defines the channel-IPC types crossing spawn boundaries
// in the relay's parallel ingest pipeline. Each type implements moxie.Codec
// so the runtime serializes the actual byte payload (not just the slice
// header) across fork+socketpair pipes.
//
// Ingest topology:
//
//	net (parent)  ─→ workerIn[i]  ─→ worker domain (Stage A)
//	worker domain ─→ workerOut[i] ─→ net (parent) ─→ Stage B in-process
package wire

import (
	"encoding/binary"
	"io"
)

// IngestRequest is a single EVENT envelope in raw form, tagged with the
// ReqID the net domain uses to match the eventual response to the
// connection the envelope arrived on.
type IngestRequest struct {
	ReqID uint32
	Bytes []byte
}

// EncodeTo writes the request to w as an 8-byte little-endian header
// (ReqID, then len(Bytes)) followed by the payload. The payload write is
// skipped entirely when Bytes is empty.
func (r IngestRequest) EncodeTo(w io.Writer) error {
	le := binary.LittleEndian

	var head [8]byte
	le.PutUint32(head[0:4], r.ReqID)
	le.PutUint32(head[4:8], uint32(len(r.Bytes)))

	_, err := w.Write(head[:])
	if err != nil || len(r.Bytes) == 0 {
		return err
	}
	_, err = w.Write(r.Bytes)
	return err
}

// DecodeFrom reads one request from rd in the layout produced by EncodeTo.
// A zero payload length leaves Bytes nil; otherwise Bytes is a freshly
// allocated slice of exactly the advertised length. Any read error from
// io.ReadFull is returned unchanged.
func (r *IngestRequest) DecodeFrom(rd io.Reader) error {
	var head [8]byte
	if _, err := io.ReadFull(rd, head[:]); err != nil {
		return err
	}
	r.ReqID = binary.LittleEndian.Uint32(head[0:4])
	size := binary.LittleEndian.Uint32(head[4:8])
	if size > 0 {
		r.Bytes = []byte{:size}
		_, err := io.ReadFull(rd, r.Bytes)
		return err
	}
	r.Bytes = nil
	return nil
}

// IngestResponse carries the Stage-A verdict back from a worker to net.
// When Verdict == VerdictAccept, the net domain runs Stage B (dedup/save)
// using the parsed metadata; otherwise Reason holds the human-readable
// rejection.
//
// Bytes returns the original raw event payload so that net can hand it
// straight to the WAL on commit without re-parsing or re-serializing.
type IngestResponse struct {
	ReqID     uint32
	Verdict   uint8 // VerdictAccept | VerdictReject | VerdictEphemeral
	Kind      uint16
	CreatedAt int64
	Pubkey    [32]byte
	EventID   [32]byte
	Bytes     []byte // raw event JSON (echoed back from request)
	Reason    []byte // populated only when Verdict == VerdictReject
}

// Values carried in IngestResponse.Verdict.
const (
	VerdictReject    uint8 = 0
	VerdictAccept    uint8 = 1
	VerdictEphemeral uint8 = 2 // accept-but-don't-store path
)

// EncodeTo writes the response to w in little-endian form:
//
//	80-byte fixed header: [4]ReqID [1]Verdict [2]Kind [8]CreatedAt
//	                      [32]Pubkey [32]EventID [1]reserved
//	 8-byte length block: [4]len(Bytes) [4]len(Reason)
//	payloads:             Bytes, then Reason (each skipped when empty)
func (r IngestResponse) EncodeTo(w io.Writer) error {
	le := binary.LittleEndian

	var fixed [80]byte
	le.PutUint32(fixed[0:4], r.ReqID)
	fixed[4] = r.Verdict
	le.PutUint16(fixed[5:7], r.Kind)
	le.PutUint64(fixed[7:15], uint64(r.CreatedAt))
	copy(fixed[15:47], r.Pubkey[:])
	copy(fixed[47:79], r.EventID[:])
	// fixed[79] is reserved and stays at its zero value.
	if _, err := w.Write(fixed[:]); err != nil {
		return err
	}

	var sizes [8]byte
	le.PutUint32(sizes[0:4], uint32(len(r.Bytes)))
	le.PutUint32(sizes[4:8], uint32(len(r.Reason)))
	if _, err := w.Write(sizes[:]); err != nil {
		return err
	}

	for _, part := range [][]byte{r.Bytes, r.Reason} {
		if len(part) == 0 {
			continue
		}
		if _, err := w.Write(part); err != nil {
			return err
		}
	}
	return nil
}

// DecodeFrom reads one response from rd in the layout produced by EncodeTo.
// The reserved header byte is ignored. A zero length leaves the matching
// slice nil; otherwise the slice is freshly allocated at the advertised
// length. Fields are filled in stream order, so a mid-stream read error
// leaves later fields untouched.
func (r *IngestResponse) DecodeFrom(rd io.Reader) error {
	le := binary.LittleEndian

	var fixed [80]byte
	if _, err := io.ReadFull(rd, fixed[:]); err != nil {
		return err
	}
	r.ReqID = le.Uint32(fixed[0:4])
	r.Verdict = fixed[4]
	r.Kind = le.Uint16(fixed[5:7])
	r.CreatedAt = int64(le.Uint64(fixed[7:15]))
	copy(r.Pubkey[:], fixed[15:47])
	copy(r.EventID[:], fixed[47:79])

	var sizes [8]byte
	if _, err := io.ReadFull(rd, sizes[:]); err != nil {
		return err
	}
	bytesLen := le.Uint32(sizes[0:4])
	reasonLen := le.Uint32(sizes[4:8])

	r.Bytes = nil
	if bytesLen > 0 {
		r.Bytes = []byte{:bytesLen}
		if _, err := io.ReadFull(rd, r.Bytes); err != nil {
			return err
		}
	}

	r.Reason = nil
	if reasonLen > 0 {
		r.Reason = []byte{:reasonLen}
		if _, err := io.ReadFull(rd, r.Reason); err != nil {
			return err
		}
	}
	return nil
}

// --- broadcast
// domain wire types ---

// Operation codes carried in SubCommand.Op.
const (
	SubOpNew    uint8 = 1  // new WS connection
	SubOpAdd    uint8 = 2  // REQ: register subscription
	SubOpRemove uint8 = 3  // CLOSE: deregister subscription
	SubOpClose  uint8 = 4  // conn closed
	SubOpAuth   uint8 = 5  // conn authed (Bytes = 32-byte pubkey)
	SubOpBcast  uint8 = 10 // broadcast event; reuses ConnFD=senderFD, Flags=bcast-flags, Bytes=raw EVENT JSON
)

// SubCommand is an update to the broadcast domain's conn/sub registry.
// Wire header (14 bytes, little-endian):
//
//	[4]connFD + [1]op + [1]flags + [4]subIDLen + [4]bytesLen
//
// Flags for SubOpNew: bit0=whitelisted.
type SubCommand struct {
	Op     uint8
	ConnFD int32
	Flags  uint8
	SubID  []byte
	Bytes  []byte // SubOpAdd: raw REQ bytes; SubOpAuth: 32-byte pubkey
}

// EncodeTo writes the 14-byte header to w, followed by SubID and then
// Bytes. An empty slice contributes a zero length and no write.
func (c SubCommand) EncodeTo(w io.Writer) error {
	le := binary.LittleEndian

	var head [14]byte
	le.PutUint32(head[0:4], uint32(c.ConnFD))
	head[4], head[5] = c.Op, c.Flags
	le.PutUint32(head[6:10], uint32(len(c.SubID)))
	le.PutUint32(head[10:14], uint32(len(c.Bytes)))
	if _, err := w.Write(head[:]); err != nil {
		return err
	}

	for _, part := range [][]byte{c.SubID, c.Bytes} {
		if len(part) == 0 {
			continue
		}
		if _, err := w.Write(part); err != nil {
			return err
		}
	}
	return nil
}

// DecodeFrom reads one command from r in the layout produced by EncodeTo.
// A zero length leaves the matching slice nil; otherwise the slice is
// freshly allocated at the advertised length. Read errors from io.ReadFull
// are returned unchanged.
func (c *SubCommand) DecodeFrom(r io.Reader) error {
	le := binary.LittleEndian

	var head [14]byte
	if _, err := io.ReadFull(r, head[:]); err != nil {
		return err
	}
	c.ConnFD = int32(le.Uint32(head[0:4]))
	c.Op = head[4]
	c.Flags = head[5]
	subLen := le.Uint32(head[6:10])
	payloadLen := le.Uint32(head[10:14])

	c.SubID = nil
	if subLen > 0 {
		c.SubID = []byte{:subLen}
		if _, err := io.ReadFull(r, c.SubID); err != nil {
			return err
		}
	}

	c.Bytes = nil
	if payloadLen > 0 {
		c.Bytes = []byte{:payloadLen}
		if _, err := io.ReadFull(r, c.Bytes); err != nil {
			return err
		}
	}
	return nil
}

// BroadcastRequest asks the broadcast domain to fan out an accepted event.
// Flags: bit0=needFilter, bit1=nip70Enforce, bit2=marmotOpen.
type BroadcastRequest struct {
	SenderFD int32
	Flags    uint8
	Bytes    []byte // raw EVENT envelope JSON
}

// EncodeTo writes a 9-byte little-endian header ([4]SenderFD, [1]Flags,
// [4]len(Bytes)) to w, followed by Bytes when it is non-empty.
func (r BroadcastRequest) EncodeTo(w io.Writer) error {
	le := binary.LittleEndian

	var head [9]byte
	le.PutUint32(head[0:4], uint32(r.SenderFD))
	head[4] = r.Flags
	le.PutUint32(head[5:9], uint32(len(r.Bytes)))

	_, err := w.Write(head[:])
	if err != nil || len(r.Bytes) == 0 {
		return err
	}
	_, err = w.Write(r.Bytes)
	return err
}

// DecodeFrom reads one request from rd in the layout produced by EncodeTo.
// A zero payload length leaves Bytes nil; otherwise Bytes is freshly
// allocated at the advertised length.
func (r *BroadcastRequest) DecodeFrom(rd io.Reader) error {
	var head [9]byte
	if _, err := io.ReadFull(rd, head[:]); err != nil {
		return err
	}
	r.SenderFD = int32(binary.LittleEndian.Uint32(head[0:4]))
	r.Flags = head[4]

	size := binary.LittleEndian.Uint32(head[5:9])
	if size == 0 {
		r.Bytes = nil
		return nil
	}
	r.Bytes = []byte{:size}
	_, err := io.ReadFull(rd, r.Bytes)
	return err
}

// BroadcastFrame is one EventResult JSON payload the parent should WS-frame
// and write to ConnFD.
type BroadcastFrame struct {
	ConnFD int32
	Bytes  []byte
}

// EncodeTo writes an 8-byte little-endian header ([4]ConnFD,
// [4]len(Bytes)) to w, followed by Bytes when it is non-empty.
func (f BroadcastFrame) EncodeTo(w io.Writer) error {
	le := binary.LittleEndian

	var head [8]byte
	le.PutUint32(head[0:4], uint32(f.ConnFD))
	le.PutUint32(head[4:8], uint32(len(f.Bytes)))

	_, err := w.Write(head[:])
	if err != nil || len(f.Bytes) == 0 {
		return err
	}
	_, err = w.Write(f.Bytes)
	return err
}

// DecodeFrom reads one frame from r in the layout produced by EncodeTo.
// A zero payload length leaves Bytes nil; otherwise Bytes is freshly
// allocated at the advertised length.
func (f *BroadcastFrame) DecodeFrom(r io.Reader) error {
	var head [8]byte
	if _, err := io.ReadFull(r, head[:]); err != nil {
		return err
	}
	f.ConnFD = int32(binary.LittleEndian.Uint32(head[0:4]))

	size := binary.LittleEndian.Uint32(head[4:8])
	if size == 0 {
		f.Bytes = nil
		return nil
	}
	f.Bytes = []byte{:size}
	_, err := io.ReadFull(r, f.Bytes)
	return err
}

// --- proxy domain wire types ---

// ProxyRequest asks the proxy domain to fetch a URL.
// hdr: [4]ReqID + [4]MaxBytes + [4]URLLen = 12 bytes.
type ProxyRequest struct {
	ReqID    uint32
	MaxBytes uint32
	URL      []byte
}

// EncodeTo writes the 12-byte little-endian header (ReqID, MaxBytes,
// len(URL)) to w, followed by URL when it is non-empty.
func (r ProxyRequest) EncodeTo(w io.Writer) error {
	le := binary.LittleEndian

	var head [12]byte
	le.PutUint32(head[0:4], r.ReqID)
	le.PutUint32(head[4:8], r.MaxBytes)
	le.PutUint32(head[8:12], uint32(len(r.URL)))

	_, err := w.Write(head[:])
	if err != nil || len(r.URL) == 0 {
		return err
	}
	_, err = w.Write(r.URL)
	return err
}

// DecodeFrom reads one request from rd in the layout produced by EncodeTo.
// A zero URL length leaves URL nil; otherwise URL is freshly allocated at
// the advertised length.
func (r *ProxyRequest) DecodeFrom(rd io.Reader) error {
	var head [12]byte
	if _, err := io.ReadFull(rd, head[:]); err != nil {
		return err
	}
	le := binary.LittleEndian
	r.ReqID = le.Uint32(head[0:4])
	r.MaxBytes = le.Uint32(head[4:8])

	urlLen := le.Uint32(head[8:12])
	if urlLen == 0 {
		r.URL = nil
		return nil
	}
	r.URL = []byte{:urlLen}
	_, err := io.ReadFull(rd, r.URL)
	return err
}

// ProxyResponse carries the fetch result back to the net domain.
// hdr: [4]ReqID + [4]Status + [4]CTLen + [4]BodyLen + [4]ErrLen = 20 bytes.
// Status < 0 means a fetch error (Err is populated, Content-Type and Body
// are empty).
type ProxyResponse struct {
	ReqID       uint32
	Status      int32
	ContentType []byte
	Body        []byte
	Err         []byte
}

// EncodeTo writes the 20-byte little-endian header (ReqID, Status, then
// the lengths of ContentType, Body and Err) to w, followed by those three
// slices in that order. Empty slices produce no write.
func (r ProxyResponse) EncodeTo(w io.Writer) error {
	le := binary.LittleEndian

	var head [20]byte
	le.PutUint32(head[0:4], r.ReqID)
	le.PutUint32(head[4:8], uint32(r.Status))
	le.PutUint32(head[8:12], uint32(len(r.ContentType)))
	le.PutUint32(head[12:16], uint32(len(r.Body)))
	le.PutUint32(head[16:20], uint32(len(r.Err)))
	if _, err := w.Write(head[:]); err != nil {
		return err
	}

	for _, part := range [][]byte{r.ContentType, r.Body, r.Err} {
		if len(part) == 0 {
			continue
		}
		if _, err := w.Write(part); err != nil {
			return err
		}
	}
	return nil
}

// DecodeFrom reads one response from rd in the layout produced by
// EncodeTo. A zero length leaves the matching slice nil; otherwise the
// slice is freshly allocated at the advertised length. Fields are filled
// in stream order, so a mid-stream read error leaves later fields
// untouched.
func (r *ProxyResponse) DecodeFrom(rd io.Reader) error {
	le := binary.LittleEndian

	var head [20]byte
	if _, err := io.ReadFull(rd, head[:]); err != nil {
		return err
	}
	r.ReqID = le.Uint32(head[0:4])
	r.Status = int32(le.Uint32(head[4:8]))
	typeLen := le.Uint32(head[8:12])
	bodyLen := le.Uint32(head[12:16])
	errLen := le.Uint32(head[16:20])

	r.ContentType = nil
	if typeLen > 0 {
		r.ContentType = []byte{:typeLen}
		if _, err := io.ReadFull(rd, r.ContentType); err != nil {
			return err
		}
	}

	r.Body = nil
	if bodyLen > 0 {
		r.Body = []byte{:bodyLen}
		if _, err := io.ReadFull(rd, r.Body); err != nil {
			return err
		}
	}

	r.Err = nil
	if errLen > 0 {
		r.Err = []byte{:errLen}
		if _, err := io.ReadFull(rd, r.Err); err != nil {
			return err
		}
	}
	return nil
}

// --- blossom domain wire types ---

// BlossomRequest asks the blossom domain to handle one HTTP request.
// Dir is the blob storage directory; it is sent with every request and the
// worker caches the blossom.Server after first use, which avoids a
// non-channel spawn argument.
// Upstream is an optional CORS-proxy fallback URL prefix (e.g.
// "https://smesh.lol"). When it is set and a local blob is missing, the
// worker fetches the blob from the upstream and serves that result.
// NOTE(review): the original comment gave the fetched path as "/<.ext>",
// which looks truncated (presumably <Upstream>/<hash><.ext>) — confirm.
// hdr: [4]ReqID + [4]DirLen + [4]MethodLen + [4]PathLen + [4]CTLen + [4]BodyLen + [4]UpstreamLen = 28 bytes.
type BlossomRequest struct {
	ReqID       uint32
	Dir         []byte
	Method      []byte
	Path        []byte
	ContentType []byte
	Body        []byte
	Upstream    []byte
}

// EncodeTo writes the 28-byte little-endian header (ReqID followed by the
// six field lengths) to w, then the fields themselves in declaration
// order: Dir, Method, Path, ContentType, Body, Upstream. Empty fields
// produce no write.
func (r BlossomRequest) EncodeTo(w io.Writer) error {
	parts := [][]byte{r.Dir, r.Method, r.Path, r.ContentType, r.Body, r.Upstream}

	var head [28]byte
	binary.LittleEndian.PutUint32(head[0:4], r.ReqID)
	// Length slots start at byte 4, one 4-byte slot per field.
	for i, part := range parts {
		off := 4 + 4*i
		binary.LittleEndian.PutUint32(head[off:off+4], uint32(len(part)))
	}
	if _, err := w.Write(head[:]); err != nil {
		return err
	}

	for _, part := range parts {
		if len(part) == 0 {
			continue
		}
		if _, err := w.Write(part); err != nil {
			return err
		}
	}
	return nil
}

// DecodeFrom reads one request from rd in the layout produced by EncodeTo.
// A zero length leaves the matching field nil; otherwise the field is a
// freshly allocated slice of the advertised length. Fields are filled in
// stream order and decoding stops at the first read error.
func (r *BlossomRequest) DecodeFrom(rd io.Reader) error {
	var head [28]byte
	if _, err := io.ReadFull(rd, head[:]); err != nil {
		return err
	}
	r.ReqID = binary.LittleEndian.Uint32(head[0:4])

	// field reads the payload whose length is stored at head[off:off+4].
	field := func(off int) ([]byte, error) {
		size := binary.LittleEndian.Uint32(head[off : off+4])
		if size == 0 {
			return nil, nil
		}
		buf := []byte{:size}
		_, err := io.ReadFull(rd, buf)
		return buf, err
	}

	var err error
	if r.Dir, err = field(4); err != nil {
		return err
	}
	if r.Method, err = field(8); err != nil {
		return err
	}
	if r.Path, err = field(12); err != nil {
		return err
	}
	if r.ContentType, err = field(16); err != nil {
		return err
	}
	if r.Body, err = field(20); err != nil {
		return err
	}
	r.Upstream, err = field(24)
	return err
}

// BlossomResponse carries the result back to the net domain.
// hdr: [4]ReqID + [4]Status + [8]Size + [4]CTLen + [4]BodyLen = 24 bytes.
// Size is the file size for HEAD responses (Content-Length); 0 otherwise.
type BlossomResponse struct {
	ReqID  uint32
	Status int32
	Size   int64
	CT     []byte
	Body   []byte
}

// EncodeTo writes the 24-byte little-endian header (ReqID, Status, Size,
// len(CT), len(Body)) to w, followed by CT and then Body. Empty slices
// produce no write.
func (r BlossomResponse) EncodeTo(w io.Writer) error {
	le := binary.LittleEndian

	var head [24]byte
	le.PutUint32(head[0:4], r.ReqID)
	le.PutUint32(head[4:8], uint32(r.Status))
	le.PutUint64(head[8:16], uint64(r.Size))
	le.PutUint32(head[16:20], uint32(len(r.CT)))
	le.PutUint32(head[20:24], uint32(len(r.Body)))
	if _, err := w.Write(head[:]); err != nil {
		return err
	}

	for _, part := range [][]byte{r.CT, r.Body} {
		if len(part) == 0 {
			continue
		}
		if _, err := w.Write(part); err != nil {
			return err
		}
	}
	return nil
}

// DecodeFrom reads one response from rd in the layout produced by
// EncodeTo. A zero length leaves the matching slice nil; otherwise the
// slice is freshly allocated at the advertised length. CT is read before
// Body, so a failure while reading CT leaves Body untouched.
func (r *BlossomResponse) DecodeFrom(rd io.Reader) error {
	le := binary.LittleEndian

	var head [24]byte
	if _, err := io.ReadFull(rd, head[:]); err != nil {
		return err
	}
	r.ReqID = le.Uint32(head[0:4])
	r.Status = int32(le.Uint32(head[4:8]))
	r.Size = int64(le.Uint64(head[8:16]))
	typeLen := le.Uint32(head[16:20])
	bodyLen := le.Uint32(head[20:24])

	r.CT = nil
	if typeLen > 0 {
		r.CT = []byte{:typeLen}
		if _, err := io.ReadFull(rd, r.CT); err != nil {
			return err
		}
	}

	r.Body = nil
	if bodyLen > 0 {
		r.Body = []byte{:bodyLen}
		if _, err := io.ReadFull(rd, r.Body); err != nil {
			return err
		}
	}
	return nil
}