Skip to content

Commit

Permalink
remember last error and keep internal buffer of last N errors. fixed …
Browse files Browse the repository at this point in the history
…unreliable timeout test
  • Loading branch information
Kane committed Apr 11, 2017
1 parent 2074adb commit 90479fa
Show file tree
Hide file tree
Showing 2 changed files with 108 additions and 27 deletions.
60 changes: 48 additions & 12 deletions circuitbreaker.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
package circuit

import (
"container/ring"
"context"
"errors"
"sync"
Expand Down Expand Up @@ -75,6 +76,7 @@ const (
var (
defaultInitialBackOffInterval = 500 * time.Millisecond
defaultBackoffMaxElapsedTime = 0 * time.Second
defaultErrorHistoryDepth = 10
)

// Error codes returned by Call
Expand Down Expand Up @@ -115,15 +117,19 @@ type Breaker struct {
eventReceivers []chan BreakerEvent
listeners []chan ListenerEvent
backoffLock sync.Mutex

//ring buffer for last N errors
errorsBuffer *ring.Ring
}

// Options holds breaker configuration options.
type Options struct {
BackOff backoff.BackOff
Clock clock.Clock
ShouldTrip TripFunc
WindowTime time.Duration
WindowBuckets int
BackOff backoff.BackOff
Clock clock.Clock
ShouldTrip TripFunc
WindowTime time.Duration
WindowBuckets int
ErrorHistoryDepth int
}

// NewBreakerWithOptions creates a base breaker with a specified backoff, clock and TripFunc
Expand Down Expand Up @@ -153,12 +159,17 @@ func NewBreakerWithOptions(options *Options) *Breaker {
options.WindowBuckets = DefaultWindowBuckets
}

if options.ErrorHistoryDepth == 0 {
options.ErrorHistoryDepth = defaultErrorHistoryDepth
}

return &Breaker{
BackOff: options.BackOff,
Clock: options.Clock,
ShouldTrip: options.ShouldTrip,
nextBackOff: options.BackOff.NextBackOff(),
counts: newWindow(options.WindowTime, options.WindowBuckets),
BackOff: options.BackOff,
Clock: options.Clock,
ShouldTrip: options.ShouldTrip,
nextBackOff: options.BackOff.NextBackOff(),
counts: newWindow(options.WindowTime, options.WindowBuckets),
errorsBuffer: ring.New(options.ErrorHistoryDepth),
}
}

Expand Down Expand Up @@ -293,6 +304,31 @@ func (cb *Breaker) Fail() {
}
}

// FailWithError is the same as Fail, but keeps history of errors in internal ring buffer
func (cb *Breaker) FailWithError(err error) {
cb.errorsBuffer = cb.errorsBuffer.Next()
cb.errorsBuffer.Value = err
cb.Fail()
}

// Error returns last error from internal buffer
func (cb *Breaker) Error() error {
if cb.errorsBuffer.Value == nil {
return nil
}
return cb.errorsBuffer.Value.(error)
}

// Errors returns all errors from internal buffer
func (cb *Breaker) Errors() (errors []error) {
cb.errorsBuffer.Do(func(x interface{}) {
if x != nil {
errors = append(errors, x.(error))
}
})
return errors
}

// Success is used to indicate a success condition the Breaker should record. If
// the success was triggered by a retry attempt, the breaker will be Reset().
func (cb *Breaker) Success() {
Expand All @@ -302,7 +338,7 @@ func (cb *Breaker) Success() {
cb.backoffLock.Unlock()

state := cb.state()
if state == halfopen {
if state == halfopen || state == open {
cb.Reset()
}
atomic.StoreInt64(&cb.consecFailures, 0)
Expand Down Expand Up @@ -362,7 +398,7 @@ func (cb *Breaker) CallContext(ctx context.Context, circuit func() error, timeou

if err != nil {
if ctx.Err() != context.Canceled {
cb.Fail()
cb.FailWithError(err)
}
return err
}
Expand Down
75 changes: 60 additions & 15 deletions circuitbreaker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package circuit
import (
"context"
"fmt"
"sync/atomic"
"runtime"
"testing"
"time"

Expand Down Expand Up @@ -253,6 +253,48 @@ func TestThresholdBreakerCalling(t *testing.T) {
}
}

func TestThresholdBreakerErrorHistory(t *testing.T) {
cb := NewThresholdBreaker(2)
err := fmt.Errorf("error 1")
cb.FailWithError(err)
if cb.Error() != err {
t.Fatal("expected last error to be `error 1`")
}

cb = NewThresholdBreaker(1)
err = cb.Call(func() error {
return fmt.Errorf("circuit error")
}, 0)
if err == nil {
t.Fatal("expected threshold breaker to error")
}
if !cb.Tripped() {
t.Fatal("expected threshold breaker to be open")
}
if cb.Error().Error() != "circuit error" {
t.Fatalf("expected last error to be `circut error`, got %s", cb.Error())
}

cb.Success()
cb.Call(func() error {
return fmt.Errorf("circuit error 1")
}, 0)
if cb.Error().Error() != "circuit error 1" {
t.Fatalf("expected last error to be `circut error 1`, got %s", cb.Error())
}

errs := cb.Errors()
if len(errs) != 2 {
t.Fatalf("expected `%d` errors, got %d", 2, len(errs))
}
if errs[0].Error() != "circuit error 1" {
t.Fatalf("expected `%s` error, got %s", "circuit error 1", errs[0].Error())
}
if errs[1].Error() != "circuit error" {
t.Fatalf("expected `%s` error, got %s", "circuit error", errs[0].Error())
}
}

func TestThresholdBreakerCallingContext(t *testing.T) {
circuit := func() error {
return fmt.Errorf("error")
Expand Down Expand Up @@ -323,37 +365,40 @@ func TestThresholdBreakerResets(t *testing.T) {
}

func TestTimeoutBreaker(t *testing.T) {
wait := make(chan struct{})

c := clock.NewMock()
called := int32(0)

circuit := func() error {
wait <- struct{}{}
atomic.AddInt32(&called, 1)
<-wait
time.Sleep(100000000 * time.Millisecond)
return nil
}

cb := NewThresholdBreaker(1)
cb.Clock = c

errc := make(chan error)
go func() { errc <- cb.Call(circuit, time.Millisecond) }()

wait := make(chan struct{})
go func() { wait <- struct{}{}; errc <- cb.Call(circuit, time.Millisecond) }()
<-wait
c.Add(time.Millisecond * 3)
wait <- struct{}{}
// yield and advance the clock
runtime.Gosched()
c.Add(time.Millisecond * 1000)

err := <-errc
if err == nil {
t.Fatal("expected timeout breaker to return an error")
if err != ErrBreakerTimeout {
t.Fatalf("expected timeout breaker to return an error `%s`, got %s", ErrBreakerTimeout, err)
}

go cb.Call(circuit, time.Millisecond)
cb.Clock = clock.NewMock()
go func() { wait <- struct{}{}; errc <- cb.Call(circuit, time.Millisecond) }()
<-wait
// yield and advance the clock
runtime.Gosched()
c.Add(time.Millisecond * 3)
wait <- struct{}{}

err = <-errc
if err != ErrBreakerOpen {
t.Fatalf("expected timeout breaker to return an error `%s`, got %s", ErrBreakerOpen, err)
}

if !cb.Tripped() {
t.Fatal("expected timeout breaker to be open")
Expand Down

0 comments on commit 90479fa

Please sign in to comment.