channel.go raw
1 // Copyright 2018 The gVisor Authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14
15 // Package channel provides the implementation of channel-based data-link layer
16 // endpoints. Such endpoints allow injection of inbound packets and store
17 // outbound packets in a channel.
18 package channel
19
20 import (
21 "context"
22
23 "gvisor.dev/gvisor/pkg/tcpip"
24 "gvisor.dev/gvisor/pkg/tcpip/header"
25 "gvisor.dev/gvisor/pkg/tcpip/stack"
26 )
27
28 // Notification is the interface for receiving notification from the packet
29 // queue.
30 type Notification interface {
31 // WriteNotify will be called when a write happens to the queue.
32 WriteNotify()
33 }
34
35 // NotificationHandle is an opaque handle to the registered notification target.
36 // It can be used to unregister the notification when no longer interested.
37 //
38 // +stateify savable
39 type NotificationHandle struct {
40 n Notification
41 }
42
43 type queue struct {
44 // c is the outbound packet channel.
45 c chan *stack.PacketBuffer
46 mu queueRWMutex
47 // +checklocks:mu
48 notify []*NotificationHandle
49 // +checklocks:mu
50 closed bool
51 }
52
53 func (q *queue) Close() {
54 q.mu.Lock()
55 defer q.mu.Unlock()
56 if !q.closed {
57 close(q.c)
58 }
59 q.closed = true
60 }
61
62 func (q *queue) Read() *stack.PacketBuffer {
63 select {
64 case p := <-q.c:
65 return p
66 default:
67 return nil
68 }
69 }
70
71 func (q *queue) ReadContext(ctx context.Context) *stack.PacketBuffer {
72 select {
73 case pkt := <-q.c:
74 return pkt
75 case <-ctx.Done():
76 return nil
77 }
78 }
79
80 func (q *queue) Write(pkt *stack.PacketBuffer) tcpip.Error {
81 // q holds the PacketBuffer.
82 q.mu.RLock()
83 if q.closed {
84 q.mu.RUnlock()
85 return &tcpip.ErrClosedForSend{}
86 }
87
88 wrote := false
89 p := pkt.Clone()
90 select {
91 case q.c <- p:
92 wrote = true
93 default:
94 p.DecRef()
95 }
96 notify := q.notify
97 q.mu.RUnlock()
98
99 if wrote {
100 // Send notification outside of lock.
101 for _, h := range notify {
102 h.n.WriteNotify()
103 }
104 return nil
105 }
106 return &tcpip.ErrNoBufferSpace{}
107 }
108
109 func (q *queue) Num() int {
110 return len(q.c)
111 }
112
113 func (q *queue) AddNotify(notify Notification) *NotificationHandle {
114 q.mu.Lock()
115 defer q.mu.Unlock()
116 h := &NotificationHandle{n: notify}
117 q.notify = append(q.notify, h)
118 return h
119 }
120
121 func (q *queue) RemoveNotify(handle *NotificationHandle) {
122 q.mu.Lock()
123 defer q.mu.Unlock()
124 // Make a copy, since we reads the array outside of lock when notifying.
125 notify := make([]*NotificationHandle, 0, len(q.notify))
126 for _, h := range q.notify {
127 if h != handle {
128 notify = append(notify, h)
129 }
130 }
131 q.notify = notify
132 }
133
134 var _ stack.LinkEndpoint = (*Endpoint)(nil)
135 var _ stack.GSOEndpoint = (*Endpoint)(nil)
136
137 // Endpoint is link layer endpoint that stores outbound packets in a channel
138 // and allows injection of inbound packets.
139 //
140 // +stateify savable
141 type Endpoint struct {
142 LinkEPCapabilities stack.LinkEndpointCapabilities
143 SupportedGSOKind stack.SupportedGSO
144
145 mu endpointRWMutex `state:"nosave"`
146 // +checklocks:mu
147 dispatcher stack.NetworkDispatcher
148 // +checklocks:mu
149 linkAddr tcpip.LinkAddress
150 // +checklocks:mu
151 mtu uint32
152
153 // Outbound packet queue.
154 q *queue
155 }
156
157 // New creates a new channel endpoint.
158 func New(size int, mtu uint32, linkAddr tcpip.LinkAddress) *Endpoint {
159 return &Endpoint{
160 q: &queue{
161 c: make(chan *stack.PacketBuffer, size),
162 },
163 mtu: mtu,
164 linkAddr: linkAddr,
165 }
166 }
167
168 // Close closes e. Further packet injections will return an error, and all pending
169 // packets are discarded. Close may be called concurrently with WritePackets.
170 func (e *Endpoint) Close() {
171 e.q.Close()
172 e.Drain()
173 }
174
175 // Read does non-blocking read one packet from the outbound packet queue.
176 func (e *Endpoint) Read() *stack.PacketBuffer {
177 return e.q.Read()
178 }
179
180 // ReadContext does blocking read for one packet from the outbound packet queue.
181 // It can be cancelled by ctx, and in this case, it returns nil.
182 func (e *Endpoint) ReadContext(ctx context.Context) *stack.PacketBuffer {
183 return e.q.ReadContext(ctx)
184 }
185
186 // Drain removes all outbound packets from the channel and counts them.
187 func (e *Endpoint) Drain() int {
188 c := 0
189 for pkt := e.Read(); pkt != nil; pkt = e.Read() {
190 pkt.DecRef()
191 c++
192 }
193 return c
194 }
195
196 // NumQueued returns the number of packet queued for outbound.
197 func (e *Endpoint) NumQueued() int {
198 return e.q.Num()
199 }
200
201 // InjectInbound injects an inbound packet. If the endpoint is not attached, the
202 // packet is not delivered.
203 func (e *Endpoint) InjectInbound(protocol tcpip.NetworkProtocolNumber, pkt *stack.PacketBuffer) {
204 e.mu.RLock()
205 d := e.dispatcher
206 e.mu.RUnlock()
207 if d != nil {
208 d.DeliverNetworkPacket(protocol, pkt)
209 }
210 }
211
212 // Attach saves the stack network-layer dispatcher for use later when packets
213 // are injected.
214 func (e *Endpoint) Attach(dispatcher stack.NetworkDispatcher) {
215 e.mu.Lock()
216 defer e.mu.Unlock()
217 e.dispatcher = dispatcher
218 }
219
220 // IsAttached implements stack.LinkEndpoint.IsAttached.
221 func (e *Endpoint) IsAttached() bool {
222 e.mu.RLock()
223 defer e.mu.RUnlock()
224 return e.dispatcher != nil
225 }
226
227 // MTU implements stack.LinkEndpoint.MTU.
228 func (e *Endpoint) MTU() uint32 {
229 e.mu.RLock()
230 defer e.mu.RUnlock()
231 return e.mtu
232 }
233
234 // SetMTU implements stack.LinkEndpoint.SetMTU.
235 func (e *Endpoint) SetMTU(mtu uint32) {
236 e.mu.Lock()
237 defer e.mu.Unlock()
238 e.mtu = mtu
239 }
240
241 // Capabilities implements stack.LinkEndpoint.Capabilities.
242 func (e *Endpoint) Capabilities() stack.LinkEndpointCapabilities {
243 return e.LinkEPCapabilities
244 }
245
246 // GSOMaxSize implements stack.GSOEndpoint.
247 func (*Endpoint) GSOMaxSize() uint32 {
248 return 1 << 15
249 }
250
251 // SupportedGSO implements stack.GSOEndpoint.
252 func (e *Endpoint) SupportedGSO() stack.SupportedGSO {
253 return e.SupportedGSOKind
254 }
255
256 // MaxHeaderLength returns the maximum size of the link layer header. Given it
257 // doesn't have a header, it just returns 0.
258 func (*Endpoint) MaxHeaderLength() uint16 {
259 return 0
260 }
261
262 // LinkAddress returns the link address of this endpoint.
263 func (e *Endpoint) LinkAddress() tcpip.LinkAddress {
264 e.mu.RLock()
265 defer e.mu.RUnlock()
266 return e.linkAddr
267 }
268
269 // SetLinkAddress implements stack.LinkEndpoint.SetLinkAddress.
270 func (e *Endpoint) SetLinkAddress(addr tcpip.LinkAddress) {
271 e.mu.Lock()
272 defer e.mu.Unlock()
273 e.linkAddr = addr
274 }
275
276 // WritePackets stores outbound packets into the channel.
277 // Multiple concurrent calls are permitted.
278 func (e *Endpoint) WritePackets(pkts stack.PacketBufferList) (int, tcpip.Error) {
279 n := 0
280 for _, pkt := range pkts.AsSlice() {
281 if err := e.q.Write(pkt); err != nil {
282 if _, ok := err.(*tcpip.ErrNoBufferSpace); !ok && n == 0 {
283 return 0, err
284 }
285 break
286 }
287 n++
288 }
289
290 return n, nil
291 }
292
293 // Wait implements stack.LinkEndpoint.Wait.
294 func (*Endpoint) Wait() {}
295
296 // AddNotify adds a notification target for receiving event about outgoing
297 // packets.
298 func (e *Endpoint) AddNotify(notify Notification) *NotificationHandle {
299 return e.q.AddNotify(notify)
300 }
301
302 // RemoveNotify removes handle from the list of notification targets.
303 func (e *Endpoint) RemoveNotify(handle *NotificationHandle) {
304 e.q.RemoveNotify(handle)
305 }
306
307 // ARPHardwareType implements stack.LinkEndpoint.ARPHardwareType.
308 func (*Endpoint) ARPHardwareType() header.ARPHardwareType {
309 return header.ARPHardwareNone
310 }
311
312 // AddHeader implements stack.LinkEndpoint.AddHeader.
313 func (*Endpoint) AddHeader(*stack.PacketBuffer) {}
314
315 // ParseHeader implements stack.LinkEndpoint.ParseHeader.
316 func (*Endpoint) ParseHeader(*stack.PacketBuffer) bool { return true }
317
318 // SetOnCloseAction implements stack.LinkEndpoint.
319 func (*Endpoint) SetOnCloseAction(func()) {}
320