handle state machine deletion for state-based replication #7177
@@ -4264,6 +4264,29 @@ func (ms *MutableStateImpl) AddWorkflowExecutionUpdateAdmittedEvent(request *upd
	return event, nil
}

func (ms *MutableStateImpl) DeleteSubStateMachine(path *persistencespb.StateMachinePath) error {
	incomingPath := []hsm.Key{}
Suggested change:
- incomingPath := []hsm.Key{}
+ incomingPath := make([]hsm.Key, len(path))
And then you can just assign with the index.
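A minimal sketch of what the suggestion amounts to: size the slice up front and assign by index rather than appending. `Key` and `rawKey` are hypothetical stand-ins for `hsm.Key` and the persisted path elements; in the real code the length would presumably be taken from the path message's element slice rather than from the message pointer itself.

```go
package main

import "fmt"

// Key is a hypothetical stand-in for hsm.Key.
type Key struct {
	Type string
	ID   string
}

// rawKey is a hypothetical stand-in for one persisted path element.
type rawKey struct {
	Type string
	ID   string
}

// toKeys converts a persisted path into keys. The slice is allocated once
// with its final length, and each element is assigned by index.
func toKeys(path []rawKey) []Key {
	keys := make([]Key, len(path))
	for i, p := range path {
		keys[i] = Key{Type: p.Type, ID: p.ID}
	}
	return keys
}

func main() {
	keys := toKeys([]rawKey{
		{Type: "operation", ID: "op-1"},
		{Type: "callback", ID: "cb-1"},
	})
	fmt.Println(len(keys), keys[1].ID)
}
```

Note that mixing `make([]T, n)` with `append` would leave `n` zero-valued entries at the front, which is why the suggestion pairs the preallocation with index assignment.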
updated
		ms.logDataInconsistency()
		return nil
	}
	return root.DeleteChild(node.Key)
You'd need to find the parent of the node to delete the immediate child.
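To illustrate the point, here is a small sketch of deleting a node at a nested path by first walking to its parent. The `Node` type, `ErrNotFound`, and `deleteAtPath` are hypothetical and only mimic the shape of a state machine tree; they are not the `hsm` package API.

```go
package main

import (
	"errors"
	"fmt"
)

// Key and Node are hypothetical stand-ins for a state machine tree.
type Key struct{ Type, ID string }

type Node struct {
	Key      Key
	children map[Key]*Node
}

var ErrNotFound = errors.New("state machine not found")

func NewNode(key Key) *Node {
	return &Node{Key: key, children: map[Key]*Node{}}
}

func (n *Node) AddChild(key Key) *Node {
	c := NewNode(key)
	n.children[key] = c
	return c
}

// DeleteChild removes an immediate child only.
func (n *Node) DeleteChild(key Key) error {
	if _, ok := n.children[key]; !ok {
		return ErrNotFound
	}
	delete(n.children, key)
	return nil
}

// deleteAtPath walks to the parent of the last path element and deletes
// the target from that parent, not from the root.
func deleteAtPath(root *Node, path []Key) error {
	if len(path) == 0 {
		return errors.New("cannot delete the root")
	}
	parent := root
	for _, k := range path[:len(path)-1] {
		next, ok := parent.children[k]
		if !ok {
			return ErrNotFound
		}
		parent = next
	}
	return parent.DeleteChild(path[len(path)-1])
}

func main() {
	root := NewNode(Key{})
	op := root.AddChild(Key{Type: "operation", ID: "op-1"})
	op.AddChild(Key{Type: "callback", ID: "cb-1"})
	err := deleteAtPath(root, []Key{op.Key, {Type: "callback", ID: "cb-1"}})
	fmt.Println(err, len(op.children))
}
```

Calling `root.DeleteChild` with the key of a grandchild would fail in this sketch, because the root only knows its immediate children.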
updated
@@ -7265,41 +7288,8 @@ func (ms *MutableStateImpl) syncExecutionInfo(current *persistencespb.WorkflowEx
}

func (ms *MutableStateImpl) syncSubStateMachinesByType(incoming map[string]*persistencespb.StateMachineMap) error {
@yycptt do you see an issue with this? I assume that we always just want to do a full sync here.
hmm are we still using the TransitionCount field in StateMachineNode? If no, then I think this should work.
We can also remove the PopTask() logic in workflowStateReplicator.applySnapshot I think as we are no longer calling node.Sync which regenerates the tasks.
When the source cluster generates a snapshot, the TransitionCount is reset to 1.
From the code, looks like TransitionCount is checked in ValidateNotTransitioned, and ValidateNotTransitioned is used only in components/dummy and hsmtest.
@bergundy can you confirm?
We don't use TransitionCount anymore. It's only used in unit tests.
PopTask() removed.
@@ -7331,6 +7321,8 @@ func (ms *MutableStateImpl) applyTombstones(tombstoneBatches []*persistencespb.S
			if _, ok := ms.pendingSignalInfoIDs[tombstone.GetSignalExternalInitiatedEventId()]; ok {
				err = ms.DeletePendingSignal(tombstone.GetSignalExternalInitiatedEventId())
			}
		case *persistencespb.StateMachineTombstone_StateMachinePath:
			err = ms.DeleteSubStateMachine(tombstone.GetStateMachinePath())
also need to TrimStateMachineTimers when nodes are being deleted.
I have a TODO in task_refresher.go about this (~L660). Probably need to find a way to trigger the trim only once.
We need to trim every time we close a transaction; @justinp-tt is working on it.
Added a flag to track whether a subStateMachine was deleted.
		return err
	}
	ms.logError(
		fmt.Sprintf("unable to find path: %v in subStateMachine", incomingPath),
hmm I think this could happen? The mutation being synced may have a start point older than the workflow's current versioned transition, in which case some tombstones have already been synced before.
well I guess I just can't prove that situation won't happen... 🤷‍♂️
It's just printing logs so no strong opinion.
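The scenario described here, where a tombstone arrives for a path that was already deleted by an earlier sync, can be sketched as an idempotent apply loop. Everything below is hypothetical (paths flattened to strings, a plain set instead of the state machine tree); it only shows that a missing path can be skipped rather than treated as a failure.

```go
package main

import "fmt"

// applyTombstones deletes each tombstoned path from the live set.
// Paths that are already gone are counted and skipped, so replaying
// an older batch of tombstones is harmless.
func applyTombstones(live map[string]struct{}, tombstones []string) (deleted, skipped int) {
	for _, p := range tombstones {
		if _, ok := live[p]; !ok {
			skipped++ // already removed by a previous sync
			continue
		}
		delete(live, p)
		deleted++
	}
	return deleted, skipped
}

func main() {
	live := map[string]struct{}{"op-1": {}, "op-2": {}}
	batch := []string{"op-1", "op-3"}
	fmt.Println(applyTombstones(live, batch))
	fmt.Println(applyTombstones(live, batch)) // replay of the same batch
}
```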
updated.
components/nexusoperations/events.go
Outdated
@@ -306,12 +306,5 @@ func findOperationNode(root *hsm.Node, event *historypb.HistoryEvent) (*hsm.Node
}

func maybeDeleteNode(node *hsm.Node) error {
You can get rid of this function entirely, it's just confusing now.
removed.
	ms.executionInfo.SubStateMachinesByType = incoming
	ms.mustInitHSM()
Just please make sure that we regenerate the relevant timers from mutable state and delete any timers for state machines that were deleted during the full sync.
The important thing to note here is that the state machine timer group has a "scheduled" flag that indicates that there's a corresponding task in the queue; we want to make sure we don't regenerate the already scheduled tasks.
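A rough sketch of the two concerns raised above, under stated assumptions: timer groups are kept sorted by deadline, each group carries a `Scheduled` flag, and state machine paths are flattened to strings. The `timerGroup` type and both helper functions are hypothetical and are not the actual persistence types or task generator.

```go
package main

import "fmt"

// timerGroup is a hypothetical stand-in for a state machine timer group.
type timerGroup struct {
	Deadline  int64    // assumed: unix seconds, groups sorted ascending
	Scheduled bool     // true if a task for this deadline is already queued
	Paths     []string // assumed: flattened state machine paths
}

// trimTimers drops timer entries whose state machine no longer exists
// and removes groups that end up empty.
func trimTimers(groups []timerGroup, exists func(path string) bool) []timerGroup {
	out := make([]timerGroup, 0, len(groups))
	for _, g := range groups {
		kept := make([]string, 0, len(g.Paths))
		for _, p := range g.Paths {
			if exists(p) {
				kept = append(kept, p)
			}
		}
		if len(kept) == 0 {
			continue
		}
		g.Paths = kept
		out = append(out, g)
	}
	return out
}

// needsTask reports whether a task should be generated for the first
// deadline. An already scheduled group must not get a second task.
func needsTask(groups []timerGroup) bool {
	return len(groups) > 0 && !groups[0].Scheduled
}

func main() {
	groups := []timerGroup{
		{Deadline: 10, Scheduled: true, Paths: []string{"op-deleted"}},
		{Deadline: 20, Scheduled: false, Paths: []string{"op-live"}},
	}
	live := map[string]bool{"op-live": true}
	trimmed := trimTimers(groups, func(p string) bool { return live[p] })
	fmt.Println(len(trimmed), needsTask(trimmed))
}
```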
updated as discussed offline.
	if err != nil {
		return err
	}
	ms.subStateMachineDeleted = true
I don't see this being reset anywhere.
reset in cleanupTransaction()
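One possible reading of the flag's lifecycle, as a hedged sketch: deletions set the flag, closing the transaction trims once if the flag is set, and cleanup clears it. `txnState` and its methods are hypothetical names that only loosely mirror the fields discussed in this thread.

```go
package main

import "fmt"

// txnState is a hypothetical stand-in for the mutable state fields involved.
type txnState struct {
	subStateMachineDeleted bool
	trims                  int // counts how often the (stand-in) trim ran
}

func (s *txnState) deleteSubStateMachine() {
	s.subStateMachineDeleted = true
}

// closeTransaction trims at most once per transaction, no matter how
// many deletions happened, and then resets per-transaction state.
func (s *txnState) closeTransaction() {
	if s.subStateMachineDeleted {
		s.trims++ // stand-in for trimming state machine timers
	}
	s.cleanupTransaction()
}

func (s *txnState) cleanupTransaction() {
	s.subStateMachineDeleted = false
}

func main() {
	s := &txnState{}
	s.deleteSubStateMachine()
	s.deleteSubStateMachine()
	s.closeTransaction()
	s.closeTransaction() // nothing deleted in this transaction
	fmt.Println(s.trims, s.subStateMachineDeleted)
}
```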
@@ -36,10 +37,16 @@ import (
 // AddNextStateMachineTimerTask generates a state machine timer task if the first deadline doesn't have a task scheduled
 // yet.
 func AddNextStateMachineTimerTask(ms MutableState) {
-	timers := ms.GetExecutionInfo().StateMachineTimers
+	// filter out empty timer groups
+	timers := util.FilterSlice(ms.GetExecutionInfo().StateMachineTimers, func(timerGroup *persistencespb.StateMachineTimerGroup) bool {
nit: I think you can use slices.DeleteFunc here but not critical.
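For reference, a small sketch of filtering out empty groups with the standard library's `slices.DeleteFunc` (Go 1.21+). The `timerGroup` type is a hypothetical stand-in. One thing to keep in mind is that `slices.DeleteFunc` modifies the slice it is given, so this sketch clones first on the assumption that the caller's slice must stay untouched.

```go
package main

import (
	"fmt"
	"slices"
)

// timerGroup is a hypothetical stand-in for a persisted timer group.
type timerGroup struct {
	Infos []string
}

// nonEmptyGroups returns the groups that still contain timers.
// The input is cloned because slices.DeleteFunc works in place.
func nonEmptyGroups(groups []*timerGroup) []*timerGroup {
	return slices.DeleteFunc(slices.Clone(groups), func(g *timerGroup) bool {
		return len(g.Infos) == 0
	})
}

func main() {
	groups := []*timerGroup{
		{Infos: []string{"t1"}},
		{},
		{Infos: []string{"t2", "t3"}},
	}
	filtered := nonEmptyGroups(groups)
	fmt.Println(len(filtered), len(groups))
}
```

Note the inverted predicate compared with a filter helper: `DeleteFunc` takes the condition for elements to drop, not the ones to keep.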
updated
What changed?
Handle state machine deletion for state-based replication.
Why?
Deletion is needed because we want to support more Nexus operations.
How did you test it?
Unit tests.
Potential risks
Documentation
Is hotfix candidate?