diff --git a/app.py b/app.py index 4367d2bf..775a59bb 100644 --- a/app.py +++ b/app.py @@ -1,10 +1,22 @@ +# Copyright 2021-2023 AstroLab Software +# Author: Julien Peloton +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. import dash import dash_bootstrap_components as dbc from dash.long_callback import DiskcacheLongCallbackManager -import jpype -import jpype.imports -from jpype import JImplements, JOverride, JImplementationFor +# import jpype import yaml import diskcache @@ -47,74 +59,3 @@ server = app.server app.config.suppress_callback_exceptions = True - -if not jpype.isJVMStarted(): - jarpath = "-Djava.class.path=bin/FinkBrowser.exe.jar" - jpype.startJVM(jpype.getDefaultJVMPath(), "-ea", jarpath, convertStrings=True) - -jpype.attachThreadToJVM() - -import com.Lomikel.HBaser -from com.astrolabsoftware.FinkBrowser.Utils import Init - -Init.init() - -client = com.Lomikel.HBaser.HBaseClient(args['HBASEIP'], args['ZOOPORT']); -client.connect(args['tablename'], args['SCHEMAVER']) -client.setLimit(nlimit) - -clientT = com.Lomikel.HBaser.HBaseClient(args['HBASEIP'], args['ZOOPORT']); -clientT.connect(args['tablename'] + ".jd", args['SCHEMAVER']) -clientT.setLimit(nlimit) - -clientP128 = com.Lomikel.HBaser.HBaseClient(args['HBASEIP'], args['ZOOPORT']); -clientP128.connect(args['tablename'] + ".pixel128", args['SCHEMAVER']) -clientP128.setLimit(nlimit) - -clientS = com.Lomikel.HBaser.HBaseClient(args['HBASEIP'], args['ZOOPORT']); -clientS.connect(args['tablename'] + ".class", args['SCHEMAVER']) -clientS.setLimit(nlimit) - -clientU = com.Lomikel.HBaser.HBaseClient(args['HBASEIP'], args['ZOOPORT']); -clientU.connect(args['tablename'] + ".upper", args['SCHEMAVER']) -clientU.setLimit(nlimit) - -clientUV = com.Lomikel.HBaser.HBaseClient(args['HBASEIP'], args['ZOOPORT']); -clientUV.connect(args['tablename'] + ".uppervalid", args['SCHEMAVER']) -clientUV.setLimit(nlimit) - -clientSSO = com.Lomikel.HBaser.HBaseClient(args['HBASEIP'], args['ZOOPORT']); -clientSSO.connect(args['tablename'] + ".ssnamenr", args['SCHEMAVER']) -clientSSO.setLimit(nlimit) - -clientTRCK = com.Lomikel.HBaser.HBaseClient(args['HBASEIP'], args['ZOOPORT']); -clientTRCK.connect(args['tablename'] + ".tracklet", args['SCHEMAVER']) -clientTRCK.setLimit(nlimit) - -clientTNS = com.Lomikel.HBaser.HBaseClient(args['HBASEIP'], args['ZOOPORT']); -clientTNS.connect(args['tablename'] + ".tns", args['SCHEMAVER']) -clientTNS.setLimit(nlimit) - -clientStats = com.Lomikel.HBaser.HBaseClient(args['HBASEIP'], args['ZOOPORT']); -clientStats.connect('statistics_class', args['SCHEMAVER']) -clientStats.setLimit(nlimit) - -clientSSOCAND = com.Lomikel.HBaser.HBaseClient(args['HBASEIP'], args['ZOOPORT']); -clientSSOCAND.connect(args['tablename'] + ".sso_cand", args['SCHEMAVER']) -clientSSOCAND.setLimit(nlimit) - -clientSSOORB = com.Lomikel.HBaser.HBaseClient(args['HBASEIP'], args['ZOOPORT']); -clientSSOORB.connect(args['tablename'] + ".orb_cand", args['SCHEMAVER']) -clientSSOORB.setLimit(nlimit) - -clientANOMALY = com.Lomikel.HBaser.HBaseClient(args['HBASEIP'], args['ZOOPORT']); -clientANOMALY.connect(args['tablename'] + ".anomaly", args['SCHEMAVER']) -clientANOMALY.setLimit(nlimit) - -clientTNSRESOL = com.Lomikel.HBaser.HBaseClient(args['HBASEIP'], args['ZOOPORT']); -clientTNSRESOL.connect(args['tablename'] + ".tns_resolver", args['SCHEMAVER']) -clientTNSRESOL.setLimit(nlimit) - -clientMeta = com.Lomikel.HBaser.HBaseClient(args['HBASEIP'], args['ZOOPORT']); -clientMeta.connect(args['tablename'] + ".metadata", 'schema') -clientMeta.setLimit(nlimit) diff --git a/apps/api/utils.py b/apps/api/utils.py index 272f3e1e..cd79c7fa 100644 --- a/apps/api/utils.py +++ b/apps/api/utils.py @@ -13,7 +13,6 @@ # See the License for the specific language governing permissions and # limitations under the License. import io -import java import gzip import yaml import requests @@ -32,23 +31,17 @@ from astropy.io import fits, votable from astropy.table import Table -from app import client -from app import clientU, clientUV -from app import clientP128 -from app import clientT, clientTNS, clientS, clientSSO, clientTRCK -from app import clientSSOCAND, clientSSOORB -from app import clientStats -from app import clientANOMALY -from app import clientTNSRESOL -from app import clientMeta -from app import nlimit from app import APIURL +from apps.client import connect_to_hbase_table + from apps.utils import get_miriade_data from apps.utils import format_hbase_output from apps.utils import extract_cutouts from apps.utils import hbase_type_converter +from apps.euclid.utils import load_euclid_header + from apps.plotting import legacy_normalizer, convolve, sigmoid_normalizer from flask import Response @@ -96,8 +89,10 @@ def return_object_pdf(payload: dict) -> pd.DataFrame: else: truncated = True + client = connect_to_hbase_table('ztf') + # Get data from the main table - results = java.util.TreeMap() + results = {} for to_evaluate in objectids: result = client.scan( "", @@ -105,13 +100,10 @@ def return_object_pdf(payload: dict) -> pd.DataFrame: cols, 0, True, True ) - results.putAll(result) + results.update(result) schema_client = client.schema() - # reset the limit in case it has been changed above - client.setLimit(nlimit) - pdf = format_hbase_output( results, schema_client, group_alerts=False, truncated=truncated ) @@ -120,8 +112,9 @@ def return_object_pdf(payload: dict) -> pd.DataFrame: pdf = extract_cutouts(pdf, client) if withupperlim: + clientU = connect_to_hbase_table('ztf.upper') # upper limits - resultsU = java.util.TreeMap() + resultsU = {} for to_evaluate in objectids: resultU = clientU.scan( "", @@ -129,10 +122,11 @@ def return_object_pdf(payload: dict) -> pd.DataFrame: "*", 0, False, False ) - resultsU.putAll(resultU) + resultsU.update(resultU) # bad quality - resultsUP = java.util.TreeMap() + clientUV = connect_to_hbase_table('ztf.uppervalid') + resultsUP = {} for to_evaluate in objectids: resultUP = clientUV.scan( "", @@ -140,7 +134,7 @@ def return_object_pdf(payload: dict) -> pd.DataFrame: "*", 0, False, False ) - resultsUP.putAll(resultUP) + resultsUP.update(resultUP) pdfU = pd.DataFrame.from_dict(resultsU, orient='index') pdfUP = pd.DataFrame.from_dict(resultsUP, orient='index') @@ -163,6 +157,11 @@ def return_object_pdf(payload: dict) -> pd.DataFrame: else: pdf = pdf_ + clientU.close() + clientUV.close() + + client.close() + return pdf def return_explorer_pdf(payload: dict, user_group: int) -> pd.DataFrame: @@ -181,7 +180,8 @@ def return_explorer_pdf(payload: dict, user_group: int) -> pd.DataFrame: """ truncated = False if user_group == 0: - results = java.util.TreeMap() + client = connect_to_hbase_table('ztf') + results = {} for oid in payload['objectId'].split(','): # objectId search to_evaluate = "key:key:{}".format(oid.strip()) @@ -191,13 +191,11 @@ def return_explorer_pdf(payload: dict, user_group: int) -> pd.DataFrame: "*", 0, True, True ) - results.putAll(result) - - # reset the limit in case it has been changed above - client.setLimit(nlimit) + results.update(result) schema_client = client.schema() if user_group == 1: + client = connect_to_hbase_table('ztf.pixel128') # Interpret user input ra, dec = payload['ra'], payload['dec'] radius = payload['radius'] @@ -249,7 +247,7 @@ def return_explorer_pdf(payload: dict, user_group: int) -> pd.DataFrame: inclusive=True ) - # For the future: we could set clientP_.setRangeScan(True) + # For the future: we could set client.setRangeScan(True) # and pass directly the time boundaries here instead of # grouping by later. @@ -263,33 +261,35 @@ def return_explorer_pdf(payload: dict, user_group: int) -> pd.DataFrame: jdstart = Time(startdate, format='mjd').jd jdend = jdstart + window_days - clientP128.setRangeScan(True) - results = java.util.TreeMap() + client.setRangeScan(True) + results = {} for pix in pixs: to_search = "key:key:{}_{},key:key:{}_{}".format(pix, jdstart, pix, jdend) - result = clientP128.scan( + result = client.scan( "", to_search, "*", 0, True, True ) - results.putAll(result) - clientP128.setRangeScan(False) + results.update(result) + client.setRangeScan(False) else: - results = java.util.TreeMap() + client = connect_to_hbase_table('ztf.pixel128') + results = {} for pix in pixs: to_search = "key:key:{}".format(pix) - result = clientP128.scan( + result = client.scan( "", to_search, "*", 0, True, True ) - results.putAll(result) + results.update(result) - schema_client = clientP128.schema() + schema_client = client.schema() truncated = True elif user_group == 2: + client = connect_to_hbase_table('ztf.jd') if int(payload['window']) > 180: rep = { 'status': 'error', @@ -301,18 +301,17 @@ def return_explorer_pdf(payload: dict, user_group: int) -> pd.DataFrame: jd_end = jd_start + TimeDelta(int(payload['window']) * 60, format='sec').jd # Send the request. RangeScan. - clientT.setRangeScan(True) + client.setRangeScan(True) to_evaluate = "key:key:{},key:key:{}".format(jd_start, jd_end) - results = clientT.scan( + results = client.scan( "", to_evaluate, "*", 0, True, True ) - schema_client = clientT.schema() + schema_client = client.schema() - # reset the limit in case it has been changed above - client.setLimit(nlimit) + client.close() pdfs = format_hbase_output( results, @@ -391,12 +390,13 @@ def return_latests_pdf(payload: dict, return_raw: bool = False) -> pd.DataFrame: tns_classes = pd.read_csv('assets/tns_types.csv', header=None)[0].values is_tns = payload['class'].startswith('(TNS)') and (payload['class'].split('(TNS) ')[1] in tns_classes) if is_tns: + client = connect_to_hbase_table('ztf.tns') classname = payload['class'].split('(TNS) ')[1] - clientTNS.setLimit(nalerts) - clientTNS.setRangeScan(True) - clientTNS.setReversed(True) + client.setLimit(nalerts) + client.setRangeScan(True) + client.setReversed(True) - results = clientTNS.scan( + results = client.scan( "", "key:key:{}_{},key:key:{}_{}".format( classname, @@ -406,20 +406,21 @@ def return_latests_pdf(payload: dict, return_raw: bool = False) -> pd.DataFrame: ), cols, 0, True, True ) - schema_client = clientTNS.schema() + schema_client = client.schema() group_alerts = True - clientTNS.setLimit(nlimit) elif payload['class'].startswith('(SIMBAD)') or payload['class'] != 'allclasses': if payload['class'].startswith('(SIMBAD)'): classname = payload['class'].split('(SIMBAD) ')[1] else: classname = payload['class'] - clientS.setLimit(nalerts) - clientS.setRangeScan(True) - clientS.setReversed(True) + client = connect_to_hbase_table('ztf.class') + + client.setLimit(nalerts) + client.setRangeScan(True) + client.setReversed(True) - results = clientS.scan( + results = client.scan( "", "key:key:{}_{},key:key:{}_{}".format( classname, @@ -429,26 +430,25 @@ def return_latests_pdf(payload: dict, return_raw: bool = False) -> pd.DataFrame: ), cols, 0, False, False ) - schema_client = clientS.schema() + schema_client = client.schema() group_alerts = False - clientS.setLimit(nlimit) elif payload['class'] == 'allclasses': - clientT.setLimit(nalerts) - clientT.setRangeScan(True) - clientT.setReversed(True) + client = connect_to_hbase_table('ztf.jd') + client.setLimit(nalerts) + client.setRangeScan(True) + client.setReversed(True) to_evaluate = "key:key:{},key:key:{}".format(jd_start, jd_stop) - results = clientT.scan( + results = client.scan( "", to_evaluate, cols, 0, True, True ) - schema_client = clientT.schema() + schema_client = client.schema() group_alerts = False - # Restore default limits - clientT.setLimit(nlimit) + client.close() if return_raw: return results @@ -501,20 +501,21 @@ def return_sso_pdf(payload: dict) -> pd.DataFrame: names = ["key:key:{}_".format(payload['n_or_d'].replace(' ', ''))] # Get data from the main table - results = java.util.TreeMap() + client = connect_to_hbase_table('ztf.ssnamenr') + results = {} for to_evaluate in names: - result = clientSSO.scan( + result = client.scan( "", to_evaluate, cols, 0, True, True ) - results.putAll(result) + results.update(result) - schema_client = clientSSO.schema() + schema_client = client.schema() # reset the limit in case it has been changed above - clientSSO.setLimit(nlimit) + client.close() pdf = format_hbase_output( results, @@ -554,14 +555,14 @@ def return_ssocand_pdf(payload: dict) -> pd.DataFrame: payload_name = payload['kind'] if payload_name == 'orbParams': - gen_client = clientSSOORB + gen_client = connect_to_hbase_table('ztf.orb_cand') if trajectory_id is not None: to_evaluate = "key:key:cand_{}".format(trajectory_id) else: to_evaluate = "key:key:cand_" elif payload_name == 'lightcurves': - gen_client = clientSSOCAND + gen_client = connect_to_hbase_table('ztf.sso_cand') if 'start_date' in payload: start_date = Time(payload['start_date'], format='iso').jd @@ -588,10 +589,7 @@ def return_ssocand_pdf(payload: dict) -> pd.DataFrame: ) schema_client = gen_client.schema() - - # reset the limit in case it has been changed above - gen_client.setLimit(nlimit) - gen_client.setEvaluation("") + gen_client.close() if results.isEmpty(): return pd.DataFrame({}) @@ -647,17 +645,18 @@ def return_tracklet_pdf(payload: dict) -> pd.DataFrame: # Note the trailing _ to_evaluate = "key:key:{}".format(payload_name) - results = clientTRCK.scan( + client = connect_to_hbase_table('ztf.tracklet') + results = client.scan( "", to_evaluate, cols, 0, True, True ) - schema_client = clientTRCK.schema() + schema_client = client.schema() # reset the limit in case it has been changed above - clientTRCK.setLimit(nlimit) + client.close() pdf = format_hbase_output( results, @@ -705,6 +704,7 @@ def format_and_send_cutout(payload: dict) -> pd.DataFrame: filename = filename + '.fits' # Query the Database (object query) + client = connect_to_hbase_table('ztf') results = client.scan( "", "key:key:{}".format(payload['objectId']), @@ -714,6 +714,7 @@ def format_and_send_cutout(payload: dict) -> pd.DataFrame: # Format the results schema_client = client.schema() + pdf = format_hbase_output( results, schema_client, group_alerts=False, @@ -750,6 +751,7 @@ def format_and_send_cutout(payload: dict) -> pd.DataFrame: col='b:cutout{}_stampData'.format(payload['kind']), return_type='array' ) + client.close() array = pdf['b:cutout{}_stampData'.format(payload['kind'])].values[0] @@ -997,20 +999,21 @@ def return_bayestar_pdf(payload: dict) -> pd.DataFrame: jdstart = Time(header['DATE-OBS']).jd - n_day_min jdend = jdstart + n_day_max - clientP128.setRangeScan(True) - results = java.util.TreeMap() + client = connect_to_hbase_table('ztf.pixel128') + client.setRangeScan(True) + results = {} for pix in pixs: to_search = "key:key:{}_{},key:key:{}_{}".format(pix, jdstart, pix, jdend) - result = clientP128.scan( + result = client.scan( "", to_search, "*", 0, True, True ) - results.putAll(result) + results.update(result) - clientP128.setRangeScan(False) - schema_client = clientP128.schema() + schema_client = client.schema() + client.close() pdfs = format_hbase_output( results, @@ -1053,12 +1056,14 @@ def return_statistics_pdf(payload: dict) -> pd.DataFrame: to_evaluate = "key:key:ztf_{}".format(payload_date) - results = clientStats.scan( + client = connect_to_hbase_table('statistics_class') + results = client.scan( "", to_evaluate, cols, 0, True, True ) + client.close() pdf = pd.DataFrame.from_dict(results, orient='index') @@ -1129,9 +1134,10 @@ def return_random_pdf(payload: dict) -> pd.DataFrame: np.random.seed(int(payload['seed'])) # logic + client = connect_to_hbase_table('ztf.jd') results = [] - clientT.setLimit(1000) - clientT.setRangeScan(True) + client.setLimit(1000) + client.setRangeScan(True) jd_low = Time('2019-11-02 03:00:00.0').jd jd_high = Time.now().jd @@ -1154,7 +1160,7 @@ def return_random_pdf(payload: dict) -> pd.DataFrame: } results = return_latests_pdf(payload_data, return_raw=True) else: - results = clientT.scan( + results = client.scan( "", "key:key:{},key:key:{}".format(jdstart, jdstop), "", 0, False, False @@ -1165,10 +1171,12 @@ def return_random_pdf(payload: dict) -> pd.DataFrame: index_oid = np.random.randint(0, len(oids), number) oid = oids[index_oid] + client.close() + client = connect_to_hbase_table('ztf') client.setLimit(2000) # Get data from the main table - results = java.util.TreeMap() + results = {} for oid_ in oid: result = client.scan( "", @@ -1176,14 +1184,13 @@ def return_random_pdf(payload: dict) -> pd.DataFrame: "{}".format(cols), 0, False, False ) - results.putAll(result) + results.update(result) pdf = format_hbase_output( results, client.schema(), group_alerts=False, truncated=truncated ) - clientT.setLimit(nlimit) - client.setLimit(nlimit) + client.close() return pdf @@ -1228,21 +1235,20 @@ def return_anomalous_objects_pdf(payload: dict) -> pd.DataFrame: else: truncated = True - clientANOMALY.setLimit(nalerts) - clientANOMALY.setRangeScan(True) - clientANOMALY.setReversed(True) + client = connect_to_hbase_table('ztf.anomaly') + client.setLimit(nalerts) + client.setRangeScan(True) + client.setReversed(True) to_evaluate = "key:key:{},key:key:{}".format(jd_start, jd_stop) - results = clientANOMALY.scan( + results = client.scan( "", to_evaluate, cols, 0, True, True ) - schema_client = clientANOMALY.schema() - - # Restore default limits - clientANOMALY.setLimit(nlimit) + schema_client = client.schema() + client.close() # We want to return alerts # color computation is disabled @@ -1353,9 +1359,10 @@ def return_resolver_pdf(payload: dict) -> pd.DataFrame: reverse = True if resolver == 'tns': + client = connect_to_hbase_table('ztf.tns_resolver') if name == "": # return the full table - results = clientTNSRESOL.scan( + results = client.scan( "", "", "*", @@ -1363,10 +1370,10 @@ def return_resolver_pdf(payload: dict) -> pd.DataFrame: ) else: # TNS poll -- take the first nmax occurences - clientTNSRESOL.setLimit(nmax) + client.setLimit(nmax) if reverse: to_evaluate = "d:internalname:{}".format(name) - results = clientTNSRESOL.scan( + results = client.scan( "", to_evaluate, "*", @@ -1374,18 +1381,19 @@ def return_resolver_pdf(payload: dict) -> pd.DataFrame: ) else: to_evaluate = "key:key:{}".format(name) - results = clientTNSRESOL.scan( + results = client.scan( "", to_evaluate, "*", 0, False, False ) - # Restore default limits - clientTNSRESOL.setLimit(nlimit) + # Restore default limits + client.close() pdfs = pd.DataFrame.from_dict(results, orient='index') elif resolver == 'simbad': + client = connect_to_hbase_table('ztf') if reverse: to_evaluate = "key:key:{}".format(name) client.setLimit(nmax) @@ -1395,7 +1403,7 @@ def return_resolver_pdf(payload: dict) -> pd.DataFrame: "i:objectId,d:cdsxmatch,i:ra,i:dec", 0, False, False ) - client.setLimit(nlimit) + client.close() pdfs = pd.DataFrame.from_dict(results, orient='index') else: r = requests.get( @@ -1408,6 +1416,7 @@ def return_resolver_pdf(payload: dict) -> pd.DataFrame: else: pdfs = pd.DataFrame() elif resolver == 'ssodnet': + client = connect_to_hbase_table('ztf') if reverse: to_evaluate = "key:key:{}".format(name) client.setLimit(nmax) @@ -1417,7 +1426,7 @@ def return_resolver_pdf(payload: dict) -> pd.DataFrame: "i:objectId,i:ssnamenr", 0, False, False ) - client.setLimit(nlimit) + client.close() pdfs = pd.DataFrame.from_dict(results, orient='index') else: r = requests.get( @@ -1448,78 +1457,29 @@ def upload_euclid_data(payload: dict) -> pd.DataFrame: """ # Interpret user input data = payload['payload'] + pipeline_name = payload['pipeline'].lower() - if payload['pipeline'].lower() == 'ssopipe': - HEADER = [ - 'INDEX', - 'RA', - 'DEC', - 'PROP_MOT', - 'N_DET', - 'CATALOG', - 'X_WORLD', - 'Y_WORLD', - 'ERRA_WORLD', - 'ERRB_WORLD', - 'FLUX_AUTO', - 'FLUXERR_AUTO', - 'MAG_AUTO', - 'MAGERR_AUTO', - 'ELONGATION', - 'ELLIPTICITY', - 'MJD' - ] - - elif payload['pipeline'].lower() == 'streakdet': - HEADER = [ - 'Obj_id', - 'Dither', - 'NDet', - 'RA_middle', - 'DEC_middle', - 'RA_start', - 'DEC_start', - 'RA_end', - 'DEC_end', - 'MJD_middle', - 'MJD_start', - 'MJD_end', - 'FLUX_AUTO', - 'MAG_AUTO' - ] - - elif payload['pipeline'].lower() == 'dl': - HEADER = [ - 'Obj_id', - 'Dither', - 'NDet', - 'RA_middle', - 'DEC_middle', - 'RA_start', - 'DEC_start', - 'RA_end', - 'DEC_end', - 'MJD_middle', - 'MJD_start', - 'MJD_end', - 'FLUX_AUTO', - 'MAG_AUTO', - 'Score' - ] + header = load_euclid_header(pipeline_name) + euclid_header = header.keys() pdf = pd.read_csv(io.BytesIO(eval(data)), header=0, sep=' ', index_col=False) # BUG: look for element-wise comparison method - if ~np.all(pdf.columns == np.array(HEADER)): - missingfields = [field for field in HEADER if field not in pdf.columns] - newfields = [field for field in pdf.columns if field not in HEADER] + if ~np.all(pdf.columns == np.array(euclid_header)): + missingfields = [field for field in euclid_header if field not in pdf.columns] + newfields = [field for field in pdf.columns if field not in euclid_header] msg = """ WARNING: we detected a change in the schema. Missing fields: {} New fields: {} """.format(missingfields, newfields) else: - msg = 'Uploaded!' + # add a column with the name of the pipeline + pdf['pipeline'] = pipeline_name + + + + msg = 'Uploaded!' return Response( '{} - {} - {} - {} - {}'.format( @@ -1534,8 +1494,9 @@ def upload_euclid_data(payload: dict) -> pd.DataFrame: def post_metadata(payload: dict) -> Response: """ Upload metadata in Fink """ + client = connect_to_hbase_table('ztf.metadata') encoded = payload['internal_name'].replace(' ', '') - clientMeta.put( + client.put( payload['objectId'].strip(), [ 'd:internal_name:{}'.format(payload['internal_name']), @@ -1544,6 +1505,7 @@ def post_metadata(payload: dict) -> Response: 'd:username:{}'.format(payload['username']) ] ) + client.close() return Response( 'Thanks {} - You can visit {}/{}'.format( @@ -1556,27 +1518,30 @@ def post_metadata(payload: dict) -> Response: def retrieve_metadata(objectId: str) -> pd.DataFrame: """ Retrieve metadata in Fink given a ZTF object ID """ + client = connect_to_hbase_table('ztf.metadata') to_evaluate = "key:key:{}".format(objectId) - results = clientMeta.scan( + results = client.scan( "", to_evaluate, "*", 0, False, False ) pdf = pd.DataFrame.from_dict(results, orient='index') - + client.close() return pdf def retrieve_oid(metaname: str, field: str) -> pd.DataFrame: """ Retrieve a ZTF object ID given metadata in Fink """ + client = connect_to_hbase_table('ztf.metadata') to_evaluate = "d:{}:{}:exact".format(field, metaname) - results = clientMeta.scan( + results = client.scan( "", to_evaluate, "*", 0, True, True ) pdf = pd.DataFrame.from_dict(results, orient='index') + client.close() return pdf diff --git a/apps/cards.py b/apps/cards.py index 0f76b3a7..ab18b6ba 100644 --- a/apps/cards.py +++ b/apps/cards.py @@ -289,15 +289,15 @@ def create_external_links_brokers(objectId): ) return buttons -@app.callback( - Output('card_id_col', 'children'), - [ - Input('object-data', 'children'), - ]) -def card_id(object_data): +# @app.callback( +# Output('card_id_col', 'children'), +# [ +# Input('object-data', 'children'), +# ]) +def card_id(pdf): """ Add a card containing basic alert data """ - pdf = pd.read_json(object_data) + # pdf = pd.read_json(object_data) objectid = pdf['i:objectId'].values[0] ra0 = pdf['i:ra'].values[0] dec0 = pdf['i:dec'].values[0] diff --git a/apps/client.py b/apps/client.py new file mode 100644 index 00000000..1603d43c --- /dev/null +++ b/apps/client.py @@ -0,0 +1,65 @@ +# Copyright 2023 AstroLab Software +# Author: Julien Peloton +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from apps import __file__ as apps_loc +import os +import jpype +import jpype.imports + +import yaml + + +def initialise_jvm(): + """ + """ + if not jpype.isJVMStarted(): + path = os.path.dirname(apps_loc) + '/../bin/FinkBrowser.exe.jar' + jarpath = "-Djava.class.path={}".format(path) + jpype.startJVM(jpype.getDefaultJVMPath(), "-ea", jarpath, convertStrings=True) + + jpype.attachThreadToJVM() + +def connect_to_hbase_table(tablename: str, nlimit=10000, setphysicalrepo=False): + """ Return a client connected to a HBase table + + Parameters + ---------- + tablename: str + The name of the table + nlimit: int, optional + Maximum number of objects to return. Default is 10000 + setphysicalrepo: bool + If True, store cutouts queried on disk ("/tmp/Lomikel/HBaseClientBinaryDataRepository") + Needs client 02.01+. Default is False + """ + initialise_jvm() + + args = yaml.load( + open(os.path.dirname(apps_loc) + '/../config.yml'), + yaml.Loader + ) + + import com.Lomikel.HBaser + from com.astrolabsoftware.FinkBrowser.Utils import Init + + Init.init() + + client = com.Lomikel.HBaser.HBaseClient(args['HBASEIP'], args['ZOOPORT']); + client.connect(tablename, args['SCHEMAVER']) + if setphysicalrepo: + import com.Lomikel.HBaser.FilesBinaryDataRepository + client.setRepository(com.Lomikel.HBaser.FilesBinaryDataRepository()) + client.setLimit(nlimit) + + return client diff --git a/apps/euclid/__init__.py b/apps/euclid/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/apps/euclid/utils.py b/apps/euclid/utils.py new file mode 100644 index 00000000..e230e646 --- /dev/null +++ b/apps/euclid/utils.py @@ -0,0 +1,89 @@ +# Copyright 2023 AstroLab Software +# Author: Julien Peloton +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +def load_euclid_header(pipeline=None): + """ Load the header from a Euclid pipeline + + Parameters + ---------- + pipeline: str + Name of the pipeline: ssopipe, streakdet, dl + + Returns + ---------- + header: dict + Keys are Euclid pipeline names, values are Fink translated names + """ + if pipeline == 'ssopipe': + HEADER = { + 'INDEX': 'index', + 'RA': 'ra', + 'DEC': 'dec', + 'PROP_MOT': 'prop_mot', + 'N_DET': 'n_det', + 'CATALOG': 'catalog', + 'X_WORLD': 'x_world', + 'Y_WORLD': 'y_world', + 'ERRA_WORLD': 'erra_world', + 'ERRB_WORLD': 'errb_world', + 'FLUX_AUTO': 'flux_auto', + 'FLUXERR_AUTO': 'fluxerr_auto', + 'MAG_AUTO': 'mag_auto', + 'MAGERR_AUTO': 'magerr_auto', + 'ELONGATION': 'elongation', + 'ELLIPTICITY': 'ellipticity', + 'MJD': 'mjd' + } + elif pipeline == 'streakdet': + HEADER = { + 'Obj_id': 'index', + 'Dither': 'dither', + 'NDet': 'n_det', + 'RA_middle': 'ra_middle', + 'DEC_middle': 'dec_middle', + 'RA_start': 'ra_start', + 'DEC_start': 'dec_start', + 'RA_end': 'ra_end', + 'DEC_end': 'dec_end', + 'MJD_middle': 'mjd_middle', + 'MJD_start': 'mjd_start', + 'MJD_end': 'mjd_end', + 'FLUX_AUTO': 'flux_auto', + 'MAG_AUTO': 'mag_auto' + } + elif pipeline == 'dl': + HEADER = { + 'Obj_id': 'index', + 'Dither': 'dither', + 'NDet': 'n_det', + 'RA_middle': 'ra_middle', + 'DEC_middle': 'dec_middle', + 'RA_start': 'ra_start', + 'DEC_start': 'dec_start', + 'RA_end': 'ra_end', + 'DEC_end': 'dec_end', + 'MJD_middle': 'mjd_middle', + 'MJD_start': 'mjd_start', + 'MJD_end': 'mjd_end', + 'FLUX_AUTO': 'flux_auto', + 'MAG_AUTO': 'mag_auto', + 'Score': 'score' + } + else: + print('Pipeline name {} not understood'.format(pipeline)) + HEADER = {} + + return HEADER + diff --git a/apps/plotting.py b/apps/plotting.py index 53420456..9be48dda 100644 --- a/apps/plotting.py +++ b/apps/plotting.py @@ -18,7 +18,6 @@ from scipy.optimize import curve_fit import datetime -import java import copy from astropy.time import Time from astropy.coordinates import SkyCoord @@ -37,6 +36,8 @@ from apps.utils import sine_fit from apps.utils import class_colors from apps.statistics import dic_names +from apps.client import connect_to_hbase_table + from app import APIURL from fink_utils.sso.spins import func_hg, func_hg12, func_hg1g2, func_hg1g2_with_spin @@ -51,7 +52,7 @@ import astropy.units as u from sbpy.data import Obs -from app import client, app, clientSSO, clientStats +from app import app COLORS_ZTF = ['#15284F', '#F5622E'] COLORS_ZTF_RGB = ['rgba(21, 40, 79, 1, 0.2)', 'rgba(245, 98, 46, 1, 0.2)'] @@ -1530,11 +1531,23 @@ def extract_cutout(object_data, time0, kind): position = np.where(jds == jd0)[0][0] # Grab the cutout data + client = connect_to_hbase_table('ztf') + + # Fire the client to trigger cutout data retrieval + _ = client.scan( + "", + "key:key:{}".format(pdf_['i:objectId'].values[0]), + "*i:candid,b:cutout{}_stampData".format(kind.capitalize()), + 0, False, False + ) + + # actually grab the data cutout = readstamp( client.repository().get( pdfs['b:cutout{}_stampData'.format(kind.capitalize())].values[position] ) ) + client.close() return cutout @app.callback( @@ -3053,6 +3066,7 @@ def plot_stat_evolution(pathname, param_name, switch): else: param_name_ = param_name + clientStats = connect_to_hbase_table('ztf.statistics_class') results = clientStats.scan( "", "key:key:ztf_", @@ -3061,6 +3075,7 @@ def plot_stat_evolution(pathname, param_name, switch): False, False ) + clientStats.close() # Construct the dataframe pdf = pd.DataFrame.from_dict(results, orient='index') @@ -3399,6 +3414,7 @@ def make_daily_card(pdf, color, linecolor, title, description, height='12pc', sc def hist_sci_raw(pathname, dropdown_days): """ Make an histogram """ + clientStats = connect_to_hbase_table('ztf.statistics_class') results = clientStats.scan( "", "key:key:ztf_", @@ -3407,6 +3423,7 @@ def hist_sci_raw(pathname, dropdown_days): False, False ) + clientStats.close() # Construct the dataframe pdf = pd.DataFrame.from_dict(results, orient='index') @@ -3439,6 +3456,7 @@ def hist_sci_raw(pathname, dropdown_days): def hist_catalogued(pathname, dropdown_days): """ Make an histogram """ + clientStats = connect_to_hbase_table('ztf.statistics_class') results = clientStats.scan( "", "key:key:ztf_", @@ -3447,6 +3465,7 @@ def hist_catalogued(pathname, dropdown_days): False, False ) + clientStats.close() # Construct the dataframe pdf = pd.DataFrame.from_dict(results, orient='index') @@ -3481,6 +3500,7 @@ def hist_catalogued(pathname, dropdown_days): def hist_classified(pathname, dropdown_days): """ Make an histogram """ + clientStats = connect_to_hbase_table('ztf.statistics_class') results = clientStats.scan( "", "key:key:ztf_", @@ -3489,6 +3509,7 @@ def hist_classified(pathname, dropdown_days): False, False ) + clientStats.close() # Construct the dataframe pdf = pd.DataFrame.from_dict(results, orient='index') @@ -3526,6 +3547,7 @@ def hist_classified(pathname, dropdown_days): def hist_candidates(pathname, dropdown_days): """ Make an histogram """ + clientStats = connect_to_hbase_table('ztf.statistics_class') results = clientStats.scan( "", "key:key:ztf_", @@ -3534,6 +3556,7 @@ def hist_candidates(pathname, dropdown_days): False, False ) + clientStats.close() # Construct the dataframe pdf = pd.DataFrame.from_dict(results, orient='index') @@ -3571,6 +3594,7 @@ def hist_candidates(pathname, dropdown_days): def fields_exposures(pathname, dropdown_days): """ Make an histogram """ + clientStats = connect_to_hbase_table('ztf.statistics_class') results = clientStats.scan( "", "key:key:ztf_", @@ -3579,6 +3603,7 @@ def fields_exposures(pathname, dropdown_days): False, False ) + clientStats.close() # Construct the dataframe pdf = pd.DataFrame.from_dict(results, orient='index') diff --git a/apps/statistics.py b/apps/statistics.py index 9a8ae736..60be9bf1 100644 --- a/apps/statistics.py +++ b/apps/statistics.py @@ -15,7 +15,8 @@ from dash import html, dcc, Input, Output import dash_bootstrap_components as dbc -from app import app, clientStats +from app import app +from apps.client import connect_to_hbase_table import numpy as np import pandas as pd @@ -121,6 +122,7 @@ def store_stat_query(name): name = 'ztf_' cols = 'basic:raw,basic:sci,basic:fields,basic:exposures,class:Unknown' + clientStats = connect_to_hbase_table('statistics_class') results = clientStats.scan( "", "key:key:{}".format(name), @@ -129,6 +131,7 @@ def store_stat_query(name): True, True ) + clientStats.close() # Construct the dataframe pdf = pd.DataFrame.from_dict(results, orient='index') @@ -339,6 +342,7 @@ def daily_stats(): def generate_night_list(): """ Generate the list of available nights (last night first) """ + clientStats = connect_to_hbase_table('statistics_class') results = clientStats.scan( "", "key:key:ztf_", @@ -347,6 +351,7 @@ def generate_night_list(): True, True ) + clientStats.close() # Construct the dataframe pdf = pd.DataFrame.from_dict(results, orient='index') @@ -371,7 +376,9 @@ def generate_night_list(): def generate_col_list(): """ Generate the list of available columns """ + clientStats = connect_to_hbase_table('statistics_class') schema = clientStats.schema() + clientStats.close() schema_list = list(schema.columnNames()) labels = [ @@ -404,6 +411,7 @@ def get_data_one_night(night): """ Get the statistics for one night """ cols = 'basic:raw,basic:sci,basic:fields,basic:exposures' + clientStats = connect_to_hbase_table('statistics_class') results = clientStats.scan( "", "key:key:{}".format(night), @@ -412,6 +420,7 @@ def get_data_one_night(night): True, True ) + clientStats.close() # Construct the dataframe pdf = pd.DataFrame.from_dict(results, orient='index') diff --git a/apps/summary.py b/apps/summary.py index 77674774..da99b3a1 100644 --- a/apps/summary.py +++ b/apps/summary.py @@ -28,7 +28,9 @@ import rocks -from app import app, client, clientU, clientUV, clientSSO, clientTRCK +from app import app + +from apps.client import connect_to_hbase_table from apps.supernovae.cards import card_sn_scores from apps.varstars.cards import card_explanation_variable, card_variable_button @@ -37,7 +39,7 @@ from apps.sso.cards import card_sso_left from apps.cards import card_lightcurve_summary -from apps.cards import card_id1 +from apps.cards import card_id from apps.cards import create_external_links, create_external_links_brokers from apps.plotting import draw_sso_lightcurve, draw_sso_astrometry, draw_sso_residual @@ -59,7 +61,7 @@ dcc.Location(id='url', refresh=False) -def tab1_content(extra_div): +def tab1_content(pdf, extra_div): """ Summary tab """ tab1_content_ = html.Div([ @@ -81,7 +83,7 @@ def tab1_content(extra_div): ), dbc.Row([ dbc.Col([extra_div, card_lightcurve_summary()], width=8), - dbc.Col(id="card_id_col", width=4) + dbc.Col(card_id(pdf), width=4) ]), ]) @@ -460,7 +462,7 @@ def tabs(pdf, is_mobile): dmc.Tab("GRB", value="GRB", disabled=True) ], position='right' ), - dmc.TabsPanel(tab1_content(extra_div), value="Summary"), + dmc.TabsPanel(tab1_content(pdf, extra_div), value="Summary"), dmc.TabsPanel(tab2_content(), value="Supernovae"), dmc.TabsPanel(tab3_content(), value="Variable stars"), dmc.TabsPanel(tab4_content(), value="Microlensing"), @@ -667,19 +669,26 @@ def store_query(name): raise PreventUpdate else: oid = name[1:] + client = connect_to_hbase_table('ztf') results = client.scan("", "key:key:{}".format(oid), "*", 0, True, True) schema_client = client.schema() pdfs = format_hbase_output(results, schema_client, group_alerts=False) + client.close() + clientU = connect_to_hbase_table('ztf.upper') uppers = clientU.scan("", "key:key:{}".format(oid), "*", 0, True, True) pdfsU = pd.DataFrame.from_dict(uppers, orient='index') + clientU.close() + clientUV = connect_to_hbase_table('ztf.uppervalid') uppersV = clientUV.scan("", "key:key:{}".format(oid), "*", 0, True, True) pdfsUV = pd.DataFrame.from_dict(uppersV, orient='index') + clientUV.close() payload = pdfs['i:ssnamenr'].values[0] is_sso = np.alltrue([i == payload for i in pdfs['i:ssnamenr'].values]) if str(payload) != 'null' and is_sso: + clientSSO = connect_to_hbase_table('ztf.ssnamenr') results = clientSSO.scan( "", "key:key:{}_".format(payload), @@ -691,6 +700,7 @@ def store_query(name): results, schema_client_sso, group_alerts=False, truncated=False, extract_color=False ) + clientSSO.close() if pdfsso.empty: # This can happen for SSO candidate with a ssnamenr @@ -707,6 +717,7 @@ def store_query(name): payload = pdfs['d:tracklet'].values[0] if str(payload).startswith('TRCK'): + clientTRCK = connect_to_hbase_table('ztf.tracklet') results = clientTRCK.scan( "", "key:key:{}".format(payload), @@ -718,6 +729,7 @@ def store_query(name): results, schema_client_tracklet, group_alerts=False, truncated=False, extract_color=False ) + clientTRCK.close() else: pdftracklet = pd.DataFrame() return pdfs.to_json(), pdfsU.to_json(), pdfsUV.to_json(), pdfsso.to_json(), pdftracklet.to_json() diff --git a/apps/utils.py b/apps/utils.py index fdeb0fb3..c61b601b 100644 --- a/apps/utils.py +++ b/apps/utils.py @@ -19,6 +19,7 @@ import io import requests import base64 +import yaml import qrcode from qrcode.image.styledpil import StyledPilImage @@ -43,7 +44,7 @@ from fink_utils.photometry.conversion import dc_mag from fink_utils.xmatch.simbad import get_simbad_labels -from app import APIURL +from app import APIURL, nlimit simbad_types = get_simbad_labels('old_and_new') simbad_types = sorted(simbad_types, key=lambda s: s.lower()) @@ -76,7 +77,7 @@ def format_hbase_output( extract_color: bool = True, with_constellation: bool = True): """ """ - if hbase_output.isEmpty(): + if len(hbase_output) == 0: return pd.DataFrame({}) # Construct the dataframe @@ -827,4 +828,4 @@ def retrieve_oid_from_metaname(name): if r.json() != []: return r.json()[0]['key:key'] - return None \ No newline at end of file + return None diff --git a/bin/FinkBrowser.exe.jar b/bin/FinkBrowser.exe.jar index 2680eed7..a1af4166 100644 Binary files a/bin/FinkBrowser.exe.jar and b/bin/FinkBrowser.exe.jar differ diff --git a/index.py b/index.py index 88416dfe..ff0d79d1 100644 --- a/index.py +++ b/index.py @@ -25,11 +25,11 @@ from app import server from app import app -from app import client from app import APIURL from apps import summary, about, statistics, query_cluster, gw from apps.api import api +from apps.client import connect_to_hbase_table from apps import __version__ as portal_version from apps.utils import markdownify_objectid, class_colors, simbad_types @@ -453,7 +453,9 @@ def display_table_results(table, is_mobile): 2. Table of results The dropdown is shown only if the table is non-empty. """ + client = connect_to_hbase_table('ztf') schema = client.schema() + client.close() schema_list = list(schema.columnNames()) fink_fields = [i for i in schema_list if i.startswith('d:')] ztf_fields = [i for i in schema_list if i.startswith('i:')]