From ddf516d96d753f9670c2d6bb22aec2a7327e1316 Mon Sep 17 00:00:00 2001 From: falkecarlsen <11318702+falkecarlsen@users.noreply.github.com> Date: Mon, 12 Aug 2024 15:53:30 +0200 Subject: [PATCH 1/4] add state abstraction and wip on update lock for concurrency --- py_driver/driver.py | 96 +++++++++++++++++++++++++++++------------ py_driver/experiment.py | 6 +-- 2 files changed, 71 insertions(+), 31 deletions(-) diff --git a/py_driver/driver.py b/py_driver/driver.py index 4ea3297..89cc29a 100644 --- a/py_driver/driver.py +++ b/py_driver/driver.py @@ -1,7 +1,10 @@ import sys import threading -from datetime import timedelta +from dataclasses import dataclass, field +from datetime import timedelta, datetime from time import sleep, time +from typing import Optional + import serial import utils @@ -47,15 +50,24 @@ def print_blue(text): print(f"{ColorPrinting.OKBLUE}{text}{ColorPrinting.ENDC}") +@dataclass class ClaireState: """ The state of the Claire demonstrator. Can be used to cache the state. """ + Tube1_sonar_dist_mm: Optional[float] = None + Tube2_sonar_dist_mm: Optional[float] = None + Tube1_inflow_duty: Optional[int] = None + Tube1_outflow_duty: Optional[int] = None + Tube2_inflow_duty: Optional[int] = None + Tube2_outflow_duty: Optional[int] = None + Stream_inflow_duty: Optional[int] = None + Stream_outflow_duty: Optional[int] = None + dynamic: Optional[bool] = None + last_update: datetime = datetime.now() def __init__(self): - self.state = None - self.outdated = True - self.last_update = 0 + self.dynamic = None def set_state(self, state): """ @@ -63,13 +75,32 @@ def set_state(self, state): :param state: The new state to cache. """ - self.state = state - self.outdated = state["Tube1_inflow_duty"] or state["Tube1_outflow_duty"] or state["Tube2_inflow_duty"] or \ - state["Tube2_outflow_duty"] + try: + self.state = state + for key, value in state.items(): + if hasattr(self, key): + setattr(self, key, value) + self.last_update = datetime.now() + self.dynamic = state["Tube1_inflow_duty"] or state["Tube1_outflow_duty"] or state["Tube2_inflow_duty"] or \ + state["Tube2_outflow_duty"] + except Exception as e: + print(f"Exception occurred during state update: {e}") + raise # Re-raise the exception after handling + + def make_dynamic(self): + """ + Label the cached state as dynamic. + When the demonstrator is being acted upon, all state updates are outdated from when they are measured. + """ + self.dynamic = True - def make_outdated(self): - """Label the cached state as outdated.""" - self.outdated = True + @property + def tube1_dist(self) -> Optional[float]: + return self.Tube1_sonar_dist_mm + + @property + def tube2_dist(self) -> Optional[float]: + return self.Tube2_sonar_dist_mm class ClaireDevice: @@ -79,7 +110,7 @@ class ClaireDevice: def __init__(self, port): self.device = port - self.heartbeat = time() + self.heartbeat = time() # last time device was alive self.busy = True # initially unknown, therefore busy self.state = ClaireState() # read timeout in secs, 1 should be sufficient @@ -110,6 +141,10 @@ def __init__(self, port): sleep(1) self.check_version() + print(f'{TAG} Device initialized. Getting initial state...') + self.update_state() + + def alive(self): """Check if the device is still alive within bound.""" return time() - self.heartbeat < COMMUNICATION_TIMEOUT @@ -136,19 +171,19 @@ def _underflow_check(self): self.update_state() # check underflows - if self.state.state["Tube1_sonar_dist_mm"] < TUBE_MAX_LEVEL: + if self.state.Tube1_sonar_dist_mm < TUBE_MAX_LEVEL: # if outflow is active while inflow is stopped, error out - if self.state.state["Tube1_outflow_duty"] > 0 and self.state.state["Tube1_inflow_duty"] == 0: + if self.state.Tube1_outflow_duty > 0 and self.state.Tube1_inflow_duty == 0: self.set_outflow(1, 0) print( - f'{TAG}: WARN: Low water level detected in tube 1: {self.state.state["Tube1_sonar_dist_mm"]}. Stopped outflow') + f'{TAG}: WARN: Low water level detected in tube 1: {self.state.Tube1_sonar_dist_mm}. Stopped outflow') - elif self.state.state["Tube2_sonar_dist_mm"] < TUBE_MAX_LEVEL: + elif self.state.Tube2_sonar_dist_mm < TUBE_MAX_LEVEL: # if outflow is active while inflow is stopped, error out - if self.state.state["Tube2_outflow_duty"] > 0 and self.state.state["Tube2_inflow_duty"] == 0: + if self.state.Tube2_outflow_duty > 0 and self.state.Tube2_inflow_duty == 0: self.set_outflow(2, 0) print( - f'{TAG}: WARN: Low water level detected in tube 2: {self.state.state["Tube2_sonar_dist_mm"]}. Stopped outflow') + f'{TAG}: WARN: Low water level detected in tube 2: {self.state.Tube2_sonar_dist_mm}. Stopped outflow') else: if DEBUG: @@ -237,9 +272,9 @@ def check_version(self): def update_state(self, tube=None, quick=False): """Get the last state of the device. If cached state is outdated, a new sensor reading is requested.""" - # Return cached state if not outdated. - if not self.state.outdated and self.state.last_update >= time() - COMMUNICATION_TIMEOUT: - return self.state.state + # Return cached state if not outdated nor unstable. + if not self.state.dynamic and self.state.last_update >= datetime.now() - timedelta(COMMUNICATION_TIMEOUT): + return self.state # Ask for new state reading. size_buffer = self.last_printed_buf_line @@ -256,12 +291,16 @@ def update_state(self, tube=None, quick=False): else: arg += ";" + # while busy, wait + while not self.ready(): + sleep(1) + self.write(arg) # Wait for the state to be received. total_wait = 0 while True: - # Fixme: not robust + # Fixme: not robust looking for { if self.last_printed_buf_line > size_buffer and self.read_buffer[-2][0] == '{': # If we received a line starting with {, we have received the new state. break @@ -284,7 +323,8 @@ def update_state(self, tube=None, quick=False): state["Tube2_sonar_dist_mm"] = round(self.convert_distance_to_level(state["Tube2_sonar_dist_mm"]), 1) self.state = ClaireState() self.state.set_state(state) - return state + return True + return False def get_last_raw_state(self): """Get the last raw state of the device without polling.""" @@ -299,9 +339,9 @@ def get_last_raw_state(self): def print_state(self): """Print state of the system.""" # seconds since state was grabbed - if self.state.state: - old = timedelta(seconds=time() - self.state.last_update) - print(f'{TAG} State ({old} secs old): {self.state}') + if self.state: + old = datetime.now() - self.state.last_update + print(f'{TAG} State ({old} old): {self.state}') else: print(f'{TAG} State: N/A') @@ -338,7 +378,7 @@ def set_water_level(self, tube, level): sleep(1) self.write(f"5 {tube} {self.convert_level_to_distance(level)};") self.busy = True - self.state.make_outdated() + self.state.make_dynamic() def set_inflow(self, tube, rate): """ @@ -353,7 +393,7 @@ def set_inflow(self, tube, rate): sleep(1) pump = (tube - 1) * 2 + 1 self.write(f"4 {pump} {rate};") - self.state.make_outdated() + self.state.make_dynamic() def set_outflow(self, tube, rate): """ @@ -368,7 +408,7 @@ def set_outflow(self, tube, rate): sleep(1) pump = tube * 2 self.write(f"4 {pump} {rate};") - self.state.make_outdated() + self.state.make_dynamic() @staticmethod def convert_distance_to_level(distance): diff --git a/py_driver/experiment.py b/py_driver/experiment.py index 487ab1f..77fc083 100644 --- a/py_driver/experiment.py +++ b/py_driver/experiment.py @@ -3,15 +3,15 @@ # Insert the name of the usb port, which might be different for different devices. # An easy way to get the port name is to use the Arduino IDE. -# PORT = '/dev/ttyUSB0' -PORT = '/dev/cu.usbserial-1420' +PORT = '/dev/ttyUSB1' +#PORT = '/dev/cu.usbserial-1420' def example_experiment(): claire = driver.ClaireDevice(PORT) state = claire.update_state() # get current state of device claire.print_state() - print(f'Current height of TUBE1: {state["Tube1_sonar_dist_mm"]}') + print(f'Current height of TUBE1: {state.Tube1_sonar_dist_mm}') claire.set_inflow(1, 100) sleep(3) From 8a18f2aa0881ce1eb3c59ea85a182708e821b7c5 Mon Sep 17 00:00:00 2001 From: falkecarlsen <11318702+falkecarlsen@users.noreply.github.com> Date: Wed, 14 Aug 2024 10:37:05 +0200 Subject: [PATCH 2/4] refactor state util functions onto ClaireState --- py_driver/driver.py | 61 ++++++++++++++++++++++----------------------- 1 file changed, 30 insertions(+), 31 deletions(-) diff --git a/py_driver/driver.py b/py_driver/driver.py index 89cc29a..ce6f8e7 100644 --- a/py_driver/driver.py +++ b/py_driver/driver.py @@ -66,8 +66,9 @@ class ClaireState: dynamic: Optional[bool] = None last_update: datetime = datetime.now() - def __init__(self): + def __init__(self, state): self.dynamic = None + self.set_state(state) def set_state(self, state): """ @@ -94,13 +95,33 @@ def make_dynamic(self): """ self.dynamic = True + @staticmethod + def convert_distance_to_level(distance): + """ + Convert sensor distance to water level. + + :param distance: The distance from the sensor to the measured water surface. + """ + if distance < 0: + raise SensorError() + return TUBE_MAX_LEVEL - distance + + @staticmethod + def convert_level_to_distance(level): + """ + Convert water level to sensor distance. + + :param level: The water level to convert. + """ + return TUBE_MAX_LEVEL - level + @property - def tube1_dist(self) -> Optional[float]: - return self.Tube1_sonar_dist_mm + def tube1_level(self) -> Optional[float]: + return self.convert_distance_to_level(self.Tube1_sonar_dist_mm) @property - def tube2_dist(self) -> Optional[float]: - return self.Tube2_sonar_dist_mm + def tube2_level(self) -> Optional[float]: + return self.convert_distance_to_level(self.Tube2_sonar_dist_mm) class ClaireDevice: @@ -144,7 +165,6 @@ def __init__(self, port): print(f'{TAG} Device initialized. Getting initial state...') self.update_state() - def alive(self): """Check if the device is still alive within bound.""" return time() - self.heartbeat < COMMUNICATION_TIMEOUT @@ -319,10 +339,9 @@ def update_state(self, tube=None, quick=False): state = self.get_last_raw_state() if state: # Convert distance to water level - state["Tube1_sonar_dist_mm"] = round(self.convert_distance_to_level(state["Tube1_sonar_dist_mm"]), 1) - state["Tube2_sonar_dist_mm"] = round(self.convert_distance_to_level(state["Tube2_sonar_dist_mm"]), 1) - self.state = ClaireState() - self.state.set_state(state) + state["Tube1_sonar_dist_mm"] = round(self.state.convert_distance_to_level(state["Tube1_sonar_dist_mm"]), 1) + state["Tube2_sonar_dist_mm"] = round(self.state.convert_distance_to_level(state["Tube2_sonar_dist_mm"]), 1) + self.state = ClaireState(state) return True return False @@ -376,7 +395,7 @@ def set_water_level(self, tube, level): assert 0 <= level <= TUBE_MAX_LEVEL while not self.ready(): sleep(1) - self.write(f"5 {tube} {self.convert_level_to_distance(level)};") + self.write(f"5 {tube} {self.state.convert_level_to_distance(level)};") self.busy = True self.state.make_dynamic() @@ -410,26 +429,6 @@ def set_outflow(self, tube, rate): self.write(f"4 {pump} {rate};") self.state.make_dynamic() - @staticmethod - def convert_distance_to_level(distance): - """ - Convert sensor distance to water level. - - :param distance: The distance from the sensor to the measured water surface. - """ - if distance < 0: - raise SensorError() - return TUBE_MAX_LEVEL - distance - - @staticmethod - def convert_level_to_distance(level): - """ - Convert water level to sensor distance. - - :param level: The water level to convert. - """ - return TUBE_MAX_LEVEL - level - def wait_until_free(self): """Wait until the device is free.""" while True: From 09633b3a478d94e2d963754a6056854ea20bb0aa Mon Sep 17 00:00:00 2001 From: falkecarlsen <11318702+falkecarlsen@users.noreply.github.com> Date: Wed, 14 Aug 2024 11:26:45 +0200 Subject: [PATCH 3/4] refactor state updating logic to avoid timeout in threaded behaviour --- py_driver/driver.py | 111 ++++++++++++++++++++++---------------------- 1 file changed, 56 insertions(+), 55 deletions(-) diff --git a/py_driver/driver.py b/py_driver/driver.py index ce6f8e7..63fca29 100644 --- a/py_driver/driver.py +++ b/py_driver/driver.py @@ -172,6 +172,62 @@ def alive(self): def ready(self): return not self.busy + def update_state(self, tube=None, quick=False): + """Get the last state of the device. If cached state is outdated, a new sensor reading is requested.""" + # Return cached state if not outdated nor unstable. + if not self.state.dynamic and self.state.last_update >= datetime.now() - timedelta(COMMUNICATION_TIMEOUT): + return self.state + + arg = "" + + if quick: + arg += "2" + else: + arg += "1" + + if tube: + arg += f" {tube};" + else: + arg += ";" + + # while busy, wait + while not self.ready(): + sleep(1) + + # Ask for new state reading. + size_buffer = self.last_printed_buf_line + + self.write(arg) + self.busy = True + + # Wait for the state to be received. + total_wait = 0 + while True: + # wait for device to be ready again after requesting state + while not self.ready(): + sleep(0.1) + + # todo: not robust looking for { + if self.last_printed_buf_line > size_buffer and self.read_buffer[-2][0] == '{': + # If we received a line starting with {, we have received the new state. + break + + sleep(0.1) + total_wait += 0.1 + + if total_wait > COMMUNICATION_TIMEOUT and not self.busy: + raise RuntimeError("Waiting too long for state to be communicated.") + + # New state retrieved, parse it. + state = self.get_last_raw_state() + if state: + # Convert distance to water level + state["Tube1_sonar_dist_mm"] = round(self.state.convert_distance_to_level(state["Tube1_sonar_dist_mm"]), 1) + state["Tube2_sonar_dist_mm"] = round(self.state.convert_distance_to_level(state["Tube2_sonar_dist_mm"]), 1) + self.state = ClaireState(state) + return True + return False + def _underflow_check(self): TAG = "UNDERFLOW_CHECK" while True: @@ -290,61 +346,6 @@ def check_version(self): # check if device is ready assert self.ready() - def update_state(self, tube=None, quick=False): - """Get the last state of the device. If cached state is outdated, a new sensor reading is requested.""" - # Return cached state if not outdated nor unstable. - if not self.state.dynamic and self.state.last_update >= datetime.now() - timedelta(COMMUNICATION_TIMEOUT): - return self.state - - # Ask for new state reading. - size_buffer = self.last_printed_buf_line - - arg = "" - - if quick: - arg += "2" - else: - arg += "1" - - if tube: - arg += f" {tube};" - else: - arg += ";" - - # while busy, wait - while not self.ready(): - sleep(1) - - self.write(arg) - - # Wait for the state to be received. - total_wait = 0 - while True: - # Fixme: not robust looking for { - if self.last_printed_buf_line > size_buffer and self.read_buffer[-2][0] == '{': - # If we received a line starting with {, we have received the new state. - break - - sleep(0.1) - # do not incur waiting time if device is busy - if not self.ready(): - continue - - total_wait += 0.1 - - if total_wait > COMMUNICATION_TIMEOUT and not self.busy: - raise RuntimeError("Waiting too long for state to be communicated.") - - # New state retrieved, parse it. - state = self.get_last_raw_state() - if state: - # Convert distance to water level - state["Tube1_sonar_dist_mm"] = round(self.state.convert_distance_to_level(state["Tube1_sonar_dist_mm"]), 1) - state["Tube2_sonar_dist_mm"] = round(self.state.convert_distance_to_level(state["Tube2_sonar_dist_mm"]), 1) - self.state = ClaireState(state) - return True - return False - def get_last_raw_state(self): """Get the last raw state of the device without polling.""" # take buf backwards and try to coerce every line into dict From c18bd6ae8b1fc6480ab4bc0389d0826030f6b921 Mon Sep 17 00:00:00 2001 From: falkecarlsen <11318702+falkecarlsen@users.noreply.github.com> Date: Wed, 14 Aug 2024 13:34:32 +0200 Subject: [PATCH 4/4] fixup update_time and state init --- py_driver/driver.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/py_driver/driver.py b/py_driver/driver.py index 63fca29..ed101a4 100644 --- a/py_driver/driver.py +++ b/py_driver/driver.py @@ -1,6 +1,6 @@ import sys import threading -from dataclasses import dataclass, field +from dataclasses import dataclass from datetime import timedelta, datetime from time import sleep, time from typing import Optional @@ -64,7 +64,7 @@ class ClaireState: Stream_inflow_duty: Optional[int] = None Stream_outflow_duty: Optional[int] = None dynamic: Optional[bool] = None - last_update: datetime = datetime.now() + last_update: datetime = datetime.now() - timedelta(hours=1) def __init__(self, state): self.dynamic = None @@ -130,10 +130,9 @@ class ClaireDevice: """ def __init__(self, port): + self.state = None self.device = port - self.heartbeat = time() # last time device was alive self.busy = True # initially unknown, therefore busy - self.state = ClaireState() # read timeout in secs, 1 should be sufficient # exclusive only available on posix-like systems, assumes mac-env is posix-like @@ -163,6 +162,7 @@ def __init__(self, port): self.check_version() print(f'{TAG} Device initialized. Getting initial state...') + self.heartbeat = time() # last time device was alive self.update_state() def alive(self):