From a17fe6dbf7b3b371a92f54ac38a8a861bbbdc5d7 Mon Sep 17 00:00:00 2001 From: mouffok <sarah.mouffok@epfl.ch> Date: Wed, 7 Feb 2024 15:21:58 +0100 Subject: [PATCH] fixes and notebook update --- .../use-cases/BBP KG Forge retrieval.ipynb | 185 ++++++++++++++++++ .../specializations/stores/bluebrain_nexus.py | 133 ++++++++----- 2 files changed, 267 insertions(+), 51 deletions(-) create mode 100644 examples/notebooks/use-cases/BBP KG Forge retrieval.ipynb diff --git a/examples/notebooks/use-cases/BBP KG Forge retrieval.ipynb b/examples/notebooks/use-cases/BBP KG Forge retrieval.ipynb new file mode 100644 index 00000000..7e57e7b6 --- /dev/null +++ b/examples/notebooks/use-cases/BBP KG Forge retrieval.ipynb @@ -0,0 +1,185 @@ +{ + "cells": [ + { + "cell_type": "code", + "execution_count": 1, + "id": "2c4aec45-da10-4b11-8469-b0e62f8a0c31", + "metadata": {}, + "outputs": [], + "source": [ + "import os\n", + "import json\n", + "\n", + "from kgforge.core import KnowledgeGraphForge\n", + "from kgforge.specializations.resources import Dataset" + ] + }, + { + "cell_type": "code", + "execution_count": 2, + "id": "0ac8879d-b05b-482c-94b6-e46437291854", + "metadata": {}, + "outputs": [ + { + "name": "stdin", + "output_type": "stream", + "text": [ + " ········\n" + ] + } + ], + "source": [ + "import getpass\n", + "TOKEN = getpass.getpass()" + ] + }, + { + "cell_type": "code", + "execution_count": 3, + "id": "eb70f64b-7ca2-4c71-a725-87cfd7ce2e1d", + "metadata": {}, + "outputs": [], + "source": [ + "BUCKET = \"dke/kgforge\"\n", + "\n", + "forge = KnowledgeGraphForge(\n", + " \"../use-cases/prod-forge-nexus.yml\",\n", + " bucket=BUCKET,\n", + " token=TOKEN\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": 8, + "id": "826d1183-6898-4977-b7ee-9518e4095fef", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Execution 0 - Value: id - Location: same bucket - Cross bucket: True - Retrieve source: True\n", + "rev 3\n", + "______________________\n", + "Execution 1 - Value: self - Location: same bucket - Cross bucket: True - Retrieve source: True\n", + "rev 3\n", + "______________________\n", + "Execution 2 - Value: id - Location: other bucket - Cross bucket: True - Retrieve source: True\n", + "<action> catch_http_error\n", + "<error> RetrievalError: failed to resolve 'https://bbp.epfl.ch/neurosciencegraph/data/9d64dc0d-07d1-4624-b409-cdc47ccda212' (latest) using resolvers of project 'dke/kgforge'\n", + "\n", + "Not found\n", + "______________________\n", + "Execution 3 - Value: id - Location: same bucket - Cross bucket: True - Retrieve source: False\n", + "rev 3\n", + "______________________\n", + "Execution 4 - Value: self - Location: same bucket - Cross bucket: True - Retrieve source: False\n", + "rev 3\n", + "______________________\n", + "Execution 5 - Value: id - Location: other bucket - Cross bucket: True - Retrieve source: False\n", + "<action> catch_http_error\n", + "<error> RetrievalError: failed to resolve 'https://bbp.epfl.ch/neurosciencegraph/data/9d64dc0d-07d1-4624-b409-cdc47ccda212' (latest) using resolvers of project 'dke/kgforge'\n", + "\n", + "Not found\n", + "______________________\n", + "Execution 6 - Value: id - Location: same bucket - Cross bucket: False - Retrieve source: True\n", + "rev 3\n", + "______________________\n", + "Execution 7 - Value: self - Location: same bucket - Cross bucket: False - Retrieve source: True\n", + "rev 3\n", + "______________________\n", + "Execution 8 - Value: id - Location: other bucket - Cross bucket: False - Retrieve source: True\n", + "<action> catch_http_error\n", + "<error> RetrievalError: resource 'https://bbp.epfl.ch/neurosciencegraph/data/9d64dc0d-07d1-4624-b409-cdc47ccda212' not found in project 'dke/kgforge'\n", + "\n", + "Not found\n", + "______________________\n", + "Execution 9 - Value: id - Location: same bucket - Cross bucket: False - Retrieve source: False\n", + "rev 3\n", + "______________________\n", + "Execution 10 - Value: self - Location: same bucket - Cross bucket: False - Retrieve source: False\n", + "rev 3\n", + "______________________\n", + "Execution 11 - Value: id - Location: other bucket - Cross bucket: False - Retrieve source: False\n", + "<action> catch_http_error\n", + "<error> RetrievalError: 404 Client Error: Not Found for url: https://bbp.epfl.ch/nexus/v1/resources/dke/kgforge/_/https%3A%2F%2Fbbp.epfl.ch%2Fneurosciencegraph%2Fdata%2F9d64dc0d-07d1-4624-b409-cdc47ccda212\n", + "\n", + "Not found\n", + "______________________\n" + ] + } + ], + "source": [ + "id_ = \"https://bbp.epfl.ch/nexus/v1/resources/dke/kgforge/_/20fbc97a-fb26-43ff-8093-9136aab25dff\"\n", + "self_ = \"https://bbp.epfl.ch/nexus/v1/resources/dke/kgforge/_/https:%2F%2Fbbp.epfl.ch%2Fnexus%2Fv1%2Fresources%2Fdke%2Fkgforge%2F_%2F20fbc97a-fb26-43ff-8093-9136aab25dff\"\n", + "\n", + "id_other_bucket = \"https://bbp.epfl.ch/neurosciencegraph/data/9d64dc0d-07d1-4624-b409-cdc47ccda212\"\n", + "self_other_bucket = \"https://bbp.epfl.ch/nexus/v1/resources/bbp/atlas/_/https:%2F%2Fbbp.epfl.ch%2Fneurosciencegraph%2Fdata%2Fallen_ccfv3_spatial_reference_system\"\n", + "\n", + "all_ret = [\n", + " (id_, \"id\", \"same bucket\"), \n", + " (self_, \"self\", \"same bucket\"), \n", + " (id_other_bucket, \"id\", \"other bucket\"), \n", + "]\n", + "\n", + "\n", + "i = 0\n", + "\n", + "for cb in [True, False]:\n", + " for rs in [True, False]:\n", + " for (el, type_, loc) in all_ret:\n", + "\n", + " print(\n", + " f\"Execution {i}\", \"-\", \n", + " f\"Value: {type_}\", \"-\",\n", + " f\"Location: {loc}\", \"-\", \n", + " f\"Cross bucket: {cb}\", \"-\", \n", + " f\"Retrieve source: {rs}\"\n", + " )\n", + "\n", + " e = forge.retrieve(el, cross_bucket=cb, retrieve_source=rs)\n", + "\n", + " if not e:\n", + " print(\"Not found\")\n", + " else:\n", + " print(\"rev\", e._store_metadata._rev)\n", + " print(\"______________________\")\n", + " i += 1" + ] + }, + { + "cell_type": "markdown", + "id": "32a9eb99-82d0-41c0-be8d-b4389698bc58", + "metadata": {}, + "source": [ + "Failures: \n", + "- 2 - \n", + "- 5 - \n", + "- 8 - To be expected: id from other bucket, and cross_bucket = False\n", + "- 11 - To be expected: id from other bucket, and cross_bucket = False" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "forge_venv", + "language": "python", + "name": "venv" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.8.17" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/kgforge/specializations/stores/bluebrain_nexus.py b/kgforge/specializations/stores/bluebrain_nexus.py index ca30d488..4cac0124 100644 --- a/kgforge/specializations/stores/bluebrain_nexus.py +++ b/kgforge/specializations/stores/bluebrain_nexus.py @@ -277,6 +277,73 @@ def _local_parse(id_value, version_params) -> Tuple[str, Dict]: return id_without_query, query_params + def _retrieve_id( + self, id_, retrieve_source: bool, cross_bucket: bool, query_params: Dict + ): + + url_base = self.service.url_resolver if cross_bucket else self.service.url_resources + + url_resource = Service.add_schema_and_id_to_endpoint( + url_base, schema_id=None, resource_id=id_ + ) + # and not cross_bucket: if cross_bucket, no support for /source and metadata. + # So this will fetch the right metadata. The source data will be fetched later + url = f"{url_resource}/source" if retrieve_source and not cross_bucket else url_resource + + response = requests.get( + url, params=query_params, headers=self.service.headers, timeout=REQUEST_TIMEOUT + ) + catch_http_error_nexus(response, RetrievalError) + + try: + data = response.json() + resource = self.service.to_resource(data) + except Exception as e: + raise RetrievalError(e) from e + + # retrieve_source = False = metadata in payload + # retrieve_source = True and cross_bucket = False: metadata in payload with annotate = True + # Used to be a call without /source, and then synchronize_resource + # retrieve_source = True, and cross_bucket = True: no metadata. To fetch separately. + # Used to be a call to the _self/source and then service.to_resource + + # specific case that requires additional fetching of data without source + if not (retrieve_source and cross_bucket): + return resource + + _self = data.get("_self", None) + + if _self: + response_source = requests.get( + url=f"{_self}/source", + params=query_params, headers=self.service.headers, timeout=REQUEST_TIMEOUT + ) + catch_http_error_nexus(response, RetrievalError) + resource = self.service.to_resource(response_source.json()) + self.service.synchronize_resource( + resource, data, self.retrieve.__name__, True, True + ) + return resource + + raise RetrievalError("Cannot find metadata in payload") + + + def _retrieve_self( + self, self_, retrieve_source: bool, query_params: Dict + ) -> Resource: + url = f"{self_}/source" if retrieve_source else self_ + + response = requests.get( + url, params=query_params, headers=self.service.headers, timeout=REQUEST_TIMEOUT + ) + catch_http_error_nexus(response, RetrievalError) + + try: + data = response.json() + return self.service.to_resource(data) + except Exception as e: + raise RetrievalError(e) from e + def retrieve( self, id_: str, version: Optional[Union[int, str]], cross_bucket: bool = False, **params ) -> Optional[Resource]: @@ -311,67 +378,31 @@ def retrieve( if retrieve_source: query_params.update({"annotate": True}) - url_base = self.service.url_resolver if cross_bucket else self.service.url_resources - - url_resource = Service.add_schema_and_id_to_endpoint( - url_base, schema_id=None, resource_id=id_without_query - ) - - url = f"{url_resource}/source" if retrieve_source else url_resource - try: - response = requests.get( - url, params=query_params, headers=self.service.headers, - timeout=REQUEST_TIMEOUT - ) - catch_http_error_nexus(response, RetrievalError) + return self._retrieve_id( + id_=id_without_query, retrieve_source=retrieve_source, + cross_bucket=cross_bucket, query_params=query_params + ) # either for data or metadata. Metadata if cross_bucket and retrieve_source except RetrievalError as er: # without org and proj, vs with - nexus_path = f"{self.service.endpoint}/resources/" if cross_bucket else self.service.url_resources + nexus_path_no_bucket = f"{self.service.endpoint}/resources/" + nexus_path = nexus_path_no_bucket if cross_bucket else self.service.url_resources - # Try to use the id as it was given - if not id_without_query.startswith(nexus_path): + if not id_without_query.startswith(nexus_path_no_bucket): raise er - url = f"{id_without_query}/source" if retrieve_source else id_without_query - - response = requests.get( - url, params=query_params, headers=self.service.headers, timeout=REQUEST_TIMEOUT - ) - catch_http_error_nexus(response, RetrievalError) - - if not response: - return None - - try: - data = response.json() - resource = self.service.to_resource(data) - except Exception as e: - raise RetrievalError(e) from e - - # retrieve_source = False = metadata in payload - # retrieve_source = True and cross_bucket = False: metadata in payload with annotate = True - # Used to be a call without /source, and then synchronize_resource - # retrieve_source = True, and cross_bucket = True: no metadata. To fetch separately. - # Used to be a call to the _self/source and then service.to_resource - - if not (retrieve_source and cross_bucket): - return resource - - # Retrieving with resolvers and /source doesn't support annotate=True - - _self = data.get("_self", None) + if not id_without_query.startswith(nexus_path): + raise RetrievalError( + "Provided resource identifier is not inside the current bucket, " + "use cross_bucket=True to be able to retrieve it" + ) - if _self: - response_metadata = requests.get( - url=f"{_self}/source", - params=query_params, headers=self.service.headers, timeout=REQUEST_TIMEOUT + # Try to use the id as it was given + return self._retrieve_self( + self_=id_without_query, retrieve_source=retrieve_source, query_params=query_params ) - catch_http_error_nexus(response, RetrievalError) - return self.service.to_resource(response_metadata.json()) - return None def _retrieve_filename(self, id_: str) -> Tuple[str, str]: response = requests.get(id_, headers=self.service.headers, timeout=REQUEST_TIMEOUT)