diff --git a/src/daq/base.py b/src/daq/base.py index f7391e0..b20718b 100644 --- a/src/daq/base.py +++ b/src/daq/base.py @@ -29,6 +29,8 @@ def __init__(self, config: Any): self.instance_id = daq_job_instance_id daq_job_instance_id += 1 self._logger = logging.getLogger(f"{type(self).__name__}({self.instance_id})") + if isinstance(config, DAQJobConfig): + self._logger.setLevel(config.verbosity.to_logging_level()) self.config = config self.message_in = Queue() diff --git a/src/daq/jobs/remote.py b/src/daq/jobs/remote.py index a6cc9dc..3edff71 100644 --- a/src/daq/jobs/remote.py +++ b/src/daq/jobs/remote.py @@ -70,6 +70,9 @@ def handle_message(self, message: DAQJobMessage) -> bool: def _start_receive_thread(self): while True: message = self._zmq_remote.recv() + self._logger.debug( + f"Received {len(message)} bytes from remote ({self.config.zmq_remote_url})" + ) # remote message_in -> message_out self.message_out.put(self._unpack_message(message)) @@ -78,13 +81,14 @@ def start(self): while True: if not self._receive_thread.is_alive(): - raise RuntimeError("receive thread died") + raise RuntimeError("Receive thread died") # message_in -> remote message_out self.consume() time.sleep(0.1) def _pack_message(self, message: DAQJobMessage) -> bytes: message_type = type(message).__name__ + self._logger.debug(f"Packing message {message_type} ({message.id})") return json.dumps([message_type, message.to_json()]).encode("utf-8") def _unpack_message(self, message: bytes) -> DAQJobMessage: @@ -94,11 +98,12 @@ def _unpack_message(self, message: bytes) -> DAQJobMessage: message_class = self._message_class_cache[message_type] - res = message_class.from_json(data) + res: DAQJobMessage = message_class.from_json(data) if res.id is None: raise Exception("Message id is not set") self._remote_message_ids.add(res.id) if len(self._remote_message_ids) > DAQ_JOB_REMOTE_MAX_REMOTE_MESSAGE_ID_COUNT: self._remote_message_ids.pop() + self._logger.debug(f"Unpacked message {message_type} ({res.id})") return res diff --git a/src/daq/models.py b/src/daq/models.py index 8067a6f..02e7922 100644 --- a/src/daq/models.py +++ b/src/daq/models.py @@ -1,18 +1,32 @@ +import logging +import uuid from dataclasses import dataclass, field from datetime import datetime +from enum import Enum from typing import Optional from dataclasses_json import DataClassJsonMixin -@dataclass +class LogVerbosity(str, Enum): + DEBUG = "DEBUG" + INFO = "INFO" + WARNING = "WARNING" + ERROR = "ERROR" + + def to_logging_level(self) -> int: + return logging._nameToLevel[self.value] + + +@dataclass(kw_only=True) class DAQJobConfig(DataClassJsonMixin): + verbosity: LogVerbosity = LogVerbosity.INFO daq_job_type: str @dataclass(kw_only=True) class DAQJobMessage(DataClassJsonMixin): - id: Optional[str] = None + id: Optional[str] = field(default_factory=lambda: str(uuid.uuid4())) @dataclass