Skip to content

Commit

Permalink
feat: reconstruct DAQJob instead of starting the same class
Browse files Browse the repository at this point in the history
- doing that leads to jobs with broken state being restarted
  • Loading branch information
furkan-bilgin committed Oct 15, 2024
1 parent 5860789 commit 39f8a82
Show file tree
Hide file tree
Showing 4 changed files with 30 additions and 4 deletions.
13 changes: 12 additions & 1 deletion src/daq/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,21 +6,32 @@

from daq.models import DAQJobMessage, DAQJobMessageStop, DAQJobStopError

daq_job_instance_id = 0
daq_job_instance_id_lock = threading.Lock()


class DAQJob:
allowed_message_in_types: list[type[DAQJobMessage]] = []
config_type: Any
config: Any
message_in: Queue[DAQJobMessage]
message_out: Queue[DAQJobMessage]
instance_id: int

_logger: logging.Logger

def __init__(self, config: Any):
    """Initialise a job with a unique instance id, a logger, and message queues.

    Args:
        config: Job configuration; stored unchanged on ``self.config``.
    """
    # Only the counter is rebound here; the lock is merely read, so it
    # does not need to appear in the ``global`` statement.
    global daq_job_instance_id

    # Read and bump the module-wide counter while holding the lock so two
    # constructions running concurrently cannot observe the same value.
    with daq_job_instance_id_lock:
        self.instance_id = daq_job_instance_id
        daq_job_instance_id += 1

    # The logger name embeds the instance id. It is assigned exactly once:
    # a second assignment using only the class name would silently replace
    # it and make instances of the same class indistinguishable in logs.
    self._logger = logging.getLogger(f"{type(self).__name__}({self.instance_id})")

    self.config = config
    self.message_in = Queue()
    self.message_out = Queue()

    self._should_stop = False

def consume(self):
Expand Down
8 changes: 8 additions & 0 deletions src/daq/daq_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,14 @@ def start_daq_job(daq_job: DAQJob) -> DAQJobThread:
return DAQJobThread(daq_job, thread)


def restart_daq_job(daq_job: DAQJob) -> DAQJobThread:
    """Replace a job with a newly constructed one and start it on a daemon thread.

    A new object of the same class is built from ``daq_job.config`` instead
    of calling ``start`` on the passed-in object again.

    Args:
        daq_job: The job to replace; only its class and ``config`` are read.

    Returns:
        A ``DAQJobThread`` wrapping the new job and the thread running it.
    """
    job_cls = type(daq_job)
    logging.info(f"Restarting {job_cls.__name__}")

    # Construct from the class rather than reusing the old instance.
    replacement = job_cls(daq_job.config)

    worker = threading.Thread(target=replacement.start, daemon=True)
    worker.start()
    return DAQJobThread(replacement, worker)


def start_daq_jobs(daq_jobs: list[DAQJob]) -> list[DAQJobThread]:
threads = []
for daq_job in daq_jobs:
Expand Down
9 changes: 7 additions & 2 deletions src/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,12 @@

from daq.alert.base import DAQJobAlert
from daq.base import DAQJob, DAQJobThread
from daq.daq_job import load_daq_jobs, parse_store_config, start_daq_job, start_daq_jobs
from daq.daq_job import (
load_daq_jobs,
parse_store_config,
restart_daq_job,
start_daq_jobs,
)
from daq.jobs.handle_stats import DAQJobMessageStats, DAQJobStatsDict
from daq.models import DAQJobMessage, DAQJobStats
from daq.store.base import DAQJobStore
Expand All @@ -31,7 +36,7 @@ def loop(

# Restart jobs that have stopped
for thread in dead_threads:
daq_job_threads.append(start_daq_job(thread.daq_job))
daq_job_threads.append(restart_daq_job(thread.daq_job))
# Update restart stats
get_daq_job_stats(daq_job_stats, type(thread.daq_job)).restart_stats.increase()

Expand Down
4 changes: 3 additions & 1 deletion src/tests/test_main.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ def test_start_daq_job_threads(self, mock_load_daq_jobs, mock_start_daq_jobs):
self.assertEqual(result, ["thread1", "thread2"])

@patch("main.start_daq_job")
def test_loop(self, mock_start_daq_job):
@patch("main.restart_daq_job")
def test_loop(self, mock_start_daq_job, mock_restart_daq_job):
RUN_COUNT = 3
mock_thread_alive = MagicMock(name="thread_alive")

Expand Down Expand Up @@ -57,6 +58,7 @@ def test_loop(self, mock_start_daq_job):
mock_thread_alive.daq_job.message_out.put(mock_store_message)

mock_start_daq_job.return_value = mock_thread_dead
mock_restart_daq_job.return_value = mock_thread_store

daq_job_threads = [mock_thread_alive, mock_thread_dead, mock_thread_store]
daq_job_threads: list[DAQJobThread] = daq_job_threads
Expand Down

0 comments on commit 39f8a82

Please sign in to comment.