diff --git a/devolutions-gateway/src/api/jrec.rs b/devolutions-gateway/src/api/jrec.rs
index e87d4d3a..c7c6d288 100644
--- a/devolutions-gateway/src/api/jrec.rs
+++ b/devolutions-gateway/src/api/jrec.rs
@@ -147,14 +147,29 @@ async fn jrec_delete(
     Ok(())
 }
 
+#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
+#[derive(Serialize)]
+pub(crate) struct DeleteManyResult {
+    /// Number of recordings found on this instance
+    found_count: usize,
+    /// Number of recordings not found on this instance
+    not_found_count: usize,
+}
+
 /// Mass-deletes recordings stored on this instance
 ///
 /// If you try to delete more than 1,000,000 recordings at once, you should split the list into multiple requests
-/// to avoid timing out during the validation of the preconditions.
+/// to avoid timing out during the processing of the request.
+///
+/// The request processing consists of:
+/// 1) checking whether any of the recordings is active,
+/// 2) counting the number of recordings not found on this instance.
 ///
-/// The preconditions are:
-/// - No recording to delete is in active state.
-/// - The associated folder for each of the recording to delete must exist on this instance.
+/// When a recording is not found on this instance, a counter is incremented.
+/// This number is returned as part of the response.
+/// You may use this information to detect anomalies on your side.
+/// For instance, a non-zero count suggests the list of recordings on your side is out of date,
+/// and you may want to re-index.
 #[cfg_attr(feature = "openapi", utoipa::path(
     delete,
     operation_id = "DeleteManyRecordings",
@@ -162,11 +177,10 @@ async fn jrec_delete(
     path = "/jet/jrec/delete",
     request_body(content = Vec<Uuid>, description = "JSON-encoded list of session IDs", content_type = "application/json"),
     responses(
-        (status = 200, description = "Mass recording deletion task was successfully started"),
+        (status = 200, description = "Mass recording deletion task was successfully started", body = DeleteManyResult),
         (status = 400, description = "Bad request"),
         (status = 401, description = "Invalid or missing authorization token"),
         (status = 403, description = "Insufficient permissions"),
-        (status = 404, description = "One of the recordings specified in the body was not found"),
         (status = 406, description = "A recording is still ongoing and can't be deleted yet (nothing is deleted)"),
     ),
     security(("scope_token" = ["gateway.recording.delete"])),
@@ -179,44 +193,32 @@ async fn jrec_delete_many(
     }): State<DgwState>,
     _scope: RecordingDeleteScope,
     Json(delete_list): Json<Vec<Uuid>>,
-) -> Result<(), HttpError> {
-    // When deleting many many recordings, this synchronous block may take more than 250ms to execute.
-    let join_handle = tokio::task::spawn_blocking(move || {
-        let active_recordings = recordings.active_recordings.cloned();
-
-        let conflict = delete_list.iter().any(|id| active_recordings.contains(id));
-
-        if conflict {
-            return Err(HttpErrorBuilder::new(StatusCode::CONFLICT)
-                .msg("attempted to delete a recording for an ongoing session"));
-        }
-
-        let conf = conf_handle.get_conf();
-
-        let recording_paths: Vec<Utf8PathBuf> = delete_list
-            .iter()
-            .map(|session_id| conf.recording_path.join(session_id.to_string()))
-            .collect();
-
-        for (path, session_id) in recording_paths.iter().zip(delete_list.iter()) {
-            if !path.exists() {
-                return Err(HttpError::not_found()
-                    .with_msg("attempted to delete a recording not found on this instance")
-                    .err()(format!(
-                        "folder {path} for session {session_id} does not exist"
-                    )));
-            }
-        }
-
-        Ok((delete_list, recording_paths))
-    });
-
-    let (delete_list, recording_paths) = join_handle.await.map_err(HttpError::internal().err())??;
+) -> Result<Json<DeleteManyResult>, HttpError> {
+    use std::collections::HashSet;
+
+    const BLOCKING_THRESHOLD: usize = 100_000;
+
+    let recording_path = conf_handle.get_conf().recording_path.clone();
+    let active_recordings = recordings.active_recordings.cloned();
+
+    // When deleting very many recordings, process_request may take more than 250ms to execute.
+    // For this reason, we defensively spawn a blocking task for large lists.
+    let ProcessResult {
+        not_found_count,
+        found_count,
+        recording_paths,
+    } = if delete_list.len() > BLOCKING_THRESHOLD {
+        let join_handle =
+            tokio::task::spawn_blocking(move || process_request(delete_list, &recording_path, &active_recordings));
+        join_handle.await.map_err(HttpError::internal().err())??
+    } else {
+        process_request(delete_list, &recording_path, &active_recordings)?
+    };
 
     // FIXME: It would be better to have a job queue for this kind of things in case the service is killed.
     tokio::spawn({
         async move {
-            for (path, session_id) in recording_paths.into_iter().zip(delete_list.into_iter()) {
+            for (session_id, path) in recording_paths {
                 if let Err(error) = delete_recording(&path).await {
                     error!(
                         error = format!("{error:#}"),
@@ -227,7 +229,58 @@ async fn jrec_delete_many(
         }
     });
 
-    Ok(())
+    let delete_many_result = DeleteManyResult {
+        found_count,
+        not_found_count,
+    };
+
+    return Ok(Json(delete_many_result));
+
+    struct ProcessResult {
+        not_found_count: usize,
+        found_count: usize,
+        recording_paths: Vec<(Uuid, Utf8PathBuf)>,
+    }
+
+    fn process_request(
+        delete_list: Vec<Uuid>,
+        recording_path: &Utf8Path,
+        active_recordings: &HashSet<Uuid>,
+    ) -> Result<ProcessResult, HttpError> {
+        let conflict = delete_list.iter().any(|id| active_recordings.contains(id));
+
+        if conflict {
+            return Err(HttpErrorBuilder::new(StatusCode::CONFLICT)
+                .msg("attempted to delete a recording for an ongoing session"));
+        }
+
+        let mut not_found_count = 0;
+
+        let recording_paths: Vec<(Uuid, Utf8PathBuf)> = delete_list
+            .into_iter()
+            .filter_map(|session_id| {
+                let path = recording_path.join(session_id.to_string());
+
+                if !path.exists() {
+                    warn!(%path, %session_id, "Attempted to delete a recording not found on this instance");
+                    not_found_count += 1;
+                    None
+                } else {
+                    Some((session_id, path))
+                }
+            })
+            .collect();
+
+        let found_count = recording_paths.len();
+
+        let result = ProcessResult {
+            not_found_count,
+            found_count,
+            recording_paths,
+        };
+
+        Ok(result)
+    }
 }
 
 async fn delete_recording(recording_path: &Utf8Path) -> anyhow::Result<()> {
diff --git a/devolutions-gateway/src/openapi.rs b/devolutions-gateway/src/openapi.rs
index 1b9e6c3a..5fca467c 100644
--- a/devolutions-gateway/src/openapi.rs
+++ b/devolutions-gateway/src/openapi.rs
@@ -39,6 +39,7 @@ use uuid::Uuid;
         crate::api::config::SubProvisionerKey,
         crate::api::config::ConfigPatch,
         crate::api::jrl::JrlInfo,
+        crate::api::jrec::DeleteManyResult,
         crate::token::AccessScope,
         crate::api::webapp::AppTokenSignRequest,
         crate::api::webapp::AppTokenContentType,
diff --git a/devolutions-gateway/src/recording.rs b/devolutions-gateway/src/recording.rs
index 0104b502..cddafbf0 100644
--- a/devolutions-gateway/src/recording.rs
+++ b/devolutions-gateway/src/recording.rs
@@ -504,6 +504,7 @@ impl RecordingManagerTask {
         if recording_file_path.extension() == Some(RecordingFileType::WebM.extension()) {
             if cadeau::xmf::is_init() {
                 debug!(%recording_file_path, "Enqueue video remuxing operation");
+                // FIXME: It would be better to have a job queue for this kind of thing in case the service is killed.
                 tokio::spawn(remux(recording_file_path));
             } else {
                 debug!("Video remuxing was skipped because XMF native library is not loaded");
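
Usage sketch (illustration only, not part of the patch): a hypothetical reqwest-based client exercising the new endpoint. The method, path, token scope, request body, and response fields come from the patch above; the gateway URL, the SCOPE_TOKEN environment variable, and the session IDs are placeholders, and the client-side crates (reqwest with its "json" feature, tokio, serde, uuid with its "serde" feature, anyhow) are assumed dependencies.

use serde::Deserialize;
use uuid::Uuid;

// Mirrors the DeleteManyResult schema introduced by this patch.
#[derive(Deserialize, Debug)]
struct DeleteManyResult {
    found_count: usize,
    not_found_count: usize,
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // Placeholders: substitute a real gateway address, a token carrying the
    // gateway.recording.delete scope, and the session IDs to delete.
    let gateway = "https://gateway.example.com";
    let token = std::env::var("SCOPE_TOKEN")?;
    let delete_list: Vec<Uuid> = vec![Uuid::new_v4()];

    let result: DeleteManyResult = reqwest::Client::new()
        .delete(format!("{gateway}/jet/jrec/delete"))
        .bearer_auth(token)
        .json(&delete_list) // JSON-encoded list of session IDs
        .send()
        .await?
        .error_for_status()? // error if a recording is still ongoing (nothing is deleted)
        .json()
        .await?;

    // A non-zero not_found_count suggests the client-side list of recordings
    // is out of date and may need re-indexing.
    println!(
        "deletion started for {} recording(s); {} not found on this instance",
        result.found_count, result.not_found_count
    );

    Ok(())
}

Note that a 200 response only means the deletion task was started: the actual file removal still happens in the background task spawned by the handler.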