Skip to content

Commit

Permalink
new functions approach
Browse files Browse the repository at this point in the history
  • Loading branch information
jnussbaum committed Sep 24, 2024
1 parent 85e8263 commit 548dd52
Showing 1 changed file with 79 additions and 72 deletions.
151 changes: 79 additions & 72 deletions dsp_permissions_scripts/oap/update_iris.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,7 @@
from __future__ import annotations

import re
from abc import ABC
from abc import abstractmethod
from dataclasses import dataclass
from dataclasses import field
from pathlib import Path
from typing import Any
from urllib.parse import quote_plus
Expand All @@ -23,97 +20,107 @@


@dataclass
class IRIUpdater(ABC):
class ResourceIri:
iri: str
dsp_client: DspClient
err_msg: str | None = field(init=False, default=None)

@abstractmethod
def update_iri(self, new_scope: PermissionScope) -> None:
pass

@staticmethod
def from_string(string: str, dsp_client: DspClient) -> ResourceIRIUpdater | ValueIRIUpdater:
if re.search(r"^http://rdfh\.ch/[^/]{4}/[^/]{22}/values/[^/]{22}$", string):
return ValueIRIUpdater(string, dsp_client)
elif re.search(r"^http://rdfh\.ch/[^/]{4}/[^/]{22}$", string):
return ResourceIRIUpdater(string, dsp_client)
else:
raise InvalidIRIError(f"Could not parse IRI {string}")

def _get_res_dict(self, res_iri: str) -> dict[str, Any]:
return self.dsp_client.get(f"/v2/resources/{quote_plus(res_iri, safe='')}")


@dataclass
class ResourceIRIUpdater(IRIUpdater):
def update_iri(self, new_scope: PermissionScope) -> None:
res_dict = self._get_res_dict(self.iri)
try:
update_permissions_for_resource(
resource_iri=self.iri,
lmd=res_dict["knora-api:lastModificationDate"],
resource_type=res_dict["@type"],
context=res_dict["@context"] | {"knora-admin": KNORA_ADMIN_ONTO_NAMESPACE},
scope=new_scope,
dsp_client=self.dsp_client,
)
except ApiError as err:
self.err_msg = err.message
logger.error(self.err_msg)
class ValueIri:
iri: str


@dataclass
class ValueIRIUpdater(IRIUpdater):
def update_iri(self, new_scope: PermissionScope) -> None:
res_iri = re.sub(r"/values/[^/]{22}$", "", self.iri)
res_dict = self._get_res_dict(res_iri)
val_oap = next((v for v in get_value_oaps(res_dict) if v.value_iri == self.iri), None)
if not val_oap:
self.err_msg = f"Could not find value {self.iri} in resource {res_dict['@id']}"
logger.error(self.err_msg)
return
val_oap.scope = new_scope
try:
update_permissions_for_value(
value=val_oap,
resource_type=res_dict["@type"],
context=res_dict["@context"] | {"knora-admin": KNORA_ADMIN_ONTO_NAMESPACE},
dsp_client=self.dsp_client,
)
except ApiError as err:
self.err_msg = err.message
logger.error(self.err_msg)
class IriUpdateProblem:
iri: ResourceIri | ValueIri
err_msg: str


def update_iris(
iri_file: Path,
new_scope: PermissionScope,
dsp_client: DspClient,
) -> None:
iri_updaters = _initialize_iri_updaters(iri_file, dsp_client)
for iri in iri_updaters:
iri.update_iri(new_scope)
_tidy_up(iri_updaters, iri_file)
iris = _get_iris_from_file(iri_file, dsp_client)
problems: list[IriUpdateProblem] = []
for iri in iris:
if isinstance(iri, ResourceIri) and (prob := _update_res_iri(iri, new_scope, dsp_client)):
problems.append(prob)
elif isinstance(iri, ValueIri) and (prob := _update_val_iri(iri, new_scope, dsp_client)):
problems.append(prob)
_tidy_up(problems, iri_file)


def _initialize_iri_updaters(iri_file: Path, dsp_client: DspClient) -> list[ResourceIRIUpdater | ValueIRIUpdater]:
def _get_iris_from_file(iri_file: Path, dsp_client: DspClient) -> list[ResourceIri | ValueIri]:
logger.info(f"Read IRIs from file {iri_file} and initialize IRI updaters...")
iris_raw = {x for x in iri_file.read_text().splitlines() if re.search(r"\w", x)}
iri_updaters = [IRIUpdater.from_string(iri, dsp_client) for iri in iris_raw]
res_counter = sum(isinstance(x, ResourceIRIUpdater) for x in iri_updaters)
val_counter = sum(isinstance(x, ValueIRIUpdater) for x in iri_updaters)
iris = [_get_iri_wrapper(iri) for iri in iris_raw]
res_counter = sum(isinstance(x, ResourceIri) for x in iris)
val_counter = sum(isinstance(x, ValueIri) for x in iris)
logger.info(
f"Perform {len(iri_updaters)} updates ({res_counter} resources and {val_counter} values) "
f"Perform {len(iris)} updates ({res_counter} resources and {val_counter} values) "
f"on server {dsp_client.server}..."
)
return iri_updaters
return iris


def _tidy_up(iri_updaters: list[ResourceIRIUpdater | ValueIRIUpdater], iri_file: Path) -> None:
if failed_updaters := [x for x in iri_updaters if x.err_msg]:
def _get_iri_wrapper(string: str) -> ResourceIri | ValueIri:
if re.search(r"^http://rdfh\.ch/[^/]{4}/[^/]{22}/values/[^/]{22}$", string):
return ValueIri(string)
elif re.search(r"^http://rdfh\.ch/[^/]{4}/[^/]{22}$", string):
return ResourceIri(string)
else:
raise InvalidIRIError(f"Could not parse IRI {string}")


def _update_res_iri(iri: ResourceIri, new_scope: PermissionScope, dsp_client: DspClient) -> IriUpdateProblem | None:
res_dict = _get_res_dict(dsp_client, iri)
try:
update_permissions_for_resource(
resource_iri=iri.iri,
lmd=res_dict["knora-api:lastModificationDate"],
resource_type=res_dict["@type"],
context=res_dict["@context"] | {"knora-admin": KNORA_ADMIN_ONTO_NAMESPACE},
scope=new_scope,
dsp_client=dsp_client,
)
return None
except ApiError as err:
problem = IriUpdateProblem(iri, err.message)
logger.error(problem.err_msg)
return problem


def _update_val_iri(iri: ValueIri, new_scope: PermissionScope, dsp_client: DspClient) -> IriUpdateProblem | None:
res_dict = _get_res_dict(dsp_client, iri)
val_oap = next((v for v in get_value_oaps(res_dict) if v.value_iri == iri.iri), None)
if not val_oap:
problem = IriUpdateProblem(iri, f"Could not find value {iri} in resource {res_dict['@id']}")
logger.error(problem.err_msg)
return problem
val_oap.scope = new_scope
try:
update_permissions_for_value(
value=val_oap,
resource_type=res_dict["@type"],
context=res_dict["@context"] | {"knora-admin": KNORA_ADMIN_ONTO_NAMESPACE},
dsp_client=dsp_client,
)
return None
except ApiError as err:
problem = IriUpdateProblem(iri, err.message)
logger.error(problem.err_msg)
return problem


def _get_res_dict(dsp_client: DspClient, iri: ResourceIri | ValueIri) -> dict[str, Any]:
res_iri = re.sub(r"/values/[^/]{22}$", "", iri.iri)
return dsp_client.get(f"/v2/resources/{quote_plus(res_iri, safe='')}")


def _tidy_up(problems: list[IriUpdateProblem], iri_file: Path) -> None:
if problems:
failed_iris_file = iri_file.with_stem(f"{iri_file.stem}_failed")
failed_iris_file.write_text("\n".join([f"{x.iri}\t\t{x.err_msg}" for x in failed_updaters]))
logger.info(f"Some updates failed. The failed IRIs and error messages have been saved to {failed_iris_file}.")
failed_iris_file.write_text("\n".join([f"{x.iri}\t\t{x.err_msg}" for x in problems]))
logger.info(f"{len(problems)} updates failed. The failed IRIs have been saved to {failed_iris_file}.")
else:
logger.info(f"All {len(iri_updaters)} updates were successful.")
logger.info("All updates were successful.")

0 comments on commit 548dd52

Please sign in to comment.