From 994ed7f7d8cc10cd7ce82c17fb4fdcc89450d892 Mon Sep 17 00:00:00 2001 From: Guido Petretto Date: Fri, 25 Aug 2023 00:13:44 +0200 Subject: [PATCH 1/3] jf flow info and other cli updates --- src/jobflow_remote/cli/flow.py | 43 ++++++++++++++++- src/jobflow_remote/cli/formatting.py | 27 +++++++++++ src/jobflow_remote/cli/job.py | 11 +++++ src/jobflow_remote/cli/types.py | 43 ++++++++++++++++- src/jobflow_remote/cli/utils.py | 17 +++++++ src/jobflow_remote/fireworks/launchpad.py | 1 + src/jobflow_remote/jobs/jobcontroller.py | 57 ++++++++++++++++++++++- 7 files changed, 194 insertions(+), 5 deletions(-) diff --git a/src/jobflow_remote/cli/flow.py b/src/jobflow_remote/cli/flow.py index eedaf300..7f453cdf 100644 --- a/src/jobflow_remote/cli/flow.py +++ b/src/jobflow_remote/cli/flow.py @@ -5,19 +5,21 @@ from rich.text import Text from jobflow_remote import SETTINGS -from jobflow_remote.cli.formatting import get_flow_info_table +from jobflow_remote.cli.formatting import format_flow_info, get_flow_info_table from jobflow_remote.cli.jf import app from jobflow_remote.cli.jfr_typer import JFRTyper from jobflow_remote.cli.types import ( days_opt, db_ids_opt, end_date_opt, + flow_db_id_arg, flow_ids_opt, flow_state_opt, force_opt, + job_flow_id_flag_opt, job_ids_opt, - job_state_opt, max_results_opt, + name_opt, reverse_sort_flag_opt, sort_opt, start_date_opt, @@ -26,7 +28,9 @@ from jobflow_remote.cli.utils import ( SortOption, check_incompatible_opt, + exit_with_error_msg, exit_with_warning_msg, + get_job_db_ids, loading_spinner, out_console, ) @@ -97,6 +101,7 @@ def delete( state: flow_state_opt = None, start_date: start_date_opt = None, end_date: end_date_opt = None, + name: name_opt = None, days: days_opt = None, force: force_opt = False, ): @@ -117,6 +122,7 @@ def delete( state=state, start_date=start_date, end_date=end_date, + name=name, ) if not flows_info: @@ -140,3 +146,36 @@ def delete( out_console.print( f"Deleted Flow(s) with db_id: {', '.join(str(i) for i in to_delete)}" ) + + +@app_flow.command(name="info") +def flow_info( + flow_db_id: flow_db_id_arg, + job_id_flag: job_flow_id_flag_opt = False, +): + """ + Provide detailed information on a Flow + """ + db_id, jf_id = get_job_db_ids(flow_db_id, None) + db_ids = job_ids = flow_ids = None + if db_id is not None: + db_ids = [db_id] + elif job_id_flag: + job_ids = [jf_id] + else: + flow_ids = [jf_id] + + with loading_spinner(): + + jc = JobController() + + flows_info = jc.get_flows_info( + job_ids=job_ids, + db_ids=db_ids, + flow_ids=flow_ids, + limit=1, + ) + if not flows_info: + exit_with_error_msg("No data matching the request") + + out_console.print(format_flow_info(flows_info[0])) diff --git a/src/jobflow_remote/cli/formatting.py b/src/jobflow_remote/cli/formatting.py index c0928cf4..7f942481 100644 --- a/src/jobflow_remote/cli/formatting.py +++ b/src/jobflow_remote/cli/formatting.py @@ -141,6 +141,33 @@ def format_job_info(job_info: JobInfo, show_none: bool = False): return render_scope(d) +def format_flow_info(flow_info: FlowInfo): + + title = f"Flow: {flow_info.name} - {flow_info.flow_id} - {flow_info.state.name}" + table = Table(title=title) + table.title_style = "bold" + table.add_column("DB id") + table.add_column("Name") + table.add_column("State [Remote]") + table.add_column("Job id (Index)") + table.add_column("Worker") + + for i, job_id in enumerate(flow_info.job_ids): + state = flow_info.job_states[i].name + + row = [ + str(flow_info.db_ids[i]), + flow_info.job_names[i], + state, + f"{job_id} ({flow_info.job_indexes[i]})", + flow_info.workers[i], + ] + + table.add_row(*row) + + return table + + def get_exec_config_table(exec_config: dict[str, ExecutionConfig], verbosity: int = 0): table = Table(title="Execution config", show_lines=verbosity > 0) table.add_column("Name") diff --git a/src/jobflow_remote/cli/job.py b/src/jobflow_remote/cli/job.py index 60b4f509..0519fc8c 100644 --- a/src/jobflow_remote/cli/job.py +++ b/src/jobflow_remote/cli/job.py @@ -13,12 +13,15 @@ days_opt, db_ids_opt, end_date_opt, + flow_ids_opt, job_db_id_arg, job_ids_indexes_opt, job_index_arg, job_state_opt, locked_opt, max_results_opt, + metadata_opt, + name_opt, query_opt, remote_state_arg, remote_state_opt, @@ -30,6 +33,7 @@ from jobflow_remote.cli.utils import ( SortOption, check_incompatible_opt, + convert_metadata, exit_with_error_msg, exit_with_warning_msg, get_job_db_ids, @@ -53,10 +57,13 @@ def jobs_list( job_id: job_ids_indexes_opt = None, db_id: db_ids_opt = None, + flow_id: flow_ids_opt = None, state: job_state_opt = None, remote_state: remote_state_opt = None, start_date: start_date_opt = None, end_date: end_date_opt = None, + name: name_opt = None, + metadata: metadata_opt = None, days: days_opt = None, verbosity: verbosity_opt = 0, max_results: max_results_opt = 100, @@ -71,6 +78,7 @@ def jobs_list( check_incompatible_opt({"state": state, "remote-state": remote_state}) check_incompatible_opt({"start_date": start_date, "days": days}) check_incompatible_opt({"end_date": end_date, "days": days}) + metadata_dict = convert_metadata(metadata) job_ids_indexes = get_job_ids_indexes(job_id) @@ -92,11 +100,14 @@ def jobs_list( jobs_info = jc.get_jobs_info( job_ids=job_ids_indexes, db_ids=db_id, + flow_ids=flow_id, state=state, remote_state=remote_state, start_date=start_date, locked=locked, end_date=end_date, + name=name, + metadata=metadata_dict, limit=max_results, sort=sort, ) diff --git a/src/jobflow_remote/cli/types.py b/src/jobflow_remote/cli/types.py index 5dc6ec19..3cd5b301 100644 --- a/src/jobflow_remote/cli/types.py +++ b/src/jobflow_remote/cli/types.py @@ -82,6 +82,28 @@ ), ] +name_opt = Annotated[ + Optional[str], + typer.Option( + "--name", + "-n", + help="The name. A regex can be passed (e.g. .*test.*)", + ), +] + + +metadata_opt = Annotated[ + Optional[str], + typer.Option( + "--metadata", + "-meta", + help="A string representing the metadata to be queried. Can be either" + " a single key=value pair or a string with the JSON representation " + "of a dictionary containing the mongoDB query for the metadata " + 'subdocument (e.g \'{"key1.key2": 1, "key3": "test"}\')', + ), +] + remote_state_arg = Annotated[ RemoteState, typer.Argument(help="One of the remote states") @@ -176,7 +198,7 @@ job_db_id_arg = Annotated[ str, typer.Argument( - help="The ID of the job can the db id (i.e. an integer) or a string (i.e. the uuid)", + help="The ID of the job. Can be the db id (i.e. an integer) or a string (i.e. the uuid)", metavar="ID", ), ] @@ -189,6 +211,15 @@ ] +flow_db_id_arg = Annotated[ + str, + typer.Argument( + help="The ID of the flow. Can the db id (i.e. an integer) or a string (i.e. the uuid)", + metavar="ID", + ), +] + + force_opt = Annotated[ bool, typer.Option( @@ -199,6 +230,16 @@ ] +job_flow_id_flag_opt = Annotated[ + bool, + typer.Option( + "--job", + "-j", + help="The passed ID will be the ID of one of the jobs" + " belonging to the flow, instead of the ID of the flow.", + ), +] + locked_opt = Annotated[ bool, typer.Option( diff --git a/src/jobflow_remote/cli/utils.py b/src/jobflow_remote/cli/utils.py index 5360eb08..8b3ff10b 100644 --- a/src/jobflow_remote/cli/utils.py +++ b/src/jobflow_remote/cli/utils.py @@ -1,6 +1,7 @@ from __future__ import annotations import functools +import json import uuid from contextlib import contextmanager from enum import Enum @@ -184,3 +185,19 @@ def check_valid_uuid(uuid_str): pass raise typer.BadParameter(f"UUID {uuid_str} is in the wrong format.") + + +def convert_metadata(string_metadata: str | None) -> dict | None: + if not string_metadata: + return None + + try: + metadata = json.loads(string_metadata) + except json.JSONDecodeError: + split = string_metadata.split("=") + if len(split) != 2: + raise typer.BadParameter(f"Wrong format for metadata {string_metadata}") + + metadata = {split[0]: split[1]} + + return metadata diff --git a/src/jobflow_remote/fireworks/launchpad.py b/src/jobflow_remote/fireworks/launchpad.py index d94557c4..f4a81a51 100644 --- a/src/jobflow_remote/fireworks/launchpad.py +++ b/src/jobflow_remote/fireworks/launchpad.py @@ -21,6 +21,7 @@ logger = logging.getLogger(__name__) +FW_JOB_PATH = "spec._tasks.job" FW_UUID_PATH = "spec._tasks.job.uuid" FW_INDEX_PATH = "spec._tasks.job.index" REMOTE_DOC_PATH = "spec.remote" diff --git a/src/jobflow_remote/jobs/jobcontroller.py b/src/jobflow_remote/jobs/jobcontroller.py index 11ddda94..5345237c 100644 --- a/src/jobflow_remote/jobs/jobcontroller.py +++ b/src/jobflow_remote/jobs/jobcontroller.py @@ -14,6 +14,7 @@ from jobflow_remote.config.manager import ConfigManager from jobflow_remote.fireworks.launchpad import ( FW_INDEX_PATH, + FW_JOB_PATH, FW_UUID_PATH, REMOTE_DOC_PATH, RemoteLaunchPad, @@ -53,7 +54,7 @@ def get_job_data( load_output: bool = False, ): fw, remote_run = self.rlpad.get_fw_remote_run_from_id( - job_id=job_id, fw_id=db_id + job_id=job_id, fw_id=db_id, job_index=job_index ) job = fw.tasks[0].get("job") state = JobState.from_states(fw.state, remote_run.state if remote_run else None) @@ -68,11 +69,14 @@ def _build_query_fw( self, job_ids: tuple[str, int] | list[tuple[str, int]] | None = None, db_ids: int | list[int] | None = None, + flow_ids: str | list[str] | None = None, state: JobState | None = None, remote_state: RemoteState | None = None, locked: bool = False, start_date: datetime | None = None, end_date: datetime | None = None, + name: str | None = None, + metadata: dict | None = None, ) -> dict: if state is not None and remote_state is not None: raise ValueError("state and remote_state cannot be queried simultaneously") @@ -84,6 +88,8 @@ def _build_query_fw( job_ids = cast(list[tuple[str, int]], [job_ids]) if db_ids is not None and not isinstance(db_ids, (list, tuple)): db_ids = [db_ids] + if flow_ids and not isinstance(flow_ids, (list, tuple)): + flow_ids = [flow_ids] query: dict = {} @@ -96,6 +102,9 @@ def _build_query_fw( or_list.append({FW_UUID_PATH: job_id, FW_INDEX_PATH: job_index}) query["$or"] = or_list + if flow_ids: + query[f"{FW_JOB_PATH}.hosts"] = {"$in": flow_ids} + if state: fw_states, remote_state = state.to_states() query["state"] = {"$in": fw_states} @@ -115,6 +124,15 @@ def _build_query_fw( if locked: query[f"{REMOTE_DOC_PATH}.{MongoLock.LOCK_KEY}"] = {"$exists": True} + if name: + query["name"] = {"$regex": name} + + if metadata: + metadata_dict = { + f"{FW_JOB_PATH}.metadata.{k}": v for k, v in metadata.items() + } + query.update(metadata_dict) + return query def _build_query_wf( @@ -125,6 +143,7 @@ def _build_query_wf( state: FlowState | None = None, start_date: datetime | None = None, end_date: datetime | None = None, + name: str | None = None, ) -> dict: if job_ids is not None and not isinstance(job_ids, (list, tuple)): @@ -166,7 +185,12 @@ def _build_query_wf( elif state == FlowState.FAILED: query["$or"] = [ {"state": "FIZZLED"}, - {"$and": [{"state": "DEFUSED"}, {"fws.state": {"$in": ["FIZZLED"]}}]}, + { + "$and": [ + {"state": "DEFUSED"}, + {"fws.state": {"$in": ["FIZZLED"]}}, + ] + }, { f"fws.{REMOTE_DOC_PATH}.state": { "$in": [JobState.FAILED.value, JobState.REMOTE_ERROR.value] @@ -187,16 +211,22 @@ def _build_query_wf( end_date_str = end_date.astimezone(timezone.utc) query["updated_on"] = {"$lte": end_date_str} + if name: + query["name"] = {"$regex": name} + return query def get_jobs_data( self, job_ids: tuple[str, int] | list[tuple[str, int]] | None = None, db_ids: int | list[int] | None = None, + flow_ids: str | list[str] | None = None, state: JobState | None = None, remote_state: RemoteState | None = None, start_date: datetime | None = None, end_date: datetime | None = None, + name: str | None = None, + metadata: dict | None = None, sort: dict | None = None, limit: int = 0, load_output: bool = False, @@ -204,10 +234,13 @@ def get_jobs_data( query = self._build_query_fw( job_ids=job_ids, db_ids=db_ids, + flow_ids=flow_ids, state=state, remote_state=remote_state, start_date=start_date, end_date=end_date, + name=name, + metadata=metadata, ) data = self.rlpad.fireworks.find(query, sort=sort, limit=limit) @@ -261,10 +294,13 @@ def get_jobs_info( self, job_ids: tuple[str, int] | list[tuple[str, int]] | None = None, db_ids: int | list[int] | None = None, + flow_ids: str | list[str] | None = None, state: JobState | None = None, remote_state: RemoteState | None = None, start_date: datetime | None = None, end_date: datetime | None = None, + name: str | None = None, + metadata: dict | None = None, locked: bool = False, sort: list[tuple] | None = None, limit: int = 0, @@ -272,11 +308,14 @@ def get_jobs_info( query = self._build_query_fw( job_ids=job_ids, db_ids=db_ids, + flow_ids=flow_ids, state=state, remote_state=remote_state, locked=locked, start_date=start_date, end_date=end_date, + name=name, + metadata=metadata, ) return self.get_jobs_info_query(query=query, sort=sort, limit=limit) @@ -317,20 +356,26 @@ def rerun_jobs( self, job_ids: tuple[str, int] | list[tuple[str, int]] | None = None, db_ids: int | list[int] | None = None, + flow_ids: str | list[str] | None = None, state: JobState | None = None, remote_state: RemoteState | None = None, start_date: datetime | None = None, end_date: datetime | None = None, + name: str | None = None, + metadata: dict | None = None, sort: dict | None = None, limit: int = 0, ) -> list[int]: query = self._build_query_fw( job_ids=job_ids, db_ids=db_ids, + flow_ids=flow_ids, state=state, remote_state=remote_state, start_date=start_date, end_date=end_date, + name=name, + metadata=metadata, ) fw_ids = self.rlpad.get_fw_ids(query=query, sort=sort, limit=limit) @@ -401,6 +446,7 @@ def get_flows_info( state: FlowState | None = None, start_date: datetime | None = None, end_date: datetime | None = None, + name: str | None = None, sort: list[tuple] | None = None, limit: int = 0, ) -> list[FlowInfo]: @@ -411,6 +457,7 @@ def get_flows_info( state=state, start_date=start_date, end_date=end_date, + name=name, ) data = self.rlpad.get_wf_fw_data( @@ -455,19 +502,25 @@ def remove_lock( self, job_ids: tuple[str, int] | list[tuple[str, int]] | None = None, db_ids: int | list[int] | None = None, + flow_ids: str | list[str] | None = None, state: JobState | None = None, remote_state: RemoteState | None = None, start_date: datetime | None = None, end_date: datetime | None = None, + name: str | None = None, + metadata: dict | None = None, ) -> int: query = self._build_query_fw( job_ids=job_ids, db_ids=db_ids, + flow_ids=flow_ids, state=state, remote_state=remote_state, start_date=start_date, end_date=end_date, locked=True, + name=name, + metadata=metadata, ) return self.rlpad.remove_lock(query=query) From ff181ba2cbaa3d4b4d617c0e8935d0acd35ee628 Mon Sep 17 00:00:00 2001 From: Guido Petretto Date: Fri, 25 Aug 2023 14:35:54 +0200 Subject: [PATCH 2/3] fix job query --- src/jobflow_remote/jobs/jobcontroller.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/jobflow_remote/jobs/jobcontroller.py b/src/jobflow_remote/jobs/jobcontroller.py index 5345237c..8c184e89 100644 --- a/src/jobflow_remote/jobs/jobcontroller.py +++ b/src/jobflow_remote/jobs/jobcontroller.py @@ -60,7 +60,7 @@ def get_job_data( state = JobState.from_states(fw.state, remote_run.state if remote_run else None) output = None jobstore = fw.tasks[0].get("store") or self.jobstore - if load_output and state == RemoteState.COMPLETED: + if load_output and state == JobState.COMPLETED: output = jobstore.query_one({"uuid": job_id}, load=True) return JobData(job=job, state=state, db_id=fw.fw_id, output=output) @@ -258,7 +258,7 @@ def get_jobs_data( info = JobInfo.from_fw_dict(fw_dict) output = None - if state == RemoteState.COMPLETED and load_output: + if state == JobState.COMPLETED and load_output: output = store.query_one({"uuid": job.uuid}, load=True) jobs_data.append( JobData( From 9605bf6de16e4ac3c69ce5c91b24ed023eda71b9 Mon Sep 17 00:00:00 2001 From: Guido Petretto Date: Mon, 28 Aug 2023 15:37:49 +0200 Subject: [PATCH 3/3] modify name search field --- src/jobflow_remote/cli/types.py | 2 +- src/jobflow_remote/jobs/jobcontroller.py | 5 +++-- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/src/jobflow_remote/cli/types.py b/src/jobflow_remote/cli/types.py index 3cd5b301..2a2fbff8 100644 --- a/src/jobflow_remote/cli/types.py +++ b/src/jobflow_remote/cli/types.py @@ -87,7 +87,7 @@ typer.Option( "--name", "-n", - help="The name. A regex can be passed (e.g. .*test.*)", + help="The name. Default is an exact match, but all conventions from python fnmatch can be used (e.g. *test*)", ), ] diff --git a/src/jobflow_remote/jobs/jobcontroller.py b/src/jobflow_remote/jobs/jobcontroller.py index 8c184e89..68425e21 100644 --- a/src/jobflow_remote/jobs/jobcontroller.py +++ b/src/jobflow_remote/jobs/jobcontroller.py @@ -1,5 +1,6 @@ from __future__ import annotations +import fnmatch import io import logging from contextlib import redirect_stdout @@ -125,7 +126,7 @@ def _build_query_fw( query[f"{REMOTE_DOC_PATH}.{MongoLock.LOCK_KEY}"] = {"$exists": True} if name: - query["name"] = {"$regex": name} + query["name"] = {"$regex": fnmatch.translate(name)} if metadata: metadata_dict = { @@ -212,7 +213,7 @@ def _build_query_wf( query["updated_on"] = {"$lte": end_date_str} if name: - query["name"] = {"$regex": name} + query["name"] = {"$regex": fnmatch.translate(name)} return query