Skip to content

Commit

Permalink
add run time information
Browse files Browse the repository at this point in the history
  • Loading branch information
gpetretto committed Jul 12, 2023
1 parent e3bae86 commit a6f8915
Show file tree
Hide file tree
Showing 4 changed files with 68 additions and 4 deletions.
15 changes: 14 additions & 1 deletion src/jobflow_remote/cli/formatting.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from jobflow_remote.cli.utils import ReprStr, fmt_datetime
from jobflow_remote.config.base import ExecutionConfig, WorkerBase
from jobflow_remote.jobs.data import FlowInfo, JobInfo
from jobflow_remote.jobs.state import JobState
from jobflow_remote.jobs.state import JobState, RemoteState
from jobflow_remote.utils.data import remove_none


Expand All @@ -26,6 +26,7 @@ def get_job_info_table(jobs_info: list[JobInfo], verbosity: int):

if verbosity >= 1:
table.add_column("Queue id")
table.add_column("Run time")
table.add_column("Retry time")
table.add_column("Prev state")
if verbosity < 2:
Expand Down Expand Up @@ -56,6 +57,18 @@ def get_job_info_table(jobs_info: list[JobInfo], verbosity: int):

if verbosity >= 1:
row.append(ji.queue_job_id)
prefix = ""
if ji.remote_state == RemoteState.RUNNING:
run_time = ji.estimated_run_time
prefix = "~"
else:
run_time = ji.run_time
if run_time:
m, s = divmod(run_time, 60)
h, m = divmod(m, 60)
row.append(prefix + f"{h:g}:{m:02g}")
else:
row.append("")
row.append(
ji.retry_time_limit.strftime(fmt_datetime)
if ji.retry_time_limit
Expand Down
11 changes: 10 additions & 1 deletion src/jobflow_remote/fireworks/launchpad.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ class RemoteRun:
lock_time: datetime.datetime | None = None
process_id: str | None = None
run_dir: str | None = None
start_time: datetime.datetime | None = None
end_time: datetime.datetime | None = None

def as_db_dict(self):
d = asdict(self)
Expand Down Expand Up @@ -188,8 +190,15 @@ def recover_remote(
)
already_running = True

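# Changed with respect to the upstream FireWorks behavior: the created_on of
# each RUNNING entry in the state history is overwritten with the start time
# reported by the remote job, otherwise the created_on for RUNNING is wrong.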
if not already_running:
m_launch.state = "RUNNING" # this should also add a history item
for s in m_launch.state_history:
if s["state"] == "RUNNING":
s["created_on"] = reconstitute_dates(
remote_status["started_on"]
)

status = remote_status.get("state")
if terminated and status not in ("COMPLETED", "FIZZLED"):
Expand Down Expand Up @@ -265,7 +274,7 @@ def recover_remote(
self.lpad.complete_launch(launch_id, m_action, "FIZZLED")

completed = True
return m_launch.fw_id, completed
return m_launch, completed

def add_wf(self, wf):
return self.lpad.add_wf(wf)
Expand Down
29 changes: 29 additions & 0 deletions src/jobflow_remote/jobs/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ class JobData:
f"{REMOTE_DOC_PATH}.retry_time_limit": 1,
f"{REMOTE_DOC_PATH}.process_id": 1,
f"{REMOTE_DOC_PATH}.run_dir": 1,
f"{REMOTE_DOC_PATH}.start_time": 1,
f"{REMOTE_DOC_PATH}.end_time": 1,
"spec._tasks.worker": 1,
"spec._tasks.job.hosts": 1,
}
Expand All @@ -63,6 +65,8 @@ class JobInfo:
error_job: str | None = None
error_remote: str | None = None
host_flows_ids: list[str] = field(default_factory=lambda: list())
start_time: datetime | None = None
end_time: datetime | None = None

@classmethod
def from_fw_dict(cls, d):
Expand Down Expand Up @@ -113,6 +117,13 @@ def from_fw_dict(cls, d):
# convert to string in case the format is the one of an integer
queue_job_id = str(queue_job_id)

start_time = remote.get("start_time")
if start_time:
start_time = start_time.replace(tzinfo=timezone.utc).astimezone(tz=None)
end_time = remote.get("end_time")
if end_time:
end_time = end_time.replace(tzinfo=timezone.utc).astimezone(tz=None)

return cls(
db_id=d["fw_id"],
job_id=d["spec"]["_tasks"][0]["job"]["uuid"],
Expand All @@ -130,8 +141,26 @@ def from_fw_dict(cls, d):
error_remote=remote.get("error"),
error_job=error_job,
host_flows_ids=d["spec"]["_tasks"][0]["job"]["hosts"],
start_time=start_time,
end_time=end_time,
)

@property
def run_time(self) -> float | None:
    """
    Duration of the job in seconds, measured from start_time to end_time.

    Returns None if either start_time or end_time is not set.
    """
    if not (self.start_time and self.end_time):
        return None

    elapsed = self.end_time - self.start_time
    return elapsed.total_seconds()

@property
def estimated_run_time(self) -> float | None:
    """
    Seconds elapsed between start_time and the current time.

    Intended for jobs that have started but have no end_time yet.
    Returns None if start_time is not set.
    """
    started = self.start_time
    if not started:
        return None

    # Take "now" with the same tzinfo as start_time, so the subtraction
    # works for both naive and timezone-aware start times.
    current = datetime.now(tz=started.tzinfo)
    return (current - started).total_seconds()


flow_info_projection = {
"fws.fw_id": 1,
Expand Down
17 changes: 15 additions & 2 deletions src/jobflow_remote/jobs/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -476,14 +476,21 @@ def complete_launch(self, doc):
# TODO add ping data?
remote_store = get_remote_store(store, local_path)
remote_store.connect()
fw_id, completed = self.rlpad.recover_remote(
launch, completed = self.rlpad.recover_remote(
remote_status=remote_data,
store=store,
remote_store=remote_store,
save=save,
launch_id=remote_doc["launch_id"],
terminated=True,
)

set_output = {
"$set": {
f"{REMOTE_DOC_PATH}.start_time": launch.time_start or None,
f"{REMOTE_DOC_PATH}.end_time": launch.time_end or None,
}
}
except json.JSONDecodeError:
# if an empty file is copied this error can appear, do not retry
err_msg = traceback.format_exc()
Expand All @@ -497,7 +504,7 @@ def complete_launch(self, doc):
err_msg = "the parsed output does not contain the required information to complete the job"
return err_msg, True, None

return None, False, None
return None, False, set_output

def check_run_status(self):
logger.debug("check_run_status")
Expand Down Expand Up @@ -549,11 +556,13 @@ def check_run_status(self):
qstate = qjob.state if qjob else None
collection = self.rlpad.fireworks
next_state = None
start_time = None
if (
qstate == QState.RUNNING
and remote_doc["state"] == RemoteState.SUBMITTED.value
):
next_state = RemoteState.RUNNING
start_time = datetime.utcnow()
logger.debug(
f"remote job with id {remote_doc['process_id']} is running"
)
Expand Down Expand Up @@ -588,6 +597,10 @@ def check_run_status(self):
else None
}
}
if start_time:
set_output["$set"][
f"{REMOTE_DOC_PATH}.start_time"
] = start_time
lock.update_on_release = self._prepare_lock_update(
doc, error, False, set_output, next_state
)
Expand Down

0 comments on commit a6f8915

Please sign in to comment.