http2_server.go raw

   1  /*
   2   *
   3   * Copyright 2014 gRPC authors.
   4   *
   5   * Licensed under the Apache License, Version 2.0 (the "License");
   6   * you may not use this file except in compliance with the License.
   7   * You may obtain a copy of the License at
   8   *
   9   *     http://www.apache.org/licenses/LICENSE-2.0
  10   *
  11   * Unless required by applicable law or agreed to in writing, software
  12   * distributed under the License is distributed on an "AS IS" BASIS,
  13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14   * See the License for the specific language governing permissions and
  15   * limitations under the License.
  16   *
  17   */
  18  
  19  package transport
  20  
  21  import (
  22  	"bytes"
  23  	"context"
  24  	"errors"
  25  	"fmt"
  26  	"io"
  27  	"math"
  28  	rand "math/rand/v2"
  29  	"net"
  30  	"net/http"
  31  	"strconv"
  32  	"sync"
  33  	"sync/atomic"
  34  	"time"
  35  
  36  	"golang.org/x/net/http2"
  37  	"golang.org/x/net/http2/hpack"
  38  	"google.golang.org/protobuf/proto"
  39  
  40  	"google.golang.org/grpc/internal"
  41  	"google.golang.org/grpc/internal/grpclog"
  42  	"google.golang.org/grpc/internal/grpcutil"
  43  	"google.golang.org/grpc/internal/pretty"
  44  	istatus "google.golang.org/grpc/internal/status"
  45  	"google.golang.org/grpc/internal/syscall"
  46  	"google.golang.org/grpc/mem"
  47  
  48  	"google.golang.org/grpc/codes"
  49  	"google.golang.org/grpc/credentials"
  50  	"google.golang.org/grpc/internal/channelz"
  51  	"google.golang.org/grpc/internal/grpcsync"
  52  	"google.golang.org/grpc/keepalive"
  53  	"google.golang.org/grpc/metadata"
  54  	"google.golang.org/grpc/peer"
  55  	"google.golang.org/grpc/stats"
  56  	"google.golang.org/grpc/status"
  57  	"google.golang.org/grpc/tap"
  58  )
  59  
  60  var (
  61  	// ErrIllegalHeaderWrite indicates that setting header is illegal because of
  62  	// the stream's state.
  63  	ErrIllegalHeaderWrite = status.Error(codes.Internal, "transport: SendHeader called multiple times")
  64  	// ErrHeaderListSizeLimitViolation indicates that the header list size is larger
  65  	// than the limit set by peer.
  66  	ErrHeaderListSizeLimitViolation = status.Error(codes.Internal, "transport: trying to send header list size larger than the limit set by peer")
  67  )
  68  
// serverConnectionCounter counts the http2Server transports that
// NewServerTransport has constructed. It is a package-level variable, so it is
// shared by every server in the process. NewServerTransport increments it with
// atomic.AddUint64 and stores the resulting value as the transport's
// connectionID. Must be accessed atomically.
var serverConnectionCounter uint64
  72  
// http2Server implements the ServerTransport interface with HTTP2.
//
// One value is created per connection by NewServerTransport.
type http2Server struct {
	lastRead int64 // Keep this field 64-bit aligned. Accessed atomically.
	// done is handed to the control buffer at construction.
	// NOTE(review): presumably closed when the transport closes — confirm in
	// Close, which is outside this view.
	done chan struct{}
	// conn is the connection after the optional credentials handshake.
	conn net.Conn
	// loopy is assigned by the writer goroutine started in NewServerTransport.
	loopy *loopyWriter
	// readerDone is closed when HandleStreams returns.
	readerDone chan struct{} // sync point to enable testing.
	// loopyWriterDone is closed once the loopy writer's run method returns.
	loopyWriterDone chan struct{}
	// peer holds the remote/local addresses and auth info of the connection.
	peer peer.Peer
	// inTapHandle, when non-nil, is invoked for each new stream in
	// operateHeaders; an error from it aborts the stream.
	inTapHandle tap.ServerInHandle
	framer      *framer
	// The max number of concurrent streams.
	maxStreams uint32
	// controlBuf delivers all the control related tasks (e.g., window
	// updates, reset streams, and various settings) to the controller.
	controlBuf *controlBuffer
	// fc is the connection-level inbound flow control state.
	fc *trInFlow
	// stats is the stats handler taken from the server config.
	stats stats.Handler
	// Keepalive and max-age parameters for the server.
	kp keepalive.ServerParameters
	// Keepalive enforcement policy.
	kep keepalive.EnforcementPolicy
	// The time instance last ping was received.
	lastPingAt time.Time
	// Number of times the client has violated keepalive ping policy so far.
	pingStrikes uint8
	// Flag to signify that number of ping strikes should be reset to 0.
	// This is set whenever data or header frames are sent.
	// 1 means yes.
	resetPingStrikes uint32 // Accessed atomically.
	// initialWindowSize is the inbound window limit given to newly created
	// streams; updateFlowControl rewrites it while holding mu.
	initialWindowSize int32
	// bdpEst is left nil when the config requests a static window size.
	bdpEst *bdpEstimator
	// maxSendHeaderListSize is set from the peer's MAX_HEADER_LIST_SIZE
	// setting in handleSettings; it stays nil until such a setting arrives.
	maxSendHeaderListSize *uint32

	mu sync.Mutex // guard the following

	// drainEvent is initialized when Drain() is called the first time. After
	// which the server writes out the first GoAway(with ID 2^31-1) frame. Then
	// an independent goroutine will be launched to later send the second
	// GoAway. During this time we don't want to write another first GoAway(with
	// ID 2^31 -1) frame. Thus call to Drain() will be a no-op if drainEvent is
	// already initialized since draining is already underway.
	drainEvent *grpcsync.Event
	state      transportState
	// activeStreams maps stream IDs to the streams currently registered on
	// this transport.
	activeStreams map[uint32]*ServerStream
	// idle is the time instant when the connection went idle.
	// This is either the beginning of the connection or when the number of
	// RPCs go down to 0.
	// When the connection is busy, this value is set to 0.
	idle time.Time

	// Fields below are for channelz metric collection.
	channelz *channelz.Socket
	// bufferPool comes from the server config and is also handed to the loopy
	// writer.
	bufferPool mem.BufferPool

	// connectionID is the value of serverConnectionCounter obtained when this
	// transport was created.
	connectionID uint64

	// maxStreamMu guards the maximum stream ID
	// This lock may not be taken if mu is already held.
	maxStreamMu sync.Mutex
	maxStreamID uint32 // max stream ID ever seen

	logger *grpclog.PrefixLogger
	// setResetPingStrikes is stored as a closure instead of making this a
	// method on http2Server to avoid a heap allocation when converting a method
	// to a closure for passing to frames objects.
	setResetPingStrikes func()
}
 141  
// NewServerTransport creates a http2 transport with conn and configuration
// options from config.
//
// It performs the optional credentials handshake, writes the server's initial
// SETTINGS (plus a connection WINDOW_UPDATE when the connection window is
// larger than the default), then validates the client preface and the client's
// first SETTINGS frame before starting the writer and keepalive goroutines.
//
// It returns a non-nil transport and a nil error on success. On failure, it
// returns a nil transport and a non-nil error. Two kinds of errors are
// returned unwrapped so the caller can recognise them:
// credentials.ErrConnDispatched from the handshake, and io.EOF (or
// io.ErrUnexpectedEOF while reading the first frame) when the peer closed the
// connection early, e.g. a TCP health checker.
func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport, err error) {
	var authInfo credentials.AuthInfo
	rawConn := conn
	if config.Credentials != nil {
		// This err shadows the named result. The deferred cleanup further down
		// is not registered yet, so these early returns do not close anything.
		var err error
		conn, authInfo, err = config.Credentials.ServerHandshake(rawConn)
		if err != nil {
			// ErrConnDispatched means that the connection was dispatched away
			// from gRPC; those connections should be left open. io.EOF means
			// the connection was closed before handshaking completed, which can
			// happen naturally from probers. Return these errors directly.
			if err == credentials.ErrConnDispatched || err == io.EOF {
				return nil, err
			}
			return nil, connectionErrorf(false, err, "ServerHandshake(%q) failed: %v", rawConn.RemoteAddr(), err)
		}
	}
	writeBufSize := config.WriteBufferSize
	readBufSize := config.ReadBufferSize
	maxHeaderListSize := defaultServerMaxHeaderListSize
	if config.MaxHeaderListSize != nil {
		maxHeaderListSize = *config.MaxHeaderListSize
	}
	framer := newFramer(conn, writeBufSize, readBufSize, config.SharedWriteBuffer, maxHeaderListSize, config.BufferPool)
	// Send initial settings as connection preface to client.
	isettings := []http2.Setting{{
		ID:  http2.SettingMaxFrameSize,
		Val: http2MaxFrameLen,
	}}
	// MaxUint32 is treated as "no limit", so the setting is simply not sent.
	if config.MaxStreams != math.MaxUint32 {
		isettings = append(isettings, http2.Setting{
			ID:  http2.SettingMaxConcurrentStreams,
			Val: config.MaxStreams,
		})
	}
	// Configured window sizes smaller than defaultWindowSize are ignored in
	// favour of initialWindowSize.
	iwz := int32(initialWindowSize)
	if config.InitialWindowSize >= defaultWindowSize {
		iwz = config.InitialWindowSize
	}
	icwz := int32(initialWindowSize)
	if config.InitialConnWindowSize >= defaultWindowSize {
		icwz = config.InitialConnWindowSize
	}
	if iwz != defaultWindowSize {
		isettings = append(isettings, http2.Setting{
			ID:  http2.SettingInitialWindowSize,
			Val: uint32(iwz)})
	}
	if config.MaxHeaderListSize != nil {
		isettings = append(isettings, http2.Setting{
			ID:  http2.SettingMaxHeaderListSize,
			Val: *config.MaxHeaderListSize,
		})
	}
	if config.HeaderTableSize != nil {
		isettings = append(isettings, http2.Setting{
			ID:  http2.SettingHeaderTableSize,
			Val: *config.HeaderTableSize,
		})
	}
	if err := framer.fr.WriteSettings(isettings...); err != nil {
		return nil, connectionErrorf(false, err, "transport: %v", err)
	}
	// Adjust the connection flow control window if needed.
	if delta := uint32(icwz - defaultWindowSize); delta > 0 {
		if err := framer.fr.WriteWindowUpdate(0, delta); err != nil {
			return nil, connectionErrorf(false, err, "transport: %v", err)
		}
	}
	// Fill in defaults for every keepalive parameter left at its zero value.
	kp := config.KeepaliveParams
	if kp.MaxConnectionIdle == 0 {
		kp.MaxConnectionIdle = defaultMaxConnectionIdle
	}
	if kp.MaxConnectionAge == 0 {
		kp.MaxConnectionAge = defaultMaxConnectionAge
	}
	// Add a jitter to MaxConnectionAge.
	kp.MaxConnectionAge += getJitter(kp.MaxConnectionAge)
	if kp.MaxConnectionAgeGrace == 0 {
		kp.MaxConnectionAgeGrace = defaultMaxConnectionAgeGrace
	}
	if kp.Time == 0 {
		kp.Time = defaultServerKeepaliveTime
	}
	if kp.Timeout == 0 {
		kp.Timeout = defaultServerKeepaliveTimeout
	}
	// The TCP user timeout is applied to the raw (pre-handshake) connection,
	// and only when keepalive pings are enabled.
	if kp.Time != infinity {
		if err = syscall.SetTCPUserTimeout(rawConn, kp.Timeout); err != nil {
			return nil, connectionErrorf(false, err, "transport: failed to set TCP_USER_TIMEOUT: %v", err)
		}
	}
	kep := config.KeepalivePolicy
	if kep.MinTime == 0 {
		kep.MinTime = defaultKeepalivePolicyMinTime
	}

	done := make(chan struct{})
	peer := peer.Peer{
		Addr:      conn.RemoteAddr(),
		LocalAddr: conn.LocalAddr(),
		AuthInfo:  authInfo,
	}
	t := &http2Server{
		done:              done,
		conn:              conn,
		peer:              peer,
		framer:            framer,
		readerDone:        make(chan struct{}),
		loopyWriterDone:   make(chan struct{}),
		maxStreams:        config.MaxStreams,
		inTapHandle:       config.InTapHandle,
		fc:                &trInFlow{limit: uint32(icwz)},
		state:             reachable,
		activeStreams:     make(map[uint32]*ServerStream),
		stats:             config.StatsHandler,
		kp:                kp,
		idle:              time.Now(),
		kep:               kep,
		initialWindowSize: iwz,
		bufferPool:        config.BufferPool,
	}
	t.setResetPingStrikes = func() {
		atomic.StoreUint32(&t.resetPingStrikes, 1)
	}
	// Security details are exposed to channelz only when the auth info
	// implements ChannelzSecurityInfo.
	var czSecurity credentials.ChannelzSecurityValue
	if au, ok := authInfo.(credentials.ChannelzSecurityInfo); ok {
		czSecurity = au.GetSecurityValue()
	}
	t.channelz = channelz.RegisterSocket(
		&channelz.Socket{
			SocketType:       channelz.SocketTypeNormal,
			Parent:           config.ChannelzParent,
			SocketMetrics:    channelz.SocketMetrics{},
			EphemeralMetrics: t.socketMetrics,
			LocalAddr:        t.peer.LocalAddr,
			RemoteAddr:       t.peer.Addr,
			SocketOptions:    channelz.GetSocketOption(t.conn),
			Security:         czSecurity,
		},
	)
	t.logger = prefixLoggerForServerTransport(t)

	t.controlBuf = newControlBuffer(t.done)
	// The BDP estimator is only created when dynamic window sizing is allowed.
	if !config.StaticWindowSize {
		t.bdpEst = &bdpEstimator{
			bdp:               initialWindowSize,
			updateFlowControl: t.updateFlowControl,
		}
	}

	t.connectionID = atomic.AddUint64(&serverConnectionCounter, 1)
	// Flush what was written through the framer above (the SETTINGS frame and
	// the optional WINDOW_UPDATE). The result of Flush is not checked here.
	t.framer.writer.Flush()

	// From this point on, any error returned through the named result closes
	// the transport.
	defer func() {
		if err != nil {
			t.Close(err)
		}
	}()

	// Check the validity of client preface.
	preface := make([]byte, len(clientPreface))
	if _, err := io.ReadFull(t.conn, preface); err != nil {
		// In deployments where a gRPC server runs behind a cloud load balancer
		// which performs regular TCP level health checks, the connection is
		// closed immediately by the latter.  Returning io.EOF here allows the
		// grpc server implementation to recognize this scenario and suppress
		// logging to reduce spam.
		if err == io.EOF {
			return nil, io.EOF
		}
		return nil, connectionErrorf(false, err, "transport: http2Server.HandleStreams failed to receive the preface from client: %v", err)
	}
	if !bytes.Equal(preface, clientPreface) {
		return nil, connectionErrorf(false, nil, "transport: http2Server.HandleStreams received bogus greeting from client: %q", preface)
	}

	// The first frame after the preface must be the client's SETTINGS frame.
	frame, err := t.framer.fr.ReadFrame()
	if err == io.EOF || err == io.ErrUnexpectedEOF {
		return nil, err
	}
	if err != nil {
		return nil, connectionErrorf(false, err, "transport: http2Server.HandleStreams failed to read initial settings frame: %v", err)
	}
	atomic.StoreInt64(&t.lastRead, time.Now().UnixNano())
	sf, ok := frame.(*http2.SettingsFrame)
	if !ok {
		return nil, connectionErrorf(false, nil, "transport: http2Server.HandleStreams saw invalid preface type %T from client", frame)
	}
	t.handleSettings(sf)

	// Writer goroutine: runs the loopy writer until it exits, then signals
	// loopyWriterDone.
	go func() {
		t.loopy = newLoopyWriter(serverSide, t.framer, t.controlBuf, t.bdpEst, t.conn, t.logger, t.outgoingGoAwayHandler, t.bufferPool)
		err := t.loopy.run()
		close(t.loopyWriterDone)
		if !isIOError(err) {
			// Close the connection if a non-I/O error occurs (for I/O errors
			// the reader will also encounter the error and close).  Wait 1
			// second before closing the connection, or when the reader is done
			// (i.e. the client already closed the connection or a connection
			// error occurred).  This avoids the potential problem where there
			// is unread data on the receive side of the connection, which, if
			// closed, would lead to a TCP RST instead of FIN, and the client
			// encountering errors.  For more info:
			// https://github.com/grpc/grpc-go/issues/5358
			timer := time.NewTimer(time.Second)
			defer timer.Stop()
			select {
			case <-t.readerDone:
			case <-timer.C:
			}
			t.conn.Close()
		}
	}()
	go t.keepalive()
	return t, nil
}
 366  
 367  // operateHeaders takes action on the decoded headers. Returns an error if fatal
 368  // error encountered and transport needs to close, otherwise returns nil.
 369  func (t *http2Server) operateHeaders(ctx context.Context, frame *http2.MetaHeadersFrame, handle func(*ServerStream)) error {
 370  	// Acquire max stream ID lock for entire duration
 371  	t.maxStreamMu.Lock()
 372  	defer t.maxStreamMu.Unlock()
 373  
 374  	streamID := frame.Header().StreamID
 375  
 376  	// frame.Truncated is set to true when framer detects that the current header
 377  	// list size hits MaxHeaderListSize limit.
 378  	if frame.Truncated {
 379  		t.controlBuf.put(&cleanupStream{
 380  			streamID: streamID,
 381  			rst:      true,
 382  			rstCode:  http2.ErrCodeFrameSize,
 383  			onWrite:  func() {},
 384  		})
 385  		return nil
 386  	}
 387  
 388  	if streamID%2 != 1 || streamID <= t.maxStreamID {
 389  		// illegal gRPC stream id.
 390  		return fmt.Errorf("received an illegal stream id: %v. headers frame: %+v", streamID, frame)
 391  	}
 392  	t.maxStreamID = streamID
 393  
 394  	s := &ServerStream{
 395  		Stream: Stream{
 396  			id: streamID,
 397  			fc: inFlow{limit: uint32(t.initialWindowSize)},
 398  		},
 399  		st:               t,
 400  		headerWireLength: int(frame.Header().Length),
 401  	}
 402  	s.Stream.buf.init()
 403  	var (
 404  		// if false, content-type was missing or invalid
 405  		isGRPC      = false
 406  		contentType = ""
 407  		mdata       = make(metadata.MD, len(frame.Fields))
 408  		httpMethod  string
 409  		// these are set if an error is encountered while parsing the headers
 410  		protocolError bool
 411  		headerError   *status.Status
 412  
 413  		timeoutSet bool
 414  		timeout    time.Duration
 415  	)
 416  
 417  	for _, hf := range frame.Fields {
 418  		switch hf.Name {
 419  		case "content-type":
 420  			contentSubtype, validContentType := grpcutil.ContentSubtype(hf.Value)
 421  			if !validContentType {
 422  				contentType = hf.Value
 423  				break
 424  			}
 425  			mdata[hf.Name] = append(mdata[hf.Name], hf.Value)
 426  			s.contentSubtype = contentSubtype
 427  			isGRPC = true
 428  
 429  		case "grpc-accept-encoding":
 430  			mdata[hf.Name] = append(mdata[hf.Name], hf.Value)
 431  			if hf.Value == "" {
 432  				continue
 433  			}
 434  			compressors := hf.Value
 435  			if s.clientAdvertisedCompressors != "" {
 436  				compressors = s.clientAdvertisedCompressors + "," + compressors
 437  			}
 438  			s.clientAdvertisedCompressors = compressors
 439  		case "grpc-encoding":
 440  			s.recvCompress = hf.Value
 441  		case ":method":
 442  			httpMethod = hf.Value
 443  		case ":path":
 444  			s.method = hf.Value
 445  		case "grpc-timeout":
 446  			timeoutSet = true
 447  			var err error
 448  			if timeout, err = decodeTimeout(hf.Value); err != nil {
 449  				headerError = status.Newf(codes.Internal, "malformed grpc-timeout: %v", err)
 450  			}
 451  		// "Transports must consider requests containing the Connection header
 452  		// as malformed." - A41
 453  		case "connection":
 454  			if t.logger.V(logLevel) {
 455  				t.logger.Infof("Received a HEADERS frame with a :connection header which makes the request malformed, as per the HTTP/2 spec")
 456  			}
 457  			protocolError = true
 458  		default:
 459  			if isReservedHeader(hf.Name) && !isWhitelistedHeader(hf.Name) {
 460  				break
 461  			}
 462  			v, err := decodeMetadataHeader(hf.Name, hf.Value)
 463  			if err != nil {
 464  				headerError = status.Newf(codes.Internal, "malformed binary metadata %q in header %q: %v", hf.Value, hf.Name, err)
 465  				t.logger.Warningf("Failed to decode metadata header (%q, %q): %v", hf.Name, hf.Value, err)
 466  				break
 467  			}
 468  			mdata[hf.Name] = append(mdata[hf.Name], v)
 469  		}
 470  	}
 471  
 472  	// "If multiple Host headers or multiple :authority headers are present, the
 473  	// request must be rejected with an HTTP status code 400 as required by Host
 474  	// validation in RFC 7230 ยง5.4, gRPC status code INTERNAL, or RST_STREAM
 475  	// with HTTP/2 error code PROTOCOL_ERROR." - A41. Since this is a HTTP/2
 476  	// error, this takes precedence over a client not speaking gRPC.
 477  	if len(mdata[":authority"]) > 1 || len(mdata["host"]) > 1 {
 478  		errMsg := fmt.Sprintf("num values of :authority: %v, num values of host: %v, both must only have 1 value as per HTTP/2 spec", len(mdata[":authority"]), len(mdata["host"]))
 479  		if t.logger.V(logLevel) {
 480  			t.logger.Infof("Aborting the stream early: %v", errMsg)
 481  		}
 482  		t.controlBuf.put(&earlyAbortStream{
 483  			httpStatus:     http.StatusBadRequest,
 484  			streamID:       streamID,
 485  			contentSubtype: s.contentSubtype,
 486  			status:         status.New(codes.Internal, errMsg),
 487  			rst:            !frame.StreamEnded(),
 488  		})
 489  		return nil
 490  	}
 491  
 492  	if protocolError {
 493  		t.controlBuf.put(&cleanupStream{
 494  			streamID: streamID,
 495  			rst:      true,
 496  			rstCode:  http2.ErrCodeProtocol,
 497  			onWrite:  func() {},
 498  		})
 499  		return nil
 500  	}
 501  	if !isGRPC {
 502  		t.controlBuf.put(&earlyAbortStream{
 503  			httpStatus:     http.StatusUnsupportedMediaType,
 504  			streamID:       streamID,
 505  			contentSubtype: s.contentSubtype,
 506  			status:         status.Newf(codes.InvalidArgument, "invalid gRPC request content-type %q", contentType),
 507  			rst:            !frame.StreamEnded(),
 508  		})
 509  		return nil
 510  	}
 511  	if headerError != nil {
 512  		t.controlBuf.put(&earlyAbortStream{
 513  			httpStatus:     http.StatusBadRequest,
 514  			streamID:       streamID,
 515  			contentSubtype: s.contentSubtype,
 516  			status:         headerError,
 517  			rst:            !frame.StreamEnded(),
 518  		})
 519  		return nil
 520  	}
 521  
 522  	// "If :authority is missing, Host must be renamed to :authority." - A41
 523  	if len(mdata[":authority"]) == 0 {
 524  		// No-op if host isn't present, no eventual :authority header is a valid
 525  		// RPC.
 526  		if host, ok := mdata["host"]; ok {
 527  			mdata[":authority"] = host
 528  			delete(mdata, "host")
 529  		}
 530  	} else {
 531  		// "If :authority is present, Host must be discarded" - A41
 532  		delete(mdata, "host")
 533  	}
 534  
 535  	if frame.StreamEnded() {
 536  		// s is just created by the caller. No lock needed.
 537  		s.state = streamReadDone
 538  	}
 539  	if timeoutSet {
 540  		s.ctx, s.cancel = context.WithTimeout(ctx, timeout)
 541  	} else {
 542  		s.ctx, s.cancel = context.WithCancel(ctx)
 543  	}
 544  
 545  	// Attach the received metadata to the context.
 546  	if len(mdata) > 0 {
 547  		s.ctx = metadata.NewIncomingContext(s.ctx, mdata)
 548  	}
 549  	t.mu.Lock()
 550  	if t.state != reachable {
 551  		t.mu.Unlock()
 552  		s.cancel()
 553  		return nil
 554  	}
 555  	if uint32(len(t.activeStreams)) >= t.maxStreams {
 556  		t.mu.Unlock()
 557  		t.controlBuf.put(&cleanupStream{
 558  			streamID: streamID,
 559  			rst:      true,
 560  			rstCode:  http2.ErrCodeRefusedStream,
 561  			onWrite:  func() {},
 562  		})
 563  		s.cancel()
 564  		return nil
 565  	}
 566  	if httpMethod != http.MethodPost {
 567  		t.mu.Unlock()
 568  		errMsg := fmt.Sprintf("Received a HEADERS frame with :method %q which should be POST", httpMethod)
 569  		if t.logger.V(logLevel) {
 570  			t.logger.Infof("Aborting the stream early: %v", errMsg)
 571  		}
 572  		t.controlBuf.put(&earlyAbortStream{
 573  			httpStatus:     http.StatusMethodNotAllowed,
 574  			streamID:       streamID,
 575  			contentSubtype: s.contentSubtype,
 576  			status:         status.New(codes.Internal, errMsg),
 577  			rst:            !frame.StreamEnded(),
 578  		})
 579  		s.cancel()
 580  		return nil
 581  	}
 582  	if t.inTapHandle != nil {
 583  		var err error
 584  		if s.ctx, err = t.inTapHandle(s.ctx, &tap.Info{FullMethodName: s.method, Header: mdata}); err != nil {
 585  			t.mu.Unlock()
 586  			if t.logger.V(logLevel) {
 587  				t.logger.Infof("Aborting the stream early due to InTapHandle failure: %v", err)
 588  			}
 589  			stat, ok := status.FromError(err)
 590  			if !ok {
 591  				stat = status.New(codes.PermissionDenied, err.Error())
 592  			}
 593  			t.controlBuf.put(&earlyAbortStream{
 594  				httpStatus:     http.StatusOK,
 595  				streamID:       s.id,
 596  				contentSubtype: s.contentSubtype,
 597  				status:         stat,
 598  				rst:            !frame.StreamEnded(),
 599  			})
 600  			return nil
 601  		}
 602  	}
 603  
 604  	if s.ctx.Err() != nil {
 605  		t.mu.Unlock()
 606  		// Early abort in case the timeout was zero or so low it already fired.
 607  		t.controlBuf.put(&earlyAbortStream{
 608  			httpStatus:     http.StatusOK,
 609  			streamID:       s.id,
 610  			contentSubtype: s.contentSubtype,
 611  			status:         status.New(codes.DeadlineExceeded, context.DeadlineExceeded.Error()),
 612  			rst:            !frame.StreamEnded(),
 613  		})
 614  		return nil
 615  	}
 616  
 617  	t.activeStreams[streamID] = s
 618  	if len(t.activeStreams) == 1 {
 619  		t.idle = time.Time{}
 620  	}
 621  
 622  	// Start a timer to close the stream on reaching the deadline.
 623  	if timeoutSet {
 624  		// We need to wait for s.cancel to be updated before calling
 625  		// t.closeStream to avoid data races.
 626  		cancelUpdated := make(chan struct{})
 627  		timer := internal.TimeAfterFunc(timeout, func() {
 628  			<-cancelUpdated
 629  			t.closeStream(s, true, http2.ErrCodeCancel, false)
 630  		})
 631  		oldCancel := s.cancel
 632  		s.cancel = func() {
 633  			oldCancel()
 634  			timer.Stop()
 635  		}
 636  		close(cancelUpdated)
 637  	}
 638  	t.mu.Unlock()
 639  	if channelz.IsOn() {
 640  		t.channelz.SocketMetrics.StreamsStarted.Add(1)
 641  		t.channelz.SocketMetrics.LastRemoteStreamCreatedTimestamp.Store(time.Now().UnixNano())
 642  	}
 643  	s.readRequester = s
 644  	s.ctxDone = s.ctx.Done()
 645  	s.Stream.wq.init(defaultWriteQuota, s.ctxDone)
 646  	s.trReader = transportReader{
 647  		reader: recvBufferReader{
 648  			ctx:     s.ctx,
 649  			ctxDone: s.ctxDone,
 650  			recv:    &s.buf,
 651  		},
 652  		windowHandler: s,
 653  	}
 654  	// Register the stream with loopy.
 655  	t.controlBuf.put(&registerStream{
 656  		streamID: s.id,
 657  		wq:       &s.wq,
 658  	})
 659  	handle(s)
 660  	return nil
 661  }
 662  
 663  // HandleStreams receives incoming streams using the given handler. This is
 664  // typically run in a separate goroutine.
 665  // traceCtx attaches trace to ctx and returns the new context.
 666  func (t *http2Server) HandleStreams(ctx context.Context, handle func(*ServerStream)) {
 667  	defer func() {
 668  		close(t.readerDone)
 669  		<-t.loopyWriterDone
 670  	}()
 671  	for {
 672  		t.controlBuf.throttle()
 673  		frame, err := t.framer.readFrame()
 674  		atomic.StoreInt64(&t.lastRead, time.Now().UnixNano())
 675  		if err != nil {
 676  			if se, ok := err.(http2.StreamError); ok {
 677  				if t.logger.V(logLevel) {
 678  					t.logger.Warningf("Encountered http2.StreamError: %v", se)
 679  				}
 680  				t.mu.Lock()
 681  				s := t.activeStreams[se.StreamID]
 682  				t.mu.Unlock()
 683  				if s != nil {
 684  					t.closeStream(s, true, se.Code, false)
 685  				} else {
 686  					t.controlBuf.put(&cleanupStream{
 687  						streamID: se.StreamID,
 688  						rst:      true,
 689  						rstCode:  se.Code,
 690  						onWrite:  func() {},
 691  					})
 692  				}
 693  				continue
 694  			}
 695  			t.Close(err)
 696  			return
 697  		}
 698  		switch frame := frame.(type) {
 699  		case *http2.MetaHeadersFrame:
 700  			if err := t.operateHeaders(ctx, frame, handle); err != nil {
 701  				// Any error processing client headers, e.g. invalid stream ID,
 702  				// is considered a protocol violation.
 703  				t.controlBuf.put(&goAway{
 704  					code:      http2.ErrCodeProtocol,
 705  					debugData: []byte(err.Error()),
 706  					closeConn: err,
 707  				})
 708  				continue
 709  			}
 710  		case *parsedDataFrame:
 711  			t.handleData(frame)
 712  			frame.data.Free()
 713  		case *http2.RSTStreamFrame:
 714  			t.handleRSTStream(frame)
 715  		case *http2.SettingsFrame:
 716  			t.handleSettings(frame)
 717  		case *http2.PingFrame:
 718  			t.handlePing(frame)
 719  		case *http2.WindowUpdateFrame:
 720  			t.handleWindowUpdate(frame)
 721  		case *http2.GoAwayFrame:
 722  			// TODO: Handle GoAway from the client appropriately.
 723  		default:
 724  			if t.logger.V(logLevel) {
 725  				t.logger.Infof("Received unsupported frame type %T", frame)
 726  			}
 727  		}
 728  	}
 729  }
 730  
 731  func (t *http2Server) getStream(f http2.Frame) (*ServerStream, bool) {
 732  	t.mu.Lock()
 733  	defer t.mu.Unlock()
 734  	if t.activeStreams == nil {
 735  		// The transport is closing.
 736  		return nil, false
 737  	}
 738  	s, ok := t.activeStreams[f.Header().StreamID]
 739  	if !ok {
 740  		// The stream is already done.
 741  		return nil, false
 742  	}
 743  	return s, true
 744  }
 745  
 746  // adjustWindow sends out extra window update over the initial window size
 747  // of stream if the application is requesting data larger in size than
 748  // the window.
 749  func (t *http2Server) adjustWindow(s *ServerStream, n uint32) {
 750  	if w := s.fc.maybeAdjust(n); w > 0 {
 751  		t.controlBuf.put(&outgoingWindowUpdate{streamID: s.id, increment: w})
 752  	}
 753  
 754  }
 755  
 756  // updateWindow adjusts the inbound quota for the stream and the transport.
 757  // Window updates will deliver to the controller for sending when
 758  // the cumulative quota exceeds the corresponding threshold.
 759  func (t *http2Server) updateWindow(s *ServerStream, n uint32) {
 760  	if w := s.fc.onRead(n); w > 0 {
 761  		t.controlBuf.put(&outgoingWindowUpdate{streamID: s.id,
 762  			increment: w,
 763  		})
 764  	}
 765  }
 766  
 767  // updateFlowControl updates the incoming flow control windows
 768  // for the transport and the stream based on the current bdp
 769  // estimation.
 770  func (t *http2Server) updateFlowControl(n uint32) {
 771  	t.mu.Lock()
 772  	for _, s := range t.activeStreams {
 773  		s.fc.newLimit(n)
 774  	}
 775  	t.initialWindowSize = int32(n)
 776  	t.mu.Unlock()
 777  	t.controlBuf.put(&outgoingWindowUpdate{
 778  		streamID:  0,
 779  		increment: t.fc.newLimit(n),
 780  	})
 781  	t.controlBuf.put(&outgoingSettings{
 782  		ss: []http2.Setting{
 783  			{
 784  				ID:  http2.SettingInitialWindowSize,
 785  				Val: n,
 786  			},
 787  		},
 788  	})
 789  
 790  }
 791  
 792  func (t *http2Server) handleData(f *parsedDataFrame) {
 793  	size := f.Header().Length
 794  	var sendBDPPing bool
 795  	if t.bdpEst != nil {
 796  		sendBDPPing = t.bdpEst.add(size)
 797  	}
 798  	// Decouple connection's flow control from application's read.
 799  	// An update on connection's flow control should not depend on
 800  	// whether user application has read the data or not. Such a
 801  	// restriction is already imposed on the stream's flow control,
 802  	// and therefore the sender will be blocked anyways.
 803  	// Decoupling the connection flow control will prevent other
 804  	// active(fast) streams from starving in presence of slow or
 805  	// inactive streams.
 806  	if w := t.fc.onData(size); w > 0 {
 807  		t.controlBuf.put(&outgoingWindowUpdate{
 808  			streamID:  0,
 809  			increment: w,
 810  		})
 811  	}
 812  	if sendBDPPing {
 813  		// Avoid excessive ping detection (e.g. in an L7 proxy)
 814  		// by sending a window update prior to the BDP ping.
 815  		if w := t.fc.reset(); w > 0 {
 816  			t.controlBuf.put(&outgoingWindowUpdate{
 817  				streamID:  0,
 818  				increment: w,
 819  			})
 820  		}
 821  		t.controlBuf.put(bdpPing)
 822  	}
 823  	// Select the right stream to dispatch.
 824  	s, ok := t.getStream(f)
 825  	if !ok {
 826  		return
 827  	}
 828  	if s.getState() == streamReadDone {
 829  		t.closeStream(s, true, http2.ErrCodeStreamClosed, false)
 830  		return
 831  	}
 832  	if size > 0 {
 833  		if err := s.fc.onData(size); err != nil {
 834  			t.closeStream(s, true, http2.ErrCodeFlowControl, false)
 835  			return
 836  		}
 837  		dataLen := f.data.Len()
 838  		if f.Header().Flags.Has(http2.FlagDataPadded) {
 839  			if w := s.fc.onRead(size - uint32(dataLen)); w > 0 {
 840  				t.controlBuf.put(&outgoingWindowUpdate{s.id, w})
 841  			}
 842  		}
 843  		if dataLen > 0 {
 844  			f.data.Ref()
 845  			s.write(recvMsg{buffer: f.data})
 846  		}
 847  	}
 848  	if f.StreamEnded() {
 849  		// Received the end of stream from the client.
 850  		s.compareAndSwapState(streamActive, streamReadDone)
 851  		s.write(recvMsg{err: io.EOF})
 852  	}
 853  }
 854  
 855  func (t *http2Server) handleRSTStream(f *http2.RSTStreamFrame) {
 856  	// If the stream is not deleted from the transport's active streams map, then do a regular close stream.
 857  	if s, ok := t.getStream(f); ok {
 858  		t.closeStream(s, false, 0, false)
 859  		return
 860  	}
 861  	// If the stream is already deleted from the active streams map, then put a cleanupStream item into controlbuf to delete the stream from loopy writer's established streams map.
 862  	t.controlBuf.put(&cleanupStream{
 863  		streamID: f.Header().StreamID,
 864  		rst:      false,
 865  		rstCode:  0,
 866  		onWrite:  func() {},
 867  	})
 868  }
 869  
 870  func (t *http2Server) handleSettings(f *http2.SettingsFrame) {
 871  	if f.IsAck() {
 872  		return
 873  	}
 874  	var ss []http2.Setting
 875  	var updateFuncs []func()
 876  	f.ForeachSetting(func(s http2.Setting) error {
 877  		switch s.ID {
 878  		case http2.SettingMaxHeaderListSize:
 879  			updateFuncs = append(updateFuncs, func() {
 880  				t.maxSendHeaderListSize = new(uint32)
 881  				*t.maxSendHeaderListSize = s.Val
 882  			})
 883  		default:
 884  			ss = append(ss, s)
 885  		}
 886  		return nil
 887  	})
 888  	t.controlBuf.executeAndPut(func() bool {
 889  		for _, f := range updateFuncs {
 890  			f()
 891  		}
 892  		return true
 893  	}, &incomingSettings{
 894  		ss: ss,
 895  	})
 896  }
 897  
// Keepalive enforcement parameters used by handlePing:
//   - maxPingStrikes is the number of policy-violating pings tolerated from
//     a client; once the strike count exceeds it, a GOAWAY with
//     ErrCodeEnhanceYourCalm is queued.
//   - defaultPingTimeout is the minimum spacing required between pings when
//     the connection has no active streams and PermitWithoutStream is not set.
const (
	maxPingStrikes     = 2
	defaultPingTimeout = 2 * time.Hour
)
 902  
 903  func (t *http2Server) handlePing(f *http2.PingFrame) {
 904  	if f.IsAck() {
 905  		if f.Data == goAwayPing.data && t.drainEvent != nil {
 906  			t.drainEvent.Fire()
 907  			return
 908  		}
 909  		// Maybe it's a BDP ping.
 910  		if t.bdpEst != nil {
 911  			t.bdpEst.calculate(f.Data)
 912  		}
 913  		return
 914  	}
 915  	pingAck := &ping{ack: true}
 916  	copy(pingAck.data[:], f.Data[:])
 917  	t.controlBuf.put(pingAck)
 918  
 919  	now := time.Now()
 920  	defer func() {
 921  		t.lastPingAt = now
 922  	}()
 923  	// A reset ping strikes means that we don't need to check for policy
 924  	// violation for this ping and the pingStrikes counter should be set
 925  	// to 0.
 926  	if atomic.CompareAndSwapUint32(&t.resetPingStrikes, 1, 0) {
 927  		t.pingStrikes = 0
 928  		return
 929  	}
 930  	t.mu.Lock()
 931  	ns := len(t.activeStreams)
 932  	t.mu.Unlock()
 933  	if ns < 1 && !t.kep.PermitWithoutStream {
 934  		// Keepalive shouldn't be active thus, this new ping should
 935  		// have come after at least defaultPingTimeout.
 936  		if t.lastPingAt.Add(defaultPingTimeout).After(now) {
 937  			t.pingStrikes++
 938  		}
 939  	} else {
 940  		// Check if keepalive policy is respected.
 941  		if t.lastPingAt.Add(t.kep.MinTime).After(now) {
 942  			t.pingStrikes++
 943  		}
 944  	}
 945  
 946  	if t.pingStrikes > maxPingStrikes {
 947  		// Send goaway and close the connection.
 948  		t.controlBuf.put(&goAway{code: http2.ErrCodeEnhanceYourCalm, debugData: []byte("too_many_pings"), closeConn: errors.New("got too many pings from the client")})
 949  	}
 950  }
 951  
 952  func (t *http2Server) handleWindowUpdate(f *http2.WindowUpdateFrame) {
 953  	t.controlBuf.put(&incomingWindowUpdate{
 954  		streamID:  f.Header().StreamID,
 955  		increment: f.Increment,
 956  	})
 957  }
 958  
 959  func appendHeaderFieldsFromMD(headerFields []hpack.HeaderField, md metadata.MD) []hpack.HeaderField {
 960  	for k, vv := range md {
 961  		if isReservedHeader(k) {
 962  			// Clients don't tolerate reading restricted headers after some non restricted ones were sent.
 963  			continue
 964  		}
 965  		for _, v := range vv {
 966  			headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
 967  		}
 968  	}
 969  	return headerFields
 970  }
 971  
 972  func (t *http2Server) checkForHeaderListSize(it any) bool {
 973  	if t.maxSendHeaderListSize == nil {
 974  		return true
 975  	}
 976  	hdrFrame := it.(*headerFrame)
 977  	var sz int64
 978  	for _, f := range hdrFrame.hf {
 979  		if sz += int64(f.Size()); sz > int64(*t.maxSendHeaderListSize) {
 980  			if t.logger.V(logLevel) {
 981  				t.logger.Infof("Header list size to send violates the maximum size (%d bytes) set by client", *t.maxSendHeaderListSize)
 982  			}
 983  			return false
 984  		}
 985  	}
 986  	return true
 987  }
 988  
 989  func (t *http2Server) streamContextErr(s *ServerStream) error {
 990  	select {
 991  	case <-t.done:
 992  		return ErrConnClosing
 993  	default:
 994  	}
 995  	return ContextErr(s.ctx.Err())
 996  }
 997  
 998  // WriteHeader sends the header metadata md back to the client.
 999  func (t *http2Server) writeHeader(s *ServerStream, md metadata.MD) error {
1000  	s.hdrMu.Lock()
1001  	defer s.hdrMu.Unlock()
1002  	if s.getState() == streamDone {
1003  		return t.streamContextErr(s)
1004  	}
1005  
1006  	if s.updateHeaderSent() {
1007  		return ErrIllegalHeaderWrite
1008  	}
1009  
1010  	if md.Len() > 0 {
1011  		if s.header.Len() > 0 {
1012  			s.header = metadata.Join(s.header, md)
1013  		} else {
1014  			s.header = md
1015  		}
1016  	}
1017  	if err := t.writeHeaderLocked(s); err != nil {
1018  		switch e := err.(type) {
1019  		case ConnectionError:
1020  			return status.Error(codes.Unavailable, e.Desc)
1021  		default:
1022  			return status.Convert(err).Err()
1023  		}
1024  	}
1025  	return nil
1026  }
1027  
1028  func (t *http2Server) writeHeaderLocked(s *ServerStream) error {
1029  	// TODO(mmukhi): Benchmark if the performance gets better if count the metadata and other header fields
1030  	// first and create a slice of that exact size.
1031  	headerFields := make([]hpack.HeaderField, 0, 2) // at least :status, content-type will be there if none else.
1032  	headerFields = append(headerFields, hpack.HeaderField{Name: ":status", Value: "200"})
1033  	headerFields = append(headerFields, hpack.HeaderField{Name: "content-type", Value: grpcutil.ContentType(s.contentSubtype)})
1034  	if s.sendCompress != "" {
1035  		headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-encoding", Value: s.sendCompress})
1036  	}
1037  	headerFields = appendHeaderFieldsFromMD(headerFields, s.header)
1038  	hf := &headerFrame{
1039  		streamID:  s.id,
1040  		hf:        headerFields,
1041  		endStream: false,
1042  		onWrite:   t.setResetPingStrikes,
1043  	}
1044  	success, err := t.controlBuf.executeAndPut(func() bool { return t.checkForHeaderListSize(hf) }, hf)
1045  	if !success {
1046  		if err != nil {
1047  			return err
1048  		}
1049  		t.closeStream(s, true, http2.ErrCodeInternal, false)
1050  		return ErrHeaderListSizeLimitViolation
1051  	}
1052  	if t.stats != nil {
1053  		// Note: Headers are compressed with hpack after this call returns.
1054  		// No WireLength field is set here.
1055  		t.stats.HandleRPC(s.Context(), &stats.OutHeader{
1056  			Header:      s.header.Copy(),
1057  			Compression: s.sendCompress,
1058  		})
1059  	}
1060  	return nil
1061  }
1062  
// writeStatus sends stream status to the client and terminates the stream.
// No further I/O operations can be performed on this stream afterwards.
//
// If no headers have been sent yet, they are either written as a separate
// HEADERS frame first (when the stream has header metadata) or folded into
// the trailer frame as a trailers-only response. It returns nil without doing
// anything when the stream is already done, and
// ErrHeaderListSizeLimitViolation (after closing the stream) when the trailer
// fails the header list size check.
// TODO(zhaoq): Now it indicates the end of entire stream. Revisit if early
// OK is adopted.
func (t *http2Server) writeStatus(s *ServerStream, st *status.Status) error {
	// hdrMu is held for the whole call, as it is in writeHeader.
	s.hdrMu.Lock()
	defer s.hdrMu.Unlock()

	if s.getState() == streamDone {
		return nil
	}

	// TODO(mmukhi): Benchmark if the performance gets better if count the metadata and other header fields
	// first and create a slice of that exact size.
	headerFields := make([]hpack.HeaderField, 0, 2) // grpc-status and grpc-message will be there if none else.
	if !s.updateHeaderSent() {                      // No headers have been sent.
		if len(s.header) > 0 { // Send a separate header frame.
			if err := t.writeHeaderLocked(s); err != nil {
				return err
			}
		} else { // Send a trailer only response.
			headerFields = append(headerFields, hpack.HeaderField{Name: ":status", Value: "200"})
			headerFields = append(headerFields, hpack.HeaderField{Name: "content-type", Value: grpcutil.ContentType(s.contentSubtype)})
		}
	}
	headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-status", Value: strconv.Itoa(int(st.Code()))})
	headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-message", Value: encodeGrpcMessage(st.Message())})

	// Only attach the serialized status proto when it carries details.
	if p := istatus.RawStatusProto(st); len(p.GetDetails()) > 0 {
		// Do not use the user's grpc-status-details-bin (if present) if we are
		// even attempting to set our own.
		delete(s.trailer, grpcStatusDetailsBinHeader)
		stBytes, err := proto.Marshal(p)
		if err != nil {
			// TODO: return error instead, when callers are able to handle it.
			t.logger.Errorf("Failed to marshal rpc status: %s, error: %v", pretty.ToJSON(p), err)
		} else {
			headerFields = append(headerFields, hpack.HeaderField{Name: grpcStatusDetailsBinHeader, Value: encodeBinHeader(stBytes)})
		}
	}

	// Attach the trailer metadata.
	headerFields = appendHeaderFieldsFromMD(headerFields, s.trailer)
	trailingHeader := &headerFrame{
		streamID:  s.id,
		hf:        headerFields,
		endStream: true,
		onWrite:   t.setResetPingStrikes,
	}

	// Only the size check runs here (the item argument is nil); the trailer
	// frame itself is queued by finishStream below.
	success, err := t.controlBuf.executeAndPut(func() bool {
		return t.checkForHeaderListSize(trailingHeader)
	}, nil)
	if !success {
		if err != nil {
			return err
		}
		// The check rejected the trailer: reset the stream instead.
		t.closeStream(s, true, http2.ErrCodeInternal, false)
		return ErrHeaderListSizeLimitViolation
	}
	// Send a RST_STREAM after the trailers if the client has not already half-closed.
	rst := s.getState() == streamActive
	t.finishStream(s, rst, http2.ErrCodeNo, trailingHeader, true)
	if t.stats != nil {
		// Note: The trailer fields are compressed with hpack after this call returns.
		// No WireLength field is set here.
		t.stats.HandleRPC(s.Context(), &stats.OutTrailer{
			Trailer: s.trailer.Copy(),
		})
	}
	return nil
}
1135  
1136  // Write converts the data into HTTP2 data frame and sends it out. Non-nil error
1137  // is returns if it fails (e.g., framing error, transport error).
1138  func (t *http2Server) write(s *ServerStream, hdr []byte, data mem.BufferSlice, _ *WriteOptions) error {
1139  	if !s.isHeaderSent() { // Headers haven't been written yet.
1140  		if err := t.writeHeader(s, nil); err != nil {
1141  			return err
1142  		}
1143  	} else {
1144  		// Writing headers checks for this condition.
1145  		if s.getState() == streamDone {
1146  			return t.streamContextErr(s)
1147  		}
1148  	}
1149  
1150  	df := &dataFrame{
1151  		streamID:    s.id,
1152  		h:           hdr,
1153  		data:        data,
1154  		onEachWrite: t.setResetPingStrikes,
1155  	}
1156  	dataLen := data.Len()
1157  	if err := s.wq.get(int32(len(hdr) + dataLen)); err != nil {
1158  		return t.streamContextErr(s)
1159  	}
1160  	data.Ref()
1161  	if err := t.controlBuf.put(df); err != nil {
1162  		data.Free()
1163  		return err
1164  	}
1165  	t.incrMsgSent()
1166  	return nil
1167  }
1168  
// keepalive running in a separate goroutine does the following:
// 1. Gracefully closes an idle connection after a duration of keepalive.MaxConnectionIdle.
// 2. Gracefully closes any connection after a duration of keepalive.MaxConnectionAge.
// 3. Forcibly closes a connection after an additive period of keepalive.MaxConnectionAgeGrace over keepalive.MaxConnectionAge.
// 4. Makes sure a connection is alive by sending pings with a frequency of keepalive.Time and closes a non-responsive connection
// after an additional duration of keepalive.Timeout.
//
// The loop returns once the transport's done channel is closed or after any
// of the closing actions above has been initiated.
func (t *http2Server) keepalive() {
	p := &ping{}
	// True iff a ping has been sent, and no data has been received since then.
	outstandingPing := false
	// Amount of time remaining before which we should receive an ACK for the
	// last sent ping.
	kpTimeoutLeft := time.Duration(0)
	// Records the last value of t.lastRead before we go block on the timer.
	// This is required to check for read activity since then.
	prevNano := time.Now().UnixNano()
	// Initialize the different timers to their default values.
	idleTimer := time.NewTimer(t.kp.MaxConnectionIdle)
	ageTimer := time.NewTimer(t.kp.MaxConnectionAge)
	kpTimer := time.NewTimer(t.kp.Time)
	defer func() {
		// We need to drain the underlying channel in these timers after a call
		// to Stop(), only if we are interested in resetting them. Clearly we
		// are not interested in resetting them here.
		idleTimer.Stop()
		ageTimer.Stop()
		kpTimer.Stop()
	}()

	for {
		select {
		case <-idleTimer.C:
			// t.idle is read under t.mu; deleteStream sets it when the last
			// active stream goes away.
			t.mu.Lock()
			idle := t.idle
			if idle.IsZero() { // The connection is non-idle.
				t.mu.Unlock()
				idleTimer.Reset(t.kp.MaxConnectionIdle)
				continue
			}
			// val is how much of the idle allowance is still left.
			val := t.kp.MaxConnectionIdle - time.Since(idle)
			t.mu.Unlock()
			if val <= 0 {
				// The connection has been idle for a duration of keepalive.MaxConnectionIdle or more.
				// Gracefully close the connection.
				t.Drain("max_idle")
				return
			}
			// Not idle for long enough yet: wake up again when the remaining
			// allowance runs out.
			idleTimer.Reset(val)
		case <-ageTimer.C:
			t.Drain("max_age")
			// Reuse the same timer to wait out the grace period.
			ageTimer.Reset(t.kp.MaxConnectionAgeGrace)
			select {
			case <-ageTimer.C:
				// Close the connection after grace period.
				if t.logger.V(logLevel) {
					t.logger.Infof("Closing server transport due to maximum connection age")
				}
				t.controlBuf.put(closeConnection{})
			case <-t.done:
			}
			return
		case <-kpTimer.C:
			lastRead := atomic.LoadInt64(&t.lastRead)
			if lastRead > prevNano {
				// There has been read activity since the last time we were
				// here. Setup the timer to fire at kp.Time seconds from
				// lastRead time and continue.
				outstandingPing = false
				kpTimer.Reset(time.Duration(lastRead) + t.kp.Time - time.Duration(time.Now().UnixNano()))
				prevNano = lastRead
				continue
			}
			// No read activity: a ping that is still outstanding with no
			// timeout budget left means the peer is unresponsive.
			if outstandingPing && kpTimeoutLeft <= 0 {
				t.Close(fmt.Errorf("keepalive ping not acked within timeout %s", t.kp.Timeout))
				return
			}
			if !outstandingPing {
				if channelz.IsOn() {
					t.channelz.SocketMetrics.KeepAlivesSent.Add(1)
				}
				t.controlBuf.put(p)
				kpTimeoutLeft = t.kp.Timeout
				outstandingPing = true
			}
			// The amount of time to sleep here is the minimum of kp.Time and
			// timeoutLeft. This will ensure that we wait only for kp.Time
			// before sending out the next ping (for cases where the ping is
			// acked).
			sleepDuration := min(t.kp.Time, kpTimeoutLeft)
			kpTimeoutLeft -= sleepDuration
			kpTimer.Reset(sleepDuration)
		case <-t.done:
			return
		}
	}
}
1265  
1266  // Close starts shutting down the http2Server transport.
1267  // TODO(zhaoq): Now the destruction is not blocked on any pending streams. This
1268  // could cause some resource issue. Revisit this later.
1269  func (t *http2Server) Close(err error) {
1270  	t.mu.Lock()
1271  	if t.state == closing {
1272  		t.mu.Unlock()
1273  		return
1274  	}
1275  	if t.logger.V(logLevel) {
1276  		t.logger.Infof("Closing: %v", err)
1277  	}
1278  	t.state = closing
1279  	streams := t.activeStreams
1280  	t.activeStreams = nil
1281  	t.mu.Unlock()
1282  	t.controlBuf.finish()
1283  	close(t.done)
1284  	if err := t.conn.Close(); err != nil && t.logger.V(logLevel) {
1285  		t.logger.Infof("Error closing underlying net.Conn during Close: %v", err)
1286  	}
1287  	channelz.RemoveEntry(t.channelz.ID)
1288  	// Cancel all active streams.
1289  	for _, s := range streams {
1290  		s.cancel()
1291  	}
1292  }
1293  
1294  // deleteStream deletes the stream s from transport's active streams.
1295  func (t *http2Server) deleteStream(s *ServerStream, eosReceived bool) {
1296  	t.mu.Lock()
1297  	_, isActive := t.activeStreams[s.id]
1298  	if isActive {
1299  		delete(t.activeStreams, s.id)
1300  		if len(t.activeStreams) == 0 {
1301  			t.idle = time.Now()
1302  		}
1303  	}
1304  	t.mu.Unlock()
1305  
1306  	if isActive && channelz.IsOn() {
1307  		if eosReceived {
1308  			t.channelz.SocketMetrics.StreamsSucceeded.Add(1)
1309  		} else {
1310  			t.channelz.SocketMetrics.StreamsFailed.Add(1)
1311  		}
1312  	}
1313  }
1314  
1315  // finishStream closes the stream and puts the trailing headerFrame into controlbuf.
1316  func (t *http2Server) finishStream(s *ServerStream, rst bool, rstCode http2.ErrCode, hdr *headerFrame, eosReceived bool) {
1317  	// In case stream sending and receiving are invoked in separate
1318  	// goroutines (e.g., bi-directional streaming), cancel needs to be
1319  	// called to interrupt the potential blocking on other goroutines.
1320  	s.cancel()
1321  
1322  	oldState := s.swapState(streamDone)
1323  	if oldState == streamDone {
1324  		// If the stream was already done, return.
1325  		return
1326  	}
1327  
1328  	hdr.cleanup = &cleanupStream{
1329  		streamID: s.id,
1330  		rst:      rst,
1331  		rstCode:  rstCode,
1332  		onWrite: func() {
1333  			t.deleteStream(s, eosReceived)
1334  		},
1335  	}
1336  	t.controlBuf.put(hdr)
1337  }
1338  
1339  // closeStream clears the footprint of a stream when the stream is not needed any more.
1340  func (t *http2Server) closeStream(s *ServerStream, rst bool, rstCode http2.ErrCode, eosReceived bool) {
1341  	// In case stream sending and receiving are invoked in separate
1342  	// goroutines (e.g., bi-directional streaming), cancel needs to be
1343  	// called to interrupt the potential blocking on other goroutines.
1344  	s.cancel()
1345  
1346  	// We can't return early even if the stream's state is "done" as the state
1347  	// might have been set by the `finishStream` method. Deleting the stream via
1348  	// `finishStream` can get blocked on flow control.
1349  	s.swapState(streamDone)
1350  	t.deleteStream(s, eosReceived)
1351  
1352  	t.controlBuf.put(&cleanupStream{
1353  		streamID: s.id,
1354  		rst:      rst,
1355  		rstCode:  rstCode,
1356  		onWrite:  func() {},
1357  	})
1358  }
1359  
1360  func (t *http2Server) Drain(debugData string) {
1361  	t.mu.Lock()
1362  	defer t.mu.Unlock()
1363  	if t.drainEvent != nil {
1364  		return
1365  	}
1366  	t.drainEvent = grpcsync.NewEvent()
1367  	t.controlBuf.put(&goAway{code: http2.ErrCodeNo, debugData: []byte(debugData), headsUp: true})
1368  }
1369  
// goAwayPing is the ping whose payload is written right after the heads-up
// GOAWAY in outgoingGoAwayHandler; handlePing fires the drain event when an
// ack with this exact payload arrives while a drain is in progress.
var goAwayPing = &ping{data: [8]byte{1, 6, 1, 8, 0, 3, 3, 9}}
1371  
// Handles outgoing GoAway and returns true if loopy needs to put itself
// in draining mode.
//
// A heads-up goAway (g.headsUp set) writes a GOAWAY with the maximum stream
// ID followed by the goAwayPing, and starts a goroutine that queues the
// follow-up, non-heads-up goAway. A non-heads-up goAway moves the transport
// to the draining state and writes a GOAWAY carrying t.maxStreamID; in that
// case a non-nil error is returned when g.closeConn is set or when no active
// streams are left. ErrConnClosing is returned if the transport is already
// closing.
func (t *http2Server) outgoingGoAwayHandler(g *goAway) (bool, error) {
	// Both locks are held while the state, maxStreamID and activeStreams are
	// inspected; they are released before anything is written to the framer.
	t.maxStreamMu.Lock()
	t.mu.Lock()
	if t.state == closing { // TODO(mmukhi): This seems unnecessary.
		t.mu.Unlock()
		t.maxStreamMu.Unlock()
		// The transport is closing.
		return false, ErrConnClosing
	}
	if !g.headsUp {
		// Stop accepting more streams now.
		t.state = draining
		sid := t.maxStreamID
		retErr := g.closeConn
		if len(t.activeStreams) == 0 {
			retErr = errors.New("second GOAWAY written and no active streams left to process")
		}
		t.mu.Unlock()
		t.maxStreamMu.Unlock()
		if err := t.framer.fr.WriteGoAway(sid, g.code, g.debugData); err != nil {
			return false, err
		}
		// Flush right away; the error result of Flush is not checked here.
		t.framer.writer.Flush()
		if retErr != nil {
			return false, retErr
		}
		return true, nil
	}
	t.mu.Unlock()
	t.maxStreamMu.Unlock()
	// For a graceful close, send out a GoAway with stream ID of MaxUInt32,
	// Follow that with a ping and wait for the ack to come back or a timer
	// to expire. During this time accept new streams since they might have
	// originated before the GoAway reaches the client.
	// After getting the ack or timer expiration send out another GoAway this
	// time with an ID of the max stream server intends to process.
	if err := t.framer.fr.WriteGoAway(math.MaxUint32, http2.ErrCodeNo, g.debugData); err != nil {
		return false, err
	}
	if err := t.framer.fr.WritePing(false, goAwayPing.data); err != nil {
		return false, err
	}
	go func() {
		// Wait for the ping ack (drain event), a 5 second timeout, or
		// transport shutdown, whichever comes first.
		timer := time.NewTimer(5 * time.Second)
		defer timer.Stop()
		select {
		case <-t.drainEvent.Done():
		case <-timer.C:
		case <-t.done:
			// The transport is gone; no second GOAWAY is queued.
			return
		}
		// Queue the second GOAWAY; headsUp is left unset so it takes the
		// !g.headsUp branch above.
		t.controlBuf.put(&goAway{code: g.code, debugData: g.debugData})
	}()
	return false, nil
}
1429  
1430  func (t *http2Server) socketMetrics() *channelz.EphemeralSocketMetrics {
1431  	return &channelz.EphemeralSocketMetrics{
1432  		LocalFlowControlWindow:  int64(t.fc.getSize()),
1433  		RemoteFlowControlWindow: t.getOutFlowWindow(),
1434  	}
1435  }
1436  
1437  func (t *http2Server) incrMsgSent() {
1438  	if channelz.IsOn() {
1439  		t.channelz.SocketMetrics.MessagesSent.Add(1)
1440  		t.channelz.SocketMetrics.LastMessageSentTimestamp.Add(1)
1441  	}
1442  }
1443  
1444  func (t *http2Server) incrMsgRecv() {
1445  	if channelz.IsOn() {
1446  		t.channelz.SocketMetrics.MessagesReceived.Add(1)
1447  		t.channelz.SocketMetrics.LastMessageReceivedTimestamp.Add(1)
1448  	}
1449  }
1450  
1451  func (t *http2Server) getOutFlowWindow() int64 {
1452  	resp := make(chan uint32, 1)
1453  	timer := time.NewTimer(time.Second)
1454  	defer timer.Stop()
1455  	t.controlBuf.put(&outFlowControlSizeRequest{resp})
1456  	select {
1457  	case sz := <-resp:
1458  		return int64(sz)
1459  	case <-t.done:
1460  		return -1
1461  	case <-timer.C:
1462  		return -2
1463  	}
1464  }
1465  
1466  // Peer returns the peer of the transport.
1467  func (t *http2Server) Peer() *peer.Peer {
1468  	return &peer.Peer{
1469  		Addr:      t.peer.Addr,
1470  		LocalAddr: t.peer.LocalAddr,
1471  		AuthInfo:  t.peer.AuthInfo, // Can be nil
1472  	}
1473  }
1474  
1475  func getJitter(v time.Duration) time.Duration {
1476  	if v == infinity {
1477  		return 0
1478  	}
1479  	// Generate a jitter between +/- 10% of the value.
1480  	r := int64(v / 10)
1481  	j := rand.Int64N(2*r) - r
1482  	return time.Duration(j)
1483  }
1484  
// connectionKey is the context key under which the connection is stored.
type connectionKey struct{}

// GetConnection gets the connection from the context. It returns nil when the
// context carries no connection.
func GetConnection(ctx context.Context) net.Conn {
	if conn, ok := ctx.Value(connectionKey{}).(net.Conn); ok {
		return conn
	}
	return nil
}

// SetConnection adds the connection to the context to be able to get
// information about the destination ip and port for an incoming RPC. This also
// allows any unary or streaming interceptors to see the connection.
func SetConnection(ctx context.Context, conn net.Conn) context.Context {
	var key connectionKey
	return context.WithValue(ctx, key, conn)
}
1499