From f404bf85696b6bce049c7447c5dc14ae5f1b6044 Mon Sep 17 00:00:00 2001 From: cyberjacob Date: Sat, 20 Mar 2021 14:56:33 +0000 Subject: [PATCH] Move all YouTube-related actions to their own module --- .../management/jobs => Youtube}/__init__.py | 0 app/Youtube/jobs.py | 64 +++++ app/Youtube/tasks.py | 190 +++++++++++++ app/Youtube/templates/Youtube/videoframe.html | 10 + app/Youtube/utils.py | 181 ++++++++++++ .../utils => Youtube}/youtube.py | 18 +- app/YtManager/__init__.py | 1 - app/YtManager/celery.py | 1 - app/YtManager/routing.py | 12 - app/YtManager/settings.py | 71 ++--- app/YtManagerApp/IProvider.py | 20 ++ app/YtManagerApp/appmain.py | 35 +-- app/YtManagerApp/consumers.py | 15 +- app/YtManagerApp/management/downloader.py | 20 +- .../management/jobs/delete_video.py | 46 --- .../management/jobs/download_video.py | 134 --------- .../management/jobs/synchronize.py | 233 --------------- .../migrations/0005_auto_20181026_2013.py | 3 +- app/YtManagerApp/models.py | 116 +++----- app/YtManagerApp/scheduler.py | 268 ------------------ app/YtManagerApp/tasks.py | 202 +------------ .../controls/subscription_update_modal.html | 2 +- .../templates/YtManagerApp/index_videos.html | 2 +- .../templates/YtManagerApp/video.html | 1 - .../templates/registration/logged_out.html | 2 +- .../registration/password_reset_complete.html | 2 +- .../registration/password_reset_done.html | 2 +- .../templates/registration/register_done.html | 2 +- app/YtManagerApp/views/actions.py | 4 +- app/YtManagerApp/views/first_time.py | 7 - app/YtManagerApp/views/index.py | 68 +++-- app/YtManagerApp/views/settings.py | 5 +- config/config.ini | 76 ++++- 33 files changed, 671 insertions(+), 1142 deletions(-) rename app/{YtManagerApp/management/jobs => Youtube}/__init__.py (100%) create mode 100644 app/Youtube/jobs.py create mode 100644 app/Youtube/tasks.py create mode 100644 app/Youtube/templates/Youtube/videoframe.html create mode 100644 app/Youtube/utils.py rename app/{YtManagerApp/utils => 
Youtube}/youtube.py (64%) create mode 100644 app/YtManagerApp/IProvider.py delete mode 100644 app/YtManagerApp/management/jobs/delete_video.py delete mode 100644 app/YtManagerApp/management/jobs/download_video.py delete mode 100644 app/YtManagerApp/management/jobs/synchronize.py delete mode 100644 app/YtManagerApp/scheduler.py diff --git a/app/YtManagerApp/management/jobs/__init__.py b/app/Youtube/__init__.py similarity index 100% rename from app/YtManagerApp/management/jobs/__init__.py rename to app/Youtube/__init__.py diff --git a/app/Youtube/jobs.py b/app/Youtube/jobs.py new file mode 100644 index 0000000..a2c7c8f --- /dev/null +++ b/app/Youtube/jobs.py @@ -0,0 +1,64 @@ +from YtManagerApp.IProvider import IProvider +from YtManagerApp.models import Video, Subscription +from Youtube import tasks, youtube, utils +from external.pytaw.pytaw.youtube import Channel, Playlist, InvalidURL + + +class Jobs(IProvider): + def synchronise_channel(self, subscription: Subscription): + tasks.synchronize_channel.delay(subscription.id) + + def download_video(self, video: Video): + tasks.download_video.delay(video) + + def delete_video(self, video: Video): + tasks.delete_video.delay(video) + + def is_url_valid_for_module(self, url: str) -> bool: + yt_api: youtube.YoutubeAPI = youtube.YoutubeAPI.build_public() + + try: + yt_api.parse_url(url) + except InvalidURL: + return False + return True + + def process_url(self, url: str, subscription: Subscription): + yt_api: youtube.YoutubeAPI = youtube.YoutubeAPI.build_public() + + url_parsed = yt_api.parse_url(url) + + if 'playlist' in url_parsed: + info_playlist = yt_api.playlist(url=url) + if info_playlist is None: + raise ValueError('Invalid playlist ID!') + + self._fill_from_playlist(subscription, info_playlist) + else: + info_channel = yt_api.channel(url=url) + if info_channel is None: + raise ValueError('Cannot find channel!') + + self._copy_from_channel(subscription, info_channel) + + @staticmethod + def 
_fill_from_playlist(subscription: Subscription, info_playlist: Playlist): + subscription.name = info_playlist.title + subscription.playlist_id = info_playlist.id + subscription.description = info_playlist.description + subscription.channel_id = info_playlist.channel_id + subscription.channel_name = info_playlist.channel_title + subscription.thumbnail = utils.best_thumbnail(info_playlist).url + subscription.save() + + @staticmethod + def _copy_from_channel(subscription: Subscription, info_channel: Channel): + # No point in storing info about the 'uploads from X' playlist + subscription.name = info_channel.title + subscription.playlist_id = info_channel.uploads_playlist.id + subscription.description = info_channel.description + subscription.channel_id = info_channel.id + subscription.channel_name = info_channel.title + subscription.thumbnail = utils.best_thumbnail(info_channel).url + subscription.rewrite_playlist_indices = True + subscription.save() diff --git a/app/Youtube/tasks.py b/app/Youtube/tasks.py new file mode 100644 index 0000000..f24a000 --- /dev/null +++ b/app/Youtube/tasks.py @@ -0,0 +1,190 @@ +from threading import Lock + +import youtube_dl +from celery import shared_task + +from Youtube.utils import synchronize_video, check_rss_videos, check_all_videos, build_youtube_dl_params +from YtManagerApp.management.downloader import fetch_thumbnail +from YtManagerApp.models import * +from YtManagerApp.utils import first_non_null +from Youtube import youtube + +__log = logging.getLogger(__name__) +__log_youtube_dl = logging.getLogger(youtube_dl.__name__) +_ENABLE_UPDATE_STATS = False +__api: youtube.YoutubeAPI = youtube.YoutubeAPI.build_public() +__lock = Lock() + + +@shared_task +def synchronize_channel(channel_id: int): + channel = Subscription.objects.get(id=channel_id) + __log.info("Starting synchronize "+channel.name) + videos = Video.objects.filter(subscription=channel) + + # Remove the 'new' flag + videos.update(new=False) + + __log.info("Starting check 
new videos " + channel.name) + if channel.last_synchronised is None: + check_all_videos(channel) + else: + check_rss_videos(channel) + channel.last_synchronised = datetime.datetime.now() + channel.save() + + fetch_missing_thumbnails_subscription.delay(channel.id) + + for video in videos: + synchronize_video(video) + + enabled = first_non_null(channel.auto_download, channel.user.preferences['auto_download']) + + if enabled: + global_limit = channel.user.preferences['download_global_limit'] + limit = first_non_null(channel.download_limit, channel.user.preferences['download_subscription_limit']) + order = first_non_null(channel.download_order, channel.user.preferences['download_order']) + order = VIDEO_ORDER_MAPPING[order] + + videos_to_download = Video.objects \ + .filter(subscription=channel, downloaded_path__isnull=True, watched=False) \ + .order_by(order) + + if global_limit > 0: + global_downloaded = Video.objects.filter(subscription__user=channel.user, downloaded_path__isnull=False).count() + allowed_count = max(global_limit - global_downloaded, 0) + videos_to_download = videos_to_download[0:allowed_count] + + if limit > 0: + sub_downloaded = Video.objects.filter(subscription=channel, downloaded_path__isnull=False).count() + allowed_count = max(limit - sub_downloaded, 0) + videos_to_download = videos_to_download[0:allowed_count] + + # enqueue download + for video in videos_to_download: + download_video.delay(video) + + +@shared_task +def actual_synchronize_video(video_id: int): + video = Video.objects.get(id=video_id) + __log.info("Starting synchronize video "+video.video_id) + if video.downloaded_path is not None: + files = list(video.get_files()) + + # Try to find a valid video file + found_video = False + for file in files: + mime, _ = mimetypes.guess_type(file) + if mime is not None and mime.startswith("video"): + found_video = True + + # Video not found, we can safely assume that the video was deleted. 
+ if not found_video: + # Clean up + for file in files: + os.unlink(file) + video.downloaded_path = None + + # Mark watched? + user = video.subscription.user + if user.preferences['mark_deleted_as_watched']: + video.watched = True + + video.save() + + fetch_missing_thumbnails_video.delay(video.id) + + if _ENABLE_UPDATE_STATS or video.duration == 0: + video_stats = __api.video(video.video_id, part='id,statistics,contentDetails') + + if video_stats is None: + return + + if video_stats.n_likes + video_stats.n_dislikes > 0: + video.rating = video_stats.n_likes / (video_stats.n_likes + video_stats.n_dislikes) + + video.views = video_stats.n_views + video.duration = video_stats.duration.total_seconds() + video.save() + + +@shared_task +def fetch_missing_thumbnails_subscription(obj_id: int): + obj = Subscription.objects.get(id=obj_id) + if obj.thumbnail.startswith("http"): + obj.thumbnail = fetch_thumbnail(obj.thumbnail, 'sub', obj.playlist_id, settings.THUMBNAIL_SIZE_SUBSCRIPTION) + obj.save() + + +@shared_task +def fetch_missing_thumbnails_video(obj_id: int): + obj = Video.objects.get(id=obj_id) + if obj.thumbnail.startswith("http"): + obj.thumbnail = fetch_thumbnail(obj.thumbnail, 'video', obj.video_id, settings.THUMBNAIL_SIZE_VIDEO) + obj.save() + + +@shared_task +def download_video(video: Video, attempt: int = 1): + # Issue: if multiple videos are downloaded at the same time, a race condition appears in the mkdirs() call that + # youtube-dl makes, which causes it to fail with the error 'Cannot create folder - file already exists'. + # For now, allow a single download instance. 
+ __lock.acquire() + + try: + user = video.subscription.user + max_attempts = user.preferences['max_download_attempts'] + + youtube_dl_params, output_path = build_youtube_dl_params(video) + with youtube_dl.YoutubeDL(youtube_dl_params) as yt: + ret = yt.download(["https://www.youtube.com/watch?v=" + video.video_id]) + + __log.info('Download finished with code %d', ret) + + if ret == 0: + video.downloaded_path = output_path + video.save() + __log.info('Video %d [%s %s] downloaded successfully!', video.id, video.video_id, video.name) + + elif attempt <= max_attempts: + __log.warning('Re-enqueueing video (attempt %d/%d)', attempt, max_attempts) + download_video.delay(video, attempt + 1) + + else: + __log.error('Multiple attempts to download video %d [%s %s] failed!', video.id, video.video_id, video.name) + video.downloaded_path = '' + video.save() + + finally: + __lock.release() + + +@shared_task() +def delete_video(video: Video): + count = 0 + + try: + for file in video.get_files(): + __log.info("Deleting file %s", file) + count += 1 + try: + os.unlink(file) + except OSError as e: + __log.error("Failed to delete file %s: Error: %s", file, e) + + except OSError as e: + __log.error("Failed to delete video %d [%s %s]. Error: %s", + video.id, + video.video_id, + video.name, + e) + + video.downloaded_path = None + video.save() + + __log.info('Deleted video %d successfully! 
(%d files) [%s %s]', + video.id, + count, + video.video_id, + video.name) diff --git a/app/Youtube/templates/Youtube/videoframe.html b/app/Youtube/templates/Youtube/videoframe.html new file mode 100644 index 0000000..566549b --- /dev/null +++ b/app/Youtube/templates/Youtube/videoframe.html @@ -0,0 +1,10 @@ + + + + + Title + + + + + \ No newline at end of file diff --git a/app/Youtube/utils.py b/app/Youtube/utils.py new file mode 100644 index 0000000..564eb0d --- /dev/null +++ b/app/Youtube/utils.py @@ -0,0 +1,181 @@ +import datetime +import re + +import os +from string import Template +from xml.etree import ElementTree +from typing import Optional + +import requests +from django.db.models import Max + +from Youtube.tasks import _ENABLE_UPDATE_STATS, actual_synchronize_video, fetch_missing_thumbnails_video, __api, __log_youtube_dl +from YtManagerApp.models import Video, Subscription +from external.pytaw.pytaw.youtube import Thumbnail, Resource + + +def synchronize_video(video: Video): + if video.downloaded_path is not None or _ENABLE_UPDATE_STATS or video.duration == 0: + actual_synchronize_video.delay(video.id) + + if video.thumbnail.startswith("http"): + fetch_missing_thumbnails_video.delay(video.id) + + +def check_rss_videos(sub: Subscription): + found_existing_video = False + + rss_request = requests.get("https://www.youtube.com/feeds/videos.xml?channel_id="+sub.channel_id) + rss_request.raise_for_status() + + rss = ElementTree.fromstring(rss_request.content) + for entry in rss.findall("{http://www.w3.org/2005/Atom}entry"): + video_id = entry.find("{http://www.youtube.com/xml/schemas/2015}videoId").text + results = Video.objects.filter(video_id=video_id, subscription=sub) + if results.exists(): + found_existing_video = True + else: + video_title = entry.find("{http://www.w3.org/2005/Atom}title").text + + video = Video() + video.video_id = video_id + video.name = video_title + video.description = 
entry.find("{http://search.yahoo.com/mrss/}group").find("{http://search.yahoo.com/mrss/}description").text or "" + video.watched = False + video.new = True + video.downloaded_path = None + video.subscription = sub + video.playlist_index = 0 + video.publish_date = datetime.datetime.fromisoformat(entry.find("{http://www.w3.org/2005/Atom}published").text) + video.thumbnail = entry\ + .find("{http://search.yahoo.com/mrss/}group")\ + .find("{http://search.yahoo.com/mrss/}thumbnail")\ + .get("url") + video.rating = entry\ + .find("{http://search.yahoo.com/mrss/}group")\ + .find("{http://search.yahoo.com/mrss/}community")\ + .find("{http://search.yahoo.com/mrss/}starRating")\ + .get("average") + video.views = entry\ + .find("{http://search.yahoo.com/mrss/}group")\ + .find("{http://search.yahoo.com/mrss/}community")\ + .find("{http://search.yahoo.com/mrss/}statistics")\ + .get("views") + video.save() + + synchronize_video(video) + + if not found_existing_video: + check_all_videos(sub) + + +def check_all_videos(sub: Subscription): + playlist_items = __api.playlist_items(sub.playlist_id) + if sub.rewrite_playlist_indices: + playlist_items = sorted(playlist_items, key=lambda x: x.published_at) + else: + playlist_items = sorted(playlist_items, key=lambda x: x.position) + + for item in playlist_items: + results = Video.objects.filter(video_id=item.resource_video_id, subscription=sub) + + if not results.exists(): + # fix playlist index if necessary + if sub.rewrite_playlist_indices or Video.objects.filter(subscription=sub, playlist_index=item.position).exists(): + highest = Video.objects.filter(subscription=sub).aggregate(Max('playlist_index'))['playlist_index__max'] + item.position = 0 if highest is None else highest + 1 + + video = Video() + video.video_id = item.resource_video_id + video.name = item.title + video.description = item.description + video.watched = False + video.new = True + video.downloaded_path = None + video.subscription = sub + video.playlist_index = item.position + 
video.publish_date = item.published_at + video.thumbnail = best_thumbnail(item).url + video.save() + + synchronize_video(video) + + +def build_youtube_dl_params(video: Video): + sub = video.subscription + user = sub.user + + # resolve path + download_path = user.preferences['download_path'] + + template_dict = build_template_dict(video) + output_pattern = Template(user.preferences['download_file_pattern']).safe_substitute(template_dict) + + output_path = os.path.join(download_path, output_pattern) + output_path = os.path.normpath(output_path) + + youtube_dl_params = { + 'logger': __log_youtube_dl, + 'format': user.preferences['download_format'], + 'outtmpl': output_path, + 'writethumbnail': True, + 'writedescription': True, + 'writesubtitles': user.preferences['download_subtitles'], + 'writeautomaticsub': user.preferences['download_autogenerated_subtitles'], + 'allsubtitles': user.preferences['download_subtitles_all'], + 'merge_output_format': 'mp4', + 'postprocessors': [ + { + 'key': 'FFmpegMetadata' + }, + ] + } + + sub_langs = user.preferences['download_subtitles_langs'].split(',') + sub_langs = [i.strip() for i in sub_langs] + if len(sub_langs) > 0: + youtube_dl_params['subtitleslangs'] = sub_langs + + sub_format = user.preferences['download_subtitles_format'] + if len(sub_format) > 0: + youtube_dl_params['subtitlesformat'] = sub_format + + return youtube_dl_params, output_path + + +def build_template_dict(video: Video): + return { + 'channel': video.subscription.channel_name, + 'channel_id': video.subscription.channel_id, + 'playlist': video.subscription.name, + 'playlist_id': video.subscription.playlist_id, + 'playlist_index': "{:03d}".format(1 + video.playlist_index), + 'title': video.name, + 'id': video.video_id, + } + + +def get_valid_path(path): + """ + Normalizes the string to ASCII (NFKD, dropping characters that cannot be converted), 
+ removes ':', '"' and '*', strips surrounding whitespace and replaces '?', '<', '>' and '|' with '#'. + """ + import unicodedata + value = unicodedata.normalize('NFKD', path).encode('ascii', 'ignore').decode('ascii') + value = re.sub('[:"*]', '', value).strip() + value = re.sub('[?<>|]', '#', value) + return value + + +def best_thumbnail(resource: Resource) -> Optional[Thumbnail]: + """ + Gets the best thumbnail available for a resource. + :param resource: + :return: + """ + thumbs = getattr(resource, 'thumbnails', None) + + if thumbs is None or len(thumbs) <= 0: + return None + + return max(thumbs, key=lambda t: t.width * t.height) diff --git a/app/YtManagerApp/utils/youtube.py b/app/Youtube/youtube.py similarity index 64% rename from app/YtManagerApp/utils/youtube.py rename to app/Youtube/youtube.py index f8df22e..a29bdb7 100644 --- a/app/YtManagerApp/utils/youtube.py +++ b/app/Youtube/youtube.py @@ -1,7 +1,7 @@ -from django.conf import settings -from external.pytaw.pytaw.youtube import YouTube, Channel, Playlist, PlaylistItem, Thumbnail, InvalidURL, Resource, Video from typing import Optional +from external.pytaw.pytaw.youtube import YouTube, Thumbnail, Resource + class YoutubeAPI(YouTube): @@ -33,17 +33,3 @@ def default_thumbnail(resource: Resource) -> Optional[Thumbnail]: (i for i in thumbs if i.id == 'default'), thumbs[0] ) - - -def best_thumbnail(resource: Resource) -> Optional[Thumbnail]: - """ - Gets the best thumbnail available for a resource. 
- :param resource: - :return: - """ - thumbs = getattr(resource, 'thumbnails', None) - - if thumbs is None or len(thumbs) <= 0: - return None - - return max(thumbs, key=lambda t: t.width * t.height) \ No newline at end of file diff --git a/app/YtManager/__init__.py b/app/YtManager/__init__.py index b23e378..070e835 100644 --- a/app/YtManager/__init__.py +++ b/app/YtManager/__init__.py @@ -5,4 +5,3 @@ from .celery import app as celery_app __all__ = ('celery_app',) - diff --git a/app/YtManager/celery.py b/app/YtManager/celery.py index cc70bf2..b343a7f 100644 --- a/app/YtManager/celery.py +++ b/app/YtManager/celery.py @@ -17,4 +17,3 @@ # Load task modules from all registered Django app configs. app.autodiscover_tasks() - diff --git a/app/YtManager/routing.py b/app/YtManager/routing.py index cf1d34f..082aac8 100644 --- a/app/YtManager/routing.py +++ b/app/YtManager/routing.py @@ -1,17 +1,7 @@ # mysite/routing.py -from channels.auth import AuthMiddlewareStack from channels.routing import ProtocolTypeRouter, URLRouter -from django.urls import re_path from django.urls import path -import YtManagerApp.routing -#application = ProtocolTypeRouter({ -# 'websocket': AuthMiddlewareStack( -# URLRouter( -# YtManagerApp.routing.websocket_urlpatterns -# ) -# ), -#}) from YtManagerApp import consumers @@ -20,5 +10,3 @@ path('ytsm/ws/events/', consumers.EventConsumer.as_asgi()), ]) }) - - diff --git a/app/YtManager/settings.py b/app/YtManager/settings.py index 05e9730..bc0b74b 100644 --- a/app/YtManager/settings.py +++ b/app/YtManager/settings.py @@ -14,24 +14,26 @@ import sys import logging from os.path import dirname as up - +from configparser import ConfigParser +from YtManagerApp.utils.extended_interpolation_with_env import ExtendedInterpolatorWithEnv +import dj_database_url # # Directories # # Build paths inside the project like this: os.path.join(BASE_DIR, ...) 
-PROJECT_ROOT = up(up(os.path.dirname(__file__))) # Project root -BASE_DIR = up(os.path.dirname(__file__)) # Base dir of the application +PROJECT_ROOT = up(up(os.path.dirname(__file__))) # Project root +BASE_DIR = up(os.path.dirname(__file__)) # Base dir of the application CONFIG_DIR = os.getenv("YTSM_CONFIG_DIR", os.path.join(PROJECT_ROOT, "config")) DATA_DIR = os.getenv("YTSM_DATA_DIR", os.path.join(PROJECT_ROOT, "data")) -STATIC_ROOT = "/opt/static" #os.path.join(PROJECT_ROOT, "static") +STATIC_ROOT = "/opt/static" MEDIA_ROOT = os.path.join(DATA_DIR, 'media') -print("Using static root: "+STATIC_ROOT) -print("Using media root: "+MEDIA_ROOT) +print("Using static root: " + STATIC_ROOT) +print("Using media root: " + MEDIA_ROOT) # # Defaults @@ -41,14 +43,14 @@ _DEFAULT_SECRET_KEY = '^zv8@i2h!ko2lo=%ivq(9e#x=%q*i^^)6#4@(juzdx%&0c+9a0' _DEFAULT_DATABASE = { - 'ENGINE': 'django.db.backends.sqlite3', - 'NAME': os.path.join(DATA_DIR, 'ytmanager.db'), - 'HOST': None, - 'USER': None, - 'PASSWORD': None, - 'PORT': None, - 'OPTIONS': {'charset': 'utf8mb4'}, - } + 'ENGINE': 'django.db.backends.sqlite3', + 'NAME': os.path.join(DATA_DIR, 'ytmanager.db'), + 'HOST': None, + 'USER': None, + 'PASSWORD': None, + 'PORT': None, + 'OPTIONS': {'charset': 'utf8mb4'}, +} CONFIG_ERRORS = [] CONFIG_WARNINGS = [] @@ -63,6 +65,8 @@ 'DATA_DIR': DATA_DIR, } +INSTALLED_PROVIDERS = ["Youtube"] + # # Load globals from config.ini @@ -115,10 +119,6 @@ def get_global_opt(name, cfgparser, env_variable=None, fallback=None, boolean=Fa return cfgparser.get('global', name, fallback=fallback, vars=CFG_PARSER_OPTS) -from configparser import ConfigParser -from YtManagerApp.utils.extended_interpolation_with_env import ExtendedInterpolatorWithEnv -import dj_database_url - try: os.makedirs(DATA_DIR, exist_ok=True) logging.info(f"Using data directory {DATA_DIR}") @@ -132,19 +132,19 @@ def get_global_opt(name, cfgparser, env_variable=None, fallback=None, boolean=Fa if cfg_file not in read_ok: 
CONFIG_ERRORS.append(f'Configuration file {cfg_file} could not be read! Please make sure the file is in the ' - 'right place, and it has read permissions.') + 'right place, and it has read permissions.') # Debug -#global DEBUG +# global DEBUG DEBUG = get_global_opt('Debug', cfg, env_variable='YTSM_DEBUG', fallback=_DEFAULT_DEBUG, boolean=True) # Secret key # SECURITY WARNING: keep the secret key used in production secret! -#global SECRET_KEY +# global SECRET_KEY SECRET_KEY = get_global_opt('SecretKey', cfg, env_variable='YTSM_SECRET_KEY', fallback=_DEFAULT_SECRET_KEY) # Database -#global DATABASES +# global DATABASES DATABASES = { 'default': _DEFAULT_DATABASE } @@ -169,7 +169,7 @@ def get_global_opt(name, cfgparser, env_variable=None, fallback=None, boolean=Fa } # Log settings -#global LOG_LEVEL +# global LOG_LEVEL log_level_str = get_global_opt('LogLevel', cfg, env_variable='YTSM_LOG_LEVEL', fallback='INFO') try: @@ -177,18 +177,16 @@ def get_global_opt(name, cfgparser, env_variable=None, fallback=None, boolean=Fa except AttributeError: CONFIG_WARNINGS.append(f'Invalid log level {log_level_str}. 
' f'Valid options are: DEBUG, INFO, WARN, ERROR, CRITICAL.') - print("Invalid log level " + LOG_LEVEL) + print("Invalid log level " + log_level_str) LOG_LEVEL = logging.INFO - - URL_BASE = get_global_opt('UrlBase', cfg, env_variable='YTSM_URL_BASE', fallback="") # # Basic Django stuff # ALLOWED_HOSTS = ['*'] -SESSION_COOKIE_AGE = 3600 * 30 # one month +SESSION_COOKIE_AGE = 3600 * 30 # 30 hours (3600 s * 30), not one month # Application definition @@ -208,6 +206,8 @@ def get_global_opt(name, cfgparser, env_variable=None, fallback=None, boolean=Fa 'django_celery_results', ] +INSTALLED_APPS += INSTALLED_PROVIDERS + MIDDLEWARE = [ 'django.middleware.security.SecurityMiddleware', 'whitenoise.middleware.WhiteNoiseMiddleware', @@ -263,9 +263,8 @@ def get_global_opt(name, cfgparser, env_variable=None, fallback=None, boolean=Fa }, ] -LOGIN_REDIRECT_URL = '/'+URL_BASE -LOGIN_URL = '/'+URL_BASE+'login' - +LOGIN_REDIRECT_URL = '/' + URL_BASE +LOGIN_URL = '/' + URL_BASE + 'login' # Internationalization # https://docs.djangoproject.com/en/1.11/topics/i18n/ @@ -287,9 +286,8 @@ def get_global_opt(name, cfgparser, env_variable=None, fallback=None, boolean=Fa # Static files (CSS, JavaScript, Images) # https://docs.djangoproject.com/en/1.11/howto/static-files/ -STATIC_URL = get_global_opt('StaticUrl', cfg, env_variable='YTSM_STATIC_URL', fallback='/'+URL_BASE+'/static/') -MEDIA_URL = get_global_opt('MediaURL', cfg, env_variable='YTSM_MEDIA_URL', fallback='/'+URL_BASE+'/media/') - +STATIC_URL = get_global_opt('StaticUrl', cfg, env_variable='YTSM_STATIC_URL', fallback='/' + URL_BASE + '/static/') +MEDIA_URL = get_global_opt('MediaURL', cfg, env_variable='YTSM_MEDIA_URL', fallback='/' + URL_BASE + '/media/') # Misc Django stuff @@ -298,11 +296,4 @@ def get_global_opt(name, cfgparser, env_variable=None, fallback=None, boolean=Fa LOG_FORMAT = '%(asctime)s|%(process)d|%(thread)d|%(name)s|%(filename)s|%(lineno)d|%(levelname)s|%(message)s' CONSOLE_LOG_FORMAT = '%(asctime)s | %(name)s | 
%(filename)s:%(lineno)d | %(levelname)s | %(message)s' -## These are just to make inspector happy, they will be set in the load_config_ini() method -#DEBUG = None -#SECRET_KEY = None -#DATABASES = None -#LOG_LEVEL = None - - ASGI_APPLICATION = 'YtManager.routing.application' diff --git a/app/YtManagerApp/IProvider.py b/app/YtManagerApp/IProvider.py new file mode 100644 index 0000000..9c4e752 --- /dev/null +++ b/app/YtManagerApp/IProvider.py @@ -0,0 +1,20 @@ +from abc import ABC, abstractmethod +from YtManagerApp.models import Video, Subscription + + +class IProvider(ABC): + @abstractmethod + def download_video(self, video: Video): + pass + + @abstractmethod + def synchronise_channel(self, subscription: Subscription): + pass + + @abstractmethod + def process_url(self, url: str, subscription: Subscription) -> bool: + pass + + @abstractmethod + def is_url_valid_for_module(self, url: str) -> bool: + pass diff --git a/app/YtManagerApp/appmain.py b/app/YtManagerApp/appmain.py index 32eb66b..bd7addc 100644 --- a/app/YtManagerApp/appmain.py +++ b/app/YtManagerApp/appmain.py @@ -1,29 +1,14 @@ import logging import logging.handlers -import os import sys -from asgiref.sync import sync_to_async +import os from django.conf import settings as dj_settings -from .management.appconfig import appconfig -from .management.jobs.synchronize import SynchronizeJob -from .scheduler import scheduler -from django.db.utils import OperationalError, ProgrammingError - def __initialize_logger(): log_dir = os.path.join(dj_settings.DATA_DIR, 'logs') os.makedirs(log_dir, exist_ok=True) - - file_handler = logging.handlers.RotatingFileHandler( - os.path.join(log_dir, "log.log"), - maxBytes=1024 * 1024, - backupCount=5 - ) - file_handler.setLevel(dj_settings.LOG_LEVEL) - file_handler.setFormatter(logging.Formatter(dj_settings.LOG_FORMAT)) - logging.root.addHandler(file_handler) logging.root.setLevel(dj_settings.LOG_LEVEL) if dj_settings.DEBUG: @@ -35,22 +20,4 @@ def __initialize_logger(): def main(): 
__initialize_logger() - - try: -# if appconfig.initialized: -# scheduler.initialize() -# SynchronizeJob.schedule_global_job() - sync_to_async(setup_scheduler) - except (OperationalError, ProgrammingError): - # Settings table is not created when running migrate or makemigrations; - # Just don't do anything in this case. - pass - logging.info('Initialization complete.') - -@sync_to_async -def setup_scheduler(): - if appconfig.initialized: - scheduler.initialize() - SynchronizeJob.schedule_global_job() - diff --git a/app/YtManagerApp/consumers.py b/app/YtManagerApp/consumers.py index 06a87d8..be92180 100644 --- a/app/YtManagerApp/consumers.py +++ b/app/YtManagerApp/consumers.py @@ -29,6 +29,12 @@ def receive(self, text=None, bytes=None, **kwargs): text_data_json = json.loads(text) elif bytes: text_data_json = json.loads(bytes) + else: + self.send(text_data=json.dumps({ + 'request': 'error', + 'data': "Unable to find request body" + })) + return request = text_data_json['request'] if request == "jobs": @@ -37,7 +43,9 @@ def receive(self, text=None, bytes=None, **kwargs): def jobs(self): #TODO: User filtering - sync_all_tasks = django_celery_results.models.TaskResult.objects.filter(task_name="YtManagerApp.tasks.synchronize_all", date_created__gte=datetime.datetime.now()-datetime.timedelta(days=1)) + sync_all_tasks = django_celery_results.models.TaskResult.objects.filter( + task_name="YtManagerApp.tasks.synchronize_all", + date_created__gte=datetime.datetime.now()-datetime.timedelta(days=1)) all_children = [] response = [] @@ -69,7 +77,9 @@ def jobs(self): 'message': str(complete_tasks) + " / " + str(all_tasks) }] - sync_other_tasks = django_celery_results.models.TaskResult.objects.filter(date_done__isnull=True).exclude(task_id__in=all_children) + sync_other_tasks = django_celery_results.models.TaskResult.objects\ + .filter(date_done__isnull=True)\ + .exclude(task_id__in=all_children) for taskResult in sync_other_tasks: task = AsyncResult(taskResult.task_id) @@ -100,4 
+110,3 @@ def jobs(self): def connect(self): self.accept() self.jobs() - diff --git a/app/YtManagerApp/management/downloader.py b/app/YtManagerApp/management/downloader.py index 5406766..57cb282 100644 --- a/app/YtManagerApp/management/downloader.py +++ b/app/YtManagerApp/management/downloader.py @@ -1,21 +1,23 @@ -from YtManagerApp.management.jobs.download_video import DownloadVideoJob -from YtManagerApp.models import Video, Subscription, VIDEO_ORDER_MAPPING -from YtManagerApp.utils import first_non_null -from django.conf import settings as srv_settings import logging -import requests import mimetypes -import os +from urllib.parse import urljoin + import PIL.Image import PIL.ImageOps -from urllib.parse import urljoin +import os +import requests +from django.conf import settings as srv_settings +from django.contrib.auth.models import User + +from YtManagerApp.models import Video, Subscription, VIDEO_ORDER_MAPPING +from YtManagerApp.utils import first_non_null log = logging.getLogger('downloader') log.setLevel(os.environ.get('LOGLEVEL', 'INFO').upper()) def __get_subscription_config(sub: Subscription): - user = sub.user + user: User = sub.user enabled = first_non_null(sub.auto_download, user.preferences['auto_download']) global_limit = user.preferences['download_global_limit'] @@ -54,7 +56,7 @@ def downloader_process_subscription(sub: Subscription): # enqueue download for video in videos_to_download: log.info('Enqueuing video %d [%s %s] index=%d', video.id, video.video_id, video.name, video.playlist_index) - DownloadVideoJob.schedule(video) + video.subscription.get_provider().download_video(video) log.info('Finished processing subscription %d [%s %s]', sub.id, sub.playlist_id, sub.id) diff --git a/app/YtManagerApp/management/jobs/delete_video.py b/app/YtManagerApp/management/jobs/delete_video.py deleted file mode 100644 index 977935a..0000000 --- a/app/YtManagerApp/management/jobs/delete_video.py +++ /dev/null @@ -1,46 +0,0 @@ -import os - -from YtManagerApp.models 
import Video -from YtManagerApp.scheduler import Job, scheduler - - -class DeleteVideoJob(Job): - name = "DeleteVideoJob" - - def __init__(self, job_execution, video: Video): - super().__init__(job_execution) - self._video = video - - def get_description(self): - return f"Deleting video {self._video}" - - def run(self): - count = 0 - - try: - for file in self._video.get_files(): - self.log.info("Deleting file %s", file) - count += 1 - try: - os.unlink(file) - except OSError as e: - self.log.error("Failed to delete file %s: Error: %s", file, e) - - except OSError as e: - self.log.error("Failed to delete video %d [%s %s]. Error: %s", self._video.id, - self._video.video_id, self._video.name, e) - - self._video.downloaded_path = None - self._video.save() - - self.log.info('Deleted video %d successfully! (%d files) [%s %s]', self._video.id, count, - self._video.video_id, self._video.name) - - @staticmethod - def schedule(video: Video): - """ - Schedules a delete video job to run immediately. 
- :param video: - :return: - """ - scheduler.add_job(DeleteVideoJob, args=[video]) diff --git a/app/YtManagerApp/management/jobs/download_video.py b/app/YtManagerApp/management/jobs/download_video.py deleted file mode 100644 index 1d8b838..0000000 --- a/app/YtManagerApp/management/jobs/download_video.py +++ /dev/null @@ -1,134 +0,0 @@ -import os -import re -from string import Template -from threading import Lock - -import youtube_dl - -from YtManagerApp.models import Video -from YtManagerApp.scheduler import Job, scheduler - - -class DownloadVideoJob(Job): - name = "DownloadVideoJob" - __lock = Lock() - - def __init__(self, job_execution, video: Video, attempt: int = 1): - super().__init__(job_execution) - self.__video = video - self.__attempt = attempt - self.__log_youtube_dl = self.log.getChild('youtube_dl') - - def get_description(self): - ret = "Downloading video " + self.__video.name - if self.__attempt > 1: - ret += f" (attempt {self.__attempt})" - return ret - - def run(self): - # Issue: if multiple videos are downloaded at the same time, a race condition appears in the mkdirs() call that - # youtube-dl makes, which causes it to fail with the error 'Cannot create folder - file already exists'. - # For now, allow a single download instance. 
- self.__lock.acquire() - - try: - user = self.__video.subscription.user - max_attempts = user.preferences['max_download_attempts'] - - youtube_dl_params, output_path = self.__build_youtube_dl_params(self.__video) - with youtube_dl.YoutubeDL(youtube_dl_params) as yt: - ret = yt.download(["https://www.youtube.com/watch?v=" + self.__video.video_id]) - - self.log.info('Download finished with code %d', ret) - - if ret == 0: - self.__video.downloaded_path = output_path - self.__video.save() - self.log.info('Video %d [%s %s] downloaded successfully!', self.__video.id, self.__video.video_id, self.__video.name) - - elif self.__attempt <= max_attempts: - self.log.warning('Re-enqueueing video (attempt %d/%d)', self.__attempt, max_attempts) - DownloadVideoJob.schedule(self.__video, self.__attempt + 1) - - else: - self.log.error('Multiple attempts to download video %d [%s %s] failed!', self.__video.id, self.__video.video_id, - self.__video.name) - self.__video.downloaded_path = '' - self.__video.save() - - finally: - self.__lock.release() - - def __build_youtube_dl_params(self, video: Video): - - sub = video.subscription - user = sub.user - - # resolve path - download_path = user.preferences['download_path'] - - template_dict = self.__build_template_dict(video) - output_pattern = Template(user.preferences['download_file_pattern']).safe_substitute(template_dict) - - output_path = os.path.join(download_path, output_pattern) - output_path = os.path.normpath(output_path) - - youtube_dl_params = { - 'logger': self.__log_youtube_dl, - 'format': user.preferences['download_format'], - 'outtmpl': output_path, - 'writethumbnail': True, - 'writedescription': True, - 'writesubtitles': user.preferences['download_subtitles'], - 'writeautomaticsub': user.preferences['download_autogenerated_subtitles'], - 'allsubtitles': user.preferences['download_subtitles_all'], - 'merge_output_format': 'mp4', - 'postprocessors': [ - { - 'key': 'FFmpegMetadata' - }, - ] - } - - sub_langs = 
user.preferences['download_subtitles_langs'].split(',') - sub_langs = [i.strip() for i in sub_langs] - if len(sub_langs) > 0: - youtube_dl_params['subtitleslangs'] = sub_langs - - sub_format = user.preferences['download_subtitles_format'] - if len(sub_format) > 0: - youtube_dl_params['subtitlesformat'] = sub_format - - return youtube_dl_params, output_path - - def __build_template_dict(self, video: Video): - return { - 'channel': video.subscription.channel_name, - 'channel_id': video.subscription.channel_id, - 'playlist': video.subscription.name, - 'playlist_id': video.subscription.playlist_id, - 'playlist_index': "{:03d}".format(1 + video.playlist_index), - 'title': video.name, - 'id': video.video_id, - } - - def __get_valid_path(self, path): - """ - Normalizes string, converts to lowercase, removes non-alpha characters, - and converts spaces to hyphens. - """ - import unicodedata - value = unicodedata.normalize('NFKD', path).encode('ascii', 'ignore').decode('ascii') - value = re.sub('[:"*]', '', value).strip() - value = re.sub('[?<>|]', '#', value) - return value - - @staticmethod - def schedule(video: Video, attempt: int = 1): - """ - Schedules to download video immediately - :param video: - :param attempt: - :return: - """ - scheduler.add_job(DownloadVideoJob, args=[video, attempt]) diff --git a/app/YtManagerApp/management/jobs/synchronize.py b/app/YtManagerApp/management/jobs/synchronize.py deleted file mode 100644 index b93c974..0000000 --- a/app/YtManagerApp/management/jobs/synchronize.py +++ /dev/null @@ -1,233 +0,0 @@ -import errno -import itertools -import datetime -from threading import Lock - -import requests -from xml.etree import ElementTree -from apscheduler.triggers.cron import CronTrigger -from django.db.models import Max, F -from django.conf import settings - -from YtManagerApp.management.appconfig import appconfig -from YtManagerApp.management.downloader import fetch_thumbnail, downloader_process_subscription -from YtManagerApp.models import * 
-from YtManagerApp.scheduler import scheduler, Job -from YtManagerApp.utils import youtube -from external.pytaw.pytaw.utils import iterate_chunks - -_ENABLE_UPDATE_STATS = False - - -class SynchronizeJob(Job): - name = "SynchronizeJob" - __lock = Lock() - running = False - __global_sync_job = None - - def __init__(self, job_execution, subscription: Optional[Subscription] = None): - super().__init__(job_execution) - self.__subscription = subscription - self.__api = youtube.YoutubeAPI.build_public() - self.__new_videos = [] - - def get_description(self): - if self.__subscription is not None: - return "Running synchronization for subscription " + self.__subscription.name - return "Running synchronization..." - - def get_subscription_list(self): - if self.__subscription is not None: - return [self.__subscription] - return Subscription.objects.all().order_by(F('last_synchronised').desc(nulls_first=True)) - - def run(self): - self.__lock.acquire(blocking=True) - SynchronizeJob.running = True - try: - self.log.info(self.get_description()) - - # Build list of work items - work_subs = self.get_subscription_list() - work_videos = Video.objects.filter(subscription__in=work_subs) - - self.set_total_steps(len(work_subs) + len(work_videos)) - - # Remove the 'new' flag - work_videos.update(new=False) - - # Process subscriptions - for sub in work_subs: - self.progress_advance(progress_msg="Synchronizing subscription " + sub.name) - self.check_new_videos(sub) - self.fetch_missing_thumbnails(sub) - - # Add new videos to progress calculation - self.set_total_steps(len(work_subs) + len(work_videos) + len(self.__new_videos)) - - # Process videos - all_videos = itertools.chain(work_videos, self.__new_videos) - for batch in iterate_chunks(all_videos, 50): - if _ENABLE_UPDATE_STATS: - batch_ids = [video.video_id for video in batch] - video_stats = {v.id: v for v in self.__api.videos(batch_ids, part='id,statistics,contentDetails')} - else: - batch_ids = [video.video_id for video in 
filter(lambda video: video.duration == 0, batch)] - video_stats = {v.id: v for v in self.__api.videos(batch_ids, part='id,statistics,contentDetails')} - - for video in batch: - self.progress_advance(progress_msg="Updating video " + video.name) - self.check_video_deleted(video) - self.fetch_missing_thumbnails(video) - - if video.video_id in video_stats: - self.update_video_stats(video, video_stats[video.video_id]) - - # Start downloading videos - for sub in work_subs: - downloader_process_subscription(sub) - - finally: - SynchronizeJob.running = False - self.__lock.release() - - def check_new_videos(self, sub: Subscription): - if sub.last_synchronised is None: - self.check_all_videos(sub) - else: - self.check_rss_videos(sub) - sub.last_synchronised = datetime.datetime.now() - sub.save() - - def check_rss_videos(self, sub: Subscription): - found_existing_video = False - - rss_request = requests.get("https://www.youtube.com/feeds/videos.xml?channel_id="+sub.channel_id) - rss_request.raise_for_status() - - rss = ElementTree.fromstring(rss_request.content) - for entry in rss.findall("{http://www.w3.org/2005/Atom}entry"): - video_id = entry.find("{http://www.youtube.com/xml/schemas/2015}videoId").text - results = Video.objects.filter(video_id=video_id, subscription=sub) - if results.exists(): - found_existing_video = True - else: - video_title = entry.find("{http://www.w3.org/2005/Atom}title").text - - self.log.info('New video for subscription %s: %s %s"', sub, video_id, video_title) - - video = Video() - video.video_id = video_id - video.name = video_title - video.description = entry.find("{http://search.yahoo.com/mrss/}group").find("{http://search.yahoo.com/mrss/}description").text or "" - video.watched = False - video.new = True - video.downloaded_path = None - video.subscription = sub - video.playlist_index = 0 - video.publish_date = datetime.datetime.fromisoformat(entry.find("{http://www.w3.org/2005/Atom}published").text) - video.thumbnail = 
entry.find("{http://search.yahoo.com/mrss/}group").find("{http://search.yahoo.com/mrss/}thumbnail").get("url") - video.rating = entry.find("{http://search.yahoo.com/mrss/}group").find("{http://search.yahoo.com/mrss/}community").find("{http://search.yahoo.com/mrss/}starRating").get("average") - video.views = entry.find("{http://search.yahoo.com/mrss/}group").find("{http://search.yahoo.com/mrss/}community").find("{http://search.yahoo.com/mrss/}statistics").get("views") - video.save() - - self.__new_videos.append(video) - - if not found_existing_video: - self.check_all_videos(sub) - - def check_all_videos(self, sub: Subscription): - playlist_items = self.__api.playlist_items(sub.playlist_id) - if sub.rewrite_playlist_indices: - playlist_items = sorted(playlist_items, key=lambda x: x.published_at) - else: - playlist_items = sorted(playlist_items, key=lambda x: x.position) - - for item in playlist_items: - results = Video.objects.filter(video_id=item.resource_video_id, subscription=sub) - - if not results.exists(): - self.log.info('New video for subscription %s: %s %s"', sub, item.resource_video_id, item.title) - - # fix playlist index if necessary - if sub.rewrite_playlist_indices or Video.objects.filter(subscription=sub, playlist_index=item.position).exists(): - highest = Video.objects.filter(subscription=sub).aggregate(Max('playlist_index'))['playlist_index__max'] - item.position = 1 + (highest or -1) - - self.__new_videos.append(Video.create(item, sub)) - - @staticmethod - def fetch_missing_thumbnails(obj: Union[Subscription, Video]): - if obj.thumbnail.startswith("http"): - if isinstance(obj, Subscription): - obj.thumbnail = fetch_thumbnail(obj.thumbnail, 'sub', obj.playlist_id, settings.THUMBNAIL_SIZE_SUBSCRIPTION) - elif isinstance(obj, Video): - obj.thumbnail = fetch_thumbnail(obj.thumbnail, 'video', obj.video_id, settings.THUMBNAIL_SIZE_VIDEO) - obj.save() - - def check_video_deleted(self, video: Video): - if video.downloaded_path is not None: - files = [] - 
try: - files = list(video.get_files()) - except OSError as e: - if e.errno != errno.ENOENT: - self.log.error("Could not access path %s. Error: %s", video.downloaded_path, e) - self.usr_err(f"Could not access path {video.downloaded_path}: {e}", suppress_notification=True) - return - - # Try to find a valid video file - found_video = False - for file in files: - mime, _ = mimetypes.guess_type(file) - if mime is not None and mime.startswith("video"): - found_video = True - - # Video not found, we can safely assume that the video was deleted. - if not found_video: - self.log.info("Video %d was deleted! [%s %s]", video.id, video.video_id, video.name) - # Clean up - for file in files: - try: - os.unlink(file) - except OSError as e: - self.log.error("Could not delete redundant file %s. Error: %s", file, e) - self.usr_err(f"Could not delete redundant file {file}: {e}", suppress_notification=True) - video.downloaded_path = None - - # Mark watched? - user = video.subscription.user - if user.preferences['mark_deleted_as_watched']: - video.watched = True - - video.save() - - @staticmethod - def update_video_stats(video: Video, yt_video): - if yt_video.n_likes is not None \ - and yt_video.n_dislikes is not None \ - and yt_video.n_likes + yt_video.n_dislikes > 0: - video.rating = yt_video.n_likes / (yt_video.n_likes + yt_video.n_dislikes) - - video.views = yt_video.n_views - video.duration = yt_video.duration.total_seconds() - video.save() - - @staticmethod - def schedule_global_job(): - trigger = CronTrigger.from_crontab(appconfig.sync_schedule) - - if SynchronizeJob.__global_sync_job is None: - trigger = CronTrigger.from_crontab(appconfig.sync_schedule) - SynchronizeJob.__global_sync_job = scheduler.add_job(SynchronizeJob, trigger, max_instances=1, coalesce=True) - - else: - SynchronizeJob.__global_sync_job.reschedule(trigger, max_instances=1, coalesce=True) - - @staticmethod - def schedule_now(): - scheduler.add_job(SynchronizeJob, max_instances=1, coalesce=True) - - 
@staticmethod - def schedule_now_for_subscription(subscription): - scheduler.add_job(SynchronizeJob, user=subscription.user, args=[subscription]) diff --git a/app/YtManagerApp/migrations/0005_auto_20181026_2013.py b/app/YtManagerApp/migrations/0005_auto_20181026_2013.py index 4270129..4970a59 100644 --- a/app/YtManagerApp/migrations/0005_auto_20181026_2013.py +++ b/app/YtManagerApp/migrations/0005_auto_20181026_2013.py @@ -1,8 +1,7 @@ # Generated by Django 2.1.2 on 2018-10-26 17:13 -from django.db import migrations, models -import django.db.models.deletion import django.db.models.functions.text +from django.db import migrations, models class Migration(migrations.Migration): diff --git a/app/YtManagerApp/models.py b/app/YtManagerApp/models.py index bbf5f2e..e64fb13 100644 --- a/app/YtManagerApp/models.py +++ b/app/YtManagerApp/models.py @@ -1,5 +1,7 @@ import logging import mimetypes + +import importlib import os import datetime from typing import Callable, Union, Any, Optional @@ -8,11 +10,14 @@ from django.db import models from django.db.models.functions import Lower -from YtManagerApp.utils import youtube +from YtManagerApp.management.appconfig import appconfig +from django.conf import settings # help_text = user shown text # verbose_name = user shown name # null = nullable, blank = user is allowed to set value to empty +from YtManagerApp.IProvider import IProvider + VIDEO_ORDER_CHOICES = [ ('newest', 'Newest'), ('oldest', 'Oldest'), @@ -51,14 +56,13 @@ def __str__(self): def __repr__(self): return f'folder {self.id}, name="{self.name}"' - def getUnwatchedCount(self): + def get_unwatched_count(self): def count(node: Union["SubscriptionFolder", "Subscription"]): if node.pk != self.pk: - return node.getUnwatchedCount() + return node.get_unwatched_count() return sum(SubscriptionFolder.traverse(self.id, self.user, count)) - def delete_folder(self, keep_subscriptions: bool): if keep_subscriptions: @@ -118,8 +122,9 @@ class Subscription(models.Model): thumbnail = 
models.CharField(max_length=1024) user = models.ForeignKey(User, on_delete=models.CASCADE) # youtube adds videos to the 'Uploads' playlist at the top instead of the bottom - rewrite_playlist_indices = models.BooleanField(null=False, default=False) + rewrite_playlist_indices = models.BooleanField(default=False) last_synchronised = models.DateTimeField(null=True, blank=True) + provider = models.CharField(max_length=255) # overrides auto_download = models.BooleanField(null=True, blank=True) @@ -136,99 +141,49 @@ def __str__(self): def __repr__(self): return f'subscription {self.id}, name="{self.name}", playlist_id="{self.playlist_id}"' - def fill_from_playlist(self, info_playlist: youtube.Playlist): - self.name = info_playlist.title - self.playlist_id = info_playlist.id - self.description = info_playlist.description - self.channel_id = info_playlist.channel_id - self.channel_name = info_playlist.channel_title - self.thumbnail = youtube.best_thumbnail(info_playlist).url - - def copy_from_channel(self, info_channel: youtube.Channel): - # No point in storing info about the 'uploads from X' playlist - self.name = info_channel.title - self.playlist_id = info_channel.uploads_playlist.id - self.description = info_channel.description - self.channel_id = info_channel.id - self.channel_name = info_channel.title - self.thumbnail = youtube.best_thumbnail(info_channel).url - self.rewrite_playlist_indices = True - - def fetch_from_url(self, url, yt_api: youtube.YoutubeAPI): - url_parsed = yt_api.parse_url(url) - if 'playlist' in url_parsed: - info_playlist = yt_api.playlist(url=url) - if info_playlist is None: - raise ValueError('Invalid playlist ID!') - - self.fill_from_playlist(info_playlist) - else: - info_channel = yt_api.channel(url=url) - if info_channel is None: - raise ValueError('Cannot find channel!') - - self.copy_from_channel(info_channel) - def delete_subscription(self, keep_downloaded_videos: bool): self.delete() def synchronize_now(self): - from 
YtManagerApp.management.jobs.synchronize import SynchronizeJob - SynchronizeJob.schedule_now_for_subscription(self) + self.get_provider().synchronise_channel(self) - def getUnwatchedCount(self): + def get_unwatched_count(self): return Video.objects.filter(subscription=self, watched=False).count() + def get_provider(self) -> IProvider: + if self.provider not in settings.INSTALLED_PROVIDERS: + raise Exception("Provider "+self.provider+" not loaded for subscription "+self.name+" ("+str(self.id)+")") + return importlib.import_module(self.provider + ".jobs").Jobs() + class Video(models.Model): video_id = models.CharField(null=False, max_length=12) - name = models.TextField(null=False) + name = models.TextField() description = models.TextField() - watched = models.BooleanField(default=False, null=False) - new = models.BooleanField(default=True, null=False) + watched = models.BooleanField(default=False) + new = models.BooleanField(default=True) downloaded_path = models.TextField(null=True, blank=True) subscription = models.ForeignKey(Subscription, on_delete=models.CASCADE) - playlist_index = models.IntegerField(null=False) + playlist_index = models.IntegerField() publish_date = models.DateTimeField(null=False) thumbnail = models.TextField() uploader_name = models.CharField(null=False, max_length=255) - views = models.IntegerField(null=False, default=0) - rating = models.FloatField(null=False, default=0.5) - duration = models.IntegerField(null=False, default=0) - - @staticmethod - def create(playlist_item: youtube.PlaylistItem, subscription: Subscription): - video = Video() - video.video_id = playlist_item.resource_video_id - video.name = playlist_item.title - video.description = playlist_item.description - video.watched = False - video.new = True - video.downloaded_path = None - video.subscription = subscription - video.playlist_index = playlist_item.position - video.publish_date = playlist_item.published_at - video.thumbnail = youtube.best_thumbnail(playlist_item).url - 
video.save() - return video + views = models.IntegerField(default=0) + rating = models.FloatField(default=0.5) + duration = models.IntegerField(default=0) def mark_watched(self): self.watched = True self.save() if self.downloaded_path is not None: - from YtManagerApp.management.appconfig import appconfig - from YtManagerApp.management.jobs.delete_video import DeleteVideoJob - from YtManagerApp.management.jobs.synchronize import SynchronizeJob - if appconfig.for_sub(self.subscription, 'automatically_delete_watched'): - DeleteVideoJob.schedule(self) - SynchronizeJob.schedule_now_for_subscription(self.subscription) + self.subscription.get_provider().delete_video(self) + self.subscription.get_provider().synchronise_channel(self.subscription) def mark_unwatched(self): - from YtManagerApp.management.jobs.synchronize import SynchronizeJob self.watched = False self.save() - SynchronizeJob.schedule_now_for_subscription(self.subscription) + self.subscription.get_provider().synchronise_channel(self.subscription) def get_files(self): if self.downloaded_path is not None: @@ -252,21 +207,16 @@ def find_video(self): def delete_files(self): if self.downloaded_path is not None: - from YtManagerApp.management.jobs.delete_video import DeleteVideoJob - from YtManagerApp.management.appconfig import appconfig - from YtManagerApp.management.jobs.synchronize import SynchronizeJob - - DeleteVideoJob.schedule(self) + self.subscription.get_provider().delete_video(self) # Mark watched? 
if self.subscription.user.preferences['mark_deleted_as_watched']: self.watched = True - SynchronizeJob.schedule_now_for_subscription(self.subscription) + self.subscription.get_provider().synchronise_channel(self.subscription) def download(self): if not self.downloaded_path: - from YtManagerApp.management.jobs.download_video import DownloadVideoJob - DownloadVideoJob.schedule(self) + self.subscription.get_provider().download_video(self) def __str__(self): return self.name @@ -310,7 +260,7 @@ class JobExecution(models.Model): end_date = models.DateTimeField(null=True) user = models.ForeignKey(User, on_delete=models.CASCADE, null=True) description = models.CharField(max_length=250, null=False, default="") - status = models.IntegerField(choices=JOB_STATES, null=False, default=0) + status = models.IntegerField(choices=JOB_STATES, default=0) class JobMessage(models.Model): @@ -318,5 +268,5 @@ class JobMessage(models.Model): job = models.ForeignKey(JobExecution, null=False, on_delete=models.CASCADE) progress = models.FloatField(null=True) message = models.CharField(max_length=1024, null=False, default="") - level = models.IntegerField(choices=JOB_MESSAGE_LEVELS, null=False, default=0) - suppress_notification = models.BooleanField(null=False, default=False) + level = models.IntegerField(choices=JOB_MESSAGE_LEVELS, default=0) + suppress_notification = models.BooleanField(default=False) diff --git a/app/YtManagerApp/scheduler.py b/app/YtManagerApp/scheduler.py deleted file mode 100644 index 5f3e252..0000000 --- a/app/YtManagerApp/scheduler.py +++ /dev/null @@ -1,268 +0,0 @@ -import datetime -import logging -import traceback -from typing import Type, Union, Optional, Callable, List, Any - -import pytz -from apscheduler.schedulers.background import BackgroundScheduler -from apscheduler.triggers.base import BaseTrigger -from django.contrib.auth.models import User - -from YtManagerApp.management.appconfig import appconfig -from YtManagerApp.models import JobExecution, 
JobMessage, JOB_STATES_MAP, JOB_MESSAGE_LEVELS_MAP - - -class ProgressTracker(object): - """ - Class which helps keep track of complex operation progress. - """ - - def __init__(self, total_steps: float = 100, initial_steps: float = 0, - listener: Callable[[float, str], None] = None, - listener_args: List[Any] = None, - parent: Optional["ProgressTracker"] = None): - """ - Constructor - :param total_steps: Total number of steps required by this operation - :param initial_steps: Starting steps - :param parent: Parent progress tracker - :param listener: Callable which is called when any progress happens - """ - - self.total_steps = total_steps - self.steps = initial_steps - - self.__subtask: ProgressTracker = None - self.__subtask_steps = 0 - - self.__parent = parent - self.__listener = listener - self.__listener_args = listener_args or [] - - def __on_progress(self, progress_msg): - if self.__listener is not None: - self.__listener(*self.__listener_args, self.compute_progress(), progress_msg) - - if self.__parent is not None: - self.__parent.__on_progress(progress_msg) - - def advance(self, steps: float = 1, progress_msg: str = ''): - """ - Advances a number of steps. - :param steps: Number of steps to advance - :param progress_msg: A message which will be passed to a listener - :return: - """ - - # We can assume previous subtask is now completed - if self.__subtask is not None: - self.steps += self.__subtask_steps - self.__subtask = None - - self.steps += steps - self.__on_progress(progress_msg) - - def subtask(self, steps: float = 1, subtask_total_steps: float = 100, subtask_initial_steps: float = 0): - """ - Creates a 'subtask' which has its own progress, which will be used in the calculation of the final progress. 
- :param steps: Number of steps the subtask is 'worth' - :param subtask_total_steps: Total number of steps for subtask - :param subtask_initial_steps: Initial steps for subtask - :return: ProgressTracker for subtask - """ - - # We can assume previous subtask is now completed - if self.__subtask is not None: - self.steps += self.__subtask_steps - - self.__subtask = ProgressTracker(total_steps=subtask_total_steps, - initial_steps=subtask_initial_steps, - parent=self) - self.__subtask_steps = steps - - return self.__subtask - - def compute_progress(self): - """ - Calculates final progress value in percent. - :return: value in [0,1] interval representing progress - """ - base = float(self.steps) / self.total_steps - if self.__subtask is not None: - base += self.__subtask.compute_progress() * self.__subtask_steps / self.total_steps - - return min(base, 1.0) - - -class Job(object): - name = 'GenericJob' - - """ - Base class for jobs running in the scheduler. - """ - - def __init__(self, job_execution, *args): - self.job_execution = job_execution - self.log = logging.getLogger(self.name) - self.__progress_tracker = ProgressTracker(listener=Job.__on_progress, - listener_args=[self]) - - def get_description(self) -> str: - """ - Gets a user friendly description of this job. - Should be overriden in job classes. - :return: - """ - return "Running job..." - - # - # progress tracking - # - - def __on_progress(self, percent: float, message: str): - self.usr_log(message, progress=percent) - - def set_total_steps(self, steps: float): - """ - Sets the total number of work steps this task has. This is used for tracking progress. - Should be overriden in job classes. - :return: - """ - self.__progress_tracker.total_steps = steps - - def progress_advance(self, steps: float = 1, progress_msg: str = ''): - """ - Advances a number of steps. 
- :param steps: Number of steps to advance - :param progress_msg: A message which will be passed to a listener - :return: - """ - self.__progress_tracker.advance(steps, progress_msg) - - def create_subtask(self, steps: float = 1, subtask_total_steps: float = 100, subtask_initial_steps: float = 0): - """ - Creates a 'subtask' which has its own progress, which will be used in the calculation of the final progress. - :param steps: Number of steps the subtask is 'worth' - :param subtask_total_steps: Total number of steps for subtask - :param subtask_initial_steps: Initial steps for subtask - :return: ProgressTracker for subtask - """ - return self.__progress_tracker.subtask(steps, subtask_total_steps, subtask_initial_steps) - - # - # user log messages - # - - def usr_log(self, message, progress: Optional[float] = None, level: int = JOB_MESSAGE_LEVELS_MAP['normal'], - suppress_notification: bool = False): - """ - Creates a new log message which will be shown on the user interface. - Progress can also be updated using this method. - :param message: A message to be displayed to the user - :param progress: Progress percentage in [0,1] interval - :param level: Log level (normal, warning, error) - :param suppress_notification: If set to true, a notification will not displayed to the user, but it will - appear in the system logs. - :return: - """ - - message = JobMessage(job=self.job_execution, - progress=progress, - message=message, - level=level, - suppress_notification=suppress_notification) - message.save() - - def usr_warn(self, message, progress: Optional[float] = None, suppress_notification: bool = False): - """ - Creates a new warning message which will be shown on the user interface. - Progress can also be updated using this method. 
- :param message: A message to be displayed to the user - :param progress: Progress percentage in [0,1] interval - :param suppress_notification: If set to true, a notification will not displayed to the user, but it will - appear in the system logs. - :return: - """ - self.usr_log(message, progress, JOB_MESSAGE_LEVELS_MAP['warning'], suppress_notification) - - def usr_err(self, message, progress: Optional[float] = None, suppress_notification: bool = False): - """ - Creates a new error message which will be shown on the user interface. - Progress can also be updated using this method. - :param message: A message to be displayed to the user - :param progress: Progress percentage in [0,1] interval - :param suppress_notification: If set to true, a notification will not displayed to the user, but it will - appear in the system logs. - :return: - """ - self.usr_log(message, progress, JOB_MESSAGE_LEVELS_MAP['error'], suppress_notification) - - # - # main run method - # - def run(self): - pass - - -class YtsmScheduler(object): - - def __init__(self): - self._apscheduler = BackgroundScheduler() - - def initialize(self): - # set state of existing jobs as "interrupted" - JobExecution.objects\ - .filter(status=JOB_STATES_MAP['running'])\ - .update(status=JOB_STATES_MAP['interrupted']) - - self._configure_scheduler() - self._apscheduler.start() - - def _configure_scheduler(self): - logger = logging.getLogger('scheduler') - executors = { - 'default': { - 'type': 'threadpool', - 'max_workers': appconfig.concurrency - } - } - job_defaults = { - 'misfire_grace_time': 60 * 60 * 24 * 365 # 1 year - } - self._apscheduler.configure(logger=logger, executors=executors, job_defaults=job_defaults) - - def _run_job(self, job_class: Type[Job], user: Optional[User], args: Union[tuple, list]): - - job_execution = JobExecution(user=user, status=JOB_STATES_MAP['running']) - job_execution.save() - job_instance = job_class(job_execution, *args) - - # update description - job_execution.description = 
job_instance.get_description() - job_execution.save() - - try: - job_instance.run() - job_execution.status = JOB_STATES_MAP['finished'] - - except Exception as ex: - job_instance.log.critical("Job failed with exception: %s", traceback.format_exc()) - job_instance.usr_err(job_instance.name + " operation failed: " + str(ex)) - job_execution.status = JOB_STATES_MAP['failed'] - - finally: - job_execution.end_date = datetime.datetime.now(tz=pytz.UTC) - job_execution.save() - - def add_job(self, job_class: Type[Job], trigger: Union[str, BaseTrigger] = None, - args: Union[list, tuple] = None, - user: Optional[User] = None, - **kwargs): - if args is None: - args = [] - - return self._apscheduler.add_job(YtsmScheduler._run_job, trigger=trigger, args=[self, job_class, user, args], - **kwargs) - - -scheduler = YtsmScheduler() diff --git a/app/YtManagerApp/tasks.py b/app/YtManagerApp/tasks.py index d206ed1..2b407ed 100644 --- a/app/YtManagerApp/tasks.py +++ b/app/YtManagerApp/tasks.py @@ -1,31 +1,14 @@ # Create your tasks here from __future__ import absolute_import, unicode_literals -import datetime -from xml.etree import ElementTree - -import requests from celery import shared_task -from django.conf import settings -from django.db.models import Max, F +from django.db.models import F -from YtManagerApp.management.downloader import fetch_thumbnail -from YtManagerApp.management.jobs.download_video import DownloadVideoJob from YtManagerApp.models import * -from YtManagerApp.utils import youtube, first_non_null - -_ENABLE_UPDATE_STATS = False -__api = None log = logging.getLogger(__name__) - -def _init_api(): - global __api - - if not __api: - __api = youtube.YoutubeAPI.build_public() - +providers = {} @shared_task @@ -33,183 +16,4 @@ def synchronize_all(): log.info("Starting synchronize all") channels = Subscription.objects.all().order_by(F('last_synchronised').desc(nulls_first=True)) for channel in channels: - synchronize_channel.delay(channel.id) - - -@shared_task -def 
synchronize_channel(channel_id: int): - channel = Subscription.objects.get(id=channel_id) - log.info("Starting synchronize "+channel.name) - videos = Video.objects.filter(subscription=channel) - - # Remove the 'new' flag - videos.update(new=False) - - log.info("Starting check new videos " + channel.name) - if channel.last_synchronised is None: - check_all_videos(channel) - else: - check_rss_videos(channel) - channel.last_synchronised = datetime.datetime.now() - channel.save() - - fetch_missing_thumbnails_subscription.delay(channel.id) - - for video in videos: - synchronize_video(video) - - enabled = first_non_null(channel.auto_download, channel.user.preferences['auto_download']) - - if enabled: - global_limit = channel.user.preferences['download_global_limit'] - limit = first_non_null(channel.download_limit, channel.user.preferences['download_subscription_limit']) - order = first_non_null(channel.download_order, channel.user.preferences['download_order']) - order = VIDEO_ORDER_MAPPING[order] - - videos_to_download = Video.objects \ - .filter(subscription=channel, downloaded_path__isnull=True, watched=False) \ - .order_by(order) - - if global_limit > 0: - global_downloaded = Video.objects.filter(subscription__user=channel.user, downloaded_path__isnull=False).count() - allowed_count = max(global_limit - global_downloaded, 0) - videos_to_download = videos_to_download[0:allowed_count] - - if limit > 0: - sub_downloaded = Video.objects.filter(subscription=channel, downloaded_path__isnull=False).count() - allowed_count = max(limit - sub_downloaded, 0) - videos_to_download = videos_to_download[0:allowed_count] - - # enqueue download - for video in videos_to_download: - DownloadVideoJob.schedule(video) - - -def synchronize_video(video: Video): - if video.downloaded_path is not None or _ENABLE_UPDATE_STATS or video.duration == 0: - actual_synchronize_video.delay(video.id) - - if video.thumbnail.startswith("http"): - fetch_missing_thumbnails_video.delay(video.id) - - 
-@shared_task -def actual_synchronize_video(video_id: int): - global __api - _init_api() - - video = Video.objects.get(id=video_id) - log.info("Starting synchronize video "+video.video_id) - if video.downloaded_path is not None: - files = list(video.get_files()) - - # Try to find a valid video file - found_video = False - for file in files: - mime, _ = mimetypes.guess_type(file) - if mime is not None and mime.startswith("video"): - found_video = True - - # Video not found, we can safely assume that the video was deleted. - if not found_video: - # Clean up - for file in files: - os.unlink(file) - video.downloaded_path = None - - # Mark watched? - user = video.subscription.user - if user.preferences['mark_deleted_as_watched']: - video.watched = True - - video.save() - - fetch_missing_thumbnails_video.delay(video.id) - - if _ENABLE_UPDATE_STATS or video.duration == 0: - video_stats = __api.video(video.video_id, part='id,statistics,contentDetails') - - if video_stats is None: - return - - if video_stats.n_likes + video_stats.n_dislikes > 0: - video.rating = video_stats.n_likes / (video_stats.n_likes + video_stats.n_dislikes) - - video.views = video_stats.n_views - video.duration = video_stats.duration.total_seconds() - video.save() - - -def check_rss_videos(sub: Subscription): - found_existing_video = False - - rss_request = requests.get("https://www.youtube.com/feeds/videos.xml?channel_id="+sub.channel_id) - rss_request.raise_for_status() - - rss = ElementTree.fromstring(rss_request.content) - for entry in rss.findall("{http://www.w3.org/2005/Atom}entry"): - video_id = entry.find("{http://www.youtube.com/xml/schemas/2015}videoId").text - results = Video.objects.filter(video_id=video_id, subscription=sub) - if results.exists(): - found_existing_video = True - else: - video_title = entry.find("{http://www.w3.org/2005/Atom}title").text - - video = Video() - video.video_id = video_id - video.name = video_title - video.description = 
entry.find("{http://search.yahoo.com/mrss/}group").find("{http://search.yahoo.com/mrss/}description").text or "" - video.watched = False - video.new = True - video.downloaded_path = None - video.subscription = sub - video.playlist_index = 0 - video.publish_date = datetime.datetime.fromisoformat(entry.find("{http://www.w3.org/2005/Atom}published").text) - video.thumbnail = entry.find("{http://search.yahoo.com/mrss/}group").find("{http://search.yahoo.com/mrss/}thumbnail").get("url") - video.rating = entry.find("{http://search.yahoo.com/mrss/}group").find("{http://search.yahoo.com/mrss/}community").find("{http://search.yahoo.com/mrss/}starRating").get("average") - video.views = entry.find("{http://search.yahoo.com/mrss/}group").find("{http://search.yahoo.com/mrss/}community").find("{http://search.yahoo.com/mrss/}statistics").get("views") - video.save() - - synchronize_video(video) - - if not found_existing_video: - check_all_videos(sub) - - -def check_all_videos(sub: Subscription): - global __api - _init_api() - - playlist_items = __api.playlist_items(sub.playlist_id) - if sub.rewrite_playlist_indices: - playlist_items = sorted(playlist_items, key=lambda x: x.published_at) - else: - playlist_items = sorted(playlist_items, key=lambda x: x.position) - - for item in playlist_items: - results = Video.objects.filter(video_id=item.resource_video_id, subscription=sub) - - if not results.exists(): - # fix playlist index if necessary - if sub.rewrite_playlist_indices or Video.objects.filter(subscription=sub, playlist_index=item.position).exists(): - highest = Video.objects.filter(subscription=sub).aggregate(Max('playlist_index'))['playlist_index__max'] - item.position = 1 + (highest or -1) - - synchronize_video(Video.create(item, sub)) - - -@shared_task -def fetch_missing_thumbnails_subscription(obj_id: int): - obj = Subscription.objects.get(id=obj_id) - if obj.thumbnail.startswith("http"): - obj.thumbnail = fetch_thumbnail(obj.thumbnail, 'sub', obj.playlist_id, 
settings.THUMBNAIL_SIZE_SUBSCRIPTION) - obj.save() - - -@shared_task -def fetch_missing_thumbnails_video(obj_id: int): - obj = Video.objects.get(id=obj_id) - if obj.thumbnail.startswith("http"): - obj.thumbnail = fetch_thumbnail(obj.thumbnail, 'video', obj.video_id, settings.THUMBNAIL_SIZE_VIDEO) - obj.save() - + channel.get_provider().synchronise_channel(channel) diff --git a/app/YtManagerApp/templates/YtManagerApp/controls/subscription_update_modal.html b/app/YtManagerApp/templates/YtManagerApp/controls/subscription_update_modal.html index 6d61db4..9e4e222 100644 --- a/app/YtManagerApp/templates/YtManagerApp/controls/subscription_update_modal.html +++ b/app/YtManagerApp/templates/YtManagerApp/controls/subscription_update_modal.html @@ -26,7 +26,7 @@ function synchronizeChannelNow() { $.post("{% url 'ajax_action_sync_now' form.instance.pk %}", { csrfmiddlewaretoken: '{{ csrf_token }}' - }, function() { + }, () => { }); } diff --git a/app/YtManagerApp/templates/YtManagerApp/index_videos.html b/app/YtManagerApp/templates/YtManagerApp/index_videos.html index 0862101..00c5f63 100644 --- a/app/YtManagerApp/templates/YtManagerApp/index_videos.html +++ b/app/YtManagerApp/templates/YtManagerApp/index_videos.html @@ -3,7 +3,7 @@ {% if videos %}
- Watch All Now  + Watch All Now  Mark All as Watched  {{duration}} Total
diff --git a/app/YtManagerApp/templates/YtManagerApp/video.html b/app/YtManagerApp/templates/YtManagerApp/video.html index c2dbcf7..f1f0199 100644 --- a/app/YtManagerApp/templates/YtManagerApp/video.html +++ b/app/YtManagerApp/templates/YtManagerApp/video.html @@ -131,7 +131,6 @@ container.css("position", "static"); container.css("width", "100%"); container.css("height", "80vh"); - parent.css("position", "relative"); } }); diff --git a/app/YtManagerApp/templates/registration/logged_out.html b/app/YtManagerApp/templates/registration/logged_out.html index 82ad362..b071969 100644 --- a/app/YtManagerApp/templates/registration/logged_out.html +++ b/app/YtManagerApp/templates/registration/logged_out.html @@ -6,7 +6,7 @@ {% block scripts %}