service_config.go raw

   1  /*
   2   *
   3   * Copyright 2017 gRPC authors.
   4   *
   5   * Licensed under the Apache License, Version 2.0 (the "License");
   6   * you may not use this file except in compliance with the License.
   7   * You may obtain a copy of the License at
   8   *
   9   *     http://www.apache.org/licenses/LICENSE-2.0
  10   *
  11   * Unless required by applicable law or agreed to in writing, software
  12   * distributed under the License is distributed on an "AS IS" BASIS,
  13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14   * See the License for the specific language governing permissions and
  15   * limitations under the License.
  16   *
  17   */
  18  
  19  package grpc
  20  
  21  import (
  22  	"encoding/json"
  23  	"errors"
  24  	"fmt"
  25  	"reflect"
  26  	"time"
  27  
  28  	"google.golang.org/grpc/balancer"
  29  	"google.golang.org/grpc/balancer/pickfirst"
  30  	"google.golang.org/grpc/codes"
  31  	"google.golang.org/grpc/internal"
  32  	"google.golang.org/grpc/internal/balancer/gracefulswitch"
  33  	internalserviceconfig "google.golang.org/grpc/internal/serviceconfig"
  34  	"google.golang.org/grpc/serviceconfig"
  35  )
  36  
  37  const maxInt = int(^uint(0) >> 1)
  38  
  39  // MethodConfig defines the configuration recommended by the service providers for a
  40  // particular method.
  41  //
  42  // Deprecated: Users should not use this struct. Service config should be received
  43  // through name resolver, as specified here
  44  // https://github.com/grpc/grpc/blob/master/doc/service_config.md
  45  type MethodConfig = internalserviceconfig.MethodConfig
  46  
  47  // ServiceConfig is provided by the service provider and contains parameters for how
  48  // clients that connect to the service should behave.
  49  //
  50  // Deprecated: Users should not use this struct. Service config should be received
  51  // through name resolver, as specified here
  52  // https://github.com/grpc/grpc/blob/master/doc/service_config.md
  53  type ServiceConfig struct {
  54  	serviceconfig.Config
  55  
  56  	// lbConfig is the service config's load balancing configuration.  If
  57  	// lbConfig and LB are both present, lbConfig will be used.
  58  	lbConfig serviceconfig.LoadBalancingConfig
  59  
  60  	// Methods contains a map for the methods in this service.  If there is an
  61  	// exact match for a method (i.e. /service/method) in the map, use the
  62  	// corresponding MethodConfig.  If there's no exact match, look for the
  63  	// default config for the service (/service/) and use the corresponding
  64  	// MethodConfig if it exists.  Otherwise, the method has no MethodConfig to
  65  	// use.
  66  	Methods map[string]MethodConfig
  67  
  68  	// If a retryThrottlingPolicy is provided, gRPC will automatically throttle
  69  	// retry attempts and hedged RPCs when the client’s ratio of failures to
  70  	// successes exceeds a threshold.
  71  	//
  72  	// For each server name, the gRPC client will maintain a token_count which is
  73  	// initially set to maxTokens, and can take values between 0 and maxTokens.
  74  	//
  75  	// Every outgoing RPC (regardless of service or method invoked) will change
  76  	// token_count as follows:
  77  	//
  78  	//   - Every failed RPC will decrement the token_count by 1.
  79  	//   - Every successful RPC will increment the token_count by tokenRatio.
  80  	//
  81  	// If token_count is less than or equal to maxTokens / 2, then RPCs will not
  82  	// be retried and hedged RPCs will not be sent.
  83  	retryThrottling *retryThrottlingPolicy
  84  	// healthCheckConfig must be set as one of the requirement to enable LB channel
  85  	// health check.
  86  	healthCheckConfig *healthCheckConfig
  87  	// rawJSONString stores service config json string that get parsed into
  88  	// this service config struct.
  89  	rawJSONString string
  90  }
  91  
  92  // healthCheckConfig defines the go-native version of the LB channel health check config.
  93  type healthCheckConfig struct {
  94  	// serviceName is the service name to use in the health-checking request.
  95  	ServiceName string
  96  }
  97  
  98  type jsonRetryPolicy struct {
  99  	MaxAttempts          int
 100  	InitialBackoff       internalserviceconfig.Duration
 101  	MaxBackoff           internalserviceconfig.Duration
 102  	BackoffMultiplier    float64
 103  	RetryableStatusCodes []codes.Code
 104  }
 105  
 106  // retryThrottlingPolicy defines the go-native version of the retry throttling
 107  // policy defined by the service config here:
 108  // https://github.com/grpc/proposal/blob/master/A6-client-retries.md#integration-with-service-config
 109  type retryThrottlingPolicy struct {
 110  	// The number of tokens starts at maxTokens. The token_count will always be
 111  	// between 0 and maxTokens.
 112  	//
 113  	// This field is required and must be greater than zero.
 114  	MaxTokens float64
 115  	// The amount of tokens to add on each successful RPC. Typically this will
 116  	// be some number between 0 and 1, e.g., 0.1.
 117  	//
 118  	// This field is required and must be greater than zero. Up to 3 decimal
 119  	// places are supported.
 120  	TokenRatio float64
 121  }
 122  
 123  type jsonName struct {
 124  	Service string
 125  	Method  string
 126  }
 127  
 128  var (
 129  	errDuplicatedName             = errors.New("duplicated name")
 130  	errEmptyServiceNonEmptyMethod = errors.New("cannot combine empty 'service' and non-empty 'method'")
 131  )
 132  
 133  func (j jsonName) generatePath() (string, error) {
 134  	if j.Service == "" {
 135  		if j.Method != "" {
 136  			return "", errEmptyServiceNonEmptyMethod
 137  		}
 138  		return "", nil
 139  	}
 140  	res := "/" + j.Service + "/"
 141  	if j.Method != "" {
 142  		res += j.Method
 143  	}
 144  	return res, nil
 145  }
 146  
 147  // TODO(lyuxuan): delete this struct after cleaning up old service config implementation.
 148  type jsonMC struct {
 149  	Name                    *[]jsonName
 150  	WaitForReady            *bool
 151  	Timeout                 *internalserviceconfig.Duration
 152  	MaxRequestMessageBytes  *int64
 153  	MaxResponseMessageBytes *int64
 154  	RetryPolicy             *jsonRetryPolicy
 155  }
 156  
 157  // TODO(lyuxuan): delete this struct after cleaning up old service config implementation.
 158  type jsonSC struct {
 159  	LoadBalancingPolicy *string
 160  	LoadBalancingConfig *json.RawMessage
 161  	MethodConfig        *[]jsonMC
 162  	RetryThrottling     *retryThrottlingPolicy
 163  	HealthCheckConfig   *healthCheckConfig
 164  }
 165  
 166  func init() {
 167  	internal.ParseServiceConfig = func(js string) *serviceconfig.ParseResult {
 168  		return parseServiceConfig(js, defaultMaxCallAttempts)
 169  	}
 170  }
 171  
 172  func parseServiceConfig(js string, maxAttempts int) *serviceconfig.ParseResult {
 173  	if len(js) == 0 {
 174  		return &serviceconfig.ParseResult{Err: fmt.Errorf("no JSON service config provided")}
 175  	}
 176  	var rsc jsonSC
 177  	err := json.Unmarshal([]byte(js), &rsc)
 178  	if err != nil {
 179  		logger.Warningf("grpc: unmarshalling service config %s: %v", js, err)
 180  		return &serviceconfig.ParseResult{Err: err}
 181  	}
 182  	sc := ServiceConfig{
 183  		Methods:           make(map[string]MethodConfig),
 184  		retryThrottling:   rsc.RetryThrottling,
 185  		healthCheckConfig: rsc.HealthCheckConfig,
 186  		rawJSONString:     js,
 187  	}
 188  	c := rsc.LoadBalancingConfig
 189  	if c == nil {
 190  		name := pickfirst.Name
 191  		if rsc.LoadBalancingPolicy != nil {
 192  			name = *rsc.LoadBalancingPolicy
 193  		}
 194  		if balancer.Get(name) == nil {
 195  			name = pickfirst.Name
 196  		}
 197  		cfg := []map[string]any{{name: struct{}{}}}
 198  		strCfg, err := json.Marshal(cfg)
 199  		if err != nil {
 200  			return &serviceconfig.ParseResult{Err: fmt.Errorf("unexpected error marshaling simple LB config: %w", err)}
 201  		}
 202  		r := json.RawMessage(strCfg)
 203  		c = &r
 204  	}
 205  	cfg, err := gracefulswitch.ParseConfig(*c)
 206  	if err != nil {
 207  		return &serviceconfig.ParseResult{Err: err}
 208  	}
 209  	sc.lbConfig = cfg
 210  
 211  	if rsc.MethodConfig == nil {
 212  		return &serviceconfig.ParseResult{Config: &sc}
 213  	}
 214  
 215  	paths := map[string]struct{}{}
 216  	for _, m := range *rsc.MethodConfig {
 217  		if m.Name == nil {
 218  			continue
 219  		}
 220  
 221  		mc := MethodConfig{
 222  			WaitForReady: m.WaitForReady,
 223  			Timeout:      (*time.Duration)(m.Timeout),
 224  		}
 225  		if mc.RetryPolicy, err = convertRetryPolicy(m.RetryPolicy, maxAttempts); err != nil {
 226  			logger.Warningf("grpc: unmarshalling service config %s: %v", js, err)
 227  			return &serviceconfig.ParseResult{Err: err}
 228  		}
 229  		if m.MaxRequestMessageBytes != nil {
 230  			if *m.MaxRequestMessageBytes > int64(maxInt) {
 231  				mc.MaxReqSize = newInt(maxInt)
 232  			} else {
 233  				mc.MaxReqSize = newInt(int(*m.MaxRequestMessageBytes))
 234  			}
 235  		}
 236  		if m.MaxResponseMessageBytes != nil {
 237  			if *m.MaxResponseMessageBytes > int64(maxInt) {
 238  				mc.MaxRespSize = newInt(maxInt)
 239  			} else {
 240  				mc.MaxRespSize = newInt(int(*m.MaxResponseMessageBytes))
 241  			}
 242  		}
 243  		for i, n := range *m.Name {
 244  			path, err := n.generatePath()
 245  			if err != nil {
 246  				logger.Warningf("grpc: error unmarshalling service config %s due to methodConfig[%d]: %v", js, i, err)
 247  				return &serviceconfig.ParseResult{Err: err}
 248  			}
 249  
 250  			if _, ok := paths[path]; ok {
 251  				err = errDuplicatedName
 252  				logger.Warningf("grpc: error unmarshalling service config %s due to methodConfig[%d]: %v", js, i, err)
 253  				return &serviceconfig.ParseResult{Err: err}
 254  			}
 255  			paths[path] = struct{}{}
 256  			sc.Methods[path] = mc
 257  		}
 258  	}
 259  
 260  	if sc.retryThrottling != nil {
 261  		if mt := sc.retryThrottling.MaxTokens; mt <= 0 || mt > 1000 {
 262  			return &serviceconfig.ParseResult{Err: fmt.Errorf("invalid retry throttling config: maxTokens (%v) out of range (0, 1000]", mt)}
 263  		}
 264  		if tr := sc.retryThrottling.TokenRatio; tr <= 0 {
 265  			return &serviceconfig.ParseResult{Err: fmt.Errorf("invalid retry throttling config: tokenRatio (%v) may not be negative", tr)}
 266  		}
 267  	}
 268  	return &serviceconfig.ParseResult{Config: &sc}
 269  }
 270  
 271  func isValidRetryPolicy(jrp *jsonRetryPolicy) bool {
 272  	return jrp.MaxAttempts > 1 &&
 273  		jrp.InitialBackoff > 0 &&
 274  		jrp.MaxBackoff > 0 &&
 275  		jrp.BackoffMultiplier > 0 &&
 276  		len(jrp.RetryableStatusCodes) > 0
 277  }
 278  
 279  func convertRetryPolicy(jrp *jsonRetryPolicy, maxAttempts int) (p *internalserviceconfig.RetryPolicy, err error) {
 280  	if jrp == nil {
 281  		return nil, nil
 282  	}
 283  
 284  	if !isValidRetryPolicy(jrp) {
 285  		return nil, fmt.Errorf("invalid retry policy (%+v): ", jrp)
 286  	}
 287  
 288  	if jrp.MaxAttempts < maxAttempts {
 289  		maxAttempts = jrp.MaxAttempts
 290  	}
 291  	rp := &internalserviceconfig.RetryPolicy{
 292  		MaxAttempts:          maxAttempts,
 293  		InitialBackoff:       time.Duration(jrp.InitialBackoff),
 294  		MaxBackoff:           time.Duration(jrp.MaxBackoff),
 295  		BackoffMultiplier:    jrp.BackoffMultiplier,
 296  		RetryableStatusCodes: make(map[codes.Code]bool),
 297  	}
 298  	for _, code := range jrp.RetryableStatusCodes {
 299  		rp.RetryableStatusCodes[code] = true
 300  	}
 301  	return rp, nil
 302  }
 303  
 304  func minPointers(a, b *int) *int {
 305  	if *a < *b {
 306  		return a
 307  	}
 308  	return b
 309  }
 310  
 311  func getMaxSize(mcMax, doptMax *int, defaultVal int) *int {
 312  	if mcMax == nil && doptMax == nil {
 313  		return &defaultVal
 314  	}
 315  	if mcMax != nil && doptMax != nil {
 316  		return minPointers(mcMax, doptMax)
 317  	}
 318  	if mcMax != nil {
 319  		return mcMax
 320  	}
 321  	return doptMax
 322  }
 323  
 324  func newInt(b int) *int {
 325  	return &b
 326  }
 327  
 328  func init() {
 329  	internal.EqualServiceConfigForTesting = equalServiceConfig
 330  }
 331  
 332  // equalServiceConfig compares two configs. The rawJSONString field is ignored,
 333  // because they may diff in white spaces.
 334  //
 335  // If any of them is NOT *ServiceConfig, return false.
 336  func equalServiceConfig(a, b serviceconfig.Config) bool {
 337  	if a == nil && b == nil {
 338  		return true
 339  	}
 340  	aa, ok := a.(*ServiceConfig)
 341  	if !ok {
 342  		return false
 343  	}
 344  	bb, ok := b.(*ServiceConfig)
 345  	if !ok {
 346  		return false
 347  	}
 348  	aaRaw := aa.rawJSONString
 349  	aa.rawJSONString = ""
 350  	bbRaw := bb.rawJSONString
 351  	bb.rawJSONString = ""
 352  	defer func() {
 353  		aa.rawJSONString = aaRaw
 354  		bb.rawJSONString = bbRaw
 355  	}()
 356  	// Using reflect.DeepEqual instead of cmp.Equal because many balancer
 357  	// configs are unexported, and cmp.Equal cannot compare unexported fields
 358  	// from unexported structs.
 359  	return reflect.DeepEqual(aa, bb)
 360  }
 361