Skip to content

Commit

Permalink
Merge pull request #5 from Flowtter/ffmpeg
Browse files Browse the repository at this point in the history
Ffmpeg
  • Loading branch information
Flowtter authored Jul 12, 2022
2 parents f7b953f + f5d6360 commit 4bf49e2
Show file tree
Hide file tree
Showing 3 changed files with 178 additions and 0 deletions.
3 changes: 3 additions & 0 deletions backend/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,6 @@ ffmpeg-python==0.2.0
yapf==0.32.0
mypy==0.961
pylint==2.14.3

# Tests
pytube==12.1.0
74 changes: 74 additions & 0 deletions backend/src/crispy/utils/video/ffmpeg_utils.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import os
import random
import string
from typing import Optional, Any

import ffmpeg
Expand Down Expand Up @@ -71,3 +73,75 @@ def extract_images(video_path: str, save_path: str) -> None:
final.paste(enhanced, (0, 40))

final.save(im_path)


def find_available_path(video_path: str) -> str:
"""
Find available path to store the scaled video temporarily.
"""
dirname, basename = os.path.split(video_path)
h = str(hash(basename)) + ".mp4"
while (os.path.exists(os.path.join(dirname, h))):
h = random.choice(string.ascii_letters) + h

return os.path.join(dirname, h)


def scale_video(video_path: str) -> None:
"""
Scale (up or down) a video.
"""
if os.path.exists(video_path):
save_path = find_available_path(video_path)
(
ffmpeg
.input(video_path)
.filter('scale', w=1920, h=1080)
.output(save_path, start_number=0)
.overwrite_output()
.run()
) # yapf: disable

os.remove(video_path)
os.rename(save_path, video_path)
# check if image has to be upscaled or downscaled ?


def create_new_path(video_path: str) -> str:
"""
Create new path based on the original one.
"""
drive, tail = os.path.split(video_path)
name, ext = os.path.splitext(tail)
nb = 1
cur_name = name + "_" + str(nb)
while os.path.exists(os.path.join(drive, cur_name + ext)):
nb = nb + 1
cur_name = name + "_" + str(nb)

tail = cur_name + ext
res = os.path.join(drive, cur_name + ext)

return res


def split_video_once(video_path: str, split: tuple) -> None:
"""
Split a video between 2 timestamp
"""
save_path = create_new_path(video_path)
if split[1] - split[0] > 0:
(
ffmpeg
.input(video_path)
.trim(start_frame=split[0], end_frame=split[1])
.output(save_path)
.overwrite_output()
.run(quiet=True)
) # yapf: disable


def _split_video(video_path: str, splits: list) -> None:
if os.path.exists(video_path):
for split in splits:
split_video_once(video_path, split)
101 changes: 101 additions & 0 deletions backend/src/crispy/utils/video/test_ffmpeg_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
import os
import cv2
from pytube import YouTube
from ffmpeg_utils import scale_video, _split_video


def test_basic() -> None:
os.mkdir('./test_mp4')
yt = YouTube('https://www.youtube.com/watch?v=6A-hTKYBkC4')
yt.streams.order_by('resolution').desc().first().download(
filename='./test_mp4/test_basic.mp4')
vid = cv2.VideoCapture('./test_mp4/test_basic.mp4')
width = vid.get(cv2.CAP_PROP_FRAME_WIDTH)
height = vid.get(cv2.CAP_PROP_FRAME_HEIGHT)

os.remove('./test_mp4/test_basic.mp4')
assert width == 1920 and height == 1080


def test_upscale() -> None:
yt = YouTube('https://www.youtube.com/watch?v=6A-hTKYBkC4')
yt.streams.order_by('resolution').asc().first().download(
filename='./test_mp4/test_upscale.mp4')

scale_video('./test_mp4/test_upscale.mp4')
vid = cv2.VideoCapture('./test_mp4/test_upscale.mp4')
width = vid.get(cv2.CAP_PROP_FRAME_WIDTH)
height = vid.get(cv2.CAP_PROP_FRAME_HEIGHT)

os.remove('./test_mp4/test_upscale.mp4')
assert width == 1920 and height == 1080


def test_downscale() -> None:
yt = YouTube('https://www.youtube.com/watch?v=wZI9is9Ix90')
yt.streams.order_by('resolution').desc().first().download(
filename='./test_mp4/test_downscale.mp4')

scale_video('./test_mp4/test_downscale.mp4')
vid = cv2.VideoCapture('./test_mp4/test_downscale.mp4')
width = vid.get(cv2.CAP_PROP_FRAME_WIDTH)
height = vid.get(cv2.CAP_PROP_FRAME_HEIGHT)

os.remove('./test_mp4/test_downscale.mp4')
os.rmdir('./test_mp4')
assert width == 1920 and height == 1080


def abort(video_path: str) -> None:
top = os.path.split(video_path)[0]
print(top)
for root, dirs, files in os.walk(top, topdown=False):
for name in files:
os.remove(os.path.join(root, name))
for name in dirs:
os.rmdir(os.path.join(root, name))
os.rmdir(top)


def split_check(video_path: str, frames: int) -> bool:
drive = os.path.split(video_path)[0]
for f in os.listdir():
path = os.path.join(drive, f)
if os.path.isfile(os.path.join(drive, f)):
vid = cv2.VideoCapture(path)
fps = vid.get(cv2.CAP_PROP_FPS)
frame_count = int(vid.get(cv2.CAP_PROP_FRAME_COUNT))
duration = frame_count // fps

print(duration)
if duration != frames:
abort(video_path)
return False
return True


def cut_1_100(video_path: str) -> bool:
_split_video(video_path, [(0, 100)])
return split_check(video_path, 100)


def cut_10_100(video_path: str) -> bool:
_split_video(video_path, [(0, 100), (100, 200), (200, 300), (300, 400),
(400, 500), (500, 600), (600, 700), (700, 800),
(800, 900), (900, 1000)])
return split_check(video_path, 100)


def test_split() -> None:
os.mkdir('test_split')
yt = YouTube('https://www.youtube.com/watch?v=6A-hTKYBkC4')
yt.streams.order_by('resolution').desc().first().download(
filename='./test_split/test_basic.mp4')

assert cut_1_100('./test_split/test_basic.mp4')

yt.streams.order_by('resolution').desc().first().download(
filename='./test_split/test_basic.mp4')
assert cut_10_100('./test_split/test_basic.mp4')

abort('./test_split/test_basic.mp4')

0 comments on commit 4bf49e2

Please sign in to comment.