1 // Copyright 2018 The Go Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style
3 // license that can be found in the LICENSE file.
4 5 package poll
6 7 import (
8 "internal/syscall/unix"
9 "runtime"
10 "sync"
11 "syscall"
12 )
const (
	// maxSpliceSize caps how much data Splice asks the kernel to move in a
	// single splice(2) call. Splice routes data through a pipe, and 1MB is the
	// default maximum pipe buffer size (see /proc/sys/fs/pipe-max-size), so
	// this is also the buffer size newPipe requests for each pipe.
	maxSpliceSize = 1 << 20

	// spliceNonblock is the SPLICE_F_NONBLOCK flag. It does not by itself make
	// a splice nonblocking: the file descriptors being spliced from or to may
	// still block unless they have O_NONBLOCK set. What it does make
	// nonblocking are the operations on the splice pipe.
	spliceNonblock = 0x2
)
27 28 // Splice transfers at most remain bytes of data from src to dst, using the
29 // splice system call to minimize copies of data from and to userspace.
30 //
31 // Splice gets a pipe buffer from the pool or creates a new one if needed, to serve as a buffer for the data transfer.
32 // src and dst must both be stream-oriented sockets.
33 func Splice(dst, src *FD, remain int64) (written int64, handled bool, err error) {
34 p, err := getPipe()
35 if err != nil {
36 return 0, false, err
37 }
38 defer putPipe(p)
39 var inPipe, n int
40 for err == nil && remain > 0 {
41 max := maxSpliceSize
42 if int64(max) > remain {
43 max = int(remain)
44 }
45 inPipe, err = spliceDrain(p.wfd, src, max)
46 // The operation is considered handled if splice returns no
47 // error, or an error other than EINVAL. An EINVAL means the
48 // kernel does not support splice for the socket type of src.
49 // The failed syscall does not consume any data so it is safe
50 // to fall back to a generic copy.
51 //
52 // spliceDrain should never return EAGAIN, so if err != nil,
53 // Splice cannot continue.
54 //
55 // If inPipe == 0 && err == nil, src is at EOF, and the
56 // transfer is complete.
57 handled = handled || (err != syscall.EINVAL)
58 if err != nil || inPipe == 0 {
59 break
60 }
61 p.data += inPipe
62 63 n, err = splicePump(dst, p.rfd, inPipe)
64 if n > 0 {
65 written += int64(n)
66 remain -= int64(n)
67 p.data -= n
68 }
69 }
70 if err != nil {
71 return written, handled, err
72 }
73 return written, true, nil
74 }
75 76 // spliceDrain moves data from a socket to a pipe.
77 //
78 // Invariant: when entering spliceDrain, the pipe is empty. It is either in its
79 // initial state, or splicePump has emptied it previously.
80 //
81 // Given this, spliceDrain can reasonably assume that the pipe is ready for
82 // writing, so if splice returns EAGAIN, it must be because the socket is not
83 // ready for reading.
84 //
85 // If spliceDrain returns (0, nil), src is at EOF.
86 func spliceDrain(pipefd int, sock *FD, max int) (int, error) {
87 if err := sock.readLock(); err != nil {
88 return 0, err
89 }
90 defer sock.readUnlock()
91 if err := sock.pd.prepareRead(sock.isFile); err != nil {
92 return 0, err
93 }
94 for {
95 // In theory calling splice(2) with SPLICE_F_NONBLOCK could end up an infinite loop here,
96 // because it could return EAGAIN ceaselessly when the write end of the pipe is full,
97 // but this shouldn't be a concern here, since the pipe buffer must be sufficient for
98 // this data transmission on the basis of the workflow in Splice.
99 n, err := splice(pipefd, sock.Sysfd, max, spliceNonblock)
100 if err == syscall.EINTR {
101 continue
102 }
103 if err != syscall.EAGAIN {
104 return n, err
105 }
106 if sock.pd.pollable() {
107 if err := sock.pd.waitRead(sock.isFile); err != nil {
108 return n, err
109 }
110 }
111 }
112 }
113 114 // splicePump moves all the buffered data from a pipe to a socket.
115 //
116 // Invariant: when entering splicePump, there are exactly inPipe
117 // bytes of data in the pipe, from a previous call to spliceDrain.
118 //
119 // By analogy to the condition from spliceDrain, splicePump
120 // only needs to poll the socket for readiness, if splice returns
121 // EAGAIN.
122 //
123 // If splicePump cannot move all the data in a single call to
124 // splice(2), it loops over the buffered data until it has written
125 // all of it to the socket. This behavior is similar to the Write
126 // step of an io.Copy in userspace.
127 func splicePump(sock *FD, pipefd int, inPipe int) (int, error) {
128 if err := sock.writeLock(); err != nil {
129 return 0, err
130 }
131 defer sock.writeUnlock()
132 if err := sock.pd.prepareWrite(sock.isFile); err != nil {
133 return 0, err
134 }
135 written := 0
136 for inPipe > 0 {
137 // In theory calling splice(2) with SPLICE_F_NONBLOCK could end up an infinite loop here,
138 // because it could return EAGAIN ceaselessly when the read end of the pipe is empty,
139 // but this shouldn't be a concern here, since the pipe buffer must contain inPipe size of
140 // data on the basis of the workflow in Splice.
141 n, err := splice(sock.Sysfd, pipefd, inPipe, spliceNonblock)
142 if err == syscall.EINTR {
143 continue
144 }
145 // Here, the condition n == 0 && err == nil should never be
146 // observed, since Splice controls the write side of the pipe.
147 if n > 0 {
148 inPipe -= n
149 written += n
150 continue
151 }
152 if err != syscall.EAGAIN {
153 return written, err
154 }
155 if sock.pd.pollable() {
156 if err := sock.pd.waitWrite(sock.isFile); err != nil {
157 return written, err
158 }
159 }
160 }
161 return written, nil
162 }
// splice is a thin wrapper around the splice(2) system call. It moves up to
// max bytes from in to out and passes flags through unchanged. Both offset
// arguments are always nil, because the current implementation only splices
// between sockets and pipes. The count is returned as an int rather than an
// int64, because callers never request more in a single call than fits in an
// int32.
func splice(out int, in int, max int, flags int) (int, error) {
	var moved int64
	moved, err := syscall.Splice(in, nil, out, nil, max, flags)
	return int(moved), err
}
type (
	// splicePipeFields is the plain-data state of a splice pipe. It is kept
	// separate from splicePipe so that it can be copied by value into the
	// pipe's runtime cleanup.
	splicePipeFields struct {
		rfd  int // read end of the pipe
		wfd  int // write end of the pipe
		data int // bytes currently buffered in the pipe, as tracked by Splice
	}

	// splicePipe is a pipe used as the intermediate buffer for Splice,
	// together with the runtime cleanup registered for it.
	splicePipe struct {
		splicePipeFields
		cleanup runtime.Cleanup
	}
)
183 184 // splicePipePool caches pipes to avoid high-frequency construction and destruction of pipe buffers.
185 // The garbage collector will free all pipes in the sync.Pool periodically, thus we need to set up
186 // a finalizer for each pipe to close its file descriptors before the actual GC.
187 var splicePipePool = sync.Pool{New: newPoolPipe}
188 189 func newPoolPipe() any {
190 // Discard the error which occurred during the creation of pipe buffer,
191 // redirecting the data transmission to the conventional way utilizing read() + write() as a fallback.
192 p := newPipe()
193 if p == nil {
194 return nil
195 }
196 197 p.cleanup = runtime.AddCleanup(p, func(spf splicePipeFields) {
198 destroyPipe(&splicePipe{splicePipeFields: spf})
199 }, p.splicePipeFields)
200 return p
201 }
202 203 // getPipe tries to acquire a pipe buffer from the pool or create a new one with newPipe() if it gets nil from the cache.
204 func getPipe() (*splicePipe, error) {
205 v := splicePipePool.Get()
206 if v == nil {
207 return nil, syscall.EINVAL
208 }
209 return v.(*splicePipe), nil
210 }
211 212 func putPipe(p *splicePipe) {
213 // If there is still data left in the pipe,
214 // then close and discard it instead of putting it back into the pool.
215 if p.data != 0 {
216 p.cleanup.Stop()
217 destroyPipe(p)
218 return
219 }
220 splicePipePool.Put(p)
221 }
222 223 // newPipe sets up a pipe for a splice operation.
224 func newPipe() *splicePipe {
225 var fds [2]int
226 if err := syscall.Pipe2(fds[:], syscall.O_CLOEXEC|syscall.O_NONBLOCK); err != nil {
227 return nil
228 }
229 230 // Splice will loop writing maxSpliceSize bytes from the source to the pipe,
231 // and then write those bytes from the pipe to the destination.
232 // Set the pipe buffer size to maxSpliceSize to optimize that.
233 // Ignore errors here, as a smaller buffer size will work,
234 // although it will require more system calls.
235 unix.Fcntl(fds[0], syscall.F_SETPIPE_SZ, maxSpliceSize)
236 237 return &splicePipe{splicePipeFields: splicePipeFields{rfd: fds[0], wfd: fds[1]}}
238 }
239 240 // destroyPipe destroys a pipe.
241 func destroyPipe(p *splicePipe) {
242 CloseFunc(p.rfd)
243 CloseFunc(p.wfd)
244 }
245