From f889eefe3507bf34baef71ba26a8c0a8140cb34b Mon Sep 17 00:00:00 2001 From: Hans Vanrompay Date: Thu, 5 Dec 2024 11:44:50 +0100 Subject: [PATCH] simplify --- openeo/extra/job_management/__init__.py | 48 ++++++++----------------- 1 file changed, 14 insertions(+), 34 deletions(-) diff --git a/openeo/extra/job_management/__init__.py b/openeo/extra/job_management/__init__.py index fed17b222..26cd66f27 100644 --- a/openeo/extra/job_management/__init__.py +++ b/openeo/extra/job_management/__init__.py @@ -656,25 +656,22 @@ def on_job_cancel(self, job: BatchJob, row): """ pass - def _cancel_prolonged_job(self, job: BatchJob, row, df): + def _cancel_prolonged_job(self, job: BatchJob, row): """Cancel the job if it has been running for too long.""" try: # Ensure running start time is valid - running_start_time = self._ensure_running_start_time(job, row, df) - - # Get the current time in RFC 3339 format (timezone-aware) - current_time_rfc3339 = rfc3339.utcnow() - + job_running_start_time = rfc3339.parse_datetime(row.get("running_start_time"), with_timezone=True) + # Parse the current time into a datetime object with timezone info - current_time = rfc3339.parse_datetime(current_time_rfc3339, with_timezone=True) + current_time = rfc3339.parse_datetime(rfc3339.utcnow(), with_timezone=True) # Calculate the elapsed time between job start and now - elapsed = current_time - running_start_time + elapsed = current_time - job_running_start_time if elapsed > self._cancel_running_job_after: try: _log.info( - f"Cancelling long-running job {job.job_id} (after {elapsed}, running since {running_start_time})" + f"Cancelling long-running job {job.job_id} (after {elapsed}, running since {job_running_start_time})" ) job.stop() except OpenEoApiError as e: @@ -682,30 +679,6 @@ def _cancel_prolonged_job(self, job: BatchJob, row, df): except Exception as e: _log.error(f"Unexpected error while handling job {job.job_id}: {e}") - def _ensure_running_start_time(self, job: BatchJob, row, df) -> datetime.datetime: - """ - Ensures the running start time is valid. If missing, approximates with the current time. - Returns the parsed running start time as a datetime object. - """ - running_start_time_str = row.get("running_start_time") - - if not running_start_time_str or pd.isna(running_start_time_str): - _log.warning( - f"Job {job.job_id} does not have a valid running start time. Setting the current time as an approximation." - ) - # Generate the current time in RFC 3339 format - current_time_rfc3339 = rfc3339.utcnow() - - # Update the DataFrame safely using `.loc` - df.loc[df.index[row.name], "running_start_time"] = current_time_rfc3339 - - # Parse and return the datetime object with UTC timezone - return rfc3339.parse_datetime(current_time_rfc3339, with_timezone=True) - - # Parse the existing time string and return it - return rfc3339.parse_datetime(running_start_time_str, with_timezone=True) - - def get_job_dir(self, job_id: str) -> Path: """Path to directory where job metadata, results and error logs are be saved.""" return self._root_dir / f"job_{job_id}" @@ -765,7 +738,14 @@ def _track_statuses(self, job_db: JobDatabaseInterface, stats: Optional[dict] = self.on_job_cancel(the_job, active.loc[i]) if self._cancel_running_job_after and new_status == "running": - self._cancel_prolonged_job(the_job, active.loc[i], active) + if (not active.loc[i, "running_start_time"] or pd.isna(active.loc[i, "running_start_time"])): + _log.warning( + f"Unknown 'running_start_time' for running job {job_id}. Using current time as an approximation." + ) + stats["job started running"] += 1 + active.loc[i, "running_start_time"] = rfc3339.utcnow() + + self._cancel_prolonged_job(the_job, active.loc[i]) active.loc[i, "status"] = new_status