Compatibility for tomato.json (#20)
* keep previous "current"

* Minor fix to tomatojson.

* Drop default pollrate for dummy.

* More verbosity in main_loop.

* More debug, less delay.

* Disable linux tests.
Peter Kraus authored May 10, 2022
1 parent efabae4 commit e25015d
Showing 10 changed files with 113 additions and 99 deletions.
8 changes: 4 additions & 4 deletions .github/workflows/pull-request-commit.yml
@@ -7,8 +7,8 @@ jobs:
   build:
     strategy:
       matrix:
-        pyver: ['3.9']
-        os: ['ubuntu-latest', 'windows-latest']
+        pyver: ['3.9', '3.10']
+        os: ['windows-latest']
     runs-on: ${{ matrix.os }}
     steps:
       - uses: actions/checkout@v2
@@ -27,8 +27,8 @@ jobs:
     needs: [build]
     strategy:
       matrix:
-        pyver: ['3.9']
-        os: ['ubuntu-latest', 'windows-latest']
+        pyver: ['3.9', '3.10']
+        os: ['windows-latest']
     runs-on: ${{ matrix.os }}
     steps:
       - uses: actions/checkout@v2
2 changes: 1 addition & 1 deletion setup.py
@@ -44,7 +44,7 @@
         "console_scripts": [
             "tomato=tomato:run_tomato",
             "ketchup=tomato:run_ketchup",
-            "tomato_job=tomato.daemon:tomato_job"
+            "tomato_job=tomato.drivers:tomato_job"
         ]
     },
 )
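Reviewer note: a `console_scripts` entry makes pip generate a `tomato_job` executable that imports the named module and calls the named function, so after this commit the wrapper resolves `tomato_job` from `tomato.drivers` instead of `tomato.daemon`. A minimal sketch of what the generated wrapper does (illustrative only, assuming `tomato` is installed; not the actual pip-generated file):

    # Equivalent of the pip-generated `tomato_job` console script (sketch).
    import sys

    from tomato.drivers import tomato_job

    if __name__ == "__main__":
        sys.exit(tomato_job())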
2 changes: 1 addition & 1 deletion src/tomato/daemon/__init__.py
@@ -1 +1 @@
-from .main import main_loop, tomato_job
+from .main import main_loop
87 changes: 5 additions & 82 deletions src/tomato/daemon/main.py
@@ -1,13 +1,11 @@
-from importlib import metadata
 import psutil
-import argparse
 import os
 import subprocess
 import time
 import json
 import logging

-from ..drivers import driver_worker, driver_reset
+from ..drivers import driver_worker, driver_reset, tomato_job
 from .. import dbhandler


@@ -47,71 +45,6 @@ def _pipeline_ready_sample(ret: tuple, sample: dict) -> bool:
     return False


-def tomato_job() -> None:
-    parser = argparse.ArgumentParser()
-    parser.add_argument(
-        "--version",
-        action="version",
-        version=f'%(prog)s version {metadata.version("tomato")}',
-    )
-    parser.add_argument(
-        "jobfile",
-        help="Path to a ketchup-processed payload json file.",
-        default=None,
-    )
-    args = parser.parse_args()
-
-    logfile = args.jobfile.replace(".json", ".log")
-
-    logging.basicConfig(
-        level=logging.DEBUG,
-        format="%(asctime)s:%(levelname)-8s:%(processName)s:%(message)s",
-        handlers=[logging.FileHandler(logfile, mode="a"), logging.StreamHandler()],
-    )
-    logger = logging.getLogger(__name__)
-
-    logger.info("attempting to load jobfile '%s'", args.jobfile)
-    with open(args.jobfile, "r") as infile:
-        jsdata = json.load(infile)
-
-    logger.debug("parsing data from jobfile")
-    settings = jsdata["settings"]
-    payload = jsdata["payload"]
-    pipeline = jsdata["pipeline"]
-    pip = pipeline["name"]
-    jobid = jsdata["jobid"]
-    queue = settings["queue"]
-    state = settings["state"]
-    pid = os.getpid()
-
-    logger.debug(f"assigning job '{jobid}' on pid '{pid}' into pipeline '{pip}'")
-    dbhandler.pipeline_assign_job(state["path"], pip, jobid, pid, type=state["type"])
-    dbhandler.job_set_status(queue["path"], "r", jobid, type=queue["type"])
-    dbhandler.job_set_time(queue["path"], "executed_at", jobid, type=queue["type"])
-
-    logger.info("handing off to 'driver_worker'")
-    logger.info("==============================")
-    ret = driver_worker(settings, pipeline, payload, jobid, logfile)
-
-    logger.info("==============================")
-    ready = payload.get("tomato", {}).get("unlock_when_done", False)
-    if ret is None:
-        logger.info("job finished successfully, setting status to 'c'")
-        dbhandler.job_set_status(queue["path"], "c", jobid, type=queue["type"])
-    else:
-        logger.info("job was terminated, setting status to 'cd'")
-        dbhandler.job_set_status(queue["path"], "cd", jobid, type=queue["type"])
-        logger.info("handing off to 'driver_reset'")
-        logger.info("==============================")
-        driver_reset(settings, pipeline)
-        logger.info("==============================")
-        ready = False
-
-    logger.debug(f"setting pipeline '{pip}' as '{'ready' if ready else 'not ready'}'")
-    dbhandler.pipeline_reset_job(state["path"], pip, ready, type=state["type"])
-    dbhandler.job_set_time(queue["path"], "completed_at", jobid, type=queue["type"])
-
-

 def main_loop(settings: dict, pipelines: dict) -> None:
     log = logging.getLogger(__name__)
     qup = settings["queue"]["path"]
@@ -138,7 +71,7 @@ def main_loop(settings: dict, pipelines: dict) -> None:
             payload = json.loads(strpl)
             if st in ["q", "qw"]:
                 if st == "q":
-                    log.debug(f"checking whether job '{jobid}' can ever be matched")
+                    log.info(f"checking whether job '{jobid}' can ever be matched")
                 matched_pips = _find_matching_pipelines(pipelines, payload["method"])
                 if len(matched_pips) > 0 and st != "qw":
                     dbhandler.job_set_status(qup, "qw", jobid, type=qut)
@@ -147,6 +80,7 @@ def main_loop(settings: dict, pipelines: dict) -> None:
                     pipinfo = dbhandler.pipeline_get_info(stp, pip["name"], type=stt)
                     can_queue = _pipeline_ready_sample(pipinfo, payload["sample"])
                     if can_queue:
+                        log.info(f"queueing job '{jobid}' on pipeline '{pip['name']}'")
                         dbhandler.pipeline_reset_job(stp, pip["name"], False, type=stt)
                         args = {
                             "settings": settings,
@@ -159,22 +93,11 @@ def main_loop(settings: dict, pipelines: dict) -> None:
                         jpath = os.path.join(root, "jobdata.json")
                         with open(jpath, "w") as of:
                             json.dump(args, of, indent=1)
-                        cfs = subprocess.CREATE_NEW_PROCESS_GROUP
-                        cfs |= subprocess.CREATE_NO_WINDOW
+                        cfs = subprocess.CREATE_NO_WINDOW
+                        cfs |= subprocess.CREATE_NEW_PROCESS_GROUP
                         subprocess.Popen(
                             ["tomato_job", str(jpath)],
                             creationflags=cfs,
                         )
                         break
         time.sleep(settings.get("main loop", 1))
-
-    # - if jobid->status == q:
-    #   find matching pipelines -> qw
-    # - if jobid->status == qw:
-    #   find matching pipelines
-    #   find matching samples
-    #   is pipeline ready -> r -> assign jobid and pid into pipeline state
-
-    # for pname, pvals in pipelines.items():
-    #     print(f'driver_worker(settings, pvals, None): with {pname}')
-    #     driver_worker(settings, pvals, None)
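Reviewer note: both `subprocess.CREATE_NO_WINDOW` and `subprocess.CREATE_NEW_PROCESS_GROUP` exist only on Windows, which is consistent with this commit disabling the Linux CI jobs. A hedged sketch of a cross-platform spawn, where the POSIX branch is an assumption and not part of this commit:

    import subprocess
    import sys

    def spawn_job(jpath: str) -> subprocess.Popen:
        # Windows: detach the job into a new process group with no console window.
        if sys.platform == "win32":
            cfs = subprocess.CREATE_NO_WINDOW | subprocess.CREATE_NEW_PROCESS_GROUP
            return subprocess.Popen(["tomato_job", jpath], creationflags=cfs)
        # POSIX: start_new_session gives comparable detachment (illustrative).
        return subprocess.Popen(["tomato_job", jpath], start_new_session=True)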
2 changes: 1 addition & 1 deletion src/tomato/drivers/__init__.py
@@ -1,2 +1,2 @@
 from . import biologic, dummy
-from .driver_funcs import driver_api, driver_worker, driver_reset
+from .driver_funcs import driver_api, driver_worker, driver_reset, tomato_job
95 changes: 91 additions & 4 deletions src/tomato/drivers/driver_funcs.py
@@ -1,5 +1,7 @@
-from typing import Any, Callable
+from typing import Any
+from importlib import metadata
 import importlib
+import argparse
 import time
 import multiprocessing
 import os
@@ -8,6 +10,79 @@
 import logging

 from .logger_funcs import log_listener_config, log_listener, log_worker_config
+from .. import dbhandler

+def tomato_job() -> None:
+    parser = argparse.ArgumentParser()
+    parser.add_argument(
+        "--version",
+        action="version",
+        version=f'%(prog)s version {metadata.version("tomato")}',
+    )
+    parser.add_argument(
+        "jobfile",
+        help="Path to a ketchup-processed payload json file.",
+        default=None,
+    )
+    args = parser.parse_args()
+
+    logfile = args.jobfile.replace(".json", ".log")
+
+    logging.basicConfig(
+        level=logging.DEBUG,
+        format="%(asctime)s:%(levelname)-8s:%(processName)s:%(message)s",
+        handlers=[logging.FileHandler(logfile, mode="a"), logging.StreamHandler()],
+    )
+    logger = logging.getLogger(__name__)
+
+    logger.info("attempting to load jobfile '%s'", args.jobfile)
+    with open(args.jobfile, "r") as infile:
+        jsdata = json.load(infile)
+
+    logger.debug("parsing data from jobfile")
+    settings = jsdata["settings"]
+    payload = jsdata["payload"]
+    pipeline = jsdata["pipeline"]
+    pip = pipeline["name"]
+    jobid = jsdata["jobid"]
+    queue = settings["queue"]
+    state = settings["state"]
+
+
+    verbosity = payload.get("tomato", {}).get("verbosity", "INFO")
+    loglevel = logging._checkLevel(verbosity)
+    logger.debug("setting logger verbosity to '%s'", verbosity)
+    logger.setLevel(loglevel)
+
+    pid = os.getpid()
+
+    logger.debug(f"assigning job '{jobid}' on pid '{pid}' into pipeline '{pip}'")
+    dbhandler.pipeline_assign_job(state["path"], pip, jobid, pid, type=state["type"])
+    dbhandler.job_set_status(queue["path"], "r", jobid, type=queue["type"])
+    dbhandler.job_set_time(queue["path"], "executed_at", jobid, type=queue["type"])
+
+    logger.info("handing off to 'driver_worker'")
+    logger.info("==============================")
+    ret = driver_worker(settings, pipeline, payload, jobid, logfile, loglevel)
+
+    logger.info("==============================")
+    ready = payload.get("tomato", {}).get("unlock_when_done", False)
+    if ret is None:
+        logger.info("job finished successfully, setting status to 'c'")
+        dbhandler.job_set_status(queue["path"], "c", jobid, type=queue["type"])
+    else:
+        logger.info("job was terminated, setting status to 'cd'")
+        dbhandler.job_set_status(queue["path"], "cd", jobid, type=queue["type"])
+        logger.info("handing off to 'driver_reset'")
+        logger.info("==============================")
+        driver_reset(settings, pipeline)
+        logger.info("==============================")
+        ready = False
+
+    logger.debug(f"setting pipeline '{pip}' as '{'ready' if ready else 'not ready'}'")
+    dbhandler.pipeline_reset_job(state["path"], pip, ready, type=state["type"])
+    dbhandler.job_set_time(queue["path"], "completed_at", jobid, type=queue["type"])
+
+

 def driver_api(
@@ -32,14 +107,16 @@ def data_poller(
     channel: int,
     device: str,
     root: str,
+    loglevel: int,
     kwargs: dict,
 ) -> None:
-    log_worker_config(lq)
+    log_worker_config(lq, loglevel)
     log = logging.getLogger()
     pollrate = kwargs.pop("pollrate", 10)
     verbose = bool(kwargs.pop("verbose", 0))
     log.debug(f"in 'data_poller', {pollrate=}")
     cont = True
+    previous = None
     while cont:
         ts, done, metadata = driver_api(
             driver, "get_status", jq, log, address, channel, **kwargs
@@ -54,6 +131,8 @@
         ts, nrows, data = driver_api(
             driver, "get_data", jq, log, address, channel, **kwargs
         )
+        data["previous"] = previous
+        previous = data["current"]
         while nrows > 0:
             isots = datetime.fromtimestamp(ts, tz=timezone.utc).isoformat()
             isots = isots.replace(":", "")
@@ -64,6 +143,8 @@
             ts, nrows, data = driver_api(
                 driver, "get_data", jq, log, address, channel, **kwargs
             )
+            data["previous"] = previous
+            previous = data["current"]
         if done:
             cont = False
         else:
@@ -73,12 +154,18 @@


 def driver_worker(
-    settings: dict, pipeline: dict, payload: dict, jobid: int, logfile: str
+    settings: dict,
+    pipeline: dict,
+    payload: dict,
+    jobid: int,
+    logfile: str,
+    loglevel: int,
 ) -> None:

     jq = multiprocessing.Queue(maxsize=0)

     log = logging.getLogger(__name__)
+    log.setLevel(loglevel)
     log.debug("starting 'log_listener'")
     lq = multiprocessing.Queue(maxsize=0)
     listener = multiprocessing.Process(
@@ -125,7 +212,7 @@ def driver_worker(
         p = multiprocessing.Process(
             name=f"data_poller_{jobid}_{tag}",
             target=data_poller,
-            args=(drv, jq, lq, addr, ch, tag, root, kwargs),
+            args=(drv, jq, lq, addr, ch, tag, root, loglevel, kwargs),
         )
         jobs.append(p)
         p.start()
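Reviewer note: the relocated `tomato_job` maps the payload's `verbosity` string to a numeric level with `logging._checkLevel`, a private helper of the standard library that may change without notice. A public equivalent (a sketch of an alternative, not what this commit uses) is `logging.getLevelName`, which also resolves level names to numbers:

    import logging

    # Hypothetical stand-in for payload.get("tomato", {}).get("verbosity", "INFO").
    verbosity = "DEBUG"
    loglevel = logging.getLevelName(verbosity)  # "DEBUG" -> 10
    assert isinstance(loglevel, int), f"unknown level name: {verbosity}"
    logging.getLogger(__name__).setLevel(loglevel)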
9 changes: 5 additions & 4 deletions src/tomato/drivers/dummy/main.py
@@ -12,8 +12,7 @@ def _dummy_process(
     delay: int = 1,
     t: int = 10,
 ) -> None:
-    ts = time.perf_counter()
-    te = time.perf_counter()
+    ts = te = time.perf_counter()
     nd = 0
     while te - ts < t:
         if queue.empty():
@@ -25,7 +24,7 @@ def _dummy_process(
                 "value": random.random() if name == "random" else nd,
             }
             queue.put(data)
-        time.sleep(delay / 20)
+        time.sleep(1e-3)
         te = time.perf_counter()
     return

@@ -104,7 +103,7 @@ def get_data(
     if jobqueue.empty() and len(points) > 0:
         jobqueue.put(None)
     npoints = len(points)
-    data = {"data": points}
+    data = {"data": points, "current": None}
     return dt.timestamp(), npoints, data


@@ -154,6 +153,8 @@ def start_job(
         target=_dummy_process, args=(jobqueue, name, delay, t)
     )
     pr.start()
+    # Delay before quitting so that processes get a chance to start
+    time.sleep(1)
    return dt.timestamp()


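Reviewer note: the added one-second sleep in `start_job` is a fixed grace period so the spawned `_dummy_process` children can start before the caller returns. A less timing-dependent pattern (a sketch under assumed names, not what this commit does) lets the child signal readiness explicitly:

    import multiprocessing
    import time

    def _child(started) -> None:
        started.set()    # announce that the process is up
        time.sleep(0.1)  # stand-in for the real polling work

    if __name__ == "__main__":
        started = multiprocessing.Event()
        pr = multiprocessing.Process(target=_child, args=(started,))
        pr.start()
        # Block until the child confirms startup instead of sleeping a fixed 1 s.
        if not started.wait(timeout=5):
            raise RuntimeError("child process failed to start")
        pr.join()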
4 changes: 2 additions & 2 deletions src/tomato/drivers/logger_funcs.py
@@ -20,8 +20,8 @@ def log_listener(queue, configurer, path):
         logger.handle(record)


-def log_worker_config(queue):
+def log_worker_config(queue, loglevel = logging.INFO):
     h = logging.handlers.QueueHandler(queue)
     root = logging.getLogger()
     root.addHandler(h)
-    root.setLevel(logging.DEBUG)
+    root.setLevel(loglevel)
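Reviewer note: `log_worker_config` implements the standard queue-based multiprocessing logging pattern, and the new `loglevel` parameter lets worker processes honour the payload-controlled verbosity instead of always logging at DEBUG. A minimal self-contained sketch of the pattern (names here are illustrative, not the tomato API):

    import logging
    import logging.handlers
    import multiprocessing

    def worker(queue, loglevel: int) -> None:
        # Mirrors log_worker_config: route all records into the shared queue.
        root = logging.getLogger()
        root.addHandler(logging.handlers.QueueHandler(queue))
        root.setLevel(loglevel)
        root.debug("only forwarded when loglevel <= DEBUG")

    def listener(queue) -> None:
        # Drain records until a None sentinel arrives.
        logging.basicConfig(level=logging.DEBUG)
        while (record := queue.get()) is not None:
            logging.getLogger(record.name).handle(record)

    if __name__ == "__main__":
        q = multiprocessing.Queue()
        lp = multiprocessing.Process(target=listener, args=(q,))
        lp.start()
        wp = multiprocessing.Process(target=worker, args=(q, logging.DEBUG))
        wp.start()
        wp.join()
        q.put(None)  # stop the listener
        lp.join()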
1 change: 1 addition & 0 deletions src/tomato/setlib/functions.py
@@ -84,6 +84,7 @@ def _default_pipelines() -> dict[str, dict]:
             "channels": [5, 10],
             "driver": "dummy",
             "capabilities": ["random"],
+            "pollrate": 1,
         }
     ],
     "pipelines": [
2 changes: 2 additions & 0 deletions tests/common/dummy_random_2_0.1.yml
@@ -5,3 +5,5 @@ method:
   - name: "random"
     time: 2
     delay: 0.1
+tomato:
+  verbosity: "DEBUG"
