Skip to content

Commit

Permalink
Fix tests
Browse files Browse the repository at this point in the history
  • Loading branch information
gow committed Feb 12, 2025
1 parent a0186dc commit a48bf75
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 6 deletions.
13 changes: 8 additions & 5 deletions service/history/transfer_queue_active_task_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -344,9 +344,10 @@ func (t *transferQueueActiveTaskExecutor) processCloseExecution(
RunId: task.GetRunID(),
}
executionInfo := mutableState.GetExecutionInfo()
children := copyChildWorkflowInfos(mutableState.GetPendingChildExecutionInfos())
var completionEvent *historypb.HistoryEvent // needed to report close event to parent workflow
replyToParentWorkflow := mutableState.HasParentExecution() && executionInfo.NewExecutionRunId == ""
if replyToParentWorkflow {
if replyToParentWorkflow || len(children) > 0 {
// only load close event if needed.
completionEvent, err = mutableState.GetCompletionEvent(ctx)
if err != nil {
Expand All @@ -369,7 +370,6 @@ func (t *transferQueueActiveTaskExecutor) processCloseExecution(
}

namespaceName := mutableState.GetNamespaceEntry().Name()
children := copyChildWorkflowInfos(mutableState.GetPendingChildExecutionInfos())

// NOTE: do not access anything related mutable state after this lock release.
// Release lock immediately since mutable state is not needed
Expand Down Expand Up @@ -402,13 +402,16 @@ func (t *transferQueueActiveTaskExecutor) processCloseExecution(

// process parentClosePolicy except when the execution was reset. In case of reset, we need to keep the children around so that we can reconnect to them.
// We know an execution was reset when ResetRunId was populated in it.
// TODO (Chetan): update this condition as new reset policies are added. For now we keep all children since "Reconnect" is the only policy available.
// Note: Sometimes the reset operation might race with this task processing. i.e the WF was closed and before this task can be executed, a reset operation is recorded.
// So we need to additionally check the termination reason for this parent to determine if this task was indeed created due to reset or due to normal completion of the WF.
// Also, checking the dynamic config is not strictly safe since by definition it can change at any time. However this reduces the chance of us skipping the parent close policy when we shouldn't.
allowResetWithPendingChildren := t.config.AllowResetWithPendingChildren(namespaceName.String())
shouldSkipParentClosePolicy := false
if executionInfo.GetResetRunId() != "" && allowResetWithPendingChildren {
isParentTerminatedDueToReset := (completionEvent != nil) && ndc.IsTerminatedByResetter(completionEvent)
if isParentTerminatedDueToReset && executionInfo.GetResetRunId() != "" && allowResetWithPendingChildren {
// TODO (Chetan): update this condition as new reset policies/cases are added.
shouldSkipParentClosePolicy = true // only skip if the parent is reset and we are using the new flow.
}

if !shouldSkipParentClosePolicy {
if err := t.processParentClosePolicy(
ctx,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1172,7 +1172,7 @@ func (s *transferQueueActiveTaskExecutorSuite) TestProcessCloseExecution_NoParen
}

event, _ = mutableState.AddWorkflowTaskCompletedEvent(wt, &workflowservice.RespondWorkflowTaskCompletedRequest{
Identity: "some random identity",
Identity: consts.IdentityResetter,
Commands: commands,
}, defaultWorkflowTaskCompletionLimits)

Expand Down

0 comments on commit a48bf75

Please sign in to comment.