poll.mx raw

   1  package runtime
   2  
   3  // Netpoller for moxie on Linux using epoll.
   4  // Integrates with the cooperative scheduler via task.Pause()/scheduleTask().
   5  // Uses libc wrappers (not syscall package) to avoid circular imports.
   6  
   7  import (
   8  	"internal/task"
   9  	"unsafe"
  10  )
  11  
  12  // Error codes matching internal/poll constants.
  13  const (
  14  	pollNoError        = 0
  15  	pollErrClosing     = 1
  16  	pollErrTimeout     = 2
  17  	pollErrNotPollable = 3
  18  )
  19  
  20  // epoll constants (same on all linux arches).
  21  const (
  22  	_EPOLLIN      = 0x1
  23  	_EPOLLOUT     = 0x4
  24  	_EPOLLERR     = 0x8
  25  	_EPOLLHUP     = 0x10
  26  	_EPOLL_CTL_ADD = 0x1
  27  	_EPOLL_CTL_DEL = 0x2
  28  	_EPOLL_CTL_MOD = 0x3
  29  	_EPOLL_CLOEXEC = 0x80000
  30  )
  31  
  32  // epollEvent matches the kernel struct for amd64.
  33  type epollEvent struct {
  34  	events uint32
  35  	fd     int32
  36  	pad    int32
  37  }
  38  
  39  // libc wrappers for epoll syscalls.
  40  //
  41  //export epoll_create1
  42  func libc_epoll_create1(flags int32) int32
  43  
  44  //export epoll_ctl
  45  func libc_epoll_ctl(epfd int32, op int32, fd int32, event *epollEvent) int32
  46  
  47  //export epoll_wait
  48  func libc_epoll_wait(epfd int32, events *epollEvent, maxevents int32, timeout int32) int32
  49  
  50  // pollDesc tracks per-fd state for the netpoller.
  51  type pollDesc struct {
  52  	fd      int32
  53  	closing bool
  54  
  55  	// Tasks waiting for read/write readiness.
  56  	rg *task.Task
  57  	wg *task.Task
  58  
  59  	// Ready flags set by epoll results, cleared by reset.
  60  	rready bool
  61  	wready bool
  62  
  63  	// Deadline support (nanoseconds, 0 = no deadline, <0 = expired).
  64  	rd int64
  65  	wd int64
  66  }
  67  
  68  var (
  69  	epfd     int32 = -1
  70  	pdAlloc  [512]pollDesc
  71  	pdCount  int
  72  	hasPoll  bool
  73  )
  74  
  75  func pdLookup(ctx uintptr) *pollDesc {
  76  	return (*pollDesc)(unsafe.Pointer(ctx))
  77  }
  78  
  79  //go:linkname poll_runtime_pollServerInit internal/poll.runtime_pollServerInit
  80  func poll_runtime_pollServerInit() {
  81  	fd := libc_epoll_create1(int32(_EPOLL_CLOEXEC))
  82  	if fd < 0 {
  83  		runtimePanic("netpoll: epoll_create1 failed")
  84  	}
  85  	epfd = fd
  86  	hasPoll = true
  87  }
  88  
  89  //go:linkname poll_runtime_pollOpen internal/poll.runtime_pollOpen
  90  func poll_runtime_pollOpen(fd uintptr) (uintptr, int) {
  91  	if pdCount >= len(pdAlloc) {
  92  		runtimePanic("netpoll: too many fds")
  93  	}
  94  	pd := &pdAlloc[pdCount]
  95  	pdCount++
  96  	pd.fd = int32(fd)
  97  	pd.closing = false
  98  	pd.rg = nil
  99  	pd.wg = nil
 100  	pd.rready = false
 101  	pd.wready = false
 102  	pd.rd = 0
 103  	pd.wd = 0
 104  
 105  	ev := epollEvent{
 106  		events: _EPOLLIN | _EPOLLOUT | _EPOLLHUP | _EPOLLERR,
 107  		fd:     int32(fd),
 108  	}
 109  	if libc_epoll_ctl(epfd, int32(_EPOLL_CTL_ADD), int32(fd), &ev) < 0 {
 110  		pdCount--
 111  		return 0, 22 // EINVAL
 112  	}
 113  	return uintptr(unsafe.Pointer(pd)), 0
 114  }
 115  
 116  //go:linkname poll_runtime_pollClose internal/poll.runtime_pollClose
 117  func poll_runtime_pollClose(ctx uintptr) {
 118  	pd := pdLookup(ctx)
 119  	libc_epoll_ctl(epfd, int32(_EPOLL_CTL_DEL), pd.fd, nil)
 120  	pd.fd = -1
 121  }
 122  
 123  //go:linkname poll_runtime_pollReset internal/poll.runtime_pollReset
 124  func poll_runtime_pollReset(ctx uintptr, mode int) int {
 125  	pd := pdLookup(ctx)
 126  	if pd.closing {
 127  		return pollErrClosing
 128  	}
 129  	if mode == 'r' {
 130  		pd.rready = false
 131  	} else if mode == 'w' {
 132  		pd.wready = false
 133  	}
 134  	return pollNoError
 135  }
 136  
 137  //go:linkname poll_runtime_pollWait internal/poll.runtime_pollWait
 138  func poll_runtime_pollWait(ctx uintptr, mode int) int {
 139  	pd := pdLookup(ctx)
 140  	if pd.closing {
 141  		return pollErrClosing
 142  	}
 143  
 144  	if mode == 'r' && pd.rready {
 145  		pd.rready = false
 146  		return pollNoError
 147  	}
 148  	if mode == 'w' && pd.wready {
 149  		pd.wready = false
 150  		return pollNoError
 151  	}
 152  
 153  	if mode == 'r' && pd.rd < 0 {
 154  		return pollErrTimeout
 155  	}
 156  	if mode == 'w' && pd.wd < 0 {
 157  		return pollErrTimeout
 158  	}
 159  
 160  	// Single-threaded: block on epoll_wait until this fd is ready.
 161  	// Narrow the epoll interest set to only the event we need,
 162  	// otherwise write-readiness causes busy-spinning when waiting for reads.
 163  	var wantEvents uint32 = _EPOLLHUP | _EPOLLERR
 164  	if mode == 'r' {
 165  		wantEvents |= _EPOLLIN
 166  	} else {
 167  		wantEvents |= _EPOLLOUT
 168  	}
 169  	modEv := epollEvent{events: wantEvents, fd: pd.fd}
 170  	libc_epoll_ctl(epfd, int32(_EPOLL_CTL_MOD), pd.fd, &modEv)
 171  
 172  	for {
 173  		// Compute timeout from deadline.
 174  		timeoutMs := int32(-1) // infinite
 175  		if mode == 'r' && pd.rd > 0 {
 176  			ms := (pd.rd - nanotime()) / 1000000
 177  			if ms <= 0 {
 178  				break
 179  			}
 180  			timeoutMs = int32(ms)
 181  		} else if mode == 'w' && pd.wd > 0 {
 182  			ms := (pd.wd - nanotime()) / 1000000
 183  			if ms <= 0 {
 184  				break
 185  			}
 186  			timeoutMs = int32(ms)
 187  		}
 188  
 189  		// Clamp to 100ms so timers fire regularly.
 190  		if timeoutMs < 0 || timeoutMs > 100 {
 191  			timeoutMs = 100
 192  		}
 193  
 194  		var events [64]epollEvent
 195  		n := libc_epoll_wait(epfd, &events[0], 64, timeoutMs)
 196  		netpollProcess(events[:], int(n))
 197  		fireTimers()
 198  
 199  		if pd.closing {
 200  			break
 201  		}
 202  		if mode == 'r' && pd.rready {
 203  			pd.rready = false
 204  			// Restore full interest set.
 205  			restoreEv := epollEvent{events: _EPOLLIN | _EPOLLOUT | _EPOLLHUP | _EPOLLERR, fd: pd.fd}
 206  			libc_epoll_ctl(epfd, int32(_EPOLL_CTL_MOD), pd.fd, &restoreEv)
 207  			return pollNoError
 208  		}
 209  		if mode == 'w' && pd.wready {
 210  			pd.wready = false
 211  			restoreEv := epollEvent{events: _EPOLLIN | _EPOLLOUT | _EPOLLHUP | _EPOLLERR, fd: pd.fd}
 212  			libc_epoll_ctl(epfd, int32(_EPOLL_CTL_MOD), pd.fd, &restoreEv)
 213  			return pollNoError
 214  		}
 215  		if mode == 'r' && pd.rd < 0 {
 216  			break
 217  		}
 218  		if mode == 'w' && pd.wd < 0 {
 219  			break
 220  		}
 221  	}
 222  
 223  	// Restore full interest set on exit.
 224  	restoreEv := epollEvent{events: _EPOLLIN | _EPOLLOUT | _EPOLLHUP | _EPOLLERR, fd: pd.fd}
 225  	libc_epoll_ctl(epfd, int32(_EPOLL_CTL_MOD), pd.fd, &restoreEv)
 226  	if pd.closing {
 227  		return pollErrClosing
 228  	}
 229  	return pollErrTimeout
 230  }
 231  
 232  //go:linkname poll_runtime_pollWaitCanceled internal/poll.runtime_pollWaitCanceled
 233  func poll_runtime_pollWaitCanceled(ctx uintptr, mode int) {
 234  	// In cooperative scheduler, goroutine is already running when this is called.
 235  }
 236  
 237  //go:linkname poll_runtime_pollSetDeadline internal/poll.runtime_pollSetDeadline
 238  func poll_runtime_pollSetDeadline(ctx uintptr, d int64, mode int) {
 239  	pd := pdLookup(ctx)
 240  
 241  	// Convert relative duration to absolute deadline so the wait loop
 242  	// can compare against nanotime() correctly.
 243  	if d > 0 {
 244  		d = nanotime() + d
 245  	}
 246  
 247  	if mode == 'r' || mode == 'r'+'w' {
 248  		pd.rd = d
 249  		if d < 0 && pd.rg != nil {
 250  			t := pd.rg
 251  			pd.rg = nil
 252  			scheduleTask(t)
 253  		}
 254  	}
 255  	if mode == 'w' || mode == 'r'+'w' {
 256  		pd.wd = d
 257  		if d < 0 && pd.wg != nil {
 258  			t := pd.wg
 259  			pd.wg = nil
 260  			scheduleTask(t)
 261  		}
 262  	}
 263  }
 264  
 265  //go:linkname poll_runtime_pollUnblock internal/poll.runtime_pollUnblock
 266  func poll_runtime_pollUnblock(ctx uintptr) {
 267  	pd := pdLookup(ctx)
 268  	pd.closing = true
 269  	if pd.rg != nil {
 270  		t := pd.rg
 271  		pd.rg = nil
 272  		scheduleTask(t)
 273  	}
 274  	if pd.wg != nil {
 275  		t := pd.wg
 276  		pd.wg = nil
 277  		scheduleTask(t)
 278  	}
 279  }
 280  
 281  //go:linkname poll_runtime_isPollServerDescriptor internal/poll.runtime_isPollServerDescriptor
 282  func poll_runtime_isPollServerDescriptor(fd uintptr) bool {
 283  	return int32(fd) == epfd
 284  }
 285  
 286  // netpollCheck does a non-blocking epoll_wait and wakes ready goroutines.
 287  func netpollCheck() {
 288  	if !hasPoll {
 289  		return
 290  	}
 291  	var events [64]epollEvent
 292  	n := libc_epoll_wait(epfd, &events[0], 64, 0)
 293  	netpollProcess(events[:], int(n))
 294  }
 295  
 296  // netpollBlock does a blocking epoll_wait with timeout (ms).
 297  func netpollBlock(timeoutMs int) {
 298  	if !hasPoll {
 299  		return
 300  	}
 301  	var events [64]epollEvent
 302  	n := libc_epoll_wait(epfd, &events[0], 64, int32(timeoutMs))
 303  	netpollProcess(events[:], int(n))
 304  }
 305  
 306  func netpollProcess(events []epollEvent, n int) {
 307  	for i := 0; i < n; i++ {
 308  		ev := &events[i]
 309  		pd := netpollFindPD(ev.fd)
 310  		if pd == nil {
 311  			continue
 312  		}
 313  		if ev.events&(_EPOLLIN|_EPOLLHUP|_EPOLLERR) != 0 {
 314  			pd.rready = true
 315  			if pd.rg != nil {
 316  				t := pd.rg
 317  				pd.rg = nil
 318  				scheduleTask(t)
 319  			}
 320  		}
 321  		if ev.events&(_EPOLLOUT|_EPOLLHUP|_EPOLLERR) != 0 {
 322  			pd.wready = true
 323  			if pd.wg != nil {
 324  				t := pd.wg
 325  				pd.wg = nil
 326  				scheduleTask(t)
 327  			}
 328  		}
 329  	}
 330  }
 331  
 332  func netpollFindPD(fd int32) *pollDesc {
 333  	for i := 0; i < pdCount; i++ {
 334  		if pdAlloc[i].fd == fd {
 335  			return &pdAlloc[i]
 336  		}
 337  	}
 338  	return nil
 339  }
 340