1 /*
2 *
3 * Copyright 2014 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18 19 package grpc
20 21 import (
22 "context"
23 "errors"
24 "io"
25 "math"
26 rand "math/rand/v2"
27 "strconv"
28 "strings"
29 "sync"
30 "time"
31 32 "google.golang.org/grpc/balancer"
33 "google.golang.org/grpc/codes"
34 "google.golang.org/grpc/encoding"
35 "google.golang.org/grpc/internal"
36 "google.golang.org/grpc/internal/balancerload"
37 "google.golang.org/grpc/internal/binarylog"
38 "google.golang.org/grpc/internal/channelz"
39 "google.golang.org/grpc/internal/grpcutil"
40 imetadata "google.golang.org/grpc/internal/metadata"
41 iresolver "google.golang.org/grpc/internal/resolver"
42 "google.golang.org/grpc/internal/serviceconfig"
43 istatus "google.golang.org/grpc/internal/status"
44 "google.golang.org/grpc/internal/transport"
45 "google.golang.org/grpc/mem"
46 "google.golang.org/grpc/metadata"
47 "google.golang.org/grpc/peer"
48 "google.golang.org/grpc/stats"
49 "google.golang.org/grpc/status"
50 )
51 52 var metadataFromOutgoingContextRaw = internal.FromOutgoingContextRaw.(func(context.Context) (metadata.MD, [][]string, bool))
53 54 // StreamHandler defines the handler called by gRPC server to complete the
55 // execution of a streaming RPC.
56 //
57 // If a StreamHandler returns an error, it should either be produced by the
58 // status package, or be one of the context errors. Otherwise, gRPC will use
59 // codes.Unknown as the status code and err.Error() as the status message of the
60 // RPC.
61 type StreamHandler func(srv any, stream ServerStream) error
62 63 // StreamDesc represents a streaming RPC service's method specification. Used
64 // on the server when registering services and on the client when initiating
65 // new streams.
66 type StreamDesc struct {
67 // StreamName and Handler are only used when registering handlers on a
68 // server.
69 StreamName string // the name of the method excluding the service
70 Handler StreamHandler // the handler called for the method
71 72 // ServerStreams and ClientStreams are used for registering handlers on a
73 // server as well as defining RPC behavior when passed to NewClientStream
74 // and ClientConn.NewStream. At least one must be true.
75 ServerStreams bool // indicates the server can perform streaming sends
76 ClientStreams bool // indicates the client can perform streaming sends
77 }
78 79 // Stream defines the common interface a client or server stream has to satisfy.
80 //
81 // Deprecated: See ClientStream and ServerStream documentation instead.
82 type Stream interface {
83 // Deprecated: See ClientStream and ServerStream documentation instead.
84 Context() context.Context
85 // Deprecated: See ClientStream and ServerStream documentation instead.
86 SendMsg(m any) error
87 // Deprecated: See ClientStream and ServerStream documentation instead.
88 RecvMsg(m any) error
89 }
90 91 // ClientStream defines the client-side behavior of a streaming RPC.
92 //
93 // All errors returned from ClientStream methods are compatible with the
94 // status package.
95 type ClientStream interface {
96 // Header returns the header metadata received from the server if there
97 // is any. It blocks if the metadata is not ready to read. If the metadata
98 // is nil and the error is also nil, then the stream was terminated without
99 // headers, and the status can be discovered by calling RecvMsg.
100 Header() (metadata.MD, error)
101 // Trailer returns the trailer metadata from the server, if there is any.
102 // It must only be called after stream.CloseAndRecv has returned, or
103 // stream.Recv has returned a non-nil error (including io.EOF).
104 Trailer() metadata.MD
105 // CloseSend closes the send direction of the stream. This method always
106 // returns a nil error. The status of the stream may be discovered using
107 // RecvMsg. It is also not safe to call CloseSend concurrently with SendMsg.
108 CloseSend() error
109 // Context returns the context for this stream.
110 //
111 // It should not be called until after Header or RecvMsg has returned. Once
112 // called, subsequent client-side retries are disabled.
113 Context() context.Context
114 // SendMsg is generally called by generated code. On error, SendMsg aborts
115 // the stream. If the error was generated by the client, the status is
116 // returned directly; otherwise, io.EOF is returned and the status of
117 // the stream may be discovered using RecvMsg. For unary or server-streaming
118 // RPCs (StreamDesc.ClientStreams is false), a nil error is returned
119 // unconditionally.
120 //
121 // SendMsg blocks until:
122 // - There is sufficient flow control to schedule m with the transport, or
123 // - The stream is done, or
124 // - The stream breaks.
125 //
126 // SendMsg does not wait until the message is received by the server. An
127 // untimely stream closure may result in lost messages. To ensure delivery,
128 // users should ensure the RPC completed successfully using RecvMsg.
129 //
130 // It is safe to have a goroutine calling SendMsg and another goroutine
131 // calling RecvMsg on the same stream at the same time, but it is not safe
132 // to call SendMsg on the same stream in different goroutines. It is also
133 // not safe to call CloseSend concurrently with SendMsg.
134 //
135 // It is not safe to modify the message after calling SendMsg. Tracing
136 // libraries and stats handlers may use the message lazily.
137 SendMsg(m any) error
138 // RecvMsg blocks until it receives a message into m or the stream is
139 // done. It returns io.EOF when the stream completes successfully. On
140 // any other error, the stream is aborted and the error contains the RPC
141 // status.
142 //
143 // It is safe to have a goroutine calling SendMsg and another goroutine
144 // calling RecvMsg on the same stream at the same time, but it is not
145 // safe to call RecvMsg on the same stream in different goroutines.
146 RecvMsg(m any) error
147 }
148 149 // NewStream creates a new Stream for the client side. This is typically
150 // called by generated code. ctx is used for the lifetime of the stream.
151 //
152 // To ensure resources are not leaked due to the stream returned, one of the following
153 // actions must be performed:
154 //
155 // 1. Call Close on the ClientConn.
156 // 2. Cancel the context provided.
157 // 3. Call RecvMsg until a non-nil error is returned. A protobuf-generated
158 // client-streaming RPC, for instance, might use the helper function
159 // CloseAndRecv (note that CloseSend does not Recv, therefore is not
160 // guaranteed to release all resources).
161 // 4. Receive a non-nil, non-io.EOF error from Header or SendMsg.
162 //
163 // If none of the above happen, a goroutine and a context will be leaked, and grpc
164 // will not call the optionally-configured stats handler with a stats.End message.
165 func (cc *ClientConn) NewStream(ctx context.Context, desc *StreamDesc, method string, opts ...CallOption) (ClientStream, error) {
166 // allow interceptor to see all applicable call options, which means those
167 // configured as defaults from dial option as well as per-call options
168 opts = combine(cc.dopts.callOptions, opts)
169 170 if cc.dopts.streamInt != nil {
171 return cc.dopts.streamInt(ctx, desc, cc, method, newClientStream, opts...)
172 }
173 return newClientStream(ctx, desc, cc, method, opts...)
174 }
175 176 // NewClientStream is a wrapper for ClientConn.NewStream.
177 func NewClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (ClientStream, error) {
178 return cc.NewStream(ctx, desc, method, opts...)
179 }
180 181 var emptyMethodConfig = serviceconfig.MethodConfig{}
182 183 // endOfClientStream performs cleanup actions required for both successful and
184 // failed streams. This includes incrementing channelz stats and invoking all
185 // registered OnFinish call options.
186 func endOfClientStream(cc *ClientConn, err error, opts ...CallOption) {
187 if channelz.IsOn() {
188 if err != nil {
189 cc.incrCallsFailed()
190 } else {
191 cc.incrCallsSucceeded()
192 }
193 }
194 195 for _, o := range opts {
196 if o, ok := o.(OnFinishCallOption); ok {
197 o.OnFinish(err)
198 }
199 }
200 }
201 202 func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (_ ClientStream, err error) {
203 if channelz.IsOn() {
204 cc.incrCallsStarted()
205 }
206 defer func() {
207 if err != nil {
208 // Ensure cleanup when stream creation fails.
209 endOfClientStream(cc, err, opts...)
210 }
211 }()
212 213 // Start tracking the RPC for idleness purposes. This is where a stream is
214 // created for both streaming and unary RPCs, and hence is a good place to
215 // track active RPC count.
216 cc.idlenessMgr.OnCallBegin()
217 218 // Add a calloption, to decrement the active call count, that gets executed
219 // when the RPC completes.
220 opts = append([]CallOption{OnFinish(func(error) { cc.idlenessMgr.OnCallEnd() })}, opts...)
221 222 if md, added, ok := metadataFromOutgoingContextRaw(ctx); ok {
223 // validate md
224 if err := imetadata.Validate(md); err != nil {
225 return nil, status.Error(codes.Internal, err.Error())
226 }
227 // validate added
228 for _, kvs := range added {
229 for i := 0; i < len(kvs); i += 2 {
230 if err := imetadata.ValidatePair(kvs[i], kvs[i+1]); err != nil {
231 return nil, status.Error(codes.Internal, err.Error())
232 }
233 }
234 }
235 }
236 // Provide an opportunity for the first RPC to see the first service config
237 // provided by the resolver.
238 nameResolutionDelayed, err := cc.waitForResolvedAddrs(ctx)
239 if err != nil {
240 return nil, err
241 }
242 243 mc := &emptyMethodConfig
244 var onCommit func()
245 newStream := func(ctx context.Context, done func()) (iresolver.ClientStream, error) {
246 return newClientStreamWithParams(ctx, desc, cc, method, mc, onCommit, done, nameResolutionDelayed, opts...)
247 }
248 249 rpcInfo := iresolver.RPCInfo{Context: ctx, Method: method}
250 rpcConfig, err := cc.safeConfigSelector.SelectConfig(rpcInfo)
251 if err != nil {
252 if st, ok := status.FromError(err); ok {
253 // Restrict the code to the list allowed by gRFC A54.
254 if istatus.IsRestrictedControlPlaneCode(st) {
255 err = status.Errorf(codes.Internal, "config selector returned illegal status: %v", err)
256 }
257 return nil, err
258 }
259 return nil, toRPCErr(err)
260 }
261 262 if rpcConfig != nil {
263 if rpcConfig.Context != nil {
264 ctx = rpcConfig.Context
265 }
266 mc = &rpcConfig.MethodConfig
267 onCommit = rpcConfig.OnCommitted
268 if rpcConfig.Interceptor != nil {
269 rpcInfo.Context = nil
270 ns := newStream
271 newStream = func(ctx context.Context, done func()) (iresolver.ClientStream, error) {
272 cs, err := rpcConfig.Interceptor.NewStream(ctx, rpcInfo, done, ns)
273 if err != nil {
274 return nil, toRPCErr(err)
275 }
276 return cs, nil
277 }
278 }
279 }
280 281 return newStream(ctx, func() {})
282 }
283 284 func newClientStreamWithParams(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, mc *serviceconfig.MethodConfig, onCommit, doneFunc func(), nameResolutionDelayed bool, opts ...CallOption) (_ iresolver.ClientStream, err error) {
285 callInfo := defaultCallInfo()
286 if mc.WaitForReady != nil {
287 callInfo.failFast = !*mc.WaitForReady
288 }
289 290 // Possible context leak:
291 // The cancel function for the child context we create will only be called
292 // when RecvMsg returns a non-nil error, if the ClientConn is closed, or if
293 // an error is generated by SendMsg.
294 // https://github.com/grpc/grpc-go/issues/1818.
295 var cancel context.CancelFunc
296 if mc.Timeout != nil && *mc.Timeout >= 0 {
297 ctx, cancel = context.WithTimeout(ctx, *mc.Timeout)
298 } else {
299 ctx, cancel = context.WithCancel(ctx)
300 }
301 defer func() {
302 if err != nil {
303 cancel()
304 }
305 }()
306 307 for _, o := range opts {
308 if err := o.before(callInfo); err != nil {
309 return nil, toRPCErr(err)
310 }
311 }
312 callInfo.maxSendMessageSize = getMaxSize(mc.MaxReqSize, callInfo.maxSendMessageSize, defaultClientMaxSendMessageSize)
313 callInfo.maxReceiveMessageSize = getMaxSize(mc.MaxRespSize, callInfo.maxReceiveMessageSize, defaultClientMaxReceiveMessageSize)
314 if err := setCallInfoCodec(callInfo); err != nil {
315 return nil, err
316 }
317 318 callHdr := &transport.CallHdr{
319 Host: cc.authority,
320 Method: method,
321 ContentSubtype: callInfo.contentSubtype,
322 DoneFunc: doneFunc,
323 Authority: callInfo.authority,
324 }
325 if allowed := callInfo.acceptedResponseCompressors; len(allowed) > 0 {
326 headerValue := strings.Join(allowed, ",")
327 callHdr.AcceptedCompressors = &headerValue
328 }
329 330 // Set our outgoing compression according to the UseCompressor CallOption, if
331 // set. In that case, also find the compressor from the encoding package.
332 // Otherwise, use the compressor configured by the WithCompressor DialOption,
333 // if set.
334 var compressorV0 Compressor
335 var compressorV1 encoding.Compressor
336 if ct := callInfo.compressorName; ct != "" {
337 callHdr.SendCompress = ct
338 if ct != encoding.Identity {
339 compressorV1 = encoding.GetCompressor(ct)
340 if compressorV1 == nil {
341 return nil, status.Errorf(codes.Internal, "grpc: Compressor is not installed for requested grpc-encoding %q", ct)
342 }
343 }
344 } else if cc.dopts.compressorV0 != nil {
345 callHdr.SendCompress = cc.dopts.compressorV0.Type()
346 compressorV0 = cc.dopts.compressorV0
347 }
348 if callInfo.creds != nil {
349 callHdr.Creds = callInfo.creds
350 }
351 352 cs := &clientStream{
353 callHdr: callHdr,
354 ctx: ctx,
355 methodConfig: mc,
356 opts: opts,
357 callInfo: callInfo,
358 cc: cc,
359 desc: desc,
360 codec: callInfo.codec,
361 compressorV0: compressorV0,
362 compressorV1: compressorV1,
363 cancel: cancel,
364 firstAttempt: true,
365 onCommit: onCommit,
366 nameResolutionDelay: nameResolutionDelayed,
367 }
368 if !cc.dopts.disableRetry {
369 cs.retryThrottler = cc.retryThrottler.Load().(*retryThrottler)
370 }
371 if ml := binarylog.GetMethodLogger(method); ml != nil {
372 cs.binlogs = append(cs.binlogs, ml)
373 }
374 if cc.dopts.binaryLogger != nil {
375 if ml := cc.dopts.binaryLogger.GetMethodLogger(method); ml != nil {
376 cs.binlogs = append(cs.binlogs, ml)
377 }
378 }
379 380 // Pick the transport to use and create a new stream on the transport.
381 // Assign cs.attempt upon success.
382 op := func(a *csAttempt) error {
383 if err := a.getTransport(); err != nil {
384 return err
385 }
386 if err := a.newStream(); err != nil {
387 return err
388 }
389 // Because this operation is always called either here (while creating
390 // the clientStream) or by the retry code while locked when replaying
391 // the operation, it is safe to access cs.attempt directly.
392 cs.attempt = a
393 return nil
394 }
395 if err := cs.withRetry(op, func() { cs.bufferForRetryLocked(0, op, nil) }); err != nil {
396 return nil, err
397 }
398 399 if len(cs.binlogs) != 0 {
400 md, _ := metadata.FromOutgoingContext(ctx)
401 logEntry := &binarylog.ClientHeader{
402 OnClientSide: true,
403 Header: md,
404 MethodName: method,
405 Authority: cs.cc.authority,
406 }
407 if deadline, ok := ctx.Deadline(); ok {
408 logEntry.Timeout = time.Until(deadline)
409 if logEntry.Timeout < 0 {
410 logEntry.Timeout = 0
411 }
412 }
413 for _, binlog := range cs.binlogs {
414 binlog.Log(cs.ctx, logEntry)
415 }
416 }
417 418 if desc != unaryStreamDesc {
419 // Listen on cc and stream contexts to cleanup when the user closes the
420 // ClientConn or cancels the stream context. In all other cases, an error
421 // should already be injected into the recv buffer by the transport, which
422 // the client will eventually receive, and then we will cancel the stream's
423 // context in clientStream.finish.
424 go func() {
425 select {
426 case <-cc.ctx.Done():
427 cs.finish(ErrClientConnClosing)
428 case <-ctx.Done():
429 cs.finish(toRPCErr(ctx.Err()))
430 }
431 }()
432 }
433 return cs, nil
434 }
435 436 // newAttemptLocked creates a new csAttempt without a transport or stream.
437 func (cs *clientStream) newAttemptLocked(isTransparent bool) (*csAttempt, error) {
438 if err := cs.ctx.Err(); err != nil {
439 return nil, toRPCErr(err)
440 }
441 if err := cs.cc.ctx.Err(); err != nil {
442 return nil, ErrClientConnClosing
443 }
444 445 ctx := newContextWithRPCInfo(cs.ctx, cs.callInfo.failFast, cs.callInfo.codec, cs.compressorV0, cs.compressorV1)
446 method := cs.callHdr.Method
447 var beginTime time.Time
448 sh := cs.cc.statsHandler
449 if sh != nil {
450 beginTime = time.Now()
451 ctx = sh.TagRPC(ctx, &stats.RPCTagInfo{
452 FullMethodName: method, FailFast: cs.callInfo.failFast,
453 NameResolutionDelay: cs.nameResolutionDelay,
454 })
455 sh.HandleRPC(ctx, &stats.Begin{
456 Client: true,
457 BeginTime: beginTime,
458 FailFast: cs.callInfo.failFast,
459 IsClientStream: cs.desc.ClientStreams,
460 IsServerStream: cs.desc.ServerStreams,
461 IsTransparentRetryAttempt: isTransparent,
462 })
463 }
464 465 var trInfo *traceInfo
466 if EnableTracing {
467 trInfo = &traceInfo{
468 tr: newTrace("grpc.Sent."+methodFamily(method), method),
469 firstLine: firstLine{
470 client: true,
471 },
472 }
473 if deadline, ok := ctx.Deadline(); ok {
474 trInfo.firstLine.deadline = time.Until(deadline)
475 }
476 trInfo.tr.LazyLog(&trInfo.firstLine, false)
477 ctx = newTraceContext(ctx, trInfo.tr)
478 }
479 480 if cs.cc.parsedTarget.URL.Scheme == internal.GRPCResolverSchemeExtraMetadata {
481 // Add extra metadata (metadata that will be added by transport) to context
482 // so the balancer can see them.
483 ctx = grpcutil.WithExtraMetadata(ctx, metadata.Pairs(
484 "content-type", grpcutil.ContentType(cs.callHdr.ContentSubtype),
485 ))
486 }
487 488 return &csAttempt{
489 ctx: ctx,
490 beginTime: beginTime,
491 cs: cs,
492 decompressorV0: cs.cc.dopts.dc,
493 statsHandler: sh,
494 trInfo: trInfo,
495 }, nil
496 }
497 498 func (a *csAttempt) getTransport() error {
499 cs := a.cs
500 501 pickInfo := balancer.PickInfo{Ctx: a.ctx, FullMethodName: cs.callHdr.Method}
502 pick, err := cs.cc.pickerWrapper.pick(a.ctx, cs.callInfo.failFast, pickInfo)
503 a.transport, a.pickResult = pick.transport, pick.result
504 if err != nil {
505 if de, ok := err.(dropError); ok {
506 err = de.error
507 a.drop = true
508 }
509 return err
510 }
511 if a.trInfo != nil {
512 a.trInfo.firstLine.SetRemoteAddr(a.transport.Peer().Addr)
513 }
514 if pick.blocked && a.statsHandler != nil {
515 a.statsHandler.HandleRPC(a.ctx, &stats.DelayedPickComplete{})
516 }
517 return nil
518 }
519 520 func (a *csAttempt) newStream() error {
521 cs := a.cs
522 cs.callHdr.PreviousAttempts = cs.numRetries
523 524 // Merge metadata stored in PickResult, if any, with existing call metadata.
525 // It is safe to overwrite the csAttempt's context here, since all state
526 // maintained in it are local to the attempt. When the attempt has to be
527 // retried, a new instance of csAttempt will be created.
528 if a.pickResult.Metadata != nil {
529 // We currently do not have a function it the metadata package which
530 // merges given metadata with existing metadata in a context. Existing
531 // function `AppendToOutgoingContext()` takes a variadic argument of key
532 // value pairs.
533 //
534 // TODO: Make it possible to retrieve key value pairs from metadata.MD
535 // in a form passable to AppendToOutgoingContext(), or create a version
536 // of AppendToOutgoingContext() that accepts a metadata.MD.
537 md, _ := metadata.FromOutgoingContext(a.ctx)
538 md = metadata.Join(md, a.pickResult.Metadata)
539 a.ctx = metadata.NewOutgoingContext(a.ctx, md)
540 }
541 542 s, err := a.transport.NewStream(a.ctx, cs.callHdr)
543 if err != nil {
544 nse, ok := err.(*transport.NewStreamError)
545 if !ok {
546 // Unexpected.
547 return err
548 }
549 550 if nse.AllowTransparentRetry {
551 a.allowTransparentRetry = true
552 }
553 554 // Unwrap and convert error.
555 return toRPCErr(nse.Err)
556 }
557 a.transportStream = s
558 a.ctx = s.Context()
559 a.parser = parser{r: s, bufferPool: a.cs.cc.dopts.copts.BufferPool}
560 return nil
561 }
562 563 // clientStream implements a client side Stream.
564 type clientStream struct {
565 callHdr *transport.CallHdr
566 opts []CallOption
567 callInfo *callInfo
568 cc *ClientConn
569 desc *StreamDesc
570 571 codec baseCodec
572 compressorV0 Compressor
573 compressorV1 encoding.Compressor
574 575 cancel context.CancelFunc // cancels all attempts
576 577 sentLast bool // sent an end stream
578 579 receivedFirstMsg bool // set after the first message is received
580 581 methodConfig *MethodConfig
582 583 ctx context.Context // the application's context, wrapped by stats/tracing
584 585 retryThrottler *retryThrottler // The throttler active when the RPC began.
586 587 binlogs []binarylog.MethodLogger
588 // serverHeaderBinlogged is a boolean for whether server header has been
589 // logged. Server header will be logged when the first time one of those
590 // happens: stream.Header(), stream.Recv().
591 //
592 // It's only read and used by Recv() and Header(), so it doesn't need to be
593 // synchronized.
594 serverHeaderBinlogged bool
595 596 mu sync.Mutex
597 firstAttempt bool // if true, transparent retry is valid
598 numRetries int // exclusive of transparent retry attempt(s)
599 numRetriesSincePushback int // retries since pushback; to reset backoff
600 finished bool // TODO: replace with atomic cmpxchg or sync.Once?
601 // attempt is the active client stream attempt.
602 // The only place where it is written is the newAttemptLocked method and this method never writes nil.
603 // So, attempt can be nil only inside newClientStream function when clientStream is first created.
604 // One of the first things done after clientStream's creation, is to call newAttemptLocked which either
605 // assigns a non nil value to the attempt or returns an error. If an error is returned from newAttemptLocked,
606 // then newClientStream calls finish on the clientStream and returns. So, finish method is the only
607 // place where we need to check if the attempt is nil.
608 attempt *csAttempt
609 // TODO(hedging): hedging will have multiple attempts simultaneously.
610 committed bool // active attempt committed for retry?
611 onCommit func()
612 replayBuffer []replayOp // operations to replay on retry
613 replayBufferSize int // current size of replayBuffer
614 // nameResolutionDelay indicates if there was a delay in the name resolution.
615 // This field is only valid on client side, it's always false on server side.
616 nameResolutionDelay bool
617 }
618 619 type replayOp struct {
620 op func(a *csAttempt) error
621 cleanup func()
622 }
623 624 // csAttempt implements a single transport stream attempt within a
625 // clientStream.
626 type csAttempt struct {
627 ctx context.Context
628 cs *clientStream
629 transport transport.ClientTransport
630 transportStream *transport.ClientStream
631 parser parser
632 pickResult balancer.PickResult
633 634 finished bool
635 decompressorV0 Decompressor
636 decompressorV1 encoding.Compressor
637 decompressorSet bool
638 639 mu sync.Mutex // guards trInfo.tr
640 // trInfo may be nil (if EnableTracing is false).
641 // trInfo.tr is set when created (if EnableTracing is true),
642 // and cleared when the finish method is called.
643 trInfo *traceInfo
644 645 statsHandler stats.Handler
646 beginTime time.Time
647 648 // set for newStream errors that may be transparently retried
649 allowTransparentRetry bool
650 // set for pick errors that are returned as a status
651 drop bool
652 }
653 654 func (cs *clientStream) commitAttemptLocked() {
655 if !cs.committed && cs.onCommit != nil {
656 cs.onCommit()
657 }
658 cs.committed = true
659 for _, op := range cs.replayBuffer {
660 if op.cleanup != nil {
661 op.cleanup()
662 }
663 }
664 cs.replayBuffer = nil
665 }
666 667 func (cs *clientStream) commitAttempt() {
668 cs.mu.Lock()
669 cs.commitAttemptLocked()
670 cs.mu.Unlock()
671 }
672 673 // shouldRetry returns nil if the RPC should be retried; otherwise it returns
674 // the error that should be returned by the operation. If the RPC should be
675 // retried, the bool indicates whether it is being retried transparently.
676 func (a *csAttempt) shouldRetry(err error) (bool, error) {
677 cs := a.cs
678 679 if cs.finished || cs.committed || a.drop {
680 // RPC is finished or committed or was dropped by the picker; cannot retry.
681 return false, err
682 }
683 if a.transportStream == nil && a.allowTransparentRetry {
684 return true, nil
685 }
686 // Wait for the trailers.
687 unprocessed := false
688 if a.transportStream != nil {
689 <-a.transportStream.Done()
690 unprocessed = a.transportStream.Unprocessed()
691 }
692 if cs.firstAttempt && unprocessed {
693 // First attempt, stream unprocessed: transparently retry.
694 return true, nil
695 }
696 if cs.cc.dopts.disableRetry {
697 return false, err
698 }
699 700 pushback := 0
701 hasPushback := false
702 if a.transportStream != nil {
703 if !a.transportStream.TrailersOnly() {
704 return false, err
705 }
706 707 // TODO(retry): Move down if the spec changes to not check server pushback
708 // before considering this a failure for throttling.
709 sps := a.transportStream.Trailer()["grpc-retry-pushback-ms"]
710 if len(sps) == 1 {
711 var e error
712 if pushback, e = strconv.Atoi(sps[0]); e != nil || pushback < 0 {
713 channelz.Infof(logger, cs.cc.channelz, "Server retry pushback specified to abort (%q).", sps[0])
714 cs.retryThrottler.throttle() // This counts as a failure for throttling.
715 return false, err
716 }
717 hasPushback = true
718 } else if len(sps) > 1 {
719 channelz.Warningf(logger, cs.cc.channelz, "Server retry pushback specified multiple values (%q); not retrying.", sps)
720 cs.retryThrottler.throttle() // This counts as a failure for throttling.
721 return false, err
722 }
723 }
724 725 var code codes.Code
726 if a.transportStream != nil {
727 code = a.transportStream.Status().Code()
728 } else {
729 code = status.Code(err)
730 }
731 732 rp := cs.methodConfig.RetryPolicy
733 if rp == nil || !rp.RetryableStatusCodes[code] {
734 return false, err
735 }
736 737 // Note: the ordering here is important; we count this as a failure
738 // only if the code matched a retryable code.
739 if cs.retryThrottler.throttle() {
740 return false, err
741 }
742 if cs.numRetries+1 >= rp.MaxAttempts {
743 return false, err
744 }
745 746 var dur time.Duration
747 if hasPushback {
748 dur = time.Millisecond * time.Duration(pushback)
749 cs.numRetriesSincePushback = 0
750 } else {
751 fact := math.Pow(rp.BackoffMultiplier, float64(cs.numRetriesSincePushback))
752 cur := min(float64(rp.InitialBackoff)*fact, float64(rp.MaxBackoff))
753 // Apply jitter by multiplying with a random factor between 0.8 and 1.2
754 cur *= 0.8 + 0.4*rand.Float64()
755 dur = time.Duration(int64(cur))
756 cs.numRetriesSincePushback++
757 }
758 759 // TODO(dfawley): we could eagerly fail here if dur puts us past the
760 // deadline, but unsure if it is worth doing.
761 t := time.NewTimer(dur)
762 select {
763 case <-t.C:
764 cs.numRetries++
765 return false, nil
766 case <-cs.ctx.Done():
767 t.Stop()
768 return false, status.FromContextError(cs.ctx.Err()).Err()
769 }
770 }
771 772 // Returns nil if a retry was performed and succeeded; error otherwise.
773 func (cs *clientStream) retryLocked(attempt *csAttempt, lastErr error) error {
774 for {
775 attempt.finish(toRPCErr(lastErr))
776 isTransparent, err := attempt.shouldRetry(lastErr)
777 if err != nil {
778 cs.commitAttemptLocked()
779 return err
780 }
781 cs.firstAttempt = false
782 attempt, err = cs.newAttemptLocked(isTransparent)
783 if err != nil {
784 // Only returns error if the clientconn is closed or the context of
785 // the stream is canceled.
786 return err
787 }
788 // Note that the first op in replayBuffer always sets cs.attempt
789 // if it is able to pick a transport and create a stream.
790 if lastErr = cs.replayBufferLocked(attempt); lastErr == nil {
791 return nil
792 }
793 }
794 }
795 796 func (cs *clientStream) Context() context.Context {
797 cs.commitAttempt()
798 // No need to lock before using attempt, since we know it is committed and
799 // cannot change.
800 if cs.attempt.transportStream != nil {
801 return cs.attempt.transportStream.Context()
802 }
803 return cs.ctx
804 }
805 806 func (cs *clientStream) withRetry(op func(a *csAttempt) error, onSuccess func()) error {
807 cs.mu.Lock()
808 for {
809 if cs.committed {
810 cs.mu.Unlock()
811 // toRPCErr is used in case the error from the attempt comes from
812 // NewClientStream, which intentionally doesn't return a status
813 // error to allow for further inspection; all other errors should
814 // already be status errors.
815 return toRPCErr(op(cs.attempt))
816 }
817 if len(cs.replayBuffer) == 0 {
818 // For the first op, which controls creation of the stream and
819 // assigns cs.attempt, we need to create a new attempt inline
820 // before executing the first op. On subsequent ops, the attempt
821 // is created immediately before replaying the ops.
822 var err error
823 if cs.attempt, err = cs.newAttemptLocked(false /* isTransparent */); err != nil {
824 cs.mu.Unlock()
825 cs.finish(err)
826 return err
827 }
828 }
829 a := cs.attempt
830 cs.mu.Unlock()
831 err := op(a)
832 cs.mu.Lock()
833 if a != cs.attempt {
834 // We started another attempt already.
835 continue
836 }
837 if err == io.EOF {
838 <-a.transportStream.Done()
839 }
840 if err == nil || (err == io.EOF && a.transportStream.Status().Code() == codes.OK) {
841 onSuccess()
842 cs.mu.Unlock()
843 return err
844 }
845 if err := cs.retryLocked(a, err); err != nil {
846 cs.mu.Unlock()
847 return err
848 }
849 }
850 }
851 852 func (cs *clientStream) Header() (metadata.MD, error) {
853 var m metadata.MD
854 err := cs.withRetry(func(a *csAttempt) error {
855 var err error
856 m, err = a.transportStream.Header()
857 return toRPCErr(err)
858 }, cs.commitAttemptLocked)
859 860 if m == nil && err == nil {
861 // The stream ended with success. Finish the clientStream.
862 err = io.EOF
863 }
864 865 if err != nil {
866 cs.finish(err)
867 // Do not return the error. The user should get it by calling Recv().
868 return nil, nil
869 }
870 871 if len(cs.binlogs) != 0 && !cs.serverHeaderBinlogged && m != nil {
872 // Only log if binary log is on and header has not been logged, and
873 // there is actually headers to log.
874 logEntry := &binarylog.ServerHeader{
875 OnClientSide: true,
876 Header: m,
877 PeerAddr: nil,
878 }
879 if peer, ok := peer.FromContext(cs.Context()); ok {
880 logEntry.PeerAddr = peer.Addr
881 }
882 cs.serverHeaderBinlogged = true
883 for _, binlog := range cs.binlogs {
884 binlog.Log(cs.ctx, logEntry)
885 }
886 }
887 888 return m, nil
889 }
890 891 func (cs *clientStream) Trailer() metadata.MD {
892 // On RPC failure, we never need to retry, because usage requires that
893 // RecvMsg() returned a non-nil error before calling this function is valid.
894 // We would have retried earlier if necessary.
895 //
896 // Commit the attempt anyway, just in case users are not following those
897 // directions -- it will prevent races and should not meaningfully impact
898 // performance.
899 cs.commitAttempt()
900 if cs.attempt.transportStream == nil {
901 return nil
902 }
903 return cs.attempt.transportStream.Trailer()
904 }
905 906 func (cs *clientStream) replayBufferLocked(attempt *csAttempt) error {
907 for _, f := range cs.replayBuffer {
908 if err := f.op(attempt); err != nil {
909 return err
910 }
911 }
912 return nil
913 }
914 915 func (cs *clientStream) bufferForRetryLocked(sz int, op func(a *csAttempt) error, cleanup func()) {
916 // Note: we still will buffer if retry is disabled (for transparent retries).
917 if cs.committed {
918 return
919 }
920 cs.replayBufferSize += sz
921 if cs.replayBufferSize > cs.callInfo.maxRetryRPCBufferSize {
922 cs.commitAttemptLocked()
923 cleanup()
924 return
925 }
926 cs.replayBuffer = append(cs.replayBuffer, replayOp{op: op, cleanup: cleanup})
927 }
928 929 func (cs *clientStream) SendMsg(m any) (err error) {
930 defer func() {
931 if err != nil && err != io.EOF {
932 // Call finish on the client stream for errors generated by this SendMsg
933 // call, as these indicate problems created by this client. (Transport
934 // errors are converted to an io.EOF error in csAttempt.sendMsg; the real
935 // error will be returned from RecvMsg eventually in that case, or be
936 // retried.)
937 cs.finish(err)
938 }
939 }()
940 if cs.sentLast {
941 return status.Errorf(codes.Internal, "SendMsg called after CloseSend")
942 }
943 if !cs.desc.ClientStreams {
944 cs.sentLast = true
945 }
946 947 // load hdr, payload, data
948 hdr, data, payload, pf, err := prepareMsg(m, cs.codec, cs.compressorV0, cs.compressorV1, cs.cc.dopts.copts.BufferPool)
949 if err != nil {
950 return err
951 }
952 953 defer func() {
954 data.Free()
955 // only free payload if compression was made, and therefore it is a different set
956 // of buffers from data.
957 if pf.isCompressed() {
958 payload.Free()
959 }
960 }()
961 962 dataLen := data.Len()
963 payloadLen := payload.Len()
964 // TODO(dfawley): should we be checking len(data) instead?
965 if payloadLen > *cs.callInfo.maxSendMessageSize {
966 return status.Errorf(codes.ResourceExhausted, "trying to send message larger than max (%d vs. %d)", payloadLen, *cs.callInfo.maxSendMessageSize)
967 }
968 969 // always take an extra ref in case data == payload (i.e. when the data isn't
970 // compressed). The original ref will always be freed by the deferred free above.
971 payload.Ref()
972 op := func(a *csAttempt) error {
973 return a.sendMsg(m, hdr, payload, dataLen, payloadLen)
974 }
975 976 // onSuccess is invoked when the op is captured for a subsequent retry. If the
977 // stream was established by a previous message and therefore retries are
978 // disabled, onSuccess will not be invoked, and payloadRef can be freed
979 // immediately.
980 onSuccessCalled := false
981 err = cs.withRetry(op, func() {
982 cs.bufferForRetryLocked(len(hdr)+payloadLen, op, payload.Free)
983 onSuccessCalled = true
984 })
985 if !onSuccessCalled {
986 payload.Free()
987 }
988 if len(cs.binlogs) != 0 && err == nil {
989 cm := &binarylog.ClientMessage{
990 OnClientSide: true,
991 Message: data.Materialize(),
992 }
993 for _, binlog := range cs.binlogs {
994 binlog.Log(cs.ctx, cm)
995 }
996 }
997 return err
998 }
999 1000 func (cs *clientStream) RecvMsg(m any) error {
1001 if len(cs.binlogs) != 0 && !cs.serverHeaderBinlogged {
1002 // Call Header() to binary log header if it's not already logged.
1003 cs.Header()
1004 }
1005 var recvInfo *payloadInfo
1006 if len(cs.binlogs) != 0 {
1007 recvInfo = &payloadInfo{}
1008 defer recvInfo.free()
1009 }
1010 err := cs.withRetry(func(a *csAttempt) error {
1011 return a.recvMsg(m, recvInfo)
1012 }, cs.commitAttemptLocked)
1013 if len(cs.binlogs) != 0 && err == nil {
1014 sm := &binarylog.ServerMessage{
1015 OnClientSide: true,
1016 Message: recvInfo.uncompressedBytes.Materialize(),
1017 }
1018 for _, binlog := range cs.binlogs {
1019 binlog.Log(cs.ctx, sm)
1020 }
1021 }
1022 if err != nil || !cs.desc.ServerStreams {
1023 // err != nil or non-server-streaming indicates end of stream.
1024 cs.finish(err)
1025 }
1026 return err
1027 }
1028 1029 func (cs *clientStream) CloseSend() error {
1030 if cs.sentLast {
1031 // Return a nil error on repeated calls to this method.
1032 return nil
1033 }
1034 cs.sentLast = true
1035 op := func(a *csAttempt) error {
1036 a.transportStream.Write(nil, nil, &transport.WriteOptions{Last: true})
1037 // Always return nil; io.EOF is the only error that might make sense
1038 // instead, but there is no need to signal the client to call RecvMsg
1039 // as the only use left for the stream after CloseSend is to call
1040 // RecvMsg. This also matches historical behavior.
1041 return nil
1042 }
1043 cs.withRetry(op, func() { cs.bufferForRetryLocked(0, op, nil) })
1044 if len(cs.binlogs) != 0 {
1045 chc := &binarylog.ClientHalfClose{
1046 OnClientSide: true,
1047 }
1048 for _, binlog := range cs.binlogs {
1049 binlog.Log(cs.ctx, chc)
1050 }
1051 }
1052 // We don't return an error here as we expect users to read all messages
1053 // from the stream and get the RPC status from RecvMsg(). Note that
1054 // SendMsg() must return an error when one occurs so the application
1055 // knows to stop sending messages, but that does not apply here.
1056 return nil
1057 }
1058 1059 func (cs *clientStream) finish(err error) {
1060 if err == io.EOF {
1061 // Ending a stream with EOF indicates a success.
1062 err = nil
1063 }
1064 cs.mu.Lock()
1065 if cs.finished {
1066 cs.mu.Unlock()
1067 return
1068 }
1069 cs.finished = true
1070 cs.commitAttemptLocked()
1071 if cs.attempt != nil {
1072 cs.attempt.finish(err)
1073 // after functions all rely upon having a stream.
1074 if cs.attempt.transportStream != nil {
1075 for _, o := range cs.opts {
1076 o.after(cs.callInfo, cs.attempt)
1077 }
1078 }
1079 }
1080 1081 cs.mu.Unlock()
1082 // Only one of cancel or trailer needs to be logged.
1083 if len(cs.binlogs) != 0 {
1084 switch err {
1085 case errContextCanceled, errContextDeadline, ErrClientConnClosing:
1086 c := &binarylog.Cancel{
1087 OnClientSide: true,
1088 }
1089 for _, binlog := range cs.binlogs {
1090 binlog.Log(cs.ctx, c)
1091 }
1092 default:
1093 logEntry := &binarylog.ServerTrailer{
1094 OnClientSide: true,
1095 Trailer: cs.Trailer(),
1096 Err: err,
1097 }
1098 if peer, ok := peer.FromContext(cs.Context()); ok {
1099 logEntry.PeerAddr = peer.Addr
1100 }
1101 for _, binlog := range cs.binlogs {
1102 binlog.Log(cs.ctx, logEntry)
1103 }
1104 }
1105 }
1106 if err == nil {
1107 cs.retryThrottler.successfulRPC()
1108 }
1109 endOfClientStream(cs.cc, err, cs.opts...)
1110 cs.cancel()
1111 }
1112 1113 func (a *csAttempt) sendMsg(m any, hdr []byte, payld mem.BufferSlice, dataLength, payloadLength int) error {
1114 cs := a.cs
1115 if a.trInfo != nil {
1116 a.mu.Lock()
1117 if a.trInfo.tr != nil {
1118 a.trInfo.tr.LazyLog(&payload{sent: true, msg: m}, true)
1119 }
1120 a.mu.Unlock()
1121 }
1122 if err := a.transportStream.Write(hdr, payld, &transport.WriteOptions{Last: !cs.desc.ClientStreams}); err != nil {
1123 if !cs.desc.ClientStreams {
1124 // For non-client-streaming RPCs, we return nil instead of EOF on error
1125 // because the generated code requires it. finish is not called; RecvMsg()
1126 // will call it with the stream's status independently.
1127 return nil
1128 }
1129 return io.EOF
1130 }
1131 if a.statsHandler != nil {
1132 a.statsHandler.HandleRPC(a.ctx, outPayload(true, m, dataLength, payloadLength, time.Now()))
1133 }
1134 return nil
1135 }
1136 1137 func (a *csAttempt) recvMsg(m any, payInfo *payloadInfo) (err error) {
1138 cs := a.cs
1139 if a.statsHandler != nil && payInfo == nil {
1140 payInfo = &payloadInfo{}
1141 defer payInfo.free()
1142 }
1143 1144 if !a.decompressorSet {
1145 // Block until we receive headers containing received message encoding.
1146 if ct := a.transportStream.RecvCompress(); ct != "" && ct != encoding.Identity {
1147 if a.decompressorV0 == nil || a.decompressorV0.Type() != ct {
1148 // No configured decompressor, or it does not match the incoming
1149 // message encoding; attempt to find a registered compressor that does.
1150 a.decompressorV0 = nil
1151 a.decompressorV1 = encoding.GetCompressor(ct)
1152 }
1153 // Validate that the compression method is acceptable for this call.
1154 if !acceptedCompressorAllows(cs.callInfo.acceptedResponseCompressors, ct) {
1155 return status.Errorf(codes.Internal, "grpc: peer compressed the response with %q which is not allowed by AcceptCompressors", ct)
1156 }
1157 } else {
1158 // No compression is used; disable our decompressor.
1159 a.decompressorV0 = nil
1160 }
1161 // Only initialize this state once per stream.
1162 a.decompressorSet = true
1163 }
1164 if err := recv(&a.parser, cs.codec, a.transportStream, a.decompressorV0, m, *cs.callInfo.maxReceiveMessageSize, payInfo, a.decompressorV1, false); err != nil {
1165 if err == io.EOF {
1166 if statusErr := a.transportStream.Status().Err(); statusErr != nil {
1167 return statusErr
1168 }
1169 // Received no msg and status OK for non-server streaming rpcs.
1170 if !cs.desc.ServerStreams && !cs.receivedFirstMsg {
1171 return status.Error(codes.Internal, "cardinality violation: received no response message from non-server-streaming RPC")
1172 }
1173 return io.EOF // indicates successful end of stream.
1174 }
1175 1176 return toRPCErr(err)
1177 }
1178 cs.receivedFirstMsg = true
1179 if a.trInfo != nil {
1180 a.mu.Lock()
1181 if a.trInfo.tr != nil {
1182 a.trInfo.tr.LazyLog(&payload{sent: false, msg: m}, true)
1183 }
1184 a.mu.Unlock()
1185 }
1186 if a.statsHandler != nil {
1187 a.statsHandler.HandleRPC(a.ctx, &stats.InPayload{
1188 Client: true,
1189 RecvTime: time.Now(),
1190 Payload: m,
1191 WireLength: payInfo.compressedLength + headerLen,
1192 CompressedLength: payInfo.compressedLength,
1193 Length: payInfo.uncompressedBytes.Len(),
1194 })
1195 }
1196 if cs.desc.ServerStreams {
1197 // Subsequent messages should be received by subsequent RecvMsg calls.
1198 return nil
1199 }
1200 // Special handling for non-server-stream rpcs.
1201 // This recv expects EOF or errors, so we don't collect inPayload.
1202 if err := recv(&a.parser, cs.codec, a.transportStream, a.decompressorV0, m, *cs.callInfo.maxReceiveMessageSize, nil, a.decompressorV1, false); err == io.EOF {
1203 return a.transportStream.Status().Err() // non-server streaming Recv returns nil on success
1204 } else if err != nil {
1205 return toRPCErr(err)
1206 }
1207 return status.Error(codes.Internal, "cardinality violation: expected <EOF> for non server-streaming RPCs, but received another message")
1208 }
1209 1210 func (a *csAttempt) finish(err error) {
1211 a.mu.Lock()
1212 if a.finished {
1213 a.mu.Unlock()
1214 return
1215 }
1216 a.finished = true
1217 if err == io.EOF {
1218 // Ending a stream with EOF indicates a success.
1219 err = nil
1220 }
1221 var tr metadata.MD
1222 if a.transportStream != nil {
1223 a.transportStream.Close(err)
1224 tr = a.transportStream.Trailer()
1225 }
1226 1227 if a.pickResult.Done != nil {
1228 br := false
1229 if a.transportStream != nil {
1230 br = a.transportStream.BytesReceived()
1231 }
1232 a.pickResult.Done(balancer.DoneInfo{
1233 Err: err,
1234 Trailer: tr,
1235 BytesSent: a.transportStream != nil,
1236 BytesReceived: br,
1237 ServerLoad: balancerload.Parse(tr),
1238 })
1239 }
1240 if a.statsHandler != nil {
1241 a.statsHandler.HandleRPC(a.ctx, &stats.End{
1242 Client: true,
1243 BeginTime: a.beginTime,
1244 EndTime: time.Now(),
1245 Trailer: tr,
1246 Error: err,
1247 })
1248 }
1249 if a.trInfo != nil && a.trInfo.tr != nil {
1250 if err == nil {
1251 a.trInfo.tr.LazyPrintf("RPC: [OK]")
1252 } else {
1253 a.trInfo.tr.LazyPrintf("RPC: [%v]", err)
1254 a.trInfo.tr.SetError()
1255 }
1256 a.trInfo.tr.Finish()
1257 a.trInfo.tr = nil
1258 }
1259 a.mu.Unlock()
1260 }
1261 1262 // newNonRetryClientStream creates a ClientStream with the specified transport, on the
1263 // given addrConn.
1264 //
1265 // It's expected that the given transport is either the same one in addrConn, or
1266 // is already closed. To avoid race, transport is specified separately, instead
1267 // of using ac.transport.
1268 //
1269 // Main difference between this and ClientConn.NewStream:
1270 // - no retry
1271 // - no service config (or wait for service config)
1272 // - no tracing or stats
1273 func newNonRetryClientStream(ctx context.Context, desc *StreamDesc, method string, t transport.ClientTransport, ac *addrConn, opts ...CallOption) (_ ClientStream, err error) {
1274 if t == nil {
1275 // TODO: return RPC error here?
1276 return nil, errors.New("transport provided is nil")
1277 }
1278 // defaultCallInfo contains unnecessary info(i.e. failfast, maxRetryRPCBufferSize), so we just initialize an empty struct.
1279 c := &callInfo{}
1280 1281 // Possible context leak:
1282 // The cancel function for the child context we create will only be called
1283 // when RecvMsg returns a non-nil error, if the ClientConn is closed, or if
1284 // an error is generated by SendMsg.
1285 // https://github.com/grpc/grpc-go/issues/1818.
1286 ctx, cancel := context.WithCancel(ctx)
1287 defer func() {
1288 if err != nil {
1289 cancel()
1290 }
1291 }()
1292 1293 for _, o := range opts {
1294 if err := o.before(c); err != nil {
1295 return nil, toRPCErr(err)
1296 }
1297 }
1298 c.maxReceiveMessageSize = getMaxSize(nil, c.maxReceiveMessageSize, defaultClientMaxReceiveMessageSize)
1299 c.maxSendMessageSize = getMaxSize(nil, c.maxSendMessageSize, defaultServerMaxSendMessageSize)
1300 if err := setCallInfoCodec(c); err != nil {
1301 return nil, err
1302 }
1303 1304 callHdr := &transport.CallHdr{
1305 Host: ac.cc.authority,
1306 Method: method,
1307 ContentSubtype: c.contentSubtype,
1308 }
1309 1310 // Set our outgoing compression according to the UseCompressor CallOption, if
1311 // set. In that case, also find the compressor from the encoding package.
1312 // Otherwise, use the compressor configured by the WithCompressor DialOption,
1313 // if set.
1314 var cp Compressor
1315 var comp encoding.Compressor
1316 if ct := c.compressorName; ct != "" {
1317 callHdr.SendCompress = ct
1318 if ct != encoding.Identity {
1319 comp = encoding.GetCompressor(ct)
1320 if comp == nil {
1321 return nil, status.Errorf(codes.Internal, "grpc: Compressor is not installed for requested grpc-encoding %q", ct)
1322 }
1323 }
1324 } else if ac.cc.dopts.compressorV0 != nil {
1325 callHdr.SendCompress = ac.cc.dopts.compressorV0.Type()
1326 cp = ac.cc.dopts.compressorV0
1327 }
1328 if c.creds != nil {
1329 callHdr.Creds = c.creds
1330 }
1331 1332 // Use a special addrConnStream to avoid retry.
1333 as := &addrConnStream{
1334 callHdr: callHdr,
1335 ac: ac,
1336 ctx: ctx,
1337 cancel: cancel,
1338 opts: opts,
1339 callInfo: c,
1340 desc: desc,
1341 codec: c.codec,
1342 sendCompressorV0: cp,
1343 sendCompressorV1: comp,
1344 transport: t,
1345 }
1346 1347 s, err := as.transport.NewStream(as.ctx, as.callHdr)
1348 if err != nil {
1349 err = toRPCErr(err)
1350 return nil, err
1351 }
1352 as.transportStream = s
1353 as.parser = parser{r: s, bufferPool: ac.dopts.copts.BufferPool}
1354 ac.incrCallsStarted()
1355 if desc != unaryStreamDesc {
1356 // Listen on stream context to cleanup when the stream context is
1357 // canceled. Also listen for the addrConn's context in case the
1358 // addrConn is closed or reconnects to a different address. In all
1359 // other cases, an error should already be injected into the recv
1360 // buffer by the transport, which the client will eventually receive,
1361 // and then we will cancel the stream's context in
1362 // addrConnStream.finish.
1363 go func() {
1364 ac.mu.Lock()
1365 acCtx := ac.ctx
1366 ac.mu.Unlock()
1367 select {
1368 case <-acCtx.Done():
1369 as.finish(status.Error(codes.Canceled, "grpc: the SubConn is closing"))
1370 case <-ctx.Done():
1371 as.finish(toRPCErr(ctx.Err()))
1372 }
1373 }()
1374 }
1375 return as, nil
1376 }
1377 1378 type addrConnStream struct {
1379 transportStream *transport.ClientStream
1380 ac *addrConn
1381 callHdr *transport.CallHdr
1382 cancel context.CancelFunc
1383 opts []CallOption
1384 callInfo *callInfo
1385 transport transport.ClientTransport
1386 ctx context.Context
1387 sentLast bool
1388 receivedFirstMsg bool
1389 desc *StreamDesc
1390 codec baseCodec
1391 sendCompressorV0 Compressor
1392 sendCompressorV1 encoding.Compressor
1393 decompressorSet bool
1394 decompressorV0 Decompressor
1395 decompressorV1 encoding.Compressor
1396 parser parser
1397 1398 // mu guards finished and is held for the entire finish method.
1399 mu sync.Mutex
1400 finished bool
1401 }
1402 1403 func (as *addrConnStream) Header() (metadata.MD, error) {
1404 m, err := as.transportStream.Header()
1405 if err != nil {
1406 as.finish(toRPCErr(err))
1407 }
1408 return m, err
1409 }
1410 1411 func (as *addrConnStream) Trailer() metadata.MD {
1412 return as.transportStream.Trailer()
1413 }
1414 1415 func (as *addrConnStream) CloseSend() error {
1416 if as.sentLast {
1417 // Return a nil error on repeated calls to this method.
1418 return nil
1419 }
1420 as.sentLast = true
1421 1422 as.transportStream.Write(nil, nil, &transport.WriteOptions{Last: true})
1423 // Always return nil; io.EOF is the only error that might make sense
1424 // instead, but there is no need to signal the client to call RecvMsg
1425 // as the only use left for the stream after CloseSend is to call
1426 // RecvMsg. This also matches historical behavior.
1427 return nil
1428 }
1429 1430 func (as *addrConnStream) Context() context.Context {
1431 return as.transportStream.Context()
1432 }
1433 1434 func (as *addrConnStream) SendMsg(m any) (err error) {
1435 defer func() {
1436 if err != nil && err != io.EOF {
1437 // Call finish on the client stream for errors generated by this SendMsg
1438 // call, as these indicate problems created by this client. (Transport
1439 // errors are converted to an io.EOF error in csAttempt.sendMsg; the real
1440 // error will be returned from RecvMsg eventually in that case, or be
1441 // retried.)
1442 as.finish(err)
1443 }
1444 }()
1445 if as.sentLast {
1446 return status.Errorf(codes.Internal, "SendMsg called after CloseSend")
1447 }
1448 if !as.desc.ClientStreams {
1449 as.sentLast = true
1450 }
1451 1452 // load hdr, payload, data
1453 hdr, data, payload, pf, err := prepareMsg(m, as.codec, as.sendCompressorV0, as.sendCompressorV1, as.ac.dopts.copts.BufferPool)
1454 if err != nil {
1455 return err
1456 }
1457 1458 defer func() {
1459 data.Free()
1460 // only free payload if compression was made, and therefore it is a different set
1461 // of buffers from data.
1462 if pf.isCompressed() {
1463 payload.Free()
1464 }
1465 }()
1466 1467 // TODO(dfawley): should we be checking len(data) instead?
1468 if payload.Len() > *as.callInfo.maxSendMessageSize {
1469 return status.Errorf(codes.ResourceExhausted, "trying to send message larger than max (%d vs. %d)", payload.Len(), *as.callInfo.maxSendMessageSize)
1470 }
1471 1472 if err := as.transportStream.Write(hdr, payload, &transport.WriteOptions{Last: !as.desc.ClientStreams}); err != nil {
1473 if !as.desc.ClientStreams {
1474 // For non-client-streaming RPCs, we return nil instead of EOF on error
1475 // because the generated code requires it. finish is not called; RecvMsg()
1476 // will call it with the stream's status independently.
1477 return nil
1478 }
1479 return io.EOF
1480 }
1481 1482 return nil
1483 }
1484 1485 func (as *addrConnStream) RecvMsg(m any) (err error) {
1486 defer func() {
1487 if err != nil || !as.desc.ServerStreams {
1488 // err != nil or non-server-streaming indicates end of stream.
1489 as.finish(err)
1490 }
1491 }()
1492 1493 if !as.decompressorSet {
1494 // Block until we receive headers containing received message encoding.
1495 if ct := as.transportStream.RecvCompress(); ct != "" && ct != encoding.Identity {
1496 if as.decompressorV0 == nil || as.decompressorV0.Type() != ct {
1497 // No configured decompressor, or it does not match the incoming
1498 // message encoding; attempt to find a registered compressor that does.
1499 as.decompressorV0 = nil
1500 as.decompressorV1 = encoding.GetCompressor(ct)
1501 }
1502 // Validate that the compression method is acceptable for this call.
1503 if !acceptedCompressorAllows(as.callInfo.acceptedResponseCompressors, ct) {
1504 return status.Errorf(codes.Internal, "grpc: peer compressed the response with %q which is not allowed by AcceptCompressors", ct)
1505 }
1506 } else {
1507 // No compression is used; disable our decompressor.
1508 as.decompressorV0 = nil
1509 }
1510 // Only initialize this state once per stream.
1511 as.decompressorSet = true
1512 }
1513 if err := recv(&as.parser, as.codec, as.transportStream, as.decompressorV0, m, *as.callInfo.maxReceiveMessageSize, nil, as.decompressorV1, false); err != nil {
1514 if err == io.EOF {
1515 if statusErr := as.transportStream.Status().Err(); statusErr != nil {
1516 return statusErr
1517 }
1518 // Received no msg and status OK for non-server streaming rpcs.
1519 if !as.desc.ServerStreams && !as.receivedFirstMsg {
1520 return status.Error(codes.Internal, "cardinality violation: received no response message from non-server-streaming RPC")
1521 }
1522 return io.EOF // indicates successful end of stream.
1523 }
1524 return toRPCErr(err)
1525 }
1526 as.receivedFirstMsg = true
1527 1528 if as.desc.ServerStreams {
1529 // Subsequent messages should be received by subsequent RecvMsg calls.
1530 return nil
1531 }
1532 1533 // Special handling for non-server-stream rpcs.
1534 // This recv expects EOF or errors, so we don't collect inPayload.
1535 if err := recv(&as.parser, as.codec, as.transportStream, as.decompressorV0, m, *as.callInfo.maxReceiveMessageSize, nil, as.decompressorV1, false); err == io.EOF {
1536 return as.transportStream.Status().Err() // non-server streaming Recv returns nil on success
1537 } else if err != nil {
1538 return toRPCErr(err)
1539 }
1540 return status.Error(codes.Internal, "cardinality violation: expected <EOF> for non server-streaming RPCs, but received another message")
1541 }
1542 1543 func (as *addrConnStream) finish(err error) {
1544 as.mu.Lock()
1545 if as.finished {
1546 as.mu.Unlock()
1547 return
1548 }
1549 as.finished = true
1550 if err == io.EOF {
1551 // Ending a stream with EOF indicates a success.
1552 err = nil
1553 }
1554 if as.transportStream != nil {
1555 as.transportStream.Close(err)
1556 }
1557 1558 if err != nil {
1559 as.ac.incrCallsFailed()
1560 } else {
1561 as.ac.incrCallsSucceeded()
1562 }
1563 as.cancel()
1564 as.mu.Unlock()
1565 }
1566 1567 // ServerStream defines the server-side behavior of a streaming RPC.
1568 //
1569 // Errors returned from ServerStream methods are compatible with the status
1570 // package. However, the status code will often not match the RPC status as
1571 // seen by the client application, and therefore, should not be relied upon for
1572 // this purpose.
1573 type ServerStream interface {
1574 // SetHeader sets the header metadata. It may be called multiple times.
1575 // When call multiple times, all the provided metadata will be merged.
1576 // All the metadata will be sent out when one of the following happens:
1577 // - ServerStream.SendHeader() is called;
1578 // - The first response is sent out;
1579 // - An RPC status is sent out (error or success).
1580 SetHeader(metadata.MD) error
1581 // SendHeader sends the header metadata.
1582 // The provided md and headers set by SetHeader() will be sent.
1583 // It fails if called multiple times.
1584 SendHeader(metadata.MD) error
1585 // SetTrailer sets the trailer metadata which will be sent with the RPC status.
1586 // When called more than once, all the provided metadata will be merged.
1587 SetTrailer(metadata.MD)
1588 // Context returns the context for this stream.
1589 Context() context.Context
1590 // SendMsg sends a message. On error, SendMsg aborts the stream and the
1591 // error is returned directly.
1592 //
1593 // SendMsg blocks until:
1594 // - There is sufficient flow control to schedule m with the transport, or
1595 // - The stream is done, or
1596 // - The stream breaks.
1597 //
1598 // SendMsg does not wait until the message is received by the client. An
1599 // untimely stream closure may result in lost messages.
1600 //
1601 // It is safe to have a goroutine calling SendMsg and another goroutine
1602 // calling RecvMsg on the same stream at the same time, but it is not safe
1603 // to call SendMsg on the same stream in different goroutines.
1604 //
1605 // It is not safe to modify the message after calling SendMsg. Tracing
1606 // libraries and stats handlers may use the message lazily.
1607 SendMsg(m any) error
1608 // RecvMsg blocks until it receives a message into m or the stream is
1609 // done. It returns io.EOF when the client has performed a CloseSend. On
1610 // any non-EOF error, the stream is aborted and the error contains the
1611 // RPC status.
1612 //
1613 // It is safe to have a goroutine calling SendMsg and another goroutine
1614 // calling RecvMsg on the same stream at the same time, but it is not
1615 // safe to call RecvMsg on the same stream in different goroutines.
1616 RecvMsg(m any) error
1617 }
1618 1619 // serverStream implements a server side Stream.
1620 type serverStream struct {
1621 ctx context.Context
1622 s *transport.ServerStream
1623 p parser
1624 codec baseCodec
1625 desc *StreamDesc
1626 1627 compressorV0 Compressor
1628 compressorV1 encoding.Compressor
1629 decompressorV0 Decompressor
1630 decompressorV1 encoding.Compressor
1631 1632 sendCompressorName string
1633 1634 recvFirstMsg bool // set after the first message is received
1635 1636 maxReceiveMessageSize int
1637 maxSendMessageSize int
1638 trInfo *traceInfo
1639 1640 statsHandler stats.Handler
1641 1642 binlogs []binarylog.MethodLogger
1643 // serverHeaderBinlogged indicates whether server header has been logged. It
1644 // will happen when one of the following two happens: stream.SendHeader(),
1645 // stream.Send().
1646 //
1647 // It's only checked in send and sendHeader, doesn't need to be
1648 // synchronized.
1649 serverHeaderBinlogged bool
1650 1651 mu sync.Mutex // protects trInfo.tr after the service handler runs.
1652 }
1653 1654 func (ss *serverStream) Context() context.Context {
1655 return ss.ctx
1656 }
1657 1658 func (ss *serverStream) SetHeader(md metadata.MD) error {
1659 if md.Len() == 0 {
1660 return nil
1661 }
1662 err := imetadata.Validate(md)
1663 if err != nil {
1664 return status.Error(codes.Internal, err.Error())
1665 }
1666 return ss.s.SetHeader(md)
1667 }
1668 1669 func (ss *serverStream) SendHeader(md metadata.MD) error {
1670 err := imetadata.Validate(md)
1671 if err != nil {
1672 return status.Error(codes.Internal, err.Error())
1673 }
1674 1675 err = ss.s.SendHeader(md)
1676 if len(ss.binlogs) != 0 && !ss.serverHeaderBinlogged {
1677 h, _ := ss.s.Header()
1678 sh := &binarylog.ServerHeader{
1679 Header: h,
1680 }
1681 ss.serverHeaderBinlogged = true
1682 for _, binlog := range ss.binlogs {
1683 binlog.Log(ss.ctx, sh)
1684 }
1685 }
1686 return err
1687 }
1688 1689 func (ss *serverStream) SetTrailer(md metadata.MD) {
1690 if md.Len() == 0 {
1691 return
1692 }
1693 if err := imetadata.Validate(md); err != nil {
1694 logger.Errorf("stream: failed to validate md when setting trailer, err: %v", err)
1695 }
1696 ss.s.SetTrailer(md)
1697 }
1698 1699 func (ss *serverStream) SendMsg(m any) (err error) {
1700 defer func() {
1701 if ss.trInfo != nil {
1702 ss.mu.Lock()
1703 if ss.trInfo.tr != nil {
1704 if err == nil {
1705 ss.trInfo.tr.LazyLog(&payload{sent: true, msg: m}, true)
1706 } else {
1707 ss.trInfo.tr.LazyLog(&fmtStringer{"%v", []any{err}}, true)
1708 ss.trInfo.tr.SetError()
1709 }
1710 }
1711 ss.mu.Unlock()
1712 }
1713 if err != nil && err != io.EOF {
1714 st, _ := status.FromError(toRPCErr(err))
1715 ss.s.WriteStatus(st)
1716 // Non-user specified status was sent out. This should be an error
1717 // case (as a server side Cancel maybe).
1718 //
1719 // This is not handled specifically now. User will return a final
1720 // status from the service handler, we will log that error instead.
1721 // This behavior is similar to an interceptor.
1722 }
1723 }()
1724 1725 // Server handler could have set new compressor by calling SetSendCompressor.
1726 // In case it is set, we need to use it for compressing outbound message.
1727 if sendCompressorsName := ss.s.SendCompress(); sendCompressorsName != ss.sendCompressorName {
1728 ss.compressorV1 = encoding.GetCompressor(sendCompressorsName)
1729 ss.sendCompressorName = sendCompressorsName
1730 }
1731 1732 // load hdr, payload, data
1733 hdr, data, payload, pf, err := prepareMsg(m, ss.codec, ss.compressorV0, ss.compressorV1, ss.p.bufferPool)
1734 if err != nil {
1735 return err
1736 }
1737 1738 defer func() {
1739 data.Free()
1740 // only free payload if compression was made, and therefore it is a different set
1741 // of buffers from data.
1742 if pf.isCompressed() {
1743 payload.Free()
1744 }
1745 }()
1746 1747 dataLen := data.Len()
1748 payloadLen := payload.Len()
1749 1750 // TODO(dfawley): should we be checking len(data) instead?
1751 if payloadLen > ss.maxSendMessageSize {
1752 return status.Errorf(codes.ResourceExhausted, "trying to send message larger than max (%d vs. %d)", payloadLen, ss.maxSendMessageSize)
1753 }
1754 if err := ss.s.Write(hdr, payload, &transport.WriteOptions{Last: false}); err != nil {
1755 return toRPCErr(err)
1756 }
1757 1758 if len(ss.binlogs) != 0 {
1759 if !ss.serverHeaderBinlogged {
1760 h, _ := ss.s.Header()
1761 sh := &binarylog.ServerHeader{
1762 Header: h,
1763 }
1764 ss.serverHeaderBinlogged = true
1765 for _, binlog := range ss.binlogs {
1766 binlog.Log(ss.ctx, sh)
1767 }
1768 }
1769 sm := &binarylog.ServerMessage{
1770 Message: data.Materialize(),
1771 }
1772 for _, binlog := range ss.binlogs {
1773 binlog.Log(ss.ctx, sm)
1774 }
1775 }
1776 if ss.statsHandler != nil {
1777 ss.statsHandler.HandleRPC(ss.s.Context(), outPayload(false, m, dataLen, payloadLen, time.Now()))
1778 }
1779 return nil
1780 }
1781 1782 func (ss *serverStream) RecvMsg(m any) (err error) {
1783 defer func() {
1784 if ss.trInfo != nil {
1785 ss.mu.Lock()
1786 if ss.trInfo.tr != nil {
1787 if err == nil {
1788 ss.trInfo.tr.LazyLog(&payload{sent: false, msg: m}, true)
1789 } else if err != io.EOF {
1790 ss.trInfo.tr.LazyLog(&fmtStringer{"%v", []any{err}}, true)
1791 ss.trInfo.tr.SetError()
1792 }
1793 }
1794 ss.mu.Unlock()
1795 }
1796 if err != nil && err != io.EOF {
1797 st, _ := status.FromError(toRPCErr(err))
1798 ss.s.WriteStatus(st)
1799 // Non-user specified status was sent out. This should be an error
1800 // case (as a server side Cancel maybe).
1801 //
1802 // This is not handled specifically now. User will return a final
1803 // status from the service handler, we will log that error instead.
1804 // This behavior is similar to an interceptor.
1805 }
1806 }()
1807 var payInfo *payloadInfo
1808 if ss.statsHandler != nil || len(ss.binlogs) != 0 {
1809 payInfo = &payloadInfo{}
1810 defer payInfo.free()
1811 }
1812 if err := recv(&ss.p, ss.codec, ss.s, ss.decompressorV0, m, ss.maxReceiveMessageSize, payInfo, ss.decompressorV1, true); err != nil {
1813 if err == io.EOF {
1814 if len(ss.binlogs) != 0 {
1815 chc := &binarylog.ClientHalfClose{}
1816 for _, binlog := range ss.binlogs {
1817 binlog.Log(ss.ctx, chc)
1818 }
1819 }
1820 // Received no request msg for non-client streaming rpcs.
1821 if !ss.desc.ClientStreams && !ss.recvFirstMsg {
1822 return status.Error(codes.Internal, "cardinality violation: received no request message from non-client-streaming RPC")
1823 }
1824 return err
1825 }
1826 if err == io.ErrUnexpectedEOF {
1827 err = status.Error(codes.Internal, io.ErrUnexpectedEOF.Error())
1828 }
1829 return toRPCErr(err)
1830 }
1831 ss.recvFirstMsg = true
1832 if ss.statsHandler != nil {
1833 ss.statsHandler.HandleRPC(ss.s.Context(), &stats.InPayload{
1834 RecvTime: time.Now(),
1835 Payload: m,
1836 Length: payInfo.uncompressedBytes.Len(),
1837 WireLength: payInfo.compressedLength + headerLen,
1838 CompressedLength: payInfo.compressedLength,
1839 })
1840 }
1841 if len(ss.binlogs) != 0 {
1842 cm := &binarylog.ClientMessage{
1843 Message: payInfo.uncompressedBytes.Materialize(),
1844 }
1845 for _, binlog := range ss.binlogs {
1846 binlog.Log(ss.ctx, cm)
1847 }
1848 }
1849 1850 if ss.desc.ClientStreams {
1851 // Subsequent messages should be received by subsequent RecvMsg calls.
1852 return nil
1853 }
1854 // Special handling for non-client-stream rpcs.
1855 // This recv expects EOF or errors, so we don't collect inPayload.
1856 if err := recv(&ss.p, ss.codec, ss.s, ss.decompressorV0, m, ss.maxReceiveMessageSize, nil, ss.decompressorV1, true); err == io.EOF {
1857 return nil
1858 } else if err != nil {
1859 return err
1860 }
1861 return status.Error(codes.Internal, "cardinality violation: received multiple request messages for non-client-streaming RPC")
1862 }
1863 1864 // MethodFromServerStream returns the method string for the input stream.
1865 // The returned string is in the format of "/service/method".
1866 func MethodFromServerStream(stream ServerStream) (string, bool) {
1867 return Method(stream.Context())
1868 }
1869 1870 // prepareMsg returns the hdr, payload and data using the compressors passed or
1871 // using the passed preparedmsg. The returned boolean indicates whether
1872 // compression was made and therefore whether the payload needs to be freed in
1873 // addition to the returned data. Freeing the payload if the returned boolean is
1874 // false can lead to undefined behavior.
1875 func prepareMsg(m any, codec baseCodec, cp Compressor, comp encoding.Compressor, pool mem.BufferPool) (hdr []byte, data, payload mem.BufferSlice, pf payloadFormat, err error) {
1876 if preparedMsg, ok := m.(*PreparedMsg); ok {
1877 return preparedMsg.hdr, preparedMsg.encodedData, preparedMsg.payload, preparedMsg.pf, nil
1878 }
1879 // The input interface is not a prepared msg.
1880 // Marshal and Compress the data at this point
1881 data, err = encode(codec, m)
1882 if err != nil {
1883 return nil, nil, nil, 0, err
1884 }
1885 compData, pf, err := compress(data, cp, comp, pool)
1886 if err != nil {
1887 data.Free()
1888 return nil, nil, nil, 0, err
1889 }
1890 hdr, payload = msgHeader(data, compData, pf)
1891 return hdr, data, payload, pf, nil
1892 }
1893