diff --git a/CHANGELOG.md b/CHANGELOG.md index 2f28fc15e..394d43c63 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -15,6 +15,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Fixed +- `MultiBackendJobManager`: Fix issue with duplicate job starting across multiple backends ([#654](https://github.com/Open-EO/openeo-python-client/pull/654)) + ## [0.34.0] - 2024-10-31 diff --git a/openeo/extra/job_management.py b/openeo/extra/job_management.py index 65a4c0309..18ac8fe80 100644 --- a/openeo/extra/job_management.py +++ b/openeo/extra/job_management.py @@ -499,16 +499,18 @@ def _job_update_loop( stats["job_db get_by_status"] += 1 per_backend = running.groupby("backend_name").size().to_dict() _log.info(f"Running per backend: {per_backend}") + total_added = 0 for backend_name in self.backends: backend_load = per_backend.get(backend_name, 0) if backend_load < self.backends[backend_name].parallel_jobs: to_add = self.backends[backend_name].parallel_jobs - backend_load - for i in not_started.index[0:to_add]: + for i in not_started.index[total_added : total_added + to_add]: self._launch_job(start_job, df=not_started, i=i, backend_name=backend_name, stats=stats) stats["job launch"] += 1 job_db.persist(not_started.loc[i : i + 1]) stats["job_db persist"] += 1 + total_added += 1 def _launch_job(self, start_job, df, i, backend_name, stats: Optional[dict] = None): """Helper method for launching jobs diff --git a/tests/extra/test_job_management.py b/tests/extra/test_job_management.py index 31412b63e..49d131d5f 100644 --- a/tests/extra/test_job_management.py +++ b/tests/extra/test_job_management.py @@ -118,7 +118,7 @@ def start_job(row, connection, **kwargs): assert run_stats == dirty_equals.IsPartialDict( { "sleep": dirty_equals.IsInt(gt=10), - "start_job call": 7, # TODO? + "start_job call": 5, "job started running": 5, "job finished": 5, "job_db persist": dirty_equals.IsInt(gt=5), @@ -162,7 +162,7 @@ def start_job(row, connection, **kwargs): assert run_stats == dirty_equals.IsPartialDict( { "sleep": dirty_equals.IsInt(gt=10), - "start_job call": 7, # TODO? + "start_job call": 5, "job started running": 5, "job finished": 5, "job_db persist": dirty_equals.IsInt(gt=5), @@ -198,7 +198,7 @@ def start_job(row, connection, **kwargs): run_stats = manager.run_jobs(job_db=job_db, start_job=start_job) assert run_stats == dirty_equals.IsPartialDict( { - "start_job call": 7, # TODO? + "start_job call": 5, "job finished": 5, "job_db persist": dirty_equals.IsInt(gt=5), } @@ -233,7 +233,7 @@ def start_job(row, connection, **kwargs): run_stats = manager.run_jobs(job_db=job_db, start_job=start_job) assert run_stats == dirty_equals.IsPartialDict( { - "start_job call": 7, # TODO? + "start_job call": 5, "job finished": 5, "job_db persist": dirty_equals.IsInt(gt=5), }