Skip to content

Commit

Permalink
feat: better message handling
Browse files Browse the repository at this point in the history
- send messages to DAQJobs by looking into their `allowed_message_in_types` variable
  • Loading branch information
furkan-bilgin committed Oct 11, 2024
1 parent fdb33ec commit c3c6535
Show file tree
Hide file tree
Showing 5 changed files with 37 additions and 14 deletions.
14 changes: 12 additions & 2 deletions src/daq/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,11 @@


class DAQJob:
allowed_message_in_types: list[type[DAQJobMessage]] = []
config_type: Any
config: Any
message_in: Queue["DAQJobMessage"]
message_out: Queue["DAQJobMessage"]
message_in: Queue[DAQJobMessage]
message_out: Queue[DAQJobMessage]

_logger: logging.Logger

Expand All @@ -32,6 +33,15 @@ def consume(self):
def handle_message(self, message: "DAQJobMessage") -> bool:
if isinstance(message, DAQJobMessageStop):
raise DAQJobStopError(message.reason)
# check if the message is accepted
is_message_type_accepted = False
for accepted_message_type in self.allowed_message_in_types:
if isinstance(message, accepted_message_type):
is_message_type_accepted = True
if not is_message_type_accepted:
raise Exception(
f"Message type {type(message)} is not accepted by {type(self).__name__}"
)
return True

def start(self):
Expand Down
4 changes: 3 additions & 1 deletion src/daq/store/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@ def start(self):

def handle_message(self, message: DAQJobMessage) -> bool:
if not self.can_store(message):
raise Exception(f"Invalid message type: {type(message)}")
raise Exception(
f"Invalid message type '{type(message)}' for DAQJob '{type(self).__name__}'"
)
return super().handle_message(message)

def can_store(self, message: DAQJobMessage) -> bool:
Expand Down
5 changes: 1 addition & 4 deletions src/daq/store/csv.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ class DAQJobStoreCSVConfig(DAQJobConfig):
class DAQJobStoreCSV(DAQJobStore):
config_type = DAQJobStoreCSVConfig
allowed_store_config_types = [DAQJobStoreConfigCSV]
allowed_message_in_types = [DAQJobMessageStore]
_open_files: dict[str, TextIOWrapper]

def __init__(self, config: Any):
Expand All @@ -36,10 +37,6 @@ def handle_message(self, message: DAQJobMessageStore) -> bool:
store_config = cast(DAQJobStoreConfigCSV, message.store_config)
file_path = add_date_to_file_name(store_config.file_path, store_config.add_date)

self._logger.debug(
f"Handling message for DAQ Job: {type(message.daq_job).__name__}"
)

if file_path not in self._open_files:
file_exists = os.path.exists(file_path)
# Create the file if it doesn't exist
Expand Down
1 change: 1 addition & 0 deletions src/daq/store/root.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ class DAQJobStoreROOTConfig(DAQJobConfig):
class DAQJobStoreROOT(DAQJobStore):
config_type = DAQJobStoreROOTConfig
allowed_store_config_types = [DAQJobStoreConfigROOT]
allowed_message_in_types = [DAQJobMessageStore]
_open_files: dict[str, Any]

def __init__(self, config: Any):
Expand Down
27 changes: 20 additions & 7 deletions src/test_entrypoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
for thread in dead_threads:
daq_job_threads.append(start_daq_job(thread.daq_job))

# Get messages from DAQ Jobs
daq_messages = []
for thread in daq_job_threads:
try:
Expand All @@ -41,14 +42,26 @@
except Empty:
pass

# Handle store messages
# Send messages to appropriate DAQ Jobs
for message in daq_messages:
if isinstance(message, DAQJobMessageStore):
if isinstance(message.store_config, dict):
message.store_config = parse_store_config(message.store_config)
for store_job in store_jobs:
if not store_job.can_store(message):
if isinstance(message, DAQJobMessageStore) and isinstance(
message.store_config, dict
):
# Parse store config of DAQJobMessageStore
message.store_config = parse_store_config(message.store_config)

for daq_job_thread in daq_job_threads:
daq_job = daq_job_thread.daq_job

# Send if message is allowed for this DAQ Job
if any(
isinstance(message, msg_type)
for msg_type in daq_job.allowed_message_in_types
):
# Drop message type that is not supported by this DAQ Job
if isinstance(daq_job, DAQJobStore) and not daq_job.can_store(message):
continue
store_job.message_in.put(message, timeout=DAQ_JOB_QUEUE_ACTION_TIMEOUT)

daq_job.message_in.put(message, timeout=DAQ_JOB_QUEUE_ACTION_TIMEOUT)

time.sleep(1)

0 comments on commit c3c6535

Please sign in to comment.