From 676a5d5706dec8ea2791f88f52e0dc1117fbca00 Mon Sep 17 00:00:00 2001 From: wangzhuowei Date: Tue, 19 Mar 2024 12:00:06 +0800 Subject: [PATCH 1/5] feat: cond with timeout control --- lang/channel/cond.go | 99 +++++++++++++++++++++++++++++++++++++++ lang/channel/cond_test.go | 73 +++++++++++++++++++++++++++++ 2 files changed, 172 insertions(+) create mode 100644 lang/channel/cond.go create mode 100644 lang/channel/cond_test.go diff --git a/lang/channel/cond.go b/lang/channel/cond.go new file mode 100644 index 00000000..8888271c --- /dev/null +++ b/lang/channel/cond.go @@ -0,0 +1,99 @@ +package channel + +import ( + "context" + "sync/atomic" + "time" +) + +var ( + _ Cond = (*cond)(nil) +) + +type CondOption func(c *cond) + +func WithCondTimeout(timeout time.Duration) CondOption { + return func(c *cond) { + c.timeout = timeout + } +} + +type Cond interface { + Signal() bool + Broadcast() bool + Wait(ctx context.Context) bool +} + +func NewCond(opts ...CondOption) Cond { + return new(cond) +} + +type condSignal = chan struct{} + +type cond struct { + signal atomic.Value + timeout time.Duration +} + +func (c *cond) Signal() bool { + sv := c.signal.Load() + if sv == nil { + return false + } + signal := sv.(condSignal) + select { + case signal <- struct{}{}: + return true + default: + return false + } +} + +func (c *cond) Broadcast() bool { +BROADCAST: + sv := c.signal.Load() + if sv == nil { + return false + } + var signal condSignal = nil + if !c.signal.CompareAndSwap(sv, signal) { + goto BROADCAST + } + signal = sv.(condSignal) + select { + case <-signal: + return false + default: + close(signal) + return true + } +} + +func (c *cond) Wait(ctx context.Context) bool { +WAIT: + sv := c.signal.Load() + var signal condSignal + if sv == nil { + signal = make(condSignal) + if !c.signal.CompareAndSwap(nil, signal) { + goto WAIT + } + } else { + signal = sv.(condSignal) + } + if c.timeout > 0 { + var cancel context.CancelFunc + ctx, cancel = context.WithTimeout(ctx, c.timeout) + defer cancel() + } + if ctx == nil || ctx.Done() == nil { + <-signal + return true + } + select { + case <-signal: + return true + case <-ctx.Done(): + return false + } +} diff --git a/lang/channel/cond_test.go b/lang/channel/cond_test.go new file mode 100644 index 00000000..fb950b20 --- /dev/null +++ b/lang/channel/cond_test.go @@ -0,0 +1,73 @@ +package channel + +import ( + "context" + "runtime" + "sync/atomic" + "testing" + "time" +) + +func TestCond(t *testing.T) { + cd := NewCond() + var finished int32 + emptyCtx := context.Background() + cancelCtx, cancelFunc := context.WithCancel(emptyCtx) + for i := 0; i < 10; i++ { + go func(i int) { + if i%2 == 0 { + cd.Wait(emptyCtx) + } else { + cd.Wait(cancelCtx) + } + atomic.AddInt32(&finished, 1) + }(i) + } + time.Sleep(time.Millisecond * 100) + cancelFunc() + for atomic.LoadInt32(&finished) != int32(5) { + runtime.Gosched() + } + cd.Signal() + for atomic.LoadInt32(&finished) != int32(6) { + runtime.Gosched() + } + cd.Signal() + for atomic.LoadInt32(&finished) != int32(7) { + runtime.Gosched() + } + cd.Broadcast() + cd.Signal() + for atomic.LoadInt32(&finished) != int32(10) { + runtime.Gosched() + } +} + +func BenchmarkChanCond(b *testing.B) { + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + ch := make(chan struct{}) + go func() { + time.Sleep(time.Millisecond) + close(ch) + }() + select { + case <-ch: + case <-time.After(10 * time.Millisecond): + } + } +} + +func BenchmarkCond(b *testing.B) { + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + cd := NewCond(WithCondTimeout(10 * time.Millisecond)) + go func() { + time.Sleep(time.Millisecond) + cd.Signal() + }() + cd.Wait(context.Background()) + } +} From 6250b410874e3f7de045c663ff3986b5b1ed3892 Mon Sep 17 00:00:00 2001 From: wangzhuowei Date: Tue, 19 Mar 2024 19:34:13 +0800 Subject: [PATCH 2/5] fix: cond options --- lang/channel/cond.go | 6 +++++- lang/channel/cond_test.go | 12 ++++++++++++ 2 files changed, 17 insertions(+), 1 deletion(-) diff --git a/lang/channel/cond.go b/lang/channel/cond.go index 8888271c..d9c0c196 100644 --- a/lang/channel/cond.go +++ b/lang/channel/cond.go @@ -25,7 +25,11 @@ type Cond interface { } func NewCond(opts ...CondOption) Cond { - return new(cond) + cd := new(cond) + for _, opt := range opts { + opt(cd) + } + return cd } type condSignal = chan struct{} diff --git a/lang/channel/cond_test.go b/lang/channel/cond_test.go index fb950b20..f750da41 100644 --- a/lang/channel/cond_test.go +++ b/lang/channel/cond_test.go @@ -43,6 +43,18 @@ func TestCond(t *testing.T) { } } +func TestCondTimeout(t *testing.T) { + cd := NewCond(WithCondTimeout(time.Millisecond * 200)) + go func() { + time.Sleep(time.Millisecond * 500) + cd.Broadcast() + }() + begin := time.Now() + cd.Wait(context.Background()) + cost := time.Since(begin) + t.Logf("cost=%dms", cost.Milliseconds()) +} + func BenchmarkChanCond(b *testing.B) { b.ReportAllocs() b.ResetTimer() From d28b329f9429c7c9eff54372fa117e6370821713 Mon Sep 17 00:00:00 2001 From: wangzhuowei Date: Tue, 19 Mar 2024 19:59:56 +0800 Subject: [PATCH 3/5] feat: add signal --- lang/channel/singal.go | 63 +++++++++++++++++++++++++++++++++++++ lang/channel/singal_test.go | 47 +++++++++++++++++++++++++++ 2 files changed, 110 insertions(+) create mode 100644 lang/channel/singal.go create mode 100644 lang/channel/singal_test.go diff --git a/lang/channel/singal.go b/lang/channel/singal.go new file mode 100644 index 00000000..561c20f5 --- /dev/null +++ b/lang/channel/singal.go @@ -0,0 +1,63 @@ +package channel + +import ( + "context" + "time" +) + +var ( + _ Signal = (*sigal)(nil) +) + +type Signal interface { + Signal() + Wait(ctx context.Context) bool +} + +type SignalOption func(c *sigal) + +func WithSinalTimeout(timeout time.Duration) SignalOption { + return func(s *sigal) { + s.timeout = timeout + } +} + +func NewSignal(opts ...SignalOption) Signal { + sg := new(sigal) + for _, opt := range opts { + opt(sg) + } + sg.trigger = make(chan struct{}) + return sg +} + +type sigal struct { + trigger chan struct{} + timeout time.Duration +} + +func (s *sigal) Signal() { + select { + case <-s.trigger: + default: + close(s.trigger) + } +} + +func (s *sigal) Wait(ctx context.Context) bool { + if s.timeout > 0 { + var cancel context.CancelFunc + ctx, cancel = context.WithTimeout(ctx, s.timeout) + defer cancel() + } + if ctx == nil || ctx.Done() == nil { + <-s.trigger + return true + } + select { + case <-s.trigger: + return true + case <-ctx.Done(): + return false + } +} diff --git a/lang/channel/singal_test.go b/lang/channel/singal_test.go new file mode 100644 index 00000000..1612bf38 --- /dev/null +++ b/lang/channel/singal_test.go @@ -0,0 +1,47 @@ +package channel + +import ( + "context" + "runtime" + "sync/atomic" + "testing" + "time" +) + +func TestSignal(t *testing.T) { + sg := NewSignal() + var finished int32 + emptyCtx := context.Background() + cancelCtx, cancelFunc := context.WithCancel(emptyCtx) + for i := 0; i < 10; i++ { + go func(i int) { + if i%2 == 0 { + sg.Wait(emptyCtx) + } else { + sg.Wait(cancelCtx) + } + atomic.AddInt32(&finished, 1) + }(i) + } + time.Sleep(time.Millisecond * 100) + cancelFunc() + for atomic.LoadInt32(&finished) != int32(5) { + runtime.Gosched() + } + sg.Signal() + for atomic.LoadInt32(&finished) != int32(10) { + runtime.Gosched() + } +} + +func TestSignalTimeout(t *testing.T) { + sg := NewSignal(WithSinalTimeout(time.Millisecond * 200)) + go func() { + time.Sleep(time.Millisecond * 500) + sg.Signal() + }() + begin := time.Now() + sg.Wait(context.Background()) + cost := time.Since(begin) + t.Logf("cost=%dms", cost.Milliseconds()) +} From 12e3909ba64de7c74e572030d0f2f7de7b646f51 Mon Sep 17 00:00:00 2001 From: wangzhuowei Date: Wed, 20 Mar 2024 15:17:00 +0800 Subject: [PATCH 4/5] chore: fix typo signal --- lang/channel/singal.go | 2 +- lang/channel/singal_test.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/lang/channel/singal.go b/lang/channel/singal.go index 561c20f5..4c539afa 100644 --- a/lang/channel/singal.go +++ b/lang/channel/singal.go @@ -16,7 +16,7 @@ type Signal interface { type SignalOption func(c *sigal) -func WithSinalTimeout(timeout time.Duration) SignalOption { +func WithSignalTimeout(timeout time.Duration) SignalOption { return func(s *sigal) { s.timeout = timeout } diff --git a/lang/channel/singal_test.go b/lang/channel/singal_test.go index 1612bf38..1892291c 100644 --- a/lang/channel/singal_test.go +++ b/lang/channel/singal_test.go @@ -35,7 +35,7 @@ func TestSignal(t *testing.T) { } func TestSignalTimeout(t *testing.T) { - sg := NewSignal(WithSinalTimeout(time.Millisecond * 200)) + sg := NewSignal(WithSignalTimeout(time.Millisecond * 200)) go func() { time.Sleep(time.Millisecond * 500) sg.Signal() From 82d24135f7c2a2f60d7d12bd93136b2982f539e3 Mon Sep 17 00:00:00 2001 From: wangzhuowei Date: Wed, 20 Mar 2024 16:11:38 +0800 Subject: [PATCH 5/5] chore: fix typo signal --- lang/channel/singal.go | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/lang/channel/singal.go b/lang/channel/singal.go index 4c539afa..67ef0d29 100644 --- a/lang/channel/singal.go +++ b/lang/channel/singal.go @@ -6,7 +6,7 @@ import ( ) var ( - _ Signal = (*sigal)(nil) + _ Signal = (*signal)(nil) ) type Signal interface { @@ -14,16 +14,16 @@ type Signal interface { Wait(ctx context.Context) bool } -type SignalOption func(c *sigal) +type SignalOption func(c *signal) func WithSignalTimeout(timeout time.Duration) SignalOption { - return func(s *sigal) { + return func(s *signal) { s.timeout = timeout } } func NewSignal(opts ...SignalOption) Signal { - sg := new(sigal) + sg := new(signal) for _, opt := range opts { opt(sg) } @@ -31,12 +31,12 @@ func NewSignal(opts ...SignalOption) Signal { return sg } -type sigal struct { +type signal struct { trigger chan struct{} timeout time.Duration } -func (s *sigal) Signal() { +func (s *signal) Signal() { select { case <-s.trigger: default: @@ -44,7 +44,7 @@ func (s *sigal) Signal() { } } -func (s *sigal) Wait(ctx context.Context) bool { +func (s *signal) Wait(ctx context.Context) bool { if s.timeout > 0 { var cancel context.CancelFunc ctx, cancel = context.WithTimeout(ctx, s.timeout)