Skip to content

Commit

Permalink
Change events/batcher to use events/queue as backend. (#82)
Browse files Browse the repository at this point in the history
* events/batcher: use events/queue as queue backend

Signed-off-by: joshvanl <[email protected]>

* Make events/queue/queue key type comparable

Signed-off-by: joshvanl <[email protected]>

* Explicitly define NewProcessor generic type in test

Signed-off-by: joshvanl <[email protected]>

---------

Signed-off-by: joshvanl <[email protected]>
  • Loading branch information
JoshVanL authored Jan 15, 2024
1 parent c24d1d2 commit 858719e
Show file tree
Hide file tree
Showing 8 changed files with 95 additions and 113 deletions.
94 changes: 44 additions & 50 deletions events/batcher/batcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,41 +19,42 @@ import (
"time"

"k8s.io/utils/clock"
)

// key is the type of the comparable key used to batch events.
type key interface {
comparable
}
"github.com/dapr/kit/events/queue"
)

// Batcher is a one to many event batcher. It batches events and sends them to
// the added event channel subscribers. Events are sent to the channels after
// the interval has elapsed. If events with the same key are received within
// the interval, the timer is reset.
type Batcher[T key] struct {
type Batcher[T comparable] struct {
interval time.Duration
actives map[T]clock.Timer
eventChs []chan<- struct{}
queue *queue.Processor[T, *item[T]]

clock clock.WithDelayedExecution
clock clock.Clock
lock sync.Mutex
wg sync.WaitGroup
closeCh chan struct{}
closed atomic.Bool
}

// New creates a new Batcher with the given interval and key type.
func New[T key](interval time.Duration) *Batcher[T] {
return &Batcher[T]{
func New[T comparable](interval time.Duration) *Batcher[T] {
b := &Batcher[T]{
interval: interval,
actives: make(map[T]clock.Timer),
clock: clock.RealClock{},
closeCh: make(chan struct{}),
}

b.queue = queue.NewProcessor[T, *item[T]](b.execute)

return b
}

// WithClock sets the clock used by the batcher. Used for testing.
func (b *Batcher[T]) WithClock(clock clock.WithDelayedExecution) {
func (b *Batcher[T]) WithClock(clock clock.Clock) {
b.queue.WithClock(clock)
b.clock = clock
}

Expand All @@ -68,63 +69,56 @@ func (b *Batcher[T]) Subscribe(eventCh ...chan<- struct{}) {
b.eventChs = append(b.eventChs, eventCh...)
}

// Batch adds the given key to the batcher. If an event for this key is already
// active, the timer is reset. If the batcher is closed, the key is silently
// dropped.
func (b *Batcher[T]) Batch(key T) {
func (b *Batcher[T]) execute(_ *item[T]) {
b.lock.Lock()
defer b.lock.Unlock()

if b.closed.Load() {
return
}

if active, ok := b.actives[key]; ok {
if !active.Stop() {
<-active.C()
}
active.Reset(b.interval)
return
b.wg.Add(len(b.eventChs))
for _, eventCh := range b.eventChs {
go func(eventCh chan<- struct{}) {
defer b.wg.Done()
select {
case eventCh <- struct{}{}:
case <-b.closeCh:
}
}(eventCh)
}
}

b.actives[key] = b.clock.AfterFunc(b.interval, func() {
b.lock.Lock()
defer b.lock.Unlock()

b.wg.Add(len(b.eventChs))
delete(b.actives, key)
for _, eventCh := range b.eventChs {
go func(eventCh chan<- struct{}) {
defer b.wg.Done()
select {
case eventCh <- struct{}{}:
case <-b.closeCh:
}
}(eventCh)
}
// Batch adds the given key to the batcher. If an event for this key is already
// active, the timer is reset. If the batcher is closed, the key is silently
// dropped.
func (b *Batcher[T]) Batch(key T) {
b.queue.Enqueue(&item[T]{
key: key,
ttl: b.clock.Now().Add(b.interval),
})
}

// Close closes the batcher. It blocks until all events have been sent to the
// subscribers. The batcher will be a no-op after this call.
func (b *Batcher[T]) Close() {
defer b.wg.Wait()

// Lock to ensure that no new timers are created.
b.lock.Lock()
if b.closed.CompareAndSwap(false, true) {
close(b.closeCh)
}
actives := b.actives
b.lock.Unlock()
b.queue.Close()
}

for _, active := range actives {
if !active.Stop() {
<-active.C()
}
}
// item implements queue.queueable.
type item[T comparable] struct {
key T
ttl time.Time
}

b.lock.Lock()
b.actives = nil
b.lock.Unlock()
func (b *item[T]) Key() T {
return b.key
}

func (b *item[T]) ScheduledTime() time.Time {
return b.ttl
}
15 changes: 1 addition & 14 deletions events/batcher/batcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func TestBatch(t *testing.T) {

fakeClock := testingclock.NewFakeClock(time.Now())
b := New[string](time.Millisecond * 10)
b.clock = fakeClock
b.WithClock(fakeClock)
ch1 := make(chan struct{})
ch2 := make(chan struct{})
ch3 := make(chan struct{})
Expand All @@ -67,8 +67,6 @@ func TestBatch(t *testing.T) {
b.Batch("key3")
b.Batch("key3")

assert.Len(t, b.actives, 3)

assert.Eventually(t, func() bool {
return fakeClock.HasWaiters()
}, time.Second*5, time.Millisecond*100)
Expand Down Expand Up @@ -112,19 +110,8 @@ func TestClose(t *testing.T) {
b.Subscribe(ch)
assert.Len(t, b.eventChs, 1)
b.Batch("key1")
assert.Len(t, b.actives, 1)
b.Close()
assert.True(t, b.closed.Load())
assert.Empty(t, b.actives)
}

func TestBatchAfterClose(t *testing.T) {
t.Parallel()

b := New[string](time.Millisecond * 10)
b.Close()
b.Batch("key1")
assert.Empty(t, b.actives)
}

func TestSubscribeAfterClose(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion events/queue/eventqueue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func ExampleProcessor() {
}

// Create the processor
processor := NewProcessor[*queueableItem](executeFn)
processor := NewProcessor[string, *queueableItem](executeFn)

// Add items to the processor, in any order, using Enqueue
processor.Enqueue(&queueableItem{Name: "item1", ExecutionTime: time.Now().Add(500 * time.Millisecond)})
Expand Down
24 changes: 12 additions & 12 deletions events/queue/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@ import (
var ErrProcessorStopped = errors.New("processor is stopped")

// Processor manages the queue of items and processes them at the correct time.
type Processor[T queueable] struct {
type Processor[K comparable, T queueable[K]] struct {
executeFn func(r T)
queue queue[T]
queue queue[K, T]
clock kclock.Clock
lock sync.Mutex
wg sync.WaitGroup
Expand All @@ -40,10 +40,10 @@ type Processor[T queueable] struct {

// NewProcessor returns a new Processor object.
// executeFn is the callback invoked when the item is to be executed; this will be invoked in a background goroutine.
func NewProcessor[T queueable](executeFn func(r T)) *Processor[T] {
return &Processor[T]{
func NewProcessor[K comparable, T queueable[K]](executeFn func(r T)) *Processor[K, T] {
return &Processor[K, T]{
executeFn: executeFn,
queue: newQueue[T](),
queue: newQueue[K, T](),
processorRunningCh: make(chan struct{}, 1),
stopCh: make(chan struct{}),
resetCh: make(chan struct{}, 1),
Expand All @@ -52,14 +52,14 @@ func NewProcessor[T queueable](executeFn func(r T)) *Processor[T] {
}

// WithClock sets the clock used by the processor. Used for testing.
func (p *Processor[T]) WithClock(clock kclock.Clock) *Processor[T] {
func (p *Processor[K, T]) WithClock(clock kclock.Clock) *Processor[K, T] {
p.clock = clock
return p
}

// Enqueue adds a new item to the queue.
// If a item with the same ID already exists, it'll be replaced.
func (p *Processor[T]) Enqueue(r T) error {
func (p *Processor[K, T]) Enqueue(r T) error {
if p.stopped.Load() {
return ErrProcessorStopped
}
Expand All @@ -79,7 +79,7 @@ func (p *Processor[T]) Enqueue(r T) error {
}

// Dequeue removes a item from the queue.
func (p *Processor[T]) Dequeue(key string) error {
func (p *Processor[K, T]) Dequeue(key K) error {
if p.stopped.Load() {
return ErrProcessorStopped
}
Expand All @@ -99,7 +99,7 @@ func (p *Processor[T]) Dequeue(key string) error {

// Close stops the processor.
// This method blocks until the processor loop returns.
func (p *Processor[T]) Close() error {
func (p *Processor[K, T]) Close() error {
defer p.wg.Wait()
if p.stopped.CompareAndSwap(false, true) {
// Send a signal to stop
Expand All @@ -114,7 +114,7 @@ func (p *Processor[T]) Close() error {

// Start the processing loop if it's not already running.
// This must be invoked while the caller has a lock.
func (p *Processor[T]) process(isNext bool) {
func (p *Processor[K, T]) process(isNext bool) {
// Do not start a loop if it's already running
select {
case p.processorRunningCh <- struct{}{}:
Expand All @@ -140,7 +140,7 @@ func (p *Processor[T]) process(isNext bool) {
}

// Processing loop.
func (p *Processor[T]) processLoop() {
func (p *Processor[K, T]) processLoop() {
defer func() {
// Release the channel when exiting
<-p.processorRunningCh
Expand Down Expand Up @@ -209,7 +209,7 @@ func (p *Processor[T]) processLoop() {
}

// Executes a item when it's time.
func (p *Processor[T]) execute(r T) {
func (p *Processor[K, T]) execute(r T) {
// Pop the item now that we're ready to process it
// There's a small chance this is a different item than the one we peeked before
p.lock.Lock()
Expand Down
4 changes: 2 additions & 2 deletions events/queue/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func TestProcessor(t *testing.T) {
// Create the processor
clock := clocktesting.NewFakeClock(time.Now())
executeCh := make(chan *queueableItem)
processor := NewProcessor(func(r *queueableItem) {
processor := NewProcessor[string](func(r *queueableItem) {
executeCh <- r
})
processor.clock = clock
Expand Down Expand Up @@ -364,7 +364,7 @@ func TestClose(t *testing.T) {
// Create the processor
clock := clocktesting.NewFakeClock(time.Now())
executeCh := make(chan *queueableItem)
processor := NewProcessor(func(r *queueableItem) {
processor := NewProcessor[string](func(r *queueableItem) {
executeCh <- r
})
processor.clock = clock
Expand Down
Loading

0 comments on commit 858719e

Please sign in to comment.