pipe.mx raw
1 package evloop
2
3 import (
4 "syscall"
5 )
6
7 // Pipe wraps a Conn for frame-based IPC over a spawn socketpair.
8 // Frame format: chanID(2 bytes BE) | length(4 bytes BE) | payload.
9 // WriteFrame never blocks - it appends to the underlying Conn's write buffer.
10 // ReadFrame reads one complete frame from the Conn's read buffer per call
11 // (one frame per EPOLLIN event for fair scheduling).
12 type Pipe struct {
13 Conn *Conn
14 reader pipeFrameReader
15 }
16
17 // NewPipe creates a Pipe wrapping an existing Conn.
18 // The caller must have already registered the fd with the Loop via AddFD or
19 // the Loop's accept path.
20 func NewPipe(c *Conn) *Pipe {
21 return &Pipe{Conn: c}
22 }
23
24 // WriteFrame buffers a frame for sending. Never blocks.
25 func (p *Pipe) WriteFrame(chanID uint16, data []byte) {
26 l := uint32(len(data))
27 var hdr [6]byte
28 hdr[0] = byte(chanID >> 8)
29 hdr[1] = byte(chanID)
30 hdr[2] = byte(l >> 24)
31 hdr[3] = byte(l >> 16)
32 hdr[4] = byte(l >> 8)
33 hdr[5] = byte(l)
34 frame := []byte{:0:6 + int(l)}
35 frame = append(frame, hdr[:]...)
36 frame = append(frame, data...)
37 p.Conn.Write(frame)
38 }
39
40 // ReadFrame attempts to read one complete frame from the Conn's read buffer.
41 // Returns (chanID, payload, true) on success.
42 // Returns (0, nil, false) when insufficient data is buffered (call again on next EPOLLIN).
43 // The returned payload slice is valid until the next Consume on the Conn.
44 func (p *Pipe) ReadFrame() (chanID uint16, payload []byte, ok bool) {
45 return p.reader.read(p.Conn)
46 }
47
48 // pipeFrameReader maintains partial-frame state across EPOLLIN events.
49 type pipeFrameReader struct {
50 hdr [6]byte
51 hdrN int
52 chanID uint16
53 dataLen int
54 }
55
56 func (fr *pipeFrameReader) read(c *Conn) (uint16, []byte, bool) {
57 buf := c.Read()
58
59 // Accumulate header bytes.
60 for fr.hdrN < 6 {
61 if len(buf) == 0 {
62 return 0, nil, false
63 }
64 fr.hdr[fr.hdrN] = buf[0]
65 fr.hdrN++
66 c.Consume(1)
67 buf = c.Read()
68 }
69
70 // Decode length on first complete header read.
71 if fr.dataLen == 0 && fr.hdrN == 6 {
72 fr.chanID = uint16(fr.hdr[0])<<8 | uint16(fr.hdr[1])
73 l := uint32(fr.hdr[2])<<24 | uint32(fr.hdr[3])<<16 | uint32(fr.hdr[4])<<8 | uint32(fr.hdr[5])
74 if l == 0 || l > 64<<20 {
75 // Invalid frame: reset and signal error via zero-length payload.
76 fr.reset()
77 return fr.chanID, nil, true
78 }
79 fr.dataLen = int(l)
80 }
81
82 // Wait for full payload.
83 if len(buf) < fr.dataLen {
84 return 0, nil, false
85 }
86
87 data := buf[:fr.dataLen]
88 id := fr.chanID
89 c.Consume(fr.dataLen)
90 fr.reset()
91 return id, data, true
92 }
93
94 func (fr *pipeFrameReader) reset() {
95 fr.hdrN = 0
96 fr.chanID = 0
97 fr.dataLen = 0
98 }
99
100 // PipeFD sets up an existing fd (from a spawn socketpair) as a non-blocking
101 // Pipe registered with the loop for EPOLLIN dispatch via OnFD.
102 // The caller is responsible for calling Pipe.ReadFrame inside OnFD.
103 func PipeFD(l *Loop, fd int32, tag int) *Pipe {
104 syscall.SetNonblock(int(fd), true)
105 c := &Conn{
106 FD: int(fd),
107 Tag: tag,
108 loop: l,
109 }
110 l.conns[int(fd)] = c
111 l.AddFD(fd)
112 return NewPipe(c)
113 }
114