From 5ea95294aeaa438518929f565d0b0ee42477d3dd Mon Sep 17 00:00:00 2001 From: liaotonglang Date: Tue, 25 Jan 2022 10:51:54 +0800 Subject: [PATCH 1/2] fix race on cb.lastFailureTime struct ConsecCircuitBreaker{} cannot pass the race test below: go test ./client -race -run TestCircuitBreakerRace$ Split compond type value lastFailureTime (time.Time) to second and nano, then we can use atomic. --- client/circuit_breaker.go | 22 ++++++++++++++++++---- client/circuit_breaker_test.go | 22 ++++++++++++++++++++++ 2 files changed, 40 insertions(+), 4 deletions(-) diff --git a/client/circuit_breaker.go b/client/circuit_breaker.go index 11a30a98..56b61cc7 100644 --- a/client/circuit_breaker.go +++ b/client/circuit_breaker.go @@ -13,7 +13,10 @@ var ( // ConsecCircuitBreaker is window sliding CircuitBreaker with failure threshold. type ConsecCircuitBreaker struct { - lastFailureTime time.Time + // time.Time is a compund type, split into second and nano for using atomic. + lastFailureTimeSecond int64 + lastFailureTimeNano int32 + failures uint64 failureThreshold uint64 window time.Duration @@ -64,7 +67,8 @@ func (cb *ConsecCircuitBreaker) Call(fn func() error, d time.Duration) error { } func (cb *ConsecCircuitBreaker) ready() bool { - if time.Since(cb.lastFailureTime) > cb.window { + lastFailureTime := cb.loadLastFailureTime() + if time.Since(lastFailureTime) > cb.window { cb.reset() return true } @@ -78,7 +82,7 @@ func (cb *ConsecCircuitBreaker) success() { } func (cb *ConsecCircuitBreaker) fail() { atomic.AddUint64(&cb.failures, 1) - cb.lastFailureTime = time.Now() + cb.updateLastFailureTime(time.Now()) } func (cb *ConsecCircuitBreaker) Success() { @@ -94,5 +98,15 @@ func (cb *ConsecCircuitBreaker) Ready() bool { func (cb *ConsecCircuitBreaker) reset() { atomic.StoreUint64(&cb.failures, 0) - cb.lastFailureTime = time.Now() + cb.updateLastFailureTime(time.Now()) +} + +func (cb *ConsecCircuitBreaker) updateLastFailureTime(cur time.Time) { + atomic.StoreInt64(&cb.lastFailureTimeSecond, cur.Unix()) + atomic.StoreInt32(&cb.lastFailureTimeNano, int32(cur.UnixNano())) +} +func (cb *ConsecCircuitBreaker) loadLastFailureTime() time.Time { + nano := atomic.LoadInt32(&cb.lastFailureTimeNano) + second := atomic.LoadInt64(&cb.lastFailureTimeSecond) + return time.Unix(second, int64(nano)) } diff --git a/client/circuit_breaker_test.go b/client/circuit_breaker_test.go index 192cee87..6ab45584 100644 --- a/client/circuit_breaker_test.go +++ b/client/circuit_breaker_test.go @@ -2,6 +2,7 @@ package client import ( "errors" + "math/rand" "testing" "time" ) @@ -51,3 +52,24 @@ func TestConsecCircuitBreaker(t *testing.T) { } } + +func TestCircuitBreakerRace(t *testing.T) { + cb := NewConsecCircuitBreaker(2, 50*time.Millisecond) + routines := 100 + loop := 100000 + + fn := func() error { + if rand.Intn(2) == 1 { + return nil + } + return errors.New("test error") + } + + for r := 0; r < routines; r++ { + go func() { + for i := 0; i < loop; i++ { + cb.Call(fn, 100*time.Millisecond) + } + }() + } +} From d91707b04aed7c45a92bc9b7d893662fe10be132 Mon Sep 17 00:00:00 2001 From: liaotonglang Date: Tue, 25 Jan 2022 11:43:22 +0800 Subject: [PATCH 2/2] Fix connection leak when write failed but dial success. close the connection before return. --- client/connection.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/client/connection.go b/client/connection.go index fd4a3c5a..f3f2eb72 100644 --- a/client/connection.go +++ b/client/connection.go @@ -139,6 +139,10 @@ func newDirectHTTPConn(c *Client, network, address string) (net.Conn, error) { _, err = io.WriteString(conn, "CONNECT "+path+" HTTP/1.0\n\n") if err != nil { + // Dial() success but Write() failed here, close the successfully + // created conn before return. + conn.Close() + log.Errorf("failed to make CONNECT: %v", err) return nil, err }