Skip to content

Commit

Permalink
feat: add DAQJobHandleStats and send supervisor messages from main
Browse files Browse the repository at this point in the history
  • Loading branch information
furkan-bilgin committed Oct 13, 2024
1 parent b271550 commit 277d9b4
Show file tree
Hide file tree
Showing 4 changed files with 108 additions and 11 deletions.
71 changes: 71 additions & 0 deletions src/daq/jobs/handle_stats.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
import time
from dataclasses import dataclass
from datetime import datetime
from typing import Dict, Optional

from daq.base import DAQJob
from daq.models import DAQJobMessage, DAQJobStats
from daq.store.models import DAQJobMessageStore, StorableDAQJobConfig

DAQJobStatsDict = Dict[type[DAQJob], DAQJobStats]


@dataclass
class DAQJobHandleStatsConfig(StorableDAQJobConfig):
pass


@dataclass
class DAQJobMessageStats(DAQJobMessage):
stats: DAQJobStatsDict


class DAQJobHandleStats(DAQJob):
allowed_message_in_types = [DAQJobMessageStats]
config_type = DAQJobHandleStatsConfig
config: DAQJobHandleStatsConfig

def start(self):
while True:
self.consume()
time.sleep(1)

def handle_message(self, message: DAQJobMessageStats) -> bool:
if not super().handle_message(message):
return False

keys = [
"daq_job",
"last_message_in_date",
"message_in_count",
"last_message_out_date",
"message_out_count",
]

def datetime_to_str(dt: Optional[datetime]):
if dt is None:
return "N/A"
return dt.strftime("%Y-%m-%d %H:%M:%S")

data_to_send = []
for daq_job_type, msg in message.stats.items():
data_to_send.append(
[
daq_job_type.__name__,
datetime_to_str(msg.last_message_in_date),
msg.message_in_count,
datetime_to_str(msg.last_message_out_date),
msg.message_out_count,
]
)

self.message_out.put(
DAQJobMessageStore(
store_config=self.config.store_config,
daq_job=self,
keys=keys,
data=data_to_send,
)
)

return True
26 changes: 21 additions & 5 deletions src/daq/jobs/store/csv.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from datetime import datetime
from io import TextIOWrapper
from pathlib import Path
from typing import Any, cast
from typing import Any, Optional, cast

from daq.models import DAQJobConfig
from daq.store.base import DAQJobStore
Expand All @@ -20,6 +20,7 @@
class DAQJobStoreConfigCSV(DAQJobStoreConfig):
file_path: str
add_date: bool
overwrite: Optional[bool] = None


@dataclass
Expand All @@ -32,6 +33,7 @@ class CSVFile:
file: TextIOWrapper
last_flush_date: datetime
write_queue: deque[list[Any]]
overwrite: Optional[bool] = None


class DAQJobStoreCSV(DAQJobStore):
Expand All @@ -51,10 +53,12 @@ def handle_message(self, message: DAQJobMessageStore) -> bool:
file_path = modify_file_path(
store_config.file_path, store_config.add_date, message.prefix
)
file, new_file = self._open_csv_file(file_path)
file, new_file = self._open_csv_file(file_path, store_config.overwrite)
if file.overwrite:
file.write_queue.clear()

# Write headers if the file is new
if new_file:
if new_file or file.overwrite:
file.write_queue.append(message.keys)

# Append rows to write_queue
Expand All @@ -63,7 +67,9 @@ def handle_message(self, message: DAQJobMessageStore) -> bool:

return True

def _open_csv_file(self, file_path: str) -> tuple[CSVFile, bool]:
def _open_csv_file(
self, file_path: str, overwrite: Optional[bool]
) -> tuple[CSVFile, bool]:
"""
Opens a file and returns (CSVFile, new_file)
"""
Expand All @@ -74,7 +80,12 @@ def _open_csv_file(self, file_path: str) -> tuple[CSVFile, bool]:
Path(file_path).touch()

# Open file
file = CSVFile(open(file_path, "a", newline=""), datetime.now(), deque())
file = CSVFile(
open(file_path, "a" if not overwrite else "w", newline=""),
datetime.now(),
deque(),
overwrite,
)
self._open_csv_files[file_path] = file
else:
file_exists = True
Expand Down Expand Up @@ -104,6 +115,11 @@ def store_loop(self):
writer.writerows(list(file.write_queue))
file.write_queue.clear()

if file.overwrite:
file.file.close()
files_to_delete.append(file_path)
continue

# Flush if the flush time is up
if self._flush(file):
self._logger.debug(f"Flushed '{file.file.name}' ({row_size} rows)")
Expand Down
7 changes: 2 additions & 5 deletions src/daq/types.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,16 @@
from typing import Dict

from daq.base import DAQJob
from daq.jobs.caen.n1081b import DAQJobN1081B
from daq.jobs.handle_stats import DAQJobHandleStats
from daq.jobs.serve_http import DAQJobServeHTTP
from daq.jobs.store.csv import DAQJobStoreCSV
from daq.jobs.store.root import DAQJobStoreROOT
from daq.jobs.test_job import DAQJobTest
from daq.models import DAQJobStats

DAQ_JOB_TYPE_TO_CLASS: dict[str, type[DAQJob]] = {
"n1081b": DAQJobN1081B,
"test": DAQJobTest,
"store_csv": DAQJobStoreCSV,
"store_root": DAQJobStoreROOT,
"serve_http": DAQJobServeHTTP,
"handle_stats": DAQJobHandleStats,
}

DAQJobStatsDict = Dict[type[DAQJob], DAQJobStats]
15 changes: 14 additions & 1 deletion src/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,10 @@

from daq.base import DAQJob, DAQJobThread
from daq.daq_job import load_daq_jobs, parse_store_config, start_daq_job, start_daq_jobs
from daq.jobs.handle_stats import DAQJobMessageStats, DAQJobStatsDict
from daq.models import DAQJobMessage, DAQJobStats
from daq.store.base import DAQJobStore
from daq.store.models import DAQJobMessageStore
from daq.types import DAQJobStatsDict

DAQ_SUPERVISOR_SLEEP_TIME = 0.5
DAQ_JOB_QUEUE_ACTION_TIMEOUT = 0.1
Expand All @@ -36,12 +36,25 @@ def loop(
# Get messages from DAQ Jobs
daq_messages_out = get_messages_from_daq_jobs(daq_job_threads, daq_job_stats)

# Add supervisor messages
daq_messages_out.extend(get_supervisor_messages(daq_job_threads, daq_job_stats))

# Send messages to appropriate DAQ Jobs
send_messages_to_daq_jobs(daq_job_threads, daq_messages_out, daq_job_stats)

return daq_job_threads, daq_job_stats


def get_supervisor_messages(
daq_job_threads: list[DAQJobThread], daq_job_stats: DAQJobStatsDict
):
messages = []

# Send stats message
messages.append(DAQJobMessageStats(daq_job_stats))
return messages


def get_or_create_daq_job_stats(
daq_job_stats: DAQJobStatsDict, daq_job_type: type[DAQJob]
) -> DAQJobStats:
Expand Down

0 comments on commit 277d9b4

Please sign in to comment.