Skip to content

Commit

Permalink
chore: validate PermissionScope model (DEV-2784) (#31)
Browse files Browse the repository at this point in the history
  • Loading branch information
jnussbaum authored Oct 11, 2023
1 parent bfbf9f4 commit 569df84
Show file tree
Hide file tree
Showing 9 changed files with 206 additions and 38 deletions.
2 changes: 1 addition & 1 deletion dsp_permissions_scripts/models/groups.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ class BuiltinGroup(Enum):

UNKNOWN_USER = "http://www.knora.org/ontology/knora-admin#UnknownUser"
KNOWN_USER = "http://www.knora.org/ontology/knora-admin#KnownUser"
CREATOR = "http://www.knora.org/ontology/knora-admin#Creator"
PROJECT_MEMBER = "http://www.knora.org/ontology/knora-admin#ProjectMember"
PROJECT_ADMIN = "http://www.knora.org/ontology/knora-admin#ProjectAdmin"
CREATOR = "http://www.knora.org/ontology/knora-admin#Creator"
SYSTEM_ADMIN = "http://www.knora.org/ontology/knora-admin#SystemAdmin"
70 changes: 60 additions & 10 deletions dsp_permissions_scripts/models/scope.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
from pydantic import BaseModel
from __future__ import annotations

from typing import Iterable, Literal

from pydantic import BaseModel, ConfigDict, model_validator

from dsp_permissions_scripts.models.groups import BuiltinGroup

Expand All @@ -8,16 +12,62 @@ class PermissionScope(BaseModel):
A scope is an object encoding the information:
"Which user group gets which permissions on a resource/value?"
"""
model_config = ConfigDict(frozen=True)

CR: frozenset[str | BuiltinGroup] = frozenset()
D: frozenset[str | BuiltinGroup] = frozenset()
M: frozenset[str | BuiltinGroup] = frozenset()
V: frozenset[str | BuiltinGroup] = frozenset()
RV: frozenset[str | BuiltinGroup] = frozenset()

@staticmethod
def create(
CR: Iterable[str | BuiltinGroup] = (),
D: Iterable[str | BuiltinGroup] = (),
M: Iterable[str | BuiltinGroup] = (),
V: Iterable[str | BuiltinGroup] = (),
RV: Iterable[str | BuiltinGroup] = (),
) -> PermissionScope:
"""Factory method to create a PermissionScope from Iterables instead of frozensets."""
return PermissionScope(
CR=frozenset(CR),
D=frozenset(D),
M=frozenset(M),
V=frozenset(V),
RV=frozenset(RV),
)

@model_validator(mode="after")
def check_group_occurs_only_once(self):
all_groups = []
for field in self.model_fields:
all_groups.extend(getattr(self, field))
all_groups_as_strs = [g.value if isinstance(g, BuiltinGroup) else g for g in all_groups]
for group in all_groups_as_strs:
if all_groups_as_strs.count(group) > 1:
raise ValueError(f"Group {group} must not occur in more than one field")
return self

CR: list[str | BuiltinGroup] = []
D: list[str | BuiltinGroup] = []
M: list[str | BuiltinGroup] = []
V: list[str | BuiltinGroup] = []
RV: list[str | BuiltinGroup] = []
def add(
self,
permission: Literal["CR", "D", "M", "V", "RV"],
group: str | BuiltinGroup,
) -> PermissionScope:
"""Return a copy of the PermissionScope instance with group added to permission."""
groups = [g.value if isinstance(g, BuiltinGroup) else g for g in getattr(self, permission)]
group = group.value if isinstance(group, BuiltinGroup) else group
if group in groups:
raise ValueError(f"Group '{group}' is already in permission '{permission}'")
groups.append(group)
kwargs: dict[str, list[str]] = {permission: groups}
for perm in ["CR", "D", "M", "V", "RV"]:
if perm != permission:
kwargs[perm] = getattr(self, perm)
return PermissionScope.create(**kwargs)


PUBLIC = PermissionScope(
CR=[BuiltinGroup.PROJECT_ADMIN],
D=[BuiltinGroup.CREATOR, BuiltinGroup.PROJECT_MEMBER],
V=[BuiltinGroup.UNKNOWN_USER, BuiltinGroup.KNOWN_USER],
PUBLIC = PermissionScope.create(
CR={BuiltinGroup.PROJECT_ADMIN},
D={BuiltinGroup.CREATOR, BuiltinGroup.PROJECT_MEMBER},
V={BuiltinGroup.UNKNOWN_USER, BuiltinGroup.KNOWN_USER},
)
2 changes: 1 addition & 1 deletion dsp_permissions_scripts/template.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
def modify_oaps(oaps: list[Oap]) -> list[Oap]:
"""Adapt this sample to your needs."""
for oap in oaps:
oap.scope.CR.append(BuiltinGroup.SYSTEM_ADMIN)
oap.scope = oap.scope.add("CR", BuiltinGroup.SYSTEM_ADMIN)
return oaps


Expand Down
23 changes: 23 additions & 0 deletions dsp_permissions_scripts/utils/helpers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
from dsp_permissions_scripts.models.groups import BuiltinGroup


def dereference_prefix(
identifier: str,
context: dict[str, str],
) -> str:
prefix, actual_id = identifier.split(":")
return context[prefix] + actual_id


def _get_sort_pos_of_custom_group(group: str) -> int:
alphabet = list("abcdefghijklmnopqrstuvwxyz")
relevant_letter = group.replace("http://www.knora.org/ontology/knora-admin#", "")[0]
return alphabet.index(relevant_letter.lower()) + 99 # must be higher than the highest index of the builtin groups


def sort_groups(groups_original: list[str]) -> list[str]:
"""Sorts groups, first according to their power (most powerful first), then alphabetically."""
sort_key = list(reversed([x.value for x in BuiltinGroup]))
groups = groups_original.copy()
groups.sort(key=lambda x: sort_key.index(x) if x in sort_key else _get_sort_pos_of_custom_group(x))
return groups
8 changes: 2 additions & 6 deletions dsp_permissions_scripts/utils/project.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from dsp_permissions_scripts.models.oap import Oap
from dsp_permissions_scripts.utils.authentication import get_protocol
from dsp_permissions_scripts.utils.get_logger import get_logger, get_timestamp
from dsp_permissions_scripts.utils.helpers import dereference_prefix
from dsp_permissions_scripts.utils.scope_serialization import create_scope_from_string

logger = get_logger(__name__)
Expand Down Expand Up @@ -61,15 +62,10 @@ def _get_class_iris_of_onto(
all_entities = response.json()["@graph"]
context = response.json()["@context"]
class_ids = [c["@id"] for c in all_entities if c.get("knora-api:isResourceClass")]
class_iris = [_dereference_prefix(class_id, context) for class_id in class_ids]
class_iris = [dereference_prefix(class_id, context) for class_id in class_ids]
return class_iris


def _dereference_prefix(identifier: str, context: dict[str, str]) -> str:
prefix, actual_id = identifier.split(":")
return context[prefix] + actual_id


def _get_all_resource_oaps_of_resclass(
host: str,
resclass_iri: str,
Expand Down
3 changes: 2 additions & 1 deletion dsp_permissions_scripts/utils/scope_serialization.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

from dsp_permissions_scripts.models.groups import BuiltinGroup
from dsp_permissions_scripts.models.scope import PermissionScope
from dsp_permissions_scripts.utils.helpers import sort_groups


def create_string_from_scope(perm_scope: PermissionScope) -> str:
Expand All @@ -10,7 +11,7 @@ def create_string_from_scope(perm_scope: PermissionScope) -> str:
for perm_letter, groups in perm_scope.model_dump().items():
if groups:
groups_as_str = [g.value if isinstance(g, BuiltinGroup) else g for g in groups]
as_dict[perm_letter] = groups_as_str
as_dict[perm_letter] = sort_groups(groups_as_str)
strs = [f"{k} {','.join(l)}" for k, l in as_dict.items()]
return "|".join(strs)

Expand Down
37 changes: 37 additions & 0 deletions tests/test_helpers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
import unittest

from dsp_permissions_scripts.models.groups import BuiltinGroup
from dsp_permissions_scripts.utils.helpers import sort_groups


class TestHelpers(unittest.TestCase):

def test_sort_groups(self) -> None:
groups_original = [
"http://www.knora.org/ontology/knora-admin#C_CustomGroup",
BuiltinGroup.UNKNOWN_USER.value,
BuiltinGroup.PROJECT_ADMIN.value,
BuiltinGroup.PROJECT_MEMBER.value,
BuiltinGroup.CREATOR.value,
"http://www.knora.org/ontology/knora-admin#A_CustomGroup",
"http://www.knora.org/ontology/knora-admin#B_CustomGroup",
BuiltinGroup.KNOWN_USER.value,
BuiltinGroup.SYSTEM_ADMIN.value,
]
groups_expected = [
BuiltinGroup.SYSTEM_ADMIN.value,
BuiltinGroup.CREATOR.value,
BuiltinGroup.PROJECT_ADMIN.value,
BuiltinGroup.PROJECT_MEMBER.value,
BuiltinGroup.KNOWN_USER.value,
BuiltinGroup.UNKNOWN_USER.value,
"http://www.knora.org/ontology/knora-admin#A_CustomGroup",
"http://www.knora.org/ontology/knora-admin#B_CustomGroup",
"http://www.knora.org/ontology/knora-admin#C_CustomGroup",
]
groups_returned = sort_groups(groups_original=groups_original)
self.assertEqual(groups_returned, groups_expected)


if __name__ == "__main__":
unittest.main()
52 changes: 52 additions & 0 deletions tests/test_scope.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
import unittest

from dsp_permissions_scripts.models.groups import BuiltinGroup
from dsp_permissions_scripts.models.scope import PermissionScope


class TestScope(unittest.TestCase):

def test_scope_validation_on_creation(self) -> None:
with self.assertRaisesRegex(ValueError, "must not occur in more than one field"):
PermissionScope.create(
CR={BuiltinGroup.PROJECT_ADMIN},
D={BuiltinGroup.PROJECT_ADMIN},
V={BuiltinGroup.UNKNOWN_USER, BuiltinGroup.KNOWN_USER},
)

def test_scope_validation_on_add_to_same_permission(self) -> None:
scope = PermissionScope.create(
CR={BuiltinGroup.PROJECT_ADMIN},
V={BuiltinGroup.UNKNOWN_USER, BuiltinGroup.KNOWN_USER},
)
with self.assertRaisesRegex(
ValueError,
"Group 'http://www.knora.org/ontology/knora-admin#ProjectAdmin' is already in permission 'CR'"
):
_ = scope.add("CR", BuiltinGroup.PROJECT_ADMIN)

def test_scope_validation_on_add_to_different_permission(self) -> None:
scope = PermissionScope.create(
CR={BuiltinGroup.PROJECT_ADMIN},
V={BuiltinGroup.UNKNOWN_USER, BuiltinGroup.KNOWN_USER},
)
with self.assertRaisesRegex(ValueError, "must not occur in more than one field"):
_ = scope.add("RV", BuiltinGroup.PROJECT_ADMIN)

def test_add_to_scope(self) -> None:
scope = PermissionScope.create(
D={BuiltinGroup.SYSTEM_ADMIN},
M={BuiltinGroup.PROJECT_MEMBER, BuiltinGroup.KNOWN_USER},
)
scope_added = scope.add("CR", BuiltinGroup.PROJECT_ADMIN)
self.assertEqual(
scope_added.model_dump_json(),
PermissionScope.create(
CR={BuiltinGroup.PROJECT_ADMIN},
D={BuiltinGroup.SYSTEM_ADMIN},
M={BuiltinGroup.PROJECT_MEMBER, BuiltinGroup.KNOWN_USER},
).model_dump_json(),
)

if __name__ == "__main__":
unittest.main()
47 changes: 28 additions & 19 deletions tests/test_scope_serialization.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import json
import unittest
from typing import Any

Expand Down Expand Up @@ -41,39 +42,47 @@ class TestScopeSerialization(unittest.TestCase):
],
]
scopes = [
PermissionScope(
PermissionScope.create(
CR=[BuiltinGroup.SYSTEM_ADMIN],
V=["http://www.knora.org/ontology/knora-admin#CustomGroup"],
),
PermissionScope(
D=[BuiltinGroup.PROJECT_ADMIN],
RV=[BuiltinGroup.PROJECT_MEMBER],
PermissionScope.create(
D={BuiltinGroup.PROJECT_ADMIN},
RV={BuiltinGroup.PROJECT_MEMBER},
),
PermissionScope(
M=[BuiltinGroup.PROJECT_ADMIN],
V=[BuiltinGroup.CREATOR, BuiltinGroup.KNOWN_USER],
RV=[BuiltinGroup.UNKNOWN_USER],
PermissionScope.create(
M={BuiltinGroup.PROJECT_ADMIN},
V={BuiltinGroup.CREATOR, BuiltinGroup.KNOWN_USER},
RV={BuiltinGroup.UNKNOWN_USER},
),
PermissionScope(
CR=[BuiltinGroup.SYSTEM_ADMIN, BuiltinGroup.PROJECT_ADMIN],
D=[BuiltinGroup.CREATOR],
RV=[BuiltinGroup.UNKNOWN_USER],
PermissionScope.create(
CR={BuiltinGroup.SYSTEM_ADMIN, BuiltinGroup.PROJECT_ADMIN},
D={BuiltinGroup.CREATOR},
RV={BuiltinGroup.UNKNOWN_USER},
),
]

def test_create_scope_from_string(self) -> None:
for perm_string, scope in zip(self.perm_strings, self.scopes):
self.assertEqual(
create_scope_from_string(perm_string).model_dump_json(),
scope.model_dump_json(),
returned = json.loads(create_scope_from_string(perm_string).model_dump_json())
returned = {k: sorted(v) for k, v in returned.items()}
expected = json.loads(scope.model_dump_json())
expected = {k: sorted(v) for k, v in expected.items()}
self.assertDictEqual(
returned,
expected,
msg=f"Failed with permission string '{perm_string}'",
)

def test_create_scope_from_admin_route_object(self) -> None:
for admin_route_object, scope, index in zip(self.admin_route_objects, self.scopes, range(len(self.scopes))):
self.assertEqual(
create_scope_from_admin_route_object(admin_route_object).model_dump_json(),
scope.model_dump_json(),
returned = json.loads(create_scope_from_admin_route_object(admin_route_object).model_dump_json())
returned = {k: sorted(v) for k, v in returned.items()}
expected = json.loads(scope.model_dump_json())
expected = {k: sorted(v) for k, v in expected.items()}
self.assertDictEqual(
returned,
expected,
msg=f"Failed with admin group object no. {index}",
)

Expand All @@ -89,7 +98,7 @@ def test_create_string_from_scope(self) -> None:
def test_create_admin_route_object_from_scope(self) -> None:
for admin_route_object, scope, index in zip(self.admin_route_objects, self.scopes, range(len(self.scopes))):
admin_route_object_full = self._resolve_prefixes_of_admin_route_object(admin_route_object)
self.assertEqual(
self.assertCountEqual(
create_admin_route_object_from_scope(scope),
admin_route_object_full,
msg=f"Failed with admin group object no. {index}",
Expand Down

0 comments on commit 569df84

Please sign in to comment.