From a13678fb3189c29cb9712308d3246a98910d6ae5 Mon Sep 17 00:00:00 2001 From: Furkan Date: Fri, 11 Oct 2024 19:01:12 +0300 Subject: [PATCH] refactor: rename `test_entrypoint.py` to `main.py` and refactor loop to handle `KeyboardInterrupt` - also delete `test_csv.py` --- src/{test_entrypoint.py => main.py} | 22 +++++++++++++++----- src/test_csv.py | 32 ----------------------------- 2 files changed, 17 insertions(+), 37 deletions(-) rename src/{test_entrypoint.py => main.py} (81%) delete mode 100644 src/test_csv.py diff --git a/src/test_entrypoint.py b/src/main.py similarity index 81% rename from src/test_entrypoint.py rename to src/main.py index 86a6562..fe56219 100644 --- a/src/test_entrypoint.py +++ b/src/main.py @@ -5,6 +5,7 @@ import coloredlogs from daq.daq_job import load_daq_jobs, parse_store_config, start_daq_job, start_daq_jobs +from daq.models import DAQJobMessageStop from daq.store.base import DAQJobStore from daq.store.models import DAQJobMessageStore @@ -13,17 +14,18 @@ datefmt="%Y-%m-%d %H:%M:%S", ) +DAQ_SUPERVISOR_SLEEP_TIME = 0.5 DAQ_JOB_QUEUE_ACTION_TIMEOUT = 0.1 daq_jobs = load_daq_jobs("configs/") daq_job_threads = start_daq_jobs(daq_jobs) -store_jobs = [x for x in daq_jobs if isinstance(x, DAQJobStore)] - -if len(store_jobs) == 0: +if not any(x for x in daq_jobs if isinstance(x, DAQJobStore)): logging.warning("No store job found, data will not be stored") -while True: + +def loop(): + global daq_job_threads dead_threads = [t for t in daq_job_threads if not t.thread.is_alive()] # Clean up dead threads daq_job_threads = [t for t in daq_job_threads if t not in dead_threads] @@ -64,4 +66,14 @@ daq_job.message_in.put(message, timeout=DAQ_JOB_QUEUE_ACTION_TIMEOUT) - time.sleep(1) + time.sleep(DAQ_SUPERVISOR_SLEEP_TIME) + + +while True: + try: + loop() + except KeyboardInterrupt: + logging.warning("KeyboardInterrupt received, cleaning up") + for daq_job_thread in daq_job_threads: + daq_job_thread.daq_job.__del__() + break diff --git a/src/test_csv.py b/src/test_csv.py deleted file mode 100644 index bfa3efc..0000000 --- a/src/test_csv.py +++ /dev/null @@ -1,32 +0,0 @@ -import logging -import time - -import coloredlogs - -from daq.daq_job import DAQJob, parse_store_config, start_daq_job -from daq.store.csv import DAQJobStoreCSV -from daq.store.models import DAQJobMessageStore - -coloredlogs.install( - level=logging.DEBUG, - datefmt="%Y-%m-%d %H:%M:%S", -) - -daq_job_thread = start_daq_job(DAQJobStoreCSV({})) -_test_daq_job = DAQJob({}) -while True: - daq_job_thread.daq_job.message_in.put_nowait( - DAQJobMessageStore( - daq_job=_test_daq_job, - keys=["a", "b", "c"], - data=[["1", "2", "3"], ["4", "5", "6"]], - store_config=parse_store_config( - { - "daq_job_store_type": "csv", - "file_path": "test.csv", - "add_date": True, - } - ), - ) - ) - time.sleep(1)