Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Reset] Track child workflows initiated after reset #7210

Merged
merged 13 commits into from
Feb 5, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 34 additions & 0 deletions service/history/ndc/workflow_resetter.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,14 @@ import (
"google.golang.org/protobuf/types/known/timestamppb"
)

const (
// maxChildrenInResetMutableState is the upper bound on the number of child
// workflows tracked in the reset mutable state. reapplyEventsFromBranch returns
// errWorkflowResetterMaxChildren once the tracked set grows beyond this value.
maxChildrenInResetMutableState = 1000 // max number of children tracked in reset mutable state
)

var (
// errWorkflowResetterMaxChildren is the InvalidArgument service error returned
// while re-applying events during a reset when the number of child workflows
// tracked after the reset point exceeds maxChildrenInResetMutableState.
// The message embeds the limit so callers can see the configured bound.
errWorkflowResetterMaxChildren = serviceerror.NewInvalidArgument(fmt.Sprintf("WorkflowResetter encountered max allowed children [%d] while resetting.", maxChildrenInResetMutableState))
)

type (
workflowResetReapplyEventsFn func(ctx context.Context, resetMutableState workflow.MutableState) error

Expand Down Expand Up @@ -648,6 +656,11 @@ func (r *workflowResetterImpl) reapplyContinueAsNewWorkflowEvents(
// All subsequent events should be excluded from being re-applied. So, do nothing and return.
return lastVisitedRunID, nil
}

// track the child workflows initiated after reset.
// This will be saved in the parent workflow (in execution info) and used by the parent later to determine how to start these child workflows again.
childrenInitializedAfterReset := make(map[string]*persistencespb.ResetChildInfo)

// First, special handling of remaining events for base workflow
nextRunID, err := r.reapplyEventsFromBranch(
ctx,
Expand All @@ -656,6 +669,7 @@ func (r *workflowResetterImpl) reapplyContinueAsNewWorkflowEvents(
baseNextEventID,
baseBranchToken,
resetReapplyExcludeTypes,
childrenInitializedAfterReset,
)
switch err.(type) {
case nil:
Expand Down Expand Up @@ -721,6 +735,7 @@ func (r *workflowResetterImpl) reapplyContinueAsNewWorkflowEvents(
nextWorkflowNextEventID,
nextWorkflowBranchToken,
resetReapplyExcludeTypes,
childrenInitializedAfterReset,
)
switch err.(type) {
case nil:
Expand All @@ -733,6 +748,9 @@ func (r *workflowResetterImpl) reapplyContinueAsNewWorkflowEvents(
return "", err
}
}
if len(childrenInitializedAfterReset) > 0 {
resetMutableState.SetChildrenInitializedPostResetPoint(childrenInitializedAfterReset)
yycptt marked this conversation as resolved.
Show resolved Hide resolved
}
return lastVisitedRunID, nil
}

Expand All @@ -743,6 +761,7 @@ func (r *workflowResetterImpl) reapplyEventsFromBranch(
nextEventID int64,
branchToken []byte,
resetReapplyExcludeTypes map[enumspb.ResetReapplyExcludeType]struct{},
childrenInitializedAfterReset map[string]*persistencespb.ResetChildInfo,
) (string, error) {

// TODO change this logic to fetching all workflow [baseWorkflow, currentWorkflow]
Expand All @@ -768,6 +787,21 @@ func (r *workflowResetterImpl) reapplyEventsFromBranch(
if _, err := r.reapplyEvents(ctx, mutableState, lastEvents, resetReapplyExcludeTypes); err != nil {
return "", err
}
// track the child workflows initiated after reset-point
for _, event := range lastEvents {
if event.GetEventType() == enumspb.EVENT_TYPE_START_CHILD_WORKFLOW_EXECUTION_INITIATED {
attr := event.GetStartChildWorkflowExecutionInitiatedEventAttributes()
// TODO: child IDs built from the workflow type name and workflow ID are not guaranteed to be unique, although a collision is very unlikely.
// Find a more robust way to identify and track these child workflows.
childID := fmt.Sprintf("%s:%s", attr.GetWorkflowType().Name, attr.GetWorkflowId())
childrenInitializedAfterReset[childID] = &persistencespb.ResetChildInfo{
ShouldTerminateAndStart: true,
}
if len(childrenInitializedAfterReset) > maxChildrenInResetMutableState {
return "", errWorkflowResetterMaxChildren
}
}
}
}

if len(lastEvents) > 0 {
Expand Down
1 change: 1 addition & 0 deletions service/history/ndc/workflow_resetter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -824,6 +824,7 @@ func (s *workflowResetterSuite) TestReapplyWorkflowEvents() {
nextEventID,
branchToken,
nil,
map[string]*persistencespb.ResetChildInfo{},
)
s.NoError(err)
s.Equal(newRunID, nextRunID)
Expand Down
Loading