Skip to content

Commit

Permalink
Merge pull request #222 from bento-platform/feat/new-drs-utils
Browse files Browse the repository at this point in the history
feat(drs)!: new DrsResolver class + restructured module/exceptions
  • Loading branch information
davidlougheed authored Aug 19, 2024
2 parents 32914ca + bea9dd2 commit 240d923
Show file tree
Hide file tree
Showing 8 changed files with 189 additions and 107 deletions.
7 changes: 7 additions & 0 deletions bento_lib/_internal.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
import logging

# Only the shared logger is exported from this module.
__all__ = ["internal_logger"]

# NOTE(review): this call runs at import time. logging.basicConfig configures the *root* logger, so importing
# this module can change logging behaviour for the host application - confirm that this is intended for a library.
logging.basicConfig(level=logging.NOTSET)

# Logger named "bento_lib"; other modules in this changeset import it as their default logger when the caller
# does not supply one.
internal_logger = logging.getLogger("bento_lib")
5 changes: 0 additions & 5 deletions bento_lib/drs/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +0,0 @@
from . import utils

__all__ = [
"utils",
]
17 changes: 17 additions & 0 deletions bento_lib/drs/exceptions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
__all__ = [
"DrsInvalidScheme",
"DrsRecordNotFound",
"DrsRequestError",
]


class DrsInvalidScheme(Exception):
    """
    Raised when a URI which is expected to be a DRS URI does not use the drs:// scheme.
    """


class DrsRecordNotFound(Exception):
    """
    Raised when the DRS service responds with a 404 status for the requested object record.
    """


class DrsRequestError(Exception):
    """
    Raised when a request for a DRS object record fails with a status other than 200 (and other than 404,
    which is reported as DrsRecordNotFound).
    """
62 changes: 62 additions & 0 deletions bento_lib/drs/resolver.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
import time
from datetime import datetime

from .utils import decode_drs_uri, fetch_drs_record_by_uri, fetch_drs_record_by_uri_async

__all__ = [
"DrsResolver",
]


class DrsResolver:
def __init__(self, cache_ttl: float = 900.0):
"""
Constructor for a DrsResolver object, which is used to fetch and cache DRS records by their drs://-schemed URI.
:param cache_ttl: How long (in seconds) a cache entry is valid for.
"""
self._cache_ttl: float = cache_ttl
self._drs_record_cache: dict[str, tuple[float, dict]] = {}

@staticmethod
def decode_drs_uri(drs_uri: str) -> str:
"""
This is just a wrapper for bento_lib.drs.utils.decode_drs_uri.
:param drs_uri: The drs://-schemed URI for the DRS object.
:return: The HTTP URI for the DRS object specified.
"""
return decode_drs_uri(drs_uri)

def fetch_drs_record_by_uri(self, drs_uri: str) -> dict:
"""
Fetches and, for a time, caches a DRS record using its drs://-schemed URI.
Cache is shared between this function and its asynchronous version.
:param drs_uri: A resolvable drs://-schemed URI for a DRS object record.
:return: The fetched record dictionary.
"""

now = datetime.now().timestamp()

cache_record = self._drs_record_cache.get(drs_uri)
if cache_record is not None and now - cache_record[0] <= self._cache_ttl:
return cache_record[1]

res = fetch_drs_record_by_uri(drs_uri)
self._drs_record_cache[drs_uri] = (now, res)
return res

async def fetch_drs_record_by_uri_async(self, drs_uri: str) -> dict:
"""
Asynchronously fetches and, for a time, caches a DRS record using its drs://-schemed URI.
Cache is shared between this function and its synchronous version.
:param drs_uri: A resolvable drs://-schemed URI for a DRS object record.
:return: The fetched record dictionary.
"""

now = datetime.now().timestamp()

cache_record = self._drs_record_cache.get(drs_uri)
if cache_record is not None and now - cache_record[0] <= self._cache_ttl:
return cache_record[1]

res = await fetch_drs_record_by_uri_async(drs_uri)
self._drs_record_cache[drs_uri] = (now, res)
return res
67 changes: 17 additions & 50 deletions bento_lib/drs/utils.py
Original file line number Diff line number Diff line change
@@ -1,29 +1,18 @@
import aiohttp
import requests
import sys

from urllib.parse import urlparse

from typing import Optional
from .exceptions import DrsInvalidScheme, DrsRecordNotFound, DrsRequestError

__all__ = [
"DrsInvalidScheme",
"DrsRequestError",

"get_access_method_of_type",
"decode_drs_uri",
"fetch_drs_record_by_uri"
"fetch_drs_record_by_uri",
"fetch_drs_record_by_uri_async",
]


class DrsInvalidScheme(Exception):
pass


class DrsRequestError(Exception):
pass


def get_access_method_of_type(drs_object_record: dict, access_type: str, default=None):
"""
Gets an access method of specified type from a DRS object, if one with the type exists.
Expand All @@ -35,80 +24,58 @@ def get_access_method_of_type(drs_object_record: dict, access_type: str, default
return next((a for a in drs_object_record.get("access_methods", []) if a.get("type", None) == access_type), default)


def decode_drs_uri(drs_uri: str, internal_drs_base_url: Optional[str] = None) -> str:
def decode_drs_uri(drs_uri: str) -> str:
"""
Given a DRS URI and possibly an override for the DRS service URL, returns
the decoded HTTP URL for the DRS object.
:param drs_uri: The drs://-schemed URI for the DRS object.
:param internal_drs_base_url: An optional override hard-coded DRS base URL to use, for container networking etc.
:return: The HTTP URI for the DRS object specified.
"""

parsed_drs_uri = urlparse(drs_uri)

if parsed_drs_uri.scheme != "drs":
print(f"[Bento Lib] Invalid scheme: '{parsed_drs_uri.scheme}'", file=sys.stderr, flush=True)
raise DrsInvalidScheme(f"Encountered invalid DRS scheme: {parsed_drs_uri.scheme}")

drs_base_path = internal_drs_base_url.rstrip("/") if internal_drs_base_url else f"https://{parsed_drs_uri.netloc}"
return f"{drs_base_path}/ga4gh/drs/v1/objects/{parsed_drs_uri.path.split('/')[-1]}"
return f"https://{parsed_drs_uri.netloc}/ga4gh/drs/v1/objects/{parsed_drs_uri.path.split('/')[-1]}"


def fetch_drs_record_by_uri(drs_uri: str, internal_drs_base_url: Optional[str] = None) -> Optional[dict]:
def fetch_drs_record_by_uri(drs_uri: str) -> dict:
"""
Given a URI in the format drs://<hostname>/<object-id>, decodes it into an
HTTP URL and fetches the object metadata.
:param drs_uri: The URI of the object to fetch.
:param internal_drs_base_url: An optional override hard-coded DRS base URL to use, for container networking etc.
:return: The fetched DRS object metadata.
"""

# TODO: Translation dictionary for internal DRS hostnames, to avoid overriding EVERY DRS host.
decoded_object_uri = decode_drs_uri(drs_uri)

decoded_object_uri = decode_drs_uri(drs_uri, internal_drs_base_url)
print(f"[Bento Lib] Attempting to fetch {decoded_object_uri}", flush=True)
params = {"internal_path": "true"} if internal_drs_base_url else {}
drs_res = requests.get(decoded_object_uri, params=params)
drs_res = requests.get(decoded_object_uri)

if drs_res.status_code != 200:
print(f"[Bento Lib] Could not fetch: '{decoded_object_uri}'", file=sys.stderr, flush=True)
print(f"\tAttempted URL: {decoded_object_uri} (status: {drs_res.status_code})", file=sys.stderr, flush=True)
if drs_res.status_code == 404:
raise DrsRecordNotFound(f"Could not find DRS record at '{decoded_object_uri}'")
elif drs_res.status_code != 200:
raise DrsRequestError(f"Could not fetch '{decoded_object_uri}' (status: {drs_res.status_code})")

# TODO: Handle JSON parse errors
# TODO: Schema for DRS response

return drs_res.json()


async def fetch_drs_record_by_uri_async(
drs_uri: str,
internal_drs_base_url: Optional[str] = None,
session_kwargs: dict | None = None,
) -> Optional[dict]:
async def fetch_drs_record_by_uri_async(drs_uri: str, session_kwargs: dict | None = None) -> dict:
"""
Given a URI in the format drs://<hostname>/<object-id>, decodes it into an
HTTP URL and asynchronously fetches the object metadata.
:param drs_uri: The URI of the object to fetch.
:param internal_drs_base_url: An optional override hard-coded DRS base URL to use, for container networking etc.
:param session_kwargs: Optional dictionary of parameters to pass to the aiohttp.ClientSession constructor.
:return: The fetched DRS object metadata.
"""

# TODO: Translation dictionary for internal DRS hostnames, to avoid overriding EVERY DRS host.
decoded_object_uri = decode_drs_uri(drs_uri)

decoded_object_uri = decode_drs_uri(drs_uri, internal_drs_base_url)
print(f"[Bento Lib] Attempting to fetch {decoded_object_uri}", flush=True)

params = {"internal_path": "true"} if internal_drs_base_url else {}
async with aiohttp.ClientSession(**(session_kwargs or {})) as session:
async with session.get(decoded_object_uri, params=params) as drs_res:
if drs_res.status != 200:
print(f"[Bento Lib] Could not fetch: '{decoded_object_uri}'", file=sys.stderr, flush=True)
print(f"\tAttempted URL: {decoded_object_uri} (status: {drs_res.status})", file=sys.stderr, flush=True)
async with session.get(decoded_object_uri) as drs_res:
if drs_res.status == 404:
raise DrsRecordNotFound(f"Could not find DRS record at '{decoded_object_uri}'")
elif drs_res.status != 200:
raise DrsRequestError(f"Could not fetch '{decoded_object_uri}' (status: {drs_res.status})")

# TODO: Handle JSON parse errors
# TODO: Schema for DRS response

return await drs_res.json()
12 changes: 9 additions & 3 deletions bento_lib/responses/errors.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
import logging
from datetime import datetime, timezone
from functools import partial
from typing import Callable
from werkzeug.http import HTTP_STATUS_CODES

from typing import Callable
from bento_lib._internal import internal_logger


__all__ = [
Expand Down Expand Up @@ -32,6 +34,7 @@ def http_error(
drs_compat: bool = False,
sr_compat: bool = False,
beacon_meta_callback: Callable[[], dict] | None = None,
logger: logging.Logger | None = None,
):
"""
Builds a dictionary for an HTTP error JSON response.
Expand All @@ -41,16 +44,19 @@ def http_error(
:param sr_compat: Whether to generate a GA4GH Service Registry backwards-compatible response.
:param beacon_meta_callback: Callback for generating GA4GH Beacon V2 backwards-compatible meta field for
error response. If this is specified, Beacon V2-compatible errors will be enabled.
:param logger: A logger object to use for internal function error logging.
:return: A dictionary to encode in JSON for the error response.
"""

logger = logger or internal_logger

if code not in HTTP_STATUS_CODES:
print(f"[Bento Lib] Error: Could not find code {code} in valid HTTP status codes.")
logger.error(f"Could not find code {code} in valid HTTP status codes.")
code = 500
errors = (*errors, f"An invalid status code of {code} was specified by the service.")

if code < 400:
print(f"[Bento Lib] Error: Code {code} is not an HTTP error code.")
logger.error(f"Code {code} is not an HTTP error code.")
code = 500
errors = (*errors, f"A non-error status code of {code} was specified by the service.")

Expand Down
16 changes: 4 additions & 12 deletions bento_lib/responses/flask_errors.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
import logging
import sys
import traceback

from flask import jsonify, request
from functools import partial
from typing import Callable

from .._internal import internal_logger
from ..auth.types import MarkAuthzDoneType
from ..responses import errors

Expand Down Expand Up @@ -34,24 +34,16 @@
def flask_error_wrap_with_traceback(fn: Callable, *args, **kwargs) -> Callable:
"""
Function to wrap flask_* error creators with something that supports the application.register_error_handler method,
while also printing a traceback. Optionally, the keyword argument service_name can be passed in to make the error
logging more precise.
while also printing a traceback.
:param fn: The flask error-generating function to wrap
:return: The wrapped function
"""

service_name = kwargs.pop("service_name", "Bento Service")

logger: logging.Logger | None = kwargs.pop("logger", None)
logger: logging.Logger = kwargs.pop("logger", internal_logger)
authz: MarkAuthzDoneType | None = kwargs.pop("authz", None)

def handle_error(e):
if logger:
logger.error(f"Encountered error:\n{traceback.format_exception(type(e), e, e.__traceback__)}")
else:
print(f"[{service_name}] Encountered error:", file=sys.stderr)
# TODO: py3.10: print_exception(e)
traceback.print_exception(type(e), e, e.__traceback__)
logger.error(f"Encountered error:\n{traceback.format_exception(type(e), e, e.__traceback__)}")
if authz:
authz.mark_authz_done(request)
return fn(str(e), *args, **kwargs)
Expand Down
Loading

0 comments on commit 240d923

Please sign in to comment.