// Package transport provides the network layer: epoll event loop, TCP accept,
// HTTP/1.1 request parsing, and WebSocket framing. It has no knowledge of
// Nostr protocol or any application domain logic. It calls back to a Handler
// interface for all domain events.
//
// Dependency direction: server imports transport, never the reverse.
// No smesh package imports are allowed here - only stdlib.
package transport

import (
	"bytes"
	"fmt"
	"syscall"
)

// maxBuf is the size limit applied to per-connection buffers and to accepted
// HTTP Content-Length values.
const maxBuf = 20 << 20 // 20MB max per-connection buffer

// Connection phases. tconn.phase holds one of these and selects which parser
// readConn hands newly received bytes to.
const (
	phaseHTTP         = 0 // reading HTTP request headers
	phaseWS           = 1 // upgraded WebSocket connection
	phaseHTTPBody     = 2 // headers parsed, waiting for the request body
	phaseHTTPDeferred = 3 // waiting for async worker response
)

// WebSocket frame opcodes.
const (
	opText  byte = 0x1
	opBin   byte = 0x2
	opClose byte = 0x8
	opPing  byte = 0x9
	opPong  byte = 0xA
)

// HTTPDeferred is returned from Handler.OnHTTP to indicate async processing.
// Transport sets the connection to deferred phase and expects CompleteHTTP later.
const HTTPDeferred = -1

// Handler is implemented by the server layer. Transport calls these methods
// for all connection and message events.
type Handler interface {
	// OnAccept: new TCP connection. Return false to close immediately.
	OnAccept(fd int, ip string) bool
	// OnWSUpgrade: WS handshake requested. currentIPWSCount = existing WS conns from ip.
	// Return (whitelisted, allow). allow=false → 429.
	OnWSUpgrade(fd int, ip string, currentIPWSCount int) (whitelisted bool, allow bool)
	// OnWSConnected: 101 response sent. Handler sets up conn state, sends auth challenge.
	OnWSConnected(fd int)
	// OnWSMessage: decoded WS payload.
	OnWSMessage(fd int, payload []byte)
	// OnWSClose: WS connection closed. Handler cleans up conn state.
	OnWSClose(fd int)
	// OnHTTP: complete HTTP request. Return (HTTPDeferred, nil, nil, false) for async.
	// body is nil for requests without a body.
	OnHTTP(fd int, method, path string, headers map[string]string, body []byte) (status int, respHeaders map[string]string, respBody []byte, connClose bool)
	// OnFD: registered worker FD became readable.
	OnFD(fd int32)
	// OnTick: called periodically when epoll_wait times out (every ~5s).
	OnTick()
}

// Server runs the epoll event loop.
type Server struct {
	BotBlock bool   // block known bot User-Agents
	OnReady  func() // called after bind+listen, before epoll loop

	handler      Handler
	epfd         int            // epoll instance fd; 0 until ListenAndServe creates it
	lnFD         int            // listening socket fd
	sigFD        int            // signal pipe fd, copied from globalSigFD by ListenAndServe
	conns        map[int]*tconn // tracked connections keyed by socket fd
	ipConns      map[string]int // WS connection counts per IP
	maxConnPerIP int            // per-IP limit passed to New; 0 means unlimited
	extraFDs     map[int32]bool // registered worker FDs
}

// tconn is the per-connection state tracked by the Server.
type tconn struct {
	fd          int      // socket file descriptor
	phase       int      // one of the phase* constants
	buf         []byte   // read buffer; buf[:wpos] holds received, unconsumed bytes
	wpos        int      // number of valid bytes in buf
	remoteIP    string   // peer IP; replaced by the X-Forwarded-For value for loopback peers
	whitelisted bool     // reported by ConnIsWhitelisted
	pendingReq  *httpReq // parsed request waiting for its body (phaseHTTPBody)
	bodyNeeded  int      // body length still expected for pendingReq
	wbuf        []byte   // pending write data (EAGAIN buffered)
	wbufClose   bool     // close connection after wbuf drains
}

// httpReq is a parsed HTTP request. headers is looked up with lower-case keys
// throughout this package.
type httpReq struct {
	method  string
	path    string
	headers map[string]string
	body    []byte
}

// The three functions below are declared without bodies.
// NOTE(review): presumably implemented by the runtime and bound through the
// export directives - confirm against the runtime sources.

//export moxie_signal_enable
func moxie_signal_enable(s uint32)

//export moxie_signal_pipe_init
func moxie_signal_pipe_init() int32

//export moxie_signal_pipe_read
func moxie_signal_pipe_read() int32

// globalSigFD is the signal pipe fd set by InitSignals; -1 until then.
var globalSigFD int32 = -1

// InitSignals sets up SIGTERM/SIGINT handling. Must be called before any store
// operations so that shutdown signals during slow startup are handled cleanly.
func InitSignals() {
	globalSigFD = moxie_signal_pipe_init()
	moxie_signal_enable(15) // SIGTERM
	moxie_signal_enable(2)  // SIGINT
}

// New creates a transport Server. maxConnPerIP=0 means unlimited.
func New(handler Handler, maxConnPerIP int) *Server {
	return &Server{
		handler:      handler,
		conns:        map[int]*tconn{},
		ipConns:      map[string]int{},
		maxConnPerIP: maxConnPerIP,
		extraFDs:     map[int32]bool{},
	}
}

// RegisterFD adds a file descriptor to the epoll set.
// Readable events on this FD call handler.OnFD(fd).
// Sets the fd non-blocking so parent-side reads/writes never stall the
// epoll event loop (prevents bilateral deadlock on spawn socketpairs).
func (s *Server) RegisterFD(fd int32) { s.extraFDs[fd] = true syscall.SetNonblock(int(fd), true) if s.epfd != 0 { epollAdd(s.epfd, int(fd)) } } // RemoveFD removes a file descriptor from the epoll set. func (s *Server) RemoveFD(fd int32) { if s.extraFDs[fd] { delete(s.extraFDs, fd) epollDel(s.epfd, int(fd)) } } // ConnCount returns the current number of tracked TCP connections. func (s *Server) ConnCount() int { return len(s.conns) } // ConnIP returns the effective remote IP for a connection (XFF-substituted). func (s *Server) ConnIP(fd int) string { if c := s.conns[fd]; c != nil { return c.remoteIP } return "" } // ConnIsWhitelisted reports whether fd's IP is whitelisted. func (s *Server) ConnIsWhitelisted(fd int) bool { if c := s.conns[fd]; c != nil { return c.whitelisted } return false } // ConnIsWS reports whether fd is in WS phase. func (s *Server) ConnIsWS(fd int) bool { if c := s.conns[fd]; c != nil { return c.phase == phaseWS } return false } // IPConnCount returns the current WS connection count for ip. func (s *Server) IPConnCount(ip string) int { return s.ipConns[ip] } // SendWS writes a WS text frame. Buffers on EAGAIN; closes on error. func (s *Server) SendWS(fd int, payload []byte) { c := s.conns[fd] if c == nil { return } s.connWrite(c, buildWSFrame(opText, payload), false) } // SendWSErr writes a WS text frame. Returns error on EAGAIN (closes conn). func (s *Server) SendWSErr(fd int, payload []byte) error { c := s.conns[fd] if c == nil { return fmt.Errorf("no conn") } data := buildWSFrame(opText, payload) for len(data) > 0 { n, err := syscall.Write(c.fd, data) if n > 0 { data = data[n:] } if err == syscall.EAGAIN { return syscall.EAGAIN } if err != nil { return err } } return nil } // connWrite writes data to a connection, buffering on EAGAIN. func (s *Server) connWrite(c *tconn, data []byte, closeAfter bool) { if c.wbuf != nil { c.wbuf = append(c.wbuf, data...) 
if closeAfter { c.wbufClose = true } return } for len(data) > 0 { n, err := syscall.Write(c.fd, data) if n > 0 { data = data[n:] } if err == syscall.EAGAIN { c.wbuf = []byte{:len(data)} copy(c.wbuf, data) c.wbufClose = closeAfter epollModWrite(s.epfd, c.fd) return } if err != nil { s.closeConn(c) return } } if closeAfter { s.closeConn(c) } } // SendHTTP writes an HTTP response. Uses buffered writes; closes on error. func (s *Server) SendHTTP(fd int, status int, headers map[string]string, body []byte) { c := s.conns[fd] if c == nil { return } s.sendHTTPBuffered(c, status, headers, body, false) } // CloseConn closes a connection. func (s *Server) CloseConn(fd int) { if c := s.conns[fd]; c != nil { s.closeConn(c) } } // CompleteHTTP sends an HTTP response for a previously deferred connection. func (s *Server) CompleteHTTP(fd int, status int, headers map[string]string, body []byte, connClose bool) { c := s.conns[fd] if c == nil { return } c.phase = phaseHTTP s.sendHTTPBuffered(c, status, headers, body, connClose) } // ListenAndServe runs the epoll event loop. Returns on signal or error. 
func (s *Server) ListenAndServe(addr string) error { ip, port := ParseAddr(addr) fd, err := syscall.Socket(syscall.AF_INET, syscall.SOCK_STREAM, 0) if err != nil { return fmt.Errorf("transport: socket: %w", err) } syscall.SetsockoptInt(fd, syscall.SOL_SOCKET, syscall.SO_REUSEADDR, 1) if err := syscall.SetNonblock(fd, true); err != nil { syscall.Close(fd) return fmt.Errorf("transport: nonblock: %w", err) } sa := &syscall.SockaddrInet4{Port: port, Addr: ip} if err := syscall.Bind(fd, sa); err != nil { syscall.Close(fd) return fmt.Errorf("transport: bind %s: %w", addr, err) } if err := syscall.Listen(fd, 4096); err != nil { syscall.Close(fd) return fmt.Errorf("transport: listen: %w", err) } s.lnFD = fd s.epfd, err = syscall.EpollCreate1(0) if err != nil { syscall.Close(fd) return fmt.Errorf("transport: epoll: %w", err) } if err := epollAdd(s.epfd, fd); err != nil { syscall.Close(s.epfd) syscall.Close(fd) return fmt.Errorf("transport: epoll add: %w", err) } if globalSigFD >= 0 { s.sigFD = int(globalSigFD) if err := epollAdd(s.epfd, s.sigFD); err != nil { return fmt.Errorf("transport: epoll add signal pipe: %w", err) } } // Register pre-registered FDs (worker pipes added before ListenAndServe). 
for fd := range s.extraFDs { if err := epollAdd(s.epfd, int(fd)); err != nil { return fmt.Errorf("transport: epoll add fd %d: %w", fd, err) } } if s.OnReady != nil { s.OnReady() } events := []syscall.EpollEvent{:64} for { n, err := syscall.EpollWait(s.epfd, events, 5000) if err != nil { if err == syscall.EINTR { return nil } return fmt.Errorf("transport: epoll wait: %w", err) } if n == 0 { s.handler.OnTick() continue } for i := 0; i < n; i++ { evFD := int(events[i].Fd) if evFD == s.sigFD { moxie_signal_pipe_read() return nil } else if evFD == s.lnFD { s.acceptAll() } else if s.extraFDs[int32(evFD)] { s.handler.OnFD(int32(evFD)) } else if c := s.conns[evFD]; c != nil { if events[i].Events&(syscall.EPOLLERR|syscall.EPOLLHUP) != 0 { s.closeConn(c) } else if events[i].Events&syscall.EPOLLOUT != 0 { s.drainWrite(c) } else { s.readConn(c) } } } } } func (s *Server) acceptAll() { for { nfd, sa, err := syscall.Accept4(s.lnFD, syscall.SOCK_NONBLOCK) if err != nil { return } ip := peerAddr(sa) if !s.handler.OnAccept(nfd, ip) { syscall.Close(nfd) continue } if err := epollAdd(s.epfd, nfd); err != nil { syscall.Close(nfd) continue } s.conns[nfd] = &tconn{ fd: nfd, phase: phaseHTTP, buf: []byte{:4096}, remoteIP: ip, } } } func peerAddr(sa syscall.Sockaddr) string { if sa4, ok := sa.(*syscall.SockaddrInet4); ok { b := []byte{:0:20} b = appendInt(b, int(sa4.Addr[0])) b = append(b, '.') b = appendInt(b, int(sa4.Addr[1])) b = append(b, '.') b = appendInt(b, int(sa4.Addr[2])) b = append(b, '.') b = appendInt(b, int(sa4.Addr[3])) return string(makeCopy(b)) } return "unknown" } func (s *Server) readConn(c *tconn) { if c.wbuf != nil { return } avail := len(c.buf) - c.wpos if avail < 512 { if len(c.buf) >= maxBuf { s.closeConn(c) return } nb := []byte{:len(c.buf) * 2} copy(nb, c.buf[:c.wpos]) c.buf = nb } n, err := syscall.Read(c.fd, c.buf[c.wpos:]) if n <= 0 { if err == nil || (err != syscall.EAGAIN && err != syscall.EINTR) { s.closeConn(c) } return } c.wpos += n switch c.phase { case 
phaseHTTP: s.processHTTP(c) case phaseHTTPBody: s.processHTTPBody(c) case phaseWS: s.processWS(c) case phaseHTTPDeferred: c.wpos = 0 // drop bytes while awaiting async response } } func (s *Server) closeConn(c *tconn) { epollDel(s.epfd, c.fd) syscall.Close(c.fd) delete(s.conns, c.fd) if c.phase == phaseWS { s.handler.OnWSClose(c.fd) if n := s.ipConns[c.remoteIP] - 1; n <= 0 { delete(s.ipConns, c.remoteIP) } else { s.ipConns[c.remoteIP] = n } } } func (s *Server) keepAlive(c *tconn) { c.phase = phaseHTTP c.pendingReq = nil c.bodyNeeded = 0 if c.wpos > 0 { s.processHTTP(c) } } func (s *Server) processHTTP(c *tconn) { data := c.buf[:c.wpos] end := bytes.Index(data, []byte("\r\n\r\n")) if end < 0 { return } consumed := end + 4 req := parseHTTPHeaders(data[:end]) if req == nil { s.closeConn(c) return } copy(c.buf, c.buf[consumed:c.wpos]) c.wpos -= consumed // Reverse proxy: substitute real IP from X-Forwarded-For before any checks. if c.remoteIP == "127.0.0.1" { if xff := req.headers["x-forwarded-for"]; xff != "" { realIP := FirstXFF(xff) if len(realIP) > 0 { c.remoteIP = realIP } } } if bytes.EqualFold([]byte(req.headers["upgrade"]), []byte("websocket")) { s.upgradeWS(c, req) return } if s.BotBlock && isBot(req.headers["user-agent"]) { writeHTTPResponse(c.fd, 403, nil, []byte("forbidden")) s.closeConn(c) return } cl := parseContentLength(req.headers["content-length"]) if cl > 0 && cl <= maxBuf { c.pendingReq = req c.bodyNeeded = cl c.phase = phaseHTTPBody s.processHTTPBody(c) return } status, headers, body, connClose := s.handler.OnHTTP(c.fd, req.method, req.path, req.headers, nil) if status == HTTPDeferred { c.phase = phaseHTTPDeferred return } s.sendHTTPBuffered(c, status, headers, body, connClose || req.headers["connection"] == "close") } func (s *Server) processHTTPBody(c *tconn) { if c.wpos < c.bodyNeeded { return } c.pendingReq.body = makeCopy(c.buf[:c.bodyNeeded]) copy(c.buf, c.buf[c.bodyNeeded:c.wpos]) c.wpos -= c.bodyNeeded req := c.pendingReq c.pendingReq = 
nil status, headers, body, connClose := s.handler.OnHTTP(c.fd, req.method, req.path, req.headers, req.body) if status == HTTPDeferred { c.phase = phaseHTTPDeferred return } s.sendHTTPBuffered(c, status, headers, body, connClose || req.headers["connection"] == "close") } // sendHTTPBuffered writes an HTTP response, buffering on EAGAIN. func (s *Server) sendHTTPBuffered(c *tconn, status int, headers map[string]string, body []byte, connClose bool) { s.connWrite(c, buildHTTPResponse(status, headers, body), connClose) if c.wbuf == nil && s.conns[c.fd] != nil && !connClose { s.keepAlive(c) } } // drainWrite flushes pending write buffer when socket becomes writable. func (s *Server) drainWrite(c *tconn) { for len(c.wbuf) > 0 { n, err := syscall.Write(c.fd, c.wbuf) if n > 0 { c.wbuf = c.wbuf[n:] } if err == syscall.EAGAIN { return } if err != nil { s.closeConn(c) return } } c.wbuf = nil epollModRead(s.epfd, c.fd) if c.wbufClose { s.closeConn(c) return } if c.phase == phaseHTTP || c.phase == phaseHTTPBody { s.keepAlive(c) } } // Epoll helpers. func epollAdd(epfd, fd int) error { ev := syscall.EpollEvent{Events: syscall.EPOLLIN, Fd: int32(fd)} return syscall.EpollCtl(epfd, syscall.EPOLL_CTL_ADD, fd, &ev) } func epollModWrite(epfd, fd int) { ev := syscall.EpollEvent{Events: syscall.EPOLLIN | syscall.EPOLLOUT, Fd: int32(fd)} syscall.EpollCtl(epfd, syscall.EPOLL_CTL_MOD, fd, &ev) } func epollModRead(epfd, fd int) { ev := syscall.EpollEvent{Events: syscall.EPOLLIN, Fd: int32(fd)} syscall.EpollCtl(epfd, syscall.EPOLL_CTL_MOD, fd, &ev) } func epollDel(epfd, fd int) error { return syscall.EpollCtl(epfd, syscall.EPOLL_CTL_DEL, fd, nil) }