Skip to content

Commit

Permalink
Limit roles coming from keycloak to the ones defined on the portal.
Browse files Browse the repository at this point in the history
  • Loading branch information
ericof committed Dec 24, 2023
1 parent 38d25e4 commit 1be4b37
Showing 1 changed file with 23 additions and 11 deletions.
34 changes: 23 additions & 11 deletions src/pas/plugins/oidc/plugins/group.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,21 @@ class KeycloakGroupsPlugin(BasePlugin):
meta_type = "Keycloak Group Plugin"
security = ClassSecurityInfo()

@property
@ram.cache(lambda *args: time() // (60 * 2))
def _available_roles(self) -> List[str]:
"""Return available roles in Portal."""
member_tool = api.portal.get_tool("portal_membership")
return [r for r in member_tool.getPortalRoles() if r != "Owner"]

def _wrap_group(self, group_info: dict) -> Optional[OIDCGroup]:
"""Given a dictionary with group information, return a OIDCGroup."""
group = OIDCGroup(group_info["id"], group_info["title"])
# Add title, description properties to the group object
data = {k: v for k, v in group_info.items() if k not in _ATTRS_NOT_WRAPPED}
group.addPropertysheet("temp", data)
return group

@property
def _connection_settings(self) -> dict:
"""Keycloak REST API connection settings."""
Expand Down Expand Up @@ -100,26 +115,20 @@ def _groups(self) -> dict:
plugin_id = self.getId()
client = self.get_rest_api_client()
groups_info = client.get_groups({"briefRepresentation": False})
available_roles = self._available_roles
for item in groups_info:
roles = [r for r in item.get("realmRoles", []) if r in available_roles]
groups[item["id"]] = {
"id": item["id"],
"title": item["name"],
"description": item["path"],
"pluginid": plugin_id,
"groupid": item["id"],
"principal_type": "group",
"_roles": item.get("realmRoles", []),
"_roles": roles,
}
return groups

def _wrap_group(self, group_info: dict) -> Optional[OIDCGroup]:
"""Given a dictionary with group information, return a OIDCGroup."""
group = OIDCGroup(group_info["id"], group_info["title"])
# Add title, description properties to the group object
data = {k: v for k, v in group_info.items() if k not in _ATTRS_NOT_WRAPPED}
group.addPropertysheet("temp", data)
return group

def enumerateGroups(
self,
id=None,
Expand Down Expand Up @@ -283,7 +292,7 @@ def getGroupIds(self) -> List[str]:
return [group_id for group_id in self._groups.keys()]

def getGroupMembers(self, group_id: str) -> Tuple[str]:
"""Return the members of the given group."""
"""Return the members of a group with the given group_id."""
default = tuple()
if self.is_plugin_active(IGroupsPlugin) and group_id in self._groups:
client = self.get_rest_api_client()
Expand All @@ -302,7 +311,10 @@ def getGroupMembers(self, group_id: str) -> Tuple[str]:
#
@security.private
def getRolesForPrincipal(self, principal, request=None) -> Tuple[str]:
"""See IRolesPlugin."""
"""Return roles for a given principal (See IRolesPlugin).
We only care about principals(groups) defined in this plugin.
"""
principal_id = principal.getId()
default = tuple()
if self.is_plugin_active(IGroupsPlugin) and principal_id in self._groups:
Expand Down

0 comments on commit 1be4b37

Please sign in to comment.