Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

handle state machine deletion for state-based replication #7177

Merged
merged 7 commits into from
Feb 7, 2025
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 4 additions & 15 deletions components/nexusoperations/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ func (d CompletedEventDefinition) Apply(root *hsm.Node, event *historypb.History
return err
}

return maybeDeleteNode(node)
return node.Parent.DeleteChild(node.Key)
}

func (d CompletedEventDefinition) Type() enumspb.EventType {
Expand Down Expand Up @@ -161,7 +161,7 @@ func (d FailedEventDefinition) Apply(root *hsm.Node, event *historypb.HistoryEve
return err
}

return maybeDeleteNode(node)
return node.Parent.DeleteChild(node.Key)
}

func (d FailedEventDefinition) CherryPick(root *hsm.Node, event *historypb.HistoryEvent, excludeTypes map[enumspb.ResetReapplyExcludeType]struct{}) error {
Expand Down Expand Up @@ -192,7 +192,7 @@ func (d CanceledEventDefinition) Apply(root *hsm.Node, event *historypb.HistoryE
return err
}

return maybeDeleteNode(node)
return node.Parent.DeleteChild(node.Key)
}

func (d CanceledEventDefinition) CherryPick(root *hsm.Node, event *historypb.HistoryEvent, excludeTypes map[enumspb.ResetReapplyExcludeType]struct{}) error {
Expand Down Expand Up @@ -222,7 +222,7 @@ func (d TimedOutEventDefinition) Apply(root *hsm.Node, event *historypb.HistoryE
return err
}

return maybeDeleteNode(node)
return node.Parent.DeleteChild(node.Key)
}

func (d TimedOutEventDefinition) CherryPick(root *hsm.Node, event *historypb.HistoryEvent, excludeTypes map[enumspb.ResetReapplyExcludeType]struct{}) error {
Expand Down Expand Up @@ -304,14 +304,3 @@ func findOperationNode(root *hsm.Node, event *historypb.HistoryEvent) (*hsm.Node
}
return node, nil
}

// maybeDeleteNode removes node from its parent, but only when the parent's
// machine data reports that transition history is disabled. When transition
// history is enabled the node is left in place and nil is returned.
//
// Any error from looking up the parent's machine data, or from deleting the
// child, is returned to the caller unchanged.
func maybeDeleteNode(node *hsm.Node) error {
	parentData, err := hsm.MachineData[interface{ IsTransitionHistoryEnabled() bool }](node.Parent)
	switch {
	case err != nil:
		return err
	case parentData.IsTransitionHistoryEnabled():
		// Transition history is on: keep the node.
		return nil
	default:
		return node.Parent.DeleteChild(node.Key)
	}
}
16 changes: 9 additions & 7 deletions service/history/ndc/workflow_state_replicator.go
Original file line number Diff line number Diff line change
Expand Up @@ -360,17 +360,18 @@ func (r *WorkflowStateReplicatorImpl) applyMutation(
)
}
localTransitionHistory := transitionhistory.CopyVersionedTransitions(localMutableState.GetExecutionInfo().TransitionHistory)
localVersionedTransition := transitionhistory.LastVersionedTransition(localTransitionHistory)
sourceTransitionHistory := mutation.StateMutation.ExecutionInfo.TransitionHistory

// make sure mutation range is extension of local range
if workflow.TransitionHistoryStalenessCheck(localTransitionHistory, mutation.ExclusiveStartVersionedTransition) != nil ||
workflow.TransitionHistoryStalenessCheck(sourceTransitionHistory, transitionhistory.LastVersionedTransition(localTransitionHistory)) != nil {
workflow.TransitionHistoryStalenessCheck(sourceTransitionHistory, localVersionedTransition) != nil {
return serviceerrors.NewSyncState(
fmt.Sprintf("Failed to apply mutation due to version check failed. local transition history: %v, source transition history: %v", localTransitionHistory, sourceTransitionHistory),
namespaceID.String(),
workflowID,
runID,
localTransitionHistory[len(localTransitionHistory)-1],
localVersionedTransition,
localMutableState.GetExecutionInfo().VersionHistories,
)
}
Expand Down Expand Up @@ -402,7 +403,7 @@ func (r *WorkflowStateReplicatorImpl) applyMutation(
}
}

err = r.taskRefresher.PartialRefresh(ctx, localMutableState, localTransitionHistory[len(localTransitionHistory)-1])
err = r.taskRefresher.PartialRefresh(ctx, localMutableState, localVersionedTransition)
if err != nil {
return err
}
Expand Down Expand Up @@ -453,10 +454,12 @@ func (r *WorkflowStateReplicatorImpl) applySnapshot(
}
var isBranchSwitched bool
var localTransitionHistory []*persistencespb.VersionedTransition
var localVersionedTransition *persistencespb.VersionedTransition
if len(localMutableState.GetExecutionInfo().TransitionHistory) != 0 {
localTransitionHistory = transitionhistory.CopyVersionedTransitions(localMutableState.GetExecutionInfo().TransitionHistory)
localVersionedTransition = transitionhistory.LastVersionedTransition(localTransitionHistory)
sourceTransitionHistory := snapshot.ExecutionInfo.TransitionHistory
err := workflow.TransitionHistoryStalenessCheck(sourceTransitionHistory, transitionhistory.LastVersionedTransition(localTransitionHistory))
err := workflow.TransitionHistoryStalenessCheck(sourceTransitionHistory, localVersionedTransition)
switch {
case err == nil:
// no branch switch
Expand All @@ -466,7 +469,7 @@ func (r *WorkflowStateReplicatorImpl) applySnapshot(
namespaceID.String(),
workflowID,
runID,
transitionhistory.LastVersionedTransition(localTransitionHistory),
localVersionedTransition,
localMutableState.GetExecutionInfo().VersionHistories,
)
case errors.Is(err, consts.ErrStaleReference):
Expand Down Expand Up @@ -508,7 +511,6 @@ func (r *WorkflowStateReplicatorImpl) applySnapshot(
if err != nil {
return err
}
localMutableState.PopTasks() // tasks are refreshed manually below

var newRunWorkflow Workflow
if versionedTransition.NewRunInfo != nil {
Expand All @@ -530,7 +532,7 @@ func (r *WorkflowStateReplicatorImpl) applySnapshot(
return err
}
} else {
err = r.taskRefresher.PartialRefresh(ctx, localMutableState, localTransitionHistory[len(localTransitionHistory)-1])
err = r.taskRefresher.PartialRefresh(ctx, localMutableState, localVersionedTransition)
if err != nil {
return err
}
Expand Down
2 changes: 0 additions & 2 deletions service/history/ndc/workflow_state_replicator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -689,7 +689,6 @@ func (s *workflowReplicatorSuite) Test_ReplicateVersionedTransition_SameBranch_S
RunId: s.runID,
}).AnyTimes()
mockMutableState.EXPECT().ApplySnapshot(versionedTransitionArtifact.GetSyncWorkflowStateSnapshotAttributes().State)
mockMutableState.EXPECT().PopTasks().Times(1)
mockTransactionManager.EXPECT().UpdateWorkflow(gomock.Any(), false, gomock.Any(), nil).Return(nil).Times(1)
mockTaskRefresher.EXPECT().
PartialRefresh(gomock.Any(), gomock.Any(), EqVersionedTransition(&persistencespb.VersionedTransition{
Expand Down Expand Up @@ -774,7 +773,6 @@ func (s *workflowReplicatorSuite) Test_ReplicateVersionedTransition_DifferentBra
},
}).AnyTimes()
mockMutableState.EXPECT().ApplySnapshot(versionedTransitionArtifact.GetSyncWorkflowStateSnapshotAttributes().State)
mockMutableState.EXPECT().PopTasks().Times(1)
mockMutableState.EXPECT().GetExecutionState().Return(&persistencespb.WorkflowExecutionState{
RunId: s.runID,
}).AnyTimes()
Expand Down
3 changes: 3 additions & 0 deletions service/history/workflow/mutable_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -461,5 +461,8 @@ type (
GetReapplyCandidateEvents() []*historypb.HistoryEvent

CurrentVersionedTransition() *persistencespb.VersionedTransition

DeleteSubStateMachine(path *persistencespb.StateMachinePath) error
IsSubStateMachineDeleted() bool
}
)
75 changes: 48 additions & 27 deletions service/history/workflow/mutable_state_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,14 +239,15 @@ type (
workflowTaskManager *workflowTaskStateMachine
QueryRegistry QueryRegistry

shard shard.Context
clusterMetadata cluster.Metadata
eventsCache events.Cache
config *configs.Config
timeSource clock.TimeSource
logger log.Logger
metricsHandler metrics.Handler
stateMachineNode *hsm.Node
shard shard.Context
clusterMetadata cluster.Metadata
eventsCache events.Cache
config *configs.Config
timeSource clock.TimeSource
logger log.Logger
metricsHandler metrics.Handler
stateMachineNode *hsm.Node
subStateMachineDeleted bool

// Tracks all events added via the AddHistoryEvent method that is used by the state machine framework.
currentTransactionAddedStateMachineEventTypes []enumspb.EventType
Expand Down Expand Up @@ -4335,6 +4336,29 @@ func (ms *MutableStateImpl) AddWorkflowExecutionUpdateAdmittedEvent(request *upd
return event, nil
}

func (ms *MutableStateImpl) DeleteSubStateMachine(path *persistencespb.StateMachinePath) error {
incomingPath := make([]hsm.Key, len(path.Path))
for i, p := range path.Path {
incomingPath[i] = hsm.Key{Type: p.Type, ID: p.Id}
}

root := ms.HSM()
node, err := root.Child(incomingPath)
if err != nil {
if !errors.Is(err, hsm.ErrStateMachineNotFound) {
return err
}
// node is already deleted.
return nil
}
err = node.Parent.DeleteChild(node.Key)
if err != nil {
return err
}
ms.subStateMachineDeleted = true
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't see this being reset anywhere.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is reset in cleanupTransaction().

return nil
}

// ApplyWorkflowExecutionUpdateAdmittedEvent applies a WorkflowExecutionUpdateAdmittedEvent to mutable state.
func (ms *MutableStateImpl) ApplyWorkflowExecutionUpdateAdmittedEvent(event *historypb.HistoryEvent, batchId int64) error {
attrs := event.GetWorkflowExecutionUpdateAdmittedEventAttributes()
Expand Down Expand Up @@ -6290,6 +6314,7 @@ func (ms *MutableStateImpl) cleanupTransaction() error {
ms.timerInfosUserDataUpdated = make(map[string]struct{})
ms.activityInfosUserDataUpdated = make(map[int64]struct{})
ms.reapplyEventsCandidate = nil
ms.subStateMachineDeleted = false

ms.stateInDB = ms.executionState.State
ms.nextEventIDInDB = ms.GetNextEventID()
Expand Down Expand Up @@ -7351,16 +7376,9 @@ func (ms *MutableStateImpl) syncExecutionInfo(current *persistencespb.WorkflowEx
}

func (ms *MutableStateImpl) syncSubStateMachinesByType(incoming map[string]*persistencespb.StateMachineMap) error {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@yycptt do you see an issue with this? I assume that we always just want to do a full sync here.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, are we still using the TransitionCount field in StateMachineNode? If not, then I think this should work.

I think we can also remove the PopTasks() logic in workflowStateReplicator.applySnapshot, as we are no longer calling node.Sync, which regenerates the tasks.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When the source cluster generates a snapshot, the TransitionCount is reset to 1.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From the code, it looks like TransitionCount is checked only in ValidateNotTransitioned, and ValidateNotTransitioned is used only in components/dummy and hsmtest.

@bergundy can you confirm?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't use TransitionCount anymore. It's only used in unit tests.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

PopTasks() removed.

// check whether any node has been deleted
currentHSM := ms.HSM()

// we don't care about the root here which is the entire mutable state
incomingHSM, err := hsm.NewRoot(
ms.shard.StateMachineRegistry(),
StateMachineType,
ms,
incoming,
ms,
)
incomingHSM, err := hsm.NewRoot(ms.shard.StateMachineRegistry(), StateMachineType, ms, incoming, ms)
if err != nil {
return err
}
Expand All @@ -7370,22 +7388,19 @@ func (ms *MutableStateImpl) syncSubStateMachinesByType(incoming map[string]*pers
// skip root which is the entire mutable state
return nil
}

incomingNodePath := incomingNode.Path()
currentNode, err := currentHSM.Child(incomingNodePath)
if err != nil {
// 1. Already done history resend if needed before,
// and node creation today always associated with an event
// 2. Node deletion is not supported right now.
// Based on 1 and 2, node should always be found here.
return err
_, err := currentHSM.Child(incomingNodePath)
if err != nil && errors.Is(err, hsm.ErrStateMachineNotFound) {
ms.subStateMachineDeleted = true
return nil
}

return currentNode.Sync(incomingNode)
return err
}); err != nil {
return err
}

ms.executionInfo.SubStateMachinesByType = incoming
ms.mustInitHSM()
Comment on lines +7402 to +7403
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just please make sure that we regenerate the relevant timers from mutable state and delete any timers for state machines that were deleted during the full sync.
The important thing to note here is that the state machine timer group has a "scheduled" flag that indicates that there's a corresponding task in the queue, we want to make sure we don't regenerate the already scheduled tasks.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updated as discussed offline.

return nil
}

Expand Down Expand Up @@ -7417,6 +7432,8 @@ func (ms *MutableStateImpl) applyTombstones(tombstoneBatches []*persistencespb.S
if _, ok := ms.pendingSignalInfoIDs[tombstone.GetSignalExternalInitiatedEventId()]; ok {
err = ms.DeletePendingSignal(tombstone.GetSignalExternalInitiatedEventId())
}
case *persistencespb.StateMachineTombstone_StateMachinePath:
err = ms.DeleteSubStateMachine(tombstone.GetStateMachinePath())
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We also need to call TrimStateMachineTimers when nodes are being deleted.

I have a TODO in task_refresher.go about this (~L660). Probably need to find a way to trigger the trim only once.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to trim every time we close a transaction; @justinp-tt is working on it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added a flag to track whether a sub state machine has been deleted.

default:
// TODO: updateID and stateMachinePath
err = serviceerror.NewInternal("unknown tombstone type")
Expand Down Expand Up @@ -7563,3 +7580,7 @@ func (ms *MutableStateImpl) AddReapplyCandidateEvent(event *historypb.HistoryEve
// GetReapplyCandidateEvents returns the history events currently held in
// ms.reapplyEventsCandidate. The slice is returned directly, not copied.
// cleanupTransaction sets the field back to nil.
func (ms *MutableStateImpl) GetReapplyCandidateEvents() []*historypb.HistoryEvent {
return ms.reapplyEventsCandidate
}

// IsSubStateMachineDeleted reports the current value of the
// subStateMachineDeleted flag. The flag is set to true by
// DeleteSubStateMachine after it removes a node, and by
// syncSubStateMachinesByType when an incoming node's path is not found in the
// current state machine tree. cleanupTransaction resets it to false.
func (ms *MutableStateImpl) IsSubStateMachineDeleted() bool {
return ms.subStateMachineDeleted
}
28 changes: 28 additions & 0 deletions service/history/workflow/mutable_state_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

13 changes: 11 additions & 2 deletions service/history/workflow/state_machine_timers.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"time"

persistencespb "go.temporal.io/server/api/persistence/v1"
"go.temporal.io/server/common/util"
"go.temporal.io/server/service/history/hsm"
"go.temporal.io/server/service/history/tasks"
"google.golang.org/protobuf/types/known/timestamppb"
Expand All @@ -36,10 +37,16 @@ import (
// AddNextStateMachineTimerTask generates a state machine timer task if the first deadline doesn't have a task scheduled
// yet.
func AddNextStateMachineTimerTask(ms MutableState) {
timers := ms.GetExecutionInfo().StateMachineTimers
// filter out empty timer groups
timers := util.FilterSlice(ms.GetExecutionInfo().StateMachineTimers, func(timerGroup *persistencespb.StateMachineTimerGroup) bool {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: I think you can use slices.DeleteFunc here but not critical.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updated

return len(timerGroup.Infos) > 0
})
ms.GetExecutionInfo().StateMachineTimers = timers

if len(timers) == 0 {
return
}

timerGroup := timers[0]
// We already have a timer for this deadline.
if timerGroup.Scheduled {
Expand Down Expand Up @@ -122,7 +129,9 @@ func TrimStateMachineTimers(

trimmedTaskInfos = append(trimmedTaskInfos, taskInfo)
}
if len(trimmedTaskInfos) > 0 {
if len(trimmedTaskInfos) > 0 || timerGroup.Scheduled {
// We still want to keep the timer group if it has been scheduled even if it has no task info.
// This will prevent us from scheduling a new timer task for the same group.
trimmedStateMachineTimers = append(trimmedStateMachineTimers, &persistencespb.StateMachineTimerGroup{
Infos: trimmedTaskInfos,
Deadline: timerGroup.Deadline,
Expand Down
4 changes: 1 addition & 3 deletions service/history/workflow/task_refresher.go
Original file line number Diff line number Diff line change
Expand Up @@ -655,9 +655,7 @@ func (r *TaskRefresherImpl) refreshTasksForSubStateMachines(
return err
}

if len(nodesToRefresh) != 0 {
// TODO: after hsm node tombstone is tracked in mutable state,
// also trigger trim when there are new tombstones after minVersionedTransition
if len(nodesToRefresh) != 0 || mutableState.IsSubStateMachineDeleted() {
if err := TrimStateMachineTimers(mutableState, minVersionedTransition); err != nil {
return err
}
Expand Down
9 changes: 4 additions & 5 deletions tests/xdc/nexus_state_replication_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,11 +72,10 @@ func TestNexusStateReplicationTestSuite(t *testing.T) {
name: "DisableTransitionHistory",
enableTransitionHistory: false,
},
// TODO(hai719): Enable this test once state based replication works with HSM node deletion.
// {
// name: "EnableTransitionHistory",
// enableTransitionHistory: true,
// },
{
name: "EnableTransitionHistory",
enableTransitionHistory: true,
},
} {
t.Run(tc.name, func(t *testing.T) {
s := &NexusStateReplicationSuite{}
Expand Down
Loading