From 19655d2907703ea6fed3b5ce1ec4ae57d3ccc74e Mon Sep 17 00:00:00 2001 From: Larry White Date: Thu, 23 Jan 2025 14:40:13 -0500 Subject: [PATCH 01/19] try multiprocessing of frame queue --- neurobooth_os/iout/flir_cam.py | 22 ++++++++++------------ 1 file changed, 10 insertions(+), 12 deletions(-) diff --git a/neurobooth_os/iout/flir_cam.py b/neurobooth_os/iout/flir_cam.py index 33b42303..87d7cbfd 100644 --- a/neurobooth_os/iout/flir_cam.py +++ b/neurobooth_os/iout/flir_cam.py @@ -6,17 +6,15 @@ import os import threading import uuid -import neurobooth_os.iout.metadator as meta import logging +import multiprocessing from typing import Callable, Any import cv2 import PySpin from pylsl import StreamInfo, StreamOutlet -import skvideo -import skvideo.io -import h5py +import neurobooth_os.iout.metadator as meta from neurobooth_os.iout.stim_param_reader import FlirDeviceArgs from neurobooth_os.iout.stream_utils import DataVersion, set_stream_description from neurobooth_os.log_manager import APP_LOG_NAME @@ -44,6 +42,8 @@ def __init__( gamma=0.6, fd=1, ): + self.frame_counter = None + self.save_process = None self.device_args: FlirDeviceArgs = device_args # not currently using sizex, sizey --> need to update to use these parameters # need to read these parameters from database @@ -65,8 +65,7 @@ def __init__( self.get_cam() self.setup_cam() - - self.image_queue = queue.Queue(0) + self.image_queue = multiprocessing.Queue(0) self.outlet = self.createOutlet() self.logger.debug(f'FLIR: fps={str(self.device_args.sample_rate())}; ' @@ -195,8 +194,9 @@ def record(self): self.logger.debug('FLIR: LSL Thread Started') self.recording = True self.frame_counter = 0 - self.save_thread = threading.Thread(target=self.camCaptureVid) - self.save_thread.start() + + self.save_process = multiprocessing.Process(target = self.camCaptureVid()) + self.save_process.start() self.stamp = [] while self.recording: @@ -216,17 +216,15 @@ def record(self): self.outlet = self.createOutlet(self.video_filename) self.outlet.push_sample([self.frame_counter, tsmp]) - # self.video_out.write(im_conv_d) self.frame_counter += 1 if not self.frame_counter % 1000 and self.image_queue.qsize() > 2: self.logger.debug( f"Queue length is {self.image_queue.qsize()} frame count: {self.frame_counter}" ) - self.cam.EndAcquisition() self.recording = False - self.save_thread.join() + self.save_process.join() self.video_out.release() self.logger.debug('FLIR: Video File Released; Exiting LSL Thread') @@ -243,7 +241,7 @@ def close(self): def ensure_stopped(self, timeout_seconds: float) -> None: """Check to make sure the recording is actually stopped.""" - self.video_thread.join() + self.video_thread.join(timeout_seconds) if self.video_thread.is_alive(): self.logger.error('FLIR: Potential Zombie Thread Detected!') raise FlirException('Potential Zombie Thread Detected!') From 0181f593f3a9d42c0ba8aeb8a92bbfdc4784e9ba Mon Sep 17 00:00:00 2001 From: Larry White Date: Thu, 23 Jan 2025 15:04:22 -0500 Subject: [PATCH 02/19] Update flir_cam.py --- neurobooth_os/iout/flir_cam.py | 17 +++++++++++++---- 1 file changed, 13 insertions(+), 4 deletions(-) diff --git a/neurobooth_os/iout/flir_cam.py b/neurobooth_os/iout/flir_cam.py index 87d7cbfd..74be6112 100644 --- a/neurobooth_os/iout/flir_cam.py +++ b/neurobooth_os/iout/flir_cam.py @@ -150,7 +150,7 @@ def createOutlet(self): # function to capture images, convert to numpy, send to queue, and release # from buffer in separate process def camCaptureVid(self): - self.logger.debug('FLIR: Save Thread Started') + self.logger.debug('FLIR: Save Process Started') while self.recording or self.image_queue.qsize(): try: dequeuedImage = self.image_queue.get(block=True, timeout=1) @@ -191,12 +191,16 @@ def prepare(self, name="temp_video"): self.streaming = True def record(self): + self.logger.debug('FLIR: LSL Thread Started') self.recording = True self.frame_counter = 0 - self.save_process = multiprocessing.Process(target = self.camCaptureVid()) - self.save_process.start() + try: + self.save_process = multiprocessing.Process(target = self.camCaptureVid()) + self.save_process.start() + except BaseException as e: + self.logger.error(f'Unable to start Flir save process; error={e}') self.stamp = [] while self.recording: @@ -206,7 +210,12 @@ def record(self): except: continue - self.image_queue.put(im) + try: + self.image_queue.put(im) + except BaseException as e: + self.logger.critical(f'Unable to enqueue Flir frame; error={e}') + raise e + self.stamp.append(tsmp) try: From 9d7100c8f25da5b0fcaca9bfe61ee81c8998deef Mon Sep 17 00:00:00 2001 From: Larry White Date: Sun, 26 Jan 2025 12:18:42 -0500 Subject: [PATCH 03/19] Update flir_cam.py --- neurobooth_os/iout/flir_cam.py | 36 ++++++++++++++++++++-------------- 1 file changed, 21 insertions(+), 15 deletions(-) diff --git a/neurobooth_os/iout/flir_cam.py b/neurobooth_os/iout/flir_cam.py index 74be6112..18ee3a9c 100644 --- a/neurobooth_os/iout/flir_cam.py +++ b/neurobooth_os/iout/flir_cam.py @@ -150,14 +150,25 @@ def createOutlet(self): # function to capture images, convert to numpy, send to queue, and release # from buffer in separate process def camCaptureVid(self): - self.logger.debug('FLIR: Save Process Started') - while self.recording or self.image_queue.qsize(): - try: - dequeuedImage = self.image_queue.get(block=True, timeout=1) - self.video_out.write(dequeuedImage) - except queue.Empty: - continue - self.logger.debug('FLIR: Exiting Save Thread') + try: + fourcc = cv2.VideoWriter_fourcc(*"MJPG") + video_out = cv2.VideoWriter( + self.video_filename, fourcc, + self.FRAME_RATE_OUT, self.frameSize + ) + self.logger.debug('FLIR: Save Process Started') + + while self.recording or not self.image_queue.empty(): + try: + dequeuedImage = self.image_queue.get(block=True, timeout=1) + video_out.write(dequeuedImage) + except queue.Empty: + continue + except Exception as e: + self.logger.error(f'FLIR: Error in save process: {e}') + finally: + video_out.release() + self.logger.debug('FLIR: Video File Released; Exiting Save Process') def start(self, name="temp_video"): self.prepare(name) @@ -179,11 +190,7 @@ def prepare(self, name="temp_video"): self.frameSize = (im.shape[1], im.shape[0]) self.video_filename = "{}_flir.avi".format(name) - fourcc = cv2.VideoWriter_fourcc(*"MJPG") self.FRAME_RATE_OUT = self.cam.AcquisitionResultingFrameRate() - self.video_out = cv2.VideoWriter( - self.video_filename, fourcc, self.FRAME_RATE_OUT, self.frameSize - ) msg_body = NewVideoFile(stream_name=self.streamName, filename=op.split(self.video_filename)[-1]) with meta.get_database_connection() as db_conn: @@ -197,7 +204,7 @@ def record(self): self.frame_counter = 0 try: - self.save_process = multiprocessing.Process(target = self.camCaptureVid()) + self.save_process = multiprocessing.Process(target=self.camCaptureVid) self.save_process.start() except BaseException as e: self.logger.error(f'Unable to start Flir save process; error={e}') @@ -234,8 +241,7 @@ def record(self): self.cam.EndAcquisition() self.recording = False self.save_process.join() - self.video_out.release() - self.logger.debug('FLIR: Video File Released; Exiting LSL Thread') + self.logger.debug('FLIR: Exiting LSL Thread') def stop(self): if self.open and self.recording: From d99d9ff1a8175244c4b3c6cb6139819ef7b15839 Mon Sep 17 00:00:00 2001 From: Larry White Date: Sun, 26 Jan 2025 13:03:51 -0500 Subject: [PATCH 04/19] handle additional variable shared between processes --- neurobooth_os/iout/flir_cam.py | 103 ++++++++++++++++----------------- 1 file changed, 51 insertions(+), 52 deletions(-) diff --git a/neurobooth_os/iout/flir_cam.py b/neurobooth_os/iout/flir_cam.py index 18ee3a9c..10006b6e 100644 --- a/neurobooth_os/iout/flir_cam.py +++ b/neurobooth_os/iout/flir_cam.py @@ -29,18 +29,13 @@ def __init__(self, *args, **kwargs): class VidRec_Flir: - # def __init__(self, - # sizex=round(1936 / 2), sizey=round(1216 / 2), fps=196, - # camSN="20522874", exposure=4500, gain=20, gamma=.6, - # device_id="FLIR_blackfly_1", sensor_ids=['FLIR_rgb_1'], fd= .5): - # Staging FLIR SN is 22348141 def __init__( - self, - device_args: FlirDeviceArgs, - exposure=4500, - gain=20, - gamma=0.6, - fd=1, + self, + device_args: FlirDeviceArgs, + exposure=4500, + gain=20, + gamma=0.6, + fd=1, ): self.frame_counter = None self.save_process = None @@ -149,26 +144,24 @@ def createOutlet(self): # function to capture images, convert to numpy, send to queue, and release # from buffer in separate process - def camCaptureVid(self): + def camCaptureVid(self, video_filename, frame_rate, frame_size, image_queue, manager): + logger = logging.getLogger(APP_LOG_NAME) try: fourcc = cv2.VideoWriter_fourcc(*"MJPG") - video_out = cv2.VideoWriter( - self.video_filename, fourcc, - self.FRAME_RATE_OUT, self.frameSize - ) - self.logger.debug('FLIR: Save Process Started') + video_out = cv2.VideoWriter(video_filename, fourcc, frame_rate, frame_size) + logger.debug('FLIR: Save Process Started') - while self.recording or not self.image_queue.empty(): + while manager.recording or not image_queue.empty(): try: - dequeuedImage = self.image_queue.get(block=True, timeout=1) + dequeuedImage = image_queue.get(block=True, timeout=1) video_out.write(dequeuedImage) except queue.Empty: continue except Exception as e: - self.logger.error(f'FLIR: Error in save process: {e}') + logger.error(f'FLIR: Error in save process: {e}') finally: video_out.release() - self.logger.debug('FLIR: Video File Released; Exiting Save Process') + logger.debug('FLIR: Video File Released; Exiting Save Process') def start(self, name="temp_video"): self.prepare(name) @@ -202,44 +195,50 @@ def record(self): self.logger.debug('FLIR: LSL Thread Started') self.recording = True self.frame_counter = 0 - - try: - self.save_process = multiprocessing.Process(target=self.camCaptureVid) - self.save_process.start() - except BaseException as e: - self.logger.error(f'Unable to start Flir save process; error={e}') - - self.stamp = [] - while self.recording: - # Exception for failed waiting self.cam.GetNextImage(1000) - try: - im, tsmp = self.imgage_proc() - except: - continue + with multiprocessing.Manager() as manager: try: - self.image_queue.put(im) + self.save_process = multiprocessing.Process(target=self.camCaptureVid, + args=(self.video_filename, + self.FRAME_RATE_OUT, + self.frameSize)) + self.save_process.start() except BaseException as e: - self.logger.critical(f'Unable to enqueue Flir frame; error={e}') - raise e + self.logger.error(f'Unable to start Flir save process; error={e}') + + self.stamp = [] + manager.recording = self.recording + while self.recording: + # Exception for failed waiting self.cam.GetNextImage(1000) + try: + im, tsmp = self.imgage_proc() + except: + continue - self.stamp.append(tsmp) + try: + self.image_queue.put(im) + except BaseException as e: + self.logger.critical(f'Unable to enqueue Flir frame; error={e}') + raise e - try: - self.outlet.push_sample([self.frame_counter, tsmp]) - except BaseException: - self.logger.debug(f"Reopening FLIR {self.device_index} stream already closed") - self.outlet = self.createOutlet(self.video_filename) - self.outlet.push_sample([self.frame_counter, tsmp]) - - self.frame_counter += 1 - - if not self.frame_counter % 1000 and self.image_queue.qsize() > 2: - self.logger.debug( - f"Queue length is {self.image_queue.qsize()} frame count: {self.frame_counter}" - ) + self.stamp.append(tsmp) + + try: + self.outlet.push_sample([self.frame_counter, tsmp]) + except BaseException: + self.logger.debug(f"Reopening FLIR {self.device_index} stream already closed") + self.outlet = self.createOutlet(self.video_filename) + self.outlet.push_sample([self.frame_counter, tsmp]) + + self.frame_counter += 1 + + if not self.frame_counter % 1000 and self.image_queue.qsize() > 2: + self.logger.debug( + f"Queue length is {self.image_queue.qsize()} frame count: {self.frame_counter}" + ) self.cam.EndAcquisition() self.recording = False + manager.recording = False self.save_process.join() self.logger.debug('FLIR: Exiting LSL Thread') From 27a3279c8890241f3c78db6d7dcefe9cace4d188 Mon Sep 17 00:00:00 2001 From: Larry White Date: Sun, 26 Jan 2025 13:36:11 -0500 Subject: [PATCH 05/19] Update flir_cam.py --- neurobooth_os/iout/flir_cam.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/neurobooth_os/iout/flir_cam.py b/neurobooth_os/iout/flir_cam.py index 10006b6e..c71abbf1 100644 --- a/neurobooth_os/iout/flir_cam.py +++ b/neurobooth_os/iout/flir_cam.py @@ -144,14 +144,14 @@ def createOutlet(self): # function to capture images, convert to numpy, send to queue, and release # from buffer in separate process - def camCaptureVid(self, video_filename, frame_rate, frame_size, image_queue, manager): + def camCaptureVid(self, video_filename, frame_rate, frame_size, image_queue, recording): logger = logging.getLogger(APP_LOG_NAME) try: fourcc = cv2.VideoWriter_fourcc(*"MJPG") video_out = cv2.VideoWriter(video_filename, fourcc, frame_rate, frame_size) logger.debug('FLIR: Save Process Started') - while manager.recording or not image_queue.empty(): + while recording or not image_queue.empty(): try: dequeuedImage = image_queue.get(block=True, timeout=1) video_out.write(dequeuedImage) @@ -191,23 +191,23 @@ def prepare(self, name="temp_video"): self.streaming = True def record(self): - self.logger.debug('FLIR: LSL Thread Started') self.recording = True self.frame_counter = 0 with multiprocessing.Manager() as manager: - + recording = manager.Value('b', True) try: self.save_process = multiprocessing.Process(target=self.camCaptureVid, args=(self.video_filename, self.FRAME_RATE_OUT, - self.frameSize)) + self.frameSize, + queue, + recording)) self.save_process.start() except BaseException as e: self.logger.error(f'Unable to start Flir save process; error={e}') self.stamp = [] - manager.recording = self.recording while self.recording: # Exception for failed waiting self.cam.GetNextImage(1000) try: @@ -238,7 +238,7 @@ def record(self): ) self.cam.EndAcquisition() self.recording = False - manager.recording = False + recording.value = False self.save_process.join() self.logger.debug('FLIR: Exiting LSL Thread') From f533091d103512f02b71083e763ba889b9408034 Mon Sep 17 00:00:00 2001 From: Larry White Date: Sun, 26 Jan 2025 14:06:20 -0500 Subject: [PATCH 06/19] Update flir_cam.py --- neurobooth_os/iout/flir_cam.py | 60 +++++++++++++++++++++++----------- 1 file changed, 41 insertions(+), 19 deletions(-) diff --git a/neurobooth_os/iout/flir_cam.py b/neurobooth_os/iout/flir_cam.py index c71abbf1..7706be0e 100644 --- a/neurobooth_os/iout/flir_cam.py +++ b/neurobooth_os/iout/flir_cam.py @@ -23,6 +23,27 @@ os.environ["KMP_DUPLICATE_LIB_OK"] = "TRUE" +def camCaptureVid(video_filename, frame_rate, frame_size, image_queue, recording): + logger = logging.getLogger(APP_LOG_NAME) + logger.debug('FLIR: Save Process Started') + + try: + fourcc = cv2.VideoWriter_fourcc(*"MJPG") + video_out = cv2.VideoWriter(video_filename, fourcc, frame_rate, frame_size) + + while recording or not image_queue.empty(): + try: + dequeuedImage = image_queue.get(block=True, timeout=1) + video_out.write(dequeuedImage) + except queue.Empty: + continue + except Exception as e: + logger.error(f'FLIR: Error in save process: {e}') + finally: + video_out.release() + logger.debug('FLIR: Video File Released; Exiting Save Process') + + class FlirException(Exception): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) @@ -144,24 +165,25 @@ def createOutlet(self): # function to capture images, convert to numpy, send to queue, and release # from buffer in separate process - def camCaptureVid(self, video_filename, frame_rate, frame_size, image_queue, recording): - logger = logging.getLogger(APP_LOG_NAME) - try: - fourcc = cv2.VideoWriter_fourcc(*"MJPG") - video_out = cv2.VideoWriter(video_filename, fourcc, frame_rate, frame_size) - logger.debug('FLIR: Save Process Started') - - while recording or not image_queue.empty(): - try: - dequeuedImage = image_queue.get(block=True, timeout=1) - video_out.write(dequeuedImage) - except queue.Empty: - continue - except Exception as e: - logger.error(f'FLIR: Error in save process: {e}') - finally: - video_out.release() - logger.debug('FLIR: Video File Released; Exiting Save Process') + # def camCaptureVid(self, video_filename, frame_rate, frame_size, image_queue, recording): + # logger = logging.getLogger(APP_LOG_NAME) + # logger.debug('FLIR: Save Process Started') + # + # try: + # fourcc = cv2.VideoWriter_fourcc(*"MJPG") + # video_out = cv2.VideoWriter(video_filename, fourcc, frame_rate, frame_size) + # + # while recording or not image_queue.empty(): + # try: + # dequeuedImage = image_queue.get(block=True, timeout=1) + # video_out.write(dequeuedImage) + # except queue.Empty: + # continue + # except Exception as e: + # logger.error(f'FLIR: Error in save process: {e}') + # finally: + # video_out.release() + # logger.debug('FLIR: Video File Released; Exiting Save Process') def start(self, name="temp_video"): self.prepare(name) @@ -197,7 +219,7 @@ def record(self): with multiprocessing.Manager() as manager: recording = manager.Value('b', True) try: - self.save_process = multiprocessing.Process(target=self.camCaptureVid, + self.save_process = multiprocessing.Process(target=camCaptureVid, args=(self.video_filename, self.FRAME_RATE_OUT, self.frameSize, From 4b8ad3a49ea334bfc1a5602d4325ba2626ce21b6 Mon Sep 17 00:00:00 2001 From: Larry White Date: Sun, 26 Jan 2025 14:11:01 -0500 Subject: [PATCH 07/19] Update flir_cam.py --- neurobooth_os/iout/flir_cam.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/neurobooth_os/iout/flir_cam.py b/neurobooth_os/iout/flir_cam.py index 7706be0e..114567e2 100644 --- a/neurobooth_os/iout/flir_cam.py +++ b/neurobooth_os/iout/flir_cam.py @@ -133,6 +133,7 @@ def setup_cam(self): # cam.BalanceWhiteAuto.SetValue(0) def createOutlet(self): + self.logger.debug("Creating Outlet") self.streamName = "FlirFrameIndex" self.oulet_id = str(uuid.uuid4()) info = set_stream_description( @@ -156,7 +157,6 @@ def createOutlet(self): exposure=str(self.exposure), gain=str(self.gain), gamma=str(self.gamma), - # device_model_id=self.cam.get_device_name().decode(), ) msg_body = DeviceInitialization(stream_name=self.streamName, outlet_id=self.oulet_id) with meta.get_database_connection() as db_conn: From f19619abcad27fe7c063023db5fa555c0fb2f88b Mon Sep 17 00:00:00 2001 From: Larry White Date: Sun, 26 Jan 2025 14:25:29 -0500 Subject: [PATCH 08/19] remove test code --- neurobooth_os/iout/flir_cam.py | 46 ++++++++-------------------------- 1 file changed, 11 insertions(+), 35 deletions(-) diff --git a/neurobooth_os/iout/flir_cam.py b/neurobooth_os/iout/flir_cam.py index 114567e2..6530b25c 100644 --- a/neurobooth_os/iout/flir_cam.py +++ b/neurobooth_os/iout/flir_cam.py @@ -1,8 +1,6 @@ # -*- coding: utf-8 -*- import os.path as op -import numpy as np import queue -import time import os import threading import uuid @@ -163,28 +161,6 @@ def createOutlet(self): meta.post_message(Request(source='Flir', destination='CTR', body=msg_body), conn=db_conn) return StreamOutlet(info) - # function to capture images, convert to numpy, send to queue, and release - # from buffer in separate process - # def camCaptureVid(self, video_filename, frame_rate, frame_size, image_queue, recording): - # logger = logging.getLogger(APP_LOG_NAME) - # logger.debug('FLIR: Save Process Started') - # - # try: - # fourcc = cv2.VideoWriter_fourcc(*"MJPG") - # video_out = cv2.VideoWriter(video_filename, fourcc, frame_rate, frame_size) - # - # while recording or not image_queue.empty(): - # try: - # dequeuedImage = image_queue.get(block=True, timeout=1) - # video_out.write(dequeuedImage) - # except queue.Empty: - # continue - # except Exception as e: - # logger.error(f'FLIR: Error in save process: {e}') - # finally: - # video_out.release() - # logger.debug('FLIR: Video File Released; Exiting Save Process') - def start(self, name="temp_video"): self.prepare(name) self.video_thread = threading.Thread(target=self.record) @@ -283,14 +259,14 @@ def ensure_stopped(self, timeout_seconds: float) -> None: raise FlirException('Potential Zombie Thread Detected!') -if __name__ == "__main__": - flir = VidRec_Flir() - print('Recording...') - flir.start() - time.sleep(10) - flir.stop() - print('Stopping...') - flir.ensure_stopped(timeout_seconds=5) - flir.close() - tdiff = np.diff(flir.stamp) / 1e6 - print(f"diff range {np.ptp(tdiff):.2e}") +# if __name__ == "__main__": +# flir = VidRec_Flir() +# print('Recording...') +# flir.start() +# time.sleep(10) +# flir.stop() +# print('Stopping...') +# flir.ensure_stopped(timeout_seconds=5) +# flir.close() +# tdiff = np.diff(flir.stamp) / 1e6 +# print(f"diff range {np.ptp(tdiff):.2e}") From 8c12d468f62637ecd3390225775def2aefaf6c34 Mon Sep 17 00:00:00 2001 From: Larry White Date: Sun, 26 Jan 2025 14:32:55 -0500 Subject: [PATCH 09/19] Update flir_cam.py --- neurobooth_os/iout/flir_cam.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/neurobooth_os/iout/flir_cam.py b/neurobooth_os/iout/flir_cam.py index 6530b25c..15aa5e4e 100644 --- a/neurobooth_os/iout/flir_cam.py +++ b/neurobooth_os/iout/flir_cam.py @@ -199,7 +199,7 @@ def record(self): args=(self.video_filename, self.FRAME_RATE_OUT, self.frameSize, - queue, + self.image_queue, recording)) self.save_process.start() except BaseException as e: From f34eebe9946690ea582c8a8f056a3115372884f1 Mon Sep 17 00:00:00 2001 From: Larry White Date: Sun, 26 Jan 2025 15:11:42 -0500 Subject: [PATCH 10/19] Update flir_cam.py --- neurobooth_os/iout/flir_cam.py | 30 ++++++------------------------ 1 file changed, 6 insertions(+), 24 deletions(-) diff --git a/neurobooth_os/iout/flir_cam.py b/neurobooth_os/iout/flir_cam.py index 15aa5e4e..b8823305 100644 --- a/neurobooth_os/iout/flir_cam.py +++ b/neurobooth_os/iout/flir_cam.py @@ -213,12 +213,7 @@ def record(self): except: continue - try: - self.image_queue.put(im) - except BaseException as e: - self.logger.critical(f'Unable to enqueue Flir frame; error={e}') - raise e - + self.image_queue.put(im) self.stamp.append(tsmp) try: @@ -234,11 +229,11 @@ def record(self): self.logger.debug( f"Queue length is {self.image_queue.qsize()} frame count: {self.frame_counter}" ) - self.cam.EndAcquisition() - self.recording = False - recording.value = False - self.save_process.join() - self.logger.debug('FLIR: Exiting LSL Thread') + self.cam.EndAcquisition() + self.recording = False + recording.value = False + self.save_process.join() + self.logger.debug('FLIR: Exiting LSL Thread') def stop(self): if self.open and self.recording: @@ -257,16 +252,3 @@ def ensure_stopped(self, timeout_seconds: float) -> None: if self.video_thread.is_alive(): self.logger.error('FLIR: Potential Zombie Thread Detected!') raise FlirException('Potential Zombie Thread Detected!') - - -# if __name__ == "__main__": -# flir = VidRec_Flir() -# print('Recording...') -# flir.start() -# time.sleep(10) -# flir.stop() -# print('Stopping...') -# flir.ensure_stopped(timeout_seconds=5) -# flir.close() -# tdiff = np.diff(flir.stamp) / 1e6 -# print(f"diff range {np.ptp(tdiff):.2e}") From 94e8272544ace86bf07a627cc45bdcc7d89c314c Mon Sep 17 00:00:00 2001 From: Larry White Date: Sun, 26 Jan 2025 15:18:36 -0500 Subject: [PATCH 11/19] Update flir_cam.py --- neurobooth_os/iout/flir_cam.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/neurobooth_os/iout/flir_cam.py b/neurobooth_os/iout/flir_cam.py index b8823305..1cd32ce5 100644 --- a/neurobooth_os/iout/flir_cam.py +++ b/neurobooth_os/iout/flir_cam.py @@ -250,5 +250,6 @@ def ensure_stopped(self, timeout_seconds: float) -> None: """Check to make sure the recording is actually stopped.""" self.video_thread.join(timeout_seconds) if self.video_thread.is_alive(): - self.logger.error('FLIR: Potential Zombie Thread Detected!') - raise FlirException('Potential Zombie Thread Detected!') + self.logger.error(f'FLIR: Potential Zombie Thread Detected!' + f' Stop taking longer than {timeout_seconds} seconds') + # raise FlirException('Potential Zombie Thread Detected!') From 4965a1d051442972ee3da37d86e7a8f2b9f0598c Mon Sep 17 00:00:00 2001 From: Larry White Date: Sun, 26 Jan 2025 15:39:07 -0500 Subject: [PATCH 12/19] Update flir_cam.py --- neurobooth_os/iout/flir_cam.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/neurobooth_os/iout/flir_cam.py b/neurobooth_os/iout/flir_cam.py index 1cd32ce5..4dc2c3dd 100644 --- a/neurobooth_os/iout/flir_cam.py +++ b/neurobooth_os/iout/flir_cam.py @@ -248,7 +248,8 @@ def close(self): def ensure_stopped(self, timeout_seconds: float) -> None: """Check to make sure the recording is actually stopped.""" - self.video_thread.join(timeout_seconds) + #self.video_thread.join(timeout_seconds) + self.video_thread.join() if self.video_thread.is_alive(): self.logger.error(f'FLIR: Potential Zombie Thread Detected!' f' Stop taking longer than {timeout_seconds} seconds') From 34197377f994ce4f7f5cebb98dc280d3f0be0d25 Mon Sep 17 00:00:00 2001 From: Larry White Date: Sun, 26 Jan 2025 16:05:48 -0500 Subject: [PATCH 13/19] Update flir_cam.py --- neurobooth_os/iout/flir_cam.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/neurobooth_os/iout/flir_cam.py b/neurobooth_os/iout/flir_cam.py index 4dc2c3dd..d81ebdce 100644 --- a/neurobooth_os/iout/flir_cam.py +++ b/neurobooth_os/iout/flir_cam.py @@ -248,8 +248,8 @@ def close(self): def ensure_stopped(self, timeout_seconds: float) -> None: """Check to make sure the recording is actually stopped.""" - #self.video_thread.join(timeout_seconds) - self.video_thread.join() + self.video_thread.join(timeout_seconds) + # self.video_thread.join() if self.video_thread.is_alive(): self.logger.error(f'FLIR: Potential Zombie Thread Detected!' f' Stop taking longer than {timeout_seconds} seconds') From b022ed204366858e85afb337ca3df52a00d9d385 Mon Sep 17 00:00:00 2001 From: Larry White Date: Mon, 27 Jan 2025 11:21:28 -0500 Subject: [PATCH 14/19] fixing boolean check in multithreading.Manager --- neurobooth_os/iout/flir_cam.py | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/neurobooth_os/iout/flir_cam.py b/neurobooth_os/iout/flir_cam.py index d81ebdce..b5387cb9 100644 --- a/neurobooth_os/iout/flir_cam.py +++ b/neurobooth_os/iout/flir_cam.py @@ -29,12 +29,14 @@ def camCaptureVid(video_filename, frame_rate, frame_size, image_queue, recording fourcc = cv2.VideoWriter_fourcc(*"MJPG") video_out = cv2.VideoWriter(video_filename, fourcc, frame_rate, frame_size) - while recording or not image_queue.empty(): + while recording.value or not image_queue.empty(): try: dequeuedImage = image_queue.get(block=True, timeout=1) video_out.write(dequeuedImage) except queue.Empty: continue + if not recording.value: + logger.debug('FLIR: Recording.value is False; Ready to exit child process') except Exception as e: logger.error(f'FLIR: Error in save process: {e}') finally: @@ -212,10 +214,8 @@ def record(self): im, tsmp = self.imgage_proc() except: continue - self.image_queue.put(im) self.stamp.append(tsmp) - try: self.outlet.push_sample([self.frame_counter, tsmp]) except BaseException: @@ -229,9 +229,11 @@ def record(self): self.logger.debug( f"Queue length is {self.image_queue.qsize()} frame count: {self.frame_counter}" ) + self.logger.debug("FLIR: Record loop has exited; Trying to stop.") self.cam.EndAcquisition() - self.recording = False recording.value = False + self.logger.debug("FLIR: manager.recording is False; Trying to stop child process.") + self.recording = False self.save_process.join() self.logger.debug('FLIR: Exiting LSL Thread') @@ -249,7 +251,6 @@ def close(self): def ensure_stopped(self, timeout_seconds: float) -> None: """Check to make sure the recording is actually stopped.""" self.video_thread.join(timeout_seconds) - # self.video_thread.join() if self.video_thread.is_alive(): self.logger.error(f'FLIR: Potential Zombie Thread Detected!' f' Stop taking longer than {timeout_seconds} seconds') From 757fc4598b12c4ef569e41d634a73494936f3148 Mon Sep 17 00:00:00 2001 From: Larry White Date: Mon, 27 Jan 2025 11:45:20 -0500 Subject: [PATCH 15/19] Remove debugging code and add comments --- neurobooth_os/iout/flir_cam.py | 19 +++++++++++++------ 1 file changed, 13 insertions(+), 6 deletions(-) diff --git a/neurobooth_os/iout/flir_cam.py b/neurobooth_os/iout/flir_cam.py index b5387cb9..c64a9a76 100644 --- a/neurobooth_os/iout/flir_cam.py +++ b/neurobooth_os/iout/flir_cam.py @@ -21,7 +21,18 @@ os.environ["KMP_DUPLICATE_LIB_OK"] = "TRUE" -def camCaptureVid(video_filename, frame_rate, frame_size, image_queue, recording): +def camCaptureVid(video_filename, frame_rate, frame_size, image_queue, recording) -> None: + """ + Takes Flir frames from a queue and writes them to a video file + + Parameters + ---------- + video_filename Name of file to write frames to + frame_rate Frames per second + frame_size Size of each frame + image_queue Queue of Flir frames to write (multithreading.Queue) + recording boolean value object (multithreading.Manager.Value) If True, the camera is recording + """ logger = logging.getLogger(APP_LOG_NAME) logger.debug('FLIR: Save Process Started') @@ -35,8 +46,6 @@ def camCaptureVid(video_filename, frame_rate, frame_size, image_queue, recording video_out.write(dequeuedImage) except queue.Empty: continue - if not recording.value: - logger.debug('FLIR: Recording.value is False; Ready to exit child process') except Exception as e: logger.error(f'FLIR: Error in save process: {e}') finally: @@ -209,7 +218,7 @@ def record(self): self.stamp = [] while self.recording: - # Exception for failed waiting self.cam.GetNextImage(1000) + # Exception for failed waiting self.cam.GetNextImage(2000) try: im, tsmp = self.imgage_proc() except: @@ -229,10 +238,8 @@ def record(self): self.logger.debug( f"Queue length is {self.image_queue.qsize()} frame count: {self.frame_counter}" ) - self.logger.debug("FLIR: Record loop has exited; Trying to stop.") self.cam.EndAcquisition() recording.value = False - self.logger.debug("FLIR: manager.recording is False; Trying to stop child process.") self.recording = False self.save_process.join() self.logger.debug('FLIR: Exiting LSL Thread') From d5f8c651bb3d201bbbeda2569b963a85a7abcf94 Mon Sep 17 00:00:00 2001 From: Larry White Date: Mon, 27 Jan 2025 12:39:15 -0500 Subject: [PATCH 16/19] added back old thread-based flir logic for comparison --- neurobooth_os/iout/flir_cam.py | 19 ++++++++++++++++++- 1 file changed, 18 insertions(+), 1 deletion(-) diff --git a/neurobooth_os/iout/flir_cam.py b/neurobooth_os/iout/flir_cam.py index c64a9a76..d4862d99 100644 --- a/neurobooth_os/iout/flir_cam.py +++ b/neurobooth_os/iout/flir_cam.py @@ -172,6 +172,23 @@ def createOutlet(self): meta.post_message(Request(source='Flir', destination='CTR', body=msg_body), conn=db_conn) return StreamOutlet(info) + # function to capture images, convert to numpy, send to queue, and release + # from buffer in separate process + def camCaptureVid(self): + self.logger.debug('FLIR: Save Thread Started') + while self.recording or self.image_queue.qsize(): + try: + dequeuedImage = self.image_queue.get(block=True, timeout=1) + self.video_out.write(dequeuedImage) + except queue.Empty: + continue + if not self.recording: + if not self.frame_counter % 1000 and self.image_queue.qsize() > 2: + self.logger.debug( + f"Queue length is {self.image_queue.qsize()} frame count: {self.frame_counter}" + ) + self.logger.debug('FLIR: Exiting Save Thread') + def start(self, name="temp_video"): self.prepare(name) self.video_thread = threading.Thread(target=self.record) @@ -206,7 +223,7 @@ def record(self): with multiprocessing.Manager() as manager: recording = manager.Value('b', True) try: - self.save_process = multiprocessing.Process(target=camCaptureVid, + self.save_process = multiprocessing.Process(target=self.camCaptureVid, args=(self.video_filename, self.FRAME_RATE_OUT, self.frameSize, From 1c22411fad27eb1e627aeb3f229c8e282d7220b2 Mon Sep 17 00:00:00 2001 From: Larry White Date: Mon, 27 Jan 2025 13:35:57 -0500 Subject: [PATCH 17/19] remove test code --- neurobooth_os/iout/flir_cam.py | 19 +------------------ 1 file changed, 1 insertion(+), 18 deletions(-) diff --git a/neurobooth_os/iout/flir_cam.py b/neurobooth_os/iout/flir_cam.py index d4862d99..c64a9a76 100644 --- a/neurobooth_os/iout/flir_cam.py +++ b/neurobooth_os/iout/flir_cam.py @@ -172,23 +172,6 @@ def createOutlet(self): meta.post_message(Request(source='Flir', destination='CTR', body=msg_body), conn=db_conn) return StreamOutlet(info) - # function to capture images, convert to numpy, send to queue, and release - # from buffer in separate process - def camCaptureVid(self): - self.logger.debug('FLIR: Save Thread Started') - while self.recording or self.image_queue.qsize(): - try: - dequeuedImage = self.image_queue.get(block=True, timeout=1) - self.video_out.write(dequeuedImage) - except queue.Empty: - continue - if not self.recording: - if not self.frame_counter % 1000 and self.image_queue.qsize() > 2: - self.logger.debug( - f"Queue length is {self.image_queue.qsize()} frame count: {self.frame_counter}" - ) - self.logger.debug('FLIR: Exiting Save Thread') - def start(self, name="temp_video"): self.prepare(name) self.video_thread = threading.Thread(target=self.record) @@ -223,7 +206,7 @@ def record(self): with multiprocessing.Manager() as manager: recording = manager.Value('b', True) try: - self.save_process = multiprocessing.Process(target=self.camCaptureVid, + self.save_process = multiprocessing.Process(target=camCaptureVid, args=(self.video_filename, self.FRAME_RATE_OUT, self.frameSize, From 8bb27963821452fca3040076dd1cff69cd1133cb Mon Sep 17 00:00:00 2001 From: Larry White Date: Wed, 29 Jan 2025 13:38:41 -0500 Subject: [PATCH 18/19] Insert test code in flir_cam.py. This will be removed --- neurobooth_os/iout/flir_cam.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/neurobooth_os/iout/flir_cam.py b/neurobooth_os/iout/flir_cam.py index c64a9a76..9cd98328 100644 --- a/neurobooth_os/iout/flir_cam.py +++ b/neurobooth_os/iout/flir_cam.py @@ -27,7 +27,7 @@ def camCaptureVid(video_filename, frame_rate, frame_size, image_queue, recording Parameters ---------- - video_filename Name of file to write frames to + video_filename Name of file to write frames to frame_rate Frames per second frame_size Size of each frame image_queue Queue of Flir frames to write (multithreading.Queue) @@ -192,6 +192,11 @@ def prepare(self, name="temp_video"): self.frameSize = (im.shape[1], im.shape[0]) self.video_filename = "{}_flir.avi".format(name) + # TODO: REMOVE THESE NEXT 3 LINES + actual_fname = op.split(self.video_filename)[-1] + fname = os.path.join("D://", "neurobooth", "neurobooth_data", "100001_2025-01-29", actual_fname) + self.video_filename = fname + self.FRAME_RATE_OUT = self.cam.AcquisitionResultingFrameRate() msg_body = NewVideoFile(stream_name=self.streamName, filename=op.split(self.video_filename)[-1]) From 18d5ad89e21ee48f7af8189b30571355ccd79ac8 Mon Sep 17 00:00:00 2001 From: Larry White Date: Wed, 29 Jan 2025 17:13:40 -0500 Subject: [PATCH 19/19] comment-out test code --- neurobooth_os/iout/flir_cam.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/neurobooth_os/iout/flir_cam.py b/neurobooth_os/iout/flir_cam.py index 9cd98328..f00b9f1d 100644 --- a/neurobooth_os/iout/flir_cam.py +++ b/neurobooth_os/iout/flir_cam.py @@ -193,9 +193,9 @@ def prepare(self, name="temp_video"): self.video_filename = "{}_flir.avi".format(name) # TODO: REMOVE THESE NEXT 3 LINES - actual_fname = op.split(self.video_filename)[-1] - fname = os.path.join("D://", "neurobooth", "neurobooth_data", "100001_2025-01-29", actual_fname) - self.video_filename = fname + # actual_fname = op.split(self.video_filename)[-1] + # fname = os.path.join("D://", "neurobooth", "neurobooth_data", "100001_2025-01-29", actual_fname) + # self.video_filename = fname self.FRAME_RATE_OUT = self.cam.AcquisitionResultingFrameRate() msg_body = NewVideoFile(stream_name=self.streamName,