diff --git a/docs/livestream_manual.md b/docs/livestream_manual.md index 8fe4bba..2f1d9f8 100644 --- a/docs/livestream_manual.md +++ b/docs/livestream_manual.md @@ -1,15 +1,15 @@ # Fink livestream manual -_date 11/02/2022_ +_date 25/01/2023_ -This manual has been tested for `fink-client` version 2.10. Other versions might work. In case of trouble, send us an email (contact@fink-broker.org) or open an issue (https://github.com/astrolabsoftware/fink-client). +This manual has been tested for `fink-client` version 4.0. Other versions might work. In case of trouble, send us an email (contact@fink-broker.org) or open an issue (https://github.com/astrolabsoftware/fink-client). ## Installation of fink-client From a terminal, you can install fink-client simply using `pip`: ``` -pip install fink-client +pip install fink-client --upgrade ``` This should install all necessary dependencies. diff --git a/fink_client/__init__.py b/fink_client/__init__.py index d9ff20a..a7f5482 100644 --- a/fink_client/__init__.py +++ b/fink_client/__init__.py @@ -12,5 +12,5 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -__version__ = "3.1" +__version__ = "4.0" __schema_version__ = "distribution_schema_fink_ztf_{}.avsc" diff --git a/fink_client/consumer.py b/fink_client/consumer.py index 78d5824..78e2e32 100644 --- a/fink_client/consumer.py +++ b/fink_client/consumer.py @@ -14,6 +14,8 @@ # See the License for the specific language governing permissions and # limitations under the License. import io +import json +import fastavro import confluent_kafka from fink_client.avroUtils import write_alert @@ -50,7 +52,6 @@ def __init__(self, topics: list, config: dict, schema_path=None): self._topics = topics self._kafka_config = _get_kafka_config(config) self.schema_path = schema_path - # self._parsed_schema = _get_alert_schema(schema_path=schema_path) self._consumer = confluent_kafka.Consumer(self._kafka_config) self._consumer.subscribe(self._topics) @@ -93,30 +94,34 @@ def poll(self, timeout: float = -1) -> (str, dict): # decode the key if it is bytes key = msg.key() + if type(key) == bytes: key = key.decode('utf8') - if key is None: - # backward compatibility - key = '1.0_0.4.3' + # compatibility with previous scheme + key = "" - # Get the schema - if self.schema_path is not None: - _parsed_schema = _get_alert_schema(schema_path=self.schema_path) + try: + _parsed_schema = fastavro.schema.parse_schema(json.loads(key)) alert = self._decode_msg(_parsed_schema, msg) - elif key is not None: - try: - _parsed_schema = _get_alert_schema(key=key) - alert = self._decode_msg(_parsed_schema, msg) - except IndexError: - _parsed_schema = _get_alert_schema(key=key + '_replayed') + except json.JSONDecodeError: + # Old way + if self.schema_path is not None: + _parsed_schema = _get_alert_schema(schema_path=self.schema_path) alert = self._decode_msg(_parsed_schema, msg) - else: - msg = """ - The message cannot be decoded as there is no key (None). Alternatively - specify manually the schema path when instantiating ``AlertConsumer`` (or from fink_consumer). - """ - raise NotImplementedError(msg) + elif key is not None: + try: + _parsed_schema = _get_alert_schema(key=key) + alert = self._decode_msg(_parsed_schema, msg) + except IndexError: + _parsed_schema = _get_alert_schema(key=key + '_replayed') + alert = self._decode_msg(_parsed_schema, msg) + else: + msg = """ + The message cannot be decoded as there is no key (None). Alternatively + specify manually the schema path when instantiating ``AlertConsumer`` (or from fink_consumer). + """ + raise NotImplementedError(msg) return topic, alert, key @@ -161,31 +166,42 @@ def consume(self, num_alerts: int = 1, timeout: float = -1) -> list: msg_list = self._consumer.consume(num_alerts, timeout) for msg in msg_list: + if msg is None: + alerts.append((None, None, None)) + continue + topic = msg.topic() # decode the key if it is bytes key = msg.key() + if type(key) == bytes: key = key.decode('utf8') - if key is None: - # backward compatibility - key = '1.0_0.4.3' + # compatibility with previous scheme + key = "" - # Get the schema - if self.schema_path is not None: - _parsed_schema = _get_alert_schema(schema_path=self.schema_path) - elif key is not None: - _parsed_schema = _get_alert_schema(key=key) - else: - msg = """ - The message cannot be decoded as there is no key (None). Either specify a - key when writing the alert, or specify manually the schema path when - instantiating ``AlertConsumer`` (or from fink_consumer). - """ - raise NotImplementedError(msg) - avro_alert = io.BytesIO(msg.value()) - alert = _decode_avro_alert(avro_alert, _parsed_schema) + try: + _parsed_schema = fastavro.schema.parse_schema(json.loads(key)) + alert = self._decode_msg(_parsed_schema, msg) + except json.JSONDecodeError: + # Old way + if self.schema_path is not None: + _parsed_schema = _get_alert_schema(schema_path=self.schema_path) + alert = self._decode_msg(_parsed_schema, msg) + elif key is not None: + try: + _parsed_schema = _get_alert_schema(key=key) + alert = self._decode_msg(_parsed_schema, msg) + except IndexError: + _parsed_schema = _get_alert_schema(key=key + '_replayed') + alert = self._decode_msg(_parsed_schema, msg) + else: + msg = """ + The message cannot be decoded as there is no key (None). Alternatively + specify manually the schema path when instantiating ``AlertConsumer`` (or from fink_consumer). + """ + raise NotImplementedError(msg) alerts.append((topic, alert, key)) @@ -217,20 +233,7 @@ def poll_and_write( topic, alert, key = self.poll(timeout) if topic is not None: - # Get the schema - if self.schema_path is not None: - _parsed_schema = _get_alert_schema(schema_path=self.schema_path) - elif key is not None: - _parsed_schema = _get_alert_schema(key=key) - else: - msg = """ - The message cannot be written as there is no key (None). Either specify a - key when writing the alert, or specify manually the schema path when - instantiating ``AlertConsumer`` (or from fink_consumer). - """ - raise NotImplementedError(msg) - # print('Alert written at {}'.format(outdir)) - write_alert(alert, _parsed_schema, outdir, overwrite=overwrite) + write_alert(alert, self._parsed_schema, outdir, overwrite=overwrite) return topic, alert, key diff --git a/fink_client/scripts/fink_consumer.py b/fink_client/scripts/fink_consumer.py index 94821e5..7c8735e 100755 --- a/fink_client/scripts/fink_consumer.py +++ b/fink_client/scripts/fink_consumer.py @@ -1,5 +1,5 @@ #!/usr/bin/env python -# Copyright 2019-2022 AstroLab Software +# Copyright 2019-2023 AstroLab Software # Author: Julien Peloton # # Licensed under the Apache License, Version 2.0 (the "License"); @@ -44,7 +44,7 @@ def main(): help="Folder to store incoming alerts if --save is set. It must exist.") parser.add_argument( '-schema', type=str, default=None, - help="Avro schema to decode the incoming alerts. Default is None (latest version downloaded from server)") + help="Avro schema to decode the incoming alerts. Default is None (version taken from each alert)") args = parser.parse_args(None) # load user configuration