Skip to content

Commit

Permalink
[ldap] when syncing allow to disable users when user limit is reached
Browse files Browse the repository at this point in the history
  • Loading branch information
EvanBldy committed Aug 24, 2023
1 parent d934864 commit ea82d43
Show file tree
Hide file tree
Showing 2 changed files with 135 additions and 98 deletions.
212 changes: 123 additions & 89 deletions zou/app/utils/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
IsUserLimitReachedException,
)

from zou.app import config

from zou.app import app


Expand Down Expand Up @@ -214,7 +216,12 @@ def search_ldap_users(conn, excluded_accounts):
"objectGUID",
]
else:
attributes += ["uid", "jpegPhoto", "uniqueIdentifier"]
attributes += [
"uid",
"jpegPhoto",
"uniqueIdentifier",
"organizationalStatus",
]
query = "(objectClass=person)"
if is_ad:
query = "(&(objectClass=person)(!(objectClass=computer)))"
Expand Down Expand Up @@ -244,22 +251,51 @@ def search_ldap_users(conn, excluded_accounts):
ldap_uid = clean_value(entry.uniqueIdentifier)
else:
ldap_uid = None
thumbnails = (
entry.thumbnailPhoto if is_ad else entry.jpegPhoto
).raw_values
if len(thumbnails) > 0 and len(thumbnails[0]) > 0:
thumbnail = thumbnails[0]
else:
thumbnail = None

desktop_login = clean_value(
entry.sAMAccountName if is_ad else entry.uid
)

emails = entry.mail.values
if len(emails) == 0:
emails = ["%s@%s" % (desktop_login, EMAIL_DOMAIN)]
else:

def sort_mails(email):
if email == desktop_login:
return -2
elif EMAIL_DOMAIN in email:
return -1
else:
return 0

emails = sorted(emails, key=sort_mails)

if is_ad:
active = bool(entry.userAccountControl.value & 2) is False
elif entry.organizationalStatus:
active = (
entry.organizationalStatus.value.lower() == "active"
)
else:
active = False

ldap_users.append(
{
"first_name": clean_value(entry.givenName or entry.cn),
"last_name": clean_value(entry.sn),
"email": entry.mail.values,
"desktop_login": clean_value(
entry.sAMAccountName if is_ad else entry.uid
),
"thumbnail": (
entry.thumbnailPhoto if is_ad else entry.jpegPhoto
).raw_values,
"active": (
bool(entry.userAccountControl.value & 2) is False
)
if is_ad
else True,
"email": emails[0],
"emails": emails,
"desktop_login": desktop_login,
"thumbnail": thumbnail,
"active": active,
"ldap_uid": ldap_uid,
}
)
Expand Down Expand Up @@ -292,106 +328,104 @@ def get_ldap_users():
return search_ldap_users(conn, excluded_accounts)

def update_person_list_with_ldap_users(users):
persons_updated = []
for user in users:
if persons_service.is_user_limit_reached():
raise IsUserLimitReachedException
first_name = user["first_name"]
last_name = user["last_name"]
desktop_login = user["desktop_login"]
email = user["email"]
ldap_uid = user["ldap_uid"]
active = user.get("active", True)
if "thumbnail" in user and len(user["thumbnail"]) > 0:
thumbnail = user["thumbnail"][0]
else:
thumbnail = ""

if len(email) == 0:
email_list = ["%s@%s" % (desktop_login, EMAIL_DOMAIN)]
else:

def sort_mails(email):
if email == desktop_login:
return -2
elif EMAIL_DOMAIN in email:
return -1
else:
return 0

email_list = sorted(email, key=sort_mails)
email = email_list[0]

persons_to_update = []
persons_to_create = []
for user in sorted(users, key=lambda k: k["active"]):
person = None
try:
person = persons_service.get_person_by_ldap_uid(ldap_uid)
person = persons_service.get_person_by_ldap_uid(
user["ldap_uid"]
)
except PersonNotFoundException:
try:
person = persons_service.get_person_by_desktop_login(
desktop_login
user["desktop_login"]
)
except PersonNotFoundException:
for mail in email_list:
for mail in user["emails"]:
try:
person = persons_service.get_person_by_email(mail)
email = mail
break
except PersonNotFoundException:
pass

if person is None and active is True:
if person is None:
persons_to_create.append(user)
else:
persons_to_update.append((person, user))

for person in (
Person.query.filter_by(is_generated_from_ldap=True, active=True)
.filter(
not_(Person.id.in_([p[0]["id"] for p in persons_to_update]))
)
.all()
):
persons_service.update_person(person.id, {"active": False})
print(
"User %s disabled (not found in LDAP)." % person.desktop_login
)

for person, user in persons_to_update:
try:
if (
not person["active"]
and user["active"]
and persons_service.is_user_limit_reached()
):
raise IsUserLimitReachedException
persons_service.update_person(
person["id"],
{
"email": user["email"],
"first_name": user["first_name"],
"last_name": user["last_name"],
"active": user["active"],
"is_generated_from_ldap": True,
"desktop_login": user["desktop_login"],
"ldap_uid": user["ldap_uid"],
},
)
print(f"User {user['desktop_login']} updated.")
except IsUserLimitReachedException:
print(
f"User {user['desktop_login']} update failed (limit reached, limit {config.USER_LIMIT})."
)
except BaseException:
print(
f"User {user['desktop_login']} update failed (email duplicated?)."
)

if user["thumbnail"] is not None:
save_thumbnail(person, user["thumbnail"])

person = None
for user in persons_to_create:
if user["active"]:
try:
if persons_service.is_user_limit_reached():
raise IsUserLimitReachedException
person = persons_service.create_person(
email,
user["email"],
"default".encode("utf-8"),
first_name,
last_name,
desktop_login=desktop_login,
user["first_name"],
user["last_name"],
desktop_login=user["desktop_login"],
is_generated_from_ldap=True,
ldap_uid=ldap_uid,
ldap_uid=user["ldap_uid"],
)
print("User %s created." % desktop_login)
persons_updated.append(person["id"])
except BaseException:
print(f"User {user['desktop_login']} created.")
except IsUserLimitReachedException:
print(
"User %s creation failed (email duplicated?)."
% (desktop_login)
)

elif person is not None:
try:
person = persons_service.update_person(
person["id"],
{
"email": email,
"first_name": first_name,
"last_name": last_name,
"active": active,
"is_generated_from_ldap": True,
"desktop_login": desktop_login,
"ldap_uid": ldap_uid,
},
f"User {user['desktop_login']} creation failed (limit reached, limit {config.USER_LIMIT})."
)
print("User %s updated." % desktop_login)
persons_updated.append(person["id"])
except BaseException:
print(
"User %s update failed (email duplicated?)."
% (desktop_login)
f"User {user['desktop_login']} creation failed (email duplicated?)."
)

if person is not None and len(thumbnail) > 0:
save_thumbnail(person, thumbnail)

for person in (
Person.query.filter_by(is_generated_from_ldap=True, active=True)
.filter(not_(Person.id.in_(persons_updated)))
.all()
):
persons_service.update_person(person.id, {"active": False})
print(
"User %s disabled (not found in LDAP)." % person.desktop_login
)
if person is not None and user["thumbnail"] is not None:
save_thumbnail(person, user["thumbnail"])

def save_thumbnail(person, thumbnail):
thumbnail_path = "/tmp/ldap_th.jpg"
Expand Down
21 changes: 12 additions & 9 deletions zou/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,22 +133,25 @@ def create_admin(email, password):
# Allow "[email protected]" to be invalid.
if email != "[email protected]":
auth.validate_email(email)
if persons_service.is_user_limit_reached():
raise IsUserLimitReachedException
password = auth.encrypt_password(password)
persons_service.create_person(
email, password, "Super", "Admin", role="admin"
)
print("Admin successfully created.")

except IntegrityError:
print("User already exists for this email.")
sys.exit(1)

except auth.PasswordTooShortException:
print("Password is too short.")
sys.exit(1)
except auth.EmailNotValidException:
print("Email is not valid.")
sys.exit(1)
except IsUserLimitReachedException:
print(f"User limit reached (limit {config.USER_LIMIT}).")
sys.exit(1)


@cli.command()
Expand Down Expand Up @@ -223,7 +226,7 @@ def set_person_as_active(email, unactive):
"""
with app.app_context():
try:
if persons_service.is_user_limit_reached():
if persons_service.is_user_limit_reached() and not unactive:
raise IsUserLimitReachedException
person = persons_service.get_person_by_email_raw(email)
person.update({"active": not unactive})
Expand All @@ -244,12 +247,12 @@ def sync_with_ldap_server():
For each user account in your LDAP server, it creates a new user.
"""
with app.app_context():
try:
if persons_service.is_user_limit_reached():
raise IsUserLimitReachedException
commands.sync_with_ldap_server()
except IsUserLimitReachedException:
print("User limit reached (limit %i)." % config.USER_LIMIT)
commands.sync_with_ldap_server()
if persons_service.is_user_limit_reached():
print(
"User limit reached (limit %i). New users will not be added."
% config.USER_LIMIT
)
sys.exit(1)


Expand Down

0 comments on commit ea82d43

Please sign in to comment.