diff --git a/neurobooth_os/iout/flir_cam.py b/neurobooth_os/iout/flir_cam.py index 33b42303..f00b9f1d 100644 --- a/neurobooth_os/iout/flir_cam.py +++ b/neurobooth_os/iout/flir_cam.py @@ -1,22 +1,18 @@ # -*- coding: utf-8 -*- import os.path as op -import numpy as np import queue -import time import os import threading import uuid -import neurobooth_os.iout.metadator as meta import logging +import multiprocessing from typing import Callable, Any import cv2 import PySpin from pylsl import StreamInfo, StreamOutlet -import skvideo -import skvideo.io -import h5py +import neurobooth_os.iout.metadator as meta from neurobooth_os.iout.stim_param_reader import FlirDeviceArgs from neurobooth_os.iout.stream_utils import DataVersion, set_stream_description from neurobooth_os.log_manager import APP_LOG_NAME @@ -25,25 +21,54 @@ os.environ["KMP_DUPLICATE_LIB_OK"] = "TRUE" +def camCaptureVid(video_filename, frame_rate, frame_size, image_queue, recording) -> None: + """ + Takes Flir frames from a queue and writes them to a video file + + Parameters + ---------- + video_filename Name of file to write frames to + frame_rate Frames per second + frame_size Size of each frame + image_queue Queue of Flir frames to write (multithreading.Queue) + recording boolean value object (multithreading.Manager.Value) If True, the camera is recording + """ + logger = logging.getLogger(APP_LOG_NAME) + logger.debug('FLIR: Save Process Started') + + try: + fourcc = cv2.VideoWriter_fourcc(*"MJPG") + video_out = cv2.VideoWriter(video_filename, fourcc, frame_rate, frame_size) + + while recording.value or not image_queue.empty(): + try: + dequeuedImage = image_queue.get(block=True, timeout=1) + video_out.write(dequeuedImage) + except queue.Empty: + continue + except Exception as e: + logger.error(f'FLIR: Error in save process: {e}') + finally: + video_out.release() + logger.debug('FLIR: Video File Released; Exiting Save Process') + + class FlirException(Exception): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) class VidRec_Flir: - # def __init__(self, - # sizex=round(1936 / 2), sizey=round(1216 / 2), fps=196, - # camSN="20522874", exposure=4500, gain=20, gamma=.6, - # device_id="FLIR_blackfly_1", sensor_ids=['FLIR_rgb_1'], fd= .5): - # Staging FLIR SN is 22348141 def __init__( - self, - device_args: FlirDeviceArgs, - exposure=4500, - gain=20, - gamma=0.6, - fd=1, + self, + device_args: FlirDeviceArgs, + exposure=4500, + gain=20, + gamma=0.6, + fd=1, ): + self.frame_counter = None + self.save_process = None self.device_args: FlirDeviceArgs = device_args # not currently using sizex, sizey --> need to update to use these parameters # need to read these parameters from database @@ -65,8 +90,7 @@ def __init__( self.get_cam() self.setup_cam() - - self.image_queue = queue.Queue(0) + self.image_queue = multiprocessing.Queue(0) self.outlet = self.createOutlet() self.logger.debug(f'FLIR: fps={str(self.device_args.sample_rate())}; ' @@ -118,6 +142,7 @@ def setup_cam(self): # cam.BalanceWhiteAuto.SetValue(0) def createOutlet(self): + self.logger.debug("Creating Outlet") self.streamName = "FlirFrameIndex" self.oulet_id = str(uuid.uuid4()) info = set_stream_description( @@ -141,25 +166,12 @@ def createOutlet(self): exposure=str(self.exposure), gain=str(self.gain), gamma=str(self.gamma), - # device_model_id=self.cam.get_device_name().decode(), ) msg_body = DeviceInitialization(stream_name=self.streamName, outlet_id=self.oulet_id) with meta.get_database_connection() as db_conn: meta.post_message(Request(source='Flir', destination='CTR', body=msg_body), conn=db_conn) return StreamOutlet(info) - # function to capture images, convert to numpy, send to queue, and release - # from buffer in separate process - def camCaptureVid(self): - self.logger.debug('FLIR: Save Thread Started') - while self.recording or self.image_queue.qsize(): - try: - dequeuedImage = self.image_queue.get(block=True, timeout=1) - self.video_out.write(dequeuedImage) - except queue.Empty: - continue - self.logger.debug('FLIR: Exiting Save Thread') - def start(self, name="temp_video"): self.prepare(name) self.video_thread = threading.Thread(target=self.record) @@ -180,11 +192,12 @@ def prepare(self, name="temp_video"): self.frameSize = (im.shape[1], im.shape[0]) self.video_filename = "{}_flir.avi".format(name) - fourcc = cv2.VideoWriter_fourcc(*"MJPG") + # TODO: REMOVE THESE NEXT 3 LINES + # actual_fname = op.split(self.video_filename)[-1] + # fname = os.path.join("D://", "neurobooth", "neurobooth_data", "100001_2025-01-29", actual_fname) + # self.video_filename = fname + self.FRAME_RATE_OUT = self.cam.AcquisitionResultingFrameRate() - self.video_out = cv2.VideoWriter( - self.video_filename, fourcc, self.FRAME_RATE_OUT, self.frameSize - ) msg_body = NewVideoFile(stream_name=self.streamName, filename=op.split(self.video_filename)[-1]) with meta.get_database_connection() as db_conn: @@ -195,40 +208,46 @@ def record(self): self.logger.debug('FLIR: LSL Thread Started') self.recording = True self.frame_counter = 0 - self.save_thread = threading.Thread(target=self.camCaptureVid) - self.save_thread.start() - - self.stamp = [] - while self.recording: - # Exception for failed waiting self.cam.GetNextImage(1000) + with multiprocessing.Manager() as manager: + recording = manager.Value('b', True) try: - im, tsmp = self.imgage_proc() - except: - continue - - self.image_queue.put(im) - self.stamp.append(tsmp) - - try: - self.outlet.push_sample([self.frame_counter, tsmp]) - except BaseException: - self.logger.debug(f"Reopening FLIR {self.device_index} stream already closed") - self.outlet = self.createOutlet(self.video_filename) - self.outlet.push_sample([self.frame_counter, tsmp]) - - # self.video_out.write(im_conv_d) - self.frame_counter += 1 - - if not self.frame_counter % 1000 and self.image_queue.qsize() > 2: - self.logger.debug( - f"Queue length is {self.image_queue.qsize()} frame count: {self.frame_counter}" - ) - - self.cam.EndAcquisition() - self.recording = False - self.save_thread.join() - self.video_out.release() - self.logger.debug('FLIR: Video File Released; Exiting LSL Thread') + self.save_process = multiprocessing.Process(target=camCaptureVid, + args=(self.video_filename, + self.FRAME_RATE_OUT, + self.frameSize, + self.image_queue, + recording)) + self.save_process.start() + except BaseException as e: + self.logger.error(f'Unable to start Flir save process; error={e}') + + self.stamp = [] + while self.recording: + # Exception for failed waiting self.cam.GetNextImage(2000) + try: + im, tsmp = self.imgage_proc() + except: + continue + self.image_queue.put(im) + self.stamp.append(tsmp) + try: + self.outlet.push_sample([self.frame_counter, tsmp]) + except BaseException: + self.logger.debug(f"Reopening FLIR {self.device_index} stream already closed") + self.outlet = self.createOutlet(self.video_filename) + self.outlet.push_sample([self.frame_counter, tsmp]) + + self.frame_counter += 1 + + if not self.frame_counter % 1000 and self.image_queue.qsize() > 2: + self.logger.debug( + f"Queue length is {self.image_queue.qsize()} frame count: {self.frame_counter}" + ) + self.cam.EndAcquisition() + recording.value = False + self.recording = False + self.save_process.join() + self.logger.debug('FLIR: Exiting LSL Thread') def stop(self): if self.open and self.recording: @@ -243,20 +262,8 @@ def close(self): def ensure_stopped(self, timeout_seconds: float) -> None: """Check to make sure the recording is actually stopped.""" - self.video_thread.join() + self.video_thread.join(timeout_seconds) if self.video_thread.is_alive(): - self.logger.error('FLIR: Potential Zombie Thread Detected!') - raise FlirException('Potential Zombie Thread Detected!') - - -if __name__ == "__main__": - flir = VidRec_Flir() - print('Recording...') - flir.start() - time.sleep(10) - flir.stop() - print('Stopping...') - flir.ensure_stopped(timeout_seconds=5) - flir.close() - tdiff = np.diff(flir.stamp) / 1e6 - print(f"diff range {np.ptp(tdiff):.2e}") + self.logger.error(f'FLIR: Potential Zombie Thread Detected!' + f' Stop taking longer than {timeout_seconds} seconds') + # raise FlirException('Potential Zombie Thread Detected!')