diff --git a/backend/requirements.txt b/backend/requirements.txt index 1788273..a5fc7ba 100644 --- a/backend/requirements.txt +++ b/backend/requirements.txt @@ -18,3 +18,6 @@ yapf==0.32.0 mypy==0.961 pylint==2.14.3 pylint-quotes==0.2.3 + +# Tests +pytube==12.1.0 diff --git a/backend/src/crispy/utils/ffmpeg_utils.py b/backend/src/crispy/utils/ffmpeg_utils.py index cbf686c..68b2426 100644 --- a/backend/src/crispy/utils/ffmpeg_utils.py +++ b/backend/src/crispy/utils/ffmpeg_utils.py @@ -1,4 +1,6 @@ import os +import random +import string from typing import Optional, Any, List, Tuple import ffmpeg @@ -93,4 +95,56 @@ def segment_video(video_path: str, save_path: str, to=f"{end}") .overwrite_output() .run(quiet=False) + ) # yaPf: disable + + +def find_available_path(video_path: str) -> str: + """ + Find available path to store the scaled video temporarily. + """ + dirname, basename = os.path.split(video_path) + h = str(hash(basename)) + ".mp4" + while (os.path.exists(os.path.join(dirname, h))): + h = random.choice(string.ascii_letters) + h + + return os.path.join(dirname, h) + + +def scale_video(video_path: str) -> None: + """ + Scale (up or down) a video. + """ + if os.path.exists(video_path): + save_path = find_available_path(video_path) + ( + ffmpeg + .input(video_path) + .filter("scale", w=1920, h=1080) + .output(save_path, start_number=0) + .overwrite_output() + .run() ) # yapf: disable + + os.remove(video_path) + os.rename(save_path, video_path) + # check if image has to be upscaled or downscaled ? + else: + raise FileNotFoundError(f"{video_path} not found") + + +def create_new_path(video_path: str) -> str: + """ + Create new path based on the original one. + """ + drive, tail = os.path.split(video_path) + name, ext = os.path.splitext(tail) + nb = 1 + cur_name = name + "_" + str(nb) + while os.path.exists(os.path.join(drive, cur_name + ext)): + nb = nb + 1 + cur_name = name + "_" + str(nb) + + tail = cur_name + ext + res = os.path.join(drive, cur_name + ext) + + return res diff --git a/backend/src/crispy/utils/test_ffmpeg_utils.py b/backend/src/crispy/utils/test_ffmpeg_utils.py new file mode 100644 index 0000000..956c333 --- /dev/null +++ b/backend/src/crispy/utils/test_ffmpeg_utils.py @@ -0,0 +1,74 @@ +import os +import cv2 +from pytube import YouTube +from ffmpeg_utils import scale_video + + +def test_basic() -> None: + os.mkdir("./test_mp4") + yt = YouTube("https://www.youtube.com/watch?v=6A-hTKYBkC4") + yt.streams.order_by("resolution").desc().first().download( + filename="./test_mp4/test_basic.mp4") + vid = cv2.VideoCapture("./test_mp4/test_basic.mp4") + width = vid.get(cv2.CAP_PROP_FRAME_WIDTH) + height = vid.get(cv2.CAP_PROP_FRAME_HEIGHT) + + os.remove("./test_mp4/test_basic.mp4") + assert width == 1920 and height == 1080 + + +def test_upscale() -> None: + yt = YouTube("https://www.youtube.com/watch?v=6A-hTKYBkC4") + yt.streams.order_by("resolution").asc().first().download( + filename="./test_mp4/test_upscale.mp4") + + scale_video("./test_mp4/test_upscale.mp4") + vid = cv2.VideoCapture("./test_mp4/test_upscale.mp4") + width = vid.get(cv2.CAP_PROP_FRAME_WIDTH) + height = vid.get(cv2.CAP_PROP_FRAME_HEIGHT) + + os.remove("./test_mp4/test_upscale.mp4") + assert width == 1920 and height == 1080 + + +def test_downscale() -> None: + yt = YouTube("https://www.youtube.com/watch?v=wZI9is9Ix90") + yt.streams.order_by("resolution").desc().first().download( + filename="./test_mp4/test_downscale.mp4") + + scale_video("./test_mp4/test_downscale.mp4") + vid = cv2.VideoCapture("./test_mp4/test_downscale.mp4") + width = vid.get(cv2.CAP_PROP_FRAME_WIDTH) + height = vid.get(cv2.CAP_PROP_FRAME_HEIGHT) + + os.remove("./test_mp4/test_downscale.mp4") + os.rmdir("./test_mp4") + assert width == 1920 and height == 1080 + + +def abort(video_path: str) -> None: + top = os.path.split(video_path)[0] + print(top) + for root, dirs, files in os.walk(top, topdown=False): + for name in files: + os.remove(os.path.join(root, name)) + for name in dirs: + os.rmdir(os.path.join(root, name)) + os.rmdir(top) + + +def split_check(video_path: str, frames: int) -> bool: + drive = os.path.split(video_path)[0] + for f in os.listdir(): + path = os.path.join(drive, f) + if os.path.isfile(os.path.join(drive, f)): + vid = cv2.VideoCapture(path) + fps = vid.get(cv2.CAP_PROP_FPS) + frame_count = int(vid.get(cv2.CAP_PROP_FRAME_COUNT)) + duration = frame_count // fps + + print(duration) + if duration != frames: + abort(video_path) + return False + return True