Skip to content

Commit

Permalink
Parse bootstrap server address.
Browse files Browse the repository at this point in the history
  • Loading branch information
YooSunYoung committed May 27, 2024
1 parent 8a76c64 commit d721920
Showing 1 changed file with 14 additions and 8 deletions.
22 changes: 14 additions & 8 deletions src/scicat_kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,37 +13,43 @@ def collect_consumer_options(options: kafkaOptions) -> dict:

# Build logger and formatter
config_dict = {
key.replace('_', '.'): value
key.replace("_", "."): value
for key, value in asdict(options).items()
if key not in ('topics', 'individual_message_commit')
if key not in ("topics", "individual_message_commit")
}
config_dict['enable.auto.commit'] = (
config_dict["enable.auto.commit"] = (
not options.individual_message_commit
) and options.enable_auto_commit
if isinstance(bootstrap_servers := options.bootstrap_servers, list):
# Convert the list to a comma-separated string
config_dict["bootstrap.servers"] = ",".join(bootstrap_servers)
else:
config_dict["bootstrap.servers"] = bootstrap_servers

return config_dict


def collect_kafka_topics(options: kafkaOptions) -> list[str]:
"""Return the Kafka topics as a list."""
if isinstance(options.topics, str):
return options.topics.split(',')
return options.topics.split(",")
elif isinstance(options.topics, list):
return options.topics
else:
raise TypeError('The topics must be a list or a comma-separated string.')
raise TypeError("The topics must be a list or a comma-separated string.")


def build_consumer(kafka_options: kafkaOptions, logger: logging.Logger) -> Consumer:
"""Build a Kafka consumer and configure it according to the ``options``."""
consumer_options = collect_consumer_options(kafka_options)
logger.info('Connecting to Kafka with the following parameters:')
logger.info("Connecting to Kafka with the following parameters:")
logger.info(consumer_options)
consumer = Consumer(consumer_options)
if not validate_consumer(consumer, logger):
return None

kafka_topics = collect_kafka_topics(kafka_options)
logger.info(f'Subscribing to the following Kafka topics: {kafka_topics}')
logger.info(f"Subscribing to the following Kafka topics: {kafka_topics}")
consumer.subscribe(kafka_topics)
return Consumer(consumer_options)

Expand All @@ -58,5 +64,5 @@ def validate_consumer(consumer: Consumer, logger: logging.Logger) -> bool:
)
return False
else:
logger.info('Kafka consumer successfully instantiated')
logger.info("Kafka consumer successfully instantiated")
return True

0 comments on commit d721920

Please sign in to comment.