diff --git a/plexist/modules/deezer.py b/plexist/modules/deezer.py index 8ed6b11..1143af4 100644 --- a/plexist/modules/deezer.py +++ b/plexist/modules/deezer.py @@ -65,20 +65,22 @@ def _get_dz_playlists( ) return playlists - -def _get_dz_tracks_from_playlist( - dz: deezer.Client(), - playlist: Playlist, -) -> List[Track]: - """Return list of tracks with metadata. - - Args: - dz (deezer.Client): Deezer Client (no credentials needed) - playlist (Playlist): Playlist object - - Returns: - List[Track]: list of Track objects with track metadata fields - """ +def _get_dz_tracks_from_playlist( + dz: deezer.Client(), + playlist: Playlist, +) -> List[Track]: + """Return list of tracks with metadata. + + Args: + dz (deezer.Client): Deezer Client (no credentials needed) + playlist (Playlist): Playlist object + + Returns: + List[Track]: list of Track objects with track metadata fields + """ + dz_playlist = dz.get_playlist(playlist.id) + tracks = dz_playlist.get_tracks() + return [extract_dz_track_metadata(track) for track in tracks] def extract_dz_track_metadata(track): track = track.as_dict() diff --git a/plexist/modules/helperClasses.py b/plexist/modules/helperClasses.py index d462381..58bd947 100644 --- a/plexist/modules/helperClasses.py +++ b/plexist/modules/helperClasses.py @@ -7,6 +7,8 @@ class Track: artist: str album: str url: str + year: str + genre: str @dataclass diff --git a/plexist/modules/plex.py b/plexist/modules/plex.py index fc2cd18..f38e5ba 100644 --- a/plexist/modules/plex.py +++ b/plexist/modules/plex.py @@ -8,49 +8,47 @@ import plexapi from plexapi.exceptions import BadRequest, NotFound from plexapi.server import PlexServer - from .helperClasses import Playlist, Track, UserInputs logging.basicConfig(stream=sys.stdout, level=logging.INFO) -# Get connection object globally -conn = sqlite3.connect('matched_songs.db') - -# Database functions -def initialize_db(): - cursor = conn.cursor() - cursor.execute(''' - CREATE TABLE IF NOT EXISTS matched_songs ( - title TEXT, - artist TEXT, - album TEXT, - year INTEGER, - genre TEXT, - plex_id INTEGER - ) - ''') - conn.commit() - -def insert_matched_song(title, artist, album, plex_id): - cursor = conn.cursor() - - cursor.execute(''' - INSERT INTO matched_songs (title, artist, album, plex_id) - VALUES (?, ?, ?, ?) - ''', (title, artist, album, plex_id)) - - conn.commit() - -def get_matched_song(title, artist, album): - cursor = conn.cursor() - - cursor.execute(''' - SELECT plex_id FROM matched_songs - WHERE title = ? AND artist = ? AND album = ? - ''', (title, artist, album)) - - result = cursor.fetchone() - +conn = sqlite3.connect('plexist.db') + +def initialize_db(): + conn = sqlite3.connect('plexist.db') + cursor = conn.cursor() + cursor.execute(''' + CREATE TABLE IF NOT EXISTS plexist ( + title TEXT, + artist TEXT, + album TEXT, + year INTEGER, + genre TEXT, + plex_id INTEGER + ) + ''') + conn.commit() + conn.close() + +def insert_matched_song(title, artist, album, plex_id): + conn = sqlite3.connect('plexist.db') + cursor = conn.cursor() + cursor.execute(''' + INSERT INTO plexist (title, artist, album, plex_id) + VALUES (?, ?, ?, ?) + ''', (title, artist, album, plex_id)) + conn.commit() + conn.close() + +def get_matched_song(title, artist, album): + conn = sqlite3.connect('plexist.db') + cursor = conn.cursor() + cursor.execute(''' + SELECT plex_id FROM plexist + WHERE title = ? AND artist = ? AND album = ? + ''', (title, artist, album)) + result = cursor.fetchone() + conn.close() return result[0] if result else None @@ -58,7 +56,6 @@ def _write_csv(tracks: List[Track], name: str, path: str = "/data") -> None: data_folder = pathlib.Path(path) data_folder.mkdir(parents=True, exist_ok=True) file = data_folder / f"{name}.csv" - with open(file, "w", encoding="utf-8") as csvfile: writer = csv.writer(csvfile) writer.writerow(Track.__annotations__.keys()) @@ -67,7 +64,6 @@ def _write_csv(tracks: List[Track], name: str, path: str = "/data") -> None: [track.title, track.artist, track.album, track.url] ) - def _delete_csv(name: str, path: str = "/data") -> None: data_folder = pathlib.Path(path) file = data_folder / f"{name}.csv" @@ -79,53 +75,44 @@ def _delete_csv(name: str, path: str = "/data") -> None: def _get_available_plex_tracks(plex: PlexServer, tracks: List[Track]) -> List: with ThreadPoolExecutor() as executor: results = list(executor.map(lambda track: _match_single_track(plex, track), tracks)) - plex_tracks = [result[0] for result in results if result[0]] missing_tracks = [result[1] for result in results if result[1]] - return plex_tracks, missing_tracks -MATCH_THRESHOLD = 0.8 # Set your own threshold - -def _match_single_track(plex, track, year=None, genre=None): - # Check in local DB first - plex_id = get_matched_song(track.title, track.artist, track.album) - if plex_id: - return plex.fetchItem(plex_id), None - - search = [] - try: - # Combine track title, artist, and album for a more refined search - search_query = f"{track.title} {track.artist} {track.album}" - search = plex.search(search_query, mediatype="track", limit=5) - except BadRequest: - logging.info("Failed to search %s on Plex", track.title) - - best_match = None - best_score = 0 - - for s in search: - artist_similarity = SequenceMatcher(None, s.artist().title.lower(), track.artist.lower()).quick_ratio() - title_similarity = SequenceMatcher(None, s.title.lower(), track.title.lower()).quick_ratio() - album_similarity = SequenceMatcher(None, s.album().title.lower(), track.album.lower()).quick_ratio() - year_similarity = 1 if year and s.year == year else 0 - genre_similarity = SequenceMatcher(None, s.genre.lower(), genre.lower()).quick_ratio() if genre else 0 - - # Combine the scores (you can adjust the weights as needed) - combined_score = (artist_similarity * 0.4) + (title_similarity * 0.3) + (album_similarity * 0.2) + (year_similarity * 0.05) + (genre_similarity * 0.05) - - if combined_score > best_score: - best_score = combined_score - best_match = s - - if best_match and best_score >= MATCH_THRESHOLD: - # Insert into the local DB - insert_matched_song(track.title, track.artist, track.album, best_match.ratingKey) - return best_match, None - else: - logging.info(f"No match found for track {track.title} by {track.artist} with a score of {best_score}.") - return None, track - +MATCH_THRESHOLD = 0.6 + +def _match_single_track(plex, track, year=None, genre=None): + # Check in local DB first + plex_id = get_matched_song(track.title, track.artist, track.album) + if plex_id: + return plex.fetchItem(plex_id), None + + search = [] + try: + # Combine track title and primary artist for a more refined search + primary_artist = track.artist.split("&")[0].split("ft.")[0].strip() # Get the primary artist + search_query = f"{track.title} {primary_artist}" + search = plex.search(search_query, mediatype="track", limit=5) + except BadRequest: + logging.info("Failed to search %s on Plex", track.title) + best_match = None + best_score = 0 + for s in search: + artist_similarity = SequenceMatcher(None, s.artist().title.lower(), primary_artist.lower()).quick_ratio() + title_similarity = SequenceMatcher(None, s.title.lower(), track.title.lower()).quick_ratio() + album_similarity = SequenceMatcher(None, s.album().title.lower(), track.album.lower()).quick_ratio() + year_similarity = 1 if year and s.year == year else 0 + genre_similarity = SequenceMatcher(None, s.genre.lower(), genre.lower()).quick_ratio() if genre else 0 + combined_score = (artist_similarity * 0.4) + (title_similarity * 0.3) + (album_similarity * 0.2) + (year_similarity * 0.05) + (genre_similarity * 0.05) + if combined_score > best_score: + best_score = combined_score + best_match = s + if best_match and best_score >= MATCH_THRESHOLD: + insert_matched_song(track.title, track.artist, track.album, best_match.ratingKey) + return best_match, None + else: + logging.info(f"No match found for track {track.title} by {track.artist} with a score of {best_score}.") + return None, track def _update_plex_playlist( plex: PlexServer, @@ -140,13 +127,16 @@ def _update_plex_playlist( return plex_playlist -def update_or_create_plex_playlist( - plex: PlexServer, - playlist: Playlist, - tracks: List[Track], - userInputs: UserInputs, -) -> None: - available_tracks, missing_tracks = _get_available_plex_tracks(plex, tracks) +def update_or_create_plex_playlist( + plex: PlexServer, + playlist: Playlist, + tracks: List[Track], + userInputs: UserInputs, +) -> None: + if tracks is None: + logging.error("No tracks provided for playlist %s", playlist.name) + return + available_tracks, missing_tracks = _get_available_plex_tracks(plex, tracks) if available_tracks: try: plex_playlist = _update_plex_playlist( @@ -160,7 +150,6 @@ def update_or_create_plex_playlist( plex.createPlaylist(title=playlist.name, items=available_tracks) logging.info("Created playlist %s", playlist.name) plex_playlist = plex.playlist(playlist.name) - if playlist.description and userInputs.add_playlist_description: try: plex_playlist.edit(summary=playlist.description) @@ -179,7 +168,6 @@ def update_or_create_plex_playlist( logging.info( "Updated playlist %s with summary and poster", playlist.name ) - else: logging.info( "No songs for playlist %s were found on plex, skipping the" diff --git a/plexist/modules/spotify.py b/plexist/modules/spotify.py index c6d126d..7a88d17 100644 --- a/plexist/modules/spotify.py +++ b/plexist/modules/spotify.py @@ -44,17 +44,16 @@ def extract_sp_track_metadata(track) -> Track: artist = track["track"]["artists"][0]["name"] album = track["track"]["album"]["name"] url = track["track"]["external_urls"].get("spotify", "") - return Track(title, artist, album, url) - + year = "" # Default value + genre = "" # Default value + return Track(title, artist, album, url, year, genre) sp_playlist_tracks = sp.user_playlist_tracks(user_id, playlist.id) - tracks = list( map( extract_sp_track_metadata, [i for i in sp_playlist_tracks["items"] if i.get("track")], ) ) - while sp_playlist_tracks["next"]: sp_playlist_tracks = sp.next(sp_playlist_tracks) tracks.extend( @@ -67,7 +66,6 @@ def extract_sp_track_metadata(track) -> Track: ) return tracks - def spotify_playlist_sync( sp: spotipy.Spotify, plex: PlexServer, userInputs: UserInputs ) -> None: