Skip to content

Commit

Permalink
Export functionality. (#21)
Browse files Browse the repository at this point in the history
* Link up with yadg.

* black

* Remove debug print from test

* Ask for a release of yadg.

* Add externaldate exporting.

* Add func & test for path
  • Loading branch information
Peter Kraus authored May 11, 2022
1 parent e25015d commit 3790280
Show file tree
Hide file tree
Showing 13 changed files with 158 additions and 74 deletions.
5 changes: 4 additions & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@
"Bug Tracker": "https://github.com/dgbowl/tomato/issues",
},
classifiers=[
"Programming Language :: Python :: 3",
"Programming Language :: Python :: 3.9",
"Programming Language :: Python :: 3.10",
"License :: OSI Approved :: GNU General Public License v3 or later (GPLv3+)",
"Operating System :: OS Independent",
],
Expand All @@ -34,6 +35,8 @@
"toml",
"pyyaml",
"psutil",
#"yadg>=4.1.0rc5"
"yadg @ git+https://github.com/dgbowl/yadg.git@master#egg=yadg"
],
extras_require={
"testing": [
Expand Down
12 changes: 4 additions & 8 deletions src/tomato/daemon/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,9 @@
from .. import dbhandler


def _find_matching_pipelines(pipelines: list, method: dict) -> list[str]:
req_names = set(method.keys())
req_capabs = []
for k in req_names:
for s in method[k]:
req_capabs.append(s["name"])
req_capabs = set(req_capabs)
def _find_matching_pipelines(pipelines: list, method: list[dict]) -> list[str]:
req_names = set([item["device"] for item in method])
req_capabs = set([item["technique"] for item in method])

candidates = []
for cd in pipelines:
Expand Down Expand Up @@ -94,7 +90,7 @@ def main_loop(settings: dict, pipelines: dict) -> None:
with open(jpath, "w") as of:
json.dump(args, of, indent=1)
cfs = subprocess.CREATE_NO_WINDOW
cfs |= subprocess.CREATE_NEW_PROCESS_GROUP
# cfs |= subprocess.CREATE_NEW_PROCESS_GROUP
subprocess.Popen(
["tomato_job", str(jpath)],
creationflags=cfs,
Expand Down
30 changes: 15 additions & 15 deletions src/tomato/drivers/biologic/kbio_wrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,10 +103,10 @@ def vlimit(cond: str, vals: list[float], Is: list[float] = None) -> list[float]:


def translate(technique: dict, capacity: float) -> dict:
if technique["name"].startswith("constant_"):
if technique["technique"].startswith("constant_"):
ns = get_num_steps(technique)
tech = {
"name": technique["name"],
"technique": technique["technique"],
"Step_number": ns - 1,
"N_Cycles": technique.get("n_cycles", 0),
"Record_every_dT": technique.get("record_every_dt", 30.0),
Expand All @@ -123,11 +123,11 @@ def translate(technique: dict, capacity: float) -> dict:
"Exit_Cond": pad_steps(2 * int(technique.get("exit_on_limit", False)), ns),
}
ci = 1
if technique["name"].endswith("current"):
if technique["technique"].endswith("current"):
I = current(technique["current"], capacity)
tech["Current_step"] = pad_steps(I, ns)
tech["Record_every_dE"] = technique.get("record_every_dE", 0.005)
elif technique["name"].endswith("voltage"):
elif technique["technique"].endswith("voltage"):
tech["Voltage_step"] = pad_steps(technique["voltage"], ns)
tech["Record_every_dI"] = technique.get("record_every_dI", 0.001)
for prop in {"voltage", "current"}:
Expand All @@ -143,16 +143,16 @@ def translate(technique: dict, capacity: float) -> dict:
vals = vlimit(cond, padded, tech.get("Current_step"))
tech[f"Test{ci}_Value"] = vals
ci += 1
elif technique["name"] == "loop":
elif technique["technique"] == "loop":
tech = {
"name": "loop",
"technique": "loop",
"loop_N_times": technique.get("n_gotos", -1),
"protocol_number": technique.get("goto", 0),
}
elif technique["name"].startswith("sweep_"):
elif technique["technique"].startswith("sweep_"):
ns = get_num_steps(technique)
tech = {
"name": technique["name"],
"technique": technique["technique"],
"Scan_number": ns - 1,
"N_Cycles": technique.get("n_cycles", 0),
"I_Range": I_ranges[technique.get("I_range", "keep")],
Expand All @@ -176,22 +176,22 @@ def translate(technique: dict, capacity: float) -> dict:
tech[f"Test{ci}_Config"] = pad_steps(conf, ns)
tech[f"Test{ci}_Value"] = pad_steps(val, ns)
ci += 1
if technique["name"].endswith("current"):
if technique["technique"].endswith("current"):
I = current(technique["current"], capacity)
tech["Current_step"] = pad_steps(I, ns)
tech["Begin_measuring_E"] = technique.get("scan_start", 0.0)
tech["End_measuring_E"] = technique.get("scan_end", 1.0)
tech["Record_every_dI"] = technique.get("record_every_dI", 0.001)
elif technique["name"].endswith("voltage"):
elif technique["technique"].endswith("voltage"):
tech["Voltage_step"] = pad_steps(technique["voltage"], ns)
tech["Begin_measuring_I"] = technique.get("scan_start", 0.0)
tech["End_measuring_I"] = technique.get("scan_end", 1.0)
tech["Record_every_dE"] = technique.get("record_every_dE", 0.005)
else:
if technique["name"] != "open_circuit_voltage":
log.error(f"technique name '{technique['name']}' not understood.")
if technique["technique"] != "open_circuit_voltage":
log.error(f"technique name '{technique['technique']}' not understood.")
tech = {
"name": "open_circuit_voltage",
"technique": "open_circuit_voltage",
"Rest_time_T": technique.get("time", 0.0),
"Record_every_dT": technique.get("record_every_dt", 30.0),
"Record_every_dE": technique.get("record_every_dE", 0.005),
Expand All @@ -204,7 +204,7 @@ def translate(technique: dict, capacity: float) -> dict:
def dsl_to_ecc(api, dsl: dict) -> EccParams:
eccs = []
for k, val in dsl.items():
if k == "name":
if k == "technique":
continue
elif isinstance(val, list):
for i, v in zip(range(len(val)), val):
Expand All @@ -222,7 +222,7 @@ def payload_to_ecc(api, payload: list[dict], capacity: float) -> list[dict]:
for technique in payload:
dsl = translate(technique, capacity)
eccpars = dsl_to_ecc(api, dsl)
eccs.append((dsl["name"], eccpars))
eccs.append((dsl["technique"], eccpars))
return eccs


Expand Down
74 changes: 42 additions & 32 deletions src/tomato/drivers/driver_funcs.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,17 @@
import argparse
import time
import multiprocessing
import subprocess
import os
import json
from datetime import datetime, timezone
import logging

from .logger_funcs import log_listener_config, log_listener, log_worker_config
from .yadg_funcs import get_yadg_preset
from .. import dbhandler


def tomato_job() -> None:
parser = argparse.ArgumentParser()
parser.add_argument(
Expand All @@ -26,6 +29,7 @@ def tomato_job() -> None:
)
args = parser.parse_args()

jobfolder, _ = os.path.split(os.path.abspath(args.jobfile))
logfile = args.jobfile.replace(".json", ".log")

logging.basicConfig(
Expand All @@ -40,20 +44,20 @@ def tomato_job() -> None:
jsdata = json.load(infile)

logger.debug("parsing data from jobfile")
jobid = jsdata["jobid"]
settings = jsdata["settings"]
queue = settings["queue"]
state = settings["state"]
payload = jsdata["payload"]
tomato = payload.get("tomato", {})
pipeline = jsdata["pipeline"]
pip = pipeline["name"]
jobid = jsdata["jobid"]
queue = settings["queue"]
state = settings["state"]


verbosity = payload.get("tomato", {}).get("verbosity", "INFO")

verbosity = tomato.get("verbosity", "INFO")
loglevel = logging._checkLevel(verbosity)
logger.debug("setting logger verbosity to '%s'", verbosity)
logger.setLevel(loglevel)

pid = os.getpid()

logger.debug(f"assigning job '{jobid}' on pid '{pid}' into pipeline '{pip}'")
Expand All @@ -66,7 +70,28 @@ def tomato_job() -> None:
ret = driver_worker(settings, pipeline, payload, jobid, logfile, loglevel)

logger.info("==============================")
ready = payload.get("tomato", {}).get("unlock_when_done", False)

output = tomato.get("output", {})
prefix = output.get("prefix", f"results.{jobid}")
path = output.get("path", ".")
if os.path.exists(path):
assert os.path.isdir(path)
else:
os.makedirs(path)
dgfile = os.path.join(path, f"{prefix}.json")
logging.debug("creating a preset file '%s'", f"preset.{jobid}.json")
preset = get_yadg_preset(payload["method"], pipeline)
with open(f"preset.{jobid}.json", "w") as of:
json.dump(preset, of)

logging.info("running yadg to create a datagram in '%s'", dgfile)
command = ["yadg", "preset", "-pa", f"preset.{jobid}.json", jobfolder, dgfile]
logging.debug(" ".join(command))
subprocess.run(command, check=True)
logging.debug("removing the preset file '%s'", f"preset.{jobid}.json")
os.unlink(f"preset.{jobid}.json")

ready = tomato.get("unlock_when_done", False)
if ret is None:
logger.info("job finished successfully, setting status to 'c'")
dbhandler.job_set_status(queue["path"], "c", jobid, type=queue["type"])
Expand All @@ -84,7 +109,6 @@ def tomato_job() -> None:
dbhandler.job_set_time(queue["path"], "completed_at", jobid, type=queue["type"])



def driver_api(
driver: str,
command: str,
Expand Down Expand Up @@ -113,21 +137,13 @@ def data_poller(
log_worker_config(lq, loglevel)
log = logging.getLogger()
pollrate = kwargs.pop("pollrate", 10)
verbose = bool(kwargs.pop("verbose", 0))
log.debug(f"in 'data_poller', {pollrate=}")
cont = True
previous = None
while cont:
ts, done, metadata = driver_api(
ts, done, _ = driver_api(
driver, "get_status", jq, log, address, channel, **kwargs
)
if verbose:
isots = datetime.fromtimestamp(ts, tz=timezone.utc).isoformat()
isots = isots.replace(":", "")
fn = os.path.join(root, f"{device}_{isots}_status.json")
log.debug(f"'writing status info into '{fn}'")
with open(fn, "w") as of:
json.dump(metadata, of)
ts, nrows, data = driver_api(
driver, "get_data", jq, log, address, channel, **kwargs
)
Expand All @@ -154,10 +170,10 @@ def data_poller(


def driver_worker(
settings: dict,
pipeline: dict,
payload: dict,
jobid: int,
settings: dict,
pipeline: dict,
payload: dict,
jobid: int,
logfile: str,
loglevel: int,
) -> None:
Expand All @@ -183,7 +199,7 @@ def driver_worker(
log.info(f"{vi+1}: processing device '{v['tag']}' of type '{v['driver']}'")
drv, addr, ch, tag = v["driver"], v["address"], v["channel"], v["tag"]
dpar = settings["drivers"].get(drv, {})
pl = payload["method"][tag]
pl = [item for item in payload["method"] if item["device"] == v["tag"]]
smpl = payload["sample"]

log.debug(f"{vi+1}: getting status")
Expand All @@ -196,18 +212,12 @@ def driver_worker(
)
metadata["uts"] = start_ts

log.debug(f"{vi+1}: writing metadata")
isots = datetime.fromtimestamp(ts, tz=timezone.utc).isoformat().replace(":", "")
fn = os.path.join(root, f"{tag}_{isots}_status.json")
log.debug(f"{vi+1}: writing initial status")
fn = os.path.join(root, f"{tag}_status.json")
with open(fn, "w") as of:
json.dump(metadata, of)
kwargs = dpar
kwargs.update(
{
"pollrate": v.get("pollrate", 10),
"verbose": v.get("verbose", 0),
}
)
kwargs.update({"pollrate": v.get("pollrate", 10)})
log.info(f"{vi+1}: starting 'data_poller': every {kwargs['pollrate']}s")
p = multiprocessing.Process(
name=f"data_poller_{jobid}_{tag}",
Expand Down
13 changes: 7 additions & 6 deletions src/tomato/drivers/dummy/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

def _dummy_process(
queue: multiprocessing.Queue,
name: str = "random",
tech: str = "random",
delay: int = 1,
t: int = 10,
) -> None:
Expand All @@ -21,7 +21,7 @@ def _dummy_process(
nd += 1
data = {
"time": te - ts,
"value": random.random() if name == "random" else nd,
"value": random.random() if tech == "random" else nd,
}
queue.put(data)
time.sleep(1e-3)
Expand Down Expand Up @@ -142,19 +142,20 @@ def start_job(
"""
dt = datetime.now(timezone.utc)
logger.info("in 'dummy.start_job'")
logger.debug(f"{payload=}")
for ip, p in enumerate(payload):
delay = p.get("delay", 1)
t = p.get("time", 10)
name = p["name"]
tech = p["technique"]
logger.debug(
f"starting 'dummy._dummy_process' {ip} with {name=}, {t=}, {delay=}."
f"starting 'dummy._dummy_process' {ip} with {tech=}, {t=}, {delay=}."
)
pr = multiprocessing.Process(
target=_dummy_process, args=(jobqueue, name, delay, t)
target=_dummy_process, args=(jobqueue, tech, delay, t)
)
pr.start()
# Delay before quitting so that processes get a chance to start
time.sleep(1)
time.sleep(1)
return dt.timestamp()


Expand Down
2 changes: 1 addition & 1 deletion src/tomato/drivers/logger_funcs.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ def log_listener(queue, configurer, path):
logger.handle(record)


def log_worker_config(queue, loglevel = logging.INFO):
def log_worker_config(queue, loglevel=logging.INFO):
h = logging.handlers.QueueHandler(queue)
root = logging.getLogger()
root.addHandler(h)
Expand Down
35 changes: 35 additions & 0 deletions src/tomato/drivers/yadg_funcs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
_device_to_parser = {
"dummy": "dummy",
"biologic": "electrochem",
}


def get_yadg_preset(method: list[dict], pipeline: dict) -> dict:
preset = {
"metadata": {
"version": "4.1.1",
"provenance": {"type": "tomato"},
"timezone": "localtime",
},
"steps": [],
}

devices = {item["tag"]: item["driver"] for item in pipeline["devices"]}
for dev in set([item["device"] for item in method]):
step = {
"tag": dev,
"parser": _device_to_parser[devices[dev]],
"input": {"folders": ["."], "prefix": dev, "suffix": "data.json"},
"parameters": {"filetype": "tomato.json"},
"externaldate": {
"using": {
"file": {
"type": "json",
"path": f"{dev}_status.json",
"match": "uts"
}
},
}
}
preset["steps"].append(step)
return preset
2 changes: 1 addition & 1 deletion src/tomato/setlib/functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ def _default_pipelines() -> dict[str, dict]:
"address": None,
"channels": [5, 10],
"driver": "dummy",
"capabilities": ["random"],
"capabilities": ["random", "sequential"],
"pollrate": 1,
}
],
Expand Down
Loading

0 comments on commit 3790280

Please sign in to comment.