conn.mx raw

   1  package evloop
   2  
   3  import (
   4  	"syscall"
   5  )
   6  
   7  // DefaultWBufLimit is the default slow-client write buffer ceiling.
   8  const DefaultWBufLimit = 4 << 20 // 4MB
   9  
  10  // Conn is a non-blocking, buffered file descriptor managed by a Loop.
  11  // All writes are buffered; the loop drains them via EPOLLOUT.
  12  // All reads accumulate in rbuf; the caller consumes via Read/Consume.
  13  type Conn struct {
  14  	FD         int
  15  	Tag        int  // caller-defined (protocol type, worker index, etc.)
  16  	State      int  // caller-defined phase/state
  17  	WBufLimit  int  // max wbuf bytes before drain-and-close; 0 = use DefaultWBufLimit
  18  	wbufClose  bool // close after wbuf fully drained
  19  	rbuf       []byte
  20  	rpos       int
  21  	wbuf       []byte
  22  	loop       *Loop
  23  }
  24  
  25  // Read returns the buffered unread data. The slice is valid until the next
  26  // call to Consume or any method that may trigger a read.
  27  func (c *Conn) Read() []byte {
  28  	return c.rbuf[c.rpos:]
  29  }
  30  
  31  // Consume advances the read cursor by n bytes.
  32  // Panics if n > len(Read()) - this is always a caller logic bug.
  33  func (c *Conn) Consume(n int) {
  34  	avail := len(c.rbuf) - c.rpos
  35  	if n > avail {
  36  		panic("evloop: Consume past end of read buffer")
  37  	}
  38  	c.rpos += n
  39  	// Compact when the dead prefix exceeds half the buffer.
  40  	if c.rpos > len(c.rbuf)/2 {
  41  		copy(c.rbuf, c.rbuf[c.rpos:])
  42  		c.rbuf = c.rbuf[:len(c.rbuf)-c.rpos]
  43  		c.rpos = 0
  44  	}
  45  }
  46  
  47  // Write appends data to the write buffer.
  48  // Attempts an immediate syscall.Write first (common case: socket is writable).
  49  // On EAGAIN or partial write, buffers remainder and registers EPOLLOUT.
  50  // If WBufLimit is exceeded after appending, schedules drain-then-close.
  51  func (c *Conn) Write(data []byte) {
  52  	if len(data) == 0 {
  53  		return
  54  	}
  55  	if c.wbufClose {
  56  		return // already closing; drop new writes
  57  	}
  58  
  59  	if c.wbuf == nil {
  60  		// Fast path: try immediate write.
  61  		n, err := syscall.Write(c.FD, data)
  62  		if n > 0 {
  63  			data = data[n:]
  64  		}
  65  		if len(data) == 0 {
  66  			return // fully written, no EPOLLOUT needed
  67  		}
  68  		if err != nil && err != syscall.EAGAIN {
  69  			c.loop.closeConn(c)
  70  			return
  71  		}
  72  		// Partial or EAGAIN: buffer remainder, register EPOLLOUT.
  73  		c.wbuf = []byte{:len(data)}
  74  		copy(c.wbuf, data)
  75  		c.loop.modWrite(c.FD)
  76  		return
  77  	}
  78  
  79  	// Slow path: already buffering; just append.
  80  	c.wbuf = append(c.wbuf, data...)
  81  	limit := c.WBufLimit
  82  	if limit == 0 {
  83  		limit = DefaultWBufLimit
  84  	}
  85  	if len(c.wbuf) > limit {
  86  		c.wbufClose = true // drain what's buffered, then close
  87  	}
  88  }
  89  
  90  // WriteAndClose buffers data and closes the connection after it drains.
  91  func (c *Conn) WriteAndClose(data []byte) {
  92  	c.Write(data)
  93  	c.wbufClose = true
  94  	if c.wbuf == nil {
  95  		// Nothing pending; close immediately.
  96  		c.loop.closeConn(c)
  97  	}
  98  }
  99  
 100  // Close closes the fd immediately without draining.
 101  func (c *Conn) Close() {
 102  	c.loop.closeConn(c)
 103  }
 104  
 105  // WBufLen returns the number of bytes pending in the write buffer.
 106  func (c *Conn) WBufLen() int {
 107  	return len(c.wbuf)
 108  }
 109  
 110  // drainWrite is called by the loop on EPOLLOUT. Drains wbuf until EAGAIN or done.
 111  func (c *Conn) drainWrite() {
 112  	for len(c.wbuf) > 0 {
 113  		n, err := syscall.Write(c.FD, c.wbuf)
 114  		if n > 0 {
 115  			c.wbuf = c.wbuf[n:]
 116  		}
 117  		if err == syscall.EAGAIN {
 118  			return // socket full again; wait for next EPOLLOUT
 119  		}
 120  		if err != nil {
 121  			c.loop.closeConn(c)
 122  			return
 123  		}
 124  	}
 125  	// Fully drained.
 126  	c.wbuf = nil
 127  	c.loop.modRead(c.FD)
 128  	if c.wbufClose {
 129  		c.loop.closeConn(c)
 130  	}
 131  }
 132  
 133  // appendRead is called by the loop to accumulate incoming bytes.
 134  func (c *Conn) appendRead(buf []byte) {
 135  	c.rbuf = append(c.rbuf, buf...)
 136  }
 137