diff --git a/dsp_permissions_scripts/template.py b/dsp_permissions_scripts/template.py index e26f0f10..9a19f5d3 100644 --- a/dsp_permissions_scripts/template.py +++ b/dsp_permissions_scripts/template.py @@ -6,9 +6,12 @@ from dsp_permissions_scripts.models.host import Hosts from dsp_permissions_scripts.models.oap import Oap from dsp_permissions_scripts.models.scope import PUBLIC -from dsp_permissions_scripts.utils.ap.ap_get import delete_ap, get_aps_of_project +from dsp_permissions_scripts.utils.ap.ap_get import get_aps_of_project from dsp_permissions_scripts.utils.ap.ap_serialize import serialize_aps_of_project -from dsp_permissions_scripts.utils.ap.ap_set import apply_updated_aps_on_server +from dsp_permissions_scripts.utils.ap.ap_set import ( + apply_updated_aps_on_server, + delete_ap, +) from dsp_permissions_scripts.utils.authentication import login from dsp_permissions_scripts.utils.doap_get import get_doaps_of_project from dsp_permissions_scripts.utils.doap_serialize import serialize_doaps_of_project diff --git a/dsp_permissions_scripts/utils/ap/ap_get.py b/dsp_permissions_scripts/utils/ap/ap_get.py index 8b040a5d..2a14a7cc 100644 --- a/dsp_permissions_scripts/utils/ap/ap_get.py +++ b/dsp_permissions_scripts/utils/ap/ap_get.py @@ -55,50 +55,6 @@ def _get_all_aps_of_project( return ap_objects -def _filter_aps_by_group( - aps: list[Ap], - forGroup: str, -) -> Ap: - aps = [ap for ap in aps if ap.forGroup == forGroup] - assert len(aps) == 1 - return aps[0] - - -def _delete_single_ap( - ap: Ap, - host: str, - token: str, -) -> None: - headers = {"Authorization": f"Bearer {token}"} - ap_iri = quote_plus(ap.iri, safe="") - protocol = get_protocol(host) - url = f"{protocol}://{host}/admin/permissions/{ap_iri}" - response = requests.delete(url, headers=headers, timeout=5) - assert response.status_code == 200 - logger.info(f"Deleted Administrative Permission {ap.iri} on host {host}") - - -def delete_ap( - host: str, - token: str, - existing_aps: list[Ap], - forGroup: str, -) -> list[Ap]: - """Deletes the Administrative Permission of a group.""" - logger.info(f"Deleting the Administrative Permission for group {forGroup} on server {host}") - ap_to_delete = _filter_aps_by_group( - aps=existing_aps, - forGroup=forGroup, - ) - _delete_single_ap( - ap=ap_to_delete, - host=host, - token=token, - ) - existing_aps.remove(ap_to_delete) - return existing_aps - - def get_aps_of_project( host: str, shortcode: str, diff --git a/dsp_permissions_scripts/utils/ap/ap_set.py b/dsp_permissions_scripts/utils/ap/ap_set.py index 5e1380a1..0bb395a2 100644 --- a/dsp_permissions_scripts/utils/ap/ap_set.py +++ b/dsp_permissions_scripts/utils/ap/ap_set.py @@ -14,6 +14,30 @@ logger = get_logger(__name__) + +def _filter_aps_by_group( + aps: list[Ap], + forGroup: str, +) -> Ap: + aps = [ap for ap in aps if ap.forGroup == forGroup] + assert len(aps) == 1 + return aps[0] + + +def _delete_single_ap( + ap: Ap, + host: str, + token: str, +) -> None: + headers = {"Authorization": f"Bearer {token}"} + ap_iri = quote_plus(ap.iri, safe="") + protocol = get_protocol(host) + url = f"{protocol}://{host}/admin/permissions/{ap_iri}" + response = requests.delete(url, headers=headers, timeout=5) + assert response.status_code == 200 + logger.info(f"Deleted Administrative Permission {ap.iri} on host {host}") + + def _update_ap( ap: Ap, host: str, @@ -77,3 +101,24 @@ def apply_updated_aps_on_server( warnings.warn(f"ERROR while updating Administrative Permission {ap.iri}") print(f"{get_timestamp()}: All APs have been updated.") + + +def delete_ap( + host: str, + token: str, + existing_aps: list[Ap], + forGroup: str, +) -> list[Ap]: + """Deletes the Administrative Permission of a group.""" + logger.info(f"Deleting the Administrative Permission for group {forGroup} on server {host}") + ap_to_delete = _filter_aps_by_group( + aps=existing_aps, + forGroup=forGroup, + ) + _delete_single_ap( + ap=ap_to_delete, + host=host, + token=token, + ) + existing_aps.remove(ap_to_delete) + return existing_aps