From bd7d1cfe0f2d62d24d5d2a4bbf4ca950a5fcda12 Mon Sep 17 00:00:00 2001 From: Ilya Date: Sun, 5 Nov 2023 12:13:03 +0200 Subject: [PATCH 1/9] Small optimizations --- .vscode/launch.json | 16 ++++ .vscode/settings.json | 6 ++ 0_download_dialogs_list.py | 12 ++- 1_download_dialogs_data.py | 163 ++++++++++++++++++++++++++----------- csv_test.py | 45 ++++++++++ 5 files changed, 194 insertions(+), 48 deletions(-) create mode 100644 .vscode/launch.json create mode 100644 .vscode/settings.json create mode 100644 csv_test.py diff --git a/.vscode/launch.json b/.vscode/launch.json new file mode 100644 index 0000000..306f58e --- /dev/null +++ b/.vscode/launch.json @@ -0,0 +1,16 @@ +{ + // Use IntelliSense to learn about possible attributes. + // Hover to view descriptions of existing attributes. + // For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387 + "version": "0.2.0", + "configurations": [ + { + "name": "Python: Current File", + "type": "python", + "request": "launch", + "program": "${file}", + "console": "integratedTerminal", + "justMyCode": true + } + ] +} \ No newline at end of file diff --git a/.vscode/settings.json b/.vscode/settings.json new file mode 100644 index 0000000..d99f2f3 --- /dev/null +++ b/.vscode/settings.json @@ -0,0 +1,6 @@ +{ + "[python]": { + "editor.defaultFormatter": "ms-python.black-formatter" + }, + "python.formatting.provider": "none" +} \ No newline at end of file diff --git a/0_download_dialogs_list.py b/0_download_dialogs_list.py index a810740..dc24950 100644 --- a/0_download_dialogs_list.py +++ b/0_download_dialogs_list.py @@ -46,7 +46,7 @@ async def save_dialogs(client, dialogs_limit): dialog_name = dialog.name dialog_members = [] - print(f"dialog #{dialog_id}") + skip = False dialog_type = "" if dialog.is_user: @@ -55,6 +55,15 @@ async def save_dialogs(client, dialogs_limit): dialog_type = "Group" elif dialog.is_channel: dialog_type = "Channel" + skip = True + + if (dialog.unread_count > 1000): + skip = True + + prefix = "SKIP " if skip else "" + print(f"{prefix}dialog #{dialog_id}, name: {dialog_name}, type: {dialog_type}") + if skip: + continue try: users = await client.get_participants(dialog) @@ -92,3 +101,4 @@ async def save_dialogs(client, dialogs_limit): # save dialogs with client: client.loop.run_until_complete(save_dialogs(client, DIALOGS_LIMIT)) + \ No newline at end of file diff --git a/1_download_dialogs_data.py b/1_download_dialogs_data.py index 306b826..16e4d99 100644 --- a/1_download_dialogs_data.py +++ b/1_download_dialogs_data.py @@ -26,13 +26,13 @@ def init_args(): nargs="+", type=str, help="id(s) of dialog(s) to download, -1 for all", - required=True, + default="-", ) parser.add_argument( "--dialog_msg_limit", type=int, help="amount of messages to download from a dialog, -1 for all", - default=10000, + default=1000, ) parser.add_argument( "--config_path", @@ -42,14 +42,16 @@ def init_args(): ) parser.add_argument("--debug_mode", type=int, help="Debug mode", default=0) parser.add_argument("--session_name", type=str, help="session name", default="tmp") - parser.add_argument('--skip_private', action='store_true') - parser.add_argument('--skip_groups', action='store_true') - parser.add_argument('--skip_channels', action='store_true') + parser.add_argument("--skip_private", action="store_true") + parser.add_argument("--skip_groups", action="store_true") + parser.add_argument("--skip_channels", action="store_true") return parser.parse_args() -def dialogs_id_input_handler(input_id_lst, is_dialog_type_accepted, dialog_list="data/dialogs"): +def dialogs_id_input_handler( + input_id_lst, is_dialog_type_accepted, dialog_list="data/dialogs" +): """ Functions handles input_id_lst depending on the input @@ -59,16 +61,26 @@ def dialogs_id_input_handler(input_id_lst, is_dialog_type_accepted, dialog_list= :return: """ - if input_id_lst[0] == "-1": - return [dialog["id"] for dialog in dialog_list if is_dialog_type_accepted[dialog["type"]]] + if input_id_lst[0] == "-": + return [ + dialog["id"] + for dialog in dialog_list + if is_dialog_type_accepted[dialog["type"]] + ] elif len(input_id_lst) == 1: provided_ids = [int(dialog_id) for dialog_id in input_id_lst[0].split(",")] - return [dialog["id"] for dialog in dialog_list if - is_dialog_type_accepted[dialog["type"]] and dialog["id"] in provided_ids] + return [ + dialog["id"] + for dialog in dialog_list + if is_dialog_type_accepted[dialog["type"]] and dialog["id"] in provided_ids + ] elif len(input_id_lst) > 1: provided_ids = [int(dialog_id.replace(",", "")) for dialog_id in input_id_lst] - return [dialog["id"] for dialog in dialog_list if - is_dialog_type_accepted[dialog["type"]] and dialog["id"] in provided_ids] + return [ + dialog["id"] + for dialog in dialog_list + if is_dialog_type_accepted[dialog["type"]] and dialog["id"] in provided_ids + ] def msg_limit_input_handler(msg_limit): @@ -98,6 +110,8 @@ def msg_handler(msg): "to_id": "", } + print("msg_handler") + if hasattr(msg.to_id, "user_id"): msg_attributes["to_id"] = msg.to_id.user_id else: @@ -108,27 +122,33 @@ def msg_handler(msg): if isinstance(attribute, telethon.tl.types.DocumentAttributeSticker): msg_attributes["message"] = attribute.alt msg_attributes["type"] = "sticker" + break elif msg.video: for attribute in msg.video.attributes: if isinstance(attribute, telethon.tl.types.DocumentAttributeVideo): msg_attributes["duration"] = attribute.duration msg_attributes["type"] = "video" + break elif msg.voice: for attribute in msg.voice.attributes: if isinstance(attribute, telethon.tl.types.DocumentAttributeAudio): msg_attributes["duration"] = attribute.duration msg_attributes["type"] = "voice" + break elif msg.photo: msg_attributes["type"] = "photo" + print("msg_handler finish") + return msg_attributes -async def get_message_reactions(message: telethon.types.Message, dialog_peer: telethon.types.InputPeerChat) \ - -> Dict[int, str]: +async def get_message_reactions( + message: telethon.types.Message, dialog_peer: telethon.types.InputPeerChat +) -> Dict[int, str]: """ Loads reactions for a single message. Doesn't work for broadcast channels. @@ -137,15 +157,23 @@ async def get_message_reactions(message: telethon.types.Message, dialog_peer: te :return: dict of "user_id - reaction emoji" pairs """ + + print("msg_handler") + try: - result = await client(telethon.functions.messages.GetMessageReactionsListRequest( - peer=dialog_peer, - id=message.id, - limit=REACTIONS_LIMIT_PER_MESSAGE, - )) + result = await client( + telethon.functions.messages.GetMessageReactionsListRequest( + peer=dialog_peer, + id=message.id, + limit=REACTIONS_LIMIT_PER_MESSAGE, + ) + ) reaction_objects = result.reactions - reactions = {reaction_object.peer_id.user_id: reaction_object.reaction for reaction_object in reaction_objects} + reactions = { + reaction_object.peer_id.user_id: reaction_object.reaction + for reaction_object in reaction_objects + } except telethon.errors.rpcerrorlist.MsgIdInvalidError: reactions = {} @@ -153,9 +181,23 @@ async def get_message_reactions(message: telethon.types.Message, dialog_peer: te except: reactions = None + print("msg_handler finish") + return reactions +def print_dialog(id, config): + try: + dialog_data_json = f'{config["dialogs_list_folder"]}/{id}.json' + with open(dialog_data_json) as json_file: + dialog_data = json.load(json_file) + print( + f"Loading dialog #{id}, name: {dialog_data.name}, type: {dialog_data.type}" + ) + except: + print(f"Loading dialog #{id}") + + async def download_dialog(client, id, MSG_LIMIT, config): """ Download messages and their metadata for a specific dialog id, @@ -166,8 +208,11 @@ async def download_dialog(client, id, MSG_LIMIT, config): try: # print(f"client.get_entity({id})") + print(f"downloading {id}") + tg_entity = await client.get_entity(id) messages = await client.get_messages(tg_entity, limit=MSG_LIMIT) + except ValueError: errmsg = f"No such ID found: #{id}" print(errmsg) @@ -181,9 +226,11 @@ async def download_dialog(client, id, MSG_LIMIT, config): with open(dialog_data_json) as json_file: dialog_data = json.load(json_file) - if "users" in dialog_data \ - and len(dialog_data["users"]) == 1 \ - and "username" in dialog_data["users"][0]: + if ( + "users" in dialog_data + and len(dialog_data["users"]) == 1 + and "username" in dialog_data["users"][0] + ): username = dialog_data["users"][0]["username"] print(f"Username: {username}") @@ -195,33 +242,44 @@ async def download_dialog(client, id, MSG_LIMIT, config): print(f"Done.") else: - raise ValueError(errmsg, ) + raise ValueError( + errmsg, + ) else: print(f"Error for dialog #{id}") print(dialog_data) return except ValueError: - raise ValueError(errmsg, ) + raise ValueError( + errmsg, + ) + + count = len(messages) + dialog = [count] - dialog = [] + print(f"processing messages started, count: {count}") + + for [i, m] in enumerate(messages): + print(f"#{i} processing message {m.id}") - for m in messages: msg_attrs = msg_handler(m) msg_reactions = await get_message_reactions(m, telethon.utils.get_peer(id)) - dialog.append( - { - "id": m.id, - "date": m.date, - "from_id": m.from_id, - "to_id": msg_attrs["to_id"], - "fwd_from": m.fwd_from, - "message": msg_attrs["message"], - "type": msg_attrs["type"], - "duration": msg_attrs["duration"], - "reactions": msg_reactions, - } - ) + dialog[i] = { + "id": m.id, + "date": m.date, + "from_id": m.from_id, + "to_id": msg_attrs["to_id"], + "fwd_from": m.fwd_from, + "message": msg_attrs["message"], + "type": msg_attrs["type"], + "duration": msg_attrs["duration"], + "reactions": msg_reactions, + } + + print(f"#{i} processing message {m.id} finished") + + print(f"processing messages finished") dialog_file_path = os.path.join(config["dialogs_data_folder"], f"{str(id)}.csv") @@ -237,15 +295,24 @@ async def download_dialog(client, id, MSG_LIMIT, config): SESSION_NAME = args.session_name DEBUG_MODE = args.debug_mode - is_dialog_type_accepted = {'Private dialog': not args.skip_private, - 'Group': not args.skip_groups, - 'Channel': not args.skip_channels} + is_dialog_type_accepted = { + "Private dialog": not args.skip_private, + "Group": not args.skip_groups, + "Channel": not args.skip_channels, + } config = init_config(CONFIG_PATH) dialogs_list = read_dialogs(config["dialogs_list_folder"]) - client = telethon.TelegramClient(SESSION_NAME, config["api_id"], config["api_hash"], system_version="4.16.30-vxCUSTOM") + client = telethon.TelegramClient( + SESSION_NAME, + config["api_id"], + config["api_hash"], + system_version="4.16.30-vxCUSTOM", + ) - DIALOGS_ID = dialogs_id_input_handler(args.dialogs_ids, is_dialog_type_accepted, dialogs_list) + DIALOGS_ID = dialogs_id_input_handler( + args.dialogs_ids, is_dialog_type_accepted, dialogs_list + ) # Dialogs are the "conversations you have open". # This method returns a list of Dialog, which @@ -259,7 +326,9 @@ async def download_dialog(client, id, MSG_LIMIT, config): os.mkdir(config["dialogs_data_folder"]) for id in DIALOGS_ID: - print(f"Loading dialog #{id}") + print_dialog(id, config) with client: - client.loop.run_until_complete(download_dialog(client, id, MSG_LIMIT, config)) + client.loop.run_until_complete( + download_dialog(client, id, MSG_LIMIT, config) + ) diff --git a/csv_test.py b/csv_test.py new file mode 100644 index 0000000..8cca8d1 --- /dev/null +++ b/csv_test.py @@ -0,0 +1,45 @@ +import glob +import os +import argparse +from typing import Dict + +import pandas as pd +import logging +import json + +import telethon + +from utils.utils import init_config, read_dialogs + + +dialog_file_path = os.path.join("../data/test.csv") + +dialog = [] +dialog.append( + { + "id": 1, + "message": """як можу в "С++,",","," просто розпарсити список списків +типу такого: [[4,2,1],[5,7,0],[5,0,3],[2,4],[0,3],[1,2,6],[5,7],[1,6]] + +от у хаскелі просто read і готово все""", + } +) +dialog.append( + { + "id": 2, + "message": "До речі, вони до сих пір там набирають мабуть", + } +) +dialog.append( + { + "id": 3, + "message": "test, test, test \n test, test", + } +) + +df = pd.DataFrame(dialog) +df.to_csv(dialog_file_path, encoding="utf-8") + +# d = glob.glob("../data/test/*.csv") +local_df = pd.read_csv("../data/test.csv", encoding="utf-8") +print(local_df) From 6667cb2fdc084a2a07493212e90b6de1bb2c22e2 Mon Sep 17 00:00:00 2001 From: Ilya Date: Sun, 5 Nov 2023 12:30:40 +0200 Subject: [PATCH 2/9] Parallel processing --- 0_download_dialogs_list.py | 19 ++--- 1_download_dialogs_data.py | 159 +++++++++++++++++++------------------ 2 files changed, 93 insertions(+), 85 deletions(-) diff --git a/0_download_dialogs_list.py b/0_download_dialogs_list.py index dc24950..33c472d 100644 --- a/0_download_dialogs_list.py +++ b/0_download_dialogs_list.py @@ -13,9 +13,7 @@ def init_args(): :return: argparse.Namespace """ - parser = argparse.ArgumentParser( - description="Download dialogs list for user." - ) + parser = argparse.ArgumentParser(description="Download dialogs list for user.") parser.add_argument( "--dialogs_limit", type=int, help="number of dialogs", required=True @@ -57,9 +55,6 @@ async def save_dialogs(client, dialogs_limit): dialog_type = "Channel" skip = True - if (dialog.unread_count > 1000): - skip = True - prefix = "SKIP " if skip else "" print(f"{prefix}dialog #{dialog_id}, name: {dialog_name}, type: {dialog_type}") if skip: @@ -82,7 +77,9 @@ async def save_dialogs(client, dialogs_limit): except BaseException as error: print("ERROR\n", error) - save_dialog(dialog_id, dialog_name, dialog_members, dialog_type, DIALOGS_LIST_FOLDER) + save_dialog( + dialog_id, dialog_name, dialog_members, dialog_type, DIALOGS_LIST_FOLDER + ) if __name__ == "__main__": @@ -94,11 +91,15 @@ async def save_dialogs(client, dialogs_limit): SESSION_NAME = args.session_name config = init_config(CONFIG_PATH) - client = TelegramClient(SESSION_NAME, config["api_id"], config["api_hash"], system_version="4.16.30-vxCUSTOM") + client = TelegramClient( + SESSION_NAME, + config["api_id"], + config["api_hash"], + system_version="4.16.30-vxCUSTOM", + ) DIALOGS_LIST_FOLDER = config["dialogs_list_folder"] # save dialogs with client: client.loop.run_until_complete(save_dialogs(client, DIALOGS_LIMIT)) - \ No newline at end of file diff --git a/1_download_dialogs_data.py b/1_download_dialogs_data.py index 16e4d99..9050a8b 100644 --- a/1_download_dialogs_data.py +++ b/1_download_dialogs_data.py @@ -1,3 +1,4 @@ +import asyncio import os import argparse from typing import Dict @@ -186,16 +187,23 @@ async def get_message_reactions( return reactions -def print_dialog(id, config): - try: - dialog_data_json = f'{config["dialogs_list_folder"]}/{id}.json' - with open(dialog_data_json) as json_file: - dialog_data = json.load(json_file) - print( - f"Loading dialog #{id}, name: {dialog_data.name}, type: {dialog_data.type}" - ) - except: - print(f"Loading dialog #{id}") +async def process_message(i, m, id): + print(f"#{i} processing message {m.id}") + + msg_attrs = msg_handler(m) + msg_reactions = await get_message_reactions(m, telethon.utils.get_peer(id)) + + return { + "id": m.id, + "date": m.date, + "from_id": m.from_id, + "to_id": msg_attrs["to_id"], + "fwd_from": m.fwd_from, + "message": msg_attrs["message"], + "type": msg_attrs["type"], + "duration": msg_attrs["duration"], + "reactions": msg_reactions, + } async def download_dialog(client, id, MSG_LIMIT, config): @@ -205,86 +213,75 @@ async def download_dialog(client, id, MSG_LIMIT, config): :return: None """ - try: - # print(f"client.get_entity({id})") - - print(f"downloading {id}") + print(f"#{id} Downloading dialog") + try: tg_entity = await client.get_entity(id) messages = await client.get_messages(tg_entity, limit=MSG_LIMIT) - except ValueError: - errmsg = f"No such ID found: #{id}" - print(errmsg) + messages = await download_dialog_by_username(client, id, MSG_LIMIT, config) - try: - print("Trying to init through username") + count = len(messages) - username = None - dialog_data_json = f'{config["dialogs_list_folder"]}/{id}.json' + print(f"#{id} Processing messages started, count: {count}") + tasks = [count] + for i, m in enumerate(messages): + task = process_message(i, m, id) + tasks[i] = task - with open(dialog_data_json) as json_file: - dialog_data = json.load(json_file) + dialog = await asyncio.gather(*tasks) + print(f"#{id} Processing messages finished") - if ( - "users" in dialog_data - and len(dialog_data["users"]) == 1 - and "username" in dialog_data["users"][0] - ): - username = dialog_data["users"][0]["username"] + print(f"#{id} Saving dialog to {str(id)}.csv") + dialog_file_path = os.path.join(config["dialogs_data_folder"], f"{str(id)}.csv") + df = pd.DataFrame(dialog) + df.to_csv(dialog_file_path) + print(f"#{id} Dialog saved to {str(id)}.csv") - print(f"Username: {username}") - if username: - _ = await client.get_entity(username) - tg_entity = await client.get_entity(id) - messages = await client.get_messages(tg_entity, limit=MSG_LIMIT) +async def download_dialog_by_username(client, id, MSG_LIMIT, config): + errmsg = f"No such ID found: #{id}" + print(errmsg) - print(f"Done.") - else: - raise ValueError( - errmsg, - ) - else: - print(f"Error for dialog #{id}") - print(dialog_data) - return - except ValueError: - raise ValueError( - errmsg, - ) + messages = [] - count = len(messages) - dialog = [count] - - print(f"processing messages started, count: {count}") - - for [i, m] in enumerate(messages): - print(f"#{i} processing message {m.id}") - - msg_attrs = msg_handler(m) - msg_reactions = await get_message_reactions(m, telethon.utils.get_peer(id)) - - dialog[i] = { - "id": m.id, - "date": m.date, - "from_id": m.from_id, - "to_id": msg_attrs["to_id"], - "fwd_from": m.fwd_from, - "message": msg_attrs["message"], - "type": msg_attrs["type"], - "duration": msg_attrs["duration"], - "reactions": msg_reactions, - } + try: + print("Trying to init through username") - print(f"#{i} processing message {m.id} finished") + username = None + dialog_data_json = f'{config["dialogs_list_folder"]}/{id}.json' - print(f"processing messages finished") + with open(dialog_data_json) as json_file: + dialog_data = json.load(json_file) - dialog_file_path = os.path.join(config["dialogs_data_folder"], f"{str(id)}.csv") + if ( + "users" in dialog_data + and len(dialog_data["users"]) == 1 + and "username" in dialog_data["users"][0] + ): + username = dialog_data["users"][0]["username"] - df = pd.DataFrame(dialog) - df.to_csv(dialog_file_path) + print(f"Username: {username}") + + if username: + _ = await client.get_entity(username) + tg_entity = await client.get_entity(id) + messages = await client.get_messages(tg_entity, limit=MSG_LIMIT) + print(f"Done.") + else: + raise ValueError( + errmsg, + ) + else: + print(f"Error for dialog #{id}") + print(dialog_data) + return + except ValueError: + raise ValueError( + errmsg, + ) + + return messages if __name__ == "__main__": @@ -326,9 +323,19 @@ async def download_dialog(client, id, MSG_LIMIT, config): os.mkdir(config["dialogs_data_folder"]) for id in DIALOGS_ID: - print_dialog(id, config) - with client: client.loop.run_until_complete( download_dialog(client, id, MSG_LIMIT, config) ) + + +# def print_dialog(id, config): +# try: +# dialog_data_json = f'{config["dialogs_list_folder"]}/{id}.json' +# with open(dialog_data_json) as json_file: +# dialog_data = json.load(json_file) +# print( +# f"Loading dialog #{id}, name: {dialog_data.name}, type: {dialog_data.type}" +# ) +# except: +# print(f"Loading dialog #{id}") From 44decac4666c8bc65ba21653c34342a7d771b7a0 Mon Sep 17 00:00:00 2001 From: Ilya Date: Sun, 5 Nov 2023 13:16:33 +0200 Subject: [PATCH 3/9] More modular code --- 1_download_dialogs_data.py | 155 +++++++++++++++++++++---------------- 1 file changed, 87 insertions(+), 68 deletions(-) diff --git a/1_download_dialogs_data.py b/1_download_dialogs_data.py index 9050a8b..f034a48 100644 --- a/1_download_dialogs_data.py +++ b/1_download_dialogs_data.py @@ -1,4 +1,5 @@ import asyncio +import datetime import os import argparse from typing import Dict @@ -33,7 +34,8 @@ def init_args(): "--dialog_msg_limit", type=int, help="amount of messages to download from a dialog, -1 for all", - default=1000, + default="-", + required=True, ) parser.add_argument( "--config_path", @@ -62,7 +64,7 @@ def dialogs_id_input_handler( :return: """ - if input_id_lst[0] == "-": + if input_id_lst[0] == "-1": return [ dialog["id"] for dialog in dialog_list @@ -70,6 +72,7 @@ def dialogs_id_input_handler( ] elif len(input_id_lst) == 1: provided_ids = [int(dialog_id) for dialog_id in input_id_lst[0].split(",")] + print(f"provided_ids 1 {input_id_lst[0]}") return [ dialog["id"] for dialog in dialog_list @@ -77,6 +80,7 @@ def dialogs_id_input_handler( ] elif len(input_id_lst) > 1: provided_ids = [int(dialog_id.replace(",", "")) for dialog_id in input_id_lst] + print(f"provided_ids 2 {len(provided_ids)}") return [ dialog["id"] for dialog in dialog_list @@ -187,56 +191,8 @@ async def get_message_reactions( return reactions -async def process_message(i, m, id): - print(f"#{i} processing message {m.id}") - - msg_attrs = msg_handler(m) - msg_reactions = await get_message_reactions(m, telethon.utils.get_peer(id)) - - return { - "id": m.id, - "date": m.date, - "from_id": m.from_id, - "to_id": msg_attrs["to_id"], - "fwd_from": m.fwd_from, - "message": msg_attrs["message"], - "type": msg_attrs["type"], - "duration": msg_attrs["duration"], - "reactions": msg_reactions, - } - - -async def download_dialog(client, id, MSG_LIMIT, config): - """ - Download messages and their metadata for a specific dialog id, - and save them in *ID*.csv - - :return: None - """ - - print(f"#{id} Downloading dialog") - try: - tg_entity = await client.get_entity(id) - messages = await client.get_messages(tg_entity, limit=MSG_LIMIT) - except ValueError: - messages = await download_dialog_by_username(client, id, MSG_LIMIT, config) - - count = len(messages) - - print(f"#{id} Processing messages started, count: {count}") - tasks = [count] - for i, m in enumerate(messages): - task = process_message(i, m, id) - tasks[i] = task - - dialog = await asyncio.gather(*tasks) - print(f"#{id} Processing messages finished") - - print(f"#{id} Saving dialog to {str(id)}.csv") - dialog_file_path = os.path.join(config["dialogs_data_folder"], f"{str(id)}.csv") - df = pd.DataFrame(dialog) - df.to_csv(dialog_file_path) - print(f"#{id} Dialog saved to {str(id)}.csv") +def time(): + return datetime.datetime.now() async def download_dialog_by_username(client, id, MSG_LIMIT, config): @@ -284,6 +240,79 @@ async def download_dialog_by_username(client, id, MSG_LIMIT, config): return messages +async def download_dialog(client, dialog_id, MSG_LIMIT, config): + """ + Download messages and their metadata for a specific dialog id, + and save them in *ID*.csv + + :return: None + """ + + print(f"[{time()}] [{m.id}] Downloading dialog started") + try: + tg_entity = await client.get_entity(dialog_id) + messages = await client.get_messages(tg_entity, limit=MSG_LIMIT) + except ValueError: + messages = await download_dialog_by_username( + client, dialog_id, MSG_LIMIT, config + ) + print(f"[{time()}] [{m.id}] Downloading dialog finished") + + return {"dialog_id": dialog_id, "messages": messages} + + +async def download_dialogs(client, DIALOGS_ID, MSG_LIMIT, config): + with client: + tasks = [len(DIALOGS_ID)] + for [i, id] in enumerate(DIALOGS_ID): + task = download_dialog(client, id, MSG_LIMIT, config) + tasks[i] = task + + await asyncio.gather(*tasks) + + +async def process_message(i, m, dialog_id): + print(f"[{time()}] [{m.id}] processing message #{i}") + msg_attrs = msg_handler(m) + msg_reactions = await get_message_reactions(m, telethon.utils.get_peer(dialog_id)) + + return { + "id": m.id, + "date": m.date, + "from_id": m.from_id, + "to_id": msg_attrs["to_id"], + "fwd_from": m.fwd_from, + "message": msg_attrs["message"], + "type": msg_attrs["type"], + "duration": msg_attrs["duration"], + "reactions": msg_reactions, + } + + +async def process_dialog(dialog_data): + dialog_id = dialog_data.dialog_data + messages = dialog_data.messages + count = len(messages) + + print(f"[{time()}] [{m.id}] Processing dialog started, count: {count}") + # tasks = [count] + dialog = [count] + for i, m in enumerate(messages): + dialog[i] = process_message(i, m, dialog_id) + # tasks[i] = task + + # dialog = await asyncio.gather(*tasks) + print(f"[{time()}] [{m.id}] Processing dialog finished") + + print(f"[{time()}] [{m.id}] Saving dialog to {str(dialog_id)}.csv") + dialog_file_path = os.path.join( + config["dialogs_data_folder"], f"{str(dialog_id)}.csv" + ) + df = pd.DataFrame(dialog) + df.to_csv(dialog_file_path) + print(f"[{time()}] [{m.id}] Dialog saved to {str(dialog_id)}.csv") + + if __name__ == "__main__": args = init_args() @@ -322,20 +351,10 @@ async def download_dialog_by_username(client, id, MSG_LIMIT, config): if not os.path.exists(config["dialogs_data_folder"]): os.mkdir(config["dialogs_data_folder"]) - for id in DIALOGS_ID: - with client: - client.loop.run_until_complete( - download_dialog(client, id, MSG_LIMIT, config) - ) - + asyncio.run(download_dialogs(client, DIALOGS_ID, MSG_LIMIT, config)) -# def print_dialog(id, config): -# try: -# dialog_data_json = f'{config["dialogs_list_folder"]}/{id}.json' -# with open(dialog_data_json) as json_file: -# dialog_data = json.load(json_file) -# print( -# f"Loading dialog #{id}, name: {dialog_data.name}, type: {dialog_data.type}" -# ) -# except: -# print(f"Loading dialog #{id}") + # with client: + # for id in DIALOGS_ID: + # client.loop.run_until_complete( + # download_dialog(client, id, MSG_LIMIT, config) + # ) From b0d5912181e5b983b5b189a4f0520f88c1fb22e7 Mon Sep 17 00:00:00 2001 From: Ilya Date: Sun, 5 Nov 2023 14:22:14 +0200 Subject: [PATCH 4/9] Producer-consumer implenetation --- 1_download_dialogs_data.py | 105 +++++++++++++++++++++++++++---------- 1 file changed, 78 insertions(+), 27 deletions(-) diff --git a/1_download_dialogs_data.py b/1_download_dialogs_data.py index f034a48..8a388fb 100644 --- a/1_download_dialogs_data.py +++ b/1_download_dialogs_data.py @@ -2,6 +2,7 @@ import datetime import os import argparse +import time from typing import Dict import pandas as pd @@ -13,6 +14,9 @@ from utils.utils import init_config, read_dialogs REACTIONS_LIMIT_PER_MESSAGE = 100 +MAX_ACTIVE_PRODUCER_COUNT = 3 +CONSUMER_COUNT = 25 +DIALOG_QUEUE = asyncio.Queue() def init_args(): @@ -191,11 +195,12 @@ async def get_message_reactions( return reactions -def time(): - return datetime.datetime.now() +def timelog(): + formatted_time = datetime.datetime.now().strftime("%H:%M:%S.%f")[:-3] + return formatted_time -async def download_dialog_by_username(client, id, MSG_LIMIT, config): +async def download_dialog_by_username(takeout, id, MSG_LIMIT, config): errmsg = f"No such ID found: #{id}" print(errmsg) @@ -220,9 +225,9 @@ async def download_dialog_by_username(client, id, MSG_LIMIT, config): print(f"Username: {username}") if username: - _ = await client.get_entity(username) - tg_entity = await client.get_entity(id) - messages = await client.get_messages(tg_entity, limit=MSG_LIMIT) + _ = await takeout.get_entity(username) + tg_entity = await takeout.get_entity(id) + messages = await takeout.get_messages(tg_entity, limit=MSG_LIMIT) print(f"Done.") else: raise ValueError( @@ -240,7 +245,9 @@ async def download_dialog_by_username(client, id, MSG_LIMIT, config): return messages -async def download_dialog(client, dialog_id, MSG_LIMIT, config): +async def download_dialog( + takeout: telethon.TelegramClient, dialog_id, MSG_LIMIT, config, semaphore +): """ Download messages and their metadata for a specific dialog id, and save them in *ID*.csv @@ -248,31 +255,35 @@ async def download_dialog(client, dialog_id, MSG_LIMIT, config): :return: None """ - print(f"[{time()}] [{m.id}] Downloading dialog started") + await semaphore.acquire() + + print(f"[{timelog()}] [{dialog_id}] Downloading dialog started") try: - tg_entity = await client.get_entity(dialog_id) - messages = await client.get_messages(tg_entity, limit=MSG_LIMIT) + tg_entity = await takeout.get_entity(dialog_id) + messages = await takeout.get_messages(tg_entity, limit=MSG_LIMIT) except ValueError: messages = await download_dialog_by_username( - client, dialog_id, MSG_LIMIT, config + takeout, dialog_id, MSG_LIMIT, config ) - print(f"[{time()}] [{m.id}] Downloading dialog finished") + print(f"[{timelog()}] [{dialog_id}] Downloading dialog finished") - return {"dialog_id": dialog_id, "messages": messages} + await DIALOG_QUEUE.put({"dialog_id": dialog_id, "messages": messages}) + semaphore.release() -async def download_dialogs(client, DIALOGS_ID, MSG_LIMIT, config): - with client: - tasks = [len(DIALOGS_ID)] - for [i, id] in enumerate(DIALOGS_ID): - task = download_dialog(client, id, MSG_LIMIT, config) - tasks[i] = task - await asyncio.gather(*tasks) +# async def download_dialogs(client, DIALOGS_ID, MSG_LIMIT, config): +# with client: +# tasks = [len(DIALOGS_ID)] +# for [i, id] in enumerate(DIALOGS_ID): +# task = download_dialog(client, id, MSG_LIMIT, config) +# tasks[i] = task + +# await asyncio.gather(*tasks) async def process_message(i, m, dialog_id): - print(f"[{time()}] [{m.id}] processing message #{i}") + print(f"[{timelog()}] [{dialog_id}] processing message #{i} {m.id}") msg_attrs = msg_handler(m) msg_reactions = await get_message_reactions(m, telethon.utils.get_peer(dialog_id)) @@ -289,12 +300,12 @@ async def process_message(i, m, dialog_id): } -async def process_dialog(dialog_data): +async def process_dialog(dialog_data, config): dialog_id = dialog_data.dialog_data messages = dialog_data.messages count = len(messages) - print(f"[{time()}] [{m.id}] Processing dialog started, count: {count}") + print(f"[{timelog()}] [{dialog_id}] Processing dialog started, count: {count}") # tasks = [count] dialog = [count] for i, m in enumerate(messages): @@ -302,15 +313,41 @@ async def process_dialog(dialog_data): # tasks[i] = task # dialog = await asyncio.gather(*tasks) - print(f"[{time()}] [{m.id}] Processing dialog finished") + print(f"[{timelog()}] [{dialog_id}] Processing dialog finished") - print(f"[{time()}] [{m.id}] Saving dialog to {str(dialog_id)}.csv") + print(f"[{timelog()}] [{dialog_id}] Saving dialog to {str(dialog_id)}.csv") dialog_file_path = os.path.join( config["dialogs_data_folder"], f"{str(dialog_id)}.csv" ) df = pd.DataFrame(dialog) df.to_csv(dialog_file_path) - print(f"[{time()}] [{m.id}] Dialog saved to {str(dialog_id)}.csv") + print(f"[{timelog()}] [{dialog_id}] Dialog saved to {str(dialog_id)}.csv") + + +async def download_all(client, DIALOGS_ID, MSG_LIMIT, config): + # Create a semaphore to control the number of concurrent producers + producer_semaphore = asyncio.Semaphore(MAX_ACTIVE_PRODUCER_COUNT) + + # Start the producer coroutine + producer_tasks = [ + download_dialog(client, dialog_id, MSG_LIMIT, config, producer_semaphore) + for dialog_id in DIALOGS_ID + ] + + print(f"[{timelog()}] download_all - producers started") + await asyncio.gather(*producer_tasks) + + ################################ + + # Start multiple consumer coroutines (processing tasks) + consumer_tasks = [ + process_dialog(await DIALOG_QUEUE.get(), config) for _ in range(CONSUMER_COUNT) + ] + + print(f"[{timelog()}] download_all - consumers started") + await asyncio.gather(*consumer_tasks) + + print(f"[{timelog()}] download_all finished") if __name__ == "__main__": @@ -351,7 +388,21 @@ async def process_dialog(dialog_data): if not os.path.exists(config["dialogs_data_folder"]): os.mkdir(config["dialogs_data_folder"]) - asyncio.run(download_dialogs(client, DIALOGS_ID, MSG_LIMIT, config)) + with client: + with client.takeout( + finalize=True, + contacts=False, + users=True, + chats=True, + megagroups=True, + channels=False, + files=False, + ) as takeout: + asyncio.run(download_all(takeout, DIALOGS_ID, MSG_LIMIT, config)) + + # try: + # except telethon.errors.TakeoutInitDelayError as e: + # print(f"[{timelog()}]Waiting {e.seconds} seconds before takeout") # with client: # for id in DIALOGS_ID: From 4f855d6a4aa28ca374b5f494939b9600ea68456e Mon Sep 17 00:00:00 2001 From: Ilya Date: Sun, 5 Nov 2023 17:34:34 +0200 Subject: [PATCH 5/9] Added more logging, exception handlings and fixed bugs --- 1_download_dialogs_data.py | 211 ++++++++++++++++--------------------- 1 file changed, 93 insertions(+), 118 deletions(-) diff --git a/1_download_dialogs_data.py b/1_download_dialogs_data.py index 8a388fb..f38d062 100644 --- a/1_download_dialogs_data.py +++ b/1_download_dialogs_data.py @@ -119,8 +119,6 @@ def msg_handler(msg): "to_id": "", } - print("msg_handler") - if hasattr(msg.to_id, "user_id"): msg_attributes["to_id"] = msg.to_id.user_id else: @@ -150,8 +148,6 @@ def msg_handler(msg): elif msg.photo: msg_attributes["type"] = "photo" - print("msg_handler finish") - return msg_attributes @@ -167,8 +163,6 @@ async def get_message_reactions( :return: dict of "user_id - reaction emoji" pairs """ - print("msg_handler") - try: result = await client( telethon.functions.messages.GetMessageReactionsListRequest( @@ -190,8 +184,6 @@ async def get_message_reactions( except: reactions = None - print("msg_handler finish") - return reactions @@ -200,53 +192,38 @@ def timelog(): return formatted_time -async def download_dialog_by_username(takeout, id, MSG_LIMIT, config): - errmsg = f"No such ID found: #{id}" - print(errmsg) - +async def download_dialog_by_username( + client: telethon.TelegramClient, dialog_id, MSG_LIMIT, config +): messages = [] - - try: - print("Trying to init through username") - - username = None - dialog_data_json = f'{config["dialogs_list_folder"]}/{id}.json' - - with open(dialog_data_json) as json_file: - dialog_data = json.load(json_file) - - if ( - "users" in dialog_data - and len(dialog_data["users"]) == 1 - and "username" in dialog_data["users"][0] - ): - username = dialog_data["users"][0]["username"] - - print(f"Username: {username}") - - if username: - _ = await takeout.get_entity(username) - tg_entity = await takeout.get_entity(id) - messages = await takeout.get_messages(tg_entity, limit=MSG_LIMIT) - print(f"Done.") - else: - raise ValueError( - errmsg, - ) - else: - print(f"Error for dialog #{id}") - print(dialog_data) - return - except ValueError: - raise ValueError( - errmsg, - ) + dialog_data_json = f'{config["dialogs_list_folder"]}/{dialog_id}.json' + with open(dialog_data_json) as json_file: + dialog_data = json.load(json_file) + + if ( + "users" in dialog_data + and len(dialog_data["users"]) == 1 + and "username" in dialog_data["users"][0] + and dialog_data["users"][0]["username"] + ): + username = dialog_data["users"][0]["username"] + _ = await client.get_entity(username) + tg_entity = await client.get_entity(dialog_id) + messages = await client.get_messages(tg_entity, limit=MSG_LIMIT) + + print( + f"[{timelog()}] [{dialog_id}] Dialog downloaded throught username {username}." + ) + else: + raise Exception( + f"Cannot download through username, dialog_data: {dialog_data}" + ) return messages async def download_dialog( - takeout: telethon.TelegramClient, dialog_id, MSG_LIMIT, config, semaphore + client: telethon.TelegramClient, dialog_id, MSG_LIMIT, config, semaphore ): """ Download messages and their metadata for a specific dialog id, @@ -257,71 +234,75 @@ async def download_dialog( await semaphore.acquire() - print(f"[{timelog()}] [{dialog_id}] Downloading dialog started") try: - tg_entity = await takeout.get_entity(dialog_id) - messages = await takeout.get_messages(tg_entity, limit=MSG_LIMIT) - except ValueError: - messages = await download_dialog_by_username( - takeout, dialog_id, MSG_LIMIT, config - ) - print(f"[{timelog()}] [{dialog_id}] Downloading dialog finished") + try: + tg_entity = await client.get_entity(dialog_id) + messages = await client.get_messages(tg_entity, limit=MSG_LIMIT) + print(f"[{timelog()}] [{dialog_id}] Dialog downloaded") + except ValueError: + messages = await download_dialog_by_username( + client, dialog_id, MSG_LIMIT, config + ) - await DIALOG_QUEUE.put({"dialog_id": dialog_id, "messages": messages}) + if not messages or len(messages) == 0: + raise Exception("Messages are empty.") - semaphore.release() + await DIALOG_QUEUE.put({"dialog_id": dialog_id, "messages": messages}) + except Exception as e: + print(f"[{timelog()}] [{dialog_id}] Dialog skipped: {e}") -# async def download_dialogs(client, DIALOGS_ID, MSG_LIMIT, config): -# with client: -# tasks = [len(DIALOGS_ID)] -# for [i, id] in enumerate(DIALOGS_ID): -# task = download_dialog(client, id, MSG_LIMIT, config) -# tasks[i] = task - -# await asyncio.gather(*tasks) + semaphore.release() async def process_message(i, m, dialog_id): - print(f"[{timelog()}] [{dialog_id}] processing message #{i} {m.id}") - msg_attrs = msg_handler(m) - msg_reactions = await get_message_reactions(m, telethon.utils.get_peer(dialog_id)) - - return { - "id": m.id, - "date": m.date, - "from_id": m.from_id, - "to_id": msg_attrs["to_id"], - "fwd_from": m.fwd_from, - "message": msg_attrs["message"], - "type": msg_attrs["type"], - "duration": msg_attrs["duration"], - "reactions": msg_reactions, - } + # print(f"[{timelog()}] [{dialog_id}] Processing message №{i} with id={m.id}") + try: + msg_attrs = msg_handler(m) + msg_reactions = await get_message_reactions( + m, telethon.utils.get_peer(dialog_id) + ) + return { + "id": m.id, + "date": m.date, + "from_id": m.from_id, + "to_id": msg_attrs["to_id"], + "fwd_from": m.fwd_from, + "message": msg_attrs["message"], + "type": msg_attrs["type"], + "duration": msg_attrs["duration"], + "reactions": msg_reactions, + } + except Exception as e: + print( + f"[{timelog()}] [{dialog_id}] Processing message №{i} with id={m.id} failed: {e}" + ) + return None -async def process_dialog(dialog_data, config): - dialog_id = dialog_data.dialog_data - messages = dialog_data.messages +async def process_dialog(dialog, config): + dialog_id = dialog["dialog_id"] + messages = dialog["messages"] count = len(messages) - print(f"[{timelog()}] [{dialog_id}] Processing dialog started, count: {count}") - # tasks = [count] - dialog = [count] - for i, m in enumerate(messages): - dialog[i] = process_message(i, m, dialog_id) - # tasks[i] = task + try: + print(f"[{timelog()}] [{dialog_id}] Processing dialog started, count: {count}") - # dialog = await asyncio.gather(*tasks) - print(f"[{timelog()}] [{dialog_id}] Processing dialog finished") + dialog = [None] * count + for i, m in enumerate(messages): + dialog[i] = await process_message(i, m, dialog_id) - print(f"[{timelog()}] [{dialog_id}] Saving dialog to {str(dialog_id)}.csv") - dialog_file_path = os.path.join( - config["dialogs_data_folder"], f"{str(dialog_id)}.csv" - ) - df = pd.DataFrame(dialog) - df.to_csv(dialog_file_path) - print(f"[{timelog()}] [{dialog_id}] Dialog saved to {str(dialog_id)}.csv") + dialog_file_path = os.path.join( + config["dialogs_data_folder"], f"{str(dialog_id)}.csv" + ) + df = pd.DataFrame(dialog) + df.to_csv(dialog_file_path) + + print( + f"[{timelog()}] [{dialog_id}] Processing dialog finished, saved to {str(dialog_id)}.csv" + ) + except Exception as e: + print(f"[{timelog()}] [{dialog_id}] Processing dialog failed: {e}") async def download_all(client, DIALOGS_ID, MSG_LIMIT, config): @@ -329,25 +310,27 @@ async def download_all(client, DIALOGS_ID, MSG_LIMIT, config): producer_semaphore = asyncio.Semaphore(MAX_ACTIVE_PRODUCER_COUNT) # Start the producer coroutine + print(f"[{timelog()}] download_all - producers started") producer_tasks = [ download_dialog(client, dialog_id, MSG_LIMIT, config, producer_semaphore) for dialog_id in DIALOGS_ID ] - - print(f"[{timelog()}] download_all - producers started") - await asyncio.gather(*producer_tasks) + producer_promise = asyncio.gather(*producer_tasks) + print(f"[{timelog()}] download_all - producers finished") ################################ # Start multiple consumer coroutines (processing tasks) + print(f"[{timelog()}] download_all - consumers started") consumer_tasks = [ process_dialog(await DIALOG_QUEUE.get(), config) for _ in range(CONSUMER_COUNT) ] + consumer_promise = asyncio.gather(*consumer_tasks) + print(f"[{timelog()}] download_all - consumers finished") - print(f"[{timelog()}] download_all - consumers started") - await asyncio.gather(*consumer_tasks) - - print(f"[{timelog()}] download_all finished") + print(f"[{timelog()}] download_all - gather started") + await asyncio.gather(producer_promise, consumer_promise) + print(f"[{timelog()}] download_all - gather finished") if __name__ == "__main__": @@ -398,14 +381,6 @@ async def download_all(client, DIALOGS_ID, MSG_LIMIT, config): channels=False, files=False, ) as takeout: - asyncio.run(download_all(takeout, DIALOGS_ID, MSG_LIMIT, config)) - - # try: - # except telethon.errors.TakeoutInitDelayError as e: - # print(f"[{timelog()}]Waiting {e.seconds} seconds before takeout") - - # with client: - # for id in DIALOGS_ID: - # client.loop.run_until_complete( - # download_dialog(client, id, MSG_LIMIT, config) - # ) + takeout.loop.run_until_complete( + download_all(takeout, DIALOGS_ID, MSG_LIMIT, config) + ) From 724e731816f795c1b70c953be55b2813ca49c2fa Mon Sep 17 00:00:00 2001 From: Ilya Date: Sun, 5 Nov 2023 20:48:23 +0200 Subject: [PATCH 6/9] Removed get_message_reactions --- 1_download_dialogs_data.py | 190 +++++++++++++++++++++++++------------ 1 file changed, 128 insertions(+), 62 deletions(-) diff --git a/1_download_dialogs_data.py b/1_download_dialogs_data.py index f38d062..7a66578 100644 --- a/1_download_dialogs_data.py +++ b/1_download_dialogs_data.py @@ -2,7 +2,8 @@ import datetime import os import argparse -import time +import queue +import threading from typing import Dict import pandas as pd @@ -14,9 +15,9 @@ from utils.utils import init_config, read_dialogs REACTIONS_LIMIT_PER_MESSAGE = 100 -MAX_ACTIVE_PRODUCER_COUNT = 3 -CONSUMER_COUNT = 25 -DIALOG_QUEUE = asyncio.Queue() +DIALOG_DOWNLOADERS_COUNT = 10 +DIALOG_PROCESSORS_COUNT = 3 +DIALOG_QUEUE = queue.Queue() def init_args(): @@ -192,6 +193,9 @@ def timelog(): return formatted_time +################################### + + async def download_dialog_by_username( client: telethon.TelegramClient, dialog_id, MSG_LIMIT, config ): @@ -212,7 +216,7 @@ async def download_dialog_by_username( messages = await client.get_messages(tg_entity, limit=MSG_LIMIT) print( - f"[{timelog()}] [{dialog_id}] Dialog downloaded throught username {username}." + f"[{timelog()}] [{dialog_id}] Downloading dialog throught username {username} finished." ) else: raise Exception( @@ -234,11 +238,12 @@ async def download_dialog( await semaphore.acquire() + print(f"[{timelog()}] [{dialog_id}] Downloading dialog started") try: try: tg_entity = await client.get_entity(dialog_id) messages = await client.get_messages(tg_entity, limit=MSG_LIMIT) - print(f"[{timelog()}] [{dialog_id}] Dialog downloaded") + print(f"[{timelog()}] [{dialog_id}] Downloading dialog finished") except ValueError: messages = await download_dialog_by_username( client, dialog_id, MSG_LIMIT, config @@ -247,7 +252,7 @@ async def download_dialog( if not messages or len(messages) == 0: raise Exception("Messages are empty.") - await DIALOG_QUEUE.put({"dialog_id": dialog_id, "messages": messages}) + DIALOG_QUEUE.put({"dialog_id": dialog_id, "messages": messages}) except Exception as e: print(f"[{timelog()}] [{dialog_id}] Dialog skipped: {e}") @@ -255,13 +260,35 @@ async def download_dialog( semaphore.release() +async def download_dialogs(client: telethon.TelegramClient, config): + # Create a semaphore to control the number of concurrent producers + semaphore = asyncio.Semaphore(DIALOG_DOWNLOADERS_COUNT) + + # Start the producer coroutine + print(f"[{timelog()}] download_dialogs started") + producer_tasks = [ + download_dialog(client, dialog_id, MSG_LIMIT, config, semaphore) + for dialog_id in DIALOGS_ID + ] + await asyncio.gather(*producer_tasks) + print(f"[{timelog()}] download_dialogs finished") + + +def download_dialogs_entrypoint(client: telethon.TelegramClient, config): + client.loop.run_until_complete(download_dialogs(client, config)) + + +#################################### + + async def process_message(i, m, dialog_id): # print(f"[{timelog()}] [{dialog_id}] Processing message №{i} with id={m.id}") try: msg_attrs = msg_handler(m) - msg_reactions = await get_message_reactions( - m, telethon.utils.get_peer(dialog_id) - ) + # msg_reactions = await get_message_reactions( + # m, telethon.utils.get_peer(dialog_id) + # ) + return { "id": m.id, "date": m.date, @@ -271,7 +298,7 @@ async def process_message(i, m, dialog_id): "message": msg_attrs["message"], "type": msg_attrs["type"], "duration": msg_attrs["duration"], - "reactions": msg_reactions, + # "reactions": msg_reactions, } except Exception as e: print( @@ -280,57 +307,97 @@ async def process_message(i, m, dialog_id): return None -async def process_dialog(dialog, config): - dialog_id = dialog["dialog_id"] - messages = dialog["messages"] - count = len(messages) +async def process_dialogs(config): + while True: + dialog_data = DIALOG_QUEUE.get() + if dialog_data is None: + break + + dialog_id = dialog_data["dialog_id"] + messages = dialog_data["messages"] + count = len(messages) - try: print(f"[{timelog()}] [{dialog_id}] Processing dialog started, count: {count}") - dialog = [None] * count - for i, m in enumerate(messages): - dialog[i] = await process_message(i, m, dialog_id) + try: + dialog = [None] * count + for i, m in enumerate(messages): + dialog[i] = await process_message(i, m, dialog_id) - dialog_file_path = os.path.join( - config["dialogs_data_folder"], f"{str(dialog_id)}.csv" - ) - df = pd.DataFrame(dialog) - df.to_csv(dialog_file_path) + dialog_file_path = os.path.join( + config["dialogs_data_folder"], f"{str(dialog_id)}.csv" + ) + df = pd.DataFrame(dialog) + df.to_csv(dialog_file_path) - print( - f"[{timelog()}] [{dialog_id}] Processing dialog finished, saved to {str(dialog_id)}.csv" + print( + f"[{timelog()}] [{dialog_id}] Processing dialog finished, saved to {str(dialog_id)}.csv" + ) + except Exception as e: + print(f"[{timelog()}] [{dialog_id}] Processing dialog failed: {e}") + + DIALOG_QUEUE.task_done() + + +def process_dialogs_entrypoint(config): + asyncio.run(process_dialogs(config)) + + +def download_all(client, config): + # producer_thread = threading.Thread( + # target=download_dialogs_entrypoint, + # args=( + # client, + # config, + # ), + # daemon=True, + # ) + # producer_thread.start() + + consumer_threads = [] + for _ in range(DIALOG_PROCESSORS_COUNT): + thread = threading.Thread( + target=process_dialogs_entrypoint, args=(config,), daemon=True ) - except Exception as e: - print(f"[{timelog()}] [{dialog_id}] Processing dialog failed: {e}") + thread.start() + consumer_threads.append(thread) + # producer_thread.join() -async def download_all(client, DIALOGS_ID, MSG_LIMIT, config): - # Create a semaphore to control the number of concurrent producers - producer_semaphore = asyncio.Semaphore(MAX_ACTIVE_PRODUCER_COUNT) + download_dialogs_entrypoint(client, config) - # Start the producer coroutine - print(f"[{timelog()}] download_all - producers started") - producer_tasks = [ - download_dialog(client, dialog_id, MSG_LIMIT, config, producer_semaphore) - for dialog_id in DIALOGS_ID - ] - producer_promise = asyncio.gather(*producer_tasks) - print(f"[{timelog()}] download_all - producers finished") + # Signal the consumers to stop + for _ in range(DIALOG_PROCESSORS_COUNT): + DIALOG_QUEUE.put(None) - ################################ + for thread in consumer_threads: + thread.join() - # Start multiple consumer coroutines (processing tasks) - print(f"[{timelog()}] download_all - consumers started") - consumer_tasks = [ - process_dialog(await DIALOG_QUEUE.get(), config) for _ in range(CONSUMER_COUNT) - ] - consumer_promise = asyncio.gather(*consumer_tasks) - print(f"[{timelog()}] download_all - consumers finished") + # # Create a semaphore to control the number of concurrent producers + # producer_semaphore = asyncio.Semaphore(MAX_ACTIVE_DOWNLOAD_COUNT) + + # # Start the producer coroutine + # print(f"[{timelog()}] download_all - producers started") + # producer_tasks = [ + # download_dialog(client, dialog_id, MSG_LIMIT, config, producer_semaphore) + # for dialog_id in DIALOGS_ID + # ] + # producer_promise = asyncio.gather(*producer_tasks) + # print(f"[{timelog()}] download_all - producers finished") + + # ################################ - print(f"[{timelog()}] download_all - gather started") - await asyncio.gather(producer_promise, consumer_promise) - print(f"[{timelog()}] download_all - gather finished") + # # Start multiple consumer coroutines (processing tasks) + # print(f"[{timelog()}] download_all - consumers started") + # consumer_tasks = [ + # process_dialog(await DIALOG_QUEUE.get(), config) for _ in range(CONSUMER_COUNT) + # ] + # consumer_promise = asyncio.gather(*consumer_tasks) + # print(f"[{timelog()}] download_all - consumers finished") + + # print(f"[{timelog()}] download_all - gather started") + # await asyncio.gather(producer_promise, consumer_promise) + # print(f"[{timelog()}] download_all - gather finished") if __name__ == "__main__": @@ -372,15 +439,14 @@ async def download_all(client, DIALOGS_ID, MSG_LIMIT, config): os.mkdir(config["dialogs_data_folder"]) with client: - with client.takeout( - finalize=True, - contacts=False, - users=True, - chats=True, - megagroups=True, - channels=False, - files=False, - ) as takeout: - takeout.loop.run_until_complete( - download_all(takeout, DIALOGS_ID, MSG_LIMIT, config) - ) + # asyncio.run(client.end_takeout(False)) + # with client.takeout( + # finalize=True, + # contacts=False, + # users=True, + # chats=True, + # megagroups=True, + # channels=False, + # files=False, + # ) as takeout: + download_all(client, config) From 10981cc1138c641f41e436b280e091d782d80321 Mon Sep 17 00:00:00 2001 From: Ilya Date: Sun, 5 Nov 2023 22:15:28 +0200 Subject: [PATCH 7/9] Introduced delay --- 1_download_dialogs_data.py | 77 +++++++++++++++++++------------------- 1 file changed, 38 insertions(+), 39 deletions(-) diff --git a/1_download_dialogs_data.py b/1_download_dialogs_data.py index 7a66578..14ce6fa 100644 --- a/1_download_dialogs_data.py +++ b/1_download_dialogs_data.py @@ -14,9 +14,11 @@ from utils.utils import init_config, read_dialogs -REACTIONS_LIMIT_PER_MESSAGE = 100 +# REACTIONS_LIMIT_PER_MESSAGE = 100 +DIALOG_DOWNLOAD_DELAY = 5 DIALOG_DOWNLOADERS_COUNT = 10 DIALOG_PROCESSORS_COUNT = 3 + DIALOG_QUEUE = queue.Queue() @@ -196,6 +198,10 @@ def timelog(): ################################### +def get_path(dialog_id, config): + return os.path.join(config["dialogs_data_folder"], f"{str(dialog_id)}.csv") + + async def download_dialog_by_username( client: telethon.TelegramClient, dialog_id, MSG_LIMIT, config ): @@ -236,8 +242,6 @@ async def download_dialog( :return: None """ - await semaphore.acquire() - print(f"[{timelog()}] [{dialog_id}] Downloading dialog started") try: try: @@ -257,7 +261,7 @@ async def download_dialog( except Exception as e: print(f"[{timelog()}] [{dialog_id}] Dialog skipped: {e}") - semaphore.release() + # semaphore.release() async def download_dialogs(client: telethon.TelegramClient, config): @@ -265,14 +269,37 @@ async def download_dialogs(client: telethon.TelegramClient, config): semaphore = asyncio.Semaphore(DIALOG_DOWNLOADERS_COUNT) # Start the producer coroutine - print(f"[{timelog()}] download_dialogs started") - producer_tasks = [ - download_dialog(client, dialog_id, MSG_LIMIT, config, semaphore) - for dialog_id in DIALOGS_ID - ] - await asyncio.gather(*producer_tasks) + print(f"[{timelog()}] download_dialogs started, count: {len(DIALOGS_ID)}") + + tasks = [] + for dialog_id in DIALOGS_ID: + if os.path.exists(get_path(dialog_id, config)): + # print(f"[{timelog()}] [{dialog_id}] Dialog already downloaded") + continue + + # await semaphore.acquire() + task = asyncio.create_task( + download_dialog(client, dialog_id, MSG_LIMIT, config, semaphore) + ) + tasks.append(task) + await asyncio.sleep(DIALOG_DOWNLOAD_DELAY) + + await asyncio.gather(*tasks) + print(f"[{timelog()}] download_dialogs finished") + # # Create a semaphore to control the number of concurrent producers + # semaphore = asyncio.Semaphore(DIALOG_DOWNLOADERS_COUNT) + + # # Start the producer coroutine + # print(f"[{timelog()}] download_dialogs started") + # producer_tasks = [ + # download_dialog(client, dialog_id, MSG_LIMIT, config, semaphore) + # for dialog_id in DIALOGS_ID + # ] + # await asyncio.gather(*producer_tasks) + # print(f"[{timelog()}] download_dialogs finished") + def download_dialogs_entrypoint(client: telethon.TelegramClient, config): client.loop.run_until_complete(download_dialogs(client, config)) @@ -324,9 +351,7 @@ async def process_dialogs(config): for i, m in enumerate(messages): dialog[i] = await process_message(i, m, dialog_id) - dialog_file_path = os.path.join( - config["dialogs_data_folder"], f"{str(dialog_id)}.csv" - ) + dialog_file_path = get_path(dialog_id, config) df = pd.DataFrame(dialog) df.to_csv(dialog_file_path) @@ -373,32 +398,6 @@ def download_all(client, config): for thread in consumer_threads: thread.join() - # # Create a semaphore to control the number of concurrent producers - # producer_semaphore = asyncio.Semaphore(MAX_ACTIVE_DOWNLOAD_COUNT) - - # # Start the producer coroutine - # print(f"[{timelog()}] download_all - producers started") - # producer_tasks = [ - # download_dialog(client, dialog_id, MSG_LIMIT, config, producer_semaphore) - # for dialog_id in DIALOGS_ID - # ] - # producer_promise = asyncio.gather(*producer_tasks) - # print(f"[{timelog()}] download_all - producers finished") - - # ################################ - - # # Start multiple consumer coroutines (processing tasks) - # print(f"[{timelog()}] download_all - consumers started") - # consumer_tasks = [ - # process_dialog(await DIALOG_QUEUE.get(), config) for _ in range(CONSUMER_COUNT) - # ] - # consumer_promise = asyncio.gather(*consumer_tasks) - # print(f"[{timelog()}] download_all - consumers finished") - - # print(f"[{timelog()}] download_all - gather started") - # await asyncio.gather(producer_promise, consumer_promise) - # print(f"[{timelog()}] download_all - gather finished") - if __name__ == "__main__": args = init_args() From b0d6c05a8d742f8c17da3afc756003015c446d13 Mon Sep 17 00:00:00 2001 From: Ilya Date: Sun, 5 Nov 2023 22:37:38 +0200 Subject: [PATCH 8/9] Final cleanup --- 1_download_dialogs_data.py | 65 ++++++++------------------------------ 1 file changed, 14 insertions(+), 51 deletions(-) diff --git a/1_download_dialogs_data.py b/1_download_dialogs_data.py index 14ce6fa..f2d50b8 100644 --- a/1_download_dialogs_data.py +++ b/1_download_dialogs_data.py @@ -14,9 +14,8 @@ from utils.utils import init_config, read_dialogs -# REACTIONS_LIMIT_PER_MESSAGE = 100 +REACTIONS_LIMIT_PER_MESSAGE = 100 DIALOG_DOWNLOAD_DELAY = 5 -DIALOG_DOWNLOADERS_COUNT = 10 DIALOG_PROCESSORS_COUNT = 3 DIALOG_QUEUE = queue.Queue() @@ -233,7 +232,7 @@ async def download_dialog_by_username( async def download_dialog( - client: telethon.TelegramClient, dialog_id, MSG_LIMIT, config, semaphore + client: telethon.TelegramClient, dialog_id, MSG_LIMIT, config ): """ Download messages and their metadata for a specific dialog id, @@ -261,14 +260,8 @@ async def download_dialog( except Exception as e: print(f"[{timelog()}] [{dialog_id}] Dialog skipped: {e}") - # semaphore.release() - async def download_dialogs(client: telethon.TelegramClient, config): - # Create a semaphore to control the number of concurrent producers - semaphore = asyncio.Semaphore(DIALOG_DOWNLOADERS_COUNT) - - # Start the producer coroutine print(f"[{timelog()}] download_dialogs started, count: {len(DIALOGS_ID)}") tasks = [] @@ -277,9 +270,8 @@ async def download_dialogs(client: telethon.TelegramClient, config): # print(f"[{timelog()}] [{dialog_id}] Dialog already downloaded") continue - # await semaphore.acquire() task = asyncio.create_task( - download_dialog(client, dialog_id, MSG_LIMIT, config, semaphore) + download_dialog(client, dialog_id, MSG_LIMIT, config) ) tasks.append(task) await asyncio.sleep(DIALOG_DOWNLOAD_DELAY) @@ -288,18 +280,6 @@ async def download_dialogs(client: telethon.TelegramClient, config): print(f"[{timelog()}] download_dialogs finished") - # # Create a semaphore to control the number of concurrent producers - # semaphore = asyncio.Semaphore(DIALOG_DOWNLOADERS_COUNT) - - # # Start the producer coroutine - # print(f"[{timelog()}] download_dialogs started") - # producer_tasks = [ - # download_dialog(client, dialog_id, MSG_LIMIT, config, semaphore) - # for dialog_id in DIALOGS_ID - # ] - # await asyncio.gather(*producer_tasks) - # print(f"[{timelog()}] download_dialogs finished") - def download_dialogs_entrypoint(client: telethon.TelegramClient, config): client.loop.run_until_complete(download_dialogs(client, config)) @@ -369,16 +349,6 @@ def process_dialogs_entrypoint(config): def download_all(client, config): - # producer_thread = threading.Thread( - # target=download_dialogs_entrypoint, - # args=( - # client, - # config, - # ), - # daemon=True, - # ) - # producer_thread.start() - consumer_threads = [] for _ in range(DIALOG_PROCESSORS_COUNT): thread = threading.Thread( @@ -387,8 +357,6 @@ def download_all(client, config): thread.start() consumer_threads.append(thread) - # producer_thread.join() - download_dialogs_entrypoint(client, config) # Signal the consumers to stop @@ -426,11 +394,6 @@ def download_all(client, config): args.dialogs_ids, is_dialog_type_accepted, dialogs_list ) - # Dialogs are the "conversations you have open". - # This method returns a list of Dialog, which - # has the .entity attribute and other information. - # dialogs = client.get_dialogs() - if DEBUG_MODE: logging.basicConfig(level=logging.DEBUG) @@ -438,14 +401,14 @@ def download_all(client, config): os.mkdir(config["dialogs_data_folder"]) with client: - # asyncio.run(client.end_takeout(False)) - # with client.takeout( - # finalize=True, - # contacts=False, - # users=True, - # chats=True, - # megagroups=True, - # channels=False, - # files=False, - # ) as takeout: - download_all(client, config) + with client.takeout( + finalize=True, + contacts=False, + users=True, + chats=True, + megagroups=True, + channels=True, + files=False, + ) as takeout: + download_all(takeout, config) + # download_all(client, config) From fbb18f79e2627be8fffba8c367cf2655c3344bb3 Mon Sep 17 00:00:00 2001 From: Ilya Date: Sun, 5 Nov 2023 22:39:13 +0200 Subject: [PATCH 9/9] Reduced delay --- 1_download_dialogs_data.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/1_download_dialogs_data.py b/1_download_dialogs_data.py index f2d50b8..93f72f6 100644 --- a/1_download_dialogs_data.py +++ b/1_download_dialogs_data.py @@ -15,7 +15,7 @@ from utils.utils import init_config, read_dialogs REACTIONS_LIMIT_PER_MESSAGE = 100 -DIALOG_DOWNLOAD_DELAY = 5 +DIALOG_DOWNLOAD_DELAY = 3 DIALOG_PROCESSORS_COUNT = 3 DIALOG_QUEUE = queue.Queue()