endpointsharding.go raw

   1  /*
   2   *
   3   * Copyright 2024 gRPC authors.
   4   *
   5   * Licensed under the Apache License, Version 2.0 (the "License");
   6   * you may not use this file except in compliance with the License.
   7   * You may obtain a copy of the License at
   8   *
   9   *     http://www.apache.org/licenses/LICENSE-2.0
  10   *
  11   * Unless required by applicable law or agreed to in writing, software
  12   * distributed under the License is distributed on an "AS IS" BASIS,
  13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14   * See the License for the specific language governing permissions and
  15   * limitations under the License.
  16   *
  17   */
  18  
  19  // Package endpointsharding implements a load balancing policy that manages
  20  // homogeneous child policies each owning a single endpoint.
  21  //
  22  // # Experimental
  23  //
  24  // Notice: This package is EXPERIMENTAL and may be changed or removed in a
  25  // later release.
  26  package endpointsharding
  27  
  28  import (
  29  	"errors"
  30  	rand "math/rand/v2"
  31  	"sync"
  32  	"sync/atomic"
  33  
  34  	"google.golang.org/grpc/balancer"
  35  	"google.golang.org/grpc/balancer/base"
  36  	"google.golang.org/grpc/connectivity"
  37  	"google.golang.org/grpc/resolver"
  38  )
  39  
// randIntN is a package-level alias for rand.IntN. rotateEndpoints and
// updateState draw their random indices through it rather than calling
// rand.IntN directly. Presumably the indirection lets tests substitute a
// deterministic source; confirm against the package tests.
var randIntN = rand.IntN
  41  
// ChildState is the balancer state of a child along with the endpoint which
// identifies the child balancer.
type ChildState struct {
	// Endpoint is the resolver endpoint that identifies the child. It is
	// refreshed on every resolver update that still contains the endpoint.
	Endpoint resolver.Endpoint
	// State is the most recent balancer.State the child reported through
	// UpdateState on the ClientConn it was built with.
	State    balancer.State

	// Balancer exposes only the ExitIdler interface of the child LB policy.
	// Other methods of the child policy are called only by endpointsharding.
	Balancer ExitIdler
}
  52  
// ExitIdler provides access to only the ExitIdle method of the child balancer.
// It is the type of ChildState.Balancer, which lets holders of a ChildState
// trigger a reconnect without reaching any other method of the child.
type ExitIdler interface {
	// ExitIdle instructs the LB policy to reconnect to backends / exit the
	// IDLE state, if appropriate and possible.  Note that SubConns that enter
	// the IDLE state will not reconnect until SubConn.Connect is called.
	ExitIdle()
}
  60  
// Options are the options to configure the behaviour of the
// endpointsharding balancer.
type Options struct {
	// DisableAutoReconnect allows the balancer to keep child balancers in the
	// IDLE state until they are explicitly triggered to exit using the
	// ChildState obtained from the endpointsharding picker. When set to false
	// (the zero value), the endpointsharding balancer will automatically call
	// ExitIdle on child balancers that report IDLE.
	DisableAutoReconnect bool
}
  71  
// ChildBuilderFunc creates a new balancer with the ClientConn. It has the same
// type as the balancer.Builder.Build method. The endpointsharding balancer
// calls it once for every endpoint that does not yet have a child, passing a
// wrapper ClientConn that intercepts the child's UpdateState calls.
type ChildBuilderFunc func(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer
  75  
  76  // NewBalancer returns a load balancing policy that manages homogeneous child
  77  // policies each owning a single endpoint. The endpointsharding balancer
  78  // forwards the LoadBalancingConfig in ClientConn state updates to its children.
  79  func NewBalancer(cc balancer.ClientConn, opts balancer.BuildOptions, childBuilder ChildBuilderFunc, esOpts Options) balancer.Balancer {
  80  	es := &endpointSharding{
  81  		cc:           cc,
  82  		bOpts:        opts,
  83  		esOpts:       esOpts,
  84  		childBuilder: childBuilder,
  85  	}
  86  	es.children.Store(resolver.NewEndpointMap[*balancerWrapper]())
  87  	return es
  88  }
  89  
// endpointSharding is a balancer that wraps child balancers. It creates a child
// balancer with child config for every unique Endpoint received. It updates the
// child states on any update from parent or child.
type endpointSharding struct {
	// The following four fields are set in NewBalancer and are not reassigned
	// anywhere in this file.
	cc           balancer.ClientConn
	bOpts        balancer.BuildOptions
	esOpts       Options
	childBuilder ChildBuilderFunc

	// childMu synchronizes calls to any single child. It must be held for all
	// calls into a child. To avoid deadlocks, do not acquire childMu while
	// holding mu.
	childMu  sync.Mutex
	// children holds the current set of child wrappers keyed by endpoint. The
	// map is replaced wholesale by UpdateClientConnState rather than mutated
	// in place, so readers Load a snapshot.
	children atomic.Pointer[resolver.EndpointMap[*balancerWrapper]]

	// inhibitChildUpdates is set during UpdateClientConnState/ResolverError
	// calls (calls to children will each produce an update, only want one
	// update). While it is set, updateState returns without publishing.
	inhibitChildUpdates atomic.Bool

	// mu synchronizes access to the state stored in balancerWrappers in the
	// children field. mu must not be held during calls into a child since
	// synchronous calls back from the child may require taking mu, causing a
	// deadlock. To avoid deadlocks, do not acquire childMu while holding mu.
	mu sync.Mutex
}
 116  
 117  // rotateEndpoints returns a slice of all the input endpoints rotated a random
 118  // amount.
 119  func rotateEndpoints(es []resolver.Endpoint) []resolver.Endpoint {
 120  	les := len(es)
 121  	if les == 0 {
 122  		return es
 123  	}
 124  	r := randIntN(les)
 125  	// Make a copy to avoid mutating data beyond the end of es.
 126  	ret := make([]resolver.Endpoint, les)
 127  	copy(ret, es[r:])
 128  	copy(ret[les-r:], es[:r])
 129  	return ret
 130  }
 131  
// UpdateClientConnState creates a child for new endpoints and deletes children
// for endpoints that are no longer present. It also updates all the children,
// and sends a single synchronous update of the childrens' aggregated state at
// the end of the UpdateClientConnState operation.
//
// Endpoints are visited in a randomly rotated order, and an endpoint that is
// already present in the new child map is skipped so that each child is
// updated once per call. This function does not itself filter out endpoints
// that have no addresses; each one is handed to its child, which decides how
// to handle it.
//
// It returns balancer.ErrBadResolverState if the update leaves no children.
// Otherwise it returns the first error found from a child, but fully processes
// the new update.
func (es *endpointSharding) UpdateClientConnState(state balancer.ClientConnState) error {
	es.childMu.Lock()
	defer es.childMu.Unlock()

	// Suppress the per-child aggregated updates while children are being
	// updated. The deferred function runs before childMu is released (defers
	// run in LIFO order) and publishes one aggregated update.
	es.inhibitChildUpdates.Store(true)
	defer func() {
		es.inhibitChildUpdates.Store(false)
		es.updateState()
	}()
	var ret error

	children := es.children.Load()
	newChildren := resolver.NewEndpointMap[*balancerWrapper]()

	// Update/Create new children.
	for _, endpoint := range rotateEndpoints(state.ResolverState.Endpoints) {
		if _, ok := newChildren.Get(endpoint); ok {
			// Endpoint child was already created, continue to avoid duplicate
			// update.
			continue
		}
		childBalancer, ok := children.Get(endpoint)
		if ok {
			// Endpoint attributes may have changed, update the stored endpoint.
			es.mu.Lock()
			childBalancer.childState.Endpoint = endpoint
			es.mu.Unlock()
		} else {
			// The wrapper is handed to the child as its ClientConn so that the
			// child's UpdateState calls are intercepted by the wrapper.
			childBalancer = &balancerWrapper{
				childState: ChildState{Endpoint: endpoint},
				ClientConn: es.cc,
				es:         es,
			}
			childBalancer.childState.Balancer = childBalancer
			childBalancer.child = es.childBuilder(childBalancer, es.bOpts)
		}
		newChildren.Set(endpoint, childBalancer)
		// Each child sees a resolver state containing only its own endpoint.
		if err := childBalancer.updateClientConnStateLocked(balancer.ClientConnState{
			BalancerConfig: state.BalancerConfig,
			ResolverState: resolver.State{
				Endpoints:  []resolver.Endpoint{endpoint},
				Attributes: state.ResolverState.Attributes,
			},
		}); err != nil && ret == nil {
			// Return first error found, and always commit full processing of
			// updating children. If desired to process more specific errors
			// across all endpoints, caller should make these specific
			// validations, this is a current limitation for simplicity sake.
			ret = err
		}
	}
	// Delete old children that are no longer present.
	for _, e := range children.Keys() {
		child, _ := children.Get(e)
		if _, ok := newChildren.Get(e); !ok {
			child.closeLocked()
		}
	}
	// Publish the new map before the deferred updateState reads it.
	es.children.Store(newChildren)
	if newChildren.Len() == 0 {
		return balancer.ErrBadResolverState
	}
	return ret
}
 202  
 203  // ResolverError forwards the resolver error to all of the endpointSharding's
 204  // children and sends a single synchronous update of the childStates at the end
 205  // of the ResolverError operation.
 206  func (es *endpointSharding) ResolverError(err error) {
 207  	es.childMu.Lock()
 208  	defer es.childMu.Unlock()
 209  	es.inhibitChildUpdates.Store(true)
 210  	defer func() {
 211  		es.inhibitChildUpdates.Store(false)
 212  		es.updateState()
 213  	}()
 214  	children := es.children.Load()
 215  	for _, child := range children.Values() {
 216  		child.resolverErrorLocked(err)
 217  	}
 218  }
 219  
// UpdateSubConnState is a no-op: the method is deprecated, and nothing is
// forwarded to the children.
func (es *endpointSharding) UpdateSubConnState(balancer.SubConn, balancer.SubConnState) {
	// UpdateSubConnState is deprecated.
}
 223  
 224  func (es *endpointSharding) Close() {
 225  	es.childMu.Lock()
 226  	defer es.childMu.Unlock()
 227  	children := es.children.Load()
 228  	for _, child := range children.Values() {
 229  		child.closeLocked()
 230  	}
 231  }
 232  
 233  func (es *endpointSharding) ExitIdle() {
 234  	es.childMu.Lock()
 235  	defer es.childMu.Unlock()
 236  	for _, bw := range es.children.Load().Values() {
 237  		if !bw.isClosed {
 238  			bw.child.ExitIdle()
 239  		}
 240  	}
 241  }
 242  
 243  // updateState updates this component's state. It sends the aggregated state,
 244  // and a picker with round robin behavior with all the child states present if
 245  // needed.
 246  func (es *endpointSharding) updateState() {
 247  	if es.inhibitChildUpdates.Load() {
 248  		return
 249  	}
 250  	var readyPickers, connectingPickers, idlePickers, transientFailurePickers []balancer.Picker
 251  
 252  	es.mu.Lock()
 253  	defer es.mu.Unlock()
 254  
 255  	children := es.children.Load()
 256  	childStates := make([]ChildState, 0, children.Len())
 257  
 258  	for _, child := range children.Values() {
 259  		childState := child.childState
 260  		childStates = append(childStates, childState)
 261  		childPicker := childState.State.Picker
 262  		switch childState.State.ConnectivityState {
 263  		case connectivity.Ready:
 264  			readyPickers = append(readyPickers, childPicker)
 265  		case connectivity.Connecting:
 266  			connectingPickers = append(connectingPickers, childPicker)
 267  		case connectivity.Idle:
 268  			idlePickers = append(idlePickers, childPicker)
 269  		case connectivity.TransientFailure:
 270  			transientFailurePickers = append(transientFailurePickers, childPicker)
 271  			// connectivity.Shutdown shouldn't appear.
 272  		}
 273  	}
 274  
 275  	// Construct the round robin picker based off the aggregated state. Whatever
 276  	// the aggregated state, use the pickers present that are currently in that
 277  	// state only.
 278  	var aggState connectivity.State
 279  	var pickers []balancer.Picker
 280  	if len(readyPickers) >= 1 {
 281  		aggState = connectivity.Ready
 282  		pickers = readyPickers
 283  	} else if len(connectingPickers) >= 1 {
 284  		aggState = connectivity.Connecting
 285  		pickers = connectingPickers
 286  	} else if len(idlePickers) >= 1 {
 287  		aggState = connectivity.Idle
 288  		pickers = idlePickers
 289  	} else if len(transientFailurePickers) >= 1 {
 290  		aggState = connectivity.TransientFailure
 291  		pickers = transientFailurePickers
 292  	} else {
 293  		aggState = connectivity.TransientFailure
 294  		pickers = []balancer.Picker{base.NewErrPicker(errors.New("no children to pick from"))}
 295  	} // No children (resolver error before valid update).
 296  	p := &pickerWithChildStates{
 297  		pickers:     pickers,
 298  		childStates: childStates,
 299  		next:        uint32(randIntN(len(pickers))),
 300  	}
 301  	es.cc.UpdateState(balancer.State{
 302  		ConnectivityState: aggState,
 303  		Picker:            p,
 304  	})
 305  }
 306  
// pickerWithChildStates delegates to the pickers it holds in a round robin
// fashion. It also contains the childStates of all the endpointSharding's
// children.
type pickerWithChildStates struct {
	// pickers are the child pickers that Pick rotates through. updateState
	// always supplies at least one.
	pickers     []balancer.Picker
	// childStates is the snapshot of every child's state taken when this
	// picker was built; it is what ChildStatesFromPicker returns.
	childStates []ChildState
	// next is the round robin counter. It is advanced atomically by Pick.
	next        uint32
}
 315  
 316  func (p *pickerWithChildStates) Pick(info balancer.PickInfo) (balancer.PickResult, error) {
 317  	nextIndex := atomic.AddUint32(&p.next, 1)
 318  	picker := p.pickers[nextIndex%uint32(len(p.pickers))]
 319  	return picker.Pick(info)
 320  }
 321  
 322  // ChildStatesFromPicker returns the state of all the children managed by the
 323  // endpoint sharding balancer that created this picker.
 324  func ChildStatesFromPicker(picker balancer.Picker) []ChildState {
 325  	p, ok := picker.(*pickerWithChildStates)
 326  	if !ok {
 327  		return nil
 328  	}
 329  	return p.childStates
 330  }
 331  
// balancerWrapper is a wrapper of a balancer. It ID's a child balancer by
// endpoint, and persists recent child balancer state. It is also passed to the
// child as its ClientConn and is stored in ChildState.Balancer as the child's
// ExitIdler.
type balancerWrapper struct {
	// The following fields are initialized at build time and read-only after
	// that and therefore do not need to be guarded by a mutex.

	// child contains the wrapped balancer. Access its methods only through
	// methods on balancerWrapper to ensure proper synchronization
	child               balancer.Balancer
	balancer.ClientConn // embed to intercept UpdateState, doesn't deal with SubConns

	// es is the parent endpointSharding balancer that owns this wrapper.
	es *endpointSharding

	// childState is guarded by es.mu once the wrapper has been published in
	// the parent's children map.

	childState ChildState
	// isClosed is set by closeLocked and read by the ExitIdle paths, all of
	// which hold es.childMu (not es.mu).
	isClosed   bool
}
 350  
 351  func (bw *balancerWrapper) UpdateState(state balancer.State) {
 352  	bw.es.mu.Lock()
 353  	bw.childState.State = state
 354  	bw.es.mu.Unlock()
 355  	if state.ConnectivityState == connectivity.Idle && !bw.es.esOpts.DisableAutoReconnect {
 356  		bw.ExitIdle()
 357  	}
 358  	bw.es.updateState()
 359  }
 360  
 361  // ExitIdle pings an IDLE child balancer to exit idle in a new goroutine to
 362  // avoid deadlocks due to synchronous balancer state updates.
 363  func (bw *balancerWrapper) ExitIdle() {
 364  	go func() {
 365  		bw.es.childMu.Lock()
 366  		if !bw.isClosed {
 367  			bw.child.ExitIdle()
 368  		}
 369  		bw.es.childMu.Unlock()
 370  	}()
 371  }
 372  
// updateClientConnStateLocked delivers the ClientConnState to the child
// balancer and returns the child's error unchanged. Callers must hold the
// child mutex (childMu) of the parent endpointsharding balancer.
func (bw *balancerWrapper) updateClientConnStateLocked(ccs balancer.ClientConnState) error {
	return bw.child.UpdateClientConnState(ccs)
}
 379  
// closeLocked closes the child balancer and marks the wrapper as closed so
// that later ExitIdle calls skip it. Callers must hold the child mutex
// (childMu) of the parent endpointsharding balancer.
func (bw *balancerWrapper) closeLocked() {
	bw.child.Close()
	bw.isClosed = true
}
 386  
// resolverErrorLocked forwards the resolver error to the child balancer. As
// with the other *Locked helpers, callers are expected to hold the child mutex
// (childMu) of the parent endpointsharding balancer.
func (bw *balancerWrapper) resolverErrorLocked(err error) {
	bw.child.ResolverError(err)
}
 390