-
-
Notifications
You must be signed in to change notification settings - Fork 7
/
Copy pathcli.py
574 lines (519 loc) · 23.6 KB
/
cli.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
import logging.handlers
import os
import re
import sys
import time
from pathlib import Path
from random import shuffle as unsort
from time import strftime
import click
from click import get_current_context
from ytmusic_deleter._version import __version__
from ytmusic_deleter.auth import ensure_auth
from ytmusic_deleter.common import can_edit_playlist
from ytmusic_deleter.common import INDIFFERENT
from ytmusic_deleter.common import SORTABLE_ATTRIBUTES
from ytmusic_deleter.common import UNKNOWN_ARTIST
from ytmusic_deleter.duplicates import check_for_duplicates
from ytmusic_deleter.duplicates import determine_tracks_to_remove
from ytmusic_deleter.progress import manager
from ytmusic_deleter.uploads import maybe_delete_uploaded_albums
from ytmusicapi import YTMusic
@click.group()
@click.version_option(__version__)
@click.option(
"--credential-dir",
"-c",
default=os.getcwd(),
help="Custom directory in which to locate/create JSON credential file, instead of current working directory",
)
@click.option(
"--static-progress",
"-p",
is_flag=True,
help="Log the progress statically instead of an animated progress bar",
)
@click.option(
"--log-dir",
"-l",
default=os.getcwd(),
help="Custom directory in which to write log files, instead of current working directory.",
)
@click.option("--no-logfile", "-n", is_flag=True, help="Only log to stdout, no file logging")
@click.option("--verbose", "-v", is_flag=True, help="Enable verbose (debug) logging")
@click.option("--oauth", "-o", is_flag=True, help="Enable OAuth authentication (may not work)")
@click.pass_context
def cli(ctx, log_dir, credential_dir, static_progress, no_logfile, verbose, oauth):
"""Perform batch delete operations on your YouTube Music library."""
handlers = [logging.StreamHandler(sys.stdout)]
if not no_logfile:
handlers.append(logging.FileHandler(Path(log_dir) / f"ytmusic-deleter-cli_{strftime('%Y-%m-%d')}.log"))
logging.basicConfig(
force=True,
level=logging.DEBUG if verbose else logging.INFO,
format="[%(asctime)s] %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
handlers=handlers,
)
if ctx.obj is not None:
# Allows yt_auth to be provided by pytest
yt_auth: YTMusic = ctx.obj
else:
yt_auth: YTMusic = ensure_auth(credential_dir, oauth)
ctx.ensure_object(dict)
ctx.obj["STATIC_PROGRESS"] = static_progress
ctx.obj["YT_AUTH"] = yt_auth
@cli.command()
def whoami():
"""Print the name of the authenticated user, or sign in"""
pass
@cli.command()
@click.option(
"--add-to-library",
"-a",
is_flag=True,
help="Add the corresponding album to your library before deleting a song from uploads.",
)
@click.option(
"--score-cutoff",
"-s",
default=90,
help="When combined with the --add-to-library flag, this optional integer argument between 0 and 100 is used when finding matches in the YTM online catalog. No matches with a score less than this number will be added to your library. Defaults to 85.", # noqa: B950
)
@click.pass_context
def delete_uploads(ctx: click.Context, **kwargs):
"""Delete all songs that you have uploaded to your YT Music library."""
(albums_deleted, albums_total) = maybe_delete_uploaded_albums()
logging.info(f"Deleted {albums_deleted} out of {albums_total} uploaded albums (or songs).")
remaining_count = albums_total - albums_deleted
if (ctx.params["add_to_library"]) and remaining_count > 0:
logging.info(
f"\tRemaining {remaining_count} albums (or songs) did not have a match in YouTube Music's online catalog."
)
logging.info("\tRe-run without the 'Add to library' option to delete the rest.")
return (albums_deleted, albums_total)
@cli.command()
@click.pass_context
def remove_library(ctx: click.Context):
"""Remove all songs and podcasts that you have added to your library from within YouTube Music."""
yt_auth: YTMusic = ctx.obj["YT_AUTH"]
logging.info("Retrieving all library albums...")
try:
library_albums = yt_auth.get_library_albums(limit=None)
logging.info(f"Retrieved {len(library_albums)} albums from your library.")
except Exception:
logging.exception("Failed to get library albums.")
library_albums = []
global progress_bar
progress_bar = manager.counter(
total=len(library_albums),
desc="Albums Processed",
unit="albums",
enabled=not ctx.obj["STATIC_PROGRESS"],
)
albums_removed = remove_library_items(library_albums)
logging.info("Retrieving all songs...")
try:
library_songs = yt_auth.get_library_songs(limit=None)
logging.info(f"Retrieved {len(library_songs)} songs from your library.")
except Exception:
logging.exception("Failed to get library songs.")
library_songs = []
progress_bar = manager.counter(
total=len(library_songs),
desc="Songs Processed",
unit="songs",
enabled=not ctx.obj["STATIC_PROGRESS"],
)
songs_removed = remove_library_items(library_songs)
podcasts_removed, library_podcasts = remove_library_podcasts()
items_removed = albums_removed + songs_removed + podcasts_removed
items_total = len(library_albums) + len(library_songs) + len(library_podcasts)
logging.info(f"Removed {items_removed} out of {items_total} albums, songs, and podcasts from your library.")
return (items_removed, items_total)
def remove_library_podcasts():
yt_auth: YTMusic = get_current_context().obj["YT_AUTH"]
logging.info("Retreiving all podcasts...")
library_podcasts = yt_auth.get_library_podcasts(limit=None)
# Filter out the "New Episodes" auto-playlist that can't be deleted
library_podcasts = list(filter(lambda podcast: podcast["channel"]["id"], library_podcasts))
logging.info(f"Retrieved {len(library_podcasts)} podcasts from your library.")
global progress_bar
progress_bar = manager.counter(
total=len(library_podcasts),
desc="Podcasts Removed",
unit="podcasts",
enabled=not get_current_context().obj["STATIC_PROGRESS"],
)
podcasts_removed = 0
for podcast in library_podcasts:
id = podcast.get("podcastId")
title = podcast.get("title")
if not id:
logging.debug(f"\tCan't delete podcast {title!r} because it doesn't have an ID.")
continue
response = yt_auth.rate_playlist(id, INDIFFERENT)
if "actions" in response:
logging.info(f"\tRemoved {title!r} from your library.")
podcasts_removed += 1
else:
logging.error(f"\tFailed to remove {title!r} from your library.")
update_progress()
logging.info(f"Removed {podcasts_removed} out of {len(library_podcasts)} podcasts from your library.")
return podcasts_removed, library_podcasts
def remove_library_items(library_items):
yt_auth: YTMusic = get_current_context().obj["YT_AUTH"]
items_removed = 0
for item in library_items:
logging.debug(f"Full album or song item: {item}")
artist = item["artists"][0]["name"] if "artists" in item else UNKNOWN_ARTIST
title = item.get("title")
logging.info(f"Processing item: {artist} - {title!r}")
id = item.get("playlistId")
if id:
logging.debug(f"Removing album using id: {id}")
response = yt_auth.rate_playlist(id, INDIFFERENT)
elif item.get("feedbackTokens") and isinstance(item.get("feedbackTokens"), dict):
logging.debug("This is a song, removing item using feedbackTokens")
response = yt_auth.edit_song_library_status([item.get("feedbackTokens").get("remove")])
else:
logging.error(
f"""
Library item {artist} - {title!r} was in an unexpected format, unable to remove.
Provide this to the developer:
{item}
"""
)
response = None
if response and "Removed from library" in str(response):
logging.debug(response)
logging.info(f"\tRemoved {artist} - {title!r} from your library.")
items_removed += 1
else:
logging.error(f"\tFailed to remove {artist} - {title!r} from your library.")
update_progress()
return items_removed
@cli.command()
@click.pass_context
def unlike_all(ctx: click.Context):
"""Reset all Thumbs Up ratings back to neutral"""
yt_auth: YTMusic = ctx.obj["YT_AUTH"]
logging.info("Retrieving all your liked songs...")
try:
your_likes = yt_auth.get_liked_songs(limit=None)
except Exception:
logging.error("\tNo liked songs found or error retrieving liked songs.")
raise
logging.info(f"\tRetrieved {len(your_likes['tracks'])} liked songs.")
logging.info("Begin unliking songs...")
global progress_bar
progress_bar = manager.counter(
total=len(your_likes["tracks"]),
desc="Songs Unliked",
unit="songs",
enabled=not ctx.obj["STATIC_PROGRESS"],
)
songs_unliked = 0
for track in your_likes["tracks"]:
artist = (
track["artists"][0]["name"]
if track.get("artists") # Using `get` ensures key exists and isn't []
else UNKNOWN_ARTIST
)
title = track["title"]
logging.info(f"Processing track: {artist} - {title!r}")
try:
yt_auth.rate_song(track["videoId"], INDIFFERENT)
logging.info("\tRemoved track from Likes.")
songs_unliked += 1
except Exception as e:
logging.exception(e)
logging.error(f"\tFailed to unlike {artist} - {title!r}")
update_progress()
logging.info(f"Finished unliking {songs_unliked} out of {len(your_likes['tracks'])} songs.")
return (songs_unliked, len(your_likes["tracks"]))
@cli.command()
@click.pass_context
def delete_playlists(ctx: click.Context):
"""Delete all playlists"""
yt_auth: YTMusic = ctx.obj["YT_AUTH"]
logging.info("Retrieving all your playlists...")
library_playlists = yt_auth.get_library_playlists(limit=None)
# Can't delete "Your Likes" playlist
library_playlists = list(filter(lambda playlist: playlist["playlistId"] != "LM", library_playlists))
logging.info(f"\tRetrieved {len(library_playlists)} playlists.")
logging.info("Begin deleting playlists...")
global progress_bar
progress_bar = manager.counter(
total=len(library_playlists),
desc="Playlists Deleted",
unit="playlists",
enabled=not ctx.obj["STATIC_PROGRESS"],
)
playlists_deleted = 0
for playlist in library_playlists:
logging.info(f"Processing playlist: {playlist['title']}")
try:
response = yt_auth.delete_playlist(playlist["playlistId"])
if response:
logging.info(f"\tRemoved playlist {playlist['title']!r} from your library.")
playlists_deleted += 1
else:
logging.error(f"\tFailed to remove playlist {playlist['title']!r} from your library.")
except Exception:
logging.error(
f"\tCould not delete playlist {playlist['title']!r}. You might not have permission to delete it."
)
update_progress()
logging.info(f"Deleted {playlists_deleted} out of {len(library_playlists)} playlists from your library.")
return (playlists_deleted, len(library_playlists))
@cli.command()
@click.pass_context
def delete_history(ctx: click.Context, items_deleted: int = 0):
"""
Delete your play history. This does not currently work with brand accounts.
The API can only retrieve 200 history items at a time, so the process will appear to
start over and repeat multiple times if necessary until all history is deleted.
"""
yt_auth: YTMusic = ctx.obj["YT_AUTH"]
logging.info("Begin deleting history...")
try:
history_items = yt_auth.get_history()
except Exception as e:
if str(e) == "None":
logging.info("History is empty, nothing left to delete.")
else:
logging.exception(e)
logging.info(f"Deleted {items_deleted} history items.")
return items_deleted
global progress_bar
progress_bar = manager.counter(
total=len(history_items),
desc="History Items Deleted",
unit="items",
enabled=not ctx.obj["STATIC_PROGRESS"],
)
logging.info(f"Found {len(history_items)} history items to delete.")
for item in history_items:
artist = (
item["artists"][0]["name"]
if item.get("artists") # Using `get` ensures key exists and isn't []
else UNKNOWN_ARTIST
)
logging.info(f"\tProcessing history item: {artist} - {item['title']!r}")
response = yt_auth.remove_history_items(item["feedbackToken"])
if response.get("feedbackResponses")[0].get("isProcessed"):
logging.info(f"\tDeleted history item: {artist} - {item['title']!r}")
items_deleted += 1
else:
logging.info(f"\tFailed to delete history item: {response}")
update_progress()
logging.info("Restarting history deletion to ensure all songs are deleted.")
time.sleep(5) # Wait before checking for new items as they take time to disappear
return ctx.invoke(delete_history, items_deleted) # repeat until history is empty
@cli.command()
@click.pass_context
def delete_all(ctx: click.Context):
"""Executes delete-uploads, remove-library, delete-playlists, unlike-all, and delete-history"""
ctx.invoke(delete_uploads)
ctx.invoke(remove_library)
ctx.invoke(delete_playlists)
ctx.invoke(unlike_all)
ctx.invoke(delete_history)
@cli.command()
@click.argument("playlist_titles", nargs=-1, required=True)
@click.option("--shuffle", "-s", is_flag=True, help="Shuffle the playlist(s) instead of sorting.")
@click.option("--custom-sort", "-c", multiple=True, metavar="ATTRIBUTE", help="Enable custom sorting")
@click.option("--reverse", "-r", is_flag=True, help="Reverse the entire playlist after sorting")
@click.pass_context
def sort_playlist(ctx: click.Context, shuffle, playlist_titles, custom_sort, reverse):
"""Sort or shuffle one or more playlists alphabetically by artist and by album"""
invalid_keys = [attr for attr in custom_sort if attr not in SORTABLE_ATTRIBUTES]
if invalid_keys:
raise ValueError(f"Invalid sort option(s): {', '.join(invalid_keys)}")
yt_auth: YTMusic = ctx.obj["YT_AUTH"]
all_playlists = yt_auth.get_library_playlists(limit=None)
lowercase_playlist_titles = [title.lower() for title in playlist_titles]
selected_playlist_list = [
playlist for playlist in all_playlists if playlist["title"].lower() in lowercase_playlist_titles
]
for selected_playlist in selected_playlist_list:
logging.info(f'Processing playlist: {selected_playlist["title"]}')
playlist = yt_auth.get_playlist(selected_playlist["playlistId"], limit=None)
if not can_edit_playlist(playlist):
logging.error(f"Cannot modify playlist {playlist.get('title')!r}. You are not the owner of this playlist.")
continue
current_tracklist = [t for t in playlist["tracks"]]
if shuffle:
logging.info(f"\tPlaylist: {selected_playlist['title']} will be shuffled")
desired_tracklist = [t for t in playlist["tracks"]]
unsort(desired_tracklist)
else:
desired_tracklist = [
t for t in sorted(playlist["tracks"], key=lambda t: make_sort_key(t, custom_sort), reverse=reverse)
]
global progress_bar
progress_bar = manager.counter(
total=len(desired_tracklist),
desc=f'{selected_playlist["title"]!r} Tracks {"Shuffled" if shuffle else "Sorted"}',
unit="tracks",
enabled=not ctx.obj["STATIC_PROGRESS"],
)
for cur_track in desired_tracklist:
cur_idx = desired_tracklist.index(cur_track)
track_after = current_tracklist[cur_idx]
cur_artist = cur_track["artists"][0]["name"] if cur_track["artists"] else UNKNOWN_ARTIST
track_after_artist = track_after["artists"][0]["name"] if track_after["artists"] else UNKNOWN_ARTIST
if cur_track != track_after:
logging.debug(
f"Moving {cur_artist} - {cur_track['title']!r} before {track_after_artist} - {track_after['title']!r}"
)
try:
if "setVideoId" not in cur_track or "setVideoId" not in track_after:
logging.error(
"Encountered track(s) with missing 'setVideoId'. Cannot sort the following track(s):"
)
if "setVideoId" not in cur_track:
logging.error(f"setVideoId attribute not in cur_track: {cur_track}")
if "setVideoId" not in track_after:
logging.error(f"setVideoId attribute not in track_after: {track_after}")
continue
response = yt_auth.edit_playlist(
playlist["id"],
moveItem=(
cur_track["setVideoId"],
track_after["setVideoId"],
),
)
if not response:
logging.error(
f"Failed to move {cur_artist} - {cur_track['title']!r} "
f"before {track_after_artist} - {track_after['title']!r}"
)
except Exception:
logging.error(
f"Failed to move {cur_artist} - {cur_track['title']!r} "
f"before {track_after_artist} - {track_after['title']!r}"
)
current_tracklist.remove(cur_track)
current_tracklist.insert(cur_idx, cur_track)
update_progress()
not_found_playlists = []
for title in lowercase_playlist_titles:
if title not in [x["title"].lower() for x in selected_playlist_list]:
not_found_playlists.append(title)
if not_found_playlists:
raise click.BadParameter(
f'No playlists found named {", ".join(not_found_playlists)!r}. Double-check your playlist name(s) '
'(or surround them "with quotes") and try again.'
)
def make_sort_key(track, sort_attributes):
artists = track["artists"]
artist = artists[0]["name"].lower() if artists else "z"
artist = re.sub(r"^(the |a )", "", artist)
album = track["album"]
album_title = album["name"].lower() if album else "z"
album_title = re.sub(r"^(the |a )", "", album_title)
track_title = track["title"]
duration = track.get("duration_seconds", 0) # noqa: F841
if sort_attributes:
return tuple([locals()[attr] for attr in sort_attributes])
else:
return (artist, album_title, track_title)
@cli.command()
@click.argument("playlist_title")
@click.option("--exact", "-e", is_flag=True, help="Only remove exact duplicates")
@click.pass_context
def remove_duplicates(ctx: click.Context, playlist_title, exact):
"""Delete all duplicates in a given playlist"""
yt_auth: YTMusic = ctx.obj["YT_AUTH"]
playlist = get_playlist_from_title(yt_auth, playlist_title)
# Get a list of all the sets of duplicates
duplicates = check_for_duplicates(playlist)
if not duplicates:
logging.info("No duplicates found. If you think this is an error open an issue on GitHub or message on Discord")
return
# For each dupe group, remove all but the first song
items_to_remove, _ = determine_tracks_to_remove(duplicates)
if not items_to_remove:
logging.info("Finished: No duplicate tracks were marked for deletion.")
return
logging.info(f"Removing {len(items_to_remove)} tracks total.")
playlist_id = playlist.get("id")
if playlist_id == "LM":
for song in items_to_remove:
yt_auth.rate_song(song["videoId"], INDIFFERENT)
else:
yt_auth.remove_playlist_items(playlist_id, items_to_remove)
logging.info("Finished removing duplicate tracks.")
@cli.command
@click.argument("playlist_title")
@click.option("--library", "-l", is_flag=True, help="Add all library songs to a playlist")
@click.option("--uploads", "-u", is_flag=True, help="Add all uploaded songs to a playlist")
@click.pass_context
def add_all_to_playlist(ctx: click.Context, playlist_title, library, uploads):
"""Add all library songs or uploaded songs to a playlist."""
if not (library or uploads):
raise click.BadParameter("You must specify either --library or --uploads.")
yt_auth: YTMusic = ctx.obj["YT_AUTH"]
playlist = get_playlist_from_title(yt_auth, playlist_title)
if library:
logging.info("User has selected 'Library' option. Retrieving all library songs...")
library_songs = yt_auth.get_library_songs(limit=None)
logging.info(f"Retrieved {len(library_songs)} library songs.")
songs_to_add = library_songs
elif uploads:
logging.info("User has selected 'Uploads' option. Retriving all uploaded songs...")
uploaded_songs = yt_auth.get_library_upload_songs(limit=None)
logging.info(f"Retrieved {len(uploaded_songs)} uploaded songs.")
songs_to_add = uploaded_songs
if not songs_to_add:
raise ValueError("No songs were found to add to the playlist.")
video_ids = []
for song in songs_to_add:
if "videoId" in song:
video_ids.append(song["videoId"])
else:
logging.warning(f"Warning: 'videoId' not found for {song.get('title')!r}, cannot add to playlist:", song)
logging.info(f"Preparing to add all {len(video_ids)} songs to playlist {playlist.get('title')!r}.")
response = yt_auth.add_playlist_items(playlist.get("id"), video_ids, duplicates=True)
if "status" not in response or "STATUS_SUCCEEDED" not in response.get("status"):
logging.error(response)
raise RuntimeError("API did not return a success message. See response object above.")
logging.info(f"Finished adding {len(video_ids)} songs to playlist {playlist.get('title')!r}.")
def get_playlist_from_title(yt_auth: YTMusic, playlist_title: str) -> dict:
"""
Takes the given playlist title string and returns the full dict object for that playlist.
Raises an error if the playlist is not modifiable.
"""
# Get all playlists
all_playlists = yt_auth.get_library_playlists(limit=None)
# Get the ID of the matching playlist
selected_playlist_id = next(
(
playlist["playlistId"]
for playlist in all_playlists
if playlist.get("title").lower() == playlist_title.lower()
),
None,
)
if not selected_playlist_id:
raise click.BadParameter(
f"No playlist found named {playlist_title!r}. Double-check your playlist name "
'(or surround it "with quotes") and try again.'
)
playlist: dict = yt_auth.get_playlist(selected_playlist_id, limit=None)
playlist_title_formatted = playlist.get("title")
logging.info(f"Retrieved playlist named {playlist_title_formatted!r} with {len(playlist.get('tracks'))} tracks.")
if not can_edit_playlist(playlist):
raise click.BadParameter(
f"Cannot modify playlist {playlist_title_formatted!r}. You are not the owner of this playlist."
)
return playlist
def update_progress():
global progress_bar
progress_bar.update()
if get_current_context().obj["STATIC_PROGRESS"]:
logging.info(f"Total complete: {round(progress_bar.count / progress_bar.total * 100)}%")
if __name__ == "__main__":
cli()