Skip to content

Commit

Permalink
Merge pull request #41 from SciCatProject/background-ingester-config
Browse files Browse the repository at this point in the history
Background ingestor configuration refactoring.
  • Loading branch information
nitrosx authored Jul 5, 2024
2 parents 0e1e9c1 + 5efcb04 commit 2e4c889
Show file tree
Hide file tree
Showing 7 changed files with 188 additions and 69 deletions.
10 changes: 5 additions & 5 deletions src/background_ingestor.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

from scicat_configuration import (
build_background_ingestor_arg_parser,
build_scicat_config,
build_scicat_background_ingester_config,
)
from scicat_logging import build_logger
from system_helpers import exit_at_exceptions
Expand All @@ -16,7 +16,7 @@ def main() -> None:
"""Main entry point of the app."""
arg_parser = build_background_ingestor_arg_parser()
arg_namespace = arg_parser.parse_args()
config = build_scicat_config(arg_namespace)
config = build_scicat_background_ingester_config(arg_namespace)
logger = build_logger(config)

# Log the configuration as dictionary so that it is easier to read from the logs
Expand All @@ -25,13 +25,13 @@ def main() -> None:
)
logger.info(config.to_dict())

with exit_at_exceptions(logger):
nexus_file = pathlib.Path(arg_namespace.nexus_file)
with exit_at_exceptions(logger, daemon=False):
nexus_file = pathlib.Path(config.single_run_options.nexus_file)
logger.info("Nexus file to be ingested : ")
logger.info(nexus_file)

done_writing_message_file = pathlib.Path(
arg_namespace.arg_namespace.done_writing_message_file
config.single_run_options.done_writing_message_file
)
logger.info("Done writing message file linked to nexus file : ")
logger.info(done_writing_message_file)
Expand Down
151 changes: 115 additions & 36 deletions src/scicat_configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,58 @@
# Copyright (c) 2024 ScicatProject contributors (https://github.com/ScicatProject)
import argparse
from collections.abc import Mapping
from dataclasses import dataclass
from dataclasses import asdict, dataclass
from types import MappingProxyType
from typing import Any


def _load_config(config_file: Any) -> dict:
"""Load configuration from the configuration file path."""
import json
import pathlib

if (
isinstance(config_file, str | pathlib.Path)
and (config_file_path := pathlib.Path(config_file)).is_file()
):
return json.loads(config_file_path.read_text())
return {}


def _merge_run_options(config_dict: dict, input_args_dict: dict) -> dict:
"""Merge configuration from the configuration file and input arguments."""
import copy

# Overwrite deep-copied options with command line arguments
run_option_dict: dict = copy.deepcopy(config_dict.setdefault("options", {}))
for arg_name, arg_value in input_args_dict.items():
if arg_value is not None:
run_option_dict[arg_name] = arg_value

return run_option_dict


def _freeze_dict_items(d: dict) -> MappingProxyType:
"""Freeze the dictionary to make it read-only."""
return MappingProxyType(
{
key: MappingProxyType(value) if isinstance(value, dict) else value
for key, value in d.items()
}
)


def _recursive_deepcopy(obj: Any) -> dict:
"""Recursively deep copy a dictionary."""
if not isinstance(obj, dict | MappingProxyType):
return obj

copied = dict(obj)
for key, value in copied.items():
if isinstance(value, Mapping | MappingProxyType):
copied[key] = _recursive_deepcopy(value)

return copied


def build_main_arg_parser() -> argparse.ArgumentParser:
Expand Down Expand Up @@ -96,7 +147,6 @@ def build_main_arg_parser() -> argparse.ArgumentParser:

def build_background_ingestor_arg_parser() -> argparse.ArgumentParser:
parser = build_main_arg_parser()

group = parser.add_argument_group('Scicat Background Ingestor Options')

group.add_argument(
Expand Down Expand Up @@ -180,7 +230,7 @@ class kafkaOptions:


@dataclass
class ScicatConfig:
class IngesterConfig:
original_dict: Mapping
"""Original configuration dictionary in the json file."""
run_options: RunOptions
Expand All @@ -192,50 +242,79 @@ class ScicatConfig:

def to_dict(self) -> dict:
"""Return the configuration as a dictionary."""
from dataclasses import asdict

# Deep copy the original dictionary recursively
original_dict = dict(self.original_dict)
for key, value in original_dict.items():
if isinstance(value, Mapping):
original_dict[key] = dict(value)

copied = ScicatConfig(
original_dict, self.run_options, self.kafka_options, self.graylog_options
return asdict(
IngesterConfig(
_recursive_deepcopy(
self.original_dict
), # asdict does not support MappingProxyType
self.run_options,
self.kafka_options,
self.graylog_options,
)
)
return asdict(copied)


def build_scicat_config(input_args: argparse.Namespace) -> ScicatConfig:
def build_scicat_ingester_config(input_args: argparse.Namespace) -> IngesterConfig:
"""Merge configuration from the configuration file and input arguments."""
import copy
import json
import pathlib
from types import MappingProxyType
config_dict = _load_config(input_args.config_file)
run_option_dict = _merge_run_options(config_dict, vars(input_args))

# Read configuration file
if (
input_args.config_file
and (config_file_path := pathlib.Path(input_args.config_file)).is_file()
):
config_dict = json.loads(config_file_path.read_text())
else:
config_dict = {}
# Wrap configuration in a dataclass
return IngesterConfig(
original_dict=_freeze_dict_items(config_dict),
run_options=RunOptions(**run_option_dict),
kafka_options=kafkaOptions(**config_dict.setdefault("kafka", {})),
graylog_options=GraylogOptions(**config_dict.setdefault("graylog", {})),
)


@dataclass
class SingleRunOptions:
nexus_file: str
"""Full path of the input nexus file to be ingested."""
done_writing_message_file: str
"""Full path of the done writing message file that match the ``nexus_file``."""

# Overwrite deep-copied options with command line arguments
run_option_dict: dict = copy.deepcopy(config_dict.setdefault("options", {}))
for arg_name, arg_value in vars(input_args).items():
if arg_value is not None:
run_option_dict[arg_name] = arg_value

# Protect original configuration by making it read-only
for key, value in config_dict.items():
config_dict[key] = MappingProxyType(value)
@dataclass
class BackgroundIngestorConfig(IngesterConfig):
single_run_options: SingleRunOptions
"""Single run configuration options for background ingestor."""

def to_dict(self) -> dict:
"""Return the configuration as a dictionary."""

return asdict(
BackgroundIngestorConfig(
_recursive_deepcopy(
self.original_dict
), # asdict does not support MappingProxyType
self.run_options,
self.kafka_options,
self.graylog_options,
self.single_run_options,
)
)


def build_scicat_background_ingester_config(
input_args: argparse.Namespace,
) -> BackgroundIngestorConfig:
"""Merge configuration from the configuration file and input arguments."""
config_dict = _load_config(input_args.config_file)
input_args_dict = vars(input_args)
single_run_option_dict = {
"nexus_file": input_args_dict.pop("nexus_file"),
"done_writing_message_file": input_args_dict.pop("done_writing_message_file"),
}
run_option_dict = _merge_run_options(config_dict, input_args_dict)

# Wrap configuration in a dataclass
return ScicatConfig(
original_dict=MappingProxyType(config_dict),
return BackgroundIngestorConfig(
original_dict=_freeze_dict_items(config_dict),
run_options=RunOptions(**run_option_dict),
kafka_options=kafkaOptions(**config_dict.setdefault("kafka", {})),
single_run_options=SingleRunOptions(**single_run_option_dict),
graylog_options=GraylogOptions(**config_dict.setdefault("graylog", {})),
)
4 changes: 2 additions & 2 deletions src/scicat_ingestor.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

del importlib

from scicat_configuration import build_main_arg_parser, build_scicat_config
from scicat_configuration import build_main_arg_parser, build_scicat_ingester_config
from scicat_kafka import build_consumer, wrdn_messages
from scicat_logging import build_logger
from system_helpers import exit_at_exceptions
Expand All @@ -21,7 +21,7 @@ def main() -> None:
"""Main entry point of the app."""
arg_parser = build_main_arg_parser()
arg_namespace = arg_parser.parse_args()
config = build_scicat_config(arg_namespace)
config = build_scicat_ingester_config(arg_namespace)
logger = build_logger(config)

# Log the configuration as dictionary so that it is easier to read from the logs
Expand Down
4 changes: 2 additions & 2 deletions src/scicat_logging.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@
import logging.handlers

import graypy
from scicat_configuration import ScicatConfig
from scicat_configuration import IngesterConfig


def build_logger(config: ScicatConfig) -> logging.Logger:
def build_logger(config: IngesterConfig) -> logging.Logger:
"""Build a logger and configure it according to the ``config``."""
run_options = config.run_options

Expand Down
12 changes: 9 additions & 3 deletions src/system_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@ def quit(logger: logging.Logger, unexpected: bool = True) -> None:


@contextmanager
def exit_at_exceptions(logger: logging.Logger) -> Generator[None, None, None]:
def exit_at_exceptions(
logger: logging.Logger, daemon: bool = True
) -> Generator[None, None, None]:
"""Exit the program if an exception is raised."""
try:
yield
Expand All @@ -23,5 +25,9 @@ def exit_at_exceptions(logger: logging.Logger) -> Generator[None, None, None]:
logger.error("An exception occurred: %s", e)
quit(logger, unexpected=True)
else:
logger.error("Loop finished unexpectedly.")
quit(logger, unexpected=True)
if daemon:
logger.error("Loop finished unexpectedly.")
quit(logger, unexpected=True)
else:
logger.info("Finished successfully.")
quit(logger, unexpected=False)
13 changes: 9 additions & 4 deletions tests/test_logging.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,17 @@
import pathlib

import pytest
from scicat_configuration import GraylogOptions, RunOptions, ScicatConfig, kafkaOptions
from scicat_configuration import (
GraylogOptions,
IngesterConfig,
RunOptions,
kafkaOptions,
)


@pytest.fixture()
def scicat_config(tmp_path: pathlib.Path) -> ScicatConfig:
return ScicatConfig(
def scicat_config(tmp_path: pathlib.Path) -> IngesterConfig:
return IngesterConfig(
original_dict={},
run_options=RunOptions(
config_file='test',
Expand All @@ -26,7 +31,7 @@ def scicat_config(tmp_path: pathlib.Path) -> ScicatConfig:
)


def test_scicat_logging_build_logger(scicat_config: ScicatConfig) -> None:
def test_scicat_logging_build_logger(scicat_config: IngesterConfig) -> None:
from scicat_logging import build_logger

logger = build_logger(scicat_config)
Expand Down
Loading

0 comments on commit 2e4c889

Please sign in to comment.