diff --git a/.github/workflows/auto-format.yaml b/.github/workflows/auto-format.yaml index 9014e09..c06c683 100644 --- a/.github/workflows/auto-format.yaml +++ b/.github/workflows/auto-format.yaml @@ -23,7 +23,7 @@ jobs: with: src: "src" options: "--verbose --line-length 120" - - name: Check if any files were modikfied + - name: Check if any files were modified id: git-check run: echo ::set-output name=modified::$(if git diff-index --quiet HEAD --; then echo "false"; else echo "true"; fi) - name: Push changes if needed @@ -34,4 +34,3 @@ jobs: git remote set-url origin https://x-access-token:${{ secrets.GITHUB_TOKEN }}@github.com/${{ github.repository }} git commit -am "Automatically reformatting code with black and isort" git push - diff --git a/.github/workflows/tests.yaml b/.github/workflows/tests.yaml index d1e1e05..ff35f5f 100644 --- a/.github/workflows/tests.yaml +++ b/.github/workflows/tests.yaml @@ -15,11 +15,9 @@ jobs: fail-fast: false matrix: python-version: [ - # "3.7", - "3.8", - #"3.9", - "3.10", - #"3.11", + "3.9", + "3.10", + "3.11", ] steps: - name: get code diff --git a/README.md b/README.md index 9c3bf58..7cceb01 100644 --- a/README.md +++ b/README.md @@ -6,11 +6,24 @@ FrameGrab is an open-source Python library designed to make it easy to grab fram FrameGrab also provides basic motion detection functionality. FrameGrab requires Python 3.7 or higher. ## Table of Contents -- [Installation](#installation) -- [Usage](#usage) -- [Examples](#examples) -- [Contributing](#contributing) -- [License](#license) + +- [FrameGrab by Groundlight](#framegrab-by-groundlight) + - [A user-friendly library for grabbing images from cameras or streams](#a-user-friendly-library-for-grabbing-images-from-cameras-or-streams) + - [Table of Contents](#table-of-contents) + - [Installation](#installation) + - [Optional Dependencies](#optional-dependencies) + - [Usage](#usage) + - [Command line interface (CLI)](#command-line-interface-cli) + - [Frame Grabbing](#frame-grabbing) + - [Configurations](#configurations) + - [Autodiscovery](#autodiscovery) + - [RTSP Discovery](#rtsp-discovery) + - [Motion Detection](#motion-detection) + - [Examples](#examples) + - [Generic USB](#generic-usb) + - [YouTube Live](#youtube-live) + - [Contributing](#contributing) + - [License](#license) ## Installation @@ -21,11 +34,27 @@ pip install framegrab ``` ## Optional Dependencies -Certain camera types have additional dependencies that must be installed separately. If you don't intend to use these camera types, you don't need to install these extra packages. +Certain camera types have additional dependencies that must be installed separately. If you don't intend to use these camera types, you don't need to install these extra packages. - To use a Basler USB or GigE camera, you must separately install the `pypylon` package. - To use Intel RealSense cameras, you must install `pyrealsense2`. - To use a Raspberry Pi "CSI2" camera (connected with a ribbon cable), you must install the `picamera2` library. See install instructions at the [picamera2 github repository](https://github.com/raspberrypi/picamera2). +- To use a YouTube Live stream, you must install `streamlink`. + +We provide optional extras to install these dependencies. For example, to install the Basler camera dependencies, run: +``` +pip install framegrab[basler] +``` + +To install YouTube Live stream dependencies, run: +``` +pip install framegrab[youtube] +``` + +To install all optional dependencies, run: +``` +pip install framegrab[all] +``` ## Usage @@ -45,7 +74,7 @@ lists the sub-commands, including `autodiscover` and `preview`. Frame Grabbers are defined by a configuration dict which is usually stored as YAML. The configuration combines the camera type, the camera ID, and the camera options. The configuration is passed to the `FrameGrabber.create_grabber` method to create a grabber object. The grabber object can then be used to grab frames from the camera. -`config` can contain many details and settings about your camera, but only `input_type` is required. Available `input_type` options are: `generic_usb`, `rtsp`, `realsense`, `basler`, and `rpi_csi2`. +`config` can contain many details and settings about your camera, but only `input_type` is required. Available `input_type` options are: `generic_usb`, `rtsp`, `realsense`, `basler`, `rpi_csi2`, `hls`, and `youtube_live`. Here's an example of a single USB camera configured with several options: ```python @@ -92,13 +121,13 @@ When you are done with the camera, release the resource by running: grabber.release() ``` -You might have several cameras that you want to use in the same application. In this case, you can load the configurations from a yaml file and use `FrameGrabber.create_grabbers`. Note that currently only a single Raspberry Pi CSI2 camera is supported, but these cameras can be used in conjunction with other types of cameras. +You might have several cameras that you want to use in the same application. In this case, you can load the configurations from a yaml file and use `FrameGrabber.create_grabbers`. Note that currently only a single Raspberry Pi CSI2 camera is supported, but these cameras can be used in conjunction with other types of cameras. If you have multiple cameras of the same type plugged in, it's recommended that you include serial numbers in the configurations; this ensures that each configuration is paired with the correct camera. If you don't provide serial numbers in your configurations, configurations will be paired with cameras in a sequential manner. Below is a sample yaml file containing configurations for three different cameras. ```yaml -image_sources: +image_sources: - name: On Robot Arm input_type: basler id: @@ -139,28 +168,29 @@ for grabber in grabbers.values(): ``` ### Configurations The table below shows all available configurations and the cameras to which they apply. -| Configuration Name | Example | Generic USB | RTSP | Basler | Realsense | Raspberry Pi CSI2 | -|----------------------------|-----------------|------------|-----------|-----------|-----------|-----------| -| name | On Robot Arm | optional | optional | optional | optional | optional | -| input_type | generic_usb | required | required | required | required | required | -| id.serial_number | 23458234 | optional | - | optional | optional | - | -| id.rtsp_url | rtsp://… | - | required | - | - | - | -| options.resolution.height | 480 | optional | - | - | optional | - | -| options.resolution.width | 640 | optional | - | - | optional | - | -| options.zoom.digital | 1.3 | optional | optional | optional | optional | optional | -| options.crop.pixels.top | 100 | optional | optional | optional | optional | optional | -| options.crop.pixels.bottom | 400 | optional | optional | optional | optional | optional | -| options.crop.pixels.left | 100 | optional | optional | optional | optional | optional | -| options.crop.pixels.right | 400 | optional | optional | optional | optional | optional | -| options.crop.relative.top | 0.1 | optional | optional | optional | optional | optional | -| options.crop.relative.bottom | 0.9 | optional | optional | optional | optional | optional | -| options.crop.relative.left | 0.1 | optional | optional | optional | optional | optional | -| options.crop.relative.right | 0.9 | optional | optional | optional | optional | optional | -| options.depth.side_by_side | 1 | - | - | - | optional | - | -| options.num_90_deg_rotations | 2 | optional | optional | optional | optional | optional | -| options.keep_connection_open | True | - | optional | - | - | - | -| options.max_fps | 30 | - | optional | - | - | - | - +| Configuration Name | Example | Generic USB | RTSP | Basler | Realsense | Raspberry Pi CSI2 | HLS | YouTube Live | +|----------------------------|-----------------|------------|-----------|-----------|-----------|-----------|-----------|-----------| +| name | On Robot Arm | optional | optional | optional | optional | optional | optional | optional | +| input_type | generic_usb | required | required | required | required | required | required | required | +| id.serial_number | 23458234 | optional | - | optional | optional | - | - | - | +| id.rtsp_url | rtsp://… | - | required | - | - | - | - | - | +| id.hls_url | https://.../*.m3u8 | - | - | - | - | - | required | - | +| id.youtube_url | https://www.youtube.com/watch?v=... | - | - | - | - | - | - | required | +| options.resolution.height | 480 | optional | - | - | optional | - | - | - | +| options.resolution.width | 640 | optional | - | - | optional | - | - | - | +| options.zoom.digital | 1.3 | optional | optional | optional | optional | optional | optional | optional | +| options.crop.pixels.top | 100 | optional | optional | optional | optional | optional | optional | optional | +| options.crop.pixels.bottom | 400 | optional | optional | optional | optional | optional | optional | optional | +| options.crop.pixels.left | 100 | optional | optional | optional | optional | optional | optional | optional | +| options.crop.pixels.right | 400 | optional | optional | optional | optional | optional | optional | optional | +| options.crop.relative.top | 0.1 | optional | optional | optional | optional | optional | optional | optional | +| options.crop.relative.bottom | 0.9 | optional | optional | optional | optional | optional | optional | optional | +| options.crop.relative.left | 0.1 | optional | optional | optional | optional | optional | optional | optional | +| options.crop.relative.right | 0.9 | optional | optional | optional | optional | optional | optional | optional | +| options.depth.side_by_side | 1 | - | - | - | optional | - | - | - | +| options.num_90_deg_rotations | 2 | optional | optional | optional | optional | optional | optional | optional | +| options.keep_connection_open | True | - | optional | - | - | - | optional | optional | +| options.max_fps | 30 | - | optional | - | - | - | - | - | @@ -185,7 +215,7 @@ RTSP cameras with support for ONVIF can be discovered on your local network in t ```python from framegrab import RTSPDiscovery, ONVIFDeviceInfo - + devices = RTSPDiscovery.discover_onvif_devices() ``` @@ -194,7 +224,7 @@ The `discover_onvif_devices()` will provide a list of devices that it finds in t - off: No discovery. - ip_only: Only discover the IP address of the camera. - light: Only try first two usernames and passwords ("admin:admin" and no username/password). -- complete_fast: Try the entire DEFAULT_CREDENTIALS without delays in between. +- complete_fast: Try the entire DEFAULT_CREDENTIALS without delays in between. - complete_slow: Try the entire DEFAULT_CREDENTIALS with a delay of 1 seconds in between. @@ -240,6 +270,7 @@ if m.motion_detected(frame): ## Examples +### Generic USB Here's an example of using the FrameGrab library to continuously capture frames and detect motion from a video stream: ```python @@ -263,6 +294,34 @@ while True: print("Motion detected!") ``` +### YouTube Live +Here's an example of using FrameGrab to capture frames from a YouTube Live stream: + +```python +from framegrab import FrameGrabber +import cv2 + +config = { + 'input_type': 'youtube_live', + 'id': { + 'youtube_url': 'https://www.youtube.com/watch?v=your_video_id' + } +} + +grabber = FrameGrabber.create_grabber(config) + +frame = grabber.grab() +if frame is None: + raise Exception("No frame captured") + +# Process the frame as needed +# For example, display it using cv2.imshow() +# For example, save it to a file +cv2.imwrite('youtube_frame.jpg', frame) + +grabber.release() +``` + ## Contributing We welcome contributions to FrameGrab! If you would like to contribute, please follow these steps: @@ -275,5 +334,3 @@ We welcome contributions to FrameGrab! If you would like to contribute, please f ## License FrameGrab is released under the MIT License. For more information, please refer to the [LICENSE.txt](https://github.com/groundlight/framegrab/blob/main/LICENSE.txt) file. - - diff --git a/pyproject.toml b/pyproject.toml index 36ca859..3c1d9c5 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "framegrab" -version = "0.7.0" +version = "0.8.0" description = "Easily grab frames from cameras or streams" authors = ["Groundlight "] license = "MIT" @@ -9,19 +9,30 @@ homepage = "https://www.groundlight.ai/" repository = "https://github.com/groundlight/framegrab" [tool.poetry.dependencies] -python = "^3.7" +python = "^3.9" opencv-python = "^4.4.0.46" -pyyaml = "^6.0.1" +pyyaml = "^6.0.2" imgcat = "^0.5.0" click = "^8.1.6" ascii-magic = "^2.3.0" wsdiscovery = "^2.0.0" onvif-zeep = "^0.2.12" -pydantic = "^2.5.3" +pydantic = "^2.9.2" +pypylon = { version = ">=3.0.0", optional = true } +pyrealsense2 = { version = "^2.55.1.6486", optional = true } +picamera2 = { version = ">=0.3.21", optional = true } +streamlink = { version = "^7.0.0", optional = true } + +[tool.poetry.extras] +basler = ["pypylon"] +realsense = ["pyrealsense2"] +raspberrypi = ["picamera2"] +youtube = ["streamlink"] +all = ["pypylon", "pyrealsense2", "picamera2", "streamlink"] [tool.poetry.group.dev.dependencies] -black = "^23.3.0" -pytest = "^7.0.1" +black = "^24.10.0" +pytest = "^8.3.3" [build-system] requires = ["poetry-core"] diff --git a/sample_scripts/sample_config.yaml b/sample_scripts/sample_config.yaml index f67c664..0c0b8d6 100644 --- a/sample_scripts/sample_config.yaml +++ b/sample_scripts/sample_config.yaml @@ -1,12 +1,12 @@ image_sources: - name: Front Door input_type: generic_usb - options: + options: zoom: digital: 1.5 - name: Conference Room input_type: rtsp - id: + id: rtsp_url: rtsp://admin:password@10.0.0.0/cam/realmonitor?channel=1&subtype=0 options: crop: @@ -17,8 +17,12 @@ image_sources: right: .9 - name: Workshop input_type: basler - id: + id: serial_number: 12345678 options: basler: ExposureTime: 60000 + - name: NamibiaCam Live stream at the Okaukuejo waterhole in Etosha National Park + input_type: youtube_live + id: + youtube_url: https://www.youtube.com/watch?v=DAmFZj1y_a0 diff --git a/src/framegrab/cli/autodiscover.py b/src/framegrab/cli/autodiscover.py index b81a2a5..90342e6 100644 --- a/src/framegrab/cli/autodiscover.py +++ b/src/framegrab/cli/autodiscover.py @@ -2,7 +2,6 @@ import click import yaml -from imgcat import imgcat from framegrab import FrameGrabber from framegrab.cli.clitools import ( @@ -10,7 +9,6 @@ PREVIEW_RTSP_COMMAND_CHOICES, preview_image, ) -from framegrab.rtsp_discovery import AutodiscoverMode @click.command() @@ -47,7 +45,10 @@ def autodiscover(preview: str, rtsp_discover_mode: str = "off"): click.echo(f"Failed to grab sample frame from {camera_name}.", err=True) continue - click.echo(f"Grabbed sample frame from {camera_name} with shape {frame.shape}", err=True) + click.echo( + f"Grabbed sample frame from {camera_name} with shape {frame.shape}", + err=True, + ) click.echo(grabber.config, err=True) preview_image(frame, camera_name, preview) diff --git a/src/framegrab/cli/preview.py b/src/framegrab/cli/preview.py index 116bf1a..98bd465 100644 --- a/src/framegrab/cli/preview.py +++ b/src/framegrab/cli/preview.py @@ -1,12 +1,7 @@ -import shutil import traceback -import ascii_magic import click -import cv2 import yaml -from imgcat import imgcat -from PIL import Image from framegrab import FrameGrabber, preview_image diff --git a/src/framegrab/grabber.py b/src/framegrab/grabber.py index ae5b8e4..e95fe04 100644 --- a/src/framegrab/grabber.py +++ b/src/framegrab/grabber.py @@ -17,14 +17,14 @@ from .rtsp_discovery import AutodiscoverMode, RTSPDiscovery from .unavailable_module import UnavailableModule -logger = logging.getLogger(__name__) - -# Optional imports +# -- Optional imports -- +# Only used for Basler cameras, not required otherwise try: from pypylon import pylon except ImportError as e: pylon = UnavailableModule(e) +# Only used for RealSense cameras, not required otherwise try: from pyrealsense2 import pyrealsense2 as rs except ImportError as e: @@ -36,6 +36,14 @@ except ImportError as e: Picamera2 = UnavailableModule(e) +# Only used for Youtube Live streams, not required otherwise +try: + import streamlink +except ImportError as e: + streamlink = UnavailableModule(e) + +logger = logging.getLogger(__name__) + OPERATING_SYSTEM = platform.system() DIGITAL_ZOOM_MAX = 4 NOISE = np.random.randint(0, 256, (480, 640, 3), dtype=np.uint8) # in case a camera can't get a frame @@ -49,6 +57,8 @@ class InputTypes: REALSENSE = "realsense" BASLER = "basler" RPI_CSI2 = "rpi_csi2" + HLS = "hls" + YOUTUBE_LIVE = "youtube_live" MOCK = "mock" def get_options() -> list: @@ -137,12 +147,16 @@ def create_grabbers(configs: List[dict], warmup_delay: float = 1.0) -> Dict[str, @staticmethod def from_yaml(filename: Optional[str] = None, yaml_str: Optional[str] = None) -> List["FrameGrabber"]: - """Creates multiple FrameGrab objects based on a YAML file or YAML string. - Either filename or yaml_str must be provided, but not both. + """Creates multiple FrameGrabber objects based on a YAML file or YAML string. + + Args: + filename (str, optional): The filename of the YAML file to load. + Either filename or yaml_str must be provided, but not both. + yaml_str (str, optional): A YAML string to parse. + Either filename or yaml_str must be provided, but not both. - :param filename: The filename of the YAML file to load. - :param yaml_str: A YAML string to parse. - :return: A dictionary where the keys are the camera names, and the values are FrameGrabber objects. + Returns: + List[FrameGrabber]: A list of FrameGrabber objects created from the YAML configuration. """ if filename is None and yaml_str is None: raise ValueError("Either filename or yaml_str must be provided.") @@ -257,6 +271,10 @@ def create_grabber(config: dict, autogenerate_name: bool = True, warmup_delay: f grabber = RealSenseFrameGrabber(config) elif input_type == InputTypes.RPI_CSI2: grabber = RaspberryPiCSI2FrameGrabber(config) + elif input_type == InputTypes.HLS: + grabber = HttpLiveStreamingFrameGrabber(config) + elif input_type == InputTypes.YOUTUBE_LIVE: + grabber = YouTubeLiveFrameGrabber(config) elif input_type == InputTypes.MOCK: grabber = MockFrameGrabber(config) else: @@ -282,7 +300,10 @@ def create_grabber(config: dict, autogenerate_name: bool = True, warmup_delay: f return grabber @staticmethod - def autodiscover(warmup_delay: float = 1.0, rtsp_discover_mode: AutodiscoverMode = AutodiscoverMode.off) -> dict: + def autodiscover( + warmup_delay: float = 1.0, + rtsp_discover_mode: AutodiscoverMode = AutodiscoverMode.off, + ) -> dict: """Autodiscovers cameras and returns a dictionary of FrameGrabber objects warmup_delay (float, optional): The number of seconds to wait after creating the grabbers. USB @@ -750,9 +771,11 @@ def _find_cameras() -> list: class RTSPFrameGrabber(FrameGrabber): - """Handles RTSP streams. Can operate in two modes based on the `keep_connection_open` configuration: - 1. If `true`, keeps the connection open for low-latency frame grabbing, but consumes more CPU. (default) - 2. If `false`, opens the connection only when needed, which is slower but conserves resources. + """Handles RTSP streams. + + Can operate in two modes based on the `keep_connection_open` configuration: + 1. If `true`, keeps the connection open for low-latency frame grabbing, but consumes more CPU. (default) + 2. If `false`, opens the connection only when needed, which is slower but conserves resources. """ def __init__(self, config: dict): @@ -1097,6 +1120,109 @@ def release(self) -> None: self.camera.close() +class HttpLiveStreamingFrameGrabber(FrameGrabber): + """Handles Http Live Streaming (HLS) + + Supports two modes: + 1. Keep connection open (default): Opens the connection once and keeps it open for high-fps frame grabbing. + 2. Open connection on every frame: Opens and closes the connection on every captured frame, which conserves + both CPU and network bandwidth but has higher latency. In practice, roughly 1FPS is achievable with this strategy. + """ + + def __init__(self, config: dict): + hls_url = config.get("id", {}).get("hls_url") + if not hls_url: + camera_name = config.get("name", "Unnamed HLS Stream") + raise ValueError( + f"No HLS URL provided for {camera_name}. Please add an hls_url attribute to the config under id." + ) + + self.type = "HLS" + self.config = config + self.hls_url = self.config["id"]["hls_url"] + + self.lock = Lock() + self.keep_connection_open = config.get("options", {}).get("keep_connection_open", True) + + if self.keep_connection_open: + self._open_connection() + + def _apply_camera_specific_options(self, options: dict) -> None: + if options.get("resolution"): + camera_name = self.config.get("name", f"Unnamed {self.type} Stream") + raise ValueError( + f"Resolution was set for {camera_name}, but resolution cannot be set for {self.type} streams." + ) + + def _open_connection(self): + self.capture = cv2.VideoCapture(self.hls_url) + if not self.capture.isOpened(): + raise ValueError(f"Could not open {self.type} stream: {self.hls_url}. Is the HLS URL correct?") + logger.warning(f"Initialized video capture with backend={self.capture.getBackendName()}") + + def _close_connection(self): + logger.warning(f"Closing connection to {self.type} stream") + with self.lock: + if self.capture is not None: + self.capture.release() + + def _grab_implementation(self) -> np.ndarray: + if not self.keep_connection_open: + self._open_connection() + try: + return self._grab_open() + finally: + self._close_connection() + else: + return self._grab_open() + + def _grab_open(self) -> np.ndarray: + with self.lock: + ret, frame = self.capture.read() + if not ret: + logger.error(f"Could not read frame from {self.capture}") + return frame + + def release(self) -> None: + if self.keep_connection_open: + self._close_connection() + + +class YouTubeLiveFrameGrabber(HttpLiveStreamingFrameGrabber): + """Grabs the most recent frame from a YouTube Live stream (which are HLS streams under the hood) + + Supports two modes: + 1. Keep connection open (default): Opens the connection once and keeps it open for high-fps frame grabbing. + 2. Open connection on every frame: Opens and closes the connection on every captured frame, which conserves + both CPU and network bandwidth but has higher latency. In practice, roughly 1FPS is achievable with this strategy. + """ + + def __init__(self, config: dict): + youtube_url = config.get("id", {}).get("youtube_url") + if not youtube_url: + camera_name = config.get("name", "Unnamed YouTube Live Stream") + raise ValueError( + f"No YouTube Live URL provided for {camera_name}. Please add an youtube_url attribute to the config under id." + ) + + self.type = "YouTube Live" + self.hls_url = self._extract_hls_url(youtube_url) + self.config = config + + self.lock = Lock() + self.keep_connection_open = config.get("options", {}).get("keep_connection_open", True) + + if self.keep_connection_open: + self._open_connection() + + def _extract_hls_url(self, youtube_url: str) -> str: + """Extracts the HLS URL from a YouTube Live URL.""" + available_streams = streamlink.streams(youtube_url) + if "best" not in available_streams: + raise ValueError(f"No available HLS stream for {youtube_url=}\n{available_streams=}") + return available_streams["best"].url + + class MockFrameGrabber(FrameGrabber): """A mock camera class for testing purposes""" diff --git a/src/framegrab/motion.py b/src/framegrab/motion.py index b72deb5..7bbbf3e 100644 --- a/src/framegrab/motion.py +++ b/src/framegrab/motion.py @@ -1,4 +1,5 @@ import logging +from typing import Optional import numpy as np @@ -6,22 +7,40 @@ class MotionDetector: - # Simple motion detector using the three frame differencing - # commonly attributed to Collins, et. al A system for video surveillance and monitoring. Technical report, 2000 - # Defaults to 1% pixel difference threshold (good for many applications) + """Motion detector using three-frame differencing algorithm. - def __init__(self, pct_threshold: float = 1, val_threshold: int = 50) -> bool: - """ - :param val_threshold: The minimum brightness change for a pixel for it to be considered changed - :param pct_threshold: Percent of pixels needed to change before motion is detected - """ + This implements the motion detection algorithm described in: + Collins et al., "A System for Video Surveillance and Monitoring", + Carnegie Mellon University, Pittsburgh, PA, Technical Report CMU-RI-TR-00-12, May 2000. + + The algorithm compares each new frame against the previous two frames to detect motion. + Motion is detected when a sufficient percentage of pixels show significant brightness changes + across consecutive frames. + + Args: + pct_threshold (float, optional): Percentage of pixels that must change for motion to be detected. + Defaults to 1.0 (1% of pixels). + val_threshold (int, optional): Minimum brightness change required for a pixel to be considered changed. + Defaults to 50. + """ + + def __init__(self, pct_threshold: float = 1, val_threshold: int = 50) -> None: self.unused = True self.pixel_val_threshold = val_threshold self.pixel_pct_threshold = pct_threshold self.log_pixel_percent = True - def pixel_threshold(self, img: np.ndarray, threshold_val: float = None) -> bool: - """Returns true if more then pixel_pct_threshold% of pixels have value greater than pixel_val_threshold""" + def pixel_threshold(self, img: np.ndarray, threshold_val: Optional[float] = None) -> bool: + """Check if enough pixels exceed the brightness threshold. + + Args: + img: Input image array + threshold_val: Optional override for the brightness threshold value. + If None, uses self.pixel_val_threshold. + + Returns: + bool: True if percentage of changed pixels exceeds pct_threshold + """ if threshold_val is None: threshold_val = self.pixel_val_threshold total_pixels = np.prod(img.shape) @@ -36,6 +55,17 @@ def pixel_threshold(self, img: np.ndarray, threshold_val: float = None) -> bool: return False def motion_detected(self, new_img: np.ndarray) -> bool: + """Process a new frame and detect if motion occurred. + + Uses three-frame differencing - compares the new frame against the previous + two frames to detect consistent motion. + + Args: + new_img: New frame to analyze for motion + + Returns: + bool: True if motion was detected + """ if self.unused: self.base_img = new_img self.base2 = self.base_img diff --git a/test/test_framegrab_with_hls.py b/test/test_framegrab_with_hls.py new file mode 100644 index 0000000..437365d --- /dev/null +++ b/test/test_framegrab_with_hls.py @@ -0,0 +1,89 @@ +import unittest +from unittest.mock import MagicMock, patch + +import numpy as np + +from framegrab.grabber import HttpLiveStreamingFrameGrabber + + +class TestHttpLiveStreamingFrameGrabber(unittest.TestCase): + def setUp(self): + """Common setup for all HLS tests""" + self.mock_frame = np.zeros((480, 640, 3), dtype=np.uint8) + self.base_config = { + "input_type": "hls", + "id": {"hls_url": "http://example.com/stream.m3u8"}, + "name": "test_stream", + } + + @patch("cv2.VideoCapture") + def test_grab_frame_success(self, mock_cv2): + """Test that the grabber initializes correctly with a valid config""" + # Setup mock + mock_capture = MagicMock() + mock_capture.isOpened.return_value = True + mock_capture.read.return_value = (True, self.mock_frame) + mock_cv2.return_value = mock_capture + + grabber = HttpLiveStreamingFrameGrabber(self.base_config) + self.assertEqual(grabber.hls_url, "http://example.com/stream.m3u8") + + frame = grabber.grab() + mock_cv2.assert_called_once_with("http://example.com/stream.m3u8") + mock_capture.read.assert_called_once() + self.assertEqual(frame.shape, (480, 640, 3)) + + grabber.release() + mock_capture.release.assert_called_once() + + def test_init_without_hls_url(self): + """Test that initialization fails without an HLS URL""" + config = {"input_type": "hls", "name": "test_stream"} + + with self.assertRaises(ValueError): + HttpLiveStreamingFrameGrabber(config) + + @patch("cv2.VideoCapture") + def test_grab_with_failed_connection(self, mock_cv2): + """Test that initialization fails when the stream cannot be opened""" + # Setup mock + mock_capture = MagicMock() + mock_capture.isOpened.return_value = False + mock_cv2.return_value = mock_capture + + with self.assertRaises(ValueError) as cm: + grabber = HttpLiveStreamingFrameGrabber(self.base_config) + grabber.grab() + + self.assertIn("Could not open HLS stream", str(cm.exception)) + + @patch("cv2.VideoCapture") + def test_grab_frame_failure(self, mock_cv2): + """Test frame grab failure""" + # Setup mock + mock_capture = MagicMock() + mock_capture.isOpened.return_value = True + mock_capture.read.return_value = (False, None) + mock_capture.retrieve.return_value = (False, None) + mock_cv2.return_value = mock_capture + + grabber = HttpLiveStreamingFrameGrabber(self.base_config) + + with self.assertRaises(Exception): + grabber.grab() + + @patch("cv2.VideoCapture") + def test_invalid_resolution_option(self, mock_cv2): + """Test that setting resolution raises an error""" + # Setup mock + mock_capture = MagicMock() + mock_capture.isOpened.return_value = True + mock_cv2.return_value = mock_capture + + config = self.base_config.copy() + config["options"] = {"resolution": {"width": 1920, "height": 1080}} + + grabber = HttpLiveStreamingFrameGrabber(config) + + with self.assertRaises(ValueError): + grabber.apply_options(config["options"]) diff --git a/test/test_framegrab_with_mock_camera.py b/test/test_framegrab_with_mock_camera.py index 73a9001..58d7384 100644 --- a/test/test_framegrab_with_mock_camera.py +++ b/test/test_framegrab_with_mock_camera.py @@ -4,29 +4,30 @@ import os import unittest + from framegrab.grabber import FrameGrabber, RTSPFrameGrabber + class TestFrameGrabWithMockCamera(unittest.TestCase): def test_crop_pixels(self): - """Grab a frame, crop a frame by pixels, and make sure the shape is correct. - """ + """Grab a frame, crop a frame by pixels, and make sure the shape is correct.""" config = { - 'name': 'mock_camera', - 'input_type': 'mock', - 'options': { - 'resolution': { - 'width': 640, - 'height': 480, + "name": "mock_camera", + "input_type": "mock", + "options": { + "resolution": { + "width": 640, + "height": 480, }, - 'crop': { - 'pixels': { - 'top': 40, - 'bottom': 440, - 'left': 120, - 'right': 520, + "crop": { + "pixels": { + "top": 40, + "bottom": 440, + "left": 120, + "right": 520, } - } - } + }, + }, } grabber = FrameGrabber.create_grabber(config) @@ -37,25 +38,24 @@ def test_crop_pixels(self): assert frame.shape == (400, 400, 3) def test_crop_relative(self): - """Grab a frame, crop a frame in an relative manner (0 to 1), and make sure the shape is correct. - """ + """Grab a frame, crop a frame in an relative manner (0 to 1), and make sure the shape is correct.""" config = { - 'name': 'mock_camera', - 'input_type': 'mock', - 'options': { - 'resolution': { - 'width': 640, - 'height': 480, + "name": "mock_camera", + "input_type": "mock", + "options": { + "resolution": { + "width": 640, + "height": 480, }, - 'crop': { - 'relative': { - 'top': .1, - 'bottom': .9, - 'left': .1, - 'right': .9, + "crop": { + "relative": { + "top": 0.1, + "bottom": 0.9, + "left": 0.1, + "right": 0.9, } - } - } + }, + }, } grabber = FrameGrabber.create_grabber(config) @@ -66,20 +66,19 @@ def test_crop_relative(self): assert frame.shape == (384, 512, 3) def test_zoom(self): - """Grab a frame, zoom a frame, and make sure the shape is correct. - """ + """Grab a frame, zoom a frame, and make sure the shape is correct.""" config = { - 'name': 'mock_camera', - 'input_type': 'mock', - 'options': { - 'resolution': { - 'width': 640, - 'height': 480, + "name": "mock_camera", + "input_type": "mock", + "options": { + "resolution": { + "width": 640, + "height": 480, }, - 'zoom': { - 'digital': 2, - } - } + "zoom": { + "digital": 2, + }, + }, } grabber = FrameGrabber.create_grabber(config) @@ -90,49 +89,45 @@ def test_zoom(self): assert frame.shape == (240, 320, 3) def test_attempt_create_grabber_with_invalid_input_type(self): - config = { - 'input_type': 'some_invalid_camera_type' - } + config = {"input_type": "some_invalid_camera_type"} with self.assertRaises(ValueError): FrameGrabber.create_grabber(config) def test_create_grabber_without_name(self): - config = { - 'input_type': 'mock' - } + config = {"input_type": "mock"} grabber = FrameGrabber.create_grabber(config) # Check that some camera name was added - assert len(grabber.config['name']) > 2 + assert len(grabber.config["name"]) > 2 grabber.release() def test_create_grabber_with_name(self): - user_provided_name = 'my_camera' + user_provided_name = "my_camera" config = { - 'name': user_provided_name, - 'input_type': 'mock', + "name": user_provided_name, + "input_type": "mock", } grabber = FrameGrabber.create_grabber(config) - assert grabber.config['name'] == user_provided_name + assert grabber.config["name"] == user_provided_name grabber.release() def test_create_grabbers_without_names(self): configs = [ - {'input_type': 'mock'}, - {'input_type': 'mock'}, - {'input_type': 'mock'}, + {"input_type": "mock"}, + {"input_type": "mock"}, + {"input_type": "mock"}, ] grabbers = FrameGrabber.create_grabbers(configs) - grabber_names = set([grabber.config['name'] for grabber in grabbers.values()]) + grabber_names = set([grabber.config["name"] for grabber in grabbers.values()]) # Make sure all the grabbers have unique names assert len(configs) == len(grabber_names) @@ -147,16 +142,16 @@ def test_attempt_create_more_grabbers_than_exist(self): # Connect to 3 grabbers, this should be fine configs = [ - {'input_type': 'mock'}, - {'input_type': 'mock'}, - {'input_type': 'mock'}, + {"input_type": "mock"}, + {"input_type": "mock"}, + {"input_type": "mock"}, ] grabbers = FrameGrabber.create_grabbers(configs) # Try to connect to another grabber, this should raise an exception because there are only 3 mock cameras available try: - FrameGrabber.create_grabber({'input_type': 'mock'}) + FrameGrabber.create_grabber({"input_type": "mock"}) self.fail() except ValueError: pass @@ -167,9 +162,9 @@ def test_attempt_create_more_grabbers_than_exist(self): def test_attempt_create_grabbers_with_duplicate_names(self): configs = [ - {'name': 'camera1', 'input_type': 'mock'}, - {'name': 'camera2', 'input_type': 'mock'}, - {'name': 'camera1', 'input_type': 'mock'}, + {"name": "camera1", "input_type": "mock"}, + {"name": "camera2", "input_type": "mock"}, + {"name": "camera1", "input_type": "mock"}, ] # Should raise an exception because camera1 is duplicated @@ -177,39 +172,38 @@ def test_attempt_create_grabbers_with_duplicate_names(self): FrameGrabber.create_grabbers(configs) def test_substitute_rtsp_url(self): - """Test that the RTSP password is substituted correctly. - """ - os.environ['RTSP_PASSWORD_1'] = 'password1' + """Test that the RTSP password is substituted correctly.""" + os.environ["RTSP_PASSWORD_1"] = "password1" config = { - 'input_type': 'rtsp', - 'id': {'rtsp_url': "rtsp://admin:{{RTSP_PASSWORD_1}}@10.0.0.1"}, + "input_type": "rtsp", + "id": {"rtsp_url": "rtsp://admin:{{RTSP_PASSWORD_1}}@10.0.0.1"}, } substituted_config = RTSPFrameGrabber._substitute_rtsp_password(config) - substituted_rtsp_url = substituted_config['id']['rtsp_url'] + substituted_rtsp_url = substituted_config["id"]["rtsp_url"] assert substituted_rtsp_url == "rtsp://admin:password1@10.0.0.1" def test_substitute_rtsp_url_password_not_set(self): - """Test that an exception is raised if the user adds a placeholder but neglects to set the environment variable. - """ + """Test that an exception is raised if the user adds a placeholder but neglects to set the environment variable.""" config = { - 'input_type': 'rtsp', - 'id': {'rtsp_url': "rtsp://admin:{{SOME_NONEXISTENT_ENV_VARIABLE}}@10.0.0.1"}, + "input_type": "rtsp", + "id": { + "rtsp_url": "rtsp://admin:{{SOME_NONEXISTENT_ENV_VARIABLE}}@10.0.0.1" + }, } - + with self.assertRaises(ValueError): RTSPFrameGrabber._substitute_rtsp_password(config) def test_substitute_rtsp_url_without_placeholder(self): - """Users should be able to use RTSP urls without a password placeholder. In this case, the config should be returned unchanged. - """ + """Users should be able to use RTSP urls without a password placeholder. In this case, the config should be returned unchanged.""" config = { - 'input_type': 'rtsp', - 'id': {'rtsp_url': "rtsp://admin:password@10.0.0.1"}, + "input_type": "rtsp", + "id": {"rtsp_url": "rtsp://admin:password@10.0.0.1"}, } - + new_config = RTSPFrameGrabber._substitute_rtsp_password(config) - assert new_config == config + assert new_config == config