From fb4f3215fd3cd8c33274b61a75672944c0d09843 Mon Sep 17 00:00:00 2001 From: Hai Zhao Date: Wed, 29 Jan 2025 10:20:30 -0800 Subject: [PATCH 1/6] handle state machine deletion for state-based replication --- service/history/workflow/mutable_state.go | 2 + .../history/workflow/mutable_state_impl.go | 61 ++++++++----------- .../history/workflow/mutable_state_mock.go | 14 +++++ 3 files changed, 42 insertions(+), 35 deletions(-) diff --git a/service/history/workflow/mutable_state.go b/service/history/workflow/mutable_state.go index c7d1fcf7380..9d55090f89a 100644 --- a/service/history/workflow/mutable_state.go +++ b/service/history/workflow/mutable_state.go @@ -450,5 +450,7 @@ type ( AddReapplyCandidateEvent(event *historypb.HistoryEvent) GetReapplyCandidateEvents() []*historypb.HistoryEvent + + DeleteSubStateMachine(path *persistencespb.StateMachinePath) error } ) diff --git a/service/history/workflow/mutable_state_impl.go b/service/history/workflow/mutable_state_impl.go index dd7ace7c0ff..02663254afd 100644 --- a/service/history/workflow/mutable_state_impl.go +++ b/service/history/workflow/mutable_state_impl.go @@ -4264,6 +4264,28 @@ func (ms *MutableStateImpl) AddWorkflowExecutionUpdateAdmittedEvent(request *upd return event, nil } +func (ms *MutableStateImpl) DeleteSubStateMachine(path *persistencespb.StateMachinePath) error { + incomingPath := []hsm.Key{} + for _, p := range path.Path { + incomingPath = append(incomingPath, hsm.Key{Type: p.Type, ID: p.Id}) + } + + root := ms.HSM() + node, err := root.Child(incomingPath) + if err != nil { + if !errors.Is(err, hsm.ErrStateMachineNotFound) { + return err + } + ms.logError( + fmt.Sprintf("unable to find path: %v in subStateMachine", incomingPath), + tag.ErrorTypeInvalidMutableStateAction, + ) + // log data inconsistency instead of returning an error + ms.logDataInconsistency() + } + return root.DeleteChild(node.Key) +} + // ApplyWorkflowExecutionUpdateAdmittedEvent applies a WorkflowExecutionUpdateAdmittedEvent to mutable state. func (ms *MutableStateImpl) ApplyWorkflowExecutionUpdateAdmittedEvent(event *historypb.HistoryEvent, batchId int64) error { attrs := event.GetWorkflowExecutionUpdateAdmittedEventAttributes() @@ -7265,41 +7287,8 @@ func (ms *MutableStateImpl) syncExecutionInfo(current *persistencespb.WorkflowEx } func (ms *MutableStateImpl) syncSubStateMachinesByType(incoming map[string]*persistencespb.StateMachineMap) error { - currentHSM := ms.HSM() - - // we don't care about the root here which is the entire mutable state - incomingHSM, err := hsm.NewRoot( - ms.shard.StateMachineRegistry(), - StateMachineType, - ms, - incoming, - ms, - ) - if err != nil { - return err - } - - if err := incomingHSM.Walk(func(incomingNode *hsm.Node) error { - if incomingNode.Parent == nil { - // skip root which is the entire mutable state - return nil - } - - incomingNodePath := incomingNode.Path() - currentNode, err := currentHSM.Child(incomingNodePath) - if err != nil { - // 1. Already done history resend if needed before, - // and node creation today always associated with an event - // 2. Node deletion is not supported right now. - // Based on 1 and 2, node should always be found here. - return err - } - - return currentNode.Sync(incomingNode) - }); err != nil { - return err - } - + ms.executionInfo.SubStateMachinesByType = incoming + ms.mustInitHSM() return nil } @@ -7331,6 +7320,8 @@ func (ms *MutableStateImpl) applyTombstones(tombstoneBatches []*persistencespb.S if _, ok := ms.pendingSignalInfoIDs[tombstone.GetSignalExternalInitiatedEventId()]; ok { err = ms.DeletePendingSignal(tombstone.GetSignalExternalInitiatedEventId()) } + case *persistencespb.StateMachineTombstone_StateMachinePath: + err = ms.DeleteSubStateMachine(tombstone.GetStateMachinePath()) default: // TODO: updateID and stateMachinePath err = serviceerror.NewInternal("unknown tombstone type") diff --git a/service/history/workflow/mutable_state_mock.go b/service/history/workflow/mutable_state_mock.go index d3b1f7bbb38..5e06e157529 100644 --- a/service/history/workflow/mutable_state_mock.go +++ b/service/history/workflow/mutable_state_mock.go @@ -1729,6 +1729,20 @@ func (mr *MockMutableStateMockRecorder) DeleteSignalRequested(requestID any) *go return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteSignalRequested", reflect.TypeOf((*MockMutableState)(nil).DeleteSignalRequested), requestID) } +// DeleteSubStateMachine mocks base method. +func (m *MockMutableState) DeleteSubStateMachine(path *persistence.StateMachinePath) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "DeleteSubStateMachine", path) + ret0, _ := ret[0].(error) + return ret0 +} + +// DeleteSubStateMachine indicates an expected call of DeleteSubStateMachine. +func (mr *MockMutableStateMockRecorder) DeleteSubStateMachine(path any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteSubStateMachine", reflect.TypeOf((*MockMutableState)(nil).DeleteSubStateMachine), path) +} + // FlushBufferedEvents mocks base method. func (m *MockMutableState) FlushBufferedEvents() { m.ctrl.T.Helper() From d65ca804b782d07bde2fa927be203ed4b5abaeec Mon Sep 17 00:00:00 2001 From: Hai Zhao Date: Wed, 29 Jan 2025 14:53:36 -0800 Subject: [PATCH 2/6] fix --- service/history/workflow/mutable_state_impl.go | 1 + 1 file changed, 1 insertion(+) diff --git a/service/history/workflow/mutable_state_impl.go b/service/history/workflow/mutable_state_impl.go index 02663254afd..e1893c31a8e 100644 --- a/service/history/workflow/mutable_state_impl.go +++ b/service/history/workflow/mutable_state_impl.go @@ -4282,6 +4282,7 @@ func (ms *MutableStateImpl) DeleteSubStateMachine(path *persistencespb.StateMach ) // log data inconsistency instead of returning an error ms.logDataInconsistency() + return nil } return root.DeleteChild(node.Key) } From d20d766dd3ee0580eed96e7457a9c20fcee3c71a Mon Sep 17 00:00:00 2001 From: Hai Zhao Date: Thu, 6 Feb 2025 12:03:39 -0800 Subject: [PATCH 3/6] address comments --- .../history/ndc/workflow_state_replicator.go | 16 ++++---- .../ndc/workflow_state_replicator_test.go | 2 - service/history/workflow/mutable_state.go | 1 + .../history/workflow/mutable_state_impl.go | 41 +++++++++++-------- .../history/workflow/mutable_state_mock.go | 14 +++++++ service/history/workflow/task_refresher.go | 4 +- 6 files changed, 48 insertions(+), 30 deletions(-) diff --git a/service/history/ndc/workflow_state_replicator.go b/service/history/ndc/workflow_state_replicator.go index 8aa681392f5..9c011354682 100644 --- a/service/history/ndc/workflow_state_replicator.go +++ b/service/history/ndc/workflow_state_replicator.go @@ -360,17 +360,18 @@ func (r *WorkflowStateReplicatorImpl) applyMutation( ) } localTransitionHistory := transitionhistory.CopyVersionedTransitions(localMutableState.GetExecutionInfo().TransitionHistory) + localVersionedTransition := transitionhistory.LastVersionedTransition(localTransitionHistory) sourceTransitionHistory := mutation.StateMutation.ExecutionInfo.TransitionHistory // make sure mutation range is extension of local range if workflow.TransitionHistoryStalenessCheck(localTransitionHistory, mutation.ExclusiveStartVersionedTransition) != nil || - workflow.TransitionHistoryStalenessCheck(sourceTransitionHistory, transitionhistory.LastVersionedTransition(localTransitionHistory)) != nil { + workflow.TransitionHistoryStalenessCheck(sourceTransitionHistory, localVersionedTransition) != nil { return serviceerrors.NewSyncState( fmt.Sprintf("Failed to apply mutation due to version check failed. local transition history: %v, source transition history: %v", localTransitionHistory, sourceTransitionHistory), namespaceID.String(), workflowID, runID, - localTransitionHistory[len(localTransitionHistory)-1], + localVersionedTransition, localMutableState.GetExecutionInfo().VersionHistories, ) } @@ -402,7 +403,7 @@ func (r *WorkflowStateReplicatorImpl) applyMutation( } } - err = r.taskRefresher.PartialRefresh(ctx, localMutableState, localTransitionHistory[len(localTransitionHistory)-1]) + err = r.taskRefresher.PartialRefresh(ctx, localMutableState, localVersionedTransition) if err != nil { return err } @@ -453,10 +454,12 @@ func (r *WorkflowStateReplicatorImpl) applySnapshot( } var isBranchSwitched bool var localTransitionHistory []*persistencespb.VersionedTransition + var localVersionedTransition *persistencespb.VersionedTransition if len(localMutableState.GetExecutionInfo().TransitionHistory) != 0 { localTransitionHistory = transitionhistory.CopyVersionedTransitions(localMutableState.GetExecutionInfo().TransitionHistory) + localVersionedTransition = transitionhistory.LastVersionedTransition(localTransitionHistory) sourceTransitionHistory := snapshot.ExecutionInfo.TransitionHistory - err := workflow.TransitionHistoryStalenessCheck(sourceTransitionHistory, transitionhistory.LastVersionedTransition(localTransitionHistory)) + err := workflow.TransitionHistoryStalenessCheck(sourceTransitionHistory, localVersionedTransition) switch { case err == nil: // no branch switch @@ -466,7 +469,7 @@ func (r *WorkflowStateReplicatorImpl) applySnapshot( namespaceID.String(), workflowID, runID, - transitionhistory.LastVersionedTransition(localTransitionHistory), + localVersionedTransition, localMutableState.GetExecutionInfo().VersionHistories, ) case errors.Is(err, consts.ErrStaleReference): @@ -508,7 +511,6 @@ func (r *WorkflowStateReplicatorImpl) applySnapshot( if err != nil { return err } - localMutableState.PopTasks() // tasks are refreshed manually below var newRunWorkflow Workflow if versionedTransition.NewRunInfo != nil { @@ -530,7 +532,7 @@ func (r *WorkflowStateReplicatorImpl) applySnapshot( return err } } else { - err = r.taskRefresher.PartialRefresh(ctx, localMutableState, localTransitionHistory[len(localTransitionHistory)-1]) + err = r.taskRefresher.PartialRefresh(ctx, localMutableState, localVersionedTransition) if err != nil { return err } diff --git a/service/history/ndc/workflow_state_replicator_test.go b/service/history/ndc/workflow_state_replicator_test.go index 597ef67eed0..d7512f80731 100644 --- a/service/history/ndc/workflow_state_replicator_test.go +++ b/service/history/ndc/workflow_state_replicator_test.go @@ -689,7 +689,6 @@ func (s *workflowReplicatorSuite) Test_ReplicateVersionedTransition_SameBranch_S RunId: s.runID, }).AnyTimes() mockMutableState.EXPECT().ApplySnapshot(versionedTransitionArtifact.GetSyncWorkflowStateSnapshotAttributes().State) - mockMutableState.EXPECT().PopTasks().Times(1) mockTransactionManager.EXPECT().UpdateWorkflow(gomock.Any(), false, gomock.Any(), nil).Return(nil).Times(1) mockTaskRefresher.EXPECT(). PartialRefresh(gomock.Any(), gomock.Any(), EqVersionedTransition(&persistencespb.VersionedTransition{ @@ -774,7 +773,6 @@ func (s *workflowReplicatorSuite) Test_ReplicateVersionedTransition_DifferentBra }, }).AnyTimes() mockMutableState.EXPECT().ApplySnapshot(versionedTransitionArtifact.GetSyncWorkflowStateSnapshotAttributes().State) - mockMutableState.EXPECT().PopTasks().Times(1) mockMutableState.EXPECT().GetExecutionState().Return(&persistencespb.WorkflowExecutionState{ RunId: s.runID, }).AnyTimes() diff --git a/service/history/workflow/mutable_state.go b/service/history/workflow/mutable_state.go index 935b580337d..8733e7cb2f5 100644 --- a/service/history/workflow/mutable_state.go +++ b/service/history/workflow/mutable_state.go @@ -463,5 +463,6 @@ type ( CurrentVersionedTransition() *persistencespb.VersionedTransition DeleteSubStateMachine(path *persistencespb.StateMachinePath) error + IsSubStateMachineDeleted() bool } ) diff --git a/service/history/workflow/mutable_state_impl.go b/service/history/workflow/mutable_state_impl.go index fb157cf758f..e4b503f8fb1 100644 --- a/service/history/workflow/mutable_state_impl.go +++ b/service/history/workflow/mutable_state_impl.go @@ -239,14 +239,15 @@ type ( workflowTaskManager *workflowTaskStateMachine QueryRegistry QueryRegistry - shard shard.Context - clusterMetadata cluster.Metadata - eventsCache events.Cache - config *configs.Config - timeSource clock.TimeSource - logger log.Logger - metricsHandler metrics.Handler - stateMachineNode *hsm.Node + shard shard.Context + clusterMetadata cluster.Metadata + eventsCache events.Cache + config *configs.Config + timeSource clock.TimeSource + logger log.Logger + metricsHandler metrics.Handler + stateMachineNode *hsm.Node + subStateMachineDeleted bool // Tracks all events added via the AddHistoryEvent method that is used by the state machine framework. currentTransactionAddedStateMachineEventTypes []enumspb.EventType @@ -4336,9 +4337,9 @@ func (ms *MutableStateImpl) AddWorkflowExecutionUpdateAdmittedEvent(request *upd } func (ms *MutableStateImpl) DeleteSubStateMachine(path *persistencespb.StateMachinePath) error { - incomingPath := []hsm.Key{} - for _, p := range path.Path { - incomingPath = append(incomingPath, hsm.Key{Type: p.Type, ID: p.Id}) + incomingPath := make([]hsm.Key, len(path.Path)) + for i, p := range path.Path { + incomingPath[i] = hsm.Key{Type: p.Type, ID: p.Id} } root := ms.HSM() @@ -4347,15 +4348,15 @@ func (ms *MutableStateImpl) DeleteSubStateMachine(path *persistencespb.StateMach if !errors.Is(err, hsm.ErrStateMachineNotFound) { return err } - ms.logError( - fmt.Sprintf("unable to find path: %v in subStateMachine", incomingPath), - tag.ErrorTypeInvalidMutableStateAction, - ) - // log data inconsistency instead of returning an error - ms.logDataInconsistency() + // node is already deleted. return nil } - return root.DeleteChild(node.Key) + err = node.Parent.DeleteChild(node.Key) + if err != nil { + return err + } + ms.subStateMachineDeleted = true + return nil } // ApplyWorkflowExecutionUpdateAdmittedEvent applies a WorkflowExecutionUpdateAdmittedEvent to mutable state. @@ -7555,3 +7556,7 @@ func (ms *MutableStateImpl) AddReapplyCandidateEvent(event *historypb.HistoryEve func (ms *MutableStateImpl) GetReapplyCandidateEvents() []*historypb.HistoryEvent { return ms.reapplyEventsCandidate } + +func (ms *MutableStateImpl) IsSubStateMachineDeleted() bool { + return ms.subStateMachineDeleted +} diff --git a/service/history/workflow/mutable_state_mock.go b/service/history/workflow/mutable_state_mock.go index fa79277c292..95c028090af 100644 --- a/service/history/workflow/mutable_state_mock.go +++ b/service/history/workflow/mutable_state_mock.go @@ -2863,6 +2863,20 @@ func (mr *MockMutableStateMockRecorder) IsStickyTaskQueueSet() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "IsStickyTaskQueueSet", reflect.TypeOf((*MockMutableState)(nil).IsStickyTaskQueueSet)) } +// IsSubStateMachineDeleted mocks base method. +func (m *MockMutableState) IsSubStateMachineDeleted() bool { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "IsSubStateMachineDeleted") + ret0, _ := ret[0].(bool) + return ret0 +} + +// IsSubStateMachineDeleted indicates an expected call of IsSubStateMachineDeleted. +func (mr *MockMutableStateMockRecorder) IsSubStateMachineDeleted() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "IsSubStateMachineDeleted", reflect.TypeOf((*MockMutableState)(nil).IsSubStateMachineDeleted)) +} + // IsTransientWorkflowTask mocks base method. func (m *MockMutableState) IsTransientWorkflowTask() bool { m.ctrl.T.Helper() diff --git a/service/history/workflow/task_refresher.go b/service/history/workflow/task_refresher.go index 427a4c00a94..29a74ee18fe 100644 --- a/service/history/workflow/task_refresher.go +++ b/service/history/workflow/task_refresher.go @@ -655,9 +655,7 @@ func (r *TaskRefresherImpl) refreshTasksForSubStateMachines( return err } - if len(nodesToRefresh) != 0 { - // TODO: after hsm node tombstone is tracked in mutable state, - // also trigger trim when there are new tombstones after minVersionedTransition + if len(nodesToRefresh) != 0 || mutableState.IsSubStateMachineDeleted() { if err := TrimStateMachineTimers(mutableState, minVersionedTransition); err != nil { return err } From e0953700a8c81dbf3c10aa6e3a9c5f8acbd8e8f6 Mon Sep 17 00:00:00 2001 From: Hai Zhao Date: Thu, 6 Feb 2025 14:00:59 -0800 Subject: [PATCH 4/6] enable node deletion for state based replication and enable tests. --- components/nexusoperations/events.go | 9 +-------- tests/xdc/nexus_state_replication_test.go | 9 ++++----- 2 files changed, 5 insertions(+), 13 deletions(-) diff --git a/components/nexusoperations/events.go b/components/nexusoperations/events.go index 5dcf22cfe48..510ba743c25 100644 --- a/components/nexusoperations/events.go +++ b/components/nexusoperations/events.go @@ -306,12 +306,5 @@ func findOperationNode(root *hsm.Node, event *historypb.HistoryEvent) (*hsm.Node } func maybeDeleteNode(node *hsm.Node) error { - ms, err := hsm.MachineData[interface{ IsTransitionHistoryEnabled() bool }](node.Parent) - if err != nil { - return err - } - if !ms.IsTransitionHistoryEnabled() { - return node.Parent.DeleteChild(node.Key) - } - return nil + return node.Parent.DeleteChild(node.Key) } diff --git a/tests/xdc/nexus_state_replication_test.go b/tests/xdc/nexus_state_replication_test.go index 16d29c442e5..19a99d44116 100644 --- a/tests/xdc/nexus_state_replication_test.go +++ b/tests/xdc/nexus_state_replication_test.go @@ -72,11 +72,10 @@ func TestNexusStateReplicationTestSuite(t *testing.T) { name: "DisableTransitionHistory", enableTransitionHistory: false, }, - // TODO(hai719): Enable this test once state based replication works with HSM node deletion. - // { - // name: "EnableTransitionHistory", - // enableTransitionHistory: true, - // }, + { + name: "EnableTransitionHistory", + enableTransitionHistory: true, + }, } { t.Run(tc.name, func(t *testing.T) { s := &NexusStateReplicationSuite{} From 276c3b50d22f32e56ef228345ce98c8183889291 Mon Sep 17 00:00:00 2001 From: Hai Zhao Date: Thu, 6 Feb 2025 16:11:57 -0800 Subject: [PATCH 5/6] address comments --- components/nexusoperations/events.go | 12 ++++------ .../history/workflow/mutable_state_impl.go | 24 +++++++++++++++++++ .../history/workflow/state_machine_timers.go | 13 ++++++++-- 3 files changed, 39 insertions(+), 10 deletions(-) diff --git a/components/nexusoperations/events.go b/components/nexusoperations/events.go index 510ba743c25..a958fdf3ca1 100644 --- a/components/nexusoperations/events.go +++ b/components/nexusoperations/events.go @@ -125,7 +125,7 @@ func (d CompletedEventDefinition) Apply(root *hsm.Node, event *historypb.History return err } - return maybeDeleteNode(node) + return node.Parent.DeleteChild(node.Key) } func (d CompletedEventDefinition) Type() enumspb.EventType { @@ -161,7 +161,7 @@ func (d FailedEventDefinition) Apply(root *hsm.Node, event *historypb.HistoryEve return err } - return maybeDeleteNode(node) + return node.Parent.DeleteChild(node.Key) } func (d FailedEventDefinition) CherryPick(root *hsm.Node, event *historypb.HistoryEvent, excludeTypes map[enumspb.ResetReapplyExcludeType]struct{}) error { @@ -192,7 +192,7 @@ func (d CanceledEventDefinition) Apply(root *hsm.Node, event *historypb.HistoryE return err } - return maybeDeleteNode(node) + return node.Parent.DeleteChild(node.Key) } func (d CanceledEventDefinition) CherryPick(root *hsm.Node, event *historypb.HistoryEvent, excludeTypes map[enumspb.ResetReapplyExcludeType]struct{}) error { @@ -222,7 +222,7 @@ func (d TimedOutEventDefinition) Apply(root *hsm.Node, event *historypb.HistoryE return err } - return maybeDeleteNode(node) + return node.Parent.DeleteChild(node.Key) } func (d TimedOutEventDefinition) CherryPick(root *hsm.Node, event *historypb.HistoryEvent, excludeTypes map[enumspb.ResetReapplyExcludeType]struct{}) error { @@ -304,7 +304,3 @@ func findOperationNode(root *hsm.Node, event *historypb.HistoryEvent) (*hsm.Node } return node, nil } - -func maybeDeleteNode(node *hsm.Node) error { - return node.Parent.DeleteChild(node.Key) -} diff --git a/service/history/workflow/mutable_state_impl.go b/service/history/workflow/mutable_state_impl.go index e4b503f8fb1..613c961e3c2 100644 --- a/service/history/workflow/mutable_state_impl.go +++ b/service/history/workflow/mutable_state_impl.go @@ -6314,6 +6314,7 @@ func (ms *MutableStateImpl) cleanupTransaction() error { ms.timerInfosUserDataUpdated = make(map[string]struct{}) ms.activityInfosUserDataUpdated = make(map[int64]struct{}) ms.reapplyEventsCandidate = nil + ms.subStateMachineDeleted = false ms.stateInDB = ms.executionState.State ms.nextEventIDInDB = ms.GetNextEventID() @@ -7375,6 +7376,29 @@ func (ms *MutableStateImpl) syncExecutionInfo(current *persistencespb.WorkflowEx } func (ms *MutableStateImpl) syncSubStateMachinesByType(incoming map[string]*persistencespb.StateMachineMap) error { + // check if there is node been deleted + currentHSM := ms.HSM() + incomingHSM, err := hsm.NewRoot(ms.shard.StateMachineRegistry(), StateMachineType, ms, incoming, ms) + if err != nil { + return err + } + + if err := incomingHSM.Walk(func(incomingNode *hsm.Node) error { + if incomingNode.Parent == nil { + // skip root which is the entire mutable state + return nil + } + incomingNodePath := incomingNode.Path() + _, err := currentHSM.Child(incomingNodePath) + if err != nil && errors.Is(err, hsm.ErrStateMachineNotFound) { + ms.subStateMachineDeleted = true + return nil + } + return err + }); err != nil { + return err + } + ms.executionInfo.SubStateMachinesByType = incoming ms.mustInitHSM() return nil diff --git a/service/history/workflow/state_machine_timers.go b/service/history/workflow/state_machine_timers.go index 2eff94c775e..cc09127594b 100644 --- a/service/history/workflow/state_machine_timers.go +++ b/service/history/workflow/state_machine_timers.go @@ -28,6 +28,7 @@ import ( "time" persistencespb "go.temporal.io/server/api/persistence/v1" + "go.temporal.io/server/common/util" "go.temporal.io/server/service/history/hsm" "go.temporal.io/server/service/history/tasks" "google.golang.org/protobuf/types/known/timestamppb" @@ -36,10 +37,16 @@ import ( // AddNextStateMachineTimerTask generates a state machine timer task if the first deadline doesn't have a task scheduled // yet. func AddNextStateMachineTimerTask(ms MutableState) { - timers := ms.GetExecutionInfo().StateMachineTimers + // filter out empty timer groups + timers := util.FilterSlice(ms.GetExecutionInfo().StateMachineTimers, func(timerGroup *persistencespb.StateMachineTimerGroup) bool { + return len(timerGroup.Infos) > 0 + }) + ms.GetExecutionInfo().StateMachineTimers = timers + if len(timers) == 0 { return } + timerGroup := timers[0] // We already have a timer for this deadline. if timerGroup.Scheduled { @@ -122,7 +129,9 @@ func TrimStateMachineTimers( trimmedTaskInfos = append(trimmedTaskInfos, taskInfo) } - if len(trimmedTaskInfos) > 0 { + if len(trimmedTaskInfos) > 0 || timerGroup.Scheduled { + // We still want to keep the timer group if it has been scheduled even if it has no task info. + // This will prevent us from scheduling a new timer task for the same group. trimmedStateMachineTimers = append(trimmedStateMachineTimers, &persistencespb.StateMachineTimerGroup{ Infos: trimmedTaskInfos, Deadline: timerGroup.Deadline, From d877cdb9c34383e921a531344339fa059ab8ad6b Mon Sep 17 00:00:00 2001 From: Hai Zhao Date: Thu, 6 Feb 2025 17:08:29 -0800 Subject: [PATCH 6/6] address comment --- service/history/workflow/state_machine_timers.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/service/history/workflow/state_machine_timers.go b/service/history/workflow/state_machine_timers.go index cc09127594b..dbfcccc7378 100644 --- a/service/history/workflow/state_machine_timers.go +++ b/service/history/workflow/state_machine_timers.go @@ -28,7 +28,6 @@ import ( "time" persistencespb "go.temporal.io/server/api/persistence/v1" - "go.temporal.io/server/common/util" "go.temporal.io/server/service/history/hsm" "go.temporal.io/server/service/history/tasks" "google.golang.org/protobuf/types/known/timestamppb" @@ -38,8 +37,9 @@ import ( // yet. func AddNextStateMachineTimerTask(ms MutableState) { // filter out empty timer groups - timers := util.FilterSlice(ms.GetExecutionInfo().StateMachineTimers, func(timerGroup *persistencespb.StateMachineTimerGroup) bool { - return len(timerGroup.Infos) > 0 + timers := ms.GetExecutionInfo().StateMachineTimers + timers = slices.DeleteFunc(timers, func(timerGroup *persistencespb.StateMachineTimerGroup) bool { + return len(timerGroup.Infos) == 0 }) ms.GetExecutionInfo().StateMachineTimers = timers