Skip to content

Commit

Permalink
Update the livestream consumer for new schema distribution (#156)
Browse files Browse the repository at this point in the history
* Update the livestream consumer for new schema distribution

* Fix key is None

* Update doc
  • Loading branch information
JulienPeloton authored Jan 25, 2023
1 parent 1dd7537 commit 167664b
Show file tree
Hide file tree
Showing 4 changed files with 59 additions and 56 deletions.
6 changes: 3 additions & 3 deletions docs/livestream_manual.md
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
# Fink livestream manual

_date 11/02/2022_
_date 25/01/2023_

This manual has been tested for `fink-client` version 2.10. Other versions might work. In case of trouble, send us an email ([email protected]) or open an issue (https://github.com/astrolabsoftware/fink-client).
This manual has been tested for `fink-client` version 4.0. Other versions might work. In case of trouble, send us an email ([email protected]) or open an issue (https://github.com/astrolabsoftware/fink-client).

## Installation of fink-client

From a terminal, you can install fink-client simply using `pip`:

```
pip install fink-client
pip install fink-client --upgrade
```

This should install all necessary dependencies.
Expand Down
2 changes: 1 addition & 1 deletion fink_client/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,5 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
__version__ = "3.1"
__version__ = "4.0"
__schema_version__ = "distribution_schema_fink_ztf_{}.avsc"
103 changes: 53 additions & 50 deletions fink_client/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import io
import json
import fastavro
import confluent_kafka

from fink_client.avroUtils import write_alert
Expand Down Expand Up @@ -50,7 +52,6 @@ def __init__(self, topics: list, config: dict, schema_path=None):
self._topics = topics
self._kafka_config = _get_kafka_config(config)
self.schema_path = schema_path
# self._parsed_schema = _get_alert_schema(schema_path=schema_path)
self._consumer = confluent_kafka.Consumer(self._kafka_config)
self._consumer.subscribe(self._topics)

Expand Down Expand Up @@ -93,30 +94,34 @@ def poll(self, timeout: float = -1) -> (str, dict):

# decode the key if it is bytes
key = msg.key()

if type(key) == bytes:
key = key.decode('utf8')

if key is None:
# backward compatibility
key = '1.0_0.4.3'
# compatibility with previous scheme
key = ""

# Get the schema
if self.schema_path is not None:
_parsed_schema = _get_alert_schema(schema_path=self.schema_path)
try:
_parsed_schema = fastavro.schema.parse_schema(json.loads(key))
alert = self._decode_msg(_parsed_schema, msg)
elif key is not None:
try:
_parsed_schema = _get_alert_schema(key=key)
alert = self._decode_msg(_parsed_schema, msg)
except IndexError:
_parsed_schema = _get_alert_schema(key=key + '_replayed')
except json.JSONDecodeError:
# Old way
if self.schema_path is not None:
_parsed_schema = _get_alert_schema(schema_path=self.schema_path)
alert = self._decode_msg(_parsed_schema, msg)
else:
msg = """
The message cannot be decoded as there is no key (None). Alternatively
specify manually the schema path when instantiating ``AlertConsumer`` (or from fink_consumer).
"""
raise NotImplementedError(msg)
elif key is not None:
try:
_parsed_schema = _get_alert_schema(key=key)
alert = self._decode_msg(_parsed_schema, msg)
except IndexError:
_parsed_schema = _get_alert_schema(key=key + '_replayed')
alert = self._decode_msg(_parsed_schema, msg)
else:
msg = """
The message cannot be decoded as there is no key (None). Alternatively
specify manually the schema path when instantiating ``AlertConsumer`` (or from fink_consumer).
"""
raise NotImplementedError(msg)

return topic, alert, key

Expand Down Expand Up @@ -161,31 +166,42 @@ def consume(self, num_alerts: int = 1, timeout: float = -1) -> list:
msg_list = self._consumer.consume(num_alerts, timeout)

for msg in msg_list:
if msg is None:
alerts.append((None, None, None))
continue

topic = msg.topic()

# decode the key if it is bytes
key = msg.key()

if type(key) == bytes:
key = key.decode('utf8')

if key is None:
# backward compatibility
key = '1.0_0.4.3'
# compatibility with previous scheme
key = ""

# Get the schema
if self.schema_path is not None:
_parsed_schema = _get_alert_schema(schema_path=self.schema_path)
elif key is not None:
_parsed_schema = _get_alert_schema(key=key)
else:
msg = """
The message cannot be decoded as there is no key (None). Either specify a
key when writing the alert, or specify manually the schema path when
instantiating ``AlertConsumer`` (or from fink_consumer).
"""
raise NotImplementedError(msg)
avro_alert = io.BytesIO(msg.value())
alert = _decode_avro_alert(avro_alert, _parsed_schema)
try:
_parsed_schema = fastavro.schema.parse_schema(json.loads(key))
alert = self._decode_msg(_parsed_schema, msg)
except json.JSONDecodeError:
# Old way
if self.schema_path is not None:
_parsed_schema = _get_alert_schema(schema_path=self.schema_path)
alert = self._decode_msg(_parsed_schema, msg)
elif key is not None:
try:
_parsed_schema = _get_alert_schema(key=key)
alert = self._decode_msg(_parsed_schema, msg)
except IndexError:
_parsed_schema = _get_alert_schema(key=key + '_replayed')
alert = self._decode_msg(_parsed_schema, msg)
else:
msg = """
The message cannot be decoded as there is no key (None). Alternatively
specify manually the schema path when instantiating ``AlertConsumer`` (or from fink_consumer).
"""
raise NotImplementedError(msg)

alerts.append((topic, alert, key))

Expand Down Expand Up @@ -217,20 +233,7 @@ def poll_and_write(
topic, alert, key = self.poll(timeout)

if topic is not None:
# Get the schema
if self.schema_path is not None:
_parsed_schema = _get_alert_schema(schema_path=self.schema_path)
elif key is not None:
_parsed_schema = _get_alert_schema(key=key)
else:
msg = """
The message cannot be written as there is no key (None). Either specify a
key when writing the alert, or specify manually the schema path when
instantiating ``AlertConsumer`` (or from fink_consumer).
"""
raise NotImplementedError(msg)
# print('Alert written at {}'.format(outdir))
write_alert(alert, _parsed_schema, outdir, overwrite=overwrite)
write_alert(alert, self._parsed_schema, outdir, overwrite=overwrite)

return topic, alert, key

Expand Down
4 changes: 2 additions & 2 deletions fink_client/scripts/fink_consumer.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#!/usr/bin/env python
# Copyright 2019-2022 AstroLab Software
# Copyright 2019-2023 AstroLab Software
# Author: Julien Peloton
#
# Licensed under the Apache License, Version 2.0 (the "License");
Expand Down Expand Up @@ -44,7 +44,7 @@ def main():
help="Folder to store incoming alerts if --save is set. It must exist.")
parser.add_argument(
'-schema', type=str, default=None,
help="Avro schema to decode the incoming alerts. Default is None (latest version downloaded from server)")
help="Avro schema to decode the incoming alerts. Default is None (version taken from each alert)")
args = parser.parse_args(None)

# load user configuration
Expand Down

0 comments on commit 167664b

Please sign in to comment.