Skip to content

Commit

Permalink
disable job_ids options in the CLI
Browse files Browse the repository at this point in the history
  • Loading branch information
gpetretto committed Aug 16, 2023
1 parent 808ca4a commit 8d32d9f
Show file tree
Hide file tree
Showing 5 changed files with 59 additions and 34 deletions.
4 changes: 2 additions & 2 deletions src/jobflow_remote/cli/formatting.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ def get_job_info_table(jobs_info: list[JobInfo], verbosity: int):
table.add_column("DB id")
table.add_column("Name")
table.add_column("State [Remote]")
table.add_column("Job id")
table.add_column("Job id (Index)")

table.add_column("Worker")
table.add_column("Last updated")
Expand Down Expand Up @@ -50,7 +50,7 @@ def get_job_info_table(jobs_info: list[JobInfo], verbosity: int):
str(ji.db_id),
ji.name,
Text.from_markup(state),
ji.job_id,
f"{ji.job_id} ({ji.job_index})",
ji.worker,
ji.last_updated.strftime(fmt_datetime),
]
Expand Down
66 changes: 36 additions & 30 deletions src/jobflow_remote/cli/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,9 @@
from jobflow_remote.cli.jfr_typer import JFRTyper
from jobflow_remote.cli.types import (
days_opt,
db_id_flag_opt,
db_id_arg,
db_ids_opt,
end_date_opt,
job_id_arg,
job_ids_opt,
job_state_opt,
locked_opt,
Expand All @@ -32,7 +31,6 @@
check_incompatible_opt,
exit_with_error_msg,
exit_with_warning_msg,
get_job_db_ids,
loading_spinner,
out_console,
print_success_msg,
Expand Down Expand Up @@ -118,8 +116,9 @@ def jobs_list(

@app_job.command(name="info")
def job_info(
job_id: job_id_arg,
db_id: db_id_flag_opt = False,
db_id: db_id_arg,
# job_id: job_id_arg,
# db_id: db_id_flag_opt = False,
with_error: Annotated[
bool,
typer.Option(
Expand All @@ -145,11 +144,11 @@ def job_info(

jc = JobController()

db_id_value, job_id_value = get_job_db_ids(db_id, job_id)
# db_id_value, job_id_value = get_job_db_ids(db_id, job_id)

job_info = jc.get_job_info(
job_id=job_id_value,
db_id=db_id_value,
job_id=None,
db_id=db_id,
full=with_error,
)
if not job_info:
Expand All @@ -160,20 +159,21 @@ def job_info(

@app_job.command()
def reset_failed(
job_id: job_id_arg,
db_id: db_id_flag_opt = False,
db_id: db_id_arg,
# job_id: job_id_arg,
# db_id: db_id_flag_opt = False,
):
"""
For a job with a FAILED remote state reset it to the previous state
"""
with loading_spinner():
jc = JobController()

db_id_value, job_id_value = get_job_db_ids(db_id, job_id)
# db_id_value, job_id_value = get_job_db_ids(db_id, job_id)

succeeded = jc.reset_failed_state(
job_id=job_id_value,
db_id=db_id_value,
job_id=None,
db_id=db_id,
)

if not succeeded:
Expand All @@ -184,8 +184,9 @@ def reset_failed(

@app_job.command()
def reset_remote_attempts(
job_id: job_id_arg,
db_id: db_id_flag_opt = False,
db_id: db_id_arg,
# job_id: job_id_arg,
# db_id: db_id_flag_opt = False,
):
"""
Resets the number of attempts to perform a remote action and eliminates
Expand All @@ -194,11 +195,11 @@ def reset_remote_attempts(
with loading_spinner():
jc = JobController()

db_id_value, job_id_value = get_job_db_ids(db_id, job_id)
# db_id_value, job_id_value = get_job_db_ids(db_id, job_id)

succeeded = jc.reset_remote_attempts(
job_id=job_id_value,
db_id=db_id_value,
job_id=None,
db_id=db_id,
)

if not succeeded:
Expand All @@ -209,9 +210,10 @@ def reset_remote_attempts(

@app_job.command()
def set_remote_state(
job_id: job_id_arg,
db_id: db_id_arg,
state: remote_state_arg,
db_id: db_id_flag_opt = False,
# job_id: job_id_arg,
# db_id: db_id_flag_opt = False,
):
"""
Sets the remote state to an arbitrary value.
Expand All @@ -220,12 +222,12 @@ def set_remote_state(
with loading_spinner():
jc = JobController()

db_id_value, job_id_value = get_job_db_ids(db_id, job_id)
# db_id_value, job_id_value = get_job_db_ids(db_id, job_id)

succeeded = jc.set_remote_state(
state=state,
job_id=job_id_value,
db_id=db_id_value,
job_id=None,
db_id=db_id,
)

if not succeeded:
Expand All @@ -236,7 +238,7 @@ def set_remote_state(

@app_job.command()
def rerun(
job_id: job_ids_opt = None,
# job_id: job_ids_opt = None,
db_id: db_ids_opt = None,
state: job_state_opt = None,
remote_state: remote_state_opt = None,
Expand All @@ -252,7 +254,7 @@ def rerun(

with loading_spinner():
fw_ids = jc.rerun_jobs(
job_ids=job_id,
# job_ids=job_id,
db_ids=db_id,
state=state,
remote_state=remote_state,
Expand All @@ -265,18 +267,22 @@ def rerun(

@app_job.command()
def queue_out(
job_id: job_id_arg,
db_id: db_id_flag_opt = False,
db_id: db_id_arg,
# job_id: job_id_arg,
# db_id: db_id_flag_opt = False,
):
"""
Print the content of the output files produced by the queue manager.
"""
with loading_spinner(processing=False) as progress:
progress.add_task(description="Retrieving info...", total=None)
jc = JobController()

db_id_value, job_id_value = get_job_db_ids(db_id, job_id)
# db_id_value, job_id_value = get_job_db_ids(db_id, job_id)

job_data_list = jc.get_jobs_data(
job_ids=job_id_value,
db_ids=db_id_value,
job_ids=None,
db_ids=db_id,
)

if not job_data_list:
Expand Down
4 changes: 4 additions & 0 deletions src/jobflow_remote/cli/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,10 @@

job_id_arg = Annotated[str, typer.Argument(help="The ID of the job (i.e. the uuid)")]

db_id_arg = Annotated[
int, typer.Argument(help="The DB id of the job (i.e. an integer)")
]


db_id_flag_opt = Annotated[
bool,
Expand Down
10 changes: 8 additions & 2 deletions src/jobflow_remote/fireworks/launchpad.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from fireworks.utilities.fw_serializers import reconstitute_dates, recursive_dict
from maggma.core import Store
from maggma.stores import MongoStore
from pymongo import ASCENDING
from pymongo import ASCENDING, DESCENDING
from qtoolkit.core.data_objects import QState

from jobflow_remote.jobs.state import RemoteState
Expand All @@ -22,6 +22,7 @@


FW_UUID_PATH = "spec._tasks.job.uuid"
FW_INDEX_PATH = "spec._tasks.job.index"
REMOTE_DOC_PATH = "spec.remote"
REMOTE_LOCK_PATH = f"{REMOTE_DOC_PATH}.{MongoLock.LOCK_KEY}"
REMOTE_LOCK_TIME_PATH = f"{REMOTE_DOC_PATH}.{MongoLock.LOCK_TIME_KEY}"
Expand Down Expand Up @@ -123,7 +124,12 @@ def launches(self):

def reset(self, password, require_password=True, max_reset_wo_password=25):
self.lpad.reset(password, require_password, max_reset_wo_password)
self.fireworks.create_index(FW_UUID_PATH, unique=True, background=True)
self.fireworks.create_index(FW_UUID_PATH, background=True)
self.fireworks.create_index(
[(FW_UUID_PATH, ASCENDING), (FW_INDEX_PATH, DESCENDING)],
unique=True,
background=True,
)

def forget_remote(self, fwid):
"""
Expand Down
9 changes: 9 additions & 0 deletions src/jobflow_remote/jobs/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from jobflow import Job, JobStore

from jobflow_remote.fireworks.launchpad import (
FW_INDEX_PATH,
FW_UUID_PATH,
REMOTE_DOC_PATH,
get_job_doc,
Expand All @@ -29,6 +30,7 @@ class JobData:
job_info_projection = {
"fw_id": 1,
FW_UUID_PATH: 1,
FW_INDEX_PATH: 1,
"state": 1,
f"{REMOTE_DOC_PATH}.state": 1,
"name": 1,
Expand All @@ -51,6 +53,7 @@ class JobData:
class JobInfo:
db_id: int
job_id: str
job_index: int
state: JobState
name: str
last_updated: datetime
Expand Down Expand Up @@ -129,6 +132,7 @@ def from_fw_dict(cls, d):
return cls(
db_id=d["fw_id"],
job_id=d["spec"]["_tasks"][0]["job"]["uuid"],
job_index=d["spec"]["_tasks"][0]["job"]["index"],
state=state,
name=d["name"],
last_updated=last_updated,
Expand Down Expand Up @@ -167,6 +171,7 @@ def estimated_run_time(self) -> float | None:
flow_info_projection = {
"fws.fw_id": 1,
f"fws.{FW_UUID_PATH}": 1,
f"fws.{FW_INDEX_PATH}": 1,
"fws.state": 1,
"fws.name": 1,
f"fws.{REMOTE_DOC_PATH}.state": 1,
Expand All @@ -182,6 +187,7 @@ def estimated_run_time(self) -> float | None:
class FlowInfo:
db_ids: list[int]
job_ids: list[str]
job_indexes: list[int]
flow_id: str
state: FlowState
name: str
Expand All @@ -204,11 +210,13 @@ def from_query_dict(cls, d):
job_names = []
db_ids = []
job_ids = []
job_indexes = []
for fw_doc in fws:
db_ids.append(fw_doc["fw_id"])
job_doc = get_job_doc(fw_doc)
remote_doc = get_remote_doc(fw_doc)
job_ids.append(job_doc["uuid"])
job_indexes.append(job_doc["index"])
job_names.append(fw_doc["name"])
if remote_doc:
remote_state = RemoteState(remote_doc["state"])
Expand All @@ -223,6 +231,7 @@ def from_query_dict(cls, d):
return cls(
db_ids=db_ids,
job_ids=job_ids,
job_indexes=job_indexes,
flow_id=flow_id,
state=state,
name=d["name"],
Expand Down

0 comments on commit 8d32d9f

Please sign in to comment.