1 /*
2 *
3 * Copyright 2024 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18 19 // Package endpointsharding implements a load balancing policy that manages
20 // homogeneous child policies each owning a single endpoint.
21 //
22 // # Experimental
23 //
24 // Notice: This package is EXPERIMENTAL and may be changed or removed in a
25 // later release.
26 package endpointsharding
27 28 import (
29 "errors"
30 rand "math/rand/v2"
31 "sync"
32 "sync/atomic"
33 34 "google.golang.org/grpc/balancer"
35 "google.golang.org/grpc/balancer/base"
36 "google.golang.org/grpc/connectivity"
37 "google.golang.org/grpc/resolver"
38 )
// randIntN is the random source used to pick the rotation offset in
// rotateEndpoints and the starting round-robin index of each
// pickerWithChildStates. It is a package-level variable rather than a direct
// call to rand.IntN so that it can be replaced (presumably by tests that need
// deterministic ordering).
var randIntN = rand.IntN
// ChildState is the balancer state of a child along with the endpoint which
// identifies the child balancer. A snapshot of the ChildState of every child is
// attached to each picker produced by endpointsharding and can be retrieved
// with ChildStatesFromPicker.
type ChildState struct {
	// Endpoint is the endpoint owned by this child. It is refreshed on every
	// UpdateClientConnState so that it carries the latest attributes.
	Endpoint resolver.Endpoint
	// State is the most recent state the child reported through UpdateState
	// (the zero balancer.State until the child first reports).
	State balancer.State

	// Balancer exposes only the ExitIdler interface of the child LB policy.
	// Other methods of the child policy are called only by endpointsharding.
	Balancer ExitIdler
}
// ExitIdler provides access to only the ExitIdle method of the child balancer.
// The implementation stored in ChildState.Balancer by endpointsharding invokes
// the child's ExitIdle on a new goroutine, and does nothing if the child has
// already been closed.
type ExitIdler interface {
	// ExitIdle instructs the LB policy to reconnect to backends / exit the
	// IDLE state, if appropriate and possible. Note that SubConns that enter
	// the IDLE state will not reconnect until SubConn.Connect is called.
	ExitIdle()
}
// Options are the options to configure the behaviour of the
// endpointsharding balancer.
type Options struct {
	// DisableAutoReconnect allows the balancer to keep child balancers in the
	// IDLE state until they are explicitly triggered to exit using the
	// ChildState obtained from the endpointsharding picker. When set to false
	// (the zero value), the endpointsharding balancer automatically calls
	// ExitIdle on child balancers that report IDLE.
	DisableAutoReconnect bool
}
// ChildBuilderFunc creates a new balancer with the ClientConn. It has the same
// type as the balancer.Builder.Build method. endpointsharding calls it once for
// every newly seen endpoint. The cc argument is a wrapper that intercepts the
// child's UpdateState calls, and opts are the BuildOptions passed to
// NewBalancer.
type ChildBuilderFunc func(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer
75 76 // NewBalancer returns a load balancing policy that manages homogeneous child
77 // policies each owning a single endpoint. The endpointsharding balancer
78 // forwards the LoadBalancingConfig in ClientConn state updates to its children.
79 func NewBalancer(cc balancer.ClientConn, opts balancer.BuildOptions, childBuilder ChildBuilderFunc, esOpts Options) balancer.Balancer {
80 es := &endpointSharding{
81 cc: cc,
82 bOpts: opts,
83 esOpts: esOpts,
84 childBuilder: childBuilder,
85 }
86 es.children.Store(resolver.NewEndpointMap[*balancerWrapper]())
87 return es
88 }
// endpointSharding is a balancer that wraps child balancers. It creates a child
// balancer with child config for every unique Endpoint received. It updates the
// child states on any update from parent or child.
type endpointSharding struct {
	cc           balancer.ClientConn
	bOpts        balancer.BuildOptions
	esOpts       Options
	childBuilder ChildBuilderFunc

	// childMu synchronizes calls into children. It is a single mutex shared
	// by all children, and it must be held for every call into any child. It
	// also guards each balancerWrapper's isClosed flag. To avoid deadlocks,
	// do not acquire childMu while holding mu.
	childMu sync.Mutex
	// children maps each endpoint to its child. Apart from the initial empty
	// map stored by NewBalancer, it is replaced only by UpdateClientConnState
	// (with childMu held). It is loaded atomically so that updateState can
	// read it without taking childMu.
	children atomic.Pointer[resolver.EndpointMap[*balancerWrapper]]

	// inhibitChildUpdates is set for the duration of UpdateClientConnState
	// and ResolverError. Each call into a child may produce an UpdateState,
	// but only one aggregated update should reach the parent, sent when the
	// flag is cleared.
	inhibitChildUpdates atomic.Bool

	// mu synchronizes access to the state stored in balancerWrappers in the
	// children field. mu must not be held during calls into a child since
	// synchronous calls back from the child may require taking mu, causing a
	// deadlock. To avoid deadlocks, do not acquire childMu while holding mu.
	mu sync.Mutex
}
116 117 // rotateEndpoints returns a slice of all the input endpoints rotated a random
118 // amount.
119 func rotateEndpoints(es []resolver.Endpoint) []resolver.Endpoint {
120 les := len(es)
121 if les == 0 {
122 return es
123 }
124 r := randIntN(les)
125 // Make a copy to avoid mutating data beyond the end of es.
126 ret := make([]resolver.Endpoint, les)
127 copy(ret, es[r:])
128 copy(ret[les-r:], es[:r])
129 return ret
130 }
// UpdateClientConnState creates a child for new endpoints and deletes children
// for endpoints that are no longer present. It also updates all the children,
// and sends a single synchronous update of the children's aggregated state at
// the end of the UpdateClientConnState operation.
//
// Endpoints are visited in a randomly rotated order (see rotateEndpoints). An
// endpoint that maps to a child already handled earlier in this same update is
// skipped. Each child receives a ClientConnState containing only its own
// endpoint, plus the shared BalancerConfig and resolver Attributes.
//
// NOTE(review): endpoints with no addresses are not filtered out here; they
// are handed to a child like any other endpoint. Confirm whether skipping them
// was intended.
//
// It returns balancer.ErrBadResolverState if the update leaves no children.
// Otherwise it returns the first error reported by a child, but it always
// finishes processing the whole update first.
func (es *endpointSharding) UpdateClientConnState(state balancer.ClientConnState) error {
	es.childMu.Lock()
	defer es.childMu.Unlock()

	// Suppress the per-child updates triggered below; the deferred call sends
	// one aggregated update after every child has been processed.
	es.inhibitChildUpdates.Store(true)
	defer func() {
		es.inhibitChildUpdates.Store(false)
		es.updateState()
	}()
	var ret error

	children := es.children.Load()
	newChildren := resolver.NewEndpointMap[*balancerWrapper]()

	// Update/Create new children.
	for _, endpoint := range rotateEndpoints(state.ResolverState.Endpoints) {
		if _, ok := newChildren.Get(endpoint); ok {
			// Endpoint child was already created, continue to avoid duplicate
			// update.
			continue
		}
		childBalancer, ok := children.Get(endpoint)
		if ok {
			// Endpoint attributes may have changed, update the stored endpoint.
			// childState is guarded by es.mu because updateState reads it.
			es.mu.Lock()
			childBalancer.childState.Endpoint = endpoint
			es.mu.Unlock()
		} else {
			childBalancer = &balancerWrapper{
				childState: ChildState{Endpoint: endpoint},
				ClientConn: es.cc,
				es:         es,
			}
			childBalancer.childState.Balancer = childBalancer
			childBalancer.child = es.childBuilder(childBalancer, es.bOpts)
		}
		newChildren.Set(endpoint, childBalancer)
		if err := childBalancer.updateClientConnStateLocked(balancer.ClientConnState{
			BalancerConfig: state.BalancerConfig,
			ResolverState: resolver.State{
				Endpoints:  []resolver.Endpoint{endpoint},
				Attributes: state.ResolverState.Attributes,
			},
		}); err != nil && ret == nil {
			// Return first error found, and always commit full processing of
			// updating children. If desired to process more specific errors
			// across all endpoints, caller should make these specific
			// validations, this is a current limitation for simplicity sake.
			ret = err
		}
	}
	// Delete old children that are no longer present.
	for _, e := range children.Keys() {
		child, _ := children.Get(e)
		if _, ok := newChildren.Get(e); !ok {
			child.closeLocked()
		}
	}
	es.children.Store(newChildren)
	if newChildren.Len() == 0 {
		// The deferred updateState still runs. With no children, it reports
		// TransientFailure with a picker that fails every pick.
		return balancer.ErrBadResolverState
	}
	return ret
}
202 203 // ResolverError forwards the resolver error to all of the endpointSharding's
204 // children and sends a single synchronous update of the childStates at the end
205 // of the ResolverError operation.
206 func (es *endpointSharding) ResolverError(err error) {
207 es.childMu.Lock()
208 defer es.childMu.Unlock()
209 es.inhibitChildUpdates.Store(true)
210 defer func() {
211 es.inhibitChildUpdates.Store(false)
212 es.updateState()
213 }()
214 children := es.children.Load()
215 for _, child := range children.Values() {
216 child.resolverErrorLocked(err)
217 }
218 }
219 220 func (es *endpointSharding) UpdateSubConnState(balancer.SubConn, balancer.SubConnState) {
221 // UpdateSubConnState is deprecated.
222 }
223 224 func (es *endpointSharding) Close() {
225 es.childMu.Lock()
226 defer es.childMu.Unlock()
227 children := es.children.Load()
228 for _, child := range children.Values() {
229 child.closeLocked()
230 }
231 }
232 233 func (es *endpointSharding) ExitIdle() {
234 es.childMu.Lock()
235 defer es.childMu.Unlock()
236 for _, bw := range es.children.Load().Values() {
237 if !bw.isClosed {
238 bw.child.ExitIdle()
239 }
240 }
241 }
// updateState updates this component's state. It aggregates the states of all
// children and sends the parent ClientConn that aggregated state, together
// with a picker that round-robins over the children currently in that state.
//
// The aggregated state is the first of Ready, Connecting, Idle and
// TransientFailure reported by at least one child. If no child is in any of
// those states (normally: there are no children, e.g. a resolver error arrived
// before any valid update), it is TransientFailure with a picker that fails
// every pick.
//
// It is a no-op while inhibitChildUpdates is set, so that UpdateClientConnState
// and ResolverError produce a single update at the end. es.mu is held for the
// entire call, including es.cc.UpdateState, which serializes concurrent
// updates to the parent.
func (es *endpointSharding) updateState() {
	if es.inhibitChildUpdates.Load() {
		return
	}
	var readyPickers, connectingPickers, idlePickers, transientFailurePickers []balancer.Picker

	es.mu.Lock()
	defer es.mu.Unlock()

	children := es.children.Load()
	childStates := make([]ChildState, 0, children.Len())

	for _, child := range children.Values() {
		childState := child.childState
		childStates = append(childStates, childState)
		childPicker := childState.State.Picker
		if childPicker == nil {
			// A child that has not called UpdateState yet still holds the
			// zero balancer.State, whose Picker is nil. This happens, for
			// example, when a child does not report state synchronously from
			// its builder or UpdateClientConnState. Round-robining onto a nil
			// Picker would panic in Pick, so queue those picks until the child
			// publishes a real picker.
			childPicker = base.NewErrPicker(balancer.ErrNoSubConnAvailable)
		}
		switch childState.State.ConnectivityState {
		case connectivity.Ready:
			readyPickers = append(readyPickers, childPicker)
		case connectivity.Connecting:
			connectingPickers = append(connectingPickers, childPicker)
		case connectivity.Idle:
			idlePickers = append(idlePickers, childPicker)
		case connectivity.TransientFailure:
			transientFailurePickers = append(transientFailurePickers, childPicker)
		}
		// connectivity.Shutdown shouldn't appear, so it is not bucketed.
	}

	// Construct the round robin picker based off the aggregated state. Whatever
	// the aggregated state, use the pickers present that are currently in that
	// state only.
	var aggState connectivity.State
	var pickers []balancer.Picker
	switch {
	case len(readyPickers) > 0:
		aggState, pickers = connectivity.Ready, readyPickers
	case len(connectingPickers) > 0:
		aggState, pickers = connectivity.Connecting, connectingPickers
	case len(idlePickers) > 0:
		aggState, pickers = connectivity.Idle, idlePickers
	case len(transientFailurePickers) > 0:
		aggState, pickers = connectivity.TransientFailure, transientFailurePickers
	default:
		// No children (resolver error before valid update).
		aggState = connectivity.TransientFailure
		pickers = []balancer.Picker{base.NewErrPicker(errors.New("no children to pick from"))}
	}
	p := &pickerWithChildStates{
		pickers:     pickers,
		childStates: childStates,
		next:        uint32(randIntN(len(pickers))),
	}
	es.cc.UpdateState(balancer.State{
		ConnectivityState: aggState,
		Picker:            p,
	})
}
// pickerWithChildStates delegates to the pickers it holds in a round robin
// fashion. It also contains the childStates of all the endpointSharding's
// children, which callers can retrieve with ChildStatesFromPicker.
type pickerWithChildStates struct {
	// pickers are the pickers of the children in the aggregated state. They
	// are never empty when built by updateState.
	pickers []balancer.Picker
	// childStates is a snapshot of every child's state, taken under es.mu when
	// the picker was built.
	childStates []ChildState
	// next is the round-robin cursor. It is seeded with a random index and
	// advanced atomically by Pick.
	next uint32
}
// Pick forwards the pick to the next child picker in round-robin order. The
// cursor is advanced with an atomic add, so concurrent Picks each get their own
// slot.
func (p *pickerWithChildStates) Pick(info balancer.PickInfo) (balancer.PickResult, error) {
	count := uint32(len(p.pickers))
	slot := atomic.AddUint32(&p.next, 1) % count
	return p.pickers[slot].Pick(info)
}
321 322 // ChildStatesFromPicker returns the state of all the children managed by the
323 // endpoint sharding balancer that created this picker.
324 func ChildStatesFromPicker(picker balancer.Picker) []ChildState {
325 p, ok := picker.(*pickerWithChildStates)
326 if !ok {
327 return nil
328 }
329 return p.childStates
330 }
331 332 // balancerWrapper is a wrapper of a balancer. It ID's a child balancer by
333 // endpoint, and persists recent child balancer state.
334 type balancerWrapper struct {
335 // The following fields are initialized at build time and read-only after
336 // that and therefore do not need to be guarded by a mutex.
337 338 // child contains the wrapped balancer. Access its methods only through
339 // methods on balancerWrapper to ensure proper synchronization
340 child balancer.Balancer
341 balancer.ClientConn // embed to intercept UpdateState, doesn't deal with SubConns
342 343 es *endpointSharding
344 345 // Access to the following fields is guarded by es.mu.
346 347 childState ChildState
348 isClosed bool
349 }
350 351 func (bw *balancerWrapper) UpdateState(state balancer.State) {
352 bw.es.mu.Lock()
353 bw.childState.State = state
354 bw.es.mu.Unlock()
355 if state.ConnectivityState == connectivity.Idle && !bw.es.esOpts.DisableAutoReconnect {
356 bw.ExitIdle()
357 }
358 bw.es.updateState()
359 }
360 361 // ExitIdle pings an IDLE child balancer to exit idle in a new goroutine to
362 // avoid deadlocks due to synchronous balancer state updates.
363 func (bw *balancerWrapper) ExitIdle() {
364 go func() {
365 bw.es.childMu.Lock()
366 if !bw.isClosed {
367 bw.child.ExitIdle()
368 }
369 bw.es.childMu.Unlock()
370 }()
371 }
// updateClientConnStateLocked delivers the ClientConnState to the child
// balancer and returns the child's error, if any. Callers must hold the child
// mutex (childMu) of the parent endpointsharding balancer.
func (bw *balancerWrapper) updateClientConnStateLocked(ccs balancer.ClientConnState) error {
	return bw.child.UpdateClientConnState(ccs)
}
// closeLocked closes the child balancer and then marks the wrapper as closed,
// so that later ExitIdle requests skip it. Callers must hold the child mutex
// (childMu) of the parent endpointsharding balancer, which also guards
// isClosed.
func (bw *balancerWrapper) closeLocked() {
	bw.child.Close()
	bw.isClosed = true
}
// resolverErrorLocked forwards a resolver error to the child balancer. As with
// the other *Locked methods, callers must hold the child mutex (childMu) of the
// parent endpointsharding balancer; ResolverError does.
func (bw *balancerWrapper) resolverErrorLocked(err error) {
	bw.child.ResolverError(err)
}
390