// Package pool provides shared worker pool infrastructure: pipe frame IPC, // busy tracking, and FD lookup. Used by all worker pools in the relay. package pool import ( "syscall" ) // WriteFrame writes one pipe frame to fd. // Frame format: [chanID(2BE) | length(4BE)] + data. // Handles partial writes. Returns false immediately on EAGAIN or error // to avoid blocking the event loop. func WriteFrame(fd int32, chanID uint16, data []byte) bool { l := uint32(len(data)) var hdr [6]byte hdr[0] = byte(chanID >> 8) hdr[1] = byte(chanID) hdr[2] = byte(l >> 24) hdr[3] = byte(l >> 16) hdr[4] = byte(l >> 8) hdr[5] = byte(l) frame := []byte{:0:6 + int(l)} frame = append(frame, hdr[:]...) frame = append(frame, data...) sent := 0 for sent < len(frame) { n, err := syscall.Write(int(fd), frame[sent:]) if n > 0 { sent += n } if err != nil { return false } } return true } // ReadFrame reads one pipe frame from fd using non-blocking I/O for the header. // Returns (chanID, payload, ok). ok=false on error or EAGAIN. func ReadFrame(fd int32) (chanID uint16, data []byte, ok bool) { var hdr [6]byte syscall.SetNonblock(int(fd), true) n, err := syscall.Read(int(fd), hdr[:]) syscall.SetNonblock(int(fd), false) if err == syscall.EAGAIN || n == 0 || err != nil { return 0, nil, false } if n < 6 { return 0, nil, false } chanID = uint16(hdr[0])<<8 | uint16(hdr[1]) l := uint32(hdr[2])<<24 | uint32(hdr[3])<<16 | uint32(hdr[4])<<8 | uint32(hdr[5]) if l == 0 || l > 64<<20 { return chanID, nil, false } data = []byte{:l} got := 0 for got < int(l) { nr, rerr := syscall.Read(int(fd), data[got:]) if nr <= 0 || rerr != nil { return 0, nil, false } got += nr } return chanID, data, true } // DrainFrame discards n bytes from fd. 
// It reads into a small fixed-size scratch buffer until n bytes have been
// consumed. Reads interrupted by a signal (EINTR) are retried; any other
// error, or a read that returns no data, ends the drain early.
func DrainFrame(fd int32, n uint32) {
	var discard [512]byte
	for n > 0 {
		// Compare in uint32 so a large n cannot turn into a negative slice
		// bound on platforms where int is 32 bits wide.
		chunk := uint32(len(discard))
		if n < chunk {
			chunk = n
		}
		nr, err := syscall.Read(int(fd), discard[:chunk])
		if err == syscall.EINTR {
			continue
		}
		if err != nil || nr <= 0 {
			return
		}
		n -= uint32(nr)
	}
}

// FrameReader maintains partial-frame state for non-blocking reads.
// The fd must be in non-blocking mode. Each Read call either returns a
// complete frame or preserves partial state for the next call.
type FrameReader struct {
	hdr     [6]byte // raw header bytes received so far
	hdrN    int     // number of valid bytes in hdr
	chanID  uint16  // channel ID decoded from the completed header
	data    []byte  // payload buffer; nil until the header has been parsed
	dataN   int     // number of payload bytes received so far
	dataLen int     // payload length announced by the header
}

// Read attempts to complete one frame from fd.
//
// Returns (chanID, payload, true) once a whole frame has been received.
// Returns (0, nil, false) on EAGAIN, with partial state preserved for the
// next call, or on EOF / pipe error, with state reset. Returns
// (chanID, nil, false) with state reset when the header announces a
// zero-length payload or one larger than 64 MiB; no payload bytes are
// consumed in that case.
func (fr *FrameReader) Read(fd int32) (uint16, []byte, bool) {
	const maxPayload = 64 << 20

	if !fr.fill(fd, fr.hdr[:], &fr.hdrN) {
		return 0, nil, false
	}

	// data == nil means the header has just been completed and not yet parsed.
	if fr.data == nil {
		id := uint16(fr.hdr[0])<<8 | uint16(fr.hdr[1])
		l := uint32(fr.hdr[2])<<24 | uint32(fr.hdr[3])<<16 | uint32(fr.hdr[4])<<8 | uint32(fr.hdr[5])
		if l == 0 || l > maxPayload {
			fr.Reset()
			return id, nil, false
		}
		fr.chanID = id
		fr.dataLen = int(l)
		fr.data = make([]byte, fr.dataLen)
		fr.dataN = 0
	}

	if !fr.fill(fd, fr.data, &fr.dataN) {
		return 0, nil, false
	}

	chanID, data := fr.chanID, fr.data
	fr.Reset()
	return chanID, data, true
}

// fill reads from fd into buf[*pos:] until buf is full, advancing *pos as
// bytes arrive. It reports true when buf is full. On EAGAIN it reports false
// and leaves the partial state intact; on EOF or any other error it resets
// fr and reports false. Reads interrupted by a signal (EINTR) are retried so
// that bytes already consumed are not thrown away.
func (fr *FrameReader) fill(fd int32, buf []byte, pos *int) bool {
	for *pos < len(buf) {
		n, err := syscall.Read(int(fd), buf[*pos:])
		if n > 0 {
			*pos += n
		}
		switch {
		case err == syscall.EINTR:
			continue
		case err == syscall.EAGAIN:
			return false
		case err != nil || n <= 0:
			fr.Reset()
			return false
		}
	}
	return true
}

// Reset clears partial-frame state.
func (fr *FrameReader) Reset() {
	*fr = FrameReader{}
}