evloop.mx raw
1 // Package evloop provides a non-blocking event loop for single-threaded
2 // moxie servers. It is NOT a framework: it provides buffered I/O primitives
3 // and an epoll dispatch loop. Protocol parsing is the caller's concern.
4 //
5 // Core invariant: no write anywhere in this package ever blocks.
6 // All writes are: attempt once, buffer remainder, drain via EPOLLOUT.
7 // A blocking write in a single-threaded event loop is a deadlock.
8 //
9 // Linux only (epoll). Darwin (kqueue) is future work.
10 package evloop
11
12 import (
13 "fmt"
14 "syscall"
15 )
16
17 // Handler is implemented by the server layer above evloop.
18 type Handler interface {
19 // OnAccept is called for each new accepted connection.
20 // Return false to reject (evloop closes the fd immediately).
21 OnAccept(c *Conn) bool
22 // OnRead is called when data is available on c. Data is in c.Read().
23 // The handler must call c.Consume(n) for data it processes.
24 // Called once per EPOLLIN event - process at most one logical message
25 // per call to keep fair scheduling across connections.
26 OnRead(c *Conn)
27 // OnClose is called just before the fd is closed.
28 OnClose(c *Conn)
29 // OnFD is called for non-connection fds (e.g. worker pipes) registered
30 // via AddFD. The loop calls this on EPOLLIN for those fds.
31 OnFD(fd int32)
32 // OnTick is called periodically when epoll_wait times out (~5s).
33 OnTick()
34 }
35
36 // Loop is a single-threaded epoll event loop.
37 type Loop struct {
38 epfd int
39 lnFD int
40 conns map[int]*Conn
41 extraFDs map[int32]bool // fds registered via AddFD (not connections)
42 handler Handler
43 readBuf []byte
44 }
45
46 // New creates a Loop with the given handler.
47 func New(h Handler) *Loop {
48 return &Loop{
49 epfd: -1,
50 lnFD: -1,
51 conns: map[int]*Conn{},
52 extraFDs: map[int32]bool{},
53 handler: h,
54 readBuf: []byte{:32768},
55 }
56 }
57
58 // Run starts the event loop listening on lnFD (a non-blocking TCP listen socket).
59 // Blocks until SIGTERM/SIGINT or a fatal epoll error.
60 func (l *Loop) Run(lnFD int) error {
61 epfd, err := syscall.EpollCreate1(syscall.EPOLL_CLOEXEC)
62 if err != nil {
63 return fmt.Errorf("evloop: epoll_create1: %w", err)
64 }
65 l.epfd = epfd
66 l.lnFD = lnFD
67
68 // Register listen fd.
69 if err := l.epollAdd(lnFD, syscall.EPOLLIN); err != nil {
70 return fmt.Errorf("evloop: epoll add listen: %w", err)
71 }
72 // Register pre-added extra fds.
73 for fd := range l.extraFDs {
74 if err := l.epollAdd(int(fd), syscall.EPOLLIN); err != nil {
75 return fmt.Errorf("evloop: epoll add fd %d: %w", fd, err)
76 }
77 }
78
79 events := []syscall.EpollEvent{:64}
80 for {
81 n, err := syscall.EpollWait(l.epfd, events, 5000)
82 if err != nil {
83 if err == syscall.EINTR {
84 return nil
85 }
86 return fmt.Errorf("evloop: epoll_wait: %w", err)
87 }
88 if n == 0 {
89 l.handler.OnTick()
90 continue
91 }
92 for i := 0; i < n; i++ {
93 ev := events[i]
94 fd := int(ev.Fd)
95 if fd == lnFD {
96 l.acceptAll()
97 continue
98 }
99 if l.extraFDs[int32(fd)] {
100 l.handler.OnFD(int32(fd))
101 continue
102 }
103 c := l.conns[fd]
104 if c == nil {
105 continue
106 }
107 if ev.Events&(syscall.EPOLLERR|syscall.EPOLLHUP) != 0 {
108 l.closeConn(c)
109 continue
110 }
111 if ev.Events&syscall.EPOLLOUT != 0 {
112 c.drainWrite()
113 // If conn still alive and EPOLLIN also set, read too.
114 if l.conns[fd] != nil && ev.Events&syscall.EPOLLIN != 0 {
115 l.readConn(c)
116 }
117 continue
118 }
119 l.readConn(c)
120 }
121 }
122 }
123
124 // AddFD registers an existing fd (e.g. a worker pipe) for EPOLLIN dispatch
125 // via Handler.OnFD. The fd is set non-blocking.
126 func (l *Loop) AddFD(fd int32) {
127 syscall.SetNonblock(int(fd), true)
128 l.extraFDs[fd] = true
129 if l.epfd >= 0 {
130 l.epollAdd(int(fd), syscall.EPOLLIN)
131 }
132 }
133
134 // RemoveFD removes a previously registered extra fd.
135 func (l *Loop) RemoveFD(fd int32) {
136 delete(l.extraFDs, fd)
137 if l.epfd >= 0 {
138 syscall.EpollCtl(l.epfd, syscall.EPOLL_CTL_DEL, int(fd), nil)
139 }
140 }
141
142 // Conn returns the Conn for a given fd, or nil.
143 func (l *Loop) Conn(fd int) *Conn {
144 return l.conns[fd]
145 }
146
147 func (l *Loop) acceptAll() {
148 for {
149 nfd, _, err := syscall.Accept4(l.lnFD, syscall.SOCK_NONBLOCK|syscall.SOCK_CLOEXEC)
150 if err != nil {
151 return // EAGAIN = no more pending connections
152 }
153 c := &Conn{
154 FD: nfd,
155 loop: l,
156 }
157 if !l.handler.OnAccept(c) {
158 syscall.Close(nfd)
159 continue
160 }
161 if err := l.epollAdd(nfd, syscall.EPOLLIN); err != nil {
162 syscall.Close(nfd)
163 continue
164 }
165 l.conns[nfd] = c
166 }
167 }
168
169 func (l *Loop) readConn(c *Conn) {
170 n, err := syscall.Read(c.FD, l.readBuf)
171 if n > 0 {
172 c.appendRead(l.readBuf[:n])
173 l.handler.OnRead(c)
174 }
175 if err != nil && err != syscall.EAGAIN && err != syscall.EINTR {
176 l.closeConn(c)
177 }
178 }
179
180 func (l *Loop) closeConn(c *Conn) {
181 if l.conns[c.FD] == nil {
182 return // already closed
183 }
184 l.handler.OnClose(c)
185 syscall.EpollCtl(l.epfd, syscall.EPOLL_CTL_DEL, c.FD, nil)
186 syscall.Close(c.FD)
187 delete(l.conns, c.FD)
188 }
189
190 func (l *Loop) modWrite(fd int) {
191 ev := syscall.EpollEvent{Events: syscall.EPOLLIN | syscall.EPOLLOUT, Fd: int32(fd)}
192 syscall.EpollCtl(l.epfd, syscall.EPOLL_CTL_MOD, fd, &ev)
193 }
194
195 func (l *Loop) modRead(fd int) {
196 ev := syscall.EpollEvent{Events: syscall.EPOLLIN, Fd: int32(fd)}
197 syscall.EpollCtl(l.epfd, syscall.EPOLL_CTL_MOD, fd, &ev)
198 }
199
200 func (l *Loop) epollAdd(fd int, events uint32) error {
201 ev := syscall.EpollEvent{Events: events, Fd: int32(fd)}
202 return syscall.EpollCtl(l.epfd, syscall.EPOLL_CTL_ADD, fd, &ev)
203 }
204