// Package evloop provides a non-blocking event loop for single-threaded // moxie servers. It is NOT a framework: it provides buffered I/O primitives // and an epoll dispatch loop. Protocol parsing is the caller's concern. // // Core invariant: no write anywhere in this package ever blocks. // All writes are: attempt once, buffer remainder, drain via EPOLLOUT. // A blocking write in a single-threaded event loop is a deadlock. // // Linux only (epoll). Darwin (kqueue) is future work. package evloop import ( "fmt" "syscall" ) // Handler is implemented by the server layer above evloop. type Handler interface { // OnAccept is called for each new accepted connection. // Return false to reject (evloop closes the fd immediately). OnAccept(c *Conn) bool // OnRead is called when data is available on c. Data is in c.Read(). // The handler must call c.Consume(n) for data it processes. // Called once per EPOLLIN event - process at most one logical message // per call to keep fair scheduling across connections. OnRead(c *Conn) // OnClose is called just before the fd is closed. OnClose(c *Conn) // OnFD is called for non-connection fds (e.g. worker pipes) registered // via AddFD. The loop calls this on EPOLLIN for those fds. OnFD(fd int32) // OnTick is called periodically when epoll_wait times out (~5s). OnTick() } // Loop is a single-threaded epoll event loop. type Loop struct { epfd int lnFD int conns map[int]*Conn extraFDs map[int32]bool // fds registered via AddFD (not connections) handler Handler readBuf []byte } // New creates a Loop with the given handler. func New(h Handler) *Loop { return &Loop{ epfd: -1, lnFD: -1, conns: map[int]*Conn{}, extraFDs: map[int32]bool{}, handler: h, readBuf: []byte{:32768}, } } // Run starts the event loop listening on lnFD (a non-blocking TCP listen socket). // Blocks until SIGTERM/SIGINT or a fatal epoll error. func (l *Loop) Run(lnFD int) error { epfd, err := syscall.EpollCreate1(syscall.EPOLL_CLOEXEC) if err != nil { return fmt.Errorf("evloop: epoll_create1: %w", err) } l.epfd = epfd l.lnFD = lnFD // Register listen fd. if err := l.epollAdd(lnFD, syscall.EPOLLIN); err != nil { return fmt.Errorf("evloop: epoll add listen: %w", err) } // Register pre-added extra fds. for fd := range l.extraFDs { if err := l.epollAdd(int(fd), syscall.EPOLLIN); err != nil { return fmt.Errorf("evloop: epoll add fd %d: %w", fd, err) } } events := []syscall.EpollEvent{:64} for { n, err := syscall.EpollWait(l.epfd, events, 5000) if err != nil { if err == syscall.EINTR { return nil } return fmt.Errorf("evloop: epoll_wait: %w", err) } if n == 0 { l.handler.OnTick() continue } for i := 0; i < n; i++ { ev := events[i] fd := int(ev.Fd) if fd == lnFD { l.acceptAll() continue } if l.extraFDs[int32(fd)] { l.handler.OnFD(int32(fd)) continue } c := l.conns[fd] if c == nil { continue } if ev.Events&(syscall.EPOLLERR|syscall.EPOLLHUP) != 0 { l.closeConn(c) continue } if ev.Events&syscall.EPOLLOUT != 0 { c.drainWrite() // If conn still alive and EPOLLIN also set, read too. if l.conns[fd] != nil && ev.Events&syscall.EPOLLIN != 0 { l.readConn(c) } continue } l.readConn(c) } } } // AddFD registers an existing fd (e.g. a worker pipe) for EPOLLIN dispatch // via Handler.OnFD. The fd is set non-blocking. func (l *Loop) AddFD(fd int32) { syscall.SetNonblock(int(fd), true) l.extraFDs[fd] = true if l.epfd >= 0 { l.epollAdd(int(fd), syscall.EPOLLIN) } } // RemoveFD removes a previously registered extra fd. func (l *Loop) RemoveFD(fd int32) { delete(l.extraFDs, fd) if l.epfd >= 0 { syscall.EpollCtl(l.epfd, syscall.EPOLL_CTL_DEL, int(fd), nil) } } // Conn returns the Conn for a given fd, or nil. func (l *Loop) Conn(fd int) *Conn { return l.conns[fd] } func (l *Loop) acceptAll() { for { nfd, _, err := syscall.Accept4(l.lnFD, syscall.SOCK_NONBLOCK|syscall.SOCK_CLOEXEC) if err != nil { return // EAGAIN = no more pending connections } c := &Conn{ FD: nfd, loop: l, } if !l.handler.OnAccept(c) { syscall.Close(nfd) continue } if err := l.epollAdd(nfd, syscall.EPOLLIN); err != nil { syscall.Close(nfd) continue } l.conns[nfd] = c } } func (l *Loop) readConn(c *Conn) { n, err := syscall.Read(c.FD, l.readBuf) if n > 0 { c.appendRead(l.readBuf[:n]) l.handler.OnRead(c) } if err != nil && err != syscall.EAGAIN && err != syscall.EINTR { l.closeConn(c) } } func (l *Loop) closeConn(c *Conn) { if l.conns[c.FD] == nil { return // already closed } l.handler.OnClose(c) syscall.EpollCtl(l.epfd, syscall.EPOLL_CTL_DEL, c.FD, nil) syscall.Close(c.FD) delete(l.conns, c.FD) } func (l *Loop) modWrite(fd int) { ev := syscall.EpollEvent{Events: syscall.EPOLLIN | syscall.EPOLLOUT, Fd: int32(fd)} syscall.EpollCtl(l.epfd, syscall.EPOLL_CTL_MOD, fd, &ev) } func (l *Loop) modRead(fd int) { ev := syscall.EpollEvent{Events: syscall.EPOLLIN, Fd: int32(fd)} syscall.EpollCtl(l.epfd, syscall.EPOLL_CTL_MOD, fd, &ev) } func (l *Loop) epollAdd(fd int, events uint32) error { ev := syscall.EpollEvent{Events: events, Fd: int32(fd)} return syscall.EpollCtl(l.epfd, syscall.EPOLL_CTL_ADD, fd, &ev) }