diff --git a/src/_nebari/stages/kubernetes_services/template/modules/kubernetes/services/conda-store/config/conda_store_config.py b/src/_nebari/stages/kubernetes_services/template/modules/kubernetes/services/conda-store/config/conda_store_config.py index cc946de5cf..b41f29d250 100644 --- a/src/_nebari/stages/kubernetes_services/template/modules/kubernetes/services/conda-store/config/conda_store_config.py +++ b/src/_nebari/stages/kubernetes_services/template/modules/kubernetes/services/conda-store/config/conda_store_config.py @@ -1,44 +1,20 @@ import dataclasses -import datetime -import functools import json import logging import re import tempfile +import typing import urllib import urllib.parse import urllib.request from pathlib import Path -from typing import Dict, List, Optional, TypeAlias import requests from conda_store_server import api +from conda_store_server._internal import orm, schema from conda_store_server.server.auth import GenericOAuthAuthentication from conda_store_server.server.dependencies import get_conda_store from conda_store_server.storage import S3Storage -from pydantic import BaseModel, Field, constr - -# The following definitions (from line 24 to line 40) have been copied over -# from conda-store source code in order to avoid accessing a "private" schema -# that does not follow a backwards compatibility policy. -ALLOWED_CHARACTERS = "A-Za-z0-9-+_@$&?^~.=" -ARN_ALLOWED = f"^([{ALLOWED_CHARACTERS}*]+)/([{ALLOWED_CHARACTERS}*]+)$" - - -def _datetime_factory(offset: datetime.timedelta): - """utcnow datetime + timezone as string""" - return datetime.datetime.utcnow() + offset - - -RoleBindings: TypeAlias = Dict[constr(regex=ARN_ALLOWED), List[str]] - - -class AuthenticationToken(BaseModel): - exp: datetime.datetime = Field( - default_factory=functools.partial(_datetime_factory, datetime.timedelta(days=1)) - ) - primary_namespace: str = "default" - role_bindings: RoleBindings = {} def conda_store_config(path="/var/lib/conda-store/config.json"): @@ -135,7 +111,9 @@ def _validate_role(self, role): self.log.info(f"role: {role} is {'valid' if valid else 'invalid'}") return valid - def parse_role_and_namespace(self, text) -> Optional[CondaStoreNamespaceRole]: + def parse_role_and_namespace( + self, text + ) -> typing.Optional[CondaStoreNamespaceRole]: # The regex pattern pattern = r"^(\w+)!namespace=([^!]+)$" @@ -150,7 +128,7 @@ def parse_role_and_namespace(self, text) -> Optional[CondaStoreNamespaceRole]: else: return None - def parse_scope(self) -> List[CondaStoreNamespaceRole]: + def parse_scope(self) -> typing.List[CondaStoreNamespaceRole]: """Parsed scopes from keycloak role's attribute and returns a list of role/namespace if scopes' syntax is valid otherwise return [] @@ -271,7 +249,7 @@ async def _apply_roles_from_keycloak(self, request, user_data): ) def _filter_duplicate_namespace_roles_with_max_permissions( - self, namespace_roles: List[CondaStoreNamespaceRole] + self, namespace_roles: typing.List[CondaStoreNamespaceRole] ): """Filter duplicate roles in keycloak such that to apply only the one with the highest permissions. @@ -282,7 +260,7 @@ def _filter_duplicate_namespace_roles_with_max_permissions( We need to apply only the role 2 as that one has higher permissions. """ self.log.info("Filtering duplicate roles for same namespace") - namespace_role_mapping: Dict[str:CondaStoreNamespaceRole] = {} + namespace_role_mapping: typing.Dict[str:CondaStoreNamespaceRole] = {} for namespace_role in namespace_roles: namespace = namespace_role.namespace new_role = namespace_role.role @@ -305,7 +283,7 @@ def _filter_duplicate_namespace_roles_with_max_permissions( def _get_permissions_from_keycloak_role( self, keycloak_role - ) -> List[CondaStoreNamespaceRole]: + ) -> typing.List[CondaStoreNamespaceRole]: self.log.info(f"Getting permissions from keycloak role: {keycloak_role}") role_attributes = keycloak_role["attributes"] # scopes returns a list with a value say ["viewer!namespace=pycon,developer!namespace=scipy"] @@ -319,14 +297,14 @@ async def _apply_conda_store_roles_from_keycloak( self.log.info( f"Apply conda store roles from keycloak roles: {conda_store_client_roles}, user: {username}" ) - role_permissions: List[CondaStoreNamespaceRole] = [] + role_permissions: typing.List[CondaStoreNamespaceRole] = [] for conda_store_client_role in conda_store_client_roles: role_permissions += self._get_permissions_from_keycloak_role( conda_store_client_role ) self.log.info("Filtering duplicate namespace role for max permissions") - filtered_namespace_role: List[CondaStoreNamespaceRole] = ( + filtered_namespace_role: typing.List[CondaStoreNamespaceRole] = ( self._filter_duplicate_namespace_roles_with_max_permissions( role_permissions ) @@ -379,7 +357,9 @@ def _get_conda_store_client_roles_for_user( return client_roles_rich def _get_current_entity_bindings(self, username): - entity = AuthenticationToken(primary_namespace=username, role_bindings={}) + entity = schema.AuthenticationToken( + primary_namespace=username, role_bindings={} + ) self.log.info(f"entity: {entity}") entity_bindings = self.authorization.get_entity_bindings(entity) self.log.info(f"current entity_bindings: {entity_bindings}") @@ -407,7 +387,7 @@ async def authenticate(self, request): # superadmin gets access to everything if "conda_store_superadmin" in user_data.get("roles", []): - return AuthenticationToken( + return schema.AuthenticationToken( primary_namespace=username, role_bindings={"*/*": {"admin"}}, ) @@ -443,9 +423,10 @@ async def authenticate(self, request): for namespace in namespaces: _namespace = api.get_namespace(db, name=namespace) if _namespace is None: - api.create_namespace(db, name=namespace) + db.add(orm.Namespace(name=namespace)) + db.commit() - return AuthenticationToken( + return schema.AuthenticationToken( primary_namespace=username, role_bindings=role_bindings, )