diff --git a/CHANGES.rst b/CHANGES.rst index e9f46cac..e8198f4a 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -7,7 +7,14 @@ Changes `Unreleased `_ (latest) ------------------------------------------------------------------------------------ -* Nothing yet. +* Add optional key ``field`` and ``regex`` to be used in the ``sync_permissions`` section found in the config. + This allows to sync permissions using a field other than ``resource_full_name`` when creating the ``name:type`` + from the segment ``ex.: /field1::type1/field2::type2``. Adds support to use ``resource_display_name``. +* The ``regex`` is used to extract the desired information from the ``nametype_path``. It should be used to do an + exact match. This new search overrides the default way of matching each segment with the ``nametype_path``. + In the case where a ``regex`` is found in the target segment, the data will be formed using the same ``resource_type`` + for every match in the same segment. Similary, as using ``- name: "**"`` in the config to match multiple segment, + it is possible to use a ``regex`` to match multiple resources in the same segment with ``regex: '(?<=:).*\/?(?=\/)'`` `2.1.0 `_ (2023-09-18) ------------------------------------------------------------------------------------ diff --git a/config/config.example.yml b/config/config.example.yml index 4d3a3544..8b3429e2 100644 --- a/config/config.example.yml +++ b/config/config.example.yml @@ -183,3 +183,58 @@ sync_permissions: - "process_job_status : read -> job_status : read" # different permission (match), otherwise all jobs/outputs become available. - "process_job_status : read -> process_description : read-match" + stac_permissions: + services: + api: + stac_collection: + - name: stac + type: service + - name: stac + type: route + - name: collections + type: route + - type: route + field: resource_display_name # Use the resource_display_name for permission mapping. + regex: '[\w]+:[\w\/]+' # This will extract the display name (ex: thredds:birdhouse/testdata/xclim/cmip6). + stac_item: + - name: stac + type: service + - name: stac + type: route + - name: collections + type: route + - name: "{collectionId}" + type: route + - name: items + type: route + - type: route + field: resource_display_name # Use the resource_display_name for permission mapping. + regex: '[\w]+:[\w\/.-]+' # This will extract the display name (ex: thredds:birdhouse/testdata/xclim/cmip6/sic_SImon_CCCma-CanESM5_ssp245_r13i1p2f1_2020.nc) + + thredds: + thredd_collection: + - name: thredds + type: service + - type: directory + regex: '(?<=:).*\/?(?=\/)' # Match everything after ":" but before last "/" (ex: thredds:birdhouse/testdata/xclim/cmip6/sic_SImon_CCCma-CanESM5_ssp245_r13i1p2f1_2020.nc + # will match to : birdhouse/testdata/xclim/cmip6). It would be equivalent as matching any number of directory using "**". + thredd_item: + - name: thredds + type: service + - type: directory + regex: '(?<=:).*\/?(?=\/)' # Match everything after ":" but before last "/" example thredds:birdhouse/testdata/xclim/cmip6/sic_SImon_CCCma-CanESM5_ssp245_r13i1p2f1_2020.nc + # will return birdhouse/testdata/xclim/cmip6. This would create recreate the same hierarchy of directories. + - type: file + regex: '[^\/]+$' # Match a file in a leaf directory (ex: sic_SImon_CCCma-CanESM5_ssp245_r13i1p2f1_2020.nc). + + permissions_mapping: + # Permission mapping relating to the stac collection mapped to a thredd directory. + - "stac_collection : read-match -> thredd_collection : browse-match" + - "stac_collection : read-allow-recursive -> thredd_collection : browse-allow-recursive" + - "stac_collection : read-deny-match -> thredd_collection : browse-deny-match" + - "stac_collection : read-deny-recursive -> thredd_collection : browse-deny-recursive" + # Permission mapping relating to the stac item mapped to a thredd file. + - "stac_item : read-match -> thredd_item : browse-match" + - "stac_item : read-allow-recursive -> thredd_item : browse-allow-recursive" + - "stac_item : read-deny-match -> thredd_item : browse-deny-match" + - "stac_item : read-deny-recursive -> thredd_item : browse-deny-recursive" diff --git a/cowbird/api/webhooks/views.py b/cowbird/api/webhooks/views.py index ec47c539..702bb65e 100644 --- a/cowbird/api/webhooks/views.py +++ b/cowbird/api/webhooks/views.py @@ -114,6 +114,7 @@ def post_permission_webhook_view(request: Request) -> AnyResponseType: param_regex_with_slashes = r"^/?[A-Za-z0-9]+(?:[\s_\-\./:][A-Za-z0-9]+)*$" resource_full_name = ar.get_multiformat_body(request, "resource_full_name", pattern=param_regex_with_slashes) + resource_display_name = ar.get_multiformat_body(request, "resource_display_name", check_type=(str, type(None))) name = ar.get_multiformat_body(request, "name") access = ar.get_multiformat_body(request, "access") scope = ar.get_multiformat_body(request, "scope") @@ -127,6 +128,7 @@ def post_permission_webhook_view(request: Request) -> AnyResponseType: service_type=service_type, resource_id=resource_id, resource_full_name=resource_full_name, + resource_display_name=resource_display_name, name=name, access=access, scope=scope, diff --git a/cowbird/config.py b/cowbird/config.py index 115dd286..c94d4ba6 100644 --- a/cowbird/config.py +++ b/cowbird/config.py @@ -4,7 +4,7 @@ from typing import Any, Dict, List, Literal, Tuple, Union, cast, overload import yaml -from schema import And, Optional, Regex, Schema +from schema import And, Optional, Or, Regex, Schema from cowbird.typedefs import ( ConfigDict, @@ -191,7 +191,7 @@ def validate_sync_perm_config_schema(sync_cfg: SyncPointConfig) -> None: "services": { str: { # Service type, must correspond to an actual Magpie service type str: [ # Resource key, used to identify the resource here and in the permissions_mapping - {"name": str, "type": str} + {Or("name", "regex", only_one=True): str, "type": str, Optional("field"): str} ] } }, @@ -213,21 +213,23 @@ def validate_and_get_resource_info(res_key: str, segments: List[ConfigSegment]) named_tokens = set() has_multi_token = False for seg in segments: - if seg["name"] == MULTI_TOKEN: - if has_multi_token: - raise ConfigErrorInvalidTokens(f"Invalid config value for resource key {res_key}. Only one " - f"`{MULTI_TOKEN}` token is permitted per resource.") - has_multi_token = True - else: - matched_groups = re.match(NAMED_TOKEN_REGEX, seg["name"]) - if matched_groups: - # Save the first group as a named token, since there's only 1 matching group in the regex. - if matched_groups.groups()[0] in named_tokens: - raise ConfigErrorInvalidTokens(f"Invalid config value for resource key {res_key}. Named token " - f"{matched_groups.groups()[0]} was found in multiple segments of " - "the resource path. Each named token should only be used once in a " - "resource path.") - named_tokens.add(matched_groups.groups()[0]) + if "name" in seg: + if seg["name"] == MULTI_TOKEN: + if has_multi_token: + raise ConfigErrorInvalidTokens(f"Invalid config value for resource key {res_key}. Only one " + f"`{MULTI_TOKEN}` token is permitted per resource.") + has_multi_token = True + else: + matched_groups = re.match(NAMED_TOKEN_REGEX, seg["name"]) + if matched_groups: + # Save the first group as a named token, since there's only 1 matching group in the regex. + if matched_groups.groups()[0] in named_tokens: + raise ConfigErrorInvalidTokens( + f"Invalid config value for resource key {res_key}. Named token " + f"{matched_groups.groups()[0]} was found in multiple segments of " + "the resource path. Each named token should only be used once in a " + "resource path.") + named_tokens.add(matched_groups.groups()[0]) return {"has_multi_token": has_multi_token, "named_tokens": named_tokens} diff --git a/cowbird/handlers/impl/magpie.py b/cowbird/handlers/impl/magpie.py index b8535123..c4ec734d 100644 --- a/cowbird/handlers/impl/magpie.py +++ b/cowbird/handlers/impl/magpie.py @@ -431,13 +431,14 @@ def delete_permission(self, permissions_data: List[Dict[str, str]]) -> None: else: LOGGER.warning("Empty permission data, no permissions to remove.") - def create_resource(self, resource_name: str, resource_type: str, parent_id: Optional[int]) -> int: + def create_resource(self, resource_name: str, resource_type: str, parent_id: Optional[int], + resource_display_name: Optional[str] = None) -> int: """ Creates the specified resource in Magpie and returns the created resource id if successful. """ resource_data = { "resource_name": resource_name, - "resource_display_name": resource_name, + "resource_display_name": resource_display_name or resource_name, "resource_type": resource_type, "parent_id": parent_id } diff --git a/cowbird/permissions_synchronizer.py b/cowbird/permissions_synchronizer.py index d98e9115..6e428b90 100644 --- a/cowbird/permissions_synchronizer.py +++ b/cowbird/permissions_synchronizer.py @@ -1,6 +1,6 @@ import re from copy import deepcopy -from typing import TYPE_CHECKING, Callable, Dict, Iterator, List, MutableMapping, Tuple, cast +from typing import TYPE_CHECKING, Callable, Collection, Dict, Iterator, List, MutableMapping, Tuple, Union, cast from cowbird.config import ( BIDIRECTIONAL_ARROW, @@ -68,12 +68,14 @@ def __init__(self, access: str, scope: str, user: str = None, - group: str = None + group: str = None, + resource_display_name: str = None ) -> None: self.service_name = service_name self.service_type = service_type self.resource_id = resource_id self.resource_full_name = resource_full_name + self.resource_display_name = resource_display_name self.name = name self.access = access self.scope = scope @@ -85,6 +87,7 @@ def __eq__(self, other: "Permission") -> bool: # type: ignore[override] self.service_type == other.service_type and self.resource_id == other.resource_id and self.resource_full_name == other.resource_full_name and + self.resource_display_name == other.resource_display_name and self.name == other.name and self.access == other.access and self.scope == other.scope and @@ -182,6 +185,11 @@ def _generate_regex_from_segments(res_segments: List[ConfigSegment]) -> Tuple[st named_segments_count = 0 res_regex = r"^" for segment in res_segments: + # if a regex is passed, override the current regex and return + regex = segment.get("regex") + if regex is not None: + return regex, -1 + matched_groups = re.match(NAMED_TOKEN_REGEX, segment["name"]) if matched_groups: # match any name with specific type 1 time only @@ -199,6 +207,29 @@ def _generate_regex_from_segments(res_segments: List[ConfigSegment]) -> Tuple[st res_regex += r"$" return res_regex, named_segments_count + @staticmethod + def _generate_nametype_path_from_segments(res_segments: List[ConfigSegment], + src_resource_tree: ResourceTree) -> str: + """ + Generate nametype path (ex.: /name1::type1/name2::type2 where name can be a field found in ResourceSegment). + + :param res_segments: list of segments + :param src_resource_tree: Resource tree associated with the permission to synchronize + """ + resource_nametype_path = "" + res_segments_len = len(res_segments) + for index, res in enumerate(src_resource_tree): + res_type = res["resource_type"] + if index < res_segments_len: + key = res_segments[index].get("field") + if key is None: + key = "resource_name" + else: + key = "resource_name" + resource_nametype_path += f"/{res[key]}{RES_NAMETYPE_SEPARATOR}{res_type}" + + return resource_nametype_path + @staticmethod def _remove_type_from_nametype_path(nametype_path: str) -> str: """ @@ -210,16 +241,18 @@ def _remove_type_from_nametype_path(nametype_path: str) -> str: formatted_path += "/" + segment.split(RES_NAMETYPE_SEPARATOR)[0] return formatted_path - def _find_matching_res(self, service_type: str, resource_nametype_path: str) -> Tuple[str, Dict[str, str]]: + def _find_matching_res(self, permission: Permission, + src_resource_tree: ResourceTree) -> Tuple[str, Union[Collection[str], Dict[str, str]]]: """ Finds a resource key that matches the input resource path, in the sync_permissions config. Note that it returns the longest match and only the named segments of the path are included in the length value. Any tokenized segment is ignored in the length. - :param service_type: Type of the service associated with the input resource. - :param resource_nametype_path: Full resource path name, which includes the type of each segment - (ex.: /name1::type1/name2::type2) + :param permission: Permission of the service associated with the input resource. + :param ResourceTree: Resource tree associated with the permission to synchronize """ + + service_type = permission.service_type if service_type in self.services: # Find which resource from the config matches with the input permission's resource tree # The length of a match is determined by the number of named segments matching the input resource. @@ -238,21 +271,42 @@ def _find_matching_res(self, service_type: str, resource_nametype_path: str) -> # An error would be raised because 2 matches of the same length would be found. # - /**/file # - /file + # + # In the case where a regex is used, the behavior is changed to search for the exact + # match in the resource_nametype_path. The lengh of the match is used to favor a more specific match. + # Example: + # 1: + # - //res1/res2// + # - //res1/res2//res3// # We favor this path if it matches since it is more specific. + # note: It is possible to have multiple resource in the same segment when using a custom + # regex that extract a display_name containing a path to a specific resource. matched_length_by_res = {} matched_groups_by_res = {} service_resources = self.services[service_type] for res_key, res_segments in service_resources.items(): res_regex, named_segments_count = SyncPoint._generate_regex_from_segments(res_segments) - matches = re.match(res_regex, resource_nametype_path) + resource_nametype_path = SyncPoint._generate_nametype_path_from_segments(res_segments, + src_resource_tree) + if named_segments_count == -1: + # To be able to match a path anywhere in the resource_nametype_path we need to use search + # only when the field regex is passed in the res_segments. This allow to stay backward compatible. + matches = re.search(res_regex, resource_nametype_path) + else: + matches = re.match(res_regex, resource_nametype_path) if matches: - matched_groups = matches.groupdict() + exact_match = matches.group() + matched_groups = matches.groupdict() if named_segments_count != -1 else exact_match if "multi_token" in matched_groups: matched_groups["multi_token"] = SyncPoint._remove_type_from_nametype_path( matched_groups["multi_token"] ) matched_groups_by_res[res_key] = matched_groups - matched_length_by_res[res_key] = named_segments_count + # Since we want to be able to match multiple dir /dir1/dir2/dir3/** in the same segment + # if a custom regex is passed. We need to use the len of the exact match to avoid matching + # the wrong res_key + matched_length_by_res[res_key] = (named_segments_count if named_segments_count != -1 + else len(exact_match)) # Find the longest match max_match_len = max(matched_length_by_res.values(), default=0) @@ -269,43 +323,59 @@ def _find_matching_res(self, service_type: str, resource_nametype_path: str) -> @staticmethod def _create_res_data(target_segments: List[ConfigSegment], - input_matched_groups: Dict[str, str], + input_matched_groups: Union[Collection[str], Dict[str, str]], ) -> List[ResourceSegment]: """ Creates resource data, by replacing any tokens found in the segment names to their actual corresponding values. - This data includes the name and type of each segments of a full resource path. + This data includes the name and type of each segments of a full resource path. In the case where a regex is + found in the target segment, the data will be formed using the same resource_type for every match in the current + segment. :param target_segments: List containing the name and type info of each segment of the target resource path. :param input_matched_groups: """ res_data: List[ResourceSegment] = [] for segment in target_segments: - matched_groups = re.match(NAMED_TOKEN_REGEX, segment["name"]) - if matched_groups: - res_data.append({ - "resource_name": input_matched_groups[matched_groups.groups()[0]], - "resource_type": segment["type"] - }) - elif segment["name"] == MULTI_TOKEN: - multi_segments = input_matched_groups["multi_token"] - # Skip the segment if the multi_token matched 0 times, resulting in an empty string. + # Use the regex to create the res_data + if segment.get("regex") is not None: + regex = segment.get("regex") + matches = re.search(regex, input_matched_groups) + multi_segments = matches.group() if multi_segments: for seg in multi_segments.split("/"): - if seg: # Ignore empty splits + if seg: res_data.append({ "resource_name": seg, "resource_type": segment["type"] }) + else: - res_data.append({ - "resource_name": segment["name"], - "resource_type": segment["type"] - }) + matched_groups = re.match(NAMED_TOKEN_REGEX, segment["name"]) + if matched_groups: + res_data.append({ + "resource_name": input_matched_groups[matched_groups.groups()[0]], + "resource_type": segment["type"] + }) + elif segment["name"] == MULTI_TOKEN: + multi_segments = input_matched_groups["multi_token"] + # Skip the segment if the multi_token matched 0 times, resulting in an empty string. + if multi_segments: + for seg in multi_segments.split("/"): + if seg: # Ignore empty splits + res_data.append({ + "resource_name": seg, + "resource_type": segment["type"] + }) + else: + res_data.append({ + "resource_name": segment["name"], + "resource_type": segment["type"] + }) return res_data def _get_resource_full_name_and_type(self, res_key: str, - matched_groups: Dict[str, str], + matched_groups: Union[Collection[str], Dict[str, str]], ) -> Tuple[str, List[ResourceSegment]]: """ Finds the resource data from the config by using the resource key. @@ -365,7 +435,7 @@ def _is_in_permissions(target_permission: str, def _filter_used_targets(self, target_res_and_permissions: TargetResourcePermissions, input_src_res_key: str, - src_matched_groups: Dict[str, str], + src_matched_groups: Union[Collection[str], Dict[str, str]], input_permission: Permission, ) -> Tuple[Dict[str, List[str]], Dict[str, List[str]]]: """ @@ -444,7 +514,7 @@ def _filter_used_targets(self, def _get_permission_data(self, user_targets: Dict[str, List[str]], group_targets: Dict[str, List[str]], - src_matched_groups: Dict[str, str], + src_matched_groups: Union[Collection[str], Dict[str, str]], input_permission: Permission) -> PermissionData: """ Formats permissions data to send to Magpie. Output contains, for each target resource key, the resource path @@ -485,7 +555,7 @@ def _prepare_permissions_to_remove(self, target_res_and_permissions: TargetResourcePermissions, input_permission: Permission, input_src_res_key: str, - src_matched_groups: Dict[str, str], + src_matched_groups: Union[Collection[str], Dict[str, str]], ) -> PermissionData: """ Removes every source resource found in the mappings that has an existing permission that is synced to one of the @@ -501,7 +571,7 @@ def _prepare_permissions_to_remove(self, def _find_permissions_to_sync(self, src_res_key: str, - src_matched_groups: Dict[str, str], + src_matched_groups: Union[Collection[str], Dict[str, str]], input_permission: Permission, perm_operation: Callable[[List[PermissionConfigItemType]], None], ) -> PermissionData: @@ -543,11 +613,7 @@ def sync(self, :param permission: Permission to synchronize with others services :param src_resource_tree: Resource tree associated with the permission to synchronize """ - resource_nametype_path = "" - for res in src_resource_tree: - resource_nametype_path += f"/{res['resource_name']}{RES_NAMETYPE_SEPARATOR}{res['resource_type']}" - - src_res_key, src_matched_groups = self._find_matching_res(permission.service_type, resource_nametype_path) + src_res_key, src_matched_groups = self._find_matching_res(permission, src_resource_tree) if not src_res_key: # A matching resource was not found in the sync config, nothing to do. return diff --git a/cowbird/typedefs.py b/cowbird/typedefs.py index bb093aeb..1699caae 100644 --- a/cowbird/typedefs.py +++ b/cowbird/typedefs.py @@ -119,7 +119,7 @@ ConfigList = List[ConfigItem] ConfigDict = Dict[str, Union[str, ConfigItem, ConfigList, JSON]] ConfigResTokenInfo = TypedDict("ConfigResTokenInfo", {"has_multi_token": bool, "named_tokens": MutableSet[str]}) -ConfigSegment = TypedDict("ConfigSegment", {"name": str, "type": str}) +ConfigSegment = TypedDict("ConfigSegment", {"name": str, "type": str, "field": Optional[str], "regex": Optional[str]}) SyncPointMappingType = List[str] SyncPointServicesType = Dict[ @@ -142,12 +142,13 @@ SyncPermissionConfig, ] -ResourceSegment = TypedDict("ResourceSegment", {"resource_name": str, "resource_type": str}) +ResourceSegment = TypedDict("ResourceSegment", {"resource_name": str, "resource_type": str, + "resource_display_name": NotRequired[str]}) ResourceTree = List[ Dict[ str, # FIXME: replace by a more specific type provided by Magpie directly if eventually implemented - # Only partial fields are provided below (resource_name/resource_type), + # Only partial fields are provided below (resource_name/resource_type/resource_display_name), # because those are the only ones used for now in Cowbird's sync operation. # This actually contains more details such as the resource ID, permission names, etc. # (see the response body of 'GET /magpie/resources/{resource_id}' for exact content). diff --git a/docs/configuration.rst b/docs/configuration.rst index 555aefa2..c626a50f 100644 --- a/docs/configuration.rst +++ b/docs/configuration.rst @@ -139,7 +139,16 @@ multiple choices of matching are possible. We could match ``seg1/seg2`` with the first token, and ``seg3`` with the second token, we could also match ``seg1`` with the first token, and ``seg2/seg3`` with the second token, etc. -The variables and tokens are useful to know the type of any segments that doesn't have a fixed name. +The key ``field`` is useful when we want to have a different mapping between resources. The ``resource_name`` +is used by default, but another key from the `Magpie`_ resource schema can be used, if specified by using the +key ``field``. + +A regular expression ``regex`` may be used to extract the desired information from the ``nametype_path``. +This will override the default behaviour of matching each segment with another segment and will instead use +what is extracted from the segment and prioritize the longest match. When used in the target, the ``regex`` +extracts each resource in the segment with the same type. + +The variables, tokens and regex are useful to know the type of any segments that doesn't have a fixed name. .. _permissions_mapping: diff --git a/setup.cfg b/setup.cfg index d4c50e4c..e271a076 100644 --- a/setup.cfg +++ b/setup.cfg @@ -59,11 +59,12 @@ exclude = eggs, parts, share, + node_modules, [pylint] [bandit] -exclude = *.egg-info,build,dist,env,./tests,test_* +exclude = *.egg-info,build,dist,env,./tests,test_*,./node_modules targets = . [tool:isort] diff --git a/tests/test_permissions_synchronizer.py b/tests/test_permissions_synchronizer.py index 178db108..de7a3afc 100644 --- a/tests/test_permissions_synchronizer.py +++ b/tests/test_permissions_synchronizer.py @@ -667,6 +667,104 @@ def test_webhooks_invalid_service(self): utils.check_raises(lambda: HandlerFactory().create_handler("Magpie"), ConfigErrorInvalidServiceKey, msg="invalid config file should raise") + def test_webhooks_valid_regex(self): + """ + Tests the permissions synchronization of resources that use a regex to extract a display_name in the config. + """ + self.data["sync_permissions"] = { + "user_workspace": { + "services": { + ServiceTHREDDS.service_type: { + "Thredds_file_src": [ + {"name": self.test_service_name, "type": Service.resource_type_name}, + {"name": "private", "type": Directory.resource_type_name}, + {"name": "directory", "type": Directory.resource_type_name}, + {"field": "resource_display_name", + "regex": r"[\w]+:[\w\/.-]+", + "type": File.resource_type_name}], + "Thredds_file_target": [ + {"name": self.test_service_name, "type": Service.resource_type_name}, + {"regex": r"(?<=catalog:)[\w\/]*\/?(?=\/)", "type": Directory.resource_type_name}, + {"regex": r"[^\/]+$", "type": File.resource_type_name}], + "Thredds_dir_src": [ + {"name": self.test_service_name, "type": Service.resource_type_name}, + {"name": "private", "type": Directory.resource_type_name}, + {"field": "resource_display_name", "regex": r"[\w]+:[\w\/]+", + "type": Directory.resource_type_name}], + "Thredds_dir_target": [ + {"name": self.test_service_name, "type": Service.resource_type_name}, + {"regex": r"(?<=:)[\w\/]+", "type": Directory.resource_type_name}]}}, + "permissions_mapping": [ + f"Thredds_file_src : {Permission.READ.value} -> " + f"Thredds_file_target : {Permission.READ.value}", + f"Thredds_dir_src : {Permission.READ.value} -> " + f"Thredds_dir_target : {Permission.READ.value}", + ] + } + } + with open(self.cfg_file.name, mode="w", encoding="utf-8") as f: + f.write(yaml.safe_dump(self.data)) + app = utils.get_test_app(settings={"cowbird.config_path": self.cfg_file.name}) + # Recreate new magpie handler instance with new config + HandlerFactory().create_handler("Magpie") + + with contextlib.ExitStack() as stack: + stack.enter_context(mock.patch("cowbird.handlers.impl.thredds.Thredds", + side_effect=utils.MockAnyHandler)) + + # Create test resources + private_src_res_id = self.magpie.create_resource("private", Directory.resource_type_name, + self.test_service_id) + parent_res_id = self.magpie.create_resource("directory", Directory.resource_type_name, private_src_res_id, + f"{self.test_service_name}:directory") + dir_src_res_id = parent_res_id + file_src_res_id = self.magpie.create_resource("file.nc", File.resource_type_name, parent_res_id, + f"{self.test_service_name}:directory/file.nc") + + parent_res_id = self.magpie.create_resource("directory", Directory.resource_type_name, self.test_service_id) + dir_target_res_id = parent_res_id + file_target_res_id = self.magpie.create_resource("file.nc", File.resource_type_name, parent_res_id) + + # Create permissions for 1st mapping case, src resource should match to + data = { + "event": ValidOperations.CreateOperation.value, + "service_name": None, + "service_type": ServiceTHREDDS.service_type, + "resource_id": dir_src_res_id, + "resource_full_name": f"/{self.test_service_name}/private/directory", + "resource_display_name": f"{self.test_service_name}:directory", + "name": Permission.READ.value, + "access": Access.ALLOW.value, + "scope": Scope.RECURSIVE.value, + "user": self.usr, + "group": None + } + # Check permission are initialy empty + self.check_user_permissions(dir_target_res_id, []) + + resp = utils.test_request(app, "POST", "/webhooks/permissions", json=data) + utils.check_response_basic_info(resp, 200, expected_method="POST") + + # check permissions with first mapping case + self.check_user_permissions(dir_target_res_id, + [Permission.READ.value, + f"{Permission.READ.value}-{Access.ALLOW.value}-{Scope.RECURSIVE.value}"]) + + # Create and check permissions with 2nd mapping case + data["resource_id"] = file_src_res_id + data["resource_full_name"] = f"/{self.test_service_name}/private/directory/file.nc" + data["resource_display_name_name"] = f"{self.test_service_name}:directory/file.nc" + + # Check permission are initialy empty before 2nd mapping case + self.check_user_permissions(file_target_res_id, []) + + # check permissions with 2nd mapping case + resp = utils.test_request(app, "POST", "/webhooks/permissions", json=data) + utils.check_response_basic_info(resp, 200, expected_method="POST") + self.check_user_permissions(file_target_res_id, + [Permission.READ.value, + f"{Permission.READ.value}-{Access.ALLOW.value}-{Scope.RECURSIVE.value}"]) + def check_config(config_data: Dict, expected_exception_type: Type[Exception] = None) -> None: """