Skip to content

Commit

Permalink
feat: add verbosity to DAQJobConfig and add debug logs to `DAQJob…
Browse files Browse the repository at this point in the history
…Remote`
  • Loading branch information
furkan-bilgin committed Oct 19, 2024
1 parent cfede93 commit a22df53
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 4 deletions.
2 changes: 2 additions & 0 deletions src/daq/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ def __init__(self, config: Any):
self.instance_id = daq_job_instance_id
daq_job_instance_id += 1
self._logger = logging.getLogger(f"{type(self).__name__}({self.instance_id})")
if isinstance(config, DAQJobConfig):
self._logger.setLevel(config.verbosity.to_logging_level())

self.config = config
self.message_in = Queue()
Expand Down
9 changes: 7 additions & 2 deletions src/daq/jobs/remote.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,9 @@ def handle_message(self, message: DAQJobMessage) -> bool:
def _start_receive_thread(self):
while True:
message = self._zmq_remote.recv()
self._logger.debug(
f"Received {len(message)} bytes from remote ({self.config.zmq_remote_url})"
)
# remote message_in -> message_out
self.message_out.put(self._unpack_message(message))

Expand All @@ -78,13 +81,14 @@ def start(self):

while True:
if not self._receive_thread.is_alive():
raise RuntimeError("receive thread died")
raise RuntimeError("Receive thread died")
# message_in -> remote message_out
self.consume()
time.sleep(0.1)

def _pack_message(self, message: DAQJobMessage) -> bytes:
message_type = type(message).__name__
self._logger.debug(f"Packing message {message_type} ({message.id})")
return json.dumps([message_type, message.to_json()]).encode("utf-8")

def _unpack_message(self, message: bytes) -> DAQJobMessage:
Expand All @@ -94,11 +98,12 @@ def _unpack_message(self, message: bytes) -> DAQJobMessage:

message_class = self._message_class_cache[message_type]

res = message_class.from_json(data)
res: DAQJobMessage = message_class.from_json(data)
if res.id is None:
raise Exception("Message id is not set")

self._remote_message_ids.add(res.id)
if len(self._remote_message_ids) > DAQ_JOB_REMOTE_MAX_REMOTE_MESSAGE_ID_COUNT:
self._remote_message_ids.pop()
self._logger.debug(f"Unpacked message {message_type} ({res.id})")
return res
18 changes: 16 additions & 2 deletions src/daq/models.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,32 @@
import logging
import uuid
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
from typing import Optional

from dataclasses_json import DataClassJsonMixin


@dataclass
class LogVerbosity(str, Enum):
DEBUG = "DEBUG"
INFO = "INFO"
WARNING = "WARNING"
ERROR = "ERROR"

def to_logging_level(self) -> int:
return logging._nameToLevel[self.value]


@dataclass(kw_only=True)
class DAQJobConfig(DataClassJsonMixin):
verbosity: LogVerbosity = LogVerbosity.INFO
daq_job_type: str


@dataclass(kw_only=True)
class DAQJobMessage(DataClassJsonMixin):
id: Optional[str] = None
id: Optional[str] = field(default_factory=lambda: str(uuid.uuid4()))


@dataclass
Expand Down

0 comments on commit a22df53

Please sign in to comment.