diff --git a/dsp_permissions_scripts/models/errors.py b/dsp_permissions_scripts/models/errors.py index 7cb3baf8..67617057 100644 --- a/dsp_permissions_scripts/models/errors.py +++ b/dsp_permissions_scripts/models/errors.py @@ -17,3 +17,18 @@ def __str__(self) -> str: @dataclass class PermissionsAlreadyUpToDate(Exception): message: str = "The submitted permissions are the same as the current ones" + + +@dataclass +class SpecifiedPropsEmptyError(ValueError): + message: str = "specified_props must not be empty if retrieve_values is 'specified_props'" + + +@dataclass +class SpecifiedPropsNotEmptyError(ValueError): + message: str = "specified_props must be empty if retrieve_values is not 'specified_props'" + + +@dataclass +class OapRetrieveConfigEmptyError(ValueError): + message: str = "retrieve_resources cannot be False if retrieve_values is 'none'" diff --git a/dsp_permissions_scripts/models/value.py b/dsp_permissions_scripts/models/value.py deleted file mode 100644 index a17cac8e..00000000 --- a/dsp_permissions_scripts/models/value.py +++ /dev/null @@ -1,14 +0,0 @@ -from dataclasses import dataclass - - -@dataclass -class ValueUpdate: - """ - DTO for updating a value. - Contains the property whith which the value relates to its resource, - the value IRI and the value type. - """ - - property: str - value_iri: str - value_type: str diff --git a/dsp_permissions_scripts/oap/oap_get.py b/dsp_permissions_scripts/oap/oap_get.py index 1edc0348..a6599658 100644 --- a/dsp_permissions_scripts/oap/oap_get.py +++ b/dsp_permissions_scripts/oap/oap_get.py @@ -4,6 +4,9 @@ from dsp_permissions_scripts.models.errors import ApiError from dsp_permissions_scripts.oap.oap_model import Oap +from dsp_permissions_scripts.oap.oap_model import OapRetrieveConfig +from dsp_permissions_scripts.oap.oap_model import ResourceOap +from dsp_permissions_scripts.oap.oap_model import ValueOap from dsp_permissions_scripts.utils.dsp_client import DspClient from dsp_permissions_scripts.utils.get_logger import get_logger from dsp_permissions_scripts.utils.project import get_all_resource_class_iris_of_project @@ -13,8 +16,9 @@ logger = get_logger(__name__) -def _get_all_oaps_of_resclass(resclass_iri: str, project_iri: str, dsp_client: DspClient) -> list[Oap]: - logger.info(f"Getting all resource OAPs of class {resclass_iri}...") +def _get_all_oaps_of_resclass( + resclass_iri: str, project_iri: str, dsp_client: DspClient, oap_config: OapRetrieveConfig +) -> list[Oap]: headers = {"X-Knora-Accept-Project": project_iri} all_oaps: list[Oap] = [] page = 0 @@ -27,6 +31,7 @@ def _get_all_oaps_of_resclass(resclass_iri: str, project_iri: str, dsp_client: D page=page, headers=headers, dsp_client=dsp_client, + oap_config=oap_config, ) all_oaps.extend(oaps) page += 1 @@ -42,6 +47,7 @@ def _get_next_page( page: int, headers: dict[str, str], dsp_client: DspClient, + oap_config: OapRetrieveConfig, ) -> tuple[bool, list[Oap]]: """ Get the resource IRIs of a resource class, one page at a time. @@ -63,19 +69,62 @@ def _get_next_page( if "@graph" in result: oaps = [] for r in result["@graph"]: - scope = create_scope_from_string(r["knora-api:hasPermissions"]) - oaps.append(Oap(scope=scope, object_iri=r["@id"])) + if oap := _get_oap_of_one_resource(r, oap_config): + oaps.append(oap) return True, oaps # result contains only 1 resource: return it, then stop (there will be no more resources) if "@id" in result: - scope = create_scope_from_string(result["knora-api:hasPermissions"]) - return False, [Oap(scope=scope, object_iri=result["@id"])] + oaps = [] + if oap := _get_oap_of_one_resource(result, oap_config): + oaps.append(oap) + return False, oaps # there are no more resources return False, [] +def _get_oap_of_one_resource(r: dict[str, Any], oap_config: OapRetrieveConfig) -> Oap | None: + if oap_config.retrieve_resources: + scope = create_scope_from_string(r["knora-api:hasPermissions"]) + resource_oap = ResourceOap(scope=scope, resource_iri=r["@id"]) + else: + resource_oap = None + + if oap_config.retrieve_values == "none": + value_oaps = [] + elif oap_config.retrieve_values == "all": + value_oaps = _get_value_oaps(r) + else: + value_oaps = _get_value_oaps(r, oap_config.specified_props) + + if resource_oap or value_oaps: + return Oap(resource_oap=resource_oap, value_oaps=value_oaps) + else: + return None + + +def _get_value_oaps(resource: dict[str, Any], restrict_to_props: list[str] | None = None) -> list[ValueOap]: + res = [] + for k, v in resource.items(): + if k in {"@id", "@type", "@context", "rdfs:label", "knora-api:DeletedValue"}: + continue + if restrict_to_props is not None and k not in restrict_to_props: + continue + match v: + case { + "@id": id_, + "@type": type_, + "knora-api:hasPermissions": perm_str, + } if "/values/" in id_: + scope = create_scope_from_string(perm_str) + oap = ValueOap(scope=scope, property=k, value_type=type_, value_iri=id_, resource_iri=resource["@id"]) + res.append(oap) + case _: + continue + return res + + def get_resource(resource_iri: str, dsp_client: DspClient) -> dict[str, Any]: """Requests the resource with the given IRI from DSP-API""" iri = quote_plus(resource_iri, safe="") @@ -86,15 +135,16 @@ def get_resource(resource_iri: str, dsp_client: DspClient) -> dict[str, Any]: raise err from None -def get_resource_oap_by_iri(resource_iri: str, dsp_client: DspClient) -> Oap: +def get_resource_oap_by_iri(resource_iri: str, dsp_client: DspClient) -> ResourceOap: resource = get_resource(resource_iri, dsp_client) scope = create_scope_from_string(resource["knora-api:hasPermissions"]) - return Oap(scope=scope, object_iri=resource_iri) + return ResourceOap(scope=scope, resource_iri=resource_iri) -def get_all_resource_oaps_of_project( +def get_all_oaps_of_project( shortcode: str, dsp_client: DspClient, + oap_config: OapRetrieveConfig, excluded_class_iris: Iterable[str] = (), ) -> list[Oap]: logger.info("******* Retrieving all OAPs... *******") @@ -103,7 +153,7 @@ def get_all_resource_oaps_of_project( resclass_iris = [x for x in resclass_iris if x not in excluded_class_iris] all_oaps = [] for resclass_iri in resclass_iris: - oaps = _get_all_oaps_of_resclass(resclass_iri, project_iri, dsp_client) + oaps = _get_all_oaps_of_resclass(resclass_iri, project_iri, dsp_client, oap_config) all_oaps.extend(oaps) logger.info(f"Retrieved a TOTAL of {len(all_oaps)} OAPs") return all_oaps diff --git a/dsp_permissions_scripts/oap/oap_model.py b/dsp_permissions_scripts/oap/oap_model.py index cc6200a4..1fb9bad2 100644 --- a/dsp_permissions_scripts/oap/oap_model.py +++ b/dsp_permissions_scripts/oap/oap_model.py @@ -1,10 +1,76 @@ +from __future__ import annotations + +from typing import Literal + from pydantic import BaseModel +from pydantic import ConfigDict +from pydantic import model_validator +from dsp_permissions_scripts.models.errors import OapRetrieveConfigEmptyError +from dsp_permissions_scripts.models.errors import SpecifiedPropsEmptyError +from dsp_permissions_scripts.models.errors import SpecifiedPropsNotEmptyError from dsp_permissions_scripts.models.scope import PermissionScope class Oap(BaseModel): - """Model representing an object access permission, containing a scope and the IRI of the resource/value""" + """ + Model representing an object access permission of a resource and its values. + If only the resource is of interest, value_oaps will be an empty list. + If only the values (or a part of them) are of interest, resource_oap will be None. + """ + + resource_oap: ResourceOap | None + value_oaps: list[ValueOap] + + @model_validator(mode="after") + def check_consistency(self) -> Oap: + if not self.resource_oap and not self.value_oaps: + raise ValueError("An OAP must have at least one resource_oap or one value_oap") + return self + + +class ResourceOap(BaseModel): + """Model representing an object access permission of a resource""" + + scope: PermissionScope + resource_iri: str + + +class ValueOap(BaseModel): + """ + Model representing an object access permission of a value. + + Fields: + scope: permissions of this value + value_iri: IRI of the value + property: property whith which the value relates to its resource + value_type: type of the value, e.g. "knora-api:TextValue" + """ scope: PermissionScope - object_iri: str + property: str + value_type: str + value_iri: str + resource_iri: str + + +class OapRetrieveConfig(BaseModel): + model_config = ConfigDict(frozen=True) + + retrieve_resources: bool = True + retrieve_values: Literal["all", "specified_props", "none"] = "none" + specified_props: list[str] = [] + + @model_validator(mode="after") + def check_specified_props(self) -> OapRetrieveConfig: + if self.retrieve_values == "specified_props" and not self.specified_props: + raise SpecifiedPropsEmptyError() + if self.retrieve_values != "specified_props" and self.specified_props: + raise SpecifiedPropsNotEmptyError() + return self + + @model_validator(mode="after") + def check_config_empty(self) -> OapRetrieveConfig: + if not self.retrieve_resources and self.retrieve_values == "none": + raise OapRetrieveConfigEmptyError() + return self diff --git a/dsp_permissions_scripts/oap/oap_serialize.py b/dsp_permissions_scripts/oap/oap_serialize.py index 5c28ba34..8113094f 100644 --- a/dsp_permissions_scripts/oap/oap_serialize.py +++ b/dsp_permissions_scripts/oap/oap_serialize.py @@ -1,8 +1,11 @@ +import itertools import re from pathlib import Path from typing import Literal from dsp_permissions_scripts.oap.oap_model import Oap +from dsp_permissions_scripts.oap.oap_model import ResourceOap +from dsp_permissions_scripts.oap.oap_model import ValueOap from dsp_permissions_scripts.utils.get_logger import get_logger logger = get_logger(__name__) @@ -24,21 +27,62 @@ def serialize_oaps( folder = _get_project_data_path(shortcode, mode) folder.mkdir(parents=True, exist_ok=True) logger.info(f"Writing {len(oaps)} OAPs into {folder}") - for res_oap in oaps: - filename = re.sub(r"http://rdfh\.ch/[^/]+/", "resource_", res_oap.object_iri) - with open(folder / f"{filename}.json", mode="w", encoding="utf-8") as f: - f.write(res_oap.model_dump_json(indent=2)) - logger.info(f"Successfully wrote {len(oaps)} OAPs into {folder}") + counter = 0 + for oap in oaps: + if oap.resource_oap: + _serialize_oap(oap.resource_oap, folder) + counter += 1 + for value_oap in oap.value_oaps: + _serialize_oap(value_oap, folder) + counter += 1 + logger.info(f"Successfully wrote {len(oaps)} OAPs into {counter} files in folder {folder}") + + +def _serialize_oap(oap: ResourceOap | ValueOap, folder: Path) -> None: + iri = oap.resource_iri if isinstance(oap, ResourceOap) else oap.value_iri + filename = re.sub(r"http://rdfh\.ch/[^/]+/", "resource_", iri) + filename = re.sub(r"/", "_", filename) + Path(folder / f"{filename}.json").write_text(oap.model_dump_json(indent=2), encoding="utf-8") def deserialize_oaps( shortcode: str, mode: Literal["original", "modified"], ) -> list[Oap]: - """Deserialize the resource OAPs from JSON files.""" + """Deserialize the OAPs from JSON files.""" + res_oaps, val_oaps = _read_all_oaps_from_files(shortcode, mode) + oaps = _group_oaps_together(res_oaps, val_oaps) + return oaps + + +def _read_all_oaps_from_files( + shortcode: str, mode: Literal["original", "modified"] +) -> tuple[list[ResourceOap], list[ValueOap]]: folder = _get_project_data_path(shortcode, mode) - oaps = [] - for file in [f for f in folder.iterdir() if f.suffix == ".json"]: - with open(file, mode="r", encoding="utf-8") as f: - oaps.append(Oap.model_validate_json(f.read())) + res_oaps: list[ResourceOap] = [] + val_oaps: list[ValueOap] = [] + for file in folder.glob("**/*.json"): + content = file.read_text(encoding="utf-8") + if "_values_" in file.name: + val_oaps.append(ValueOap.model_validate_json(content)) + else: + res_oaps.append(ResourceOap.model_validate_json(content)) + return res_oaps, val_oaps + + +def _group_oaps_together(res_oaps: list[ResourceOap], val_oaps: list[ValueOap]) -> list[Oap]: + oaps: list[Oap] = [] + deserialized_resource_iris = [] + + for res_iri, _val_oaps in itertools.groupby(val_oaps, key=lambda x: x.resource_iri): + res_oaps_filtered = [x for x in res_oaps if x.resource_iri == res_iri] + res_oap = res_oaps_filtered[0] if res_oaps_filtered else None + oaps.append(Oap(resource_oap=res_oap, value_oaps=sorted(_val_oaps, key=lambda x: x.value_iri))) + deserialized_resource_iris.append(res_iri) + + remaining_res_oaps = [oap for oap in res_oaps if oap.resource_iri not in deserialized_resource_iris] + for res_oap in remaining_res_oaps: + oaps.append(Oap(resource_oap=res_oap, value_oaps=[])) + + oaps.sort(key=lambda oap: oap.resource_oap.resource_iri if oap.resource_oap else "") return oaps diff --git a/dsp_permissions_scripts/oap/oap_set.py b/dsp_permissions_scripts/oap/oap_set.py index f4259596..e8518896 100644 --- a/dsp_permissions_scripts/oap/oap_set.py +++ b/dsp_permissions_scripts/oap/oap_set.py @@ -1,16 +1,17 @@ # pylint: disable=too-many-arguments +import itertools +import re from concurrent.futures import ThreadPoolExecutor from concurrent.futures import as_completed from datetime import datetime -from typing import Any from dsp_permissions_scripts.models.errors import ApiError from dsp_permissions_scripts.models.errors import PermissionsAlreadyUpToDate from dsp_permissions_scripts.models.scope import PermissionScope -from dsp_permissions_scripts.models.value import ValueUpdate from dsp_permissions_scripts.oap.oap_get import get_resource from dsp_permissions_scripts.oap.oap_model import Oap +from dsp_permissions_scripts.oap.oap_model import ValueOap from dsp_permissions_scripts.utils.dsp_client import DspClient from dsp_permissions_scripts.utils.get_logger import get_logger from dsp_permissions_scripts.utils.scope_serialization import create_string_from_scope @@ -18,30 +19,11 @@ logger = get_logger(__name__) -def _get_values_to_update(resource: dict[str, Any]) -> list[ValueUpdate]: - """Returns a list of values that have permissions and hence should be updated.""" - res: list[ValueUpdate] = [] - for k, v in resource.items(): - if k in {"@id", "@type", "@context", "rdfs:label", "knora-api:DeletedValue"}: - continue - match v: - case { - "@id": id_, - "@type": type_, - **properties, - } if "/values/" in id_ and "knora-api:hasPermissions" in properties: - res.append(ValueUpdate(k, id_, type_)) - case _: - continue - return res - - -def _update_permissions_for_value( # noqa: PLR0913 +def _update_permissions_for_value( resource_iri: str, - value: ValueUpdate, + value: ValueOap, resource_type: str, context: dict[str, str], - scope: PermissionScope, dsp_client: DspClient, ) -> None: """Updates the permissions for the given value (of a property) on a DSP server""" @@ -51,7 +33,7 @@ def _update_permissions_for_value( # noqa: PLR0913 value.property: { "@id": value.value_iri, "@type": value.value_type, - "knora-api:hasPermissions": create_string_from_scope(scope), + "knora-api:hasPermissions": create_string_from_scope(value.scope), }, "@context": context, } @@ -92,48 +74,46 @@ def _update_permissions_for_resource( # noqa: PLR0913 raise err from None -def _update_permissions_for_resource_and_values( - resource_iri: str, - scope: PermissionScope, - dsp_client: DspClient, -) -> tuple[str, bool]: - """Updates the permissions for the given resource and its values on a DSP server""" - try: - resource = get_resource(resource_iri, dsp_client) - except Exception as exc: # noqa: BLE001 (blind exception) - logger.error(f"Cannot update resource {resource_iri}: {exc}") - return resource_iri, False - values = _get_values_to_update(resource) - - success = True - try: - _update_permissions_for_resource( - resource_iri=resource_iri, - lmd=resource.get("knora-api:lastModificationDate"), - resource_type=resource["@type"], - context=resource["@context"], - scope=scope, - dsp_client=dsp_client, +def _update_batch(batch: tuple[Oap, ...], dsp_client: DspClient) -> list[str]: + failed_iris = [] + for oap in batch: + resource_iri = ( + oap.resource_oap.resource_iri + if oap.resource_oap + else re.sub(r"/values/.+$", "", oap.value_oaps[0].value_iri) ) - except ApiError as err: - logger.error(err) - success = False - - for v in values: try: - _update_permissions_for_value( - resource_iri=resource_iri, - value=v, - resource_type=resource["@type"], - context=resource["@context"], - scope=scope, - dsp_client=dsp_client, - ) - except ApiError as err: - logger.error(err) - success = False - - return resource_iri, success + resource = get_resource(resource_iri, dsp_client) + except Exception as exc: # noqa: BLE001 + logger.error(f"Cannot update resource {resource_iri}: {exc}") + failed_iris.append(resource_iri) + continue + if oap.resource_oap: + try: + _update_permissions_for_resource( + resource_iri=resource_iri, + lmd=resource.get("knora-api:lastModificationDate"), + resource_type=resource["@type"], + context=resource["@context"], + scope=oap.resource_oap.scope, + dsp_client=dsp_client, + ) + except ApiError as err: + logger.error(err) + failed_iris.append(resource_iri) + for v in oap.value_oaps: + try: + _update_permissions_for_value( + resource_iri=resource_iri, + value=v, + resource_type=resource["@type"], + context=resource["@context"], + dsp_client=dsp_client, + ) + except ApiError as err: + logger.error(err) + failed_iris.append(v.value_iri) + return failed_iris def _write_failed_iris_to_file( @@ -147,28 +127,13 @@ def _write_failed_iris_to_file( f.write("\n".join(failed_iris)) -def _launch_thread_pool(resource_oaps: list[Oap], nthreads: int, dsp_client: DspClient) -> list[str]: - counter = 0 - total = len(resource_oaps) +def _launch_thread_pool(oaps: list[Oap], nthreads: int, dsp_client: DspClient) -> list[str]: all_failed_iris: list[str] = [] with ThreadPoolExecutor(max_workers=nthreads) as pool: - jobs = [ - pool.submit( - _update_permissions_for_resource_and_values, - resource_oap.object_iri, - resource_oap.scope, - dsp_client, - ) - for resource_oap in resource_oaps - ] + jobs = [pool.submit(_update_batch, batch, dsp_client) for batch in itertools.batched(oaps, 100)] for result in as_completed(jobs): - resource_iri, success = result.result() - counter += 1 - if not success: - all_failed_iris.append(resource_iri) - logger.info(f"Failed updating resource {counter}/{total} ({resource_iri}) and its values.") - else: - logger.info(f"Updated resource {counter}/{total} ({resource_iri}) and its values.") + failed_iris = result.result() + all_failed_iris.extend(failed_iris) return all_failed_iris @@ -191,16 +156,13 @@ def apply_updated_oaps_on_server( failed_iris = _launch_thread_pool(oaps, nthreads, dsp_client) if failed_iris: timestamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S") - filename = f"FAILED_RESOURCES_{timestamp}.txt" + filename = f"FAILED_RESOURCES_AND_VALUES_{timestamp}.txt" _write_failed_iris_to_file( - failed_iris=failed_iris, + failed_iris=sorted(failed_iris), shortcode=shortcode, host=host, filename=filename, ) - msg = ( - f"ERROR: {len(failed_iris)} resources could not (or only partially) be updated. " - f"They were written to {filename}." - ) + msg = f"ERROR: {len(failed_iris)} resources or values could not be updated. They were written to {filename}." logger.error(msg) logger.info(f"Updated OAPs of {len(oaps)} resources on {host}") diff --git a/dsp_permissions_scripts/template.py b/dsp_permissions_scripts/template.py index cfd37918..8f644233 100644 --- a/dsp_permissions_scripts/template.py +++ b/dsp_permissions_scripts/template.py @@ -11,8 +11,9 @@ from dsp_permissions_scripts.models import group from dsp_permissions_scripts.models.host import Hosts from dsp_permissions_scripts.models.scope import PUBLIC -from dsp_permissions_scripts.oap.oap_get import get_all_resource_oaps_of_project +from dsp_permissions_scripts.oap.oap_get import get_all_oaps_of_project from dsp_permissions_scripts.oap.oap_model import Oap +from dsp_permissions_scripts.oap.oap_model import OapRetrieveConfig from dsp_permissions_scripts.oap.oap_serialize import serialize_oaps from dsp_permissions_scripts.oap.oap_set import apply_updated_oaps_on_server from dsp_permissions_scripts.utils.authentication import login @@ -47,9 +48,13 @@ def modify_oaps(oaps: list[Oap]) -> list[Oap]: """Adapt this sample to your needs.""" modified_oaps = [] for oap in oaps: - if group.SYSTEM_ADMIN not in oap.scope.CR: - oap.scope = oap.scope.add("CR", group.SYSTEM_ADMIN) - modified_oaps.append(oap) + if oap.resource_oap: + if group.SYSTEM_ADMIN not in oap.resource_oap.scope.CR: + oap.resource_oap.scope = oap.resource_oap.scope.add("CR", group.SYSTEM_ADMIN) + for value_oap in oap.value_oaps: + if group.SYSTEM_ADMIN not in value_oap.scope.CR: + value_oap.scope = value_oap.scope.add("CR", group.SYSTEM_ADMIN) + modified_oaps.append(oap) return modified_oaps @@ -101,18 +106,21 @@ def update_doaps(host: str, shortcode: str, dsp_client: DspClient) -> None: def update_oaps(host: str, shortcode: str, dsp_client: DspClient) -> None: """Sample function to modify the Object Access Permissions of a project.""" - resource_oaps = get_all_resource_oaps_of_project(shortcode, dsp_client) - serialize_oaps(resource_oaps, shortcode, mode="original") - resource_oaps_modified = modify_oaps(oaps=resource_oaps) + oap_config = OapRetrieveConfig( + retrieve_resources=True, retrieve_values="specified_props", specified_props=["knora-api:hasStillImageFileValue"] + ) + oaps = get_all_oaps_of_project(shortcode, dsp_client, oap_config) + serialize_oaps(oaps, shortcode, mode="original") + oaps_modified = modify_oaps(oaps) apply_updated_oaps_on_server( - oaps=resource_oaps_modified, + oaps=oaps_modified, host=host, shortcode=shortcode, dsp_client=dsp_client, nthreads=4, ) - resource_oaps_updated = get_all_resource_oaps_of_project(shortcode, dsp_client) - serialize_oaps(resource_oaps_updated, shortcode, mode="modified") + oaps_updated = get_all_oaps_of_project(shortcode, dsp_client, oap_config) + serialize_oaps(oaps_updated, shortcode, mode="modified") def main() -> None: diff --git a/tests/test_oap_model.py b/tests/test_oap_model.py new file mode 100644 index 00000000..633a717e --- /dev/null +++ b/tests/test_oap_model.py @@ -0,0 +1,98 @@ +import pytest + +from dsp_permissions_scripts.models import group +from dsp_permissions_scripts.models.scope import PermissionScope +from dsp_permissions_scripts.oap.oap_model import Oap +from dsp_permissions_scripts.oap.oap_model import OapRetrieveConfig +from dsp_permissions_scripts.oap.oap_model import ResourceOap +from dsp_permissions_scripts.oap.oap_model import ValueOap + +# ruff: noqa: PT011 (exception too broad) + + +class TestOap: + def test_oap_one_val(self) -> None: + res_iri = "http://rdfh.ch/0803/foo" + scope = PermissionScope.create(D=[group.UNKNOWN_USER]) + res_oap = ResourceOap(scope=scope, resource_iri=res_iri) + val_oaps = [ + ValueOap( + scope=scope, + property="foo:prop", + value_type="foo:valtype", + value_iri=f"{res_iri}/values/bar", + resource_iri=res_iri, + ) + ] + oap = Oap(resource_oap=res_oap, value_oaps=val_oaps) + assert oap.resource_oap == res_oap + assert oap.value_oaps == val_oaps + + def test_oap_multiple_vals(self) -> None: + res_iri = "http://rdfh.ch/0803/foo" + res_scope = PermissionScope.create(D=[group.UNKNOWN_USER]) + val_scope = PermissionScope.create(M=[group.KNOWN_USER]) + res_oap = ResourceOap(scope=res_scope, resource_iri=res_iri) + val_oap_1 = ValueOap( + scope=val_scope, + property="foo:prop", + value_type="foo:valtype", + value_iri=f"{res_iri}/values/bar", + resource_iri=res_iri, + ) + val_oap_2 = val_oap_1.model_copy(update={"value_iri": f"{res_iri}/values/baz"}) + oap = Oap(resource_oap=res_oap, value_oaps=[val_oap_1, val_oap_2]) + assert oap.resource_oap == res_oap + assert oap.value_oaps == [val_oap_1, val_oap_2] + + def test_oap_no_res(self) -> None: + res_iri = "http://rdfh.ch/0803/foo" + scope = PermissionScope.create(D=[group.UNKNOWN_USER]) + val_oaps = [ + ValueOap( + scope=scope, + property="foo:prop", + value_type="foo:valtype", + value_iri=f"{res_iri}/values/bar", + resource_iri=res_iri, + ) + ] + oap = Oap(resource_oap=None, value_oaps=val_oaps) + assert oap.resource_oap is None + assert oap.value_oaps == val_oaps + + def test_oap_no_res_no_vals(self) -> None: + with pytest.raises(ValueError): + Oap(resource_oap=None, value_oaps=[]) + + +class TestOapRetrieveConfig: + def test_with_resource(self) -> None: + conf = OapRetrieveConfig(retrieve_resources=True, retrieve_values="none") + assert conf.retrieve_resources is True + assert conf.retrieve_values == "none" + assert conf.specified_props == [] + + def test_without_resource(self) -> None: + conf = OapRetrieveConfig(retrieve_resources=False, retrieve_values="all") + assert conf.retrieve_resources is False + assert conf.retrieve_values == "all" + assert conf.specified_props == [] + + def test_empty(self) -> None: + with pytest.raises(ValueError): + OapRetrieveConfig(retrieve_resources=False, retrieve_values="none") + with pytest.raises(ValueError): + OapRetrieveConfig(retrieve_resources=False, retrieve_values="none", specified_props=[]) + + def test_all_values_but_specified(self) -> None: + with pytest.raises(ValueError): + OapRetrieveConfig(retrieve_resources=False, retrieve_values="all", specified_props=["foo"]) + + def test_no_values_but_specified(self) -> None: + with pytest.raises(ValueError): + OapRetrieveConfig(retrieve_resources=False, retrieve_values="none", specified_props=["foo"]) + + +if __name__ == "__main__": + pytest.main([__file__]) diff --git a/tests/test_oap_serialization.py b/tests/test_oap_serialization.py index ce0a20c3..367205d0 100644 --- a/tests/test_oap_serialization.py +++ b/tests/test_oap_serialization.py @@ -7,6 +7,8 @@ from dsp_permissions_scripts.models import group from dsp_permissions_scripts.models.scope import PermissionScope from dsp_permissions_scripts.oap.oap_model import Oap +from dsp_permissions_scripts.oap.oap_model import ResourceOap +from dsp_permissions_scripts.oap.oap_model import ValueOap from dsp_permissions_scripts.oap.oap_serialize import deserialize_oaps from dsp_permissions_scripts.oap.oap_serialize import serialize_oaps from tests.test_scope_serialization import compare_scopes @@ -23,36 +25,70 @@ def _setup_teardown(self) -> Iterator[None]: shutil.rmtree(testdata_dir) def test_oap_serialization(self) -> None: - oap1 = Oap( - scope=PermissionScope.create( - CR=[group.PROJECT_ADMIN], - V=[group.PROJECT_MEMBER], - ), - object_iri=f"http://rdfh.ch/{self.shortcode}/resource-1", - ) - oap2 = Oap( - scope=PermissionScope.create( - D=[group.SYSTEM_ADMIN], - M=[group.KNOWN_USER], - ), - object_iri=f"http://rdfh.ch/{self.shortcode}/resource-2", + oap1 = self._get_oap_one_value_only() + oap2 = self._get_oap_full() + oap3 = self._get_oap_res_only() + + serialize_oaps([oap1, oap2, oap3], self.shortcode, "original") + deserialized_oaps = deserialize_oaps(self.shortcode, "original") + self._compare_oaps(deserialized_oaps[0], oap1) + self._compare_oaps(deserialized_oaps[1], oap2) + self._compare_oaps(deserialized_oaps[2], oap3) + + def _get_oap_full(self) -> Oap: + scope = PermissionScope.create(CR=[group.PROJECT_ADMIN], V=[group.PROJECT_MEMBER]) + res_iri = f"http://rdfh.ch/{self.shortcode}/resource-1" + res_oap = ResourceOap(scope=scope, resource_iri=res_iri) + val1_oap = ValueOap( + scope=scope, + property="foo:prop1", + value_type="bar:val1", + value_iri=f"{res_iri}/values/foobar1", + resource_iri=res_iri, ) - serialize_oaps( - oaps=[oap1, oap2], - shortcode=self.shortcode, - mode="original", + val2_oap = ValueOap( + scope=scope, + property="foo:prop2", + value_type="bar:val2", + value_iri=f"{res_iri}/values/foobar2", + resource_iri=res_iri, ) - deserialized_oaps = deserialize_oaps( - shortcode=self.shortcode, - mode="original", + oap = Oap(resource_oap=res_oap, value_oaps=[val1_oap, val2_oap]) + return oap + + def _get_oap_one_value_only(self) -> Oap: + scope = PermissionScope.create(D=[group.SYSTEM_ADMIN], M=[group.KNOWN_USER]) + res_iri = f"http://rdfh.ch/{self.shortcode}/resource-2" + val_oap = ValueOap( + scope=scope, + property="foo:prop3", + value_type="bar:val3", + value_iri=f"{res_iri}/values/foobar3", + resource_iri=res_iri, ) - deserialized_oaps.sort(key=lambda oap: oap.object_iri) - self._compare_oaps(deserialized_oaps[0], oap1) - self._compare_oaps(deserialized_oaps[1], oap2) + return Oap(resource_oap=None, value_oaps=[val_oap]) + + def _get_oap_res_only(self) -> Oap: + scope = PermissionScope.create(V=[group.KNOWN_USER], RV=[group.UNKNOWN_USER]) + res_iri = f"http://rdfh.ch/{self.shortcode}/resource-3" + res_oap = ResourceOap(scope=scope, resource_iri=res_iri) + return Oap(resource_oap=res_oap, value_oaps=[]) def _compare_oaps(self, oap1: Oap, oap2: Oap) -> None: - compare_scopes(oap1.scope, oap2.scope) - assert oap1.object_iri == oap2.object_iri + if oap1.resource_oap is None: + assert oap2.resource_oap is None + elif oap2.resource_oap is None: + assert oap1.resource_oap is None + else: + assert oap1.resource_oap.resource_iri == oap2.resource_oap.resource_iri + compare_scopes(oap1.resource_oap.scope, oap2.resource_oap.scope) + + assert len(oap1.value_oaps) == len(oap2.value_oaps) + for val_oap1, val_oap2 in zip(oap1.value_oaps, oap2.value_oaps): + assert val_oap1.value_iri == val_oap2.value_iri + assert val_oap1.property == val_oap2.property + assert val_oap1.value_type == val_oap2.value_type + compare_scopes(val_oap1.scope, val_oap2.scope) if __name__ == "__main__":