
Get biologic working in parallel (#28)
* wrap in try-except

* portalocker

* get biologic working with locks

* Add installation into docs.
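The gist of the change: every block of EC-Lib calls in the BioLogic driver is now guarded by a portalocker file lock and wrapped in try/except, so concurrent tomato processes cannot hit the shared DLL at the same time. A minimal sketch of that pattern (the guarded_status helper, its arguments, and the placeholder api object are illustrative only, not part of the codebase; the 60 s timeout mirrors the diff below):

    import logging
    import portalocker

    logger = logging.getLogger(__name__)

    def guarded_status(api, address: str, channel: int, lockpath: str):
        # Only one process may hold the lock file at a time; others block
        # for up to 60 seconds before portalocker raises a LockException.
        with portalocker.Lock(lockpath, timeout=60):
            try:
                id_, device_info = api.Connect(address)
                channel_info = api.GetChannelInfo(id_, channel)
                api.Disconnect(id_)
            except Exception as e:
                # Errors are logged rather than re-raised, so the lock is
                # always released and other drivers can proceed.
                logger.critical(f"{e=}")
                channel_info = None
        return channel_info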
Peter Kraus authored Jun 13, 2022
1 parent 682859c commit 9804919
Showing 4 changed files with 92 additions and 41 deletions.
22 changes: 22 additions & 0 deletions docs/source/index.rst
@@ -13,6 +13,28 @@ Currently supported hardware is:
 - a Dummy device for testing
 - BioLogic potentiostats via the EC-Lib library
 
+Installation
+------------
+Pre-built wheels of **tomato** are available on `PyPI <https://pypi.org/project/tomato/>`_
+and can be installed using:
+
+.. code::
+
+    pip install tomato
+
+.. note::
+
+    We strongly recommend installing **tomato** into a separate conda environment.
+    Additionally, **tomato** depends on ``portalocker``, which can be installed from
+    conda's defaults channel. You can easily create a new conda environment and install
+    the required packages using:
+
+    .. code::
+
+        conda create -n tomato python=3.9 portalocker git pip
+        pip install tomato
+
 Usage
 -----

1 change: 1 addition & 0 deletions setup.py
@@ -37,6 +37,7 @@
         "psutil",
         "yadg>=4.1",
         "dgbowl_schemas>=105",
+        "portalocker",
     ],
     extras_require={
         "testing": [
108 changes: 67 additions & 41 deletions src/tomato/drivers/biologic/main.py
@@ -1,5 +1,7 @@
 import logging
 import multiprocessing
+import time
+import portalocker
 
 from datetime import datetime, timezone

@@ -17,6 +19,7 @@ def get_status(
     jobqueue: multiprocessing.Queue,
     logger: logging.Logger,
     dllpath: str = None,
+    lockpath: str = None,
     **kwargs: dict,
 ) -> tuple[float, dict]:
     """
@@ -43,22 +46,30 @@ def get_status(
     api = get_kbio_api(dllpath)
     metadata = {}
     metadata["dll_version"] = api.GetLibVersion()
-    logger.debug(f"connecting to '{address}:{channel}'")
-    id_, device_info = api.Connect(address)
+    with portalocker.Lock(lockpath, timeout=60) as fh:
+        try:
+            logger.info(f"connecting to '{address}:{channel}'")
+            id_, device_info = api.Connect(address)
+            logger.info(f"getting status of '{address}:{channel}'")
+            channel_info = api.GetChannelInfo(id_, channel)
+            logger.info(f"disconnecting from '{address}:{channel}'")
+            api.Disconnect(id_)
+        except Exception as e:
+            logger.critical(f"{e=}")
     metadata["device_model"] = device_info.model
     metadata["device_channels"] = device_info.NumberOfChannels
-    channel_info = api.GetChannelInfo(id_, channel)
-    dt = datetime.now(timezone.utc)
     metadata["channel_state"] = channel_info.state
     metadata["channel_board"] = channel_info.board
     metadata["channel_amp"] = channel_info.amplifier if channel_info.NbAmps else None
     metadata["channel_I_ranges"] = [channel_info.min_IRange, channel_info.max_IRange]
-    logger.debug(f"disconnecting from '{address}:{channel}'")
-    api.Disconnect(id_)
-    if metadata["channel_state"] in ["STOP"]:
+    if metadata["channel_state"] in {"STOP"}:
         ready = True
-    else:
+    elif metadata["channel_state"] in {"RUN"}:
         ready = False
+    else:
+        logger.critical("channel state not understood: '%s'", metadata["channel_state"])
+        raise ValueError("channel state not understood")
+    dt = datetime.now(timezone.utc)
     return dt.timestamp(), ready, metadata


@@ -68,6 +79,7 @@ def get_data(
     jobqueue: multiprocessing.Queue,
     logger: logging.Logger,
     dllpath: str = None,
+    lockpath: str = None,
     **kwargs: dict,
 ) -> tuple[float, dict]:
     """
@@ -91,13 +103,17 @@
     """
     api = get_kbio_api(dllpath)
-    logger.debug(f"connecting to '{address}:{channel}'")
-    id_, device_info = api.Connect(address)
-    logger.debug(f"getting data")
-    data = api.GetData(id_, channel)
+    with portalocker.Lock(lockpath, timeout=60) as fh:
+        try:
+            logger.info(f"connecting to '{address}:{channel}'")
+            id_, device_info = api.Connect(address)
+            logger.info(f"getting data from '{address}:{channel}'")
+            data = api.GetData(id_, channel)
+            logger.info(f"disconnecting from '{address}:{channel}'")
+            api.Disconnect(id_)
+        except Exception as e:
+            logger.critical(f"{e=}")
     dt = datetime.now(timezone.utc)
-    logger.debug(f"disconnecting from '{address}:{channel}'")
-    api.Disconnect(id_)
     data = parse_raw_data(api, data, device_info.model)
     return dt.timestamp(), data["technique"]["data_rows"], data

@@ -109,6 +125,7 @@ def start_job(
     logger: logging.Logger,
     payload: list[dict],
     dllpath: str = None,
+    lockpath: str = None,
     capacity: float = 0.0,
     **kwargs: dict,
 ) -> float:
@@ -149,27 +166,31 @@ def start_job(
     logger.debug("translating payload to ECC")
     eccpars = payload_to_ecc(api, payload, capacity)
     ntechs = len(eccpars)
-    first = True
-    last = False
-    ti = 1
-    logger.debug(f"connecting to '{address}:{channel}'")
-    id_, device_info = api.Connect(address)
-    for techname, pars in eccpars:
-        if ti == ntechs:
-            last = True
-        techfile = get_kbio_techpath(dllpath, techname, device_info.model)
-        logger.debug(f"loading technique {ti}: '{techname}'")
-        api.LoadTechnique(
-            id_, channel, techfile, pars, first=first, last=last, display=False
-        )
-        ti += 1
-        first = False
-    logger.debug(f"starting run on '{address}:{channel}'")
-    api.StartChannel(id_, channel)
+    with portalocker.Lock(lockpath, timeout=60) as fh:
+        try:
+            first = True
+            last = False
+            ti = 1
+            logger.info(f"connecting to '{address}:{channel}'")
+            id_, device_info = api.Connect(address)
+            for techname, pars in eccpars:
+                if ti == ntechs:
+                    last = True
+                techfile = get_kbio_techpath(dllpath, techname, device_info.model)
+                logger.info(f"loading technique {ti}: '{techname}'")
+                api.LoadTechnique(
+                    id_, channel, techfile, pars, first=first, last=last, display=False
+                )
+                ti += 1
+                first = False
+            logger.info(f"starting run on '{address}:{channel}'")
+            api.StartChannel(id_, channel)
+            logger.info(f"disconnecting from '{address}:{channel}'")
+            api.Disconnect(id_)
+        except Exception as e:
+            logger.critical(f"{e=}")
     dt = datetime.now(timezone.utc)
     logger.info(f"run started at '{dt}'")
-    logger.debug(f"disconnecting from '{address}:{channel}'")
-    api.Disconnect(id_)
     return dt.timestamp()


@@ -178,7 +199,8 @@ def stop_job(
     channel: int,
     jobqueue: multiprocessing.Queue,
     logger: multiprocessing.Queue,
-    dllpath: str,
+    dllpath: str = None,
+    lockpath: str = None,
     **kwargs: dict,
 ) -> float:
     """
@@ -205,12 +227,16 @@
     """
     api = get_kbio_api(dllpath)
-    logger.debug(f"connecting to '{address}:{channel}'")
-    id_, device_info = api.Connect(address)
-    logger.debug(f"stopping run on '{address}:{channel}'")
-    api.StopChannel(id_, channel)
-    dt = datetime.now(timezone.utc)
-    logger.info(f"run stopped at '{dt}'")
-    api.Disconnect(id_)
+    with portalocker.Lock(lockpath, timeout=60) as fh:
+        try:
+            logger.info(f"connecting to '{address}:{channel}'")
+            id_, device_info = api.Connect(address)
+            logger.info(f"stopping run on '{address}:{channel}'")
+            api.StopChannel(id_, channel)
+            logger.info(f"run stopped at '{dt}'")
+            api.Disconnect(id_)
+        except Exception as e:
+            logger.critical(f"{e=}")
     jobqueue.close()
+    dt = datetime.now(timezone.utc)
     return dt.timestamp()
2 changes: 2 additions & 0 deletions src/tomato/drivers/driver_funcs.py
@@ -208,6 +208,7 @@ def driver_worker(
 
         log.debug(f"{vi+1}: getting status")
         ts, ready, metadata = driver_api(drv, "get_status", jq, log, addr, ch, **dpar)
+        log.debug(f"{ready=}")
         assert ready, f"Failed: device '{tag}' is not ready."
 
         log.debug(f"{vi+1}: starting payload")
@@ -237,6 +238,7 @@ def driver_worker(
     ret = None
     for p in jobs:
         p.join()
+        log.debug(f"{p=}")
         if p.exitcode == 0:
             log.info(f"'data_poller' with pid {p.pid} closed successfully")
         else:
