-
Notifications
You must be signed in to change notification settings - Fork 681
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[BUG] use deep copy of bit arrays when getting array node state #5681
Conversation
If there is an error updating a [FlyteWorkflow CRD](https://github.com/unionai/flyte/blob/6a7207c5345604a28a9d4e3699becff767f520f5/flytepropeller/pkg/controller/handler.go#L378), then the propeller streak ends without the CRD getting updated and the in-memory copy of the FlyteWorkflow is not utilized on the next loop. [TaskPhaseVersion](https://github.com/flyteorg/flyte/blob/37b4e13ac4a3594ac63b7a35058f4b2220e51282/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/node_status.go#L239) is stored in the FlyteWorkflow. This is incremented when there is an update to node/subnode state to ensure that events are unique. If the events stay in the same state and have the same TaskPhaseVersion, then they [get short-circuited and don't get emitted to admin](https://github.com/flyteorg/flyte/blob/37b4e13ac4a3594ac63b7a35058f4b2220e51282/flytepropeller/events/admin_eventsink.go#L59) or will get returned as an [AlreadyExists error](https://github.com/flyteorg/flyte/blob/37b4e13ac4a3594ac63b7a35058f4b2220e51282/flyteadmin/pkg/manager/impl/task_execution_manager.go#L172) and get [handled in propeller to not bubble up in an error](https://github.com/flyteorg/flyte/blob/37b4e13ac4a3594ac63b7a35058f4b2220e51282/flytepropeller/pkg/controller/nodes/node_exec_context.go#L38). We can run into issues with ArrayNode eventing when: - array node handler increments task phase version from "0" to "1" - admin event sink emits event with version "1" - the propeller controller is not able to update the FlyteWorkflow CRD, so the ArrayNodeStatus indicates taskPhaseVersion is still 0 - next loop, array node handler increments task phase version from "0" to "1" - admin event sink prevents the event from getting emitted as an event with the same ID has already been received. No error is bubbled up. This means we lose subnode state until there is an event that contains an update to that subnode. If the lost state is the subnode reaching a terminal state, then the subnode state (from admin/UI) is "stuck" in a non-terminal state. I confirmed this to be an issue in the load-test-cluster. Whenever, there was an [error syncing the FlyteWorkflow](https://github.com/flyteorg/flyte/blob/37b4e13ac4a3594ac63b7a35058f4b2220e51282/flytepropeller/pkg/controller/workers.go#L91), the next round of eventing in ArrayNode would fail unless the ArrayNode phase changed. - added unit test - tested locally in sandbox - test in dogfood - https://buildkite.com/unionai/managed-cluster-staging-sync/builds/4398#01914a1a-f6d6-42a5-b41b-7b6807f27370 - should be fine to rollout to prod Should this change be upstreamed to OSS (flyteorg/flyte)? If not, please uncheck this box, which is used for auditing. Note, it is the responsibility of each developer to actually upstream their changes. See [this guide](https://unionai.atlassian.net/wiki/spaces/ENG/pages/447610883/Flyte+-+Union+Cloud+Development+Runbook/#When-are-versions-updated%3F). - [x] To be upstreamed to OSS fixes: https://linear.app/unionai/issue/COR-1534/bug-arraynode-shows-non-complete-jobs-in-ui-when-the-job-is-actually * [x] Added tests * [x] Ran a deploy dry run and shared the terraform plan * [ ] Added logging and metrics * [ ] Updated [dashboards](https://unionai.grafana.net/dashboards) and [alerts](https://unionai.grafana.net/alerting/list) * [ ] Updated documentation Signed-off-by: Paul Dittamo <[email protected]>
* handle already exists error on array node abort Signed-off-by: Paul Dittamo <[email protected]> * update comment Signed-off-by: Paul Dittamo <[email protected]> --------- Signed-off-by: Paul Dittamo <[email protected]>
* set cause for already exists EventError Signed-off-by: Paul Dittamo <[email protected]> * add nil check event error Signed-off-by: Paul Dittamo <[email protected]> * lint Signed-off-by: Paul Dittamo <[email protected]> --------- Signed-off-by: Paul Dittamo <[email protected]>
Signed-off-by: Paul Dittamo <[email protected]>
Signed-off-by: Paul Dittamo <[email protected]>
Signed-off-by: Paul Dittamo <[email protected]>
Codecov ReportAttention: Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## master #5681 +/- ##
=======================================
Coverage 36.17% 36.17%
=======================================
Files 1302 1302
Lines 109556 109580 +24
=======================================
+ Hits 39630 39643 +13
- Misses 65786 65797 +11
Partials 4140 4140
Flags with carried forward coverage won't be shown. Click here to find out more. ☔ View full report in Codecov by Sentry. 🚨 Try these New Features:
|
.run/single-binary.run.xml
Outdated
@@ -1,13 +0,0 @@ | |||
<component name="ProjectRunConfigurationManager"> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Did you accidentally delete this file?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
oh thanks for catching this. Unsure how I did that. Let me add it back.
This reverts commit dde7595. Signed-off-by: Paul Dittamo <[email protected]>
if in.ArrayNodeStatus != nil { | ||
in, out := &in.ArrayNodeStatus, &out.ArrayNodeStatus | ||
*out = (*in).DeepCopy() | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can you document how you generated the change in this file?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I did this manually on our fork when working on an array node feature a couple months ago. I couldn't get the script to run.
@@ -259,6 +259,7 @@ const ( | |||
type EventConfig struct { | |||
RawOutputPolicy RawOutputPolicy `json:"raw-output-policy" pflag:",How output data should be passed along in execution events."` | |||
FallbackToOutputReference bool `json:"fallback-to-output-reference" pflag:",Whether output data should be sent by reference when it is too large to be sent inline in execution events."` | |||
ErrorOnAlreadyExists bool `json:"error-on-already-exists" pflag:",Whether to return an error when an event already exists."` |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
worth enabling this in single-binary by default?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This should never really be enabled except for array node. I didn't want to update the record task event function, but I guess that might be better if users were to unknowingly change this and have executions fail.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually, I can just remove the ability to set this and keep it in the eventconfig struct
ah darn, I should've made that change here #5680
Signed-off-by: Paul Dittamo <[email protected]>
Signed-off-by: Paul Dittamo <[email protected]>
@@ -259,7 +259,8 @@ const ( | |||
type EventConfig struct { | |||
RawOutputPolicy RawOutputPolicy `json:"raw-output-policy" pflag:",How output data should be passed along in execution events."` | |||
FallbackToOutputReference bool `json:"fallback-to-output-reference" pflag:",Whether output data should be sent by reference when it is too large to be sent inline in execution events."` | |||
ErrorOnAlreadyExists bool `json:"error-on-already-exists" pflag:",Whether to return an error when an event already exists."` | |||
// only meant to be overridden for certain node types that have different eventing behavior such as ArrayNode | |||
ErrorOnAlreadyExists bool `json:"-"` |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I understand the purpose of this now, thanks for the comment. We're adding a field to a central config object and assuming that such field will only be used in a particular situation. This feels hacky to me, but maybe there's a precedent and good reason to follow this pattern.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yup - it definitely is hacky. Looking forward when/if we eventually re-work eventing in propeller
* [BUG] add retries to handle array node eventing race condition (#421) If there is an error updating a [FlyteWorkflow CRD](https://github.com/unionai/flyte/blob/6a7207c5345604a28a9d4e3699becff767f520f5/flytepropeller/pkg/controller/handler.go#L378), then the propeller streak ends without the CRD getting updated and the in-memory copy of the FlyteWorkflow is not utilized on the next loop. [TaskPhaseVersion](https://github.com/flyteorg/flyte/blob/37b4e13ac4a3594ac63b7a35058f4b2220e51282/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/node_status.go#L239) is stored in the FlyteWorkflow. This is incremented when there is an update to node/subnode state to ensure that events are unique. If the events stay in the same state and have the same TaskPhaseVersion, then they [get short-circuited and don't get emitted to admin](https://github.com/flyteorg/flyte/blob/37b4e13ac4a3594ac63b7a35058f4b2220e51282/flytepropeller/events/admin_eventsink.go#L59) or will get returned as an [AlreadyExists error](https://github.com/flyteorg/flyte/blob/37b4e13ac4a3594ac63b7a35058f4b2220e51282/flyteadmin/pkg/manager/impl/task_execution_manager.go#L172) and get [handled in propeller to not bubble up in an error](https://github.com/flyteorg/flyte/blob/37b4e13ac4a3594ac63b7a35058f4b2220e51282/flytepropeller/pkg/controller/nodes/node_exec_context.go#L38). We can run into issues with ArrayNode eventing when: - array node handler increments task phase version from "0" to "1" - admin event sink emits event with version "1" - the propeller controller is not able to update the FlyteWorkflow CRD, so the ArrayNodeStatus indicates taskPhaseVersion is still 0 - next loop, array node handler increments task phase version from "0" to "1" - admin event sink prevents the event from getting emitted as an event with the same ID has already been received. No error is bubbled up. This means we lose subnode state until there is an event that contains an update to that subnode. If the lost state is the subnode reaching a terminal state, then the subnode state (from admin/UI) is "stuck" in a non-terminal state. I confirmed this to be an issue in the load-test-cluster. Whenever, there was an [error syncing the FlyteWorkflow](https://github.com/flyteorg/flyte/blob/37b4e13ac4a3594ac63b7a35058f4b2220e51282/flytepropeller/pkg/controller/workers.go#L91), the next round of eventing in ArrayNode would fail unless the ArrayNode phase changed. - added unit test - tested locally in sandbox - test in dogfood - https://buildkite.com/unionai/managed-cluster-staging-sync/builds/4398#01914a1a-f6d6-42a5-b41b-7b6807f27370 - should be fine to rollout to prod Should this change be upstreamed to OSS (flyteorg/flyte)? If not, please uncheck this box, which is used for auditing. Note, it is the responsibility of each developer to actually upstream their changes. See [this guide](https://unionai.atlassian.net/wiki/spaces/ENG/pages/447610883/Flyte+-+Union+Cloud+Development+Runbook/#When-are-versions-updated%3F). - [x] To be upstreamed to OSS fixes: https://linear.app/unionai/issue/COR-1534/bug-arraynode-shows-non-complete-jobs-in-ui-when-the-job-is-actually * [x] Added tests * [x] Ran a deploy dry run and shared the terraform plan * [ ] Added logging and metrics * [ ] Updated [dashboards](https://unionai.grafana.net/dashboards) and [alerts](https://unionai.grafana.net/alerting/list) * [ ] Updated documentation Signed-off-by: Paul Dittamo <[email protected]> * handle already exists error on array node abort (#427) * handle already exists error on array node abort Signed-off-by: Paul Dittamo <[email protected]> * update comment Signed-off-by: Paul Dittamo <[email protected]> --------- Signed-off-by: Paul Dittamo <[email protected]> * [BUG] set cause for already exists EventError (#432) * set cause for already exists EventError Signed-off-by: Paul Dittamo <[email protected]> * add nil check event error Signed-off-by: Paul Dittamo <[email protected]> * lint Signed-off-by: Paul Dittamo <[email protected]> --------- Signed-off-by: Paul Dittamo <[email protected]> * add deep copy for array node status Signed-off-by: Paul Dittamo <[email protected]> * add deep copy for array node status Signed-off-by: Paul Dittamo <[email protected]> * use deep copy of bit arrays when getting array node state Signed-off-by: Paul Dittamo <[email protected]> * Revert "add deep copy for array node status" This reverts commit dde7595. Signed-off-by: Paul Dittamo <[email protected]> * ignore ErrorOnAlreadyExists when marshalling event config Signed-off-by: Paul Dittamo <[email protected]> --------- Signed-off-by: Paul Dittamo <[email protected]> Signed-off-by: pmahindrakar-oss <[email protected]>
…eorg#5681) * [BUG] add retries to handle array node eventing race condition (flyteorg#421) If there is an error updating a [FlyteWorkflow CRD](https://github.com/unionai/flyte/blob/6a7207c5345604a28a9d4e3699becff767f520f5/flytepropeller/pkg/controller/handler.go#L378), then the propeller streak ends without the CRD getting updated and the in-memory copy of the FlyteWorkflow is not utilized on the next loop. [TaskPhaseVersion](https://github.com/flyteorg/flyte/blob/37b4e13ac4a3594ac63b7a35058f4b2220e51282/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/node_status.go#L239) is stored in the FlyteWorkflow. This is incremented when there is an update to node/subnode state to ensure that events are unique. If the events stay in the same state and have the same TaskPhaseVersion, then they [get short-circuited and don't get emitted to admin](https://github.com/flyteorg/flyte/blob/37b4e13ac4a3594ac63b7a35058f4b2220e51282/flytepropeller/events/admin_eventsink.go#L59) or will get returned as an [AlreadyExists error](https://github.com/flyteorg/flyte/blob/37b4e13ac4a3594ac63b7a35058f4b2220e51282/flyteadmin/pkg/manager/impl/task_execution_manager.go#L172) and get [handled in propeller to not bubble up in an error](https://github.com/flyteorg/flyte/blob/37b4e13ac4a3594ac63b7a35058f4b2220e51282/flytepropeller/pkg/controller/nodes/node_exec_context.go#L38). We can run into issues with ArrayNode eventing when: - array node handler increments task phase version from "0" to "1" - admin event sink emits event with version "1" - the propeller controller is not able to update the FlyteWorkflow CRD, so the ArrayNodeStatus indicates taskPhaseVersion is still 0 - next loop, array node handler increments task phase version from "0" to "1" - admin event sink prevents the event from getting emitted as an event with the same ID has already been received. No error is bubbled up. This means we lose subnode state until there is an event that contains an update to that subnode. If the lost state is the subnode reaching a terminal state, then the subnode state (from admin/UI) is "stuck" in a non-terminal state. I confirmed this to be an issue in the load-test-cluster. Whenever, there was an [error syncing the FlyteWorkflow](https://github.com/flyteorg/flyte/blob/37b4e13ac4a3594ac63b7a35058f4b2220e51282/flytepropeller/pkg/controller/workers.go#L91), the next round of eventing in ArrayNode would fail unless the ArrayNode phase changed. - added unit test - tested locally in sandbox - test in dogfood - https://buildkite.com/unionai/managed-cluster-staging-sync/builds/4398#01914a1a-f6d6-42a5-b41b-7b6807f27370 - should be fine to rollout to prod Should this change be upstreamed to OSS (flyteorg/flyte)? If not, please uncheck this box, which is used for auditing. Note, it is the responsibility of each developer to actually upstream their changes. See [this guide](https://unionai.atlassian.net/wiki/spaces/ENG/pages/447610883/Flyte+-+Union+Cloud+Development+Runbook/#When-are-versions-updated%3F). - [x] To be upstreamed to OSS fixes: https://linear.app/unionai/issue/COR-1534/bug-arraynode-shows-non-complete-jobs-in-ui-when-the-job-is-actually * [x] Added tests * [x] Ran a deploy dry run and shared the terraform plan * [ ] Added logging and metrics * [ ] Updated [dashboards](https://unionai.grafana.net/dashboards) and [alerts](https://unionai.grafana.net/alerting/list) * [ ] Updated documentation Signed-off-by: Paul Dittamo <[email protected]> * handle already exists error on array node abort (flyteorg#427) * handle already exists error on array node abort Signed-off-by: Paul Dittamo <[email protected]> * update comment Signed-off-by: Paul Dittamo <[email protected]> --------- Signed-off-by: Paul Dittamo <[email protected]> * [BUG] set cause for already exists EventError (flyteorg#432) * set cause for already exists EventError Signed-off-by: Paul Dittamo <[email protected]> * add nil check event error Signed-off-by: Paul Dittamo <[email protected]> * lint Signed-off-by: Paul Dittamo <[email protected]> --------- Signed-off-by: Paul Dittamo <[email protected]> * add deep copy for array node status Signed-off-by: Paul Dittamo <[email protected]> * add deep copy for array node status Signed-off-by: Paul Dittamo <[email protected]> * use deep copy of bit arrays when getting array node state Signed-off-by: Paul Dittamo <[email protected]> * Revert "add deep copy for array node status" This reverts commit dde7595. Signed-off-by: Paul Dittamo <[email protected]> * ignore ErrorOnAlreadyExists when marshalling event config Signed-off-by: Paul Dittamo <[email protected]> --------- Signed-off-by: Paul Dittamo <[email protected]> Signed-off-by: Bugra Gedik <[email protected]>
Upstreaming https://github.com/unionai/flyte/pull/428
Why are the changes needed?
ArrayNodeState is not supposed to get persisted unless the handle loop completes and successfully calls PutArrayNodeState
However, any SetItem call on a bitarray (ex) that is used to store subnode phases + subtask phases will persist even if the Handle loop does not successfully call PutArrayNodeState at the end of the handle loop.
This is because the BitSet field in a bitarray is a pointer -> SetItem modifies the underlying data with an explicit call to PutArrayNodeState.
This can cause issues as we don't finish handling a subnode at a given phase if there is an error causing a return UnknownTransition after we persist subnode state.
Example issue:
subnode "1" gets set to a terminal node phase
eventing fails
next handle loops has the accidentally updated state so doesn't get re-evaluated at the previous, non-terminal phase so we don't propagate the necessary eventing to set the subnode's correct phases
What changes were proposed in this pull request?
How was this patch tested?
Setup process
Screenshots
Check all the applicable boxes
Related PRs
branched off #5680 instead of master to reduce conflicts while upstreaming. Will merge that in first to clear up diff.
Docs link