Skip to content

Commit

Permalink
implement apply_updated_aps_on_server()
Browse files Browse the repository at this point in the history
  • Loading branch information
jnussbaum committed Oct 12, 2023
1 parent f568507 commit c0fbd69
Show file tree
Hide file tree
Showing 3 changed files with 102 additions and 7 deletions.
11 changes: 6 additions & 5 deletions dsp_permissions_scripts/template.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from dsp_permissions_scripts.models.scope import PUBLIC
from dsp_permissions_scripts.utils.ap.ap_get import delete_ap, get_aps_of_project
from dsp_permissions_scripts.utils.ap.ap_serialize import serialize_aps_of_project
from dsp_permissions_scripts.utils.ap.ap_set import apply_updated_aps_on_server
from dsp_permissions_scripts.utils.authentication import login
from dsp_permissions_scripts.utils.doap_get import get_doaps_of_project
from dsp_permissions_scripts.utils.doap_serialize import serialize_doaps_of_project
Expand Down Expand Up @@ -70,11 +71,11 @@ def update_aps(
shortcode=shortcode,
mode="modified",
)
# apply_updated_aps_on_server(
# doaps=project_aps_updated,
# host=host,
# token=token,
# )
apply_updated_aps_on_server(
aps=project_aps_updated,
host=host,
token=token,
)


def update_doaps(
Expand Down
19 changes: 17 additions & 2 deletions dsp_permissions_scripts/utils/ap/ap_get.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
logger = get_logger(__name__)


def _create_ap_from_admin_route_response(permission: dict[str, Any]) -> Ap:
def create_ap_from_admin_route_object(permission: dict[str, Any]) -> Ap:
"""Deserializes a AP from JSON as returned by /admin/permissions/ap/{project_iri}"""
ap = Ap(
forGroup=permission["forGroup"],
Expand All @@ -23,6 +23,21 @@ def _create_ap_from_admin_route_response(permission: dict[str, Any]) -> Ap:
return ap


def create_admin_route_object_from_ap(ap: Ap) -> dict[str, Any]:
"""Serializes a AP to JSON as expected by /admin/permissions/ap/{project_iri}"""
has_permissions = [
{"additionalInformation": None, "name": p.value, "permissionCode": None}
for p in ap.hasPermissions
]
ap_dict = {
"forGroup": ap.forGroup,
"forProject": ap.forProject,
"hasPermissions": has_permissions,
"iri": ap.iri,
}
return ap_dict


def _get_all_aps_of_project(
project_iri: str,
host: str,
Expand All @@ -36,7 +51,7 @@ def _get_all_aps_of_project(
response = requests.get(url, headers=headers, timeout=5)
assert response.status_code == 200
aps: list[dict[str, Any]] = response.json()["administrative_permissions"]
ap_objects = [_create_ap_from_admin_route_response(ap) for ap in aps]
ap_objects = [create_ap_from_admin_route_object(ap) for ap in aps]
return ap_objects


Expand Down
79 changes: 79 additions & 0 deletions dsp_permissions_scripts/utils/ap/ap_set.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
import warnings
from typing import Any, Literal
from urllib.parse import quote_plus

import requests

from dsp_permissions_scripts.models.ap import Ap
from dsp_permissions_scripts.utils.ap.ap_get import (
create_admin_route_object_from_ap,
create_ap_from_admin_route_object,
)
from dsp_permissions_scripts.utils.authentication import get_protocol
from dsp_permissions_scripts.utils.get_logger import get_logger, get_timestamp

logger = get_logger(__name__)

def _update_ap(
ap: Ap,
host: str,
token: str,
) -> Ap:
"""
Updates the given AP.
"""
iri = quote_plus(ap.iri, safe="")
headers = {"Authorization": f"Bearer {token}"}
protocol = get_protocol(host)
url = f"{protocol}://{host}/admin/permissions/{iri}/hasPermissions"
payload = {"hasPermissions": create_admin_route_object_from_ap(ap)["hasPermissions"]}
response = requests.put(url, headers=headers, json=payload, timeout=5)
assert response.status_code == 200
ap_updated: dict[str, Any] = response.json()["administrative_permission"]
ap_object_updated = create_ap_from_admin_route_object(ap_updated)
return ap_object_updated


def _log_and_print_ap_update(
ap: Ap,
state: Literal["before", "after"],
) -> None:
"""
Logs and prints the AP before or after the update.
"""
heading = f"AP {state}:"
body = ap.model_dump_json(indent=2)
print(f"{heading}\n{'-' * len(heading)}\n{body}\n")
logger.info(f"{heading}\n{body}")


def apply_updated_aps_on_server(
aps: list[Ap],
host: str,
token: str,
) -> None:
"""
Updates APs on the server.
Args:
aps: the APs to be sent to the server
host: the DSP server where the project is located
token: the access token
"""
logger.info(f"******* Updating {len(aps)} APs on {host} *******")
heading = f"{get_timestamp()}: Updating {len(aps)} APs on {host}..."
print(f"\n{heading}\n{'=' * len(heading)}\n")
for ap in aps:
_log_and_print_ap_update(ap=ap, state="before")
try:
new_ap = _update_ap(
ap=ap,
host=host,
token=token,
)
_log_and_print_ap_update(ap=new_ap, state="after")
except Exception: # pylint: disable=broad-exception-caught
logger.error(f"ERROR while updating Administrative Permission {ap.iri}", exc_info=True)
warnings.warn(f"ERROR while updating Administrative Permission {ap.iri}")

print(f"{get_timestamp()}: All APs have been updated.")

0 comments on commit c0fbd69

Please sign in to comment.