RFC: Add execution concurrency #5659

Open · wants to merge 14 commits into master
rfc/system/RFC-5659-execution-concurrency.md — 6 changes: 5 additions & 1 deletion

@@ -54,6 +54,9 @@ enum ConcurrencyPolicy {
// fail the CreateExecution request and do not permit the execution to start
ABORT = 2;

For ABORT, since this is part of the policy in the scheduler, how does it address the issue with workflows that do not go through the scheduler?
For example #5125, where users can create workflows without a schedule and still create a lot of them.

Contributor Author:

this doesn't go through the native scheduler but rather is managed by a separate async routine that inspects all pending executions
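
For context on what "a separate async routine that inspects all pending executions" could look like, here is a minimal sketch of such a polling loop. It is an illustration only: the `Store` interface, its method names, and the `limits` map are assumptions, not flyteadmin's actual types.

```go
package concurrency

import (
	"context"
	"log"
	"time"
)

// PendingExecution stands in for the persisted execution record (fields assumed).
type PendingExecution struct {
	ID           string
	LaunchPlanID string
	CreatedAt    time.Time
}

// Store abstracts the repository layer; method names are assumptions.
type Store interface {
	// ListPendingExecutions returns executions in the PENDING phase, oldest first.
	ListPendingExecutions(ctx context.Context) ([]PendingExecution, error)
	// CountActiveExecutions returns the number of non-terminal executions per launch plan.
	CountActiveExecutions(ctx context.Context) (map[string]int, error)
	// StartExecution moves a PENDING execution into its normal launch path.
	StartExecution(ctx context.Context, executionID string) error
}

// RunConcurrencyController is the separate async routine: each tick it inspects all
// pending executions and starts those whose launch plan is still under its limit.
// limits maps launch plan ID to its configured maximum concurrency.
func RunConcurrencyController(ctx context.Context, store Store, limits map[string]int, interval time.Duration) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			pending, err := store.ListPendingExecutions(ctx)
			if err != nil {
				log.Printf("listing pending executions: %v", err)
				continue
			}
			active, err := store.CountActiveExecutions(ctx)
			if err != nil {
				log.Printf("counting active executions: %v", err)
				continue
			}
			for _, e := range pending {
				if active[e.LaunchPlanID] >= limits[e.LaunchPlanID] {
					continue // at the limit; the configured policy decides what happens next
				}
				if err := store.StartExecution(ctx, e.ID); err != nil {
					log.Printf("starting execution %s: %v", e.ID, err)
					continue
				}
				active[e.LaunchPlanID]++
			}
		}
	}
}
```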

@ZhouGongZiBBS (Jan 16, 2025):

How does this help with the native scheduler, in the case where the cron schedule is frequent, the previous run has not finished before the next run starts, and we want to ensure only one execution there?

From my understanding, we can specify ABORT or WAIT:

- If we specify ABORT for a launch plan schedule, the desired effect is period = max(schedule interval, run time). In this case, though, some executions will be aborted, leaving "space" between executions (they are not connected head to tail). Imagine a 5-minute periodic schedule where each run takes ~6 minutes: when the second run starts, it sees the previous run still running and is ABORTED, so Flyte waits for the third run to start, which is a bit later than desired, i.e. not exactly when the previous execution finishes.
- If we specify WAIT for the schedule, the cron runs will all sit in the PENDING state in the DB. In this case we can guarantee exactly one execution at a time, but the DB grows over time (we are pushing executions into the DB instead of holding them back / not starting them).

(my first comment) Should we have something in the native/original scheduler so it knows when the previous execution finished before starting the next one (i.e. with max=1, it waits a bit even when it is time for the next execution)?

Contributor Author:

I think separation of concerns may be helpful here to simplify the implementation, wdyt? The schedule is only responsible for issuing create-execution requests at a periodic interval; the concurrency controller handles when to actually create execution objects for all pending executions.

Abort scenario: I would argue that if a workflow execution takes 6 minutes and the schedule interval is every 5 minutes, that represents an incompatibility in the schedule; it's basically destined to keep terminating executions before they complete. A possible future investment would be letting a workflow trigger itself as a final step before completing, if you wanted to daisy-chain executions.

Wait scenario: I would also argue this is misconfigured and the schedule interval should be increased to allow the average-duration job to complete. In any case the execution load remains the same; we're just delaying the processing, with the queue build-up you mentioned.
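
To make the per-policy behavior concrete, below is a hedged sketch of how the controller might handle one pending execution whose launch plan is already at its limit. The `Controller` interface and helper names are hypothetical; the constants assume WAIT = 1, alongside the ABORT = 2 and REPLACE = 3 values shown in this diff.

```go
package concurrencypolicy

import (
	"context"
	"fmt"
)

// ConcurrencyPolicy mirrors the proposed enum (UNSPECIFIED omitted; WAIT = 1 is assumed).
type ConcurrencyPolicy int

const (
	WAIT    ConcurrencyPolicy = 1
	ABORT   ConcurrencyPolicy = 2
	REPLACE ConcurrencyPolicy = 3
)

// Controller is a hypothetical facade over the operations each policy needs.
type Controller interface {
	FailCreateRequest(ctx context.Context, executionID, reason string) error // reject the pending execution
	TerminateOldestRunning(ctx context.Context, launchPlanID string) error   // free up a slot
	Start(ctx context.Context, executionID string) error                     // move PENDING into the launch path
}

// applyPolicy decides what happens to a pending execution when its launch plan
// has already reached the configured concurrency limit.
func applyPolicy(ctx context.Context, c Controller, policy ConcurrencyPolicy, executionID, launchPlanID string) error {
	switch policy {
	case ABORT:
		// Fail the CreateExecution request; the execution never starts.
		return c.FailCreateRequest(ctx, executionID, "concurrency limit reached")
	case WAIT:
		// Leave the execution PENDING; a later cycle starts it once a slot frees up.
		return nil
	case REPLACE:
		// Terminate the oldest running execution, then proceed with the new one immediately.
		if err := c.TerminateOldestRunning(ctx, launchPlanID); err != nil {
			return err
		}
		return c.Start(ctx, executionID)
	default:
		return fmt.Errorf("unhandled concurrency policy %d", policy)
	}
}
```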

// terminate the oldest execution when the concurrency limit is reached and immediately begin proceeding with the new execution
REPLACE = 3;
}
message LaunchPlanSpec {
@@ -113,7 +116,7 @@ If we wanted further parallelization here, we could introduce a worker pool rather…

We should consider adding an index to the executions table to include
- launch_plan_id
- phase
- phase==PENDING only (in order to safeguard for well-populated flyteadmin instances with lots of completed, historical executions)
- created_at
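
A possible concrete shape for this index, assuming a Postgres-backed flyteadmin: a partial index restricted to PENDING rows. The index name, column names, and phase literal are assumptions for illustration, not part of the proposal itself.

```go
package migrations

import "database/sql"

// createPendingExecutionsIndex sketches the proposed index. Restricting it to
// phase = 'PENDING' keeps completed, historical executions out of the index.
func createPendingExecutionsIndex(db *sql.DB) error {
	_, err := db.Exec(`
		CREATE INDEX IF NOT EXISTS idx_executions_pending_concurrency
		ON executions (launch_plan_id, created_at)
		WHERE phase = 'PENDING'`)
	return err
}
```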

##### Concurrency across launch plan versions
@@ -195,6 +198,7 @@ WHERE ( launch_plan_named_entity_id, created_at ) IN (SELECT launch_plan_named_…
GROUP BY launch_plan_named_entity_id);
```

Note: in this proposal, registering a new version of the launch plan and setting it to active determines the concurrency policy across all launch plan versions.
Contributor:

❤️


#### Prior Art
The flyteadmin native scheduler (https://github.com/flyteorg/flyte/tree/master/flyteadmin/scheduler) already implements a reconciliation loop to catch up on any missed schedules.
Contributor:

Exactly, look at the prior art. The number of DB queries is minimal, actually one per cycle.

I would keep all running executions with concurrency_level set in memory, and all launch plans with concurrency_level set in memory (only the concurrency policies). We should periodically update these, and it's OK to be eventually consistent.
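
A rough sketch of the eventually consistent in-memory view suggested here, refreshed on a fixed cycle. The snapshot fields, the `Loader` interface, and the refresh behavior are assumptions for illustration only.

```go
package concurrencycache

import (
	"context"
	"sync"
	"time"
)

// Snapshot is the in-memory view rebuilt once per cycle: only launch plans with a
// concurrency policy set, plus counts of their currently running executions.
type Snapshot struct {
	PoliciesByLaunchPlan map[string]int // launch plan ID -> concurrency policy/limit (shape assumed)
	RunningByLaunchPlan  map[string]int // launch plan ID -> non-terminal execution count
}

// Loader abstracts the small number of DB queries issued each refresh cycle.
type Loader interface {
	LoadSnapshot(ctx context.Context) (Snapshot, error)
}

// Cache serves slightly stale reads while a background loop refreshes the snapshot.
type Cache struct {
	mu       sync.RWMutex
	snapshot Snapshot
}

// Current returns the latest snapshot; callers accept eventual consistency.
func (c *Cache) Current() Snapshot {
	c.mu.RLock()
	defer c.mu.RUnlock()
	return c.snapshot
}

// Refresh periodically replaces the snapshot; on error the previous one keeps serving.
func (c *Cache) Refresh(ctx context.Context, loader Loader, interval time.Duration) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			snap, err := loader.LoadSnapshot(ctx)
			if err != nil {
				continue
			}
			c.mu.Lock()
			c.snapshot = snap
			c.mu.Unlock()
		}
	}
}
```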
