evloop.mx raw

   1  // Package evloop provides a non-blocking event loop for single-threaded
   2  // moxie servers. It is NOT a framework: it provides buffered I/O primitives
   3  // and an epoll dispatch loop. Protocol parsing is the caller's concern.
   4  //
   5  // Core invariant: no write anywhere in this package ever blocks.
   6  // All writes are: attempt once, buffer remainder, drain via EPOLLOUT.
   7  // A blocking write in a single-threaded event loop is a deadlock.
   8  //
   9  // Linux only (epoll). Darwin (kqueue) is future work.
  10  package evloop
  11  
  12  import (
  13  	"fmt"
  14  	"syscall"
  15  )
  16  
  17  // Handler is implemented by the server layer above evloop.
  18  type Handler interface {
  19  	// OnAccept is called for each new accepted connection.
  20  	// Return false to reject (evloop closes the fd immediately).
  21  	OnAccept(c *Conn) bool
  22  	// OnRead is called when data is available on c. Data is in c.Read().
  23  	// The handler must call c.Consume(n) for data it processes.
  24  	// Called once per EPOLLIN event - process at most one logical message
  25  	// per call to keep fair scheduling across connections.
  26  	OnRead(c *Conn)
  27  	// OnClose is called just before the fd is closed.
  28  	OnClose(c *Conn)
  29  	// OnFD is called for non-connection fds (e.g. worker pipes) registered
  30  	// via AddFD. The loop calls this on EPOLLIN for those fds.
  31  	OnFD(fd int32)
  32  	// OnTick is called periodically when epoll_wait times out (~5s).
  33  	OnTick()
  34  }
  35  
  36  // Loop is a single-threaded epoll event loop.
  37  type Loop struct {
  38  	epfd    int
  39  	lnFD    int
  40  	conns   map[int]*Conn
  41  	extraFDs map[int32]bool // fds registered via AddFD (not connections)
  42  	handler Handler
  43  	readBuf []byte
  44  }
  45  
  46  // New creates a Loop with the given handler.
  47  func New(h Handler) *Loop {
  48  	return &Loop{
  49  		epfd:     -1,
  50  		lnFD:     -1,
  51  		conns:    map[int]*Conn{},
  52  		extraFDs: map[int32]bool{},
  53  		handler:  h,
  54  		readBuf:  []byte{:32768},
  55  	}
  56  }
  57  
  58  // Run starts the event loop listening on lnFD (a non-blocking TCP listen socket).
  59  // Blocks until SIGTERM/SIGINT or a fatal epoll error.
  60  func (l *Loop) Run(lnFD int) error {
  61  	epfd, err := syscall.EpollCreate1(syscall.EPOLL_CLOEXEC)
  62  	if err != nil {
  63  		return fmt.Errorf("evloop: epoll_create1: %w", err)
  64  	}
  65  	l.epfd = epfd
  66  	l.lnFD = lnFD
  67  
  68  	// Register listen fd.
  69  	if err := l.epollAdd(lnFD, syscall.EPOLLIN); err != nil {
  70  		return fmt.Errorf("evloop: epoll add listen: %w", err)
  71  	}
  72  	// Register pre-added extra fds.
  73  	for fd := range l.extraFDs {
  74  		if err := l.epollAdd(int(fd), syscall.EPOLLIN); err != nil {
  75  			return fmt.Errorf("evloop: epoll add fd %d: %w", fd, err)
  76  		}
  77  	}
  78  
  79  	events := []syscall.EpollEvent{:64}
  80  	for {
  81  		n, err := syscall.EpollWait(l.epfd, events, 5000)
  82  		if err != nil {
  83  			if err == syscall.EINTR {
  84  				return nil
  85  			}
  86  			return fmt.Errorf("evloop: epoll_wait: %w", err)
  87  		}
  88  		if n == 0 {
  89  			l.handler.OnTick()
  90  			continue
  91  		}
  92  		for i := 0; i < n; i++ {
  93  			ev := events[i]
  94  			fd := int(ev.Fd)
  95  			if fd == lnFD {
  96  				l.acceptAll()
  97  				continue
  98  			}
  99  			if l.extraFDs[int32(fd)] {
 100  				l.handler.OnFD(int32(fd))
 101  				continue
 102  			}
 103  			c := l.conns[fd]
 104  			if c == nil {
 105  				continue
 106  			}
 107  			if ev.Events&(syscall.EPOLLERR|syscall.EPOLLHUP) != 0 {
 108  				l.closeConn(c)
 109  				continue
 110  			}
 111  			if ev.Events&syscall.EPOLLOUT != 0 {
 112  				c.drainWrite()
 113  				// If conn still alive and EPOLLIN also set, read too.
 114  				if l.conns[fd] != nil && ev.Events&syscall.EPOLLIN != 0 {
 115  					l.readConn(c)
 116  				}
 117  				continue
 118  			}
 119  			l.readConn(c)
 120  		}
 121  	}
 122  }
 123  
 124  // AddFD registers an existing fd (e.g. a worker pipe) for EPOLLIN dispatch
 125  // via Handler.OnFD. The fd is set non-blocking.
 126  func (l *Loop) AddFD(fd int32) {
 127  	syscall.SetNonblock(int(fd), true)
 128  	l.extraFDs[fd] = true
 129  	if l.epfd >= 0 {
 130  		l.epollAdd(int(fd), syscall.EPOLLIN)
 131  	}
 132  }
 133  
 134  // RemoveFD removes a previously registered extra fd.
 135  func (l *Loop) RemoveFD(fd int32) {
 136  	delete(l.extraFDs, fd)
 137  	if l.epfd >= 0 {
 138  		syscall.EpollCtl(l.epfd, syscall.EPOLL_CTL_DEL, int(fd), nil)
 139  	}
 140  }
 141  
 142  // Conn returns the Conn for a given fd, or nil.
 143  func (l *Loop) Conn(fd int) *Conn {
 144  	return l.conns[fd]
 145  }
 146  
 147  func (l *Loop) acceptAll() {
 148  	for {
 149  		nfd, _, err := syscall.Accept4(l.lnFD, syscall.SOCK_NONBLOCK|syscall.SOCK_CLOEXEC)
 150  		if err != nil {
 151  			return // EAGAIN = no more pending connections
 152  		}
 153  		c := &Conn{
 154  			FD:   nfd,
 155  			loop: l,
 156  		}
 157  		if !l.handler.OnAccept(c) {
 158  			syscall.Close(nfd)
 159  			continue
 160  		}
 161  		if err := l.epollAdd(nfd, syscall.EPOLLIN); err != nil {
 162  			syscall.Close(nfd)
 163  			continue
 164  		}
 165  		l.conns[nfd] = c
 166  	}
 167  }
 168  
 169  func (l *Loop) readConn(c *Conn) {
 170  	n, err := syscall.Read(c.FD, l.readBuf)
 171  	if n > 0 {
 172  		c.appendRead(l.readBuf[:n])
 173  		l.handler.OnRead(c)
 174  	}
 175  	if err != nil && err != syscall.EAGAIN && err != syscall.EINTR {
 176  		l.closeConn(c)
 177  	}
 178  }
 179  
 180  func (l *Loop) closeConn(c *Conn) {
 181  	if l.conns[c.FD] == nil {
 182  		return // already closed
 183  	}
 184  	l.handler.OnClose(c)
 185  	syscall.EpollCtl(l.epfd, syscall.EPOLL_CTL_DEL, c.FD, nil)
 186  	syscall.Close(c.FD)
 187  	delete(l.conns, c.FD)
 188  }
 189  
 190  func (l *Loop) modWrite(fd int) {
 191  	ev := syscall.EpollEvent{Events: syscall.EPOLLIN | syscall.EPOLLOUT, Fd: int32(fd)}
 192  	syscall.EpollCtl(l.epfd, syscall.EPOLL_CTL_MOD, fd, &ev)
 193  }
 194  
 195  func (l *Loop) modRead(fd int) {
 196  	ev := syscall.EpollEvent{Events: syscall.EPOLLIN, Fd: int32(fd)}
 197  	syscall.EpollCtl(l.epfd, syscall.EPOLL_CTL_MOD, fd, &ev)
 198  }
 199  
 200  func (l *Loop) epollAdd(fd int, events uint32) error {
 201  	ev := syscall.EpollEvent{Events: events, Fd: int32(fd)}
 202  	return syscall.EpollCtl(l.epfd, syscall.EPOLL_CTL_ADD, fd, &ev)
 203  }
 204