diff --git a/bento_beacon/app.py b/bento_beacon/app.py index 3abb891..7766e16 100644 --- a/bento_beacon/app.py +++ b/bento_beacon/app.py @@ -9,6 +9,8 @@ from .endpoints.biosamples import biosamples from .endpoints.cohorts import cohorts from .endpoints.datasets import datasets +from .network.network import network +from .network.utils import init_network_service_registry from .utils.exceptions import APIException from werkzeug.exceptions import HTTPException from .authz.middleware import authz_middleware @@ -44,7 +46,7 @@ app.register_blueprint(info) -blueprints = { +endpoint_blueprints = { "biosamples": biosamples, "cohorts": cohorts, "datasets": datasets, @@ -53,12 +55,21 @@ } with app.app_context(): - # load blueprints + # load blueprints for endpoints endpoint_sets = current_app.config["BEACON_CONFIG"].get("endpointSets") for endpoint_set in endpoint_sets: if endpoint_set not in BEACON_MODELS: raise APIException(message="beacon config contains unknown endpoint set") - app.register_blueprint(blueprints[endpoint_set]) + app.register_blueprint(endpoint_blueprints[endpoint_set]) + + # load blueprint for network + if current_app.config["USE_BEACON_NETWORK"]: + app.register_blueprint(network) + try: + init_network_service_registry() + except APIException: + # trouble setting up network, swallow for now + current_app.logger.error("API Error when initializing beacon network") # get censorship settings from katsu max_filters = None diff --git a/bento_beacon/config_files/config.py b/bento_beacon/config_files/config.py index 93f4b0a..022dcee 100644 --- a/bento_beacon/config_files/config.py +++ b/bento_beacon/config_files/config.py @@ -1,6 +1,7 @@ import json import os import urllib3 +from ..constants import GRANULARITY_COUNT, GRANULARITY_RECORD GA4GH_BEACON_REPO_URL = "https://raw.githubusercontent.com/ga4gh-beacon/beacon-v2" @@ -27,12 +28,13 @@ class Config: # default when no requested granularity, as well as max granularity for anonymous users DEFAULT_GRANULARITY = { - "individuals": "count", - "variants": "count", - "biosamples": "count", - "cohorts": "record", - "datasets": "record", - "info": "record", + "individuals": GRANULARITY_COUNT, + "variants": GRANULARITY_COUNT, + "biosamples": GRANULARITY_COUNT, + "cohorts": GRANULARITY_RECORD, + "datasets": GRANULARITY_RECORD, + "info": GRANULARITY_RECORD, + "network": GRANULARITY_COUNT, } DEFAULT_PAGINATION_PAGE_SIZE = 10 @@ -166,8 +168,6 @@ class Config: MAP_EXTRA_PROPERTIES_TO_INFO = str_to_bool(os.environ.get("MAP_EXTRA_PROPERTIES_TO_INFO", "")) - PHENOPACKETS_SCHEMA_REFERENCE = {"entityType": "individual", "schema": "phenopackets v1"} - MAX_RETRIES_FOR_CENSORSHIP_PARAMS = 2 # don't let anonymous users query arbitrary phenopacket or experiment fields @@ -229,3 +229,23 @@ def retrieve_config_json(filename): BEACON_COHORT = retrieve_config_json("beacon_cohort.json") BEACON_CONFIG = retrieve_config_json("beacon_config.json") + + # ------------------- + # network + + USE_BEACON_NETWORK = os.environ.get("BENTO_BEACON_NETWORK_ENABLED", "false").strip().lower() in ("true", "1", "t") + + NETWORK_CONFIG = retrieve_config_json("beacon_network_config.json") + + NETWORK_URLS = NETWORK_CONFIG.get("beacons", []) + NETWORK_DEFAULT_TIMEOUT_SECONDS = NETWORK_CONFIG.get("network_default_timeout_seconds", 30) + NETWORK_VARIANTS_QUERY_TIMEOUT_SECONDS = NETWORK_CONFIG.get("network_variants_query_timeout_seconds", GOHAN_TIMEOUT) + NETWORK_VALID_QUERY_ENDPOINTS = [ + "analyses", + "biosamples", + "cohorts", + "datasets", + "g_variants", + "individuals", + "runs", + ] diff --git a/bento_beacon/network/__init__.py b/bento_beacon/network/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/bento_beacon/network/bento_public_query.py b/bento_beacon/network/bento_public_query.py new file mode 100644 index 0000000..653ee4a --- /dev/null +++ b/bento_beacon/network/bento_public_query.py @@ -0,0 +1,91 @@ +import copy + +# TEMP FILE +# +# handling for bento public query terms, currently how beacon UI handles search options to present to the user +# to be replaced by beacon spec filtering_terms in a future version +# best approach here is not yet clear: +# - intersection of all query terms is too small +# - union of all query terms loses any organization into categories, which varies across instances +# +# we may prefer to make network query terms configurable rather than generating them automatically + + +def flatten(nested_list): + return [item for nested_items in nested_list for item in nested_items] + + +def fields_dict(search_fields): + """ + Given a list of bento_public search fields, one for each instance, + return a dictionary of search fields keyed to phenopackets mapping, with an array of all fields for that mapping + """ + # create a single array of all search fields for all instances, removing nesting + copy = search_fields[:] + + all_fields = [] + for sf in copy: + for f in sf: + all_fields.extend(f["fields"]) + + # make a dict of entries, keyed to phenopackets mapping + group_by, etc, keeping duplicate values + all_fields_by_mapping = {} + for f in all_fields: + field_key = f["mapping"] + f.get("group_by", "") + f.get("group_by_value", "") + f.get("value_mapping", "") + all_fields_by_mapping[field_key] = all_fields_by_mapping.get(field_key, []) + [f] + + return all_fields_by_mapping + + +def options_union(options_list): + # remove duplicates but keep any ordering + return list(dict.fromkeys(flatten(options_list[:]))) + + +def options_intersection(options_list): + num_instances = len(options_list) + flat_options = flatten(options_list[:]) + # only keep options that are present in all instances, preserving order + counter = {} + for option in flat_options: + counter[option] = counter.get(option, 0) + 1 + + intersection = [key for key in counter if counter[key] == num_instances] + return intersection + + +# any filters that exist in all beacons +# bins should be joined also, although some ordering may disappear +# still unclear if this is an useful feature or not +# shortcomings here can be addressed by keeping our configs consistent where possible +def fields_union(search_fields): + fields = fields_dict(search_fields) + + # create one entry for each mapping + union_fields = [] + for f in fields.values(): + entry = copy.deepcopy(f[0]) # arbitrarily get name, description, etc from first entry + entry["options"] = options_union([e["options"] for e in f]) + union_fields.append(entry) + + return union_fields + + +def fields_intersection(search_fields): + num_instances = len(search_fields) + fields = fields_dict(search_fields) + + # remove any fields not in all entries + intersection_dict = {mapping: entries for mapping, entries in fields.items() if len(entries) == num_instances} + + # create one entry for each mapping + intersection_fields = [] + for f in intersection_dict.values(): + entry = {} + entry = copy.deepcopy(f[0]) # arbitrarily get name, description, etc from first entry + options = options_intersection([e["options"] for e in f]) + if options: + entry["options"] = options + intersection_fields.append(entry) + + return intersection_fields diff --git a/bento_beacon/network/network.py b/bento_beacon/network/network.py new file mode 100644 index 0000000..5a3e0c3 --- /dev/null +++ b/bento_beacon/network/network.py @@ -0,0 +1,58 @@ +from flask import current_app, request, Blueprint +from ..utils.exceptions import APIException, NotFoundException +from .utils import network_beacon_get, network_beacon_post, host_beacon_response, filters_intersection, filters_union + +network = Blueprint("network", __name__, url_prefix="/network") + + +# TODOs: +# filtering terms XXXXXXXXXXXXXXXXXXXXXXXXXXX +# /service-info? there's already one at beacon root +# async calls + +# standard beacon info endpoints at the network level: /map, /configuration, etc +# handle GET args + + +@network.route("") +@network.route("/beacons") +def network_beacons(): + beacons_dict = current_app.config.get("NETWORK_BEACONS") + if not beacons_dict: + raise APIException("no beacons found in network config") + + # filters handling still experimental + return { + "filtersUnion": current_app.config["ALL_NETWORK_FILTERS"], + "filtersIntersection": current_app.config["COMMON_NETWORK_FILTERS"], + "beacons": list(beacons_dict.values()), + } + + +# returns 404 if endpoint missing +@network.route("/beacons//", methods=["GET", "POST"]) +def query(beacon_id, endpoint): + beacon = current_app.config["NETWORK_BEACONS"].get(beacon_id) + + if not beacon: + raise NotFoundException(message=f"no beacon found with id {beacon_id}") + + if endpoint not in current_app.config["NETWORK_VALID_QUERY_ENDPOINTS"]: + raise NotFoundException() + + # special handling for host beacon, avoid circular http calls + host_id = current_app.config["BEACON_ID"] + if beacon_id == host_id: + return host_beacon_response(endpoint) + + # all other beacons + api_url = beacon.get("apiUrl") + + if request.method == "POST": + payload = request.get_json() + r = network_beacon_post(api_url, payload, endpoint) + else: + # TODO: pass get args + r = network_beacon_get(api_url, endpoint) + + return r diff --git a/bento_beacon/network/utils.py b/bento_beacon/network/utils.py new file mode 100644 index 0000000..63f93e4 --- /dev/null +++ b/bento_beacon/network/utils.py @@ -0,0 +1,216 @@ +import requests +from flask import current_app +from urllib.parse import urlsplit, urlunsplit +from json import JSONDecodeError +from ..utils.exceptions import APIException +from ..utils.katsu_utils import overview_statistics, get_katsu_config_search_fields +from ..endpoints.info import build_service_details, overview +from ..endpoints.biosamples import get_biosamples +from ..endpoints.cohorts import get_cohorts +from ..endpoints.datasets import get_datasets +from ..endpoints.individuals import get_individuals +from ..endpoints.variants import get_variants +from .bento_public_query import fields_intersection, fields_union + +# future versions will pull metadata query info directly from network beacons instead of network katsus +# to deprecate in Bento 18 +PUBLIC_SEARCH_FIELDS_PATH = "/api/metadata/api/public_search_fields" + + +DEFAULT_ENDPOINT = "individuals" +OVERVIEW_STATS_QUERY = { + "meta": {"apiVersion": "2.0.0"}, + "query": {"requestParameters": {}, "filters": [], "includeResultsetResponses": "ALL"}, + "bento": {"showSummaryStatistics": True}, +} +HOST_VIEWS_BY_ENDPOINT = { + "biosamples": get_biosamples, + "cohorts": get_cohorts, + "datasets": get_datasets, + "individuals": get_individuals, + "variants": get_variants, +} + + +# get network node info for this beacon, which is also hosting the network +# call methods directly instead of circular http calls +def info_for_host_beacon(): + service_details = build_service_details() + + # TODO: fix ugly overlapping overview functions + # requires rolling out changes to all beacons first + bento_overview = overview() + bento_private_overview = overview_statistics() + experiment_stats = {"count": bento_private_overview.get("count", 0)} + biosample_stats = { + "count": bento_private_overview.get("phenopacket", {}) + .get("data_type_specific", {}) + .get("biosamples", {}) + .get("count", 0) + } + + api_url = current_app.config["BEACON_BASE_URL"] + + return { + **service_details, + "apiUrl": api_url, + "b_id": current_app.config["BEACON_ID"], + "overview": { + "individuals": {"count": bento_overview.get("counts", {}).get("individuals")}, + "variants": bento_overview.get("counts", {}).get("variants", {}), + "biosamples": biosample_stats, + "experiments": experiment_stats, + }, + "querySections": get_katsu_config_search_fields(requires_auth="none").get("sections", []), + } + + +def host_beacon_response(endpoint): + # endpoint already known to be valid + return HOST_VIEWS_BY_ENDPOINT[endpoint]() + + +def has_variants_query(payload): + if not payload: + return False + query = payload.get("requestParameters", {}).get("g_variant") + return bool(query) + + +def network_beacon_call(method, url, payload=None): + current_app.logger.info(f"Calling network url: {url}") + timeout = ( + current_app.config["NETWORK_VARIANTS_QUERY_TIMEOUT_SECONDS"] + if has_variants_query(payload) + else current_app.config["NETWORK_DEFAULT_TIMEOUT_SECONDS"] + ) + + try: + if method == "GET": + r = requests.get(url, timeout=timeout) + else: + r = requests.post(url, json=payload, timeout=timeout) + beacon_response = r.json() + + except (requests.exceptions.RequestException, JSONDecodeError) as e: + current_app.logger.error(e) + msg = f"beacon network error calling url {url}: {e}" + raise APIException(message=msg) + + return beacon_response + + +def network_beacon_get(root_url, endpoint=None): + url = root_url if endpoint is None else root_url + "/" + endpoint + return network_beacon_call("GET", url) + + +def network_beacon_post(root_url, payload={}, endpoint=None): + url = root_url if endpoint is None else root_url + "/" + endpoint + return network_beacon_call("POST", url, payload) + + +def make_network_filtering_terms(beacons): + all_query_sections = [b["querySections"] for b in beacons.values()] + current_app.config["ALL_NETWORK_FILTERS"] = filters_union(all_query_sections) + current_app.config["COMMON_NETWORK_FILTERS"] = filters_intersection(all_query_sections) + pass + + +def init_network_service_registry(): + current_app.logger.info("registering beacons") + urls = current_app.config["NETWORK_URLS"] + if not urls: + current_app.logger.error("can't find urls for beacon network, did you forget a config file?") + # this isn't driven by a request, so no point serving API error response here + return + network_beacons = {} + failed_beacons = [] + host_beacon_url = current_app.config["BEACON_BASE_URL"] + current_app.logger.debug(f"host url: {host_beacon_url}") + for url in urls: + + # special handling for calling the beacon this network is hosted on + if url == host_beacon_url: + host_id = current_app.config["BEACON_ID"] + network_beacons[host_id] = info_for_host_beacon() + continue + + # all other beacons + try: + b = network_beacon_get(url, endpoint="overview") + beacon_info = b.get("response") + + except APIException: + failed_beacons.append(url) + current_app.logger.error(f"error contacting network beacon {url}") + continue + + if not beacon_info: + failed_beacons.append(url) + current_app.logger.error(f"bad response from network beacon {url}") + continue + + beacon_info["apiUrl"] = url + + # organize overview stats + # TODO (Redmine #2170) modify beacon /overview so we don't have to make two calls here, with different response formats + + # TODO: filters here?? + biosample_and_experiment_stats = ( + network_beacon_post(url, OVERVIEW_STATS_QUERY, DEFAULT_ENDPOINT).get("info", {}).get("bento") + ) + individual_and_variant_stats = beacon_info.get("overview", {}).get("counts") + + overview = { + "individuals": {"count": individual_and_variant_stats.get("individuals")}, + "variants": individual_and_variant_stats.get("variants"), + **biosample_and_experiment_stats, + } + + b_id = beacon_info.get("id") + network_beacons[b_id] = beacon_info + network_beacons[b_id]["overview"] = overview + + # Note: v15 katsu does not respond here + # TODO (longer): serve beacon spec filtering terms instead of bento public querySections + network_beacons[b_id]["querySections"] = get_public_search_fields(url).get("sections", []) # temp + + # make a merged overview? + # what about merged filtering_terms? + current_app.logger.info( + f"registered {len(network_beacons)} beacon{'' if len(network_beacons) == 1 else 's'} in network: {', '.join(network_beacons)}" + ) + if failed_beacons: + current_app.logger.error( + f"{len(failed_beacons)} network beacon{'' if len(failed_beacons) == 1 else 's'} failed to respond: {', '.join(failed_beacons)}" + ) + + make_network_filtering_terms(network_beacons) + current_app.config["NETWORK_BEACONS"] = network_beacons + + +########################################## +# Temp utils for bento public search terms + + +# deprecate in Bento 18 +def get_public_search_fields(beacon_url): + fields_url = public_search_fields_url(beacon_url) + current_app.logger.info(f"trying public fields url {fields_url}") + fields = network_beacon_get(fields_url) + return fields + + +# deprecate in Bento 18 +def public_search_fields_url(beacon_url): + split_url = urlsplit(beacon_url) + return urlunsplit((split_url.scheme, "portal." + split_url.netloc, PUBLIC_SEARCH_FIELDS_PATH, "", "")) + + +def filters_union(all_search_fields): + return [{"section_title": "All Filters", "fields": fields_union(all_search_fields)}] + + +def filters_intersection(all_search_fields): + return [{"section_title": "Common Filters", "fields": fields_intersection(all_search_fields)}] diff --git a/bento_beacon/utils/gohan_utils.py b/bento_beacon/utils/gohan_utils.py index bc7fb0e..ea3f0e0 100644 --- a/bento_beacon/utils/gohan_utils.py +++ b/bento_beacon/utils/gohan_utils.py @@ -1,4 +1,4 @@ -from flask import current_app +from flask import current_app, request from .exceptions import APIException, InvalidQuery, NotImplemented from ..authz.access import create_access_header_or_fall_back import requests @@ -177,6 +177,7 @@ def gohan_results(url, gohan_args): def gohan_network_call(url, gohan_args): c = current_app.config + try: r = requests.get( url, diff --git a/bento_beacon/utils/katsu_utils.py b/bento_beacon/utils/katsu_utils.py index 7839553..41b6c32 100644 --- a/bento_beacon/utils/katsu_utils.py +++ b/bento_beacon/utils/katsu_utils.py @@ -8,6 +8,8 @@ from ..authz.access import create_access_header_or_fall_back from ..authz.headers import auth_header_from_request +RequiresAuthOptions = Literal["none", "forwarded", "full"] + def katsu_filters_query(beacon_filters, datatype, get_biosample_ids=False): payload = katsu_json_payload(beacon_filters, datatype, get_biosample_ids) @@ -81,7 +83,7 @@ def katsu_network_call(payload, endpoint=None): # used for GET calls at particular katsu endpoints, eg /biosamples -def katsu_get(endpoint, id=None, query="", requires_auth: Literal["none", "forwarded", "full"] = "none"): +def katsu_get(endpoint, id=None, query="", requires_auth: RequiresAuthOptions = "none"): c = current_app.config katsu_base_url = c["KATSU_BASE_URL"] timeout = c["KATSU_TIMEOUT"] @@ -131,9 +133,10 @@ def search_from_config(config_filters): return response.get("matches", []) -def get_katsu_config_search_fields(): - # Use forwarded auth for getting available search fields, which may be limited based on access level - fields = katsu_get(current_app.config["KATSU_PUBLIC_CONFIG_ENDPOINT"], requires_auth="forwarded") +def get_katsu_config_search_fields(requires_auth: RequiresAuthOptions): + # standard forwarded auth for normal beacon requests + # "none" auth for beacon network init, which does not have a request context + fields = katsu_get(current_app.config["KATSU_PUBLIC_CONFIG_ENDPOINT"], requires_auth=requires_auth) current_app.config["KATSU_CONFIG_SEARCH_FIELDS"] = fields return fields @@ -220,7 +223,7 @@ def katsu_resources_to_beacon_resource(r): def katsu_config_filtering_terms(): filtering_terms = [] - sections = get_katsu_config_search_fields().get("sections", []) + sections = get_katsu_config_search_fields(required_auth="forwarded").get("sections", []) for section in sections: for field in section["fields"]: filtering_term = {