pipe.mx raw

   1  // Copyright 2010 The Go Authors. All rights reserved.
   2  // Use of this source code is governed by a BSD-style
   3  // license that can be found in the LICENSE file.
   4  
   5  package net
   6  
   7  import (
   8  	"io"
   9  	"os"
  10  	"sync"
  11  	"time"
  12  )
  13  
  14  // pipeDeadline is an abstraction for handling timeouts.
  15  type pipeDeadline struct {
  16  	mu     sync.Mutex // Guards timer and cancel
  17  	timer  *time.Timer
  18  	cancel chan struct{} // Must be non-nil
  19  }
  20  
  21  func makePipeDeadline() pipeDeadline {
  22  	return pipeDeadline{cancel: chan struct{}{}}
  23  }
  24  
  25  // set sets the point in time when the deadline will time out.
  26  // A timeout event is signaled by closing the channel returned by waiter.
  27  // Once a timeout has occurred, the deadline can be refreshed by specifying a
  28  // t value in the future.
  29  //
  30  // A zero value for t prevents timeout.
  31  func (d *pipeDeadline) set(t time.Time) {
  32  	d.mu.Lock()
  33  	defer d.mu.Unlock()
  34  
  35  	if d.timer != nil && !d.timer.Stop() {
  36  		<-d.cancel // Wait for the timer callback to finish and close cancel
  37  	}
  38  	d.timer = nil
  39  
  40  	// Time is zero, then there is no deadline.
  41  	closed := isClosedChan(d.cancel)
  42  	if t.IsZero() {
  43  		if closed {
  44  			d.cancel = chan struct{}{}
  45  		}
  46  		return
  47  	}
  48  
  49  	// Time in the future, setup a timer to cancel in the future.
  50  	if dur := time.Until(t); dur > 0 {
  51  		if closed {
  52  			d.cancel = chan struct{}{}
  53  		}
  54  		d.timer = time.AfterFunc(dur, func() {
  55  			close(d.cancel)
  56  		})
  57  		return
  58  	}
  59  
  60  	// Time in the past, so close immediately.
  61  	if !closed {
  62  		close(d.cancel)
  63  	}
  64  }
  65  
  66  // wait returns a channel that is closed when the deadline is exceeded.
  67  func (d *pipeDeadline) wait() chan struct{} {
  68  	d.mu.Lock()
  69  	defer d.mu.Unlock()
  70  	return d.cancel
  71  }
  72  
  73  func isClosedChan(c <-chan struct{}) bool {
  74  	select {
  75  	case <-c:
  76  		return true
  77  	default:
  78  		return false
  79  	}
  80  }
  81  
  82  type pipeAddr struct{}
  83  
  84  func (pipeAddr) Network() []byte { return "pipe" }
  85  func (pipeAddr) String() string  { return "pipe" }
  86  
// pipe is one end of the connection pair created by Pipe. The two ends share
// their channels crosswise: one end's write channels are the other end's
// read channels, and one end's localDone is the other end's remoteDone.
type pipe struct {
	wrMu sync.Mutex // Serialize Write operations

	// Used by local Read to interact with remote Write.
	// Successful receive on rdRx is always followed by send on rdTx.
	// rdRx carries the writer's buffer; rdTx reports how many bytes were copied.
	rdRx <-chan []byte
	rdTx chan<- int

	// Used by local Write to interact with remote Read.
	// Successful send on wrTx is always followed by receive on wrRx.
	// wrTx carries the local buffer; wrRx reports how many bytes the reader took.
	wrTx chan<- []byte
	wrRx <-chan int

	// localDone is closed (at most once, via once) by Close on this end.
	// remoteDone is the peer's localDone, so it is closed when the peer closes.
	once       sync.Once // Protects closing localDone
	localDone  chan struct{}
	remoteDone <-chan struct{}

	// Deadlines consulted by read and write respectively; each signals
	// expiry by closing the channel returned from its wait method.
	readDeadline  pipeDeadline
	writeDeadline pipeDeadline
}
 107  
 108  // Pipe creates a synchronous, in-memory, full duplex
 109  // network connection; both ends implement the [Conn] interface.
 110  // Reads on one end are matched with writes on the other,
 111  // copying data directly between the two; there is no internal
 112  // buffering.
 113  func Pipe() (Conn, Conn) {
 114  	cb1 := chan []byte{}
 115  	cb2 := chan []byte{}
 116  	cn1 := chan int{}
 117  	cn2 := chan int{}
 118  	done1 := chan struct{}{}
 119  	done2 := chan struct{}{}
 120  
 121  	p1 := &pipe{
 122  		rdRx: cb1, rdTx: cn1,
 123  		wrTx: cb2, wrRx: cn2,
 124  		localDone: done1, remoteDone: done2,
 125  		readDeadline:  makePipeDeadline(),
 126  		writeDeadline: makePipeDeadline(),
 127  	}
 128  	p2 := &pipe{
 129  		rdRx: cb2, rdTx: cn2,
 130  		wrTx: cb1, wrRx: cn1,
 131  		localDone: done2, remoteDone: done1,
 132  		readDeadline:  makePipeDeadline(),
 133  		writeDeadline: makePipeDeadline(),
 134  	}
 135  	return p1, p2
 136  }
 137  
 138  func (*pipe) LocalAddr() Addr  { return pipeAddr{} }
 139  func (*pipe) RemoteAddr() Addr { return pipeAddr{} }
 140  
 141  func (p *pipe) Read(b []byte) (int, error) {
 142  	n, err := p.read(b)
 143  	if err != nil && err != io.EOF && err != io.ErrClosedPipe {
 144  		err = &OpError{Op: "read", Net: "pipe", Err: err}
 145  	}
 146  	return n, err
 147  }
 148  
 149  func (p *pipe) read(b []byte) (n int, err error) {
 150  	switch {
 151  	case isClosedChan(p.localDone):
 152  		return 0, io.ErrClosedPipe
 153  	case isClosedChan(p.remoteDone):
 154  		return 0, io.EOF
 155  	case isClosedChan(p.readDeadline.wait()):
 156  		return 0, os.ErrDeadlineExceeded
 157  	}
 158  
 159  	select {
 160  	case bw := <-p.rdRx:
 161  		nr := copy(b, bw)
 162  		p.rdTx <- nr
 163  		return nr, nil
 164  	case <-p.localDone:
 165  		return 0, io.ErrClosedPipe
 166  	case <-p.remoteDone:
 167  		return 0, io.EOF
 168  	case <-p.readDeadline.wait():
 169  		return 0, os.ErrDeadlineExceeded
 170  	}
 171  }
 172  
 173  func (p *pipe) Write(b []byte) (int, error) {
 174  	n, err := p.write(b)
 175  	if err != nil && err != io.ErrClosedPipe {
 176  		err = &OpError{Op: "write", Net: "pipe", Err: err}
 177  	}
 178  	return n, err
 179  }
 180  
 181  func (p *pipe) write(b []byte) (n int, err error) {
 182  	switch {
 183  	case isClosedChan(p.localDone):
 184  		return 0, io.ErrClosedPipe
 185  	case isClosedChan(p.remoteDone):
 186  		return 0, io.ErrClosedPipe
 187  	case isClosedChan(p.writeDeadline.wait()):
 188  		return 0, os.ErrDeadlineExceeded
 189  	}
 190  
 191  	p.wrMu.Lock() // Ensure entirety of b is written together
 192  	defer p.wrMu.Unlock()
 193  	for once := true; once || len(b) > 0; once = false {
 194  		select {
 195  		case p.wrTx <- b:
 196  			nw := <-p.wrRx
 197  			b = b[nw:]
 198  			n += nw
 199  		case <-p.localDone:
 200  			return n, io.ErrClosedPipe
 201  		case <-p.remoteDone:
 202  			return n, io.ErrClosedPipe
 203  		case <-p.writeDeadline.wait():
 204  			return n, os.ErrDeadlineExceeded
 205  		}
 206  	}
 207  	return n, nil
 208  }
 209  
 210  func (p *pipe) SetDeadline(t time.Time) error {
 211  	if isClosedChan(p.localDone) || isClosedChan(p.remoteDone) {
 212  		return io.ErrClosedPipe
 213  	}
 214  	p.readDeadline.set(t)
 215  	p.writeDeadline.set(t)
 216  	return nil
 217  }
 218  
 219  func (p *pipe) SetReadDeadline(t time.Time) error {
 220  	if isClosedChan(p.localDone) || isClosedChan(p.remoteDone) {
 221  		return io.ErrClosedPipe
 222  	}
 223  	p.readDeadline.set(t)
 224  	return nil
 225  }
 226  
 227  func (p *pipe) SetWriteDeadline(t time.Time) error {
 228  	if isClosedChan(p.localDone) || isClosedChan(p.remoteDone) {
 229  		return io.ErrClosedPipe
 230  	}
 231  	p.writeDeadline.set(t)
 232  	return nil
 233  }
 234  
 235  func (p *pipe) Close() error {
 236  	p.once.Do(func() { close(p.localDone) })
 237  	return nil
 238  }
 239