Skip to content

Commit

Permalink
feat: new class to handle scopes (DEV-2726) (#14)
Browse files Browse the repository at this point in the history
  • Loading branch information
jnussbaum authored Sep 29, 2023
1 parent ce783b5 commit 89f99ed
Show file tree
Hide file tree
Showing 8 changed files with 207 additions and 106 deletions.
2 changes: 1 addition & 1 deletion dsp_permissions_scripts/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ def main() -> None:
host=host,
)
token = login(host)

new_scope = StandardScope().PUBLIC
groups = [BuiltinGroup.PROJECT_ADMIN, BuiltinGroup.PROJECT_MEMBER]

Expand Down
1 change: 1 addition & 0 deletions dsp_permissions_scripts/models/groups.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,5 @@ class BuiltinGroup(Enum):
CREATOR = "http://www.knora.org/ontology/knora-admin#Creator"
PROJECT_MEMBER = "http://www.knora.org/ontology/knora-admin#ProjectMember"
PROJECT_ADMIN = "http://www.knora.org/ontology/knora-admin#ProjectAdmin"
SYSTEM_USER = "http://www.knora.org/ontology/knora-admin#SystemUser"
SYSTEM_ADMIN = "http://www.knora.org/ontology/knora-admin#SystemAdmin"
17 changes: 3 additions & 14 deletions dsp_permissions_scripts/models/permission.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,9 @@
from enum import Enum
from typing import Self

from pydantic import BaseModel, field_validator, model_validator
from pydantic import BaseModel, model_validator

from dsp_permissions_scripts.models.groups import BuiltinGroup


class PermissionScopeElement(BaseModel):
info: str | BuiltinGroup
name: str

@field_validator("name")
@classmethod
def name_must_represent_permission(cls, v: str) -> str:
assert v in {"RV", "V", "M", "D", "CR"}
return v
from dsp_permissions_scripts.models.scope import PermissionScope


class DoapTarget(BaseModel):
Expand Down Expand Up @@ -43,7 +32,7 @@ class Doap(BaseModel):
"""

target: DoapTarget
scope: list[PermissionScopeElement]
scope: PermissionScope
iri: str


Expand Down
45 changes: 16 additions & 29 deletions dsp_permissions_scripts/models/scope.py
Original file line number Diff line number Diff line change
@@ -1,43 +1,30 @@
from typing import Sequence
from pydantic import BaseModel

from dsp_permissions_scripts.models.groups import BuiltinGroup
from dsp_permissions_scripts.models.permission import PermissionScopeElement


class PermissionScope(BaseModel):
CR: list[str | BuiltinGroup] | None = None
D: list[str | BuiltinGroup] | None = None
M: list[str | BuiltinGroup] | None = None
V: list[str | BuiltinGroup] | None = None
RV: list[str | BuiltinGroup] | None = None


class StandardScope:
"""
A scope is an object encoding the information:
"Which user group gets which permissions, if a certain DOAP gets applied?"
"Which user group gets which permissions on a resource/value?"
This class offers some predefined scopes.
If your preferred scope is not available,
please add a new class variable and implement it in the __init__ method.
please add a new class attribute and implement it in the __init__ method.
"""

PUBLIC: list[PermissionScopeElement]
PUBLIC: PermissionScope

def __init__(self):
self.PUBLIC = self._make_scope(
view=[BuiltinGroup.UNKNOWN_USER, BuiltinGroup.KNOWN_USER],
change_rights=[BuiltinGroup.PROJECT_ADMIN],
delete=[BuiltinGroup.CREATOR, BuiltinGroup.PROJECT_MEMBER],
self.PUBLIC = PermissionScope(
CR=[BuiltinGroup.PROJECT_ADMIN],
D=[BuiltinGroup.CREATOR, BuiltinGroup.PROJECT_MEMBER],
V=[BuiltinGroup.UNKNOWN_USER, BuiltinGroup.KNOWN_USER],
)

def _make_scope(
self,
restricted_view: Sequence[str | BuiltinGroup] = (),
view: Sequence[str | BuiltinGroup] = (),
modify: Sequence[str | BuiltinGroup] = (),
delete: Sequence[str | BuiltinGroup] = (),
change_rights: Sequence[str | BuiltinGroup] = (),
) -> list[PermissionScopeElement]:
"""
Create scopes by providing group IRIs for different permission levels.
Every parameter represents the groups that get the corresponding permission.
"""
perm_codes_to_groups = {"RV": restricted_view, "V": view, "M": modify, "D": delete, "CR": change_rights}
res = []
for perm_code, groups in perm_codes_to_groups.items():
res.extend(
[PermissionScopeElement(info=x if isinstance(x, str) else x.value, name=perm_code) for x in groups]
)
return res
78 changes: 22 additions & 56 deletions dsp_permissions_scripts/utils/permissions.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,16 @@
Doap,
DoapTarget,
DoapTargetType,
PermissionScopeElement,
PermissionScope,
)
from dsp_permissions_scripts.models.value import ValueUpdate
from dsp_permissions_scripts.utils.authentication import get_protocol
from dsp_permissions_scripts.utils.project import get_project_iri_by_shortcode

KB_DOAP = "http://www.knora.org/ontology/knora-admin#DefaultObjectAccessPermission"
from dsp_permissions_scripts.utils.scope_serialization import (
create_admin_route_object_from_scope,
create_scope_from_admin_route_object,
create_string_from_scope,
)


def get_doaps_of_project(
Expand Down Expand Up @@ -46,7 +49,7 @@ def get_doaps_of_project(


def set_doaps_of_groups(
scope: list[PermissionScopeElement],
scope: PermissionScope,
groups: Sequence[str | BuiltinGroup],
host: str,
shortcode: str,
Expand Down Expand Up @@ -85,48 +88,11 @@ def set_doaps_of_groups(
print("All DOAPs have been updated.")


# TODO: maybe these methods should live on the PermissionScopeElement model?

def __marshal_scope(scope_element: PermissionScopeElement) -> dict[str, Any]:
"""
Serializes a permission scope element to a dict
in the shape that it can be used for JSON requests to /admin/permissions routes.
"""
return {
"additionalInformation": scope_element.info,
"name": scope_element.name,
"permissionCode": None,
}


def __marshal_scope_as_permission_string(scope: list[PermissionScopeElement]) -> str:
"""
Serializes a permission scope to a permissions string as used by /v2 routes.
"""
lookup: dict[str, list[str]] = {}
for s in scope:
p = lookup.get(s.name, [])
p.append(str(s.info).replace("http://www.knora.org/ontology/knora-admin#", "knora-admin:"))
lookup[s.name] = p
strs = [f"{k} {','.join(l)}" for k, l in lookup.items()]
return "|".join(strs)


def __get_scope_element(scope: dict[str, Any]) -> PermissionScopeElement:
"""
turns permissions JSON as returned by /admin/permissions routes into a PermissionScopeElement object.
"""
return PermissionScopeElement(
info=scope["additionalInformation"],
name=scope["name"],
)


def __get_doap(permission: dict[str, Any]) -> Doap:
"""
Deserializes a DOAP from JSON as returned by /admin/permissions/doap/{project_iri}
"""
scope = [__get_scope_element(s) for s in permission["hasPermissions"]]
scope = create_scope_from_admin_route_object(permission["hasPermissions"])
doap = Doap(
target=DoapTarget(
project=permission["forProject"],
Expand Down Expand Up @@ -195,21 +161,21 @@ def get_doaps_of_groups(


def filter_doaps_by_target(
doaps: list[Doap],
doaps: list[Doap],
target: DoapTargetType,
) -> list[Doap]:
"""
Returns only the DOAPs that are related to either a group, or a resource class, or a property.
In case of "all", return all DOAPs.
"""
match target:
case DoapTargetType.ALL:
case DoapTargetType.ALL:
filtered_doaps = doaps
case DoapTargetType.GROUP:
case DoapTargetType.GROUP:
filtered_doaps = [d for d in doaps if d.target.group]
case DoapTargetType.PROPERTY:
case DoapTargetType.PROPERTY:
filtered_doaps = [d for d in doaps if d.target.property]
case DoapTargetType.RESOURCE_CLASS:
case DoapTargetType.RESOURCE_CLASS:
filtered_doaps = [d for d in doaps if d.target.resource_class]
return filtered_doaps

Expand All @@ -225,7 +191,7 @@ def print_doaps_of_project(
heading += f" which are related to a {target}"
print(f"\n{heading}\n{'=' * len(heading)}\n")
for d in doaps:
print(d.model_dump_json(indent=2))
print(d.model_dump_json(indent=2, exclude_none=True))
print()


Expand All @@ -249,7 +215,7 @@ def get_permissions_for_project(

def update_doap_scope(
permission_iri: str,
scope: list[PermissionScopeElement],
scope: PermissionScope,
host: str,
token: str,
) -> Doap:
Expand All @@ -260,7 +226,7 @@ def update_doap_scope(
headers = {"Authorization": f"Bearer {token}"}
protocol = get_protocol(host)
url = f"{protocol}://{host}/admin/permissions/{iri}/hasPermissions"
payload = {"hasPermissions": [__marshal_scope(s) for s in scope]}
payload = {"hasPermissions": create_admin_route_object_from_scope(scope)}
response = requests.put(url, headers=headers, json=payload, timeout=5)
assert response.status_code == 200
new_doap = __get_doap(response.json()["default_object_access_permission"])
Expand All @@ -269,7 +235,7 @@ def update_doap_scope(

def update_permissions_for_resources_and_values(
resource_iris: list[str],
scope: list[PermissionScopeElement],
scope: PermissionScope,
host: str,
token: str,
) -> None:
Expand All @@ -282,7 +248,7 @@ def update_permissions_for_resources_and_values(

def update_permissions_for_resource_and_values(
resource_iri: str,
scope: list[PermissionScopeElement],
scope: PermissionScope,
host: str,
token: str,
) -> None:
Expand All @@ -306,7 +272,7 @@ def update_permissions_for_resource(
lmd: str | None,
type_: str,
context: dict[str, str],
scope: list[PermissionScopeElement],
scope: PermissionScope,
host: str,
token: str,
) -> None:
Expand All @@ -316,7 +282,7 @@ def update_permissions_for_resource(
payload = {
"@id": resource_iri,
"@type": type_,
"knora-api:hasPermissions": __marshal_scope_as_permission_string(scope),
"knora-api:hasPermissions": create_string_from_scope(scope),
"@context": context,
}
if lmd:
Expand All @@ -334,7 +300,7 @@ def update_permissions_for_value(
value: ValueUpdate,
resource_type: str,
context: dict[str, str],
scope: list[PermissionScopeElement],
scope: PermissionScope,
host: str,
token: str,
) -> None:
Expand All @@ -348,7 +314,7 @@ def update_permissions_for_value(
value.property: {
"@id": value.value_iri,
"@type": value.value_type,
"knora-api:hasPermissions": __marshal_scope_as_permission_string(scope),
"knora-api:hasPermissions": create_string_from_scope(scope),
},
"@context": context,
}
Expand Down
12 changes: 6 additions & 6 deletions dsp_permissions_scripts/utils/project.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ def get_project_iri_by_shortcode(shortcode: str, host: str) -> str:


def get_all_resource_iris_of_project(
project_iri: str,
project_iri: str,
host: str,
token: str,
) -> list[str]:
Expand All @@ -40,7 +40,7 @@ def get_all_resource_iris_of_project(


def __get_all_resource_class_iris_of_project(
project_iri: str,
project_iri: str,
host: str,
token: str,
) -> list[str]:
Expand All @@ -61,7 +61,7 @@ def __get_all_resource_class_iris_of_project(


def __get_onto_iris_of_project(
project_iri: str,
project_iri: str,
host: str,
token: str,
) -> list[str]:
Expand Down Expand Up @@ -98,7 +98,7 @@ def __dereference_prefix(identifier: str, context: dict[str, str]) -> str:


def __get_all_resource_iris_of_resclass(
host: str,
host: str,
resclass: str,
project_iri: str,
token: str,
Expand Down Expand Up @@ -130,7 +130,7 @@ def __get_next_page(
) -> tuple[bool, list[str]]:
"""
Get the resource IRIs of a resource class, one page at a time.
DSP-API returns results page-wise:
DSP-API returns results page-wise:
a list of 25 resources if there are 25 resources or more,
a list of less than 25 resources if there are less than 25 remaining,
1 resource (not packed in a list) if there is only 1 remaining,
Expand All @@ -146,7 +146,7 @@ def __get_next_page(
return True, [r["@id"] for r in result["@graph"]]
elif "@id" in result:
# result contains only 1 resource: return it, then stop (there will be no more resources)
return False, [result["@id"], ]
return False, [result["@id"]]
else:
# there are no more resources
return False, []
Loading

0 comments on commit 89f99ed

Please sign in to comment.