Skip to content

Commit

Permalink
Revert vendoring conda-store schema
Browse files Browse the repository at this point in the history
  • Loading branch information
marcelovilla committed Nov 12, 2024
1 parent 03a1ab4 commit 05d80a2
Showing 1 changed file with 18 additions and 37 deletions.
Original file line number Diff line number Diff line change
@@ -1,44 +1,20 @@
import dataclasses
import datetime
import functools
import json
import logging
import re
import tempfile
import typing
import urllib
import urllib.parse
import urllib.request
from pathlib import Path
from typing import Dict, List, Optional, TypeAlias

import requests
from conda_store_server import api
from conda_store_server._internal import orm, schema
from conda_store_server.server.auth import GenericOAuthAuthentication
from conda_store_server.server.dependencies import get_conda_store
from conda_store_server.storage import S3Storage
from pydantic import BaseModel, Field, constr

# The following definitions (from line 24 to line 40) have been copied over
# from conda-store source code in order to avoid accessing a "private" schema
# that does not follow a backwards compatibility policy.
ALLOWED_CHARACTERS = "A-Za-z0-9-+_@$&?^~.="
ARN_ALLOWED = f"^([{ALLOWED_CHARACTERS}*]+)/([{ALLOWED_CHARACTERS}*]+)$"


def _datetime_factory(offset: datetime.timedelta):
"""utcnow datetime + timezone as string"""
return datetime.datetime.utcnow() + offset


RoleBindings: TypeAlias = Dict[constr(regex=ARN_ALLOWED), List[str]]


class AuthenticationToken(BaseModel):
exp: datetime.datetime = Field(
default_factory=functools.partial(_datetime_factory, datetime.timedelta(days=1))
)
primary_namespace: str = "default"
role_bindings: RoleBindings = {}


def conda_store_config(path="/var/lib/conda-store/config.json"):
Expand Down Expand Up @@ -135,7 +111,9 @@ def _validate_role(self, role):
self.log.info(f"role: {role} is {'valid' if valid else 'invalid'}")
return valid

def parse_role_and_namespace(self, text) -> Optional[CondaStoreNamespaceRole]:
def parse_role_and_namespace(
self, text
) -> typing.Optional[CondaStoreNamespaceRole]:
# The regex pattern
pattern = r"^(\w+)!namespace=([^!]+)$"

Expand All @@ -150,7 +128,7 @@ def parse_role_and_namespace(self, text) -> Optional[CondaStoreNamespaceRole]:
else:
return None

def parse_scope(self) -> List[CondaStoreNamespaceRole]:
def parse_scope(self) -> typing.List[CondaStoreNamespaceRole]:
"""Parsed scopes from keycloak role's attribute and returns a list of role/namespace
if scopes' syntax is valid otherwise return []
Expand Down Expand Up @@ -271,7 +249,7 @@ async def _apply_roles_from_keycloak(self, request, user_data):
)

def _filter_duplicate_namespace_roles_with_max_permissions(
self, namespace_roles: List[CondaStoreNamespaceRole]
self, namespace_roles: typing.List[CondaStoreNamespaceRole]
):
"""Filter duplicate roles in keycloak such that to apply only the one with the highest
permissions.
Expand All @@ -282,7 +260,7 @@ def _filter_duplicate_namespace_roles_with_max_permissions(
We need to apply only the role 2 as that one has higher permissions.
"""
self.log.info("Filtering duplicate roles for same namespace")
namespace_role_mapping: Dict[str:CondaStoreNamespaceRole] = {}
namespace_role_mapping: typing.Dict[str:CondaStoreNamespaceRole] = {}
for namespace_role in namespace_roles:
namespace = namespace_role.namespace
new_role = namespace_role.role
Expand All @@ -305,7 +283,7 @@ def _filter_duplicate_namespace_roles_with_max_permissions(

def _get_permissions_from_keycloak_role(
self, keycloak_role
) -> List[CondaStoreNamespaceRole]:
) -> typing.List[CondaStoreNamespaceRole]:
self.log.info(f"Getting permissions from keycloak role: {keycloak_role}")
role_attributes = keycloak_role["attributes"]
# scopes returns a list with a value say ["viewer!namespace=pycon,developer!namespace=scipy"]
Expand All @@ -319,14 +297,14 @@ async def _apply_conda_store_roles_from_keycloak(
self.log.info(
f"Apply conda store roles from keycloak roles: {conda_store_client_roles}, user: {username}"
)
role_permissions: List[CondaStoreNamespaceRole] = []
role_permissions: typing.List[CondaStoreNamespaceRole] = []
for conda_store_client_role in conda_store_client_roles:
role_permissions += self._get_permissions_from_keycloak_role(
conda_store_client_role
)

self.log.info("Filtering duplicate namespace role for max permissions")
filtered_namespace_role: List[CondaStoreNamespaceRole] = (
filtered_namespace_role: typing.List[CondaStoreNamespaceRole] = (
self._filter_duplicate_namespace_roles_with_max_permissions(
role_permissions
)
Expand Down Expand Up @@ -379,7 +357,9 @@ def _get_conda_store_client_roles_for_user(
return client_roles_rich

def _get_current_entity_bindings(self, username):
entity = AuthenticationToken(primary_namespace=username, role_bindings={})
entity = schema.AuthenticationToken(
primary_namespace=username, role_bindings={}
)
self.log.info(f"entity: {entity}")
entity_bindings = self.authorization.get_entity_bindings(entity)
self.log.info(f"current entity_bindings: {entity_bindings}")
Expand Down Expand Up @@ -407,7 +387,7 @@ async def authenticate(self, request):

# superadmin gets access to everything
if "conda_store_superadmin" in user_data.get("roles", []):
return AuthenticationToken(
return schema.AuthenticationToken(
primary_namespace=username,
role_bindings={"*/*": {"admin"}},
)
Expand Down Expand Up @@ -443,9 +423,10 @@ async def authenticate(self, request):
for namespace in namespaces:
_namespace = api.get_namespace(db, name=namespace)
if _namespace is None:
api.create_namespace(db, name=namespace)
db.add(orm.Namespace(name=namespace))
db.commit()

return AuthenticationToken(
return schema.AuthenticationToken(
primary_namespace=username,
role_bindings=role_bindings,
)
Expand Down

0 comments on commit 05d80a2

Please sign in to comment.