Data format fixes (#10)
* Pollrate, tolerance.

* black

* Added change from local.

* Fixed pollrate, E_range.

* black
Peter Kraus authored Apr 7, 2022
1 parent 56e4f08 commit 678d4b8
Showing 23 changed files with 1,195 additions and 1,120 deletions.
2 changes: 1 addition & 1 deletion VERSION
@@ -1 +1 @@
-0.1.rc1
+0.1.rc2
3 changes: 1 addition & 2 deletions src/tomato/__init__.py
@@ -1,6 +1,5 @@
import sys

sys.path += sys.modules["tomato"].__path__

from .main import run_tomato, run_ketchup


2 changes: 1 addition & 1 deletion src/tomato/daemon/__init__.py
@@ -1 +1 @@
-from .main import main_loop
+from .main import main_loop
44 changes: 21 additions & 23 deletions src/tomato/daemon/main.py
@@ -5,19 +5,21 @@
import time
import json
import logging

+log = logging.getLogger(__name__)
+
from ..drivers import driver_worker
from .. import dbhandler


def _find_matching_pipelines(pipelines: dict, method: dict) -> list[str]:
    req_names = set(method.keys())
    req_capabs = []
    for k in req_names:
        for s in method[k]:
            req_capabs.append(s["name"])
    req_capabs = set(req_capabs)

    name_match = []
    candidates = []
    for cd in pipelines.keys():
@@ -31,9 +33,9 @@ def _find_matching_pipelines(pipelines: dict, method: dict) -> list[str]:
            capabs += v["capabilities"]
        if req_capabs.intersection(set(capabs)) == req_capabs:
            matched.append(cd)

    return matched


def _pipeline_ready_sample(ret: tuple, sample: dict) -> bool:
    sampleid, ready, jobid, pid = ret
@@ -45,10 +47,10 @@ def _pipeline_ready_sample(ret: tuple, sample: dict) -> bool:
    else:
        return False


def job_wrapper(
-    settings: dict,
-    pipelines: dict,
+    settings: dict,
+    pipelines: dict,
    payload: dict,
    pip: str,
    jobid: int,
@@ -67,61 +69,57 @@
    dbhandler.job_set_time(queue["path"], "completed_at", jobid, type=queue["type"])
    dbhandler.pipeline_reset_job(state["path"], pip, ready, type=state["type"])

-def main_loop(
-    settings: dict,
-    pipelines: dict
-) -> None:
+
+def main_loop(settings: dict, pipelines: dict) -> None:
    qup = settings["queue"]["path"]
    qut = settings["queue"]["type"]
    stp = settings["state"]["path"]
    stt = settings["state"]["type"]
    while True:
        # check existing PIDs in state
-        ret = dbhandler.pipeline_get_running(stp, type = stt)
+        ret = dbhandler.pipeline_get_running(stp, type=stt)
        for pip, jobid, pid in ret:
            log.debug(f"checking PID of running job '{jobid}'")
            if psutil.pid_exists(pid) and "python" in psutil.Process(pid).name():
                log.debug(f"PID of running job '{jobid}' found")
                # dbhandler.job_set_status(queue, "r", jobid)
            else:
                log.debug(f"PID of running job '{jobid}' not found")
-                dbhandler.pipeline_reset_job(stp, pip, False, type = stt)
-                dbhandler.job_set_status(qup, "ce", jobid, type = qut)
-                dbhandler.job_set_time(qup, 'completed_at', jobid, type = qut)
+                dbhandler.pipeline_reset_job(stp, pip, False, type=stt)
+                dbhandler.job_set_status(qup, "ce", jobid, type=qut)
+                dbhandler.job_set_time(qup, "completed_at", jobid, type=qut)

        # check existing jobs in queue
-        ret = dbhandler.job_get_all(qup, type = qut)
+        ret = dbhandler.job_get_all(qup, type=qut)
        for jobid, strpl, st in ret:
            payload = json.loads(strpl)
            if st in ["q", "qw"]:
                log.debug(f"checking whether job '{jobid}' can be matched")
                matched_pips = _find_matching_pipelines(pipelines, payload["method"])
                if len(matched_pips) > 0 and st != "qw":
-                    dbhandler.job_set_status(qup, "qw", jobid, type = qut)
+                    dbhandler.job_set_status(qup, "qw", jobid, type=qut)
                log.debug(f"checking whether job '{jobid}' can be queued")
                for pip in matched_pips:
-                    pipinfo = dbhandler.pipeline_get_info(stp, pip, type = stt)
+                    pipinfo = dbhandler.pipeline_get_info(stp, pip, type=stt)
                    can_queue = _pipeline_ready_sample(pipinfo, payload["sample"])
                    if can_queue:
                        dbhandler.pipeline_reset_job(stp, pip, False, type=stt)
                        p = multiprocessing.Process(
                            name=f"driver_worker_{jobid}",
-                            target=job_wrapper,
-                            args=(settings, pipelines, payload, pip, jobid)
+                            target=job_wrapper,
+                            args=(settings, pipelines, payload, pip, jobid),
                        )
                        p.start()
                        break
        time.sleep(settings.get("main loop", 1))


        # - if jobid->status == q:
        #     find matching pipelines -> qw
        # - if jobid->status == qw:
        #     find matching pipelines
        #     find matching samples
-        #     is pipeline ready -> r -> assign jobid and pid into pipeline state
+        #     is pipeline ready -> r -> assign jobid and pid into pipeline state

-        #for pname, pvals in pipelines.items():
+        # for pname, pvals in pipelines.items():
        #    print(f'driver_worker(settings, pvals, None): with {pname}')
        #    driver_worker(settings, pvals, None)
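
The comment block above sketches the intended job lifecycle (q -> qw -> r). As a minimal, hypothetical illustration of the capability-matching rule used by _find_matching_pipelines — the method payload and pipeline capabilities below are invented for illustration; real values come from tomato's payload and settings files:

# Hypothetical data; real methods and capabilities come from tomato's
# payload and settings files.
method = {
    "MPG2": [{"name": "constant_current"}, {"name": "open_circuit_voltage"}],
}
req_capabs = {s["name"] for steps in method.values() for s in steps}

pipeline_capabs = {
    "pip-1": ["constant_current", "constant_voltage", "open_circuit_voltage"],
    "pip-2": ["constant_voltage"],
}
# a pipeline matches only if it provides every requested capability:
matched = [
    pip
    for pip, capabs in pipeline_capabs.items()
    if req_capabs.intersection(set(capabs)) == req_capabs
]
print(matched)  # ['pip-1']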

2 changes: 1 addition & 1 deletion src/tomato/dbhandler/__init__.py
@@ -1 +1 @@
-from .sqlite import *
+from .sqlite import *
77 changes: 30 additions & 47 deletions src/tomato/dbhandler/sqlite.py
@@ -3,11 +3,12 @@
from datetime import datetime, timezone
import os
import logging

log = logging.getLogger(__name__)

+
def get_db_conn(
-    dbpath: str,
+    dbpath: str,
    type: str = "sqlite3",
) -> tuple:
    if type == "sqlite3":
@@ -20,14 +21,12 @@ def get_db_conn(


def queue_setup(
-    dbpath: str,
+    dbpath: str,
    type: str = "sqlite3",
) -> None:
    conn, cur = get_db_conn(dbpath, type)
    log.debug(f"attempting to find table 'queue' in '{dbpath}'")
-    cur.execute(
-        "SELECT name FROM sqlite_master WHERE type='table' AND name='queue';"
-    )
+    cur.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='queue';")
    exists = bool(len(cur.fetchall()))
    conn.close()
    if exists:
@@ -50,14 +49,12 @@ def queue_setup(


def state_setup(
-    dbpath: str,
+    dbpath: str,
    type: str = "sqlite3",
) -> None:
    conn, cur = get_db_conn(dbpath, type)
    log.debug(f"attempting to find table 'state' in '{dbpath}'")
-    cur.execute(
-        "SELECT name FROM sqlite_master WHERE type='table' AND name='state';"
-    )
+    cur.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='state';")
    exists = bool(len(cur.fetchall()))
    conn.close()
    if exists:
@@ -80,21 +77,19 @@ def state_setup(


def job_set_status(
-    dbpath: str,
-    st: str,
+    dbpath: str,
+    st: str,
    jobid: int,
    type: str = "sqlite3",
) -> None:
    conn, cur = get_db_conn(dbpath, type)
-    cur.execute(
-        f"UPDATE queue SET status = '{st}' WHERE jobid = {jobid};"
-    )
+    cur.execute(f"UPDATE queue SET status = '{st}' WHERE jobid = {jobid};")
    conn.commit()
    conn.close()


def job_get_info(
-    dbpath: str,
+    dbpath: str,
    jobid: int,
    type: str = "sqlite3",
) -> tuple:
@@ -106,40 +101,36 @@ def job_get_info(
    ret = cur.fetchone()
    conn.close()
    return ret


def job_set_time(
-    dbpath: str,
-    tcol: str,
-    jobid: int,
+    dbpath: str,
+    tcol: str,
+    jobid: int,
    type: str = "sqlite3",
) -> None:
    conn, cur = get_db_conn(dbpath, type)
    ts = str(datetime.now(timezone.utc))
-    cur.execute(
-        f"UPDATE queue SET {tcol} = '{ts}' WHERE jobid = {jobid};"
-    )
+    cur.execute(f"UPDATE queue SET {tcol} = '{ts}' WHERE jobid = {jobid};")
    conn.commit()
    conn.close()


def job_get_all(
-    dbpath: str,
+    dbpath: str,
    type: str = "sqlite3",
) -> list[tuple]:
    conn, cur = get_db_conn(dbpath, type)
-    cur.execute(
-        "SELECT jobid, payload, status FROM queue;"
-    )
+    cur.execute("SELECT jobid, payload, status FROM queue;")
    ret = cur.fetchall()
    conn.close()
    return ret


def pipeline_reset_job(
-    dbpath: str,
-    pip: str,
-    ready: bool = False,
+    dbpath: str,
+    pip: str,
+    ready: bool = False,
    type: str = "sqlite3",
) -> None:
    conn, cur = get_db_conn(dbpath, type)
@@ -173,9 +164,7 @@ def pipeline_get_running(
    type: str = "sqlite3",
) -> list[tuple]:
    conn, cur = get_db_conn(dbpath, type)
-    cur.execute(
-        "SELECT pipeline, jobid, pid FROM state WHERE pid IS NOT NULL;"
-    )
+    cur.execute("SELECT pipeline, jobid, pid FROM state WHERE pid IS NOT NULL;")
    ret = cur.fetchall()
    conn.close()
    return ret
@@ -186,9 +175,7 @@ def pipeline_get_all(
    type: str = "sqlite3",
) -> list[tuple]:
    conn, cur = get_db_conn(dbpath, type)
-    cur.execute(
-        "SELECT pipeline FROM state;"
-    )
+    cur.execute("SELECT pipeline FROM state;")
    ret = [i[0] for i in cur.fetchall()]
    conn.close()
    return ret
@@ -215,9 +202,7 @@ def pipeline_remove(
) -> None:
    conn, cur = get_db_conn(dbpath, type)
    log.warning(f"deleting pipeline '{pip}' from 'state'")
-    cur.execute(
-        "DELETE FROM state WHERE pipeline='{pip}';"
-    )
+    cur.execute(f"DELETE FROM state WHERE pipeline='{pip}';")
    conn.commit()
    conn.close()

@@ -230,9 +215,8 @@ def pipeline_insert(
    conn, cur = get_db_conn(dbpath, type)
    log.info(f"creating pipeline '{pip}' in 'state'")
    cur.execute(
-        "INSERT INTO state (pipeline, sampleid, jobid, ready)"
-        "VALUES (?, ?, ?, ?);",
-        (pip, None, None, 0)
+        "INSERT INTO state (pipeline, sampleid, jobid, ready)" "VALUES (?, ?, ?, ?);",
+        (pip, None, None, 0),
    )
    conn.commit()
    conn.close()
@@ -260,12 +244,12 @@ def pipeline_eject_sample(
) -> None:
    conn, cur = get_db_conn(dbpath, type)
    cur.execute(
-        f"UPDATE state SET sampleid = NULL, ready = 0 "
-        f"WHERE pipeline = '{pip}';"
+        f"UPDATE state SET sampleid = NULL, ready = 0 " f"WHERE pipeline = '{pip}';"
    )
    conn.commit()
    conn.close()

+
def queue_payload(
    dbpath: str,
    pstr: str,
@@ -274,9 +258,8 @@
    conn, cur = get_db_conn(dbpath, type)
    log.info("inserting a new job into 'queue'")
    cur.execute(
-        "INSERT INTO queue (payload, status, submitted_at)"
-        "VALUES (?, ?, ?);",
-        (pstr, 'q', str(datetime.now(timezone.utc)))
+        "INSERT INTO queue (payload, status, submitted_at)" "VALUES (?, ?, ?);",
+        (pstr, "q", str(datetime.now(timezone.utc))),
    )
    conn.commit()
-    conn.close()
+    conn.close()
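
Taken together, the queue helpers above compose roughly as follows. A minimal sketch against a throwaway database file — the path and payload are placeholders, and the names are assumed to be re-exported via tomato.dbhandler's star import:

import json

from tomato.dbhandler import job_get_all, queue_payload, queue_setup

dbpath = "/tmp/tomato-queue.db"  # illustrative path
queue_setup(dbpath, type="sqlite3")  # creates the 'queue' table if missing
queue_payload(dbpath, json.dumps({"method": {}, "sample": {}}))
for jobid, payload, status in job_get_all(dbpath):
    print(jobid, status)  # freshly queued jobs carry status 'q'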
2 changes: 1 addition & 1 deletion src/tomato/drivers/__init__.py
@@ -1,2 +1,2 @@
from . import biologic
-from .driver_funcs import driver_api, driver_worker
+from .driver_funcs import driver_api, driver_worker
2 changes: 1 addition & 1 deletion src/tomato/drivers/biologic/__init__.py
@@ -4,4 +4,4 @@
This driver is a wrapper around BioLogic's `kbio` package.
"""
-from .main import get_status, get_data, start_job, stop_job
+from .main import get_status, get_data, start_job, stop_job
