-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge branch 'main' into wip/handle-duplicates
- Loading branch information
Showing
19 changed files
with
536 additions
and
508 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,53 +1,35 @@ | ||
import warnings | ||
from urllib.parse import quote_plus | ||
|
||
import requests | ||
|
||
from dsp_permissions_scripts.ap.ap_model import Ap | ||
from dsp_permissions_scripts.models.api_error import ApiError | ||
from dsp_permissions_scripts.utils.authentication import get_protocol | ||
from dsp_permissions_scripts.models.errors import ApiError | ||
from dsp_permissions_scripts.utils.dsp_client import DspClient | ||
from dsp_permissions_scripts.utils.get_logger import get_logger | ||
from dsp_permissions_scripts.utils.try_request import http_call_with_retry | ||
|
||
logger = get_logger(__name__) | ||
|
||
|
||
def _delete_ap_on_server( | ||
ap: Ap, | ||
host: str, | ||
token: str, | ||
) -> None: | ||
headers = {"Authorization": f"Bearer {token}"} | ||
def _delete_ap_on_server(ap: Ap, dsp_client: DspClient) -> None: | ||
ap_iri = quote_plus(ap.iri, safe="") | ||
protocol = get_protocol(host) | ||
url = f"{protocol}://{host}/admin/permissions/{ap_iri}" | ||
response = http_call_with_retry( | ||
action=lambda: requests.delete(url, headers=headers, timeout=20), | ||
err_msg=f"Could not delete Administrative Permission {ap.iri}", | ||
) | ||
if response.status_code != 200: | ||
raise ApiError(f"Could not delete Administrative Permission {ap.iri}", response.text, response.status_code) | ||
try: | ||
dsp_client.delete(f"/admin/permissions/{ap_iri}") | ||
except ApiError as err: | ||
err.message = f"Could not delete Administrative Permission {ap.iri}" | ||
raise err from None | ||
|
||
|
||
def delete_ap_of_group_on_server( | ||
host: str, | ||
token: str, | ||
existing_aps: list[Ap], | ||
forGroup: str, | ||
dsp_client: DspClient, | ||
) -> list[Ap]: | ||
aps_to_delete = [ap for ap in existing_aps if ap.forGroup == forGroup] | ||
if not aps_to_delete: | ||
logger.warning(f"There are no APs to delete on {host} for group {forGroup}") | ||
warnings.warn(f"There are no APs to delete on {host} for group {forGroup}") | ||
return existing_aps | ||
print(f"Deleting the Administrative Permissions for group {forGroup} on server {host}") | ||
logger.info(f"Deleting the Administrative Permissions for group {forGroup} on server {host}") | ||
for ap in aps_to_delete: | ||
_delete_ap_on_server( | ||
ap=ap, | ||
host=host, | ||
token=token, | ||
) | ||
_delete_ap_on_server(ap, dsp_client) | ||
existing_aps.remove(ap) | ||
logger.info(f"Deleted Administrative Permission {ap.iri} on host {host}") | ||
logger.info(f"Deleted Administrative Permission {ap.iri}") | ||
return existing_aps |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,67 +1,40 @@ | ||
import warnings | ||
from typing import Any | ||
from urllib.parse import quote_plus | ||
|
||
import requests | ||
|
||
from dsp_permissions_scripts.ap.ap_get import ( | ||
create_admin_route_object_from_ap, | ||
create_ap_from_admin_route_object, | ||
) | ||
from dsp_permissions_scripts.ap.ap_model import Ap | ||
from dsp_permissions_scripts.models.api_error import ApiError | ||
from dsp_permissions_scripts.utils.authentication import get_protocol | ||
from dsp_permissions_scripts.models.errors import ApiError | ||
from dsp_permissions_scripts.utils.dsp_client import DspClient | ||
from dsp_permissions_scripts.utils.get_logger import get_logger | ||
from dsp_permissions_scripts.utils.try_request import http_call_with_retry | ||
|
||
logger = get_logger(__name__) | ||
|
||
|
||
def _update_ap_on_server( | ||
ap: Ap, | ||
host: str, | ||
token: str, | ||
) -> Ap: | ||
def _update_ap_on_server(ap: Ap, dsp_client: DspClient) -> Ap: | ||
iri = quote_plus(ap.iri, safe="") | ||
headers = {"Authorization": f"Bearer {token}"} | ||
protocol = get_protocol(host) | ||
url = f"{protocol}://{host}/admin/permissions/{iri}/hasPermissions" | ||
payload = {"hasPermissions": create_admin_route_object_from_ap(ap)["hasPermissions"]} | ||
response = http_call_with_retry( | ||
action=lambda: requests.put(url, headers=headers, json=payload, timeout=20), | ||
err_msg=f"Could not update Administrative Permission {ap.iri}", | ||
) | ||
if response.status_code != 200: | ||
raise ApiError( | ||
message=f"Could not update Administrative Permission {ap.iri}", | ||
response_text=response.text, | ||
status_code=response.status_code, | ||
payload=payload, | ||
) | ||
ap_updated: dict[str, Any] = response.json()["administrative_permission"] | ||
try: | ||
response = dsp_client.put(f"/admin/permissions/{iri}/hasPermissions", data=payload) | ||
except ApiError as err: | ||
err.message = f"Could not update Administrative Permission {ap.iri}" | ||
raise err from None | ||
ap_updated: dict[str, Any] = response["administrative_permission"] | ||
ap_object_updated = create_ap_from_admin_route_object(ap_updated) | ||
return ap_object_updated | ||
|
||
|
||
def apply_updated_aps_on_server( | ||
aps: list[Ap], | ||
host: str, | ||
token: str, | ||
) -> None: | ||
def apply_updated_aps_on_server(aps: list[Ap], host: str, dsp_client: DspClient) -> None: | ||
if not aps: | ||
logger.warning(f"There are no APs to update on {host}") | ||
warnings.warn(f"There are no APs to update on {host}") | ||
return | ||
logger.info(f"Updating {len(aps)} APs on {host}...") | ||
print(f"Updating {len(aps)} APs on {host}...") | ||
logger.info(f"****** Updating {len(aps)} Administrative Permissions on {host}... ******") | ||
for ap in aps: | ||
try: | ||
_ = _update_ap_on_server( | ||
ap=ap, | ||
host=host, | ||
token=token, | ||
) | ||
_ = _update_ap_on_server(ap, dsp_client) | ||
logger.info(f"Successfully updated AP {ap.iri}") | ||
except ApiError as err: | ||
logger.error(err) | ||
warnings.warn(err.message) | ||
logger.info(f"Finished updating {len(aps)} Administrative Permissions on {host}") |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,63 +1,39 @@ | ||
import warnings | ||
from urllib.parse import quote_plus | ||
|
||
import requests | ||
|
||
from dsp_permissions_scripts.doap.doap_get import create_doap_from_admin_route_response | ||
from dsp_permissions_scripts.doap.doap_model import Doap | ||
from dsp_permissions_scripts.models.api_error import ApiError | ||
from dsp_permissions_scripts.models.errors import ApiError | ||
from dsp_permissions_scripts.models.scope import PermissionScope | ||
from dsp_permissions_scripts.utils.authentication import get_protocol | ||
from dsp_permissions_scripts.utils.dsp_client import DspClient | ||
from dsp_permissions_scripts.utils.get_logger import get_logger | ||
from dsp_permissions_scripts.utils.scope_serialization import ( | ||
create_admin_route_object_from_scope, | ||
) | ||
from dsp_permissions_scripts.utils.try_request import http_call_with_retry | ||
|
||
logger = get_logger(__name__) | ||
|
||
|
||
def _update_doap_scope_on_server( | ||
doap_iri: str, | ||
scope: PermissionScope, | ||
host: str, | ||
token: str, | ||
) -> Doap: | ||
def _update_doap_scope_on_server(doap_iri: str, scope: PermissionScope, dsp_client: DspClient) -> Doap: | ||
iri = quote_plus(doap_iri, safe="") | ||
headers = {"Authorization": f"Bearer {token}"} | ||
protocol = get_protocol(host) | ||
url = f"{protocol}://{host}/admin/permissions/{iri}/hasPermissions" | ||
payload = {"hasPermissions": create_admin_route_object_from_scope(scope)} | ||
response = http_call_with_retry( | ||
action=lambda: requests.put(url, headers=headers, json=payload, timeout=20), | ||
err_msg=f"Could not update scope of DOAP {doap_iri}", | ||
) | ||
if response.status_code != 200: | ||
raise ApiError( f"Could not update scope of DOAP {doap_iri}", response.text, response.status_code, payload) | ||
new_doap = create_doap_from_admin_route_response(response.json()["default_object_access_permission"]) | ||
try: | ||
response = dsp_client.put(f"/admin/permissions/{iri}/hasPermissions", data=payload) | ||
except ApiError as err: | ||
err.message = f"Could not update scope of DOAP {doap_iri}" | ||
raise err from None | ||
new_doap = create_doap_from_admin_route_response(response["default_object_access_permission"]) | ||
return new_doap | ||
|
||
|
||
def apply_updated_doaps_on_server( | ||
doaps: list[Doap], | ||
host: str, | ||
token: str, | ||
) -> None: | ||
def apply_updated_doaps_on_server(doaps: list[Doap], host: str, dsp_client: DspClient) -> None: | ||
if not doaps: | ||
logger.warning(f"There are no DOAPs to update on {host}") | ||
warnings.warn(f"There are no DOAPs to update on {host}") | ||
return | ||
logger.info(f"Updating {len(doaps)} DOAPs on {host}...") | ||
print(f"Updating {len(doaps)} DOAPs on {host}...") | ||
logger.info(f"****** Updating {len(doaps)} DOAPs on {host}... ******") | ||
for d in doaps: | ||
try: | ||
_ = _update_doap_scope_on_server( | ||
doap_iri=d.doap_iri, | ||
scope=d.scope, | ||
host=host, | ||
token=token, | ||
) | ||
_ = _update_doap_scope_on_server(d.doap_iri, d.scope, dsp_client) | ||
logger.info(f"Successfully updated DOAP {d.doap_iri}") | ||
except ApiError as err: | ||
logger.error(err) | ||
warnings.warn(err.message) | ||
logger.info(f"Finished updating {len(doaps)} DOAPs on {host}") |
Oops, something went wrong.