diff --git a/resources/config.sample.json b/resources/config.sample.json index e4954af..12dd806 100644 --- a/resources/config.sample.json +++ b/resources/config.sample.json @@ -5,6 +5,7 @@ "bootstrap_servers": [ "HOST:9092" ], + "individual_message_commit": false, "enable_auto_commit": true, "auto_offset_reset": "earliest" }, diff --git a/src/scicat_configuration.py b/src/scicat_configuration.py index e736a59..fe99737 100644 --- a/src/scicat_configuration.py +++ b/src/scicat_configuration.py @@ -89,6 +89,13 @@ def build_main_arg_parser() -> argparse.ArgumentParser: @dataclass class RunOptions: + """RunOptions dataclass to store the configuration options. + + Most of options don't have default values because they are expected + to be set by the user either in the configuration file or through + command line arguments. + """ + config_file: str verbose: bool file_log: bool @@ -102,12 +109,36 @@ class RunOptions: pyscicat: Optional[str] = None +@dataclass +class kafkaOptions: + """KafkaOptions dataclass to store the configuration options. + + Default values are provided as they are not + expected to be set by command line arguments. + """ + + topics: list[str] | str = "KAFKA_TOPIC_1,KAFKA_TOPIC_2" + """List of Kafka topics. Multiple topics can be separated by commas.""" + group_id: str = "GROUP_ID" + """Kafka consumer group ID.""" + bootstrap_servers: list[str] | str = "localhost:9092" + """List of Kafka bootstrap servers. Multiple servers can be separated by commas.""" + individual_message_commit: bool = False + """Commit for each topic individually.""" + enable_auto_commit: bool = True + """Enable Kafka auto commit.""" + auto_offset_reset: str = "earliest" + """Kafka auto offset reset.""" + + @dataclass class ScicatConfig: original_dict: Mapping """Original configuration dictionary in the json file.""" run_options: RunOptions """Merged configuration dictionary with command line arguments.""" + kafka_options: kafkaOptions + """Kafka configuration options read from files.""" def to_dict(self) -> dict: """Return the configuration as a dictionary.""" @@ -119,7 +150,7 @@ def to_dict(self) -> dict: if isinstance(value, Mapping): original_dict[key] = dict(value) - copied = ScicatConfig(original_dict, self.run_options) + copied = ScicatConfig(original_dict, self.run_options, self.kafka_options) return asdict(copied) @@ -153,4 +184,5 @@ def build_scicat_config(input_args: argparse.Namespace) -> ScicatConfig: return ScicatConfig( original_dict=MappingProxyType(config_dict), run_options=RunOptions(**run_option_dict), + kafka_options=kafkaOptions(**config_dict.setdefault('kafka', dict())), ) diff --git a/src/scicat_ingestor.py b/src/scicat_ingestor.py index df888cc..b9bf2c2 100644 --- a/src/scicat_ingestor.py +++ b/src/scicat_ingestor.py @@ -1,9 +1,20 @@ # SPDX-License-Identifier: BSD-3-Clause # Copyright (c) 2024 ScicatProject contributors (https://github.com/ScicatProject) +import logging + from scicat_configuration import build_main_arg_parser, build_scicat_config +from scicat_kafka import build_consumer from scicat_logging import build_logger +def quit(logger: logging.Logger) -> None: + """Log the message and exit the program.""" + import sys + + logger.info("Exiting ingestor") + sys.exit() + + def main() -> None: """Main entry point of the app.""" arg_parser = build_main_arg_parser() @@ -14,3 +25,7 @@ def main() -> None: # Log the configuration as dictionary so that it is easier to read from the logs logger.info('Starting the Scicat Ingestor with the following configuration:') logger.info(config.to_dict()) + + # Kafka consumer + if build_consumer(config.kafka_options, logger) is None: + quit(logger) diff --git a/src/scicat_kafka.py b/src/scicat_kafka.py new file mode 100644 index 0000000..11fcd36 --- /dev/null +++ b/src/scicat_kafka.py @@ -0,0 +1,62 @@ +# SPDX-License-Identifier: BSD-3-Clause +# Copyright (c) 2024 ScicatProject contributors (https://github.com/ScicatProject) +import logging + +from confluent_kafka import Consumer + +from scicat_configuration import kafkaOptions + + +def collect_consumer_options(options: kafkaOptions) -> dict: + """Build a Kafka consumer and configure it according to the ``options``.""" + from dataclasses import asdict + + # Build logger and formatter + config_dict = { + key.replace('_', '.'): value + for key, value in asdict(options).items() + if key not in ('topics', 'individual_message_commit') + } + config_dict['enable.auto.commit'] = ( + not options.individual_message_commit + ) and options.enable_auto_commit + return config_dict + + +def collect_kafka_topics(options: kafkaOptions) -> list[str]: + """Return the Kafka topics as a list.""" + if isinstance(options.topics, str): + return options.topics.split(',') + elif isinstance(options.topics, list): + return options.topics + else: + raise TypeError('The topics must be a list or a comma-separated string.') + + +def build_consumer(kafka_options: kafkaOptions, logger: logging.Logger) -> Consumer: + """Build a Kafka consumer and configure it according to the ``options``.""" + consumer_options = collect_consumer_options(kafka_options) + logger.info('Connecting to Kafka with the following parameters:') + logger.info(consumer_options) + consumer = Consumer(consumer_options) + if not validate_consumer(consumer, logger): + return None + + kafka_topics = collect_kafka_topics(kafka_options) + logger.info(f'Subscribing to the following Kafka topics: {kafka_topics}') + consumer.subscribe(kafka_topics) + return Consumer(consumer_options) + + +def validate_consumer(consumer: Consumer, logger: logging.Logger) -> bool: + try: + consumer.list_topics(timeout=1) + except Exception as err: + logger.error( + "Kafka consumer could not be instantiated. " + f"Error message from kafka thread: \n{err}" + ) + return False + else: + logger.info('Kafka consumer successfully instantiated') + return True diff --git a/tests/test_logging.py b/tests/test_logging.py index f7e11dc..daf3f2e 100644 --- a/tests/test_logging.py +++ b/tests/test_logging.py @@ -2,7 +2,7 @@ import pytest -from scicat_configuration import RunOptions, ScicatConfig +from scicat_configuration import RunOptions, ScicatConfig, kafkaOptions @pytest.fixture @@ -22,6 +22,7 @@ def scicat_config(tmp_path: pathlib.Path) -> ScicatConfig: check_by_job_id=True, pyscicat='test', ), + kafka_options=kafkaOptions(), ) diff --git a/tests/test_scicat_configuration.py b/tests/test_scicat_configuration.py index 193ac8e..09fb5d2 100644 --- a/tests/test_scicat_configuration.py +++ b/tests/test_scicat_configuration.py @@ -77,3 +77,9 @@ def test_scicat_config_original_dict_read_only(scicat_config: ScicatConfig) -> N assert isinstance(scicat_config.original_dict, MappingProxyType) for sub_option in scicat_config.original_dict.values(): assert isinstance(sub_option, MappingProxyType) + + +def test_scicat_config_kafka_options(scicat_config: ScicatConfig) -> None: + """Test if the Kafka options are correctly read.""" + assert scicat_config.kafka_options.topics == ["KAFKA_TOPIC_1", "KAFKA_TOPIC_2"] + assert scicat_config.kafka_options.enable_auto_commit