From d840c40be30efc4c2d2145d861e551dfef5b4261 Mon Sep 17 00:00:00 2001 From: McCloudS <64094529+McCloudS@users.noreply.github.com> Date: Wed, 4 Dec 2024 13:05:44 -0700 Subject: [PATCH] Language code improvements (#150) * Language code improvements (#147) * improved language code handling * expanded skipping behaviour * remove unused code * Added an option to detect language with whisper before choosing to skip it --------- Co-authored-by: muisje <27768559+muisje@users.noreply.github.com> * Default LanguageCode inputs to from_string and fix detect_langauge * Skip detect-language if we have forced a detected language * Typecast user inputs to ints as appropriate. * Update subgen.py * Update subgen.py * Update subgen.py --------- Co-authored-by: muisje <27768559+muisje@users.noreply.github.com> --- language_code.py | 14 +- subgen.py | 391 ++++++++++++++++++++++++++++++++++++++--------- 2 files changed, 329 insertions(+), 76 deletions(-) diff --git a/language_code.py b/language_code.py index 7c25e8d..f5876fa 100644 --- a/language_code.py +++ b/language_code.py @@ -102,7 +102,8 @@ class LanguageCode(Enum): YORUBA = ("yo", "yor", "yor", "Yoruba", "Yorùbá") CHINESE = ("zh", "zho", "chi", "Chinese", "中文") CANTONESE = ("yue", "yue", "yue", "Cantonese", "粵語") - NONE = (None, None, None, None, None) # For unknown languages or no language + NONE = (None, None, None, None, None) # For no language + # und for Undetermined aka unknown language https://www.loc.gov/standards/iso639-2/faq.html#25 def __init__(self, iso_639_1, iso_639_2_t, iso_639_2_b, name_en, name_native): self.iso_639_1 = iso_639_1 @@ -155,6 +156,11 @@ def from_string(value: str): return lang return LanguageCode.NONE + # is valid language + @staticmethod + def is_valid_language(language: str): + return LanguageCode.from_string(language) is not LanguageCode.NONE + def to_iso_639_1(self): return self.iso_639_1 @@ -180,10 +186,10 @@ def __eq__(self, other): Explicitly handle comparison to None. """ if other is None: - # If compared to None, return False - # print(other) - # print(self) + # If compared to None, return False unless self is None return self.iso_639_1 is None + if isinstance(other, str): # Allow comparison with a string + return self.value == LanguageCode.from_string(other) if isinstance(other, LanguageCode): # Normal comparison for LanguageCode instances return self.iso_639_1 == other.iso_639_1 diff --git a/subgen.py b/subgen.py index 2da1a27..41dba39 100644 --- a/subgen.py +++ b/subgen.py @@ -12,7 +12,7 @@ import logging import gc import random -from typing import Union, Any +from typing import Union, Any, Optional from fastapi import FastAPI, File, UploadFile, Query, Header, Body, Form, Request from fastapi.responses import StreamingResponse import numpy as np @@ -26,9 +26,9 @@ from watchdog.observers.polling import PollingObserver as Observer from watchdog.events import FileSystemEventHandler import faster_whisper +from io import BytesIO import io - def get_key_by_value(d, value): reverse_dict = {v: k for k, v in d.items()} return reverse_dict.get(value) @@ -64,17 +64,33 @@ def convert_to_bool(in_bool): reload_script_on_change = convert_to_bool(os.getenv('RELOAD_SCRIPT_ON_CHANGE', False)) lrc_for_audio_files = convert_to_bool(os.getenv('LRC_FOR_AUDIO_FILES', True)) custom_regroup = os.getenv('CUSTOM_REGROUP', 'cm_sl=84_sl=42++++++1') -detect_language_length = os.getenv('DETECT_LANGUAGE_LENGTH', 30) +detect_language_length = int(os.getenv('DETECT_LANGUAGE_LENGTH', 30)) +detect_language_offset = int(os.getenv('DETECT_LANGUAGE_START_OFFSET', 0)) skipifexternalsub = convert_to_bool(os.getenv('SKIPIFEXTERNALSUB', False)) skip_if_to_transcribe_sub_already_exist = convert_to_bool(os.getenv('SKIP_IF_TO_TRANSCRIBE_SUB_ALREADY_EXIST', True)) -skipifinternalsublang = LanguageCode.from_iso_639_2(os.getenv('SKIPIFINTERNALSUBLANG', '')) -skip_lang_codes_list = [LanguageCode.from_iso_639_2(code) for code in os.getenv("SKIP_LANG_CODES", "").split("|")] -force_detected_language_to = LanguageCode.from_iso_639_2(os.getenv('FORCE_DETECTED_LANGUAGE_TO', '')) -preferred_audio_language = LanguageCode.from_iso_639_2(os.getenv('PREFERRED_AUDIO_LANGUAGE', 'eng')) -skip_if_audio_track_is_in_list = [LanguageCode.from_iso_639_2(code) for code in os.getenv('SKIP_IF_AUDIO_TRACK_IS', '').split("|")] -# Maybe just have skip_if_audio_track_is_in_list and skip_lang_codes_list and remove skipifinternalsublang -# TODO option which iso code to write in the subtitle file1 +skipifinternalsublang = LanguageCode.from_string(os.getenv('SKIPIFINTERNALSUBLANG', '')) +skip_lang_codes_list = ( + [LanguageCode.from_string(code) for code in os.getenv("SKIP_LANG_CODES", "").split("|")] + if os.getenv('SKIP_LANG_CODES') + else [] +) +force_detected_language_to = LanguageCode.from_string(os.getenv('FORCE_DETECTED_LANGUAGE_TO', '')) +preferred_audio_languages = ( + [LanguageCode.from_string(code) for code in os.getenv('PREFERRED_AUDIO_LANGUAGES', 'eng').split("|")] + if os.getenv('PREFERRED_AUDIO_LANGUAGES') + else [] +) # in order of preferrence +limit_to_preferred_audio_languages = convert_to_bool(os.getenv('LIMIT_TO_PREFERRED_AUDIO_LANGUAGE', False)) #TODO: add support for this +skip_if_audio_track_is_in_list = ( + [LanguageCode.from_string(code) for code in os.getenv('SKIP_IF_AUDIO_TRACK_IS', '').split("|")] + if os.getenv('SKIP_IF_AUDIO_TRACK_IS') + else [] +) subtitle_language_naming_type = os.getenv('SUBTITLE_LANGUAGE_NAMING_TYPE', 'ISO_639_2_B') +only_skip_if_subgen_subtitle = convert_to_bool(os.getenv('ONLY_SKIP_IF_SUBGEN_SUBTITLE', False)) +skip_unknown_language = convert_to_bool(os.getenv('SKIP_UNKNOWN_LANGUAGE', False)) +skip_if_language_is_not_set_but_subtitles_exist = convert_to_bool(os.getenv('SKIP_IF_LANGUAGE_IS_NOT_SET_BUT_SUBTITLES_EXIST', False)) +should_whiser_detect_audio_language = convert_to_bool(os.getenv('SHOULD_WHISPER_DETECT_AUDIO_LANGUAGE', False)) try: kwargs = ast.literal_eval(os.getenv('SUBGEN_KWARGS', '{}') or '{}') @@ -86,6 +102,19 @@ def convert_to_bool(in_bool): transcribe_device = "cuda" +VIDEO_EXTENSIONS = ( + ".mp4", ".mkv", ".avi", ".mov", ".wmv", ".flv", ".webm", ".mpg", ".mpeg", + ".3gp", ".ogv", ".vob", ".rm", ".rmvb", ".ts", ".m4v", ".f4v", ".svq3", + ".asf", ".m2ts", ".divx", ".xvid" +) + +AUDIO_EXTENSIONS = ( + ".mp3", ".wav", ".aac", ".flac", ".ogg", ".wma", ".alac", ".m4a", ".opus", + ".aiff", ".aif", ".pcm", ".ra", ".ram", ".mid", ".midi", ".ape", ".wv", + ".amr", ".vox", ".tak", ".spx", '.m4b' +) + + app = FastAPI() model = None @@ -99,9 +128,13 @@ def convert_to_bool(in_bool): def transcription_worker(): while True: task = task_queue.get() + + if "type" in task and task["type"] == "detect_language": + detect_language_task(task['path']) if 'Bazarr-' in task['path']: logging.info(f"Task {task['path']} is being handled by ASR.") else: + logging.info(f"Task {task['path']} is being handled by Subgen.") gen_subtitles(task['path'], task['transcribe_or_translate'], task['force_language']) task_queue.task_done() # show queue @@ -189,15 +222,6 @@ def appendLine(result): # Append the new segment to the result's segments result.segments.append(newSegment) -def has_image_extension(file_path): - valid_extensions = ['.rgb', '.gif', '.pbm', '.pgm', '.ppm', '.tiff', '.rast', '.xbm', '.jpg', '.jpeg', '.bmp', '.png', '.webp', '.exr', '.bif'] # taken from the extensions detected by the imghdr module & added Emby's '.bif' files - - if os.path.exists(file_path): - file_extension = os.path.splitext(file_path)[1].lower() - return file_extension in valid_extensions - else: - return True # return a value that causes the file to be skipped. - @app.get("/plex") @app.get("/webhook") @app.get("/jellyfin") @@ -345,7 +369,7 @@ async def asr( random_name = ''.join(random.choices("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ1234567890", k=6)) if force_detected_language_to: - language = force_detected_language_to + language = force_detected_language_to.to_iso_639_1() logging.info(f"ENV FORCE_DETECTED_LANGUAGE_TO is set: Forcing detected language to {force_detected_language_to}") start_time = time.time() @@ -356,12 +380,14 @@ async def asr( args = {} args['progress_callback'] = progress + + file_content = audio_file.file.read() - if not encode: - args['audio'] = np.frombuffer(audio_file.file.read(), np.int16).flatten().astype(np.float32) / 32768.0 - args['input_sr'] = 16000 + if encode: + args['audio'] = file_content else: - args['audio'] = audio_file.file.read() + args['audio'] = np.frombuffer(file_content, np.int16).flatten().astype(np.float32) / 32768.0 + args['input_sr'] = 16000 if custom_regroup: args['regroup'] = custom_regroup @@ -402,19 +428,35 @@ async def asr( @app.post("/detect-language") async def detect_language( audio_file: UploadFile = File(...), - #encode: bool = Query(default=True, description="Encode audio first through ffmpeg") # This is always false from Bazarr - detect_lang_length: int = Query(default=30, description="Detect language on the first X seconds of the file") + encode: bool = Query(default=True, description="Encode audio first through ffmpeg"), # This is always false from Bazarr + detect_lang_length: int = Query(default=detect_language_length, description="Detect language on X seconds of the file"), + detect_lang_offset: int = Query(default=detect_language_offset, description="Start Detect language X seconds into the file") ): + + if force_detected_language_to: + logging.info(f"language is: {force_detected_language_to.to_name()}") + logging.debug(f"Skipping detect language, we have forced it as {force_detected_language_to.to_name()}") + return { + "detected_language": force_detected_language_to.to_name(), + "language_code": force_detected_language_to.to_iso_639_1() + } + + global detect_language_length, detect_language_offset detected_language = LanguageCode.NONE language_code = 'und' if force_detected_language_to: logging.info(f"ENV FORCE_DETECTED_LANGUAGE_TO is set: Forcing detected language to {force_detected_language_to}\n Returning without detection") return {"detected_language": force_detected_language_to.to_name(), "language_code": force_detected_language_to.to_iso_639_1()} - if int(detect_lang_length) != 30: - global detect_language_length + + # Log custom detection time settings if modified + if detect_lang_length != detect_language_length: + logging.info(f"Detecting language on the first {detect_lang_length} seconds of the audio.") detect_language_length = detect_lang_length - if int(detect_language_length) != 30: - logging.info(f"Detect language is set to detect on the first {detect_language_length} seconds of the audio.") + + if detect_lang_offset != detect_language_offset: + logging.info(f"Offsetting language detection by {detect_language_offset} seconds.") + detect_language_offset = detect_lang_offset + #audio_file = extract_audio_segment_to_memory(audio_file, detect_language_offset, detect_language_length) try: start_model() random_name = ''.join(random.choices("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ1234567890", k=6)) @@ -423,10 +465,17 @@ async def detect_language( task_queue.put(task_id) args = {} #sample_rate = next(stream.rate for stream in av.open(audio_file.file).streams if stream.type == 'audio') + #logging.info(f"Sample rate is: {sample_rate}") audio_file.file.seek(0) args['progress_callback'] = progress - args['input_sr'] = 16000 - args['audio'] = whisper.pad_or_trim(np.frombuffer(audio_file.file.read(), np.int16).flatten().astype(np.float32) / 32768.0, args['input_sr'] * int(detect_language_length)) + + if encode: + args['audio'] = extract_audio_segment_to_memory(audio_file, detect_language_offset, detect_language_length).read() + args['input_sr'] = 16000 + else: + #args['audio'] = whisper.pad_or_trim(np.frombuffer(audio_file.file.read(), np.int16).flatten().astype(np.float32) / 32768.0, args['input_sr'] * int(detect_language_length)) + args['audio'] = await get_audio_chunk(audio_file, detect_lang_offset, detect_lang_length) + args['input_sr'] = 16000 args.update(kwargs) detected_language = LanguageCode.from_name(model.transcribe_stable(**args).language) @@ -439,12 +488,128 @@ async def detect_language( logging.info(f"Error processing or transcribing Bazarr {audio_file.filename}: {e}") finally: - await audio_file.close() + #await audio_file.close() task_queue.task_done() delete_model() return {"detected_language": detected_language.to_name(), "language_code": language_code} +async def get_audio_chunk(audio_file, offset=detect_language_offset, length=detect_language_length, sample_rate=16000, audio_format=np.int16): + """ + Extract a chunk of audio from a file, starting at the given offset and of the given length. + + :param audio_file: The audio file (UploadFile or file-like object). + :param offset: The offset in seconds to start the extraction. + :param length: The length in seconds for the chunk to be extracted. + :param sample_rate: The sample rate of the audio (default 16000). + :param audio_format: The audio format to interpret (default int16, 2 bytes per sample). + + :return: A numpy array containing the extracted audio chunk. + """ + + # Number of bytes per sample (for int16, 2 bytes per sample) + bytes_per_sample = np.dtype(audio_format).itemsize + + # Calculate the start byte based on offset and sample rate + start_byte = offset * sample_rate * bytes_per_sample + + # Calculate the length in bytes based on the length in seconds + length_in_bytes = length * sample_rate * bytes_per_sample + + # Seek to the start position (this assumes the audio_file is a file-like object) + await audio_file.seek(start_byte) + + # Read the required chunk of audio (length_in_bytes) + chunk = await audio_file.read(length_in_bytes) + + # Convert the chunk into a numpy array (normalized to float32) + audio_data = np.frombuffer(chunk, dtype=audio_format).flatten().astype(np.float32) / 32768.0 + + return audio_data + +def detect_language_task(path): + detected_language = LanguageCode.NONE + language_code = 'und' + global detect_language_length + + logger.info(f"Detecting language of file: {path} on the first {detect_language_length} seconds of the file") + + try: + start_model() + + audio_segment = extract_audio_segment_to_memory(path, detect_language_offset, int(detect_language_length)).read() + + + detected_language = LanguageCode.from_name(model.transcribe_stable(audio_segment).language) + logging.debug(f"Detected language: {detected_language.to_name()}") + # reverse lookup of language -> code, ex: "english" -> "en", "nynorsk" -> "nn", ... + language_code = detected_language.to_iso_639_1() + logging.debug(f"Language Code: {language_code}") + + except Exception as e: + logging.info(f"Error detectign language of file with whisper: {e}") + + finally: + task_queue.task_done() + delete_model() + # put task to transcribe this with the detected language + task_id = { 'path': path, "transcribe_or_translate": transcribe_or_translate, 'force_language': detected_language } + task_queue.put(task_id) + + #maybe modify the file to contain detected language so we won't trigger this again + + return + +def extract_audio_segment_to_memory(input_file, start_time, duration): + """ + Extract a segment of audio from input_file, starting at start_time for duration seconds. + + :param input_file: UploadFile object or path to the input audio file + :param start_time: Start time in seconds (e.g., 60 for 1 minute) + :param duration: Duration in seconds (e.g., 30 for 30 seconds) + :return: BytesIO object containing the audio segment + """ + try: + if hasattr(input_file, 'file') and hasattr(input_file.file, 'read'): # Handling UploadFile + input_file.file.seek(0) # Ensure the file pointer is at the beginning + input_stream = 'pipe:0' + input_kwargs = {'input': input_file.file.read()} + elif isinstance(input_file, str): # Handling local file path + input_stream = input_file + input_kwargs = {} + else: + raise ValueError("Invalid input: input_file must be a file path or an UploadFile object.") + + logging.info(f"Extracting audio from: {input_stream}, start_time: {start_time}, duration: {duration}") + + # Run FFmpeg to extract the desired segment + out, _ = ( + ffmpeg + .input(input_stream, ss=start_time, t=duration) # Set start time and duration + .output('pipe:1', format='wav', acodec='pcm_s16le', ar=16000) # Output to pipe as WAV + .run(capture_stdout=True, capture_stderr=True, **input_kwargs) + ) + + # Check if the output is empty or null + if not out: + raise ValueError("FFmpeg output is empty, possibly due to invalid input.") + + return io.BytesIO(out) # Convert output to BytesIO for in-memory processing + + except ffmpeg.Error as e: + logging.error(f"FFmpeg error: {e.stderr.decode()}") + return None + except Exception as e: + logging.error(f"Error: {str(e)}") + return None + + except ffmpeg.Error as e: + logging.error(f"FFmpeg error: {e.stderr.decode()}") + return None + except Exception as e: + logging.error(f"Error: {str(e)}") + return None + def start_model(): global model if model is None: @@ -460,7 +625,7 @@ def delete_model(): def isAudioFileExtension(file_extension): return file_extension.casefold() in \ - [ '.mp3', '.flac', '.wav', '.alac', '.ape', '.ogg', '.wma', '.m4a', '.m4b', '.aac', '.aiff' ] + AUDIO_EXTENSIONS def write_lrc(result, file_path): with open(file_path, "w") as file: @@ -469,7 +634,7 @@ def write_lrc(result, file_path): fraction = int((segment.start - int(segment.start)) * 100) file.write(f"[{minutes:02d}:{seconds:02d}.{fraction:02d}] {segment.text}\n") -def gen_subtitles(file_path: str, transcription_type: str, force_language : LanguageCode | None = None) -> None: +def gen_subtitles(file_path: str, transcription_type: str, force_language : LanguageCode = LanguageCode.NONE) -> None: """Generates subtitles for a video file. Args: @@ -512,6 +677,8 @@ def gen_subtitles(file_path: str, transcription_type: str, force_language : Lang if is_audio_file and lrc_for_audio_files: write_lrc(result, file_name + '.lrc') else: + if not force_language: + force_language = LanguageCode.from_string(result.language) result.to_srt_vtt(name_subtitle(file_path, force_language), word_level=word_level_highlight) elapsed_time = time.time() - start_time @@ -520,7 +687,7 @@ def gen_subtitles(file_path: str, transcription_type: str, force_language : Lang f"Transcription of {os.path.basename(file_path)} is completed, it took {minutes} minutes and {seconds} seconds to complete.") except Exception as e: - logging.info(f"Error processing or transcribing {file_path}: {e}") + logging.info(f"Error processing or transcribing {file_path} in {force_language}: {e}") finally: delete_model() @@ -560,7 +727,7 @@ def name_subtitle(file_path: str, language: LanguageCode) -> str: """ return f"{os.path.splitext(file_path)[0]}.subgen.{whisper_model.split('.')[0]}.{define_subtitle_language_naming(language, subtitle_language_naming_type)}.srt" -def handle_multiple_audio_tracks(file_path: str, language: LanguageCode | None = None) -> io.BytesIO | None: +def handle_multiple_audio_tracks(file_path: str, language: LanguageCode | None = None) -> BytesIO | None: """ Handles the possibility of a media file having multiple audio tracks. @@ -594,7 +761,7 @@ def handle_multiple_audio_tracks(file_path: str, language: LanguageCode | None = return None return audio_bytes -def extract_audio_track_to_memory(input_video_path, track_index) -> io.BytesIO | None: +def extract_audio_track_to_memory(input_video_path, track_index) -> BytesIO | None: """ Extract a specific audio track from a video file to memory using FFmpeg. @@ -624,7 +791,7 @@ def extract_audio_track_to_memory(input_video_path, track_index) -> io.BytesIO | .run(capture_stdout=True, capture_stderr=True) # Capture output in memory ) # Return the audio data as a BytesIO object - return io.BytesIO(out) + return BytesIO(out) except ffmpeg.Error as e: print("An error occurred:", e.stderr.decode()) @@ -663,23 +830,31 @@ def choose_transcribe_language(file_path, forced_language): determined. """ - # todo handle iso 2/3 + logger.debug(f"choose_transcribe_language({file_path}, {forced_language})") + if forced_language: + logger.debug(f"ENV FORCE_LANGUAGE is set: Forcing language to {forced_language}") return forced_language if force_detected_language_to: + logger.debug(f"ENV FORCE_DETECTED_LANGUAGE_TO is set: Forcing detected language to {force_detected_language_to}") return force_detected_language_to audio_tracks = get_audio_tracks(file_path) - if has_language_audio_track(audio_tracks, preferred_audio_language): - language = preferred_audio_language + + found_track_in_language = find_language_audio_track(audio_tracks, preferred_audio_languages) + if found_track_in_language: + language = found_track_in_language if language: + logger.debug(f"Preferred language found: {language}") return language + default_language = find_default_audio_track_language(audio_tracks) if default_language: + logger.debug(f"Default language found: {default_language}") return default_language - return None + return LanguageCode.NONE def get_audio_tracks(video_file): @@ -758,22 +933,23 @@ def get_audio_tracks(video_file): logging.error(f"An error occurred while reading audio track information: {str(e)}") return [] -def has_language_audio_track(audio_tracks, find_language): +def find_language_audio_track(audio_tracks, find_languages): """ - Checks if an audio track with the given language is present in the list of audio tracks. + Checks if an audio track with any of the given languages is present in the list of audio tracks. + Returns the first language from `find_languages` that matches. Args: audio_tracks (list): A list of dictionaries containing information about each audio track. - find_language (str): The ISO 639-2 code of the language to search for. + find_languages (list): A list language codes to search for. Returns: - bool: True if an audio track with the given language was found, False otherwise. + str or None: The first language found from `find_languages`, or None if no match is found. """ - for track in audio_tracks: - if track['language'] == find_language: #ISO 639-2 - return True - return False - + for language in find_languages: + for track in audio_tracks: + if track['language'] == language: + return language + return None def find_default_audio_track_language(audio_tracks): """ Finds the language of the default audio track in the given list of audio tracks. @@ -791,7 +967,7 @@ def find_default_audio_track_language(audio_tracks): return None -def gen_subtitles_queue(file_path: str, transcription_type: str, force_language: LanguageCode | None = None) -> None: +def gen_subtitles_queue(file_path: str, transcription_type: str, force_language: LanguageCode = LanguageCode.NONE) -> None: global task_queue if not has_audio(file_path): @@ -800,7 +976,17 @@ def gen_subtitles_queue(file_path: str, transcription_type: str, force_language: force_language = choose_transcribe_language(file_path, force_language) + # check if we would like to detect audio language in case of no audio language specified. Will return here again with specified language from whisper + if not force_language and should_whiser_detect_audio_language: + # make a detect language task + task_id = { 'path': file_path, 'type': "detect_language" } + task_queue.put(task_id) + logging.info(f"task_queue.put(task_id)({file_path}, detect_language)") + return + + if have_to_skip(file_path, force_language): + logging.debug(f"{file_path} already has subtitles in {force_language}, skipping.") return task = { @@ -823,6 +1009,10 @@ def have_to_skip(file_path: str, transcribe_language: LanguageCode) -> bool: Returns: True if subtitle generation should be skipped; otherwise, False. """ + if skip_unknown_language and transcribe_language == LanguageCode.NONE: + logging.debug(f"{file_path} has unknown language, skipping.") + return True + # Check if subtitles in the desired transcription language already exist if skip_if_to_transcribe_sub_already_exist and has_subtitle_language(file_path, transcribe_language): logging.debug(f"{file_path} already has subtitles in {transcribe_language}, skipping.") @@ -834,9 +1024,11 @@ def have_to_skip(file_path: str, transcribe_language: LanguageCode) -> bool: return True # Check if external subtitles exist for the specified language - if skipifexternalsub and has_subtitle_language(file_path, LanguageCode.from_string(namesublang)): - logging.debug(f"{file_path} has external subtitles in {namesublang}, skipping.") - return True + # Probably not use LanguageCode for this, but just check with strings, to be able to skip with custom named languages. + if LanguageCode.is_valid_language(namesublang): + if skipifexternalsub and has_subtitle_language(file_path, LanguageCode.from_string(namesublang)): + logging.debug(f"{file_path} has external subtitles in {namesublang}, skipping.") + return True # Skip if any language in the skip list is detected in existing subtitles existing_sub_langs = get_subtitle_languages(file_path) @@ -844,11 +1036,17 @@ def have_to_skip(file_path: str, transcribe_language: LanguageCode) -> bool: logging.debug(f"Languages in skip list {skip_lang_codes_list} detected in {file_path}, skipping.") return True - # Skip if any language in the audio track skip list is detected audio_langs = get_audio_languages(file_path) - if any(lang in skip_if_audio_track_is_in_list for lang in audio_langs): - logging.debug(f"Audio language in skip list {skip_if_audio_track_is_in_list} detected in {file_path}, skipping.") - return True + if preferred_audio_languages in audio_langs: + logging.debug(f"Preferred audio language {preferred_audio_languages} detected in {file_path}.") + # maybe not skip if subtitle exist in preferred audio language, but not in another preferred audio language if the file has multiple audio tracks matching the preferred audio languages + else: + if limit_to_preferred_audio_languages: + logging.debug(f"Only non-preferred audio language detected in {file_path}, skipping.") + return True + if any(lang in skip_if_audio_track_is_in_list for lang in audio_langs): + logging.debug(f"Audio language in skip list {skip_if_audio_track_is_in_list} detected in {file_path}, skipping.") + return True # If none of the conditions matched, do not skip return False @@ -903,9 +1101,6 @@ def has_subtitle_language(video_file, target_language: LanguageCode): Returns: bool: True if a subtitle file with the target language is found, False otherwise. """ - logging.debug(f"has_subtitle_language({video_file}, {target_language})") - if target_language == LanguageCode.NONE: - return False return has_subtitle_language_in_file(video_file, target_language) or has_subtitle_of_language_in_folder(video_file, target_language) def has_subtitle_language_in_file(video_file, target_language: LanguageCode): @@ -919,21 +1114,25 @@ def has_subtitle_language_in_file(video_file, target_language: LanguageCode): Returns: bool: True if a subtitle file with the target language is found, False otherwise. """ - logging.debug(f"has_subtitle_language_in_file({video_file}, {target_language})") - if target_language == LanguageCode.NONE: + # logging.debug(f"has_subtitle_language_in_file({video_file}, {target_language})") + if (target_language == LanguageCode.NONE and not skip_if_language_is_not_set_but_subtitles_exist) or only_skip_if_subgen_subtitle: # skip if language is not set or we are only interested in subgen subtitles which are not internal, only external return False try: with av.open(video_file) as container: - subtitle_stream = next((stream for stream in container.streams if stream.type == 'subtitle' and 'language' in stream.metadata and LanguageCode.from_string(stream.metadata['language']) == target_language), None) + subtitle_streams = (stream for stream in container.streams if stream.type == 'subtitle' and 'language' in stream.metadata) + + if skip_if_language_is_not_set_but_subtitles_exist and target_language == LanguageCode.NONE and any(subtitle_streams): + logging.debug("Language is not set but internal subtitles exist.") + return True - if subtitle_stream: + if next(stream for stream in subtitle_streams if LanguageCode.from_string(stream.metadata['language']) == target_language): logging.debug(f"Subtitles in '{target_language}' language found in the video.") return True else: logging.debug(f"No subtitles in '{target_language}' language found in the video.") return False except Exception as e: - logging.info(f"An error occurred: {e}") + logging.error(f"An error occurred while checking the file with pyav: {e}") # TODO: figure out why this throws (empty) errors return False def has_subtitle_of_language_in_folder(video_file, target_language: LanguageCode, recursion = True): @@ -961,9 +1160,32 @@ def has_subtitle_of_language_in_folder(video_file, target_language: LanguageCode root, ext = os.path.splitext(file_name) if root.startswith(video_file_stripped) and ext.lower() in subtitle_extensions: parts = root[len(video_file_stripped):].lstrip(".").split(".") - # Check if the target language is one of the parts + + has_subgen = "subgen" in parts # Checks if "subgen" is in parts + + #checking this first because e.g LanguageCode.from_string("subgen") == LanguageCode.NONE is equal to True. Maybe handle this better with a check with a function like is language code. To check if part is a valid language before comparing it to target_language + + if target_language == LanguageCode.NONE: + if only_skip_if_subgen_subtitle: + if has_subgen: + logger.debug("Subtitles from subgen found in the folder. ") + return skip_if_language_is_not_set_but_subtitles_exist + else: + #might be other subtitles that have subgen in the name + continue + logger.debug("Subtitles exist in the folder. and only_skip_if_subgen_subtitle is False.") + return skip_if_language_is_not_set_but_subtitles_exist + if any(LanguageCode.from_string(part) == target_language for part in parts): - # If the language is found, return True + # If the subtitle is found, return True + if only_skip_if_subgen_subtitle: + if has_subgen: + logger.debug(f"Subtitles from subgen in '{target_language}' language found in the folder.") + return True + else: + #might be other subtitles that have subgen in the name + continue + logger.debug(f"Subtitles in '{target_language}' language found in the folder.") return True elif os.path.isdir(file_path) and recursion: # Looking in the subfolders of the video for subtitles @@ -1105,8 +1327,11 @@ def get_jellyfin_admin(users): def has_audio(file_path): try: - if has_image_extension(file_path): - logging.debug(f"{file_path} is an image or is an invalid file or path (are your volumes correct?), skipping processing") + if not is_valid_path(file_path): + return False + + if not (has_video_extension(file_path) or has_audio_extension(file_path)): + # logging.debug(f"{file_path} is an not a video or audio file, skipping processing. skipping processing") return False with av.open(file_path) as container: @@ -1124,6 +1349,28 @@ def has_audio(file_path): logging.debug(f"Error processing file {file_path}") return False +def is_valid_path(file_path): + # Check if the path is a file + if not os.path.isfile(file_path): + # If it's not a file, check if it's a directory + if not os.path.isdir(file_path): + logging.warning(f"{file_path} is neither a file nor a directory. Are your volumes correct?") + return False + else: + logging.debug(f"{file_path} is a directory, skipping processing as a file.") + return False + else: + return True + +def has_video_extension(file_name): + file_extension = os.path.splitext(file_name)[1].lower() # Get the file extension + return file_extension in VIDEO_EXTENSIONS + +def has_audio_extension(file_name): + file_extension = os.path.splitext(file_name)[1].lower() # Get the file extension + return file_extension in AUDIO_EXTENSIONS + + def path_mapping(fullpath): if use_path_mapping: logging.debug("Updated path: " + fullpath.replace(path_mapping_from, path_mapping_to))