diff --git a/src/background_ingestor.py b/src/background_ingestor.py index 2190a6b..a15a742 100644 --- a/src/background_ingestor.py +++ b/src/background_ingestor.py @@ -6,7 +6,7 @@ from scicat_configuration import ( build_background_ingestor_arg_parser, - build_scicat_config, + build_scicat_background_ingester_config, ) from scicat_logging import build_logger from system_helpers import exit_at_exceptions @@ -16,7 +16,7 @@ def main() -> None: """Main entry point of the app.""" arg_parser = build_background_ingestor_arg_parser() arg_namespace = arg_parser.parse_args() - config = build_scicat_config(arg_namespace) + config = build_scicat_background_ingester_config(arg_namespace) logger = build_logger(config) # Log the configuration as dictionary so that it is easier to read from the logs @@ -25,13 +25,13 @@ def main() -> None: ) logger.info(config.to_dict()) - with exit_at_exceptions(logger): - nexus_file = pathlib.Path(arg_namespace.nexus_file) + with exit_at_exceptions(logger, daemon=False): + nexus_file = pathlib.Path(config.single_run_options.nexus_file) logger.info("Nexus file to be ingested : ") logger.info(nexus_file) done_writing_message_file = pathlib.Path( - arg_namespace.arg_namespace.done_writing_message_file + config.single_run_options.done_writing_message_file ) logger.info("Done writing message file linked to nexus file : ") logger.info(done_writing_message_file) diff --git a/src/scicat_configuration.py b/src/scicat_configuration.py index 147b777..adb20f4 100644 --- a/src/scicat_configuration.py +++ b/src/scicat_configuration.py @@ -2,7 +2,58 @@ # Copyright (c) 2024 ScicatProject contributors (https://github.com/ScicatProject) import argparse from collections.abc import Mapping -from dataclasses import dataclass +from dataclasses import asdict, dataclass +from types import MappingProxyType +from typing import Any + + +def _load_config(config_file: Any) -> dict: + """Load configuration from the configuration file path.""" + import json + import pathlib + + if ( + isinstance(config_file, str | pathlib.Path) + and (config_file_path := pathlib.Path(config_file)).is_file() + ): + return json.loads(config_file_path.read_text()) + return {} + + +def _merge_run_options(config_dict: dict, input_args_dict: dict) -> dict: + """Merge configuration from the configuration file and input arguments.""" + import copy + + # Overwrite deep-copied options with command line arguments + run_option_dict: dict = copy.deepcopy(config_dict.setdefault("options", {})) + for arg_name, arg_value in input_args_dict.items(): + if arg_value is not None: + run_option_dict[arg_name] = arg_value + + return run_option_dict + + +def _freeze_dict_items(d: dict) -> MappingProxyType: + """Freeze the dictionary to make it read-only.""" + return MappingProxyType( + { + key: MappingProxyType(value) if isinstance(value, dict) else value + for key, value in d.items() + } + ) + + +def _recursive_deepcopy(obj: Any) -> dict: + """Recursively deep copy a dictionary.""" + if not isinstance(obj, dict | MappingProxyType): + return obj + + copied = dict(obj) + for key, value in copied.items(): + if isinstance(value, Mapping | MappingProxyType): + copied[key] = _recursive_deepcopy(value) + + return copied def build_main_arg_parser() -> argparse.ArgumentParser: @@ -96,7 +147,6 @@ def build_main_arg_parser() -> argparse.ArgumentParser: def build_background_ingestor_arg_parser() -> argparse.ArgumentParser: parser = build_main_arg_parser() - group = parser.add_argument_group('Scicat Background Ingestor Options') group.add_argument( @@ -180,7 +230,7 @@ class kafkaOptions: @dataclass -class ScicatConfig: +class IngesterConfig: original_dict: Mapping """Original configuration dictionary in the json file.""" run_options: RunOptions @@ -192,50 +242,79 @@ class ScicatConfig: def to_dict(self) -> dict: """Return the configuration as a dictionary.""" - from dataclasses import asdict - - # Deep copy the original dictionary recursively - original_dict = dict(self.original_dict) - for key, value in original_dict.items(): - if isinstance(value, Mapping): - original_dict[key] = dict(value) - copied = ScicatConfig( - original_dict, self.run_options, self.kafka_options, self.graylog_options + return asdict( + IngesterConfig( + _recursive_deepcopy( + self.original_dict + ), # asdict does not support MappingProxyType + self.run_options, + self.kafka_options, + self.graylog_options, + ) ) - return asdict(copied) -def build_scicat_config(input_args: argparse.Namespace) -> ScicatConfig: +def build_scicat_ingester_config(input_args: argparse.Namespace) -> IngesterConfig: """Merge configuration from the configuration file and input arguments.""" - import copy - import json - import pathlib - from types import MappingProxyType + config_dict = _load_config(input_args.config_file) + run_option_dict = _merge_run_options(config_dict, vars(input_args)) - # Read configuration file - if ( - input_args.config_file - and (config_file_path := pathlib.Path(input_args.config_file)).is_file() - ): - config_dict = json.loads(config_file_path.read_text()) - else: - config_dict = {} + # Wrap configuration in a dataclass + return IngesterConfig( + original_dict=_freeze_dict_items(config_dict), + run_options=RunOptions(**run_option_dict), + kafka_options=kafkaOptions(**config_dict.setdefault("kafka", {})), + graylog_options=GraylogOptions(**config_dict.setdefault("graylog", {})), + ) + + +@dataclass +class SingleRunOptions: + nexus_file: str + """Full path of the input nexus file to be ingested.""" + done_writing_message_file: str + """Full path of the done writing message file that match the ``nexus_file``.""" - # Overwrite deep-copied options with command line arguments - run_option_dict: dict = copy.deepcopy(config_dict.setdefault("options", {})) - for arg_name, arg_value in vars(input_args).items(): - if arg_value is not None: - run_option_dict[arg_name] = arg_value - # Protect original configuration by making it read-only - for key, value in config_dict.items(): - config_dict[key] = MappingProxyType(value) +@dataclass +class BackgroundIngestorConfig(IngesterConfig): + single_run_options: SingleRunOptions + """Single run configuration options for background ingestor.""" + + def to_dict(self) -> dict: + """Return the configuration as a dictionary.""" + + return asdict( + BackgroundIngestorConfig( + _recursive_deepcopy( + self.original_dict + ), # asdict does not support MappingProxyType + self.run_options, + self.kafka_options, + self.graylog_options, + self.single_run_options, + ) + ) + + +def build_scicat_background_ingester_config( + input_args: argparse.Namespace, +) -> BackgroundIngestorConfig: + """Merge configuration from the configuration file and input arguments.""" + config_dict = _load_config(input_args.config_file) + input_args_dict = vars(input_args) + single_run_option_dict = { + "nexus_file": input_args_dict.pop("nexus_file"), + "done_writing_message_file": input_args_dict.pop("done_writing_message_file"), + } + run_option_dict = _merge_run_options(config_dict, input_args_dict) # Wrap configuration in a dataclass - return ScicatConfig( - original_dict=MappingProxyType(config_dict), + return BackgroundIngestorConfig( + original_dict=_freeze_dict_items(config_dict), run_options=RunOptions(**run_option_dict), kafka_options=kafkaOptions(**config_dict.setdefault("kafka", {})), + single_run_options=SingleRunOptions(**single_run_option_dict), graylog_options=GraylogOptions(**config_dict.setdefault("graylog", {})), ) diff --git a/src/scicat_ingestor.py b/src/scicat_ingestor.py index 0a835eb..f8c8832 100644 --- a/src/scicat_ingestor.py +++ b/src/scicat_ingestor.py @@ -11,7 +11,7 @@ del importlib -from scicat_configuration import build_main_arg_parser, build_scicat_config +from scicat_configuration import build_main_arg_parser, build_scicat_ingester_config from scicat_kafka import build_consumer, wrdn_messages from scicat_logging import build_logger from system_helpers import exit_at_exceptions @@ -21,7 +21,7 @@ def main() -> None: """Main entry point of the app.""" arg_parser = build_main_arg_parser() arg_namespace = arg_parser.parse_args() - config = build_scicat_config(arg_namespace) + config = build_scicat_ingester_config(arg_namespace) logger = build_logger(config) # Log the configuration as dictionary so that it is easier to read from the logs diff --git a/src/scicat_logging.py b/src/scicat_logging.py index ee9a35b..0104cbf 100644 --- a/src/scicat_logging.py +++ b/src/scicat_logging.py @@ -5,10 +5,10 @@ import logging.handlers import graypy -from scicat_configuration import ScicatConfig +from scicat_configuration import IngesterConfig -def build_logger(config: ScicatConfig) -> logging.Logger: +def build_logger(config: IngesterConfig) -> logging.Logger: """Build a logger and configure it according to the ``config``.""" run_options = config.run_options diff --git a/src/system_helpers.py b/src/system_helpers.py index 6077f57..ed88a33 100644 --- a/src/system_helpers.py +++ b/src/system_helpers.py @@ -12,7 +12,9 @@ def quit(logger: logging.Logger, unexpected: bool = True) -> None: @contextmanager -def exit_at_exceptions(logger: logging.Logger) -> Generator[None, None, None]: +def exit_at_exceptions( + logger: logging.Logger, daemon: bool = True +) -> Generator[None, None, None]: """Exit the program if an exception is raised.""" try: yield @@ -23,5 +25,9 @@ def exit_at_exceptions(logger: logging.Logger) -> Generator[None, None, None]: logger.error("An exception occurred: %s", e) quit(logger, unexpected=True) else: - logger.error("Loop finished unexpectedly.") - quit(logger, unexpected=True) + if daemon: + logger.error("Loop finished unexpectedly.") + quit(logger, unexpected=True) + else: + logger.info("Finished successfully.") + quit(logger, unexpected=False) diff --git a/tests/test_logging.py b/tests/test_logging.py index 90e999e..f131515 100644 --- a/tests/test_logging.py +++ b/tests/test_logging.py @@ -1,12 +1,17 @@ import pathlib import pytest -from scicat_configuration import GraylogOptions, RunOptions, ScicatConfig, kafkaOptions +from scicat_configuration import ( + GraylogOptions, + IngesterConfig, + RunOptions, + kafkaOptions, +) @pytest.fixture() -def scicat_config(tmp_path: pathlib.Path) -> ScicatConfig: - return ScicatConfig( +def scicat_config(tmp_path: pathlib.Path) -> IngesterConfig: + return IngesterConfig( original_dict={}, run_options=RunOptions( config_file='test', @@ -26,7 +31,7 @@ def scicat_config(tmp_path: pathlib.Path) -> ScicatConfig: ) -def test_scicat_logging_build_logger(scicat_config: ScicatConfig) -> None: +def test_scicat_logging_build_logger(scicat_config: IngesterConfig) -> None: from scicat_logging import build_logger logger = build_logger(scicat_config) diff --git a/tests/test_scicat_configuration.py b/tests/test_scicat_configuration.py index dacadd0..5bd8a08 100644 --- a/tests/test_scicat_configuration.py +++ b/tests/test_scicat_configuration.py @@ -3,7 +3,7 @@ import argparse import pytest -from scicat_configuration import ScicatConfig +from scicat_configuration import IngesterConfig @pytest.fixture() @@ -44,41 +44,70 @@ def test_scicat_arg_parser_configuration_matches( def test_build_scicat_config_default(main_arg_parser: argparse.ArgumentParser) -> None: """Test if the configuration can be built from default arguments.""" - from scicat_configuration import build_scicat_config + from scicat_configuration import build_scicat_ingester_config scicat_namespace = main_arg_parser.parse_args() - scicat_config = build_scicat_config(scicat_namespace) + scicat_config = build_scicat_ingester_config(scicat_namespace) assert scicat_config.run_options.config_file == 'config.20240405.json' @pytest.fixture() -def scicat_config(main_arg_parser: argparse.ArgumentParser) -> ScicatConfig: - from scicat_configuration import build_scicat_config +def ingester_config(main_arg_parser: argparse.ArgumentParser) -> IngesterConfig: + from scicat_configuration import build_scicat_ingester_config scicat_namespace = main_arg_parser.parse_args( ['-c', 'resources/config.sample.json', '--verbose'] ) - return build_scicat_config(scicat_namespace) + return build_scicat_ingester_config(scicat_namespace) -def test_build_scicat_config(scicat_config: ScicatConfig) -> None: +def test_build_scicat_config(ingester_config: IngesterConfig) -> None: """Test if the configuration can be built from arguments.""" - assert scicat_config.original_dict['options']['config_file'] == 'config.json' - assert scicat_config.run_options.config_file == 'resources/config.sample.json' - assert not scicat_config.original_dict['options']['verbose'] - assert scicat_config.run_options.verbose + assert ingester_config.original_dict['options']['config_file'] == 'config.json' + assert ingester_config.run_options.config_file == 'resources/config.sample.json' + assert not ingester_config.original_dict['options']['verbose'] + assert ingester_config.run_options.verbose -def test_scicat_config_original_dict_read_only(scicat_config: ScicatConfig) -> None: +def test_scicat_config_original_dict_read_only(ingester_config: IngesterConfig) -> None: """Test if the original dictionary is read-only.""" from types import MappingProxyType - assert isinstance(scicat_config.original_dict, MappingProxyType) - for sub_option in scicat_config.original_dict.values(): + assert isinstance(ingester_config.original_dict, MappingProxyType) + for sub_option in ingester_config.original_dict.values(): assert isinstance(sub_option, MappingProxyType) -def test_scicat_config_kafka_options(scicat_config: ScicatConfig) -> None: +def test_scicat_config_kafka_options(ingester_config: IngesterConfig) -> None: """Test if the Kafka options are correctly read.""" - assert scicat_config.kafka_options.topics == ["KAFKA_TOPIC_1", "KAFKA_TOPIC_2"] - assert scicat_config.kafka_options.enable_auto_commit + assert ingester_config.kafka_options.topics == ["KAFKA_TOPIC_1", "KAFKA_TOPIC_2"] + assert ingester_config.kafka_options.enable_auto_commit + + +def test_scicat_background_config_single_run_option() -> None: + """Test if the single run options are correctly read.""" + from scicat_configuration import ( + build_background_ingestor_arg_parser, + build_scicat_background_ingester_config, + ) + + arg_parser = build_background_ingestor_arg_parser() + scicat_namespace = arg_parser.parse_args( + [ + '-c', + 'resources/config.sample.json', + '--verbose', + '--nexus-file', + 'file.nxs', + '--done-writing-message-file', + 'file.json', + ] + ) + background_ingester_config = build_scicat_background_ingester_config( + scicat_namespace + ) + assert background_ingester_config.single_run_options.nexus_file == 'file.nxs' + assert ( + background_ingester_config.single_run_options.done_writing_message_file + == 'file.json' + )