From ff619560e527423374febf57cd412d3c7787d917 Mon Sep 17 00:00:00 2001 From: Sarah Date: Wed, 2 Oct 2024 11:09:44 +0200 Subject: [PATCH] Simplify retrieve (#420) * parallelize validate * use fixed annotate query param from delta to reduce retrieval to max 2 requests instead of 4 * rm redundant * drop async request for one retrieve * force close tcp connection * client session per task * retrieve schema when loading shapes, todo figure out when to call retrieve for ontologies * retrieve schema, if fail retrieve resource * fix flag * rm retrieve schema specialized call after extension of delta's retrieve resource endpoint * batch resources per client session * cleanup * session timeout * rm threadpool exec * cleanup * lint * bump request v * rm retrieve schema implementation --- kgforge/specializations/models/rdf_model.py | 2 +- .../specializations/stores/bluebrain_nexus.py | 328 +++++++----------- .../stores/nexus/batch_request_handler.py | 141 +++++--- setup.py | 2 +- 4 files changed, 222 insertions(+), 251 deletions(-) diff --git a/kgforge/specializations/models/rdf_model.py b/kgforge/specializations/models/rdf_model.py index 4ad3f639..98c100c1 100644 --- a/kgforge/specializations/models/rdf_model.py +++ b/kgforge/specializations/models/rdf_model.py @@ -64,7 +64,7 @@ DEFAULT_TYPE_ORDER = [str, float, int, bool, datetime.date, datetime.time] -VALIDATION_PARALLELISM = None +VALIDATION_PARALLELISM = 10 class RdfModel(Model): diff --git a/kgforge/specializations/stores/bluebrain_nexus.py b/kgforge/specializations/stores/bluebrain_nexus.py index 83c6a0fc..4a9f603c 100644 --- a/kgforge/specializations/stores/bluebrain_nexus.py +++ b/kgforge/specializations/stores/bluebrain_nexus.py @@ -26,6 +26,7 @@ from typing import Any, Dict, List, Optional, Tuple, Union, Type, Callable from urllib.parse import quote_plus, unquote, urlparse, parse_qs +import aiohttp import requests from aiohttp import ClientSession, MultipartWriter, hdrs, ClientResponseError from aiohttp.hdrs import CONTENT_DISPOSITION, CONTENT_TYPE @@ -270,150 +271,77 @@ def _local_url_parse(id_value, version_params) -> Tuple[str, Dict]: return id_without_query, query_params - async def _retrieve_id( - self, - session, - id_, - retrieve_source: bool, - cross_bucket: bool, - query_params: Dict, - ): - """ - Retrieves assuming the provided identifier really is the id - """ - url_base = ( - self.service.url_resolver if cross_bucket else self.service.url_resources - ) + def _get_resource_sync(self, url: str, query_params: Dict) -> Resource: - url_resource = Service.add_schema_and_id_to_endpoint( - url_base, schema_id=None, resource_id=id_ + response = requests.request( + method=hdrs.METH_GET, + url=url, + headers=self.service.headers, + params=query_params, ) - # 4 cases depending on the value of retrieve_source and cross_bucket: - # retrieve_source = False and cross_bucket = True: metadata in payload - # retrieve_source = False and cross_bucket = False: metadata in payload - # retrieve_source = True and cross_bucket = False: metadata in payload with annotate = True - # retrieve_source = True and cross_bucket = True: - # Uses the resolvers endpoint. No metadata if retrieving_source. - # https://github.com/BlueBrain/nexus/issues/4717 To fetch separately. - # Solution: first API call used to retrieve metadata - # afterwards, second API call to retrieve data - - # TODO temporary - # url = f"{url_resource}/source" if retrieve_source else url_resource - # - # # if cross_bucket, no support for /source and metadata. - # # So this will fetch the right metadata. The source data will be fetched later - # if cross_bucket: - # url = url_resource - - url = url_resource - - async with session.request( - method=hdrs.METH_GET, url=url, headers=self.service.headers, params=query_params - ) as response_not_source_with_metadata: - # turns the retrieved data into a resource - not_source_with_metadata = await response_not_source_with_metadata.json() catch_http_error_nexus( - response_not_source_with_metadata, RetrievalError, aiohttp_error=True + response, RetrievalError, aiohttp_error=False ) - try: - # TODO temporary - # if not (retrieve_source and cross_bucket): - # return self.service.to_resource(not_source_with_metadata) + response_json = response.json() - if not retrieve_source: - return self.service.to_resource(not_source_with_metadata) + try: + resource = self.service.to_resource(response_json) + self.service.synchronize_resource( + resource, None, self.retrieve.__name__, True, True + ) + return resource except Exception as e: raise RetrievalError(e) from e - # specific case that requires additional fetching of data with source - _self = not_source_with_metadata.get("_self", None) - - # Retrieves the appropriate data if retrieve_source = True - if _self: - - return await self._merge_metadata_with_source_data( - session, _self, not_source_with_metadata, query_params - ) - - raise RetrievalError("Cannot find metadata in payload") - - async def _merge_metadata_with_source_data( - self, session, _self, data_not_source_with_metadata, query_params - ): - - async with session.request( - method=hdrs.METH_GET, - url=f"{_self}/source", - headers=self.service.headers, - params=query_params, - ) as response_source: - # turns the retrieved data into a resource - data_source = await response_source.json() - - catch_http_error_nexus(response_source, RetrievalError, aiohttp_error=True) - - resource = self.service.to_resource(data_source) - # uses the metadata of the first call - self.service.synchronize_resource( - resource, data_not_source_with_metadata, self.retrieve.__name__, True, True - ) - return resource - - async def _retrieve_self( - self, session, self_, retrieve_source: bool, query_params: Dict + async def _get_resource_async( + self, session: ClientSession, url: str, query_params: Dict ) -> Resource: - """ - Retrieves assuming the provided identifier is actually the resource's _self field - """ - # TODO temporary - # url = f"{self_}/source" if retrieve_source else self_ - url = self_ async with session.request( method=hdrs.METH_GET, url=url, headers=self.service.headers, params=query_params, - ) as response_not_source_with_metadata: - # turns the retrieved data into a resource - not_source_with_metadata = await response_not_source_with_metadata.json() + ) as response: - catch_http_error_nexus( - response_not_source_with_metadata, RetrievalError, aiohttp_error=True - ) + catch_http_error_nexus( + response, RetrievalError, aiohttp_error=True + ) + + response_json = await response.json() try: - if not retrieve_source: - return self.service.to_resource(not_source_with_metadata) + resource = self.service.to_resource(response_json) + self.service.synchronize_resource( + resource, None, self.retrieve.__name__, True, True + ) + return resource except Exception as e: raise RetrievalError(e) from e - return await self._merge_metadata_with_source_data( - session, self_, not_source_with_metadata, query_params - ) - def _retrieve_one( self, id_: str, version: Optional[Union[int, str]], cross_bucket: bool, **params ): - loop = asyncio.get_event_loop() - - async def do(): - async with ClientSession() as session: - return await self._retrieve( - semaphore=asyncio.Semaphore(1), - session=session, - id_=id_, - version=version, - cross_bucket=cross_bucket, - **params, - ) - return loop.run_until_complete(do()) + url, query_params = self._make_get_resource_url( + by_id=True, id_=id_, version=version, cross_bucket=cross_bucket, **params + ) + + try: + resource = self._get_resource_sync(url=url, query_params=query_params) + except RetrievalError as er: + + url, query_params = self._make_get_resource_url( + by_id=False, raise_=er, id_=id_, version=version, cross_bucket=cross_bucket, **params + ) + + resource = self._get_resource_sync(url=url, query_params=query_params) + + return resource def _retrieve_many( self, @@ -421,55 +349,77 @@ def _retrieve_many( versions: List[Optional[Union[int, str]]], cross_bucket: bool, **params, - ) -> List[Optional[Resource]]: + ) -> List[Union[Resource, Action]]: - def create_tasks( + def retrieve_done_callback(task: Task): + result = task.result() + + succeeded = not isinstance(result, Action) + + if isinstance(result, Resource): + self.service.synchronize_resource( + resource=result, + response=None, + action_name=self, + succeeded=succeeded, + synchronized=succeeded, + ) + + async def create_tasks( semaphore: asyncio.Semaphore, - session: ClientSession, loop: AbstractEventLoop, ids_: List[Any], service, **kwargs, - ) -> List[asyncio.Task]: + ) -> Tuple[List[asyncio.Task], List[ClientSession]]: - vs = kwargs["versions"] - tasks = [] + async def do_catch(id_, version, session) -> Union[Resource, Action]: + try: - def retrieve_done_callback(task: Task): - result = task.result() + url, query_params = self._make_get_resource_url( + by_id=True, id_=id_, version=version, cross_bucket=cross_bucket, **params + ) - succeeded = not isinstance(result, Action) + async with semaphore: - if isinstance(result, Resource): - self.service.synchronize_resource( - resource=result, - response=None, - action_name=self, - succeeded=succeeded, - synchronized=succeeded, - ) + try: + resource = await self._get_resource_async( + session=session, + url=url, + query_params=query_params + ) + except RetrievalError as er: + + url, query_params = self._make_get_resource_url( + by_id=False, raise_=er, id_=id_, version=version, cross_bucket=cross_bucket, **params + ) + resource = await self._get_resource_async( + session=session, + url=url, + query_params=query_params + ) - async def do_catch(id_, version): - try: - resource = await self._retrieve( - semaphore=semaphore, - session=session, - service=service, - id_=id_, - version=version, - **kwargs, - ) return resource + except RetrievalError as e: return Action(self._retrieve_many.__name__, False, e) - for id_, version in zip(ids_, vs): - batch_result = do_catch(id_, version) - prepared_request: asyncio.Task = loop.create_task(batch_result) - prepared_request.add_done_callback(retrieve_done_callback) - tasks.append(prepared_request) + vs = kwargs["versions"] + tasks = [] + sessions = [] + for batch_i in BatchRequestHandler.batch(list(zip(ids_, vs))): + + session = ClientSession() + sessions.append(session) - return tasks + for id_, version in batch_i: + batch_result = do_catch(id_, version, session) + prepared_request: asyncio.Task = loop.create_task(batch_result) + + prepared_request.add_done_callback(retrieve_done_callback) + tasks.append(prepared_request) + + return tasks, sessions batch_results = BatchRequestHandler.batch_request( service=self.service, @@ -517,27 +467,15 @@ def retrieve( return self._retrieve_many(ids, versions, cross_bucket, **params) - # TODO service.to_resource probably makes requests of its own and should have a callback in prepare_done - async def _retrieve( + def _make_get_resource_url( self, - semaphore: asyncio.Semaphore, - session: ClientSession, + by_id: bool, id_: str, version: Optional[Union[int, str]], cross_bucket: bool = False, + raise_: Optional[Exception] = None, **params, - ) -> Optional[Resource]: - """ - Retrieve a resource by its identifier from the configured store and possibly at a given version. - - :param id_: the resource identifier to retrieve - :param version: a version of the resource to retrieve - :param cross_bucket: instructs the configured store to whether search beyond the configured bucket (True) or not (False) - :param params: a dictionary of parameters. Supported parameters are: - [retrieve_source] whether to retrieve the resource payload as registered in the last update - (default: True) - :return: Resource - """ + ) -> Tuple[str, Dict]: if version is not None: if isinstance(version, int): @@ -555,43 +493,43 @@ async def _retrieve( retrieve_source = params.get("retrieve_source", True) - # if retrieve_source: - # query_params.update({"annotate": True}) - - async with semaphore: - try: - return await self._retrieve_id( - session=session, - id_=id_without_query, - retrieve_source=retrieve_source, - cross_bucket=cross_bucket, - query_params=query_params, - ) - except RetrievalError as er: + # add_resource_id_to_endpoint - # without org and proj, vs with - nexus_path_no_bucket = f"{self.service.endpoint}/resources/" - nexus_path = ( - nexus_path_no_bucket if cross_bucket else self.service.url_resources - ) + if retrieve_source: + query_params.update({"annotate": "true"}) + + if by_id: + url_base = ( + self.service.url_resolver if cross_bucket else self.service.url_resources + ) - if not id_without_query.startswith(nexus_path_no_bucket): - raise er + url_resource = Service.add_schema_and_id_to_endpoint( + url_base, schema_id=None, resource_id=id_without_query + ) - if not id_without_query.startswith(nexus_path): - raise RetrievalError( - f"Provided resource identifier {id_} is not inside the current bucket, " - "use cross_bucket=True to be able to retrieve it" - ) + url = f"{url_resource}/source" if retrieve_source else url_resource - # Try to use the id as it was given - return await self._retrieve_self( - session=session, - self_=id_without_query, - retrieve_source=retrieve_source, - query_params=query_params, + return url, query_params + else: + # without org and proj, vs with + nexus_path_no_bucket = f"{self.service.endpoint}/resources/" + nexus_path = ( + nexus_path_no_bucket if cross_bucket else self.service.url_resources + ) + + if not id_without_query.startswith(nexus_path_no_bucket): + raise raise_ + + if not id_without_query.startswith(nexus_path): + raise RetrievalError( + f"Provided resource identifier {id_} is not inside the current bucket, " + "use cross_bucket=True to be able to retrieve it" ) + self_ = id_without_query + url = f"{self_}/source" if retrieve_source else self_ + return url, query_params + def _retrieve_file_metadata(self, id_: str) -> Dict: response = requests.get( id_, headers=self.service.headers, timeout=REQUEST_TIMEOUT diff --git a/kgforge/specializations/stores/nexus/batch_request_handler.py b/kgforge/specializations/stores/nexus/batch_request_handler.py index 122fd646..ad4acbec 100644 --- a/kgforge/specializations/stores/nexus/batch_request_handler.py +++ b/kgforge/specializations/stores/nexus/batch_request_handler.py @@ -3,9 +3,11 @@ import json import asyncio -from typing import Callable, Dict, List, Optional, Tuple, Type, Any +from typing import Callable, Dict, List, Optional, Tuple, Type, Any, Coroutine, Union +from kgforge.core.commons.actions import Action from kgforge.core.commons.constants import DEFAULT_REQUEST_TIMEOUT + from typing_extensions import Unpack from aiohttp import ClientSession, ClientTimeout @@ -21,29 +23,45 @@ class BatchRequestHandler: + BATCH_SIZE = 80 + + @staticmethod + def batch(iterable, n=BATCH_SIZE): + + length_v = len(iterable) + for ndx in range(0, length_v, n): + yield iterable[ndx:min(ndx + n, length_v)] @staticmethod def batch_request( service: Service, data: List[Any], task_creator: Callable[ - [asyncio.Semaphore, ClientSession, AbstractEventLoop, List[Any], Service, Unpack[Any]], - List[asyncio.Task] + [asyncio.Semaphore, AbstractEventLoop, List[Any], Service, Unpack[Any]], + Coroutine[Any, Any, Tuple[List[asyncio.Task], List[ClientSession]]] ], **kwargs ): + loop = asyncio.get_event_loop() + async def dispatch_action(): semaphore = asyncio.Semaphore(service.max_connection) - loop = asyncio.get_event_loop() - async with ClientSession(timeout=ClientTimeout(total=BATCH_REQUEST_TIMEOUT_PER_REQUEST)) as session: - tasks = task_creator( - semaphore, session, loop, data, service, **kwargs - ) - return await asyncio.gather(*tasks) + tasks, sessions = await task_creator( + semaphore, loop, data, service, **kwargs + ) + + return await asyncio.gather(*tasks), sessions - return asyncio.run(dispatch_action()) + async def close_sessions(sesss): + for sess in sesss: + await sess.close() + + res, sessions = asyncio.run(dispatch_action()) + closing_task = loop.create_task(close_sessions(sessions)) + asyncio.run(closing_task) + return res @staticmethod def batch_request_on_resources( @@ -53,67 +71,82 @@ def batch_request_on_resources( ['Service', Resource, Dict, Unpack[Any]], Tuple[str, str, Resource, Type[RunException], Dict, Optional[Dict], Optional[Dict]] ], - callback: Callable, + callback: Optional[Callable] = None, **kwargs ) -> BatchResults: + async def create_tasks_for_resources( + semaphore: asyncio.Semaphore, + loop: AbstractEventLoop, + resources: List[Resource], + service, + **kwargs + ) -> Tuple[List[asyncio.Task], List[ClientSession]]: + + prepare_function = kwargs["prepare_function"] + callback = kwargs["callback"] + + async def request(resource: Optional[Resource], client_session: ClientSession) -> BatchResult: + + method, url, resource, exception, headers, params, payload = prepare_function( + service, resource, **kwargs + ) + + async with semaphore: + + try: + async with client_session.request( + method=method, + url=url, + headers=headers, + data=json.dumps(payload, ensure_ascii=True), + params=params + ) as response: + content = await response.json() + if response.status < 400: + return BatchResult(resource, content) + + error = exception(_error_message(content)) + return BatchResult(resource, error) + + except Exception as e: + return BatchResult(resource, exception(str(e))) + + return BatchRequestHandler.create_tasks_and_sessions( + loop, resources, request, callback + ) + return BatchRequestHandler.batch_request( service=service, data=resources, - task_creator=BatchRequestHandler.create_tasks_for_resources, + task_creator=create_tasks_for_resources, prepare_function=prepare_function, callback=callback, **kwargs ) @staticmethod - def create_tasks_for_resources( - semaphore: asyncio.Semaphore, - session: ClientSession, + def create_tasks_and_sessions( loop: AbstractEventLoop, - resources: List[Resource], - service, - **kwargs - ) -> List[asyncio.Task]: - - prepare_function = kwargs["prepare_function"] - callback = kwargs["callback"] - - async def request(resource: Optional[Resource]) -> BatchResult: - - method, url, resource, exception, headers, params, payload = prepare_function( - service, resource, **kwargs - ) - - async with semaphore: - try: - async with session.request( - method=method, - url=url, - headers=headers, - data=json.dumps(payload, ensure_ascii=True), - params=params, - ) as response: - content = await response.json() - if response.status < 400: - return BatchResult(resource, content) - - error = exception(_error_message(content)) - return BatchResult(resource, error) - - except asyncio.exceptions.TimeoutError as timeout_error: - - return BatchResult(resource, exception(str(timeout_error))) + elements: List[Union[str, Resource]], + fc: Callable[[Union[str, Resource], ClientSession], Coroutine[Any, Any, Union[Resource, Action, BatchResult]]], + callback: Optional[Callable] + ) -> Tuple[List[asyncio.Task], List[ClientSession]]: tasks = [] + sessions = [] + + for batch_i in BatchRequestHandler.batch(elements): - for res in resources: + session = ClientSession(timeout=ClientTimeout(BATCH_REQUEST_TIMEOUT_PER_REQUEST)) + sessions.append(session) - prepared_request: asyncio.Task = loop.create_task(request(res)) + for res in batch_i: + prepared_request: asyncio.Task = loop.create_task(fc(res, session)) - if callback: - prepared_request.add_done_callback(callback) + if callback: + prepared_request.add_done_callback(callback) - tasks.append(prepared_request) + tasks.append(prepared_request) - return tasks + return tasks, sessions diff --git a/setup.py b/setup.py index 1ecbc072..7c2a8b66 100644 --- a/setup.py +++ b/setup.py @@ -50,7 +50,7 @@ "pyparsing>=2.0.2", "owlrl>=5.2.3", "elasticsearch_dsl==7.4.0", - "requests==2.31.0", + "requests==2.32.0", "typing-extensions" ], extras_require={