From 8d32d9f680c98a2db967fdc93e66255305da92eb Mon Sep 17 00:00:00 2001 From: Guido Petretto Date: Wed, 16 Aug 2023 17:12:08 +0200 Subject: [PATCH] disable job_ids options in the CLI --- src/jobflow_remote/cli/formatting.py | 4 +- src/jobflow_remote/cli/job.py | 66 ++++++++++++----------- src/jobflow_remote/cli/types.py | 4 ++ src/jobflow_remote/fireworks/launchpad.py | 10 +++- src/jobflow_remote/jobs/data.py | 9 ++++ 5 files changed, 59 insertions(+), 34 deletions(-) diff --git a/src/jobflow_remote/cli/formatting.py b/src/jobflow_remote/cli/formatting.py index 36a2c646..ab73086a 100644 --- a/src/jobflow_remote/cli/formatting.py +++ b/src/jobflow_remote/cli/formatting.py @@ -19,7 +19,7 @@ def get_job_info_table(jobs_info: list[JobInfo], verbosity: int): table.add_column("DB id") table.add_column("Name") table.add_column("State [Remote]") - table.add_column("Job id") + table.add_column("Job id (Index)") table.add_column("Worker") table.add_column("Last updated") @@ -50,7 +50,7 @@ def get_job_info_table(jobs_info: list[JobInfo], verbosity: int): str(ji.db_id), ji.name, Text.from_markup(state), - ji.job_id, + f"{ji.job_id} ({ji.job_index})", ji.worker, ji.last_updated.strftime(fmt_datetime), ] diff --git a/src/jobflow_remote/cli/job.py b/src/jobflow_remote/cli/job.py index dd5ed5ee..a262d406 100644 --- a/src/jobflow_remote/cli/job.py +++ b/src/jobflow_remote/cli/job.py @@ -11,10 +11,9 @@ from jobflow_remote.cli.jfr_typer import JFRTyper from jobflow_remote.cli.types import ( days_opt, - db_id_flag_opt, + db_id_arg, db_ids_opt, end_date_opt, - job_id_arg, job_ids_opt, job_state_opt, locked_opt, @@ -32,7 +31,6 @@ check_incompatible_opt, exit_with_error_msg, exit_with_warning_msg, - get_job_db_ids, loading_spinner, out_console, print_success_msg, @@ -118,8 +116,9 @@ def jobs_list( @app_job.command(name="info") def job_info( - job_id: job_id_arg, - db_id: db_id_flag_opt = False, + db_id: db_id_arg, + # job_id: job_id_arg, + # db_id: db_id_flag_opt = False, with_error: Annotated[ bool, typer.Option( @@ -145,11 +144,11 @@ def job_info( jc = JobController() - db_id_value, job_id_value = get_job_db_ids(db_id, job_id) + # db_id_value, job_id_value = get_job_db_ids(db_id, job_id) job_info = jc.get_job_info( - job_id=job_id_value, - db_id=db_id_value, + job_id=None, + db_id=db_id, full=with_error, ) if not job_info: @@ -160,8 +159,9 @@ def job_info( @app_job.command() def reset_failed( - job_id: job_id_arg, - db_id: db_id_flag_opt = False, + db_id: db_id_arg, + # job_id: job_id_arg, + # db_id: db_id_flag_opt = False, ): """ For a job with a FAILED remote state reset it to the previous state @@ -169,11 +169,11 @@ def reset_failed( with loading_spinner(): jc = JobController() - db_id_value, job_id_value = get_job_db_ids(db_id, job_id) + # db_id_value, job_id_value = get_job_db_ids(db_id, job_id) succeeded = jc.reset_failed_state( - job_id=job_id_value, - db_id=db_id_value, + job_id=None, + db_id=db_id, ) if not succeeded: @@ -184,8 +184,9 @@ def reset_failed( @app_job.command() def reset_remote_attempts( - job_id: job_id_arg, - db_id: db_id_flag_opt = False, + db_id: db_id_arg, + # job_id: job_id_arg, + # db_id: db_id_flag_opt = False, ): """ Resets the number of attempts to perform a remote action and eliminates @@ -194,11 +195,11 @@ def reset_remote_attempts( with loading_spinner(): jc = JobController() - db_id_value, job_id_value = get_job_db_ids(db_id, job_id) + # db_id_value, job_id_value = get_job_db_ids(db_id, job_id) succeeded = jc.reset_remote_attempts( - job_id=job_id_value, - db_id=db_id_value, + job_id=None, + db_id=db_id, ) if not succeeded: @@ -209,9 +210,10 @@ def reset_remote_attempts( @app_job.command() def set_remote_state( - job_id: job_id_arg, + db_id: db_id_arg, state: remote_state_arg, - db_id: db_id_flag_opt = False, + # job_id: job_id_arg, + # db_id: db_id_flag_opt = False, ): """ Sets the remote state to an arbitrary value. @@ -220,12 +222,12 @@ def set_remote_state( with loading_spinner(): jc = JobController() - db_id_value, job_id_value = get_job_db_ids(db_id, job_id) + # db_id_value, job_id_value = get_job_db_ids(db_id, job_id) succeeded = jc.set_remote_state( state=state, - job_id=job_id_value, - db_id=db_id_value, + job_id=None, + db_id=db_id, ) if not succeeded: @@ -236,7 +238,7 @@ def set_remote_state( @app_job.command() def rerun( - job_id: job_ids_opt = None, + # job_id: job_ids_opt = None, db_id: db_ids_opt = None, state: job_state_opt = None, remote_state: remote_state_opt = None, @@ -252,7 +254,7 @@ def rerun( with loading_spinner(): fw_ids = jc.rerun_jobs( - job_ids=job_id, + # job_ids=job_id, db_ids=db_id, state=state, remote_state=remote_state, @@ -265,18 +267,22 @@ def rerun( @app_job.command() def queue_out( - job_id: job_id_arg, - db_id: db_id_flag_opt = False, + db_id: db_id_arg, + # job_id: job_id_arg, + # db_id: db_id_flag_opt = False, ): + """ + Print the content of the output files produced by the queue manager. + """ with loading_spinner(processing=False) as progress: progress.add_task(description="Retrieving info...", total=None) jc = JobController() - db_id_value, job_id_value = get_job_db_ids(db_id, job_id) + # db_id_value, job_id_value = get_job_db_ids(db_id, job_id) job_data_list = jc.get_jobs_data( - job_ids=job_id_value, - db_ids=db_id_value, + job_ids=None, + db_ids=db_id, ) if not job_data_list: diff --git a/src/jobflow_remote/cli/types.py b/src/jobflow_remote/cli/types.py index 33d2eccc..9f8d4c75 100644 --- a/src/jobflow_remote/cli/types.py +++ b/src/jobflow_remote/cli/types.py @@ -152,6 +152,10 @@ job_id_arg = Annotated[str, typer.Argument(help="The ID of the job (i.e. the uuid)")] +db_id_arg = Annotated[ + int, typer.Argument(help="The DB id of the job (i.e. an integer)") +] + db_id_flag_opt = Annotated[ bool, diff --git a/src/jobflow_remote/fireworks/launchpad.py b/src/jobflow_remote/fireworks/launchpad.py index c3b0d35b..1ca03bef 100644 --- a/src/jobflow_remote/fireworks/launchpad.py +++ b/src/jobflow_remote/fireworks/launchpad.py @@ -10,7 +10,7 @@ from fireworks.utilities.fw_serializers import reconstitute_dates, recursive_dict from maggma.core import Store from maggma.stores import MongoStore -from pymongo import ASCENDING +from pymongo import ASCENDING, DESCENDING from qtoolkit.core.data_objects import QState from jobflow_remote.jobs.state import RemoteState @@ -22,6 +22,7 @@ FW_UUID_PATH = "spec._tasks.job.uuid" +FW_INDEX_PATH = "spec._tasks.job.index" REMOTE_DOC_PATH = "spec.remote" REMOTE_LOCK_PATH = f"{REMOTE_DOC_PATH}.{MongoLock.LOCK_KEY}" REMOTE_LOCK_TIME_PATH = f"{REMOTE_DOC_PATH}.{MongoLock.LOCK_TIME_KEY}" @@ -123,7 +124,12 @@ def launches(self): def reset(self, password, require_password=True, max_reset_wo_password=25): self.lpad.reset(password, require_password, max_reset_wo_password) - self.fireworks.create_index(FW_UUID_PATH, unique=True, background=True) + self.fireworks.create_index(FW_UUID_PATH, background=True) + self.fireworks.create_index( + [(FW_UUID_PATH, ASCENDING), (FW_INDEX_PATH, DESCENDING)], + unique=True, + background=True, + ) def forget_remote(self, fwid): """ diff --git a/src/jobflow_remote/jobs/data.py b/src/jobflow_remote/jobs/data.py index 9d515502..274ff736 100644 --- a/src/jobflow_remote/jobs/data.py +++ b/src/jobflow_remote/jobs/data.py @@ -6,6 +6,7 @@ from jobflow import Job, JobStore from jobflow_remote.fireworks.launchpad import ( + FW_INDEX_PATH, FW_UUID_PATH, REMOTE_DOC_PATH, get_job_doc, @@ -29,6 +30,7 @@ class JobData: job_info_projection = { "fw_id": 1, FW_UUID_PATH: 1, + FW_INDEX_PATH: 1, "state": 1, f"{REMOTE_DOC_PATH}.state": 1, "name": 1, @@ -51,6 +53,7 @@ class JobData: class JobInfo: db_id: int job_id: str + job_index: int state: JobState name: str last_updated: datetime @@ -129,6 +132,7 @@ def from_fw_dict(cls, d): return cls( db_id=d["fw_id"], job_id=d["spec"]["_tasks"][0]["job"]["uuid"], + job_index=d["spec"]["_tasks"][0]["job"]["index"], state=state, name=d["name"], last_updated=last_updated, @@ -167,6 +171,7 @@ def estimated_run_time(self) -> float | None: flow_info_projection = { "fws.fw_id": 1, f"fws.{FW_UUID_PATH}": 1, + f"fws.{FW_INDEX_PATH}": 1, "fws.state": 1, "fws.name": 1, f"fws.{REMOTE_DOC_PATH}.state": 1, @@ -182,6 +187,7 @@ def estimated_run_time(self) -> float | None: class FlowInfo: db_ids: list[int] job_ids: list[str] + job_indexes: list[int] flow_id: str state: FlowState name: str @@ -204,11 +210,13 @@ def from_query_dict(cls, d): job_names = [] db_ids = [] job_ids = [] + job_indexes = [] for fw_doc in fws: db_ids.append(fw_doc["fw_id"]) job_doc = get_job_doc(fw_doc) remote_doc = get_remote_doc(fw_doc) job_ids.append(job_doc["uuid"]) + job_indexes.append(job_doc["index"]) job_names.append(fw_doc["name"]) if remote_doc: remote_state = RemoteState(remote_doc["state"]) @@ -223,6 +231,7 @@ def from_query_dict(cls, d): return cls( db_ids=db_ids, job_ids=job_ids, + job_indexes=job_indexes, flow_id=flow_id, state=state, name=d["name"],