[Euclid] Add a new endpoint to retrieve data #522

Merged on Nov 9, 2023 (29 commits).

Commits (29)
All commits authored by JulienPeloton.

05602fd  Update README (Nov 7, 2023)
e472d8a  Update Euclid endpoint to push data into HBase (Nov 7, 2023)
4b8a637  Allow schema to change name (Nov 7, 2023)
3d00b92  Sort fields (Nov 7, 2023)
0b267f6  Dict to list (Nov 7, 2023)
462ef56  Update header handling (Nov 7, 2023)
b1d4e55  Update HBase table name for euclid (Nov 7, 2023)
3012e02  Update function (Nov 7, 2023)
1c0b00f  Update function args (Nov 7, 2023)
6f79fe3  Format name (Nov 7, 2023)
781b0f7  Fix typo (Nov 7, 2023)
4119f2e  Force unique rowkey (Nov 8, 2023)
be7a54a  Modify rowkey structure (Nov 8, 2023)
e787058  eucliddata endpoint (Nov 9, 2023)
dcd0144  eucliddata endpoint (Nov 9, 2023)
887b5e0  eucliddata endpoint (Nov 9, 2023)
9cff887  Schema conversion (Nov 9, 2023)
abd908e  Schema conversion (Nov 9, 2023)
edbb19a  Schema conversion (Nov 9, 2023)
3c445d2  Schema conversion (Nov 9, 2023)
dcf1bd9  Typo on type (Nov 9, 2023)
240fa23  Typo on type (Nov 9, 2023)
5296a6e  Add columns argument and use internal date for the rowkey (Nov 9, 2023)
b38176f  fix typo (Nov 9, 2023)
abcc211  Enable wildcard for dates (Nov 9, 2023)
39336a0  Add a sandbox mode (Nov 9, 2023)
095508f  Fix typo... (Nov 9, 2023)
3a08359  Add sandbox to Euclid pull (Nov 9, 2023)
556b492  New test for Euclid data (Nov 9, 2023)
Files changed
README.md: 2 additions & 2 deletions

@@ -10,7 +10,7 @@

 ## Backend structure
 
-After each observation night, the data is aggregated and pushed into Apache HBase tables. The main table contains all alert data processed by Fink since 2019-11-01. This represents more than 140 million alerts collected (5 TB), and about 95 million processed (4 TB) as of 07/2022. The main table data is indexed along the `objectId` of alerts, and the emission date `jd`.
+After each observation night, the data is aggregated and pushed into Apache HBase tables. The main table contains all alert data processed by Fink since 2019-11-01. This represents more than 210 million alerts collected, and about 145 million scientifically valid (7.5 TB) as of 11/2023. The main table data is indexed along the `objectId` of alerts, and the emission date `jd`.
 
 In order to allow multi-indexing with HBase, we create _index tables_. These tables are indexed along different properties (time, sky position, classification, ...). They contain the same number of rows as the main table but fewer columns. These index tables are used to perform fast searches along arbitrary properties and isolate interesting candidates, while the main table is used to display final data.
 
@@ -44,7 +44,7 @@
 IP: fink-portal.org
 PORT: 24000
 HBASEIP: hbase-1.lal.in2p3.fr
 ZOOPORT: 2183
-SCHEMAVER: "schema_2.2_2.0.0"
+SCHEMAVER: "schema_3.1_5.0.0"
 tablename: ztf
 ```
apps/api/api.py: 72 additions & 1 deletion

@@ -1,4 +1,4 @@
-# Copyright 2020-2022 AstroLab Software
+# Copyright 2020-2023 AstroLab Software
 # Author: Julien Peloton
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
@@ -37,6 +37,7 @@
 from apps.api.utils import return_ssoft_pdf
 from apps.api.utils import return_resolver_pdf
 from apps.api.utils import upload_euclid_data
+from apps.api.utils import download_euclid_data
 from apps.api.utils import retrieve_metadata, post_metadata, retrieve_oid
 
 from fink_utils.xmatch.simbad import get_simbad_labels
@@ -682,6 +683,39 @@ def layout(is_mobile):
     {
         'name': 'payload',
         'required': True,
         'description': 'Data file'
     },
+    {
+        'name': 'mode',
+        'required': False,
+        'description': 'Execution mode among production[default], or sandbox. Choose sandbox if you just want to test the upload without touching the tables.'
+    },
 ]
 
+args_eucliddata = [
+    {
+        'name': 'pipeline',
+        'required': True,
+        'description': '`SSOPipe`, `streakdet`, `DL`'
+    },
+    {
+        'name': 'dates',
+        'required': True,
+        'description': 'Observation dates. It can be a single date (YYYYMMDD), a range (YYYYMMDD:YYYYMMDD), or any superset (e.g. YYYY)'
+    },
+    {
+        'name': 'columns',
+        'required': False,
+        'description': 'Comma-separated data columns to transfer. Default is all columns. See {}/api/v1/columns for more information.'.format(APIURL)
+    },
+    {
+        'name': 'mode',
+        'required': False,
+        'description': 'Execution mode among production[default], or sandbox. Choose sandbox if you just want to connect to the test table.'
+    },
+    {
+        'name': 'output-format',
+        'required': False,
+        'description': 'Output format among json[default], csv, parquet, votable'
+    }
+]

@@ -1330,6 +1364,43 @@ def query_euclidin(payload=None):
 
     return out
 
+@api_bp.route('/api/v1/eucliddata', methods=['GET'])
+def query_eucliddata_arguments():
+    """ Obtain information about Euclid stored data
+    """
+    if len(request.args) > 0:
+        # POST from query URL
+        return query_eucliddata(payload=request.args)
+    else:
+        return jsonify({'args': args_eucliddata})
+
+@api_bp.route('/api/v1/eucliddata', methods=['POST'])
+def query_eucliddata(payload=None):
+    """ Download Euclid data in Fink
+    """
+    # get payload from the JSON
+    if payload is None:
+        payload = request.json
+
+    # Check all required args are here
+    required_args = [i['name'] for i in args_eucliddata if i['required'] is True]
+    for required_arg in required_args:
+        if required_arg not in payload:
+            rep = {
+                'status': 'error',
+                'text': "A value for `{}` is required. Use GET to check arguments.\n".format(required_arg)
+            }
+            return Response(str(rep), 400)
+
+    out = download_euclid_data(payload)
+
+    # Error propagation
+    if isinstance(out, Response):
+        return out
+
+    output_format = payload.get('output-format', 'json')
+    return send_data(out, output_format)
+
 @api_bp.route('/api/v1/metadata', methods=['GET'])
 def metadata_arguments():
     """ Obtain information about uploading metadata
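For reference, a hypothetical client-side sketch of the two endpoints touched by this PR. All values below (file name, EID, dates, version) are illustrative, not real Euclid data; the `str(data)` round-trip for `payload` mirrors the server-side `eval(data)` in `upload_euclid_data`, and the JSON decoding follows the usual Fink API pattern.

```python
# Hypothetical usage sketch, not part of this PR.
import io

import pandas as pd
import requests

APIURL = 'https://fink-portal.org'

# 1. Upload a catalog in sandbox mode (no write to the production table)
with open('ssopipe_output.txt', 'rb') as f:
    data = f.read()

r = requests.post(
    '{}/api/v1/euclidin'.format(APIURL),
    json={
        'EID': '787756213241429922',  # hypothetical Euclid ID
        'pipeline': 'SSOPipe',
        'version': '1.0',
        'date': '20231109',
        'payload': str(data),  # the server evaluates this back into bytes
        'mode': 'sandbox',
    },
)
print(r.text)

# 2. Pull the data back from the test table as a pandas DataFrame
r = requests.post(
    '{}/api/v1/eucliddata'.format(APIURL),
    json={
        'pipeline': 'SSOPipe',
        'dates': '20231101:20231109',  # range scan; '*' or 'YYYY' also work
        'mode': 'sandbox',
        'output-format': 'json',
    },
)
pdf = pd.read_json(io.BytesIO(r.content))
```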
apps/api/utils.py: 103 additions & 16 deletions

@@ -41,6 +41,9 @@
 from apps.utils import hbase_type_converter
 
 from apps.euclid.utils import load_euclid_header
+from apps.euclid.utils import add_columns
+from apps.euclid.utils import compute_rowkey
+from apps.euclid.utils import check_header
 
 from apps.plotting import legacy_normalizer, convolve, sigmoid_normalizer
 
@@ -1459,38 +1462,122 @@ def upload_euclid_data(payload: dict) -> pd.DataFrame:
     data = payload['payload']
     pipeline_name = payload['pipeline'].lower()
 
+    # Read data into pandas DataFrame
+    pdf = pd.read_csv(io.BytesIO(eval(data)), header=0, sep=' ', index_col=False)
+
+    # Add Fink defined columns
+    pdf = add_columns(
+        pdf,
+        pipeline_name,
+        payload['version'],
+        payload['date'],
+        payload['EID']
+    )
+
     # Load official headers for HBase
     header = load_euclid_header(pipeline_name)
     euclid_header = header.keys()
 
-    pdf = pd.read_csv(io.BytesIO(eval(data)), header=0, sep=' ', index_col=False)
+    msg = check_header(pdf, list(euclid_header))
+    if msg != 'ok':
+        return Response(msg, 400)
 
-    # BUG: look for element-wise comparison method
-    if ~np.all(pdf.columns == np.array(euclid_header)):
-        missingfields = [field for field in euclid_header if field not in pdf.columns]
-        newfields = [field for field in pdf.columns if field not in euclid_header]
-        msg = """
-        WARNING: we detected a change in the schema.
-        Missing fields: {}
-        New fields: {}
-        """.format(missingfields, newfields)
-    else:
-        # add a column with the name of the pipeline
-        pdf['pipeline'] = pipeline_name
-        msg = 'Uploaded!'
+    # Push data in the HBase table
+    mode = payload.get('mode', 'production')
+    if mode == 'production':
+        table = 'euclid.in'
+    elif mode == 'sandbox':
+        table = 'euclid.test'
+    client = connect_to_hbase_table(table, schema_name='schema_{}'.format(pipeline_name))
+
+    for index, row in pdf.iterrows():
+        # Compute the row key
+        rowkey = compute_rowkey(row, index)
+
+        # Compute the payload
+        out = ['d:{}:{}'.format(name, value) for name, value in row.items()]
+
+        client.put(
+            rowkey,
+            out
+        )
+    client.close()
 
     return Response(
-        '{} - {} - {} - {} - {}'.format(
+        '{} - {} - {} - {} - Uploaded!'.format(
             payload['EID'],
             payload['pipeline'],
             payload['version'],
             payload['date'],
-            msg
         ), 200
     )

+def download_euclid_data(payload: dict) -> pd.DataFrame:
+    """ Download Euclid data
+
+    Data is from /api/v1/eucliddata
+
+    Parameters
+    ----------
+    payload: dict
+        See https://fink-portal.org/api/v1/eucliddata
+
+    Returns
+    -------
+    out: pandas dataframe
+    """
+    # Interpret user input
+    pipeline = payload['pipeline'].lower()
+
+    if 'columns' in payload:
+        cols = payload['columns'].replace(" ", "")
+    else:
+        cols = '*'
+
+    # Connect to the HBase table
+    mode = payload.get('mode', 'production')
+    if mode == 'production':
+        table = 'euclid.in'
+    elif mode == 'sandbox':
+        table = 'euclid.test'
+
+    client = connect_to_hbase_table(table, schema_name='schema_{}'.format(pipeline))
+
+    # TODO: put a regex instead?
+    if ":" in payload['dates']:
+        start, stop = payload['dates'].split(':')
+        to_evaluate = "key:key:{}_{},key:key:{}_{}".format(pipeline, start, pipeline, stop)
+        client.setRangeScan(True)
+    elif payload['dates'].replace(' ', '') == '*':
+        to_evaluate = "key:key:{}".format(pipeline)
+    else:
+        start = payload['dates']
+        to_evaluate = "key:key:{}_{}".format(pipeline, start)
+
+    results = client.scan(
+        "",
+        to_evaluate,
+        cols,
+        0, False, False
+    )
+
+    pdf = pd.DataFrame.from_dict(results, orient='index')
+
+    # Remove HBase specific fields
+    if 'key:key' in pdf.columns:
+        pdf = pdf.drop(columns=['key:key'])
+    if 'key:time' in pdf.columns:
+        pdf = pdf.drop(columns=['key:time'])
+
+    # Type conversion
+    schema = client.schema()
+    pdf = pdf.astype(
+        {i: hbase_type_converter[schema.type(i)] for i in pdf.columns})
+
+    client.close()
+
+    return pdf
+
 def post_metadata(payload: dict) -> Response:
     """ Upload metadata in Fink
     """
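The `dates` branching in `download_euclid_data` is compact, so here is a small reading aid (not code from this PR): a stand-alone sketch of how each accepted `dates` form maps onto an HBase key expression, given that rowkeys start with `<pipeline>_<date>`. The helper name `build_scan` is hypothetical.

```python
# Reading aid only: mirrors the branching in download_euclid_data above.
def build_scan(pipeline: str, dates: str):
    """Return (key_expression, use_range_scan) for the HBase client."""
    if ':' in dates:
        # 'YYYYMMDD:YYYYMMDD': scan between two rowkey bounds
        start, stop = dates.split(':')
        return "key:key:{}_{},key:key:{}_{}".format(pipeline, start, pipeline, stop), True
    if dates.replace(' ', '') == '*':
        # '*': all rows for this pipeline
        return "key:key:{}".format(pipeline), False
    # 'YYYYMMDD', or any superset such as 'YYYY': prefix match
    return "key:key:{}_{}".format(pipeline, dates), False

assert build_scan('ssopipe', '20231101:20231109') == (
    'key:key:ssopipe_20231101,key:key:ssopipe_20231109', True)
assert build_scan('ssopipe', '*') == ('key:key:ssopipe', False)
assert build_scan('ssopipe', '2023') == ('key:key:ssopipe_2023', False)
```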
apps/client.py: 82 additions & 8 deletions

@@ -17,36 +17,50 @@
 import jpype
 import jpype.imports
 
+import numpy as np
+
 import yaml
 
 
-def initialise_jvm():
-    """
+def initialise_jvm(path=None):
+    """ Start a JVM
+
+    Parameters
+    ----------
+    path: str, optional
+        Path to the HBase client. Default is relative to apps/
     """
     if not jpype.isJVMStarted():
-        path = os.path.dirname(apps_loc) + '/../bin/FinkBrowser.exe.jar'
+        if path is None:
+            path = os.path.dirname(apps_loc) + '/../bin/FinkBrowser.exe.jar'
         jarpath = "-Djava.class.path={}".format(path)
         jpype.startJVM(jpype.getDefaultJVMPath(), "-ea", jarpath, convertStrings=True)
 
     jpype.attachThreadToJVM()
 
-def connect_to_hbase_table(tablename: str, nlimit=10000, setphysicalrepo=False):
+def connect_to_hbase_table(tablename: str, schema_name=None, nlimit=10000, setphysicalrepo=False, config_path=None):
     """ Return a client connected to a HBase table
 
     Parameters
     ----------
     tablename: str
         The name of the table
+    schema_name: str, optional
+        Name of the rowkey in the table containing the schema. Default is given by the config file.
     nlimit: int, optional
         Maximum number of objects to return. Default is 10000
-    setphysicalrepo: bool
+    setphysicalrepo: bool, optional
         If True, store cutouts queried on disk ("/tmp/Lomikel/HBaseClientBinaryDataRepository")
         Needs client 02.01+. Default is False
+    config_path: str, optional
+        Path to the config file. Default is None (relative to the apps/ folder)
     """
     initialise_jvm()
 
+    if config_path is None:
+        config_path = os.path.dirname(apps_loc) + '/../config.yml'
     args = yaml.load(
-        open(os.path.dirname(apps_loc) + '/../config.yml'),
+        open(config_path),
         yaml.Loader
     )
 
@@ -55,11 +69,71 @@ def connect_to_hbase_table(tablename: str, nlimit=10000, setphysicalrepo=False):

     Init.init()
 
-    client = com.Lomikel.HBaser.HBaseClient(args['HBASEIP'], args['ZOOPORT']);
-    client.connect(tablename, args['SCHEMAVER'])
+    client = com.Lomikel.HBaser.HBaseClient(args['HBASEIP'], args['ZOOPORT'])
+
+    if schema_name is None:
+        schema_name = args['SCHEMAVER']
+    client.connect(tablename, schema_name)
     if setphysicalrepo:
         import com.Lomikel.HBaser.FilesBinaryDataRepository
         client.setRepository(com.Lomikel.HBaser.FilesBinaryDataRepository())
     client.setLimit(nlimit)
 
     return client
 
+def create_or_update_hbase_table(tablename: str, families: list, schema_name: str, schema: dict, create=False, config_path=None):
+    """ Create or update a table in HBase
+
+    By default (create=False), it will only update the schema of the table,
+    otherwise it will create the table in HBase and push the schema. The schema
+    has a rowkey `schema`.
+
+    Currently accepts only a single family name.
+
+    Parameters
+    ----------
+    tablename: str
+        The name of the table
+    families: list
+        List of family names, e.g. ['d']
+    schema_name: str
+        Rowkey value for the schema
+    schema: dict
+        Dictionary with column names (keys) and column types (values)
+    create: bool
+        If True, create the table. Default is False (only update schema)
+    config_path: str, optional
+        Path to the config file. Default is None (relative to the apps/ folder)
+    """
+    if len(np.unique(families)) != 1:
+        raise NotImplementedError("`create_or_update_hbase_table` only accepts one family name")
+
+    initialise_jvm()
+
+    if config_path is None:
+        config_path = os.path.dirname(apps_loc) + '/../config.yml'
+    args = yaml.load(
+        open(config_path),
+        yaml.Loader
+    )
+
+    import com.Lomikel.HBaser
+    from com.astrolabsoftware.FinkBrowser.Utils import Init
+
+    Init.init()
+
+    client = com.Lomikel.HBaser.HBaseClient(args['HBASEIP'], args['ZOOPORT'])
+
+    if create:
+        # Create the table and connect without schema
+        client.create(tablename, families)
+        client.connect(tablename)
+    else:
+        # Connect by ignoring the current schema
+        client.connect(tablename, None)
+
+    # Push the schema
+    out = ['{}:{}:{}'.format(families[0], colname, coltype) for colname, coltype in schema.items()]
+    client.put(schema_name, out)
+
+    client.close()
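Taken together, a plausible end-to-end use of these two helpers, for instance to set up the `euclid.test` sandbox table used by the API. Only the signatures come from this diff; the schema columns, types, and scan values below are made up for illustration.

```python
# Hypothetical setup sketch: schema content and scan values are illustrative.
from apps.client import connect_to_hbase_table, create_or_update_hbase_table

# Create the sandbox table with a single family 'd', and push a schema
# under the rowkey 'schema_ssopipe' (column names/types are made up here).
create_or_update_hbase_table(
    'euclid.test',
    families=['d'],
    schema_name='schema_ssopipe',
    schema={'RA': 'double', 'DEC': 'double', 'pipeline': 'string'},
    create=True,
)

# Connect back using that schema, scan one (hypothetical) date prefix, and close
client = connect_to_hbase_table('euclid.test', schema_name='schema_ssopipe')
results = client.scan("", "key:key:ssopipe_20231109", "*", 0, False, False)
client.close()
```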