chan.mx raw

   1  package runtime
   2  
   3  // This file implements the 'chan' type and send/receive/select operations.
   4  //
   5  // Every channel has a list of senders and a list of receivers, and possibly a
   6  // queue. There is no 'channel state', the state is inferred from the available
   7  // senders/receivers and values in the buffer.
   8  //
   9  // - A sender will first try to send the value to a waiting receiver if there is
  10  //   one, but only if there is nothing in the queue (to keep the values flowing
  11  //   in the correct order). If it can't, it will add the value in the queue and
  12  //   possibly wait as a sender if there's no space available.
  13  // - A receiver will first try to read a value from the queue, but if there is
  14  //   none it will try to read from a sender in the list. It will block if it
  15  //   can't proceed.
  16  //
  17  // State is kept in various ways:
  18  //
  19  // - The sender value is stored in the sender 'channelOp', which is really a
  20  //   queue entry. This works for both senders and select operations: a select
  21  //   operation has a separate value to send for each case.
  22  // - The receiver value is stored inside Task.Ptr. This works for receivers, and
  23  //   importantly also works for select which has a single buffer for every
  24  //   receive operation.
  25  // - The `Task.Data` value stores how the channel operation proceeded. For
  26  //   normal send/receive operations, it starts at chanOperationWaiting and then
  27  //   is changed to chanOperationOk or chanOperationClosed depending on whether
  28  //   the send/receive proceeded normally or because it was closed. For a select
  29  //   operation, it also stores the 'case' index in the upper bits (zero for
  30  //   non-select operations) so that the select operation knows which case did
  31  //   proceed.
  32  //   The value is at the same time also a way that goroutines can be the first
  33  //   (and only) goroutine to 'take' a channel operation using an atomic CAS
  34  //   operation to change it from 'waiting' to any other value. This is important
  35  //   for the select statement because multiple goroutines could try to let
  36  //   different channels in the select statement proceed at the same time. By
  37  //   using Task.Data, only a single channel operation in the select statement
  38  //   can proceed.
  39  // - It is possible for the channel queues to contain already-processed senders
  40  //   or receivers. This can happen when the select statement managed to proceed
  41  //   but the goroutine doing the select has not yet cleaned up the stale queue
  42  //   entries before returning. This should therefore only happen for a short
  43  //   period.
  44  
  45  import (
  46  	"internal/task"
  47  	"runtime/interrupt"
  48  	"unsafe"
  49  )
  50  
  51  // The runtime implementation of the Go 'chan' type.
  52  type channel struct {
  53  	closed       bool
  54  	selectLocked bool
  55  	elementSize  uintptr
  56  	bufCap       uintptr // 'cap'
  57  	bufLen       uintptr // 'len'
  58  	bufHead      uintptr
  59  	bufTail      uintptr
  60  	senders      chanQueue
  61  	receivers    chanQueue
  62  	lock         task.PMutex
  63  	buf          unsafe.Pointer
  64  }
  65  
  66  const (
  67  	chanOperationWaiting = 0b00 // waiting for a send/receive operation to continue
  68  	chanOperationOk      = 0b01 // successfully sent or received (not closed)
  69  	chanOperationClosed  = 0b10 // channel was closed, the value has been zeroed
  70  	chanOperationMask    = 0b11
  71  )
  72  
  73  type chanQueue struct {
  74  	first *channelOp
  75  }
  76  
  77  // Pus the next channel operation to the queue. All appropriate fields must have
  78  // been initialized already.
  79  // This function must be called with interrupts disabled and the channel lock
  80  // held.
  81  func (q *chanQueue) push(node *channelOp) {
  82  	node.next = q.first
  83  	q.first = node
  84  }
  85  
  86  // Pop the next waiting channel from the queue. Channels that are no longer
  87  // waiting (for example, when they're part of a select operation) will be
  88  // skipped.
  89  // This function must be called with interrupts disabled.
  90  func (q *chanQueue) pop(chanOp uint32) *channelOp {
  91  	for {
  92  		if q.first == nil {
  93  			return nil
  94  		}
  95  
  96  		// Pop next from the queue.
  97  		popped := q.first
  98  		q.first = q.first.next
  99  
 100  		// The new value for the 'data' field will be a combination of the
 101  		// channel operation and the select index. (The select index is 0 for
 102  		// non-select channel operations).
 103  		newDataValue := chanOp | popped.index<<2
 104  
 105  		// Try to be the first to proceed with this goroutine.
 106  		swapped := popped.task.DataAtomicUint32().CompareAndSwap(0, newDataValue)
 107  		if swapped {
 108  			return popped
 109  		}
 110  	}
 111  }
 112  
 113  // Remove the given to-be-removed node from the queue if it is part of the
 114  // queue. If there are multiple, only one will be removed.
 115  // This function must be called with interrupts disabled and the channel lock
 116  // held.
 117  func (q *chanQueue) remove(remove *channelOp) {
 118  	n := &q.first
 119  	for *n != nil {
 120  		if *n == remove {
 121  			*n = (*n).next
 122  			return
 123  		}
 124  		n = &((*n).next)
 125  	}
 126  }
 127  
 128  type channelOp struct {
 129  	next  *channelOp
 130  	task  *task.Task
 131  	index uint32         // select index, 0 for non-select operation
 132  	value unsafe.Pointer // if this is a sender, this is the value to send
 133  }
 134  
 135  type chanSelectState struct {
 136  	ch    *channel
 137  	value unsafe.Pointer
 138  }
 139  
 140  func chanMake(elementSize uintptr, bufSize uintptr) *channel {
 141  	return &channel{
 142  		elementSize: elementSize,
 143  		bufCap:      bufSize,
 144  		buf:         alloc(elementSize*bufSize, nil),
 145  	}
 146  }
 147  
 148  // Return the number of entries in this chan, called from the len builtin.
 149  // A nil chan is defined as having length 0.
 150  func chanLen(c *channel) int {
 151  	if c == nil {
 152  		return 0
 153  	}
 154  	return int(c.bufLen)
 155  }
 156  
 157  // Return the capacity of this chan, called from the cap builtin.
 158  // A nil chan is defined as having capacity 0.
 159  func chanCap(c *channel) int {
 160  	if c == nil {
 161  		return 0
 162  	}
 163  	return int(c.bufCap)
 164  }
 165  
 166  // Push the value to the channel buffer array, for a send operation.
 167  // This function may only be called when interrupts are disabled, the channel is
 168  // locked and it is known there is space available in the buffer.
 169  func (ch *channel) bufferPush(value unsafe.Pointer) {
 170  	elemAddr := unsafe.Add(ch.buf, ch.bufHead*ch.elementSize)
 171  	ch.bufLen++
 172  	ch.bufHead++
 173  	if ch.bufHead == ch.bufCap {
 174  		ch.bufHead = 0
 175  	}
 176  
 177  	memcpy(elemAddr, value, ch.elementSize)
 178  }
 179  
 180  // Pop a value from the channel buffer and store it in the 'value' pointer, for
 181  // a receive operation.
 182  // This function may only be called when interrupts are disabled, the channel is
 183  // locked and it is known there is at least one value available in the buffer.
 184  func (ch *channel) bufferPop(value unsafe.Pointer) {
 185  	elemAddr := unsafe.Add(ch.buf, ch.bufTail*ch.elementSize)
 186  	ch.bufLen--
 187  	ch.bufTail++
 188  	if ch.bufTail == ch.bufCap {
 189  		ch.bufTail = 0
 190  	}
 191  
 192  	memcpy(value, elemAddr, ch.elementSize)
 193  
 194  	// Zero the value to allow the GC to collect it.
 195  	memzero(elemAddr, ch.elementSize)
 196  }
 197  
 198  // Try to proceed with this send operation without blocking, and return whether
 199  // the send succeeded. Interrupts must be disabled and the lock must be held
 200  // when calling this function.
 201  func (ch *channel) trySend(value unsafe.Pointer) bool {
 202  	// To make sure we send values in the correct order, we can only send
 203  	// directly to a receiver when there are no values in the buffer.
 204  
 205  	// Do not allow sending on a closed channel.
 206  	if ch.closed {
 207  		// Note: we cannot currently recover from this panic.
 208  		// There's some state in the select statement especially that would be
 209  		// corrupted if we allowed recovering from this panic.
 210  		runtimePanic("send on closed channel")
 211  	}
 212  
 213  	// There is no value in the buffer and we have a receiver available. Copy
 214  	// the value directly into the receiver.
 215  	if ch.bufLen == 0 {
 216  		if receiver := ch.receivers.pop(chanOperationOk); receiver != nil {
 217  			memcpy(receiver.task.Ptr, value, ch.elementSize)
 218  			scheduleTask(receiver.task)
 219  			return true
 220  		}
 221  	}
 222  
 223  	// If there is space in the buffer (if this is a buffered channel), we can
 224  	// store the value in the buffer and continue.
 225  	if ch.bufLen < ch.bufCap {
 226  		ch.bufferPush(value)
 227  		return true
 228  	}
 229  	return false
 230  }
 231  
 232  func chanSend(ch *channel, value unsafe.Pointer, op *channelOp) {
 233  	if ch == nil {
 234  		// A nil channel blocks forever. Do not schedule this goroutine again.
 235  		deadlock()
 236  	}
 237  
 238  	mask := interrupt.Disable()
 239  	ch.lock.Lock()
 240  
 241  	// See whether we can proceed immediately, and if so, return early.
 242  	if ch.trySend(value) {
 243  		ch.lock.Unlock()
 244  		interrupt.Restore(mask)
 245  		return
 246  	}
 247  
 248  	// Can't proceed. Add us to the list of senders and wait until we're awoken.
 249  	t := task.Current()
 250  	t.SetDataUint32(chanOperationWaiting)
 251  	op.task = t
 252  	op.index = 0
 253  	op.value = value
 254  	ch.senders.push(op)
 255  	ch.lock.Unlock()
 256  	interrupt.Restore(mask)
 257  
 258  	// Wait until a receiver picks up the value.
 259  	if !hasScheduler {
 260  		for t.DataUint32() == chanOperationWaiting {
 261  			if hasPoll {
 262  				netpollBlock(100)
 263  			} else {
 264  				sleepTicks(nanosecondsToTicks(100_000_000))
 265  			}
 266  			fireTimers()
 267  		}
 268  	} else {
 269  		task.Pause()
 270  	}
 271  
 272  	// Check whether the sent happened normally (not because the channel was
 273  	// closed while sending).
 274  	if t.DataUint32() == chanOperationClosed {
 275  		// Oops, this channel was closed while sending!
 276  		runtimePanic("send on closed channel")
 277  	}
 278  }
 279  
 280  // Try to proceed with this receive operation without blocking, and return
 281  // whether the receive operation succeeded. Interrupts must be disabled and the
 282  // lock must be held when calling this function.
 283  func (ch *channel) tryRecv(value unsafe.Pointer) (received, ok bool) {
 284  	// To make sure we keep the values in the channel in the correct order, we
 285  	// first have to read values from the buffer before we can look at the
 286  	// senders.
 287  
 288  	// If there is a value available in the buffer, we can pull it out and
 289  	// proceed immediately.
 290  	if ch.bufLen > 0 {
 291  		ch.bufferPop(value)
 292  
 293  		// Check for the next sender available and push it to the buffer.
 294  		if sender := ch.senders.pop(chanOperationOk); sender != nil {
 295  			ch.bufferPush(sender.value)
 296  			scheduleTask(sender.task)
 297  		}
 298  
 299  		return true, true
 300  	}
 301  
 302  	if ch.closed {
 303  		// Channel is closed, so proceed immediately.
 304  		memzero(value, ch.elementSize)
 305  		return true, false
 306  	}
 307  
 308  	// If there is a sender, we can proceed with the channel operation
 309  	// immediately.
 310  	if sender := ch.senders.pop(chanOperationOk); sender != nil {
 311  		memcpy(value, sender.value, ch.elementSize)
 312  		scheduleTask(sender.task)
 313  		return true, true
 314  	}
 315  
 316  	return false, false
 317  }
 318  
 319  func chanRecv(ch *channel, value unsafe.Pointer, op *channelOp) bool {
 320  	if ch == nil {
 321  		// A nil channel blocks forever. Do not schedule this goroutine again.
 322  		deadlock()
 323  	}
 324  
 325  	mask := interrupt.Disable()
 326  	ch.lock.Lock()
 327  
 328  	if received, ok := ch.tryRecv(value); received {
 329  		ch.lock.Unlock()
 330  		interrupt.Restore(mask)
 331  		return ok
 332  	}
 333  
 334  	// We can't proceed, so we add ourselves to the list of receivers and wait
 335  	// until we're awoken.
 336  	t := task.Current()
 337  	t.Ptr = value
 338  	t.SetDataUint32(chanOperationWaiting)
 339  	op.task = t
 340  	op.index = 0
 341  	ch.receivers.push(op)
 342  	ch.lock.Unlock()
 343  	interrupt.Restore(mask)
 344  
 345  	// Wait until a sender provides a value.
 346  	if !hasScheduler {
 347  		for t.DataUint32() == chanOperationWaiting {
 348  			if hasPoll {
 349  				netpollBlock(100)
 350  			} else {
 351  				sleepTicks(nanosecondsToTicks(100_000_000))
 352  			}
 353  			fireTimers()
 354  		}
 355  	} else {
 356  		task.Pause()
 357  	}
 358  
 359  	// Return whether the receive happened from a closed channel.
 360  	return t.DataUint32() != chanOperationClosed
 361  }
 362  
 363  // chanClose closes the given channel. If this channel has a receiver or is
 364  // empty, it closes the channel. Else, it panics.
 365  func chanClose(ch *channel) {
 366  	if ch == nil {
 367  		// Not allowed by the language spec.
 368  		runtimePanic("close of nil channel")
 369  	}
 370  
 371  	mask := interrupt.Disable()
 372  	ch.lock.Lock()
 373  
 374  	if ch.closed {
 375  		// Not allowed by the language spec.
 376  		ch.lock.Unlock()
 377  		interrupt.Restore(mask)
 378  		runtimePanic("close of closed channel")
 379  	}
 380  
 381  	// Proceed all receiving operations that are blocked.
 382  	for {
 383  		receiver := ch.receivers.pop(chanOperationClosed)
 384  		if receiver == nil {
 385  			// Processed all receivers.
 386  			break
 387  		}
 388  
 389  		// Zero the value that the receiver is getting.
 390  		memzero(receiver.task.Ptr, ch.elementSize)
 391  
 392  		// Wake up the receiving goroutine.
 393  		scheduleTask(receiver.task)
 394  	}
 395  
 396  	// Let all senders panic.
 397  	for {
 398  		sender := ch.senders.pop(chanOperationClosed)
 399  		if sender == nil {
 400  			break // processed all senders
 401  		}
 402  
 403  		// Wake up the sender.
 404  		scheduleTask(sender.task)
 405  	}
 406  
 407  	ch.closed = true
 408  
 409  	ch.lock.Unlock()
 410  	interrupt.Restore(mask)
 411  }
 412  
 413  // We currently use a global select lock to avoid deadlocks while locking each
 414  // individual channel in the select. Without this global lock, two select
 415  // operations that have a different order of the same channels could end up in a
 416  // deadlock. This global lock is inefficient if there are many select operations
 417  // happening in parallel, but gets the job done.
 418  //
 419  // If this becomes a performance issue, we can see how the Go runtime does this.
 420  // I think it does this by sorting all states by channel address and then
 421  // locking them in that order to avoid this deadlock.
 422  var chanSelectLock task.PMutex
 423  
 424  // Lock all channels (taking care to skip duplicate channels).
 425  func lockAllStates(states []chanSelectState) {
 426  	if !hasParallelism {
 427  		return
 428  	}
 429  	for _, state := range states {
 430  		if state.ch != nil && !state.ch.selectLocked {
 431  			state.ch.lock.Lock()
 432  			state.ch.selectLocked = true
 433  		}
 434  	}
 435  }
 436  
 437  // Unlock all channels (taking care to skip duplicate channels).
 438  func unlockAllStates(states []chanSelectState) {
 439  	if !hasParallelism {
 440  		return
 441  	}
 442  	for _, state := range states {
 443  		if state.ch != nil && state.ch.selectLocked {
 444  			state.ch.lock.Unlock()
 445  			state.ch.selectLocked = false
 446  		}
 447  	}
 448  }
 449  
 450  // chanSelect implements blocking or non-blocking select operations.
 451  // The 'ops' slice must be set if (and only if) this is a blocking select.
 452  func chanSelect(recvbuf unsafe.Pointer, states []chanSelectState, ops []channelOp) (uint32, bool) {
 453  	mask := interrupt.Disable()
 454  
 455  	// Lock everything.
 456  	chanSelectLock.Lock()
 457  	lockAllStates(states)
 458  
 459  	const selectNoIndex = ^uint32(0)
 460  	selectIndex := selectNoIndex
 461  	selectOk := true
 462  
 463  	// Iterate over each state, and see if it can proceed.
 464  	// TODO: start from a random index.
 465  	for i, state := range states {
 466  		if state.ch == nil {
 467  			// A nil channel blocks forever, so it won't take part of the select
 468  			// operation.
 469  			continue
 470  		}
 471  
 472  		if state.value == nil { // chan receive
 473  			if received, ok := state.ch.tryRecv(recvbuf); received {
 474  				selectIndex = uint32(i)
 475  				selectOk = ok
 476  				break
 477  			}
 478  		} else { // chan send
 479  			if state.ch.trySend(state.value) {
 480  				selectIndex = uint32(i)
 481  				break
 482  			}
 483  		}
 484  	}
 485  
 486  	// If this select can immediately proceed, or is a non-blocking select,
 487  	// return early.
 488  	blocking := len(ops) != 0
 489  	if selectIndex != selectNoIndex || !blocking {
 490  		unlockAllStates(states)
 491  		chanSelectLock.Unlock()
 492  		interrupt.Restore(mask)
 493  		return selectIndex, selectOk
 494  	}
 495  
 496  	// The select is blocking and no channel operation can proceed, so things
 497  	// become more complicated.
 498  	// We add ourselves as a sender/receiver to every channel, and wait for the
 499  	// first one to complete. Only one will successfully complete, because
 500  	// senders and receivers use a compare-and-exchange atomic operation on
 501  	// t.Data so that only one will be able to "take" this select operation.
 502  	t := task.Current()
 503  	t.Ptr = recvbuf
 504  	t.SetDataUint32(chanOperationWaiting)
 505  	for i, state := range states {
 506  		if state.ch == nil {
 507  			continue
 508  		}
 509  		op := &ops[i]
 510  		op.task = t
 511  		op.index = uint32(i)
 512  		if state.value == nil { // chan receive
 513  			state.ch.receivers.push(op)
 514  		} else { // chan send
 515  			op.value = state.value
 516  			state.ch.senders.push(op)
 517  		}
 518  	}
 519  
 520  	// Now we wait until one of the send/receive operations can proceed.
 521  	unlockAllStates(states)
 522  	chanSelectLock.Unlock()
 523  	interrupt.Restore(mask)
 524  
 525  	// Single-threaded event loop: poll for I/O and timers until a
 526  	// channel operation completes (changes t.Data from waiting).
 527  	if !hasScheduler {
 528  		for t.DataUint32() == chanOperationWaiting {
 529  			if hasPoll {
 530  				netpollBlock(100)
 531  			} else {
 532  				sleepTicks(nanosecondsToTicks(100_000_000))
 533  			}
 534  			fireTimers()
 535  		}
 536  	} else {
 537  		task.Pause()
 538  	}
 539  
 540  	// Resumed, so one channel operation must have progressed.
 541  
 542  	// Make sure all channel ops are removed from the senders/receivers
 543  	// queue before we return and the memory of them becomes invalid.
 544  	chanSelectLock.Lock()
 545  	lockAllStates(states)
 546  	for i, state := range states {
 547  		if state.ch == nil {
 548  			continue
 549  		}
 550  		op := &ops[i]
 551  		mask := interrupt.Disable()
 552  		if state.value == nil {
 553  			state.ch.receivers.remove(op)
 554  		} else {
 555  			state.ch.senders.remove(op)
 556  		}
 557  		interrupt.Restore(mask)
 558  	}
 559  	unlockAllStates(states)
 560  	chanSelectLock.Unlock()
 561  
 562  	// Pull the return values out of t.Data (which contains two bitfields).
 563  	selectIndex = t.DataUint32() >> 2
 564  	selectOk = t.DataUint32()&chanOperationMask != chanOperationClosed
 565  
 566  	return selectIndex, selectOk
 567  }
 568