1 /*
2 *
3 * Copyright 2014 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18 19 // Package transport defines and implements message oriented communication
20 // channel to complete various transactions (e.g., an RPC). It is meant for
21 // grpc-internal usage and is not intended to be imported directly by users.
22 package transport
23 24 import (
25 "context"
26 "errors"
27 "fmt"
28 "io"
29 "net"
30 "sync"
31 "sync/atomic"
32 "time"
33 34 "google.golang.org/grpc/codes"
35 "google.golang.org/grpc/credentials"
36 "google.golang.org/grpc/internal/channelz"
37 "google.golang.org/grpc/keepalive"
38 "google.golang.org/grpc/mem"
39 "google.golang.org/grpc/metadata"
40 "google.golang.org/grpc/peer"
41 "google.golang.org/grpc/stats"
42 "google.golang.org/grpc/status"
43 "google.golang.org/grpc/tap"
44 )
// logLevel is a package-level constant with the value 2. Its consumers are not
// visible in this part of the file.
// NOTE(review): presumably the verbosity threshold used for transport debug
// logging — confirm against the call sites.
const logLevel = 2
// recvMsg represents the received msg from the transport. All transport
// protocol specific info has been removed.
type recvMsg struct {
	// buffer is the payload carried by this message. Readers in this file
	// nil-check it before calling Free on error messages.
	buffer mem.Buffer
	// nil: received some data
	// io.EOF: stream is completed. data is nil.
	// other non-nil error: transport failure. data is nil.
	err error
}
// recvBuffer is an unbounded channel of recvMsg structs.
//
// Note: recvBuffer differs from buffer.Unbounded only in the fact that it
// holds a channel of recvMsg structs instead of objects implementing "item"
// interface. recvBuffer is written to much more often and using strict recvMsg
// structs helps avoid allocation in "recvBuffer.put"
type recvBuffer struct {
	// c is the channel exposed to readers through get; init creates it with
	// a capacity of one.
	c chan recvMsg
	// mu is held by put and load while they touch backlog and err.
	mu sync.Mutex
	// backlog holds messages that put could not place on c; load moves the
	// oldest one onto c when there is room.
	backlog []recvMsg
	// err records the error of the last accepted message. Once it is
	// non-nil, put drops every subsequent message.
	err error
}
// init allows a recvBuffer to be initialized in-place, which is useful
// for resetting a buffer or for avoiding a heap allocation when the buffer
// is embedded in another struct.
//
// It only (re)creates the delivery channel, with a capacity of one message;
// the backlog and error fields are left untouched.
func (b *recvBuffer) init() {
	b.c = make(chan recvMsg, 1)
}
77 78 func (b *recvBuffer) put(r recvMsg) {
79 b.mu.Lock()
80 if b.err != nil {
81 // drop the buffer on the floor. Since b.err is not nil, any subsequent reads
82 // will always return an error, making this buffer inaccessible.
83 r.buffer.Free()
84 b.mu.Unlock()
85 // An error had occurred earlier, don't accept more
86 // data or errors.
87 return
88 }
89 b.err = r.err
90 if len(b.backlog) == 0 {
91 select {
92 case b.c <- r:
93 b.mu.Unlock()
94 return
95 default:
96 }
97 }
98 b.backlog = append(b.backlog, r)
99 b.mu.Unlock()
100 }
101 102 func (b *recvBuffer) load() {
103 b.mu.Lock()
104 if len(b.backlog) > 0 {
105 select {
106 case b.c <- b.backlog[0]:
107 b.backlog[0] = recvMsg{}
108 b.backlog = b.backlog[1:]
109 default:
110 }
111 }
112 b.mu.Unlock()
113 }
// get returns the receive-only channel on which buffered recvMsg values are
// delivered.
//
// Upon receipt of a recvMsg, the caller should call load to send another
// recvMsg onto the channel if there is any.
func (b *recvBuffer) get() <-chan recvMsg {
	return b.c
}
// recvBufferReader reads data out of a recvBuffer, either into a caller
// supplied header slice (ReadMessageHeader) or as mem.Buffer chunks (Read).
type recvBufferReader struct {
	_ noCopy
	// clientStream, when non-nil, selects the client read paths: on context
	// cancellation the client transport stream is closed with a status
	// representing ctx.Err() and nil trailer metadata.
	clientStream *ClientStream
	ctx          context.Context
	ctxDone      <-chan struct{} // cache of ctx.Done() (for performance).
	recv         *recvBuffer     // source of incoming messages.
	last         mem.Buffer      // Stores the remaining data in the previous calls.
	err          error           // sticky error: once set, every read returns it.
}
134 135 func (r *recvBufferReader) ReadMessageHeader(header []byte) (n int, err error) {
136 if r.err != nil {
137 return 0, r.err
138 }
139 if r.last != nil {
140 n, r.last = mem.ReadUnsafe(header, r.last)
141 return n, nil
142 }
143 if r.clientStream != nil {
144 n, r.err = r.readMessageHeaderClient(header)
145 } else {
146 n, r.err = r.readMessageHeader(header)
147 }
148 return n, r.err
149 }
150 151 // Read reads the next n bytes from last. If last is drained, it tries to read
152 // additional data from recv. It blocks if there no additional data available in
153 // recv. If Read returns any non-nil error, it will continue to return that
154 // error.
155 func (r *recvBufferReader) Read(n int) (buf mem.Buffer, err error) {
156 if r.err != nil {
157 return nil, r.err
158 }
159 if r.last != nil {
160 buf = r.last
161 if r.last.Len() > n {
162 buf, r.last = mem.SplitUnsafe(buf, n)
163 } else {
164 r.last = nil
165 }
166 return buf, nil
167 }
168 if r.clientStream != nil {
169 buf, r.err = r.readClient(n)
170 } else {
171 buf, r.err = r.read(n)
172 }
173 return buf, r.err
174 }
175 176 func (r *recvBufferReader) readMessageHeader(header []byte) (n int, err error) {
177 select {
178 case <-r.ctxDone:
179 return 0, ContextErr(r.ctx.Err())
180 case m := <-r.recv.get():
181 return r.readMessageHeaderAdditional(m, header)
182 }
183 }
184 185 func (r *recvBufferReader) read(n int) (buf mem.Buffer, err error) {
186 select {
187 case <-r.ctxDone:
188 return nil, ContextErr(r.ctx.Err())
189 case m := <-r.recv.get():
190 return r.readAdditional(m, n)
191 }
192 }
193 194 func (r *recvBufferReader) readMessageHeaderClient(header []byte) (n int, err error) {
195 // If the context is canceled, then closes the stream with nil metadata.
196 // closeStream writes its error parameter to r.recv as a recvMsg.
197 // r.readAdditional acts on that message and returns the necessary error.
198 select {
199 case <-r.ctxDone:
200 // Note that this adds the ctx error to the end of recv buffer, and
201 // reads from the head. This will delay the error until recv buffer is
202 // empty, thus will delay ctx cancellation in Recv().
203 //
204 // It's done this way to fix a race between ctx cancel and trailer. The
205 // race was, stream.Recv() may return ctx error if ctxDone wins the
206 // race, but stream.Trailer() may return a non-nil md because the stream
207 // was not marked as done when trailer is received. This closeStream
208 // call will mark stream as done, thus fix the race.
209 //
210 // TODO: delaying ctx error seems like a unnecessary side effect. What
211 // we really want is to mark the stream as done, and return ctx error
212 // faster.
213 r.clientStream.Close(ContextErr(r.ctx.Err()))
214 m := <-r.recv.get()
215 return r.readMessageHeaderAdditional(m, header)
216 case m := <-r.recv.get():
217 return r.readMessageHeaderAdditional(m, header)
218 }
219 }
220 221 func (r *recvBufferReader) readClient(n int) (buf mem.Buffer, err error) {
222 // If the context is canceled, then closes the stream with nil metadata.
223 // closeStream writes its error parameter to r.recv as a recvMsg.
224 // r.readAdditional acts on that message and returns the necessary error.
225 select {
226 case <-r.ctxDone:
227 // Note that this adds the ctx error to the end of recv buffer, and
228 // reads from the head. This will delay the error until recv buffer is
229 // empty, thus will delay ctx cancellation in Recv().
230 //
231 // It's done this way to fix a race between ctx cancel and trailer. The
232 // race was, stream.Recv() may return ctx error if ctxDone wins the
233 // race, but stream.Trailer() may return a non-nil md because the stream
234 // was not marked as done when trailer is received. This closeStream
235 // call will mark stream as done, thus fix the race.
236 //
237 // TODO: delaying ctx error seems like a unnecessary side effect. What
238 // we really want is to mark the stream as done, and return ctx error
239 // faster.
240 r.clientStream.Close(ContextErr(r.ctx.Err()))
241 m := <-r.recv.get()
242 return r.readAdditional(m, n)
243 case m := <-r.recv.get():
244 return r.readAdditional(m, n)
245 }
246 }
247 248 func (r *recvBufferReader) readMessageHeaderAdditional(m recvMsg, header []byte) (n int, err error) {
249 r.recv.load()
250 if m.err != nil {
251 if m.buffer != nil {
252 m.buffer.Free()
253 }
254 return 0, m.err
255 }
256 257 n, r.last = mem.ReadUnsafe(header, m.buffer)
258 259 return n, nil
260 }
261 262 func (r *recvBufferReader) readAdditional(m recvMsg, n int) (b mem.Buffer, err error) {
263 r.recv.load()
264 if m.err != nil {
265 if m.buffer != nil {
266 m.buffer.Free()
267 }
268 return nil, m.err
269 }
270 271 if m.buffer.Len() > n {
272 m.buffer, r.last = mem.SplitUnsafe(m.buffer, n)
273 }
274 275 return m.buffer, nil
276 }
// streamState is the lifecycle state of a Stream. It is stored as a uint32 so
// that it can be read and updated with the sync/atomic helpers on Stream.
type streamState uint32

const (
	streamActive    streamState = iota
	streamWriteDone             // EndStream sent
	streamReadDone              // EndStream received
	streamDone                  // the entire stream is finished.
)
// Stream represents an RPC in the transport layer.
type Stream struct {
	ctx          context.Context // the associated context of the stream
	method       string          // the associated RPC method of the stream
	recvCompress string
	sendCompress string

	// readRequester is told how many bytes the application is about to read
	// before each ReadMessageHeader/read call.
	readRequester readRequester

	// contentSubtype is the content-subtype for requests.
	// this must be lowercase or the behavior is undefined.
	contentSubtype string

	trailer metadata.MD // the key-value map of trailer metadata.

	// Non-pointer fields are at the end to optimize GC performance.
	state    streamState     // accessed atomically via swapState/compareAndSwapState/getState.
	id       uint32
	buf      recvBuffer      // receives incoming messages through write.
	trReader transportReader // reader used by ReadMessageHeader and read.
	fc       inFlow
	wq       writeQuota
}
// readRequester is used to state the application's intention to read data.
// This is used to adjust flow control, if needed.
//
// Stream calls requestRead with the number of bytes it is about to read
// before it starts reading.
type readRequester interface {
	requestRead(int)
}
316 317 func (s *Stream) swapState(st streamState) streamState {
318 return streamState(atomic.SwapUint32((*uint32)(&s.state), uint32(st)))
319 }
320 321 func (s *Stream) compareAndSwapState(oldState, newState streamState) bool {
322 return atomic.CompareAndSwapUint32((*uint32)(&s.state), uint32(oldState), uint32(newState))
323 }
324 325 func (s *Stream) getState() streamState {
326 return streamState(atomic.LoadUint32((*uint32)(&s.state)))
327 }
// Trailer returns a copy of the cached trailer metadata. Note that if it is
// called before the entire stream is done, it could return an empty MD.
// It can be safely read only after the stream has ended, that is, after either
// read or write has returned io.EOF.
func (s *Stream) Trailer() metadata.MD {
	return s.trailer.Copy()
}
// Context returns the context associated with the stream.
func (s *Stream) Context() context.Context {
	return s.ctx
}
// Method returns the RPC method associated with the stream.
func (s *Stream) Method() string {
	return s.method
}
// write places m into the stream's receive buffer, from where the stream's
// reader picks it up.
func (s *Stream) write(m recvMsg) {
	s.buf.put(m)
}
350 351 // ReadMessageHeader reads data into the provided header slice from the stream.
352 // It first checks if there was an error during a previous read operation and
353 // returns it if present. It then requests a read operation for the length of
354 // the header. It continues to read from the stream until the entire header
355 // slice is filled or an error occurs. If an `io.EOF` error is encountered with
356 // partially read data, it is converted to `io.ErrUnexpectedEOF` to indicate an
357 // unexpected end of the stream. The method returns any error encountered during
358 // the read process or nil if the header was successfully read.
359 func (s *Stream) ReadMessageHeader(header []byte) (err error) {
360 // Don't request a read if there was an error earlier
361 if er := s.trReader.er; er != nil {
362 return er
363 }
364 s.readRequester.requestRead(len(header))
365 for len(header) != 0 {
366 n, err := s.trReader.ReadMessageHeader(header)
367 header = header[n:]
368 if len(header) == 0 {
369 err = nil
370 }
371 if err != nil {
372 if n > 0 && err == io.EOF {
373 err = io.ErrUnexpectedEOF
374 }
375 return err
376 }
377 }
378 return nil
379 }
380 381 // Read reads n bytes from the wire for this stream.
382 func (s *Stream) read(n int) (data mem.BufferSlice, err error) {
383 // Don't request a read if there was an error earlier
384 if er := s.trReader.er; er != nil {
385 return nil, er
386 }
387 s.readRequester.requestRead(n)
388 for n != 0 {
389 buf, err := s.trReader.Read(n)
390 var bufLen int
391 if buf != nil {
392 bufLen = buf.Len()
393 }
394 n -= bufLen
395 if n == 0 {
396 err = nil
397 }
398 if err != nil {
399 if bufLen > 0 && err == io.EOF {
400 err = io.ErrUnexpectedEOF
401 }
402 data.Free()
403 return nil, err
404 }
405 data = append(data, buf)
406 }
407 return data, nil
408 }
// noCopy may be embedded into structs which must not be copied
// after the first use.
//
// See https://golang.org/issues/8005#issuecomment-190753527
// for details.
type noCopy struct {
}

// Lock is a no-op; together with Unlock it gives noCopy the Lock/Unlock
// method pair described in the issue linked above.
func (*noCopy) Lock() {}

// Unlock is a no-op counterpart to Lock.
func (*noCopy) Unlock() {}
// transportReader reads all the data available for this Stream from the transport and
// passes them into the decoder, which converts them into a gRPC message stream.
// The error is io.EOF when the stream is done or another non-nil error if
// the stream broke.
type transportReader struct {
	_ noCopy
	// The handler to control the window update procedure for both this
	// particular stream and the associated transport.
	windowHandler windowHandler
	// er is the last error returned by reader; Stream checks it before
	// starting a new read.
	er error
	// reader is the underlying source of data.
	reader recvBufferReader
}
// windowHandler is the handler to control the window update procedure for
// both this particular stream and the associated transport.
//
// transportReader calls updateWindow with the number of bytes obtained by
// each successful read.
type windowHandler interface {
	updateWindow(int)
}
439 440 func (t *transportReader) ReadMessageHeader(header []byte) (int, error) {
441 n, err := t.reader.ReadMessageHeader(header)
442 if err != nil {
443 t.er = err
444 return 0, err
445 }
446 t.windowHandler.updateWindow(n)
447 return n, nil
448 }
449 450 func (t *transportReader) Read(n int) (mem.Buffer, error) {
451 buf, err := t.reader.Read(n)
452 if err != nil {
453 t.er = err
454 return buf, err
455 }
456 t.windowHandler.updateWindow(buf.Len())
457 return buf, nil
458 }
// GoString is implemented by Stream so context.String() won't
// race when printing %#v.
//
// The result contains only the stream's address and its method name.
func (s *Stream) GoString() string {
	return fmt.Sprintf("<stream: %p, %v>", s, s.method)
}
// transportState is the state of a transport.
type transportState int

// The possible transport states, numbered from zero in declaration order.
const (
	reachable transportState = iota
	closing
	draining
)
// ServerConfig consists of all the configurations to establish a server transport.
//
// NOTE(review): none of these fields is consumed in this part of the file;
// confirm the exact meaning of each against the server transport
// implementation before relying on it.
type ServerConfig struct {
	MaxStreams            uint32
	ConnectionTimeout     time.Duration
	Credentials           credentials.TransportCredentials
	InTapHandle           tap.ServerInHandle
	StatsHandler          stats.Handler
	KeepaliveParams       keepalive.ServerParameters
	KeepalivePolicy       keepalive.EnforcementPolicy
	InitialWindowSize     int32
	InitialConnWindowSize int32
	WriteBufferSize       int
	ReadBufferSize        int
	SharedWriteBuffer     bool
	ChannelzParent        *channelz.Server
	MaxHeaderListSize     *uint32
	HeaderTableSize       *uint32
	BufferPool            mem.BufferPool
	StaticWindowSize      bool
}
// ConnectOptions covers all relevant options for communicating with the server.
type ConnectOptions struct {
	// UserAgent is the application user agent.
	UserAgent string
	// Dialer specifies how to dial a network address.
	Dialer func(context.Context, string) (net.Conn, error)
	// FailOnNonTempDialError specifies if gRPC fails on non-temporary dial errors.
	FailOnNonTempDialError bool
	// PerRPCCredentials stores the PerRPCCredentials required to issue RPCs.
	PerRPCCredentials []credentials.PerRPCCredentials
	// TransportCredentials stores the Authenticator required to set up a
	// client connection. Only one of TransportCredentials and CredsBundle is
	// non-nil.
	TransportCredentials credentials.TransportCredentials
	// CredsBundle is the credentials bundle to be used. Only one of
	// TransportCredentials and CredsBundle is non-nil.
	CredsBundle credentials.Bundle
	// KeepaliveParams stores the keepalive parameters.
	KeepaliveParams keepalive.ClientParameters
	// StatsHandlers stores the handlers for stats.
	StatsHandlers []stats.Handler
	// InitialWindowSize sets the initial window size for a stream.
	InitialWindowSize int32
	// InitialConnWindowSize sets the initial window size for a connection.
	InitialConnWindowSize int32
	// WriteBufferSize sets the size of the write buffer, which in turn
	// determines how much data can be batched before it's written on the wire.
	WriteBufferSize int
	// ReadBufferSize sets the size of the read buffer, which in turn
	// determines how much data can be read at most for one read syscall.
	ReadBufferSize int
	// SharedWriteBuffer indicates whether connections should reuse the write
	// buffer.
	SharedWriteBuffer bool
	// ChannelzParent sets the addrConn id which initiated the creation of this client transport.
	ChannelzParent *channelz.SubChannel
	// MaxHeaderListSize sets the max (uncompressed) size of header list that is prepared to be received.
	MaxHeaderListSize *uint32
	// The mem.BufferPool to use when reading/writing to the wire.
	BufferPool mem.BufferPool
	// StaticWindowSize controls whether dynamic window sizing is enabled.
	StaticWindowSize bool
}
// WriteOptions provides additional hints and information for message
// transmission.
type WriteOptions struct {
	// Last indicates whether this write is the last piece for
	// this stream.
	Last bool
}
// CallHdr carries the information of a particular RPC.
type CallHdr struct {
	// Host specifies the peer's host.
	Host string

	// Method specifies the operation to perform.
	Method string

	// SendCompress specifies the compression algorithm applied on
	// outbound message.
	SendCompress string

	// AcceptedCompressors overrides the grpc-accept-encoding header for this
	// call. When nil, the transport advertises the default set of registered
	// compressors. A non-nil pointer overrides that value (including the empty
	// string to advertise none).
	AcceptedCompressors *string

	// Creds specifies credentials.PerRPCCredentials for a call.
	Creds credentials.PerRPCCredentials

	// ContentSubtype specifies the content-subtype for a request. For example, a
	// content-subtype of "proto" will result in a content-type of
	// "application/grpc+proto". The value of ContentSubtype must be all
	// lowercase, otherwise the behavior is undefined. See
	// https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#requests
	// for more details.
	ContentSubtype string

	// PreviousAttempts is the value of the grpc-previous-rpc-attempts header
	// to set.
	PreviousAttempts int

	// DoneFunc is called when the stream is finished.
	DoneFunc func()

	// Authority is used to explicitly override the `:authority` header. If set,
	// this value takes precedence over the Host field and will be used as the
	// value for the `:authority` header.
	Authority string
}
// ClientTransport is the common interface for all gRPC client-side transport
// implementations.
type ClientTransport interface {
	// Close tears down this transport. Once it returns, the transport
	// should not be accessed any more. The caller must make sure this
	// is called only once.
	Close(err error)

	// GracefulClose starts to tear down the transport: the transport will stop
	// accepting new RPCs and NewStream will return an error. Once all streams
	// are finished, the transport will close.
	//
	// It does not block.
	GracefulClose()

	// NewStream creates a Stream for an RPC.
	NewStream(ctx context.Context, callHdr *CallHdr) (*ClientStream, error)

	// Error returns a channel that is closed when some I/O error
	// happens. Typically the caller should have a goroutine to monitor
	// this in order to take action (e.g., close the current transport
	// and create a new one) in the error case. It should not return nil
	// once the transport is initiated.
	Error() <-chan struct{}

	// GoAway returns a channel that is closed when ClientTransport
	// receives the draining signal from the server (e.g., GOAWAY frame in
	// HTTP/2).
	GoAway() <-chan struct{}

	// GetGoAwayReason returns the reason why the GoAway frame was received,
	// along with a human readable string with debug info.
	GetGoAwayReason() (GoAwayReason, string)

	// Peer returns information about the peer associated with the Transport.
	// The returned information includes authentication and network address details.
	Peer() *peer.Peer
}
// ServerTransport is the common interface for all gRPC server-side transport
// implementations.
//
// Methods may be called concurrently from multiple goroutines, but
// Write methods for a given Stream will be called serially.
type ServerTransport interface {
	// HandleStreams receives incoming streams using the given handler.
	HandleStreams(context.Context, func(*ServerStream))

	// Close tears down the transport. Once it is called, the transport
	// should not be accessed any more. All the pending streams and their
	// handlers will be terminated asynchronously.
	Close(err error)

	// Peer returns the peer of the server transport.
	Peer() *peer.Peer

	// Drain notifies the client that this ServerTransport stops accepting
	// new RPCs.
	Drain(debugData string)
}
// internalServerTransport extends ServerTransport with unexported,
// per-stream operations, so it can only be implemented and used inside this
// package.
//
// NOTE(review): the implementations are not visible in this part of the
// file; the method names suggest header/data/status writes and flow-control
// window bookkeeping — confirm against the implementing type.
type internalServerTransport interface {
	ServerTransport
	writeHeader(s *ServerStream, md metadata.MD) error
	write(s *ServerStream, hdr []byte, data mem.BufferSlice, opts *WriteOptions) error
	writeStatus(s *ServerStream, st *status.Status) error
	incrMsgRecv()
	adjustWindow(s *ServerStream, n uint32)
	updateWindow(s *ServerStream, n uint32)
}
// connectionErrorf builds a ConnectionError whose description is produced by
// formatting a with format. temp marks the error as temporary and e, which may
// be nil, is kept as the underlying cause.
func connectionErrorf(temp bool, e error, format string, a ...any) ConnectionError {
	desc := fmt.Sprintf(format, a...)
	return ConnectionError{Desc: desc, temp: temp, err: e}
}

// ConnectionError is an error that results in the termination of the
// entire connection and the retry of all the active streams.
type ConnectionError struct {
	Desc string // human readable description of the failure.
	temp bool   // whether the failure is temporary; see Temporary.
	err  error  // underlying cause, possibly nil; see Origin and Unwrap.
}

// Error implements the error interface. The description is rendered quoted.
func (e ConnectionError) Error() string {
	const layout = "connection error: desc = %q"
	return fmt.Sprintf(layout, e.Desc)
}

// Temporary indicates if this connection error is temporary or fatal.
func (e ConnectionError) Temporary() bool {
	return e.temp
}

// Origin returns the original error of this connection error. It never
// returns nil: when there is no underlying cause, the ConnectionError itself
// is returned.
func (e ConnectionError) Origin() error {
	if cause := e.err; cause != nil {
		return cause
	}
	return e
}

// Unwrap returns the original error of this connection error or nil when the
// origin is nil.
func (e ConnectionError) Unwrap() error {
	return e.err
}
var (
	// ErrConnClosing indicates that the transport is closing.
	ErrConnClosing = connectionErrorf(true, nil, "transport is closing")
	// errStreamDrain indicates that the stream is rejected because the
	// connection is draining. This could be caused by goaway or balancer
	// removing the address.
	errStreamDrain = status.Error(codes.Unavailable, "the connection is draining")
	// errStreamDone is returned from write at the client side to notify the
	// application layer of an error.
	errStreamDone = errors.New("the stream is done")
	// statusGoAway indicates that the server sent a GOAWAY that included this
	// stream's ID in unprocessed RPCs.
	statusGoAway = status.New(codes.Unavailable, "the stream is rejected because server is draining the connection")
)
// GoAwayReason contains the reason for the GoAway frame received.
type GoAwayReason uint8

const (
	// GoAwayInvalid indicates that no GoAway frame is received. It is the
	// zero value of GoAwayReason.
	GoAwayInvalid GoAwayReason = 0
	// GoAwayNoReason is the default value when GoAway frame is received.
	GoAwayNoReason GoAwayReason = 1
	// GoAwayTooManyPings indicates that a GoAway frame with
	// ErrCodeEnhanceYourCalm was received and that the debug data said
	// "too_many_pings".
	GoAwayTooManyPings GoAwayReason = 2
)
723 724 // ContextErr converts the error from context package into a status error.
725 func ContextErr(err error) error {
726 switch err {
727 case context.DeadlineExceeded:
728 return status.Error(codes.DeadlineExceeded, err.Error())
729 case context.Canceled:
730 return status.Error(codes.Canceled, err.Error())
731 }
732 return status.Errorf(codes.Internal, "Unexpected error from context packet: %v", err)
733 }
734