http2_client.go raw

   1  /*
   2   *
   3   * Copyright 2014 gRPC authors.
   4   *
   5   * Licensed under the Apache License, Version 2.0 (the "License");
   6   * you may not use this file except in compliance with the License.
   7   * You may obtain a copy of the License at
   8   *
   9   *     http://www.apache.org/licenses/LICENSE-2.0
  10   *
  11   * Unless required by applicable law or agreed to in writing, software
  12   * distributed under the License is distributed on an "AS IS" BASIS,
  13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14   * See the License for the specific language governing permissions and
  15   * limitations under the License.
  16   *
  17   */
  18  
  19  package transport
  20  
  21  import (
  22  	"context"
  23  	"fmt"
  24  	"io"
  25  	"math"
  26  	"net"
  27  	"net/http"
  28  	"path/filepath"
  29  	"strconv"
  30  	"strings"
  31  	"sync"
  32  	"sync/atomic"
  33  	"time"
  34  
  35  	"golang.org/x/net/http2"
  36  	"golang.org/x/net/http2/hpack"
  37  	"google.golang.org/grpc/codes"
  38  	"google.golang.org/grpc/credentials"
  39  	"google.golang.org/grpc/internal"
  40  	"google.golang.org/grpc/internal/channelz"
  41  	icredentials "google.golang.org/grpc/internal/credentials"
  42  	"google.golang.org/grpc/internal/grpclog"
  43  	"google.golang.org/grpc/internal/grpcsync"
  44  	"google.golang.org/grpc/internal/grpcutil"
  45  	imetadata "google.golang.org/grpc/internal/metadata"
  46  	"google.golang.org/grpc/internal/proxyattributes"
  47  	istats "google.golang.org/grpc/internal/stats"
  48  	istatus "google.golang.org/grpc/internal/status"
  49  	isyscall "google.golang.org/grpc/internal/syscall"
  50  	"google.golang.org/grpc/internal/transport/networktype"
  51  	"google.golang.org/grpc/keepalive"
  52  	"google.golang.org/grpc/mem"
  53  	"google.golang.org/grpc/metadata"
  54  	"google.golang.org/grpc/peer"
  55  	"google.golang.org/grpc/resolver"
  56  	"google.golang.org/grpc/stats"
  57  	"google.golang.org/grpc/status"
  58  )
  59  
  60  // clientConnectionCounter counts the number of connections a client has
  61  // initiated (equal to the number of http2Clients created). Must be accessed
  62  // atomically.
  63  var clientConnectionCounter uint64
  64  
  65  var goAwayLoopyWriterTimeout = 5 * time.Second
  66  
  67  var metadataFromOutgoingContextRaw = internal.FromOutgoingContextRaw.(func(context.Context) (metadata.MD, [][]string, bool))
  68  
  69  // http2Client implements the ClientTransport interface with HTTP2.
  70  type http2Client struct {
  71  	lastRead  int64 // Keep this field 64-bit aligned. Accessed atomically.
  72  	ctx       context.Context
  73  	cancel    context.CancelFunc
  74  	ctxDone   <-chan struct{} // Cache the ctx.Done() chan.
  75  	userAgent string
  76  	// address contains the resolver returned address for this transport.
  77  	// If the `ServerName` field is set, it takes precedence over `CallHdr.Host`
  78  	// passed to `NewStream`, when determining the :authority header.
  79  	address    resolver.Address
  80  	md         metadata.MD
  81  	conn       net.Conn // underlying communication channel
  82  	loopy      *loopyWriter
  83  	remoteAddr net.Addr
  84  	localAddr  net.Addr
  85  	authInfo   credentials.AuthInfo // auth info about the connection
  86  
  87  	readerDone chan struct{} // sync point to enable testing.
  88  	writerDone chan struct{} // sync point to enable testing.
  89  	// goAway is closed to notify the upper layer (i.e., addrConn.transportMonitor)
  90  	// that the server sent GoAway on this transport.
  91  	goAway        chan struct{}
  92  	keepaliveDone chan struct{} // Closed when the keepalive goroutine exits.
  93  	framer        *framer
  94  	// controlBuf delivers all the control related tasks (e.g., window
  95  	// updates, reset streams, and various settings) to the controller.
  96  	// Do not access controlBuf with mu held.
  97  	controlBuf *controlBuffer
  98  	fc         *trInFlow
  99  	// The scheme used: https if TLS is on, http otherwise.
 100  	scheme string
 101  
 102  	isSecure bool
 103  
 104  	perRPCCreds []credentials.PerRPCCredentials
 105  
 106  	kp               keepalive.ClientParameters
 107  	keepaliveEnabled bool
 108  
 109  	statsHandler stats.Handler
 110  
 111  	initialWindowSize int32
 112  
 113  	// configured by peer through SETTINGS_MAX_HEADER_LIST_SIZE
 114  	maxSendHeaderListSize *uint32
 115  
 116  	bdpEst *bdpEstimator
 117  
 118  	maxConcurrentStreams  uint32
 119  	streamQuota           int64
 120  	streamsQuotaAvailable chan struct{}
 121  	waitingStreams        uint32
 122  	registeredCompressors string
 123  
 124  	// Do not access controlBuf with mu held.
 125  	mu            sync.Mutex // guard the following variables
 126  	nextID        uint32
 127  	state         transportState
 128  	activeStreams map[uint32]*ClientStream
 129  	// prevGoAway ID records the Last-Stream-ID in the previous GOAway frame.
 130  	prevGoAwayID uint32
 131  	// goAwayReason records the http2.ErrCode and debug data received with the
 132  	// GoAway frame.
 133  	goAwayReason GoAwayReason
 134  	// goAwayDebugMessage contains a detailed human readable string about a
 135  	// GoAway frame, useful for error messages.
 136  	goAwayDebugMessage string
 137  	// A condition variable used to signal when the keepalive goroutine should
 138  	// go dormant. The condition for dormancy is based on the number of active
 139  	// streams and the `PermitWithoutStream` keepalive client parameter. And
 140  	// since the number of active streams is guarded by the above mutex, we use
 141  	// the same for this condition variable as well.
 142  	kpDormancyCond *sync.Cond
 143  	// A boolean to track whether the keepalive goroutine is dormant or not.
 144  	// This is checked before attempting to signal the above condition
 145  	// variable.
 146  	kpDormant bool
 147  
 148  	channelz *channelz.Socket
 149  
 150  	onClose func(GoAwayReason)
 151  
 152  	bufferPool mem.BufferPool
 153  
 154  	connectionID uint64
 155  	logger       *grpclog.PrefixLogger
 156  }
 157  
 158  func dial(ctx context.Context, fn func(context.Context, string) (net.Conn, error), addr resolver.Address, grpcUA string) (net.Conn, error) {
 159  	address := addr.Addr
 160  	networkType, ok := networktype.Get(addr)
 161  	if fn != nil {
 162  		// Special handling for unix scheme with custom dialer. Back in the day,
 163  		// we did not have a unix resolver and therefore targets with a unix
 164  		// scheme would end up using the passthrough resolver. So, user's used a
 165  		// custom dialer in this case and expected the original dial target to
 166  		// be passed to the custom dialer. Now, we have a unix resolver. But if
 167  		// a custom dialer is specified, we want to retain the old behavior in
 168  		// terms of the address being passed to the custom dialer.
 169  		if networkType == "unix" && !strings.HasPrefix(address, "\x00") {
 170  			// Supported unix targets are either "unix://absolute-path" or
 171  			// "unix:relative-path".
 172  			if filepath.IsAbs(address) {
 173  				return fn(ctx, "unix://"+address)
 174  			}
 175  			return fn(ctx, "unix:"+address)
 176  		}
 177  		return fn(ctx, address)
 178  	}
 179  	if !ok {
 180  		networkType, address = ParseDialTarget(address)
 181  	}
 182  	if opts, present := proxyattributes.Get(addr); present {
 183  		return proxyDial(ctx, addr, grpcUA, opts)
 184  	}
 185  	return internal.NetDialerWithTCPKeepalive().DialContext(ctx, networkType, address)
 186  }
 187  
 188  func isTemporary(err error) bool {
 189  	switch err := err.(type) {
 190  	case interface {
 191  		Temporary() bool
 192  	}:
 193  		return err.Temporary()
 194  	case interface {
 195  		Timeout() bool
 196  	}:
 197  		// Timeouts may be resolved upon retry, and are thus treated as
 198  		// temporary.
 199  		return err.Timeout()
 200  	}
 201  	return true
 202  }
 203  
 204  // NewHTTP2Client constructs a connected ClientTransport to addr based on HTTP2
 205  // and starts to receive messages on it. Non-nil error returns if construction
 206  // fails.
 207  func NewHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts ConnectOptions, onClose func(GoAwayReason)) (_ ClientTransport, err error) {
 208  	scheme := "http"
 209  	ctx, cancel := context.WithCancel(ctx)
 210  	defer func() {
 211  		if err != nil {
 212  			cancel()
 213  		}
 214  	}()
 215  
 216  	// gRPC, resolver, balancer etc. can specify arbitrary data in the
 217  	// Attributes field of resolver.Address, which is shoved into connectCtx
 218  	// and passed to the dialer and credential handshaker. This makes it possible for
 219  	// address specific arbitrary data to reach custom dialers and credential handshakers.
 220  	connectCtx = icredentials.NewClientHandshakeInfoContext(connectCtx, credentials.ClientHandshakeInfo{Attributes: addr.Attributes})
 221  
 222  	conn, err := dial(connectCtx, opts.Dialer, addr, opts.UserAgent)
 223  	if err != nil {
 224  		if opts.FailOnNonTempDialError {
 225  			return nil, connectionErrorf(isTemporary(err), err, "transport: error while dialing: %v", err)
 226  		}
 227  		return nil, connectionErrorf(true, err, "transport: Error while dialing: %v", err)
 228  	}
 229  
 230  	// Any further errors will close the underlying connection
 231  	defer func(conn net.Conn) {
 232  		if err != nil {
 233  			conn.Close()
 234  		}
 235  	}(conn)
 236  
 237  	// The following defer and goroutine monitor the connectCtx for cancellation
 238  	// and deadline.  On context expiration, the connection is hard closed and
 239  	// this function will naturally fail as a result.  Otherwise, the defer
 240  	// waits for the goroutine to exit to prevent the context from being
 241  	// monitored (and to prevent the connection from ever being closed) after
 242  	// returning from this function.
 243  	ctxMonitorDone := grpcsync.NewEvent()
 244  	newClientCtx, newClientDone := context.WithCancel(connectCtx)
 245  	defer func() {
 246  		newClientDone()         // Awaken the goroutine below if connectCtx hasn't expired.
 247  		<-ctxMonitorDone.Done() // Wait for the goroutine below to exit.
 248  	}()
 249  	go func(conn net.Conn) {
 250  		defer ctxMonitorDone.Fire() // Signal this goroutine has exited.
 251  		<-newClientCtx.Done()       // Block until connectCtx expires or the defer above executes.
 252  		if err := connectCtx.Err(); err != nil {
 253  			// connectCtx expired before exiting the function.  Hard close the connection.
 254  			if logger.V(logLevel) {
 255  				logger.Infof("Aborting due to connect deadline expiring: %v", err)
 256  			}
 257  			conn.Close()
 258  		}
 259  	}(conn)
 260  
 261  	kp := opts.KeepaliveParams
 262  	// Validate keepalive parameters.
 263  	if kp.Time == 0 {
 264  		kp.Time = defaultClientKeepaliveTime
 265  	}
 266  	if kp.Timeout == 0 {
 267  		kp.Timeout = defaultClientKeepaliveTimeout
 268  	}
 269  	keepaliveEnabled := false
 270  	if kp.Time != infinity {
 271  		if err = isyscall.SetTCPUserTimeout(conn, kp.Timeout); err != nil {
 272  			return nil, connectionErrorf(false, err, "transport: failed to set TCP_USER_TIMEOUT: %v", err)
 273  		}
 274  		keepaliveEnabled = true
 275  	}
 276  	var (
 277  		isSecure bool
 278  		authInfo credentials.AuthInfo
 279  	)
 280  	transportCreds := opts.TransportCredentials
 281  	perRPCCreds := opts.PerRPCCredentials
 282  
 283  	if b := opts.CredsBundle; b != nil {
 284  		if t := b.TransportCredentials(); t != nil {
 285  			transportCreds = t
 286  		}
 287  		if t := b.PerRPCCredentials(); t != nil {
 288  			perRPCCreds = append(perRPCCreds, t)
 289  		}
 290  	}
 291  	if transportCreds != nil {
 292  		conn, authInfo, err = transportCreds.ClientHandshake(connectCtx, addr.ServerName, conn)
 293  		if err != nil {
 294  			return nil, connectionErrorf(isTemporary(err), err, "transport: authentication handshake failed: %v", err)
 295  		}
 296  		for _, cd := range perRPCCreds {
 297  			if cd.RequireTransportSecurity() {
 298  				if ci, ok := authInfo.(interface {
 299  					GetCommonAuthInfo() credentials.CommonAuthInfo
 300  				}); ok {
 301  					secLevel := ci.GetCommonAuthInfo().SecurityLevel
 302  					if secLevel != credentials.InvalidSecurityLevel && secLevel < credentials.PrivacyAndIntegrity {
 303  						return nil, connectionErrorf(true, nil, "transport: cannot send secure credentials on an insecure connection")
 304  					}
 305  				}
 306  			}
 307  		}
 308  		isSecure = true
 309  		if transportCreds.Info().SecurityProtocol == "tls" {
 310  			scheme = "https"
 311  		}
 312  	}
 313  	icwz := int32(initialWindowSize)
 314  	if opts.InitialConnWindowSize >= defaultWindowSize {
 315  		icwz = opts.InitialConnWindowSize
 316  	}
 317  	writeBufSize := opts.WriteBufferSize
 318  	readBufSize := opts.ReadBufferSize
 319  	maxHeaderListSize := defaultClientMaxHeaderListSize
 320  	if opts.MaxHeaderListSize != nil {
 321  		maxHeaderListSize = *opts.MaxHeaderListSize
 322  	}
 323  
 324  	t := &http2Client{
 325  		ctx:                   ctx,
 326  		ctxDone:               ctx.Done(), // Cache Done chan.
 327  		cancel:                cancel,
 328  		userAgent:             opts.UserAgent,
 329  		registeredCompressors: grpcutil.RegisteredCompressors(),
 330  		address:               addr,
 331  		conn:                  conn,
 332  		remoteAddr:            conn.RemoteAddr(),
 333  		localAddr:             conn.LocalAddr(),
 334  		authInfo:              authInfo,
 335  		readerDone:            make(chan struct{}),
 336  		writerDone:            make(chan struct{}),
 337  		goAway:                make(chan struct{}),
 338  		keepaliveDone:         make(chan struct{}),
 339  		framer:                newFramer(conn, writeBufSize, readBufSize, opts.SharedWriteBuffer, maxHeaderListSize, opts.BufferPool),
 340  		fc:                    &trInFlow{limit: uint32(icwz)},
 341  		scheme:                scheme,
 342  		activeStreams:         make(map[uint32]*ClientStream),
 343  		isSecure:              isSecure,
 344  		perRPCCreds:           perRPCCreds,
 345  		kp:                    kp,
 346  		statsHandler:          istats.NewCombinedHandler(opts.StatsHandlers...),
 347  		initialWindowSize:     initialWindowSize,
 348  		nextID:                1,
 349  		maxConcurrentStreams:  defaultMaxStreamsClient,
 350  		streamQuota:           defaultMaxStreamsClient,
 351  		streamsQuotaAvailable: make(chan struct{}, 1),
 352  		keepaliveEnabled:      keepaliveEnabled,
 353  		bufferPool:            opts.BufferPool,
 354  		onClose:               onClose,
 355  	}
 356  	var czSecurity credentials.ChannelzSecurityValue
 357  	if au, ok := authInfo.(credentials.ChannelzSecurityInfo); ok {
 358  		czSecurity = au.GetSecurityValue()
 359  	}
 360  	t.channelz = channelz.RegisterSocket(
 361  		&channelz.Socket{
 362  			SocketType:       channelz.SocketTypeNormal,
 363  			Parent:           opts.ChannelzParent,
 364  			SocketMetrics:    channelz.SocketMetrics{},
 365  			EphemeralMetrics: t.socketMetrics,
 366  			LocalAddr:        t.localAddr,
 367  			RemoteAddr:       t.remoteAddr,
 368  			SocketOptions:    channelz.GetSocketOption(t.conn),
 369  			Security:         czSecurity,
 370  		})
 371  	t.logger = prefixLoggerForClientTransport(t)
 372  	// Add peer information to the http2client context.
 373  	t.ctx = peer.NewContext(t.ctx, t.Peer())
 374  
 375  	if md, ok := addr.Metadata.(*metadata.MD); ok {
 376  		t.md = *md
 377  	} else if md := imetadata.Get(addr); md != nil {
 378  		t.md = md
 379  	}
 380  	t.controlBuf = newControlBuffer(t.ctxDone)
 381  	if opts.InitialWindowSize >= defaultWindowSize {
 382  		t.initialWindowSize = opts.InitialWindowSize
 383  	}
 384  	if !opts.StaticWindowSize {
 385  		t.bdpEst = &bdpEstimator{
 386  			bdp:               initialWindowSize,
 387  			updateFlowControl: t.updateFlowControl,
 388  		}
 389  	}
 390  	if t.statsHandler != nil {
 391  		t.ctx = t.statsHandler.TagConn(t.ctx, &stats.ConnTagInfo{
 392  			RemoteAddr: t.remoteAddr,
 393  			LocalAddr:  t.localAddr,
 394  		})
 395  		t.statsHandler.HandleConn(t.ctx, &stats.ConnBegin{
 396  			Client: true,
 397  		})
 398  	}
 399  	if t.keepaliveEnabled {
 400  		t.kpDormancyCond = sync.NewCond(&t.mu)
 401  		go t.keepalive()
 402  	}
 403  
 404  	// Start the reader goroutine for incoming messages. Each transport has a
 405  	// dedicated goroutine which reads HTTP2 frames from the network. Then it
 406  	// dispatches the frame to the corresponding stream entity.  When the
 407  	// server preface is received, readerErrCh is closed.  If an error occurs
 408  	// first, an error is pushed to the channel.  This must be checked before
 409  	// returning from this function.
 410  	readerErrCh := make(chan error, 1)
 411  	go t.reader(readerErrCh)
 412  	defer func() {
 413  		if err != nil {
 414  			// writerDone should be closed since the loopy goroutine
 415  			// wouldn't have started in the case this function returns an error.
 416  			close(t.writerDone)
 417  			t.Close(err)
 418  		}
 419  	}()
 420  
 421  	// Send connection preface to server.
 422  	n, err := t.conn.Write(clientPreface)
 423  	if err != nil {
 424  		err = connectionErrorf(true, err, "transport: failed to write client preface: %v", err)
 425  		return nil, err
 426  	}
 427  	if n != len(clientPreface) {
 428  		err = connectionErrorf(true, nil, "transport: preface mismatch, wrote %d bytes; want %d", n, len(clientPreface))
 429  		return nil, err
 430  	}
 431  	var ss []http2.Setting
 432  
 433  	if t.initialWindowSize != defaultWindowSize {
 434  		ss = append(ss, http2.Setting{
 435  			ID:  http2.SettingInitialWindowSize,
 436  			Val: uint32(t.initialWindowSize),
 437  		})
 438  	}
 439  	if opts.MaxHeaderListSize != nil {
 440  		ss = append(ss, http2.Setting{
 441  			ID:  http2.SettingMaxHeaderListSize,
 442  			Val: *opts.MaxHeaderListSize,
 443  		})
 444  	}
 445  	err = t.framer.fr.WriteSettings(ss...)
 446  	if err != nil {
 447  		err = connectionErrorf(true, err, "transport: failed to write initial settings frame: %v", err)
 448  		return nil, err
 449  	}
 450  	// Adjust the connection flow control window if needed.
 451  	if delta := uint32(icwz - defaultWindowSize); delta > 0 {
 452  		if err := t.framer.fr.WriteWindowUpdate(0, delta); err != nil {
 453  			err = connectionErrorf(true, err, "transport: failed to write window update: %v", err)
 454  			return nil, err
 455  		}
 456  	}
 457  
 458  	t.connectionID = atomic.AddUint64(&clientConnectionCounter, 1)
 459  
 460  	if err := t.framer.writer.Flush(); err != nil {
 461  		return nil, err
 462  	}
 463  	// Block until the server preface is received successfully or an error occurs.
 464  	if err = <-readerErrCh; err != nil {
 465  		return nil, err
 466  	}
 467  	go func() {
 468  		t.loopy = newLoopyWriter(clientSide, t.framer, t.controlBuf, t.bdpEst, t.conn, t.logger, t.outgoingGoAwayHandler, t.bufferPool)
 469  		if err := t.loopy.run(); !isIOError(err) {
 470  			// Immediately close the connection, as the loopy writer returns
 471  			// when there are no more active streams and we were draining (the
 472  			// server sent a GOAWAY).  For I/O errors, the reader will hit it
 473  			// after draining any remaining incoming data.
 474  			t.conn.Close()
 475  		}
 476  		close(t.writerDone)
 477  	}()
 478  	return t, nil
 479  }
 480  
 481  func (t *http2Client) newStream(ctx context.Context, callHdr *CallHdr) *ClientStream {
 482  	// TODO(zhaoq): Handle uint32 overflow of Stream.id.
 483  	s := &ClientStream{
 484  		Stream: Stream{
 485  			method:         callHdr.Method,
 486  			sendCompress:   callHdr.SendCompress,
 487  			contentSubtype: callHdr.ContentSubtype,
 488  		},
 489  		ct:         t,
 490  		done:       make(chan struct{}),
 491  		headerChan: make(chan struct{}),
 492  		doneFunc:   callHdr.DoneFunc,
 493  	}
 494  	s.Stream.buf.init()
 495  	s.Stream.wq.init(defaultWriteQuota, s.done)
 496  	s.readRequester = s
 497  	// The client side stream context should have exactly the same life cycle with the user provided context.
 498  	// That means, s.ctx should be read-only. And s.ctx is done iff ctx is done.
 499  	// So we use the original context here instead of creating a copy.
 500  	s.ctx = ctx
 501  	s.trReader = transportReader{
 502  		reader: recvBufferReader{
 503  			ctx:          s.ctx,
 504  			ctxDone:      s.ctx.Done(),
 505  			recv:         &s.buf,
 506  			clientStream: s,
 507  		},
 508  		windowHandler: s,
 509  	}
 510  	return s
 511  }
 512  
 513  func (t *http2Client) Peer() *peer.Peer {
 514  	return &peer.Peer{
 515  		Addr:      t.remoteAddr,
 516  		AuthInfo:  t.authInfo, // Can be nil
 517  		LocalAddr: t.localAddr,
 518  	}
 519  }
 520  
 521  // OutgoingGoAwayHandler writes a GOAWAY to the connection.  Always returns (false, err) as we want the GoAway
 522  // to be the last frame loopy writes to the transport.
 523  func (t *http2Client) outgoingGoAwayHandler(g *goAway) (bool, error) {
 524  	t.mu.Lock()
 525  	maxStreamID := t.nextID - 2
 526  	t.mu.Unlock()
 527  	if err := t.framer.fr.WriteGoAway(maxStreamID, http2.ErrCodeNo, g.debugData); err != nil {
 528  		return false, err
 529  	}
 530  	return false, g.closeConn
 531  }
 532  
 533  func (t *http2Client) createHeaderFields(ctx context.Context, callHdr *CallHdr) ([]hpack.HeaderField, error) {
 534  	aud := t.createAudience(callHdr)
 535  	ri := credentials.RequestInfo{
 536  		Method:   callHdr.Method,
 537  		AuthInfo: t.authInfo,
 538  	}
 539  	ctxWithRequestInfo := credentials.NewContextWithRequestInfo(ctx, ri)
 540  	authData, err := t.getTrAuthData(ctxWithRequestInfo, aud)
 541  	if err != nil {
 542  		return nil, err
 543  	}
 544  	callAuthData, err := t.getCallAuthData(ctxWithRequestInfo, aud, callHdr)
 545  	if err != nil {
 546  		return nil, err
 547  	}
 548  	// TODO(mmukhi): Benchmark if the performance gets better if count the metadata and other header fields
 549  	// first and create a slice of that exact size.
 550  	// Make the slice of certain predictable size to reduce allocations made by append.
 551  	hfLen := 7 // :method, :scheme, :path, :authority, content-type, user-agent, te
 552  	hfLen += len(authData) + len(callAuthData)
 553  	registeredCompressors := t.registeredCompressors
 554  	if callHdr.AcceptedCompressors != nil {
 555  		registeredCompressors = *callHdr.AcceptedCompressors
 556  	}
 557  	if callHdr.PreviousAttempts > 0 {
 558  		hfLen++
 559  	}
 560  	if callHdr.SendCompress != "" {
 561  		hfLen++
 562  	}
 563  	if registeredCompressors != "" {
 564  		hfLen++
 565  	}
 566  	if _, ok := ctx.Deadline(); ok {
 567  		hfLen++
 568  	}
 569  	headerFields := make([]hpack.HeaderField, 0, hfLen)
 570  	headerFields = append(headerFields, hpack.HeaderField{Name: ":method", Value: "POST"})
 571  	headerFields = append(headerFields, hpack.HeaderField{Name: ":scheme", Value: t.scheme})
 572  	headerFields = append(headerFields, hpack.HeaderField{Name: ":path", Value: callHdr.Method})
 573  	headerFields = append(headerFields, hpack.HeaderField{Name: ":authority", Value: callHdr.Host})
 574  	headerFields = append(headerFields, hpack.HeaderField{Name: "content-type", Value: grpcutil.ContentType(callHdr.ContentSubtype)})
 575  	headerFields = append(headerFields, hpack.HeaderField{Name: "user-agent", Value: t.userAgent})
 576  	headerFields = append(headerFields, hpack.HeaderField{Name: "te", Value: "trailers"})
 577  	if callHdr.PreviousAttempts > 0 {
 578  		headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-previous-rpc-attempts", Value: strconv.Itoa(callHdr.PreviousAttempts)})
 579  	}
 580  
 581  	if callHdr.SendCompress != "" {
 582  		headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-encoding", Value: callHdr.SendCompress})
 583  		// Include the outgoing compressor name when compressor is not registered
 584  		// via encoding.RegisterCompressor. This is possible when client uses
 585  		// WithCompressor dial option.
 586  		if !grpcutil.IsCompressorNameRegistered(callHdr.SendCompress) {
 587  			if registeredCompressors != "" {
 588  				registeredCompressors += ","
 589  			}
 590  			registeredCompressors += callHdr.SendCompress
 591  		}
 592  	}
 593  
 594  	if registeredCompressors != "" {
 595  		headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-accept-encoding", Value: registeredCompressors})
 596  	}
 597  	if dl, ok := ctx.Deadline(); ok {
 598  		// Send out timeout regardless its value. The server can detect timeout context by itself.
 599  		// TODO(mmukhi): Perhaps this field should be updated when actually writing out to the wire.
 600  		timeout := time.Until(dl)
 601  		if timeout <= 0 {
 602  			return nil, status.Error(codes.DeadlineExceeded, context.DeadlineExceeded.Error())
 603  		}
 604  		headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-timeout", Value: grpcutil.EncodeDuration(timeout)})
 605  	}
 606  	for k, v := range authData {
 607  		headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
 608  	}
 609  	for k, v := range callAuthData {
 610  		headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
 611  	}
 612  
 613  	if md, added, ok := metadataFromOutgoingContextRaw(ctx); ok {
 614  		var k string
 615  		for k, vv := range md {
 616  			// HTTP doesn't allow you to set pseudoheaders after non pseudoheaders were set.
 617  			if isReservedHeader(k) {
 618  				continue
 619  			}
 620  			for _, v := range vv {
 621  				headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
 622  			}
 623  		}
 624  		for _, vv := range added {
 625  			for i, v := range vv {
 626  				if i%2 == 0 {
 627  					k = strings.ToLower(v)
 628  					continue
 629  				}
 630  				// HTTP doesn't allow you to set pseudoheaders after non pseudoheaders were set.
 631  				if isReservedHeader(k) {
 632  					continue
 633  				}
 634  				headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
 635  			}
 636  		}
 637  	}
 638  	for k, vv := range t.md {
 639  		if isReservedHeader(k) {
 640  			continue
 641  		}
 642  		for _, v := range vv {
 643  			headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
 644  		}
 645  	}
 646  	return headerFields, nil
 647  }
 648  
 649  func (t *http2Client) createAudience(callHdr *CallHdr) string {
 650  	// Create an audience string only if needed.
 651  	if len(t.perRPCCreds) == 0 && callHdr.Creds == nil {
 652  		return ""
 653  	}
 654  	// Construct URI required to get auth request metadata.
 655  	// Omit port if it is the default one.
 656  	host := strings.TrimSuffix(callHdr.Host, ":443")
 657  	pos := strings.LastIndex(callHdr.Method, "/")
 658  	if pos == -1 {
 659  		pos = len(callHdr.Method)
 660  	}
 661  	return "https://" + host + callHdr.Method[:pos]
 662  }
 663  
 664  func (t *http2Client) getTrAuthData(ctx context.Context, audience string) (map[string]string, error) {
 665  	if len(t.perRPCCreds) == 0 {
 666  		return nil, nil
 667  	}
 668  	authData := map[string]string{}
 669  	for _, c := range t.perRPCCreds {
 670  		data, err := c.GetRequestMetadata(ctx, audience)
 671  		if err != nil {
 672  			if st, ok := status.FromError(err); ok {
 673  				// Restrict the code to the list allowed by gRFC A54.
 674  				if istatus.IsRestrictedControlPlaneCode(st) {
 675  					err = status.Errorf(codes.Internal, "transport: received per-RPC creds error with illegal status: %v", err)
 676  				}
 677  				return nil, err
 678  			}
 679  
 680  			return nil, status.Errorf(codes.Unauthenticated, "transport: per-RPC creds failed due to error: %v", err)
 681  		}
 682  		for k, v := range data {
 683  			// Capital header names are illegal in HTTP/2.
 684  			k = strings.ToLower(k)
 685  			authData[k] = v
 686  		}
 687  	}
 688  	return authData, nil
 689  }
 690  
 691  func (t *http2Client) getCallAuthData(ctx context.Context, audience string, callHdr *CallHdr) (map[string]string, error) {
 692  	var callAuthData map[string]string
 693  	// Check if credentials.PerRPCCredentials were provided via call options.
 694  	// Note: if these credentials are provided both via dial options and call
 695  	// options, then both sets of credentials will be applied.
 696  	if callCreds := callHdr.Creds; callCreds != nil {
 697  		if callCreds.RequireTransportSecurity() {
 698  			ri, _ := credentials.RequestInfoFromContext(ctx)
 699  			if !t.isSecure || credentials.CheckSecurityLevel(ri.AuthInfo, credentials.PrivacyAndIntegrity) != nil {
 700  				return nil, status.Error(codes.Unauthenticated, "transport: cannot send secure credentials on an insecure connection")
 701  			}
 702  		}
 703  		data, err := callCreds.GetRequestMetadata(ctx, audience)
 704  		if err != nil {
 705  			if st, ok := status.FromError(err); ok {
 706  				// Restrict the code to the list allowed by gRFC A54.
 707  				if istatus.IsRestrictedControlPlaneCode(st) {
 708  					err = status.Errorf(codes.Internal, "transport: received per-RPC creds error with illegal status: %v", err)
 709  				}
 710  				return nil, err
 711  			}
 712  			return nil, status.Errorf(codes.Internal, "transport: per-RPC creds failed due to error: %v", err)
 713  		}
 714  		callAuthData = make(map[string]string, len(data))
 715  		for k, v := range data {
 716  			// Capital header names are illegal in HTTP/2
 717  			k = strings.ToLower(k)
 718  			callAuthData[k] = v
 719  		}
 720  	}
 721  	return callAuthData, nil
 722  }
 723  
 724  // NewStreamError wraps an error and reports additional information.  Typically
 725  // NewStream errors result in transparent retry, as they mean nothing went onto
 726  // the wire.  However, there are two notable exceptions:
 727  //
 728  //  1. If the stream headers violate the max header list size allowed by the
 729  //     server.  It's possible this could succeed on another transport, even if
 730  //     it's unlikely, but do not transparently retry.
 731  //  2. If the credentials errored when requesting their headers.  In this case,
 732  //     it's possible a retry can fix the problem, but indefinitely transparently
 733  //     retrying is not appropriate as it is likely the credentials, if they can
 734  //     eventually succeed, would need I/O to do so.
 735  type NewStreamError struct {
 736  	Err error
 737  
 738  	AllowTransparentRetry bool
 739  }
 740  
 741  func (e NewStreamError) Error() string {
 742  	return e.Err.Error()
 743  }
 744  
 745  // NewStream creates a stream and registers it into the transport as "active"
 746  // streams.  All non-nil errors returned will be *NewStreamError.
 747  func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (*ClientStream, error) {
 748  	ctx = peer.NewContext(ctx, t.Peer())
 749  
 750  	// ServerName field of the resolver returned address takes precedence over
 751  	// Host field of CallHdr to determine the :authority header. This is because,
 752  	// the ServerName field takes precedence for server authentication during
 753  	// TLS handshake, and the :authority header should match the value used
 754  	// for server authentication.
 755  	if t.address.ServerName != "" {
 756  		newCallHdr := *callHdr
 757  		newCallHdr.Host = t.address.ServerName
 758  		callHdr = &newCallHdr
 759  	}
 760  
 761  	// The authority specified via the `CallAuthority` CallOption takes the
 762  	// highest precedence when determining the `:authority` header. It overrides
 763  	// any value present in the Host field of CallHdr. Before applying this
 764  	// override, the authority string is validated. If the credentials do not
 765  	// implement the AuthorityValidator interface, or if validation fails, the
 766  	// RPC is failed with a status code of `UNAVAILABLE`.
 767  	if callHdr.Authority != "" {
 768  		auth, ok := t.authInfo.(credentials.AuthorityValidator)
 769  		if !ok {
 770  			return nil, &NewStreamError{Err: status.Errorf(codes.Unavailable, "credentials type %q does not implement the AuthorityValidator interface, but authority override specified with CallAuthority call option", t.authInfo.AuthType())}
 771  		}
 772  		if err := auth.ValidateAuthority(callHdr.Authority); err != nil {
 773  			return nil, &NewStreamError{Err: status.Errorf(codes.Unavailable, "failed to validate authority %q : %v", callHdr.Authority, err)}
 774  		}
 775  		newCallHdr := *callHdr
 776  		newCallHdr.Host = callHdr.Authority
 777  		callHdr = &newCallHdr
 778  	}
 779  
 780  	headerFields, err := t.createHeaderFields(ctx, callHdr)
 781  	if err != nil {
 782  		return nil, &NewStreamError{Err: err, AllowTransparentRetry: false}
 783  	}
 784  	s := t.newStream(ctx, callHdr)
 785  	cleanup := func(err error) {
 786  		if s.swapState(streamDone) == streamDone {
 787  			// If it was already done, return.
 788  			return
 789  		}
 790  		// The stream was unprocessed by the server.
 791  		s.unprocessed.Store(true)
 792  		s.write(recvMsg{err: err})
 793  		close(s.done)
 794  		// If headerChan isn't closed, then close it.
 795  		if atomic.CompareAndSwapUint32(&s.headerChanClosed, 0, 1) {
 796  			close(s.headerChan)
 797  		}
 798  	}
 799  	hdr := &headerFrame{
 800  		hf:        headerFields,
 801  		endStream: false,
 802  		initStream: func(uint32) error {
 803  			t.mu.Lock()
 804  			// TODO: handle transport closure in loopy instead and remove this
 805  			// initStream is never called when transport is draining.
 806  			if t.state == closing {
 807  				t.mu.Unlock()
 808  				cleanup(ErrConnClosing)
 809  				return ErrConnClosing
 810  			}
 811  			if channelz.IsOn() {
 812  				t.channelz.SocketMetrics.StreamsStarted.Add(1)
 813  				t.channelz.SocketMetrics.LastLocalStreamCreatedTimestamp.Store(time.Now().UnixNano())
 814  			}
 815  			// If the keepalive goroutine has gone dormant, wake it up.
 816  			if t.kpDormant {
 817  				t.kpDormancyCond.Signal()
 818  			}
 819  			t.mu.Unlock()
 820  			return nil
 821  		},
 822  		onOrphaned: cleanup,
 823  		wq:         &s.wq,
 824  	}
 825  	firstTry := true
 826  	var ch chan struct{}
 827  	transportDrainRequired := false
 828  	checkForStreamQuota := func() bool {
 829  		if t.streamQuota <= 0 { // Can go negative if server decreases it.
 830  			if firstTry {
 831  				t.waitingStreams++
 832  			}
 833  			ch = t.streamsQuotaAvailable
 834  			return false
 835  		}
 836  		if !firstTry {
 837  			t.waitingStreams--
 838  		}
 839  		t.streamQuota--
 840  
 841  		t.mu.Lock()
 842  		if t.state == draining || t.activeStreams == nil { // Can be niled from Close().
 843  			t.mu.Unlock()
 844  			return false // Don't create a stream if the transport is already closed.
 845  		}
 846  
 847  		hdr.streamID = t.nextID
 848  		t.nextID += 2
 849  		// Drain client transport if nextID > MaxStreamID which signals gRPC that
 850  		// the connection is closed and a new one must be created for subsequent RPCs.
 851  		transportDrainRequired = t.nextID > MaxStreamID
 852  
 853  		s.id = hdr.streamID
 854  		s.fc = inFlow{limit: uint32(t.initialWindowSize)}
 855  		t.activeStreams[s.id] = s
 856  		t.mu.Unlock()
 857  
 858  		if t.streamQuota > 0 && t.waitingStreams > 0 {
 859  			select {
 860  			case t.streamsQuotaAvailable <- struct{}{}:
 861  			default:
 862  			}
 863  		}
 864  		return true
 865  	}
 866  	var hdrListSizeErr error
 867  	checkForHeaderListSize := func() bool {
 868  		if t.maxSendHeaderListSize == nil {
 869  			return true
 870  		}
 871  		var sz int64
 872  		for _, f := range hdr.hf {
 873  			if sz += int64(f.Size()); sz > int64(*t.maxSendHeaderListSize) {
 874  				hdrListSizeErr = status.Errorf(codes.Internal, "header list size to send violates the maximum size (%d bytes) set by server", *t.maxSendHeaderListSize)
 875  				return false
 876  			}
 877  		}
 878  		return true
 879  	}
 880  	for {
 881  		success, err := t.controlBuf.executeAndPut(func() bool {
 882  			return checkForHeaderListSize() && checkForStreamQuota()
 883  		}, hdr)
 884  		if err != nil {
 885  			// Connection closed.
 886  			return nil, &NewStreamError{Err: err, AllowTransparentRetry: true}
 887  		}
 888  		if success {
 889  			break
 890  		}
 891  		if hdrListSizeErr != nil {
 892  			return nil, &NewStreamError{Err: hdrListSizeErr}
 893  		}
 894  		firstTry = false
 895  		select {
 896  		case <-ch:
 897  		case <-ctx.Done():
 898  			return nil, &NewStreamError{Err: ContextErr(ctx.Err())}
 899  		case <-t.goAway:
 900  			return nil, &NewStreamError{Err: errStreamDrain, AllowTransparentRetry: true}
 901  		case <-t.ctx.Done():
 902  			return nil, &NewStreamError{Err: ErrConnClosing, AllowTransparentRetry: true}
 903  		}
 904  	}
 905  	if t.statsHandler != nil {
 906  		header, ok := metadata.FromOutgoingContext(ctx)
 907  		if ok {
 908  			header.Set("user-agent", t.userAgent)
 909  		} else {
 910  			header = metadata.Pairs("user-agent", t.userAgent)
 911  		}
 912  		// Note: The header fields are compressed with hpack after this call returns.
 913  		// No WireLength field is set here.
 914  		t.statsHandler.HandleRPC(s.ctx, &stats.OutHeader{
 915  			Client:      true,
 916  			FullMethod:  callHdr.Method,
 917  			RemoteAddr:  t.remoteAddr,
 918  			LocalAddr:   t.localAddr,
 919  			Compression: callHdr.SendCompress,
 920  			Header:      header,
 921  		})
 922  	}
 923  	if transportDrainRequired {
 924  		if t.logger.V(logLevel) {
 925  			t.logger.Infof("Draining transport: t.nextID > MaxStreamID")
 926  		}
 927  		t.GracefulClose()
 928  	}
 929  	return s, nil
 930  }
 931  
 932  func (t *http2Client) closeStream(s *ClientStream, err error, rst bool, rstCode http2.ErrCode, st *status.Status, mdata map[string][]string, eosReceived bool) {
 933  	// Set stream status to done.
 934  	if s.swapState(streamDone) == streamDone {
 935  		// If it was already done, return.  If multiple closeStream calls
 936  		// happen simultaneously, wait for the first to finish.
 937  		<-s.done
 938  		return
 939  	}
 940  	// status and trailers can be updated here without any synchronization because the stream goroutine will
 941  	// only read it after it sees an io.EOF error from read or write and we'll write those errors
 942  	// only after updating this.
 943  	s.status = st
 944  	if len(mdata) > 0 {
 945  		s.trailer = mdata
 946  	}
 947  	if err != nil {
 948  		// This will unblock reads eventually.
 949  		s.write(recvMsg{err: err})
 950  	}
 951  	// If headerChan isn't closed, then close it.
 952  	if atomic.CompareAndSwapUint32(&s.headerChanClosed, 0, 1) {
 953  		s.noHeaders = true
 954  		close(s.headerChan)
 955  	}
 956  	cleanup := &cleanupStream{
 957  		streamID: s.id,
 958  		onWrite: func() {
 959  			t.mu.Lock()
 960  			if t.activeStreams != nil {
 961  				delete(t.activeStreams, s.id)
 962  			}
 963  			t.mu.Unlock()
 964  			if channelz.IsOn() {
 965  				if eosReceived {
 966  					t.channelz.SocketMetrics.StreamsSucceeded.Add(1)
 967  				} else {
 968  					t.channelz.SocketMetrics.StreamsFailed.Add(1)
 969  				}
 970  			}
 971  		},
 972  		rst:     rst,
 973  		rstCode: rstCode,
 974  	}
 975  	addBackStreamQuota := func() bool {
 976  		t.streamQuota++
 977  		if t.streamQuota > 0 && t.waitingStreams > 0 {
 978  			select {
 979  			case t.streamsQuotaAvailable <- struct{}{}:
 980  			default:
 981  			}
 982  		}
 983  		return true
 984  	}
 985  	t.controlBuf.executeAndPut(addBackStreamQuota, cleanup)
 986  	// This will unblock write.
 987  	close(s.done)
 988  	if s.doneFunc != nil {
 989  		s.doneFunc()
 990  	}
 991  }
 992  
 993  // Close kicks off the shutdown process of the transport. This should be called
 994  // only once on a transport. Once it is called, the transport should not be
 995  // accessed anymore.
 996  func (t *http2Client) Close(err error) {
 997  	t.conn.SetWriteDeadline(time.Now().Add(time.Second * 10))
 998  	// For background on the deadline value chosen here, see
 999  	// https://github.com/grpc/grpc-go/issues/8425#issuecomment-3057938248 .
1000  	t.conn.SetReadDeadline(time.Now().Add(time.Second))
1001  	t.mu.Lock()
1002  	// Make sure we only close once.
1003  	if t.state == closing {
1004  		t.mu.Unlock()
1005  		return
1006  	}
1007  	if t.logger.V(logLevel) {
1008  		t.logger.Infof("Closing: %v", err)
1009  	}
1010  	// Call t.onClose ASAP to prevent the client from attempting to create new
1011  	// streams.
1012  	if t.state != draining {
1013  		t.onClose(GoAwayInvalid)
1014  	}
1015  	t.state = closing
1016  	streams := t.activeStreams
1017  	t.activeStreams = nil
1018  	if t.kpDormant {
1019  		// If the keepalive goroutine is blocked on this condition variable, we
1020  		// should unblock it so that the goroutine eventually exits.
1021  		t.kpDormancyCond.Signal()
1022  	}
1023  	// Append info about previous goaways if there were any, since this may be important
1024  	// for understanding the root cause for this connection to be closed.
1025  	goAwayDebugMessage := t.goAwayDebugMessage
1026  	t.mu.Unlock()
1027  
1028  	// Per HTTP/2 spec, a GOAWAY frame must be sent before closing the
1029  	// connection. See https://httpwg.org/specs/rfc7540.html#GOAWAY. It
1030  	// also waits for loopyWriter to be closed with a timer to avoid the
1031  	// long blocking in case the connection is blackholed, i.e. TCP is
1032  	// just stuck.
1033  	t.controlBuf.put(&goAway{code: http2.ErrCodeNo, debugData: []byte("client transport shutdown"), closeConn: err})
1034  	timer := time.NewTimer(goAwayLoopyWriterTimeout)
1035  	defer timer.Stop()
1036  	select {
1037  	case <-t.writerDone: // success
1038  	case <-timer.C:
1039  		t.logger.Infof("Failed to write a GOAWAY frame as part of connection close after %s. Giving up and closing the transport.", goAwayLoopyWriterTimeout)
1040  	}
1041  	t.cancel()
1042  	t.conn.Close()
1043  	// Waits for the reader and keepalive goroutines to exit before returning to
1044  	// ensure all resources are cleaned up before Close can return.
1045  	<-t.readerDone
1046  	if t.keepaliveEnabled {
1047  		<-t.keepaliveDone
1048  	}
1049  	channelz.RemoveEntry(t.channelz.ID)
1050  	var st *status.Status
1051  	if len(goAwayDebugMessage) > 0 {
1052  		st = status.Newf(codes.Unavailable, "closing transport due to: %v, received prior goaway: %v", err, goAwayDebugMessage)
1053  		err = st.Err()
1054  	} else {
1055  		st = status.New(codes.Unavailable, err.Error())
1056  	}
1057  
1058  	// Notify all active streams.
1059  	for _, s := range streams {
1060  		t.closeStream(s, err, false, http2.ErrCodeNo, st, nil, false)
1061  	}
1062  	if t.statsHandler != nil {
1063  		t.statsHandler.HandleConn(t.ctx, &stats.ConnEnd{
1064  			Client: true,
1065  		})
1066  	}
1067  }
1068  
1069  // GracefulClose sets the state to draining, which prevents new streams from
1070  // being created and causes the transport to be closed when the last active
1071  // stream is closed.  If there are no active streams, the transport is closed
1072  // immediately.  This does nothing if the transport is already draining or
1073  // closing.
1074  func (t *http2Client) GracefulClose() {
1075  	t.mu.Lock()
1076  	// Make sure we move to draining only from active.
1077  	if t.state == draining || t.state == closing {
1078  		t.mu.Unlock()
1079  		return
1080  	}
1081  	if t.logger.V(logLevel) {
1082  		t.logger.Infof("GracefulClose called")
1083  	}
1084  	t.onClose(GoAwayInvalid)
1085  	t.state = draining
1086  	active := len(t.activeStreams)
1087  	t.mu.Unlock()
1088  	if active == 0 {
1089  		t.Close(connectionErrorf(true, nil, "no active streams left to process while draining"))
1090  		return
1091  	}
1092  	t.controlBuf.put(&incomingGoAway{})
1093  }
1094  
1095  // Write formats the data into HTTP2 data frame(s) and sends it out. The caller
1096  // should proceed only if Write returns nil.
1097  func (t *http2Client) write(s *ClientStream, hdr []byte, data mem.BufferSlice, opts *WriteOptions) error {
1098  	if opts.Last {
1099  		// If it's the last message, update stream state.
1100  		if !s.compareAndSwapState(streamActive, streamWriteDone) {
1101  			return errStreamDone
1102  		}
1103  	} else if s.getState() != streamActive {
1104  		return errStreamDone
1105  	}
1106  	df := &dataFrame{
1107  		streamID:  s.id,
1108  		endStream: opts.Last,
1109  		h:         hdr,
1110  		data:      data,
1111  	}
1112  	dataLen := data.Len()
1113  	if hdr != nil || dataLen != 0 { // If it's not an empty data frame, check quota.
1114  		if err := s.wq.get(int32(len(hdr) + dataLen)); err != nil {
1115  			return err
1116  		}
1117  	}
1118  	data.Ref()
1119  	if err := t.controlBuf.put(df); err != nil {
1120  		data.Free()
1121  		return err
1122  	}
1123  	t.incrMsgSent()
1124  	return nil
1125  }
1126  
1127  func (t *http2Client) getStream(f http2.Frame) *ClientStream {
1128  	t.mu.Lock()
1129  	s := t.activeStreams[f.Header().StreamID]
1130  	t.mu.Unlock()
1131  	return s
1132  }
1133  
1134  // adjustWindow sends out extra window update over the initial window size
1135  // of stream if the application is requesting data larger in size than
1136  // the window.
1137  func (t *http2Client) adjustWindow(s *ClientStream, n uint32) {
1138  	if w := s.fc.maybeAdjust(n); w > 0 {
1139  		t.controlBuf.put(&outgoingWindowUpdate{streamID: s.id, increment: w})
1140  	}
1141  }
1142  
1143  // updateWindow adjusts the inbound quota for the stream.
1144  // Window updates will be sent out when the cumulative quota
1145  // exceeds the corresponding threshold.
1146  func (t *http2Client) updateWindow(s *ClientStream, n uint32) {
1147  	if w := s.fc.onRead(n); w > 0 {
1148  		t.controlBuf.put(&outgoingWindowUpdate{streamID: s.id, increment: w})
1149  	}
1150  }
1151  
1152  // updateFlowControl updates the incoming flow control windows
1153  // for the transport and the stream based on the current bdp
1154  // estimation.
1155  func (t *http2Client) updateFlowControl(n uint32) {
1156  	updateIWS := func() bool {
1157  		t.initialWindowSize = int32(n)
1158  		t.mu.Lock()
1159  		for _, s := range t.activeStreams {
1160  			s.fc.newLimit(n)
1161  		}
1162  		t.mu.Unlock()
1163  		return true
1164  	}
1165  	t.controlBuf.executeAndPut(updateIWS, &outgoingWindowUpdate{streamID: 0, increment: t.fc.newLimit(n)})
1166  	t.controlBuf.put(&outgoingSettings{
1167  		ss: []http2.Setting{
1168  			{
1169  				ID:  http2.SettingInitialWindowSize,
1170  				Val: n,
1171  			},
1172  		},
1173  	})
1174  }
1175  
1176  func (t *http2Client) handleData(f *parsedDataFrame) {
1177  	size := f.Header().Length
1178  	var sendBDPPing bool
1179  	if t.bdpEst != nil {
1180  		sendBDPPing = t.bdpEst.add(size)
1181  	}
1182  	// Decouple connection's flow control from application's read.
1183  	// An update on connection's flow control should not depend on
1184  	// whether user application has read the data or not. Such a
1185  	// restriction is already imposed on the stream's flow control,
1186  	// and therefore the sender will be blocked anyways.
1187  	// Decoupling the connection flow control will prevent other
1188  	// active(fast) streams from starving in presence of slow or
1189  	// inactive streams.
1190  	//
1191  	if w := t.fc.onData(size); w > 0 {
1192  		t.controlBuf.put(&outgoingWindowUpdate{
1193  			streamID:  0,
1194  			increment: w,
1195  		})
1196  	}
1197  	if sendBDPPing {
1198  		// Avoid excessive ping detection (e.g. in an L7 proxy)
1199  		// by sending a window update prior to the BDP ping.
1200  
1201  		if w := t.fc.reset(); w > 0 {
1202  			t.controlBuf.put(&outgoingWindowUpdate{
1203  				streamID:  0,
1204  				increment: w,
1205  			})
1206  		}
1207  
1208  		t.controlBuf.put(bdpPing)
1209  	}
1210  	// Select the right stream to dispatch.
1211  	s := t.getStream(f)
1212  	if s == nil {
1213  		return
1214  	}
1215  	if size > 0 {
1216  		if err := s.fc.onData(size); err != nil {
1217  			t.closeStream(s, io.EOF, true, http2.ErrCodeFlowControl, status.New(codes.Internal, err.Error()), nil, false)
1218  			return
1219  		}
1220  		dataLen := f.data.Len()
1221  		if f.Header().Flags.Has(http2.FlagDataPadded) {
1222  			if w := s.fc.onRead(size - uint32(dataLen)); w > 0 {
1223  				t.controlBuf.put(&outgoingWindowUpdate{s.id, w})
1224  			}
1225  		}
1226  		if dataLen > 0 {
1227  			f.data.Ref()
1228  			s.write(recvMsg{buffer: f.data})
1229  		}
1230  	}
1231  	// The server has closed the stream without sending trailers.  Record that
1232  	// the read direction is closed, and set the status appropriately.
1233  	if f.StreamEnded() {
1234  		t.closeStream(s, io.EOF, false, http2.ErrCodeNo, status.New(codes.Internal, "server closed the stream without sending trailers"), nil, true)
1235  	}
1236  }
1237  
1238  func (t *http2Client) handleRSTStream(f *http2.RSTStreamFrame) {
1239  	s := t.getStream(f)
1240  	if s == nil {
1241  		return
1242  	}
1243  	if f.ErrCode == http2.ErrCodeRefusedStream {
1244  		// The stream was unprocessed by the server.
1245  		s.unprocessed.Store(true)
1246  	}
1247  	statusCode, ok := http2ErrConvTab[f.ErrCode]
1248  	if !ok {
1249  		if t.logger.V(logLevel) {
1250  			t.logger.Infof("Received a RST_STREAM frame with code %q, but found no mapped gRPC status", f.ErrCode)
1251  		}
1252  		statusCode = codes.Unknown
1253  	}
1254  	if statusCode == codes.Canceled {
1255  		if d, ok := s.ctx.Deadline(); ok && !d.After(time.Now()) {
1256  			// Our deadline was already exceeded, and that was likely the cause
1257  			// of this cancellation.  Alter the status code accordingly.
1258  			statusCode = codes.DeadlineExceeded
1259  		}
1260  	}
1261  	st := status.Newf(statusCode, "stream terminated by RST_STREAM with error code: %v", f.ErrCode)
1262  	t.closeStream(s, st.Err(), false, http2.ErrCodeNo, st, nil, false)
1263  }
1264  
1265  func (t *http2Client) handleSettings(f *http2.SettingsFrame, isFirst bool) {
1266  	if f.IsAck() {
1267  		return
1268  	}
1269  	var maxStreams *uint32
1270  	var ss []http2.Setting
1271  	var updateFuncs []func()
1272  	f.ForeachSetting(func(s http2.Setting) error {
1273  		switch s.ID {
1274  		case http2.SettingMaxConcurrentStreams:
1275  			maxStreams = new(uint32)
1276  			*maxStreams = s.Val
1277  		case http2.SettingMaxHeaderListSize:
1278  			updateFuncs = append(updateFuncs, func() {
1279  				t.maxSendHeaderListSize = new(uint32)
1280  				*t.maxSendHeaderListSize = s.Val
1281  			})
1282  		default:
1283  			ss = append(ss, s)
1284  		}
1285  		return nil
1286  	})
1287  	if isFirst && maxStreams == nil {
1288  		maxStreams = new(uint32)
1289  		*maxStreams = math.MaxUint32
1290  	}
1291  	sf := &incomingSettings{
1292  		ss: ss,
1293  	}
1294  	if maxStreams != nil {
1295  		updateStreamQuota := func() {
1296  			delta := int64(*maxStreams) - int64(t.maxConcurrentStreams)
1297  			t.maxConcurrentStreams = *maxStreams
1298  			t.streamQuota += delta
1299  			if delta > 0 && t.waitingStreams > 0 {
1300  				close(t.streamsQuotaAvailable) // wake all of them up.
1301  				t.streamsQuotaAvailable = make(chan struct{}, 1)
1302  			}
1303  		}
1304  		updateFuncs = append(updateFuncs, updateStreamQuota)
1305  	}
1306  	t.controlBuf.executeAndPut(func() bool {
1307  		for _, f := range updateFuncs {
1308  			f()
1309  		}
1310  		return true
1311  	}, sf)
1312  }
1313  
1314  func (t *http2Client) handlePing(f *http2.PingFrame) {
1315  	if f.IsAck() {
1316  		// Maybe it's a BDP ping.
1317  		if t.bdpEst != nil {
1318  			t.bdpEst.calculate(f.Data)
1319  		}
1320  		return
1321  	}
1322  	pingAck := &ping{ack: true}
1323  	copy(pingAck.data[:], f.Data[:])
1324  	t.controlBuf.put(pingAck)
1325  }
1326  
1327  func (t *http2Client) handleGoAway(f *http2.GoAwayFrame) error {
1328  	t.mu.Lock()
1329  	if t.state == closing {
1330  		t.mu.Unlock()
1331  		return nil
1332  	}
1333  	if f.ErrCode == http2.ErrCodeEnhanceYourCalm && string(f.DebugData()) == "too_many_pings" {
1334  		// When a client receives a GOAWAY with error code ENHANCE_YOUR_CALM and debug
1335  		// data equal to ASCII "too_many_pings", it should log the occurrence at a log level that is
1336  		// enabled by default and double the configure KEEPALIVE_TIME used for new connections
1337  		// on that channel.
1338  		logger.Errorf("Client received GoAway with error code ENHANCE_YOUR_CALM and debug data equal to ASCII \"too_many_pings\".")
1339  	}
1340  	id := f.LastStreamID
1341  	if id > 0 && id%2 == 0 {
1342  		t.mu.Unlock()
1343  		return connectionErrorf(true, nil, "received goaway with non-zero even-numbered stream id: %v", id)
1344  	}
1345  	// A client can receive multiple GoAways from the server (see
1346  	// https://github.com/grpc/grpc-go/issues/1387).  The idea is that the first
1347  	// GoAway will be sent with an ID of MaxInt32 and the second GoAway will be
1348  	// sent after an RTT delay with the ID of the last stream the server will
1349  	// process.
1350  	//
1351  	// Therefore, when we get the first GoAway we don't necessarily close any
1352  	// streams. While in case of second GoAway we close all streams created after
1353  	// the GoAwayId. This way streams that were in-flight while the GoAway from
1354  	// server was being sent don't get killed.
1355  	select {
1356  	case <-t.goAway: // t.goAway has been closed (i.e.,multiple GoAways).
1357  		// If there are multiple GoAways the first one should always have an ID greater than the following ones.
1358  		if id > t.prevGoAwayID {
1359  			t.mu.Unlock()
1360  			return connectionErrorf(true, nil, "received goaway with stream id: %v, which exceeds stream id of previous goaway: %v", id, t.prevGoAwayID)
1361  		}
1362  	default:
1363  		t.setGoAwayReason(f)
1364  		close(t.goAway)
1365  		defer t.controlBuf.put(&incomingGoAway{}) // Defer as t.mu is currently held.
1366  		// Notify the clientconn about the GOAWAY before we set the state to
1367  		// draining, to allow the client to stop attempting to create streams
1368  		// before disallowing new streams on this connection.
1369  		if t.state != draining {
1370  			t.onClose(t.goAwayReason)
1371  			t.state = draining
1372  		}
1373  	}
1374  	// All streams with IDs greater than the GoAwayId
1375  	// and smaller than the previous GoAway ID should be killed.
1376  	upperLimit := t.prevGoAwayID
1377  	if upperLimit == 0 { // This is the first GoAway Frame.
1378  		upperLimit = math.MaxUint32 // Kill all streams after the GoAway ID.
1379  	}
1380  
1381  	t.prevGoAwayID = id
1382  	if len(t.activeStreams) == 0 {
1383  		t.mu.Unlock()
1384  		return connectionErrorf(true, nil, "received goaway and there are no active streams")
1385  	}
1386  
1387  	streamsToClose := make([]*ClientStream, 0)
1388  	for streamID, stream := range t.activeStreams {
1389  		if streamID > id && streamID <= upperLimit {
1390  			// The stream was unprocessed by the server.
1391  			stream.unprocessed.Store(true)
1392  			streamsToClose = append(streamsToClose, stream)
1393  		}
1394  	}
1395  	t.mu.Unlock()
1396  	// Called outside t.mu because closeStream can take controlBuf's mu, which
1397  	// could induce deadlock and is not allowed.
1398  	for _, stream := range streamsToClose {
1399  		t.closeStream(stream, errStreamDrain, false, http2.ErrCodeNo, statusGoAway, nil, false)
1400  	}
1401  	return nil
1402  }
1403  
1404  // setGoAwayReason sets the value of t.goAwayReason based
1405  // on the GoAway frame received.
1406  // It expects a lock on transport's mutex to be held by
1407  // the caller.
1408  func (t *http2Client) setGoAwayReason(f *http2.GoAwayFrame) {
1409  	t.goAwayReason = GoAwayNoReason
1410  	if f.ErrCode == http2.ErrCodeEnhanceYourCalm {
1411  		if string(f.DebugData()) == "too_many_pings" {
1412  			t.goAwayReason = GoAwayTooManyPings
1413  		}
1414  	}
1415  	if len(f.DebugData()) == 0 {
1416  		t.goAwayDebugMessage = fmt.Sprintf("code: %s", f.ErrCode)
1417  	} else {
1418  		t.goAwayDebugMessage = fmt.Sprintf("code: %s, debug data: %q", f.ErrCode, string(f.DebugData()))
1419  	}
1420  }
1421  
1422  func (t *http2Client) GetGoAwayReason() (GoAwayReason, string) {
1423  	t.mu.Lock()
1424  	defer t.mu.Unlock()
1425  	return t.goAwayReason, t.goAwayDebugMessage
1426  }
1427  
1428  func (t *http2Client) handleWindowUpdate(f *http2.WindowUpdateFrame) {
1429  	t.controlBuf.put(&incomingWindowUpdate{
1430  		streamID:  f.Header().StreamID,
1431  		increment: f.Increment,
1432  	})
1433  }
1434  
1435  // operateHeaders takes action on the decoded headers.
1436  func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) {
1437  	s := t.getStream(frame)
1438  	if s == nil {
1439  		return
1440  	}
1441  	endStream := frame.StreamEnded()
1442  	s.bytesReceived.Store(true)
1443  	initialHeader := atomic.LoadUint32(&s.headerChanClosed) == 0
1444  
1445  	if !initialHeader && !endStream {
1446  		// As specified by gRPC over HTTP2, a HEADERS frame (and associated CONTINUATION frames) can only appear at the start or end of a stream. Therefore, second HEADERS frame must have EOS bit set.
1447  		st := status.New(codes.Internal, "a HEADERS frame cannot appear in the middle of a stream")
1448  		t.closeStream(s, st.Err(), true, http2.ErrCodeProtocol, st, nil, false)
1449  		return
1450  	}
1451  
1452  	// frame.Truncated is set to true when framer detects that the current header
1453  	// list size hits MaxHeaderListSize limit.
1454  	if frame.Truncated {
1455  		se := status.New(codes.Internal, "peer header list size exceeded limit")
1456  		t.closeStream(s, se.Err(), true, http2.ErrCodeFrameSize, se, nil, endStream)
1457  		return
1458  	}
1459  
1460  	var (
1461  		// If a gRPC Response-Headers has already been received, then it means
1462  		// that the peer is speaking gRPC and we are in gRPC mode.
1463  		isGRPC         = !initialHeader
1464  		mdata          = make(map[string][]string)
1465  		contentTypeErr = "malformed header: missing HTTP content-type"
1466  		grpcMessage    string
1467  		recvCompress   string
1468  		httpStatusErr  string
1469  		// the code from the grpc-status header, if present
1470  		grpcStatusCode = codes.Unknown
1471  		// headerError is set if an error is encountered while parsing the headers
1472  		headerError string
1473  		httpStatus  string
1474  	)
1475  
1476  	for _, hf := range frame.Fields {
1477  		switch hf.Name {
1478  		case "content-type":
1479  			if _, validContentType := grpcutil.ContentSubtype(hf.Value); !validContentType {
1480  				contentTypeErr = fmt.Sprintf("transport: received unexpected content-type %q", hf.Value)
1481  				break
1482  			}
1483  			contentTypeErr = ""
1484  			mdata[hf.Name] = append(mdata[hf.Name], hf.Value)
1485  			isGRPC = true
1486  		case "grpc-encoding":
1487  			recvCompress = hf.Value
1488  		case "grpc-status":
1489  			code, err := strconv.ParseInt(hf.Value, 10, 32)
1490  			if err != nil {
1491  				se := status.New(codes.Unknown, fmt.Sprintf("transport: malformed grpc-status: %v", err))
1492  				t.closeStream(s, se.Err(), true, http2.ErrCodeProtocol, se, nil, endStream)
1493  				return
1494  			}
1495  			grpcStatusCode = codes.Code(uint32(code))
1496  		case "grpc-message":
1497  			grpcMessage = decodeGrpcMessage(hf.Value)
1498  		case ":status":
1499  			httpStatus = hf.Value
1500  		default:
1501  			if isReservedHeader(hf.Name) && !isWhitelistedHeader(hf.Name) {
1502  				break
1503  			}
1504  			v, err := decodeMetadataHeader(hf.Name, hf.Value)
1505  			if err != nil {
1506  				headerError = fmt.Sprintf("transport: malformed %s: %v", hf.Name, err)
1507  				logger.Warningf("Failed to decode metadata header (%q, %q): %v", hf.Name, hf.Value, err)
1508  				break
1509  			}
1510  			mdata[hf.Name] = append(mdata[hf.Name], v)
1511  		}
1512  	}
1513  
1514  	// If a non-gRPC response is received, then evaluate the HTTP status to
1515  	// process the response and close the stream.
1516  	// In case http status doesn't provide any error information (status : 200),
1517  	// then evalute response code to be Unknown.
1518  	if !isGRPC {
1519  		var grpcErrorCode = codes.Internal
1520  		if httpStatus == "" {
1521  			httpStatusErr = "malformed header: missing HTTP status"
1522  		} else {
1523  			// Parse the status codes (e.g. "200", 404").
1524  			statusCode, err := strconv.Atoi(httpStatus)
1525  			if err != nil {
1526  				se := status.New(grpcErrorCode, fmt.Sprintf("transport: malformed http-status: %v", err))
1527  				t.closeStream(s, se.Err(), true, http2.ErrCodeProtocol, se, nil, endStream)
1528  				return
1529  			}
1530  			if statusCode >= 100 && statusCode < 200 {
1531  				if endStream {
1532  					se := status.New(codes.Internal, fmt.Sprintf(
1533  						"protocol error: informational header with status code %d must not have END_STREAM set", statusCode))
1534  					t.closeStream(s, se.Err(), true, http2.ErrCodeProtocol, se, nil, endStream)
1535  				}
1536  				// In case of informational headers, return.
1537  				return
1538  			}
1539  			httpStatusErr = fmt.Sprintf(
1540  				"unexpected HTTP status code received from server: %d (%s)",
1541  				statusCode,
1542  				http.StatusText(statusCode),
1543  			)
1544  			var ok bool
1545  			grpcErrorCode, ok = HTTPStatusConvTab[statusCode]
1546  			if !ok {
1547  				grpcErrorCode = codes.Unknown
1548  			}
1549  		}
1550  		var errs []string
1551  		if httpStatusErr != "" {
1552  			errs = append(errs, httpStatusErr)
1553  		}
1554  
1555  		if contentTypeErr != "" {
1556  			errs = append(errs, contentTypeErr)
1557  		}
1558  
1559  		se := status.New(grpcErrorCode, strings.Join(errs, "; "))
1560  		t.closeStream(s, se.Err(), true, http2.ErrCodeProtocol, se, nil, endStream)
1561  		return
1562  	}
1563  
1564  	if headerError != "" {
1565  		se := status.New(codes.Internal, headerError)
1566  		t.closeStream(s, se.Err(), true, http2.ErrCodeProtocol, se, nil, endStream)
1567  		return
1568  	}
1569  
1570  	// For headers, set them in s.header and close headerChan.  For trailers or
1571  	// trailers-only, closeStream will set the trailers and close headerChan as
1572  	// needed.
1573  	if !endStream {
1574  		// If headerChan hasn't been closed yet (expected, given we checked it
1575  		// above, but something else could have potentially closed the whole
1576  		// stream).
1577  		if atomic.CompareAndSwapUint32(&s.headerChanClosed, 0, 1) {
1578  			s.headerValid = true
1579  			// These values can be set without any synchronization because
1580  			// stream goroutine will read it only after seeing a closed
1581  			// headerChan which we'll close after setting this.
1582  			s.recvCompress = recvCompress
1583  			if len(mdata) > 0 {
1584  				s.header = mdata
1585  			}
1586  			close(s.headerChan)
1587  		}
1588  	}
1589  
1590  	if t.statsHandler != nil {
1591  		if !endStream {
1592  			t.statsHandler.HandleRPC(s.ctx, &stats.InHeader{
1593  				Client:      true,
1594  				WireLength:  int(frame.Header().Length),
1595  				Header:      metadata.MD(mdata).Copy(),
1596  				Compression: s.recvCompress,
1597  			})
1598  		} else {
1599  			t.statsHandler.HandleRPC(s.ctx, &stats.InTrailer{
1600  				Client:     true,
1601  				WireLength: int(frame.Header().Length),
1602  				Trailer:    metadata.MD(mdata).Copy(),
1603  			})
1604  		}
1605  	}
1606  
1607  	if !endStream {
1608  		return
1609  	}
1610  
1611  	status := istatus.NewWithProto(grpcStatusCode, grpcMessage, mdata[grpcStatusDetailsBinHeader])
1612  
1613  	// If client received END_STREAM from server while stream was still active,
1614  	// send RST_STREAM.
1615  	rstStream := s.getState() == streamActive
1616  	t.closeStream(s, io.EOF, rstStream, http2.ErrCodeNo, status, mdata, true)
1617  }
1618  
1619  // readServerPreface reads and handles the initial settings frame from the
1620  // server.
1621  func (t *http2Client) readServerPreface() error {
1622  	frame, err := t.framer.fr.ReadFrame()
1623  	if err != nil {
1624  		return connectionErrorf(true, err, "error reading server preface: %v", err)
1625  	}
1626  	sf, ok := frame.(*http2.SettingsFrame)
1627  	if !ok {
1628  		return connectionErrorf(true, nil, "initial http2 frame from server is not a settings frame: %T", frame)
1629  	}
1630  	t.handleSettings(sf, true)
1631  	return nil
1632  }
1633  
1634  // reader verifies the server preface and reads all subsequent data from
1635  // network connection.  If the server preface is not read successfully, an
1636  // error is pushed to errCh; otherwise errCh is closed with no error.
1637  func (t *http2Client) reader(errCh chan<- error) {
1638  	var errClose error
1639  	defer func() {
1640  		close(t.readerDone)
1641  		if errClose != nil {
1642  			t.Close(errClose)
1643  		}
1644  	}()
1645  
1646  	if err := t.readServerPreface(); err != nil {
1647  		errCh <- err
1648  		return
1649  	}
1650  	close(errCh)
1651  	if t.keepaliveEnabled {
1652  		atomic.StoreInt64(&t.lastRead, time.Now().UnixNano())
1653  	}
1654  
1655  	// loop to keep reading incoming messages on this transport.
1656  	for {
1657  		t.controlBuf.throttle()
1658  		frame, err := t.framer.readFrame()
1659  		if t.keepaliveEnabled {
1660  			atomic.StoreInt64(&t.lastRead, time.Now().UnixNano())
1661  		}
1662  		if err != nil {
1663  			// Abort an active stream if the http2.Framer returns a
1664  			// http2.StreamError. This can happen only if the server's response
1665  			// is malformed http2.
1666  			if se, ok := err.(http2.StreamError); ok {
1667  				t.mu.Lock()
1668  				s := t.activeStreams[se.StreamID]
1669  				t.mu.Unlock()
1670  				if s != nil {
1671  					// use error detail to provide better err message
1672  					code := http2ErrConvTab[se.Code]
1673  					errorDetail := t.framer.errorDetail()
1674  					var msg string
1675  					if errorDetail != nil {
1676  						msg = errorDetail.Error()
1677  					} else {
1678  						msg = "received invalid frame"
1679  					}
1680  					t.closeStream(s, status.Error(code, msg), true, http2.ErrCodeProtocol, status.New(code, msg), nil, false)
1681  				}
1682  				continue
1683  			}
1684  			// Transport error.
1685  			errClose = connectionErrorf(true, err, "error reading from server: %v", err)
1686  			return
1687  		}
1688  		switch frame := frame.(type) {
1689  		case *http2.MetaHeadersFrame:
1690  			t.operateHeaders(frame)
1691  		case *parsedDataFrame:
1692  			t.handleData(frame)
1693  			frame.data.Free()
1694  		case *http2.RSTStreamFrame:
1695  			t.handleRSTStream(frame)
1696  		case *http2.SettingsFrame:
1697  			t.handleSettings(frame, false)
1698  		case *http2.PingFrame:
1699  			t.handlePing(frame)
1700  		case *http2.GoAwayFrame:
1701  			errClose = t.handleGoAway(frame)
1702  		case *http2.WindowUpdateFrame:
1703  			t.handleWindowUpdate(frame)
1704  		default:
1705  			if logger.V(logLevel) {
1706  				logger.Errorf("transport: http2Client.reader got unhandled frame type %v.", frame)
1707  			}
1708  		}
1709  	}
1710  }
1711  
1712  // keepalive running in a separate goroutine makes sure the connection is alive by sending pings.
1713  func (t *http2Client) keepalive() {
1714  	var err error
1715  	defer func() {
1716  		close(t.keepaliveDone)
1717  		if err != nil {
1718  			t.Close(err)
1719  		}
1720  	}()
1721  	p := &ping{data: [8]byte{}}
1722  	// True iff a ping has been sent, and no data has been received since then.
1723  	outstandingPing := false
1724  	// Amount of time remaining before which we should receive an ACK for the
1725  	// last sent ping.
1726  	timeoutLeft := time.Duration(0)
1727  	// Records the last value of t.lastRead before we go block on the timer.
1728  	// This is required to check for read activity since then.
1729  	prevNano := time.Now().UnixNano()
1730  	timer := time.NewTimer(t.kp.Time)
1731  	for {
1732  		select {
1733  		case <-timer.C:
1734  			lastRead := atomic.LoadInt64(&t.lastRead)
1735  			if lastRead > prevNano {
1736  				// There has been read activity since the last time we were here.
1737  				outstandingPing = false
1738  				// Next timer should fire at kp.Time seconds from lastRead time.
1739  				timer.Reset(time.Duration(lastRead) + t.kp.Time - time.Duration(time.Now().UnixNano()))
1740  				prevNano = lastRead
1741  				continue
1742  			}
1743  			if outstandingPing && timeoutLeft <= 0 {
1744  				err = connectionErrorf(true, nil, "keepalive ping failed to receive ACK within timeout")
1745  				return
1746  			}
1747  			t.mu.Lock()
1748  			if t.state == closing {
1749  				// If the transport is closing, we should exit from the
1750  				// keepalive goroutine here. If not, we could have a race
1751  				// between the call to Signal() from Close() and the call to
1752  				// Wait() here, whereby the keepalive goroutine ends up
1753  				// blocking on the condition variable which will never be
1754  				// signalled again.
1755  				t.mu.Unlock()
1756  				return
1757  			}
1758  			if len(t.activeStreams) < 1 && !t.kp.PermitWithoutStream {
1759  				// If a ping was sent out previously (because there were active
1760  				// streams at that point) which wasn't acked and its timeout
1761  				// hadn't fired, but we got here and are about to go dormant,
1762  				// we should make sure that we unconditionally send a ping once
1763  				// we awaken.
1764  				outstandingPing = false
1765  				t.kpDormant = true
1766  				t.kpDormancyCond.Wait()
1767  			}
1768  			t.kpDormant = false
1769  			t.mu.Unlock()
1770  
1771  			// We get here either because we were dormant and a new stream was
1772  			// created which unblocked the Wait() call, or because the
1773  			// keepalive timer expired. In both cases, we need to send a ping.
1774  			if !outstandingPing {
1775  				if channelz.IsOn() {
1776  					t.channelz.SocketMetrics.KeepAlivesSent.Add(1)
1777  				}
1778  				t.controlBuf.put(p)
1779  				timeoutLeft = t.kp.Timeout
1780  				outstandingPing = true
1781  			}
1782  			// The amount of time to sleep here is the minimum of kp.Time and
1783  			// timeoutLeft. This will ensure that we wait only for kp.Time
1784  			// before sending out the next ping (for cases where the ping is
1785  			// acked).
1786  			sleepDuration := min(t.kp.Time, timeoutLeft)
1787  			timeoutLeft -= sleepDuration
1788  			timer.Reset(sleepDuration)
1789  		case <-t.ctx.Done():
1790  			if !timer.Stop() {
1791  				<-timer.C
1792  			}
1793  			return
1794  		}
1795  	}
1796  }
1797  
1798  func (t *http2Client) Error() <-chan struct{} {
1799  	return t.ctx.Done()
1800  }
1801  
1802  func (t *http2Client) GoAway() <-chan struct{} {
1803  	return t.goAway
1804  }
1805  
1806  func (t *http2Client) socketMetrics() *channelz.EphemeralSocketMetrics {
1807  	return &channelz.EphemeralSocketMetrics{
1808  		LocalFlowControlWindow:  int64(t.fc.getSize()),
1809  		RemoteFlowControlWindow: t.getOutFlowWindow(),
1810  	}
1811  }
1812  
1813  func (t *http2Client) incrMsgSent() {
1814  	if channelz.IsOn() {
1815  		t.channelz.SocketMetrics.MessagesSent.Add(1)
1816  		t.channelz.SocketMetrics.LastMessageSentTimestamp.Store(time.Now().UnixNano())
1817  	}
1818  }
1819  
1820  func (t *http2Client) incrMsgRecv() {
1821  	if channelz.IsOn() {
1822  		t.channelz.SocketMetrics.MessagesReceived.Add(1)
1823  		t.channelz.SocketMetrics.LastMessageReceivedTimestamp.Store(time.Now().UnixNano())
1824  	}
1825  }
1826  
1827  func (t *http2Client) getOutFlowWindow() int64 {
1828  	resp := make(chan uint32, 1)
1829  	timer := time.NewTimer(time.Second)
1830  	defer timer.Stop()
1831  	t.controlBuf.put(&outFlowControlSizeRequest{resp})
1832  	select {
1833  	case sz := <-resp:
1834  		return int64(sz)
1835  	case <-t.ctxDone:
1836  		return -1
1837  	case <-timer.C:
1838  		return -2
1839  	}
1840  }
1841  
1842  func (t *http2Client) stateForTesting() transportState {
1843  	t.mu.Lock()
1844  	defer t.mu.Unlock()
1845  	return t.state
1846  }
1847