channel.go raw

   1  // Copyright 2018 The gVisor Authors.
   2  //
   3  // Licensed under the Apache License, Version 2.0 (the "License");
   4  // you may not use this file except in compliance with the License.
   5  // You may obtain a copy of the License at
   6  //
   7  //     http://www.apache.org/licenses/LICENSE-2.0
   8  //
   9  // Unless required by applicable law or agreed to in writing, software
  10  // distributed under the License is distributed on an "AS IS" BASIS,
  11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12  // See the License for the specific language governing permissions and
  13  // limitations under the License.
  14  
  15  // Package channel provides the implementation of channel-based data-link layer
  16  // endpoints. Such endpoints allow injection of inbound packets and store
  17  // outbound packets in a channel.
  18  package channel
  19  
  20  import (
  21  	"context"
  22  
  23  	"gvisor.dev/gvisor/pkg/tcpip"
  24  	"gvisor.dev/gvisor/pkg/tcpip/header"
  25  	"gvisor.dev/gvisor/pkg/tcpip/stack"
  26  )
  27  
  28  // Notification is the interface for receiving notification from the packet
  29  // queue.
  30  type Notification interface {
  31  	// WriteNotify will be called when a write happens to the queue.
  32  	WriteNotify()
  33  }
  34  
  35  // NotificationHandle is an opaque handle to the registered notification target.
  36  // It can be used to unregister the notification when no longer interested.
  37  //
  38  // +stateify savable
  39  type NotificationHandle struct {
  40  	n Notification
  41  }
  42  
  43  type queue struct {
  44  	// c is the outbound packet channel.
  45  	c  chan *stack.PacketBuffer
  46  	mu queueRWMutex
  47  	// +checklocks:mu
  48  	notify []*NotificationHandle
  49  	// +checklocks:mu
  50  	closed bool
  51  }
  52  
  53  func (q *queue) Close() {
  54  	q.mu.Lock()
  55  	defer q.mu.Unlock()
  56  	if !q.closed {
  57  		close(q.c)
  58  	}
  59  	q.closed = true
  60  }
  61  
  62  func (q *queue) Read() *stack.PacketBuffer {
  63  	select {
  64  	case p := <-q.c:
  65  		return p
  66  	default:
  67  		return nil
  68  	}
  69  }
  70  
  71  func (q *queue) ReadContext(ctx context.Context) *stack.PacketBuffer {
  72  	select {
  73  	case pkt := <-q.c:
  74  		return pkt
  75  	case <-ctx.Done():
  76  		return nil
  77  	}
  78  }
  79  
  80  func (q *queue) Write(pkt *stack.PacketBuffer) tcpip.Error {
  81  	// q holds the PacketBuffer.
  82  	q.mu.RLock()
  83  	if q.closed {
  84  		q.mu.RUnlock()
  85  		return &tcpip.ErrClosedForSend{}
  86  	}
  87  
  88  	wrote := false
  89  	p := pkt.Clone()
  90  	select {
  91  	case q.c <- p:
  92  		wrote = true
  93  	default:
  94  		p.DecRef()
  95  	}
  96  	notify := q.notify
  97  	q.mu.RUnlock()
  98  
  99  	if wrote {
 100  		// Send notification outside of lock.
 101  		for _, h := range notify {
 102  			h.n.WriteNotify()
 103  		}
 104  		return nil
 105  	}
 106  	return &tcpip.ErrNoBufferSpace{}
 107  }
 108  
 109  func (q *queue) Num() int {
 110  	return len(q.c)
 111  }
 112  
 113  func (q *queue) AddNotify(notify Notification) *NotificationHandle {
 114  	q.mu.Lock()
 115  	defer q.mu.Unlock()
 116  	h := &NotificationHandle{n: notify}
 117  	q.notify = append(q.notify, h)
 118  	return h
 119  }
 120  
 121  func (q *queue) RemoveNotify(handle *NotificationHandle) {
 122  	q.mu.Lock()
 123  	defer q.mu.Unlock()
 124  	// Make a copy, since we reads the array outside of lock when notifying.
 125  	notify := make([]*NotificationHandle, 0, len(q.notify))
 126  	for _, h := range q.notify {
 127  		if h != handle {
 128  			notify = append(notify, h)
 129  		}
 130  	}
 131  	q.notify = notify
 132  }
 133  
 134  var _ stack.LinkEndpoint = (*Endpoint)(nil)
 135  var _ stack.GSOEndpoint = (*Endpoint)(nil)
 136  
 137  // Endpoint is link layer endpoint that stores outbound packets in a channel
 138  // and allows injection of inbound packets.
 139  //
 140  // +stateify savable
 141  type Endpoint struct {
 142  	LinkEPCapabilities stack.LinkEndpointCapabilities
 143  	SupportedGSOKind   stack.SupportedGSO
 144  
 145  	mu endpointRWMutex `state:"nosave"`
 146  	// +checklocks:mu
 147  	dispatcher stack.NetworkDispatcher
 148  	// +checklocks:mu
 149  	linkAddr tcpip.LinkAddress
 150  	// +checklocks:mu
 151  	mtu uint32
 152  
 153  	// Outbound packet queue.
 154  	q *queue
 155  }
 156  
 157  // New creates a new channel endpoint.
 158  func New(size int, mtu uint32, linkAddr tcpip.LinkAddress) *Endpoint {
 159  	return &Endpoint{
 160  		q: &queue{
 161  			c: make(chan *stack.PacketBuffer, size),
 162  		},
 163  		mtu:      mtu,
 164  		linkAddr: linkAddr,
 165  	}
 166  }
 167  
 168  // Close closes e. Further packet injections will return an error, and all pending
 169  // packets are discarded. Close may be called concurrently with WritePackets.
 170  func (e *Endpoint) Close() {
 171  	e.q.Close()
 172  	e.Drain()
 173  }
 174  
 175  // Read does non-blocking read one packet from the outbound packet queue.
 176  func (e *Endpoint) Read() *stack.PacketBuffer {
 177  	return e.q.Read()
 178  }
 179  
 180  // ReadContext does blocking read for one packet from the outbound packet queue.
 181  // It can be cancelled by ctx, and in this case, it returns nil.
 182  func (e *Endpoint) ReadContext(ctx context.Context) *stack.PacketBuffer {
 183  	return e.q.ReadContext(ctx)
 184  }
 185  
 186  // Drain removes all outbound packets from the channel and counts them.
 187  func (e *Endpoint) Drain() int {
 188  	c := 0
 189  	for pkt := e.Read(); pkt != nil; pkt = e.Read() {
 190  		pkt.DecRef()
 191  		c++
 192  	}
 193  	return c
 194  }
 195  
 196  // NumQueued returns the number of packet queued for outbound.
 197  func (e *Endpoint) NumQueued() int {
 198  	return e.q.Num()
 199  }
 200  
 201  // InjectInbound injects an inbound packet. If the endpoint is not attached, the
 202  // packet is not delivered.
 203  func (e *Endpoint) InjectInbound(protocol tcpip.NetworkProtocolNumber, pkt *stack.PacketBuffer) {
 204  	e.mu.RLock()
 205  	d := e.dispatcher
 206  	e.mu.RUnlock()
 207  	if d != nil {
 208  		d.DeliverNetworkPacket(protocol, pkt)
 209  	}
 210  }
 211  
 212  // Attach saves the stack network-layer dispatcher for use later when packets
 213  // are injected.
 214  func (e *Endpoint) Attach(dispatcher stack.NetworkDispatcher) {
 215  	e.mu.Lock()
 216  	defer e.mu.Unlock()
 217  	e.dispatcher = dispatcher
 218  }
 219  
 220  // IsAttached implements stack.LinkEndpoint.IsAttached.
 221  func (e *Endpoint) IsAttached() bool {
 222  	e.mu.RLock()
 223  	defer e.mu.RUnlock()
 224  	return e.dispatcher != nil
 225  }
 226  
 227  // MTU implements stack.LinkEndpoint.MTU.
 228  func (e *Endpoint) MTU() uint32 {
 229  	e.mu.RLock()
 230  	defer e.mu.RUnlock()
 231  	return e.mtu
 232  }
 233  
 234  // SetMTU implements stack.LinkEndpoint.SetMTU.
 235  func (e *Endpoint) SetMTU(mtu uint32) {
 236  	e.mu.Lock()
 237  	defer e.mu.Unlock()
 238  	e.mtu = mtu
 239  }
 240  
 241  // Capabilities implements stack.LinkEndpoint.Capabilities.
 242  func (e *Endpoint) Capabilities() stack.LinkEndpointCapabilities {
 243  	return e.LinkEPCapabilities
 244  }
 245  
 246  // GSOMaxSize implements stack.GSOEndpoint.
 247  func (*Endpoint) GSOMaxSize() uint32 {
 248  	return 1 << 15
 249  }
 250  
 251  // SupportedGSO implements stack.GSOEndpoint.
 252  func (e *Endpoint) SupportedGSO() stack.SupportedGSO {
 253  	return e.SupportedGSOKind
 254  }
 255  
 256  // MaxHeaderLength returns the maximum size of the link layer header. Given it
 257  // doesn't have a header, it just returns 0.
 258  func (*Endpoint) MaxHeaderLength() uint16 {
 259  	return 0
 260  }
 261  
 262  // LinkAddress returns the link address of this endpoint.
 263  func (e *Endpoint) LinkAddress() tcpip.LinkAddress {
 264  	e.mu.RLock()
 265  	defer e.mu.RUnlock()
 266  	return e.linkAddr
 267  }
 268  
 269  // SetLinkAddress implements stack.LinkEndpoint.SetLinkAddress.
 270  func (e *Endpoint) SetLinkAddress(addr tcpip.LinkAddress) {
 271  	e.mu.Lock()
 272  	defer e.mu.Unlock()
 273  	e.linkAddr = addr
 274  }
 275  
 276  // WritePackets stores outbound packets into the channel.
 277  // Multiple concurrent calls are permitted.
 278  func (e *Endpoint) WritePackets(pkts stack.PacketBufferList) (int, tcpip.Error) {
 279  	n := 0
 280  	for _, pkt := range pkts.AsSlice() {
 281  		if err := e.q.Write(pkt); err != nil {
 282  			if _, ok := err.(*tcpip.ErrNoBufferSpace); !ok && n == 0 {
 283  				return 0, err
 284  			}
 285  			break
 286  		}
 287  		n++
 288  	}
 289  
 290  	return n, nil
 291  }
 292  
 293  // Wait implements stack.LinkEndpoint.Wait.
 294  func (*Endpoint) Wait() {}
 295  
 296  // AddNotify adds a notification target for receiving event about outgoing
 297  // packets.
 298  func (e *Endpoint) AddNotify(notify Notification) *NotificationHandle {
 299  	return e.q.AddNotify(notify)
 300  }
 301  
 302  // RemoveNotify removes handle from the list of notification targets.
 303  func (e *Endpoint) RemoveNotify(handle *NotificationHandle) {
 304  	e.q.RemoveNotify(handle)
 305  }
 306  
 307  // ARPHardwareType implements stack.LinkEndpoint.ARPHardwareType.
 308  func (*Endpoint) ARPHardwareType() header.ARPHardwareType {
 309  	return header.ARPHardwareNone
 310  }
 311  
 312  // AddHeader implements stack.LinkEndpoint.AddHeader.
 313  func (*Endpoint) AddHeader(*stack.PacketBuffer) {}
 314  
 315  // ParseHeader implements stack.LinkEndpoint.ParseHeader.
 316  func (*Endpoint) ParseHeader(*stack.PacketBuffer) bool { return true }
 317  
 318  // SetOnCloseAction implements stack.LinkEndpoint.
 319  func (*Endpoint) SetOnCloseAction(func()) {}
 320