http2_server.go raw
1 /*
2 *
3 * Copyright 2014 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19 package transport
20
21 import (
22 "bytes"
23 "context"
24 "errors"
25 "fmt"
26 "io"
27 "math"
28 rand "math/rand/v2"
29 "net"
30 "net/http"
31 "strconv"
32 "sync"
33 "sync/atomic"
34 "time"
35
36 "golang.org/x/net/http2"
37 "golang.org/x/net/http2/hpack"
38 "google.golang.org/protobuf/proto"
39
40 "google.golang.org/grpc/internal"
41 "google.golang.org/grpc/internal/grpclog"
42 "google.golang.org/grpc/internal/grpcutil"
43 "google.golang.org/grpc/internal/pretty"
44 istatus "google.golang.org/grpc/internal/status"
45 "google.golang.org/grpc/internal/syscall"
46 "google.golang.org/grpc/mem"
47
48 "google.golang.org/grpc/codes"
49 "google.golang.org/grpc/credentials"
50 "google.golang.org/grpc/internal/channelz"
51 "google.golang.org/grpc/internal/grpcsync"
52 "google.golang.org/grpc/keepalive"
53 "google.golang.org/grpc/metadata"
54 "google.golang.org/grpc/peer"
55 "google.golang.org/grpc/stats"
56 "google.golang.org/grpc/status"
57 "google.golang.org/grpc/tap"
58 )
59
60 var (
61 // ErrIllegalHeaderWrite indicates that setting header is illegal because of
62 // the stream's state.
63 ErrIllegalHeaderWrite = status.Error(codes.Internal, "transport: SendHeader called multiple times")
64 // ErrHeaderListSizeLimitViolation indicates that the header list size is larger
65 // than the limit set by peer.
66 ErrHeaderListSizeLimitViolation = status.Error(codes.Internal, "transport: trying to send header list size larger than the limit set by peer")
67 )
68
69 // serverConnectionCounter counts the number of connections a server has seen
70 // (equal to the number of http2Servers created). Must be accessed atomically.
71 var serverConnectionCounter uint64
72
73 // http2Server implements the ServerTransport interface with HTTP2.
74 type http2Server struct {
75 lastRead int64 // Keep this field 64-bit aligned. Accessed atomically.
76 done chan struct{}
77 conn net.Conn
78 loopy *loopyWriter
79 readerDone chan struct{} // sync point to enable testing.
80 loopyWriterDone chan struct{}
81 peer peer.Peer
82 inTapHandle tap.ServerInHandle
83 framer *framer
84 // The max number of concurrent streams.
85 maxStreams uint32
86 // controlBuf delivers all the control related tasks (e.g., window
87 // updates, reset streams, and various settings) to the controller.
88 controlBuf *controlBuffer
89 fc *trInFlow
90 stats stats.Handler
91 // Keepalive and max-age parameters for the server.
92 kp keepalive.ServerParameters
93 // Keepalive enforcement policy.
94 kep keepalive.EnforcementPolicy
95 // The time instance last ping was received.
96 lastPingAt time.Time
97 // Number of times the client has violated keepalive ping policy so far.
98 pingStrikes uint8
99 // Flag to signify that number of ping strikes should be reset to 0.
100 // This is set whenever data or header frames are sent.
101 // 1 means yes.
102 resetPingStrikes uint32 // Accessed atomically.
103 initialWindowSize int32
104 bdpEst *bdpEstimator
105 maxSendHeaderListSize *uint32
106
107 mu sync.Mutex // guard the following
108
109 // drainEvent is initialized when Drain() is called the first time. After
110 // which the server writes out the first GoAway(with ID 2^31-1) frame. Then
111 // an independent goroutine will be launched to later send the second
112 // GoAway. During this time we don't want to write another first GoAway(with
113 // ID 2^31 -1) frame. Thus call to Drain() will be a no-op if drainEvent is
114 // already initialized since draining is already underway.
115 drainEvent *grpcsync.Event
116 state transportState
117 activeStreams map[uint32]*ServerStream
118 // idle is the time instant when the connection went idle.
119 // This is either the beginning of the connection or when the number of
120 // RPCs go down to 0.
121 // When the connection is busy, this value is set to 0.
122 idle time.Time
123
124 // Fields below are for channelz metric collection.
125 channelz *channelz.Socket
126 bufferPool mem.BufferPool
127
128 connectionID uint64
129
130 // maxStreamMu guards the maximum stream ID
131 // This lock may not be taken if mu is already held.
132 maxStreamMu sync.Mutex
133 maxStreamID uint32 // max stream ID ever seen
134
135 logger *grpclog.PrefixLogger
136 // setResetPingStrikes is stored as a closure instead of making this a
137 // method on http2Server to avoid a heap allocation when converting a method
138 // to a closure for passing to frames objects.
139 setResetPingStrikes func()
140 }
141
142 // NewServerTransport creates a http2 transport with conn and configuration
143 // options from config.
144 //
145 // It returns a non-nil transport and a nil error on success. On failure, it
146 // returns a nil transport and a non-nil error. For a special case where the
147 // underlying conn gets closed before the client preface could be read, it
148 // returns a nil transport and a nil error.
149 func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport, err error) {
150 var authInfo credentials.AuthInfo
151 rawConn := conn
152 if config.Credentials != nil {
153 var err error
154 conn, authInfo, err = config.Credentials.ServerHandshake(rawConn)
155 if err != nil {
156 // ErrConnDispatched means that the connection was dispatched away
157 // from gRPC; those connections should be left open. io.EOF means
158 // the connection was closed before handshaking completed, which can
159 // happen naturally from probers. Return these errors directly.
160 if err == credentials.ErrConnDispatched || err == io.EOF {
161 return nil, err
162 }
163 return nil, connectionErrorf(false, err, "ServerHandshake(%q) failed: %v", rawConn.RemoteAddr(), err)
164 }
165 }
166 writeBufSize := config.WriteBufferSize
167 readBufSize := config.ReadBufferSize
168 maxHeaderListSize := defaultServerMaxHeaderListSize
169 if config.MaxHeaderListSize != nil {
170 maxHeaderListSize = *config.MaxHeaderListSize
171 }
172 framer := newFramer(conn, writeBufSize, readBufSize, config.SharedWriteBuffer, maxHeaderListSize, config.BufferPool)
173 // Send initial settings as connection preface to client.
174 isettings := []http2.Setting{{
175 ID: http2.SettingMaxFrameSize,
176 Val: http2MaxFrameLen,
177 }}
178 if config.MaxStreams != math.MaxUint32 {
179 isettings = append(isettings, http2.Setting{
180 ID: http2.SettingMaxConcurrentStreams,
181 Val: config.MaxStreams,
182 })
183 }
184 iwz := int32(initialWindowSize)
185 if config.InitialWindowSize >= defaultWindowSize {
186 iwz = config.InitialWindowSize
187 }
188 icwz := int32(initialWindowSize)
189 if config.InitialConnWindowSize >= defaultWindowSize {
190 icwz = config.InitialConnWindowSize
191 }
192 if iwz != defaultWindowSize {
193 isettings = append(isettings, http2.Setting{
194 ID: http2.SettingInitialWindowSize,
195 Val: uint32(iwz)})
196 }
197 if config.MaxHeaderListSize != nil {
198 isettings = append(isettings, http2.Setting{
199 ID: http2.SettingMaxHeaderListSize,
200 Val: *config.MaxHeaderListSize,
201 })
202 }
203 if config.HeaderTableSize != nil {
204 isettings = append(isettings, http2.Setting{
205 ID: http2.SettingHeaderTableSize,
206 Val: *config.HeaderTableSize,
207 })
208 }
209 if err := framer.fr.WriteSettings(isettings...); err != nil {
210 return nil, connectionErrorf(false, err, "transport: %v", err)
211 }
212 // Adjust the connection flow control window if needed.
213 if delta := uint32(icwz - defaultWindowSize); delta > 0 {
214 if err := framer.fr.WriteWindowUpdate(0, delta); err != nil {
215 return nil, connectionErrorf(false, err, "transport: %v", err)
216 }
217 }
218 kp := config.KeepaliveParams
219 if kp.MaxConnectionIdle == 0 {
220 kp.MaxConnectionIdle = defaultMaxConnectionIdle
221 }
222 if kp.MaxConnectionAge == 0 {
223 kp.MaxConnectionAge = defaultMaxConnectionAge
224 }
225 // Add a jitter to MaxConnectionAge.
226 kp.MaxConnectionAge += getJitter(kp.MaxConnectionAge)
227 if kp.MaxConnectionAgeGrace == 0 {
228 kp.MaxConnectionAgeGrace = defaultMaxConnectionAgeGrace
229 }
230 if kp.Time == 0 {
231 kp.Time = defaultServerKeepaliveTime
232 }
233 if kp.Timeout == 0 {
234 kp.Timeout = defaultServerKeepaliveTimeout
235 }
236 if kp.Time != infinity {
237 if err = syscall.SetTCPUserTimeout(rawConn, kp.Timeout); err != nil {
238 return nil, connectionErrorf(false, err, "transport: failed to set TCP_USER_TIMEOUT: %v", err)
239 }
240 }
241 kep := config.KeepalivePolicy
242 if kep.MinTime == 0 {
243 kep.MinTime = defaultKeepalivePolicyMinTime
244 }
245
246 done := make(chan struct{})
247 peer := peer.Peer{
248 Addr: conn.RemoteAddr(),
249 LocalAddr: conn.LocalAddr(),
250 AuthInfo: authInfo,
251 }
252 t := &http2Server{
253 done: done,
254 conn: conn,
255 peer: peer,
256 framer: framer,
257 readerDone: make(chan struct{}),
258 loopyWriterDone: make(chan struct{}),
259 maxStreams: config.MaxStreams,
260 inTapHandle: config.InTapHandle,
261 fc: &trInFlow{limit: uint32(icwz)},
262 state: reachable,
263 activeStreams: make(map[uint32]*ServerStream),
264 stats: config.StatsHandler,
265 kp: kp,
266 idle: time.Now(),
267 kep: kep,
268 initialWindowSize: iwz,
269 bufferPool: config.BufferPool,
270 }
271 t.setResetPingStrikes = func() {
272 atomic.StoreUint32(&t.resetPingStrikes, 1)
273 }
274 var czSecurity credentials.ChannelzSecurityValue
275 if au, ok := authInfo.(credentials.ChannelzSecurityInfo); ok {
276 czSecurity = au.GetSecurityValue()
277 }
278 t.channelz = channelz.RegisterSocket(
279 &channelz.Socket{
280 SocketType: channelz.SocketTypeNormal,
281 Parent: config.ChannelzParent,
282 SocketMetrics: channelz.SocketMetrics{},
283 EphemeralMetrics: t.socketMetrics,
284 LocalAddr: t.peer.LocalAddr,
285 RemoteAddr: t.peer.Addr,
286 SocketOptions: channelz.GetSocketOption(t.conn),
287 Security: czSecurity,
288 },
289 )
290 t.logger = prefixLoggerForServerTransport(t)
291
292 t.controlBuf = newControlBuffer(t.done)
293 if !config.StaticWindowSize {
294 t.bdpEst = &bdpEstimator{
295 bdp: initialWindowSize,
296 updateFlowControl: t.updateFlowControl,
297 }
298 }
299
300 t.connectionID = atomic.AddUint64(&serverConnectionCounter, 1)
301 t.framer.writer.Flush()
302
303 defer func() {
304 if err != nil {
305 t.Close(err)
306 }
307 }()
308
309 // Check the validity of client preface.
310 preface := make([]byte, len(clientPreface))
311 if _, err := io.ReadFull(t.conn, preface); err != nil {
312 // In deployments where a gRPC server runs behind a cloud load balancer
313 // which performs regular TCP level health checks, the connection is
314 // closed immediately by the latter. Returning io.EOF here allows the
315 // grpc server implementation to recognize this scenario and suppress
316 // logging to reduce spam.
317 if err == io.EOF {
318 return nil, io.EOF
319 }
320 return nil, connectionErrorf(false, err, "transport: http2Server.HandleStreams failed to receive the preface from client: %v", err)
321 }
322 if !bytes.Equal(preface, clientPreface) {
323 return nil, connectionErrorf(false, nil, "transport: http2Server.HandleStreams received bogus greeting from client: %q", preface)
324 }
325
326 frame, err := t.framer.fr.ReadFrame()
327 if err == io.EOF || err == io.ErrUnexpectedEOF {
328 return nil, err
329 }
330 if err != nil {
331 return nil, connectionErrorf(false, err, "transport: http2Server.HandleStreams failed to read initial settings frame: %v", err)
332 }
333 atomic.StoreInt64(&t.lastRead, time.Now().UnixNano())
334 sf, ok := frame.(*http2.SettingsFrame)
335 if !ok {
336 return nil, connectionErrorf(false, nil, "transport: http2Server.HandleStreams saw invalid preface type %T from client", frame)
337 }
338 t.handleSettings(sf)
339
340 go func() {
341 t.loopy = newLoopyWriter(serverSide, t.framer, t.controlBuf, t.bdpEst, t.conn, t.logger, t.outgoingGoAwayHandler, t.bufferPool)
342 err := t.loopy.run()
343 close(t.loopyWriterDone)
344 if !isIOError(err) {
345 // Close the connection if a non-I/O error occurs (for I/O errors
346 // the reader will also encounter the error and close). Wait 1
347 // second before closing the connection, or when the reader is done
348 // (i.e. the client already closed the connection or a connection
349 // error occurred). This avoids the potential problem where there
350 // is unread data on the receive side of the connection, which, if
351 // closed, would lead to a TCP RST instead of FIN, and the client
352 // encountering errors. For more info:
353 // https://github.com/grpc/grpc-go/issues/5358
354 timer := time.NewTimer(time.Second)
355 defer timer.Stop()
356 select {
357 case <-t.readerDone:
358 case <-timer.C:
359 }
360 t.conn.Close()
361 }
362 }()
363 go t.keepalive()
364 return t, nil
365 }
366
367 // operateHeaders takes action on the decoded headers. Returns an error if fatal
368 // error encountered and transport needs to close, otherwise returns nil.
369 func (t *http2Server) operateHeaders(ctx context.Context, frame *http2.MetaHeadersFrame, handle func(*ServerStream)) error {
370 // Acquire max stream ID lock for entire duration
371 t.maxStreamMu.Lock()
372 defer t.maxStreamMu.Unlock()
373
374 streamID := frame.Header().StreamID
375
376 // frame.Truncated is set to true when framer detects that the current header
377 // list size hits MaxHeaderListSize limit.
378 if frame.Truncated {
379 t.controlBuf.put(&cleanupStream{
380 streamID: streamID,
381 rst: true,
382 rstCode: http2.ErrCodeFrameSize,
383 onWrite: func() {},
384 })
385 return nil
386 }
387
388 if streamID%2 != 1 || streamID <= t.maxStreamID {
389 // illegal gRPC stream id.
390 return fmt.Errorf("received an illegal stream id: %v. headers frame: %+v", streamID, frame)
391 }
392 t.maxStreamID = streamID
393
394 s := &ServerStream{
395 Stream: Stream{
396 id: streamID,
397 fc: inFlow{limit: uint32(t.initialWindowSize)},
398 },
399 st: t,
400 headerWireLength: int(frame.Header().Length),
401 }
402 s.Stream.buf.init()
403 var (
404 // if false, content-type was missing or invalid
405 isGRPC = false
406 contentType = ""
407 mdata = make(metadata.MD, len(frame.Fields))
408 httpMethod string
409 // these are set if an error is encountered while parsing the headers
410 protocolError bool
411 headerError *status.Status
412
413 timeoutSet bool
414 timeout time.Duration
415 )
416
417 for _, hf := range frame.Fields {
418 switch hf.Name {
419 case "content-type":
420 contentSubtype, validContentType := grpcutil.ContentSubtype(hf.Value)
421 if !validContentType {
422 contentType = hf.Value
423 break
424 }
425 mdata[hf.Name] = append(mdata[hf.Name], hf.Value)
426 s.contentSubtype = contentSubtype
427 isGRPC = true
428
429 case "grpc-accept-encoding":
430 mdata[hf.Name] = append(mdata[hf.Name], hf.Value)
431 if hf.Value == "" {
432 continue
433 }
434 compressors := hf.Value
435 if s.clientAdvertisedCompressors != "" {
436 compressors = s.clientAdvertisedCompressors + "," + compressors
437 }
438 s.clientAdvertisedCompressors = compressors
439 case "grpc-encoding":
440 s.recvCompress = hf.Value
441 case ":method":
442 httpMethod = hf.Value
443 case ":path":
444 s.method = hf.Value
445 case "grpc-timeout":
446 timeoutSet = true
447 var err error
448 if timeout, err = decodeTimeout(hf.Value); err != nil {
449 headerError = status.Newf(codes.Internal, "malformed grpc-timeout: %v", err)
450 }
451 // "Transports must consider requests containing the Connection header
452 // as malformed." - A41
453 case "connection":
454 if t.logger.V(logLevel) {
455 t.logger.Infof("Received a HEADERS frame with a :connection header which makes the request malformed, as per the HTTP/2 spec")
456 }
457 protocolError = true
458 default:
459 if isReservedHeader(hf.Name) && !isWhitelistedHeader(hf.Name) {
460 break
461 }
462 v, err := decodeMetadataHeader(hf.Name, hf.Value)
463 if err != nil {
464 headerError = status.Newf(codes.Internal, "malformed binary metadata %q in header %q: %v", hf.Value, hf.Name, err)
465 t.logger.Warningf("Failed to decode metadata header (%q, %q): %v", hf.Name, hf.Value, err)
466 break
467 }
468 mdata[hf.Name] = append(mdata[hf.Name], v)
469 }
470 }
471
472 // "If multiple Host headers or multiple :authority headers are present, the
473 // request must be rejected with an HTTP status code 400 as required by Host
474 // validation in RFC 7230 ยง5.4, gRPC status code INTERNAL, or RST_STREAM
475 // with HTTP/2 error code PROTOCOL_ERROR." - A41. Since this is a HTTP/2
476 // error, this takes precedence over a client not speaking gRPC.
477 if len(mdata[":authority"]) > 1 || len(mdata["host"]) > 1 {
478 errMsg := fmt.Sprintf("num values of :authority: %v, num values of host: %v, both must only have 1 value as per HTTP/2 spec", len(mdata[":authority"]), len(mdata["host"]))
479 if t.logger.V(logLevel) {
480 t.logger.Infof("Aborting the stream early: %v", errMsg)
481 }
482 t.controlBuf.put(&earlyAbortStream{
483 httpStatus: http.StatusBadRequest,
484 streamID: streamID,
485 contentSubtype: s.contentSubtype,
486 status: status.New(codes.Internal, errMsg),
487 rst: !frame.StreamEnded(),
488 })
489 return nil
490 }
491
492 if protocolError {
493 t.controlBuf.put(&cleanupStream{
494 streamID: streamID,
495 rst: true,
496 rstCode: http2.ErrCodeProtocol,
497 onWrite: func() {},
498 })
499 return nil
500 }
501 if !isGRPC {
502 t.controlBuf.put(&earlyAbortStream{
503 httpStatus: http.StatusUnsupportedMediaType,
504 streamID: streamID,
505 contentSubtype: s.contentSubtype,
506 status: status.Newf(codes.InvalidArgument, "invalid gRPC request content-type %q", contentType),
507 rst: !frame.StreamEnded(),
508 })
509 return nil
510 }
511 if headerError != nil {
512 t.controlBuf.put(&earlyAbortStream{
513 httpStatus: http.StatusBadRequest,
514 streamID: streamID,
515 contentSubtype: s.contentSubtype,
516 status: headerError,
517 rst: !frame.StreamEnded(),
518 })
519 return nil
520 }
521
522 // "If :authority is missing, Host must be renamed to :authority." - A41
523 if len(mdata[":authority"]) == 0 {
524 // No-op if host isn't present, no eventual :authority header is a valid
525 // RPC.
526 if host, ok := mdata["host"]; ok {
527 mdata[":authority"] = host
528 delete(mdata, "host")
529 }
530 } else {
531 // "If :authority is present, Host must be discarded" - A41
532 delete(mdata, "host")
533 }
534
535 if frame.StreamEnded() {
536 // s is just created by the caller. No lock needed.
537 s.state = streamReadDone
538 }
539 if timeoutSet {
540 s.ctx, s.cancel = context.WithTimeout(ctx, timeout)
541 } else {
542 s.ctx, s.cancel = context.WithCancel(ctx)
543 }
544
545 // Attach the received metadata to the context.
546 if len(mdata) > 0 {
547 s.ctx = metadata.NewIncomingContext(s.ctx, mdata)
548 }
549 t.mu.Lock()
550 if t.state != reachable {
551 t.mu.Unlock()
552 s.cancel()
553 return nil
554 }
555 if uint32(len(t.activeStreams)) >= t.maxStreams {
556 t.mu.Unlock()
557 t.controlBuf.put(&cleanupStream{
558 streamID: streamID,
559 rst: true,
560 rstCode: http2.ErrCodeRefusedStream,
561 onWrite: func() {},
562 })
563 s.cancel()
564 return nil
565 }
566 if httpMethod != http.MethodPost {
567 t.mu.Unlock()
568 errMsg := fmt.Sprintf("Received a HEADERS frame with :method %q which should be POST", httpMethod)
569 if t.logger.V(logLevel) {
570 t.logger.Infof("Aborting the stream early: %v", errMsg)
571 }
572 t.controlBuf.put(&earlyAbortStream{
573 httpStatus: http.StatusMethodNotAllowed,
574 streamID: streamID,
575 contentSubtype: s.contentSubtype,
576 status: status.New(codes.Internal, errMsg),
577 rst: !frame.StreamEnded(),
578 })
579 s.cancel()
580 return nil
581 }
582 if t.inTapHandle != nil {
583 var err error
584 if s.ctx, err = t.inTapHandle(s.ctx, &tap.Info{FullMethodName: s.method, Header: mdata}); err != nil {
585 t.mu.Unlock()
586 if t.logger.V(logLevel) {
587 t.logger.Infof("Aborting the stream early due to InTapHandle failure: %v", err)
588 }
589 stat, ok := status.FromError(err)
590 if !ok {
591 stat = status.New(codes.PermissionDenied, err.Error())
592 }
593 t.controlBuf.put(&earlyAbortStream{
594 httpStatus: http.StatusOK,
595 streamID: s.id,
596 contentSubtype: s.contentSubtype,
597 status: stat,
598 rst: !frame.StreamEnded(),
599 })
600 return nil
601 }
602 }
603
604 if s.ctx.Err() != nil {
605 t.mu.Unlock()
606 // Early abort in case the timeout was zero or so low it already fired.
607 t.controlBuf.put(&earlyAbortStream{
608 httpStatus: http.StatusOK,
609 streamID: s.id,
610 contentSubtype: s.contentSubtype,
611 status: status.New(codes.DeadlineExceeded, context.DeadlineExceeded.Error()),
612 rst: !frame.StreamEnded(),
613 })
614 return nil
615 }
616
617 t.activeStreams[streamID] = s
618 if len(t.activeStreams) == 1 {
619 t.idle = time.Time{}
620 }
621
622 // Start a timer to close the stream on reaching the deadline.
623 if timeoutSet {
624 // We need to wait for s.cancel to be updated before calling
625 // t.closeStream to avoid data races.
626 cancelUpdated := make(chan struct{})
627 timer := internal.TimeAfterFunc(timeout, func() {
628 <-cancelUpdated
629 t.closeStream(s, true, http2.ErrCodeCancel, false)
630 })
631 oldCancel := s.cancel
632 s.cancel = func() {
633 oldCancel()
634 timer.Stop()
635 }
636 close(cancelUpdated)
637 }
638 t.mu.Unlock()
639 if channelz.IsOn() {
640 t.channelz.SocketMetrics.StreamsStarted.Add(1)
641 t.channelz.SocketMetrics.LastRemoteStreamCreatedTimestamp.Store(time.Now().UnixNano())
642 }
643 s.readRequester = s
644 s.ctxDone = s.ctx.Done()
645 s.Stream.wq.init(defaultWriteQuota, s.ctxDone)
646 s.trReader = transportReader{
647 reader: recvBufferReader{
648 ctx: s.ctx,
649 ctxDone: s.ctxDone,
650 recv: &s.buf,
651 },
652 windowHandler: s,
653 }
654 // Register the stream with loopy.
655 t.controlBuf.put(®isterStream{
656 streamID: s.id,
657 wq: &s.wq,
658 })
659 handle(s)
660 return nil
661 }
662
663 // HandleStreams receives incoming streams using the given handler. This is
664 // typically run in a separate goroutine.
665 // traceCtx attaches trace to ctx and returns the new context.
666 func (t *http2Server) HandleStreams(ctx context.Context, handle func(*ServerStream)) {
667 defer func() {
668 close(t.readerDone)
669 <-t.loopyWriterDone
670 }()
671 for {
672 t.controlBuf.throttle()
673 frame, err := t.framer.readFrame()
674 atomic.StoreInt64(&t.lastRead, time.Now().UnixNano())
675 if err != nil {
676 if se, ok := err.(http2.StreamError); ok {
677 if t.logger.V(logLevel) {
678 t.logger.Warningf("Encountered http2.StreamError: %v", se)
679 }
680 t.mu.Lock()
681 s := t.activeStreams[se.StreamID]
682 t.mu.Unlock()
683 if s != nil {
684 t.closeStream(s, true, se.Code, false)
685 } else {
686 t.controlBuf.put(&cleanupStream{
687 streamID: se.StreamID,
688 rst: true,
689 rstCode: se.Code,
690 onWrite: func() {},
691 })
692 }
693 continue
694 }
695 t.Close(err)
696 return
697 }
698 switch frame := frame.(type) {
699 case *http2.MetaHeadersFrame:
700 if err := t.operateHeaders(ctx, frame, handle); err != nil {
701 // Any error processing client headers, e.g. invalid stream ID,
702 // is considered a protocol violation.
703 t.controlBuf.put(&goAway{
704 code: http2.ErrCodeProtocol,
705 debugData: []byte(err.Error()),
706 closeConn: err,
707 })
708 continue
709 }
710 case *parsedDataFrame:
711 t.handleData(frame)
712 frame.data.Free()
713 case *http2.RSTStreamFrame:
714 t.handleRSTStream(frame)
715 case *http2.SettingsFrame:
716 t.handleSettings(frame)
717 case *http2.PingFrame:
718 t.handlePing(frame)
719 case *http2.WindowUpdateFrame:
720 t.handleWindowUpdate(frame)
721 case *http2.GoAwayFrame:
722 // TODO: Handle GoAway from the client appropriately.
723 default:
724 if t.logger.V(logLevel) {
725 t.logger.Infof("Received unsupported frame type %T", frame)
726 }
727 }
728 }
729 }
730
731 func (t *http2Server) getStream(f http2.Frame) (*ServerStream, bool) {
732 t.mu.Lock()
733 defer t.mu.Unlock()
734 if t.activeStreams == nil {
735 // The transport is closing.
736 return nil, false
737 }
738 s, ok := t.activeStreams[f.Header().StreamID]
739 if !ok {
740 // The stream is already done.
741 return nil, false
742 }
743 return s, true
744 }
745
746 // adjustWindow sends out extra window update over the initial window size
747 // of stream if the application is requesting data larger in size than
748 // the window.
749 func (t *http2Server) adjustWindow(s *ServerStream, n uint32) {
750 if w := s.fc.maybeAdjust(n); w > 0 {
751 t.controlBuf.put(&outgoingWindowUpdate{streamID: s.id, increment: w})
752 }
753
754 }
755
756 // updateWindow adjusts the inbound quota for the stream and the transport.
757 // Window updates will deliver to the controller for sending when
758 // the cumulative quota exceeds the corresponding threshold.
759 func (t *http2Server) updateWindow(s *ServerStream, n uint32) {
760 if w := s.fc.onRead(n); w > 0 {
761 t.controlBuf.put(&outgoingWindowUpdate{streamID: s.id,
762 increment: w,
763 })
764 }
765 }
766
767 // updateFlowControl updates the incoming flow control windows
768 // for the transport and the stream based on the current bdp
769 // estimation.
770 func (t *http2Server) updateFlowControl(n uint32) {
771 t.mu.Lock()
772 for _, s := range t.activeStreams {
773 s.fc.newLimit(n)
774 }
775 t.initialWindowSize = int32(n)
776 t.mu.Unlock()
777 t.controlBuf.put(&outgoingWindowUpdate{
778 streamID: 0,
779 increment: t.fc.newLimit(n),
780 })
781 t.controlBuf.put(&outgoingSettings{
782 ss: []http2.Setting{
783 {
784 ID: http2.SettingInitialWindowSize,
785 Val: n,
786 },
787 },
788 })
789
790 }
791
792 func (t *http2Server) handleData(f *parsedDataFrame) {
793 size := f.Header().Length
794 var sendBDPPing bool
795 if t.bdpEst != nil {
796 sendBDPPing = t.bdpEst.add(size)
797 }
798 // Decouple connection's flow control from application's read.
799 // An update on connection's flow control should not depend on
800 // whether user application has read the data or not. Such a
801 // restriction is already imposed on the stream's flow control,
802 // and therefore the sender will be blocked anyways.
803 // Decoupling the connection flow control will prevent other
804 // active(fast) streams from starving in presence of slow or
805 // inactive streams.
806 if w := t.fc.onData(size); w > 0 {
807 t.controlBuf.put(&outgoingWindowUpdate{
808 streamID: 0,
809 increment: w,
810 })
811 }
812 if sendBDPPing {
813 // Avoid excessive ping detection (e.g. in an L7 proxy)
814 // by sending a window update prior to the BDP ping.
815 if w := t.fc.reset(); w > 0 {
816 t.controlBuf.put(&outgoingWindowUpdate{
817 streamID: 0,
818 increment: w,
819 })
820 }
821 t.controlBuf.put(bdpPing)
822 }
823 // Select the right stream to dispatch.
824 s, ok := t.getStream(f)
825 if !ok {
826 return
827 }
828 if s.getState() == streamReadDone {
829 t.closeStream(s, true, http2.ErrCodeStreamClosed, false)
830 return
831 }
832 if size > 0 {
833 if err := s.fc.onData(size); err != nil {
834 t.closeStream(s, true, http2.ErrCodeFlowControl, false)
835 return
836 }
837 dataLen := f.data.Len()
838 if f.Header().Flags.Has(http2.FlagDataPadded) {
839 if w := s.fc.onRead(size - uint32(dataLen)); w > 0 {
840 t.controlBuf.put(&outgoingWindowUpdate{s.id, w})
841 }
842 }
843 if dataLen > 0 {
844 f.data.Ref()
845 s.write(recvMsg{buffer: f.data})
846 }
847 }
848 if f.StreamEnded() {
849 // Received the end of stream from the client.
850 s.compareAndSwapState(streamActive, streamReadDone)
851 s.write(recvMsg{err: io.EOF})
852 }
853 }
854
855 func (t *http2Server) handleRSTStream(f *http2.RSTStreamFrame) {
856 // If the stream is not deleted from the transport's active streams map, then do a regular close stream.
857 if s, ok := t.getStream(f); ok {
858 t.closeStream(s, false, 0, false)
859 return
860 }
861 // If the stream is already deleted from the active streams map, then put a cleanupStream item into controlbuf to delete the stream from loopy writer's established streams map.
862 t.controlBuf.put(&cleanupStream{
863 streamID: f.Header().StreamID,
864 rst: false,
865 rstCode: 0,
866 onWrite: func() {},
867 })
868 }
869
870 func (t *http2Server) handleSettings(f *http2.SettingsFrame) {
871 if f.IsAck() {
872 return
873 }
874 var ss []http2.Setting
875 var updateFuncs []func()
876 f.ForeachSetting(func(s http2.Setting) error {
877 switch s.ID {
878 case http2.SettingMaxHeaderListSize:
879 updateFuncs = append(updateFuncs, func() {
880 t.maxSendHeaderListSize = new(uint32)
881 *t.maxSendHeaderListSize = s.Val
882 })
883 default:
884 ss = append(ss, s)
885 }
886 return nil
887 })
888 t.controlBuf.executeAndPut(func() bool {
889 for _, f := range updateFuncs {
890 f()
891 }
892 return true
893 }, &incomingSettings{
894 ss: ss,
895 })
896 }
897
898 const (
899 maxPingStrikes = 2
900 defaultPingTimeout = 2 * time.Hour
901 )
902
903 func (t *http2Server) handlePing(f *http2.PingFrame) {
904 if f.IsAck() {
905 if f.Data == goAwayPing.data && t.drainEvent != nil {
906 t.drainEvent.Fire()
907 return
908 }
909 // Maybe it's a BDP ping.
910 if t.bdpEst != nil {
911 t.bdpEst.calculate(f.Data)
912 }
913 return
914 }
915 pingAck := &ping{ack: true}
916 copy(pingAck.data[:], f.Data[:])
917 t.controlBuf.put(pingAck)
918
919 now := time.Now()
920 defer func() {
921 t.lastPingAt = now
922 }()
923 // A reset ping strikes means that we don't need to check for policy
924 // violation for this ping and the pingStrikes counter should be set
925 // to 0.
926 if atomic.CompareAndSwapUint32(&t.resetPingStrikes, 1, 0) {
927 t.pingStrikes = 0
928 return
929 }
930 t.mu.Lock()
931 ns := len(t.activeStreams)
932 t.mu.Unlock()
933 if ns < 1 && !t.kep.PermitWithoutStream {
934 // Keepalive shouldn't be active thus, this new ping should
935 // have come after at least defaultPingTimeout.
936 if t.lastPingAt.Add(defaultPingTimeout).After(now) {
937 t.pingStrikes++
938 }
939 } else {
940 // Check if keepalive policy is respected.
941 if t.lastPingAt.Add(t.kep.MinTime).After(now) {
942 t.pingStrikes++
943 }
944 }
945
946 if t.pingStrikes > maxPingStrikes {
947 // Send goaway and close the connection.
948 t.controlBuf.put(&goAway{code: http2.ErrCodeEnhanceYourCalm, debugData: []byte("too_many_pings"), closeConn: errors.New("got too many pings from the client")})
949 }
950 }
951
952 func (t *http2Server) handleWindowUpdate(f *http2.WindowUpdateFrame) {
953 t.controlBuf.put(&incomingWindowUpdate{
954 streamID: f.Header().StreamID,
955 increment: f.Increment,
956 })
957 }
958
959 func appendHeaderFieldsFromMD(headerFields []hpack.HeaderField, md metadata.MD) []hpack.HeaderField {
960 for k, vv := range md {
961 if isReservedHeader(k) {
962 // Clients don't tolerate reading restricted headers after some non restricted ones were sent.
963 continue
964 }
965 for _, v := range vv {
966 headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
967 }
968 }
969 return headerFields
970 }
971
972 func (t *http2Server) checkForHeaderListSize(it any) bool {
973 if t.maxSendHeaderListSize == nil {
974 return true
975 }
976 hdrFrame := it.(*headerFrame)
977 var sz int64
978 for _, f := range hdrFrame.hf {
979 if sz += int64(f.Size()); sz > int64(*t.maxSendHeaderListSize) {
980 if t.logger.V(logLevel) {
981 t.logger.Infof("Header list size to send violates the maximum size (%d bytes) set by client", *t.maxSendHeaderListSize)
982 }
983 return false
984 }
985 }
986 return true
987 }
988
989 func (t *http2Server) streamContextErr(s *ServerStream) error {
990 select {
991 case <-t.done:
992 return ErrConnClosing
993 default:
994 }
995 return ContextErr(s.ctx.Err())
996 }
997
998 // WriteHeader sends the header metadata md back to the client.
999 func (t *http2Server) writeHeader(s *ServerStream, md metadata.MD) error {
1000 s.hdrMu.Lock()
1001 defer s.hdrMu.Unlock()
1002 if s.getState() == streamDone {
1003 return t.streamContextErr(s)
1004 }
1005
1006 if s.updateHeaderSent() {
1007 return ErrIllegalHeaderWrite
1008 }
1009
1010 if md.Len() > 0 {
1011 if s.header.Len() > 0 {
1012 s.header = metadata.Join(s.header, md)
1013 } else {
1014 s.header = md
1015 }
1016 }
1017 if err := t.writeHeaderLocked(s); err != nil {
1018 switch e := err.(type) {
1019 case ConnectionError:
1020 return status.Error(codes.Unavailable, e.Desc)
1021 default:
1022 return status.Convert(err).Err()
1023 }
1024 }
1025 return nil
1026 }
1027
1028 func (t *http2Server) writeHeaderLocked(s *ServerStream) error {
1029 // TODO(mmukhi): Benchmark if the performance gets better if count the metadata and other header fields
1030 // first and create a slice of that exact size.
1031 headerFields := make([]hpack.HeaderField, 0, 2) // at least :status, content-type will be there if none else.
1032 headerFields = append(headerFields, hpack.HeaderField{Name: ":status", Value: "200"})
1033 headerFields = append(headerFields, hpack.HeaderField{Name: "content-type", Value: grpcutil.ContentType(s.contentSubtype)})
1034 if s.sendCompress != "" {
1035 headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-encoding", Value: s.sendCompress})
1036 }
1037 headerFields = appendHeaderFieldsFromMD(headerFields, s.header)
1038 hf := &headerFrame{
1039 streamID: s.id,
1040 hf: headerFields,
1041 endStream: false,
1042 onWrite: t.setResetPingStrikes,
1043 }
1044 success, err := t.controlBuf.executeAndPut(func() bool { return t.checkForHeaderListSize(hf) }, hf)
1045 if !success {
1046 if err != nil {
1047 return err
1048 }
1049 t.closeStream(s, true, http2.ErrCodeInternal, false)
1050 return ErrHeaderListSizeLimitViolation
1051 }
1052 if t.stats != nil {
1053 // Note: Headers are compressed with hpack after this call returns.
1054 // No WireLength field is set here.
1055 t.stats.HandleRPC(s.Context(), &stats.OutHeader{
1056 Header: s.header.Copy(),
1057 Compression: s.sendCompress,
1058 })
1059 }
1060 return nil
1061 }
1062
1063 // writeStatus sends stream status to the client and terminates the stream.
1064 // There is no further I/O operations being able to perform on this stream.
1065 // TODO(zhaoq): Now it indicates the end of entire stream. Revisit if early
1066 // OK is adopted.
1067 func (t *http2Server) writeStatus(s *ServerStream, st *status.Status) error {
1068 s.hdrMu.Lock()
1069 defer s.hdrMu.Unlock()
1070
1071 if s.getState() == streamDone {
1072 return nil
1073 }
1074
1075 // TODO(mmukhi): Benchmark if the performance gets better if count the metadata and other header fields
1076 // first and create a slice of that exact size.
1077 headerFields := make([]hpack.HeaderField, 0, 2) // grpc-status and grpc-message will be there if none else.
1078 if !s.updateHeaderSent() { // No headers have been sent.
1079 if len(s.header) > 0 { // Send a separate header frame.
1080 if err := t.writeHeaderLocked(s); err != nil {
1081 return err
1082 }
1083 } else { // Send a trailer only response.
1084 headerFields = append(headerFields, hpack.HeaderField{Name: ":status", Value: "200"})
1085 headerFields = append(headerFields, hpack.HeaderField{Name: "content-type", Value: grpcutil.ContentType(s.contentSubtype)})
1086 }
1087 }
1088 headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-status", Value: strconv.Itoa(int(st.Code()))})
1089 headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-message", Value: encodeGrpcMessage(st.Message())})
1090
1091 if p := istatus.RawStatusProto(st); len(p.GetDetails()) > 0 {
1092 // Do not use the user's grpc-status-details-bin (if present) if we are
1093 // even attempting to set our own.
1094 delete(s.trailer, grpcStatusDetailsBinHeader)
1095 stBytes, err := proto.Marshal(p)
1096 if err != nil {
1097 // TODO: return error instead, when callers are able to handle it.
1098 t.logger.Errorf("Failed to marshal rpc status: %s, error: %v", pretty.ToJSON(p), err)
1099 } else {
1100 headerFields = append(headerFields, hpack.HeaderField{Name: grpcStatusDetailsBinHeader, Value: encodeBinHeader(stBytes)})
1101 }
1102 }
1103
1104 // Attach the trailer metadata.
1105 headerFields = appendHeaderFieldsFromMD(headerFields, s.trailer)
1106 trailingHeader := &headerFrame{
1107 streamID: s.id,
1108 hf: headerFields,
1109 endStream: true,
1110 onWrite: t.setResetPingStrikes,
1111 }
1112
1113 success, err := t.controlBuf.executeAndPut(func() bool {
1114 return t.checkForHeaderListSize(trailingHeader)
1115 }, nil)
1116 if !success {
1117 if err != nil {
1118 return err
1119 }
1120 t.closeStream(s, true, http2.ErrCodeInternal, false)
1121 return ErrHeaderListSizeLimitViolation
1122 }
1123 // Send a RST_STREAM after the trailers if the client has not already half-closed.
1124 rst := s.getState() == streamActive
1125 t.finishStream(s, rst, http2.ErrCodeNo, trailingHeader, true)
1126 if t.stats != nil {
1127 // Note: The trailer fields are compressed with hpack after this call returns.
1128 // No WireLength field is set here.
1129 t.stats.HandleRPC(s.Context(), &stats.OutTrailer{
1130 Trailer: s.trailer.Copy(),
1131 })
1132 }
1133 return nil
1134 }
1135
1136 // Write converts the data into HTTP2 data frame and sends it out. Non-nil error
1137 // is returns if it fails (e.g., framing error, transport error).
1138 func (t *http2Server) write(s *ServerStream, hdr []byte, data mem.BufferSlice, _ *WriteOptions) error {
1139 if !s.isHeaderSent() { // Headers haven't been written yet.
1140 if err := t.writeHeader(s, nil); err != nil {
1141 return err
1142 }
1143 } else {
1144 // Writing headers checks for this condition.
1145 if s.getState() == streamDone {
1146 return t.streamContextErr(s)
1147 }
1148 }
1149
1150 df := &dataFrame{
1151 streamID: s.id,
1152 h: hdr,
1153 data: data,
1154 onEachWrite: t.setResetPingStrikes,
1155 }
1156 dataLen := data.Len()
1157 if err := s.wq.get(int32(len(hdr) + dataLen)); err != nil {
1158 return t.streamContextErr(s)
1159 }
1160 data.Ref()
1161 if err := t.controlBuf.put(df); err != nil {
1162 data.Free()
1163 return err
1164 }
1165 t.incrMsgSent()
1166 return nil
1167 }
1168
1169 // keepalive running in a separate goroutine does the following:
1170 // 1. Gracefully closes an idle connection after a duration of keepalive.MaxConnectionIdle.
1171 // 2. Gracefully closes any connection after a duration of keepalive.MaxConnectionAge.
1172 // 3. Forcibly closes a connection after an additive period of keepalive.MaxConnectionAgeGrace over keepalive.MaxConnectionAge.
1173 // 4. Makes sure a connection is alive by sending pings with a frequency of keepalive.Time and closes a non-responsive connection
1174 // after an additional duration of keepalive.Timeout.
1175 func (t *http2Server) keepalive() {
1176 p := &ping{}
1177 // True iff a ping has been sent, and no data has been received since then.
1178 outstandingPing := false
1179 // Amount of time remaining before which we should receive an ACK for the
1180 // last sent ping.
1181 kpTimeoutLeft := time.Duration(0)
1182 // Records the last value of t.lastRead before we go block on the timer.
1183 // This is required to check for read activity since then.
1184 prevNano := time.Now().UnixNano()
1185 // Initialize the different timers to their default values.
1186 idleTimer := time.NewTimer(t.kp.MaxConnectionIdle)
1187 ageTimer := time.NewTimer(t.kp.MaxConnectionAge)
1188 kpTimer := time.NewTimer(t.kp.Time)
1189 defer func() {
1190 // We need to drain the underlying channel in these timers after a call
1191 // to Stop(), only if we are interested in resetting them. Clearly we
1192 // are not interested in resetting them here.
1193 idleTimer.Stop()
1194 ageTimer.Stop()
1195 kpTimer.Stop()
1196 }()
1197
1198 for {
1199 select {
1200 case <-idleTimer.C:
1201 t.mu.Lock()
1202 idle := t.idle
1203 if idle.IsZero() { // The connection is non-idle.
1204 t.mu.Unlock()
1205 idleTimer.Reset(t.kp.MaxConnectionIdle)
1206 continue
1207 }
1208 val := t.kp.MaxConnectionIdle - time.Since(idle)
1209 t.mu.Unlock()
1210 if val <= 0 {
1211 // The connection has been idle for a duration of keepalive.MaxConnectionIdle or more.
1212 // Gracefully close the connection.
1213 t.Drain("max_idle")
1214 return
1215 }
1216 idleTimer.Reset(val)
1217 case <-ageTimer.C:
1218 t.Drain("max_age")
1219 ageTimer.Reset(t.kp.MaxConnectionAgeGrace)
1220 select {
1221 case <-ageTimer.C:
1222 // Close the connection after grace period.
1223 if t.logger.V(logLevel) {
1224 t.logger.Infof("Closing server transport due to maximum connection age")
1225 }
1226 t.controlBuf.put(closeConnection{})
1227 case <-t.done:
1228 }
1229 return
1230 case <-kpTimer.C:
1231 lastRead := atomic.LoadInt64(&t.lastRead)
1232 if lastRead > prevNano {
1233 // There has been read activity since the last time we were
1234 // here. Setup the timer to fire at kp.Time seconds from
1235 // lastRead time and continue.
1236 outstandingPing = false
1237 kpTimer.Reset(time.Duration(lastRead) + t.kp.Time - time.Duration(time.Now().UnixNano()))
1238 prevNano = lastRead
1239 continue
1240 }
1241 if outstandingPing && kpTimeoutLeft <= 0 {
1242 t.Close(fmt.Errorf("keepalive ping not acked within timeout %s", t.kp.Timeout))
1243 return
1244 }
1245 if !outstandingPing {
1246 if channelz.IsOn() {
1247 t.channelz.SocketMetrics.KeepAlivesSent.Add(1)
1248 }
1249 t.controlBuf.put(p)
1250 kpTimeoutLeft = t.kp.Timeout
1251 outstandingPing = true
1252 }
1253 // The amount of time to sleep here is the minimum of kp.Time and
1254 // timeoutLeft. This will ensure that we wait only for kp.Time
1255 // before sending out the next ping (for cases where the ping is
1256 // acked).
1257 sleepDuration := min(t.kp.Time, kpTimeoutLeft)
1258 kpTimeoutLeft -= sleepDuration
1259 kpTimer.Reset(sleepDuration)
1260 case <-t.done:
1261 return
1262 }
1263 }
1264 }
1265
1266 // Close starts shutting down the http2Server transport.
1267 // TODO(zhaoq): Now the destruction is not blocked on any pending streams. This
1268 // could cause some resource issue. Revisit this later.
1269 func (t *http2Server) Close(err error) {
1270 t.mu.Lock()
1271 if t.state == closing {
1272 t.mu.Unlock()
1273 return
1274 }
1275 if t.logger.V(logLevel) {
1276 t.logger.Infof("Closing: %v", err)
1277 }
1278 t.state = closing
1279 streams := t.activeStreams
1280 t.activeStreams = nil
1281 t.mu.Unlock()
1282 t.controlBuf.finish()
1283 close(t.done)
1284 if err := t.conn.Close(); err != nil && t.logger.V(logLevel) {
1285 t.logger.Infof("Error closing underlying net.Conn during Close: %v", err)
1286 }
1287 channelz.RemoveEntry(t.channelz.ID)
1288 // Cancel all active streams.
1289 for _, s := range streams {
1290 s.cancel()
1291 }
1292 }
1293
1294 // deleteStream deletes the stream s from transport's active streams.
1295 func (t *http2Server) deleteStream(s *ServerStream, eosReceived bool) {
1296 t.mu.Lock()
1297 _, isActive := t.activeStreams[s.id]
1298 if isActive {
1299 delete(t.activeStreams, s.id)
1300 if len(t.activeStreams) == 0 {
1301 t.idle = time.Now()
1302 }
1303 }
1304 t.mu.Unlock()
1305
1306 if isActive && channelz.IsOn() {
1307 if eosReceived {
1308 t.channelz.SocketMetrics.StreamsSucceeded.Add(1)
1309 } else {
1310 t.channelz.SocketMetrics.StreamsFailed.Add(1)
1311 }
1312 }
1313 }
1314
1315 // finishStream closes the stream and puts the trailing headerFrame into controlbuf.
1316 func (t *http2Server) finishStream(s *ServerStream, rst bool, rstCode http2.ErrCode, hdr *headerFrame, eosReceived bool) {
1317 // In case stream sending and receiving are invoked in separate
1318 // goroutines (e.g., bi-directional streaming), cancel needs to be
1319 // called to interrupt the potential blocking on other goroutines.
1320 s.cancel()
1321
1322 oldState := s.swapState(streamDone)
1323 if oldState == streamDone {
1324 // If the stream was already done, return.
1325 return
1326 }
1327
1328 hdr.cleanup = &cleanupStream{
1329 streamID: s.id,
1330 rst: rst,
1331 rstCode: rstCode,
1332 onWrite: func() {
1333 t.deleteStream(s, eosReceived)
1334 },
1335 }
1336 t.controlBuf.put(hdr)
1337 }
1338
1339 // closeStream clears the footprint of a stream when the stream is not needed any more.
1340 func (t *http2Server) closeStream(s *ServerStream, rst bool, rstCode http2.ErrCode, eosReceived bool) {
1341 // In case stream sending and receiving are invoked in separate
1342 // goroutines (e.g., bi-directional streaming), cancel needs to be
1343 // called to interrupt the potential blocking on other goroutines.
1344 s.cancel()
1345
1346 // We can't return early even if the stream's state is "done" as the state
1347 // might have been set by the `finishStream` method. Deleting the stream via
1348 // `finishStream` can get blocked on flow control.
1349 s.swapState(streamDone)
1350 t.deleteStream(s, eosReceived)
1351
1352 t.controlBuf.put(&cleanupStream{
1353 streamID: s.id,
1354 rst: rst,
1355 rstCode: rstCode,
1356 onWrite: func() {},
1357 })
1358 }
1359
1360 func (t *http2Server) Drain(debugData string) {
1361 t.mu.Lock()
1362 defer t.mu.Unlock()
1363 if t.drainEvent != nil {
1364 return
1365 }
1366 t.drainEvent = grpcsync.NewEvent()
1367 t.controlBuf.put(&goAway{code: http2.ErrCodeNo, debugData: []byte(debugData), headsUp: true})
1368 }
1369
1370 var goAwayPing = &ping{data: [8]byte{1, 6, 1, 8, 0, 3, 3, 9}}
1371
1372 // Handles outgoing GoAway and returns true if loopy needs to put itself
1373 // in draining mode.
1374 func (t *http2Server) outgoingGoAwayHandler(g *goAway) (bool, error) {
1375 t.maxStreamMu.Lock()
1376 t.mu.Lock()
1377 if t.state == closing { // TODO(mmukhi): This seems unnecessary.
1378 t.mu.Unlock()
1379 t.maxStreamMu.Unlock()
1380 // The transport is closing.
1381 return false, ErrConnClosing
1382 }
1383 if !g.headsUp {
1384 // Stop accepting more streams now.
1385 t.state = draining
1386 sid := t.maxStreamID
1387 retErr := g.closeConn
1388 if len(t.activeStreams) == 0 {
1389 retErr = errors.New("second GOAWAY written and no active streams left to process")
1390 }
1391 t.mu.Unlock()
1392 t.maxStreamMu.Unlock()
1393 if err := t.framer.fr.WriteGoAway(sid, g.code, g.debugData); err != nil {
1394 return false, err
1395 }
1396 t.framer.writer.Flush()
1397 if retErr != nil {
1398 return false, retErr
1399 }
1400 return true, nil
1401 }
1402 t.mu.Unlock()
1403 t.maxStreamMu.Unlock()
1404 // For a graceful close, send out a GoAway with stream ID of MaxUInt32,
1405 // Follow that with a ping and wait for the ack to come back or a timer
1406 // to expire. During this time accept new streams since they might have
1407 // originated before the GoAway reaches the client.
1408 // After getting the ack or timer expiration send out another GoAway this
1409 // time with an ID of the max stream server intends to process.
1410 if err := t.framer.fr.WriteGoAway(math.MaxUint32, http2.ErrCodeNo, g.debugData); err != nil {
1411 return false, err
1412 }
1413 if err := t.framer.fr.WritePing(false, goAwayPing.data); err != nil {
1414 return false, err
1415 }
1416 go func() {
1417 timer := time.NewTimer(5 * time.Second)
1418 defer timer.Stop()
1419 select {
1420 case <-t.drainEvent.Done():
1421 case <-timer.C:
1422 case <-t.done:
1423 return
1424 }
1425 t.controlBuf.put(&goAway{code: g.code, debugData: g.debugData})
1426 }()
1427 return false, nil
1428 }
1429
1430 func (t *http2Server) socketMetrics() *channelz.EphemeralSocketMetrics {
1431 return &channelz.EphemeralSocketMetrics{
1432 LocalFlowControlWindow: int64(t.fc.getSize()),
1433 RemoteFlowControlWindow: t.getOutFlowWindow(),
1434 }
1435 }
1436
1437 func (t *http2Server) incrMsgSent() {
1438 if channelz.IsOn() {
1439 t.channelz.SocketMetrics.MessagesSent.Add(1)
1440 t.channelz.SocketMetrics.LastMessageSentTimestamp.Add(1)
1441 }
1442 }
1443
1444 func (t *http2Server) incrMsgRecv() {
1445 if channelz.IsOn() {
1446 t.channelz.SocketMetrics.MessagesReceived.Add(1)
1447 t.channelz.SocketMetrics.LastMessageReceivedTimestamp.Add(1)
1448 }
1449 }
1450
1451 func (t *http2Server) getOutFlowWindow() int64 {
1452 resp := make(chan uint32, 1)
1453 timer := time.NewTimer(time.Second)
1454 defer timer.Stop()
1455 t.controlBuf.put(&outFlowControlSizeRequest{resp})
1456 select {
1457 case sz := <-resp:
1458 return int64(sz)
1459 case <-t.done:
1460 return -1
1461 case <-timer.C:
1462 return -2
1463 }
1464 }
1465
1466 // Peer returns the peer of the transport.
1467 func (t *http2Server) Peer() *peer.Peer {
1468 return &peer.Peer{
1469 Addr: t.peer.Addr,
1470 LocalAddr: t.peer.LocalAddr,
1471 AuthInfo: t.peer.AuthInfo, // Can be nil
1472 }
1473 }
1474
1475 func getJitter(v time.Duration) time.Duration {
1476 if v == infinity {
1477 return 0
1478 }
1479 // Generate a jitter between +/- 10% of the value.
1480 r := int64(v / 10)
1481 j := rand.Int64N(2*r) - r
1482 return time.Duration(j)
1483 }
1484
1485 type connectionKey struct{}
1486
1487 // GetConnection gets the connection from the context.
1488 func GetConnection(ctx context.Context) net.Conn {
1489 conn, _ := ctx.Value(connectionKey{}).(net.Conn)
1490 return conn
1491 }
1492
1493 // SetConnection adds the connection to the context to be able to get
1494 // information about the destination ip and port for an incoming RPC. This also
1495 // allows any unary or streaming interceptors to see the connection.
1496 func SetConnection(ctx context.Context, conn net.Conn) context.Context {
1497 return context.WithValue(ctx, connectionKey{}, conn)
1498 }
1499