Skip to content

Commit

Permalink
Merge branch 'main' into wip/080E-limc
Browse files Browse the repository at this point in the history
  • Loading branch information
jnussbaum authored Nov 11, 2024
2 parents c0a900c + efdf313 commit 489919f
Show file tree
Hide file tree
Showing 31 changed files with 1,333 additions and 591 deletions.
12 changes: 2 additions & 10 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,13 @@
repos:

- repo: https://github.com/astral-sh/ruff-pre-commit
rev: v0.5.7
rev: v0.6.9
hooks:
- id: ruff
# args: [
# --ignore=A002,
# --ignore=D101,
# --ignore=D102,
# --ignore=PLR0913,
# --ignore=PLR2004,
# --ignore=PLW0603,
# ]
- id: ruff-format

- repo: https://github.com/pre-commit/pre-commit-hooks
rev: v4.5.0
rev: v5.0.0
hooks:
- id: check-added-large-files
args: ['--maxkb=1000']
Expand Down
14 changes: 8 additions & 6 deletions dsp_permissions_scripts/ap/ap_get.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,20 @@
from dsp_permissions_scripts.ap.ap_model import Ap
from dsp_permissions_scripts.ap.ap_model import ApValue
from dsp_permissions_scripts.models.errors import ApiError
from dsp_permissions_scripts.models.group import Group
from dsp_permissions_scripts.models.group import group_builder
from dsp_permissions_scripts.models.group_utils import get_prefixed_iri_from_full_iri
from dsp_permissions_scripts.utils.dsp_client import DspClient
from dsp_permissions_scripts.utils.get_logger import get_logger
from dsp_permissions_scripts.utils.project import get_project_iri_and_onto_iris_by_shortcode
from dsp_permissions_scripts.utils.project import get_proj_iri_and_onto_iris_by_shortcode

logger = get_logger(__name__)


def create_ap_from_admin_route_object(permission: dict[str, Any]) -> Ap:
def create_ap_from_admin_route_object(permission: dict[str, Any], dsp_client: DspClient) -> Ap:
"""Deserializes a AP from JSON as returned by /admin/permissions/ap/{project_iri}"""
prefixed_group_iri = get_prefixed_iri_from_full_iri(permission["forGroup"], dsp_client)
ap = Ap(
forGroup=Group(val=permission["forGroup"]),
forGroup=group_builder(prefixed_group_iri),
forProject=permission["forProject"],
hasPermissions=frozenset(ApValue(p["name"]) for p in permission["hasPermissions"]),
iri=permission["iri"],
Expand Down Expand Up @@ -45,14 +47,14 @@ def _get_all_aps_of_project(project_iri: str, dsp_client: DspClient) -> list[Ap]
err.message = f"Could not get APs of project {project_iri}"
raise err from None
aps: list[dict[str, Any]] = response["administrative_permissions"]
ap_objects = [create_ap_from_admin_route_object(ap) for ap in aps]
ap_objects = [create_ap_from_admin_route_object(ap, dsp_client) for ap in aps]
return ap_objects


def get_aps_of_project(shortcode: str, dsp_client: DspClient) -> list[Ap]:
"""Returns the Administrative Permissions for a project."""
logger.info("****** Retrieving all Administrative Permissions... ******")
project_iri, _ = get_project_iri_and_onto_iris_by_shortcode(shortcode, dsp_client)
project_iri, _ = get_proj_iri_and_onto_iris_by_shortcode(shortcode, dsp_client)
aps = _get_all_aps_of_project(project_iri, dsp_client)
logger.info(f"Retrieved {len(aps)} Administrative Permissions")
return aps
13 changes: 7 additions & 6 deletions dsp_permissions_scripts/ap/ap_set.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,10 @@
from dsp_permissions_scripts.ap.ap_model import ApValue
from dsp_permissions_scripts.models.errors import ApiError
from dsp_permissions_scripts.models.group import Group
from dsp_permissions_scripts.models.group_utils import get_full_iri_from_prefixed_iri
from dsp_permissions_scripts.utils.dsp_client import DspClient
from dsp_permissions_scripts.utils.get_logger import get_logger
from dsp_permissions_scripts.utils.project import get_project_iri_and_onto_iris_by_shortcode
from dsp_permissions_scripts.utils.project import get_proj_iri_and_onto_iris_by_shortcode

logger = get_logger(__name__)

Expand All @@ -23,7 +24,7 @@ def _update_ap_scope_on_server(ap: Ap, dsp_client: DspClient) -> Ap:
err.message = f"Could not update scope of Administrative Permission {ap.iri}"
raise err from None
ap_updated: dict[str, Any] = response["administrative_permission"]
ap_object_updated = create_ap_from_admin_route_object(ap_updated)
ap_object_updated = create_ap_from_admin_route_object(ap_updated, dsp_client)
return ap_object_updated


Expand All @@ -47,18 +48,18 @@ def create_new_ap_on_server(
hasPermissions: list[ApValue],
dsp_client: DspClient,
) -> Ap | None:
proj_iri, _ = get_project_iri_and_onto_iris_by_shortcode(shortcode, dsp_client)
proj_iri, _ = get_proj_iri_and_onto_iris_by_shortcode(shortcode, dsp_client)
payload = {
"forGroup": forGroup.full_iri(),
"forGroup": get_full_iri_from_prefixed_iri(forGroup.prefixed_iri, dsp_client),
"forProject": proj_iri,
"hasPermissions": [
{"additionalInformation": None, "name": ap_val.value, "permissionCode": None} for ap_val in hasPermissions
],
}
try:
response = dsp_client.post("/admin/permissions/ap", data=payload)
logger.info(f"Successfully created new AP for group {forGroup.val}")
return create_ap_from_admin_route_object(response["administrative_permission"])
logger.info(f"Successfully created new AP for group {forGroup.prefixed_iri}")
return create_ap_from_admin_route_object(response["administrative_permission"], dsp_client)
except ApiError:
logger.error(f"Could not create new AP for group {forGroup}")
return None
16 changes: 9 additions & 7 deletions dsp_permissions_scripts/doap/doap_get.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,11 @@
from dsp_permissions_scripts.doap.doap_model import EntityDoapTarget
from dsp_permissions_scripts.doap.doap_model import GroupDoapTarget
from dsp_permissions_scripts.models.errors import ApiError
from dsp_permissions_scripts.models.group import Group
from dsp_permissions_scripts.models.group import group_builder
from dsp_permissions_scripts.models.group_utils import get_prefixed_iri_from_full_iri
from dsp_permissions_scripts.utils.dsp_client import DspClient
from dsp_permissions_scripts.utils.get_logger import get_logger
from dsp_permissions_scripts.utils.project import get_project_iri_and_onto_iris_by_shortcode
from dsp_permissions_scripts.utils.project import get_proj_iri_and_onto_iris_by_shortcode
from dsp_permissions_scripts.utils.scope_serialization import create_scope_from_admin_route_object

logger = get_logger(__name__)
Expand All @@ -22,17 +23,18 @@ def _get_all_doaps_of_project(project_iri: str, dsp_client: DspClient) -> list[D
err.message = f"Error while getting DOAPs of project {project_iri}"
raise err from None
doaps: list[dict[str, Any]] = response["default_object_access_permissions"]
doap_objects = [create_doap_from_admin_route_response(doap) for doap in doaps]
doap_objects = [create_doap_from_admin_route_response(doap, dsp_client) for doap in doaps]
return doap_objects


def create_doap_from_admin_route_response(permission: dict[str, Any]) -> Doap:
def create_doap_from_admin_route_response(permission: dict[str, Any], dsp_client: DspClient) -> Doap:
"""Deserializes a DOAP from JSON as returned by /admin/permissions/doap/{project_iri}"""
scope = create_scope_from_admin_route_object(permission["hasPermissions"])
scope = create_scope_from_admin_route_object(permission["hasPermissions"], dsp_client)
target: GroupDoapTarget | EntityDoapTarget
match permission:
case {"forProject": project_iri, "forGroup": group}:
target = GroupDoapTarget(project_iri=project_iri, group=Group(val=group))
prefixed_group_iri = get_prefixed_iri_from_full_iri(group, dsp_client)
target = GroupDoapTarget(project_iri=project_iri, group=group_builder(prefixed_group_iri))
case {"forProject": project_iri, **p}:
target = EntityDoapTarget(
project_iri=project_iri, resource_class=p.get("forResourceClass"), property=p.get("forProperty")
Expand All @@ -51,7 +53,7 @@ def get_doaps_of_project(shortcode: str, dsp_client: DspClient) -> list[Doap]:
By default, all DOAPs are returned, regardless of their target (target=all).
"""
logger.info("****** Retrieving all DOAPs... ******")
project_iri, _ = get_project_iri_and_onto_iris_by_shortcode(shortcode, dsp_client)
project_iri, _ = get_proj_iri_and_onto_iris_by_shortcode(shortcode, dsp_client)
doaps = _get_all_doaps_of_project(project_iri, dsp_client)
msg = f"Retrieved {len(doaps)} DOAPs"
logger.info(msg)
Expand Down
18 changes: 11 additions & 7 deletions dsp_permissions_scripts/doap/doap_set.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,24 +5,25 @@
from dsp_permissions_scripts.doap.doap_model import NewEntityDoapTarget
from dsp_permissions_scripts.doap.doap_model import NewGroupDoapTarget
from dsp_permissions_scripts.models.errors import ApiError
from dsp_permissions_scripts.models.group_utils import get_full_iri_from_prefixed_iri
from dsp_permissions_scripts.models.scope import PermissionScope
from dsp_permissions_scripts.utils.dsp_client import DspClient
from dsp_permissions_scripts.utils.get_logger import get_logger
from dsp_permissions_scripts.utils.project import get_project_iri_and_onto_iris_by_shortcode
from dsp_permissions_scripts.utils.project import get_proj_iri_and_onto_iris_by_shortcode
from dsp_permissions_scripts.utils.scope_serialization import create_admin_route_object_from_scope

logger = get_logger(__name__)


def _update_doap_scope_on_server(doap_iri: str, scope: PermissionScope, dsp_client: DspClient) -> Doap:
iri = quote_plus(doap_iri, safe="")
payload = {"hasPermissions": create_admin_route_object_from_scope(scope)}
payload = {"hasPermissions": create_admin_route_object_from_scope(scope, dsp_client)}
try:
response = dsp_client.put(f"/admin/permissions/{iri}/hasPermissions", data=payload)
except ApiError as err:
err.message = f"Could not update scope of DOAP {doap_iri}"
raise err from None
new_doap = create_doap_from_admin_route_response(response["default_object_access_permission"])
new_doap = create_doap_from_admin_route_response(response["default_object_access_permission"], dsp_client)
return new_doap


Expand All @@ -46,18 +47,21 @@ def create_new_doap_on_server(
scope: PermissionScope,
dsp_client: DspClient,
) -> Doap | None:
proj_iri, _ = get_project_iri_and_onto_iris_by_shortcode(shortcode, dsp_client)
proj_iri, _ = get_proj_iri_and_onto_iris_by_shortcode(shortcode, dsp_client)
forGroup = None
if isinstance(target, NewGroupDoapTarget):
forGroup = get_full_iri_from_prefixed_iri(target.group.prefixed_iri, dsp_client)
payload = {
"forGroup": target.group.full_iri() if isinstance(target, NewGroupDoapTarget) else None,
"forGroup": forGroup,
"forProject": proj_iri,
"forProperty": target.property if isinstance(target, NewEntityDoapTarget) else None,
"forResourceClass": target.resource_class if isinstance(target, NewEntityDoapTarget) else None,
"hasPermissions": create_admin_route_object_from_scope(scope),
"hasPermissions": create_admin_route_object_from_scope(scope, dsp_client),
}
try:
response = dsp_client.post("/admin/permissions/doap", data=payload)
logger.info(f"Successfully created new DOAP for target {target}")
return create_doap_from_admin_route_response(response["default_object_access_permission"])
return create_doap_from_admin_route_response(response["default_object_access_permission"], dsp_client)
except ApiError:
logger.error(f"Could not create new DOAP for target {target}")
return None
5 changes: 5 additions & 0 deletions dsp_permissions_scripts/models/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,3 +47,8 @@ class EmptyScopeError(Exception):
@dataclass
class InvalidGroupError(Exception):
message: str


@dataclass
class InvalidIRIError(Exception):
message: str
109 changes: 88 additions & 21 deletions dsp_permissions_scripts/models/group.py
Original file line number Diff line number Diff line change
@@ -1,42 +1,109 @@
from __future__ import annotations

import re
from typing import Any
from typing import Self
from typing import TypeAlias
from typing import Union

from pydantic import BaseModel
from pydantic import ConfigDict
from pydantic import Discriminator
from pydantic import Tag
from pydantic import model_validator
from typing_extensions import Annotated

from dsp_permissions_scripts.models.errors import InvalidGroupError
from dsp_permissions_scripts.utils.helpers import KNORA_ADMIN_ONTO_NAMESPACE

KNORA_ADMIN_ONTO_NAMESPACE = "http://www.knora.org/ontology/knora-admin#"
PREFIXED_IRI_REGEX = r"^[\w-]+:[\w -]+$"
NAMES_OF_BUILTIN_GROUPS = ["SystemAdmin", "Creator", "ProjectAdmin", "ProjectMember", "KnownUser", "UnknownUser"]


class Group(BaseModel):
class BuiltinGroup(BaseModel):
"""Represents a DSP-builtin group, in the form 'knora-admin:ProjectAdmin'"""

model_config = ConfigDict(frozen=True)

val: str
prefixed_iri: str

@model_validator(mode="after")
def _validate(self) -> Self:
valid_group_names = ["SystemAdmin", "Creator", "ProjectAdmin", "ProjectMember", "KnownUser", "UnknownUser"]
prefix, name = self.prefixed_iri.split(":")
if prefix != "knora-admin" or name not in valid_group_names:
raise InvalidGroupError(f"{self.prefixed_iri} is not a valid group IRI")
return self


@model_validator(mode="before")
@classmethod
def _shorten_iri(cls, data: Any) -> Any:
if not isinstance(data, dict):
return data
data["val"] = data["val"].replace(KNORA_ADMIN_ONTO_NAMESPACE, "knora-admin:")
return data
class CustomGroup(BaseModel):
"""Represents a custom group, in the form 'project-shortname:groupname'"""

model_config = ConfigDict(frozen=True)

prefixed_iri: str

@model_validator(mode="after")
def _check_regex(self) -> Group:
if not self.val.startswith("knora-admin:"):
raise InvalidGroupError(f"{self.val} is not a valid group IRI")
def _validate(self) -> Self:
if not is_valid_prefixed_group_iri(self.prefixed_iri):
raise InvalidGroupError(f"{self.prefixed_iri} is not a valid group IRI")
if self.prefixed_iri.startswith(("knora-admin:", "knora-base:", "knora-api:")):
raise InvalidGroupError(f"{self.prefixed_iri} is not a custom group")
return self

def full_iri(self) -> str:
return self.val.replace("knora-admin:", KNORA_ADMIN_ONTO_NAMESPACE)

def group_builder(prefixed_iri: str) -> BuiltinGroup | CustomGroup:
"""
Accepts a prefixed IRI, and converts it to a BuiltinGroup or CustomGroup.
Args:
prefixed_iri: string in the form 'knora-admin:ProjectAdmin' or 'project-shortname:groupname'
Raises:
InvalidGroupError: if the input is neither a valid builtin group nor a valid custom group
"""
if prefixed_iri.startswith("knora-admin:"):
return BuiltinGroup(prefixed_iri=prefixed_iri)
elif re.search(PREFIXED_IRI_REGEX, prefixed_iri):
return CustomGroup(prefixed_iri=prefixed_iri)
else:
raise InvalidGroupError(f"{prefixed_iri} is not a valid group IRI")


def _group_discriminator(v: Any) -> str:
"""
This is only for typing purposes,
see https://docs.pydantic.dev/latest/concepts/unions/#discriminated-unions-with-callable-discriminator
"""
if isinstance(v, dict):
return "builtin" if v["prefixed_iri"].startswith("knora-admin:") else "custom"
else:
return "builtin" if getattr(v, "prefixed_iri").startswith("knora-admin:") else "custom"


Group: TypeAlias = Annotated[
Union[
Annotated[BuiltinGroup, Tag("builtin")],
Annotated[CustomGroup, Tag("custom")],
],
Discriminator(_group_discriminator),
]


def is_valid_prefixed_group_iri(iri: str) -> bool:
if iri.startswith((KNORA_ADMIN_ONTO_NAMESPACE, "http://rdfh.ch/groups/", "knora-base:", "knora-api:")):
return False
elif iri.startswith("knora-admin:") and not iri.endswith(tuple(NAMES_OF_BUILTIN_GROUPS)):
return False
elif re.search(PREFIXED_IRI_REGEX, iri):
return True
else:
return False


UNKNOWN_USER = Group(val="knora-admin:UnknownUser")
KNOWN_USER = Group(val="knora-admin:KnownUser")
PROJECT_MEMBER = Group(val="knora-admin:ProjectMember")
PROJECT_ADMIN = Group(val="knora-admin:ProjectAdmin")
CREATOR = Group(val="knora-admin:Creator")
SYSTEM_ADMIN = Group(val="knora-admin:SystemAdmin")
UNKNOWN_USER = BuiltinGroup(prefixed_iri="knora-admin:UnknownUser")
KNOWN_USER = BuiltinGroup(prefixed_iri="knora-admin:KnownUser")
PROJECT_MEMBER = BuiltinGroup(prefixed_iri="knora-admin:ProjectMember")
PROJECT_ADMIN = BuiltinGroup(prefixed_iri="knora-admin:ProjectAdmin")
CREATOR = BuiltinGroup(prefixed_iri="knora-admin:Creator")
SYSTEM_ADMIN = BuiltinGroup(prefixed_iri="knora-admin:SystemAdmin")
Loading

0 comments on commit 489919f

Please sign in to comment.