1 // Package gobreaker implements the Circuit Breaker pattern.
2 // See https://msdn.microsoft.com/en-us/library/dn589784.aspx.
3 package gobreaker
4 5 import (
6 "errors"
7 "fmt"
8 "sync"
9 "time"
10 )
// State identifies which phase of the circuit-breaker state machine a
// CircuitBreaker is currently in.
type State int

// The three states of a CircuitBreaker. Their numeric values follow the
// declaration order, starting at zero.
const (
	// StateClosed is the state in which requests are let through.
	StateClosed State = iota
	// StateHalfOpen is the state in which only a limited number of requests
	// are let through.
	StateHalfOpen
	// StateOpen is the state in which requests are rejected.
	StateOpen
)
// Sentinel errors returned when the CircuitBreaker rejects a request
// without running it.
var (
	// ErrOpenState is returned when the circuit breaker is in the open state.
	ErrOpenState = errors.New("circuit breaker is open")
	// ErrTooManyRequests is returned when the circuit breaker is half-open and
	// the number of requests already admitted has reached its maxRequests limit.
	ErrTooManyRequests = errors.New("too many requests")
)
28 29 // String implements stringer interface.
30 func (s State) String() string {
31 switch s {
32 case StateClosed:
33 return "closed"
34 case StateHalfOpen:
35 return "half-open"
36 case StateOpen:
37 return "open"
38 default:
39 return fmt.Sprintf("unknown state: %d", s)
40 }
41 }
// Counts holds the request statistics of a CircuitBreaker: how many requests
// were admitted and how many of them succeeded or failed, both in total and
// as the current uninterrupted run.
// The CircuitBreaker resets its internal Counts whenever its state changes
// and, in the closed state, at every configured interval. Results of requests
// that were sent before a reset are not recorded.
type Counts struct {
	Requests             uint32
	TotalSuccesses       uint32
	TotalFailures        uint32
	ConsecutiveSuccesses uint32
	ConsecutiveFailures  uint32
}

// onRequest records that one more request has been admitted.
func (c *Counts) onRequest() {
	c.Requests += 1
}

// onSuccess records a successful request: it ends any run of failures and
// extends the run of successes.
func (c *Counts) onSuccess() {
	c.ConsecutiveFailures = 0
	c.ConsecutiveSuccesses++
	c.TotalSuccesses++
}

// onFailure records a failed request: it ends any run of successes and
// extends the run of failures.
func (c *Counts) onFailure() {
	c.ConsecutiveSuccesses = 0
	c.ConsecutiveFailures++
	c.TotalFailures++
}

// clear resets every counter to zero.
func (c *Counts) clear() {
	*c = Counts{}
}
78 79 // Settings configures CircuitBreaker:
80 //
81 // Name is the name of the CircuitBreaker.
82 //
83 // MaxRequests is the maximum number of requests allowed to pass through
84 // when the CircuitBreaker is half-open.
85 // If MaxRequests is 0, the CircuitBreaker allows only 1 request.
86 //
87 // Interval is the cyclic period of the closed state
88 // for the CircuitBreaker to clear the internal Counts.
89 // If Interval is less than or equal to 0, the CircuitBreaker doesn't clear internal Counts during the closed state.
90 //
91 // Timeout is the period of the open state,
92 // after which the state of the CircuitBreaker becomes half-open.
93 // If Timeout is less than or equal to 0, the timeout value of the CircuitBreaker is set to 60 seconds.
94 //
95 // ReadyToTrip is called with a copy of Counts whenever a request fails in the closed state.
96 // If ReadyToTrip returns true, the CircuitBreaker will be placed into the open state.
97 // If ReadyToTrip is nil, default ReadyToTrip is used.
98 // Default ReadyToTrip returns true when the number of consecutive failures is more than 5.
99 //
100 // OnStateChange is called whenever the state of the CircuitBreaker changes.
101 //
102 // IsSuccessful is called with the error returned from a request.
103 // If IsSuccessful returns true, the error is counted as a success.
104 // Otherwise the error is counted as a failure.
105 // If IsSuccessful is nil, default IsSuccessful is used, which returns false for all non-nil errors.
106 type Settings struct {
107 Name string
108 MaxRequests uint32
109 Interval time.Duration
110 Timeout time.Duration
111 ReadyToTrip func(counts Counts) bool
112 OnStateChange func(name string, from State, to State)
113 IsSuccessful func(err error) bool
114 }
115 116 // CircuitBreaker is a state machine to prevent sending requests that are likely to fail.
117 type CircuitBreaker struct {
118 name string
119 maxRequests uint32
120 interval time.Duration
121 timeout time.Duration
122 readyToTrip func(counts Counts) bool
123 isSuccessful func(err error) bool
124 onStateChange func(name string, from State, to State)
125 126 mutex sync.Mutex
127 state State
128 generation uint64
129 counts Counts
130 expiry time.Time
131 }
132 133 // TwoStepCircuitBreaker is like CircuitBreaker but instead of surrounding a function
134 // with the breaker functionality, it only checks whether a request can proceed and
135 // expects the caller to report the outcome in a separate step using a callback.
136 type TwoStepCircuitBreaker struct {
137 cb *CircuitBreaker
138 }
139 140 // NewCircuitBreaker returns a new CircuitBreaker configured with the given Settings.
141 func NewCircuitBreaker(st Settings) *CircuitBreaker {
142 cb := new(CircuitBreaker)
143 144 cb.name = st.Name
145 cb.onStateChange = st.OnStateChange
146 147 if st.MaxRequests == 0 {
148 cb.maxRequests = 1
149 } else {
150 cb.maxRequests = st.MaxRequests
151 }
152 153 if st.Interval <= 0 {
154 cb.interval = defaultInterval
155 } else {
156 cb.interval = st.Interval
157 }
158 159 if st.Timeout <= 0 {
160 cb.timeout = defaultTimeout
161 } else {
162 cb.timeout = st.Timeout
163 }
164 165 if st.ReadyToTrip == nil {
166 cb.readyToTrip = defaultReadyToTrip
167 } else {
168 cb.readyToTrip = st.ReadyToTrip
169 }
170 171 if st.IsSuccessful == nil {
172 cb.isSuccessful = defaultIsSuccessful
173 } else {
174 cb.isSuccessful = st.IsSuccessful
175 }
176 177 cb.toNewGeneration(time.Now())
178 179 return cb
180 }
181 182 // NewTwoStepCircuitBreaker returns a new TwoStepCircuitBreaker configured with the given Settings.
183 func NewTwoStepCircuitBreaker(st Settings) *TwoStepCircuitBreaker {
184 return &TwoStepCircuitBreaker{
185 cb: NewCircuitBreaker(st),
186 }
187 }
// Defaults applied by NewCircuitBreaker when the corresponding setting is
// less than or equal to zero.
const (
	// defaultInterval of zero means counts are never cleared periodically.
	defaultInterval time.Duration = 0
	// defaultTimeout is how long the breaker stays open by default.
	defaultTimeout = 60 * time.Second
)
191 192 func defaultReadyToTrip(counts Counts) bool {
193 return counts.ConsecutiveFailures > 5
194 }
// defaultIsSuccessful is the IsSuccessful used when Settings leaves it nil.
// It treats a request as successful only when it returned a nil error.
func defaultIsSuccessful(err error) bool {
	failed := err != nil
	return !failed
}
199 200 // Name returns the name of the CircuitBreaker.
201 func (cb *CircuitBreaker) Name() string {
202 return cb.name
203 }
204 205 // State returns the current state of the CircuitBreaker.
206 func (cb *CircuitBreaker) State() State {
207 cb.mutex.Lock()
208 defer cb.mutex.Unlock()
209 210 now := time.Now()
211 state, _ := cb.currentState(now)
212 return state
213 }
214 215 // Counts returns internal counters
216 func (cb *CircuitBreaker) Counts() Counts {
217 cb.mutex.Lock()
218 defer cb.mutex.Unlock()
219 220 return cb.counts
221 }
222 223 // Execute runs the given request if the CircuitBreaker accepts it.
224 // Execute returns an error instantly if the CircuitBreaker rejects the request.
225 // Otherwise, Execute returns the result of the request.
226 // If a panic occurs in the request, the CircuitBreaker handles it as an error
227 // and causes the same panic again.
228 func (cb *CircuitBreaker) Execute(req func() (interface{}, error)) (interface{}, error) {
229 generation, err := cb.beforeRequest()
230 if err != nil {
231 return nil, err
232 }
233 234 defer func() {
235 e := recover()
236 if e != nil {
237 cb.afterRequest(generation, false)
238 panic(e)
239 }
240 }()
241 242 result, err := req()
243 cb.afterRequest(generation, cb.isSuccessful(err))
244 return result, err
245 }
246 247 // Name returns the name of the TwoStepCircuitBreaker.
248 func (tscb *TwoStepCircuitBreaker) Name() string {
249 return tscb.cb.Name()
250 }
251 252 // State returns the current state of the TwoStepCircuitBreaker.
253 func (tscb *TwoStepCircuitBreaker) State() State {
254 return tscb.cb.State()
255 }
256 257 // Counts returns internal counters
258 func (tscb *TwoStepCircuitBreaker) Counts() Counts {
259 return tscb.cb.Counts()
260 }
261 262 // Allow checks if a new request can proceed. It returns a callback that should be used to
263 // register the success or failure in a separate step. If the circuit breaker doesn't allow
264 // requests, it returns an error.
265 func (tscb *TwoStepCircuitBreaker) Allow() (done func(success bool), err error) {
266 generation, err := tscb.cb.beforeRequest()
267 if err != nil {
268 return nil, err
269 }
270 271 return func(success bool) {
272 tscb.cb.afterRequest(generation, success)
273 }, nil
274 }
275 276 func (cb *CircuitBreaker) beforeRequest() (uint64, error) {
277 cb.mutex.Lock()
278 defer cb.mutex.Unlock()
279 280 now := time.Now()
281 state, generation := cb.currentState(now)
282 283 if state == StateOpen {
284 return generation, ErrOpenState
285 } else if state == StateHalfOpen && cb.counts.Requests >= cb.maxRequests {
286 return generation, ErrTooManyRequests
287 }
288 289 cb.counts.onRequest()
290 return generation, nil
291 }
292 293 func (cb *CircuitBreaker) afterRequest(before uint64, success bool) {
294 cb.mutex.Lock()
295 defer cb.mutex.Unlock()
296 297 now := time.Now()
298 state, generation := cb.currentState(now)
299 if generation != before {
300 return
301 }
302 303 if success {
304 cb.onSuccess(state, now)
305 } else {
306 cb.onFailure(state, now)
307 }
308 }
309 310 func (cb *CircuitBreaker) onSuccess(state State, now time.Time) {
311 switch state {
312 case StateClosed:
313 cb.counts.onSuccess()
314 case StateHalfOpen:
315 cb.counts.onSuccess()
316 if cb.counts.ConsecutiveSuccesses >= cb.maxRequests {
317 cb.setState(StateClosed, now)
318 }
319 }
320 }
321 322 func (cb *CircuitBreaker) onFailure(state State, now time.Time) {
323 switch state {
324 case StateClosed:
325 cb.counts.onFailure()
326 if cb.readyToTrip(cb.counts) {
327 cb.setState(StateOpen, now)
328 }
329 case StateHalfOpen:
330 cb.setState(StateOpen, now)
331 }
332 }
333 334 func (cb *CircuitBreaker) currentState(now time.Time) (State, uint64) {
335 switch cb.state {
336 case StateClosed:
337 if !cb.expiry.IsZero() && cb.expiry.Before(now) {
338 cb.toNewGeneration(now)
339 }
340 case StateOpen:
341 if cb.expiry.Before(now) {
342 cb.setState(StateHalfOpen, now)
343 }
344 }
345 return cb.state, cb.generation
346 }
347 348 func (cb *CircuitBreaker) setState(state State, now time.Time) {
349 if cb.state == state {
350 return
351 }
352 353 prev := cb.state
354 cb.state = state
355 356 cb.toNewGeneration(now)
357 358 if cb.onStateChange != nil {
359 cb.onStateChange(cb.name, prev, state)
360 }
361 }
362 363 func (cb *CircuitBreaker) toNewGeneration(now time.Time) {
364 cb.generation++
365 cb.counts.clear()
366 367 var zero time.Time
368 switch cb.state {
369 case StateClosed:
370 if cb.interval == 0 {
371 cb.expiry = zero
372 } else {
373 cb.expiry = now.Add(cb.interval)
374 }
375 case StateOpen:
376 cb.expiry = now.Add(cb.timeout)
377 default: // StateHalfOpen
378 cb.expiry = zero
379 }
380 }
381