feat: use a custom scheduler for each scraper (#279)
Signed-off-by: Eiko Wagenknecht <[email protected]>
eikowagenknecht authored Oct 26, 2023
1 parent 15f87ef commit 81b5c22
Showing 17 changed files with 233 additions and 122 deletions.
2 changes: 0 additions & 2 deletions config.default.toml
@@ -5,8 +5,6 @@ feed_file_prefix = "lootscraper"
log_file = "lootscraper.log"
# One of: DEBUG, INFO, WARNING, ERROR, CRITICAL
log_level = "INFO"
# How long the script waits between runs (in seconds). 0 for single run.
wait_between_runs_seconds = 0

[expert]
# Output all database queries to the console
24 changes: 23 additions & 1 deletion poetry.lock


3 changes: 3 additions & 0 deletions pyproject.toml
@@ -26,6 +26,8 @@ SQLAlchemy = "^2.0.22"
Unidecode = "^1.3.7"
xvfbwrapper = "^0.2.9"
jinja2 = "^3.1.2"
schedule = "^1.2.1"
pytz = "^2023.3.post1"

[tool.poetry.group.dev]
optional = true
@@ -120,6 +122,7 @@ select = [

ignore = [
"ANN101", # Missing type annotation for self in method - Not needed with Self type
"ANN102", # Missing type annotation for cls in classmethod - Not needed
"S101", # Use of assert detected
"TRY003", # Long messages in exceptions
"D100", # Missing docstring in public module
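Note: the new `schedule` dependency provides a lightweight in-process job registry that the reworked main loop polls, and `pytz` enables timezone-aware run times in `Job.at()`. A minimal standalone sketch of that pattern (the job function and run times here are illustrative, not taken from this commit):

```python
import time

import schedule


def scrape() -> None:
    print("running one scraper")


# Register jobs, then poll the scheduler in a loop. The "Europe/Berlin"
# timezone argument to at() is what the pytz dependency enables.
schedule.every(20).minutes.do(scrape)
schedule.every().day.at("00:00", "Europe/Berlin").do(scrape)

while True:
    schedule.run_pending()
    time.sleep(1)
```
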
6 changes: 0 additions & 6 deletions src/lootscraper/config.py
@@ -28,7 +28,6 @@ class ParsedConfig:
feed_file_prefix: str = "lootscraper"
log_file: str = "lootscraper.log"
log_level: str = "INFO"
wait_between_runs_seconds: int = 0

# Expert
db_echo: bool = False
@@ -126,11 +125,6 @@ def get() -> ParsedConfig: # noqa: PLR0915
with contextlib.suppress(KeyError):
parsed_config.log_level = data["common"]["log_level"]

with contextlib.suppress(KeyError):
parsed_config.wait_between_runs_seconds = data["common"][
"wait_between_runs_seconds"
]

# Expert
with contextlib.suppress(KeyError):
parsed_config.db_echo = data["expert"]["db_echo"]
1 change: 1 addition & 0 deletions src/lootscraper/database.py
@@ -295,6 +295,7 @@ def initialize_or_update(self) -> None:
def read_active_offers(self, time: datetime) -> Sequence[Offer]:
session: Session = self.Session()
try:
# TODO: Add option for custom prefiltering because this is a lot!
# Prefilter to reduce database load. The details are handled below.
offers = (
session.execute(
138 changes: 96 additions & 42 deletions src/lootscraper/main.py
@@ -3,21 +3,33 @@
import logging
import shutil
import sys
from contextlib import ExitStack, suppress
from datetime import datetime, timedelta, timezone
from contextlib import AsyncExitStack, suppress
from logging.handlers import RotatingFileHandler
from pathlib import Path
from typing import TYPE_CHECKING, Type

import schedule
from sqlalchemy.exc import OperationalError

from lootscraper import __version__
from lootscraper.browser import get_browser_context
from lootscraper.common import TIMESTAMP_LONG
from lootscraper.config import Config
from lootscraper.database import LootDatabase
from lootscraper.processing import scrape_new_offers, send_new_offers_telegram
from lootscraper.processing import (
action_generate_feed,
process_new_offers,
send_new_offers_telegram,
)
from lootscraper.scraper import get_all_scrapers
from lootscraper.telegrambot import TelegramBot, TelegramLoggingHandler
from lootscraper.tools import cleanup

if TYPE_CHECKING:
from playwright.async_api import BrowserContext

from lootscraper.scraper.scraper_base import Scraper

try:
from xvfbwrapper import Xvfb

@@ -184,48 +196,90 @@ async def run_scraper_loop(
db: LootDatabase,
telegram_queue: asyncio.Queue[int],
) -> None:
"""
Run the scraping job in a loop with a waiting time of x seconds (set in the
config file) between the runs.
"""
with ExitStack() as stack:
"""Run the scraping job in a loop with the scheduling set in the scraper classes."""
# No scrapers, no point in continuing
if len(Config.get().enabled_offer_sources) == 0:
logger.warning("No sources enabled, exiting.")
return

async with AsyncExitStack() as stack:
# Check the "global" variable (set on import) to see if we can use a
# virtual display
if use_virtual_display:
stack.enter_context(Xvfb())

time_between_runs = int(Config.get().wait_between_runs_seconds)

# Loop forever (until terminated by external events)
run_no = 0
# Use one single browser instance for all scrapers
browser_context: BrowserContext = await stack.enter_async_context(
get_browser_context(),
)

# Use a single database session for all scrapers
db_session = db.Session()

# Initialize the task queue. The queue is used to schedule the scraping
# tasks. Each scraper is scheduled by adding a task to the queue. The
# worker function then dequeues the task and calls the appropriate
# scraper.
task_queue: asyncio.Queue[Type[Scraper]] = asyncio.Queue()

async def worker() -> None:
run_no = 0
while True:
# This triggers when the time has come to run a scraper
scraper_class = await task_queue.get()

run_no += 1
logger.debug(f"Executing scheduled task #{run_no}.")

try:
scraper_instance = scraper_class(context=browser_context)
scraped_offers = await scraper_instance.scrape()
await process_new_offers(
db,
browser_context,
db_session,
scraped_offers,
)

if Config.get().generate_feed:
await action_generate_feed(db)
else:
logging.info("Skipping feed generation because it is disabled.")

if Config.get().telegram_bot:
await telegram_queue.put(run_no)
else:
logging.debug(
"Skipping Telegram notification because it is disabled.",
)
except OperationalError:
# We handle DB errors on a higher level
raise
except Exception as e:
# This is our catch-all. Something really unexpected occurred.
# Log it with the highest priority and continue with the
# next scheduled run when it's due.
logger.critical(e)

task_queue.task_done()

# Schedule each scraper that is enabled
for scraper_class in get_all_scrapers():
for job in scraper_class.get_schedule():
if scraper_class.is_enabled():
# Enqueue the scraper job into the task queue with the
# scraper class and the database session as arguments
job.do(task_queue.put_nowait, scraper_class)

# Create the worker task that will run the next task in the queue when
# it is due
asyncio.create_task(worker())

# Run tasks once after startup
schedule.run_all()

# Then run the tasks in a loop according to their schedule
while True:
run_no += 1

logger.info(f"Starting scraping run #{run_no}.")

try:
await scrape_new_offers(db)
except OperationalError:
# We handle DB errors on a higher level
raise
except Exception as e:
# This is our catch-all. Something really unexpected occurred.
# Log it with the highest priority and continue with the
# next run.
logger.critical(e)

if time_between_runs == 0:
break

next_execution = datetime.now(tz=timezone.utc) + timedelta(
seconds=time_between_runs,
)

logger.info(f"Next scraping run will be at {next_execution.isoformat()}.")

if Config.get().telegram_bot:
await telegram_queue.put(run_no)

await asyncio.sleep(time_between_runs)

logger.info(f"Finished {run_no} runs")
logger.debug("Checking if there are tasks to schedule.")
schedule.run_pending()
await asyncio.sleep(1)
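The loop above expects every scraper class to expose `get_schedule()` (and `is_enabled()`), neither of which is part of this diff. A minimal sketch of what such a `get_schedule()` might return, assuming it yields `schedule.Job` objects that `run_scraper_loop()` then completes with `.do()` (the class name and run time are illustrative only):

```python
import schedule


class ExampleScraper:  # hypothetical; real scrapers derive from Scraper
    @staticmethod
    def get_schedule() -> list[schedule.Job]:
        # One Job per desired run time. run_scraper_loop() attaches the
        # actual work via job.do(task_queue.put_nowait, scraper_class).
        return [schedule.every().day.at("17:05", "US/Eastern")]
```
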
106 changes: 35 additions & 71 deletions src/lootscraper/processing.py
@@ -8,7 +8,6 @@
from sqlalchemy import select
from sqlalchemy.orm import Session

from lootscraper.browser import get_browser_context
from lootscraper.common import TIMESTAMP_LONG, OfferDuration, OfferType, Source
from lootscraper.config import Config
from lootscraper.database import Game, IgdbInfo, LootDatabase, Offer, SteamInfo, User
@@ -23,47 +22,6 @@
logger = logging.getLogger(__name__)


async def scrape_new_offers(db: LootDatabase) -> None:
"""Do the actual scraping and processing of new offers."""
context: BrowserContext
cfg = Config.get()

if len(cfg.enabled_offer_sources) > 0:
async with get_browser_context() as context:
session: Session = db.Session()
try:
scraped_offers = await scrape_offers(context)
await process_new_offers(db, context, session, scraped_offers)
session.commit()
except Exception:
session.rollback()
raise

if cfg.generate_feed:
await action_generate_feed(db)
else:
logging.info("Skipping feed generation because it is disabled.")


async def scrape_offers(context: BrowserContext) -> list[Offer]:
cfg = Config.get()

scraped_offers: list[Offer] = []
for scraper_type in get_all_scrapers():
if (
scraper_type.get_type() in cfg.enabled_offer_types
and scraper_type.get_duration() in cfg.enabled_offer_durations
and scraper_type.get_source() in cfg.enabled_offer_sources
):
scraper = scraper_type(context=context)
scraper_results = await scraper.scrape()

if len(scraper_results) > 0:
scraped_offers.extend(scraper_results)

return scraped_offers


async def process_new_offers(
db: LootDatabase,
context: BrowserContext,
Expand All @@ -81,37 +39,43 @@ async def process_new_offers(
nr_of_new_offers: int = 0
new_offer_titles: list[str] = []

for scraped_offer in scraped_offers:
# Get the existing entry if there is one
existing_entry: Offer | None = db.find_offer(
scraped_offer.source,
scraped_offer.type,
scraped_offer.duration,
scraped_offer.title,
scraped_offer.valid_to,
)

if not existing_entry:
if scraped_offer.title:
new_offer_titles.append(scraped_offer.title)

# The enddate has been changed or it is a new offer,
# get information about it (if it's a game)
# and insert it into the database
if cfg.scrape_info:
await add_game_info(scraped_offer, session, context)
try:
for scraped_offer in scraped_offers:
# Get the existing entry if there is one
existing_entry: Offer | None = db.find_offer(
scraped_offer.source,
scraped_offer.type,
scraped_offer.duration,
scraped_offer.title,
scraped_offer.valid_to,
)

# Insert the new offer into the database.
db.add_offer(scraped_offer)
nr_of_new_offers += 1
else:
# Update offers that already have been scraped.
db.touch_db_offer(existing_entry)
if not existing_entry:
if scraped_offer.title:
new_offer_titles.append(scraped_offer.title)

# The enddate has been changed or it is a new offer,
# get information about it (if it's a game)
# and insert it into the database
if cfg.scrape_info:
await add_game_info(scraped_offer, session, context)

# Insert the new offer into the database.
db.add_offer(scraped_offer)
nr_of_new_offers += 1
else:
# Update offers that already have been scraped.
db.touch_db_offer(existing_entry)

if new_offer_titles:
logging.info(
f'Found {nr_of_new_offers} new offers: {", ".join(new_offer_titles)}',
)

if new_offer_titles:
logging.info(
f'Found {nr_of_new_offers} new offers: {", ".join(new_offer_titles)}',
)
session.commit()
except Exception:
session.rollback()
raise


async def send_new_offers_telegram(db: LootDatabase, bot: TelegramBot) -> None:
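With `scrape_offers()` removed, the per-scraper filtering it performed inline now sits behind `scraper_class.is_enabled()`, which `run_scraper_loop()` calls when scheduling jobs. Its implementation is not shown in this diff; a plausible sketch, assuming it mirrors the removed type/duration/source checks:

```python
from lootscraper.config import Config


class ExampleScraper:  # hypothetical; real scrapers derive from Scraper
    @classmethod
    def is_enabled(cls) -> bool:
        cfg = Config.get()
        # get_type(), get_duration() and get_source() exist on the concrete
        # scraper classes (see the checks removed from scrape_offers() above).
        return (
            cls.get_type() in cfg.enabled_offer_types
            and cls.get_duration() in cfg.enabled_offer_durations
            and cls.get_source() in cfg.enabled_offer_sources
        )
```
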