From 742c402c9843b9b2a8b735d29ab81d82a35ad562 Mon Sep 17 00:00:00 2001 From: Michael Loukeris Date: Tue, 22 Oct 2024 13:41:18 +0300 Subject: [PATCH 1/6] Fix synopsis when containing html chars. --- src/scraper.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/scraper.py b/src/scraper.py index c3fd671..8874091 100644 --- a/src/scraper.py +++ b/src/scraper.py @@ -1,5 +1,6 @@ import base64 import hashlib +import html import json import os import re @@ -230,5 +231,5 @@ def fetch_synopsis(game, config): (item["text"] for item in synopsis if item["langue"] == synopsis_lang), None ) if synopsis_text: - return synopsis_text + return html.unescape(synopsis_text) return None From 4978a4f042615b21f445df1dadc844deda5a8536 Mon Sep 17 00:00:00 2001 From: Michael Loukeris Date: Wed, 13 Nov 2024 19:05:20 +0200 Subject: [PATCH 2/6] Add marquee to valid media types --- src/scraper.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/scraper.py b/src/scraper.py index 8874091..a51c64d 100644 --- a/src/scraper.py +++ b/src/scraper.py @@ -14,7 +14,7 @@ USER_INFO_URL = "https://api.screenscraper.fr/api2/ssuserInfos.php" MAX_FILE_SIZE_BYTES = 104857600 # 100MB IMAGE_EXTENSIONS = [".jpg", ".jpeg", ".png"] -VALID_MEDIA_TYPES = {"box-2D", "box-3D", "mixrbv1", "mixrbv2", "ss"} +VALID_MEDIA_TYPES = {"box-2D", "box-3D", "mixrbv1", "mixrbv2", "ss", "marquee"} def get_image_files_without_extension(folder): From 31717322f6ecd4e3b71a98bc3beae0c86c94fa44 Mon Sep 17 00:00:00 2001 From: Michael Loukeris Date: Wed, 13 Nov 2024 22:11:01 +0200 Subject: [PATCH 3/6] Add option to show scraped roms and delete media per rom --- config.json | 1 + src/app.py | 133 +++++++++++++++++++++++++++++++--------------------- 2 files changed, 81 insertions(+), 53 deletions(-) diff --git a/config.json b/config.json index 6f0b9bd..9878b3f 100644 --- a/config.json +++ b/config.json @@ -15,6 +15,7 @@ "devid": "bWlsb3Vr", "devpassword": "dVdDNVVsWHhaaFo=", "threads": 1, + "show_scraped_roms": true, "content": { "synopsis": { "enabled": true, diff --git a/src/app.py b/src/app.py index 18c3e97..423f8e9 100644 --- a/src/app.py +++ b/src/app.py @@ -5,7 +5,7 @@ import sys import time from pathlib import Path -from typing import List +from typing import List, Callable import input from graphic import GUI @@ -78,6 +78,9 @@ def load_config(self, config_file): self.password = self.config.get("screenscraper").get("password") self.threads = self.config.get("screenscraper").get("threads") self.content = self.config.get("screenscraper").get("content") + self.show_scraped_roms = self.config.get("screenscraper").get( + "show_scraped_roms" + ) self.box_enabled = self.content["box"]["enabled"] self.preview_enabled = self.content["preview"]["enabled"] self.synopsis_enabled = self.content["synopsis"]["enabled"] @@ -150,26 +153,38 @@ def get_roms(self, system: str) -> list[Rom]: roms.append(rom) return roms - def delete_files_in_directory(self, filenames, directory_path): + def delete_files_in_directory( + self, filenames: List[str], directory_path: str + ) -> None: directory = Path(directory_path) if directory.is_dir(): + filenames_set = set(filenames) for file in directory.iterdir(): - if file.is_file() and file.stem in filenames: + if file.is_file() and file.stem in filenames_set: file.unlink() + def get_enabled_media_types(self) -> List[str]: + return [ + media + for media, enabled in [ + ("box", self.box_enabled), + ("preview", self.preview_enabled), + ("synopsis", self.synopsis_enabled), + ] + if enabled + ] + + def delete_rom_media(self, rom: Rom, selected_system: str) -> None: + system = self.systems_mapping.get(selected_system) + if system: + for media_type in self.get_enabled_media_types(): + self.delete_files_in_directory([rom.name], system.get(media_type, "")) + def delete_system_media(self) -> None: - global selected_system system = self.systems_mapping.get(selected_system) if system: roms = [rom.name for rom in self.get_roms(selected_system)] - media_types = [] - if self.box_enabled: - media_types.append("box") - if self.preview_enabled: - media_types.append("preview") - if self.synopsis_enabled: - media_types.append("synopsis") - for media_type in media_types: + for media_type in self.get_enabled_media_types(): self.delete_files_in_directory(roms, system.get(media_type, "")) def draw_available_systems(self, available_systems: List[str]) -> None: @@ -218,11 +233,10 @@ def load_emulators(self) -> None: elif input.key_pressed("X"): selected_system = available_systems[selected_position] self.delete_system_media() - self.gui.draw_log(f"Deleting all existing {selected_system} media...") + self.gui.draw_log(f"Deleting all enabled {selected_system} media...") self.gui.draw_paint() skip_input_check = True time.sleep(self.LOG_WAIT) - return elif input.key_pressed("L1"): if selected_position > 0: selected_position = max(0, selected_position - max_elem) @@ -285,7 +299,9 @@ def get_user_threads(self): if not user_info: self.threads = 1 else: - self.threads = min(self.threads, int(user_info["response"]["ssuser"]["maxthreads"])) + self.threads = min( + self.threads, int(user_info["response"]["ssuser"]["maxthreads"]) + ) logger.log_info(f"number of threads: {self.threads}") def scrape(self, rom, system_id): @@ -326,6 +342,21 @@ def process_rom(self, rom, system_id, box_dir, preview_dir, synopsis_dir): self.save_file_to_disk(scraped_synopsis.encode("utf-8"), destination) return scraped_box, scraped_preview, scraped_synopsis, rom.name + def get_roms_without_files( + self, + enabled: bool, + dir_path: Path, + roms_list: List[Rom], + get_files_func: Callable[[Path], List[str]], + ) -> List[Rom]: + if enabled: + if not dir_path.exists(): + dir_path.mkdir(parents=True, exist_ok=True) + return roms_list + files = get_files_func(dir_path) + return [rom for rom in roms_list if rom.name not in files] + return [] + def load_roms(self) -> None: global selected_position, current_window, roms_selected_position, skip_input_check, selected_system @@ -351,41 +382,30 @@ def load_roms(self) -> None: synopsis_dir = Path(system["synopsis"]) system_id = system["id"] - if self.box_enabled and not box_dir.exists(): - box_dir.mkdir(parents=True, exist_ok=True) - roms_without_box: List[Rom] = roms_list - elif self.box_enabled: - box_files = get_image_files_without_extension(box_dir) - roms_without_box = [rom for rom in roms_list if rom.name not in box_files] - else: - roms_without_box = [] - - if self.preview_enabled and not preview_dir.exists(): - preview_dir.mkdir(parents=True, exist_ok=True) - roms_without_preview: List[Rom] = roms_list - elif self.preview_enabled: - preview_files = get_image_files_without_extension(preview_dir) - roms_without_preview = [ - rom for rom in roms_list if rom.name not in preview_files - ] - else: - roms_without_preview = [] - - if self.synopsis_enabled and not synopsis_dir.exists(): - synopsis_dir.mkdir(parents=True, exist_ok=True) - roms_without_synopsis: List[Rom] = roms_list - elif self.synopsis_enabled: - synopsis_files = get_txt_files_without_extension(synopsis_dir) - roms_without_synopsis = [ - rom for rom in roms_list if rom.name not in synopsis_files - ] - else: - roms_without_synopsis = [] - - roms_to_scrape = sorted( - list(set(roms_without_box + roms_without_preview + roms_without_synopsis)), - key=lambda rom: rom.name, + roms_without_box = self.get_roms_without_files( + self.box_enabled, box_dir, roms_list, get_image_files_without_extension ) + roms_without_preview = self.get_roms_without_files( + self.preview_enabled, + preview_dir, + roms_list, + get_image_files_without_extension, + ) + roms_without_synopsis = self.get_roms_without_files( + self.synopsis_enabled, + synopsis_dir, + roms_list, + get_txt_files_without_extension, + ) + if self.show_scraped_roms: + roms_to_scrape = roms_list + else: + roms_to_scrape = sorted( + list( + set(roms_without_box + roms_without_preview + roms_without_synopsis) + ), + key=lambda rom: rom.name, + ) if len(roms_to_scrape) < 1: current_window = "emulators" @@ -414,6 +434,13 @@ def load_roms(self) -> None: self.gui.draw_paint() time.sleep(self.LOG_WAIT) exit_menu = True + elif input.key_pressed("X"): + rom = roms_to_scrape[roms_selected_position] + self.delete_rom_media(rom, selected_system) + self.gui.draw_log(f"Deleting all enabled {rom.name} media...") + self.gui.draw_paint() + skip_input_check = True + time.sleep(self.LOG_WAIT) elif input.key_pressed("START"): progress: int = 0 success: int = 0 @@ -544,10 +571,10 @@ def load_roms(self) -> None: i == (roms_selected_position % max_elem), ) self.button_rectangle((30, 450), "Start", "All") - self.button_circle((170, 450), "A", "Download") - self.button_circle((300, 450), "B", "Back") - self.button_circle((480, 450), "M", "Exit") - + self.button_circle((140, 450), "A", "Download") + self.button_circle((250, 450), "X", "Delete") + self.button_circle((370, 450), "B", "Back") + self.button_circle((500, 450), "M", "Exit") self.gui.draw_paint() def row_list( From ba0bd85efb0cb9d896c205661818cb84288f67dc Mon Sep 17 00:00:00 2001 From: Michael Loukeris Date: Sun, 1 Dec 2024 17:04:01 +0200 Subject: [PATCH 4/6] Add custom exceptions --- src/exceptions.py | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) create mode 100644 src/exceptions.py diff --git a/src/exceptions.py b/src/exceptions.py new file mode 100644 index 0000000..0e78961 --- /dev/null +++ b/src/exceptions.py @@ -0,0 +1,19 @@ +from logger import LoggerSingleton as logger + + +class ScraperError(Exception): + def __init__(self, message): + super().__init__(message) + logger.log_error(message) + + def get_message(self): + s, _ = getattr(self, "message", str(self)), getattr(self, "message", repr(self)) + return s + + +class ForbiddenError(ScraperError): + pass + + +class RateLimitError(ScraperError): + pass From e1cdb729fe4aa82b9a4acb60330e1769d30f60b8 Mon Sep 17 00:00:00 2001 From: Michael Loukeris Date: Sun, 1 Dec 2024 17:04:31 +0200 Subject: [PATCH 5/6] Leverage custom exceptions for better exception handling --- src/app.py | 104 +++++++++++++-------- src/scraper.py | 238 ++++++++++++++++++++++++++----------------------- 2 files changed, 190 insertions(+), 152 deletions(-) diff --git a/src/app.py b/src/app.py index 423f8e9..da998da 100644 --- a/src/app.py +++ b/src/app.py @@ -5,8 +5,9 @@ import sys import time from pathlib import Path -from typing import List, Callable +from typing import Callable, List +import exceptions import input from graphic import GUI from logger import LoggerSingleton as logger @@ -290,47 +291,58 @@ def save_file_to_disk(self, data, destination): return True def get_user_threads(self): - user_info = get_user_data( - self.dev_id, - self.dev_password, - self.username, - self.password, - ) - if not user_info: - self.threads = 1 - else: - self.threads = min( - self.threads, int(user_info["response"]["ssuser"]["maxthreads"]) - ) - logger.log_info(f"number of threads: {self.threads}") - - def scrape(self, rom, system_id): - scraped_box = scraped_preview = scraped_synopsis = None try: - game = get_game_data( - system_id, - rom.path, + user_info = get_user_data( self.dev_id, self.dev_password, self.username, self.password, ) + max_threads = int( + user_info.get("response", {}).get("ssuser", {}).get("maxthreads", 1) + ) + self.threads = min(self.threads, max_threads) + except exceptions.ScraperError as e: + error_msg = f"Error fetching user data: {e}" + + if error_msg: + self.threads = 1 + logger.log_error(error_msg) + self.gui.draw_log(error_msg) + self.gui.draw_paint() + time.sleep(self.LOG_WAIT) + self.gui.draw_clear() + + logger.log_info(f"number of threads: {self.threads}") + + def scrape(self, rom, system_id): + scraped_box = scraped_preview = scraped_synopsis = None + game = get_game_data( + system_id, + rom.path, + self.dev_id, + self.dev_password, + self.username, + self.password, + ) - if game: - content = self.content - if self.box_enabled: - scraped_box = fetch_box(game, content) - if self.preview_enabled: - scraped_preview = fetch_preview(game, content) - if self.synopsis_enabled: - scraped_synopsis = fetch_synopsis(game, content) - except Exception as e: - logger.log_error(f"Error scraping {rom.name}: {e}") + if game: + content = self.content + if self.box_enabled: + scraped_box = fetch_box(game, content) + if self.preview_enabled: + scraped_preview = fetch_preview(game, content) + if self.synopsis_enabled: + scraped_synopsis = fetch_synopsis(game, content) return scraped_box, scraped_preview, scraped_synopsis def process_rom(self, rom, system_id, box_dir, preview_dir, synopsis_dir): - scraped_box, scraped_preview, scraped_synopsis = self.scrape(rom, system_id) + try: + scraped_box, scraped_preview, scraped_synopsis = self.scrape(rom, system_id) + except (exceptions.ForbiddenError, exceptions.RateLimitError) as e: + raise e + if scraped_box: destination: Path = box_dir / f"{rom.name}.png" self.save_file_to_disk(scraped_box, destination) @@ -422,15 +434,23 @@ def load_roms(self) -> None: self.gui.draw_log("Scraping...") self.gui.draw_paint() rom = roms_to_scrape[roms_selected_position] - scraped_box, scraped_preview, scraped_synopsis, _ = self.process_rom( - rom, system_id, box_dir, preview_dir, synopsis_dir - ) + error_msg = None + + try: + scraped_box, scraped_preview, scraped_synopsis, _ = self.process_rom( + rom, system_id, box_dir, preview_dir, synopsis_dir + ) + except exceptions.ScraperError as e: + error_msg = e.get_message() - if not scraped_box and not scraped_preview and not scraped_synopsis: + if error_msg: + self.gui.draw_log(error_msg) + elif not scraped_box and not scraped_preview and not scraped_synopsis: self.gui.draw_log("Scraping failed!") logger.log_error(f"Failed to get screenshot for {rom.name}") else: self.gui.draw_log("Scraping completed!") + self.gui.draw_paint() time.sleep(self.LOG_WAIT) exit_menu = True @@ -461,9 +481,17 @@ def load_roms(self) -> None: for rom in roms_to_scrape } for future in concurrent.futures.as_completed(futures): - scraped_box, scraped_preview, scraped_synopsis, rom_name = ( - future.result() - ) + try: + scraped_box, scraped_preview, scraped_synopsis, rom_name = ( + future.result() + ) + except (exceptions.RateLimitError, exceptions.ForbiddenError): + break + except exceptions.ScraperError as e: + logger.log_error(f"Error scraping {rom_name}: {e}") + failure += 1 + continue + if scraped_box or scraped_preview or scraped_synopsis: success += 1 else: diff --git a/src/scraper.py b/src/scraper.py index a51c64d..012afe9 100644 --- a/src/scraper.py +++ b/src/scraper.py @@ -5,33 +5,34 @@ import os import re from pathlib import Path +from typing import Any, List, Optional from urllib.parse import parse_qs, urlencode, urlparse, urlunparse +import exceptions import requests from logger import LoggerSingleton as logger GAME_INFO_URL = "https://api.screenscraper.fr/api2/jeuInfos.php" USER_INFO_URL = "https://api.screenscraper.fr/api2/ssuserInfos.php" MAX_FILE_SIZE_BYTES = 104857600 # 100MB -IMAGE_EXTENSIONS = [".jpg", ".jpeg", ".png"] +IMAGE_EXTENSIONS = {".jpg", ".jpeg", ".png"} VALID_MEDIA_TYPES = {"box-2D", "box-3D", "mixrbv1", "mixrbv2", "ss", "marquee"} -def get_image_files_without_extension(folder): +def get_image_files_without_extension(folder: str) -> List[str]: return [ f.stem for f in Path(folder).glob("*") if f.suffix.lower() in IMAGE_EXTENSIONS ] -def get_txt_files_without_extension(folder): - return [f.stem for f in Path(folder).glob("*") if f.suffix.lower() == ".txt"] +def get_txt_files_without_extension(folder: str) -> List[str]: + return [f.stem for f in Path(folder).glob("*.txt")] -def sha1sum(file_path): - file_size = os.path.getsize(file_path) - if file_size > MAX_FILE_SIZE_BYTES: - logger.log_warning(f"File {file_path} exceeds max file size limit.") - return "" +def sha1sum(file_path: str) -> str: + file_size_bytes = os.path.getsize(file_path) + if file_size_bytes > MAX_FILE_SIZE_BYTES: + raise exceptions.ScraperError(f"File {file_path} exceeds max file size limit.") hash_sha1 = hashlib.sha1() try: @@ -39,196 +40,205 @@ def sha1sum(file_path): for chunk in iter(lambda: f.read(4096), b""): hash_sha1.update(chunk) except IOError as e: - logger.log_error(f"Error reading file {file_path}: {e}") - return "" + raise exceptions.ScraperError(f"Error reading file {file_path}: {e}") + return hash_sha1.hexdigest() -def clean_rom_name(file_path): +def clean_rom_name(file_path: str) -> str: file_name = os.path.basename(file_path) - cleaned_name = re.sub( + cleaned = re.sub( r"(\.nkit|!|&|Disc |Rev |-|\s*\([^()]*\)|\s*\[[^\[\]]*\])", " ", os.path.splitext(file_name)[0], ) - return re.sub(r"\s+", " ", cleaned_name).strip() + return re.sub(r"\s+", " ", cleaned).strip() -def file_size(file_path): +def file_size(file_path: str) -> int: try: return os.path.getsize(file_path) except OSError as e: - logger.log_error(f"Error getting size of file {file_path}: {e}") - return None + raise exceptions.ScraperError(f"Error getting size of file {file_path}: {e}") -def parse_find_game_url(system_id, rom_path, dev_id, dev_password, username, password): - params = { - "devid": base64.b64decode(dev_id).decode(), - "devpassword": base64.b64decode(dev_password).decode(), - "softname": "crossmix", - "output": "json", - "ssid": username, - "sspassword": password, - "sha1": sha1sum(rom_path), - "systemeid": system_id, - "romtype": "rom", - "romnom": f"{clean_rom_name(rom_path)}.zip", - "romtaille": str(file_size(rom_path)), - } +def parse_find_game_url( + system_id: str, + rom_path: str, + dev_id: str, + dev_password: str, + username: str, + password: str, +) -> str: try: + params = { + "devid": base64.b64decode(dev_id).decode(), + "devpassword": base64.b64decode(dev_password).decode(), + "softname": "crossmix", + "output": "json", + "ssid": username, + "sspassword": password, + "sha1": sha1sum(rom_path), + "systemeid": system_id, + "romtype": "rom", + "romnom": f"{clean_rom_name(rom_path)}.zip", + "romtaille": str(file_size(rom_path)), + } return urlunparse(urlparse(GAME_INFO_URL)._replace(query=urlencode(params))) - except UnicodeDecodeError as e: - logger.log_debug("Params: %s") - logger.log_error(f"Error encoding URL: {e}. ROM params: {params}") - return None + except (UnicodeDecodeError, exceptions.ScraperError) as e: + logger.log_debug(f"Params: {params}") + raise exceptions.ScraperError(f"Error encoding URL: {e}. ROM params: {params}") -def parse_user_info_url(dev_id, dev_password, username, password): - params = { - "devid": base64.b64decode(dev_id).decode(), - "devpassword": base64.b64decode(dev_password).decode(), - "softname": "crossmix", - "output": "json", - "ssid": username, - "sspassword": password, - } +def parse_user_info_url( + dev_id: str, dev_password: str, username: str, password: str +) -> str: try: + params = { + "devid": base64.b64decode(dev_id).decode(), + "devpassword": base64.b64decode(dev_password).decode(), + "softname": "crossmix", + "output": "json", + "ssid": username, + "sspassword": password, + } return urlunparse(urlparse(USER_INFO_URL)._replace(query=urlencode(params))) except UnicodeDecodeError as e: - logger.log_error(f"Error encoding URL: {e}. User info params: {params}") - return None + raise exceptions.ScraperError( + f"Error encoding URL: {e}. User info params: {params}" + ) -def find_media_url_by_region(medias, media_type, regions): +def find_media_url_by_region( + medias: List[dict], media_type: str, regions: List[str] +) -> str: for region in regions: for media in medias: - if media["type"] == media_type and media["region"] == region: - return media["url"] - logger.log_error(f"Media not found for regions: {regions}") - return None + if media.get("type") == media_type and media.get("region") == region: + url = media.get("url") + if url is None: + raise exceptions.ScraperError( + f"Media URL not found for type '{media_type}' and region '{region}'" + ) + return url + raise exceptions.ScraperError( + f"Media of type '{media_type}' not found for regions: {regions}" + ) -def add_wh_to_media_url(media_url, width, height): - parsed_url = urlparse(media_url) - query = parse_qs(parsed_url.query) +def add_wh_to_media_url(media_url: str, width: int, height: int) -> str: + parsed = urlparse(media_url) + query = parse_qs(parsed.query) query.update({"maxwidth": [str(width)], "maxheight": [str(height)]}) - return urlunparse(parsed_url._replace(query=urlencode(query, doseq=True))) + return urlunparse(parsed._replace(query=urlencode(query, doseq=True))) -def is_media_type_valid(media_type): +def is_media_type_valid(media_type: str) -> bool: if media_type not in VALID_MEDIA_TYPES: - logger.log_error(f"Unknown media type: {media_type}") - return False + raise exceptions.ScraperError(f"Unknown media type: {media_type}") return True -def check_destination(dest): - if os.path.exists(dest): - logger.log_error(f"Destination file already exists: {dest}") - return None +def check_destination(dest: str) -> None: dest_dir = os.path.dirname(dest) try: os.makedirs(dest_dir, exist_ok=True) except OSError as e: - logger.log_error(f"Error creating directory {dest_dir}: {e}") - return None + raise exceptions.ScraperError(f"Error creating directory {dest_dir}: {e}") -def get(url): +def get(url: str) -> bytes: with requests.Session() as session: try: response = session.get(url, timeout=10) response.raise_for_status() + return response.content except requests.Timeout: - logger.log_error("Request timed out") - return None + raise exceptions.ScraperError("Request timed out") + except requests.HTTPError as e: + status = e.response.status_code + if status == 403: + raise exceptions.ForbiddenError("Forbidden request") + elif status == 430: + raise exceptions.RateLimitError("Rate Limit Exceeded") + else: + raise exceptions.ScraperError(f"HTTP error: {e}") except requests.RequestException as e: - logger.log_error(f"Error making HTTP request: {e}") - return None - return response.content + raise exceptions.ScraperError(f"HTTP request failed: {e}") -def fetch_data(url): - try: - body = get(url) - if not body: - logger.log_error("Empty response body") - return None - - body_str = body.decode("utf-8") - if "API closed" in body_str: - logger.log_error("API is closed") - return None - if "Erreur" in body_str: - logger.log_error("Error found in response: %s", body_str) - return None +def fetch_data(url: str) -> Any: + body = get(url) + body_str = body.decode("utf-8") + if not body_str: + raise exceptions.ScraperError("Empty response body") + if any(err in body_str for err in ["API closed", "Erreur"]): + raise exceptions.ScraperError(f"Error found in response: {body_str}") + try: return json.loads(body_str) except json.JSONDecodeError as e: - logger.log_error(f"Error decoding JSON response: {e}") - except Exception as e: - logger.log_error(f"Error fetching data from URL: {e}") - return None + raise exceptions.ScraperError(f"JSON decode error: {e}") -def get_game_data(system_id, rom_path, dev_id, dev_password, username, password): +def get_game_data( + system_id: str, + rom_path: str, + dev_id: str, + dev_password: str, + username: str, + password: str, +) -> Any: game_url = parse_find_game_url( system_id, rom_path, dev_id, dev_password, username, password ) return fetch_data(game_url) -def get_user_data(dev_id, dev_password, username, password): +def get_user_data(dev_id: str, dev_password: str, username: str, password: str) -> Any: user_info_url = parse_user_info_url(dev_id, dev_password, username, password) return fetch_data(user_info_url) -def _fetch_media(medias, properties, regions): +def _fetch_media(medias: List[dict], properties: dict, regions: List[str]) -> bytes: media_type = properties["type"] media_height = properties["height"] media_width = properties["width"] - if not is_media_type_valid(media_type): - return None - + is_media_type_valid(media_type) media_url = find_media_url_by_region(medias, media_type, regions) - if media_url: - media_url = add_wh_to_media_url(media_url, media_width, media_height) - return get(media_url) - return None + media_url = add_wh_to_media_url(media_url, media_width, media_height) + return get(media_url) -def fetch_box(game, config): +def fetch_box(game: dict, config: dict) -> Optional[bytes]: medias = game["response"]["jeu"]["medias"] regions = config.get("regions", ["us", "ame", "wor"]) - box = _fetch_media(medias, config["box"], regions) - if not box: - logger.log_error(f"Error downloading box: {game['response']['jeu']['medias']}") - return None - return box + try: + box = _fetch_media(medias, config["box"], regions) + return box + except exceptions.ScraperError as e: + raise exceptions.ScraperError(f"Error downloading box: {e}") -def fetch_preview(game, config): +def fetch_preview(game: dict, config: dict) -> Optional[bytes]: medias = game["response"]["jeu"]["medias"] regions = config.get("regions", ["us", "ame", "wor"]) - preview = _fetch_media(medias, config["preview"], regions) - if not preview: - logger.log_error( - f"Error downloading preview: {game['response']['jeu']['medias']}" - ) - return None - return preview + try: + preview = _fetch_media(medias, config["preview"], regions) + return preview + except exceptions.ScraperError as e: + raise exceptions.ScraperError(f"Error downloading preview: {e}") -def fetch_synopsis(game, config): +def fetch_synopsis(game: dict, config: dict) -> Optional[str]: synopsis = game["response"]["jeu"].get("synopsis") if not synopsis: return None synopsis_lang = config["synopsis"]["lang"] synopsis_text = next( - (item["text"] for item in synopsis if item["langue"] == synopsis_lang), None + (item["text"] for item in synopsis if item.get("langue") == synopsis_lang), None ) if synopsis_text: return html.unescape(synopsis_text) From c078954809c6471144b297e81aa8e45bf79ca057 Mon Sep 17 00:00:00 2001 From: Michael Loukeris Date: Sun, 1 Dec 2024 17:07:11 +0200 Subject: [PATCH 6/6] Update README to include supported media types --- README.md | 1 + 1 file changed, 1 insertion(+) diff --git a/README.md b/README.md index c2c465b..e5180de 100644 --- a/README.md +++ b/README.md @@ -16,6 +16,7 @@ Artie is a simple art scraper designed for Anbernic devices running MuOS. It hel - Support for Box Art, Preview and Text (Synopsis). - Option to delete all media for a selected system. - It can work in other OSs as well with little adaptation. +- Supported media includes `["box-2D", "box-3D", "mixrbv1", "mixrbv2", "ss", "marquee"]` ## Installation / Usage