frame.mx raw
1 // Package pool provides shared worker pool infrastructure: pipe frame IPC,
2 // busy tracking, and FD lookup. Used by all worker pools in the relay.
3 package pool
4
5 import (
6 "syscall"
7 )
8
9 // WriteFrame writes one pipe frame to fd.
10 // Frame format: [chanID(2BE) | length(4BE)] + data.
11 // Handles partial writes. Returns false immediately on EAGAIN or error
12 // to avoid blocking the event loop.
13 func WriteFrame(fd int32, chanID uint16, data []byte) bool {
14 l := uint32(len(data))
15 var hdr [6]byte
16 hdr[0] = byte(chanID >> 8)
17 hdr[1] = byte(chanID)
18 hdr[2] = byte(l >> 24)
19 hdr[3] = byte(l >> 16)
20 hdr[4] = byte(l >> 8)
21 hdr[5] = byte(l)
22 frame := []byte{:0:6 + int(l)}
23 frame = append(frame, hdr[:]...)
24 frame = append(frame, data...)
25 sent := 0
26 for sent < len(frame) {
27 n, err := syscall.Write(int(fd), frame[sent:])
28 if n > 0 {
29 sent += n
30 }
31 if err != nil {
32 return false
33 }
34 }
35 return true
36 }
37
38 // ReadFrame reads one pipe frame from fd using non-blocking I/O for the header.
39 // Returns (chanID, payload, ok). ok=false on error or EAGAIN.
40 func ReadFrame(fd int32) (chanID uint16, data []byte, ok bool) {
41 var hdr [6]byte
42 syscall.SetNonblock(int(fd), true)
43 n, err := syscall.Read(int(fd), hdr[:])
44 syscall.SetNonblock(int(fd), false)
45 if err == syscall.EAGAIN || n == 0 || err != nil {
46 return 0, nil, false
47 }
48 if n < 6 {
49 return 0, nil, false
50 }
51 chanID = uint16(hdr[0])<<8 | uint16(hdr[1])
52 l := uint32(hdr[2])<<24 | uint32(hdr[3])<<16 | uint32(hdr[4])<<8 | uint32(hdr[5])
53 if l == 0 || l > 64<<20 {
54 return chanID, nil, false
55 }
56 data = []byte{:l}
57 got := 0
58 for got < int(l) {
59 nr, rerr := syscall.Read(int(fd), data[got:])
60 if nr <= 0 || rerr != nil {
61 return 0, nil, false
62 }
63 got += nr
64 }
65 return chanID, data, true
66 }
67
68 // DrainFrame discards n bytes from fd.
69 func DrainFrame(fd int32, n uint32) {
70 discard := []byte{:512}
71 for n > 0 {
72 chunk := int(n)
73 if chunk > 512 {
74 chunk = 512
75 }
76 nr, err := syscall.Read(int(fd), discard[:chunk])
77 if err != nil || nr <= 0 {
78 return
79 }
80 n -= uint32(nr)
81 }
82 }
83
84 // FrameReader maintains partial-frame state for non-blocking reads.
85 // The fd must be in non-blocking mode. Each Read call either returns a
86 // complete frame or preserves partial state for the next call.
87 type FrameReader struct {
88 hdr [6]byte
89 hdrN int
90 chanID uint16
91 data []byte
92 dataN int
93 dataLen int
94 }
95
96 // Read attempts to complete one frame from fd. Returns (chanID, payload, true)
97 // on success. Returns (0, nil, false) on EAGAIN (partial state preserved for
98 // next call) or pipe error (state reset).
99 func (fr *FrameReader) Read(fd int32) (uint16, []byte, bool) {
100 for fr.hdrN < 6 {
101 n, err := syscall.Read(int(fd), fr.hdr[fr.hdrN:])
102 if n > 0 {
103 fr.hdrN += n
104 }
105 if err == syscall.EAGAIN {
106 return 0, nil, false
107 }
108 if err != nil || n <= 0 {
109 fr.Reset()
110 return 0, nil, false
111 }
112 }
113 if fr.data == nil {
114 fr.chanID = uint16(fr.hdr[0])<<8 | uint16(fr.hdr[1])
115 l := uint32(fr.hdr[2])<<24 | uint32(fr.hdr[3])<<16 | uint32(fr.hdr[4])<<8 | uint32(fr.hdr[5])
116 if l == 0 || l > 64<<20 {
117 fr.Reset()
118 return fr.chanID, nil, false
119 }
120 fr.dataLen = int(l)
121 fr.data = []byte{:fr.dataLen}
122 fr.dataN = 0
123 }
124 for fr.dataN < fr.dataLen {
125 n, err := syscall.Read(int(fd), fr.data[fr.dataN:])
126 if n > 0 {
127 fr.dataN += n
128 }
129 if err == syscall.EAGAIN {
130 return 0, nil, false
131 }
132 if err != nil || n <= 0 {
133 fr.Reset()
134 return 0, nil, false
135 }
136 }
137 chanID := fr.chanID
138 data := fr.data
139 fr.Reset()
140 return chanID, data, true
141 }
142
143 // Reset clears partial-frame state.
144 func (fr *FrameReader) Reset() {
145 fr.hdrN = 0
146 fr.data = nil
147 fr.dataN = 0
148 fr.dataLen = 0
149 }
150