From 26447b5f874ebf83daa069060588fa23b803098b Mon Sep 17 00:00:00 2001
From: Eiko Wagenknecht
Date: Fri, 27 Oct 2023 09:47:45 +0200
Subject: [PATCH] fix: stop epic games from being scraped twice at startup
 (#286)

---
 src/lootscraper/main.py | 152 ++++++++++++++++++++++------------
 1 file changed, 85 insertions(+), 67 deletions(-)

diff --git a/src/lootscraper/main.py b/src/lootscraper/main.py
index 5c949471..cb1ebb14 100644
--- a/src/lootscraper/main.py
+++ b/src/lootscraper/main.py
@@ -22,13 +22,13 @@
     send_new_offers_telegram,
 )
 from lootscraper.scraper import get_all_scrapers
+from lootscraper.scraper.scraper_base import Scraper
 from lootscraper.telegrambot import TelegramBot, TelegramLoggingHandler
 from lootscraper.tools import cleanup
 
 if TYPE_CHECKING:
     from playwright.async_api import BrowserContext
 
-    from lootscraper.scraper.scraper_base import Scraper
 
 try:
     from xvfbwrapper import Xvfb
@@ -202,6 +202,52 @@ async def run_scraper_loop(
         logger.warning("No sources enabled, exiting.")
         return
 
+    # Initialize the task queue. The queue is used to schedule the scraping
+    # tasks. Each scraper is scheduled by adding a task to the queue. The
+    # worker function then dequeues the task and calls the appropriate
+    # scraper.
+    task_queue: asyncio.Queue[Type[Scraper]] = asyncio.Queue()
+
+    # Schedule each scraper for one-time execution
+    for scraper_class in get_all_scrapers():
+        if scraper_class.is_enabled():
+            # Enqueue a fake "once per day" job for each scraper
+            schedule.every().day.do(task_queue.put_nowait, scraper_class)
+
+    # Schedule all fake tasks once and then clear the schedule again
+    schedule.run_all()
+    schedule.clear()
+
+    # Schedule each scraper that is enabled
+    for scraper_class in get_all_scrapers():
+        for job in scraper_class.get_schedule():
+            if scraper_class.is_enabled():
+                # Enqueue the scraper job into the task queue with the
+                # scraper class and the database session as arguments
+                job.do(task_queue.put_nowait, scraper_class)
+
+    # Create the worker task that will run the next task in the queue when
+    # it is due
+    asyncio.create_task(
+        scrape_worker(
+            db,
+            task_queue,
+            telegram_queue,
+        ),
+    )
+
+    # Then add scheduled tasks in a loop according to their schedule
+    while True:
+        logger.debug("Checking if there are tasks to schedule.")
+        schedule.run_pending()
+        await asyncio.sleep(1)
+
+
+async def scrape_worker(
+    db: LootDatabase,
+    task_queue: asyncio.Queue[Type[Scraper]],
+    telegram_queue: asyncio.Queue[int],
+) -> None:
     async with AsyncExitStack() as stack:
         # Check the "global" variable (set on import) to see if we can use a
         # virtual display
@@ -213,73 +259,45 @@ async def run_scraper_loop(
             get_browser_context(),
         )
 
-        # Use a single database session for all scrapers
+        # Use a single database session for all worker runs
         db_session = db.Session()
 
-        # Initialize the task queue. The queue is used to schedule the scraping
-        # tasks. Each scraper is scheduled by adding a task to the queue. The
-        # worker function then dequeues the task and calls the appropriate
-        # scraper.
-        task_queue: asyncio.Queue[Type[Scraper]] = asyncio.Queue()
-
-        async def worker() -> None:
-            run_no = 0
-            while True:
-                # This triggers when the time has come to run a scraper
-                scraper_class = await task_queue.get()
-
-                run_no += 1
-                logger.debug(f"Executing scheduled task #{run_no}.")
-
-                try:
-                    scraper_instance = scraper_class(context=browser_context)
-                    scraped_offers = await scraper_instance.scrape()
-                    await process_new_offers(
-                        db,
-                        browser_context,
-                        db_session,
-                        scraped_offers,
+        run_no = 0
+        while True:
+            # This triggers when the time has come to run a scraper
+            scraper_class = await task_queue.get()
+
+            run_no += 1
+            logger.debug(f"Executing scheduled task #{run_no}.")
+
+            try:
+                scraper_instance = scraper_class(context=browser_context)
+                scraped_offers = await scraper_instance.scrape()
+                await process_new_offers(
+                    db,
+                    browser_context,
+                    db_session,
+                    scraped_offers,
+                )
+
+                if Config.get().generate_feed:
+                    await action_generate_feed(db)
+                else:
+                    logging.info("Skipping feed generation because it is disabled.")
+
+                if Config.get().telegram_bot:
+                    await telegram_queue.put(run_no)
+                else:
+                    logging.debug(
+                        "Skipping Telegram notification because it is disabled.",
                     )
+            except OperationalError:
+                # We handle DB errors on a higher level
+                raise
+            except Exception as e:
+                # This is our catch-all. Something really unexpected occurred.
+                # Log it with the highest priority and continue with the
+                # next scheduled run when it's due.
+                logger.critical(e)
 
-                    if Config.get().generate_feed:
-                        await action_generate_feed(db)
-                    else:
-                        logging.info("Skipping feed generation because it is disabled.")
-
-                    if Config.get().telegram_bot:
-                        await telegram_queue.put(run_no)
-                    else:
-                        logging.debug(
-                            "Skipping Telegram notification because it is disabled.",
-                        )
-                except OperationalError:
-                    # We handle DB errors on a higher level
-                    raise
-                except Exception as e:
-                    # This is our catch-all. Something really unexpected occurred.
-                    # Log it with the highest priority and continue with the
-                    # next scheduled run when it's due.
-                    logger.critical(e)
-
-                task_queue.task_done()
-
-        # Schedule each scraper that is enabled
-        for scraper_class in get_all_scrapers():
-            for job in scraper_class.get_schedule():
-                if scraper_class.is_enabled():
-                    # Enqueue the scraper job into the task queue with the
-                    # scraper class and the database session as arguments
-                    job.do(task_queue.put_nowait, scraper_class)
-
-        # Create the worker task that will run the next task in the queue when
-        # it is due
-        asyncio.create_task(worker())
-
-        # Run tasks once after startup
-        schedule.run_all()
-
-        # Then run the tasks in a loop according to their schedule
-        while True:
-            logger.debug("Checking if there are tasks to schedule.")
-            schedule.run_pending()
-            await asyncio.sleep(1)
+            task_queue.task_done()
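
Note (reviewer illustration, not part of the patch): the fix relies on the `schedule` library's run_all()/clear() pair to enqueue every enabled scraper exactly once at startup, before the real recurring jobs are registered, so a scraper with several schedule entries (such as Epic Games) is no longer queued twice. A minimal, self-contained sketch of that trick follows; the scraper names, intervals, and file name are made up for illustration.

# startup_once_sketch.py -- hypothetical file, not part of the repository
import asyncio

import schedule


async def main() -> None:
    queue: asyncio.Queue[str] = asyncio.Queue()

    # One throwaway job per scraper: fire them all immediately with run_all(),
    # then drop them with clear(), so each scraper lands in the queue once.
    for scraper_name in ["EpicGames", "SteamLoot"]:  # hypothetical scrapers
        schedule.every().day.do(queue.put_nowait, scraper_name)
    schedule.run_all()
    schedule.clear()

    # The real recurring jobs; these only enqueue when run_pending() fires
    # them, so several entries for the same scraper no longer duplicate the
    # startup run.
    schedule.every(30).minutes.do(queue.put_nowait, "EpicGames")
    schedule.every(6).hours.do(queue.put_nowait, "EpicGames")

    # Drain the startup queue: each scraper appears exactly once.
    while not queue.empty():
        print("startup run:", queue.get_nowait())


asyncio.run(main())

Running the sketch prints each scraper name once, even though "EpicGames" has two recurring entries registered afterwards.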