Skip to content

Commit

Permalink
Merge branch 'main' into wip/scenario-tanner
Browse files Browse the repository at this point in the history
  • Loading branch information
jnussbaum authored Oct 12, 2023
2 parents ccb7fb9 + 5e6dcd6 commit 708e3b2
Show file tree
Hide file tree
Showing 15 changed files with 321 additions and 158 deletions.
6 changes: 6 additions & 0 deletions dsp_permissions_scripts/models/builtin_groups.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
UNKNOWN_USER = "http://www.knora.org/ontology/knora-admin#UnknownUser"
KNOWN_USER = "http://www.knora.org/ontology/knora-admin#KnownUser"
PROJECT_MEMBER = "http://www.knora.org/ontology/knora-admin#ProjectMember"
PROJECT_ADMIN = "http://www.knora.org/ontology/knora-admin#ProjectAdmin"
CREATOR = "http://www.knora.org/ontology/knora-admin#Creator"
SYSTEM_ADMIN = "http://www.knora.org/ontology/knora-admin#SystemAdmin"
6 changes: 3 additions & 3 deletions dsp_permissions_scripts/models/doap.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@ class Doap(BaseModel):

class DoapTarget(BaseModel):
project: str
group: str | None
resource_class: str | None
property: str | None
group: str | None = None
resource_class: str | None = None
property: str | None = None

@model_validator(mode="after")
def assert_correct_combination(self) -> Self:
Expand Down
14 changes: 0 additions & 14 deletions dsp_permissions_scripts/models/groups.py

This file was deleted.

47 changes: 22 additions & 25 deletions dsp_permissions_scripts/models/scope.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

from pydantic import BaseModel, ConfigDict, model_validator

from dsp_permissions_scripts.models.groups import BuiltinGroup
from dsp_permissions_scripts.models import builtin_groups


class PermissionScope(BaseModel):
Expand All @@ -14,19 +14,19 @@ class PermissionScope(BaseModel):
"""
model_config = ConfigDict(frozen=True)

CR: frozenset[str | BuiltinGroup] = frozenset()
D: frozenset[str | BuiltinGroup] = frozenset()
M: frozenset[str | BuiltinGroup] = frozenset()
V: frozenset[str | BuiltinGroup] = frozenset()
RV: frozenset[str | BuiltinGroup] = frozenset()
CR: frozenset[str] = frozenset()
D: frozenset[str] = frozenset()
M: frozenset[str] = frozenset()
V: frozenset[str] = frozenset()
RV: frozenset[str] = frozenset()

@staticmethod
def create(
CR: Iterable[str | BuiltinGroup] = (),
D: Iterable[str | BuiltinGroup] = (),
M: Iterable[str | BuiltinGroup] = (),
V: Iterable[str | BuiltinGroup] = (),
RV: Iterable[str | BuiltinGroup] = (),
CR: Iterable[str] = (),
D: Iterable[str] = (),
M: Iterable[str] = (),
V: Iterable[str] = (),
RV: Iterable[str] = (),
) -> PermissionScope:
"""Factory method to create a PermissionScope from Iterables instead of frozensets."""
return PermissionScope(
Expand All @@ -42,23 +42,21 @@ def check_group_occurs_only_once(self):
all_groups = []
for field in self.model_fields:
all_groups.extend(getattr(self, field))
all_groups_as_strs = [g.value if isinstance(g, BuiltinGroup) else g for g in all_groups]
for group in all_groups_as_strs:
if all_groups_as_strs.count(group) > 1:
for group in all_groups:
if all_groups.count(group) > 1:
raise ValueError(f"Group {group} must not occur in more than one field")
return self

def add(
self,
permission: Literal["CR", "D", "M", "V", "RV"],
group: str | BuiltinGroup,
group: str,
) -> PermissionScope:
"""Return a copy of the PermissionScope instance with group added to permission."""
groups = [g.value if isinstance(g, BuiltinGroup) else g for g in getattr(self, permission)]
group = group.value if isinstance(group, BuiltinGroup) else group
groups = getattr(self, permission)
if group in groups:
raise ValueError(f"Group '{group}' is already in permission '{permission}'")
groups.append(group)
groups = groups | {group}
kwargs: dict[str, list[str]] = {permission: groups}
for perm in ["CR", "D", "M", "V", "RV"]:
if perm != permission:
Expand All @@ -68,14 +66,13 @@ def add(
def remove(
self,
permission: Literal["CR", "D", "M", "V", "RV"],
group: str | BuiltinGroup,
group: str,
) -> PermissionScope:
"""Return a copy of the PermissionScope instance with group removed from permission."""
groups = [g.value if isinstance(g, BuiltinGroup) else g for g in getattr(self, permission)]
group = group.value if isinstance(group, BuiltinGroup) else group
groups = getattr(self, permission)
if group not in groups:
raise ValueError(f"Group '{group}' is not in permission '{permission}'")
groups.remove(group)
groups = groups - {group}
kwargs: dict[str, list[str]] = {permission: groups}
for perm in ["CR", "D", "M", "V", "RV"]:
if perm != permission:
Expand All @@ -84,7 +81,7 @@ def remove(


PUBLIC = PermissionScope.create(
CR={BuiltinGroup.PROJECT_ADMIN},
D={BuiltinGroup.CREATOR, BuiltinGroup.PROJECT_MEMBER},
V={BuiltinGroup.UNKNOWN_USER, BuiltinGroup.KNOWN_USER},
CR={builtin_groups.PROJECT_ADMIN},
D={builtin_groups.CREATOR, builtin_groups.PROJECT_MEMBER},
V={builtin_groups.UNKNOWN_USER, builtin_groups.KNOWN_USER},
)
89 changes: 50 additions & 39 deletions dsp_permissions_scripts/template.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from dotenv import load_dotenv

from dsp_permissions_scripts.models import builtin_groups
from dsp_permissions_scripts.models.doap import Doap
from dsp_permissions_scripts.models.groups import BuiltinGroup
from dsp_permissions_scripts.models.host import Hosts
from dsp_permissions_scripts.models.oap import Oap
from dsp_permissions_scripts.models.scope import PUBLIC
Expand All @@ -14,98 +14,109 @@
from dsp_permissions_scripts.utils.project import get_all_resource_oaps_of_project


def modify_oaps(oaps: list[Oap]) -> list[Oap]:
"""Adapt this sample to your needs."""
for oap in oaps:
if BuiltinGroup.SYSTEM_ADMIN.value not in oap.scope.CR:
oap.scope = oap.scope.add("CR", BuiltinGroup.SYSTEM_ADMIN)
return oaps


def modify_doaps(doaps: list[Doap]) -> list[Doap]:
"""Adapt this sample to your needs."""
for doap in doaps:
if doap.target.group in [BuiltinGroup.PROJECT_MEMBER.value, BuiltinGroup.PROJECT_ADMIN.value]:
if doap.target.group in [builtin_groups.PROJECT_MEMBER, builtin_groups.PROJECT_ADMIN]:
doap.scope = PUBLIC
return doaps


def update_oaps(
def modify_oaps(oaps: list[Oap]) -> list[Oap]:
"""Adapt this sample to your needs."""
for oap in oaps:
if builtin_groups.SYSTEM_ADMIN not in oap.scope.CR:
oap.scope = oap.scope.add("CR", builtin_groups.SYSTEM_ADMIN)
return oaps


def update_doaps(
host: str,
shortcode: str,
token: str,
) -> None:
"""Sample function to modify the Object Access Permissions of a project."""
resource_oaps = get_all_resource_oaps_of_project(
shortcode=shortcode,
"""Sample function to modify the Default Object Access Permissions of a project."""
project_doaps = get_doaps_of_project(
host=host,
shortcode=shortcode,
token=token,
)
serialize_resource_oaps(
resource_oaps=resource_oaps,
serialize_doaps_of_project(
project_doaps=project_doaps,
shortcode=shortcode,
mode="original",
)
resource_oaps_updated = modify_oaps(oaps=resource_oaps)
serialize_resource_oaps(
resource_oaps=resource_oaps_updated,
shortcode=shortcode,
mode="modified",
project_doaps_modified = modify_doaps(doaps=project_doaps)
apply_updated_doaps_on_server(
doaps=project_doaps_modified,
host=host,
token=token,
)
apply_updated_oaps_on_server(
resource_oaps=resource_oaps_updated,
project_doaps_updated = get_doaps_of_project(
host=host,
shortcode=shortcode,
token=token,
)
serialize_doaps_of_project(
project_doaps=project_doaps_updated,
shortcode=shortcode,
mode="modified",
)


def update_doaps(
def update_oaps(
host: str,
shortcode: str,
token: str,
) -> None:
"""Sample function to modify the Default Object Access Permissions of a project."""
project_doaps = get_doaps_of_project(
host=host,
"""Sample function to modify the Object Access Permissions of a project."""
resource_oaps = get_all_resource_oaps_of_project(
shortcode=shortcode,
host=host,
token=token,
)
serialize_doaps_of_project(
project_doaps=project_doaps,
serialize_resource_oaps(
resource_oaps=resource_oaps,
shortcode=shortcode,
mode="original",
)
project_doaps_updated = modify_doaps(doaps=project_doaps)
serialize_doaps_of_project(
project_doaps=project_doaps_updated,
resource_oaps_modified = modify_oaps(oaps=resource_oaps)
apply_updated_oaps_on_server(
resource_oaps=resource_oaps_modified,
host=host,
token=token,
shortcode=shortcode,
mode="modified",
)
apply_updated_doaps_on_server(
doaps=project_doaps_updated,
resource_oaps_updated = get_all_resource_oaps_of_project(
shortcode=shortcode,
host=host,
token=token,
)
serialize_resource_oaps(
resource_oaps=resource_oaps_updated,
shortcode=shortcode,
mode="modified",
)


def main() -> None:
"""
The main function provides you with 2 sample functions:
one to update the Object Access Permissions of a project,
and one to update the Default Object Access Permissions of a project.
one to update the Default Object Access Permissions of a project,
and one to update the Object Access Permissions of a project.
Both must first be adapted to your needs.
"""
load_dotenv() # set login credentials from .env file as environment variables
host = Hosts.get_host("test")
shortcode = "F18E"
token = login(host)

update_oaps(
update_doaps(
host=host,
shortcode=shortcode,
token=token,
)
update_doaps(
update_oaps(
host=host,
shortcode=shortcode,
token=token,
Expand Down
6 changes: 3 additions & 3 deletions dsp_permissions_scripts/utils/doap_get.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ def get_doaps_of_project(
host: str,
shortcode: str,
token: str,
target: DoapTargetType = DoapTargetType.ALL,
target_type: DoapTargetType = DoapTargetType.ALL,
) -> list[Doap]:
"""
Returns the doaps for a project.
Expand All @@ -94,7 +94,7 @@ def get_doaps_of_project(
)
filtered_doaps = _filter_doaps_by_target(
doaps=doaps,
target=target,
target=target_type,
)
logger.info(f"Found {len(doaps)} DOAPs, {len(filtered_doaps)} of which are related to {target}.")
logger.info(f"Found {len(doaps)} DOAPs, {len(filtered_doaps)} of which are related to {target_type}.")
return filtered_doaps
8 changes: 4 additions & 4 deletions dsp_permissions_scripts/utils/doap_serialize.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,14 @@ def serialize_doaps_of_project(
project_doaps: list[Doap],
shortcode: str,
mode: Literal["original", "modified"],
target: DoapTargetType = DoapTargetType.ALL,
target_type: DoapTargetType = DoapTargetType.ALL,
) -> None:
"""Serialize the DOAPs of a project to a JSON file."""
filepath = _get_file_path(shortcode, mode)
filepath.parent.mkdir(parents=True, exist_ok=True)
explanation_string = f"Project {shortcode} has {len(project_doaps)} DOAPs"
if target != DoapTargetType.ALL:
explanation_string += f" which are related to a {target}"
if target_type != DoapTargetType.ALL:
explanation_string += f" which are related to a {target_type}"
doaps_as_dicts = [doap.model_dump(exclude_none=True, mode="json") for doap in project_doaps]
doaps_as_dict = {explanation_string: doaps_as_dicts}
with open(filepath, mode="w", encoding="utf-8") as f:
Expand All @@ -38,5 +38,5 @@ def deserialize_doaps_of_project(
filepath = _get_file_path(shortcode, mode)
with open(filepath, mode="r", encoding="utf-8") as f:
doaps_as_dict = json.load(f)
doaps_as_dicts = doaps_as_dict.values()[0]
doaps_as_dicts = list(doaps_as_dict.values())[0]
return [Doap.model_validate(d) for d in doaps_as_dicts]
20 changes: 13 additions & 7 deletions dsp_permissions_scripts/utils/doap_set.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import warnings
from typing import Literal
from urllib.parse import quote_plus

Expand Down Expand Up @@ -65,11 +66,16 @@ def apply_updated_doaps_on_server(
print(f"\n{heading}\n{'=' * len(heading)}\n")
for d in doaps:
_log_and_print_doap_update(doap=d, state="before")
new_doap = _update_doap_scope(
doap_iri=d.doap_iri,
scope=d.scope,
host=host,
token=token,
)
_log_and_print_doap_update(doap=new_doap, state="after")
try:
new_doap = _update_doap_scope(
doap_iri=d.doap_iri,
scope=d.scope,
host=host,
token=token,
)
_log_and_print_doap_update(doap=new_doap, state="after")
except Exception: # pylint: disable=broad-exception-caught
logger.error(f"ERROR while updating DOAP {d.doap_iri}", exc_info=True)
warnings.warn(f"ERROR while updating DOAP {d.doap_iri}")

print(f"{get_timestamp()}: All DOAPs have been updated.")
11 changes: 9 additions & 2 deletions dsp_permissions_scripts/utils/helpers.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from dsp_permissions_scripts.models.groups import BuiltinGroup
from dsp_permissions_scripts.models import builtin_groups


def dereference_prefix(
Expand All @@ -17,7 +17,14 @@ def _get_sort_pos_of_custom_group(group: str) -> int:

def sort_groups(groups_original: list[str]) -> list[str]:
"""Sorts groups, first according to their power (most powerful first), then alphabetically."""
sort_key = list(reversed([x.value for x in BuiltinGroup]))
sort_key = [
builtin_groups.SYSTEM_ADMIN,
builtin_groups.CREATOR,
builtin_groups.PROJECT_ADMIN,
builtin_groups.PROJECT_MEMBER,
builtin_groups.KNOWN_USER,
builtin_groups.UNKNOWN_USER
]
groups = groups_original.copy()
groups.sort(key=lambda x: sort_key.index(x) if x in sort_key else _get_sort_pos_of_custom_group(x))
return groups
Loading

0 comments on commit 708e3b2

Please sign in to comment.