Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Test] Improve download test #29944

Open
wants to merge 15 commits into
base: master
Choose a base branch
from
72 changes: 63 additions & 9 deletions test/test_download.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,12 @@
import io
dirkf marked this conversation as resolved.
Show resolved Hide resolved
import json
import socket
import re

import youtube_dl.YoutubeDL
from youtube_dl.compat import (
dirkf marked this conversation as resolved.
Show resolved Hide resolved
compat_filter as filter,
compat_map as map,
compat_http_client,
dirkf marked this conversation as resolved.
Show resolved Hide resolved
compat_urllib_error,
dirkf marked this conversation as resolved.
Show resolved Hide resolved
compat_HTTPError,
Expand All @@ -35,9 +38,11 @@
ExtractorError,
error_to_compat_str,
format_bytes,
std_headers,
UnavailableVideoError,
)
from youtube_dl.extractor import get_info_extractor
from youtube_dl.downloader.common import FileDownloader

RETRIES = 3

Expand All @@ -57,9 +62,10 @@ def process_info(self, info_dict):
return super(YoutubeDL, self).process_info(info_dict)


def _file_md5(fn):
def _file_md5(fn, length=None):
with open(fn, 'rb') as f:
return hashlib.md5(f.read()).hexdigest()
return hashlib.md5(
f.read(length) if length is not None else f.read()).hexdigest()
dirkf marked this conversation as resolved.
Show resolved Hide resolved


defs = gettestcases()
Expand All @@ -84,6 +90,13 @@ def strclass(cls):
strclass(self.__class__),
' [%s]' % add_ie if add_ie else '')

@classmethod
def addTest(cls, test_method, test_method_name, add_ie):
test_method.__name__ = str(test_method_name)
test_method.add_ie = add_ie
setattr(TestDownload, test_method.__name__, test_method)
dirkf marked this conversation as resolved.
Show resolved Hide resolved
del test_method

def setUp(self):
self.defs = defs

Expand Down Expand Up @@ -125,6 +138,17 @@ def print_skipping(reason):
params.setdefault('playlistend', test_case.get('playlist_mincount'))
params.setdefault('skip_download', True)

if 'user_agent' in params:
std_headers['User-Agent'] = params['user_agent']

if 'referer' in params:
std_headers['Referer'] = params['referer']

for h in params.get('headers', []):
h = h.split(':', 1)
if len(h) > 1:
std_headers[h[0]] = h[1]

ydl = YoutubeDL(params, auto_init=False)
ydl.add_default_info_extractors()
finished_hook_called = set()
Expand Down Expand Up @@ -228,7 +252,7 @@ def try_rm_tcs_files(tcs=None):
(tc_filename, format_bytes(expected_minsize),
format_bytes(got_fsize)))
if 'md5' in tc:
md5_for_file = _file_md5(tc_filename)
md5_for_file = _file_md5(tc_filename) if not params.get('test') else _file_md5(tc_filename, FileDownloader._TEST_FILE_SIZE)
dirkf marked this conversation as resolved.
Show resolved Hide resolved
self.assertEqual(tc['md5'], md5_for_file)
# Finally, check test cases' data again but this time against
# extracted data from info JSON file written during processing
Expand Down Expand Up @@ -258,12 +282,42 @@ def try_rm_tcs_files(tcs=None):
tname = 'test_%s_%d' % (test_case['name'], i)
i += 1
test_method = generator(test_case, tname)
test_method.__name__ = str(tname)
ie_list = test_case.get('add_ie')
test_method.add_ie = ie_list and ','.join(ie_list)
setattr(TestDownload, test_method.__name__, test_method)
del test_method

ie_list = ','.join(test_case.get('add_ie', []))
TestDownload.addTest(test_method, tname, ie_list)


def tests_for_ie(ie_key):
return filter(
lambda a: callable(getattr(TestDownload, a, None)),
filter(lambda a: re.match(r'test_%s(?:_\d+)?$' % ie_key, a),
dir(TestDownload)))


def gen_test_suite(ie_key):
def test_all(self):
print(self)
suite = unittest.TestSuite(
map(TestDownload, tests_for_ie(ie_key)))
result = self.defaultTestResult()
suite.run(result)
print('Errors: %d\t Failures: %d\tSkipped: %d' %
tuple(map(len, (result.errors, result.failures, result.skipped))))
print('Expected failures: %d\tUnexpected successes: %d' %
tuple(map(len, (result.expectedFailures, result.unexpectedSuccesses))))
return result

return test_all


for ie_key in set(
map(lambda a: a[5:],
filter(
lambda x: callable(getattr(TestDownload, x, None)),
filter(
lambda t: re.match(r"test_.+(?<!(?:_all|.._\d|._\d\d|_\d\d\d))$", t),
dir(TestDownload))))):
test_all = gen_test_suite(ie_key)
TestDownload.addTest(test_all, 'test_%s_all' % ie_key, 'Test all: %s' % ie_key)

if __name__ == '__main__':
unittest.main()