
RFC: Add execution concurrency #5659

Open · wants to merge 14 commits into master
Conversation

katrogan
Contributor

@katrogan katrogan commented Aug 13, 2024

Related Issues

#5125
#420
#267

Docs link

Signed-off-by: Katrina Rogan <[email protected]>

codecov bot commented Aug 13, 2024

Codecov Report

All modified and coverable lines are covered by tests ✅

Project coverage is 37.06%. Comparing base (d797f08) to head (c2f9fe3).
Report is 232 commits behind head on master.

Additional details and impacted files
@@             Coverage Diff             @@
##           master    #5659       +/-   ##
===========================================
+ Coverage    9.74%   37.06%   +27.31%     
===========================================
  Files         214     1318     +1104     
  Lines       39190   132644    +93454     
===========================================
+ Hits         3820    49164    +45344     
- Misses      35031    79230    +44199     
- Partials      339     4250     +3911     
| Flag | Coverage | Δ |
| --- | --- | --- |
| unittests-datacatalog | 51.58% <ø> | +0.21% ⬆️ |
| unittests-flyteadmin | 54.33% <ø> | ? |
| unittests-flytecopilot | 30.99% <ø> | +18.81% ⬆️ |
| unittests-flytectl | 62.29% <ø> | ? |
| unittests-flyteidl | 7.23% <ø> | +0.15% ⬆️ |
| unittests-flyteplugins | 53.85% <ø> | ? |
| unittests-flytepropeller | 42.70% <ø> | ? |
| unittests-flytestdlib | 55.29% <ø> | ? |

Flags with carried forward coverage won't be shown.

- created_at

#### Open Questions
- Should we always attempt to schedule pending executions in ascending order of creation time?
Member

Maybe make it configurable? FIFO, FILO

Contributor Author

yeah I wasn't sure! any suggestions here? we could introduce an enum and choose fifo to begin with and expand support
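As a sketch of what that enum-first approach might look like (purely illustrative, not the final IDL; `SchedulingOrder` and `PendingExecution` are assumed names):

```python
from dataclasses import dataclass
from datetime import datetime
from enum import Enum


class SchedulingOrder(Enum):
    """Hypothetical enum for pending-execution scheduling order."""
    FIFO = 0  # oldest created_at first (proposed default)
    FILO = 1  # newest created_at first


@dataclass
class PendingExecution:
    name: str
    created_at: datetime


def order_pending(pending, order=SchedulingOrder.FIFO):
    """Sort pending executions by creation time according to the chosen order."""
    return sorted(pending, key=lambda e: e.created_at,
                  reverse=(order is SchedulingOrder.FILO))
```

Starting with only `FIFO` keeps the surface area small while leaving room to add members later without breaking callers.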

@granthamtaylor granthamtaylor Aug 19, 2024

I have mixed thoughts on making the queue's order of execution configurable.

If we support a limited number of parallel executions (more than 1), the order of these executions would naturally start as FIFO up until that limit is reached.

Providing an option to begin executing FILO after that limit is reached feels confusing to me.

However, that brings a different question to mind: If multiple workflows are queued up, should we provide an option to enable loud notifications?

In other words, if backlogged executions have the possibility of impacting downstream operations, can we enable users to receive loud notifications, including the number of queued executions?

I can imagine a use case where: holiday shopping -> increased purchase volume -> increased data size -> multiple, consecutive execution delays -> cascading backlog of executions. In this scenario, the owners of the workflow may be out on leave and not be aware of the growing backlog.

Contributor Author

interesting, we have workflow notifications enabled for terminal states, but we've talked more about richer, customizable notifications and I think this slots neatly into that

I think for a v1, having the default behavior be FIFO with an extended description/explanation for the pending state may provide some visibility here to start off with

Contributor

Can we add this suggestion of having an enum listing the policies to the Implementation details section?

Contributor

Adding a customer's feedback here, where the desired behaviour is to actually replace (terminate) the current executions with subsequent executions. It sounds like too much for the initial scope, but I'm still interested in whether this would be possible to add later with the current approach?

Contributor Author
@katrogan katrogan Sep 4, 2024

@fiedlerNr9 added a section under Alternatives. I don't think this is precluded by this implementation, but it's not in scope for this proposal atm

@katrogan katrogan marked this pull request as ready for review August 19, 2024 07:54
Contributor
@eapolinario eapolinario left a comment

This is looking pretty good. I'd feel more comfortable if we fleshed out the implementation a bit more, but otherwise, I feel like we're on the same page.


```

### FlyteAdmin
Contributor

During last week's contributors meeting someone asked a question about having this concurrency control work across versions. Can we either have a discussion in this PR about it or list that use case as not being supported explicitly in the RFC?


I can say that something that works across versions would be really useful for us.

Member

For us too, because we very often use `pyflyte run`, which means we often don't have two executions of the same version.

This could be made configurable here:

```python
concurrency=Concurrency(
    max=1,  # defines how many executions with this launch plan can run in parallel
    policy=ConcurrencyPolicy.WAIT,  # defines the policy to apply when the max concurrency is reached
    level=ConcurrencyLevel.Version,  # or ConcurrencyLevel.LaunchPlan
)
```

Contributor Author

thanks @eapolinario @corleyma @fg91 for the feedback, I don't think this will be too much of a lift but added a proposal for different levels of precision here too

1. or fail the request when the concurrency policy is set to `ABORT`
1. Do not create the workflow CRD

Introduce an async reconciliation loop in FlyteAdmin to poll for all pending executions:
Contributor

Do we have prior art for this kind of reconciliation loop in flyteadmin?

Contributor Author

yes, the scheduler!
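The scheduler-style pattern referenced here can be sketched as a simple polling reconciliation loop (an illustrative sketch, not the actual flyteadmin code; `fetch_pending` and `process` stand in for the DB query and the execution-processing step):

```python
import time


def reconcile_pending(fetch_pending, process, poll_interval=1.0, max_cycles=None):
    """Scheduler-style reconciliation loop (sketch).

    Each cycle re-reads the source of truth (e.g. a query for executions in
    PENDING) and processes whatever is still pending, so a crash or a missed
    event is simply recovered on the next cycle.
    """
    cycles = 0
    while max_cycles is None or cycles < max_cycles:
        for execution in fetch_pending():
            process(execution)
        cycles += 1
        if max_cycles is None or cycles < max_cycles:
            time.sleep(poll_interval)
```

The key property is idempotence: because each cycle starts from the stored state rather than from in-flight events, a restarted controller converges to the same result.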

rfc/system/RFC-0000-execution-concurrency.md (outdated, resolved)

## 4 Metrics & Dashboards

*What are the main metrics we should be measuring? For example, when interacting with an external system, it might be the external system latency. When adding a new table, how fast would it fill up?*
Contributor

How is this feature going to be rolled out? Should we have an explicit list of metrics used to help the health of the feature? (e.g. total number of attempts of a given launchplan )

Contributor Author

Interesting question. I think scheduling attempts here are based on the polling interval, right? But it could be useful to understand time spent in PENDING.


## 5 Drawbacks

*Are there any reasons why we should not do this? Here we aim to evaluate risk and check ourselves.*
Contributor

Do we have any reservations about more load on the DB (even with indexes, etc)?

Contributor Author

good point, we already have a ton of indices on executions - there is definitely a tradeoff to adding a new one

@katrogan
Contributor Author

I'd feel more comfortable if we fleshed out the implementation a bit more, but otherwise, I feel like we're on the same page.

Sounds good, just wanted overall alignment before diving into the implementation. Will do that next and thank you already for all the feedback

@katrogan
Contributor Author

added some more implementation details, mind taking another look @eapolinario

}
```

Furthermore, we may want to introduce a max pending period to fail executions that have been in `PENDING` for too long.
Member

👍 I agree that this would be good.
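The max pending period discussed above could be sketched as a simple age check run by the reconciliation loop (illustrative only; the name `pending_too_long` and the one-hour cap are assumptions, and the real value would presumably be configurable):

```python
from datetime import datetime, timedelta, timezone

# Illustrative cap, not a value from the RFC.
MAX_PENDING_PERIOD = timedelta(hours=1)


def pending_too_long(created_at, now=None, max_pending=MAX_PENDING_PERIOD):
    """True if an execution has sat in PENDING longer than the allowed period."""
    now = now or datetime.now(timezone.utc)
    return (now - created_at) > max_pending
```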

## 8 Unresolved questions

- Should we always attempt to schedule pending executions in ascending order of creation time?
- Decision: We'll use FIFO scheduling by default but can extend scheduling behavior with an enum going forward.
Member

If this has been decided (I'm ok with it), could you please reformulate in the text above where this is still discussed as an open question? 🙏

Contributor Author

updated the discussion above!

Member

Can you please change the filename to include the PR number?

Contributor Author

done, thanks

@corleyma
Copy link

I'd add one thing, possibly out of scope for this RFC: it would be really nice to be able to define a "max execution concurrency" on the backend, either propeller-wide or per project/domain. Flyte would benefit from more controls that allow operators to protect quality of service and aren't dependent on workflow authors to set reasonable limits.

@katrogan
Contributor Author

hi @corleyma thanks for reviewing! re your comment on platform-max execution concurrency, that's really intriguing - would you want to start a separate discussion on that here: https://github.com/flyteorg/flyte/discussions so we don't lose track of the suggestion?

execution namespace quota is meant to help address quality of service and fairness in a multitenant system but it would be cool to flesh out other mechanisms for managing overall executions

@corleyma

corleyma commented Sep 4, 2024

execution namespace quota is meant to help address quality of service and fairness in a multitenant system but it would be cool to flesh out other mechanisms for managing overall executions

execution namespace quota can help protect against workloads that would otherwise utilize too many cluster resources, but it doesn't really help protect e.g. flyte propeller from too many concurrent executions.

I am happy to start a separate conversation though!

ConcurrencyPolicy policy = 2;
}

enum ConcurrencyPolicy {

we should have a replace option also?
To stop the previous execution and replace it with the current one. This is what the k8s CronJob does.

https://kubernetes.io/docs/concepts/workloads/controllers/cron-jobs/#concurrency-policy

Would be great if the behaviour can be made as close to this.
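For discussion, the rough correspondence between the proposed enum and the k8s CronJob `concurrencyPolicy` values might look like this (a sketch under my own assumptions; the `REPLACE` member and the mapping are suggestions, not part of the current RFC text):

```python
from enum import Enum


class ConcurrencyPolicy(Enum):
    UNSPECIFIED = 0
    WAIT = 1     # queue the new execution until a slot frees up
    ABORT = 2    # fail the new execution immediately
    REPLACE = 3  # terminate the running execution and start the new one


# Rough analogue to k8s CronJob spec.concurrencyPolicy (assumption for
# discussion). Note WAIT has no exact CronJob counterpart, and k8s `Forbid`
# skips the new run rather than failing it.
K8S_CRONJOB_ANALOGUE = {
    ConcurrencyPolicy.ABORT: "Forbid",
    ConcurrencyPolicy.REPLACE: "Replace",
}
```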

Contributor Author

great point, updated

@davidmirror-ops
Contributor

09/26/2024 Contributors sync notes: no updates

@@ -195,6 +198,7 @@ WHERE ( launch_plan_named_entity_id, created_at ) IN (SELECT launch_plan_named_
GROUP BY launch_plan_named_entity_id);
```

Note, in this proposal, registering a new version of the launch plan and setting it to active will determine the concurrency policy across all launch plan versions.
Contributor

❤️

@kumare3
Contributor

kumare3 commented Nov 22, 2024

I would not implement this with too many DB calls. I would make the decision simple and completely in memory, using a singleton like the scheduler. This is because:

  1. We can easily update the algorithm of how to order, and probably support backfilling, etc.
  2. We can create priorities in the future
  3. It alleviates DB pressure and makes it really scalable

1. or fail the request when the concurrency policy is set to `ABORT`
1. Do not create the workflow CRD

Introduce an async reconciliation loop in FlyteAdmin to poll for all pending executions:
Contributor

This has to be done very carefully. Let's use the controller pattern and simply run everything in memory on one machine. We have seen with the scheduler and propeller that this is stable and highly scalable.

Contributor Author

yes sorry for the lack of clarity, this is meant to follow the scheduler singleton pattern!

WHERE phase = 'PENDING'
GROUP BY launch_plan_id);
```
2. For each execution returned by the above query, `Add()` the pending execution to a [rate limiting workqueue](https://github.com/kubernetes/client-go/blob/master/util/workqueue/rate_limiting_queue.go#L27-L40) (as a suggestion)
Contributor

I would not actually do this, because it prevents making cross-execution generalizations like priority, etc. This is in fact a scheduler.
The important thing to understand is when this is a scheduler and when it is not.

```
2. For each execution returned by the above query, `Add()` the pending execution to a [rate limiting workqueue](https://github.com/kubernetes/client-go/blob/master/util/workqueue/rate_limiting_queue.go#L27-L40) (as a suggestion)
3. In a separate goroutine, fetch items from the workqueue and individually process each execution entry
1. Check the database to see if there are fewer than `MAX_CONCURRENCY` non-terminal executions matching the launch plan ID in the pending execution model
Contributor

I would do this once and keep a copy of running executions in memory. Then, as the plan changes, we can simply update the in-memory state, greatly reducing the DB load.

Contributor Author

how do we reconcile the terminated executions to unblock concurrency gates if we don't read from the db?

Note, in this proposal, registering a new version of the launch plan and setting it to active will determine the concurrency policy across all launch plan versions.

#### Prior Art
The flyteadmin native scheduler (https://github.com/flyteorg/flyte/tree/master/flyteadmin/scheduler) already implements a reconciliation loop to catch up on any missed schedules.
Contributor

Exactly, look at the prior art. The number of DB queries is minimal: actually one per cycle.

I would keep all running executions with concurrency_level set in memory, and all LPs with concurrency_level set in memory (only the concurrency policies).
We should periodically update these, and it's OK to be eventually consistent.
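The in-memory, eventually consistent view described above might be sketched like this (illustrative names and shapes only; `query_running` and `query_policies` stand in for the one-per-cycle DB queries):

```python
class ConcurrencyState:
    """Eventually-consistent in-memory view of concurrency state (sketch).

    A periodic refresh fully replaces both maps from one query each, so
    terminated executions and deactivated launch plans drop out naturally,
    which also answers how concurrency gates get unblocked without a
    per-decision DB read.
    """

    def __init__(self):
        self.running = {}    # launch plan id -> set of running execution ids
        self.policies = {}   # launch plan id -> concurrency policy

    def refresh(self, query_running, query_policies):
        # Replace rather than merge, so stale entries are removed.
        self.running = {lp: set(execs) for lp, execs in query_running().items()}
        self.policies = dict(query_policies())

    def has_capacity(self, lp_id, max_concurrency):
        return len(self.running.get(lp_id, set())) < max_concurrency
```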

@SebSIK

SebSIK commented Nov 27, 2024

It was suggested I add a use case to this PR -- slack thread

I have a cron scheduled workflow (wf) that should not execute if the wf is already executing (from a previous schedule).

Why? The wf executes a legacy app that checks if new files are available on S3. If so, then the app begins processing, which involves other resources that allow only single use. Processing time can vary from minutes to hours, depending on the data received. If another process is started, both will eventually fail due to resource contention.

Considerations:

  • It is impractical to change the legacy app.
  • It is desired to set cron to a rather small period. So several wf's can start while the original is running.
  • It is not desirable to have tasks stack up waiting for the first to finish (i.e. cache serialize)

@LemontreeN

Hello,
I would like to know if this feature is under development? We really need this feature!

UNSPECIFIED = 0;

// wait for previous executions to terminate before starting a new one
WAIT = 1;


For my understanding, this policy is on the scheduler side, and WAIT essentially means the scheduler does not fire a request to admin before the previous one has completed?

WAIT = 1;

// fail the CreateExecution request and do not permit the execution to start
ABORT = 2;


For ABORT, since this is part of the policy in the scheduler, how does it address the issue with workflows that do not go through the scheduler?
Such as #5125, where users can create executions without a schedule and still create a lot of them.

Contributor Author

this doesn't go through the native scheduler but rather is managed by a separate async routine that inspects all pending executions

@ZhouGongZiBBS ZhouGongZiBBS Jan 16, 2025

How does this help with the native scheduler? (In the case where the cron schedule is frequent, the previous run has not finished before the next run starts, and we want to ensure only 1 execution there.)

from my understanding, we can specify ABORT or WAIT:

  • If we specify ABORT for a LP schedule, the desired effect we want to achieve is period = max(schedule, run_time). In this case, some executions will be aborted, leaving "space" between executions (they are not connected head to tail): imagine a 5 min periodic schedule while each run takes ~6 mins. When the second run starts, it sees the previous run is still running, so it is ABORTED; flyte then waits for the third run to start, which takes a bit more time than desired, i.e. not exactly by the time the previous execution finishes.
  • If we specify WAIT for the schedule, the cron jobs will all be in PENDING state in the DB. In this case, we can ensure exactly 1 execution at a time; however, the DB will grow larger as time goes on (we are pushing executions into the DB instead of holding them back/not starting them).

(my first comment) Should we have something in the native/original scheduler so it knows when the previous execution finished before starting the next execution (so it waits a bit even when it is time for the next execution, with max=1)?

Contributor Author

I think separation of concerns may be helpful here to simplify the implementation, wdyt? The scheduler is only responsible for issuing create execution requests at a periodic interval; the concurrency controller will handle when to actually create execution objects for all pending executions.

Abort scenario: I would argue that if a workflow execution takes 6 minutes and the schedule interval is every 5 minutes, then this represents an incompatibility in the schedule; it's basically destined to keep terminating executions before they complete. Maybe a future investment would be a workflow triggering itself as a final step before completing, if you wanted to daisy-chain executions.

Wait scenario: I would also argue this is misconfigured and the schedule interval should be increased to allow the average-duration job to complete. In any case the execution load remains the same; we're just delaying the processing with the queue build-up like you mentioned.

@thomasjhuang

Hello, I would like to know if this feature is under development? We really need this feature!

We are in progress! Still cleaning up final parts of the RFC and I think implementation can be underway soon.

@wild-endeavor
Contributor

Met up with @ ZhouGongZi_yizluo and @thomasjhuang yesterday to discuss this RFC. We just went over the doc in detail to make sure we were all on the same page. We have a series of clarifications where we wanted more information, and questions we weren't able to reach a conclusion on, detailed below.

  • Reiterate we’re not going to do concurrency per version. Need a compelling user story here to justify this work.
  • We asked/discussed at the end of whether there was talk ever of having an Abort threshold - like the behavior should be to wait, until there’s a backup of N executions all in Pending state. At that point, the N + 1 execution should fail immediately. Definitely also not doing this unless there is a compelling user requirement… it adds a lot of complexity to the project unless we can really justify it.
  • In “Creating an execution” item 2 - two questions here: a) what’s the scenario in which the process tries to write QUEUED and the database already has QUEUED ? and b) by “conditionally mark” you just mean a sql statement that checks the current value first right? (compare-and-swap semantics).
  • In “Creating an execution” item 3 - from “and swallow any errors…” onwards we didn’t really understand. Could you clarify this point please?
    • one scenario we wanted clarity on: let’s say step 1 completes (“create the workflow CRD”). Before step 2 can run, the process crashes, the node running the concurrency manager goes away completely. When it comes up again, it’ll again query the Admin database and see that the execution is still Pending (let’s say propeller hasn’t picked up the crd yet). So we enter the execution create cycle again, only this time let’s say there’s a network partition, and it fails, the kube api is down. But in the background propeller now starts working on the CRD and updates admin to Running. At this point we should move to step 3 right, attempt to mark the execution as Failed?
      • related question due to my lack of understanding of the K8s workqueue library - can the same execution be in the workqueue twice? That is, in between steps 1 and 2, is there a chance that another goroutine comes up and tries to run step 1 (‘create the workflow CRD’) again?
  • In “Launch Plan informer” section, the last paragraph discusses the launch plan informer object running queries in the background to get updated launch plan concurrency information. The last sentence though is: “If an execution has terminated since the last time the query ran, it won’t be in the result set and we’ll want to update the in memory map to remove the execution.” What does executions terminating have anything to do with launch plan table update times?
  • In “Introduce the Concurrency Controller” - item 2. ii. - “update the map…“. By ‘update’ we mean replace right? Otherwise the Sets will just grow forever.
  • In “Introduce the Concurrency Controller” - item 4. i. - “Check a separate in-memory map…“. Shouldn’t this just be the launch plan informer? Or are there two of these informers?

Other points:

  • Was there some consideration given to the ‘active’ launch plan marker? One core use-case for this concurrency feature is to make sure only one scheduled workflow is running at a given time. Given that schedules key off of the active launch plan, would it make sense that the concurrency setting on the active launch plan be the one that’s used rather than the latest version by time?
  • We should have some sort of a CLI that can show the current status of pending executions. What this API looks like is to be determined by Thomas, Yizhou, et al. in the course of implementing this feature; they'll learn what is helpful to see/debug.

Contributor
@hamersaw hamersaw left a comment

Has there been previous conversation about the term "concurrency"? My concern is that it will be difficult to disambiguate from the parallelism / max-parallelism we can define at the LP level, which controls the number of tasks within a workflow that can run at the same time. In the cache_serialize proposal we had a similar conversation and decided to use a completely separate term, i.e. serialize. IMO something like SerializationStrategy aligns with the cache_serialize concept and disambiguates from any parallelism controls we have elsewhere.

```

### FlyteIDL
We propose adding a new IDL message to capture concurrency behavior at CreateExecutionTime and embedding it in the existing [Schedule](https://github.com/flyteorg/flyte/blob/master/flyteidl/protos/flyteidl/admin/schedule.proto) message
Contributor

Is there a specific reason to embed this in the Schedule proto? Does this mean it only applies to executions which are scheduled? So concurrency policy does not apply if a user manually starts a workflow?

Contributor Author

it was originally a suggestion from Ketan since it applies to scheduling policies

@katrogan
Contributor Author

katrogan commented Jan 21, 2025

@wild-endeavor, @ZhouGongZiBBS and @thomasjhuang thank you for the very detailed feedback, I've updated the RFC in response but wanted to address individual points in a reply as well

Reiterate we’re not going to do concurrency per version. Need a compelling user story here to justify this work.

Done, called this out explicitly as a non-goal, although I included a potential design approach in case we decide to pursue this in the future, to ensure this option isn't closed off to us.

We asked/discussed at the end of whether there was talk ever of having an Abort threshold - like the behavior should be to wait, until there’s a backup of N executions all in Pending state. At that point, the N + 1 execution should fail immediately. Definitely also not doing this unless there is a compelling user requirement… it adds a lot of complexity to the project unless we can really justify it.

Really interesting point, we could always extend the SchedulerPolicy with the abort threshold and have the concurrency controller manage the auto-aborts.

In “Creating an execution” item 2 - two questions here: a) what’s the scenario in which the process tries to write QUEUED and the database already has QUEUED ? and b) by “conditionally mark” you just mean a sql statement that checks the current value first right? (compare-and-swap semantics).

a) For the state machine I envisioned something that could recover at any failure point. So if for whatever reason the process stops after we write a QUEUED event and we reboot the concurrency controller, we shouldn't undo the execution progress we've already recorded.
b) yes exactly! clarified in the doc
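The compare-and-swap semantics in (b) can be illustrated with a single conditional UPDATE (a sketch using sqlite3 and an assumed `executions(id, phase)` table shape, not the actual flyteadmin schema):

```python
import sqlite3


def mark_queued(conn, exec_id):
    """Compare-and-swap: advance PENDING -> QUEUED only if still PENDING.

    The WHERE clause makes the check and the set one atomic statement; if
    another pass (or a crash-recovered controller) already advanced the
    phase, zero rows match and we never regress the state machine.
    """
    cur = conn.execute(
        "UPDATE executions SET phase = 'QUEUED' "
        "WHERE id = ? AND phase = 'PENDING'",
        (exec_id,),
    )
    return cur.rowcount == 1  # True only if this call performed the transition


# Demo setup with the assumed schema.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE executions (id TEXT PRIMARY KEY, phase TEXT)")
conn.execute("INSERT INTO executions VALUES ('exec-1', 'PENDING')")
```

The returned boolean tells the caller whether it won the transition, which is exactly the recovery-friendly behavior described in (a).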

In “Creating an execution” item 3 - from “and swallow any errors…” onwards we didn’t really understand. Could you clarify this point please?

  • one scenario we wanted clarity on: let’s say step 1 completes (“create the workflow CRD”). Before step 2 can run, the process crashes, the node running the concurrency manager goes away completely. When it comes up again, it’ll again query the Admin database and see that the execution is still Pending (let’s say propeller hasn’t picked up the crd yet). So we enter the execution create cycle again, only this time let’s say there’s a network partition, and it fails, the kube api is down. But in the background propeller now starts working on the CRD and updates admin to Running. At this point we should move to step 3 right, attempt to mark the execution as Failed?
  • related question due to my lack of understanding of the K8s workqueue library - can the same execution be in the workqueue twice? That is, in between steps 1 and 2, is there a chance that another goroutine comes up and tries to run step 1 (‘create the workflow CRD’) again?

Great points, updated this so we only overwrite for 'PENDING' or 'QUEUED' statuses if the execution progresses due to network partition.
Re: same execution, this will have an identical execution identifier, correct? We upstreamed #6025, which gracefully handles re-enqueuing the same item.

In “Launch Plan informer” section, the last paragraph discusses the launch plan informer object running queries in the background to get updated launch plan concurrency information. The last sentence though is: “If an execution has terminated since the last time the query ran, it won’t be in the result set and we’ll want to update the in memory map to remove the execution.” What does executions terminating have anything to do with launch plan table update times?

I have no idea, I'm sorry. I updated this section.

In “Introduce the Concurrency Controller” - item 2. ii. - “update the map…“. By ‘update’ we mean replace right? Otherwise the Sets will just grow forever.

Yes, just overwriting the map entry. Clarified in the doc.

In “Introduce the Concurrency Controller” - item 4. i. - “Check a separate in-memory map…“. Shouldn’t this just be the launch plan informer? Or are there two of these informers?

Yes clarified, thank you!

Was there some consideration given to the ‘active’ launch plan marker? One core use-case for this concurrency feature is to make sure only one scheduled workflow is running at a given time. Given that schedules key off of the active launch plan, would it make sense that the concurrency setting on the active launch plan be the one that’s used rather than the latest version by time?

Yes this is exactly what we should use to get the active Scheduler Policy! I clarified the proposal but let me know if this is still ambiguous.

We should have some sort of a CLI that can show the current status of pending executions. What this API looks like is to be determined by Thomas, Yizhou et al. in the course of implementing this feature; they'll learn what is helpful to see/debug.

This sounds like an excellent plan!
