rpc_util.go raw

   1  /*
   2   *
   3   * Copyright 2014 gRPC authors.
   4   *
   5   * Licensed under the Apache License, Version 2.0 (the "License");
   6   * you may not use this file except in compliance with the License.
   7   * You may obtain a copy of the License at
   8   *
   9   *     http://www.apache.org/licenses/LICENSE-2.0
  10   *
  11   * Unless required by applicable law or agreed to in writing, software
  12   * distributed under the License is distributed on an "AS IS" BASIS,
  13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14   * See the License for the specific language governing permissions and
  15   * limitations under the License.
  16   *
  17   */
  18  
  19  package grpc
  20  
  21  import (
  22  	"compress/gzip"
  23  	"context"
  24  	"encoding/binary"
  25  	"fmt"
  26  	"io"
  27  	"math"
  28  	"strings"
  29  	"sync"
  30  	"time"
  31  
  32  	"google.golang.org/grpc/codes"
  33  	"google.golang.org/grpc/credentials"
  34  	"google.golang.org/grpc/encoding"
  35  	"google.golang.org/grpc/encoding/proto"
  36  	"google.golang.org/grpc/internal"
  37  	"google.golang.org/grpc/internal/grpcutil"
  38  	"google.golang.org/grpc/internal/transport"
  39  	"google.golang.org/grpc/mem"
  40  	"google.golang.org/grpc/metadata"
  41  	"google.golang.org/grpc/peer"
  42  	"google.golang.org/grpc/stats"
  43  	"google.golang.org/grpc/status"
  44  )
  45  
  46  func init() {
  47  	internal.AcceptCompressors = acceptCompressors
  48  }
  49  
  50  // Compressor defines the interface gRPC uses to compress a message.
  51  //
  52  // Deprecated: use package encoding.
  53  type Compressor interface {
  54  	// Do compresses p into w.
  55  	Do(w io.Writer, p []byte) error
  56  	// Type returns the compression algorithm the Compressor uses.
  57  	Type() string
  58  }
  59  
  60  type gzipCompressor struct {
  61  	pool sync.Pool
  62  }
  63  
  64  // NewGZIPCompressor creates a Compressor based on GZIP.
  65  //
  66  // Deprecated: use package encoding/gzip.
  67  func NewGZIPCompressor() Compressor {
  68  	c, _ := NewGZIPCompressorWithLevel(gzip.DefaultCompression)
  69  	return c
  70  }
  71  
  72  // NewGZIPCompressorWithLevel is like NewGZIPCompressor but specifies the gzip compression level instead
  73  // of assuming DefaultCompression.
  74  //
  75  // The error returned will be nil if the level is valid.
  76  //
  77  // Deprecated: use package encoding/gzip.
  78  func NewGZIPCompressorWithLevel(level int) (Compressor, error) {
  79  	if level < gzip.DefaultCompression || level > gzip.BestCompression {
  80  		return nil, fmt.Errorf("grpc: invalid compression level: %d", level)
  81  	}
  82  	return &gzipCompressor{
  83  		pool: sync.Pool{
  84  			New: func() any {
  85  				w, err := gzip.NewWriterLevel(io.Discard, level)
  86  				if err != nil {
  87  					panic(err)
  88  				}
  89  				return w
  90  			},
  91  		},
  92  	}, nil
  93  }
  94  
  95  func (c *gzipCompressor) Do(w io.Writer, p []byte) error {
  96  	z := c.pool.Get().(*gzip.Writer)
  97  	defer c.pool.Put(z)
  98  	z.Reset(w)
  99  	if _, err := z.Write(p); err != nil {
 100  		return err
 101  	}
 102  	return z.Close()
 103  }
 104  
 105  func (c *gzipCompressor) Type() string {
 106  	return "gzip"
 107  }
 108  
 109  // Decompressor defines the interface gRPC uses to decompress a message.
 110  //
 111  // Deprecated: use package encoding.
 112  type Decompressor interface {
 113  	// Do reads the data from r and uncompress them.
 114  	Do(r io.Reader) ([]byte, error)
 115  	// Type returns the compression algorithm the Decompressor uses.
 116  	Type() string
 117  }
 118  
 119  type gzipDecompressor struct {
 120  	pool sync.Pool
 121  }
 122  
 123  // NewGZIPDecompressor creates a Decompressor based on GZIP.
 124  //
 125  // Deprecated: use package encoding/gzip.
 126  func NewGZIPDecompressor() Decompressor {
 127  	return &gzipDecompressor{}
 128  }
 129  
 130  func (d *gzipDecompressor) Do(r io.Reader) ([]byte, error) {
 131  	var z *gzip.Reader
 132  	switch maybeZ := d.pool.Get().(type) {
 133  	case nil:
 134  		newZ, err := gzip.NewReader(r)
 135  		if err != nil {
 136  			return nil, err
 137  		}
 138  		z = newZ
 139  	case *gzip.Reader:
 140  		z = maybeZ
 141  		if err := z.Reset(r); err != nil {
 142  			d.pool.Put(z)
 143  			return nil, err
 144  		}
 145  	}
 146  
 147  	defer func() {
 148  		z.Close()
 149  		d.pool.Put(z)
 150  	}()
 151  	return io.ReadAll(z)
 152  }
 153  
 154  func (d *gzipDecompressor) Type() string {
 155  	return "gzip"
 156  }
 157  
 158  // callInfo contains all related configuration and information about an RPC.
 159  type callInfo struct {
 160  	compressorName              string
 161  	failFast                    bool
 162  	maxReceiveMessageSize       *int
 163  	maxSendMessageSize          *int
 164  	creds                       credentials.PerRPCCredentials
 165  	contentSubtype              string
 166  	codec                       baseCodec
 167  	maxRetryRPCBufferSize       int
 168  	onFinish                    []func(err error)
 169  	authority                   string
 170  	acceptedResponseCompressors []string
 171  }
 172  
 173  func acceptedCompressorAllows(allowed []string, name string) bool {
 174  	if allowed == nil {
 175  		return true
 176  	}
 177  	if name == "" || name == encoding.Identity {
 178  		return true
 179  	}
 180  	for _, a := range allowed {
 181  		if a == name {
 182  			return true
 183  		}
 184  	}
 185  	return false
 186  }
 187  
 188  func defaultCallInfo() *callInfo {
 189  	return &callInfo{
 190  		failFast:              true,
 191  		maxRetryRPCBufferSize: 256 * 1024, // 256KB
 192  	}
 193  }
 194  
 195  func newAcceptedCompressionConfig(names []string) ([]string, error) {
 196  	if len(names) == 0 {
 197  		return nil, nil
 198  	}
 199  	var allowed []string
 200  	seen := make(map[string]struct{}, len(names))
 201  	for _, name := range names {
 202  		name = strings.TrimSpace(name)
 203  		if name == "" || name == encoding.Identity {
 204  			continue
 205  		}
 206  		if !grpcutil.IsCompressorNameRegistered(name) {
 207  			return nil, status.Errorf(codes.InvalidArgument, "grpc: compressor %q is not registered", name)
 208  		}
 209  		if _, dup := seen[name]; dup {
 210  			continue
 211  		}
 212  		seen[name] = struct{}{}
 213  		allowed = append(allowed, name)
 214  	}
 215  	return allowed, nil
 216  }
 217  
 218  // CallOption configures a Call before it starts or extracts information from
 219  // a Call after it completes.
 220  type CallOption interface {
 221  	// before is called before the call is sent to any server.  If before
 222  	// returns a non-nil error, the RPC fails with that error.
 223  	before(*callInfo) error
 224  
 225  	// after is called after the call has completed.  after cannot return an
 226  	// error, so any failures should be reported via output parameters.
 227  	after(*callInfo, *csAttempt)
 228  }
 229  
 230  // EmptyCallOption does not alter the Call configuration.
 231  // It can be embedded in another structure to carry satellite data for use
 232  // by interceptors.
 233  type EmptyCallOption struct{}
 234  
 235  func (EmptyCallOption) before(*callInfo) error      { return nil }
 236  func (EmptyCallOption) after(*callInfo, *csAttempt) {}
 237  
 238  // StaticMethod returns a CallOption which specifies that a call is being made
 239  // to a method that is static, which means the method is known at compile time
 240  // and doesn't change at runtime. This can be used as a signal to stats plugins
 241  // that this method is safe to include as a key to a measurement.
 242  func StaticMethod() CallOption {
 243  	return StaticMethodCallOption{}
 244  }
 245  
 246  // StaticMethodCallOption is a CallOption that specifies that a call comes
 247  // from a static method.
 248  type StaticMethodCallOption struct {
 249  	EmptyCallOption
 250  }
 251  
 252  // Header returns a CallOptions that retrieves the header metadata
 253  // for a unary RPC.
 254  func Header(md *metadata.MD) CallOption {
 255  	return HeaderCallOption{HeaderAddr: md}
 256  }
 257  
 258  // HeaderCallOption is a CallOption for collecting response header metadata.
 259  // The metadata field will be populated *after* the RPC completes.
 260  //
 261  // # Experimental
 262  //
 263  // Notice: This type is EXPERIMENTAL and may be changed or removed in a
 264  // later release.
 265  type HeaderCallOption struct {
 266  	HeaderAddr *metadata.MD
 267  }
 268  
 269  func (o HeaderCallOption) before(*callInfo) error { return nil }
 270  func (o HeaderCallOption) after(_ *callInfo, attempt *csAttempt) {
 271  	*o.HeaderAddr, _ = attempt.transportStream.Header()
 272  }
 273  
 274  // Trailer returns a CallOptions that retrieves the trailer metadata
 275  // for a unary RPC.
 276  func Trailer(md *metadata.MD) CallOption {
 277  	return TrailerCallOption{TrailerAddr: md}
 278  }
 279  
 280  // TrailerCallOption is a CallOption for collecting response trailer metadata.
 281  // The metadata field will be populated *after* the RPC completes.
 282  //
 283  // # Experimental
 284  //
 285  // Notice: This type is EXPERIMENTAL and may be changed or removed in a
 286  // later release.
 287  type TrailerCallOption struct {
 288  	TrailerAddr *metadata.MD
 289  }
 290  
 291  func (o TrailerCallOption) before(*callInfo) error { return nil }
 292  func (o TrailerCallOption) after(_ *callInfo, attempt *csAttempt) {
 293  	*o.TrailerAddr = attempt.transportStream.Trailer()
 294  }
 295  
 296  // Peer returns a CallOption that retrieves peer information for a unary RPC.
 297  // The peer field will be populated *after* the RPC completes.
 298  func Peer(p *peer.Peer) CallOption {
 299  	return PeerCallOption{PeerAddr: p}
 300  }
 301  
 302  // PeerCallOption is a CallOption for collecting the identity of the remote
 303  // peer. The peer field will be populated *after* the RPC completes.
 304  //
 305  // # Experimental
 306  //
 307  // Notice: This type is EXPERIMENTAL and may be changed or removed in a
 308  // later release.
 309  type PeerCallOption struct {
 310  	PeerAddr *peer.Peer
 311  }
 312  
 313  func (o PeerCallOption) before(*callInfo) error { return nil }
 314  func (o PeerCallOption) after(_ *callInfo, attempt *csAttempt) {
 315  	if x, ok := peer.FromContext(attempt.transportStream.Context()); ok {
 316  		*o.PeerAddr = *x
 317  	}
 318  }
 319  
 320  // WaitForReady configures the RPC's behavior when the client is in
 321  // TRANSIENT_FAILURE, which occurs when all addresses fail to connect.  If
 322  // waitForReady is false, the RPC will fail immediately.  Otherwise, the client
 323  // will wait until a connection becomes available or the RPC's deadline is
 324  // reached.
 325  //
 326  // By default, RPCs do not "wait for ready".
 327  func WaitForReady(waitForReady bool) CallOption {
 328  	return FailFastCallOption{FailFast: !waitForReady}
 329  }
 330  
 331  // FailFast is the opposite of WaitForReady.
 332  //
 333  // Deprecated: use WaitForReady.
 334  func FailFast(failFast bool) CallOption {
 335  	return FailFastCallOption{FailFast: failFast}
 336  }
 337  
 338  // FailFastCallOption is a CallOption for indicating whether an RPC should fail
 339  // fast or not.
 340  //
 341  // # Experimental
 342  //
 343  // Notice: This type is EXPERIMENTAL and may be changed or removed in a
 344  // later release.
 345  type FailFastCallOption struct {
 346  	FailFast bool
 347  }
 348  
 349  func (o FailFastCallOption) before(c *callInfo) error {
 350  	c.failFast = o.FailFast
 351  	return nil
 352  }
 353  func (o FailFastCallOption) after(*callInfo, *csAttempt) {}
 354  
 355  // OnFinish returns a CallOption that configures a callback to be called when
 356  // the call completes. The error passed to the callback is the status of the
 357  // RPC, and may be nil. The onFinish callback provided will only be called once
 358  // by gRPC. This is mainly used to be used by streaming interceptors, to be
 359  // notified when the RPC completes along with information about the status of
 360  // the RPC.
 361  //
 362  // # Experimental
 363  //
 364  // Notice: This API is EXPERIMENTAL and may be changed or removed in a
 365  // later release.
 366  func OnFinish(onFinish func(err error)) CallOption {
 367  	return OnFinishCallOption{
 368  		OnFinish: onFinish,
 369  	}
 370  }
 371  
 372  // OnFinishCallOption is CallOption that indicates a callback to be called when
 373  // the call completes.
 374  //
 375  // # Experimental
 376  //
 377  // Notice: This type is EXPERIMENTAL and may be changed or removed in a
 378  // later release.
 379  type OnFinishCallOption struct {
 380  	OnFinish func(error)
 381  }
 382  
 383  func (o OnFinishCallOption) before(c *callInfo) error {
 384  	c.onFinish = append(c.onFinish, o.OnFinish)
 385  	return nil
 386  }
 387  
 388  func (o OnFinishCallOption) after(*callInfo, *csAttempt) {}
 389  
 390  // MaxCallRecvMsgSize returns a CallOption which sets the maximum message size
 391  // in bytes the client can receive. If this is not set, gRPC uses the default
 392  // 4MB.
 393  func MaxCallRecvMsgSize(bytes int) CallOption {
 394  	return MaxRecvMsgSizeCallOption{MaxRecvMsgSize: bytes}
 395  }
 396  
 397  // MaxRecvMsgSizeCallOption is a CallOption that indicates the maximum message
 398  // size in bytes the client can receive.
 399  //
 400  // # Experimental
 401  //
 402  // Notice: This type is EXPERIMENTAL and may be changed or removed in a
 403  // later release.
 404  type MaxRecvMsgSizeCallOption struct {
 405  	MaxRecvMsgSize int
 406  }
 407  
 408  func (o MaxRecvMsgSizeCallOption) before(c *callInfo) error {
 409  	c.maxReceiveMessageSize = &o.MaxRecvMsgSize
 410  	return nil
 411  }
 412  func (o MaxRecvMsgSizeCallOption) after(*callInfo, *csAttempt) {}
 413  
 414  // CallAuthority returns a CallOption that sets the HTTP/2 :authority header of
 415  // an RPC to the specified value. When using CallAuthority, the credentials in
 416  // use must implement the AuthorityValidator interface.
 417  //
 418  // # Experimental
 419  //
 420  // Notice: This API is EXPERIMENTAL and may be changed or removed in a later
 421  // release.
 422  func CallAuthority(authority string) CallOption {
 423  	return AuthorityOverrideCallOption{Authority: authority}
 424  }
 425  
 426  // AuthorityOverrideCallOption is a CallOption that indicates the HTTP/2
 427  // :authority header value to use for the call.
 428  //
 429  // # Experimental
 430  //
 431  // Notice: This type is EXPERIMENTAL and may be changed or removed in a later
 432  // release.
 433  type AuthorityOverrideCallOption struct {
 434  	Authority string
 435  }
 436  
 437  func (o AuthorityOverrideCallOption) before(c *callInfo) error {
 438  	c.authority = o.Authority
 439  	return nil
 440  }
 441  
 442  func (o AuthorityOverrideCallOption) after(*callInfo, *csAttempt) {}
 443  
 444  // MaxCallSendMsgSize returns a CallOption which sets the maximum message size
 445  // in bytes the client can send. If this is not set, gRPC uses the default
 446  // `math.MaxInt32`.
 447  func MaxCallSendMsgSize(bytes int) CallOption {
 448  	return MaxSendMsgSizeCallOption{MaxSendMsgSize: bytes}
 449  }
 450  
 451  // MaxSendMsgSizeCallOption is a CallOption that indicates the maximum message
 452  // size in bytes the client can send.
 453  //
 454  // # Experimental
 455  //
 456  // Notice: This type is EXPERIMENTAL and may be changed or removed in a
 457  // later release.
 458  type MaxSendMsgSizeCallOption struct {
 459  	MaxSendMsgSize int
 460  }
 461  
 462  func (o MaxSendMsgSizeCallOption) before(c *callInfo) error {
 463  	c.maxSendMessageSize = &o.MaxSendMsgSize
 464  	return nil
 465  }
 466  func (o MaxSendMsgSizeCallOption) after(*callInfo, *csAttempt) {}
 467  
 468  // PerRPCCredentials returns a CallOption that sets credentials.PerRPCCredentials
 469  // for a call.
 470  func PerRPCCredentials(creds credentials.PerRPCCredentials) CallOption {
 471  	return PerRPCCredsCallOption{Creds: creds}
 472  }
 473  
 474  // PerRPCCredsCallOption is a CallOption that indicates the per-RPC
 475  // credentials to use for the call.
 476  //
 477  // # Experimental
 478  //
 479  // Notice: This type is EXPERIMENTAL and may be changed or removed in a
 480  // later release.
 481  type PerRPCCredsCallOption struct {
 482  	Creds credentials.PerRPCCredentials
 483  }
 484  
 485  func (o PerRPCCredsCallOption) before(c *callInfo) error {
 486  	c.creds = o.Creds
 487  	return nil
 488  }
 489  func (o PerRPCCredsCallOption) after(*callInfo, *csAttempt) {}
 490  
 491  // UseCompressor returns a CallOption which sets the compressor used when
 492  // sending the request.  If WithCompressor is also set, UseCompressor has
 493  // higher priority.
 494  //
 495  // # Experimental
 496  //
 497  // Notice: This API is EXPERIMENTAL and may be changed or removed in a
 498  // later release.
 499  func UseCompressor(name string) CallOption {
 500  	return CompressorCallOption{CompressorType: name}
 501  }
 502  
 503  // CompressorCallOption is a CallOption that indicates the compressor to use.
 504  //
 505  // # Experimental
 506  //
 507  // Notice: This type is EXPERIMENTAL and may be changed or removed in a
 508  // later release.
 509  type CompressorCallOption struct {
 510  	CompressorType string
 511  }
 512  
 513  func (o CompressorCallOption) before(c *callInfo) error {
 514  	c.compressorName = o.CompressorType
 515  	return nil
 516  }
 517  func (o CompressorCallOption) after(*callInfo, *csAttempt) {}
 518  
 519  // acceptCompressors returns a CallOption that limits the compression algorithms
 520  // advertised in the grpc-accept-encoding header for response messages.
 521  // Compression algorithms not in the provided list will not be advertised, and
 522  // responses compressed with non-listed algorithms will be rejected.
 523  func acceptCompressors(names ...string) CallOption {
 524  	cp := append([]string(nil), names...)
 525  	return acceptCompressorsCallOption{names: cp}
 526  }
 527  
 528  // acceptCompressorsCallOption is a CallOption that limits response compression.
 529  type acceptCompressorsCallOption struct {
 530  	names []string
 531  }
 532  
 533  func (o acceptCompressorsCallOption) before(c *callInfo) error {
 534  	allowed, err := newAcceptedCompressionConfig(o.names)
 535  	if err != nil {
 536  		return err
 537  	}
 538  	c.acceptedResponseCompressors = allowed
 539  	return nil
 540  }
 541  
 542  func (acceptCompressorsCallOption) after(*callInfo, *csAttempt) {}
 543  
 544  // CallContentSubtype returns a CallOption that will set the content-subtype
 545  // for a call. For example, if content-subtype is "json", the Content-Type over
 546  // the wire will be "application/grpc+json". The content-subtype is converted
 547  // to lowercase before being included in Content-Type. See Content-Type on
 548  // https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#requests for
 549  // more details.
 550  //
 551  // If ForceCodec is not also used, the content-subtype will be used to look up
 552  // the Codec to use in the registry controlled by RegisterCodec. See the
 553  // documentation on RegisterCodec for details on registration. The lookup of
 554  // content-subtype is case-insensitive. If no such Codec is found, the call
 555  // will result in an error with code codes.Internal.
 556  //
 557  // If ForceCodec is also used, that Codec will be used for all request and
 558  // response messages, with the content-subtype set to the given contentSubtype
 559  // here for requests.
 560  func CallContentSubtype(contentSubtype string) CallOption {
 561  	return ContentSubtypeCallOption{ContentSubtype: strings.ToLower(contentSubtype)}
 562  }
 563  
 564  // ContentSubtypeCallOption is a CallOption that indicates the content-subtype
 565  // used for marshaling messages.
 566  //
 567  // # Experimental
 568  //
 569  // Notice: This type is EXPERIMENTAL and may be changed or removed in a
 570  // later release.
 571  type ContentSubtypeCallOption struct {
 572  	ContentSubtype string
 573  }
 574  
 575  func (o ContentSubtypeCallOption) before(c *callInfo) error {
 576  	c.contentSubtype = o.ContentSubtype
 577  	return nil
 578  }
 579  func (o ContentSubtypeCallOption) after(*callInfo, *csAttempt) {}
 580  
 581  // ForceCodec returns a CallOption that will set codec to be used for all
 582  // request and response messages for a call. The result of calling Name() will
 583  // be used as the content-subtype after converting to lowercase, unless
 584  // CallContentSubtype is also used.
 585  //
 586  // See Content-Type on
 587  // https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#requests for
 588  // more details. Also see the documentation on RegisterCodec and
 589  // CallContentSubtype for more details on the interaction between Codec and
 590  // content-subtype.
 591  //
 592  // This function is provided for advanced users; prefer to use only
 593  // CallContentSubtype to select a registered codec instead.
 594  //
 595  // # Experimental
 596  //
 597  // Notice: This API is EXPERIMENTAL and may be changed or removed in a
 598  // later release.
 599  func ForceCodec(codec encoding.Codec) CallOption {
 600  	return ForceCodecCallOption{Codec: codec}
 601  }
 602  
 603  // ForceCodecCallOption is a CallOption that indicates the codec used for
 604  // marshaling messages.
 605  //
 606  // # Experimental
 607  //
 608  // Notice: This type is EXPERIMENTAL and may be changed or removed in a
 609  // later release.
 610  type ForceCodecCallOption struct {
 611  	Codec encoding.Codec
 612  }
 613  
 614  func (o ForceCodecCallOption) before(c *callInfo) error {
 615  	c.codec = newCodecV1Bridge(o.Codec)
 616  	return nil
 617  }
 618  func (o ForceCodecCallOption) after(*callInfo, *csAttempt) {}
 619  
 620  // ForceCodecV2 returns a CallOption that will set codec to be used for all
 621  // request and response messages for a call. The result of calling Name() will
 622  // be used as the content-subtype after converting to lowercase, unless
 623  // CallContentSubtype is also used.
 624  //
 625  // See Content-Type on
 626  // https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#requests for
 627  // more details. Also see the documentation on RegisterCodec and
 628  // CallContentSubtype for more details on the interaction between Codec and
 629  // content-subtype.
 630  //
 631  // This function is provided for advanced users; prefer to use only
 632  // CallContentSubtype to select a registered codec instead.
 633  //
 634  // # Experimental
 635  //
 636  // Notice: This API is EXPERIMENTAL and may be changed or removed in a
 637  // later release.
 638  func ForceCodecV2(codec encoding.CodecV2) CallOption {
 639  	return ForceCodecV2CallOption{CodecV2: codec}
 640  }
 641  
 642  // ForceCodecV2CallOption is a CallOption that indicates the codec used for
 643  // marshaling messages.
 644  //
 645  // # Experimental
 646  //
 647  // Notice: This type is EXPERIMENTAL and may be changed or removed in a
 648  // later release.
 649  type ForceCodecV2CallOption struct {
 650  	CodecV2 encoding.CodecV2
 651  }
 652  
 653  func (o ForceCodecV2CallOption) before(c *callInfo) error {
 654  	c.codec = o.CodecV2
 655  	return nil
 656  }
 657  
 658  func (o ForceCodecV2CallOption) after(*callInfo, *csAttempt) {}
 659  
 660  // CallCustomCodec behaves like ForceCodec, but accepts a grpc.Codec instead of
 661  // an encoding.Codec.
 662  //
 663  // Deprecated: use ForceCodec instead.
 664  func CallCustomCodec(codec Codec) CallOption {
 665  	return CustomCodecCallOption{Codec: codec}
 666  }
 667  
 668  // CustomCodecCallOption is a CallOption that indicates the codec used for
 669  // marshaling messages.
 670  //
 671  // # Experimental
 672  //
 673  // Notice: This type is EXPERIMENTAL and may be changed or removed in a
 674  // later release.
 675  type CustomCodecCallOption struct {
 676  	Codec Codec
 677  }
 678  
 679  func (o CustomCodecCallOption) before(c *callInfo) error {
 680  	c.codec = newCodecV0Bridge(o.Codec)
 681  	return nil
 682  }
 683  func (o CustomCodecCallOption) after(*callInfo, *csAttempt) {}
 684  
 685  // MaxRetryRPCBufferSize returns a CallOption that limits the amount of memory
 686  // used for buffering this RPC's requests for retry purposes.
 687  //
 688  // # Experimental
 689  //
 690  // Notice: This API is EXPERIMENTAL and may be changed or removed in a
 691  // later release.
 692  func MaxRetryRPCBufferSize(bytes int) CallOption {
 693  	return MaxRetryRPCBufferSizeCallOption{bytes}
 694  }
 695  
 696  // MaxRetryRPCBufferSizeCallOption is a CallOption indicating the amount of
 697  // memory to be used for caching this RPC for retry purposes.
 698  //
 699  // # Experimental
 700  //
 701  // Notice: This type is EXPERIMENTAL and may be changed or removed in a
 702  // later release.
 703  type MaxRetryRPCBufferSizeCallOption struct {
 704  	MaxRetryRPCBufferSize int
 705  }
 706  
 707  func (o MaxRetryRPCBufferSizeCallOption) before(c *callInfo) error {
 708  	c.maxRetryRPCBufferSize = o.MaxRetryRPCBufferSize
 709  	return nil
 710  }
 711  func (o MaxRetryRPCBufferSizeCallOption) after(*callInfo, *csAttempt) {}
 712  
 713  // The format of the payload: compressed or not?
 714  type payloadFormat uint8
 715  
 716  const (
 717  	compressionNone payloadFormat = 0 // no compression
 718  	compressionMade payloadFormat = 1 // compressed
 719  )
 720  
 721  func (pf payloadFormat) isCompressed() bool {
 722  	return pf == compressionMade
 723  }
 724  
 725  type streamReader interface {
 726  	ReadMessageHeader(header []byte) error
 727  	Read(n int) (mem.BufferSlice, error)
 728  }
 729  
 730  // noCopy may be embedded into structs which must not be copied
 731  // after the first use.
 732  //
 733  // See https://golang.org/issues/8005#issuecomment-190753527
 734  // for details.
 735  type noCopy struct {
 736  }
 737  
 738  func (*noCopy) Lock()   {}
 739  func (*noCopy) Unlock() {}
 740  
 741  // parser reads complete gRPC messages from the underlying reader.
 742  type parser struct {
 743  	_ noCopy
 744  	// r is the underlying reader.
 745  	// See the comment on recvMsg for the permissible
 746  	// error types.
 747  	r streamReader
 748  
 749  	// The header of a gRPC message. Find more detail at
 750  	// https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md
 751  	header [5]byte
 752  
 753  	// bufferPool is the pool of shared receive buffers.
 754  	bufferPool mem.BufferPool
 755  }
 756  
 757  // recvMsg reads a complete gRPC message from the stream.
 758  //
 759  // It returns the message and its payload (compression/encoding)
 760  // format. The caller owns the returned msg memory.
 761  //
 762  // If there is an error, possible values are:
 763  //   - io.EOF, when no messages remain
 764  //   - io.ErrUnexpectedEOF
 765  //   - of type transport.ConnectionError
 766  //   - an error from the status package
 767  //
 768  // No other error values or types must be returned, which also means
 769  // that the underlying streamReader must not return an incompatible
 770  // error.
 771  func (p *parser) recvMsg(maxReceiveMessageSize int) (payloadFormat, mem.BufferSlice, error) {
 772  	err := p.r.ReadMessageHeader(p.header[:])
 773  	if err != nil {
 774  		return 0, nil, err
 775  	}
 776  
 777  	pf := payloadFormat(p.header[0])
 778  	length := binary.BigEndian.Uint32(p.header[1:])
 779  
 780  	if int64(length) > int64(maxInt) {
 781  		return 0, nil, status.Errorf(codes.ResourceExhausted, "grpc: received message larger than max length allowed on current machine (%d vs. %d)", length, maxInt)
 782  	}
 783  	if int(length) > maxReceiveMessageSize {
 784  		return 0, nil, status.Errorf(codes.ResourceExhausted, "grpc: received message larger than max (%d vs. %d)", length, maxReceiveMessageSize)
 785  	}
 786  
 787  	data, err := p.r.Read(int(length))
 788  	if err != nil {
 789  		if err == io.EOF {
 790  			err = io.ErrUnexpectedEOF
 791  		}
 792  		return 0, nil, err
 793  	}
 794  	return pf, data, nil
 795  }
 796  
 797  // encode serializes msg and returns a buffer containing the message, or an
 798  // error if it is too large to be transmitted by grpc.  If msg is nil, it
 799  // generates an empty message.
 800  func encode(c baseCodec, msg any) (mem.BufferSlice, error) {
 801  	if msg == nil { // NOTE: typed nils will not be caught by this check
 802  		return nil, nil
 803  	}
 804  	b, err := c.Marshal(msg)
 805  	if err != nil {
 806  		return nil, status.Errorf(codes.Internal, "grpc: error while marshaling: %v", err.Error())
 807  	}
 808  	if bufSize := uint(b.Len()); bufSize > math.MaxUint32 {
 809  		b.Free()
 810  		return nil, status.Errorf(codes.ResourceExhausted, "grpc: message too large (%d bytes)", bufSize)
 811  	}
 812  	return b, nil
 813  }
 814  
 815  // compress returns the input bytes compressed by compressor or cp.
 816  // If both compressors are nil, or if the message has zero length, returns nil,
 817  // indicating no compression was done.
 818  //
 819  // TODO(dfawley): eliminate cp parameter by wrapping Compressor in an encoding.Compressor.
 820  func compress(in mem.BufferSlice, cp Compressor, compressor encoding.Compressor, pool mem.BufferPool) (mem.BufferSlice, payloadFormat, error) {
 821  	if (compressor == nil && cp == nil) || in.Len() == 0 {
 822  		return nil, compressionNone, nil
 823  	}
 824  	var out mem.BufferSlice
 825  	w := mem.NewWriter(&out, pool)
 826  	wrapErr := func(err error) error {
 827  		out.Free()
 828  		return status.Errorf(codes.Internal, "grpc: error while compressing: %v", err.Error())
 829  	}
 830  	if compressor != nil {
 831  		z, err := compressor.Compress(w)
 832  		if err != nil {
 833  			return nil, 0, wrapErr(err)
 834  		}
 835  		for _, b := range in {
 836  			if _, err := z.Write(b.ReadOnlyData()); err != nil {
 837  				return nil, 0, wrapErr(err)
 838  			}
 839  		}
 840  		if err := z.Close(); err != nil {
 841  			return nil, 0, wrapErr(err)
 842  		}
 843  	} else {
 844  		// This is obviously really inefficient since it fully materializes the data, but
 845  		// there is no way around this with the old Compressor API. At least it attempts
 846  		// to return the buffer to the provider, in the hopes it can be reused (maybe
 847  		// even by a subsequent call to this very function).
 848  		buf := in.MaterializeToBuffer(pool)
 849  		defer buf.Free()
 850  		if err := cp.Do(w, buf.ReadOnlyData()); err != nil {
 851  			return nil, 0, wrapErr(err)
 852  		}
 853  	}
 854  	return out, compressionMade, nil
 855  }
 856  
 857  const (
 858  	payloadLen = 1
 859  	sizeLen    = 4
 860  	headerLen  = payloadLen + sizeLen
 861  )
 862  
 863  // msgHeader returns a 5-byte header for the message being transmitted and the
 864  // payload, which is compData if non-nil or data otherwise.
 865  func msgHeader(data, compData mem.BufferSlice, pf payloadFormat) (hdr []byte, payload mem.BufferSlice) {
 866  	hdr = make([]byte, headerLen)
 867  	hdr[0] = byte(pf)
 868  
 869  	var length uint32
 870  	if pf.isCompressed() {
 871  		length = uint32(compData.Len())
 872  		payload = compData
 873  	} else {
 874  		length = uint32(data.Len())
 875  		payload = data
 876  	}
 877  
 878  	// Write length of payload into buf
 879  	binary.BigEndian.PutUint32(hdr[payloadLen:], length)
 880  	return hdr, payload
 881  }
 882  
 883  func outPayload(client bool, msg any, dataLength, payloadLength int, t time.Time) *stats.OutPayload {
 884  	return &stats.OutPayload{
 885  		Client:           client,
 886  		Payload:          msg,
 887  		Length:           dataLength,
 888  		WireLength:       payloadLength + headerLen,
 889  		CompressedLength: payloadLength,
 890  		SentTime:         t,
 891  	}
 892  }
 893  
 894  func checkRecvPayload(pf payloadFormat, recvCompress string, haveCompressor bool, isServer bool) *status.Status {
 895  	switch pf {
 896  	case compressionNone:
 897  	case compressionMade:
 898  		if recvCompress == "" || recvCompress == encoding.Identity {
 899  			return status.New(codes.Internal, "grpc: compressed flag set with identity or empty encoding")
 900  		}
 901  		if !haveCompressor {
 902  			if isServer {
 903  				return status.Newf(codes.Unimplemented, "grpc: Decompressor is not installed for grpc-encoding %q", recvCompress)
 904  			}
 905  			return status.Newf(codes.Internal, "grpc: Decompressor is not installed for grpc-encoding %q", recvCompress)
 906  		}
 907  	default:
 908  		return status.Newf(codes.Internal, "grpc: received unexpected payload format %d", pf)
 909  	}
 910  	return nil
 911  }
 912  
 913  type payloadInfo struct {
 914  	compressedLength  int // The compressed length got from wire.
 915  	uncompressedBytes mem.BufferSlice
 916  }
 917  
 918  func (p *payloadInfo) free() {
 919  	if p != nil && p.uncompressedBytes != nil {
 920  		p.uncompressedBytes.Free()
 921  	}
 922  }
 923  
 924  // recvAndDecompress reads a message from the stream, decompressing it if necessary.
 925  //
 926  // Cancelling the returned cancel function releases the buffer back to the pool. So the caller should cancel as soon as
 927  // the buffer is no longer needed.
 928  // TODO: Refactor this function to reduce the number of arguments.
 929  // See: https://google.github.io/styleguide/go/best-practices.html#function-argument-lists
 930  func recvAndDecompress(p *parser, s recvCompressor, dc Decompressor, maxReceiveMessageSize int, payInfo *payloadInfo, compressor encoding.Compressor, isServer bool) (out mem.BufferSlice, err error) {
 931  	pf, compressed, err := p.recvMsg(maxReceiveMessageSize)
 932  	if err != nil {
 933  		return nil, err
 934  	}
 935  
 936  	compressedLength := compressed.Len()
 937  
 938  	if st := checkRecvPayload(pf, s.RecvCompress(), compressor != nil || dc != nil, isServer); st != nil {
 939  		compressed.Free()
 940  		return nil, st.Err()
 941  	}
 942  
 943  	if pf.isCompressed() {
 944  		defer compressed.Free()
 945  		// To match legacy behavior, if the decompressor is set by WithDecompressor or RPCDecompressor,
 946  		// use this decompressor as the default.
 947  		out, err = decompress(compressor, compressed, dc, maxReceiveMessageSize, p.bufferPool)
 948  		if err != nil {
 949  			return nil, err
 950  		}
 951  	} else {
 952  		out = compressed
 953  	}
 954  
 955  	if payInfo != nil {
 956  		payInfo.compressedLength = compressedLength
 957  		out.Ref()
 958  		payInfo.uncompressedBytes = out
 959  	}
 960  
 961  	return out, nil
 962  }
 963  
 964  // decompress processes the given data by decompressing it using either a custom decompressor or a standard compressor.
 965  // If a custom decompressor is provided, it takes precedence. The function validates that the decompressed data
 966  // does not exceed the specified maximum size and returns an error if this limit is exceeded.
 967  // On success, it returns the decompressed data. Otherwise, it returns an error if decompression fails or the data exceeds the size limit.
 968  func decompress(compressor encoding.Compressor, d mem.BufferSlice, dc Decompressor, maxReceiveMessageSize int, pool mem.BufferPool) (mem.BufferSlice, error) {
 969  	if dc != nil {
 970  		uncompressed, err := dc.Do(d.Reader())
 971  		if err != nil {
 972  			return nil, status.Errorf(codes.Internal, "grpc: failed to decompress the received message: %v", err)
 973  		}
 974  		if len(uncompressed) > maxReceiveMessageSize {
 975  			return nil, status.Errorf(codes.ResourceExhausted, "grpc: message after decompression larger than max (%d vs. %d)", len(uncompressed), maxReceiveMessageSize)
 976  		}
 977  		return mem.BufferSlice{mem.SliceBuffer(uncompressed)}, nil
 978  	}
 979  	if compressor != nil {
 980  		dcReader, err := compressor.Decompress(d.Reader())
 981  		if err != nil {
 982  			return nil, status.Errorf(codes.Internal, "grpc: failed to decompress the message: %v", err)
 983  		}
 984  
 985  		// Read at most one byte more than the limit from the decompressor.
 986  		// Unless the limit is MaxInt64, in which case, that's impossible, so
 987  		// apply no limit.
 988  		if limit := int64(maxReceiveMessageSize); limit < math.MaxInt64 {
 989  			dcReader = io.LimitReader(dcReader, limit+1)
 990  		}
 991  		out, err := mem.ReadAll(dcReader, pool)
 992  		if err != nil {
 993  			out.Free()
 994  			return nil, status.Errorf(codes.Internal, "grpc: failed to read decompressed data: %v", err)
 995  		}
 996  
 997  		if out.Len() > maxReceiveMessageSize {
 998  			out.Free()
 999  			return nil, status.Errorf(codes.ResourceExhausted, "grpc: received message after decompression larger than max %d", maxReceiveMessageSize)
1000  		}
1001  		return out, nil
1002  	}
1003  	return nil, status.Errorf(codes.Internal, "grpc: no decompressor available for compressed payload")
1004  }
1005  
1006  type recvCompressor interface {
1007  	RecvCompress() string
1008  }
1009  
1010  // For the two compressor parameters, both should not be set, but if they are,
1011  // dc takes precedence over compressor.
1012  // TODO(dfawley): wrap the old compressor/decompressor using the new API?
1013  func recv(p *parser, c baseCodec, s recvCompressor, dc Decompressor, m any, maxReceiveMessageSize int, payInfo *payloadInfo, compressor encoding.Compressor, isServer bool) error {
1014  	data, err := recvAndDecompress(p, s, dc, maxReceiveMessageSize, payInfo, compressor, isServer)
1015  	if err != nil {
1016  		return err
1017  	}
1018  
1019  	// If the codec wants its own reference to the data, it can get it. Otherwise, always
1020  	// free the buffers.
1021  	defer data.Free()
1022  
1023  	if err := c.Unmarshal(data, m); err != nil {
1024  		return status.Errorf(codes.Internal, "grpc: failed to unmarshal the received message: %v", err)
1025  	}
1026  
1027  	return nil
1028  }
1029  
1030  // Information about RPC
1031  type rpcInfo struct {
1032  	failfast      bool
1033  	preloaderInfo compressorInfo
1034  }
1035  
1036  // Information about Preloader
1037  // Responsible for storing codec, and compressors
1038  // If stream (s) has  context s.Context which stores rpcInfo that has non nil
1039  // pointers to codec, and compressors, then we can use preparedMsg for Async message prep
1040  // and reuse marshalled bytes
1041  type compressorInfo struct {
1042  	codec baseCodec
1043  	cp    Compressor
1044  	comp  encoding.Compressor
1045  }
1046  
1047  type rpcInfoContextKey struct{}
1048  
1049  func newContextWithRPCInfo(ctx context.Context, failfast bool, codec baseCodec, cp Compressor, comp encoding.Compressor) context.Context {
1050  	return context.WithValue(ctx, rpcInfoContextKey{}, &rpcInfo{
1051  		failfast: failfast,
1052  		preloaderInfo: compressorInfo{
1053  			codec: codec,
1054  			cp:    cp,
1055  			comp:  comp,
1056  		},
1057  	})
1058  }
1059  
1060  func rpcInfoFromContext(ctx context.Context) (s *rpcInfo, ok bool) {
1061  	s, ok = ctx.Value(rpcInfoContextKey{}).(*rpcInfo)
1062  	return
1063  }
1064  
1065  // Code returns the error code for err if it was produced by the rpc system.
1066  // Otherwise, it returns codes.Unknown.
1067  //
1068  // Deprecated: use status.Code instead.
1069  func Code(err error) codes.Code {
1070  	return status.Code(err)
1071  }
1072  
1073  // ErrorDesc returns the error description of err if it was produced by the rpc system.
1074  // Otherwise, it returns err.Error() or empty string when err is nil.
1075  //
1076  // Deprecated: use status.Convert and Message method instead.
1077  func ErrorDesc(err error) string {
1078  	return status.Convert(err).Message()
1079  }
1080  
1081  // Errorf returns an error containing an error code and a description;
1082  // Errorf returns nil if c is OK.
1083  //
1084  // Deprecated: use status.Errorf instead.
1085  func Errorf(c codes.Code, format string, a ...any) error {
1086  	return status.Errorf(c, format, a...)
1087  }
1088  
1089  var errContextCanceled = status.Error(codes.Canceled, context.Canceled.Error())
1090  var errContextDeadline = status.Error(codes.DeadlineExceeded, context.DeadlineExceeded.Error())
1091  
1092  // toRPCErr converts an error into an error from the status package.
1093  func toRPCErr(err error) error {
1094  	switch err {
1095  	case nil, io.EOF:
1096  		return err
1097  	case context.DeadlineExceeded:
1098  		return errContextDeadline
1099  	case context.Canceled:
1100  		return errContextCanceled
1101  	case io.ErrUnexpectedEOF:
1102  		return status.Error(codes.Internal, err.Error())
1103  	}
1104  
1105  	switch e := err.(type) {
1106  	case transport.ConnectionError:
1107  		return status.Error(codes.Unavailable, e.Desc)
1108  	case *transport.NewStreamError:
1109  		return toRPCErr(e.Err)
1110  	}
1111  
1112  	if _, ok := status.FromError(err); ok {
1113  		return err
1114  	}
1115  
1116  	return status.Error(codes.Unknown, err.Error())
1117  }
1118  
1119  // setCallInfoCodec should only be called after CallOptions have been applied.
1120  func setCallInfoCodec(c *callInfo) error {
1121  	if c.codec != nil {
1122  		// codec was already set by a CallOption; use it, but set the content
1123  		// subtype if it is not set.
1124  		if c.contentSubtype == "" {
1125  			// c.codec is a baseCodec to hide the difference between grpc.Codec and
1126  			// encoding.Codec (Name vs. String method name).  We only support
1127  			// setting content subtype from encoding.Codec to avoid a behavior
1128  			// change with the deprecated version.
1129  			if ec, ok := c.codec.(encoding.CodecV2); ok {
1130  				c.contentSubtype = strings.ToLower(ec.Name())
1131  			}
1132  		}
1133  		return nil
1134  	}
1135  
1136  	if c.contentSubtype == "" {
1137  		// No codec specified in CallOptions; use proto by default.
1138  		c.codec = getCodec(proto.Name)
1139  		return nil
1140  	}
1141  
1142  	// c.contentSubtype is already lowercased in CallContentSubtype
1143  	c.codec = getCodec(c.contentSubtype)
1144  	if c.codec == nil {
1145  		return status.Errorf(codes.Internal, "no codec registered for content-subtype %s", c.contentSubtype)
1146  	}
1147  	return nil
1148  }
1149  
1150  // The SupportPackageIsVersion variables are referenced from generated protocol
1151  // buffer files to ensure compatibility with the gRPC version used.  The latest
1152  // support package version is 9.
1153  //
1154  // Older versions are kept for compatibility.
1155  //
1156  // These constants should not be referenced from any other code.
1157  const (
1158  	SupportPackageIsVersion3 = true
1159  	SupportPackageIsVersion4 = true
1160  	SupportPackageIsVersion5 = true
1161  	SupportPackageIsVersion6 = true
1162  	SupportPackageIsVersion7 = true
1163  	SupportPackageIsVersion8 = true
1164  	SupportPackageIsVersion9 = true
1165  )
1166  
1167  const grpcUA = "grpc-go/" + Version
1168