Skip to content

Commit

Permalink
Refactor Group plugin, increase test coverage
Browse files Browse the repository at this point in the history
  • Loading branch information
ericof committed Feb 27, 2024
1 parent 799085c commit 8da5fdc
Show file tree
Hide file tree
Showing 7 changed files with 365 additions and 133 deletions.
3 changes: 3 additions & 0 deletions src/pas/plugins/oidc/plugins/group/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
from .oidc_group import OIDCGroup # noQA
from .plugin import IKeycloakGroupsPlugin # noQA
from .plugin import KeycloakGroupsPlugin # noQA
35 changes: 35 additions & 0 deletions src/pas/plugins/oidc/plugins/group/oidc_group.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
from AccessControl import ClassSecurityInfo
from pas.plugins.oidc import logger
from Products.PlonePAS.plugins.group import PloneGroup
from typing import Union


_ATTRS_NOT_WRAPPED = [
"id",
"_members",
"_roles",
]


class OIDCGroup(PloneGroup):
security = ClassSecurityInfo()

@security.public
def addMember(self, id: str) -> None:
logger.info(f"{self._id} does not support user assignment")

@security.public
def removeMember(self, id: str) -> None:
logger.info(f"{self._id} does not support user removal")


MaybeOIDCGroup = Union[OIDCGroup, None]


def wrap_group(group_info: dict) -> MaybeOIDCGroup:
"""Given a dictionary with group information, return a OIDCGroup."""
group = OIDCGroup(group_info["id"], group_info["title"])
# Add title, description properties to the group object
data = {k: v for k, v in group_info.items() if k not in _ATTRS_NOT_WRAPPED}
group.addPropertysheet("temp", data)
return group
Original file line number Diff line number Diff line change
@@ -1,51 +1,22 @@
from AccessControl import ClassSecurityInfo
from AccessControl.class_init import InitializeClass
from keycloak import KeycloakAdmin
from keycloak import KeycloakOpenIDConnection
from keycloak.exceptions import KeycloakGetError
from pas.plugins.oidc import logger
from plone import api
from plone.memoize import ram
from pas.plugins.oidc.plugins.group import oidc_group
from pas.plugins.oidc.plugins.group import utils
from plone.memoize.ram import cache
from Products.PlonePAS.interfaces.group import IGroupIntrospection
from Products.PlonePAS.plugins.group import PloneGroup
from Products.PluggableAuthService.interfaces import plugins
from Products.PluggableAuthService.permissions import ManageGroups
from Products.PluggableAuthService.plugins.BasePlugin import BasePlugin
from Products.PluggableAuthService.utils import classImplements
from time import time
from typing import Any
from typing import List
from typing import Optional
from typing import Tuple
from zope.interface import implementer
from zope.interface import Interface


def config(key: str) -> Any:
"""Get registry configuration for a given key."""
name = f"keycloak_groups.{key}"
return api.portal.get_registry_record(name=name)


_ATTRS_NOT_WRAPPED = [
"id",
"_members",
"_roles",
]


class OIDCGroup(PloneGroup):
security = ClassSecurityInfo()

@security.public
def addMember(self, id) -> None:
logger.info(f"{self._id} does not support user assignment")

@security.public
def removeMember(self, id) -> None:
logger.info(f"{self._id} does not support user removal")


class IKeycloakGroupsPlugin(Interface):
"""Interface for PAS plugin for using groups in Keycloak"""

Expand All @@ -58,73 +29,68 @@ class KeycloakGroupsPlugin(BasePlugin):
security = ClassSecurityInfo()

@property
@ram.cache(lambda *args: time() // (60 * 2))
@cache(utils._cache_5_minutes)
def _available_roles(self) -> List[str]:
"""Return available roles in Portal."""
member_tool = api.portal.get_tool("portal_membership")
return [r for r in member_tool.getPortalRoles() if r != "Owner"]

def _wrap_group(self, group_info: dict) -> Optional[OIDCGroup]:
"""Given a dictionary with group information, return a OIDCGroup."""
group = OIDCGroup(group_info["id"], group_info["title"])
# Add title, description properties to the group object
data = {k: v for k, v in group_info.items() if k not in _ATTRS_NOT_WRAPPED}
group.addPropertysheet("temp", data)
return group
return utils.list_available_roles()

@property
def _connection_settings(self) -> dict:
"""Keycloak REST API connection settings."""
enabled = config("enabled")
if enabled:
return {
"server_url": config("server_url"),
"realm_name": config("realm_name"),
"client_id": config("client_id"),
"client_secret_key": config("client_secret"),
"verify": config("verify"),
}
else:
return {}
def _keycloak_client(self) -> KeycloakAdmin:
"""Return an instance of KeycloakAdmin."""
return utils.get_keycloak_client()

@security.private
def is_plugin_active(self, iface) -> bool:
"""Check if Plugin is active for given interface."""
enabled = config("enabled")
if not enabled:
if not utils.is_plugin_enabled():
logger.info(f"Group Plugin ({self.getId()}): Plugin is not active.")
return False
pas = self._getPAS()
ids = pas.plugins.listPluginIds(iface)
return self.getId() in ids

@property
def client(self) -> KeycloakAdmin:
settings = self._connection_settings
keycloak_connection = KeycloakOpenIDConnection(**settings)
keycloak_admin = KeycloakAdmin(connection=keycloak_connection)
return keycloak_admin
@cache(utils._cache_5_minutes)
def _keycloak_groups(self) -> dict:
"""Return a dictionary with all keycloak groups.
@property
@ram.cache(lambda *args: time() // (60 * 2))
def _groups(self) -> dict:
"""Query keycloak and return group information."""
groups = {}
This will be cached in memory for 5 minutes.
"""
plugin_id = self.getId()
client = self.client
groups_info = client.get_groups({"briefRepresentation": False})
available_roles = self._available_roles
for item in groups_info:
roles = [r for r in item.get("realmRoles", []) if r in available_roles]
groups[item["id"]] = {
"id": item["id"],
"title": item["name"],
"description": item["path"],
"pluginid": plugin_id,
"groupid": item["id"],
"principal_type": "group",
"_roles": roles,
}
return groups
client = self._keycloak_client
allowed_roles = self._available_roles
logger.info(f"Group Plugin ({plugin_id}): Get all Keycloak groups")
return utils.get_all_keycloak_groups(plugin_id, client, allowed_roles)

@cache(utils._cache_principal_info)
def _keycloak_groups_for_principal(self, principal_id: str) -> Tuple[str]:
"""Return a tuple with all group ids for a given principal.
This will be cached in memory for 5 minutes (for each principal).
"""
client = self._keycloak_client
logger.info(
f"Group Plugin ({self.getId()}): Get all Keycloak groups for principal {principal_id}"
)
return utils.get_groups_for_principal(client=client, principal_id=principal_id)

@cache(utils._cache_group_info)
def _keycloak_group_members(self, group_id: str) -> Tuple[str]:
client = self._keycloak_client
logger.info(
f"Group Plugin ({self.getId()}): Get all Keycloak members for group {group_id}"
)
return utils.get_group_members(client=client, group_id=group_id)

@cache(utils._cache_group_info)
def _get_group_info(self, group_id: str) -> dict:
"""Return group information, including members, for a given group_id."""
default = {}
group_info = self._keycloak_groups.get(group_id, default)
if group_info:
# Get Members
group_info["_members"] = self._keycloak_group_members(group_id=group_id)
return group_info

def enumerateGroups(
self,
Expand Down Expand Up @@ -178,7 +144,7 @@ def enumerateGroups(
default = ()
if not self.is_plugin_active(plugins.IGroupEnumerationPlugin):
return default
groups = self._groups
groups = self._keycloak_groups
if not groups:
return default
matches = []
Expand Down Expand Up @@ -211,18 +177,8 @@ def getGroupsForPrincipal(self, principal, request=None) -> Tuple[str]:
"""See IGroupsPlugin."""
if not self.is_plugin_active(plugins.IGroupsPlugin):
return tuple()
client = self.client
try:
groups = client.get_user_groups(user_id=principal.getId())
except KeycloakGetError as exc:
if exc.response_code == 404:
logger.debug(f"{principal.getId()} not found in OIDC provider")
else:
logger.debug(
f"Error {exc.response_code} looking groups for {principal.getId()}"
)
return []
return tuple([x.get("id") for x in groups])
principal_id = principal.getId()
return self._keycloak_groups_for_principal(principal_id=principal_id)

#
# (notional)IZODBGroupManager interface
Expand All @@ -232,7 +188,7 @@ def listGroupIds(self) -> Tuple[str]:
"""-> (group_id_1, ... group_id_n)"""
if not self.is_plugin_active(plugins.IGroupsPlugin):
return tuple()
return tuple(group_id for group_id in self._groups.keys())
return tuple(group_id for group_id in self._keycloak_groups.keys())

@security.protected(ManageGroups)
def listGroupInfo(self) -> Tuple[dict]:
Expand All @@ -244,39 +200,23 @@ def listGroupInfo(self) -> Tuple[dict]:
"""
if not self.is_plugin_active(plugins.IGroupsPlugin):
return tuple()
return tuple(group_info for group_info in self._groups.values())

def _get_group_info(self, group_id: str) -> dict:
default = None
group_info = self._groups.get(group_id, default)
if not group_info:
return group_info
# Get Members
client = self.client
try:
members = client.get_group_members(group_id=group_id)
except KeycloakGetError as exc:
logger.debug(f"Error {exc.response_code} looking up members for {group_id}")
members = []
group_info["_members"] = [member["id"] for member in members]
return group_info
return tuple(group_info for group_info in self._keycloak_groups.values())

@security.protected(ManageGroups)
def getGroupInfo(self, group_id: str) -> Optional[dict]:
"""group_id -> dict"""
if not self.is_plugin_active(plugins.IGroupsPlugin):
return None
group_info = self._get_group_info(group_id)
return group_info
return self._get_group_info(group_id=group_id)

def getGroupById(self, group_id: str) -> Optional[OIDCGroup]:
def getGroupById(self, group_id: str) -> oidc_group.MaybeOIDCGroup:
"""Return the portal_groupdata object for a group corresponding to this id."""
if not self.is_plugin_active(plugins.IGroupsPlugin):
return None
group_info = self.getGroupInfo(group_id)
return self._wrap_group(group_info) if group_info else None
return oidc_group.wrap_group(group_info) if group_info else None

def getGroups(self) -> List[OIDCGroup]:
def getGroups(self) -> List[oidc_group.OIDCGroup]:
"""Return an iterator of the available groups."""
if not self.is_plugin_active(plugins.IGroupsPlugin):
return []
Expand All @@ -286,24 +226,19 @@ def getGroupIds(self) -> List[str]:
"""Return a list of the available groups."""
if not self.is_plugin_active(plugins.IGroupsPlugin):
return []
return [group_id for group_id in self._groups.keys()]
return [group_id for group_id in self._keycloak_groups.keys()]

def getGroupMembers(self, group_id: str) -> Tuple[str]:
"""Return the members of a group with the given group_id.
We only care about groups defined in this plugin.
"""
default = tuple()
if self.is_plugin_active(plugins.IGroupsPlugin) and group_id in self._groups:
client = self.client
try:
members = client.get_group_members(group_id=group_id)
except KeycloakGetError as exc:
logger.debug(
f"Error {exc.response_code} looking up members for {group_id}"
)
members = default
return tuple(member["id"] for member in members)
if (
self.is_plugin_active(plugins.IGroupsPlugin)
and group_id in self._keycloak_groups
):
return self._keycloak_group_members(group_id=group_id)
return default

#
Expand All @@ -315,13 +250,13 @@ def getRolesForPrincipal(self, principal, request=None) -> Tuple[str]:
We only care about principals(groups) defined in this plugin.
"""
principal_id = principal.getId()
default = tuple()
principal_id = principal.getId()
if (
self.is_plugin_active(plugins.IGroupsPlugin)
and principal_id in self._groups
self.is_plugin_active(plugins.IRolesPlugin)
and principal_id in self._keycloak_groups
):
group_info = self._get_group_info(principal_id)
group_info = self._get_group_info(group_id=principal_id)
if group_info:
return tuple(group_info.get("_roles", default))
return default
Expand Down
Loading

0 comments on commit 8da5fdc

Please sign in to comment.