picker_wrapper.go raw

   1  /*
   2   *
   3   * Copyright 2017 gRPC authors.
   4   *
   5   * Licensed under the Apache License, Version 2.0 (the "License");
   6   * you may not use this file except in compliance with the License.
   7   * You may obtain a copy of the License at
   8   *
   9   *     http://www.apache.org/licenses/LICENSE-2.0
  10   *
  11   * Unless required by applicable law or agreed to in writing, software
  12   * distributed under the License is distributed on an "AS IS" BASIS,
  13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14   * See the License for the specific language governing permissions and
  15   * limitations under the License.
  16   *
  17   */
  18  
  19  package grpc
  20  
  21  import (
  22  	"context"
  23  	"fmt"
  24  	"io"
  25  	"sync/atomic"
  26  
  27  	"google.golang.org/grpc/balancer"
  28  	"google.golang.org/grpc/codes"
  29  	"google.golang.org/grpc/internal/channelz"
  30  	istatus "google.golang.org/grpc/internal/status"
  31  	"google.golang.org/grpc/internal/transport"
  32  	"google.golang.org/grpc/status"
  33  )
  34  
// pickerGeneration pairs one picker with the channel used to signal that the
// picker has been superseded. A pickerWrapper swaps in a fresh
// pickerGeneration whenever the picker changes, and closes the blockingCh of
// the generation it replaced.
type pickerGeneration struct {
	// picker is the picker produced by the LB policy.  May be nil if a picker
	// has never been produced (or after the wrapper has been reset).
	picker balancer.Picker
	// blockingCh is closed when the picker has been invalidated because there
	// is a new one available, or because the pickerWrapper was closed or
	// reset. Pick calls that cannot make progress wait on this channel.
	blockingCh chan struct{}
}
  45  
// pickerWrapper is a wrapper of balancer.Picker. It blocks on certain pick
// actions and unblocks them when there's a picker update.
//
// The current state is kept in a single atomically swapped pointer, so reads
// in pick and writes in updatePicker/reset/close need no additional lock.
type pickerWrapper struct {
	// If pickerGen holds a nil pointer, the pickerWrapper is closed.
	pickerGen atomic.Pointer[pickerGeneration]
}
  52  
  53  func newPickerWrapper() *pickerWrapper {
  54  	pw := &pickerWrapper{}
  55  	pw.pickerGen.Store(&pickerGeneration{
  56  		blockingCh: make(chan struct{}),
  57  	})
  58  	return pw
  59  }
  60  
  61  // updatePicker is called by UpdateState calls from the LB policy. It
  62  // unblocks all blocked pick.
  63  func (pw *pickerWrapper) updatePicker(p balancer.Picker) {
  64  	old := pw.pickerGen.Swap(&pickerGeneration{
  65  		picker:     p,
  66  		blockingCh: make(chan struct{}),
  67  	})
  68  	close(old.blockingCh)
  69  }
  70  
  71  // doneChannelzWrapper performs the following:
  72  //   - increments the calls started channelz counter
  73  //   - wraps the done function in the passed in result to increment the calls
  74  //     failed or calls succeeded channelz counter before invoking the actual
  75  //     done function.
  76  func doneChannelzWrapper(acbw *acBalancerWrapper, result *balancer.PickResult) {
  77  	ac := acbw.ac
  78  	ac.incrCallsStarted()
  79  	done := result.Done
  80  	result.Done = func(b balancer.DoneInfo) {
  81  		if b.Err != nil && b.Err != io.EOF {
  82  			ac.incrCallsFailed()
  83  		} else {
  84  			ac.incrCallsSucceeded()
  85  		}
  86  		if done != nil {
  87  			done(b)
  88  		}
  89  	}
  90  }
  91  
// pick is the value returned by a successful pickerWrapper.pick call.
type pick struct {
	transport transport.ClientTransport // the selected (ready) transport
	result    balancer.PickResult       // the contents of the pick from the LB policy
	blocked   bool                      // set if the pick call had to wait for a new picker
}
  97  
// pick returns the transport that will be used for the RPC, along with the LB
// policy's pick result and whether the call had to wait for a newer picker.
//
// It may block in the following cases:
//   - there's no picker
//   - the current picker returns ErrNoSubConnAvailable
//   - the current picker returns other (non-status) errors and failfast is false
//   - the subConn returned by the current picker is not READY
//
// When one of these situations happens, pick blocks until the picker gets
// updated, ctx is done, or the pickerWrapper is closed.
//
// Errors:
//   - ErrClientConnClosing if the pickerWrapper has been closed.
//   - A DeadlineExceeded or Canceled status if ctx finishes while waiting; the
//     message carries the latest picker error when one was recorded.
//   - A dropError wrapping the picker's status error (rewritten to Internal if
//     its code is a restricted control-plane code).
//   - An Unavailable status for any other picker error when failfast is true.
func (pw *pickerWrapper) pick(ctx context.Context, failfast bool, info balancer.PickInfo) (pick, error) {
	// ch is the blocking channel of the last generation this call has already
	// tried (or must wait on). It is nil until the first iteration sets it.
	var ch chan struct{}

	// lastPickErr remembers the most recent non-status picker error so it can
	// be reported if the context expires while waiting.
	var lastPickErr error
	pickBlocked := false

	for {
		pg := pw.pickerGen.Load()
		if pg == nil {
			// A nil generation means close() has been called.
			return pick{}, ErrClientConnClosing
		}
		if pg.picker == nil {
			// No picker yet: force the wait below by matching ch to this
			// generation's channel.
			ch = pg.blockingCh
		}
		if ch == pg.blockingCh {
			// This could happen when either:
			// - pg.picker is nil (the previous if condition), or
			// - we have already called pick on the current picker.
			select {
			case <-ctx.Done():
				var errStr string
				if lastPickErr != nil {
					errStr = "latest balancer error: " + lastPickErr.Error()
				} else {
					errStr = fmt.Sprintf("%v while waiting for connections to become ready", ctx.Err())
				}
				// NOTE(review): there is no default branch. If ctx.Err() were
				// neither of these two sentinel values, control would fall
				// through to the continue below and re-enter this select.
				switch ctx.Err() {
				case context.DeadlineExceeded:
					return pick{}, status.Error(codes.DeadlineExceeded, errStr)
				case context.Canceled:
					return pick{}, status.Error(codes.Canceled, errStr)
				}
			case <-ch:
				// The generation was replaced (or the wrapper was closed or
				// reset); reload it on the next iteration.
			}
			continue
		}

		// If the channel is set, it means that the pick call had to wait for a
		// new picker at some point. Either it's the first iteration and this
		// function received the first picker, or a picker errored with
		// ErrNoSubConnAvailable or errored with failfast set to false, which
		// will trigger a continue to the next iteration. In the first case this
		// conditional will hit if this call had to block (the channel is set).
		// In the second case, the only way it will get to this conditional is
		// if there is a new picker.
		if ch != nil {
			pickBlocked = true
		}

		// Record this generation as "tried": any continue below will wait on
		// ch unless a newer generation has been installed in the meantime.
		ch = pg.blockingCh
		p := pg.picker

		pickResult, err := p.Pick(info)
		if err != nil {
			if err == balancer.ErrNoSubConnAvailable {
				continue
			}
			if st, ok := status.FromError(err); ok {
				// Status error: end the RPC unconditionally with this status.
				// First restrict the code to the list allowed by gRFC A54.
				if istatus.IsRestrictedControlPlaneCode(st) {
					err = status.Errorf(codes.Internal, "received picker error with illegal status: %v", err)
				}
				return pick{}, dropError{error: err}
			}
			// For all other errors, wait for ready RPCs should block and other
			// RPCs should fail with unavailable.
			if !failfast {
				lastPickErr = err
				continue
			}
			return pick{}, status.Error(codes.Unavailable, err.Error())
		}

		acbw, ok := pickResult.SubConn.(*acBalancerWrapper)
		if !ok {
			// The picker handed back a SubConn of an unexpected type; log it
			// and wait for a newer picker.
			logger.Errorf("subconn returned from pick is type %T, not *acBalancerWrapper", pickResult.SubConn)
			continue
		}
		if t := acbw.ac.getReadyTransport(); t != nil {
			if channelz.IsOn() {
				doneChannelzWrapper(acbw, &pickResult)
			}
			return pick{transport: t, result: pickResult, blocked: pickBlocked}, nil
		}
		if pickResult.Done != nil {
			// Calling done with nil error, no bytes sent and no bytes received.
			// DoneInfo with default value works.
			pickResult.Done(balancer.DoneInfo{})
		}
		logger.Infof("blockingPicker: the picked transport is not ready, loop back to repick")
		// getReadyTransport returned nil, so ac.state is not READY.
		// A valid picker always returns READY subConn. This means the state of ac
		// just changed, and picker will be updated shortly.
		// continue back to the beginning of the for loop to repick.
	}
}
 202  
 203  func (pw *pickerWrapper) close() {
 204  	old := pw.pickerGen.Swap(nil)
 205  	close(old.blockingCh)
 206  }
 207  
 208  // reset clears the pickerWrapper and prepares it for being used again when idle
 209  // mode is exited.
 210  func (pw *pickerWrapper) reset() {
 211  	old := pw.pickerGen.Swap(&pickerGeneration{blockingCh: make(chan struct{})})
 212  	close(old.blockingCh)
 213  }
 214  
// dropError is a wrapper error that indicates the LB policy wishes to drop the
// RPC and not retry it. pick returns it when the picker fails with a status
// error.
type dropError struct {
	// error is the status error from the picker, or the Internal status that
	// replaced it when the picker used a restricted control-plane code.
	error
}
 220