diff --git a/dsp_permissions_scripts/template.py b/dsp_permissions_scripts/template.py index 99c32177..e0907a91 100644 --- a/dsp_permissions_scripts/template.py +++ b/dsp_permissions_scripts/template.py @@ -2,24 +2,31 @@ from dsp_permissions_scripts.models.groups import BuiltinGroup from dsp_permissions_scripts.models.host import Hosts -from dsp_permissions_scripts.models.permission import Oap +from dsp_permissions_scripts.models.permission import Doap, Oap from dsp_permissions_scripts.models.scope import PUBLIC from dsp_permissions_scripts.utils.authentication import login from dsp_permissions_scripts.utils.doap_get import ( get_doaps_of_project, print_doaps_of_project, ) -from dsp_permissions_scripts.utils.doap_set import set_doaps_of_groups +from dsp_permissions_scripts.utils.doap_set import apply_updated_doaps_on_server from dsp_permissions_scripts.utils.oap import apply_updated_oaps_on_server from dsp_permissions_scripts.utils.project import get_all_resource_oaps_of_project def modify_oaps(oaps: list[Oap]) -> list[Oap]: for oap in oaps: - oap.scope.D.append(BuiltinGroup.PROJECT_MEMBER) + oap.scope.CR.append(BuiltinGroup.SYSTEM_ADMIN) return oaps +def modify_doaps(doaps: list[Doap]) -> list[Doap]: + for doap in doaps: + if doap.target.group in [BuiltinGroup.PROJECT_MEMBER.value, BuiltinGroup.PROJECT_ADMIN.value]: + doap.scope = PUBLIC + return doaps + + def main() -> None: """ The main method assembles a sample call of all available high-level functions. @@ -29,24 +36,20 @@ def main() -> None: shortcode = "F18E" token = login(host) - new_scope = PUBLIC - groups = [BuiltinGroup.PROJECT_ADMIN, BuiltinGroup.PROJECT_MEMBER] - - doaps = get_doaps_of_project( + project_doaps = get_doaps_of_project( host=host, shortcode=shortcode, token=token, ) print_doaps_of_project( - doaps=doaps, + doaps=project_doaps, host=host, shortcode=shortcode, ) - set_doaps_of_groups( - scope=new_scope, - groups=groups, + project_doaps_updated = modify_doaps(doaps=project_doaps) + apply_updated_doaps_on_server( + doaps=project_doaps_updated, host=host, - shortcode=shortcode, token=token, ) resource_oaps = get_all_resource_oaps_of_project( diff --git a/dsp_permissions_scripts/utils/doap_get.py b/dsp_permissions_scripts/utils/doap_get.py index 6f83ee93..577636e3 100644 --- a/dsp_permissions_scripts/utils/doap_get.py +++ b/dsp_permissions_scripts/utils/doap_get.py @@ -5,11 +5,14 @@ from dsp_permissions_scripts.models.permission import Doap, DoapTarget, DoapTargetType from dsp_permissions_scripts.utils.authentication import get_protocol +from dsp_permissions_scripts.utils.get_logger import get_logger, get_timestamp from dsp_permissions_scripts.utils.project import get_project_iri_by_shortcode from dsp_permissions_scripts.utils.scope_serialization import ( create_scope_from_admin_route_object, ) +logger = get_logger(__name__) + def __filter_doaps_by_target( doaps: list[Doap], @@ -31,26 +34,7 @@ def __filter_doaps_by_target( return filtered_doaps -# TODO: this function is unused -def get_permissions_for_project( - project_iri: str, - host: str, - token: str, -) -> list[dict[str, Any]]: - """ - Returns all permissions for the given project. - """ - headers = {"Authorization": f"Bearer {token}"} - project_iri = quote_plus(project_iri, safe="") - protocol = get_protocol(host) - url = f"{protocol}://{host}/admin/permissions/{project_iri}" - response = requests.get(url, headers=headers, timeout=5) - assert response.status_code == 200 - permissions: list[dict[str, Any]] = response.json()["permissions"] - return permissions - - -def get_all_doaps_of_project( +def __get_all_doaps_of_project( project_iri: str, host: str, token: str, @@ -98,11 +82,12 @@ def get_doaps_of_project( Optionally, select only the DOAPs that are related to either a group, or a resource class, or a property. By default, all DOAPs are returned, regardless of their target (target=all). """ + logger.info(f"******* Getting DOAPs of project {shortcode} on server {host} *******") project_iri = get_project_iri_by_shortcode( shortcode=shortcode, host=host, ) - doaps = get_all_doaps_of_project( + doaps = __get_all_doaps_of_project( project_iri=project_iri, host=host, token=token, @@ -111,6 +96,7 @@ def get_doaps_of_project( doaps=doaps, target=target, ) + logger.info(f"Found {len(doaps)} DOAPs, {len(filtered_doaps)} of which are related to {target}.") return filtered_doaps @@ -123,7 +109,10 @@ def print_doaps_of_project( heading = f"Project {shortcode} on server {host} has {len(doaps)} DOAPs" if target != DoapTargetType.ALL: heading += f" which are related to a {target}" - print(f"\n{heading}\n{'=' * len(heading)}\n") + print(f"\n{get_timestamp()}: {heading}\n{'=' * (len(heading) + len(get_timestamp()) + 2)}\n") + logger.info(f"******* Printing DOAPs of project {shortcode} on server {host} *******") + logger.info(heading) for d in doaps: - print(d.model_dump_json(indent=2, exclude_none=True)) - print() + representation = d.model_dump_json(indent=2, exclude_none=True) + print(representation + "\n") + logger.info(representation) diff --git a/dsp_permissions_scripts/utils/doap_set.py b/dsp_permissions_scripts/utils/doap_set.py index 2833d5b7..a1794e0e 100644 --- a/dsp_permissions_scripts/utils/doap_set.py +++ b/dsp_permissions_scripts/utils/doap_set.py @@ -1,21 +1,18 @@ -from typing import Sequence +from typing import Literal from urllib.parse import quote_plus import requests -from dsp_permissions_scripts.models.groups import BuiltinGroup from dsp_permissions_scripts.models.permission import Doap from dsp_permissions_scripts.models.scope import PermissionScope from dsp_permissions_scripts.utils.authentication import get_protocol -from dsp_permissions_scripts.utils.doap_get import ( - create_doap_from_admin_route_response, - get_all_doaps_of_project, -) -from dsp_permissions_scripts.utils.project import get_project_iri_by_shortcode +from dsp_permissions_scripts.utils.doap_get import create_doap_from_admin_route_response +from dsp_permissions_scripts.utils.get_logger import get_logger, get_timestamp from dsp_permissions_scripts.utils.scope_serialization import ( create_admin_route_object_from_scope, ) +logger = get_logger(__name__) def __update_doap_scope( doap_iri: str, @@ -37,77 +34,42 @@ def __update_doap_scope( return new_doap -def __get_doaps_of_groups( - groups: Sequence[str | BuiltinGroup], - host: str, - shortcode: str, - token: str, -) -> list[Doap]: +def __log_and_print_doap_update( + doap: Doap, + state: Literal["before", "after"], +) -> None: """ - Retrieves the DOAPs for the given groups. - - Args: - groups: the group IRIs to whose DOAP the scope should be applied - host: the DSP server where the project is located - shortcode: the shortcode of the project - token: the access token - - Returns: - applicable_doaps: the applicable DOAPs + Logs and prints the DOAP before or after the update. """ - project_iri = get_project_iri_by_shortcode( - shortcode=shortcode, - host=host, - ) - all_doaps = get_all_doaps_of_project( - project_iri=project_iri, - host=host, - token=token, - ) - groups_str = [] - for g in groups: - groups_str.append(g.value if isinstance(g, BuiltinGroup) else g) - applicable_doaps = [d for d in all_doaps if d.target.group in groups_str] - assert len(applicable_doaps) == len(groups) - return applicable_doaps + heading = f"DOAP {state}:" + body = doap.model_dump_json(indent=2) + print(f"{heading}\n{'-' * len(heading)}\n{body}\n") + logger.info(f"{heading}\n{body}") - -def set_doaps_of_groups( - scope: PermissionScope, - groups: Sequence[str | BuiltinGroup], +def apply_updated_doaps_on_server( + doaps: list[Doap], host: str, - shortcode: str, token: str, ) -> None: """ - Applies the given scope to the DOAPs of the given groups. + Updates DOAPs on the server. Args: - scope: one of the standard scopes defined in the Scope class - groups: the group IRIs to whose DOAP the scope should be applied + doaps: the DOAPs to be sent to the server host: the DSP server where the project is located - shortcode: the shortcode of the project token: the access token """ - applicable_doaps = __get_doaps_of_groups( - groups=groups, - host=host, - shortcode=shortcode, - token=token, - ) - heading = f"Update {len(applicable_doaps)} DOAPs on {host}..." + logger.info(f"******* Updating {len(doaps)} DOAPs on {host} *******") + heading = f"{get_timestamp()}: Updating {len(doaps)} DOAPs on {host}..." print(f"\n{heading}\n{'=' * len(heading)}\n") - for d in applicable_doaps: - print("Old DOAP:\n=========") - print(d.model_dump_json(indent=2)) + for d in doaps: + __log_and_print_doap_update(doap=d, state="before") new_doap = __update_doap_scope( doap_iri=d.doap_iri, - scope=scope, + scope=d.scope, host=host, token=token, ) - print("\nNew DOAP:\n=========") - print(new_doap.model_dump_json(indent=2)) - print() - print("All DOAPs have been updated.") + __log_and_print_doap_update(doap=new_doap, state="after") + print(f"{get_timestamp()}: All DOAPs have been updated.") diff --git a/dsp_permissions_scripts/utils/get_logger.py b/dsp_permissions_scripts/utils/get_logger.py new file mode 100644 index 00000000..7af284e5 --- /dev/null +++ b/dsp_permissions_scripts/utils/get_logger.py @@ -0,0 +1,33 @@ +import logging +from datetime import datetime + + +def get_logger(name: str) -> logging.Logger: + """ + Create a logger instance, + set its level to INFO, + and configure it to write to a file in the user's home directory. + + Args: + name: name of the logger + filesize_mb: maximum size per log file in MB, defaults to 5 + backupcount: number of log files to keep, defaults to 4 + + Returns: + the logger instance + """ + _logger = logging.getLogger(name) + _logger.setLevel(logging.INFO) + formatter = logging.Formatter(fmt="{asctime} {filename: <25} {levelname: <8} {message}", style="{") + formatter.default_time_format = "%Y-%m-%d %H:%M:%S" + handler = logging.FileHandler( + filename="logging.log", + mode="a", + ) + handler.setFormatter(formatter) + _logger.addHandler(handler) + return _logger + + +def get_timestamp() -> str: + return datetime.now().strftime("%Y-%m-%d %H:%M:%S") diff --git a/dsp_permissions_scripts/utils/oap.py b/dsp_permissions_scripts/utils/oap.py index 2acd1667..606765cc 100644 --- a/dsp_permissions_scripts/utils/oap.py +++ b/dsp_permissions_scripts/utils/oap.py @@ -8,8 +8,10 @@ from dsp_permissions_scripts.models.scope import PermissionScope from dsp_permissions_scripts.models.value import ValueUpdate from dsp_permissions_scripts.utils.authentication import get_protocol +from dsp_permissions_scripts.utils.get_logger import get_logger, get_timestamp from dsp_permissions_scripts.utils.scope_serialization import create_string_from_scope +logger = get_logger(__name__) def apply_updated_oaps_on_server( resource_oaps: list[Oap], @@ -17,13 +19,20 @@ def apply_updated_oaps_on_server( token: str, ) -> None: """Applies object access permissions on a DSP server.""" - for resource_oap in resource_oaps: + logger.info("******* Applying updated object access permissions on server *******") + print(f"{get_timestamp()}: ******* Applying updated object access permissions on server *******") + for index, resource_oap in enumerate(resource_oaps): + msg = f"Updating permissions of resource {index + 1}/{len(resource_oaps)}: {resource_oap.object_iri}..." + logger.info("=====") + logger.info(msg) + print(f"{get_timestamp()}: {msg}") update_permissions_for_resources_and_values( resource_iris=[resource_oap.object_iri], scope=resource_oap.scope, host=host, token=token, ) + logger.info(f"Updated permissions of resource {resource_oap.object_iri} and its values.") def update_permissions_for_resources_and_values( @@ -48,7 +57,6 @@ def __update_permissions_for_resource_and_values( """ Updates the permissions for the given resource and its values. """ - print(f"Updating permissions for {resource_iri}...") resource = __get_resource(resource_iri, host, token) lmd = __get_lmd(resource) type_ = __get_type(resource) @@ -57,7 +65,6 @@ def __update_permissions_for_resource_and_values( update_permissions_for_resource(resource_iri, lmd, type_, context, scope, host, token) for v in values: __update_permissions_for_value(resource_iri, v, type_, context, scope, host, token) - print("Done. \n") def update_permissions_for_resource( @@ -85,7 +92,7 @@ def update_permissions_for_resource( headers = {"Authorization": f"Bearer {token}"} response = requests.put(url, headers=headers, json=payload, timeout=5) assert response.status_code == 200 - print(f"Updated permissions for {resource_iri}") + logger.info(f"Updated permissions of resource {resource_iri}") def __update_permissions_for_value( @@ -100,7 +107,6 @@ def __update_permissions_for_value( """ Updates the permissions for the given value. """ - print(value.value_iri) payload = { "@id": resource_iri, "@type": resource_type, @@ -116,22 +122,19 @@ def __update_permissions_for_value( headers = {"Authorization": f"Bearer {token}"} response = requests.put(url, headers=headers, json=payload, timeout=5) if response.status_code == 400 and response.text: - if ( - "dsp.errors.BadRequestException: " - "The submitted permissions are the same as the current ones" in response.text - ): - print(f"Permissions for {value.value_iri} are already up to date") - return - if response.status_code != 200: - print(response.status_code) - print(response.text) - print(resource_iri, value.value_iri) - print(json.dumps(payload, indent=4)) - print("!!!!!") - print() - return - # raise Exception(f"Error updating permissions for {value.value_iri}") - print(f"Updated permissions for {value.value_iri}") + already = "dsp.errors.BadRequestException: The submitted permissions are the same as the current ones" + if already in response.text: + msg = f"Permissions of resource {resource_iri}, value {value.value_iri} are already up to date" + logger.warning(msg) + elif response.status_code != 200: + logger.error( + f"Error while updating permissions of resource {resource_iri}, value {value.value_iri}. " + f"Response status code: {response.status_code}. " + f"Response text: {response.text}. " + f"Payload: {json.dumps(payload, indent=4)}" + ) + else: + logger.info(f"Updated permissions of resource {resource_iri}, value {value.value_iri}") def __get_value_iris(resource: dict[str, Any]) -> list[ValueUpdate]: diff --git a/dsp_permissions_scripts/utils/project.py b/dsp_permissions_scripts/utils/project.py index 070b8bf5..600b2234 100644 --- a/dsp_permissions_scripts/utils/project.py +++ b/dsp_permissions_scripts/utils/project.py @@ -4,8 +4,11 @@ from dsp_permissions_scripts.models.permission import Oap from dsp_permissions_scripts.utils.authentication import get_protocol +from dsp_permissions_scripts.utils.get_logger import get_logger, get_timestamp from dsp_permissions_scripts.utils.scope_serialization import create_scope_from_string +logger = get_logger(__name__) + def get_project_iri_by_shortcode(shortcode: str, host: str) -> str: """ @@ -24,6 +27,8 @@ def get_all_resource_oaps_of_project( host: str, token: str, ) -> list[Oap]: + logger.info(f"******* Getting all resource OAPs of project {shortcode} *******") + print(f"{get_timestamp()}: ******* Getting all resource OAPs of project {shortcode} *******") project_iri = get_project_iri_by_shortcode( shortcode=shortcode, host=host, @@ -42,6 +47,8 @@ def get_all_resource_oaps_of_project( token=token, ) all_resource_oaps.extend(resource_oaps) + logger.info(f"Retrieved a TOTAL of {len(all_resource_oaps)} resource OAPs of project {shortcode}.") + print(f"{get_timestamp()}: Retrieved a TOTAL of {len(all_resource_oaps)} resource OAPs of project {shortcode}.") return all_resource_oaps @@ -50,6 +57,7 @@ def __get_all_resource_class_iris_of_project( host: str, token: str, ) -> list[str]: + logger.info(f"Getting all resource class IRIs of project {project_iri}...") project_onto_iris = __get_onto_iris_of_project( project_iri=project_iri, host=host, @@ -63,6 +71,7 @@ def __get_all_resource_class_iris_of_project( token=token, ) all_class_iris.extend(class_iris) + logger.info(f"Found {len(class_iris)} resource classes in onto {onto_iri}.") return all_class_iris @@ -109,12 +118,15 @@ def __get_all_resource_oaps_of_resclass( project_iri: str, token: str, ) -> list[Oap]: + print(f"{get_timestamp()}: Getting all resource OAPs of class {resclass_iri}...") + logger.info(f"Getting all resource OAPs of class {resclass_iri}...") protocol = get_protocol(host) headers = {"X-Knora-Accept-Project": project_iri, "Authorization": f"Bearer {token}"} resources: list[Oap] = [] page = 0 more = True while more: + logger.info(f"Getting page {page}...") more, iris = __get_next_page( protocol=protocol, host=host, @@ -124,6 +136,8 @@ def __get_all_resource_oaps_of_resclass( ) resources.extend(iris) page += 1 + print(f"{get_timestamp()}: Retrieved {len(resources)} resource OAPs of class {resclass_iri}.") + logger.info(f"Retrieved {len(resources)} resource OAPs of class {resclass_iri}.") return resources diff --git a/pyproject.toml b/pyproject.toml index 501dccd0..993a6690 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -37,6 +37,7 @@ disable = [ "invalid-name", "trailing-whitespace", "too-few-public-methods", + "logging-fstring-interpolation", ] [tool.black]