1 /*
2 *
3 * Copyright 2014 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18 19 package transport
20 21 import (
22 "bytes"
23 "errors"
24 "fmt"
25 "net"
26 "runtime"
27 "strconv"
28 "sync"
29 "sync/atomic"
30 31 "golang.org/x/net/http2"
32 "golang.org/x/net/http2/hpack"
33 "google.golang.org/grpc/internal/grpclog"
34 "google.golang.org/grpc/internal/grpcutil"
35 "google.golang.org/grpc/mem"
36 "google.golang.org/grpc/status"
37 )
38 39 var updateHeaderTblSize = func(e *hpack.Encoder, v uint32) {
40 e.SetMaxDynamicTableSizeLimit(v)
41 }
42 43 // itemNodePool is used to reduce heap allocations.
44 var itemNodePool = sync.Pool{
45 New: func() any {
46 return &itemNode{}
47 },
48 }
49 50 type itemNode struct {
51 it any
52 next *itemNode
53 }
54 55 type itemList struct {
56 head *itemNode
57 tail *itemNode
58 }
59 60 func (il *itemList) enqueue(i any) {
61 n := itemNodePool.Get().(*itemNode)
62 n.next = nil
63 n.it = i
64 if il.tail == nil {
65 il.head, il.tail = n, n
66 return
67 }
68 il.tail.next = n
69 il.tail = n
70 }
71 72 // peek returns the first item in the list without removing it from the
73 // list.
74 func (il *itemList) peek() any {
75 return il.head.it
76 }
77 78 func (il *itemList) dequeue() any {
79 if il.head == nil {
80 return nil
81 }
82 i := il.head.it
83 temp := il.head
84 il.head = il.head.next
85 itemNodePool.Put(temp)
86 if il.head == nil {
87 il.tail = nil
88 }
89 return i
90 }
91 92 func (il *itemList) dequeueAll() *itemNode {
93 h := il.head
94 il.head, il.tail = nil, nil
95 return h
96 }
97 98 func (il *itemList) isEmpty() bool {
99 return il.head == nil
100 }
101 102 // The following defines various control items which could flow through
103 // the control buffer of transport. They represent different aspects of
104 // control tasks, e.g., flow control, settings, streaming resetting, etc.
105 106 // maxQueuedTransportResponseFrames is the most queued "transport response"
107 // frames we will buffer before preventing new reads from occurring on the
108 // transport. These are control frames sent in response to client requests,
109 // such as RST_STREAM due to bad headers or settings acks.
110 const maxQueuedTransportResponseFrames = 50
111 112 type cbItem interface {
113 isTransportResponseFrame() bool
114 }
115 116 // registerStream is used to register an incoming stream with loopy writer.
117 type registerStream struct {
118 streamID uint32
119 wq *writeQuota
120 }
121 122 func (*registerStream) isTransportResponseFrame() bool { return false }
123 124 // headerFrame is also used to register stream on the client-side.
125 type headerFrame struct {
126 streamID uint32
127 hf []hpack.HeaderField
128 endStream bool // Valid on server side.
129 initStream func(uint32) error // Used only on the client side.
130 onWrite func()
131 wq *writeQuota // write quota for the stream created.
132 cleanup *cleanupStream // Valid on the server side.
133 onOrphaned func(error) // Valid on client-side
134 }
135 136 func (h *headerFrame) isTransportResponseFrame() bool {
137 return h.cleanup != nil && h.cleanup.rst // Results in a RST_STREAM
138 }
139 140 type cleanupStream struct {
141 streamID uint32
142 rst bool
143 rstCode http2.ErrCode
144 onWrite func()
145 }
146 147 func (c *cleanupStream) isTransportResponseFrame() bool { return c.rst } // Results in a RST_STREAM
148 149 type earlyAbortStream struct {
150 httpStatus uint32
151 streamID uint32
152 contentSubtype string
153 status *status.Status
154 rst bool
155 }
156 157 func (*earlyAbortStream) isTransportResponseFrame() bool { return false }
158 159 type dataFrame struct {
160 streamID uint32
161 endStream bool
162 h []byte
163 data mem.BufferSlice
164 processing bool
165 // onEachWrite is called every time
166 // a part of data is written out.
167 onEachWrite func()
168 }
169 170 func (*dataFrame) isTransportResponseFrame() bool { return false }
171 172 type incomingWindowUpdate struct {
173 streamID uint32
174 increment uint32
175 }
176 177 func (*incomingWindowUpdate) isTransportResponseFrame() bool { return false }
178 179 type outgoingWindowUpdate struct {
180 streamID uint32
181 increment uint32
182 }
183 184 func (*outgoingWindowUpdate) isTransportResponseFrame() bool {
185 return false // window updates are throttled by thresholds
186 }
187 188 type incomingSettings struct {
189 ss []http2.Setting
190 }
191 192 func (*incomingSettings) isTransportResponseFrame() bool { return true } // Results in a settings ACK
193 194 type outgoingSettings struct {
195 ss []http2.Setting
196 }
197 198 func (*outgoingSettings) isTransportResponseFrame() bool { return false }
199 200 type incomingGoAway struct {
201 }
202 203 func (*incomingGoAway) isTransportResponseFrame() bool { return false }
204 205 type goAway struct {
206 code http2.ErrCode
207 debugData []byte
208 headsUp bool
209 closeConn error // if set, loopyWriter will exit with this error
210 }
211 212 func (*goAway) isTransportResponseFrame() bool { return false }
213 214 type ping struct {
215 ack bool
216 data [8]byte
217 }
218 219 func (*ping) isTransportResponseFrame() bool { return true }
220 221 type outFlowControlSizeRequest struct {
222 resp chan uint32
223 }
224 225 func (*outFlowControlSizeRequest) isTransportResponseFrame() bool { return false }
226 227 // closeConnection is an instruction to tell the loopy writer to flush the
228 // framer and exit, which will cause the transport's connection to be closed
229 // (by the client or server). The transport itself will close after the reader
230 // encounters the EOF caused by the connection closure.
231 type closeConnection struct{}
232 233 func (closeConnection) isTransportResponseFrame() bool { return false }
234 235 type outStreamState int
236 237 const (
238 active outStreamState = iota
239 empty
240 waitingOnStreamQuota
241 )
242 243 type outStream struct {
244 id uint32
245 state outStreamState
246 itl *itemList
247 bytesOutStanding int
248 wq *writeQuota
249 reader mem.Reader
250 251 next *outStream
252 prev *outStream
253 }
254 255 func (s *outStream) deleteSelf() {
256 if s.prev != nil {
257 s.prev.next = s.next
258 }
259 if s.next != nil {
260 s.next.prev = s.prev
261 }
262 s.next, s.prev = nil, nil
263 }
264 265 type outStreamList struct {
266 // Following are sentinel objects that mark the
267 // beginning and end of the list. They do not
268 // contain any item lists. All valid objects are
269 // inserted in between them.
270 // This is needed so that an outStream object can
271 // deleteSelf() in O(1) time without knowing which
272 // list it belongs to.
273 head *outStream
274 tail *outStream
275 }
276 277 func newOutStreamList() *outStreamList {
278 head, tail := new(outStream), new(outStream)
279 head.next = tail
280 tail.prev = head
281 return &outStreamList{
282 head: head,
283 tail: tail,
284 }
285 }
286 287 func (l *outStreamList) enqueue(s *outStream) {
288 e := l.tail.prev
289 e.next = s
290 s.prev = e
291 s.next = l.tail
292 l.tail.prev = s
293 }
294 295 // remove from the beginning of the list.
296 func (l *outStreamList) dequeue() *outStream {
297 b := l.head.next
298 if b == l.tail {
299 return nil
300 }
301 b.deleteSelf()
302 return b
303 }
304 305 // controlBuffer is a way to pass information to loopy.
306 //
307 // Information is passed as specific struct types called control frames. A
308 // control frame not only represents data, messages or headers to be sent out
309 // but can also be used to instruct loopy to update its internal state. It
310 // shouldn't be confused with an HTTP2 frame, although some of the control
311 // frames like dataFrame and headerFrame do go out on wire as HTTP2 frames.
312 type controlBuffer struct {
313 wakeupCh chan struct{} // Unblocks readers waiting for something to read.
314 done <-chan struct{} // Closed when the transport is done.
315 316 // Mutex guards all the fields below, except trfChan which can be read
317 // atomically without holding mu.
318 mu sync.Mutex
319 consumerWaiting bool // True when readers are blocked waiting for new data.
320 closed bool // True when the controlbuf is finished.
321 list *itemList // List of queued control frames.
322 323 // transportResponseFrames counts the number of queued items that represent
324 // the response of an action initiated by the peer. trfChan is created
325 // when transportResponseFrames >= maxQueuedTransportResponseFrames and is
326 // closed and nilled when transportResponseFrames drops below the
327 // threshold. Both fields are protected by mu.
328 transportResponseFrames int
329 trfChan atomic.Pointer[chan struct{}]
330 }
331 332 func newControlBuffer(done <-chan struct{}) *controlBuffer {
333 return &controlBuffer{
334 wakeupCh: make(chan struct{}, 1),
335 list: &itemList{},
336 done: done,
337 }
338 }
339 340 // throttle blocks if there are too many frames in the control buf that
341 // represent the response of an action initiated by the peer, like
342 // incomingSettings cleanupStreams etc.
343 func (c *controlBuffer) throttle() {
344 if ch := c.trfChan.Load(); ch != nil {
345 select {
346 case <-(*ch):
347 case <-c.done:
348 }
349 }
350 }
351 352 // put adds an item to the controlbuf.
353 func (c *controlBuffer) put(it cbItem) error {
354 _, err := c.executeAndPut(nil, it)
355 return err
356 }
357 358 // executeAndPut runs f, and if the return value is true, adds the given item to
359 // the controlbuf. The item could be nil, in which case, this method simply
360 // executes f and does not add the item to the controlbuf.
361 //
362 // The first return value indicates whether the item was successfully added to
363 // the control buffer. A non-nil error, specifically ErrConnClosing, is returned
364 // if the control buffer is already closed.
365 func (c *controlBuffer) executeAndPut(f func() bool, it cbItem) (bool, error) {
366 c.mu.Lock()
367 defer c.mu.Unlock()
368 369 if c.closed {
370 return false, ErrConnClosing
371 }
372 if f != nil {
373 if !f() { // f wasn't successful
374 return false, nil
375 }
376 }
377 if it == nil {
378 return true, nil
379 }
380 381 var wakeUp bool
382 if c.consumerWaiting {
383 wakeUp = true
384 c.consumerWaiting = false
385 }
386 c.list.enqueue(it)
387 if it.isTransportResponseFrame() {
388 c.transportResponseFrames++
389 if c.transportResponseFrames == maxQueuedTransportResponseFrames {
390 // We are adding the frame that puts us over the threshold; create
391 // a throttling channel.
392 ch := make(chan struct{})
393 c.trfChan.Store(&ch)
394 }
395 }
396 if wakeUp {
397 select {
398 case c.wakeupCh <- struct{}{}:
399 default:
400 }
401 }
402 return true, nil
403 }
404 405 // get returns the next control frame from the control buffer. If block is true
406 // **and** there are no control frames in the control buffer, the call blocks
407 // until one of the conditions is met: there is a frame to return or the
408 // transport is closed.
409 func (c *controlBuffer) get(block bool) (any, error) {
410 for {
411 c.mu.Lock()
412 frame, err := c.getOnceLocked()
413 if frame != nil || err != nil || !block {
414 // If we read a frame or an error, we can return to the caller. The
415 // call to getOnceLocked() returns a nil frame and a nil error if
416 // there is nothing to read, and in that case, if the caller asked
417 // us not to block, we can return now as well.
418 c.mu.Unlock()
419 return frame, err
420 }
421 c.consumerWaiting = true
422 c.mu.Unlock()
423 424 // Release the lock above and wait to be woken up.
425 select {
426 case <-c.wakeupCh:
427 case <-c.done:
428 return nil, errors.New("transport closed by client")
429 }
430 }
431 }
432 433 // Callers must not use this method, but should instead use get().
434 //
435 // Caller must hold c.mu.
436 func (c *controlBuffer) getOnceLocked() (any, error) {
437 if c.closed {
438 return false, ErrConnClosing
439 }
440 if c.list.isEmpty() {
441 return nil, nil
442 }
443 h := c.list.dequeue().(cbItem)
444 if h.isTransportResponseFrame() {
445 if c.transportResponseFrames == maxQueuedTransportResponseFrames {
446 // We are removing the frame that put us over the
447 // threshold; close and clear the throttling channel.
448 ch := c.trfChan.Swap(nil)
449 close(*ch)
450 }
451 c.transportResponseFrames--
452 }
453 return h, nil
454 }
455 456 // finish closes the control buffer, cleaning up any streams that have queued
457 // header frames. Once this method returns, no more frames can be added to the
458 // control buffer, and attempts to do so will return ErrConnClosing.
459 func (c *controlBuffer) finish() {
460 c.mu.Lock()
461 defer c.mu.Unlock()
462 463 if c.closed {
464 return
465 }
466 c.closed = true
467 // There may be headers for streams in the control buffer.
468 // These streams need to be cleaned out since the transport
469 // is still not aware of these yet.
470 for head := c.list.dequeueAll(); head != nil; head = head.next {
471 switch v := head.it.(type) {
472 case *headerFrame:
473 if v.onOrphaned != nil { // It will be nil on the server-side.
474 v.onOrphaned(ErrConnClosing)
475 }
476 case *dataFrame:
477 if !v.processing {
478 v.data.Free()
479 }
480 }
481 }
482 483 // In case throttle() is currently in flight, it needs to be unblocked.
484 // Otherwise, the transport may not close, since the transport is closed by
485 // the reader encountering the connection error.
486 ch := c.trfChan.Swap(nil)
487 if ch != nil {
488 close(*ch)
489 }
490 }
491 492 type side int
493 494 const (
495 clientSide side = iota
496 serverSide
497 )
498 499 // maxWriteBufSize is the maximum length (number of elements) the cached
500 // writeBuf can grow to. The length depends on the number of buffers
501 // contained within the BufferSlice produced by the codec, which is
502 // generally small.
503 //
504 // If a writeBuf larger than this limit is required, it will be allocated
505 // and freed after use, rather than being cached. This avoids holding
506 // on to large amounts of memory.
507 const maxWriteBufSize = 64
508 509 // Loopy receives frames from the control buffer.
510 // Each frame is handled individually; most of the work done by loopy goes
511 // into handling data frames. Loopy maintains a queue of active streams, and each
512 // stream maintains a queue of data frames; as loopy receives data frames
513 // it gets added to the queue of the relevant stream.
514 // Loopy goes over this list of active streams by processing one node every iteration,
515 // thereby closely resembling a round-robin scheduling over all streams. While
516 // processing a stream, loopy writes out data bytes from this stream capped by the min
517 // of http2MaxFrameLen, connection-level flow control and stream-level flow control.
518 type loopyWriter struct {
519 side side
520 cbuf *controlBuffer
521 sendQuota uint32
522 oiws uint32 // outbound initial window size.
523 // estdStreams is map of all established streams that are not cleaned-up yet.
524 // On client-side, this is all streams whose headers were sent out.
525 // On server-side, this is all streams whose headers were received.
526 estdStreams map[uint32]*outStream // Established streams.
527 // activeStreams is a linked-list of all streams that have data to send and some
528 // stream-level flow control quota.
529 // Each of these streams internally have a list of data items(and perhaps trailers
530 // on the server-side) to be sent out.
531 activeStreams *outStreamList
532 framer *framer
533 hBuf *bytes.Buffer // The buffer for HPACK encoding.
534 hEnc *hpack.Encoder // HPACK encoder.
535 bdpEst *bdpEstimator
536 draining bool
537 conn net.Conn
538 logger *grpclog.PrefixLogger
539 bufferPool mem.BufferPool
540 541 // Side-specific handlers
542 ssGoAwayHandler func(*goAway) (bool, error)
543 544 writeBuf [][]byte // cached slice to avoid heap allocations for calls to mem.Reader.Peek.
545 }
546 547 func newLoopyWriter(s side, fr *framer, cbuf *controlBuffer, bdpEst *bdpEstimator, conn net.Conn, logger *grpclog.PrefixLogger, goAwayHandler func(*goAway) (bool, error), bufferPool mem.BufferPool) *loopyWriter {
548 var buf bytes.Buffer
549 l := &loopyWriter{
550 side: s,
551 cbuf: cbuf,
552 sendQuota: defaultWindowSize,
553 oiws: defaultWindowSize,
554 estdStreams: make(map[uint32]*outStream),
555 activeStreams: newOutStreamList(),
556 framer: fr,
557 hBuf: &buf,
558 hEnc: hpack.NewEncoder(&buf),
559 bdpEst: bdpEst,
560 conn: conn,
561 logger: logger,
562 ssGoAwayHandler: goAwayHandler,
563 bufferPool: bufferPool,
564 }
565 return l
566 }
567 568 const minBatchSize = 1000
569 570 // run should be run in a separate goroutine.
571 // It reads control frames from controlBuf and processes them by:
572 // 1. Updating loopy's internal state, or/and
573 // 2. Writing out HTTP2 frames on the wire.
574 //
575 // Loopy keeps all active streams with data to send in a linked-list.
576 // All streams in the activeStreams linked-list must have both:
577 // 1. Data to send, and
578 // 2. Stream level flow control quota available.
579 //
580 // In each iteration of run loop, other than processing the incoming control
581 // frame, loopy calls processData, which processes one node from the
582 // activeStreams linked-list. This results in writing of HTTP2 frames into an
583 // underlying write buffer. When there's no more control frames to read from
584 // controlBuf, loopy flushes the write buffer. As an optimization, to increase
585 // the batch size for each flush, loopy yields the processor, once if the batch
586 // size is too low to give stream goroutines a chance to fill it up.
587 //
588 // Upon exiting, if the error causing the exit is not an I/O error, run()
589 // flushes the underlying connection. The connection is always left open to
590 // allow different closing behavior on the client and server.
591 func (l *loopyWriter) run() (err error) {
592 defer func() {
593 if l.logger.V(logLevel) {
594 l.logger.Infof("loopyWriter exiting with error: %v", err)
595 }
596 if !isIOError(err) {
597 l.framer.writer.Flush()
598 }
599 l.cbuf.finish()
600 }()
601 for {
602 it, err := l.cbuf.get(true)
603 if err != nil {
604 return err
605 }
606 if err = l.handle(it); err != nil {
607 return err
608 }
609 if _, err = l.processData(); err != nil {
610 return err
611 }
612 gosched := true
613 hasdata:
614 for {
615 it, err := l.cbuf.get(false)
616 if err != nil {
617 return err
618 }
619 if it != nil {
620 if err = l.handle(it); err != nil {
621 return err
622 }
623 if _, err = l.processData(); err != nil {
624 return err
625 }
626 continue hasdata
627 }
628 isEmpty, err := l.processData()
629 if err != nil {
630 return err
631 }
632 if !isEmpty {
633 continue hasdata
634 }
635 if gosched {
636 gosched = false
637 if l.framer.writer.offset < minBatchSize {
638 runtime.Gosched()
639 continue hasdata
640 }
641 }
642 l.framer.writer.Flush()
643 break hasdata
644 }
645 }
646 }
647 648 func (l *loopyWriter) outgoingWindowUpdateHandler(w *outgoingWindowUpdate) error {
649 return l.framer.fr.WriteWindowUpdate(w.streamID, w.increment)
650 }
651 652 func (l *loopyWriter) incomingWindowUpdateHandler(w *incomingWindowUpdate) {
653 // Otherwise update the quota.
654 if w.streamID == 0 {
655 l.sendQuota += w.increment
656 return
657 }
658 // Find the stream and update it.
659 if str, ok := l.estdStreams[w.streamID]; ok {
660 str.bytesOutStanding -= int(w.increment)
661 if strQuota := int(l.oiws) - str.bytesOutStanding; strQuota > 0 && str.state == waitingOnStreamQuota {
662 str.state = active
663 l.activeStreams.enqueue(str)
664 return
665 }
666 }
667 }
668 669 func (l *loopyWriter) outgoingSettingsHandler(s *outgoingSettings) error {
670 return l.framer.fr.WriteSettings(s.ss...)
671 }
672 673 func (l *loopyWriter) incomingSettingsHandler(s *incomingSettings) error {
674 l.applySettings(s.ss)
675 return l.framer.fr.WriteSettingsAck()
676 }
677 678 func (l *loopyWriter) registerStreamHandler(h *registerStream) {
679 str := &outStream{
680 id: h.streamID,
681 state: empty,
682 itl: &itemList{},
683 wq: h.wq,
684 }
685 l.estdStreams[h.streamID] = str
686 }
687 688 func (l *loopyWriter) headerHandler(h *headerFrame) error {
689 if l.side == serverSide {
690 str, ok := l.estdStreams[h.streamID]
691 if !ok {
692 if l.logger.V(logLevel) {
693 l.logger.Infof("Unrecognized streamID %d in loopyWriter", h.streamID)
694 }
695 return nil
696 }
697 // Case 1.A: Server is responding back with headers.
698 if !h.endStream {
699 return l.writeHeader(h.streamID, h.endStream, h.hf, h.onWrite)
700 }
701 // else: Case 1.B: Server wants to close stream.
702 703 if str.state != empty { // either active or waiting on stream quota.
704 // add it str's list of items.
705 str.itl.enqueue(h)
706 return nil
707 }
708 if err := l.writeHeader(h.streamID, h.endStream, h.hf, h.onWrite); err != nil {
709 return err
710 }
711 return l.cleanupStreamHandler(h.cleanup)
712 }
713 // Case 2: Client wants to originate stream.
714 str := &outStream{
715 id: h.streamID,
716 state: empty,
717 itl: &itemList{},
718 wq: h.wq,
719 }
720 return l.originateStream(str, h)
721 }
722 723 func (l *loopyWriter) originateStream(str *outStream, hdr *headerFrame) error {
724 // l.draining is set when handling GoAway. In which case, we want to avoid
725 // creating new streams.
726 if l.draining {
727 // TODO: provide a better error with the reason we are in draining.
728 hdr.onOrphaned(errStreamDrain)
729 return nil
730 }
731 if err := hdr.initStream(str.id); err != nil {
732 return err
733 }
734 if err := l.writeHeader(str.id, hdr.endStream, hdr.hf, hdr.onWrite); err != nil {
735 return err
736 }
737 l.estdStreams[str.id] = str
738 return nil
739 }
740 741 func (l *loopyWriter) writeHeader(streamID uint32, endStream bool, hf []hpack.HeaderField, onWrite func()) error {
742 if onWrite != nil {
743 onWrite()
744 }
745 l.hBuf.Reset()
746 for _, f := range hf {
747 if err := l.hEnc.WriteField(f); err != nil {
748 if l.logger.V(logLevel) {
749 l.logger.Warningf("Encountered error while encoding headers: %v", err)
750 }
751 }
752 }
753 var (
754 err error
755 endHeaders, first bool
756 )
757 first = true
758 for !endHeaders {
759 size := l.hBuf.Len()
760 if size > http2MaxFrameLen {
761 size = http2MaxFrameLen
762 } else {
763 endHeaders = true
764 }
765 if first {
766 first = false
767 err = l.framer.fr.WriteHeaders(http2.HeadersFrameParam{
768 StreamID: streamID,
769 BlockFragment: l.hBuf.Next(size),
770 EndStream: endStream,
771 EndHeaders: endHeaders,
772 })
773 } else {
774 err = l.framer.fr.WriteContinuation(
775 streamID,
776 endHeaders,
777 l.hBuf.Next(size),
778 )
779 }
780 if err != nil {
781 return err
782 }
783 }
784 return nil
785 }
786 787 func (l *loopyWriter) preprocessData(df *dataFrame) {
788 str, ok := l.estdStreams[df.streamID]
789 if !ok {
790 return
791 }
792 // If we got data for a stream it means that
793 // stream was originated and the headers were sent out.
794 str.itl.enqueue(df)
795 if str.state == empty {
796 str.state = active
797 l.activeStreams.enqueue(str)
798 }
799 }
800 801 func (l *loopyWriter) pingHandler(p *ping) error {
802 if !p.ack {
803 l.bdpEst.timesnap(p.data)
804 }
805 return l.framer.fr.WritePing(p.ack, p.data)
806 807 }
808 809 func (l *loopyWriter) outFlowControlSizeRequestHandler(o *outFlowControlSizeRequest) {
810 o.resp <- l.sendQuota
811 }
812 813 func (l *loopyWriter) cleanupStreamHandler(c *cleanupStream) error {
814 c.onWrite()
815 if str, ok := l.estdStreams[c.streamID]; ok {
816 // On the server side it could be a trailers-only response or
817 // a RST_STREAM before stream initialization thus the stream might
818 // not be established yet.
819 delete(l.estdStreams, c.streamID)
820 str.reader.Close()
821 str.deleteSelf()
822 for head := str.itl.dequeueAll(); head != nil; head = head.next {
823 if df, ok := head.it.(*dataFrame); ok {
824 if !df.processing {
825 df.data.Free()
826 }
827 }
828 }
829 }
830 if c.rst { // If RST_STREAM needs to be sent.
831 if err := l.framer.fr.WriteRSTStream(c.streamID, c.rstCode); err != nil {
832 return err
833 }
834 }
835 if l.draining && len(l.estdStreams) == 0 {
836 // Flush and close the connection; we are done with it.
837 return errors.New("finished processing active streams while in draining mode")
838 }
839 return nil
840 }
841 842 func (l *loopyWriter) earlyAbortStreamHandler(eas *earlyAbortStream) error {
843 if l.side == clientSide {
844 return errors.New("earlyAbortStream not handled on client")
845 }
846 // In case the caller forgets to set the http status, default to 200.
847 if eas.httpStatus == 0 {
848 eas.httpStatus = 200
849 }
850 headerFields := []hpack.HeaderField{
851 {Name: ":status", Value: strconv.Itoa(int(eas.httpStatus))},
852 {Name: "content-type", Value: grpcutil.ContentType(eas.contentSubtype)},
853 {Name: "grpc-status", Value: strconv.Itoa(int(eas.status.Code()))},
854 {Name: "grpc-message", Value: encodeGrpcMessage(eas.status.Message())},
855 }
856 857 if err := l.writeHeader(eas.streamID, true, headerFields, nil); err != nil {
858 return err
859 }
860 if eas.rst {
861 if err := l.framer.fr.WriteRSTStream(eas.streamID, http2.ErrCodeNo); err != nil {
862 return err
863 }
864 }
865 return nil
866 }
867 868 func (l *loopyWriter) incomingGoAwayHandler(*incomingGoAway) error {
869 if l.side == clientSide {
870 l.draining = true
871 if len(l.estdStreams) == 0 {
872 // Flush and close the connection; we are done with it.
873 return errors.New("received GOAWAY with no active streams")
874 }
875 }
876 return nil
877 }
878 879 func (l *loopyWriter) goAwayHandler(g *goAway) error {
880 // Handling of outgoing GoAway is very specific to side.
881 if l.ssGoAwayHandler != nil {
882 draining, err := l.ssGoAwayHandler(g)
883 if err != nil {
884 return err
885 }
886 l.draining = draining
887 }
888 return nil
889 }
890 891 func (l *loopyWriter) handle(i any) error {
892 switch i := i.(type) {
893 case *incomingWindowUpdate:
894 l.incomingWindowUpdateHandler(i)
895 case *outgoingWindowUpdate:
896 return l.outgoingWindowUpdateHandler(i)
897 case *incomingSettings:
898 return l.incomingSettingsHandler(i)
899 case *outgoingSettings:
900 return l.outgoingSettingsHandler(i)
901 case *headerFrame:
902 return l.headerHandler(i)
903 case *registerStream:
904 l.registerStreamHandler(i)
905 case *cleanupStream:
906 return l.cleanupStreamHandler(i)
907 case *earlyAbortStream:
908 return l.earlyAbortStreamHandler(i)
909 case *incomingGoAway:
910 return l.incomingGoAwayHandler(i)
911 case *dataFrame:
912 l.preprocessData(i)
913 case *ping:
914 return l.pingHandler(i)
915 case *goAway:
916 return l.goAwayHandler(i)
917 case *outFlowControlSizeRequest:
918 l.outFlowControlSizeRequestHandler(i)
919 case closeConnection:
920 // Just return a non-I/O error and run() will flush and close the
921 // connection.
922 return ErrConnClosing
923 default:
924 return fmt.Errorf("transport: unknown control message type %T", i)
925 }
926 return nil
927 }
928 929 func (l *loopyWriter) applySettings(ss []http2.Setting) {
930 for _, s := range ss {
931 switch s.ID {
932 case http2.SettingInitialWindowSize:
933 o := l.oiws
934 l.oiws = s.Val
935 if o < l.oiws {
936 // If the new limit is greater make all depleted streams active.
937 for _, stream := range l.estdStreams {
938 if stream.state == waitingOnStreamQuota {
939 stream.state = active
940 l.activeStreams.enqueue(stream)
941 }
942 }
943 }
944 case http2.SettingHeaderTableSize:
945 updateHeaderTblSize(l.hEnc, s.Val)
946 }
947 }
948 }
949 950 // processData removes the first stream from active streams, writes out at most 16KB
951 // of its data and then puts it at the end of activeStreams if there's still more data
952 // to be sent and stream has some stream-level flow control.
953 func (l *loopyWriter) processData() (bool, error) {
954 if l.sendQuota == 0 {
955 return true, nil
956 }
957 str := l.activeStreams.dequeue() // Remove the first stream.
958 if str == nil {
959 return true, nil
960 }
961 reader := &str.reader
962 dataItem := str.itl.peek().(*dataFrame) // Peek at the first data item this stream.
963 if !dataItem.processing {
964 dataItem.processing = true
965 reader.Reset(dataItem.data)
966 dataItem.data.Free()
967 }
968 // A data item is represented by a dataFrame, since it later translates into
969 // multiple HTTP2 data frames.
970 // Every dataFrame has two buffers; h that keeps grpc-message header and data
971 // that is the actual message. As an optimization to keep wire traffic low, data
972 // from data is copied to h to make as big as the maximum possible HTTP2 frame
973 // size.
974 975 if len(dataItem.h) == 0 && reader.Remaining() == 0 { // Empty data frame
976 // Client sends out empty data frame with endStream = true
977 if err := l.framer.writeData(dataItem.streamID, dataItem.endStream, nil); err != nil {
978 return false, err
979 }
980 str.itl.dequeue() // remove the empty data item from stream
981 reader.Close()
982 if str.itl.isEmpty() {
983 str.state = empty
984 } else if trailer, ok := str.itl.peek().(*headerFrame); ok { // the next item is trailers.
985 if err := l.writeHeader(trailer.streamID, trailer.endStream, trailer.hf, trailer.onWrite); err != nil {
986 return false, err
987 }
988 if err := l.cleanupStreamHandler(trailer.cleanup); err != nil {
989 return false, err
990 }
991 } else {
992 l.activeStreams.enqueue(str)
993 }
994 return false, nil
995 }
996 997 // Figure out the maximum size we can send
998 maxSize := http2MaxFrameLen
999 if strQuota := int(l.oiws) - str.bytesOutStanding; strQuota <= 0 { // stream-level flow control.
1000 str.state = waitingOnStreamQuota
1001 return false, nil
1002 } else if maxSize > strQuota {
1003 maxSize = strQuota
1004 }
1005 if maxSize > int(l.sendQuota) { // connection-level flow control.
1006 maxSize = int(l.sendQuota)
1007 }
1008 // Compute how much of the header and data we can send within quota and max frame length
1009 hSize := min(maxSize, len(dataItem.h))
1010 dSize := min(maxSize-hSize, reader.Remaining())
1011 remainingBytes := len(dataItem.h) + reader.Remaining() - hSize - dSize
1012 size := hSize + dSize
1013 1014 l.writeBuf = l.writeBuf[:0]
1015 if hSize > 0 {
1016 l.writeBuf = append(l.writeBuf, dataItem.h[:hSize])
1017 }
1018 if dSize > 0 {
1019 var err error
1020 l.writeBuf, err = reader.Peek(dSize, l.writeBuf)
1021 if err != nil {
1022 // This must never happen since the reader must have at least dSize
1023 // bytes.
1024 // Log an error to fail tests.
1025 l.logger.Errorf("unexpected error while reading Data frame payload: %v", err)
1026 return false, err
1027 }
1028 }
1029 1030 // Now that outgoing flow controls are checked we can replenish str's write quota
1031 str.wq.replenish(size)
1032 var endStream bool
1033 // If this is the last data message on this stream and all of it can be written in this iteration.
1034 if dataItem.endStream && remainingBytes == 0 {
1035 endStream = true
1036 }
1037 if dataItem.onEachWrite != nil {
1038 dataItem.onEachWrite()
1039 }
1040 err := l.framer.writeData(dataItem.streamID, endStream, l.writeBuf)
1041 reader.Discard(dSize)
1042 if cap(l.writeBuf) > maxWriteBufSize {
1043 l.writeBuf = nil
1044 } else {
1045 clear(l.writeBuf)
1046 }
1047 if err != nil {
1048 return false, err
1049 }
1050 str.bytesOutStanding += size
1051 l.sendQuota -= uint32(size)
1052 dataItem.h = dataItem.h[hSize:]
1053 1054 if remainingBytes == 0 { // All the data from that message was written out.
1055 reader.Close()
1056 str.itl.dequeue()
1057 }
1058 if str.itl.isEmpty() {
1059 str.state = empty
1060 } else if trailer, ok := str.itl.peek().(*headerFrame); ok { // The next item is trailers.
1061 if err := l.writeHeader(trailer.streamID, trailer.endStream, trailer.hf, trailer.onWrite); err != nil {
1062 return false, err
1063 }
1064 if err := l.cleanupStreamHandler(trailer.cleanup); err != nil {
1065 return false, err
1066 }
1067 } else if int(l.oiws)-str.bytesOutStanding <= 0 { // Ran out of stream quota.
1068 str.state = waitingOnStreamQuota
1069 } else { // Otherwise add it back to the list of active streams.
1070 l.activeStreams.enqueue(str)
1071 }
1072 return false, nil
1073 }
1074