Skip to content

Commit

Permalink
Clear workflowExecutionIsCancelling on new WFT
Browse files Browse the repository at this point in the history
  • Loading branch information
Quinn-With-Two-Ns committed Jul 26, 2023
1 parent 57d29ee commit 64846ba
Show file tree
Hide file tree
Showing 4 changed files with 105 additions and 20 deletions.
2 changes: 2 additions & 0 deletions internal/internal_command_state_machine.go
Original file line number Diff line number Diff line change
Expand Up @@ -912,6 +912,8 @@ func (h *commandsHelper) setCurrentWorkflowTaskStartedEventID(workflowTaskStarte
// We must change the counter here so that others who mutate
// commandsCancelledDuringWFCancellation know it has since been reset
h.nextCommandEventIDResetCounter++
// Once we have handled the cancellation, we can reset the flag
h.workflowExecutionIsCancelling = false
}

func (h *commandsHelper) getNextID() int64 {
Expand Down
17 changes: 17 additions & 0 deletions test/activity_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,23 @@ func LocalSleep(_ context.Context, delay time.Duration) error {
return nil
}

func (a *Activities) ActivityToBeCanceled(ctx context.Context) (string, error) {
a.append("ActivityToBeCanceled")
for {
select {
case <-time.After(1 * time.Second):
activity.RecordHeartbeat(ctx, "")
case <-ctx.Done():
return "I am canceled by Done", nil
}
}
}

func (a *Activities) EmptyActivity(ctx context.Context) error {
a.append("EmptyActivity")
return nil
}

func (a *Activities) HeartbeatAndSleep(ctx context.Context, seq int, delay time.Duration) (int, error) {
a.append("heartbeatAndSleep")
activity.GetLogger(ctx).Info("Running HeartbeatAndSleep activity")
Expand Down
52 changes: 32 additions & 20 deletions test/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1189,6 +1189,18 @@ func (ts *IntegrationTestSuite) TestMutatingUpdateValidator() {
ts.Nil(ts.client.CancelWorkflow(ctx, "test-mutating-update-validator", ""))
}

func (ts *IntegrationTestSuite) TestWaitForCancelWithDisconnectedContext() {
ctx := context.Background()
run, err := ts.client.ExecuteWorkflow(ctx,
ts.startWorkflowOptions("test-wait-for-cancel-with-disconnected-contex"), ts.workflows.WaitForCancelWithDisconnectedContextWorkflow)
ts.Nil(err)

ts.waitForQueryTrue(run, "timer-created", 1)

ts.Nil(ts.client.CancelWorkflow(ctx, run.GetID(), run.GetRunID()))
ts.Nil(run.Get(ctx, nil))
}

func (ts *IntegrationTestSuite) TestMutatingSideEffect() {
ctx := context.Background()
err := ts.executeWorkflowWithContextAndOption(ctx, ts.startWorkflowOptions("test-mutating-side-effect"), ts.workflows.MutatingSideEffectWorkflow, nil)
Expand Down Expand Up @@ -1871,30 +1883,30 @@ func (ts *IntegrationTestSuite) TestAdvancedPostCancellation() {
}

// Check just activity and timer
// assertPostCancellation(&AdvancedPostCancellationInput{
// PreCancelActivity: true,
// PostCancelActivity: true,
// })
// assertPostCancellation(&AdvancedPostCancellationInput{
// PreCancelTimer: true,
// PostCancelTimer: true,
// })
// // Check mixed
// assertPostCancellation(&AdvancedPostCancellationInput{
// PreCancelActivity: true,
// PostCancelTimer: true,
// })
assertPostCancellation(&AdvancedPostCancellationInput{
PreCancelActivity: true,
PostCancelActivity: true,
})
assertPostCancellation(&AdvancedPostCancellationInput{
PreCancelTimer: true,
PostCancelTimer: true,
})
// Check mixed
assertPostCancellation(&AdvancedPostCancellationInput{
PreCancelActivity: true,
PostCancelTimer: true,
})
assertPostCancellation(&AdvancedPostCancellationInput{
PreCancelTimer: true,
PostCancelActivity: true,
})
// Check all
assertPostCancellation(&AdvancedPostCancellationInput{
PreCancelActivity: true,
PreCancelTimer: true,
PostCancelActivity: true,
PostCancelTimer: true,
})
// // Check all
// assertPostCancellation(&AdvancedPostCancellationInput{
// PreCancelActivity: true,
// PreCancelTimer: true,
// PostCancelActivity: true,
// PostCancelTimer: true,
// })
}

func (ts *IntegrationTestSuite) TestAdvancedPostCancellationChildWithDone() {
Expand Down
54 changes: 54 additions & 0 deletions test/workflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1571,6 +1571,59 @@ func (w *Workflows) CronWorkflow(ctx workflow.Context) (int, error) {
return retme, nil
}

func (w *Workflows) WaitForCancelWithDisconnectedContextWorkflow(ctx workflow.Context) (err error) {
ao := workflow.ActivityOptions{
StartToCloseTimeout: 1 * time.Minute,
HeartbeatTimeout: 5 * time.Second,
WaitForCancellation: true,
}
ctx = workflow.WithActivityOptions(ctx, ao)

var activities *Activities
defer func() {
if !errors.Is(ctx.Err(), workflow.ErrCanceled) {
return
}
// When the Workflow is canceled, it has to get a new disconnected context to execute any Activities
newCtx, _ := workflow.NewDisconnectedContext(ctx)
err = workflow.ExecuteActivity(newCtx, activities.EmptyActivity).Get(newCtx, nil)
}()

//Create selector
s := workflow.NewSelector(ctx)

newCtx, _ := workflow.NewDisconnectedContext(ctx)
newCtx, cancel := workflow.WithCancel(newCtx)

timer1 := workflow.NewTimer(newCtx, 5*time.Minute)

err = workflow.SetQueryHandler(newCtx, "timer-created", func() (bool, error) {
return true, nil
})
if err != nil {
return err
}

s.AddFuture(timer1, func(f workflow.Future) {
err = f.Get(newCtx, nil)
if !errors.Is(ctx.Err(), workflow.ErrCanceled) {
panic("error is not canceled error")
}
})

s.AddReceive(ctx.Done(), func(c workflow.ReceiveChannel, more bool) {
c.Receive(ctx, nil)
cancel()
s.Select(ctx)
})

s.Select(ctx)

var result string
err = workflow.ExecuteActivity(ctx, activities.EmptyActivity).Get(ctx, &result)
return
}

func (w *Workflows) CancelTimerConcurrentWithOtherCommandWorkflow(ctx workflow.Context) (int, error) {
ao := workflow.ActivityOptions{
ScheduleToStartTimeout: time.Minute,
Expand Down Expand Up @@ -2247,6 +2300,7 @@ func (w *Workflows) register(worker worker.Worker) {
worker.RegisterWorkflow(w.CancelTimerAfterActivity)
worker.RegisterWorkflow(w.CancelTimerViaDeferAfterWFTFailure)
worker.RegisterWorkflow(w.CascadingCancellation)
worker.RegisterWorkflow(w.WaitForCancelWithDisconnectedContextWorkflow)
worker.RegisterWorkflow(w.ChildWorkflowRetryOnError)
worker.RegisterWorkflow(w.ChildWorkflowRetryOnTimeout)
worker.RegisterWorkflow(w.ChildWorkflowSuccess)
Expand Down

0 comments on commit 64846ba

Please sign in to comment.