Skip to content

Commit

Permalink
New database tool to fetch YouTube metadata, zfill stuff
Browse files Browse the repository at this point in the history
  • Loading branch information
Egezenn committed Dec 25, 2024
1 parent ccda2af commit 26d0a6f
Show file tree
Hide file tree
Showing 6 changed files with 192 additions and 89 deletions.
59 changes: 48 additions & 11 deletions ytmasc/database_helpers.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from logging import getLogger
from os import listdir, mkdir, path, rename, system
from random import shuffle
from re import search
from time import sleep

Expand All @@ -11,10 +12,22 @@
FailReplacementUtilities,
files_to_keep,
files_to_remove,
get_metadata_from_query,
get_metadata_from_watch_id,
new_music_library,
old_music_library,
)
from ytmasc.utility import download_path, fail_log_path, read_txt_as_list
from ytmasc.intermediates import update_library_for_key
from ytmasc.utility import (
count_key_amount_in_json,
download_path,
fail_log_path,
library_data_path,
operation_zfill_print,
read_json,
read_txt_as_list,
write_json,
)

logger = getLogger(__name__)

Expand Down Expand Up @@ -74,9 +87,7 @@ def compare():
and sorted_data_title[0][next(iter(sorted_data_title[0]))]["ツ"] == 100
):
system("clear")
print(
f"{str(i).zfill(len(str(old_file_amt)))}/{old_file_amt}\nremove: {file}\n"
)
print(f"{operation_zfill_print(i, old_file_amt)}\nremove: {file}\n")
rename(file, path.join("!removal", f"{old_title}.mp3"))

# user decisions
Expand All @@ -102,20 +113,18 @@ def compare():
# time.sleep(0.5)
if input_key == "r":
system("clear")
print(
f"{str(i).zfill(len(str(old_file_amt)))}/{old_file_amt}\nremove: {file}\n"
)
print(f"{operation_zfill_print(i, old_file_amt)}\nremove: {file}\n")
rename(file, path.join("!remove", f"{old_title}.mp3"))
elif input_key == "k":
system("clear")
print(
f"{str(i).zfill(len(str(old_file_amt)))}/{old_file_amt}\nkeep: {file}\n"
f"{operation_zfill_print(i, old_file_amt)}/{old_file_amt}\nkeep: {file}\n"
)
rename(file, path.join("!keep", f"{old_title}.mp3"))
elif input_key == "i":
system("clear")
print(
f"{str(i).zfill(len(str(old_file_amt)))}/{old_file_amt}\nignore: {file}\n"
f"{operation_zfill_print(i, old_file_amt)}/{old_file_amt}\nignore: {file}\n"
)
else:
quit()
Expand All @@ -142,15 +151,43 @@ def replace_fails():
lines = read_txt_as_list(fail_log_path)
for line in lines:
watch_id = search(r"\[youtube\] ([a-zA-Z0-9\-_]*?):", line).group(1)
artist, title = utils.get_metadata_from_watch_id(watch_id)
artist, title = get_metadata_from_watch_id(watch_id)
system("clear")
query = f"{artist} - {title}"
print(query)
results = utils.get_metadata_from_query(query)
results = get_metadata_from_query(query)
table = utils.init_table()
for result in results:
utils.insert_data(table, *result)
print(table)
input_key = ""
while input_key not in ["q"]:
input_key = read_key()


def replace_current_metadata_with_youtube(skip_until=-1):
    """Overwrite every entry in the library JSON with artist/title fetched
    from YouTube via get_metadata_from_watch_id.

    :param skip_until: operation number (as printed in the progress log) to
        resume from after an interrupted run; -1 (default) processes all keys.
        NOTE(review): the original TODO reports the resume offset is slightly
        off — confirm against the printed operation numbers before relying on it.
    """
    json_data = read_json(library_data_path)
    total_operations = count_key_amount_in_json(library_data_path)
    for i, watch_id in enumerate(json_data, start=1):
        # Resume support: skip operations finished in a previous run.
        if i + 1 <= skip_until:
            continue
        try:
            logger.info(
                f"{operation_zfill_print(i, total_operations)} Getting metadata for {watch_id}"
            )
            artist, title = get_metadata_from_watch_id(watch_id)
            logger.info(
                f"Got metadata for {watch_id}: artist: {artist}, title: {title}"
            )

            json_data = update_library_for_key(
                json_data, watch_id, artist, title, overwrite=True
            )
        # Narrowed from a bare `except:` so Ctrl-C / SystemExit still work;
        # any fetch failure is treated as a (likely network/rate-limit) abort.
        except Exception:
            logger.warning(
                "YouTube denied to provide information, switch your network and input the latest operation number to skip until that point."
            )
            break

    # Persist whatever was fetched so far, even after an early abort.
    write_json(library_data_path, json_data)
104 changes: 56 additions & 48 deletions ytmasc/database_utilities.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import json
import os
from logging import getLogger

from mutagen.easyid3 import EasyID3
from prettytable import PrettyTable
Expand All @@ -12,6 +13,8 @@
files_to_keep = r"!keep"
files_to_remove = r"!remove"

logger = getLogger(__name__)


class ComparisonUtilities:
def list_mp3(self, dir: str) -> list:
Expand Down Expand Up @@ -163,54 +166,9 @@ def remove_duplicates_by_second_item(self, list_of_lists: list) -> list:

return result

def get_metadata_from_query(self, query: str) -> list:
"""Get songs metadata from the provided query. e.g Linkin Park - Numb
First one is the most popular video as a fallback. (for some reason the artist for it returns as watch count)
"""

yt = YTMusic()
search_results = yt.search(query, ignore_spelling=True)

results_metadata = []
for result in search_results:
if result["category"] not in [
"More from YouTube",
"Videos",
"Community playlists",
"Featured playlists",
"Artists",
"Podcasts",
"Profiles",
"Episodes",
"Albums",
]:
artists = []
for artist in result["artists"]:
artists.append(artist["name"])
watch_id = result["videoId"]
title = result["title"]
try:
album = result["album"]["name"]
except:
album = None

results_metadata.append([artists, watch_id, title, album])
results_metadata = self.remove_duplicates_by_second_item(results_metadata)

return results_metadata

def get_metadata_from_watch_id(self, watch_id: str) -> [str, str]:
yt = YTMusic()

search_results = yt.get_song(watch_id)

artist = search_results["videoDetails"]["author"]
# does this provide a list if there's more than one?
title = search_results["videoDetails"]["title"]

return artist, title

def insert_data(self, table, artist, watch_id, title, album):
def insert_data(
self, table: classmethod, artist: str, watch_id: str, title: str, album: str
):
table.add_row(
[
f"\x1b[101m\x1b[1m {artist} \x1b[0m",
Expand All @@ -219,3 +177,53 @@ def insert_data(self, table, artist, watch_id, title, album):
f"\x1b[104m\x1b[1m {album} \x1b[0m",
]
)


def get_metadata_from_query(query: str) -> list:
    """Get songs metadata from the provided query, e.g. "Linkin Park - Numb".

    Returns a list of [artists, watch_id, title, album] entries with duplicate
    watch ids removed (first occurrence wins).
    First one is the most popular video as a fallback. (for some reason the
    artist for it returns as watch count)
    """
    yt = YTMusic()
    search_results = yt.search(query, ignore_spelling=True)

    # Result categories that are not individual songs.
    skipped_categories = {
        "More from YouTube",
        "Videos",
        "Community playlists",
        "Featured playlists",
        "Artists",
        "Podcasts",
        "Profiles",
        "Episodes",
        "Albums",
    }

    results_metadata = []
    seen_watch_ids = set()
    for result in search_results:
        if result["category"] in skipped_categories:
            continue
        artists = [artist["name"] for artist in result["artists"]]
        watch_id = result["videoId"]
        title = result["title"]
        # "album" may be missing or None for non-album results.
        try:
            album = result["album"]["name"]
        except (KeyError, TypeError):
            album = None

        # Bugfix: the original called self.remove_duplicates_by_second_item()
        # at module level, which raises NameError (no `self` here). Dedupe
        # inline by watch_id instead — assumes the old helper kept the first
        # occurrence per watch_id (TODO confirm against the class helper).
        if watch_id not in seen_watch_ids:
            seen_watch_ids.add(watch_id)
            results_metadata.append([artists, watch_id, title, album])

    return results_metadata


def get_metadata_from_watch_id(watch_id: str) -> tuple:
    """Fetch (artist, title) for a single YouTube watch id via YTMusic.

    The original annotation `[str, str]` was a list literal, not a valid
    return type; corrected to a tuple return.
    """
    yt = YTMusic()

    search_results = yt.get_song(watch_id)
    logger.debug(search_results)

    artist = search_results["videoDetails"]["author"]
    # TODO(review): does "author" become a list when there is more than one
    # artist? (question carried over from the original comment — confirm)
    title = search_results["videoDetails"]["title"]

    return artist, title
67 changes: 42 additions & 25 deletions ytmasc/intermediates.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,35 +142,52 @@ def create_config():
update_yaml(yaml_config, default_config)


def import_csv(csv_file: str, overwrite=True):
    """Import rows of (watch_id, artist, title) from *csv_file* into the
    library JSON at library_data_path.

    :param csv_file: path to a CSV whose first three columns are
        watch_id, artist, title (column meaning inferred from usage —
        TODO confirm against the exporter).
    :param overwrite: forwarded to update_library_for_key; when True,
        existing entries with differing metadata are replaced.
    """
    df = read_csv(csv_file)
    # Treat missing cells as empty strings so downstream comparisons work.
    df.fillna("", inplace=True)
    json_data = read_json(library_data_path)

    for _, row in df.iterrows():  # row index is unused
        watch_id = row.iloc[0]
        artist = row.iloc[1]
        title = row.iloc[2]

        json_data = update_library_for_key(
            json_data, watch_id, artist, title, overwrite
        )

    write_json(library_data_path, json_data)


def update_library_for_key(json_data, watch_id, artist, title, overwrite):
    """Insert or update a single library entry and return the modified dict.

    Behavior (reconstructed from the scraped diff — the hunk interleaves
    removed lines of the old import_csv; verify against the repository):
    - existing key + differing metadata + overwrite=True -> replace entry
    - existing key + empty artist/title on record -> fill in even without
      overwrite (per the "Overwrite not specified but ... empty" log branch)
    - missing key -> add entry
    """
    if watch_id in json_data:
        logger.info(f"Key {watch_id} is already in the library.")
        if (
            (json_data[watch_id]["artist"] != artist)
            or (json_data[watch_id]["title"] != title)
        ) and overwrite:
            logger.info(
                f"Values don't match, updating with:\n"
                f"\tartist: {json_data[watch_id]['artist']} -> {artist}\n"
                f"\ttitle: {json_data[watch_id]['title']} -> {title}"
            )
            json_data[watch_id] = {"artist": artist, "title": title}

        elif (json_data[watch_id]["artist"] == "") or (
            json_data[watch_id]["title"] == ""
        ):
            logger.info(
                f"Overwrite not specified but artist and/or title metadata is empty:\n"
                f"\tartist: {json_data[watch_id]['artist']} -> {artist}\n"
                f"\ttitle: {json_data[watch_id]['title']} -> {title}"
            )
            json_data[watch_id] = {"artist": artist, "title": title}
    else:
        logger.info(
            f"Key {watch_id} is not in json_data, adding it with values:\n"
            f"\tartist: {artist}\n"
            f"\ttitle: {title}"
        )
        json_data[watch_id] = {"artist": artist, "title": title}

    return json_data
37 changes: 35 additions & 2 deletions ytmasc/intermediates_cli.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
from argparse import ArgumentParser

from ytmasc.database_helpers import compare, find_unpaired_files, replace_fails
from ytmasc.database_helpers import (
compare,
find_unpaired_files,
replace_current_metadata_with_youtube,
replace_fails,
)
from ytmasc.intermediates import (
import_csv,
run_tasks,
Expand Down Expand Up @@ -38,6 +43,13 @@ def get_cli_args():
help="parser -> inbetween-delay, dialog-wait-delay == float, else == boolean | byte",
)

parser.add_argument(
"--replace-current-metadata-with-youtube",
nargs="?",
const=True,
type=int,
help="Replace the whole library's metadata with YouTube metadata.",
)
parser.add_argument(
"--update-library-with-manual-changes-on-files",
action="store_true",
Expand Down Expand Up @@ -96,7 +108,28 @@ def handle_cli(args: classmethod, parser: classmethod):
handle_settings(args)

else:
parser.print_help()
if not any(
[
args.replace_current_metadata_with_youtube,
args.update_library_with_manual_changes_on_files,
args.export_library_as_csv,
args.import_csv_to_library,
args.import_csv_to_library_no_overwrite,
args.direct_import,
args.db_compare,
args.db_find_unpaired,
args.db_replace_fails,
]
):
parser.print_help()

# TODO also implement this behavior to csv import
if args.replace_current_metadata_with_youtube is not None:
replace_current_metadata_with_youtube(
args.replace_current_metadata_with_youtube
)
elif args.replace_current_metadata_with_youtube is None:
replace_current_metadata_with_youtube()

if args.update_library_with_manual_changes_on_files:
update_library_with_manual_changes_on_files()
Expand Down
6 changes: 3 additions & 3 deletions ytmasc/tagger.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from ytmasc.utility import (
audio_conversion_ext,
count_files,
count_key_amount_in_json,
download_path,
possible_audio_ext,
source_cover_ext,
Expand All @@ -17,10 +18,9 @@
def tag_bulk(json: dict):
"Tag files in bulk"
fail_amount = 0
total_files = count_files(download_path, possible_audio_ext)
num_digits = len(str(total_files))
total_operations = count_key_amount_in_json(library_data_path)
num_digits = len(str(total_operations))
for i, (watch_id, value) in enumerate(json.items(), start=1):

logger.info(f"<<< TAG {i} >>>")
fail_status = tag(watch_id, value, num_digits, i - fail_amount)
logger.info(f">>> TAG {i} <<<")
Expand Down
Loading

0 comments on commit 26d0a6f

Please sign in to comment.