Skip to content

Commit

Permalink
Pull the job action callbacks (on_job_done / on_job_error / on_job_cancel) out of _track_statuses and into _job_update_loop
Browse files Browse the repository at this point in the history
  • Loading branch information
HansVRP committed Jan 22, 2025
1 parent d4f976c commit 90e5f8a
Showing 1 changed file with 32 additions and 12 deletions.
44 changes: 32 additions & 12 deletions openeo/extra/job_management/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,7 @@ def __init__(
poll_sleep: int = 60,
root_dir: Optional[Union[str, Path]] = ".",
*,
download: Optional[bool] = True,
cancel_running_job_after: Optional[int] = None,
):
"""Create a MultiBackendJobManager."""
Expand All @@ -220,6 +221,7 @@ def __init__(
self._cancel_running_job_after = (
datetime.timedelta(seconds=cancel_running_job_after) if cancel_running_job_after is not None else None
)
self._download = download
self._thread = None

def add_backend(
Expand Down Expand Up @@ -517,7 +519,7 @@ def _job_update_loop(
stats = stats if stats is not None else collections.defaultdict(int)

with ignore_connection_errors(context="get statuses"):
self._track_statuses(job_db, stats=stats)
jobs_done, jobs_error, jobs_canceled = self._track_statuses(job_db, stats=stats)
stats["track_statuses"] += 1

not_started = job_db.get_by_status(statuses=["not_started"], max=200).copy()
Expand All @@ -540,6 +542,16 @@ def _job_update_loop(
stats["job_db persist"] += 1
total_added += 1

# Act on jobs
for job, metadata in jobs_error:
self.on_job_error(job, metadata)

for job, metadata in jobs_canceled:
self.on_job_cancel(job, metadata)

for job, metadata in jobs_done:
self.on_job_done(job, metadata)

def _launch_job(self, start_job, df, i, backend_name, stats: Optional[dict] = None):
"""Helper method for launching jobs
def on_job_done(self, job: BatchJob, row):
    """Handle a job that finished successfully.

    Always persists the job's metadata (from ``job.describe()``) as JSON to
    the path given by ``self.get_job_metadata_path``; downloads the job's
    result assets into the job directory only when the manager was created
    with ``download=True``.

    :param job: the finished batch job.
    :param row: the job's row from the job database (currently unused).
    """
    # TODO: param `row` is never accessed in this method. Remove it? Is this intended for future use?

    # NOTE(review): the scraped diff had lost its +/- markers, so the old
    # unconditional download lines appeared alongside the new conditional
    # block, downloading results twice. Reconstructed post-commit version:
    # metadata is written first, download happens only behind the flag.
    job_metadata = job.describe()
    metadata_path = self.get_job_metadata_path(job.job_id)

    with metadata_path.open("w", encoding="utf-8") as f:
        json.dump(job_metadata, f, ensure_ascii=False)

    # Skip (potentially large) result downloads when the manager was
    # configured with download=False; metadata persistence is unconditional.
    if self._download:
        job_dir = self.get_job_dir(job.job_id)
        self.ensure_job_dir_exists(job.job_id)
        job.get_results().download_files(target=job_dir)
def on_job_error(self, job: BatchJob, row):
"""
Handles jobs that stopped with errors. Can be overridden to provide custom behaviour.
Expand Down Expand Up @@ -704,6 +717,11 @@ def _track_statuses(self, job_db: JobDatabaseInterface, stats: Optional[dict] =
stats = stats if stats is not None else collections.defaultdict(int)

active = job_db.get_by_status(statuses=["created", "queued", "running"]).copy()

jobs_done = []
jobs_error = []
jobs_canceled = []

for i in active.index:
job_id = active.loc[i, "id"]
backend_name = active.loc[i, "backend_name"]
Expand All @@ -722,20 +740,20 @@ def _track_statuses(self, job_db: JobDatabaseInterface, stats: Optional[dict] =

if new_status == "finished":
stats["job finished"] += 1
self.on_job_done(the_job, active.loc[i])
jobs_done.append((the_job, active.loc[i]))

if previous_status != "error" and new_status == "error":
stats["job failed"] += 1
self.on_job_error(the_job, active.loc[i])

if previous_status in {"created", "queued"} and new_status == "running":
stats["job started running"] += 1
active.loc[i, "running_start_time"] = rfc3339.utcnow()
jobs_error.append((the_job, active.loc[i]))

if new_status == "canceled":
stats["job canceled"] += 1
self.on_job_cancel(the_job, active.loc[i])

if previous_status in {"created", "queued"} and new_status == "running":
stats["job started running"] += 1
active.loc[i, "running_start_time"] = rfc3339.utcnow()

if self._cancel_running_job_after and new_status == "running":
if (not active.loc[i, "running_start_time"] or pd.isna(active.loc[i, "running_start_time"])):
_log.warning(
Expand Down Expand Up @@ -763,6 +781,8 @@ def _track_statuses(self, job_db: JobDatabaseInterface, stats: Optional[dict] =
stats["job_db persist"] += 1
job_db.persist(active)

return jobs_done, jobs_error, jobs_canceled


def _format_usage_stat(job_metadata: dict, field: str) -> str:
value = deep_get(job_metadata, "usage", field, "value", default=0)
Expand Down

0 comments on commit 90e5f8a

Please sign in to comment.