Data Transfer utilities (#152)
* Update doc

* Add py3.10

* Update requirements

* stringify python version

* PEP8

* Fix import

* Update doc

* Fix typo in the script name

* Remove py37 deps

* Bump to 3.0

* README

* README
JulienPeloton authored Jan 17, 2023
1 parent c11ad08 commit 93c31f0
Showing 10 changed files with 314 additions and 16 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/linter.yml
@@ -12,7 +12,7 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
-python-version: [3.7, 3.9]
+python-version: ["3.9", "3.10"]

steps:
- uses: actions/checkout@v2
2 changes: 1 addition & 1 deletion .github/workflows/run_test.yml
@@ -11,7 +11,7 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
-python-version: [3.7, 3.9]
+python-version: ["3.9", "3.10"]
steps:
- uses: actions/checkout@v2
- name: Set up Python ${{ matrix.python-version }}
41 changes: 35 additions & 6 deletions README.md
@@ -7,16 +7,18 @@

# Fink client

-`fink-client` is a light package to manipulate catalogs and alerts issued from the [fink broker](https://github.com/astrolabsoftware/fink-broker) programmatically. Learn how to connect and use it by checking the [documentation](docs/).
+`fink-client` is a light package to manipulate catalogs and alerts issued from the [fink broker](https://github.com/astrolabsoftware/fink-broker) programmatically. It is used in the context of two major Fink services: Livestream and Data Transfer.

## Installation

-`fink_client` requires a version of Python 3.7+. To install it, just run
+`fink_client` requires Python 3.9+. To install it, just run

```bash
-pip install fink-client
+pip install fink-client --upgrade
```

Learn how to connect and use it by checking the [documentation](docs/).

## Registration

@@ -27,7 +29,7 @@ In order to connect and poll alerts from Fink, you need to get your credentials:
```bash
fink_client_register -username <USERNAME> -group_id <GROUP_ID> ...
```

-## Usage
+## Livestream usage

Once you have your credentials, you are ready to poll streams!

@@ -36,7 +38,7 @@
```bash
fink_consumer -h
usage: fink_consumer [-h] [--display] [-limit LIMIT] [--available_topics]
[--save] [-outdir OUTDIR] [-schema SCHEMA]

-Kafka consumer to listen and archive Fink streams
+Kafka consumer to listen and archive Fink streams from the Livestream service

optional arguments:
-h, --help show this help message and exit
@@ -67,5 +69,32 @@
optional arguments:
-filename FILENAME Path to an alert data file (avro format)
```
-Learn how to use fink-client by following the dedicated [tutorial](https://github.com/astrolabsoftware/fink-client-tutorial). It should not take long to learn it!
+More information at [docs/livestream](docs/livestream_manual.md).
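
For example, a first call to make sure everything works (the same check as in the livestream manual) is to poll and display a single alert:

```bash
fink_consumer --display -limit 1
```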

## Data Transfer usage

If you requested data using the [Data Transfer service](https://fink-portal.org/download), you can easily poll your stream using:

```bash
fink_datatransfer -h
usage: fink_datatransfer [-h] [-topic TOPIC] [-limit LIMIT] [-outdir OUTDIR] [-partitionby PARTITIONBY] [-batchsize BATCHSIZE] [--restart_from_beginning]
[--verbose]

Kafka consumer to listen and archive Fink streams from the data transfer service

optional arguments:
-h, --help show this help message and exit
-topic TOPIC Topic name for the stream that contains the data.
  -limit LIMIT          If specified, download only `limit` alerts from the stream. Default is None, that is, download all alerts.
-outdir OUTDIR Folder to store incoming alerts. It will be created if it does not exist.
-partitionby PARTITIONBY
Partition data by `time` (year=YYYY/month=MM/day=DD), or `finkclass` (finkclass=CLASS), or `tnsclass` (tnsclass=CLASS). Default is
time.
  -batchsize BATCHSIZE  Maximum number of alerts within the `maxtimeout` (see conf). Default is 1000 alerts.
--restart_from_beginning
If specified, restart downloading from the 1st alert in the stream. Default is False.
  --verbose             If specified, print information about the consuming process on screen.

```
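
For example, a minimal call to download the first 100 alerts of your topic into a local folder (replace `<topic name>` with the topic you requested):

```bash
fink_datatransfer \
    -topic <topic name> \
    -outdir myalertfolder \
    -limit 100 \
    --verbose
```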
More information at [docs/datatransfer](docs/datatransfer.md).
106 changes: 106 additions & 0 deletions docs/datatransfer.md
@@ -0,0 +1,106 @@
# Fink data transfer manual

_date 17/01/2023_

This manual has been tested for `fink-client` version 3.0. In case of trouble, send us an email ([email protected]) or open an issue (https://github.com/astrolabsoftware/fink-client).

## Installation of fink-client

From a terminal, you can install fink-client simply using `pip`:

```bash
pip install fink-client --upgrade
```

This should install all necessary dependencies.
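
You can quickly check that the installed version matches this manual through the package's `__version__` attribute:

```bash
python -c "import fink_client; print(fink_client.__version__)"
# 3.0
```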

## Registering

You first need to register your credentials. Once the client is installed, run in a terminal:

```bash
# access help using `fink_client_register -h`
# <USERNAME>, <GROUP_ID> and <SERVER> are given privately
# (servers are comma-separated if several); maxtimeout is in seconds.
# For the available topics, see https://fink-broker.readthedocs.io/en/latest/topics/
fink_client_register \
    -username <USERNAME> \
    -group_id <GROUP_ID> \
    -mytopics <topic1 topic2 etc> \
    -servers <SERVER> \
    -maxtimeout 5 \
    --verbose
```

where `<USERNAME>`, `<GROUP_ID>`, and `<SERVER>` are sent to you privately once you fill in this [form](https://forms.gle/2td4jysT4e9pkf889). By default, the credentials are installed in your home directory:

```bash
cat ~/.finkclient/credentials.yml
```

For the list of available topics, see [https://fink-broker.readthedocs.io/en/latest/topics/](https://fink-broker.readthedocs.io/en/latest/topics/).
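
You can also inspect your configuration from Python using the same helper the consumers use internally (a minimal sketch; `load_credentials` is the function used by `fink_datatransfer`):

```python
from fink_client.configuration import load_credentials

# reads ~/.finkclient/credentials.yml by default
conf = load_credentials()
print(conf['group_id'], conf['servers'], conf['maxtimeout'])
```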

## First steps: testing the connection

Processed alerts are stored for one week on our servers, which means that if you forget to poll data, you will still be able to retrieve alerts up to one week after emission. Before downloading all of them, let's retrieve only a few alerts to check the connection. On a terminal, run the following:

```bash
# access help using `fink_datatransfer -h`
fink_datatransfer \
-topic <topic name> \
-outdir myalertfolder \
-partitionby finkclass \
--verbose \
-limit 20
```

This will download and store the first 20 available alerts, and partition the data according to their `finkclass`:

```bash
myalertfolder/
├── Early SN Ia candidate
├── SN candidate
├── Solar System candidate
├── Unknown
└── ...
```

Note that the alert schema is automatically downloaded from the Kafka servers.
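
For reference, here is a minimal sketch of how this works under the hood (mirroring `fink_datatransfer.py`): the schema is published on a companion `<topic>_schema` topic, and carried in the message key:

```python
import json

import confluent_kafka
import fastavro

# fill with the credentials obtained at registration
kafka_config = {
    'bootstrap.servers': '<SERVER>',
    'group.id': '<GROUP_ID>',
    'auto.offset.reset': 'earliest',
}

consumer = confluent_kafka.Consumer(kafka_config)
consumer.subscribe(['<topic name>_schema'])

# the Avro schema is stored in the key of the message
msg = consumer.poll(5)
schema = fastavro.schema.parse_schema(json.loads(msg.key()))
consumer.close()
```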

## Downloading, resuming and starting from scratch

Once you are ready, you can poll all alerts in the topic:

```bash
# access help using `fink_datatransfer -h`
fink_datatransfer \
-topic <topic name> \
-outdir myalertfolder \
-partitionby finkclass \
--verbose
```

You can stop the poll by hitting `CTRL+C` on your keyboard, and resume later: the poll will restart from the last offset, that is, you will not have duplicates. In case you want to start polling data from the beginning of the stream, use the `--restart_from_beginning` option (under the hood, it appends a random suffix to your `group_id`, so the servers see a brand new consumer and serve the stream from its earliest offset):

```bash
# Make sure `myalertfolder` is empty or does not
# exist to avoid duplicates.
fink_datatransfer \
-topic <topic name> \
-outdir myalertfolder \
-partitionby finkclass \
--verbose \
--restart_from_beginning
```

## Reading alerts

Alerts are saved in the Apache Parquet format. Assuming you are using Python, you can easily read them using Pandas:

```python
import pandas as pd

# you can pass the folder name
pdf = pd.read_parquet('myalertfolder')
```
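
Since the dataset is partitioned on disk, you can also load a single partition by pointing to its subfolder. For example, with the `finkclass` layout shown above:

```python
import pandas as pd

# load only the Solar System candidates
pdf_sso = pd.read_parquet('myalertfolder/Solar System candidate')
```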

## Troubleshooting

In case of trouble, send us an email ([email protected]) or open an issue (https://github.com/astrolabsoftware/fink-client).
2 changes: 1 addition & 1 deletion docs/livestream_manual.md
@@ -42,7 +42,7 @@ For the list of available topics, see [https://fink-broker.readthedocs.io/en/latest/topics/](https://fink-broker.readthedocs.io/en/latest/topics/)
Processed alerts are stored for one week on our servers, which means that if you forget to poll data, you will still be able to retrieve alerts up to one week after emission. This also means that on your first connection, you will have one week of alerts to retrieve. Before you get all of them, let's retrieve the first available one to check the connection. On a terminal, run the following:

```bash
-# access help using `fink_consumer-h`
+# access help using `fink_consumer -h`
fink_consumer --display -limit 1
```

2 changes: 1 addition & 1 deletion fink_client/__init__.py
@@ -12,5 +12,5 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
__version__ = "2.12"
__version__ = "3.0"
__schema_version__ = "distribution_schema_fink_ztf_{}.avsc"
2 changes: 1 addition & 1 deletion fink_client/scripts/fink_consumer.py
@@ -13,7 +13,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
""" Kafka consumer to listen and archive Fink streams """
""" Kafka consumer to listen and archive Fink streams from the Livestream service """
import sys

import argparse
161 changes: 161 additions & 0 deletions fink_client/scripts/fink_datatransfer.py
@@ -0,0 +1,161 @@
#!/usr/bin/env python
# Copyright 2023 AstroLab Software
# Author: Julien Peloton
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
""" Kafka consumer to listen and archive Fink streams from the data transfer service """
import sys
import os
import io
import json
import argparse

import pyarrow as pa
import pyarrow.dataset as ds
import fastavro
import confluent_kafka

import numpy as np
import pandas as pd

from astropy.time import Time

from fink_client.configuration import load_credentials

def main():
""" """
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument(
'-topic', type=str, default='.',
help="Topic name for the stream that contains the data.")
parser.add_argument(
'-limit', type=int, default=None,
help="If specified, download only `limit` alerts from the stream. Default is None, that is download all alerts.")
parser.add_argument(
'-outdir', type=str, default='.',
help="Folder to store incoming alerts. It will be created if it does not exist.")
parser.add_argument(
'-partitionby', type=str, default='time',
help="Partition data by `time` (year=YYYY/month=MM/day=DD), or `finkclass` (finkclass=CLASS), or `tnsclass` (tnsclass=CLASS). Default is time.")
parser.add_argument(
'-batchsize', type=int, default=1000,
help="Maximum number of alert within the `maxtimeout` (see conf). Default is 1000 alerts.")
parser.add_argument(
'--restart_from_beginning', action='store_true',
help="If specified, restart downloading from the 1st alert in the stream. Default is False.")
parser.add_argument(
'--verbose', action='store_true',
help="If specified, print on screen information about the consuming.")
args = parser.parse_args(None)

if args.partitionby not in ['time', 'finkclass', 'tnsclass']:
print("{} is an unknown partitioning. `-partitionby` should be in ['time', 'finkclass', 'tnsclass']".format(args.partitionby))
sys.exit()

# load user configuration
conf = load_credentials()

if args.restart_from_beginning:
# resetting the group ID acts as a new consumer
group_id = conf['group_id'] + '_{}'.format(np.random.randint(1e6))
else:
group_id = conf['group_id']

kafka_config = {
'bootstrap.servers': conf['servers'],
'group.id': group_id,
"auto.offset.reset": "earliest"
}

if conf['password'] is not None:
kafka_config['password'] = conf['password']

if not os.path.isdir(args.outdir):
os.makedirs(args.outdir, exist_ok=True)

# Time to wait before polling again if no alerts
maxtimeout = conf['maxtimeout']

    if args.limit is not None and args.limit < args.batchsize:
        args.batchsize = args.limit

# Instantiate a consumer
consumer = confluent_kafka.Consumer(kafka_config)

# Subscribe to schema topic
topics = ['{}_schema'.format(args.topic)]
consumer.subscribe(topics)

    # Poll the schema topic. `msg` is None if nothing arrived within `maxtimeout`
    msg = consumer.poll(maxtimeout)
    if msg is None:
        print('No schema found for topic {}. Exiting.'.format(args.topic))
        sys.exit()
    schema = fastavro.schema.parse_schema(json.loads(msg.key()))

    # Subscribe to the data topic
topics = ['{}'.format(args.topic)]
consumer.subscribe(topics)

# infinite loop
maxpoll = args.limit if args.limit is not None else 1e10
try:
poll_number = 0
while poll_number < maxpoll:
msgs = consumer.consume(args.batchsize, maxtimeout)

# Decode the message
if msgs is not None:
pdf = pd.DataFrame.from_records(
[fastavro.schemaless_reader(io.BytesIO(msg.value()), schema) for msg in msgs],
)
if pdf.empty:
                    print('No alerts in the last {} seconds ({} polled)... Exiting'.format(maxtimeout, poll_number))
break

if 'tracklet' in pdf.columns:
pdf['tracklet'] = pdf['tracklet'].astype('str')

                if 'jd' in pdf.columns:
                    # create year, month, day columns from the Julian date,
                    # needed when partitioning by time (year=YYYY/month=MM/day=DD)
                    dates = Time(pdf['jd'].values, format='jd').to_datetime()
                    pdf['year'] = ['{:04d}'.format(d.year) for d in dates]
                    pdf['month'] = ['{:02d}'.format(d.month) for d in dates]
                    pdf['day'] = ['{:02d}'.format(d.day) for d in dates]

table = pa.Table.from_pandas(pdf)

if poll_number == 0:
table_schema = table.schema

if args.partitionby == 'time':
partitioning = ['year', 'month', 'day']
elif args.partitionby == 'finkclass':
partitioning = ['finkclass']
elif args.partitionby == 'tnsclass':
partitioning = ['tnsclass']

ds.write_dataset(
table,
args.outdir,
schema=table_schema,
basename_template='part-{{i}}-{}.parquet'.format(poll_number),
format="parquet",
partitioning=partitioning,
existing_data_behavior='overwrite_or_ignore'
)

poll_number += len(msgs)
if args.verbose:
print('Number of alerts polled: {}'.format(poll_number))
else:
                print('No alerts in the last {} seconds ({} polled)'.format(maxtimeout, poll_number))
except KeyboardInterrupt:
sys.stderr.write('%% Aborted by user\n')
finally:
consumer.close()


if __name__ == "__main__":
main()
1 change: 1 addition & 0 deletions requirements.txt
@@ -5,6 +5,7 @@ confluent-kafka==1.9.2
fastavro==1.6.0
astropy
numpy
+pyarrow
pandas
pyyaml
tabulate