Skip to content

Commit

Permalink
Merge branch 'pr654-multibackend-fix'
Browse files Browse the repository at this point in the history
  • Loading branch information
soxofaan committed Nov 5, 2024
2 parents a2d43d2 + e419e14 commit 1fef132
Show file tree
Hide file tree
Showing 3 changed files with 9 additions and 5 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Fixed

- `MultiBackendJobManager`: Fix issue where the same job could be started on multiple backends ([#654](https://github.com/Open-EO/openeo-python-client/pull/654))


## [0.34.0] - 2024-10-31

Expand Down
4 changes: 3 additions & 1 deletion openeo/extra/job_management.py
Original file line number Diff line number Diff line change
Expand Up @@ -499,16 +499,18 @@ def _job_update_loop(
stats["job_db get_by_status"] += 1
per_backend = running.groupby("backend_name").size().to_dict()
_log.info(f"Running per backend: {per_backend}")
+ total_added = 0
for backend_name in self.backends:
backend_load = per_backend.get(backend_name, 0)
if backend_load < self.backends[backend_name].parallel_jobs:
to_add = self.backends[backend_name].parallel_jobs - backend_load
- for i in not_started.index[0:to_add]:
+ for i in not_started.index[total_added : total_added + to_add]:
self._launch_job(start_job, df=not_started, i=i, backend_name=backend_name, stats=stats)
stats["job launch"] += 1

job_db.persist(not_started.loc[i : i + 1])
stats["job_db persist"] += 1
+ total_added += 1

def _launch_job(self, start_job, df, i, backend_name, stats: Optional[dict] = None):
"""Helper method for launching jobs
Expand Down
8 changes: 4 additions & 4 deletions tests/extra/test_job_management.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ def start_job(row, connection, **kwargs):
assert run_stats == dirty_equals.IsPartialDict(
{
"sleep": dirty_equals.IsInt(gt=10),
- "start_job call": 7, # TODO?
+ "start_job call": 5,
"job started running": 5,
"job finished": 5,
"job_db persist": dirty_equals.IsInt(gt=5),
Expand Down Expand Up @@ -162,7 +162,7 @@ def start_job(row, connection, **kwargs):
assert run_stats == dirty_equals.IsPartialDict(
{
"sleep": dirty_equals.IsInt(gt=10),
- "start_job call": 7, # TODO?
+ "start_job call": 5,
"job started running": 5,
"job finished": 5,
"job_db persist": dirty_equals.IsInt(gt=5),
Expand Down Expand Up @@ -198,7 +198,7 @@ def start_job(row, connection, **kwargs):
run_stats = manager.run_jobs(job_db=job_db, start_job=start_job)
assert run_stats == dirty_equals.IsPartialDict(
{
- "start_job call": 7, # TODO?
+ "start_job call": 5,
"job finished": 5,
"job_db persist": dirty_equals.IsInt(gt=5),
}
Expand Down Expand Up @@ -233,7 +233,7 @@ def start_job(row, connection, **kwargs):
run_stats = manager.run_jobs(job_db=job_db, start_job=start_job)
assert run_stats == dirty_equals.IsPartialDict(
{
- "start_job call": 7, # TODO?
+ "start_job call": 5,
"job finished": 5,
"job_db persist": dirty_equals.IsInt(gt=5),
}
Expand Down

0 comments on commit 1fef132

Please sign in to comment.