-
Notifications
You must be signed in to change notification settings - Fork 7
/
Copy pathutils.py
162 lines (125 loc) · 4.18 KB
/
utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
import re
import sys
import requests
from zipfile import ZipFile
from pathlib import Path
import apiaudio
from moviepy.editor import VideoFileClip
from moviepy.editor import AudioFileClip
from moviepy.editor import CompositeAudioClip
from config import AUDIO_API_KEY, ASSEMBLY_AUTH_TOKEN
### VIDEO2SUBTITLE SECTION
def read_file(filename, chunk_size=5242880):
with open(filename, "rb") as _file:
while True:
data = _file.read(chunk_size)
if not data:
break
yield data
def upload_file(filename, auth=ASSEMBLY_AUTH_TOKEN):
if auth is None:
raise ValueError("A token is required")
headers = {"authorization": auth}
if isinstance(filename, str):
sys.stdout.flush()
print(f"{filename} is passed")
response = requests.post(
"https://api.assemblyai.com/v2/upload",
headers=headers,
data=read_file(filename),
)
else:
sys.stdout.flush()
print("Byte stream")
response = requests.post(
"https://api.assemblyai.com/v2/upload", headers=headers, data=filename
)
print(response.json())
video_url = response.json()["upload_url"]
return video_url
def send_to_assembly(filename, auth=ASSEMBLY_AUTH_TOKEN):
if auth is None:
raise ValueError("A token is required")
transcript_endpoint = "https://api.assemblyai.com/v2/transcript"
headers = {"authorization": auth, "content-type": "application/json"}
transcript_request = {"audio_url": upload_file(filename, auth=auth)}
transcript_response = requests.post(
transcript_endpoint, json=transcript_request, headers=headers
)
transcript_id = transcript_response.json()["id"]
sub_endpoint = transcript_endpoint + "/" + transcript_id
return headers, sub_endpoint
### APIAUDIO FOR TEXT2SPEECH SECTION
def is_time_stamp(l):
if l[:2].isnumeric() and l[2] == ":":
return True
return False
def has_letters(line):
if re.search("[a-zA-Z]", line):
return True
return False
def has_no_text(line):
l = line.strip()
if not len(l):
return True
if l.isnumeric():
return True
if is_time_stamp(l):
return True
if l[0] == "(" and l[-1] == ")":
return True
if not has_letters(line):
return True
return False
def is_lowercase_letter_or_comma(letter):
if letter.isalpha() and letter.lower() == letter:
return True
if letter == ",":
return True
return False
def clean_up(lines):
new_lines = []
for line in lines[1:]:
if has_no_text(line):
continue
elif len(new_lines) and is_lowercase_letter_or_comma(line[0]):
new_lines[-1] = new_lines[-1].strip() + " " + line
else:
new_lines.append(line)
return new_lines
def dubbing(p_name, subtitle, video_name, speed=105, voice="liam", auth=AUDIO_API_KEY):
if auth is None:
raise ValueError("A token is required")
apiaudio.api_key = auth
script = apiaudio.Script.create(
scriptText=(
f"""
<<soundSegment::intro>><<sectionName::first>> {clean_up(subtitle)[0]}
"""
),
scriptName=f"{Path(p_name).stem.split('.')[0]}",
)
print(clean_up(subtitle))
r = apiaudio.Speech.create(
scriptId=script.get("scriptId"), voice=voice, speed=speed
)
r = apiaudio.Mastering().create(
scriptId=script.get("scriptId"),
)
audio_file = apiaudio.Mastering.download(scriptId=script.get("scriptId"))
print(audio_file)
### EDIT VIDEO ADDING THE NEW DUBBED FILE
videoclip = VideoFileClip(f"{video_name}")
new_clip = videoclip.without_audio()
audioclip = AudioFileClip(f"{audio_file}")
new_audioclip = CompositeAudioClip([audioclip])
new_clip.audio = new_audioclip
final_video = f"Dubbed_{Path(video_name).stem.split('.')[0]}.mp4"
new_clip.write_videofile(final_video)
return final_video, audio_file
def create_zip(p_name, srt_file, video_file, audio_file):
final_zip = ZipFile(f"{p_name}.zip", "w")
final_zip.write(srt_file)
final_zip.write(video_file)
final_zip.write(audio_file)
final_zip.close()