diff --git a/bento_lib/_internal.py b/bento_lib/_internal.py new file mode 100644 index 0000000..8086e87 --- /dev/null +++ b/bento_lib/_internal.py @@ -0,0 +1,7 @@ +import logging + +__all__ = ["internal_logger"] + +logging.basicConfig(level=logging.NOTSET) + +internal_logger = logging.getLogger("bento_lib") diff --git a/bento_lib/drs/__init__.py b/bento_lib/drs/__init__.py index 3bb6ea5..e69de29 100644 --- a/bento_lib/drs/__init__.py +++ b/bento_lib/drs/__init__.py @@ -1,5 +0,0 @@ -from . import utils - -__all__ = [ - "utils", -] diff --git a/bento_lib/drs/exceptions.py b/bento_lib/drs/exceptions.py new file mode 100644 index 0000000..5b4cb05 --- /dev/null +++ b/bento_lib/drs/exceptions.py @@ -0,0 +1,17 @@ +__all__ = [ + "DrsInvalidScheme", + "DrsRecordNotFound", + "DrsRequestError", +] + + +class DrsInvalidScheme(Exception): + pass + + +class DrsRecordNotFound(Exception): + pass + + +class DrsRequestError(Exception): + pass diff --git a/bento_lib/drs/resolver.py b/bento_lib/drs/resolver.py new file mode 100644 index 0000000..1a38c80 --- /dev/null +++ b/bento_lib/drs/resolver.py @@ -0,0 +1,62 @@ +from datetime import datetime + +from .utils import decode_drs_uri, fetch_drs_record_by_uri, fetch_drs_record_by_uri_async + +__all__ = [ + "DrsResolver", +] + + +class DrsResolver: + def __init__(self, cache_ttl: float = 900.0): + """ + Constructor for a DrsResolver object, which is used to fetch and cache DRS records by their drs://-schemed URI. + :param cache_ttl: How long (in seconds) a cache entry is valid for. + """ + self._cache_ttl: float = cache_ttl + self._drs_record_cache: dict[str, tuple[float, dict]] = {} + + @staticmethod + def decode_drs_uri(drs_uri: str) -> str: + """ + This is just a wrapper for bento_lib.drs.utils.decode_drs_uri. + :param drs_uri: The drs://-schemed URI for the DRS object. + :return: The HTTP URI for the DRS object specified. + """ + return decode_drs_uri(drs_uri) + + def fetch_drs_record_by_uri(self, drs_uri: str) -> dict: + """ + Fetches and, for a time, caches a DRS record using its drs://-schemed URI. + Cache is shared between this function and its asynchronous version. + :param drs_uri: A resolvable drs://-schemed URI for a DRS object record. + :return: The fetched record dictionary. + """ + + now = datetime.now().timestamp() + + cache_record = self._drs_record_cache.get(drs_uri) + if cache_record is not None and now - cache_record[0] <= self._cache_ttl: + return cache_record[1] + + res = fetch_drs_record_by_uri(drs_uri) + self._drs_record_cache[drs_uri] = (now, res) + return res + + async def fetch_drs_record_by_uri_async(self, drs_uri: str) -> dict: + """ + Asynchronously fetches and, for a time, caches a DRS record using its drs://-schemed URI. + Cache is shared between this function and its synchronous version. + :param drs_uri: A resolvable drs://-schemed URI for a DRS object record. + :return: The fetched record dictionary. + """ + + now = datetime.now().timestamp() + + cache_record = self._drs_record_cache.get(drs_uri) + if cache_record is not None and now - cache_record[0] <= self._cache_ttl: + return cache_record[1] + + res = await fetch_drs_record_by_uri_async(drs_uri) + self._drs_record_cache[drs_uri] = (now, res) + return res diff --git a/bento_lib/drs/utils.py b/bento_lib/drs/utils.py index 1b2a93a..67ef135 100644 --- a/bento_lib/drs/utils.py +++ b/bento_lib/drs/utils.py @@ -1,29 +1,18 @@ import aiohttp import requests -import sys from urllib.parse import urlparse -from typing import Optional +from .exceptions import DrsInvalidScheme, DrsRecordNotFound, DrsRequestError __all__ = [ - "DrsInvalidScheme", - "DrsRequestError", - "get_access_method_of_type", "decode_drs_uri", - "fetch_drs_record_by_uri" + "fetch_drs_record_by_uri", + "fetch_drs_record_by_uri_async", ] -class DrsInvalidScheme(Exception): - pass - - -class DrsRequestError(Exception): - pass - - def get_access_method_of_type(drs_object_record: dict, access_type: str, default=None): """ Gets an access method of specified type from a DRS object, if one with the type exists. @@ -35,80 +24,58 @@ def get_access_method_of_type(drs_object_record: dict, access_type: str, default return next((a for a in drs_object_record.get("access_methods", []) if a.get("type", None) == access_type), default) -def decode_drs_uri(drs_uri: str, internal_drs_base_url: Optional[str] = None) -> str: +def decode_drs_uri(drs_uri: str) -> str: """ Given a DRS URI and possibly an override for the DRS service URL, returns the decoded HTTP URL for the DRS object. :param drs_uri: The drs://-schemed URI for the DRS object. - :param internal_drs_base_url: An optional override hard-coded DRS base URL to use, for container networking etc. :return: The HTTP URI for the DRS object specified. """ parsed_drs_uri = urlparse(drs_uri) if parsed_drs_uri.scheme != "drs": - print(f"[Bento Lib] Invalid scheme: '{parsed_drs_uri.scheme}'", file=sys.stderr, flush=True) raise DrsInvalidScheme(f"Encountered invalid DRS scheme: {parsed_drs_uri.scheme}") - drs_base_path = internal_drs_base_url.rstrip("/") if internal_drs_base_url else f"https://{parsed_drs_uri.netloc}" - return f"{drs_base_path}/ga4gh/drs/v1/objects/{parsed_drs_uri.path.split('/')[-1]}" + return f"https://{parsed_drs_uri.netloc}/ga4gh/drs/v1/objects/{parsed_drs_uri.path.split('/')[-1]}" -def fetch_drs_record_by_uri(drs_uri: str, internal_drs_base_url: Optional[str] = None) -> Optional[dict]: +def fetch_drs_record_by_uri(drs_uri: str) -> dict: """ Given a URI in the format drs:///, decodes it into an HTTP URL and fetches the object metadata. :param drs_uri: The URI of the object to fetch. - :param internal_drs_base_url: An optional override hard-coded DRS base URL to use, for container networking etc. :return: The fetched DRS object metadata. """ - # TODO: Translation dictionary for internal DRS hostnames, to avoid overriding EVERY DRS host. + decoded_object_uri = decode_drs_uri(drs_uri) - decoded_object_uri = decode_drs_uri(drs_uri, internal_drs_base_url) - print(f"[Bento Lib] Attempting to fetch {decoded_object_uri}", flush=True) - params = {"internal_path": "true"} if internal_drs_base_url else {} - drs_res = requests.get(decoded_object_uri, params=params) + drs_res = requests.get(decoded_object_uri) - if drs_res.status_code != 200: - print(f"[Bento Lib] Could not fetch: '{decoded_object_uri}'", file=sys.stderr, flush=True) - print(f"\tAttempted URL: {decoded_object_uri} (status: {drs_res.status_code})", file=sys.stderr, flush=True) + if drs_res.status_code == 404: + raise DrsRecordNotFound(f"Could not find DRS record at '{decoded_object_uri}'") + elif drs_res.status_code != 200: raise DrsRequestError(f"Could not fetch '{decoded_object_uri}' (status: {drs_res.status_code})") - # TODO: Handle JSON parse errors - # TODO: Schema for DRS response - return drs_res.json() -async def fetch_drs_record_by_uri_async( - drs_uri: str, - internal_drs_base_url: Optional[str] = None, - session_kwargs: dict | None = None, -) -> Optional[dict]: +async def fetch_drs_record_by_uri_async(drs_uri: str, session_kwargs: dict | None = None) -> dict: """ Given a URI in the format drs:///, decodes it into an HTTP URL and asynchronously fetches the object metadata. :param drs_uri: The URI of the object to fetch. - :param internal_drs_base_url: An optional override hard-coded DRS base URL to use, for container networking etc. :param session_kwargs: Optional dictionary of parameters to pass to the aiohttp.ClientSession constructor. :return: The fetched DRS object metadata. """ - # TODO: Translation dictionary for internal DRS hostnames, to avoid overriding EVERY DRS host. + decoded_object_uri = decode_drs_uri(drs_uri) - decoded_object_uri = decode_drs_uri(drs_uri, internal_drs_base_url) - print(f"[Bento Lib] Attempting to fetch {decoded_object_uri}", flush=True) - - params = {"internal_path": "true"} if internal_drs_base_url else {} async with aiohttp.ClientSession(**(session_kwargs or {})) as session: - async with session.get(decoded_object_uri, params=params) as drs_res: - if drs_res.status != 200: - print(f"[Bento Lib] Could not fetch: '{decoded_object_uri}'", file=sys.stderr, flush=True) - print(f"\tAttempted URL: {decoded_object_uri} (status: {drs_res.status})", file=sys.stderr, flush=True) + async with session.get(decoded_object_uri) as drs_res: + if drs_res.status == 404: + raise DrsRecordNotFound(f"Could not find DRS record at '{decoded_object_uri}'") + elif drs_res.status != 200: raise DrsRequestError(f"Could not fetch '{decoded_object_uri}' (status: {drs_res.status})") - # TODO: Handle JSON parse errors - # TODO: Schema for DRS response - return await drs_res.json() diff --git a/bento_lib/responses/errors.py b/bento_lib/responses/errors.py index 5c19821..6712299 100644 --- a/bento_lib/responses/errors.py +++ b/bento_lib/responses/errors.py @@ -1,8 +1,10 @@ +import logging from datetime import datetime, timezone from functools import partial +from typing import Callable from werkzeug.http import HTTP_STATUS_CODES -from typing import Callable +from bento_lib._internal import internal_logger __all__ = [ @@ -32,6 +34,7 @@ def http_error( drs_compat: bool = False, sr_compat: bool = False, beacon_meta_callback: Callable[[], dict] | None = None, + logger: logging.Logger | None = None, ): """ Builds a dictionary for an HTTP error JSON response. @@ -41,16 +44,19 @@ def http_error( :param sr_compat: Whether to generate a GA4GH Service Registry backwards-compatible response. :param beacon_meta_callback: Callback for generating GA4GH Beacon V2 backwards-compatible meta field for error response. If this is specified, Beacon V2-compatible errors will be enabled. + :param logger: A logger object to use for internal function error logging. :return: A dictionary to encode in JSON for the error response. """ + logger = logger or internal_logger + if code not in HTTP_STATUS_CODES: - print(f"[Bento Lib] Error: Could not find code {code} in valid HTTP status codes.") + logger.error(f"Could not find code {code} in valid HTTP status codes.") code = 500 errors = (*errors, f"An invalid status code of {code} was specified by the service.") if code < 400: - print(f"[Bento Lib] Error: Code {code} is not an HTTP error code.") + logger.error(f"Code {code} is not an HTTP error code.") code = 500 errors = (*errors, f"A non-error status code of {code} was specified by the service.") diff --git a/bento_lib/responses/flask_errors.py b/bento_lib/responses/flask_errors.py index 00c4a00..a9e9a95 100644 --- a/bento_lib/responses/flask_errors.py +++ b/bento_lib/responses/flask_errors.py @@ -1,11 +1,11 @@ import logging -import sys import traceback from flask import jsonify, request from functools import partial from typing import Callable +from .._internal import internal_logger from ..auth.types import MarkAuthzDoneType from ..responses import errors @@ -34,24 +34,16 @@ def flask_error_wrap_with_traceback(fn: Callable, *args, **kwargs) -> Callable: """ Function to wrap flask_* error creators with something that supports the application.register_error_handler method, - while also printing a traceback. Optionally, the keyword argument service_name can be passed in to make the error - logging more precise. + while also printing a traceback. :param fn: The flask error-generating function to wrap :return: The wrapped function """ - service_name = kwargs.pop("service_name", "Bento Service") - - logger: logging.Logger | None = kwargs.pop("logger", None) + logger: logging.Logger = kwargs.pop("logger", internal_logger) authz: MarkAuthzDoneType | None = kwargs.pop("authz", None) def handle_error(e): - if logger: - logger.error(f"Encountered error:\n{traceback.format_exception(type(e), e, e.__traceback__)}") - else: - print(f"[{service_name}] Encountered error:", file=sys.stderr) - # TODO: py3.10: print_exception(e) - traceback.print_exception(type(e), e, e.__traceback__) + logger.error(f"Encountered error:\n{traceback.format_exception(type(e), e, e.__traceback__)}") if authz: authz.mark_authz_done(request) return fn(str(e), *args, **kwargs) diff --git a/tests/test_drs.py b/tests/test_drs.py index 14b15c0..fec08fa 100644 --- a/tests/test_drs.py +++ b/tests/test_drs.py @@ -1,10 +1,11 @@ import json import pytest import responses +import time from aioresponses import aioresponses from bento_lib.responses import errors -from bento_lib.drs import utils as drs_utils +from bento_lib.drs import exceptions as drs_exceptions, resolver as drs_resolver, utils as drs_utils TEST_DRS_ID = "dd11912c-3433-4a0a-8a01-3c0699288bef" @@ -40,38 +41,32 @@ def test_get_access_method(): def test_drs_uri_decode(): assert drs_utils.decode_drs_uri("drs://example.org/abc") == "https://example.org/ga4gh/drs/v1/objects/abc" - assert drs_utils.decode_drs_uri("drs://example.org/abc", internal_drs_base_url="http://localhost/sub/") == \ - "http://localhost/sub/ga4gh/drs/v1/objects/abc" - with pytest.raises(drs_utils.DrsInvalidScheme): + with pytest.raises(drs_exceptions.DrsInvalidScheme): drs_utils.decode_drs_uri("http://example.org/abc") - # TODO: Really, Bento should pass ga4gh/drs URLs to DRS directly instead of under a sub-path - @responses.activate -def test_drs_uri_fetch(): +def test_drs_uri_fetch_sync(): responses.add(responses.GET, f"https://example.org/ga4gh/drs/v1/objects/{TEST_DRS_ID}", json=TEST_DRS_REPLY, status=200) - responses.add(responses.GET, f"http://localhost/ga4gh/drs/v1/objects/{TEST_DRS_ID}", - json=TEST_DRS_REPLY, status=200) - responses.add(responses.GET, "https://example.org/ga4gh/drs/v1/objects/abc", - json=errors.not_found_error(drs_compat=True), status=404) - responses.add(responses.GET, "http://localhost/ga4gh/drs/v1/objects/abc", - json=errors.not_found_error(drs_compat=True), status=404) assert json.dumps(drs_utils.fetch_drs_record_by_uri(f"drs://example.org/{TEST_DRS_ID}"), sort_keys=True) == \ json.dumps(TEST_DRS_REPLY, sort_keys=True) - assert json.dumps(drs_utils.fetch_drs_record_by_uri( - f"drs://example.org/{TEST_DRS_ID}", - internal_drs_base_url="http://localhost/" - ), sort_keys=True) == json.dumps(TEST_DRS_REPLY, sort_keys=True) - with pytest.raises(drs_utils.DrsRequestError): + +@responses.activate +def test_drs_uri_fetch_sync_errors(): + responses.add(responses.GET, "https://example.org/ga4gh/drs/v1/objects/abc", + json=errors.not_found_error(drs_compat=True), status=404) + responses.add(responses.GET, "https://example.org/ga4gh/drs/v1/objects/xyz", + json=errors.not_found_error(drs_compat=True), status=400) + + with pytest.raises(drs_exceptions.DrsRecordNotFound): drs_utils.fetch_drs_record_by_uri("drs://example.org/abc") - with pytest.raises(drs_utils.DrsRequestError): - drs_utils.fetch_drs_record_by_uri("drs://example.org/abc", internal_drs_base_url="http://localhost/") + with pytest.raises(drs_exceptions.DrsRequestError): + drs_utils.fetch_drs_record_by_uri("drs://example.org/xyz") @pytest.fixture @@ -83,28 +78,69 @@ def mocked(): @pytest.mark.asyncio async def test_drs_uri_fetch_async(mocked): mocked.get(f"https://example.org/ga4gh/drs/v1/objects/{TEST_DRS_ID}", payload=TEST_DRS_REPLY, status=200) - mocked.get(f"http://localhost/ga4gh/drs/v1/objects/{TEST_DRS_ID}", payload=TEST_DRS_REPLY, status=200) - mocked.get( - f"http://localhost/ga4gh/drs/v1/objects/{TEST_DRS_ID}?internal_path=true", payload=TEST_DRS_REPLY, status=200) - mocked.get("https://example.org/ga4gh/drs/v1/objects/abc", - payload=errors.not_found_error(drs_compat=True), status=404) - mocked.get("http://localhost/ga4gh/drs/v1/objects/abc", - payload=errors.not_found_error(drs_compat=True), status=404) - mocked.get("http://localhost/ga4gh/drs/v1/objects/abc?internal_path=true", - payload=errors.not_found_error(drs_compat=True), status=404) assert json.dumps( await drs_utils.fetch_drs_record_by_uri_async(f"drs://example.org/{TEST_DRS_ID}"), sort_keys=True ) == json.dumps(TEST_DRS_REPLY, sort_keys=True) - assert json.dumps(await drs_utils.fetch_drs_record_by_uri_async( - f"drs://example.org/{TEST_DRS_ID}", - internal_drs_base_url="http://localhost/" - ), sort_keys=True) == json.dumps(TEST_DRS_REPLY, sort_keys=True) - with pytest.raises(drs_utils.DrsRequestError): + +@pytest.mark.asyncio +async def test_drs_uri_fetch_async_errors(mocked): + mocked.get("https://example.org/ga4gh/drs/v1/objects/abc", + payload=errors.not_found_error(drs_compat=True), status=404) + mocked.get("https://example.org/ga4gh/drs/v1/objects/xyz", + payload=errors.not_found_error(drs_compat=True), status=400) + + with pytest.raises(drs_exceptions.DrsRecordNotFound): await drs_utils.fetch_drs_record_by_uri_async("drs://example.org/abc") - with pytest.raises(drs_utils.DrsRequestError): - await drs_utils.fetch_drs_record_by_uri_async( - "drs://example.org/abc", internal_drs_base_url="http://localhost/") + with pytest.raises(drs_exceptions.DrsRequestError): + await drs_utils.fetch_drs_record_by_uri_async("drs://example.org/xyz") + + +def test_drs_resolver_class_basic(): + r = drs_resolver.DrsResolver() + + assert r.decode_drs_uri("drs://example.org/abc") == "https://example.org/ga4gh/drs/v1/objects/abc" + + with pytest.raises(drs_exceptions.DrsInvalidScheme): + r.decode_drs_uri("http://example.org/abc") + + +@responses.activate +def test_drs_resolver_class_sync(): + responses.add(responses.GET, f"https://example.org/ga4gh/drs/v1/objects/{TEST_DRS_ID}", + json=TEST_DRS_REPLY, status=200) + + r = drs_resolver.DrsResolver(cache_ttl=1.0) + + uri = f"drs://example.org/{TEST_DRS_ID}" + rec1 = r.fetch_drs_record_by_uri(uri) + assert uri in r._drs_record_cache + rec2 = r.fetch_drs_record_by_uri(uri) + assert rec1 == rec2 + + time.sleep(1.1) + + responses.add(responses.GET, f"https://example.org/ga4gh/drs/v1/objects/{TEST_DRS_ID}", + json=TEST_DRS_REPLY, status=200) + + r.fetch_drs_record_by_uri(uri) # should refetch + + +@pytest.mark.asyncio +async def test_drs_resolver_class_async(mocked): + r = drs_resolver.DrsResolver() + + mocked.get(f"https://example.org/ga4gh/drs/v1/objects/{TEST_DRS_ID}", payload=TEST_DRS_REPLY, status=200) + + uri = f"drs://example.org/{TEST_DRS_ID}" + rec1 = await r.fetch_drs_record_by_uri_async(uri) + assert uri in r._drs_record_cache + rec2 = await r.fetch_drs_record_by_uri_async(uri) + assert rec1 == rec2 + + time.sleep(1.1) + + await r.fetch_drs_record_by_uri_async(uri) # should refetch