[Euclid] Add a new endpoint to retrieve data (#522)
* Update README

* Update Euclid endpoint to push data into HBase

* Allow schema to change name

* Sort fields

* Dict to list

* Update header handling

* Update HBase table name for euclid

* Update function

* Update function args

* Format name

* Fix typo

* Force unique rowkey

* Modify rowkey structure

* eucliddata endpoint

* eucliddata endpoint

* eucliddata endpoint

* Schema conversion

* Schema conversion

* Schema conversion

* Schema conversion

* Typo on type

* Typo on type

* Add columns argument and use internal date for the rowkey

* fix typo

* Enable wildcard for dates

* Add a sandbox mode

* Fix typo...

* Add sandbox to Euclid pull

* New test for Euclid data
JulienPeloton authored Nov 9, 2023
1 parent 5bfb74c commit cfa0625
Showing 7 changed files with 3,337 additions and 75 deletions.
4 changes: 2 additions & 2 deletions README.md
@@ -10,7 +10,7 @@ The backend is using [Apache HBase](https://hbase.apache.org/), a distributed no

## Backend structure

After each observation night, the data is aggregated and pushed into Apache HBase tables. The main table contains all alert data processed by Fink since 2019-11-01. This represents more than 140 million alerts collected (5 TB), and about 95 million processed (4 TB) as of 07/2022. The main table data is indexed along the `objectId` of alerts, and the emission date `jd`.
After each observation night, the data is aggregated and pushed into Apache HBase tables. The main table contains all alert data processed by Fink since 2019-11-01. This represents more than 210 million alerts collected, and about 145 million scientifically valid alerts (7.5 TB) as of 11/2023. The main table data is indexed along the `objectId` of alerts, and the emission date `jd`.

In order to allow multi-indexing with HBase, we create _index tables_. These tables are indexed along different properties (time, sky position, classification, ...). They contain the same number of rows as the main table but fewer columns. These index tables are used to perform fast searches along arbitrary properties and isolate interesting candidates, while the main table is used to display final data.

@@ -44,7 +44,7 @@ IP: fink-portal.org
PORT: 24000
HBASEIP: hbase-1.lal.in2p3.fr
ZOOPORT: 2183
SCHEMAVER: "schema_2.2_2.0.0"
SCHEMAVER: "schema_3.1_5.0.0"
tablename: ztf
```

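These settings are consumed by `connect_to_hbase_table` in `apps/client.py`, which this commit extends. Below is a minimal sketch of opening connections with them, assuming the FinkBrowser jar is in place and the HBase/ZooKeeper instance from `config.yml` is reachable; the `schema_ssopipe` schema name is illustrative, following the `schema_{pipeline}` pattern used later in this commit:

```python
# Minimal sketch: open connections using the config above.
# 'ztf' is the main table named in config.yml; 'euclid.test' is the
# sandbox table introduced by this commit (schema name illustrative).
from apps.client import connect_to_hbase_table

client = connect_to_hbase_table('ztf')  # schema taken from SCHEMAVER
euclid = connect_to_hbase_table('euclid.test', schema_name='schema_ssopipe')

client.close()
euclid.close()
```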
73 changes: 72 additions & 1 deletion apps/api/api.py
@@ -1,4 +1,4 @@
# Copyright 2020-2022 AstroLab Software
# Copyright 2020-2023 AstroLab Software
# Author: Julien Peloton
#
# Licensed under the Apache License, Version 2.0 (the "License");
@@ -37,6 +37,7 @@
from apps.api.utils import return_ssoft_pdf
from apps.api.utils import return_resolver_pdf
from apps.api.utils import upload_euclid_data
from apps.api.utils import download_euclid_data
from apps.api.utils import retrieve_metadata, post_metadata, retrieve_oid

from fink_utils.xmatch.simbad import get_simbad_labels
@@ -682,6 +683,39 @@ def layout(is_mobile):
'name': 'payload',
'required': True,
'description': 'Data file'
},
{
'name': 'mode',
'required': False,
'description': 'Execution mode among production[default] or sandbox. Choose sandbox if you just want to test the upload without touching the tables.'
},
]

args_eucliddata = [
{
'name': 'pipeline',
'required': True,
'description': '`SSOPipe`, `streakdet`, `DL`'
},
{
'name': 'dates',
'required': True,
'description': 'Observation dates. It can be a single date (YYYYMMDD), a range (YYYYMMDD:YYYYMMDD), or any superset (e.g. YYYY)'
},
{
'name': 'columns',
'required': False,
'description': 'Comma-separated data columns to transfer. Default is all columns. See {}/api/v1/columns for more information.'.format(APIURL)
},
{
'name': 'mode',
'required': False,
'description': 'Execution mode among production[default] or sandbox. Choose sandbox if you just want to connect to the test table.'
},
{
'name': 'output-format',
'required': False,
'description': 'Output format among json[default], csv, parquet, votable'
}
]

@@ -1330,6 +1364,43 @@ def query_euclidin(payload=None):

return out

@api_bp.route('/api/v1/eucliddata', methods=['GET'])
def query_eucliddata_arguments():
""" Obtain information about Euclid stored data
"""
if len(request.args) > 0:
# POST from query URL
return query_eucliddata(payload=request.args)
else:
return jsonify({'args': args_eucliddata})

@api_bp.route('/api/v1/eucliddata', methods=['POST'])
def query_eucliddata(payload=None):
""" Download Euclid data in Fink
"""
# get payload from the JSON
if payload is None:
payload = request.json

# Check all required args are here
required_args = [i['name'] for i in args_eucliddata if i['required'] is True]
for required_arg in required_args:
if required_arg not in payload:
rep = {
'status': 'error',
'text': "A value for `{}` is required. Use GET to check arguments.\n".format(required_arg)
}
return Response(str(rep), 400)

out = download_euclid_data(payload)

# Error propagation
if isinstance(out, Response):
return out

output_format = payload.get('output-format', 'json')
return send_data(out, output_format)

@api_bp.route('/api/v1/metadata', methods=['GET'])
def metadata_arguments():
""" Obtain information about uploading metadata
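The new route answers GET with the argument list above and POST with the data itself. Below is a minimal sketch of a query using `requests`; the pipeline name, date range, and column names are illustrative values, not taken from this commit:

```python
# Minimal sketch: query the new /api/v1/eucliddata endpoint.
# Pipeline, dates, and columns are illustrative; mode='sandbox'
# targets the test table instead of production.
import io

import pandas as pd
import requests

r = requests.post(
    'https://fink-portal.org/api/v1/eucliddata',
    json={
        'pipeline': 'SSOPipe',           # SSOPipe, streakdet, or DL
        'dates': '20230101:20230131',    # single date, range, or superset (e.g. 2023)
        'columns': 'd:ra,d:dec',         # hypothetical column names; default is all
        'mode': 'sandbox',
        'output-format': 'json',
    },
)

pdf = pd.read_json(io.BytesIO(r.content))
print(pdf.head())
```

A plain GET on the same URL returns the `args_eucliddata` description above, and a POST missing a required argument gets a 400 pointing back to it. Under the hood, a date range maps to an HBase range scan over row keys of the form `{pipeline}_{date}`, as implemented in `download_euclid_data` below.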
119 changes: 103 additions & 16 deletions apps/api/utils.py
@@ -41,6 +41,9 @@
from apps.utils import hbase_type_converter

from apps.euclid.utils import load_euclid_header
from apps.euclid.utils import add_columns
from apps.euclid.utils import compute_rowkey
from apps.euclid.utils import check_header

from apps.plotting import legacy_normalizer, convolve, sigmoid_normalizer

@@ -1459,38 +1462,122 @@ def upload_euclid_data(payload: dict) -> pd.DataFrame:
data = payload['payload']
pipeline_name = payload['pipeline'].lower()

# Read data into pandas DataFrame
pdf = pd.read_csv(io.BytesIO(eval(data)), header=0, sep=' ', index_col=False)

# Add Fink defined columns
pdf = add_columns(
pdf,
pipeline_name,
payload['version'],
payload['date'],
payload['EID']
)

# Load official headers for HBase
header = load_euclid_header(pipeline_name)
euclid_header = header.keys()

pdf = pd.read_csv(io.BytesIO(eval(data)), header=0, sep=' ', index_col=False)
msg = check_header(pdf, list(euclid_header))
if msg != 'ok':
return Response(msg, 400)

# BUG: look for element-wise comparison method
if ~np.all(pdf.columns == np.array(euclid_header)):
missingfields = [field for field in euclid_header if field not in pdf.columns]
newfields = [field for field in pdf.columns if field not in euclid_header]
msg = """
WARNING: we detected a change in the schema.
Missing fields: {}
New fields: {}
""".format(missingfields, newfields)
else:
# add a column with the name of the pipeline
pdf['pipeline'] = pipeline_name
# Push data in the HBase table
mode = payload.get('mode', 'production')
if mode == 'production':
table = 'euclid.in'
elif mode == 'sandbox':
table = 'euclid.test'
client = connect_to_hbase_table(table, schema_name='schema_{}'.format(pipeline_name))

for index, row in pdf.iterrows():
# Compute the row key
rowkey = compute_rowkey(row, index)

# Compute the payload
out = ['d:{}:{}'.format(name, value) for name, value in row.items()]

msg = 'Uploaded!'
client.put(
rowkey,
out
)
client.close()

return Response(
'{} - {} - {} - {} - {}'.format(
'{} - {} - {} - {} - Uploaded!'.format(
payload['EID'],
payload['pipeline'],
payload['version'],
payload['date'],
msg
), 200
)

def download_euclid_data(payload: dict) -> pd.DataFrame:
""" Download Euclid data
Data is from /api/v1/eucliddata
Parameters
----------
payload: dict
See https://fink-portal.org/api/v1/eucliddata
Return
----------
out: pandas dataframe
"""
# Interpret user input
pipeline = payload['pipeline'].lower()

if 'columns' in payload:
cols = payload['columns'].replace(" ", "")
else:
cols = '*'

# Push data in the HBase table
mode = payload.get('mode', 'production')
if mode == 'production':
table = 'euclid.in'
elif mode == 'sandbox':
table = 'euclid.test'

client = connect_to_hbase_table(table, schema_name='schema_{}'.format(pipeline))

# TODO: put a regex instead?
if ":" in payload['dates']:
start, stop = payload['dates'].split(':')
to_evaluate = "key:key:{}_{},key:key:{}_{}".format(pipeline, start, pipeline, stop)
client.setRangeScan(True)
elif payload['dates'].replace(' ', '') == '*':
to_evaluate = "key:key:{}".format(pipeline)
else:
start = payload['dates']
to_evaluate = "key:key:{}_{}".format(pipeline, start)

results = client.scan(
"",
to_evaluate,
cols,
0, False, False
)

pdf = pd.DataFrame.from_dict(results, orient='index')

# Remove hbase specific fields
if 'key:key' in pdf.columns:
pdf = pdf.drop(columns=['key:key'])
if 'key:time' in pdf.columns:
pdf = pdf.drop(columns=['key:time'])

# Type conversion
schema = client.schema()
pdf = pdf.astype(
{i: hbase_type_converter[schema.type(i)] for i in pdf.columns})

client.close()

return pdf

def post_metadata(payload: dict) -> Response:
""" Upload metadata in Fink
"""
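For the upload side, `upload_euclid_data` expects the raw file content in the `payload` field and applies `eval` before reading it with `pd.read_csv` (space-separated, with a header row). A minimal sketch of a sandbox upload, with a hypothetical file name and illustrative metadata values:

```python
# Minimal sketch: upload a space-separated catalog to /api/v1/euclidin
# in sandbox mode, so production tables are untouched. File name and
# metadata values are illustrative.
import requests

with open('ssopipe_catalog.txt', 'rb') as f:  # hypothetical file
    data = f.read()

r = requests.post(
    'https://fink-portal.org/api/v1/euclidin',
    json={
        'EID': '912938738',     # illustrative Euclid ID
        'pipeline': 'SSOPipe',
        'version': '1.0',
        'date': '20230101',
        'payload': str(data),   # the server applies eval() to recover the bytes
        'mode': 'sandbox',
    },
)
print(r.status_code, r.text)
```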
90 changes: 82 additions & 8 deletions apps/client.py
@@ -17,36 +17,50 @@
import jpype
import jpype.imports

import numpy as np

import yaml


def initialise_jvm():
"""
def initialise_jvm(path=None):
""" Start a JVM
Parameters
----------
path: str, optional
Path to the HBase client. Default is relative to apps/
"""
if not jpype.isJVMStarted():
path = os.path.dirname(apps_loc) + '/../bin/FinkBrowser.exe.jar'
if path is None:
path = os.path.dirname(apps_loc) + '/../bin/FinkBrowser.exe.jar'
jarpath = "-Djava.class.path={}".format(path)
jpype.startJVM(jpype.getDefaultJVMPath(), "-ea", jarpath, convertStrings=True)

jpype.attachThreadToJVM()

def connect_to_hbase_table(tablename: str, nlimit=10000, setphysicalrepo=False):
def connect_to_hbase_table(tablename: str, schema_name=None, nlimit=10000, setphysicalrepo=False, config_path=None):
""" Return a client connected to a HBase table
Parameters
----------
tablename: str
The name of the table
schema_name: str, optional
Name of the rowkey in the table containing the schema. Default is given by the config file.
nlimit: int, optional
Maximum number of objects to return. Default is 10000
setphysicalrepo: bool
setphysicalrepo: bool, optional
If True, store cutouts queried on disk ("/tmp/Lomikel/HBaseClientBinaryDataRepository")
Needs client 02.01+. Default is False
config_path: str, optional
Path to the config file. Default is None (relative to the apps/ folder)
"""
initialise_jvm()

if config_path is None:
config_path = os.path.dirname(apps_loc) + '/../config.yml'
args = yaml.load(
open(os.path.dirname(apps_loc) + '/../config.yml'),
open(config_path),
yaml.Loader
)

@@ -55,11 +69,71 @@ def connect_to_hbase_table(tablename: str, nlimit=10000, setphysicalrepo=False):

Init.init()

client = com.Lomikel.HBaser.HBaseClient(args['HBASEIP'], args['ZOOPORT']);
client.connect(tablename, args['SCHEMAVER'])
client = com.Lomikel.HBaser.HBaseClient(args['HBASEIP'], args['ZOOPORT'])

if schema_name is None:
schema_name = args['SCHEMAVER']
client.connect(tablename, schema_name)
if setphysicalrepo:
import com.Lomikel.HBaser.FilesBinaryDataRepository
client.setRepository(com.Lomikel.HBaser.FilesBinaryDataRepository())
client.setLimit(nlimit)

return client

def create_or_update_hbase_table(tablename: str, families: list, schema_name: str, schema: dict, create=False, config_path=None):
""" Create or update a table in HBase
By default (create=False), it will only update the schema of the table;
otherwise it will create the table in HBase and push the schema. The schema
has a rowkey `schema`.
Currently accepts only a single family name
Parameters
----------
tablename: str
The name of the table
families: list
List of family names, e.g. ['d']
schema_name: str
Rowkey value for the schema
schema: dict
Dictionary with column names (keys) and column types (values)
create: bool
If true, create the table. Default is False (only update schema)
config_path: str, optional
Path to the config file. Default is None (relative to the apps/ folder)
"""
if len(np.unique(families)) != 1:
raise NotImplementedError("`create_or_update_hbase_table` only accepts one family name")

initialise_jvm()

if config_path is None:
config_path = os.path.dirname(apps_loc) + '/../config.yml'
args = yaml.load(
open(config_path),
yaml.Loader
)

import com.Lomikel.HBaser
from com.astrolabsoftware.FinkBrowser.Utils import Init

Init.init()

client = com.Lomikel.HBaser.HBaseClient(args['HBASEIP'], args['ZOOPORT'])

if create:
# Create the table and connect without schema
client.create(tablename, families)
client.connect(tablename)
else:
# Connect by ignoring the current schema
client.connect(tablename, None)

# Push the schema
out = ['{}:{}:{}'.format(families[0], colname, coltype) for colname, coltype in schema.items()]
client.put(schema_name, out)

client.close()
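A minimal sketch of bootstrapping the sandbox table with this new helper; the column names and types in `schema` are illustrative, and the HBase connection details come from `config.yml` as above:

```python
# Minimal sketch: create euclid.test and push a per-pipeline schema
# under the rowkey 'schema_ssopipe'. Column names and types are
# illustrative only.
from apps.client import create_or_update_hbase_table

schema = {'objectid': 'string', 'ra': 'double', 'dec': 'double'}

create_or_update_hbase_table(
    'euclid.test',
    families=['d'],               # a single family, as enforced above
    schema_name='schema_ssopipe',
    schema=schema,
    create=True,                  # False would only update the schema
)
```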
