waiter.go raw

   1  // Copyright 2018 The gVisor Authors.
   2  //
   3  // Licensed under the Apache License, Version 2.0 (the "License");
   4  // you may not use this file except in compliance with the License.
   5  // You may obtain a copy of the License at
   6  //
   7  //     http://www.apache.org/licenses/LICENSE-2.0
   8  //
   9  // Unless required by applicable law or agreed to in writing, software
  10  // distributed under the License is distributed on an "AS IS" BASIS,
  11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12  // See the License for the specific language governing permissions and
  13  // limitations under the License.
  14  
  15  // Package waiter provides the implementation of a wait queue, where waiters can
  16  // be enqueued to be notified when an event of interest happens.
  17  //
  18  // Becoming readable and/or writable are examples of events. Waiters are
  19  // expected to use a pattern similar to this to make a blocking function out of
  20  // a non-blocking one:
  21  //
  22  //	func (o *object) blockingRead(...) error {
  23  //		err := o.nonBlockingRead(...)
  24  //		if err != ErrAgain {
  25  //			// Completed with no need to wait!
  26  //			return err
  27  //		}
  28  //
  29  //		e := createOrGetWaiterEntry(..., waiter.EventIn)
  30  //		o.EventRegister(&e)
  31  //		defer o.EventUnregister(&e)
  32  //
  33  //		// We need to try to read again after registration because the
  34  //		// object may have become readable between the last attempt to
  35  //		// read and read registration.
  36  //		err = o.nonBlockingRead(...)
  37  //		for err == ErrAgain {
  38  //			wait()
  39  //			err = o.nonBlockingRead(...)
  40  //		}
  41  //
  42  //		return err
  43  //	}
  44  //
  45  // Another goroutine needs to notify waiters when events happen. For example:
  46  //
  47  //	func (o *object) Write(...) ... {
  48  //		// Do write work.
  49  //		[...]
  50  //
  51  //		if oldDataAvailableSize == 0 && dataAvailableSize > 0 {
  52  //			// If no data was available and now some data is
  53  //			// available, the object became readable, so notify
  54  //			// potential waiters about this.
  55  //			o.Notify(waiter.EventIn)
  56  //		}
  57  //	}
  58  package waiter
  59  
  60  import (
  61  	"gvisor.dev/gvisor/pkg/sync"
  62  )
  63  
  64  // EventMask represents io events as used in the poll() syscall.
  65  type EventMask uint64
  66  
  67  // Events that waiters can wait on. The meaning is the same as those in the
  68  // poll() syscall.
  69  const (
  70  	EventIn       EventMask = 0x01   // POLLIN
  71  	EventPri      EventMask = 0x02   // POLLPRI
  72  	EventOut      EventMask = 0x04   // POLLOUT
  73  	EventErr      EventMask = 0x08   // POLLERR
  74  	EventHUp      EventMask = 0x10   // POLLHUP
  75  	EventRdNorm   EventMask = 0x0040 // POLLRDNORM
  76  	EventWrNorm   EventMask = 0x0100 // POLLWRNORM
  77  	EventInternal EventMask = 0x1000
  78  	EventRdHUp    EventMask = 0x2000 // POLLRDHUP
  79  
  80  	AllEvents      EventMask = 0x1f | EventRdNorm | EventWrNorm | EventRdHUp
  81  	ReadableEvents EventMask = EventIn | EventRdNorm
  82  	WritableEvents EventMask = EventOut | EventWrNorm
  83  )
  84  
  85  // EventMaskFromLinux returns an EventMask representing the supported events
  86  // from the Linux events e, which is in the format used by poll(2).
  87  func EventMaskFromLinux(e uint32) EventMask {
  88  	// Our flag definitions are currently identical to Linux.
  89  	return EventMask(e) & AllEvents
  90  }
  91  
  92  // ToLinux returns e in the format used by Linux poll(2).
  93  func (e EventMask) ToLinux() uint32 {
  94  	// Our flag definitions are currently identical to Linux.
  95  	return uint32(e)
  96  }
  97  
// Waitable contains the methods that need to be implemented by waitable
// objects.
type Waitable interface {
	// Readiness returns what the object is currently ready for. If it's
	// not ready for a desired purpose, the caller may use EventRegister and
	// EventUnregister to get notifications once the object becomes ready.
	//
	// Implementations should allow for events like EventHUp and EventErr
	// to be returned regardless of whether they are in the input EventMask.
	Readiness(mask EventMask) EventMask

	// EventRegister registers the given waiter entry to receive
	// notifications when an event occurs that makes the object ready for
	// at least one of the events in the entry's mask (see Entry.Mask).
	EventRegister(e *Entry) error

	// EventUnregister unregisters a waiter entry previously registered with
	// EventRegister().
	EventUnregister(e *Entry)
}
 118  
// EventListener provides a notify callback. An Entry forwards notifications
// to the EventListener it was initialized with.
type EventListener interface {
	// NotifyEvent is the function to be called when the waiter entry is
	// notified. It is responsible for doing whatever is needed to wake up
	// the waiter.
	//
	// The callback is supposed to perform minimal work, and cannot call
	// any method on the queue itself because the queue will be locked
	// while the callback is running.
	//
	// The mask holds the events that occurred and that the entry is
	// interested in, i.e. the notified events restricted to the entry's
	// own mask.
	NotifyEvent(mask EventMask)
}
 133  
// Entry represents a waiter that can be added to a wait queue. It can
// only be in one queue at a time, and is added "intrusively" to the queue with
// no extra memory allocations.
//
// +stateify savable
type Entry struct {
	// waiterEntry is declared outside this file; presumably it supplies
	// the intrusive list linkage used by the queue -- TODO confirm.
	waiterEntry

	// eventListener receives the notification.
	eventListener EventListener

	// mask is the set of events this entry is interested in. It should be
	// immutable once queued.
	mask EventMask
}
 148  
 149  // Init initializes the Entry.
 150  //
 151  // This must only be called when unregistered.
 152  func (e *Entry) Init(eventListener EventListener, mask EventMask) {
 153  	e.eventListener = eventListener
 154  	e.mask = mask
 155  }
 156  
 157  // Mask returns the entry mask.
 158  func (e *Entry) Mask() EventMask {
 159  	return e.mask
 160  }
 161  
 162  // NotifyEvent notifies the event listener.
 163  //
 164  // Mask should be the full set of active events.
 165  func (e *Entry) NotifyEvent(mask EventMask) {
 166  	if m := mask & e.mask; m != 0 {
 167  		e.eventListener.NotifyEvent(m)
 168  	}
 169  }
 170  
 171  // ChannelNotifier is a simple channel-based notification.
 172  type ChannelNotifier chan struct{}
 173  
 174  // NotifyEvent implements waiter.EventListener.NotifyEvent.
 175  func (c ChannelNotifier) NotifyEvent(EventMask) {
 176  	select {
 177  	case c <- struct{}{}:
 178  	default:
 179  	}
 180  }
 181  
 182  // NewChannelEntry initializes a new Entry that does a non-blocking write to a
 183  // struct{} channel when the callback is called. It returns the new Entry
 184  // instance and the channel being used.
 185  func NewChannelEntry(mask EventMask) (e Entry, ch chan struct{}) {
 186  	ch = make(chan struct{}, 1)
 187  	e.Init(ChannelNotifier(ch), mask)
 188  	return e, ch
 189  }
 190  
 191  type functionNotifier func(EventMask)
 192  
 193  // NotifyEvent implements waiter.EventListener.NotifyEvent.
 194  func (f functionNotifier) NotifyEvent(mask EventMask) {
 195  	f(mask)
 196  }
 197  
 198  // NewFunctionEntry initializes a new Entry that calls the given function.
 199  func NewFunctionEntry(mask EventMask, fn func(EventMask)) (e Entry) {
 200  	e.Init(functionNotifier(fn), mask)
 201  	return e
 202  }
 203  
// Queue represents the wait queue where waiters can be added and
// notifiers can notify them when events happen.
//
// The zero value for waiter.Queue is an empty queue ready for use.
//
// +stateify savable
type Queue struct {
	// list holds the entries currently registered with the queue.
	list waiterList
	// mu guards list: methods that modify the list hold it exclusively,
	// while methods that only walk the list hold it shared. The "nosave"
	// tag presumably excludes it from saved state -- TODO confirm.
	mu   sync.RWMutex `state:"nosave"`
}
 214  
 215  // EventRegister adds a waiter to the wait queue.
 216  func (q *Queue) EventRegister(e *Entry) {
 217  	q.mu.Lock()
 218  	q.list.PushBack(e)
 219  	q.mu.Unlock()
 220  }
 221  
 222  // EventUnregister removes the given waiter entry from the wait queue.
 223  func (q *Queue) EventUnregister(e *Entry) {
 224  	q.mu.Lock()
 225  	q.list.Remove(e)
 226  	q.mu.Unlock()
 227  }
 228  
 229  // Notify notifies all waiters in the queue whose masks have at least one bit
 230  // in common with the notification mask.
 231  func (q *Queue) Notify(mask EventMask) {
 232  	q.mu.RLock()
 233  	for e := q.list.Front(); e != nil; e = e.Next() {
 234  		m := mask & e.mask
 235  		if m == 0 {
 236  			continue
 237  		}
 238  		e.eventListener.NotifyEvent(m) // Skip intermediate call.
 239  	}
 240  	q.mu.RUnlock()
 241  }
 242  
 243  // Events returns the set of events being waited on. It is the union of the
 244  // masks of all registered entries.
 245  func (q *Queue) Events() EventMask {
 246  	q.mu.RLock()
 247  	defer q.mu.RUnlock()
 248  	ret := EventMask(0)
 249  	for e := q.list.Front(); e != nil; e = e.Next() {
 250  		ret |= e.mask
 251  	}
 252  	return ret
 253  }
 254  
 255  // IsEmpty returns if the wait queue is empty or not.
 256  func (q *Queue) IsEmpty() bool {
 257  	q.mu.RLock()
 258  	defer q.mu.RUnlock()
 259  	return q.list.Front() == nil
 260  }
 261  
 262  // NeverReady implements the Waitable interface but is never ready. Otherwise,
 263  // this is exactly the same as AlwaysReady.
 264  type NeverReady struct {
 265  }
 266  
 267  // Readiness always returns 0 because this object is never ready.
 268  func (*NeverReady) Readiness(EventMask) EventMask {
 269  	return 0
 270  }
 271  
 272  // EventRegister doesn't do anything because this object doesn't need to issue
 273  // notifications because its readiness never changes.
 274  func (*NeverReady) EventRegister(*Entry) error {
 275  	return nil
 276  }
 277  
 278  // EventUnregister doesn't do anything because this object doesn't need to issue
 279  // notifications because its readiness never changes.
 280  func (*NeverReady) EventUnregister(*Entry) {
 281  }
 282