Skip to content

Commit

Permalink
Merge pull request #39 from Gyarbij/dev
Browse files Browse the repository at this point in the history
2.1.1
  • Loading branch information
Gyarbij authored Nov 22, 2023
2 parents 38ef78a + c35c804 commit 98cfc64
Show file tree
Hide file tree
Showing 4 changed files with 102 additions and 112 deletions.
30 changes: 16 additions & 14 deletions plexist/modules/deezer.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,20 +65,22 @@ def _get_dz_playlists(
)
return playlists


def _get_dz_tracks_from_playlist(
dz: deezer.Client(),
playlist: Playlist,
) -> List[Track]:
"""Return list of tracks with metadata.
Args:
dz (deezer.Client): Deezer Client (no credentials needed)
playlist (Playlist): Playlist object
Returns:
List[Track]: list of Track objects with track metadata fields
"""
def _get_dz_tracks_from_playlist(
dz: deezer.Client(),
playlist: Playlist,
) -> List[Track]:
"""Return list of tracks with metadata.
Args:
dz (deezer.Client): Deezer Client (no credentials needed)
playlist (Playlist): Playlist object
Returns:
List[Track]: list of Track objects with track metadata fields
"""
dz_playlist = dz.get_playlist(playlist.id)
tracks = dz_playlist.get_tracks()
return [extract_dz_track_metadata(track) for track in tracks]

def extract_dz_track_metadata(track):
track = track.as_dict()
Expand Down
2 changes: 2 additions & 0 deletions plexist/modules/helperClasses.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ class Track:
artist: str
album: str
url: str
year: str
genre: str


@dataclass
Expand Down
174 changes: 81 additions & 93 deletions plexist/modules/plex.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,57 +8,54 @@
import plexapi
from plexapi.exceptions import BadRequest, NotFound
from plexapi.server import PlexServer

from .helperClasses import Playlist, Track, UserInputs

logging.basicConfig(stream=sys.stdout, level=logging.INFO)

# Get connection object globally
conn = sqlite3.connect('matched_songs.db')

# Database functions
def initialize_db():
cursor = conn.cursor()
cursor.execute('''
CREATE TABLE IF NOT EXISTS matched_songs (
title TEXT,
artist TEXT,
album TEXT,
year INTEGER,
genre TEXT,
plex_id INTEGER
)
''')
conn.commit()

def insert_matched_song(title, artist, album, plex_id):
cursor = conn.cursor()

cursor.execute('''
INSERT INTO matched_songs (title, artist, album, plex_id)
VALUES (?, ?, ?, ?)
''', (title, artist, album, plex_id))

conn.commit()

def get_matched_song(title, artist, album):
cursor = conn.cursor()

cursor.execute('''
SELECT plex_id FROM matched_songs
WHERE title = ? AND artist = ? AND album = ?
''', (title, artist, album))

result = cursor.fetchone()

conn = sqlite3.connect('plexist.db')

def initialize_db():
conn = sqlite3.connect('plexist.db')
cursor = conn.cursor()
cursor.execute('''
CREATE TABLE IF NOT EXISTS plexist (
title TEXT,
artist TEXT,
album TEXT,
year INTEGER,
genre TEXT,
plex_id INTEGER
)
''')
conn.commit()
conn.close()

def insert_matched_song(title, artist, album, plex_id):
conn = sqlite3.connect('plexist.db')
cursor = conn.cursor()
cursor.execute('''
INSERT INTO plexist (title, artist, album, plex_id)
VALUES (?, ?, ?, ?)
''', (title, artist, album, plex_id))
conn.commit()
conn.close()

def get_matched_song(title, artist, album):
conn = sqlite3.connect('plexist.db')
cursor = conn.cursor()
cursor.execute('''
SELECT plex_id FROM plexist
WHERE title = ? AND artist = ? AND album = ?
''', (title, artist, album))
result = cursor.fetchone()
conn.close()
return result[0] if result else None


def _write_csv(tracks: List[Track], name: str, path: str = "/data") -> None:
data_folder = pathlib.Path(path)
data_folder.mkdir(parents=True, exist_ok=True)
file = data_folder / f"{name}.csv"

with open(file, "w", encoding="utf-8") as csvfile:
writer = csv.writer(csvfile)
writer.writerow(Track.__annotations__.keys())
Expand All @@ -67,7 +64,6 @@ def _write_csv(tracks: List[Track], name: str, path: str = "/data") -> None:
[track.title, track.artist, track.album, track.url]
)


def _delete_csv(name: str, path: str = "/data") -> None:
data_folder = pathlib.Path(path)
file = data_folder / f"{name}.csv"
Expand All @@ -79,53 +75,44 @@ def _delete_csv(name: str, path: str = "/data") -> None:
def _get_available_plex_tracks(plex: PlexServer, tracks: List[Track]) -> List:
with ThreadPoolExecutor() as executor:
results = list(executor.map(lambda track: _match_single_track(plex, track), tracks))

plex_tracks = [result[0] for result in results if result[0]]
missing_tracks = [result[1] for result in results if result[1]]

return plex_tracks, missing_tracks

MATCH_THRESHOLD = 0.8 # Set your own threshold

def _match_single_track(plex, track, year=None, genre=None):
# Check in local DB first
plex_id = get_matched_song(track.title, track.artist, track.album)
if plex_id:
return plex.fetchItem(plex_id), None

search = []
try:
# Combine track title, artist, and album for a more refined search
search_query = f"{track.title} {track.artist} {track.album}"
search = plex.search(search_query, mediatype="track", limit=5)
except BadRequest:
logging.info("Failed to search %s on Plex", track.title)

best_match = None
best_score = 0

for s in search:
artist_similarity = SequenceMatcher(None, s.artist().title.lower(), track.artist.lower()).quick_ratio()
title_similarity = SequenceMatcher(None, s.title.lower(), track.title.lower()).quick_ratio()
album_similarity = SequenceMatcher(None, s.album().title.lower(), track.album.lower()).quick_ratio()
year_similarity = 1 if year and s.year == year else 0
genre_similarity = SequenceMatcher(None, s.genre.lower(), genre.lower()).quick_ratio() if genre else 0

# Combine the scores (you can adjust the weights as needed)
combined_score = (artist_similarity * 0.4) + (title_similarity * 0.3) + (album_similarity * 0.2) + (year_similarity * 0.05) + (genre_similarity * 0.05)

if combined_score > best_score:
best_score = combined_score
best_match = s

if best_match and best_score >= MATCH_THRESHOLD:
# Insert into the local DB
insert_matched_song(track.title, track.artist, track.album, best_match.ratingKey)
return best_match, None
else:
logging.info(f"No match found for track {track.title} by {track.artist} with a score of {best_score}.")
return None, track

MATCH_THRESHOLD = 0.6

def _match_single_track(plex, track, year=None, genre=None):
# Check in local DB first
plex_id = get_matched_song(track.title, track.artist, track.album)
if plex_id:
return plex.fetchItem(plex_id), None

search = []
try:
# Combine track title and primary artist for a more refined search
primary_artist = track.artist.split("&")[0].split("ft.")[0].strip() # Get the primary artist
search_query = f"{track.title} {primary_artist}"
search = plex.search(search_query, mediatype="track", limit=5)
except BadRequest:
logging.info("Failed to search %s on Plex", track.title)
best_match = None
best_score = 0
for s in search:
artist_similarity = SequenceMatcher(None, s.artist().title.lower(), primary_artist.lower()).quick_ratio()
title_similarity = SequenceMatcher(None, s.title.lower(), track.title.lower()).quick_ratio()
album_similarity = SequenceMatcher(None, s.album().title.lower(), track.album.lower()).quick_ratio()
year_similarity = 1 if year and s.year == year else 0
genre_similarity = SequenceMatcher(None, s.genre.lower(), genre.lower()).quick_ratio() if genre else 0
combined_score = (artist_similarity * 0.4) + (title_similarity * 0.3) + (album_similarity * 0.2) + (year_similarity * 0.05) + (genre_similarity * 0.05)
if combined_score > best_score:
best_score = combined_score
best_match = s
if best_match and best_score >= MATCH_THRESHOLD:
insert_matched_song(track.title, track.artist, track.album, best_match.ratingKey)
return best_match, None
else:
logging.info(f"No match found for track {track.title} by {track.artist} with a score of {best_score}.")
return None, track

def _update_plex_playlist(
plex: PlexServer,
Expand All @@ -140,13 +127,16 @@ def _update_plex_playlist(
return plex_playlist


def update_or_create_plex_playlist(
plex: PlexServer,
playlist: Playlist,
tracks: List[Track],
userInputs: UserInputs,
) -> None:
available_tracks, missing_tracks = _get_available_plex_tracks(plex, tracks)
def update_or_create_plex_playlist(
plex: PlexServer,
playlist: Playlist,
tracks: List[Track],
userInputs: UserInputs,
) -> None:
if tracks is None:
logging.error("No tracks provided for playlist %s", playlist.name)
return
available_tracks, missing_tracks = _get_available_plex_tracks(plex, tracks)
if available_tracks:
try:
plex_playlist = _update_plex_playlist(
Expand All @@ -160,7 +150,6 @@ def update_or_create_plex_playlist(
plex.createPlaylist(title=playlist.name, items=available_tracks)
logging.info("Created playlist %s", playlist.name)
plex_playlist = plex.playlist(playlist.name)

if playlist.description and userInputs.add_playlist_description:
try:
plex_playlist.edit(summary=playlist.description)
Expand All @@ -179,7 +168,6 @@ def update_or_create_plex_playlist(
logging.info(
"Updated playlist %s with summary and poster", playlist.name
)

else:
logging.info(
"No songs for playlist %s were found on plex, skipping the"
Expand Down
8 changes: 3 additions & 5 deletions plexist/modules/spotify.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,17 +44,16 @@ def extract_sp_track_metadata(track) -> Track:
artist = track["track"]["artists"][0]["name"]
album = track["track"]["album"]["name"]
url = track["track"]["external_urls"].get("spotify", "")
return Track(title, artist, album, url)

year = "" # Default value
genre = "" # Default value
return Track(title, artist, album, url, year, genre)
sp_playlist_tracks = sp.user_playlist_tracks(user_id, playlist.id)

tracks = list(
map(
extract_sp_track_metadata,
[i for i in sp_playlist_tracks["items"] if i.get("track")],
)
)

while sp_playlist_tracks["next"]:
sp_playlist_tracks = sp.next(sp_playlist_tracks)
tracks.extend(
Expand All @@ -67,7 +66,6 @@ def extract_sp_track_metadata(track) -> Track:
)
return tracks


def spotify_playlist_sync(
sp: spotipy.Spotify, plex: PlexServer, userInputs: UserInputs
) -> None:
Expand Down

0 comments on commit 98cfc64

Please sign in to comment.