// Copyright 2018 The Go Authors. All rights reserved. // Use of this source code is governed by a BSD-style // license that can be found in the LICENSE file. package poll import ( "internal/syscall/unix" "runtime" "sync" "syscall" "unsafe" ) const ( // spliceNonblock doesn't make the splice itself necessarily nonblocking // (because the actual file descriptors that are spliced from/to may block // unless they have the O_NONBLOCK flag set), but it makes the splice pipe // operations nonblocking. spliceNonblock = 0x2 // maxSpliceSize is the maximum amount of data Splice asks // the kernel to move in a single call to splice(2). // We use 1MB as Splice writes data through a pipe, and 1MB is the default maximum pipe buffer size, // which is determined by /proc/sys/fs/pipe-max-size. maxSpliceSize = 1 << 20 ) // Splice transfers at most remain bytes of data from src to dst, using the // splice system call to minimize copies of data from and to userspace. // // Splice gets a pipe buffer from the pool or creates a new one if needed, to serve as a buffer for the data transfer. // src and dst must both be stream-oriented sockets. func Splice(dst, src *FD, remain int64) (written int64, handled bool, err error) { p, err := getPipe() if err != nil { return 0, false, err } defer putPipe(p) var inPipe, n int for err == nil && remain > 0 { max := maxSpliceSize if int64(max) > remain { max = int(remain) } inPipe, err = spliceDrain(p.wfd, src, max) // The operation is considered handled if splice returns no // error, or an error other than EINVAL. An EINVAL means the // kernel does not support splice for the socket type of src. // The failed syscall does not consume any data so it is safe // to fall back to a generic copy. // // spliceDrain should never return EAGAIN, so if err != nil, // Splice cannot continue. // // If inPipe == 0 && err == nil, src is at EOF, and the // transfer is complete. handled = handled || (err != syscall.EINVAL) if err != nil || inPipe == 0 { break } p.data += inPipe n, err = splicePump(dst, p.rfd, inPipe) if n > 0 { written += int64(n) remain -= int64(n) p.data -= n } } if err != nil { return written, handled, err } return written, true, nil } // spliceDrain moves data from a socket to a pipe. // // Invariant: when entering spliceDrain, the pipe is empty. It is either in its // initial state, or splicePump has emptied it previously. // // Given this, spliceDrain can reasonably assume that the pipe is ready for // writing, so if splice returns EAGAIN, it must be because the socket is not // ready for reading. // // If spliceDrain returns (0, nil), src is at EOF. func spliceDrain(pipefd int, sock *FD, max int) (int, error) { if err := sock.readLock(); err != nil { return 0, err } defer sock.readUnlock() if err := sock.pd.prepareRead(sock.isFile); err != nil { return 0, err } for { // In theory calling splice(2) with SPLICE_F_NONBLOCK could end up an infinite loop here, // because it could return EAGAIN ceaselessly when the write end of the pipe is full, // but this shouldn't be a concern here, since the pipe buffer must be sufficient for // this data transmission on the basis of the workflow in Splice. n, err := splice(pipefd, sock.Sysfd, max, spliceNonblock) if err == syscall.EINTR { continue } if err != syscall.EAGAIN { return n, err } if sock.pd.pollable() { if err := sock.pd.waitRead(sock.isFile); err != nil { return n, err } } } } // splicePump moves all the buffered data from a pipe to a socket. // // Invariant: when entering splicePump, there are exactly inPipe // bytes of data in the pipe, from a previous call to spliceDrain. // // By analogy to the condition from spliceDrain, splicePump // only needs to poll the socket for readiness, if splice returns // EAGAIN. // // If splicePump cannot move all the data in a single call to // splice(2), it loops over the buffered data until it has written // all of it to the socket. This behavior is similar to the Write // step of an io.Copy in userspace. func splicePump(sock *FD, pipefd int, inPipe int) (int, error) { if err := sock.writeLock(); err != nil { return 0, err } defer sock.writeUnlock() if err := sock.pd.prepareWrite(sock.isFile); err != nil { return 0, err } written := 0 for inPipe > 0 { // In theory calling splice(2) with SPLICE_F_NONBLOCK could end up an infinite loop here, // because it could return EAGAIN ceaselessly when the read end of the pipe is empty, // but this shouldn't be a concern here, since the pipe buffer must contain inPipe size of // data on the basis of the workflow in Splice. n, err := splice(sock.Sysfd, pipefd, inPipe, spliceNonblock) if err == syscall.EINTR { continue } // Here, the condition n == 0 && err == nil should never be // observed, since Splice controls the write side of the pipe. if n > 0 { inPipe -= n written += n continue } if err != syscall.EAGAIN { return written, err } if sock.pd.pollable() { if err := sock.pd.waitWrite(sock.isFile); err != nil { return written, err } } } return written, nil } // splice wraps the splice system call. Since the current implementation // only uses splice on sockets and pipes, the offset arguments are unused. // splice returns int instead of int64, because callers never ask it to // move more data in a single call than can fit in an int32. func splice(out int, in int, max int, flags int) (int, error) { n, err := syscall.Splice(in, nil, out, nil, max, flags) return int(n), err } type splicePipeFields struct { rfd int wfd int data int } type splicePipe struct { splicePipeFields // We want to use a finalizer, so ensure that the size is // large enough to not use the tiny allocator. _ [24 - unsafe.Sizeof(splicePipeFields{})%24]byte } // splicePipePool caches pipes to avoid high-frequency construction and destruction of pipe buffers. // The garbage collector will free all pipes in the sync.Pool periodically, thus we need to set up // a finalizer for each pipe to close its file descriptors before the actual GC. var splicePipePool = sync.Pool{New: newPoolPipe} func newPoolPipe() any { // Discard the error which occurred during the creation of pipe buffer, // redirecting the data transmission to the conventional way utilizing read() + write() as a fallback. p := newPipe() if p == nil { return nil } runtime.SetFinalizer(p, destroyPipe) return p } // getPipe tries to acquire a pipe buffer from the pool or create a new one with newPipe() if it gets nil from the cache. func getPipe() (*splicePipe, error) { v := splicePipePool.Get() if v == nil { return nil, syscall.EINVAL } return v.(*splicePipe), nil } func putPipe(p *splicePipe) { // If there is still data left in the pipe, // then close and discard it instead of putting it back into the pool. if p.data != 0 { runtime.SetFinalizer(p, nil) destroyPipe(p) return } splicePipePool.Put(p) } // newPipe sets up a pipe for a splice operation. func newPipe() *splicePipe { var fds [2]int if err := syscall.Pipe2(fds[:], syscall.O_CLOEXEC|syscall.O_NONBLOCK); err != nil { return nil } // Splice will loop writing maxSpliceSize bytes from the source to the pipe, // and then write those bytes from the pipe to the destination. // Set the pipe buffer size to maxSpliceSize to optimize that. // Ignore errors here, as a smaller buffer size will work, // although it will require more system calls. unix.Fcntl(fds[0], syscall.F_SETPIPE_SZ, maxSpliceSize) return &splicePipe{splicePipeFields: splicePipeFields{rfd: fds[0], wfd: fds[1]}} } // destroyPipe destroys a pipe. func destroyPipe(p *splicePipe) { CloseFunc(p.rfd) CloseFunc(p.wfd) }