Skip to content

Commit

Permalink
simplify
Browse files Browse the repository at this point in the history
  • Loading branch information
HansVRP committed Jan 8, 2025
1 parent 4f4437e commit f889eef
Showing 1 changed file with 14 additions and 34 deletions.
48 changes: 14 additions & 34 deletions openeo/extra/job_management/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -656,56 +656,29 @@ def on_job_cancel(self, job: BatchJob, row):
"""
pass

def _cancel_prolonged_job(self, job: BatchJob, row, df=None):
    """
    Cancel the job if it has been running for too long.

    The time elapsed since the "running_start_time" of the given row is compared
    with ``self._cancel_running_job_after``; when it is exceeded, the job is stopped.
    Failures are logged and never propagated to the caller.

    :param job: batch job to inspect and, if necessary, stop.
    :param row: job record; its "running_start_time" entry is parsed with
        ``rfc3339.parse_datetime``, so it is expected to be set before this is called.
    :param df: unused. Accepted only so that callers that still pass the jobs
        dataframe as a third argument keep working.
    """
    try:
        job_running_start_time = rfc3339.parse_datetime(row.get("running_start_time"), with_timezone=True)

        # Parse the current time into a datetime object with timezone info
        current_time = rfc3339.parse_datetime(rfc3339.utcnow(), with_timezone=True)

        # Calculate the elapsed time between job start and now
        elapsed = current_time - job_running_start_time

        # NOTE(review): presumably `_cancel_running_job_after` is a timedelta -- confirm where it is set.
        if elapsed > self._cancel_running_job_after:
            try:
                _log.info(
                    f"Cancelling long-running job {job.job_id} (after {elapsed}, running since {job_running_start_time})"
                )
                job.stop()
            except OpenEoApiError as e:
                _log.error(f"Failed to cancel long-running job {job.job_id}: {e}")
    except Exception as e:
        # Best effort: anything else (e.g. an unparsable start time) is only logged.
        _log.error(f"Unexpected error while handling job {job.job_id}: {e}")

def _ensure_running_start_time(self, job: BatchJob, row, df) -> datetime.datetime:
    """
    Return the running start time of a job as a parsed datetime.

    If the row has no usable "running_start_time" (empty or NaN), the current
    time is used as an approximation and is also written back into ``df``.

    :param job: batch job the row belongs to (only used in the log message).
    :param row: job record; ``row.name`` is taken as the index label of this record in ``df``.
    :param df: dataframe that is updated when the start time had to be approximated.
    :return: the running start time, parsed with ``rfc3339.parse_datetime(..., with_timezone=True)``.
    """
    running_start_time_str = row.get("running_start_time")

    if not running_start_time_str or pd.isna(running_start_time_str):
        _log.warning(
            f"Job {job.job_id} does not have a valid running start time. Setting the current time as an approximation."
        )
        # Generate the current time in RFC 3339 format
        current_time_rfc3339 = rfc3339.utcnow()

        # `row.name` is the index *label* of the row, so address it through `.loc` directly.
        # Going through `df.index[row.name]` would interpret the label as a position,
        # which targets the wrong row (or raises) as soon as the index is not 0..n-1.
        df.loc[row.name, "running_start_time"] = current_time_rfc3339

        # Parse and return the approximated start time
        return rfc3339.parse_datetime(current_time_rfc3339, with_timezone=True)

    # Parse the existing time string and return it
    return rfc3339.parse_datetime(running_start_time_str, with_timezone=True)


def get_job_dir(self, job_id: str) -> Path:
    """Return the directory in which metadata, results and error logs of the given job are saved."""
    job_folder_name = f"job_{job_id}"
    return self._root_dir / job_folder_name
Expand Down Expand Up @@ -765,7 +738,14 @@ def _track_statuses(self, job_db: JobDatabaseInterface, stats: Optional[dict] =
self.on_job_cancel(the_job, active.loc[i])

if self._cancel_running_job_after and new_status == "running":
self._cancel_prolonged_job(the_job, active.loc[i], active)
if (not active.loc[i, "running_start_time"] or pd.isna(active.loc[i, "running_start_time"])):
_log.warning(
f"Unknown 'running_start_time' for running job {job_id}. Using current time as an approximation."
)
stats["job started running"] += 1
active.loc[i, "running_start_time"] = rfc3339.utcnow()

self._cancel_prolonged_job(the_job, active.loc[i])

active.loc[i, "status"] = new_status

Expand Down

0 comments on commit f889eef

Please sign in to comment.