1 /*
2 *
3 * Copyright 2014 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18 19 package grpc
20 21 import (
22 "compress/gzip"
23 "context"
24 "encoding/binary"
25 "fmt"
26 "io"
27 "math"
28 "strings"
29 "sync"
30 "time"
31 32 "google.golang.org/grpc/codes"
33 "google.golang.org/grpc/credentials"
34 "google.golang.org/grpc/encoding"
35 "google.golang.org/grpc/encoding/proto"
36 "google.golang.org/grpc/internal"
37 "google.golang.org/grpc/internal/grpcutil"
38 "google.golang.org/grpc/internal/transport"
39 "google.golang.org/grpc/mem"
40 "google.golang.org/grpc/metadata"
41 "google.golang.org/grpc/peer"
42 "google.golang.org/grpc/stats"
43 "google.golang.org/grpc/status"
44 )
45 46 func init() {
47 internal.AcceptCompressors = acceptCompressors
48 }
49 50 // Compressor defines the interface gRPC uses to compress a message.
51 //
52 // Deprecated: use package encoding.
53 type Compressor interface {
54 // Do compresses p into w.
55 Do(w io.Writer, p []byte) error
56 // Type returns the compression algorithm the Compressor uses.
57 Type() string
58 }
59 60 type gzipCompressor struct {
61 pool sync.Pool
62 }
63 64 // NewGZIPCompressor creates a Compressor based on GZIP.
65 //
66 // Deprecated: use package encoding/gzip.
67 func NewGZIPCompressor() Compressor {
68 c, _ := NewGZIPCompressorWithLevel(gzip.DefaultCompression)
69 return c
70 }
71 72 // NewGZIPCompressorWithLevel is like NewGZIPCompressor but specifies the gzip compression level instead
73 // of assuming DefaultCompression.
74 //
75 // The error returned will be nil if the level is valid.
76 //
77 // Deprecated: use package encoding/gzip.
78 func NewGZIPCompressorWithLevel(level int) (Compressor, error) {
79 if level < gzip.DefaultCompression || level > gzip.BestCompression {
80 return nil, fmt.Errorf("grpc: invalid compression level: %d", level)
81 }
82 return &gzipCompressor{
83 pool: sync.Pool{
84 New: func() any {
85 w, err := gzip.NewWriterLevel(io.Discard, level)
86 if err != nil {
87 panic(err)
88 }
89 return w
90 },
91 },
92 }, nil
93 }
94 95 func (c *gzipCompressor) Do(w io.Writer, p []byte) error {
96 z := c.pool.Get().(*gzip.Writer)
97 defer c.pool.Put(z)
98 z.Reset(w)
99 if _, err := z.Write(p); err != nil {
100 return err
101 }
102 return z.Close()
103 }
104 105 func (c *gzipCompressor) Type() string {
106 return "gzip"
107 }
108 109 // Decompressor defines the interface gRPC uses to decompress a message.
110 //
111 // Deprecated: use package encoding.
112 type Decompressor interface {
113 // Do reads the data from r and uncompress them.
114 Do(r io.Reader) ([]byte, error)
115 // Type returns the compression algorithm the Decompressor uses.
116 Type() string
117 }
118 119 type gzipDecompressor struct {
120 pool sync.Pool
121 }
122 123 // NewGZIPDecompressor creates a Decompressor based on GZIP.
124 //
125 // Deprecated: use package encoding/gzip.
126 func NewGZIPDecompressor() Decompressor {
127 return &gzipDecompressor{}
128 }
129 130 func (d *gzipDecompressor) Do(r io.Reader) ([]byte, error) {
131 var z *gzip.Reader
132 switch maybeZ := d.pool.Get().(type) {
133 case nil:
134 newZ, err := gzip.NewReader(r)
135 if err != nil {
136 return nil, err
137 }
138 z = newZ
139 case *gzip.Reader:
140 z = maybeZ
141 if err := z.Reset(r); err != nil {
142 d.pool.Put(z)
143 return nil, err
144 }
145 }
146 147 defer func() {
148 z.Close()
149 d.pool.Put(z)
150 }()
151 return io.ReadAll(z)
152 }
153 154 func (d *gzipDecompressor) Type() string {
155 return "gzip"
156 }
157 158 // callInfo contains all related configuration and information about an RPC.
159 type callInfo struct {
160 compressorName string
161 failFast bool
162 maxReceiveMessageSize *int
163 maxSendMessageSize *int
164 creds credentials.PerRPCCredentials
165 contentSubtype string
166 codec baseCodec
167 maxRetryRPCBufferSize int
168 onFinish []func(err error)
169 authority string
170 acceptedResponseCompressors []string
171 }
172 173 func acceptedCompressorAllows(allowed []string, name string) bool {
174 if allowed == nil {
175 return true
176 }
177 if name == "" || name == encoding.Identity {
178 return true
179 }
180 for _, a := range allowed {
181 if a == name {
182 return true
183 }
184 }
185 return false
186 }
187 188 func defaultCallInfo() *callInfo {
189 return &callInfo{
190 failFast: true,
191 maxRetryRPCBufferSize: 256 * 1024, // 256KB
192 }
193 }
194 195 func newAcceptedCompressionConfig(names []string) ([]string, error) {
196 if len(names) == 0 {
197 return nil, nil
198 }
199 var allowed []string
200 seen := make(map[string]struct{}, len(names))
201 for _, name := range names {
202 name = strings.TrimSpace(name)
203 if name == "" || name == encoding.Identity {
204 continue
205 }
206 if !grpcutil.IsCompressorNameRegistered(name) {
207 return nil, status.Errorf(codes.InvalidArgument, "grpc: compressor %q is not registered", name)
208 }
209 if _, dup := seen[name]; dup {
210 continue
211 }
212 seen[name] = struct{}{}
213 allowed = append(allowed, name)
214 }
215 return allowed, nil
216 }
217 218 // CallOption configures a Call before it starts or extracts information from
219 // a Call after it completes.
220 type CallOption interface {
221 // before is called before the call is sent to any server. If before
222 // returns a non-nil error, the RPC fails with that error.
223 before(*callInfo) error
224 225 // after is called after the call has completed. after cannot return an
226 // error, so any failures should be reported via output parameters.
227 after(*callInfo, *csAttempt)
228 }
229 230 // EmptyCallOption does not alter the Call configuration.
231 // It can be embedded in another structure to carry satellite data for use
232 // by interceptors.
233 type EmptyCallOption struct{}
234 235 func (EmptyCallOption) before(*callInfo) error { return nil }
236 func (EmptyCallOption) after(*callInfo, *csAttempt) {}
237 238 // StaticMethod returns a CallOption which specifies that a call is being made
239 // to a method that is static, which means the method is known at compile time
240 // and doesn't change at runtime. This can be used as a signal to stats plugins
241 // that this method is safe to include as a key to a measurement.
242 func StaticMethod() CallOption {
243 return StaticMethodCallOption{}
244 }
245 246 // StaticMethodCallOption is a CallOption that specifies that a call comes
247 // from a static method.
248 type StaticMethodCallOption struct {
249 EmptyCallOption
250 }
251 252 // Header returns a CallOptions that retrieves the header metadata
253 // for a unary RPC.
254 func Header(md *metadata.MD) CallOption {
255 return HeaderCallOption{HeaderAddr: md}
256 }
257 258 // HeaderCallOption is a CallOption for collecting response header metadata.
259 // The metadata field will be populated *after* the RPC completes.
260 //
261 // # Experimental
262 //
263 // Notice: This type is EXPERIMENTAL and may be changed or removed in a
264 // later release.
265 type HeaderCallOption struct {
266 HeaderAddr *metadata.MD
267 }
268 269 func (o HeaderCallOption) before(*callInfo) error { return nil }
270 func (o HeaderCallOption) after(_ *callInfo, attempt *csAttempt) {
271 *o.HeaderAddr, _ = attempt.transportStream.Header()
272 }
273 274 // Trailer returns a CallOptions that retrieves the trailer metadata
275 // for a unary RPC.
276 func Trailer(md *metadata.MD) CallOption {
277 return TrailerCallOption{TrailerAddr: md}
278 }
279 280 // TrailerCallOption is a CallOption for collecting response trailer metadata.
281 // The metadata field will be populated *after* the RPC completes.
282 //
283 // # Experimental
284 //
285 // Notice: This type is EXPERIMENTAL and may be changed or removed in a
286 // later release.
287 type TrailerCallOption struct {
288 TrailerAddr *metadata.MD
289 }
290 291 func (o TrailerCallOption) before(*callInfo) error { return nil }
292 func (o TrailerCallOption) after(_ *callInfo, attempt *csAttempt) {
293 *o.TrailerAddr = attempt.transportStream.Trailer()
294 }
295 296 // Peer returns a CallOption that retrieves peer information for a unary RPC.
297 // The peer field will be populated *after* the RPC completes.
298 func Peer(p *peer.Peer) CallOption {
299 return PeerCallOption{PeerAddr: p}
300 }
301 302 // PeerCallOption is a CallOption for collecting the identity of the remote
303 // peer. The peer field will be populated *after* the RPC completes.
304 //
305 // # Experimental
306 //
307 // Notice: This type is EXPERIMENTAL and may be changed or removed in a
308 // later release.
309 type PeerCallOption struct {
310 PeerAddr *peer.Peer
311 }
312 313 func (o PeerCallOption) before(*callInfo) error { return nil }
314 func (o PeerCallOption) after(_ *callInfo, attempt *csAttempt) {
315 if x, ok := peer.FromContext(attempt.transportStream.Context()); ok {
316 *o.PeerAddr = *x
317 }
318 }
319 320 // WaitForReady configures the RPC's behavior when the client is in
321 // TRANSIENT_FAILURE, which occurs when all addresses fail to connect. If
322 // waitForReady is false, the RPC will fail immediately. Otherwise, the client
323 // will wait until a connection becomes available or the RPC's deadline is
324 // reached.
325 //
326 // By default, RPCs do not "wait for ready".
327 func WaitForReady(waitForReady bool) CallOption {
328 return FailFastCallOption{FailFast: !waitForReady}
329 }
330 331 // FailFast is the opposite of WaitForReady.
332 //
333 // Deprecated: use WaitForReady.
334 func FailFast(failFast bool) CallOption {
335 return FailFastCallOption{FailFast: failFast}
336 }
337 338 // FailFastCallOption is a CallOption for indicating whether an RPC should fail
339 // fast or not.
340 //
341 // # Experimental
342 //
343 // Notice: This type is EXPERIMENTAL and may be changed or removed in a
344 // later release.
345 type FailFastCallOption struct {
346 FailFast bool
347 }
348 349 func (o FailFastCallOption) before(c *callInfo) error {
350 c.failFast = o.FailFast
351 return nil
352 }
353 func (o FailFastCallOption) after(*callInfo, *csAttempt) {}
354 355 // OnFinish returns a CallOption that configures a callback to be called when
356 // the call completes. The error passed to the callback is the status of the
357 // RPC, and may be nil. The onFinish callback provided will only be called once
358 // by gRPC. This is mainly used to be used by streaming interceptors, to be
359 // notified when the RPC completes along with information about the status of
360 // the RPC.
361 //
362 // # Experimental
363 //
364 // Notice: This API is EXPERIMENTAL and may be changed or removed in a
365 // later release.
366 func OnFinish(onFinish func(err error)) CallOption {
367 return OnFinishCallOption{
368 OnFinish: onFinish,
369 }
370 }
371 372 // OnFinishCallOption is CallOption that indicates a callback to be called when
373 // the call completes.
374 //
375 // # Experimental
376 //
377 // Notice: This type is EXPERIMENTAL and may be changed or removed in a
378 // later release.
379 type OnFinishCallOption struct {
380 OnFinish func(error)
381 }
382 383 func (o OnFinishCallOption) before(c *callInfo) error {
384 c.onFinish = append(c.onFinish, o.OnFinish)
385 return nil
386 }
387 388 func (o OnFinishCallOption) after(*callInfo, *csAttempt) {}
389 390 // MaxCallRecvMsgSize returns a CallOption which sets the maximum message size
391 // in bytes the client can receive. If this is not set, gRPC uses the default
392 // 4MB.
393 func MaxCallRecvMsgSize(bytes int) CallOption {
394 return MaxRecvMsgSizeCallOption{MaxRecvMsgSize: bytes}
395 }
396 397 // MaxRecvMsgSizeCallOption is a CallOption that indicates the maximum message
398 // size in bytes the client can receive.
399 //
400 // # Experimental
401 //
402 // Notice: This type is EXPERIMENTAL and may be changed or removed in a
403 // later release.
404 type MaxRecvMsgSizeCallOption struct {
405 MaxRecvMsgSize int
406 }
407 408 func (o MaxRecvMsgSizeCallOption) before(c *callInfo) error {
409 c.maxReceiveMessageSize = &o.MaxRecvMsgSize
410 return nil
411 }
412 func (o MaxRecvMsgSizeCallOption) after(*callInfo, *csAttempt) {}
413 414 // CallAuthority returns a CallOption that sets the HTTP/2 :authority header of
415 // an RPC to the specified value. When using CallAuthority, the credentials in
416 // use must implement the AuthorityValidator interface.
417 //
418 // # Experimental
419 //
420 // Notice: This API is EXPERIMENTAL and may be changed or removed in a later
421 // release.
422 func CallAuthority(authority string) CallOption {
423 return AuthorityOverrideCallOption{Authority: authority}
424 }
425 426 // AuthorityOverrideCallOption is a CallOption that indicates the HTTP/2
427 // :authority header value to use for the call.
428 //
429 // # Experimental
430 //
431 // Notice: This type is EXPERIMENTAL and may be changed or removed in a later
432 // release.
433 type AuthorityOverrideCallOption struct {
434 Authority string
435 }
436 437 func (o AuthorityOverrideCallOption) before(c *callInfo) error {
438 c.authority = o.Authority
439 return nil
440 }
441 442 func (o AuthorityOverrideCallOption) after(*callInfo, *csAttempt) {}
443 444 // MaxCallSendMsgSize returns a CallOption which sets the maximum message size
445 // in bytes the client can send. If this is not set, gRPC uses the default
446 // `math.MaxInt32`.
447 func MaxCallSendMsgSize(bytes int) CallOption {
448 return MaxSendMsgSizeCallOption{MaxSendMsgSize: bytes}
449 }
450 451 // MaxSendMsgSizeCallOption is a CallOption that indicates the maximum message
452 // size in bytes the client can send.
453 //
454 // # Experimental
455 //
456 // Notice: This type is EXPERIMENTAL and may be changed or removed in a
457 // later release.
458 type MaxSendMsgSizeCallOption struct {
459 MaxSendMsgSize int
460 }
461 462 func (o MaxSendMsgSizeCallOption) before(c *callInfo) error {
463 c.maxSendMessageSize = &o.MaxSendMsgSize
464 return nil
465 }
466 func (o MaxSendMsgSizeCallOption) after(*callInfo, *csAttempt) {}
467 468 // PerRPCCredentials returns a CallOption that sets credentials.PerRPCCredentials
469 // for a call.
470 func PerRPCCredentials(creds credentials.PerRPCCredentials) CallOption {
471 return PerRPCCredsCallOption{Creds: creds}
472 }
473 474 // PerRPCCredsCallOption is a CallOption that indicates the per-RPC
475 // credentials to use for the call.
476 //
477 // # Experimental
478 //
479 // Notice: This type is EXPERIMENTAL and may be changed or removed in a
480 // later release.
481 type PerRPCCredsCallOption struct {
482 Creds credentials.PerRPCCredentials
483 }
484 485 func (o PerRPCCredsCallOption) before(c *callInfo) error {
486 c.creds = o.Creds
487 return nil
488 }
489 func (o PerRPCCredsCallOption) after(*callInfo, *csAttempt) {}
490 491 // UseCompressor returns a CallOption which sets the compressor used when
492 // sending the request. If WithCompressor is also set, UseCompressor has
493 // higher priority.
494 //
495 // # Experimental
496 //
497 // Notice: This API is EXPERIMENTAL and may be changed or removed in a
498 // later release.
499 func UseCompressor(name string) CallOption {
500 return CompressorCallOption{CompressorType: name}
501 }
502 503 // CompressorCallOption is a CallOption that indicates the compressor to use.
504 //
505 // # Experimental
506 //
507 // Notice: This type is EXPERIMENTAL and may be changed or removed in a
508 // later release.
509 type CompressorCallOption struct {
510 CompressorType string
511 }
512 513 func (o CompressorCallOption) before(c *callInfo) error {
514 c.compressorName = o.CompressorType
515 return nil
516 }
517 func (o CompressorCallOption) after(*callInfo, *csAttempt) {}
518 519 // acceptCompressors returns a CallOption that limits the compression algorithms
520 // advertised in the grpc-accept-encoding header for response messages.
521 // Compression algorithms not in the provided list will not be advertised, and
522 // responses compressed with non-listed algorithms will be rejected.
523 func acceptCompressors(names ...string) CallOption {
524 cp := append([]string(nil), names...)
525 return acceptCompressorsCallOption{names: cp}
526 }
527 528 // acceptCompressorsCallOption is a CallOption that limits response compression.
529 type acceptCompressorsCallOption struct {
530 names []string
531 }
532 533 func (o acceptCompressorsCallOption) before(c *callInfo) error {
534 allowed, err := newAcceptedCompressionConfig(o.names)
535 if err != nil {
536 return err
537 }
538 c.acceptedResponseCompressors = allowed
539 return nil
540 }
541 542 func (acceptCompressorsCallOption) after(*callInfo, *csAttempt) {}
543 544 // CallContentSubtype returns a CallOption that will set the content-subtype
545 // for a call. For example, if content-subtype is "json", the Content-Type over
546 // the wire will be "application/grpc+json". The content-subtype is converted
547 // to lowercase before being included in Content-Type. See Content-Type on
548 // https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#requests for
549 // more details.
550 //
551 // If ForceCodec is not also used, the content-subtype will be used to look up
552 // the Codec to use in the registry controlled by RegisterCodec. See the
553 // documentation on RegisterCodec for details on registration. The lookup of
554 // content-subtype is case-insensitive. If no such Codec is found, the call
555 // will result in an error with code codes.Internal.
556 //
557 // If ForceCodec is also used, that Codec will be used for all request and
558 // response messages, with the content-subtype set to the given contentSubtype
559 // here for requests.
560 func CallContentSubtype(contentSubtype string) CallOption {
561 return ContentSubtypeCallOption{ContentSubtype: strings.ToLower(contentSubtype)}
562 }
563 564 // ContentSubtypeCallOption is a CallOption that indicates the content-subtype
565 // used for marshaling messages.
566 //
567 // # Experimental
568 //
569 // Notice: This type is EXPERIMENTAL and may be changed or removed in a
570 // later release.
571 type ContentSubtypeCallOption struct {
572 ContentSubtype string
573 }
574 575 func (o ContentSubtypeCallOption) before(c *callInfo) error {
576 c.contentSubtype = o.ContentSubtype
577 return nil
578 }
579 func (o ContentSubtypeCallOption) after(*callInfo, *csAttempt) {}
580 581 // ForceCodec returns a CallOption that will set codec to be used for all
582 // request and response messages for a call. The result of calling Name() will
583 // be used as the content-subtype after converting to lowercase, unless
584 // CallContentSubtype is also used.
585 //
586 // See Content-Type on
587 // https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#requests for
588 // more details. Also see the documentation on RegisterCodec and
589 // CallContentSubtype for more details on the interaction between Codec and
590 // content-subtype.
591 //
592 // This function is provided for advanced users; prefer to use only
593 // CallContentSubtype to select a registered codec instead.
594 //
595 // # Experimental
596 //
597 // Notice: This API is EXPERIMENTAL and may be changed or removed in a
598 // later release.
599 func ForceCodec(codec encoding.Codec) CallOption {
600 return ForceCodecCallOption{Codec: codec}
601 }
602 603 // ForceCodecCallOption is a CallOption that indicates the codec used for
604 // marshaling messages.
605 //
606 // # Experimental
607 //
608 // Notice: This type is EXPERIMENTAL and may be changed or removed in a
609 // later release.
610 type ForceCodecCallOption struct {
611 Codec encoding.Codec
612 }
613 614 func (o ForceCodecCallOption) before(c *callInfo) error {
615 c.codec = newCodecV1Bridge(o.Codec)
616 return nil
617 }
618 func (o ForceCodecCallOption) after(*callInfo, *csAttempt) {}
619 620 // ForceCodecV2 returns a CallOption that will set codec to be used for all
621 // request and response messages for a call. The result of calling Name() will
622 // be used as the content-subtype after converting to lowercase, unless
623 // CallContentSubtype is also used.
624 //
625 // See Content-Type on
626 // https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#requests for
627 // more details. Also see the documentation on RegisterCodec and
628 // CallContentSubtype for more details on the interaction between Codec and
629 // content-subtype.
630 //
631 // This function is provided for advanced users; prefer to use only
632 // CallContentSubtype to select a registered codec instead.
633 //
634 // # Experimental
635 //
636 // Notice: This API is EXPERIMENTAL and may be changed or removed in a
637 // later release.
638 func ForceCodecV2(codec encoding.CodecV2) CallOption {
639 return ForceCodecV2CallOption{CodecV2: codec}
640 }
641 642 // ForceCodecV2CallOption is a CallOption that indicates the codec used for
643 // marshaling messages.
644 //
645 // # Experimental
646 //
647 // Notice: This type is EXPERIMENTAL and may be changed or removed in a
648 // later release.
649 type ForceCodecV2CallOption struct {
650 CodecV2 encoding.CodecV2
651 }
652 653 func (o ForceCodecV2CallOption) before(c *callInfo) error {
654 c.codec = o.CodecV2
655 return nil
656 }
657 658 func (o ForceCodecV2CallOption) after(*callInfo, *csAttempt) {}
659 660 // CallCustomCodec behaves like ForceCodec, but accepts a grpc.Codec instead of
661 // an encoding.Codec.
662 //
663 // Deprecated: use ForceCodec instead.
664 func CallCustomCodec(codec Codec) CallOption {
665 return CustomCodecCallOption{Codec: codec}
666 }
667 668 // CustomCodecCallOption is a CallOption that indicates the codec used for
669 // marshaling messages.
670 //
671 // # Experimental
672 //
673 // Notice: This type is EXPERIMENTAL and may be changed or removed in a
674 // later release.
675 type CustomCodecCallOption struct {
676 Codec Codec
677 }
678 679 func (o CustomCodecCallOption) before(c *callInfo) error {
680 c.codec = newCodecV0Bridge(o.Codec)
681 return nil
682 }
683 func (o CustomCodecCallOption) after(*callInfo, *csAttempt) {}
684 685 // MaxRetryRPCBufferSize returns a CallOption that limits the amount of memory
686 // used for buffering this RPC's requests for retry purposes.
687 //
688 // # Experimental
689 //
690 // Notice: This API is EXPERIMENTAL and may be changed or removed in a
691 // later release.
692 func MaxRetryRPCBufferSize(bytes int) CallOption {
693 return MaxRetryRPCBufferSizeCallOption{bytes}
694 }
695 696 // MaxRetryRPCBufferSizeCallOption is a CallOption indicating the amount of
697 // memory to be used for caching this RPC for retry purposes.
698 //
699 // # Experimental
700 //
701 // Notice: This type is EXPERIMENTAL and may be changed or removed in a
702 // later release.
703 type MaxRetryRPCBufferSizeCallOption struct {
704 MaxRetryRPCBufferSize int
705 }
706 707 func (o MaxRetryRPCBufferSizeCallOption) before(c *callInfo) error {
708 c.maxRetryRPCBufferSize = o.MaxRetryRPCBufferSize
709 return nil
710 }
711 func (o MaxRetryRPCBufferSizeCallOption) after(*callInfo, *csAttempt) {}
712 713 // The format of the payload: compressed or not?
714 type payloadFormat uint8
715 716 const (
717 compressionNone payloadFormat = 0 // no compression
718 compressionMade payloadFormat = 1 // compressed
719 )
720 721 func (pf payloadFormat) isCompressed() bool {
722 return pf == compressionMade
723 }
724 725 type streamReader interface {
726 ReadMessageHeader(header []byte) error
727 Read(n int) (mem.BufferSlice, error)
728 }
729 730 // noCopy may be embedded into structs which must not be copied
731 // after the first use.
732 //
733 // See https://golang.org/issues/8005#issuecomment-190753527
734 // for details.
735 type noCopy struct {
736 }
737 738 func (*noCopy) Lock() {}
739 func (*noCopy) Unlock() {}
740 741 // parser reads complete gRPC messages from the underlying reader.
742 type parser struct {
743 _ noCopy
744 // r is the underlying reader.
745 // See the comment on recvMsg for the permissible
746 // error types.
747 r streamReader
748 749 // The header of a gRPC message. Find more detail at
750 // https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md
751 header [5]byte
752 753 // bufferPool is the pool of shared receive buffers.
754 bufferPool mem.BufferPool
755 }
756 757 // recvMsg reads a complete gRPC message from the stream.
758 //
759 // It returns the message and its payload (compression/encoding)
760 // format. The caller owns the returned msg memory.
761 //
762 // If there is an error, possible values are:
763 // - io.EOF, when no messages remain
764 // - io.ErrUnexpectedEOF
765 // - of type transport.ConnectionError
766 // - an error from the status package
767 //
768 // No other error values or types must be returned, which also means
769 // that the underlying streamReader must not return an incompatible
770 // error.
771 func (p *parser) recvMsg(maxReceiveMessageSize int) (payloadFormat, mem.BufferSlice, error) {
772 err := p.r.ReadMessageHeader(p.header[:])
773 if err != nil {
774 return 0, nil, err
775 }
776 777 pf := payloadFormat(p.header[0])
778 length := binary.BigEndian.Uint32(p.header[1:])
779 780 if int64(length) > int64(maxInt) {
781 return 0, nil, status.Errorf(codes.ResourceExhausted, "grpc: received message larger than max length allowed on current machine (%d vs. %d)", length, maxInt)
782 }
783 if int(length) > maxReceiveMessageSize {
784 return 0, nil, status.Errorf(codes.ResourceExhausted, "grpc: received message larger than max (%d vs. %d)", length, maxReceiveMessageSize)
785 }
786 787 data, err := p.r.Read(int(length))
788 if err != nil {
789 if err == io.EOF {
790 err = io.ErrUnexpectedEOF
791 }
792 return 0, nil, err
793 }
794 return pf, data, nil
795 }
796 797 // encode serializes msg and returns a buffer containing the message, or an
798 // error if it is too large to be transmitted by grpc. If msg is nil, it
799 // generates an empty message.
800 func encode(c baseCodec, msg any) (mem.BufferSlice, error) {
801 if msg == nil { // NOTE: typed nils will not be caught by this check
802 return nil, nil
803 }
804 b, err := c.Marshal(msg)
805 if err != nil {
806 return nil, status.Errorf(codes.Internal, "grpc: error while marshaling: %v", err.Error())
807 }
808 if bufSize := uint(b.Len()); bufSize > math.MaxUint32 {
809 b.Free()
810 return nil, status.Errorf(codes.ResourceExhausted, "grpc: message too large (%d bytes)", bufSize)
811 }
812 return b, nil
813 }
814 815 // compress returns the input bytes compressed by compressor or cp.
816 // If both compressors are nil, or if the message has zero length, returns nil,
817 // indicating no compression was done.
818 //
819 // TODO(dfawley): eliminate cp parameter by wrapping Compressor in an encoding.Compressor.
820 func compress(in mem.BufferSlice, cp Compressor, compressor encoding.Compressor, pool mem.BufferPool) (mem.BufferSlice, payloadFormat, error) {
821 if (compressor == nil && cp == nil) || in.Len() == 0 {
822 return nil, compressionNone, nil
823 }
824 var out mem.BufferSlice
825 w := mem.NewWriter(&out, pool)
826 wrapErr := func(err error) error {
827 out.Free()
828 return status.Errorf(codes.Internal, "grpc: error while compressing: %v", err.Error())
829 }
830 if compressor != nil {
831 z, err := compressor.Compress(w)
832 if err != nil {
833 return nil, 0, wrapErr(err)
834 }
835 for _, b := range in {
836 if _, err := z.Write(b.ReadOnlyData()); err != nil {
837 return nil, 0, wrapErr(err)
838 }
839 }
840 if err := z.Close(); err != nil {
841 return nil, 0, wrapErr(err)
842 }
843 } else {
844 // This is obviously really inefficient since it fully materializes the data, but
845 // there is no way around this with the old Compressor API. At least it attempts
846 // to return the buffer to the provider, in the hopes it can be reused (maybe
847 // even by a subsequent call to this very function).
848 buf := in.MaterializeToBuffer(pool)
849 defer buf.Free()
850 if err := cp.Do(w, buf.ReadOnlyData()); err != nil {
851 return nil, 0, wrapErr(err)
852 }
853 }
854 return out, compressionMade, nil
855 }
856 857 const (
858 payloadLen = 1
859 sizeLen = 4
860 headerLen = payloadLen + sizeLen
861 )
862 863 // msgHeader returns a 5-byte header for the message being transmitted and the
864 // payload, which is compData if non-nil or data otherwise.
865 func msgHeader(data, compData mem.BufferSlice, pf payloadFormat) (hdr []byte, payload mem.BufferSlice) {
866 hdr = make([]byte, headerLen)
867 hdr[0] = byte(pf)
868 869 var length uint32
870 if pf.isCompressed() {
871 length = uint32(compData.Len())
872 payload = compData
873 } else {
874 length = uint32(data.Len())
875 payload = data
876 }
877 878 // Write length of payload into buf
879 binary.BigEndian.PutUint32(hdr[payloadLen:], length)
880 return hdr, payload
881 }
882 883 func outPayload(client bool, msg any, dataLength, payloadLength int, t time.Time) *stats.OutPayload {
884 return &stats.OutPayload{
885 Client: client,
886 Payload: msg,
887 Length: dataLength,
888 WireLength: payloadLength + headerLen,
889 CompressedLength: payloadLength,
890 SentTime: t,
891 }
892 }
893 894 func checkRecvPayload(pf payloadFormat, recvCompress string, haveCompressor bool, isServer bool) *status.Status {
895 switch pf {
896 case compressionNone:
897 case compressionMade:
898 if recvCompress == "" || recvCompress == encoding.Identity {
899 return status.New(codes.Internal, "grpc: compressed flag set with identity or empty encoding")
900 }
901 if !haveCompressor {
902 if isServer {
903 return status.Newf(codes.Unimplemented, "grpc: Decompressor is not installed for grpc-encoding %q", recvCompress)
904 }
905 return status.Newf(codes.Internal, "grpc: Decompressor is not installed for grpc-encoding %q", recvCompress)
906 }
907 default:
908 return status.Newf(codes.Internal, "grpc: received unexpected payload format %d", pf)
909 }
910 return nil
911 }
912 913 type payloadInfo struct {
914 compressedLength int // The compressed length got from wire.
915 uncompressedBytes mem.BufferSlice
916 }
917 918 func (p *payloadInfo) free() {
919 if p != nil && p.uncompressedBytes != nil {
920 p.uncompressedBytes.Free()
921 }
922 }
923 924 // recvAndDecompress reads a message from the stream, decompressing it if necessary.
925 //
926 // Cancelling the returned cancel function releases the buffer back to the pool. So the caller should cancel as soon as
927 // the buffer is no longer needed.
928 // TODO: Refactor this function to reduce the number of arguments.
929 // See: https://google.github.io/styleguide/go/best-practices.html#function-argument-lists
930 func recvAndDecompress(p *parser, s recvCompressor, dc Decompressor, maxReceiveMessageSize int, payInfo *payloadInfo, compressor encoding.Compressor, isServer bool) (out mem.BufferSlice, err error) {
931 pf, compressed, err := p.recvMsg(maxReceiveMessageSize)
932 if err != nil {
933 return nil, err
934 }
935 936 compressedLength := compressed.Len()
937 938 if st := checkRecvPayload(pf, s.RecvCompress(), compressor != nil || dc != nil, isServer); st != nil {
939 compressed.Free()
940 return nil, st.Err()
941 }
942 943 if pf.isCompressed() {
944 defer compressed.Free()
945 // To match legacy behavior, if the decompressor is set by WithDecompressor or RPCDecompressor,
946 // use this decompressor as the default.
947 out, err = decompress(compressor, compressed, dc, maxReceiveMessageSize, p.bufferPool)
948 if err != nil {
949 return nil, err
950 }
951 } else {
952 out = compressed
953 }
954 955 if payInfo != nil {
956 payInfo.compressedLength = compressedLength
957 out.Ref()
958 payInfo.uncompressedBytes = out
959 }
960 961 return out, nil
962 }
963 964 // decompress processes the given data by decompressing it using either a custom decompressor or a standard compressor.
965 // If a custom decompressor is provided, it takes precedence. The function validates that the decompressed data
966 // does not exceed the specified maximum size and returns an error if this limit is exceeded.
967 // On success, it returns the decompressed data. Otherwise, it returns an error if decompression fails or the data exceeds the size limit.
968 func decompress(compressor encoding.Compressor, d mem.BufferSlice, dc Decompressor, maxReceiveMessageSize int, pool mem.BufferPool) (mem.BufferSlice, error) {
969 if dc != nil {
970 uncompressed, err := dc.Do(d.Reader())
971 if err != nil {
972 return nil, status.Errorf(codes.Internal, "grpc: failed to decompress the received message: %v", err)
973 }
974 if len(uncompressed) > maxReceiveMessageSize {
975 return nil, status.Errorf(codes.ResourceExhausted, "grpc: message after decompression larger than max (%d vs. %d)", len(uncompressed), maxReceiveMessageSize)
976 }
977 return mem.BufferSlice{mem.SliceBuffer(uncompressed)}, nil
978 }
979 if compressor != nil {
980 dcReader, err := compressor.Decompress(d.Reader())
981 if err != nil {
982 return nil, status.Errorf(codes.Internal, "grpc: failed to decompress the message: %v", err)
983 }
984 985 // Read at most one byte more than the limit from the decompressor.
986 // Unless the limit is MaxInt64, in which case, that's impossible, so
987 // apply no limit.
988 if limit := int64(maxReceiveMessageSize); limit < math.MaxInt64 {
989 dcReader = io.LimitReader(dcReader, limit+1)
990 }
991 out, err := mem.ReadAll(dcReader, pool)
992 if err != nil {
993 out.Free()
994 return nil, status.Errorf(codes.Internal, "grpc: failed to read decompressed data: %v", err)
995 }
996 997 if out.Len() > maxReceiveMessageSize {
998 out.Free()
999 return nil, status.Errorf(codes.ResourceExhausted, "grpc: received message after decompression larger than max %d", maxReceiveMessageSize)
1000 }
1001 return out, nil
1002 }
1003 return nil, status.Errorf(codes.Internal, "grpc: no decompressor available for compressed payload")
1004 }
1005 1006 type recvCompressor interface {
1007 RecvCompress() string
1008 }
1009 1010 // For the two compressor parameters, both should not be set, but if they are,
1011 // dc takes precedence over compressor.
1012 // TODO(dfawley): wrap the old compressor/decompressor using the new API?
1013 func recv(p *parser, c baseCodec, s recvCompressor, dc Decompressor, m any, maxReceiveMessageSize int, payInfo *payloadInfo, compressor encoding.Compressor, isServer bool) error {
1014 data, err := recvAndDecompress(p, s, dc, maxReceiveMessageSize, payInfo, compressor, isServer)
1015 if err != nil {
1016 return err
1017 }
1018 1019 // If the codec wants its own reference to the data, it can get it. Otherwise, always
1020 // free the buffers.
1021 defer data.Free()
1022 1023 if err := c.Unmarshal(data, m); err != nil {
1024 return status.Errorf(codes.Internal, "grpc: failed to unmarshal the received message: %v", err)
1025 }
1026 1027 return nil
1028 }
1029 1030 // Information about RPC
1031 type rpcInfo struct {
1032 failfast bool
1033 preloaderInfo compressorInfo
1034 }
1035 1036 // Information about Preloader
1037 // Responsible for storing codec, and compressors
1038 // If stream (s) has context s.Context which stores rpcInfo that has non nil
1039 // pointers to codec, and compressors, then we can use preparedMsg for Async message prep
1040 // and reuse marshalled bytes
1041 type compressorInfo struct {
1042 codec baseCodec
1043 cp Compressor
1044 comp encoding.Compressor
1045 }
1046 1047 type rpcInfoContextKey struct{}
1048 1049 func newContextWithRPCInfo(ctx context.Context, failfast bool, codec baseCodec, cp Compressor, comp encoding.Compressor) context.Context {
1050 return context.WithValue(ctx, rpcInfoContextKey{}, &rpcInfo{
1051 failfast: failfast,
1052 preloaderInfo: compressorInfo{
1053 codec: codec,
1054 cp: cp,
1055 comp: comp,
1056 },
1057 })
1058 }
1059 1060 func rpcInfoFromContext(ctx context.Context) (s *rpcInfo, ok bool) {
1061 s, ok = ctx.Value(rpcInfoContextKey{}).(*rpcInfo)
1062 return
1063 }
1064 1065 // Code returns the error code for err if it was produced by the rpc system.
1066 // Otherwise, it returns codes.Unknown.
1067 //
1068 // Deprecated: use status.Code instead.
1069 func Code(err error) codes.Code {
1070 return status.Code(err)
1071 }
1072 1073 // ErrorDesc returns the error description of err if it was produced by the rpc system.
1074 // Otherwise, it returns err.Error() or empty string when err is nil.
1075 //
1076 // Deprecated: use status.Convert and Message method instead.
1077 func ErrorDesc(err error) string {
1078 return status.Convert(err).Message()
1079 }
1080 1081 // Errorf returns an error containing an error code and a description;
1082 // Errorf returns nil if c is OK.
1083 //
1084 // Deprecated: use status.Errorf instead.
1085 func Errorf(c codes.Code, format string, a ...any) error {
1086 return status.Errorf(c, format, a...)
1087 }
1088 1089 var errContextCanceled = status.Error(codes.Canceled, context.Canceled.Error())
1090 var errContextDeadline = status.Error(codes.DeadlineExceeded, context.DeadlineExceeded.Error())
1091 1092 // toRPCErr converts an error into an error from the status package.
1093 func toRPCErr(err error) error {
1094 switch err {
1095 case nil, io.EOF:
1096 return err
1097 case context.DeadlineExceeded:
1098 return errContextDeadline
1099 case context.Canceled:
1100 return errContextCanceled
1101 case io.ErrUnexpectedEOF:
1102 return status.Error(codes.Internal, err.Error())
1103 }
1104 1105 switch e := err.(type) {
1106 case transport.ConnectionError:
1107 return status.Error(codes.Unavailable, e.Desc)
1108 case *transport.NewStreamError:
1109 return toRPCErr(e.Err)
1110 }
1111 1112 if _, ok := status.FromError(err); ok {
1113 return err
1114 }
1115 1116 return status.Error(codes.Unknown, err.Error())
1117 }
1118 1119 // setCallInfoCodec should only be called after CallOptions have been applied.
1120 func setCallInfoCodec(c *callInfo) error {
1121 if c.codec != nil {
1122 // codec was already set by a CallOption; use it, but set the content
1123 // subtype if it is not set.
1124 if c.contentSubtype == "" {
1125 // c.codec is a baseCodec to hide the difference between grpc.Codec and
1126 // encoding.Codec (Name vs. String method name). We only support
1127 // setting content subtype from encoding.Codec to avoid a behavior
1128 // change with the deprecated version.
1129 if ec, ok := c.codec.(encoding.CodecV2); ok {
1130 c.contentSubtype = strings.ToLower(ec.Name())
1131 }
1132 }
1133 return nil
1134 }
1135 1136 if c.contentSubtype == "" {
1137 // No codec specified in CallOptions; use proto by default.
1138 c.codec = getCodec(proto.Name)
1139 return nil
1140 }
1141 1142 // c.contentSubtype is already lowercased in CallContentSubtype
1143 c.codec = getCodec(c.contentSubtype)
1144 if c.codec == nil {
1145 return status.Errorf(codes.Internal, "no codec registered for content-subtype %s", c.contentSubtype)
1146 }
1147 return nil
1148 }
1149 1150 // The SupportPackageIsVersion variables are referenced from generated protocol
1151 // buffer files to ensure compatibility with the gRPC version used. The latest
1152 // support package version is 9.
1153 //
1154 // Older versions are kept for compatibility.
1155 //
1156 // These constants should not be referenced from any other code.
1157 const (
1158 SupportPackageIsVersion3 = true
1159 SupportPackageIsVersion4 = true
1160 SupportPackageIsVersion5 = true
1161 SupportPackageIsVersion6 = true
1162 SupportPackageIsVersion7 = true
1163 SupportPackageIsVersion8 = true
1164 SupportPackageIsVersion9 = true
1165 )
1166 1167 const grpcUA = "grpc-go/" + Version
1168