Skip to content

Commit

Permalink
Merge branch 'main' into wip/080E-limc
Browse files Browse the repository at this point in the history
  • Loading branch information
jnussbaum authored Oct 11, 2024
2 parents 9bdd4d5 + acc1df7 commit 8cb74b1
Show file tree
Hide file tree
Showing 21 changed files with 250 additions and 248 deletions.
5 changes: 2 additions & 3 deletions dsp_permissions_scripts/ap/ap_delete.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,15 @@ def _delete_ap_on_server(ap: Ap, dsp_client: DspClient) -> None:


def delete_ap_of_group_on_server(
host: str,
existing_aps: list[Ap],
forGroup: Group,
dsp_client: DspClient,
) -> list[Ap]:
aps_to_delete = [ap for ap in existing_aps if ap.forGroup == forGroup]
if not aps_to_delete:
logger.warning(f"There are no APs to delete on {host} for group {forGroup}")
logger.warning(f"There are no APs to delete on {dsp_client.server} for group {forGroup}")
return existing_aps
logger.info(f"Deleting the Administrative Permissions for group {forGroup} on server {host}")
logger.info(f"Deleting the Administrative Permissions for group {forGroup} on server {dsp_client.server}")
for ap in aps_to_delete:
_delete_ap_on_server(ap, dsp_client)
existing_aps.remove(ap)
Expand Down
4 changes: 2 additions & 2 deletions dsp_permissions_scripts/ap/ap_serialize.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@ def serialize_aps_of_project(
project_aps: list[Ap],
shortcode: str,
mode: Literal["original", "modified"],
host: str,
server: str,
) -> None:
"""Serialize the APs of a project to a JSON file."""
filepath = _get_file_path(shortcode, mode)
filepath.parent.mkdir(parents=True, exist_ok=True)
explanation_string = f"{get_timestamp()}: Project {shortcode} on host {host} has {len(project_aps)} APs"
explanation_string = f"{get_timestamp()}: Project {shortcode} on server {server} has {len(project_aps)} APs"
aps_as_dicts = [ap.model_dump(exclude_none=True, mode="json") for ap in project_aps]
aps_as_dict = {explanation_string: aps_as_dicts}
with open(filepath, mode="w", encoding="utf-8") as f:
Expand Down
8 changes: 4 additions & 4 deletions dsp_permissions_scripts/ap/ap_set.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,18 +27,18 @@ def _update_ap_scope_on_server(ap: Ap, dsp_client: DspClient) -> Ap:
return ap_object_updated


def apply_updated_scopes_of_aps_on_server(aps: list[Ap], host: str, dsp_client: DspClient) -> None:
def apply_updated_scopes_of_aps_on_server(aps: list[Ap], dsp_client: DspClient) -> None:
if not aps:
logger.warning(f"There are no APs to update on {host}")
logger.warning(f"There are no APs to update on {dsp_client.server}")
return
logger.info(f"****** Updating scopes of {len(aps)} Administrative Permissions on {host}... ******")
logger.info(f"****** Updating scopes of {len(aps)} Administrative Permissions on {dsp_client.server}... ******")
for ap in aps:
try:
_ = _update_ap_scope_on_server(ap, dsp_client)
logger.info(f"Successfully updated AP {ap.iri}")
except ApiError as err:
logger.error(err)
logger.info(f"Finished updating scopes of {len(aps)} Administrative Permissions on {host}")
logger.info(f"Finished updating scopes of {len(aps)} Administrative Permissions on {dsp_client.server}")


def create_new_ap_on_server(
Expand Down
38 changes: 38 additions & 0 deletions dsp_permissions_scripts/doap/doap_delete.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
from urllib.parse import quote_plus

from dsp_permissions_scripts.doap.doap_model import Doap
from dsp_permissions_scripts.doap.doap_model import GroupDoapTarget
from dsp_permissions_scripts.models.errors import ApiError
from dsp_permissions_scripts.models.group import Group
from dsp_permissions_scripts.utils.dsp_client import DspClient
from dsp_permissions_scripts.utils.get_logger import get_logger

logger = get_logger(__name__)


def _delete_doap_on_server(doap: Doap, dsp_client: DspClient) -> None:
doap_iri = quote_plus(doap.doap_iri, safe="")
try:
dsp_client.delete(f"/admin/permissions/{doap_iri}")
except ApiError as err:
err.message = f"Could not delete DOAP {doap.doap_iri}"
raise err from None


def delete_doap_of_group_on_server(
existing_doaps: list[Doap],
forGroup: Group,
dsp_client: DspClient,
) -> list[Doap]:
doaps_to_delete = [
doap for doap in existing_doaps if isinstance(doap.target, GroupDoapTarget) and doap.target.group == forGroup
]
if not doaps_to_delete:
logger.warning(f"There are no DOAPs to delete on {dsp_client.server} for group {forGroup}")
return existing_doaps
logger.info(f"Deleting the DOAP for group {forGroup} on server {dsp_client.server}")
for doap in doaps_to_delete:
_delete_doap_on_server(doap, dsp_client)
existing_doaps.remove(doap)
logger.info(f"Deleted DOAP {doap.doap_iri}")
return existing_doaps
56 changes: 14 additions & 42 deletions dsp_permissions_scripts/doap/doap_get.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@
from urllib.parse import quote_plus

from dsp_permissions_scripts.doap.doap_model import Doap
from dsp_permissions_scripts.doap.doap_model import DoapTarget
from dsp_permissions_scripts.doap.doap_model import DoapTargetType
from dsp_permissions_scripts.doap.doap_model import EntityDoapTarget
from dsp_permissions_scripts.doap.doap_model import GroupDoapTarget
from dsp_permissions_scripts.models.errors import ApiError
from dsp_permissions_scripts.models.group import Group
from dsp_permissions_scripts.utils.dsp_client import DspClient
Expand All @@ -14,26 +14,6 @@
logger = get_logger(__name__)


def _filter_doaps_by_target(
doaps: list[Doap],
target: DoapTargetType,
) -> list[Doap]:
"""
Returns only the DOAPs that are related to either a group, or a resource class, or a property.
In case of "all", return all DOAPs.
"""
match target:
case DoapTargetType.ALL:
filtered_doaps = doaps
case DoapTargetType.GROUP:
filtered_doaps = [d for d in doaps if d.target.group]
case DoapTargetType.PROPERTY:
filtered_doaps = [d for d in doaps if d.target.property]
case DoapTargetType.RESOURCE_CLASS:
filtered_doaps = [d for d in doaps if d.target.resource_class]
return filtered_doaps


def _get_all_doaps_of_project(project_iri: str, dsp_client: DspClient) -> list[Doap]:
project_iri = quote_plus(project_iri, safe="")
try:
Expand All @@ -49,24 +29,22 @@ def _get_all_doaps_of_project(project_iri: str, dsp_client: DspClient) -> list[D
def create_doap_from_admin_route_response(permission: dict[str, Any]) -> Doap:
"""Deserializes a DOAP from JSON as returned by /admin/permissions/doap/{project_iri}"""
scope = create_scope_from_admin_route_object(permission["hasPermissions"])
doap = Doap(
target=DoapTarget(
project_iri=permission["forProject"],
group=Group(val=permission["forGroup"]) if permission.get("forGroup") else None,
resource_class=permission.get("forResourceClass"),
property=permission.get("forProperty"),
),
target: GroupDoapTarget | EntityDoapTarget
match permission:
case {"forProject": project_iri, "forGroup": group}:
target = GroupDoapTarget(project_iri=project_iri, group=Group(val=group))
case {"forProject": project_iri, **p}:
target = EntityDoapTarget(
project_iri=project_iri, resource_class=p.get("forResourceClass"), property=p.get("forProperty")
)
return Doap(
target=target,
scope=scope,
doap_iri=permission["iri"],
)
return doap


def get_doaps_of_project(
shortcode: str,
dsp_client: DspClient,
target_type: DoapTargetType = DoapTargetType.ALL,
) -> list[Doap]:
def get_doaps_of_project(shortcode: str, dsp_client: DspClient) -> list[Doap]:
"""
Returns the DOAPs for a project.
Optionally, select only the DOAPs that are related to either a group, or a resource class, or a property.
Expand All @@ -75,12 +53,6 @@ def get_doaps_of_project(
logger.info("****** Retrieving all DOAPs... ******")
project_iri, _ = get_project_iri_and_onto_iris_by_shortcode(shortcode, dsp_client)
doaps = _get_all_doaps_of_project(project_iri, dsp_client)
filtered_doaps = _filter_doaps_by_target(
doaps=doaps,
target=target_type,
)
msg = f"Retrieved {len(doaps)} DOAPs"
if target_type != DoapTargetType.ALL:
msg += f", {len(filtered_doaps)} of which are related to {target_type}."
logger.info(msg)
return filtered_doaps
return doaps
53 changes: 20 additions & 33 deletions dsp_permissions_scripts/doap/doap_model.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
from __future__ import annotations

from enum import Enum
from typing import Self

from pydantic import BaseModel
Expand All @@ -13,54 +12,42 @@
class Doap(BaseModel):
"""Model representing a DOAP, containing the target, the scope and the IRI of the DOAP."""

target: DoapTarget
target: GroupDoapTarget | EntityDoapTarget
scope: PermissionScope
doap_iri: str


class DoapTarget(BaseModel):
class GroupDoapTarget(BaseModel):
project_iri: str
group: Group


class EntityDoapTarget(BaseModel):
project_iri: str
group: Group | None = None
resource_class: str | None = None
property: str | None = None

@model_validator(mode="after")
def assert_correct_combination(self) -> Self:
# asserts that DOAP is only defined for Group or ResourceClass or Property
# or a combination of ResourceClass and Property
match (self.group, self.resource_class, self.property):
case (None, None, None):
raise ValueError("At least one of group, resource_class or property must be set")
case (_, None, None) | (None, _, _):
pass
case _:
raise ValueError("Invalid combination of group, resource_class and property")
def _validate(self) -> Self:
if self.resource_class is None and self.property is None:
raise ValueError("At least one of resource_class or property must be set")
return self


class NewDoapTarget(BaseModel):
class NewGroupDoapTarget(BaseModel):
"""Represents the target of a DOAP that is yet to be created."""

group: Group


class NewEntityDoapTarget(BaseModel):
"""Represents the target of a DOAP that is yet to be created."""

group: Group | None = None
resource_class: str | None = None
property: str | None = None

@model_validator(mode="after")
def assert_correct_combination(self) -> Self:
# asserts that DOAP is only defined for Group or ResourceClass or Property
# or a combination of ResourceClass and Property
match (self.group, self.resource_class, self.property):
case (None, None, None):
raise ValueError("At least one of group, resource_class or property must be set")
case (_, None, None) | (None, _, _):
pass
case _:
raise ValueError("Invalid combination of group, resource_class and property")
def _validate(self) -> Self:
if self.resource_class is None and self.property is None:
raise ValueError("At least one of resource_class or property must be set")
return self


class DoapTargetType(Enum):
ALL = "all"
GROUP = "group"
RESOURCE_CLASS = "resource_class"
PROPERTY = "property"
8 changes: 2 additions & 6 deletions dsp_permissions_scripts/doap/doap_serialize.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
from typing import Literal

from dsp_permissions_scripts.doap.doap_model import Doap
from dsp_permissions_scripts.doap.doap_model import DoapTargetType
from dsp_permissions_scripts.utils.get_logger import get_logger
from dsp_permissions_scripts.utils.get_logger import get_timestamp

Expand All @@ -18,15 +17,12 @@ def serialize_doaps_of_project(
project_doaps: list[Doap],
shortcode: str,
mode: Literal["original", "modified"],
host: str,
target_type: DoapTargetType = DoapTargetType.ALL,
server: str,
) -> None:
"""Serialize the DOAPs of a project to a JSON file."""
filepath = _get_file_path(shortcode, mode)
filepath.parent.mkdir(parents=True, exist_ok=True)
explanation_string = f"{get_timestamp()}: Project {shortcode} on host {host} has {len(project_doaps)} DOAPs"
if target_type != DoapTargetType.ALL:
explanation_string += f" which are related to a {target_type}"
explanation_string = f"{get_timestamp()}: Project {shortcode} on server {server} has {len(project_doaps)} DOAPs"
doaps_as_dicts = [doap.model_dump(exclude_none=True, mode="json") for doap in project_doaps]
doaps_as_dict = {explanation_string: doaps_as_dicts}
with open(filepath, mode="w", encoding="utf-8") as f:
Expand Down
19 changes: 10 additions & 9 deletions dsp_permissions_scripts/doap/doap_set.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@

from dsp_permissions_scripts.doap.doap_get import create_doap_from_admin_route_response
from dsp_permissions_scripts.doap.doap_model import Doap
from dsp_permissions_scripts.doap.doap_model import NewDoapTarget
from dsp_permissions_scripts.doap.doap_model import NewEntityDoapTarget
from dsp_permissions_scripts.doap.doap_model import NewGroupDoapTarget
from dsp_permissions_scripts.models.errors import ApiError
from dsp_permissions_scripts.models.scope import PermissionScope
from dsp_permissions_scripts.utils.dsp_client import DspClient
Expand All @@ -25,32 +26,32 @@ def _update_doap_scope_on_server(doap_iri: str, scope: PermissionScope, dsp_clie
return new_doap


def apply_updated_scopes_of_doaps_on_server(doaps: list[Doap], host: str, dsp_client: DspClient) -> None:
def apply_updated_scopes_of_doaps_on_server(doaps: list[Doap], dsp_client: DspClient) -> None:
if not doaps:
logger.warning(f"There are no DOAPs to update on {host}")
logger.warning(f"There are no DOAPs to update on {dsp_client.server}")
return
logger.info(f"****** Updating scopes of {len(doaps)} DOAPs on {host}... ******")
logger.info(f"****** Updating scopes of {len(doaps)} DOAPs on {dsp_client.server}... ******")
for d in doaps:
try:
_ = _update_doap_scope_on_server(d.doap_iri, d.scope, dsp_client)
logger.info(f"Successfully updated DOAP {d.doap_iri}")
except ApiError as err:
logger.error(err)
logger.info(f"Finished updating scopes of {len(doaps)} DOAPs on {host}")
logger.info(f"Finished updating scopes of {len(doaps)} DOAPs on {dsp_client.server}")


def create_new_doap_on_server(
target: NewDoapTarget,
target: NewGroupDoapTarget | NewEntityDoapTarget,
shortcode: str,
scope: PermissionScope,
dsp_client: DspClient,
) -> Doap | None:
proj_iri, _ = get_project_iri_and_onto_iris_by_shortcode(shortcode, dsp_client)
payload = {
"forGroup": target.group.full_iri() if target.group else None,
"forGroup": target.group.full_iri() if isinstance(target, NewGroupDoapTarget) else None,
"forProject": proj_iri,
"forProperty": target.property,
"forResourceClass": target.resource_class,
"forProperty": target.property if isinstance(target, NewEntityDoapTarget) else None,
"forResourceClass": target.resource_class if isinstance(target, NewEntityDoapTarget) else None,
"hasPermissions": create_admin_route_object_from_scope(scope),
}
try:
Expand Down
5 changes: 0 additions & 5 deletions dsp_permissions_scripts/models/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,6 @@ class SpecifiedPropsNotEmptyError(Exception):
message: str = "specified_props must be empty if retrieve_values is not 'specified_props'"


@dataclass
class OapEmptyError(Exception):
message: str = "An OAP must specify at least one resource_oap or one value_oap"


@dataclass
class EmptyScopeError(Exception):
message: str = "PermissionScope must not be empty"
Expand Down
3 changes: 0 additions & 3 deletions dsp_permissions_scripts/models/host.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,7 @@ class Hosts:

LOCALHOST = "http://0.0.0.0:3333"
PROD = "https://api.dasch.swiss"
RDU_STAGE = "https://api.rdu-stage.dasch.swiss"
DEV = "https://api.dev.dasch.swiss"
LS_PROD = "https://api.ls-prod.admin.ch"
STAGE = "https://api.stage.dasch.swiss"

@staticmethod
def get_host(identifier: str) -> str:
Expand Down
Loading

0 comments on commit 8cb74b1

Please sign in to comment.