pending_packets.go raw

   1  // Copyright 2020 The gVisor Authors.
   2  //
   3  // Licensed under the Apache License, Version 2.0 (the "License");
   4  // you may not use this file except in compliance with the License.
   5  // You may obtain a copy of the License at
   6  //
   7  //     http://www.apache.org/licenses/LICENSE-2.0
   8  //
   9  // Unless required by applicable law or agreed to in writing, software
  10  // distributed under the License is distributed on an "AS IS" BASIS,
  11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12  // See the License for the specific language governing permissions and
  13  // limitations under the License.
  14  
  15  package stack
  16  
  17  import (
  18  	"fmt"
  19  
  20  	"gvisor.dev/gvisor/pkg/tcpip"
  21  )
  22  
  23  const (
  24  	// maxPendingResolutions is the maximum number of pending link-address
  25  	// resolutions.
  26  	maxPendingResolutions          = 64
  27  	maxPendingPacketsPerResolution = 256
  28  )
  29  
  30  // +stateify savable
  31  type pendingPacket struct {
  32  	routeInfo RouteInfo
  33  	pkt       *PacketBuffer
  34  }
  35  
  36  type packetsPendingLinkResolutionMu struct {
  37  	packetsPendingLinkResolutionMutex
  38  
  39  	// The packets to send once the resolver completes.
  40  	//
  41  	// The link resolution channel is used as the key for this map.
  42  	packets map[<-chan struct{}][]pendingPacket
  43  
  44  	// FIFO of channels used to cancel the oldest goroutine waiting for
  45  	// link-address resolution.
  46  	//
  47  	// cancelChans holds the same channels that are used as keys to packets.
  48  	cancelChans []<-chan struct{}
  49  }
  50  
  51  // packetsPendingLinkResolution is a queue of packets pending link resolution.
  52  //
  53  // Once link resolution completes successfully, the packets will be written.
  54  //
  55  // +stateify savable
  56  type packetsPendingLinkResolution struct {
  57  	nic *nic
  58  	mu  packetsPendingLinkResolutionMu `state:"nosave"`
  59  }
  60  
  61  func (f *packetsPendingLinkResolution) incrementOutgoingPacketErrors(pkt *PacketBuffer) {
  62  	f.nic.stack.stats.IP.OutgoingPacketErrors.Increment()
  63  
  64  	if ipEndpointStats, ok := f.nic.getNetworkEndpoint(pkt.NetworkProtocolNumber).Stats().(IPNetworkEndpointStats); ok {
  65  		ipEndpointStats.IPStats().OutgoingPacketErrors.Increment()
  66  	}
  67  }
  68  
  69  func (f *packetsPendingLinkResolution) init(nic *nic) {
  70  	f.mu.Lock()
  71  	defer f.mu.Unlock()
  72  	f.nic = nic
  73  	f.mu.packets = make(map[<-chan struct{}][]pendingPacket)
  74  }
  75  
  76  // cancel drains all pending packet queues and release all packet
  77  // references.
  78  func (f *packetsPendingLinkResolution) cancel() {
  79  	f.mu.Lock()
  80  	defer f.mu.Unlock()
  81  	for ch, pendingPackets := range f.mu.packets {
  82  		for _, p := range pendingPackets {
  83  			p.pkt.DecRef()
  84  		}
  85  		delete(f.mu.packets, ch)
  86  	}
  87  	f.mu.cancelChans = nil
  88  }
  89  
  90  // dequeue any pending packets associated with ch.
  91  //
  92  // If err is nil, packets will be written and sent to the given remote link
  93  // address.
  94  func (f *packetsPendingLinkResolution) dequeue(ch <-chan struct{}, linkAddr tcpip.LinkAddress, err tcpip.Error) {
  95  	f.mu.Lock()
  96  	packets, ok := f.mu.packets[ch]
  97  	delete(f.mu.packets, ch)
  98  
  99  	if ok {
 100  		for i, cancelChan := range f.mu.cancelChans {
 101  			if cancelChan == ch {
 102  				f.mu.cancelChans = append(f.mu.cancelChans[:i], f.mu.cancelChans[i+1:]...)
 103  				break
 104  			}
 105  		}
 106  	}
 107  
 108  	f.mu.Unlock()
 109  
 110  	if ok {
 111  		f.dequeuePackets(packets, linkAddr, err)
 112  	}
 113  }
 114  
 115  // enqueue a packet to be sent once link resolution completes.
 116  //
 117  // If the maximum number of pending resolutions is reached, the packets
 118  // associated with the oldest link resolution will be dequeued as if they failed
 119  // link resolution.
 120  func (f *packetsPendingLinkResolution) enqueue(r *Route, pkt *PacketBuffer) tcpip.Error {
 121  	f.mu.Lock()
 122  	// Make sure we attempt resolution while holding f's lock so that we avoid
 123  	// a race where link resolution completes before we enqueue the packets.
 124  	//
 125  	//   A @ T1: Call ResolvedFields (get link resolution channel)
 126  	//   B @ T2: Complete link resolution, dequeue pending packets
 127  	//   C @ T1: Enqueue packet that already completed link resolution (which will
 128  	//       never dequeue)
 129  	//
 130  	// To make sure B does not interleave with A and C, we make sure A and C are
 131  	// done while holding the lock.
 132  	routeInfo, ch, err := r.resolvedFields(nil)
 133  	switch err.(type) {
 134  	case nil:
 135  		// The route resolved immediately, so we don't need to wait for link
 136  		// resolution to send the packet.
 137  		f.mu.Unlock()
 138  		pkt.EgressRoute = routeInfo
 139  		return f.nic.writePacket(pkt)
 140  	case *tcpip.ErrWouldBlock:
 141  		// We need to wait for link resolution to complete.
 142  	default:
 143  		f.mu.Unlock()
 144  		return err
 145  	}
 146  
 147  	defer f.mu.Unlock()
 148  
 149  	packets, ok := f.mu.packets[ch]
 150  	packets = append(packets, pendingPacket{
 151  		routeInfo: routeInfo,
 152  		pkt:       pkt.Clone(),
 153  	})
 154  
 155  	if len(packets) > maxPendingPacketsPerResolution {
 156  		f.incrementOutgoingPacketErrors(packets[0].pkt)
 157  		packets[0].pkt.DecRef()
 158  		packets[0] = pendingPacket{}
 159  		packets = packets[1:]
 160  
 161  		if numPackets := len(packets); numPackets != maxPendingPacketsPerResolution {
 162  			panic(fmt.Sprintf("holding more queued packets than expected; got = %d, want <= %d", numPackets, maxPendingPacketsPerResolution))
 163  		}
 164  	}
 165  
 166  	f.mu.packets[ch] = packets
 167  
 168  	if ok {
 169  		return nil
 170  	}
 171  
 172  	cancelledPackets := f.newCancelChannelLocked(ch)
 173  
 174  	if len(cancelledPackets) != 0 {
 175  		// Dequeue the pending packets in a new goroutine to not hold up the current
 176  		// goroutine as handing link resolution failures may be a costly operation.
 177  		go f.dequeuePackets(cancelledPackets, "" /* linkAddr */, &tcpip.ErrAborted{})
 178  	}
 179  
 180  	return nil
 181  }
 182  
 183  // newCancelChannelLocked appends the link resolution channel to a FIFO. If the
 184  // maximum number of pending resolutions is reached, the oldest channel will be
 185  // removed and its associated pending packets will be returned.
 186  func (f *packetsPendingLinkResolution) newCancelChannelLocked(newCH <-chan struct{}) []pendingPacket {
 187  	f.mu.cancelChans = append(f.mu.cancelChans, newCH)
 188  	if len(f.mu.cancelChans) <= maxPendingResolutions {
 189  		return nil
 190  	}
 191  
 192  	ch := f.mu.cancelChans[0]
 193  	f.mu.cancelChans[0] = nil
 194  	f.mu.cancelChans = f.mu.cancelChans[1:]
 195  	if l := len(f.mu.cancelChans); l > maxPendingResolutions {
 196  		panic(fmt.Sprintf("max pending resolutions reached; got %d active resolutions, max = %d", l, maxPendingResolutions))
 197  	}
 198  
 199  	packets, ok := f.mu.packets[ch]
 200  	if !ok {
 201  		panic("must have a packet queue for an uncancelled channel")
 202  	}
 203  	delete(f.mu.packets, ch)
 204  
 205  	return packets
 206  }
 207  
 208  func (f *packetsPendingLinkResolution) dequeuePackets(packets []pendingPacket, linkAddr tcpip.LinkAddress, err tcpip.Error) {
 209  	for _, p := range packets {
 210  		if err == nil {
 211  			p.routeInfo.RemoteLinkAddress = linkAddr
 212  			p.pkt.EgressRoute = p.routeInfo
 213  			_ = f.nic.writePacket(p.pkt)
 214  		} else {
 215  			f.incrementOutgoingPacketErrors(p.pkt)
 216  
 217  			if linkResolvableEP, ok := f.nic.getNetworkEndpoint(p.pkt.NetworkProtocolNumber).(LinkResolvableNetworkEndpoint); ok {
 218  				linkResolvableEP.HandleLinkResolutionFailure(p.pkt)
 219  			}
 220  		}
 221  		p.pkt.DecRef()
 222  	}
 223  }
 224