v4: Fix broken job lease extension #1030

Open · wants to merge 1 commit into agronholm:master from JacobHayes:push-ykyxnnxlrknz

Conversation

@JacobHayes (Contributor) commented Mar 5, 2025

Changes

The `extend_job_leases` function was checking for `RunState.started` in a `while` loop, but the coroutine is first executed by the (already running) job executors while the scheduler is still in `RunState.starting`. This caused the loop condition to evaluate to `False` immediately and the function to return. This commit fixes that by checking for both `RunState.starting` and `RunState.started` in `extend_job_leases`, similar to the `cleanup` task.
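
To make the timing concrete, here's a minimal, self-contained toy that mimics the startup sequence (a `DemoScheduler` stand-in, not the real `AsyncScheduler`; all names here are illustrative only):

```python
import asyncio
from enum import Enum, auto


class RunState(Enum):
    starting = auto()
    started = auto()
    stopped = auto()


class DemoScheduler:
    """Toy model of the loop-condition bug; not the real AsyncScheduler."""

    def __init__(self) -> None:
        self._state = RunState.starting

    async def extend_job_leases(self) -> None:
        # The pre-fix condition was effectively `while self._state is RunState.started:`,
        # which is False on the very first check because this coroutine starts while the
        # scheduler is still RunState.starting -- so the loop (and lease extension) ends
        # immediately.
        while self._state in (RunState.starting, RunState.started):
            # The real task would extend the leases of acquired jobs here.
            await asyncio.sleep(0.1)
        print("Ending extend_job_leases")

    async def run(self) -> None:
        lease_task = asyncio.create_task(self.extend_job_leases())
        await asyncio.sleep(0)            # the lease task starts while still "starting"
        self._state = RunState.started    # startup finishes afterwards
        await asyncio.sleep(0.3)          # jobs run; leases keep getting extended
        self._state = RunState.stopped    # shutdown: now the loop should exit
        await lease_task


asyncio.run(DemoScheduler().run())
```

Changing the toy's `while` condition back to only check `RunState.started` makes it print "Ending extend_job_leases" immediately at startup, mirroring the premature exit described above.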

With the `MemoryDataStore`, if a job ran longer than the lease *and* past the next scheduled execution, then the exact same job would be executed again (along with a job for the next schedule). The first execution cleans up when it finally finishes, but then the second execution fails in cleanup.

The issue primarily affects the `MemoryDataStore` because its `acquire_jobs` method seems to acquire even expired jobs (which the scheduler re-executes) more aggressively than the other data stores, which have different logic for discarding jobs / handling task job slots.

This commit partially fixes #803, #834, #952, #967, and maybe others. I suspect there are more bugs lurking that I haven't pinned down yet, likely related to the acquiring, releasing, and perhaps cleanup logic.


Do you want me to add the repro from #952 (comment) as a test, ensuring it doesn't fail after, say, 10 seconds or so? It's admittedly a bit contrived (and hard to prove a negative), but I think it represents the same issue I've been seeing over longer timespans.
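
For concreteness, the shape I'd aim for is roughly this (sketch only, written from memory of the v4 alpha API; the `lease_duration` kwarg, the anyio pytest marker, the timings, and the assertion threshold are placeholders rather than the actual repro from #952):

```python
from datetime import timedelta

import anyio
import pytest

from apscheduler import AsyncScheduler
from apscheduler.datastores.memory import MemoryDataStore
from apscheduler.triggers.interval import IntervalTrigger

run_count = 0


async def slow_job() -> None:
    global run_count
    run_count += 1
    await anyio.sleep(3)  # outlives both the schedule interval and the shortened lease


@pytest.mark.anyio
async def test_expired_lease_does_not_duplicate_job() -> None:
    # `lease_duration` is an assumed knob used here to shorten the lease for the test;
    # replace it with whatever the repro in #952 actually tweaks.
    data_store = MemoryDataStore(lease_duration=timedelta(seconds=1))
    async with AsyncScheduler(data_store=data_store) as scheduler:
        await scheduler.add_schedule(slow_job, IntervalTrigger(seconds=1))
        await scheduler.start_in_background()
        await anyio.sleep(10)

    # Without the fix, expired leases let the exact same job be acquired and run again,
    # so the body executes noticeably more often than the schedule alone would produce
    # within 10 seconds.
    assert run_count <= 10
```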

Checklist

If this is a user-facing code change, like a bugfix or a new feature, please ensure that
you've fulfilled the following conditions (where applicable):

  • You've added tests (in tests/) which would fail without your patch
  • You've updated the documentation (in docs/, in case of behavior changes or new
    features)
  • You've added a new changelog entry (in docs/versionhistory.rst).

If this is a trivial change, like a typo fix or a code reformatting, then you can ignore
these instructions.

Updating the changelog

If there are no entries after the last release, use **UNRELEASED** as the version.
If, say, your patch fixes issue #999, the entry should look like this:

* Fix big bad boo-boo in the async scheduler (`#999 <https://github.com/agronholm/apscheduler/issues/999>`_; PR by @yourgithubaccount)

If there's no issue linked, just link to your pull request instead by updating the
changelog after you've created the PR.

@coveralls commented Mar 5, 2025

Coverage Status: 92.018% (+0.03%) from 91.991% when pulling 89f2d98 on JacobHayes:push-ykyxnnxlrknz into 89151a2 on agronholm:master.

@agronholm (Owner) commented

I'll have to take my time evaluating this, first refreshing my memory about the v4 code base and how the lease extensions work. I'll try to do it this week.

@JacobHayes (Contributor, Author) commented Mar 6, 2025

Thanks, I'm happy to give more context on my digging or debugging if it'd be helpful! I'm not sure how (or if) the lease extension ever worked; even in the initial commit (e91c8de), the `extend_job_leases` task was scheduled (and likely executed by the running executors) before the `await self.get_next_event(SchedulerStarted)`.

One quick way to check this is to add a `print("Ending extend_job_leases")` after the `while` loop in `extend_job_leases` and run with and without this PR. Without the PR, the loop exits immediately (and nothing else ever schedules the task again).

@b4stien (Contributor) commented Mar 6, 2025

FYI I'm testing this in prod on a small/non-critical app which was triggering the bug ~10 times a day. I'll report in a couple of days with the output.

@agronholm (Owner) commented

> FYI I'm testing this in prod on a small/non-critical app which was triggering the bug ~10 times a day. I'll report in a couple of days with the output.

You're quite brave to run an alpha version in production!

@HK-Mattew (Contributor) commented

> You're quite brave to run an alpha version in production!

There are some crazy people following you hahah.

Well, I'm also using it in production and honestly, for me, there are no more bugs. I've had APScheduler V4 processes running for over 3 months and not a single error has occurred (and it's working very well).

I believe only a few things are missing before this lib can come out as a stable version.

@JacobHayes (Contributor, Author) commented

Yeah, I had it running in a prototype service for a month with no issues, but as I added tasks it would crash maybe once every other week. Earlier this week, though, I added some tasks with 1-minute frequencies and it was crashing about once a day. I think a lot of the issues are just small boundary or race conditions, so it really depends on chance (but increasing scheduling/cleanup/etc. frequencies increases your rolls of the 🎲 haha).

Otherwise, the interface of v4 is quite nice and the type hints are appreciated too. :)

@agronholm (Owner) commented

OK, so I've taken an initial look and refreshed my memory about this a bit. While I think you've correctly recognized that the loop will exit prematurely, the underlying issues with this functionality run a bit deeper. What needs to happen when the scheduler is starting is that any jobs left in the running state by this scheduler should be reaped and marked as abandoned. This is done in a separate maintenance loop (`AsyncScheduler._cleanup_loop`). What's missing, I think, is that right when the scheduler starts, it should clean up any such leftover jobs for that particular scheduler, even if the lease duration hasn't expired yet. This step should be done before the first run of `extend_acquired_job_leases()`. I will work on a proper fix for this.
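
In rough, runnable pseudocode, the ordering I mean would be something like this (entirely made-up names, just to show the sequencing, not the actual implementation):

```python
import asyncio


async def reap_abandoned_jobs(scheduler_id: str) -> None:
    # Stand-in: mark any jobs left in the "running" state by this particular
    # scheduler as abandoned, even if their leases haven't expired yet.
    print(f"{scheduler_id}: reaping jobs left over from a previous run")


async def extend_acquired_job_leases() -> None:
    # Stand-in: periodic lease extension for jobs acquired after startup.
    print("extending leases for currently acquired jobs")


async def start(scheduler_id: str) -> None:
    # The reaping step runs before the first lease extension, so jobs abandoned
    # by a crash are dealt with right away instead of waiting for their old
    # leases to expire.
    await reap_abandoned_jobs(scheduler_id)
    await extend_acquired_job_leases()


asyncio.run(start("scheduler-1"))
```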

@JacobHayes (Contributor, Author) commented Mar 9, 2025

To confirm I'm following, you're saying we need to avoid extending the leases of jobs leftover from other (presumably dead) schedulers, correct? Or, are you referring to the same scheduler object that is stopped and then later restarted? (I'm admittedly a bit fuzzy on the multiple scheduler scenario since I've mostly dealt with the memory data store with 1 start of the scheduler per process.)

Thinking out loud a bit, since `extend_acquired_job_leases` only looks at the jobs in the local object's `self._running_jobs`:

  • it wouldn't affect jobs from previous scheduler objects (leaving them for cleanup, though not until after the normal lease ends)
  • could we just clear `self._running_jobs` when the scheduler starts up? On the first start, it'll obviously be empty, and on the second start, we'd clear it (leaving cleanup to handle them, as above) so we only extend leases for newly started jobs.

Separately, another thought I had: instead of a single general "extend leases" coroutine, we could create one as a sibling of each job coroutine in the same `TaskGroup`. It's probably more coroutines overall if you have concurrent jobs, but no extend steps between jobs, since each extender would have the same lifetime as its job due to normal cancellation semantics, and it would allow tuning the lease duration per task (if that's desirable).
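
A rough sketch of what I mean (made-up names and timings, not actual APScheduler code; the real version would call the data store instead of printing):

```python
import anyio


async def extend_lease_periodically(job_id: str, interval: float) -> None:
    # Sibling coroutine that keeps extending a single job's lease until cancelled.
    while True:
        await anyio.sleep(interval)
        print(f"extending lease for {job_id}")  # stand-in for a data store call


async def run_job(job_id: str, duration: float) -> None:
    print(f"running {job_id}")
    await anyio.sleep(duration)                 # stand-in for the actual job


async def run_job_with_lease_extension(job_id: str) -> None:
    async with anyio.create_task_group() as tg:
        tg.start_soon(extend_lease_periodically, job_id, 0.5)
        await run_job(job_id, 2.0)
        tg.cancel_scope.cancel()  # job finished; cancel the lease extender with it


anyio.run(run_job_with_lease_extension, "job-1")
```

Since the extender is a sibling task in the same task group, it gets torn down the moment the job finishes or raises, so leases stop being extended exactly when the job stops actually running.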

@agronholm (Owner) commented

> To confirm I'm following, you're saying we need to avoid extending the leases of jobs leftover from other (presumably dead) schedulers, correct? Or, are you referring to the same scheduler object that is stopped and then later restarted? (I'm admittedly a bit fuzzy on the multiple scheduler scenario since I've mostly dealt with the memory data store with 1 start of the scheduler per process.)
>
> Thinking out loud a bit, since `extend_acquired_job_leases` only looks at the jobs in the local object's `self._running_jobs`:
>
> * it wouldn't affect jobs from previous scheduler objects (leaving them for cleanup, though not until after the normal lease ends)
> * could we just clear `self._running_jobs` when the scheduler starts up? On the first start, it'll obviously be empty, and on the second start, we'd clear it (leaving cleanup to handle them, as above) so we only extend leases for newly started jobs.

I think I got confused myself; I forgot how `extend_acquired_job_leases()` worked. If the app crashed, then `self._running_jobs` would indeed be empty. But we still need some kind of operation to deal with any schedules and jobs that were left in an acquired state by the scheduler before it crashed. Perhaps this should be handled in the `start()` method? I'd need to change the signature to pass in the scheduler ID for that to work.

> Separately, another thought I had: instead of a single general "extend leases" coroutine, we could create one as a sibling of each job coroutine in the same `TaskGroup`. It's probably more coroutines overall if you have concurrent jobs, but no extend steps between jobs, since each extender would have the same lifetime as its job due to normal cancellation semantics, and it would allow tuning the lease duration per task (if that's desirable).

I read this paragraph twice and was still left confused. What is it that you're actually proposing?

Successfully merging this pull request may close these issues: APScheduler 4.0.0 Bug