delegatingresolver.go raw

   1  /*
   2   *
   3   * Copyright 2024 gRPC authors.
   4   *
   5   * Licensed under the Apache License, Version 2.0 (the "License");
   6   * you may not use this file except in compliance with the License.
   7   * You may obtain a copy of the License at
   8   *
   9   *     http://www.apache.org/licenses/LICENSE-2.0
  10   *
  11   * Unless required by applicable law or agreed to in writing, software
  12   * distributed under the License is distributed on an "AS IS" BASIS,
  13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14   * See the License for the specific language governing permissions and
  15   * limitations under the License.
  16   *
  17   */
  18  
  19  // Package delegatingresolver implements a resolver capable of resolving both
  20  // target URIs and proxy addresses.
  21  package delegatingresolver
  22  
  23  import (
  24  	"fmt"
  25  	"net"
  26  	"net/http"
  27  	"net/url"
  28  	"sync"
  29  
  30  	"google.golang.org/grpc/grpclog"
  31  	"google.golang.org/grpc/internal/envconfig"
  32  	"google.golang.org/grpc/internal/proxyattributes"
  33  	"google.golang.org/grpc/internal/transport"
  34  	"google.golang.org/grpc/internal/transport/networktype"
  35  	"google.golang.org/grpc/resolver"
  36  	"google.golang.org/grpc/serviceconfig"
  37  )
  38  
  39  var (
  40  	logger = grpclog.Component("delegating-resolver")
  41  	// HTTPSProxyFromEnvironment will be overwritten in the tests
  42  	HTTPSProxyFromEnvironment = http.ProxyFromEnvironment
  43  )
  44  
  45  const defaultPort = "443"
  46  
  47  // delegatingResolver manages both target URI and proxy address resolution by
  48  // delegating these tasks to separate child resolvers. Essentially, it acts as
  49  // an intermediary between the gRPC ClientConn and the child resolvers.
  50  //
  51  // It implements the [resolver.Resolver] interface.
  52  type delegatingResolver struct {
  53  	target   resolver.Target     // parsed target URI to be resolved
  54  	cc       resolver.ClientConn // gRPC ClientConn
  55  	proxyURL *url.URL            // proxy URL, derived from proxy environment and target
  56  
  57  	// We do not hold both mu and childMu in the same goroutine. Avoid holding
  58  	// both locks when calling into the child, as the child resolver may
  59  	// synchronously callback into the channel.
  60  	mu                  sync.Mutex         // protects all the fields below
  61  	targetResolverState *resolver.State    // state of the target resolver
  62  	proxyAddrs          []resolver.Address // resolved proxy addresses; empty if no proxy is configured
  63  
  64  	// childMu serializes calls into child resolvers. It also protects access to
  65  	// the following fields.
  66  	childMu        sync.Mutex
  67  	targetResolver resolver.Resolver // resolver for the target URI, based on its scheme
  68  	proxyResolver  resolver.Resolver // resolver for the proxy URI; nil if no proxy is configured
  69  }
  70  
  71  // nopResolver is a resolver that does nothing.
  72  type nopResolver struct{}
  73  
  74  func (nopResolver) ResolveNow(resolver.ResolveNowOptions) {}
  75  
  76  func (nopResolver) Close() {}
  77  
  78  // proxyURLForTarget determines the proxy URL for the given address based on the
  79  // environment. It can return the following:
  80  //   - nil URL, nil error: No proxy is configured or the address is excluded
  81  //     using the `NO_PROXY` environment variable or if req.URL.Host is
  82  //     "localhost" (with or without // a port number)
  83  //   - nil URL, non-nil error: An error occurred while retrieving the proxy URL.
  84  //   - non-nil URL, nil error: A proxy is configured, and the proxy URL was
  85  //     retrieved successfully without any errors.
  86  func proxyURLForTarget(address string) (*url.URL, error) {
  87  	req := &http.Request{URL: &url.URL{
  88  		Scheme: "https",
  89  		Host:   address,
  90  	}}
  91  	return HTTPSProxyFromEnvironment(req)
  92  }
  93  
  94  // New creates a new delegating resolver that can create up to two child
  95  // resolvers:
  96  //   - one to resolve the proxy address specified using the supported
  97  //     environment variables. This uses the registered resolver for the "dns"
  98  //     scheme. It is lazily built when a target resolver update contains at least
  99  //     one TCP address.
 100  //   - one to resolve the target URI using the resolver specified by the scheme
 101  //     in the target URI or specified by the user using the WithResolvers dial
 102  //     option. As a special case, if the target URI's scheme is "dns" and a
 103  //     proxy is specified using the supported environment variables, the target
 104  //     URI's path portion is used as the resolved address unless target
 105  //     resolution is enabled using the dial option.
 106  func New(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions, targetResolverBuilder resolver.Builder, targetResolutionEnabled bool) (resolver.Resolver, error) {
 107  	r := &delegatingResolver{
 108  		target:         target,
 109  		cc:             cc,
 110  		proxyResolver:  nopResolver{},
 111  		targetResolver: nopResolver{},
 112  	}
 113  
 114  	addr := target.Endpoint()
 115  	var err error
 116  	if target.URL.Scheme == "dns" && !targetResolutionEnabled && envconfig.EnableDefaultPortForProxyTarget {
 117  		addr, err = parseTarget(addr)
 118  		if err != nil {
 119  			return nil, fmt.Errorf("delegating_resolver: invalid target address %q: %v", target.Endpoint(), err)
 120  		}
 121  	}
 122  
 123  	r.proxyURL, err = proxyURLForTarget(addr)
 124  	if err != nil {
 125  		return nil, fmt.Errorf("delegating_resolver: failed to determine proxy URL for target %q: %v", target, err)
 126  	}
 127  
 128  	// proxy is not configured or proxy address excluded using `NO_PROXY` env
 129  	// var, so only target resolver is used.
 130  	if r.proxyURL == nil {
 131  		return targetResolverBuilder.Build(target, cc, opts)
 132  	}
 133  
 134  	if logger.V(2) {
 135  		logger.Infof("Proxy URL detected : %s", r.proxyURL)
 136  	}
 137  
 138  	// Resolver updates from one child may trigger calls into the other. Block
 139  	// updates until the children are initialized.
 140  	r.childMu.Lock()
 141  	defer r.childMu.Unlock()
 142  	// When the scheme is 'dns' and target resolution on client is not enabled,
 143  	// resolution should be handled by the proxy, not the client. Therefore, we
 144  	// bypass the target resolver and store the unresolved target address.
 145  	if target.URL.Scheme == "dns" && !targetResolutionEnabled {
 146  		r.targetResolverState = &resolver.State{
 147  			Addresses: []resolver.Address{{Addr: addr}},
 148  			Endpoints: []resolver.Endpoint{{Addresses: []resolver.Address{{Addr: addr}}}},
 149  		}
 150  		r.updateTargetResolverState(*r.targetResolverState)
 151  		return r, nil
 152  	}
 153  	wcc := &wrappingClientConn{
 154  		stateListener: r.updateTargetResolverState,
 155  		parent:        r,
 156  	}
 157  	if r.targetResolver, err = targetResolverBuilder.Build(target, wcc, opts); err != nil {
 158  		return nil, fmt.Errorf("delegating_resolver: unable to build the resolver for target %s: %v", target, err)
 159  	}
 160  	return r, nil
 161  }
 162  
 163  // proxyURIResolver creates a resolver for resolving proxy URIs using the "dns"
 164  // scheme. It adjusts the proxyURL to conform to the "dns:///" format and builds
 165  // a resolver with a wrappingClientConn to capture resolved addresses.
 166  func (r *delegatingResolver) proxyURIResolver(opts resolver.BuildOptions) (resolver.Resolver, error) {
 167  	proxyBuilder := resolver.Get("dns")
 168  	if proxyBuilder == nil {
 169  		panic("delegating_resolver: resolver for proxy not found for scheme dns")
 170  	}
 171  	url := *r.proxyURL
 172  	url.Scheme = "dns"
 173  	url.Path = "/" + r.proxyURL.Host
 174  	url.Host = "" // Clear the Host field to conform to the "dns:///" format
 175  
 176  	proxyTarget := resolver.Target{URL: url}
 177  	wcc := &wrappingClientConn{
 178  		stateListener: r.updateProxyResolverState,
 179  		parent:        r,
 180  	}
 181  	return proxyBuilder.Build(proxyTarget, wcc, opts)
 182  }
 183  
 184  func (r *delegatingResolver) ResolveNow(o resolver.ResolveNowOptions) {
 185  	r.childMu.Lock()
 186  	defer r.childMu.Unlock()
 187  	r.targetResolver.ResolveNow(o)
 188  	r.proxyResolver.ResolveNow(o)
 189  }
 190  
 191  func (r *delegatingResolver) Close() {
 192  	r.childMu.Lock()
 193  	defer r.childMu.Unlock()
 194  	r.targetResolver.Close()
 195  	r.targetResolver = nil
 196  
 197  	r.proxyResolver.Close()
 198  	r.proxyResolver = nil
 199  }
 200  
 201  func needsProxyResolver(state *resolver.State) bool {
 202  	for _, addr := range state.Addresses {
 203  		if !skipProxy(addr) {
 204  			return true
 205  		}
 206  	}
 207  	for _, endpoint := range state.Endpoints {
 208  		for _, addr := range endpoint.Addresses {
 209  			if !skipProxy(addr) {
 210  				return true
 211  			}
 212  		}
 213  	}
 214  	return false
 215  }
 216  
 217  // parseTarget takes a target string and ensures it is a valid "host:port" target.
 218  //
 219  // It does the following:
 220  //  1. If the target already has a port (e.g., "host:port", "[ipv6]:port"),
 221  //     it is returned as is.
 222  //  2. If the host part is empty (e.g., ":80"), it defaults to "localhost",
 223  //     returning "localhost:80".
 224  //  3. If the target is missing a port (e.g., "host", "ipv6"), the defaultPort
 225  //     is added.
 226  //
 227  // An error is returned for empty targets or targets with a trailing colon
 228  // but no port (e.g., "host:").
 229  func parseTarget(target string) (string, error) {
 230  	if target == "" {
 231  		return "", fmt.Errorf("missing address")
 232  	}
 233  
 234  	host, port, err := net.SplitHostPort(target)
 235  	if err != nil {
 236  		// If SplitHostPort fails, it's likely because the port is missing.
 237  		// We append the default port and return the result.
 238  		return net.JoinHostPort(target, defaultPort), nil
 239  	}
 240  
 241  	// If SplitHostPort succeeds, we check for edge cases.
 242  	if port == "" {
 243  		// A success with an empty port means the target had a trailing colon,
 244  		// e.g., "host:", which is an error.
 245  		return "", fmt.Errorf("missing port after port-separator colon")
 246  	}
 247  	if host == "" {
 248  		// A success with an empty host means the target was like ":80".
 249  		// We default the host to "localhost".
 250  		host = "localhost"
 251  	}
 252  	return net.JoinHostPort(host, port), nil
 253  }
 254  
 255  func skipProxy(address resolver.Address) bool {
 256  	// Avoid proxy when network is not tcp.
 257  	networkType, ok := networktype.Get(address)
 258  	if !ok {
 259  		networkType, _ = transport.ParseDialTarget(address.Addr)
 260  	}
 261  	if networkType != "tcp" {
 262  		return true
 263  	}
 264  
 265  	req := &http.Request{URL: &url.URL{
 266  		Scheme: "https",
 267  		Host:   address.Addr,
 268  	}}
 269  	// Avoid proxy when address included in `NO_PROXY` environment variable or
 270  	// fails to get the proxy address.
 271  	url, err := HTTPSProxyFromEnvironment(req)
 272  	if err != nil || url == nil {
 273  		return true
 274  	}
 275  	return false
 276  }
 277  
 278  // updateClientConnStateLocked constructs a combined list of addresses by
 279  // pairing each proxy address with every target address of type TCP. For each
 280  // pair, it creates a new [resolver.Address] using the proxy address and
 281  // attaches the corresponding target address and user info as attributes. Target
 282  // addresses that are not of type TCP are appended to the list as-is. The
 283  // function returns nil if either resolver has not yet provided an update, and
 284  // returns the result of ClientConn.UpdateState once both resolvers have
 285  // provided at least one update.
 286  func (r *delegatingResolver) updateClientConnStateLocked() error {
 287  	if r.targetResolverState == nil || r.proxyAddrs == nil {
 288  		return nil
 289  	}
 290  
 291  	// If multiple resolved proxy addresses are present, we send only the
 292  	// unresolved proxy host and let net.Dial handle the proxy host name
 293  	// resolution when creating the transport. Sending all resolved addresses
 294  	// would increase the number of addresses passed to the ClientConn and
 295  	// subsequently to load balancing (LB) policies like Round Robin, leading
 296  	// to additional TCP connections. However, if there's only one resolved
 297  	// proxy address, we send it directly, as it doesn't affect the address
 298  	// count returned by the target resolver and the address count sent to the
 299  	// ClientConn.
 300  	var proxyAddr resolver.Address
 301  	if len(r.proxyAddrs) == 1 {
 302  		proxyAddr = r.proxyAddrs[0]
 303  	} else {
 304  		proxyAddr = resolver.Address{Addr: r.proxyURL.Host}
 305  	}
 306  	var addresses []resolver.Address
 307  	for _, targetAddr := range (*r.targetResolverState).Addresses {
 308  		if skipProxy(targetAddr) {
 309  			addresses = append(addresses, targetAddr)
 310  			continue
 311  		}
 312  		addresses = append(addresses, proxyattributes.Set(proxyAddr, proxyattributes.Options{
 313  			User:        r.proxyURL.User,
 314  			ConnectAddr: targetAddr.Addr,
 315  		}))
 316  	}
 317  
 318  	// For each target endpoint, construct a new [resolver.Endpoint] that
 319  	// includes all addresses from all proxy endpoints and the addresses from
 320  	// that target endpoint, preserving the number of target endpoints.
 321  	var endpoints []resolver.Endpoint
 322  	for _, endpt := range (*r.targetResolverState).Endpoints {
 323  		var addrs []resolver.Address
 324  		for _, targetAddr := range endpt.Addresses {
 325  			// Avoid proxy when network is not tcp.
 326  			if skipProxy(targetAddr) {
 327  				addrs = append(addrs, targetAddr)
 328  				continue
 329  			}
 330  			for _, proxyAddr := range r.proxyAddrs {
 331  				addrs = append(addrs, proxyattributes.Set(proxyAddr, proxyattributes.Options{
 332  					User:        r.proxyURL.User,
 333  					ConnectAddr: targetAddr.Addr,
 334  				}))
 335  			}
 336  		}
 337  		endpoints = append(endpoints, resolver.Endpoint{Addresses: addrs})
 338  	}
 339  	// Use the targetResolverState for its service config and attributes
 340  	// contents. The state update is only sent after both the target and proxy
 341  	// resolvers have sent their updates, and curState has been updated with the
 342  	// combined addresses.
 343  	curState := *r.targetResolverState
 344  	curState.Addresses = addresses
 345  	curState.Endpoints = endpoints
 346  	return r.cc.UpdateState(curState)
 347  }
 348  
 349  // updateProxyResolverState updates the proxy resolver state by storing proxy
 350  // addresses and endpoints, marking the resolver as ready, and triggering a
 351  // state update if both proxy and target resolvers are ready. If the ClientConn
 352  // returns a non-nil error, it calls `ResolveNow()` on the target resolver.  It
 353  // is a StateListener function of wrappingClientConn passed to the proxy
 354  // resolver.
 355  func (r *delegatingResolver) updateProxyResolverState(state resolver.State) error {
 356  	r.mu.Lock()
 357  	defer r.mu.Unlock()
 358  	if logger.V(2) {
 359  		logger.Infof("Addresses received from proxy resolver: %s", state.Addresses)
 360  	}
 361  	if len(state.Endpoints) > 0 {
 362  		// We expect exactly one address per endpoint because the proxy resolver
 363  		// uses "dns" resolution.
 364  		r.proxyAddrs = make([]resolver.Address, 0, len(state.Endpoints))
 365  		for _, endpoint := range state.Endpoints {
 366  			r.proxyAddrs = append(r.proxyAddrs, endpoint.Addresses...)
 367  		}
 368  	} else if state.Addresses != nil {
 369  		r.proxyAddrs = state.Addresses
 370  	} else {
 371  		r.proxyAddrs = []resolver.Address{} // ensure proxyAddrs is non-nil to indicate an update has been received
 372  	}
 373  	err := r.updateClientConnStateLocked()
 374  	// Another possible approach was to block until updates are received from
 375  	// both resolvers. But this is not used because calling `New()` triggers
 376  	// `Build()` for the first resolver, which calls `UpdateState()`. And the
 377  	// second resolver hasn't sent an update yet, so it would cause `New()` to
 378  	// block indefinitely.
 379  	if err != nil {
 380  		go func() {
 381  			r.childMu.Lock()
 382  			defer r.childMu.Unlock()
 383  			if r.targetResolver != nil {
 384  				r.targetResolver.ResolveNow(resolver.ResolveNowOptions{})
 385  			}
 386  		}()
 387  	}
 388  	return err
 389  }
 390  
 391  // updateTargetResolverState is the StateListener function provided to the
 392  // target resolver via wrappingClientConn. It updates the resolver state and
 393  // marks the target resolver as ready. If the update includes at least one TCP
 394  // address and the proxy resolver has not yet been constructed, it initializes
 395  // the proxy resolver. A combined state update is triggered once both resolvers
 396  // are ready. If all addresses are non-TCP, it proceeds without waiting for the
 397  // proxy resolver. If ClientConn.UpdateState returns a non-nil error,
 398  // ResolveNow() is called on the proxy resolver.
 399  func (r *delegatingResolver) updateTargetResolverState(state resolver.State) error {
 400  	r.mu.Lock()
 401  	defer r.mu.Unlock()
 402  
 403  	if logger.V(2) {
 404  		logger.Infof("Addresses received from target resolver: %v", state.Addresses)
 405  	}
 406  	r.targetResolverState = &state
 407  	// If all addresses returned by the target resolver have a non-TCP network
 408  	// type, or are listed in the `NO_PROXY` environment variable, do not wait
 409  	// for proxy update.
 410  	if !needsProxyResolver(r.targetResolverState) {
 411  		return r.cc.UpdateState(*r.targetResolverState)
 412  	}
 413  
 414  	// The proxy resolver may be rebuilt multiple times, specifically each time
 415  	// the target resolver sends an update, even if the target resolver is built
 416  	// successfully but building the proxy resolver fails.
 417  	if len(r.proxyAddrs) == 0 {
 418  		go func() {
 419  			r.childMu.Lock()
 420  			defer r.childMu.Unlock()
 421  			if _, ok := r.proxyResolver.(nopResolver); !ok {
 422  				return
 423  			}
 424  			proxyResolver, err := r.proxyURIResolver(resolver.BuildOptions{})
 425  			if err != nil {
 426  				r.cc.ReportError(fmt.Errorf("delegating_resolver: unable to build the proxy resolver: %v", err))
 427  				return
 428  			}
 429  			r.proxyResolver = proxyResolver
 430  		}()
 431  	}
 432  
 433  	err := r.updateClientConnStateLocked()
 434  	if err != nil {
 435  		go func() {
 436  			r.childMu.Lock()
 437  			defer r.childMu.Unlock()
 438  			if r.proxyResolver != nil {
 439  				r.proxyResolver.ResolveNow(resolver.ResolveNowOptions{})
 440  			}
 441  		}()
 442  	}
 443  	return nil
 444  }
 445  
 446  // wrappingClientConn serves as an intermediary between the parent ClientConn
 447  // and the child resolvers created here. It implements the resolver.ClientConn
 448  // interface and is passed in that capacity to the child resolvers.
 449  type wrappingClientConn struct {
 450  	// Callback to deliver resolver state updates
 451  	stateListener func(state resolver.State) error
 452  	parent        *delegatingResolver
 453  }
 454  
 455  // UpdateState receives resolver state updates and forwards them to the
 456  // appropriate listener function (either for the proxy or target resolver).
 457  func (wcc *wrappingClientConn) UpdateState(state resolver.State) error {
 458  	return wcc.stateListener(state)
 459  }
 460  
 461  // ReportError intercepts errors from the child resolvers and passes them to
 462  // ClientConn.
 463  func (wcc *wrappingClientConn) ReportError(err error) {
 464  	wcc.parent.cc.ReportError(err)
 465  }
 466  
 467  // NewAddress intercepts the new resolved address from the child resolvers and
 468  // passes them to ClientConn.
 469  func (wcc *wrappingClientConn) NewAddress(addrs []resolver.Address) {
 470  	wcc.UpdateState(resolver.State{Addresses: addrs})
 471  }
 472  
 473  // ParseServiceConfig parses the provided service config and returns an object
 474  // that provides the parsed config.
 475  func (wcc *wrappingClientConn) ParseServiceConfig(serviceConfigJSON string) *serviceconfig.ParseResult {
 476  	return wcc.parent.cc.ParseServiceConfig(serviceConfigJSON)
 477  }
 478