stream.go raw

   1  /*
   2   *
   3   * Copyright 2014 gRPC authors.
   4   *
   5   * Licensed under the Apache License, Version 2.0 (the "License");
   6   * you may not use this file except in compliance with the License.
   7   * You may obtain a copy of the License at
   8   *
   9   *     http://www.apache.org/licenses/LICENSE-2.0
  10   *
  11   * Unless required by applicable law or agreed to in writing, software
  12   * distributed under the License is distributed on an "AS IS" BASIS,
  13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14   * See the License for the specific language governing permissions and
  15   * limitations under the License.
  16   *
  17   */
  18  
  19  package grpc
  20  
  21  import (
  22  	"context"
  23  	"errors"
  24  	"io"
  25  	"math"
  26  	rand "math/rand/v2"
  27  	"strconv"
  28  	"strings"
  29  	"sync"
  30  	"time"
  31  
  32  	"google.golang.org/grpc/balancer"
  33  	"google.golang.org/grpc/codes"
  34  	"google.golang.org/grpc/encoding"
  35  	"google.golang.org/grpc/internal"
  36  	"google.golang.org/grpc/internal/balancerload"
  37  	"google.golang.org/grpc/internal/binarylog"
  38  	"google.golang.org/grpc/internal/channelz"
  39  	"google.golang.org/grpc/internal/grpcutil"
  40  	imetadata "google.golang.org/grpc/internal/metadata"
  41  	iresolver "google.golang.org/grpc/internal/resolver"
  42  	"google.golang.org/grpc/internal/serviceconfig"
  43  	istatus "google.golang.org/grpc/internal/status"
  44  	"google.golang.org/grpc/internal/transport"
  45  	"google.golang.org/grpc/mem"
  46  	"google.golang.org/grpc/metadata"
  47  	"google.golang.org/grpc/peer"
  48  	"google.golang.org/grpc/stats"
  49  	"google.golang.org/grpc/status"
  50  )
  51  
  52  var metadataFromOutgoingContextRaw = internal.FromOutgoingContextRaw.(func(context.Context) (metadata.MD, [][]string, bool))
  53  
  54  // StreamHandler defines the handler called by gRPC server to complete the
  55  // execution of a streaming RPC.
  56  //
  57  // If a StreamHandler returns an error, it should either be produced by the
  58  // status package, or be one of the context errors. Otherwise, gRPC will use
  59  // codes.Unknown as the status code and err.Error() as the status message of the
  60  // RPC.
  61  type StreamHandler func(srv any, stream ServerStream) error
  62  
  63  // StreamDesc represents a streaming RPC service's method specification.  Used
  64  // on the server when registering services and on the client when initiating
  65  // new streams.
  66  type StreamDesc struct {
  67  	// StreamName and Handler are only used when registering handlers on a
  68  	// server.
  69  	StreamName string        // the name of the method excluding the service
  70  	Handler    StreamHandler // the handler called for the method
  71  
  72  	// ServerStreams and ClientStreams are used for registering handlers on a
  73  	// server as well as defining RPC behavior when passed to NewClientStream
  74  	// and ClientConn.NewStream.  At least one must be true.
  75  	ServerStreams bool // indicates the server can perform streaming sends
  76  	ClientStreams bool // indicates the client can perform streaming sends
  77  }
  78  
  79  // Stream defines the common interface a client or server stream has to satisfy.
  80  //
  81  // Deprecated: See ClientStream and ServerStream documentation instead.
  82  type Stream interface {
  83  	// Deprecated: See ClientStream and ServerStream documentation instead.
  84  	Context() context.Context
  85  	// Deprecated: See ClientStream and ServerStream documentation instead.
  86  	SendMsg(m any) error
  87  	// Deprecated: See ClientStream and ServerStream documentation instead.
  88  	RecvMsg(m any) error
  89  }
  90  
  91  // ClientStream defines the client-side behavior of a streaming RPC.
  92  //
  93  // All errors returned from ClientStream methods are compatible with the
  94  // status package.
  95  type ClientStream interface {
  96  	// Header returns the header metadata received from the server if there
  97  	// is any. It blocks if the metadata is not ready to read.  If the metadata
  98  	// is nil and the error is also nil, then the stream was terminated without
  99  	// headers, and the status can be discovered by calling RecvMsg.
 100  	Header() (metadata.MD, error)
 101  	// Trailer returns the trailer metadata from the server, if there is any.
 102  	// It must only be called after stream.CloseAndRecv has returned, or
 103  	// stream.Recv has returned a non-nil error (including io.EOF).
 104  	Trailer() metadata.MD
 105  	// CloseSend closes the send direction of the stream. This method always
 106  	// returns a nil error. The status of the stream may be discovered using
 107  	// RecvMsg. It is also not safe to call CloseSend concurrently with SendMsg.
 108  	CloseSend() error
 109  	// Context returns the context for this stream.
 110  	//
 111  	// It should not be called until after Header or RecvMsg has returned. Once
 112  	// called, subsequent client-side retries are disabled.
 113  	Context() context.Context
 114  	// SendMsg is generally called by generated code. On error, SendMsg aborts
 115  	// the stream. If the error was generated by the client, the status is
 116  	// returned directly; otherwise, io.EOF is returned and the status of
 117  	// the stream may be discovered using RecvMsg. For unary or server-streaming
 118  	// RPCs (StreamDesc.ClientStreams is false), a nil error is returned
 119  	// unconditionally.
 120  	//
 121  	// SendMsg blocks until:
 122  	//   - There is sufficient flow control to schedule m with the transport, or
 123  	//   - The stream is done, or
 124  	//   - The stream breaks.
 125  	//
 126  	// SendMsg does not wait until the message is received by the server. An
 127  	// untimely stream closure may result in lost messages. To ensure delivery,
 128  	// users should ensure the RPC completed successfully using RecvMsg.
 129  	//
 130  	// It is safe to have a goroutine calling SendMsg and another goroutine
 131  	// calling RecvMsg on the same stream at the same time, but it is not safe
 132  	// to call SendMsg on the same stream in different goroutines. It is also
 133  	// not safe to call CloseSend concurrently with SendMsg.
 134  	//
 135  	// It is not safe to modify the message after calling SendMsg. Tracing
 136  	// libraries and stats handlers may use the message lazily.
 137  	SendMsg(m any) error
 138  	// RecvMsg blocks until it receives a message into m or the stream is
 139  	// done. It returns io.EOF when the stream completes successfully. On
 140  	// any other error, the stream is aborted and the error contains the RPC
 141  	// status.
 142  	//
 143  	// It is safe to have a goroutine calling SendMsg and another goroutine
 144  	// calling RecvMsg on the same stream at the same time, but it is not
 145  	// safe to call RecvMsg on the same stream in different goroutines.
 146  	RecvMsg(m any) error
 147  }
 148  
 149  // NewStream creates a new Stream for the client side. This is typically
 150  // called by generated code. ctx is used for the lifetime of the stream.
 151  //
 152  // To ensure resources are not leaked due to the stream returned, one of the following
 153  // actions must be performed:
 154  //
 155  //  1. Call Close on the ClientConn.
 156  //  2. Cancel the context provided.
 157  //  3. Call RecvMsg until a non-nil error is returned. A protobuf-generated
 158  //     client-streaming RPC, for instance, might use the helper function
 159  //     CloseAndRecv (note that CloseSend does not Recv, therefore is not
 160  //     guaranteed to release all resources).
 161  //  4. Receive a non-nil, non-io.EOF error from Header or SendMsg.
 162  //
 163  // If none of the above happen, a goroutine and a context will be leaked, and grpc
 164  // will not call the optionally-configured stats handler with a stats.End message.
 165  func (cc *ClientConn) NewStream(ctx context.Context, desc *StreamDesc, method string, opts ...CallOption) (ClientStream, error) {
 166  	// allow interceptor to see all applicable call options, which means those
 167  	// configured as defaults from dial option as well as per-call options
 168  	opts = combine(cc.dopts.callOptions, opts)
 169  
 170  	if cc.dopts.streamInt != nil {
 171  		return cc.dopts.streamInt(ctx, desc, cc, method, newClientStream, opts...)
 172  	}
 173  	return newClientStream(ctx, desc, cc, method, opts...)
 174  }
 175  
 176  // NewClientStream is a wrapper for ClientConn.NewStream.
 177  func NewClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (ClientStream, error) {
 178  	return cc.NewStream(ctx, desc, method, opts...)
 179  }
 180  
 181  var emptyMethodConfig = serviceconfig.MethodConfig{}
 182  
 183  // endOfClientStream performs cleanup actions required for both successful and
 184  // failed streams. This includes incrementing channelz stats and invoking all
 185  // registered OnFinish call options.
 186  func endOfClientStream(cc *ClientConn, err error, opts ...CallOption) {
 187  	if channelz.IsOn() {
 188  		if err != nil {
 189  			cc.incrCallsFailed()
 190  		} else {
 191  			cc.incrCallsSucceeded()
 192  		}
 193  	}
 194  
 195  	for _, o := range opts {
 196  		if o, ok := o.(OnFinishCallOption); ok {
 197  			o.OnFinish(err)
 198  		}
 199  	}
 200  }
 201  
 202  func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (_ ClientStream, err error) {
 203  	if channelz.IsOn() {
 204  		cc.incrCallsStarted()
 205  	}
 206  	defer func() {
 207  		if err != nil {
 208  			// Ensure cleanup when stream creation fails.
 209  			endOfClientStream(cc, err, opts...)
 210  		}
 211  	}()
 212  
 213  	// Start tracking the RPC for idleness purposes. This is where a stream is
 214  	// created for both streaming and unary RPCs, and hence is a good place to
 215  	// track active RPC count.
 216  	cc.idlenessMgr.OnCallBegin()
 217  
 218  	// Add a calloption, to decrement the active call count, that gets executed
 219  	// when the RPC completes.
 220  	opts = append([]CallOption{OnFinish(func(error) { cc.idlenessMgr.OnCallEnd() })}, opts...)
 221  
 222  	if md, added, ok := metadataFromOutgoingContextRaw(ctx); ok {
 223  		// validate md
 224  		if err := imetadata.Validate(md); err != nil {
 225  			return nil, status.Error(codes.Internal, err.Error())
 226  		}
 227  		// validate added
 228  		for _, kvs := range added {
 229  			for i := 0; i < len(kvs); i += 2 {
 230  				if err := imetadata.ValidatePair(kvs[i], kvs[i+1]); err != nil {
 231  					return nil, status.Error(codes.Internal, err.Error())
 232  				}
 233  			}
 234  		}
 235  	}
 236  	// Provide an opportunity for the first RPC to see the first service config
 237  	// provided by the resolver.
 238  	nameResolutionDelayed, err := cc.waitForResolvedAddrs(ctx)
 239  	if err != nil {
 240  		return nil, err
 241  	}
 242  
 243  	mc := &emptyMethodConfig
 244  	var onCommit func()
 245  	newStream := func(ctx context.Context, done func()) (iresolver.ClientStream, error) {
 246  		return newClientStreamWithParams(ctx, desc, cc, method, mc, onCommit, done, nameResolutionDelayed, opts...)
 247  	}
 248  
 249  	rpcInfo := iresolver.RPCInfo{Context: ctx, Method: method}
 250  	rpcConfig, err := cc.safeConfigSelector.SelectConfig(rpcInfo)
 251  	if err != nil {
 252  		if st, ok := status.FromError(err); ok {
 253  			// Restrict the code to the list allowed by gRFC A54.
 254  			if istatus.IsRestrictedControlPlaneCode(st) {
 255  				err = status.Errorf(codes.Internal, "config selector returned illegal status: %v", err)
 256  			}
 257  			return nil, err
 258  		}
 259  		return nil, toRPCErr(err)
 260  	}
 261  
 262  	if rpcConfig != nil {
 263  		if rpcConfig.Context != nil {
 264  			ctx = rpcConfig.Context
 265  		}
 266  		mc = &rpcConfig.MethodConfig
 267  		onCommit = rpcConfig.OnCommitted
 268  		if rpcConfig.Interceptor != nil {
 269  			rpcInfo.Context = nil
 270  			ns := newStream
 271  			newStream = func(ctx context.Context, done func()) (iresolver.ClientStream, error) {
 272  				cs, err := rpcConfig.Interceptor.NewStream(ctx, rpcInfo, done, ns)
 273  				if err != nil {
 274  					return nil, toRPCErr(err)
 275  				}
 276  				return cs, nil
 277  			}
 278  		}
 279  	}
 280  
 281  	return newStream(ctx, func() {})
 282  }
 283  
 284  func newClientStreamWithParams(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, mc *serviceconfig.MethodConfig, onCommit, doneFunc func(), nameResolutionDelayed bool, opts ...CallOption) (_ iresolver.ClientStream, err error) {
 285  	callInfo := defaultCallInfo()
 286  	if mc.WaitForReady != nil {
 287  		callInfo.failFast = !*mc.WaitForReady
 288  	}
 289  
 290  	// Possible context leak:
 291  	// The cancel function for the child context we create will only be called
 292  	// when RecvMsg returns a non-nil error, if the ClientConn is closed, or if
 293  	// an error is generated by SendMsg.
 294  	// https://github.com/grpc/grpc-go/issues/1818.
 295  	var cancel context.CancelFunc
 296  	if mc.Timeout != nil && *mc.Timeout >= 0 {
 297  		ctx, cancel = context.WithTimeout(ctx, *mc.Timeout)
 298  	} else {
 299  		ctx, cancel = context.WithCancel(ctx)
 300  	}
 301  	defer func() {
 302  		if err != nil {
 303  			cancel()
 304  		}
 305  	}()
 306  
 307  	for _, o := range opts {
 308  		if err := o.before(callInfo); err != nil {
 309  			return nil, toRPCErr(err)
 310  		}
 311  	}
 312  	callInfo.maxSendMessageSize = getMaxSize(mc.MaxReqSize, callInfo.maxSendMessageSize, defaultClientMaxSendMessageSize)
 313  	callInfo.maxReceiveMessageSize = getMaxSize(mc.MaxRespSize, callInfo.maxReceiveMessageSize, defaultClientMaxReceiveMessageSize)
 314  	if err := setCallInfoCodec(callInfo); err != nil {
 315  		return nil, err
 316  	}
 317  
 318  	callHdr := &transport.CallHdr{
 319  		Host:           cc.authority,
 320  		Method:         method,
 321  		ContentSubtype: callInfo.contentSubtype,
 322  		DoneFunc:       doneFunc,
 323  		Authority:      callInfo.authority,
 324  	}
 325  	if allowed := callInfo.acceptedResponseCompressors; len(allowed) > 0 {
 326  		headerValue := strings.Join(allowed, ",")
 327  		callHdr.AcceptedCompressors = &headerValue
 328  	}
 329  
 330  	// Set our outgoing compression according to the UseCompressor CallOption, if
 331  	// set.  In that case, also find the compressor from the encoding package.
 332  	// Otherwise, use the compressor configured by the WithCompressor DialOption,
 333  	// if set.
 334  	var compressorV0 Compressor
 335  	var compressorV1 encoding.Compressor
 336  	if ct := callInfo.compressorName; ct != "" {
 337  		callHdr.SendCompress = ct
 338  		if ct != encoding.Identity {
 339  			compressorV1 = encoding.GetCompressor(ct)
 340  			if compressorV1 == nil {
 341  				return nil, status.Errorf(codes.Internal, "grpc: Compressor is not installed for requested grpc-encoding %q", ct)
 342  			}
 343  		}
 344  	} else if cc.dopts.compressorV0 != nil {
 345  		callHdr.SendCompress = cc.dopts.compressorV0.Type()
 346  		compressorV0 = cc.dopts.compressorV0
 347  	}
 348  	if callInfo.creds != nil {
 349  		callHdr.Creds = callInfo.creds
 350  	}
 351  
 352  	cs := &clientStream{
 353  		callHdr:             callHdr,
 354  		ctx:                 ctx,
 355  		methodConfig:        mc,
 356  		opts:                opts,
 357  		callInfo:            callInfo,
 358  		cc:                  cc,
 359  		desc:                desc,
 360  		codec:               callInfo.codec,
 361  		compressorV0:        compressorV0,
 362  		compressorV1:        compressorV1,
 363  		cancel:              cancel,
 364  		firstAttempt:        true,
 365  		onCommit:            onCommit,
 366  		nameResolutionDelay: nameResolutionDelayed,
 367  	}
 368  	if !cc.dopts.disableRetry {
 369  		cs.retryThrottler = cc.retryThrottler.Load().(*retryThrottler)
 370  	}
 371  	if ml := binarylog.GetMethodLogger(method); ml != nil {
 372  		cs.binlogs = append(cs.binlogs, ml)
 373  	}
 374  	if cc.dopts.binaryLogger != nil {
 375  		if ml := cc.dopts.binaryLogger.GetMethodLogger(method); ml != nil {
 376  			cs.binlogs = append(cs.binlogs, ml)
 377  		}
 378  	}
 379  
 380  	// Pick the transport to use and create a new stream on the transport.
 381  	// Assign cs.attempt upon success.
 382  	op := func(a *csAttempt) error {
 383  		if err := a.getTransport(); err != nil {
 384  			return err
 385  		}
 386  		if err := a.newStream(); err != nil {
 387  			return err
 388  		}
 389  		// Because this operation is always called either here (while creating
 390  		// the clientStream) or by the retry code while locked when replaying
 391  		// the operation, it is safe to access cs.attempt directly.
 392  		cs.attempt = a
 393  		return nil
 394  	}
 395  	if err := cs.withRetry(op, func() { cs.bufferForRetryLocked(0, op, nil) }); err != nil {
 396  		return nil, err
 397  	}
 398  
 399  	if len(cs.binlogs) != 0 {
 400  		md, _ := metadata.FromOutgoingContext(ctx)
 401  		logEntry := &binarylog.ClientHeader{
 402  			OnClientSide: true,
 403  			Header:       md,
 404  			MethodName:   method,
 405  			Authority:    cs.cc.authority,
 406  		}
 407  		if deadline, ok := ctx.Deadline(); ok {
 408  			logEntry.Timeout = time.Until(deadline)
 409  			if logEntry.Timeout < 0 {
 410  				logEntry.Timeout = 0
 411  			}
 412  		}
 413  		for _, binlog := range cs.binlogs {
 414  			binlog.Log(cs.ctx, logEntry)
 415  		}
 416  	}
 417  
 418  	if desc != unaryStreamDesc {
 419  		// Listen on cc and stream contexts to cleanup when the user closes the
 420  		// ClientConn or cancels the stream context.  In all other cases, an error
 421  		// should already be injected into the recv buffer by the transport, which
 422  		// the client will eventually receive, and then we will cancel the stream's
 423  		// context in clientStream.finish.
 424  		go func() {
 425  			select {
 426  			case <-cc.ctx.Done():
 427  				cs.finish(ErrClientConnClosing)
 428  			case <-ctx.Done():
 429  				cs.finish(toRPCErr(ctx.Err()))
 430  			}
 431  		}()
 432  	}
 433  	return cs, nil
 434  }
 435  
 436  // newAttemptLocked creates a new csAttempt without a transport or stream.
 437  func (cs *clientStream) newAttemptLocked(isTransparent bool) (*csAttempt, error) {
 438  	if err := cs.ctx.Err(); err != nil {
 439  		return nil, toRPCErr(err)
 440  	}
 441  	if err := cs.cc.ctx.Err(); err != nil {
 442  		return nil, ErrClientConnClosing
 443  	}
 444  
 445  	ctx := newContextWithRPCInfo(cs.ctx, cs.callInfo.failFast, cs.callInfo.codec, cs.compressorV0, cs.compressorV1)
 446  	method := cs.callHdr.Method
 447  	var beginTime time.Time
 448  	sh := cs.cc.statsHandler
 449  	if sh != nil {
 450  		beginTime = time.Now()
 451  		ctx = sh.TagRPC(ctx, &stats.RPCTagInfo{
 452  			FullMethodName: method, FailFast: cs.callInfo.failFast,
 453  			NameResolutionDelay: cs.nameResolutionDelay,
 454  		})
 455  		sh.HandleRPC(ctx, &stats.Begin{
 456  			Client:                    true,
 457  			BeginTime:                 beginTime,
 458  			FailFast:                  cs.callInfo.failFast,
 459  			IsClientStream:            cs.desc.ClientStreams,
 460  			IsServerStream:            cs.desc.ServerStreams,
 461  			IsTransparentRetryAttempt: isTransparent,
 462  		})
 463  	}
 464  
 465  	var trInfo *traceInfo
 466  	if EnableTracing {
 467  		trInfo = &traceInfo{
 468  			tr: newTrace("grpc.Sent."+methodFamily(method), method),
 469  			firstLine: firstLine{
 470  				client: true,
 471  			},
 472  		}
 473  		if deadline, ok := ctx.Deadline(); ok {
 474  			trInfo.firstLine.deadline = time.Until(deadline)
 475  		}
 476  		trInfo.tr.LazyLog(&trInfo.firstLine, false)
 477  		ctx = newTraceContext(ctx, trInfo.tr)
 478  	}
 479  
 480  	if cs.cc.parsedTarget.URL.Scheme == internal.GRPCResolverSchemeExtraMetadata {
 481  		// Add extra metadata (metadata that will be added by transport) to context
 482  		// so the balancer can see them.
 483  		ctx = grpcutil.WithExtraMetadata(ctx, metadata.Pairs(
 484  			"content-type", grpcutil.ContentType(cs.callHdr.ContentSubtype),
 485  		))
 486  	}
 487  
 488  	return &csAttempt{
 489  		ctx:            ctx,
 490  		beginTime:      beginTime,
 491  		cs:             cs,
 492  		decompressorV0: cs.cc.dopts.dc,
 493  		statsHandler:   sh,
 494  		trInfo:         trInfo,
 495  	}, nil
 496  }
 497  
 498  func (a *csAttempt) getTransport() error {
 499  	cs := a.cs
 500  
 501  	pickInfo := balancer.PickInfo{Ctx: a.ctx, FullMethodName: cs.callHdr.Method}
 502  	pick, err := cs.cc.pickerWrapper.pick(a.ctx, cs.callInfo.failFast, pickInfo)
 503  	a.transport, a.pickResult = pick.transport, pick.result
 504  	if err != nil {
 505  		if de, ok := err.(dropError); ok {
 506  			err = de.error
 507  			a.drop = true
 508  		}
 509  		return err
 510  	}
 511  	if a.trInfo != nil {
 512  		a.trInfo.firstLine.SetRemoteAddr(a.transport.Peer().Addr)
 513  	}
 514  	if pick.blocked && a.statsHandler != nil {
 515  		a.statsHandler.HandleRPC(a.ctx, &stats.DelayedPickComplete{})
 516  	}
 517  	return nil
 518  }
 519  
 520  func (a *csAttempt) newStream() error {
 521  	cs := a.cs
 522  	cs.callHdr.PreviousAttempts = cs.numRetries
 523  
 524  	// Merge metadata stored in PickResult, if any, with existing call metadata.
 525  	// It is safe to overwrite the csAttempt's context here, since all state
 526  	// maintained in it are local to the attempt. When the attempt has to be
 527  	// retried, a new instance of csAttempt will be created.
 528  	if a.pickResult.Metadata != nil {
 529  		// We currently do not have a function it the metadata package which
 530  		// merges given metadata with existing metadata in a context. Existing
 531  		// function `AppendToOutgoingContext()` takes a variadic argument of key
 532  		// value pairs.
 533  		//
 534  		// TODO: Make it possible to retrieve key value pairs from metadata.MD
 535  		// in a form passable to AppendToOutgoingContext(), or create a version
 536  		// of AppendToOutgoingContext() that accepts a metadata.MD.
 537  		md, _ := metadata.FromOutgoingContext(a.ctx)
 538  		md = metadata.Join(md, a.pickResult.Metadata)
 539  		a.ctx = metadata.NewOutgoingContext(a.ctx, md)
 540  	}
 541  
 542  	s, err := a.transport.NewStream(a.ctx, cs.callHdr)
 543  	if err != nil {
 544  		nse, ok := err.(*transport.NewStreamError)
 545  		if !ok {
 546  			// Unexpected.
 547  			return err
 548  		}
 549  
 550  		if nse.AllowTransparentRetry {
 551  			a.allowTransparentRetry = true
 552  		}
 553  
 554  		// Unwrap and convert error.
 555  		return toRPCErr(nse.Err)
 556  	}
 557  	a.transportStream = s
 558  	a.ctx = s.Context()
 559  	a.parser = parser{r: s, bufferPool: a.cs.cc.dopts.copts.BufferPool}
 560  	return nil
 561  }
 562  
 563  // clientStream implements a client side Stream.
 564  type clientStream struct {
 565  	callHdr  *transport.CallHdr
 566  	opts     []CallOption
 567  	callInfo *callInfo
 568  	cc       *ClientConn
 569  	desc     *StreamDesc
 570  
 571  	codec        baseCodec
 572  	compressorV0 Compressor
 573  	compressorV1 encoding.Compressor
 574  
 575  	cancel context.CancelFunc // cancels all attempts
 576  
 577  	sentLast bool // sent an end stream
 578  
 579  	receivedFirstMsg bool // set after the first message is received
 580  
 581  	methodConfig *MethodConfig
 582  
 583  	ctx context.Context // the application's context, wrapped by stats/tracing
 584  
 585  	retryThrottler *retryThrottler // The throttler active when the RPC began.
 586  
 587  	binlogs []binarylog.MethodLogger
 588  	// serverHeaderBinlogged is a boolean for whether server header has been
 589  	// logged. Server header will be logged when the first time one of those
 590  	// happens: stream.Header(), stream.Recv().
 591  	//
 592  	// It's only read and used by Recv() and Header(), so it doesn't need to be
 593  	// synchronized.
 594  	serverHeaderBinlogged bool
 595  
 596  	mu                      sync.Mutex
 597  	firstAttempt            bool // if true, transparent retry is valid
 598  	numRetries              int  // exclusive of transparent retry attempt(s)
 599  	numRetriesSincePushback int  // retries since pushback; to reset backoff
 600  	finished                bool // TODO: replace with atomic cmpxchg or sync.Once?
 601  	// attempt is the active client stream attempt.
 602  	// The only place where it is written is the newAttemptLocked method and this method never writes nil.
 603  	// So, attempt can be nil only inside newClientStream function when clientStream is first created.
 604  	// One of the first things done after clientStream's creation, is to call newAttemptLocked which either
 605  	// assigns a non nil value to the attempt or returns an error. If an error is returned from newAttemptLocked,
 606  	// then newClientStream calls finish on the clientStream and returns. So, finish method is the only
 607  	// place where we need to check if the attempt is nil.
 608  	attempt *csAttempt
 609  	// TODO(hedging): hedging will have multiple attempts simultaneously.
 610  	committed        bool // active attempt committed for retry?
 611  	onCommit         func()
 612  	replayBuffer     []replayOp // operations to replay on retry
 613  	replayBufferSize int        // current size of replayBuffer
 614  	// nameResolutionDelay indicates if there was a delay in the name resolution.
 615  	// This field is only valid on client side, it's always false on server side.
 616  	nameResolutionDelay bool
 617  }
 618  
 619  type replayOp struct {
 620  	op      func(a *csAttempt) error
 621  	cleanup func()
 622  }
 623  
 624  // csAttempt implements a single transport stream attempt within a
 625  // clientStream.
 626  type csAttempt struct {
 627  	ctx             context.Context
 628  	cs              *clientStream
 629  	transport       transport.ClientTransport
 630  	transportStream *transport.ClientStream
 631  	parser          parser
 632  	pickResult      balancer.PickResult
 633  
 634  	finished        bool
 635  	decompressorV0  Decompressor
 636  	decompressorV1  encoding.Compressor
 637  	decompressorSet bool
 638  
 639  	mu sync.Mutex // guards trInfo.tr
 640  	// trInfo may be nil (if EnableTracing is false).
 641  	// trInfo.tr is set when created (if EnableTracing is true),
 642  	// and cleared when the finish method is called.
 643  	trInfo *traceInfo
 644  
 645  	statsHandler stats.Handler
 646  	beginTime    time.Time
 647  
 648  	// set for newStream errors that may be transparently retried
 649  	allowTransparentRetry bool
 650  	// set for pick errors that are returned as a status
 651  	drop bool
 652  }
 653  
 654  func (cs *clientStream) commitAttemptLocked() {
 655  	if !cs.committed && cs.onCommit != nil {
 656  		cs.onCommit()
 657  	}
 658  	cs.committed = true
 659  	for _, op := range cs.replayBuffer {
 660  		if op.cleanup != nil {
 661  			op.cleanup()
 662  		}
 663  	}
 664  	cs.replayBuffer = nil
 665  }
 666  
 667  func (cs *clientStream) commitAttempt() {
 668  	cs.mu.Lock()
 669  	cs.commitAttemptLocked()
 670  	cs.mu.Unlock()
 671  }
 672  
 673  // shouldRetry returns nil if the RPC should be retried; otherwise it returns
 674  // the error that should be returned by the operation.  If the RPC should be
 675  // retried, the bool indicates whether it is being retried transparently.
 676  func (a *csAttempt) shouldRetry(err error) (bool, error) {
 677  	cs := a.cs
 678  
 679  	if cs.finished || cs.committed || a.drop {
 680  		// RPC is finished or committed or was dropped by the picker; cannot retry.
 681  		return false, err
 682  	}
 683  	if a.transportStream == nil && a.allowTransparentRetry {
 684  		return true, nil
 685  	}
 686  	// Wait for the trailers.
 687  	unprocessed := false
 688  	if a.transportStream != nil {
 689  		<-a.transportStream.Done()
 690  		unprocessed = a.transportStream.Unprocessed()
 691  	}
 692  	if cs.firstAttempt && unprocessed {
 693  		// First attempt, stream unprocessed: transparently retry.
 694  		return true, nil
 695  	}
 696  	if cs.cc.dopts.disableRetry {
 697  		return false, err
 698  	}
 699  
 700  	pushback := 0
 701  	hasPushback := false
 702  	if a.transportStream != nil {
 703  		if !a.transportStream.TrailersOnly() {
 704  			return false, err
 705  		}
 706  
 707  		// TODO(retry): Move down if the spec changes to not check server pushback
 708  		// before considering this a failure for throttling.
 709  		sps := a.transportStream.Trailer()["grpc-retry-pushback-ms"]
 710  		if len(sps) == 1 {
 711  			var e error
 712  			if pushback, e = strconv.Atoi(sps[0]); e != nil || pushback < 0 {
 713  				channelz.Infof(logger, cs.cc.channelz, "Server retry pushback specified to abort (%q).", sps[0])
 714  				cs.retryThrottler.throttle() // This counts as a failure for throttling.
 715  				return false, err
 716  			}
 717  			hasPushback = true
 718  		} else if len(sps) > 1 {
 719  			channelz.Warningf(logger, cs.cc.channelz, "Server retry pushback specified multiple values (%q); not retrying.", sps)
 720  			cs.retryThrottler.throttle() // This counts as a failure for throttling.
 721  			return false, err
 722  		}
 723  	}
 724  
 725  	var code codes.Code
 726  	if a.transportStream != nil {
 727  		code = a.transportStream.Status().Code()
 728  	} else {
 729  		code = status.Code(err)
 730  	}
 731  
 732  	rp := cs.methodConfig.RetryPolicy
 733  	if rp == nil || !rp.RetryableStatusCodes[code] {
 734  		return false, err
 735  	}
 736  
 737  	// Note: the ordering here is important; we count this as a failure
 738  	// only if the code matched a retryable code.
 739  	if cs.retryThrottler.throttle() {
 740  		return false, err
 741  	}
 742  	if cs.numRetries+1 >= rp.MaxAttempts {
 743  		return false, err
 744  	}
 745  
 746  	var dur time.Duration
 747  	if hasPushback {
 748  		dur = time.Millisecond * time.Duration(pushback)
 749  		cs.numRetriesSincePushback = 0
 750  	} else {
 751  		fact := math.Pow(rp.BackoffMultiplier, float64(cs.numRetriesSincePushback))
 752  		cur := min(float64(rp.InitialBackoff)*fact, float64(rp.MaxBackoff))
 753  		// Apply jitter by multiplying with a random factor between 0.8 and 1.2
 754  		cur *= 0.8 + 0.4*rand.Float64()
 755  		dur = time.Duration(int64(cur))
 756  		cs.numRetriesSincePushback++
 757  	}
 758  
 759  	// TODO(dfawley): we could eagerly fail here if dur puts us past the
 760  	// deadline, but unsure if it is worth doing.
 761  	t := time.NewTimer(dur)
 762  	select {
 763  	case <-t.C:
 764  		cs.numRetries++
 765  		return false, nil
 766  	case <-cs.ctx.Done():
 767  		t.Stop()
 768  		return false, status.FromContextError(cs.ctx.Err()).Err()
 769  	}
 770  }
 771  
 772  // Returns nil if a retry was performed and succeeded; error otherwise.
 773  func (cs *clientStream) retryLocked(attempt *csAttempt, lastErr error) error {
 774  	for {
 775  		attempt.finish(toRPCErr(lastErr))
 776  		isTransparent, err := attempt.shouldRetry(lastErr)
 777  		if err != nil {
 778  			cs.commitAttemptLocked()
 779  			return err
 780  		}
 781  		cs.firstAttempt = false
 782  		attempt, err = cs.newAttemptLocked(isTransparent)
 783  		if err != nil {
 784  			// Only returns error if the clientconn is closed or the context of
 785  			// the stream is canceled.
 786  			return err
 787  		}
 788  		// Note that the first op in replayBuffer always sets cs.attempt
 789  		// if it is able to pick a transport and create a stream.
 790  		if lastErr = cs.replayBufferLocked(attempt); lastErr == nil {
 791  			return nil
 792  		}
 793  	}
 794  }
 795  
 796  func (cs *clientStream) Context() context.Context {
 797  	cs.commitAttempt()
 798  	// No need to lock before using attempt, since we know it is committed and
 799  	// cannot change.
 800  	if cs.attempt.transportStream != nil {
 801  		return cs.attempt.transportStream.Context()
 802  	}
 803  	return cs.ctx
 804  }
 805  
 806  func (cs *clientStream) withRetry(op func(a *csAttempt) error, onSuccess func()) error {
 807  	cs.mu.Lock()
 808  	for {
 809  		if cs.committed {
 810  			cs.mu.Unlock()
 811  			// toRPCErr is used in case the error from the attempt comes from
 812  			// NewClientStream, which intentionally doesn't return a status
 813  			// error to allow for further inspection; all other errors should
 814  			// already be status errors.
 815  			return toRPCErr(op(cs.attempt))
 816  		}
 817  		if len(cs.replayBuffer) == 0 {
 818  			// For the first op, which controls creation of the stream and
 819  			// assigns cs.attempt, we need to create a new attempt inline
 820  			// before executing the first op.  On subsequent ops, the attempt
 821  			// is created immediately before replaying the ops.
 822  			var err error
 823  			if cs.attempt, err = cs.newAttemptLocked(false /* isTransparent */); err != nil {
 824  				cs.mu.Unlock()
 825  				cs.finish(err)
 826  				return err
 827  			}
 828  		}
 829  		a := cs.attempt
 830  		cs.mu.Unlock()
 831  		err := op(a)
 832  		cs.mu.Lock()
 833  		if a != cs.attempt {
 834  			// We started another attempt already.
 835  			continue
 836  		}
 837  		if err == io.EOF {
 838  			<-a.transportStream.Done()
 839  		}
 840  		if err == nil || (err == io.EOF && a.transportStream.Status().Code() == codes.OK) {
 841  			onSuccess()
 842  			cs.mu.Unlock()
 843  			return err
 844  		}
 845  		if err := cs.retryLocked(a, err); err != nil {
 846  			cs.mu.Unlock()
 847  			return err
 848  		}
 849  	}
 850  }
 851  
 852  func (cs *clientStream) Header() (metadata.MD, error) {
 853  	var m metadata.MD
 854  	err := cs.withRetry(func(a *csAttempt) error {
 855  		var err error
 856  		m, err = a.transportStream.Header()
 857  		return toRPCErr(err)
 858  	}, cs.commitAttemptLocked)
 859  
 860  	if m == nil && err == nil {
 861  		// The stream ended with success.  Finish the clientStream.
 862  		err = io.EOF
 863  	}
 864  
 865  	if err != nil {
 866  		cs.finish(err)
 867  		// Do not return the error.  The user should get it by calling Recv().
 868  		return nil, nil
 869  	}
 870  
 871  	if len(cs.binlogs) != 0 && !cs.serverHeaderBinlogged && m != nil {
 872  		// Only log if binary log is on and header has not been logged, and
 873  		// there is actually headers to log.
 874  		logEntry := &binarylog.ServerHeader{
 875  			OnClientSide: true,
 876  			Header:       m,
 877  			PeerAddr:     nil,
 878  		}
 879  		if peer, ok := peer.FromContext(cs.Context()); ok {
 880  			logEntry.PeerAddr = peer.Addr
 881  		}
 882  		cs.serverHeaderBinlogged = true
 883  		for _, binlog := range cs.binlogs {
 884  			binlog.Log(cs.ctx, logEntry)
 885  		}
 886  	}
 887  
 888  	return m, nil
 889  }
 890  
 891  func (cs *clientStream) Trailer() metadata.MD {
 892  	// On RPC failure, we never need to retry, because usage requires that
 893  	// RecvMsg() returned a non-nil error before calling this function is valid.
 894  	// We would have retried earlier if necessary.
 895  	//
 896  	// Commit the attempt anyway, just in case users are not following those
 897  	// directions -- it will prevent races and should not meaningfully impact
 898  	// performance.
 899  	cs.commitAttempt()
 900  	if cs.attempt.transportStream == nil {
 901  		return nil
 902  	}
 903  	return cs.attempt.transportStream.Trailer()
 904  }
 905  
 906  func (cs *clientStream) replayBufferLocked(attempt *csAttempt) error {
 907  	for _, f := range cs.replayBuffer {
 908  		if err := f.op(attempt); err != nil {
 909  			return err
 910  		}
 911  	}
 912  	return nil
 913  }
 914  
 915  func (cs *clientStream) bufferForRetryLocked(sz int, op func(a *csAttempt) error, cleanup func()) {
 916  	// Note: we still will buffer if retry is disabled (for transparent retries).
 917  	if cs.committed {
 918  		return
 919  	}
 920  	cs.replayBufferSize += sz
 921  	if cs.replayBufferSize > cs.callInfo.maxRetryRPCBufferSize {
 922  		cs.commitAttemptLocked()
 923  		cleanup()
 924  		return
 925  	}
 926  	cs.replayBuffer = append(cs.replayBuffer, replayOp{op: op, cleanup: cleanup})
 927  }
 928  
 929  func (cs *clientStream) SendMsg(m any) (err error) {
 930  	defer func() {
 931  		if err != nil && err != io.EOF {
 932  			// Call finish on the client stream for errors generated by this SendMsg
 933  			// call, as these indicate problems created by this client.  (Transport
 934  			// errors are converted to an io.EOF error in csAttempt.sendMsg; the real
 935  			// error will be returned from RecvMsg eventually in that case, or be
 936  			// retried.)
 937  			cs.finish(err)
 938  		}
 939  	}()
 940  	if cs.sentLast {
 941  		return status.Errorf(codes.Internal, "SendMsg called after CloseSend")
 942  	}
 943  	if !cs.desc.ClientStreams {
 944  		cs.sentLast = true
 945  	}
 946  
 947  	// load hdr, payload, data
 948  	hdr, data, payload, pf, err := prepareMsg(m, cs.codec, cs.compressorV0, cs.compressorV1, cs.cc.dopts.copts.BufferPool)
 949  	if err != nil {
 950  		return err
 951  	}
 952  
 953  	defer func() {
 954  		data.Free()
 955  		// only free payload if compression was made, and therefore it is a different set
 956  		// of buffers from data.
 957  		if pf.isCompressed() {
 958  			payload.Free()
 959  		}
 960  	}()
 961  
 962  	dataLen := data.Len()
 963  	payloadLen := payload.Len()
 964  	// TODO(dfawley): should we be checking len(data) instead?
 965  	if payloadLen > *cs.callInfo.maxSendMessageSize {
 966  		return status.Errorf(codes.ResourceExhausted, "trying to send message larger than max (%d vs. %d)", payloadLen, *cs.callInfo.maxSendMessageSize)
 967  	}
 968  
 969  	// always take an extra ref in case data == payload (i.e. when the data isn't
 970  	// compressed). The original ref will always be freed by the deferred free above.
 971  	payload.Ref()
 972  	op := func(a *csAttempt) error {
 973  		return a.sendMsg(m, hdr, payload, dataLen, payloadLen)
 974  	}
 975  
 976  	// onSuccess is invoked when the op is captured for a subsequent retry. If the
 977  	// stream was established by a previous message and therefore retries are
 978  	// disabled, onSuccess will not be invoked, and payloadRef can be freed
 979  	// immediately.
 980  	onSuccessCalled := false
 981  	err = cs.withRetry(op, func() {
 982  		cs.bufferForRetryLocked(len(hdr)+payloadLen, op, payload.Free)
 983  		onSuccessCalled = true
 984  	})
 985  	if !onSuccessCalled {
 986  		payload.Free()
 987  	}
 988  	if len(cs.binlogs) != 0 && err == nil {
 989  		cm := &binarylog.ClientMessage{
 990  			OnClientSide: true,
 991  			Message:      data.Materialize(),
 992  		}
 993  		for _, binlog := range cs.binlogs {
 994  			binlog.Log(cs.ctx, cm)
 995  		}
 996  	}
 997  	return err
 998  }
 999  
1000  func (cs *clientStream) RecvMsg(m any) error {
1001  	if len(cs.binlogs) != 0 && !cs.serverHeaderBinlogged {
1002  		// Call Header() to binary log header if it's not already logged.
1003  		cs.Header()
1004  	}
1005  	var recvInfo *payloadInfo
1006  	if len(cs.binlogs) != 0 {
1007  		recvInfo = &payloadInfo{}
1008  		defer recvInfo.free()
1009  	}
1010  	err := cs.withRetry(func(a *csAttempt) error {
1011  		return a.recvMsg(m, recvInfo)
1012  	}, cs.commitAttemptLocked)
1013  	if len(cs.binlogs) != 0 && err == nil {
1014  		sm := &binarylog.ServerMessage{
1015  			OnClientSide: true,
1016  			Message:      recvInfo.uncompressedBytes.Materialize(),
1017  		}
1018  		for _, binlog := range cs.binlogs {
1019  			binlog.Log(cs.ctx, sm)
1020  		}
1021  	}
1022  	if err != nil || !cs.desc.ServerStreams {
1023  		// err != nil or non-server-streaming indicates end of stream.
1024  		cs.finish(err)
1025  	}
1026  	return err
1027  }
1028  
1029  func (cs *clientStream) CloseSend() error {
1030  	if cs.sentLast {
1031  		// Return a nil error on repeated calls to this method.
1032  		return nil
1033  	}
1034  	cs.sentLast = true
1035  	op := func(a *csAttempt) error {
1036  		a.transportStream.Write(nil, nil, &transport.WriteOptions{Last: true})
1037  		// Always return nil; io.EOF is the only error that might make sense
1038  		// instead, but there is no need to signal the client to call RecvMsg
1039  		// as the only use left for the stream after CloseSend is to call
1040  		// RecvMsg.  This also matches historical behavior.
1041  		return nil
1042  	}
1043  	cs.withRetry(op, func() { cs.bufferForRetryLocked(0, op, nil) })
1044  	if len(cs.binlogs) != 0 {
1045  		chc := &binarylog.ClientHalfClose{
1046  			OnClientSide: true,
1047  		}
1048  		for _, binlog := range cs.binlogs {
1049  			binlog.Log(cs.ctx, chc)
1050  		}
1051  	}
1052  	// We don't return an error here as we expect users to read all messages
1053  	// from the stream and get the RPC status from RecvMsg().  Note that
1054  	// SendMsg() must return an error when one occurs so the application
1055  	// knows to stop sending messages, but that does not apply here.
1056  	return nil
1057  }
1058  
1059  func (cs *clientStream) finish(err error) {
1060  	if err == io.EOF {
1061  		// Ending a stream with EOF indicates a success.
1062  		err = nil
1063  	}
1064  	cs.mu.Lock()
1065  	if cs.finished {
1066  		cs.mu.Unlock()
1067  		return
1068  	}
1069  	cs.finished = true
1070  	cs.commitAttemptLocked()
1071  	if cs.attempt != nil {
1072  		cs.attempt.finish(err)
1073  		// after functions all rely upon having a stream.
1074  		if cs.attempt.transportStream != nil {
1075  			for _, o := range cs.opts {
1076  				o.after(cs.callInfo, cs.attempt)
1077  			}
1078  		}
1079  	}
1080  
1081  	cs.mu.Unlock()
1082  	// Only one of cancel or trailer needs to be logged.
1083  	if len(cs.binlogs) != 0 {
1084  		switch err {
1085  		case errContextCanceled, errContextDeadline, ErrClientConnClosing:
1086  			c := &binarylog.Cancel{
1087  				OnClientSide: true,
1088  			}
1089  			for _, binlog := range cs.binlogs {
1090  				binlog.Log(cs.ctx, c)
1091  			}
1092  		default:
1093  			logEntry := &binarylog.ServerTrailer{
1094  				OnClientSide: true,
1095  				Trailer:      cs.Trailer(),
1096  				Err:          err,
1097  			}
1098  			if peer, ok := peer.FromContext(cs.Context()); ok {
1099  				logEntry.PeerAddr = peer.Addr
1100  			}
1101  			for _, binlog := range cs.binlogs {
1102  				binlog.Log(cs.ctx, logEntry)
1103  			}
1104  		}
1105  	}
1106  	if err == nil {
1107  		cs.retryThrottler.successfulRPC()
1108  	}
1109  	endOfClientStream(cs.cc, err, cs.opts...)
1110  	cs.cancel()
1111  }
1112  
1113  func (a *csAttempt) sendMsg(m any, hdr []byte, payld mem.BufferSlice, dataLength, payloadLength int) error {
1114  	cs := a.cs
1115  	if a.trInfo != nil {
1116  		a.mu.Lock()
1117  		if a.trInfo.tr != nil {
1118  			a.trInfo.tr.LazyLog(&payload{sent: true, msg: m}, true)
1119  		}
1120  		a.mu.Unlock()
1121  	}
1122  	if err := a.transportStream.Write(hdr, payld, &transport.WriteOptions{Last: !cs.desc.ClientStreams}); err != nil {
1123  		if !cs.desc.ClientStreams {
1124  			// For non-client-streaming RPCs, we return nil instead of EOF on error
1125  			// because the generated code requires it.  finish is not called; RecvMsg()
1126  			// will call it with the stream's status independently.
1127  			return nil
1128  		}
1129  		return io.EOF
1130  	}
1131  	if a.statsHandler != nil {
1132  		a.statsHandler.HandleRPC(a.ctx, outPayload(true, m, dataLength, payloadLength, time.Now()))
1133  	}
1134  	return nil
1135  }
1136  
1137  func (a *csAttempt) recvMsg(m any, payInfo *payloadInfo) (err error) {
1138  	cs := a.cs
1139  	if a.statsHandler != nil && payInfo == nil {
1140  		payInfo = &payloadInfo{}
1141  		defer payInfo.free()
1142  	}
1143  
1144  	if !a.decompressorSet {
1145  		// Block until we receive headers containing received message encoding.
1146  		if ct := a.transportStream.RecvCompress(); ct != "" && ct != encoding.Identity {
1147  			if a.decompressorV0 == nil || a.decompressorV0.Type() != ct {
1148  				// No configured decompressor, or it does not match the incoming
1149  				// message encoding; attempt to find a registered compressor that does.
1150  				a.decompressorV0 = nil
1151  				a.decompressorV1 = encoding.GetCompressor(ct)
1152  			}
1153  			// Validate that the compression method is acceptable for this call.
1154  			if !acceptedCompressorAllows(cs.callInfo.acceptedResponseCompressors, ct) {
1155  				return status.Errorf(codes.Internal, "grpc: peer compressed the response with %q which is not allowed by AcceptCompressors", ct)
1156  			}
1157  		} else {
1158  			// No compression is used; disable our decompressor.
1159  			a.decompressorV0 = nil
1160  		}
1161  		// Only initialize this state once per stream.
1162  		a.decompressorSet = true
1163  	}
1164  	if err := recv(&a.parser, cs.codec, a.transportStream, a.decompressorV0, m, *cs.callInfo.maxReceiveMessageSize, payInfo, a.decompressorV1, false); err != nil {
1165  		if err == io.EOF {
1166  			if statusErr := a.transportStream.Status().Err(); statusErr != nil {
1167  				return statusErr
1168  			}
1169  			// Received no msg and status OK for non-server streaming rpcs.
1170  			if !cs.desc.ServerStreams && !cs.receivedFirstMsg {
1171  				return status.Error(codes.Internal, "cardinality violation: received no response message from non-server-streaming RPC")
1172  			}
1173  			return io.EOF // indicates successful end of stream.
1174  		}
1175  
1176  		return toRPCErr(err)
1177  	}
1178  	cs.receivedFirstMsg = true
1179  	if a.trInfo != nil {
1180  		a.mu.Lock()
1181  		if a.trInfo.tr != nil {
1182  			a.trInfo.tr.LazyLog(&payload{sent: false, msg: m}, true)
1183  		}
1184  		a.mu.Unlock()
1185  	}
1186  	if a.statsHandler != nil {
1187  		a.statsHandler.HandleRPC(a.ctx, &stats.InPayload{
1188  			Client:           true,
1189  			RecvTime:         time.Now(),
1190  			Payload:          m,
1191  			WireLength:       payInfo.compressedLength + headerLen,
1192  			CompressedLength: payInfo.compressedLength,
1193  			Length:           payInfo.uncompressedBytes.Len(),
1194  		})
1195  	}
1196  	if cs.desc.ServerStreams {
1197  		// Subsequent messages should be received by subsequent RecvMsg calls.
1198  		return nil
1199  	}
1200  	// Special handling for non-server-stream rpcs.
1201  	// This recv expects EOF or errors, so we don't collect inPayload.
1202  	if err := recv(&a.parser, cs.codec, a.transportStream, a.decompressorV0, m, *cs.callInfo.maxReceiveMessageSize, nil, a.decompressorV1, false); err == io.EOF {
1203  		return a.transportStream.Status().Err() // non-server streaming Recv returns nil on success
1204  	} else if err != nil {
1205  		return toRPCErr(err)
1206  	}
1207  	return status.Error(codes.Internal, "cardinality violation: expected <EOF> for non server-streaming RPCs, but received another message")
1208  }
1209  
1210  func (a *csAttempt) finish(err error) {
1211  	a.mu.Lock()
1212  	if a.finished {
1213  		a.mu.Unlock()
1214  		return
1215  	}
1216  	a.finished = true
1217  	if err == io.EOF {
1218  		// Ending a stream with EOF indicates a success.
1219  		err = nil
1220  	}
1221  	var tr metadata.MD
1222  	if a.transportStream != nil {
1223  		a.transportStream.Close(err)
1224  		tr = a.transportStream.Trailer()
1225  	}
1226  
1227  	if a.pickResult.Done != nil {
1228  		br := false
1229  		if a.transportStream != nil {
1230  			br = a.transportStream.BytesReceived()
1231  		}
1232  		a.pickResult.Done(balancer.DoneInfo{
1233  			Err:           err,
1234  			Trailer:       tr,
1235  			BytesSent:     a.transportStream != nil,
1236  			BytesReceived: br,
1237  			ServerLoad:    balancerload.Parse(tr),
1238  		})
1239  	}
1240  	if a.statsHandler != nil {
1241  		a.statsHandler.HandleRPC(a.ctx, &stats.End{
1242  			Client:    true,
1243  			BeginTime: a.beginTime,
1244  			EndTime:   time.Now(),
1245  			Trailer:   tr,
1246  			Error:     err,
1247  		})
1248  	}
1249  	if a.trInfo != nil && a.trInfo.tr != nil {
1250  		if err == nil {
1251  			a.trInfo.tr.LazyPrintf("RPC: [OK]")
1252  		} else {
1253  			a.trInfo.tr.LazyPrintf("RPC: [%v]", err)
1254  			a.trInfo.tr.SetError()
1255  		}
1256  		a.trInfo.tr.Finish()
1257  		a.trInfo.tr = nil
1258  	}
1259  	a.mu.Unlock()
1260  }
1261  
1262  // newNonRetryClientStream creates a ClientStream with the specified transport, on the
1263  // given addrConn.
1264  //
1265  // It's expected that the given transport is either the same one in addrConn, or
1266  // is already closed. To avoid race, transport is specified separately, instead
1267  // of using ac.transport.
1268  //
1269  // Main difference between this and ClientConn.NewStream:
1270  // - no retry
1271  // - no service config (or wait for service config)
1272  // - no tracing or stats
1273  func newNonRetryClientStream(ctx context.Context, desc *StreamDesc, method string, t transport.ClientTransport, ac *addrConn, opts ...CallOption) (_ ClientStream, err error) {
1274  	if t == nil {
1275  		// TODO: return RPC error here?
1276  		return nil, errors.New("transport provided is nil")
1277  	}
1278  	// defaultCallInfo contains unnecessary info(i.e. failfast, maxRetryRPCBufferSize), so we just initialize an empty struct.
1279  	c := &callInfo{}
1280  
1281  	// Possible context leak:
1282  	// The cancel function for the child context we create will only be called
1283  	// when RecvMsg returns a non-nil error, if the ClientConn is closed, or if
1284  	// an error is generated by SendMsg.
1285  	// https://github.com/grpc/grpc-go/issues/1818.
1286  	ctx, cancel := context.WithCancel(ctx)
1287  	defer func() {
1288  		if err != nil {
1289  			cancel()
1290  		}
1291  	}()
1292  
1293  	for _, o := range opts {
1294  		if err := o.before(c); err != nil {
1295  			return nil, toRPCErr(err)
1296  		}
1297  	}
1298  	c.maxReceiveMessageSize = getMaxSize(nil, c.maxReceiveMessageSize, defaultClientMaxReceiveMessageSize)
1299  	c.maxSendMessageSize = getMaxSize(nil, c.maxSendMessageSize, defaultServerMaxSendMessageSize)
1300  	if err := setCallInfoCodec(c); err != nil {
1301  		return nil, err
1302  	}
1303  
1304  	callHdr := &transport.CallHdr{
1305  		Host:           ac.cc.authority,
1306  		Method:         method,
1307  		ContentSubtype: c.contentSubtype,
1308  	}
1309  
1310  	// Set our outgoing compression according to the UseCompressor CallOption, if
1311  	// set.  In that case, also find the compressor from the encoding package.
1312  	// Otherwise, use the compressor configured by the WithCompressor DialOption,
1313  	// if set.
1314  	var cp Compressor
1315  	var comp encoding.Compressor
1316  	if ct := c.compressorName; ct != "" {
1317  		callHdr.SendCompress = ct
1318  		if ct != encoding.Identity {
1319  			comp = encoding.GetCompressor(ct)
1320  			if comp == nil {
1321  				return nil, status.Errorf(codes.Internal, "grpc: Compressor is not installed for requested grpc-encoding %q", ct)
1322  			}
1323  		}
1324  	} else if ac.cc.dopts.compressorV0 != nil {
1325  		callHdr.SendCompress = ac.cc.dopts.compressorV0.Type()
1326  		cp = ac.cc.dopts.compressorV0
1327  	}
1328  	if c.creds != nil {
1329  		callHdr.Creds = c.creds
1330  	}
1331  
1332  	// Use a special addrConnStream to avoid retry.
1333  	as := &addrConnStream{
1334  		callHdr:          callHdr,
1335  		ac:               ac,
1336  		ctx:              ctx,
1337  		cancel:           cancel,
1338  		opts:             opts,
1339  		callInfo:         c,
1340  		desc:             desc,
1341  		codec:            c.codec,
1342  		sendCompressorV0: cp,
1343  		sendCompressorV1: comp,
1344  		transport:        t,
1345  	}
1346  
1347  	s, err := as.transport.NewStream(as.ctx, as.callHdr)
1348  	if err != nil {
1349  		err = toRPCErr(err)
1350  		return nil, err
1351  	}
1352  	as.transportStream = s
1353  	as.parser = parser{r: s, bufferPool: ac.dopts.copts.BufferPool}
1354  	ac.incrCallsStarted()
1355  	if desc != unaryStreamDesc {
1356  		// Listen on stream context to cleanup when the stream context is
1357  		// canceled.  Also listen for the addrConn's context in case the
1358  		// addrConn is closed or reconnects to a different address.  In all
1359  		// other cases, an error should already be injected into the recv
1360  		// buffer by the transport, which the client will eventually receive,
1361  		// and then we will cancel the stream's context in
1362  		// addrConnStream.finish.
1363  		go func() {
1364  			ac.mu.Lock()
1365  			acCtx := ac.ctx
1366  			ac.mu.Unlock()
1367  			select {
1368  			case <-acCtx.Done():
1369  				as.finish(status.Error(codes.Canceled, "grpc: the SubConn is closing"))
1370  			case <-ctx.Done():
1371  				as.finish(toRPCErr(ctx.Err()))
1372  			}
1373  		}()
1374  	}
1375  	return as, nil
1376  }
1377  
1378  type addrConnStream struct {
1379  	transportStream  *transport.ClientStream
1380  	ac               *addrConn
1381  	callHdr          *transport.CallHdr
1382  	cancel           context.CancelFunc
1383  	opts             []CallOption
1384  	callInfo         *callInfo
1385  	transport        transport.ClientTransport
1386  	ctx              context.Context
1387  	sentLast         bool
1388  	receivedFirstMsg bool
1389  	desc             *StreamDesc
1390  	codec            baseCodec
1391  	sendCompressorV0 Compressor
1392  	sendCompressorV1 encoding.Compressor
1393  	decompressorSet  bool
1394  	decompressorV0   Decompressor
1395  	decompressorV1   encoding.Compressor
1396  	parser           parser
1397  
1398  	// mu guards finished and is held for the entire finish method.
1399  	mu       sync.Mutex
1400  	finished bool
1401  }
1402  
1403  func (as *addrConnStream) Header() (metadata.MD, error) {
1404  	m, err := as.transportStream.Header()
1405  	if err != nil {
1406  		as.finish(toRPCErr(err))
1407  	}
1408  	return m, err
1409  }
1410  
1411  func (as *addrConnStream) Trailer() metadata.MD {
1412  	return as.transportStream.Trailer()
1413  }
1414  
1415  func (as *addrConnStream) CloseSend() error {
1416  	if as.sentLast {
1417  		// Return a nil error on repeated calls to this method.
1418  		return nil
1419  	}
1420  	as.sentLast = true
1421  
1422  	as.transportStream.Write(nil, nil, &transport.WriteOptions{Last: true})
1423  	// Always return nil; io.EOF is the only error that might make sense
1424  	// instead, but there is no need to signal the client to call RecvMsg
1425  	// as the only use left for the stream after CloseSend is to call
1426  	// RecvMsg.  This also matches historical behavior.
1427  	return nil
1428  }
1429  
1430  func (as *addrConnStream) Context() context.Context {
1431  	return as.transportStream.Context()
1432  }
1433  
1434  func (as *addrConnStream) SendMsg(m any) (err error) {
1435  	defer func() {
1436  		if err != nil && err != io.EOF {
1437  			// Call finish on the client stream for errors generated by this SendMsg
1438  			// call, as these indicate problems created by this client.  (Transport
1439  			// errors are converted to an io.EOF error in csAttempt.sendMsg; the real
1440  			// error will be returned from RecvMsg eventually in that case, or be
1441  			// retried.)
1442  			as.finish(err)
1443  		}
1444  	}()
1445  	if as.sentLast {
1446  		return status.Errorf(codes.Internal, "SendMsg called after CloseSend")
1447  	}
1448  	if !as.desc.ClientStreams {
1449  		as.sentLast = true
1450  	}
1451  
1452  	// load hdr, payload, data
1453  	hdr, data, payload, pf, err := prepareMsg(m, as.codec, as.sendCompressorV0, as.sendCompressorV1, as.ac.dopts.copts.BufferPool)
1454  	if err != nil {
1455  		return err
1456  	}
1457  
1458  	defer func() {
1459  		data.Free()
1460  		// only free payload if compression was made, and therefore it is a different set
1461  		// of buffers from data.
1462  		if pf.isCompressed() {
1463  			payload.Free()
1464  		}
1465  	}()
1466  
1467  	// TODO(dfawley): should we be checking len(data) instead?
1468  	if payload.Len() > *as.callInfo.maxSendMessageSize {
1469  		return status.Errorf(codes.ResourceExhausted, "trying to send message larger than max (%d vs. %d)", payload.Len(), *as.callInfo.maxSendMessageSize)
1470  	}
1471  
1472  	if err := as.transportStream.Write(hdr, payload, &transport.WriteOptions{Last: !as.desc.ClientStreams}); err != nil {
1473  		if !as.desc.ClientStreams {
1474  			// For non-client-streaming RPCs, we return nil instead of EOF on error
1475  			// because the generated code requires it.  finish is not called; RecvMsg()
1476  			// will call it with the stream's status independently.
1477  			return nil
1478  		}
1479  		return io.EOF
1480  	}
1481  
1482  	return nil
1483  }
1484  
1485  func (as *addrConnStream) RecvMsg(m any) (err error) {
1486  	defer func() {
1487  		if err != nil || !as.desc.ServerStreams {
1488  			// err != nil or non-server-streaming indicates end of stream.
1489  			as.finish(err)
1490  		}
1491  	}()
1492  
1493  	if !as.decompressorSet {
1494  		// Block until we receive headers containing received message encoding.
1495  		if ct := as.transportStream.RecvCompress(); ct != "" && ct != encoding.Identity {
1496  			if as.decompressorV0 == nil || as.decompressorV0.Type() != ct {
1497  				// No configured decompressor, or it does not match the incoming
1498  				// message encoding; attempt to find a registered compressor that does.
1499  				as.decompressorV0 = nil
1500  				as.decompressorV1 = encoding.GetCompressor(ct)
1501  			}
1502  			// Validate that the compression method is acceptable for this call.
1503  			if !acceptedCompressorAllows(as.callInfo.acceptedResponseCompressors, ct) {
1504  				return status.Errorf(codes.Internal, "grpc: peer compressed the response with %q which is not allowed by AcceptCompressors", ct)
1505  			}
1506  		} else {
1507  			// No compression is used; disable our decompressor.
1508  			as.decompressorV0 = nil
1509  		}
1510  		// Only initialize this state once per stream.
1511  		as.decompressorSet = true
1512  	}
1513  	if err := recv(&as.parser, as.codec, as.transportStream, as.decompressorV0, m, *as.callInfo.maxReceiveMessageSize, nil, as.decompressorV1, false); err != nil {
1514  		if err == io.EOF {
1515  			if statusErr := as.transportStream.Status().Err(); statusErr != nil {
1516  				return statusErr
1517  			}
1518  			// Received no msg and status OK for non-server streaming rpcs.
1519  			if !as.desc.ServerStreams && !as.receivedFirstMsg {
1520  				return status.Error(codes.Internal, "cardinality violation: received no response message from non-server-streaming RPC")
1521  			}
1522  			return io.EOF // indicates successful end of stream.
1523  		}
1524  		return toRPCErr(err)
1525  	}
1526  	as.receivedFirstMsg = true
1527  
1528  	if as.desc.ServerStreams {
1529  		// Subsequent messages should be received by subsequent RecvMsg calls.
1530  		return nil
1531  	}
1532  
1533  	// Special handling for non-server-stream rpcs.
1534  	// This recv expects EOF or errors, so we don't collect inPayload.
1535  	if err := recv(&as.parser, as.codec, as.transportStream, as.decompressorV0, m, *as.callInfo.maxReceiveMessageSize, nil, as.decompressorV1, false); err == io.EOF {
1536  		return as.transportStream.Status().Err() // non-server streaming Recv returns nil on success
1537  	} else if err != nil {
1538  		return toRPCErr(err)
1539  	}
1540  	return status.Error(codes.Internal, "cardinality violation: expected <EOF> for non server-streaming RPCs, but received another message")
1541  }
1542  
1543  func (as *addrConnStream) finish(err error) {
1544  	as.mu.Lock()
1545  	if as.finished {
1546  		as.mu.Unlock()
1547  		return
1548  	}
1549  	as.finished = true
1550  	if err == io.EOF {
1551  		// Ending a stream with EOF indicates a success.
1552  		err = nil
1553  	}
1554  	if as.transportStream != nil {
1555  		as.transportStream.Close(err)
1556  	}
1557  
1558  	if err != nil {
1559  		as.ac.incrCallsFailed()
1560  	} else {
1561  		as.ac.incrCallsSucceeded()
1562  	}
1563  	as.cancel()
1564  	as.mu.Unlock()
1565  }
1566  
1567  // ServerStream defines the server-side behavior of a streaming RPC.
1568  //
1569  // Errors returned from ServerStream methods are compatible with the status
1570  // package.  However, the status code will often not match the RPC status as
1571  // seen by the client application, and therefore, should not be relied upon for
1572  // this purpose.
1573  type ServerStream interface {
1574  	// SetHeader sets the header metadata. It may be called multiple times.
1575  	// When call multiple times, all the provided metadata will be merged.
1576  	// All the metadata will be sent out when one of the following happens:
1577  	//  - ServerStream.SendHeader() is called;
1578  	//  - The first response is sent out;
1579  	//  - An RPC status is sent out (error or success).
1580  	SetHeader(metadata.MD) error
1581  	// SendHeader sends the header metadata.
1582  	// The provided md and headers set by SetHeader() will be sent.
1583  	// It fails if called multiple times.
1584  	SendHeader(metadata.MD) error
1585  	// SetTrailer sets the trailer metadata which will be sent with the RPC status.
1586  	// When called more than once, all the provided metadata will be merged.
1587  	SetTrailer(metadata.MD)
1588  	// Context returns the context for this stream.
1589  	Context() context.Context
1590  	// SendMsg sends a message. On error, SendMsg aborts the stream and the
1591  	// error is returned directly.
1592  	//
1593  	// SendMsg blocks until:
1594  	//   - There is sufficient flow control to schedule m with the transport, or
1595  	//   - The stream is done, or
1596  	//   - The stream breaks.
1597  	//
1598  	// SendMsg does not wait until the message is received by the client. An
1599  	// untimely stream closure may result in lost messages.
1600  	//
1601  	// It is safe to have a goroutine calling SendMsg and another goroutine
1602  	// calling RecvMsg on the same stream at the same time, but it is not safe
1603  	// to call SendMsg on the same stream in different goroutines.
1604  	//
1605  	// It is not safe to modify the message after calling SendMsg. Tracing
1606  	// libraries and stats handlers may use the message lazily.
1607  	SendMsg(m any) error
1608  	// RecvMsg blocks until it receives a message into m or the stream is
1609  	// done. It returns io.EOF when the client has performed a CloseSend. On
1610  	// any non-EOF error, the stream is aborted and the error contains the
1611  	// RPC status.
1612  	//
1613  	// It is safe to have a goroutine calling SendMsg and another goroutine
1614  	// calling RecvMsg on the same stream at the same time, but it is not
1615  	// safe to call RecvMsg on the same stream in different goroutines.
1616  	RecvMsg(m any) error
1617  }
1618  
1619  // serverStream implements a server side Stream.
1620  type serverStream struct {
1621  	ctx   context.Context
1622  	s     *transport.ServerStream
1623  	p     parser
1624  	codec baseCodec
1625  	desc  *StreamDesc
1626  
1627  	compressorV0   Compressor
1628  	compressorV1   encoding.Compressor
1629  	decompressorV0 Decompressor
1630  	decompressorV1 encoding.Compressor
1631  
1632  	sendCompressorName string
1633  
1634  	recvFirstMsg bool // set after the first message is received
1635  
1636  	maxReceiveMessageSize int
1637  	maxSendMessageSize    int
1638  	trInfo                *traceInfo
1639  
1640  	statsHandler stats.Handler
1641  
1642  	binlogs []binarylog.MethodLogger
1643  	// serverHeaderBinlogged indicates whether server header has been logged. It
1644  	// will happen when one of the following two happens: stream.SendHeader(),
1645  	// stream.Send().
1646  	//
1647  	// It's only checked in send and sendHeader, doesn't need to be
1648  	// synchronized.
1649  	serverHeaderBinlogged bool
1650  
1651  	mu sync.Mutex // protects trInfo.tr after the service handler runs.
1652  }
1653  
1654  func (ss *serverStream) Context() context.Context {
1655  	return ss.ctx
1656  }
1657  
1658  func (ss *serverStream) SetHeader(md metadata.MD) error {
1659  	if md.Len() == 0 {
1660  		return nil
1661  	}
1662  	err := imetadata.Validate(md)
1663  	if err != nil {
1664  		return status.Error(codes.Internal, err.Error())
1665  	}
1666  	return ss.s.SetHeader(md)
1667  }
1668  
1669  func (ss *serverStream) SendHeader(md metadata.MD) error {
1670  	err := imetadata.Validate(md)
1671  	if err != nil {
1672  		return status.Error(codes.Internal, err.Error())
1673  	}
1674  
1675  	err = ss.s.SendHeader(md)
1676  	if len(ss.binlogs) != 0 && !ss.serverHeaderBinlogged {
1677  		h, _ := ss.s.Header()
1678  		sh := &binarylog.ServerHeader{
1679  			Header: h,
1680  		}
1681  		ss.serverHeaderBinlogged = true
1682  		for _, binlog := range ss.binlogs {
1683  			binlog.Log(ss.ctx, sh)
1684  		}
1685  	}
1686  	return err
1687  }
1688  
1689  func (ss *serverStream) SetTrailer(md metadata.MD) {
1690  	if md.Len() == 0 {
1691  		return
1692  	}
1693  	if err := imetadata.Validate(md); err != nil {
1694  		logger.Errorf("stream: failed to validate md when setting trailer, err: %v", err)
1695  	}
1696  	ss.s.SetTrailer(md)
1697  }
1698  
1699  func (ss *serverStream) SendMsg(m any) (err error) {
1700  	defer func() {
1701  		if ss.trInfo != nil {
1702  			ss.mu.Lock()
1703  			if ss.trInfo.tr != nil {
1704  				if err == nil {
1705  					ss.trInfo.tr.LazyLog(&payload{sent: true, msg: m}, true)
1706  				} else {
1707  					ss.trInfo.tr.LazyLog(&fmtStringer{"%v", []any{err}}, true)
1708  					ss.trInfo.tr.SetError()
1709  				}
1710  			}
1711  			ss.mu.Unlock()
1712  		}
1713  		if err != nil && err != io.EOF {
1714  			st, _ := status.FromError(toRPCErr(err))
1715  			ss.s.WriteStatus(st)
1716  			// Non-user specified status was sent out. This should be an error
1717  			// case (as a server side Cancel maybe).
1718  			//
1719  			// This is not handled specifically now. User will return a final
1720  			// status from the service handler, we will log that error instead.
1721  			// This behavior is similar to an interceptor.
1722  		}
1723  	}()
1724  
1725  	// Server handler could have set new compressor by calling SetSendCompressor.
1726  	// In case it is set, we need to use it for compressing outbound message.
1727  	if sendCompressorsName := ss.s.SendCompress(); sendCompressorsName != ss.sendCompressorName {
1728  		ss.compressorV1 = encoding.GetCompressor(sendCompressorsName)
1729  		ss.sendCompressorName = sendCompressorsName
1730  	}
1731  
1732  	// load hdr, payload, data
1733  	hdr, data, payload, pf, err := prepareMsg(m, ss.codec, ss.compressorV0, ss.compressorV1, ss.p.bufferPool)
1734  	if err != nil {
1735  		return err
1736  	}
1737  
1738  	defer func() {
1739  		data.Free()
1740  		// only free payload if compression was made, and therefore it is a different set
1741  		// of buffers from data.
1742  		if pf.isCompressed() {
1743  			payload.Free()
1744  		}
1745  	}()
1746  
1747  	dataLen := data.Len()
1748  	payloadLen := payload.Len()
1749  
1750  	// TODO(dfawley): should we be checking len(data) instead?
1751  	if payloadLen > ss.maxSendMessageSize {
1752  		return status.Errorf(codes.ResourceExhausted, "trying to send message larger than max (%d vs. %d)", payloadLen, ss.maxSendMessageSize)
1753  	}
1754  	if err := ss.s.Write(hdr, payload, &transport.WriteOptions{Last: false}); err != nil {
1755  		return toRPCErr(err)
1756  	}
1757  
1758  	if len(ss.binlogs) != 0 {
1759  		if !ss.serverHeaderBinlogged {
1760  			h, _ := ss.s.Header()
1761  			sh := &binarylog.ServerHeader{
1762  				Header: h,
1763  			}
1764  			ss.serverHeaderBinlogged = true
1765  			for _, binlog := range ss.binlogs {
1766  				binlog.Log(ss.ctx, sh)
1767  			}
1768  		}
1769  		sm := &binarylog.ServerMessage{
1770  			Message: data.Materialize(),
1771  		}
1772  		for _, binlog := range ss.binlogs {
1773  			binlog.Log(ss.ctx, sm)
1774  		}
1775  	}
1776  	if ss.statsHandler != nil {
1777  		ss.statsHandler.HandleRPC(ss.s.Context(), outPayload(false, m, dataLen, payloadLen, time.Now()))
1778  	}
1779  	return nil
1780  }
1781  
1782  func (ss *serverStream) RecvMsg(m any) (err error) {
1783  	defer func() {
1784  		if ss.trInfo != nil {
1785  			ss.mu.Lock()
1786  			if ss.trInfo.tr != nil {
1787  				if err == nil {
1788  					ss.trInfo.tr.LazyLog(&payload{sent: false, msg: m}, true)
1789  				} else if err != io.EOF {
1790  					ss.trInfo.tr.LazyLog(&fmtStringer{"%v", []any{err}}, true)
1791  					ss.trInfo.tr.SetError()
1792  				}
1793  			}
1794  			ss.mu.Unlock()
1795  		}
1796  		if err != nil && err != io.EOF {
1797  			st, _ := status.FromError(toRPCErr(err))
1798  			ss.s.WriteStatus(st)
1799  			// Non-user specified status was sent out. This should be an error
1800  			// case (as a server side Cancel maybe).
1801  			//
1802  			// This is not handled specifically now. User will return a final
1803  			// status from the service handler, we will log that error instead.
1804  			// This behavior is similar to an interceptor.
1805  		}
1806  	}()
1807  	var payInfo *payloadInfo
1808  	if ss.statsHandler != nil || len(ss.binlogs) != 0 {
1809  		payInfo = &payloadInfo{}
1810  		defer payInfo.free()
1811  	}
1812  	if err := recv(&ss.p, ss.codec, ss.s, ss.decompressorV0, m, ss.maxReceiveMessageSize, payInfo, ss.decompressorV1, true); err != nil {
1813  		if err == io.EOF {
1814  			if len(ss.binlogs) != 0 {
1815  				chc := &binarylog.ClientHalfClose{}
1816  				for _, binlog := range ss.binlogs {
1817  					binlog.Log(ss.ctx, chc)
1818  				}
1819  			}
1820  			// Received no request msg for non-client streaming rpcs.
1821  			if !ss.desc.ClientStreams && !ss.recvFirstMsg {
1822  				return status.Error(codes.Internal, "cardinality violation: received no request message from non-client-streaming RPC")
1823  			}
1824  			return err
1825  		}
1826  		if err == io.ErrUnexpectedEOF {
1827  			err = status.Error(codes.Internal, io.ErrUnexpectedEOF.Error())
1828  		}
1829  		return toRPCErr(err)
1830  	}
1831  	ss.recvFirstMsg = true
1832  	if ss.statsHandler != nil {
1833  		ss.statsHandler.HandleRPC(ss.s.Context(), &stats.InPayload{
1834  			RecvTime:         time.Now(),
1835  			Payload:          m,
1836  			Length:           payInfo.uncompressedBytes.Len(),
1837  			WireLength:       payInfo.compressedLength + headerLen,
1838  			CompressedLength: payInfo.compressedLength,
1839  		})
1840  	}
1841  	if len(ss.binlogs) != 0 {
1842  		cm := &binarylog.ClientMessage{
1843  			Message: payInfo.uncompressedBytes.Materialize(),
1844  		}
1845  		for _, binlog := range ss.binlogs {
1846  			binlog.Log(ss.ctx, cm)
1847  		}
1848  	}
1849  
1850  	if ss.desc.ClientStreams {
1851  		// Subsequent messages should be received by subsequent RecvMsg calls.
1852  		return nil
1853  	}
1854  	// Special handling for non-client-stream rpcs.
1855  	// This recv expects EOF or errors, so we don't collect inPayload.
1856  	if err := recv(&ss.p, ss.codec, ss.s, ss.decompressorV0, m, ss.maxReceiveMessageSize, nil, ss.decompressorV1, true); err == io.EOF {
1857  		return nil
1858  	} else if err != nil {
1859  		return err
1860  	}
1861  	return status.Error(codes.Internal, "cardinality violation: received multiple request messages for non-client-streaming RPC")
1862  }
1863  
1864  // MethodFromServerStream returns the method string for the input stream.
1865  // The returned string is in the format of "/service/method".
1866  func MethodFromServerStream(stream ServerStream) (string, bool) {
1867  	return Method(stream.Context())
1868  }
1869  
1870  // prepareMsg returns the hdr, payload and data using the compressors passed or
1871  // using the passed preparedmsg. The returned boolean indicates whether
1872  // compression was made and therefore whether the payload needs to be freed in
1873  // addition to the returned data. Freeing the payload if the returned boolean is
1874  // false can lead to undefined behavior.
1875  func prepareMsg(m any, codec baseCodec, cp Compressor, comp encoding.Compressor, pool mem.BufferPool) (hdr []byte, data, payload mem.BufferSlice, pf payloadFormat, err error) {
1876  	if preparedMsg, ok := m.(*PreparedMsg); ok {
1877  		return preparedMsg.hdr, preparedMsg.encodedData, preparedMsg.payload, preparedMsg.pf, nil
1878  	}
1879  	// The input interface is not a prepared msg.
1880  	// Marshal and Compress the data at this point
1881  	data, err = encode(codec, m)
1882  	if err != nil {
1883  		return nil, nil, nil, 0, err
1884  	}
1885  	compData, pf, err := compress(data, cp, comp, pool)
1886  	if err != nil {
1887  		data.Free()
1888  		return nil, nil, nil, 0, err
1889  	}
1890  	hdr, payload = msgHeader(data, compData, pf)
1891  	return hdr, data, payload, pf, nil
1892  }
1893