Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use multiprocessing for writing Flir frames #465

Draft
wants to merge 20 commits into
base: master
Choose a base branch
from
Draft
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 21 additions & 14 deletions neurobooth_os/iout/flir_cam.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,15 @@
import os
import threading
import uuid
import neurobooth_os.iout.metadator as meta
import logging
import multiprocessing
from typing import Callable, Any

import cv2
import PySpin
from pylsl import StreamInfo, StreamOutlet
import skvideo
import skvideo.io
import h5py

import neurobooth_os.iout.metadator as meta
from neurobooth_os.iout.stim_param_reader import FlirDeviceArgs
from neurobooth_os.iout.stream_utils import DataVersion, set_stream_description
from neurobooth_os.log_manager import APP_LOG_NAME
Expand Down Expand Up @@ -44,6 +42,8 @@ def __init__(
gamma=0.6,
fd=1,
):
self.frame_counter = None
self.save_process = None
self.device_args: FlirDeviceArgs = device_args
# not currently using sizex, sizey --> need to update to use these parameters
# need to read these parameters from database
Expand All @@ -65,8 +65,7 @@ def __init__(

self.get_cam()
self.setup_cam()

self.image_queue = queue.Queue(0)
self.image_queue = multiprocessing.Queue(0)
self.outlet = self.createOutlet()

self.logger.debug(f'FLIR: fps={str(self.device_args.sample_rate())}; '
Expand Down Expand Up @@ -151,7 +150,7 @@ def createOutlet(self):
# function to capture images, convert to numpy, send to queue, and release
# from buffer in separate process
def camCaptureVid(self):
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can create a video writer here (that gets created in the child process instead). this can be the rewritten function:

#rewritten function

def camCaptureVid(self):
    try:
        # Initializing VideoWriter in the child process instead
        fourcc = cv2.VideoWriter_fourcc(*"MJPG")
        video_out = cv2.VideoWriter(
            self.video_filename, fourcc,
            self.FRAME_RATE_OUT, self.frameSize
        )
        self.logger.debug('FLIR: Save Process Started')
 
       #write the frames from the queue as before
       while self.recording or not self.image_queue.empty():
            try:
                # Retrieve frame from the queue
                dequeuedImage = self.image_queue.get(block=True, timeout=1)
                # Write frame to the video file
                video_out.write(dequeuedImage)
            except queue.Empty:
                continue
    except Exception as e:
        self.logger.error(f'FLIR: Error in save process: {e}')
    finally:
        # Safely release video writer
        video_out.release()
        self.logger.debug('FLIR: Exiting Save Process')

self.logger.debug('FLIR: Save Thread Started')
self.logger.debug('FLIR: Save Process Started')
while self.recording or self.image_queue.qsize():
try:
dequeuedImage = self.image_queue.get(block=True, timeout=1)
Expand Down Expand Up @@ -192,11 +191,16 @@ def prepare(self, name="temp_video"):
self.streaming = True

def record(self):

self.logger.debug('FLIR: LSL Thread Started')
self.recording = True
self.frame_counter = 0
self.save_thread = threading.Thread(target=self.camCaptureVid)
self.save_thread.start()

try:
self.save_process = multiprocessing.Process(target = self.camCaptureVid())
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor change here -> passing the function object instead of calling the function.

self.save_process = multiprocessing.Process(target = self.camCaptureVid)

self.save_process.start()
except BaseException as e:
self.logger.error(f'Unable to start Flir save process; error={e}')

self.stamp = []
while self.recording:
Expand All @@ -206,7 +210,12 @@ def record(self):
except:
continue

self.image_queue.put(im)
try:
self.image_queue.put(im)
except BaseException as e:
self.logger.critical(f'Unable to enqueue Flir frame; error={e}')
raise e

self.stamp.append(tsmp)

try:
Expand All @@ -216,17 +225,15 @@ def record(self):
self.outlet = self.createOutlet(self.video_filename)
self.outlet.push_sample([self.frame_counter, tsmp])

# self.video_out.write(im_conv_d)
self.frame_counter += 1

if not self.frame_counter % 1000 and self.image_queue.qsize() > 2:
self.logger.debug(
f"Queue length is {self.image_queue.qsize()} frame count: {self.frame_counter}"
)

self.cam.EndAcquisition()
self.recording = False
self.save_thread.join()
self.save_process.join()
self.video_out.release()
self.logger.debug('FLIR: Video File Released; Exiting LSL Thread')

Expand All @@ -243,7 +250,7 @@ def close(self):

def ensure_stopped(self, timeout_seconds: float) -> None:
"""Check to make sure the recording is actually stopped."""
self.video_thread.join()
self.video_thread.join(timeout_seconds)
if self.video_thread.is_alive():
self.logger.error('FLIR: Potential Zombie Thread Detected!')
raise FlirException('Potential Zombie Thread Detected!')
Expand Down