1 package runtime
2 3 // This file implements the 'chan' type and send/receive/select operations.
4 //
5 // Every channel has a list of senders and a list of receivers, and possibly a
6 // queue. There is no 'channel state', the state is inferred from the available
7 // senders/receivers and values in the buffer.
8 //
9 // - A sender will first try to send the value to a waiting receiver if there is
10 // one, but only if there is nothing in the queue (to keep the values flowing
11 // in the correct order). If it can't, it will add the value in the queue and
12 // possibly wait as a sender if there's no space available.
13 // - A receiver will first try to read a value from the queue, but if there is
14 // none it will try to read from a sender in the list. It will block if it
15 // can't proceed.
16 //
17 // State is kept in various ways:
18 //
19 // - The sender value is stored in the sender 'channelOp', which is really a
20 // queue entry. This works for both senders and select operations: a select
21 // operation has a separate value to send for each case.
22 // - The receiver value is stored inside Task.Ptr. This works for receivers, and
23 // importantly also works for select which has a single buffer for every
24 // receive operation.
25 // - The `Task.Data` value stores how the channel operation proceeded. For
26 // normal send/receive operations, it starts at chanOperationWaiting and then
27 // is changed to chanOperationOk or chanOperationClosed depending on whether
28 // the send/receive proceeded normally or because it was closed. For a select
29 // operation, it also stores the 'case' index in the upper bits (zero for
30 // non-select operations) so that the select operation knows which case did
31 // proceed.
32 // The value is at the same time also a way that goroutines can be the first
33 // (and only) goroutine to 'take' a channel operation using an atomic CAS
34 // operation to change it from 'waiting' to any other value. This is important
35 // for the select statement because multiple goroutines could try to let
36 // different channels in the select statement proceed at the same time. By
37 // using Task.Data, only a single channel operation in the select statement
38 // can proceed.
39 // - It is possible for the channel queues to contain already-processed senders
40 // or receivers. This can happen when the select statement managed to proceed
41 // but the goroutine doing the select has not yet cleaned up the stale queue
42 // entries before returning. This should therefore only happen for a short
43 // period.
44 45 import (
46 "internal/task"
47 "runtime/interrupt"
48 "unsafe"
49 )
// The runtime implementation of the Go 'chan' type.
type channel struct {
	closed       bool    // true once chanClose has run on this channel
	selectLocked bool    // set by lockAllStates so duplicate channels in a select are locked only once
	elementSize  uintptr // size in bytes of a single element stored in buf
	bufCap       uintptr // 'cap'
	bufLen       uintptr // 'len'
	bufHead      uintptr // ring buffer index where the next value will be written (bufferPush)
	bufTail      uintptr // ring buffer index where the next value will be read (bufferPop)
	senders      chanQueue // goroutines blocked sending on this channel
	receivers    chanQueue // goroutines blocked receiving on this channel
	lock         task.PMutex // protects the fields of this struct
	buf          unsafe.Pointer // ring buffer storage of elementSize*bufCap bytes
}
// Values stored in the low two bits of Task.Data to track the progress of a
// channel operation. For select operations, the upper bits additionally hold
// the select case index (see chanQueue.pop and chanSelect).
const (
	chanOperationWaiting = 0b00 // waiting for a send/receive operation to continue
	chanOperationOk      = 0b01 // successfully sent or received (not closed)
	chanOperationClosed  = 0b10 // channel was closed, the value has been zeroed
	chanOperationMask    = 0b11 // mask to extract the operation state from Task.Data
)
// A chanQueue is a singly linked list of pending channel operations (senders
// or receivers) waiting on a channel.
type chanQueue struct {
	first *channelOp // head of the list, nil when the queue is empty
}
// Push the next channel operation to the queue. All appropriate fields must
// have been initialized already.
// This function must be called with interrupts disabled and the channel lock
// held.
func (q *chanQueue) push(node *channelOp) {
	node.next = q.first
	q.first = node
}
85 86 // Pop the next waiting channel from the queue. Channels that are no longer
87 // waiting (for example, when they're part of a select operation) will be
88 // skipped.
89 // This function must be called with interrupts disabled.
90 func (q *chanQueue) pop(chanOp uint32) *channelOp {
91 for {
92 if q.first == nil {
93 return nil
94 }
95 96 // Pop next from the queue.
97 popped := q.first
98 q.first = q.first.next
99 100 // The new value for the 'data' field will be a combination of the
101 // channel operation and the select index. (The select index is 0 for
102 // non-select channel operations).
103 newDataValue := chanOp | popped.index<<2
104 105 // Try to be the first to proceed with this goroutine.
106 swapped := popped.task.DataAtomicUint32().CompareAndSwap(0, newDataValue)
107 if swapped {
108 return popped
109 }
110 }
111 }
112 113 // Remove the given to-be-removed node from the queue if it is part of the
114 // queue. If there are multiple, only one will be removed.
115 // This function must be called with interrupts disabled and the channel lock
116 // held.
117 func (q *chanQueue) remove(remove *channelOp) {
118 n := &q.first
119 for *n != nil {
120 if *n == remove {
121 *n = (*n).next
122 return
123 }
124 n = &((*n).next)
125 }
126 }
// A channelOp is an entry in a chanQueue: one pending send or receive
// operation of a goroutine (or one case of a select statement).
type channelOp struct {
	next  *channelOp // next entry in the queue
	task  *task.Task // the goroutine waiting for this operation to proceed
	index uint32     // select index, 0 for non-select operation
	value unsafe.Pointer // if this is a sender, this is the value to send
}
// A chanSelectState describes a single case of a select statement: the channel
// to operate on, and for a send case the value to send. A nil value marks the
// case as a receive operation (see chanSelect).
type chanSelectState struct {
	ch    *channel
	value unsafe.Pointer
}
139 140 func chanMake(elementSize uintptr, bufSize uintptr) *channel {
141 return &channel{
142 elementSize: elementSize,
143 bufCap: bufSize,
144 buf: alloc(elementSize*bufSize, nil),
145 }
146 }
147 148 // Return the number of entries in this chan, called from the len builtin.
149 // A nil chan is defined as having length 0.
150 func chanLen(c *channel) int {
151 if c == nil {
152 return 0
153 }
154 return int(c.bufLen)
155 }
156 157 // Return the capacity of this chan, called from the cap builtin.
158 // A nil chan is defined as having capacity 0.
159 func chanCap(c *channel) int {
160 if c == nil {
161 return 0
162 }
163 return int(c.bufCap)
164 }
165 166 // Push the value to the channel buffer array, for a send operation.
167 // This function may only be called when interrupts are disabled, the channel is
168 // locked and it is known there is space available in the buffer.
169 func (ch *channel) bufferPush(value unsafe.Pointer) {
170 elemAddr := unsafe.Add(ch.buf, ch.bufHead*ch.elementSize)
171 ch.bufLen++
172 ch.bufHead++
173 if ch.bufHead == ch.bufCap {
174 ch.bufHead = 0
175 }
176 177 memcpy(elemAddr, value, ch.elementSize)
178 }
179 180 // Pop a value from the channel buffer and store it in the 'value' pointer, for
181 // a receive operation.
182 // This function may only be called when interrupts are disabled, the channel is
183 // locked and it is known there is at least one value available in the buffer.
184 func (ch *channel) bufferPop(value unsafe.Pointer) {
185 elemAddr := unsafe.Add(ch.buf, ch.bufTail*ch.elementSize)
186 ch.bufLen--
187 ch.bufTail++
188 if ch.bufTail == ch.bufCap {
189 ch.bufTail = 0
190 }
191 192 memcpy(value, elemAddr, ch.elementSize)
193 194 // Zero the value to allow the GC to collect it.
195 memzero(elemAddr, ch.elementSize)
196 }
// Try to proceed with this send operation without blocking, and return whether
// the send succeeded. Interrupts must be disabled and the lock must be held
// when calling this function.
func (ch *channel) trySend(value unsafe.Pointer) bool {
	// To make sure we send values in the correct order, we can only send
	// directly to a receiver when there are no values in the buffer.

	// Do not allow sending on a closed channel.
	if ch.closed {
		// Note: we cannot currently recover from this panic.
		// There's some state in the select statement especially that would be
		// corrupted if we allowed recovering from this panic.
		runtimePanic("send on closed channel")
	}

	// There is no value in the buffer and we have a receiver available. Copy
	// the value directly into the receiver's buffer (Task.Ptr) and wake it up.
	if ch.bufLen == 0 {
		if receiver := ch.receivers.pop(chanOperationOk); receiver != nil {
			memcpy(receiver.task.Ptr, value, ch.elementSize)
			scheduleTask(receiver.task)
			return true
		}
	}

	// If there is space in the buffer (if this is a buffered channel), we can
	// store the value in the buffer and continue.
	if ch.bufLen < ch.bufCap {
		ch.bufferPush(value)
		return true
	}

	// Can't proceed: no waiting receiver, and no free space in the buffer.
	return false
}
// chanSend sends a value on the given channel, blocking until the send can
// proceed. The 'op' queue entry is used to wait in the channel's sender queue
// if the send can't proceed immediately. A nil channel blocks forever; sending
// on a closed channel panics.
func chanSend(ch *channel, value unsafe.Pointer, op *channelOp) {
	if ch == nil {
		// A nil channel blocks forever. Do not schedule this goroutine again.
		deadlock()
	}

	mask := interrupt.Disable()
	ch.lock.Lock()

	// See whether we can proceed immediately, and if so, return early.
	if ch.trySend(value) {
		ch.lock.Unlock()
		interrupt.Restore(mask)
		return
	}

	// Can't proceed. Add us to the list of senders and wait until we're awoken.
	t := task.Current()
	t.SetDataUint32(chanOperationWaiting)
	op.task = t
	op.index = 0 // not part of a select statement
	op.value = value
	ch.senders.push(op)
	ch.lock.Unlock()
	interrupt.Restore(mask)

	// Wait until a receiver picks up the value.
	if !hasScheduler {
		// No scheduler available: poll I/O and timers until something changes
		// t.Data away from the waiting state.
		for t.DataUint32() == chanOperationWaiting {
			if hasPoll {
				netpollBlock(100)
			} else {
				sleepTicks(nanosecondsToTicks(100_000_000))
			}
			fireTimers()
		}
	} else {
		task.Pause()
	}

	// Check whether the send happened normally (not because the channel was
	// closed while sending).
	if t.DataUint32() == chanOperationClosed {
		// Oops, this channel was closed while sending!
		runtimePanic("send on closed channel")
	}
}
// Try to proceed with this receive operation without blocking. It returns
// whether the receive succeeded ('received') and whether the value came from a
// send rather than from a closed channel ('ok'). Interrupts must be disabled
// and the lock must be held when calling this function.
func (ch *channel) tryRecv(value unsafe.Pointer) (received, ok bool) {
	// To make sure we keep the values in the channel in the correct order, we
	// first have to read values from the buffer before we can look at the
	// senders.

	// If there is a value available in the buffer, we can pull it out and
	// proceed immediately.
	if ch.bufLen > 0 {
		ch.bufferPop(value)

		// Check for the next sender available and push it to the buffer:
		// popping just freed a slot, so a blocked sender can now continue.
		if sender := ch.senders.pop(chanOperationOk); sender != nil {
			ch.bufferPush(sender.value)
			scheduleTask(sender.task)
		}

		return true, true
	}

	if ch.closed {
		// Channel is closed (and the buffer is empty, checked above), so
		// proceed immediately with the zero value.
		memzero(value, ch.elementSize)
		return true, false
	}

	// If there is a sender, we can proceed with the channel operation
	// immediately.
	if sender := ch.senders.pop(chanOperationOk); sender != nil {
		memcpy(value, sender.value, ch.elementSize)
		scheduleTask(sender.task)
		return true, true
	}

	return false, false
}
// chanRecv receives a value from the given channel, blocking until a value is
// available. The received value is stored through the 'value' pointer. The
// 'op' queue entry is used to wait in the channel's receiver queue if needed.
// It returns true when the value came from a send, false when the channel was
// closed and the zero value was stored. A nil channel blocks forever.
func chanRecv(ch *channel, value unsafe.Pointer, op *channelOp) bool {
	if ch == nil {
		// A nil channel blocks forever. Do not schedule this goroutine again.
		deadlock()
	}

	mask := interrupt.Disable()
	ch.lock.Lock()

	if received, ok := ch.tryRecv(value); received {
		ch.lock.Unlock()
		interrupt.Restore(mask)
		return ok
	}

	// We can't proceed, so we add ourselves to the list of receivers and wait
	// until we're awoken.
	t := task.Current()
	t.Ptr = value // the sender will copy the value directly into this buffer
	t.SetDataUint32(chanOperationWaiting)
	op.task = t
	op.index = 0 // not part of a select statement
	ch.receivers.push(op)
	ch.lock.Unlock()
	interrupt.Restore(mask)

	// Wait until a sender provides a value.
	if !hasScheduler {
		// No scheduler available: poll I/O and timers until something changes
		// t.Data away from the waiting state.
		for t.DataUint32() == chanOperationWaiting {
			if hasPoll {
				netpollBlock(100)
			} else {
				sleepTicks(nanosecondsToTicks(100_000_000))
			}
			fireTimers()
		}
	} else {
		task.Pause()
	}

	// Return true if the value came from a sender, false if the zero value was
	// received because the channel was closed.
	return t.DataUint32() != chanOperationClosed
}
// chanClose closes the given channel. It panics when closing a nil or an
// already-closed channel. All goroutines blocked receiving on the channel are
// resumed with the zero value, and all goroutines blocked sending are resumed
// so they can panic with "send on closed channel".
func chanClose(ch *channel) {
	if ch == nil {
		// Not allowed by the language spec.
		runtimePanic("close of nil channel")
	}

	mask := interrupt.Disable()
	ch.lock.Lock()

	if ch.closed {
		// Not allowed by the language spec.
		ch.lock.Unlock()
		interrupt.Restore(mask)
		runtimePanic("close of closed channel")
	}

	// Proceed all receiving operations that are blocked.
	for {
		receiver := ch.receivers.pop(chanOperationClosed)
		if receiver == nil {
			// Processed all receivers.
			break
		}

		// Zero the value that the receiver is getting.
		memzero(receiver.task.Ptr, ch.elementSize)

		// Wake up the receiving goroutine.
		scheduleTask(receiver.task)
	}

	// Let all senders panic: they were woken with chanOperationClosed, which
	// chanSend turns into a "send on closed channel" panic.
	for {
		sender := ch.senders.pop(chanOperationClosed)
		if sender == nil {
			break // processed all senders
		}

		// Wake up the sender.
		scheduleTask(sender.task)
	}

	ch.closed = true

	ch.lock.Unlock()
	interrupt.Restore(mask)
}
// We currently use a global select lock to avoid deadlocks while locking each
// individual channel in the select. Without this global lock, two select
// operations that have a different order of the same channels could end up in
// a deadlock. This global lock is inefficient if there are many select
// operations happening in parallel, but gets the job done.
//
// If this becomes a performance issue, we can see how the Go runtime does
// this. I think it does this by sorting all states by channel address and then
// locking them in that order to avoid this deadlock.
var chanSelectLock task.PMutex
423 424 // Lock all channels (taking care to skip duplicate channels).
425 func lockAllStates(states []chanSelectState) {
426 if !hasParallelism {
427 return
428 }
429 for _, state := range states {
430 if state.ch != nil && !state.ch.selectLocked {
431 state.ch.lock.Lock()
432 state.ch.selectLocked = true
433 }
434 }
435 }
436 437 // Unlock all channels (taking care to skip duplicate channels).
438 func unlockAllStates(states []chanSelectState) {
439 if !hasParallelism {
440 return
441 }
442 for _, state := range states {
443 if state.ch != nil && state.ch.selectLocked {
444 state.ch.lock.Unlock()
445 state.ch.selectLocked = false
446 }
447 }
448 }
// chanSelect implements blocking or non-blocking select operations.
// The 'ops' slice must be set if (and only if) this is a blocking select.
//
// It returns the index of the case that proceeded and, for receive cases,
// whether the value was received from a still-open channel. A non-blocking
// select that cannot proceed returns ^uint32(0) as the index.
func chanSelect(recvbuf unsafe.Pointer, states []chanSelectState, ops []channelOp) (uint32, bool) {
	mask := interrupt.Disable()

	// Lock everything.
	chanSelectLock.Lock()
	lockAllStates(states)

	const selectNoIndex = ^uint32(0)
	selectIndex := selectNoIndex
	selectOk := true

	// Iterate over each state, and see if it can proceed.
	// TODO: start from a random index.
	for i, state := range states {
		if state.ch == nil {
			// A nil channel blocks forever, so it won't take part of the select
			// operation.
			continue
		}

		if state.value == nil { // chan receive
			if received, ok := state.ch.tryRecv(recvbuf); received {
				selectIndex = uint32(i)
				selectOk = ok
				break
			}
		} else { // chan send
			if state.ch.trySend(state.value) {
				selectIndex = uint32(i)
				break
			}
		}
	}

	// If this select can immediately proceed, or is a non-blocking select,
	// return early.
	blocking := len(ops) != 0
	if selectIndex != selectNoIndex || !blocking {
		unlockAllStates(states)
		chanSelectLock.Unlock()
		interrupt.Restore(mask)
		return selectIndex, selectOk
	}

	// The select is blocking and no channel operation can proceed, so things
	// become more complicated.
	// We add ourselves as a sender/receiver to every channel, and wait for the
	// first one to complete. Only one will successfully complete, because
	// senders and receivers use a compare-and-exchange atomic operation on
	// t.Data so that only one will be able to "take" this select operation.
	t := task.Current()
	t.Ptr = recvbuf
	t.SetDataUint32(chanOperationWaiting)
	for i, state := range states {
		if state.ch == nil {
			continue
		}
		op := &ops[i]
		op.task = t
		op.index = uint32(i) // remember which select case this op belongs to
		if state.value == nil { // chan receive
			state.ch.receivers.push(op)
		} else { // chan send
			op.value = state.value
			state.ch.senders.push(op)
		}
	}

	// Now we wait until one of the send/receive operations can proceed.
	unlockAllStates(states)
	chanSelectLock.Unlock()
	interrupt.Restore(mask)

	// Single-threaded event loop: poll for I/O and timers until a
	// channel operation completes (changes t.Data from waiting).
	if !hasScheduler {
		for t.DataUint32() == chanOperationWaiting {
			if hasPoll {
				netpollBlock(100)
			} else {
				sleepTicks(nanosecondsToTicks(100_000_000))
			}
			fireTimers()
		}
	} else {
		task.Pause()
	}

	// Resumed, so one channel operation must have progressed.

	// Make sure all channel ops are removed from the senders/receivers
	// queue before we return and the memory of them becomes invalid.
	// (Other goroutines may still see the stale queue entries until this
	// cleanup runs; pop() skips them via the failed CAS.)
	chanSelectLock.Lock()
	lockAllStates(states)
	for i, state := range states {
		if state.ch == nil {
			continue
		}
		op := &ops[i]
		mask := interrupt.Disable()
		if state.value == nil {
			state.ch.receivers.remove(op)
		} else {
			state.ch.senders.remove(op)
		}
		interrupt.Restore(mask)
	}
	unlockAllStates(states)
	chanSelectLock.Unlock()

	// Pull the return values out of t.Data, which contains two bitfields: the
	// operation result in the low two bits and the select index in the upper
	// bits (stored there by chanQueue.pop).
	selectIndex = t.DataUint32() >> 2
	selectOk = t.DataUint32()&chanOperationMask != chanOperationClosed

	return selectIndex, selectOk
}
568