Skip to content

Commit

Permalink
Improved compare tools utilities, added same metadata checker, fixed, …
Browse files Browse the repository at this point in the history
…some typos, reverted cli behavior, WATCH_ID
  • Loading branch information
Egezenn committed Dec 26, 2024
1 parent d08f393 commit 64c9b68
Show file tree
Hide file tree
Showing 9 changed files with 173 additions and 122 deletions.
7 changes: 7 additions & 0 deletions .vscode/launch.json
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,13 @@
"request": "launch",
"module": "ytmasc",
"args": ["db-replace-fails", "-v", "i"]
},
{
"name": "YTMASC: db-find-same",
"type": "debugpy",
"request": "launch",
"module": "ytmasc",
"args": ["db-find-same", "-v", "i"]
}
]
}
8 changes: 4 additions & 4 deletions ytmasc/converter.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@

def convert_bulk(json: dict):
fail_amount = 0
for i, key in enumerate(json.keys(), start=1):
for i, watch_id in enumerate(json.keys(), start=1):
logger.info(f"<<< CONVERSION {i} >>>")
fail_state = convert(key)
fail_state = convert(watch_id)
logger.info(f">>> CONVERSION {i} <<<")
fail_amount += fail_state

Expand All @@ -24,8 +24,8 @@ def convert_bulk(json: dict):
pass


def convert(key: str):
file_name = key
def convert(watch_id: str):
file_name = watch_id
output_audio_file = path.join(file_name + audio_conversion_ext)
output_audio_file_path = path.join(download_path, output_audio_file)

Expand Down
97 changes: 65 additions & 32 deletions ytmasc/database_helpers.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
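"""
Truly sorry for the abominations used by the database helpers:
`[:-4]` strips a four-character extension (e.g. ".mp3") from a file name,
`.split(".")[0]` keeps the part of a file name before its first dot,
`next(iter(d))` fetches the first (here: only) key of a single-entry dict.
"""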

from logging import getLogger
from os import listdir, mkdir, path, rename, system
from random import shuffle
from re import search
from time import sleep

Expand All @@ -17,9 +23,9 @@
new_music_library,
old_music_library,
)
from ytmasc.intermediates import update_library_for_key
from ytmasc.intermediates import update_library_for_watch_id
from ytmasc.utility import (
count_key_amount_in_json,
count_watch_id_amount_in_json,
download_path,
fail_log_path,
library_data_path,
Expand Down Expand Up @@ -57,27 +63,27 @@ def compare():

# comparisons
system("clear")
for i, data_OLD in enumerate(old_database, start=1):
for i, data_old in enumerate(old_database, start=1):
scores = []
old_title = list(data_OLD.items())[0][0]
old_artist = list(data_OLD.items())[0][1]

for data_NEW in new_database:
NEW_title = list(data_NEW.items())[0][0]
NEW_artist = list(data_NEW.items())[0][1]
title_score = fuzz.ratio(old_title.lower(), NEW_title.lower())
artist_score = fuzz.ratio(old_artist.lower(), NEW_artist.lower())
old_title = data_old[next(iter(data_old))]["title"]
old_artist = data_old[next(iter(data_old))]["artist"]

for data_new in new_database:
new_title = data_new[next(iter(data_new))]["title"]
new_artist = data_new[next(iter(data_new))]["artist"]
title_score = fuzz.ratio(old_title.lower(), new_title.lower())
artist_score = fuzz.ratio(old_artist.lower(), new_artist.lower())
scores.append(
{
title_score: {
"ツ": artist_score,
"title": NEW_title,
"artist": NEW_artist,
"title": new_title,
"artist": new_artist,
}
}
)

file = path.join(old_music_library, f"{old_title}.mp3")
file = path.join(old_music_library, f"{next(iter(data_old))}.mp3")
sorted_data_title = utils.sort_based_on_score(scores, "title_score")
sorted_data_artist = utils.sort_based_on_score(scores, "artist_score")

Expand All @@ -88,7 +94,7 @@ def compare():
):
system("clear")
print(f"{operation_zfill_print(i, old_file_amt)}\nremove: {file}\n")
rename(file, path.join("!removal", f"{old_title}.mp3"))
rename(file, path.join(files_to_remove, f"{next(iter(data_old))}.mp3"))

# user decisions
else:
Expand All @@ -114,13 +120,17 @@ def compare():
if input_key == "r":
system("clear")
print(f"{operation_zfill_print(i, old_file_amt)}\nremove: {file}\n")
rename(file, path.join("!remove", f"{old_title}.mp3"))
rename(
file, path.join(files_to_remove, f"{next(iter(data_old))}.mp3")
)
elif input_key == "k":
system("clear")
print(
f"{operation_zfill_print(i, old_file_amt)}/{old_file_amt}\nkeep: {file}\n"
)
rename(file, path.join("!keep", f"{old_title}.mp3"))
rename(
file, path.join(files_to_keep, f"{next(iter(data_old))}.mp3")
)
elif input_key == "i":
system("clear")
print(
Expand All @@ -133,20 +143,30 @@ def compare():
# wait for key up


def find_unpaired_files(directory=None):
    """Print the MP3 files that have no JPG of the same name, and vice versa.

    Files are paired by name without the extension, so "abc.mp3" and
    "abc.jpg" form a pair. Files with other extensions are ignored.

    Args:
        directory: Directory to scan. Defaults to ``download_path`` when
            omitted, which matches the previous hard-coded behavior.
    """
    if directory is None:
        directory = download_path

    file_names = listdir(directory)

    def stems(extension):
        # removesuffix strips exactly the matched extension instead of
        # assuming it is four characters wide.
        return {
            name.removesuffix(extension)
            for name in file_names
            if name.endswith(extension)
        }

    mp3_files = stems(".mp3")
    jpg_files = stems(".jpg")

    # Sets have no stable order; sort so repeated runs print identical output.
    print("Unpaired MP3 files:", *sorted(mp3_files - jpg_files))
    print("Unpaired JPG files:", *sorted(jpg_files - mp3_files))
def find_same_metadata():
    """Print every pair of new-library entries whose artist and title match.

    Each entry produced by ``ComparisonUtilities.create_new_database`` is
    indexed here as a single-entry dict ``{name: {"artist": ..., "title": ...}}``.
    Two entries count as the same when ``fuzz.ratio`` scores both their
    artists and their titles at 100.

    Every unordered pair is visited once, so a duplicate is reported a single
    time rather than once per direction.
    """
    # TODO add functionality to remove either one, create a blacklist and add that to it
    from itertools import combinations

    utils = ComparisonUtilities()
    entries = utils.create_new_database()

    for first, second in combinations(entries, 2):
        first_id = next(iter(first))
        second_id = next(iter(second))
        first_meta = first[first_id]
        second_meta = second[second_id]

        # Skip the title comparison when the artists already differ.
        if fuzz.ratio(first_meta["artist"], second_meta["artist"]) != 100:
            continue
        if fuzz.ratio(first_meta["title"], second_meta["title"]) != 100:
            continue

        print(f"{first_id} and {second_id} are same.")


def replace_fails():
# TODO add functionality to replace the watch id on the library with the users choice, blacklist the bad one
utils = FailReplacementUtilities()
lines = read_txt_as_list(fail_log_path)
for line in lines:
Expand All @@ -161,14 +181,14 @@ def replace_fails():
utils.insert_data(table, *result)
print(table)
input_key = ""
while input_key not in ["q"]:
while input_key not in ["esc"]:
input_key = read_key()


def replace_current_metadata_with_youtube(skip_until=-1):
# TODO do the skip amount properly; there's some offset to it (possibly from `i + 1` combined with `enumerate(..., start=1)` below), too lazy to debug it
json_data = read_json(library_data_path)
total_operations = count_key_amount_in_json(library_data_path)
total_operations = count_watch_id_amount_in_json(library_data_path)
for i, watch_id in enumerate(json_data, start=1):
if i + 1 <= skip_until:
continue
Expand All @@ -181,7 +201,7 @@ def replace_current_metadata_with_youtube(skip_until=-1):
f"Got metadata for {watch_id}: artist: {artist}, title: {title}"
)

json_data = update_library_for_key(
json_data = update_library_for_watch_id(
json_data, watch_id, artist, title, overwrite=True
)
except:
Expand All @@ -191,3 +211,16 @@ def replace_current_metadata_with_youtube(skip_until=-1):
break

write_json(library_data_path, json_data)


def find_unpaired_files(directory=None):
    """Print the MP3 files that have no JPG of the same name, and vice versa.

    Files are paired by name without the extension, so "abc.mp3" and
    "abc.jpg" form a pair. Files with other extensions are ignored.

    Args:
        directory: Directory to scan. Defaults to ``download_path`` when
            omitted, which matches the previous hard-coded behavior.
    """
    if directory is None:
        directory = download_path

    file_names = listdir(directory)

    def stems(extension):
        # removesuffix strips exactly the matched extension instead of
        # assuming it is four characters wide.
        return {
            name.removesuffix(extension)
            for name in file_names
            if name.endswith(extension)
        }

    mp3_files = stems(".mp3")
    jpg_files = stems(".jpg")

    # Sets have no stable order; sort so repeated runs print identical output.
    print("Unpaired MP3 files:", *sorted(mp3_files - jpg_files))
    print("Unpaired JPG files:", *sorted(jpg_files - mp3_files))
55 changes: 29 additions & 26 deletions ytmasc/database_utilities.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import json
import os
from logging import getLogger

Expand All @@ -17,43 +16,45 @@


class ComparisonUtilities:
def list_mp3(self, dir: str) -> list[str]:
    """Return the names of the entries in ``dir`` ending with ``audio_conversion_ext``.

    Only bare file names are returned (as given by ``os.listdir``), not full
    paths; callers join them with the directory themselves.
    """
    return [f for f in os.listdir(dir) if f.endswith(audio_conversion_ext)]

# Files in the old library may lack ID3 tags; pass ``title_filename_fallback``
# to always take the title from the file name. Missing tags no longer raise.
def create_old_database(self, title_filename_fallback=False) -> tuple[list[dict], int]:
    """Build the metadata list for the audio files in ``old_music_library``.

    Args:
        title_filename_fallback: When true, use the file name (without its
            extension) as the title instead of reading the Title tag.

    Returns:
        A ``(database, file_count)`` tuple. ``database`` holds one
        single-entry dict per file, ``{stem: {"artist": ..., "title": ...}}``,
        where ``stem`` is the file name without its extension, and
        ``file_count`` is the number of files listed.
    """
    old_files = self.list_mp3(old_music_library)
    old_database = []
    for old_song in old_files:
        data = EasyID3(os.path.join(old_music_library, old_song))
        # splitext keeps dots inside the name ("01. Song.mp3" -> "01. Song"),
        # so the fallback title and the dict key stay in agreement.
        stem = os.path.splitext(old_song)[0]

        title_tag = None if title_filename_fallback else data.get("Title")
        title = title_tag[0] if title_tag else stem

        artist_tag = data.get("Artist")
        # Same placeholder create_new_database uses for a missing artist.
        artist = artist_tag[0] if artist_tag else "░"

        old_database.append({stem: {"artist": artist, "title": title}})

    return old_database, len(old_files)

def create_new_database(self) -> list[dict]:
    """Build the metadata list for the audio files in ``new_music_library``.

    Returns:
        One single-entry dict per file, ``{stem: {"artist": ..., "title": ...}}``,
        where ``stem`` is the file name without its extension.
    """
    new_files = self.list_mp3(new_music_library)
    new_database = []
    for new_song in new_files:
        data = EasyID3(os.path.join(new_music_library, new_song))
        stem = os.path.splitext(new_song)[0]

        title_tag = data.get("Title")
        # A file without a Title tag falls back to its name instead of
        # raising TypeError on ``None[0]``.
        title = title_tag[0] if title_tag else stem

        artist_tag = data.get("Artist")
        # Placeholder keeps later string comparisons working when the
        # Artist tag is absent.
        artist = artist_tag[0] if artist_tag else "░"

        new_database.append({stem: {"artist": artist, "title": title}})

    return new_database

def sort_based_on_score(self, scores, by_which):
def sort_based_on_score(self, scores, by_which) -> list:
if by_which == "title_score":
sorted_data = sorted(
scores,
Expand All @@ -67,6 +68,7 @@ def sort_based_on_score(self, scores, by_which):
key=lambda x: (x[next(iter(x))]["ツ"], int(next(iter(x)))),
reverse=True,
)

return sorted_data

def init_table(self) -> classmethod:
Expand Down Expand Up @@ -153,19 +155,6 @@ def init_table(self) -> classmethod:

return table

def remove_duplicates_by_second_item(self, list_of_lists: list) -> list:
    """Keep only the first sublist seen for each distinct second element.

    Sublists with fewer than two elements are dropped. The relative order of
    the surviving sublists is preserved.
    """
    unique_rows = []
    known_keys = set()

    for row in list_of_lists:
        # Rows without a second element cannot be keyed; leave them out.
        if len(row) <= 1:
            continue
        if row[1] in known_keys:
            continue
        known_keys.add(row[1])
        unique_rows.append(row)

    return unique_rows

def insert_data(
self, table: classmethod, artist: str, watch_id: str, title: str, album: str
):
Expand All @@ -179,6 +168,20 @@ def insert_data(
)


def remove_duplicates_by_second_item(list_of_lists: list) -> list:
    """Keep only the first sublist seen for each distinct second element.

    Sublists with fewer than two elements are dropped. The relative order of
    the surviving sublists is preserved.
    """
    unique_rows = []
    known_keys = set()

    for row in list_of_lists:
        # Rows without a second element cannot be keyed; leave them out.
        if len(row) <= 1:
            continue
        if row[1] in known_keys:
            continue
        known_keys.add(row[1])
        unique_rows.append(row)

    return unique_rows


def get_metadata_from_query(query: str) -> list:
"""Get songs metadata from the provided query. e.g Linkin Park - Numb
First one is the most popular video as a fallback. (for some reason the artist for it returns as watch count)
Expand Down Expand Up @@ -211,12 +214,12 @@ def get_metadata_from_query(query: str) -> list:
album = None

results_metadata.append([artists, watch_id, title, album])
results_metadata = self.remove_duplicates_by_second_item(results_metadata)
results_metadata = remove_duplicates_by_second_item(results_metadata)

return results_metadata


def get_metadata_from_watch_id(watch_id: str) -> [str, str]:
def get_metadata_from_watch_id(watch_id: str) -> list[str, str]:
yt = YTMusic()

search_results = yt.get_song(watch_id)
Expand Down
Loading

0 comments on commit 64c9b68

Please sign in to comment.