From cee6cc7917f34725fcfa093225d2311b31ddcdbc Mon Sep 17 00:00:00 2001 From: Eiko Wagenknecht Date: Tue, 7 Nov 2023 20:10:36 +0100 Subject: [PATCH] feat: only allow admins to control the bot in groups --- src/lootscraper/telegrambot.py | 166 +++++++++++++++++++++++++-------- 1 file changed, 128 insertions(+), 38 deletions(-) diff --git a/src/lootscraper/telegrambot.py b/src/lootscraper/telegrambot.py index adc52625..8ea2665a 100644 --- a/src/lootscraper/telegrambot.py +++ b/src/lootscraper/telegrambot.py @@ -365,7 +365,11 @@ async def debug_command( await self.log_call(update) - if update.effective_message is None or update.effective_chat is None: + if ( + not update.effective_chat + or not update.effective_message + or not await self.user_can_control_bot(update) + ): return await update.effective_message.reply_markdown_v2( @@ -401,6 +405,13 @@ async def error_command( await self.log_call(update) + if ( + not update.effective_chat + or not update.effective_message + or not await self.user_can_control_bot(update) + ): + return + raise Exception( # noqa: TRY002 "This is a test error triggered by the /error command.", ) @@ -415,7 +426,11 @@ async def help_command( await self.log_call(update) - if update.effective_message is None: + if ( + not update.effective_chat + or not update.effective_message + or not await self.user_can_control_bot(update) + ): return await update.effective_message.reply_markdown_v2(MESSAGE_HELP) @@ -430,7 +445,11 @@ async def leave_command( await self.log_call(update) - if update.effective_message is None or update.effective_chat is None: + if ( + not update.effective_chat + or not update.effective_message + or not await self.user_can_control_bot(update) + ): return db_chat = self.get_chat_by_update(update) @@ -473,7 +492,11 @@ async def manage_command( await self.log_call(update) - if update.effective_message is None or update.effective_chat is None: + if ( + not update.effective_chat + or not update.effective_message + or not await self.user_can_control_bot(update) + ): return db_chat = self.get_chat_by_update(update) @@ -500,7 +523,11 @@ async def refresh_command( await self.log_call(update) - if update.effective_message is None or update.effective_chat is None: + if ( + not update.effective_chat + or not update.effective_message + or not await self.user_can_control_bot(update) + ): return db_chat = self.get_chat_by_update(update) @@ -526,7 +553,11 @@ async def start_command( await self.log_call(update) - if update.effective_message is None or update.effective_chat is None: + if ( + not update.effective_chat + or not update.effective_message + or not await self.user_can_control_bot(update) + ): return welcome_text = ( @@ -631,7 +662,11 @@ async def status_command( await self.log_call(update) - if not update.effective_chat or not update.effective_message: + if ( + not update.effective_chat + or not update.effective_message + or not await self.user_can_control_bot(update) + ): return db_chat = self.get_chat_by_update(update) @@ -713,7 +748,11 @@ async def timezone_command( await self.log_call(update) - if not update.effective_chat or not update.effective_message: + if ( + not update.effective_chat + or not update.effective_message + or not await self.user_can_control_bot(update) + ): return await update.effective_message.reply_text( @@ -729,7 +768,11 @@ async def unknown_command( """Handle unknown commands.""" await self.log_call(update) - if not update.effective_chat or not update.effective_message: + if ( + not update.effective_chat + or not update.effective_message + or not await self.user_can_control_bot(update) + ): return # Special handling for channels @@ -793,7 +836,11 @@ async def offer_callback( await self.log_call(update) - if update.callback_query is None or update.effective_chat is None: + if ( + update.callback_query is None + or update.effective_chat is None + or not await self.user_can_control_bot(update) + ): return query = update.callback_query @@ -853,7 +900,7 @@ async def dismiss_callback( await self.log_call(update) - if update.callback_query is None: + if update.callback_query is None or not await self.user_can_control_bot(update): return try: @@ -882,7 +929,11 @@ async def close_callback( """Callback from the menu button "Close" in various menus.""" await self.log_call(update) - if update.callback_query is None or update.effective_chat is None: + if ( + update.callback_query is None + or update.effective_chat is None + or not await self.user_can_control_bot(update) + ): return query = update.callback_query @@ -912,7 +963,12 @@ async def toggle_subscription_callback( await self.log_call(update) query = update.callback_query - if query is None or update.effective_chat is None or query.data is None: + if ( + query is None + or update.effective_chat is None + or query.data is None + or not await self.user_can_control_bot(update) + ): return db_chat = self.get_chat_by_update(update) @@ -952,7 +1008,12 @@ async def set_timezone_callback( await self.log_call(update) query = update.callback_query - if query is None or update.effective_chat is None or query.data is None: + if ( + query is None + or update.effective_chat is None + or query.data is None + or not await self.user_can_control_bot(update) + ): return data = int(query.data.removeprefix("settimezone").strip()) @@ -1156,28 +1217,6 @@ def unsubscribe( session.rollback() raise - async def manage_menu( - self, - update: Update, - context: ContextTypes.DEFAULT_TYPE, - ) -> None: - del context # Unused - - if update.callback_query is None or update.effective_chat is None: - return - - db_chat = self.get_chat_by_update(update) - - if db_chat is None: - await update.callback_query.answer(text=MESSAGE_CHAT_NOT_REGISTERED) - return - - await update.callback_query.answer() - await update.callback_query.edit_message_text( - text=MESSAGE_MANAGE_MENU, - reply_markup=self.manage_keyboard(db_chat), - ) - def manage_keyboard(self, chat: TelegramChat) -> InlineKeyboardMarkup: keyboard: list[list[InlineKeyboardButton]] = [] @@ -1610,8 +1649,59 @@ async def send_message( return None - def is_group_chat(self, chat_id: int | str) -> bool: - return int(chat_id) < 0 + async def user_can_control_bot(self, update: Update) -> bool: + if update.effective_chat is None: + # This should never happen, but be safe anyways. + logger.warning("Cannot control bot: Unknown chat.") + return False + + if update.effective_chat.type == ChatType.PRIVATE: + # For private chats, the user always has control. + return True + + if ( + update.effective_chat.type == ChatType.GROUP + or update.effective_chat.type == ChatType.SUPERGROUP + ): + if update.effective_user is None: + # Unknown user in group, should not happen. + logger.warning( + "Cannot control bot: Unknown user in group with id " + f"{update.effective_chat.id}", + ) + return False + + return await self.is_user_admin( + update.effective_chat.id, + update.effective_user.id, + ) + + if update.effective_chat.type == ChatType.CHANNEL: + # We cannot restrict for chats, so we allow everyone to control the bot. + # Only admins should hopefully be able to send commands anyways. + return True + + # Unknown chat type, no control + logger.warning( + "Cannot control bot: Unknown chat type with id " + f"{update.effective_chat.id}", + ) + return False + + async def is_user_admin(self, chat_id: int, user_id: int) -> bool: + if user_id is None: + return False + + try: + chat_member = await self.application.bot.get_chat_member(chat_id, user_id) # type: ignore + if (chat_member.status == chat_member.OWNER) or ( + chat_member.status == chat_member.ADMINISTRATOR + ): + return True + except TelegramError as e: + logger.warning(f"Checking the admin status failed: {e}") + + return False def deactivate_chat(self, chat: TelegramChat, reason: str) -> None: logger.debug(f"Deactivating chat {chat.chat_id}.")