Skip to content

Commit

Permalink
feat: add DAQJobStats
Browse files Browse the repository at this point in the history
  • Loading branch information
furkan-bilgin committed Oct 13, 2024
1 parent ca2f2d1 commit 34ba946
Show file tree
Hide file tree
Showing 4 changed files with 61 additions and 12 deletions.
10 changes: 10 additions & 0 deletions src/daq/models.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
from dataclasses import dataclass
from datetime import datetime
from typing import Optional

from dataclasses_json import DataClassJsonMixin

Expand All @@ -18,6 +20,14 @@ class DAQJobMessageStop(DAQJobMessage):
reason: str


@dataclass
class DAQJobStats:
message_in_count: int
message_out_count: int
last_message_in_date: Optional[datetime]
last_message_out_date: Optional[datetime]


class DAQJobStopError(Exception):
def __init__(self, reason: str):
self.reason = reason
5 changes: 5 additions & 0 deletions src/daq/types.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
from typing import Dict

from daq.base import DAQJob
from daq.caen.n1081b import DAQJobN1081B
from daq.models import DAQJobStats
from daq.serve_http import DAQJobServeHTTP
from daq.store.csv import DAQJobStoreCSV
from daq.store.root import DAQJobStoreROOT
Expand All @@ -12,3 +15,5 @@
"store_root": DAQJobStoreROOT,
"serve_http": DAQJobServeHTTP,
}

DAQJobStatsDict = Dict[type[DAQJob], DAQJobStats]
51 changes: 42 additions & 9 deletions src/main.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
import logging
import time
from datetime import datetime
from queue import Empty

import coloredlogs

from daq.base import DAQJobThread
from daq.base import DAQJob, DAQJobThread
from daq.daq_job import load_daq_jobs, parse_store_config, start_daq_job, start_daq_jobs
from daq.models import DAQJobMessage
from daq.models import DAQJobMessage, DAQJobStats
from daq.store.base import DAQJobStore
from daq.store.models import DAQJobMessageStore
from daq.types import DAQJobStatsDict

DAQ_SUPERVISOR_SLEEP_TIME = 0.5
DAQ_JOB_QUEUE_ACTION_TIMEOUT = 0.1
Expand All @@ -18,7 +20,10 @@ def start_daq_job_threads() -> list[DAQJobThread]:
return start_daq_jobs(load_daq_jobs("configs/"))


def loop(daq_job_threads: list[DAQJobThread]) -> list[DAQJobThread]:
def loop(
daq_job_threads: list[DAQJobThread],
daq_job_stats: DAQJobStatsDict,
) -> tuple[list[DAQJobThread], DAQJobStatsDict]:
# Remove dead threads
dead_threads = [t for t in daq_job_threads if not t.thread.is_alive()]
# Clean up dead threads
Expand All @@ -29,29 +34,51 @@ def loop(daq_job_threads: list[DAQJobThread]) -> list[DAQJobThread]:
daq_job_threads.append(start_daq_job(thread.daq_job))

# Get messages from DAQ Jobs
daq_messages = get_messages_from_daq_jobs(daq_job_threads)
daq_messages_out = get_messages_from_daq_jobs(daq_job_threads, daq_job_stats)

# Send messages to appropriate DAQ Jobs
send_messages_to_daq_jobs(daq_job_threads, daq_messages)
send_messages_to_daq_jobs(daq_job_threads, daq_messages_out, daq_job_stats)

return daq_job_threads
return daq_job_threads, daq_job_stats


def get_messages_from_daq_jobs(daq_job_threads: list[DAQJobThread]):
def get_or_create_daq_job_stats(
daq_job_stats: DAQJobStatsDict, daq_job_type: type[DAQJob]
) -> DAQJobStats:
if daq_job_type not in daq_job_stats:
daq_job_stats[daq_job_type] = DAQJobStats(
message_in_count=0,
message_out_count=0,
last_message_in_date=None,
last_message_out_date=None,
)
return daq_job_stats[daq_job_type]


def get_messages_from_daq_jobs(
daq_job_threads: list[DAQJobThread], daq_job_stats: DAQJobStatsDict
) -> list[DAQJobMessage]:
res = []
for thread in daq_job_threads:
try:
while True:
res.append(
thread.daq_job.message_out.get(timeout=DAQ_JOB_QUEUE_ACTION_TIMEOUT)
)

# Update stats
stats = get_or_create_daq_job_stats(daq_job_stats, type(thread.daq_job))
stats.message_out_count += 1
stats.last_message_out_date = datetime.now()
except Empty:
pass
return res


def send_messages_to_daq_jobs(
daq_job_threads: list[DAQJobThread], daq_messages: list[DAQJobMessage]
daq_job_threads: list[DAQJobThread],
daq_messages: list[DAQJobMessage],
daq_job_stats: DAQJobStatsDict,
):
for message in daq_messages:
if isinstance(message, DAQJobMessageStore) and isinstance(
Expand All @@ -72,20 +99,26 @@ def send_messages_to_daq_jobs(
continue
daq_job.message_in.put(message, timeout=DAQ_JOB_QUEUE_ACTION_TIMEOUT)

# Update stats
stats = get_or_create_daq_job_stats(daq_job_stats, type(daq_job))
stats.message_in_count += 1
stats.last_message_in_date = datetime.now()


if __name__ == "__main__":
coloredlogs.install(
level=logging.DEBUG,
datefmt="%Y-%m-%d %H:%M:%S",
)
daq_job_threads = start_daq_job_threads()
daq_job_stats: DAQJobStatsDict = {}

if not any(x for x in daq_job_threads if isinstance(x.daq_job, DAQJobStore)):
logging.warning("No store job found, data will not be stored")

while True:
try:
daq_job_threads = loop(daq_job_threads)
daq_job_threads, daq_job_stats = loop(daq_job_threads, daq_job_stats)
time.sleep(DAQ_SUPERVISOR_SLEEP_TIME)
except KeyboardInterrupt:
logging.warning("KeyboardInterrupt received, cleaning up")
Expand Down
7 changes: 4 additions & 3 deletions src/tests/test_main.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,8 @@ def test_loop(self, mock_start_daq_job):
daq_job_threads = [mock_thread_alive, mock_thread_dead, mock_thread_store]
daq_job_threads: list[DAQJobThread] = daq_job_threads

result = loop(daq_job_threads)
# TODO: test stats
result, _ = loop(daq_job_threads, {})

self.assertEqual(
result, [mock_thread_alive, mock_thread_store, mock_thread_dead]
Expand All @@ -79,7 +80,7 @@ def test_get_messages_from_daq_jobs(self):
daq_job_threads = [mock_thread]
daq_job_threads: list[DAQJobThread] = daq_job_threads

result = get_messages_from_daq_jobs(daq_job_threads)
result = get_messages_from_daq_jobs(daq_job_threads, {})

self.assertEqual(result, [mock_message])

Expand All @@ -94,7 +95,7 @@ def test_send_messages_to_daq_jobs(self, mock_parse_store_config):
daq_job_threads = [mock_thread]
daq_job_threads: list[DAQJobThread] = daq_job_threads

send_messages_to_daq_jobs(daq_job_threads, [mock_message])
send_messages_to_daq_jobs(daq_job_threads, [mock_message], {})

mock_parse_store_config.assert_called_once_with({})
self.assertFalse(mock_thread.daq_job.message_in.empty())
Expand Down

0 comments on commit 34ba946

Please sign in to comment.