package runtime

// Netpoller for moxie on Linux using epoll.
// Integrates with the cooperative scheduler via task.Pause()/scheduleTask().
// Uses libc wrappers (not syscall package) to avoid circular imports.

import (
	"internal/task"
	"unsafe"
)

// Error codes matching internal/poll constants.
const (
	pollNoError        = 0
	pollErrClosing     = 1
	pollErrTimeout     = 2
	pollErrNotPollable = 3
)

// epoll constants (same on all linux arches).
const (
	_EPOLLIN  = 0x1
	_EPOLLOUT = 0x4
	_EPOLLERR = 0x8
	_EPOLLHUP = 0x10

	_EPOLL_CTL_ADD = 0x1
	_EPOLL_CTL_DEL = 0x2
	_EPOLL_CTL_MOD = 0x3

	_EPOLL_CLOEXEC = 0x80000
)

// epollEvent matches the kernel struct for amd64.
type epollEvent struct {
	events uint32
	fd     int32
	pad    int32
}

// libc wrappers for epoll syscalls.
//
//export epoll_create1
func libc_epoll_create1(flags int32) int32

//export epoll_ctl
func libc_epoll_ctl(epfd int32, op int32, fd int32, event *epollEvent) int32

//export epoll_wait
func libc_epoll_wait(epfd int32, events *epollEvent, maxevents int32, timeout int32) int32

// pollDesc tracks per-fd state for the netpoller.
type pollDesc struct {
	fd      int32 // the registered file descriptor; -1 marks a free/closed slot
	closing bool  // set by pollUnblock; all waits fail with pollErrClosing

	// Tasks waiting for read/write readiness.
	rg *task.Task
	wg *task.Task

	// Ready flags set by epoll results, cleared by reset.
	rready bool
	wready bool

	// Deadline support (nanoseconds, 0 = no deadline, <0 = expired).
	rd int64
	wd int64
}

var (
	epfd    int32 = -1     // epoll instance fd, created lazily by pollServerInit
	pdAlloc [512]pollDesc  // static pool of descriptors (no heap allocation)
	pdCount int            // high-water mark of slots ever handed out
	hasPoll bool           // true once the epoll instance exists
)

// pdLookup converts the opaque ctx handed to internal/poll back into
// its *pollDesc. The ctx is the address of a pdAlloc element, which is
// stable for the lifetime of the program.
func pdLookup(ctx uintptr) *pollDesc {
	return (*pollDesc)(unsafe.Pointer(ctx))
}

// netpollRestoreInterest re-arms the full read+write interest set for pd
// after pollWait temporarily narrowed it to a single direction.
func netpollRestoreInterest(pd *pollDesc) {
	ev := epollEvent{
		events: _EPOLLIN | _EPOLLOUT | _EPOLLHUP | _EPOLLERR,
		fd:     pd.fd,
	}
	libc_epoll_ctl(epfd, int32(_EPOLL_CTL_MOD), pd.fd, &ev)
}

//go:linkname poll_runtime_pollServerInit internal/poll.runtime_pollServerInit
func poll_runtime_pollServerInit() {
	fd := libc_epoll_create1(int32(_EPOLL_CLOEXEC))
	if fd < 0 {
		runtimePanic("netpoll: epoll_create1 failed")
	}
	epfd = fd
	hasPoll = true
}

//go:linkname poll_runtime_pollOpen internal/poll.runtime_pollOpen
func poll_runtime_pollOpen(fd uintptr) (uintptr, int) {
	// Prefer a slot freed by pollClose (fd == -1); only grow the pool when
	// none exists. Without reuse, 512 cumulative opens would exhaust
	// pdAlloc even if most of those fds were closed long ago.
	var pd *pollDesc
	for i := 0; i < pdCount; i++ {
		if pdAlloc[i].fd == -1 {
			pd = &pdAlloc[i]
			break
		}
	}
	if pd == nil {
		if pdCount >= len(pdAlloc) {
			runtimePanic("netpoll: too many fds")
		}
		pd = &pdAlloc[pdCount]
		pdCount++
	}

	// Reset all per-fd state; the slot may have been used before.
	pd.fd = int32(fd)
	pd.closing = false
	pd.rg = nil
	pd.wg = nil
	pd.rready = false
	pd.wready = false
	pd.rd = 0
	pd.wd = 0

	ev := epollEvent{
		events: _EPOLLIN | _EPOLLOUT | _EPOLLHUP | _EPOLLERR,
		fd:     int32(fd),
	}
	if libc_epoll_ctl(epfd, int32(_EPOLL_CTL_ADD), int32(fd), &ev) < 0 {
		pd.fd = -1 // release the slot for reuse
		return 0, 22 // EINVAL
	}
	return uintptr(unsafe.Pointer(pd)), 0
}

//go:linkname poll_runtime_pollClose internal/poll.runtime_pollClose
func poll_runtime_pollClose(ctx uintptr) {
	pd := pdLookup(ctx)
	libc_epoll_ctl(epfd, int32(_EPOLL_CTL_DEL), pd.fd, nil)
	// Mark the slot free so pollOpen can reuse it.
	pd.fd = -1
}

//go:linkname poll_runtime_pollReset internal/poll.runtime_pollReset
func poll_runtime_pollReset(ctx uintptr, mode int) int {
	pd := pdLookup(ctx)
	if pd.closing {
		return pollErrClosing
	}
	if mode == 'r' {
		pd.rready = false
	} else if mode == 'w' {
		pd.wready = false
	}
	return pollNoError
}

//go:linkname poll_runtime_pollWait internal/poll.runtime_pollWait
func poll_runtime_pollWait(ctx uintptr, mode int) int {
	pd := pdLookup(ctx)
	if pd.closing {
		return pollErrClosing
	}
	// Fast path: readiness already recorded by a previous epoll pass.
	if mode == 'r' && pd.rready {
		pd.rready = false
		return pollNoError
	}
	if mode == 'w' && pd.wready {
		pd.wready = false
		return pollNoError
	}
	// Deadline already expired (<0 per pollSetDeadline convention).
	if mode == 'r' && pd.rd < 0 {
		return pollErrTimeout
	}
	if mode == 'w' && pd.wd < 0 {
		return pollErrTimeout
	}

	// Single-threaded: block on epoll_wait until this fd is ready.
	// Narrow the epoll interest set to only the event we need,
	// otherwise write-readiness causes busy-spinning when waiting for reads.
	var wantEvents uint32 = _EPOLLHUP | _EPOLLERR
	if mode == 'r' {
		wantEvents |= _EPOLLIN
	} else {
		wantEvents |= _EPOLLOUT
	}
	modEv := epollEvent{events: wantEvents, fd: pd.fd}
	libc_epoll_ctl(epfd, int32(_EPOLL_CTL_MOD), pd.fd, &modEv)

	for {
		// Compute timeout from deadline. Round the division up so a
		// sub-millisecond remainder does not report a premature timeout.
		timeoutMs := int32(-1) // infinite
		if mode == 'r' && pd.rd > 0 {
			ms := (pd.rd - nanotime() + 999999) / 1000000
			if ms <= 0 {
				break
			}
			timeoutMs = int32(ms)
		} else if mode == 'w' && pd.wd > 0 {
			ms := (pd.wd - nanotime() + 999999) / 1000000
			if ms <= 0 {
				break
			}
			timeoutMs = int32(ms)
		}
		// Clamp to 100ms so timers fire regularly.
		if timeoutMs < 0 || timeoutMs > 100 {
			timeoutMs = 100
		}

		var events [64]epollEvent
		n := libc_epoll_wait(epfd, &events[0], 64, timeoutMs)
		netpollProcess(events[:], int(n))
		fireTimers()

		if pd.closing {
			break
		}
		if mode == 'r' && pd.rready {
			pd.rready = false
			netpollRestoreInterest(pd)
			return pollNoError
		}
		if mode == 'w' && pd.wready {
			pd.wready = false
			netpollRestoreInterest(pd)
			return pollNoError
		}
		// Deadline may have been set to "expired" by pollSetDeadline
		// while we were blocked.
		if mode == 'r' && pd.rd < 0 {
			break
		}
		if mode == 'w' && pd.wd < 0 {
			break
		}
	}

	// Restore full interest set on exit.
	netpollRestoreInterest(pd)
	if pd.closing {
		return pollErrClosing
	}
	return pollErrTimeout
}

//go:linkname poll_runtime_pollWaitCanceled internal/poll.runtime_pollWaitCanceled
func poll_runtime_pollWaitCanceled(ctx uintptr, mode int) {
	// In cooperative scheduler, goroutine is already running when this is called.
}

//go:linkname poll_runtime_pollSetDeadline internal/poll.runtime_pollSetDeadline
func poll_runtime_pollSetDeadline(ctx uintptr, d int64, mode int) {
	pd := pdLookup(ctx)
	// Convert relative duration to absolute deadline so the wait loop
	// can compare against nanotime() correctly.
	if d > 0 {
		d = nanotime() + d
	}
	if mode == 'r' || mode == 'r'+'w' {
		pd.rd = d
		// An already-expired deadline (<0) wakes any parked reader.
		if d < 0 && pd.rg != nil {
			t := pd.rg
			pd.rg = nil
			scheduleTask(t)
		}
	}
	if mode == 'w' || mode == 'r'+'w' {
		pd.wd = d
		if d < 0 && pd.wg != nil {
			t := pd.wg
			pd.wg = nil
			scheduleTask(t)
		}
	}
}

//go:linkname poll_runtime_pollUnblock internal/poll.runtime_pollUnblock
func poll_runtime_pollUnblock(ctx uintptr) {
	pd := pdLookup(ctx)
	pd.closing = true
	// Wake both directions; waiters observe pd.closing and return
	// pollErrClosing.
	if pd.rg != nil {
		t := pd.rg
		pd.rg = nil
		scheduleTask(t)
	}
	if pd.wg != nil {
		t := pd.wg
		pd.wg = nil
		scheduleTask(t)
	}
}

//go:linkname poll_runtime_isPollServerDescriptor internal/poll.runtime_isPollServerDescriptor
func poll_runtime_isPollServerDescriptor(fd uintptr) bool {
	return int32(fd) == epfd
}

// netpollCheck does a non-blocking epoll_wait and wakes ready goroutines.
func netpollCheck() {
	if !hasPoll {
		return
	}
	var events [64]epollEvent
	n := libc_epoll_wait(epfd, &events[0], 64, 0)
	netpollProcess(events[:], int(n))
}

// netpollBlock does a blocking epoll_wait with timeout (ms).
func netpollBlock(timeoutMs int) {
	if !hasPoll {
		return
	}
	var events [64]epollEvent
	n := libc_epoll_wait(epfd, &events[0], 64, int32(timeoutMs))
	netpollProcess(events[:], int(n))
}

// netpollProcess records readiness from epoll results and reschedules
// any task parked on the affected fd. HUP/ERR wake both directions so
// blocked readers and writers can observe the error. A negative n
// (epoll_wait error) is a no-op.
func netpollProcess(events []epollEvent, n int) {
	for i := 0; i < n; i++ {
		ev := &events[i]
		pd := netpollFindPD(ev.fd)
		if pd == nil {
			continue
		}
		if ev.events&(_EPOLLIN|_EPOLLHUP|_EPOLLERR) != 0 {
			pd.rready = true
			if pd.rg != nil {
				t := pd.rg
				pd.rg = nil
				scheduleTask(t)
			}
		}
		if ev.events&(_EPOLLOUT|_EPOLLHUP|_EPOLLERR) != 0 {
			pd.wready = true
			if pd.wg != nil {
				t := pd.wg
				pd.wg = nil
				scheduleTask(t)
			}
		}
	}
}

// netpollFindPD returns the pollDesc registered for fd, or nil.
// Linear scan over the static pool; closed slots hold fd == -1 and
// never match a real descriptor.
func netpollFindPD(fd int32) *pollDesc {
	for i := 0; i < pdCount; i++ {
		if pdAlloc[i].fd == fd {
			return &pdAlloc[i]
		}
	}
	return nil
}