Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Lockless window #39

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion circuitbreaker.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,12 +151,15 @@ func NewBreakerWithOptions(options *Options) *Breaker {
options.WindowBuckets = DefaultWindowBuckets
}

win := newWindow(options.WindowTime, options.WindowBuckets, options.Clock)
win.Run()

return &Breaker{
BackOff: options.BackOff,
Clock: options.Clock,
ShouldTrip: options.ShouldTrip,
nextBackOff: options.BackOff.NextBackOff(),
counts: newWindow(options.WindowTime, options.WindowBuckets),
counts: win,
}
}

Expand Down
154 changes: 70 additions & 84 deletions window.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
package circuit

import (
"container/ring"
"sync"
"sync/atomic"
"time"

"github.com/facebookgo/clock"
Expand All @@ -24,94 +23,113 @@ type bucket struct {

// Reset resets the counts to 0
func (b *bucket) Reset() {
b.failure = 0
b.success = 0
atomic.StoreInt64(&b.failure, 0)
atomic.StoreInt64(&b.success, 0)
}

// Fail increments the failure count
func (b *bucket) Fail() {
b.failure++
atomic.AddInt64(&b.failure, 1)
}

// Sucecss increments the success count
func (b *bucket) Success() {
b.success++
atomic.AddInt64(&b.success, 1)
}

func (b *bucket) Failures() int64 {
return atomic.LoadInt64(&b.failure)
}

func (b *bucket) Successes() int64 {
return atomic.LoadInt64(&b.success)
}

// window maintains a ring of buckets and increments the failure and success
// counts of the current bucket. Once a specified time has elapsed, it will
// advance to the next bucket, reseting its counts. This allows the keeping of
// rolling statistics on the counts.
type window struct {
buckets *ring.Ring
bucketTime time.Duration
bucketLock sync.RWMutex
lastAccess time.Time
bucketIdx int64
buckets []bucket
clock clock.Clock
stop chan struct{}
bucketTime time.Duration
isRunning bool
}

// newWindow creates a new window. windowTime is the time covering the entire
// window. windowBuckets is the number of buckets the window is divided into.
// An example: a 10 second window with 10 buckets will have 10 buckets covering
// 1 second each.
func newWindow(windowTime time.Duration, windowBuckets int) *window {
buckets := ring.New(windowBuckets)
for i := 0; i < buckets.Len(); i++ {
buckets.Value = &bucket{}
buckets = buckets.Next()
func newWindow(windowTime time.Duration, windowBuckets int, clock clock.Clock) *window {
return &window{
buckets: make([]bucket, windowBuckets),
bucketTime: time.Duration(windowTime.Nanoseconds() / int64(windowBuckets)),
clock: clock,
stop: make(chan struct{}),
}
}

clock := clock.New()
// Run starts the goroutine that increments the bucket index and sets up the
// next bucket.
func (w *window) Run() {
if w.isRunning {
return
}

bucketTime := time.Duration(windowTime.Nanoseconds() / int64(windowBuckets))
return &window{
buckets: buckets,
bucketTime: bucketTime,
clock: clock,
lastAccess: clock.Now(),
go func() {
ticker := w.clock.Ticker(w.bucketTime)
for {
select {
case <-ticker.C:
idx := atomic.LoadInt64(&w.bucketIdx)
idx = (idx + 1) % int64(len(w.buckets))
w.buckets[idx].Reset()
atomic.StoreInt64(&w.bucketIdx, idx)
case <-w.stop:
return
}
}
}()
}

// Stop stops the index incrementing goroutine.
func (w *window) Stop() {
if !w.isRunning {
return
}

w.stop <- struct{}{}
}

// Fail records a failure in the current bucket.
func (w *window) Fail() {
w.bucketLock.Lock()
b := w.getLatestBucket()
b.Fail()
w.bucketLock.Unlock()
idx := atomic.LoadInt64(&w.bucketIdx)
w.buckets[idx].Fail()
}

// Success records a success in the current bucket.
func (w *window) Success() {
w.bucketLock.Lock()
b := w.getLatestBucket()
b.Success()
w.bucketLock.Unlock()
idx := atomic.LoadInt64(&w.bucketIdx)
w.buckets[idx].Success()
}

// Failures returns the total number of failures recorded in all buckets.
func (w *window) Failures() int64 {
w.bucketLock.RLock()

var failures int64
w.buckets.Do(func(x interface{}) {
b := x.(*bucket)
failures += b.failure
})

w.bucketLock.RUnlock()
for i := 0; i < len(w.buckets); i++ {
failures += w.buckets[i].Failures()
}
return failures
}

// Successes returns the total number of successes recorded in all buckets.
func (w *window) Successes() int64 {
w.bucketLock.RLock()

var successes int64
w.buckets.Do(func(x interface{}) {
b := x.(*bucket)
successes += b.success
})
w.bucketLock.RUnlock()
for i := 0; i < len(w.buckets); i++ {
successes += w.buckets[i].Successes()
}
return successes
}

Expand All @@ -121,13 +139,11 @@ func (w *window) ErrorRate() float64 {
var total int64
var failures int64

w.bucketLock.RLock()
w.buckets.Do(func(x interface{}) {
b := x.(*bucket)
total += b.failure + b.success
failures += b.failure
})
w.bucketLock.RUnlock()
for i := 0; i < len(w.buckets); i++ {
b := &w.buckets[i]
total += b.Failures() + b.Successes()
failures += b.Failures()
}

if total == 0 {
return 0.0
Expand All @@ -138,37 +154,7 @@ func (w *window) ErrorRate() float64 {

// Reset resets the count of all buckets.
func (w *window) Reset() {
w.bucketLock.Lock()

w.buckets.Do(func(x interface{}) {
x.(*bucket).Reset()
})
w.bucketLock.Unlock()
}

// getLatestBucket returns the current bucket. If the bucket time has elapsed
// it will move to the next bucket, resetting its counts and updating the last
// access time before returning it. getLatestBucket assumes that the caller has
// locked the bucketLock
func (w *window) getLatestBucket() *bucket {
var b *bucket
b = w.buckets.Value.(*bucket)
elapsed := w.clock.Now().Sub(w.lastAccess)

if elapsed > w.bucketTime {
// Reset the buckets between now and number of buckets ago. If
// that is more that the existing buckets, reset all.
for i := 0; i < w.buckets.Len(); i++ {
w.buckets = w.buckets.Next()
b = w.buckets.Value.(*bucket)
b.Reset()
elapsed = time.Duration(int64(elapsed) - int64(w.bucketTime))
if elapsed < w.bucketTime {
// Done resetting buckets.
break
}
}
w.lastAccess = w.clock.Now()
for i := 0; i < len(w.buckets); i++ {
w.buckets[i].Reset()
}
return b
}
26 changes: 15 additions & 11 deletions window_test.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
package circuit

import (
"runtime"
"testing"
"time"

"github.com/facebookgo/clock"
)

func TestWindowCounts(t *testing.T) {
w := newWindow(time.Millisecond*10, 2)
w := newWindow(time.Millisecond*10, 2, clock.NewMock())
w.Fail()
w.Fail()
w.Success()
Expand Down Expand Up @@ -38,35 +39,38 @@ func TestWindowCounts(t *testing.T) {
func TestWindowSlides(t *testing.T) {
c := clock.NewMock()

w := newWindow(time.Millisecond*10, 2)
w.clock = c
w.lastAccess = c.Now()
w := newWindow(time.Millisecond*10, 2, c)
w.Run()
runtime.Gosched()

w.Fail()
c.Add(time.Millisecond * 6)
c.Add(time.Millisecond * 5)
w.Fail()
w.Stop()

counts := 0
w.buckets.Do(func(x interface{}) {
b := x.(*bucket)
for _, b := range w.buckets {
if b.failure > 0 {
counts++
}
})
}

if counts != 2 {
t.Fatalf("expected 2 buckets to have failures, got %d", counts)
}

w.Run()
runtime.Gosched()
c.Add(time.Millisecond * 15)
w.Success()
w.Stop()

counts = 0
w.buckets.Do(func(x interface{}) {
b := x.(*bucket)
for _, b := range w.buckets {
if b.failure > 0 {
counts++
}
})
}

if counts != 0 {
t.Fatalf("expected 0 buckets to have failures, got %d", counts)
Expand Down