package evloop import ( "syscall" ) // Pipe wraps a Conn for frame-based IPC over a spawn socketpair. // Frame format: chanID(2 bytes BE) | length(4 bytes BE) | payload. // WriteFrame never blocks - it appends to the underlying Conn's write buffer. // ReadFrame reads one complete frame from the Conn's read buffer per call // (one frame per EPOLLIN event for fair scheduling). type Pipe struct { Conn *Conn reader pipeFrameReader } // NewPipe creates a Pipe wrapping an existing Conn. // The caller must have already registered the fd with the Loop via AddFD or // the Loop's accept path. func NewPipe(c *Conn) *Pipe { return &Pipe{Conn: c} } // WriteFrame buffers a frame for sending. Never blocks. func (p *Pipe) WriteFrame(chanID uint16, data []byte) { l := uint32(len(data)) var hdr [6]byte hdr[0] = byte(chanID >> 8) hdr[1] = byte(chanID) hdr[2] = byte(l >> 24) hdr[3] = byte(l >> 16) hdr[4] = byte(l >> 8) hdr[5] = byte(l) frame := []byte{:0:6 + int(l)} frame = append(frame, hdr[:]...) frame = append(frame, data...) p.Conn.Write(frame) } // ReadFrame attempts to read one complete frame from the Conn's read buffer. // Returns (chanID, payload, true) on success. // Returns (0, nil, false) when insufficient data is buffered (call again on next EPOLLIN). // The returned payload slice is valid until the next Consume on the Conn. func (p *Pipe) ReadFrame() (chanID uint16, payload []byte, ok bool) { return p.reader.read(p.Conn) } // pipeFrameReader maintains partial-frame state across EPOLLIN events. type pipeFrameReader struct { hdr [6]byte hdrN int chanID uint16 dataLen int } func (fr *pipeFrameReader) read(c *Conn) (uint16, []byte, bool) { buf := c.Read() // Accumulate header bytes. for fr.hdrN < 6 { if len(buf) == 0 { return 0, nil, false } fr.hdr[fr.hdrN] = buf[0] fr.hdrN++ c.Consume(1) buf = c.Read() } // Decode length on first complete header read. if fr.dataLen == 0 && fr.hdrN == 6 { fr.chanID = uint16(fr.hdr[0])<<8 | uint16(fr.hdr[1]) l := uint32(fr.hdr[2])<<24 | uint32(fr.hdr[3])<<16 | uint32(fr.hdr[4])<<8 | uint32(fr.hdr[5]) if l == 0 || l > 64<<20 { // Invalid frame: reset and signal error via zero-length payload. fr.reset() return fr.chanID, nil, true } fr.dataLen = int(l) } // Wait for full payload. if len(buf) < fr.dataLen { return 0, nil, false } data := buf[:fr.dataLen] id := fr.chanID c.Consume(fr.dataLen) fr.reset() return id, data, true } func (fr *pipeFrameReader) reset() { fr.hdrN = 0 fr.chanID = 0 fr.dataLen = 0 } // PipeFD sets up an existing fd (from a spawn socketpair) as a non-blocking // Pipe registered with the loop for EPOLLIN dispatch via OnFD. // The caller is responsible for calling Pipe.ReadFrame inside OnFD. func PipeFD(l *Loop, fd int32, tag int) *Pipe { syscall.SetNonblock(int(fd), true) c := &Conn{ FD: int(fd), Tag: tag, loop: l, } l.conns[int(fd)] = c l.AddFD(fd) return NewPipe(c) }