From 569df84919f2fbfc7a4a7e54fb0a7f8e0c96f3f5 Mon Sep 17 00:00:00 2001 From: Johannes Nussbaum <39048939+jnussbaum@users.noreply.github.com> Date: Wed, 11 Oct 2023 09:07:40 +0200 Subject: [PATCH] chore: validate PermissionScope model (DEV-2784) (#31) --- dsp_permissions_scripts/models/groups.py | 2 +- dsp_permissions_scripts/models/scope.py | 70 ++++++++++++++++--- dsp_permissions_scripts/template.py | 2 +- dsp_permissions_scripts/utils/helpers.py | 23 ++++++ dsp_permissions_scripts/utils/project.py | 8 +-- .../utils/scope_serialization.py | 3 +- tests/test_helpers.py | 37 ++++++++++ tests/test_scope.py | 52 ++++++++++++++ tests/test_scope_serialization.py | 47 ++++++++----- 9 files changed, 206 insertions(+), 38 deletions(-) create mode 100644 dsp_permissions_scripts/utils/helpers.py create mode 100644 tests/test_helpers.py create mode 100644 tests/test_scope.py diff --git a/dsp_permissions_scripts/models/groups.py b/dsp_permissions_scripts/models/groups.py index c24790b8..2e0ad89d 100644 --- a/dsp_permissions_scripts/models/groups.py +++ b/dsp_permissions_scripts/models/groups.py @@ -8,7 +8,7 @@ class BuiltinGroup(Enum): UNKNOWN_USER = "http://www.knora.org/ontology/knora-admin#UnknownUser" KNOWN_USER = "http://www.knora.org/ontology/knora-admin#KnownUser" - CREATOR = "http://www.knora.org/ontology/knora-admin#Creator" PROJECT_MEMBER = "http://www.knora.org/ontology/knora-admin#ProjectMember" PROJECT_ADMIN = "http://www.knora.org/ontology/knora-admin#ProjectAdmin" + CREATOR = "http://www.knora.org/ontology/knora-admin#Creator" SYSTEM_ADMIN = "http://www.knora.org/ontology/knora-admin#SystemAdmin" diff --git a/dsp_permissions_scripts/models/scope.py b/dsp_permissions_scripts/models/scope.py index 920180a1..2acead56 100644 --- a/dsp_permissions_scripts/models/scope.py +++ b/dsp_permissions_scripts/models/scope.py @@ -1,4 +1,8 @@ -from pydantic import BaseModel +from __future__ import annotations + +from typing import Iterable, Literal + +from pydantic import BaseModel, ConfigDict, model_validator from dsp_permissions_scripts.models.groups import BuiltinGroup @@ -8,16 +12,62 @@ class PermissionScope(BaseModel): A scope is an object encoding the information: "Which user group gets which permissions on a resource/value?" """ + model_config = ConfigDict(frozen=True) + + CR: frozenset[str | BuiltinGroup] = frozenset() + D: frozenset[str | BuiltinGroup] = frozenset() + M: frozenset[str | BuiltinGroup] = frozenset() + V: frozenset[str | BuiltinGroup] = frozenset() + RV: frozenset[str | BuiltinGroup] = frozenset() + + @staticmethod + def create( + CR: Iterable[str | BuiltinGroup] = (), + D: Iterable[str | BuiltinGroup] = (), + M: Iterable[str | BuiltinGroup] = (), + V: Iterable[str | BuiltinGroup] = (), + RV: Iterable[str | BuiltinGroup] = (), + ) -> PermissionScope: + """Factory method to create a PermissionScope from Iterables instead of frozensets.""" + return PermissionScope( + CR=frozenset(CR), + D=frozenset(D), + M=frozenset(M), + V=frozenset(V), + RV=frozenset(RV), + ) + + @model_validator(mode="after") + def check_group_occurs_only_once(self): + all_groups = [] + for field in self.model_fields: + all_groups.extend(getattr(self, field)) + all_groups_as_strs = [g.value if isinstance(g, BuiltinGroup) else g for g in all_groups] + for group in all_groups_as_strs: + if all_groups_as_strs.count(group) > 1: + raise ValueError(f"Group {group} must not occur in more than one field") + return self - CR: list[str | BuiltinGroup] = [] - D: list[str | BuiltinGroup] = [] - M: list[str | BuiltinGroup] = [] - V: list[str | BuiltinGroup] = [] - RV: list[str | BuiltinGroup] = [] + def add( + self, + permission: Literal["CR", "D", "M", "V", "RV"], + group: str | BuiltinGroup, + ) -> PermissionScope: + """Return a copy of the PermissionScope instance with group added to permission.""" + groups = [g.value if isinstance(g, BuiltinGroup) else g for g in getattr(self, permission)] + group = group.value if isinstance(group, BuiltinGroup) else group + if group in groups: + raise ValueError(f"Group '{group}' is already in permission '{permission}'") + groups.append(group) + kwargs: dict[str, list[str]] = {permission: groups} + for perm in ["CR", "D", "M", "V", "RV"]: + if perm != permission: + kwargs[perm] = getattr(self, perm) + return PermissionScope.create(**kwargs) -PUBLIC = PermissionScope( - CR=[BuiltinGroup.PROJECT_ADMIN], - D=[BuiltinGroup.CREATOR, BuiltinGroup.PROJECT_MEMBER], - V=[BuiltinGroup.UNKNOWN_USER, BuiltinGroup.KNOWN_USER], +PUBLIC = PermissionScope.create( + CR={BuiltinGroup.PROJECT_ADMIN}, + D={BuiltinGroup.CREATOR, BuiltinGroup.PROJECT_MEMBER}, + V={BuiltinGroup.UNKNOWN_USER, BuiltinGroup.KNOWN_USER}, ) diff --git a/dsp_permissions_scripts/template.py b/dsp_permissions_scripts/template.py index 20fa1bd3..d93754a7 100644 --- a/dsp_permissions_scripts/template.py +++ b/dsp_permissions_scripts/template.py @@ -22,7 +22,7 @@ def modify_oaps(oaps: list[Oap]) -> list[Oap]: """Adapt this sample to your needs.""" for oap in oaps: - oap.scope.CR.append(BuiltinGroup.SYSTEM_ADMIN) + oap.scope = oap.scope.add("CR", BuiltinGroup.SYSTEM_ADMIN) return oaps diff --git a/dsp_permissions_scripts/utils/helpers.py b/dsp_permissions_scripts/utils/helpers.py new file mode 100644 index 00000000..3f631381 --- /dev/null +++ b/dsp_permissions_scripts/utils/helpers.py @@ -0,0 +1,23 @@ +from dsp_permissions_scripts.models.groups import BuiltinGroup + + +def dereference_prefix( + identifier: str, + context: dict[str, str], +) -> str: + prefix, actual_id = identifier.split(":") + return context[prefix] + actual_id + + +def _get_sort_pos_of_custom_group(group: str) -> int: + alphabet = list("abcdefghijklmnopqrstuvwxyz") + relevant_letter = group.replace("http://www.knora.org/ontology/knora-admin#", "")[0] + return alphabet.index(relevant_letter.lower()) + 99 # must be higher than the highest index of the builtin groups + + +def sort_groups(groups_original: list[str]) -> list[str]: + """Sorts groups, first according to their power (most powerful first), then alphabetically.""" + sort_key = list(reversed([x.value for x in BuiltinGroup])) + groups = groups_original.copy() + groups.sort(key=lambda x: sort_key.index(x) if x in sort_key else _get_sort_pos_of_custom_group(x)) + return groups diff --git a/dsp_permissions_scripts/utils/project.py b/dsp_permissions_scripts/utils/project.py index 4c9b82ce..c6e3e9fd 100644 --- a/dsp_permissions_scripts/utils/project.py +++ b/dsp_permissions_scripts/utils/project.py @@ -5,6 +5,7 @@ from dsp_permissions_scripts.models.oap import Oap from dsp_permissions_scripts.utils.authentication import get_protocol from dsp_permissions_scripts.utils.get_logger import get_logger, get_timestamp +from dsp_permissions_scripts.utils.helpers import dereference_prefix from dsp_permissions_scripts.utils.scope_serialization import create_scope_from_string logger = get_logger(__name__) @@ -61,15 +62,10 @@ def _get_class_iris_of_onto( all_entities = response.json()["@graph"] context = response.json()["@context"] class_ids = [c["@id"] for c in all_entities if c.get("knora-api:isResourceClass")] - class_iris = [_dereference_prefix(class_id, context) for class_id in class_ids] + class_iris = [dereference_prefix(class_id, context) for class_id in class_ids] return class_iris -def _dereference_prefix(identifier: str, context: dict[str, str]) -> str: - prefix, actual_id = identifier.split(":") - return context[prefix] + actual_id - - def _get_all_resource_oaps_of_resclass( host: str, resclass_iri: str, diff --git a/dsp_permissions_scripts/utils/scope_serialization.py b/dsp_permissions_scripts/utils/scope_serialization.py index 4ed92f5e..503e7975 100644 --- a/dsp_permissions_scripts/utils/scope_serialization.py +++ b/dsp_permissions_scripts/utils/scope_serialization.py @@ -2,6 +2,7 @@ from dsp_permissions_scripts.models.groups import BuiltinGroup from dsp_permissions_scripts.models.scope import PermissionScope +from dsp_permissions_scripts.utils.helpers import sort_groups def create_string_from_scope(perm_scope: PermissionScope) -> str: @@ -10,7 +11,7 @@ def create_string_from_scope(perm_scope: PermissionScope) -> str: for perm_letter, groups in perm_scope.model_dump().items(): if groups: groups_as_str = [g.value if isinstance(g, BuiltinGroup) else g for g in groups] - as_dict[perm_letter] = groups_as_str + as_dict[perm_letter] = sort_groups(groups_as_str) strs = [f"{k} {','.join(l)}" for k, l in as_dict.items()] return "|".join(strs) diff --git a/tests/test_helpers.py b/tests/test_helpers.py new file mode 100644 index 00000000..e741c6d7 --- /dev/null +++ b/tests/test_helpers.py @@ -0,0 +1,37 @@ +import unittest + +from dsp_permissions_scripts.models.groups import BuiltinGroup +from dsp_permissions_scripts.utils.helpers import sort_groups + + +class TestHelpers(unittest.TestCase): + + def test_sort_groups(self) -> None: + groups_original = [ + "http://www.knora.org/ontology/knora-admin#C_CustomGroup", + BuiltinGroup.UNKNOWN_USER.value, + BuiltinGroup.PROJECT_ADMIN.value, + BuiltinGroup.PROJECT_MEMBER.value, + BuiltinGroup.CREATOR.value, + "http://www.knora.org/ontology/knora-admin#A_CustomGroup", + "http://www.knora.org/ontology/knora-admin#B_CustomGroup", + BuiltinGroup.KNOWN_USER.value, + BuiltinGroup.SYSTEM_ADMIN.value, + ] + groups_expected = [ + BuiltinGroup.SYSTEM_ADMIN.value, + BuiltinGroup.CREATOR.value, + BuiltinGroup.PROJECT_ADMIN.value, + BuiltinGroup.PROJECT_MEMBER.value, + BuiltinGroup.KNOWN_USER.value, + BuiltinGroup.UNKNOWN_USER.value, + "http://www.knora.org/ontology/knora-admin#A_CustomGroup", + "http://www.knora.org/ontology/knora-admin#B_CustomGroup", + "http://www.knora.org/ontology/knora-admin#C_CustomGroup", + ] + groups_returned = sort_groups(groups_original=groups_original) + self.assertEqual(groups_returned, groups_expected) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/test_scope.py b/tests/test_scope.py new file mode 100644 index 00000000..bd5fb853 --- /dev/null +++ b/tests/test_scope.py @@ -0,0 +1,52 @@ +import unittest + +from dsp_permissions_scripts.models.groups import BuiltinGroup +from dsp_permissions_scripts.models.scope import PermissionScope + + +class TestScope(unittest.TestCase): + + def test_scope_validation_on_creation(self) -> None: + with self.assertRaisesRegex(ValueError, "must not occur in more than one field"): + PermissionScope.create( + CR={BuiltinGroup.PROJECT_ADMIN}, + D={BuiltinGroup.PROJECT_ADMIN}, + V={BuiltinGroup.UNKNOWN_USER, BuiltinGroup.KNOWN_USER}, + ) + + def test_scope_validation_on_add_to_same_permission(self) -> None: + scope = PermissionScope.create( + CR={BuiltinGroup.PROJECT_ADMIN}, + V={BuiltinGroup.UNKNOWN_USER, BuiltinGroup.KNOWN_USER}, + ) + with self.assertRaisesRegex( + ValueError, + "Group 'http://www.knora.org/ontology/knora-admin#ProjectAdmin' is already in permission 'CR'" + ): + _ = scope.add("CR", BuiltinGroup.PROJECT_ADMIN) + + def test_scope_validation_on_add_to_different_permission(self) -> None: + scope = PermissionScope.create( + CR={BuiltinGroup.PROJECT_ADMIN}, + V={BuiltinGroup.UNKNOWN_USER, BuiltinGroup.KNOWN_USER}, + ) + with self.assertRaisesRegex(ValueError, "must not occur in more than one field"): + _ = scope.add("RV", BuiltinGroup.PROJECT_ADMIN) + + def test_add_to_scope(self) -> None: + scope = PermissionScope.create( + D={BuiltinGroup.SYSTEM_ADMIN}, + M={BuiltinGroup.PROJECT_MEMBER, BuiltinGroup.KNOWN_USER}, + ) + scope_added = scope.add("CR", BuiltinGroup.PROJECT_ADMIN) + self.assertEqual( + scope_added.model_dump_json(), + PermissionScope.create( + CR={BuiltinGroup.PROJECT_ADMIN}, + D={BuiltinGroup.SYSTEM_ADMIN}, + M={BuiltinGroup.PROJECT_MEMBER, BuiltinGroup.KNOWN_USER}, + ).model_dump_json(), + ) + +if __name__ == "__main__": + unittest.main() diff --git a/tests/test_scope_serialization.py b/tests/test_scope_serialization.py index b4627e74..873dd536 100644 --- a/tests/test_scope_serialization.py +++ b/tests/test_scope_serialization.py @@ -1,3 +1,4 @@ +import json import unittest from typing import Any @@ -41,39 +42,47 @@ class TestScopeSerialization(unittest.TestCase): ], ] scopes = [ - PermissionScope( + PermissionScope.create( CR=[BuiltinGroup.SYSTEM_ADMIN], V=["http://www.knora.org/ontology/knora-admin#CustomGroup"], ), - PermissionScope( - D=[BuiltinGroup.PROJECT_ADMIN], - RV=[BuiltinGroup.PROJECT_MEMBER], + PermissionScope.create( + D={BuiltinGroup.PROJECT_ADMIN}, + RV={BuiltinGroup.PROJECT_MEMBER}, ), - PermissionScope( - M=[BuiltinGroup.PROJECT_ADMIN], - V=[BuiltinGroup.CREATOR, BuiltinGroup.KNOWN_USER], - RV=[BuiltinGroup.UNKNOWN_USER], + PermissionScope.create( + M={BuiltinGroup.PROJECT_ADMIN}, + V={BuiltinGroup.CREATOR, BuiltinGroup.KNOWN_USER}, + RV={BuiltinGroup.UNKNOWN_USER}, ), - PermissionScope( - CR=[BuiltinGroup.SYSTEM_ADMIN, BuiltinGroup.PROJECT_ADMIN], - D=[BuiltinGroup.CREATOR], - RV=[BuiltinGroup.UNKNOWN_USER], + PermissionScope.create( + CR={BuiltinGroup.SYSTEM_ADMIN, BuiltinGroup.PROJECT_ADMIN}, + D={BuiltinGroup.CREATOR}, + RV={BuiltinGroup.UNKNOWN_USER}, ), ] def test_create_scope_from_string(self) -> None: for perm_string, scope in zip(self.perm_strings, self.scopes): - self.assertEqual( - create_scope_from_string(perm_string).model_dump_json(), - scope.model_dump_json(), + returned = json.loads(create_scope_from_string(perm_string).model_dump_json()) + returned = {k: sorted(v) for k, v in returned.items()} + expected = json.loads(scope.model_dump_json()) + expected = {k: sorted(v) for k, v in expected.items()} + self.assertDictEqual( + returned, + expected, msg=f"Failed with permission string '{perm_string}'", ) def test_create_scope_from_admin_route_object(self) -> None: for admin_route_object, scope, index in zip(self.admin_route_objects, self.scopes, range(len(self.scopes))): - self.assertEqual( - create_scope_from_admin_route_object(admin_route_object).model_dump_json(), - scope.model_dump_json(), + returned = json.loads(create_scope_from_admin_route_object(admin_route_object).model_dump_json()) + returned = {k: sorted(v) for k, v in returned.items()} + expected = json.loads(scope.model_dump_json()) + expected = {k: sorted(v) for k, v in expected.items()} + self.assertDictEqual( + returned, + expected, msg=f"Failed with admin group object no. {index}", ) @@ -89,7 +98,7 @@ def test_create_string_from_scope(self) -> None: def test_create_admin_route_object_from_scope(self) -> None: for admin_route_object, scope, index in zip(self.admin_route_objects, self.scopes, range(len(self.scopes))): admin_route_object_full = self._resolve_prefixes_of_admin_route_object(admin_route_object) - self.assertEqual( + self.assertCountEqual( create_admin_route_object_from_scope(scope), admin_route_object_full, msg=f"Failed with admin group object no. {index}",