diff --git a/crates/store/re_video/src/decode/ffmpeg.rs b/crates/store/re_video/src/decode/ffmpeg.rs index 95a9e628bef6..d2ee32b46177 100644 --- a/crates/store/re_video/src/decode/ffmpeg.rs +++ b/crates/store/re_video/src/decode/ffmpeg.rs @@ -2,7 +2,7 @@ use std::{ collections::BTreeMap, - io::Write, + process::ChildStdin, sync::{atomic::AtomicBool, Arc}, }; @@ -12,6 +12,7 @@ use ffmpeg_sidecar::{ command::FfmpegCommand, event::{FfmpegEvent, LogLevel}, }; +use parking_lot::Mutex; use crate::Time; @@ -87,6 +88,36 @@ enum FfmpegFrameData { EndOfStream, } +/// Wraps an stdin with a shared shutdown boolean. +struct StdinWithShutdown { + shutdown: Arc, + stdin: ChildStdin, +} + +impl StdinWithShutdown { + // Don't use `std::io::ErrorKind::Interrupted` because it has special meaning for default implementations of the `Write` trait, + // causing it to continue. + const SHUTDOWN_ERROR_KIND: std::io::ErrorKind = std::io::ErrorKind::Other; +} + +impl std::io::Write for StdinWithShutdown { + fn write(&mut self, buf: &[u8]) -> std::io::Result { + if self.shutdown.load(std::sync::atomic::Ordering::Acquire) { + Err(std::io::Error::new(Self::SHUTDOWN_ERROR_KIND, "shutdown")) + } else { + self.stdin.write(buf) + } + } + + fn flush(&mut self) -> std::io::Result<()> { + if self.shutdown.load(std::sync::atomic::Ordering::Acquire) { + Err(std::io::Error::new(Self::SHUTDOWN_ERROR_KIND, "shutdown")) + } else { + self.stdin.flush() + } + } +} + struct FfmpegProcessAndListener { ffmpeg: FfmpegChild, @@ -100,7 +131,10 @@ struct FfmpegProcessAndListener { write_thread: Option>, /// If true, the write thread will not report errors. Used upon exit, so the write thread won't log spam on the hung up stdin. - suppress_write_error_reports: Arc, + stdin_shutdown: Arc, + + /// On output instance used by the threads. + on_output: Arc>>>, } impl FfmpegProcessAndListener { @@ -151,6 +185,12 @@ impl FfmpegProcessAndListener { let (frame_info_tx, frame_info_rx) = crossbeam::channel::unbounded(); let (frame_data_tx, frame_data_rx) = crossbeam::channel::unbounded(); + let stdin_shutdown = Arc::new(AtomicBool::new(false)); + + // Mutex protect `on_output` so that we can shut down the threads at a defined point in time at which we + // no longer receive any new frames or errors from this process. + let on_output = Arc::new(Mutex::new(Some(on_output))); + let listen_thread = std::thread::Builder::new() .name(format!("ffmpeg-reader for {debug_name}")) .spawn({ @@ -166,20 +206,21 @@ impl FfmpegProcessAndListener { } }) .expect("Failed to spawn ffmpeg listener thread"); - - let suppress_write_error_reports = Arc::new(AtomicBool::new(false)); let write_thread = std::thread::Builder::new() .name(format!("ffmpeg-writer for {debug_name}")) .spawn({ + let on_output = on_output.clone(); let ffmpeg_stdin = ffmpeg.take_stdin().ok_or(Error::NoStdin)?; - let suppress_write_error_reports = suppress_write_error_reports.clone(); + let mut ffmpeg_stdin = StdinWithShutdown { + stdin: ffmpeg_stdin, + shutdown: stdin_shutdown.clone(), + }; move || { write_ffmpeg_input( - ffmpeg_stdin, + &mut ffmpeg_stdin, &frame_data_rx, on_output.as_ref(), &avcc, - &suppress_write_error_reports, ); } }) @@ -191,7 +232,8 @@ impl FfmpegProcessAndListener { frame_data_tx, listen_thread: Some(listen_thread), write_thread: Some(write_thread), - suppress_write_error_reports, + stdin_shutdown, + on_output, }) } } @@ -199,30 +241,56 @@ impl FfmpegProcessAndListener { impl Drop for FfmpegProcessAndListener { fn drop(&mut self) { re_tracing::profile_function!(); - self.suppress_write_error_reports - .store(true, std::sync::atomic::Ordering::Relaxed); + + // Stop all outputs from being written to - any attempt from here on out will fail and cause thread shutdown. + // This way, we ensure all ongoing writes are finished and won't get any more on_output callbacks from this process + // before we take any other action on the shutdown sequence. + { + self.on_output.lock().take(); + } + + // Notify (potentially wake up) the stdin write thread to stop it (it might be sleeping). self.frame_data_tx.send(FfmpegFrameData::EndOfStream).ok(); + // Kill stdin for the write thread. This helps cancelling ongoing stream write operations. + self.stdin_shutdown + .store(true, std::sync::atomic::Ordering::Release); + + // Kill the ffmpeg process itself. + // This should wake up the listen thread if it is sleeping, but that may take a while. self.ffmpeg.kill().ok(); - if let Some(write_thread) = self.write_thread.take() { - if write_thread.join().is_err() { - re_log::error!("Failed to join ffmpeg listener thread."); + // Unfortunately, even with the above measures, it can still happen that the listen threads take occasionally 100ms and more to shut down. + // (very much depending on the system & OS, typical times may be low with large outliers) + // It is crucial that the threads come down eventually and rather timely so to avoid leaking resources. + // However, in order to avoid stalls, we'll let them finish in parallel. + // + // Since we disconnected the `on_output` callback from them, they won't influence any new instances. + if false { + { + re_tracing::profile_scope!("shutdown write thread"); + if let Some(write_thread) = self.write_thread.take() { + if write_thread.join().is_err() { + re_log::error!("Failed to join ffmpeg listener thread."); + } + } } - } - if let Some(listen_thread) = self.listen_thread.take() { - if listen_thread.join().is_err() { - re_log::error!("Failed to join ffmpeg listener thread."); + { + re_tracing::profile_scope!("shutdown listen thread"); + if let Some(listen_thread) = self.listen_thread.take() { + if listen_thread.join().is_err() { + re_log::error!("Failed to join ffmpeg listener thread."); + } + } } } } } fn write_ffmpeg_input( - mut ffmpeg_stdin: std::process::ChildStdin, + ffmpeg_stdin: &mut dyn std::io::Write, frame_data_rx: &Receiver, - on_output: &OutputCallback, + on_output: &Mutex>>, avcc: &re_mp4::Avc1Box, - suppress_write_error_reports: &AtomicBool, ) { let mut state = NaluStreamState::default(); @@ -232,19 +300,18 @@ fn write_ffmpeg_input( FfmpegFrameData::EndOfStream => break, }; - if let Err(err) = - write_avc_chunk_to_nalu_stream(avcc, &mut ffmpeg_stdin, &chunk, &mut state) - { - let write_error = matches!(err, Error::FailedToWriteToFfmpeg(_)); - if !write_error - || !suppress_write_error_reports.load(std::sync::atomic::Ordering::Relaxed) - { - (on_output)(Err(err.into())); - } + if let Err(err) = write_avc_chunk_to_nalu_stream(avcc, ffmpeg_stdin, &chunk, &mut state) { + let on_output = on_output.lock(); + if let Some(on_output) = on_output.as_ref() { + let write_error = matches!(err, Error::FailedToWriteToFfmpeg(_)); + on_output(Err(err.into())); - // This is unlikely to improve! Ffmpeg process likely died. - // By exiting here we hang up on the channel, making future attempts to push into it fail which should cause a reset eventually. - if write_error { + if write_error { + // This is unlikely to improve! Ffmpeg process likely died. + // By exiting here we hang up on the channel, making future attempts to push into it fail which should cause a reset eventually. + return; + } + } else { return; } } else { @@ -257,8 +324,8 @@ fn read_ffmpeg_output( debug_name: &str, ffmpeg_iterator: ffmpeg_sidecar::iter::FfmpegIterator, frame_info_rx: &Receiver, - on_output: &OutputCallback, -) { + on_output: &Mutex>>, +) -> Option<()> { /// Ignore some common output from ffmpeg: fn should_ignore_log_msg(msg: &str) -> bool { let patterns = [ @@ -310,19 +377,18 @@ fn read_ffmpeg_output( } FfmpegEvent::Log(LogLevel::Error, msg) => { - on_output(Err(Error::Ffmpeg(msg).into())); + (on_output.lock().as_ref()?)(Err(Error::Ffmpeg(msg).into())); } FfmpegEvent::Log(LogLevel::Fatal, msg) => { - on_output(Err(Error::FfmpegFatal(msg).into())); - return; + (on_output.lock().as_ref()?)(Err(Error::FfmpegFatal(msg).into())); } FfmpegEvent::Log(LogLevel::Unknown, msg) => { if msg.contains("system signals, hard exiting") { // That was probably us, killing the process. re_log::debug!("FFmpeg process for {debug_name} was killed"); - return; + return None; } if !should_ignore_log_msg(&msg) { re_log::warn_once!("{debug_name} decoder: {msg}"); @@ -336,7 +402,7 @@ fn read_ffmpeg_output( FfmpegEvent::Error(error) => { // An error in ffmpeg sidecar itself, rather than ffmpeg. - on_output(Err(Error::FfmpegSidecar(error).into())); + (on_output.lock().as_ref()?)(Err(Error::FfmpegSidecar(error).into())); } FfmpegEvent::ParsedInput(input) => { @@ -423,7 +489,7 @@ fn read_ffmpeg_output( re_log::debug!( "{debug_name} ffmpeg decoder frame info channel disconnected" ); - return; + return None; }; // If the decodetimestamp did not increase, we're probably seeking backwards! @@ -458,7 +524,7 @@ fn read_ffmpeg_output( debug_assert_eq!(pix_fmt, "rgb24"); debug_assert_eq!(width as usize * height as usize * 3, data.len()); - on_output(Ok(super::Frame { + (on_output.lock().as_ref()?)(Ok(super::Frame { content: super::FrameContent { data, width, @@ -476,7 +542,7 @@ fn read_ffmpeg_output( FfmpegEvent::Done => { // This happens on `pkill ffmpeg`, for instance. re_log::debug!("{debug_name}'s ffmpeg is Done"); - return; + return None; } FfmpegEvent::ParsedVersion(ffmpeg_version) => { @@ -497,11 +563,13 @@ fn read_ffmpeg_output( FfmpegEvent::OutputChunk(_) => { // Something went seriously wrong if we end up here. re_log::error!("Unexpected ffmpeg output chunk for {debug_name}"); - on_output(Err(Error::UnexpectedFfmpegOutputChunk.into())); - return; + (on_output.lock().as_ref()?)(Err(Error::UnexpectedFfmpegOutputChunk.into())); + return None; } } } + + Some(()) } /// Decode H.264 video via ffmpeg over CLI @@ -606,20 +674,12 @@ fn write_avc_chunk_to_nalu_stream( // Otherwise the decoder is not able to get the necessary information about how the video stream is encoded. if chunk.is_sync && !state.previous_frame_was_idr { for sps in &avcc.sequence_parameter_sets { - nalu_stream - .write_all(NAL_START_CODE) - .map_err(Error::FailedToWriteToFfmpeg)?; - nalu_stream - .write_all(&sps.bytes) - .map_err(Error::FailedToWriteToFfmpeg)?; + write_bytes(nalu_stream, NAL_START_CODE)?; + write_bytes(nalu_stream, &sps.bytes)?; } for pps in &avcc.picture_parameter_sets { - nalu_stream - .write_all(NAL_START_CODE) - .map_err(Error::FailedToWriteToFfmpeg)?; - nalu_stream - .write_all(&pps.bytes) - .map_err(Error::FailedToWriteToFfmpeg)?; + write_bytes(nalu_stream, NAL_START_CODE)?; + write_bytes(nalu_stream, &pps.bytes)?; } state.previous_frame_was_idr = true; } else {