controlbuf.go raw

   1  /*
   2   *
   3   * Copyright 2014 gRPC authors.
   4   *
   5   * Licensed under the Apache License, Version 2.0 (the "License");
   6   * you may not use this file except in compliance with the License.
   7   * You may obtain a copy of the License at
   8   *
   9   *     http://www.apache.org/licenses/LICENSE-2.0
  10   *
  11   * Unless required by applicable law or agreed to in writing, software
  12   * distributed under the License is distributed on an "AS IS" BASIS,
  13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14   * See the License for the specific language governing permissions and
  15   * limitations under the License.
  16   *
  17   */
  18  
  19  package transport
  20  
  21  import (
  22  	"bytes"
  23  	"errors"
  24  	"fmt"
  25  	"net"
  26  	"runtime"
  27  	"strconv"
  28  	"sync"
  29  	"sync/atomic"
  30  
  31  	"golang.org/x/net/http2"
  32  	"golang.org/x/net/http2/hpack"
  33  	"google.golang.org/grpc/internal/grpclog"
  34  	"google.golang.org/grpc/internal/grpcutil"
  35  	"google.golang.org/grpc/mem"
  36  	"google.golang.org/grpc/status"
  37  )
  38  
  39  var updateHeaderTblSize = func(e *hpack.Encoder, v uint32) {
  40  	e.SetMaxDynamicTableSizeLimit(v)
  41  }
  42  
  43  // itemNodePool is used to reduce heap allocations.
  44  var itemNodePool = sync.Pool{
  45  	New: func() any {
  46  		return &itemNode{}
  47  	},
  48  }
  49  
  50  type itemNode struct {
  51  	it   any
  52  	next *itemNode
  53  }
  54  
  55  type itemList struct {
  56  	head *itemNode
  57  	tail *itemNode
  58  }
  59  
  60  func (il *itemList) enqueue(i any) {
  61  	n := itemNodePool.Get().(*itemNode)
  62  	n.next = nil
  63  	n.it = i
  64  	if il.tail == nil {
  65  		il.head, il.tail = n, n
  66  		return
  67  	}
  68  	il.tail.next = n
  69  	il.tail = n
  70  }
  71  
  72  // peek returns the first item in the list without removing it from the
  73  // list.
  74  func (il *itemList) peek() any {
  75  	return il.head.it
  76  }
  77  
  78  func (il *itemList) dequeue() any {
  79  	if il.head == nil {
  80  		return nil
  81  	}
  82  	i := il.head.it
  83  	temp := il.head
  84  	il.head = il.head.next
  85  	itemNodePool.Put(temp)
  86  	if il.head == nil {
  87  		il.tail = nil
  88  	}
  89  	return i
  90  }
  91  
  92  func (il *itemList) dequeueAll() *itemNode {
  93  	h := il.head
  94  	il.head, il.tail = nil, nil
  95  	return h
  96  }
  97  
  98  func (il *itemList) isEmpty() bool {
  99  	return il.head == nil
 100  }
 101  
// The following types define the control items that can flow through the
// transport's control buffer. They represent different kinds of control
// tasks, e.g., flow control, settings, stream resets, etc.
 105  
// maxQueuedTransportResponseFrames is the number of queued "transport
// response" frames we will buffer before preventing new reads from occurring
// on the transport.  These are control frames sent in response to client
// requests, such as RST_STREAM due to bad headers or settings acks.  Control
// items report whether they count toward this limit through
// cbItem.isTransportResponseFrame.
const maxQueuedTransportResponseFrames = 50
 111  
// cbItem is the interface accepted by controlBuffer.put and
// controlBuffer.executeAndPut for items queued on the control buffer.
type cbItem interface {
	// isTransportResponseFrame reports whether the item should be counted
	// against maxQueuedTransportResponseFrames while it sits in the control
	// buffer.
	isTransportResponseFrame() bool
}
 115  
 116  // registerStream is used to register an incoming stream with loopy writer.
 117  type registerStream struct {
 118  	streamID uint32
 119  	wq       *writeQuota
 120  }
 121  
 122  func (*registerStream) isTransportResponseFrame() bool { return false }
 123  
 124  // headerFrame is also used to register stream on the client-side.
 125  type headerFrame struct {
 126  	streamID   uint32
 127  	hf         []hpack.HeaderField
 128  	endStream  bool               // Valid on server side.
 129  	initStream func(uint32) error // Used only on the client side.
 130  	onWrite    func()
 131  	wq         *writeQuota    // write quota for the stream created.
 132  	cleanup    *cleanupStream // Valid on the server side.
 133  	onOrphaned func(error)    // Valid on client-side
 134  }
 135  
 136  func (h *headerFrame) isTransportResponseFrame() bool {
 137  	return h.cleanup != nil && h.cleanup.rst // Results in a RST_STREAM
 138  }
 139  
 140  type cleanupStream struct {
 141  	streamID uint32
 142  	rst      bool
 143  	rstCode  http2.ErrCode
 144  	onWrite  func()
 145  }
 146  
 147  func (c *cleanupStream) isTransportResponseFrame() bool { return c.rst } // Results in a RST_STREAM
 148  
 149  type earlyAbortStream struct {
 150  	httpStatus     uint32
 151  	streamID       uint32
 152  	contentSubtype string
 153  	status         *status.Status
 154  	rst            bool
 155  }
 156  
 157  func (*earlyAbortStream) isTransportResponseFrame() bool { return false }
 158  
 159  type dataFrame struct {
 160  	streamID   uint32
 161  	endStream  bool
 162  	h          []byte
 163  	data       mem.BufferSlice
 164  	processing bool
 165  	// onEachWrite is called every time
 166  	// a part of data is written out.
 167  	onEachWrite func()
 168  }
 169  
 170  func (*dataFrame) isTransportResponseFrame() bool { return false }
 171  
 172  type incomingWindowUpdate struct {
 173  	streamID  uint32
 174  	increment uint32
 175  }
 176  
 177  func (*incomingWindowUpdate) isTransportResponseFrame() bool { return false }
 178  
 179  type outgoingWindowUpdate struct {
 180  	streamID  uint32
 181  	increment uint32
 182  }
 183  
 184  func (*outgoingWindowUpdate) isTransportResponseFrame() bool {
 185  	return false // window updates are throttled by thresholds
 186  }
 187  
 188  type incomingSettings struct {
 189  	ss []http2.Setting
 190  }
 191  
 192  func (*incomingSettings) isTransportResponseFrame() bool { return true } // Results in a settings ACK
 193  
 194  type outgoingSettings struct {
 195  	ss []http2.Setting
 196  }
 197  
 198  func (*outgoingSettings) isTransportResponseFrame() bool { return false }
 199  
 200  type incomingGoAway struct {
 201  }
 202  
 203  func (*incomingGoAway) isTransportResponseFrame() bool { return false }
 204  
 205  type goAway struct {
 206  	code      http2.ErrCode
 207  	debugData []byte
 208  	headsUp   bool
 209  	closeConn error // if set, loopyWriter will exit with this error
 210  }
 211  
 212  func (*goAway) isTransportResponseFrame() bool { return false }
 213  
 214  type ping struct {
 215  	ack  bool
 216  	data [8]byte
 217  }
 218  
 219  func (*ping) isTransportResponseFrame() bool { return true }
 220  
 221  type outFlowControlSizeRequest struct {
 222  	resp chan uint32
 223  }
 224  
 225  func (*outFlowControlSizeRequest) isTransportResponseFrame() bool { return false }
 226  
 227  // closeConnection is an instruction to tell the loopy writer to flush the
 228  // framer and exit, which will cause the transport's connection to be closed
 229  // (by the client or server).  The transport itself will close after the reader
 230  // encounters the EOF caused by the connection closure.
 231  type closeConnection struct{}
 232  
 233  func (closeConnection) isTransportResponseFrame() bool { return false }
 234  
// outStreamState describes where an outStream stands with respect to loopy's
// list of active streams.
type outStreamState int

const (
	// active marks a stream that has been placed on the activeStreams list.
	active outStreamState = iota
	// empty marks a stream with no queued items; it is the state streams are
	// created in.
	empty
	// waitingOnStreamQuota marks a stream that ran out of stream-level flow
	// control quota; it becomes active again when quota is made available.
	waitingOnStreamQuota
)
 242  
// outStream is loopy's per-stream bookkeeping.
//
// id is the stream ID and state its scheduling state. itl queues the items
// (data frames, and on the server side possibly a trailing header frame) that
// still have to be written. bytesOutStanding is subtracted from the outbound
// initial window size to obtain the remaining stream-level quota, and is
// reduced by incoming window updates. wq is the stream's write quota and
// reader is reset onto the data of the frame currently being written.
//
// next and prev link the stream into an outStreamList.
type outStream struct {
	id               uint32
	state            outStreamState
	itl              *itemList
	bytesOutStanding int
	wq               *writeQuota
	reader           mem.Reader

	next *outStream
	prev *outStream
}
 254  
 255  func (s *outStream) deleteSelf() {
 256  	if s.prev != nil {
 257  		s.prev.next = s.next
 258  	}
 259  	if s.next != nil {
 260  		s.next.prev = s.prev
 261  	}
 262  	s.next, s.prev = nil, nil
 263  }
 264  
// outStreamList is a doubly linked list of outStream objects that is used as
// a FIFO queue: enqueue appends at the tail end and dequeue removes from the
// head end.
type outStreamList struct {
	// Following are sentinel objects that mark the
	// beginning and end of the list. They do not
	// contain any item lists. All valid objects are
	// inserted in between them.
	// This is needed so that an outStream object can
	// deleteSelf() in O(1) time without knowing which
	// list it belongs to.
	head *outStream
	tail *outStream
}
 276  
 277  func newOutStreamList() *outStreamList {
 278  	head, tail := new(outStream), new(outStream)
 279  	head.next = tail
 280  	tail.prev = head
 281  	return &outStreamList{
 282  		head: head,
 283  		tail: tail,
 284  	}
 285  }
 286  
 287  func (l *outStreamList) enqueue(s *outStream) {
 288  	e := l.tail.prev
 289  	e.next = s
 290  	s.prev = e
 291  	s.next = l.tail
 292  	l.tail.prev = s
 293  }
 294  
 295  // remove from the beginning of the list.
 296  func (l *outStreamList) dequeue() *outStream {
 297  	b := l.head.next
 298  	if b == l.tail {
 299  		return nil
 300  	}
 301  	b.deleteSelf()
 302  	return b
 303  }
 304  
// controlBuffer is a way to pass information to loopy.
//
// Information is passed as specific struct types called control frames. A
// control frame not only represents data, messages or headers to be sent out
// but can also be used to instruct loopy to update its internal state. It
// shouldn't be confused with an HTTP2 frame, although some of the control
// frames like dataFrame and headerFrame do go out on wire as HTTP2 frames.
//
// Producers add frames with put or executeAndPut; the consumer removes them
// with get. finish closes the buffer.
type controlBuffer struct {
	wakeupCh chan struct{}   // Unblocks readers waiting for something to read.
	done     <-chan struct{} // Closed when the transport is done.

	// Mutex guards all the fields below, except trfChan which can be read
	// atomically without holding mu.
	mu              sync.Mutex
	consumerWaiting bool      // True when readers are blocked waiting for new data.
	closed          bool      // True when the controlbuf is finished.
	list            *itemList // List of queued control frames.

	// transportResponseFrames counts the number of queued items that represent
	// the response of an action initiated by the peer.  trfChan is created
	// when transportResponseFrames reaches maxQueuedTransportResponseFrames
	// and is closed and nilled when transportResponseFrames drops below the
	// threshold.  transportResponseFrames is protected by mu.  trfChan is
	// only stored or swapped while mu is held, but throttle loads it without
	// taking mu.
	transportResponseFrames int
	trfChan                 atomic.Pointer[chan struct{}]
}
 331  
 332  func newControlBuffer(done <-chan struct{}) *controlBuffer {
 333  	return &controlBuffer{
 334  		wakeupCh: make(chan struct{}, 1),
 335  		list:     &itemList{},
 336  		done:     done,
 337  	}
 338  }
 339  
 340  // throttle blocks if there are too many frames in the control buf that
 341  // represent the response of an action initiated by the peer, like
 342  // incomingSettings cleanupStreams etc.
 343  func (c *controlBuffer) throttle() {
 344  	if ch := c.trfChan.Load(); ch != nil {
 345  		select {
 346  		case <-(*ch):
 347  		case <-c.done:
 348  		}
 349  	}
 350  }
 351  
 352  // put adds an item to the controlbuf.
 353  func (c *controlBuffer) put(it cbItem) error {
 354  	_, err := c.executeAndPut(nil, it)
 355  	return err
 356  }
 357  
 358  // executeAndPut runs f, and if the return value is true, adds the given item to
 359  // the controlbuf. The item could be nil, in which case, this method simply
 360  // executes f and does not add the item to the controlbuf.
 361  //
 362  // The first return value indicates whether the item was successfully added to
 363  // the control buffer. A non-nil error, specifically ErrConnClosing, is returned
 364  // if the control buffer is already closed.
 365  func (c *controlBuffer) executeAndPut(f func() bool, it cbItem) (bool, error) {
 366  	c.mu.Lock()
 367  	defer c.mu.Unlock()
 368  
 369  	if c.closed {
 370  		return false, ErrConnClosing
 371  	}
 372  	if f != nil {
 373  		if !f() { // f wasn't successful
 374  			return false, nil
 375  		}
 376  	}
 377  	if it == nil {
 378  		return true, nil
 379  	}
 380  
 381  	var wakeUp bool
 382  	if c.consumerWaiting {
 383  		wakeUp = true
 384  		c.consumerWaiting = false
 385  	}
 386  	c.list.enqueue(it)
 387  	if it.isTransportResponseFrame() {
 388  		c.transportResponseFrames++
 389  		if c.transportResponseFrames == maxQueuedTransportResponseFrames {
 390  			// We are adding the frame that puts us over the threshold; create
 391  			// a throttling channel.
 392  			ch := make(chan struct{})
 393  			c.trfChan.Store(&ch)
 394  		}
 395  	}
 396  	if wakeUp {
 397  		select {
 398  		case c.wakeupCh <- struct{}{}:
 399  		default:
 400  		}
 401  	}
 402  	return true, nil
 403  }
 404  
 405  // get returns the next control frame from the control buffer. If block is true
 406  // **and** there are no control frames in the control buffer, the call blocks
 407  // until one of the conditions is met: there is a frame to return or the
 408  // transport is closed.
 409  func (c *controlBuffer) get(block bool) (any, error) {
 410  	for {
 411  		c.mu.Lock()
 412  		frame, err := c.getOnceLocked()
 413  		if frame != nil || err != nil || !block {
 414  			// If we read a frame or an error, we can return to the caller. The
 415  			// call to getOnceLocked() returns a nil frame and a nil error if
 416  			// there is nothing to read, and in that case, if the caller asked
 417  			// us not to block, we can return now as well.
 418  			c.mu.Unlock()
 419  			return frame, err
 420  		}
 421  		c.consumerWaiting = true
 422  		c.mu.Unlock()
 423  
 424  		// Release the lock above and wait to be woken up.
 425  		select {
 426  		case <-c.wakeupCh:
 427  		case <-c.done:
 428  			return nil, errors.New("transport closed by client")
 429  		}
 430  	}
 431  }
 432  
 433  // Callers must not use this method, but should instead use get().
 434  //
 435  // Caller must hold c.mu.
 436  func (c *controlBuffer) getOnceLocked() (any, error) {
 437  	if c.closed {
 438  		return false, ErrConnClosing
 439  	}
 440  	if c.list.isEmpty() {
 441  		return nil, nil
 442  	}
 443  	h := c.list.dequeue().(cbItem)
 444  	if h.isTransportResponseFrame() {
 445  		if c.transportResponseFrames == maxQueuedTransportResponseFrames {
 446  			// We are removing the frame that put us over the
 447  			// threshold; close and clear the throttling channel.
 448  			ch := c.trfChan.Swap(nil)
 449  			close(*ch)
 450  		}
 451  		c.transportResponseFrames--
 452  	}
 453  	return h, nil
 454  }
 455  
 456  // finish closes the control buffer, cleaning up any streams that have queued
 457  // header frames. Once this method returns, no more frames can be added to the
 458  // control buffer, and attempts to do so will return ErrConnClosing.
 459  func (c *controlBuffer) finish() {
 460  	c.mu.Lock()
 461  	defer c.mu.Unlock()
 462  
 463  	if c.closed {
 464  		return
 465  	}
 466  	c.closed = true
 467  	// There may be headers for streams in the control buffer.
 468  	// These streams need to be cleaned out since the transport
 469  	// is still not aware of these yet.
 470  	for head := c.list.dequeueAll(); head != nil; head = head.next {
 471  		switch v := head.it.(type) {
 472  		case *headerFrame:
 473  			if v.onOrphaned != nil { // It will be nil on the server-side.
 474  				v.onOrphaned(ErrConnClosing)
 475  			}
 476  		case *dataFrame:
 477  			if !v.processing {
 478  				v.data.Free()
 479  			}
 480  		}
 481  	}
 482  
 483  	// In case throttle() is currently in flight, it needs to be unblocked.
 484  	// Otherwise, the transport may not close, since the transport is closed by
 485  	// the reader encountering the connection error.
 486  	ch := c.trfChan.Swap(nil)
 487  	if ch != nil {
 488  		close(*ch)
 489  	}
 490  }
 491  
// side identifies which end of the connection a loopyWriter is running on.
type side int

const (
	// clientSide marks a loopyWriter running on the client.
	clientSide side = iota
	// serverSide marks a loopyWriter running on the server.
	serverSide
)
 498  
// maxWriteBufSize is the maximum length (number of elements) the cached
// writeBuf of a loopyWriter can grow to. The length depends on the number of
// buffers contained within the BufferSlice produced by the codec, which is
// generally small.
//
// If a writeBuf larger than this limit is required, it will be allocated
// and freed after use, rather than being cached. This avoids holding
// on to large amounts of memory.
const maxWriteBufSize = 64
 508  
// Loopy receives frames from the control buffer.
// Each frame is handled individually; most of the work done by loopy goes
// into handling data frames. Loopy maintains a queue of active streams, and each
// stream maintains a queue of data frames; as loopy receives data frames
// it gets added to the queue of the relevant stream.
// Loopy goes over this list of active streams by processing one node every iteration,
// thereby closely resembling a round-robin scheduling over all streams. While
// processing a stream, loopy writes out data bytes from this stream capped by the min
// of http2MaxFrameLen, connection-level flow control and stream-level flow control.
//
// Notes on fields without their own comment: sendQuota is the
// connection-level send quota and grows with incoming window updates for
// stream 0. draining is set when a GOAWAY is being handled; while it is set,
// no new streams are originated, and loopy exits once no established streams
// remain.
type loopyWriter struct {
	side      side
	cbuf      *controlBuffer
	sendQuota uint32
	oiws      uint32 // outbound initial window size.
	// estdStreams is map of all established streams that are not cleaned-up yet.
	// On client-side, this is all streams whose headers were sent out.
	// On server-side, this is all streams whose headers were received.
	estdStreams map[uint32]*outStream // Established streams.
	// activeStreams is a linked-list of all streams that have data to send and some
	// stream-level flow control quota.
	// Each of these streams internally have a list of data items(and perhaps trailers
	// on the server-side) to be sent out.
	activeStreams *outStreamList
	framer        *framer
	hBuf          *bytes.Buffer  // The buffer for HPACK encoding.
	hEnc          *hpack.Encoder // HPACK encoder.
	bdpEst        *bdpEstimator
	draining      bool
	conn          net.Conn
	logger        *grpclog.PrefixLogger
	bufferPool    mem.BufferPool

	// Side-specific handlers
	ssGoAwayHandler func(*goAway) (bool, error)

	writeBuf [][]byte // cached slice to avoid heap allocations for calls to mem.Reader.Peek.
}
 546  
 547  func newLoopyWriter(s side, fr *framer, cbuf *controlBuffer, bdpEst *bdpEstimator, conn net.Conn, logger *grpclog.PrefixLogger, goAwayHandler func(*goAway) (bool, error), bufferPool mem.BufferPool) *loopyWriter {
 548  	var buf bytes.Buffer
 549  	l := &loopyWriter{
 550  		side:            s,
 551  		cbuf:            cbuf,
 552  		sendQuota:       defaultWindowSize,
 553  		oiws:            defaultWindowSize,
 554  		estdStreams:     make(map[uint32]*outStream),
 555  		activeStreams:   newOutStreamList(),
 556  		framer:          fr,
 557  		hBuf:            &buf,
 558  		hEnc:            hpack.NewEncoder(&buf),
 559  		bdpEst:          bdpEst,
 560  		conn:            conn,
 561  		logger:          logger,
 562  		ssGoAwayHandler: goAwayHandler,
 563  		bufferPool:      bufferPool,
 564  	}
 565  	return l
 566  }
 567  
// minBatchSize is the write-buffer offset below which loopyWriter.run yields
// the processor once, before flushing, to give the batch a chance to grow.
const minBatchSize = 1000
 569  
// run should be run in a separate goroutine.
// It reads control frames from controlBuf and processes them by:
// 1. Updating loopy's internal state, or/and
// 2. Writing out HTTP2 frames on the wire.
//
// Loopy keeps all active streams with data to send in a linked-list.
// All streams in the activeStreams linked-list must have both:
// 1. Data to send, and
// 2. Stream level flow control quota available.
//
// In each iteration of run loop, other than processing the incoming control
// frame, loopy calls processData, which processes one node from the
// activeStreams linked-list.  This results in writing of HTTP2 frames into an
// underlying write buffer.  When there's no more control frames to read from
// controlBuf, loopy flushes the write buffer.  As an optimization, to increase
// the batch size for each flush, loopy yields the processor, once if the batch
// size is too low to give stream goroutines a chance to fill it up.
//
// Upon exiting, if the error causing the exit is not an I/O error, run()
// flushes the underlying connection.  The connection is always left open to
// allow different closing behavior on the client and server.
//
// run only returns with a non-nil error: the one produced by the control
// buffer, by a handler, or by processData. The control buffer is finished
// before run returns.
func (l *loopyWriter) run() (err error) {
	defer func() {
		// err here is the named result, i.e. the value being returned.
		if l.logger.V(logLevel) {
			l.logger.Infof("loopyWriter exiting with error: %v", err)
		}
		if !isIOError(err) {
			l.framer.writer.Flush()
		}
		l.cbuf.finish()
	}()
	for {
		// Block until there is at least one control frame to handle.
		it, err := l.cbuf.get(true)
		if err != nil {
			return err
		}
		if err = l.handle(it); err != nil {
			return err
		}
		if _, err = l.processData(); err != nil {
			return err
		}
		// gosched allows a single yield per batch before flushing.
		gosched := true
	hasdata:
		for {
			// Drain the control buffer without blocking.
			it, err := l.cbuf.get(false)
			if err != nil {
				return err
			}
			if it != nil {
				if err = l.handle(it); err != nil {
					return err
				}
				if _, err = l.processData(); err != nil {
					return err
				}
				continue hasdata
			}
			// No control frame is pending; keep writing stream data until
			// processData reports there is nothing left it can send.
			isEmpty, err := l.processData()
			if err != nil {
				return err
			}
			if !isEmpty {
				continue hasdata
			}
			if gosched {
				gosched = false
				if l.framer.writer.offset < minBatchSize {
					// The batch is still small: yield once so that other
					// goroutines get a chance to queue more work.
					runtime.Gosched()
					continue hasdata
				}
			}
			l.framer.writer.Flush()
			break hasdata
		}
	}
}
 647  
 648  func (l *loopyWriter) outgoingWindowUpdateHandler(w *outgoingWindowUpdate) error {
 649  	return l.framer.fr.WriteWindowUpdate(w.streamID, w.increment)
 650  }
 651  
 652  func (l *loopyWriter) incomingWindowUpdateHandler(w *incomingWindowUpdate) {
 653  	// Otherwise update the quota.
 654  	if w.streamID == 0 {
 655  		l.sendQuota += w.increment
 656  		return
 657  	}
 658  	// Find the stream and update it.
 659  	if str, ok := l.estdStreams[w.streamID]; ok {
 660  		str.bytesOutStanding -= int(w.increment)
 661  		if strQuota := int(l.oiws) - str.bytesOutStanding; strQuota > 0 && str.state == waitingOnStreamQuota {
 662  			str.state = active
 663  			l.activeStreams.enqueue(str)
 664  			return
 665  		}
 666  	}
 667  }
 668  
 669  func (l *loopyWriter) outgoingSettingsHandler(s *outgoingSettings) error {
 670  	return l.framer.fr.WriteSettings(s.ss...)
 671  }
 672  
 673  func (l *loopyWriter) incomingSettingsHandler(s *incomingSettings) error {
 674  	l.applySettings(s.ss)
 675  	return l.framer.fr.WriteSettingsAck()
 676  }
 677  
 678  func (l *loopyWriter) registerStreamHandler(h *registerStream) {
 679  	str := &outStream{
 680  		id:    h.streamID,
 681  		state: empty,
 682  		itl:   &itemList{},
 683  		wq:    h.wq,
 684  	}
 685  	l.estdStreams[h.streamID] = str
 686  }
 687  
 688  func (l *loopyWriter) headerHandler(h *headerFrame) error {
 689  	if l.side == serverSide {
 690  		str, ok := l.estdStreams[h.streamID]
 691  		if !ok {
 692  			if l.logger.V(logLevel) {
 693  				l.logger.Infof("Unrecognized streamID %d in loopyWriter", h.streamID)
 694  			}
 695  			return nil
 696  		}
 697  		// Case 1.A: Server is responding back with headers.
 698  		if !h.endStream {
 699  			return l.writeHeader(h.streamID, h.endStream, h.hf, h.onWrite)
 700  		}
 701  		// else:  Case 1.B: Server wants to close stream.
 702  
 703  		if str.state != empty { // either active or waiting on stream quota.
 704  			// add it str's list of items.
 705  			str.itl.enqueue(h)
 706  			return nil
 707  		}
 708  		if err := l.writeHeader(h.streamID, h.endStream, h.hf, h.onWrite); err != nil {
 709  			return err
 710  		}
 711  		return l.cleanupStreamHandler(h.cleanup)
 712  	}
 713  	// Case 2: Client wants to originate stream.
 714  	str := &outStream{
 715  		id:    h.streamID,
 716  		state: empty,
 717  		itl:   &itemList{},
 718  		wq:    h.wq,
 719  	}
 720  	return l.originateStream(str, h)
 721  }
 722  
 723  func (l *loopyWriter) originateStream(str *outStream, hdr *headerFrame) error {
 724  	// l.draining is set when handling GoAway. In which case, we want to avoid
 725  	// creating new streams.
 726  	if l.draining {
 727  		// TODO: provide a better error with the reason we are in draining.
 728  		hdr.onOrphaned(errStreamDrain)
 729  		return nil
 730  	}
 731  	if err := hdr.initStream(str.id); err != nil {
 732  		return err
 733  	}
 734  	if err := l.writeHeader(str.id, hdr.endStream, hdr.hf, hdr.onWrite); err != nil {
 735  		return err
 736  	}
 737  	l.estdStreams[str.id] = str
 738  	return nil
 739  }
 740  
 741  func (l *loopyWriter) writeHeader(streamID uint32, endStream bool, hf []hpack.HeaderField, onWrite func()) error {
 742  	if onWrite != nil {
 743  		onWrite()
 744  	}
 745  	l.hBuf.Reset()
 746  	for _, f := range hf {
 747  		if err := l.hEnc.WriteField(f); err != nil {
 748  			if l.logger.V(logLevel) {
 749  				l.logger.Warningf("Encountered error while encoding headers: %v", err)
 750  			}
 751  		}
 752  	}
 753  	var (
 754  		err               error
 755  		endHeaders, first bool
 756  	)
 757  	first = true
 758  	for !endHeaders {
 759  		size := l.hBuf.Len()
 760  		if size > http2MaxFrameLen {
 761  			size = http2MaxFrameLen
 762  		} else {
 763  			endHeaders = true
 764  		}
 765  		if first {
 766  			first = false
 767  			err = l.framer.fr.WriteHeaders(http2.HeadersFrameParam{
 768  				StreamID:      streamID,
 769  				BlockFragment: l.hBuf.Next(size),
 770  				EndStream:     endStream,
 771  				EndHeaders:    endHeaders,
 772  			})
 773  		} else {
 774  			err = l.framer.fr.WriteContinuation(
 775  				streamID,
 776  				endHeaders,
 777  				l.hBuf.Next(size),
 778  			)
 779  		}
 780  		if err != nil {
 781  			return err
 782  		}
 783  	}
 784  	return nil
 785  }
 786  
 787  func (l *loopyWriter) preprocessData(df *dataFrame) {
 788  	str, ok := l.estdStreams[df.streamID]
 789  	if !ok {
 790  		return
 791  	}
 792  	// If we got data for a stream it means that
 793  	// stream was originated and the headers were sent out.
 794  	str.itl.enqueue(df)
 795  	if str.state == empty {
 796  		str.state = active
 797  		l.activeStreams.enqueue(str)
 798  	}
 799  }
 800  
 801  func (l *loopyWriter) pingHandler(p *ping) error {
 802  	if !p.ack {
 803  		l.bdpEst.timesnap(p.data)
 804  	}
 805  	return l.framer.fr.WritePing(p.ack, p.data)
 806  
 807  }
 808  
 809  func (l *loopyWriter) outFlowControlSizeRequestHandler(o *outFlowControlSizeRequest) {
 810  	o.resp <- l.sendQuota
 811  }
 812  
 813  func (l *loopyWriter) cleanupStreamHandler(c *cleanupStream) error {
 814  	c.onWrite()
 815  	if str, ok := l.estdStreams[c.streamID]; ok {
 816  		// On the server side it could be a trailers-only response or
 817  		// a RST_STREAM before stream initialization thus the stream might
 818  		// not be established yet.
 819  		delete(l.estdStreams, c.streamID)
 820  		str.reader.Close()
 821  		str.deleteSelf()
 822  		for head := str.itl.dequeueAll(); head != nil; head = head.next {
 823  			if df, ok := head.it.(*dataFrame); ok {
 824  				if !df.processing {
 825  					df.data.Free()
 826  				}
 827  			}
 828  		}
 829  	}
 830  	if c.rst { // If RST_STREAM needs to be sent.
 831  		if err := l.framer.fr.WriteRSTStream(c.streamID, c.rstCode); err != nil {
 832  			return err
 833  		}
 834  	}
 835  	if l.draining && len(l.estdStreams) == 0 {
 836  		// Flush and close the connection; we are done with it.
 837  		return errors.New("finished processing active streams while in draining mode")
 838  	}
 839  	return nil
 840  }
 841  
 842  func (l *loopyWriter) earlyAbortStreamHandler(eas *earlyAbortStream) error {
 843  	if l.side == clientSide {
 844  		return errors.New("earlyAbortStream not handled on client")
 845  	}
 846  	// In case the caller forgets to set the http status, default to 200.
 847  	if eas.httpStatus == 0 {
 848  		eas.httpStatus = 200
 849  	}
 850  	headerFields := []hpack.HeaderField{
 851  		{Name: ":status", Value: strconv.Itoa(int(eas.httpStatus))},
 852  		{Name: "content-type", Value: grpcutil.ContentType(eas.contentSubtype)},
 853  		{Name: "grpc-status", Value: strconv.Itoa(int(eas.status.Code()))},
 854  		{Name: "grpc-message", Value: encodeGrpcMessage(eas.status.Message())},
 855  	}
 856  
 857  	if err := l.writeHeader(eas.streamID, true, headerFields, nil); err != nil {
 858  		return err
 859  	}
 860  	if eas.rst {
 861  		if err := l.framer.fr.WriteRSTStream(eas.streamID, http2.ErrCodeNo); err != nil {
 862  			return err
 863  		}
 864  	}
 865  	return nil
 866  }
 867  
 868  func (l *loopyWriter) incomingGoAwayHandler(*incomingGoAway) error {
 869  	if l.side == clientSide {
 870  		l.draining = true
 871  		if len(l.estdStreams) == 0 {
 872  			// Flush and close the connection; we are done with it.
 873  			return errors.New("received GOAWAY with no active streams")
 874  		}
 875  	}
 876  	return nil
 877  }
 878  
 879  func (l *loopyWriter) goAwayHandler(g *goAway) error {
 880  	// Handling of outgoing GoAway is very specific to side.
 881  	if l.ssGoAwayHandler != nil {
 882  		draining, err := l.ssGoAwayHandler(g)
 883  		if err != nil {
 884  			return err
 885  		}
 886  		l.draining = draining
 887  	}
 888  	return nil
 889  }
 890  
 891  func (l *loopyWriter) handle(i any) error {
 892  	switch i := i.(type) {
 893  	case *incomingWindowUpdate:
 894  		l.incomingWindowUpdateHandler(i)
 895  	case *outgoingWindowUpdate:
 896  		return l.outgoingWindowUpdateHandler(i)
 897  	case *incomingSettings:
 898  		return l.incomingSettingsHandler(i)
 899  	case *outgoingSettings:
 900  		return l.outgoingSettingsHandler(i)
 901  	case *headerFrame:
 902  		return l.headerHandler(i)
 903  	case *registerStream:
 904  		l.registerStreamHandler(i)
 905  	case *cleanupStream:
 906  		return l.cleanupStreamHandler(i)
 907  	case *earlyAbortStream:
 908  		return l.earlyAbortStreamHandler(i)
 909  	case *incomingGoAway:
 910  		return l.incomingGoAwayHandler(i)
 911  	case *dataFrame:
 912  		l.preprocessData(i)
 913  	case *ping:
 914  		return l.pingHandler(i)
 915  	case *goAway:
 916  		return l.goAwayHandler(i)
 917  	case *outFlowControlSizeRequest:
 918  		l.outFlowControlSizeRequestHandler(i)
 919  	case closeConnection:
 920  		// Just return a non-I/O error and run() will flush and close the
 921  		// connection.
 922  		return ErrConnClosing
 923  	default:
 924  		return fmt.Errorf("transport: unknown control message type %T", i)
 925  	}
 926  	return nil
 927  }
 928  
 929  func (l *loopyWriter) applySettings(ss []http2.Setting) {
 930  	for _, s := range ss {
 931  		switch s.ID {
 932  		case http2.SettingInitialWindowSize:
 933  			o := l.oiws
 934  			l.oiws = s.Val
 935  			if o < l.oiws {
 936  				// If the new limit is greater make all depleted streams active.
 937  				for _, stream := range l.estdStreams {
 938  					if stream.state == waitingOnStreamQuota {
 939  						stream.state = active
 940  						l.activeStreams.enqueue(stream)
 941  					}
 942  				}
 943  			}
 944  		case http2.SettingHeaderTableSize:
 945  			updateHeaderTblSize(l.hEnc, s.Val)
 946  		}
 947  	}
 948  }
 949  
 950  // processData removes the first stream from activeStreams and writes out at most one data frame
 951  // from its first data item, capped by http2MaxFrameLen and by stream- and connection-level quota. The stream
 952  // is re-queued if it has more to send and quota left. The bool result is true only when sendQuota is zero or no stream is active.
 953  func (l *loopyWriter) processData() (bool, error) {
 954  	if l.sendQuota == 0 { // Connection-level quota is exhausted; nothing can be written.
 955  		return true, nil
 956  	}
 957  	str := l.activeStreams.dequeue() // Remove the first stream.
 958  	if str == nil { // No stream is active.
 959  		return true, nil
 960  	}
 961  	reader := &str.reader
 962  	dataItem := str.itl.peek().(*dataFrame) // Peek at the first data item of this stream; panics if it is not a *dataFrame.
 963  	if !dataItem.processing { // First visit of this item: point the stream's reader at its payload.
 964  		dataItem.processing = true
 965  		reader.Reset(dataItem.data)
 966  		dataItem.data.Free() // NOTE(review): presumably safe because Reset keeps its own reference to the data — confirm.
 967  	}
 968  	// A data item is represented by a dataFrame, since it may later translate into
 969  	// multiple HTTP/2 data frames.
 970  	// Every dataFrame has two buffers: h, which keeps the grpc-message header, and data,
 971  	// which is the actual message. As an optimization to keep wire traffic low, bytes
 972  	// from h and data are combined into a single HTTP/2 frame, as large as the quota
 973  	// and the maximum frame size allow.
 974
 975  	if len(dataItem.h) == 0 && reader.Remaining() == 0 { // Empty data frame
 976  		// E.g. a client sends out an empty data frame with endStream = true.
 977  		if err := l.framer.writeData(dataItem.streamID, dataItem.endStream, nil); err != nil {
 978  			return false, err
 979  		}
 980  		str.itl.dequeue() // Remove the empty data item from the stream.
 981  		reader.Close()
 982  		if str.itl.isEmpty() {
 983  			str.state = empty
 984  		} else if trailer, ok := str.itl.peek().(*headerFrame); ok { // The next item is trailers: write them and clean up the stream.
 985  			if err := l.writeHeader(trailer.streamID, trailer.endStream, trailer.hf, trailer.onWrite); err != nil {
 986  				return false, err
 987  			}
 988  			if err := l.cleanupStreamHandler(trailer.cleanup); err != nil {
 989  				return false, err
 990  			}
 991  		} else {
 992  			l.activeStreams.enqueue(str) // More items are pending; keep the stream active.
 993  		}
 994  		return false, nil
 995  	}
 996
 997  	// Figure out the maximum size we can send: start from the frame limit and shrink it to fit both quotas.
 998  	maxSize := http2MaxFrameLen
 999  	if strQuota := int(l.oiws) - str.bytesOutStanding; strQuota <= 0 { // Stream-level flow control.
1000  		str.state = waitingOnStreamQuota // Parked, not re-queued; applySettings re-activates such streams when the window grows.
1001  		return false, nil
1002  	} else if maxSize > strQuota {
1003  		maxSize = strQuota
1004  	}
1005  	if maxSize > int(l.sendQuota) { // Connection-level flow control.
1006  		maxSize = int(l.sendQuota)
1007  	}
1008  	// Compute how much of the header and data we can send within quota and max frame length; header bytes go first.
1009  	hSize := min(maxSize, len(dataItem.h))
1010  	dSize := min(maxSize-hSize, reader.Remaining())
1011  	remainingBytes := len(dataItem.h) + reader.Remaining() - hSize - dSize // Bytes of this item left for later iterations.
1012  	size := hSize + dSize // Total payload bytes of the frame written below.
1013
1014  	l.writeBuf = l.writeBuf[:0] // Reuse the write buffer's backing array.
1015  	if hSize > 0 {
1016  		l.writeBuf = append(l.writeBuf, dataItem.h[:hSize])
1017  	}
1018  	if dSize > 0 {
1019  		var err error
1020  		l.writeBuf, err = reader.Peek(dSize, l.writeBuf) // NOTE(review): Peek presumably does not advance the reader; Discard(dSize) below does — confirm.
1021  		if err != nil {
1022  			// This must never happen since the reader must have at least dSize
1023  			// bytes (dSize is bounded by reader.Remaining() above).
1024  			// Log an error to fail tests.
1025  			l.logger.Errorf("unexpected error while reading Data frame payload: %v", err)
1026  			return false, err
1027  		}
1028  	}
1029
1030  	// Now that outgoing flow controls are checked we can replenish str's write quota.
1031  	str.wq.replenish(size)
1032  	var endStream bool
1033  	// Set endStream only if this is the last data message on this stream and all of it is written in this iteration.
1034  	if dataItem.endStream && remainingBytes == 0 {
1035  		endStream = true
1036  	}
1037  	if dataItem.onEachWrite != nil { // Optional callback, invoked before every frame written for this item.
1038  		dataItem.onEachWrite()
1039  	}
1040  	err := l.framer.writeData(dataItem.streamID, endStream, l.writeBuf)
1041  	reader.Discard(dSize) // Runs even when writeData failed; err is checked only after the buffer is tidied up.
1042  	if cap(l.writeBuf) > maxWriteBufSize { // Drop an oversized buffer rather than retaining it.
1043  		l.writeBuf = nil
1044  	} else {
1045  		clear(l.writeBuf) // Zero the used entries; presumably so the retained buffer does not keep payload slices alive.
1046  	}
1047  	if err != nil {
1048  		return false, err
1049  	}
1050  	str.bytesOutStanding += size // Charge the frame against the stream-level window...
1051  	l.sendQuota -= uint32(size) // ...and against the connection-level quota.
1052  	dataItem.h = dataItem.h[hSize:] // Drop the header bytes that were just sent.
1053
1054  	if remainingBytes == 0 { // All the data from that message was written out.
1055  		reader.Close()
1056  		str.itl.dequeue()
1057  	}
1058  	if str.itl.isEmpty() { // Nothing left to send on this stream for now.
1059  		str.state = empty
1060  	} else if trailer, ok := str.itl.peek().(*headerFrame); ok { // The next item is trailers: write them and clean up the stream.
1061  		if err := l.writeHeader(trailer.streamID, trailer.endStream, trailer.hf, trailer.onWrite); err != nil {
1062  			return false, err
1063  		}
1064  		if err := l.cleanupStreamHandler(trailer.cleanup); err != nil {
1065  			return false, err
1066  		}
1067  	} else if int(l.oiws)-str.bytesOutStanding <= 0 { // Ran out of stream quota.
1068  		str.state = waitingOnStreamQuota
1069  	} else { // Otherwise add it back to the list of active streams.
1070  		l.activeStreams.enqueue(str)
1071  	}
1072  	return false, nil
1073  }
1074