frame.mx raw

   1  // Package pool provides shared worker pool infrastructure: pipe frame IPC,
   2  // busy tracking, and FD lookup. Used by all worker pools in the relay.
   3  package pool
   4  
   5  import (
   6  	"syscall"
   7  )
   8  
   9  // WriteFrame writes one pipe frame to fd.
  10  // Frame format: [chanID(2BE) | length(4BE)] + data.
  11  // Handles partial writes. Returns false immediately on EAGAIN or error
  12  // to avoid blocking the event loop.
  13  func WriteFrame(fd int32, chanID uint16, data []byte) bool {
  14  	l := uint32(len(data))
  15  	var hdr [6]byte
  16  	hdr[0] = byte(chanID >> 8)
  17  	hdr[1] = byte(chanID)
  18  	hdr[2] = byte(l >> 24)
  19  	hdr[3] = byte(l >> 16)
  20  	hdr[4] = byte(l >> 8)
  21  	hdr[5] = byte(l)
  22  	frame := []byte{:0:6 + int(l)}
  23  	frame = append(frame, hdr[:]...)
  24  	frame = append(frame, data...)
  25  	sent := 0
  26  	for sent < len(frame) {
  27  		n, err := syscall.Write(int(fd), frame[sent:])
  28  		if n > 0 {
  29  			sent += n
  30  		}
  31  		if err != nil {
  32  			return false
  33  		}
  34  	}
  35  	return true
  36  }
  37  
  38  // ReadFrame reads one pipe frame from fd using non-blocking I/O for the header.
  39  // Returns (chanID, payload, ok). ok=false on error or EAGAIN.
  40  func ReadFrame(fd int32) (chanID uint16, data []byte, ok bool) {
  41  	var hdr [6]byte
  42  	syscall.SetNonblock(int(fd), true)
  43  	n, err := syscall.Read(int(fd), hdr[:])
  44  	syscall.SetNonblock(int(fd), false)
  45  	if err == syscall.EAGAIN || n == 0 || err != nil {
  46  		return 0, nil, false
  47  	}
  48  	if n < 6 {
  49  		return 0, nil, false
  50  	}
  51  	chanID = uint16(hdr[0])<<8 | uint16(hdr[1])
  52  	l := uint32(hdr[2])<<24 | uint32(hdr[3])<<16 | uint32(hdr[4])<<8 | uint32(hdr[5])
  53  	if l == 0 || l > 64<<20 {
  54  		return chanID, nil, false
  55  	}
  56  	data = []byte{:l}
  57  	got := 0
  58  	for got < int(l) {
  59  		nr, rerr := syscall.Read(int(fd), data[got:])
  60  		if nr <= 0 || rerr != nil {
  61  			return 0, nil, false
  62  		}
  63  		got += nr
  64  	}
  65  	return chanID, data, true
  66  }
  67  
  68  // DrainFrame discards n bytes from fd.
  69  func DrainFrame(fd int32, n uint32) {
  70  	discard := []byte{:512}
  71  	for n > 0 {
  72  		chunk := int(n)
  73  		if chunk > 512 {
  74  			chunk = 512
  75  		}
  76  		nr, err := syscall.Read(int(fd), discard[:chunk])
  77  		if err != nil || nr <= 0 {
  78  			return
  79  		}
  80  		n -= uint32(nr)
  81  	}
  82  }
  83  
  84  // FrameReader maintains partial-frame state for non-blocking reads.
  85  // The fd must be in non-blocking mode. Each Read call either returns a
  86  // complete frame or preserves partial state for the next call.
  87  type FrameReader struct {
  88  	hdr     [6]byte
  89  	hdrN    int
  90  	chanID  uint16
  91  	data    []byte
  92  	dataN   int
  93  	dataLen int
  94  }
  95  
  96  // Read attempts to complete one frame from fd. Returns (chanID, payload, true)
  97  // on success. Returns (0, nil, false) on EAGAIN (partial state preserved for
  98  // next call) or pipe error (state reset).
  99  func (fr *FrameReader) Read(fd int32) (uint16, []byte, bool) {
 100  	for fr.hdrN < 6 {
 101  		n, err := syscall.Read(int(fd), fr.hdr[fr.hdrN:])
 102  		if n > 0 {
 103  			fr.hdrN += n
 104  		}
 105  		if err == syscall.EAGAIN {
 106  			return 0, nil, false
 107  		}
 108  		if err != nil || n <= 0 {
 109  			fr.Reset()
 110  			return 0, nil, false
 111  		}
 112  	}
 113  	if fr.data == nil {
 114  		fr.chanID = uint16(fr.hdr[0])<<8 | uint16(fr.hdr[1])
 115  		l := uint32(fr.hdr[2])<<24 | uint32(fr.hdr[3])<<16 | uint32(fr.hdr[4])<<8 | uint32(fr.hdr[5])
 116  		if l == 0 || l > 64<<20 {
 117  			fr.Reset()
 118  			return fr.chanID, nil, false
 119  		}
 120  		fr.dataLen = int(l)
 121  		fr.data = []byte{:fr.dataLen}
 122  		fr.dataN = 0
 123  	}
 124  	for fr.dataN < fr.dataLen {
 125  		n, err := syscall.Read(int(fd), fr.data[fr.dataN:])
 126  		if n > 0 {
 127  			fr.dataN += n
 128  		}
 129  		if err == syscall.EAGAIN {
 130  			return 0, nil, false
 131  		}
 132  		if err != nil || n <= 0 {
 133  			fr.Reset()
 134  			return 0, nil, false
 135  		}
 136  	}
 137  	chanID := fr.chanID
 138  	data := fr.data
 139  	fr.Reset()
 140  	return chanID, data, true
 141  }
 142  
 143  // Reset clears partial-frame state.
 144  func (fr *FrameReader) Reset() {
 145  	fr.hdrN = 0
 146  	fr.data = nil
 147  	fr.dataN = 0
 148  	fr.dataLen = 0
 149  }
 150