Skip to content

Commit

Permalink
Really fix code style issues with black
Browse files Browse the repository at this point in the history
  • Loading branch information
lvps committed Nov 4, 2024
1 parent 3682ce0 commit 1705abf
Show file tree
Hide file tree
Showing 10 changed files with 247 additions and 501 deletions.
138 changes: 62 additions & 76 deletions LdapWrapper.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from datetime import date
from threading import Lock
from time import time

# noinspection PyUnresolvedReferences
from dataclasses import dataclass
from typing import Optional, List, Dict, Tuple
Expand All @@ -18,7 +19,7 @@ def __enter__(self):
# print("Connecting to LDAP")
self.conn = ldap.initialize(self.server)
self.conn.protocol_version = ldap.VERSION3
if not self.server.startswith('ldaps://'):
if not self.server.startswith("ldaps://"):
self.conn.start_tls_s()
self.conn.simple_bind_s(self.bind_dn, self.password)
if self.conn is None:
Expand Down Expand Up @@ -101,11 +102,11 @@ def update_invite(self, invite_code: str, tgid: int, nickname: Optional[str], co
dn = result[0][0]
del result

modlist = [(ldap.MOD_REPLACE, 'telegramid', str(tgid).encode('UTF-8'))]
modlist = [(ldap.MOD_REPLACE, "telegramid", str(tgid).encode("UTF-8"))]
if nickname is None:
modlist.append((ldap.MOD_DELETE, 'telegramnickname', None))
modlist.append((ldap.MOD_DELETE, "telegramnickname", None))
else:
modlist.append((ldap.MOD_REPLACE, 'telegramnickname', nickname.encode('UTF-8')))
modlist.append((ldap.MOD_REPLACE, "telegramnickname", nickname.encode("UTF-8")))
c.modify_s(dn, modlist)

def delete_cache(self) -> int:
Expand Down Expand Up @@ -162,33 +163,38 @@ def delete_cache(self) -> int:
return busted

def __sync(self, conn):
result = conn.search_s(self.tree, ldap.SCOPE_SUBTREE, f"(objectClass=weeeOpenPerson)", (
'uid',
'cn',
'memberof',
'telegramnickname',
'telegramid',
'schacdateofbirth',
'safetytestdate',
'haskey',
'signedsir',
'nsaccountlock',
))
result = conn.search_s(
self.tree,
ldap.SCOPE_SUBTREE,
f"(objectClass=weeeOpenPerson)",
(
"uid",
"cn",
"memberof",
"telegramnickname",
"telegramid",
"schacdateofbirth",
"safetytestdate",
"haskey",
"signedsir",
"nsaccountlock",
),
)

for dn, attributes in result:
dob = self.schac_to_date(attributes['schacdateofbirth'][0].decode()) if 'schacdateofbirth' in attributes else None
dost = self.schac_to_date(attributes['safetytestdate'][0].decode()) if 'safetytestdate' in attributes else None
dob = self.schac_to_date(attributes["schacdateofbirth"][0].decode()) if "schacdateofbirth" in attributes else None
dost = self.schac_to_date(attributes["safetytestdate"][0].decode()) if "safetytestdate" in attributes else None
person = Person(
attributes['uid'][0].decode(),
attributes['cn'][0].decode(),
attributes["uid"][0].decode(),
attributes["cn"][0].decode(),
dob,
dost,
User.is_in_groups(self.admin_groups, attributes),
attributes['telegramnickname'][0].decode() if 'telegramnickname' in attributes else None,
int(attributes['telegramid'][0].decode()) if 'telegramid' in attributes else None,
'haskey' in attributes and attributes['haskey'][0].decode() == "true",
'signedsir' in attributes and attributes['signedsir'][0].decode() == "true",
'nsaccountlock' in attributes,
attributes["telegramnickname"][0].decode() if "telegramnickname" in attributes else None,
int(attributes["telegramid"][0].decode()) if "telegramid" in attributes else None,
"haskey" in attributes and attributes["haskey"][0].decode() == "true",
"signedsir" in attributes and attributes["signedsir"][0].decode() == "true",
"nsaccountlock" in attributes,
)
self.__people[person.uid.lower()] = person

Expand All @@ -198,6 +204,7 @@ def __sync(self, conn):
def schac_to_date(schac_date):
return date(year=int(schac_date[:4]), month=int(schac_date[4:6]), day=int(schac_date[6:8]))


# noinspection PyAttributeOutsideInit
@dataclass
class User:
Expand Down Expand Up @@ -233,17 +240,7 @@ def update(self, conn, admin_groups: List[str], excluded_groups: List[str], also
:return: attributes, dn
"""
print(f"Update {self.tgid} ({self.dn})")
result = conn.read_s(self.dn, None, (
'uid',
'cn',
'givenname',
'sn',
'memberof',
'telegramnickname',
'telegramid',
'signedsir',
'nsaccountlock'
))
result = conn.read_s(self.dn, None, ("uid", "cn", "givenname", "sn", "memberof", "telegramnickname", "telegramid", "signedsir", "nsaccountlock"))
if len(result) == 0:
raise AccountNotFoundError()
if len(result) > 1:
Expand All @@ -256,16 +253,16 @@ def update(self, conn, admin_groups: List[str], excluded_groups: List[str], also
if isnotallowed:
raise AccountNotFoundError()

if 'nsaccountlock' in attributes:
if "nsaccountlock" in attributes:
raise AccountLockedError()

# self.tgid = int(attributes['tgid'][0].decode())
self.uid = attributes['uid'][0].decode()
self.cn = attributes['cn'][0].decode()
self.givenname = attributes['givenname'][0].decode()
self.surname = attributes['surname'][0].decode()
self.dateofsafetytest = self._schac_to_date(attributes['safetytestdate'][0].decode()) if 'safetytestdate' in attributes else None
self.signedsir = 'signedsir' in attributes and attributes['signedsir'][0].decode() == "true"
self.uid = attributes["uid"][0].decode()
self.cn = attributes["cn"][0].decode()
self.givenname = attributes["givenname"][0].decode()
self.surname = attributes["surname"][0].decode()
self.dateofsafetytest = self._schac_to_date(attributes["safetytestdate"][0].decode()) if "safetytestdate" in attributes else None
self.signedsir = "signedsir" in attributes and attributes["signedsir"][0].decode() == "true"
self.isadmin = User.is_in_groups(admin_groups, attributes)
if also_nickname:
if User.__get_stored_nickname(attributes) != nickname:
Expand Down Expand Up @@ -299,7 +296,7 @@ def search(tgid: int, tgnick: Optional[str], admin_groups: List[str], excluded_g
if isnotallowed:
raise AccountNotFoundError()

if 'nsaccountlock' in attributes:
if "nsaccountlock" in attributes:
raise AccountLockedError()

isadmin = User.is_in_groups(admin_groups, attributes)
Expand All @@ -311,14 +308,15 @@ def search(tgid: int, tgnick: Optional[str], admin_groups: List[str], excluded_g
return User(
dn,
tgid,
attributes['uid'][0].decode(),
attributes['cn'][0].decode(),
attributes['givenname'][0].decode(),
attributes['sn'][0].decode(),
People.schac_to_date(attributes['safetytestdate'][0].decode()) if 'safetytestdate' in attributes else None,
'signedsir' in attributes and attributes['signedsir'][0].decode() == "true",
attributes["uid"][0].decode(),
attributes["cn"][0].decode(),
attributes["givenname"][0].decode(),
attributes["sn"][0].decode(),
People.schac_to_date(attributes["safetytestdate"][0].decode()) if "safetytestdate" in attributes else None,
"signedsir" in attributes and attributes["signedsir"][0].decode() == "true",
isadmin,
tgnick)
tgnick,
)

@staticmethod
def __search_by_tgid(conn, tgid, tree) -> Tuple[Dict, str]:
Expand All @@ -330,18 +328,12 @@ def __search_by_tgid(conn, tgid, tree) -> Tuple[Dict, str]:
:param tree: Users tree DN
:return: attributes, dn
"""
result = conn.search_s(tree, ldap.SCOPE_SUBTREE, f"(&(objectClass=weeeOpenPerson)(telegramId={tgid}))", (
'uid',
'cn',
'givenname',
'sn',
'memberof',
'telegramnickname',
'safetytestdate',
'signedsir',
'telegramid',
'nsaccountlock'
))
result = conn.search_s(
tree,
ldap.SCOPE_SUBTREE,
f"(&(objectClass=weeeOpenPerson)(telegramId={tgid}))",
("uid", "cn", "givenname", "sn", "memberof", "telegramnickname", "safetytestdate", "signedsir", "telegramid", "nsaccountlock"),
)
if len(result) == 0:
raise AccountNotFoundError()
if len(result) > 1:
Expand Down Expand Up @@ -377,17 +369,17 @@ def __search_by_nickname(conn, tgnick: str, tgid: int, tree) -> Tuple[Dict, str]

@staticmethod
def __get_stored_nickname(attributes):
if 'telegramnickname' in attributes:
nickname = attributes['telegramnickname'][0].decode()
if "telegramnickname" in attributes:
nickname = attributes["telegramnickname"][0].decode()
else:
nickname = None
return nickname

@staticmethod
def is_in_groups(groups_list: List[str], attributes):
if 'memberof' not in attributes:
if "memberof" not in attributes:
return False
for group in attributes['memberof']:
for group in attributes["memberof"]:
if group.decode() in groups_list:
return True
return False
Expand All @@ -402,16 +394,10 @@ def __extract_the_only_result(result):
@staticmethod
def __update_nickname(dn: str, new_nickname: Optional[str], conn):
if new_nickname is None:
conn.modify_s(dn, [
(ldap.MOD_DELETE, 'telegramNickname', None)
])
conn.modify_s(dn, [(ldap.MOD_DELETE, "telegramNickname", None)])
else:
conn.modify_s(dn, [
(ldap.MOD_REPLACE, 'telegramNickname', new_nickname.encode('UTF-8'))
])
conn.modify_s(dn, [(ldap.MOD_REPLACE, "telegramNickname", new_nickname.encode("UTF-8"))])

@staticmethod
def __update_id(dn: str, new_id: int, conn):
conn.modify_s(dn, [
(ldap.MOD_REPLACE, 'telegramId', str(new_id).encode('UTF-8'))
])
conn.modify_s(dn, [(ldap.MOD_REPLACE, "telegramId", str(new_id).encode("UTF-8"))])
24 changes: 12 additions & 12 deletions Quotes.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,16 @@ def __init__(self, oc: owncloud, quotes_path: str, demotivational_path: str, gam
self.demotivational_last_download = None

def _download(self):
if self.quotes_last_download is not None and self._timestamp_now() - self.quotes_last_download < 60*60*48:
if self.quotes_last_download is not None and self._timestamp_now() - self.quotes_last_download < 60 * 60 * 48:
return self

self.quotes = json.loads(self.oc.get_file_contents(self.quotes_path).decode('utf-8'))
self.quotes = json.loads(self.oc.get_file_contents(self.quotes_path).decode("utf-8"))
self.quotes_last_download = self._timestamp_now()

authors_count_for_game = {}
for quote in self.quotes:
if "author" in quote:
parts = quote["author"].split('/')
parts = quote["author"].split("/")
for author in parts:
author: str
author_not_normalized = author.strip()
Expand All @@ -59,21 +59,21 @@ def _download(self):
return self

def _download_demotivational(self):
if self.demotivational_last_download is not None and self._timestamp_now() - self.demotivational_last_download < 60*60*48:
if self.demotivational_last_download is not None and self._timestamp_now() - self.demotivational_last_download < 60 * 60 * 48:
return self

self.demotivational = self.oc.get_file_contents(self.demotivational_path).decode('utf-8').split("\n")
self.demotivational = self.oc.get_file_contents(self.demotivational_path).decode("utf-8").split("\n")
self.demotivational_last_download = self._timestamp_now()

return self

def _download_game(self):
if len(self.game) <= 0:
try:
self.game = json.loads(self.oc.get_file_contents(self.game_path).decode('utf-8'))
self.game = json.loads(self.oc.get_file_contents(self.game_path).decode("utf-8"))
except owncloud.owncloud.HTTPResponseError as e:
if e.status_code == 404:
self.oc.put_file_contents(self.game_path, json.dumps(self.game, indent=1).encode('utf-8'))
self.oc.put_file_contents(self.game_path, json.dumps(self.game, indent=1).encode("utf-8"))
else:
raise e

Expand All @@ -83,7 +83,7 @@ def _download_game(self):
def _timestamp_now() -> float:
return datetime.now().timestamp()

def get_random_quote(self, author: Optional[str]=None):
def get_random_quote(self, author: Optional[str] = None):
self._download()

if author is None:
Expand Down Expand Up @@ -125,7 +125,7 @@ def get_quote_for_game(self, uid: str):
author_normalized = self._normalize_author(author_printable)

# 3 other possibilites
answers = Quotes._random_choices_without_replacement(dict(filter(lambda el : el[0] != author_normalized, self.authors_weights_for_game.items())), 3)
answers = Quotes._random_choices_without_replacement(dict(filter(lambda el: el[0] != author_normalized, self.authors_weights_for_game.items())), 3)
# plus the right one
answers.append(author_normalized)

Expand All @@ -138,7 +138,7 @@ def get_quote_for_game(self, uid: str):
self._init_game(uid)

self.game[uid]["current_author"] = author_printable
#self._save_game()
# self._save_game()

# since author_printable = '/', they're bound
# noinspection PyUnboundLocalVariable
Expand Down Expand Up @@ -176,7 +176,7 @@ def get_demotivational_quote(self):

@staticmethod
def _normalize_author(author):
author = ''.join(filter(str.isalnum, author.strip().lower()))
author = "".join(filter(str.isalnum, author.strip().lower()))
return author

@staticmethod
Expand Down Expand Up @@ -218,4 +218,4 @@ def delete_cache(self) -> int:
def _save_game(self):
if len(self.game) > 0:
# indent=0 to at least have some lines, instead of no newline at all
self.oc.put_file_contents(self.game_path, json.dumps(self.game, indent=0, separators=(',', ':')).encode('utf-8'))
self.oc.put_file_contents(self.game_path, json.dumps(self.game, indent=0, separators=(",", ":")).encode("utf-8"))
Loading

0 comments on commit 1705abf

Please sign in to comment.