1 /*
2 *
3 * Copyright 2022 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18 19 // Package gracefulswitch implements a graceful switch load balancer.
20 package gracefulswitch
21 22 import (
23 "errors"
24 "fmt"
25 "sync"
26 27 "google.golang.org/grpc/balancer"
28 "google.golang.org/grpc/balancer/base"
29 "google.golang.org/grpc/connectivity"
30 "google.golang.org/grpc/resolver"
31 )
32 33 var errBalancerClosed = errors.New("gracefulSwitchBalancer is closed")
34 var _ balancer.Balancer = (*Balancer)(nil)
35 36 // NewBalancer returns a graceful switch Balancer.
37 func NewBalancer(cc balancer.ClientConn, opts balancer.BuildOptions) *Balancer {
38 return &Balancer{
39 cc: cc,
40 bOpts: opts,
41 }
42 }
// Balancer is a utility to gracefully switch from one balancer to
// a new balancer. It implements the balancer.Balancer interface.
type Balancer struct {
	// bOpts is handed to builder.Build for every child balancer created.
	bOpts balancer.BuildOptions
	// cc is the parent ClientConn that child state updates, SubConn creation
	// and resolve requests are forwarded to.
	cc balancer.ClientConn

	// mu protects the following fields and all fields within balancerCurrent
	// and balancerPending. mu does not need to be held when calling into the
	// child balancers, as all calls into these children happen only as a direct
	// result of a call into the gracefulSwitchBalancer, which are also
	// guaranteed to be synchronous. There is one exception: an UpdateState call
	// from a child balancer when current and pending are populated can lead to
	// calling Close() on the current. To prevent that racing with an
	// UpdateSubConnState from the channel, we hold currentMu during Close and
	// UpdateSubConnState calls.
	mu sync.Mutex
	// balancerCurrent is the child whose state is forwarded to cc; nil until
	// the first switch and after Close.
	balancerCurrent *balancerWrapper
	// balancerPending is the child waiting to replace balancerCurrent; nil
	// when no switch is in progress.
	balancerPending *balancerWrapper
	closed          bool // set to true when this balancer is closed

	// currentMu must be locked before mu. This mutex guards against this
	// sequence of events: UpdateSubConnState() called, finds the
	// balancerCurrent, gives up lock, updateState comes in, causes Close() on
	// balancerCurrent before the UpdateSubConnState is called on the
	// balancerCurrent.
	currentMu sync.Mutex

	// activeGoroutines tracks all the goroutines that this balancer has started
	// and that should be waited on when the balancer closes.
	activeGoroutines sync.WaitGroup
}
75 76 // swap swaps out the current lb with the pending lb and updates the ClientConn.
77 // The caller must hold gsb.mu.
78 func (gsb *Balancer) swap() {
79 gsb.cc.UpdateState(gsb.balancerPending.lastState)
80 cur := gsb.balancerCurrent
81 gsb.balancerCurrent = gsb.balancerPending
82 gsb.balancerPending = nil
83 gsb.activeGoroutines.Add(1)
84 go func() {
85 defer gsb.activeGoroutines.Done()
86 gsb.currentMu.Lock()
87 defer gsb.currentMu.Unlock()
88 cur.Close()
89 }()
90 }
91 92 // Helper function that checks if the balancer passed in is current or pending.
93 // The caller must hold gsb.mu.
94 func (gsb *Balancer) balancerCurrentOrPending(bw *balancerWrapper) bool {
95 return bw == gsb.balancerCurrent || bw == gsb.balancerPending
96 }
97 98 // SwitchTo initializes the graceful switch process, which completes based on
99 // connectivity state changes on the current/pending balancer. Thus, the switch
100 // process is not complete when this method returns. This method must be called
101 // synchronously alongside the rest of the balancer.Balancer methods this
102 // Graceful Switch Balancer implements.
103 //
104 // Deprecated: use ParseConfig and pass a parsed config to UpdateClientConnState
105 // to cause the Balancer to automatically change to the new child when necessary.
106 func (gsb *Balancer) SwitchTo(builder balancer.Builder) error {
107 _, err := gsb.switchTo(builder)
108 return err
109 }
// switchTo creates a wrapper for a new child built by builder and installs it
// as the current balancer when there is none, or as the pending balancer
// otherwise. A previously pending balancer, if any, is closed before the new
// child is built.
//
// It returns the new wrapper on success, errBalancerClosed if gsb has already
// been closed, or balancer.ErrBadResolverState if builder.Build returns nil.
func (gsb *Balancer) switchTo(builder balancer.Builder) (*balancerWrapper, error) {
	gsb.mu.Lock()
	if gsb.closed {
		gsb.mu.Unlock()
		return nil, errBalancerClosed
	}
	// Until the child reports its own state, the wrapper records CONNECTING
	// with an error picker built from balancer.ErrNoSubConnAvailable.
	bw := &balancerWrapper{
		ClientConn: gsb.cc,
		builder:    builder,
		gsb:        gsb,
		lastState: balancer.State{
			ConnectivityState: connectivity.Connecting,
			Picker:            base.NewErrPicker(balancer.ErrNoSubConnAvailable),
		},
		subconns: make(map[balancer.SubConn]bool),
	}
	balToClose := gsb.balancerPending // nil if there is no pending balancer
	if gsb.balancerCurrent == nil {
		gsb.balancerCurrent = bw
	} else {
		gsb.balancerPending = bw
	}
	gsb.mu.Unlock()
	// Close tolerates a nil receiver, so no nil check is needed here. It is
	// called without gsb.mu because Close acquires gsb.mu itself.
	balToClose.Close()
	// This function takes a builder instead of a balancer because builder.Build
	// can call back inline, and this utility needs to handle the callbacks.
	newBalancer := builder.Build(bw, gsb.bOpts)
	if newBalancer == nil {
		// This is illegal and should never happen; we clear the balancerWrapper
		// we were constructing if it happens to avoid a potential panic.
		gsb.mu.Lock()
		if gsb.balancerPending != nil {
			gsb.balancerPending = nil
		} else {
			gsb.balancerCurrent = nil
		}
		gsb.mu.Unlock()
		return nil, balancer.ErrBadResolverState
	}

	// This write doesn't need to take gsb.mu because this field never gets read
	// or written to on any calls from the current or pending. Calls from grpc
	// to this balancer are guaranteed to be called synchronously, so this
	// bw.Balancer field will never be forwarded to until this SwitchTo()
	// function returns.
	bw.Balancer = newBalancer
	return bw, nil
}
159 160 // Returns nil if the graceful switch balancer is closed.
161 func (gsb *Balancer) latestBalancer() *balancerWrapper {
162 gsb.mu.Lock()
163 defer gsb.mu.Unlock()
164 if gsb.balancerPending != nil {
165 return gsb.balancerPending
166 }
167 return gsb.balancerCurrent
168 }
169 170 // UpdateClientConnState forwards the update to the latest balancer created.
171 //
172 // If the state's BalancerConfig is the config returned by a call to
173 // gracefulswitch.ParseConfig, then this function will automatically SwitchTo
174 // the balancer indicated by the config before forwarding its config to it, if
175 // necessary.
176 func (gsb *Balancer) UpdateClientConnState(state balancer.ClientConnState) error {
177 // The resolver data is only relevant to the most recent LB Policy.
178 balToUpdate := gsb.latestBalancer()
179 gsbCfg, ok := state.BalancerConfig.(*lbConfig)
180 if ok {
181 // Switch to the child in the config unless it is already active.
182 if balToUpdate == nil || gsbCfg.childBuilder.Name() != balToUpdate.builder.Name() {
183 var err error
184 balToUpdate, err = gsb.switchTo(gsbCfg.childBuilder)
185 if err != nil {
186 return fmt.Errorf("could not switch to new child balancer: %w", err)
187 }
188 }
189 // Unwrap the child balancer's config.
190 state.BalancerConfig = gsbCfg.childConfig
191 }
192 193 if balToUpdate == nil {
194 return errBalancerClosed
195 }
196 197 // Perform this call without gsb.mu to prevent deadlocks if the child calls
198 // back into the channel. The latest balancer can never be closed during a
199 // call from the channel, even without gsb.mu held.
200 return balToUpdate.UpdateClientConnState(state)
201 }
202 203 // ResolverError forwards the error to the latest balancer created.
204 func (gsb *Balancer) ResolverError(err error) {
205 // The resolver data is only relevant to the most recent LB Policy.
206 balToUpdate := gsb.latestBalancer()
207 if balToUpdate == nil {
208 gsb.cc.UpdateState(balancer.State{
209 ConnectivityState: connectivity.TransientFailure,
210 Picker: base.NewErrPicker(err),
211 })
212 return
213 }
214 // Perform this call without gsb.mu to prevent deadlocks if the child calls
215 // back into the channel. The latest balancer can never be closed during a
216 // call from the channel, even without gsb.mu held.
217 balToUpdate.ResolverError(err)
218 }
219 220 // ExitIdle forwards the call to the latest balancer created.
221 //
222 // If the latest balancer does not support ExitIdle, the subConns are
223 // re-connected to manually.
224 func (gsb *Balancer) ExitIdle() {
225 balToUpdate := gsb.latestBalancer()
226 if balToUpdate == nil {
227 return
228 }
229 // There is no need to protect this read with a mutex, as the write to the
230 // Balancer field happens in SwitchTo, which completes before this can be
231 // called.
232 balToUpdate.ExitIdle()
233 }
234 235 // updateSubConnState forwards the update to the appropriate child.
236 func (gsb *Balancer) updateSubConnState(sc balancer.SubConn, state balancer.SubConnState, cb func(balancer.SubConnState)) {
237 gsb.currentMu.Lock()
238 defer gsb.currentMu.Unlock()
239 gsb.mu.Lock()
240 // Forward update to the appropriate child. Even if there is a pending
241 // balancer, the current balancer should continue to get SubConn updates to
242 // maintain the proper state while the pending is still connecting.
243 var balToUpdate *balancerWrapper
244 if gsb.balancerCurrent != nil && gsb.balancerCurrent.subconns[sc] {
245 balToUpdate = gsb.balancerCurrent
246 } else if gsb.balancerPending != nil && gsb.balancerPending.subconns[sc] {
247 balToUpdate = gsb.balancerPending
248 }
249 if balToUpdate == nil {
250 // SubConn belonged to a stale lb policy that has not yet fully closed,
251 // or the balancer was already closed.
252 gsb.mu.Unlock()
253 return
254 }
255 if state.ConnectivityState == connectivity.Shutdown {
256 delete(balToUpdate.subconns, sc)
257 }
258 gsb.mu.Unlock()
259 if cb != nil {
260 cb(state)
261 } else {
262 balToUpdate.UpdateSubConnState(sc, state)
263 }
264 }
265 266 // UpdateSubConnState forwards the update to the appropriate child.
267 func (gsb *Balancer) UpdateSubConnState(sc balancer.SubConn, state balancer.SubConnState) {
268 gsb.updateSubConnState(sc, state, nil)
269 }
270 271 // Close closes any active child balancers.
272 func (gsb *Balancer) Close() {
273 gsb.mu.Lock()
274 gsb.closed = true
275 currentBalancerToClose := gsb.balancerCurrent
276 gsb.balancerCurrent = nil
277 pendingBalancerToClose := gsb.balancerPending
278 gsb.balancerPending = nil
279 gsb.mu.Unlock()
280 281 currentBalancerToClose.Close()
282 pendingBalancerToClose.Close()
283 gsb.activeGoroutines.Wait()
284 }
// balancerWrapper wraps a balancer.Balancer, and overrides some Balancer
// methods to help cleanup SubConns created by the wrapped balancer.
//
// It implements the balancer.ClientConn interface and is passed down in that
// capacity to the wrapped balancer. It maintains a set of subConns created by
// the wrapped balancer and calls from the latter to create/update/shutdown
// SubConns update this set before being forwarded to the parent ClientConn.
// State updates from the wrapped balancer can result in invocation of the
// graceful switch logic.
type balancerWrapper struct {
	// ClientConn is the parent ClientConn; methods not overridden by
	// balancerWrapper are served by it directly.
	balancer.ClientConn
	// Balancer is the wrapped child. It is assigned only after the builder's
	// Build call has returned, so it is nil while Build is running.
	balancer.Balancer
	// gsb is the graceful switch Balancer that owns this wrapper.
	gsb *Balancer
	// builder is the builder that created the wrapped child; its name is used
	// to decide whether a new config requires switching children.
	builder balancer.Builder

	// lastState is the most recent state reported by the wrapped balancer
	// through UpdateState. Guarded by gsb.mu.
	lastState balancer.State
	subconns  map[balancer.SubConn]bool // subconns created by this balancer
}
304 305 // Close closes the underlying LB policy and shuts down the subconns it
306 // created. bw must not be referenced via balancerCurrent or balancerPending in
307 // gsb when called. gsb.mu must not be held. Does not panic with a nil
308 // receiver.
309 func (bw *balancerWrapper) Close() {
310 // before Close is called.
311 if bw == nil {
312 return
313 }
314 // There is no need to protect this read with a mutex, as Close() is
315 // impossible to be called concurrently with the write in SwitchTo(). The
316 // callsites of Close() for this balancer in Graceful Switch Balancer will
317 // never be called until SwitchTo() returns.
318 bw.Balancer.Close()
319 bw.gsb.mu.Lock()
320 for sc := range bw.subconns {
321 sc.Shutdown()
322 }
323 bw.gsb.mu.Unlock()
324 }
// UpdateState records the state reported by the wrapped balancer and decides
// what to do with it:
//   - updates from a wrapper that is neither current nor pending are only
//     recorded and otherwise ignored;
//   - updates from the current balancer are forwarded to the parent
//     ClientConn, unless the state is not READY and a pending balancer
//     exists, in which case the pending balancer is swapped in instead;
//   - updates from the pending balancer trigger a swap once the pending has
//     left CONNECTING or the current balancer's last state is not READY.
func (bw *balancerWrapper) UpdateState(state balancer.State) {
	// Hold the mutex for this entire call to ensure it cannot occur
	// concurrently with other updateState() calls. This causes updates to
	// lastState and calls to cc.UpdateState to happen atomically.
	bw.gsb.mu.Lock()
	defer bw.gsb.mu.Unlock()
	bw.lastState = state

	// If Close() acquires the mutex before UpdateState(), the balancer
	// will already have been removed from the current or pending state when
	// reaching this point.
	if !bw.gsb.balancerCurrentOrPending(bw) {
		// Returning here ensures that (*Balancer).swap() is not invoked after
		// (*Balancer).Close() and therefore prevents "use after close".
		return
	}

	if bw == bw.gsb.balancerCurrent {
		// In the case that the current balancer exits READY, and there is a pending
		// balancer, you can forward the pending balancer's cached State up to
		// ClientConn and swap the pending into the current. This is because there
		// is no reason to gracefully switch from and keep using the old policy as
		// the ClientConn is not connected to any backends.
		if state.ConnectivityState != connectivity.Ready && bw.gsb.balancerPending != nil {
			bw.gsb.swap()
			return
		}
		// Even if there is a pending balancer waiting to be gracefully switched to,
		// continue to forward current balancer updates to the Client Conn. Ignoring
		// state + picker from the current would cause undefined behavior/cause the
		// system to behave incorrectly from the current LB policies perspective.
		// Also, the current LB is still being used by grpc to choose SubConns per
		// RPC, and thus should use the most updated form of the current balancer.
		bw.gsb.cc.UpdateState(state)
		return
	}
	// This method is now dealing with a state update from the pending balancer.
	// The pending is swapped into place as soon as it reports any state other
	// than CONNECTING. It is also swapped in immediately, whatever it reports,
	// if the current balancer is in a state other than READY, because there is
	// no reason to gracefully switch from and keep using the old policy when
	// the ClientConn is not connected to any backends.
	if state.ConnectivityState != connectivity.Connecting || bw.gsb.balancerCurrent.lastState.ConnectivityState != connectivity.Ready {
		bw.gsb.swap()
	}
}
371 372 func (bw *balancerWrapper) NewSubConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) {
373 bw.gsb.mu.Lock()
374 if !bw.gsb.balancerCurrentOrPending(bw) {
375 bw.gsb.mu.Unlock()
376 return nil, fmt.Errorf("%T at address %p that called NewSubConn is deleted", bw, bw)
377 }
378 bw.gsb.mu.Unlock()
379 380 var sc balancer.SubConn
381 oldListener := opts.StateListener
382 opts.StateListener = func(state balancer.SubConnState) { bw.gsb.updateSubConnState(sc, state, oldListener) }
383 sc, err := bw.gsb.cc.NewSubConn(addrs, opts)
384 if err != nil {
385 return nil, err
386 }
387 bw.gsb.mu.Lock()
388 if !bw.gsb.balancerCurrentOrPending(bw) { // balancer was closed during this call
389 sc.Shutdown()
390 bw.gsb.mu.Unlock()
391 return nil, fmt.Errorf("%T at address %p that called NewSubConn is deleted", bw, bw)
392 }
393 bw.subconns[sc] = true
394 bw.gsb.mu.Unlock()
395 return sc, nil
396 }
397 398 func (bw *balancerWrapper) ResolveNow(opts resolver.ResolveNowOptions) {
399 // Ignore ResolveNow requests from anything other than the most recent
400 // balancer, because older balancers were already removed from the config.
401 if bw != bw.gsb.latestBalancer() {
402 return
403 }
404 bw.gsb.cc.ResolveNow(opts)
405 }
// RemoveSubConn shuts down sc. It overrides the embedded ClientConn's
// RemoveSubConn so that the request is expressed as a Shutdown call on the
// SubConn itself.
func (bw *balancerWrapper) RemoveSubConn(sc balancer.SubConn) {
	// Note: existing third party balancers may call this, so it must remain
	// until RemoveSubConn is fully removed.
	sc.Shutdown()
}
412 413 func (bw *balancerWrapper) UpdateAddresses(sc balancer.SubConn, addrs []resolver.Address) {
414 bw.gsb.mu.Lock()
415 if !bw.gsb.balancerCurrentOrPending(bw) {
416 bw.gsb.mu.Unlock()
417 return
418 }
419 bw.gsb.mu.Unlock()
420 bw.gsb.cc.UpdateAddresses(sc, addrs)
421 }
422