Commit

[BUG] use deep copy of bit arrays when getting array node state (flyteorg#5681)

* [BUG] add retries to handle array node eventing race condition (flyteorg#421)

If there is an error updating a [FlyteWorkflow CRD](https://github.com/unionai/flyte/blob/6a7207c5345604a28a9d4e3699becff767f520f5/flytepropeller/pkg/controller/handler.go#L378), then the propeller streak ends without the CRD getting updated and the in-memory copy of the FlyteWorkflow is not utilized on the next loop.

[TaskPhaseVersion](https://github.com/flyteorg/flyte/blob/37b4e13ac4a3594ac63b7a35058f4b2220e51282/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/node_status.go#L239) is stored in the FlyteWorkflow. It is incremented whenever node/subnode state is updated, which ensures that events are unique. If events stay in the same state and have the same TaskPhaseVersion, then they either [get short-circuited and are not emitted to admin](https://github.com/flyteorg/flyte/blob/37b4e13ac4a3594ac63b7a35058f4b2220e51282/flytepropeller/events/admin_eventsink.go#L59), or are returned as an [AlreadyExists error](https://github.com/flyteorg/flyte/blob/37b4e13ac4a3594ac63b7a35058f4b2220e51282/flyteadmin/pkg/manager/impl/task_execution_manager.go#L172) that is [handled in propeller so that it does not bubble up as an error](https://github.com/flyteorg/flyte/blob/37b4e13ac4a3594ac63b7a35058f4b2220e51282/flytepropeller/pkg/controller/nodes/node_exec_context.go#L38).

We can run into issues with ArrayNode eventing when:
- array node handler increments task phase version from "0" to "1"
- admin event sink emits event with version "1"
- the propeller controller is not able to update the FlyteWorkflow CRD, so the ArrayNodeStatus indicates taskPhaseVersion is still 0
- next loop, array node handler increments task phase version from "0" to "1"
- admin event sink prevents the event from getting emitted as an event with the same ID has already been received. No error is bubbled up.

This means we lose subnode state until there is an event that contains an update to that subnode. If the lost state is the subnode reaching a terminal state, then the subnode state (from admin/UI) is "stuck" in a non-terminal state.
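The sequence above can be replayed in isolation. The following is a minimal sketch, not propeller code: `eventKey`, `fakeSink`, and `runLostUpdate` are hypothetical names, and the deduplication key (node, phase, phase version) is an assumption made for illustration.

```go
package main

import "fmt"

// eventKey is a hypothetical stand-in for the identity used to deduplicate events.
type eventKey struct {
	nodeID       string
	phase        string
	phaseVersion uint32
}

// fakeSink mimics a sink that silently drops events whose key it has already seen.
type fakeSink struct {
	seen     map[eventKey]bool
	accepted []string
}

func newFakeSink() *fakeSink { return &fakeSink{seen: map[eventKey]bool{}} }

// emit stores the payload only when the key is new and reports whether it was accepted.
func (s *fakeSink) emit(k eventKey, payload string) bool {
	if s.seen[k] {
		return false
	}
	s.seen[k] = true
	s.accepted = append(s.accepted, payload)
	return true
}

// runLostUpdate replays: version bump, emit, failed CRD write, second bump from stale state.
func runLostUpdate() (first, second bool, accepted []string) {
	sink := newFakeSink()
	persistedVersion := uint32(0) // the value stored in the CRD

	// Round 1: the handler bumps 0 -> 1 and emits. The CRD write then fails,
	// so persistedVersion is never advanced.
	v := persistedVersion + 1
	first = sink.emit(eventKey{"n0", "RUNNING", v}, "subnode 0: RUNNING")

	// Round 2: the handler starts from the stale version and bumps 0 -> 1 again.
	// The payload differs, but the key collides and the event is dropped.
	v = persistedVersion + 1
	second = sink.emit(eventKey{"n0", "RUNNING", v}, "subnode 0: SUCCEEDED")
	return first, second, sink.accepted
}

func main() {
	first, second, accepted := runLostUpdate()
	fmt.Println(first, second, accepted)
}
```

In this sketch the second event carries the terminal subnode state but is never accepted, which corresponds to the "stuck" subnode described above.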

I confirmed this to be an issue in the load-test-cluster. Whenever there was an [error syncing the FlyteWorkflow](https://github.com/flyteorg/flyte/blob/37b4e13ac4a3594ac63b7a35058f4b2220e51282/flytepropeller/pkg/controller/workers.go#L91), the next round of eventing in ArrayNode would fail unless the ArrayNode phase changed.

- added unit test
- tested locally in sandbox
- test in dogfood - https://buildkite.com/unionai/managed-cluster-staging-sync/builds/4398#01914a1a-f6d6-42a5-b41b-7b6807f27370

- should be fine to rollout to prod

Should this change be upstreamed to OSS (flyteorg/flyte)? If not, please uncheck this box, which is used for auditing. Note, it is the responsibility of each developer to actually upstream their changes. See [this guide](https://unionai.atlassian.net/wiki/spaces/ENG/pages/447610883/Flyte+-+Union+Cloud+Development+Runbook/#When-are-versions-updated%3F).
- [x] To be upstreamed to OSS

fixes: https://linear.app/unionai/issue/COR-1534/bug-arraynode-shows-non-complete-jobs-in-ui-when-the-job-is-actually

* [x] Added tests
* [x] Ran a deploy dry run and shared the terraform plan
* [ ] Added logging and metrics
* [ ] Updated [dashboards](https://unionai.grafana.net/dashboards) and [alerts](https://unionai.grafana.net/alerting/list)
* [ ] Updated documentation

Signed-off-by: Paul Dittamo <[email protected]>

* handle already exists error on array node abort (flyteorg#427)

* handle already exists error on array node abort

Signed-off-by: Paul Dittamo <[email protected]>

* update comment

Signed-off-by: Paul Dittamo <[email protected]>

---------

Signed-off-by: Paul Dittamo <[email protected]>

* [BUG] set cause for already exists EventError (flyteorg#432)

* set cause for already exists EventError

Signed-off-by: Paul Dittamo <[email protected]>

* add nil check event error

Signed-off-by: Paul Dittamo <[email protected]>

* lint

Signed-off-by: Paul Dittamo <[email protected]>

---------

Signed-off-by: Paul Dittamo <[email protected]>

* add deep copy for array node status

Signed-off-by: Paul Dittamo <[email protected]>

* add deep copy for array node status

Signed-off-by: Paul Dittamo <[email protected]>

* use deep copy of bit arrays when getting array node state

Signed-off-by: Paul Dittamo <[email protected]>

* Revert "add deep copy for array node status"

This reverts commit dde7595.

Signed-off-by: Paul Dittamo <[email protected]>

* ignore ErrorOnAlreadyExists when marshalling event config

Signed-off-by: Paul Dittamo <[email protected]>

---------

Signed-off-by: Paul Dittamo <[email protected]>
Signed-off-by: Bugra Gedik <[email protected]>
pvditt authored and bgedik committed Sep 12, 2024
1 parent 2fd51eb commit 85f7d3d
Showing 7 changed files with 110 additions and 20 deletions.
21 changes: 21 additions & 0 deletions flytepropeller/pkg/apis/flyteworkflow/v1alpha1/node_status.go
@@ -316,6 +316,27 @@ func (in *ArrayNodeStatus) SetTaskPhaseVersion(taskPhaseVersion uint32) {
}
}

+func (in *ArrayNodeStatus) DeepCopyInto(out *ArrayNodeStatus) {
+*out = *in
+out.MutableStruct = in.MutableStruct
+
+if in.ExecutionError != nil {
+in, out := &in.ExecutionError, &out.ExecutionError
+*out = new(core.ExecutionError)
+*out = *in
+}
+}
+
+func (in *ArrayNodeStatus) DeepCopy() *ArrayNodeStatus {
+if in == nil {
+return nil
+}
+
+out := &ArrayNodeStatus{}
+in.DeepCopyInto(out)
+return out
+}
+
type NodeStatus struct {
MutableStruct
Phase NodePhase `json:"phase,omitempty"`
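
One thing a reviewer may want to check in `DeepCopyInto`: inside the `if` block, `in` and `out` are pointers to pointers, so `*out = *in` appears to assign the source pointer rather than copy the pointed-to value. The sketch below reproduces that pattern with hypothetical stand-in types (`execError`, `status`), not the real `core.ExecutionError`, and contrasts it with a double dereference.

```go
package main

import "fmt"

// execError is a hypothetical stand-in for core.ExecutionError.
type execError struct{ Message string }

// status is a hypothetical stand-in for ArrayNodeStatus.
type status struct{ Err *execError }

// copyPointer mirrors the pattern in the diff: the freshly allocated value is
// immediately replaced by the source pointer, so both structs share one execError.
func copyPointer(in *status) *status {
	out := &status{}
	*out = *in
	if in.Err != nil {
		in, out := &in.Err, &out.Err
		*out = new(execError)
		*out = *in
	}
	return out
}

// copyValue dereferences twice, so the pointed-to value itself is copied.
func copyValue(in *status) *status {
	out := &status{}
	*out = *in
	if in.Err != nil {
		in, out := &in.Err, &out.Err
		*out = new(execError)
		**out = **in
	}
	return out
}

func main() {
	src := &status{Err: &execError{Message: "original"}}
	a, b := copyPointer(src), copyValue(src)
	src.Err.Message = "mutated"
	fmt.Println(a.Err.Message, b.Err.Message)
}
```

For a generated protobuf message, copying the struct by value is usually discouraged; `proto.Clone` from `google.golang.org/protobuf/proto` is the more common choice. Whether that applies here depends on how `core.ExecutionError` is generated, which this page does not show.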

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion flytepropeller/pkg/controller/config/config.go
@@ -259,7 +259,8 @@ const (
type EventConfig struct {
RawOutputPolicy RawOutputPolicy `json:"raw-output-policy" pflag:",How output data should be passed along in execution events."`
FallbackToOutputReference bool `json:"fallback-to-output-reference" pflag:",Whether output data should be sent by reference when it is too large to be sent inline in execution events."`
-ErrorOnAlreadyExists bool `json:"error-on-already-exists" pflag:",Whether to return an error when an event already exists."`
+// only meant to be overridden for certain node types that have different eventing behavior such as ArrayNode
+ErrorOnAlreadyExists bool `json:"-"`
}

// ParallelismBehavior defines how ArrayNode should handle subNode parallelism by default
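
The `json:"-"` tag tells `encoding/json` to skip the field in both directions, so the flag can only be set in code. A minimal sketch with a trimmed, hypothetical copy of the struct (`eventConfig` and `roundTrip` are illustration names, not part of the change):

```go
package main

import (
	"encoding/json"
	"fmt"
)

// eventConfig is a trimmed, hypothetical copy of EventConfig.
type eventConfig struct {
	FallbackToOutputReference bool `json:"fallback-to-output-reference"`
	ErrorOnAlreadyExists      bool `json:"-"` // never serialized or deserialized
}

// roundTrip marshals the config and unmarshals the bytes into a fresh value.
func roundTrip(in eventConfig) (string, eventConfig, error) {
	raw, err := json.Marshal(in)
	if err != nil {
		return "", eventConfig{}, err
	}
	var out eventConfig
	err = json.Unmarshal(raw, &out)
	return string(raw), out, err
}

func main() {
	raw, out, err := roundTrip(eventConfig{
		FallbackToOutputReference: true,
		ErrorOnAlreadyExists:      true,
	})
	fmt.Println(raw, out.ErrorOnAlreadyExists, err)
}
```

The serialized form omits the field, and the round-tripped value falls back to `false`, which matches the intent stated in the comment that only specific node types override it.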
2 changes: 1 addition & 1 deletion flytepropeller/pkg/controller/config/config_flags.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions flytepropeller/pkg/controller/config/config_flags_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

70 changes: 59 additions & 11 deletions flytepropeller/pkg/controller/nodes/array/handler_test.go
@@ -539,7 +539,11 @@ func TestHandleArrayNodePhaseExecuting(t *testing.T) {
handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoRunning(&handler.ExecutionInfo{})),
handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoRunning(&handler.ExecutionInfo{})),
},
-expectedArrayNodePhase: v1alpha1.ArrayNodePhaseExecuting,
+expectedArrayNodePhase: v1alpha1.ArrayNodePhaseExecuting,
+expectedArrayNodeSubPhases: []v1alpha1.NodePhase{
+v1alpha1.NodePhaseRunning,
+v1alpha1.NodePhaseRunning,
+},
expectedTaskPhaseVersion: 1,
expectedTransitionPhase: handler.EPhaseRunning,
expectedExternalResourcePhases: []idlcore.TaskExecution_Phase{idlcore.TaskExecution_RUNNING, idlcore.TaskExecution_RUNNING},
@@ -559,7 +563,11 @@ func TestHandleArrayNodePhaseExecuting(t *testing.T) {
subNodeTransitions: []handler.Transition{
handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoRunning(&handler.ExecutionInfo{})),
},
-expectedArrayNodePhase: v1alpha1.ArrayNodePhaseExecuting,
+expectedArrayNodePhase: v1alpha1.ArrayNodePhaseExecuting,
+expectedArrayNodeSubPhases: []v1alpha1.NodePhase{
+v1alpha1.NodePhaseRunning,
+v1alpha1.NodePhaseQueued,
+},
expectedTaskPhaseVersion: 1,
expectedTransitionPhase: handler.EPhaseRunning,
expectedExternalResourcePhases: []idlcore.TaskExecution_Phase{idlcore.TaskExecution_RUNNING},
@@ -580,7 +588,11 @@ func TestHandleArrayNodePhaseExecuting(t *testing.T) {
handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoRunning(&handler.ExecutionInfo{})),
handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoRunning(&handler.ExecutionInfo{})),
},
-expectedArrayNodePhase: v1alpha1.ArrayNodePhaseExecuting,
+expectedArrayNodePhase: v1alpha1.ArrayNodePhaseExecuting,
+expectedArrayNodeSubPhases: []v1alpha1.NodePhase{
+v1alpha1.NodePhaseRunning,
+v1alpha1.NodePhaseRunning,
+},
expectedTaskPhaseVersion: 1,
expectedTransitionPhase: handler.EPhaseRunning,
expectedExternalResourcePhases: []idlcore.TaskExecution_Phase{idlcore.TaskExecution_RUNNING, idlcore.TaskExecution_RUNNING},
@@ -601,7 +613,11 @@ func TestHandleArrayNodePhaseExecuting(t *testing.T) {
subNodeTransitions: []handler.Transition{
handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoRunning(&handler.ExecutionInfo{})),
},
-expectedArrayNodePhase: v1alpha1.ArrayNodePhaseExecuting,
+expectedArrayNodePhase: v1alpha1.ArrayNodePhaseExecuting,
+expectedArrayNodeSubPhases: []v1alpha1.NodePhase{
+v1alpha1.NodePhaseRunning,
+v1alpha1.NodePhaseQueued,
+},
expectedTaskPhaseVersion: 1,
expectedTransitionPhase: handler.EPhaseRunning,
expectedExternalResourcePhases: []idlcore.TaskExecution_Phase{idlcore.TaskExecution_RUNNING},
@@ -619,8 +635,12 @@ func TestHandleArrayNodePhaseExecuting(t *testing.T) {
core.PhaseUndefined,
core.PhaseUndefined,
},
-subNodeTransitions: []handler.Transition{},
-expectedArrayNodePhase: v1alpha1.ArrayNodePhaseExecuting,
+subNodeTransitions: []handler.Transition{},
+expectedArrayNodePhase: v1alpha1.ArrayNodePhaseExecuting,
+expectedArrayNodeSubPhases: []v1alpha1.NodePhase{
+v1alpha1.NodePhaseQueued,
+v1alpha1.NodePhaseQueued,
+},
expectedTaskPhaseVersion: 0,
expectedTransitionPhase: handler.EPhaseRunning,
expectedExternalResourcePhases: []idlcore.TaskExecution_Phase{},
@@ -642,7 +662,11 @@ func TestHandleArrayNodePhaseExecuting(t *testing.T) {
handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoRunning(&handler.ExecutionInfo{})),
handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoRunning(&handler.ExecutionInfo{})),
},
-expectedArrayNodePhase: v1alpha1.ArrayNodePhaseExecuting,
+expectedArrayNodePhase: v1alpha1.ArrayNodePhaseExecuting,
+expectedArrayNodeSubPhases: []v1alpha1.NodePhase{
+v1alpha1.NodePhaseRunning,
+v1alpha1.NodePhaseRunning,
+},
expectedTaskPhaseVersion: 1,
expectedTransitionPhase: handler.EPhaseRunning,
expectedExternalResourcePhases: []idlcore.TaskExecution_Phase{idlcore.TaskExecution_RUNNING, idlcore.TaskExecution_RUNNING},
@@ -663,7 +687,11 @@ func TestHandleArrayNodePhaseExecuting(t *testing.T) {
handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoSuccess(&handler.ExecutionInfo{})),
handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoSuccess(&handler.ExecutionInfo{})),
},
-expectedArrayNodePhase: v1alpha1.ArrayNodePhaseSucceeding,
+expectedArrayNodePhase: v1alpha1.ArrayNodePhaseSucceeding,
+expectedArrayNodeSubPhases: []v1alpha1.NodePhase{
+v1alpha1.NodePhaseSucceeded,
+v1alpha1.NodePhaseSucceeded,
+},
expectedTaskPhaseVersion: 0,
expectedTransitionPhase: handler.EPhaseRunning,
expectedExternalResourcePhases: []idlcore.TaskExecution_Phase{idlcore.TaskExecution_SUCCEEDED, idlcore.TaskExecution_SUCCEEDED},
@@ -684,7 +712,11 @@ func TestHandleArrayNodePhaseExecuting(t *testing.T) {
handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoSuccess(&handler.ExecutionInfo{})),
handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoFailure(0, "", "", &handler.ExecutionInfo{})),
},
-expectedArrayNodePhase: v1alpha1.ArrayNodePhaseSucceeding,
+expectedArrayNodePhase: v1alpha1.ArrayNodePhaseSucceeding,
+expectedArrayNodeSubPhases: []v1alpha1.NodePhase{
+v1alpha1.NodePhaseSucceeded,
+v1alpha1.NodePhaseFailed,
+},
expectedTaskPhaseVersion: 0,
expectedTransitionPhase: handler.EPhaseRunning,
expectedExternalResourcePhases: []idlcore.TaskExecution_Phase{idlcore.TaskExecution_SUCCEEDED, idlcore.TaskExecution_FAILED},
@@ -704,7 +736,11 @@ func TestHandleArrayNodePhaseExecuting(t *testing.T) {
handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoFailure(0, "", "", &handler.ExecutionInfo{})),
handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoSuccess(&handler.ExecutionInfo{})),
},
-expectedArrayNodePhase: v1alpha1.ArrayNodePhaseFailing,
+expectedArrayNodePhase: v1alpha1.ArrayNodePhaseFailing,
+expectedArrayNodeSubPhases: []v1alpha1.NodePhase{
+v1alpha1.NodePhaseFailed,
+v1alpha1.NodePhaseSucceeded,
+},
expectedTaskPhaseVersion: 0,
expectedTransitionPhase: handler.EPhaseRunning,
expectedExternalResourcePhases: []idlcore.TaskExecution_Phase{idlcore.TaskExecution_FAILED, idlcore.TaskExecution_SUCCEEDED},
@@ -724,7 +760,11 @@ func TestHandleArrayNodePhaseExecuting(t *testing.T) {
handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoRunning(&handler.ExecutionInfo{})),
handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoRunning(&handler.ExecutionInfo{})),
},
-expectedArrayNodePhase: v1alpha1.ArrayNodePhaseExecuting,
+expectedArrayNodePhase: v1alpha1.ArrayNodePhaseExecuting,
+expectedArrayNodeSubPhases: []v1alpha1.NodePhase{
+v1alpha1.NodePhaseRunning,
+v1alpha1.NodePhaseRunning,
+},
expectedTaskPhaseVersion: 2,
expectedTransitionPhase: handler.EPhaseRunning,
expectedExternalResourcePhases: []idlcore.TaskExecution_Phase{idlcore.TaskExecution_RUNNING, idlcore.TaskExecution_RUNNING},
@@ -749,6 +789,10 @@ func TestHandleArrayNodePhaseExecuting(t *testing.T) {
handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoRunning(&handler.ExecutionInfo{})),
handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoRunning(&handler.ExecutionInfo{})),
},
+expectedArrayNodeSubPhases: []v1alpha1.NodePhase{
+v1alpha1.NodePhaseQueued,
+v1alpha1.NodePhaseQueued,
+},
expectedExternalResourcePhases: []idlcore.TaskExecution_Phase{idlcore.TaskExecution_RUNNING, idlcore.TaskExecution_RUNNING},
useFakeEventRecorder: true,
eventRecorderFailures: 5,
@@ -771,6 +815,10 @@ func TestHandleArrayNodePhaseExecuting(t *testing.T) {
handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoRunning(&handler.ExecutionInfo{})),
handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoRunning(&handler.ExecutionInfo{})),
},
+expectedArrayNodeSubPhases: []v1alpha1.NodePhase{
+v1alpha1.NodePhaseQueued,
+v1alpha1.NodePhaseQueued,
+},
expectedExternalResourcePhases: []idlcore.TaskExecution_Phase{idlcore.TaskExecution_RUNNING, idlcore.TaskExecution_RUNNING},
useFakeEventRecorder: true,
eventRecorderError: fmt.Errorf("err"),
24 changes: 20 additions & 4 deletions flytepropeller/pkg/controller/nodes/node_state_manager.go
@@ -160,11 +160,27 @@ func (n nodeStateManager) GetArrayNodeState() handler.ArrayNodeState {
if an != nil {
as.Phase = an.GetArrayNodePhase()
as.Error = an.GetExecutionError()
-as.SubNodePhases = an.GetSubNodePhases()
-as.SubNodeTaskPhases = an.GetSubNodeTaskPhases()
-as.SubNodeRetryAttempts = an.GetSubNodeRetryAttempts()
-as.SubNodeSystemFailures = an.GetSubNodeSystemFailures()
as.TaskPhaseVersion = an.GetTaskPhaseVersion()
+
+subNodePhases := an.GetSubNodePhases()
+if subNodePhasesCopy := subNodePhases.DeepCopy(); subNodePhasesCopy != nil {
+as.SubNodePhases = *subNodePhasesCopy
+}
+
+subNodeTaskPhases := an.GetSubNodeTaskPhases()
+if subNodeTaskPhasesCopy := subNodeTaskPhases.DeepCopy(); subNodeTaskPhasesCopy != nil {
+as.SubNodeTaskPhases = *subNodeTaskPhasesCopy
+}
+
+subNodeRetryAttempts := an.GetSubNodeRetryAttempts()
+if subNodeRetryAttemptsCopy := subNodeRetryAttempts.DeepCopy(); subNodeRetryAttemptsCopy != nil {
+as.SubNodeRetryAttempts = *subNodeRetryAttemptsCopy
+}
+
+subNodeSystemFailures := an.GetSubNodeSystemFailures()
+if subNodeSystemFailuresCopy := subNodeSystemFailures.DeepCopy(); subNodeSystemFailuresCopy != nil {
+as.SubNodeSystemFailures = *subNodeSystemFailuresCopy
+}
}
return as
}
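
The reason a deep copy matters here can be shown without the real bit array type. The sketch below assumes the arrays are structs wrapping a slice (`compactArray` is a hypothetical stand-in): copying such a struct by value still shares the backing storage, so a handler that mutates its copy also mutates the stored status, even if the round later fails.

```go
package main

import "fmt"

// compactArray is a hypothetical stand-in for a slice-backed bit array.
type compactArray struct {
	bits []uint64
}

// DeepCopy allocates new backing storage and copies the contents into it.
func (c *compactArray) DeepCopy() *compactArray {
	if c == nil {
		return nil
	}
	out := &compactArray{bits: make([]uint64, len(c.bits))}
	copy(out.bits, c.bits)
	return out
}

// mutateCopies writes through a value copy and through a deep copy, then
// reports what the original ("stored") array observes in each case.
func mutateCopies() (afterShallow, afterDeep uint64) {
	stored := compactArray{bits: make([]uint64, 2)}

	shallow := stored   // struct copy: the slice header is copied, the data is shared
	shallow.bits[0] = 7 // leaks into stored
	afterShallow = stored.bits[0]

	if deep := stored.DeepCopy(); deep != nil {
		deep.bits[1] = 9 // isolated from stored
	}
	afterDeep = stored.bits[1]
	return afterShallow, afterDeep
}

func main() {
	fmt.Println(mutateCopies())
}
```

This is consistent with the two test cases above that simulate event recorder failures and now expect the subnode phases to remain `NodePhaseQueued`, although the test body itself is not shown on this page.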