Skip to content

Commit

Permalink
feat: users add ldif parser and ldap list
Browse files Browse the repository at this point in the history
  • Loading branch information
gsanchietti committed Dec 7, 2023
1 parent 3196662 commit 238f10c
Show file tree
Hide file tree
Showing 2 changed files with 114 additions and 9 deletions.
92 changes: 83 additions & 9 deletions src/nethsec/users/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import hashlib
import ipaddress
import os
import subprocess
from nethsec import utils
from urllib.parse import urlparse

Expand Down Expand Up @@ -190,21 +191,31 @@ def list_users(uci, database='main'):
- A list of user objects
'''
users = []
if get_database_type(uci, database) == "local":
try:
dbconf = uci.get_all('users', database)
except:
raise utils.ValidationError('database', 'db_not_found', database)
dbtype = get_database_type(uci, database)
if dbtype == "local":
for u in utils.get_all_by_type(uci, 'users', 'user'):
if uci.get('users', u, 'database', default='') != database:
continue
if uci.get('users', u, default='') == "user":
username = uci.get('users', u, 'name', default='')
user = get_user_by_name(uci, username, database)
users.append(get_user_by_name(uci, username, database))
else:
# FIXME: list also remote users + merge with local copy
elif dbtype == "ldap":

users = list_remote_users(dbconf.get('uri'), dbconf.get('user_dn'), dbconf.get('user_attr'), dbconf.get('user_cn'), dbconf.get('start_tls'), dbconf.get('tls_reqcert'))
for u in users:
u['local'] = False
u['database'] = database
u['id'] = u['name']
pass

return users

def add_ldap_database(uci, name, uri, schema, base_dn, user_dn, user_attr, start_tls=False, tls_reqcert='never', description=""):
def add_ldap_database(uci, name, uri, schema, base_dn, user_dn, user_attr, user_cn, start_tls=False, tls_reqcert='never', description=""):
'''
Add a new LDAP database
Expand All @@ -216,6 +227,7 @@ def add_ldap_database(uci, name, uri, schema, base_dn, user_dn, user_attr, start
- base_dn -- LDAP base DN
- user_dn -- LDAP user DN
- user_attr -- LDAP user attribute
- user_cn -- LDAP user common name
- start_tls -- Use TLS (default: False)
- tls_reqcert -- TLS certificate validation (default: never)
- description -- Database description (default: "")
Expand All @@ -231,13 +243,14 @@ def add_ldap_database(uci, name, uri, schema, base_dn, user_dn, user_attr, start
uci.set('users', name, 'base_dn', base_dn)
uci.set('users', name, 'user_dn', user_dn)
uci.set('users', name, 'user_attr', user_attr)
uci.set('users', name, 'user_cn', user_cn)
uci.set('users', name, 'start_tls', '1' if start_tls else '0')
uci.set('users', name, 'tls_reqcert', tls_reqcert)
uci.set('users', name, 'description', description)
uci.save("users")
return ldap

def edit_ldap_database(uci, name, uri, schema, base_dn, user_dn, user_attr, start_tls=False, tls_reqcert='never', description=""):
def edit_ldap_database(uci, name, uri, schema, base_dn, user_dn, user_attr, user_cn, start_tls=False, tls_reqcert='never', description=""):
'''
Edit an existing LDAP database
Expand All @@ -249,6 +262,7 @@ def edit_ldap_database(uci, name, uri, schema, base_dn, user_dn, user_attr, star
- base_dn -- LDAP base DN
- user_dn -- LDAP user DN
- user_attr -- LDAP user attribute
- user_cn -- LDAP user common name
- start_tls -- Use TLS (default: False)
- tls_reqcert -- TLS certificate validation (default: never)
- description -- Database description (default: "")
Expand All @@ -265,6 +279,7 @@ def edit_ldap_database(uci, name, uri, schema, base_dn, user_dn, user_attr, star
uci.set('users', name, 'base_dn', base_dn)
uci.set('users', name, 'user_dn', user_dn)
uci.set('users', name, 'user_attr', user_attr)
uci.set('users', name, 'user_cn', user_cn)
uci.set('users', name, 'start_tls', '1' if start_tls else '0')
uci.set('users', name, 'tls_reqcert', tls_reqcert)
uci.set('users', name, 'description', description)
Expand Down Expand Up @@ -455,23 +470,24 @@ def get_ldap_defaults(uri, schema):
- base_dn: LDAP base DN
- user_dn: LDAP user DN
- user_attr: LDAP user attribute
- user_cn: LDAP user common name
'''
parsed = urlparse(uri)
try:
ipaddress.ip_address(parsed.hostname)
# it's an IP address, just suggest example defaults
if schema == "rfc2307":
return {"base_dn": "dc=directory,dc=nh", "user_dn": "ou=People,dc=directory,dc=nh", "user_attr": "uid"}
return {"base_dn": "dc=directory,dc=nh", "user_dn": "ou=People,dc=directory,dc=nh", "user_attr": "uid", "user_cn": "cn"}
elif schema == "ad":
return {"base_dn": "dc=directory,dc=nh", "user_dn": "cn=Users,dc=directory,dc=nh", "user_attr": "cn"}
return {"base_dn": "dc=directory,dc=nh", "user_dn": "cn=Users,dc=directory,dc=nh", "user_attr": "cn", "user_cn": "cn"}
except:
# it's a hostname, suggest defaults based on domain
parts = parsed.hostname.split(".")[1:]
base_dn = ','.join(['dc=' + part for part in parts])
if schema == "rfc2307":
return {"base_dn": base_dn, "user_dn": "ou=People," + base_dn, "user_attr": "uid"}
return {"base_dn": base_dn, "user_dn": "ou=People," + base_dn, "user_attr": "uid", "user_cn": "cn"}
elif schema == "ad":
return {"base_dn": base_dn, "user_dn": "cn=Users," + base_dn, "user_attr": "cn"}
return {"base_dn": base_dn, "user_dn": "cn=Users," + base_dn, "user_attr": "cn", "user_cn": "cn"}

def shadow_password(password):
'''
Expand Down Expand Up @@ -501,3 +517,61 @@ def check_password(password, shadow):
(_, alg, salt, curhash) = shadow.split("$")
phash = base64.b64encode(hashlib.pbkdf2_hmac('sha512', bytes(password, 'UTF-8'), salt.encode("UTF-8"), 200000))
return phash.decode("UTF-8") == curhash

def ldif2users(ldif_data, user_attr="uid", user_cn="cn"):
'''
Parse an LDIF file and return a list of users
Arguments:
- ldif_data -- LDIF data
- user_attr -- User attribute (default: uid)
Returns:
- A list of users
'''
users = []
user = None
dn = None
# Parse only the dn lines. Example: dn: uid=user1,ou=People,dc=example,dc=com
for line in ldif_data.split('\n'):
if line.startswith('#'):
if user:
users.append(user)
user = {}
dn = None
continue
if line.startswith('dn:'):
dn = line[3:].strip()
if dn.startswith(f'{user_attr}='):
user["name"] = dn.split(",")[0].removeprefix(f'{user_attr}=')
if line.startswith(f'{user_cn}:') and dn:
user["description"] = line[len(user_cn)+1:].strip()
return users

def list_remote_users(uri, user_dn, user_attr, user_cn, start_tls=False, tls_reqcert="never"):
'''
Test LDAP connection
Arguments:
- uri -- LDAP URI
- user_dn -- LDAP user DN
- user_attr -- LDAP user attribute
- user_cn -- LDAP user common name
- start_tls -- Use TLS (default: False)
- tls_reqcert -- TLS certificate validation (default: never)
Returns:
- A list of users, each one containing:
- name: user name
- description: user description
'''
env = os.environ.copy()
env['LDAPTLS_REQCERT'] = tls_reqcert
try:
cmd = ["ldapsearch", "-x", "-H", uri, "-b", user_dn, "(objectClass=*)", user_cn]
if start_tls:
cmd.append("-ZZ")
p = subprocess.run(cmd, env=env, capture_output=True, text=True)
return ldif2users(p.stdout, user_attr, user_cn)
except subprocess.CalledProcessError as e:
return []
31 changes: 31 additions & 0 deletions tests/test_users.py
Original file line number Diff line number Diff line change
Expand Up @@ -308,3 +308,34 @@ def test_get_ldap_defaults():
def test_shadow_password():
shadow = users.shadow_password("test")
assert(users.check_password("test", shadow))

def test_ldif2users():
ldif_data = """
# extended LDIF
#
# LDAPv3
# base <ou=People,dc=directory,dc=nh> with scope subtree
# filter: (objectClass=*)
# requesting: dn
#
# People, directory.nh
dn: ou=People,dc=directory,dc=nh
# admin, People, directory.nh
dn: uid=admin,ou=People,dc=directory,dc=nh
cn: admin
# pluto, People, directory.nh
dn: uid=pluto,ou=People,dc=directory,dc=nh
cn: Pluto Rossi
# search result
search: 2
result: 0 Success
# numResponses: 4
# numEntries: 3
"""
assert users.ldif2users(ldif_data) == [{"name": "admin", "description": "admin"},{"name":"pluto", "description": "Pluto Rossi"}]

0 comments on commit 238f10c

Please sign in to comment.