diff --git a/data/templates/ssh/sshd_config.j2 b/data/templates/ssh/sshd_config.j2
index 7e44efae85..d6e31b0f4e 100644
--- a/data/templates/ssh/sshd_config.j2
+++ b/data/templates/ssh/sshd_config.j2
@@ -114,3 +114,9 @@ RekeyLimit {{ rekey.data }}M {{ rekey.time + 'M' if rekey.time is vyos_defined }
{% if trusted_user_ca_key is vyos_defined %}
TrustedUserCAKeys /etc/ssh/trusted_user_ca_key
{% endif %}
+
+{% if trusted_user_ca_key is vyos_defined and trusted_user_ca_key.bind_user is vyos_defined %}
+AuthorizedPrincipalsFile /etc/ssh/authorized_principals/%u
+{% elif trusted_user_ca_key is vyos_defined %}
+AuthorizedPrincipalsFile none
+{% endif %}
diff --git a/interface-definitions/service_ssh.xml.in b/interface-definitions/service_ssh.xml.in
index 14d358c78b..2ab9db48b5 100644
--- a/interface-definitions/service_ssh.xml.in
+++ b/interface-definitions/service_ssh.xml.in
@@ -281,6 +281,25 @@
#include
+
+
+ user-name
+
+ #include
+
+
+
+
+
+ principal-name
+
+ #include
+
+
+
+
+
+
#include
diff --git a/smoketest/scripts/cli/test_service_ssh.py b/smoketest/scripts/cli/test_service_ssh.py
index fa08a5b32e..db83f14c35 100755
--- a/smoketest/scripts/cli/test_service_ssh.py
+++ b/smoketest/scripts/cli/test_service_ssh.py
@@ -39,6 +39,7 @@
key_dsa = '/etc/ssh/ssh_host_dsa_key'
key_ed25519 = '/etc/ssh/ssh_host_ed25519_key'
trusted_user_ca_key = '/etc/ssh/trusted_user_ca_key'
+authorized_principals_dir = '/etc/ssh/authorized_principals'
def get_config_value(key):
@@ -380,18 +381,104 @@ def test_ssh_trusted_user_ca_key(self):
trusted_user_ca_key_config = get_config_value('TrustedUserCAKeys')
self.assertIn(trusted_user_ca_key, trusted_user_ca_key_config)
+ authorize_principals_file_config = get_config_value('AuthorizedPrincipalsFile')
+ self.assertIn('none', authorize_principals_file_config)
with open(trusted_user_ca_key, 'r') as file:
ca_key_contents = file.read()
self.assertIn(ca_root_cert_data, ca_key_contents)
- self.cli_delete(base_path + ['trusted-user-ca-key'])
+ self.cli_delete(
+ base_path + ['trusted-user-ca-key', 'ca-certificate', ca_cert_name]
+ )
self.cli_delete(['pki', 'ca', ca_cert_name])
self.cli_commit()
# Verify the CA key is removed
trusted_user_ca_key_config = get_config_value('TrustedUserCAKeys')
self.assertNotIn(trusted_user_ca_key, trusted_user_ca_key_config)
+ authorize_principals_file_config = get_config_value('AuthorizedPrincipalsFile')
+ self.assertNotIn('none', authorize_principals_file_config)
+
+ def test_ssh_trusted_user_ca_key_and_bind_user_with_principal(self):
+ ca_cert_name = 'test_ca'
+ bind_user = 'test_user'
+ principals = ['test_principal_alice', 'test_principal_bob']
+ test_user = 'ssh_test'
+ test_pass = 'v2i57DZs8idUwMN3VC92'
+
+ # Create a test user
+ self.cli_set(
+ [
+ 'system',
+ 'login',
+ 'user',
+ test_user,
+ 'authentication',
+ 'plaintext-password',
+ test_pass,
+ ]
+ )
+
+ # set pki ca certificate
+ # set service ssh trusted-user-ca-key ca-certificate
+ # set service ssh trusted-user-ca-key bind-user principal
+ self.cli_set(
+ pki_path
+ + [
+ 'ca',
+ ca_cert_name,
+ 'certificate',
+ ca_root_cert_data.replace('\n', ''),
+ ]
+ )
+ self.cli_set(
+ base_path + ['trusted-user-ca-key', 'ca-certificate', ca_cert_name]
+ )
+ for principal in principals:
+ self.cli_set(
+ base_path
+ + [
+ 'trusted-user-ca-key',
+ 'bind-user',
+ bind_user,
+ 'principal',
+ principal,
+ ]
+ )
+ self.cli_commit()
+
+ trusted_user_ca_key_config = get_config_value('TrustedUserCAKeys')
+ self.assertIn(trusted_user_ca_key, trusted_user_ca_key_config)
+ authorized_principals_file = f'{authorized_principals_dir}/{bind_user}'
+ self.assertTrue(os.path.exists(authorized_principals_file))
+
+ with open(authorized_principals_file, 'r') as file:
+ authorized_principals = file.read()
+ for principal in principals:
+ self.assertIn(principal, authorized_principals)
+
+ for principal in principals:
+ self.cli_delete(
+ base_path
+ + [
+ 'trusted-user-ca-key',
+ 'bind-user',
+ bind_user,
+ 'principal',
+ principal,
+ ]
+ )
+
+ self.cli_delete(
+ base_path + ['trusted-user-ca-key', 'ca-certificate', ca_cert_name]
+ )
+ self.cli_delete(['pki', 'ca', ca_cert_name])
+ self.cli_delete(['system', 'login', 'user', test_user])
+ self.cli_commit()
+
+ # Verify the authorized principals file is removed
+ self.assertFalse(os.path.exists(authorized_principals_file))
if __name__ == '__main__':
diff --git a/src/conf_mode/service_ssh.py b/src/conf_mode/service_ssh.py
index 759f87bb20..5e6a6f771e 100755
--- a/src/conf_mode/service_ssh.py
+++ b/src/conf_mode/service_ssh.py
@@ -45,6 +45,77 @@
key_ed25519 = '/etc/ssh/ssh_host_ed25519_key'
trusted_user_ca_key = '/etc/ssh/trusted_user_ca_key'
+authorized_principals = '/etc/ssh/authorized_principals'
+
+
+def cleanup_authorized_principals_dir(valid_users: list[str]):
+ if not os.path.isdir(authorized_principals):
+ return
+
+ # Check the files (user name) under the directory and delete unnecessary ones.
+ for filename in os.listdir(authorized_principals):
+ file_path = os.path.join(authorized_principals, filename)
+ if os.path.isfile(file_path) and filename not in valid_users:
+ os.remove(file_path)
+
+ # If the directory is empty, delete it too
+ if not os.listdir(authorized_principals):
+ os.rmdir(authorized_principals)
+
+
+def handle_trusted_user_ca_key(ssh: dict):
+ if 'trusted_user_ca_key' not in ssh:
+ if os.path.exists(trusted_user_ca_key):
+ os.unlink(trusted_user_ca_key)
+
+ # remove authorized_principals directory if it exists
+ cleanup_authorized_principals_dir([])
+ return
+
+ # trusted_user_ca_key is present
+ ca_key_name = ssh['trusted_user_ca_key']['ca_certificate']
+ pki_ca_cert = ssh['pki']['ca'][ca_key_name]
+
+ loaded_ca_cert = load_certificate(pki_ca_cert['certificate'])
+ loaded_ca_certs = {
+ load_certificate(c['certificate'])
+ for c in ssh['pki']['ca'].values()
+ if 'certificate' in c
+ }
+
+ ca_full_chain = find_chain(loaded_ca_cert, loaded_ca_certs)
+ write_file(
+ trusted_user_ca_key, '\n'.join(encode_certificate(c) for c in ca_full_chain)
+ )
+
+ if 'bind-user' not in ssh['trusted_user_ca_key']:
+ # remove authorized_principals directory if it exists
+ cleanup_authorized_principals_dir([])
+ return
+
+ # bind-user is present
+ configured_users = []
+ for bind_user, bind_user_config in ssh['trusted_user_ca_key']['bind-user'].items():
+ if bind_user not in ssh['login_users']:
+ raise ConfigError(f"User '{bind_user}' not found in system login users")
+
+ if 'principal' not in bind_user_config:
+ raise ConfigError(f"Principal not found for user '{bind_user}'")
+
+ principals = bind_user_config['principal']
+ if isinstance(principals, str):
+ principals = [principals]
+
+ if not os.path.isdir(authorized_principals):
+ os.makedirs(authorized_principals, exist_ok=True)
+
+ principal_file = os.path.join(authorized_principals, bind_user)
+ contents = '\n'.join(principals) + '\n'
+ write_file(principal_file, contents)
+ configured_users.append(bind_user)
+
+ # remove unnecessary files under authorized_principals directory
+ cleanup_authorized_principals_dir(configured_users)
def get_config(config=None):
@@ -59,7 +130,15 @@ def get_config(config=None):
ssh = conf.get_config_dict(
base, key_mangling=('-', '_'), get_first_key=True, with_pki=True
)
+ login_users_base = ['system', 'login', 'user']
+ login_users = conf.get_config_dict(
+ login_users_base,
+ key_mangling=('-', '_'),
+ no_tag_node_value_mangle=True,
+ get_first_key=True,
+ )
+ # create a list of all users, cli and users
tmp = is_node_changed(conf, base + ['vrf'])
if tmp:
ssh.update({'restart_required': {}})
@@ -71,6 +150,9 @@ def get_config(config=None):
# pass config file path - used in override template
ssh['config_file'] = config_file
+ # use for trusted ca
+ ssh['login_users'] = login_users
+
# Ignore default XML values if config doesn't exists
# Delete key from dict
if not conf.exists(base + ['dynamic-protection']):
@@ -119,23 +201,7 @@ def generate(ssh):
syslog(LOG_INFO, 'SSH ed25519 host key not found, generating new key!')
call(f'ssh-keygen -q -N "" -t ed25519 -f {key_ed25519}')
- if 'trusted_user_ca_key' in ssh:
- ca_key_name = ssh['trusted_user_ca_key']['ca_certificate']
- pki_ca_cert = ssh['pki']['ca'][ca_key_name]
-
- loaded_ca_cert = load_certificate(pki_ca_cert['certificate'])
- loaded_ca_certs = {
- load_certificate(c['certificate'])
- for c in ssh['pki']['ca'].values()
- if 'certificate' in c
- }
-
- ca_full_chain = find_chain(loaded_ca_cert, loaded_ca_certs)
- write_file(
- trusted_user_ca_key, '\n'.join(encode_certificate(c) for c in ca_full_chain)
- )
- elif os.path.exists(trusted_user_ca_key):
- os.unlink(trusted_user_ca_key)
+ handle_trusted_user_ca_key(ssh)
render(config_file, 'ssh/sshd_config.j2', ssh)