Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Livestream] Add possibility to dump schema for inspection #191

Merged
merged 1 commit into from
Jun 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion fink_client/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import time
import fastavro
import confluent_kafka
from datetime import datetime, timezone

from fink_client.avroUtils import write_alert
from fink_client.avroUtils import _get_alert_schema
Expand All @@ -33,7 +34,7 @@ class AlertConsumer:
High level Kafka consumer to receive alerts from Fink broker
"""

def __init__(self, topics: list, config: dict, schema_path=None):
def __init__(self, topics: list, config: dict, schema_path=None, dump_schema=False):
"""Creates an instance of `AlertConsumer`

Parameters
Expand All @@ -56,6 +57,7 @@ def __init__(self, topics: list, config: dict, schema_path=None):
self.schema_path = schema_path
self._consumer = confluent_kafka.Consumer(self._kafka_config)
self._consumer.subscribe(self._topics)
self.dump_schema = dump_schema

def __enter__(self):
return self
Expand Down Expand Up @@ -101,6 +103,12 @@ def process_message(self, msg):

try:
_parsed_schema = fastavro.schema.parse_schema(json.loads(key))
if self.dump_schema:
today = datetime.now(timezone.utc).isoformat()
filename = 'schema_{}.json'.format(today)
with open(filename, 'w') as json_file:
json.dump(_parsed_schema, json_file, sort_keys=True, indent=4)
print("Schema saved as {}".format(filename))
alert = self._decode_msg(_parsed_schema, msg)
except json.JSONDecodeError:
# Old way
Expand Down
9 changes: 7 additions & 2 deletions fink_client/scripts/fink_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import time

from tabulate import tabulate
from astropy.time import Time

from fink_client.consumer import AlertConsumer
from fink_client.configuration import load_credentials
Expand Down Expand Up @@ -48,6 +49,10 @@ def main():
'-schema', type=str, default=None,
help="Avro schema to decode the incoming alerts. Default is None (version taken from each alert)"
)
parser.add_argument(
'--dump_schema', action='store_true',
help="If specified, save the schema on disk (json file)"
)
args = parser.parse_args(None)

# load user configuration
Expand All @@ -66,7 +71,7 @@ def main():
schema = None
else:
schema = args.schema
consumer = AlertConsumer(conf['mytopics'], myconfig, schema_path=schema)
consumer = AlertConsumer(conf['mytopics'], myconfig, schema_path=schema, dump_schema=args.dump_schema)

if args.available_topics:
print(consumer.available_topics().keys())
Expand Down Expand Up @@ -107,7 +112,7 @@ def main():
else:
table = [
[
alert['timestamp'], utc, topic, alert['objectId'],
Time(alert['candidate']['jd'], format='jd').iso, utc, topic, alert['objectId'],
alert['cdsxmatch'],
alert['candidate']['magpsf']
],
Expand Down
Loading