balancer_wrapper.go raw

   1  /*
   2   *
   3   * Copyright 2017 gRPC authors.
   4   *
   5   * Licensed under the Apache License, Version 2.0 (the "License");
   6   * you may not use this file except in compliance with the License.
   7   * You may obtain a copy of the License at
   8   *
   9   *     http://www.apache.org/licenses/LICENSE-2.0
  10   *
  11   * Unless required by applicable law or agreed to in writing, software
  12   * distributed under the License is distributed on an "AS IS" BASIS,
  13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14   * See the License for the specific language governing permissions and
  15   * limitations under the License.
  16   *
  17   */
  18  
  19  package grpc
  20  
  21  import (
  22  	"context"
  23  	"fmt"
  24  	"sync"
  25  
  26  	"google.golang.org/grpc/balancer"
  27  	"google.golang.org/grpc/codes"
  28  	"google.golang.org/grpc/connectivity"
  29  	"google.golang.org/grpc/experimental/stats"
  30  	"google.golang.org/grpc/internal"
  31  	"google.golang.org/grpc/internal/balancer/gracefulswitch"
  32  	"google.golang.org/grpc/internal/channelz"
  33  	"google.golang.org/grpc/internal/grpcsync"
  34  	"google.golang.org/grpc/resolver"
  35  	"google.golang.org/grpc/status"
  36  )
  37  
  38  var (
  39  	setConnectedAddress = internal.SetConnectedAddress.(func(*balancer.SubConnState, resolver.Address))
  40  	// noOpRegisterHealthListenerFn is used when client side health checking is
  41  	// disabled. It sends a single READY update on the registered listener.
  42  	noOpRegisterHealthListenerFn = func(_ context.Context, listener func(balancer.SubConnState)) func() {
  43  		listener(balancer.SubConnState{ConnectivityState: connectivity.Ready})
  44  		return func() {}
  45  	}
  46  )
  47  
// ccBalancerWrapper sits between the ClientConn and the Balancer.
//
// ccBalancerWrapper implements methods corresponding to the ones on the
// balancer.Balancer interface. The ClientConn is free to call these methods
// concurrently and the ccBalancerWrapper ensures that calls from the ClientConn
// to the Balancer happen in order by performing them in the serializer, without
// any mutexes held.
//
// ccBalancerWrapper also implements the balancer.ClientConn interface and is
// passed to the Balancer implementations. It invokes unexported methods on the
// ClientConn to handle these calls from the Balancer.
//
// It uses the gracefulswitch.Balancer internally to ensure that balancer
// switches happen in a graceful manner.
type ccBalancerWrapper struct {
	// NOTE(review): embedded marker from package internal; presumably required
	// so this type can satisfy balancer.ClientConn — confirm in package internal.
	internal.EnforceClientConnEmbedding
	// The following fields are initialized when the wrapper is created and are
	// read-only afterwards, and therefore can be accessed without a mutex.
	//
	// cc is the channel this wrapper serves; opts is handed to the graceful
	// switch balancer at construction; serializer runs every call into the
	// balancer in order; serializerCancel cancels the serializer's context and
	// is invoked by close.
	cc               *ClientConn
	opts             balancer.BuildOptions
	serializer       *grpcsync.CallbackSerializer
	serializerCancel context.CancelFunc

	// The following fields are only accessed within the serializer or during
	// initialization.
	//
	// curBalancerName is the LB policy name last seen by updateClientConnState
	// (used only to log policy switches). balancer is set to nil by close once
	// it has been closed; serializer callbacks check for nil before using it.
	curBalancerName string
	balancer        *gracefulswitch.Balancer

	// The following field is protected by mu.  Caller must take cc.mu before
	// taking mu. closed is set by close and makes NewSubConn fail and
	// UpdateState/ResolveNow ignore further calls.
	mu     sync.Mutex
	closed bool
}
  81  
  82  // newCCBalancerWrapper creates a new balancer wrapper in idle state. The
  83  // underlying balancer is not created until the updateClientConnState() method
  84  // is invoked.
  85  func newCCBalancerWrapper(cc *ClientConn) *ccBalancerWrapper {
  86  	ctx, cancel := context.WithCancel(cc.ctx)
  87  	ccb := &ccBalancerWrapper{
  88  		cc: cc,
  89  		opts: balancer.BuildOptions{
  90  			DialCreds:       cc.dopts.copts.TransportCredentials,
  91  			CredsBundle:     cc.dopts.copts.CredsBundle,
  92  			Dialer:          cc.dopts.copts.Dialer,
  93  			Authority:       cc.authority,
  94  			CustomUserAgent: cc.dopts.copts.UserAgent,
  95  			ChannelzParent:  cc.channelz,
  96  			Target:          cc.parsedTarget,
  97  		},
  98  		serializer:       grpcsync.NewCallbackSerializer(ctx),
  99  		serializerCancel: cancel,
 100  	}
 101  	ccb.balancer = gracefulswitch.NewBalancer(ccb, ccb.opts)
 102  	return ccb
 103  }
 104  
 105  func (ccb *ccBalancerWrapper) MetricsRecorder() stats.MetricsRecorder {
 106  	return ccb.cc.metricsRecorderList
 107  }
 108  
// updateClientConnState is invoked by grpc to push a ClientConnState update to
// the underlying balancer. The balancer call itself is scheduled on the
// serializer, so it is serialized with every other call into the balancer.
// This method blocks until that callback has finished and returns the error
// reported by the balancer.
//
// nil is returned without calling the balancer if the serializer's context is
// done, if the balancer has already been closed, or if the callback could not
// be scheduled because the wrapper is being closed.
func (ccb *ccBalancerWrapper) updateClientConnState(ccs *balancer.ClientConnState) error {
	// errCh carries the balancer's result back to this goroutine. It is closed
	// on every path (deferred in uccs, or by onFailure), so the receive below
	// does not block forever, assuming ScheduleOr runs exactly one of the two.
	// A receive from the closed channel yields a nil error.
	errCh := make(chan error)
	uccs := func(ctx context.Context) {
		defer close(errCh)
		if ctx.Err() != nil || ccb.balancer == nil {
			return
		}
		name := gracefulswitch.ChildName(ccs.BalancerConfig)
		if ccb.curBalancerName != name {
			// Log only when the configured LB policy actually changes.
			ccb.curBalancerName = name
			channelz.Infof(logger, ccb.cc.channelz, "Channel switches to new LB policy %q", name)
		}
		err := ccb.balancer.UpdateClientConnState(*ccs)
		if logger.V(2) && err != nil {
			logger.Infof("error from balancer.UpdateClientConnState: %v", err)
		}
		errCh <- err
	}
	// onFailure runs in place of uccs when the callback cannot be scheduled.
	onFailure := func() { close(errCh) }

	// UpdateClientConnState can race with Close, and when the latter wins, the
	// serializer is closed, and the attempt to schedule the callback will fail.
	// It is acceptable to ignore this failure. But since we want to handle the
	// state update in a blocking fashion (when we successfully schedule the
	// callback), we have to use the ScheduleOr method and not the MaybeSchedule
	// method on the serializer.
	ccb.serializer.ScheduleOr(uccs, onFailure)
	return <-errCh
}
 141  
 142  // resolverError is invoked by grpc to push a resolver error to the underlying
 143  // balancer.  The call to the balancer is executed from the serializer.
 144  func (ccb *ccBalancerWrapper) resolverError(err error) {
 145  	ccb.serializer.TrySchedule(func(ctx context.Context) {
 146  		if ctx.Err() != nil || ccb.balancer == nil {
 147  			return
 148  		}
 149  		ccb.balancer.ResolverError(err)
 150  	})
 151  }
 152  
 153  // close initiates async shutdown of the wrapper.  cc.mu must be held when
 154  // calling this function.  To determine the wrapper has finished shutting down,
 155  // the channel should block on ccb.serializer.Done() without cc.mu held.
 156  func (ccb *ccBalancerWrapper) close() {
 157  	ccb.mu.Lock()
 158  	ccb.closed = true
 159  	ccb.mu.Unlock()
 160  	channelz.Info(logger, ccb.cc.channelz, "ccBalancerWrapper: closing")
 161  	ccb.serializer.TrySchedule(func(context.Context) {
 162  		if ccb.balancer == nil {
 163  			return
 164  		}
 165  		ccb.balancer.Close()
 166  		ccb.balancer = nil
 167  	})
 168  	ccb.serializerCancel()
 169  }
 170  
 171  // exitIdle invokes the balancer's exitIdle method in the serializer.
 172  func (ccb *ccBalancerWrapper) exitIdle() {
 173  	ccb.serializer.TrySchedule(func(ctx context.Context) {
 174  		if ctx.Err() != nil || ccb.balancer == nil {
 175  			return
 176  		}
 177  		ccb.balancer.ExitIdle()
 178  	})
 179  }
 180  
 181  func (ccb *ccBalancerWrapper) NewSubConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) {
 182  	ccb.cc.mu.Lock()
 183  	defer ccb.cc.mu.Unlock()
 184  
 185  	ccb.mu.Lock()
 186  	if ccb.closed {
 187  		ccb.mu.Unlock()
 188  		return nil, fmt.Errorf("balancer is being closed; no new SubConns allowed")
 189  	}
 190  	ccb.mu.Unlock()
 191  
 192  	if len(addrs) == 0 {
 193  		return nil, fmt.Errorf("grpc: cannot create SubConn with empty address list")
 194  	}
 195  	ac, err := ccb.cc.newAddrConnLocked(addrs, opts)
 196  	if err != nil {
 197  		channelz.Warningf(logger, ccb.cc.channelz, "acBalancerWrapper: NewSubConn: failed to newAddrConn: %v", err)
 198  		return nil, err
 199  	}
 200  	acbw := &acBalancerWrapper{
 201  		ccb:           ccb,
 202  		ac:            ac,
 203  		producers:     make(map[balancer.ProducerBuilder]*refCountedProducer),
 204  		stateListener: opts.StateListener,
 205  		healthData:    newHealthData(connectivity.Idle),
 206  	}
 207  	ac.acbw = acbw
 208  	return acbw, nil
 209  }
 210  
 211  func (ccb *ccBalancerWrapper) RemoveSubConn(balancer.SubConn) {
 212  	// The graceful switch balancer will never call this.
 213  	logger.Errorf("ccb RemoveSubConn(%v) called unexpectedly, sc")
 214  }
 215  
 216  func (ccb *ccBalancerWrapper) UpdateAddresses(sc balancer.SubConn, addrs []resolver.Address) {
 217  	acbw, ok := sc.(*acBalancerWrapper)
 218  	if !ok {
 219  		return
 220  	}
 221  	acbw.UpdateAddresses(addrs)
 222  }
 223  
 224  func (ccb *ccBalancerWrapper) UpdateState(s balancer.State) {
 225  	ccb.cc.mu.Lock()
 226  	defer ccb.cc.mu.Unlock()
 227  	if ccb.cc.conns == nil {
 228  		// The CC has been closed; ignore this update.
 229  		return
 230  	}
 231  
 232  	ccb.mu.Lock()
 233  	if ccb.closed {
 234  		ccb.mu.Unlock()
 235  		return
 236  	}
 237  	ccb.mu.Unlock()
 238  	// Update picker before updating state.  Even though the ordering here does
 239  	// not matter, it can lead to multiple calls of Pick in the common start-up
 240  	// case where we wait for ready and then perform an RPC.  If the picker is
 241  	// updated later, we could call the "connecting" picker when the state is
 242  	// updated, and then call the "ready" picker after the picker gets updated.
 243  
 244  	// Note that there is no need to check if the balancer wrapper was closed,
 245  	// as we know the graceful switch LB policy will not call cc if it has been
 246  	// closed.
 247  	ccb.cc.pickerWrapper.updatePicker(s.Picker)
 248  	ccb.cc.csMgr.updateState(s.ConnectivityState)
 249  }
 250  
 251  func (ccb *ccBalancerWrapper) ResolveNow(o resolver.ResolveNowOptions) {
 252  	ccb.cc.mu.RLock()
 253  	defer ccb.cc.mu.RUnlock()
 254  
 255  	ccb.mu.Lock()
 256  	if ccb.closed {
 257  		ccb.mu.Unlock()
 258  		return
 259  	}
 260  	ccb.mu.Unlock()
 261  	ccb.cc.resolveNowLocked(o)
 262  }
 263  
 264  func (ccb *ccBalancerWrapper) Target() string {
 265  	return ccb.cc.target
 266  }
 267  
// acBalancerWrapper is a wrapper on top of ac for balancers.
// It implements balancer.SubConn interface.
type acBalancerWrapper struct {
	// NOTE(review): embedded marker from package internal; presumably required
	// so this type can satisfy balancer.SubConn — confirm in package internal.
	internal.EnforceSubConnEmbedding
	ac            *addrConn          // read-only
	ccb           *ccBalancerWrapper // read-only
	// stateListener is taken from NewSubConnOptions.StateListener when the
	// SubConn is created, and is invoked from the serializer by updateState.
	stateListener func(balancer.SubConnState)

	// producersMu guards producers (and the refs of each entry): the
	// ref-counted producers built on this SubConn, keyed by their builder.
	producersMu sync.Mutex
	producers   map[balancer.ProducerBuilder]*refCountedProducer

	// Access to healthData is protected by healthMu.
	healthMu sync.Mutex
	// healthData is stored as a pointer to detect when the health listener is
	// dropped or updated. This is required as closures can't be compared for
	// equality.
	healthData *healthData
}
 286  
// healthData holds data related to health state reporting. A fresh healthData
// is installed whenever the SubConn's connectivity state changes, and whenever
// a health listener is registered while READY. Scheduled callbacks compare
// their captured pointer with the current one to detect that their listener
// has been superseded.
type healthData struct {
	// connectivityState stores the most recent connectivity state delivered
	// to the LB policy. This is stored to avoid sending updates when the
	// SubConn has already exited connectivity state READY.
	connectivityState connectivity.State
	// closeHealthProducer stores function to close the ref counted health
	// producer. The health producer is automatically closed when the SubConn
	// state changes. newHealthData initializes it to a no-op, so it is always
	// safe to call.
	closeHealthProducer func()
}
 298  
 299  func newHealthData(s connectivity.State) *healthData {
 300  	return &healthData{
 301  		connectivityState:   s,
 302  		closeHealthProducer: func() {},
 303  	}
 304  }
 305  
// updateState is invoked by grpc to push a subConn state update to the
// underlying balancer. Delivery is asynchronous, on the serializer, and is
// skipped if the serializer's context is done or the balancer has been
// closed. Before the LB policy's stateListener is called, every producer on
// this SubConn is closed and the current health listener is invalidated.
// curAddr is attached to the update only when s is READY.
func (acbw *acBalancerWrapper) updateState(s connectivity.State, curAddr resolver.Address, err error) {
	acbw.ccb.serializer.TrySchedule(func(ctx context.Context) {
		if ctx.Err() != nil || acbw.ccb.balancer == nil {
			return
		}
		// Invalidate all producers on any state change.
		acbw.closeProducers()

		scs := balancer.SubConnState{ConnectivityState: s, ConnectionError: err}
		if s == connectivity.Ready {
			// Only READY updates carry the address the SubConn is connected to.
			setConnectedAddress(&scs, curAddr)
		}
		// Invalidate the health listener by updating the healthData.
		acbw.healthMu.Lock()
		// A race may occur if a health listener is registered soon after the
		// connectivity state is set but before the stateListener is called.
		// Two cases may arise:
		// 1. The new state is not READY: RegisterHealthListener has checks to
		//    ensure no updates are sent when the connectivity state is not
		//    READY.
		// 2. The new state is READY: This means that the old state wasn't Ready.
		//    The RegisterHealthListener API mentions that a health listener
		//    must not be registered when a SubConn is not ready to avoid such
		//    races. When this happens, the LB policy would get health updates
		//    on the old listener. When the LB policy registers a new listener
		//    on receiving the connectivity update, the health updates will be
		//    sent to the new health listener.
		acbw.healthData = newHealthData(scs.ConnectivityState)
		acbw.healthMu.Unlock()

		// Even though it is optional for balancers, gracefulswitch ensures
		// opts.StateListener is set, so this cannot ever be nil.
		// TODO: delete this comment when UpdateSubConnState is removed.
		acbw.stateListener(scs)
	})
}
 344  
 345  func (acbw *acBalancerWrapper) String() string {
 346  	return fmt.Sprintf("SubConn(id:%d)", acbw.ac.channelz.ID)
 347  }
 348  
 349  func (acbw *acBalancerWrapper) UpdateAddresses(addrs []resolver.Address) {
 350  	acbw.ac.updateAddrs(addrs)
 351  }
 352  
 353  func (acbw *acBalancerWrapper) Connect() {
 354  	go acbw.ac.connect()
 355  }
 356  
 357  func (acbw *acBalancerWrapper) Shutdown() {
 358  	acbw.closeProducers()
 359  	acbw.ccb.cc.removeAddrConn(acbw.ac, errConnDrain)
 360  }
 361  
 362  // NewStream begins a streaming RPC on the addrConn.  If the addrConn is not
 363  // ready, blocks until it is or ctx expires.  Returns an error when the context
 364  // expires or the addrConn is shut down.
 365  func (acbw *acBalancerWrapper) NewStream(ctx context.Context, desc *StreamDesc, method string, opts ...CallOption) (ClientStream, error) {
 366  	transport := acbw.ac.getReadyTransport()
 367  	if transport == nil {
 368  		return nil, status.Errorf(codes.Unavailable, "SubConn state is not Ready")
 369  
 370  	}
 371  	return newNonRetryClientStream(ctx, desc, method, transport, acbw.ac, opts...)
 372  }
 373  
 374  // Invoke performs a unary RPC.  If the addrConn is not ready, returns
 375  // errSubConnNotReady.
 376  func (acbw *acBalancerWrapper) Invoke(ctx context.Context, method string, args any, reply any, opts ...CallOption) error {
 377  	cs, err := acbw.NewStream(ctx, unaryStreamDesc, method, opts...)
 378  	if err != nil {
 379  		return err
 380  	}
 381  	if err := cs.SendMsg(args); err != nil {
 382  		return err
 383  	}
 384  	return cs.RecvMsg(reply)
 385  }
 386  
// refCountedProducer is a producer cached on an acBalancerWrapper and shared
// by every GetOrBuildProducer caller that passes the same builder. Its refs
// field is read and written only while the owning wrapper's producersMu is
// held.
type refCountedProducer struct {
	producer balancer.Producer
	refs     int    // number of current refs to the producer
	close    func() // underlying producer's close function
}
 392  
// GetOrBuildProducer returns the producer that pb builds for this SubConn. It
// calls pb.Build (with producersMu held) only if no producer from pb is
// currently cached. Every call takes one reference, and the returned function
// releases it. That function is wrapped in sync.OnceFunc, so extra calls
// release nothing more. When the last reference is released, the producer is
// removed from the cache and its close function runs after producersMu has
// been unlocked.
func (acbw *acBalancerWrapper) GetOrBuildProducer(pb balancer.ProducerBuilder) (balancer.Producer, func()) {
	acbw.producersMu.Lock()
	defer acbw.producersMu.Unlock()

	// Look up existing producer from this builder.
	pData := acbw.producers[pb]
	if pData == nil {
		// Not found; create a new one and add it to the producers map.
		p, closeFn := pb.Build(acbw)
		pData = &refCountedProducer{producer: p, close: closeFn}
		acbw.producers[pb] = pData
	}
	// Account for this new reference.
	pData.refs++

	// Return a cleanup function wrapped in a OnceFunc to remove this reference
	// and delete the refCountedProducer from the map if the total reference
	// count goes to zero.
	unref := func() {
		acbw.producersMu.Lock()
		// If closeProducers has already closed this producer instance, refs is
		// set to 0, so the check after decrementing will never pass, and the
		// producer will not be double-closed.
		pData.refs--
		if pData.refs == 0 {
			defer pData.close() // Run outside the acbw mutex
			delete(acbw.producers, pb)
		}
		acbw.producersMu.Unlock()
	}
	return pData.producer, sync.OnceFunc(unref)
}
 425  
// closeProducers closes and forgets every producer built on this SubConn,
// whatever its outstanding reference count. Each entry's refs is zeroed so
// that later release calls from GetOrBuildProducer go negative, never hit
// zero again, and therefore do not close the producer a second time.
//
// Unlike the release path in GetOrBuildProducer, the close functions here run
// while producersMu is held. A close function that re-enters
// GetOrBuildProducer or closeProducers on the same SubConn would deadlock,
// because sync.Mutex is not reentrant.
func (acbw *acBalancerWrapper) closeProducers() {
	acbw.producersMu.Lock()
	defer acbw.producersMu.Unlock()
	for pb, pData := range acbw.producers {
		pData.refs = 0
		pData.close()
		// Deleting the current key during range iteration is permitted in Go.
		delete(acbw.producers, pb)
	}
}
 435  
// healthProducerRegisterFn is a type alias for the health producer's function
// for registering listeners. healthListenerRegFn asserts
// internal.RegisterClientHealthCheckListener to this type. Its arguments are a
// context, the SubConn, the health check service name, and the listener. It
// returns a function that closes the registration.
type healthProducerRegisterFn = func(context.Context, balancer.SubConn, string, func(balancer.SubConnState)) func()
 439  
 440  // healthListenerRegFn returns a function to register a listener for health
 441  // updates. If client side health checks are disabled, the registered listener
 442  // will get a single READY (raw connectivity state) update.
 443  //
 444  // Client side health checking is enabled when all the following
 445  // conditions are satisfied:
 446  // 1. Health checking is not disabled using the dial option.
 447  // 2. The health package is imported.
 448  // 3. The health check config is present in the service config.
 449  func (acbw *acBalancerWrapper) healthListenerRegFn() func(context.Context, func(balancer.SubConnState)) func() {
 450  	if acbw.ccb.cc.dopts.disableHealthCheck {
 451  		return noOpRegisterHealthListenerFn
 452  	}
 453  	cfg := acbw.ac.cc.healthCheckConfig()
 454  	if cfg == nil {
 455  		return noOpRegisterHealthListenerFn
 456  	}
 457  	regHealthLisFn := internal.RegisterClientHealthCheckListener
 458  	if regHealthLisFn == nil {
 459  		// The health package is not imported.
 460  		channelz.Error(logger, acbw.ac.channelz, "Health check is requested but health package is not imported.")
 461  		return noOpRegisterHealthListenerFn
 462  	}
 463  	return func(ctx context.Context, listener func(balancer.SubConnState)) func() {
 464  		return regHealthLisFn.(healthProducerRegisterFn)(ctx, acbw, cfg.ServiceName, listener)
 465  	}
 466  }
 467  
// RegisterHealthListener accepts a health listener from the LB policy. It sends
// updates to the health listener as long as the SubConn's connectivity state
// doesn't change and a new health listener is not registered. To invalidate
// the currently registered health listener, acbw updates the healthData. If a
// nil listener is registered, the active health listener is dropped.
//
// Registration with the health producer happens asynchronously on the
// serializer. Each health update is also re-scheduled on the serializer, so it
// is ordered with other calls into the LB policy.
func (acbw *acBalancerWrapper) RegisterHealthListener(listener func(balancer.SubConnState)) {
	acbw.healthMu.Lock()
	defer acbw.healthMu.Unlock()
	// Close the health producer registration of any previous listener.
	acbw.healthData.closeHealthProducer()
	// listeners should not be registered when the connectivity state
	// isn't Ready. This may happen when the balancer registers a listener
	// after the connectivityState is updated, but before it is notified
	// of the update.
	if acbw.healthData.connectivityState != connectivity.Ready {
		return
	}
	// Replace the health data to stop sending updates to any previously
	// registered health listeners.
	hd := newHealthData(connectivity.Ready)
	acbw.healthData = hd
	if listener == nil {
		return
	}

	registerFn := acbw.healthListenerRegFn()
	acbw.ccb.serializer.TrySchedule(func(ctx context.Context) {
		if ctx.Err() != nil || acbw.ccb.balancer == nil {
			return
		}
		// Don't send updates if a new listener is registered.
		acbw.healthMu.Lock()
		defer acbw.healthMu.Unlock()
		if acbw.healthData != hd {
			return
		}
		// Serialize the health updates from the health producer with
		// other calls into the LB policy.
		listenerWrapper := func(scs balancer.SubConnState) {
			acbw.ccb.serializer.TrySchedule(func(ctx context.Context) {
				if ctx.Err() != nil || acbw.ccb.balancer == nil {
					return
				}
				acbw.healthMu.Lock()
				defer acbw.healthMu.Unlock()
				// Drop updates meant for a listener that has been superseded.
				if acbw.healthData != hd {
					return
				}
				listener(scs)
			})
		}

		// registerFn may call listenerWrapper synchronously (the no-op
		// registrar does). That is safe while healthMu is held, because
		// listenerWrapper only schedules work and does not lock healthMu itself.
		hd.closeHealthProducer = registerFn(ctx, listenerWrapper)
	})
}
 522