pipe_channel.mx raw

   1  //go:build linux
   2  
   3  package runtime
   4  
   5  // Pipe channel IPC — bridges Go channels across spawn domain boundaries
   6  // using the socketpair created by spawnDomain.
   7  //
   8  // Protocol: length-prefixed messages over the Unix socketpair.
//   Frame: chanID(2) | len(4) | payload(len)   (chanID and len are big-endian; the first 6 bytes are the header)
  10  // Each channel passed to spawn gets a chanID assigned at spawn time.
  11  // The parent and child each run a bridge goroutine that multiplexes
  12  // all IPC channels over the single socketpair fd.
  13  
  14  import "unsafe"
  15  
  16  const (
  17  	pipeMsgHdrLen = 6 // 2 (chanID) + 4 (length)
  18  	pipeMaxMsg    = 1 << 20
  19  )
  20  
// pipeChanEntry maps a channel ID to a runtime channel pointer.
// Entries form a singly linked list hanging off pipeBridge.channels;
// pipeBridgeRegister pushes new entries at the head.
type pipeChanEntry struct {
	id   uint16         // channel ID carried in the frame header
	ch   *channel       // runtime channel associated with this ID
	dir  uint8          // 0=send (parent writes, child reads), 1=recv (child writes, parent reads)
	next *pipeChanEntry // next (earlier-registered) entry, or nil at the end of the list
}
  28  
// pipeBridge is the multiplexer state for one socketpair fd.
type pipeBridge struct {
	fd       int32          // file descriptor that frames are written to / read from
	channels *pipeChanEntry // head of the registered-channel list (most recent first)
	nextID   uint16         // ID that the next pipeBridgeRegister call will hand out
}
  35  
  36  // pipeBridgeRegister adds a channel to the bridge and returns its ID.
  37  func pipeBridgeRegister(br *pipeBridge, ch *channel, dir uint8) uint16 {
  38  	id := br.nextID
  39  	br.nextID++
  40  	br.channels = &pipeChanEntry{
  41  		id:   id,
  42  		ch:   ch,
  43  		dir:  dir,
  44  		next: br.channels,
  45  	}
  46  	return id
  47  }
  48  
  49  // pipeBridgeLookup finds a channel by ID.
  50  func pipeBridgeLookup(br *pipeBridge, id uint16) *pipeChanEntry {
  51  	for e := br.channels; e != nil; e = e.next {
  52  		if e.id == id {
  53  			return e
  54  		}
  55  	}
  56  	return nil
  57  }
  58  
  59  // pipeWrite writes a length-prefixed message to fd.
  60  // Format: chanID(2) | len(4) | data(len)
  61  func pipeWrite(fd int32, chanID uint16, data []byte) bool {
  62  	var hdr [pipeMsgHdrLen]byte
  63  	hdr[0] = byte(chanID >> 8)
  64  	hdr[1] = byte(chanID)
  65  	l := uint32(len(data))
  66  	hdr[2] = byte(l >> 24)
  67  	hdr[3] = byte(l >> 16)
  68  	hdr[4] = byte(l >> 8)
  69  	hdr[5] = byte(l)
  70  
  71  	if moxie_write(fd, unsafe.Pointer(&hdr[0]), pipeMsgHdrLen) < 0 {
  72  		return false
  73  	}
  74  	if len(data) > 0 {
  75  		if moxie_write(fd, unsafe.Pointer(&data[0]), int32(len(data))) < 0 {
  76  			return false
  77  		}
  78  	}
  79  	return true
  80  }
  81  
  82  // pipeRead reads one length-prefixed message from fd.
  83  // Returns chanID, data, ok.
  84  func pipeRead(fd int32) (uint16, []byte, bool) {
  85  	var hdr [pipeMsgHdrLen]byte
  86  	if moxie_read(fd, unsafe.Pointer(&hdr[0]), pipeMsgHdrLen) < pipeMsgHdrLen {
  87  		return 0, nil, false
  88  	}
  89  	chanID := uint16(hdr[0])<<8 | uint16(hdr[1])
  90  	l := uint32(hdr[2])<<24 | uint32(hdr[3])<<16 | uint32(hdr[4])<<8 | uint32(hdr[5])
  91  	if l > pipeMaxMsg {
  92  		return 0, nil, false
  93  	}
  94  	if l == 0 {
  95  		return chanID, nil, true
  96  	}
  97  	data := make([]byte, l)
  98  	if moxie_read(fd, unsafe.Pointer(&data[0]), int32(l)) < int32(l) {
  99  		return 0, nil, false
 100  	}
 101  	return chanID, data, true
 102  }
 103  
// C functions for pipe I/O (defined in spawn_unix.c).
// They are declared here without Go bodies. Each takes a file descriptor,
// a pointer to the first byte of a buffer, and a byte count.
// Callers in this file treat a negative return value as failure.
// NOTE(review): the exact return-value contract (presumably the number of
// bytes transferred, as with read(2)/write(2)) is set by the C source —
// confirm there.

//export moxie_write
func moxie_write(fd int32, buf unsafe.Pointer, count int32) int32

//export moxie_read
func moxie_read(fd int32, buf unsafe.Pointer, count int32) int32
 111