1 /*
2 *
3 * Copyright 2014 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18 19 package grpc
20 21 import (
22 "context"
23 "errors"
24 "fmt"
25 "math"
26 "net/url"
27 "slices"
28 "strings"
29 "sync"
30 "sync/atomic"
31 "time"
32 33 "google.golang.org/grpc/balancer"
34 "google.golang.org/grpc/balancer/base"
35 "google.golang.org/grpc/balancer/pickfirst"
36 "google.golang.org/grpc/codes"
37 "google.golang.org/grpc/connectivity"
38 "google.golang.org/grpc/credentials"
39 expstats "google.golang.org/grpc/experimental/stats"
40 "google.golang.org/grpc/internal"
41 "google.golang.org/grpc/internal/channelz"
42 "google.golang.org/grpc/internal/grpcsync"
43 "google.golang.org/grpc/internal/idle"
44 iresolver "google.golang.org/grpc/internal/resolver"
45 istats "google.golang.org/grpc/internal/stats"
46 "google.golang.org/grpc/internal/transport"
47 "google.golang.org/grpc/keepalive"
48 "google.golang.org/grpc/resolver"
49 "google.golang.org/grpc/serviceconfig"
50 "google.golang.org/grpc/stats"
51 "google.golang.org/grpc/status"
52 53 _ "google.golang.org/grpc/balancer/roundrobin" // To register roundrobin.
54 _ "google.golang.org/grpc/internal/resolver/passthrough" // To register passthrough resolver.
55 _ "google.golang.org/grpc/internal/resolver/unix" // To register unix resolver.
56 _ "google.golang.org/grpc/resolver/dns" // To register dns resolver.
57 )
const (
	// minConnectTimeout is the minimum time to give a connection to
	// complete.
	minConnectTimeout = 20 * time.Second
)
var (
	// ErrClientConnClosing indicates that the operation is illegal because
	// the ClientConn is closing.
	//
	// Deprecated: this error should not be relied upon by users; use the status
	// code of Canceled instead.
	ErrClientConnClosing = status.Error(codes.Canceled, "grpc: the client connection is closing")
	// errConnDrain indicates that the connection has started draining and
	// does not accept any new RPCs.
	errConnDrain = errors.New("grpc: the connection is drained")
	// errConnClosing indicates that the connection is closing.
	errConnClosing = errors.New("grpc: the connection is closing")
	// errConnIdling indicates the connection is being closed as the channel
	// is moving to an idle mode due to inactivity.
	errConnIdling = errors.New("grpc: the connection is closing due to channel idleness")
	// invalidDefaultServiceConfigErrPrefix is used to prefix the JSON parsing
	// error for the default service config.
	invalidDefaultServiceConfigErrPrefix = "grpc: the provided default service config is invalid"
	// PickFirstBalancerName is the name of the pick_first balancer.
	PickFirstBalancerName = pickfirst.Name
)
// The following errors are returned from NewClient (via
// validateTransportCredentials), and therefore also from Dial and DialContext.
var (
	// errNoTransportSecurity indicates that there is no transport security
	// being set for ClientConn. Users should either set credentials or
	// explicitly pass grpc.WithTransportCredentials(insecure.NewCredentials())
	// to disable security.
	errNoTransportSecurity = errors.New("grpc: no transport security set (use grpc.WithTransportCredentials(insecure.NewCredentials()) explicitly or set credentials)")
	// errTransportCredsAndBundle indicates that a creds bundle is used together
	// with other individual Transport Credentials.
	errTransportCredsAndBundle = errors.New("grpc: credentials.Bundle may not be used with individual TransportCredentials")
	// errNoTransportCredsInBundle indicates that the configured creds bundle
	// returned nil transport credentials.
	errNoTransportCredsInBundle = errors.New("grpc: credentials.Bundle must return non-nil transport credentials")
	// errTransportCredentialsMissing indicates that users want to transmit
	// security information (e.g., OAuth2 token), which requires a secure
	// connection, over an insecure connection.
	errTransportCredentialsMissing = errors.New("grpc: the credentials require transport level security (use grpc.WithTransportCredentials() to set)")
)
102 103 var (
104 disconnectionsMetric = expstats.RegisterInt64Count(expstats.MetricDescriptor{
105 Name: "grpc.subchannel.disconnections",
106 Description: "EXPERIMENTAL. Number of times the selected subchannel becomes disconnected.",
107 Unit: "{disconnection}",
108 Labels: []string{"grpc.target"},
109 OptionalLabels: []string{"grpc.lb.backend_service", "grpc.lb.locality", "grpc.disconnect_error"},
110 Default: false,
111 })
112 connectionAttemptsSucceededMetric = expstats.RegisterInt64Count(expstats.MetricDescriptor{
113 Name: "grpc.subchannel.connection_attempts_succeeded",
114 Description: "EXPERIMENTAL. Number of successful connection attempts.",
115 Unit: "{attempt}",
116 Labels: []string{"grpc.target"},
117 OptionalLabels: []string{"grpc.lb.backend_service", "grpc.lb.locality"},
118 Default: false,
119 })
120 connectionAttemptsFailedMetric = expstats.RegisterInt64Count(expstats.MetricDescriptor{
121 Name: "grpc.subchannel.connection_attempts_failed",
122 Description: "EXPERIMENTAL. Number of failed connection attempts.",
123 Unit: "{attempt}",
124 Labels: []string{"grpc.target"},
125 OptionalLabels: []string{"grpc.lb.backend_service", "grpc.lb.locality"},
126 Default: false,
127 })
128 openConnectionsMetric = expstats.RegisterInt64UpDownCount(expstats.MetricDescriptor{
129 Name: "grpc.subchannel.open_connections",
130 Description: "EXPERIMENTAL. Number of open connections.",
131 Unit: "{attempt}",
132 Labels: []string{"grpc.target"},
133 OptionalLabels: []string{"grpc.lb.backend_service", "grpc.security_level", "grpc.lb.locality"},
134 Default: false,
135 })
136 )
const (
	// defaultClientMaxReceiveMessageSize is 4 MiB.
	defaultClientMaxReceiveMessageSize = 1024 * 1024 * 4
	// defaultClientMaxSendMessageSize is the largest int32 value.
	defaultClientMaxSendMessageSize = math.MaxInt32
	// defaultWriteBufSize specifies the buffer size for sending frames.
	defaultWriteBufSize = 32 * 1024
	// NOTE(review): presumably the matching buffer size for reading frames —
	// confirm against the transport code that consumes it.
	defaultReadBufSize = 32 * 1024
)
145 146 type defaultConfigSelector struct {
147 sc *ServiceConfig
148 }
149 150 func (dcs *defaultConfigSelector) SelectConfig(rpcInfo iresolver.RPCInfo) (*iresolver.RPCConfig, error) {
151 return &iresolver.RPCConfig{
152 Context: rpcInfo.Context,
153 MethodConfig: getMethodConfig(dcs.sc, rpcInfo.Method),
154 }, nil
155 }
// NewClient creates a new gRPC "channel" for the target URI provided. No I/O
// is performed. Use of the ClientConn for RPCs will automatically cause it to
// connect. The Connect method may be called to manually create a connection,
// but for most users this should be unnecessary.
//
// The target name syntax is defined in
// https://github.com/grpc/grpc/blob/master/doc/naming.md. E.g. to use the dns
// name resolver, a "dns:///" prefix may be applied to the target. The default
// name resolver will be used if no scheme is detected, or if the parsed scheme
// is not a registered name resolver. The default resolver is "dns" but can be
// overridden using the resolver package's SetDefaultScheme.
//
// Examples:
//
//   - "foo.googleapis.com:8080"
//   - "dns:///foo.googleapis.com:8080"
//   - "dns:///foo.googleapis.com"
//   - "dns:///10.0.0.213:8080"
//   - "dns:///%5B2001:db8:85a3:8d3:1319:8a2e:370:7348%5D:443"
//   - "dns://8.8.8.8/foo.googleapis.com:8080"
//   - "dns://8.8.8.8/foo.googleapis.com"
//   - "zookeeper://zk.example.com:9900/example_service"
//
// The DialOptions returned by WithBlock, WithTimeout,
// WithReturnConnectionError, and FailOnNonTempDialError are ignored by this
// function.
//
// A non-nil error is returned if the target cannot be parsed into a resolver,
// the configured transport credentials are invalid (see
// validateTransportCredentials), the default service config fails to parse,
// or the authority cannot be determined. On error the returned ClientConn is
// nil.
func NewClient(target string, opts ...DialOption) (conn *ClientConn, err error) {
	cc := &ClientConn{
		target: target,
		conns:  make(map[*addrConn]struct{}),
		dopts:  defaultDialOptions(),
	}

	// Seed the fields that provide their own synchronization with "nothing
	// configured yet" values: a typed-nil retry throttler and a config
	// selector that is not backed by any service config.
	cc.retryThrottler.Store((*retryThrottler)(nil))
	cc.safeConfigSelector.UpdateConfigSelector(&defaultConfigSelector{nil})
	cc.ctx, cc.cancel = context.WithCancel(context.Background())

	// Apply dial options.
	//
	// First check whether the caller asked for the global dial options to be
	// skipped; this has to be known before any option is applied.
	disableGlobalOpts := false
	for _, opt := range opts {
		if _, ok := opt.(*disableGlobalDialOptions); ok {
			disableGlobalOpts = true
			break
		}
	}

	// Global options go first so that the caller's options are applied after
	// them.
	if !disableGlobalOpts {
		for _, opt := range globalDialOptions {
			opt.apply(&cc.dopts)
		}
	}

	for _, opt := range opts {
		opt.apply(&cc.dopts)
	}

	// Determine the resolver to use.
	if err := cc.initParsedTargetAndResolverBuilder(); err != nil {
		return nil, err
	}

	// Per-target global options need the parsed target URL, so they can only
	// be applied once the target has been parsed above.
	for _, opt := range globalPerTargetDialOptions {
		opt.DialOptionForTarget(cc.parsedTarget.URL).apply(&cc.dopts)
	}

	// Collapse the configured interceptors into a single unary and a single
	// stream interceptor now that all options have been applied.
	chainUnaryClientInterceptors(cc)
	chainStreamClientInterceptors(cc)

	if err := cc.validateTransportCredentials(); err != nil {
		return nil, err
	}

	// Parse the default service config, if one was supplied as raw JSON, and
	// fail channel creation when it is invalid.
	if cc.dopts.defaultServiceConfigRawJSON != nil {
		scpr := parseServiceConfig(*cc.dopts.defaultServiceConfigRawJSON, cc.dopts.maxCallAttempts)
		if scpr.Err != nil {
			return nil, fmt.Errorf("%s: %v", invalidDefaultServiceConfigErrPrefix, scpr.Err)
		}
		cc.dopts.defaultServiceConfig, _ = scpr.Config.(*ServiceConfig)
	}
	cc.keepaliveParams = cc.dopts.copts.KeepaliveParams

	if err = cc.initAuthority(); err != nil {
		return nil, err
	}

	// Register ClientConn with channelz. Note that this is only done after
	// channel creation cannot fail.
	cc.channelzRegistration(target)
	channelz.Infof(logger, cc.channelz, "parsed dial target is: %#v", cc.parsedTarget)
	channelz.Infof(logger, cc.channelz, "Channel authority set to %q", cc.authority)

	cc.csMgr = newConnectivityStateManager(cc.ctx, cc.channelz)
	cc.pickerWrapper = newPickerWrapper()

	cc.metricsRecorderList = istats.NewMetricsRecorderList(cc.dopts.copts.StatsHandlers)
	cc.statsHandler = istats.NewCombinedHandler(cc.dopts.copts.StatsHandlers...)

	cc.initIdleStateLocked() // Safe to call without the lock, since nothing else has a reference to cc.
	cc.idlenessMgr = idle.NewManager((*idler)(cc), cc.dopts.idleTimeout)

	return cc, nil
}
259 260 // Dial calls DialContext(context.Background(), target, opts...).
261 //
262 // Deprecated: use NewClient instead. Will be supported throughout 1.x.
263 func Dial(target string, opts ...DialOption) (*ClientConn, error) {
264 return DialContext(context.Background(), target, opts...)
265 }
// DialContext calls NewClient and then exits idle mode. If WithBlock(true) is
// used, it calls Connect and WaitForStateChange until either the context
// expires or the state of the ClientConn is Ready.
//
// One subtle difference between NewClient and Dial and DialContext is that the
// former uses "dns" as the default name resolver, while the latter use
// "passthrough" for backward compatibility. This distinction should not matter
// to most users, but could matter to legacy users that specify a custom dialer
// and expect it to receive the target string directly.
//
// If any step after NewClient fails, the partially set up ClientConn is closed
// before returning, and the returned ClientConn is nil.
//
// Deprecated: use NewClient instead. Will be supported throughout 1.x.
func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *ClientConn, err error) {
	// At the end of this method, we kick the channel out of idle, rather than
	// waiting for the first rpc.
	//
	// WithLocalDNSResolution dial option in `grpc.Dial` ensures that it
	// preserves behavior: when default scheme passthrough is used, skip
	// hostname resolution, when "dns" is used for resolution, perform
	// resolution on the client.
	//
	// These two options are prepended so that the caller's options are
	// applied after them.
	opts = append([]DialOption{withDefaultScheme("passthrough"), WithLocalDNSResolution()}, opts...)
	cc, err := NewClient(target, opts...)
	if err != nil {
		return nil, err
	}

	// From here on, close the channel whenever this function returns a
	// non-nil error (observed through the named result err).
	defer func() {
		if err != nil {
			cc.Close()
		}
	}()

	// We start the channel off in idle mode, but kick it out of idle now,
	// instead of waiting for the first RPC. This is the legacy behavior of
	// Dial.
	//
	// This creates the name resolver, load balancer, etc.
	if err := cc.exitIdleMode(); err != nil {
		return nil, fmt.Errorf("failed to exit idle mode: %w", err)
	}
	cc.idlenessMgr.UnsafeSetNotIdle()

	// Return now for non-blocking dials.
	if !cc.dopts.block {
		return cc, nil
	}

	// A configured dial timeout further bounds the caller's context for the
	// blocking wait below.
	if cc.dopts.timeout > 0 {
		var cancel context.CancelFunc
		ctx, cancel = context.WithTimeout(ctx, cc.dopts.timeout)
		defer cancel()
	}
	// If the context is done by the time we return, never hand back a
	// ClientConn, and normalize the error:
	//   - err already is the context error: keep it as is;
	//   - no error so far, or the last connection error was not requested:
	//     report the context error;
	//   - otherwise: report the context error combined with the last error.
	// This defer runs before the close-on-error defer registered above, so
	// an error set here also causes cc to be closed.
	defer func() {
		select {
		case <-ctx.Done():
			switch {
			case ctx.Err() == err:
				conn = nil
			case err == nil || !cc.dopts.returnLastError:
				conn, err = nil, ctx.Err()
			default:
				conn, err = nil, fmt.Errorf("%v: %v", ctx.Err(), err)
			}
		default:
		}
	}()

	// A blocking dial blocks until the clientConn is ready.
	for {
		s := cc.GetState()
		if s == connectivity.Idle {
			cc.Connect()
		}
		if s == connectivity.Ready {
			return cc, nil
		} else if cc.dopts.copts.FailOnNonTempDialError && s == connectivity.TransientFailure {
			// Fail fast only when the last connection error reports itself
			// as non-temporary; otherwise keep waiting.
			if err = cc.connectionError(); err != nil {
				terr, ok := err.(interface {
					Temporary() bool
				})
				if ok && !terr.Temporary() {
					return nil, err
				}
			}
		}
		if !cc.WaitForStateChange(ctx, s) {
			// ctx got timeout or canceled.
			if err = cc.connectionError(); err != nil && cc.dopts.returnLastError {
				return nil, err
			}
			return nil, ctx.Err()
		}
	}
}
359 360 // addTraceEvent is a helper method to add a trace event on the channel. If the
361 // channel is a nested one, the same event is also added on the parent channel.
362 func (cc *ClientConn) addTraceEvent(msg string) {
363 ted := &channelz.TraceEvent{
364 Desc: fmt.Sprintf("Channel %s", msg),
365 Severity: channelz.CtInfo,
366 }
367 if cc.dopts.channelzParent != nil {
368 ted.Parent = &channelz.TraceEvent{
369 Desc: fmt.Sprintf("Nested channel(id:%d) %s", cc.channelz.ID, msg),
370 Severity: channelz.CtInfo,
371 }
372 }
373 channelz.AddTraceEvent(logger, cc.channelz, 1, ted)
374 }
375 376 type idler ClientConn
377 378 func (i *idler) EnterIdleMode() {
379 (*ClientConn)(i).enterIdleMode()
380 }
381 382 func (i *idler) ExitIdleMode() {
383 // Ignore the error returned from this method, because from the perspective
384 // of the caller (idleness manager), the channel would have always moved out
385 // of IDLE by the time this method returns.
386 (*ClientConn)(i).exitIdleMode()
387 }
388 389 // exitIdleMode moves the channel out of idle mode by recreating the name
390 // resolver and load balancer. This should never be called directly; use
391 // cc.idlenessMgr.ExitIdleMode instead.
392 func (cc *ClientConn) exitIdleMode() error {
393 cc.mu.Lock()
394 if cc.conns == nil {
395 cc.mu.Unlock()
396 return errConnClosing
397 }
398 cc.mu.Unlock()
399 400 // Set state to CONNECTING before building the name resolver
401 // so the channel does not remain in IDLE.
402 cc.csMgr.updateState(connectivity.Connecting)
403 404 // This needs to be called without cc.mu because this builds a new resolver
405 // which might update state or report error inline, which would then need to
406 // acquire cc.mu.
407 if err := cc.resolverWrapper.start(); err != nil {
408 // If resolver creation fails, treat it like an error reported by the
409 // resolver before any valid updates. Set channel's state to
410 // TransientFailure, and set an erroring picker with the resolver build
411 // error, which will returned as part of any subsequent RPCs.
412 logger.Warningf("Failed to start resolver: %v", err)
413 cc.csMgr.updateState(connectivity.TransientFailure)
414 cc.mu.Lock()
415 cc.updateResolverStateAndUnlock(resolver.State{}, err)
416 return fmt.Errorf("failed to start resolver: %w", err)
417 }
418 419 cc.addTraceEvent("exiting idle mode")
420 return nil
421 }
// initIdleStateLocked initializes common state to how it should be while idle:
// a fresh resolver wrapper, balancer wrapper, first-resolve event and
// subchannel map.
//
// As the name indicates, callers hold cc.mu. NewClient calls it without the
// lock, which is safe there because nothing else has a reference to cc yet.
func (cc *ClientConn) initIdleStateLocked() {
	cc.resolverWrapper = newCCResolverWrapper(cc)
	cc.balancerWrapper = newCCBalancerWrapper(cc)
	cc.firstResolveEvent = grpcsync.NewEvent()
	// cc.conns == nil is a proxy for the ClientConn being closed. So, instead
	// of setting it to nil here, we recreate the map. This also means that we
	// don't have to do this when exiting idle mode.
	cc.conns = make(map[*addrConn]struct{})
}
// enterIdleMode puts the channel in idle mode, and as part of it shuts down the
// name resolver, load balancer, and any subchannels. This should never be
// called directly; use cc.idlenessMgr.EnterIdleMode instead.
//
// It is a no-op if the ClientConn has already been closed (cc.conns == nil).
func (cc *ClientConn) enterIdleMode() {
	cc.mu.Lock()

	if cc.conns == nil {
		cc.mu.Unlock()
		return
	}

	// Keep references to the current subchannels and wrappers; the fields on
	// cc are replaced with fresh values by initIdleStateLocked below.
	conns := cc.conns

	rWrapper := cc.resolverWrapper
	rWrapper.close()
	cc.pickerWrapper.reset()
	bWrapper := cc.balancerWrapper
	bWrapper.close()
	cc.csMgr.updateState(connectivity.Idle)
	cc.addTraceEvent("entering idle mode")

	cc.initIdleStateLocked()

	cc.mu.Unlock()

	// Block until the name resolver and LB policy are closed. This wait
	// happens after cc.mu has been released.
	<-rWrapper.serializer.Done()
	<-bWrapper.serializer.Done()

	// Close all subchannels after the LB policy is closed.
	for ac := range conns {
		ac.tearDown(errConnIdling)
	}
}
468 469 // validateTransportCredentials performs a series of checks on the configured
470 // transport credentials. It returns a non-nil error if any of these conditions
471 // are met:
472 // - no transport creds and no creds bundle is configured
473 // - both transport creds and creds bundle are configured
474 // - creds bundle is configured, but it lacks a transport credentials
475 // - insecure transport creds configured alongside call creds that require
476 // transport level security
477 //
478 // If none of the above conditions are met, the configured credentials are
479 // deemed valid and a nil error is returned.
480 func (cc *ClientConn) validateTransportCredentials() error {
481 if cc.dopts.copts.TransportCredentials == nil && cc.dopts.copts.CredsBundle == nil {
482 return errNoTransportSecurity
483 }
484 if cc.dopts.copts.TransportCredentials != nil && cc.dopts.copts.CredsBundle != nil {
485 return errTransportCredsAndBundle
486 }
487 if cc.dopts.copts.CredsBundle != nil && cc.dopts.copts.CredsBundle.TransportCredentials() == nil {
488 return errNoTransportCredsInBundle
489 }
490 transportCreds := cc.dopts.copts.TransportCredentials
491 if transportCreds == nil {
492 transportCreds = cc.dopts.copts.CredsBundle.TransportCredentials()
493 }
494 if transportCreds.Info().SecurityProtocol == "insecure" {
495 for _, cd := range cc.dopts.copts.PerRPCCredentials {
496 if cd.RequireTransportSecurity() {
497 return errTransportCredentialsMissing
498 }
499 }
500 }
501 return nil
502 }
503 504 // channelzRegistration registers the newly created ClientConn with channelz and
505 // stores the returned identifier in `cc.channelz`. A channelz trace event is
506 // emitted for ClientConn creation. If the newly created ClientConn is a nested
507 // one, i.e a valid parent ClientConn ID is specified via a dial option, the
508 // trace event is also added to the parent.
509 //
510 // Doesn't grab cc.mu as this method is expected to be called only at Dial time.
511 func (cc *ClientConn) channelzRegistration(target string) {
512 parentChannel, _ := cc.dopts.channelzParent.(*channelz.Channel)
513 cc.channelz = channelz.RegisterChannel(parentChannel, target)
514 cc.addTraceEvent(fmt.Sprintf("created for target %q", target))
515 }
516 517 // chainUnaryClientInterceptors chains all unary client interceptors into one.
518 func chainUnaryClientInterceptors(cc *ClientConn) {
519 interceptors := cc.dopts.chainUnaryInts
520 // Prepend dopts.unaryInt to the chaining interceptors if it exists, since unaryInt will
521 // be executed before any other chained interceptors.
522 if cc.dopts.unaryInt != nil {
523 interceptors = append([]UnaryClientInterceptor{cc.dopts.unaryInt}, interceptors...)
524 }
525 var chainedInt UnaryClientInterceptor
526 if len(interceptors) == 0 {
527 chainedInt = nil
528 } else if len(interceptors) == 1 {
529 chainedInt = interceptors[0]
530 } else {
531 chainedInt = func(ctx context.Context, method string, req, reply any, cc *ClientConn, invoker UnaryInvoker, opts ...CallOption) error {
532 return interceptors[0](ctx, method, req, reply, cc, getChainUnaryInvoker(interceptors, 0, invoker), opts...)
533 }
534 }
535 cc.dopts.unaryInt = chainedInt
536 }
537 538 // getChainUnaryInvoker recursively generate the chained unary invoker.
539 func getChainUnaryInvoker(interceptors []UnaryClientInterceptor, curr int, finalInvoker UnaryInvoker) UnaryInvoker {
540 if curr == len(interceptors)-1 {
541 return finalInvoker
542 }
543 return func(ctx context.Context, method string, req, reply any, cc *ClientConn, opts ...CallOption) error {
544 return interceptors[curr+1](ctx, method, req, reply, cc, getChainUnaryInvoker(interceptors, curr+1, finalInvoker), opts...)
545 }
546 }
547 548 // chainStreamClientInterceptors chains all stream client interceptors into one.
549 func chainStreamClientInterceptors(cc *ClientConn) {
550 interceptors := cc.dopts.chainStreamInts
551 // Prepend dopts.streamInt to the chaining interceptors if it exists, since streamInt will
552 // be executed before any other chained interceptors.
553 if cc.dopts.streamInt != nil {
554 interceptors = append([]StreamClientInterceptor{cc.dopts.streamInt}, interceptors...)
555 }
556 var chainedInt StreamClientInterceptor
557 if len(interceptors) == 0 {
558 chainedInt = nil
559 } else if len(interceptors) == 1 {
560 chainedInt = interceptors[0]
561 } else {
562 chainedInt = func(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, streamer Streamer, opts ...CallOption) (ClientStream, error) {
563 return interceptors[0](ctx, desc, cc, method, getChainStreamer(interceptors, 0, streamer), opts...)
564 }
565 }
566 cc.dopts.streamInt = chainedInt
567 }
568 569 // getChainStreamer recursively generate the chained client stream constructor.
570 func getChainStreamer(interceptors []StreamClientInterceptor, curr int, finalStreamer Streamer) Streamer {
571 if curr == len(interceptors)-1 {
572 return finalStreamer
573 }
574 return func(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (ClientStream, error) {
575 return interceptors[curr+1](ctx, desc, cc, method, getChainStreamer(interceptors, curr+1, finalStreamer), opts...)
576 }
577 }
578 579 // newConnectivityStateManager creates an connectivityStateManager with
580 // the specified channel.
581 func newConnectivityStateManager(ctx context.Context, channel *channelz.Channel) *connectivityStateManager {
582 return &connectivityStateManager{
583 channelz: channel,
584 pubSub: grpcsync.NewPubSub(ctx),
585 }
586 }
// connectivityStateManager keeps the connectivity.State of ClientConn.
// This struct will eventually be exported so the balancers can access it.
//
// TODO: If possible, get rid of the `connectivityStateManager` type, and
// provide this functionality using the `PubSub`, to avoid keeping track of
// the connectivity state at two places.
type connectivityStateManager struct {
	// mu is held by updateState, getState and getNotifyChan while they
	// access state and notifyChan.
	mu sync.Mutex
	// state is the most recently recorded connectivity state.
	state connectivity.State
	// notifyChan is created lazily by getNotifyChan, and closed and reset to
	// nil by updateState when the state changes.
	notifyChan chan struct{}
	// channelz receives the new state and a log entry on every state change.
	channelz *channelz.Channel
	// pubSub is used to publish every state change.
	pubSub *grpcsync.PubSub
}
601 602 // updateState updates the connectivity.State of ClientConn.
603 // If there's a change it notifies goroutines waiting on state change to
604 // happen.
605 func (csm *connectivityStateManager) updateState(state connectivity.State) {
606 csm.mu.Lock()
607 defer csm.mu.Unlock()
608 if csm.state == connectivity.Shutdown {
609 return
610 }
611 if csm.state == state {
612 return
613 }
614 csm.state = state
615 csm.channelz.ChannelMetrics.State.Store(&state)
616 csm.pubSub.Publish(state)
617 618 channelz.Infof(logger, csm.channelz, "Channel Connectivity change to %v", state)
619 if csm.notifyChan != nil {
620 // There are other goroutines waiting on this channel.
621 close(csm.notifyChan)
622 csm.notifyChan = nil
623 }
624 }
625 626 func (csm *connectivityStateManager) getState() connectivity.State {
627 csm.mu.Lock()
628 defer csm.mu.Unlock()
629 return csm.state
630 }
631 632 func (csm *connectivityStateManager) getNotifyChan() <-chan struct{} {
633 csm.mu.Lock()
634 defer csm.mu.Unlock()
635 if csm.notifyChan == nil {
636 csm.notifyChan = make(chan struct{})
637 }
638 return csm.notifyChan
639 }
// ClientConnInterface defines the functions clients need to perform unary and
// streaming RPCs. It is implemented by *ClientConn, and is only intended to
// be referenced by generated code.
type ClientConnInterface interface {
	// Invoke performs a unary RPC for the given method, sending args, and
	// returns after the response is received into reply.
	Invoke(ctx context.Context, method string, args any, reply any, opts ...CallOption) error
	// NewStream begins a streaming RPC for the given method, described by
	// desc.
	NewStream(ctx context.Context, desc *StreamDesc, method string, opts ...CallOption) (ClientStream, error)
}

// Compile-time assertion that *ClientConn implements ClientConnInterface.
var _ ClientConnInterface = (*ClientConn)(nil)
// ClientConn represents a virtual connection to a conceptual endpoint, to
// perform RPCs.
//
// A ClientConn is free to have zero or more actual connections to the endpoint
// based on configuration, load, etc. It is also free to determine which actual
// endpoints to use and may change it every RPC, permitting client-side load
// balancing.
//
// A ClientConn encapsulates a range of functionality including name
// resolution, TCP connection establishment (with retries and backoff) and TLS
// handshakes. It also handles errors on established connections by
// re-resolving the name and reconnecting.
type ClientConn struct {
	ctx    context.Context    // Initialized using the background context at dial time.
	cancel context.CancelFunc // Cancelled on close.

	// The following are initialized at dial time, and are read-only after that.
	target              string                      // User's dial target.
	parsedTarget        resolver.Target             // See initParsedTargetAndResolverBuilder().
	authority           string                      // See initAuthority().
	dopts               dialOptions                 // Default and user specified dial options.
	channelz            *channelz.Channel           // Channelz object. See channelzRegistration().
	resolverBuilder     resolver.Builder            // See initParsedTargetAndResolverBuilder().
	idlenessMgr         *idle.Manager               // Created in NewClient with dopts.idleTimeout.
	metricsRecorderList *istats.MetricsRecorderList // Built in NewClient from dopts.copts.StatsHandlers.
	statsHandler        stats.Handler               // Built in NewClient from dopts.copts.StatsHandlers.

	// The following provide their own synchronization, and therefore don't
	// require cc.mu to be held to access them.
	csMgr              *connectivityStateManager
	pickerWrapper      *pickerWrapper
	safeConfigSelector iresolver.SafeConfigSelector
	retryThrottler     atomic.Value // Updated from service config.

	// mu protects the following fields.
	// TODO: split mu so the same mutex isn't used for everything.
	mu              sync.RWMutex
	resolverWrapper *ccResolverWrapper         // Always recreated whenever entering idle to simplify Close.
	balancerWrapper *ccBalancerWrapper         // Always recreated whenever entering idle to simplify Close.
	sc              *ServiceConfig             // Latest service config received from the resolver.
	conns           map[*addrConn]struct{}     // Set to nil on close.
	keepaliveParams keepalive.ClientParameters // May be updated upon receipt of a GoAway.
	// firstResolveEvent is used to track whether the name resolver sent us at
	// least one update. RPCs block on this event. May be accessed without mu
	// if we know we cannot be asked to enter idle mode while accessing it (e.g.
	// when the idle manager has already been closed, or if we are already
	// entering idle mode).
	firstResolveEvent *grpcsync.Event

	lceMu               sync.Mutex // protects lastConnectionError
	lastConnectionError error
}
707 708 // WaitForStateChange waits until the connectivity.State of ClientConn changes from sourceState or
709 // ctx expires. A true value is returned in former case and false in latter.
710 func (cc *ClientConn) WaitForStateChange(ctx context.Context, sourceState connectivity.State) bool {
711 ch := cc.csMgr.getNotifyChan()
712 if cc.csMgr.getState() != sourceState {
713 return true
714 }
715 select {
716 case <-ctx.Done():
717 return false
718 case <-ch:
719 return true
720 }
721 }
722 723 // GetState returns the connectivity.State of ClientConn.
724 func (cc *ClientConn) GetState() connectivity.State {
725 return cc.csMgr.getState()
726 }
727 728 // Connect causes all subchannels in the ClientConn to attempt to connect if
729 // the channel is idle. Does not wait for the connection attempts to begin
730 // before returning.
731 //
732 // # Experimental
733 //
734 // Notice: This API is EXPERIMENTAL and may be changed or removed in a later
735 // release.
736 func (cc *ClientConn) Connect() {
737 cc.idlenessMgr.ExitIdleMode()
738 739 // If the ClientConn was not in idle mode, we need to call ExitIdle on the
740 // LB policy so that connections can be created.
741 cc.mu.Lock()
742 cc.balancerWrapper.exitIdle()
743 cc.mu.Unlock()
744 }
// waitForResolvedAddrs blocks until the name resolver has delivered its first
// update (cc.firstResolveEvent has fired), ctx is done, or the ClientConn's
// own context is done, whichever happens first.
//
// The returned boolean indicates whether the call waited for, and then
// observed, the first resolver update. It is false when the event had already
// fired (no blocking was needed) and false when waiting was abandoned with an
// error.
//
// The error is nil unless waiting was abandoned: if ctx is done first, a
// status error derived from ctx.Err() is returned; if the ClientConn's
// context is done first, ErrClientConnClosing is returned.
func (cc *ClientConn) waitForResolvedAddrs(ctx context.Context) (bool, error) {
	// This is on the RPC path, so we use a fast path to avoid the
	// more-expensive "select" below after the resolver has returned once.
	if cc.firstResolveEvent.HasFired() {
		return false, nil
	}
	internal.NewStreamWaitingForResolver()
	select {
	case <-cc.firstResolveEvent.Done():
		return true, nil
	case <-ctx.Done():
		return false, status.FromContextError(ctx.Err()).Err()
	case <-cc.ctx.Done():
		return false, ErrClientConnClosing
	}
}
773 774 var emptyServiceConfig *ServiceConfig
775 776 func init() {
777 cfg := parseServiceConfig("{}", defaultMaxCallAttempts)
778 if cfg.Err != nil {
779 panic(fmt.Sprintf("impossible error parsing empty service config: %v", cfg.Err))
780 }
781 emptyServiceConfig = cfg.Config.(*ServiceConfig)
782 783 internal.SubscribeToConnectivityStateChanges = func(cc *ClientConn, s grpcsync.Subscriber) func() {
784 return cc.csMgr.pubSub.Subscribe(s)
785 }
786 internal.EnterIdleModeForTesting = func(cc *ClientConn) {
787 cc.idlenessMgr.EnterIdleModeForTesting()
788 }
789 internal.ExitIdleModeForTesting = func(cc *ClientConn) {
790 cc.idlenessMgr.ExitIdleMode()
791 }
792 }
793 794 func (cc *ClientConn) maybeApplyDefaultServiceConfig() {
795 if cc.sc != nil {
796 cc.applyServiceConfigAndBalancer(cc.sc, nil)
797 return
798 }
799 if cc.dopts.defaultServiceConfig != nil {
800 cc.applyServiceConfigAndBalancer(cc.dopts.defaultServiceConfig, &defaultConfigSelector{cc.dopts.defaultServiceConfig})
801 } else {
802 cc.applyServiceConfigAndBalancer(emptyServiceConfig, &defaultConfigSelector{emptyServiceConfig})
803 }
804 }
// updateResolverStateAndUnlock handles a state update or an error reported by
// the name resolver: it decides which service config and config selector to
// apply and then forwards the resolver state to the balancer wrapper.
//
// cc.mu is released on every return path, so the caller is expected to hold it
// on entry. firstResolveEvent is fired when the function returns, which
// unblocks waitForResolvedAddrs.
//
// It returns nil if the ClientConn is already closed,
// balancer.ErrBadResolverState if err is non-nil or the service config in s is
// unusable, and otherwise the error from the balancer wrapper's
// updateClientConnState.
func (cc *ClientConn) updateResolverStateAndUnlock(s resolver.State, err error) error {
	defer cc.firstResolveEvent.Fire()
	// Check if the ClientConn is already closed. Some fields (e.g.
	// balancerWrapper) are set to nil when closing the ClientConn, and could
	// cause nil pointer panic if we don't have this check.
	if cc.conns == nil {
		cc.mu.Unlock()
		return nil
	}

	if err != nil {
		// May need to apply the initial service config in case the resolver
		// doesn't support service configs, or doesn't provide a service config
		// with the new addresses.
		cc.maybeApplyDefaultServiceConfig()

		cc.balancerWrapper.resolverError(err)

		// No addresses are valid with err set; return early.
		cc.mu.Unlock()
		return balancer.ErrBadResolverState
	}

	// ret stays nil unless the resolver supplied an unusable service config.
	var ret error
	if cc.dopts.disableServiceConfig {
		channelz.Infof(logger, cc.channelz, "ignoring service config from resolver (%v) and applying the default because service config is disabled", s.ServiceConfig)
		cc.maybeApplyDefaultServiceConfig()
	} else if s.ServiceConfig == nil {
		cc.maybeApplyDefaultServiceConfig()
		// TODO: do we need to apply a failing LB policy if there is no
		// default, per the error handling design?
	} else {
		// The config is usable only if it parsed without error and holds a
		// *ServiceConfig.
		if sc, ok := s.ServiceConfig.Config.(*ServiceConfig); s.ServiceConfig.Err == nil && ok {
			configSelector := iresolver.GetConfigSelector(s)
			if configSelector != nil {
				if len(s.ServiceConfig.Config.(*ServiceConfig).Methods) != 0 {
					channelz.Infof(logger, cc.channelz, "method configs in service config will be ignored due to presence of config selector")
				}
			} else {
				// No selector came with the resolver state; fall back to one
				// backed by the service config itself.
				configSelector = &defaultConfigSelector{sc}
			}
			cc.applyServiceConfigAndBalancer(sc, configSelector)
		} else {
			ret = balancer.ErrBadResolverState
			if cc.sc == nil {
				// Apply the failing LB only if we haven't received valid service config
				// from the name resolver in the past.
				cc.applyFailingLBLocked(s.ServiceConfig)
				cc.mu.Unlock()
				return ret
			}
			// Otherwise keep the previously applied config and still pass the
			// new resolver state on to the balancer below.
		}
	}

	// Capture what is needed from cc while the lock is held, then release it
	// before calling into the balancer wrapper.
	balCfg := cc.sc.lbConfig
	bw := cc.balancerWrapper
	cc.mu.Unlock()

	uccsErr := bw.updateClientConnState(&balancer.ClientConnState{ResolverState: s, BalancerConfig: balCfg})
	if ret == nil {
		ret = uccsErr // prefer ErrBadResolver state since any other error is
		// currently meaningless to the caller.
	}
	return ret
}
871 872 // applyFailingLBLocked is akin to configuring an LB policy on the channel which
873 // always fails RPCs. Here, an actual LB policy is not configured, but an always
874 // erroring picker is configured, which returns errors with information about
875 // what was invalid in the received service config. A config selector with no
876 // service config is configured, and the connectivity state of the channel is
877 // set to TransientFailure.
878 func (cc *ClientConn) applyFailingLBLocked(sc *serviceconfig.ParseResult) {
879 var err error
880 if sc.Err != nil {
881 err = status.Errorf(codes.Unavailable, "error parsing service config: %v", sc.Err)
882 } else {
883 err = status.Errorf(codes.Unavailable, "illegal service config type: %T", sc.Config)
884 }
885 cc.safeConfigSelector.UpdateConfigSelector(&defaultConfigSelector{nil})
886 cc.pickerWrapper.updatePicker(base.NewErrPicker(err))
887 cc.csMgr.updateState(connectivity.TransientFailure)
888 }
889 890 // Makes a copy of the input addresses slice. Addresses are passed during
891 // subconn creation and address update operations.
892 func copyAddresses(in []resolver.Address) []resolver.Address {
893 out := make([]resolver.Address, len(in))
894 copy(out, in)
895 return out
896 }
// newAddrConnLocked creates an addrConn for addrs and adds it to cc.conns.
//
// The new addrConn starts in the Idle state, holds its own copy of addrs, and
// gets a context derived from cc.ctx. A channelz subchannel is registered for
// it and a "Subchannel created" trace event is recorded.
//
// It returns ErrClientConnClosing if cc.conns is nil.
//
// Caller needs to make sure len(addrs) > 0.
func (cc *ClientConn) newAddrConnLocked(addrs []resolver.Address, opts balancer.NewSubConnOptions) (*addrConn, error) {
	if cc.conns == nil {
		return nil, ErrClientConnClosing
	}

	ac := &addrConn{
		state:        connectivity.Idle,
		cc:           cc,
		addrs:        copyAddresses(addrs),
		scopts:       opts,
		dopts:        cc.dopts,
		channelz:     channelz.RegisterSubChannel(cc.channelz, ""),
		resetBackoff: make(chan struct{}),
	}
	// Cache the telemetry labels derived from the first address.
	ac.updateTelemetryLabelsLocked()
	ac.ctx, ac.cancel = context.WithCancel(cc.ctx)
	// Start with our address set to the first address; this may be updated if
	// we connect to different addresses.
	ac.channelz.ChannelMetrics.Target.Store(&addrs[0].Addr)

	channelz.AddTraceEvent(logger, ac.channelz, 0, &channelz.TraceEvent{
		Desc:     "Subchannel created",
		Severity: channelz.CtInfo,
		Parent: &channelz.TraceEvent{
			Desc:     fmt.Sprintf("Subchannel(id:%d) created", ac.channelz.ID),
			Severity: channelz.CtInfo,
		},
	})

	// Track ac in cc. This needs to be done before any getTransport(...) is called.
	cc.conns[ac] = struct{}{}
	return ac, nil
}
934 935 // removeAddrConn removes the addrConn in the subConn from clientConn.
936 // It also tears down the ac with the given error.
937 func (cc *ClientConn) removeAddrConn(ac *addrConn, err error) {
938 cc.mu.Lock()
939 if cc.conns == nil {
940 cc.mu.Unlock()
941 return
942 }
943 delete(cc.conns, ac)
944 cc.mu.Unlock()
945 ac.tearDown(err)
946 }
// Target returns the target string of the ClientConn. The value is read from
// cc.target without acquiring cc.mu.
func (cc *ClientConn) Target() string {
	return cc.target
}
// CanonicalTarget returns the canonical target string used when creating cc.
// It is the string form of the channel's parsed target.
//
// This always has the form "<scheme>://[authority]/<endpoint>". For example:
//
//   - "dns:///example.com:42"
//   - "dns://8.8.8.8/example.com:42"
//   - "unix:///path/to/socket"
func (cc *ClientConn) CanonicalTarget() string {
	return cc.parsedTarget.String()
}
963 964 func (cc *ClientConn) incrCallsStarted() {
965 cc.channelz.ChannelMetrics.CallsStarted.Add(1)
966 cc.channelz.ChannelMetrics.LastCallStartedTimestamp.Store(time.Now().UnixNano())
967 }
// incrCallsSucceeded adds one to the channel's channelz count of succeeded
// calls.
func (cc *ClientConn) incrCallsSucceeded() {
	cc.channelz.ChannelMetrics.CallsSucceeded.Add(1)
}
// incrCallsFailed adds one to the channel's channelz count of failed calls.
func (cc *ClientConn) incrCallsFailed() {
	cc.channelz.ChannelMetrics.CallsFailed.Add(1)
}
976 977 // connect starts creating a transport.
978 // It does nothing if the ac is not IDLE.
979 // TODO(bar) Move this to the addrConn section.
980 func (ac *addrConn) connect() error {
981 ac.mu.Lock()
982 if ac.state == connectivity.Shutdown {
983 if logger.V(2) {
984 logger.Infof("connect called on shutdown addrConn; ignoring.")
985 }
986 ac.mu.Unlock()
987 return errConnClosing
988 }
989 if ac.state != connectivity.Idle {
990 if logger.V(2) {
991 logger.Infof("connect called on addrConn in non-idle state (%v); ignoring.", ac.state)
992 }
993 ac.mu.Unlock()
994 return nil
995 }
996 997 ac.resetTransportAndUnlock()
998 return nil
999 }
1000 1001 // equalAddressIgnoringBalAttributes returns true is a and b are considered equal.
1002 // This is different from the Equal method on the resolver.Address type which
1003 // considers all fields to determine equality. Here, we only consider fields
1004 // that are meaningful to the subConn.
1005 func equalAddressIgnoringBalAttributes(a, b *resolver.Address) bool {
1006 return a.Addr == b.Addr && a.ServerName == b.ServerName &&
1007 a.Attributes.Equal(b.Attributes) &&
1008 a.Metadata == b.Metadata
1009 }
1010 1011 func equalAddressesIgnoringBalAttributes(a, b []resolver.Address) bool {
1012 return slices.EqualFunc(a, b, func(a, b resolver.Address) bool { return equalAddressIgnoringBalAttributes(&a, &b) })
1013 }
// updateAddrs updates ac.addrs with the new addresses list and handles active
// connections or connection attempts.
//
// The input slice is copied before use. Nothing happens if the new list equals
// the current one (ignoring balancer attributes). If the addrConn is Shutdown,
// in TransientFailure or Idle, or is Ready and connected to an address that is
// still in the list, only the stored addresses and telemetry labels change.
// Otherwise the current connection or attempt is cancelled and a new
// connection attempt is started in a separate goroutine.
func (ac *addrConn) updateAddrs(addrs []resolver.Address) {
	addrs = copyAddresses(addrs)
	// Log at most the first five addresses.
	limit := len(addrs)
	if limit > 5 {
		limit = 5
	}
	channelz.Infof(logger, ac.channelz, "addrConn: updateAddrs addrs (%d of %d): %v", limit, len(addrs), addrs[:limit])

	ac.mu.Lock()
	if equalAddressesIgnoringBalAttributes(ac.addrs, addrs) {
		ac.mu.Unlock()
		return
	}

	ac.addrs = addrs
	ac.updateTelemetryLabelsLocked()
	if ac.state == connectivity.Shutdown ||
		ac.state == connectivity.TransientFailure ||
		ac.state == connectivity.Idle {
		// We were not connecting, so do nothing but update the addresses.
		ac.mu.Unlock()
		return
	}

	if ac.state == connectivity.Ready {
		// Try to find the connected address.
		for _, a := range addrs {
			// Resolve the server name the same way createTransport does before
			// comparing against the connected address.
			a.ServerName = ac.cc.getServerName(a)
			if equalAddressIgnoringBalAttributes(&a, &ac.curAddr) {
				// We are connected to a valid address, so do nothing but
				// update the addresses.
				ac.mu.Unlock()
				return
			}
		}
	}

	// We are either connected to the wrong address or currently connecting.
	// Stop the current iteration and restart.

	ac.cancel()
	ac.ctx, ac.cancel = context.WithCancel(ac.cc.ctx)

	// We have to defer here because GracefulClose => onClose, which requires
	// locking ac.mu.
	if ac.transport != nil {
		defer ac.transport.GracefulClose()
		ac.transport = nil
	}

	if len(addrs) == 0 {
		ac.updateConnectivityState(connectivity.Idle, nil)
	}

	// Since we were connecting/connected, we should start a new connection
	// attempt.
	// ac.mu is still held here; resetTransportAndUnlock releases it from the
	// new goroutine.
	go ac.resetTransportAndUnlock()
}
1075 1076 // getServerName determines the serverName to be used in the connection
1077 // handshake. The default value for the serverName is the authority on the
1078 // ClientConn, which either comes from the user's dial target or through an
1079 // authority override specified using the WithAuthority dial option. Name
1080 // resolvers can specify a per-address override for the serverName through the
1081 // resolver.Address.ServerName field which is used only if the WithAuthority
1082 // dial option was not used. The rationale is that per-address authority
1083 // overrides specified by the name resolver can represent a security risk, while
1084 // an override specified by the user is more dependable since they probably know
1085 // what they are doing.
1086 func (cc *ClientConn) getServerName(addr resolver.Address) string {
1087 if cc.dopts.authority != "" {
1088 return cc.dopts.authority
1089 }
1090 if addr.ServerName != "" {
1091 return addr.ServerName
1092 }
1093 return cc.authority
1094 }
1095 1096 func getMethodConfig(sc *ServiceConfig, method string) MethodConfig {
1097 if sc == nil {
1098 return MethodConfig{}
1099 }
1100 if m, ok := sc.Methods[method]; ok {
1101 return m
1102 }
1103 i := strings.LastIndex(method, "/")
1104 if m, ok := sc.Methods[method[:i+1]]; ok {
1105 return m
1106 }
1107 return sc.Methods[""]
1108 }
1109 1110 // GetMethodConfig gets the method config of the input method.
1111 // If there's an exact match for input method (i.e. /service/method), we return
1112 // the corresponding MethodConfig.
1113 // If there isn't an exact match for the input method, we look for the service's default
1114 // config under the service (i.e /service/) and then for the default for all services (empty string).
1115 //
1116 // If there is a default MethodConfig for the service, we return it.
1117 // Otherwise, we return an empty MethodConfig.
1118 func (cc *ClientConn) GetMethodConfig(method string) MethodConfig {
1119 // TODO: Avoid the locking here.
1120 cc.mu.RLock()
1121 defer cc.mu.RUnlock()
1122 return getMethodConfig(cc.sc, method)
1123 }
1124 1125 func (cc *ClientConn) healthCheckConfig() *healthCheckConfig {
1126 cc.mu.RLock()
1127 defer cc.mu.RUnlock()
1128 if cc.sc == nil {
1129 return nil
1130 }
1131 return cc.sc.healthCheckConfig
1132 }
1133 1134 func (cc *ClientConn) applyServiceConfigAndBalancer(sc *ServiceConfig, configSelector iresolver.ConfigSelector) {
1135 if sc == nil {
1136 // should never reach here.
1137 return
1138 }
1139 cc.sc = sc
1140 if configSelector != nil {
1141 cc.safeConfigSelector.UpdateConfigSelector(configSelector)
1142 }
1143 1144 if cc.sc.retryThrottling != nil {
1145 newThrottler := &retryThrottler{
1146 tokens: cc.sc.retryThrottling.MaxTokens,
1147 max: cc.sc.retryThrottling.MaxTokens,
1148 thresh: cc.sc.retryThrottling.MaxTokens / 2,
1149 ratio: cc.sc.retryThrottling.TokenRatio,
1150 }
1151 cc.retryThrottler.Store(newThrottler)
1152 } else {
1153 cc.retryThrottler.Store((*retryThrottler)(nil))
1154 }
1155 }
1156 1157 func (cc *ClientConn) resolveNow(o resolver.ResolveNowOptions) {
1158 cc.mu.RLock()
1159 cc.resolverWrapper.resolveNow(o)
1160 cc.mu.RUnlock()
1161 }
// resolveNowLocked forwards o to the resolver wrapper's resolveNow without
// acquiring cc.mu itself.
// NOTE(review): the name suggests the caller must already hold cc.mu — confirm
// against callers.
func (cc *ClientConn) resolveNowLocked(o resolver.ResolveNowOptions) {
	cc.resolverWrapper.resolveNow(o)
}
1166 1167 // ResetConnectBackoff wakes up all subchannels in transient failure and causes
1168 // them to attempt another connection immediately. It also resets the backoff
1169 // times used for subsequent attempts regardless of the current state.
1170 //
1171 // In general, this function should not be used. Typical service or network
1172 // outages result in a reasonable client reconnection strategy by default.
1173 // However, if a previously unavailable network becomes available, this may be
1174 // used to trigger an immediate reconnect.
1175 //
1176 // # Experimental
1177 //
1178 // Notice: This API is EXPERIMENTAL and may be changed or removed in a
1179 // later release.
1180 func (cc *ClientConn) ResetConnectBackoff() {
1181 cc.mu.Lock()
1182 conns := cc.conns
1183 cc.mu.Unlock()
1184 for ac := range conns {
1185 ac.resetConnectBackoff()
1186 }
1187 }
// Close tears down the ClientConn and all underlying connections.
//
// It returns ErrClientConnClosing if cc.conns is already nil, and nil
// otherwise. In both cases the deferred function cancels the channel's context
// and waits for the connectivity state pubsub to finish.
func (cc *ClientConn) Close() error {
	defer func() {
		cc.cancel()
		<-cc.csMgr.pubSub.Done()
	}()

	// Prevent calls to enter/exit idle immediately, and ensure we are not
	// currently entering/exiting idle mode.
	cc.idlenessMgr.Close()

	cc.mu.Lock()
	if cc.conns == nil {
		cc.mu.Unlock()
		return ErrClientConnClosing
	}

	// Take ownership of the tracked addrConns and mark the channel as closed
	// by setting cc.conns to nil.
	conns := cc.conns
	cc.conns = nil
	cc.csMgr.updateState(connectivity.Shutdown)

	// We can safely unlock and continue to access all fields now as
	// cc.conns==nil, preventing any further operations on cc.
	cc.mu.Unlock()

	cc.resolverWrapper.close()
	// The order of closing matters here since the balancer wrapper assumes the
	// picker is closed before it is closed.
	cc.pickerWrapper.close()
	cc.balancerWrapper.close()

	// Wait for both serializers to finish before tearing down the addrConns.
	<-cc.resolverWrapper.serializer.Done()
	<-cc.balancerWrapper.serializer.Done()
	// Tear down all addrConns concurrently and wait for every one to finish.
	var wg sync.WaitGroup
	for ac := range conns {
		wg.Add(1)
		go func(ac *addrConn) {
			defer wg.Done()
			ac.tearDown(ErrClientConnClosing)
		}(ac)
	}
	wg.Wait()
	cc.addTraceEvent("deleted")
	// TraceEvent needs to be called before RemoveEntry, as TraceEvent may add
	// trace reference to the entity being deleted, and thus prevent it from being
	// deleted right away.
	channelz.RemoveEntry(cc.channelz.ID)

	return nil
}
// addrConn is a network connection to a given address.
type addrConn struct {
	// ctx is derived from the ClientConn's context. It is cancelled, via
	// cancel, by tearDown, and cancelled and replaced by updateAddrs when a
	// connection or attempt has to be restarted.
	ctx    context.Context
	cancel context.CancelFunc

	cc     *ClientConn               // The ClientConn this addrConn belongs to.
	dopts  dialOptions               // Copied from cc.dopts when the addrConn is created.
	acbw   *acBalancerWrapper        // Receives connectivity state updates.
	scopts balancer.NewSubConnOptions // Options supplied when the addrConn was created.

	// transport is set when there's a viable transport (note: ac state may not be READY as LB channel
	// health checking may require server to report healthy to set ac to READY), and is reset
	// to nil when the current transport should no longer be used to create a stream (e.g. after GoAway
	// is received, transport is closed, ac has been torn down).
	transport transport.ClientTransport // The current transport.

	// This mutex is used on the RPC path, so its usage should be minimized as
	// much as possible.
	// TODO: Find a lock-free way to retrieve the transport and state from the
	// addrConn.
	mu      sync.Mutex
	curAddr resolver.Address   // The current address.
	addrs   []resolver.Address // All addresses that the resolver resolved to.

	// Use updateConnectivityState for updating addrConn's connectivity state.
	state connectivity.State

	backoffIdx int // Needs to be stateful for resetConnectBackoff.
	// resetBackoff is closed and replaced by resetConnectBackoff; a pending
	// backoff wait in resetTransportAndUnlock ends when it is closed.
	resetBackoff chan struct{}

	// channelz is the channelz subchannel registered for this addrConn.
	channelz *channelz.SubChannel

	// Telemetry labels cached by updateTelemetryLabelsLocked from the first
	// address in addrs.
	localityLabel       string
	backendServiceLabel string
}
1275 1276 // Note: this requires a lock on ac.mu.
1277 func (ac *addrConn) updateConnectivityState(s connectivity.State, lastErr error) {
1278 if ac.state == s {
1279 return
1280 }
1281 1282 // If we are transitioning out of Ready, it means there is a disconnection.
1283 // A SubConn can also transition from CONNECTING directly to IDLE when
1284 // a transport is successfully created, but the connection fails
1285 // before the SubConn can send the notification for READY. We treat
1286 // this as a successful connection and transition to IDLE.
1287 // TODO: https://github.com/grpc/grpc-go/issues/7862 - Remove the second
1288 // part of the if condition below once the issue is fixed.
1289 if ac.state == connectivity.Ready || (ac.state == connectivity.Connecting && s == connectivity.Idle) {
1290 disconnectionsMetric.Record(ac.cc.metricsRecorderList, 1, ac.cc.target, ac.backendServiceLabel, ac.localityLabel, "unknown")
1291 openConnectionsMetric.Record(ac.cc.metricsRecorderList, -1, ac.cc.target, ac.backendServiceLabel, ac.securityLevelLocked(), ac.localityLabel)
1292 }
1293 ac.state = s
1294 ac.channelz.ChannelMetrics.State.Store(&s)
1295 if lastErr == nil {
1296 channelz.Infof(logger, ac.channelz, "Subchannel Connectivity change to %v", s)
1297 } else {
1298 channelz.Infof(logger, ac.channelz, "Subchannel Connectivity change to %v, last error: %s", s, lastErr)
1299 }
1300 ac.acbw.updateState(s, ac.curAddr, lastErr)
1301 }
1302 1303 // adjustParams updates parameters used to create transports upon
1304 // receiving a GoAway.
1305 func (ac *addrConn) adjustParams(r transport.GoAwayReason) {
1306 if r == transport.GoAwayTooManyPings {
1307 v := 2 * ac.dopts.copts.KeepaliveParams.Time
1308 ac.cc.mu.Lock()
1309 if v > ac.cc.keepaliveParams.Time {
1310 ac.cc.keepaliveParams.Time = v
1311 }
1312 ac.cc.mu.Unlock()
1313 }
1314 }
// resetTransportAndUnlock unconditionally connects the addrConn.
//
// It moves the addrConn to Connecting and tries every address once. If all
// attempts fail it asks the resolver to re-resolve, enters TransientFailure,
// waits out the backoff (or a backoff reset, or cancellation of the addrConn's
// context) and then goes Idle. On success it records connection metrics and
// resets the backoff index.
//
// ac.mu must be held by the caller, and this function will guarantee it is released.
func (ac *addrConn) resetTransportAndUnlock() {
	// Capture the context current at entry; tearDown and updateAddrs cancel
	// it, which is how this attempt learns it has been superseded.
	acCtx := ac.ctx
	if acCtx.Err() != nil {
		ac.mu.Unlock()
		return
	}

	addrs := ac.addrs
	backoffFor := ac.dopts.bs.Backoff(ac.backoffIdx)
	// This will be the duration that dial gets to finish.
	dialDuration := minConnectTimeout
	if ac.dopts.minConnectTimeout != nil {
		dialDuration = ac.dopts.minConnectTimeout()
	}

	if dialDuration < backoffFor {
		// Give dial more time as we keep failing to connect.
		dialDuration = backoffFor
	}
	// We can potentially spend all the time trying the first address, and
	// if the server accepts the connection and then hangs, the following
	// addresses will never be tried.
	//
	// The spec doesn't mention what should be done for multiple addresses.
	// https://github.com/grpc/grpc/blob/master/doc/connection-backoff.md#proposed-backoff-algorithm
	connectDeadline := time.Now().Add(dialDuration)

	ac.updateConnectivityState(connectivity.Connecting, nil)
	ac.mu.Unlock()

	if err := ac.tryAllAddrs(acCtx, addrs, connectDeadline); err != nil {
		if !errors.Is(err, context.Canceled) {
			connectionAttemptsFailedMetric.Record(ac.cc.metricsRecorderList, 1, ac.cc.target, ac.backendServiceLabel, ac.localityLabel)
		} else {
			if logger.V(2) {
				// This records cancelled connection attempts which can be later
				// replaced by a metric.
				logger.Infof("Context cancellation detected; not recording this as a failed connection attempt.")
			}
		}
		// TODO: #7534 - Move re-resolution requests into the pick_first LB policy
		// to ensure one resolution request per pass instead of per subconn failure.
		ac.cc.resolveNow(resolver.ResolveNowOptions{})
		ac.mu.Lock()
		if acCtx.Err() != nil {
			// addrConn was torn down.
			ac.mu.Unlock()
			return
		}
		// After exhausting all addresses, the addrConn enters
		// TRANSIENT_FAILURE.
		ac.updateConnectivityState(connectivity.TransientFailure, err)

		// Backoff.
		// Read the reset channel under the lock; resetConnectBackoff closes
		// and replaces it.
		b := ac.resetBackoff
		ac.mu.Unlock()

		timer := time.NewTimer(backoffFor)
		select {
		case <-timer.C:
			// The full backoff elapsed; the next failure backs off longer.
			ac.mu.Lock()
			ac.backoffIdx++
			ac.mu.Unlock()
		case <-b:
			// Backoff was reset; stop waiting.
			timer.Stop()
		case <-acCtx.Done():
			timer.Stop()
			return
		}

		ac.mu.Lock()
		if acCtx.Err() == nil {
			ac.updateConnectivityState(connectivity.Idle, err)
		}
		ac.mu.Unlock()
		return
	}
	// Success; reset backoff.
	ac.mu.Lock()
	connectionAttemptsSucceededMetric.Record(ac.cc.metricsRecorderList, 1, ac.cc.target, ac.backendServiceLabel, ac.localityLabel)
	openConnectionsMetric.Record(ac.cc.metricsRecorderList, 1, ac.cc.target, ac.backendServiceLabel, ac.securityLevelLocked(), ac.localityLabel)
	ac.backoffIdx = 0
	ac.mu.Unlock()
}
1403 1404 // updateTelemetryLabelsLocked calculates and caches the telemetry labels based on the
1405 // first address in addrConn.
1406 func (ac *addrConn) updateTelemetryLabelsLocked() {
1407 labelsFunc, ok := internal.AddressToTelemetryLabels.(func(resolver.Address) map[string]string)
1408 if !ok || len(ac.addrs) == 0 {
1409 // Reset defaults
1410 ac.localityLabel = ""
1411 ac.backendServiceLabel = ""
1412 return
1413 }
1414 labels := labelsFunc(ac.addrs[0])
1415 ac.localityLabel = labels["grpc.lb.locality"]
1416 ac.backendServiceLabel = labels["grpc.lb.backend_service"]
1417 }
// securityLevelKey is the attribute key under which securityLevelLocked stores
// the connection's security level in the current address' Attributes.
type securityLevelKey struct{}
1420 1421 func (ac *addrConn) securityLevelLocked() string {
1422 var secLevel string
1423 // During disconnection, ac.transport is nil. Fall back to the security level
1424 // stored in the current address during connection.
1425 if ac.transport == nil {
1426 secLevel, _ = ac.curAddr.Attributes.Value(securityLevelKey{}).(string)
1427 return secLevel
1428 }
1429 authInfo := ac.transport.Peer().AuthInfo
1430 if ci, ok := authInfo.(interface {
1431 GetCommonAuthInfo() credentials.CommonAuthInfo
1432 }); ok {
1433 secLevel = ci.GetCommonAuthInfo().SecurityLevel.String()
1434 // Store the security level in the current address' attributes so
1435 // that it remains available for disconnection metrics after the
1436 // transport is closed.
1437 ac.curAddr.Attributes = ac.curAddr.Attributes.WithValue(securityLevelKey{}, secLevel)
1438 }
1439 return secLevel
1440 }
// tryAllAddrs tries to create a connection to the addresses, and stop when at
// the first successful one. It returns an error if no address was successfully
// connected, or updates ac appropriately with the new transport.
//
// The returned error is the one from the first failed address, or
// errConnClosing if ctx ended before an address was attempted. Every
// connection error is also reported to the ClientConn through
// updateConnectionError.
func (ac *addrConn) tryAllAddrs(ctx context.Context, addrs []resolver.Address, connectDeadline time.Time) error {
	var firstConnErr error
	for _, addr := range addrs {
		// Publish the address being attempted as the channelz target.
		ac.channelz.ChannelMetrics.Target.Store(&addr.Addr)
		if ctx.Err() != nil {
			return errConnClosing
		}
		ac.mu.Lock()

		// Pick up the ClientConn's current keepalive parameters, which
		// adjustParams may have changed.
		ac.cc.mu.RLock()
		ac.dopts.copts.KeepaliveParams = ac.cc.keepaliveParams
		ac.cc.mu.RUnlock()

		copts := ac.dopts.copts
		// A credentials bundle from the subconn options overrides the one in
		// the dial options.
		if ac.scopts.CredsBundle != nil {
			copts.CredsBundle = ac.scopts.CredsBundle
		}
		ac.mu.Unlock()

		channelz.Infof(logger, ac.channelz, "Subchannel picks a new address %q to connect", addr.Addr)

		err := ac.createTransport(ctx, addr, copts, connectDeadline)
		if err == nil {
			return nil
		}
		if firstConnErr == nil {
			firstConnErr = err
		}
		ac.cc.updateConnectionError(err)
	}

	// Couldn't connect to any address.
	return firstConnErr
}
// createTransport creates a connection to addr. It returns an error if the
// address was not successfully connected, or updates ac appropriately with the
// new transport.
//
// The connection attempt is bounded by connectDeadline. nil is also returned
// when ctx ended during the attempt (the new transport is then closed in a
// separate goroutine) and when the transport's onClose callback already ran
// (the addrConn then goes Idle).
func (ac *addrConn) createTransport(ctx context.Context, addr resolver.Address, copts transport.ConnectOptions, connectDeadline time.Time) error {
	addr.ServerName = ac.cc.getServerName(addr)
	// hctx is the context handed to health checking; hcancel is called by
	// onClose or when transport creation fails.
	hctx, hcancel := context.WithCancel(ctx)

	// onClose is passed to the transport as its close callback.
	onClose := func(r transport.GoAwayReason) {
		ac.mu.Lock()
		defer ac.mu.Unlock()
		// adjust params based on GoAwayReason
		ac.adjustParams(r)
		if ctx.Err() != nil {
			// Already shut down or connection attempt canceled.  tearDown() or
			// updateAddrs() already cleared the transport and canceled hctx
			// via ac.ctx, and we expected this connection to be closed, so do
			// nothing here.
			return
		}
		hcancel()
		if ac.transport == nil {
			// We're still connecting to this address, which could error.  Do
			// not update the connectivity state or resolve; these will happen
			// at the end of the tryAllAddrs connection loop in the event of an
			// error.
			return
		}
		ac.transport = nil
		// Refresh the name resolver on any connection loss.
		ac.cc.resolveNow(resolver.ResolveNowOptions{})
		// Always go idle and wait for the LB policy to initiate a new
		// connection attempt.
		ac.updateConnectivityState(connectivity.Idle, nil)
	}

	// connectCtx limits only the connection attempt; it is cancelled when this
	// function returns.
	connectCtx, cancel := context.WithDeadline(ctx, connectDeadline)
	defer cancel()
	copts.ChannelzParent = ac.channelz

	newTr, err := transport.NewHTTP2Client(connectCtx, ac.cc.ctx, addr, copts, onClose)
	if err != nil {
		if logger.V(2) {
			logger.Infof("Creating new client transport to %q: %v", addr, err)
		}
		// newTr is either nil, or closed.
		hcancel()
		channelz.Warningf(logger, ac.channelz, "grpc: addrConn.createTransport failed to connect to %s. Err: %v", addr, err)
		return err
	}

	ac.mu.Lock()
	defer ac.mu.Unlock()
	if ctx.Err() != nil {
		// This can happen if the subConn was removed while in `Connecting`
		// state. tearDown() would have set the state to `Shutdown`, but
		// would not have closed the transport since ac.transport would not
		// have been set at that point.
		//
		// We run this in a goroutine because newTr.Close() calls onClose()
		// inline, which requires locking ac.mu.
		//
		// The error we pass to Close() is immaterial since there are no open
		// streams at this point, so no trailers with error details will be sent
		// out. We just need to pass a non-nil error.
		//
		// This can also happen when updateAddrs is called during a connection
		// attempt.
		go newTr.Close(transport.ErrConnClosing)
		return nil
	}
	if hctx.Err() != nil {
		// onClose was already called for this connection, but the connection
		// was successfully established first. Consider it a success and set
		// the new state to Idle.
		ac.updateConnectivityState(connectivity.Idle, nil)
		return nil
	}
	ac.curAddr = addr
	ac.transport = newTr
	ac.startHealthCheck(hctx) // Will set state to READY if appropriate.
	return nil
}
// startHealthCheck starts the health checking stream (RPC) to watch the health
// stats of this connection if health checking is requested and configured.
//
// LB channel health checking is enabled when all requirements below are met:
//  1. it is not disabled by the user with the WithDisableHealthCheck DialOption
//  2. internal.HealthCheckFunc is set by importing the grpc/health package
//  3. a service config with non-empty healthCheckConfig field is provided
//  4. the load balancer requests it
//
// It sets addrConn to READY if the health checking stream is not started.
//
// When health checking is started, it runs in its own goroutine and drives the
// addrConn's connectivity state for as long as the transport current at the
// time of this call is still ac.transport.
//
// Caller must hold ac.mu.
func (ac *addrConn) startHealthCheck(ctx context.Context) {
	// healthcheckManagingState becomes true only once every requirement is
	// met; on any earlier return the deferred function marks the addrConn
	// READY.
	var healthcheckManagingState bool
	defer func() {
		if !healthcheckManagingState {
			ac.updateConnectivityState(connectivity.Ready, nil)
		}
	}()

	if ac.cc.dopts.disableHealthCheck {
		return
	}
	healthCheckConfig := ac.cc.healthCheckConfig()
	if healthCheckConfig == nil {
		return
	}
	if !ac.scopts.HealthCheckEnabled {
		return
	}
	healthCheckFunc := internal.HealthCheckFunc
	if healthCheckFunc == nil {
		// The health package is not imported to set health check function.
		//
		// TODO: add a link to the health check doc in the error message.
		channelz.Error(logger, ac.channelz, "Health check is requested but health check function is not set.")
		return
	}

	healthcheckManagingState = true

	// Set up the health check helper functions.
	// Both helpers compare against currentTr so that they stop acting once
	// the addrConn has moved on to a different transport (or none).
	currentTr := ac.transport
	newStream := func(method string) (any, error) {
		ac.mu.Lock()
		if ac.transport != currentTr {
			ac.mu.Unlock()
			return nil, status.Error(codes.Canceled, "the provided transport is no longer valid to use")
		}
		ac.mu.Unlock()
		return newNonRetryClientStream(ctx, &StreamDesc{ServerStreams: true}, method, currentTr, ac)
	}
	setConnectivityState := func(s connectivity.State, lastErr error) {
		ac.mu.Lock()
		defer ac.mu.Unlock()
		if ac.transport != currentTr {
			return
		}
		ac.updateConnectivityState(s, lastErr)
	}
	// Start the health checking stream.
	go func() {
		err := healthCheckFunc(ctx, newStream, setConnectivityState, healthCheckConfig.ServiceName)
		if err != nil {
			if status.Code(err) == codes.Unimplemented {
				channelz.Error(logger, ac.channelz, "Subchannel health check is unimplemented at server side, thus health check is disabled")
			} else {
				channelz.Errorf(logger, ac.channelz, "Health checking failed: %v", err)
			}
		}
	}()
}
1635 1636 func (ac *addrConn) resetConnectBackoff() {
1637 ac.mu.Lock()
1638 close(ac.resetBackoff)
1639 ac.backoffIdx = 0
1640 ac.resetBackoff = make(chan struct{})
1641 ac.mu.Unlock()
1642 }
1643 1644 // getReadyTransport returns the transport if ac's state is READY or nil if not.
1645 func (ac *addrConn) getReadyTransport() transport.ClientTransport {
1646 ac.mu.Lock()
1647 defer ac.mu.Unlock()
1648 if ac.state == connectivity.Ready {
1649 return ac.transport
1650 }
1651 return nil
1652 }
// tearDown starts to tear down the addrConn.
//
// It moves the addrConn to Shutdown, cancels its context, clears the current
// address, removes its channelz entry and closes the current transport, if
// any: gracefully when err is errConnDrain, and with a hard close otherwise.
// Calling it on an addrConn that is already Shutdown does nothing.
//
// Note that tearDown doesn't remove ac from ac.cc.conns, so the addrConn struct
// will leak. In most cases, call cc.removeAddrConn() instead.
func (ac *addrConn) tearDown(err error) {
	ac.mu.Lock()
	if ac.state == connectivity.Shutdown {
		ac.mu.Unlock()
		return
	}
	// Detach the transport under the lock; it is closed after unlocking.
	curTr := ac.transport
	ac.transport = nil
	// We have to set the state to Shutdown before anything else to prevent races
	// between setting the state and logic that waits on context cancellation / etc.
	ac.updateConnectivityState(connectivity.Shutdown, nil)
	ac.cancel()
	ac.curAddr = resolver.Address{}

	channelz.AddTraceEvent(logger, ac.channelz, 0, &channelz.TraceEvent{
		Desc:     "Subchannel deleted",
		Severity: channelz.CtInfo,
		Parent: &channelz.TraceEvent{
			Desc:     fmt.Sprintf("Subchannel(id:%d) deleted", ac.channelz.ID),
			Severity: channelz.CtInfo,
		},
	})
	// TraceEvent needs to be called before RemoveEntry, as TraceEvent may add
	// trace reference to the entity being deleted, and thus prevent it from
	// being deleted right away.
	channelz.RemoveEntry(ac.channelz.ID)
	ac.mu.Unlock()

	// We have to release the lock before the call to GracefulClose/Close here
	// because both of them call onClose(), which requires locking ac.mu.
	if curTr != nil {
		if err == errConnDrain {
			// Close the transport gracefully when the subConn is being shutdown.
			//
			// GracefulClose() may be executed multiple times if:
			// - multiple GoAway frames are received from the server
			// - there are concurrent name resolver or balancer triggered
			//   address removal and GoAway
			curTr.GracefulClose()
		} else {
			// Hard close the transport when the channel is entering idle or is
			// being shutdown. In the case where the channel is being shutdown,
			// closing of transports is also taken care of by cancellation of cc.ctx.
			// But in the case where the channel is entering idle, we need to
			// explicitly close the transports here. Instead of distinguishing
			// between these two cases, it is simpler to close the transport
			// unconditionally here.
			curTr.Close(err)
		}
	}
}
// retryThrottler is a mutex-guarded pool of retry tokens.
//
// Fields:
//   - max: upper bound that successfulRPC clamps tokens to.
//   - thresh: throttle reports true while tokens is at or below this value.
//   - ratio: amount added to tokens by each call to successfulRPC.
//   - mu: held by throttle and successfulRPC while they update tokens.
//   - tokens: current number of tokens in the pool.
type retryThrottler struct {
	max    float64
	thresh float64
	ratio  float64

	mu     sync.Mutex
	tokens float64 // TODO(dfawley): replace with atomic and remove lock.
}

// throttle subtracts a retry token from the pool and returns whether a retry
// should be throttled (disallowed) based upon the retry throttling policy in
// the service config.
//
// The pool never drops below zero. A nil receiver never throttles.
func (rt *retryThrottler) throttle() bool {
	if rt == nil {
		return false
	}
	rt.mu.Lock()
	defer rt.mu.Unlock()

	if remaining := rt.tokens - 1; remaining < 0 {
		rt.tokens = 0
	} else {
		rt.tokens = remaining
	}
	return rt.tokens <= rt.thresh
}

// successfulRPC adds rt.ratio tokens to the pool, capping the pool at rt.max.
// It is a no-op on a nil receiver.
func (rt *retryThrottler) successfulRPC() {
	if rt == nil {
		return
	}
	rt.mu.Lock()
	defer rt.mu.Unlock()

	replenished := rt.tokens + rt.ratio
	if replenished > rt.max {
		replenished = rt.max
	}
	rt.tokens = replenished
}
// incrCallsStarted adds one to the CallsStarted counter in ac's channelz
// metrics and stores the current time, in nanoseconds since the Unix epoch,
// as LastCallStartedTimestamp.
func (ac *addrConn) incrCallsStarted() {
	ac.channelz.ChannelMetrics.CallsStarted.Add(1)
	ac.channelz.ChannelMetrics.LastCallStartedTimestamp.Store(time.Now().UnixNano())
}
// incrCallsSucceeded adds one to the CallsSucceeded counter in ac's channelz
// metrics.
func (ac *addrConn) incrCallsSucceeded() {
	ac.channelz.ChannelMetrics.CallsSucceeded.Add(1)
}
// incrCallsFailed adds one to the CallsFailed counter in ac's channelz
// metrics.
func (ac *addrConn) incrCallsFailed() {
	ac.channelz.ChannelMetrics.CallsFailed.Add(1)
}
// ErrClientConnTimeout indicates that the ClientConn cannot establish the
// underlying connections within the specified timeout. Its message is
// "grpc: timed out when dialing".
//
// Deprecated: This error is never returned by grpc and should not be
// referenced by users.
var ErrClientConnTimeout = errors.New("grpc: timed out when dialing")
1766 1767 // getResolver finds the scheme in the cc's resolvers or the global registry.
1768 // scheme should always be lowercase (typically by virtue of url.Parse()
1769 // performing proper RFC3986 behavior).
1770 func (cc *ClientConn) getResolver(scheme string) resolver.Builder {
1771 for _, rb := range cc.dopts.resolvers {
1772 if scheme == rb.Scheme() {
1773 return rb
1774 }
1775 }
1776 return resolver.Get(scheme)
1777 }
1778 1779 func (cc *ClientConn) updateConnectionError(err error) {
1780 cc.lceMu.Lock()
1781 cc.lastConnectionError = err
1782 cc.lceMu.Unlock()
1783 }
1784 1785 func (cc *ClientConn) connectionError() error {
1786 cc.lceMu.Lock()
1787 defer cc.lceMu.Unlock()
1788 return cc.lastConnectionError
1789 }
1790 1791 // initParsedTargetAndResolverBuilder parses the user's dial target and stores
1792 // the parsed target in `cc.parsedTarget`.
1793 //
1794 // The resolver to use is determined based on the scheme in the parsed target
1795 // and the same is stored in `cc.resolverBuilder`.
1796 //
1797 // Doesn't grab cc.mu as this method is expected to be called only at Dial time.
1798 func (cc *ClientConn) initParsedTargetAndResolverBuilder() error {
1799 logger.Infof("original dial target is: %q", cc.target)
1800 1801 var rb resolver.Builder
1802 parsedTarget, err := parseTarget(cc.target)
1803 if err == nil {
1804 rb = cc.getResolver(parsedTarget.URL.Scheme)
1805 if rb != nil {
1806 cc.parsedTarget = parsedTarget
1807 cc.resolverBuilder = rb
1808 return nil
1809 }
1810 }
1811 1812 // We are here because the user's dial target did not contain a scheme or
1813 // specified an unregistered scheme. We should fallback to the default
1814 // scheme, except when a custom dialer is specified in which case, we should
1815 // always use passthrough scheme. For either case, we need to respect any overridden
1816 // global defaults set by the user.
1817 defScheme := cc.dopts.defaultScheme
1818 if internal.UserSetDefaultScheme {
1819 defScheme = resolver.GetDefaultScheme()
1820 }
1821 1822 canonicalTarget := defScheme + ":///" + cc.target
1823 1824 parsedTarget, err = parseTarget(canonicalTarget)
1825 if err != nil {
1826 return err
1827 }
1828 rb = cc.getResolver(parsedTarget.URL.Scheme)
1829 if rb == nil {
1830 return fmt.Errorf("could not get resolver for default scheme: %q", parsedTarget.URL.Scheme)
1831 }
1832 cc.parsedTarget = parsedTarget
1833 cc.resolverBuilder = rb
1834 return nil
1835 }
1836 1837 // parseTarget uses RFC 3986 semantics to parse the given target into a
1838 // resolver.Target struct containing url. Query params are stripped from the
1839 // endpoint.
1840 func parseTarget(target string) (resolver.Target, error) {
1841 u, err := url.Parse(target)
1842 if err != nil {
1843 return resolver.Target{}, err
1844 }
1845 1846 return resolver.Target{URL: *u}, nil
1847 }
// encodeAuthority percent-encodes every byte of authority that is not a valid
// authority character as defined in
// https://datatracker.ietf.org/doc/html/rfc3986#section-3.2.
//
// ASCII letters, digits, the unreserved characters "-_.~", the sub-delimiters
// "!$&'()*+,;=" and the authority delimiters ":[]@" are kept as they are.
// Every other byte, including '%' and each byte of a multi-byte UTF-8
// sequence, is replaced by "%XX" with upper-case hex digits. If nothing needs
// escaping, the input string is returned unchanged.
func encodeAuthority(authority string) string {
	const (
		hexDigits = "0123456789ABCDEF"
		// Unreserved punctuation, sub-delimiters and authority delimiters.
		allowedPunct = "-_.~" + "!$&'()*+,;=" + ":[]@"
	)

	// isAllowed reports whether b may appear in the output unescaped.
	isAllowed := func(b byte) bool {
		switch {
		case 'a' <= b && b <= 'z', 'A' <= b && b <= 'Z', '0' <= b && b <= '9':
			return true
		}
		return strings.IndexByte(allowedPunct, b) >= 0
	}

	// First pass: count the bytes to escape so the output can be sized
	// exactly and the common no-escape case returns without allocating.
	toEscape := 0
	for _, b := range []byte(authority) {
		if !isAllowed(b) {
			toEscape++
		}
	}
	if toEscape == 0 {
		return authority
	}

	// Second pass: each escaped byte grows from one byte to three ("%XX").
	// This is a barebones version of escape in the go net/url library.
	var out strings.Builder
	out.Grow(len(authority) + 2*toEscape)
	for _, b := range []byte(authority) {
		if isAllowed(b) {
			out.WriteByte(b)
			continue
		}
		out.WriteByte('%')
		out.WriteByte(hexDigits[b>>4])
		out.WriteByte(hexDigits[b&0x0f])
	}
	return out.String()
}
1905 1906 // Determine channel authority. The order of precedence is as follows:
1907 // - user specified authority override using `WithAuthority` dial option
1908 // - creds' notion of server name for the authentication handshake
1909 // - endpoint from dial target of the form "scheme://[authority]/endpoint"
1910 //
1911 // Stores the determined authority in `cc.authority`.
1912 //
1913 // Returns a non-nil error if the authority returned by the transport
1914 // credentials do not match the authority configured through the dial option.
1915 //
1916 // Doesn't grab cc.mu as this method is expected to be called only at Dial time.
1917 func (cc *ClientConn) initAuthority() error {
1918 dopts := cc.dopts
1919 // Historically, we had two options for users to specify the serverName or
1920 // authority for a channel. One was through the transport credentials
1921 // (either in its constructor, or through the OverrideServerName() method).
1922 // The other option (for cases where WithInsecure() dial option was used)
1923 // was to use the WithAuthority() dial option.
1924 //
1925 // A few things have changed since:
1926 // - `insecure` package with an implementation of the `TransportCredentials`
1927 // interface for the insecure case
1928 // - WithAuthority() dial option support for secure credentials
1929 authorityFromCreds := ""
1930 if creds := dopts.copts.TransportCredentials; creds != nil && creds.Info().ServerName != "" {
1931 authorityFromCreds = creds.Info().ServerName
1932 }
1933 authorityFromDialOption := dopts.authority
1934 if (authorityFromCreds != "" && authorityFromDialOption != "") && authorityFromCreds != authorityFromDialOption {
1935 return fmt.Errorf("ClientConn's authority from transport creds %q and dial option %q don't match", authorityFromCreds, authorityFromDialOption)
1936 }
1937 1938 endpoint := cc.parsedTarget.Endpoint()
1939 if authorityFromDialOption != "" {
1940 cc.authority = authorityFromDialOption
1941 } else if authorityFromCreds != "" {
1942 cc.authority = authorityFromCreds
1943 } else if auth, ok := cc.resolverBuilder.(resolver.AuthorityOverrider); ok {
1944 cc.authority = auth.OverrideAuthority(cc.parsedTarget)
1945 } else if strings.HasPrefix(endpoint, ":") {
1946 cc.authority = "localhost" + encodeAuthority(endpoint)
1947 } else {
1948 cc.authority = encodeAuthority(endpoint)
1949 }
1950 return nil
1951 }
1952