pipe.go raw
1 // Copyright 2014 The Go Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style
3 // license that can be found in the LICENSE file.
4
5 package http2
6
7 import (
8 "errors"
9 "io"
10 "sync"
11 )
12
13 // pipe is a goroutine-safe io.Reader/io.Writer pair. It's like
14 // io.Pipe except there are no PipeReader/PipeWriter halves, and the
15 // underlying buffer is an interface. (io.Pipe is always unbuffered)
16 type pipe struct {
17 mu sync.Mutex
18 c sync.Cond // c.L lazily initialized to &p.mu
19 b pipeBuffer // nil when done reading
20 unread int // bytes unread when done
21 err error // read error once empty. non-nil means closed.
22 breakErr error // immediate read error (caller doesn't see rest of b)
23 donec chan struct{} // closed on error
24 readFn func() // optional code to run in Read before error
25 }
26
27 type pipeBuffer interface {
28 Len() int
29 io.Writer
30 io.Reader
31 }
32
33 // setBuffer initializes the pipe buffer.
34 // It has no effect if the pipe is already closed.
35 func (p *pipe) setBuffer(b pipeBuffer) {
36 p.mu.Lock()
37 defer p.mu.Unlock()
38 if p.err != nil || p.breakErr != nil {
39 return
40 }
41 p.b = b
42 }
43
44 func (p *pipe) Len() int {
45 p.mu.Lock()
46 defer p.mu.Unlock()
47 if p.b == nil {
48 return p.unread
49 }
50 return p.b.Len()
51 }
52
53 // Read waits until data is available and copies bytes
54 // from the buffer into p.
55 func (p *pipe) Read(d []byte) (n int, err error) {
56 p.mu.Lock()
57 defer p.mu.Unlock()
58 if p.c.L == nil {
59 p.c.L = &p.mu
60 }
61 for {
62 if p.breakErr != nil {
63 return 0, p.breakErr
64 }
65 if p.b != nil && p.b.Len() > 0 {
66 return p.b.Read(d)
67 }
68 if p.err != nil {
69 if p.readFn != nil {
70 p.readFn() // e.g. copy trailers
71 p.readFn = nil // not sticky like p.err
72 }
73 p.b = nil
74 return 0, p.err
75 }
76 p.c.Wait()
77 }
78 }
79
80 var (
81 errClosedPipeWrite = errors.New("write on closed buffer")
82 errUninitializedPipeWrite = errors.New("write on uninitialized buffer")
83 )
84
85 // Write copies bytes from p into the buffer and wakes a reader.
86 // It is an error to write more data than the buffer can hold.
87 func (p *pipe) Write(d []byte) (n int, err error) {
88 p.mu.Lock()
89 defer p.mu.Unlock()
90 if p.c.L == nil {
91 p.c.L = &p.mu
92 }
93 defer p.c.Signal()
94 if p.err != nil || p.breakErr != nil {
95 return 0, errClosedPipeWrite
96 }
97 // pipe.setBuffer is never invoked, leaving the buffer uninitialized.
98 // We shouldn't try to write to an uninitialized pipe,
99 // but returning an error is better than panicking.
100 if p.b == nil {
101 return 0, errUninitializedPipeWrite
102 }
103 return p.b.Write(d)
104 }
105
106 // CloseWithError causes the next Read (waking up a current blocked
107 // Read if needed) to return the provided err after all data has been
108 // read.
109 //
110 // The error must be non-nil.
111 func (p *pipe) CloseWithError(err error) { p.closeWithError(&p.err, err, nil) }
112
113 // BreakWithError causes the next Read (waking up a current blocked
114 // Read if needed) to return the provided err immediately, without
115 // waiting for unread data.
116 func (p *pipe) BreakWithError(err error) { p.closeWithError(&p.breakErr, err, nil) }
117
118 // closeWithErrorAndCode is like CloseWithError but also sets some code to run
119 // in the caller's goroutine before returning the error.
120 func (p *pipe) closeWithErrorAndCode(err error, fn func()) { p.closeWithError(&p.err, err, fn) }
121
122 func (p *pipe) closeWithError(dst *error, err error, fn func()) {
123 if err == nil {
124 panic("err must be non-nil")
125 }
126 p.mu.Lock()
127 defer p.mu.Unlock()
128 if p.c.L == nil {
129 p.c.L = &p.mu
130 }
131 defer p.c.Signal()
132 if *dst != nil {
133 // Already been done.
134 return
135 }
136 p.readFn = fn
137 if dst == &p.breakErr {
138 if p.b != nil {
139 p.unread += p.b.Len()
140 }
141 p.b = nil
142 }
143 *dst = err
144 p.closeDoneLocked()
145 }
146
147 // requires p.mu be held.
148 func (p *pipe) closeDoneLocked() {
149 if p.donec == nil {
150 return
151 }
152 // Close if unclosed. This isn't racy since we always
153 // hold p.mu while closing.
154 select {
155 case <-p.donec:
156 default:
157 close(p.donec)
158 }
159 }
160
161 // Err returns the error (if any) first set by BreakWithError or CloseWithError.
162 func (p *pipe) Err() error {
163 p.mu.Lock()
164 defer p.mu.Unlock()
165 if p.breakErr != nil {
166 return p.breakErr
167 }
168 return p.err
169 }
170
171 // Done returns a channel which is closed if and when this pipe is closed
172 // with CloseWithError.
173 func (p *pipe) Done() <-chan struct{} {
174 p.mu.Lock()
175 defer p.mu.Unlock()
176 if p.donec == nil {
177 p.donec = make(chan struct{})
178 if p.err != nil || p.breakErr != nil {
179 // Already hit an error.
180 p.closeDoneLocked()
181 }
182 }
183 return p.donec
184 }
185