From af40744d53f6a0fbed37c8910229225dc8619df5 Mon Sep 17 00:00:00 2001 From: Eiko Wagenknecht Date: Fri, 27 Oct 2023 23:46:21 +0200 Subject: [PATCH] feat: add support for channels and supergroups (#289) Signed-off-by: Eiko Wagenknecht --- README.md | 2 + alembic.ini | 6 - scripts/__init__.py | 0 scripts/db_refresh.py | 43 + .../20230202_100720_initial_sqlalchemy_2_0.py | 10 - .../20231027_115622_revamp_telegram.py | 150 ++++ src/lootscraper/database.py | 62 +- src/lootscraper/main.py | 29 +- src/lootscraper/processing.py | 15 +- src/lootscraper/telegrambot.py | 802 +++++++----------- tests/test_telegram.py | 14 +- 11 files changed, 606 insertions(+), 527 deletions(-) create mode 100644 scripts/__init__.py create mode 100644 scripts/db_refresh.py create mode 100644 src/lootscraper/alembic/versions/20231027_115622_revamp_telegram.py diff --git a/README.md b/README.md index 43f93f76..bee4d718 100644 --- a/README.md +++ b/README.md @@ -31,6 +31,8 @@ For our mobile gamers: Want to receive only the offers *you* want in a single chat? Subscribe directly to the source: The [Telegram LootScraperBot](https://t.me/LootScraperBot) will happily send you push notifications for new offers. You can choose which categories you want to subscribe to. +If you want, you can even add the bot to your own groups (including threaded groups) and channels. Just make sure to give it the neccessary permissions (admin rights work best). + This is what it currently looks like in Telegram: ![image](https://user-images.githubusercontent.com/1475672/166058823-98e2beb9-7eb5-403d-94c7-7e17966fe9b7.png) diff --git a/alembic.ini b/alembic.ini index 3f16d605..c6dd53fe 100644 --- a/alembic.ini +++ b/alembic.ini @@ -60,12 +60,6 @@ version_path_separator = os # Use os.pathsep. Default configuration used for ne # on newly generated revision scripts. See the documentation for further # detail and examples -# format using "black" - use the console_scripts runner, against the "black" entrypoint -hooks = black -black.type = console_scripts -black.entrypoint = black -black.options = -l 79 REVISION_SCRIPT_FILENAME - # Logging configuration [loggers] keys = root,sqlalchemy,alembic diff --git a/scripts/__init__.py b/scripts/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/scripts/db_refresh.py b/scripts/db_refresh.py new file mode 100644 index 00000000..b2bf9352 --- /dev/null +++ b/scripts/db_refresh.py @@ -0,0 +1,43 @@ +# Script to copy over all data from the old database to the new one. +# The new one should be generated with a run of lootscraper without an existing +# database file. +import sqlite3 + +source_conn = sqlite3.connect("lootscraper.db") +target_conn = sqlite3.connect("lootscraper_refreshed.db") + +source_cursor = source_conn.cursor() +target_cursor = target_conn.cursor() + +# Disable foreign keys +source_cursor.execute("PRAGMA foreign_keys = OFF;") +target_cursor.execute("PRAGMA foreign_keys = OFF;") + +# Get the list of all tables in source database +source_cursor.execute("SELECT name FROM sqlite_master WHERE type='table';") +tables = source_cursor.fetchall() + +# Copy data from source to target database +for table in tables: + table_name = table[0] + source_cursor.execute(f"SELECT * FROM {table_name};") # noqa: S608 + + rows = source_cursor.fetchall() + if not rows: + continue + + placeholders = ", ".join(["?"] * len(rows[0])) + target_cursor.executemany( + f"INSERT INTO {table_name} VALUES ({placeholders});", # noqa: S608 + rows, + ) + +# Enable foreign keys +source_cursor.execute("PRAGMA foreign_keys = ON;") +target_cursor.execute("PRAGMA foreign_keys = ON;") + +source_conn.commit() +target_conn.commit() + +source_conn.close() +target_conn.close() diff --git a/src/lootscraper/alembic/versions/20230202_100720_initial_sqlalchemy_2_0.py b/src/lootscraper/alembic/versions/20230202_100720_initial_sqlalchemy_2_0.py index a276c044..5a859f93 100644 --- a/src/lootscraper/alembic/versions/20230202_100720_initial_sqlalchemy_2_0.py +++ b/src/lootscraper/alembic/versions/20230202_100720_initial_sqlalchemy_2_0.py @@ -174,13 +174,3 @@ def upgrade() -> None: ), sa.PrimaryKeyConstraint("id"), ) - - -def downgrade() -> None: - op.drop_table("offers") - op.drop_table("telegram_subscriptions") - op.drop_table("games") - op.drop_table("users") - op.drop_table("steam_info") - op.drop_table("igdb_info") - op.drop_table("announcements") diff --git a/src/lootscraper/alembic/versions/20231027_115622_revamp_telegram.py b/src/lootscraper/alembic/versions/20231027_115622_revamp_telegram.py new file mode 100644 index 00000000..5c25aabf --- /dev/null +++ b/src/lootscraper/alembic/versions/20231027_115622_revamp_telegram.py @@ -0,0 +1,150 @@ +""" +Revamp Telegram. + +Revision ID: 8338a761b831 +Revises: 023117ae895e +Create Date: 2023-10-27 11:56:22.363419+00:00 + +""" +import json +import logging + +import sqlalchemy as sa +from alembic import op +from sqlalchemy import orm +from telegram.constants import ChatType + +from lootscraper.database import AwareDateTime, Base, TelegramChat + +# revision identifiers, used by Alembic. +revision = "8338a761b831" +down_revision = "023117ae895e" +branch_labels = None +depends_on = None + + +class TempUser(Base): + __tablename__ = "users" + + id = sa.Column(sa.Integer, primary_key=True) # noqa: A003 + registration_date = sa.Column(AwareDateTime, nullable=False) + telegram_id = sa.Column(sa.String, nullable=True) + telegram_chat_id = sa.Column(sa.String) + telegram_user_details = sa.Column(sa.JSON) + timezone_offset = sa.Column(sa.Integer) + inactive = sa.Column(sa.String, default=None) + offers_received_count = sa.Column(sa.Integer, default=0) + last_announcement_id = sa.Column(sa.Integer, default=0) + + +def upgrade() -> None: + # First create the telegram_chats table + logging.info("Creating new telegram_chats table") + op.create_table( + "telegram_chats", + sa.Column("id", sa.Integer(), nullable=False), + sa.Column("registration_date", AwareDateTime(), nullable=False), + sa.Column("chat_type", sa.Enum(ChatType), nullable=False), + sa.Column("chat_id", sa.Integer(), nullable=False), + sa.Column("user_id", sa.Integer(), nullable=True), + sa.Column("thread_id", sa.Integer(), nullable=True), + sa.Column("chat_details", sa.JSON(), nullable=True), + sa.Column("user_details", sa.JSON(), nullable=True), + sa.Column("timezone_offset", sa.Integer(), nullable=False), + sa.Column("active", sa.Boolean(), nullable=False), + sa.Column("inactive_reason", sa.String(), nullable=True), + sa.Column("offers_received_count", sa.Integer(), nullable=False), + sa.Column("last_announcement_id", sa.Integer(), nullable=False), + sa.PrimaryKeyConstraint("id"), + ) + + # Fill the new table with data from the old one + bind = op.get_bind() + with orm.Session(bind=bind) as session: + seen_chat_ids = set() + for user in session.query(TempUser): + if user.telegram_chat_id in seen_chat_ids: + logging.warning(f"Skipping duplicate chat {user.telegram_chat_id}") + continue + seen_chat_ids.add(user.telegram_chat_id) + logging.info(f"Migrating user {user.id}") + + # Determine chat type + if "Channel user created by admin" in user.telegram_user_details: + chat_type = ChatType.CHANNEL + user_details = None + elif int(user.telegram_chat_id) < 0: + chat_type = ChatType.GROUP + user_details = user.telegram_user_details + else: + chat_type = ChatType.PRIVATE + user_details = user.telegram_user_details + + user_id = int(user.telegram_id) if int(user.telegram_id) > 0 else None + + if user_details and not isinstance(user_details, dict): + user_details = json.loads(user_details) # type: ignore + + new_chat = TelegramChat( + registration_date=user.registration_date, + user_id=user_id, + chat_id=int(user.telegram_chat_id), + user_details=user_details, + chat_details=None, + timezone_offset=user.timezone_offset, + active=user.inactive is None, + inactive_reason=user.inactive, + offers_received_count=user.offers_received_count, + last_announcement_id=user.last_announcement_id, + chat_type=chat_type, + ) + # Keep the primary key! + new_chat.id = user.id + session.add(new_chat) + + session.commit() + + # Drop all foreign keys workaround. They can't be dropped directly because + # they have no name. + conn = op.get_bind() + # Disable foreign key constraint temporarily + conn.execute(sa.text("PRAGMA foreign_keys=off;")) + conn.execute( + sa.text( + """ + CREATE TABLE "new_telegram_subscriptions" ( + "id" INTEGER NOT NULL, + "chat_id" INTEGER NOT NULL, + "source" VARCHAR(7) NOT NULL, + "type" VARCHAR(4) NOT NULL, + "last_offer_id" INTEGER NOT NULL, + "duration" VARCHAR(9) NOT NULL, + CONSTRAINT "fk_telegram_subscriptions_chat_id_telegram_chats" + FOREIGN KEY("chat_id") + REFERENCES "telegram_chats"("id"), + CONSTRAINT "pk_telegram_subscriptions" + PRIMARY KEY("id") + ); + """, + ), + ) + conn.execute( + sa.text( + """ + INSERT INTO new_telegram_subscriptions + (id, chat_id, source, type, last_offer_id, duration) + SELECT id, user_id, source, type, last_offer_id, duration + FROM telegram_subscriptions; + """, + ), + ) + conn.execute(sa.text("DROP TABLE telegram_subscriptions;")) + conn.execute( + sa.text( + "ALTER TABLE new_telegram_subscriptions RENAME TO telegram_subscriptions;", + ), + ) + conn.execute(sa.text("PRAGMA foreign_keys=on;")) # Enable back the foreign keys + + # At last, drop the old users table + op.drop_table("users") diff --git a/src/lootscraper/database.py b/src/lootscraper/database.py index 380000d1..f2585d9f 100644 --- a/src/lootscraper/database.py +++ b/src/lootscraper/database.py @@ -18,8 +18,15 @@ scoped_session, sessionmaker, ) - -from lootscraper.common import Category, Channel, OfferDuration, OfferType, Source +from telegram.constants import ChatType + +from lootscraper.common import ( + Category, + Channel, + OfferDuration, + OfferType, + Source, +) from lootscraper.config import Config from lootscraper.utils import calc_real_valid_to @@ -31,12 +38,23 @@ logger = logging.getLogger(__name__) -# mapper_registry = orm.registry() +# Naming convention for keys and constraints +convention = { + "ix": "ix_%(column_0_label)s", + "uq": "uq_%(table_name)s_%(column_0_name)s", + "ck": "ck_%(table_name)s_%(constraint_name)s", + "fk": "fk_%(table_name)s_%(column_0_name)s_%(referred_table_name)s", + "pk": "pk_%(table_name)s", +} + +metadata_obj = sa.MetaData(naming_convention=convention) class Base(MappedAsDataclass, DeclarativeBase): """Subclasses will be converted to dataclasses.""" + metadata = metadata_obj + class AwareDateTime(sa.TypeDecorator): # type: ignore """Results returned as aware datetimes, not naive ones.""" @@ -205,45 +223,49 @@ def real_valid_to(self) -> datetime | None: return calc_real_valid_to(self.seen_last, self.valid_to) -class User(Base): - """A user of the application.""" +class TelegramChat(Base): + """A Telegram chat. Can be a single user, a group or a channel.""" - __tablename__ = "users" + __tablename__ = "telegram_chats" - telegram_subscriptions: Mapped[list[TelegramSubscription]] = relationship( + subscriptions: Mapped[list[TelegramSubscription]] = relationship( "TelegramSubscription", - back_populates="user", + back_populates="chat", cascade="all, delete-orphan", init=False, ) - id: Mapped[int] = mapped_column( # noqa: A003 - init=False, - primary_key=True, - ) + id: Mapped[int] = mapped_column(init=False, primary_key=True) # noqa: A003 registration_date: Mapped[datetime] = mapped_column(AwareDateTime) - telegram_id: Mapped[str | None] - telegram_chat_id: Mapped[str] - telegram_user_details: Mapped[dict[str, Any] | None] = mapped_column(sa.JSON) + chat_type: Mapped[ChatType] = mapped_column(sa.Enum(ChatType)) + chat_id: Mapped[int] + user_id: Mapped[int | None] = mapped_column(default=None) + thread_id: Mapped[int | None] = mapped_column(default=None) + chat_details: Mapped[dict[str, Any] | None] = mapped_column(sa.JSON, default=None) + user_details: Mapped[dict[str, Any] | None] = mapped_column(sa.JSON, default=None) timezone_offset: Mapped[int] = mapped_column(default=0) - inactive: Mapped[str | None] = mapped_column(default=None) + active: Mapped[bool] = mapped_column(default=True) + inactive_reason: Mapped[str | None] = mapped_column(default=None) offers_received_count: Mapped[int] = mapped_column(default=0) last_announcement_id: Mapped[int] = mapped_column(default=0) class TelegramSubscription(Base): - """Subscription of a user to a category for Telegram notifications.""" + """Subscription of a chat to a category for Telegram notifications.""" __tablename__ = "telegram_subscriptions" - user: Mapped[User] = relationship("User", back_populates="telegram_subscriptions") + chat: Mapped[TelegramChat] = relationship( + "TelegramChat", + back_populates="subscriptions", + ) id: Mapped[int] = mapped_column( # noqa: A003 init=False, primary_key=True, ) - user_id: Mapped[int] = mapped_column( - sa.ForeignKey("users.id"), + chat_id: Mapped[int] = mapped_column( + sa.ForeignKey("telegram_chats.id"), init=False, ) source: Mapped[Source] = mapped_column(sa.Enum(Source)) diff --git a/src/lootscraper/main.py b/src/lootscraper/main.py index cb1ebb14..e4af157f 100644 --- a/src/lootscraper/main.py +++ b/src/lootscraper/main.py @@ -14,7 +14,7 @@ from lootscraper import __version__ from lootscraper.browser import get_browser_context from lootscraper.common import TIMESTAMP_LONG -from lootscraper.config import Config +from lootscraper.config import Config, TelegramLogLevel from lootscraper.database import LootDatabase from lootscraper.processing import ( action_generate_feed, @@ -138,7 +138,12 @@ def setup_logging() -> None: handlers.append(stream_handler) # Create a rotating log file, size: 5 MB, keep 10 files - file_handler = RotatingFileHandler(filename, maxBytes=5 * 1024**2, backupCount=10) + file_handler = RotatingFileHandler( + filename, + maxBytes=5 * 1024**2, + backupCount=10, + encoding="utf-8", + ) file_handler.setFormatter(logging.Formatter(LOGFORMAT)) handlers.append(file_handler) @@ -162,11 +167,21 @@ async def run_telegram_bot( async with TelegramBot(Config.get(), db) as bot: # The bot is running now and will stop when the context exits try: - telegram_handler = TelegramLoggingHandler(bot) - # Only log errors and above to Telegram. TODO: Make this configurable. - telegram_handler.setLevel(logging.ERROR) - telegram_handler.setFormatter(logging.Formatter(LOGFORMAT)) - logger.addHandler(telegram_handler) + lvl = Config.get().telegram_log_level + if lvl != TelegramLogLevel.DISABLED: + telegram_handler = TelegramLoggingHandler(bot) + if lvl == TelegramLogLevel.DEBUG: + telegram_handler.setLevel(logging.DEBUG) + elif lvl == TelegramLogLevel.INFO: + telegram_handler.setLevel(logging.INFO) + elif lvl == TelegramLogLevel.WARNING: + telegram_handler.setLevel(logging.WARNING) + elif lvl == TelegramLogLevel.ERROR: + telegram_handler.setLevel(logging.ERROR) + + telegram_handler.setLevel(logging.ERROR) + telegram_handler.setFormatter(logging.Formatter(LOGFORMAT)) + logger.addHandler(telegram_handler) except Exception: logger.exception("Could not add Telegram logging handler.") diff --git a/src/lootscraper/processing.py b/src/lootscraper/processing.py index da8d9c3c..d3fe0a14 100644 --- a/src/lootscraper/processing.py +++ b/src/lootscraper/processing.py @@ -10,7 +10,14 @@ from lootscraper.common import TIMESTAMP_LONG, OfferDuration, OfferType, Source from lootscraper.config import Config -from lootscraper.database import Game, IgdbInfo, LootDatabase, Offer, SteamInfo, User +from lootscraper.database import ( + Game, + IgdbInfo, + LootDatabase, + Offer, + SteamInfo, + TelegramChat, +) from lootscraper.feed import generate_feed from lootscraper.html import generate_html from lootscraper.scraper import get_all_scrapers @@ -81,9 +88,9 @@ async def process_new_offers( async def send_new_offers_telegram(db: LootDatabase, bot: TelegramBot) -> None: session: Session = db.Session() try: - user: User - for user in session.execute(select(User)).scalars().all(): - if not user.inactive: + user: TelegramChat + for user in session.execute(select(TelegramChat)).scalars().all(): + if not user.inactive_reason: await bot.send_new_announcements(user) await bot.send_new_offers(user) except Exception: diff --git a/src/lootscraper/telegrambot.py b/src/lootscraper/telegrambot.py index 65d07bbb..adc52625 100644 --- a/src/lootscraper/telegrambot.py +++ b/src/lootscraper/telegrambot.py @@ -13,6 +13,7 @@ import telegram from sqlalchemy import orm from telegram import InlineKeyboardButton, InlineKeyboardMarkup, Message, Update +from telegram.constants import ChatType from telegram.error import TelegramError from telegram.ext import ( AIORateLimiter, @@ -36,14 +37,14 @@ markdown_escape, markdown_url, ) -from lootscraper.config import Config, ParsedConfig, TelegramLogLevel +from lootscraper.config import Config, ParsedConfig from lootscraper.database import ( Announcement, Game, LootDatabase, Offer, + TelegramChat, TelegramSubscription, - User, ) from lootscraper.scraper import get_all_scrapers @@ -89,7 +90,7 @@ MESSAGE_UNKNOWN_COMMAND = ( "Sorry, I didn't understand that command. Type /help to see all commands." ) -MESSAGE_USER_NOT_REGISTERED = ( +MESSAGE_CHAT_NOT_REGISTERED = ( "You are not registered. Please, register with /start command." ) MESSAGE_NO_SUBSCRIPTIONS = "You have no subscriptions. Change that with /manage." @@ -155,7 +156,6 @@ async def start(self) -> None: # Register "/..." commands self.application.add_handler(CommandHandler("announce", self.announce_command)) - self.application.add_handler(CommandHandler("channel", self.channel_command)) self.application.add_handler(CommandHandler("debug", self.debug_command)) self.application.add_handler(CommandHandler("error", self.error_command)) self.application.add_handler(CommandHandler("help", self.help_command)) @@ -166,6 +166,7 @@ async def start(self) -> None: self.application.add_handler(CommandHandler("status", self.status_command)) self.application.add_handler(CommandHandler("timezone", self.timezone_command)) # Fallback for all other messages starting with "/" + # (also messages in channels) self.application.add_handler( MessageHandler(filters.COMMAND, self.unknown_command), ) @@ -231,12 +232,7 @@ def updater_error_handler(self, error: TelegramError) -> None: logger.error(f"Error while polling for messages from Telegram: {error}") async def notify_admin_and_stop(self, error_text: str) -> None: - await self.send_message( - chat_id=Config.get().telegram_developer_chat_id, - text=error_text, - parse_mode=None, - ) - + await self.send_dev_message(error_text) await self.stop() async def error_handler( @@ -277,12 +273,13 @@ async def error_handler( and update.effective_chat ): # The bot was removed from a group chat. - chat_id = update.effective_chat.id - logger.info( - f"Deactivating user with group chat id {chat_id} because the " - "bot was removed from the group.", - ) - self.deactivate_chat(chat_id, "removed group") + chat = self.get_chat_by_update(update) + if chat is not None: + logger.info( + f"Deactivating chat with id {chat.chat_id} because the bot was" + "removed from the group.", + ) + self.deactivate_chat(chat, "removed group") return # Build the exception string from the exception @@ -327,155 +324,35 @@ async def announce_command( if ( not update.effective_user or not update.effective_chat - or not update.message - or not update.message.text + or not update.effective_message + or not update.effective_message.text ): return - # Check if the user is an admin + # Check if the user is an admin (no matter if in a group or private chat) if update.effective_user.id != Config.get().telegram_admin_user_id: - await self.send_message( - chat_id=update.effective_chat.id, - text=markdown_escape( - "You are not an admin, so you can't use this command.", - ), - parse_mode=telegram.constants.ParseMode.MARKDOWN_V2, + await update.effective_message.reply_text( + "You are not an admin, so you can't use this command.", ) return try: # Get the announcement text - message = update.message.text.removeprefix("/announce ") + message = update.effective_message.text.removeprefix("/announce ") message_parts = message.split("||") header = message_parts[0].strip() text = message_parts[1].strip() self.add_announcement(header, text) - await self.send_message( - chat_id=update.effective_chat.id, - text=markdown_escape( - "Announcement added successfully. " - "Sending it with the next scraping run.", - ), - parse_mode=telegram.constants.ParseMode.MARKDOWN_V2, + await update.effective_message.reply_text( + "Announcement added successfully. " + "Sending it with the next scraping run.", ) except IndexError: - await self.send_message( - chat_id=update.effective_chat.id, - text=markdown_escape( - "Invalid announcement command. " - "Format needs to be /announce
|| ", - ), - parse_mode=telegram.constants.ParseMode.MARKDOWN_V2, - ) - - async def channel_command( - self, - update: Update, - context: ContextTypes.DEFAULT_TYPE, - ) -> None: - """Handle the /channel command: Manage channels (admin only).""" - del context # Unused - - await self.log_call(update) - - if ( - not update.effective_user - or not update.effective_chat - or not update.message - or not update.message.text - ): - return - - # Check if the user is an admin - if update.effective_user.id != Config.get().telegram_admin_user_id: - await self.send_message( - chat_id=update.effective_chat.id, - text=markdown_escape( - "You are not an admin, so you can't use this command.", - ), - parse_mode=telegram.constants.ParseMode.MARKDOWN_V2, - ) - return - - try: - message = update.message.text.removeprefix("/channel ") - message_parts = message.split(" ") - channel_name = message_parts[0].strip() - offer_type = OfferType[message_parts[1].strip()] - source = Source[message_parts[2].strip()] - duration = OfferDuration[message_parts[3].strip()] - - # Add the channel as a "user" to the database and set it to receive - # the appropriate type of messages. - - channel_db_user = self.get_user_by_telegram_id(channel_name) - - if channel_db_user is None: - # Register channel user if not registered yet - session: orm.Session = self.Session() - try: - latest_announcement: int = session.execute( # type: ignore - sa.select( - sa.func.max(Announcement.id), - ), - ).scalar() - - new_user = User( - telegram_id=channel_name, - telegram_chat_id=channel_name, - telegram_user_details={ - "description": "Channel user created by admin", - }, - registration_date=datetime.now(tz=timezone.utc), - last_announcement_id=latest_announcement, - ) - session.add(new_user) - session.commit() - - channel_db_user = self.get_user_by_telegram_id(channel_name) - except Exception: - session.rollback() - raise - - if channel_db_user is None: - raise Exception("Channel user not found.") # noqa: TRY301, TRY002 - - if self.is_subscribed(channel_db_user, offer_type, source, duration): - self.unsubscribe(channel_db_user, offer_type, source, duration) - - await self.send_message( - chat_id=update.effective_chat.id, - text=markdown_escape( - f"Channel {channel_db_user.telegram_chat_id} is now " - f"unsubscribed to offers from: " - f"{offer_type.value} / {source.value} / {duration.value}.", - ), - parse_mode=telegram.constants.ParseMode.MARKDOWN_V2, - ) - else: - self.subscribe(channel_db_user, offer_type, source, duration) - - await self.send_message( - chat_id=update.effective_chat.id, - text=markdown_escape( - f"Channel {channel_db_user.telegram_chat_id} is now subscribed " - f"to offers from: " - f"{offer_type.value} / {source.value} / {duration.value}." - f"Sending new offers with the next scraping run.", - ), - parse_mode=telegram.constants.ParseMode.MARKDOWN_V2, - ) - - except IndexError: - await self.send_message( - chat_id=update.effective_chat.id, - text=markdown_escape( - "Invalid channel command. Format needs to be " - "/channel .", - ), - parse_mode=telegram.constants.ParseMode.MARKDOWN_V2, + await update.effective_message.reply_text( + "Invalid announcement command. " + "Format needs to be /announce
|| ", ) async def debug_command( @@ -488,29 +365,30 @@ async def debug_command( await self.log_call(update) - if update.message is None: + if update.effective_message is None or update.effective_chat is None: return - if update.effective_chat is not None and update.effective_user is not None: - await self.send_message( - chat_id=update.effective_chat.id, - text=markdown_json_formatted( + await update.effective_message.reply_markdown_v2( + markdown_json_formatted( + "update.effective_chat = " + + json.dumps( + update.effective_chat.to_dict(), + indent=2, + ensure_ascii=False, + ), + ), + ) + + if update.effective_user is not None: + await update.effective_message.reply_markdown_v2( + markdown_json_formatted( "update.effective_user = " + json.dumps( update.effective_user.to_dict(), indent=2, ensure_ascii=False, ), - ) - + markdown_json_formatted( - "update.effective_chat = " - + json.dumps( - update.effective_chat.to_dict(), - indent=2, - ensure_ascii=False, - ), ), - parse_mode=telegram.constants.ParseMode.MARKDOWN_V2, ) async def error_command( @@ -532,58 +410,58 @@ async def help_command( update: Update, context: ContextTypes.DEFAULT_TYPE, ) -> None: - """Handle the /help command: Display all available commands to the user.""" + """Handle the /help command: Display all available commands.""" del context # Unused await self.log_call(update) - if update.message is None: + if update.effective_message is None: return - await update.message.reply_markdown_v2(MESSAGE_HELP) + await update.effective_message.reply_markdown_v2(MESSAGE_HELP) async def leave_command( self, update: Update, context: ContextTypes.DEFAULT_TYPE, ) -> None: - """Handle the /leave command: Unregister the user.""" + """Handle the /leave command: Unregister the chat.""" del context # Unused await self.log_call(update) - if update.message is None or update.effective_user is None: + if update.effective_message is None or update.effective_chat is None: return - db_user = self.get_user_by_telegram_id(update.effective_user.id) + db_chat = self.get_chat_by_update(update) - if db_user is None: + if db_chat is None: message = ( - Rf"Hi {update.effective_user.mention_markdown_v2()}, " + Rf"Hi {self.get_caller_name(update)}, " Rf"you are currently not registered\. " R"So you can't leave ;\-\)" ) logger.debug(f"Sending /leave reply: {message}") - await update.message.reply_markdown_v2(message) + await update.effective_message.reply_markdown_v2(message) return - # Delete user from database (if registered) + # Delete chat from database (if registered) session: orm.Session = self.Session() try: - session.delete(db_user) + session.delete(db_chat) session.commit() except Exception: session.rollback() raise message = ( - Rf"Bye {update.effective_user.mention_markdown_v2()}, " + Rf"Bye {self.get_caller_name(update)}, " Rf"I'm sad to see you go\. " - R"Your user data has been deleted\. " + R"The data stored for this chat has been deleted\. " R"If you want to come back at any time, just type /start\!" ) logger.debug(f"Sending /leave reply: {message}") - await update.message.reply_markdown_v2(message) + await update.effective_message.reply_markdown_v2(message) async def manage_command( self, @@ -595,17 +473,18 @@ async def manage_command( await self.log_call(update) - if update.message is None or update.effective_user is None: + if update.effective_message is None or update.effective_chat is None: return - db_user = self.get_user_by_telegram_id(update.effective_user.id) - if db_user is None: - await update.message.reply_text(MESSAGE_USER_NOT_REGISTERED) + db_chat = self.get_chat_by_update(update) + + if db_chat is None: + await update.effective_message.reply_text(MESSAGE_CHAT_NOT_REGISTERED) return - await update.message.reply_text( + await update.effective_message.reply_text( MESSAGE_MANAGE_MENU, - reply_markup=self.manage_keyboard(db_user), + reply_markup=self.manage_keyboard(db_chat), ) async def refresh_command( @@ -621,178 +500,161 @@ async def refresh_command( await self.log_call(update) - if update.effective_chat is None or update.effective_user is None: + if update.effective_message is None or update.effective_chat is None: return - db_user = self.get_user_by_telegram_id(update.effective_user.id) - if db_user is None: - await self.send_message( - chat_id=update.effective_chat.id, - text=MESSAGE_USER_NOT_REGISTERED, - parse_mode=None, - ) + db_chat = self.get_chat_by_update(update) + + if db_chat is None: + await update.effective_message.reply_text(MESSAGE_CHAT_NOT_REGISTERED) return - if ( - db_user.telegram_subscriptions is None - or len(db_user.telegram_subscriptions) == 0 - ): - await self.send_message( - chat_id=update.effective_chat.id, - text=MESSAGE_NO_SUBSCRIPTIONS, - parse_mode=None, - ) + if db_chat.subscriptions is None or len(db_chat.subscriptions) == 0: + await update.effective_message.reply_text(MESSAGE_NO_SUBSCRIPTIONS) return - if not await self.send_new_offers(db_user): - await self.send_message( - chat_id=update.effective_chat.id, - text=MESSAGE_NO_NEW_OFFERS, - parse_mode=None, - ) + if not await self.send_new_offers(db_chat): + await update.effective_message.reply_text(MESSAGE_NO_NEW_OFFERS) async def start_command( self, update: Update, context: ContextTypes.DEFAULT_TYPE, ) -> None: - """Handle the /start command: Register the user and display guide.""" + """Handle the /start command: Register the chat and display guide.""" + del context # Unused + await self.log_call(update) - if update.message is None or update.effective_user is None: + if update.effective_message is None or update.effective_chat is None: return welcome_text = ( - R"I belong to the [LootScraper]" + R"I am part of the [LootScraper]" R"(https://github\.com/eikowagenknecht/lootscraper) project\. " - R"If you have any issues or feature request, please use the " - R"[Github issues]" + R"If you have any problems or feature request, please use the " + R"[GitHub issues]" R"(https://github\.com/eikowagenknecht/lootscraper/issues) " R"to report them\. " R"And if you like it, please consider " - R"[⭐ starring it on GitHub]" + R"[starring it ⭐]" R"(https://github\.com/eikowagenknecht/lootscraper/stargazers)\. " - R"Thanks\!" + R"Thanks a lot\!" "\n\n" - R"*How this works*" + R"*How it works*" "\n" - R"For a quick start, " - R"I just subscribed you to free offers from Steam, Epic and GOG\. " - R"There are more sources available\. " + R"For a quick start, I have just subscribed you to free offers from " + R"Steam, Epic and GOG\. There are more sources available\. " R"To configure what kind of offers you want to see, " R"you can use the /manage command\. " R"I will then send you a message every time a new offer is added\. " R"To see the commands you can use to talk to me, type /help\." "\n\n" - R"*Privacy*" + R"*Send me offers, now\!*" + "\n" + R"I haven't sent you any offers yet to give you some time to " + R"read this message\. To see what's currently on offer, you can use " + R"the /refresh command now\. It's not necessary to do this however, " + R"I'll send you offers automatically whenever a new one comes in\." + "\n\n" + R"*About privacy*" "\n" R"I need to store some user data " - R"\(e\.g\. your Telegram user ID and your subscriptions\) to work\. " + R"\(e\.g\. your Telegram chat ID and your subscriptions\) in order " + R"to work\. " R"You can leave any time by typing /leave\. " - R"This instantly deletes all data about you\. " - R"Also I will be sad to see you go\." + R"This will immediately delete all data about you\. " + R"Also, I will be sad to see you go\." ) - db_user = self.get_user_by_telegram_id(update.effective_user.id) + db_chat = self.get_chat_by_update(update) - if db_user is not None: + if db_chat is not None: message = ( - Rf"Welcome back, {update.effective_user.mention_markdown_v2()} 👋\. " + Rf"Welcome back, {self.get_caller_name(update)} 👋\. " R"You are already registered ❤\. " R"In case you forgot, this was my initial message to you:" "\n\n" + welcome_text ) logger.debug(f"Sending /start reply: {message}") - await update.message.reply_markdown_v2(message) + await update.effective_message.reply_markdown_v2(message) return - # Register user if not registered yet + # Register chat if not registered yet session: orm.Session = self.Session() - if not update.effective_chat: - session.rollback() - raise ValueError("Effective chat doesn't exist.") - try: latest_announcement: int = session.execute( # type: ignore sa.select(sa.func.max(Announcement.id)), ).scalar() - chatid: str = str(update.effective_chat.id) - new_user = User( - telegram_id=str(update.effective_user.id), - telegram_chat_id=chatid, - telegram_user_details=update.effective_user.to_dict(), + new_chat = TelegramChat( registration_date=datetime.now(tz=timezone.utc), + chat_type=update.effective_chat.type, # type: ignore + chat_id=update.effective_chat.id, + user_id=update.effective_user.id if update.effective_user else None, + chat_details=update.effective_chat.to_dict(), + user_details=update.effective_user.to_dict() + if update.effective_user + else None, + thread_id=update.effective_message.message_thread_id + if update.effective_message.message_thread_id + else None, last_announcement_id=latest_announcement, ) - session.add(new_user) + session.add(new_chat) session.commit() except Exception: session.rollback() raise - # Subscribe user to some default categories - self.subscribe(new_user, OfferType.GAME, Source.STEAM, OfferDuration.CLAIMABLE) - self.subscribe(new_user, OfferType.GAME, Source.GOG, OfferDuration.CLAIMABLE) - self.subscribe(new_user, OfferType.GAME, Source.EPIC, OfferDuration.CLAIMABLE) + # Subscribe chat to some default categories + self.subscribe(new_chat, OfferType.GAME, Source.STEAM, OfferDuration.CLAIMABLE) + self.subscribe(new_chat, OfferType.GAME, Source.GOG, OfferDuration.CLAIMABLE) + self.subscribe(new_chat, OfferType.GAME, Source.EPIC, OfferDuration.CLAIMABLE) message = ( - Rf"Hi {update.effective_user.mention_markdown_v2()} 👋, " + Rf"Hi {self.get_caller_name(update)} 👋, " R"welcome to the LootScraper Telegram Bot and thank you for registering\!" "\n\n" + welcome_text ) logger.debug(f"Sending /start reply: {message}") - await update.message.reply_markdown_v2(message) - - # Send all current offers once - await self.refresh_command(update, context) - - # Notify about the new registration - if Config.get().telegram_log_level.value >= TelegramLogLevel.DEBUG.value: - message = ( - Rf"New user {update.effective_user.mention_markdown_v2()} registered\." - ) - logger.debug(f"Sending user registered message: {message}") - await self.send_message( - chat_id=Config.get().telegram_developer_chat_id, - text=message, - parse_mode=telegram.constants.ParseMode.MARKDOWN_V2, - ) + await update.effective_message.reply_markdown_v2(message) async def status_command( self, update: Update, context: ContextTypes.DEFAULT_TYPE, ) -> None: - """Handle the /status command: Display some statistics about the user.""" + """Handle the /status command: Display some statistics about the chat.""" del context # Unused await self.log_call(update) - if not update.effective_chat or not update.effective_user or not update.message: + if not update.effective_chat or not update.effective_message: return - db_user = self.get_user_by_telegram_id(update.effective_user.id) - if db_user is None: + db_chat = self.get_chat_by_update(update) + + if db_chat is None: message = ( - Rf"Hi {update.effective_user.mention_markdown_v2()}, " + Rf"Hi {self.get_caller_name(update)}, " Rf"you are currently not registered\. " R"So there is no data stored about you\. " R"But I'd be happy to see you register any time with the " R"/start command\!" ) logger.debug(f"Sending /status reply: {message}") - await update.message.reply_markdown_v2(message) + await update.effective_message.reply_markdown_v2(message) return subscriptions_text: str - if len(db_user.telegram_subscriptions) > 0: + if len(db_chat.subscriptions) > 0: subscriptions_text = ( - Rf"\- You have {len(db_user.telegram_subscriptions)} subscriptions\. " + Rf"\- You have {len(db_chat.subscriptions)} subscriptions\. " ) - subscriptions_text += R"Here are the categories you are subscribed to: \n" - for subscription in db_user.telegram_subscriptions: + subscriptions_text += "Here are the categories you are subscribed to: \n" + for subscription in db_chat.subscriptions: subscriptions_text += markdown_escape( f" * {subscription.source.value} / {subscription.type.value} / " f"{subscription.duration.value}\n", @@ -806,10 +668,10 @@ async def status_command( R"You can change that with the /manage command if you wish\." ) - if db_user.timezone_offset: + if db_chat.timezone_offset: timezone_text = ( Rf"\- Your timezone is set to " - Rf"{markdown_escape(db_user.timezone_offset)} hours\. " + Rf"{markdown_escape(db_chat.timezone_offset)} hours\. " R"You can change that with the /timezone command if you wish\." ) else: @@ -819,27 +681,27 @@ async def status_command( ) message = ( - Rf"Hi {update.effective_user.mention_markdown_v2()}, " + Rf"Hi {self.get_caller_name(update)}, " Rf"you are currently registered\. " R"But I'm not storing much user data, so this is all I know about you: " "\n\n" Rf"\- You registered on " - Rf"{markdown_escape(db_user.registration_date.strftime(TIMESTAMP_READABLE_WITH_HOUR))}" + Rf"{markdown_escape(db_chat.registration_date.strftime(TIMESTAMP_READABLE_WITH_HOUR))}" Rf" with the /start command\." "\n" Rf"\- Your Telegram chat id is " - Rf"{markdown_escape(db_user.telegram_chat_id)}\. " + Rf"{markdown_escape(db_chat.chat_id)}\. " R"Neat, huh? I use it to send you notifications\." "\n" f"{subscriptions_text}" "\n" - Rf"\- You received {markdown_escape(db_user.offers_received_count)} " + Rf"\- You received {markdown_escape(db_chat.offers_received_count)} " R"offers so far\. " "\n" f"{timezone_text}" ) logger.debug(f"Sending /status reply: {message}") - await update.message.reply_markdown_v2(message) + await update.effective_message.reply_markdown_v2(message) async def timezone_command( self, @@ -851,14 +713,12 @@ async def timezone_command( await self.log_call(update) - if not update.effective_chat: + if not update.effective_chat or not update.effective_message: return - await self.send_message( - chat_id=update.effective_chat.id, - text="Choose one of these available timezones:", + await update.effective_message.reply_text( + "Choose one of these available timezones:", reply_markup=self.timezone_keyboard(), - parse_mode=None, ) async def unknown_command( @@ -867,31 +727,58 @@ async def unknown_command( context: ContextTypes.DEFAULT_TYPE, ) -> None: """Handle unknown commands.""" - del context # Unused - await self.log_call(update) - if not update.effective_chat: + if not update.effective_chat or not update.effective_message: return # Special handling for channels if update.effective_chat.type == update.effective_chat.CHANNEL: - await self.send_message( - chat_id=update.effective_chat.id, - text=markdown_escape( - f"Commands are not supported in channels. " - f"This channel has the id {update.effective_chat.id}.", - ), - parse_mode=telegram.constants.ParseMode.MARKDOWN_V2, - ) + message_text = update.effective_message.text + bot_username = update.get_bot().username + + if message_text is None or f"@{bot_username}" not in message_text: + await update.effective_message.reply_text( + "Commands in channels (like this one with the id " + f"{update.effective_chat.id}) need to use the " + f"/command@Bot syntax. " + f"For example: /start@{bot_username}.", + ) + return + + # Extract the actual command from the message_text. + # Perform appropriate action. + if message_text.startswith("/start"): + await self.start_command(update, context) + elif message_text.startswith("/help"): + await self.help_command(update, context) + elif message_text.startswith("/status"): + await self.status_command(update, context) + elif message_text.startswith("/manage"): + await self.manage_command(update, context) + elif message_text.startswith("/refresh"): + await self.refresh_command(update, context) + elif message_text.startswith("/leave"): + await self.leave_command(update, context) + elif message_text.startswith("/announce"): + await self.announce_command(update, context) + elif message_text.startswith("/debug"): + await self.debug_command(update, context) + elif message_text.startswith("/error"): + await self.error_command(update, context) + elif message_text.startswith("/timezone"): + await self.timezone_command(update, context) + else: + await update.effective_message.reply_text( + "Commands in channels (like this one with the id " + f"{update.effective_chat.id}) need to use the " + f"/command@Bot syntax. " + f"For example: /start@{bot_username}.", + ) return # Normal chats - await self.send_message( - chat_id=update.effective_chat.id, - text=MESSAGE_UNKNOWN_COMMAND, - parse_mode=None, - ) + await update.effective_message.reply_text(MESSAGE_UNKNOWN_COMMAND) async def offer_callback( self, @@ -906,7 +793,7 @@ async def offer_callback( await self.log_call(update) - if update.callback_query is None or update.effective_user is None: + if update.callback_query is None or update.effective_chat is None: return query = update.callback_query @@ -914,8 +801,9 @@ async def offer_callback( if query.data is None: return - db_user = self.get_user_by_telegram_id(update.effective_user.id) - if db_user is None: + db_chat = self.get_chat_by_update(update) + + if db_chat is None: return offer_id = int(query.data.split(" ")[2]) @@ -929,7 +817,7 @@ async def offer_callback( await query.edit_message_text( text=self.offer_details_message( offer, - tzoffset=db_user.timezone_offset, + tzoffset=db_chat.timezone_offset, ), parse_mode=telegram.constants.ParseMode.MARKDOWN_V2, reply_markup=self.offer_keyboard( @@ -942,7 +830,7 @@ async def offer_callback( elif query.data.startswith("details hide"): await query.answer() await query.edit_message_text( - text=self.offer_message(offer, tzoffset=db_user.timezone_offset), + text=self.offer_message(offer, tzoffset=db_chat.timezone_offset), parse_mode=telegram.constants.ParseMode.MARKDOWN_V2, reply_markup=self.offer_keyboard( offer, @@ -994,7 +882,7 @@ async def close_callback( """Callback from the menu button "Close" in various menus.""" await self.log_call(update) - if update.callback_query is None or update.effective_user is None: + if update.callback_query is None or update.effective_chat is None: return query = update.callback_query @@ -1010,7 +898,7 @@ async def close_callback( text=MESSAGE_TIMEZONE_MENU_CLOSED, ) - # Send outstanding offers to the user + # Send outstanding offers to the chat await self.refresh_command(update, context) async def toggle_subscription_callback( @@ -1024,12 +912,13 @@ async def toggle_subscription_callback( await self.log_call(update) query = update.callback_query - if query is None or update.effective_user is None or query.data is None: + if query is None or update.effective_chat is None or query.data is None: return - db_user = self.get_user_by_telegram_id(update.effective_user.id) - if db_user is None: - await query.answer(text=MESSAGE_USER_NOT_REGISTERED) + db_chat = self.get_chat_by_update(update) + + if db_chat is None: + await query.answer(text=MESSAGE_CHAT_NOT_REGISTERED) return data = query.data.lower().removeprefix("toggle").strip().upper().split(" ") @@ -1039,17 +928,17 @@ async def toggle_subscription_callback( answer_text = None - if not self.is_subscribed(db_user, type_, source, duration): - self.subscribe(db_user, type_, source, duration) + if not self.is_subscribed(db_chat, type_, source, duration): + self.subscribe(db_chat, type_, source, duration) answer_text = POPUP_SUBSCRIBED else: - self.unsubscribe(db_user, type_, source, duration) + self.unsubscribe(db_chat, type_, source, duration) answer_text = POPUP_UNSUBSCRIBED await query.answer(text=answer_text) await query.edit_message_text( text=MESSAGE_MANAGE_MENU, - reply_markup=self.manage_keyboard(db_user), + reply_markup=self.manage_keyboard(db_chat), ) async def set_timezone_callback( @@ -1063,18 +952,18 @@ async def set_timezone_callback( await self.log_call(update) query = update.callback_query - if query is None or update.effective_user is None or query.data is None: + if query is None or update.effective_chat is None or query.data is None: return data = int(query.data.removeprefix("settimezone").strip()) session: orm.Session = self.Session() try: - db_user = self.get_user_by_telegram_id(update.effective_user.id) - if db_user is None: - await query.answer(text=MESSAGE_USER_NOT_REGISTERED) + db_chat = self.get_chat_by_update(update) + if db_chat is None: + await query.answer(text=MESSAGE_CHAT_NOT_REGISTERED) return - db_user.timezone_offset = data + db_chat.timezone_offset = data session.commit() except Exception: session.rollback() @@ -1085,9 +974,9 @@ async def set_timezone_callback( + MESSAGE_TIMEZONE_MENU_CLOSED, ) - async def send_new_offers(self, user: User) -> bool: - """Send all new offers for the user.""" - subscriptions = user.telegram_subscriptions + async def send_new_offers(self, chat: TelegramChat) -> bool: + """Send all new offers for the chat.""" + subscriptions = chat.subscriptions offers_sent = 0 subscription: TelegramSubscription @@ -1109,11 +998,11 @@ async def send_new_offers(self, user: User) -> bool: # Send the offers for offer in offers: - await self.send_offer(offer, user) + await self.send_offer(offer, chat) # Update the last offer id subscription.last_offer_id = offers[-1].id - user.offers_received_count = user.offers_received_count + len( + chat.offers_received_count = chat.offers_received_count + len( offers, ) session.commit() @@ -1123,7 +1012,7 @@ async def send_new_offers(self, user: User) -> bool: return bool(offers_sent) - async def send_new_announcements(self, user: User) -> None: + async def send_new_announcements(self, chat: TelegramChat) -> None: session: orm.Session = self.Session() try: announcements: Sequence[Announcement] = ( @@ -1134,7 +1023,7 @@ async def send_new_announcements(self, user: User) -> None: Announcement.channel == Channel.ALL, Announcement.channel == Channel.TELEGRAM, ), - Announcement.id > user.last_announcement_id, + Announcement.id > chat.last_announcement_id, ), ), ) @@ -1145,72 +1034,56 @@ async def send_new_announcements(self, user: User) -> None: if len(announcements) == 0: return - user.last_announcement_id = announcements[-1].id + chat.last_announcement_id = announcements[-1].id session.commit() # Send the offers for announcement in announcements: - await self.send_announcement(announcement, user) + await self.send_announcement(announcement, chat) except Exception: session.rollback() raise - def get_user_by_telegram_id( - self, - telegram_id: int | str, - ) -> User | None: - session: orm.Session = self.Session() - try: - db_user = ( - session.execute(sa.select(User).where(User.telegram_id == telegram_id)) - .scalars() - .one_or_none() - ) - except Exception: - session.rollback() - raise - - return db_user - - def get_user_by_chat_id(self, chat_id: int | str) -> User | None: - """ - Returns the user that belongs to a given chat id. If the chat id is - negative, it is a group chat of some sort. In that case, return None. - """ - if self.is_group_chat(chat_id): + def get_chat_by_update(self, update: Update) -> TelegramChat | None: + if update.effective_chat is None: return None - try: - session: orm.Session = self.Session() - db_user = ( - session.execute(sa.select(User).where(User.telegram_chat_id == chat_id)) - .scalars() - .one_or_none() + if update.effective_message and update.effective_message.message_thread_id: + return self.get_chat_by_id( + update.effective_chat.id, + update.effective_message.message_thread_id, ) - except Exception: - session.rollback() - raise - return db_user + return self.get_chat_by_id(update.effective_chat.id) - def get_users_by_chat_id(self, chat_id: int | str) -> list[User] | None: - """Returns the all users that belongs to a given chat id.""" + def get_chat_by_id( + self, + chat_id: int | str, + thread_id: int | None = None, + ) -> TelegramChat | None: try: session: orm.Session = self.Session() - db_users = list( - session.execute(sa.select(User).where(User.telegram_chat_id == chat_id)) + db_chat = ( + session.execute( + sa.select(TelegramChat).where( + sa.and_( + TelegramChat.chat_id == chat_id, + TelegramChat.thread_id == thread_id, + ), + ), + ) .scalars() - .all(), + .one_or_none() ) except Exception: session.rollback() raise - return db_users + return db_chat def is_subscribed( self, - user: User, + chat: TelegramChat, type_: OfferType, source: Source, duration: OfferDuration, @@ -1222,7 +1095,7 @@ def is_subscribed( session.query(TelegramSubscription) .filter( sa.and_( - TelegramSubscription.user_id == user.id, + TelegramSubscription.chat_id == chat.id, TelegramSubscription.type == type_, TelegramSubscription.source == source, TelegramSubscription.duration == duration, @@ -1238,19 +1111,19 @@ def is_subscribed( def subscribe( self, - user: User, + chat: TelegramChat, type_: OfferType, source: Source, duration: OfferDuration, ) -> None: session: orm.Session = self.Session() try: - if self.is_subscribed(user, type_, source, duration): + if self.is_subscribed(chat, type_, source, duration): return session.add( TelegramSubscription( - user=user, + chat=chat, source=source, type=type_, duration=duration, @@ -1263,7 +1136,7 @@ def subscribe( def unsubscribe( self, - user: User, + chat: TelegramChat, type_: OfferType, source: Source, duration: OfferDuration, @@ -1272,7 +1145,7 @@ def unsubscribe( try: session.query(TelegramSubscription).filter( sa.and_( - TelegramSubscription.user_id == user.id, + TelegramSubscription.chat_id == chat.id, TelegramSubscription.type == type_, TelegramSubscription.source == source, TelegramSubscription.duration == duration, @@ -1290,21 +1163,22 @@ async def manage_menu( ) -> None: del context # Unused - if update.callback_query is None or update.effective_user is None: + if update.callback_query is None or update.effective_chat is None: return - db_user = self.get_user_by_telegram_id(update.effective_user.id) - if db_user is None: - await update.callback_query.answer(text=MESSAGE_USER_NOT_REGISTERED) + db_chat = self.get_chat_by_update(update) + + if db_chat is None: + await update.callback_query.answer(text=MESSAGE_CHAT_NOT_REGISTERED) return await update.callback_query.answer() await update.callback_query.edit_message_text( text=MESSAGE_MANAGE_MENU, - reply_markup=self.manage_keyboard(db_user), + reply_markup=self.manage_keyboard(db_chat), ) - def manage_keyboard(self, user: User) -> InlineKeyboardMarkup: + def manage_keyboard(self, chat: TelegramChat) -> InlineKeyboardMarkup: keyboard: list[list[InlineKeyboardButton]] = [] # Add buttons for all available categories @@ -1317,7 +1191,7 @@ def manage_keyboard(self, user: User) -> InlineKeyboardMarkup: x.source == scraper_source and x.type == scraper_type and x.duration == scraper_duration - for x in user.telegram_subscriptions + for x in chat.subscriptions ): keyboard.append( subscription_button( @@ -1413,28 +1287,29 @@ def offer_keyboard( return InlineKeyboardMarkup(keyboard) - async def send_offer(self, offer: Offer, user: User) -> bool: + async def send_offer(self, offer: Offer, chat: TelegramChat) -> bool: logger.debug( - f"Sending offer {offer.title} to Telegram user {user.telegram_id}.", + f"Sending offer {offer.title} to Telegram chat {chat.chat_id}.", ) - # Special treatment for channels - if user.telegram_chat_id.startswith("@") or user.telegram_chat_id.startswith( - "-100", - ): - markup = self.offer_keyboard(offer) - + # Directly display details for channels, groups and supergroups. + # The details button would toggle for all users in the chat, so that + # would be confusing. The dismiss button would also delete for + # everyone, which probably is a bad idea. + # Also rate limiting is harder for those chats and button presses + # count as messages, so it's better to avoid them. + if chat.chat_type in [ChatType.CHANNEL, ChatType.GROUP, ChatType.SUPERGROUP]: success = await self.send_message( - chat_id=user.telegram_chat_id, + chat=chat, text=self.offer_details_message( offer, - tzoffset=user.timezone_offset, + tzoffset=chat.timezone_offset, ), - reply_markup=markup, + reply_markup=self.offer_keyboard(offer), ) return success is not None - # Normal users + # Normal chats details_button = bool( offer.game and (offer.game.igdb_info or offer.game.steam_info), ) @@ -1447,10 +1322,10 @@ async def send_offer(self, offer: Offer, user: User) -> bool: ) success = await self.send_message( - chat_id=user.telegram_chat_id, + chat=chat, text=self.offer_message( offer, - tzoffset=user.timezone_offset, + tzoffset=chat.timezone_offset, ), reply_markup=markup, ) @@ -1459,14 +1334,14 @@ async def send_offer(self, offer: Offer, user: User) -> bool: async def send_announcement( self, announcement: Announcement, - user: User, + chat: TelegramChat, ) -> None: logger.debug( f"Sending announcement {announcement.id} " - f"to Telegram user {user.telegram_id}.", + f"to Telegram chat {chat.chat_id}.", ) await self.send_message( - chat_id=user.telegram_chat_id, + chat=chat, text=announcement.text_markdown, reply_markup=None, ) @@ -1647,9 +1522,26 @@ def offer_details_message( return content + async def send_dev_message( + self, + text: str, + parse_mode: str | None = None, + ) -> None: + if self.application is None: + logger.error( + "Tried to send message while the application is not initialized: " + f"{text}", + ) + return + await self.application.bot.send_message( + chat_id=Config.get().telegram_developer_chat_id, + text=text, + parse_mode=parse_mode, + ) + async def send_message( self, - chat_id: int | str, + chat: TelegramChat, text: str, parse_mode: str | None = telegram.constants.ParseMode.MARKDOWN_V2, reply_markup: ReplyMarkup | None = None, @@ -1665,22 +1557,26 @@ async def send_message( send_attempt = 0 while not message_handled: - try: - send_attempt = send_attempt + 1 + send_attempt = send_attempt + 1 - if not self.chat_is_active(chat_id): - logger.info( - f"Not sending to chat id {chat_id} because the chat is " - "inactive.", - ) - return None + if not chat.active: + logger.info( + f"Not sending to chat id {chat.chat_id} because the chat is " + "inactive.", + ) + return None + + logger.debug( + f"Sending message to chat {chat.chat_id} with content {text}.", + ) - logger.debug(f"Sending message to chat {chat_id} with content {text}.") + try: message = await self.application.bot.send_message( - chat_id=chat_id, + chat_id=chat.chat_id, text=text, parse_mode=parse_mode, reply_markup=reply_markup, + message_thread_id=chat.thread_id if chat.thread_id else None, ) except telegram.error.TimedOut: # Telegram is not responding. This is not handled by the AIORateLimiter. @@ -1691,21 +1587,21 @@ async def send_message( await asyncio.sleep(retry_in_seconds) except telegram.error.Forbidden: message_handled = True - # The user blocked the chat. + # The chat was blocked. logger.info( - f"Deactivating chat id {chat_id} " + f"Deactivating chat id {chat.chat_id} " "because the chat was blocked: {e}", ) - self.deactivate_chat(chat_id, "blocked chat") + self.deactivate_chat(chat, "blocked chat") except TelegramError as e: message_handled = True # The chat could not be found if e.message == "Chat not found": logger.info( - f"Deactivating chat id {chat_id} " + f"Deactivating chat id {chat.chat_id} " "because the chat could not be found.", ) - self.deactivate_chat(chat_id, "not found") + self.deactivate_chat(chat, "not found") else: logger.exception("Telegram error while sending message.") else: @@ -1717,80 +1613,29 @@ async def send_message( def is_group_chat(self, chat_id: int | str) -> bool: return int(chat_id) < 0 - def chat_is_active(self, chat_id: int | str) -> bool: - # Group chat (one entry being marked as inactive is enough) - if self.is_group_chat(chat_id): - db_users = self.get_users_by_chat_id(chat_id) - if db_users is None or any(x.inactive is not None for x in db_users): - return False - # Single user - else: - db_user = self.get_user_by_chat_id(chat_id) - if db_user is None or db_user.inactive is not None: - return False - - return True - - def deactivate_chat(self, chat_id: int | str, reason: str) -> None: - db_users = self.get_users_by_chat_id(chat_id) - - if db_users is None: - return - - # Deactivate all users that are registered for this chat. - for db_user in db_users: - logger.debug(f"Deactivating user {db_user.telegram_id}.") - - session: orm.Session = self.Session() - try: - db_user.inactive = reason - session.commit() - except Exception: - session.rollback() - raise - - def remove_user(self, chat_id: int | str) -> None: - db_user = self.get_user_by_chat_id(chat_id) - - if db_user is None: - return - - # User is registered, remove him from the database. - logger.debug(f"Removing user {db_user.telegram_id} from database.") + def deactivate_chat(self, chat: TelegramChat, reason: str) -> None: + logger.debug(f"Deactivating chat {chat.chat_id}.") session: orm.Session = self.Session() try: - session.delete(db_user) + chat.inactive_reason = reason session.commit() except Exception: session.rollback() raise async def log_call(self, update: Update) -> None: - # Only log calls from users if the log level is set to debug. - if Config.get().telegram_log_level.value >= TelegramLogLevel.DEBUG.value: - type_ = "Callback query" if update.callback_query else "Message" - - if update.effective_user: - user = update.effective_user.name - if user and user.startswith("@"): - user = "(@)" + user.removeprefix("@") - else: - user = "unknown user" + type_ = "Callback query" if update.callback_query else "Message" - if update.callback_query and update.callback_query.data: - content = f"with content {update.callback_query.data}" - elif update.effective_message: - content = f"with content {update.effective_message.text_markdown_v2}" - else: - content = "without content" + if update.callback_query and update.callback_query.data: + content = f"with content {update.callback_query.data}" + elif update.effective_message: + content = f"with content {update.effective_message.text_markdown_v2}" + else: + content = "without content" - message = markdown_escape(f"{type_} from {user} {content}") - logger.debug(message) - await self.send_message( - chat_id=Config.get().telegram_developer_chat_id, - text=message, - ) + message = f"{type_} from {self.get_caller_name(update)} {content}" + logger.debug(message) def add_announcement(self, header: str, text: str) -> None: """Add an announcement to the database.""" @@ -1812,6 +1657,14 @@ def add_announcement(self, header: str, text: str) -> None: session.rollback() raise + def get_caller_name(self, update: Update) -> str: + if update.effective_chat and update.effective_chat.title: + return markdown_escape(update.effective_chat.title) + if update.effective_user: + return update.effective_user.mention_markdown_v2() + + return "unknown user" + class TelegramLoggingHandler(logging.Handler): def __init__(self, bot: TelegramBot) -> None: @@ -1831,9 +1684,8 @@ def emit(self, record: logging.LogRecord) -> None: message = "```\n" + (markdown_escape(chunk)) + "\n```" asyncio.create_task( - self.bot.send_message( - chat_id=Config.get().telegram_developer_chat_id, - text=message, + self.bot.send_dev_message( + message, parse_mode=telegram.constants.ParseMode.MARKDOWN_V2, ), ) diff --git a/tests/test_telegram.py b/tests/test_telegram.py index cdaaa817..b6b246e6 100644 --- a/tests/test_telegram.py +++ b/tests/test_telegram.py @@ -6,7 +6,7 @@ import sqlalchemy as sa from lootscraper.config import Config -from lootscraper.database import LootDatabase, Offer, User +from lootscraper.database import LootDatabase, Offer, TelegramChat from lootscraper.telegrambot import TelegramBot from sqlalchemy import orm @@ -31,9 +31,11 @@ async def test_telegram_messagesend_registered_user(self) -> None: # Act # TODO: Replace with mock offer offer: Offer = session.execute(sa.select(Offer)).scalars().first() - user: User = ( + user: TelegramChat = ( session.execute( - sa.select(User).where(User.telegram_id == 724039662), + sa.select(TelegramChat).where( + TelegramChat.user_id == 724039662, + ), ) # Eiko .scalars() .first() @@ -51,9 +53,11 @@ async def test_telegram_new_offers(self) -> None: session: orm.Session = db.Session() # Act - user: User = ( + user: TelegramChat = ( session.execute( - sa.select(User).where(User.telegram_id == 724039662), + sa.select(TelegramChat).where( + TelegramChat.user_id == 724039662, + ), ) .scalars() .first()