diff --git a/fink_client/avro_utils.py b/fink_client/avro_utils.py index d4efaee..d63f4e3 100644 --- a/fink_client/avro_utils.py +++ b/fink_client/avro_utils.py @@ -245,6 +245,12 @@ def write_alert( path: str Folder that will contain the alert. The filename will always be .avro + overwrite: bool, optional + If True, overwrite existing alert. Default is False. + id1: str, optional + First prefix for alert name: {id1}_{id2}.avro + id2: str, optional + Second prefix for alert name: {id1}_{id2}.avro Examples -------- diff --git a/fink_client/consumer.py b/fink_client/consumer.py index 8eb5e99..152098e 100644 --- a/fink_client/consumer.py +++ b/fink_client/consumer.py @@ -14,6 +14,7 @@ # See the License for the specific language governing permissions and # limitations under the License. import io +import sys import json import time import fastavro @@ -35,7 +36,14 @@ class AlertError(Exception): class AlertConsumer: """High level Kafka consumer to receive alerts from Fink broker""" - def __init__(self, topics: list, config: dict, schema_path=None, dump_schema=False): + def __init__( + self, + topics: list, + config: dict, + schema_path=None, + dump_schema=False, + on_assign=None, + ): """Creates an instance of `AlertConsumer` Parameters @@ -52,12 +60,27 @@ def __init__(self, topics: list, config: dict, schema_path=None, dump_schema=Fal group.id for Kafka consumer bootstrap.servers: str, optional Kafka servers to connect to + schema_path: str, optional + If specified, path to an alert schema (avsc). + Default is None. + dump_schema: bool, optional + If True, save incoming alert schema on disk. + Useful for schema inspection when getting `IndexError`. + Default is False. + on_assign: callable, optional + Callback to update the current assignment + and specify start offsets. Default is None. 
+ """ self._topics = topics self._kafka_config = _get_kafka_config(config) self.schema_path = schema_path self._consumer = confluent_kafka.Consumer(self._kafka_config) - self._consumer.subscribe(self._topics) + + if on_assign is not None: + self._consumer.subscribe(self._topics, on_assign=on_assign) + else: + self._consumer.subscribe(self._topics) self.dump_schema = dump_schema def __enter__(self): @@ -281,7 +304,9 @@ def close(self): self._consumer.close() -def return_offsets(consumer, topic, waitfor=1, timeout=10, verbose=False): +def return_offsets( + consumer, topic, waitfor=1, timeout=10, hide_empty_partition=True, verbose=False +): """Poll servers to get the total committed offsets, and remaining lag Parameters @@ -294,6 +319,9 @@ def return_offsets(consumer, topic, waitfor=1, timeout=10, verbose=False): Time in second to wait before polling. Default is 1 second. timeout: int, optional Timeout in second when polling the servers. Default is 10. + hide_empty_partition: bool, optional + If True, display only non-empty partitions. + Default is True verbose: bool, optional If True, prints useful table. Default is False. 
@@ -357,18 +385,117 @@ def return_offsets(consumer, topic, waitfor=1, timeout=10, verbose=False): total_lag = total_lag + int(lag) if verbose: - print( - "%-50s %9s %9s" - % ("{} [{}]".format(partition.topic, partition.partition), offset, lag) - ) + if (hide_empty_partition and offset != "-") or (not hide_empty_partition): + print( + "%-50s %9s %9s" + % ( + "{} [{}]".format(partition.topic, partition.partition), + offset, + lag, + ) + ) if verbose: print("-" * 72) - print("%-50s %9s %9s" % ("Total", total_offsets, total_lag)) + print( + "%-50s %9s %9s" % ("Total for {}".format(topic), total_offsets, total_lag) + ) print("-" * 72) return total_offsets, total_lag +def return_last_offsets(kafka_config, topic): + """Return the last offsets + + Parameters + ---------- + kafka_config: dict + Kafka consumer config + topic: str + Topic name + + Returns + ------- + offsets: list + Last offsets of each partition + """ + consumer = confluent_kafka.Consumer(kafka_config) + topics = ["{}".format(topic)] + consumer.subscribe(topics) + + metadata = consumer.list_topics(topic) + if metadata.topics[topic].error is not None: + raise confluent_kafka.KafkaException(metadata.topics[topic].error) + + # List of partitions + partitions = [ + confluent_kafka.TopicPartition(topic, p) + for p in metadata.topics[topic].partitions + ] + committed = consumer.committed(partitions) + offsets = [] + for partition in committed: + if partition.offset != confluent_kafka.OFFSET_INVALID: + offsets.append(partition.offset) + else: + offsets.append(0) + + consumer.close() + return offsets + + +def print_offsets( + kafka_config, topic, maxtimeout=10, hide_empty_partition=True, verbose=True +): + """Wrapper around `consumer.return_offsets` + + If the server is rebalancing the offsets, it will exit the program. 
+ + Parameters + ---------- + kafka_config: dict + Dictionary with consumer parameters + topic: str + Topic name + maxtimeout: int, optional + Timeout in seconds, when polling the servers + hide_empty_partition: bool, optional + If True, display only non-empty partitions. + Default is True + verbose: bool, optional + If True, prints useful table. Default is True. + + Returns + ------- + total_lag: int + Remaining messages in the topic across all partitions. + total_offset: int + Total number of messages committed across all partitions + """ + consumer = confluent_kafka.Consumer(kafka_config) + + topics = ["{}".format(topic)] + consumer.subscribe(topics) + total_offset, total_lag = return_offsets( + consumer, + topic, + timeout=maxtimeout, + waitfor=0, + verbose=verbose, + hide_empty_partition=hide_empty_partition, + ) + if (total_offset, total_lag) == (-1, -1): + print( + "Warning: Consumer group '{}' is rebalancing. Please wait.".format( + kafka_config["group.id"] + ) + ) + sys.exit() + consumer.close() + + return total_lag, total_offset + + def _get_kafka_config(config: dict) -> dict: """Returns configurations for a consumer instance @@ -392,7 +519,7 @@ def _get_kafka_config(config: dict) -> dict: kafka_config["sasl.username"] = config["username"] kafka_config["sasl.password"] = config["password"] - kafka_config["group.id"] = config["group_id"] + kafka_config["group.id"] = config["group.id"] kafka_config.update(default_config) @@ -405,3 +532,103 @@ def _get_kafka_config(config: dict) -> dict: kafka_config["bootstrap.servers"] = "{}".format(",".join(fink_servers)) return kafka_config + + +def return_npartitions(topic, kafka_config): + """Get the number of partitions + + Parameters + ---------- + kafka_config: dict + Dictionary with consumer parameters + topic: str + Topic name + + Returns + ------- + nbpartitions: int + Number of partitions in the topic + + """ + consumer = confluent_kafka.Consumer(kafka_config) + + # Details to get + nbpartitions = 0 + try: + #
Topic metadata + metadata = consumer.list_topics(topic=topic) + + if metadata.topics and topic in metadata.topics: + partitions = metadata.topics[topic].partitions + nbpartitions = len(partitions) + else: + print("The topic {} does not exist".format(topic)) + + except confluent_kafka.KafkaException as e: + print(f"Error while getting the number of partitions: {e}") + + consumer.close() + + return nbpartitions + + +def return_partition_offset(consumer, topic, partition): + """Return the number of offsets currently held in a partition + + consumer: confluent_kafka.Consumer + Kafka consumer + topic: str + Topic name + partition: int + The partition number + + Returns + ------- + partition_size: int + Number of offsets in the partition (high minus low watermark) + """ + topicPartition = confluent_kafka.TopicPartition(topic, partition) + low_offset, high_offset = consumer.get_watermark_offsets(topicPartition) + partition_size = high_offset - low_offset + + return partition_size + + +def get_schema_from_stream(kafka_config, topic, maxtimeout): + """Poll the schema data from the schema topic + + Parameters + ---------- + kafka_config: dict + Dictionary with consumer parameters + topic: str + Topic name + maxtimeout: int + Timeout in seconds, when polling the servers + + Returns + ------- + schema: None or dict + Schema data. None if the poll was not successful. + Reasons to get None: + 1. timeout has been reached (increase timeout) + 2. topic is empty (produce new data) + 3.
topic does not exist (create the topic) + """ + # Instantiate a consumer + consumer_schema = confluent_kafka.Consumer(kafka_config) + + # Subscribe to schema topic + topics = ["{}_schema".format(topic)] + consumer_schema.subscribe(topics) + + # Poll + msg = consumer_schema.poll(maxtimeout) + if msg is not None: + schema = fastavro.schema.parse_schema(json.loads(msg.key())) + else: + schema = None + + consumer_schema.close() + + return schema diff --git a/fink_client/scripts/fink_consumer.py b/fink_client/scripts/fink_consumer.py index 0b84dbc..289f2fc 100755 --- a/fink_client/scripts/fink_consumer.py +++ b/fink_client/scripts/fink_consumer.py @@ -1,5 +1,5 @@ #!/usr/bin/env python -# Copyright 2019-2023 AstroLab Software +# Copyright 2019-2024 AstroLab Software # Author: Julien Peloton # # Licensed under the Apache License, Version 2.0 (the "License"); @@ -28,6 +28,8 @@ from fink_client.configuration import load_credentials from fink_client.configuration import mm_topic_names +from fink_client.consumer import print_offsets + def main(): """ """ @@ -37,6 +39,11 @@ def main(): action="store_true", help="If specified, print on screen information about incoming alert.", ) + parser.add_argument( + "--display_statistics", + action="store_true", + help="If specified, print on screen information about queues, and exit.", + ) parser.add_argument( "-limit", type=int, @@ -70,27 +77,74 @@ def main(): action="store_true", help="If specified, save the schema on disk (json file)", ) + parser.add_argument( + "-start_at", + type=str, + default="", + help=r"If specified, reset offsets to 0 (`earliest`) or empty queue (`latest`).", + ) args = parser.parse_args(None) # load user configuration conf = load_credentials() myconfig = { - "username": conf["username"], "bootstrap.servers": conf["servers"], - "group_id": conf["group_id"], + "group.id": conf["group_id"], } if conf["password"] is not None: myconfig["password"] = conf["password"] + if args.display_statistics: + print() + for 
topic in conf["mytopics"]: + total_lag, total_offset = print_offsets( + myconfig, topic, conf["maxtimeout"], verbose=True + ) + print() + sys.exit(0) + # Instantiate a consumer if args.schema is None: schema = None else: schema = args.schema + + if args.start_at != "": + if args.start_at == "earliest": + + def assign_offset(consumer, partitions): + print("Resetting offsets to BEGINNING") + for p in partitions: + low, high = consumer.get_watermark_offsets(p) + p.offset = low + print("assign", p) + consumer.assign(partitions) + elif args.start_at == "latest": + + def assign_offset(consumer, partitions): + print("Resetting offsets to END") + for p in partitions: + low, high = consumer.get_watermark_offsets(p) + p.offset = high + print("assign", p) + consumer.assign(partitions) + else: + raise AttributeError( + "{} not recognized. -start_at should be `earliest` or `latest`.".format( + args.start_at + ) + ) + else: + assign_offset = None + consumer = AlertConsumer( - conf["mytopics"], myconfig, schema_path=schema, dump_schema=args.dump_schema + conf["mytopics"], + myconfig, + schema_path=schema, + dump_schema=args.dump_schema, + on_assign=assign_offset, ) if args.available_topics: diff --git a/fink_client/scripts/fink_datatransfer.py b/fink_client/scripts/fink_datatransfer.py index 4220545..1fe2165 100755 --- a/fink_client/scripts/fink_datatransfer.py +++ b/fink_client/scripts/fink_datatransfer.py @@ -18,7 +18,6 @@ import sys import os import io -import json import argparse import logging import psutil @@ -37,221 +36,13 @@ from fink_client.configuration import load_credentials -from fink_client.consumer import return_offsets - - -def print_offsets(kafka_config, topic, maxtimeout=10, verbose=True): - """Wrapper around `consumer.return_offsets` - - If the server is rebalancing the offsets, it will exit the program. 
- - Parameters - ---------- - kafka_config: dic - Dictionary with consumer parameters - topic: str - Topic name - maxtimeout: int, optional - Timeout in second, when polling the servers - - Returns - ------- - total_offsets: int - Total number of messages committed across all partitions - total_lag: int - Remaining messages in the topic across all partitions. - """ - consumer = confluent_kafka.Consumer(kafka_config) - - topics = ["{}".format(topic)] - consumer.subscribe(topics) - total_offset, total_lag = return_offsets( - consumer, topic, timeout=maxtimeout, waitfor=0, verbose=verbose - ) - if (total_offset, total_lag) == (-1, -1): - print( - "Warning: Consumer group '{}' is rebalancing. Please wait.".format( - kafka_config["group.id"] - ) - ) - sys.exit() - consumer.close() - - return total_lag, total_offset - - -def get_schema(kafka_config, topic, maxtimeout): - """Poll the schema data from the schema topic - - Parameters - ---------- - kafka_config: dic - Dictionary with consumer parameters - topic: str - Topic name - timeout: int, optional - Timeout in second, when polling the servers - - Returns - ------- - schema: None or dic - Schema data. None if the poll was not successful. - Reasons to get None: - 1. timeout has been reached (increase timeout) - 2. topic is empty (produce new data) - 3. 
topic does not exist (create the topic) - """ - # Instantiate a consumer - consumer_schema = confluent_kafka.Consumer(kafka_config) - - # Subscribe to schema topic - topics = ["{}_schema".format(topic)] - consumer_schema.subscribe(topics) - - # Poll - msg = consumer_schema.poll(maxtimeout) - if msg is not None: - schema = fastavro.schema.parse_schema(json.loads(msg.key())) - else: - schema = None - - consumer_schema.close() - - return schema - - -def my_assign(consumer, partitions): - """Function to reset offsets when (re)polling - - It must be passed when subscribing to a topic: - `consumer.subscribe(topics, on_assign=my_assign)` - - Parameters - ---------- - consumer: confluent_kafka.Consumer - Kafka consumer - partitions: Kafka partitions - Internal object to deal with partitions - """ - for p in partitions: - p.offset = 0 - consumer.assign(partitions) - - -def reset_offset(kafka_config, topic): - """Rest offsets for a given topic - - Parameters - ---------- - kafka_config: dict - Kafka server parameters - topic: str - Topic name - """ - consumer = confluent_kafka.Consumer(kafka_config) - topics = ["{}".format(topic)] - consumer.subscribe(topics, on_assign=my_assign) - consumer.close() - - -def return_partition_offset(consumer, topic, partition): - """Return the offset and the remaining lag of a partition - - consumer: confluent_kafka.Consumer - Kafka consumer - topic: str - Topic name - partition: int - The partition number - - Returns - ------- - offset : int - Total number of offsets in the topic - """ - topicPartition = confluent_kafka.TopicPartition(topic, partition) - low_offset, high_offset = consumer.get_watermark_offsets(topicPartition) - partition_size = high_offset - low_offset - - return partition_size - - -def return_npartitions(topic, kafka_config): - """Get the number of partitions - - Parameters - ---------- - kafka_config: dic - Dictionary with consumer parameters - topic: str - Topic name - - Returns - ------- - nbpartitions: int - Number of 
partitions in the topic - - """ - consumer = confluent_kafka.Consumer(kafka_config) - - # Details to get - nbpartitions = 0 - try: - # Topic metadata - metadata = consumer.list_topics(topic=topic) - - if metadata.topics and topic in metadata.topics: - partitions = metadata.topics[topic].partitions - nbpartitions = len(partitions) - else: - print("The topic {} does not exist".format(topic)) - - except confluent_kafka.KafkaException as e: - print(f"Error while getting the number of partitions: {e}") - - consumer.close() - - return nbpartitions - - -def return_last_offsets(kafka_config, topic): - """Return the last offsets - - Parameters - ---------- - kafka_config: dict - Kafka consumer config - topic: str - Topic name - - Returns - ------- - offsets: list - Last offsets of each partition - """ - consumer = confluent_kafka.Consumer(kafka_config) - topics = ["{}".format(topic)] - consumer.subscribe(topics) - - metadata = consumer.list_topics(topic) - if metadata.topics[topic].error is not None: - raise confluent_kafka.KafkaException(metadata.topics[topic].error) - - # List of partitions - partitions = [ - confluent_kafka.TopicPartition(topic, p) - for p in metadata.topics[topic].partitions - ] - committed = consumer.committed(partitions) - offsets = [] - for partition in committed: - if partition.offset != confluent_kafka.OFFSET_INVALID: - offsets.append(partition.offset) - else: - offsets.append(0) - - consumer.close() - return offsets +from fink_client.consumer import ( + print_offsets, + return_npartitions, + return_partition_offset, + return_last_offsets, + get_schema_from_stream, +) def poll(process_id, nconsumers, queue, schema, kafka_config, rng, args): @@ -529,14 +320,18 @@ def main(): if args.restart_from_beginning: total_lag, total_offset = print_offsets( - kafka_config, args.topic, args.maxtimeout, verbose=False + kafka_config, + args.topic, + args.maxtimeout, + verbose=False, + hide_empty_partition=False, ) args.total_lag = total_offset args.total_offset = 
0 offsets = [0 for _ in range(args.number_partitions)] else: total_lag, total_offset = print_offsets( - kafka_config, args.topic, args.maxtimeout + kafka_config, args.topic, args.maxtimeout, hide_empty_partition=False ) args.total_lag = total_lag args.total_offset = total_offset @@ -551,7 +346,7 @@ def main(): if (args.limit is not None) and (args.limit < args.batchsize): args.batchsize = args.limit - schema = get_schema(kafka_config, args.topic, args.maxtimeout) + schema = get_schema_from_stream(kafka_config, args.topic, args.maxtimeout) if schema is None: # TBD: raise error print(