diff --git a/dsp_permissions_scripts/models/builtin_groups.py b/dsp_permissions_scripts/models/builtin_groups.py new file mode 100644 index 00000000..c04aa514 --- /dev/null +++ b/dsp_permissions_scripts/models/builtin_groups.py @@ -0,0 +1,6 @@ +UNKNOWN_USER = "http://www.knora.org/ontology/knora-admin#UnknownUser" +KNOWN_USER = "http://www.knora.org/ontology/knora-admin#KnownUser" +PROJECT_MEMBER = "http://www.knora.org/ontology/knora-admin#ProjectMember" +PROJECT_ADMIN = "http://www.knora.org/ontology/knora-admin#ProjectAdmin" +CREATOR = "http://www.knora.org/ontology/knora-admin#Creator" +SYSTEM_ADMIN = "http://www.knora.org/ontology/knora-admin#SystemAdmin" diff --git a/dsp_permissions_scripts/models/doap.py b/dsp_permissions_scripts/models/doap.py index 1dfdfea5..7a3e8e6e 100644 --- a/dsp_permissions_scripts/models/doap.py +++ b/dsp_permissions_scripts/models/doap.py @@ -20,9 +20,9 @@ class Doap(BaseModel): class DoapTarget(BaseModel): project: str - group: str | None - resource_class: str | None - property: str | None + group: str | None = None + resource_class: str | None = None + property: str | None = None @model_validator(mode="after") def assert_correct_combination(self) -> Self: diff --git a/dsp_permissions_scripts/models/groups.py b/dsp_permissions_scripts/models/groups.py deleted file mode 100644 index 2e0ad89d..00000000 --- a/dsp_permissions_scripts/models/groups.py +++ /dev/null @@ -1,14 +0,0 @@ -from enum import Enum - - -class BuiltinGroup(Enum): - """ - Enumeration of the built in DSP user groups. - """ - - UNKNOWN_USER = "http://www.knora.org/ontology/knora-admin#UnknownUser" - KNOWN_USER = "http://www.knora.org/ontology/knora-admin#KnownUser" - PROJECT_MEMBER = "http://www.knora.org/ontology/knora-admin#ProjectMember" - PROJECT_ADMIN = "http://www.knora.org/ontology/knora-admin#ProjectAdmin" - CREATOR = "http://www.knora.org/ontology/knora-admin#Creator" - SYSTEM_ADMIN = "http://www.knora.org/ontology/knora-admin#SystemAdmin" diff --git a/dsp_permissions_scripts/models/scope.py b/dsp_permissions_scripts/models/scope.py index 111b1365..72efd459 100644 --- a/dsp_permissions_scripts/models/scope.py +++ b/dsp_permissions_scripts/models/scope.py @@ -4,7 +4,7 @@ from pydantic import BaseModel, ConfigDict, model_validator -from dsp_permissions_scripts.models.groups import BuiltinGroup +from dsp_permissions_scripts.models import builtin_groups class PermissionScope(BaseModel): @@ -14,19 +14,19 @@ class PermissionScope(BaseModel): """ model_config = ConfigDict(frozen=True) - CR: frozenset[str | BuiltinGroup] = frozenset() - D: frozenset[str | BuiltinGroup] = frozenset() - M: frozenset[str | BuiltinGroup] = frozenset() - V: frozenset[str | BuiltinGroup] = frozenset() - RV: frozenset[str | BuiltinGroup] = frozenset() + CR: frozenset[str] = frozenset() + D: frozenset[str] = frozenset() + M: frozenset[str] = frozenset() + V: frozenset[str] = frozenset() + RV: frozenset[str] = frozenset() @staticmethod def create( - CR: Iterable[str | BuiltinGroup] = (), - D: Iterable[str | BuiltinGroup] = (), - M: Iterable[str | BuiltinGroup] = (), - V: Iterable[str | BuiltinGroup] = (), - RV: Iterable[str | BuiltinGroup] = (), + CR: Iterable[str] = (), + D: Iterable[str] = (), + M: Iterable[str] = (), + V: Iterable[str] = (), + RV: Iterable[str] = (), ) -> PermissionScope: """Factory method to create a PermissionScope from Iterables instead of frozensets.""" return PermissionScope( @@ -42,23 +42,21 @@ def check_group_occurs_only_once(self): all_groups = [] for field in self.model_fields: all_groups.extend(getattr(self, field)) - all_groups_as_strs = [g.value if isinstance(g, BuiltinGroup) else g for g in all_groups] - for group in all_groups_as_strs: - if all_groups_as_strs.count(group) > 1: + for group in all_groups: + if all_groups.count(group) > 1: raise ValueError(f"Group {group} must not occur in more than one field") return self def add( self, permission: Literal["CR", "D", "M", "V", "RV"], - group: str | BuiltinGroup, + group: str, ) -> PermissionScope: """Return a copy of the PermissionScope instance with group added to permission.""" - groups = [g.value if isinstance(g, BuiltinGroup) else g for g in getattr(self, permission)] - group = group.value if isinstance(group, BuiltinGroup) else group + groups = getattr(self, permission) if group in groups: raise ValueError(f"Group '{group}' is already in permission '{permission}'") - groups.append(group) + groups = groups | {group} kwargs: dict[str, list[str]] = {permission: groups} for perm in ["CR", "D", "M", "V", "RV"]: if perm != permission: @@ -68,14 +66,13 @@ def add( def remove( self, permission: Literal["CR", "D", "M", "V", "RV"], - group: str | BuiltinGroup, + group: str, ) -> PermissionScope: """Return a copy of the PermissionScope instance with group removed from permission.""" - groups = [g.value if isinstance(g, BuiltinGroup) else g for g in getattr(self, permission)] - group = group.value if isinstance(group, BuiltinGroup) else group + groups = getattr(self, permission) if group not in groups: raise ValueError(f"Group '{group}' is not in permission '{permission}'") - groups.remove(group) + groups = groups - {group} kwargs: dict[str, list[str]] = {permission: groups} for perm in ["CR", "D", "M", "V", "RV"]: if perm != permission: @@ -84,7 +81,7 @@ def remove( PUBLIC = PermissionScope.create( - CR={BuiltinGroup.PROJECT_ADMIN}, - D={BuiltinGroup.CREATOR, BuiltinGroup.PROJECT_MEMBER}, - V={BuiltinGroup.UNKNOWN_USER, BuiltinGroup.KNOWN_USER}, + CR={builtin_groups.PROJECT_ADMIN}, + D={builtin_groups.CREATOR, builtin_groups.PROJECT_MEMBER}, + V={builtin_groups.UNKNOWN_USER, builtin_groups.KNOWN_USER}, ) diff --git a/dsp_permissions_scripts/template.py b/dsp_permissions_scripts/template.py index 37f6f055..2c7eec4c 100644 --- a/dsp_permissions_scripts/template.py +++ b/dsp_permissions_scripts/template.py @@ -1,7 +1,7 @@ from dotenv import load_dotenv +from dsp_permissions_scripts.models import builtin_groups from dsp_permissions_scripts.models.doap import Doap -from dsp_permissions_scripts.models.groups import BuiltinGroup from dsp_permissions_scripts.models.host import Hosts from dsp_permissions_scripts.models.oap import Oap from dsp_permissions_scripts.models.scope import PUBLIC @@ -14,85 +14,96 @@ from dsp_permissions_scripts.utils.project import get_all_resource_oaps_of_project -def modify_oaps(oaps: list[Oap]) -> list[Oap]: - """Adapt this sample to your needs.""" - for oap in oaps: - if BuiltinGroup.SYSTEM_ADMIN.value not in oap.scope.CR: - oap.scope = oap.scope.add("CR", BuiltinGroup.SYSTEM_ADMIN) - return oaps - - def modify_doaps(doaps: list[Doap]) -> list[Doap]: """Adapt this sample to your needs.""" for doap in doaps: - if doap.target.group in [BuiltinGroup.PROJECT_MEMBER.value, BuiltinGroup.PROJECT_ADMIN.value]: + if doap.target.group in [builtin_groups.PROJECT_MEMBER, builtin_groups.PROJECT_ADMIN]: doap.scope = PUBLIC return doaps -def update_oaps( +def modify_oaps(oaps: list[Oap]) -> list[Oap]: + """Adapt this sample to your needs.""" + for oap in oaps: + if builtin_groups.SYSTEM_ADMIN not in oap.scope.CR: + oap.scope = oap.scope.add("CR", builtin_groups.SYSTEM_ADMIN) + return oaps + + +def update_doaps( host: str, shortcode: str, token: str, ) -> None: - """Sample function to modify the Object Access Permissions of a project.""" - resource_oaps = get_all_resource_oaps_of_project( - shortcode=shortcode, + """Sample function to modify the Default Object Access Permissions of a project.""" + project_doaps = get_doaps_of_project( host=host, + shortcode=shortcode, token=token, ) - serialize_resource_oaps( - resource_oaps=resource_oaps, + serialize_doaps_of_project( + project_doaps=project_doaps, shortcode=shortcode, mode="original", ) - resource_oaps_updated = modify_oaps(oaps=resource_oaps) - serialize_resource_oaps( - resource_oaps=resource_oaps_updated, - shortcode=shortcode, - mode="modified", + project_doaps_modified = modify_doaps(doaps=project_doaps) + apply_updated_doaps_on_server( + doaps=project_doaps_modified, + host=host, + token=token, ) - apply_updated_oaps_on_server( - resource_oaps=resource_oaps_updated, + project_doaps_updated = get_doaps_of_project( host=host, + shortcode=shortcode, token=token, ) + serialize_doaps_of_project( + project_doaps=project_doaps_updated, + shortcode=shortcode, + mode="modified", + ) -def update_doaps( +def update_oaps( host: str, shortcode: str, token: str, ) -> None: - """Sample function to modify the Default Object Access Permissions of a project.""" - project_doaps = get_doaps_of_project( - host=host, + """Sample function to modify the Object Access Permissions of a project.""" + resource_oaps = get_all_resource_oaps_of_project( shortcode=shortcode, + host=host, token=token, ) - serialize_doaps_of_project( - project_doaps=project_doaps, + serialize_resource_oaps( + resource_oaps=resource_oaps, shortcode=shortcode, mode="original", ) - project_doaps_updated = modify_doaps(doaps=project_doaps) - serialize_doaps_of_project( - project_doaps=project_doaps_updated, + resource_oaps_modified = modify_oaps(oaps=resource_oaps) + apply_updated_oaps_on_server( + resource_oaps=resource_oaps_modified, + host=host, + token=token, shortcode=shortcode, - mode="modified", ) - apply_updated_doaps_on_server( - doaps=project_doaps_updated, + resource_oaps_updated = get_all_resource_oaps_of_project( + shortcode=shortcode, host=host, token=token, ) + serialize_resource_oaps( + resource_oaps=resource_oaps_updated, + shortcode=shortcode, + mode="modified", + ) def main() -> None: """ The main function provides you with 2 sample functions: - one to update the Object Access Permissions of a project, - and one to update the Default Object Access Permissions of a project. + one to update the Default Object Access Permissions of a project, + and one to update the Object Access Permissions of a project. Both must first be adapted to your needs. """ load_dotenv() # set login credentials from .env file as environment variables @@ -100,12 +111,12 @@ def main() -> None: shortcode = "F18E" token = login(host) - update_oaps( + update_doaps( host=host, shortcode=shortcode, token=token, ) - update_doaps( + update_oaps( host=host, shortcode=shortcode, token=token, diff --git a/dsp_permissions_scripts/utils/doap_get.py b/dsp_permissions_scripts/utils/doap_get.py index 28faab83..97c629f9 100644 --- a/dsp_permissions_scripts/utils/doap_get.py +++ b/dsp_permissions_scripts/utils/doap_get.py @@ -75,7 +75,7 @@ def get_doaps_of_project( host: str, shortcode: str, token: str, - target: DoapTargetType = DoapTargetType.ALL, + target_type: DoapTargetType = DoapTargetType.ALL, ) -> list[Doap]: """ Returns the doaps for a project. @@ -94,7 +94,7 @@ def get_doaps_of_project( ) filtered_doaps = _filter_doaps_by_target( doaps=doaps, - target=target, + target=target_type, ) - logger.info(f"Found {len(doaps)} DOAPs, {len(filtered_doaps)} of which are related to {target}.") + logger.info(f"Found {len(doaps)} DOAPs, {len(filtered_doaps)} of which are related to {target_type}.") return filtered_doaps diff --git a/dsp_permissions_scripts/utils/doap_serialize.py b/dsp_permissions_scripts/utils/doap_serialize.py index e15889ac..e1b44f3d 100644 --- a/dsp_permissions_scripts/utils/doap_serialize.py +++ b/dsp_permissions_scripts/utils/doap_serialize.py @@ -16,14 +16,14 @@ def serialize_doaps_of_project( project_doaps: list[Doap], shortcode: str, mode: Literal["original", "modified"], - target: DoapTargetType = DoapTargetType.ALL, + target_type: DoapTargetType = DoapTargetType.ALL, ) -> None: """Serialize the DOAPs of a project to a JSON file.""" filepath = _get_file_path(shortcode, mode) filepath.parent.mkdir(parents=True, exist_ok=True) explanation_string = f"Project {shortcode} has {len(project_doaps)} DOAPs" - if target != DoapTargetType.ALL: - explanation_string += f" which are related to a {target}" + if target_type != DoapTargetType.ALL: + explanation_string += f" which are related to a {target_type}" doaps_as_dicts = [doap.model_dump(exclude_none=True, mode="json") for doap in project_doaps] doaps_as_dict = {explanation_string: doaps_as_dicts} with open(filepath, mode="w", encoding="utf-8") as f: @@ -38,5 +38,5 @@ def deserialize_doaps_of_project( filepath = _get_file_path(shortcode, mode) with open(filepath, mode="r", encoding="utf-8") as f: doaps_as_dict = json.load(f) - doaps_as_dicts = doaps_as_dict.values()[0] + doaps_as_dicts = list(doaps_as_dict.values())[0] return [Doap.model_validate(d) for d in doaps_as_dicts] diff --git a/dsp_permissions_scripts/utils/doap_set.py b/dsp_permissions_scripts/utils/doap_set.py index c946cc13..6dcf668b 100644 --- a/dsp_permissions_scripts/utils/doap_set.py +++ b/dsp_permissions_scripts/utils/doap_set.py @@ -1,3 +1,4 @@ +import warnings from typing import Literal from urllib.parse import quote_plus @@ -65,11 +66,16 @@ def apply_updated_doaps_on_server( print(f"\n{heading}\n{'=' * len(heading)}\n") for d in doaps: _log_and_print_doap_update(doap=d, state="before") - new_doap = _update_doap_scope( - doap_iri=d.doap_iri, - scope=d.scope, - host=host, - token=token, - ) - _log_and_print_doap_update(doap=new_doap, state="after") + try: + new_doap = _update_doap_scope( + doap_iri=d.doap_iri, + scope=d.scope, + host=host, + token=token, + ) + _log_and_print_doap_update(doap=new_doap, state="after") + except Exception: # pylint: disable=broad-exception-caught + logger.error(f"ERROR while updating DOAP {d.doap_iri}", exc_info=True) + warnings.warn(f"ERROR while updating DOAP {d.doap_iri}") + print(f"{get_timestamp()}: All DOAPs have been updated.") diff --git a/dsp_permissions_scripts/utils/helpers.py b/dsp_permissions_scripts/utils/helpers.py index 3f631381..1fda24e7 100644 --- a/dsp_permissions_scripts/utils/helpers.py +++ b/dsp_permissions_scripts/utils/helpers.py @@ -1,4 +1,4 @@ -from dsp_permissions_scripts.models.groups import BuiltinGroup +from dsp_permissions_scripts.models import builtin_groups def dereference_prefix( @@ -17,7 +17,14 @@ def _get_sort_pos_of_custom_group(group: str) -> int: def sort_groups(groups_original: list[str]) -> list[str]: """Sorts groups, first according to their power (most powerful first), then alphabetically.""" - sort_key = list(reversed([x.value for x in BuiltinGroup])) + sort_key = [ + builtin_groups.SYSTEM_ADMIN, + builtin_groups.CREATOR, + builtin_groups.PROJECT_ADMIN, + builtin_groups.PROJECT_MEMBER, + builtin_groups.KNOWN_USER, + builtin_groups.UNKNOWN_USER + ] groups = groups_original.copy() groups.sort(key=lambda x: sort_key.index(x) if x in sort_key else _get_sort_pos_of_custom_group(x)) return groups diff --git a/dsp_permissions_scripts/utils/oap.py b/dsp_permissions_scripts/utils/oap.py index 070d6c9a..fe4a6822 100644 --- a/dsp_permissions_scripts/utils/oap.py +++ b/dsp_permissions_scripts/utils/oap.py @@ -1,4 +1,5 @@ import json +import warnings from typing import Any from urllib.parse import quote_plus @@ -179,23 +180,51 @@ def _update_permissions_for_resource_and_values( ) +def _write_failed_res_iris_to_file( + failed_res_iris: list[str], + shortcode: str, + host: str, + filename: str, +) -> None: + with open(filename, "w", encoding="utf-8") as f: + f.write(f"Failed to update the OAPs of the following resources in project {shortcode} on host {host}:\n") + f.write("\n".join(failed_res_iris)) + + def apply_updated_oaps_on_server( resource_oaps: list[Oap], host: str, token: str, + shortcode: str, ) -> None: """Applies object access permissions on a DSP server.""" logger.info("******* Applying updated object access permissions on server *******") print(f"{get_timestamp()}: ******* Applying updated object access permissions on server *******") + failed_res_iris: list[str] = [] for index, resource_oap in enumerate(resource_oaps): msg = f"Updating permissions of resource {index + 1}/{len(resource_oaps)}: {resource_oap.object_iri}..." - logger.info("=====") - logger.info(msg) + logger.info(f"=====\n{msg}") print(f"{get_timestamp()}: {msg}") - _update_permissions_for_resource_and_values( - resource_iri=resource_oap.object_iri, - scope=resource_oap.scope, + try: + _update_permissions_for_resource_and_values( + resource_iri=resource_oap.object_iri, + scope=resource_oap.scope, + host=host, + token=token, + ) + except Exception: # pylint: disable=broad-exception-caught + logger.error(f"ERROR while updating permissions of resource {resource_oap.object_iri}", exc_info=True) + warnings.warn(f"ERROR while updating permissions of resource {resource_oap.object_iri}") + failed_res_iris.append(resource_oap.object_iri) + logger.info(f"Updated permissions of resource {resource_oap.object_iri} and its values.") + + if failed_res_iris: + filename = "FAILED_RESOURCES.txt" + _write_failed_res_iris_to_file( + failed_res_iris=failed_res_iris, + shortcode=shortcode, host=host, - token=token, + filename=filename, ) - logger.info(f"Updated permissions of resource {resource_oap.object_iri} and its values.") + logger.error(f"ERROR: {len(failed_res_iris)} resources could not be updated. They were written to {filename}.") + warnings.warn(f"ERROR: {len(failed_res_iris)} resources could not be updated. They were written to {filename}.") diff --git a/tests/test_doap_serialization.py b/tests/test_doap_serialization.py new file mode 100644 index 00000000..bab79fe0 --- /dev/null +++ b/tests/test_doap_serialization.py @@ -0,0 +1,65 @@ +import shutil +import unittest +from pathlib import Path + +from dsp_permissions_scripts.models import builtin_groups +from dsp_permissions_scripts.models.doap import Doap, DoapTarget +from dsp_permissions_scripts.models.scope import PermissionScope +from dsp_permissions_scripts.utils.doap_serialize import ( + deserialize_doaps_of_project, + serialize_doaps_of_project, +) +from tests.test_scope_serialization import compare_scopes + + +class TestDoapSerialization(unittest.TestCase): + shortcode = "1234" + + def tearDown(self) -> None: + testdata_dir = Path(f"project_data/{self.shortcode}") + if testdata_dir.is_dir(): + shutil.rmtree(testdata_dir) + + def test_doap_serialization(self): + doap1 = Doap( + target=DoapTarget( + project="http://rdfh.ch/projects/MsOaiQkcQ7-QPxsYBKckfQ", + group=builtin_groups.PROJECT_ADMIN, + ), + scope=PermissionScope.create( + CR=[builtin_groups.PROJECT_ADMIN], + V=[builtin_groups.PROJECT_MEMBER], + ), + doap_iri="http://rdfh.ch/doap-1", + ) + doap2 = Doap( + target=DoapTarget( + project="http://rdfh.ch/projects/MsOaiQkcQ7-QPxsYBKckfQ", + group=builtin_groups.PROJECT_MEMBER, + ), + scope=PermissionScope.create( + D=[builtin_groups.SYSTEM_ADMIN], + M=[builtin_groups.KNOWN_USER], + ), + doap_iri="http://rdfh.ch/doap-2", + ) + serialize_doaps_of_project( + project_doaps=[doap1, doap2], + shortcode=self.shortcode, + mode="original", + ) + deserialized_doaps = deserialize_doaps_of_project( + shortcode=self.shortcode, + mode="original", + ) + self._compare_doaps(deserialized_doaps[0], doap1) + self._compare_doaps(deserialized_doaps[1], doap2) + + def _compare_doaps(self, doap1: Doap, doap2: Doap) -> None: + self.assertEqual(doap1.target, doap2.target) + compare_scopes(doap1.scope, doap2.scope) + self.assertEqual(doap1.doap_iri, doap2.doap_iri) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/test_helpers.py b/tests/test_helpers.py index e741c6d7..4be5c18f 100644 --- a/tests/test_helpers.py +++ b/tests/test_helpers.py @@ -1,6 +1,6 @@ import unittest -from dsp_permissions_scripts.models.groups import BuiltinGroup +from dsp_permissions_scripts.models import builtin_groups from dsp_permissions_scripts.utils.helpers import sort_groups @@ -9,22 +9,22 @@ class TestHelpers(unittest.TestCase): def test_sort_groups(self) -> None: groups_original = [ "http://www.knora.org/ontology/knora-admin#C_CustomGroup", - BuiltinGroup.UNKNOWN_USER.value, - BuiltinGroup.PROJECT_ADMIN.value, - BuiltinGroup.PROJECT_MEMBER.value, - BuiltinGroup.CREATOR.value, + builtin_groups.UNKNOWN_USER, + builtin_groups.PROJECT_ADMIN, + builtin_groups.PROJECT_MEMBER, + builtin_groups.CREATOR, "http://www.knora.org/ontology/knora-admin#A_CustomGroup", "http://www.knora.org/ontology/knora-admin#B_CustomGroup", - BuiltinGroup.KNOWN_USER.value, - BuiltinGroup.SYSTEM_ADMIN.value, + builtin_groups.KNOWN_USER, + builtin_groups.SYSTEM_ADMIN, ] groups_expected = [ - BuiltinGroup.SYSTEM_ADMIN.value, - BuiltinGroup.CREATOR.value, - BuiltinGroup.PROJECT_ADMIN.value, - BuiltinGroup.PROJECT_MEMBER.value, - BuiltinGroup.KNOWN_USER.value, - BuiltinGroup.UNKNOWN_USER.value, + builtin_groups.SYSTEM_ADMIN, + builtin_groups.CREATOR, + builtin_groups.PROJECT_ADMIN, + builtin_groups.PROJECT_MEMBER, + builtin_groups.KNOWN_USER, + builtin_groups.UNKNOWN_USER, "http://www.knora.org/ontology/knora-admin#A_CustomGroup", "http://www.knora.org/ontology/knora-admin#B_CustomGroup", "http://www.knora.org/ontology/knora-admin#C_CustomGroup", diff --git a/tests/test_oap_serialization.py b/tests/test_oap_serialization.py new file mode 100644 index 00000000..fa87b019 --- /dev/null +++ b/tests/test_oap_serialization.py @@ -0,0 +1,57 @@ +import shutil +import unittest +from pathlib import Path + +from dsp_permissions_scripts.models import builtin_groups +from dsp_permissions_scripts.models.oap import Oap +from dsp_permissions_scripts.models.scope import PermissionScope +from dsp_permissions_scripts.utils.oap_serialize import ( + deserialize_resource_oaps, + serialize_resource_oaps, +) +from tests.test_scope_serialization import compare_scopes + + +class TestOapSerialization(unittest.TestCase): + shortcode = "1234" + + def tearDown(self) -> None: + testdata_dir = Path(f"project_data/{self.shortcode}") + if testdata_dir.is_dir(): + shutil.rmtree(testdata_dir) + + def test_oap_serialization(self): + oap1 = Oap( + scope=PermissionScope.create( + CR=[builtin_groups.PROJECT_ADMIN], + V=[builtin_groups.PROJECT_MEMBER], + ), + object_iri=f"http://rdfh.ch/{self.shortcode}/resource-1", + ) + oap2 = Oap( + scope=PermissionScope.create( + D=[builtin_groups.SYSTEM_ADMIN], + M=[builtin_groups.KNOWN_USER], + ), + object_iri=f"http://rdfh.ch/{self.shortcode}/resource-2", + ) + serialize_resource_oaps( + resource_oaps=[oap1, oap2], + shortcode=self.shortcode, + mode="original", + ) + deserialized_oaps = deserialize_resource_oaps( + shortcode=self.shortcode, + mode="original", + ) + deserialized_oaps.sort(key=lambda oap: oap.object_iri) + self._compare_oaps(deserialized_oaps[0], oap1) + self._compare_oaps(deserialized_oaps[1], oap2) + + def _compare_oaps(self, oap1: Oap, oap2: Oap) -> None: + compare_scopes(oap1.scope, oap2.scope) + self.assertEqual(oap1.object_iri, oap2.object_iri) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/test_scope.py b/tests/test_scope.py index 69b79602..117f2e70 100644 --- a/tests/test_scope.py +++ b/tests/test_scope.py @@ -1,6 +1,6 @@ import unittest -from dsp_permissions_scripts.models.groups import BuiltinGroup +from dsp_permissions_scripts.models import builtin_groups from dsp_permissions_scripts.models.scope import PermissionScope from tests.test_scope_serialization import compare_scopes @@ -10,73 +10,73 @@ class TestScope(unittest.TestCase): def test_scope_validation_on_creation(self) -> None: with self.assertRaisesRegex(ValueError, "must not occur in more than one field"): PermissionScope.create( - CR={BuiltinGroup.PROJECT_ADMIN}, - D={BuiltinGroup.PROJECT_ADMIN}, - V={BuiltinGroup.UNKNOWN_USER, BuiltinGroup.KNOWN_USER}, + CR={builtin_groups.PROJECT_ADMIN}, + D={builtin_groups.PROJECT_ADMIN}, + V={builtin_groups.UNKNOWN_USER, builtin_groups.KNOWN_USER}, ) def test_scope_validation_on_add_to_same_permission(self) -> None: scope = PermissionScope.create( - CR={BuiltinGroup.PROJECT_ADMIN}, - V={BuiltinGroup.UNKNOWN_USER, BuiltinGroup.KNOWN_USER}, + CR={builtin_groups.PROJECT_ADMIN}, + V={builtin_groups.UNKNOWN_USER, builtin_groups.KNOWN_USER}, ) with self.assertRaisesRegex( ValueError, "Group 'http://www.knora.org/ontology/knora-admin#ProjectAdmin' is already in permission 'CR'" ): - _ = scope.add("CR", BuiltinGroup.PROJECT_ADMIN) + _ = scope.add("CR", builtin_groups.PROJECT_ADMIN) def test_scope_validation_on_add_to_different_permission(self) -> None: scope = PermissionScope.create( - CR={BuiltinGroup.PROJECT_ADMIN}, - V={BuiltinGroup.UNKNOWN_USER, BuiltinGroup.KNOWN_USER}, + CR={builtin_groups.PROJECT_ADMIN}, + V={builtin_groups.UNKNOWN_USER, builtin_groups.KNOWN_USER}, ) with self.assertRaisesRegex(ValueError, "must not occur in more than one field"): - _ = scope.add("RV", BuiltinGroup.PROJECT_ADMIN) + _ = scope.add("RV", builtin_groups.PROJECT_ADMIN) def test_add_to_scope(self) -> None: scope = PermissionScope.create( - D={BuiltinGroup.SYSTEM_ADMIN}, - M={BuiltinGroup.PROJECT_MEMBER, BuiltinGroup.KNOWN_USER}, + D={builtin_groups.SYSTEM_ADMIN}, + M={builtin_groups.PROJECT_MEMBER, builtin_groups.KNOWN_USER}, ) - scope_added = scope.add("CR", BuiltinGroup.PROJECT_ADMIN) + scope_added = scope.add("CR", builtin_groups.PROJECT_ADMIN) compare_scopes( scope1=scope_added, scope2=PermissionScope.create( - CR={BuiltinGroup.PROJECT_ADMIN}, - D={BuiltinGroup.SYSTEM_ADMIN}, - M={BuiltinGroup.PROJECT_MEMBER, BuiltinGroup.KNOWN_USER}, + CR={builtin_groups.PROJECT_ADMIN}, + D={builtin_groups.SYSTEM_ADMIN}, + M={builtin_groups.PROJECT_MEMBER, builtin_groups.KNOWN_USER}, ), ) def test_remove_inexisting_group(self) -> None: scope = PermissionScope.create( - D={BuiltinGroup.SYSTEM_ADMIN}, - M={BuiltinGroup.PROJECT_MEMBER, BuiltinGroup.KNOWN_USER}, + D={builtin_groups.SYSTEM_ADMIN}, + M={builtin_groups.PROJECT_MEMBER, builtin_groups.KNOWN_USER}, ) with self.assertRaisesRegex(ValueError, "is not in permission 'D'"): - _ = scope.remove("D", BuiltinGroup.UNKNOWN_USER) + _ = scope.remove("D", builtin_groups.UNKNOWN_USER) def test_remove_from_empty_perm(self) -> None: scope = PermissionScope.create( - D={BuiltinGroup.PROJECT_ADMIN}, - V={BuiltinGroup.PROJECT_MEMBER, BuiltinGroup.UNKNOWN_USER}, + D={builtin_groups.PROJECT_ADMIN}, + V={builtin_groups.PROJECT_MEMBER, builtin_groups.UNKNOWN_USER}, ) with self.assertRaisesRegex(ValueError, "is not in permission 'CR'"): - _ = scope.remove("CR", BuiltinGroup.PROJECT_ADMIN) + _ = scope.remove("CR", builtin_groups.PROJECT_ADMIN) def test_remove_from_scope(self) -> None: scope = PermissionScope.create( - CR={BuiltinGroup.PROJECT_ADMIN}, - D={BuiltinGroup.SYSTEM_ADMIN}, - M={BuiltinGroup.PROJECT_MEMBER, BuiltinGroup.KNOWN_USER}, + CR={builtin_groups.PROJECT_ADMIN}, + D={builtin_groups.SYSTEM_ADMIN}, + M={builtin_groups.PROJECT_MEMBER, builtin_groups.KNOWN_USER}, ) - scope_removed = scope.remove("CR", BuiltinGroup.PROJECT_ADMIN) + scope_removed = scope.remove("CR", builtin_groups.PROJECT_ADMIN) compare_scopes( scope1=scope_removed, scope2=PermissionScope.create( - D={BuiltinGroup.SYSTEM_ADMIN}, - M={BuiltinGroup.PROJECT_MEMBER, BuiltinGroup.KNOWN_USER}, + D={builtin_groups.SYSTEM_ADMIN}, + M={builtin_groups.PROJECT_MEMBER, builtin_groups.KNOWN_USER}, ), ) diff --git a/tests/test_scope_serialization.py b/tests/test_scope_serialization.py index 866d3bf5..7fb1965e 100644 --- a/tests/test_scope_serialization.py +++ b/tests/test_scope_serialization.py @@ -1,8 +1,7 @@ -import json import unittest from typing import Any -from dsp_permissions_scripts.models.groups import BuiltinGroup +from dsp_permissions_scripts.models import builtin_groups from dsp_permissions_scripts.models.scope import PermissionScope from dsp_permissions_scripts.utils.scope_serialization import ( create_admin_route_object_from_scope, @@ -17,9 +16,9 @@ def compare_scopes( scope2: PermissionScope, msg: str | None = None, ) -> None: - scope1_dict = json.loads(scope1.model_dump_json()) + scope1_dict = scope1.model_dump(mode="json") scope1_dict = {k: sorted(v) for k, v in scope1_dict.items()} - scope2_dict = json.loads(scope2.model_dump_json()) + scope2_dict = scope2.model_dump(mode="json") scope2_dict = {k: sorted(v) for k, v in scope2_dict.items()} unittest.TestCase().assertDictEqual(scope1_dict, scope2_dict, msg=msg) @@ -55,22 +54,22 @@ class TestScopeSerialization(unittest.TestCase): ] scopes = [ PermissionScope.create( - CR=[BuiltinGroup.SYSTEM_ADMIN], + CR=[builtin_groups.SYSTEM_ADMIN], V=["http://www.knora.org/ontology/knora-admin#CustomGroup"], ), PermissionScope.create( - D={BuiltinGroup.PROJECT_ADMIN}, - RV={BuiltinGroup.PROJECT_MEMBER}, + D={builtin_groups.PROJECT_ADMIN}, + RV={builtin_groups.PROJECT_MEMBER}, ), PermissionScope.create( - M={BuiltinGroup.PROJECT_ADMIN}, - V={BuiltinGroup.CREATOR, BuiltinGroup.KNOWN_USER}, - RV={BuiltinGroup.UNKNOWN_USER}, + M={builtin_groups.PROJECT_ADMIN}, + V={builtin_groups.CREATOR, builtin_groups.KNOWN_USER}, + RV={builtin_groups.UNKNOWN_USER}, ), PermissionScope.create( - CR={BuiltinGroup.SYSTEM_ADMIN, BuiltinGroup.PROJECT_ADMIN}, - D={BuiltinGroup.CREATOR}, - RV={BuiltinGroup.UNKNOWN_USER}, + CR={builtin_groups.SYSTEM_ADMIN, builtin_groups.PROJECT_ADMIN}, + D={builtin_groups.CREATOR}, + RV={builtin_groups.UNKNOWN_USER}, ), ]