circuit_breaker.go raw

   1  // Copyright (c) 2016, 2018, 2025, Oracle and/or its affiliates.  All rights reserved.
   2  // This software is dual-licensed to you under the Universal Permissive License (UPL) 1.0 as shown at https://oss.oracle.com/licenses/upl or Apache License 2.0 as shown at http://www.apache.org/licenses/LICENSE-2.0. You may choose either license.
   3  
   4  package common
   5  
   6  import (
   7  	"fmt"
   8  	"math/rand"
   9  	"net/http"
  10  	"os"
  11  	"strconv"
  12  	"sync"
  13  	"time"
  14  
  15  	"github.com/sony/gobreaker"
  16  )
  17  
const (
	// CircuitBreakerDefaultFailureRateThreshold is the default failure-rate threshold (0.80, i.e. 80%).
	// When the failure rate observed within the closed-state window reaches this value,
	// the circuit breaker changes from closed to open.
	CircuitBreakerDefaultFailureRateThreshold float64 = 0.80
	// CircuitBreakerDefaultClosedWindow is the default value of closeStateWindow, the cyclic period of the closed state (120 seconds).
	CircuitBreakerDefaultClosedWindow time.Duration = 120 * time.Second
	// CircuitBreakerDefaultResetTimeout is the default value of openStateWindow, the time the breaker waits
	// in the open state before moving to the halfOpen state (30 seconds).
	CircuitBreakerDefaultResetTimeout time.Duration = 30 * time.Second
	// CircuitBreakerDefaultVolumeThreshold is the default value of minimumRequests in the closed state.
	CircuitBreakerDefaultVolumeThreshold uint32 = 10
	// DefaultCircuitBreakerName is the name given to the default circuit breaker.
	DefaultCircuitBreakerName string = "DefaultCircuitBreaker"
	// DefaultCircuitBreakerServiceName is the default service name of the circuit breaker (empty).
	DefaultCircuitBreakerServiceName string = ""
	// DefaultCircuitBreakerHistoryCount is the default number of failed responses kept in the circuit breaker history.
	DefaultCircuitBreakerHistoryCount int = 5
	// MinAuthClientCircuitBreakerResetTimeout is the lower bound, in seconds, of the randomized openStateWindow
	// chosen by DefaultAuthClientCircuitBreakerSetting.
	MinAuthClientCircuitBreakerResetTimeout = 30
	// MaxAuthClientCircuitBreakerResetTimeout is the inclusive upper bound, in seconds, of the randomized openStateWindow
	// chosen by DefaultAuthClientCircuitBreakerSetting.
	MaxAuthClientCircuitBreakerResetTimeout = 49
	// AuthClientCircuitBreakerName is the circuit breaker name used by DefaultAuthClientCircuitBreakerSetting.
	AuthClientCircuitBreakerName = "FederationClientCircuitBreaker"
	// AuthClientCircuitBreakerDefaultFailureThreshold is the failure-rate threshold used by DefaultAuthClientCircuitBreakerSetting.
	AuthClientCircuitBreakerDefaultFailureThreshold float64 = 0.65
	// AuthClientCircuitBreakerDefaultMinimumRequests is the minimumRequests value used by DefaultAuthClientCircuitBreakerSetting.
	AuthClientCircuitBreakerDefaultMinimumRequests uint32 = 3
)
  44  
// CircuitBreakerSetting wraps all exposed configurable params of circuit breaker.
// Its fields are unexported; they are set through the With* option functions.
type CircuitBreakerSetting struct {
	// name is the Circuit Breaker's identifier
	name string
	// isEnabled is the switch of the circuit breaker; NewCircuitBreaker returns nil when it is false
	isEnabled bool
	// closeStateWindow is the cyclic period of the closed state, the default value is 120 seconds
	closeStateWindow time.Duration
	// openStateWindow is the wait time before setting the breaker to halfOpen state from open state, the default value is 30 seconds
	openStateWindow time.Duration
	// failureRateThreshold is the failure rate, calculated over at most closeStateWindow, at which
	// the circuit will transition from closed to open, the default value is 80%
	failureRateThreshold float64
	// minimumRequests is the minimum number of counted requests in closed state before the breaker may trip,
	// the default value is 10 requests
	minimumRequests uint32
	// successStatCodeMap maps an HTTP StatusCode returned from the service to whether such a response is counted
	// as a success (true) or a failure (false) by the circuit breaker.
	// successStatCodeMap and successStatErrCodeMap are used together: this map is consulted first, so a status that
	// must be matched together with an ErrorCode should not be added here.
	// the default value marks 429, 500, 502, 503 and 504 as failures
	successStatCodeMap map[int]bool
	// successStatErrCodeMap maps a (StatusCode, ErrorCode) pair returned from the service to whether such a response
	// is counted as a success (true) or a failure (false) by the circuit breaker
	// the default value marks {409, "IncorrectState"} as a failure
	successStatErrCodeMap map[StatErrCode]bool
	// serviceName is the name of the service, set with the WithServiceName option.
	// the default value is empty string
	serviceName string
	// numberOfRecordedHistoryResponse is the number of failure responses stored in Circuit breaker history for debugging purpose
	// the default value is 5
	numberOfRecordedHistoryResponse int
}
  75  
  76  // String Converts CircuitBreakerSetting to human-readable string representation
  77  func (cbst CircuitBreakerSetting) String() string {
  78  	return fmt.Sprintf("{name=%v, isEnabled=%v, closeStateWindow=%v, openStateWindow=%v, failureRateThreshold=%v, minimumRequests=%v, successStatCodeMap=%v, successStatErrCodeMap=%v, serviceName=%v, historyCount=%v}",
  79  		cbst.name, cbst.isEnabled, cbst.closeStateWindow, cbst.openStateWindow, cbst.failureRateThreshold, cbst.minimumRequests, cbst.successStatCodeMap, cbst.successStatErrCodeMap, cbst.serviceName, cbst.numberOfRecordedHistoryResponse)
  80  }
  81  
  82  // ResponseHistory wraps the response params
  83  type ResponseHistory struct {
  84  	timestamp    time.Time
  85  	opcReqID     string
  86  	errorCode    string
  87  	errorMessage string
  88  	statusCode   int
  89  }
  90  
  91  // String Converts ResponseHistory to human-readable string representation
  92  func (rh ResponseHistory) String() string {
  93  	return fmt.Sprintf("Opc-Req-id - %v\nErrorCode - %v - %v\nErrorMessage - %v\n\n", rh.opcReqID, rh.statusCode, rh.errorCode, rh.errorMessage)
  94  }
  95  
  96  // AddToHistory processed the response and adds to response history queue
  97  func (ocb *OciCircuitBreaker) AddToHistory(resp *http.Response, err ServiceError) {
  98  	respHist := new(ResponseHistory)
  99  	respHist.opcReqID = err.GetOpcRequestID()
 100  	respHist.errorCode = err.GetCode()
 101  	respHist.errorMessage = err.GetMessage()
 102  	respHist.statusCode = err.GetHTTPStatusCode()
 103  	respHist.timestamp, _ = time.Parse(time.RFC1123, resp.Header.Get("Date"))
 104  	ocb.historyQueueMutex.Lock()
 105  	defer ocb.historyQueueMutex.Unlock()
 106  	ocb.historyQueue = append(ocb.historyQueue, *respHist)
 107  	// cleaning up older values
 108  	if len(ocb.historyQueue) > ocb.Cbst.numberOfRecordedHistoryResponse {
 109  		// We have reached the capacity. Clean up the oldest value
 110  		ocb.historyQueue = ocb.historyQueue[1:]
 111  	}
 112  	for index := len(ocb.historyQueue) - 1; index >= 0; index-- {
 113  		if time.Since(ocb.historyQueue[index].timestamp) > ocb.Cbst.closeStateWindow {
 114  			// This response is older than the circuit breaker closeStateWindow.
 115  			// Remove all the older responses from 0 to index
 116  			ocb.historyQueue = ocb.historyQueue[index+1:]
 117  			break
 118  		}
 119  	}
 120  	return
 121  }
 122  
 123  // GetHistory processes the rsponse in queue to construct a String
 124  func (ocb *OciCircuitBreaker) GetHistory() string {
 125  	getHistoryString := ""
 126  	ocb.historyQueueMutex.Lock()
 127  	defer ocb.historyQueueMutex.Unlock()
 128  	for _, value := range ocb.historyQueue {
 129  		getHistoryString += value.String()
 130  	}
 131  	return getHistoryString
 132  }
 133  
// OciCircuitBreaker wraps all exposed configurable params of circuit breaker and the 3P gobreaker CircuitBreaker,
// together with a queue of recently recorded failed responses.
type OciCircuitBreaker struct {
	// Cbst is the setting this breaker was created from.
	Cbst *CircuitBreakerSetting
	// Cb is the underlying 3P gobreaker circuit breaker.
	Cb *gobreaker.CircuitBreaker
	// historyQueue holds the recorded responses, oldest first; guarded by historyQueueMutex.
	historyQueue []ResponseHistory
	// historyQueueMutex serializes access to historyQueue.
	historyQueueMutex sync.Mutex
}
 141  
 142  // NewOciCircuitBreaker is used for initializing specified oci circuit breaker configuration with circuit breaker settings
 143  func NewOciCircuitBreaker(cbst *CircuitBreakerSetting, gbcb *gobreaker.CircuitBreaker) *OciCircuitBreaker {
 144  	ocb := new(OciCircuitBreaker)
 145  	ocb.Cbst = cbst
 146  	if ocb.Cbst.numberOfRecordedHistoryResponse == 0 {
 147  		fmt.Println("num hist empty")
 148  		ocb.Cbst.numberOfRecordedHistoryResponse = getDefaultNumHistoryCount()
 149  	}
 150  	ocb.Cb = gbcb
 151  	ocb.historyQueue = make([]ResponseHistory, 0, ocb.Cbst.numberOfRecordedHistoryResponse)
 152  
 153  	return ocb
 154  }
 155  
// CircuitBreakerOption is a function that modifies a CircuitBreakerSetting in place.
// It is the option type accepted by NewCircuitBreakerSettingWithOptions and returned by the With* helpers.
type CircuitBreakerOption func(cbst *CircuitBreakerSetting)
 158  
 159  // NewGoCircuitBreaker is a function to initialize a CircuitBreaker object with the specified configuration
 160  // Add the interface, to allow the user directly use the 3P gobreaker.Setting's params.
 161  func NewGoCircuitBreaker(st gobreaker.Settings) *gobreaker.CircuitBreaker {
 162  	return gobreaker.NewCircuitBreaker(st)
 163  }
 164  
 165  // DefaultCircuitBreakerSetting is used for set circuit breaker with default config
 166  func DefaultCircuitBreakerSetting() *CircuitBreakerSetting {
 167  	successStatErrCodeMap := map[StatErrCode]bool{
 168  		{409, "IncorrectState"}: false,
 169  	}
 170  	successStatCodeMap := map[int]bool{
 171  		429: false,
 172  		500: false,
 173  		502: false,
 174  		503: false,
 175  		504: false,
 176  	}
 177  	return newCircuitBreakerSetting(
 178  		WithName(DefaultCircuitBreakerName),
 179  		WithIsEnabled(true),
 180  		WithCloseStateWindow(CircuitBreakerDefaultClosedWindow),
 181  		WithOpenStateWindow(CircuitBreakerDefaultResetTimeout),
 182  		WithFailureRateThreshold(CircuitBreakerDefaultFailureRateThreshold),
 183  		WithMinimumRequests(CircuitBreakerDefaultVolumeThreshold),
 184  		WithSuccessStatErrCodeMap(successStatErrCodeMap),
 185  		WithSuccessStatCodeMap(successStatCodeMap),
 186  		WithHistoryCount(getDefaultNumHistoryCount()))
 187  }
 188  
 189  // DefaultCircuitBreakerSettingWithServiceName is used for set circuit breaker with default config
 190  func DefaultCircuitBreakerSettingWithServiceName(servicename string) *CircuitBreakerSetting {
 191  	successStatErrCodeMap := map[StatErrCode]bool{
 192  		{409, "IncorrectState"}: false,
 193  	}
 194  	successStatCodeMap := map[int]bool{
 195  		429: false,
 196  		500: false,
 197  		502: false,
 198  		503: false,
 199  		504: false,
 200  	}
 201  	return newCircuitBreakerSetting(
 202  		WithName(DefaultCircuitBreakerName),
 203  		WithIsEnabled(true),
 204  		WithCloseStateWindow(CircuitBreakerDefaultClosedWindow),
 205  		WithOpenStateWindow(CircuitBreakerDefaultResetTimeout),
 206  		WithFailureRateThreshold(CircuitBreakerDefaultFailureRateThreshold),
 207  		WithMinimumRequests(CircuitBreakerDefaultVolumeThreshold),
 208  		WithSuccessStatErrCodeMap(successStatErrCodeMap),
 209  		WithSuccessStatCodeMap(successStatCodeMap),
 210  		WithServiceName(servicename),
 211  		WithHistoryCount(getDefaultNumHistoryCount()))
 212  }
 213  
 214  // NoCircuitBreakerSetting is used for disable Circuit Breaker
 215  func NoCircuitBreakerSetting() *CircuitBreakerSetting {
 216  	return NewCircuitBreakerSettingWithOptions(WithIsEnabled(false))
 217  }
 218  
 219  // NewCircuitBreakerSettingWithOptions is a helper method to assemble a CircuitBreakerSetting object.
 220  // It starts out with the values returned by defaultCircuitBreakerSetting().
 221  func NewCircuitBreakerSettingWithOptions(opts ...CircuitBreakerOption) *CircuitBreakerSetting {
 222  	cbst := DefaultCircuitBreakerSettingWithServiceName(DefaultCircuitBreakerServiceName)
 223  	// allow changing values
 224  	for _, opt := range opts {
 225  		opt(cbst)
 226  	}
 227  	if defaultLogger != nil && defaultLogger.LogLevel() == verboseLogging {
 228  		Debugf("Circuit Breaker setting: %s\n", cbst.String())
 229  	}
 230  
 231  	return cbst
 232  }
 233  
 234  // NewCircuitBreaker is used for initialing specified circuit breaker configuration with base client
 235  func NewCircuitBreaker(cbst *CircuitBreakerSetting) *OciCircuitBreaker {
 236  	if !cbst.isEnabled {
 237  		return nil
 238  	}
 239  
 240  	st := gobreaker.Settings{}
 241  	customizeGoBreakerSetting(&st, cbst)
 242  	gbcb := gobreaker.NewCircuitBreaker(st)
 243  
 244  	return NewOciCircuitBreaker(cbst, gbcb)
 245  }
 246  
 247  func newCircuitBreakerSetting(opts ...CircuitBreakerOption) *CircuitBreakerSetting {
 248  	cbSetting := CircuitBreakerSetting{}
 249  
 250  	// allow changing values
 251  	for _, opt := range opts {
 252  		opt(&cbSetting)
 253  	}
 254  	return &cbSetting
 255  }
 256  
 257  // customizeGoBreakerSetting is used for converting CircuitBreakerSetting to 3P gobreaker's setting type
 258  func customizeGoBreakerSetting(st *gobreaker.Settings, cbst *CircuitBreakerSetting) {
 259  	st.Name = cbst.name
 260  	st.Timeout = cbst.openStateWindow
 261  	st.Interval = cbst.closeStateWindow
 262  	st.OnStateChange = func(name string, from gobreaker.State, to gobreaker.State) {
 263  		if to == gobreaker.StateOpen {
 264  			Debugf("Circuit Breaker %s is now in Open State\n", name)
 265  		}
 266  	}
 267  	st.ReadyToTrip = func(counts gobreaker.Counts) bool {
 268  		failureRatio := float64(counts.TotalFailures) / float64(counts.Requests)
 269  		return counts.Requests >= cbst.minimumRequests && failureRatio >= cbst.failureRateThreshold
 270  	}
 271  	st.IsSuccessful = func(err error) bool {
 272  		if serviceErr, ok := IsServiceError(err); ok {
 273  			if isSuccessful, ok := cbst.successStatCodeMap[serviceErr.GetHTTPStatusCode()]; ok {
 274  				return isSuccessful
 275  			}
 276  			if isSuccessful, ok := cbst.successStatErrCodeMap[StatErrCode{serviceErr.GetHTTPStatusCode(), serviceErr.GetCode()}]; ok {
 277  				return isSuccessful
 278  			}
 279  		}
 280  		return true
 281  	}
 282  }
 283  
 284  // WithName is the option for NewCircuitBreaker that sets the Name.
 285  func WithName(name string) CircuitBreakerOption {
 286  	// this is the CircuitBreakerOption function type
 287  	return func(cbst *CircuitBreakerSetting) {
 288  		cbst.name = name
 289  	}
 290  }
 291  
 292  // WithIsEnabled is the option for NewCircuitBreaker that sets the isEnabled.
 293  func WithIsEnabled(isEnabled bool) CircuitBreakerOption {
 294  	// this is the CircuitBreakerOption function type
 295  	return func(cbst *CircuitBreakerSetting) {
 296  		cbst.isEnabled = isEnabled
 297  	}
 298  }
 299  
 300  // WithCloseStateWindow is the option for NewCircuitBreaker that sets the closeStateWindow.
 301  func WithCloseStateWindow(window time.Duration) CircuitBreakerOption {
 302  	// this is the CircuitBreakerOption function type
 303  	return func(cbst *CircuitBreakerSetting) {
 304  		cbst.closeStateWindow = window
 305  	}
 306  }
 307  
 308  // WithOpenStateWindow is the option for NewCircuitBreaker that sets the openStateWindow.
 309  func WithOpenStateWindow(window time.Duration) CircuitBreakerOption {
 310  	// this is the CircuitBreakerOption function type
 311  	return func(cbst *CircuitBreakerSetting) {
 312  		cbst.openStateWindow = window
 313  	}
 314  }
 315  
 316  // WithFailureRateThreshold is the option for NewCircuitBreaker that sets the failureRateThreshold.
 317  func WithFailureRateThreshold(threshold float64) CircuitBreakerOption {
 318  	// this is the CircuitBreakerOption function type
 319  	return func(cbst *CircuitBreakerSetting) {
 320  		cbst.failureRateThreshold = threshold
 321  	}
 322  }
 323  
 324  // WithMinimumRequests is the option for NewCircuitBreaker that sets the minimumRequests.
 325  func WithMinimumRequests(num uint32) CircuitBreakerOption {
 326  	// this is the CircuitBreakerOption function type
 327  	return func(cbst *CircuitBreakerSetting) {
 328  		cbst.minimumRequests = num
 329  	}
 330  }
 331  
 332  // WithSuccessStatCodeMap is the option for NewCircuitBreaker that sets the successStatCodeMap.
 333  func WithSuccessStatCodeMap(successStatCodeMap map[int]bool) CircuitBreakerOption {
 334  	// this is the CircuitBreakerOption function type
 335  	return func(cbst *CircuitBreakerSetting) {
 336  		cbst.successStatCodeMap = successStatCodeMap
 337  	}
 338  }
 339  
 340  // WithSuccessStatErrCodeMap is the option for NewCircuitBreaker that sets the successStatErrCodeMap.
 341  func WithSuccessStatErrCodeMap(successStatErrCodeMap map[StatErrCode]bool) CircuitBreakerOption {
 342  	// this is the CircuitBreakerOption function type
 343  	return func(cbst *CircuitBreakerSetting) {
 344  		cbst.successStatErrCodeMap = successStatErrCodeMap
 345  	}
 346  }
 347  
 348  // WithServiceName is the option for NewCircuitBreaker that sets the ServiceName.
 349  func WithServiceName(serviceName string) CircuitBreakerOption {
 350  	// this is the CircuitBreakerOption function type
 351  	return func(cbst *CircuitBreakerSetting) {
 352  		cbst.serviceName = serviceName
 353  	}
 354  }
 355  
 356  // WithHistoryCount to set the number of failed responses
 357  func WithHistoryCount(count int) CircuitBreakerOption {
 358  	// this is the CircuitBreakerOption function type
 359  	return func(cbst *CircuitBreakerSetting) {
 360  		cbst.numberOfRecordedHistoryResponse = count
 361  	}
 362  }
 363  
 364  // getDefaultNumHistoryCount to set the number of failed responses
 365  func getDefaultNumHistoryCount() int {
 366  	if val, isSet := os.LookupEnv(circuitBreakerNumberOfHistoryResponseEnv); isSet {
 367  		count, err := strconv.Atoi(val)
 368  		if err == nil && count > 0 {
 369  			return count
 370  		}
 371  		Debugf("Invalid history count specified. Resetting to default value")
 372  	}
 373  	return DefaultCircuitBreakerHistoryCount
 374  }
 375  
 376  // GlobalCircuitBreakerSetting is global level circuit breaker setting, it would impact all services, the precedence is lower
 377  // than client level circuit breaker
 378  var GlobalCircuitBreakerSetting *CircuitBreakerSetting = nil
 379  
 380  // ConfigCircuitBreakerFromEnvVar is used for checking the circuit breaker environment variable setting, default value is nil
 381  func ConfigCircuitBreakerFromEnvVar(baseClient *BaseClient) {
 382  	if IsEnvVarTrue(isDefaultCircuitBreakerEnabled) {
 383  		baseClient.Configuration.CircuitBreaker = NewCircuitBreaker(DefaultCircuitBreakerSetting())
 384  		return
 385  	}
 386  	if IsEnvVarFalse(isDefaultCircuitBreakerEnabled) {
 387  		baseClient.Configuration.CircuitBreaker = nil
 388  	}
 389  }
 390  
 391  // ConfigCircuitBreakerFromGlobalVar is used for checking if global circuitBreakerSetting is configured, the priority is higher than cb env var
 392  func ConfigCircuitBreakerFromGlobalVar(baseClient *BaseClient) {
 393  	if GlobalCircuitBreakerSetting != nil {
 394  		baseClient.Configuration.CircuitBreaker = NewCircuitBreaker(GlobalCircuitBreakerSetting)
 395  	}
 396  }
 397  
 398  // DefaultAuthClientCircuitBreakerSetting returns the default circuit breaker setting for the Auth Client
 399  func DefaultAuthClientCircuitBreakerSetting() *CircuitBreakerSetting {
 400  	return NewCircuitBreakerSettingWithOptions(
 401  		WithOpenStateWindow(time.Duration(rand.Intn(MaxAuthClientCircuitBreakerResetTimeout+1-MinAuthClientCircuitBreakerResetTimeout)+MinAuthClientCircuitBreakerResetTimeout)*time.Second),
 402  		WithName(AuthClientCircuitBreakerName),
 403  		WithFailureRateThreshold(AuthClientCircuitBreakerDefaultFailureThreshold),
 404  		WithMinimumRequests(AuthClientCircuitBreakerDefaultMinimumRequests),
 405  	)
 406  }
 407  
 408  // GlobalAuthClientCircuitBreakerSetting is global level circuit breaker setting for the Auth Client
 409  // than client level circuit breaker
 410  var GlobalAuthClientCircuitBreakerSetting *CircuitBreakerSetting = nil
 411