Skip to content

Commit

Permalink
fix: stop epic games from being scraped twice at startup (#286)
Browse files Browse the repository at this point in the history
  • Loading branch information
eikowagenknecht authored Oct 27, 2023
1 parent 1a09cb6 commit 26447b5
Showing 1 changed file with 85 additions and 67 deletions.
152 changes: 85 additions & 67 deletions src/lootscraper/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,13 @@
send_new_offers_telegram,
)
from lootscraper.scraper import get_all_scrapers
from lootscraper.scraper.scraper_base import Scraper
from lootscraper.telegrambot import TelegramBot, TelegramLoggingHandler
from lootscraper.tools import cleanup

if TYPE_CHECKING:
from playwright.async_api import BrowserContext

from lootscraper.scraper.scraper_base import Scraper

try:
from xvfbwrapper import Xvfb
Expand Down Expand Up @@ -202,6 +202,52 @@ async def run_scraper_loop(
logger.warning("No sources enabled, exiting.")
return

# Initialize the task queue. The queue is used to schedule the scraping
# tasks. Each scraper is scheduled by adding a task to the queue. The
# worker function then dequeues the task and calls the appropriate
# scraper.
task_queue: asyncio.Queue[Type[Scraper]] = asyncio.Queue()

# Schedule each scraper for one-time execution
for scraper_class in get_all_scrapers():
if scraper_class.is_enabled():
# Enqueue a fake "once per day" job for each scraper
schedule.every().day.do(task_queue.put_nowait, scraper_class)

# Schedule all fake tasks once and then clear the schedule again
schedule.run_all()
schedule.clear()

# Schedule each scraper that is enabled
for scraper_class in get_all_scrapers():
for job in scraper_class.get_schedule():
if scraper_class.is_enabled():
# Enqueue the scraper job into the task queue with the
# scraper class and the database session as arguments
job.do(task_queue.put_nowait, scraper_class)

# Create the worker task that will run the next task in the queue when
# it is due
asyncio.create_task(
scrape_worker(
db,
task_queue,
telegram_queue,
),
)

# Then add scheduled tasks in a loop according to their schedule
while True:
logger.debug("Checking if there are tasks to schedule.")
schedule.run_pending()
await asyncio.sleep(1)


async def scrape_worker(
db: LootDatabase,
task_queue: asyncio.Queue[Type[Scraper]],
telegram_queue: asyncio.Queue[int],
) -> None:
async with AsyncExitStack() as stack:
# Check the "global" variable (set on import) to see if we can use a
# virtual display
Expand All @@ -213,73 +259,45 @@ async def run_scraper_loop(
get_browser_context(),
)

# Use a single database session for all scrapers
# Use a single database session for all worker runs
db_session = db.Session()

# Initialize the task queue. The queue is used to schedule the scraping
# tasks. Each scraper is scheduled by adding a task to the queue. The
# worker function then dequeues the task and calls the appropriate
# scraper.
task_queue: asyncio.Queue[Type[Scraper]] = asyncio.Queue()

async def worker() -> None:
run_no = 0
while True:
# This triggers when the time has come to run a scraper
scraper_class = await task_queue.get()

run_no += 1
logger.debug(f"Executing scheduled task #{run_no}.")

try:
scraper_instance = scraper_class(context=browser_context)
scraped_offers = await scraper_instance.scrape()
await process_new_offers(
db,
browser_context,
db_session,
scraped_offers,
run_no = 0
while True:
# This triggers when the time has come to run a scraper
scraper_class = await task_queue.get()

run_no += 1
logger.debug(f"Executing scheduled task #{run_no}.")

try:
scraper_instance = scraper_class(context=browser_context)
scraped_offers = await scraper_instance.scrape()
await process_new_offers(
db,
browser_context,
db_session,
scraped_offers,
)

if Config.get().generate_feed:
await action_generate_feed(db)
else:
logging.info("Skipping feed generation because it is disabled.")

if Config.get().telegram_bot:
await telegram_queue.put(run_no)
else:
logging.debug(
"Skipping Telegram notification because it is disabled.",
)
except OperationalError:
# We handle DB errors on a higher level
raise
except Exception as e:
# This is our catch-all. Something really unexpected occurred.
# Log it with the highest priority and continue with the
# next scheduled run when it's due.
logger.critical(e)

if Config.get().generate_feed:
await action_generate_feed(db)
else:
logging.info("Skipping feed generation because it is disabled.")

if Config.get().telegram_bot:
await telegram_queue.put(run_no)
else:
logging.debug(
"Skipping Telegram notification because it is disabled.",
)
except OperationalError:
# We handle DB errors on a higher level
raise
except Exception as e:
# This is our catch-all. Something really unexpected occurred.
# Log it with the highest priority and continue with the
# next scheduled run when it's due.
logger.critical(e)

task_queue.task_done()

# Schedule each scraper that is enabled
for scraper_class in get_all_scrapers():
for job in scraper_class.get_schedule():
if scraper_class.is_enabled():
# Enqueue the scraper job into the task queue with the
# scraper class and the database session as arguments
job.do(task_queue.put_nowait, scraper_class)

# Create the worker task that will run the next task in the queue when
# it is due
asyncio.create_task(worker())

# Run tasks once after startup
schedule.run_all()

# Then run the tasks in a loop according to their schedule
while True:
logger.debug("Checking if there are tasks to schedule.")
schedule.run_pending()
await asyncio.sleep(1)
task_queue.task_done()

0 comments on commit 26447b5

Please sign in to comment.