Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dev #16

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open

Dev #16

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ Artie is a simple art scraper designed for Anbernic devices running MuOS. It hel
- Support for Box Art, Preview and Text (Synopsis).
- Option to delete all media for a selected system.
- It can work in other OSs as well with little adaptation.
- Supported media includes `["box-2D", "box-3D", "mixrbv1", "mixrbv2", "ss", "marquee"]`

## Installation / Usage

Expand Down
1 change: 1 addition & 0 deletions config.json
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
"devid": "bWlsb3Vr",
"devpassword": "dVdDNVVsWHhaaFo=",
"threads": 1,
"show_scraped_roms": true,
"content": {
"synopsis": {
"enabled": true,
Expand Down
229 changes: 142 additions & 87 deletions src/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,9 @@
import sys
import time
from pathlib import Path
from typing import List
from typing import Callable, List

import exceptions
import input
from graphic import GUI
from logger import LoggerSingleton as logger
Expand Down Expand Up @@ -78,6 +79,9 @@ def load_config(self, config_file):
self.password = self.config.get("screenscraper").get("password")
self.threads = self.config.get("screenscraper").get("threads")
self.content = self.config.get("screenscraper").get("content")
self.show_scraped_roms = self.config.get("screenscraper").get(
"show_scraped_roms"
)
self.box_enabled = self.content["box"]["enabled"]
self.preview_enabled = self.content["preview"]["enabled"]
self.synopsis_enabled = self.content["synopsis"]["enabled"]
Expand Down Expand Up @@ -150,26 +154,38 @@ def get_roms(self, system: str) -> list[Rom]:
roms.append(rom)
return roms

def delete_files_in_directory(self, filenames, directory_path):
def delete_files_in_directory(
self, filenames: List[str], directory_path: str
) -> None:
directory = Path(directory_path)
if directory.is_dir():
filenames_set = set(filenames)
for file in directory.iterdir():
if file.is_file() and file.stem in filenames:
if file.is_file() and file.stem in filenames_set:
file.unlink()

def get_enabled_media_types(self) -> List[str]:
return [
media
for media, enabled in [
("box", self.box_enabled),
("preview", self.preview_enabled),
("synopsis", self.synopsis_enabled),
]
if enabled
]

def delete_rom_media(self, rom: Rom, selected_system: str) -> None:
system = self.systems_mapping.get(selected_system)
if system:
for media_type in self.get_enabled_media_types():
self.delete_files_in_directory([rom.name], system.get(media_type, ""))

def delete_system_media(self) -> None:
global selected_system
system = self.systems_mapping.get(selected_system)
if system:
roms = [rom.name for rom in self.get_roms(selected_system)]
media_types = []
if self.box_enabled:
media_types.append("box")
if self.preview_enabled:
media_types.append("preview")
if self.synopsis_enabled:
media_types.append("synopsis")
for media_type in media_types:
for media_type in self.get_enabled_media_types():
self.delete_files_in_directory(roms, system.get(media_type, ""))

def draw_available_systems(self, available_systems: List[str]) -> None:
Expand Down Expand Up @@ -218,11 +234,10 @@ def load_emulators(self) -> None:
elif input.key_pressed("X"):
selected_system = available_systems[selected_position]
self.delete_system_media()
self.gui.draw_log(f"Deleting all existing {selected_system} media...")
self.gui.draw_log(f"Deleting all enabled {selected_system} media...")
self.gui.draw_paint()
skip_input_check = True
time.sleep(self.LOG_WAIT)
return
elif input.key_pressed("L1"):
if selected_position > 0:
selected_position = max(0, selected_position - max_elem)
Expand Down Expand Up @@ -276,45 +291,58 @@ def save_file_to_disk(self, data, destination):
return True

def get_user_threads(self):
user_info = get_user_data(
self.dev_id,
self.dev_password,
self.username,
self.password,
)
if not user_info:
self.threads = 1
else:
self.threads = min(self.threads, int(user_info["response"]["ssuser"]["maxthreads"]))
logger.log_info(f"number of threads: {self.threads}")

def scrape(self, rom, system_id):
scraped_box = scraped_preview = scraped_synopsis = None
try:
game = get_game_data(
system_id,
rom.path,
user_info = get_user_data(
self.dev_id,
self.dev_password,
self.username,
self.password,
)
max_threads = int(
user_info.get("response", {}).get("ssuser", {}).get("maxthreads", 1)
)
self.threads = min(self.threads, max_threads)
except exceptions.ScraperError as e:
error_msg = f"Error fetching user data: {e}"

if game:
content = self.content
if self.box_enabled:
scraped_box = fetch_box(game, content)
if self.preview_enabled:
scraped_preview = fetch_preview(game, content)
if self.synopsis_enabled:
scraped_synopsis = fetch_synopsis(game, content)
except Exception as e:
logger.log_error(f"Error scraping {rom.name}: {e}")
if error_msg:
self.threads = 1
logger.log_error(error_msg)
self.gui.draw_log(error_msg)
self.gui.draw_paint()
time.sleep(self.LOG_WAIT)
self.gui.draw_clear()

logger.log_info(f"number of threads: {self.threads}")

def scrape(self, rom, system_id):
scraped_box = scraped_preview = scraped_synopsis = None
game = get_game_data(
system_id,
rom.path,
self.dev_id,
self.dev_password,
self.username,
self.password,
)

if game:
content = self.content
if self.box_enabled:
scraped_box = fetch_box(game, content)
if self.preview_enabled:
scraped_preview = fetch_preview(game, content)
if self.synopsis_enabled:
scraped_synopsis = fetch_synopsis(game, content)

return scraped_box, scraped_preview, scraped_synopsis

def process_rom(self, rom, system_id, box_dir, preview_dir, synopsis_dir):
scraped_box, scraped_preview, scraped_synopsis = self.scrape(rom, system_id)
try:
scraped_box, scraped_preview, scraped_synopsis = self.scrape(rom, system_id)
except (exceptions.ForbiddenError, exceptions.RateLimitError) as e:
raise e

if scraped_box:
destination: Path = box_dir / f"{rom.name}.png"
self.save_file_to_disk(scraped_box, destination)
Expand All @@ -326,6 +354,21 @@ def process_rom(self, rom, system_id, box_dir, preview_dir, synopsis_dir):
self.save_file_to_disk(scraped_synopsis.encode("utf-8"), destination)
return scraped_box, scraped_preview, scraped_synopsis, rom.name

def get_roms_without_files(
self,
enabled: bool,
dir_path: Path,
roms_list: List[Rom],
get_files_func: Callable[[Path], List[str]],
) -> List[Rom]:
if enabled:
if not dir_path.exists():
dir_path.mkdir(parents=True, exist_ok=True)
return roms_list
files = get_files_func(dir_path)
return [rom for rom in roms_list if rom.name not in files]
return []

def load_roms(self) -> None:
global selected_position, current_window, roms_selected_position, skip_input_check, selected_system

Expand All @@ -351,41 +394,30 @@ def load_roms(self) -> None:
synopsis_dir = Path(system["synopsis"])
system_id = system["id"]

if self.box_enabled and not box_dir.exists():
box_dir.mkdir(parents=True, exist_ok=True)
roms_without_box: List[Rom] = roms_list
elif self.box_enabled:
box_files = get_image_files_without_extension(box_dir)
roms_without_box = [rom for rom in roms_list if rom.name not in box_files]
else:
roms_without_box = []

if self.preview_enabled and not preview_dir.exists():
preview_dir.mkdir(parents=True, exist_ok=True)
roms_without_preview: List[Rom] = roms_list
elif self.preview_enabled:
preview_files = get_image_files_without_extension(preview_dir)
roms_without_preview = [
rom for rom in roms_list if rom.name not in preview_files
]
else:
roms_without_preview = []

if self.synopsis_enabled and not synopsis_dir.exists():
synopsis_dir.mkdir(parents=True, exist_ok=True)
roms_without_synopsis: List[Rom] = roms_list
elif self.synopsis_enabled:
synopsis_files = get_txt_files_without_extension(synopsis_dir)
roms_without_synopsis = [
rom for rom in roms_list if rom.name not in synopsis_files
]
else:
roms_without_synopsis = []

roms_to_scrape = sorted(
list(set(roms_without_box + roms_without_preview + roms_without_synopsis)),
key=lambda rom: rom.name,
roms_without_box = self.get_roms_without_files(
self.box_enabled, box_dir, roms_list, get_image_files_without_extension
)
roms_without_preview = self.get_roms_without_files(
self.preview_enabled,
preview_dir,
roms_list,
get_image_files_without_extension,
)
roms_without_synopsis = self.get_roms_without_files(
self.synopsis_enabled,
synopsis_dir,
roms_list,
get_txt_files_without_extension,
)
if self.show_scraped_roms:
roms_to_scrape = roms_list
else:
roms_to_scrape = sorted(
list(
set(roms_without_box + roms_without_preview + roms_without_synopsis)
),
key=lambda rom: rom.name,
)

if len(roms_to_scrape) < 1:
current_window = "emulators"
Expand All @@ -402,18 +434,33 @@ def load_roms(self) -> None:
self.gui.draw_log("Scraping...")
self.gui.draw_paint()
rom = roms_to_scrape[roms_selected_position]
scraped_box, scraped_preview, scraped_synopsis, _ = self.process_rom(
rom, system_id, box_dir, preview_dir, synopsis_dir
)
error_msg = None

if not scraped_box and not scraped_preview and not scraped_synopsis:
try:
scraped_box, scraped_preview, scraped_synopsis, _ = self.process_rom(
rom, system_id, box_dir, preview_dir, synopsis_dir
)
except exceptions.ScraperError as e:
error_msg = e.get_message()

if error_msg:
self.gui.draw_log(error_msg)
elif not scraped_box and not scraped_preview and not scraped_synopsis:
self.gui.draw_log("Scraping failed!")
logger.log_error(f"Failed to get screenshot for {rom.name}")
else:
self.gui.draw_log("Scraping completed!")

self.gui.draw_paint()
time.sleep(self.LOG_WAIT)
exit_menu = True
elif input.key_pressed("X"):
rom = roms_to_scrape[roms_selected_position]
self.delete_rom_media(rom, selected_system)
self.gui.draw_log(f"Deleting all enabled {rom.name} media...")
self.gui.draw_paint()
skip_input_check = True
time.sleep(self.LOG_WAIT)
elif input.key_pressed("START"):
progress: int = 0
success: int = 0
Expand All @@ -434,9 +481,17 @@ def load_roms(self) -> None:
for rom in roms_to_scrape
}
for future in concurrent.futures.as_completed(futures):
scraped_box, scraped_preview, scraped_synopsis, rom_name = (
future.result()
)
try:
scraped_box, scraped_preview, scraped_synopsis, rom_name = (
future.result()
)
except (exceptions.RateLimitError, exceptions.ForbiddenError):
break
except exceptions.ScraperError as e:
logger.log_error(f"Error scraping {rom_name}: {e}")
failure += 1
continue

if scraped_box or scraped_preview or scraped_synopsis:
success += 1
else:
Expand Down Expand Up @@ -544,10 +599,10 @@ def load_roms(self) -> None:
i == (roms_selected_position % max_elem),
)
self.button_rectangle((30, 450), "Start", "All")
self.button_circle((170, 450), "A", "Download")
self.button_circle((300, 450), "B", "Back")
self.button_circle((480, 450), "M", "Exit")

self.button_circle((140, 450), "A", "Download")
self.button_circle((250, 450), "X", "Delete")
self.button_circle((370, 450), "B", "Back")
self.button_circle((500, 450), "M", "Exit")
self.gui.draw_paint()

def row_list(
Expand Down
19 changes: 19 additions & 0 deletions src/exceptions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
from logger import LoggerSingleton as logger


class ScraperError(Exception):
def __init__(self, message):
super().__init__(message)
logger.log_error(message)

def get_message(self):
s, _ = getattr(self, "message", str(self)), getattr(self, "message", repr(self))
return s


class ForbiddenError(ScraperError):
pass


class RateLimitError(ScraperError):
pass
Loading