diff --git a/.vscode/launch.json b/.vscode/launch.json new file mode 100644 index 0000000..306f58e --- /dev/null +++ b/.vscode/launch.json @@ -0,0 +1,16 @@ +{ + // Use IntelliSense to learn about possible attributes. + // Hover to view descriptions of existing attributes. + // For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387 + "version": "0.2.0", + "configurations": [ + { + "name": "Python: Current File", + "type": "python", + "request": "launch", + "program": "${file}", + "console": "integratedTerminal", + "justMyCode": true + } + ] +} \ No newline at end of file diff --git a/.vscode/settings.json b/.vscode/settings.json new file mode 100644 index 0000000..d99f2f3 --- /dev/null +++ b/.vscode/settings.json @@ -0,0 +1,6 @@ +{ + "[python]": { + "editor.defaultFormatter": "ms-python.black-formatter" + }, + "python.formatting.provider": "none" +} \ No newline at end of file diff --git a/0_download_dialogs_list.py b/0_download_dialogs_list.py index a810740..33c472d 100644 --- a/0_download_dialogs_list.py +++ b/0_download_dialogs_list.py @@ -13,9 +13,7 @@ def init_args(): :return: argparse.Namespace """ - parser = argparse.ArgumentParser( - description="Download dialogs list for user." - ) + parser = argparse.ArgumentParser(description="Download dialogs list for user.") parser.add_argument( "--dialogs_limit", type=int, help="number of dialogs", required=True @@ -46,7 +44,7 @@ async def save_dialogs(client, dialogs_limit): dialog_name = dialog.name dialog_members = [] - print(f"dialog #{dialog_id}") + skip = False dialog_type = "" if dialog.is_user: @@ -55,6 +53,12 @@ async def save_dialogs(client, dialogs_limit): dialog_type = "Group" elif dialog.is_channel: dialog_type = "Channel" + skip = True + + prefix = "SKIP " if skip else "" + print(f"{prefix}dialog #{dialog_id}, name: {dialog_name}, type: {dialog_type}") + if skip: + continue try: users = await client.get_participants(dialog) @@ -73,7 +77,9 @@ async def save_dialogs(client, dialogs_limit): except BaseException as error: print("ERROR\n", error) - save_dialog(dialog_id, dialog_name, dialog_members, dialog_type, DIALOGS_LIST_FOLDER) + save_dialog( + dialog_id, dialog_name, dialog_members, dialog_type, DIALOGS_LIST_FOLDER + ) if __name__ == "__main__": @@ -85,7 +91,12 @@ async def save_dialogs(client, dialogs_limit): SESSION_NAME = args.session_name config = init_config(CONFIG_PATH) - client = TelegramClient(SESSION_NAME, config["api_id"], config["api_hash"], system_version="4.16.30-vxCUSTOM") + client = TelegramClient( + SESSION_NAME, + config["api_id"], + config["api_hash"], + system_version="4.16.30-vxCUSTOM", + ) DIALOGS_LIST_FOLDER = config["dialogs_list_folder"] diff --git a/1_download_dialogs_data.py b/1_download_dialogs_data.py index 306b826..93f72f6 100644 --- a/1_download_dialogs_data.py +++ b/1_download_dialogs_data.py @@ -1,5 +1,9 @@ +import asyncio +import datetime import os import argparse +import queue +import threading from typing import Dict import pandas as pd @@ -11,6 +15,10 @@ from utils.utils import init_config, read_dialogs REACTIONS_LIMIT_PER_MESSAGE = 100 +DIALOG_DOWNLOAD_DELAY = 3 +DIALOG_PROCESSORS_COUNT = 3 + +DIALOG_QUEUE = queue.Queue() def init_args(): @@ -26,13 +34,14 @@ def init_args(): nargs="+", type=str, help="id(s) of dialog(s) to download, -1 for all", - required=True, + default="-", ) parser.add_argument( "--dialog_msg_limit", type=int, help="amount of messages to download from a dialog, -1 for all", - default=10000, + default="-", + required=True, ) parser.add_argument( "--config_path", @@ -42,14 +51,16 @@ def init_args(): ) parser.add_argument("--debug_mode", type=int, help="Debug mode", default=0) parser.add_argument("--session_name", type=str, help="session name", default="tmp") - parser.add_argument('--skip_private', action='store_true') - parser.add_argument('--skip_groups', action='store_true') - parser.add_argument('--skip_channels', action='store_true') + parser.add_argument("--skip_private", action="store_true") + parser.add_argument("--skip_groups", action="store_true") + parser.add_argument("--skip_channels", action="store_true") return parser.parse_args() -def dialogs_id_input_handler(input_id_lst, is_dialog_type_accepted, dialog_list="data/dialogs"): +def dialogs_id_input_handler( + input_id_lst, is_dialog_type_accepted, dialog_list="data/dialogs" +): """ Functions handles input_id_lst depending on the input @@ -60,15 +71,27 @@ def dialogs_id_input_handler(input_id_lst, is_dialog_type_accepted, dialog_list= """ if input_id_lst[0] == "-1": - return [dialog["id"] for dialog in dialog_list if is_dialog_type_accepted[dialog["type"]]] + return [ + dialog["id"] + for dialog in dialog_list + if is_dialog_type_accepted[dialog["type"]] + ] elif len(input_id_lst) == 1: provided_ids = [int(dialog_id) for dialog_id in input_id_lst[0].split(",")] - return [dialog["id"] for dialog in dialog_list if - is_dialog_type_accepted[dialog["type"]] and dialog["id"] in provided_ids] + print(f"provided_ids 1 {input_id_lst[0]}") + return [ + dialog["id"] + for dialog in dialog_list + if is_dialog_type_accepted[dialog["type"]] and dialog["id"] in provided_ids + ] elif len(input_id_lst) > 1: provided_ids = [int(dialog_id.replace(",", "")) for dialog_id in input_id_lst] - return [dialog["id"] for dialog in dialog_list if - is_dialog_type_accepted[dialog["type"]] and dialog["id"] in provided_ids] + print(f"provided_ids 2 {len(provided_ids)}") + return [ + dialog["id"] + for dialog in dialog_list + if is_dialog_type_accepted[dialog["type"]] and dialog["id"] in provided_ids + ] def msg_limit_input_handler(msg_limit): @@ -108,18 +131,21 @@ def msg_handler(msg): if isinstance(attribute, telethon.tl.types.DocumentAttributeSticker): msg_attributes["message"] = attribute.alt msg_attributes["type"] = "sticker" + break elif msg.video: for attribute in msg.video.attributes: if isinstance(attribute, telethon.tl.types.DocumentAttributeVideo): msg_attributes["duration"] = attribute.duration msg_attributes["type"] = "video" + break elif msg.voice: for attribute in msg.voice.attributes: if isinstance(attribute, telethon.tl.types.DocumentAttributeAudio): msg_attributes["duration"] = attribute.duration msg_attributes["type"] = "voice" + break elif msg.photo: msg_attributes["type"] = "photo" @@ -127,8 +153,9 @@ def msg_handler(msg): return msg_attributes -async def get_message_reactions(message: telethon.types.Message, dialog_peer: telethon.types.InputPeerChat) \ - -> Dict[int, str]: +async def get_message_reactions( + message: telethon.types.Message, dialog_peer: telethon.types.InputPeerChat +) -> Dict[int, str]: """ Loads reactions for a single message. Doesn't work for broadcast channels. @@ -137,15 +164,21 @@ async def get_message_reactions(message: telethon.types.Message, dialog_peer: te :return: dict of "user_id - reaction emoji" pairs """ + try: - result = await client(telethon.functions.messages.GetMessageReactionsListRequest( - peer=dialog_peer, - id=message.id, - limit=REACTIONS_LIMIT_PER_MESSAGE, - )) + result = await client( + telethon.functions.messages.GetMessageReactionsListRequest( + peer=dialog_peer, + id=message.id, + limit=REACTIONS_LIMIT_PER_MESSAGE, + ) + ) reaction_objects = result.reactions - reactions = {reaction_object.peer_id.user_id: reaction_object.reaction for reaction_object in reaction_objects} + reactions = { + reaction_object.peer_id.user_id: reaction_object.reaction + for reaction_object in reaction_objects + } except telethon.errors.rpcerrorlist.MsgIdInvalidError: reactions = {} @@ -156,77 +189,182 @@ async def get_message_reactions(message: telethon.types.Message, dialog_peer: te return reactions -async def download_dialog(client, id, MSG_LIMIT, config): +def timelog(): + formatted_time = datetime.datetime.now().strftime("%H:%M:%S.%f")[:-3] + return formatted_time + + +################################### + + +def get_path(dialog_id, config): + return os.path.join(config["dialogs_data_folder"], f"{str(dialog_id)}.csv") + + +async def download_dialog_by_username( + client: telethon.TelegramClient, dialog_id, MSG_LIMIT, config +): + messages = [] + dialog_data_json = f'{config["dialogs_list_folder"]}/{dialog_id}.json' + with open(dialog_data_json) as json_file: + dialog_data = json.load(json_file) + + if ( + "users" in dialog_data + and len(dialog_data["users"]) == 1 + and "username" in dialog_data["users"][0] + and dialog_data["users"][0]["username"] + ): + username = dialog_data["users"][0]["username"] + _ = await client.get_entity(username) + tg_entity = await client.get_entity(dialog_id) + messages = await client.get_messages(tg_entity, limit=MSG_LIMIT) + + print( + f"[{timelog()}] [{dialog_id}] Downloading dialog throught username {username} finished." + ) + else: + raise Exception( + f"Cannot download through username, dialog_data: {dialog_data}" + ) + + return messages + + +async def download_dialog( + client: telethon.TelegramClient, dialog_id, MSG_LIMIT, config +): """ Download messages and their metadata for a specific dialog id, and save them in *ID*.csv :return: None """ + + print(f"[{timelog()}] [{dialog_id}] Downloading dialog started") try: - # print(f"client.get_entity({id})") + try: + tg_entity = await client.get_entity(dialog_id) + messages = await client.get_messages(tg_entity, limit=MSG_LIMIT) + print(f"[{timelog()}] [{dialog_id}] Downloading dialog finished") + except ValueError: + messages = await download_dialog_by_username( + client, dialog_id, MSG_LIMIT, config + ) - tg_entity = await client.get_entity(id) - messages = await client.get_messages(tg_entity, limit=MSG_LIMIT) - except ValueError: - errmsg = f"No such ID found: #{id}" - print(errmsg) + if not messages or len(messages) == 0: + raise Exception("Messages are empty.") - try: - print("Trying to init through username") + DIALOG_QUEUE.put({"dialog_id": dialog_id, "messages": messages}) + + except Exception as e: + print(f"[{timelog()}] [{dialog_id}] Dialog skipped: {e}") - username = None - dialog_data_json = f'{config["dialogs_list_folder"]}/{id}.json' - with open(dialog_data_json) as json_file: - dialog_data = json.load(json_file) +async def download_dialogs(client: telethon.TelegramClient, config): + print(f"[{timelog()}] download_dialogs started, count: {len(DIALOGS_ID)}") - if "users" in dialog_data \ - and len(dialog_data["users"]) == 1 \ - and "username" in dialog_data["users"][0]: - username = dialog_data["users"][0]["username"] + tasks = [] + for dialog_id in DIALOGS_ID: + if os.path.exists(get_path(dialog_id, config)): + # print(f"[{timelog()}] [{dialog_id}] Dialog already downloaded") + continue - print(f"Username: {username}") + task = asyncio.create_task( + download_dialog(client, dialog_id, MSG_LIMIT, config) + ) + tasks.append(task) + await asyncio.sleep(DIALOG_DOWNLOAD_DELAY) - if username: - _ = await client.get_entity(username) - tg_entity = await client.get_entity(id) - messages = await client.get_messages(tg_entity, limit=MSG_LIMIT) + await asyncio.gather(*tasks) + + print(f"[{timelog()}] download_dialogs finished") - print(f"Done.") - else: - raise ValueError(errmsg, ) - else: - print(f"Error for dialog #{id}") - print(dialog_data) - return - except ValueError: - raise ValueError(errmsg, ) - dialog = [] +def download_dialogs_entrypoint(client: telethon.TelegramClient, config): + client.loop.run_until_complete(download_dialogs(client, config)) - for m in messages: + +#################################### + + +async def process_message(i, m, dialog_id): + # print(f"[{timelog()}] [{dialog_id}] Processing message №{i} with id={m.id}") + try: msg_attrs = msg_handler(m) - msg_reactions = await get_message_reactions(m, telethon.utils.get_peer(id)) - - dialog.append( - { - "id": m.id, - "date": m.date, - "from_id": m.from_id, - "to_id": msg_attrs["to_id"], - "fwd_from": m.fwd_from, - "message": msg_attrs["message"], - "type": msg_attrs["type"], - "duration": msg_attrs["duration"], - "reactions": msg_reactions, - } + # msg_reactions = await get_message_reactions( + # m, telethon.utils.get_peer(dialog_id) + # ) + + return { + "id": m.id, + "date": m.date, + "from_id": m.from_id, + "to_id": msg_attrs["to_id"], + "fwd_from": m.fwd_from, + "message": msg_attrs["message"], + "type": msg_attrs["type"], + "duration": msg_attrs["duration"], + # "reactions": msg_reactions, + } + except Exception as e: + print( + f"[{timelog()}] [{dialog_id}] Processing message №{i} with id={m.id} failed: {e}" + ) + return None + + +async def process_dialogs(config): + while True: + dialog_data = DIALOG_QUEUE.get() + if dialog_data is None: + break + + dialog_id = dialog_data["dialog_id"] + messages = dialog_data["messages"] + count = len(messages) + + print(f"[{timelog()}] [{dialog_id}] Processing dialog started, count: {count}") + + try: + dialog = [None] * count + for i, m in enumerate(messages): + dialog[i] = await process_message(i, m, dialog_id) + + dialog_file_path = get_path(dialog_id, config) + df = pd.DataFrame(dialog) + df.to_csv(dialog_file_path) + + print( + f"[{timelog()}] [{dialog_id}] Processing dialog finished, saved to {str(dialog_id)}.csv" + ) + except Exception as e: + print(f"[{timelog()}] [{dialog_id}] Processing dialog failed: {e}") + + DIALOG_QUEUE.task_done() + + +def process_dialogs_entrypoint(config): + asyncio.run(process_dialogs(config)) + + +def download_all(client, config): + consumer_threads = [] + for _ in range(DIALOG_PROCESSORS_COUNT): + thread = threading.Thread( + target=process_dialogs_entrypoint, args=(config,), daemon=True ) + thread.start() + consumer_threads.append(thread) - dialog_file_path = os.path.join(config["dialogs_data_folder"], f"{str(id)}.csv") + download_dialogs_entrypoint(client, config) - df = pd.DataFrame(dialog) - df.to_csv(dialog_file_path) + # Signal the consumers to stop + for _ in range(DIALOG_PROCESSORS_COUNT): + DIALOG_QUEUE.put(None) + + for thread in consumer_threads: + thread.join() if __name__ == "__main__": @@ -237,20 +375,24 @@ async def download_dialog(client, id, MSG_LIMIT, config): SESSION_NAME = args.session_name DEBUG_MODE = args.debug_mode - is_dialog_type_accepted = {'Private dialog': not args.skip_private, - 'Group': not args.skip_groups, - 'Channel': not args.skip_channels} + is_dialog_type_accepted = { + "Private dialog": not args.skip_private, + "Group": not args.skip_groups, + "Channel": not args.skip_channels, + } config = init_config(CONFIG_PATH) dialogs_list = read_dialogs(config["dialogs_list_folder"]) - client = telethon.TelegramClient(SESSION_NAME, config["api_id"], config["api_hash"], system_version="4.16.30-vxCUSTOM") - - DIALOGS_ID = dialogs_id_input_handler(args.dialogs_ids, is_dialog_type_accepted, dialogs_list) + client = telethon.TelegramClient( + SESSION_NAME, + config["api_id"], + config["api_hash"], + system_version="4.16.30-vxCUSTOM", + ) - # Dialogs are the "conversations you have open". - # This method returns a list of Dialog, which - # has the .entity attribute and other information. - # dialogs = client.get_dialogs() + DIALOGS_ID = dialogs_id_input_handler( + args.dialogs_ids, is_dialog_type_accepted, dialogs_list + ) if DEBUG_MODE: logging.basicConfig(level=logging.DEBUG) @@ -258,8 +400,15 @@ async def download_dialog(client, id, MSG_LIMIT, config): if not os.path.exists(config["dialogs_data_folder"]): os.mkdir(config["dialogs_data_folder"]) - for id in DIALOGS_ID: - print(f"Loading dialog #{id}") - - with client: - client.loop.run_until_complete(download_dialog(client, id, MSG_LIMIT, config)) + with client: + with client.takeout( + finalize=True, + contacts=False, + users=True, + chats=True, + megagroups=True, + channels=True, + files=False, + ) as takeout: + download_all(takeout, config) + # download_all(client, config) diff --git a/csv_test.py b/csv_test.py new file mode 100644 index 0000000..8cca8d1 --- /dev/null +++ b/csv_test.py @@ -0,0 +1,45 @@ +import glob +import os +import argparse +from typing import Dict + +import pandas as pd +import logging +import json + +import telethon + +from utils.utils import init_config, read_dialogs + + +dialog_file_path = os.path.join("../data/test.csv") + +dialog = [] +dialog.append( + { + "id": 1, + "message": """як можу в "С++,",","," просто розпарсити список списків +типу такого: [[4,2,1],[5,7,0],[5,0,3],[2,4],[0,3],[1,2,6],[5,7],[1,6]] + +от у хаскелі просто read і готово все""", + } +) +dialog.append( + { + "id": 2, + "message": "До речі, вони до сих пір там набирають мабуть", + } +) +dialog.append( + { + "id": 3, + "message": "test, test, test \n test, test", + } +) + +df = pd.DataFrame(dialog) +df.to_csv(dialog_file_path, encoding="utf-8") + +# d = glob.glob("../data/test/*.csv") +local_df = pd.read_csv("../data/test.csv", encoding="utf-8") +print(local_df)