From 04b1fa1ae081a5e1595d467e2227859d3c7f83c0 Mon Sep 17 00:00:00 2001 From: Giacomo Sanchietti Date: Mon, 11 Dec 2023 16:26:44 +0100 Subject: [PATCH] feat: users, support admin promotion Changes: - use same shadow format of rpcd - allow to make a local user an admin by copying the record to the rpcd database --- builder/build.sh | 2 +- src/nethsec/users/__init__.py | 100 ++++++++++++++++++++++++++++++---- tests/test_users.py | 44 ++++++++++++++- 3 files changed, 133 insertions(+), 13 deletions(-) diff --git a/builder/build.sh b/builder/build.sh index 5e918cae..508ec0b5 100755 --- a/builder/build.sh +++ b/builder/build.sh @@ -49,7 +49,7 @@ buildah run ${container} /bin/bash -c "apt-get -y install --no-install-recommend apt-get clean" echo "Install packages with pip" -buildah run ${container} /bin/bash -c "pip install pytest==7.1.2 pyuci pytest-mock" +buildah run ${container} /bin/bash -c "pip install pytest==7.1.2 pyuci pytest-mock passlib" echo "Setup image" buildah config --workingdir /root ${container} diff --git a/src/nethsec/users/__init__.py b/src/nethsec/users/__init__.py index 1c4fe65c..8d606480 100644 --- a/src/nethsec/users/__init__.py +++ b/src/nethsec/users/__init__.py @@ -14,8 +14,10 @@ import ipaddress import os import subprocess +import secrets from nethsec import utils from urllib.parse import urlparse +from passlib import hash def get_database_type(uci, database): ''' @@ -53,6 +55,10 @@ def get_user_by_name(uci, name, database="main"): # convert tuple to list if type(user[opt]) is tuple: user[opt] = list(user[opt]) + if user["local"]: + user["admin"] = is_admin(uci, name) + else: + user["admin"] = False user['id'] = u return user return None @@ -202,12 +208,13 @@ def list_users(uci, database='main'): if uci.get('users', u, default='') == "user": username = uci.get('users', u, 'name', default='') user = get_user_by_name(uci, username, database) - users.append(get_user_by_name(uci, username, database)) + users.append(user) elif dbtype == "ldap": users = list_remote_users(dbconf.get('uri'), dbconf.get('user_dn'), dbconf.get('user_attr'), dbconf.get('user_cn'), dbconf.get('start_tls'), dbconf.get('tls_reqcert')) for u in users: u['local'] = False + u['admin'] = False u['database'] = database u['id'] = u['name'] pass @@ -441,7 +448,14 @@ def edit_local_user(uci, name, password="", description="", database="main", ext if get_database_type(uci, database) != "local": raise utils.ValidationError('database', 'db_not_local', database) if password: - uci.set('users', user["id"], 'password', shadow_password(password)) + shadow = shadow_password(password) + uci.set('users', user["id"], 'password', shadow) + # update password inside the rpcd configuration database + if is_admin(uci, name): + for l in utils.get_all_by_type(uci, 'rpcd', 'login'): + if uci.get('rpcd', l, 'username', default='') == name: + uci.set('rpcd', l, 'password', shadow) + uci.save("rpcd") uci.set('users', user["id"], 'description', description) for key in uci.get_all('users', user["id"]): if not key in ["name", "description", "password", "database"]: @@ -479,6 +493,8 @@ def delete_local_user(uci, name, database="main"): uci.set('users', g, 'user', gusers) # the user field may be deleted by uci if the list is empty uci.delete('users', user["id"]) uci.save("users") + if is_admin(uci, name): + remove_admin(uci, name) return True def add_local_group(uci, name, users=[], description="", database="main"): @@ -667,11 +683,9 @@ def shadow_password(password): - password -- Clear text password Returns: - - A shadow password, format: $6$salt$hash + - A shadow password in crypt(3) format, as generate by mkpasswd. Format: $6$salt$hash ''' - salt = base64.b64encode(os.urandom(12)) - phash = base64.b64encode(hashlib.pbkdf2_hmac('sha512', bytes(password, 'UTF-8'), salt, 200000)) - return f"$6${salt.decode('UTF-8')}${phash.decode('UTF-8')}" + return hash.sha512_crypt.using(salt=secrets.token_hex(8), rounds=5000).hash(password) def check_password(password, shadow): ''' @@ -679,14 +693,12 @@ def check_password(password, shadow): Arguments: - password -- Clear text password - - shadow -- Shadow password + - shadow -- Shadow password in crypt(3) format Returns: - True if password matches, False otherwise ''' - (_, alg, salt, curhash) = shadow.split("$") - phash = base64.b64encode(hashlib.pbkdf2_hmac('sha512', bytes(password, 'UTF-8'), salt.encode("UTF-8"), 200000)) - return phash.decode("UTF-8") == curhash + return hash.sha512_crypt.verify(password, shadow) def ldif2users(ldif_data, user_attr="uid", user_cn="cn"): ''' @@ -744,4 +756,70 @@ def list_remote_users(uri, user_dn, user_attr, user_cn, start_tls=False, tls_req p = subprocess.run(cmd, env=env, capture_output=True, text=True) return ldif2users(p.stdout, user_attr, user_cn) except subprocess.CalledProcessError as e: - return [] \ No newline at end of file + return [] + +def set_admin(uci, username, database): + ''' + Set a user as admin by creating a login record in rpcd configuration database + + Arguments: + - uci -- EUci pointer + - username -- User name + - database -- Database identifier + + Returns: + - The user identifier inside the rpcd configuration database + ''' + user = get_user_by_name(uci, username, database) + if not user: + raise utils.ValidationError('name', 'user_not_found', username) + logins = utils.get_all_by_type(uci, 'rpcd', 'login') + for l in logins: + if logins[l].get("username") == username: + raise utils.ValidationError('name', 'admin_user_already_exists', username) + id = utils.get_random_id() + uci.set('rpcd', id, 'login') + uci.set('rpcd', id, 'username', username) + uci.set('rpcd', id, 'password', user["password"]) + uci.set('rpcd', id, 'read', '*') + uci.set('rpcd', id, 'write', '*') + uci.save("rpcd") + return id + +def remove_admin(uci, username): + ''' + Remove a user from rpcd configuration database + + Arguments: + - uci -- EUci pointer + - username -- User name + + Returns: + - True if successful + ''' + logins = utils.get_all_by_type(uci, 'rpcd', 'login') + for l in logins: + if logins[l].get("username") == username: + uci.delete('rpcd', l) + uci.save("rpcd") + return True + raise utils.ValidationError('name', 'admin_user_not_found', username) + +def is_admin(uci, username): + ''' + Check if a user is admin + + Arguments: + - uci -- EUci pointer + - username -- User name + + Returns: + - True if user is admin, False otherwise + ''' + logins = utils.get_all_by_type(uci, 'rpcd', 'login') + if logins is None: + return False + for l in logins: + if logins[l].get("username") == username: + return True + return False \ No newline at end of file diff --git a/tests/test_users.py b/tests/test_users.py index 153718d2..514a6661 100644 --- a/tests/test_users.py +++ b/tests/test_users.py @@ -11,6 +11,9 @@ config local 'second' option description 'Secondary local database' +config local 'third' + option description 'Third local database' + config ldap 'ldap1' option description 'Remote OpenLDAP server' option uri 'ldaps://192.168.100.234' @@ -99,12 +102,16 @@ option description 'Goofy Workstation' """ +rpcd_db = "" + def _setup_db(tmp_path): # setup fake db with tmp_path.joinpath('users').open('w') as fp: fp.write(users_db) with tmp_path.joinpath('dhcp').open('w') as fp: fp.write(dhcp_db) + with tmp_path.joinpath('rpcd').open('w') as fp: + fp.write(rpcd_db) return EUci(confdir=tmp_path.as_posix()) @@ -168,6 +175,7 @@ def test_get_user_by_name(tmp_path): "host": ["ns_goofy_pc"], "openvpn_ipaddr": "10.9.9.38", "local": True, + "admin": False, "id": "u1", }) @@ -176,6 +184,7 @@ def test_get_user_by_name(tmp_path): "description": "Another Goofy", "database": "second", "local": True, + "admin": False, "id": "u3" }) @@ -288,6 +297,7 @@ def test_add_local_user(tmp_path): "description": "mydesc", "database": "second", "local": True, + "admin": False, "id": id, "openvpn_ipaddr": "1.2.3.4" } @@ -302,6 +312,7 @@ def test_edit_local_user(tmp_path): "description": "mydesc2", "database": "second", "local": True, + "admin": False, "id": id, "openvpn_ipaddr": "1.2.3.5", "openvpn_enabled": "1" @@ -421,4 +432,35 @@ def test_ldif2users(): # numEntries: 3 """ assert users.ldif2users(ldif_data) == [{"name": "admin", "description": "admin"},{"name":"pluto", "description": "Pluto Rossi"}] - \ No newline at end of file + +def test_set_admin_user(tmp_path): + u = _setup_db(tmp_path) + users.add_local_user(u, "admin2", password="nethesis", description="mydesc", database="third") + admin_id_rpcd = users.set_admin(u, "admin2", "third") + for user in utils.get_all_by_type(u, "rpcd", "login"): + if user == admin_id_rpcd: + assert u.get('rpcd', admin_id_rpcd, "username") == "admin2" + assert users.check_password("nethesis", u.get('rpcd', admin_id_rpcd, "password")) + +def test_is_admin(tmp_path): + u = _setup_db(tmp_path) + assert users.is_admin(u, "admin2") + user = users.get_user_by_name(u, "admin2", 'third') + assert user.get("admin") + +def test_change_admin_password(tmp_path): + u = _setup_db(tmp_path) + local_id = users.edit_local_user(u, "admin2", password="nethesis", description="mydesc", database="third") + logins = utils.get_all_by_type(u, "rpcd", "login") + for l in logins: + if logins[l].get("username") == "admin2": + assert logins[l].get("password") == u.get("users", local_id, "password") + +def test_remove_admin_user(tmp_path): + u = _setup_db(tmp_path) + users.remove_admin(u, "admin2") + found = False + for user in utils.get_all_by_type(u, "rpcd", "login"): + if user.get("username") == "admin2": + found = True + assert not found \ No newline at end of file