From 1d4d4b73313b48660a2f047407821121e1d56757 Mon Sep 17 00:00:00 2001 From: Katrina Rogan Date: Tue, 13 Aug 2024 15:14:20 +0200 Subject: [PATCH 01/13] checkpoint Signed-off-by: Katrina Rogan --- rfc/system/RFC-0000-execution-concurrency.md | 165 +++++++++++++++++++ 1 file changed, 165 insertions(+) create mode 100644 rfc/system/RFC-0000-execution-concurrency.md diff --git a/rfc/system/RFC-0000-execution-concurrency.md b/rfc/system/RFC-0000-execution-concurrency.md new file mode 100644 index 0000000000..26569120b7 --- /dev/null +++ b/rfc/system/RFC-0000-execution-concurrency.md @@ -0,0 +1,165 @@ +# [RFC Template] Title + +**Authors:** + +- @eapolinario +- @katrogan + +## 1 Executive Summary + +This is a proposal to implement workflow execution concurrency, defined at the launch plan level. + +## 2 Motivation + +See the following issues +1. https://github.com/flyteorg/flyte/issues/267 +2. https://github.com/flyteorg/flyte/issues/420 +3. https://github.com/flyteorg/flyte/discussions/3754 +4. https://github.com/flyteorg/flyte/issues/5125 + +## 3 Proposed Implementation + +Introduce a new attribute in [LaunchPlan.get_or_create](https://github.com/flyteorg/flytekit/blob/bc2e000cc8d710ed3d135cdbf3cbf257c5da8100/flytekit/core/launch_plan.py#L195) to allow specifying execution concurrency + +e.g. +```python +my_lp = LaunchPlan.get_or_create( + name="my_serial_lp", + workflow=my_wf, + ... + concurrency=Concurrency( + max=1, # defines how many executions with this launch plan can run in parallel + policy=ConcurrencyPolicy.WAIT # defines the policy to apply when the max concurrency is reached + ) +) +``` + +### FlyteIDL +We propose adding a new IDL message to capture concurrency behavior at CreateExecutionTime + +```protobuf +message Concurrency { + // Defines how many executions with this launch plan can run in parallel + uint32 max = 1; + + // Defines how to handle the execution when the max concurrency is reached. 
+ ConcurrencyPolicy policy = 2; +} + +enum ConcurrencyPolicy { + UNSPECIFIED = 0; + + // wait for previous executions to terminate before starting a new one + WAIT = 1; + + // fail the CreateExecution request and do not permit the execution to start + ABORT = 2; +} + +message LaunchPlanSpec { + ... + + Concurrency concurrency = X; +} + +// embedded in the ExecutionClosure +message ExecutionStateChangeDetails { + ... + + // Includes the reason for the `PENDING` phase + string description = X; + + +} + +// Can also add to ExecutionSpec to specify execution time overrides + +``` + +### FlyteAdmin +At a broad level +1. At CreateExecution time, if the launch plan in the ExecutionSpec has a concurrency policy + 2. Create the execution in the database with a new `PENDING` execution phase and reason populated in `ExecutionStateChangeDetails`. + 3. or fail the request when the concurrency policy is set to `ABORT` + 3. Do not create the workflow CRD + +Introduce an async reconciliation loop in FlyteAdmin to poll for all pending executions: +1. Query all pending executions by timestamp ascending (open question, should we prefer more recent executions instead? should we make this configurable?) + 2. as an optimization, could even parallelize this into goroutines, one per launch plan distinct launch plan ID that has any `PENDING` execution +2. Check the database to see if there are fewer than `MAX_CONCURRENCY` non-terminal executions with an identical launch plan ID +3. If there are none, select the oldest pending execution for that launch plan + 4. create the workflow CRD + 5. open question: also update its phase in the database to `QUEUED`? + 6. let execution proceed + +We should consider adding an index to the executions table to include +- launch_plan_id +- phase +- created_at + +#### Open Questions +- Should we always attempt to schedule pending executions in ascending order of creation time? +- Should we propagate concurrency policies to child executions? 
+ +## 4 Metrics & Dashboards + +*What are the main metrics we should be measuring? For example, when interacting with an external system, it might be the external system latency. When adding a new table, how fast would it fill up?* + +## 5 Drawbacks + +*Are there any reasons why we should not do this? Here we aim to evaluate risk and check ourselves.* + +## 6 Alternatives + +*What are other ways of achieving the same outcome?* + +## 7 Potential Impact and Dependencies + +*Here, we aim to be mindful of our environment and generate empathy towards others who may be impacted by our decisions.* + +- *What other systems or teams are affected by this proposal?* +- *How could this be exploited by malicious attackers?* + +## 8 Unresolved questions + +*What parts of the proposal are still being defined or not covered by this proposal?* + +## 9 Conclusion + +*Here, we briefly outline why this is the right decision to make at this time and move forward!* + +## 10 RFC Process Guide, remove this section when done + +*By writing an RFC, you're giving insight to your team on the direction you're taking. There may not be a right or better decision in many cases, but we will likely learn from it. By authoring, you're making a decision on where you want us to go and are looking for feedback on this direction from your team members, but ultimately the decision is yours.* + +This document is a: + +- thinking exercise, prototype with words. +- historical record, its value may decrease over time. +- way to broadcast information. +- mechanism to build trust. +- tool to empower. +- communication channel. + +This document is not: + +- a request for permission. 
+- the most up to date representation of any process or system + +**Checklist:** + +- [ ] Copy template +- [ ] Draft RFC (think of it as a wireframe) +- [ ] Share as WIP with folks you trust to gut-check +- [ ] Send pull request when comfortable +- [ ] Label accordingly +- [ ] Assign reviewers +- [ ] Merge PR + +**Recommendations** + +- Tag RFC title with [WIP] if you're still ironing out details. +- Tag RFC title with [Newbie] if you're trying out something experimental or you're not entirely convinced of what you're proposing. +- Tag RFC title with [RR] if you'd like to schedule a review request to discuss the RFC. +- If there are areas that you're not convinced on, tag people who you consider may know about this and ask for their input. +- If you have doubts, ask on [#feature-discussions](https://slack.com/app_redirect?channel=CPQ3ZFQ84&team=TN89P6GGK) for help moving something forward. From 5f9a60f537b953001caa49d19659c75491b85085 Mon Sep 17 00:00:00 2001 From: Katrina Rogan Date: Tue, 13 Aug 2024 15:15:37 +0200 Subject: [PATCH 02/13] formatting Signed-off-by: Katrina Rogan --- rfc/system/RFC-0000-execution-concurrency.md | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/rfc/system/RFC-0000-execution-concurrency.md b/rfc/system/RFC-0000-execution-concurrency.md index 26569120b7..77257916ab 100644 --- a/rfc/system/RFC-0000-execution-concurrency.md +++ b/rfc/system/RFC-0000-execution-concurrency.md @@ -79,9 +79,9 @@ message ExecutionStateChangeDetails { ### FlyteAdmin At a broad level 1. At CreateExecution time, if the launch plan in the ExecutionSpec has a concurrency policy - 2. Create the execution in the database with a new `PENDING` execution phase and reason populated in `ExecutionStateChangeDetails`. - 3. or fail the request when the concurrency policy is set to `ABORT` - 3. Do not create the workflow CRD + 1. 
Create the execution in the database with a new `PENDING` execution phase and reason populated in `ExecutionStateChangeDetails`. + 1. or fail the request when the concurrency policy is set to `ABORT` + 1. Do not create the workflow CRD Introduce an async reconciliation loop in FlyteAdmin to poll for all pending executions: 1. Query all pending executions by timestamp ascending (open question, should we prefer more recent executions instead? should we make this configurable?) From 9b6665126b587e88e52f4e4ce6974fcf2e9434e5 Mon Sep 17 00:00:00 2001 From: Katrina Rogan Date: Tue, 13 Aug 2024 15:16:24 +0200 Subject: [PATCH 03/13] formatting Signed-off-by: Katrina Rogan --- rfc/system/RFC-0000-execution-concurrency.md | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/rfc/system/RFC-0000-execution-concurrency.md b/rfc/system/RFC-0000-execution-concurrency.md index 77257916ab..bf5053def7 100644 --- a/rfc/system/RFC-0000-execution-concurrency.md +++ b/rfc/system/RFC-0000-execution-concurrency.md @@ -85,12 +85,12 @@ At a broad level Introduce an async reconciliation loop in FlyteAdmin to poll for all pending executions: 1. Query all pending executions by timestamp ascending (open question, should we prefer more recent executions instead? should we make this configurable?) - 2. as an optimization, could even parallelize this into goroutines, one per launch plan distinct launch plan ID that has any `PENDING` execution + 1. as an optimization, could even parallelize this into goroutines, one per launch plan distinct launch plan ID that has any `PENDING` execution 2. Check the database to see if there are fewer than `MAX_CONCURRENCY` non-terminal executions with an identical launch plan ID 3. If there are none, select the oldest pending execution for that launch plan - 4. create the workflow CRD - 5. open question: also update its phase in the database to `QUEUED`? - 6. let execution proceed + 1. create the workflow CRD + 1. 
open question: also update its phase in the database to `QUEUED`? + 1. let execution proceed We should consider adding an index to the executions table to include - launch_plan_id From a108074b61d41c84aa3b43dc35ac67ffcf68d85a Mon Sep 17 00:00:00 2001 From: Katrina Rogan Date: Tue, 13 Aug 2024 15:19:27 +0200 Subject: [PATCH 04/13] grammar Signed-off-by: Katrina Rogan --- rfc/system/RFC-0000-execution-concurrency.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/rfc/system/RFC-0000-execution-concurrency.md b/rfc/system/RFC-0000-execution-concurrency.md index bf5053def7..077471ced3 100644 --- a/rfc/system/RFC-0000-execution-concurrency.md +++ b/rfc/system/RFC-0000-execution-concurrency.md @@ -85,9 +85,9 @@ At a broad level Introduce an async reconciliation loop in FlyteAdmin to poll for all pending executions: 1. Query all pending executions by timestamp ascending (open question, should we prefer more recent executions instead? should we make this configurable?) - 1. as an optimization, could even parallelize this into goroutines, one per launch plan distinct launch plan ID that has any `PENDING` execution + 1. as an optimization, could even parallelize this into goroutines, one per distinct launch plan ID that has any `PENDING` execution 2. Check the database to see if there are fewer than `MAX_CONCURRENCY` non-terminal executions with an identical launch plan ID -3. If there are none, select the oldest pending execution for that launch plan +3. If there are fewer than `MAX_CONCURRENCY` executions running, select the oldest pending execution for that launch plan 1. create the workflow CRD 1. open question: also update its phase in the database to `QUEUED`? 1. 
let execution proceed From 121066ca3c03d15a8ac6389f87819bf246e30421 Mon Sep 17 00:00:00 2001 From: Katrina Rogan Date: Tue, 20 Aug 2024 16:29:38 +0200 Subject: [PATCH 05/13] review comments, still need to flesh out impl Signed-off-by: Katrina Rogan --- rfc/system/RFC-0000-execution-concurrency.md | 49 ++++++++++++++++++-- 1 file changed, 45 insertions(+), 4 deletions(-) diff --git a/rfc/system/RFC-0000-execution-concurrency.md b/rfc/system/RFC-0000-execution-concurrency.md index 077471ced3..dd5aefa575 100644 --- a/rfc/system/RFC-0000-execution-concurrency.md +++ b/rfc/system/RFC-0000-execution-concurrency.md @@ -84,7 +84,7 @@ At a broad level 1. Do not create the workflow CRD Introduce an async reconciliation loop in FlyteAdmin to poll for all pending executions: -1. Query all pending executions by timestamp ascending (open question, should we prefer more recent executions instead? should we make this configurable?) +1. Query all pending executions by timestamp ascending 1. as an optimization, could even parallelize this into goroutines, one per distinct launch plan ID that has any `PENDING` execution 2. Check the database to see if there are fewer than `MAX_CONCURRENCY` non-terminal executions with an identical launch plan ID 3. If there are fewer than `MAX_CONCURRENCY` executions running, select the oldest pending execution for that launch plan @@ -97,17 +97,58 @@ We should consider adding an index to the executions table to include - phase - created_at +#### Prior Art +The flyteadmin native scheduler (https://github.com/flyteorg/flyte/tree/master/flyteadmin/scheduler) already implements a reconciliation loop to catch up on any missed schedules. 
+ +#### Caveats +Executions are always tied to the versioned launch plan that triggered them (see [here](https://github.com/flyteorg/flyte/blob/38883c721dac2875bdd2333f4cd56e757e81ea5f/flyteadmin/pkg/repositories/models/execution.go#L26)) +Therefore, this proposal only applies concurrency at the versioned launch plan level. + +If we wanted to support concurrency across launch plan versions we could add another index to the Executions table to include the launch plan named entity, that is the entry in the [NamedEntity table](https://github.com/flyteorg/flyte/blob/38883c721dac2875bdd2333f4cd56e757e81ea5f/flyteadmin/pkg/repositories/models/named_entity.go#L39-L42) corresponding to the launch plan project, domain & name + +In models/execution.go: +```go + +type Execution struct { + + ... + // Already exists + LaunchPlanID uint `gorm:"index"` + // New field to make querying on the named entity + LaunchPlanNamedEntityID uint `gorm:"index"` + +} + +``` + +Then the reconciliation loop would query executions in a non-terminal phase matching the launch plan named entity ID instead of LaunchPlanID. + #### Open Questions - Should we always attempt to schedule pending executions in ascending order of creation time? + - Decision: We'll use FIFO scheduling by default but can extend scheduling behavior with an enum going forward. - Should we propagate concurrency policies to child executions? + - Decision: no. Child executions can define concurrency at the child launch plan level if necessary. ## 4 Metrics & Dashboards - -*What are the main metrics we should be measuring? For example, when interacting with an external system, it might be the external system latency. 
When adding a new table, how fast would it fill up?* +- Time spent in PENDING: It's useful to understand the duration spent in PENDING before a launch plan transitions to RUNNING +- It may be useful for Flyte platform operators to also configure alerts if an execution stays in PENDING for too long of a threshold ## 5 Drawbacks -*Are there any reasons why we should not do this? Here we aim to evaluate risk and check ourselves.* +The [executions model](https://github.com/flyteorg/flyte/blob/38883c721dac2875bdd2333f4cd56e757e81ea5f/flyteadmin/pkg/repositories/models/execution.go) +already has indices on +- primary key index +- launch plan id +- workflow id +- task id (for single task executions) +- execution created at +- error kind (code) +- user who launched the execution +- state + +Database performance suffers as new indices are added (ref [[1](https://use-the-index-luke.com/sql/dml/insert)] [[2](https://www.timescale.com/learn/postgresql-performance-tuning-optimizing-database-indexes)]) +We could as an alternative, repurpose the existing launch plan index to include (launch plan id, phase, created at) to optimize the query for pending executions and not significantly affect queries on launch plan id leveraging the existing index. + ## 6 Alternatives From 4a88d7c7a52aeea895b8432d111f6645a4c657b4 Mon Sep 17 00:00:00 2001 From: Katrina Rogan Date: Thu, 22 Aug 2024 22:03:49 +0200 Subject: [PATCH 06/13] details Signed-off-by: Katrina Rogan --- rfc/system/RFC-0000-execution-concurrency.md | 100 ++++++++++++------- 1 file changed, 63 insertions(+), 37 deletions(-) diff --git a/rfc/system/RFC-0000-execution-concurrency.md b/rfc/system/RFC-0000-execution-concurrency.md index dd5aefa575..86921bd463 100644 --- a/rfc/system/RFC-0000-execution-concurrency.md +++ b/rfc/system/RFC-0000-execution-concurrency.md @@ -84,19 +84,40 @@ At a broad level 1. Do not create the workflow CRD Introduce an async reconciliation loop in FlyteAdmin to poll for all pending executions: -1. 
Query all pending executions by timestamp ascending - 1. as an optimization, could even parallelize this into goroutines, one per distinct launch plan ID that has any `PENDING` execution -2. Check the database to see if there are fewer than `MAX_CONCURRENCY` non-terminal executions with an identical launch plan ID -3. If there are fewer than `MAX_CONCURRENCY` executions running, select the oldest pending execution for that launch plan - 1. create the workflow CRD - 1. open question: also update its phase in the database to `QUEUED`? - 1. let execution proceed +1. 1x a minute: Query all pending executions by timestamp ascending, grouped by launch plan ID, roughly something like +```sql +SELECT e.* +FROM executions AS e +WHERE ( launch_plan_id, created_at ) IN (SELECT launch_plan_id, + Min(created_at) + FROM executions + WHERE phase = 'PENDING' + GROUP BY launch_plan_id); +``` +2. For each execution returned by the above query, `Add()` the pending execution to a [rate limiting workqueue](https://github.com/kubernetes/client-go/blob/master/util/workqueue/rate_limiting_queue.go#L27-L40) (as a suggestion) +3. In a separate goroutine, fetch items from the workqueue and individually process each execution entry + 1. Check the database to see if there are fewer than `MAX_CONCURRENCY` non-terminal executions matching the launch plan ID in the pending execution model + ```sql + select count(launch_plan_id) from executions where phase not in ('SUCCEEDED', 'FAILED', 'ABORTED', 'TIMED_OUT') group by launch_plan_id; + ``` + 1. If there are fewer than `MAX_CONCURRENCY` executions running + 1. check that the execution is still in `PENDING` + 1. create the workflow CRD + 1. if the CRD already exists because we've previously processed this pending execution before we had a chance to update the DB state, swallow the already exists error gracefully + 1. 
conditionally mark the execution as `QUEUED` in the db if it's already in `PENDING` or `QUEUED` to not overwrite any events should the execution have already reported progress in the interim
+        1. If creating the workflow CRD fails: mark the execution as `FAILED` in the db (and swallow any errors if the workflow is already in a terminal state should we have previously reported the failure after re-enqueuing the pending execution in a previous loop). This will remove its eligibility from the pending loop
+    1. If there are already `MAX_CONCURRENCY` executions running, simply proceed to (iii.)
+    1. Finally, always mark the queue item as [Done()](https://github.com/kubernetes/client-go/blob/master/util/workqueue/queue.go#L33)
+
+If we wanted further parallelization here, we could introduce a worker pool rather than having one async process read from the workqueue.

 We should consider adding an index to the executions table to include
 - launch_plan_id
 - phase
 - created_at

+
+
 #### Prior Art
 The flyteadmin native scheduler (https://github.com/flyteorg/flyte/tree/master/flyteadmin/scheduler) already implements a reconciliation loop to catch up on any missed schedules.

@@ -123,12 +144,6 @@ type Execution struct {

 Then the reconciliation loop would query executions in a non-terminal phase matching the launch plan named entity ID instead of LaunchPlanID.

-#### Open Questions
-- Should we always attempt to schedule pending executions in ascending order of creation time?
-  - Decision: We'll use FIFO scheduling by default but can extend scheduling behavior with an enum going forward.
-- Should we propagate concurrency policies to child executions?
-  - Decision: no. Child executions can define concurrency at the child launch plan level if necessary.
-
 ## 4 Metrics & Dashboards
 - Time spent in PENDING: It's useful to understand the duration spent in PENDING before a launch plan transitions to RUNNING
 - It may be useful for Flyte platform operators to also configure alerts if an execution stays in PENDING for too long of a threshold
@@ -152,7 +167,32 @@ We could as an alternative, repurpose the existing launch plan index to include

 ## 6 Alternatives

-*What are other ways of achieving the same outcome?*
+### Scheduling
+This proposal purposefully uses FIFO scheduling but there is a chance we may want to define other scheduling orders or catch-up policies.
+
+To accomplish this, we can extend the `ConcurrencyPolicy` proto message to encapsulate scheduling behavior
+
+```protobuf
+message Concurrency {
+  // Defines how many executions with this launch plan can run in parallel
+  uint32 max = 1;
+
+  // Defines how to handle the execution when the max concurrency is reached.
+  ConcurrencyPolicy policy = 2;
+
+  ConcurrencyScheduling scheduling = 3;
+}
+
+
+enum ConcurrencyScheduling {
+  FIFO = 0;
+  FILO = 1;
+  ...
+}
+```
+
+Furthermore, we may want to introduce a max pending period to fail executions that have been in `PENDING` for too long
+

 ## 7 Potential Impact and Dependencies

@@ -163,36 +203,22 @@ We could as an alternative, repurpose the existing launch plan index to include

 ## 8 Unresolved questions

-*What parts of the proposal are still being defined or not covered by this proposal?*
+- Should we always attempt to schedule pending executions in ascending order of creation time?
+  - Decision: We'll use FIFO scheduling by default but can extend scheduling behavior with an enum going forward.
+- Should we propagate concurrency policies to child executions?
+  - Decision: no. Child executions can define concurrency at the child launch plan level if necessary.
 ## 9 Conclusion

-*Here, we briefly outline why this is the right decision to make at this time and move forward!*
-
-## 10 RFC Process Guide, remove this section when done
-
-*By writing an RFC, you're giving insight to your team on the direction you're taking. There may not be a right or better decision in many cases, but we will likely learn from it. By authoring, you're making a decision on where you want us to go and are looking for feedback on this direction from your team members, but ultimately the decision is yours.*
-
-This document is a:
-
-- thinking exercise, prototype with words.
-- historical record, its value may decrease over time.
-- way to broadcast information.
-- mechanism to build trust.
-- tool to empower.
-- communication channel.
-
-This document is not:
+This is a simple and lightweight means for limiting execution concurrency that we can build upon, for flexible scheduling policies and even limiting task execution concurrency.

-- a request for permission.
-- the most up to date representation of any process or system

 **Checklist:**

-- [ ] Copy template
-- [ ] Draft RFC (think of it as a wireframe)
-- [ ] Share as WIP with folks you trust to gut-check
-- [ ] Send pull request when comfortable
+- [x] Copy template
+- [x] Draft RFC (think of it as a wireframe)
+- [x] Share as WIP with folks you trust to gut-check
+- [x] Send pull request when comfortable
 - [ ] Label accordingly
 - [ ] Assign reviewers
 - [ ] Merge PR
From 550c571cd112a6788cd484fe50af31c06da7efec Mon Sep 17 00:00:00 2001
From: Katrina Rogan
Date: Wed, 4 Sep 2024 16:12:01 +0200
Subject: [PATCH 07/13] More feedback, update filename

Signed-off-by: Katrina Rogan
---
 ...y.md => RFC-5659-execution-concurrency.md} | 83 +++++++++++++++++--
 1 file changed, 76 insertions(+), 7 deletions(-)
 rename rfc/system/{RFC-0000-execution-concurrency.md => RFC-5659-execution-concurrency.md} (73%)

diff --git a/rfc/system/RFC-0000-execution-concurrency.md b/rfc/system/RFC-5659-execution-concurrency.md
similarity index 73%
rename from rfc/system/RFC-0000-execution-concurrency.md
rename to rfc/system/RFC-5659-execution-concurrency.md
index 86921bd463..c2785e4740 100644
--- a/rfc/system/RFC-0000-execution-concurrency.md
+++ b/rfc/system/RFC-5659-execution-concurrency.md
@@ -116,16 +116,54 @@ We should consider adding an index to the executions table to include
 - phase
 - created_at

+##### Concurrency across launch plan versions
+Executions are always tied to the versioned launch plan that triggered them (see [here](https://github.com/flyteorg/flyte/blob/38883c721dac2875bdd2333f4cd56e757e81ea5f/flyteadmin/pkg/repositories/models/execution.go#L26))
+Therefore, this proposal only applies concurrency at the versioned launch plan level.

+If we wanted to support concurrency across launch plan versions:

-#### Prior Art
-The flyteadmin native scheduler (https://github.com/flyteorg/flyte/tree/master/flyteadmin/scheduler) already implements a reconciliation loop to catch up on any missed schedules.
+We could update usage like so

-#### Caveats
-Executions are always tied to the versioned launch plan that triggered them (see [here](https://github.com/flyteorg/flyte/blob/38883c721dac2875bdd2333f4cd56e757e81ea5f/flyteadmin/pkg/repositories/models/execution.go#L26))
-Therefore, this proposal only applies concurrency at the versioned launch plan level.
+```python
+my_lp = LaunchPlan.get_or_create(
+    name="my_serial_lp",
+    workflow=my_wf,
+    ...
+    concurrency=Concurrency(
+        max=1,  # defines how many executions with this launch plan can run in parallel
+        policy=ConcurrencyPolicy.WAIT,  # defines the policy to apply when the max concurrency is reached
+        precision=ConcurrencyPrecision.LAUNCH_PLAN
+    )
+)
+```
+
+and by default, when the precision is omitted the SDK could register the launch plan using `ConcurrencyPrecision.LAUNCH_PLAN_VERSION`

-If we wanted to support concurrency across launch plan versions we could add another index to the Executions table to include the launch plan named entity, that is the entry in the [NamedEntity table](https://github.com/flyteorg/flyte/blob/38883c721dac2875bdd2333f4cd56e757e81ea5f/flyteadmin/pkg/repositories/models/named_entity.go#L39-L42) corresponding to the launch plan project, domain & name
+We could update the concurrency protobuf definition like so:
+```protobuf
+message Concurrency {
+  // Defines how many executions with this launch plan can run in parallel
+  uint32 max = 1;
+
+  // Defines how to handle the execution when the max concurrency is reached.
+  ConcurrencyPolicy policy = 2;
+
+  ConcurrencyLevel level = 3;
+}
+
+enum ConcurrencyPrecision {
+  UNSPECIFIED = 0;
+
+  // Applies concurrency limits across all launch plan versions.
+  LAUNCH_PLAN = 1;
+
+  // Applies concurrency at the versioned launch plan level
+  LAUNCH_PLAN_VERSION = 2;
+}
+```
+
+
+We could add another index to the Executions table to include the launch plan named entity, that is the entry in the [NamedEntity table](https://github.com/flyteorg/flyte/blob/38883c721dac2875bdd2333f4cd56e757e81ea5f/flyteadmin/pkg/repositories/models/named_entity.go#L39-L42) corresponding to the launch plan project, domain & name

 In models/execution.go:
 ```go
@@ -140,10 +178,16 @@ type Execution struct {

 }

+
 ```

 Then the reconciliation loop would query executions in a non-terminal phase matching the launch plan named entity ID instead of LaunchPlanID.
+
+#### Prior Art
+The flyteadmin native scheduler (https://github.com/flyteorg/flyte/tree/master/flyteadmin/scheduler) already implements a reconciliation loop to catch up on any missed schedules.
+
+
 ## 4 Metrics & Dashboards
 - Time spent in PENDING: It's useful to understand the duration spent in PENDING before a launch plan transitions to RUNNING
 - It may be useful for Flyte platform operators to also configure alerts if an execution stays in PENDING for too long of a threshold
@@ -168,7 +212,7 @@ We could as an alternative, repurpose the existing launch plan index to include
 ## 6 Alternatives

 ### Scheduling
-This proposal purposefully uses FIFO scheduling but there is a chance we may want to define other scheduling orders or catch-up policies.
+This proposal purposefully uses FIFO scheduling. But this does not preclude defining other scheduling orders or catch-up policies in the future.

 To accomplish this, we can extend the `ConcurrencyPolicy` proto message to encapsulate scheduling behavior

@@ -193,6 +237,31 @@ enum ConcurrencyScheduling {

 Furthermore, we may want to introduce a max pending period to fail executions that have been in `PENDING` for too long

+### Other concurrency policies: Terminate priors on execution
+
+What if we actually want to terminate existing executions when the concurrency limit is reached?
+
+In practice this could work by adding a new `ConcurrencyPolicy` enum for `RUN_IMMEDIATELY`
+
+And the reconciliation loop would now proceed like so
+
+In a separate goroutine, fetch items from the workqueue and individually process each execution entry
+1. Check the database to see if there are fewer than `MAX_CONCURRENCY` non-terminal executions matching the launch plan ID in the pending execution model
+    ```sql
+    select count(launch_plan_id) from executions where phase not in ('SUCCEEDED', 'FAILED', 'ABORTED', 'TIMED_OUT') group by launch_plan_id;
+    ```
+1. If there are fewer than `MAX_CONCURRENCY` executions running
+    1. check that the execution is still in `PENDING`
+    1. create the workflow CRD
+        1. if the CRD already exists because we've previously processed this pending execution before we had a chance to update the DB state, swallow the already exists error gracefully
+    1. conditionally mark the execution as `QUEUED` in the db if it's already in `PENDING` or `QUEUED` to not overwrite any events should the execution have already reported progress in the interim
+    1. If creating the workflow CRD fails: mark the execution as `FAILED` in the db (and swallow any errors if the workflow is already in a terminal state should we have previously reported the failure after re-enqueuing the pending execution in a previous loop). This will remove its eligibility from the pending loop
+1. If there are already `MAX_CONCURRENCY` executions running
+    1. Retrieve n executions where n = count(actively running executions) - MAX_CONCURRENCY (ordered by creation time, ascending so we kill the oldest executions first)
+    2. Kill each execution
+    3. Proceed to (1) above.
+1. Finally, always mark the queue item as [Done()](https://github.com/kubernetes/client-go/blob/master/util/workqueue/queue.go#L33)
+

 ## 7 Potential Impact and Dependencies

From 61debe2563c0a7af09211f182dd42c3baf742e88 Mon Sep 17 00:00:00 2001
From: Katrina Rogan
Date: Wed, 4 Sep 2024 16:16:15 +0200
Subject: [PATCH 08/13] comment

Signed-off-by: Katrina Rogan
---
 rfc/system/RFC-5659-execution-concurrency.md | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/rfc/system/RFC-5659-execution-concurrency.md b/rfc/system/RFC-5659-execution-concurrency.md
index c2785e4740..78d2bdb1fb 100644
--- a/rfc/system/RFC-5659-execution-concurrency.md
+++ b/rfc/system/RFC-5659-execution-concurrency.md
@@ -175,13 +175,15 @@ type Execution struct {
     LaunchPlanID uint `gorm:"index"`
     // New field to make querying on the named entity
     LaunchPlanNamedEntityID uint `gorm:"index"`
+    // New field to make querying on concurrency policy by the reconciliation loop easier
+    ConcurrencyLevel uint32

 }


 ```

-Then the reconciliation loop would query executions in a non-terminal phase matching the launch plan named entity ID instead of LaunchPlanID.
+Then the reconciliation loop would query executions in a non-terminal phase matching the launch plan named entity ID instead of LaunchPlanID based on the ConcurrencyLevel.

 #### Prior Art
From da704b4cda292b99532f44a99ec148b8c3b208dc Mon Sep 17 00:00:00 2001
From: Katrina Rogan
Date: Wed, 4 Sep 2024 16:22:34 +0200
Subject: [PATCH 09/13] details

Signed-off-by: Katrina Rogan
---
 rfc/system/RFC-5659-execution-concurrency.md | 12 +++++++++++-
 1 file changed, 11 insertions(+), 1 deletion(-)

diff --git a/rfc/system/RFC-5659-execution-concurrency.md b/rfc/system/RFC-5659-execution-concurrency.md
index 78d2bdb1fb..777f2d1571 100644
--- a/rfc/system/RFC-5659-execution-concurrency.md
+++ b/rfc/system/RFC-5659-execution-concurrency.md
@@ -151,7 +151,7 @@ message Concurrency {
   ConcurrencyLevel level = 3;
 }

-enum ConcurrencyPrecision {
+enum ConcurrencyLevel {
   UNSPECIFIED = 0;

   // Applies concurrency limits across all launch plan versions.
@@ -185,6 +185,16 @@ type Execution struct {

 Then the reconciliation loop would query executions in a non-terminal phase matching the launch plan named entity ID instead of LaunchPlanID based on the ConcurrencyLevel.

+```sql
+SELECT e.*
+FROM executions AS e
+WHERE ( launch_plan_named_entity_id, created_at ) IN (SELECT launch_plan_named_entity_id,
+                                                       Min(created_at)
+                                                FROM executions
+                                                WHERE phase = 'PENDING' AND concurrency_level = 2
+                                                GROUP BY launch_plan_named_entity_id);
+```
+

 #### Prior Art
 The flyteadmin native scheduler (https://github.com/flyteorg/flyte/tree/master/flyteadmin/scheduler) already implements a reconciliation loop to catch up on any missed schedules.
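
Editor's sketch (not part of the patch): the per-named-entity pending query from PATCH 09 can be tried against an in-memory SQLite database. Everything other than the column names `launch_plan_named_entity_id`, `phase`, `concurrency_level` and `created_at` is an assumption made for illustration: the trimmed-down table, the integer timestamps, the sample rows, and the helper names `oldest_pending_per_entity` and `demo_connection`. The extra outer `phase` filter is also an addition, so that a non-pending row sharing a timestamp cannot match. The row-value `IN` form needs SQLite 3.15 or newer; FlyteAdmin itself would run this through its own database layer.

```python
import sqlite3

# Hypothetical, trimmed-down `executions` table; only the column names come
# from the RFC text, the types and integer timestamps are assumptions.
SCHEMA = """
CREATE TABLE executions (
    id INTEGER PRIMARY KEY,
    launch_plan_named_entity_id INTEGER NOT NULL,
    phase TEXT NOT NULL,
    concurrency_level INTEGER NOT NULL,
    created_at INTEGER NOT NULL
);
"""

# Same shape as the RFC query: oldest PENDING execution per named entity.
# The outer phase/level filter is an addition for safety in this sketch.
OLDEST_PENDING_SQL = """
SELECT e.id, e.launch_plan_named_entity_id, e.created_at
FROM executions AS e
WHERE e.phase = 'PENDING'
  AND e.concurrency_level = ?
  AND (e.launch_plan_named_entity_id, e.created_at) IN (
        SELECT launch_plan_named_entity_id, MIN(created_at)
        FROM executions
        WHERE phase = 'PENDING' AND concurrency_level = ?
        GROUP BY launch_plan_named_entity_id)
ORDER BY e.created_at;
"""


def oldest_pending_per_entity(conn, concurrency_level=2):
    """Return (id, named_entity_id, created_at) rows, oldest first."""
    params = (concurrency_level, concurrency_level)
    return conn.execute(OLDEST_PENDING_SQL, params).fetchall()


def demo_connection():
    """Build an in-memory database seeded with made-up sample rows."""
    conn = sqlite3.connect(":memory:")
    conn.executescript(SCHEMA)
    conn.executemany(
        "INSERT INTO executions VALUES (?, ?, ?, ?, ?)",
        [
            (1, 1, "RUNNING", 2, 10),
            (2, 1, "PENDING", 2, 20),
            (3, 1, "PENDING", 2, 30),
            (4, 2, "PENDING", 2, 5),
            (5, 2, "PENDING", 2, 6),
            (6, 3, "SUCCEEDED", 2, 1),
            (7, 4, "PENDING", 1, 1),
        ],
    )
    return conn
```

Calling `oldest_pending_per_entity(demo_connection())` returns one row per named entity that has pending work, which is the set the reconciliation loop would enqueue.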
From cce3e692a427c1c6b0bd340e35a85dc4e8397bc4 Mon Sep 17 00:00:00 2001 From: Katrina Rogan Date: Wed, 20 Nov 2024 16:48:34 +0100 Subject: [PATCH 10/13] comments Signed-off-by: Katrina Rogan --- rfc/system/RFC-5659-execution-concurrency.md | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/rfc/system/RFC-5659-execution-concurrency.md b/rfc/system/RFC-5659-execution-concurrency.md index 777f2d1571..7e72064d7e 100644 --- a/rfc/system/RFC-5659-execution-concurrency.md +++ b/rfc/system/RFC-5659-execution-concurrency.md @@ -54,6 +54,9 @@ enum ConcurrencyPolicy { // fail the CreateExecution request and do not permit the execution to start ABORT = 2; + + // terminate the oldest execution when the concurrency limit is reached and immediately begin proceeding with the new execution + REPLACE = 3; } message LaunchPlanSpec { @@ -113,7 +116,7 @@ If we wanted further parallelization here, we could introduce a worker pool rath We should consider adding an index to the executions table to include - launch_plan_id -- phase +- phase==PENDING only (in order to safeguard for well-populated flyteadmin instances with lots of completed, historical executions) - created_at ##### Concurrency across launch plan versions @@ -195,6 +198,7 @@ WHERE ( launch_plan_named_entity_id, created_at ) IN (SELECT launch_plan_named_ GROUP BY launch_plan_named_entity_id); ``` +Note, in this proposal, registering a new version of the launch plan and setting it to active will determine the concurrency policy across all launch plan versions. #### Prior Art The flyteadmin native scheduler (https://github.com/flyteorg/flyte/tree/master/flyteadmin/scheduler) already implements a reconciliation loop to catch up on any missed schedules. 
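With `REPLACE` added alongside `WAIT` and `ABORT`, the per-execution decision can be sketched as a small pure function. This is only an illustration under stated assumptions: the Go names (`ConcurrencyPolicy`, `action`, `decide`) are hypothetical and not the generated flyteidl types, and treating `UNSPECIFIED` as "no policy, start immediately" is an assumption rather than something the RFC states.

```go
package main

import "fmt"

// ConcurrencyPolicy mirrors the enum values in the RFC's protobuf sketch.
type ConcurrencyPolicy int

const (
	Unspecified ConcurrencyPolicy = iota
	Wait
	Abort
	Replace
)

// action is a hypothetical label for what the controller does next.
type action string

const (
	actionCreate        action = "create the execution"
	actionKeepPending   action = "leave the execution pending"
	actionFail          action = "fail the new execution"
	actionReplaceOldest action = "terminate the oldest execution, then create the new one"
)

// decide returns the next step for one pending execution, given the launch
// plan's policy, its concurrency limit, and the number of active executions.
func decide(policy ConcurrencyPolicy, limit, active uint32) action {
	// Assumption: an unspecified policy means no concurrency control applies.
	if policy == Unspecified || active < limit {
		return actionCreate
	}
	switch policy {
	case Abort:
		return actionFail
	case Replace:
		return actionReplaceOldest
	default: // Wait
		return actionKeepPending
	}
}

func main() {
	fmt.Println(decide(Wait, 1, 1))
	fmt.Println(decide(Abort, 1, 1))
	fmt.Println(decide(Replace, 1, 1))
	fmt.Println(decide(Replace, 2, 1))
}
```

Keeping the decision free of database and Kubernetes calls makes each policy branch easy to unit test in isolation.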
From dd584066d4b0dd482c6572ae159dd7c80cd23412 Mon Sep 17 00:00:00 2001 From: Katrina Rogan Date: Mon, 25 Nov 2024 13:58:35 +0100 Subject: [PATCH 11/13] Discussion comments Signed-off-by: Katrina Rogan --- rfc/system/RFC-5659-execution-concurrency.md | 168 ++++++++----------- 1 file changed, 66 insertions(+), 102 deletions(-) diff --git a/rfc/system/RFC-5659-execution-concurrency.md b/rfc/system/RFC-5659-execution-concurrency.md index 7e72064d7e..e2c73b3efe 100644 --- a/rfc/system/RFC-5659-execution-concurrency.md +++ b/rfc/system/RFC-5659-execution-concurrency.md @@ -35,7 +35,7 @@ my_lp = LaunchPlan.get_or_create( ``` ### FlyteIDL -We propose adding a new IDL message to capture concurrency behavior at CreateExecutionTime +We propose adding a new IDL message to capture concurrency behavior at CreateExecutionTime and embedding it in the existing [Schedule](https://github.com/flyteorg/flyte/blob/master/flyteidl/protos/flyteidl/admin/schedule.proto) message ```protobuf message Concurrency { @@ -59,7 +59,7 @@ enum ConcurrencyPolicy { REPLACE = 3; } -message LaunchPlanSpec { +message Schedule { ... Concurrency concurrency = X; @@ -79,51 +79,75 @@ message ExecutionStateChangeDetails { ``` -### FlyteAdmin -At a broad level +### Concurrency Controller Singleton +At a broad level, we'll follow the precedent of the [scheduler](https://github.com/flyteorg/flyte/tree/master/flyteadmin/scheduler) defined in FlyteAdmin and define a singleton to manage concurrency across all launch plans. + 1. At CreateExecution time, if the launch plan in the ExecutionSpec has a concurrency policy 1. Create the execution in the database with a new `PENDING` execution phase and reason populated in `ExecutionStateChangeDetails`. 1. or fail the request when the concurrency policy is set to `ABORT` 1. Do not create the workflow CRD -Introduce an async reconciliation loop in FlyteAdmin to poll for all pending executions: -1. 
1x a minute: Query all pending executions by timestamp ascending, grouped by launch plan ID, roughly something like -```sql -SELECT e.* -FROM executions AS e -WHERE ( launch_plan_id, created_at ) IN (SELECT launch_plan_id, - Min(created_at) - FROM executions - WHERE phase = 'PENDING' - GROUP BY launch_plan_id); -``` -2. For each execution returned by the above query, `Add()` the pending execution to a [rate limiting workqueue](https://github.com/kubernetes/client-go/blob/master/util/workqueue/rate_limiting_queue.go#L27-L40) (as a suggestion) -3. In a separate goroutine, fetch items from the workqueue and individually process each execution entry - 1. Check the database to see if there are fewer than `MAX_CONCURRENCY` non-terminal executions matching the launch plan ID in the pending execution model - ```sql - select count(launch_plan_id) from executions where phase not in ('SUCCEEDED', 'FAILED', 'ABORTED', 'TIMED_OUT') group by launch_plan_id; - ``` - 1. If there are fewer than `MAX_CONCURRENCY` executions running - 1. check that the execution is still in `PENDING` - 1. create the workflow CRD - 1. if the CRD already exists because we've previously processed this pending execution before we had a chance to update the DB state, swallow the already exists error gracefully - 1. conditionally mark the execution as `QUEUED` in the db if it's already in `PENDING` or `QUEUED` to not overwrite any events should the execution have already reported progress in the interim - 1. If creating the workflow CRD fails: mark the execution as `FAILED` in the db (and swallow any errors if the workflow is already in a terminal state should we have previously reported the failure after re-enqueuing the pending execution a previous loop). This will remove its eligiblity from the pending loop - 1. If there are already `MAX_CONCURRENCY` executions running, simply proceed to (iii.) - 1. 
Finally, always mark the queue item as [Done()](https://github.com/kubernetes/client-go/blob/master/util/workqueue/queue.go#L33) - -If we wanted further parallelization here, we could introduce a worker pool rather than having one async process read from the workqueue. +Introduce the Concurrency Controller to poll for all pending executions: +1. Upon start-up, initialize a launch plan informer and a worker pool and spawn N number of worker threads. + 1. The launch plan informer will be responsible for keeping a map of launch plans, by [NamedEntityIdentifier](https://github.com/flyteorg/flyte/blob/25cfe16940f10f9bbef02e288c823db16eb37609/flyteidl/protos/flyteidl/admin/common.proto) (that is across versions) and their concurrency policy: `map[admin.NamedEntityIdentifier]admin.Schedule` +1. Periodically query the DB for pending executions `SELECT * FROM executions WHERE phase not in ('SUCCEEDED', 'FAILED', 'ABORTED', 'TIMED_OUT') group by launch_plan_id;` + 1. For each `PENDING` execution returned by the above query, `Add()` the pending execution to a [workqueue](https://github.com/kubernetes/client-go/blob/master/util/workqueue/queue.go). We can fine tune in the future to include differentiated priority. + 1. For each non-`PENDING` execution returned by the above query, update the map of active executions by launch plan named entity into a thread-safe Map of type `rawActiveLaunchPlanExecutions map[admin.NamedEntityIdentifier]util.Set[admin.Execution]` (e.g. using this [set]("k8s.io/apimachinery/pkg/util/sets") library) +1. After processing the complete set of non-terminal executions, transform the `rawActiveLaunchPlanExecutions` map into a thread-safe, ordered list of executions by creation time: `activeLaunchPlanExecutions map[admin.NamedEntityIdentifier][]*core.WorkflowExecutionIdentifier` using an implementation where different keys can be accessed concurrently. +1. For each worker in the workqueue: + 1. 
Check a separate in-memory map populated launch plan informer to see: + 1. If the launch plan no longer has a concurrency policy, proceed to create the execution, see below + 1. If the launch plan has an active concurrency policy and max executions has been reached: proceed to respect the concurrency policy: + 1. `WAIT`: do nothing + 1. `ABORT`: mark the execution as `FAILED` in the db with a sensible error explaining the concurrency policy was violated + 1. `REPLACE`: terminate the oldest execution for the execution's launch plan in `activeLaunchPlanExecutions`. If this succeeds, or it's already terminated, then proceed to create the new execution: see below + 1. If the launch plan has an active concurrency policy and max executions have not been reached: + 1. Proceed to create the execution, see below + 1. Finally, always mark the queue item as [Done()](https://github.com/kubernetes/client-go/blob/master/util/workqueue/queue.go#L33) + +Creating an execution +1. create the workflow CRD + 1. if the CRD already exists because we've previously processed this pending execution before we had a chance to update the DB state, swallow the already exists error gracefully +1. conditionally mark the execution as `QUEUED` in the db if it's already in `PENDING` or `QUEUED` to not overwrite any events should the execution have already reported progress in the interim +1. If creating the workflow CRD fails: mark the execution as `FAILED` in the db (and swallow any errors if the workflow is already in a terminal state should we have previously reported the failure after re-enqueuing the pending execution a previous loop). This will remove its eligiblity from the pending loop +2. Upon successful creation of the workflow CRD, append the execution identifier to `activeLaunchPlanExecutions` for the launch plan named entity + +#### Launch Plan informer +This is an async process we run in the Concurrency Controller to ensure we have an eventually consistent view of launch plans. 
+ +Upon Concurrency Controller start-up, we'll query the DB for all active launch plans and populate a map of active launch plans: `map[admin.NamedEntityIdentifier]admin.Schedule + +Periodically, the informer will re-issue the query, optionally filtering by [UpdatedAt](https://github.com/flyteorg/flyte/blob/master/datacatalog/pkg/repositories/models/base.go#L7) to only fetch launch plans that have been updated since the last query to repopulate the map. + + +### Flyte Admin changes +### Execution Manager +Because we fetch the launch plan to reconcile execution inputs at CreateExecution time, we'll have the concurrency policy available to us at the time of execution creation. +If there is no concurrency policy defined, we'll proceed as [normal](https://github.com/flyteorg/flyte/blob/f14348165ccdfb26f8509c0f1ef380a360e59c4d/flyteadmin/pkg/manager/impl/execution_manager.go#L1169-L1173) and create the workflow execution CRD and then create a database entry for the execution with phase `UNKNOWN`. This way, we don't incur any penalty for executions + +If there is a concurrency policy defined, we'll create the execution in the database with a new `PENDING` execution phase and reason populated in `ExecutionStateChangeDetails` _but will not create a workflow CRD_ + + +#### Database +For performance, we can introduce new fields to denormalize the launch plan named entity the execution was launched by +In [models/execution.go](https://github.com/flyteorg/flyte/blob/25cfe16940f10f9bbef02e288c823db16eb37609/flyteadmin/pkg/repositories/models/execution.go) +```go +model Execution { + ... 
+ LaunchPlanProject string + LaunchPlanDomain string + LaunchPlanName string +} +```` We should consider adding an index to the executions table to include -- launch_plan_id -- phase==PENDING only (in order to safeguard for well-populated flyteadmin instances with lots of completed, historical executions) -- created_at +- phase in (`PENDING`, `QUEUED`, `RUNNING`) only (in order to safeguard for well-populated flyteadmin instances with lots of completed, historical executions) -##### Concurrency across launch plan versions +##### Concurrency by specified launch plan versions Executions are always tied to the versioned launch plan that triggered them (see [here](https://github.com/flyteorg/flyte/blob/38883c721dac2875bdd2333f4cd56e757e81ea5f/flyteadmin/pkg/repositories/models/execution.go#L26)) -Therefore, this proposal only applies concurrency at the versioned launch plan level. +Therefore, this proposal only applies concurrency at the launch plan Named Entity level, that is across (project, domain, version). -If we wanted to support concurrency across launch plan versions: +If we wanted to support concurrency by launch plan versions, we'd introduce `LaunchPlanVersion` to the execution model and update the keys for the in memory maps to be by versioned launch plan rather than NamedEntityIdentifier. We could update usage like so @@ -155,49 +179,14 @@ message Concurrency { } enum ConcurrencyLevel { - UNSPECIFIED = 0; - // Applies concurrency limits across all launch plan versions. 
- LAUNCH_PLAN = 1; + LAUNCH_PLAN = 0; // Applies concurrency at the versioned launch plan level - LAUNCH_PLAN_VERSION = 2; + LAUNCH_PLAN_VERSION = 1; } ``` - -We could add another index to the Executions table to include the launch plan named entity, that is the entry in the [NamedEntity table](https://github.com/flyteorg/flyte/blob/38883c721dac2875bdd2333f4cd56e757e81ea5f/flyteadmin/pkg/repositories/models/named_entity.go#L39-L42) corresponding to the launch plan project, domain & name - -In models/execution.go: -```go - -type Execution struct { - - ... - // Already exists - LaunchPlanID uint `gorm:"index"` - // New field to make querying on the named entity - LaunchPlanNamedEntityID uint `gorm:"index"` - // New field to make querying on concurrency policy by the reconciliation loop easier - ConcurrencyLevel uint32 - -} - - -``` - -Then the reconciliation loop would query executions in a non-terminal phase matching the launch plan named entity ID instead of LaunchPlanID based on the ConcurrencyLevel. - -```sql -SELECT e.* -FROM executions AS e -WHERE ( launch_plan_named_entity_id, created_at ) IN (SELECT launch_plan_named_entity_id, - Min(created_at) - FROM executions - WHERE phase = 'PENDING' AND concurrency_level = 2; - GROUP BY launch_plan_named_entity_id); -``` - Note, in this proposal, registering a new version of the launch plan and setting it to active will determine the concurrency policy across all launch plan versions. #### Prior Art @@ -222,13 +211,11 @@ already has indices on - state Database performance suffers as new indices are added (ref [[1](https://use-the-index-luke.com/sql/dml/insert)] [[2](https://www.timescale.com/learn/postgresql-performance-tuning-optimizing-database-indexes)]) -We could as an alternative, repurpose the existing launch plan index to include (launch plan id, phase, created at) to optimize the query for pending executions and not significantly affect queries on launch plan id leveraging the existing index. 
- ## 6 Alternatives ### Scheduling -This proposal purposefully uses FIFO scheduling. But this does not preclude defining other scheduling orders or catch-up policies in the future. +This proposal purposefully uses random scheduling. But this does not preclude defining other scheduling orders or catch-up policies in the future. To accomplish this, we can extend the `ConcurrenyPolicy` proto message to encapsulate scheduling behavior @@ -251,32 +238,9 @@ type ConcurrencyScheduling enum { } ``` -Furthermore, we may want to introduce a max pending period to fail executions that have been in `PENDING` for too long +When we process the pending executions in the Concurrency Controller, we can sort the pending executions by creation time in ascending or descending order based on the scheduling policy. -### Other concurrency policies: Terminate priors on execution - -What if we actually want to terminate existing executions when the concurrency limit is reached? - -In practice this could work by adding a new `ConcurrencyPolicy` enum for `RUN_IMMEDIATELY` - -And the reconciliation loop would now proceed like so - -In a separate goroutine, fetch items from the workqueue and individually process each execution entry -1. Check the database to see if there are fewer than `MAX_CONCURRENCY` non-terminal executions matching the launch plan ID in the pending execution model - ```sql - select count(launch_plan_id) from executions where phase not in ('SUCCEEDED', 'FAILED', 'ABORTED', 'TIMED_OUT') group by launch_plan_id; - ``` -1. If there are fewer than `MAX_CONCURRENCY` executions running - 1. check that the execution is still in `PENDING` - 1. create the workflow CRD - 1. if the CRD already exists because we've previously processed this pending execution before we had a chance to update the DB state, swallow the already exists error gracefully - 1. 
conditionally mark the execution as `QUEUED` in the db if it's already in `PENDING` or `QUEUED` to not overwrite any events should the execution have already reported progress in the interim - 1. If creating the workflow CRD fails: mark the execution as `FAILED` in the db (and swallow any errors if the workflow is already in a terminal state should we have previously reported the failure after re-enqueuing the pending execution a previous loop). This will remove its eligiblity from the pending loop -1. If there are already `MAX_CONCURRENCY` executions running - 1. Retrieve n executions where n = count(actively running executions) - MAX_CONCURRENCY (ordered by creation time, ascending so we kill the oldest executions first) - 2. Kill each execution - 3. Proceed to (1) above. -1. Finally, always mark the queue item as [Done()](https://github.com/kubernetes/client-go/blob/master/util/workqueue/queue.go#L33) +Furthermore, we may want to introduce a max pending period to fail executions that have been in `PENDING` for too long ## 7 Potential Impact and Dependencies From fea29eb42e33849d0cccc2864ee05954a70278f9 Mon Sep 17 00:00:00 2001 From: Katrina Rogan Date: Mon, 25 Nov 2024 14:08:17 +0100 Subject: [PATCH 12/13] more discussion comments Signed-off-by: Katrina Rogan --- rfc/system/RFC-5659-execution-concurrency.md | 29 +++++++++++--------- 1 file changed, 16 insertions(+), 13 deletions(-) diff --git a/rfc/system/RFC-5659-execution-concurrency.md b/rfc/system/RFC-5659-execution-concurrency.md index e2c73b3efe..88de2da75b 100644 --- a/rfc/system/RFC-5659-execution-concurrency.md +++ b/rfc/system/RFC-5659-execution-concurrency.md @@ -38,7 +38,7 @@ my_lp = LaunchPlan.get_or_create( We propose adding a new IDL message to capture concurrency behavior at CreateExecutionTime and embedding it in the existing [Schedule](https://github.com/flyteorg/flyte/blob/master/flyteidl/protos/flyteidl/admin/schedule.proto) message ```protobuf -message Concurrency { +message SchedulerPolicy 
{ // Defines how many executions with this launch plan can run in parallel uint32 max = 1; @@ -62,7 +62,7 @@ enum ConcurrencyPolicy { message Schedule { ... - Concurrency concurrency = X; + SchedulerPolicy scheduler_policy = X; } // embedded in the ExecutionClosure @@ -78,19 +78,22 @@ message ExecutionStateChangeDetails { // Can also add to ExecutionSpec to specify execution time overrides ``` +### Control Plane -### Concurrency Controller Singleton At a broad level, we'll follow the precedent of the [scheduler](https://github.com/flyteorg/flyte/tree/master/flyteadmin/scheduler) defined in FlyteAdmin and define a singleton to manage concurrency across all launch plans. 1. At CreateExecution time, if the launch plan in the ExecutionSpec has a concurrency policy 1. Create the execution in the database with a new `PENDING` execution phase and reason populated in `ExecutionStateChangeDetails`. 1. or fail the request when the concurrency policy is set to `ABORT` + 2. let the concurrency controller manage scheduling 1. Do not create the workflow CRD +### Concurrency Controller Singleton + Introduce the Concurrency Controller to poll for all pending executions: 1. Upon start-up, initialize a launch plan informer and a worker pool and spawn N number of worker threads. - 1. The launch plan informer will be responsible for keeping a map of launch plans, by [NamedEntityIdentifier](https://github.com/flyteorg/flyte/blob/25cfe16940f10f9bbef02e288c823db16eb37609/flyteidl/protos/flyteidl/admin/common.proto) (that is across versions) and their concurrency policy: `map[admin.NamedEntityIdentifier]admin.Schedule` -1. Periodically query the DB for pending executions `SELECT * FROM executions WHERE phase not in ('SUCCEEDED', 'FAILED', 'ABORTED', 'TIMED_OUT') group by launch_plan_id;` + 1. 
The launch plan informer will be responsible for keeping a map of launch plans, by [NamedEntityIdentifier](https://github.com/flyteorg/flyte/blob/25cfe16940f10f9bbef02e288c823db16eb37609/flyteidl/protos/flyteidl/admin/common.proto) (that is across versions) and their concurrency policy: `map[admin.NamedEntityIdentifier]admin.SchedulerPolicy` +1. Periodically query the DB for pending executions `SELECT * FROM executions WHERE phase not in ('SUCCEEDED', 'FAILED', 'ABORTED', 'TIMED_OUT');` 1. For each `PENDING` execution returned by the above query, `Add()` the pending execution to a [workqueue](https://github.com/kubernetes/client-go/blob/master/util/workqueue/queue.go). We can fine tune in the future to include differentiated priority. 1. For each non-`PENDING` execution returned by the above query, update the map of active executions by launch plan named entity into a thread-safe Map of type `rawActiveLaunchPlanExecutions map[admin.NamedEntityIdentifier]util.Set[admin.Execution]` (e.g. using this [set]("k8s.io/apimachinery/pkg/util/sets") library) 1. After processing the complete set of non-terminal executions, transform the `rawActiveLaunchPlanExecutions` map into a thread-safe, ordered list of executions by creation time: `activeLaunchPlanExecutions map[admin.NamedEntityIdentifier][]*core.WorkflowExecutionIdentifier` using an implementation where different keys can be accessed concurrently. @@ -115,9 +118,9 @@ Creating an execution #### Launch Plan informer This is an async process we run in the Concurrency Controller to ensure we have an eventually consistent view of launch plans. 
-Upon Concurrency Controller start-up, we'll query the DB for all active launch plans and populate a map of active launch plans: `map[admin.NamedEntityIdentifier]admin.Schedule +Upon Concurrency Controller start-up, we'll query the DB for all active launch plans and populate a map of active launch plans: `map[admin.NamedEntityIdentifier]admin.SchedulerPolicy` -Periodically, the informer will re-issue the query, optionally filtering by [UpdatedAt](https://github.com/flyteorg/flyte/blob/master/datacatalog/pkg/repositories/models/base.go#L7) to only fetch launch plans that have been updated since the last query to repopulate the map. +Periodically, the informer will re-issue the query, optionally filtering by [UpdatedAt](https://github.com/flyteorg/flyte/blob/master/datacatalog/pkg/repositories/models/base.go#L7) to only fetch launch plans that have been updated since the last query to repopulate the map. If an execution has terminated since the last time the query ran, it won't be in the result set and we'll want to update the in memory map to remove the execution. ### Flyte Admin changes @@ -125,7 +128,7 @@ Periodically, the informer will re-issue the query, optionally filtering by [Upd Because we fetch the launch plan to reconcile execution inputs at CreateExecution time, we'll have the concurrency policy available to us at the time of execution creation. If there is no concurrency policy defined, we'll proceed as [normal](https://github.com/flyteorg/flyte/blob/f14348165ccdfb26f8509c0f1ef380a360e59c4d/flyteadmin/pkg/manager/impl/execution_manager.go#L1169-L1173) and create the workflow execution CRD and then create a database entry for the execution with phase `UNKNOWN`. 
This way, we don't incur any penalty for executions -If there is a concurrency policy defined, we'll create the execution in the database with a new `PENDING` execution phase and reason populated in `ExecutionStateChangeDetails` _but will not create a workflow CRD_ +If there is a concurrency policy defined, if it's set to `ABORT` immediately fail the execution. Otherwise, we'll create the execution in the database with a new `PENDING` execution phase and reason populated in `ExecutionStateChangeDetails` _but will not create a workflow CRD_ #### Database @@ -145,9 +148,9 @@ We should consider adding an index to the executions table to include ##### Concurrency by specified launch plan versions Executions are always tied to the versioned launch plan that triggered them (see [here](https://github.com/flyteorg/flyte/blob/38883c721dac2875bdd2333f4cd56e757e81ea5f/flyteadmin/pkg/repositories/models/execution.go#L26)) -Therefore, this proposal only applies concurrency at the launch plan Named Entity level, that is across (project, domain, version). +However, this proposal only applies concurrency at the launch plan Named Entity level, that is by (project, domain, name) and across all versions. The currently active launch plan version will determine the concurrency policy that gets applied for all executions created with the launch plan NamedEntity. -If we wanted to support concurrency by launch plan versions, we'd introduce `LaunchPlanVersion` to the execution model and update the keys for the in memory maps to be by versioned launch plan rather than NamedEntityIdentifier. +If we wanted to support concurrency by launch plan versions, we'd introduce `LaunchPlanVersion` to the execution model and add duplicates but with update keys for the in memory maps to be by versioned launch plan rather than NamedEntityIdentifier. 
We could update usage like so @@ -159,16 +162,16 @@ my_lp = LaunchPlan.get_or_create( concurrency=Concurrency( max=1, # defines how many executions with this launch plan can run in parallel policy=ConcurrencyPolicy.WAIT # defines the policy to apply when the max concurrency is reached - precision=ConcurrencyPrecision.LAUNCH_PLAN + precision=ConcurrencyPrecision.LAUNCH_PLAN_VERSION ) ) ``` -and by default, when the precision is omitted the SDK could register the launch plan using `ConcurrencyPrecision.LAUNCH_PLAN_VERSION` +and by default, when the precision is omitted the SDK could register the launch plan using `ConcurrencyPrecision.LAUNCH_PLAN` We could update the concurrency protobuf definition like so: ```protobuf -message Concurrency { +message SchedulerPolicy { // Defines how many executions with this launch plan can run in parallel uint32 max = 1; From c2f9fe356915b463634ae33d536737965242fcb5 Mon Sep 17 00:00:00 2001 From: Katrina Rogan Date: Tue, 21 Jan 2025 10:58:26 +0100 Subject: [PATCH 13/13] updates Signed-off-by: Katrina Rogan --- rfc/system/RFC-5659-execution-concurrency.md | 22 ++++++++++++-------- 1 file changed, 13 insertions(+), 9 deletions(-) diff --git a/rfc/system/RFC-5659-execution-concurrency.md b/rfc/system/RFC-5659-execution-concurrency.md index 88de2da75b..32832169c2 100644 --- a/rfc/system/RFC-5659-execution-concurrency.md +++ b/rfc/system/RFC-5659-execution-concurrency.md @@ -7,7 +7,7 @@ ## 1 Executive Summary -This is a proposal to implement workflow execution concurrency, defined at the launch plan level. +This is a proposal to implement workflow execution concurrency, defined at the launch plan level. Concurrency applies to all versions of a launch plan. 
## 2 Motivation @@ -82,7 +82,7 @@ message ExecutionStateChangeDetails { At a broad level, we'll follow the precedent of the [scheduler](https://github.com/flyteorg/flyte/tree/master/flyteadmin/scheduler) defined in FlyteAdmin and define a singleton to manage concurrency across all launch plans. -1. At CreateExecution time, if the launch plan in the ExecutionSpec has a concurrency policy +1. At CreateExecution time, if the active version of the launch plan in the ExecutionSpec has a concurrency policy 1. Create the execution in the database with a new `PENDING` execution phase and reason populated in `ExecutionStateChangeDetails`. 1. or fail the request when the concurrency policy is set to `ABORT` 2. let the concurrency controller manage scheduling @@ -92,13 +92,13 @@ At a broad level, we'll follow the precedent of the [scheduler](https://github.c Introduce the Concurrency Controller to poll for all pending executions: 1. Upon start-up, initialize a launch plan informer and a worker pool and spawn N number of worker threads. - 1. The launch plan informer will be responsible for keeping a map of launch plans, by [NamedEntityIdentifier](https://github.com/flyteorg/flyte/blob/25cfe16940f10f9bbef02e288c823db16eb37609/flyteidl/protos/flyteidl/admin/common.proto) (that is across versions) and their concurrency policy: `map[admin.NamedEntityIdentifier]admin.SchedulerPolicy` + 1. The launch plan informer will be responsible for keeping a map of launch plans, by [NamedEntityIdentifier](https://github.com/flyteorg/flyte/blob/25cfe16940f10f9bbef02e288c823db16eb37609/flyteidl/protos/flyteidl/admin/common.proto) (for each active launch plan version) and their concurrency policy: `map[admin.NamedEntityIdentifier]admin.SchedulerPolicy` 1. Periodically query the DB for pending executions `SELECT * FROM executions WHERE phase not in ('SUCCEEDED', 'FAILED', 'ABORTED', 'TIMED_OUT');` 1. 
For each `PENDING` execution returned by the above query, `Add()` the pending execution to a [workqueue](https://github.com/kubernetes/client-go/blob/master/util/workqueue/queue.go). We can fine tune in the future to include differentiated priority. - 1. For each non-`PENDING` execution returned by the above query, update the map of active executions by launch plan named entity into a thread-safe Map of type `rawActiveLaunchPlanExecutions map[admin.NamedEntityIdentifier]util.Set[admin.Execution]` (e.g. using this [set]("k8s.io/apimachinery/pkg/util/sets") library) + 1. For each non-`PENDING` execution returned by the above query, update the map value for the specific launch plan named entity using a thread-safe Map of type `rawActiveLaunchPlanExecutions map[admin.NamedEntityIdentifier]util.Set[admin.Execution]` (e.g. using this [set](https://pkg.go.dev/k8s.io/apimachinery/pkg/util/sets) library) 1. After processing the complete set of non-terminal executions, transform the `rawActiveLaunchPlanExecutions` map into a thread-safe, ordered list of executions by creation time: `activeLaunchPlanExecutions map[admin.NamedEntityIdentifier][]*core.WorkflowExecutionIdentifier` using an implementation where different keys can be accessed concurrently. 1. For each worker in the workqueue: - 1. Check a separate in-memory map populated launch plan informer to see: + 1. Check the in-memory map populated by the launch plan informer to see: 1. If the launch plan no longer has a concurrency policy, proceed to create the execution, see below 1. If the launch plan has an active concurrency policy and max executions has been reached: proceed to respect the concurrency policy: 1. `WAIT`: do nothing @@ -112,15 +112,19 @@ Creating an execution 1. create the workflow CRD 1. if the CRD already exists because we've previously processed this pending execution before we had a chance to update the DB state, swallow the already exists error gracefully 1. 
conditionally mark the execution as `QUEUED` in the db if it's already in `PENDING` or `QUEUED` to not overwrite any events should the execution have already reported progress in the interim -1. If creating the workflow CRD fails: mark the execution as `FAILED` in the db (and swallow any errors if the workflow is already in a terminal state should we have previously reported the failure after re-enqueuing the pending execution a previous loop). This will remove its eligiblity from the pending loop -2. Upon successful creation of the workflow CRD, append the execution identifier to `activeLaunchPlanExecutions` for the launch plan named entity + 2. "conditionally" here means using compare-and-swap semantics within a transaction +1. If creating the workflow CRD fails + 2. Use some form of retries (perhaps with back-off) to create the CRD + 3. If the CRD creation still fails, mark the execution as `FAILED` in the db if it's currently in `PENDING` or `QUEUED`. This will remove its eligibility for the pending loop + 4. If the CRD creation has failed, but the execution has moved beyond `QUEUED` and has reported progress from flytepropeller in the interim (due to a network partition, such that the CRD was created but not successfully reported as such), do not update the execution status in the DB and allow the execution to progress to a terminal state +2. Upon successful creation of the workflow CRD **or** failure in step (iv) above to mark the execution as `FAILED`, append the execution identifier to `activeLaunchPlanExecutions` for the launch plan named entity #### Launch Plan informer This is an async process we run in the Concurrency Controller to ensure we have an eventually consistent view of launch plans. 
Upon Concurrency Controller start-up, we'll query the DB for all active launch plans and populate a map of active launch plans: `map[admin.NamedEntityIdentifier]admin.SchedulerPolicy` -Periodically, the informer will re-issue the query, optionally filtering by [UpdatedAt](https://github.com/flyteorg/flyte/blob/master/datacatalog/pkg/repositories/models/base.go#L7) to only fetch launch plans that have been updated since the last query to repopulate the map. If an execution has terminated since the last time the query ran, it won't be in the result set and we'll want to update the in memory map to remove the execution. +Periodically, the informer will re-issue the query, optionally filtering by [UpdatedAt](https://github.com/flyteorg/flyte/blob/master/datacatalog/pkg/repositories/models/base.go#L7) to only fetch launch plans that have been updated since the last query to repopulate the map. If all versions of a launch plan have been deactivated since the last time the query ran, we'll want to update the in memory map to empty out the launch plan scheduler policy. ### Flyte Admin changes @@ -150,7 +154,7 @@ We should consider adding an index to the executions table to include Executions are always tied to the versioned launch plan that triggered them (see [here](https://github.com/flyteorg/flyte/blob/38883c721dac2875bdd2333f4cd56e757e81ea5f/flyteadmin/pkg/repositories/models/execution.go#L26)) However, this proposal only applies concurrency at the launch plan Named Entity level, that is by (project, domain, name) and across all versions. The currently active launch plan version will determine the concurrency policy that gets applied for all executions created with the launch plan NamedEntity. -If we wanted to support concurrency by launch plan versions, we'd introduce `LaunchPlanVersion` to the execution model and add duplicates but with update keys for the in memory maps to be by versioned launch plan rather than NamedEntityIdentifier. 
+Non-goal, but a possible future proposal: if we wanted to support concurrency by launch plan version, we'd introduce `LaunchPlanVersion` to the execution model and add duplicates of the in-memory maps, with keys updated to be by versioned launch plan rather than NamedEntityIdentifier. We could update usage like so