diff --git a/events/queue/processor.go b/events/queue/processor.go index cac2d12..5e83016 100644 --- a/events/queue/processor.go +++ b/events/queue/processor.go @@ -14,7 +14,6 @@ limitations under the License. package queue import ( - "errors" "sync" "sync/atomic" "time" @@ -22,9 +21,6 @@ import ( kclock "k8s.io/utils/clock" ) -// ErrProcessorStopped is returned when the processor is not running. -var ErrProcessorStopped = errors.New("processor is stopped") - // Processor manages the queue of items and processes them at the correct time. type Processor[K comparable, T queueable[K]] struct { executeFn func(r T) @@ -59,9 +55,9 @@ func (p *Processor[K, T]) WithClock(clock kclock.Clock) *Processor[K, T] { // Enqueue adds a new item to the queue. // If a item with the same ID already exists, it'll be replaced. -func (p *Processor[K, T]) Enqueue(r T) error { +func (p *Processor[K, T]) Enqueue(r T) { if p.stopped.Load() { - return ErrProcessorStopped + return } // Insert or replace the item in the queue @@ -74,14 +70,12 @@ func (p *Processor[K, T]) Enqueue(r T) error { isFirst = isFirst || (peek == r) // This is also going to be true if the item just added landed at the front of the queue p.process(isFirst) p.lock.Unlock() - - return nil } // Dequeue removes a item from the queue. -func (p *Processor[K, T]) Dequeue(key K) error { +func (p *Processor[K, T]) Dequeue(key K) { if p.stopped.Load() { - return ErrProcessorStopped + return } // We need to check if this is the next item in the queue, as that requires stopping the processor @@ -93,8 +87,6 @@ func (p *Processor[K, T]) Dequeue(key K) error { p.process(true) } p.lock.Unlock() - - return nil } // Close stops the processor. diff --git a/events/queue/processor_test.go b/events/queue/processor_test.go index e1b17a7..73e03c2 100644 --- a/events/queue/processor_test.go +++ b/events/queue/processor_test.go @@ -63,10 +63,9 @@ func TestProcessor(t *testing.T) { t.Run("enqueue items", func(t *testing.T) { for i := 1; i <= 5; i++ { - err := processor.Enqueue( + processor.Enqueue( newTestItem(i, clock.Now().Add(time.Second*time.Duration(i))), ) - require.NoError(t, err) } // Advance tickers by 500ms to start @@ -83,8 +82,7 @@ func TestProcessor(t *testing.T) { t.Run("enqueue item to be executed right away", func(t *testing.T) { r := newTestItem(1, clock.Now()) - err := processor.Enqueue(r) - require.NoError(t, err) + processor.Enqueue(r) clock.Step(500 * time.Millisecond) @@ -95,10 +93,9 @@ func TestProcessor(t *testing.T) { t.Run("enqueue item at the front of the queue", func(t *testing.T) { // Enqueue 4 items for i := 1; i <= 4; i++ { - err := processor.Enqueue( + processor.Enqueue( newTestItem(i, clock.Now().Add(time.Second*time.Duration(i))), ) - require.NoError(t, err) } assert.Eventually(t, clock.HasWaiters, time.Second, 100*time.Millisecond) @@ -111,10 +108,9 @@ func TestProcessor(t *testing.T) { assert.Equal(t, "1", received.Name) // Add a new item at the front of the queue - err := processor.Enqueue( + processor.Enqueue( newTestItem(99, clock.Now()), ) - require.NoError(t, err) // Advance tickers and assert messages are coming in order for i := 1; i <= 4; i++ { @@ -136,19 +132,16 @@ func TestProcessor(t *testing.T) { // Enqueue 5 items for i := 1; i <= 5; i++ { - err := processor.Enqueue( + processor.Enqueue( newTestItem(i, clock.Now().Add(time.Second*time.Duration(i))), ) - require.NoError(t, err) } assert.Equal(t, 5, processor.queue.Len()) // Dequeue items 2 and 4 // Note that this is a string because it's the key - err := processor.Dequeue("2") - require.NoError(t, err) - err = processor.Dequeue("4") - require.NoError(t, err) + processor.Dequeue("2") + processor.Dequeue("4") assert.Equal(t, 3, processor.queue.Len()) @@ -173,10 +166,9 @@ func TestProcessor(t *testing.T) { t.Run("dequeue item from the front of the queue", func(t *testing.T) { // Enqueue 6 items for i := 1; i <= 6; i++ { - err := processor.Enqueue( + processor.Enqueue( newTestItem(i, clock.Now().Add(time.Second*time.Duration(i))), ) - require.NoError(t, err) } // Advance tickers and assert messages are coming in order @@ -187,8 +179,7 @@ func TestProcessor(t *testing.T) { if i == 2 || i == 5 { // Dequeue the item at the front of the queue // Note that this is a string because it's the key - err := processor.Dequeue(strconv.Itoa(i)) - require.NoError(t, err) + processor.Dequeue(strconv.Itoa(i)) // Skip items that have been removed t.Logf("Should not receive signal %d", i) @@ -206,15 +197,13 @@ func TestProcessor(t *testing.T) { t.Run("replace item", func(t *testing.T) { // Enqueue 5 items for i := 1; i <= 5; i++ { - err := processor.Enqueue( + processor.Enqueue( newTestItem(i, clock.Now().Add(time.Second*time.Duration(i))), ) - require.NoError(t, err) } // Replace item 4, bumping its priority down - err := processor.Enqueue(newTestItem(4, clock.Now().Add(6*time.Second))) - require.NoError(t, err) + processor.Enqueue(newTestItem(4, clock.Now().Add(6*time.Second))) // Advance tickers and assert messages are coming in order for i := 1; i <= 6; i++ { @@ -241,10 +230,9 @@ func TestProcessor(t *testing.T) { t.Run("replace item at the front of the queue", func(t *testing.T) { // Enqueue 5 items for i := 1; i <= 5; i++ { - err := processor.Enqueue( + processor.Enqueue( newTestItem(i, clock.Now().Add(time.Second*time.Duration(i))), ) - require.NoError(t, err) } // Advance tickers and assert messages are coming in order @@ -253,8 +241,7 @@ func TestProcessor(t *testing.T) { if i == 2 { // Replace item 2, bumping its priority down, while it's at the front of the queue - err := processor.Enqueue(newTestItem(2, clock.Now().Add(5*time.Second))) - require.NoError(t, err) + processor.Enqueue(newTestItem(2, clock.Now().Add(5*time.Second))) // This item has been pushed down t.Logf("Should not receive signal %d now", i) @@ -287,8 +274,7 @@ func TestProcessor(t *testing.T) { go func(i int) { defer wg.Done() execTime := now.Add(time.Second * time.Duration(rand.Intn(maxDelay))) //nolint:gosec - err := processor.Enqueue(newTestItem(i, execTime)) - require.NoError(t, err) + processor.Enqueue(newTestItem(i, execTime)) }(i) } wg.Wait() @@ -332,10 +318,9 @@ func TestProcessor(t *testing.T) { t.Run("stop processor", func(t *testing.T) { // Enqueue 5 items for i := 1; i <= 5; i++ { - err := processor.Enqueue( + processor.Enqueue( newTestItem(i, clock.Now().Add(time.Second*time.Duration(i))), ) - require.NoError(t, err) } assert.Eventually(t, clock.HasWaiters, time.Second, 100*time.Millisecond) @@ -348,10 +333,8 @@ func TestProcessor(t *testing.T) { assertNoExecutedItem(t) // Enqueuing and dequeueing should fail - err := processor.Enqueue(newTestItem(99, clock.Now())) - require.ErrorIs(t, err, ErrProcessorStopped) - err = processor.Dequeue("99") - require.ErrorIs(t, err, ErrProcessorStopped) + processor.Enqueue(newTestItem(99, clock.Now())) + processor.Dequeue("99") // Stopping again is a nop (should not crash) require.NoError(t, processor.Close())