From 89f99ed18c738adb043e35bf540b2a30ffd3892f Mon Sep 17 00:00:00 2001 From: Johannes Nussbaum <39048939+jnussbaum@users.noreply.github.com> Date: Fri, 29 Sep 2023 11:10:17 +0200 Subject: [PATCH] feat: new class to handle scopes (DEV-2726) (#14) --- dsp_permissions_scripts/main.py | 2 +- dsp_permissions_scripts/models/groups.py | 1 + dsp_permissions_scripts/models/permission.py | 17 +--- dsp_permissions_scripts/models/scope.py | 45 +++------ dsp_permissions_scripts/utils/permissions.py | 78 +++++---------- dsp_permissions_scripts/utils/project.py | 12 +-- .../utils/scope_serialization.py | 61 ++++++++++++ tests/test_scope_serialization.py | 97 +++++++++++++++++++ 8 files changed, 207 insertions(+), 106 deletions(-) create mode 100644 dsp_permissions_scripts/utils/scope_serialization.py create mode 100644 tests/test_scope_serialization.py diff --git a/dsp_permissions_scripts/main.py b/dsp_permissions_scripts/main.py index 5aa8af40..c76946c0 100644 --- a/dsp_permissions_scripts/main.py +++ b/dsp_permissions_scripts/main.py @@ -28,7 +28,7 @@ def main() -> None: host=host, ) token = login(host) - + new_scope = StandardScope().PUBLIC groups = [BuiltinGroup.PROJECT_ADMIN, BuiltinGroup.PROJECT_MEMBER] diff --git a/dsp_permissions_scripts/models/groups.py b/dsp_permissions_scripts/models/groups.py index c24790b8..38de52a2 100644 --- a/dsp_permissions_scripts/models/groups.py +++ b/dsp_permissions_scripts/models/groups.py @@ -11,4 +11,5 @@ class BuiltinGroup(Enum): CREATOR = "http://www.knora.org/ontology/knora-admin#Creator" PROJECT_MEMBER = "http://www.knora.org/ontology/knora-admin#ProjectMember" PROJECT_ADMIN = "http://www.knora.org/ontology/knora-admin#ProjectAdmin" + SYSTEM_USER = "http://www.knora.org/ontology/knora-admin#SystemUser" SYSTEM_ADMIN = "http://www.knora.org/ontology/knora-admin#SystemAdmin" diff --git a/dsp_permissions_scripts/models/permission.py b/dsp_permissions_scripts/models/permission.py index b47021c4..a98a6947 100644 --- a/dsp_permissions_scripts/models/permission.py +++ b/dsp_permissions_scripts/models/permission.py @@ -1,20 +1,9 @@ from enum import Enum from typing import Self -from pydantic import BaseModel, field_validator, model_validator +from pydantic import BaseModel, model_validator -from dsp_permissions_scripts.models.groups import BuiltinGroup - - -class PermissionScopeElement(BaseModel): - info: str | BuiltinGroup - name: str - - @field_validator("name") - @classmethod - def name_must_represent_permission(cls, v: str) -> str: - assert v in {"RV", "V", "M", "D", "CR"} - return v +from dsp_permissions_scripts.models.scope import PermissionScope class DoapTarget(BaseModel): @@ -43,7 +32,7 @@ class Doap(BaseModel): """ target: DoapTarget - scope: list[PermissionScopeElement] + scope: PermissionScope iri: str diff --git a/dsp_permissions_scripts/models/scope.py b/dsp_permissions_scripts/models/scope.py index 5dcdd60b..d21ea8a2 100644 --- a/dsp_permissions_scripts/models/scope.py +++ b/dsp_permissions_scripts/models/scope.py @@ -1,43 +1,30 @@ -from typing import Sequence +from pydantic import BaseModel from dsp_permissions_scripts.models.groups import BuiltinGroup -from dsp_permissions_scripts.models.permission import PermissionScopeElement + + +class PermissionScope(BaseModel): + CR: list[str | BuiltinGroup] | None = None + D: list[str | BuiltinGroup] | None = None + M: list[str | BuiltinGroup] | None = None + V: list[str | BuiltinGroup] | None = None + RV: list[str | BuiltinGroup] | None = None class StandardScope: """ A scope is an object encoding the information: - "Which user group gets which permissions, if a certain DOAP gets applied?" + "Which user group gets which permissions on a resource/value?" This class offers some predefined scopes. If your preferred scope is not available, - please add a new class variable and implement it in the __init__ method. + please add a new class attribute and implement it in the __init__ method. """ - PUBLIC: list[PermissionScopeElement] + PUBLIC: PermissionScope def __init__(self): - self.PUBLIC = self._make_scope( - view=[BuiltinGroup.UNKNOWN_USER, BuiltinGroup.KNOWN_USER], - change_rights=[BuiltinGroup.PROJECT_ADMIN], - delete=[BuiltinGroup.CREATOR, BuiltinGroup.PROJECT_MEMBER], + self.PUBLIC = PermissionScope( + CR=[BuiltinGroup.PROJECT_ADMIN], + D=[BuiltinGroup.CREATOR, BuiltinGroup.PROJECT_MEMBER], + V=[BuiltinGroup.UNKNOWN_USER, BuiltinGroup.KNOWN_USER], ) - - def _make_scope( - self, - restricted_view: Sequence[str | BuiltinGroup] = (), - view: Sequence[str | BuiltinGroup] = (), - modify: Sequence[str | BuiltinGroup] = (), - delete: Sequence[str | BuiltinGroup] = (), - change_rights: Sequence[str | BuiltinGroup] = (), - ) -> list[PermissionScopeElement]: - """ - Create scopes by providing group IRIs for different permission levels. - Every parameter represents the groups that get the corresponding permission. - """ - perm_codes_to_groups = {"RV": restricted_view, "V": view, "M": modify, "D": delete, "CR": change_rights} - res = [] - for perm_code, groups in perm_codes_to_groups.items(): - res.extend( - [PermissionScopeElement(info=x if isinstance(x, str) else x.value, name=perm_code) for x in groups] - ) - return res diff --git a/dsp_permissions_scripts/utils/permissions.py b/dsp_permissions_scripts/utils/permissions.py index 1547e455..07d46117 100644 --- a/dsp_permissions_scripts/utils/permissions.py +++ b/dsp_permissions_scripts/utils/permissions.py @@ -9,13 +9,16 @@ Doap, DoapTarget, DoapTargetType, - PermissionScopeElement, + PermissionScope, ) from dsp_permissions_scripts.models.value import ValueUpdate from dsp_permissions_scripts.utils.authentication import get_protocol from dsp_permissions_scripts.utils.project import get_project_iri_by_shortcode - -KB_DOAP = "http://www.knora.org/ontology/knora-admin#DefaultObjectAccessPermission" +from dsp_permissions_scripts.utils.scope_serialization import ( + create_admin_route_object_from_scope, + create_scope_from_admin_route_object, + create_string_from_scope, +) def get_doaps_of_project( @@ -46,7 +49,7 @@ def get_doaps_of_project( def set_doaps_of_groups( - scope: list[PermissionScopeElement], + scope: PermissionScope, groups: Sequence[str | BuiltinGroup], host: str, shortcode: str, @@ -85,48 +88,11 @@ def set_doaps_of_groups( print("All DOAPs have been updated.") -# TODO: maybe these methods should live on the PermissionScopeElement model? - -def __marshal_scope(scope_element: PermissionScopeElement) -> dict[str, Any]: - """ - Serializes a permission scope element to a dict - in the shape that it can be used for JSON requests to /admin/permissions routes. - """ - return { - "additionalInformation": scope_element.info, - "name": scope_element.name, - "permissionCode": None, - } - - -def __marshal_scope_as_permission_string(scope: list[PermissionScopeElement]) -> str: - """ - Serializes a permission scope to a permissions string as used by /v2 routes. - """ - lookup: dict[str, list[str]] = {} - for s in scope: - p = lookup.get(s.name, []) - p.append(str(s.info).replace("http://www.knora.org/ontology/knora-admin#", "knora-admin:")) - lookup[s.name] = p - strs = [f"{k} {','.join(l)}" for k, l in lookup.items()] - return "|".join(strs) - - -def __get_scope_element(scope: dict[str, Any]) -> PermissionScopeElement: - """ - turns permissions JSON as returned by /admin/permissions routes into a PermissionScopeElement object. - """ - return PermissionScopeElement( - info=scope["additionalInformation"], - name=scope["name"], - ) - - def __get_doap(permission: dict[str, Any]) -> Doap: """ Deserializes a DOAP from JSON as returned by /admin/permissions/doap/{project_iri} """ - scope = [__get_scope_element(s) for s in permission["hasPermissions"]] + scope = create_scope_from_admin_route_object(permission["hasPermissions"]) doap = Doap( target=DoapTarget( project=permission["forProject"], @@ -195,7 +161,7 @@ def get_doaps_of_groups( def filter_doaps_by_target( - doaps: list[Doap], + doaps: list[Doap], target: DoapTargetType, ) -> list[Doap]: """ @@ -203,13 +169,13 @@ def filter_doaps_by_target( In case of "all", return all DOAPs. """ match target: - case DoapTargetType.ALL: + case DoapTargetType.ALL: filtered_doaps = doaps - case DoapTargetType.GROUP: + case DoapTargetType.GROUP: filtered_doaps = [d for d in doaps if d.target.group] - case DoapTargetType.PROPERTY: + case DoapTargetType.PROPERTY: filtered_doaps = [d for d in doaps if d.target.property] - case DoapTargetType.RESOURCE_CLASS: + case DoapTargetType.RESOURCE_CLASS: filtered_doaps = [d for d in doaps if d.target.resource_class] return filtered_doaps @@ -225,7 +191,7 @@ def print_doaps_of_project( heading += f" which are related to a {target}" print(f"\n{heading}\n{'=' * len(heading)}\n") for d in doaps: - print(d.model_dump_json(indent=2)) + print(d.model_dump_json(indent=2, exclude_none=True)) print() @@ -249,7 +215,7 @@ def get_permissions_for_project( def update_doap_scope( permission_iri: str, - scope: list[PermissionScopeElement], + scope: PermissionScope, host: str, token: str, ) -> Doap: @@ -260,7 +226,7 @@ def update_doap_scope( headers = {"Authorization": f"Bearer {token}"} protocol = get_protocol(host) url = f"{protocol}://{host}/admin/permissions/{iri}/hasPermissions" - payload = {"hasPermissions": [__marshal_scope(s) for s in scope]} + payload = {"hasPermissions": create_admin_route_object_from_scope(scope)} response = requests.put(url, headers=headers, json=payload, timeout=5) assert response.status_code == 200 new_doap = __get_doap(response.json()["default_object_access_permission"]) @@ -269,7 +235,7 @@ def update_doap_scope( def update_permissions_for_resources_and_values( resource_iris: list[str], - scope: list[PermissionScopeElement], + scope: PermissionScope, host: str, token: str, ) -> None: @@ -282,7 +248,7 @@ def update_permissions_for_resources_and_values( def update_permissions_for_resource_and_values( resource_iri: str, - scope: list[PermissionScopeElement], + scope: PermissionScope, host: str, token: str, ) -> None: @@ -306,7 +272,7 @@ def update_permissions_for_resource( lmd: str | None, type_: str, context: dict[str, str], - scope: list[PermissionScopeElement], + scope: PermissionScope, host: str, token: str, ) -> None: @@ -316,7 +282,7 @@ def update_permissions_for_resource( payload = { "@id": resource_iri, "@type": type_, - "knora-api:hasPermissions": __marshal_scope_as_permission_string(scope), + "knora-api:hasPermissions": create_string_from_scope(scope), "@context": context, } if lmd: @@ -334,7 +300,7 @@ def update_permissions_for_value( value: ValueUpdate, resource_type: str, context: dict[str, str], - scope: list[PermissionScopeElement], + scope: PermissionScope, host: str, token: str, ) -> None: @@ -348,7 +314,7 @@ def update_permissions_for_value( value.property: { "@id": value.value_iri, "@type": value.value_type, - "knora-api:hasPermissions": __marshal_scope_as_permission_string(scope), + "knora-api:hasPermissions": create_string_from_scope(scope), }, "@context": context, } diff --git a/dsp_permissions_scripts/utils/project.py b/dsp_permissions_scripts/utils/project.py index b764f89a..e74b742a 100644 --- a/dsp_permissions_scripts/utils/project.py +++ b/dsp_permissions_scripts/utils/project.py @@ -18,7 +18,7 @@ def get_project_iri_by_shortcode(shortcode: str, host: str) -> str: def get_all_resource_iris_of_project( - project_iri: str, + project_iri: str, host: str, token: str, ) -> list[str]: @@ -40,7 +40,7 @@ def get_all_resource_iris_of_project( def __get_all_resource_class_iris_of_project( - project_iri: str, + project_iri: str, host: str, token: str, ) -> list[str]: @@ -61,7 +61,7 @@ def __get_all_resource_class_iris_of_project( def __get_onto_iris_of_project( - project_iri: str, + project_iri: str, host: str, token: str, ) -> list[str]: @@ -98,7 +98,7 @@ def __dereference_prefix(identifier: str, context: dict[str, str]) -> str: def __get_all_resource_iris_of_resclass( - host: str, + host: str, resclass: str, project_iri: str, token: str, @@ -130,7 +130,7 @@ def __get_next_page( ) -> tuple[bool, list[str]]: """ Get the resource IRIs of a resource class, one page at a time. - DSP-API returns results page-wise: + DSP-API returns results page-wise: a list of 25 resources if there are 25 resources or more, a list of less than 25 resources if there are less than 25 remaining, 1 resource (not packed in a list) if there is only 1 remaining, @@ -146,7 +146,7 @@ def __get_next_page( return True, [r["@id"] for r in result["@graph"]] elif "@id" in result: # result contains only 1 resource: return it, then stop (there will be no more resources) - return False, [result["@id"], ] + return False, [result["@id"]] else: # there are no more resources return False, [] diff --git a/dsp_permissions_scripts/utils/scope_serialization.py b/dsp_permissions_scripts/utils/scope_serialization.py new file mode 100644 index 00000000..8eca0588 --- /dev/null +++ b/dsp_permissions_scripts/utils/scope_serialization.py @@ -0,0 +1,61 @@ +from typing import Any + +from dsp_permissions_scripts.models.groups import BuiltinGroup +from dsp_permissions_scripts.models.scope import PermissionScope + + +def create_string_from_scope(perm_scope: PermissionScope) -> str: + """Serializes a permission scope to a permissions string as used by /v2 routes.""" + as_dict = {} + for perm_letter, groups in perm_scope.model_dump().items(): + if groups: + groups_as_str = [g.value if isinstance(g, BuiltinGroup) else g for g in groups] + as_dict[perm_letter] = [ + g.replace("http://www.knora.org/ontology/knora-admin#", "knora-admin:") for g in groups_as_str + ] + strs = [f"{k} {','.join(l)}" for k, l in as_dict.items()] + return "|".join(strs) + + +def create_scope_from_string(permission_string: str) -> PermissionScope: + kwargs: dict[str, list[str]] = {} + scopes = permission_string.split("|") + for scope in scopes: + perm_letter, groups_as_str = scope.split(" ") + groups = groups_as_str.split(",") + groups = [g.replace("knora-admin:", "http://www.knora.org/ontology/knora-admin#") for g in groups] + kwargs[perm_letter] = groups + return PermissionScope(**kwargs) # type: ignore[arg-type] + + +def create_scope_from_admin_route_object(admin_route_object: list[dict[str, Any]]) -> PermissionScope: + kwargs: dict[str, list[str]] = {} + for obj in admin_route_object: + attr_name: str = obj["name"] + group: str = obj["additionalInformation"] + group = group.replace("knora-admin:", "http://www.knora.org/ontology/knora-admin#") + if attr_name in kwargs: + kwargs[attr_name].append(group) + else: + kwargs[attr_name] = [group] + return PermissionScope(**kwargs) # type: ignore[arg-type] + + +def create_admin_route_object_from_scope(perm_scope: PermissionScope) -> list[dict[str, str | None]]: + """Serializes a permission scope to a shape that can be used for requests to /admin/permissions routes.""" + scope_elements: list[dict[str, str | None]] = [] + for perm_letter, groups in perm_scope.model_dump().items(): + if groups: + groups_as_str = [g.value if isinstance(g, BuiltinGroup) else g for g in groups] + groups_as_str = [ + g.replace("http://www.knora.org/ontology/knora-admin#", "knora-admin:") for g in groups_as_str + ] + for group in groups_as_str: + scope_elements.append( + { + "additionalInformation": group, + "name": perm_letter, + "permissionCode": None, + } + ) + return scope_elements diff --git a/tests/test_scope_serialization.py b/tests/test_scope_serialization.py new file mode 100644 index 00000000..f9e66ffb --- /dev/null +++ b/tests/test_scope_serialization.py @@ -0,0 +1,97 @@ +import unittest + +from dsp_permissions_scripts.models.groups import BuiltinGroup +from dsp_permissions_scripts.models.scope import PermissionScope +from dsp_permissions_scripts.utils.scope_serialization import ( + create_admin_route_object_from_scope, + create_scope_from_admin_route_object, + create_scope_from_string, + create_string_from_scope, +) + + +class TestScopeSerialization(unittest.TestCase): + perm_strings = [ + "CR knora-admin:SystemUser|V knora-admin:CustomGroup", + "D knora-admin:ProjectAdmin|RV knora-admin:ProjectMember", + "M knora-admin:ProjectAdmin|V knora-admin:Creator,knora-admin:KnownUser|RV knora-admin:UnknownUser", + "CR knora-admin:SystemAdmin,knora-admin:ProjectAdmin|D knora-admin:Creator|RV knora-admin:UnknownUser", + ] + admin_route_objects = [ + [ + {"name": "CR", "additionalInformation": "knora-admin:SystemUser", "permissionCode": None}, + {"name": "V", "additionalInformation": "knora-admin:CustomGroup", "permissionCode": None}, + ], + [ + {"name": "D", "additionalInformation": "knora-admin:ProjectAdmin", "permissionCode": None}, + {"name": "RV", "additionalInformation": "knora-admin:ProjectMember", "permissionCode": None}, + ], + [ + {"name": "M", "additionalInformation": "knora-admin:ProjectAdmin", "permissionCode": None}, + {"name": "V", "additionalInformation": "knora-admin:Creator", "permissionCode": None}, + {"name": "V", "additionalInformation": "knora-admin:KnownUser", "permissionCode": None}, + {"name": "RV", "additionalInformation": "knora-admin:UnknownUser", "permissionCode": None}, + ], + [ + {"name": "CR", "additionalInformation": "knora-admin:SystemAdmin", "permissionCode": None}, + {"name": "CR", "additionalInformation": "knora-admin:ProjectAdmin", "permissionCode": None}, + {"name": "D", "additionalInformation": "knora-admin:Creator", "permissionCode": None}, + {"name": "RV", "additionalInformation": "knora-admin:UnknownUser", "permissionCode": None}, + ], + ] + scopes = [ + PermissionScope( + CR=[BuiltinGroup.SYSTEM_USER], + V=["http://www.knora.org/ontology/knora-admin#CustomGroup"] + ), + PermissionScope( + D=[BuiltinGroup.PROJECT_ADMIN], + RV=[BuiltinGroup.PROJECT_MEMBER] + ), + PermissionScope( + M=[BuiltinGroup.PROJECT_ADMIN], + V=[BuiltinGroup.CREATOR, BuiltinGroup.KNOWN_USER], + RV=[BuiltinGroup.UNKNOWN_USER] + ), + PermissionScope( + CR=[BuiltinGroup.SYSTEM_ADMIN, BuiltinGroup.PROJECT_ADMIN], + D=[BuiltinGroup.CREATOR], + RV=[BuiltinGroup.UNKNOWN_USER] + ), + ] + + def test_create_scope_from_string(self) -> None: + for perm_string, scope in zip(self.perm_strings, self.scopes): + self.assertEqual( + create_scope_from_string(perm_string).model_dump_json(), + scope.model_dump_json(), + msg=f"Failed with permission string '{perm_string}'", + ) + + def test_create_scope_from_admin_route_object(self) -> None: + for admin_route_object, scope, index in zip(self.admin_route_objects, self.scopes, range(len(self.scopes))): + self.assertEqual( + create_scope_from_admin_route_object(admin_route_object).model_dump_json(), + scope.model_dump_json(), + msg=f"Failed with admin group object no. {index}", + ) + + def test_create_string_from_scope(self) -> None: + for perm_string, scope in zip(self.perm_strings, self.scopes): + self.assertEqual( + create_string_from_scope(scope), + perm_string, + msg=f"Failed with permission string '{perm_string}'", + ) + + def test_create_admin_route_object_from_scope(self) -> None: + for admin_route_object, scope, index in zip(self.admin_route_objects, self.scopes, range(len(self.scopes))): + self.assertEqual( + create_admin_route_object_from_scope(scope), + admin_route_object, + msg=f"Failed with admin group object no. {index}", + ) + + +if __name__ == "__main__": + unittest.main()