starting with black and isort
sametd committed Aug 19, 2024
1 parent 95ea047 commit 69f503e
Showing 20 changed files with 125 additions and 124 deletions.
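The commit is a pure formatting pass. A minimal sketch of commands that would reproduce this kind of reformat; the 120-character line length and isort's black-compatible profile are inferred from the wrapped lines below, not stated in the commit:

# Hypothetical reproduction of the formatting pass; flags are assumptions.
import subprocess

subprocess.run(["isort", "--profile", "black", "--line-length", "120", "polytope_server/"], check=True)
subprocess.run(["black", "--line-length", "120", "polytope_server/"], check=True)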
17 changes: 12 additions & 5 deletions polytope_server/broker/broker.py
@@ -95,7 +95,9 @@ def check_limits(self, active_requests, request):

         # Check collection total limit
         if collection_total_limit is not None and collection_active_requests >= collection_total_limit:
-            logging.debug(f"Collection has {collection_active_requests} of {collection_total_limit} total active requests")
+            logging.debug(
+                f"Collection has {collection_active_requests} of {collection_total_limit} total active requests"
+            )
             return False
 
         # Determine the effective limit based on role or per-user setting
@@ -106,19 +108,24 @@ def check_limits(self, active_requests, request):

         # Check if user exceeds the effective limit
         if limit > 0:
-            user_active_requests = sum(qr.collection == request.collection and qr.user == request.user for qr in active_requests)
+            user_active_requests = sum(
+                qr.collection == request.collection and qr.user == request.user for qr in active_requests
+            )
             if user_active_requests >= limit:
-                logging.debug(f"User {request.user} has {user_active_requests} of {limit} active requests in collection {request.collection}")
+                logging.debug(
+                    f"User {request.user} has {user_active_requests} of {limit} active requests in collection {request.collection}"
+                )
                 return False
             else:
-                logging.debug(f"User {request.user} has {user_active_requests} of {limit} active requests in collection {request.collection}")
+                logging.debug(
+                    f"User {request.user} has {user_active_requests} of {limit} active requests in collection {request.collection}"
+                )
                 return True
 
         # Allow if no limits are exceeded
         logging.debug(f"No limit for user {request.user} in collection {request.collection}")
         return True
 
-
     def enqueue(self, request):
 
         logging.info("Queuing request", extra={"request_id": request.id})
4 changes: 2 additions & 2 deletions polytope_server/common/authentication/authentication.py
@@ -77,8 +77,8 @@ def name(self) -> str:
"plain": "PlainAuthentication",
"keycloak": "KeycloakAuthentication",
"federation": "FederationAuthentication",
"jwt" : "JWTAuthentication",
"openid_offline_access" : "OpenIDOfflineAuthentication",
"jwt": "JWTAuthentication",
"openid_offline_access": "OpenIDOfflineAuthentication",
}
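The table above maps configuration type names to implementation class names. A sketch of how such a table is commonly resolved at runtime, mirroring the import_module pattern visible in datasource.py further down; the module naming scheme here is an assumption for illustration, not taken from this diff:

# Hypothetical factory; the module path convention is assumed.
from importlib import import_module

type_to_class = {
    "jwt": "JWTAuthentication",
    "openid_offline_access": "OpenIDOfflineAuthentication",
}

def create_authentication(auth_type: str, realm: str, config: dict):
    module = import_module(f"polytope_server.common.authentication.{auth_type}_authentication")
    return getattr(module, type_to_class[auth_type])(auth_type, realm, config)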


@@ -60,7 +60,6 @@ def authenticate(self, credentials: str) -> User:

logging.info("Decoded JWT: {}".format(decoded_token))


user = User(decoded_token["sub"], self.realm())

roles = decoded_token.get("resource_access", {}).get(self.client_id, {}).get("roles", [])
@@ -20,13 +20,14 @@

 import logging
 import os
+
 import requests
 from jose import jwt
 
 from ..auth import User
 from ..caching import cache
-from . import authentication
 from ..exceptions import ForbiddenRequest
+from . import authentication
 
 
 class OpenIDOfflineAuthentication(authentication.Authentication):
@@ -40,7 +41,6 @@ def __init__(self, name, realm, config):
         self.iam_url = config["iam_url"]
         self.iam_realm = config["iam_realm"]
 
-
         super().__init__(name, realm, config)
 
     def authentication_type(self):
@@ -52,18 +52,18 @@ def authentication_info(self):
     @cache(lifetime=120)
     def get_certs(self):
         return requests.get(self.certs_url).json()
 
     @cache(lifetime=120)
     def check_offline_access_token(self, token: str) -> bool:
         """
         We check if the token is recognised by the IAM service, and we cache this result.
         We cannot simply try to get the access token because we would spam the IAM server with invalid tokens, and the
         failure at that point would not be cached.
         """
-        keycloak_token_introspection = self.iam_url + "/realms/" + self.iam_realm + "/protocol/openid-connect/token/introspect"
-        introspection_data = {
-            "token": token
-        }
+        keycloak_token_introspection = (
+            self.iam_url + "/realms/" + self.iam_realm + "/protocol/openid-connect/token/introspect"
+        )
+        introspection_data = {"token": token}
         b_auth = requests.auth.HTTPBasicAuth(self.private_client_id, self.private_client_secret)
         resp = requests.post(url=keycloak_token_introspection, data=introspection_data, auth=b_auth).json()
         if resp["active"] and resp["token_type"] == "Offline":
@@ -79,22 +79,19 @@ def authenticate(self, credentials: str) -> User:
         # Check if this is a valid offline_access token
         if not self.check_offline_access_token(credentials):
             raise ForbiddenRequest("Not a valid offline_access token")
 
         # Generate an access token from the offline_access token (like a refresh token)
         refresh_data = {
             "client_id": self.public_client_id,
             "grant_type": "refresh_token",
-            "refresh_token": credentials
+            "refresh_token": credentials,
         }
         keycloak_token_endpoint = self.iam_url + "/realms/" + self.iam_realm + "/protocol/openid-connect/token"
         resp = requests.post(url=keycloak_token_endpoint, data=refresh_data)
-        token = resp.json()['access_token']
+        token = resp.json()["access_token"]
 
         certs = self.get_certs()
-        decoded_token = jwt.decode(token=token,
-                                   algorithms=jwt.get_unverified_header(token).get('alg'),
-                                   key=certs
-                                   )
+        decoded_token = jwt.decode(token=token, algorithms=jwt.get_unverified_header(token).get("alg"), key=certs)
 
         logging.info("Decoded JWT: {}".format(decoded_token))
 
@@ -113,6 +110,5 @@ def authenticate(self, credentials: str) -> User:
raise ForbiddenRequest("Could not authenticate user from openid offline_access token")
return user


def collect_metric_info(self):
return {}
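Taken together, authenticate() performs three steps: introspect the offline token, exchange it for an access token, and verify the resulting JWT against the realm certificates. A condensed sketch of the same flow; the IAM URL, realm, and client credentials below are placeholders:

# Hedged sketch of the offline_access flow implemented above.
import requests
from jose import jwt

IAM_URL, IAM_REALM = "https://iam.example.int", "example"  # assumed values
offline_token = "..."  # an offline_access (refresh-style) token
base = f"{IAM_URL}/realms/{IAM_REALM}/protocol/openid-connect"

# 1. Introspect: is the token known to the IAM service and of type Offline?
b_auth = requests.auth.HTTPBasicAuth("private-client", "secret")  # assumed client
info = requests.post(f"{base}/token/introspect", data={"token": offline_token}, auth=b_auth).json()
assert info["active"] and info["token_type"] == "Offline"

# 2. Exchange it for an access token, as with a refresh token.
refresh_data = {"client_id": "public-client", "grant_type": "refresh_token", "refresh_token": offline_token}
access_token = requests.post(f"{base}/token", data=refresh_data).json()["access_token"]

# 3. Verify the JWT signature against the realm certs.
certs = requests.get(f"{base}/certs").json()
claims = jwt.decode(token=access_token, algorithms=jwt.get_unverified_header(access_token).get("alg"), key=certs)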
6 changes: 5 additions & 1 deletion polytope_server/common/caching/caching.py
@@ -201,7 +201,11 @@ def __init__(self, cache_config):
         password = cache_config.get("password")
 
         collection = cache_config.get("collection", "cache")
-        self.client = mongo_client_factory.create_client(uri, username, password,)
+        self.client = mongo_client_factory.create_client(
+            uri,
+            username,
+            password,
+        )
 
         self.database = self.client.cache
         self.collection = self.database[collection]
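A usage sketch for the reformatted factory call; the URI and credentials are placeholders, and the import path is assumed from the bare name used in caching.py:

# Illustrative only; values and import path are assumptions.
from polytope_server.common import mongo_client_factory

client = mongo_client_factory.create_client(
    "mongodb://localhost:27017",
    "username",
    "password",
)
cache_collection = client.cache["cache"]  # database "cache", default collection "cache"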
8 changes: 5 additions & 3 deletions polytope_server/common/datasource/datasource.py
@@ -19,8 +19,8 @@
 #
 
 import logging
-from abc import ABC
 import traceback
+from abc import ABC
 from importlib import import_module
 from typing import Iterator
 
@@ -84,7 +84,9 @@ def dispatch(self, request, input_data) -> bool:
             if hasattr(self, "silent_match") and self.silent_match:
                 pass
             else:
-                request.user_message += "Skipping datasource {} due to match error: {}\n".format(self.get_type(), repr(e))
+                request.user_message += "Skipping datasource {} due to match error: {}\n".format(
+                    self.get_type(), repr(e)
+                )
                 tb = traceback.format_exception(None, e, e.__traceback__)
                 logging.info(tb)
 
@@ -128,7 +130,7 @@ def dispatch(self, request, input_data) -> bool:
"echo": "EchoDataSource",
"dummy": "DummyDataSource",
"raise": "RaiseDataSource",
"ionbeam": "IonBeamDataSource"
"ionbeam": "IonBeamDataSource",
}
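dispatch() above formats the traceback into strings and logs it instead of letting the exception propagate. A standalone sketch of that traceback.format_exception usage, which returns a list of lines suitable for logging:

# format_exception returns a list of strings rather than printing them.
import logging
import traceback

logging.basicConfig(level=logging.INFO)

try:
    raise ValueError("no matching datasource")
except ValueError as e:
    tb = traceback.format_exception(None, e, e.__traceback__)
    logging.info("".join(tb))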


4 changes: 2 additions & 2 deletions polytope_server/common/datasource/dummy.py
@@ -40,7 +40,7 @@ def retrieve(self, request):

         if self.size < 0:
             raise ValueError("Size must be non-negative")
-
+
         return True
 
     def result(self, request):
@@ -49,7 +49,7 @@ def result(self, request):
         while data_generated < self.size:
             remaining_size = self.size - data_generated
             current_chunk_size = min(chunk_size, remaining_size)
-            yield b'x' * current_chunk_size
+            yield b"x" * current_chunk_size
             data_generated += current_chunk_size
 
     def destroy(self, request) -> None:
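The loop in result() above yields the payload in fixed-size chunks until the requested size is reached. The same generator extracted as a standalone function, with a small chunk size for a quick check:

# Standalone version of the chunked generator in result().
def chunks(size: int, chunk_size: int = 10):
    data_generated = 0
    while data_generated < size:
        current_chunk_size = min(chunk_size, size - data_generated)
        yield b"x" * current_chunk_size
        data_generated += current_chunk_size

assert b"".join(chunks(25)) == b"x" * 25  # two full chunks plus one partial chunk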
8 changes: 4 additions & 4 deletions polytope_server/common/datasource/fdb.py
@@ -50,6 +50,7 @@ def __init__(self, config):
os.environ["FDB5_HOME"] = self.config.get("fdb_home", "/opt/fdb")
os.environ["FDB_HOME"] = self.config.get("fdb_home", "/opt/fdb")
import pyfdb

self.fdb = pyfdb.FDB()

if "spaces" in self.fdb_config:
@@ -145,18 +146,17 @@ def match(self, request):
         r = yaml.safe_load(request.user_request) or {}
 
         for k, v in self.match_rules.items():
 
             # An empty match rule means that the key must not be present
             if v is None or len(v) == 0:
                 if k in r:
                     raise Exception("Request containing key '{}' is not allowed".format(k))
                 else:
-                    continue # no more checks to do
+                    continue  # no more checks to do
 
             # Check that all required keys exist
             if k not in r and not (v is None or len(v) == 0):
                 raise Exception("Request does not contain expected key '{}'".format(k))
 
-
             # Process date rules
             if k == "date":
61 changes: 31 additions & 30 deletions polytope_server/common/datasource/ionbeam.py
@@ -17,47 +17,47 @@
 # granted to it by virtue of its status as an intergovernmental organisation nor
 # does it submit to any jurisdiction.
 #
-import yaml
 import logging
-import requests
+from dataclasses import dataclass
 from urllib.parse import urljoin
 
-from . import datasource
+import requests
+import yaml
 from requests import Request
-from dataclasses import dataclass
+
+from . import datasource
 
 
 @dataclass
 class IonBeamAPI:
-    endpoint : str
+    endpoint: str
 
     def __post_init__(self):
         assert not self.endpoint.endswith("/")
         self.session = requests.Session()
 
-    def get(self, path : str, **kwargs) -> requests.Response:
+    def get(self, path: str, **kwargs) -> requests.Response:
         return self.session.get(f"{self.endpoint}/{path}", stream=True, **kwargs)
 
-    def get_bytes(self, path : str, **kwargs) -> requests.Response:
-        kwargs["headers"] = kwargs.get("headers", {}) | {
-            'Accept': 'application/octet-stream'
-        }
-
+    def get_bytes(self, path: str, **kwargs) -> requests.Response:
+        kwargs["headers"] = kwargs.get("headers", {}) | {"Accept": "application/octet-stream"}
         return self.get(path, **kwargs)
 
     def get_json(self, path, **kwargs):
         return self.get(path, **kwargs).json()
-    def list(self, request : dict[str, str] = {}):
-        return self.get_json("list", params = request)
-    def head(self, request : dict[str, str] = {}):
-        return self.get_json("head", params = request)
-    def retrieve(self, request : dict[str, str]) -> requests.Response:
-        return self.get_bytes("retrieve", params = request)
+
+    def list(self, request: dict[str, str] = {}):
+        return self.get_json("list", params=request)
+
+    def head(self, request: dict[str, str] = {}):
+        return self.get_json("head", params=request)
+
+    def retrieve(self, request: dict[str, str]) -> requests.Response:
+        return self.get_bytes("retrieve", params=request)
 
     def archive(self, request, file) -> requests.Response:
-        files = {'file': file}
-        return self.session.post(f"{self.endpoint}/archive", files=files, params = request)
+        files = {"file": file}
+        return self.session.post(f"{self.endpoint}/archive", files=files, params=request)
 
 
 class IonBeamDataSource(datasource.DataSource):
@@ -66,6 +66,7 @@ class IonBeamDataSource(datasource.DataSource):
"""

read_chunk_size = 2 * 1024 * 1024

def __init__(self, config):
@@ -84,32 +85,32 @@ def mime_type(self) -> str:
     def get_type(self):
         return self.type
 
-    def archive(self, request : Request):
+    def archive(self, request: Request):
         """Archive data, returns nothing but updates datasource state"""
         r = yaml.safe_load(request.user_request)
         keys = r["keys"]
 
-        with open(r["path"], 'rb') as f:
+        with open(r["path"], "rb") as f:
             return self.api.archive(keys, f)
 
-    def list(self, request : Request) -> list:
+    def list(self, request: Request) -> list:
         request_keys = yaml.safe_load(request.user_request)
         return self.api.list(request_keys)
 
-    def retrieve(self, request : Request) -> bool:
+    def retrieve(self, request: Request) -> bool:
         """Retrieve data, returns nothing but updates datasource state"""
 
         request_keys = yaml.safe_load(request.user_request)
         self.response = self.api.retrieve(request_keys)
         return True
 
-    def result(self, request : Request):
+    def result(self, request: Request):
         """Returns a generator for the resultant data"""
-        return self.response.iter_content(chunk_size = self.read_chunk_size, decode_unicode=False)
+        return self.response.iter_content(chunk_size=self.read_chunk_size, decode_unicode=False)
 
     def destroy(self, request) -> None:
         """A hook to do essential freeing of resources, called upon success or failure"""
 
         # requests response objects with stream=True can remain open indefinitely if not read to completion
         # or closed explicitly
         if self.response:
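A usage sketch for the IonBeamAPI helper defined at the top of this file; the endpoint URL and request keys are placeholders:

# Illustrative only; endpoint and keys are made up.
api = IonBeamAPI("http://ionbeam.example:8080")
listing = api.list({"platform": "meteotracker"})
response = api.retrieve({"platform": "meteotracker"})
with open("out.bin", "wb") as f:
    for chunk in response.iter_content(chunk_size=IonBeamDataSource.read_chunk_size):
        f.write(chunk)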
25 changes: 12 additions & 13 deletions polytope_server/common/datasource/mars.py
@@ -20,9 +20,9 @@

 import logging
 import os
+import re
 import tempfile
 from datetime import datetime, timedelta
-import re
 
 import yaml
 from dateutil.relativedelta import relativedelta
@@ -262,20 +262,19 @@ def check_single_date(self, date, offset, offset_fmted, after=False):

     def parse_relativedelta(self, time_str):
 
-        pattern = r'(\d+)([dhm])'
-        time_dict = {'d': 0, 'h': 0, 'm': 0}
+        pattern = r"(\d+)([dhm])"
+        time_dict = {"d": 0, "h": 0, "m": 0}
         matches = re.findall(pattern, time_str)
 
-        for value, unit in matches:
-            if unit == 'd':
-                time_dict['d'] += int(value)
-            elif unit == 'h':
-                time_dict['h'] += int(value)
-            elif unit == 'm':
-                time_dict['m'] += int(value)
-
-        return relativedelta(days=time_dict['d'], hours=time_dict['h'], minutes=time_dict['m'])
-
+        for value, unit in matches:
+            if unit == "d":
+                time_dict["d"] += int(value)
+            elif unit == "h":
+                time_dict["h"] += int(value)
+            elif unit == "m":
+                time_dict["m"] += int(value)
+
+        return relativedelta(days=time_dict["d"], hours=time_dict["h"], minutes=time_dict["m"])
 
     def date_check(self, date, offsets, after=False):
         """Process special match rules for DATE constraints"""
