From ded37087072b945d7c3c6a20c7870847b4456d6e Mon Sep 17 00:00:00 2001 From: Breakthrough Date: Sat, 28 Sep 2024 17:55:41 -0400 Subject: [PATCH] [cli] Centralize preconditions in CliContext --- scenedetect/_cli/__init__.py | 109 +++++++++++------- scenedetect/_cli/commands.py | 93 ++++++++------- scenedetect/_cli/config.py | 3 - scenedetect/_cli/context.py | 202 +++++++++------------------------ scenedetect/_cli/controller.py | 22 +--- scenedetect/scene_manager.py | 54 +++++---- 6 files changed, 203 insertions(+), 280 deletions(-) diff --git a/scenedetect/_cli/__init__.py b/scenedetect/_cli/__init__.py index 86767dcb..ac7e0976 100644 --- a/scenedetect/_cli/__init__.py +++ b/scenedetect/_cli/__init__.py @@ -20,6 +20,8 @@ import inspect import logging +import os +import os.path import typing as ty import click @@ -32,10 +34,9 @@ CONFIG_MAP, DEFAULT_JPG_QUALITY, DEFAULT_WEBP_QUALITY, - USER_CONFIG, TimecodeFormat, ) -from scenedetect._cli.context import CliContext, check_split_video_requirements +from scenedetect._cli.context import USER_CONFIG, CliContext, check_split_video_requirements from scenedetect.backends import AVAILABLE_BACKENDS from scenedetect.detectors import ( AdaptiveDetector, @@ -315,8 +316,10 @@ def scenedetect( Global options (e.g. -i/--input, -c/--config) must be specified before any commands and their options. The order of commands is not strict, but each command must only be specified once. """ - assert isinstance(ctx.obj, CliContext) - ctx.obj.handle_options( + ctx = ctx.obj + assert isinstance(ctx, CliContext) + + ctx.handle_options( input_path=input, output=output, framerate=framerate, @@ -344,7 +347,6 @@ def scenedetect( @click.pass_context def help_command(ctx: click.Context, command_name: str): """Print help for command (`help [command]`).""" - assert isinstance(ctx.obj, CliContext) assert isinstance(ctx.parent.command, click.MultiCommand) parent_command = ctx.parent.command all_commands = set(parent_command.list_commands(ctx)) @@ -368,7 +370,6 @@ def help_command(ctx: click.Context, command_name: str): @click.pass_context def about_command(ctx: click.Context): """Print license/copyright info.""" - assert isinstance(ctx.obj, CliContext) click.echo("") click.echo(click.style(_LINE_SEPARATOR, fg="cyan")) click.echo(click.style(" About PySceneDetect %s" % _PROGRAM_VERSION, fg="yellow")) @@ -381,7 +382,6 @@ def about_command(ctx: click.Context): @click.pass_context def version_command(ctx: click.Context): """Print PySceneDetect version.""" - assert isinstance(ctx.obj, CliContext) click.echo("") click.echo(get_system_version_info()) ctx.exit() @@ -431,12 +431,23 @@ def time_command( {scenedetect_with_video} time --start 0 --end 1000 """ - assert isinstance(ctx.obj, CliContext) - ctx.obj.handle_time( - start=start, - duration=duration, - end=end, - ) + ctx = ctx.obj + assert isinstance(ctx, CliContext) + + if duration is not None and end is not None: + raise click.BadParameter( + "Only one of --duration/-d or --end/-e can be specified, not both.", + param_hint="time", + ) + logger.debug("Setting video time:\n start: %s, duration: %s, end: %s", start, duration, end) + # *NOTE*: The Python API uses 0-based frame indices, but the CLI uses 1-based indices to + # match the default start number used by `ffmpeg` when saving frames as images. As such, + # we must correct start time if set as frames. See the test_cli_time* tests for for details. + ctx.start_time = ctx.parse_timecode(start, correct_pts=True) + ctx.end_time = ctx.parse_timecode(end) + ctx.duration = ctx.parse_timecode(duration) + if ctx.start_time and ctx.end_time and (ctx.start_time + 1) > ctx.end_time: + raise click.BadParameter("-e/--end time must be greater than -s/--start") @click.command("detect-content", cls=_Command) @@ -535,8 +546,9 @@ def detect_content_command( {scenedetect_with_video} detect-content --threshold 27.5 """ - assert isinstance(ctx.obj, CliContext) - detector_args = ctx.obj.get_detect_content_params( + ctx = ctx.obj + assert isinstance(ctx, CliContext) + detector_args = ctx.get_detect_content_params( threshold=threshold, luma_only=luma_only, min_scene_len=min_scene_len, @@ -544,8 +556,7 @@ def detect_content_command( kernel_size=kernel_size, filter_mode=filter_mode, ) - logger.debug("Adding detector: ContentDetector(%s)", detector_args) - ctx.obj.add_detector(ContentDetector(**detector_args)) + ctx.add_detector(ContentDetector, detector_args) @click.command("detect-adaptive", cls=_Command) @@ -646,8 +657,9 @@ def detect_adaptive_command( {scenedetect_with_video} detect-adaptive --threshold 3.2 """ - assert isinstance(ctx.obj, CliContext) - detector_args = ctx.obj.get_detect_adaptive_params( + ctx = ctx.obj + assert isinstance(ctx, CliContext) + detector_args = ctx.get_detect_adaptive_params( threshold=threshold, min_content_val=min_content_val, min_delta_hsv=min_delta_hsv, @@ -657,8 +669,7 @@ def detect_adaptive_command( weights=weights, kernel_size=kernel_size, ) - logger.debug("Adding detector: AdaptiveDetector(%s)", detector_args) - ctx.obj.add_detector(AdaptiveDetector(**detector_args)) + ctx.add_detector(AdaptiveDetector, detector_args) @click.command("detect-threshold", cls=_Command) @@ -725,15 +736,15 @@ def detect_threshold_command( {scenedetect_with_video} detect-threshold --threshold 15 """ - assert isinstance(ctx.obj, CliContext) - detector_args = ctx.obj.get_detect_threshold_params( + ctx = ctx.obj + assert isinstance(ctx, CliContext) + detector_args = ctx.get_detect_threshold_params( threshold=threshold, fade_bias=fade_bias, add_last_scene=add_last_scene, min_scene_len=min_scene_len, ) - logger.debug("Adding detector: ThresholdDetector(%s)", detector_args) - ctx.obj.add_detector(ThresholdDetector(**detector_args)) + ctx.add_detector(ThresholdDetector, detector_args) @click.command("detect-hist", cls=_Command) @@ -795,14 +806,12 @@ def detect_hist_command( {scenedetect_with_video} detect-hist --threshold 0.1 --bins 240 """ - assert isinstance(ctx.obj, CliContext) - - assert isinstance(ctx.obj, CliContext) - detector_args = ctx.obj.get_detect_hist_params( + ctx = ctx.obj + assert isinstance(ctx, CliContext) + detector_args = ctx.get_detect_hist_params( threshold=threshold, bins=bins, min_scene_len=min_scene_len ) - logger.debug("Adding detector: HistogramDetector(%s)", detector_args) - ctx.obj.add_detector(HistogramDetector(**detector_args)) + ctx.add_detector(HistogramDetector, detector_args) @click.command("detect-hash", cls=_Command) @@ -880,14 +889,12 @@ def detect_hash_command( {scenedetect_with_video} detect-hash --size 32 --lowpass 3 """ - assert isinstance(ctx.obj, CliContext) - - assert isinstance(ctx.obj, CliContext) - detector_args = ctx.obj.get_detect_hash_params( + ctx = ctx.obj + assert isinstance(ctx, CliContext) + detector_args = ctx.get_detect_hash_params( threshold=threshold, size=size, lowpass=lowpass, min_scene_len=min_scene_len ) - logger.debug("Adding detector: HashDetector(%s)", detector_args) - ctx.obj.add_detector(HashDetector(**detector_args)) + ctx.add_detector(HashDetector, detector_args) @click.command("load-scenes", cls=_Command) @@ -921,9 +928,23 @@ def load_scenes_command( {scenedetect_with_video} load-scenes -i scenes.csv --start-col-name "Start Timecode" """ - assert isinstance(ctx.obj, CliContext) - logger.debug("Loading scenes from %s (start_col_name = %s)", input, start_col_name) - ctx.obj.handle_load_scenes(input=input, start_col_name=start_col_name) + ctx = ctx.obj + assert isinstance(ctx, CliContext) + + logger.debug("Will load scenes from %s (start_col_name = %s)", input, start_col_name) + if ctx.scene_manager.get_num_detectors() > 0: + raise click.ClickException("The load-scenes command cannot be used with detectors.") + if ctx.load_scenes_input: + raise click.ClickException("The load-scenes command must only be specified once.") + input = os.path.abspath(input) + if not os.path.exists(input): + raise click.BadParameter( + f"Could not load scenes, file does not exist: {input}", param_hint="-i/--input" + ) + ctx.load_scenes_input = input + ctx.load_scenes_column_name = ctx.config.get_value( + "load-scenes", "start-col-name", start_col_name + ) @click.command("export-html", cls=_Command) @@ -970,7 +991,7 @@ def export_html_command( """Export scene list to HTML file. Requires save-images unless --no-images is specified.""" ctx = ctx.obj assert isinstance(ctx, CliContext) - ctx.ensure_input_open() + no_images = no_images or ctx.config.get_value("export-html", "no-images") if not ctx.save_images and not no_images: raise click.BadArgumentUsage( @@ -1037,7 +1058,7 @@ def list_scenes_command( """Create scene list CSV file (will be named $VIDEO_NAME-Scenes.csv by default).""" ctx = ctx.obj assert isinstance(ctx, CliContext) - ctx.ensure_input_open() + no_output_file = no_output_file or ctx.config.get_value("list-scenes", "no-output-file") scene_list_dir = ctx.config.get_value("list-scenes", "output", output, ignore_default=True) scene_list_name_format = ctx.config.get_value("list-scenes", "filename", filename) @@ -1162,7 +1183,7 @@ def split_video_command( """ ctx = ctx.obj assert isinstance(ctx, CliContext) - ctx.ensure_input_open() + check_split_video_requirements(use_mkvmerge=mkvmerge) if "%" in ctx.video_stream.path or "://" in ctx.video_stream.path: error = "The split-video command is incompatible with image sequences/URLs." @@ -1362,7 +1383,7 @@ def save_images_command( """ ctx = ctx.obj assert isinstance(ctx, CliContext) - ctx.ensure_input_open() + if "://" in ctx.video_stream.path: error_str = "\nThe save-images command is incompatible with URLs." logger.error(error_str) diff --git a/scenedetect/_cli/commands.py b/scenedetect/_cli/commands.py index 077ac00e..83a23bf2 100644 --- a/scenedetect/_cli/commands.py +++ b/scenedetect/_cli/commands.py @@ -9,23 +9,59 @@ # PySceneDetect is licensed under the BSD 3-Clause License; see the # included LICENSE file, or visit one of the above pages for details. # -"""Logic for the PySceneDetect command.""" +"""Logic for PySceneDetect commands that operate on the result of the processing pipeline. + +In addition to the the arguments registered with the command, commands will be called with the +current command-line context, as well as the processing result (scenes and cuts). +""" import logging import typing as ty from string import Template -import scenedetect.scene_manager as scene_manager from scenedetect._cli.context import CliContext -from scenedetect.frame_timecode import FrameTimecode from scenedetect.platform import get_and_create_path +from scenedetect.scene_manager import ( + CutList, + Interpolation, + SceneList, + write_scene_list, + write_scene_list_html, +) +from scenedetect.scene_manager import ( + save_images as save_images_impl, +) from scenedetect.video_splitter import split_video_ffmpeg, split_video_mkvmerge logger = logging.getLogger("pyscenedetect") -SceneList = ty.List[ty.Tuple[FrameTimecode, FrameTimecode]] -CutList = ty.List[FrameTimecode] +def export_html( + context: CliContext, + scenes: SceneList, + cuts: CutList, + image_width: int, + image_height: int, + html_name_format: str, +): + """Handles the `export-html` command.""" + (image_filenames, output_dir) = ( + context.save_images_result + if context.save_images_result is not None + else (None, context.output_dir) + ) + html_filename = Template(html_name_format).safe_substitute(VIDEO_NAME=context.video_stream.name) + if not html_filename.lower().endswith(".html"): + html_filename += ".html" + html_path = get_and_create_path(html_filename, output_dir) + write_scene_list_html( + output_html_filename=html_path, + scene_list=scenes, + cut_list=cuts, + image_filenames=image_filenames, + image_width=image_width, + image_height=image_height, + ) def list_scenes( @@ -40,7 +76,6 @@ def list_scenes( display_scenes: bool, display_cuts: bool, cut_format: str, - **kwargs, ): """Handles the `list-scenes` command.""" # Write scene list CSV to if required. @@ -56,7 +91,7 @@ def list_scenes( ) logger.info("Writing scene list to CSV file:\n %s", scene_list_path) with open(scene_list_path, "w") as scene_list_file: - scene_manager.write_scene_list( + write_scene_list( output_csv_file=scene_list_file, scene_list=scenes, include_cut_list=not skip_cuts, @@ -99,6 +134,7 @@ def list_scenes( def save_images( context: CliContext, scenes: SceneList, + cuts: CutList, num_images: int, frame_margin: int, image_extension: str, @@ -109,13 +145,12 @@ def save_images( scale: int, height: int, width: int, - interpolation: scene_manager.Interpolation, - **kwargs, + interpolation: Interpolation, ): """Handles the `save-images` command.""" - logger.info(f"Saving images to {output_dir} with format {image_extension}") - logger.debug(f"encoder param: {encoder_param}") - images = scene_manager.save_images( + del cuts # save-images only uses scenes. + + images = save_images_impl( scene_list=scenes, video=context.video_stream, num_images=num_images, @@ -130,49 +165,23 @@ def save_images( width=width, interpolation=interpolation, ) + # Save the result for use by `export-html` if required. context.save_images_result = (images, output_dir) -def export_html( - context: CliContext, - scenes: SceneList, - cuts: CutList, - image_width: int, - image_height: int, - html_name_format: str, - **kwargs, -): - """Handles the `export-html` command.""" - save_images_result = context.save_images_result - # Command can override global output directory setting. - output_dir = save_images_result[1] if save_images_result[1] is not None else context.output_dir - html_filename = Template(html_name_format).safe_substitute(VIDEO_NAME=context.video_stream.name) - - if not html_filename.lower().endswith(".html"): - html_filename += ".html" - html_path = get_and_create_path(html_filename, output_dir) - logger.info("Exporting to html file:\n %s:", html_path) - scene_manager.write_scene_list_html( - output_html_filename=html_path, - scene_list=scenes, - cut_list=cuts, - image_filenames=save_images_result[0], - image_width=image_width, - image_height=image_height, - ) - - def split_video( context: CliContext, scenes: SceneList, + cuts: CutList, name_format: str, use_mkvmerge: bool, output_dir: str, show_output: bool, ffmpeg_args: str, - **kwargs, ): """Handles the `split-video` command.""" + del cuts # split-video only uses scenes. + # Add proper extension to filename template if required. dot_pos = name_format.rfind(".") extension_length = 0 if dot_pos < 0 else len(name_format) - (dot_pos + 1) diff --git a/scenedetect/_cli/config.py b/scenedetect/_cli/config.py index b8aa733d..5e03e5d2 100644 --- a/scenedetect/_cli/config.py +++ b/scenedetect/_cli/config.py @@ -621,6 +621,3 @@ def get_help_string( ): return "" return " [default: %s]" % (str(CONFIG_MAP[command][option])) - - -USER_CONFIG = ConfigRegistry(throw_exception=False) diff --git a/scenedetect/_cli/context.py b/scenedetect/_cli/context.py index a9e02a46..551eee21 100644 --- a/scenedetect/_cli/context.py +++ b/scenedetect/_cli/context.py @@ -12,7 +12,6 @@ """Context of which command-line options and config settings the user provided.""" import logging -import os import typing as ty import click @@ -42,34 +41,7 @@ logger = logging.getLogger("pyscenedetect") USER_CONFIG = ConfigRegistry(throw_exception=False) - -SceneList = ty.List[ty.Tuple[FrameTimecode, FrameTimecode]] - -CutList = ty.List[FrameTimecode] - - -def parse_timecode( - value: ty.Optional[str], frame_rate: float, correct_pts: bool = False -) -> FrameTimecode: - """Parses a user input string into a FrameTimecode assuming the given framerate. - - If value is None, None will be returned instead of processing the value. - - Raises: - click.BadParameter - """ - if value is None: - return None - try: - if correct_pts and value.isdigit(): - value = int(value) - if value >= 1: - value -= 1 - return FrameTimecode(timecode=value, fps=frame_rate) - except ValueError as ex: - raise click.BadParameter( - "timecode must be in seconds (100.0), frames (100), or HH:MM:SS" - ) from ex +"""The user config, which can be overriden by command-line. If not found, will be default config.""" def check_split_video_requirements(use_mkvmerge: bool) -> None: @@ -98,29 +70,6 @@ def check_split_video_requirements(use_mkvmerge: bool) -> None: raise click.BadParameter(error_str, param_hint="split-video") -class AppState: - def __init__(self): - self.video_stream: VideoStream = None - self.scene_manager: SceneManager = None - self.stats_manager: StatsManager = None - self.output: str = None - self.quiet_mode: bool = None - self.stats_file_path: str = None - self.drop_short_scenes: bool = None - self.merge_last_scene: bool = None - self.min_scene_len: FrameTimecode = None - self.frame_skip: int = None - self.default_detector: ty.Tuple[ty.Type[SceneDetector], ty.Dict[str, ty.Any]] = None - self.start_time: FrameTimecode = None # time -s/--start - self.end_time: FrameTimecode = None # time -e/--end - self.duration: FrameTimecode = None # time -d/--duration - self.load_scenes_input: str = None # load-scenes -i/--input - self.load_scenes_column_name: str = None # load-scenes -c/--start-col-name - self.save_images: bool = False # True if the save-images command was specified - # Result of save-images function output stored for use by export-html - self.save_images_result: ty.Any = (None, None) - - class CliContext: """The state of the application representing what video will be processed, how, and what to do with the result. This includes handling all input options via command line and config file. @@ -159,14 +108,48 @@ def __init__(self): # the results of the detection pipeline by the controller. self.commands: ty.List[ty.Tuple[ty.Callable, ty.Dict[str, ty.Any]]] = [] - def add_command(self, command: ty.Callable, command_args: dict): - """Add `command` to the processing pipeline. Will be invoked after processing the input - the `context`, the resulting `scenes` and `cuts`, and `command_args`.""" + def add_command(self, command: ty.Callable, command_args: ty.Dict[str, ty.Any]): + """Add `command` to the processing pipeline. Will be called after processing the input.""" + if "output_dir" in command_args and command_args["output_dir"] is None: + command_args["output_dir"] = self.output_dir + logger.debug("Adding command: %s(%s)", command.__name__, command_args) self.commands.append((command, command_args)) - # - # Command Handlers - # + def add_detector(self, detector: ty.Type[SceneDetector], detector_args: ty.Dict[str, ty.Any]): + """Instantiate and add `detector` to the processing pipeline.""" + if self.load_scenes_input: + raise click.ClickException("The load-scenes command cannot be used with detectors.") + logger.debug("Adding detector: %s(%s)", detector.__name__, detector_args) + self.scene_manager.add_detector(detector(**detector_args)) + + def ensure_detector(self): + """Ensures at least one detector has been instantiated, otherwise adds a default one.""" + if self.scene_manager.get_num_detectors() == 0: + logger.debug("No detector specified, adding default detector.") + (detector_type, detector_args) = self.default_detector + self.add_detector(detector_type, detector_args) + + def parse_timecode(self, value: ty.Optional[str], correct_pts: bool = False) -> FrameTimecode: + """Parses a user input string into a FrameTimecode assuming the given framerate. If `value` + is None it will be passed through without processing. + + Raises: + click.BadParameter, click.ClickException + """ + if value is None: + return None + try: + if self.video_stream is None: + raise click.ClickException("No input video (-i/--input) was specified.") + if correct_pts and value.isdigit(): + value = int(value) + if value >= 1: + value -= 1 + return FrameTimecode(timecode=value, fps=self.video_stream.frame_rate) + except ValueError as ex: + raise click.BadParameter( + "timecode must be in seconds (100.0), frames (100), or HH:MM:SS" + ) from ex def handle_options( self, @@ -261,11 +244,10 @@ def handle_options( if self.output_dir: logger.info("Output directory set:\n %s", self.output_dir) - self.min_scene_len = parse_timecode( + self.min_scene_len = self.parse_timecode( min_scene_len if min_scene_len is not None else self.config.get_value("global", "min-scene-len"), - self.video_stream.frame_rate, ) self.drop_short_scenes = drop_short_scenes or self.config.get_value( "global", "drop-short-scenes" @@ -313,6 +295,10 @@ def handle_options( ] self.scene_manager = scene_manager + # + # Detector Parameters + # + def get_detect_content_params( self, threshold: ty.Optional[float] = None, @@ -322,9 +308,7 @@ def get_detect_content_params( kernel_size: ty.Optional[int] = None, filter_mode: ty.Optional[str] = None, ) -> ty.Dict[str, ty.Any]: - """Handle detect-content command options and return args to construct one with.""" - self.ensure_input_open() - + """Get a dict containing user options to construct a ContentDetector with.""" if self.drop_short_scenes: min_scene_len = 0 else: @@ -333,7 +317,7 @@ def get_detect_content_params( min_scene_len = self.min_scene_len.frame_num else: min_scene_len = self.config.get_value("detect-content", "min-scene-len") - min_scene_len = parse_timecode(min_scene_len, self.video_stream.frame_rate).frame_num + min_scene_len = self.parse_timecode(min_scene_len).frame_num if weights is not None: try: @@ -365,7 +349,6 @@ def get_detect_adaptive_params( min_delta_hsv: ty.Optional[float] = None, ) -> ty.Dict[str, ty.Any]: """Handle detect-adaptive command options and return args to construct one with.""" - self.ensure_input_open() # TODO(v0.7): Remove these branches when removing -d/--min-delta-hsv. if min_delta_hsv is not None: @@ -391,7 +374,7 @@ def get_detect_adaptive_params( min_scene_len = self.min_scene_len.frame_num else: min_scene_len = self.config.get_value("detect-adaptive", "min-scene-len") - min_scene_len = parse_timecode(min_scene_len, self.video_stream.frame_rate).frame_num + min_scene_len = self.parse_timecode(min_scene_len).frame_num if weights is not None: try: @@ -419,7 +402,6 @@ def get_detect_threshold_params( min_scene_len: ty.Optional[str] = None, ) -> ty.Dict[str, ty.Any]: """Handle detect-threshold command options and return args to construct one with.""" - self.ensure_input_open() if self.drop_short_scenes: min_scene_len = 0 @@ -429,7 +411,7 @@ def get_detect_threshold_params( min_scene_len = self.min_scene_len.frame_num else: min_scene_len = self.config.get_value("detect-threshold", "min-scene-len") - min_scene_len = parse_timecode(min_scene_len, self.video_stream.frame_rate).frame_num + min_scene_len = self.parse_timecode(min_scene_len).frame_num # TODO(v1.0): add_last_scene cannot be disabled right now. return { "add_final_scene": add_last_scene @@ -439,23 +421,6 @@ def get_detect_threshold_params( "threshold": self.config.get_value("detect-threshold", "threshold", threshold), } - def handle_load_scenes(self, input: ty.AnyStr, start_col_name: ty.Optional[str]): - """Handle `load-scenes` command options.""" - self.ensure_input_open() - if self.scene_manager.get_num_detectors() > 0: - raise click.ClickException("The load-scenes command cannot be used with detectors.") - if self.load_scenes_input: - raise click.ClickException("The load-scenes command must only be specified once.") - input = os.path.abspath(input) - if not os.path.exists(input): - raise click.BadParameter( - f"Could not load scenes, file does not exist: {input}", param_hint="-i/--input" - ) - self.load_scenes_input = input - self.load_scenes_column_name = self.config.get_value( - "load-scenes", "start-col-name", start_col_name - ) - def get_detect_hist_params( self, threshold: ty.Optional[float] = None, @@ -463,7 +428,7 @@ def get_detect_hist_params( min_scene_len: ty.Optional[str] = None, ) -> ty.Dict[str, ty.Any]: """Handle detect-hist command options and return args to construct one with.""" - self.ensure_input_open() + if self.drop_short_scenes: min_scene_len = 0 else: @@ -472,7 +437,7 @@ def get_detect_hist_params( min_scene_len = self.min_scene_len.frame_num else: min_scene_len = self.config.get_value("detect-hist", "min-scene-len") - min_scene_len = parse_timecode(min_scene_len, self.video_stream.frame_rate).frame_num + min_scene_len = self.parse_timecode(min_scene_len).frame_num return { "bins": self.config.get_value("detect-hist", "bins", bins), "min_scene_len": min_scene_len, @@ -487,7 +452,7 @@ def get_detect_hash_params( min_scene_len: ty.Optional[str] = None, ) -> ty.Dict[str, ty.Any]: """Handle detect-hash command options and return args to construct one with.""" - self.ensure_input_open() + if self.drop_short_scenes: min_scene_len = 0 else: @@ -496,7 +461,7 @@ def get_detect_hash_params( min_scene_len = self.min_scene_len.frame_num else: min_scene_len = self.config.get_value("detect-hash", "min-scene-len") - min_scene_len = parse_timecode(min_scene_len, self.video_stream.frame_rate).frame_num + min_scene_len = self.parse_timecode(min_scene_len).frame_num return { "lowpass": self.config.get_value("detect-hash", "lowpass", lowpass), "min_scene_len": min_scene_len, @@ -504,26 +469,6 @@ def get_detect_hash_params( "threshold": self.config.get_value("detect-hash", "threshold", threshold), } - def handle_time(self, start, duration, end): - """Handle `time` command options.""" - self.ensure_input_open() - if duration is not None and end is not None: - raise click.BadParameter( - "Only one of --duration/-d or --end/-e can be specified, not both.", - param_hint="time", - ) - logger.debug( - "Setting video time:\n start: %s, duration: %s, end: %s", start, duration, end - ) - # *NOTE*: The Python API uses 0-based frame indices, but the CLI uses 1-based indices to - # match the default start number used by `ffmpeg` when saving frames as images. As such, - # we must correct start time if set as frames. See the test_cli_time* tests for for details. - self.start_time = parse_timecode(start, self.video_stream.frame_rate, correct_pts=True) - self.end_time = parse_timecode(end, self.video_stream.frame_rate) - self.duration = parse_timecode(duration, self.video_stream.frame_rate) - if self.start_time and self.end_time and (self.start_time + 1) > self.end_time: - raise click.BadParameter("-e/--end time must be greater than -s/--start") - # # Private Methods # @@ -561,26 +506,6 @@ def _initialize_logging( # Initialize logger with the set CLI args / user configuration. init_logger(log_level=curr_verbosity, show_stdout=not self.quiet_mode, log_file=logfile) - def add_detector(self, detector): - """Add Detector: Adds a detection algorithm to the CliContext's SceneManager.""" - if self.load_scenes_input: - raise click.ClickException("The load-scenes command cannot be used with detectors.") - self.ensure_input_open() - self.scene_manager.add_detector(detector) - - def ensure_input_open(self): - """Ensure self.video_stream was initialized (i.e. -i/--input was specified), - otherwise raises an exception. Should only be used from commands that require an - input video to process the options (e.g. those that require a timecode). - - Raises: - click.BadParameter: self.video_stream was not initialized. - """ - # TODO: Do we still need to do this for each command? Originally this was added for the - # help command to function correctly. - if self.video_stream is None: - raise click.ClickException("No input video (-i/--input) was specified.") - def _open_video_stream( self, input_path: ty.AnyStr, framerate: ty.Optional[float], backend: ty.Optional[str] ): @@ -642,22 +567,3 @@ def _open_video_stream( raise click.BadParameter( "Input error:\n\n\t%s\n" % str(ex), param_hint="-i/--input" ) from None - - def _on_duplicate_command(self, command: str) -> None: - """Called when a command is duplicated to stop parsing and raise an error. - - Arguments: - command: Command that was duplicated for error context. - - Raises: - click.BadParameter - """ - error_strs = [] - error_strs.append("Error: Command %s specified multiple times." % command) - error_strs.append("The %s command may appear only one time.") - - logger.error("\n".join(error_strs)) - raise click.BadParameter( - "\n Command %s may only be specified once." % command, - param_hint="%s command" % command, - ) diff --git a/scenedetect/_cli/controller.py b/scenedetect/_cli/controller.py index 4147d43f..313997fd 100644 --- a/scenedetect/_cli/controller.py +++ b/scenedetect/_cli/controller.py @@ -20,15 +20,11 @@ from scenedetect._cli.context import CliContext from scenedetect.frame_timecode import FrameTimecode from scenedetect.platform import get_and_create_path -from scenedetect.scene_manager import get_scenes_from_cuts +from scenedetect.scene_manager import CutList, SceneList, get_scenes_from_cuts from scenedetect.video_stream import SeekError logger = logging.getLogger("pyscenedetect") -SceneList = ty.List[ty.Tuple[FrameTimecode, FrameTimecode]] - -CutList = ty.List[FrameTimecode] - def run_scenedetect(context: CliContext): """Perform main CLI application control logic. Run once all command-line options and @@ -43,11 +39,6 @@ def run_scenedetect(context: CliContext): logger.debug("No input specified.") return - if context.commands: - logger.debug("Commands to run after processing:") - for func, args in context.commands: - logger.debug("%s(%s)", func.__name__, args) - if context.load_scenes_input: # Skip detection if load-scenes was used. logger.info("Skipping detection, loading scenes from: %s", context.load_scenes_input) @@ -74,9 +65,6 @@ def run_scenedetect(context: CliContext): # Handle post-processing commands the user wants to run (see scenedetect._cli.commands). for handler, kwargs in context.commands: - # TODO: This override should be handled inside the config manager get_value function. - if "output_dir" in kwargs and kwargs["output_dir"] is None: - kwargs["output_dir"] = context.output_dir handler(context=context, scenes=scenes, cuts=cuts, **kwargs) @@ -96,13 +84,9 @@ def _postprocess_scene_list(context: CliContext, scene_list: SceneList) -> Scene def _detect(context: CliContext) -> ty.Optional[ty.Tuple[SceneList, CutList]]: - # Use default detector if one was not specified. - if context.scene_manager.get_num_detectors() == 0: - detector_type, detector_args = context.default_detector - logger.debug("Using default detector: %s(%s)" % (detector_type.__name__, detector_args)) - context.scene_manager.add_detector(detector_type(**detector_args)) - perf_start_time = time.time() + + context.ensure_detector() if context.start_time is not None: logger.debug("Seeking to start time...") try: diff --git a/scenedetect/scene_manager.py b/scenedetect/scene_manager.py index dc3bba04..f844ba57 100644 --- a/scenedetect/scene_manager.py +++ b/scenedetect/scene_manager.py @@ -106,6 +106,12 @@ def on_new_scene(frame_img: numpy.ndarray, frame_num: int): logger = logging.getLogger("pyscenedetect") +SceneList = List[Tuple[FrameTimecode, FrameTimecode]] +"""Type hint for a list of scenes in the form (start time, end time).""" + +CutList = List[FrameTimecode] +"""Type hint for a list of cuts, where each timecode represents the first frame of a new shot.""" + # TODO: This value can and should be tuned for performance improvements as much as possible, # until accuracy falls, on a large enough dataset. This has yet to be done, but the current # value doesn't seem to have caused any issues at least. @@ -158,11 +164,11 @@ def compute_downscale_factor(frame_width: int, effective_width: int = DEFAULT_MI def get_scenes_from_cuts( - cut_list: Iterable[FrameTimecode], + cut_list: CutList, start_pos: Union[int, FrameTimecode], end_pos: Union[int, FrameTimecode], base_timecode: Optional[FrameTimecode] = None, -) -> List[Tuple[FrameTimecode, FrameTimecode]]: +) -> SceneList: """Returns a list of tuples of start/end FrameTimecodes for each scene based on a list of detected scene cuts/breaks. @@ -207,9 +213,9 @@ def get_scenes_from_cuts( def write_scene_list( output_csv_file: TextIO, - scene_list: Iterable[Tuple[FrameTimecode, FrameTimecode]], + scene_list: SceneList, include_cut_list: bool = True, - cut_list: Optional[Iterable[FrameTimecode]] = None, + cut_list: Optional[CutList] = None, ) -> None: """Writes the given list of scenes to an output file handle in CSV format. @@ -263,14 +269,14 @@ def write_scene_list( def write_scene_list_html( - output_html_filename, - scene_list, - cut_list=None, - css=None, - css_class="mytable", - image_filenames=None, - image_width=None, - image_height=None, + output_html_filename: str, + scene_list: SceneList, + cut_list: Optional[CutList] = None, + css: str = None, + css_class: str = "mytable", + image_filenames: Optional[Dict[int, List[str]]] = None, + image_width: Optional[int] = None, + image_height: Optional[int] = None, ): """Writes the given list of scenes to an output file handle in html format. @@ -287,6 +293,7 @@ def write_scene_list_html( image_width: Optional desired width of images in table in pixels image_height: Optional desired height of images in table in pixels """ + logger.info("Exporting scenes to html:\n %s:", output_html_filename) if not css: css = """ table.mytable { @@ -386,11 +393,9 @@ def write_scene_list_html( # -# TODO(v1.0): Refactor to take a SceneList object; consider moving this and save scene list -# to a better spot, or just move them to scene_list.py. -# +# TODO(v1.0): Consider moving all post-processing functionality into a separate submodule. def save_images( - scene_list: List[Tuple[FrameTimecode, FrameTimecode]], + scene_list: SceneList, video: VideoStream, num_images: int = 3, frame_margin: int = 1, @@ -474,7 +479,7 @@ def save_images( # Setup flags and init progress bar if available. completed = True - logger.info("Generating output images (%d per scene)...", num_images) + logger.info(f"Saving {num_images} images per scene to {output_dir}, format {image_extension}") progress_bar = None if show_progress: progress_bar = tqdm(total=len(scene_list) * num_images, unit="images", dynamic_ncols=True) @@ -537,6 +542,7 @@ def save_images( video.seek(image_timecode) frame_im = video.read() if frame_im is not None: + # TODO: Add extension to template. # TODO: Allow NUM to be a valid suffix in addition to NUMBER. file_path = "%s.%s" % ( filename_template.safe_substitute( @@ -740,7 +746,7 @@ def clear_detectors(self) -> None: def get_scene_list( self, base_timecode: Optional[FrameTimecode] = None, start_in_scene: bool = False - ) -> List[Tuple[FrameTimecode, FrameTimecode]]: + ) -> SceneList: """Return a list of tuples of start/end FrameTimecodes for each detected scene. Arguments: @@ -779,7 +785,7 @@ def _get_cutting_list(self) -> List[int]: # Ensure all cuts are unique by using a set to remove all duplicates. return [self._base_timecode + cut for cut in sorted(set(self._cutting_list))] - def _get_event_list(self) -> List[Tuple[FrameTimecode, FrameTimecode]]: + def _get_event_list(self) -> SceneList: if not self._event_list: return [] assert self._base_timecode is not None @@ -1065,8 +1071,10 @@ def _decode_thread( # def get_cut_list( - self, base_timecode: Optional[FrameTimecode] = None, show_warning: bool = True - ) -> List[FrameTimecode]: + self, + base_timecode: Optional[FrameTimecode] = None, + show_warning: bool = True, + ) -> CutList: """[DEPRECATED] Return a list of FrameTimecodes of the detected scene changes/cuts. Unlike get_scene_list, the cutting list returns a list of FrameTimecodes representing @@ -1092,9 +1100,7 @@ def get_cut_list( logger.error("`get_cut_list()` is deprecated and will be removed in a future release.") return self._get_cutting_list() - def get_event_list( - self, base_timecode: Optional[FrameTimecode] = None - ) -> List[Tuple[FrameTimecode, FrameTimecode]]: + def get_event_list(self, base_timecode: Optional[FrameTimecode] = None) -> SceneList: """[DEPRECATED] DO NOT USE. Get a list of start/end timecodes of sparse detection events.