Skip to content

Commit

Permalink
Fixing RTSP Grabber Exception Handling (#36)
Browse files Browse the repository at this point in the history
* fixing exception handling and adding tests

* Automatically reformatting code with black and isort

* adjusting readme

---------

Co-authored-by: Tim Huff <[email protected]>
Co-authored-by: Auto-format Bot <[email protected]>
  • Loading branch information
3 people authored Mar 22, 2024
1 parent 62002b7 commit c2b9e9b
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 156 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ from framegrab import FrameGrabber, MotionDetector
motion_threshold = 1.0
config = {
'input_type': 'webcam',
'input_type': 'generic_usb',
}
grabber = FrameGrabber.create_grabber(config)
m = MotionDetector(pct_threshold=motion_threshold)
Expand All @@ -243,6 +243,6 @@ We welcome contributions to FrameGrab! If you would like to contribute, please f

## License

FrameGrab is released under the MIT License. For more information, please refer to the [LICENSE.txt](LICENSE.txt) file.
FrameGrab is released under the MIT License. For more information, please refer to the [LICENSE.txt](https://github.com/groundlight/framegrab/blob/main/LICENSE.txt) file.


169 changes: 16 additions & 153 deletions src/framegrab/grabber.py
Original file line number Diff line number Diff line change
Expand Up @@ -639,13 +639,16 @@ class RTSPFrameGrabber(FrameGrabber):
"""

def __init__(self, config: dict):
self.config = self._substitute_rtsp_password(config)
self.stream = self.config.get("id", {}).get("rtsp_url")
if not self.stream:
camera_name = self.config.get("name", "Unnamed RTSP Stream")
rtsp_url = config.get("id", {}).get("rtsp_url")
if not rtsp_url:
camera_name = config.get("name", "Unnamed RTSP Stream")
raise ValueError(
f"No RTSP URL provided for {camera_name}. Please add an rtsp_url attribute to the config under id."
)

self.config = RTSPFrameGrabber._substitute_rtsp_password(config)
self.rtsp_url = self.config["id"]["rtsp_url"]

self.lock = Lock()
self.run = True
self.keep_connection_open = config.get("options", {}).get("keep_connection_open", True)
Expand All @@ -654,7 +657,8 @@ def __init__(self, config: dict):
self._open_connection()
self._init_drain_thread()

def _substitute_rtsp_password(self, config: dict) -> dict:
@staticmethod
def _substitute_rtsp_password(config: dict) -> dict:
"""
Substitutes the password placeholder in the rtsp_url with the actual password
from an environment variable.
Expand All @@ -670,8 +674,10 @@ def _substitute_rtsp_password(self, config: dict) -> dict:
rtsp_url = config.get("id", {}).get("rtsp_url", "")
matches = re.findall(pattern, rtsp_url)

if len(matches) != 1:
raise ValueError("RTSP URL should contain exactly one placeholder for the password.")
if len(matches) == 0:
return config # make no change to config if no password placeholder is found
elif len(matches) > 1:
raise ValueError("RTSP URL should contain no more than one placeholder for the password.")

match = matches[0]
password_env_var = os.environ.get(match)
Expand All @@ -690,10 +696,10 @@ def _apply_camera_specific_options(self, options: dict) -> None:
raise ValueError(f"Resolution was set for {camera_name}, but resolution cannot be set for RTSP streams.")

def _open_connection(self):
self.capture = cv2.VideoCapture(self.stream)
self.capture = cv2.VideoCapture(self.rtsp_url)
if not self.capture.isOpened():
raise ValueError(
f"Could not open RTSP stream: {self.stream}. Is the RTSP URL correct? Is the camera connected to the network?"
f"Could not open RTSP stream: {self.rtsp_url}. Is the RTSP URL correct? Is the camera connected to the network?"
)
logger.debug(f"Initialized video capture with backend={self.capture.getBackendName()}")

Expand Down Expand Up @@ -986,147 +992,4 @@ def release(self) -> None:
MockFrameGrabber.serial_numbers_in_use.remove(self.config["id"]["serial_number"])

def _apply_camera_specific_options(self, options: dict) -> None:
pass # no action necessary for mock camera


# # TODO update this class to work with the latest updates
# import os
# import fnmatch
# import random
# class DirectoryFrameGrabber(FrameGrabber):
# def __init__(self, stream=None, fps_target=0):
# """stream must be an file mask"""
# try:
# self.filename_list = []
# for filename in os.listdir():
# if fnmatch.fnmatch(filename, stream):
# self.filename_list.append(filename)
# logger.debug(f"found {len(self.filename_list)} files matching stream: {stream}")
# random.shuffle(self.filename_list)
# except Exception as e:
# logger.error(f"could not initialize DirectoryFrameGrabber: stream: {stream} filename is invalid or read error")
# raise e
# if len(self.filename_list) == 0:
# logger.warning(f"no files found matching stream: {stream}")

# def grab(self):
# if len(self.filename_list) == 0:
# raise RuntimeWarning(f"could not read frame from {self.capture}. possible end of file.")

# start = time.time()
# frame = cv2.imread(self.filename_list[0], cv2.IMREAD_GRAYSCALE)
# self.filename_list.pop(0)
# logger.debug(f"read the frame in {1000*(time.time()-start):.1f}ms")

# return frame

# # TODO update this class to work with the latest updates
# class FileStreamFrameGrabber(FrameGrabber):
# def __init__(self, stream=None, fps_target=0):
# """stream must be an filename"""
# try:
# self.capture = cv2.VideoCapture(stream)
# logger.debug(f"initialized video capture with backend={self.capture.getBackendName()}")
# ret, frame = self.capture.read()
# self.fps_source = round(self.capture.get(cv2.CAP_PROP_FPS), 2)
# self.fps_target = fps_target
# logger.debug(f"source FPS : {self.fps_source} / target FPS : {self.fps_target}")
# self.remainder = 0.0
# except Exception as e:
# logger.error(f"could not initialize DeviceFrameGrabber: stream: {stream} filename is invalid or read error")
# raise e

# def _read(self) -> np.ndarray:
# """decimates stream to self.fps_target, 0 fps to use full original stream.
# consistent with existing behavior based on VideoCapture.read()
# which may return None when it cannot read a frame.
# """
# start = time.time()

# if self.fps_target > 0 and self.fps_target < self.fps_source:
# drop_frames = (self.fps_source / self.fps_target) - 1 + self.remainder
# for i in range(round(drop_frames)):
# ret, frame = self.capture.read()
# self.remainder = round(drop_frames - round(drop_frames), 2)
# logger.info(
# f"dropped {round(drop_frames)} frames to meet {self.fps_target} FPS target from {self.fps_source} FPS source (off by {self.remainder} frames)"
# )
# else:
# logger.debug(f"frame dropping disabled for {self.fps_target} FPS target from {self.fps_source} FPS source")

# ret, frame = self.capture.read()
# if not ret:
# raise RuntimeWarning(f"could not read frame from {self.capture}. possible end of file.")
# now = time.time()
# logger.debug(f"read the frame in {1000*(now-start):.1f}ms")
# return frame

# # TODO update this class to work with the latest updates'
# import pafy
# class YouTubeFrameGrabber(FrameGrabber):
# """grabs the most recent frame from an YouTube stream. To avoid extraneous bandwidth
# this class tears down the stream between each frame grab. maximum framerate
# is likely around 0.5fps in most cases.
# """

# def __init__(self, stream=None):
# self.stream = stream
# self.video = pafy.new(self.stream)
# self.best_video = self.video.getbest(preftype="mp4")
# self.capture = cv2.VideoCapture(self.best_video.url)
# logger.debug(f"initialized video capture with backend={self.capture.getBackendName()}")
# if not self.capture.isOpened():
# raise ValueError(f"could not initially open {self.stream}")
# self.capture.release()

# def reset_stream(self):
# self.video = pafy.new(self.stream)
# self.best_video = self.video.getbest(preftype="mp4")
# self.capture = cv2.VideoCapture(self.best_video.url)
# logger.debug(f"initialized video capture with backend={self.capture.getBackendName()}")
# if not self.capture.isOpened():
# raise ValueError(f"could not initially open {self.stream}")
# self.capture.release()

# def grab(self):
# start = time.time()
# self.capture = cv2.VideoCapture(self.best_video.url)
# ret, frame = self.capture.read() # grab and decode since we want this frame
# if not ret:
# logger.error(f"could not read frame from {self.capture}. attempting to reset stream")
# self.reset_stream()
# self.capture = cv2.VideoCapture(self.best_video.url)
# ret, frame = self.capture.read()
# if not ret:
# logger.error(f"failed to effectively reset stream {self.stream} / {self.best_video.url}")
# now = time.time()
# logger.debug(f"read the frame in {1000*(now-start):.1f}ms")
# self.capture.release()
# return frame

# # TODO update this class to work with the latest updates
# import urllib
# class ImageURLFrameGrabber(FrameGrabber):
# """grabs the current image at a single URL.
# NOTE: if image is expected to be refreshed or change with a particular frequency,
# it is up to the user of the class to call the `grab` method with that frequency
# """

# def __init__(self, url=None, **kwargs):
# self.url = url

# def grab(self):
# start = time.time()
# try:
# req = urllib.request.urlopen(self.url)
# response = req.read()
# arr = np.asarray(bytearray(response), dtype=np.uint8)
# frame = cv2.imdecode(arr, -1) # 'Load it as it is'
# except Exception as e:
# logger.error(f"could not grab frame from {self.url}: {str(e)}")
# frame = None
# now = time.time()
# elapsed = now - start
# logger.info(f"read image from URL {self.url} into frame in {elapsed}s")

# return frame
pass # no action necessary for mock cameras
41 changes: 40 additions & 1 deletion test/test_framegrab_with_mock_camera.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@
Intended to check basic functionality like cropping, zooming, config validation, etc.
"""

import os
import unittest
from framegrab.grabber import FrameGrabber
from framegrab.grabber import FrameGrabber, RTSPFrameGrabber

class TestFrameGrabWithMockCamera(unittest.TestCase):
def test_crop_pixels(self):
Expand Down Expand Up @@ -174,3 +175,41 @@ def test_attempt_create_grabbers_with_duplicate_names(self):
# Should raise an exception because camera1 is duplicated
with self.assertRaises(ValueError):
FrameGrabber.create_grabbers(configs)

def test_substitute_rtsp_url(self):
"""Test that the RTSP password is substituted correctly.
"""
os.environ['RTSP_PASSWORD_1'] = 'password1'

config = {
'input_type': 'rtsp',
'id': {'rtsp_url': "rtsp://admin:{{RTSP_PASSWORD_1}}@10.0.0.1"},
}

substituted_config = RTSPFrameGrabber._substitute_rtsp_password(config)
substituted_rtsp_url = substituted_config['id']['rtsp_url']

assert substituted_rtsp_url == "rtsp://admin:[email protected]"

def test_substitute_rtsp_url_password_not_set(self):
"""Test that an exception is raised if the user adds a placeholder but neglects to set the environment variable.
"""
config = {
'input_type': 'rtsp',
'id': {'rtsp_url': "rtsp://admin:{{SOME_NONEXISTENT_ENV_VARIABLE}}@10.0.0.1"},
}

with self.assertRaises(ValueError):
RTSPFrameGrabber._substitute_rtsp_password(config)

def test_substitute_rtsp_url_without_placeholder(self):
"""Users should be able to use RTSP urls without a password placeholder. In this case, the config should be returned unchanged.
"""
config = {
'input_type': 'rtsp',
'id': {'rtsp_url': "rtsp://admin:[email protected]"},
}

new_config = RTSPFrameGrabber._substitute_rtsp_password(config)

assert new_config == config

0 comments on commit c2b9e9b

Please sign in to comment.