diff --git a/rfc/system/RFC-5659-execution-concurrency.md b/rfc/system/RFC-5659-execution-concurrency.md index 88de2da75b..32832169c2 100644 --- a/rfc/system/RFC-5659-execution-concurrency.md +++ b/rfc/system/RFC-5659-execution-concurrency.md @@ -7,7 +7,7 @@ ## 1 Executive Summary -This is a proposal to implement workflow execution concurrency, defined at the launch plan level. +This is a proposal to implement workflow execution concurrency, defined at the launch plan level. Concurrency applies to all version of a launch plan. ## 2 Motivation @@ -82,7 +82,7 @@ message ExecutionStateChangeDetails { At a broad level, we'll follow the precedent of the [scheduler](https://github.com/flyteorg/flyte/tree/master/flyteadmin/scheduler) defined in FlyteAdmin and define a singleton to manage concurrency across all launch plans. -1. At CreateExecution time, if the launch plan in the ExecutionSpec has a concurrency policy +1. At CreateExecution time, if the active version of the launch plan in the ExecutionSpec has a concurrency policy 1. Create the execution in the database with a new `PENDING` execution phase and reason populated in `ExecutionStateChangeDetails`. 1. or fail the request when the concurrency policy is set to `ABORT` 2. let the concurrency controller manage scheduling @@ -92,13 +92,13 @@ At a broad level, we'll follow the precedent of the [scheduler](https://github.c Introduce the Concurrency Controller to poll for all pending executions: 1. Upon start-up, initialize a launch plan informer and a worker pool and spawn N number of worker threads. - 1. The launch plan informer will be responsible for keeping a map of launch plans, by [NamedEntityIdentifier](https://github.com/flyteorg/flyte/blob/25cfe16940f10f9bbef02e288c823db16eb37609/flyteidl/protos/flyteidl/admin/common.proto) (that is across versions) and their concurrency policy: `map[admin.NamedEntityIdentifier]admin.SchedulerPolicy` + 1. The launch plan informer will be responsible for keeping a map of launch plans, by [NamedEntityIdentifier](https://github.com/flyteorg/flyte/blob/25cfe16940f10f9bbef02e288c823db16eb37609/flyteidl/protos/flyteidl/admin/common.proto) (for each active launch plan version) and their concurrency policy: `map[admin.NamedEntityIdentifier]admin.SchedulerPolicy` 1. Periodically query the DB for pending executions `SELECT * FROM executions WHERE phase not in ('SUCCEEDED', 'FAILED', 'ABORTED', 'TIMED_OUT');` 1. For each `PENDING` execution returned by the above query, `Add()` the pending execution to a [workqueue](https://github.com/kubernetes/client-go/blob/master/util/workqueue/queue.go). We can fine tune in the future to include differentiated priority. - 1. For each non-`PENDING` execution returned by the above query, update the map of active executions by launch plan named entity into a thread-safe Map of type `rawActiveLaunchPlanExecutions map[admin.NamedEntityIdentifier]util.Set[admin.Execution]` (e.g. using this [set]("k8s.io/apimachinery/pkg/util/sets") library) + 1. For each non-`PENDING` execution returned by the above query, update the map value for the specific launch plan named entity using a thread-safe Map of type `rawActiveLaunchPlanExecutions map[admin.NamedEntityIdentifier]util.Set[admin.Execution]` (e.g. using this [set]("k8s.io/apimachinery/pkg/util/sets") library) 1. After processing the complete set of non-terminal executions, transform the `rawActiveLaunchPlanExecutions` map into a thread-safe, ordered list of executions by creation time: `activeLaunchPlanExecutions map[admin.NamedEntityIdentifier][]*core.WorkflowExecutionIdentifier` using an implementation where different keys can be accessed concurrently. 1. For each worker in the workqueue: - 1. Check a separate in-memory map populated launch plan informer to see: + 1. Check the in-memory map populated launch plan informer to see: 1. If the launch plan no longer has a concurrency policy, proceed to create the execution, see below 1. If the launch plan has an active concurrency policy and max executions has been reached: proceed to respect the concurrency policy: 1. `WAIT`: do nothing @@ -112,15 +112,19 @@ Creating an execution 1. create the workflow CRD 1. if the CRD already exists because we've previously processed this pending execution before we had a chance to update the DB state, swallow the already exists error gracefully 1. conditionally mark the execution as `QUEUED` in the db if it's already in `PENDING` or `QUEUED` to not overwrite any events should the execution have already reported progress in the interim -1. If creating the workflow CRD fails: mark the execution as `FAILED` in the db (and swallow any errors if the workflow is already in a terminal state should we have previously reported the failure after re-enqueuing the pending execution a previous loop). This will remove its eligiblity from the pending loop -2. Upon successful creation of the workflow CRD, append the execution identifier to `activeLaunchPlanExecutions` for the launch plan named entity + 2. conditionally meaning use compare-and-swap semantics within a transaction +1. If creating the workflow CRD fails + 2. Use some form of retries (perhaps with back-off) to create the CRD + 3. If the CRD creation still fails, mark the execution as `FAILED` in the db if it's currently in `PENDING` or `QUEUED`. This will remove its eligiblity from the pending loop + 4. If the CRD creation has failed, but the execution has moved beyond 'QUEUED' and has reported progress from flytepropeller in the interim (due to a network partition, such that the CRD was created but not successfully reported as such) - do not update the execution status in the DB and allow execution to progress to a terminal state +2. Upon successful creation of the workflow CRD **or** failure in step (iv) above to mark the execution as 'FAILED', append the execution identifier to `activeLaunchPlanExecutions` for the launch plan named entity #### Launch Plan informer This is an async process we run in the Concurrency Controller to ensure we have an eventually consistent view of launch plans. Upon Concurrency Controller start-up, we'll query the DB for all active launch plans and populate a map of active launch plans: `map[admin.NamedEntityIdentifier]admin.SchedulerPolicy` -Periodically, the informer will re-issue the query, optionally filtering by [UpdatedAt](https://github.com/flyteorg/flyte/blob/master/datacatalog/pkg/repositories/models/base.go#L7) to only fetch launch plans that have been updated since the last query to repopulate the map. If an execution has terminated since the last time the query ran, it won't be in the result set and we'll want to update the in memory map to remove the execution. +Periodically, the informer will re-issue the query, optionally filtering by [UpdatedAt](https://github.com/flyteorg/flyte/blob/master/datacatalog/pkg/repositories/models/base.go#L7) to only fetch launch plans that have been updated since the last query to repopulate the map. If all versions of a launch plan have been deactivated since the last time the query ran, we'll want to update the in memory map to empty out the launch plan scheduler policy. ### Flyte Admin changes @@ -150,7 +154,7 @@ We should consider adding an index to the executions table to include Executions are always tied to the versioned launch plan that triggered them (see [here](https://github.com/flyteorg/flyte/blob/38883c721dac2875bdd2333f4cd56e757e81ea5f/flyteadmin/pkg/repositories/models/execution.go#L26)) However, this proposal only applies concurrency at the launch plan Named Entity level, that is by (project, domain, name) and across all versions. The currently active launch plan version will determine the concurrency policy that gets applied for all executions created with the launch plan NamedEntity. -If we wanted to support concurrency by launch plan versions, we'd introduce `LaunchPlanVersion` to the execution model and add duplicates but with update keys for the in memory maps to be by versioned launch plan rather than NamedEntityIdentifier. +Non-goal, but future proposal: If we wanted to support concurrency by launch plan versions, we'd introduce `LaunchPlanVersion` to the execution model and add duplicates but with update keys for the in memory maps to be by versioned launch plan rather than NamedEntityIdentifier. We could update usage like so