Skip to content

Commit

Permalink
Implement --jobname and -j as arguments to ketchup submit (#30)
Browse files Browse the repository at this point in the history
* jobname code

* jobname tests

* black
  • Loading branch information
Peter Kraus authored Jun 27, 2022
1 parent dd79fe6 commit 97b7614
Show file tree
Hide file tree
Showing 7 changed files with 115 additions and 34 deletions.
2 changes: 1 addition & 1 deletion src/tomato/daemon/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ def main_loop(settings: dict, pipelines: dict, test: bool = False) -> None:

# check existing jobs in queue
ret = dbhandler.job_get_all(qup, type=qut)
for jobid, strpl, st in ret:
for jobid, jobname, strpl, st in ret:
payload = json.loads(strpl)
if st in ["q", "qw"]:
if st == "q":
Expand Down
37 changes: 30 additions & 7 deletions src/tomato/dbhandler/sqlite.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,25 @@ def queue_setup(
dbpath: str,
type: str = "sqlite3",
) -> None:
user_version = 1
conn, cur = get_db_conn(dbpath, type)
log.debug(f"attempting to find table 'queue' in '{dbpath}'")
cur.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='queue';")
exists = bool(len(cur.fetchall()))
conn.close()
if exists:
log.debug(f"table 'queue' present at '{dbpath}'")
conn, cur = get_db_conn(dbpath, type)
cur.execute("PRAGMA user_version;")
curr_version = cur.fetchone()[0]
while curr_version < user_version:
if curr_version == 0:
log.info("upgrading table 'queue' from version 0 to 1")
cur.execute("ALTER TABLE queue ADD COLUMN jobname TEXT;")
cur.execute("PRAGMA user_version = 1;")
conn.commit()
cur.execute("PRAGMA user_version;")
curr_version = cur.fetchone()[0]
else:
log.warning(f"creating a new {type} 'queue' table at '{dbpath}'")
conn, cur = get_db_conn(dbpath, type)
Expand All @@ -39,9 +51,11 @@ def queue_setup(
" status TEXT NOT NULL,"
" submitted_at TEXT NOT NULL,"
" executed_at TEXT,"
" completed_at TEXT"
" completed_at TEXT,"
" jobname TEXT"
");"
)
cur.execute(f"PRAGMA user_version = {user_version};")
conn.commit()
conn.close()

Expand Down Expand Up @@ -93,7 +107,8 @@ def job_get_info(
) -> tuple:
conn, cur = get_db_conn(dbpath, type)
cur.execute(
"SELECT payload, status, submitted_at, executed_at, completed_at FROM queue "
"SELECT jobname, payload, status, submitted_at, executed_at, completed_at "
"FROM queue "
f"WHERE jobid = {jobid};"
)
ret = cur.fetchone()
Expand All @@ -119,7 +134,7 @@ def job_get_all(
type: str = "sqlite3",
) -> list[tuple]:
conn, cur = get_db_conn(dbpath, type)
cur.execute("SELECT jobid, payload, status FROM queue;")
cur.execute("SELECT jobid, jobname, payload, status FROM queue;")
ret = cur.fetchall()
conn.close()
return ret
Expand Down Expand Up @@ -252,14 +267,22 @@ def queue_payload(
dbpath: str,
pstr: str,
type: str = "sqlite3",
jobname: str = None,
) -> tuple:
conn, cur = get_db_conn(dbpath, type)
log.info(f"inserting a new job into 'state'")
submitted_at = str(datetime.now(timezone.utc))
cur.execute(
"INSERT INTO queue (payload, status, submitted_at)" "VALUES (?, ?, ?);",
(pstr, "q", submitted_at),
)
if jobname is None:
cur.execute(
"INSERT INTO queue (payload, status, submitted_at)" "VALUES (?, ?, ?);",
(pstr, "q", submitted_at),
)
else:
cur.execute(
"INSERT INTO queue (payload, status, submitted_at, jobname)"
"VALUES (?, ?, ?, ?);",
(pstr, "q", submitted_at, str(jobname)),
)
conn.commit()
cur.execute("SELECT jobid FROM queue " f"WHERE submitted_at = '{submitted_at}';")
ret = cur.fetchone()[0]
Expand Down
4 changes: 2 additions & 2 deletions src/tomato/drivers/biologic/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ def start_job(
ti += 1
first = False
logger.info(f"starting run on '{address}:{channel}'")
api.StartChannel(id_, channel)
api.StartChannel(id_, channel)
logger.info(f"disconnecting from '{address}:{channel}'")
api.Disconnect(id_)
except Exception as e:
Expand Down Expand Up @@ -238,5 +238,5 @@ def stop_job(
except Exception as e:
logger.critical(f"{e=}")
jobqueue.close()
dt = datetime.now(timezone.utc)
dt = datetime.now(timezone.utc)
return dt.timestamp()
54 changes: 36 additions & 18 deletions src/tomato/ketchup/functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,14 @@ def submit(args: Namespace) -> None:
.. code:: bash
ketchup [-t] [-v] [-q] submit <payload>
ketchup [-t] [-v] [-q] submit <payload> [--jobname JOBNAME]
Attempts to open the ``yaml/json`` file specified in the ``<payload>`` argument,
and submit it to tomato's queue.
The supplied :class:`argparse.Namespace` has to contain the path to the ``payload``.
Optional arguments include the verbose/quiet switches (``-v/-q``) and the testing
Optional arguments include an optional ``--jobname/-j`` parameter for supplying a
job name for the queue, the verbose/quiet switches (``-v/-q``) and the testing
switch (``-t``).
Examples
Expand All @@ -43,6 +44,14 @@ def submit(args: Namespace) -> None:
INFO:tomato.dbhandler.sqlite:inserting a new job into 'state'
jobid = 4
>>> # With a job name:
>>> ketchup submit .\dummy_random_2_0.1.yml -j dummy_random_2_0.1
jobid = 5
>>> ketchup status 5
jobid = 5
jobname = dummy_random_2_0.1
...
"""
dirs = setlib.get_dirs(args.test)
settings = setlib.get_settings(dirs.user_config_dir, dirs.user_data_dir)
Expand All @@ -64,7 +73,9 @@ def submit(args: Namespace) -> None:
payload.tomato.output.path = cwd
pstr = payload.json()
log.info("queueing 'payload' into 'queue'")
jobid = dbhandler.queue_payload(queue["path"], pstr, type=queue["type"])
jobid = dbhandler.queue_payload(
queue["path"], pstr, type=queue["type"], jobname=args.jobname
)
print(f"jobid = {jobid}")


Expand Down Expand Up @@ -102,17 +113,18 @@ def status(args: Namespace) -> None:
>>> # Get queue status with all jobs:
>>> ketchup -v status queue
jobid status (PID) pipeline
==========================================
1 c
2 cd
3 r 1035 dummy-10
4 q
jobid jobname status (PID) pipeline
==============================================================
1 None c
2 custom_name cd
3 None r 1035 dummy-10
4 other_name q
>>> # Get status of a given job
>>> ketchup status 1
jobid = 1
status = c
jobname = None
status = c
submitted at = 2022-06-02 06:49:00.578619+00:00
executed at = 2022-06-02 06:49:02.966775+00:00
completed at = 2022-06-02 06:49:08.229213+00:00
Expand All @@ -139,23 +151,29 @@ def status(args: Namespace) -> None:
elif args.jobid == "queue":
jobs = dbhandler.job_get_all(queue["path"], type=queue["type"])
running = dbhandler.pipeline_get_running(state["path"], type=state["type"])
print(f"{'jobid':6s} {'status':6s} {'(PID)':9s} {'pipeline':20s}")
print("=" * 42)
for jobid, payload, status in jobs:
print(
f"{'jobid':6s} {'jobname':20s} {'status':6s} {'(PID)':9s} {'pipeline':20s}"
)
print("=" * (7 + 21 + 7 + 10 + 20))
for jobid, jobname, payload, status in jobs:
if status.startswith("q"):
print(f"{str(jobid):6s} {status}")
print(f"{str(jobid):6s} {str(jobname):20s} {status}")
elif status.startswith("r"):
for pip, pjobid, pid in running:
if pjobid == jobid:
print(f"{str(jobid):6s} {status:6s} {str(pid):7s} {pip:20s}")
print(
f"{str(jobid):6s} {str(jobname):20s} "
f"{status:6s} {str(pid):7s} {pip:20s}"
)
elif status.startswith("c") and args.verbose - args.quiet > 0:
print(f"{str(jobid):6s} {status:6s}")
print(f"{str(jobid):6s} {str(jobname):20s} {status:6s}")
else:
jobid = int(args.jobid)
ji = dbhandler.job_get_info(queue["path"], jobid, type=queue["type"])
payload, status, submitted_at, executed_at, completed_at = ji
jobname, payload, status, submitted_at, executed_at, completed_at = ji
print(f"jobid = {jobid}")
print(f"status = {status}")
print(f"jobname = {jobname}")
print(f"status = {status}")
print(f"submitted at = {submitted_at}")
if status.startswith("r") or status.startswith("c"):
print(f"executed at = {executed_at}")
Expand Down
6 changes: 6 additions & 0 deletions src/tomato/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,12 @@ def run_ketchup():
help="File containing the payload to be submitted to tomato.",
default=None,
)
submit.add_argument(
"-j",
"--jobname",
help="Set the job name of the submitted job to?",
default=None,
)
submit.set_defaults(func=ketchup.submit)

status = subparsers.add_parser("status")
Expand Down
23 changes: 23 additions & 0 deletions tests/test_dummy.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,3 +54,26 @@ def test_run_dummy_random(casename, npoints, prefix, datadir):
with open(f"{prefix}.json", "r") as of:
dg = json.load(of)
assert len(dg["steps"][0]["data"]) == npoints


@pytest.mark.parametrize(
"casename, jobname",
[
(
"dummy_random_1_0.1",
"custom_name",
),
],
)
def test_run_dummy_jobname(casename, jobname, datadir):
os.chdir(datadir)
status = utils.run_casename(casename, jobname=jobname)
assert status == "c"
ret = subprocess.run(
["ketchup", "-t", "status", "1"],
capture_output=True,
text=True,
)
for line in ret.stdout.split("\n"):
if line.startswith("jobname"):
assert line.split("=")[1].strip() == jobname
23 changes: 17 additions & 6 deletions tests/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,19 @@
import os


def run_casename(casename: str) -> str:
def run_casename(casename: str, jobname: str = None) -> str:
cfg = subprocess.CREATE_NEW_PROCESS_GROUP
proc = subprocess.Popen(["tomato", "-t", "-vv"], creationflags=cfg)
p = psutil.Process(pid=proc.pid)
while not os.path.exists("database.db"):
time.sleep(0.1)
subprocess.run(["ketchup", "-t", "load", casename, "dummy-10", "-vv"])
subprocess.run(["ketchup", "-t", "submit", f"{casename}.yml", "dummy-10", "-vv"])
args = ["ketchup", "-t", "submit", f"{casename}.yml", "dummy-10", "-vv"]
if jobname is not None:
args.append("--jobname")
args.append(jobname)
print(args)
subprocess.run(args)
subprocess.run(["ketchup", "-t", "ready", "dummy-10", "-vv"])

while True:
Expand All @@ -21,11 +26,17 @@ def run_casename(casename: str) -> str:
capture_output=True,
text=True,
)
status = ret.stdout.split("\n")[1].split("=")[1].strip()
if status.startswith("c"):
end = False
for line in ret.stdout.split("\n"):
if line.startswith("status"):
status = line.split("=")[1].strip()
if status.startswith("c"):
end = True
break
else:
time.sleep(0.1)
if end:
break
else:
time.sleep(0.1)

for cp in p.children():
cp.send_signal(signal.SIGTERM)
Expand Down

0 comments on commit 97b7614

Please sign in to comment.