diff --git a/circuitbreaker.go b/circuitbreaker.go index 9299a07..3f239c3 100644 --- a/circuitbreaker.go +++ b/circuitbreaker.go @@ -151,12 +151,15 @@ func NewBreakerWithOptions(options *Options) *Breaker { options.WindowBuckets = DefaultWindowBuckets } + win := newWindow(options.WindowTime, options.WindowBuckets, options.Clock) + win.Run() + return &Breaker{ BackOff: options.BackOff, Clock: options.Clock, ShouldTrip: options.ShouldTrip, nextBackOff: options.BackOff.NextBackOff(), - counts: newWindow(options.WindowTime, options.WindowBuckets), + counts: win, } } diff --git a/window.go b/window.go index ab83187..2f7b519 100644 --- a/window.go +++ b/window.go @@ -1,8 +1,7 @@ package circuit import ( - "container/ring" - "sync" + "sync/atomic" "time" "github.com/facebookgo/clock" @@ -24,18 +23,26 @@ type bucket struct { // Reset resets the counts to 0 func (b *bucket) Reset() { - b.failure = 0 - b.success = 0 + atomic.StoreInt64(&b.failure, 0) + atomic.StoreInt64(&b.success, 0) } // Fail increments the failure count func (b *bucket) Fail() { - b.failure++ + atomic.AddInt64(&b.failure, 1) } // Sucecss increments the success count func (b *bucket) Success() { - b.success++ + atomic.AddInt64(&b.success, 1) +} + +func (b *bucket) Failures() int64 { + return atomic.LoadInt64(&b.failure) +} + +func (b *bucket) Successes() int64 { + return atomic.LoadInt64(&b.success) } // window maintains a ring of buckets and increments the failure and success @@ -43,75 +50,86 @@ func (b *bucket) Success() { // advance to the next bucket, reseting its counts. This allows the keeping of // rolling statistics on the counts. type window struct { - buckets *ring.Ring - bucketTime time.Duration - bucketLock sync.RWMutex - lastAccess time.Time + bucketIdx int64 + buckets []bucket clock clock.Clock + stop chan struct{} + bucketTime time.Duration + isRunning bool } // newWindow creates a new window. windowTime is the time covering the entire // window. windowBuckets is the number of buckets the window is divided into. // An example: a 10 second window with 10 buckets will have 10 buckets covering // 1 second each. -func newWindow(windowTime time.Duration, windowBuckets int) *window { - buckets := ring.New(windowBuckets) - for i := 0; i < buckets.Len(); i++ { - buckets.Value = &bucket{} - buckets = buckets.Next() +func newWindow(windowTime time.Duration, windowBuckets int, clock clock.Clock) *window { + return &window{ + buckets: make([]bucket, windowBuckets), + bucketTime: time.Duration(windowTime.Nanoseconds() / int64(windowBuckets)), + clock: clock, + stop: make(chan struct{}), } +} - clock := clock.New() +// Run starts the goroutine that increments the bucket index and sets up the +// next bucket. +func (w *window) Run() { + if w.isRunning { + return + } - bucketTime := time.Duration(windowTime.Nanoseconds() / int64(windowBuckets)) - return &window{ - buckets: buckets, - bucketTime: bucketTime, - clock: clock, - lastAccess: clock.Now(), + go func() { + ticker := w.clock.Ticker(w.bucketTime) + for { + select { + case <-ticker.C: + idx := atomic.LoadInt64(&w.bucketIdx) + idx = (idx + 1) % int64(len(w.buckets)) + w.buckets[idx].Reset() + atomic.StoreInt64(&w.bucketIdx, idx) + case <-w.stop: + return + } + } + }() +} + +// Stop stops the index incrementing goroutine. +func (w *window) Stop() { + if !w.isRunning { + return } + + w.stop <- struct{}{} } // Fail records a failure in the current bucket. func (w *window) Fail() { - w.bucketLock.Lock() - b := w.getLatestBucket() - b.Fail() - w.bucketLock.Unlock() + idx := atomic.LoadInt64(&w.bucketIdx) + w.buckets[idx].Fail() } // Success records a success in the current bucket. func (w *window) Success() { - w.bucketLock.Lock() - b := w.getLatestBucket() - b.Success() - w.bucketLock.Unlock() + idx := atomic.LoadInt64(&w.bucketIdx) + w.buckets[idx].Success() } // Failures returns the total number of failures recorded in all buckets. func (w *window) Failures() int64 { - w.bucketLock.RLock() - var failures int64 - w.buckets.Do(func(x interface{}) { - b := x.(*bucket) - failures += b.failure - }) - - w.bucketLock.RUnlock() + for i := 0; i < len(w.buckets); i++ { + failures += w.buckets[i].Failures() + } return failures } // Successes returns the total number of successes recorded in all buckets. func (w *window) Successes() int64 { - w.bucketLock.RLock() - var successes int64 - w.buckets.Do(func(x interface{}) { - b := x.(*bucket) - successes += b.success - }) - w.bucketLock.RUnlock() + for i := 0; i < len(w.buckets); i++ { + successes += w.buckets[i].Successes() + } return successes } @@ -121,13 +139,11 @@ func (w *window) ErrorRate() float64 { var total int64 var failures int64 - w.bucketLock.RLock() - w.buckets.Do(func(x interface{}) { - b := x.(*bucket) - total += b.failure + b.success - failures += b.failure - }) - w.bucketLock.RUnlock() + for i := 0; i < len(w.buckets); i++ { + b := &w.buckets[i] + total += b.Failures() + b.Successes() + failures += b.Failures() + } if total == 0 { return 0.0 @@ -138,37 +154,7 @@ func (w *window) ErrorRate() float64 { // Reset resets the count of all buckets. func (w *window) Reset() { - w.bucketLock.Lock() - - w.buckets.Do(func(x interface{}) { - x.(*bucket).Reset() - }) - w.bucketLock.Unlock() -} - -// getLatestBucket returns the current bucket. If the bucket time has elapsed -// it will move to the next bucket, resetting its counts and updating the last -// access time before returning it. getLatestBucket assumes that the caller has -// locked the bucketLock -func (w *window) getLatestBucket() *bucket { - var b *bucket - b = w.buckets.Value.(*bucket) - elapsed := w.clock.Now().Sub(w.lastAccess) - - if elapsed > w.bucketTime { - // Reset the buckets between now and number of buckets ago. If - // that is more that the existing buckets, reset all. - for i := 0; i < w.buckets.Len(); i++ { - w.buckets = w.buckets.Next() - b = w.buckets.Value.(*bucket) - b.Reset() - elapsed = time.Duration(int64(elapsed) - int64(w.bucketTime)) - if elapsed < w.bucketTime { - // Done resetting buckets. - break - } - } - w.lastAccess = w.clock.Now() + for i := 0; i < len(w.buckets); i++ { + w.buckets[i].Reset() } - return b } diff --git a/window_test.go b/window_test.go index 7b60fbf..ef861cc 100644 --- a/window_test.go +++ b/window_test.go @@ -1,6 +1,7 @@ package circuit import ( + "runtime" "testing" "time" @@ -8,7 +9,7 @@ import ( ) func TestWindowCounts(t *testing.T) { - w := newWindow(time.Millisecond*10, 2) + w := newWindow(time.Millisecond*10, 2, clock.NewMock()) w.Fail() w.Fail() w.Success() @@ -38,35 +39,38 @@ func TestWindowCounts(t *testing.T) { func TestWindowSlides(t *testing.T) { c := clock.NewMock() - w := newWindow(time.Millisecond*10, 2) - w.clock = c - w.lastAccess = c.Now() + w := newWindow(time.Millisecond*10, 2, c) + w.Run() + runtime.Gosched() w.Fail() - c.Add(time.Millisecond * 6) + c.Add(time.Millisecond * 5) w.Fail() + w.Stop() counts := 0 - w.buckets.Do(func(x interface{}) { - b := x.(*bucket) + for _, b := range w.buckets { if b.failure > 0 { counts++ } - }) + } if counts != 2 { t.Fatalf("expected 2 buckets to have failures, got %d", counts) } + w.Run() + runtime.Gosched() c.Add(time.Millisecond * 15) w.Success() + w.Stop() + counts = 0 - w.buckets.Do(func(x interface{}) { - b := x.(*bucket) + for _, b := range w.buckets { if b.failure > 0 { counts++ } - }) + } if counts != 0 { t.Fatalf("expected 0 buckets to have failures, got %d", counts)