diff --git a/events/batcher/batcher.go b/events/batcher/batcher.go index e6f58e9..c1e19f6 100644 --- a/events/batcher/batcher.go +++ b/events/batcher/batcher.go @@ -19,23 +19,20 @@ import ( "time" "k8s.io/utils/clock" -) -// key is the type of the comparable key used to batch events. -type key interface { - comparable -} + "github.com/dapr/kit/events/queue" +) // Batcher is a one to many event batcher. It batches events and sends them to // the added event channel subscribers. Events are sent to the channels after // the interval has elapsed. If events with the same key are received within // the interval, the timer is reset. -type Batcher[T key] struct { +type Batcher[T comparable] struct { interval time.Duration - actives map[T]clock.Timer eventChs []chan<- struct{} + queue *queue.Processor[T, *item[T]] - clock clock.WithDelayedExecution + clock clock.Clock lock sync.Mutex wg sync.WaitGroup closeCh chan struct{} @@ -43,17 +40,21 @@ type Batcher[T key] struct { } // New creates a new Batcher with the given interval and key type. -func New[T key](interval time.Duration) *Batcher[T] { - return &Batcher[T]{ +func New[T comparable](interval time.Duration) *Batcher[T] { + b := &Batcher[T]{ interval: interval, - actives: make(map[T]clock.Timer), clock: clock.RealClock{}, closeCh: make(chan struct{}), } + + b.queue = queue.NewProcessor[T, *item[T]](b.execute) + + return b } // WithClock sets the clock used by the batcher. Used for testing. -func (b *Batcher[T]) WithClock(clock clock.WithDelayedExecution) { +func (b *Batcher[T]) WithClock(clock clock.Clock) { + b.queue.WithClock(clock) b.clock = clock } @@ -68,40 +69,31 @@ func (b *Batcher[T]) Subscribe(eventCh ...chan<- struct{}) { b.eventChs = append(b.eventChs, eventCh...) } -// Batch adds the given key to the batcher. If an event for this key is already -// active, the timer is reset. If the batcher is closed, the key is silently -// dropped. -func (b *Batcher[T]) Batch(key T) { +func (b *Batcher[T]) execute(_ *item[T]) { b.lock.Lock() defer b.lock.Unlock() - if b.closed.Load() { return } - - if active, ok := b.actives[key]; ok { - if !active.Stop() { - <-active.C() - } - active.Reset(b.interval) - return + b.wg.Add(len(b.eventChs)) + for _, eventCh := range b.eventChs { + go func(eventCh chan<- struct{}) { + defer b.wg.Done() + select { + case eventCh <- struct{}{}: + case <-b.closeCh: + } + }(eventCh) } +} - b.actives[key] = b.clock.AfterFunc(b.interval, func() { - b.lock.Lock() - defer b.lock.Unlock() - - b.wg.Add(len(b.eventChs)) - delete(b.actives, key) - for _, eventCh := range b.eventChs { - go func(eventCh chan<- struct{}) { - defer b.wg.Done() - select { - case eventCh <- struct{}{}: - case <-b.closeCh: - } - }(eventCh) - } +// Batch adds the given key to the batcher. If an event for this key is already +// active, the timer is reset. If the batcher is closed, the key is silently +// dropped. +func (b *Batcher[T]) Batch(key T) { + b.queue.Enqueue(&item[T]{ + key: key, + ttl: b.clock.Now().Add(b.interval), }) } @@ -109,22 +101,24 @@ func (b *Batcher[T]) Batch(key T) { // subscribers. The batcher will be a no-op after this call. func (b *Batcher[T]) Close() { defer b.wg.Wait() - - // Lock to ensure that no new timers are created. b.lock.Lock() if b.closed.CompareAndSwap(false, true) { close(b.closeCh) } - actives := b.actives b.lock.Unlock() + b.queue.Close() +} - for _, active := range actives { - if !active.Stop() { - <-active.C() - } - } +// item implements queue.queueable. +type item[T comparable] struct { + key T + ttl time.Time +} - b.lock.Lock() - b.actives = nil - b.lock.Unlock() +func (b *item[T]) Key() T { + return b.key +} + +func (b *item[T]) ScheduledTime() time.Time { + return b.ttl } diff --git a/events/batcher/batcher_test.go b/events/batcher/batcher_test.go index a62e856..9371513 100644 --- a/events/batcher/batcher_test.go +++ b/events/batcher/batcher_test.go @@ -51,7 +51,7 @@ func TestBatch(t *testing.T) { fakeClock := testingclock.NewFakeClock(time.Now()) b := New[string](time.Millisecond * 10) - b.clock = fakeClock + b.WithClock(fakeClock) ch1 := make(chan struct{}) ch2 := make(chan struct{}) ch3 := make(chan struct{}) @@ -67,8 +67,6 @@ func TestBatch(t *testing.T) { b.Batch("key3") b.Batch("key3") - assert.Len(t, b.actives, 3) - assert.Eventually(t, func() bool { return fakeClock.HasWaiters() }, time.Second*5, time.Millisecond*100) @@ -112,19 +110,8 @@ func TestClose(t *testing.T) { b.Subscribe(ch) assert.Len(t, b.eventChs, 1) b.Batch("key1") - assert.Len(t, b.actives, 1) b.Close() assert.True(t, b.closed.Load()) - assert.Empty(t, b.actives) -} - -func TestBatchAfterClose(t *testing.T) { - t.Parallel() - - b := New[string](time.Millisecond * 10) - b.Close() - b.Batch("key1") - assert.Empty(t, b.actives) } func TestSubscribeAfterClose(t *testing.T) { diff --git a/events/queue/eventqueue_test.go b/events/queue/eventqueue_test.go index 9af8454..a072263 100644 --- a/events/queue/eventqueue_test.go +++ b/events/queue/eventqueue_test.go @@ -43,7 +43,7 @@ func ExampleProcessor() { } // Create the processor - processor := NewProcessor[*queueableItem](executeFn) + processor := NewProcessor[string, *queueableItem](executeFn) // Add items to the processor, in any order, using Enqueue processor.Enqueue(&queueableItem{Name: "item1", ExecutionTime: time.Now().Add(500 * time.Millisecond)}) diff --git a/events/queue/processor.go b/events/queue/processor.go index b7661b5..f11f3a0 100644 --- a/events/queue/processor.go +++ b/events/queue/processor.go @@ -26,9 +26,9 @@ import ( var ErrProcessorStopped = errors.New("processor is stopped") // Processor manages the queue of items and processes them at the correct time. -type Processor[T queueable] struct { +type Processor[K comparable, T queueable[K]] struct { executeFn func(r T) - queue queue[T] + queue queue[K, T] clock kclock.Clock lock sync.Mutex wg sync.WaitGroup @@ -40,10 +40,10 @@ type Processor[T queueable] struct { // NewProcessor returns a new Processor object. // executeFn is the callback invoked when the item is to be executed; this will be invoked in a background goroutine. -func NewProcessor[T queueable](executeFn func(r T)) *Processor[T] { - return &Processor[T]{ +func NewProcessor[K comparable, T queueable[K]](executeFn func(r T)) *Processor[K, T] { + return &Processor[K, T]{ executeFn: executeFn, - queue: newQueue[T](), + queue: newQueue[K, T](), processorRunningCh: make(chan struct{}, 1), stopCh: make(chan struct{}), resetCh: make(chan struct{}, 1), @@ -52,14 +52,14 @@ func NewProcessor[T queueable](executeFn func(r T)) *Processor[T] { } // WithClock sets the clock used by the processor. Used for testing. -func (p *Processor[T]) WithClock(clock kclock.Clock) *Processor[T] { +func (p *Processor[K, T]) WithClock(clock kclock.Clock) *Processor[K, T] { p.clock = clock return p } // Enqueue adds a new item to the queue. // If a item with the same ID already exists, it'll be replaced. -func (p *Processor[T]) Enqueue(r T) error { +func (p *Processor[K, T]) Enqueue(r T) error { if p.stopped.Load() { return ErrProcessorStopped } @@ -79,7 +79,7 @@ func (p *Processor[T]) Enqueue(r T) error { } // Dequeue removes a item from the queue. -func (p *Processor[T]) Dequeue(key string) error { +func (p *Processor[K, T]) Dequeue(key K) error { if p.stopped.Load() { return ErrProcessorStopped } @@ -99,7 +99,7 @@ func (p *Processor[T]) Dequeue(key string) error { // Close stops the processor. // This method blocks until the processor loop returns. -func (p *Processor[T]) Close() error { +func (p *Processor[K, T]) Close() error { defer p.wg.Wait() if p.stopped.CompareAndSwap(false, true) { // Send a signal to stop @@ -114,7 +114,7 @@ func (p *Processor[T]) Close() error { // Start the processing loop if it's not already running. // This must be invoked while the caller has a lock. -func (p *Processor[T]) process(isNext bool) { +func (p *Processor[K, T]) process(isNext bool) { // Do not start a loop if it's already running select { case p.processorRunningCh <- struct{}{}: @@ -140,7 +140,7 @@ func (p *Processor[T]) process(isNext bool) { } // Processing loop. -func (p *Processor[T]) processLoop() { +func (p *Processor[K, T]) processLoop() { defer func() { // Release the channel when exiting <-p.processorRunningCh @@ -209,7 +209,7 @@ func (p *Processor[T]) processLoop() { } // Executes a item when it's time. -func (p *Processor[T]) execute(r T) { +func (p *Processor[K, T]) execute(r T) { // Pop the item now that we're ready to process it // There's a small chance this is a different item than the one we peeked before p.lock.Lock() diff --git a/events/queue/processor_test.go b/events/queue/processor_test.go index 7cbedf1..e1b17a7 100644 --- a/events/queue/processor_test.go +++ b/events/queue/processor_test.go @@ -31,7 +31,7 @@ func TestProcessor(t *testing.T) { // Create the processor clock := clocktesting.NewFakeClock(time.Now()) executeCh := make(chan *queueableItem) - processor := NewProcessor(func(r *queueableItem) { + processor := NewProcessor[string](func(r *queueableItem) { executeCh <- r }) processor.clock = clock @@ -364,7 +364,7 @@ func TestClose(t *testing.T) { // Create the processor clock := clocktesting.NewFakeClock(time.Now()) executeCh := make(chan *queueableItem) - processor := NewProcessor(func(r *queueableItem) { + processor := NewProcessor[string](func(r *queueableItem) { executeCh <- r }) processor.clock = clock diff --git a/events/queue/queue.go b/events/queue/queue.go index 17ef2a8..07bd11b 100644 --- a/events/queue/queue.go +++ b/events/queue/queue.go @@ -19,9 +19,9 @@ import ( ) // queueable is the interface for items that can be added to the queue. -type queueable interface { +type queueable[T comparable] interface { comparable - Key() string + Key() T ScheduledTime() time.Time } @@ -29,27 +29,27 @@ type queueable interface { // It acts as a "priority queue", in which items are added in order of when they're scheduled. // Internally, it uses a heap (from container/heap) that allows Insert and Pop operations to be completed in O(log N) time (where N is the queue's length). // Note: methods in this struct are not safe for concurrent use. Callers should use locks to ensure consistency. -type queue[T queueable] struct { - heap *queueHeap[T] - items map[string]*queueItem[T] +type queue[K comparable, T queueable[K]] struct { + heap *queueHeap[K, T] + items map[K]*queueItem[K, T] } // newQueue creates a new queue. -func newQueue[T queueable]() queue[T] { - return queue[T]{ - heap: new(queueHeap[T]), - items: make(map[string]*queueItem[T]), +func newQueue[K comparable, T queueable[K]]() queue[K, T] { + return queue[K, T]{ + heap: new(queueHeap[K, T]), + items: make(map[K]*queueItem[K, T]), } } // Len returns the number of items in the queue. -func (p *queue[T]) Len() int { +func (p *queue[K, T]) Len() int { return p.heap.Len() } // Insert inserts a new item into the queue. // If replace is true, existing items are replaced -func (p *queue[T]) Insert(r T, replace bool) { +func (p *queue[K, T]) Insert(r T, replace bool) { key := r.Key() // Check if the item already exists @@ -62,7 +62,7 @@ func (p *queue[T]) Insert(r T, replace bool) { return } - item = &queueItem[T]{ + item = &queueItem[K, T]{ value: r, } heap.Push(p.heap, item) @@ -71,13 +71,13 @@ func (p *queue[T]) Insert(r T, replace bool) { // Pop removes the next item in the queue and returns it. // The returned boolean value will be "true" if an item was found. -func (p *queue[T]) Pop() (T, bool) { +func (p *queue[K, T]) Pop() (T, bool) { if p.Len() == 0 { var zero T return zero, false } - item, ok := heap.Pop(p.heap).(*queueItem[T]) + item, ok := heap.Pop(p.heap).(*queueItem[K, T]) if !ok || item == nil { var zero T return zero, false @@ -89,7 +89,7 @@ func (p *queue[T]) Pop() (T, bool) { // Peek returns the next item in the queue, without removing it. // The returned boolean value will be "true" if an item was found. -func (p *queue[T]) Peek() (T, bool) { +func (p *queue[K, T]) Peek() (T, bool) { if p.Len() == 0 { var zero T return zero, false @@ -99,7 +99,7 @@ func (p *queue[T]) Peek() (T, bool) { } // Remove an item from the queue. -func (p *queue[T]) Remove(key string) { +func (p *queue[K, T]) Remove(key K) { // If the item is not in the queue, this is a nop item, ok := p.items[key] if !ok { @@ -111,7 +111,7 @@ func (p *queue[T]) Remove(key string) { } // Update an item in the queue. -func (p *queue[T]) Update(r T) { +func (p *queue[K, T]) Update(r T) { // If the item is not in the queue, this is a nop item, ok := p.items[r.Key()] if !ok { @@ -122,37 +122,37 @@ func (p *queue[T]) Update(r T) { heap.Fix(p.heap, item.index) } -type queueItem[T queueable] struct { +type queueItem[K comparable, T queueable[K]] struct { value T // The index of the item in the heap. This is maintained by the heap.Interface methods. index int } -type queueHeap[T queueable] []*queueItem[T] +type queueHeap[K comparable, T queueable[K]] []*queueItem[K, T] -func (pq queueHeap[T]) Len() int { +func (pq queueHeap[K, T]) Len() int { return len(pq) } -func (pq queueHeap[T]) Less(i, j int) bool { +func (pq queueHeap[K, T]) Less(i, j int) bool { return pq[i].value.ScheduledTime().Before(pq[j].value.ScheduledTime()) } -func (pq queueHeap[T]) Swap(i, j int) { +func (pq queueHeap[K, T]) Swap(i, j int) { pq[i], pq[j] = pq[j], pq[i] pq[i].index = i pq[j].index = j } -func (pq *queueHeap[T]) Push(x any) { +func (pq *queueHeap[K, T]) Push(x any) { n := len(*pq) - item := x.(*queueItem[T]) + item := x.(*queueItem[K, T]) item.index = n *pq = append(*pq, item) } -func (pq *queueHeap[T]) Pop() any { +func (pq *queueHeap[K, T]) Pop() any { old := *pq n := len(old) item := old[n-1] diff --git a/events/queue/queue_test.go b/events/queue/queue_test.go index 1c163fd..ddfd338 100644 --- a/events/queue/queue_test.go +++ b/events/queue/queue_test.go @@ -23,7 +23,7 @@ import ( ) func TestQueue(t *testing.T) { - queue := newQueue[*queueableItem]() + queue := newQueue[string, *queueableItem]() // Add 5 items, which are not in order queue.Insert(newTestItem(2, "2022-02-02T02:02:02Z"), false) @@ -56,7 +56,7 @@ func TestQueue(t *testing.T) { } func TestQueueSkipDuplicates(t *testing.T) { - queue := newQueue[*queueableItem]() + queue := newQueue[string, *queueableItem]() // Add 2 items queue.Insert(newTestItem(2, "2022-02-02T02:02:02Z"), false) @@ -78,7 +78,7 @@ func TestQueueSkipDuplicates(t *testing.T) { } func TestQueueReplaceDuplicates(t *testing.T) { - queue := newQueue[*queueableItem]() + queue := newQueue[string, *queueableItem]() // Add 2 items queue.Insert(newTestItem(2, "2022-02-02T02:02:02Z"), false) @@ -100,7 +100,7 @@ func TestQueueReplaceDuplicates(t *testing.T) { } func TestAddToQueue(t *testing.T) { - queue := newQueue[*queueableItem]() + queue := newQueue[string, *queueableItem]() // Add 5 items, which are not in order queue.Insert(newTestItem(2, "2022-02-02T02:02:02Z"), false) @@ -151,7 +151,7 @@ func TestAddToQueue(t *testing.T) { } func TestRemoveFromQueue(t *testing.T) { - queue := newQueue[*queueableItem]() + queue := newQueue[string, *queueableItem]() // Add 5 items, which are not in order queue.Insert(newTestItem(2, "2022-02-02T02:02:02Z"), false) @@ -193,7 +193,7 @@ func TestRemoveFromQueue(t *testing.T) { } func TestUpdateInQueue(t *testing.T) { - queue := newQueue[*queueableItem]() + queue := newQueue[string, *queueableItem]() // Add 5 items, which are not in order queue.Insert(newTestItem(2, "2022-02-02T02:02:02Z"), false) @@ -238,7 +238,7 @@ func TestUpdateInQueue(t *testing.T) { } func TestQueuePeek(t *testing.T) { - queue := newQueue[*queueableItem]() + queue := newQueue[string, *queueableItem]() // Peeking an empty queue returns false _, ok := queue.Peek() @@ -299,7 +299,7 @@ func newTestItem(n int, dueTime any) *queueableItem { return r } -func popAndCompare(t *testing.T, q *queue[*queueableItem], expectN int, expectDueTime string) { +func popAndCompare(t *testing.T, q *queue[string, *queueableItem], expectN int, expectDueTime string) { r, ok := q.Pop() require.True(t, ok) require.NotNil(t, r) @@ -307,7 +307,7 @@ func popAndCompare(t *testing.T, q *queue[*queueableItem], expectN int, expectDu assert.Equal(t, expectDueTime, r.ScheduledTime().Format(time.RFC3339)) } -func peekAndCompare(t *testing.T, q *queue[*queueableItem], expectN int, expectDueTime string) { +func peekAndCompare(t *testing.T, q *queue[string, *queueableItem], expectN int, expectDueTime string) { r, ok := q.Peek() require.True(t, ok) require.NotNil(t, r) diff --git a/fswatcher/fswatcher.go b/fswatcher/fswatcher.go index 749ae13..75d00d6 100644 --- a/fswatcher/fswatcher.go +++ b/fswatcher/fswatcher.go @@ -79,6 +79,7 @@ func (f *FSWatcher) Run(ctx context.Context, eventCh chan<- struct{}) error { if !f.running.CompareAndSwap(false, true) { return errors.New("watcher already running") } + defer f.batcher.Close() f.batcher.Subscribe(eventCh)