pipe.mx raw

   1  package evloop
   2  
   3  import (
   4  	"syscall"
   5  )
   6  
   7  // Pipe wraps a Conn for frame-based IPC over a spawn socketpair.
   8  // Frame format: chanID(2 bytes BE) | length(4 bytes BE) | payload.
   9  // WriteFrame never blocks - it appends to the underlying Conn's write buffer.
  10  // ReadFrame reads one complete frame from the Conn's read buffer per call
  11  // (one frame per EPOLLIN event for fair scheduling).
  12  type Pipe struct {
  13  	Conn   *Conn
  14  	reader pipeFrameReader
  15  }
  16  
  17  // NewPipe creates a Pipe wrapping an existing Conn.
  18  // The caller must have already registered the fd with the Loop via AddFD or
  19  // the Loop's accept path.
  20  func NewPipe(c *Conn) *Pipe {
  21  	return &Pipe{Conn: c}
  22  }
  23  
  24  // WriteFrame buffers a frame for sending. Never blocks.
  25  func (p *Pipe) WriteFrame(chanID uint16, data []byte) {
  26  	l := uint32(len(data))
  27  	var hdr [6]byte
  28  	hdr[0] = byte(chanID >> 8)
  29  	hdr[1] = byte(chanID)
  30  	hdr[2] = byte(l >> 24)
  31  	hdr[3] = byte(l >> 16)
  32  	hdr[4] = byte(l >> 8)
  33  	hdr[5] = byte(l)
  34  	frame := []byte{:0:6 + int(l)}
  35  	frame = append(frame, hdr[:]...)
  36  	frame = append(frame, data...)
  37  	p.Conn.Write(frame)
  38  }
  39  
  40  // ReadFrame attempts to read one complete frame from the Conn's read buffer.
  41  // Returns (chanID, payload, true) on success.
  42  // Returns (0, nil, false) when insufficient data is buffered (call again on next EPOLLIN).
  43  // The returned payload slice is valid until the next Consume on the Conn.
  44  func (p *Pipe) ReadFrame() (chanID uint16, payload []byte, ok bool) {
  45  	return p.reader.read(p.Conn)
  46  }
  47  
  48  // pipeFrameReader maintains partial-frame state across EPOLLIN events.
  49  type pipeFrameReader struct {
  50  	hdr     [6]byte
  51  	hdrN    int
  52  	chanID  uint16
  53  	dataLen int
  54  }
  55  
  56  func (fr *pipeFrameReader) read(c *Conn) (uint16, []byte, bool) {
  57  	buf := c.Read()
  58  
  59  	// Accumulate header bytes.
  60  	for fr.hdrN < 6 {
  61  		if len(buf) == 0 {
  62  			return 0, nil, false
  63  		}
  64  		fr.hdr[fr.hdrN] = buf[0]
  65  		fr.hdrN++
  66  		c.Consume(1)
  67  		buf = c.Read()
  68  	}
  69  
  70  	// Decode length on first complete header read.
  71  	if fr.dataLen == 0 && fr.hdrN == 6 {
  72  		fr.chanID = uint16(fr.hdr[0])<<8 | uint16(fr.hdr[1])
  73  		l := uint32(fr.hdr[2])<<24 | uint32(fr.hdr[3])<<16 | uint32(fr.hdr[4])<<8 | uint32(fr.hdr[5])
  74  		if l == 0 || l > 64<<20 {
  75  			// Invalid frame: reset and signal error via zero-length payload.
  76  			fr.reset()
  77  			return fr.chanID, nil, true
  78  		}
  79  		fr.dataLen = int(l)
  80  	}
  81  
  82  	// Wait for full payload.
  83  	if len(buf) < fr.dataLen {
  84  		return 0, nil, false
  85  	}
  86  
  87  	data := buf[:fr.dataLen]
  88  	id := fr.chanID
  89  	c.Consume(fr.dataLen)
  90  	fr.reset()
  91  	return id, data, true
  92  }
  93  
  94  func (fr *pipeFrameReader) reset() {
  95  	fr.hdrN = 0
  96  	fr.chanID = 0
  97  	fr.dataLen = 0
  98  }
  99  
 100  // PipeFD sets up an existing fd (from a spawn socketpair) as a non-blocking
 101  // Pipe registered with the loop for EPOLLIN dispatch via OnFD.
 102  // The caller is responsible for calling Pipe.ReadFrame inside OnFD.
 103  func PipeFD(l *Loop, fd int32, tag int) *Pipe {
 104  	syscall.SetNonblock(int(fd), true)
 105  	c := &Conn{
 106  		FD:   int(fd),
 107  		Tag:  tag,
 108  		loop: l,
 109  	}
 110  	l.conns[int(fd)] = c
 111  	l.AddFD(fd)
 112  	return NewPipe(c)
 113  }
 114