pickfirst.go raw

   1  /*
   2   *
   3   * Copyright 2017 gRPC authors.
   4   *
   5   * Licensed under the Apache License, Version 2.0 (the "License");
   6   * you may not use this file except in compliance with the License.
   7   * You may obtain a copy of the License at
   8   *
   9   *     http://www.apache.org/licenses/LICENSE-2.0
  10   *
  11   * Unless required by applicable law or agreed to in writing, software
  12   * distributed under the License is distributed on an "AS IS" BASIS,
  13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14   * See the License for the specific language governing permissions and
  15   * limitations under the License.
  16   *
  17   */
  18  
  19  // Package pickfirst contains the pick_first load balancing policy which
  20  // is the universal leaf policy.
  21  package pickfirst
  22  
  23  import (
  24  	"encoding/json"
  25  	"errors"
  26  	"fmt"
  27  	"net"
  28  	"net/netip"
  29  	"sync"
  30  	"time"
  31  
  32  	"google.golang.org/grpc/balancer"
  33  	"google.golang.org/grpc/balancer/pickfirst/internal"
  34  	"google.golang.org/grpc/connectivity"
  35  	expstats "google.golang.org/grpc/experimental/stats"
  36  	"google.golang.org/grpc/grpclog"
  37  	internalgrpclog "google.golang.org/grpc/internal/grpclog"
  38  	"google.golang.org/grpc/internal/pretty"
  39  	"google.golang.org/grpc/resolver"
  40  	"google.golang.org/grpc/serviceconfig"
  41  )
  42  
  43  func init() {
  44  	balancer.Register(pickfirstBuilder{})
  45  }
  46  
// Name is the name of the pick_first balancer. It is the value returned by
// the builder's Name method.
const Name = "pick_first"
  49  
// enableHealthListenerKeyType is a unique key type used in resolver
// attributes to indicate whether the health listener usage is enabled.
// EnableHealthListener stores a value under this key and
// UpdateClientConnState checks whether a value is present.
type enableHealthListenerKeyType struct{}
  53  
var (
	// logger is the component logger for this policy. Each balancer wraps it
	// with its own prefix in Build.
	logger               = grpclog.Component("pick-first-leaf-lb")
	// disconnectionsMetric is recorded in updateSubConnState when the
	// selected SubConn is lost and the balancer goes back to IDLE.
	disconnectionsMetric = expstats.RegisterInt64Count(expstats.MetricDescriptor{
		Name:        "grpc.lb.pick_first.disconnections",
		Description: "EXPERIMENTAL. Number of times the selected subchannel becomes disconnected.",
		Unit:        "{disconnection}",
		Labels:      []string{"grpc.target"},
		Default:     false,
	})
	// connectionAttemptsSucceededMetric is recorded in updateSubConnState
	// when a SubConn reports READY, and for the CONNECTING -> IDLE workaround.
	connectionAttemptsSucceededMetric = expstats.RegisterInt64Count(expstats.MetricDescriptor{
		Name:        "grpc.lb.pick_first.connection_attempts_succeeded",
		Description: "EXPERIMENTAL. Number of successful connection attempts.",
		Unit:        "{attempt}",
		Labels:      []string{"grpc.target"},
		Default:     false,
	})
	// connectionAttemptsFailedMetric is recorded in updateSubConnState when an
	// active SubConn reports TRANSIENT_FAILURE.
	connectionAttemptsFailedMetric = expstats.RegisterInt64Count(expstats.MetricDescriptor{
		Name:        "grpc.lb.pick_first.connection_attempts_failed",
		Description: "EXPERIMENTAL. Number of failed connection attempts.",
		Unit:        "{attempt}",
		Labels:      []string{"grpc.target"},
		Default:     false,
	})
)
  78  
const (
	// logPrefix is the format string for the per-balancer log prefix. Build
	// fills the %p verb with the balancer's pointer.
	// TODO: change to pick-first when this becomes the default pick_first policy.
	logPrefix = "[pick-first-leaf-lb %p] "
	// connectionDelayInterval is the time to wait for during the happy eyeballs
	// pass before starting the next connection attempt.
	connectionDelayInterval = 250 * time.Millisecond
)
  86  
// ipAddrFamily classifies a resolver address by IP family. It is produced by
// addressFamily and used as the grouping key in interleaveAddresses.
type ipAddrFamily int

const (
	// ipAddrFamilyUnknown represents strings that can't be parsed as an IP
	// address.
	ipAddrFamilyUnknown ipAddrFamily = iota
	// ipAddrFamilyV4 represents IPv4 addresses. addressFamily also reports
	// IPv4-mapped IPv6 addresses as this family.
	ipAddrFamilyV4
	// ipAddrFamilyV6 represents the remaining IPv6 addresses.
	ipAddrFamilyV6
)
  96  
// pickfirstBuilder is the stateless builder for the pick_first balancer. An
// instance is registered with the balancer package in init.
type pickfirstBuilder struct{}
  98  
  99  func (pickfirstBuilder) Build(cc balancer.ClientConn, bo balancer.BuildOptions) balancer.Balancer {
 100  	b := &pickfirstBalancer{
 101  		cc:              cc,
 102  		target:          bo.Target.String(),
 103  		metricsRecorder: cc.MetricsRecorder(),
 104  
 105  		subConns:              resolver.NewAddressMapV2[*scData](),
 106  		state:                 connectivity.Connecting,
 107  		cancelConnectionTimer: func() {},
 108  	}
 109  	b.logger = internalgrpclog.NewPrefixLogger(logger, fmt.Sprintf(logPrefix, b))
 110  	return b
 111  }
 112  
 113  func (b pickfirstBuilder) Name() string {
 114  	return Name
 115  }
 116  
 117  func (pickfirstBuilder) ParseConfig(js json.RawMessage) (serviceconfig.LoadBalancingConfig, error) {
 118  	var cfg pfConfig
 119  	if err := json.Unmarshal(js, &cfg); err != nil {
 120  		return nil, fmt.Errorf("pickfirst: unable to unmarshal LB policy config: %s, error: %v", string(js), err)
 121  	}
 122  	return cfg, nil
 123  }
 124  
 125  // EnableHealthListener updates the state to configure pickfirst for using a
 126  // generic health listener.
 127  //
 128  // # Experimental
 129  //
 130  // Notice: This API is EXPERIMENTAL and may be changed or removed in a later
 131  // release.
 132  func EnableHealthListener(state resolver.State) resolver.State {
 133  	state.Attributes = state.Attributes.WithValue(enableHealthListenerKeyType{}, true)
 134  	return state
 135  }
 136  
// pfConfig is the parsed load balancing config for pick_first. It is produced
// by ParseConfig and consumed by UpdateClientConnState.
type pfConfig struct {
	serviceconfig.LoadBalancingConfig `json:"-"`

	// If set to true, instructs the LB policy to shuffle the order of the list
	// of endpoints received from the name resolver before attempting to
	// connect to them.
	ShuffleAddressList bool `json:"shuffleAddressList"`
}
 145  
// scData keeps track of the current state of the subConn.
// It is not safe for concurrent access.
type scData struct {
	// The following fields are initialized at build time and read-only after
	// that.
	subConn balancer.SubConn
	addr    resolver.Address

	// The most recent connectivity state delivered to the SubConn's state
	// listener (see updateSubConnState).
	rawConnectivityState connectivity.State
	// The effective connectivity state based on raw connectivity, health state
	// and after following sticky TransientFailure behaviour defined in A62.
	effectiveState              connectivity.State
	// The ConnectionError from the most recent TRANSIENT_FAILURE update.
	lastErr                     error
	// Set when the SubConn reports TRANSIENT_FAILURE or is found in
	// TRANSIENT_FAILURE while iterating the address list; cleared by
	// startFirstPassLocked.
	connectionFailedInFirstPass bool
}
 161  
 162  func (b *pickfirstBalancer) newSCData(addr resolver.Address) (*scData, error) {
 163  	sd := &scData{
 164  		rawConnectivityState: connectivity.Idle,
 165  		effectiveState:       connectivity.Idle,
 166  		addr:                 addr,
 167  	}
 168  	sc, err := b.cc.NewSubConn([]resolver.Address{addr}, balancer.NewSubConnOptions{
 169  		StateListener: func(state balancer.SubConnState) {
 170  			b.updateSubConnState(sd, state)
 171  		},
 172  	})
 173  	if err != nil {
 174  		return nil, err
 175  	}
 176  	sd.subConn = sc
 177  	return sd, nil
 178  }
 179  
// pickfirstBalancer is the pick_first LB policy implementation created by
// pickfirstBuilder.Build.
type pickfirstBalancer struct {
	// The following fields are initialized at build time and read-only after
	// that and therefore do not need to be guarded by a mutex.
	logger          *internalgrpclog.PrefixLogger
	cc              balancer.ClientConn
	target          string
	metricsRecorder expstats.MetricsRecorder // guaranteed to be non nil

	// The mutex is used to ensure synchronization of updates triggered
	// from the idle picker and the already serialized resolver,
	// SubConn state updates.
	mu sync.Mutex
	// State reported to the channel based on SubConn states and resolver
	// updates.
	state connectivity.State
	// scData for active SubConns mapped by address.
	subConns              *resolver.AddressMapV2[*scData]
	// The de-duplicated, interleaved address list from the latest resolver
	// update, together with the position of the current address.
	addressList           addressList
	// True while the first pass over the address list is in progress; set by
	// startFirstPassLocked and cleared by endFirstPassIfPossibleLocked.
	firstPass             bool
	// Count of TRANSIENT_FAILURE updates seen after the first pass, kept
	// modulo the number of SubConns (see updateSubConnState).
	numTF                 int
	// Cancels the pending happy eyeballs timer, if any. It is a no-op
	// function when the balancer is built.
	cancelConnectionTimer func()
	// Whether the latest resolver state carried the attribute set by
	// EnableHealthListener.
	healthCheckingEnabled bool
}
 203  
// ResolverError is called by the ClientConn when the name resolver produces
// an error or when pickfirst determined the resolver update to be invalid.
// It acquires the balancer's mutex and delegates to resolverErrorLocked.
func (b *pickfirstBalancer) ResolverError(err error) {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.resolverErrorLocked(err)
}
 211  
 212  func (b *pickfirstBalancer) resolverErrorLocked(err error) {
 213  	if b.logger.V(2) {
 214  		b.logger.Infof("Received error from the name resolver: %v", err)
 215  	}
 216  
 217  	// The picker will not change since the balancer does not currently
 218  	// report an error. If the balancer hasn't received a single good resolver
 219  	// update yet, transition to TRANSIENT_FAILURE.
 220  	if b.state != connectivity.TransientFailure && b.addressList.size() > 0 {
 221  		if b.logger.V(2) {
 222  			b.logger.Infof("Ignoring resolver error because balancer is using a previous good update.")
 223  		}
 224  		return
 225  	}
 226  
 227  	b.updateBalancerState(balancer.State{
 228  		ConnectivityState: connectivity.TransientFailure,
 229  		Picker:            &picker{err: fmt.Errorf("name resolver error: %v", err)},
 230  	})
 231  }
 232  
// UpdateClientConnState handles a new resolver state and LB config.
//
// It returns balancer.ErrBadResolverState when the update has neither
// addresses nor endpoints, and an error wrapping it when BalancerConfig is set
// but is not a pfConfig. Otherwise it builds the new address list (optionally
// shuffled, then de-duplicated and interleaved). If the current SubConn is
// READY and its address is still in the list, that connection is kept.
// Otherwise stale SubConns are removed and, depending on the balancer's
// state, a new first pass is started.
func (b *pickfirstBalancer) UpdateClientConnState(state balancer.ClientConnState) error {
	b.mu.Lock()
	defer b.mu.Unlock()
	// Any pending happy eyeballs timer belongs to the previous address list.
	b.cancelConnectionTimer()
	if len(state.ResolverState.Addresses) == 0 && len(state.ResolverState.Endpoints) == 0 {
		// Cleanup state pertaining to the previous resolver state.
		// Treat an empty address list like an error by calling
		// b.resolverErrorLocked.
		b.closeSubConnsLocked()
		b.addressList.updateAddrs(nil)
		b.resolverErrorLocked(errors.New("produced zero addresses"))
		return balancer.ErrBadResolverState
	}
	// The health listener is enabled when the attribute set by
	// EnableHealthListener is present.
	b.healthCheckingEnabled = state.ResolverState.Attributes.Value(enableHealthListenerKeyType{}) != nil
	// A nil BalancerConfig is accepted and leaves cfg at its zero value.
	cfg, ok := state.BalancerConfig.(pfConfig)
	if state.BalancerConfig != nil && !ok {
		return fmt.Errorf("pickfirst: received illegal BalancerConfig (type %T): %v: %w", state.BalancerConfig, state.BalancerConfig, balancer.ErrBadResolverState)
	}

	if b.logger.V(2) {
		b.logger.Infof("Received new config %s, resolver state %s", pretty.ToJSON(cfg), pretty.ToJSON(state.ResolverState))
	}

	var newAddrs []resolver.Address
	if endpoints := state.ResolverState.Endpoints; len(endpoints) != 0 {
		// Perform the optional shuffling described in gRFC A62. The shuffling
		// will change the order of endpoints but not touch the order of the
		// addresses within each endpoint. - A61
		if cfg.ShuffleAddressList {
			// Shuffle a copy so the slice owned by the resolver state is not
			// modified.
			endpoints = append([]resolver.Endpoint{}, endpoints...)
			internal.RandShuffle(len(endpoints), func(i, j int) { endpoints[i], endpoints[j] = endpoints[j], endpoints[i] })
		}

		// "Flatten the list by concatenating the ordered list of addresses for
		// each of the endpoints, in order." - A61
		for _, endpoint := range endpoints {
			newAddrs = append(newAddrs, endpoint.Addresses...)
		}
	} else {
		// Endpoints not set, process addresses until we migrate resolver
		// emissions fully to Endpoints. The top channel does wrap emitted
		// addresses with endpoints, however some balancers such as weighted
		// target do not forward the corresponding correct endpoints down/split
		// endpoints properly. Once all balancers correctly forward endpoints
		// down, can delete this else conditional.
		newAddrs = state.ResolverState.Addresses
		if cfg.ShuffleAddressList {
			// Shuffle a copy so the slice owned by the resolver state is not
			// modified.
			newAddrs = append([]resolver.Address{}, newAddrs...)
			internal.RandShuffle(len(newAddrs), func(i, j int) { newAddrs[i], newAddrs[j] = newAddrs[j], newAddrs[i] })
		}
	}

	// If an address appears in multiple endpoints or in the same endpoint
	// multiple times, we keep it only once. We will create only one SubConn
	// for the address because an AddressMap is used to store SubConns.
	// Not de-duplicating would result in attempting to connect to the same
	// SubConn multiple times in the same pass. We don't want this.
	newAddrs = deDupAddresses(newAddrs)
	newAddrs = interleaveAddresses(newAddrs)

	// Capture what is needed from the previous address list before it is
	// replaced.
	prevAddr := b.addressList.currentAddress()
	prevSCData, found := b.subConns.Get(prevAddr)
	prevAddrsCount := b.addressList.size()
	isPrevRawConnectivityStateReady := found && prevSCData.rawConnectivityState == connectivity.Ready
	b.addressList.updateAddrs(newAddrs)

	// If the previous ready SubConn exists in new address list,
	// keep this connection and don't create new SubConns.
	if isPrevRawConnectivityStateReady && b.addressList.seekTo(prevAddr) {
		return nil
	}

	b.reconcileSubConnsLocked(newAddrs)
	// If it's the first resolver update or the balancer was already READY
	// (but the new address list does not contain the ready SubConn) or
	// CONNECTING, enter CONNECTING.
	// We may be in TRANSIENT_FAILURE due to a previous empty address list,
	// we should still enter CONNECTING because the sticky TF behaviour
	// mentioned in A62 applies only when the TRANSIENT_FAILURE is reported
	// due to connectivity failures.
	if isPrevRawConnectivityStateReady || b.state == connectivity.Connecting || prevAddrsCount == 0 {
		// Start connection attempt at first address.
		b.forceUpdateConcludedStateLocked(balancer.State{
			ConnectivityState: connectivity.Connecting,
			Picker:            &picker{err: balancer.ErrNoSubConnAvailable},
		})
		b.startFirstPassLocked()
	} else if b.state == connectivity.TransientFailure {
		// If we're in TRANSIENT_FAILURE, we stay in TRANSIENT_FAILURE until
		// we're READY. See A62.
		b.startFirstPassLocked()
	}
	return nil
}
 326  
// UpdateSubConnState is unused as a StateListener is always registered when
// creating SubConns. If it is called anyway, it only logs an error.
func (b *pickfirstBalancer) UpdateSubConnState(subConn balancer.SubConn, state balancer.SubConnState) {
	b.logger.Errorf("UpdateSubConnState(%v, %+v) called unexpectedly", subConn, state)
}
 332  
// Close shuts down all SubConns, cancels any pending connection timer and
// marks the balancer as SHUTDOWN.
func (b *pickfirstBalancer) Close() {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.closeSubConnsLocked()
	b.cancelConnectionTimer()
	b.state = connectivity.Shutdown
}
 340  
 341  // ExitIdle moves the balancer out of idle state. It can be called concurrently
 342  // by the idlePicker and clientConn so access to variables should be
 343  // synchronized.
 344  func (b *pickfirstBalancer) ExitIdle() {
 345  	b.mu.Lock()
 346  	defer b.mu.Unlock()
 347  	if b.state == connectivity.Idle {
 348  		// Move the balancer into CONNECTING state immediately. This is done to
 349  		// avoid staying in IDLE if a resolver update arrives before the first
 350  		// SubConn reports CONNECTING.
 351  		b.updateBalancerState(balancer.State{
 352  			ConnectivityState: connectivity.Connecting,
 353  			Picker:            &picker{err: balancer.ErrNoSubConnAvailable},
 354  		})
 355  		b.startFirstPassLocked()
 356  	}
 357  }
 358  
 359  func (b *pickfirstBalancer) startFirstPassLocked() {
 360  	b.firstPass = true
 361  	b.numTF = 0
 362  	// Reset the connection attempt record for existing SubConns.
 363  	for _, sd := range b.subConns.Values() {
 364  		sd.connectionFailedInFirstPass = false
 365  	}
 366  	b.requestConnectionLocked()
 367  }
 368  
// closeSubConnsLocked shuts down every tracked SubConn and replaces the
// SubConn map with an empty one.
func (b *pickfirstBalancer) closeSubConnsLocked() {
	for _, sd := range b.subConns.Values() {
		sd.subConn.Shutdown()
	}
	b.subConns = resolver.NewAddressMapV2[*scData]()
}
 375  
 376  // deDupAddresses ensures that each address appears only once in the slice.
 377  func deDupAddresses(addrs []resolver.Address) []resolver.Address {
 378  	seenAddrs := resolver.NewAddressMapV2[bool]()
 379  	retAddrs := []resolver.Address{}
 380  
 381  	for _, addr := range addrs {
 382  		if _, ok := seenAddrs.Get(addr); ok {
 383  			continue
 384  		}
 385  		seenAddrs.Set(addr, true)
 386  		retAddrs = append(retAddrs, addr)
 387  	}
 388  	return retAddrs
 389  }
 390  
 391  // interleaveAddresses interleaves addresses of both families (IPv4 and IPv6)
 392  // as per RFC-8305 section 4.
 393  // Whichever address family is first in the list is followed by an address of
 394  // the other address family; that is, if the first address in the list is IPv6,
 395  // then the first IPv4 address should be moved up in the list to be second in
 396  // the list. It doesn't support configuring "First Address Family Count", i.e.
 397  // there will always be a single member of the first address family at the
 398  // beginning of the interleaved list.
 399  // Addresses that are neither IPv4 nor IPv6 are treated as part of a third
 400  // "unknown" family for interleaving.
 401  // See: https://datatracker.ietf.org/doc/html/rfc8305#autoid-6
 402  func interleaveAddresses(addrs []resolver.Address) []resolver.Address {
 403  	familyAddrsMap := map[ipAddrFamily][]resolver.Address{}
 404  	interleavingOrder := []ipAddrFamily{}
 405  	for _, addr := range addrs {
 406  		family := addressFamily(addr.Addr)
 407  		if _, found := familyAddrsMap[family]; !found {
 408  			interleavingOrder = append(interleavingOrder, family)
 409  		}
 410  		familyAddrsMap[family] = append(familyAddrsMap[family], addr)
 411  	}
 412  
 413  	interleavedAddrs := make([]resolver.Address, 0, len(addrs))
 414  
 415  	for curFamilyIdx := 0; len(interleavedAddrs) < len(addrs); curFamilyIdx = (curFamilyIdx + 1) % len(interleavingOrder) {
 416  		// Some IP types may have fewer addresses than others, so we look for
 417  		// the next type that has a remaining member to add to the interleaved
 418  		// list.
 419  		family := interleavingOrder[curFamilyIdx]
 420  		remainingMembers := familyAddrsMap[family]
 421  		if len(remainingMembers) > 0 {
 422  			interleavedAddrs = append(interleavedAddrs, remainingMembers[0])
 423  			familyAddrsMap[family] = remainingMembers[1:]
 424  		}
 425  	}
 426  
 427  	return interleavedAddrs
 428  }
 429  
 430  // addressFamily returns the ipAddrFamily after parsing the address string.
 431  // If the address isn't of the format "ip-address:port", it returns
 432  // ipAddrFamilyUnknown. The address may be valid even if it's not an IP when
 433  // using a resolver like passthrough where the address may be a hostname in
 434  // some format that the dialer can resolve.
 435  func addressFamily(address string) ipAddrFamily {
 436  	// Parse the IP after removing the port.
 437  	host, _, err := net.SplitHostPort(address)
 438  	if err != nil {
 439  		return ipAddrFamilyUnknown
 440  	}
 441  	ip, err := netip.ParseAddr(host)
 442  	if err != nil {
 443  		return ipAddrFamilyUnknown
 444  	}
 445  	switch {
 446  	case ip.Is4() || ip.Is4In6():
 447  		return ipAddrFamilyV4
 448  	case ip.Is6():
 449  		return ipAddrFamilyV6
 450  	default:
 451  		return ipAddrFamilyUnknown
 452  	}
 453  }
 454  
 455  // reconcileSubConnsLocked updates the active subchannels based on a new address
 456  // list from the resolver. It does this by:
 457  //   - closing subchannels: any existing subchannels associated with addresses
 458  //     that are no longer in the updated list are shut down.
 459  //   - removing subchannels: entries for these closed subchannels are removed
 460  //     from the subchannel map.
 461  //
 462  // This ensures that the subchannel map accurately reflects the current set of
 463  // addresses received from the name resolver.
 464  func (b *pickfirstBalancer) reconcileSubConnsLocked(newAddrs []resolver.Address) {
 465  	newAddrsMap := resolver.NewAddressMapV2[bool]()
 466  	for _, addr := range newAddrs {
 467  		newAddrsMap.Set(addr, true)
 468  	}
 469  
 470  	for _, oldAddr := range b.subConns.Keys() {
 471  		if _, ok := newAddrsMap.Get(oldAddr); ok {
 472  			continue
 473  		}
 474  		val, _ := b.subConns.Get(oldAddr)
 475  		val.subConn.Shutdown()
 476  		b.subConns.Delete(oldAddr)
 477  	}
 478  }
 479  
 480  // shutdownRemainingLocked shuts down remaining subConns. Called when a subConn
 481  // becomes ready, which means that all other subConn must be shutdown.
 482  func (b *pickfirstBalancer) shutdownRemainingLocked(selected *scData) {
 483  	b.cancelConnectionTimer()
 484  	for _, sd := range b.subConns.Values() {
 485  		if sd.subConn != selected.subConn {
 486  			sd.subConn.Shutdown()
 487  		}
 488  	}
 489  	b.subConns = resolver.NewAddressMapV2[*scData]()
 490  	b.subConns.Set(selected.addr, selected)
 491  }
 492  
// requestConnectionLocked starts connecting on the subchannel corresponding to
// the current address. If no subchannel exists, one is created. If the current
// subchannel is in TransientFailure, a connection to the next address is
// attempted until a subchannel is found.
//
// It returns without doing anything when the address list position is not
// valid. If every remaining address is in TransientFailure, it tries to end
// the first pass with the last error seen.
func (b *pickfirstBalancer) requestConnectionLocked() {
	if !b.addressList.isValid() {
		return
	}
	var lastErr error
	// Walk the address list from the current position. The loop ends when
	// increment reports that no further address is available.
	for valid := true; valid; valid = b.addressList.increment() {
		curAddr := b.addressList.currentAddress()
		sd, ok := b.subConns.Get(curAddr)
		if !ok {
			var err error
			// We want to assign the new scData to sd from the outer scope,
			// hence we can't use := below.
			sd, err = b.newSCData(curAddr)
			if err != nil {
				// This should never happen, unless the clientConn is being shut
				// down.
				if b.logger.V(2) {
					b.logger.Infof("Failed to create a subConn for address %v: %v", curAddr.String(), err)
				}
				// Do nothing, the LB policy will be closed soon.
				return
			}
			b.subConns.Set(curAddr, sd)
		}

		switch sd.rawConnectivityState {
		case connectivity.Idle:
			// Start connecting and arm the timer that moves on to the next
			// address if this attempt takes too long.
			sd.subConn.Connect()
			b.scheduleNextConnectionLocked()
			return
		case connectivity.TransientFailure:
			// The SubConn is being re-used and failed during a previous pass
			// over the addressList. It has not completed backoff yet.
			// Mark it as having failed and try the next address.
			sd.connectionFailedInFirstPass = true
			lastErr = sd.lastErr
			continue
		case connectivity.Connecting:
			// Wait for the connection attempt to complete or the timer to fire
			// before attempting the next address.
			b.scheduleNextConnectionLocked()
			return
		default:
			b.logger.Errorf("SubConn with unexpected state %v present in SubConns map.", sd.rawConnectivityState)
			return

		}
	}

	// All the remaining addresses in the list are in TRANSIENT_FAILURE, end the
	// first pass if possible.
	b.endFirstPassIfPossibleLocked(lastErr)
}
 550  
// scheduleNextConnectionLocked cancels any pending connection timer and, if
// the address list has a next address, arms a new timer for
// connectionDelayInterval. When the timer fires without having been cancelled,
// the balancer advances to the next address and requests a connection to it.
func (b *pickfirstBalancer) scheduleNextConnectionLocked() {
	b.cancelConnectionTimer()
	if !b.addressList.hasNext() {
		return
	}
	// Captured only for the log message emitted when the timer fires.
	curAddr := b.addressList.currentAddress()
	cancelled := false // Access to this is protected by the balancer's mutex.
	closeFn := internal.TimeAfterFunc(connectionDelayInterval, func() {
		b.mu.Lock()
		defer b.mu.Unlock()
		// If the scheduled task is cancelled while acquiring the mutex, return.
		if cancelled {
			return
		}
		if b.logger.V(2) {
			b.logger.Infof("Happy Eyeballs timer expired while waiting for connection to %q.", curAddr.Addr)
		}
		if b.addressList.increment() {
			b.requestConnectionLocked()
		}
	})
	// Access to the cancellation callback held by the balancer is guarded by
	// the balancer's mutex, so it's safe to set the boolean from the callback.
	// sync.OnceFunc makes repeated cancellations harmless.
	b.cancelConnectionTimer = sync.OnceFunc(func() {
		cancelled = true
		closeFn()
	})
}
 579  
 580  func (b *pickfirstBalancer) updateSubConnState(sd *scData, newState balancer.SubConnState) {
 581  	b.mu.Lock()
 582  	defer b.mu.Unlock()
 583  	oldState := sd.rawConnectivityState
 584  	sd.rawConnectivityState = newState.ConnectivityState
 585  	// Previously relevant SubConns can still callback with state updates.
 586  	// To prevent pickers from returning these obsolete SubConns, this logic
 587  	// is included to check if the current list of active SubConns includes this
 588  	// SubConn.
 589  	if !b.isActiveSCData(sd) {
 590  		return
 591  	}
 592  	if newState.ConnectivityState == connectivity.Shutdown {
 593  		sd.effectiveState = connectivity.Shutdown
 594  		return
 595  	}
 596  
 597  	// Record a connection attempt when exiting CONNECTING.
 598  	if newState.ConnectivityState == connectivity.TransientFailure {
 599  		sd.connectionFailedInFirstPass = true
 600  		connectionAttemptsFailedMetric.Record(b.metricsRecorder, 1, b.target)
 601  	}
 602  
 603  	if newState.ConnectivityState == connectivity.Ready {
 604  		connectionAttemptsSucceededMetric.Record(b.metricsRecorder, 1, b.target)
 605  		b.shutdownRemainingLocked(sd)
 606  		if !b.addressList.seekTo(sd.addr) {
 607  			// This should not fail as we should have only one SubConn after
 608  			// entering READY. The SubConn should be present in the addressList.
 609  			b.logger.Errorf("Address %q not found address list in %v", sd.addr, b.addressList.addresses)
 610  			return
 611  		}
 612  		if !b.healthCheckingEnabled {
 613  			if b.logger.V(2) {
 614  				b.logger.Infof("SubConn %p reported connectivity state READY and the health listener is disabled. Transitioning SubConn to READY.", sd.subConn)
 615  			}
 616  
 617  			sd.effectiveState = connectivity.Ready
 618  			b.updateBalancerState(balancer.State{
 619  				ConnectivityState: connectivity.Ready,
 620  				Picker:            &picker{result: balancer.PickResult{SubConn: sd.subConn}},
 621  			})
 622  			return
 623  		}
 624  		if b.logger.V(2) {
 625  			b.logger.Infof("SubConn %p reported connectivity state READY. Registering health listener.", sd.subConn)
 626  		}
 627  		// Send a CONNECTING update to take the SubConn out of sticky-TF if
 628  		// required.
 629  		sd.effectiveState = connectivity.Connecting
 630  		b.updateBalancerState(balancer.State{
 631  			ConnectivityState: connectivity.Connecting,
 632  			Picker:            &picker{err: balancer.ErrNoSubConnAvailable},
 633  		})
 634  		sd.subConn.RegisterHealthListener(func(scs balancer.SubConnState) {
 635  			b.updateSubConnHealthState(sd, scs)
 636  		})
 637  		return
 638  	}
 639  
 640  	// If the LB policy is READY, and it receives a subchannel state change,
 641  	// it means that the READY subchannel has failed.
 642  	// A SubConn can also transition from CONNECTING directly to IDLE when
 643  	// a transport is successfully created, but the connection fails
 644  	// before the SubConn can send the notification for READY. We treat
 645  	// this as a successful connection and transition to IDLE.
 646  	// TODO: https://github.com/grpc/grpc-go/issues/7862 - Remove the second
 647  	// part of the if condition below once the issue is fixed.
 648  	if oldState == connectivity.Ready || (oldState == connectivity.Connecting && newState.ConnectivityState == connectivity.Idle) {
 649  		// Once a transport fails, the balancer enters IDLE and starts from
 650  		// the first address when the picker is used.
 651  		b.shutdownRemainingLocked(sd)
 652  		sd.effectiveState = newState.ConnectivityState
 653  		// READY SubConn interspliced in between CONNECTING and IDLE, need to
 654  		// account for that.
 655  		if oldState == connectivity.Connecting {
 656  			// A known issue (https://github.com/grpc/grpc-go/issues/7862)
 657  			// causes a race that prevents the READY state change notification.
 658  			// This works around it.
 659  			connectionAttemptsSucceededMetric.Record(b.metricsRecorder, 1, b.target)
 660  		}
 661  		disconnectionsMetric.Record(b.metricsRecorder, 1, b.target)
 662  		b.addressList.reset()
 663  		b.updateBalancerState(balancer.State{
 664  			ConnectivityState: connectivity.Idle,
 665  			Picker:            &idlePicker{exitIdle: sync.OnceFunc(b.ExitIdle)},
 666  		})
 667  		return
 668  	}
 669  
 670  	if b.firstPass {
 671  		switch newState.ConnectivityState {
 672  		case connectivity.Connecting:
 673  			// The effective state can be in either IDLE, CONNECTING or
 674  			// TRANSIENT_FAILURE. If it's  TRANSIENT_FAILURE, stay in
 675  			// TRANSIENT_FAILURE until it's READY. See A62.
 676  			if sd.effectiveState != connectivity.TransientFailure {
 677  				sd.effectiveState = connectivity.Connecting
 678  				b.updateBalancerState(balancer.State{
 679  					ConnectivityState: connectivity.Connecting,
 680  					Picker:            &picker{err: balancer.ErrNoSubConnAvailable},
 681  				})
 682  			}
 683  		case connectivity.TransientFailure:
 684  			sd.lastErr = newState.ConnectionError
 685  			sd.effectiveState = connectivity.TransientFailure
 686  			// Since we're re-using common SubConns while handling resolver
 687  			// updates, we could receive an out of turn TRANSIENT_FAILURE from
 688  			// a pass over the previous address list. Happy Eyeballs will also
 689  			// cause out of order updates to arrive.
 690  
 691  			if curAddr := b.addressList.currentAddress(); equalAddressIgnoringBalAttributes(&curAddr, &sd.addr) {
 692  				b.cancelConnectionTimer()
 693  				if b.addressList.increment() {
 694  					b.requestConnectionLocked()
 695  					return
 696  				}
 697  			}
 698  
 699  			// End the first pass if we've seen a TRANSIENT_FAILURE from all
 700  			// SubConns once.
 701  			b.endFirstPassIfPossibleLocked(newState.ConnectionError)
 702  		}
 703  		return
 704  	}
 705  
 706  	// We have finished the first pass, keep re-connecting failing SubConns.
 707  	switch newState.ConnectivityState {
 708  	case connectivity.TransientFailure:
 709  		b.numTF = (b.numTF + 1) % b.subConns.Len()
 710  		sd.lastErr = newState.ConnectionError
 711  		if b.numTF%b.subConns.Len() == 0 {
 712  			b.updateBalancerState(balancer.State{
 713  				ConnectivityState: connectivity.TransientFailure,
 714  				Picker:            &picker{err: newState.ConnectionError},
 715  			})
 716  		}
 717  		// We don't need to request re-resolution since the SubConn already
 718  		// does that before reporting TRANSIENT_FAILURE.
 719  		// TODO: #7534 - Move re-resolution requests from SubConn into
 720  		// pick_first.
 721  	case connectivity.Idle:
 722  		sd.subConn.Connect()
 723  	}
 724  }
 725  
 726  // endFirstPassIfPossibleLocked ends the first happy-eyeballs pass if all the
 727  // addresses are tried and their SubConns have reported a failure.
 728  func (b *pickfirstBalancer) endFirstPassIfPossibleLocked(lastErr error) {
 729  	// An optimization to avoid iterating over the entire SubConn map.
 730  	if b.addressList.isValid() {
 731  		return
 732  	}
 733  	// Connect() has been called on all the SubConns. The first pass can be
 734  	// ended if all the SubConns have reported a failure.
 735  	for _, sd := range b.subConns.Values() {
 736  		if !sd.connectionFailedInFirstPass {
 737  			return
 738  		}
 739  	}
 740  	b.firstPass = false
 741  	b.updateBalancerState(balancer.State{
 742  		ConnectivityState: connectivity.TransientFailure,
 743  		Picker:            &picker{err: lastErr},
 744  	})
 745  	// Start re-connecting all the SubConns that are already in IDLE.
 746  	for _, sd := range b.subConns.Values() {
 747  		if sd.rawConnectivityState == connectivity.Idle {
 748  			sd.subConn.Connect()
 749  		}
 750  	}
 751  }
 752  
 753  func (b *pickfirstBalancer) isActiveSCData(sd *scData) bool {
 754  	activeSD, found := b.subConns.Get(sd.addr)
 755  	return found && activeSD == sd
 756  }
 757  
 758  func (b *pickfirstBalancer) updateSubConnHealthState(sd *scData, state balancer.SubConnState) {
 759  	b.mu.Lock()
 760  	defer b.mu.Unlock()
 761  	// Previously relevant SubConns can still callback with state updates.
 762  	// To prevent pickers from returning these obsolete SubConns, this logic
 763  	// is included to check if the current list of active SubConns includes
 764  	// this SubConn.
 765  	if !b.isActiveSCData(sd) {
 766  		return
 767  	}
 768  	sd.effectiveState = state.ConnectivityState
 769  	switch state.ConnectivityState {
 770  	case connectivity.Ready:
 771  		b.updateBalancerState(balancer.State{
 772  			ConnectivityState: connectivity.Ready,
 773  			Picker:            &picker{result: balancer.PickResult{SubConn: sd.subConn}},
 774  		})
 775  	case connectivity.TransientFailure:
 776  		b.updateBalancerState(balancer.State{
 777  			ConnectivityState: connectivity.TransientFailure,
 778  			Picker:            &picker{err: fmt.Errorf("pickfirst: health check failure: %v", state.ConnectionError)},
 779  		})
 780  	case connectivity.Connecting:
 781  		b.updateBalancerState(balancer.State{
 782  			ConnectivityState: connectivity.Connecting,
 783  			Picker:            &picker{err: balancer.ErrNoSubConnAvailable},
 784  		})
 785  	default:
 786  		b.logger.Errorf("Got unexpected health update for SubConn %p: %v", state)
 787  	}
 788  }
 789  
 790  // updateBalancerState stores the state reported to the channel and calls
 791  // ClientConn.UpdateState(). As an optimization, it avoids sending duplicate
 792  // updates to the channel.
 793  func (b *pickfirstBalancer) updateBalancerState(newState balancer.State) {
 794  	// In case of TransientFailures allow the picker to be updated to update
 795  	// the connectivity error, in all other cases don't send duplicate state
 796  	// updates.
 797  	if newState.ConnectivityState == b.state && b.state != connectivity.TransientFailure {
 798  		return
 799  	}
 800  	b.forceUpdateConcludedStateLocked(newState)
 801  }
 802  
 803  // forceUpdateConcludedStateLocked stores the state reported to the channel and
 804  // calls ClientConn.UpdateState().
 805  // A separate function is defined to force update the ClientConn state since the
 806  // channel doesn't correctly assume that LB policies start in CONNECTING and
 807  // relies on LB policy to send an initial CONNECTING update.
 808  func (b *pickfirstBalancer) forceUpdateConcludedStateLocked(newState balancer.State) {
 809  	b.state = newState.ConnectivityState
 810  	b.cc.UpdateState(newState)
 811  }
 812  
 813  type picker struct {
 814  	result balancer.PickResult
 815  	err    error
 816  }
 817  
 818  func (p *picker) Pick(balancer.PickInfo) (balancer.PickResult, error) {
 819  	return p.result, p.err
 820  }
 821  
 822  // idlePicker is used when the SubConn is IDLE and kicks the SubConn into
 823  // CONNECTING when Pick is called.
 824  type idlePicker struct {
 825  	exitIdle func()
 826  }
 827  
 828  func (i *idlePicker) Pick(balancer.PickInfo) (balancer.PickResult, error) {
 829  	i.exitIdle()
 830  	return balancer.PickResult{}, balancer.ErrNoSubConnAvailable
 831  }
 832  
 833  // addressList manages sequentially iterating over addresses present in a list
 834  // of endpoints. It provides a 1 dimensional view of the addresses present in
 835  // the endpoints.
 836  // This type is not safe for concurrent access.
 837  type addressList struct {
 838  	addresses []resolver.Address
 839  	idx       int
 840  }
 841  
 842  func (al *addressList) isValid() bool {
 843  	return al.idx < len(al.addresses)
 844  }
 845  
 846  func (al *addressList) size() int {
 847  	return len(al.addresses)
 848  }
 849  
 850  // increment moves to the next index in the address list.
 851  // This method returns false if it went off the list, true otherwise.
 852  func (al *addressList) increment() bool {
 853  	if !al.isValid() {
 854  		return false
 855  	}
 856  	al.idx++
 857  	return al.idx < len(al.addresses)
 858  }
 859  
 860  // currentAddress returns the current address pointed to in the addressList.
 861  // If the list is in an invalid state, it returns an empty address instead.
 862  func (al *addressList) currentAddress() resolver.Address {
 863  	if !al.isValid() {
 864  		return resolver.Address{}
 865  	}
 866  	return al.addresses[al.idx]
 867  }
 868  
 869  func (al *addressList) reset() {
 870  	al.idx = 0
 871  }
 872  
 873  func (al *addressList) updateAddrs(addrs []resolver.Address) {
 874  	al.addresses = addrs
 875  	al.reset()
 876  }
 877  
 878  // seekTo returns false if the needle was not found and the current index was
 879  // left unchanged.
 880  func (al *addressList) seekTo(needle resolver.Address) bool {
 881  	for ai, addr := range al.addresses {
 882  		if !equalAddressIgnoringBalAttributes(&addr, &needle) {
 883  			continue
 884  		}
 885  		al.idx = ai
 886  		return true
 887  	}
 888  	return false
 889  }
 890  
 891  // hasNext returns whether incrementing the addressList will result in moving
 892  // past the end of the list. If the list has already moved past the end, it
 893  // returns false.
 894  func (al *addressList) hasNext() bool {
 895  	if !al.isValid() {
 896  		return false
 897  	}
 898  	return al.idx+1 < len(al.addresses)
 899  }
 900  
 901  // equalAddressIgnoringBalAttributes returns true is a and b are considered
 902  // equal. This is different from the Equal method on the resolver.Address type
 903  // which considers all fields to determine equality. Here, we only consider
 904  // fields that are meaningful to the SubConn.
 905  func equalAddressIgnoringBalAttributes(a, b *resolver.Address) bool {
 906  	return a.Addr == b.Addr && a.ServerName == b.ServerName &&
 907  		a.Attributes.Equal(b.Attributes)
 908  }
 909