Skip to content

Commit

Permalink
Use semaphore on get_messages()
Browse files Browse the repository at this point in the history
  • Loading branch information
BurnySc2 committed Oct 28, 2023
1 parent a7d5a3a commit 36a71a8
Showing 1 changed file with 22 additions and 17 deletions.
39 changes: 22 additions & 17 deletions transcribe_website/transcriber_backend/src/telegram_downloader.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
ffmpeg_sem = asyncio.Semaphore(value=SECRETS.parallel_ffmpeg_count)

last_client_get_messages = time.time()
get_messages_sem = asyncio.Semaphore(value=1)


def update_file_ids_cache(channel_id: str, messages: list[Message], message_ids: list[int]) -> None:
Expand Down Expand Up @@ -216,23 +217,27 @@ async def download_one(worker_id: int | None = None) -> None:
two_hundred_message_ids: list[int] = TelegramMessage.get_n_queued_by_channel(message.channel_id)
if len(two_hundred_message_ids) > 200:
raise ValueError("The api does not allow more than 200 messages to be downloaded at once.")
try:
while time.time() - last_client_get_messages < 15:
# Don't send too many requests at once, need to wait at least 15 seconds
await asyncio.sleep(0.1)
last_client_get_messages = time.time()
# pyre-fixme[9]
up_to_date_messages: list[Message] = await DownloadWorker.client.get_messages(
chat_id=message.channel_id,
message_ids=two_hundred_message_ids,
replies=0,
)
except UsernameNotOccupied:
# Error when trying to download from channel that is inaccessible
with orm.db_session():
message = TelegramMessage[message.id]
message.download_status = Status.ERROR_CHANNEL_INACCESSIBLE.name
return
# Don't send too many requests at once, need to wait at least 20 seconds
async with get_messages_sem:
try:
while time.time() - last_client_get_messages < 20:
await asyncio.sleep(0.1)
last_client_get_messages = time.time()
# pyre-fixme[9]
up_to_date_messages: list[Message] = await DownloadWorker.client.get_messages(
chat_id=message.channel_id,
message_ids=two_hundred_message_ids,
replies=0,
)
# TODO fix floodwait
# pyrogram.errors.exceptions.flood_420.FloodWait
# https://stackoverflow.com/questions/71773181/avoid-floodwait-in-python
except UsernameNotOccupied:
# Error when trying to download from channel that is inaccessible
with orm.db_session():
message = TelegramMessage[message.id]
message.download_status = Status.ERROR_CHANNEL_INACCESSIBLE.name
return
update_file_ids_cache(message.channel_id, up_to_date_messages, two_hundred_message_ids)
file_id = get_from_file_ids_cache(message.channel_id, message.message_id)
if file_id is None or file_id == "UNKNOWN":
Expand Down

0 comments on commit 36a71a8

Please sign in to comment.