diff --git a/virl2_client/models/auth_management.py b/virl2_client/models/auth_management.py index e0f3250..c60b9dc 100644 --- a/virl2_client/models/auth_management.py +++ b/virl2_client/models/auth_management.py @@ -35,6 +35,8 @@ class AuthManagement: _URL_TEMPLATES = { "config": "system/auth/config", "test": "system/auth/test", + "groups": "system/auth/groups", + "refresh": "system/auth/refresh", } def __init__(self, session: Client, auto_sync=True, auto_sync_interval=1.0): @@ -167,6 +169,29 @@ def update_settings(self, settings_dict: dict | None = None, **kwargs) -> None: self._session.put(url, json=settings) self.sync() + def get_ldap_groups(self, search_filter=None): + """ + Get CNs of groups available on the LDAP server, optionally filtered + by supplied filter. + + :param search_filter: An optional filter applied to the search. + :returns: A list of CNs of found groups. + """ + params = {"filter": search_filter} if search_filter else None + url = self._url_for("groups") + response = self._session.get(url, params=params) + return response.json() + + def refresh_ldap_groups(self): + """ + Refresh the members of LDAP groups. Removes any users from the group that are + not LDAP users or not a part of said group on LDAP, and adds any users that + are LDAP users and are a part of said group on LDAP. + """ + url = self._url_for("refresh") + response = self._session.put(url) + return response.json() + def test_auth(self, config: dict, username: str, password: str) -> dict: """ Test a set of credentials against the specified authentication configuration. @@ -185,6 +210,23 @@ def test_auth(self, config: dict, username: str, password: str) -> dict: response = self._session.post(url, json=body) return response.json() + def test_group(self, config: dict, group_name: str) -> dict: + """ + Test a group against the specified authentication configuration. + + :param config: A dictionary of authentication settings to test against + (including manager password). + :param username: The group name to test. + :returns: Results of the test. + """ + body = { + "auth-config": config, + "auth-data": {"group_name": group_name}, + } + url = self._url_for("test") + response = self._session.post(url, json=body) + return response.json() + def test_current_auth( self, manager_password: str, username: str, password: str ) -> dict: @@ -207,6 +249,25 @@ def test_current_auth( response = self._session.post(url, json=body) return response.json() + def test_current_group(self, manager_password: str, group_name: str) -> dict: + """ + Test a group against the currently applied authentication + configuration. + + :param manager_password: The manager password to allow testing. + :param username: The group name to test. + :returns: Results of the test. + """ + current = self.get_settings() + current["manager_password"] = manager_password + body = { + "auth-config": current, + "auth-data": {"group_name": group_name}, + } + url = self._url_for("test") + response = self._session.post(url, json=body) + return response.json() + class AuthMethodManager: """ diff --git a/virl2_client/models/lab.py b/virl2_client/models/lab.py index 2d5d614..c73e05f 100644 --- a/virl2_client/models/lab.py +++ b/virl2_client/models/lab.py @@ -1807,15 +1807,13 @@ def _find_interface_in_topology(interface_id: str, topology: dict) -> dict: :returns: The interface with the specified ID. :raises InterfaceNotFound: If the interface cannot be found in the topology. """ - if "interfaces" in topology: - for interface in topology["interfaces"]: + interface_containers: list = ( + [topology] if "interfaces" in topology else topology["nodes"] + ) + for container in interface_containers: + for interface in container.get("interfaces", []): if interface["id"] == interface_id: return interface - else: - for node in topology["nodes"]: - for interface in node["interfaces"]: - if interface["id"] == interface_id: - return interface # if it cannot be found, it is an internal structure error raise InterfaceNotFound