wire.mx raw

   1  // Package wire defines the channel-IPC types crossing spawn boundaries
   2  // in the relay's parallel ingest pipeline. Each type implements moxie.Codec
   3  // so the runtime serializes the actual byte payload (not just the slice
   4  // header) across fork+socketpair pipes.
   5  //
   6  // Ingest topology:
   7  //   net (parent) ─→ workerIn[i] ─→ worker domain (Stage A)
   8  //   worker domain ─→ workerOut[i] ─→ net (parent) ─→ Stage B in-process
   9  package wire
  10  
  11  import (
  12  	"encoding/binary"
  13  	"io"
  14  )
  15  
  16  // IngestRequest carries one EVENT envelope's raw bytes + a reqID
  17  // the net domain uses to correlate responses back to the originating
  18  // connection.
  19  type IngestRequest struct {
  20  	ReqID uint32
  21  	Bytes []byte
  22  }
  23  
  24  func (r IngestRequest) EncodeTo(w io.Writer) error {
  25  	var hdr [8]byte
  26  	binary.LittleEndian.PutUint32(hdr[0:4], r.ReqID)
  27  	binary.LittleEndian.PutUint32(hdr[4:8], uint32(len(r.Bytes)))
  28  	if _, err := w.Write(hdr[:]); err != nil {
  29  		return err
  30  	}
  31  	if len(r.Bytes) > 0 {
  32  		_, err := w.Write(r.Bytes)
  33  		return err
  34  	}
  35  	return nil
  36  }
  37  
  38  func (r *IngestRequest) DecodeFrom(rd io.Reader) error {
  39  	var hdr [8]byte
  40  	if _, err := io.ReadFull(rd, hdr[:]); err != nil {
  41  		return err
  42  	}
  43  	r.ReqID = binary.LittleEndian.Uint32(hdr[0:4])
  44  	l := binary.LittleEndian.Uint32(hdr[4:8])
  45  	if l == 0 {
  46  		r.Bytes = nil
  47  		return nil
  48  	}
  49  	r.Bytes = []byte{:l}
  50  	_, err := io.ReadFull(rd, r.Bytes)
  51  	return err
  52  }
  53  
  54  // IngestResponse carries Stage-A verdict back from worker to net. If
  55  // Verdict == VerdictAccept, the net domain runs Stage B (dedup/save)
  56  // using the parsed metadata. Otherwise Reason carries the human-readable
  57  // rejection.
  58  //
  59  // Bytes carries the original raw event payload back so net can hand it
  60  // straight to the WAL on commit without re-parsing or re-serializing.
  61  type IngestResponse struct {
  62  	ReqID     uint32
  63  	Verdict   uint8 // VerdictAccept | VerdictReject | VerdictEphemeral
  64  	Kind      uint16
  65  	CreatedAt int64
  66  	Pubkey    [32]byte
  67  	EventID   [32]byte
  68  	Bytes     []byte // raw event JSON (echoed back from request)
  69  	Reason    []byte // populated only when Verdict == VerdictReject
  70  }
  71  
  72  const (
  73  	VerdictReject    uint8 = 0
  74  	VerdictAccept    uint8 = 1
  75  	VerdictEphemeral uint8 = 2 // accept-but-don't-store path
  76  )
  77  
  78  func (r IngestResponse) EncodeTo(w io.Writer) error {
  79  	var hdr [80]byte
  80  	binary.LittleEndian.PutUint32(hdr[0:4], r.ReqID)
  81  	hdr[4] = r.Verdict
  82  	binary.LittleEndian.PutUint16(hdr[5:7], r.Kind)
  83  	binary.LittleEndian.PutUint64(hdr[7:15], uint64(r.CreatedAt))
  84  	copy(hdr[15:47], r.Pubkey[:])
  85  	copy(hdr[47:79], r.EventID[:])
  86  	hdr[79] = 0 // reserved
  87  	if _, err := w.Write(hdr[:]); err != nil {
  88  		return err
  89  	}
  90  	var lens [8]byte
  91  	binary.LittleEndian.PutUint32(lens[0:4], uint32(len(r.Bytes)))
  92  	binary.LittleEndian.PutUint32(lens[4:8], uint32(len(r.Reason)))
  93  	if _, err := w.Write(lens[:]); err != nil {
  94  		return err
  95  	}
  96  	if len(r.Bytes) > 0 {
  97  		if _, err := w.Write(r.Bytes); err != nil {
  98  			return err
  99  		}
 100  	}
 101  	if len(r.Reason) > 0 {
 102  		if _, err := w.Write(r.Reason); err != nil {
 103  			return err
 104  		}
 105  	}
 106  	return nil
 107  }
 108  
 109  func (r *IngestResponse) DecodeFrom(rd io.Reader) error {
 110  	var hdr [80]byte
 111  	if _, err := io.ReadFull(rd, hdr[:]); err != nil {
 112  		return err
 113  	}
 114  	r.ReqID = binary.LittleEndian.Uint32(hdr[0:4])
 115  	r.Verdict = hdr[4]
 116  	r.Kind = binary.LittleEndian.Uint16(hdr[5:7])
 117  	r.CreatedAt = int64(binary.LittleEndian.Uint64(hdr[7:15]))
 118  	copy(r.Pubkey[:], hdr[15:47])
 119  	copy(r.EventID[:], hdr[47:79])
 120  	var lens [8]byte
 121  	if _, err := io.ReadFull(rd, lens[:]); err != nil {
 122  		return err
 123  	}
 124  	bl := binary.LittleEndian.Uint32(lens[0:4])
 125  	rl := binary.LittleEndian.Uint32(lens[4:8])
 126  	if bl > 0 {
 127  		r.Bytes = []byte{:bl}
 128  		if _, err := io.ReadFull(rd, r.Bytes); err != nil {
 129  			return err
 130  		}
 131  	} else {
 132  		r.Bytes = nil
 133  	}
 134  	if rl > 0 {
 135  		r.Reason = []byte{:rl}
 136  		if _, err := io.ReadFull(rd, r.Reason); err != nil {
 137  			return err
 138  		}
 139  	} else {
 140  		r.Reason = nil
 141  	}
 142  	return nil
 143  }
 144  
 145  // --- broadcast domain wire types ---
 146  
 147  const (
 148  	SubOpNew    uint8 = 1  // new WS connection
 149  	SubOpAdd    uint8 = 2  // REQ: register subscription
 150  	SubOpRemove uint8 = 3  // CLOSE: deregister subscription
 151  	SubOpClose  uint8 = 4  // conn closed
 152  	SubOpAuth   uint8 = 5  // conn authed (Bytes = 32-byte pubkey)
 153  	SubOpBcast  uint8 = 10 // broadcast event; reuses ConnFD=senderFD, Flags=bcast-flags, Bytes=raw EVENT JSON
 154  )
 155  
 156  // SubCommand updates the broadcast domain's conn/sub registry.
 157  // hdr: [4]connFD + [1]op + [1]flags + [4]subIDLen + [4]bytesLen = 14 bytes
 158  // Flags for SubOpNew: bit0=whitelisted.
 159  type SubCommand struct {
 160  	Op     uint8
 161  	ConnFD int32
 162  	Flags  uint8
 163  	SubID  []byte
 164  	Bytes  []byte // SubOpAdd: raw REQ bytes; SubOpAuth: 32-byte pubkey
 165  }
 166  
 167  func (c SubCommand) EncodeTo(w io.Writer) error {
 168  	var hdr [14]byte
 169  	binary.LittleEndian.PutUint32(hdr[0:4], uint32(c.ConnFD))
 170  	hdr[4] = c.Op
 171  	hdr[5] = c.Flags
 172  	binary.LittleEndian.PutUint32(hdr[6:10], uint32(len(c.SubID)))
 173  	binary.LittleEndian.PutUint32(hdr[10:14], uint32(len(c.Bytes)))
 174  	if _, err := w.Write(hdr[:]); err != nil {
 175  		return err
 176  	}
 177  	if len(c.SubID) > 0 {
 178  		if _, err := w.Write(c.SubID); err != nil {
 179  			return err
 180  		}
 181  	}
 182  	if len(c.Bytes) > 0 {
 183  		_, err := w.Write(c.Bytes)
 184  		return err
 185  	}
 186  	return nil
 187  }
 188  
 189  func (c *SubCommand) DecodeFrom(r io.Reader) error {
 190  	var hdr [14]byte
 191  	if _, err := io.ReadFull(r, hdr[:]); err != nil {
 192  		return err
 193  	}
 194  	c.ConnFD = int32(binary.LittleEndian.Uint32(hdr[0:4]))
 195  	c.Op = hdr[4]
 196  	c.Flags = hdr[5]
 197  	sl := binary.LittleEndian.Uint32(hdr[6:10])
 198  	bl := binary.LittleEndian.Uint32(hdr[10:14])
 199  	if sl > 0 {
 200  		c.SubID = []byte{:sl}
 201  		if _, err := io.ReadFull(r, c.SubID); err != nil {
 202  			return err
 203  		}
 204  	} else {
 205  		c.SubID = nil
 206  	}
 207  	if bl > 0 {
 208  		c.Bytes = []byte{:bl}
 209  		if _, err := io.ReadFull(r, c.Bytes); err != nil {
 210  			return err
 211  		}
 212  	} else {
 213  		c.Bytes = nil
 214  	}
 215  	return nil
 216  }
 217  
 218  // BroadcastRequest asks the broadcast domain to fan out an accepted event.
 219  // Flags: bit0=needFilter, bit1=nip70Enforce, bit2=marmotOpen.
 220  type BroadcastRequest struct {
 221  	SenderFD int32
 222  	Flags    uint8
 223  	Bytes    []byte // raw EVENT envelope JSON
 224  }
 225  
 226  func (r BroadcastRequest) EncodeTo(w io.Writer) error {
 227  	var hdr [9]byte
 228  	binary.LittleEndian.PutUint32(hdr[0:4], uint32(r.SenderFD))
 229  	hdr[4] = r.Flags
 230  	binary.LittleEndian.PutUint32(hdr[5:9], uint32(len(r.Bytes)))
 231  	if _, err := w.Write(hdr[:]); err != nil {
 232  		return err
 233  	}
 234  	if len(r.Bytes) > 0 {
 235  		_, err := w.Write(r.Bytes)
 236  		return err
 237  	}
 238  	return nil
 239  }
 240  
 241  func (r *BroadcastRequest) DecodeFrom(rd io.Reader) error {
 242  	var hdr [9]byte
 243  	if _, err := io.ReadFull(rd, hdr[:]); err != nil {
 244  		return err
 245  	}
 246  	r.SenderFD = int32(binary.LittleEndian.Uint32(hdr[0:4]))
 247  	r.Flags = hdr[4]
 248  	bl := binary.LittleEndian.Uint32(hdr[5:9])
 249  	if bl > 0 {
 250  		r.Bytes = []byte{:bl}
 251  		_, err := io.ReadFull(rd, r.Bytes)
 252  		return err
 253  	}
 254  	r.Bytes = nil
 255  	return nil
 256  }
 257  
 258  // BroadcastFrame is one EventResult JSON payload the parent should WS-frame
 259  // and write to ConnFD.
 260  type BroadcastFrame struct {
 261  	ConnFD int32
 262  	Bytes  []byte
 263  }
 264  
 265  func (f BroadcastFrame) EncodeTo(w io.Writer) error {
 266  	var hdr [8]byte
 267  	binary.LittleEndian.PutUint32(hdr[0:4], uint32(f.ConnFD))
 268  	binary.LittleEndian.PutUint32(hdr[4:8], uint32(len(f.Bytes)))
 269  	if _, err := w.Write(hdr[:]); err != nil {
 270  		return err
 271  	}
 272  	if len(f.Bytes) > 0 {
 273  		_, err := w.Write(f.Bytes)
 274  		return err
 275  	}
 276  	return nil
 277  }
 278  
 279  func (f *BroadcastFrame) DecodeFrom(r io.Reader) error {
 280  	var hdr [8]byte
 281  	if _, err := io.ReadFull(r, hdr[:]); err != nil {
 282  		return err
 283  	}
 284  	f.ConnFD = int32(binary.LittleEndian.Uint32(hdr[0:4]))
 285  	bl := binary.LittleEndian.Uint32(hdr[4:8])
 286  	if bl > 0 {
 287  		f.Bytes = []byte{:bl}
 288  		_, err := io.ReadFull(r, f.Bytes)
 289  		return err
 290  	}
 291  	f.Bytes = nil
 292  	return nil
 293  }
 294  
 295  // --- proxy domain wire types ---
 296  
 297  // ProxyRequest asks the proxy domain to fetch a URL.
 298  // hdr: [4]ReqID + [4]MaxBytes + [4]URLLen = 12 bytes.
 299  type ProxyRequest struct {
 300  	ReqID    uint32
 301  	MaxBytes uint32
 302  	URL      []byte
 303  }
 304  
 305  func (r ProxyRequest) EncodeTo(w io.Writer) error {
 306  	var hdr [12]byte
 307  	binary.LittleEndian.PutUint32(hdr[0:4], r.ReqID)
 308  	binary.LittleEndian.PutUint32(hdr[4:8], r.MaxBytes)
 309  	binary.LittleEndian.PutUint32(hdr[8:12], uint32(len(r.URL)))
 310  	if _, err := w.Write(hdr[:]); err != nil {
 311  		return err
 312  	}
 313  	if len(r.URL) > 0 {
 314  		_, err := w.Write(r.URL)
 315  		return err
 316  	}
 317  	return nil
 318  }
 319  
 320  func (r *ProxyRequest) DecodeFrom(rd io.Reader) error {
 321  	var hdr [12]byte
 322  	if _, err := io.ReadFull(rd, hdr[:]); err != nil {
 323  		return err
 324  	}
 325  	r.ReqID = binary.LittleEndian.Uint32(hdr[0:4])
 326  	r.MaxBytes = binary.LittleEndian.Uint32(hdr[4:8])
 327  	ul := binary.LittleEndian.Uint32(hdr[8:12])
 328  	if ul > 0 {
 329  		r.URL = []byte{:ul}
 330  		_, err := io.ReadFull(rd, r.URL)
 331  		return err
 332  	}
 333  	r.URL = nil
 334  	return nil
 335  }
 336  
 337  // ProxyResponse carries the fetch result back to the net domain.
 338  // hdr: [4]ReqID + [4]Status + [4]CTLen + [4]BodyLen + [4]ErrLen = 20 bytes.
 339  // Status < 0 means a fetch error (Err is populated, Content-Type and Body are empty).
 340  type ProxyResponse struct {
 341  	ReqID       uint32
 342  	Status      int32
 343  	ContentType []byte
 344  	Body        []byte
 345  	Err         []byte
 346  }
 347  
 348  func (r ProxyResponse) EncodeTo(w io.Writer) error {
 349  	var hdr [20]byte
 350  	binary.LittleEndian.PutUint32(hdr[0:4], r.ReqID)
 351  	binary.LittleEndian.PutUint32(hdr[4:8], uint32(r.Status))
 352  	binary.LittleEndian.PutUint32(hdr[8:12], uint32(len(r.ContentType)))
 353  	binary.LittleEndian.PutUint32(hdr[12:16], uint32(len(r.Body)))
 354  	binary.LittleEndian.PutUint32(hdr[16:20], uint32(len(r.Err)))
 355  	if _, err := w.Write(hdr[:]); err != nil {
 356  		return err
 357  	}
 358  	if len(r.ContentType) > 0 {
 359  		if _, err := w.Write(r.ContentType); err != nil {
 360  			return err
 361  		}
 362  	}
 363  	if len(r.Body) > 0 {
 364  		if _, err := w.Write(r.Body); err != nil {
 365  			return err
 366  		}
 367  	}
 368  	if len(r.Err) > 0 {
 369  		_, err := w.Write(r.Err)
 370  		return err
 371  	}
 372  	return nil
 373  }
 374  
 375  func (r *ProxyResponse) DecodeFrom(rd io.Reader) error {
 376  	var hdr [20]byte
 377  	if _, err := io.ReadFull(rd, hdr[:]); err != nil {
 378  		return err
 379  	}
 380  	r.ReqID = binary.LittleEndian.Uint32(hdr[0:4])
 381  	r.Status = int32(binary.LittleEndian.Uint32(hdr[4:8]))
 382  	ctl := binary.LittleEndian.Uint32(hdr[8:12])
 383  	bl := binary.LittleEndian.Uint32(hdr[12:16])
 384  	el := binary.LittleEndian.Uint32(hdr[16:20])
 385  	if ctl > 0 {
 386  		r.ContentType = []byte{:ctl}
 387  		if _, err := io.ReadFull(rd, r.ContentType); err != nil {
 388  			return err
 389  		}
 390  	} else {
 391  		r.ContentType = nil
 392  	}
 393  	if bl > 0 {
 394  		r.Body = []byte{:bl}
 395  		if _, err := io.ReadFull(rd, r.Body); err != nil {
 396  			return err
 397  		}
 398  	} else {
 399  		r.Body = nil
 400  	}
 401  	if el > 0 {
 402  		r.Err = []byte{:el}
 403  		_, err := io.ReadFull(rd, r.Err)
 404  		return err
 405  	}
 406  	r.Err = nil
 407  	return nil
 408  }
 409  
 410  // --- blossom domain wire types ---
 411  
 412  // BlossomRequest asks the blossom domain to handle one HTTP request.
 413  // Dir is the blob storage directory (sent with every request; worker caches
 414  // the blossom.Server after first use — avoids a non-channel spawn argument).
 415  // Upstream is an optional CORS-proxy fallback URL prefix (e.g.
 416  // "https://smesh.lol"). When set and a local blob is missing, the worker
 417  // fetches <Upstream>/<hash><.ext> and serves that result.
 418  // hdr: [4]ReqID + [4]DirLen + [4]MethodLen + [4]PathLen + [4]CTLen + [4]BodyLen + [4]UpstreamLen = 28 bytes.
 419  type BlossomRequest struct {
 420  	ReqID       uint32
 421  	Dir         []byte
 422  	Method      []byte
 423  	Path        []byte
 424  	ContentType []byte
 425  	Body        []byte
 426  	Upstream    []byte
 427  }
 428  
 429  func (r BlossomRequest) EncodeTo(w io.Writer) error {
 430  	var hdr [28]byte
 431  	binary.LittleEndian.PutUint32(hdr[0:4], r.ReqID)
 432  	binary.LittleEndian.PutUint32(hdr[4:8], uint32(len(r.Dir)))
 433  	binary.LittleEndian.PutUint32(hdr[8:12], uint32(len(r.Method)))
 434  	binary.LittleEndian.PutUint32(hdr[12:16], uint32(len(r.Path)))
 435  	binary.LittleEndian.PutUint32(hdr[16:20], uint32(len(r.ContentType)))
 436  	binary.LittleEndian.PutUint32(hdr[20:24], uint32(len(r.Body)))
 437  	binary.LittleEndian.PutUint32(hdr[24:28], uint32(len(r.Upstream)))
 438  	if _, err := w.Write(hdr[:]); err != nil {
 439  		return err
 440  	}
 441  	for _, b := range [][]byte{r.Dir, r.Method, r.Path, r.ContentType, r.Body, r.Upstream} {
 442  		if len(b) > 0 {
 443  			if _, err := w.Write(b); err != nil {
 444  				return err
 445  			}
 446  		}
 447  	}
 448  	return nil
 449  }
 450  
 451  func (r *BlossomRequest) DecodeFrom(rd io.Reader) error {
 452  	var hdr [28]byte
 453  	if _, err := io.ReadFull(rd, hdr[:]); err != nil {
 454  		return err
 455  	}
 456  	r.ReqID = binary.LittleEndian.Uint32(hdr[0:4])
 457  	dl := binary.LittleEndian.Uint32(hdr[4:8])
 458  	ml := binary.LittleEndian.Uint32(hdr[8:12])
 459  	pl := binary.LittleEndian.Uint32(hdr[12:16])
 460  	ctl := binary.LittleEndian.Uint32(hdr[16:20])
 461  	bl := binary.LittleEndian.Uint32(hdr[20:24])
 462  	ul := binary.LittleEndian.Uint32(hdr[24:28])
 463  	readSlice := func(n uint32) ([]byte, error) {
 464  		if n == 0 {
 465  			return nil, nil
 466  		}
 467  		b := []byte{:n}
 468  		_, err := io.ReadFull(rd, b)
 469  		return b, err
 470  	}
 471  	var err error
 472  	if r.Dir, err = readSlice(dl); err != nil {
 473  		return err
 474  	}
 475  	if r.Method, err = readSlice(ml); err != nil {
 476  		return err
 477  	}
 478  	if r.Path, err = readSlice(pl); err != nil {
 479  		return err
 480  	}
 481  	if r.ContentType, err = readSlice(ctl); err != nil {
 482  		return err
 483  	}
 484  	if r.Body, err = readSlice(bl); err != nil {
 485  		return err
 486  	}
 487  	r.Upstream, err = readSlice(ul)
 488  	return err
 489  }
 490  
 491  // BlossomResponse carries the result back to the net domain.
 492  // hdr: [4]ReqID + [4]Status + [8]Size + [4]CTLen + [4]BodyLen = 24 bytes.
 493  // Size is the file size for HEAD responses (Content-Length); 0 otherwise.
 494  type BlossomResponse struct {
 495  	ReqID  uint32
 496  	Status int32
 497  	Size   int64
 498  	CT     []byte
 499  	Body   []byte
 500  }
 501  
 502  func (r BlossomResponse) EncodeTo(w io.Writer) error {
 503  	var hdr [24]byte
 504  	binary.LittleEndian.PutUint32(hdr[0:4], r.ReqID)
 505  	binary.LittleEndian.PutUint32(hdr[4:8], uint32(r.Status))
 506  	binary.LittleEndian.PutUint64(hdr[8:16], uint64(r.Size))
 507  	binary.LittleEndian.PutUint32(hdr[16:20], uint32(len(r.CT)))
 508  	binary.LittleEndian.PutUint32(hdr[20:24], uint32(len(r.Body)))
 509  	if _, err := w.Write(hdr[:]); err != nil {
 510  		return err
 511  	}
 512  	if len(r.CT) > 0 {
 513  		if _, err := w.Write(r.CT); err != nil {
 514  			return err
 515  		}
 516  	}
 517  	if len(r.Body) > 0 {
 518  		_, err := w.Write(r.Body)
 519  		return err
 520  	}
 521  	return nil
 522  }
 523  
 524  func (r *BlossomResponse) DecodeFrom(rd io.Reader) error {
 525  	var hdr [24]byte
 526  	if _, err := io.ReadFull(rd, hdr[:]); err != nil {
 527  		return err
 528  	}
 529  	r.ReqID = binary.LittleEndian.Uint32(hdr[0:4])
 530  	r.Status = int32(binary.LittleEndian.Uint32(hdr[4:8]))
 531  	r.Size = int64(binary.LittleEndian.Uint64(hdr[8:16]))
 532  	ctl := binary.LittleEndian.Uint32(hdr[16:20])
 533  	bl := binary.LittleEndian.Uint32(hdr[20:24])
 534  	if ctl > 0 {
 535  		r.CT = []byte{:ctl}
 536  		if _, err := io.ReadFull(rd, r.CT); err != nil {
 537  			return err
 538  		}
 539  	} else {
 540  		r.CT = nil
 541  	}
 542  	if bl > 0 {
 543  		r.Body = []byte{:bl}
 544  		_, err := io.ReadFull(rd, r.Body)
 545  		return err
 546  	}
 547  	r.Body = nil
 548  	return nil
 549  }
 550  
 551