diff --git a/ytmasc/database_helpers.py b/ytmasc/database_helpers.py
index 7527751..de4b386 100644
--- a/ytmasc/database_helpers.py
+++ b/ytmasc/database_helpers.py
@@ -1,5 +1,6 @@
 from logging import getLogger
 from os import listdir, mkdir, path, rename, system
+from random import shuffle
 from re import search
 from time import sleep
@@ -11,10 +12,22 @@
     FailReplacementUtilities,
     files_to_keep,
     files_to_remove,
+    get_metadata_from_query,
+    get_metadata_from_watch_id,
     new_music_library,
     old_music_library,
 )
-from ytmasc.utility import download_path, fail_log_path, read_txt_as_list
+from ytmasc.intermediates import update_library_for_key
+from ytmasc.utility import (
+    count_key_amount_in_json,
+    download_path,
+    fail_log_path,
+    library_data_path,
+    operation_zfill_print,
+    read_json,
+    read_txt_as_list,
+    write_json,
+)
 
 logger = getLogger(__name__)
 
@@ -74,9 +87,7 @@ def compare():
             and sorted_data_title[0][next(iter(sorted_data_title[0]))]["ツ"] == 100
         ):
             system("clear")
-            print(
-                f"{str(i).zfill(len(str(old_file_amt)))}/{old_file_amt}\nremove: {file}\n"
-            )
+            print(f"{operation_zfill_print(i, old_file_amt)}\nremove: {file}\n")
             rename(file, path.join("!removal", f"{old_title}.mp3"))
 
     # user decisions
@@ -102,20 +113,18 @@ def compare():
         # time.sleep(0.5)
         if input_key == "r":
             system("clear")
-            print(
-                f"{str(i).zfill(len(str(old_file_amt)))}/{old_file_amt}\nremove: {file}\n"
-            )
+            print(f"{operation_zfill_print(i, old_file_amt)}\nremove: {file}\n")
             rename(file, path.join("!remove", f"{old_title}.mp3"))
         elif input_key == "k":
             system("clear")
             print(
-                f"{str(i).zfill(len(str(old_file_amt)))}/{old_file_amt}\nkeep: {file}\n"
+                f"{operation_zfill_print(i, old_file_amt)}\nkeep: {file}\n"
             )
             rename(file, path.join("!keep", f"{old_title}.mp3"))
         elif input_key == "i":
             system("clear")
             print(
-                f"{str(i).zfill(len(str(old_file_amt)))}/{old_file_amt}\nignore: {file}\n"
+                f"{operation_zfill_print(i, old_file_amt)}\nignore: {file}\n"
             )
         else:
             quit()
@@ -142,11 +151,11 @@ def replace_fails():
     lines = read_txt_as_list(fail_log_path)
     for line in lines:
         watch_id = search(r"\[youtube\] ([a-zA-Z0-9\-_]*?):", line).group(1)
-        artist, title = utils.get_metadata_from_watch_id(watch_id)
+        artist, title = get_metadata_from_watch_id(watch_id)
         system("clear")
         query = f"{artist} - {title}"
         print(query)
-        results = utils.get_metadata_from_query(query)
+        results = get_metadata_from_query(query)
         table = utils.init_table()
         for result in results:
             utils.insert_data(table, *result)
@@ -154,3 +163,33 @@
         input_key = ""
         while input_key not in ["q"]:
             input_key = read_key()
+
+
+def replace_current_metadata_with_youtube(skip_until=-1):
+    json_data = read_json(library_data_path)
+    total_operations = count_key_amount_in_json(library_data_path)
+    for i, watch_id in enumerate(json_data, start=1):
+        # resume at operation `skip_until`; everything before it is skipped
+        # TODO: the resume offset may still be off by one; verify it
+        if i < skip_until:
+            continue
+        try:
+            logger.info(
+                f"{operation_zfill_print(i, total_operations)} Getting metadata for {watch_id}"
+            )
+            artist, title = get_metadata_from_watch_id(watch_id)
+            logger.info(
+                f"Got metadata for {watch_id}: artist: {artist}, title: {title}"
+            )
+
+            json_data = update_library_for_key(
+                json_data, watch_id, artist, title, overwrite=True
+            )
+        except Exception:
+            logger.warning(
+                "YouTube refused to provide information; switch your network and "
+                "re-run with the latest logged operation number to resume from there."
+            )
+            break
+
+    write_json(library_data_path, json_data)
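The progress prefix used in the prints and log lines above comes from the `operation_zfill_print` helper added to `ytmasc/utility.py` at the end of this diff. A minimal standalone sketch of what it produces:

```python
# Mirrors operation_zfill_print from ytmasc/utility.py: left-pad the
# operation number to the width of the total so progress lines align.
def operation_zfill_print(num: int, reference: int) -> str:
    return f"{str(num).zfill(len(str(reference)))}/{reference}"

assert operation_zfill_print(7, 1500) == "0007/1500"
assert operation_zfill_print(42, 99) == "42/99"
```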
diff --git a/ytmasc/database_utilities.py b/ytmasc/database_utilities.py
index 30bf592..a6476d7 100644
--- a/ytmasc/database_utilities.py
+++ b/ytmasc/database_utilities.py
@@ -1,5 +1,6 @@
 import json
 import os
+from logging import getLogger
 
 from mutagen.easyid3 import EasyID3
 from prettytable import PrettyTable
@@ -12,6 +13,8 @@
 files_to_keep = r"!keep"
 files_to_remove = r"!remove"
 
+logger = getLogger(__name__)
+
 
 class ComparisonUtilities:
     def list_mp3(self, dir: str) -> list:
@@ -163,54 +166,9 @@ def remove_duplicates_by_second_item(self, list_of_lists: list) -> list:
         return result
 
-    def get_metadata_from_query(self, query: str) -> list:
-        """Get songs metadata from the provided query. e.g Linkin Park - Numb
-        First one is the most popular video as a fallback. (for some reason the artist for it returns as watch count)
-        """
-
-        yt = YTMusic()
-        search_results = yt.search(query, ignore_spelling=True)
-
-        results_metadata = []
-        for result in search_results:
-            if result["category"] not in [
-                "More from YouTube",
-                "Videos",
-                "Community playlists",
-                "Featured playlists",
-                "Artists",
-                "Podcasts",
-                "Profiles",
-                "Episodes",
-                "Albums",
-            ]:
-                artists = []
-                for artist in result["artists"]:
-                    artists.append(artist["name"])
-                watch_id = result["videoId"]
-                title = result["title"]
-                try:
-                    album = result["album"]["name"]
-                except:
-                    album = None
-
-                results_metadata.append([artists, watch_id, title, album])
-        results_metadata = self.remove_duplicates_by_second_item(results_metadata)
-
-        return results_metadata
-
-    def get_metadata_from_watch_id(self, watch_id: str) -> [str, str]:
-        yt = YTMusic()
-
-        search_results = yt.get_song(watch_id)
-
-        artist = search_results["videoDetails"]["author"]
-        # does this provide a list if there's more than one?
-        title = search_results["videoDetails"]["title"]
-
-        return artist, title
-
-    def insert_data(self, table, artist, watch_id, title, album):
+    def insert_data(
+        self, table: PrettyTable, artist: str, watch_id: str, title: str, album: str
+    ):
         table.add_row(
             [
                 f"\x1b[101m\x1b[1m {artist} \x1b[0m",
@@ -219,3 +177,63 @@
                 f"\x1b[104m\x1b[1m {album} \x1b[0m",
             ]
         )
+
+
+def get_metadata_from_query(query: str) -> list:
+    """Get song metadata for a query, e.g. "Linkin Park - Numb".
+
+    The first result is the most popular video, kept as a fallback
+    (oddly, its artist field comes back as the watch count).
+    """
+
+    yt = YTMusic()
+    search_results = yt.search(query, ignore_spelling=True)
+
+    results_metadata = []
+    for result in search_results:
+        if result["category"] not in [
+            "More from YouTube",
+            "Videos",
+            "Community playlists",
+            "Featured playlists",
+            "Artists",
+            "Podcasts",
+            "Profiles",
+            "Episodes",
+            "Albums",
+        ]:
+            artists = []
+            for artist in result["artists"]:
+                artists.append(artist["name"])
+            watch_id = result["videoId"]
+            title = result["title"]
+            try:
+                album = result["album"]["name"]
+            except (KeyError, TypeError):
+                album = None
+
+            results_metadata.append([artists, watch_id, title, album])
+
+    # `self` is gone now that this lives at module level, so deduplicate
+    # by watch_id inline instead of calling remove_duplicates_by_second_item
+    deduped, seen = [], set()
+    for item in results_metadata:
+        if item[1] not in seen:
+            seen.add(item[1])
+            deduped.append(item)
+    results_metadata = deduped
+
+    return results_metadata
+
+
+def get_metadata_from_watch_id(watch_id: str) -> tuple[str, str]:
+    yt = YTMusic()
+
+    search_results = yt.get_song(watch_id)
+    logger.debug(search_results)
+
+    artist = search_results["videoDetails"]["author"]
+    # TODO: check whether "author" is a list when there is more than one artist
+    title = search_results["videoDetails"]["title"]
+
+    return artist, title
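For reference, a usage sketch of the two helpers now exposed at module level; the watch id is hypothetical, and real output depends on what YTMusic returns (a network call and ytmusicapi setup are required):

```python
from ytmasc.database_utilities import (
    get_metadata_from_query,
    get_metadata_from_watch_id,
)

# hypothetical watch id, for illustration only
artist, title = get_metadata_from_watch_id("dQw4w9WgXcQ")

# each entry is [artists, watch_id, title, album]; album may be None
for artists, watch_id, result_title, album in get_metadata_from_query(
    f"{artist} - {title}"
):
    print(", ".join(artists), watch_id, result_title, album)
```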
diff --git a/ytmasc/intermediates.py b/ytmasc/intermediates.py
index 7400a08..971d7cd 100644
--- a/ytmasc/intermediates.py
+++ b/ytmasc/intermediates.py
@@ -142,35 +142,54 @@ def create_config():
     update_yaml(yaml_config, default_config)
 
 
-def import_csv(csv_file: str, json_file: str, overwrite=True):
+def import_csv(csv_file: str, overwrite=True):
     df = read_csv(csv_file)
     df.fillna("", inplace=True)
 
-    json_data = read_json(json_file)
+    json_data = read_json(library_data_path)
     for index, row in df.iterrows():
-        key = row.iloc[0]
-        value1 = row.iloc[1]
-        value2 = row.iloc[2]
-
-        if key in json_data:
-            logger.info(f"Key {key} is already in the library.")
-            if (
-                (json_data[key]["artist"] != value1)
-                or (json_data[key]["title"] != value2)
-            ) and overwrite:
-                logger.info(
-                    f"Values don't match, updating with:\n"
-                    f"artist: {json_data[key]['artist']} -> {row.iloc[1]}\n"
-                    f"title: {json_data[key]['title']} -> {row.iloc[2]}"
-                )
-                json_data[key] = {"artist": value1, "title": value2}
-        else:
-            logger.info(
-                f"Key {key} is not in library, adding it with values:\n"
-                f"artist: {row.iloc[1]}\n"
-                f"title: {row.iloc[2]}"
-            )
-            json_data[key] = {"artist": value1, "title": value2}
+        watch_id = row.iloc[0]
+        artist = row.iloc[1]
+        title = row.iloc[2]
+
+        json_data = update_library_for_key(
+            json_data, watch_id, artist, title, overwrite
+        )
+
+    write_json(library_data_path, json_data)
+
+
+def update_library_for_key(
+    json_data: dict, watch_id: str, artist: str, title: str, overwrite: bool
+) -> dict:
+    if watch_id in json_data:
+        logger.info(f"Key {watch_id} is already in the library.")
+        if (
+            (json_data[watch_id]["artist"] != artist)
+            or (json_data[watch_id]["title"] != title)
+        ) and overwrite:
+            logger.info(
+                f"Values don't match, updating with:\n"
+                f"\tartist: {json_data[watch_id]['artist']} -> {artist}\n"
+                f"\ttitle: {json_data[watch_id]['title']} -> {title}"
+            )
+            json_data[watch_id] = {"artist": artist, "title": title}
+
+        elif (json_data[watch_id]["artist"] == "") or (
+            json_data[watch_id]["title"] == ""
+        ):
+            logger.info(
+                f"Overwrite not requested, but the stored artist and/or title is empty; filling in:\n"
+                f"\tartist: {json_data[watch_id]['artist']} -> {artist}\n"
+                f"\ttitle: {json_data[watch_id]['title']} -> {title}"
+            )
+            json_data[watch_id] = {"artist": artist, "title": title}
+    else:
+        logger.info(
+            f"Key {watch_id} is not in the library, adding it with values:\n"
+            f"\tartist: {artist}\n"
+            f"\ttitle: {title}"
+        )
+        json_data[watch_id] = {"artist": artist, "title": title}
 
-    with open(json_file, "w") as f:
-        dump(json_data, f, indent=2)
+    return json_data
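`update_library_for_key` is now the single merge point used by both the CSV import above and the YouTube refresh in `database_helpers.py`. A quick in-memory sketch of its three cases; the watch ids are made up for illustration:

```python
from ytmasc.intermediates import update_library_for_key

library = {"abc123def45": {"artist": "", "title": "Numb"}}

# key missing: added regardless of overwrite
library = update_library_for_key(
    library, "zzz999zzz99", "Linkin Park", "Faint", overwrite=False
)

# key present with an empty field: filled in even without overwrite
library = update_library_for_key(
    library, "abc123def45", "Linkin Park", "Numb", overwrite=False
)

# key present, values differ: replaced only when overwrite=True
library = update_library_for_key(
    library, "abc123def45", "LINKIN PARK", "Numb", overwrite=True
)
```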
diff --git a/ytmasc/intermediates_cli.py b/ytmasc/intermediates_cli.py
index 87e7f46..6025b1e 100644
--- a/ytmasc/intermediates_cli.py
+++ b/ytmasc/intermediates_cli.py
@@ -1,6 +1,11 @@
 from argparse import ArgumentParser
 
-from ytmasc.database_helpers import compare, find_unpaired_files, replace_fails
+from ytmasc.database_helpers import (
+    compare,
+    find_unpaired_files,
+    replace_current_metadata_with_youtube,
+    replace_fails,
+)
 from ytmasc.intermediates import (
     import_csv,
     run_tasks,
@@ -38,6 +43,13 @@ def get_cli_args():
         help="parser -> inbetween-delay, dialog-wait-delay == float, else == boolean | byte",
     )
 
+    parser.add_argument(
+        "--replace-current-metadata-with-youtube",
+        nargs="?",
+        const=True,
+        type=int,
+        help="Replace the entire library's metadata with YouTube metadata; "
+        "optionally pass an operation number to resume from.",
+    )
     parser.add_argument(
         "--update-library-with-manual-changes-on-files",
         action="store_true",
@@ -96,7 +108,28 @@ def handle_cli(args: classmethod, parser: classmethod):
         handle_settings(args)
 
     else:
-        parser.print_help()
+        if not any(
+            [
+                args.replace_current_metadata_with_youtube,
+                args.update_library_with_manual_changes_on_files,
+                args.export_library_as_csv,
+                args.import_csv_to_library,
+                args.import_csv_to_library_no_overwrite,
+                args.direct_import,
+                args.db_compare,
+                args.db_find_unpaired,
+                args.db_replace_fails,
+            ]
+        ):
+            parser.print_help()
+
+        # TODO: give csv import the same optional-value treatment
+        if args.replace_current_metadata_with_youtube is True:
+            replace_current_metadata_with_youtube()
+        elif args.replace_current_metadata_with_youtube is not None:
+            replace_current_metadata_with_youtube(
+                args.replace_current_metadata_with_youtube
+            )
 
         if args.update_library_with_manual_changes_on_files:
             update_library_with_manual_changes_on_files()
diff --git a/ytmasc/tagger.py b/ytmasc/tagger.py
index 7badd87..0ac0b15 100644
--- a/ytmasc/tagger.py
+++ b/ytmasc/tagger.py
@@ -6,6 +6,8 @@
 from ytmasc.utility import (
     audio_conversion_ext,
     count_files,
+    count_key_amount_in_json,
     download_path,
+    library_data_path,
     possible_audio_ext,
     source_cover_ext,
@@ -17,10 +19,9 @@
 def tag_bulk(json: dict):
     "Tag files in bulk"
     fail_amount = 0
-    total_files = count_files(download_path, possible_audio_ext)
-    num_digits = len(str(total_files))
+    total_operations = count_key_amount_in_json(library_data_path)
+    num_digits = len(str(total_operations))
     for i, (watch_id, value) in enumerate(json.items(), start=1):
-        logger.info(f"<<< TAG {i} >>>")
         fail_status = tag(watch_id, value, num_digits, i - fail_amount)
         logger.info(f">>> TAG {i} <<<")
diff --git a/ytmasc/utility.py b/ytmasc/utility.py
index 043e330..a32b728 100644
--- a/ytmasc/utility.py
+++ b/ytmasc/utility.py
@@ -167,3 +167,11 @@ def count_files(directory: str, extensions: list[str]) -> int:
         files = glob(pattern)
         count += len(files)
     return count
+
+
+def count_key_amount_in_json(file_path: str) -> int:
+    return len(read_json(file_path))
+
+
+def operation_zfill_print(num: int, reference: int) -> str:
+    return f"{str(num).zfill(len(str(reference)))}/{reference}"
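The CLI dispatch above relies on how argparse treats `nargs="?"` with `const` and `type`: an absent flag yields `None`, a bare flag yields the `const` value (`type` is not applied to it), and an explicit value is parsed as `int`. A standalone sketch of that behavior:

```python
from argparse import ArgumentParser

parser = ArgumentParser()
parser.add_argument(
    "--replace-current-metadata-with-youtube",
    nargs="?",
    const=True,  # value when the flag is given without an argument
    type=int,    # applied only to an explicit command-line value
)

# flag absent -> None (the CLI does nothing)
assert parser.parse_args([]).replace_current_metadata_with_youtube is None

# bare flag -> True (refresh from the beginning)
args = parser.parse_args(["--replace-current-metadata-with-youtube"])
assert args.replace_current_metadata_with_youtube is True

# explicit value -> int (resume from that operation number)
args = parser.parse_args(["--replace-current-metadata-with-youtube", "120"])
assert args.replace_current_metadata_with_youtube == 120
```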