From 9804919365dd579df4441672596196badd2bef25 Mon Sep 17 00:00:00 2001 From: Peter Kraus Date: Mon, 13 Jun 2022 10:25:57 +0200 Subject: [PATCH] Get `biologic` working in parallel (#28) * wrap in try-except * portalocker * get biologic working with locks * Add installation into docs. --- docs/source/index.rst | 22 ++++++ setup.py | 1 + src/tomato/drivers/biologic/main.py | 108 +++++++++++++++++----------- src/tomato/drivers/driver_funcs.py | 2 + 4 files changed, 92 insertions(+), 41 deletions(-) diff --git a/docs/source/index.rst b/docs/source/index.rst index f215494..dc0542a 100644 --- a/docs/source/index.rst +++ b/docs/source/index.rst @@ -13,6 +13,28 @@ Currently supported hardware is: - a Dummy device for testing - BioLogic potentiostats via the EC-Lib library +Installation +------------ +Pre-built wheels of **tomato** are available on `PyPI `_ +and can be installed using: + +.. code:: + + pip install tomato + +.. note:: + + We strongly recommend installing **tomato** into a separate conda environment. + Additionally, **tomato** depends on ``portalocker``, which can be installed from + conda's defaults channel. You can easily create a new conda environment and install + the required packages using: + + .. code:: + + conda create -n tomato python=3.9 portalocker git pip + pip install tomato + + Usage ----- diff --git a/setup.py b/setup.py index 06eae7d..64bfd1c 100644 --- a/setup.py +++ b/setup.py @@ -37,6 +37,7 @@ "psutil", "yadg>=4.1", "dgbowl_schemas>=105", + "portalocker", ], extras_require={ "testing": [ diff --git a/src/tomato/drivers/biologic/main.py b/src/tomato/drivers/biologic/main.py index 5b11842..e34b3c4 100644 --- a/src/tomato/drivers/biologic/main.py +++ b/src/tomato/drivers/biologic/main.py @@ -1,5 +1,7 @@ import logging import multiprocessing +import time +import portalocker from datetime import datetime, timezone @@ -17,6 +19,7 @@ def get_status( jobqueue: multiprocessing.Queue, logger: logging.Logger, dllpath: str = None, + lockpath: str = None, **kwargs: dict, ) -> tuple[float, dict]: """ @@ -43,22 +46,30 @@ def get_status( api = get_kbio_api(dllpath) metadata = {} metadata["dll_version"] = api.GetLibVersion() - logger.debug(f"connecting to '{address}:{channel}'") - id_, device_info = api.Connect(address) + with portalocker.Lock(lockpath, timeout=60) as fh: + try: + logger.info(f"connecting to '{address}:{channel}'") + id_, device_info = api.Connect(address) + logger.info(f"getting status of '{address}:{channel}'") + channel_info = api.GetChannelInfo(id_, channel) + logger.info(f"disconnecting from '{address}:{channel}'") + api.Disconnect(id_) + except Exception as e: + logger.critical(f"{e=}") metadata["device_model"] = device_info.model metadata["device_channels"] = device_info.NumberOfChannels - channel_info = api.GetChannelInfo(id_, channel) - dt = datetime.now(timezone.utc) metadata["channel_state"] = channel_info.state metadata["channel_board"] = channel_info.board metadata["channel_amp"] = channel_info.amplifier if channel_info.NbAmps else None metadata["channel_I_ranges"] = [channel_info.min_IRange, channel_info.max_IRange] - logger.debug(f"disconnecting from '{address}:{channel}'") - api.Disconnect(id_) - if metadata["channel_state"] in ["STOP"]: + if metadata["channel_state"] in {"STOP"}: ready = True - else: + elif metadata["channel_state"] in {"RUN"}: ready = False + else: + logger.critical("channel state not understood: '%s'", metadata["channel_state"]) + raise ValueError("channel state not understood") + dt = datetime.now(timezone.utc) return dt.timestamp(), ready, metadata @@ -68,6 +79,7 @@ def get_data( jobqueue: multiprocessing.Queue, logger: logging.Logger, dllpath: str = None, + lockpath: str = None, **kwargs: dict, ) -> tuple[float, dict]: """ @@ -91,13 +103,17 @@ def get_data( """ api = get_kbio_api(dllpath) - logger.debug(f"connecting to '{address}:{channel}'") - id_, device_info = api.Connect(address) - logger.debug(f"getting data") - data = api.GetData(id_, channel) + with portalocker.Lock(lockpath, timeout=60) as fh: + try: + logger.info(f"connecting to '{address}:{channel}'") + id_, device_info = api.Connect(address) + logger.info(f"getting data from '{address}:{channel}'") + data = api.GetData(id_, channel) + logger.info(f"disconnecting from '{address}:{channel}'") + api.Disconnect(id_) + except Exception as e: + logger.critical(f"{e=}") dt = datetime.now(timezone.utc) - logger.debug(f"disconnecting from '{address}:{channel}'") - api.Disconnect(id_) data = parse_raw_data(api, data, device_info.model) return dt.timestamp(), data["technique"]["data_rows"], data @@ -109,6 +125,7 @@ def start_job( logger: logging.Logger, payload: list[dict], dllpath: str = None, + lockpath: str = None, capacity: float = 0.0, **kwargs: dict, ) -> float: @@ -149,27 +166,31 @@ def start_job( logger.debug("translating payload to ECC") eccpars = payload_to_ecc(api, payload, capacity) ntechs = len(eccpars) - first = True - last = False - ti = 1 - logger.debug(f"connecting to '{address}:{channel}'") - id_, device_info = api.Connect(address) - for techname, pars in eccpars: - if ti == ntechs: - last = True - techfile = get_kbio_techpath(dllpath, techname, device_info.model) - logger.debug(f"loading technique {ti}: '{techname}'") - api.LoadTechnique( - id_, channel, techfile, pars, first=first, last=last, display=False - ) - ti += 1 - first = False - logger.debug(f"starting run on '{address}:{channel}'") - api.StartChannel(id_, channel) + with portalocker.Lock(lockpath, timeout=60) as fh: + try: + first = True + last = False + ti = 1 + logger.info(f"connecting to '{address}:{channel}'") + id_, device_info = api.Connect(address) + for techname, pars in eccpars: + if ti == ntechs: + last = True + techfile = get_kbio_techpath(dllpath, techname, device_info.model) + logger.info(f"loading technique {ti}: '{techname}'") + api.LoadTechnique( + id_, channel, techfile, pars, first=first, last=last, display=False + ) + ti += 1 + first = False + logger.info(f"starting run on '{address}:{channel}'") + api.StartChannel(id_, channel) + logger.info(f"disconnecting from '{address}:{channel}'") + api.Disconnect(id_) + except Exception as e: + logger.critical(f"{e=}") dt = datetime.now(timezone.utc) logger.info(f"run started at '{dt}'") - logger.debug(f"disconnecting from '{address}:{channel}'") - api.Disconnect(id_) return dt.timestamp() @@ -178,7 +199,8 @@ def stop_job( channel: int, jobqueue: multiprocessing.Queue, logger: multiprocessing.Queue, - dllpath: str, + dllpath: str = None, + lockpath: str = None, **kwargs: dict, ) -> float: """ @@ -205,12 +227,16 @@ def stop_job( """ api = get_kbio_api(dllpath) - logger.debug(f"connecting to '{address}:{channel}'") - id_, device_info = api.Connect(address) - logger.debug(f"stopping run on '{address}:{channel}'") - api.StopChannel(id_, channel) - dt = datetime.now(timezone.utc) - logger.info(f"run stopped at '{dt}'") - api.Disconnect(id_) + with portalocker.Lock(lockpath, timeout=60) as fh: + try: + logger.info(f"connecting to '{address}:{channel}'") + id_, device_info = api.Connect(address) + logger.info(f"stopping run on '{address}:{channel}'") + api.StopChannel(id_, channel) + logger.info(f"run stopped at '{dt}'") + api.Disconnect(id_) + except Exception as e: + logger.critical(f"{e=}") jobqueue.close() + dt = datetime.now(timezone.utc) return dt.timestamp() diff --git a/src/tomato/drivers/driver_funcs.py b/src/tomato/drivers/driver_funcs.py index cdf6fd1..f42687e 100644 --- a/src/tomato/drivers/driver_funcs.py +++ b/src/tomato/drivers/driver_funcs.py @@ -208,6 +208,7 @@ def driver_worker( log.debug(f"{vi+1}: getting status") ts, ready, metadata = driver_api(drv, "get_status", jq, log, addr, ch, **dpar) + log.debug(f"{ready=}") assert ready, f"Failed: device '{tag}' is not ready." log.debug(f"{vi+1}: starting payload") @@ -237,6 +238,7 @@ def driver_worker( ret = None for p in jobs: p.join() + log.debug(f"{p=}") if p.exitcode == 0: log.info(f"'data_poller' with pid {p.pid} closed successfully") else: