From 1be4b379cf91c24f228cf21823716930305f3380 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C3=89rico=20Andrei?= Date: Sun, 24 Dec 2023 12:38:59 -0300 Subject: [PATCH] Limit roles coming from keycloak to the ones defined on the portal. --- src/pas/plugins/oidc/plugins/group.py | 34 ++++++++++++++++++--------- 1 file changed, 23 insertions(+), 11 deletions(-) diff --git a/src/pas/plugins/oidc/plugins/group.py b/src/pas/plugins/oidc/plugins/group.py index 1a86898..e72ff54 100644 --- a/src/pas/plugins/oidc/plugins/group.py +++ b/src/pas/plugins/oidc/plugins/group.py @@ -59,6 +59,21 @@ class KeycloakGroupsPlugin(BasePlugin): meta_type = "Keycloak Group Plugin" security = ClassSecurityInfo() + @property + @ram.cache(lambda *args: time() // (60 * 2)) + def _available_roles(self) -> List[str]: + """Return available roles in Portal.""" + member_tool = api.portal.get_tool("portal_membership") + return [r for r in member_tool.getPortalRoles() if r != "Owner"] + + def _wrap_group(self, group_info: dict) -> Optional[OIDCGroup]: + """Given a dictionary with group information, return a OIDCGroup.""" + group = OIDCGroup(group_info["id"], group_info["title"]) + # Add title, description properties to the group object + data = {k: v for k, v in group_info.items() if k not in _ATTRS_NOT_WRAPPED} + group.addPropertysheet("temp", data) + return group + @property def _connection_settings(self) -> dict: """Keycloak REST API connection settings.""" @@ -100,7 +115,9 @@ def _groups(self) -> dict: plugin_id = self.getId() client = self.get_rest_api_client() groups_info = client.get_groups({"briefRepresentation": False}) + available_roles = self._available_roles for item in groups_info: + roles = [r for r in item.get("realmRoles", []) if r in available_roles] groups[item["id"]] = { "id": item["id"], "title": item["name"], @@ -108,18 +125,10 @@ def _groups(self) -> dict: "pluginid": plugin_id, "groupid": item["id"], "principal_type": "group", - "_roles": item.get("realmRoles", []), + "_roles": roles, } return groups - def _wrap_group(self, group_info: dict) -> Optional[OIDCGroup]: - """Given a dictionary with group information, return a OIDCGroup.""" - group = OIDCGroup(group_info["id"], group_info["title"]) - # Add title, description properties to the group object - data = {k: v for k, v in group_info.items() if k not in _ATTRS_NOT_WRAPPED} - group.addPropertysheet("temp", data) - return group - def enumerateGroups( self, id=None, @@ -283,7 +292,7 @@ def getGroupIds(self) -> List[str]: return [group_id for group_id in self._groups.keys()] def getGroupMembers(self, group_id: str) -> Tuple[str]: - """Return the members of the given group.""" + """Return the members of a group with the given group_id.""" default = tuple() if self.is_plugin_active(IGroupsPlugin) and group_id in self._groups: client = self.get_rest_api_client() @@ -302,7 +311,10 @@ def getGroupMembers(self, group_id: str) -> Tuple[str]: # @security.private def getRolesForPrincipal(self, principal, request=None) -> Tuple[str]: - """See IRolesPlugin.""" + """Return roles for a given principal (See IRolesPlugin). + + We only care about principals(groups) defined in this plugin. + """ principal_id = principal.getId() default = tuple() if self.is_plugin_active(IGroupsPlugin) and principal_id in self._groups: