sleep_unsafe.go raw

   1  // Copyright 2018 The gVisor Authors.
   2  //
   3  // Licensed under the Apache License, Version 2.0 (the "License");
   4  // you may not use this file except in compliance with the License.
   5  // You may obtain a copy of the License at
   6  //
   7  //     http://www.apache.org/licenses/LICENSE-2.0
   8  //
   9  // Unless required by applicable law or agreed to in writing, software
  10  // distributed under the License is distributed on an "AS IS" BASIS,
  11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12  // See the License for the specific language governing permissions and
  13  // limitations under the License.
  14  
  15  // Package sleep allows goroutines to efficiently sleep on multiple sources of
  16  // notifications (wakers). It offers O(1) complexity, which is different from
  17  // multi-channel selects which have O(n) complexity (where n is the number of
  18  // channels) and a considerable constant factor.
  19  //
  20  // It is similar to edge-triggered epoll waits, where the user registers each
  21  // object of interest once, and then can repeatedly wait on all of them.
  22  //
  23  // A Waker object is used to wake a sleeping goroutine (G) up, or prevent it
  24  // from going to sleep next. A Sleeper object is used to receive notifications
  25  // from wakers, and if no notifications are available, to optionally sleep until
  26  // one becomes available.
  27  //
  28  // A Waker can be associated with at most one Sleeper, but a Sleeper can be
  29  // associated with multiple Wakers. A Sleeper has a list of asserted (ready)
  30  // wakers; when Fetch() is called repeatedly, elements from this list are
  31  // returned until the list becomes empty in which case the goroutine goes to
  32  // sleep. When Assert() is called on a Waker, it adds itself to the Sleeper's
  33  // asserted list and wakes the G up from its sleep if needed.
  34  //
  35  // Sleeper objects are expected to be used as follows, with just one goroutine
  36  // executing this code:
  37  //
  38  //	// One time set-up.
  39  //	s := sleep.Sleeper{}
  40  //	s.AddWaker(&w1)
  41  //	s.AddWaker(&w2)
  42  //
  43  //	// Called repeatedly.
  44  //	for {
  45  //		switch s.Fetch(true) {
  46  //		case &w1:
  47  //			// Do work triggered by w1 being asserted.
  48  //		case &w2:
  49  //			// Do work triggered by w2 being asserted.
  50  //		}
  51  //	}
  52  //
  53  // And Waker objects are expected to call w.Assert() when they want the sleeper
  54  // to wake up and perform work.
  55  //
// The notifications are edge-triggered, which means that if Assert() is called
// on a Waker several times before the sleeper has the chance to wake up, the
// sleeper will only be notified once and should perform all pending work
// (alternatively, the sleeper can itself call Assert() on the waker, to ensure
// that it will wake up again).
  60  //
// The "unsafeness" here is in the casts to/from unsafe.Pointer, which are safe
// as long as only one type is used for each unsafe.Pointer (which is the case
// here); we should just make sure that this remains the case in the future.
// The usage of the unsafe package could be confined to sharedWaker and
// sharedSleeper types that would hold pointers in atomic.Pointers, but the go
// compiler currently can't optimize these as well (it won't inline their
// method calls), which reduces performance.
  68  package sleep
  69  
  70  import (
  71  	"context"
  72  	"sync/atomic"
  73  	"unsafe"
  74  
  75  	"gvisor.dev/gvisor/pkg/sync"
  76  )
  77  
const (
	// preparingG is stored in a sleeper's waitingG field to indicate that
	// the sleeper is preparing to sleep: it has announced its intent to
	// park, but commitSleep has not yet replaced this value with the
	// actual G. Wakers that observe it reset waitingG to zero, which makes
	// the pending commit fail.
	preparingG = 1
)
  83  
var (
	// assertedSleeper is a sentinel sleeper. A pointer to it is stored in
	// the s field of wakers that are asserted. In this file it is only
	// ever compared by address; none of its fields are used.
	assertedSleeper Sleeper
)
  89  
// Sleeper allows a goroutine to sleep and receive wake up notifications from
// Wakers in an efficient way.
//
// This is similar to edge-triggered epoll in that wakers are added to the
// sleeper once and the sleeper can then repeatedly sleep in O(1) time while
// waiting on all wakers.
//
// None of the methods in a Sleeper can be called concurrently. Wakers that have
// been added to a sleeper A can only be added to another sleeper after A.Done()
// returns. These restrictions allow this to be implemented lock-free.
//
// This struct is thread-compatible.
//
// +stateify savable
type Sleeper struct {
	_ sync.NoCopy

	// sharedList is a "stack" of asserted wakers. They atomically add
	// themselves to the front of this list as they become asserted. It
	// holds a *Waker and must only be accessed with atomic operations.
	sharedList unsafe.Pointer `state:".(*Waker)"`

	// localList is a list of asserted wakers that is only accessible to the
	// waiter, and thus doesn't have to be accessed atomically. When
	// fetching more wakers, the waiter will first go through this list, and
	// only when it's empty will it atomically fetch wakers from
	// sharedList.
	localList *Waker

	// allWakers is a list with all wakers that have been added to this
	// sleeper. It is used during cleanup to remove associations. The list
	// is linked through Waker.allWakersNext.
	allWakers *Waker

	// waitingG holds the G that is sleeping, if any. It is used by wakers
	// to determine which G, if any, they should wake. It is zero when no G
	// is waiting, preparingG while the sleeper is about to park, and the G
	// value installed by commitSleep once the sleep has been committed.
	waitingG uintptr `state:"zero"`
}
 126  
 127  // saveSharedList is invoked by stateify.
 128  func (s *Sleeper) saveSharedList() *Waker {
 129  	return (*Waker)(atomic.LoadPointer(&s.sharedList))
 130  }
 131  
 132  // loadSharedList is invoked by stateify.
 133  func (s *Sleeper) loadSharedList(_ context.Context, w *Waker) {
 134  	atomic.StorePointer(&s.sharedList, unsafe.Pointer(w))
 135  }
 136  
 137  // AddWaker associates the given waker to the sleeper.
 138  func (s *Sleeper) AddWaker(w *Waker) {
 139  	if w.allWakersNext != nil {
 140  		panic("waker has non-nil allWakersNext; owned by another sleeper?")
 141  	}
 142  	if w.next != nil {
 143  		panic("waker has non-nil next; queued in another sleeper?")
 144  	}
 145  
 146  	// Add the waker to the list of all wakers.
 147  	w.allWakersNext = s.allWakers
 148  	s.allWakers = w
 149  
 150  	// Try to associate the waker with the sleeper. If it's already
 151  	// asserted, we simply enqueue it in the "ready" list.
 152  	for {
 153  		p := (*Sleeper)(atomic.LoadPointer(&w.s))
 154  		if p == &assertedSleeper {
 155  			s.enqueueAssertedWaker(w, true /* wakep */)
 156  			return
 157  		}
 158  
 159  		if atomic.CompareAndSwapPointer(&w.s, usleeper(p), usleeper(s)) {
 160  			return
 161  		}
 162  	}
 163  }
 164  
 165  // nextWaker returns the next waker in the notification list, blocking if
 166  // needed. The parameter wakepOrSleep indicates that if the operation does not
 167  // block, then we will need to explicitly wake a runtime P.
 168  //
 169  // Precondition: wakepOrSleep may be true iff block is true.
 170  //
 171  //go:nosplit
 172  func (s *Sleeper) nextWaker(block, wakepOrSleep bool) *Waker {
 173  	// Attempt to replenish the local list if it's currently empty.
 174  	if s.localList == nil {
 175  		for atomic.LoadPointer(&s.sharedList) == nil {
 176  			// Fail request if caller requested that we
 177  			// don't block.
 178  			if !block {
 179  				return nil
 180  			}
 181  
 182  			// Indicate to wakers that we're about to sleep,
 183  			// this allows them to abort the wait by setting
 184  			// waitingG back to zero (which we'll notice
 185  			// before committing the sleep).
 186  			atomic.StoreUintptr(&s.waitingG, preparingG)
 187  
 188  			// Check if something was queued while we were
 189  			// preparing to sleep. We need this interleaving
 190  			// to avoid missing wake ups.
 191  			if atomic.LoadPointer(&s.sharedList) != nil {
 192  				atomic.StoreUintptr(&s.waitingG, 0)
 193  				break
 194  			}
 195  
 196  			// Since we are sleeping for sure, we no longer
 197  			// need to wakep once we get a value.
 198  			wakepOrSleep = false
 199  
 200  			// Try to commit the sleep and report it to the
 201  			// tracer as a select.
 202  			//
 203  			// gopark puts the caller to sleep and calls
 204  			// commitSleep to decide whether to immediately
 205  			// wake the caller up or to leave it sleeping.
 206  			const traceEvGoBlockSelect = 24
 207  			// See:runtime2.go in the go runtime package for
 208  			// the values to pass as the waitReason here.
 209  			const waitReasonSelect = 9
 210  			sync.Gopark(commitSleep, unsafe.Pointer(&s.waitingG), sync.WaitReasonSelect, sync.TraceBlockSelect, 0)
 211  		}
 212  
 213  		// Pull the shared list out and reverse it in the local
 214  		// list. Given that wakers push themselves in reverse
 215  		// order, we fix things here.
 216  		v := (*Waker)(atomic.SwapPointer(&s.sharedList, nil))
 217  		for v != nil {
 218  			cur := v
 219  			v = v.next
 220  
 221  			cur.next = s.localList
 222  			s.localList = cur
 223  		}
 224  	}
 225  
 226  	// Remove the waker in the front of the list.
 227  	w := s.localList
 228  	s.localList = w.next
 229  
 230  	// Do we need to wake a P?
 231  	if wakepOrSleep {
 232  		sync.Wakep()
 233  	}
 234  
 235  	return w
 236  }
 237  
 238  // commitSleep signals to wakers that the given g is now sleeping. Wakers can
 239  // then fetch it and wake it.
 240  //
 241  // The commit may fail if wakers have been asserted after our last check, in
 242  // which case they will have set s.waitingG to zero.
 243  //
 244  //go:norace
 245  //go:nosplit
 246  func commitSleep(g uintptr, waitingG unsafe.Pointer) bool {
 247  	return sync.RaceUncheckedAtomicCompareAndSwapUintptr((*uintptr)(waitingG), preparingG, g)
 248  }
 249  
 250  // fetch is the backing implementation for Fetch and AssertAndFetch.
 251  //
 252  // Preconditions are the same as nextWaker.
 253  //
 254  //go:nosplit
 255  func (s *Sleeper) fetch(block, wakepOrSleep bool) *Waker {
 256  	for {
 257  		w := s.nextWaker(block, wakepOrSleep)
 258  		if w == nil {
 259  			return nil
 260  		}
 261  
 262  		// Reassociate the waker with the sleeper. If the waker was
 263  		// still asserted we can return it, otherwise try the next one.
 264  		old := (*Sleeper)(atomic.SwapPointer(&w.s, usleeper(s)))
 265  		if old == &assertedSleeper {
 266  			return w
 267  		}
 268  	}
 269  }
 270  
 271  // Fetch fetches the next wake-up notification. If a notification is
 272  // immediately available, the asserted waker is returned immediately.
 273  // Otherwise, the behavior depends on the value of 'block': if true, the
 274  // current goroutine blocks until a notification arrives and returns the
 275  // asserted waker; if false, nil will be returned.
 276  //
 277  // N.B. This method is *not* thread-safe. Only one goroutine at a time is
 278  // allowed to call this method.
 279  func (s *Sleeper) Fetch(block bool) *Waker {
 280  	return s.fetch(block, false /* wakepOrSleep */)
 281  }
 282  
 283  // AssertAndFetch asserts the given waker and fetches the next wake-up notification.
 284  // Note that this will always be blocking, since there is no value in joining a
 285  // non-blocking operation.
 286  //
 287  // N.B. Like Fetch, this method is *not* thread-safe. This will also yield the current
 288  // P to the next goroutine, avoiding associated scheduled overhead.
 289  //
 290  // +checkescape:all
 291  //
 292  //go:nosplit
 293  func (s *Sleeper) AssertAndFetch(n *Waker) *Waker {
 294  	n.assert(false /* wakep */)
 295  	return s.fetch(true /* block */, true /* wakepOrSleep*/)
 296  }
 297  
 298  // Done is used to indicate that the caller won't use this Sleeper anymore. It
 299  // removes the association with all wakers so that they can be safely reused
 300  // by another sleeper after Done() returns.
 301  func (s *Sleeper) Done() {
 302  	// Remove all associations that we can, and build a list of the ones we
 303  	// could not. An association can be removed right away from waker w if
 304  	// w.s has a pointer to the sleeper, that is, the waker is not asserted
 305  	// yet. By atomically switching w.s to nil, we guarantee that
 306  	// subsequent calls to Assert() on the waker will not result in it
 307  	// being queued.
 308  	for w := s.allWakers; w != nil; w = s.allWakers {
 309  		next := w.allWakersNext // Before zapping.
 310  		if atomic.CompareAndSwapPointer(&w.s, usleeper(s), nil) {
 311  			w.allWakersNext = nil
 312  			w.next = nil
 313  			s.allWakers = next // Move ahead.
 314  			continue
 315  		}
 316  
 317  		// Dequeue exactly one waiter from the list, it may not be
 318  		// this one but we know this one is in the process. We must
 319  		// leave it in the asserted state but drop it from our lists.
 320  		if w := s.nextWaker(true, false); w != nil {
 321  			prev := &s.allWakers
 322  			for *prev != w {
 323  				prev = &((*prev).allWakersNext)
 324  			}
 325  			*prev = (*prev).allWakersNext
 326  			w.allWakersNext = nil
 327  			w.next = nil
 328  		}
 329  	}
 330  }
 331  
 332  // enqueueAssertedWaker enqueues an asserted waker to the "ready" circular list
 333  // of wakers that want to notify the sleeper.
 334  //
 335  //go:nosplit
 336  func (s *Sleeper) enqueueAssertedWaker(w *Waker, wakep bool) {
 337  	// Add the new waker to the front of the list.
 338  	for {
 339  		v := (*Waker)(atomic.LoadPointer(&s.sharedList))
 340  		w.next = v
 341  		if atomic.CompareAndSwapPointer(&s.sharedList, uwaker(v), uwaker(w)) {
 342  			break
 343  		}
 344  	}
 345  
 346  	// Nothing to do if there isn't a G waiting.
 347  	if atomic.LoadUintptr(&s.waitingG) == 0 {
 348  		return
 349  	}
 350  
 351  	// Signal to the sleeper that a waker has been asserted.
 352  	switch g := atomic.SwapUintptr(&s.waitingG, 0); g {
 353  	case 0, preparingG:
 354  	default:
 355  		// We managed to get a G. Wake it up.
 356  		sync.Goready(g, 0, wakep)
 357  	}
 358  }
 359  
// Waker represents a source of wake-up notifications to be sent to sleepers. A
// waker can be associated with at most one sleeper at a time, and at any given
// time is either in asserted or non-asserted state.
//
// Once asserted, the waker remains so until it is manually cleared or a sleeper
// consumes its assertion (i.e., a sleeper wakes up or is prevented from going
// to sleep due to the waker).
//
// This struct is thread-safe, that is, its methods can be called concurrently
// by multiple goroutines.
//
// Note, it is not safe to copy a Waker as its fields are modified by value
// (the pointer fields are individually modified with atomic operations).
//
// +stateify savable
type Waker struct {
	_ sync.NoCopy

	// s is the sleeper that this waker can wake up. Only one sleeper at a
	// time is allowed. It holds a *Sleeper and is accessed with atomic
	// operations. This field can have three classes of values:
	// nil -- the waker is not asserted: it either is not associated with
	//     a sleeper, or is queued to a sleeper due to being previously
	//     asserted. This is the zero value.
	// &assertedSleeper -- the waker is asserted.
	// otherwise -- the waker is not asserted, and is associated with the
	//     given sleeper. Once it transitions to asserted state, the
	//     associated sleeper will be woken.
	s unsafe.Pointer `state:".(wakerState)"`

	// next is used to form a linked list of asserted wakers in a sleeper
	// (the sleeper's sharedList and localList).
	next *Waker

	// allWakersNext is used to form a linked list of all wakers associated
	// to a given sleeper (the sleeper's allWakers list).
	allWakersNext *Waker
}
 396  
// wakerState is the representation of Waker.s that saveS produces and loadS
// consumes. It replaces the address of the assertedSleeper sentinel with an
// explicit flag.
//
// +stateify savable
type wakerState struct {
	// asserted is true if Waker.s pointed at assertedSleeper.
	asserted bool
	// other is the value of Waker.s when the waker was not asserted: nil
	// or the associated sleeper.
	other    *Sleeper
}
 402  
 403  // saveS is invoked by stateify.
 404  func (w *Waker) saveS() wakerState {
 405  	s := (*Sleeper)(atomic.LoadPointer(&w.s))
 406  	if s == &assertedSleeper {
 407  		return wakerState{asserted: true}
 408  	}
 409  	return wakerState{other: s}
 410  }
 411  
 412  // loadS is invoked by stateify.
 413  func (w *Waker) loadS(_ context.Context, ws wakerState) {
 414  	if ws.asserted {
 415  		atomic.StorePointer(&w.s, unsafe.Pointer(&assertedSleeper))
 416  	} else {
 417  		atomic.StorePointer(&w.s, unsafe.Pointer(ws.other))
 418  	}
 419  }
 420  
 421  // assert is the implementation for Assert.
 422  //
 423  //go:nosplit
 424  func (w *Waker) assert(wakep bool) {
 425  	// Nothing to do if the waker is already asserted. This check allows us
 426  	// to complete this case (already asserted) without any interlocked
 427  	// operations on x86.
 428  	if atomic.LoadPointer(&w.s) == usleeper(&assertedSleeper) {
 429  		return
 430  	}
 431  
 432  	// Mark the waker as asserted, and wake up a sleeper if there is one.
 433  	switch s := (*Sleeper)(atomic.SwapPointer(&w.s, usleeper(&assertedSleeper))); s {
 434  	case nil:
 435  	case &assertedSleeper:
 436  	default:
 437  		s.enqueueAssertedWaker(w, wakep)
 438  	}
 439  }
 440  
 441  // Assert moves the waker to an asserted state, if it isn't asserted yet. When
 442  // asserted, the waker will cause its matching sleeper to wake up.
 443  func (w *Waker) Assert() {
 444  	w.assert(true /* wakep */)
 445  }
 446  
 447  // Clear moves the waker to then non-asserted state and returns whether it was
 448  // asserted before being cleared.
 449  //
 450  // N.B. The waker isn't removed from the "ready" list of a sleeper (if it
 451  // happens to be in one), but the sleeper will notice that it is not asserted
 452  // anymore and won't return it to the caller.
 453  func (w *Waker) Clear() bool {
 454  	// Nothing to do if the waker is not asserted. This check allows us to
 455  	// complete this case (already not asserted) without any interlocked
 456  	// operations on x86.
 457  	if atomic.LoadPointer(&w.s) != usleeper(&assertedSleeper) {
 458  		return false
 459  	}
 460  
 461  	// Try to store nil in the sleeper, which indicates that the waker is
 462  	// not asserted.
 463  	return atomic.CompareAndSwapPointer(&w.s, usleeper(&assertedSleeper), nil)
 464  }
 465  
 466  // IsAsserted returns whether the waker is currently asserted (i.e., if it's
 467  // currently in a state that would cause its matching sleeper to wake up).
 468  func (w *Waker) IsAsserted() bool {
 469  	return (*Sleeper)(atomic.LoadPointer(&w.s)) == &assertedSleeper
 470  }
 471  
 472  func usleeper(s *Sleeper) unsafe.Pointer {
 473  	return unsafe.Pointer(s)
 474  }
 475  
 476  func uwaker(w *Waker) unsafe.Pointer {
 477  	return unsafe.Pointer(w)
 478  }
 479