diff --git a/components/nexusoperations/events.go b/components/nexusoperations/events.go index 5dcf22cfe48..a958fdf3ca1 100644 --- a/components/nexusoperations/events.go +++ b/components/nexusoperations/events.go @@ -125,7 +125,7 @@ func (d CompletedEventDefinition) Apply(root *hsm.Node, event *historypb.History return err } - return maybeDeleteNode(node) + return node.Parent.DeleteChild(node.Key) } func (d CompletedEventDefinition) Type() enumspb.EventType { @@ -161,7 +161,7 @@ func (d FailedEventDefinition) Apply(root *hsm.Node, event *historypb.HistoryEve return err } - return maybeDeleteNode(node) + return node.Parent.DeleteChild(node.Key) } func (d FailedEventDefinition) CherryPick(root *hsm.Node, event *historypb.HistoryEvent, excludeTypes map[enumspb.ResetReapplyExcludeType]struct{}) error { @@ -192,7 +192,7 @@ func (d CanceledEventDefinition) Apply(root *hsm.Node, event *historypb.HistoryE return err } - return maybeDeleteNode(node) + return node.Parent.DeleteChild(node.Key) } func (d CanceledEventDefinition) CherryPick(root *hsm.Node, event *historypb.HistoryEvent, excludeTypes map[enumspb.ResetReapplyExcludeType]struct{}) error { @@ -222,7 +222,7 @@ func (d TimedOutEventDefinition) Apply(root *hsm.Node, event *historypb.HistoryE return err } - return maybeDeleteNode(node) + return node.Parent.DeleteChild(node.Key) } func (d TimedOutEventDefinition) CherryPick(root *hsm.Node, event *historypb.HistoryEvent, excludeTypes map[enumspb.ResetReapplyExcludeType]struct{}) error { @@ -304,14 +304,3 @@ func findOperationNode(root *hsm.Node, event *historypb.HistoryEvent) (*hsm.Node } return node, nil } - -func maybeDeleteNode(node *hsm.Node) error { - ms, err := hsm.MachineData[interface{ IsTransitionHistoryEnabled() bool }](node.Parent) - if err != nil { - return err - } - if !ms.IsTransitionHistoryEnabled() { - return node.Parent.DeleteChild(node.Key) - } - return nil -} diff --git a/service/history/ndc/workflow_state_replicator.go b/service/history/ndc/workflow_state_replicator.go index 8aa681392f5..9c011354682 100644 --- a/service/history/ndc/workflow_state_replicator.go +++ b/service/history/ndc/workflow_state_replicator.go @@ -360,17 +360,18 @@ func (r *WorkflowStateReplicatorImpl) applyMutation( ) } localTransitionHistory := transitionhistory.CopyVersionedTransitions(localMutableState.GetExecutionInfo().TransitionHistory) + localVersionedTransition := transitionhistory.LastVersionedTransition(localTransitionHistory) sourceTransitionHistory := mutation.StateMutation.ExecutionInfo.TransitionHistory // make sure mutation range is extension of local range if workflow.TransitionHistoryStalenessCheck(localTransitionHistory, mutation.ExclusiveStartVersionedTransition) != nil || - workflow.TransitionHistoryStalenessCheck(sourceTransitionHistory, transitionhistory.LastVersionedTransition(localTransitionHistory)) != nil { + workflow.TransitionHistoryStalenessCheck(sourceTransitionHistory, localVersionedTransition) != nil { return serviceerrors.NewSyncState( fmt.Sprintf("Failed to apply mutation due to version check failed. local transition history: %v, source transition history: %v", localTransitionHistory, sourceTransitionHistory), namespaceID.String(), workflowID, runID, - localTransitionHistory[len(localTransitionHistory)-1], + localVersionedTransition, localMutableState.GetExecutionInfo().VersionHistories, ) } @@ -402,7 +403,7 @@ func (r *WorkflowStateReplicatorImpl) applyMutation( } } - err = r.taskRefresher.PartialRefresh(ctx, localMutableState, localTransitionHistory[len(localTransitionHistory)-1]) + err = r.taskRefresher.PartialRefresh(ctx, localMutableState, localVersionedTransition) if err != nil { return err } @@ -453,10 +454,12 @@ func (r *WorkflowStateReplicatorImpl) applySnapshot( } var isBranchSwitched bool var localTransitionHistory []*persistencespb.VersionedTransition + var localVersionedTransition *persistencespb.VersionedTransition if len(localMutableState.GetExecutionInfo().TransitionHistory) != 0 { localTransitionHistory = transitionhistory.CopyVersionedTransitions(localMutableState.GetExecutionInfo().TransitionHistory) + localVersionedTransition = transitionhistory.LastVersionedTransition(localTransitionHistory) sourceTransitionHistory := snapshot.ExecutionInfo.TransitionHistory - err := workflow.TransitionHistoryStalenessCheck(sourceTransitionHistory, transitionhistory.LastVersionedTransition(localTransitionHistory)) + err := workflow.TransitionHistoryStalenessCheck(sourceTransitionHistory, localVersionedTransition) switch { case err == nil: // no branch switch @@ -466,7 +469,7 @@ func (r *WorkflowStateReplicatorImpl) applySnapshot( namespaceID.String(), workflowID, runID, - transitionhistory.LastVersionedTransition(localTransitionHistory), + localVersionedTransition, localMutableState.GetExecutionInfo().VersionHistories, ) case errors.Is(err, consts.ErrStaleReference): @@ -508,7 +511,6 @@ func (r *WorkflowStateReplicatorImpl) applySnapshot( if err != nil { return err } - localMutableState.PopTasks() // tasks are refreshed manually below var newRunWorkflow Workflow if versionedTransition.NewRunInfo != nil { @@ -530,7 +532,7 @@ func (r *WorkflowStateReplicatorImpl) applySnapshot( return err } } else { - err = r.taskRefresher.PartialRefresh(ctx, localMutableState, localTransitionHistory[len(localTransitionHistory)-1]) + err = r.taskRefresher.PartialRefresh(ctx, localMutableState, localVersionedTransition) if err != nil { return err } diff --git a/service/history/ndc/workflow_state_replicator_test.go b/service/history/ndc/workflow_state_replicator_test.go index 597ef67eed0..d7512f80731 100644 --- a/service/history/ndc/workflow_state_replicator_test.go +++ b/service/history/ndc/workflow_state_replicator_test.go @@ -689,7 +689,6 @@ func (s *workflowReplicatorSuite) Test_ReplicateVersionedTransition_SameBranch_S RunId: s.runID, }).AnyTimes() mockMutableState.EXPECT().ApplySnapshot(versionedTransitionArtifact.GetSyncWorkflowStateSnapshotAttributes().State) - mockMutableState.EXPECT().PopTasks().Times(1) mockTransactionManager.EXPECT().UpdateWorkflow(gomock.Any(), false, gomock.Any(), nil).Return(nil).Times(1) mockTaskRefresher.EXPECT(). PartialRefresh(gomock.Any(), gomock.Any(), EqVersionedTransition(&persistencespb.VersionedTransition{ @@ -774,7 +773,6 @@ func (s *workflowReplicatorSuite) Test_ReplicateVersionedTransition_DifferentBra }, }).AnyTimes() mockMutableState.EXPECT().ApplySnapshot(versionedTransitionArtifact.GetSyncWorkflowStateSnapshotAttributes().State) - mockMutableState.EXPECT().PopTasks().Times(1) mockMutableState.EXPECT().GetExecutionState().Return(&persistencespb.WorkflowExecutionState{ RunId: s.runID, }).AnyTimes() diff --git a/service/history/workflow/mutable_state.go b/service/history/workflow/mutable_state.go index b900ad9fdf9..8733e7cb2f5 100644 --- a/service/history/workflow/mutable_state.go +++ b/service/history/workflow/mutable_state.go @@ -461,5 +461,8 @@ type ( GetReapplyCandidateEvents() []*historypb.HistoryEvent CurrentVersionedTransition() *persistencespb.VersionedTransition + + DeleteSubStateMachine(path *persistencespb.StateMachinePath) error + IsSubStateMachineDeleted() bool } ) diff --git a/service/history/workflow/mutable_state_impl.go b/service/history/workflow/mutable_state_impl.go index 94932df7713..613c961e3c2 100644 --- a/service/history/workflow/mutable_state_impl.go +++ b/service/history/workflow/mutable_state_impl.go @@ -239,14 +239,15 @@ type ( workflowTaskManager *workflowTaskStateMachine QueryRegistry QueryRegistry - shard shard.Context - clusterMetadata cluster.Metadata - eventsCache events.Cache - config *configs.Config - timeSource clock.TimeSource - logger log.Logger - metricsHandler metrics.Handler - stateMachineNode *hsm.Node + shard shard.Context + clusterMetadata cluster.Metadata + eventsCache events.Cache + config *configs.Config + timeSource clock.TimeSource + logger log.Logger + metricsHandler metrics.Handler + stateMachineNode *hsm.Node + subStateMachineDeleted bool // Tracks all events added via the AddHistoryEvent method that is used by the state machine framework. currentTransactionAddedStateMachineEventTypes []enumspb.EventType @@ -4335,6 +4336,29 @@ func (ms *MutableStateImpl) AddWorkflowExecutionUpdateAdmittedEvent(request *upd return event, nil } +func (ms *MutableStateImpl) DeleteSubStateMachine(path *persistencespb.StateMachinePath) error { + incomingPath := make([]hsm.Key, len(path.Path)) + for i, p := range path.Path { + incomingPath[i] = hsm.Key{Type: p.Type, ID: p.Id} + } + + root := ms.HSM() + node, err := root.Child(incomingPath) + if err != nil { + if !errors.Is(err, hsm.ErrStateMachineNotFound) { + return err + } + // node is already deleted. + return nil + } + err = node.Parent.DeleteChild(node.Key) + if err != nil { + return err + } + ms.subStateMachineDeleted = true + return nil +} + // ApplyWorkflowExecutionUpdateAdmittedEvent applies a WorkflowExecutionUpdateAdmittedEvent to mutable state. func (ms *MutableStateImpl) ApplyWorkflowExecutionUpdateAdmittedEvent(event *historypb.HistoryEvent, batchId int64) error { attrs := event.GetWorkflowExecutionUpdateAdmittedEventAttributes() @@ -6290,6 +6314,7 @@ func (ms *MutableStateImpl) cleanupTransaction() error { ms.timerInfosUserDataUpdated = make(map[string]struct{}) ms.activityInfosUserDataUpdated = make(map[int64]struct{}) ms.reapplyEventsCandidate = nil + ms.subStateMachineDeleted = false ms.stateInDB = ms.executionState.State ms.nextEventIDInDB = ms.GetNextEventID() @@ -7351,16 +7376,9 @@ func (ms *MutableStateImpl) syncExecutionInfo(current *persistencespb.WorkflowEx } func (ms *MutableStateImpl) syncSubStateMachinesByType(incoming map[string]*persistencespb.StateMachineMap) error { + // check if there is node been deleted currentHSM := ms.HSM() - - // we don't care about the root here which is the entire mutable state - incomingHSM, err := hsm.NewRoot( - ms.shard.StateMachineRegistry(), - StateMachineType, - ms, - incoming, - ms, - ) + incomingHSM, err := hsm.NewRoot(ms.shard.StateMachineRegistry(), StateMachineType, ms, incoming, ms) if err != nil { return err } @@ -7370,22 +7388,19 @@ func (ms *MutableStateImpl) syncSubStateMachinesByType(incoming map[string]*pers // skip root which is the entire mutable state return nil } - incomingNodePath := incomingNode.Path() - currentNode, err := currentHSM.Child(incomingNodePath) - if err != nil { - // 1. Already done history resend if needed before, - // and node creation today always associated with an event - // 2. Node deletion is not supported right now. - // Based on 1 and 2, node should always be found here. - return err + _, err := currentHSM.Child(incomingNodePath) + if err != nil && errors.Is(err, hsm.ErrStateMachineNotFound) { + ms.subStateMachineDeleted = true + return nil } - - return currentNode.Sync(incomingNode) + return err }); err != nil { return err } + ms.executionInfo.SubStateMachinesByType = incoming + ms.mustInitHSM() return nil } @@ -7417,6 +7432,8 @@ func (ms *MutableStateImpl) applyTombstones(tombstoneBatches []*persistencespb.S if _, ok := ms.pendingSignalInfoIDs[tombstone.GetSignalExternalInitiatedEventId()]; ok { err = ms.DeletePendingSignal(tombstone.GetSignalExternalInitiatedEventId()) } + case *persistencespb.StateMachineTombstone_StateMachinePath: + err = ms.DeleteSubStateMachine(tombstone.GetStateMachinePath()) default: // TODO: updateID and stateMachinePath err = serviceerror.NewInternal("unknown tombstone type") @@ -7563,3 +7580,7 @@ func (ms *MutableStateImpl) AddReapplyCandidateEvent(event *historypb.HistoryEve func (ms *MutableStateImpl) GetReapplyCandidateEvents() []*historypb.HistoryEvent { return ms.reapplyEventsCandidate } + +func (ms *MutableStateImpl) IsSubStateMachineDeleted() bool { + return ms.subStateMachineDeleted +} diff --git a/service/history/workflow/mutable_state_mock.go b/service/history/workflow/mutable_state_mock.go index 72824457e9f..95c028090af 100644 --- a/service/history/workflow/mutable_state_mock.go +++ b/service/history/workflow/mutable_state_mock.go @@ -1744,6 +1744,20 @@ func (mr *MockMutableStateMockRecorder) DeleteSignalRequested(requestID any) *go return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteSignalRequested", reflect.TypeOf((*MockMutableState)(nil).DeleteSignalRequested), requestID) } +// DeleteSubStateMachine mocks base method. +func (m *MockMutableState) DeleteSubStateMachine(path *persistence.StateMachinePath) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "DeleteSubStateMachine", path) + ret0, _ := ret[0].(error) + return ret0 +} + +// DeleteSubStateMachine indicates an expected call of DeleteSubStateMachine. +func (mr *MockMutableStateMockRecorder) DeleteSubStateMachine(path any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteSubStateMachine", reflect.TypeOf((*MockMutableState)(nil).DeleteSubStateMachine), path) +} + // FlushBufferedEvents mocks base method. func (m *MockMutableState) FlushBufferedEvents() { m.ctrl.T.Helper() @@ -2849,6 +2863,20 @@ func (mr *MockMutableStateMockRecorder) IsStickyTaskQueueSet() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "IsStickyTaskQueueSet", reflect.TypeOf((*MockMutableState)(nil).IsStickyTaskQueueSet)) } +// IsSubStateMachineDeleted mocks base method. +func (m *MockMutableState) IsSubStateMachineDeleted() bool { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "IsSubStateMachineDeleted") + ret0, _ := ret[0].(bool) + return ret0 +} + +// IsSubStateMachineDeleted indicates an expected call of IsSubStateMachineDeleted. +func (mr *MockMutableStateMockRecorder) IsSubStateMachineDeleted() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "IsSubStateMachineDeleted", reflect.TypeOf((*MockMutableState)(nil).IsSubStateMachineDeleted)) +} + // IsTransientWorkflowTask mocks base method. func (m *MockMutableState) IsTransientWorkflowTask() bool { m.ctrl.T.Helper() diff --git a/service/history/workflow/state_machine_timers.go b/service/history/workflow/state_machine_timers.go index 2eff94c775e..dbfcccc7378 100644 --- a/service/history/workflow/state_machine_timers.go +++ b/service/history/workflow/state_machine_timers.go @@ -36,10 +36,17 @@ import ( // AddNextStateMachineTimerTask generates a state machine timer task if the first deadline doesn't have a task scheduled // yet. func AddNextStateMachineTimerTask(ms MutableState) { + // filter out empty timer groups timers := ms.GetExecutionInfo().StateMachineTimers + timers = slices.DeleteFunc(timers, func(timerGroup *persistencespb.StateMachineTimerGroup) bool { + return len(timerGroup.Infos) == 0 + }) + ms.GetExecutionInfo().StateMachineTimers = timers + if len(timers) == 0 { return } + timerGroup := timers[0] // We already have a timer for this deadline. if timerGroup.Scheduled { @@ -122,7 +129,9 @@ func TrimStateMachineTimers( trimmedTaskInfos = append(trimmedTaskInfos, taskInfo) } - if len(trimmedTaskInfos) > 0 { + if len(trimmedTaskInfos) > 0 || timerGroup.Scheduled { + // We still want to keep the timer group if it has been scheduled even if it has no task info. + // This will prevent us from scheduling a new timer task for the same group. trimmedStateMachineTimers = append(trimmedStateMachineTimers, &persistencespb.StateMachineTimerGroup{ Infos: trimmedTaskInfos, Deadline: timerGroup.Deadline, diff --git a/service/history/workflow/task_refresher.go b/service/history/workflow/task_refresher.go index 427a4c00a94..29a74ee18fe 100644 --- a/service/history/workflow/task_refresher.go +++ b/service/history/workflow/task_refresher.go @@ -655,9 +655,7 @@ func (r *TaskRefresherImpl) refreshTasksForSubStateMachines( return err } - if len(nodesToRefresh) != 0 { - // TODO: after hsm node tombstone is tracked in mutable state, - // also trigger trim when there are new tombstones after minVersionedTransition + if len(nodesToRefresh) != 0 || mutableState.IsSubStateMachineDeleted() { if err := TrimStateMachineTimers(mutableState, minVersionedTransition); err != nil { return err } diff --git a/tests/xdc/nexus_state_replication_test.go b/tests/xdc/nexus_state_replication_test.go index 16d29c442e5..19a99d44116 100644 --- a/tests/xdc/nexus_state_replication_test.go +++ b/tests/xdc/nexus_state_replication_test.go @@ -72,11 +72,10 @@ func TestNexusStateReplicationTestSuite(t *testing.T) { name: "DisableTransitionHistory", enableTransitionHistory: false, }, - // TODO(hai719): Enable this test once state based replication works with HSM node deletion. - // { - // name: "EnableTransitionHistory", - // enableTransitionHistory: true, - // }, + { + name: "EnableTransitionHistory", + enableTransitionHistory: true, + }, } { t.Run(tc.name, func(t *testing.T) { s := &NexusStateReplicationSuite{}