Skip to content

Commit

Permalink
Fixes 5058: separate set cancel from set cancel_attempted (#908)
Browse files Browse the repository at this point in the history
  • Loading branch information
rverdile authored Nov 27, 2024
1 parent 700dffb commit f5622c2
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 0 deletions.
11 changes: 11 additions & 0 deletions pkg/tasks/queue/pgqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,10 @@ const (
UPDATE tasks
SET status = 'canceled', error = (left($2, 4000)), cancel_attempted = true
WHERE id = $1 AND finished_at IS NULL`
sqlSetCancelAttempted = `
UPDATE tasks
SET cancel_attempted = true
WHERE id = $1`
// sqlUpdatePayload
sqlUpdatePayload = `
UPDATE tasks
Expand Down Expand Up @@ -611,6 +615,13 @@ func (p *PgQueue) Cancel(ctx context.Context, taskId uuid.UUID) error {
return fmt.Errorf("error canceling task: %w", err)
}

// this query is separated because we must ensure the flag is set regardless of finished_at being null
// but we do not want to set the status to canceled if finished_at is not null
_, err = tx.Exec(ctx, sqlSetCancelAttempted, taskId)
if err != nil {
return fmt.Errorf("error setting cancel_attempted: %w", err)
}

dependents, err := p.taskDependents(context.Background(), tx.Conn(), taskId)
if err != nil {
return fmt.Errorf("error fetching task dependents: %w", err)
Expand Down
28 changes: 28 additions & 0 deletions pkg/tasks/queue/pgqueue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,7 @@ func (s *QueueSuite) TestCannotRequeueCanceledTasks() {
func (s *QueueSuite) TestCannotRequeueCanceledFailedTasks() {
config.Get().Tasking.RetryWaitUpperBound = 0

// Test when task fails right after cancellation
id, err := s.queue.Enqueue(&testTask)
require.NoError(s.T(), err)
assert.NotEqual(s.T(), uuid.Nil, id)
Expand All @@ -361,6 +362,33 @@ func (s *QueueSuite) TestCannotRequeueCanceledFailedTasks() {
assert.Equal(s.T(), config.TaskStatusFailed, info.Status)
assert.Equal(s.T(), true, info.CancelAttempted)
assert.True(s.T(), info.Queued.Equal(*originalQueueTime))

// Test when task fails right before cancellation
id, err = s.queue.Enqueue(&testTask)
require.NoError(s.T(), err)
assert.NotEqual(s.T(), uuid.Nil, id)

info, err = s.queue.Status(id)
require.NoError(s.T(), err)
originalQueueTime = info.Queued

_, err = s.queue.Dequeue(context.Background(), []string{testTaskType})
require.NoError(s.T(), err)

err = s.queue.Finish(id, fmt.Errorf("something went wrong"))
require.NoError(s.T(), err)

err = s.queue.Cancel(context.Background(), id)
require.NoError(s.T(), err)

err = s.queue.RequeueFailedTasks([]string{testTaskType})
assert.NoError(s.T(), err)

info, err = s.queue.Status(id)
require.NoError(s.T(), err)
assert.Equal(s.T(), config.TaskStatusFailed, info.Status)
assert.Equal(s.T(), true, info.CancelAttempted)
assert.True(s.T(), info.Queued.Equal(*originalQueueTime))
}

func (s *QueueSuite) TestRequeueFailedTasksExceedRetries() {
Expand Down

0 comments on commit f5622c2

Please sign in to comment.