From c3c65352268f398660a916d2d4f094e1b2129474 Mon Sep 17 00:00:00 2001 From: Furkan Date: Fri, 11 Oct 2024 18:51:48 +0300 Subject: [PATCH] feat: better message handling - send messages to DAQJobs by looking into their `allowed_message_in_types` variable --- src/daq/base.py | 14 ++++++++++++-- src/daq/store/base.py | 4 +++- src/daq/store/csv.py | 5 +---- src/daq/store/root.py | 1 + src/test_entrypoint.py | 27 ++++++++++++++++++++------- 5 files changed, 37 insertions(+), 14 deletions(-) diff --git a/src/daq/base.py b/src/daq/base.py index 4023176..27d56bd 100644 --- a/src/daq/base.py +++ b/src/daq/base.py @@ -8,10 +8,11 @@ class DAQJob: + allowed_message_in_types: list[type[DAQJobMessage]] = [] config_type: Any config: Any - message_in: Queue["DAQJobMessage"] - message_out: Queue["DAQJobMessage"] + message_in: Queue[DAQJobMessage] + message_out: Queue[DAQJobMessage] _logger: logging.Logger @@ -32,6 +33,15 @@ def consume(self): def handle_message(self, message: "DAQJobMessage") -> bool: if isinstance(message, DAQJobMessageStop): raise DAQJobStopError(message.reason) + # check if the message is accepted + is_message_type_accepted = False + for accepted_message_type in self.allowed_message_in_types: + if isinstance(message, accepted_message_type): + is_message_type_accepted = True + if not is_message_type_accepted: + raise Exception( + f"Message type {type(message)} is not accepted by {type(self).__name__}" + ) return True def start(self): diff --git a/src/daq/store/base.py b/src/daq/store/base.py index f2389b0..d8176c9 100644 --- a/src/daq/store/base.py +++ b/src/daq/store/base.py @@ -15,7 +15,9 @@ def start(self): def handle_message(self, message: DAQJobMessage) -> bool: if not self.can_store(message): - raise Exception(f"Invalid message type: {type(message)}") + raise Exception( + f"Invalid message type '{type(message)}' for DAQJob '{type(self).__name__}'" + ) return super().handle_message(message) def can_store(self, message: DAQJobMessage) -> bool: diff --git a/src/daq/store/csv.py b/src/daq/store/csv.py index 57799a0..7f28769 100644 --- a/src/daq/store/csv.py +++ b/src/daq/store/csv.py @@ -25,6 +25,7 @@ class DAQJobStoreCSVConfig(DAQJobConfig): class DAQJobStoreCSV(DAQJobStore): config_type = DAQJobStoreCSVConfig allowed_store_config_types = [DAQJobStoreConfigCSV] + allowed_message_in_types = [DAQJobMessageStore] _open_files: dict[str, TextIOWrapper] def __init__(self, config: Any): @@ -36,10 +37,6 @@ def handle_message(self, message: DAQJobMessageStore) -> bool: store_config = cast(DAQJobStoreConfigCSV, message.store_config) file_path = add_date_to_file_name(store_config.file_path, store_config.add_date) - self._logger.debug( - f"Handling message for DAQ Job: {type(message.daq_job).__name__}" - ) - if file_path not in self._open_files: file_exists = os.path.exists(file_path) # Create the file if it doesn't exist diff --git a/src/daq/store/root.py b/src/daq/store/root.py index 4e53a29..99ee19b 100644 --- a/src/daq/store/root.py +++ b/src/daq/store/root.py @@ -24,6 +24,7 @@ class DAQJobStoreROOTConfig(DAQJobConfig): class DAQJobStoreROOT(DAQJobStore): config_type = DAQJobStoreROOTConfig allowed_store_config_types = [DAQJobStoreConfigROOT] + allowed_message_in_types = [DAQJobMessageStore] _open_files: dict[str, Any] def __init__(self, config: Any): diff --git a/src/test_entrypoint.py b/src/test_entrypoint.py index 4861361..86a6562 100644 --- a/src/test_entrypoint.py +++ b/src/test_entrypoint.py @@ -32,6 +32,7 @@ for thread in dead_threads: daq_job_threads.append(start_daq_job(thread.daq_job)) + # Get messages from DAQ Jobs daq_messages = [] for thread in daq_job_threads: try: @@ -41,14 +42,26 @@ except Empty: pass - # Handle store messages + # Send messages to appropriate DAQ Jobs for message in daq_messages: - if isinstance(message, DAQJobMessageStore): - if isinstance(message.store_config, dict): - message.store_config = parse_store_config(message.store_config) - for store_job in store_jobs: - if not store_job.can_store(message): + if isinstance(message, DAQJobMessageStore) and isinstance( + message.store_config, dict + ): + # Parse store config of DAQJobMessageStore + message.store_config = parse_store_config(message.store_config) + + for daq_job_thread in daq_job_threads: + daq_job = daq_job_thread.daq_job + + # Send if message is allowed for this DAQ Job + if any( + isinstance(message, msg_type) + for msg_type in daq_job.allowed_message_in_types + ): + # Drop message type that is not supported by this DAQ Job + if isinstance(daq_job, DAQJobStore) and not daq_job.can_store(message): continue - store_job.message_in.put(message, timeout=DAQ_JOB_QUEUE_ACTION_TIMEOUT) + + daq_job.message_in.put(message, timeout=DAQ_JOB_QUEUE_ACTION_TIMEOUT) time.sleep(1)