Skip to content

Commit

Permalink
queing
Browse files Browse the repository at this point in the history
  • Loading branch information
HansVRP committed Feb 3, 2025
1 parent 617dfd9 commit 62fad7d
Showing 1 changed file with 32 additions and 12 deletions.
44 changes: 32 additions & 12 deletions openeo/extra/job_management/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
import warnings
from pathlib import Path
from threading import Thread, Semaphore, Lock
from queue import Queue


from typing import (
Any,
Expand Down Expand Up @@ -558,29 +560,47 @@ def _get_jobs_to_launch(self, not_started, per_backend):

return jobs_to_add

def _run_job_threads(self, jobs_to_add, start_job, not_started, stats, job_db):
"""Manages threading for job launching."""
semaphore = Semaphore(self._max_concurrent_job_launch)
threads = []
def _run_job_threads(
self,
jobs_to_add: list,
start_job: Callable[[], BatchJob],
not_started: pd.DataFrame,
stats: Optional[dict],
job_db: JobDatabaseInterface
) -> None:
"""Manages threading for job launching using a queue and thread pool."""
job_queue = Queue()

# Fill the queue with jobs to launch
for i, backend_name in jobs_to_add:
job_queue.put((i, backend_name))

def job_worker(i, backend_name):
with semaphore:
def job_worker():
while not job_queue.empty():
i, backend_name = job_queue.get()
try:
# Process job
self._launch_job(start_job, not_started, i, backend_name, stats)
stats["job launch"] += 1

with self._db_lock:
job_db.persist(not_started.loc[i : i + 1])

stats["job_db persist"] += 1
job_db.persist(not_started.loc[i: i + 1])
stats["job_db persist"] += 1
except Exception as e:
_log.error(f"Job launch failed for index {i}: {e}")
finally:
job_queue.task_done()

for i, backend_name in jobs_to_add:
thread = Thread(target=job_worker, args=(i, backend_name))
# Create a pool of threads that work concurrently
num_threads = min(len(jobs_to_add), self._max_concurrent_job_launch)
threads = []

for _ in range(num_threads):
thread = Thread(target=job_worker)
thread.start()
threads.append(thread)

# Wait for all jobs in the queue to be processed
for thread in threads:
thread.join()

Expand Down

0 comments on commit 62fad7d

Please sign in to comment.