package evloop import ( "syscall" ) // DefaultWBufLimit is the default slow-client write buffer ceiling. const DefaultWBufLimit = 4 << 20 // 4MB // Conn is a non-blocking, buffered file descriptor managed by a Loop. // All writes are buffered; the loop drains them via EPOLLOUT. // All reads accumulate in rbuf; the caller consumes via Read/Consume. type Conn struct { FD int Tag int // caller-defined (protocol type, worker index, etc.) State int // caller-defined phase/state WBufLimit int // max wbuf bytes before drain-and-close; 0 = use DefaultWBufLimit wbufClose bool // close after wbuf fully drained rbuf []byte rpos int wbuf []byte loop *Loop } // Read returns the buffered unread data. The slice is valid until the next // call to Consume or any method that may trigger a read. func (c *Conn) Read() []byte { return c.rbuf[c.rpos:] } // Consume advances the read cursor by n bytes. // Panics if n > len(Read()) - this is always a caller logic bug. func (c *Conn) Consume(n int) { avail := len(c.rbuf) - c.rpos if n > avail { panic("evloop: Consume past end of read buffer") } c.rpos += n // Compact when the dead prefix exceeds half the buffer. if c.rpos > len(c.rbuf)/2 { copy(c.rbuf, c.rbuf[c.rpos:]) c.rbuf = c.rbuf[:len(c.rbuf)-c.rpos] c.rpos = 0 } } // Write appends data to the write buffer. // Attempts an immediate syscall.Write first (common case: socket is writable). // On EAGAIN or partial write, buffers remainder and registers EPOLLOUT. // If WBufLimit is exceeded after appending, schedules drain-then-close. func (c *Conn) Write(data []byte) { if len(data) == 0 { return } if c.wbufClose { return // already closing; drop new writes } if c.wbuf == nil { // Fast path: try immediate write. n, err := syscall.Write(c.FD, data) if n > 0 { data = data[n:] } if len(data) == 0 { return // fully written, no EPOLLOUT needed } if err != nil && err != syscall.EAGAIN { c.loop.closeConn(c) return } // Partial or EAGAIN: buffer remainder, register EPOLLOUT. c.wbuf = []byte{:len(data)} copy(c.wbuf, data) c.loop.modWrite(c.FD) return } // Slow path: already buffering; just append. c.wbuf = append(c.wbuf, data...) limit := c.WBufLimit if limit == 0 { limit = DefaultWBufLimit } if len(c.wbuf) > limit { c.wbufClose = true // drain what's buffered, then close } } // WriteAndClose buffers data and closes the connection after it drains. func (c *Conn) WriteAndClose(data []byte) { c.Write(data) c.wbufClose = true if c.wbuf == nil { // Nothing pending; close immediately. c.loop.closeConn(c) } } // Close closes the fd immediately without draining. func (c *Conn) Close() { c.loop.closeConn(c) } // WBufLen returns the number of bytes pending in the write buffer. func (c *Conn) WBufLen() int { return len(c.wbuf) } // drainWrite is called by the loop on EPOLLOUT. Drains wbuf until EAGAIN or done. func (c *Conn) drainWrite() { for len(c.wbuf) > 0 { n, err := syscall.Write(c.FD, c.wbuf) if n > 0 { c.wbuf = c.wbuf[n:] } if err == syscall.EAGAIN { return // socket full again; wait for next EPOLLOUT } if err != nil { c.loop.closeConn(c) return } } // Fully drained. c.wbuf = nil c.loop.modRead(c.FD) if c.wbufClose { c.loop.closeConn(c) } } // appendRead is called by the loop to accumulate incoming bytes. func (c *Conn) appendRead(buf []byte) { c.rbuf = append(c.rbuf, buf...) }