From 97b76146556077547eaefa9790af1b86020a357b Mon Sep 17 00:00:00 2001 From: Peter Kraus Date: Mon, 27 Jun 2022 14:46:23 +0200 Subject: [PATCH] Implement `--jobname` and `-j` as arguments to `ketchup submit` (#30) * jobname code * jobname tests * black --- src/tomato/daemon/main.py | 2 +- src/tomato/dbhandler/sqlite.py | 37 ++++++++++++++++---- src/tomato/drivers/biologic/main.py | 4 +-- src/tomato/ketchup/functions.py | 54 +++++++++++++++++++---------- src/tomato/main.py | 6 ++++ tests/test_dummy.py | 23 ++++++++++++ tests/utils.py | 23 ++++++++---- 7 files changed, 115 insertions(+), 34 deletions(-) diff --git a/src/tomato/daemon/main.py b/src/tomato/daemon/main.py index 13b371d..ad99eda 100644 --- a/src/tomato/daemon/main.py +++ b/src/tomato/daemon/main.py @@ -61,7 +61,7 @@ def main_loop(settings: dict, pipelines: dict, test: bool = False) -> None: # check existing jobs in queue ret = dbhandler.job_get_all(qup, type=qut) - for jobid, strpl, st in ret: + for jobid, jobname, strpl, st in ret: payload = json.loads(strpl) if st in ["q", "qw"]: if st == "q": diff --git a/src/tomato/dbhandler/sqlite.py b/src/tomato/dbhandler/sqlite.py index 0ed25bd..a76decb 100644 --- a/src/tomato/dbhandler/sqlite.py +++ b/src/tomato/dbhandler/sqlite.py @@ -22,6 +22,7 @@ def queue_setup( dbpath: str, type: str = "sqlite3", ) -> None: + user_version = 1 conn, cur = get_db_conn(dbpath, type) log.debug(f"attempting to find table 'queue' in '{dbpath}'") cur.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='queue';") @@ -29,6 +30,17 @@ def queue_setup( conn.close() if exists: log.debug(f"table 'queue' present at '{dbpath}'") + conn, cur = get_db_conn(dbpath, type) + cur.execute("PRAGMA user_version;") + curr_version = cur.fetchone()[0] + while curr_version < user_version: + if curr_version == 0: + log.info("upgrading table 'queue' from version 0 to 1") + cur.execute("ALTER TABLE queue ADD COLUMN jobname TEXT;") + cur.execute("PRAGMA user_version = 1;") + conn.commit() + cur.execute("PRAGMA user_version;") + curr_version = cur.fetchone()[0] else: log.warning(f"creating a new {type} 'queue' table at '{dbpath}'") conn, cur = get_db_conn(dbpath, type) @@ -39,9 +51,11 @@ def queue_setup( " status TEXT NOT NULL," " submitted_at TEXT NOT NULL," " executed_at TEXT," - " completed_at TEXT" + " completed_at TEXT," + " jobname TEXT" ");" ) + cur.execute(f"PRAGMA user_version = {user_version};") conn.commit() conn.close() @@ -93,7 +107,8 @@ def job_get_info( ) -> tuple: conn, cur = get_db_conn(dbpath, type) cur.execute( - "SELECT payload, status, submitted_at, executed_at, completed_at FROM queue " + "SELECT jobname, payload, status, submitted_at, executed_at, completed_at " + "FROM queue " f"WHERE jobid = {jobid};" ) ret = cur.fetchone() @@ -119,7 +134,7 @@ def job_get_all( type: str = "sqlite3", ) -> list[tuple]: conn, cur = get_db_conn(dbpath, type) - cur.execute("SELECT jobid, payload, status FROM queue;") + cur.execute("SELECT jobid, jobname, payload, status FROM queue;") ret = cur.fetchall() conn.close() return ret @@ -252,14 +267,22 @@ def queue_payload( dbpath: str, pstr: str, type: str = "sqlite3", + jobname: str = None, ) -> tuple: conn, cur = get_db_conn(dbpath, type) log.info(f"inserting a new job into 'state'") submitted_at = str(datetime.now(timezone.utc)) - cur.execute( - "INSERT INTO queue (payload, status, submitted_at)" "VALUES (?, ?, ?);", - (pstr, "q", submitted_at), - ) + if jobname is None: + cur.execute( + "INSERT INTO queue (payload, status, submitted_at)" "VALUES (?, ?, ?);", + (pstr, "q", submitted_at), + ) + else: + cur.execute( + "INSERT INTO queue (payload, status, submitted_at, jobname)" + "VALUES (?, ?, ?, ?);", + (pstr, "q", submitted_at, str(jobname)), + ) conn.commit() cur.execute("SELECT jobid FROM queue " f"WHERE submitted_at = '{submitted_at}';") ret = cur.fetchone()[0] diff --git a/src/tomato/drivers/biologic/main.py b/src/tomato/drivers/biologic/main.py index 7428648..b2aaf74 100644 --- a/src/tomato/drivers/biologic/main.py +++ b/src/tomato/drivers/biologic/main.py @@ -184,7 +184,7 @@ def start_job( ti += 1 first = False logger.info(f"starting run on '{address}:{channel}'") - api.StartChannel(id_, channel) + api.StartChannel(id_, channel) logger.info(f"disconnecting from '{address}:{channel}'") api.Disconnect(id_) except Exception as e: @@ -238,5 +238,5 @@ def stop_job( except Exception as e: logger.critical(f"{e=}") jobqueue.close() - dt = datetime.now(timezone.utc) + dt = datetime.now(timezone.utc) return dt.timestamp() diff --git a/src/tomato/ketchup/functions.py b/src/tomato/ketchup/functions.py index dcd2a56..5d94767 100644 --- a/src/tomato/ketchup/functions.py +++ b/src/tomato/ketchup/functions.py @@ -20,13 +20,14 @@ def submit(args: Namespace) -> None: .. code:: bash - ketchup [-t] [-v] [-q] submit + ketchup [-t] [-v] [-q] submit [--jobname JOBNAME] Attempts to open the ``yaml/json`` file specified in the ```` argument, and submit it to tomato's queue. The supplied :class:`argparse.Namespace` has to contain the path to the ``payload``. - Optional arguments include the verbose/quiet switches (``-v/-q``) and the testing + Optional arguments include an optional ``--jobname/-j`` parameter for supplying a + job name for the queue, the verbose/quiet switches (``-v/-q``) and the testing switch (``-t``). Examples @@ -43,6 +44,14 @@ def submit(args: Namespace) -> None: INFO:tomato.dbhandler.sqlite:inserting a new job into 'state' jobid = 4 + >>> # With a job name: + >>> ketchup submit .\dummy_random_2_0.1.yml -j dummy_random_2_0.1 + jobid = 5 + >>> ketchup status 5 + jobid = 5 + jobname = dummy_random_2_0.1 + ... + """ dirs = setlib.get_dirs(args.test) settings = setlib.get_settings(dirs.user_config_dir, dirs.user_data_dir) @@ -64,7 +73,9 @@ def submit(args: Namespace) -> None: payload.tomato.output.path = cwd pstr = payload.json() log.info("queueing 'payload' into 'queue'") - jobid = dbhandler.queue_payload(queue["path"], pstr, type=queue["type"]) + jobid = dbhandler.queue_payload( + queue["path"], pstr, type=queue["type"], jobname=args.jobname + ) print(f"jobid = {jobid}") @@ -102,17 +113,18 @@ def status(args: Namespace) -> None: >>> # Get queue status with all jobs: >>> ketchup -v status queue - jobid status (PID) pipeline - ========================================== - 1 c - 2 cd - 3 r 1035 dummy-10 - 4 q + jobid jobname status (PID) pipeline + ============================================================== + 1 None c + 2 custom_name cd + 3 None r 1035 dummy-10 + 4 other_name q >>> # Get status of a given job >>> ketchup status 1 jobid = 1 - status = c + jobname = None + status = c submitted at = 2022-06-02 06:49:00.578619+00:00 executed at = 2022-06-02 06:49:02.966775+00:00 completed at = 2022-06-02 06:49:08.229213+00:00 @@ -139,23 +151,29 @@ def status(args: Namespace) -> None: elif args.jobid == "queue": jobs = dbhandler.job_get_all(queue["path"], type=queue["type"]) running = dbhandler.pipeline_get_running(state["path"], type=state["type"]) - print(f"{'jobid':6s} {'status':6s} {'(PID)':9s} {'pipeline':20s}") - print("=" * 42) - for jobid, payload, status in jobs: + print( + f"{'jobid':6s} {'jobname':20s} {'status':6s} {'(PID)':9s} {'pipeline':20s}" + ) + print("=" * (7 + 21 + 7 + 10 + 20)) + for jobid, jobname, payload, status in jobs: if status.startswith("q"): - print(f"{str(jobid):6s} {status}") + print(f"{str(jobid):6s} {str(jobname):20s} {status}") elif status.startswith("r"): for pip, pjobid, pid in running: if pjobid == jobid: - print(f"{str(jobid):6s} {status:6s} {str(pid):7s} {pip:20s}") + print( + f"{str(jobid):6s} {str(jobname):20s} " + f"{status:6s} {str(pid):7s} {pip:20s}" + ) elif status.startswith("c") and args.verbose - args.quiet > 0: - print(f"{str(jobid):6s} {status:6s}") + print(f"{str(jobid):6s} {str(jobname):20s} {status:6s}") else: jobid = int(args.jobid) ji = dbhandler.job_get_info(queue["path"], jobid, type=queue["type"]) - payload, status, submitted_at, executed_at, completed_at = ji + jobname, payload, status, submitted_at, executed_at, completed_at = ji print(f"jobid = {jobid}") - print(f"status = {status}") + print(f"jobname = {jobname}") + print(f"status = {status}") print(f"submitted at = {submitted_at}") if status.startswith("r") or status.startswith("c"): print(f"executed at = {executed_at}") diff --git a/src/tomato/main.py b/src/tomato/main.py index 9aacf78..246ab69 100644 --- a/src/tomato/main.py +++ b/src/tomato/main.py @@ -112,6 +112,12 @@ def run_ketchup(): help="File containing the payload to be submitted to tomato.", default=None, ) + submit.add_argument( + "-j", + "--jobname", + help="Set the job name of the submitted job to?", + default=None, + ) submit.set_defaults(func=ketchup.submit) status = subparsers.add_parser("status") diff --git a/tests/test_dummy.py b/tests/test_dummy.py index 48b4c4f..d031b06 100644 --- a/tests/test_dummy.py +++ b/tests/test_dummy.py @@ -54,3 +54,26 @@ def test_run_dummy_random(casename, npoints, prefix, datadir): with open(f"{prefix}.json", "r") as of: dg = json.load(of) assert len(dg["steps"][0]["data"]) == npoints + + +@pytest.mark.parametrize( + "casename, jobname", + [ + ( + "dummy_random_1_0.1", + "custom_name", + ), + ], +) +def test_run_dummy_jobname(casename, jobname, datadir): + os.chdir(datadir) + status = utils.run_casename(casename, jobname=jobname) + assert status == "c" + ret = subprocess.run( + ["ketchup", "-t", "status", "1"], + capture_output=True, + text=True, + ) + for line in ret.stdout.split("\n"): + if line.startswith("jobname"): + assert line.split("=")[1].strip() == jobname diff --git a/tests/utils.py b/tests/utils.py index 1783e1c..6a3e5f2 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -5,14 +5,19 @@ import os -def run_casename(casename: str) -> str: +def run_casename(casename: str, jobname: str = None) -> str: cfg = subprocess.CREATE_NEW_PROCESS_GROUP proc = subprocess.Popen(["tomato", "-t", "-vv"], creationflags=cfg) p = psutil.Process(pid=proc.pid) while not os.path.exists("database.db"): time.sleep(0.1) subprocess.run(["ketchup", "-t", "load", casename, "dummy-10", "-vv"]) - subprocess.run(["ketchup", "-t", "submit", f"{casename}.yml", "dummy-10", "-vv"]) + args = ["ketchup", "-t", "submit", f"{casename}.yml", "dummy-10", "-vv"] + if jobname is not None: + args.append("--jobname") + args.append(jobname) + print(args) + subprocess.run(args) subprocess.run(["ketchup", "-t", "ready", "dummy-10", "-vv"]) while True: @@ -21,11 +26,17 @@ def run_casename(casename: str) -> str: capture_output=True, text=True, ) - status = ret.stdout.split("\n")[1].split("=")[1].strip() - if status.startswith("c"): + end = False + for line in ret.stdout.split("\n"): + if line.startswith("status"): + status = line.split("=")[1].strip() + if status.startswith("c"): + end = True + break + else: + time.sleep(0.1) + if end: break - else: - time.sleep(0.1) for cp in p.children(): cp.send_signal(signal.SIGTERM)