1 // Copyright 2015 The Go Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style
3 // license that can be found in the LICENSE file.
4 5 // Transport code.
6 7 package http2
8 9 import (
10 "bufio"
11 "bytes"
12 "compress/flate"
13 "compress/gzip"
14 "context"
15 "crypto/rand"
16 "crypto/tls"
17 "errors"
18 "fmt"
19 "io"
20 "io/fs"
21 "log"
22 "math"
23 "math/bits"
24 mathrand "math/rand"
25 "net"
26 "net/http"
27 "net/http/httptrace"
28 "net/textproto"
29 "strconv"
30 "strings"
31 "sync"
32 "sync/atomic"
33 "time"
34 35 "golang.org/x/net/http/httpguts"
36 "golang.org/x/net/http2/hpack"
37 "golang.org/x/net/idna"
38 "golang.org/x/net/internal/httpcommon"
39 )
40 41 const (
42 // transportDefaultConnFlow is how many connection-level flow control
43 // tokens we give the server at start-up, past the default 64k.
44 transportDefaultConnFlow = 1 << 30
45 46 // transportDefaultStreamFlow is how many stream-level flow
47 // control tokens we announce to the peer, and how many bytes
48 // we buffer per stream.
49 transportDefaultStreamFlow = 4 << 20
50 51 defaultUserAgent = "Go-http-client/2.0"
52 53 // initialMaxConcurrentStreams is a connections maxConcurrentStreams until
54 // it's received servers initial SETTINGS frame, which corresponds with the
55 // spec's minimum recommended value.
56 initialMaxConcurrentStreams = 100
57 58 // defaultMaxConcurrentStreams is a connections default maxConcurrentStreams
59 // if the server doesn't include one in its initial SETTINGS frame.
60 defaultMaxConcurrentStreams = 1000
61 )
62 63 // Transport is an HTTP/2 Transport.
64 //
65 // A Transport internally caches connections to servers. It is safe
66 // for concurrent use by multiple goroutines.
67 type Transport struct {
68 // DialTLSContext specifies an optional dial function with context for
69 // creating TLS connections for requests.
70 //
71 // If DialTLSContext and DialTLS is nil, tls.Dial is used.
72 //
73 // If the returned net.Conn has a ConnectionState method like tls.Conn,
74 // it will be used to set http.Response.TLS.
75 DialTLSContext func(ctx context.Context, network, addr string, cfg *tls.Config) (net.Conn, error)
76 77 // DialTLS specifies an optional dial function for creating
78 // TLS connections for requests.
79 //
80 // If DialTLSContext and DialTLS is nil, tls.Dial is used.
81 //
82 // Deprecated: Use DialTLSContext instead, which allows the transport
83 // to cancel dials as soon as they are no longer needed.
84 // If both are set, DialTLSContext takes priority.
85 DialTLS func(network, addr string, cfg *tls.Config) (net.Conn, error)
86 87 // TLSClientConfig specifies the TLS configuration to use with
88 // tls.Client. If nil, the default configuration is used.
89 TLSClientConfig *tls.Config
90 91 // ConnPool optionally specifies an alternate connection pool to use.
92 // If nil, the default is used.
93 ConnPool ClientConnPool
94 95 // DisableCompression, if true, prevents the Transport from
96 // requesting compression with an "Accept-Encoding: gzip"
97 // request header when the Request contains no existing
98 // Accept-Encoding value. If the Transport requests gzip on
99 // its own and gets a gzipped response, it's transparently
100 // decoded in the Response.Body. However, if the user
101 // explicitly requested gzip it is not automatically
102 // uncompressed.
103 DisableCompression bool
104 105 // AllowHTTP, if true, permits HTTP/2 requests using the insecure,
106 // plain-text "http" scheme. Note that this does not enable h2c support.
107 AllowHTTP bool
108 109 // MaxHeaderListSize is the http2 SETTINGS_MAX_HEADER_LIST_SIZE to
110 // send in the initial settings frame. It is how many bytes
111 // of response headers are allowed. Unlike the http2 spec, zero here
112 // means to use a default limit (currently 10MB). If you actually
113 // want to advertise an unlimited value to the peer, Transport
114 // interprets the highest possible value here (0xffffffff or 1<<32-1)
115 // to mean no limit.
116 MaxHeaderListSize uint32
117 118 // MaxReadFrameSize is the http2 SETTINGS_MAX_FRAME_SIZE to send in the
119 // initial settings frame. It is the size in bytes of the largest frame
120 // payload that the sender is willing to receive. If 0, no setting is
121 // sent, and the value is provided by the peer, which should be 16384
122 // according to the spec:
123 // https://datatracker.ietf.org/doc/html/rfc7540#section-6.5.2.
124 // Values are bounded in the range 16k to 16M.
125 MaxReadFrameSize uint32
126 127 // MaxDecoderHeaderTableSize optionally specifies the http2
128 // SETTINGS_HEADER_TABLE_SIZE to send in the initial settings frame. It
129 // informs the remote endpoint of the maximum size of the header compression
130 // table used to decode header blocks, in octets. If zero, the default value
131 // of 4096 is used.
132 MaxDecoderHeaderTableSize uint32
133 134 // MaxEncoderHeaderTableSize optionally specifies an upper limit for the
135 // header compression table used for encoding request headers. Received
136 // SETTINGS_HEADER_TABLE_SIZE settings are capped at this limit. If zero,
137 // the default value of 4096 is used.
138 MaxEncoderHeaderTableSize uint32
139 140 // StrictMaxConcurrentStreams controls whether the server's
141 // SETTINGS_MAX_CONCURRENT_STREAMS should be respected
142 // globally. If false, new TCP connections are created to the
143 // server as needed to keep each under the per-connection
144 // SETTINGS_MAX_CONCURRENT_STREAMS limit. If true, the
145 // server's SETTINGS_MAX_CONCURRENT_STREAMS is interpreted as
146 // a global limit and callers of RoundTrip block when needed,
147 // waiting for their turn.
148 StrictMaxConcurrentStreams bool
149 150 // IdleConnTimeout is the maximum amount of time an idle
151 // (keep-alive) connection will remain idle before closing
152 // itself.
153 // Zero means no limit.
154 IdleConnTimeout time.Duration
155 156 // ReadIdleTimeout is the timeout after which a health check using ping
157 // frame will be carried out if no frame is received on the connection.
158 // Note that a ping response will is considered a received frame, so if
159 // there is no other traffic on the connection, the health check will
160 // be performed every ReadIdleTimeout interval.
161 // If zero, no health check is performed.
162 ReadIdleTimeout time.Duration
163 164 // PingTimeout is the timeout after which the connection will be closed
165 // if a response to Ping is not received.
166 // Defaults to 15s.
167 PingTimeout time.Duration
168 169 // WriteByteTimeout is the timeout after which the connection will be
170 // closed no data can be written to it. The timeout begins when data is
171 // available to write, and is extended whenever any bytes are written.
172 WriteByteTimeout time.Duration
173 174 // CountError, if non-nil, is called on HTTP/2 transport errors.
175 // It's intended to increment a metric for monitoring, such
176 // as an expvar or Prometheus metric.
177 // The errType consists of only ASCII word characters.
178 CountError func(errType string)
179 180 // t1, if non-nil, is the standard library Transport using
181 // this transport. Its settings are used (but not its
182 // RoundTrip method, etc).
183 t1 *http.Transport
184 185 connPoolOnce sync.Once
186 connPoolOrDef ClientConnPool // non-nil version of ConnPool
187 188 *transportTestHooks
189 }
190 191 // Hook points used for testing.
192 // Outside of tests, t.transportTestHooks is nil and these all have minimal implementations.
193 // Inside tests, see the testSyncHooks function docs.
194 195 type transportTestHooks struct {
196 newclientconn func(*ClientConn)
197 }
198 199 func (t *Transport) maxHeaderListSize() uint32 {
200 n := int64(t.MaxHeaderListSize)
201 if t.t1 != nil && t.t1.MaxResponseHeaderBytes != 0 {
202 n = t.t1.MaxResponseHeaderBytes
203 if n > 0 {
204 n = adjustHTTP1MaxHeaderSize(n)
205 }
206 }
207 if n <= 0 {
208 return 10 << 20
209 }
210 if n >= 0xffffffff {
211 return 0
212 }
213 return uint32(n)
214 }
215 216 func (t *Transport) disableCompression() bool {
217 return t.DisableCompression || (t.t1 != nil && t.t1.DisableCompression)
218 }
219 220 // ConfigureTransport configures a net/http HTTP/1 Transport to use HTTP/2.
221 // It returns an error if t1 has already been HTTP/2-enabled.
222 //
223 // Use ConfigureTransports instead to configure the HTTP/2 Transport.
224 func ConfigureTransport(t1 *http.Transport) error {
225 _, err := ConfigureTransports(t1)
226 return err
227 }
228 229 // ConfigureTransports configures a net/http HTTP/1 Transport to use HTTP/2.
230 // It returns a new HTTP/2 Transport for further configuration.
231 // It returns an error if t1 has already been HTTP/2-enabled.
232 func ConfigureTransports(t1 *http.Transport) (*Transport, error) {
233 return configureTransports(t1)
234 }
235 236 func configureTransports(t1 *http.Transport) (*Transport, error) {
237 connPool := new(clientConnPool)
238 t2 := &Transport{
239 ConnPool: noDialClientConnPool{connPool},
240 t1: t1,
241 }
242 connPool.t = t2
243 if err := registerHTTPSProtocol(t1, noDialH2RoundTripper{t2}); err != nil {
244 return nil, err
245 }
246 if t1.TLSClientConfig == nil {
247 t1.TLSClientConfig = new(tls.Config)
248 }
249 if !strSliceContains(t1.TLSClientConfig.NextProtos, "h2") {
250 t1.TLSClientConfig.NextProtos = append([]string{"h2"}, t1.TLSClientConfig.NextProtos...)
251 }
252 if !strSliceContains(t1.TLSClientConfig.NextProtos, "http/1.1") {
253 t1.TLSClientConfig.NextProtos = append(t1.TLSClientConfig.NextProtos, "http/1.1")
254 }
255 upgradeFn := func(scheme, authority string, c net.Conn) http.RoundTripper {
256 addr := authorityAddr(scheme, authority)
257 if used, err := connPool.addConnIfNeeded(addr, t2, c); err != nil {
258 go c.Close()
259 return erringRoundTripper{err}
260 } else if !used {
261 // Turns out we don't need this c.
262 // For example, two goroutines made requests to the same host
263 // at the same time, both kicking off TCP dials. (since protocol
264 // was unknown)
265 go c.Close()
266 }
267 if scheme == "http" {
268 return (*unencryptedTransport)(t2)
269 }
270 return t2
271 }
272 if t1.TLSNextProto == nil {
273 t1.TLSNextProto = make(map[string]func(string, *tls.Conn) http.RoundTripper)
274 }
275 t1.TLSNextProto[NextProtoTLS] = func(authority string, c *tls.Conn) http.RoundTripper {
276 return upgradeFn("https", authority, c)
277 }
278 // The "unencrypted_http2" TLSNextProto key is used to pass off non-TLS HTTP/2 conns.
279 t1.TLSNextProto[nextProtoUnencryptedHTTP2] = func(authority string, c *tls.Conn) http.RoundTripper {
280 nc, err := unencryptedNetConnFromTLSConn(c)
281 if err != nil {
282 go c.Close()
283 return erringRoundTripper{err}
284 }
285 return upgradeFn("http", authority, nc)
286 }
287 return t2, nil
288 }
289 290 // unencryptedTransport is a Transport with a RoundTrip method that
291 // always permits http:// URLs.
292 type unencryptedTransport Transport
293 294 func (t *unencryptedTransport) RoundTrip(req *http.Request) (*http.Response, error) {
295 return (*Transport)(t).RoundTripOpt(req, RoundTripOpt{allowHTTP: true})
296 }
297 298 func (t *Transport) connPool() ClientConnPool {
299 t.connPoolOnce.Do(t.initConnPool)
300 return t.connPoolOrDef
301 }
302 303 func (t *Transport) initConnPool() {
304 if t.ConnPool != nil {
305 t.connPoolOrDef = t.ConnPool
306 } else {
307 t.connPoolOrDef = &clientConnPool{t: t}
308 }
309 }
310 311 // ClientConn is the state of a single HTTP/2 client connection to an
312 // HTTP/2 server.
313 type ClientConn struct {
314 t *Transport
315 tconn net.Conn // usually *tls.Conn, except specialized impls
316 tlsState *tls.ConnectionState // nil only for specialized impls
317 atomicReused uint32 // whether conn is being reused; atomic
318 singleUse bool // whether being used for a single http.Request
319 getConnCalled bool // used by clientConnPool
320 321 // readLoop goroutine fields:
322 readerDone chan struct{} // closed on error
323 readerErr error // set before readerDone is closed
324 325 idleTimeout time.Duration // or 0 for never
326 idleTimer *time.Timer
327 328 mu sync.Mutex // guards following
329 cond *sync.Cond // hold mu; broadcast on flow/closed changes
330 flow outflow // our conn-level flow control quota (cs.outflow is per stream)
331 inflow inflow // peer's conn-level flow control
332 doNotReuse bool // whether conn is marked to not be reused for any future requests
333 closing bool
334 closed bool
335 closedOnIdle bool // true if conn was closed for idleness
336 seenSettings bool // true if we've seen a settings frame, false otherwise
337 seenSettingsChan chan struct{} // closed when seenSettings is true or frame reading fails
338 wantSettingsAck bool // we sent a SETTINGS frame and haven't heard back
339 goAway *GoAwayFrame // if non-nil, the GoAwayFrame we received
340 goAwayDebug string // goAway frame's debug data, retained as a string
341 streams map[uint32]*clientStream // client-initiated
342 streamsReserved int // incr by ReserveNewRequest; decr on RoundTrip
343 nextStreamID uint32
344 pendingRequests int // requests blocked and waiting to be sent because len(streams) == maxConcurrentStreams
345 pings map[[8]byte]chan struct{} // in flight ping data to notification channel
346 br *bufio.Reader
347 lastActive time.Time
348 lastIdle time.Time // time last idle
349 // Settings from peer: (also guarded by wmu)
350 maxFrameSize uint32
351 maxConcurrentStreams uint32
352 peerMaxHeaderListSize uint64
353 peerMaxHeaderTableSize uint32
354 initialWindowSize uint32
355 initialStreamRecvWindowSize int32
356 readIdleTimeout time.Duration
357 pingTimeout time.Duration
358 extendedConnectAllowed bool
359 strictMaxConcurrentStreams bool
360 361 // rstStreamPingsBlocked works around an unfortunate gRPC behavior.
362 // gRPC strictly limits the number of PING frames that it will receive.
363 // The default is two pings per two hours, but the limit resets every time
364 // the gRPC endpoint sends a HEADERS or DATA frame. See golang/go#70575.
365 //
366 // rstStreamPingsBlocked is set after receiving a response to a PING frame
367 // bundled with an RST_STREAM (see pendingResets below), and cleared after
368 // receiving a HEADERS or DATA frame.
369 rstStreamPingsBlocked bool
370 371 // pendingResets is the number of RST_STREAM frames we have sent to the peer,
372 // without confirming that the peer has received them. When we send a RST_STREAM,
373 // we bundle it with a PING frame, unless a PING is already in flight. We count
374 // the reset stream against the connection's concurrency limit until we get
375 // a PING response. This limits the number of requests we'll try to send to a
376 // completely unresponsive connection.
377 pendingResets int
378 379 // readBeforeStreamID is the smallest stream ID that has not been followed by
380 // a frame read from the peer. We use this to determine when a request may
381 // have been sent to a completely unresponsive connection:
382 // If the request ID is less than readBeforeStreamID, then we have had some
383 // indication of life on the connection since sending the request.
384 readBeforeStreamID uint32
385 386 // reqHeaderMu is a 1-element semaphore channel controlling access to sending new requests.
387 // Write to reqHeaderMu to lock it, read from it to unlock.
388 // Lock reqmu BEFORE mu or wmu.
389 reqHeaderMu chan struct{}
390 391 // internalStateHook reports state changes back to the net/http.ClientConn.
392 // Note that this is different from the user state hook registered by
393 // net/http.ClientConn.SetStateHook: The internal hook calls ClientConn,
394 // which calls the user hook.
395 internalStateHook func()
396 397 // wmu is held while writing.
398 // Acquire BEFORE mu when holding both, to avoid blocking mu on network writes.
399 // Only acquire both at the same time when changing peer settings.
400 wmu sync.Mutex
401 bw *bufio.Writer
402 fr *Framer
403 werr error // first write error that has occurred
404 hbuf bytes.Buffer // HPACK encoder writes into this
405 henc *hpack.Encoder
406 }
407 408 // clientStream is the state for a single HTTP/2 stream. One of these
409 // is created for each Transport.RoundTrip call.
410 type clientStream struct {
411 cc *ClientConn
412 413 // Fields of Request that we may access even after the response body is closed.
414 ctx context.Context
415 reqCancel <-chan struct{}
416 417 trace *httptrace.ClientTrace // or nil
418 ID uint32
419 bufPipe pipe // buffered pipe with the flow-controlled response payload
420 requestedGzip bool
421 isHead bool
422 423 abortOnce sync.Once
424 abort chan struct{} // closed to signal stream should end immediately
425 abortErr error // set if abort is closed
426 427 peerClosed chan struct{} // closed when the peer sends an END_STREAM flag
428 donec chan struct{} // closed after the stream is in the closed state
429 on100 chan struct{} // buffered; written to if a 100 is received
430 431 respHeaderRecv chan struct{} // closed when headers are received
432 res *http.Response // set if respHeaderRecv is closed
433 434 flow outflow // guarded by cc.mu
435 inflow inflow // guarded by cc.mu
436 bytesRemain int64 // -1 means unknown; owned by transportResponseBody.Read
437 readErr error // sticky read error; owned by transportResponseBody.Read
438 439 reqBody io.ReadCloser
440 reqBodyContentLength int64 // -1 means unknown
441 reqBodyClosed chan struct{} // guarded by cc.mu; non-nil on Close, closed when done
442 443 // owned by writeRequest:
444 sentEndStream bool // sent an END_STREAM flag to the peer
445 sentHeaders bool
446 447 // owned by clientConnReadLoop:
448 firstByte bool // got the first response byte
449 pastHeaders bool // got first MetaHeadersFrame (actual headers)
450 pastTrailers bool // got optional second MetaHeadersFrame (trailers)
451 readClosed bool // peer sent an END_STREAM flag
452 readAborted bool // read loop reset the stream
453 totalHeaderSize int64 // total size of 1xx headers seen
454 455 trailer http.Header // accumulated trailers
456 resTrailer *http.Header // client's Response.Trailer
457 }
458 459 var got1xxFuncForTests func(int, textproto.MIMEHeader) error
460 461 // get1xxTraceFunc returns the value of request's httptrace.ClientTrace.Got1xxResponse func,
462 // if any. It returns nil if not set or if the Go version is too old.
463 func (cs *clientStream) get1xxTraceFunc() func(int, textproto.MIMEHeader) error {
464 if fn := got1xxFuncForTests; fn != nil {
465 return fn
466 }
467 return traceGot1xxResponseFunc(cs.trace)
468 }
469 470 func (cs *clientStream) abortStream(err error) {
471 cs.cc.mu.Lock()
472 defer cs.cc.mu.Unlock()
473 cs.abortStreamLocked(err)
474 }
475 476 func (cs *clientStream) abortStreamLocked(err error) {
477 cs.abortOnce.Do(func() {
478 cs.abortErr = err
479 close(cs.abort)
480 })
481 if cs.reqBody != nil {
482 cs.closeReqBodyLocked()
483 }
484 // TODO(dneil): Clean up tests where cs.cc.cond is nil.
485 if cs.cc.cond != nil {
486 // Wake up writeRequestBody if it is waiting on flow control.
487 cs.cc.cond.Broadcast()
488 }
489 }
490 491 func (cs *clientStream) abortRequestBodyWrite() {
492 cc := cs.cc
493 cc.mu.Lock()
494 defer cc.mu.Unlock()
495 if cs.reqBody != nil && cs.reqBodyClosed == nil {
496 cs.closeReqBodyLocked()
497 cc.cond.Broadcast()
498 }
499 }
500 501 func (cs *clientStream) closeReqBodyLocked() {
502 if cs.reqBodyClosed != nil {
503 return
504 }
505 cs.reqBodyClosed = make(chan struct{})
506 reqBodyClosed := cs.reqBodyClosed
507 go func() {
508 cs.reqBody.Close()
509 close(reqBodyClosed)
510 }()
511 }
512 513 type stickyErrWriter struct {
514 conn net.Conn
515 timeout time.Duration
516 err *error
517 }
518 519 func (sew stickyErrWriter) Write(p []byte) (n int, err error) {
520 if *sew.err != nil {
521 return 0, *sew.err
522 }
523 n, err = writeWithByteTimeout(sew.conn, sew.timeout, p)
524 *sew.err = err
525 return n, err
526 }
527 528 // noCachedConnError is the concrete type of ErrNoCachedConn, which
529 // needs to be detected by net/http regardless of whether it's its
530 // bundled version (in h2_bundle.go with a rewritten type name) or
531 // from a user's x/net/http2. As such, as it has a unique method name
532 // (IsHTTP2NoCachedConnError) that net/http sniffs for via func
533 // isNoCachedConnError.
534 type noCachedConnError struct{}
535 536 func (noCachedConnError) IsHTTP2NoCachedConnError() {}
537 func (noCachedConnError) Error() string { return "http2: no cached connection was available" }
538 539 // isNoCachedConnError reports whether err is of type noCachedConnError
540 // or its equivalent renamed type in net/http2's h2_bundle.go. Both types
541 // may coexist in the same running program.
542 func isNoCachedConnError(err error) bool {
543 _, ok := err.(interface{ IsHTTP2NoCachedConnError() })
544 return ok
545 }
546 547 var ErrNoCachedConn error = noCachedConnError{}
548 549 // RoundTripOpt are options for the Transport.RoundTripOpt method.
550 type RoundTripOpt struct {
551 // OnlyCachedConn controls whether RoundTripOpt may
552 // create a new TCP connection. If set true and
553 // no cached connection is available, RoundTripOpt
554 // will return ErrNoCachedConn.
555 OnlyCachedConn bool
556 557 allowHTTP bool // allow http:// URLs
558 }
559 560 func (t *Transport) RoundTrip(req *http.Request) (*http.Response, error) {
561 return t.RoundTripOpt(req, RoundTripOpt{})
562 }
563 564 // authorityAddr returns a given authority (a host/IP, or host:port / ip:port)
565 // and returns a host:port. The port 443 is added if needed.
566 func authorityAddr(scheme string, authority string) (addr string) {
567 host, port, err := net.SplitHostPort(authority)
568 if err != nil { // authority didn't have a port
569 host = authority
570 port = ""
571 }
572 if port == "" { // authority's port was empty
573 port = "443"
574 if scheme == "http" {
575 port = "80"
576 }
577 }
578 if a, err := idna.ToASCII(host); err == nil {
579 host = a
580 }
581 // IPv6 address literal, without a port:
582 if strings.HasPrefix(host, "[") && strings.HasSuffix(host, "]") {
583 return host + ":" + port
584 }
585 return net.JoinHostPort(host, port)
586 }
587 588 // RoundTripOpt is like RoundTrip, but takes options.
589 func (t *Transport) RoundTripOpt(req *http.Request, opt RoundTripOpt) (*http.Response, error) {
590 switch req.URL.Scheme {
591 case "https":
592 // Always okay.
593 case "http":
594 if !t.AllowHTTP && !opt.allowHTTP {
595 return nil, errors.New("http2: unencrypted HTTP/2 not enabled")
596 }
597 default:
598 return nil, errors.New("http2: unsupported scheme")
599 }
600 601 addr := authorityAddr(req.URL.Scheme, req.URL.Host)
602 for retry := 0; ; retry++ {
603 cc, err := t.connPool().GetClientConn(req, addr)
604 if err != nil {
605 t.vlogf("http2: Transport failed to get client conn for %s: %v", addr, err)
606 return nil, err
607 }
608 reused := !atomic.CompareAndSwapUint32(&cc.atomicReused, 0, 1)
609 traceGotConn(req, cc, reused)
610 res, err := cc.RoundTrip(req)
611 if err != nil && retry <= 6 {
612 roundTripErr := err
613 if req, err = shouldRetryRequest(req, err); err == nil {
614 // After the first retry, do exponential backoff with 10% jitter.
615 if retry == 0 {
616 t.vlogf("RoundTrip retrying after failure: %v", roundTripErr)
617 continue
618 }
619 backoff := float64(uint(1) << (uint(retry) - 1))
620 backoff += backoff * (0.1 * mathrand.Float64())
621 d := time.Second * time.Duration(backoff)
622 tm := time.NewTimer(d)
623 select {
624 case <-tm.C:
625 t.vlogf("RoundTrip retrying after failure: %v", roundTripErr)
626 continue
627 case <-req.Context().Done():
628 tm.Stop()
629 err = req.Context().Err()
630 }
631 }
632 }
633 if err == errClientConnNotEstablished {
634 // This ClientConn was created recently,
635 // this is the first request to use it,
636 // and the connection is closed and not usable.
637 //
638 // In this state, cc.idleTimer will remove the conn from the pool
639 // when it fires. Stop the timer and remove it here so future requests
640 // won't try to use this connection.
641 //
642 // If the timer has already fired and we're racing it, the redundant
643 // call to MarkDead is harmless.
644 if cc.idleTimer != nil {
645 cc.idleTimer.Stop()
646 }
647 t.connPool().MarkDead(cc)
648 }
649 if err != nil {
650 t.vlogf("RoundTrip failure: %v", err)
651 return nil, err
652 }
653 return res, nil
654 }
655 }
656 657 // CloseIdleConnections closes any connections which were previously
658 // connected from previous requests but are now sitting idle.
659 // It does not interrupt any connections currently in use.
660 func (t *Transport) CloseIdleConnections() {
661 if cp, ok := t.connPool().(clientConnPoolIdleCloser); ok {
662 cp.closeIdleConnections()
663 }
664 }
665 666 var (
667 errClientConnClosed = errors.New("http2: client conn is closed")
668 errClientConnUnusable = errors.New("http2: client conn not usable")
669 errClientConnNotEstablished = errors.New("http2: client conn could not be established")
670 errClientConnGotGoAway = errors.New("http2: Transport received Server's graceful shutdown GOAWAY")
671 errClientConnForceClosed = errors.New("http2: client connection force closed via ClientConn.Close")
672 )
673 674 // shouldRetryRequest is called by RoundTrip when a request fails to get
675 // response headers. It is always called with a non-nil error.
676 // It returns either a request to retry (either the same request, or a
677 // modified clone), or an error if the request can't be replayed.
678 func shouldRetryRequest(req *http.Request, err error) (*http.Request, error) {
679 if !canRetryError(err) {
680 return nil, err
681 }
682 // If the Body is nil (or http.NoBody), it's safe to reuse
683 // this request and its Body.
684 if req.Body == nil || req.Body == http.NoBody {
685 return req, nil
686 }
687 688 // If the request body can be reset back to its original
689 // state via the optional req.GetBody, do that.
690 if req.GetBody != nil {
691 body, err := req.GetBody()
692 if err != nil {
693 return nil, err
694 }
695 newReq := *req
696 newReq.Body = body
697 return &newReq, nil
698 }
699 700 // The Request.Body can't reset back to the beginning, but we
701 // don't seem to have started to read from it yet, so reuse
702 // the request directly.
703 if err == errClientConnUnusable {
704 return req, nil
705 }
706 707 return nil, fmt.Errorf("http2: Transport: cannot retry err [%v] after Request.Body was written; define Request.GetBody to avoid this error", err)
708 }
709 710 func canRetryError(err error) bool {
711 if err == errClientConnUnusable || err == errClientConnGotGoAway {
712 return true
713 }
714 if se, ok := err.(StreamError); ok {
715 if se.Code == ErrCodeProtocol && se.Cause == errFromPeer {
716 // See golang/go#47635, golang/go#42777
717 return true
718 }
719 return se.Code == ErrCodeRefusedStream
720 }
721 return false
722 }
723 724 func (t *Transport) dialClientConn(ctx context.Context, addr string, singleUse bool) (*ClientConn, error) {
725 if t.transportTestHooks != nil {
726 return t.newClientConn(nil, singleUse, nil)
727 }
728 host, _, err := net.SplitHostPort(addr)
729 if err != nil {
730 return nil, err
731 }
732 tconn, err := t.dialTLS(ctx, "tcp", addr, t.newTLSConfig(host))
733 if err != nil {
734 return nil, err
735 }
736 return t.newClientConn(tconn, singleUse, nil)
737 }
738 739 func (t *Transport) newTLSConfig(host string) *tls.Config {
740 cfg := new(tls.Config)
741 if t.TLSClientConfig != nil {
742 *cfg = *t.TLSClientConfig.Clone()
743 }
744 if !strSliceContains(cfg.NextProtos, NextProtoTLS) {
745 cfg.NextProtos = append([]string{NextProtoTLS}, cfg.NextProtos...)
746 }
747 if cfg.ServerName == "" {
748 cfg.ServerName = host
749 }
750 return cfg
751 }
752 753 func (t *Transport) dialTLS(ctx context.Context, network, addr string, tlsCfg *tls.Config) (net.Conn, error) {
754 if t.DialTLSContext != nil {
755 return t.DialTLSContext(ctx, network, addr, tlsCfg)
756 } else if t.DialTLS != nil {
757 return t.DialTLS(network, addr, tlsCfg)
758 }
759 760 tlsCn, err := t.dialTLSWithContext(ctx, network, addr, tlsCfg)
761 if err != nil {
762 return nil, err
763 }
764 state := tlsCn.ConnectionState()
765 if p := state.NegotiatedProtocol; p != NextProtoTLS {
766 return nil, fmt.Errorf("http2: unexpected ALPN protocol %q; want %q", p, NextProtoTLS)
767 }
768 if !state.NegotiatedProtocolIsMutual {
769 return nil, errors.New("http2: could not negotiate protocol mutually")
770 }
771 return tlsCn, nil
772 }
773 774 // disableKeepAlives reports whether connections should be closed as
775 // soon as possible after handling the first request.
776 func (t *Transport) disableKeepAlives() bool {
777 return t.t1 != nil && t.t1.DisableKeepAlives
778 }
779 780 func (t *Transport) expectContinueTimeout() time.Duration {
781 if t.t1 == nil {
782 return 0
783 }
784 return t.t1.ExpectContinueTimeout
785 }
786 787 func (t *Transport) NewClientConn(c net.Conn) (*ClientConn, error) {
788 return t.newClientConn(c, t.disableKeepAlives(), nil)
789 }
790 791 func (t *Transport) newClientConn(c net.Conn, singleUse bool, internalStateHook func()) (*ClientConn, error) {
792 conf := configFromTransport(t)
793 cc := &ClientConn{
794 t: t,
795 tconn: c,
796 readerDone: make(chan struct{}),
797 nextStreamID: 1,
798 maxFrameSize: 16 << 10, // spec default
799 initialWindowSize: 65535, // spec default
800 initialStreamRecvWindowSize: conf.MaxUploadBufferPerStream,
801 maxConcurrentStreams: initialMaxConcurrentStreams, // "infinite", per spec. Use a smaller value until we have received server settings.
802 strictMaxConcurrentStreams: conf.StrictMaxConcurrentRequests,
803 peerMaxHeaderListSize: 0xffffffffffffffff, // "infinite", per spec. Use 2^64-1 instead.
804 streams: make(map[uint32]*clientStream),
805 singleUse: singleUse,
806 seenSettingsChan: make(chan struct{}),
807 wantSettingsAck: true,
808 readIdleTimeout: conf.SendPingTimeout,
809 pingTimeout: conf.PingTimeout,
810 pings: make(map[[8]byte]chan struct{}),
811 reqHeaderMu: make(chan struct{}, 1),
812 lastActive: time.Now(),
813 internalStateHook: internalStateHook,
814 }
815 if t.transportTestHooks != nil {
816 t.transportTestHooks.newclientconn(cc)
817 c = cc.tconn
818 }
819 if VerboseLogs {
820 t.vlogf("http2: Transport creating client conn %p to %v", cc, c.RemoteAddr())
821 }
822 823 cc.cond = sync.NewCond(&cc.mu)
824 cc.flow.add(int32(initialWindowSize))
825 826 // TODO: adjust this writer size to account for frame size +
827 // MTU + crypto/tls record padding.
828 cc.bw = bufio.NewWriter(stickyErrWriter{
829 conn: c,
830 timeout: conf.WriteByteTimeout,
831 err: &cc.werr,
832 })
833 cc.br = bufio.NewReader(c)
834 cc.fr = NewFramer(cc.bw, cc.br)
835 cc.fr.SetMaxReadFrameSize(conf.MaxReadFrameSize)
836 if t.CountError != nil {
837 cc.fr.countError = t.CountError
838 }
839 maxHeaderTableSize := conf.MaxDecoderHeaderTableSize
840 cc.fr.ReadMetaHeaders = hpack.NewDecoder(maxHeaderTableSize, nil)
841 cc.fr.MaxHeaderListSize = t.maxHeaderListSize()
842 843 cc.henc = hpack.NewEncoder(&cc.hbuf)
844 cc.henc.SetMaxDynamicTableSizeLimit(conf.MaxEncoderHeaderTableSize)
845 cc.peerMaxHeaderTableSize = initialHeaderTableSize
846 847 if cs, ok := c.(connectionStater); ok {
848 state := cs.ConnectionState()
849 cc.tlsState = &state
850 }
851 852 initialSettings := []Setting{
853 {ID: SettingEnablePush, Val: 0},
854 {ID: SettingInitialWindowSize, Val: uint32(cc.initialStreamRecvWindowSize)},
855 }
856 initialSettings = append(initialSettings, Setting{ID: SettingMaxFrameSize, Val: conf.MaxReadFrameSize})
857 if max := t.maxHeaderListSize(); max != 0 {
858 initialSettings = append(initialSettings, Setting{ID: SettingMaxHeaderListSize, Val: max})
859 }
860 if maxHeaderTableSize != initialHeaderTableSize {
861 initialSettings = append(initialSettings, Setting{ID: SettingHeaderTableSize, Val: maxHeaderTableSize})
862 }
863 864 cc.bw.Write(clientPreface)
865 cc.fr.WriteSettings(initialSettings...)
866 cc.fr.WriteWindowUpdate(0, uint32(conf.MaxUploadBufferPerConnection))
867 cc.inflow.init(conf.MaxUploadBufferPerConnection + initialWindowSize)
868 cc.bw.Flush()
869 if cc.werr != nil {
870 cc.Close()
871 return nil, cc.werr
872 }
873 874 // Start the idle timer after the connection is fully initialized.
875 if d := t.idleConnTimeout(); d != 0 {
876 cc.idleTimeout = d
877 cc.idleTimer = time.AfterFunc(d, cc.onIdleTimeout)
878 }
879 880 go cc.readLoop()
881 return cc, nil
882 }
883 884 func (cc *ClientConn) healthCheck() {
885 pingTimeout := cc.pingTimeout
886 // We don't need to periodically ping in the health check, because the readLoop of ClientConn will
887 // trigger the healthCheck again if there is no frame received.
888 ctx, cancel := context.WithTimeout(context.Background(), pingTimeout)
889 defer cancel()
890 cc.vlogf("http2: Transport sending health check")
891 err := cc.Ping(ctx)
892 if err != nil {
893 cc.vlogf("http2: Transport health check failure: %v", err)
894 cc.closeForLostPing()
895 } else {
896 cc.vlogf("http2: Transport health check success")
897 }
898 }
899 900 // SetDoNotReuse marks cc as not reusable for future HTTP requests.
901 func (cc *ClientConn) SetDoNotReuse() {
902 cc.mu.Lock()
903 defer cc.mu.Unlock()
904 cc.doNotReuse = true
905 }
906 907 func (cc *ClientConn) setGoAway(f *GoAwayFrame) {
908 cc.mu.Lock()
909 defer cc.mu.Unlock()
910 911 old := cc.goAway
912 cc.goAway = f
913 914 // Merge the previous and current GoAway error frames.
915 if cc.goAwayDebug == "" {
916 cc.goAwayDebug = string(f.DebugData())
917 }
918 if old != nil && old.ErrCode != ErrCodeNo {
919 cc.goAway.ErrCode = old.ErrCode
920 }
921 last := f.LastStreamID
922 for streamID, cs := range cc.streams {
923 if streamID <= last {
924 // The server's GOAWAY indicates that it received this stream.
925 // It will either finish processing it, or close the connection
926 // without doing so. Either way, leave the stream alone for now.
927 continue
928 }
929 if streamID == 1 && cc.goAway.ErrCode != ErrCodeNo {
930 // Don't retry the first stream on a connection if we get a non-NO error.
931 // If the server is sending an error on a new connection,
932 // retrying the request on a new one probably isn't going to work.
933 cs.abortStreamLocked(fmt.Errorf("http2: Transport received GOAWAY from server ErrCode:%v", cc.goAway.ErrCode))
934 } else {
935 // Aborting the stream with errClentConnGotGoAway indicates that
936 // the request should be retried on a new connection.
937 cs.abortStreamLocked(errClientConnGotGoAway)
938 }
939 }
940 }
941 942 // CanTakeNewRequest reports whether the connection can take a new request,
943 // meaning it has not been closed or received or sent a GOAWAY.
944 //
945 // If the caller is going to immediately make a new request on this
946 // connection, use ReserveNewRequest instead.
947 func (cc *ClientConn) CanTakeNewRequest() bool {
948 cc.mu.Lock()
949 defer cc.mu.Unlock()
950 return cc.canTakeNewRequestLocked()
951 }
952 953 // ReserveNewRequest is like CanTakeNewRequest but also reserves a
954 // concurrent stream in cc. The reservation is decremented on the
955 // next call to RoundTrip.
956 func (cc *ClientConn) ReserveNewRequest() bool {
957 cc.mu.Lock()
958 defer cc.mu.Unlock()
959 if st := cc.idleStateLocked(); !st.canTakeNewRequest {
960 return false
961 }
962 cc.streamsReserved++
963 return true
964 }
965 966 // ClientConnState describes the state of a ClientConn.
967 type ClientConnState struct {
968 // Closed is whether the connection is closed.
969 Closed bool
970 971 // Closing is whether the connection is in the process of
972 // closing. It may be closing due to shutdown, being a
973 // single-use connection, being marked as DoNotReuse, or
974 // having received a GOAWAY frame.
975 Closing bool
976 977 // StreamsActive is how many streams are active.
978 StreamsActive int
979 980 // StreamsReserved is how many streams have been reserved via
981 // ClientConn.ReserveNewRequest.
982 StreamsReserved int
983 984 // StreamsPending is how many requests have been sent in excess
985 // of the peer's advertised MaxConcurrentStreams setting and
986 // are waiting for other streams to complete.
987 StreamsPending int
988 989 // MaxConcurrentStreams is how many concurrent streams the
990 // peer advertised as acceptable. Zero means no SETTINGS
991 // frame has been received yet.
992 MaxConcurrentStreams uint32
993 994 // LastIdle, if non-zero, is when the connection last
995 // transitioned to idle state.
996 LastIdle time.Time
997 }
998 999 // State returns a snapshot of cc's state.
1000 func (cc *ClientConn) State() ClientConnState {
1001 cc.wmu.Lock()
1002 maxConcurrent := cc.maxConcurrentStreams
1003 if !cc.seenSettings {
1004 maxConcurrent = 0
1005 }
1006 cc.wmu.Unlock()
1007 1008 cc.mu.Lock()
1009 defer cc.mu.Unlock()
1010 return ClientConnState{
1011 Closed: cc.closed,
1012 Closing: cc.closing || cc.singleUse || cc.doNotReuse || cc.goAway != nil,
1013 StreamsActive: len(cc.streams) + cc.pendingResets,
1014 StreamsReserved: cc.streamsReserved,
1015 StreamsPending: cc.pendingRequests,
1016 LastIdle: cc.lastIdle,
1017 MaxConcurrentStreams: maxConcurrent,
1018 }
1019 }
1020 1021 // clientConnIdleState describes the suitability of a client
1022 // connection to initiate a new RoundTrip request.
1023 type clientConnIdleState struct {
1024 canTakeNewRequest bool
1025 }
1026 1027 func (cc *ClientConn) idleState() clientConnIdleState {
1028 cc.mu.Lock()
1029 defer cc.mu.Unlock()
1030 return cc.idleStateLocked()
1031 }
1032 1033 func (cc *ClientConn) idleStateLocked() (st clientConnIdleState) {
1034 if cc.singleUse && cc.nextStreamID > 1 {
1035 return
1036 }
1037 var maxConcurrentOkay bool
1038 if cc.strictMaxConcurrentStreams {
1039 // We'll tell the caller we can take a new request to
1040 // prevent the caller from dialing a new TCP
1041 // connection, but then we'll block later before
1042 // writing it.
1043 maxConcurrentOkay = true
1044 } else {
1045 // We can take a new request if the total of
1046 // - active streams;
1047 // - reservation slots for new streams; and
1048 // - streams for which we have sent a RST_STREAM and a PING,
1049 // but received no subsequent frame
1050 // is less than the concurrency limit.
1051 maxConcurrentOkay = cc.currentRequestCountLocked() < int(cc.maxConcurrentStreams)
1052 }
1053 1054 st.canTakeNewRequest = maxConcurrentOkay && cc.isUsableLocked()
1055 1056 // If this connection has never been used for a request and is closed,
1057 // then let it take a request (which will fail).
1058 // If the conn was closed for idleness, we're racing the idle timer;
1059 // don't try to use the conn. (Issue #70515.)
1060 //
1061 // This avoids a situation where an error early in a connection's lifetime
1062 // goes unreported.
1063 if cc.nextStreamID == 1 && cc.streamsReserved == 0 && cc.closed && !cc.closedOnIdle {
1064 st.canTakeNewRequest = true
1065 }
1066 1067 return
1068 }
1069 1070 func (cc *ClientConn) isUsableLocked() bool {
1071 return cc.goAway == nil &&
1072 !cc.closed &&
1073 !cc.closing &&
1074 !cc.doNotReuse &&
1075 int64(cc.nextStreamID)+2*int64(cc.pendingRequests) < math.MaxInt32 &&
1076 !cc.tooIdleLocked()
1077 }
1078 1079 // canReserveLocked reports whether a net/http.ClientConn can reserve a slot on this conn.
1080 //
1081 // This follows slightly different rules than clientConnIdleState.canTakeNewRequest.
1082 // We only permit reservations up to the conn's concurrency limit.
1083 // This differs from ClientConn.ReserveNewRequest, which permits reservations
1084 // past the limit when StrictMaxConcurrentStreams is set.
1085 func (cc *ClientConn) canReserveLocked() bool {
1086 if cc.currentRequestCountLocked() >= int(cc.maxConcurrentStreams) {
1087 return false
1088 }
1089 if !cc.isUsableLocked() {
1090 return false
1091 }
1092 return true
1093 }
1094 1095 // currentRequestCountLocked reports the number of concurrency slots currently in use,
1096 // including active streams, reserved slots, and reset streams waiting for acknowledgement.
1097 func (cc *ClientConn) currentRequestCountLocked() int {
1098 return len(cc.streams) + cc.streamsReserved + cc.pendingResets
1099 }
1100 1101 func (cc *ClientConn) canTakeNewRequestLocked() bool {
1102 st := cc.idleStateLocked()
1103 return st.canTakeNewRequest
1104 }
1105 1106 // availableLocked reports the number of concurrency slots available.
1107 func (cc *ClientConn) availableLocked() int {
1108 if !cc.canTakeNewRequestLocked() {
1109 return 0
1110 }
1111 return max(0, int(cc.maxConcurrentStreams)-cc.currentRequestCountLocked())
1112 }
1113 1114 // tooIdleLocked reports whether this connection has been been sitting idle
1115 // for too much wall time.
1116 func (cc *ClientConn) tooIdleLocked() bool {
1117 // The Round(0) strips the monontonic clock reading so the
1118 // times are compared based on their wall time. We don't want
1119 // to reuse a connection that's been sitting idle during
1120 // VM/laptop suspend if monotonic time was also frozen.
1121 return cc.idleTimeout != 0 && !cc.lastIdle.IsZero() && time.Since(cc.lastIdle.Round(0)) > cc.idleTimeout
1122 }
1123 1124 // onIdleTimeout is called from a time.AfterFunc goroutine. It will
1125 // only be called when we're idle, but because we're coming from a new
1126 // goroutine, there could be a new request coming in at the same time,
1127 // so this simply calls the synchronized closeIfIdle to shut down this
1128 // connection. The timer could just call closeIfIdle, but this is more
1129 // clear.
1130 func (cc *ClientConn) onIdleTimeout() {
1131 cc.closeIfIdle()
1132 }
1133 1134 func (cc *ClientConn) closeConn() {
1135 t := time.AfterFunc(250*time.Millisecond, cc.forceCloseConn)
1136 defer t.Stop()
1137 cc.tconn.Close()
1138 cc.maybeCallStateHook()
1139 }
1140 1141 // A tls.Conn.Close can hang for a long time if the peer is unresponsive.
1142 // Try to shut it down more aggressively.
1143 func (cc *ClientConn) forceCloseConn() {
1144 tc, ok := cc.tconn.(*tls.Conn)
1145 if !ok {
1146 return
1147 }
1148 if nc := tc.NetConn(); nc != nil {
1149 nc.Close()
1150 }
1151 }
1152 1153 func (cc *ClientConn) closeIfIdle() {
1154 cc.mu.Lock()
1155 if len(cc.streams) > 0 || cc.streamsReserved > 0 {
1156 cc.mu.Unlock()
1157 return
1158 }
1159 cc.closed = true
1160 cc.closedOnIdle = true
1161 nextID := cc.nextStreamID
1162 // TODO: do clients send GOAWAY too? maybe? Just Close:
1163 cc.mu.Unlock()
1164 1165 if VerboseLogs {
1166 cc.vlogf("http2: Transport closing idle conn %p (forSingleUse=%v, maxStream=%v)", cc, cc.singleUse, nextID-2)
1167 }
1168 cc.closeConn()
1169 }
1170 1171 func (cc *ClientConn) isDoNotReuseAndIdle() bool {
1172 cc.mu.Lock()
1173 defer cc.mu.Unlock()
1174 return cc.doNotReuse && len(cc.streams) == 0
1175 }
1176 1177 var shutdownEnterWaitStateHook = func() {}
1178 1179 // Shutdown gracefully closes the client connection, waiting for running streams to complete.
1180 func (cc *ClientConn) Shutdown(ctx context.Context) error {
1181 if err := cc.sendGoAway(); err != nil {
1182 return err
1183 }
1184 // Wait for all in-flight streams to complete or connection to close
1185 done := make(chan struct{})
1186 cancelled := false // guarded by cc.mu
1187 go func() {
1188 cc.mu.Lock()
1189 defer cc.mu.Unlock()
1190 for {
1191 if len(cc.streams) == 0 || cc.closed {
1192 cc.closed = true
1193 close(done)
1194 break
1195 }
1196 if cancelled {
1197 break
1198 }
1199 cc.cond.Wait()
1200 }
1201 }()
1202 shutdownEnterWaitStateHook()
1203 select {
1204 case <-done:
1205 cc.closeConn()
1206 return nil
1207 case <-ctx.Done():
1208 cc.mu.Lock()
1209 // Free the goroutine above
1210 cancelled = true
1211 cc.cond.Broadcast()
1212 cc.mu.Unlock()
1213 return ctx.Err()
1214 }
1215 }
1216 1217 func (cc *ClientConn) sendGoAway() error {
1218 cc.mu.Lock()
1219 closing := cc.closing
1220 cc.closing = true
1221 maxStreamID := cc.nextStreamID
1222 cc.mu.Unlock()
1223 if closing {
1224 // GOAWAY sent already
1225 return nil
1226 }
1227 1228 cc.wmu.Lock()
1229 defer cc.wmu.Unlock()
1230 // Send a graceful shutdown frame to server
1231 if err := cc.fr.WriteGoAway(maxStreamID, ErrCodeNo, nil); err != nil {
1232 return err
1233 }
1234 if err := cc.bw.Flush(); err != nil {
1235 return err
1236 }
1237 // Prevent new requests
1238 return nil
1239 }
1240 1241 // closes the client connection immediately. In-flight requests are interrupted.
1242 // err is sent to streams.
1243 func (cc *ClientConn) closeForError(err error) {
1244 cc.mu.Lock()
1245 cc.closed = true
1246 for _, cs := range cc.streams {
1247 cs.abortStreamLocked(err)
1248 }
1249 cc.cond.Broadcast()
1250 cc.mu.Unlock()
1251 cc.closeConn()
1252 }
1253 1254 // Close closes the client connection immediately.
1255 //
1256 // In-flight requests are interrupted. For a graceful shutdown, use Shutdown instead.
1257 func (cc *ClientConn) Close() error {
1258 cc.closeForError(errClientConnForceClosed)
1259 return nil
1260 }
1261 1262 // closes the client connection immediately. In-flight requests are interrupted.
1263 func (cc *ClientConn) closeForLostPing() {
1264 err := errors.New("http2: client connection lost")
1265 if f := cc.t.CountError; f != nil {
1266 f("conn_close_lost_ping")
1267 }
1268 cc.closeForError(err)
1269 }
1270 1271 // errRequestCanceled is a copy of net/http's errRequestCanceled because it's not
1272 // exported. At least they'll be DeepEqual for h1-vs-h2 comparisons tests.
1273 var errRequestCanceled = errors.New("net/http: request canceled")
1274 1275 func (cc *ClientConn) responseHeaderTimeout() time.Duration {
1276 if cc.t.t1 != nil {
1277 return cc.t.t1.ResponseHeaderTimeout
1278 }
1279 // No way to do this (yet?) with just an http2.Transport. Probably
1280 // no need. Request.Cancel this is the new way. We only need to support
1281 // this for compatibility with the old http.Transport fields when
1282 // we're doing transparent http2.
1283 return 0
1284 }
1285 1286 // actualContentLength returns a sanitized version of
1287 // req.ContentLength, where 0 actually means zero (not unknown) and -1
1288 // means unknown.
1289 func actualContentLength(req *http.Request) int64 {
1290 if req.Body == nil || req.Body == http.NoBody {
1291 return 0
1292 }
1293 if req.ContentLength != 0 {
1294 return req.ContentLength
1295 }
1296 return -1
1297 }
1298 1299 func (cc *ClientConn) decrStreamReservations() {
1300 cc.mu.Lock()
1301 defer cc.mu.Unlock()
1302 cc.decrStreamReservationsLocked()
1303 }
1304 1305 func (cc *ClientConn) decrStreamReservationsLocked() {
1306 if cc.streamsReserved > 0 {
1307 cc.streamsReserved--
1308 }
1309 }
1310 1311 func (cc *ClientConn) RoundTrip(req *http.Request) (*http.Response, error) {
1312 return cc.roundTrip(req, nil)
1313 }
1314 1315 func (cc *ClientConn) roundTrip(req *http.Request, streamf func(*clientStream)) (*http.Response, error) {
1316 ctx := req.Context()
1317 cs := &clientStream{
1318 cc: cc,
1319 ctx: ctx,
1320 reqCancel: req.Cancel,
1321 isHead: req.Method == "HEAD",
1322 reqBody: req.Body,
1323 reqBodyContentLength: actualContentLength(req),
1324 trace: httptrace.ContextClientTrace(ctx),
1325 peerClosed: make(chan struct{}),
1326 abort: make(chan struct{}),
1327 respHeaderRecv: make(chan struct{}),
1328 donec: make(chan struct{}),
1329 }
1330 1331 cs.requestedGzip = httpcommon.IsRequestGzip(req.Method, req.Header, cc.t.disableCompression())
1332 1333 go cs.doRequest(req, streamf)
1334 1335 waitDone := func() error {
1336 select {
1337 case <-cs.donec:
1338 return nil
1339 case <-ctx.Done():
1340 return ctx.Err()
1341 case <-cs.reqCancel:
1342 return errRequestCanceled
1343 }
1344 }
1345 1346 handleResponseHeaders := func() (*http.Response, error) {
1347 res := cs.res
1348 if res.StatusCode > 299 {
1349 // On error or status code 3xx, 4xx, 5xx, etc abort any
1350 // ongoing write, assuming that the server doesn't care
1351 // about our request body. If the server replied with 1xx or
1352 // 2xx, however, then assume the server DOES potentially
1353 // want our body (e.g. full-duplex streaming:
1354 // golang.org/issue/13444). If it turns out the server
1355 // doesn't, they'll RST_STREAM us soon enough. This is a
1356 // heuristic to avoid adding knobs to Transport. Hopefully
1357 // we can keep it.
1358 cs.abortRequestBodyWrite()
1359 }
1360 res.Request = req
1361 res.TLS = cc.tlsState
1362 if res.Body == noBody && actualContentLength(req) == 0 {
1363 // If there isn't a request or response body still being
1364 // written, then wait for the stream to be closed before
1365 // RoundTrip returns.
1366 if err := waitDone(); err != nil {
1367 return nil, err
1368 }
1369 }
1370 return res, nil
1371 }
1372 1373 cancelRequest := func(cs *clientStream, err error) error {
1374 cs.cc.mu.Lock()
1375 bodyClosed := cs.reqBodyClosed
1376 cs.cc.mu.Unlock()
1377 // Wait for the request body to be closed.
1378 //
1379 // If nothing closed the body before now, abortStreamLocked
1380 // will have started a goroutine to close it.
1381 //
1382 // Closing the body before returning avoids a race condition
1383 // with net/http checking its readTrackingBody to see if the
1384 // body was read from or closed. See golang/go#60041.
1385 //
1386 // The body is closed in a separate goroutine without the
1387 // connection mutex held, but dropping the mutex before waiting
1388 // will keep us from holding it indefinitely if the body
1389 // close is slow for some reason.
1390 if bodyClosed != nil {
1391 <-bodyClosed
1392 }
1393 return err
1394 }
1395 1396 for {
1397 select {
1398 case <-cs.respHeaderRecv:
1399 return handleResponseHeaders()
1400 case <-cs.abort:
1401 select {
1402 case <-cs.respHeaderRecv:
1403 // If both cs.respHeaderRecv and cs.abort are signaling,
1404 // pick respHeaderRecv. The server probably wrote the
1405 // response and immediately reset the stream.
1406 // golang.org/issue/49645
1407 return handleResponseHeaders()
1408 default:
1409 waitDone()
1410 return nil, cs.abortErr
1411 }
1412 case <-ctx.Done():
1413 err := ctx.Err()
1414 cs.abortStream(err)
1415 return nil, cancelRequest(cs, err)
1416 case <-cs.reqCancel:
1417 cs.abortStream(errRequestCanceled)
1418 return nil, cancelRequest(cs, errRequestCanceled)
1419 }
1420 }
1421 }
1422 1423 // doRequest runs for the duration of the request lifetime.
1424 //
1425 // It sends the request and performs post-request cleanup (closing Request.Body, etc.).
1426 func (cs *clientStream) doRequest(req *http.Request, streamf func(*clientStream)) {
1427 err := cs.writeRequest(req, streamf)
1428 cs.cleanupWriteRequest(err)
1429 }
1430 1431 var errExtendedConnectNotSupported = errors.New("net/http: extended connect not supported by peer")
1432 1433 // writeRequest sends a request.
1434 //
1435 // It returns nil after the request is written, the response read,
1436 // and the request stream is half-closed by the peer.
1437 //
1438 // It returns non-nil if the request ends otherwise.
1439 // If the returned error is StreamError, the error Code may be used in resetting the stream.
1440 func (cs *clientStream) writeRequest(req *http.Request, streamf func(*clientStream)) (err error) {
1441 cc := cs.cc
1442 ctx := cs.ctx
1443 1444 // wait for setting frames to be received, a server can change this value later,
1445 // but we just wait for the first settings frame
1446 var isExtendedConnect bool
1447 if req.Method == "CONNECT" && req.Header.Get(":protocol") != "" {
1448 isExtendedConnect = true
1449 }
1450 1451 // Acquire the new-request lock by writing to reqHeaderMu.
1452 // This lock guards the critical section covering allocating a new stream ID
1453 // (requires mu) and creating the stream (requires wmu).
1454 if cc.reqHeaderMu == nil {
1455 panic("RoundTrip on uninitialized ClientConn") // for tests
1456 }
1457 if isExtendedConnect {
1458 select {
1459 case <-cs.reqCancel:
1460 return errRequestCanceled
1461 case <-ctx.Done():
1462 return ctx.Err()
1463 case <-cc.seenSettingsChan:
1464 if !cc.extendedConnectAllowed {
1465 return errExtendedConnectNotSupported
1466 }
1467 }
1468 }
1469 select {
1470 case cc.reqHeaderMu <- struct{}{}:
1471 case <-cs.reqCancel:
1472 return errRequestCanceled
1473 case <-ctx.Done():
1474 return ctx.Err()
1475 }
1476 1477 cc.mu.Lock()
1478 if cc.idleTimer != nil {
1479 cc.idleTimer.Stop()
1480 }
1481 cc.decrStreamReservationsLocked()
1482 if err := cc.awaitOpenSlotForStreamLocked(cs); err != nil {
1483 cc.mu.Unlock()
1484 <-cc.reqHeaderMu
1485 return err
1486 }
1487 cc.addStreamLocked(cs) // assigns stream ID
1488 if isConnectionCloseRequest(req) {
1489 cc.doNotReuse = true
1490 }
1491 cc.mu.Unlock()
1492 1493 if streamf != nil {
1494 streamf(cs)
1495 }
1496 1497 continueTimeout := cc.t.expectContinueTimeout()
1498 if continueTimeout != 0 {
1499 if !httpguts.HeaderValuesContainsToken(req.Header["Expect"], "100-continue") {
1500 continueTimeout = 0
1501 } else {
1502 cs.on100 = make(chan struct{}, 1)
1503 }
1504 }
1505 1506 // Past this point (where we send request headers), it is possible for
1507 // RoundTrip to return successfully. Since the RoundTrip contract permits
1508 // the caller to "mutate or reuse" the Request after closing the Response's Body,
1509 // we must take care when referencing the Request from here on.
1510 err = cs.encodeAndWriteHeaders(req)
1511 <-cc.reqHeaderMu
1512 if err != nil {
1513 return err
1514 }
1515 1516 hasBody := cs.reqBodyContentLength != 0
1517 if !hasBody {
1518 cs.sentEndStream = true
1519 } else {
1520 if continueTimeout != 0 {
1521 traceWait100Continue(cs.trace)
1522 timer := time.NewTimer(continueTimeout)
1523 select {
1524 case <-timer.C:
1525 err = nil
1526 case <-cs.on100:
1527 err = nil
1528 case <-cs.abort:
1529 err = cs.abortErr
1530 case <-ctx.Done():
1531 err = ctx.Err()
1532 case <-cs.reqCancel:
1533 err = errRequestCanceled
1534 }
1535 timer.Stop()
1536 if err != nil {
1537 traceWroteRequest(cs.trace, err)
1538 return err
1539 }
1540 }
1541 1542 if err = cs.writeRequestBody(req); err != nil {
1543 if err != errStopReqBodyWrite {
1544 traceWroteRequest(cs.trace, err)
1545 return err
1546 }
1547 } else {
1548 cs.sentEndStream = true
1549 }
1550 }
1551 1552 traceWroteRequest(cs.trace, err)
1553 1554 var respHeaderTimer <-chan time.Time
1555 var respHeaderRecv chan struct{}
1556 if d := cc.responseHeaderTimeout(); d != 0 {
1557 timer := time.NewTimer(d)
1558 defer timer.Stop()
1559 respHeaderTimer = timer.C
1560 respHeaderRecv = cs.respHeaderRecv
1561 }
1562 // Wait until the peer half-closes its end of the stream,
1563 // or until the request is aborted (via context, error, or otherwise),
1564 // whichever comes first.
1565 for {
1566 select {
1567 case <-cs.peerClosed:
1568 return nil
1569 case <-respHeaderTimer:
1570 return errTimeout
1571 case <-respHeaderRecv:
1572 respHeaderRecv = nil
1573 respHeaderTimer = nil // keep waiting for END_STREAM
1574 case <-cs.abort:
1575 return cs.abortErr
1576 case <-ctx.Done():
1577 return ctx.Err()
1578 case <-cs.reqCancel:
1579 return errRequestCanceled
1580 }
1581 }
1582 }
1583 1584 func (cs *clientStream) encodeAndWriteHeaders(req *http.Request) error {
1585 cc := cs.cc
1586 ctx := cs.ctx
1587 1588 cc.wmu.Lock()
1589 defer cc.wmu.Unlock()
1590 1591 // If the request was canceled while waiting for cc.mu, just quit.
1592 select {
1593 case <-cs.abort:
1594 return cs.abortErr
1595 case <-ctx.Done():
1596 return ctx.Err()
1597 case <-cs.reqCancel:
1598 return errRequestCanceled
1599 default:
1600 }
1601 1602 // Encode headers.
1603 //
1604 // we send: HEADERS{1}, CONTINUATION{0,} + DATA{0,} (DATA is
1605 // sent by writeRequestBody below, along with any Trailers,
1606 // again in form HEADERS{1}, CONTINUATION{0,})
1607 cc.hbuf.Reset()
1608 res, err := encodeRequestHeaders(req, cs.requestedGzip, cc.peerMaxHeaderListSize, func(name, value string) {
1609 cc.writeHeader(name, value)
1610 })
1611 if err != nil {
1612 return fmt.Errorf("http2: %w", err)
1613 }
1614 hdrs := cc.hbuf.Bytes()
1615 1616 // Write the request.
1617 endStream := !res.HasBody && !res.HasTrailers
1618 cs.sentHeaders = true
1619 err = cc.writeHeaders(cs.ID, endStream, int(cc.maxFrameSize), hdrs)
1620 traceWroteHeaders(cs.trace)
1621 return err
1622 }
1623 1624 func encodeRequestHeaders(req *http.Request, addGzipHeader bool, peerMaxHeaderListSize uint64, headerf func(name, value string)) (httpcommon.EncodeHeadersResult, error) {
1625 return httpcommon.EncodeHeaders(req.Context(), httpcommon.EncodeHeadersParam{
1626 Request: httpcommon.Request{
1627 Header: req.Header,
1628 Trailer: req.Trailer,
1629 URL: req.URL,
1630 Host: req.Host,
1631 Method: req.Method,
1632 ActualContentLength: actualContentLength(req),
1633 },
1634 AddGzipHeader: addGzipHeader,
1635 PeerMaxHeaderListSize: peerMaxHeaderListSize,
1636 DefaultUserAgent: defaultUserAgent,
1637 }, headerf)
1638 }
1639 1640 // cleanupWriteRequest performs post-request tasks.
1641 //
1642 // If err (the result of writeRequest) is non-nil and the stream is not closed,
1643 // cleanupWriteRequest will send a reset to the peer.
1644 func (cs *clientStream) cleanupWriteRequest(err error) {
1645 cc := cs.cc
1646 1647 if cs.ID == 0 {
1648 // We were canceled before creating the stream, so return our reservation.
1649 cc.decrStreamReservations()
1650 }
1651 1652 // TODO: write h12Compare test showing whether
1653 // Request.Body is closed by the Transport,
1654 // and in multiple cases: server replies <=299 and >299
1655 // while still writing request body
1656 cc.mu.Lock()
1657 mustCloseBody := false
1658 if cs.reqBody != nil && cs.reqBodyClosed == nil {
1659 mustCloseBody = true
1660 cs.reqBodyClosed = make(chan struct{})
1661 }
1662 bodyClosed := cs.reqBodyClosed
1663 closeOnIdle := cc.singleUse || cc.doNotReuse || cc.t.disableKeepAlives() || cc.goAway != nil
1664 // Have we read any frames from the connection since sending this request?
1665 readSinceStream := cc.readBeforeStreamID > cs.ID
1666 cc.mu.Unlock()
1667 if mustCloseBody {
1668 cs.reqBody.Close()
1669 close(bodyClosed)
1670 }
1671 if bodyClosed != nil {
1672 <-bodyClosed
1673 }
1674 1675 if err != nil && cs.sentEndStream {
1676 // If the connection is closed immediately after the response is read,
1677 // we may be aborted before finishing up here. If the stream was closed
1678 // cleanly on both sides, there is no error.
1679 select {
1680 case <-cs.peerClosed:
1681 err = nil
1682 default:
1683 }
1684 }
1685 if err != nil {
1686 cs.abortStream(err) // possibly redundant, but harmless
1687 if cs.sentHeaders {
1688 if se, ok := err.(StreamError); ok {
1689 if se.Cause != errFromPeer {
1690 cc.writeStreamReset(cs.ID, se.Code, false, err)
1691 }
1692 } else {
1693 // We're cancelling an in-flight request.
1694 //
1695 // This could be due to the server becoming unresponsive.
1696 // To avoid sending too many requests on a dead connection,
1697 // if we haven't read any frames from the connection since
1698 // sending this request, we let it continue to consume
1699 // a concurrency slot until we can confirm the server is
1700 // still responding.
1701 // We do this by sending a PING frame along with the RST_STREAM
1702 // (unless a ping is already in flight).
1703 //
1704 // For simplicity, we don't bother tracking the PING payload:
1705 // We reset cc.pendingResets any time we receive a PING ACK.
1706 //
1707 // We skip this if the conn is going to be closed on idle,
1708 // because it's short lived and will probably be closed before
1709 // we get the ping response.
1710 ping := false
1711 if !closeOnIdle && !readSinceStream {
1712 cc.mu.Lock()
1713 // rstStreamPingsBlocked works around a gRPC behavior:
1714 // see comment on the field for details.
1715 if !cc.rstStreamPingsBlocked {
1716 if cc.pendingResets == 0 {
1717 ping = true
1718 }
1719 cc.pendingResets++
1720 }
1721 cc.mu.Unlock()
1722 }
1723 cc.writeStreamReset(cs.ID, ErrCodeCancel, ping, err)
1724 }
1725 }
1726 cs.bufPipe.CloseWithError(err) // no-op if already closed
1727 } else {
1728 if cs.sentHeaders && !cs.sentEndStream {
1729 cc.writeStreamReset(cs.ID, ErrCodeNo, false, nil)
1730 }
1731 cs.bufPipe.CloseWithError(errRequestCanceled)
1732 }
1733 if cs.ID != 0 {
1734 cc.forgetStreamID(cs.ID)
1735 }
1736 1737 cc.wmu.Lock()
1738 werr := cc.werr
1739 cc.wmu.Unlock()
1740 if werr != nil {
1741 cc.Close()
1742 }
1743 1744 close(cs.donec)
1745 cc.maybeCallStateHook()
1746 }
1747 1748 // awaitOpenSlotForStreamLocked waits until len(streams) < maxConcurrentStreams.
1749 // Must hold cc.mu.
1750 func (cc *ClientConn) awaitOpenSlotForStreamLocked(cs *clientStream) error {
1751 for {
1752 if cc.closed && cc.nextStreamID == 1 && cc.streamsReserved == 0 {
1753 // This is the very first request sent to this connection.
1754 // Return a fatal error which aborts the retry loop.
1755 return errClientConnNotEstablished
1756 }
1757 cc.lastActive = time.Now()
1758 if cc.closed || !cc.canTakeNewRequestLocked() {
1759 return errClientConnUnusable
1760 }
1761 cc.lastIdle = time.Time{}
1762 if cc.currentRequestCountLocked() < int(cc.maxConcurrentStreams) {
1763 return nil
1764 }
1765 cc.pendingRequests++
1766 cc.cond.Wait()
1767 cc.pendingRequests--
1768 select {
1769 case <-cs.abort:
1770 return cs.abortErr
1771 default:
1772 }
1773 }
1774 }
1775 1776 // requires cc.wmu be held
1777 func (cc *ClientConn) writeHeaders(streamID uint32, endStream bool, maxFrameSize int, hdrs []byte) error {
1778 first := true // first frame written (HEADERS is first, then CONTINUATION)
1779 for len(hdrs) > 0 && cc.werr == nil {
1780 chunk := hdrs
1781 if len(chunk) > maxFrameSize {
1782 chunk = chunk[:maxFrameSize]
1783 }
1784 hdrs = hdrs[len(chunk):]
1785 endHeaders := len(hdrs) == 0
1786 if first {
1787 cc.fr.WriteHeaders(HeadersFrameParam{
1788 StreamID: streamID,
1789 BlockFragment: chunk,
1790 EndStream: endStream,
1791 EndHeaders: endHeaders,
1792 })
1793 first = false
1794 } else {
1795 cc.fr.WriteContinuation(streamID, endHeaders, chunk)
1796 }
1797 }
1798 cc.bw.Flush()
1799 return cc.werr
1800 }
1801 1802 // internal error values; they don't escape to callers
1803 var (
1804 // abort request body write; don't send cancel
1805 errStopReqBodyWrite = errors.New("http2: aborting request body write")
1806 1807 // abort request body write, but send stream reset of cancel.
1808 errStopReqBodyWriteAndCancel = errors.New("http2: canceling request")
1809 1810 errReqBodyTooLong = errors.New("http2: request body larger than specified content length")
1811 )
1812 1813 // frameScratchBufferLen returns the length of a buffer to use for
1814 // outgoing request bodies to read/write to/from.
1815 //
1816 // It returns max(1, min(peer's advertised max frame size,
1817 // Request.ContentLength+1, 512KB)).
1818 func (cs *clientStream) frameScratchBufferLen(maxFrameSize int) int {
1819 const max = 512 << 10
1820 n := int64(maxFrameSize)
1821 if n > max {
1822 n = max
1823 }
1824 if cl := cs.reqBodyContentLength; cl != -1 && cl+1 < n {
1825 // Add an extra byte past the declared content-length to
1826 // give the caller's Request.Body io.Reader a chance to
1827 // give us more bytes than they declared, so we can catch it
1828 // early.
1829 n = cl + 1
1830 }
1831 if n < 1 {
1832 return 1
1833 }
1834 return int(n) // doesn't truncate; max is 512K
1835 }
1836 1837 // Seven bufPools manage different frame sizes. This helps to avoid scenarios where long-running
1838 // streaming requests using small frame sizes occupy large buffers initially allocated for prior
1839 // requests needing big buffers. The size ranges are as follows:
1840 // {0 KB, 16 KB], {16 KB, 32 KB], {32 KB, 64 KB], {64 KB, 128 KB], {128 KB, 256 KB],
1841 // {256 KB, 512 KB], {512 KB, infinity}
1842 // In practice, the maximum scratch buffer size should not exceed 512 KB due to
1843 // frameScratchBufferLen(maxFrameSize), thus the "infinity pool" should never be used.
1844 // It exists mainly as a safety measure, for potential future increases in max buffer size.
1845 var bufPools [7]sync.Pool // of *[]byte
1846 func bufPoolIndex(size int) int {
1847 if size <= 16384 {
1848 return 0
1849 }
1850 size -= 1
1851 bits := bits.Len(uint(size))
1852 index := bits - 14
1853 if index >= len(bufPools) {
1854 return len(bufPools) - 1
1855 }
1856 return index
1857 }
1858 1859 func (cs *clientStream) writeRequestBody(req *http.Request) (err error) {
1860 cc := cs.cc
1861 body := cs.reqBody
1862 sentEnd := false // whether we sent the final DATA frame w/ END_STREAM
1863 1864 hasTrailers := req.Trailer != nil
1865 remainLen := cs.reqBodyContentLength
1866 hasContentLen := remainLen != -1
1867 1868 cc.mu.Lock()
1869 maxFrameSize := int(cc.maxFrameSize)
1870 cc.mu.Unlock()
1871 1872 // Scratch buffer for reading into & writing from.
1873 scratchLen := cs.frameScratchBufferLen(maxFrameSize)
1874 var buf []byte
1875 index := bufPoolIndex(scratchLen)
1876 if bp, ok := bufPools[index].Get().(*[]byte); ok && len(*bp) >= scratchLen {
1877 defer bufPools[index].Put(bp)
1878 buf = *bp
1879 } else {
1880 buf = make([]byte, scratchLen)
1881 defer bufPools[index].Put(&buf)
1882 }
1883 1884 var sawEOF bool
1885 for !sawEOF {
1886 n, err := body.Read(buf)
1887 if hasContentLen {
1888 remainLen -= int64(n)
1889 if remainLen == 0 && err == nil {
1890 // The request body's Content-Length was predeclared and
1891 // we just finished reading it all, but the underlying io.Reader
1892 // returned the final chunk with a nil error (which is one of
1893 // the two valid things a Reader can do at EOF). Because we'd prefer
1894 // to send the END_STREAM bit early, double-check that we're actually
1895 // at EOF. Subsequent reads should return (0, EOF) at this point.
1896 // If either value is different, we return an error in one of two ways below.
1897 var scratch [1]byte
1898 var n1 int
1899 n1, err = body.Read(scratch[:])
1900 remainLen -= int64(n1)
1901 }
1902 if remainLen < 0 {
1903 err = errReqBodyTooLong
1904 return err
1905 }
1906 }
1907 if err != nil {
1908 cc.mu.Lock()
1909 bodyClosed := cs.reqBodyClosed != nil
1910 cc.mu.Unlock()
1911 switch {
1912 case bodyClosed:
1913 return errStopReqBodyWrite
1914 case err == io.EOF:
1915 sawEOF = true
1916 err = nil
1917 default:
1918 return err
1919 }
1920 }
1921 1922 remain := buf[:n]
1923 for len(remain) > 0 && err == nil {
1924 var allowed int32
1925 allowed, err = cs.awaitFlowControl(len(remain))
1926 if err != nil {
1927 return err
1928 }
1929 cc.wmu.Lock()
1930 data := remain[:allowed]
1931 remain = remain[allowed:]
1932 sentEnd = sawEOF && len(remain) == 0 && !hasTrailers
1933 err = cc.fr.WriteData(cs.ID, sentEnd, data)
1934 if err == nil {
1935 // TODO(bradfitz): this flush is for latency, not bandwidth.
1936 // Most requests won't need this. Make this opt-in or
1937 // opt-out? Use some heuristic on the body type? Nagel-like
1938 // timers? Based on 'n'? Only last chunk of this for loop,
1939 // unless flow control tokens are low? For now, always.
1940 // If we change this, see comment below.
1941 err = cc.bw.Flush()
1942 }
1943 cc.wmu.Unlock()
1944 }
1945 if err != nil {
1946 return err
1947 }
1948 }
1949 1950 if sentEnd {
1951 // Already sent END_STREAM (which implies we have no
1952 // trailers) and flushed, because currently all
1953 // WriteData frames above get a flush. So we're done.
1954 return nil
1955 }
1956 1957 // Since the RoundTrip contract permits the caller to "mutate or reuse"
1958 // a request after the Response's Body is closed, verify that this hasn't
1959 // happened before accessing the trailers.
1960 cc.mu.Lock()
1961 trailer := req.Trailer
1962 err = cs.abortErr
1963 cc.mu.Unlock()
1964 if err != nil {
1965 return err
1966 }
1967 1968 cc.wmu.Lock()
1969 defer cc.wmu.Unlock()
1970 var trls []byte
1971 if len(trailer) > 0 {
1972 trls, err = cc.encodeTrailers(trailer)
1973 if err != nil {
1974 return err
1975 }
1976 }
1977 1978 // Two ways to send END_STREAM: either with trailers, or
1979 // with an empty DATA frame.
1980 if len(trls) > 0 {
1981 err = cc.writeHeaders(cs.ID, true, maxFrameSize, trls)
1982 } else {
1983 err = cc.fr.WriteData(cs.ID, true, nil)
1984 }
1985 if ferr := cc.bw.Flush(); ferr != nil && err == nil {
1986 err = ferr
1987 }
1988 return err
1989 }
1990 1991 // awaitFlowControl waits for [1, min(maxBytes, cc.cs.maxFrameSize)] flow
1992 // control tokens from the server.
1993 // It returns either the non-zero number of tokens taken or an error
1994 // if the stream is dead.
1995 func (cs *clientStream) awaitFlowControl(maxBytes int) (taken int32, err error) {
1996 cc := cs.cc
1997 ctx := cs.ctx
1998 cc.mu.Lock()
1999 defer cc.mu.Unlock()
2000 for {
2001 if cc.closed {
2002 return 0, errClientConnClosed
2003 }
2004 if cs.reqBodyClosed != nil {
2005 return 0, errStopReqBodyWrite
2006 }
2007 select {
2008 case <-cs.abort:
2009 return 0, cs.abortErr
2010 case <-ctx.Done():
2011 return 0, ctx.Err()
2012 case <-cs.reqCancel:
2013 return 0, errRequestCanceled
2014 default:
2015 }
2016 if a := cs.flow.available(); a > 0 {
2017 take := a
2018 if int(take) > maxBytes {
2019 2020 take = int32(maxBytes) // can't truncate int; take is int32
2021 }
2022 if take > int32(cc.maxFrameSize) {
2023 take = int32(cc.maxFrameSize)
2024 }
2025 cs.flow.take(take)
2026 return take, nil
2027 }
2028 cc.cond.Wait()
2029 }
2030 }
2031 2032 // requires cc.wmu be held.
2033 func (cc *ClientConn) encodeTrailers(trailer http.Header) ([]byte, error) {
2034 cc.hbuf.Reset()
2035 2036 hlSize := uint64(0)
2037 for k, vv := range trailer {
2038 for _, v := range vv {
2039 hf := hpack.HeaderField{Name: k, Value: v}
2040 hlSize += uint64(hf.Size())
2041 }
2042 }
2043 if hlSize > cc.peerMaxHeaderListSize {
2044 return nil, errRequestHeaderListSize
2045 }
2046 2047 for k, vv := range trailer {
2048 lowKey, ascii := httpcommon.LowerHeader(k)
2049 if !ascii {
2050 // Skip writing invalid headers. Per RFC 7540, Section 8.1.2, header
2051 // field names have to be ASCII characters (just as in HTTP/1.x).
2052 continue
2053 }
2054 // Transfer-Encoding, etc.. have already been filtered at the
2055 // start of RoundTrip
2056 for _, v := range vv {
2057 cc.writeHeader(lowKey, v)
2058 }
2059 }
2060 return cc.hbuf.Bytes(), nil
2061 }
2062 2063 func (cc *ClientConn) writeHeader(name, value string) {
2064 if VerboseLogs {
2065 log.Printf("http2: Transport encoding header %q = %q", name, value)
2066 }
2067 cc.henc.WriteField(hpack.HeaderField{Name: name, Value: value})
2068 }
2069 2070 type resAndError struct {
2071 _ incomparable
2072 res *http.Response
2073 err error
2074 }
2075 2076 // requires cc.mu be held.
2077 func (cc *ClientConn) addStreamLocked(cs *clientStream) {
2078 cs.flow.add(int32(cc.initialWindowSize))
2079 cs.flow.setConnFlow(&cc.flow)
2080 cs.inflow.init(cc.initialStreamRecvWindowSize)
2081 cs.ID = cc.nextStreamID
2082 cc.nextStreamID += 2
2083 cc.streams[cs.ID] = cs
2084 if cs.ID == 0 {
2085 panic("assigned stream ID 0")
2086 }
2087 }
2088 2089 func (cc *ClientConn) forgetStreamID(id uint32) {
2090 cc.mu.Lock()
2091 slen := len(cc.streams)
2092 delete(cc.streams, id)
2093 if len(cc.streams) != slen-1 {
2094 panic("forgetting unknown stream id")
2095 }
2096 cc.lastActive = time.Now()
2097 if len(cc.streams) == 0 && cc.idleTimer != nil {
2098 cc.idleTimer.Reset(cc.idleTimeout)
2099 cc.lastIdle = time.Now()
2100 }
2101 // Wake up writeRequestBody via clientStream.awaitFlowControl and
2102 // wake up RoundTrip if there is a pending request.
2103 cc.cond.Broadcast()
2104 2105 closeOnIdle := cc.singleUse || cc.doNotReuse || cc.t.disableKeepAlives() || cc.goAway != nil
2106 if closeOnIdle && cc.streamsReserved == 0 && len(cc.streams) == 0 {
2107 if VerboseLogs {
2108 cc.vlogf("http2: Transport closing idle conn %p (forSingleUse=%v, maxStream=%v)", cc, cc.singleUse, cc.nextStreamID-2)
2109 }
2110 cc.closed = true
2111 defer cc.closeConn()
2112 }
2113 2114 cc.mu.Unlock()
2115 }
2116 2117 // clientConnReadLoop is the state owned by the clientConn's frame-reading readLoop.
2118 type clientConnReadLoop struct {
2119 _ incomparable
2120 cc *ClientConn
2121 }
2122 2123 // readLoop runs in its own goroutine and reads and dispatches frames.
2124 func (cc *ClientConn) readLoop() {
2125 rl := &clientConnReadLoop{cc: cc}
2126 defer rl.cleanup()
2127 cc.readerErr = rl.run()
2128 if ce, ok := cc.readerErr.(ConnectionError); ok {
2129 cc.wmu.Lock()
2130 cc.fr.WriteGoAway(0, ErrCode(ce), nil)
2131 cc.wmu.Unlock()
2132 }
2133 }
2134 2135 // GoAwayError is returned by the Transport when the server closes the
2136 // TCP connection after sending a GOAWAY frame.
2137 type GoAwayError struct {
2138 LastStreamID uint32
2139 ErrCode ErrCode
2140 DebugData string
2141 }
2142 2143 func (e GoAwayError) Error() string {
2144 return fmt.Sprintf("http2: server sent GOAWAY and closed the connection; LastStreamID=%v, ErrCode=%v, debug=%q",
2145 e.LastStreamID, e.ErrCode, e.DebugData)
2146 }
2147 2148 func isEOFOrNetReadError(err error) bool {
2149 if err == io.EOF {
2150 return true
2151 }
2152 ne, ok := err.(*net.OpError)
2153 return ok && ne.Op == "read"
2154 }
2155 2156 func (rl *clientConnReadLoop) cleanup() {
2157 cc := rl.cc
2158 defer cc.closeConn()
2159 defer close(cc.readerDone)
2160 2161 if cc.idleTimer != nil {
2162 cc.idleTimer.Stop()
2163 }
2164 2165 // Close any response bodies if the server closes prematurely.
2166 // TODO: also do this if we've written the headers but not
2167 // gotten a response yet.
2168 err := cc.readerErr
2169 cc.mu.Lock()
2170 if cc.goAway != nil && isEOFOrNetReadError(err) {
2171 err = GoAwayError{
2172 LastStreamID: cc.goAway.LastStreamID,
2173 ErrCode: cc.goAway.ErrCode,
2174 DebugData: cc.goAwayDebug,
2175 }
2176 } else if err == io.EOF {
2177 err = io.ErrUnexpectedEOF
2178 }
2179 cc.closed = true
2180 2181 // If the connection has never been used, and has been open for only a short time,
2182 // leave it in the connection pool for a little while.
2183 //
2184 // This avoids a situation where new connections are constantly created,
2185 // added to the pool, fail, and are removed from the pool, without any error
2186 // being surfaced to the user.
2187 unusedWaitTime := 5 * time.Second
2188 if cc.idleTimeout > 0 && unusedWaitTime > cc.idleTimeout {
2189 unusedWaitTime = cc.idleTimeout
2190 }
2191 idleTime := time.Now().Sub(cc.lastActive)
2192 if atomic.LoadUint32(&cc.atomicReused) == 0 && idleTime < unusedWaitTime && !cc.closedOnIdle {
2193 cc.idleTimer = time.AfterFunc(unusedWaitTime-idleTime, func() {
2194 cc.t.connPool().MarkDead(cc)
2195 })
2196 } else {
2197 cc.mu.Unlock() // avoid any deadlocks in MarkDead
2198 cc.t.connPool().MarkDead(cc)
2199 cc.mu.Lock()
2200 }
2201 2202 for _, cs := range cc.streams {
2203 select {
2204 case <-cs.peerClosed:
2205 // The server closed the stream before closing the conn,
2206 // so no need to interrupt it.
2207 default:
2208 cs.abortStreamLocked(err)
2209 }
2210 }
2211 cc.cond.Broadcast()
2212 cc.mu.Unlock()
2213 2214 if !cc.seenSettings {
2215 // If we have a pending request that wants extended CONNECT,
2216 // let it continue and fail with the connection error.
2217 cc.extendedConnectAllowed = true
2218 close(cc.seenSettingsChan)
2219 }
2220 }
2221 2222 // countReadFrameError calls Transport.CountError with a string
2223 // representing err.
2224 func (cc *ClientConn) countReadFrameError(err error) {
2225 f := cc.t.CountError
2226 if f == nil || err == nil {
2227 return
2228 }
2229 if ce, ok := err.(ConnectionError); ok {
2230 errCode := ErrCode(ce)
2231 f(fmt.Sprintf("read_frame_conn_error_%s", errCode.stringToken()))
2232 return
2233 }
2234 if errors.Is(err, io.EOF) {
2235 f("read_frame_eof")
2236 return
2237 }
2238 if errors.Is(err, io.ErrUnexpectedEOF) {
2239 f("read_frame_unexpected_eof")
2240 return
2241 }
2242 if errors.Is(err, ErrFrameTooLarge) {
2243 f("read_frame_too_large")
2244 return
2245 }
2246 f("read_frame_other")
2247 }
2248 2249 func (rl *clientConnReadLoop) run() error {
2250 cc := rl.cc
2251 gotSettings := false
2252 readIdleTimeout := cc.readIdleTimeout
2253 var t *time.Timer
2254 if readIdleTimeout != 0 {
2255 t = time.AfterFunc(readIdleTimeout, cc.healthCheck)
2256 }
2257 for {
2258 f, err := cc.fr.ReadFrame()
2259 if t != nil {
2260 t.Reset(readIdleTimeout)
2261 }
2262 if err != nil {
2263 cc.vlogf("http2: Transport readFrame error on conn %p: (%T) %v", cc, err, err)
2264 }
2265 if se, ok := err.(StreamError); ok {
2266 if cs := rl.streamByID(se.StreamID, notHeaderOrDataFrame); cs != nil {
2267 if se.Cause == nil {
2268 se.Cause = cc.fr.errDetail
2269 }
2270 rl.endStreamError(cs, se)
2271 }
2272 continue
2273 } else if err != nil {
2274 cc.countReadFrameError(err)
2275 return err
2276 }
2277 if VerboseLogs {
2278 cc.vlogf("http2: Transport received %s", summarizeFrame(f))
2279 }
2280 if !gotSettings {
2281 if _, ok := f.(*SettingsFrame); !ok {
2282 cc.logf("protocol error: received %T before a SETTINGS frame", f)
2283 return ConnectionError(ErrCodeProtocol)
2284 }
2285 gotSettings = true
2286 }
2287 2288 switch f := f.(type) {
2289 case *MetaHeadersFrame:
2290 err = rl.processHeaders(f)
2291 case *DataFrame:
2292 err = rl.processData(f)
2293 case *GoAwayFrame:
2294 err = rl.processGoAway(f)
2295 case *RSTStreamFrame:
2296 err = rl.processResetStream(f)
2297 case *SettingsFrame:
2298 err = rl.processSettings(f)
2299 case *PushPromiseFrame:
2300 err = rl.processPushPromise(f)
2301 case *WindowUpdateFrame:
2302 err = rl.processWindowUpdate(f)
2303 case *PingFrame:
2304 err = rl.processPing(f)
2305 default:
2306 cc.logf("Transport: unhandled response frame type %T", f)
2307 }
2308 if err != nil {
2309 if VerboseLogs {
2310 cc.vlogf("http2: Transport conn %p received error from processing frame %v: %v", cc, summarizeFrame(f), err)
2311 }
2312 return err
2313 }
2314 }
2315 }
2316 2317 func (rl *clientConnReadLoop) processHeaders(f *MetaHeadersFrame) error {
2318 cs := rl.streamByID(f.StreamID, headerOrDataFrame)
2319 if cs == nil {
2320 // We'd get here if we canceled a request while the
2321 // server had its response still in flight. So if this
2322 // was just something we canceled, ignore it.
2323 return nil
2324 }
2325 if cs.readClosed {
2326 rl.endStreamError(cs, StreamError{
2327 StreamID: f.StreamID,
2328 Code: ErrCodeProtocol,
2329 Cause: errors.New("protocol error: headers after END_STREAM"),
2330 })
2331 return nil
2332 }
2333 if !cs.firstByte {
2334 if cs.trace != nil {
2335 // TODO(bradfitz): move first response byte earlier,
2336 // when we first read the 9 byte header, not waiting
2337 // until all the HEADERS+CONTINUATION frames have been
2338 // merged. This works for now.
2339 traceFirstResponseByte(cs.trace)
2340 }
2341 cs.firstByte = true
2342 }
2343 if !cs.pastHeaders {
2344 cs.pastHeaders = true
2345 } else {
2346 return rl.processTrailers(cs, f)
2347 }
2348 2349 res, err := rl.handleResponse(cs, f)
2350 if err != nil {
2351 if _, ok := err.(ConnectionError); ok {
2352 return err
2353 }
2354 // Any other error type is a stream error.
2355 rl.endStreamError(cs, StreamError{
2356 StreamID: f.StreamID,
2357 Code: ErrCodeProtocol,
2358 Cause: err,
2359 })
2360 return nil // return nil from process* funcs to keep conn alive
2361 }
2362 if res == nil {
2363 // (nil, nil) special case. See handleResponse docs.
2364 return nil
2365 }
2366 cs.resTrailer = &res.Trailer
2367 cs.res = res
2368 close(cs.respHeaderRecv)
2369 if f.StreamEnded() {
2370 rl.endStream(cs)
2371 }
2372 return nil
2373 }
2374 2375 // may return error types nil, or ConnectionError. Any other error value
2376 // is a StreamError of type ErrCodeProtocol. The returned error in that case
2377 // is the detail.
2378 //
2379 // As a special case, handleResponse may return (nil, nil) to skip the
2380 // frame (currently only used for 1xx responses).
2381 func (rl *clientConnReadLoop) handleResponse(cs *clientStream, f *MetaHeadersFrame) (*http.Response, error) {
2382 if f.Truncated {
2383 return nil, errResponseHeaderListSize
2384 }
2385 2386 status := f.PseudoValue("status")
2387 if status == "" {
2388 return nil, errors.New("malformed response from server: missing status pseudo header")
2389 }
2390 statusCode, err := strconv.Atoi(status)
2391 if err != nil {
2392 return nil, errors.New("malformed response from server: malformed non-numeric status pseudo header")
2393 }
2394 2395 regularFields := f.RegularFields()
2396 strs := make([]string, len(regularFields))
2397 header := make(http.Header, len(regularFields))
2398 res := &http.Response{
2399 Proto: "HTTP/2.0",
2400 ProtoMajor: 2,
2401 Header: header,
2402 StatusCode: statusCode,
2403 Status: status + " " + http.StatusText(statusCode),
2404 }
2405 for _, hf := range regularFields {
2406 key := httpcommon.CanonicalHeader(hf.Name)
2407 if key == "Trailer" {
2408 t := res.Trailer
2409 if t == nil {
2410 t = make(http.Header)
2411 res.Trailer = t
2412 }
2413 foreachHeaderElement(hf.Value, func(v string) {
2414 t[httpcommon.CanonicalHeader(v)] = nil
2415 })
2416 } else {
2417 vv := header[key]
2418 if vv == nil && len(strs) > 0 {
2419 // More than likely this will be a single-element key.
2420 // Most headers aren't multi-valued.
2421 // Set the capacity on strs[0] to 1, so any future append
2422 // won't extend the slice into the other strings.
2423 vv, strs = strs[:1:1], strs[1:]
2424 vv[0] = hf.Value
2425 header[key] = vv
2426 } else {
2427 header[key] = append(vv, hf.Value)
2428 }
2429 }
2430 }
2431 2432 if statusCode >= 100 && statusCode <= 199 {
2433 if f.StreamEnded() {
2434 return nil, errors.New("1xx informational response with END_STREAM flag")
2435 }
2436 if fn := cs.get1xxTraceFunc(); fn != nil {
2437 // If the 1xx response is being delivered to the user,
2438 // then they're responsible for limiting the number
2439 // of responses.
2440 if err := fn(statusCode, textproto.MIMEHeader(header)); err != nil {
2441 return nil, err
2442 }
2443 } else {
2444 // If the user didn't examine the 1xx response, then we
2445 // limit the size of all 1xx headers.
2446 //
2447 // This differs a bit from the HTTP/1 implementation, which
2448 // limits the size of all 1xx headers plus the final response.
2449 // Use the larger limit of MaxHeaderListSize and
2450 // net/http.Transport.MaxResponseHeaderBytes.
2451 limit := int64(cs.cc.t.maxHeaderListSize())
2452 if t1 := cs.cc.t.t1; t1 != nil && t1.MaxResponseHeaderBytes > limit {
2453 limit = t1.MaxResponseHeaderBytes
2454 }
2455 for _, h := range f.Fields {
2456 cs.totalHeaderSize += int64(h.Size())
2457 }
2458 if cs.totalHeaderSize > limit {
2459 if VerboseLogs {
2460 log.Printf("http2: 1xx informational responses too large")
2461 }
2462 return nil, errors.New("header list too large")
2463 }
2464 }
2465 if statusCode == 100 {
2466 traceGot100Continue(cs.trace)
2467 select {
2468 case cs.on100 <- struct{}{}:
2469 default:
2470 }
2471 }
2472 cs.pastHeaders = false // do it all again
2473 return nil, nil
2474 }
2475 2476 res.ContentLength = -1
2477 if clens := res.Header["Content-Length"]; len(clens) == 1 {
2478 if cl, err := strconv.ParseUint(clens[0], 10, 63); err == nil {
2479 res.ContentLength = int64(cl)
2480 } else {
2481 // TODO: care? unlike http/1, it won't mess up our framing, so it's
2482 // more safe smuggling-wise to ignore.
2483 }
2484 } else if len(clens) > 1 {
2485 // TODO: care? unlike http/1, it won't mess up our framing, so it's
2486 // more safe smuggling-wise to ignore.
2487 } else if f.StreamEnded() && !cs.isHead {
2488 res.ContentLength = 0
2489 }
2490 2491 if cs.isHead {
2492 res.Body = noBody
2493 return res, nil
2494 }
2495 2496 if f.StreamEnded() {
2497 if res.ContentLength > 0 {
2498 res.Body = missingBody{}
2499 } else {
2500 res.Body = noBody
2501 }
2502 return res, nil
2503 }
2504 2505 cs.bufPipe.setBuffer(&dataBuffer{expected: res.ContentLength})
2506 cs.bytesRemain = res.ContentLength
2507 res.Body = transportResponseBody{cs}
2508 2509 if cs.requestedGzip && asciiEqualFold(res.Header.Get("Content-Encoding"), "gzip") {
2510 res.Header.Del("Content-Encoding")
2511 res.Header.Del("Content-Length")
2512 res.ContentLength = -1
2513 res.Body = &gzipReader{body: res.Body}
2514 res.Uncompressed = true
2515 }
2516 return res, nil
2517 }
2518 2519 func (rl *clientConnReadLoop) processTrailers(cs *clientStream, f *MetaHeadersFrame) error {
2520 if cs.pastTrailers {
2521 // Too many HEADERS frames for this stream.
2522 return ConnectionError(ErrCodeProtocol)
2523 }
2524 cs.pastTrailers = true
2525 if !f.StreamEnded() {
2526 // We expect that any headers for trailers also
2527 // has END_STREAM.
2528 return ConnectionError(ErrCodeProtocol)
2529 }
2530 if len(f.PseudoFields()) > 0 {
2531 // No pseudo header fields are defined for trailers.
2532 // TODO: ConnectionError might be overly harsh? Check.
2533 return ConnectionError(ErrCodeProtocol)
2534 }
2535 2536 trailer := make(http.Header)
2537 for _, hf := range f.RegularFields() {
2538 key := httpcommon.CanonicalHeader(hf.Name)
2539 trailer[key] = append(trailer[key], hf.Value)
2540 }
2541 cs.trailer = trailer
2542 2543 rl.endStream(cs)
2544 return nil
2545 }
2546 2547 // transportResponseBody is the concrete type of Transport.RoundTrip's
2548 // Response.Body. It is an io.ReadCloser.
2549 type transportResponseBody struct {
2550 cs *clientStream
2551 }
2552 2553 func (b transportResponseBody) Read(p []byte) (n int, err error) {
2554 cs := b.cs
2555 cc := cs.cc
2556 2557 if cs.readErr != nil {
2558 return 0, cs.readErr
2559 }
2560 n, err = b.cs.bufPipe.Read(p)
2561 if cs.bytesRemain != -1 {
2562 if int64(n) > cs.bytesRemain {
2563 n = int(cs.bytesRemain)
2564 if err == nil {
2565 err = errors.New("net/http: server replied with more than declared Content-Length; truncated")
2566 cs.abortStream(err)
2567 }
2568 cs.readErr = err
2569 return int(cs.bytesRemain), err
2570 }
2571 cs.bytesRemain -= int64(n)
2572 if err == io.EOF && cs.bytesRemain > 0 {
2573 err = io.ErrUnexpectedEOF
2574 cs.readErr = err
2575 return n, err
2576 }
2577 }
2578 if n == 0 {
2579 // No flow control tokens to send back.
2580 return
2581 }
2582 2583 cc.mu.Lock()
2584 connAdd := cc.inflow.add(n)
2585 var streamAdd int32
2586 if err == nil { // No need to refresh if the stream is over or failed.
2587 streamAdd = cs.inflow.add(n)
2588 }
2589 cc.mu.Unlock()
2590 2591 if connAdd != 0 || streamAdd != 0 {
2592 cc.wmu.Lock()
2593 defer cc.wmu.Unlock()
2594 if connAdd != 0 {
2595 cc.fr.WriteWindowUpdate(0, mustUint31(connAdd))
2596 }
2597 if streamAdd != 0 {
2598 cc.fr.WriteWindowUpdate(cs.ID, mustUint31(streamAdd))
2599 }
2600 cc.bw.Flush()
2601 }
2602 return
2603 }
2604 2605 var errClosedResponseBody = errors.New("http2: response body closed")
2606 2607 func (b transportResponseBody) Close() error {
2608 cs := b.cs
2609 cc := cs.cc
2610 2611 cs.bufPipe.BreakWithError(errClosedResponseBody)
2612 cs.abortStream(errClosedResponseBody)
2613 2614 unread := cs.bufPipe.Len()
2615 if unread > 0 {
2616 cc.mu.Lock()
2617 // Return connection-level flow control.
2618 connAdd := cc.inflow.add(unread)
2619 cc.mu.Unlock()
2620 2621 // TODO(dneil): Acquiring this mutex can block indefinitely.
2622 // Move flow control return to a goroutine?
2623 cc.wmu.Lock()
2624 // Return connection-level flow control.
2625 if connAdd > 0 {
2626 cc.fr.WriteWindowUpdate(0, uint32(connAdd))
2627 }
2628 cc.bw.Flush()
2629 cc.wmu.Unlock()
2630 }
2631 2632 select {
2633 case <-cs.donec:
2634 case <-cs.ctx.Done():
2635 // See golang/go#49366: The net/http package can cancel the
2636 // request context after the response body is fully read.
2637 // Don't treat this as an error.
2638 return nil
2639 case <-cs.reqCancel:
2640 return errRequestCanceled
2641 }
2642 return nil
2643 }
2644 2645 func (rl *clientConnReadLoop) processData(f *DataFrame) error {
2646 cc := rl.cc
2647 cs := rl.streamByID(f.StreamID, headerOrDataFrame)
2648 data := f.Data()
2649 if cs == nil {
2650 cc.mu.Lock()
2651 neverSent := cc.nextStreamID
2652 cc.mu.Unlock()
2653 if f.StreamID >= neverSent {
2654 // We never asked for this.
2655 cc.logf("http2: Transport received unsolicited DATA frame; closing connection")
2656 return ConnectionError(ErrCodeProtocol)
2657 }
2658 // We probably did ask for this, but canceled. Just ignore it.
2659 // TODO: be stricter here? only silently ignore things which
2660 // we canceled, but not things which were closed normally
2661 // by the peer? Tough without accumulating too much state.
2662 2663 // But at least return their flow control:
2664 if f.Length > 0 {
2665 cc.mu.Lock()
2666 ok := cc.inflow.take(f.Length)
2667 connAdd := cc.inflow.add(int(f.Length))
2668 cc.mu.Unlock()
2669 if !ok {
2670 return ConnectionError(ErrCodeFlowControl)
2671 }
2672 if connAdd > 0 {
2673 cc.wmu.Lock()
2674 cc.fr.WriteWindowUpdate(0, uint32(connAdd))
2675 cc.bw.Flush()
2676 cc.wmu.Unlock()
2677 }
2678 }
2679 return nil
2680 }
2681 if cs.readClosed {
2682 cc.logf("protocol error: received DATA after END_STREAM")
2683 rl.endStreamError(cs, StreamError{
2684 StreamID: f.StreamID,
2685 Code: ErrCodeProtocol,
2686 })
2687 return nil
2688 }
2689 if !cs.pastHeaders {
2690 cc.logf("protocol error: received DATA before a HEADERS frame")
2691 rl.endStreamError(cs, StreamError{
2692 StreamID: f.StreamID,
2693 Code: ErrCodeProtocol,
2694 })
2695 return nil
2696 }
2697 if f.Length > 0 {
2698 if cs.isHead && len(data) > 0 {
2699 cc.logf("protocol error: received DATA on a HEAD request")
2700 rl.endStreamError(cs, StreamError{
2701 StreamID: f.StreamID,
2702 Code: ErrCodeProtocol,
2703 })
2704 return nil
2705 }
2706 // Check connection-level flow control.
2707 cc.mu.Lock()
2708 if !takeInflows(&cc.inflow, &cs.inflow, f.Length) {
2709 cc.mu.Unlock()
2710 return ConnectionError(ErrCodeFlowControl)
2711 }
2712 // Return any padded flow control now, since we won't
2713 // refund it later on body reads.
2714 var refund int
2715 if pad := int(f.Length) - len(data); pad > 0 {
2716 refund += pad
2717 }
2718 2719 didReset := false
2720 var err error
2721 if len(data) > 0 {
2722 if _, err = cs.bufPipe.Write(data); err != nil {
2723 // Return len(data) now if the stream is already closed,
2724 // since data will never be read.
2725 didReset = true
2726 refund += len(data)
2727 }
2728 }
2729 2730 sendConn := cc.inflow.add(refund)
2731 var sendStream int32
2732 if !didReset {
2733 sendStream = cs.inflow.add(refund)
2734 }
2735 cc.mu.Unlock()
2736 2737 if sendConn > 0 || sendStream > 0 {
2738 cc.wmu.Lock()
2739 if sendConn > 0 {
2740 cc.fr.WriteWindowUpdate(0, uint32(sendConn))
2741 }
2742 if sendStream > 0 {
2743 cc.fr.WriteWindowUpdate(cs.ID, uint32(sendStream))
2744 }
2745 cc.bw.Flush()
2746 cc.wmu.Unlock()
2747 }
2748 2749 if err != nil {
2750 rl.endStreamError(cs, err)
2751 return nil
2752 }
2753 }
2754 2755 if f.StreamEnded() {
2756 rl.endStream(cs)
2757 }
2758 return nil
2759 }
2760 2761 func (rl *clientConnReadLoop) endStream(cs *clientStream) {
2762 // TODO: check that any declared content-length matches, like
2763 // server.go's (*stream).endStream method.
2764 if !cs.readClosed {
2765 cs.readClosed = true
2766 // Close cs.bufPipe and cs.peerClosed with cc.mu held to avoid a
2767 // race condition: The caller can read io.EOF from Response.Body
2768 // and close the body before we close cs.peerClosed, causing
2769 // cleanupWriteRequest to send a RST_STREAM.
2770 rl.cc.mu.Lock()
2771 defer rl.cc.mu.Unlock()
2772 cs.bufPipe.closeWithErrorAndCode(io.EOF, cs.copyTrailers)
2773 close(cs.peerClosed)
2774 }
2775 }
2776 2777 func (rl *clientConnReadLoop) endStreamError(cs *clientStream, err error) {
2778 cs.readAborted = true
2779 cs.abortStream(err)
2780 }
2781 2782 // Constants passed to streamByID for documentation purposes.
2783 const (
2784 headerOrDataFrame = true
2785 notHeaderOrDataFrame = false
2786 )
2787 2788 // streamByID returns the stream with the given id, or nil if no stream has that id.
2789 // If headerOrData is true, it clears rst.StreamPingsBlocked.
2790 func (rl *clientConnReadLoop) streamByID(id uint32, headerOrData bool) *clientStream {
2791 rl.cc.mu.Lock()
2792 defer rl.cc.mu.Unlock()
2793 if headerOrData {
2794 // Work around an unfortunate gRPC behavior.
2795 // See comment on ClientConn.rstStreamPingsBlocked for details.
2796 rl.cc.rstStreamPingsBlocked = false
2797 }
2798 rl.cc.readBeforeStreamID = rl.cc.nextStreamID
2799 cs := rl.cc.streams[id]
2800 if cs != nil && !cs.readAborted {
2801 return cs
2802 }
2803 return nil
2804 }
2805 2806 func (cs *clientStream) copyTrailers() {
2807 for k, vv := range cs.trailer {
2808 t := cs.resTrailer
2809 if *t == nil {
2810 *t = make(http.Header)
2811 }
2812 (*t)[k] = vv
2813 }
2814 }
2815 2816 func (rl *clientConnReadLoop) processGoAway(f *GoAwayFrame) error {
2817 cc := rl.cc
2818 cc.t.connPool().MarkDead(cc)
2819 if f.ErrCode != 0 {
2820 // TODO: deal with GOAWAY more. particularly the error code
2821 cc.vlogf("transport got GOAWAY with error code = %v", f.ErrCode)
2822 if fn := cc.t.CountError; fn != nil {
2823 fn("recv_goaway_" + f.ErrCode.stringToken())
2824 }
2825 }
2826 cc.setGoAway(f)
2827 return nil
2828 }
2829 2830 func (rl *clientConnReadLoop) processSettings(f *SettingsFrame) error {
2831 cc := rl.cc
2832 // Locking both mu and wmu here allows frame encoding to read settings with only wmu held.
2833 // Acquiring wmu when f.IsAck() is unnecessary, but convenient and mostly harmless.
2834 cc.wmu.Lock()
2835 defer cc.wmu.Unlock()
2836 2837 if err := rl.processSettingsNoWrite(f); err != nil {
2838 return err
2839 }
2840 if !f.IsAck() {
2841 cc.fr.WriteSettingsAck()
2842 cc.bw.Flush()
2843 }
2844 return nil
2845 }
2846 2847 func (rl *clientConnReadLoop) processSettingsNoWrite(f *SettingsFrame) error {
2848 cc := rl.cc
2849 defer cc.maybeCallStateHook()
2850 cc.mu.Lock()
2851 defer cc.mu.Unlock()
2852 2853 if f.IsAck() {
2854 if cc.wantSettingsAck {
2855 cc.wantSettingsAck = false
2856 return nil
2857 }
2858 return ConnectionError(ErrCodeProtocol)
2859 }
2860 2861 var seenMaxConcurrentStreams bool
2862 err := f.ForeachSetting(func(s Setting) error {
2863 switch s.ID {
2864 case SettingMaxFrameSize:
2865 cc.maxFrameSize = s.Val
2866 case SettingMaxConcurrentStreams:
2867 cc.maxConcurrentStreams = s.Val
2868 seenMaxConcurrentStreams = true
2869 case SettingMaxHeaderListSize:
2870 cc.peerMaxHeaderListSize = uint64(s.Val)
2871 case SettingInitialWindowSize:
2872 // Values above the maximum flow-control
2873 // window size of 2^31-1 MUST be treated as a
2874 // connection error (Section 5.4.1) of type
2875 // FLOW_CONTROL_ERROR.
2876 if s.Val > math.MaxInt32 {
2877 return ConnectionError(ErrCodeFlowControl)
2878 }
2879 2880 // Adjust flow control of currently-open
2881 // frames by the difference of the old initial
2882 // window size and this one.
2883 delta := int32(s.Val) - int32(cc.initialWindowSize)
2884 for _, cs := range cc.streams {
2885 cs.flow.add(delta)
2886 }
2887 cc.cond.Broadcast()
2888 2889 cc.initialWindowSize = s.Val
2890 case SettingHeaderTableSize:
2891 cc.henc.SetMaxDynamicTableSize(s.Val)
2892 cc.peerMaxHeaderTableSize = s.Val
2893 case SettingEnableConnectProtocol:
2894 if err := s.Valid(); err != nil {
2895 return err
2896 }
2897 // If the peer wants to send us SETTINGS_ENABLE_CONNECT_PROTOCOL,
2898 // we require that it do so in the first SETTINGS frame.
2899 //
2900 // When we attempt to use extended CONNECT, we wait for the first
2901 // SETTINGS frame to see if the server supports it. If we let the
2902 // server enable the feature with a later SETTINGS frame, then
2903 // users will see inconsistent results depending on whether we've
2904 // seen that frame or not.
2905 if !cc.seenSettings {
2906 cc.extendedConnectAllowed = s.Val == 1
2907 }
2908 default:
2909 cc.vlogf("Unhandled Setting: %v", s)
2910 }
2911 return nil
2912 })
2913 if err != nil {
2914 return err
2915 }
2916 2917 if !cc.seenSettings {
2918 if !seenMaxConcurrentStreams {
2919 // This was the servers initial SETTINGS frame and it
2920 // didn't contain a MAX_CONCURRENT_STREAMS field so
2921 // increase the number of concurrent streams this
2922 // connection can establish to our default.
2923 cc.maxConcurrentStreams = defaultMaxConcurrentStreams
2924 }
2925 close(cc.seenSettingsChan)
2926 cc.seenSettings = true
2927 }
2928 2929 return nil
2930 }
2931 2932 func (rl *clientConnReadLoop) processWindowUpdate(f *WindowUpdateFrame) error {
2933 cc := rl.cc
2934 cs := rl.streamByID(f.StreamID, notHeaderOrDataFrame)
2935 if f.StreamID != 0 && cs == nil {
2936 return nil
2937 }
2938 2939 cc.mu.Lock()
2940 defer cc.mu.Unlock()
2941 2942 fl := &cc.flow
2943 if cs != nil {
2944 fl = &cs.flow
2945 }
2946 if !fl.add(int32(f.Increment)) {
2947 // For stream, the sender sends RST_STREAM with an error code of FLOW_CONTROL_ERROR
2948 if cs != nil {
2949 rl.endStreamError(cs, StreamError{
2950 StreamID: f.StreamID,
2951 Code: ErrCodeFlowControl,
2952 })
2953 return nil
2954 }
2955 2956 return ConnectionError(ErrCodeFlowControl)
2957 }
2958 cc.cond.Broadcast()
2959 return nil
2960 }
2961 2962 func (rl *clientConnReadLoop) processResetStream(f *RSTStreamFrame) error {
2963 cs := rl.streamByID(f.StreamID, notHeaderOrDataFrame)
2964 if cs == nil {
2965 // TODO: return error if server tries to RST_STREAM an idle stream
2966 return nil
2967 }
2968 serr := streamError(cs.ID, f.ErrCode)
2969 serr.Cause = errFromPeer
2970 if f.ErrCode == ErrCodeProtocol {
2971 rl.cc.SetDoNotReuse()
2972 }
2973 if fn := cs.cc.t.CountError; fn != nil {
2974 fn("recv_rststream_" + f.ErrCode.stringToken())
2975 }
2976 cs.abortStream(serr)
2977 2978 cs.bufPipe.CloseWithError(serr)
2979 return nil
2980 }
2981 2982 // Ping sends a PING frame to the server and waits for the ack.
2983 func (cc *ClientConn) Ping(ctx context.Context) error {
2984 c := make(chan struct{})
2985 // Generate a random payload
2986 var p [8]byte
2987 for {
2988 if _, err := rand.Read(p[:]); err != nil {
2989 return err
2990 }
2991 cc.mu.Lock()
2992 // check for dup before insert
2993 if _, found := cc.pings[p]; !found {
2994 cc.pings[p] = c
2995 cc.mu.Unlock()
2996 break
2997 }
2998 cc.mu.Unlock()
2999 }
3000 var pingError error
3001 errc := make(chan struct{})
3002 go func() {
3003 cc.wmu.Lock()
3004 defer cc.wmu.Unlock()
3005 if pingError = cc.fr.WritePing(false, p); pingError != nil {
3006 close(errc)
3007 return
3008 }
3009 if pingError = cc.bw.Flush(); pingError != nil {
3010 close(errc)
3011 return
3012 }
3013 }()
3014 select {
3015 case <-c:
3016 return nil
3017 case <-errc:
3018 return pingError
3019 case <-ctx.Done():
3020 return ctx.Err()
3021 case <-cc.readerDone:
3022 // connection closed
3023 return cc.readerErr
3024 }
3025 }
3026 3027 func (rl *clientConnReadLoop) processPing(f *PingFrame) error {
3028 if f.IsAck() {
3029 cc := rl.cc
3030 defer cc.maybeCallStateHook()
3031 cc.mu.Lock()
3032 defer cc.mu.Unlock()
3033 // If ack, notify listener if any
3034 if c, ok := cc.pings[f.Data]; ok {
3035 close(c)
3036 delete(cc.pings, f.Data)
3037 }
3038 if cc.pendingResets > 0 {
3039 // See clientStream.cleanupWriteRequest.
3040 cc.pendingResets = 0
3041 cc.rstStreamPingsBlocked = true
3042 cc.cond.Broadcast()
3043 }
3044 return nil
3045 }
3046 cc := rl.cc
3047 cc.wmu.Lock()
3048 defer cc.wmu.Unlock()
3049 if err := cc.fr.WritePing(true, f.Data); err != nil {
3050 return err
3051 }
3052 return cc.bw.Flush()
3053 }
3054 3055 func (rl *clientConnReadLoop) processPushPromise(f *PushPromiseFrame) error {
3056 // We told the peer we don't want them.
3057 // Spec says:
3058 // "PUSH_PROMISE MUST NOT be sent if the SETTINGS_ENABLE_PUSH
3059 // setting of the peer endpoint is set to 0. An endpoint that
3060 // has set this setting and has received acknowledgement MUST
3061 // treat the receipt of a PUSH_PROMISE frame as a connection
3062 // error (Section 5.4.1) of type PROTOCOL_ERROR."
3063 return ConnectionError(ErrCodeProtocol)
3064 }
3065 3066 // writeStreamReset sends a RST_STREAM frame.
3067 // When ping is true, it also sends a PING frame with a random payload.
3068 func (cc *ClientConn) writeStreamReset(streamID uint32, code ErrCode, ping bool, err error) {
3069 // TODO: map err to more interesting error codes, once the
3070 // HTTP community comes up with some. But currently for
3071 // RST_STREAM there's no equivalent to GOAWAY frame's debug
3072 // data, and the error codes are all pretty vague ("cancel").
3073 cc.wmu.Lock()
3074 cc.fr.WriteRSTStream(streamID, code)
3075 if ping {
3076 var payload [8]byte
3077 rand.Read(payload[:])
3078 cc.fr.WritePing(false, payload)
3079 }
3080 cc.bw.Flush()
3081 cc.wmu.Unlock()
3082 }
3083 3084 var (
3085 errResponseHeaderListSize = errors.New("http2: response header list larger than advertised limit")
3086 errRequestHeaderListSize = httpcommon.ErrRequestHeaderListSize
3087 )
3088 3089 func (cc *ClientConn) logf(format string, args ...interface{}) {
3090 cc.t.logf(format, args...)
3091 }
3092 3093 func (cc *ClientConn) vlogf(format string, args ...interface{}) {
3094 cc.t.vlogf(format, args...)
3095 }
3096 3097 func (t *Transport) vlogf(format string, args ...interface{}) {
3098 if VerboseLogs {
3099 t.logf(format, args...)
3100 }
3101 }
3102 3103 func (t *Transport) logf(format string, args ...interface{}) {
3104 log.Printf(format, args...)
3105 }
3106 3107 var noBody io.ReadCloser = noBodyReader{}
3108 3109 type noBodyReader struct{}
3110 3111 func (noBodyReader) Close() error { return nil }
3112 func (noBodyReader) Read([]byte) (int, error) { return 0, io.EOF }
3113 3114 type missingBody struct{}
3115 3116 func (missingBody) Close() error { return nil }
3117 func (missingBody) Read([]byte) (int, error) { return 0, io.ErrUnexpectedEOF }
3118 3119 func strSliceContains(ss []string, s string) bool {
3120 for _, v := range ss {
3121 if v == s {
3122 return true
3123 }
3124 }
3125 return false
3126 }
3127 3128 type erringRoundTripper struct{ err error }
3129 3130 func (rt erringRoundTripper) RoundTripErr() error { return rt.err }
3131 func (rt erringRoundTripper) RoundTrip(*http.Request) (*http.Response, error) { return nil, rt.err }
3132 3133 var errConcurrentReadOnResBody = errors.New("http2: concurrent read on response body")
3134 3135 // gzipReader wraps a response body so it can lazily
3136 // get gzip.Reader from the pool on the first call to Read.
3137 // After Close is called it puts gzip.Reader to the pool immediately
3138 // if there is no Read in progress or later when Read completes.
3139 type gzipReader struct {
3140 _ incomparable
3141 body io.ReadCloser // underlying Response.Body
3142 mu sync.Mutex // guards zr and zerr
3143 zr *gzip.Reader // stores gzip reader from the pool between reads
3144 zerr error // sticky gzip reader init error or sentinel value to detect concurrent read and read after close
3145 }
3146 3147 type eofReader struct{}
3148 3149 func (eofReader) Read([]byte) (int, error) { return 0, io.EOF }
3150 func (eofReader) ReadByte() (byte, error) { return 0, io.EOF }
3151 3152 var gzipPool = sync.Pool{New: func() any { return new(gzip.Reader) }}
3153 3154 // gzipPoolGet gets a gzip.Reader from the pool and resets it to read from r.
3155 func gzipPoolGet(r io.Reader) (*gzip.Reader, error) {
3156 zr := gzipPool.Get().(*gzip.Reader)
3157 if err := zr.Reset(r); err != nil {
3158 gzipPoolPut(zr)
3159 return nil, err
3160 }
3161 return zr, nil
3162 }
3163 3164 // gzipPoolPut puts a gzip.Reader back into the pool.
3165 func gzipPoolPut(zr *gzip.Reader) {
3166 // Reset will allocate bufio.Reader if we pass it anything
3167 // other than a flate.Reader, so ensure that it's getting one.
3168 var r flate.Reader = eofReader{}
3169 zr.Reset(r)
3170 gzipPool.Put(zr)
3171 }
3172 3173 // acquire returns a gzip.Reader for reading response body.
3174 // The reader must be released after use.
3175 func (gz *gzipReader) acquire() (*gzip.Reader, error) {
3176 gz.mu.Lock()
3177 defer gz.mu.Unlock()
3178 if gz.zerr != nil {
3179 return nil, gz.zerr
3180 }
3181 if gz.zr == nil {
3182 gz.zr, gz.zerr = gzipPoolGet(gz.body)
3183 if gz.zerr != nil {
3184 return nil, gz.zerr
3185 }
3186 }
3187 ret := gz.zr
3188 gz.zr, gz.zerr = nil, errConcurrentReadOnResBody
3189 return ret, nil
3190 }
3191 3192 // release returns the gzip.Reader to the pool if Close was called during Read.
3193 func (gz *gzipReader) release(zr *gzip.Reader) {
3194 gz.mu.Lock()
3195 defer gz.mu.Unlock()
3196 if gz.zerr == errConcurrentReadOnResBody {
3197 gz.zr, gz.zerr = zr, nil
3198 } else { // fs.ErrClosed
3199 gzipPoolPut(zr)
3200 }
3201 }
3202 3203 // close returns the gzip.Reader to the pool immediately or
3204 // signals release to do so after Read completes.
3205 func (gz *gzipReader) close() {
3206 gz.mu.Lock()
3207 defer gz.mu.Unlock()
3208 if gz.zerr == nil && gz.zr != nil {
3209 gzipPoolPut(gz.zr)
3210 gz.zr = nil
3211 }
3212 gz.zerr = fs.ErrClosed
3213 }
3214 3215 func (gz *gzipReader) Read(p []byte) (n int, err error) {
3216 zr, err := gz.acquire()
3217 if err != nil {
3218 return 0, err
3219 }
3220 defer gz.release(zr)
3221 3222 return zr.Read(p)
3223 }
3224 3225 func (gz *gzipReader) Close() error {
3226 gz.close()
3227 3228 return gz.body.Close()
3229 }
3230 3231 type errorReader struct{ err error }
3232 3233 func (r errorReader) Read(p []byte) (int, error) { return 0, r.err }
3234 3235 // isConnectionCloseRequest reports whether req should use its own
3236 // connection for a single request and then close the connection.
3237 func isConnectionCloseRequest(req *http.Request) bool {
3238 return req.Close || httpguts.HeaderValuesContainsToken(req.Header["Connection"], "close")
3239 }
3240 3241 // registerHTTPSProtocol calls Transport.RegisterProtocol but
3242 // converting panics into errors.
3243 func registerHTTPSProtocol(t *http.Transport, rt noDialH2RoundTripper) (err error) {
3244 defer func() {
3245 if e := recover(); e != nil {
3246 err = fmt.Errorf("%v", e)
3247 }
3248 }()
3249 t.RegisterProtocol("https", rt)
3250 return nil
3251 }
3252 3253 // noDialH2RoundTripper is a RoundTripper which only tries to complete the request
3254 // if there's already a cached connection to the host.
3255 // (The field is exported so it can be accessed via reflect from net/http; tested
3256 // by TestNoDialH2RoundTripperType)
3257 //
3258 // A noDialH2RoundTripper is registered with http1.Transport.RegisterProtocol,
3259 // and the http1.Transport can use type assertions to call non-RoundTrip methods on it.
3260 // This lets us expose, for example, NewClientConn to net/http.
3261 type noDialH2RoundTripper struct{ *Transport }
3262 3263 func (rt noDialH2RoundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
3264 res, err := rt.Transport.RoundTrip(req)
3265 if isNoCachedConnError(err) {
3266 return nil, http.ErrSkipAltProtocol
3267 }
3268 return res, err
3269 }
3270 3271 func (rt noDialH2RoundTripper) NewClientConn(conn net.Conn, internalStateHook func()) (http.RoundTripper, error) {
3272 tr := rt.Transport
3273 cc, err := tr.newClientConn(conn, tr.disableKeepAlives(), internalStateHook)
3274 if err != nil {
3275 return nil, err
3276 }
3277 3278 // RoundTrip should block when the conn is at its concurrency limit,
3279 // not return an error. Setting strictMaxConcurrentStreams enables this.
3280 cc.strictMaxConcurrentStreams = true
3281 3282 return netHTTPClientConn{cc}, nil
3283 }
3284 3285 // netHTTPClientConn wraps ClientConn and implements the interface net/http expects from
3286 // the RoundTripper returned by NewClientConn.
3287 type netHTTPClientConn struct {
3288 cc *ClientConn
3289 }
3290 3291 func (cc netHTTPClientConn) RoundTrip(req *http.Request) (*http.Response, error) {
3292 return cc.cc.RoundTrip(req)
3293 }
3294 3295 func (cc netHTTPClientConn) Close() error {
3296 return cc.cc.Close()
3297 }
3298 3299 func (cc netHTTPClientConn) Err() error {
3300 cc.cc.mu.Lock()
3301 defer cc.cc.mu.Unlock()
3302 if cc.cc.closed {
3303 return errors.New("connection closed")
3304 }
3305 return nil
3306 }
3307 3308 func (cc netHTTPClientConn) Reserve() error {
3309 defer cc.cc.maybeCallStateHook()
3310 cc.cc.mu.Lock()
3311 defer cc.cc.mu.Unlock()
3312 if !cc.cc.canReserveLocked() {
3313 return errors.New("connection is unavailable")
3314 }
3315 cc.cc.streamsReserved++
3316 return nil
3317 }
3318 3319 func (cc netHTTPClientConn) Release() {
3320 defer cc.cc.maybeCallStateHook()
3321 cc.cc.mu.Lock()
3322 defer cc.cc.mu.Unlock()
3323 // We don't complain if streamsReserved is 0.
3324 //
3325 // This is consistent with RoundTrip: both Release and RoundTrip will
3326 // consume a reservation iff one exists.
3327 if cc.cc.streamsReserved > 0 {
3328 cc.cc.streamsReserved--
3329 }
3330 }
3331 3332 func (cc netHTTPClientConn) Available() int {
3333 cc.cc.mu.Lock()
3334 defer cc.cc.mu.Unlock()
3335 return cc.cc.availableLocked()
3336 }
3337 3338 func (cc netHTTPClientConn) InFlight() int {
3339 cc.cc.mu.Lock()
3340 defer cc.cc.mu.Unlock()
3341 return cc.cc.currentRequestCountLocked()
3342 }
3343 3344 func (cc *ClientConn) maybeCallStateHook() {
3345 if cc.internalStateHook != nil {
3346 cc.internalStateHook()
3347 }
3348 }
3349 3350 func (t *Transport) idleConnTimeout() time.Duration {
3351 // to keep things backwards compatible, we use non-zero values of
3352 // IdleConnTimeout, followed by using the IdleConnTimeout on the underlying
3353 // http1 transport, followed by 0
3354 if t.IdleConnTimeout != 0 {
3355 return t.IdleConnTimeout
3356 }
3357 3358 if t.t1 != nil {
3359 return t.t1.IdleConnTimeout
3360 }
3361 3362 return 0
3363 }
3364 3365 func traceGetConn(req *http.Request, hostPort string) {
3366 trace := httptrace.ContextClientTrace(req.Context())
3367 if trace == nil || trace.GetConn == nil {
3368 return
3369 }
3370 trace.GetConn(hostPort)
3371 }
3372 3373 func traceGotConn(req *http.Request, cc *ClientConn, reused bool) {
3374 trace := httptrace.ContextClientTrace(req.Context())
3375 if trace == nil || trace.GotConn == nil {
3376 return
3377 }
3378 ci := httptrace.GotConnInfo{Conn: cc.tconn}
3379 ci.Reused = reused
3380 cc.mu.Lock()
3381 ci.WasIdle = len(cc.streams) == 0 && reused
3382 if ci.WasIdle && !cc.lastActive.IsZero() {
3383 ci.IdleTime = time.Since(cc.lastActive)
3384 }
3385 cc.mu.Unlock()
3386 3387 trace.GotConn(ci)
3388 }
3389 3390 func traceWroteHeaders(trace *httptrace.ClientTrace) {
3391 if trace != nil && trace.WroteHeaders != nil {
3392 trace.WroteHeaders()
3393 }
3394 }
3395 3396 func traceGot100Continue(trace *httptrace.ClientTrace) {
3397 if trace != nil && trace.Got100Continue != nil {
3398 trace.Got100Continue()
3399 }
3400 }
3401 3402 func traceWait100Continue(trace *httptrace.ClientTrace) {
3403 if trace != nil && trace.Wait100Continue != nil {
3404 trace.Wait100Continue()
3405 }
3406 }
3407 3408 func traceWroteRequest(trace *httptrace.ClientTrace, err error) {
3409 if trace != nil && trace.WroteRequest != nil {
3410 trace.WroteRequest(httptrace.WroteRequestInfo{Err: err})
3411 }
3412 }
3413 3414 func traceFirstResponseByte(trace *httptrace.ClientTrace) {
3415 if trace != nil && trace.GotFirstResponseByte != nil {
3416 trace.GotFirstResponseByte()
3417 }
3418 }
3419 3420 func traceGot1xxResponseFunc(trace *httptrace.ClientTrace) func(int, textproto.MIMEHeader) error {
3421 if trace != nil {
3422 return trace.Got1xxResponse
3423 }
3424 return nil
3425 }
3426 3427 // dialTLSWithContext uses tls.Dialer, added in Go 1.15, to open a TLS
3428 // connection.
3429 func (t *Transport) dialTLSWithContext(ctx context.Context, network, addr string, cfg *tls.Config) (*tls.Conn, error) {
3430 dialer := &tls.Dialer{
3431 Config: cfg,
3432 }
3433 cn, err := dialer.DialContext(ctx, network, addr)
3434 if err != nil {
3435 return nil, err
3436 }
3437 tlsCn := cn.(*tls.Conn) // DialContext comment promises this will always succeed
3438 return tlsCn, nil
3439 }
3440