Skip to content

Commit

Permalink
ketchup: json string switch and unready function (#80)
Browse files Browse the repository at this point in the history
* Ketchup: add switch -J for receiving/sending json strings in submit and status

* black formatting

* Bugfix status of specific job

zip(*running) would fail if no cells were running, now only does this if running is not empty. Also delete extraneous print statements.

* Add 'unready' function to ketchup

* Add ketchup unready to ketchup function list

* Fix typo

* Bugfix, simplify ketchup ready unready

* PR #80 review changes
Refactor ketchup status
-J argument only on status and submit functions

* Test on 0.2.x branch

* ketchup status x print as yaml

* Remove memory filled warning from driver_funcs

Memory filled/size is not guaranteed by all drivers, should be made driver-specific

* Attempt to read status directly from get_data, if not fall back to get_status

* Bugfix checking if data["current"]["status"] exists

* Bugfixfix last bugfix still didn't work if data is None

* Bugfixfixfix just use a try except block

Somehow data was NoneType but did not short circuit the previous 'if data and ....' statement

* Bugfix example-counter early job termination

Change get_status and get_data order to avoid race conditions when polling rate is the same as test duration

---------

Co-authored-by: Peter Kraus <[email protected]>
  • Loading branch information
g-kimbell and PeterKraus authored Jun 6, 2024
1 parent 9f92a16 commit fb93703
Show file tree
Hide file tree
Showing 6 changed files with 182 additions and 106 deletions.
6 changes: 3 additions & 3 deletions .github/workflows/pull-request-commit.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ name: pull-request-commit
on:
pull_request:
branches:
- master
- 0.2.x
jobs:
build:
strategy:
Expand All @@ -20,7 +20,7 @@ jobs:
- uses: ./.github/workflows/before-job
- uses: ./.github/workflows/build-job
- uses: actions/upload-artifact@v2
with:
with:
name: dist-${{ matrix.os }}-${{ matrix.pyver }}
path: dist
test:
Expand Down Expand Up @@ -72,6 +72,6 @@ jobs:
subprocess.run(["pip", "install", f"{fn}[docs]"])
- uses: ./.github/workflows/docs-job
- uses: actions/upload-artifact@v2
with:
with:
name: public-${{ matrix.os }}-${{ matrix.pyver }}
path: public
9 changes: 9 additions & 0 deletions docs/source/usage.rst
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,15 @@ To manage *samples* in the *pipelines*, use the following :mod:`~tomato.ketchup`
has been implemented to allow the user to investigate the *sample* and/or *pipeline*
for any faults.

#. **To mark** a *pipeline* as **not ready**, run:

.. code-block:: bash
>>> ketchup unready <pipeline>
This command will mark the *pipeline* as not ready, any *jobs* that are submitted
will not be run automatically.

.. note::

Further information about :mod:`~tomato.ketchup` is available in the documentation
Expand Down
59 changes: 23 additions & 36 deletions src/tomato/drivers/driver_funcs.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,30 +143,30 @@ def data_poller(
stops_required = kwargs.get("stops_required", 2)
stops_received = 0
log.debug(f"in 'data_poller', {pollrate=}")
_, _, metadata = driver_api(
driver, "get_status", jq, log, address, channel, **kwargs
)
mem_size = metadata["mem_size"]
done = False
finished_polling = False
previous = None
while not done:
while True:
data = {}
while not finished_polling:
try: # try to get status from last get_data, otherwise use get_status
status = data["current"]["status"]
if status in ["RUN", "PAUSE"]:
stop = False
else:
stop = True
if status != "STOP":
log.info(
f"device '{device}' status '{status}' not understood, expected 'RUN' 'PAUSE' or 'STOP'"
)
except (TypeError, KeyError): # data["current"]["status"] doesn't exist
ts, stop, _ = driver_api(
driver, "get_status", jq, log, address, channel, **kwargs
)
while True: # get data until no more data is available
ts, nrows, data = driver_api(
driver, "get_data", jq, log, address, channel, **kwargs
)
data["previous"] = previous
previous = data["current"]
mem_filled = data["current"]["mem_filled"]
f_mem_filled = mem_filled / mem_size
if f_mem_filled > 0.995:
log.critical(
f"{device} {address}:{channel} memory is full, data is being lost"
)
elif f_mem_filled > 0.8:
log.warning(
f"{device} {address}:{channel} memory is {f_mem_filled*100:.1f}% full"
)

if nrows > 0:
isots = datetime.fromtimestamp(ts, tz=timezone.utc).isoformat()
isots = isots.replace(":", "")
Expand All @@ -175,26 +175,13 @@ def data_poller(
with open(fn, "w") as of:
json.dump(data, of)
else:
status = data["current"]["status"]
if status == "STOP":
stops_received += 1
log.debug(
f"{address}:{channel} has given {stops_received} 'STOP' statuses in a row"
)
elif status in ["RUN", "PAUSE"]:
stops_received = 0
else:
stops_received += 1
log.critical(
f"get_data status not understood: '{status}', counting as 'STOP' status {stops_received}"
)
break
if stops_received >= stops_required:
done = True
log.info(
f"device '{device}' has stopped polling after {stops_required} consecutive 'STOP' statuses"
)
if stop:
stops_received += 1
if stops_received >= stops_required:
finished_polling = True
else:
stops_received = 0
time.sleep(pollrate)
log.info(f"rejoining main thread")
return
Expand Down
2 changes: 2 additions & 0 deletions src/tomato/ketchup/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
- :func:`.load` to load a *sample* into a *pipeline*
- :func:`.eject` to remove any *sample* present in a *pipeline*
- :func:`.ready` to mark a *pipeline* as ready
- :func:`.unready` to mark a *pipeline* as not ready
"""
Expand All @@ -29,6 +30,7 @@
load,
eject,
ready,
unready,
snapshot,
search,
cancel_all,
Expand Down
Loading

0 comments on commit fb93703

Please sign in to comment.