From 748d4312b63526ba65363b3e18795f3f5ef627fa Mon Sep 17 00:00:00 2001 From: Victor Rocheleau Date: Mon, 26 Aug 2024 17:47:18 -0400 Subject: [PATCH] feat(bentoctl): grafana roles and groups creation --- py_bentoctl/auth_helper.py | 93 ++++++++++++++++++++++++++++++++++++-- 1 file changed, 89 insertions(+), 4 deletions(-) diff --git a/py_bentoctl/auth_helper.py b/py_bentoctl/auth_helper.py index 90211a8e..c0a5de68 100644 --- a/py_bentoctl/auth_helper.py +++ b/py_bentoctl/auth_helper.py @@ -44,7 +44,9 @@ GRAFANA_CLIENT_ID = os.getenv("BENTO_GRAFANA_CLIENT_ID") GRAFANA_PRIVATE_URL = os.getenv("BENTO_PRIVATE_GRAFANA_URL") -KC_CLIENTS_ENDPOINT = f"admin/realms/{AUTH_REALM}/clients" +KC_ADMIN_API_ENDPOINT = f"admin/realms/{AUTH_REALM}" +KC_ADMIN_API_GROUP_ENDPOINT = f"{KC_ADMIN_API_ENDPOINT}/groups" +KC_ADMIN_API_CLIENTS_ENDPOINT = f"{KC_ADMIN_API_ENDPOINT}/clients" def check_auth_admin_user(): @@ -87,7 +89,7 @@ def keycloak_req( def fetch_existing_client_id(token: str, client_id: str) -> Optional[str]: - existing_clients_res = keycloak_req(KC_CLIENTS_ENDPOINT, bearer_token=token) + existing_clients_res = keycloak_req(KC_ADMIN_API_CLIENTS_ENDPOINT, bearer_token=token) existing_clients = existing_clients_res.json() if not existing_clients_res.ok: @@ -102,6 +104,66 @@ def fetch_existing_client_id(token: str, client_id: str) -> Optional[str]: return None +def fetch_existing_group_id(token: str, group_name: str, + parent_id: str | None = None, verbose: bool = True) -> Optional[str]: + endpoint = f"{KC_ADMIN_API_GROUP_ENDPOINT}/{parent_id}/children" if parent_id else KC_ADMIN_API_GROUP_ENDPOINT + existing_groups_res = keycloak_req(endpoint, bearer_token=token) + + if not existing_groups_res.ok: + err(f" Failed to fetch group id associated with name: {group_name}") + exit(1) + + exising_groups = existing_groups_res.json() + for group in exising_groups: + if group["name"] == group_name: + if verbose: + warn(f" Found existing group: {group_name}; using that.") + return group["id"] + + return None + + +def create_client_role_or_exit(token: str, client_id: str, role_name: str) -> None: + client_roles_endpoint = f"{KC_ADMIN_API_CLIENTS_ENDPOINT}/{client_id}/roles" + + # Check if client role exists + existing_role_res = keycloak_req(f"{client_roles_endpoint}/{role_name}", bearer_token=token,) + if existing_role_res.ok: + warn(f" Found existing role: {role_name}; using that.") + return + + # Create client role if needed + res = keycloak_req(client_roles_endpoint, bearer_token=token, method="post", json={ + "clientRole": True, + "name": role_name, + }) + + if not res.ok: + err(f" Failed to create {client_id} client role : {role_name}; {res.status_code} {res.json()}") + exit(1) + + warn(f" Created role: {role_name}.") + + +def create_group_or_exit(token: str, group_representation: dict, parent_id: str = None) -> Optional[str]: + group_endpoint = f"{KC_ADMIN_API_GROUP_ENDPOINT}/{parent_id}/children" if parent_id else KC_ADMIN_API_GROUP_ENDPOINT + group_type = f"sub-group of {parent_id}" if parent_id else "group" + + if existing_group_id := fetch_existing_group_id(token, group_representation["name"], parent_id): + return existing_group_id + + res = keycloak_req(f"{group_endpoint}", bearer_token=token, method="post", json=group_representation) + + # group creation doesn't return the ID of the newly created group, extra request is required to obtain it + created_group_id = fetch_existing_group_id(token, group_representation["name"], parent_id, verbose=False) + if not res.ok or not created_group_id: + err(f" Failed to create {group_type}: {group_representation}; {res.status_code}") + exit(1) + + warn(f" Created {group_type}: {group_representation['name']}") + return created_group_id + + def create_keycloak_client_or_exit( token: str, client_id: str, @@ -113,7 +175,7 @@ def create_keycloak_client_or_exit( access_token_lifespan: int, use_refresh_tokens: bool, ) -> None: - res = keycloak_req(KC_CLIENTS_ENDPOINT, bearer_token=token, method="post", json={ + res = keycloak_req(KC_ADMIN_API_CLIENTS_ENDPOINT, bearer_token=token, method="post", json={ "clientId": client_id, "enabled": True, "protocol": "openid-connect", @@ -157,7 +219,7 @@ def create_keycloak_client_or_exit( def get_keycloak_client_secret(client_id: str, token: str): - return keycloak_req(f"{KC_CLIENTS_ENDPOINT}/{client_id}/client-secret", bearer_token=token) + return keycloak_req(f"{KC_ADMIN_API_CLIENTS_ENDPOINT}/{client_id}/client-secret", bearer_token=token) def init_auth(docker_client: docker.DockerClient): @@ -265,6 +327,25 @@ def create_grafana_client_if_needed(token: str) -> None: attrs=["bold"], ) + def create_grafana_roles_if_needed(token: str) -> None: + grafana_client_kc_id: Optional[str] = fetch_existing_client_id(token, GRAFANA_CLIENT_ID) + + for role_name in ["admin", "editor", "viewer"]: + create_client_role_or_exit(token, grafana_client_kc_id, role_name) + + def create_grafana_groups_if_needed(token: str) -> None: + parent_group = {"name": "grafana"} + parent_group_id = create_group_or_exit(token, parent_group) + + sub_groups = [ + {"name": "admin"}, + {"name": "editor"}, + {"name": "viewer"} + ] + + for subgroup in sub_groups: + create_group_or_exit(token, subgroup, parent_id=parent_group_id) + # noinspection PyUnusedLocal def create_cbioportal_client_if_needed(token: str) -> None: cbio_client_kc_id: Optional[str] = fetch_existing_client_id(token, CBIOPORTAL_CLIENT_ID) @@ -418,6 +499,10 @@ def success(): if c.BENTO_FEATURE_MONITORING.enabled: info(f" Creating Grafana client: {GRAFANA_CLIENT_ID}") create_grafana_client_if_needed(access_token) + create_grafana_roles_if_needed(access_token) + create_grafana_groups_if_needed(access_token) + # TODO: group client-role mappings + # /admin/realms/{realm}/groups/{group-id}/role-mappings/clients/{client} success() info(f" Creating user: {AUTH_TEST_USER}")