wire.mx raw
1 // Package wire defines the channel-IPC types crossing spawn boundaries
2 // in the relay's parallel ingest pipeline. Each type implements moxie.Codec
3 // so the runtime serializes the actual byte payload (not just the slice
4 // header) across fork+socketpair pipes.
5 //
6 // Ingest topology:
7 // net (parent) ─→ workerIn[i] ─→ worker domain (Stage A)
8 // worker domain ─→ workerOut[i] ─→ net (parent) ─→ Stage B in-process
9 package wire
10
11 import (
12 "encoding/binary"
13 "io"
14 )
15
// IngestRequest is the message that carries one EVENT envelope.
//
// ReqID is the correlation ID the net domain uses to route the matching
// response back to the connection the envelope came from.
type IngestRequest struct {
	ReqID uint32 // correlation ID used by the net domain
	Bytes []byte // raw bytes of the EVENT envelope
}
23
// EncodeTo writes r to w as an 8-byte little-endian header
// ([4]ReqID + [4]len(Bytes)) followed by the payload bytes.
//
// The payload write is skipped when Bytes is empty. The first error
// returned by w is passed back unchanged.
func (r IngestRequest) EncodeTo(w io.Writer) error {
	le := binary.LittleEndian

	var head [8]byte
	le.PutUint32(head[:4], r.ReqID)
	le.PutUint32(head[4:], uint32(len(r.Bytes)))
	if _, err := w.Write(head[:]); err != nil {
		return err
	}

	if len(r.Bytes) == 0 {
		return nil
	}
	_, err := w.Write(r.Bytes)
	return err
}
37
// DecodeFrom reads one IngestRequest frame from rd into r.
//
// The frame is an 8-byte little-endian header ([4]ReqID + [4]payload length)
// followed by that many payload bytes. A zero length leaves Bytes nil.
//
// Errors: a failure while reading the header is returned as-is, so io.EOF
// means the stream ended cleanly between frames. Once the header has been
// consumed, a stream that ends before the promised payload arrives is
// reported as io.ErrUnexpectedEOF, never as io.EOF.
func (r *IngestRequest) DecodeFrom(rd io.Reader) error {
	var hdr [8]byte
	if _, err := io.ReadFull(rd, hdr[:]); err != nil {
		return err
	}
	r.ReqID = binary.LittleEndian.Uint32(hdr[0:4])
	l := binary.LittleEndian.Uint32(hdr[4:8])
	if l == 0 {
		r.Bytes = nil
		return nil
	}
	r.Bytes = []byte{:l}
	if _, err := io.ReadFull(rd, r.Bytes); err != nil {
		// io.ReadFull yields plain io.EOF when zero payload bytes were read;
		// after a header that is a truncated frame, not a clean close.
		if err == io.EOF {
			return io.ErrUnexpectedEOF
		}
		return err
	}
	return nil
}
53
// IngestResponse is the Stage-A verdict a worker sends back to the net
// domain for one IngestRequest.
//
// When Verdict is VerdictAccept the net domain runs Stage B (dedup/save)
// using the parsed metadata carried here; for a rejection, Reason holds the
// human-readable explanation.
//
// Bytes echoes the original raw event payload so that net can pass it
// directly to the WAL on commit, with no re-parse or re-serialize step.
type IngestResponse struct {
	ReqID     uint32   // copied from the request for correlation
	Verdict   uint8    // VerdictAccept | VerdictReject | VerdictEphemeral
	Kind      uint16   // parsed metadata
	CreatedAt int64    // parsed metadata
	Pubkey    [32]byte // parsed metadata
	EventID   [32]byte // parsed metadata
	Bytes     []byte   // raw event JSON (echoed back from request)
	Reason    []byte   // populated only when Verdict == VerdictReject
}
71
// Values carried in IngestResponse.Verdict. They are written to the wire as
// a single byte, so the numeric values are part of the protocol.
const (
	VerdictReject    uint8 = 0 // event rejected
	VerdictAccept    uint8 = 1 // event accepted
	VerdictEphemeral uint8 = 2 // accept-but-don't-store path
)
77
// EncodeTo writes r to w in little-endian wire form:
//
//	80-byte record: [4]ReqID [1]Verdict [2]Kind [8]CreatedAt
//	                [32]Pubkey [32]EventID [1]reserved(0)
//	 8-byte record: [4]len(Bytes) [4]len(Reason)
//	payloads:       Bytes, then Reason (each skipped when empty)
//
// The two fixed records are issued as separate writes. The first error
// returned by w aborts the encode and is passed back unchanged.
func (r IngestResponse) EncodeTo(w io.Writer) error {
	le := binary.LittleEndian

	var fixed [80]byte
	le.PutUint32(fixed[:4], r.ReqID)
	fixed[4] = r.Verdict
	le.PutUint16(fixed[5:7], r.Kind)
	le.PutUint64(fixed[7:15], uint64(r.CreatedAt))
	copy(fixed[15:47], r.Pubkey[:])
	copy(fixed[47:79], r.EventID[:])
	fixed[79] = 0 // reserved
	if _, err := w.Write(fixed[:]); err != nil {
		return err
	}

	var sizes [8]byte
	le.PutUint32(sizes[:4], uint32(len(r.Bytes)))
	le.PutUint32(sizes[4:], uint32(len(r.Reason)))
	if _, err := w.Write(sizes[:]); err != nil {
		return err
	}

	for _, payload := range [][]byte{r.Bytes, r.Reason} {
		if len(payload) == 0 {
			continue
		}
		if _, err := w.Write(payload); err != nil {
			return err
		}
	}
	return nil
}
108
// DecodeFrom reads one IngestResponse frame from rd into r.
//
// Layout (little-endian): an 80-byte record ([4]ReqID [1]Verdict [2]Kind
// [8]CreatedAt [32]Pubkey [32]EventID [1]reserved), an 8-byte record
// ([4]len(Bytes) [4]len(Reason)), then the Bytes and Reason payloads. A zero
// length leaves the corresponding field nil. The reserved byte is ignored.
//
// Errors: a failure while reading the first record is returned as-is, so
// io.EOF means the stream ended cleanly between frames. After that record
// has been consumed, a stream that ends early is reported as
// io.ErrUnexpectedEOF, never as io.EOF.
func (r *IngestResponse) DecodeFrom(rd io.Reader) error {
	var hdr [80]byte
	if _, err := io.ReadFull(rd, hdr[:]); err != nil {
		return err
	}
	r.ReqID = binary.LittleEndian.Uint32(hdr[0:4])
	r.Verdict = hdr[4]
	r.Kind = binary.LittleEndian.Uint16(hdr[5:7])
	r.CreatedAt = int64(binary.LittleEndian.Uint64(hdr[7:15]))
	copy(r.Pubkey[:], hdr[15:47])
	copy(r.EventID[:], hdr[47:79])

	// readRest fills buf from rd for every read that follows the first
	// record. io.ReadFull yields plain io.EOF when zero bytes were read; at
	// this point that is a truncated frame, so it is upgraded.
	readRest := func(buf []byte) error {
		_, err := io.ReadFull(rd, buf)
		if err == io.EOF {
			return io.ErrUnexpectedEOF
		}
		return err
	}

	var lens [8]byte
	if err := readRest(lens[:]); err != nil {
		return err
	}
	bl := binary.LittleEndian.Uint32(lens[0:4])
	rl := binary.LittleEndian.Uint32(lens[4:8])
	if bl > 0 {
		r.Bytes = []byte{:bl}
		if err := readRest(r.Bytes); err != nil {
			return err
		}
	} else {
		r.Bytes = nil
	}
	if rl > 0 {
		r.Reason = []byte{:rl}
		if err := readRest(r.Reason); err != nil {
			return err
		}
	} else {
		r.Reason = nil
	}
	return nil
}
144
145 // --- broadcast domain wire types ---
146
// Operation codes carried in SubCommand.Op. They are written to the wire as
// a single byte, so the numeric values are part of the protocol.
const (
	SubOpNew    uint8 = 1 // new WS connection
	SubOpAdd    uint8 = 2 // REQ: register subscription
	SubOpRemove uint8 = 3 // CLOSE: deregister subscription
	SubOpClose  uint8 = 4 // conn closed
	SubOpAuth   uint8 = 5 // conn authed (Bytes = 32-byte pubkey)

	// SubOpBcast broadcasts an event. It reuses the SubCommand fields:
	// ConnFD=senderFD, Flags=bcast-flags, Bytes=raw EVENT JSON.
	SubOpBcast uint8 = 10
)
155
// SubCommand is one update to the broadcast domain's conn/sub registry.
//
// Wire header (14 bytes):
// [4]connFD + [1]op + [1]flags + [4]subIDLen + [4]bytesLen.
//
// For SubOpNew, Flags bit0 means the connection is whitelisted.
type SubCommand struct {
	Op     uint8  // one of the SubOp* codes
	ConnFD int32  // connection the command applies to
	Flags  uint8  // meaning depends on Op
	SubID  []byte // subscription ID
	Bytes  []byte // SubOpAdd: raw REQ bytes; SubOpAuth: 32-byte pubkey
}
166
// EncodeTo writes c to w as a 14-byte little-endian header
// ([4]ConnFD [1]Op [1]Flags [4]len(SubID) [4]len(Bytes)) followed by SubID
// and then Bytes. Empty payloads are not written.
//
// The first error returned by w aborts the encode and is passed back
// unchanged.
func (c SubCommand) EncodeTo(w io.Writer) error {
	le := binary.LittleEndian

	var head [14]byte
	le.PutUint32(head[:4], uint32(c.ConnFD))
	head[4], head[5] = c.Op, c.Flags
	le.PutUint32(head[6:10], uint32(len(c.SubID)))
	le.PutUint32(head[10:], uint32(len(c.Bytes)))
	if _, err := w.Write(head[:]); err != nil {
		return err
	}

	for _, payload := range [][]byte{c.SubID, c.Bytes} {
		if len(payload) == 0 {
			continue
		}
		if _, err := w.Write(payload); err != nil {
			return err
		}
	}
	return nil
}
188
// DecodeFrom reads one SubCommand frame from r into c.
//
// The frame is a 14-byte little-endian header ([4]ConnFD [1]Op [1]Flags
// [4]SubID length [4]Bytes length) followed by the SubID and Bytes payloads.
// A zero length leaves the corresponding field nil.
//
// Errors: a failure while reading the header is returned as-is, so io.EOF
// means the stream ended cleanly between frames. Once the header has been
// consumed, a stream that ends before the promised payloads arrive is
// reported as io.ErrUnexpectedEOF, never as io.EOF.
func (c *SubCommand) DecodeFrom(r io.Reader) error {
	var hdr [14]byte
	if _, err := io.ReadFull(r, hdr[:]); err != nil {
		return err
	}
	c.ConnFD = int32(binary.LittleEndian.Uint32(hdr[0:4]))
	c.Op = hdr[4]
	c.Flags = hdr[5]
	sl := binary.LittleEndian.Uint32(hdr[6:10])
	bl := binary.LittleEndian.Uint32(hdr[10:14])

	// readPayload returns nil for a zero length, otherwise a freshly
	// allocated n-byte slice filled from r. io.ReadFull yields plain io.EOF
	// when zero bytes were read; after the header that is a truncated
	// frame, so it is upgraded.
	readPayload := func(n uint32) ([]byte, error) {
		if n == 0 {
			return nil, nil
		}
		buf := []byte{:n}
		_, err := io.ReadFull(r, buf)
		if err == io.EOF {
			err = io.ErrUnexpectedEOF
		}
		return buf, err
	}

	var err error
	if c.SubID, err = readPayload(sl); err != nil {
		return err
	}
	c.Bytes, err = readPayload(bl)
	return err
}
217
// BroadcastRequest tells the broadcast domain to fan out an accepted event.
//
// Flags bits: bit0=needFilter, bit1=nip70Enforce, bit2=marmotOpen.
type BroadcastRequest struct {
	SenderFD int32  // FD of the sending connection
	Flags    uint8  // bit set described above
	Bytes    []byte // raw EVENT envelope JSON
}
225
// EncodeTo writes r to w as a 9-byte little-endian header
// ([4]SenderFD [1]Flags [4]len(Bytes)) followed by the payload bytes.
//
// The payload write is skipped when Bytes is empty. The first error
// returned by w is passed back unchanged.
func (r BroadcastRequest) EncodeTo(w io.Writer) error {
	le := binary.LittleEndian

	var head [9]byte
	le.PutUint32(head[:4], uint32(r.SenderFD))
	head[4] = r.Flags
	le.PutUint32(head[5:], uint32(len(r.Bytes)))
	if _, err := w.Write(head[:]); err != nil {
		return err
	}

	if len(r.Bytes) == 0 {
		return nil
	}
	_, err := w.Write(r.Bytes)
	return err
}
240
// DecodeFrom reads one BroadcastRequest frame from rd into r.
//
// The frame is a 9-byte little-endian header ([4]SenderFD [1]Flags
// [4]payload length) followed by that many payload bytes. A zero length
// leaves Bytes nil.
//
// Errors: a failure while reading the header is returned as-is, so io.EOF
// means the stream ended cleanly between frames. Once the header has been
// consumed, a stream that ends before the promised payload arrives is
// reported as io.ErrUnexpectedEOF, never as io.EOF.
func (r *BroadcastRequest) DecodeFrom(rd io.Reader) error {
	var hdr [9]byte
	if _, err := io.ReadFull(rd, hdr[:]); err != nil {
		return err
	}
	r.SenderFD = int32(binary.LittleEndian.Uint32(hdr[0:4]))
	r.Flags = hdr[4]
	bl := binary.LittleEndian.Uint32(hdr[5:9])
	if bl == 0 {
		r.Bytes = nil
		return nil
	}
	r.Bytes = []byte{:bl}
	if _, err := io.ReadFull(rd, r.Bytes); err != nil {
		// io.ReadFull yields plain io.EOF when zero payload bytes were read;
		// after a header that is a truncated frame, not a clean close.
		if err == io.EOF {
			return io.ErrUnexpectedEOF
		}
		return err
	}
	return nil
}
257
// BroadcastFrame holds a single EventResult JSON payload. The parent is
// expected to wrap it in a WS frame and write it to ConnFD.
type BroadcastFrame struct {
	ConnFD int32  // destination connection
	Bytes  []byte // EventResult JSON payload
}
264
// EncodeTo writes f to w as an 8-byte little-endian header
// ([4]ConnFD [4]len(Bytes)) followed by the payload bytes.
//
// The payload write is skipped when Bytes is empty. The first error
// returned by w is passed back unchanged.
func (f BroadcastFrame) EncodeTo(w io.Writer) error {
	le := binary.LittleEndian

	var head [8]byte
	le.PutUint32(head[:4], uint32(f.ConnFD))
	le.PutUint32(head[4:], uint32(len(f.Bytes)))
	if _, err := w.Write(head[:]); err != nil {
		return err
	}

	if len(f.Bytes) == 0 {
		return nil
	}
	_, err := w.Write(f.Bytes)
	return err
}
278
// DecodeFrom reads one BroadcastFrame from r into f.
//
// The frame is an 8-byte little-endian header ([4]ConnFD [4]payload length)
// followed by that many payload bytes. A zero length leaves Bytes nil.
//
// Errors: a failure while reading the header is returned as-is, so io.EOF
// means the stream ended cleanly between frames. Once the header has been
// consumed, a stream that ends before the promised payload arrives is
// reported as io.ErrUnexpectedEOF, never as io.EOF.
func (f *BroadcastFrame) DecodeFrom(r io.Reader) error {
	var hdr [8]byte
	if _, err := io.ReadFull(r, hdr[:]); err != nil {
		return err
	}
	f.ConnFD = int32(binary.LittleEndian.Uint32(hdr[0:4]))
	bl := binary.LittleEndian.Uint32(hdr[4:8])
	if bl == 0 {
		f.Bytes = nil
		return nil
	}
	f.Bytes = []byte{:bl}
	if _, err := io.ReadFull(r, f.Bytes); err != nil {
		// io.ReadFull yields plain io.EOF when zero payload bytes were read;
		// after a header that is a truncated frame, not a clean close.
		if err == io.EOF {
			return io.ErrUnexpectedEOF
		}
		return err
	}
	return nil
}
294
295 // --- proxy domain wire types ---
296
// ProxyRequest tells the proxy domain to fetch a URL.
//
// Wire header (12 bytes): [4]ReqID + [4]MaxBytes + [4]URLLen.
type ProxyRequest struct {
	ReqID    uint32 // correlation ID echoed in the response
	MaxBytes uint32 // NOTE(review): presumably a cap on the fetched size — confirm against the proxy worker
	URL      []byte // URL to fetch
}
304
// EncodeTo writes r to w as a 12-byte little-endian header
// ([4]ReqID [4]MaxBytes [4]len(URL)) followed by the URL bytes.
//
// The URL write is skipped when URL is empty. The first error returned by
// w is passed back unchanged.
func (r ProxyRequest) EncodeTo(w io.Writer) error {
	le := binary.LittleEndian

	var head [12]byte
	le.PutUint32(head[:4], r.ReqID)
	le.PutUint32(head[4:8], r.MaxBytes)
	le.PutUint32(head[8:], uint32(len(r.URL)))
	if _, err := w.Write(head[:]); err != nil {
		return err
	}

	if len(r.URL) == 0 {
		return nil
	}
	_, err := w.Write(r.URL)
	return err
}
319
// DecodeFrom reads one ProxyRequest frame from rd into r.
//
// The frame is a 12-byte little-endian header ([4]ReqID [4]MaxBytes
// [4]URL length) followed by that many URL bytes. A zero length leaves URL
// nil.
//
// Errors: a failure while reading the header is returned as-is, so io.EOF
// means the stream ended cleanly between frames. Once the header has been
// consumed, a stream that ends before the promised URL arrives is reported
// as io.ErrUnexpectedEOF, never as io.EOF.
func (r *ProxyRequest) DecodeFrom(rd io.Reader) error {
	var hdr [12]byte
	if _, err := io.ReadFull(rd, hdr[:]); err != nil {
		return err
	}
	r.ReqID = binary.LittleEndian.Uint32(hdr[0:4])
	r.MaxBytes = binary.LittleEndian.Uint32(hdr[4:8])
	ul := binary.LittleEndian.Uint32(hdr[8:12])
	if ul == 0 {
		r.URL = nil
		return nil
	}
	r.URL = []byte{:ul}
	if _, err := io.ReadFull(rd, r.URL); err != nil {
		// io.ReadFull yields plain io.EOF when zero payload bytes were read;
		// after a header that is a truncated frame, not a clean close.
		if err == io.EOF {
			return io.ErrUnexpectedEOF
		}
		return err
	}
	return nil
}
336
// ProxyResponse returns the outcome of a proxy fetch to the net domain.
//
// Wire header (20 bytes):
// [4]ReqID + [4]Status + [4]CTLen + [4]BodyLen + [4]ErrLen.
//
// A negative Status signals a fetch error: Err is populated and both
// ContentType and Body are empty.
type ProxyResponse struct {
	ReqID       uint32 // correlation ID
	Status      int32  // < 0 on fetch error
	ContentType []byte // Content-Type of the fetched resource
	Body        []byte // fetched body
	Err         []byte // error text, set when Status < 0
}
347
// EncodeTo writes r to w as a 20-byte little-endian header
// ([4]ReqID [4]Status [4]len(ContentType) [4]len(Body) [4]len(Err))
// followed by ContentType, Body and Err in that order. Empty payloads are
// not written.
//
// The first error returned by w aborts the encode and is passed back
// unchanged.
func (r ProxyResponse) EncodeTo(w io.Writer) error {
	le := binary.LittleEndian

	var head [20]byte
	le.PutUint32(head[:4], r.ReqID)
	le.PutUint32(head[4:8], uint32(r.Status))
	le.PutUint32(head[8:12], uint32(len(r.ContentType)))
	le.PutUint32(head[12:16], uint32(len(r.Body)))
	le.PutUint32(head[16:], uint32(len(r.Err)))
	if _, err := w.Write(head[:]); err != nil {
		return err
	}

	for _, payload := range [][]byte{r.ContentType, r.Body, r.Err} {
		if len(payload) == 0 {
			continue
		}
		if _, err := w.Write(payload); err != nil {
			return err
		}
	}
	return nil
}
374
// DecodeFrom reads one ProxyResponse frame from rd into r.
//
// The frame is a 20-byte little-endian header ([4]ReqID [4]Status
// [4]ContentType length [4]Body length [4]Err length) followed by the three
// payloads in that order. A zero length leaves the corresponding field nil.
//
// Errors: a failure while reading the header is returned as-is, so io.EOF
// means the stream ended cleanly between frames. Once the header has been
// consumed, a stream that ends before the promised payloads arrive is
// reported as io.ErrUnexpectedEOF, never as io.EOF.
func (r *ProxyResponse) DecodeFrom(rd io.Reader) error {
	var hdr [20]byte
	if _, err := io.ReadFull(rd, hdr[:]); err != nil {
		return err
	}
	r.ReqID = binary.LittleEndian.Uint32(hdr[0:4])
	r.Status = int32(binary.LittleEndian.Uint32(hdr[4:8]))
	ctl := binary.LittleEndian.Uint32(hdr[8:12])
	bl := binary.LittleEndian.Uint32(hdr[12:16])
	el := binary.LittleEndian.Uint32(hdr[16:20])

	// readPayload returns nil for a zero length, otherwise a freshly
	// allocated n-byte slice filled from rd. io.ReadFull yields plain io.EOF
	// when zero bytes were read; after the header that is a truncated
	// frame, so it is upgraded.
	readPayload := func(n uint32) ([]byte, error) {
		if n == 0 {
			return nil, nil
		}
		buf := []byte{:n}
		_, err := io.ReadFull(rd, buf)
		if err == io.EOF {
			err = io.ErrUnexpectedEOF
		}
		return buf, err
	}

	var err error
	if r.ContentType, err = readPayload(ctl); err != nil {
		return err
	}
	if r.Body, err = readPayload(bl); err != nil {
		return err
	}
	r.Err, err = readPayload(el)
	return err
}
409
410 // --- blossom domain wire types ---
411
// BlossomRequest hands one HTTP request to the blossom domain.
//
// Dir is the blob storage directory. It travels with every request (the
// worker caches the blossom.Server after first use), which avoids needing a
// non-channel spawn argument.
//
// Upstream is an optional CORS-proxy fallback URL prefix (e.g.
// "https://smesh.lol"). When it is set and a blob is missing locally, the
// worker fetches <Upstream>/<hash><.ext> and serves that result.
//
// Wire header (28 bytes): [4]ReqID + [4]DirLen + [4]MethodLen + [4]PathLen +
// [4]CTLen + [4]BodyLen + [4]UpstreamLen.
type BlossomRequest struct {
	ReqID       uint32 // correlation ID
	Dir         []byte // blob storage directory
	Method      []byte // HTTP method
	Path        []byte // request path
	ContentType []byte // request Content-Type
	Body        []byte // request body
	Upstream    []byte // optional fallback URL prefix
}
428
// EncodeTo writes r to w as a 28-byte little-endian header (ReqID followed
// by the six payload lengths) and then the payloads in header order: Dir,
// Method, Path, ContentType, Body, Upstream. Empty payloads are not written.
//
// The first error returned by w aborts the encode and is passed back
// unchanged.
func (r BlossomRequest) EncodeTo(w io.Writer) error {
	le := binary.LittleEndian

	var head [28]byte
	le.PutUint32(head[:4], r.ReqID)
	le.PutUint32(head[4:8], uint32(len(r.Dir)))
	le.PutUint32(head[8:12], uint32(len(r.Method)))
	le.PutUint32(head[12:16], uint32(len(r.Path)))
	le.PutUint32(head[16:20], uint32(len(r.ContentType)))
	le.PutUint32(head[20:24], uint32(len(r.Body)))
	le.PutUint32(head[24:], uint32(len(r.Upstream)))
	if _, err := w.Write(head[:]); err != nil {
		return err
	}

	payloads := [][]byte{r.Dir, r.Method, r.Path, r.ContentType, r.Body, r.Upstream}
	for _, payload := range payloads {
		if len(payload) == 0 {
			continue
		}
		if _, err := w.Write(payload); err != nil {
			return err
		}
	}
	return nil
}
450
// DecodeFrom reads one BlossomRequest frame from rd into r.
//
// The frame is a 28-byte little-endian header ([4]ReqID followed by the
// lengths of Dir, Method, Path, ContentType, Body and Upstream) and then the
// six payloads in that order. A zero length leaves the corresponding field
// nil.
//
// Errors: a failure while reading the header is returned as-is, so io.EOF
// means the stream ended cleanly between frames. Once the header has been
// consumed, a stream that ends before the promised payloads arrive is
// reported as io.ErrUnexpectedEOF, never as io.EOF.
func (r *BlossomRequest) DecodeFrom(rd io.Reader) error {
	var hdr [28]byte
	if _, err := io.ReadFull(rd, hdr[:]); err != nil {
		return err
	}
	r.ReqID = binary.LittleEndian.Uint32(hdr[0:4])
	dl := binary.LittleEndian.Uint32(hdr[4:8])
	ml := binary.LittleEndian.Uint32(hdr[8:12])
	pl := binary.LittleEndian.Uint32(hdr[12:16])
	ctl := binary.LittleEndian.Uint32(hdr[16:20])
	bl := binary.LittleEndian.Uint32(hdr[20:24])
	ul := binary.LittleEndian.Uint32(hdr[24:28])

	// readSlice returns nil for a zero length, otherwise a freshly allocated
	// n-byte slice filled from rd. io.ReadFull yields plain io.EOF when zero
	// bytes were read; after the header that is a truncated frame, so it is
	// upgraded.
	readSlice := func(n uint32) ([]byte, error) {
		if n == 0 {
			return nil, nil
		}
		b := []byte{:n}
		_, err := io.ReadFull(rd, b)
		if err == io.EOF {
			err = io.ErrUnexpectedEOF
		}
		return b, err
	}

	var err error
	if r.Dir, err = readSlice(dl); err != nil {
		return err
	}
	if r.Method, err = readSlice(ml); err != nil {
		return err
	}
	if r.Path, err = readSlice(pl); err != nil {
		return err
	}
	if r.ContentType, err = readSlice(ctl); err != nil {
		return err
	}
	if r.Body, err = readSlice(bl); err != nil {
		return err
	}
	r.Upstream, err = readSlice(ul)
	return err
}
490
// BlossomResponse returns the blossom domain's result to the net domain.
//
// Wire header (24 bytes):
// [4]ReqID + [4]Status + [8]Size + [4]CTLen + [4]BodyLen.
//
// Size is the file size for HEAD responses (Content-Length) and 0 otherwise.
type BlossomResponse struct {
	ReqID  uint32 // correlation ID
	Status int32  // response status
	Size   int64  // file size for HEAD responses; 0 otherwise
	CT     []byte // response Content-Type
	Body   []byte // response body
}
501
// EncodeTo writes r to w as a 24-byte little-endian header
// ([4]ReqID [4]Status [8]Size [4]len(CT) [4]len(Body)) followed by CT and
// then Body. Empty payloads are not written.
//
// The first error returned by w aborts the encode and is passed back
// unchanged.
func (r BlossomResponse) EncodeTo(w io.Writer) error {
	le := binary.LittleEndian

	var head [24]byte
	le.PutUint32(head[:4], r.ReqID)
	le.PutUint32(head[4:8], uint32(r.Status))
	le.PutUint64(head[8:16], uint64(r.Size))
	le.PutUint32(head[16:20], uint32(len(r.CT)))
	le.PutUint32(head[20:], uint32(len(r.Body)))
	if _, err := w.Write(head[:]); err != nil {
		return err
	}

	for _, payload := range [][]byte{r.CT, r.Body} {
		if len(payload) == 0 {
			continue
		}
		if _, err := w.Write(payload); err != nil {
			return err
		}
	}
	return nil
}
523
// DecodeFrom reads one BlossomResponse frame from rd into r.
//
// The frame is a 24-byte little-endian header ([4]ReqID [4]Status [8]Size
// [4]CT length [4]Body length) followed by the CT and Body payloads. A zero
// length leaves the corresponding field nil.
//
// Errors: a failure while reading the header is returned as-is, so io.EOF
// means the stream ended cleanly between frames. Once the header has been
// consumed, a stream that ends before the promised payloads arrive is
// reported as io.ErrUnexpectedEOF, never as io.EOF.
func (r *BlossomResponse) DecodeFrom(rd io.Reader) error {
	var hdr [24]byte
	if _, err := io.ReadFull(rd, hdr[:]); err != nil {
		return err
	}
	r.ReqID = binary.LittleEndian.Uint32(hdr[0:4])
	r.Status = int32(binary.LittleEndian.Uint32(hdr[4:8]))
	r.Size = int64(binary.LittleEndian.Uint64(hdr[8:16]))
	ctl := binary.LittleEndian.Uint32(hdr[16:20])
	bl := binary.LittleEndian.Uint32(hdr[20:24])

	// readPayload returns nil for a zero length, otherwise a freshly
	// allocated n-byte slice filled from rd. io.ReadFull yields plain io.EOF
	// when zero bytes were read; after the header that is a truncated
	// frame, so it is upgraded.
	readPayload := func(n uint32) ([]byte, error) {
		if n == 0 {
			return nil, nil
		}
		buf := []byte{:n}
		_, err := io.ReadFull(rd, buf)
		if err == io.EOF {
			err = io.ErrUnexpectedEOF
		}
		return buf, err
	}

	var err error
	if r.CT, err = readPayload(ctl); err != nil {
		return err
	}
	r.Body, err = readPayload(bl)
	return err
}
550
551