channels.go raw

   1  /* SPDX-License-Identifier: MIT
   2   *
   3   * Copyright (C) 2017-2025 WireGuard LLC. All Rights Reserved.
   4   */
   5  
   6  package device
   7  
   8  import (
   9  	"runtime"
  10  	"sync"
  11  )
  12  
  13  // An outboundQueue is a channel of QueueOutboundElements awaiting encryption.
  14  // An outboundQueue is ref-counted using its wg field.
  15  // An outboundQueue created with newOutboundQueue has one reference.
  16  // Every additional writer must call wg.Add(1).
  17  // Every completed writer must call wg.Done().
  18  // When no further writers will be added,
  19  // call wg.Done to remove the initial reference.
  20  // When the refcount hits 0, the queue's channel is closed.
  21  type outboundQueue struct {
  22  	c  chan *QueueOutboundElementsContainer
  23  	wg sync.WaitGroup
  24  }
  25  
  26  func newOutboundQueue() *outboundQueue {
  27  	q := &outboundQueue{
  28  		c: make(chan *QueueOutboundElementsContainer, QueueOutboundSize),
  29  	}
  30  	q.wg.Add(1)
  31  	go func() {
  32  		q.wg.Wait()
  33  		close(q.c)
  34  	}()
  35  	return q
  36  }
  37  
  38  // A inboundQueue is similar to an outboundQueue; see those docs.
  39  type inboundQueue struct {
  40  	c  chan *QueueInboundElementsContainer
  41  	wg sync.WaitGroup
  42  }
  43  
  44  func newInboundQueue() *inboundQueue {
  45  	q := &inboundQueue{
  46  		c: make(chan *QueueInboundElementsContainer, QueueInboundSize),
  47  	}
  48  	q.wg.Add(1)
  49  	go func() {
  50  		q.wg.Wait()
  51  		close(q.c)
  52  	}()
  53  	return q
  54  }
  55  
  56  // A handshakeQueue is similar to an outboundQueue; see those docs.
  57  type handshakeQueue struct {
  58  	c  chan QueueHandshakeElement
  59  	wg sync.WaitGroup
  60  }
  61  
  62  func newHandshakeQueue() *handshakeQueue {
  63  	q := &handshakeQueue{
  64  		c: make(chan QueueHandshakeElement, QueueHandshakeSize),
  65  	}
  66  	q.wg.Add(1)
  67  	go func() {
  68  		q.wg.Wait()
  69  		close(q.c)
  70  	}()
  71  	return q
  72  }
  73  
  74  type autodrainingInboundQueue struct {
  75  	c chan *QueueInboundElementsContainer
  76  }
  77  
  78  // newAutodrainingInboundQueue returns a channel that will be drained when it gets GC'd.
  79  // It is useful in cases in which is it hard to manage the lifetime of the channel.
  80  // The returned channel must not be closed. Senders should signal shutdown using
  81  // some other means, such as sending a sentinel nil values.
  82  func newAutodrainingInboundQueue(device *Device) *autodrainingInboundQueue {
  83  	q := &autodrainingInboundQueue{
  84  		c: make(chan *QueueInboundElementsContainer, QueueInboundSize),
  85  	}
  86  	runtime.SetFinalizer(q, device.flushInboundQueue)
  87  	return q
  88  }
  89  
  90  func (device *Device) flushInboundQueue(q *autodrainingInboundQueue) {
  91  	for {
  92  		select {
  93  		case elemsContainer := <-q.c:
  94  			elemsContainer.Lock()
  95  			for _, elem := range elemsContainer.elems {
  96  				device.PutMessageBuffer(elem.buffer)
  97  				device.PutInboundElement(elem)
  98  			}
  99  			device.PutInboundElementsContainer(elemsContainer)
 100  		default:
 101  			return
 102  		}
 103  	}
 104  }
 105  
 106  type autodrainingOutboundQueue struct {
 107  	c chan *QueueOutboundElementsContainer
 108  }
 109  
 110  // newAutodrainingOutboundQueue returns a channel that will be drained when it gets GC'd.
 111  // It is useful in cases in which is it hard to manage the lifetime of the channel.
 112  // The returned channel must not be closed. Senders should signal shutdown using
 113  // some other means, such as sending a sentinel nil values.
 114  // All sends to the channel must be best-effort, because there may be no receivers.
 115  func newAutodrainingOutboundQueue(device *Device) *autodrainingOutboundQueue {
 116  	q := &autodrainingOutboundQueue{
 117  		c: make(chan *QueueOutboundElementsContainer, QueueOutboundSize),
 118  	}
 119  	runtime.SetFinalizer(q, device.flushOutboundQueue)
 120  	return q
 121  }
 122  
 123  func (device *Device) flushOutboundQueue(q *autodrainingOutboundQueue) {
 124  	for {
 125  		select {
 126  		case elemsContainer := <-q.c:
 127  			elemsContainer.Lock()
 128  			for _, elem := range elemsContainer.elems {
 129  				device.PutMessageBuffer(elem.buffer)
 130  				device.PutOutboundElement(elem)
 131  			}
 132  			device.PutOutboundElementsContainer(elemsContainer)
 133  		default:
 134  			return
 135  		}
 136  	}
 137  }
 138