diff --git a/circuitbreaker.go b/circuitbreaker.go index 620cd94..9716871 100644 --- a/circuitbreaker.go +++ b/circuitbreaker.go @@ -31,6 +31,7 @@ package circuit import ( + "container/ring" "context" "errors" "sync" @@ -75,6 +76,7 @@ const ( var ( defaultInitialBackOffInterval = 500 * time.Millisecond defaultBackoffMaxElapsedTime = 0 * time.Second + defaultErrorHistoryDepth = 10 ) // Error codes returned by Call @@ -115,15 +117,19 @@ type Breaker struct { eventReceivers []chan BreakerEvent listeners []chan ListenerEvent backoffLock sync.Mutex + + //ring buffer for last N errors + errorsBuffer *ring.Ring } // Options holds breaker configuration options. type Options struct { - BackOff backoff.BackOff - Clock clock.Clock - ShouldTrip TripFunc - WindowTime time.Duration - WindowBuckets int + BackOff backoff.BackOff + Clock clock.Clock + ShouldTrip TripFunc + WindowTime time.Duration + WindowBuckets int + ErrorHistoryDepth int } // NewBreakerWithOptions creates a base breaker with a specified backoff, clock and TripFunc @@ -153,12 +159,17 @@ func NewBreakerWithOptions(options *Options) *Breaker { options.WindowBuckets = DefaultWindowBuckets } + if options.ErrorHistoryDepth == 0 { + options.ErrorHistoryDepth = defaultErrorHistoryDepth + } + return &Breaker{ - BackOff: options.BackOff, - Clock: options.Clock, - ShouldTrip: options.ShouldTrip, - nextBackOff: options.BackOff.NextBackOff(), - counts: newWindow(options.WindowTime, options.WindowBuckets), + BackOff: options.BackOff, + Clock: options.Clock, + ShouldTrip: options.ShouldTrip, + nextBackOff: options.BackOff.NextBackOff(), + counts: newWindow(options.WindowTime, options.WindowBuckets), + errorsBuffer: ring.New(options.ErrorHistoryDepth), } } @@ -293,6 +304,31 @@ func (cb *Breaker) Fail() { } } +// FailWithError is the same as Fail, but keeps history of errors in internal ring buffer +func (cb *Breaker) FailWithError(err error) { + cb.errorsBuffer = cb.errorsBuffer.Next() + cb.errorsBuffer.Value = err + cb.Fail() +} + +// Error returns last error from internal buffer +func (cb *Breaker) Error() error { + if cb.errorsBuffer.Value == nil { + return nil + } + return cb.errorsBuffer.Value.(error) +} + +// Errors returns all errors from internal buffer +func (cb *Breaker) Errors() (errors []error) { + cb.errorsBuffer.Do(func(x interface{}) { + if x != nil { + errors = append(errors, x.(error)) + } + }) + return errors +} + // Success is used to indicate a success condition the Breaker should record. If // the success was triggered by a retry attempt, the breaker will be Reset(). func (cb *Breaker) Success() { @@ -302,7 +338,7 @@ func (cb *Breaker) Success() { cb.backoffLock.Unlock() state := cb.state() - if state == halfopen { + if state == halfopen || state == open { cb.Reset() } atomic.StoreInt64(&cb.consecFailures, 0) @@ -362,7 +398,7 @@ func (cb *Breaker) CallContext(ctx context.Context, circuit func() error, timeou if err != nil { if ctx.Err() != context.Canceled { - cb.Fail() + cb.FailWithError(err) } return err } diff --git a/circuitbreaker_test.go b/circuitbreaker_test.go index 4bf9305..40700fe 100644 --- a/circuitbreaker_test.go +++ b/circuitbreaker_test.go @@ -3,7 +3,7 @@ package circuit import ( "context" "fmt" - "sync/atomic" + "runtime" "testing" "time" @@ -253,6 +253,48 @@ func TestThresholdBreakerCalling(t *testing.T) { } } +func TestThresholdBreakerErrorHistory(t *testing.T) { + cb := NewThresholdBreaker(2) + err := fmt.Errorf("error 1") + cb.FailWithError(err) + if cb.Error() != err { + t.Fatal("expected last error to be `error 1`") + } + + cb = NewThresholdBreaker(1) + err = cb.Call(func() error { + return fmt.Errorf("circuit error") + }, 0) + if err == nil { + t.Fatal("expected threshold breaker to error") + } + if !cb.Tripped() { + t.Fatal("expected threshold breaker to be open") + } + if cb.Error().Error() != "circuit error" { + t.Fatalf("expected last error to be `circut error`, got %s", cb.Error()) + } + + cb.Success() + cb.Call(func() error { + return fmt.Errorf("circuit error 1") + }, 0) + if cb.Error().Error() != "circuit error 1" { + t.Fatalf("expected last error to be `circut error 1`, got %s", cb.Error()) + } + + errs := cb.Errors() + if len(errs) != 2 { + t.Fatalf("expected `%d` errors, got %d", 2, len(errs)) + } + if errs[0].Error() != "circuit error 1" { + t.Fatalf("expected `%s` error, got %s", "circuit error 1", errs[0].Error()) + } + if errs[1].Error() != "circuit error" { + t.Fatalf("expected `%s` error, got %s", "circuit error", errs[0].Error()) + } +} + func TestThresholdBreakerCallingContext(t *testing.T) { circuit := func() error { return fmt.Errorf("error") @@ -323,15 +365,10 @@ func TestThresholdBreakerResets(t *testing.T) { } func TestTimeoutBreaker(t *testing.T) { - wait := make(chan struct{}) - c := clock.NewMock() - called := int32(0) circuit := func() error { - wait <- struct{}{} - atomic.AddInt32(&called, 1) - <-wait + time.Sleep(100000000 * time.Millisecond) return nil } @@ -339,21 +376,29 @@ func TestTimeoutBreaker(t *testing.T) { cb.Clock = c errc := make(chan error) - go func() { errc <- cb.Call(circuit, time.Millisecond) }() - + wait := make(chan struct{}) + go func() { wait <- struct{}{}; errc <- cb.Call(circuit, time.Millisecond) }() <-wait - c.Add(time.Millisecond * 3) - wait <- struct{}{} + // yield and advance the clock + runtime.Gosched() + c.Add(time.Millisecond * 1000) err := <-errc - if err == nil { - t.Fatal("expected timeout breaker to return an error") + if err != ErrBreakerTimeout { + t.Fatalf("expected timeout breaker to return an error `%s`, got %s", ErrBreakerTimeout, err) } - go cb.Call(circuit, time.Millisecond) + cb.Clock = clock.NewMock() + go func() { wait <- struct{}{}; errc <- cb.Call(circuit, time.Millisecond) }() <-wait + // yield and advance the clock + runtime.Gosched() c.Add(time.Millisecond * 3) - wait <- struct{}{} + + err = <-errc + if err != ErrBreakerOpen { + t.Fatalf("expected timeout breaker to return an error `%s`, got %s", ErrBreakerOpen, err) + } if !cb.Tripped() { t.Fatal("expected timeout breaker to be open")