conn.mx raw
1 package evloop
2
3 import (
4 "syscall"
5 )
6
7 // DefaultWBufLimit is the default slow-client write buffer ceiling.
8 const DefaultWBufLimit = 4 << 20 // 4MB
9
10 // Conn is a non-blocking, buffered file descriptor managed by a Loop.
11 // All writes are buffered; the loop drains them via EPOLLOUT.
12 // All reads accumulate in rbuf; the caller consumes via Read/Consume.
13 type Conn struct {
14 FD int
15 Tag int // caller-defined (protocol type, worker index, etc.)
16 State int // caller-defined phase/state
17 WBufLimit int // max wbuf bytes before drain-and-close; 0 = use DefaultWBufLimit
18 wbufClose bool // close after wbuf fully drained
19 rbuf []byte
20 rpos int
21 wbuf []byte
22 loop *Loop
23 }
24
25 // Read returns the buffered unread data. The slice is valid until the next
26 // call to Consume or any method that may trigger a read.
27 func (c *Conn) Read() []byte {
28 return c.rbuf[c.rpos:]
29 }
30
31 // Consume advances the read cursor by n bytes.
32 // Panics if n > len(Read()) - this is always a caller logic bug.
33 func (c *Conn) Consume(n int) {
34 avail := len(c.rbuf) - c.rpos
35 if n > avail {
36 panic("evloop: Consume past end of read buffer")
37 }
38 c.rpos += n
39 // Compact when the dead prefix exceeds half the buffer.
40 if c.rpos > len(c.rbuf)/2 {
41 copy(c.rbuf, c.rbuf[c.rpos:])
42 c.rbuf = c.rbuf[:len(c.rbuf)-c.rpos]
43 c.rpos = 0
44 }
45 }
46
47 // Write appends data to the write buffer.
48 // Attempts an immediate syscall.Write first (common case: socket is writable).
49 // On EAGAIN or partial write, buffers remainder and registers EPOLLOUT.
50 // If WBufLimit is exceeded after appending, schedules drain-then-close.
51 func (c *Conn) Write(data []byte) {
52 if len(data) == 0 {
53 return
54 }
55 if c.wbufClose {
56 return // already closing; drop new writes
57 }
58
59 if c.wbuf == nil {
60 // Fast path: try immediate write.
61 n, err := syscall.Write(c.FD, data)
62 if n > 0 {
63 data = data[n:]
64 }
65 if len(data) == 0 {
66 return // fully written, no EPOLLOUT needed
67 }
68 if err != nil && err != syscall.EAGAIN {
69 c.loop.closeConn(c)
70 return
71 }
72 // Partial or EAGAIN: buffer remainder, register EPOLLOUT.
73 c.wbuf = []byte{:len(data)}
74 copy(c.wbuf, data)
75 c.loop.modWrite(c.FD)
76 return
77 }
78
79 // Slow path: already buffering; just append.
80 c.wbuf = append(c.wbuf, data...)
81 limit := c.WBufLimit
82 if limit == 0 {
83 limit = DefaultWBufLimit
84 }
85 if len(c.wbuf) > limit {
86 c.wbufClose = true // drain what's buffered, then close
87 }
88 }
89
90 // WriteAndClose buffers data and closes the connection after it drains.
91 func (c *Conn) WriteAndClose(data []byte) {
92 c.Write(data)
93 c.wbufClose = true
94 if c.wbuf == nil {
95 // Nothing pending; close immediately.
96 c.loop.closeConn(c)
97 }
98 }
99
100 // Close closes the fd immediately without draining.
101 func (c *Conn) Close() {
102 c.loop.closeConn(c)
103 }
104
105 // WBufLen returns the number of bytes pending in the write buffer.
106 func (c *Conn) WBufLen() int {
107 return len(c.wbuf)
108 }
109
110 // drainWrite is called by the loop on EPOLLOUT. Drains wbuf until EAGAIN or done.
111 func (c *Conn) drainWrite() {
112 for len(c.wbuf) > 0 {
113 n, err := syscall.Write(c.FD, c.wbuf)
114 if n > 0 {
115 c.wbuf = c.wbuf[n:]
116 }
117 if err == syscall.EAGAIN {
118 return // socket full again; wait for next EPOLLOUT
119 }
120 if err != nil {
121 c.loop.closeConn(c)
122 return
123 }
124 }
125 // Fully drained.
126 c.wbuf = nil
127 c.loop.modRead(c.FD)
128 if c.wbufClose {
129 c.loop.closeConn(c)
130 }
131 }
132
133 // appendRead is called by the loop to accumulate incoming bytes.
134 func (c *Conn) appendRead(buf []byte) {
135 c.rbuf = append(c.rbuf, buf...)
136 }
137