From 0c0cac880e75cf494ffa80bfc1b8523306992bad Mon Sep 17 00:00:00 2001 From: "Gyarbij (Chono N)" <49493993+Gyarbij@users.noreply.github.com> Date: Sat, 31 Aug 2024 16:54:38 +0200 Subject: [PATCH] chore: Refactor plex.py for improved matching and logging --- plexist/modules/plex.py | 138 +++++++++++++++++++++++----------------- 1 file changed, 80 insertions(+), 58 deletions(-) diff --git a/plexist/modules/plex.py b/plexist/modules/plex.py index ea5bc80..1248d2f 100644 --- a/plexist/modules/plex.py +++ b/plexist/modules/plex.py @@ -16,21 +16,28 @@ DB_PATH = os.getenv('DB_PATH', 'plexist.db') -def initialize_db(): - conn = sqlite3.connect('plexist.db') - cursor = conn.cursor() - cursor.execute(''' - CREATE TABLE IF NOT EXISTS plexist ( - title TEXT, - artist TEXT, - album TEXT, - year INTEGER, - genre TEXT, - plex_id INTEGER - ) - ''') - conn.commit() - conn.close() +def initialize_db(): + conn = sqlite3.connect('plexist.db') + cursor = conn.cursor() + cursor.execute(''' + CREATE TABLE IF NOT EXISTS plexist ( + title TEXT, + artist TEXT, + album TEXT, + year INTEGER, + genre TEXT, + plex_id INTEGER + ) + ''') + conn.commit() + conn.close() + +def _get_available_plex_tracks(plex: PlexServer, tracks: List[Track]) -> List: + with ThreadPoolExecutor() as executor: + results = list(executor.map(lambda track: _match_single_track(plex, track), tracks)) + plex_tracks = [result[0] for result in results if result[0]] + missing_tracks = [result[1] for result in results if result[1]] + return plex_tracks, missing_tracks def _match_single_track(plex, track, year=None, genre=None): plex_id = get_matched_song(track.title, track.artist, track.album) @@ -40,9 +47,9 @@ def _match_single_track(plex, track, year=None, genre=None): def similarity(a, b): return SequenceMatcher(None, a.lower(), b.lower()).ratio() - def search_and_score(query, threshold): + def search_and_score(query, threshold, year_weight=0, genre_weight=0): try: - search = plex.search(query, mediatype="track", limit=10) + search = plex.search(query, mediatype="track", limit=20) # Increased limit except BadRequest: logging.info(f"Failed to search {query} on Plex") return None, 0 @@ -55,7 +62,7 @@ def search_and_score(query, threshold): score += similarity(s.title, track.title) * 0.4 score += similarity(s.artist().title, track.artist) * 0.3 score += similarity(s.album().title, track.album) * 0.2 - + # Check for version in parentheses if '(' in track.title and '(' in s.title: version_similarity = similarity( @@ -64,50 +71,63 @@ def search_and_score(query, threshold): ) score += version_similarity * 0.1 + # Year and Genre Matching (if available) + if year and s.year: + score += (s.year == year) * year_weight + if genre and s.genres: + genre_matches = any(similarity(g.tag, genre) > 0.8 for g in s.genres) + score += genre_matches * genre_weight + if score > best_score: best_score = score best_match = s return (best_match, best_score) if best_score >= threshold else (None, 0) - # Stage 1: Strict matching + # Stage 1: Strict matching (including year and genre if available) query = f"{track.title} {track.artist} {track.album}" - match, score = search_and_score(query, 0.8) + match, score = search_and_score(query, 0.85, year_weight=0.2, genre_weight=0.1) # Adjusted weights if match: insert_matched_song(track.title, track.artist, track.album, match.ratingKey) return match, None # Stage 2: Relax album requirement query = f"{track.title} {track.artist}" - match, score = search_and_score(query, 0.7) + match, score = search_and_score(query, 0.75) # Slightly higher threshold if match: logging.info(f"Matched '{track.title}' by '{track.artist}' with relaxed album criteria. Score: {score}") insert_matched_song(track.title, track.artist, track.album, match.ratingKey) return match, None - # Stage 3: Further relaxation + # Stage 3: Further relaxation (partial title) words = track.title.split() if len(words) > 1: query = f"{' '.join(words[:2])} {track.artist}" - match, score = search_and_score(query, 0.4) + match, score = search_and_score(query, 0.6) # Increased threshold if match: logging.info(f"Matched '{track.title}' by '{track.artist}' with partial title. Score: {score}") insert_matched_song(track.title, track.artist, track.album, match.ratingKey) return match, None + # Stage 4: Artist Only Match (for compilations, soundtracks, etc.) + query = f"{track.artist}" + match, score = search_and_score(query, 0.65) + if match: + logging.info(f"Matched '{track.title}' by '{track.artist}' with artist only. Score: {score}") + insert_matched_song(track.title, track.artist, track.album, match.ratingKey) + return match, None + + # Stage 5: Title Only Match (last resort) + query = f"{track.title}" + match, score = search_and_score(query, 0.55) + if match: + logging.info(f"Matched '{track.title}' by '{track.artist}' with title only. Score: {score}") + insert_matched_song(track.title, track.artist, track.album, match.ratingKey) + return match, None + logging.info(f"No match found for track {track.title} by {track.artist}.") return None, track - -def insert_matched_song(title, artist, album, plex_id): - conn = sqlite3.connect('plexist.db') - cursor = conn.cursor() - cursor.execute(''' - INSERT INTO plexist (title, artist, album, plex_id) - VALUES (?, ?, ?, ?) - ''', (title, artist, album, plex_id)) - conn.commit() - conn.close() - + def get_matched_song(title, artist, album): conn = sqlite3.connect('plexist.db') cursor = conn.cursor() @@ -119,30 +139,15 @@ def get_matched_song(title, artist, album): conn.close() return result[0] if result else None - -def _write_csv(tracks: List[Track], name: str, path: str = "/data") -> None: - data_folder = pathlib.Path(path) - data_folder.mkdir(parents=True, exist_ok=True) - file = data_folder / f"{name}.csv" - with open(file, "w", encoding="utf-8") as csvfile: - writer = csv.writer(csvfile) - writer.writerow(Track.__annotations__.keys()) - for track in tracks: - writer.writerow( - [track.title, track.artist, track.album, track.url] - ) - -def _delete_csv(name: str, path: str = "/data") -> None: - data_folder = pathlib.Path(path) - file = data_folder / f"{name}.csv" - file.unlink() - -def _get_available_plex_tracks(plex: PlexServer, tracks: List[Track]) -> List: - with ThreadPoolExecutor() as executor: - results = list(executor.map(lambda track: _match_single_track(plex, track), tracks)) - plex_tracks = [result[0] for result in results if result[0]] - missing_tracks = [result[1] for result in results if result[1]] - return plex_tracks, missing_tracks +def insert_matched_song(title, artist, album, plex_id): + conn = sqlite3.connect('plexist.db') + cursor = conn.cursor() + cursor.execute(''' + INSERT INTO plexist (title, artist, album, plex_id) + VALUES (?, ?, ?, ?) + ''', (title, artist, album, plex_id)) + conn.commit() + conn.close() def _update_plex_playlist( plex: PlexServer, @@ -213,6 +218,23 @@ def update_or_create_plex_playlist( except Exception as e: logging.error("Failed to delete %s.csv: %s", playlist.name, str(e)) +def _write_csv(tracks: List[Track], name: str, path: str = "/data") -> None: + data_folder = pathlib.Path(path) + data_folder.mkdir(parents=True, exist_ok=True) + file = data_folder / f"{name}.csv" + with open(file, "w", encoding="utf-8") as csvfile: + writer = csv.writer(csvfile) + writer.writerow(Track.__annotations__.keys()) + for track in tracks: + writer.writerow( + [track.title, track.artist, track.album, track.url] + ) + +def _delete_csv(name: str, path: str = "/data") -> None: + data_folder = pathlib.Path(path) + file = data_folder / f"{name}.csv" + file.unlink() + def end_session(): if 'conn' in locals() or 'conn' in globals(): conn.close()