Skip to content

Commit

Permalink
feat: further the messaging of daq jobs
Browse files Browse the repository at this point in the history
- add `message_in` and `message_out`
- handle generic messages in base model
- let it crash! make the daemon restart the DAQJob's, not the DAQJob itself.
  • Loading branch information
furkan-bilgin committed Oct 10, 2024
1 parent 6fda5d1 commit 1f295d2
Show file tree
Hide file tree
Showing 3 changed files with 66 additions and 44 deletions.
55 changes: 22 additions & 33 deletions src/daq/caen/n1081b.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from N1081B import N1081B
from websocket import WebSocket

from daq.models import DAQJob, DAQJobConfig
from daq.models import DAQJob, DAQJobConfig, DAQJobMessage

N1081B_QUERY_INTERVAL_SECONDS = 1

Expand All @@ -30,49 +30,38 @@ def __init__(self, config: DAQN1081BConfig):
if section not in N1081B.Section.__members__:
raise Exception(f"Invalid section: {section}")

def handle_message(self, message: DAQJobMessage):
super().handle_message(message)

# Do not handle the rest of the messages if the connection is not established
if not self._is_connected():
return False

def start(self):
while True:
# Try to connect to the device
if not self._try_connect():
self._logger.error("Connection failed, retrying")
continue
self.consume()

self._start_loop()
# Log in if not connected
if not self._is_connected():
self._logger.error("Connecting to the device...")
self._connect_to_device()

def _start_loop(self):
while True:
if self._should_stop:
return True

# Stop if the connection is dropped
if isinstance(self.device.ws, WebSocket) and not self.device.ws.connected:
self._logger.error("Connection dropped")
break

try:
self._loop()
except ConnectionResetError:
self._logger.error("Connection reset")
break
except ConnectionAbortedError:
self._logger.error("Connection aborted")
break
# Poll sections
self._poll_sections()

time.sleep(N1081B_QUERY_INTERVAL_SECONDS)

def _try_connect(self) -> bool:
try:
if not self.device.connect():
return False
except ConnectionRefusedError:
return False
def _is_connected(self) -> bool:
return isinstance(self.device.ws, WebSocket) and self.device.ws.connected

def _connect_to_device(self):
if not self.device.connect():
raise Exception("Connection failed")

if not self.device.login(self.config.password):
raise Exception("Login failed")

return True

def _loop(self):
def _poll_sections(self):
for section in self.config.sections_to_store:
section = N1081B.Section[section]

Expand Down
40 changes: 34 additions & 6 deletions src/daq/models.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import logging
import threading
from dataclasses import dataclass
from queue import Queue
from typing import Any

from dataclasses_json import DataClassJsonMixin
Expand All @@ -14,20 +15,32 @@ class DAQJobConfig(DataClassJsonMixin):
class DAQJob:
config_type: Any
config: Any
message_in: Queue["DAQJobMessage"]
message_out: Queue["DAQJobMessage"]

_logger: logging.Logger
_should_stop: bool

def __init__(self, config: Any):
self.config = config
self.message_in = Queue()
self.message_out = Queue()
self._logger = logging.getLogger(type(self).__name__)
self._should_stop = False

def start(self):
pass
def consume(self):
# consume messages from the queue
while not self.message_in.empty():
message = self.message_in.get()
if not self.handle_message(message):
self.message_in.put_nowait(message)

def handle_message(self, message: "DAQJobMessage") -> bool:
if isinstance(message, DAQJobMessageStop):
raise DAQJobStopError(message.reason)
return True

def stop(self):
assert not self._should_stop, "DAQ job is already stopped"
self._should_stop = True
def start(self):
raise NotImplementedError

def __del__(self):
self._logger.info("DAQ job is being deleted")
Expand All @@ -37,3 +50,18 @@ def __del__(self):
class DAQJobThread:
daq_job: DAQJob
thread: threading.Thread


@dataclass
class DAQJobMessage:
pass


@dataclass
class DAQJobMessageStop(DAQJobMessage):
reason: str


class DAQJobStopError(Exception):
def __init__(self, reason: str):
self.reason = reason
15 changes: 10 additions & 5 deletions src/test_entrypoint.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
import logging
import time

from daq.daq_job import load_daq_jobs, start_daq_jobs
from daq.daq_job import load_daq_jobs, start_daq_job, start_daq_jobs
from daq.models import DAQJobMessageStop

Check failure on line 5 in src/test_entrypoint.py

View workflow job for this annotation

GitHub Actions / ruff

Ruff (F401)

src/test_entrypoint.py:5:24: F401 `daq.models.DAQJobMessageStop` imported but unused
from daq.store.models import DAQJobStore

logging.basicConfig(
level=logging.DEBUG,
format="[%(asctime)s] [%(name)s.%(funcName)s:%(lineno)d] %(levelname)s: %(message)s",
format="[%(asctime)s] [%(name)s] %(levelname)s: %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
)

Expand All @@ -20,8 +21,12 @@
logging.warning("No store job found, data will not be stored")

while True:
any_thread_alive = any(t.thread.is_alive() for t in daq_job_threads)
if not any_thread_alive:
break
dead_threads = [t for t in daq_job_threads if not t.thread.is_alive()]
# Clean up dead threads
daq_job_threads = [t for t in daq_job_threads if t not in dead_threads]

# Restart jobs that have stopped
for thread in dead_threads:
daq_job_threads.append(start_daq_job(thread.daq_job))

time.sleep(1)

0 comments on commit 1f295d2

Please sign in to comment.