From d72192044768d6277acc45a339f2cd2f7322fd57 Mon Sep 17 00:00:00 2001 From: YooSunyoung Date: Mon, 27 May 2024 17:41:15 +0200 Subject: [PATCH] Parse bootstrap server address. --- src/scicat_kafka.py | 22 ++++++++++++++-------- 1 file changed, 14 insertions(+), 8 deletions(-) diff --git a/src/scicat_kafka.py b/src/scicat_kafka.py index 11fcd36..a832867 100644 --- a/src/scicat_kafka.py +++ b/src/scicat_kafka.py @@ -13,37 +13,43 @@ def collect_consumer_options(options: kafkaOptions) -> dict: # Build logger and formatter config_dict = { - key.replace('_', '.'): value + key.replace("_", "."): value for key, value in asdict(options).items() - if key not in ('topics', 'individual_message_commit') + if key not in ("topics", "individual_message_commit") } - config_dict['enable.auto.commit'] = ( + config_dict["enable.auto.commit"] = ( not options.individual_message_commit ) and options.enable_auto_commit + if isinstance(bootstrap_servers := options.bootstrap_servers, list): + # Convert the list to a comma-separated string + config_dict["bootstrap.servers"] = ",".join(bootstrap_servers) + else: + config_dict["bootstrap.servers"] = bootstrap_servers + return config_dict def collect_kafka_topics(options: kafkaOptions) -> list[str]: """Return the Kafka topics as a list.""" if isinstance(options.topics, str): - return options.topics.split(',') + return options.topics.split(",") elif isinstance(options.topics, list): return options.topics else: - raise TypeError('The topics must be a list or a comma-separated string.') + raise TypeError("The topics must be a list or a comma-separated string.") def build_consumer(kafka_options: kafkaOptions, logger: logging.Logger) -> Consumer: """Build a Kafka consumer and configure it according to the ``options``.""" consumer_options = collect_consumer_options(kafka_options) - logger.info('Connecting to Kafka with the following parameters:') + logger.info("Connecting to Kafka with the following parameters:") logger.info(consumer_options) consumer = Consumer(consumer_options) if not validate_consumer(consumer, logger): return None kafka_topics = collect_kafka_topics(kafka_options) - logger.info(f'Subscribing to the following Kafka topics: {kafka_topics}') + logger.info(f"Subscribing to the following Kafka topics: {kafka_topics}") consumer.subscribe(kafka_topics) return Consumer(consumer_options) @@ -58,5 +64,5 @@ def validate_consumer(consumer: Consumer, logger: logging.Logger) -> bool: ) return False else: - logger.info('Kafka consumer successfully instantiated') + logger.info("Kafka consumer successfully instantiated") return True