Skip to content

Commit

Permalink
feat: users support admin promotion
Browse files Browse the repository at this point in the history
Changes:
- use same shadow format of rpcd
- allow to make a local user an admin by copying the record to the
  rpcd database
  • Loading branch information
gsanchietti committed Dec 11, 2023
1 parent 861c139 commit 6ca20ad
Show file tree
Hide file tree
Showing 3 changed files with 120 additions and 12 deletions.
2 changes: 1 addition & 1 deletion builder/build.sh
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ buildah run ${container} /bin/bash -c "apt-get -y install --no-install-recommend
apt-get clean"

echo "Install packages with pip"
buildah run ${container} /bin/bash -c "pip install pytest==7.1.2 pyuci pytest-mock"
buildah run ${container} /bin/bash -c "pip install pytest==7.1.2 pyuci pytest-mock passlib"

echo "Setup image"
buildah config --workingdir /root ${container}
Expand Down
93 changes: 83 additions & 10 deletions src/nethsec/users/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,10 @@
import ipaddress
import os
import subprocess
import secrets
from nethsec import utils
from urllib.parse import urlparse
from passlib import hash

def get_database_type(uci, database):
'''
Expand Down Expand Up @@ -441,7 +443,14 @@ def edit_local_user(uci, name, password="", description="", database="main", ext
if get_database_type(uci, database) != "local":
raise utils.ValidationError('database', 'db_not_local', database)
if password:
uci.set('users', user["id"], 'password', shadow_password(password))
shadow = shadow_password(password)
uci.set('users', user["id"], 'password', shadow)
# update password inside the rpcd configuration database
if is_admin(uci, name):
for l in utils.get_all_by_type(uci, 'rpcd', 'login'):
if uci.get('rpcd', l, 'username', default='') == name:
uci.set('rpcd', l, 'password', shadow)
uci.save("rpcd")
uci.set('users', user["id"], 'description', description)
for key in uci.get_all('users', user["id"]):
if not key in ["name", "description", "password", "database"]:
Expand Down Expand Up @@ -479,6 +488,8 @@ def delete_local_user(uci, name, database="main"):
uci.set('users', g, 'user', gusers) # the user field may be deleted by uci if the list is empty
uci.delete('users', user["id"])
uci.save("users")
if is_admin(uci, name):
remove_admin(uci, name)
return True

def add_local_group(uci, name, users=[], description="", database="main"):
Expand Down Expand Up @@ -667,26 +678,22 @@ def shadow_password(password):
- password -- Clear text password
Returns:
- A shadow password, format: $6$salt$hash
- A shadow password in crypt(3) format, as generate by mkpasswd. Format: $6$salt$hash
'''
salt = base64.b64encode(os.urandom(12))
phash = base64.b64encode(hashlib.pbkdf2_hmac('sha512', bytes(password, 'UTF-8'), salt, 200000))
return f"$6${salt.decode('UTF-8')}${phash.decode('UTF-8')}"
return hash.sha512_crypt.using(salt=secrets.token_hex(8), rounds=5000).hash(password)

def check_password(password, shadow):
'''
Check a shadow password
Arguments:
- password -- Clear text password
- shadow -- Shadow password
- shadow -- Shadow password in crypt(3) format
Returns:
- True if password matches, False otherwise
'''
(_, alg, salt, curhash) = shadow.split("$")
phash = base64.b64encode(hashlib.pbkdf2_hmac('sha512', bytes(password, 'UTF-8'), salt.encode("UTF-8"), 200000))
return phash.decode("UTF-8") == curhash
return hash.sha512_crypt.verify(password, shadow)

def ldif2users(ldif_data, user_attr="uid", user_cn="cn"):
'''
Expand Down Expand Up @@ -744,4 +751,70 @@ def list_remote_users(uri, user_dn, user_attr, user_cn, start_tls=False, tls_req
p = subprocess.run(cmd, env=env, capture_output=True, text=True)
return ldif2users(p.stdout, user_attr, user_cn)
except subprocess.CalledProcessError as e:
return []
return []

def set_admin(uci, username, database):
'''
Set a user as admin by creating a login record in rpcd configuration database
Arguments:
- uci -- EUci pointer
- username -- User name
- database -- Database identifier
Returns:
- The user identifier inside the rpcd configuration database
'''
user = get_user_by_name(uci, username, database)
if not user:
raise utils.ValidationError('name', 'user_not_found', username)
logins = utils.get_all_by_type(uci, 'rpcd', 'login')
for l in logins:
if logins[l].get("username") == username:
raise utils.ValidationError('name', 'admin_user_already_exists', username)
id = utils.get_random_id()
uci.set('rpcd', id, 'login')
uci.set('rpcd', id, 'username', username)
uci.set('rpcd', id, 'password', user["password"])
uci.set('rpcd', id, 'read', '*')
uci.set('rpcd', id, 'write', '*')
uci.save("rpcd")
return id

def remove_admin(uci, username):
'''
Remove a user from rpcd configuration database
Arguments:
- uci -- EUci pointer
- username -- User name
Returns:
- True if successful
'''
logins = utils.get_all_by_type(uci, 'rpcd', 'login')
for l in logins:
if logins[l].get("username") == username:
uci.delete('rpcd', l)
uci.save("rpcd")
return True
raise utils.ValidationError('name', 'admin_user_not_found', username)

def is_admin(uci, username):
'''
Check if a user is admin
Arguments:
- uci -- EUci pointer
- username -- User name
Returns:
- True if user is admin, False otherwise
'''
logins = utils.get_all_by_type(uci, 'rpcd', 'login')
if logins is None:
return False
for l in logins:
if logins[l].get("username") == username:
return True
return False
37 changes: 36 additions & 1 deletion tests/test_users.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@
config local 'second'
option description 'Secondary local database'
config local 'third'
option description 'Third local database'
config ldap 'ldap1'
option description 'Remote OpenLDAP server'
option uri 'ldaps://192.168.100.234'
Expand Down Expand Up @@ -99,12 +102,16 @@
option description 'Goofy Workstation'
"""

rpcd_db = ""

def _setup_db(tmp_path):
# setup fake db
with tmp_path.joinpath('users').open('w') as fp:
fp.write(users_db)
with tmp_path.joinpath('dhcp').open('w') as fp:
fp.write(dhcp_db)
with tmp_path.joinpath('rpcd').open('w') as fp:
fp.write(rpcd_db)

return EUci(confdir=tmp_path.as_posix())

Expand Down Expand Up @@ -421,4 +428,32 @@ def test_ldif2users():
# numEntries: 3
"""
assert users.ldif2users(ldif_data) == [{"name": "admin", "description": "admin"},{"name":"pluto", "description": "Pluto Rossi"}]


def test_set_admin_user(tmp_path):
u = _setup_db(tmp_path)
users.add_local_user(u, "admin2", password="nethesis", description="mydesc", database="third")
admin_id_rpcd = users.set_admin(u, "admin2", "third")
for user in utils.get_all_by_type(u, "rpcd", "login"):
if user == admin_id_rpcd:
assert u.get('rpcd', admin_id_rpcd, "username") == "admin2"
assert users.check_password("nethesis", u.get('rpcd', admin_id_rpcd, "password"))

def test_is_admin(tmp_path):
assert users.is_admin(_setup_db(tmp_path), "admin2")

def test_change_admin_password(tmp_path):
u = _setup_db(tmp_path)
local_id = users.edit_local_user(u, "admin2", password="nethesis", description="mydesc", database="third")
logins = utils.get_all_by_type(u, "rpcd", "login")
for l in logins:
if logins[l].get("username") == "admin2":
assert logins[l].get("password") == u.get("users", local_id, "password")

def test_remove_admin_user(tmp_path):
u = _setup_db(tmp_path)
users.remove_admin(u, "admin2")
found = False
for user in utils.get_all_by_type(u, "rpcd", "login"):
if user.get("username") == "admin2":
found = True
assert not found

0 comments on commit 6ca20ad

Please sign in to comment.