client_conn_pool.go raw

   1  // Copyright 2015 The Go Authors. All rights reserved.
   2  // Use of this source code is governed by a BSD-style
   3  // license that can be found in the LICENSE file.
   4  
   5  // Transport code's client connection pooling.
   6  
   7  package http2
   8  
   9  import (
  10  	"context"
  11  	"errors"
  12  	"net"
  13  	"net/http"
  14  	"sync"
  15  )
  16  
  17  // ClientConnPool manages a pool of HTTP/2 client connections.
  18  type ClientConnPool interface {
  19  	// GetClientConn returns a specific HTTP/2 connection (usually
  20  	// a TLS-TCP connection) to an HTTP/2 server. On success, the
  21  	// returned ClientConn accounts for the upcoming RoundTrip
  22  	// call, so the caller should not omit it. If the caller needs
  23  	// to, ClientConn.RoundTrip can be called with a bogus
  24  	// new(http.Request) to release the stream reservation.
  25  	GetClientConn(req *http.Request, addr string) (*ClientConn, error)
  26  	MarkDead(*ClientConn)
  27  }
  28  
  29  // clientConnPoolIdleCloser is the interface implemented by ClientConnPool
  30  // implementations which can close their idle connections.
  31  type clientConnPoolIdleCloser interface {
  32  	ClientConnPool
  33  	closeIdleConnections()
  34  }
  35  
  36  var (
  37  	_ clientConnPoolIdleCloser = (*clientConnPool)(nil)
  38  	_ clientConnPoolIdleCloser = noDialClientConnPool{}
  39  )
  40  
  41  // TODO: use singleflight for dialing and addConnCalls?
  42  type clientConnPool struct {
  43  	t *Transport
  44  
  45  	mu sync.Mutex // TODO: maybe switch to RWMutex
  46  	// TODO: add support for sharing conns based on cert names
  47  	// (e.g. share conn for googleapis.com and appspot.com)
  48  	conns        map[string][]*ClientConn // key is host:port
  49  	dialing      map[string]*dialCall     // currently in-flight dials
  50  	keys         map[*ClientConn][]string
  51  	addConnCalls map[string]*addConnCall // in-flight addConnIfNeeded calls
  52  }
  53  
  54  func (p *clientConnPool) GetClientConn(req *http.Request, addr string) (*ClientConn, error) {
  55  	return p.getClientConn(req, addr, dialOnMiss)
  56  }
  57  
  58  const (
  59  	dialOnMiss   = true
  60  	noDialOnMiss = false
  61  )
  62  
  63  func (p *clientConnPool) getClientConn(req *http.Request, addr string, dialOnMiss bool) (*ClientConn, error) {
  64  	// TODO(dneil): Dial a new connection when t.DisableKeepAlives is set?
  65  	if isConnectionCloseRequest(req) && dialOnMiss {
  66  		// It gets its own connection.
  67  		traceGetConn(req, addr)
  68  		const singleUse = true
  69  		cc, err := p.t.dialClientConn(req.Context(), addr, singleUse)
  70  		if err != nil {
  71  			return nil, err
  72  		}
  73  		return cc, nil
  74  	}
  75  	for {
  76  		p.mu.Lock()
  77  		for _, cc := range p.conns[addr] {
  78  			if cc.ReserveNewRequest() {
  79  				// When a connection is presented to us by the net/http package,
  80  				// the GetConn hook has already been called.
  81  				// Don't call it a second time here.
  82  				if !cc.getConnCalled {
  83  					traceGetConn(req, addr)
  84  				}
  85  				cc.getConnCalled = false
  86  				p.mu.Unlock()
  87  				return cc, nil
  88  			}
  89  		}
  90  		if !dialOnMiss {
  91  			p.mu.Unlock()
  92  			return nil, ErrNoCachedConn
  93  		}
  94  		traceGetConn(req, addr)
  95  		call := p.getStartDialLocked(req.Context(), addr)
  96  		p.mu.Unlock()
  97  		<-call.done
  98  		if shouldRetryDial(call, req) {
  99  			continue
 100  		}
 101  		cc, err := call.res, call.err
 102  		if err != nil {
 103  			return nil, err
 104  		}
 105  		if cc.ReserveNewRequest() {
 106  			return cc, nil
 107  		}
 108  	}
 109  }
 110  
 111  // dialCall is an in-flight Transport dial call to a host.
 112  type dialCall struct {
 113  	_ incomparable
 114  	p *clientConnPool
 115  	// the context associated with the request
 116  	// that created this dialCall
 117  	ctx  context.Context
 118  	done chan struct{} // closed when done
 119  	res  *ClientConn   // valid after done is closed
 120  	err  error         // valid after done is closed
 121  }
 122  
 123  // requires p.mu is held.
 124  func (p *clientConnPool) getStartDialLocked(ctx context.Context, addr string) *dialCall {
 125  	if call, ok := p.dialing[addr]; ok {
 126  		// A dial is already in-flight. Don't start another.
 127  		return call
 128  	}
 129  	call := &dialCall{p: p, done: make(chan struct{}), ctx: ctx}
 130  	if p.dialing == nil {
 131  		p.dialing = make(map[string]*dialCall)
 132  	}
 133  	p.dialing[addr] = call
 134  	go call.dial(call.ctx, addr)
 135  	return call
 136  }
 137  
 138  // run in its own goroutine.
 139  func (c *dialCall) dial(ctx context.Context, addr string) {
 140  	const singleUse = false // shared conn
 141  	c.res, c.err = c.p.t.dialClientConn(ctx, addr, singleUse)
 142  
 143  	c.p.mu.Lock()
 144  	delete(c.p.dialing, addr)
 145  	if c.err == nil {
 146  		c.p.addConnLocked(addr, c.res)
 147  	}
 148  	c.p.mu.Unlock()
 149  
 150  	close(c.done)
 151  }
 152  
 153  // addConnIfNeeded makes a NewClientConn out of c if a connection for key doesn't
 154  // already exist. It coalesces concurrent calls with the same key.
 155  // This is used by the http1 Transport code when it creates a new connection. Because
 156  // the http1 Transport doesn't de-dup TCP dials to outbound hosts (because it doesn't know
 157  // the protocol), it can get into a situation where it has multiple TLS connections.
 158  // This code decides which ones live or die.
 159  // The return value used is whether c was used.
 160  // c is never closed.
 161  func (p *clientConnPool) addConnIfNeeded(key string, t *Transport, c net.Conn) (used bool, err error) {
 162  	p.mu.Lock()
 163  	for _, cc := range p.conns[key] {
 164  		if cc.CanTakeNewRequest() {
 165  			p.mu.Unlock()
 166  			return false, nil
 167  		}
 168  	}
 169  	call, dup := p.addConnCalls[key]
 170  	if !dup {
 171  		if p.addConnCalls == nil {
 172  			p.addConnCalls = make(map[string]*addConnCall)
 173  		}
 174  		call = &addConnCall{
 175  			p:    p,
 176  			done: make(chan struct{}),
 177  		}
 178  		p.addConnCalls[key] = call
 179  		go call.run(t, key, c)
 180  	}
 181  	p.mu.Unlock()
 182  
 183  	<-call.done
 184  	if call.err != nil {
 185  		return false, call.err
 186  	}
 187  	return !dup, nil
 188  }
 189  
 190  type addConnCall struct {
 191  	_    incomparable
 192  	p    *clientConnPool
 193  	done chan struct{} // closed when done
 194  	err  error
 195  }
 196  
 197  func (c *addConnCall) run(t *Transport, key string, nc net.Conn) {
 198  	cc, err := t.NewClientConn(nc)
 199  
 200  	p := c.p
 201  	p.mu.Lock()
 202  	if err != nil {
 203  		c.err = err
 204  	} else {
 205  		cc.getConnCalled = true // already called by the net/http package
 206  		p.addConnLocked(key, cc)
 207  	}
 208  	delete(p.addConnCalls, key)
 209  	p.mu.Unlock()
 210  	close(c.done)
 211  }
 212  
 213  // p.mu must be held
 214  func (p *clientConnPool) addConnLocked(key string, cc *ClientConn) {
 215  	for _, v := range p.conns[key] {
 216  		if v == cc {
 217  			return
 218  		}
 219  	}
 220  	if p.conns == nil {
 221  		p.conns = make(map[string][]*ClientConn)
 222  	}
 223  	if p.keys == nil {
 224  		p.keys = make(map[*ClientConn][]string)
 225  	}
 226  	p.conns[key] = append(p.conns[key], cc)
 227  	p.keys[cc] = append(p.keys[cc], key)
 228  }
 229  
 230  func (p *clientConnPool) MarkDead(cc *ClientConn) {
 231  	p.mu.Lock()
 232  	defer p.mu.Unlock()
 233  	for _, key := range p.keys[cc] {
 234  		vv, ok := p.conns[key]
 235  		if !ok {
 236  			continue
 237  		}
 238  		newList := filterOutClientConn(vv, cc)
 239  		if len(newList) > 0 {
 240  			p.conns[key] = newList
 241  		} else {
 242  			delete(p.conns, key)
 243  		}
 244  	}
 245  	delete(p.keys, cc)
 246  }
 247  
 248  func (p *clientConnPool) closeIdleConnections() {
 249  	p.mu.Lock()
 250  	defer p.mu.Unlock()
 251  	// TODO: don't close a cc if it was just added to the pool
 252  	// milliseconds ago and has never been used. There's currently
 253  	// a small race window with the HTTP/1 Transport's integration
 254  	// where it can add an idle conn just before using it, and
 255  	// somebody else can concurrently call CloseIdleConns and
 256  	// break some caller's RoundTrip.
 257  	for _, vv := range p.conns {
 258  		for _, cc := range vv {
 259  			cc.closeIfIdle()
 260  		}
 261  	}
 262  }
 263  
 264  func filterOutClientConn(in []*ClientConn, exclude *ClientConn) []*ClientConn {
 265  	out := in[:0]
 266  	for _, v := range in {
 267  		if v != exclude {
 268  			out = append(out, v)
 269  		}
 270  	}
 271  	// If we filtered it out, zero out the last item to prevent
 272  	// the GC from seeing it.
 273  	if len(in) != len(out) {
 274  		in[len(in)-1] = nil
 275  	}
 276  	return out
 277  }
 278  
 279  // noDialClientConnPool is an implementation of http2.ClientConnPool
 280  // which never dials. We let the HTTP/1.1 client dial and use its TLS
 281  // connection instead.
 282  type noDialClientConnPool struct{ *clientConnPool }
 283  
 284  func (p noDialClientConnPool) GetClientConn(req *http.Request, addr string) (*ClientConn, error) {
 285  	return p.getClientConn(req, addr, noDialOnMiss)
 286  }
 287  
 288  // shouldRetryDial reports whether the current request should
 289  // retry dialing after the call finished unsuccessfully, for example
 290  // if the dial was canceled because of a context cancellation or
 291  // deadline expiry.
 292  func shouldRetryDial(call *dialCall, req *http.Request) bool {
 293  	if call.err == nil {
 294  		// No error, no need to retry
 295  		return false
 296  	}
 297  	if call.ctx == req.Context() {
 298  		// If the call has the same context as the request, the dial
 299  		// should not be retried, since any cancellation will have come
 300  		// from this request.
 301  		return false
 302  	}
 303  	if !errors.Is(call.err, context.Canceled) && !errors.Is(call.err, context.DeadlineExceeded) {
 304  		// If the call error is not because of a context cancellation or a deadline expiry,
 305  		// the dial should not be retried.
 306  		return false
 307  	}
 308  	// Only retry if the error is a context cancellation error or deadline expiry
 309  	// and the context associated with the call was canceled or expired.
 310  	return call.ctx.Err() != nil
 311  }
 312