splice_linux.mx raw

   1  // Copyright 2018 The Go Authors. All rights reserved.
   2  // Use of this source code is governed by a BSD-style
   3  // license that can be found in the LICENSE file.
   4  
   5  package poll
   6  
   7  import (
   8  	"internal/syscall/unix"
   9  	"runtime"
  10  	"sync"
  11  	"syscall"
  12  )
  13  
  14  const (
  15  	// spliceNonblock doesn't make the splice itself necessarily nonblocking
  16  	// (because the actual file descriptors that are spliced from/to may block
  17  	// unless they have the O_NONBLOCK flag set), but it makes the splice pipe
  18  	// operations nonblocking.
  19  	spliceNonblock = 0x2
  20  
  21  	// maxSpliceSize is the maximum amount of data Splice asks
  22  	// the kernel to move in a single call to splice(2).
  23  	// We use 1MB as Splice writes data through a pipe, and 1MB is the default maximum pipe buffer size,
  24  	// which is determined by /proc/sys/fs/pipe-max-size.
  25  	maxSpliceSize = 1 << 20
  26  )
  27  
  28  // Splice transfers at most remain bytes of data from src to dst, using the
  29  // splice system call to minimize copies of data from and to userspace.
  30  //
  31  // Splice gets a pipe buffer from the pool or creates a new one if needed, to serve as a buffer for the data transfer.
  32  // src and dst must both be stream-oriented sockets.
  33  func Splice(dst, src *FD, remain int64) (written int64, handled bool, err error) {
  34  	p, err := getPipe()
  35  	if err != nil {
  36  		return 0, false, err
  37  	}
  38  	defer putPipe(p)
  39  	var inPipe, n int
  40  	for err == nil && remain > 0 {
  41  		max := maxSpliceSize
  42  		if int64(max) > remain {
  43  			max = int(remain)
  44  		}
  45  		inPipe, err = spliceDrain(p.wfd, src, max)
  46  		// The operation is considered handled if splice returns no
  47  		// error, or an error other than EINVAL. An EINVAL means the
  48  		// kernel does not support splice for the socket type of src.
  49  		// The failed syscall does not consume any data so it is safe
  50  		// to fall back to a generic copy.
  51  		//
  52  		// spliceDrain should never return EAGAIN, so if err != nil,
  53  		// Splice cannot continue.
  54  		//
  55  		// If inPipe == 0 && err == nil, src is at EOF, and the
  56  		// transfer is complete.
  57  		handled = handled || (err != syscall.EINVAL)
  58  		if err != nil || inPipe == 0 {
  59  			break
  60  		}
  61  		p.data += inPipe
  62  
  63  		n, err = splicePump(dst, p.rfd, inPipe)
  64  		if n > 0 {
  65  			written += int64(n)
  66  			remain -= int64(n)
  67  			p.data -= n
  68  		}
  69  	}
  70  	if err != nil {
  71  		return written, handled, err
  72  	}
  73  	return written, true, nil
  74  }
  75  
  76  // spliceDrain moves data from a socket to a pipe.
  77  //
  78  // Invariant: when entering spliceDrain, the pipe is empty. It is either in its
  79  // initial state, or splicePump has emptied it previously.
  80  //
  81  // Given this, spliceDrain can reasonably assume that the pipe is ready for
  82  // writing, so if splice returns EAGAIN, it must be because the socket is not
  83  // ready for reading.
  84  //
  85  // If spliceDrain returns (0, nil), src is at EOF.
  86  func spliceDrain(pipefd int, sock *FD, max int) (int, error) {
  87  	if err := sock.readLock(); err != nil {
  88  		return 0, err
  89  	}
  90  	defer sock.readUnlock()
  91  	if err := sock.pd.prepareRead(sock.isFile); err != nil {
  92  		return 0, err
  93  	}
  94  	for {
  95  		// In theory calling splice(2) with SPLICE_F_NONBLOCK could end up an infinite loop here,
  96  		// because it could return EAGAIN ceaselessly when the write end of the pipe is full,
  97  		// but this shouldn't be a concern here, since the pipe buffer must be sufficient for
  98  		// this data transmission on the basis of the workflow in Splice.
  99  		n, err := splice(pipefd, sock.Sysfd, max, spliceNonblock)
 100  		if err == syscall.EINTR {
 101  			continue
 102  		}
 103  		if err != syscall.EAGAIN {
 104  			return n, err
 105  		}
 106  		if sock.pd.pollable() {
 107  			if err := sock.pd.waitRead(sock.isFile); err != nil {
 108  				return n, err
 109  			}
 110  		}
 111  	}
 112  }
 113  
 114  // splicePump moves all the buffered data from a pipe to a socket.
 115  //
 116  // Invariant: when entering splicePump, there are exactly inPipe
 117  // bytes of data in the pipe, from a previous call to spliceDrain.
 118  //
 119  // By analogy to the condition from spliceDrain, splicePump
 120  // only needs to poll the socket for readiness, if splice returns
 121  // EAGAIN.
 122  //
 123  // If splicePump cannot move all the data in a single call to
 124  // splice(2), it loops over the buffered data until it has written
 125  // all of it to the socket. This behavior is similar to the Write
 126  // step of an io.Copy in userspace.
 127  func splicePump(sock *FD, pipefd int, inPipe int) (int, error) {
 128  	if err := sock.writeLock(); err != nil {
 129  		return 0, err
 130  	}
 131  	defer sock.writeUnlock()
 132  	if err := sock.pd.prepareWrite(sock.isFile); err != nil {
 133  		return 0, err
 134  	}
 135  	written := 0
 136  	for inPipe > 0 {
 137  		// In theory calling splice(2) with SPLICE_F_NONBLOCK could end up an infinite loop here,
 138  		// because it could return EAGAIN ceaselessly when the read end of the pipe is empty,
 139  		// but this shouldn't be a concern here, since the pipe buffer must contain inPipe size of
 140  		// data on the basis of the workflow in Splice.
 141  		n, err := splice(sock.Sysfd, pipefd, inPipe, spliceNonblock)
 142  		if err == syscall.EINTR {
 143  			continue
 144  		}
 145  		// Here, the condition n == 0 && err == nil should never be
 146  		// observed, since Splice controls the write side of the pipe.
 147  		if n > 0 {
 148  			inPipe -= n
 149  			written += n
 150  			continue
 151  		}
 152  		if err != syscall.EAGAIN {
 153  			return written, err
 154  		}
 155  		if sock.pd.pollable() {
 156  			if err := sock.pd.waitWrite(sock.isFile); err != nil {
 157  				return written, err
 158  			}
 159  		}
 160  	}
 161  	return written, nil
 162  }
 163  
 164  // splice wraps the splice system call. Since the current implementation
 165  // only uses splice on sockets and pipes, the offset arguments are unused.
 166  // splice returns int instead of int64, because callers never ask it to
 167  // move more data in a single call than can fit in an int32.
 168  func splice(out int, in int, max int, flags int) (int, error) {
 169  	n, err := syscall.Splice(in, nil, out, nil, max, flags)
 170  	return int(n), err
 171  }
 172  
 173  type splicePipeFields struct {
 174  	rfd  int
 175  	wfd  int
 176  	data int
 177  }
 178  
 179  type splicePipe struct {
 180  	splicePipeFields
 181  	cleanup runtime.Cleanup
 182  }
 183  
 184  // splicePipePool caches pipes to avoid high-frequency construction and destruction of pipe buffers.
 185  // The garbage collector will free all pipes in the sync.Pool periodically, thus we need to set up
 186  // a finalizer for each pipe to close its file descriptors before the actual GC.
 187  var splicePipePool = sync.Pool{New: newPoolPipe}
 188  
 189  func newPoolPipe() any {
 190  	// Discard the error which occurred during the creation of pipe buffer,
 191  	// redirecting the data transmission to the conventional way utilizing read() + write() as a fallback.
 192  	p := newPipe()
 193  	if p == nil {
 194  		return nil
 195  	}
 196  
 197  	p.cleanup = runtime.AddCleanup(p, func(spf splicePipeFields) {
 198  		destroyPipe(&splicePipe{splicePipeFields: spf})
 199  	}, p.splicePipeFields)
 200  	return p
 201  }
 202  
 203  // getPipe tries to acquire a pipe buffer from the pool or create a new one with newPipe() if it gets nil from the cache.
 204  func getPipe() (*splicePipe, error) {
 205  	v := splicePipePool.Get()
 206  	if v == nil {
 207  		return nil, syscall.EINVAL
 208  	}
 209  	return v.(*splicePipe), nil
 210  }
 211  
 212  func putPipe(p *splicePipe) {
 213  	// If there is still data left in the pipe,
 214  	// then close and discard it instead of putting it back into the pool.
 215  	if p.data != 0 {
 216  		p.cleanup.Stop()
 217  		destroyPipe(p)
 218  		return
 219  	}
 220  	splicePipePool.Put(p)
 221  }
 222  
 223  // newPipe sets up a pipe for a splice operation.
 224  func newPipe() *splicePipe {
 225  	var fds [2]int
 226  	if err := syscall.Pipe2(fds[:], syscall.O_CLOEXEC|syscall.O_NONBLOCK); err != nil {
 227  		return nil
 228  	}
 229  
 230  	// Splice will loop writing maxSpliceSize bytes from the source to the pipe,
 231  	// and then write those bytes from the pipe to the destination.
 232  	// Set the pipe buffer size to maxSpliceSize to optimize that.
 233  	// Ignore errors here, as a smaller buffer size will work,
 234  	// although it will require more system calls.
 235  	unix.Fcntl(fds[0], syscall.F_SETPIPE_SZ, maxSpliceSize)
 236  
 237  	return &splicePipe{splicePipeFields: splicePipeFields{rfd: fds[0], wfd: fds[1]}}
 238  }
 239  
 240  // destroyPipe destroys a pipe.
 241  func destroyPipe(p *splicePipe) {
 242  	CloseFunc(p.rfd)
 243  	CloseFunc(p.wfd)
 244  }
 245