resolver_wrapper.go raw

   1  /*
   2   *
   3   * Copyright 2017 gRPC authors.
   4   *
   5   * Licensed under the Apache License, Version 2.0 (the "License");
   6   * you may not use this file except in compliance with the License.
   7   * You may obtain a copy of the License at
   8   *
   9   *     http://www.apache.org/licenses/LICENSE-2.0
  10   *
  11   * Unless required by applicable law or agreed to in writing, software
  12   * distributed under the License is distributed on an "AS IS" BASIS,
  13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14   * See the License for the specific language governing permissions and
  15   * limitations under the License.
  16   *
  17   */
  18  
  19  package grpc
  20  
  21  import (
  22  	"context"
  23  	"strings"
  24  	"sync"
  25  
  26  	"google.golang.org/grpc/internal/channelz"
  27  	"google.golang.org/grpc/internal/grpcsync"
  28  	"google.golang.org/grpc/internal/pretty"
  29  	"google.golang.org/grpc/internal/resolver/delegatingresolver"
  30  	"google.golang.org/grpc/resolver"
  31  	"google.golang.org/grpc/serviceconfig"
  32  )
  33  
  34  // ccResolverWrapper is a wrapper on top of cc for resolvers.
  35  // It implements resolver.ClientConn interface.
  36  type ccResolverWrapper struct {
  37  	// The following fields are initialized when the wrapper is created and are
  38  	// read-only afterwards, and therefore can be accessed without a mutex.
  39  	cc                  *ClientConn
  40  	ignoreServiceConfig bool
  41  	serializer          *grpcsync.CallbackSerializer
  42  	serializerCancel    context.CancelFunc
  43  
  44  	resolver resolver.Resolver // only accessed within the serializer
  45  
  46  	// The following fields are protected by mu.  Caller must take cc.mu before
  47  	// taking mu.
  48  	mu       sync.Mutex
  49  	curState resolver.State
  50  	closed   bool
  51  }
  52  
  53  // newCCResolverWrapper initializes the ccResolverWrapper.  It can only be used
  54  // after calling start, which builds the resolver.
  55  func newCCResolverWrapper(cc *ClientConn) *ccResolverWrapper {
  56  	ctx, cancel := context.WithCancel(cc.ctx)
  57  	return &ccResolverWrapper{
  58  		cc:                  cc,
  59  		ignoreServiceConfig: cc.dopts.disableServiceConfig,
  60  		serializer:          grpcsync.NewCallbackSerializer(ctx),
  61  		serializerCancel:    cancel,
  62  	}
  63  }
  64  
  65  // start builds the name resolver using the resolver.Builder in cc and returns
  66  // any error encountered.  It must always be the first operation performed on
  67  // any newly created ccResolverWrapper, except that close may be called instead.
  68  func (ccr *ccResolverWrapper) start() error {
  69  	errCh := make(chan error)
  70  	ccr.serializer.TrySchedule(func(ctx context.Context) {
  71  		if ctx.Err() != nil {
  72  			errCh <- ctx.Err()
  73  			return
  74  		}
  75  		opts := resolver.BuildOptions{
  76  			DisableServiceConfig: ccr.cc.dopts.disableServiceConfig,
  77  			DialCreds:            ccr.cc.dopts.copts.TransportCredentials,
  78  			CredsBundle:          ccr.cc.dopts.copts.CredsBundle,
  79  			Dialer:               ccr.cc.dopts.copts.Dialer,
  80  			Authority:            ccr.cc.authority,
  81  			MetricsRecorder:      ccr.cc.metricsRecorderList,
  82  		}
  83  		var err error
  84  		// The delegating resolver is used unless:
  85  		//   - A custom dialer is provided via WithContextDialer dialoption or
  86  		//   - Proxy usage is disabled through WithNoProxy dialoption.
  87  		// In these cases, the resolver is built based on the scheme of target,
  88  		// using the appropriate resolver builder.
  89  		if ccr.cc.dopts.copts.Dialer != nil || !ccr.cc.dopts.useProxy {
  90  			ccr.resolver, err = ccr.cc.resolverBuilder.Build(ccr.cc.parsedTarget, ccr, opts)
  91  		} else {
  92  			ccr.resolver, err = delegatingresolver.New(ccr.cc.parsedTarget, ccr, opts, ccr.cc.resolverBuilder, ccr.cc.dopts.enableLocalDNSResolution)
  93  		}
  94  		errCh <- err
  95  	})
  96  	return <-errCh
  97  }
  98  
  99  func (ccr *ccResolverWrapper) resolveNow(o resolver.ResolveNowOptions) {
 100  	ccr.serializer.TrySchedule(func(ctx context.Context) {
 101  		if ctx.Err() != nil || ccr.resolver == nil {
 102  			return
 103  		}
 104  		ccr.resolver.ResolveNow(o)
 105  	})
 106  }
 107  
 108  // close initiates async shutdown of the wrapper.  To determine the wrapper has
 109  // finished shutting down, the channel should block on ccr.serializer.Done()
 110  // without cc.mu held.
 111  func (ccr *ccResolverWrapper) close() {
 112  	channelz.Info(logger, ccr.cc.channelz, "Closing the name resolver")
 113  	ccr.mu.Lock()
 114  	ccr.closed = true
 115  	ccr.mu.Unlock()
 116  
 117  	ccr.serializer.TrySchedule(func(context.Context) {
 118  		if ccr.resolver == nil {
 119  			return
 120  		}
 121  		ccr.resolver.Close()
 122  		ccr.resolver = nil
 123  	})
 124  	ccr.serializerCancel()
 125  }
 126  
 127  // UpdateState is called by resolver implementations to report new state to gRPC
 128  // which includes addresses and service config.
 129  func (ccr *ccResolverWrapper) UpdateState(s resolver.State) error {
 130  	ccr.cc.mu.Lock()
 131  	ccr.mu.Lock()
 132  	if ccr.closed {
 133  		ccr.mu.Unlock()
 134  		ccr.cc.mu.Unlock()
 135  		return nil
 136  	}
 137  	if s.Endpoints == nil {
 138  		s.Endpoints = addressesToEndpoints(s.Addresses)
 139  	}
 140  	ccr.addChannelzTraceEvent(s)
 141  	ccr.curState = s
 142  	ccr.mu.Unlock()
 143  	return ccr.cc.updateResolverStateAndUnlock(s, nil)
 144  }
 145  
 146  // ReportError is called by resolver implementations to report errors
 147  // encountered during name resolution to gRPC.
 148  func (ccr *ccResolverWrapper) ReportError(err error) {
 149  	ccr.cc.mu.Lock()
 150  	ccr.mu.Lock()
 151  	if ccr.closed {
 152  		ccr.mu.Unlock()
 153  		ccr.cc.mu.Unlock()
 154  		return
 155  	}
 156  	ccr.mu.Unlock()
 157  	channelz.Warningf(logger, ccr.cc.channelz, "ccResolverWrapper: reporting error to cc: %v", err)
 158  	ccr.cc.updateResolverStateAndUnlock(resolver.State{}, err)
 159  }
 160  
 161  // NewAddress is called by the resolver implementation to send addresses to
 162  // gRPC.
 163  func (ccr *ccResolverWrapper) NewAddress(addrs []resolver.Address) {
 164  	ccr.cc.mu.Lock()
 165  	ccr.mu.Lock()
 166  	if ccr.closed {
 167  		ccr.mu.Unlock()
 168  		ccr.cc.mu.Unlock()
 169  		return
 170  	}
 171  	s := resolver.State{
 172  		Addresses:     addrs,
 173  		ServiceConfig: ccr.curState.ServiceConfig,
 174  		Endpoints:     addressesToEndpoints(addrs),
 175  	}
 176  	ccr.addChannelzTraceEvent(s)
 177  	ccr.curState = s
 178  	ccr.mu.Unlock()
 179  	ccr.cc.updateResolverStateAndUnlock(s, nil)
 180  }
 181  
 182  // ParseServiceConfig is called by resolver implementations to parse a JSON
 183  // representation of the service config.
 184  func (ccr *ccResolverWrapper) ParseServiceConfig(scJSON string) *serviceconfig.ParseResult {
 185  	return parseServiceConfig(scJSON, ccr.cc.dopts.maxCallAttempts)
 186  }
 187  
 188  // addChannelzTraceEvent adds a channelz trace event containing the new
 189  // state received from resolver implementations.
 190  func (ccr *ccResolverWrapper) addChannelzTraceEvent(s resolver.State) {
 191  	if !logger.V(0) && !channelz.IsOn() {
 192  		return
 193  	}
 194  	var updates []string
 195  	var oldSC, newSC *ServiceConfig
 196  	var oldOK, newOK bool
 197  	if ccr.curState.ServiceConfig != nil {
 198  		oldSC, oldOK = ccr.curState.ServiceConfig.Config.(*ServiceConfig)
 199  	}
 200  	if s.ServiceConfig != nil {
 201  		newSC, newOK = s.ServiceConfig.Config.(*ServiceConfig)
 202  	}
 203  	if oldOK != newOK || (oldOK && newOK && oldSC.rawJSONString != newSC.rawJSONString) {
 204  		updates = append(updates, "service config updated")
 205  	}
 206  	if len(ccr.curState.Addresses) > 0 && len(s.Addresses) == 0 {
 207  		updates = append(updates, "resolver returned an empty address list")
 208  	} else if len(ccr.curState.Addresses) == 0 && len(s.Addresses) > 0 {
 209  		updates = append(updates, "resolver returned new addresses")
 210  	}
 211  	channelz.Infof(logger, ccr.cc.channelz, "Resolver state updated: %s (%v)", pretty.ToJSON(s), strings.Join(updates, "; "))
 212  }
 213  
 214  func addressesToEndpoints(addrs []resolver.Address) []resolver.Endpoint {
 215  	endpoints := make([]resolver.Endpoint, 0, len(addrs))
 216  	for _, a := range addrs {
 217  		ep := resolver.Endpoint{Addresses: []resolver.Address{a}, Attributes: a.BalancerAttributes}
 218  		ep.Addresses[0].BalancerAttributes = nil
 219  		endpoints = append(endpoints, ep)
 220  	}
 221  	return endpoints
 222  }
 223