gracefulswitch.go raw

   1  /*
   2   *
   3   * Copyright 2022 gRPC authors.
   4   *
   5   * Licensed under the Apache License, Version 2.0 (the "License");
   6   * you may not use this file except in compliance with the License.
   7   * You may obtain a copy of the License at
   8   *
   9   *     http://www.apache.org/licenses/LICENSE-2.0
  10   *
  11   * Unless required by applicable law or agreed to in writing, software
  12   * distributed under the License is distributed on an "AS IS" BASIS,
  13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14   * See the License for the specific language governing permissions and
  15   * limitations under the License.
  16   *
  17   */
  18  
  19  // Package gracefulswitch implements a graceful switch load balancer.
  20  package gracefulswitch
  21  
  22  import (
  23  	"errors"
  24  	"fmt"
  25  	"sync"
  26  
  27  	"google.golang.org/grpc/balancer"
  28  	"google.golang.org/grpc/balancer/base"
  29  	"google.golang.org/grpc/connectivity"
  30  	"google.golang.org/grpc/resolver"
  31  )
  32  
  33  var errBalancerClosed = errors.New("gracefulSwitchBalancer is closed")
  34  var _ balancer.Balancer = (*Balancer)(nil)
  35  
  36  // NewBalancer returns a graceful switch Balancer.
  37  func NewBalancer(cc balancer.ClientConn, opts balancer.BuildOptions) *Balancer {
  38  	return &Balancer{
  39  		cc:    cc,
  40  		bOpts: opts,
  41  	}
  42  }
  43  
// Balancer is a utility to gracefully switch from one balancer to
// a new balancer. It implements the balancer.Balancer interface.
//
// At most two child balancers exist at a time: the current one, whose state
// is forwarded to the parent ClientConn, and an optional pending one that
// replaces the current when the swap conditions in
// (*balancerWrapper).UpdateState are met.
type Balancer struct {
	// bOpts is passed unchanged to every child builder's Build call.
	bOpts balancer.BuildOptions
	// cc is the parent ClientConn that state updates and SubConn operations
	// are forwarded to.
	cc    balancer.ClientConn

	// mu protects the following fields and all fields within balancerCurrent
	// and balancerPending. mu does not need to be held when calling into the
	// child balancers, as all calls into these children happen only as a direct
	// result of a call into the gracefulSwitchBalancer, which are also
	// guaranteed to be synchronous. There is one exception: an UpdateState call
	// from a child balancer when current and pending are populated can lead to
	// calling Close() on the current. To prevent that racing with an
	// UpdateSubConnState from the channel, we hold currentMu during Close and
	// UpdateSubConnState calls.
	mu              sync.Mutex
	balancerCurrent *balancerWrapper
	balancerPending *balancerWrapper
	closed          bool // set to true when this balancer is closed

	// currentMu must be locked before mu. This mutex guards against this
	// sequence of events: UpdateSubConnState() called, finds the
	// balancerCurrent, gives up lock, updateState comes in, causes Close() on
	// balancerCurrent before the UpdateSubConnState is called on the
	// balancerCurrent.
	currentMu sync.Mutex

	// activeGoroutines tracks all the goroutines that this balancer has started
	// and that should be waited on when the balancer closes.
	activeGoroutines sync.WaitGroup
}
  75  
  76  // swap swaps out the current lb with the pending lb and updates the ClientConn.
  77  // The caller must hold gsb.mu.
  78  func (gsb *Balancer) swap() {
  79  	gsb.cc.UpdateState(gsb.balancerPending.lastState)
  80  	cur := gsb.balancerCurrent
  81  	gsb.balancerCurrent = gsb.balancerPending
  82  	gsb.balancerPending = nil
  83  	gsb.activeGoroutines.Add(1)
  84  	go func() {
  85  		defer gsb.activeGoroutines.Done()
  86  		gsb.currentMu.Lock()
  87  		defer gsb.currentMu.Unlock()
  88  		cur.Close()
  89  	}()
  90  }
  91  
  92  // Helper function that checks if the balancer passed in is current or pending.
  93  // The caller must hold gsb.mu.
  94  func (gsb *Balancer) balancerCurrentOrPending(bw *balancerWrapper) bool {
  95  	return bw == gsb.balancerCurrent || bw == gsb.balancerPending
  96  }
  97  
  98  // SwitchTo initializes the graceful switch process, which completes based on
  99  // connectivity state changes on the current/pending balancer. Thus, the switch
 100  // process is not complete when this method returns. This method must be called
 101  // synchronously alongside the rest of the balancer.Balancer methods this
 102  // Graceful Switch Balancer implements.
 103  //
 104  // Deprecated: use ParseConfig and pass a parsed config to UpdateClientConnState
 105  // to cause the Balancer to automatically change to the new child when necessary.
 106  func (gsb *Balancer) SwitchTo(builder balancer.Builder) error {
 107  	_, err := gsb.switchTo(builder)
 108  	return err
 109  }
 110  
// switchTo builds a new child balancer from builder and installs its wrapper
// as the current balancer if there is none, or as the pending balancer
// otherwise. A previously pending balancer, if any, is closed before the new
// child is built.
//
// It returns the wrapper of the new child, errBalancerClosed if the graceful
// switch balancer has already been closed, or balancer.ErrBadResolverState if
// builder.Build returns nil.
func (gsb *Balancer) switchTo(builder balancer.Builder) (*balancerWrapper, error) {
	gsb.mu.Lock()
	if gsb.closed {
		gsb.mu.Unlock()
		return nil, errBalancerClosed
	}
	// The wrapper starts out in CONNECTING with an error picker built from
	// balancer.ErrNoSubConnAvailable; this is the state that swap() would
	// forward if the child never reports one of its own.
	bw := &balancerWrapper{
		ClientConn: gsb.cc,
		builder:    builder,
		gsb:        gsb,
		lastState: balancer.State{
			ConnectivityState: connectivity.Connecting,
			Picker:            base.NewErrPicker(balancer.ErrNoSubConnAvailable),
		},
		subconns: make(map[balancer.SubConn]bool),
	}
	balToClose := gsb.balancerPending // nil if there is no pending balancer
	if gsb.balancerCurrent == nil {
		gsb.balancerCurrent = bw
	} else {
		gsb.balancerPending = bw
	}
	gsb.mu.Unlock()
	// Close tolerates a nil receiver and takes gsb.mu itself, so it is called
	// only after the lock has been released.
	balToClose.Close()
	// This function takes a builder instead of a balancer because builder.Build
	// can call back inline, and this utility needs to handle the callbacks.
	newBalancer := builder.Build(bw, gsb.bOpts)
	if newBalancer == nil {
		// This is illegal and should never happen; we clear the balancerWrapper
		// we were constructing if it happens to avoid a potential panic.
		//
		// If a pending balancer exists it is bw (assuming no concurrent
		// switchTo); otherwise bw occupies the current slot, either because it
		// was installed there above or because an inline UpdateState during
		// Build swapped it in.
		gsb.mu.Lock()
		if gsb.balancerPending != nil {
			gsb.balancerPending = nil
		} else {
			gsb.balancerCurrent = nil
		}
		gsb.mu.Unlock()
		return nil, balancer.ErrBadResolverState
	}

	// This write doesn't need to take gsb.mu because this field never gets read
	// or written to on any calls from the current or pending. Calls from grpc
	// to this balancer are guaranteed to be called synchronously, so this
	// bw.Balancer field will never be forwarded to until this SwitchTo()
	// function returns.
	bw.Balancer = newBalancer
	return bw, nil
}
 159  
 160  // Returns nil if the graceful switch balancer is closed.
 161  func (gsb *Balancer) latestBalancer() *balancerWrapper {
 162  	gsb.mu.Lock()
 163  	defer gsb.mu.Unlock()
 164  	if gsb.balancerPending != nil {
 165  		return gsb.balancerPending
 166  	}
 167  	return gsb.balancerCurrent
 168  }
 169  
 170  // UpdateClientConnState forwards the update to the latest balancer created.
 171  //
 172  // If the state's BalancerConfig is the config returned by a call to
 173  // gracefulswitch.ParseConfig, then this function will automatically SwitchTo
 174  // the balancer indicated by the config before forwarding its config to it, if
 175  // necessary.
 176  func (gsb *Balancer) UpdateClientConnState(state balancer.ClientConnState) error {
 177  	// The resolver data is only relevant to the most recent LB Policy.
 178  	balToUpdate := gsb.latestBalancer()
 179  	gsbCfg, ok := state.BalancerConfig.(*lbConfig)
 180  	if ok {
 181  		// Switch to the child in the config unless it is already active.
 182  		if balToUpdate == nil || gsbCfg.childBuilder.Name() != balToUpdate.builder.Name() {
 183  			var err error
 184  			balToUpdate, err = gsb.switchTo(gsbCfg.childBuilder)
 185  			if err != nil {
 186  				return fmt.Errorf("could not switch to new child balancer: %w", err)
 187  			}
 188  		}
 189  		// Unwrap the child balancer's config.
 190  		state.BalancerConfig = gsbCfg.childConfig
 191  	}
 192  
 193  	if balToUpdate == nil {
 194  		return errBalancerClosed
 195  	}
 196  
 197  	// Perform this call without gsb.mu to prevent deadlocks if the child calls
 198  	// back into the channel. The latest balancer can never be closed during a
 199  	// call from the channel, even without gsb.mu held.
 200  	return balToUpdate.UpdateClientConnState(state)
 201  }
 202  
 203  // ResolverError forwards the error to the latest balancer created.
 204  func (gsb *Balancer) ResolverError(err error) {
 205  	// The resolver data is only relevant to the most recent LB Policy.
 206  	balToUpdate := gsb.latestBalancer()
 207  	if balToUpdate == nil {
 208  		gsb.cc.UpdateState(balancer.State{
 209  			ConnectivityState: connectivity.TransientFailure,
 210  			Picker:            base.NewErrPicker(err),
 211  		})
 212  		return
 213  	}
 214  	// Perform this call without gsb.mu to prevent deadlocks if the child calls
 215  	// back into the channel. The latest balancer can never be closed during a
 216  	// call from the channel, even without gsb.mu held.
 217  	balToUpdate.ResolverError(err)
 218  }
 219  
 220  // ExitIdle forwards the call to the latest balancer created.
 221  //
 222  // If the latest balancer does not support ExitIdle, the subConns are
 223  // re-connected to manually.
 224  func (gsb *Balancer) ExitIdle() {
 225  	balToUpdate := gsb.latestBalancer()
 226  	if balToUpdate == nil {
 227  		return
 228  	}
 229  	// There is no need to protect this read with a mutex, as the write to the
 230  	// Balancer field happens in SwitchTo, which completes before this can be
 231  	// called.
 232  	balToUpdate.ExitIdle()
 233  }
 234  
 235  // updateSubConnState forwards the update to the appropriate child.
 236  func (gsb *Balancer) updateSubConnState(sc balancer.SubConn, state balancer.SubConnState, cb func(balancer.SubConnState)) {
 237  	gsb.currentMu.Lock()
 238  	defer gsb.currentMu.Unlock()
 239  	gsb.mu.Lock()
 240  	// Forward update to the appropriate child.  Even if there is a pending
 241  	// balancer, the current balancer should continue to get SubConn updates to
 242  	// maintain the proper state while the pending is still connecting.
 243  	var balToUpdate *balancerWrapper
 244  	if gsb.balancerCurrent != nil && gsb.balancerCurrent.subconns[sc] {
 245  		balToUpdate = gsb.balancerCurrent
 246  	} else if gsb.balancerPending != nil && gsb.balancerPending.subconns[sc] {
 247  		balToUpdate = gsb.balancerPending
 248  	}
 249  	if balToUpdate == nil {
 250  		// SubConn belonged to a stale lb policy that has not yet fully closed,
 251  		// or the balancer was already closed.
 252  		gsb.mu.Unlock()
 253  		return
 254  	}
 255  	if state.ConnectivityState == connectivity.Shutdown {
 256  		delete(balToUpdate.subconns, sc)
 257  	}
 258  	gsb.mu.Unlock()
 259  	if cb != nil {
 260  		cb(state)
 261  	} else {
 262  		balToUpdate.UpdateSubConnState(sc, state)
 263  	}
 264  }
 265  
 266  // UpdateSubConnState forwards the update to the appropriate child.
 267  func (gsb *Balancer) UpdateSubConnState(sc balancer.SubConn, state balancer.SubConnState) {
 268  	gsb.updateSubConnState(sc, state, nil)
 269  }
 270  
 271  // Close closes any active child balancers.
 272  func (gsb *Balancer) Close() {
 273  	gsb.mu.Lock()
 274  	gsb.closed = true
 275  	currentBalancerToClose := gsb.balancerCurrent
 276  	gsb.balancerCurrent = nil
 277  	pendingBalancerToClose := gsb.balancerPending
 278  	gsb.balancerPending = nil
 279  	gsb.mu.Unlock()
 280  
 281  	currentBalancerToClose.Close()
 282  	pendingBalancerToClose.Close()
 283  	gsb.activeGoroutines.Wait()
 284  }
 285  
// balancerWrapper wraps a balancer.Balancer, and overrides some Balancer
// methods to help cleanup SubConns created by the wrapped balancer.
//
// It implements the balancer.ClientConn interface and is passed down in that
// capacity to the wrapped balancer. It maintains a set of subConns created by
// the wrapped balancer and calls from the latter to create/update/shutdown
// SubConns update this set before being forwarded to the parent ClientConn.
// State updates from the wrapped balancer can result in invocation of the
// graceful switch logic.
type balancerWrapper struct {
	// ClientConn is the parent ClientConn (gsb.cc); ClientConn methods not
	// overridden by balancerWrapper are served by it through embedding.
	balancer.ClientConn
	// Balancer is the wrapped child. It is assigned by switchTo after
	// builder.Build returns, so it is still nil during Build itself.
	balancer.Balancer
	// gsb is the graceful switch Balancer that owns this wrapper.
	gsb     *Balancer
	// builder is the builder the child was created from; its Name is compared
	// in UpdateClientConnState to decide whether a switch is required.
	builder balancer.Builder

	// lastState is the most recent state reported by the child through
	// UpdateState. It is guarded by gsb.mu.
	lastState balancer.State
	subconns  map[balancer.SubConn]bool // subconns created by this balancer
}
 304  
 305  // Close closes the underlying LB policy and shuts down the subconns it
 306  // created. bw must not be referenced via balancerCurrent or balancerPending in
 307  // gsb when called. gsb.mu must not be held.  Does not panic with a nil
 308  // receiver.
 309  func (bw *balancerWrapper) Close() {
 310  	// before Close is called.
 311  	if bw == nil {
 312  		return
 313  	}
 314  	// There is no need to protect this read with a mutex, as Close() is
 315  	// impossible to be called concurrently with the write in SwitchTo(). The
 316  	// callsites of Close() for this balancer in Graceful Switch Balancer will
 317  	// never be called until SwitchTo() returns.
 318  	bw.Balancer.Close()
 319  	bw.gsb.mu.Lock()
 320  	for sc := range bw.subconns {
 321  		sc.Shutdown()
 322  	}
 323  	bw.gsb.mu.Unlock()
 324  }
 325  
 326  func (bw *balancerWrapper) UpdateState(state balancer.State) {
 327  	// Hold the mutex for this entire call to ensure it cannot occur
 328  	// concurrently with other updateState() calls. This causes updates to
 329  	// lastState and calls to cc.UpdateState to happen atomically.
 330  	bw.gsb.mu.Lock()
 331  	defer bw.gsb.mu.Unlock()
 332  	bw.lastState = state
 333  
 334  	// If Close() acquires the mutex before UpdateState(), the balancer
 335  	// will already have been removed from the current or pending state when
 336  	// reaching this point.
 337  	if !bw.gsb.balancerCurrentOrPending(bw) {
 338  		// Returning here ensures that (*Balancer).swap() is not invoked after
 339  		// (*Balancer).Close() and therefore prevents "use after close".
 340  		return
 341  	}
 342  
 343  	if bw == bw.gsb.balancerCurrent {
 344  		// In the case that the current balancer exits READY, and there is a pending
 345  		// balancer, you can forward the pending balancer's cached State up to
 346  		// ClientConn and swap the pending into the current. This is because there
 347  		// is no reason to gracefully switch from and keep using the old policy as
 348  		// the ClientConn is not connected to any backends.
 349  		if state.ConnectivityState != connectivity.Ready && bw.gsb.balancerPending != nil {
 350  			bw.gsb.swap()
 351  			return
 352  		}
 353  		// Even if there is a pending balancer waiting to be gracefully switched to,
 354  		// continue to forward current balancer updates to the Client Conn. Ignoring
 355  		// state + picker from the current would cause undefined behavior/cause the
 356  		// system to behave incorrectly from the current LB policies perspective.
 357  		// Also, the current LB is still being used by grpc to choose SubConns per
 358  		// RPC, and thus should use the most updated form of the current balancer.
 359  		bw.gsb.cc.UpdateState(state)
 360  		return
 361  	}
 362  	// This method is now dealing with a state update from the pending balancer.
 363  	// If the current balancer is currently in a state other than READY, the new
 364  	// policy can be swapped into place immediately. This is because there is no
 365  	// reason to gracefully switch from and keep using the old policy as the
 366  	// ClientConn is not connected to any backends.
 367  	if state.ConnectivityState != connectivity.Connecting || bw.gsb.balancerCurrent.lastState.ConnectivityState != connectivity.Ready {
 368  		bw.gsb.swap()
 369  	}
 370  }
 371  
 372  func (bw *balancerWrapper) NewSubConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) {
 373  	bw.gsb.mu.Lock()
 374  	if !bw.gsb.balancerCurrentOrPending(bw) {
 375  		bw.gsb.mu.Unlock()
 376  		return nil, fmt.Errorf("%T at address %p that called NewSubConn is deleted", bw, bw)
 377  	}
 378  	bw.gsb.mu.Unlock()
 379  
 380  	var sc balancer.SubConn
 381  	oldListener := opts.StateListener
 382  	opts.StateListener = func(state balancer.SubConnState) { bw.gsb.updateSubConnState(sc, state, oldListener) }
 383  	sc, err := bw.gsb.cc.NewSubConn(addrs, opts)
 384  	if err != nil {
 385  		return nil, err
 386  	}
 387  	bw.gsb.mu.Lock()
 388  	if !bw.gsb.balancerCurrentOrPending(bw) { // balancer was closed during this call
 389  		sc.Shutdown()
 390  		bw.gsb.mu.Unlock()
 391  		return nil, fmt.Errorf("%T at address %p that called NewSubConn is deleted", bw, bw)
 392  	}
 393  	bw.subconns[sc] = true
 394  	bw.gsb.mu.Unlock()
 395  	return sc, nil
 396  }
 397  
 398  func (bw *balancerWrapper) ResolveNow(opts resolver.ResolveNowOptions) {
 399  	// Ignore ResolveNow requests from anything other than the most recent
 400  	// balancer, because older balancers were already removed from the config.
 401  	if bw != bw.gsb.latestBalancer() {
 402  		return
 403  	}
 404  	bw.gsb.cc.ResolveNow(opts)
 405  }
 406  
// RemoveSubConn shuts sc down by calling sc.Shutdown directly; the call is
// not forwarded to the parent ClientConn's RemoveSubConn.
func (bw *balancerWrapper) RemoveSubConn(sc balancer.SubConn) {
	// Note: existing third party balancers may call this, so it must remain
	// until RemoveSubConn is fully removed.
	sc.Shutdown()
}
 412  
 413  func (bw *balancerWrapper) UpdateAddresses(sc balancer.SubConn, addrs []resolver.Address) {
 414  	bw.gsb.mu.Lock()
 415  	if !bw.gsb.balancerCurrentOrPending(bw) {
 416  		bw.gsb.mu.Unlock()
 417  		return
 418  	}
 419  	bw.gsb.mu.Unlock()
 420  	bw.gsb.cc.UpdateAddresses(sc, addrs)
 421  }
 422