Skip to content

Commit

Permalink
Merge branch 'main' into wip/scenario-tanner
Browse files Browse the repository at this point in the history
  • Loading branch information
jnussbaum authored Oct 4, 2023
2 parents 49ddc24 + 4fb850d commit 4516dfc
Show file tree
Hide file tree
Showing 7 changed files with 124 additions and 119 deletions.
27 changes: 15 additions & 12 deletions dsp_permissions_scripts/template.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,24 +2,31 @@

from dsp_permissions_scripts.models.groups import BuiltinGroup
from dsp_permissions_scripts.models.host import Hosts
from dsp_permissions_scripts.models.permission import Oap
from dsp_permissions_scripts.models.permission import Doap, Oap
from dsp_permissions_scripts.models.scope import PUBLIC
from dsp_permissions_scripts.utils.authentication import login
from dsp_permissions_scripts.utils.doap_get import (
get_doaps_of_project,
print_doaps_of_project,
)
from dsp_permissions_scripts.utils.doap_set import set_doaps_of_groups
from dsp_permissions_scripts.utils.doap_set import apply_updated_doaps_on_server
from dsp_permissions_scripts.utils.oap import apply_updated_oaps_on_server
from dsp_permissions_scripts.utils.project import get_all_resource_oaps_of_project


def modify_oaps(oaps: list[Oap]) -> list[Oap]:
for oap in oaps:
oap.scope.D.append(BuiltinGroup.PROJECT_MEMBER)
oap.scope.CR.append(BuiltinGroup.SYSTEM_ADMIN)
return oaps


def modify_doaps(doaps: list[Doap]) -> list[Doap]:
for doap in doaps:
if doap.target.group in [BuiltinGroup.PROJECT_MEMBER.value, BuiltinGroup.PROJECT_ADMIN.value]:
doap.scope = PUBLIC
return doaps


def main() -> None:
"""
The main method assembles a sample call of all available high-level functions.
Expand All @@ -29,24 +36,20 @@ def main() -> None:
shortcode = "F18E"
token = login(host)

new_scope = PUBLIC
groups = [BuiltinGroup.PROJECT_ADMIN, BuiltinGroup.PROJECT_MEMBER]

doaps = get_doaps_of_project(
project_doaps = get_doaps_of_project(
host=host,
shortcode=shortcode,
token=token,
)
print_doaps_of_project(
doaps=doaps,
doaps=project_doaps,
host=host,
shortcode=shortcode,
)
set_doaps_of_groups(
scope=new_scope,
groups=groups,
project_doaps_updated = modify_doaps(doaps=project_doaps)
apply_updated_doaps_on_server(
doaps=project_doaps_updated,
host=host,
shortcode=shortcode,
token=token,
)
resource_oaps = get_all_resource_oaps_of_project(
Expand Down
37 changes: 13 additions & 24 deletions dsp_permissions_scripts/utils/doap_get.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,14 @@

from dsp_permissions_scripts.models.permission import Doap, DoapTarget, DoapTargetType
from dsp_permissions_scripts.utils.authentication import get_protocol
from dsp_permissions_scripts.utils.get_logger import get_logger, get_timestamp
from dsp_permissions_scripts.utils.project import get_project_iri_by_shortcode
from dsp_permissions_scripts.utils.scope_serialization import (
create_scope_from_admin_route_object,
)

logger = get_logger(__name__)


def __filter_doaps_by_target(
doaps: list[Doap],
Expand All @@ -31,26 +34,7 @@ def __filter_doaps_by_target(
return filtered_doaps


# TODO: this function is unused
def get_permissions_for_project(
project_iri: str,
host: str,
token: str,
) -> list[dict[str, Any]]:
"""
Returns all permissions for the given project.
"""
headers = {"Authorization": f"Bearer {token}"}
project_iri = quote_plus(project_iri, safe="")
protocol = get_protocol(host)
url = f"{protocol}://{host}/admin/permissions/{project_iri}"
response = requests.get(url, headers=headers, timeout=5)
assert response.status_code == 200
permissions: list[dict[str, Any]] = response.json()["permissions"]
return permissions


def get_all_doaps_of_project(
def __get_all_doaps_of_project(
project_iri: str,
host: str,
token: str,
Expand Down Expand Up @@ -98,11 +82,12 @@ def get_doaps_of_project(
Optionally, select only the DOAPs that are related to either a group, or a resource class, or a property.
By default, all DOAPs are returned, regardless of their target (target=all).
"""
logger.info(f"******* Getting DOAPs of project {shortcode} on server {host} *******")
project_iri = get_project_iri_by_shortcode(
shortcode=shortcode,
host=host,
)
doaps = get_all_doaps_of_project(
doaps = __get_all_doaps_of_project(
project_iri=project_iri,
host=host,
token=token,
Expand All @@ -111,6 +96,7 @@ def get_doaps_of_project(
doaps=doaps,
target=target,
)
logger.info(f"Found {len(doaps)} DOAPs, {len(filtered_doaps)} of which are related to {target}.")
return filtered_doaps


Expand All @@ -123,7 +109,10 @@ def print_doaps_of_project(
heading = f"Project {shortcode} on server {host} has {len(doaps)} DOAPs"
if target != DoapTargetType.ALL:
heading += f" which are related to a {target}"
print(f"\n{heading}\n{'=' * len(heading)}\n")
print(f"\n{get_timestamp()}: {heading}\n{'=' * (len(heading) + len(get_timestamp()) + 2)}\n")
logger.info(f"******* Printing DOAPs of project {shortcode} on server {host} *******")
logger.info(heading)
for d in doaps:
print(d.model_dump_json(indent=2, exclude_none=True))
print()
representation = d.model_dump_json(indent=2, exclude_none=True)
print(representation + "\n")
logger.info(representation)
86 changes: 24 additions & 62 deletions dsp_permissions_scripts/utils/doap_set.py
Original file line number Diff line number Diff line change
@@ -1,21 +1,18 @@
from typing import Sequence
from typing import Literal
from urllib.parse import quote_plus

import requests

from dsp_permissions_scripts.models.groups import BuiltinGroup
from dsp_permissions_scripts.models.permission import Doap
from dsp_permissions_scripts.models.scope import PermissionScope
from dsp_permissions_scripts.utils.authentication import get_protocol
from dsp_permissions_scripts.utils.doap_get import (
create_doap_from_admin_route_response,
get_all_doaps_of_project,
)
from dsp_permissions_scripts.utils.project import get_project_iri_by_shortcode
from dsp_permissions_scripts.utils.doap_get import create_doap_from_admin_route_response
from dsp_permissions_scripts.utils.get_logger import get_logger, get_timestamp
from dsp_permissions_scripts.utils.scope_serialization import (
create_admin_route_object_from_scope,
)

logger = get_logger(__name__)

def __update_doap_scope(
doap_iri: str,
Expand All @@ -37,77 +34,42 @@ def __update_doap_scope(
return new_doap


def __get_doaps_of_groups(
groups: Sequence[str | BuiltinGroup],
host: str,
shortcode: str,
token: str,
) -> list[Doap]:
def __log_and_print_doap_update(
doap: Doap,
state: Literal["before", "after"],
) -> None:
"""
Retrieves the DOAPs for the given groups.
Args:
groups: the group IRIs to whose DOAP the scope should be applied
host: the DSP server where the project is located
shortcode: the shortcode of the project
token: the access token
Returns:
applicable_doaps: the applicable DOAPs
Logs and prints the DOAP before or after the update.
"""
project_iri = get_project_iri_by_shortcode(
shortcode=shortcode,
host=host,
)
all_doaps = get_all_doaps_of_project(
project_iri=project_iri,
host=host,
token=token,
)
groups_str = []
for g in groups:
groups_str.append(g.value if isinstance(g, BuiltinGroup) else g)
applicable_doaps = [d for d in all_doaps if d.target.group in groups_str]
assert len(applicable_doaps) == len(groups)
return applicable_doaps
heading = f"DOAP {state}:"
body = doap.model_dump_json(indent=2)
print(f"{heading}\n{'-' * len(heading)}\n{body}\n")
logger.info(f"{heading}\n{body}")



def set_doaps_of_groups(
scope: PermissionScope,
groups: Sequence[str | BuiltinGroup],
def apply_updated_doaps_on_server(
doaps: list[Doap],
host: str,
shortcode: str,
token: str,
) -> None:
"""
Applies the given scope to the DOAPs of the given groups.
Updates DOAPs on the server.
Args:
scope: one of the standard scopes defined in the Scope class
groups: the group IRIs to whose DOAP the scope should be applied
doaps: the DOAPs to be sent to the server
host: the DSP server where the project is located
shortcode: the shortcode of the project
token: the access token
"""
applicable_doaps = __get_doaps_of_groups(
groups=groups,
host=host,
shortcode=shortcode,
token=token,
)
heading = f"Update {len(applicable_doaps)} DOAPs on {host}..."
logger.info(f"******* Updating {len(doaps)} DOAPs on {host} *******")
heading = f"{get_timestamp()}: Updating {len(doaps)} DOAPs on {host}..."
print(f"\n{heading}\n{'=' * len(heading)}\n")
for d in applicable_doaps:
print("Old DOAP:\n=========")
print(d.model_dump_json(indent=2))
for d in doaps:
__log_and_print_doap_update(doap=d, state="before")
new_doap = __update_doap_scope(
doap_iri=d.doap_iri,
scope=scope,
scope=d.scope,
host=host,
token=token,
)
print("\nNew DOAP:\n=========")
print(new_doap.model_dump_json(indent=2))
print()
print("All DOAPs have been updated.")
__log_and_print_doap_update(doap=new_doap, state="after")
print(f"{get_timestamp()}: All DOAPs have been updated.")
33 changes: 33 additions & 0 deletions dsp_permissions_scripts/utils/get_logger.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
import logging
from datetime import datetime


def get_logger(name: str) -> logging.Logger:
"""
Create a logger instance,
set its level to INFO,
and configure it to write to a file in the user's home directory.
Args:
name: name of the logger
filesize_mb: maximum size per log file in MB, defaults to 5
backupcount: number of log files to keep, defaults to 4
Returns:
the logger instance
"""
_logger = logging.getLogger(name)
_logger.setLevel(logging.INFO)
formatter = logging.Formatter(fmt="{asctime} {filename: <25} {levelname: <8} {message}", style="{")
formatter.default_time_format = "%Y-%m-%d %H:%M:%S"
handler = logging.FileHandler(
filename="logging.log",
mode="a",
)
handler.setFormatter(formatter)
_logger.addHandler(handler)
return _logger


def get_timestamp() -> str:
return datetime.now().strftime("%Y-%m-%d %H:%M:%S")
Loading

0 comments on commit 4516dfc

Please sign in to comment.