From fa869e2fd5dbea0d5ff3f8aee96d0a30cfa06aac Mon Sep 17 00:00:00 2001 From: joshvanl Date: Mon, 26 Feb 2024 12:53:10 +0000 Subject: [PATCH] Adds events/batch/singular Adds Singular Batcher which treat all events as the same event key. Subscribers provide a function when subscribing which will be executed once the batch triggers. Signed-off-by: joshvanl --- events/batcher/singuler.go | 68 +++++++++++++++++++++++ events/batcher/singuler_test.go | 95 +++++++++++++++++++++++++++++++++ 2 files changed, 163 insertions(+) create mode 100644 events/batcher/singuler.go create mode 100644 events/batcher/singuler_test.go diff --git a/events/batcher/singuler.go b/events/batcher/singuler.go new file mode 100644 index 0000000..14ee842 --- /dev/null +++ b/events/batcher/singuler.go @@ -0,0 +1,68 @@ +/* +Copyright 2024 The Dapr Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package batcher + +import ( + "sync" + "sync/atomic" + "time" +) + +// Singular is a Batcher which batches events and treats them as all the same +// key, behaving like a singular batched queue. +// Subscribe +type Singular struct { + b *Batcher[int] + closeCh chan struct{} + closed atomic.Bool + wg sync.WaitGroup +} + +func NewSingular(interval time.Duration) *Singular { + return &Singular{ + b: New[int](interval), + closeCh: make(chan struct{}), + } +} + +func (s *Singular) Subscribe(fn func()) { + ch := make(chan struct{}) + s.b.Subscribe(ch) + + s.wg.Add(1) + go func() { + defer s.wg.Done() + for { + select { + case <-ch: + case <-s.closeCh: + return + } + + fn() + } + }() +} + +func (s *Singular) Batch() { + s.b.Batch(0) +} + +func (s *Singular) Close() { + defer s.wg.Wait() + if s.closed.CompareAndSwap(false, true) { + close(s.closeCh) + } + s.b.Close() +} diff --git a/events/batcher/singuler_test.go b/events/batcher/singuler_test.go new file mode 100644 index 0000000..8786f3e --- /dev/null +++ b/events/batcher/singuler_test.go @@ -0,0 +1,95 @@ +/* +Copyright 2024 The Dapr Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package batcher + +import ( + "sync/atomic" + "testing" + "time" + + "github.com/stretchr/testify/assert" + testingclock "k8s.io/utils/clock/testing" +) + +func TestSingular(t *testing.T) { + t.Parallel() + + fakeClock := testingclock.NewFakeClock(time.Now()) + s := NewSingular(time.Millisecond * 10) + s.b.WithClock(fakeClock) + + fn := func(i *atomic.Int64) func() { + return func() { + i.Add(1) + } + } + + var f1, f2, f3 atomic.Int64 + s.Subscribe(fn(&f1)) + s.Subscribe(fn(&f2)) + s.Subscribe(fn(&f3)) + + s.Batch() + s.Batch() + s.Batch() + s.Batch() + s.Batch() + s.Batch() + s.Batch() + s.Batch() + + assert.Eventually(t, fakeClock.HasWaiters, time.Second*5, time.Millisecond*100) + + assert.Equal(t, int64(0), f1.Load()) + assert.Equal(t, int64(0), f2.Load()) + assert.Equal(t, int64(0), f3.Load()) + + fakeClock.Step(time.Millisecond * 5) + + assert.Equal(t, int64(0), f1.Load()) + assert.Equal(t, int64(0), f2.Load()) + assert.Equal(t, int64(0), f3.Load()) + + fakeClock.Step(time.Millisecond * 5) + + assert.Eventually(t, func() bool { + return f1.Load() == 1 && f2.Load() == 1 && f3.Load() == 1 + }, time.Second*5, time.Millisecond*100) + + s.Batch() + s.Batch() + s.Batch() + + fakeClock.Step(time.Millisecond * 15) + + assert.Eventually(t, func() bool { + return f1.Load() == 2 && f2.Load() == 2 && f3.Load() == 2 + }, time.Second*5, time.Millisecond*100) + + s.Close() + + s.Batch() + s.Batch() + s.Batch() + + fakeClock.Step(time.Millisecond * 10) + + assert.Eventually(t, func() bool { + return !fakeClock.HasWaiters() + }, time.Second*5, time.Millisecond*100) + + assert.Equal(t, int64(2), f1.Load()) + assert.Equal(t, int64(2), f2.Load()) + assert.Equal(t, int64(2), f3.Load()) +}