Skip to content

Commit

Permalink
fix(dgw): adjust delete_many endpoint
Browse files Browse the repository at this point in the history
Instead of failing and returning 404 Not Found when one of the recordings
is not found, we return the number of recordings not found in the
response body with a 200 Ok.

The client may use this information to notify the user about a possible
out of sync state on their side and suggest to rerun an indexing
operation.

Changelog: ignore
  • Loading branch information
CBenoit committed Nov 7, 2024
1 parent 341d455 commit 8fd4ba9
Show file tree
Hide file tree
Showing 3 changed files with 96 additions and 41 deletions.
135 changes: 94 additions & 41 deletions devolutions-gateway/src/api/jrec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,26 +147,40 @@ async fn jrec_delete(
Ok(())
}

#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
#[derive(Serialize)]
pub(crate) struct DeleteManyResult {
/// Number of recordings found
found_count: usize,
/// Number of recordings not found
not_found_count: usize,
}

/// Mass-deletes recordings stored on this instance
///
/// If you try to delete more than 1,000,000 recordings at once, you should split the list into multiple requests
/// to avoid timing out during the validation of the preconditions.
/// to avoid timing out during the processing of the request.
///
/// The request processing consist in
/// 1) checking if one of the recording is active,
/// 2) counting the number of recordings not found on this instance.
///
/// The preconditions are:
/// - No recording to delete is in active state.
/// - The associated folder for each of the recording to delete must exist on this instance.
/// When a recording is not found on this instance, a counter is incremented.
/// This number is returned as part of the response.
/// You may use this information to detect anomalies on your side.
/// For instance, this suggests the list of recordings on your side is out of date,
/// and you may want re-index.
#[cfg_attr(feature = "openapi", utoipa::path(
delete,
operation_id = "DeleteManyRecordings",
tag = "Jrec",
path = "/jet/jrec/delete",
request_body(content = Vec<Uuid>, description = "JSON-encoded list of session IDs", content_type = "application/json"),
responses(
(status = 200, description = "Mass recording deletion task was successfully started"),
(status = 200, description = "Mass recording deletion task was successfully started", body = DeleteManyResult),
(status = 400, description = "Bad request"),
(status = 401, description = "Invalid or missing authorization token"),
(status = 403, description = "Insufficient permissions"),
(status = 404, description = "One of the recordings specified in the body was not found"),
(status = 406, description = "A recording is still ongoing and can't be deleted yet (nothing is deleted)"),
),
security(("scope_token" = ["gateway.recording.delete"])),
Expand All @@ -179,44 +193,32 @@ async fn jrec_delete_many(
}): State<DgwState>,
_scope: RecordingDeleteScope,
Json(delete_list): Json<Vec<Uuid>>,
) -> Result<(), HttpError> {
// When deleting many many recordings, this synchronous block may take more than 250ms to execute.
let join_handle = tokio::task::spawn_blocking(move || {
let active_recordings = recordings.active_recordings.cloned();

let conflict = delete_list.iter().any(|id| active_recordings.contains(id));

if conflict {
return Err(HttpErrorBuilder::new(StatusCode::CONFLICT)
.msg("attempted to delete a recording for an ongoing session"));
}

let conf = conf_handle.get_conf();

let recording_paths: Vec<Utf8PathBuf> = delete_list
.iter()
.map(|session_id| conf.recording_path.join(session_id.to_string()))
.collect();

for (path, session_id) in recording_paths.iter().zip(delete_list.iter()) {
if !path.exists() {
return Err(HttpError::not_found()
.with_msg("attempted to delete a recording not found on this instance")
.err()(format!(
"folder {path} for session {session_id} does not exist"
)));
}
}

Ok((delete_list, recording_paths))
});

let (delete_list, recording_paths) = join_handle.await.map_err(HttpError::internal().err())??;
) -> Result<Json<DeleteManyResult>, HttpError> {
use std::collections::HashSet;

const BLOCKING_THRESHOLD: usize = 100_000;

let recording_path = conf_handle.get_conf().recording_path.clone();
let active_recordings = recordings.active_recordings.cloned();

// When deleting many many recordings, check_preconditions may take more than 250ms to execute.
// For this reason, we defensively spawn a blocking task.
let ProcessResult {
not_found_count,
found_count,
recording_paths,
} = if delete_list.len() > BLOCKING_THRESHOLD {
let join_handle =
tokio::task::spawn_blocking(move || process_request(delete_list, &recording_path, &active_recordings));
join_handle.await.map_err(HttpError::internal().err())??
} else {
process_request(delete_list, &recording_path, &active_recordings)?
};

// FIXME: It would be better to have a job queue for this kind of things in case the service is killed.
tokio::spawn({
async move {
for (path, session_id) in recording_paths.into_iter().zip(delete_list.into_iter()) {
for (session_id, path) in recording_paths {
if let Err(error) = delete_recording(&path).await {
error!(
error = format!("{error:#}"),
Expand All @@ -227,7 +229,58 @@ async fn jrec_delete_many(
}
});

Ok(())
let delete_many_result = DeleteManyResult {
found_count,
not_found_count,
};

return Ok(Json(delete_many_result));

struct ProcessResult {
not_found_count: usize,
found_count: usize,
recording_paths: Vec<(Uuid, Utf8PathBuf)>,
}

fn process_request(
delete_list: Vec<Uuid>,
recording_path: &Utf8Path,
active_recordings: &HashSet<Uuid>,
) -> Result<ProcessResult, HttpError> {
let conflict = delete_list.iter().any(|id| active_recordings.contains(id));

if conflict {
return Err(HttpErrorBuilder::new(StatusCode::CONFLICT)
.msg("attempted to delete a recording for an ongoing session"));
}

let mut not_found_count = 0;

let recording_paths: Vec<(Uuid, Utf8PathBuf)> = delete_list
.into_iter()
.filter_map(|session_id| {
let path = recording_path.join(session_id.to_string());

if !path.exists() {
warn!(%path, %session_id, "Attempted to delete a recording not found on this instance");
not_found_count += 1;
None
} else {
Some((session_id, path))
}
})
.collect();

let found_count = recording_paths.len();

let result = ProcessResult {
not_found_count,
found_count,
recording_paths,
};

Ok(result)
}
}

async fn delete_recording(recording_path: &Utf8Path) -> anyhow::Result<()> {
Expand Down
1 change: 1 addition & 0 deletions devolutions-gateway/src/openapi.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ use uuid::Uuid;
crate::api::config::SubProvisionerKey,
crate::api::config::ConfigPatch,
crate::api::jrl::JrlInfo,
crate::api::jrec::DeleteManyResult,
crate::token::AccessScope,
crate::api::webapp::AppTokenSignRequest,
crate::api::webapp::AppTokenContentType,
Expand Down
1 change: 1 addition & 0 deletions devolutions-gateway/src/recording.rs
Original file line number Diff line number Diff line change
Expand Up @@ -504,6 +504,7 @@ impl RecordingManagerTask {
if recording_file_path.extension() == Some(RecordingFileType::WebM.extension()) {
if cadeau::xmf::is_init() {
debug!(%recording_file_path, "Enqueue video remuxing operation");
// FIXME: It would be better to have a job queue for this kind of things in case the service is killed.
tokio::spawn(remux(recording_file_path));
} else {
debug!("Video remuxing was skipped because XMF native library is not loaded");
Expand Down

0 comments on commit 8fd4ba9

Please sign in to comment.