Skip to content

Commit

Permalink
feat: only allow admins to control the bot in groups
Browse files Browse the repository at this point in the history
  • Loading branch information
eikowagenknecht committed Nov 7, 2023
1 parent 98fc679 commit cee6cc7
Showing 1 changed file with 128 additions and 38 deletions.
166 changes: 128 additions & 38 deletions src/lootscraper/telegrambot.py
Original file line number Diff line number Diff line change
Expand Up @@ -365,7 +365,11 @@ async def debug_command(

await self.log_call(update)

if update.effective_message is None or update.effective_chat is None:
if (
not update.effective_chat
or not update.effective_message
or not await self.user_can_control_bot(update)
):
return

await update.effective_message.reply_markdown_v2(
Expand Down Expand Up @@ -401,6 +405,13 @@ async def error_command(

await self.log_call(update)

if (
not update.effective_chat
or not update.effective_message
or not await self.user_can_control_bot(update)
):
return

raise Exception( # noqa: TRY002
"This is a test error triggered by the /error command.",
)
Expand All @@ -415,7 +426,11 @@ async def help_command(

await self.log_call(update)

if update.effective_message is None:
if (
not update.effective_chat
or not update.effective_message
or not await self.user_can_control_bot(update)
):
return

await update.effective_message.reply_markdown_v2(MESSAGE_HELP)
Expand All @@ -430,7 +445,11 @@ async def leave_command(

await self.log_call(update)

if update.effective_message is None or update.effective_chat is None:
if (
not update.effective_chat
or not update.effective_message
or not await self.user_can_control_bot(update)
):
return

db_chat = self.get_chat_by_update(update)
Expand Down Expand Up @@ -473,7 +492,11 @@ async def manage_command(

await self.log_call(update)

if update.effective_message is None or update.effective_chat is None:
if (
not update.effective_chat
or not update.effective_message
or not await self.user_can_control_bot(update)
):
return

db_chat = self.get_chat_by_update(update)
Expand All @@ -500,7 +523,11 @@ async def refresh_command(

await self.log_call(update)

if update.effective_message is None or update.effective_chat is None:
if (
not update.effective_chat
or not update.effective_message
or not await self.user_can_control_bot(update)
):
return

db_chat = self.get_chat_by_update(update)
Expand All @@ -526,7 +553,11 @@ async def start_command(

await self.log_call(update)

if update.effective_message is None or update.effective_chat is None:
if (
not update.effective_chat
or not update.effective_message
or not await self.user_can_control_bot(update)
):
return

welcome_text = (
Expand Down Expand Up @@ -631,7 +662,11 @@ async def status_command(

await self.log_call(update)

if not update.effective_chat or not update.effective_message:
if (
not update.effective_chat
or not update.effective_message
or not await self.user_can_control_bot(update)
):
return

db_chat = self.get_chat_by_update(update)
Expand Down Expand Up @@ -713,7 +748,11 @@ async def timezone_command(

await self.log_call(update)

if not update.effective_chat or not update.effective_message:
if (
not update.effective_chat
or not update.effective_message
or not await self.user_can_control_bot(update)
):
return

await update.effective_message.reply_text(
Expand All @@ -729,7 +768,11 @@ async def unknown_command(
"""Handle unknown commands."""
await self.log_call(update)

if not update.effective_chat or not update.effective_message:
if (
not update.effective_chat
or not update.effective_message
or not await self.user_can_control_bot(update)
):
return

# Special handling for channels
Expand Down Expand Up @@ -793,7 +836,11 @@ async def offer_callback(

await self.log_call(update)

if update.callback_query is None or update.effective_chat is None:
if (
update.callback_query is None
or update.effective_chat is None
or not await self.user_can_control_bot(update)
):
return

query = update.callback_query
Expand Down Expand Up @@ -853,7 +900,7 @@ async def dismiss_callback(

await self.log_call(update)

if update.callback_query is None:
if update.callback_query is None or not await self.user_can_control_bot(update):
return

try:
Expand Down Expand Up @@ -882,7 +929,11 @@ async def close_callback(
"""Callback from the menu button "Close" in various menus."""
await self.log_call(update)

if update.callback_query is None or update.effective_chat is None:
if (
update.callback_query is None
or update.effective_chat is None
or not await self.user_can_control_bot(update)
):
return

query = update.callback_query
Expand Down Expand Up @@ -912,7 +963,12 @@ async def toggle_subscription_callback(
await self.log_call(update)

query = update.callback_query
if query is None or update.effective_chat is None or query.data is None:
if (
query is None
or update.effective_chat is None
or query.data is None
or not await self.user_can_control_bot(update)
):
return

db_chat = self.get_chat_by_update(update)
Expand Down Expand Up @@ -952,7 +1008,12 @@ async def set_timezone_callback(
await self.log_call(update)

query = update.callback_query
if query is None or update.effective_chat is None or query.data is None:
if (
query is None
or update.effective_chat is None
or query.data is None
or not await self.user_can_control_bot(update)
):
return

data = int(query.data.removeprefix("settimezone").strip())
Expand Down Expand Up @@ -1156,28 +1217,6 @@ def unsubscribe(
session.rollback()
raise

async def manage_menu(
self,
update: Update,
context: ContextTypes.DEFAULT_TYPE,
) -> None:
del context # Unused

if update.callback_query is None or update.effective_chat is None:
return

db_chat = self.get_chat_by_update(update)

if db_chat is None:
await update.callback_query.answer(text=MESSAGE_CHAT_NOT_REGISTERED)
return

await update.callback_query.answer()
await update.callback_query.edit_message_text(
text=MESSAGE_MANAGE_MENU,
reply_markup=self.manage_keyboard(db_chat),
)

def manage_keyboard(self, chat: TelegramChat) -> InlineKeyboardMarkup:
keyboard: list[list[InlineKeyboardButton]] = []

Expand Down Expand Up @@ -1610,8 +1649,59 @@ async def send_message(

return None

def is_group_chat(self, chat_id: int | str) -> bool:
return int(chat_id) < 0
async def user_can_control_bot(self, update: Update) -> bool:
if update.effective_chat is None:
# This should never happen, but be safe anyways.
logger.warning("Cannot control bot: Unknown chat.")
return False

if update.effective_chat.type == ChatType.PRIVATE:
# For private chats, the user always has control.
return True

if (
update.effective_chat.type == ChatType.GROUP
or update.effective_chat.type == ChatType.SUPERGROUP
):
if update.effective_user is None:
# Unknown user in group, should not happen.
logger.warning(
"Cannot control bot: Unknown user in group with id "
f"{update.effective_chat.id}",
)
return False

return await self.is_user_admin(
update.effective_chat.id,
update.effective_user.id,
)

if update.effective_chat.type == ChatType.CHANNEL:
# We cannot restrict for chats, so we allow everyone to control the bot.
# Only admins should hopefully be able to send commands anyways.
return True

# Unknown chat type, no control
logger.warning(
"Cannot control bot: Unknown chat type with id "
f"{update.effective_chat.id}",
)
return False

async def is_user_admin(self, chat_id: int, user_id: int) -> bool:
if user_id is None:
return False

try:
chat_member = await self.application.bot.get_chat_member(chat_id, user_id) # type: ignore
if (chat_member.status == chat_member.OWNER) or (
chat_member.status == chat_member.ADMINISTRATOR
):
return True
except TelegramError as e:
logger.warning(f"Checking the admin status failed: {e}")

return False

def deactivate_chat(self, chat: TelegramChat, reason: str) -> None:
logger.debug(f"Deactivating chat {chat.chat_id}.")
Expand Down

0 comments on commit cee6cc7

Please sign in to comment.