Skip to content

Commit

Permalink
feat: Merge
Browse files Browse the repository at this point in the history
  • Loading branch information
Flowtter committed Jul 16, 2022
2 parents 8f111fd + 4bf49e2 commit 47bda12
Show file tree
Hide file tree
Showing 3 changed files with 131 additions and 0 deletions.
3 changes: 3 additions & 0 deletions backend/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,6 @@ yapf==0.32.0
mypy==0.961
pylint==2.14.3
pylint-quotes==0.2.3

# Tests
pytube==12.1.0
54 changes: 54 additions & 0 deletions backend/src/crispy/utils/ffmpeg_utils.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import os
import random
import string
from typing import Optional, Any, List, Tuple

import ffmpeg
Expand Down Expand Up @@ -93,4 +95,56 @@ def segment_video(video_path: str, save_path: str,
to=f"{end}")
.overwrite_output()
.run(quiet=False)
) # yaPf: disable


def find_available_path(video_path: str) -> str:
"""
Find available path to store the scaled video temporarily.
"""
dirname, basename = os.path.split(video_path)
h = str(hash(basename)) + ".mp4"
while (os.path.exists(os.path.join(dirname, h))):
h = random.choice(string.ascii_letters) + h

return os.path.join(dirname, h)


def scale_video(video_path: str) -> None:
"""
Scale (up or down) a video.
"""
if os.path.exists(video_path):
save_path = find_available_path(video_path)
(
ffmpeg
.input(video_path)
.filter("scale", w=1920, h=1080)
.output(save_path, start_number=0)
.overwrite_output()
.run()
) # yapf: disable

os.remove(video_path)
os.rename(save_path, video_path)
# check if image has to be upscaled or downscaled ?
else:
raise FileNotFoundError(f"{video_path} not found")


def create_new_path(video_path: str) -> str:
"""
Create new path based on the original one.
"""
drive, tail = os.path.split(video_path)
name, ext = os.path.splitext(tail)
nb = 1
cur_name = name + "_" + str(nb)
while os.path.exists(os.path.join(drive, cur_name + ext)):
nb = nb + 1
cur_name = name + "_" + str(nb)

tail = cur_name + ext
res = os.path.join(drive, cur_name + ext)

return res
74 changes: 74 additions & 0 deletions backend/src/crispy/utils/test_ffmpeg_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
import os
import cv2
from pytube import YouTube
from ffmpeg_utils import scale_video


def test_basic() -> None:
os.mkdir("./test_mp4")
yt = YouTube("https://www.youtube.com/watch?v=6A-hTKYBkC4")
yt.streams.order_by("resolution").desc().first().download(
filename="./test_mp4/test_basic.mp4")
vid = cv2.VideoCapture("./test_mp4/test_basic.mp4")
width = vid.get(cv2.CAP_PROP_FRAME_WIDTH)
height = vid.get(cv2.CAP_PROP_FRAME_HEIGHT)

os.remove("./test_mp4/test_basic.mp4")
assert width == 1920 and height == 1080


def test_upscale() -> None:
yt = YouTube("https://www.youtube.com/watch?v=6A-hTKYBkC4")
yt.streams.order_by("resolution").asc().first().download(
filename="./test_mp4/test_upscale.mp4")

scale_video("./test_mp4/test_upscale.mp4")
vid = cv2.VideoCapture("./test_mp4/test_upscale.mp4")
width = vid.get(cv2.CAP_PROP_FRAME_WIDTH)
height = vid.get(cv2.CAP_PROP_FRAME_HEIGHT)

os.remove("./test_mp4/test_upscale.mp4")
assert width == 1920 and height == 1080


def test_downscale() -> None:
yt = YouTube("https://www.youtube.com/watch?v=wZI9is9Ix90")
yt.streams.order_by("resolution").desc().first().download(
filename="./test_mp4/test_downscale.mp4")

scale_video("./test_mp4/test_downscale.mp4")
vid = cv2.VideoCapture("./test_mp4/test_downscale.mp4")
width = vid.get(cv2.CAP_PROP_FRAME_WIDTH)
height = vid.get(cv2.CAP_PROP_FRAME_HEIGHT)

os.remove("./test_mp4/test_downscale.mp4")
os.rmdir("./test_mp4")
assert width == 1920 and height == 1080


def abort(video_path: str) -> None:
top = os.path.split(video_path)[0]
print(top)
for root, dirs, files in os.walk(top, topdown=False):
for name in files:
os.remove(os.path.join(root, name))
for name in dirs:
os.rmdir(os.path.join(root, name))
os.rmdir(top)


def split_check(video_path: str, frames: int) -> bool:
drive = os.path.split(video_path)[0]
for f in os.listdir():
path = os.path.join(drive, f)
if os.path.isfile(os.path.join(drive, f)):
vid = cv2.VideoCapture(path)
fps = vid.get(cv2.CAP_PROP_FPS)
frame_count = int(vid.get(cv2.CAP_PROP_FRAME_COUNT))
duration = frame_count // fps

print(duration)
if duration != frames:
abort(video_path)
return False
return True

0 comments on commit 47bda12

Please sign in to comment.