1 /* SPDX-License-Identifier: MIT
2 *
3 * Copyright (C) 2017-2025 WireGuard LLC. All Rights Reserved.
4 */
5 6 package device
7 8 import (
9 "runtime"
10 "sync"
11 )
12 13 // An outboundQueue is a channel of QueueOutboundElements awaiting encryption.
14 // An outboundQueue is ref-counted using its wg field.
15 // An outboundQueue created with newOutboundQueue has one reference.
16 // Every additional writer must call wg.Add(1).
17 // Every completed writer must call wg.Done().
18 // When no further writers will be added,
19 // call wg.Done to remove the initial reference.
20 // When the refcount hits 0, the queue's channel is closed.
21 type outboundQueue struct {
22 c chan *QueueOutboundElementsContainer
23 wg sync.WaitGroup
24 }
25 26 func newOutboundQueue() *outboundQueue {
27 q := &outboundQueue{
28 c: make(chan *QueueOutboundElementsContainer, QueueOutboundSize),
29 }
30 q.wg.Add(1)
31 go func() {
32 q.wg.Wait()
33 close(q.c)
34 }()
35 return q
36 }
37 38 // A inboundQueue is similar to an outboundQueue; see those docs.
39 type inboundQueue struct {
40 c chan *QueueInboundElementsContainer
41 wg sync.WaitGroup
42 }
43 44 func newInboundQueue() *inboundQueue {
45 q := &inboundQueue{
46 c: make(chan *QueueInboundElementsContainer, QueueInboundSize),
47 }
48 q.wg.Add(1)
49 go func() {
50 q.wg.Wait()
51 close(q.c)
52 }()
53 return q
54 }
55 56 // A handshakeQueue is similar to an outboundQueue; see those docs.
57 type handshakeQueue struct {
58 c chan QueueHandshakeElement
59 wg sync.WaitGroup
60 }
61 62 func newHandshakeQueue() *handshakeQueue {
63 q := &handshakeQueue{
64 c: make(chan QueueHandshakeElement, QueueHandshakeSize),
65 }
66 q.wg.Add(1)
67 go func() {
68 q.wg.Wait()
69 close(q.c)
70 }()
71 return q
72 }
73 74 type autodrainingInboundQueue struct {
75 c chan *QueueInboundElementsContainer
76 }
77 78 // newAutodrainingInboundQueue returns a channel that will be drained when it gets GC'd.
79 // It is useful in cases in which is it hard to manage the lifetime of the channel.
80 // The returned channel must not be closed. Senders should signal shutdown using
81 // some other means, such as sending a sentinel nil values.
82 func newAutodrainingInboundQueue(device *Device) *autodrainingInboundQueue {
83 q := &autodrainingInboundQueue{
84 c: make(chan *QueueInboundElementsContainer, QueueInboundSize),
85 }
86 runtime.SetFinalizer(q, device.flushInboundQueue)
87 return q
88 }
89 90 func (device *Device) flushInboundQueue(q *autodrainingInboundQueue) {
91 for {
92 select {
93 case elemsContainer := <-q.c:
94 elemsContainer.Lock()
95 for _, elem := range elemsContainer.elems {
96 device.PutMessageBuffer(elem.buffer)
97 device.PutInboundElement(elem)
98 }
99 device.PutInboundElementsContainer(elemsContainer)
100 default:
101 return
102 }
103 }
104 }
105 106 type autodrainingOutboundQueue struct {
107 c chan *QueueOutboundElementsContainer
108 }
109 110 // newAutodrainingOutboundQueue returns a channel that will be drained when it gets GC'd.
111 // It is useful in cases in which is it hard to manage the lifetime of the channel.
112 // The returned channel must not be closed. Senders should signal shutdown using
113 // some other means, such as sending a sentinel nil values.
114 // All sends to the channel must be best-effort, because there may be no receivers.
115 func newAutodrainingOutboundQueue(device *Device) *autodrainingOutboundQueue {
116 q := &autodrainingOutboundQueue{
117 c: make(chan *QueueOutboundElementsContainer, QueueOutboundSize),
118 }
119 runtime.SetFinalizer(q, device.flushOutboundQueue)
120 return q
121 }
122 123 func (device *Device) flushOutboundQueue(q *autodrainingOutboundQueue) {
124 for {
125 select {
126 case elemsContainer := <-q.c:
127 elemsContainer.Lock()
128 for _, elem := range elemsContainer.elems {
129 device.PutMessageBuffer(elem.buffer)
130 device.PutOutboundElement(elem)
131 }
132 device.PutOutboundElementsContainer(elemsContainer)
133 default:
134 return
135 }
136 }
137 }
138