pipe.go raw

   1  // Copyright 2014 The Go Authors. All rights reserved.
   2  // Use of this source code is governed by a BSD-style
   3  // license that can be found in the LICENSE file.
   4  
   5  package http2
   6  
   7  import (
   8  	"errors"
   9  	"io"
  10  	"sync"
  11  )
  12  
  13  // pipe is a goroutine-safe io.Reader/io.Writer pair. It's like
  14  // io.Pipe except there are no PipeReader/PipeWriter halves, and the
  15  // underlying buffer is an interface. (io.Pipe is always unbuffered)
  16  type pipe struct {
  17  	mu       sync.Mutex
  18  	c        sync.Cond     // c.L lazily initialized to &p.mu
  19  	b        pipeBuffer    // nil when done reading
  20  	unread   int           // bytes unread when done
  21  	err      error         // read error once empty. non-nil means closed.
  22  	breakErr error         // immediate read error (caller doesn't see rest of b)
  23  	donec    chan struct{} // closed on error
  24  	readFn   func()        // optional code to run in Read before error
  25  }
  26  
  27  type pipeBuffer interface {
  28  	Len() int
  29  	io.Writer
  30  	io.Reader
  31  }
  32  
  33  // setBuffer initializes the pipe buffer.
  34  // It has no effect if the pipe is already closed.
  35  func (p *pipe) setBuffer(b pipeBuffer) {
  36  	p.mu.Lock()
  37  	defer p.mu.Unlock()
  38  	if p.err != nil || p.breakErr != nil {
  39  		return
  40  	}
  41  	p.b = b
  42  }
  43  
  44  func (p *pipe) Len() int {
  45  	p.mu.Lock()
  46  	defer p.mu.Unlock()
  47  	if p.b == nil {
  48  		return p.unread
  49  	}
  50  	return p.b.Len()
  51  }
  52  
  53  // Read waits until data is available and copies bytes
  54  // from the buffer into p.
  55  func (p *pipe) Read(d []byte) (n int, err error) {
  56  	p.mu.Lock()
  57  	defer p.mu.Unlock()
  58  	if p.c.L == nil {
  59  		p.c.L = &p.mu
  60  	}
  61  	for {
  62  		if p.breakErr != nil {
  63  			return 0, p.breakErr
  64  		}
  65  		if p.b != nil && p.b.Len() > 0 {
  66  			return p.b.Read(d)
  67  		}
  68  		if p.err != nil {
  69  			if p.readFn != nil {
  70  				p.readFn()     // e.g. copy trailers
  71  				p.readFn = nil // not sticky like p.err
  72  			}
  73  			p.b = nil
  74  			return 0, p.err
  75  		}
  76  		p.c.Wait()
  77  	}
  78  }
  79  
  80  var (
  81  	errClosedPipeWrite        = errors.New("write on closed buffer")
  82  	errUninitializedPipeWrite = errors.New("write on uninitialized buffer")
  83  )
  84  
  85  // Write copies bytes from p into the buffer and wakes a reader.
  86  // It is an error to write more data than the buffer can hold.
  87  func (p *pipe) Write(d []byte) (n int, err error) {
  88  	p.mu.Lock()
  89  	defer p.mu.Unlock()
  90  	if p.c.L == nil {
  91  		p.c.L = &p.mu
  92  	}
  93  	defer p.c.Signal()
  94  	if p.err != nil || p.breakErr != nil {
  95  		return 0, errClosedPipeWrite
  96  	}
  97  	// pipe.setBuffer is never invoked, leaving the buffer uninitialized.
  98  	// We shouldn't try to write to an uninitialized pipe,
  99  	// but returning an error is better than panicking.
 100  	if p.b == nil {
 101  		return 0, errUninitializedPipeWrite
 102  	}
 103  	return p.b.Write(d)
 104  }
 105  
 106  // CloseWithError causes the next Read (waking up a current blocked
 107  // Read if needed) to return the provided err after all data has been
 108  // read.
 109  //
 110  // The error must be non-nil.
 111  func (p *pipe) CloseWithError(err error) { p.closeWithError(&p.err, err, nil) }
 112  
 113  // BreakWithError causes the next Read (waking up a current blocked
 114  // Read if needed) to return the provided err immediately, without
 115  // waiting for unread data.
 116  func (p *pipe) BreakWithError(err error) { p.closeWithError(&p.breakErr, err, nil) }
 117  
 118  // closeWithErrorAndCode is like CloseWithError but also sets some code to run
 119  // in the caller's goroutine before returning the error.
 120  func (p *pipe) closeWithErrorAndCode(err error, fn func()) { p.closeWithError(&p.err, err, fn) }
 121  
 122  func (p *pipe) closeWithError(dst *error, err error, fn func()) {
 123  	if err == nil {
 124  		panic("err must be non-nil")
 125  	}
 126  	p.mu.Lock()
 127  	defer p.mu.Unlock()
 128  	if p.c.L == nil {
 129  		p.c.L = &p.mu
 130  	}
 131  	defer p.c.Signal()
 132  	if *dst != nil {
 133  		// Already been done.
 134  		return
 135  	}
 136  	p.readFn = fn
 137  	if dst == &p.breakErr {
 138  		if p.b != nil {
 139  			p.unread += p.b.Len()
 140  		}
 141  		p.b = nil
 142  	}
 143  	*dst = err
 144  	p.closeDoneLocked()
 145  }
 146  
 147  // requires p.mu be held.
 148  func (p *pipe) closeDoneLocked() {
 149  	if p.donec == nil {
 150  		return
 151  	}
 152  	// Close if unclosed. This isn't racy since we always
 153  	// hold p.mu while closing.
 154  	select {
 155  	case <-p.donec:
 156  	default:
 157  		close(p.donec)
 158  	}
 159  }
 160  
 161  // Err returns the error (if any) first set by BreakWithError or CloseWithError.
 162  func (p *pipe) Err() error {
 163  	p.mu.Lock()
 164  	defer p.mu.Unlock()
 165  	if p.breakErr != nil {
 166  		return p.breakErr
 167  	}
 168  	return p.err
 169  }
 170  
 171  // Done returns a channel which is closed if and when this pipe is closed
 172  // with CloseWithError.
 173  func (p *pipe) Done() <-chan struct{} {
 174  	p.mu.Lock()
 175  	defer p.mu.Unlock()
 176  	if p.donec == nil {
 177  		p.donec = make(chan struct{})
 178  		if p.err != nil || p.breakErr != nil {
 179  			// Already hit an error.
 180  			p.closeDoneLocked()
 181  		}
 182  	}
 183  	return p.donec
 184  }
 185