-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge branch 'main' into wip/scenario-tanner
- Loading branch information
Showing
14 changed files
with
468 additions
and
26 deletions.
There are no files selected for viewing
Empty file.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,73 @@ | ||
from typing import Any | ||
from urllib.parse import quote_plus | ||
|
||
import requests | ||
|
||
from dsp_permissions_scripts.ap.ap_model import Ap, ApValue | ||
from dsp_permissions_scripts.utils.authentication import get_protocol | ||
from dsp_permissions_scripts.utils.get_logger import get_logger | ||
from dsp_permissions_scripts.utils.project import get_project_iri_by_shortcode | ||
|
||
logger = get_logger(__name__) | ||
|
||
|
||
def create_ap_from_admin_route_object(permission: dict[str, Any]) -> Ap: | ||
"""Deserializes a AP from JSON as returned by /admin/permissions/ap/{project_iri}""" | ||
ap = Ap( | ||
forGroup=permission["forGroup"], | ||
forProject=permission["forProject"], | ||
hasPermissions=frozenset(ApValue(p["name"]) for p in permission["hasPermissions"]), | ||
iri=permission["iri"], | ||
) | ||
return ap | ||
|
||
|
||
def create_admin_route_object_from_ap(ap: Ap) -> dict[str, Any]: | ||
"""Serializes a AP to JSON as expected by /admin/permissions/ap/{project_iri}""" | ||
has_permissions = [ | ||
{"additionalInformation": None, "name": p.value, "permissionCode": None} for p in ap.hasPermissions | ||
] | ||
ap_dict = { | ||
"forGroup": ap.forGroup, | ||
"forProject": ap.forProject, | ||
"hasPermissions": has_permissions, | ||
"iri": ap.iri, | ||
} | ||
return ap_dict | ||
|
||
|
||
def _get_all_aps_of_project( | ||
project_iri: str, | ||
host: str, | ||
token: str, | ||
) -> list[Ap]: | ||
"""Returns all Administrative Permissions of the given project.""" | ||
headers = {"Authorization": f"Bearer {token}"} | ||
project_iri = quote_plus(project_iri, safe="") | ||
protocol = get_protocol(host) | ||
url = f"{protocol}://{host}/admin/permissions/ap/{project_iri}" | ||
response = requests.get(url, headers=headers, timeout=5) | ||
assert response.status_code == 200, f"Status {response.status_code}. Error message from DSP-API: {response.text}" | ||
aps: list[dict[str, Any]] = response.json()["administrative_permissions"] | ||
ap_objects = [create_ap_from_admin_route_object(ap) for ap in aps] | ||
return ap_objects | ||
|
||
|
||
def get_aps_of_project( | ||
host: str, | ||
shortcode: str, | ||
token: str, | ||
) -> list[Ap]: | ||
"""Returns the Administrative Permissions for a project.""" | ||
logger.info(f"******* Getting Administrative Permissions of project {shortcode} on server {host} *******") | ||
project_iri = get_project_iri_by_shortcode( | ||
shortcode=shortcode, | ||
host=host, | ||
) | ||
aps = _get_all_aps_of_project( | ||
project_iri=project_iri, | ||
host=host, | ||
token=token, | ||
) | ||
logger.info(f"Found {len(aps)} Administrative Permissions") | ||
return aps |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,41 @@ | ||
from enum import Enum | ||
|
||
from pydantic import BaseModel | ||
|
||
|
||
class ApValue(Enum): | ||
# is allowed to create resources inside the project | ||
ProjectResourceCreateAllPermission = "ProjectResourceCreateAllPermission" | ||
# is allowed to create resources of certain classes inside the project | ||
ProjectResourceCreateRestrictedPermission = "ProjectResourceCreateRestrictedPermission" | ||
# is allowed to do anything on project level | ||
ProjectAdminAllPermission = "ProjectAdminAllPermission" | ||
# is allowed to modify group info and group membership on all groups belonging to the project | ||
ProjectAdminGroupAllPermission = "ProjectAdminGroupAllPermission" | ||
# is allowed to modify group info and group membership on certain groups belonging to the project | ||
ProjectAdminGroupRestrictedPermission = "ProjectAdminGroupRestrictedPermission" | ||
# is allowed to change the permissions on all objects belonging to the project | ||
ProjectAdminRightsAllPermission = "ProjectAdminRightsAllPermission" | ||
# is allowed to administrate the project ontologies | ||
ProjectAdminOntologyAllPermission = "ProjectAdminOntologyAllPermission" | ||
|
||
|
||
class Ap(BaseModel): | ||
"""Represents an Administrative Permission""" | ||
|
||
forGroup: str | ||
forProject: str | ||
hasPermissions: frozenset[ApValue] | ||
iri: str | ||
|
||
def add_permission(self, permission: ApValue) -> None: | ||
"""Adds a permission to the AP.""" | ||
if permission in self.hasPermissions: | ||
raise ValueError(f"Permission {permission} is already in the AP") | ||
self.hasPermissions = self.hasPermissions.union({permission}) | ||
|
||
def remove_permission(self, permission: ApValue) -> None: | ||
"""Removes a permission from the AP.""" | ||
if permission not in self.hasPermissions: | ||
raise ValueError(f"Permission {permission} is not in the AP") | ||
self.hasPermissions = self.hasPermissions.difference({permission}) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,36 @@ | ||
import json | ||
from pathlib import Path | ||
from typing import Literal | ||
|
||
from dsp_permissions_scripts.ap.ap_model import Ap | ||
|
||
|
||
def _get_file_path(shortcode: str, mode: Literal["original", "modified"]) -> Path: | ||
return Path(f"project_data/{shortcode}/APs_{mode}.json") | ||
|
||
|
||
def serialize_aps_of_project( | ||
project_aps: list[Ap], | ||
shortcode: str, | ||
mode: Literal["original", "modified"], | ||
) -> None: | ||
"""Serialize the APs of a project to a JSON file.""" | ||
filepath = _get_file_path(shortcode, mode) | ||
filepath.parent.mkdir(parents=True, exist_ok=True) | ||
explanation_string = f"Project {shortcode} has {len(project_aps)} APs" | ||
aps_as_dicts = [ap.model_dump(exclude_none=True, mode="json") for ap in project_aps] | ||
aps_as_dict = {explanation_string: aps_as_dicts} | ||
with open(filepath, mode="w", encoding="utf-8") as f: | ||
f.write(json.dumps(aps_as_dict, ensure_ascii=False, indent=2)) | ||
|
||
|
||
def deserialize_aps_of_project( | ||
shortcode: str, | ||
mode: Literal["original", "modified"], | ||
) -> list[Ap]: | ||
"""Deserialize the APs of a project from a JSON file.""" | ||
filepath = _get_file_path(shortcode, mode) | ||
with open(filepath, mode="r", encoding="utf-8") as f: | ||
aps_as_dict = json.load(f) | ||
aps_as_dicts = list(aps_as_dict.values())[0] | ||
return [Ap.model_validate(d) for d in aps_as_dicts] |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,118 @@ | ||
import warnings | ||
from typing import Any | ||
from urllib.parse import quote_plus | ||
|
||
import requests | ||
|
||
from dsp_permissions_scripts.ap.ap_get import ( | ||
create_admin_route_object_from_ap, | ||
create_ap_from_admin_route_object, | ||
) | ||
from dsp_permissions_scripts.ap.ap_model import Ap | ||
from dsp_permissions_scripts.utils.authentication import get_protocol | ||
from dsp_permissions_scripts.utils.get_logger import get_logger, get_timestamp | ||
|
||
logger = get_logger(__name__) | ||
|
||
|
||
def _filter_aps_by_group( | ||
aps: list[Ap], | ||
forGroup: str, | ||
) -> Ap: | ||
aps = [ap for ap in aps if ap.forGroup == forGroup] | ||
assert len(aps) == 1 | ||
return aps[0] | ||
|
||
|
||
def _delete_single_ap( | ||
ap: Ap, | ||
host: str, | ||
token: str, | ||
) -> None: | ||
headers = {"Authorization": f"Bearer {token}"} | ||
ap_iri = quote_plus(ap.iri, safe="") | ||
protocol = get_protocol(host) | ||
url = f"{protocol}://{host}/admin/permissions/{ap_iri}" | ||
response = requests.delete(url, headers=headers, timeout=5) | ||
assert response.status_code == 200, f"Status {response.status_code}. Error message from DSP-API: {response.text}" | ||
logger.info(f"Deleted Administrative Permission {ap.iri} on host {host}") | ||
|
||
|
||
def _update_ap( | ||
ap: Ap, | ||
host: str, | ||
token: str, | ||
) -> Ap: | ||
""" | ||
Updates the given AP. | ||
""" | ||
iri = quote_plus(ap.iri, safe="") | ||
headers = {"Authorization": f"Bearer {token}"} | ||
protocol = get_protocol(host) | ||
url = f"{protocol}://{host}/admin/permissions/{iri}/hasPermissions" | ||
payload = {"hasPermissions": create_admin_route_object_from_ap(ap)["hasPermissions"]} | ||
response = requests.put(url, headers=headers, json=payload, timeout=5) | ||
assert response.status_code == 200, f"Status {response.status_code}. Error message from DSP-API: {response.text}" | ||
ap_updated: dict[str, Any] = response.json()["administrative_permission"] | ||
ap_object_updated = create_ap_from_admin_route_object(ap_updated) | ||
return ap_object_updated | ||
|
||
|
||
def _log_and_print_ap_update(ap: Ap) -> None: | ||
"""Logs and prints the AP after the update.""" | ||
heading = "Updated AP as per response from server:" | ||
body = ap.model_dump_json(indent=2) | ||
print(f"{heading}\n{'-' * len(heading)}\n{body}\n") | ||
logger.info(f"{heading}\n{body}") | ||
|
||
|
||
def apply_updated_aps_on_server( | ||
aps: list[Ap], | ||
host: str, | ||
token: str, | ||
) -> None: | ||
""" | ||
Updates APs on the server. | ||
Args: | ||
aps: the APs to be sent to the server | ||
host: the DSP server where the project is located | ||
token: the access token | ||
""" | ||
logger.info(f"******* Updating {len(aps)} APs on {host} *******") | ||
heading = f"{get_timestamp()}: Updating {len(aps)} APs on {host}..." | ||
print(f"\n{heading}\n{'=' * len(heading)}\n") | ||
for ap in aps: | ||
try: | ||
new_ap = _update_ap( | ||
ap=ap, | ||
host=host, | ||
token=token, | ||
) | ||
_log_and_print_ap_update(ap=new_ap) | ||
except Exception: # pylint: disable=broad-exception-caught | ||
logger.error(f"ERROR while updating Administrative Permission {ap.iri}", exc_info=True) | ||
warnings.warn(f"ERROR while updating Administrative Permission {ap.iri}") | ||
|
||
print(f"{get_timestamp()}: All APs have been updated.") | ||
|
||
|
||
def delete_ap( | ||
host: str, | ||
token: str, | ||
existing_aps: list[Ap], | ||
forGroup: str, | ||
) -> list[Ap]: | ||
"""Deletes the Administrative Permission of a group.""" | ||
logger.info(f"Deleting the Administrative Permission for group {forGroup} on server {host}") | ||
ap_to_delete = _filter_aps_by_group( | ||
aps=existing_aps, | ||
forGroup=forGroup, | ||
) | ||
_delete_single_ap( | ||
ap=ap_to_delete, | ||
host=host, | ||
token=token, | ||
) | ||
existing_aps.remove(ap_to_delete) | ||
return existing_aps |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.