diff --git a/LdapWrapper.py b/LdapWrapper.py
index 606076e..a5a88cf 100644
--- a/LdapWrapper.py
+++ b/LdapWrapper.py
@@ -1,6 +1,7 @@
from datetime import date
from threading import Lock
from time import time
+
# noinspection PyUnresolvedReferences
from dataclasses import dataclass
from typing import Optional, List, Dict, Tuple
@@ -18,7 +19,7 @@ def __enter__(self):
# print("Connecting to LDAP")
self.conn = ldap.initialize(self.server)
self.conn.protocol_version = ldap.VERSION3
- if not self.server.startswith('ldaps://'):
+ if not self.server.startswith("ldaps://"):
self.conn.start_tls_s()
self.conn.simple_bind_s(self.bind_dn, self.password)
if self.conn is None:
@@ -101,11 +102,11 @@ def update_invite(self, invite_code: str, tgid: int, nickname: Optional[str], co
dn = result[0][0]
del result
- modlist = [(ldap.MOD_REPLACE, 'telegramid', str(tgid).encode('UTF-8'))]
+ modlist = [(ldap.MOD_REPLACE, "telegramid", str(tgid).encode("UTF-8"))]
if nickname is None:
- modlist.append((ldap.MOD_DELETE, 'telegramnickname', None))
+ modlist.append((ldap.MOD_DELETE, "telegramnickname", None))
else:
- modlist.append((ldap.MOD_REPLACE, 'telegramnickname', nickname.encode('UTF-8')))
+ modlist.append((ldap.MOD_REPLACE, "telegramnickname", nickname.encode("UTF-8")))
c.modify_s(dn, modlist)
def delete_cache(self) -> int:
@@ -162,33 +163,38 @@ def delete_cache(self) -> int:
return busted
def __sync(self, conn):
- result = conn.search_s(self.tree, ldap.SCOPE_SUBTREE, f"(objectClass=weeeOpenPerson)", (
- 'uid',
- 'cn',
- 'memberof',
- 'telegramnickname',
- 'telegramid',
- 'schacdateofbirth',
- 'safetytestdate',
- 'haskey',
- 'signedsir',
- 'nsaccountlock',
- ))
+ result = conn.search_s(
+ self.tree,
+ ldap.SCOPE_SUBTREE,
+ f"(objectClass=weeeOpenPerson)",
+ (
+ "uid",
+ "cn",
+ "memberof",
+ "telegramnickname",
+ "telegramid",
+ "schacdateofbirth",
+ "safetytestdate",
+ "haskey",
+ "signedsir",
+ "nsaccountlock",
+ ),
+ )
for dn, attributes in result:
- dob = self.schac_to_date(attributes['schacdateofbirth'][0].decode()) if 'schacdateofbirth' in attributes else None
- dost = self.schac_to_date(attributes['safetytestdate'][0].decode()) if 'safetytestdate' in attributes else None
+ dob = self.schac_to_date(attributes["schacdateofbirth"][0].decode()) if "schacdateofbirth" in attributes else None
+ dost = self.schac_to_date(attributes["safetytestdate"][0].decode()) if "safetytestdate" in attributes else None
person = Person(
- attributes['uid'][0].decode(),
- attributes['cn'][0].decode(),
+ attributes["uid"][0].decode(),
+ attributes["cn"][0].decode(),
dob,
dost,
User.is_in_groups(self.admin_groups, attributes),
- attributes['telegramnickname'][0].decode() if 'telegramnickname' in attributes else None,
- int(attributes['telegramid'][0].decode()) if 'telegramid' in attributes else None,
- 'haskey' in attributes and attributes['haskey'][0].decode() == "true",
- 'signedsir' in attributes and attributes['signedsir'][0].decode() == "true",
- 'nsaccountlock' in attributes,
+ attributes["telegramnickname"][0].decode() if "telegramnickname" in attributes else None,
+ int(attributes["telegramid"][0].decode()) if "telegramid" in attributes else None,
+ "haskey" in attributes and attributes["haskey"][0].decode() == "true",
+ "signedsir" in attributes and attributes["signedsir"][0].decode() == "true",
+ "nsaccountlock" in attributes,
)
self.__people[person.uid.lower()] = person
@@ -198,6 +204,7 @@ def __sync(self, conn):
def schac_to_date(schac_date):
return date(year=int(schac_date[:4]), month=int(schac_date[4:6]), day=int(schac_date[6:8]))
+
# noinspection PyAttributeOutsideInit
@dataclass
class User:
@@ -233,17 +240,7 @@ def update(self, conn, admin_groups: List[str], excluded_groups: List[str], also
:return: attributes, dn
"""
print(f"Update {self.tgid} ({self.dn})")
- result = conn.read_s(self.dn, None, (
- 'uid',
- 'cn',
- 'givenname',
- 'sn',
- 'memberof',
- 'telegramnickname',
- 'telegramid',
- 'signedsir',
- 'nsaccountlock'
- ))
+ result = conn.read_s(self.dn, None, ("uid", "cn", "givenname", "sn", "memberof", "telegramnickname", "telegramid", "signedsir", "nsaccountlock"))
if len(result) == 0:
raise AccountNotFoundError()
if len(result) > 1:
@@ -256,16 +253,16 @@ def update(self, conn, admin_groups: List[str], excluded_groups: List[str], also
if isnotallowed:
raise AccountNotFoundError()
- if 'nsaccountlock' in attributes:
+ if "nsaccountlock" in attributes:
raise AccountLockedError()
# self.tgid = int(attributes['tgid'][0].decode())
- self.uid = attributes['uid'][0].decode()
- self.cn = attributes['cn'][0].decode()
- self.givenname = attributes['givenname'][0].decode()
- self.surname = attributes['surname'][0].decode()
- self.dateofsafetytest = self._schac_to_date(attributes['safetytestdate'][0].decode()) if 'safetytestdate' in attributes else None
- self.signedsir = 'signedsir' in attributes and attributes['signedsir'][0].decode() == "true"
+ self.uid = attributes["uid"][0].decode()
+ self.cn = attributes["cn"][0].decode()
+ self.givenname = attributes["givenname"][0].decode()
+ self.surname = attributes["surname"][0].decode()
+ self.dateofsafetytest = self._schac_to_date(attributes["safetytestdate"][0].decode()) if "safetytestdate" in attributes else None
+ self.signedsir = "signedsir" in attributes and attributes["signedsir"][0].decode() == "true"
self.isadmin = User.is_in_groups(admin_groups, attributes)
if also_nickname:
if User.__get_stored_nickname(attributes) != nickname:
@@ -299,7 +296,7 @@ def search(tgid: int, tgnick: Optional[str], admin_groups: List[str], excluded_g
if isnotallowed:
raise AccountNotFoundError()
- if 'nsaccountlock' in attributes:
+ if "nsaccountlock" in attributes:
raise AccountLockedError()
isadmin = User.is_in_groups(admin_groups, attributes)
@@ -311,14 +308,15 @@ def search(tgid: int, tgnick: Optional[str], admin_groups: List[str], excluded_g
return User(
dn,
tgid,
- attributes['uid'][0].decode(),
- attributes['cn'][0].decode(),
- attributes['givenname'][0].decode(),
- attributes['sn'][0].decode(),
- People.schac_to_date(attributes['safetytestdate'][0].decode()) if 'safetytestdate' in attributes else None,
- 'signedsir' in attributes and attributes['signedsir'][0].decode() == "true",
+ attributes["uid"][0].decode(),
+ attributes["cn"][0].decode(),
+ attributes["givenname"][0].decode(),
+ attributes["sn"][0].decode(),
+ People.schac_to_date(attributes["safetytestdate"][0].decode()) if "safetytestdate" in attributes else None,
+ "signedsir" in attributes and attributes["signedsir"][0].decode() == "true",
isadmin,
- tgnick)
+ tgnick,
+ )
@staticmethod
def __search_by_tgid(conn, tgid, tree) -> Tuple[Dict, str]:
@@ -330,18 +328,12 @@ def __search_by_tgid(conn, tgid, tree) -> Tuple[Dict, str]:
:param tree: Users tree DN
:return: attributes, dn
"""
- result = conn.search_s(tree, ldap.SCOPE_SUBTREE, f"(&(objectClass=weeeOpenPerson)(telegramId={tgid}))", (
- 'uid',
- 'cn',
- 'givenname',
- 'sn',
- 'memberof',
- 'telegramnickname',
- 'safetytestdate',
- 'signedsir',
- 'telegramid',
- 'nsaccountlock'
- ))
+ result = conn.search_s(
+ tree,
+ ldap.SCOPE_SUBTREE,
+ f"(&(objectClass=weeeOpenPerson)(telegramId={tgid}))",
+ ("uid", "cn", "givenname", "sn", "memberof", "telegramnickname", "safetytestdate", "signedsir", "telegramid", "nsaccountlock"),
+ )
if len(result) == 0:
raise AccountNotFoundError()
if len(result) > 1:
@@ -377,17 +369,17 @@ def __search_by_nickname(conn, tgnick: str, tgid: int, tree) -> Tuple[Dict, str]
@staticmethod
def __get_stored_nickname(attributes):
- if 'telegramnickname' in attributes:
- nickname = attributes['telegramnickname'][0].decode()
+ if "telegramnickname" in attributes:
+ nickname = attributes["telegramnickname"][0].decode()
else:
nickname = None
return nickname
@staticmethod
def is_in_groups(groups_list: List[str], attributes):
- if 'memberof' not in attributes:
+ if "memberof" not in attributes:
return False
- for group in attributes['memberof']:
+ for group in attributes["memberof"]:
if group.decode() in groups_list:
return True
return False
@@ -402,16 +394,10 @@ def __extract_the_only_result(result):
@staticmethod
def __update_nickname(dn: str, new_nickname: Optional[str], conn):
if new_nickname is None:
- conn.modify_s(dn, [
- (ldap.MOD_DELETE, 'telegramNickname', None)
- ])
+ conn.modify_s(dn, [(ldap.MOD_DELETE, "telegramNickname", None)])
else:
- conn.modify_s(dn, [
- (ldap.MOD_REPLACE, 'telegramNickname', new_nickname.encode('UTF-8'))
- ])
+ conn.modify_s(dn, [(ldap.MOD_REPLACE, "telegramNickname", new_nickname.encode("UTF-8"))])
@staticmethod
def __update_id(dn: str, new_id: int, conn):
- conn.modify_s(dn, [
- (ldap.MOD_REPLACE, 'telegramId', str(new_id).encode('UTF-8'))
- ])
+ conn.modify_s(dn, [(ldap.MOD_REPLACE, "telegramId", str(new_id).encode("UTF-8"))])
diff --git a/Quotes.py b/Quotes.py
index 6fd1bf6..4bbb0e7 100644
--- a/Quotes.py
+++ b/Quotes.py
@@ -25,16 +25,16 @@ def __init__(self, oc: owncloud, quotes_path: str, demotivational_path: str, gam
self.demotivational_last_download = None
def _download(self):
- if self.quotes_last_download is not None and self._timestamp_now() - self.quotes_last_download < 60*60*48:
+ if self.quotes_last_download is not None and self._timestamp_now() - self.quotes_last_download < 60 * 60 * 48:
return self
- self.quotes = json.loads(self.oc.get_file_contents(self.quotes_path).decode('utf-8'))
+ self.quotes = json.loads(self.oc.get_file_contents(self.quotes_path).decode("utf-8"))
self.quotes_last_download = self._timestamp_now()
authors_count_for_game = {}
for quote in self.quotes:
if "author" in quote:
- parts = quote["author"].split('/')
+ parts = quote["author"].split("/")
for author in parts:
author: str
author_not_normalized = author.strip()
@@ -59,10 +59,10 @@ def _download(self):
return self
def _download_demotivational(self):
- if self.demotivational_last_download is not None and self._timestamp_now() - self.demotivational_last_download < 60*60*48:
+ if self.demotivational_last_download is not None and self._timestamp_now() - self.demotivational_last_download < 60 * 60 * 48:
return self
- self.demotivational = self.oc.get_file_contents(self.demotivational_path).decode('utf-8').split("\n")
+ self.demotivational = self.oc.get_file_contents(self.demotivational_path).decode("utf-8").split("\n")
self.demotivational_last_download = self._timestamp_now()
return self
@@ -70,10 +70,10 @@ def _download_demotivational(self):
def _download_game(self):
if len(self.game) <= 0:
try:
- self.game = json.loads(self.oc.get_file_contents(self.game_path).decode('utf-8'))
+ self.game = json.loads(self.oc.get_file_contents(self.game_path).decode("utf-8"))
except owncloud.owncloud.HTTPResponseError as e:
if e.status_code == 404:
- self.oc.put_file_contents(self.game_path, json.dumps(self.game, indent=1).encode('utf-8'))
+ self.oc.put_file_contents(self.game_path, json.dumps(self.game, indent=1).encode("utf-8"))
else:
raise e
@@ -83,7 +83,7 @@ def _download_game(self):
def _timestamp_now() -> float:
return datetime.now().timestamp()
- def get_random_quote(self, author: Optional[str]=None):
+ def get_random_quote(self, author: Optional[str] = None):
self._download()
if author is None:
@@ -125,7 +125,7 @@ def get_quote_for_game(self, uid: str):
author_normalized = self._normalize_author(author_printable)
# 3 other possibilites
- answers = Quotes._random_choices_without_replacement(dict(filter(lambda el : el[0] != author_normalized, self.authors_weights_for_game.items())), 3)
+ answers = Quotes._random_choices_without_replacement(dict(filter(lambda el: el[0] != author_normalized, self.authors_weights_for_game.items())), 3)
# plus the right one
answers.append(author_normalized)
@@ -138,7 +138,7 @@ def get_quote_for_game(self, uid: str):
self._init_game(uid)
self.game[uid]["current_author"] = author_printable
- #self._save_game()
+ # self._save_game()
# since author_printable = '/', they're bound
# noinspection PyUnboundLocalVariable
@@ -176,7 +176,7 @@ def get_demotivational_quote(self):
@staticmethod
def _normalize_author(author):
- author = ''.join(filter(str.isalnum, author.strip().lower()))
+ author = "".join(filter(str.isalnum, author.strip().lower()))
return author
@staticmethod
@@ -218,4 +218,4 @@ def delete_cache(self) -> int:
def _save_game(self):
if len(self.game) > 0:
# indent=0 to at least have some lines, instead of no newline at all
- self.oc.put_file_contents(self.game_path, json.dumps(self.game, indent=0, separators=(',', ':')).encode('utf-8'))
+ self.oc.put_file_contents(self.game_path, json.dumps(self.game, indent=0, separators=(",", ":")).encode("utf-8"))
diff --git a/ToLab.py b/ToLab.py
index 8b78241..79debb7 100644
--- a/ToLab.py
+++ b/ToLab.py
@@ -14,7 +14,7 @@ def __init__(self, oc, tolab_path: str):
self.oc = oc
self.local_tz = pytz.timezone("Europe/Rome")
self.tolab_path = tolab_path
- self.tolab_file = json.loads(oc.get_file_contents(self.tolab_path).decode('utf-8'))
+ self.tolab_file = json.loads(oc.get_file_contents(self.tolab_path).decode("utf-8"))
for entry in self.tolab_file:
entry["tolab"] = self.string_to_datetime(entry["tolab"])
@@ -68,7 +68,8 @@ def string_to_datetime(self, from_time):
hour=the_real_date.hour,
minute=the_real_date.minute,
second=0,
- microsecond=0)
+ microsecond=0,
+ )
def __delete_user(self, telegram_id):
keep = []
@@ -84,7 +85,7 @@ def __create_entry(self, username: str, telegram_id: int, time: str, day: int):
now = datetime.now(self.local_tz)
# Assume that the time refers to today
theday = now + timedelta(days=day)
- theday = theday.strftime('%Y-%m-%d')
+ theday = theday.strftime("%Y-%m-%d")
going = self.string_to_datetime(f"{theday} {time}")
# If it already passed, user probably meant "tomorrow"
@@ -174,7 +175,7 @@ def save(self, entries: list):
for entry in serializable:
# Save it in local timezone format, because who cares
entry["tolab"] = datetime.strftime(entry["tolab"], "%Y-%m-%d %H:%M")
- self.oc.put_file_contents(self.tolab_path, json.dumps(serializable, indent=2).encode('utf-8'))
+ self.oc.put_file_contents(self.tolab_path, json.dumps(serializable, indent=2).encode("utf-8"))
class Tolab_Calendar:
@@ -188,7 +189,7 @@ def __init__(self, month_offset=0):
self.month_offset = int(month_offset)
def make(self):
- month , days, dates = self.set_calendar()
+ month, days, dates = self.set_calendar()
month_num = month.split()[0]
year_num = int(month.split()[1])
month_num = datetime.strptime(month_num, "%B").month
@@ -203,23 +204,25 @@ def make(self):
for date in row:
if date == f"{self.day}" and year_num == self.td_year and month_num == self.td_month:
week.append(inline_keyboard_button(f"📍{date}", callback_data=f"tolab:{date}:{month}"))
- elif date == ' ':
+ elif date == " ":
week.append(inline_keyboard_button(date, callback_data="tolab:None"))
elif year_num <= self.td_year and month_num <= self.td_month and int(date) <= self.day:
week.append(inline_keyboard_button(date, callback_data="tolab:None"))
else:
week.append(inline_keyboard_button(date, callback_data=f"tolab:{date}:{month}"))
keyboard.append(week)
- keyboard.append([
- inline_keyboard_button(label="⬅️", callback_data=f"tolab:backward_month:{self.month_offset-1}:"),
- inline_keyboard_button(label="❌", callback_data="tolab:cancel_tolab"),
- inline_keyboard_button(label="➡️", callback_data=f"tolab:forward_month:{self.month_offset+1}")
- ])
+ keyboard.append(
+ [
+ inline_keyboard_button(label="⬅️", callback_data=f"tolab:backward_month:{self.month_offset-1}:"),
+ inline_keyboard_button(label="❌", callback_data="tolab:cancel_tolab"),
+ inline_keyboard_button(label="➡️", callback_data=f"tolab:forward_month:{self.month_offset+1}"),
+ ]
+ )
return keyboard
def set_calendar(self):
- self.month = (self.month + self.month_offset)
- year_offset = int((self.month - 1)/12)
+ self.month = self.month + self.month_offset
+ year_offset = int((self.month - 1) / 12)
self.month = ((self.month - 1) % 12) + 1
rows = calendar.month(self.year + year_offset, self.month, 2, 1).splitlines()
month = rows[0].strip()
@@ -229,11 +232,11 @@ def set_calendar(self):
d = d.strip(" ")
d = d.split()
if len(d) != 7:
- if d[0] == '1':
+ if d[0] == "1":
for i in range(7 - len(d)):
d.insert(0, " ")
else:
for i in range(7 - len(d)):
d.append(" ")
dates[row] = d
- return month, days, dates
\ No newline at end of file
+ return month, days, dates
diff --git a/Weeelablib.py b/Weeelablib.py
index 5a3631a..b189f24 100644
--- a/Weeelablib.py
+++ b/Weeelablib.py
@@ -1,6 +1,7 @@
import datetime
import re
from time import time
+
# noinspection PyUnresolvedReferences
import owncloud
import pytz
@@ -31,7 +32,7 @@ def get_log(self):
return self
self.log = []
- log_file = self.oc.get_file_contents(self.log_path).decode('utf-8')
+ log_file = self.oc.get_file_contents(self.log_path).decode("utf-8")
log_lines = log_file.splitlines()
for line in log_lines:
@@ -99,7 +100,7 @@ def update_old_logs(self, max_month, max_year):
filename = self.log_base + "log" + str(year) + str(month).zfill(2) + ".txt"
print(f"Downloading {filename}")
try:
- log_file = self.oc.get_file_contents(filename).decode('utf-8')
+ log_file = self.oc.get_file_contents(filename).decode("utf-8")
log_lines = log_file.splitlines()
for line in log_lines:
@@ -206,7 +207,7 @@ def get_entries_inlab(self):
def store_new_user(self, tid, name: str, surname: str, username: str):
new_users_file = self.oc.get_file_contents(self.user_bot_path)
- new_users = new_users_file.decode('utf-8')
+ new_users = new_users_file.decode("utf-8")
if str(tid) in new_users:
return
@@ -214,14 +215,14 @@ def store_new_user(self, tid, name: str, surname: str, username: str):
# Store a new user name and id in a file on owncloud server,
# encoding in utf.8
try:
- if surname != '':
+ if surname != "":
surname = f" {surname}"
- if username == '':
+ if username == "":
username = " (no username)"
else:
username = f" (@{username})"
new_users = new_users + "{}{}{}: {}\n".format(name, surname, username, tid)
- self.oc.put_file_contents(self.user_bot_path, new_users.encode('utf-8'))
+ self.oc.put_file_contents(self.user_bot_path, new_users.encode("utf-8"))
except (AttributeError, UnicodeEncodeError):
print("ERROR writing " + self.user_bot_path)
pass
@@ -255,7 +256,7 @@ def mm_to_hh_mm(minutes):
class WeeelabLine:
- regex = re.compile('\[([^\]]+)\]\s*\[([^\]]+)\]\s*\[([^\]]+)\]\s*<([^>]+)>\s*[:{2}]*\s*(.*)')
+ regex = re.compile("\[([^\]]+)\]\s*\[([^\]]+)\]\s*\[([^\]]+)\]\s*<([^>]+)>\s*[:{2}]*\s*(.*)")
def __init__(self, line: str):
res = self.regex.match(line)
@@ -279,5 +280,5 @@ def duration_minutes(self):
if self.inlab:
return 0
- parts = self.duration.split(':')
+ parts = self.duration.split(":")
return int(parts[0]) * 60 + int(parts[1])
diff --git a/pyproject.toml b/pyproject.toml
new file mode 100644
index 0000000..162ab46
--- /dev/null
+++ b/pyproject.toml
@@ -0,0 +1,2 @@
+[tool.black]
+line-length = 160
diff --git a/remote_commands.py b/remote_commands.py
index 3763888..dddb37f 100644
--- a/remote_commands.py
+++ b/remote_commands.py
@@ -1,11 +1,8 @@
ssh_weeelab_command = {
- 'logout': [
- "weeelab -o ",
- " -m "
- ],
- 'login': [
+ "logout": ["weeelab -o ", " -m "],
+ "login": [
"weeelab -i ",
- ]
+ ],
}
ssh_i_am_door_command = "telefono/sonoporta"
diff --git a/ssh_util.py b/ssh_util.py
index cc57fec..a8db011 100644
--- a/ssh_util.py
+++ b/ssh_util.py
@@ -13,19 +13,20 @@
class SSHUtil:
"""Class to connect to a remote server"""
- def __init__(self,
- host: str = None,
- username: str = None,
- password: str = None,
- timeout: float = 10.,
- commands: [str] = None,
- private_key_path: str = None,
- connection_port: int = 22,
- upload_remote_filepath: str = None,
- upload_local_filepath: str = None,
- download_remote_filepath: str = None,
- download_local_filepath: str = None
- ): # come on, constructor, don't be sad :)
+ def __init__(
+ self,
+ host: str = None,
+ username: str = None,
+ password: str = None,
+ timeout: float = 10.0,
+ commands: [str] = None,
+ private_key_path: str = None,
+ connection_port: int = 22,
+ upload_remote_filepath: str = None,
+ upload_local_filepath: str = None,
+ download_remote_filepath: str = None,
+ download_local_filepath: str = None,
+ ): # come on, constructor, don't be sad :)
self.ssh_output = None
self.ssh_error = None
self.return_code = None
@@ -63,12 +64,26 @@ def connect(self):
if self.password is None:
# this needs to be a PEM key (begins with RSA not OPENSSH)
# use "ssh-keygen -p -m PEM -f id_rsa_X" to convert OPENSSH to RSA
- self.client.connect(hostname=self.host, port=self.port, username=self.username, key_filename=self.pkey,
- timeout=self.timeout, allow_agent=False, look_for_keys=False)
+ self.client.connect(
+ hostname=self.host,
+ port=self.port,
+ username=self.username,
+ key_filename=self.pkey,
+ timeout=self.timeout,
+ allow_agent=False,
+ look_for_keys=False,
+ )
print("Connected to the server", self.host)
else:
- self.client.connect(hostname=self.host, port=self.port, username=self.username, password=self.password,
- timeout=self.timeout, allow_agent=False, look_for_keys=False)
+ self.client.connect(
+ hostname=self.host,
+ port=self.port,
+ username=self.username,
+ password=self.password,
+ timeout=self.timeout,
+ allow_agent=False,
+ look_for_keys=False,
+ )
print("Connected to the server", self.host)
except paramiko.AuthenticationException:
print("Authentication failed, please verify your credentials")
@@ -80,8 +95,8 @@ def connect(self):
print("Connection timed out")
result_flag = False
except Exception as e:
- print('\nException in connecting to the server')
- print('PYTHON SAYS:', e)
+ print("\nException in connecting to the server")
+ print("PYTHON SAYS:", e)
result_flag = False
self.client.close()
else:
@@ -106,8 +121,7 @@ def execute_command(self, commands=None):
self.ssh_error = stderr.read()
self.return_code = stdout.channel.recv_exit_status()
if self.ssh_error:
- print(
- "Problem occurred while running command:" + command + " The error is " + self.ssh_error.decode())
+ print("Problem occurred while running command:" + command + " The error is " + self.ssh_error.decode())
result_flag = False
else:
print("Command execution completed successfully:", '"' + command + '"')
@@ -140,8 +154,8 @@ def upload_file(self, uploadlocalfilepath, uploadremotefilepath):
print("Could not establish SSH connection")
result_flag = False
except Exception as e:
- print('\nUnable to upload the file to the remote server', uploadremotefilepath)
- print('PYTHON SAYS:', e)
+ print("\nUnable to upload the file to the remote server", uploadremotefilepath)
+ print("PYTHON SAYS:", e)
result_flag = False
ftp_client.close()
self.client.close()
@@ -161,8 +175,8 @@ def download_file(self, downloadremotefilepath, downloadlocalfilepath):
print("Could not establish SSH connection")
result_flag = False
except Exception as e:
- print('\nUnable to download the file from the remote server', downloadremotefilepath)
- print('PYTHON SAYS:', e)
+ print("\nUnable to download the file from the remote server", downloadremotefilepath)
+ print("PYTHON SAYS:", e)
result_flag = False
ftp_client.close()
self.client.close()
@@ -181,7 +195,7 @@ def __init__(self, arg):
# ---USAGE EXAMPLES
-if __name__ == '__main__':
+if __name__ == "__main__":
print("Start of %s" % __file__)
# Initialize the ssh object
diff --git a/stream_yt_audio.py b/stream_yt_audio.py
index cb71d0f..42f5a08 100644
--- a/stream_yt_audio.py
+++ b/stream_yt_audio.py
@@ -27,7 +27,7 @@ def get_player(self):
def __create_new_player(self):
playurl = self.__get_playurl()
- instance = vlc.Instance('-q')
+ instance = vlc.Instance("-q")
player = instance.media_player_new()
media = instance.media_new(playurl)
media.get_mrl() # TODO: what does this do?
@@ -49,16 +49,16 @@ def __download_metadata(self):
url = "https://www.youtube.com/watch?v=5qap5aO4i9A"
# https://stackoverflow.com/a/49249893
ydl_opts = {
- 'format': 'bestaudio/best',
+ "format": "bestaudio/best",
}
with youtube_dl.YoutubeDL(ydl_opts) as ydl:
info = ydl.extract_info(url, download=False)
- playurl = info['formats'][0]['url']
+ playurl = info["formats"][0]["url"]
self.playurl = playurl
self.last_player_time = time()
-if __name__ == '__main__':
+if __name__ == "__main__":
# to test if streaming works
# player = LofiVlcPlayer().get_player()
# player.play()
diff --git a/variables.py b/variables.py
index fc5d04f..78f7d92 100644
--- a/variables.py
+++ b/variables.py
@@ -35,15 +35,11 @@ def __unpack_wol(wol):
LDAP_TREE_GROUPS = os.environ.get("LDAP_TREE_GROUPS") # ou=Groups,dc=weeeopen,dc=it
LDAP_TREE_PEOPLE = os.environ.get("LDAP_TREE_PEOPLE") # ou=People,dc=weeeopen,dc=it
LDAP_TREE_INVITES = os.environ.get("LDAP_TREE_INVITES") # ou=Invites,dc=weeeopen,dc=it
-LDAP_ADMIN_GROUPS = os.environ.get(
- "LDAP_ADMIN_GROUPS"
-) # ou=Group,dc=weeeopen,dc=it|ou=OtherGroup,dc=weeeopen,dc=it
+LDAP_ADMIN_GROUPS = os.environ.get("LDAP_ADMIN_GROUPS") # ou=Group,dc=weeeopen,dc=it|ou=OtherGroup,dc=weeeopen,dc=it
if LDAP_ADMIN_GROUPS is not None:
LDAP_ADMIN_GROUPS = LDAP_ADMIN_GROUPS.split("|")
-INVITE_LINK = os.environ.get(
- "INVITE_LINK"
-) # https://example.com/register.php?invite= (invite code will be appended, no spaces in invite code)
+INVITE_LINK = os.environ.get("INVITE_LINK") # https://example.com/register.php?invite= (invite code will be appended, no spaces in invite code)
SSH_SCMA_USER = os.environ.get("SSH_SCMA_USER") # foo
SSH_SCMA_HOST_IP = os.environ.get("SSH_SCMA_HOST_IP") # 10.20.30.40
@@ -53,9 +49,7 @@ def __unpack_wol(wol):
SSH_PIALL_HOST_IP = os.environ.get("SSH_PIALL_HOST_IP")
SSH_PIALL_KEY_PATH = os.environ.get("SSH_PIALL_KEY_PATH")
-WOL_MACHINES = os.environ.get(
- "WOL_MACHINES"
-) # machine:00:0a:0b:0c:0d:0e|other:10:2a:3b:4c:5d:6e
+WOL_MACHINES = os.environ.get("WOL_MACHINES") # machine:00:0a:0b:0c:0d:0e|other:10:2a:3b:4c:5d:6e
if WOL_MACHINES is not None:
WOL_MACHINES = __unpack_wol(WOL_MACHINES)
WOL_WEEELAB = os.environ.get("WOL_WEEELAB") # 00:0a:0b:0c:0d:0e
diff --git a/weeelab_bot.py b/weeelab_bot.py
index 0bf533d..880989a 100644
--- a/weeelab_bot.py
+++ b/weeelab_bot.py
@@ -122,9 +122,7 @@ def get_updates(self, timeout=120):
requests_timeout = timeout + 5
# noinspection PyBroadException
try:
- result = requests.get(
- self.api_url + "getUpdates", params, timeout=requests_timeout
- ).json()["result"]
+ result = requests.get(self.api_url + "getUpdates", params, timeout=requests_timeout).json()["result"]
if len(result) > 0:
self.offset = result[-1]["update_id"] + 1
return result
@@ -284,43 +282,21 @@ def calculate_time_to_sleep(hour: int, minute: int = 0) -> int:
"""
# hour is before given hour -> wait until today at given hour and minute
if int(datetime.datetime.now().time().strftime("%k")) < hour:
- time_to_sleep = int(
- (
- datetime.datetime.today().replace(hour=hour, minute=minute, second=0)
- - datetime.datetime.now()
- ).total_seconds()
- )
+ time_to_sleep = int((datetime.datetime.today().replace(hour=hour, minute=minute, second=0) - datetime.datetime.now()).total_seconds())
# hour is equal to given hour
elif int(datetime.datetime.now().time().strftime("%k")) == hour:
# minute is before given minute -> wait until today at given time
if int(datetime.datetime.now().time().strftime("%M")) < minute:
- time_to_sleep = int(
- (
- datetime.datetime.today().replace(
- hour=hour, minute=minute, second=0
- )
- - datetime.datetime.now()
- ).total_seconds()
- )
+ time_to_sleep = int((datetime.datetime.today().replace(hour=hour, minute=minute, second=0) - datetime.datetime.now()).total_seconds())
# minute is after given minute -> wait until tomorrow at given time
else:
time_to_sleep = int(
- (
- datetime.datetime.today().replace(
- hour=hour, minute=minute, second=0
- )
- + timedelta(days=1)
- - datetime.datetime.now()
- ).total_seconds()
+ (datetime.datetime.today().replace(hour=hour, minute=minute, second=0) + timedelta(days=1) - datetime.datetime.now()).total_seconds()
)
# hour is after given hour -> wait until tomorrow at given time
else:
time_to_sleep = int(
- (
- datetime.datetime.today().replace(hour=hour, minute=minute, second=0)
- + timedelta(days=1)
- - datetime.datetime.now()
- ).total_seconds()
+ (datetime.datetime.today().replace(hour=hour, minute=minute, second=0) + timedelta(days=1) - datetime.datetime.now()).total_seconds()
)
return time_to_sleep
@@ -391,21 +367,11 @@ def _fah_get(json_list, name: str):
with open(json_history, "r") as inf:
daily = True
json_history_content = json.load(inf)
- previous_snapshot_key = max(
- k for k, v in json_history_content.items()
- )
+ previous_snapshot_key = max(k for k, v in json_history_content.items())
- donors_previous_score = {
- _fah_get(donor, "name"): _fah_get(donor, "score")
- for donor in json_res
- }
+ donors_previous_score = {_fah_get(donor, "name"): _fah_get(donor, "score") for donor in json_res}
# associate daily increase to each name
- donors_daily_score = {
- name: donors_previous_score[name] - score
- for name, score in json_history_content[
- previous_snapshot_key
- ].items()
- }
+ donors_daily_score = {name: donors_previous_score[name] - score for name, score in json_history_content[previous_snapshot_key].items()}
# sort by top score first
top_3_donors_by_daily_score = {
k: v
@@ -417,11 +383,8 @@ def _fah_get(json_list, name: str):
}
top_3 = "\n".join(
[
- f"#{i+1}
{name} with "
- f"{human_readable_number(score)} points"
- for i, (name, score) in enumerate(
- top_3_donors_by_daily_score.items()
- )
+ f"#{i+1}
{name} with " f"{human_readable_number(score)} points"
+ for i, (name, score) in enumerate(top_3_donors_by_daily_score.items())
if score > 0
]
)
@@ -431,10 +394,7 @@ def _fah_get(json_list, name: str):
new_file = True
# insert new snapshot in JSON
- json_history_content[last] = {
- _fah_get(donor, "name"): _fah_get(donor, "score")
- for donor in json_res
- }
+ json_history_content[last] = {_fah_get(donor, "name"): _fah_get(donor, "score") for donor in json_res}
with open(json_history, "w") as outf:
json.dump(json_history_content, outf)
@@ -445,12 +405,8 @@ def _fah_get(json_list, name: str):
top_10 = []
for i, member in enumerate(json_res[:10]):
- this_top_10 = (
- f"#{i+1}
{_fah_get(member, 'name')} with "
- )
- this_top_10 += (
- f"{human_readable_number(_fah_get(member, 'score'))} points"
- )
+ this_top_10 = f"#{i+1}
{_fah_get(member, 'name')} with "
+ this_top_10 += f"{human_readable_number(_fah_get(member, 'score'))} points"
this_top_10 += f", {_fah_get(member, 'wus')} WUs"
if _fah_get(member, "rank") is not None:
this_top_10 += f", rank {human_readable_number(_fah_get(member, 'rank'))}"
@@ -465,19 +421,11 @@ def _fah_get(json_list, name: str):
delta = ""
if daily:
- delta = (
- f"Daily increase: {human_readable_number(sum(donors_daily_score.values()))}\n"
- if not new_file
- else ""
- )
+ delta = f"Daily increase: {human_readable_number(sum(donors_daily_score.values()))}\n" if not new_file else ""
top_3_daily = ""
if not new_file:
- top_3_daily = (
- f"Daily MVPs:\n{top_3}\n\n"
- if top_3
- else "No MVPs today since the score has not increased.\n\n"
- )
+ top_3_daily = f"Daily MVPs:\n{top_3}\n\n" if top_3 else "No MVPs today since the score has not increased.\n\n"
text = (
f"Total Team Score: {human_readable_number(total_credit)}\n"
@@ -560,11 +508,7 @@ def read_user_from_callback(self, last_update):
self.__last_from = last_update["callback_query"]["from"]
self.__last_chat_id = last_update["callback_query"]["message"]["chat"]["id"]
self.__last_user_id = last_update["callback_query"]["from"]["id"]
- self.__last_user_nickname = (
- last_update["callback_query"]["from"]["username"]
- if "username" in last_update["callback_query"]["from"]
- else None
- )
+ self.__last_user_nickname = last_update["callback_query"]["from"]["username"] if "username" in last_update["callback_query"]["from"] else None
return self.__read_user(None)
@@ -572,20 +516,14 @@ def read_user_from_message(self, last_update):
self.__last_from = last_update["message"]["from"]
self.__last_chat_id = last_update["message"]["chat"]["id"]
self.__last_user_id = last_update["message"]["from"]["id"]
- self.__last_user_nickname = (
- last_update["message"]["from"]["username"]
- if "username" in last_update["message"]["from"]
- else None
- )
+ self.__last_user_nickname = last_update["message"]["from"]["username"] if "username" in last_update["message"]["from"] else None
return self.__read_user(last_update["message"]["text"])
def __read_user(self, text: Optional[str]):
self.user = None
try:
- self.user = self.users.get(
- self.__last_user_id, self.__last_user_nickname, self.conn
- )
+ self.user = self.users.get(self.__last_user_id, self.__last_user_nickname, self.conn)
return True
except (LdapConnectionError, DuplicateEntryError) as e:
self.exception(e.__class__.__name__)
@@ -617,9 +555,7 @@ def __send_inline_keyboard(self, message, markup):
self.bot.send_message(self.__last_chat_id, message, reply_markup=markup)
def __edit_message(self, message_id, message, markup):
- self.bot.edit_message(
- self.__last_chat_id, message_id, message, reply_markup=markup
- )
+ self.bot.edit_message(self.__last_chat_id, message_id, message, reply_markup=markup)
def respond_to_invite_link(self, message) -> bool:
message: str
@@ -628,13 +564,9 @@ def respond_to_invite_link(self, message) -> bool:
link = message.split(" ", 1)[0]
code = link[len(INVITE_LINK) :]
try:
- self.users.update_invite(
- code, self.__last_user_id, self.__last_user_nickname, self.conn
- )
+ self.users.update_invite(code, self.__last_user_id, self.__last_user_nickname, self.conn)
except AccountNotFoundError:
- self.__send_message(
- "I couldn't find your invite. Are you sure of that link?"
- )
+ self.__send_message("I couldn't find your invite. Are you sure of that link?")
return True
self.__send_message(
"Hey, I've filled some fields in the registration form for you, no need to say thanks.\n"
@@ -658,19 +590,13 @@ def start(self):
def format_user_in_list(self, username: str, other=""):
person = self.people.get(username, self.conn)
- user_id = (
- None if person is None or person.tgid is None else person.tgid
- ) # This is unreadable. Deal with it.
+ user_id = None if person is None or person.tgid is None else person.tgid # This is unreadable. Deal with it.
display_name = CommandHandler.try_get_display_name(username, person)
haskey = chr(128273) if person.haskey else ""
sir = ""
- if (
- self.user.isadmin
- and person.dateofsafetytest is not None
- and not person.signedsir
- ):
+ if self.user.isadmin and person.dateofsafetytest is not None and not person.signedsir:
sir = f" (Remember to sign the SIR! {chr(128221)})"
if user_id is None:
@@ -728,18 +654,14 @@ def inlab(self):
elif today + datetime.timedelta(days=1) == going_day:
msg += self.format_user_in_list(username, f" tomorrow at {hh}:{mm}")
else:
- msg += self.format_user_in_list(
- username, f" on {str(going_day)} at {hh}:{mm}"
- )
+ msg += self.format_user_in_list(username, f" on {str(going_day)} at {hh}:{mm}")
if username == self.user.uid:
user_themself_tolab = True
if not user_themself_tolab and not user_themself_inlab:
msg += "\nAre you going, too? Tell everyone with /tolab."
else:
if right_now.hour > 19:
- msg += (
- "\n\nAre you going to the lab tomorrow? Tell everyone with /tolab."
- )
+ msg += "\n\nAre you going to the lab tomorrow? Tell everyone with /tolab."
elif not user_themself_inlab:
msg += "\n\nAre you going to the lab later? Tell everyone with /tolab."
@@ -751,18 +673,14 @@ def tolab(self, the_time: str, day: str = None, is_gui: bool = False):
try:
the_time = self._tolab_parse_time(the_time)
except ValueError:
- self.__send_message(
- "Use correct time format, e.g. 10:30, or no to cancel"
- )
+ self.__send_message("Use correct time format, e.g. 10:30, or no to cancel")
return
if the_time is not None:
try:
day = self._tolab_parse_day(day)
except ValueError:
- self.__send_message(
- "Use correct day format: +1 for tomorrow, +2 for the day after tomorrow and so on"
- )
+ self.__send_message("Use correct day format: +1 for tomorrow, +2 for the day after tomorrow and so on")
return
# noinspection PyBroadException
@@ -770,17 +688,13 @@ def tolab(self, the_time: str, day: str = None, is_gui: bool = False):
if the_time is None:
# Delete previous entry via Telegram ID
self.tolab_db.delete_entry(self.user.tgid)
- self.__send_message(
- f"Ok, you aren't going to the lab, I've taken note."
- )
+ self.__send_message(f"Ok, you aren't going to the lab, I've taken note.")
else:
sir_message = ""
if not self.user.signedsir and self.user.dateofsafetytest is not None:
sir_message = "\nRemember to sign the SIR when you get there!"
- days = self.tolab_db.set_entry(
- self.user.uid, self.user.tgid, the_time, day
- )
+ days = self.tolab_db.set_entry(self.user.uid, self.user.tgid, the_time, day)
if not is_gui:
if days <= 0:
self.__send_message(
@@ -790,15 +704,10 @@ def tolab(self, the_time: str, day: str = None, is_gui: bool = False):
)
elif days == 1:
self.__send_message(
- f"So you'll go the lab at {the_time} tomorrow. Use /tolab_no to cancel. "
- f"Check if anyone else is coming with /inlab{sir_message}"
+ f"So you'll go the lab at {the_time} tomorrow. Use /tolab_no to cancel. " f"Check if anyone else is coming with /inlab{sir_message}"
)
else:
- last_message = (
- sir_message
- if sir_message != ""
- else "\nMark it down on your calendar!"
- )
+ last_message = sir_message if sir_message != "" else "\nMark it down on your calendar!"
self.__send_message(
f"So you'll go the lab at {the_time} in {days} days. Use /tolab_no to "
f"cancel. Check if anyone else is coming with /inlab"
@@ -830,12 +739,7 @@ def _tolab_parse_time(the_time: str):
return f"0{the_time}:00"
elif len(the_time) == 2 and the_time.isdigit() and 0 <= int(the_time) <= 23:
return f"{the_time}:00"
- elif (
- len(the_time) == 4
- and the_time[0].isdigit()
- and the_time[2:4].isdigit()
- and 0 <= int(the_time[2:4]) <= 59
- ):
+ elif len(the_time) == 4 and the_time[0].isdigit() and the_time[2:4].isdigit() and 0 <= int(the_time[2:4]) <= 59:
if the_time[1] == ".":
return ":".join(the_time.split("."))
elif the_time[1] == ":":
@@ -900,9 +804,7 @@ def ring(self, wave_obj):
else:
wave_obj.play()
- self.__send_message(
- "You rang the bell 🔔 Wait at door 3 until someone comes. 🔔"
- )
+ self.__send_message("You rang the bell 🔔 Wait at door 3 until someone comes. 🔔")
def user_is_in_lab(self, uid):
inlab = self.logs.get_log().get_entries_inlab()
@@ -937,9 +839,7 @@ def log(self, cmd_days_to_filter=None):
break
days[this_day] = []
- print_name = CommandHandler.try_get_display_name(
- line.username, self.people.get(line.username, self.conn)
- )
+ print_name = CommandHandler.try_get_display_name(line.username, self.people.get(line.username, self.conn))
if line.inlab:
days[this_day].append(f"{print_name} is in lab\n")
@@ -948,9 +848,7 @@ def log(self, cmd_days_to_filter=None):
msg = ""
for this_day in days:
- msg += "{day}\n{rows}\n".format(
- day=this_day, rows="".join(days[this_day])
- )
+ msg += "{day}\n{rows}\n".format(day=this_day, rows="".join(days[this_day]))
msg = msg + "Latest log update: {}".format(self.logs.log_last_update)
self.__send_message(msg)
@@ -973,16 +871,12 @@ def stat(self, cmd_target_user=None):
self.logs.get_log()
if not self.logs.user_exists_in_logs(target_username):
target_username = None
- self.__send_message(
- "No statistics for the given user. Have you typed it correctly?"
- )
+ self.__send_message("No statistics for the given user. Have you typed it correctly?")
else:
target_username = person.uid
else:
target_username = None
- self.__send_message(
- "Sorry! You are not allowed to see stat of other users!\nOnly admins can!"
- )
+ self.__send_message("Sorry! You are not allowed to see stat of other users!\nOnly admins can!")
# Do we know what to search?
if target_username is not None:
@@ -994,9 +888,7 @@ def stat(self, cmd_target_user=None):
month_mins_hh, month_mins_mm = self.logs.mm_to_hh_mm(month_mins)
total_mins_hh, total_mins_mm = self.logs.mm_to_hh_mm(total_mins)
- name = CommandHandler.try_get_display_name(
- target_username, self.people.get(target_username, self.conn)
- )
+ name = CommandHandler.try_get_display_name(target_username, self.people.get(target_username, self.conn))
msg = (
f"Stat for {name}:"
f"\n{month_mins_hh} h {month_mins_mm} m this month."
@@ -1026,9 +918,7 @@ def history(self, item, cmd_limit=None):
change = history[index].change
h_user = history[index].user
h_other = history[index].other
- h_time = datetime.datetime.fromtimestamp(
- int(history[index].time)
- ).strftime("%d-%m-%Y %H:%M")
+ h_time = datetime.datetime.fromtimestamp(int(history[index].time)).strftime("%d-%m-%Y %H:%M")
if change == AuditChanges.Move:
msg += f"➡️ Moved to {h_other}\n"
elif change == AuditChanges.Update:
@@ -1044,9 +934,7 @@ def history(self, item, cmd_limit=None):
else:
msg += f"Unknown change {change.value}"
entries += 1
- display_user = CommandHandler.try_get_display_name(
- h_user, self.people.get(h_user, self.conn)
- )
+ display_user = CommandHandler.try_get_display_name(h_user, self.people.get(h_user, self.conn))
msg += f"{h_time} by {display_user}\n\n"
if entries >= 6:
self.__send_message(msg)
@@ -1073,9 +961,7 @@ def item_info(self, item):
msg += f"----------------------------\n"
for feature in item.product.features:
msg += f"{feature}: {item.product.features[feature]}\n"
- msg += (
- f'\nView on Tarallo'
- )
+ msg += f'\nView on Tarallo'
self.__send_message(msg)
except ItemNotFoundError:
@@ -1089,9 +975,7 @@ def item_location(self, item):
item = self.tarallo.get_item(item, 0)
location = " → ".join(item.location)
msg = f"Item {item.code}\nLocation: {location}\n"
- msg += (
- f'\nView on Tarallo'
- )
+ msg += f'\nView on Tarallo'
self.__send_message(msg)
except ItemNotFoundError:
@@ -1127,9 +1011,7 @@ def top(self, cmd_filter=None):
if entry is not None:
n += 1
time_hh, time_mm = self.logs.mm_to_hh_mm(the_time)
- display_user = CommandHandler.try_get_display_name(
- rival, self.people.get(rival, self.conn)
- )
+ display_user = CommandHandler.try_get_display_name(rival, self.people.get(rival, self.conn))
if entry.isadmin:
msg += f"{n}) [{time_hh}:{time_mm}] {display_user}\n"
else:
@@ -1194,10 +1076,7 @@ def game(self, param=None):
return
right_percent = right * 100 / total
wrong_percent = wrong * 100 / total
- self.__send_message(
- f"You answered {total} questions.\n"
- f"Right: {right} ({right_percent:2.1f}%)\nWrong: {wrong} ({wrong_percent:2.1f}%)"
- )
+ self.__send_message(f"You answered {total} questions.\n" f"Right: {right} ({right_percent:2.1f}%)\nWrong: {wrong} ({wrong_percent:2.1f}%)")
else:
self.unknown()
else:
@@ -1259,32 +1138,16 @@ def lofi_message(playing):
@staticmethod
def lofi_keyboard(playing: bool):
if playing:
- first_line_button = [
- inline_keyboard_button(
- "⏸ Pause", callback_data=AcceptableQueriesLoFi.pause.value
- )
- ]
+ first_line_button = [inline_keyboard_button("⏸ Pause", callback_data=AcceptableQueriesLoFi.pause.value)]
else:
- first_line_button = [
- inline_keyboard_button(
- "▶️ Play", callback_data=AcceptableQueriesLoFi.play.value
- )
- ]
+ first_line_button = [inline_keyboard_button("▶️ Play", callback_data=AcceptableQueriesLoFi.play.value)]
reply_markup = [
first_line_button,
[
- inline_keyboard_button(
- "🔉 Vol-", callback_data=AcceptableQueriesLoFi.volume_down.value
- ),
- inline_keyboard_button(
- "🔊 Vol+", callback_data=AcceptableQueriesLoFi.volume_plus.value
- ),
- ],
- [
- inline_keyboard_button(
- "❌ Close", callback_data=AcceptableQueriesLoFi.close.value
- )
+ inline_keyboard_button("🔉 Vol-", callback_data=AcceptableQueriesLoFi.volume_down.value),
+ inline_keyboard_button("🔊 Vol+", callback_data=AcceptableQueriesLoFi.volume_plus.value),
],
+ [inline_keyboard_button("❌ Close", callback_data=AcceptableQueriesLoFi.close.value)],
]
return reply_markup
@@ -1303,9 +1166,7 @@ def lofi_callback(self, query: str, messge_id: int):
if volume == -1:
volume = self.lofi_player_last_volume
if volume == 0:
- lofi_player.audio_set_volume(
- 10
- ) # automatically turn up the volume by one notch
+ lofi_player.audio_set_volume(10) # automatically turn up the volume by one notch
self.__edit_message(
messge_id,
"Playing... - current volume: " + str(volume),
@@ -1338,9 +1199,7 @@ def lofi_callback(self, query: str, messge_id: int):
if volume - 10 == 0:
self.lofi_player_last_volume = 0
lofi_player.stop() # otherwise volume == -1
- self.__edit_message(
- messge_id, "Stopping...", self.lofi_keyboard(False)
- )
+ self.__edit_message(messge_id, "Stopping...", self.lofi_keyboard(False))
else: # == -1
self.__edit_message(
messge_id,
@@ -1392,9 +1251,7 @@ def wol_callback(self, query: str, message_id: int):
self.__send_message("That machine does not exist")
return
Wol.send(mac)
- self.__edit_message(
- message_id, f"Waking up {machine} ({mac}) from its slumber...", None
- )
+ self.__edit_message(message_id, f"Waking up {machine} ({mac}) from its slumber...", None)
# noinspection PyUnusedLocal
def game_callback(self, query: str, message_id: int):
@@ -1407,9 +1264,7 @@ def game_callback(self, query: str, message_id: int):
self.__send_message("🏆 You're winner! 🏆\nAnother one? /game")
return
else:
- self.__send_message(
- f"Nope, that quote was from {result}\nAnother one? /game"
- )
+ self.__send_message(f"Nope, that quote was from {result}\nAnother one? /game")
def tolab_callback(self, query: str, message_id: int, user_id: int):
# ---------------- READMEEEEEEEEEEEEEE --------------------
@@ -1420,21 +1275,14 @@ def tolab_callback(self, query: str, message_id: int, user_id: int):
if data[0] == "hour":
for idx, session in enumerate(self.bot.active_sessions):
if session[0] == user_id:
- day = self._get_tolab_gui_days(
- idx, self.bot.active_sessions[idx][2]
- )
+ day = self._get_tolab_gui_days(idx, self.bot.active_sessions[idx][2])
sir_message = ""
if data[-2] != "hour":
hour_str = data[-2]
minute_str = data[-1]
# hour_str could be only one character, but minute_str should always be 2 characters long
# e.g. 9.27 or 8:30
- if (
- not hour_str
- or not minute_str
- or len(hour_str) > 2
- or len(minute_str) != 2
- ):
+ if not hour_str or not minute_str or len(hour_str) > 2 or len(minute_str) != 2:
self.bot.edit_message(
chat_id=self.__last_chat_id,
message_id=message_id,
@@ -1452,12 +1300,8 @@ def tolab_callback(self, query: str, message_id: int, user_id: int):
)
del self.bot.active_sessions[idx]
return
- if (not self.user.signedsir) and (
- self.user.dateofsafetytest is not None
- ):
- sir_message = (
- "\nRemember to sign the SIR when you get there! 📝"
- )
+ if (not self.user.signedsir) and (self.user.dateofsafetytest is not None):
+ sir_message = "\nRemember to sign the SIR when you get there! 📝"
# if people do tolab for a day that is after tomorrow then send also the "mark it down" message
if day > 1:
sir_message += "\nMark it down on your calendar!"
@@ -1474,9 +1318,7 @@ def tolab_callback(self, query: str, message_id: int, user_id: int):
else:
day = f"+{day}"
if len(data) > 2:
- self.tolab(
- the_time=f"{data[1]}:{data[2]}", day=day, is_gui=True
- )
+ self.tolab(the_time=f"{data[1]}:{data[2]}", day=day, is_gui=True)
self.bot.edit_message(
chat_id=self.__last_chat_id,
message_id=message_id,
@@ -1533,20 +1375,14 @@ def tolab_callback(self, query: str, message_id: int, user_id: int):
if user_id == session[0]:
return
if (idx + 1) == len(self.bot.active_sessions):
- self.bot.active_sessions.append(
- [user_id, message_id, f"{data[1]} {data[2]}"]
- )
+ self.bot.active_sessions.append([user_id, message_id, f"{data[1]} {data[2]}"])
return
# This is horrendous but it werks
- self.bot.active_sessions.append(
- [user_id, message_id, f"{data[1]} {data[2]}"]
- )
+ self.bot.active_sessions.append([user_id, message_id, f"{data[1]} {data[2]}"])
def logout(self, words):
if not self.user.isadmin:
- self.__send_message(
- "Sorry, this is a feature reserved to admins. You can ask an admin to do your logout."
- )
+ self.__send_message("Sorry, this is a feature reserved to admins. You can ask an admin to do your logout.")
return
username = words[0]
@@ -1557,28 +1393,16 @@ def logout(self, words):
logout_message.rstrip().replace(" ", " ")
if '"' in logout_message:
- self.__send_message(
- "What have I told you? The logout message cannot contain double quotes.\n"
- "Please try again."
- )
+ self.__send_message("What have I told you? The logout message cannot contain double quotes.\n" "Please try again.")
self.logout_help()
return
if logout_message.__len__() > MAX_WORK_DONE:
- self.__send_message(
- "Try not to write the story of your life. Re-send a shorter logout message with /logout"
- )
+ self.__send_message("Try not to write the story of your life. Re-send a shorter logout message with /logout")
return
# send commands
- command = str(
- ssh_weeelab_command["logout"][0]
- + username
- + ssh_weeelab_command["logout"][1]
- + '"'
- + logout_message
- + '"'
- )
+ command = str(ssh_weeelab_command["logout"][0] + username + ssh_weeelab_command["logout"][1] + '"' + logout_message + '"')
if not LOCAL_WEEELAB:
ssh_connection = SSHUtil(
username=SSH_SCMA_USER,
@@ -1597,8 +1421,7 @@ def logout(self, words):
# wol always exits with 0, cannot check if it worked
Wol.send(WOL_WEEELAB)
self.__send_message(
- "Sent wol command. Waiting a couple minutes until it's completed.\n"
- "I'll reach out to you when I've completed the logout process."
+ "Sent wol command. Waiting a couple minutes until it's completed.\n" "I'll reach out to you when I've completed the logout process."
)
# boot time is around 115 seconds
# check instead of guessing when the machine has finished booting
@@ -1618,9 +1441,7 @@ def logout(self, words):
def login(self, words):
if not self.user.isadmin:
- self.__send_message(
- "Sorry, this is a feature reserved to admins. You can ask an admin to do your login."
- )
+ self.__send_message("Sorry, this is a feature reserved to admins. You can ask an admin to do your login.")
return
username = words[0]
@@ -1646,8 +1467,7 @@ def login(self, words):
# wol always exits with 0, cannot check if it worked
Wol.send(WOL_WEEELAB)
self.__send_message(
- "Sent wol command. Waiting a couple minutes until it's completed.\n"
- "I'll reach out to you when I've completed the login process."
+ "Sent wol command. Waiting a couple minutes until it's completed.\n" "I'll reach out to you when I've completed the login process."
)
# boot time is around 115 seconds
# check instead of guessing when the machine has finished booting
@@ -1671,13 +1491,9 @@ def __check_weeelab_ssh(self, ssh_connection, username: str, action: str):
self.__send_message(action + " for " + username + " completed!")
# weeelab logout didn't work
elif ssh_connection.return_code == 3:
- self.__send_message(
- action + " didn't work. Try checking the parameters you've sent me."
- )
+ self.__send_message(action + " didn't work. Try checking the parameters you've sent me.")
else:
- self.__send_message(
- "Unexpected weeelab return code. Please check what happened."
- )
+ self.__send_message("Unexpected weeelab return code. Please check what happened.")
return
def quote(self, author: Optional[str]):
@@ -1692,9 +1508,7 @@ def quote(self, author: Optional[str]):
else:
context = ""
- self.__send_message(
- f"{escape_all(quote)} - {escape_all(author)}{escape_all(context)}"
- )
+ self.__send_message(f"{escape_all(quote)} - {escape_all(author)}{escape_all(context)}")
def motivami(self):
quote = self.quotes.get_demotivational_quote()
@@ -1707,9 +1521,7 @@ def motivami(self):
def i_am_door(self):
if not self.user.isadmin:
- self.__send_message(
- "Sorry, this is a feature reserved to admins. You can ask an admin to do your logout."
- )
+ self.__send_message("Sorry, this is a feature reserved to admins. You can ask an admin to do your logout.")
return
ssh_connection = SSHUtil(
@@ -1725,8 +1537,7 @@ def i_am_door(self):
# wol always exits with 0, cannot check if it worked
Wol.send(WOL_I_AM_DOOR)
self.__send_message(
- "Sent wol command. Waiting a couple minutes until it's completed.\n"
- "I'll reach out to you when I've completed the logout process."
+ "Sent wol command. Waiting a couple minutes until it's completed.\n" "I'll reach out to you when I've completed the logout process."
)
while True:
sleep(10)
@@ -1761,17 +1572,11 @@ def shutdown_prompt(self, machine):
message = "Do you want to shutdown the machine now?"
reply_markup = [
[inline_keyboard_button("Kill it with fire!", callback_data=yes)],
- [
- inline_keyboard_button(
- "No, it's crucial that it stays alive!", callback_data=no
- )
- ],
+ [inline_keyboard_button("No, it's crucial that it stays alive!", callback_data=no)],
]
self.__send_inline_keyboard(message, reply_markup)
- def shutdown_callback(
- self, query, message_id: int, ssh_user: str, ssh_host_ip: str, ssh_key_path: str
- ):
+ def shutdown_callback(self, query, message_id: int, ssh_user: str, ssh_host_ip: str, ssh_key_path: str):
shutdown_retry_times = 5
try:
@@ -1780,10 +1585,7 @@ def shutdown_callback(
self.__send_message("I did not understand that button press")
return
- if (
- query == AcceptableQueriesShutdown.weeelab_yes
- or query == AcceptableQueriesShutdown.i_am_door_yes
- ):
+ if query == AcceptableQueriesShutdown.weeelab_yes or query == AcceptableQueriesShutdown.i_am_door_yes:
ssh_connection = SSHUtil(
username=ssh_user,
host=ssh_host_ip,
@@ -1803,13 +1605,8 @@ def shutdown_callback(
None,
)
- elif (
- query == AcceptableQueriesShutdown.weeelab_no
- or query == AcceptableQueriesShutdown.i_am_door_no
- ):
- self.__edit_message(
- message_id, "Alright, we'll leave it alive. For now.", None
- )
+ elif query == AcceptableQueriesShutdown.weeelab_no or query == AcceptableQueriesShutdown.i_am_door_no:
+ self.__edit_message(message_id, "Alright, we'll leave it alive. For now.", None)
def status(self):
if not self.user.isadmin:
@@ -1822,9 +1619,7 @@ def status(self):
df_h_root_out = f"{run_shell_cmd('df -h /')}
"
python_out = f"{run_shell_cmd('pgrep -a python')}
"
- self.__send_message(
- "\n\n".join([uptime_out, free_h_out, df_h_root_out, python_out])
- )
+ self.__send_message("\n\n".join([uptime_out, free_h_out, df_h_root_out, python_out]))
@staticmethod
def __get_telegram_link_to_person(p: Person) -> str:
@@ -1845,12 +1640,7 @@ def __get_next_birthday_of_person(p: Person) -> datetime.date:
t = datetime.date.today()
return (
datetime.date(
- year=(
- t.year
- if (p.dateofbirth.month == t.month and p.dateofbirth.day > t.day)
- or p.dateofbirth.month > t.month
- else t.year + 1
- ),
+ year=(t.year if (p.dateofbirth.month == t.month and p.dateofbirth.day > t.day) or p.dateofbirth.month > t.month else t.year + 1),
month=p.dateofbirth.month,
day=p.dateofbirth.day,
)
@@ -1863,11 +1653,7 @@ def __sorted_birthday_people(self) -> List[Person]:
:return: list of people sorted by birth date
"""
return sorted(
- [
- p
- for p in self.people.get_all(self.conn)
- if not p.accountlocked and p.dateofbirth
- ],
+ [p for p in self.people.get_all(self.conn) if not p.accountlocked and p.dateofbirth],
key=lambda p: CommandHandler.__get_next_birthday_of_person(p),
)
@@ -1892,9 +1678,7 @@ def next_birthdays(self):
for p in self.__next_birthday_people()
]
)
- self.__send_message(
- f"The people who have a coming birthday 🎂 are:\n\n{bd_people}"
- )
+ self.__send_message(f"The people who have a coming birthday 🎂 are:\n\n{bd_people}")
def birthday_wisher(self):
"""
@@ -1911,10 +1695,7 @@ def birthday_wisher(self):
if not p.accountlocked
and p.tgid
and p.dateofbirth
- and (
- p.dateofbirth.month == datetime.date.today().month
- and p.dateofbirth.day == datetime.date.today().day
- )
+ and (p.dateofbirth.month == datetime.date.today().month and p.dateofbirth.day == datetime.date.today().day)
else None
)
for p in self.people.get_all(self.conn)
@@ -1954,11 +1735,7 @@ def __sorted_test_people(self) -> List[Person]:
:return: list of people sorted by safety test date
"""
return sorted(
- [
- p
- for p in self.people.get_all(self.conn)
- if not p.accountlocked and p.dateofsafetytest
- ],
+ [p for p in self.people.get_all(self.conn) if not p.accountlocked and p.dateofsafetytest],
key=lambda p: p.dateofsafetytest,
)
@@ -1971,10 +1748,7 @@ def __next_test_people(self) -> List[Person]:
for p in self.__sorted_test_people()
if p.dateofsafetytest.year >= datetime.date.today().year
and (
- (
- p.dateofsafetytest.month == datetime.date.today().month
- and p.dateofsafetytest.day >= datetime.date.today().day
- )
+ (p.dateofsafetytest.month == datetime.date.today().month and p.dateofsafetytest.day >= datetime.date.today().day)
or (p.dateofsafetytest.month > datetime.date.today().month)
)
]
@@ -1992,11 +1766,7 @@ def next_tests(self):
for p in self.__next_test_people()
]
)
- self.__send_message(
- f"The people who have a coming safety test 🛠 are:\n\n{test_people}"
- if test_people
- else "No safety tests planned at the moment."
- )
+ self.__send_message(f"The people who have a coming safety test 🛠 are:\n\n{test_people}" if test_people else "No safety tests planned at the moment.")
def safety_test_reminder(self):
"""
@@ -2010,14 +1780,9 @@ def safety_test_reminder(self):
test_people = []
for p in self.people.get_all(self.conn):
- if (
- p.dateofsafetytest
- and p.dateofsafetytest == datetime.date.today()
- ):
+ if p.dateofsafetytest and p.dateofsafetytest == datetime.date.today():
if p.tgid:
- test_people.append(
- CommandHandler.__get_telegram_link_to_person(p)
- )
+ test_people.append(CommandHandler.__get_telegram_link_to_person(p))
else:
test_people.append(p.cn)
@@ -2033,9 +1798,7 @@ def unknown(self):
"""
Called when an unknown command is received
"""
- self.__send_message(
- self.bot.unknown_command_message + "\n\nType /help for list of commands"
- )
+ self.__send_message(self.bot.unknown_command_message + "\n\nType /help for list of commands")
def tolab_help(self):
help_message = "Use /tolab and the time to tell the bot when you'll go to the lab.\n\n\
@@ -2105,9 +1868,7 @@ def main():
wave_obj = simpleaudio.WaveObject.from_wave_file("weeedong.wav")
else:
wave_obj = simpleaudio.WaveObject.from_wave_file("weeedong_default.wav")
- users = Users(
- LDAP_ADMIN_GROUPS, LDAP_TREE_PEOPLE, LDAP_TREE_INVITES, LDAP_TREE_GROUPS
- )
+ users = Users(LDAP_ADMIN_GROUPS, LDAP_TREE_PEOPLE, LDAP_TREE_INVITES, LDAP_TREE_GROUPS)
people = People(LDAP_ADMIN_GROUPS, LDAP_TREE_PEOPLE)
conn = LdapConnection(LDAP_SERVER, LDAP_USER, LDAP_PASS)
wol = WOL_MACHINES
@@ -2128,9 +1889,7 @@ def main():
fah_ranker_t = Thread(target=fah_ranker, args=(bot, 9, 0))
fah_ranker_t.start()
- handler = CommandHandler(
- bot, tarallo, logs, tolab, users, people, conn, wol, quotes
- )
+ handler = CommandHandler(bot, tarallo, logs, tolab, users, people, conn, wol, quotes)
birthday_wisher_t = Thread(target=handler.birthday_wisher)
birthday_wisher_t.start()
@@ -2236,10 +1995,7 @@ def main():
else:
handler.top()
- elif (
- command[0] == "/deletecache"
- or command[0] == "/deletecache@weeelab_bot"
- ):
+ elif command[0] == "/deletecache" or command[0] == "/deletecache@weeelab_bot":
handler.delete_cache()
elif command[0] == "/help" or command[0] == "/help@weeelab_bot":
@@ -2288,15 +2044,10 @@ def main():
elif command[0] == "/motivami" or command[0] == "/motivami@weeelab_bot":
handler.motivami()
- elif (
- command[0] == "/nextbirthdays"
- or command[0] == "/nextbirthdays@weeelab_bot"
- ):
+ elif command[0] == "/nextbirthdays" or command[0] == "/nextbirthdays@weeelab_bot":
handler.next_birthdays()
- elif (
- command[0] == "/nexttests" or command[0] == "/nexttests@weeelab_bot"
- ):
+ elif command[0] == "/nexttests" or command[0] == "/nexttests@weeelab_bot":
handler.next_tests()
else:
@@ -2305,9 +2056,7 @@ def main():
tolab_active_sessions = handler.get_tolab_active_sessions()
for idx, session in enumerate(tolab_active_sessions):
if user_id in session:
- handler.tolab_callback(
- f"hour:{command[0]}", session[1], user_id
- )
+ handler.tolab_callback(f"hour:{command[0]}", session[1], user_id)
flag = False
break
if flag: