Skip to content

Commit

Permalink
Merge pull request #54 from DEIS-Tools/state-abstraction
Browse files Browse the repository at this point in the history
Add state abstraction and update concurrency fixes
  • Loading branch information
falkecarlsen authored Aug 14, 2024
2 parents dcad45a + c18bd6a commit 19afeea
Show file tree
Hide file tree
Showing 2 changed files with 141 additions and 101 deletions.
236 changes: 138 additions & 98 deletions py_driver/driver.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
import sys
import threading
from datetime import timedelta
from dataclasses import dataclass
from datetime import timedelta, datetime
from time import sleep, time
from typing import Optional

import serial
import utils

Expand Down Expand Up @@ -47,29 +50,78 @@ def print_blue(text):
print(f"{ColorPrinting.OKBLUE}{text}{ColorPrinting.ENDC}")


@dataclass
class ClaireState:
"""
The state of the Claire demonstrator. Can be used to cache the state.
"""

def __init__(self):
self.state = None
self.outdated = True
self.last_update = 0
Tube1_sonar_dist_mm: Optional[float] = None
Tube2_sonar_dist_mm: Optional[float] = None
Tube1_inflow_duty: Optional[int] = None
Tube1_outflow_duty: Optional[int] = None
Tube2_inflow_duty: Optional[int] = None
Tube2_outflow_duty: Optional[int] = None
Stream_inflow_duty: Optional[int] = None
Stream_outflow_duty: Optional[int] = None
dynamic: Optional[bool] = None
last_update: datetime = datetime.now() - timedelta(hours=1)

def __init__(self, state):
self.dynamic = None
self.set_state(state)

def set_state(self, state):
"""
Set the cached state to the provided state. Assumes that the provided state is actual.
:param state: The new state to cache.
"""
self.state = state
self.outdated = state["Tube1_inflow_duty"] or state["Tube1_outflow_duty"] or state["Tube2_inflow_duty"] or \
state["Tube2_outflow_duty"]
try:
self.state = state
for key, value in state.items():
if hasattr(self, key):
setattr(self, key, value)
self.last_update = datetime.now()
self.dynamic = state["Tube1_inflow_duty"] or state["Tube1_outflow_duty"] or state["Tube2_inflow_duty"] or \
state["Tube2_outflow_duty"]
except Exception as e:
print(f"Exception occurred during state update: {e}")
raise # Re-raise the exception after handling

def make_dynamic(self):
"""
Label the cached state as dynamic.
When the demonstrator is being acted upon, all state updates are outdated from when they are measured.
"""
self.dynamic = True

def make_outdated(self):
"""Label the cached state as outdated."""
self.outdated = True
@staticmethod
def convert_distance_to_level(distance):
"""
Convert sensor distance to water level.
:param distance: The distance from the sensor to the measured water surface.
"""
if distance < 0:
raise SensorError()
return TUBE_MAX_LEVEL - distance

@staticmethod
def convert_level_to_distance(level):
"""
Convert water level to sensor distance.
:param level: The water level to convert.
"""
return TUBE_MAX_LEVEL - level

@property
def tube1_level(self) -> Optional[float]:
return self.convert_distance_to_level(self.Tube1_sonar_dist_mm)

@property
def tube2_level(self) -> Optional[float]:
return self.convert_distance_to_level(self.Tube2_sonar_dist_mm)


class ClaireDevice:
Expand All @@ -78,10 +130,9 @@ class ClaireDevice:
"""

def __init__(self, port):
self.state = None
self.device = port
self.heartbeat = time()
self.busy = True # initially unknown, therefore busy
self.state = ClaireState()
# read timeout in secs, 1 should be sufficient

# exclusive only available on posix-like systems, assumes mac-env is posix-like
Expand Down Expand Up @@ -110,13 +161,73 @@ def __init__(self, port):
sleep(1)
self.check_version()

print(f'{TAG} Device initialized. Getting initial state...')
self.heartbeat = time() # last time device was alive
self.update_state()

def alive(self):
"""Check if the device is still alive within bound."""
return time() - self.heartbeat < COMMUNICATION_TIMEOUT

def ready(self):
return not self.busy

def update_state(self, tube=None, quick=False):
"""Get the last state of the device. If cached state is outdated, a new sensor reading is requested."""
# Return cached state if not outdated nor unstable.
if not self.state.dynamic and self.state.last_update >= datetime.now() - timedelta(COMMUNICATION_TIMEOUT):
return self.state

arg = ""

if quick:
arg += "2"
else:
arg += "1"

if tube:
arg += f" {tube};"
else:
arg += ";"

# while busy, wait
while not self.ready():
sleep(1)

# Ask for new state reading.
size_buffer = self.last_printed_buf_line

self.write(arg)
self.busy = True

# Wait for the state to be received.
total_wait = 0
while True:
# wait for device to be ready again after requesting state
while not self.ready():
sleep(0.1)

# todo: not robust looking for {
if self.last_printed_buf_line > size_buffer and self.read_buffer[-2][0] == '{':
# If we received a line starting with {, we have received the new state.
break

sleep(0.1)
total_wait += 0.1

if total_wait > COMMUNICATION_TIMEOUT and not self.busy:
raise RuntimeError("Waiting too long for state to be communicated.")

# New state retrieved, parse it.
state = self.get_last_raw_state()
if state:
# Convert distance to water level
state["Tube1_sonar_dist_mm"] = round(self.state.convert_distance_to_level(state["Tube1_sonar_dist_mm"]), 1)
state["Tube2_sonar_dist_mm"] = round(self.state.convert_distance_to_level(state["Tube2_sonar_dist_mm"]), 1)
self.state = ClaireState(state)
return True
return False

def _underflow_check(self):
TAG = "UNDERFLOW_CHECK"
while True:
Expand All @@ -136,19 +247,19 @@ def _underflow_check(self):
self.update_state()

# check underflows
if self.state.state["Tube1_sonar_dist_mm"] < TUBE_MAX_LEVEL:
if self.state.Tube1_sonar_dist_mm < TUBE_MAX_LEVEL:
# if outflow is active while inflow is stopped, error out
if self.state.state["Tube1_outflow_duty"] > 0 and self.state.state["Tube1_inflow_duty"] == 0:
if self.state.Tube1_outflow_duty > 0 and self.state.Tube1_inflow_duty == 0:
self.set_outflow(1, 0)
print(
f'{TAG}: WARN: Low water level detected in tube 1: {self.state.state["Tube1_sonar_dist_mm"]}. Stopped outflow')
f'{TAG}: WARN: Low water level detected in tube 1: {self.state.Tube1_sonar_dist_mm}. Stopped outflow')

elif self.state.state["Tube2_sonar_dist_mm"] < TUBE_MAX_LEVEL:
elif self.state.Tube2_sonar_dist_mm < TUBE_MAX_LEVEL:
# if outflow is active while inflow is stopped, error out
if self.state.state["Tube2_outflow_duty"] > 0 and self.state.state["Tube2_inflow_duty"] == 0:
if self.state.Tube2_outflow_duty > 0 and self.state.Tube2_inflow_duty == 0:
self.set_outflow(2, 0)
print(
f'{TAG}: WARN: Low water level detected in tube 2: {self.state.state["Tube2_sonar_dist_mm"]}. Stopped outflow')
f'{TAG}: WARN: Low water level detected in tube 2: {self.state.Tube2_sonar_dist_mm}. Stopped outflow')

else:
if DEBUG:
Expand Down Expand Up @@ -235,57 +346,6 @@ def check_version(self):
# check if device is ready
assert self.ready()

def update_state(self, tube=None, quick=False):
"""Get the last state of the device. If cached state is outdated, a new sensor reading is requested."""
# Return cached state if not outdated.
if not self.state.outdated and self.state.last_update >= time() - COMMUNICATION_TIMEOUT:
return self.state.state

# Ask for new state reading.
size_buffer = self.last_printed_buf_line

arg = ""

if quick:
arg += "2"
else:
arg += "1"

if tube:
arg += f" {tube};"
else:
arg += ";"

self.write(arg)

# Wait for the state to be received.
total_wait = 0
while True:
# Fixme: not robust
if self.last_printed_buf_line > size_buffer and self.read_buffer[-2][0] == '{':
# If we received a line starting with {, we have received the new state.
break

sleep(0.1)
# do not incur waiting time if device is busy
if not self.ready():
continue

total_wait += 0.1

if total_wait > COMMUNICATION_TIMEOUT and not self.busy:
raise RuntimeError("Waiting too long for state to be communicated.")

# New state retrieved, parse it.
state = self.get_last_raw_state()
if state:
# Convert distance to water level
state["Tube1_sonar_dist_mm"] = round(self.convert_distance_to_level(state["Tube1_sonar_dist_mm"]), 1)
state["Tube2_sonar_dist_mm"] = round(self.convert_distance_to_level(state["Tube2_sonar_dist_mm"]), 1)
self.state = ClaireState()
self.state.set_state(state)
return state

def get_last_raw_state(self):
"""Get the last raw state of the device without polling."""
# take buf backwards and try to coerce every line into dict
Expand All @@ -299,9 +359,9 @@ def get_last_raw_state(self):
def print_state(self):
"""Print state of the system."""
# seconds since state was grabbed
if self.state.state:
old = timedelta(seconds=time() - self.state.last_update)
print(f'{TAG} State ({old} secs old): {self.state}')
if self.state:
old = datetime.now() - self.state.last_update
print(f'{TAG} State ({old} old): {self.state}')
else:
print(f'{TAG} State: N/A')

Expand Down Expand Up @@ -336,9 +396,9 @@ def set_water_level(self, tube, level):
assert 0 <= level <= TUBE_MAX_LEVEL
while not self.ready():
sleep(1)
self.write(f"5 {tube} {self.convert_level_to_distance(level)};")
self.write(f"5 {tube} {self.state.convert_level_to_distance(level)};")
self.busy = True
self.state.make_outdated()
self.state.make_dynamic()

def set_inflow(self, tube, rate):
"""
Expand All @@ -353,7 +413,7 @@ def set_inflow(self, tube, rate):
sleep(1)
pump = (tube - 1) * 2 + 1
self.write(f"4 {pump} {rate};")
self.state.make_outdated()
self.state.make_dynamic()

def set_outflow(self, tube, rate):
"""
Expand All @@ -368,27 +428,7 @@ def set_outflow(self, tube, rate):
sleep(1)
pump = tube * 2
self.write(f"4 {pump} {rate};")
self.state.make_outdated()

@staticmethod
def convert_distance_to_level(distance):
"""
Convert sensor distance to water level.
:param distance: The distance from the sensor to the measured water surface.
"""
if distance < 0:
raise SensorError()
return TUBE_MAX_LEVEL - distance

@staticmethod
def convert_level_to_distance(level):
"""
Convert water level to sensor distance.
:param level: The water level to convert.
"""
return TUBE_MAX_LEVEL - level
self.state.make_dynamic()

def wait_until_free(self):
"""Wait until the device is free."""
Expand Down
6 changes: 3 additions & 3 deletions py_driver/experiment.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,15 @@

# Insert the name of the usb port, which might be different for different devices.
# An easy way to get the port name is to use the Arduino IDE.
# PORT = '/dev/ttyUSB0'
PORT = '/dev/cu.usbserial-1420'
PORT = '/dev/ttyUSB1'
#PORT = '/dev/cu.usbserial-1420'


def example_experiment():
claire = driver.ClaireDevice(PORT)
state = claire.update_state() # get current state of device
claire.print_state()
print(f'Current height of TUBE1: {state["Tube1_sonar_dist_mm"]}')
print(f'Current height of TUBE1: {state.Tube1_sonar_dist_mm}')

claire.set_inflow(1, 100)
sleep(3)
Expand Down

0 comments on commit 19afeea

Please sign in to comment.