From 2a29e85b735e5c1431b93fc531e04789b6d0e577 Mon Sep 17 00:00:00 2001 From: Uwe Klotz Date: Sun, 20 Oct 2024 15:25:48 +0200 Subject: [PATCH] websrv: Pass Tokio runtime to filters --- Cargo.lock | 31 ++-- Cargo.toml | 4 +- crates/repo-sqlite/Cargo.toml | 2 +- file-collection-app/src/main.rs | 8 +- websrv/src/launcher/mod.rs | 9 +- websrv/src/routing/api.rs | 279 +++++++++++++++++--------------- websrv/src/runtime.rs | 2 + 7 files changed, 179 insertions(+), 156 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 66d97799..b4fac912 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -151,9 +151,9 @@ dependencies = [ [[package]] name = "anyhow" -version = "1.0.89" +version = "1.0.90" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "86fdf8605db99b54d3cd748a44c6d04df638eb5dafb219b135d0149bd0db01f6" +checksum = "37bf3594c4c988a53154954629820791dde498571819ae4ca50ca811e060cc95" [[package]] name = "aoide" @@ -890,9 +890,9 @@ dependencies = [ [[package]] name = "built" -version = "0.7.4" +version = "0.7.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "236e6289eda5a812bc6b53c3b024039382a2895fbbeef2d748b2931546d392c4" +checksum = "c360505aed52b7ec96a3636c3f039d99103c37d1d9b4f7a8c743d3ea9ffcd03b" [[package]] name = "bumpalo" @@ -981,9 +981,9 @@ dependencies = [ [[package]] name = "cc" -version = "1.1.30" +version = "1.1.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b16803a61b81d9eabb7eae2588776c4c1e584b738ede45fdbb4c972cec1e9945" +checksum = "c2e7962b54006dcfcc61cb72735f4d89bb97061dd6a7ed882ec6b8ee53714c6f" dependencies = [ "jobserver", "libc", @@ -1735,9 +1735,9 @@ dependencies = [ [[package]] name = "flume" -version = "0.11.0" +version = "0.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "55ac459de2512911e4b674ce33cf20befaba382d05b62b008afc1c8b57cbf181" +checksum = "da0e4dd2a88388a1f4ccc7c9ce104604dab68d9f408dc34cd45823d5a9069095" dependencies = [ "spin", ] @@ -3789,9 +3789,9 @@ dependencies = [ [[package]] name = "serde_json" -version = "1.0.129" +version = "1.0.132" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6dbcf9b78a125ee667ae19388837dd12294b858d101fdd393cb9d5501ef09eb2" +checksum = "d726bfaff4b320266d395898905d0eba0345aae23b54aee3a737e260fd46db03" dependencies = [ "itoa", "memchr", @@ -4026,9 +4026,9 @@ checksum = "13c2bddecc57b384dee18652358fb23172facb8a2c51ccc10d74c157bdea3292" [[package]] name = "syn" -version = "2.0.79" +version = "2.0.81" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "89132cd0bf050864e1d38dc3bbc07a0eb8e7530af26344d3d2bbbef83499f590" +checksum = "198514704ca887dd5a1e408c6c6cdcba43672f9b4062e1b24aa34e74e6d7faae" dependencies = [ "proc-macro2", "quote", @@ -4516,12 +4516,9 @@ dependencies = [ [[package]] name = "unicase" -version = "2.7.0" +version = "2.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f7d2d4dafb69621809a81864c9c1b864479e1235c0dd4e199924b9742439ed89" -dependencies = [ - "version_check", -] +checksum = "7e51b68083f157f853b6379db119d1c1be0e6e4dec98101079dec41f6f5cf6df" [[package]] name = "unicode-bidi" diff --git a/Cargo.toml b/Cargo.toml index 729b0ab3..5eef0971 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -40,7 +40,7 @@ aoide-websrv-warp-sqlite = { version = "=0.8.0", path = "crates/websrv-warp-sqli # Commonly used dependencies. # Also serves for documenting the opionionated selection of third-party crates. 
aho-corasick = "1.1.3" -anyhow = "1.0.89" +anyhow = "1.0.90" bitflags = "2.6.0" data-encoding = "2.6.0" derive_more = "1.0.0" @@ -52,7 +52,7 @@ log = "0.4.22" mime = "0.3.17" regex = "1.11.0" serde = "1.0.210" -serde_json = "1.0.129" +serde_json = "1.0.132" static_assertions = "1.1.0" strum = "0.26.3" tantivy = "0.22.0" diff --git a/crates/repo-sqlite/Cargo.toml b/crates/repo-sqlite/Cargo.toml index 7a4361d5..4882be5e 100644 --- a/crates/repo-sqlite/Cargo.toml +++ b/crates/repo-sqlite/Cargo.toml @@ -25,7 +25,7 @@ diesel_migrations = { version = "2.2.0", default-features = false, features = [ log.workspace = true mime.workspace = true strum = { workspace = true, features = ["derive"] } -unicase = "2.7.0" +unicase = "2.8.0" url.workspace = true # Workspace dependencies diff --git a/file-collection-app/src/main.rs b/file-collection-app/src/main.rs index bbf86b99..c8131893 100644 --- a/file-collection-app/src/main.rs +++ b/file-collection-app/src/main.rs @@ -30,9 +30,13 @@ pub struct NoReceiverForEvent; #[derive(Debug, Clone, Copy, PartialEq, Eq)] #[must_use] pub enum ActionResponse { - /// Rejected (and unchanged), i.e. no effect. + /// Rejected without any effect. + /// + /// State is unchanged without any side-effects. Rejected, - /// Rejected but maybe changed. + /// Rejected (but maybe already changed). + /// + /// Might have caused a state change or side-effects. RejectedMaybeChanged, /// Accepted (and maybe changed). Accepted, diff --git a/websrv/src/launcher/mod.rs b/websrv/src/launcher/mod.rs index 54f73e48..b79e888e 100644 --- a/websrv/src/launcher/mod.rs +++ b/websrv/src/launcher/mod.rs @@ -117,7 +117,14 @@ impl Launcher { let join_handle = std::thread::spawn({ let tokio_rt = runtime.handle().clone(); let config = config.clone(); - move || tokio_rt.block_on(run(config, runtime_command_rx, current_runtime_state_tx)) + move || { + tokio_rt.block_on(run( + &tokio_rt, + config, + runtime_command_rx, + current_runtime_state_tx, + )) + } }); self.state = InternalState::Running { diff --git a/websrv/src/routing/api.rs b/websrv/src/routing/api.rs index 77475bbc..b70aa7fe 100644 --- a/websrv/src/routing/api.rs +++ b/websrv/src/routing/api.rs @@ -49,6 +49,7 @@ struct CleanseDatabaseQueryParams { #[allow(clippy::too_many_lines)] // TODO pub(crate) fn create_filters( + rt: &tokio::runtime::Handle, shared_connection_gatekeeper: Arc, abort_flag: Arc, ) -> BoxedFilter<(impl Reply,)> { @@ -331,61 +332,65 @@ pub(crate) fn create_filters( .and(shared_connection_gatekeeper.clone()) .and(media_tracker_progress.clone()) .and(abort_flag.clone()) - .and_then( + .and_then({ + let rt = rt.clone(); move |uid, request_body, shared_connection_gatekeeper: Arc, media_tracker_progress: Arc>, - abort_flag: Arc| async move { - let (progress_event_tx, mut progress_event_rx) = watch::channel(None); - let watcher = tokio::spawn(async move { - *media_tracker_progress.lock().await = - MediaTrackerProgress::Scanning(Default::default()); - log::debug!("Watching media tracker scanning"); - while progress_event_rx.changed().await.is_ok() { - let progress = progress_event_rx - .borrow() - .as_ref() - .map(|event: &ScanProgressEvent| event.progress.clone()); - // Borrow has already been released at this point - if let Some(progress) = progress { - *media_tracker_progress.lock().await = - MediaTrackerProgress::Scanning(progress); + abort_flag: Arc| { + let rt = rt.clone(); + async move { + let (progress_event_tx, mut progress_event_rx) = watch::channel(None); + let watcher = rt.spawn(async move { + 
*media_tracker_progress.lock().await = + MediaTrackerProgress::Scanning(Default::default()); + log::debug!("Watching media tracker scanning"); + while progress_event_rx.changed().await.is_ok() { + let progress = progress_event_rx + .borrow() + .as_ref() + .map(|event: &ScanProgressEvent| event.progress.clone()); + // Borrow has already been released at this point + if let Some(progress) = progress { + *media_tracker_progress.lock().await = + MediaTrackerProgress::Scanning(progress); + } } - } - log::debug!("Unwatching media tracker scanning"); - *media_tracker_progress.lock().await = MediaTrackerProgress::Idle; - }); - let response = websrv::spawn_blocking_write_task( - &shared_connection_gatekeeper, - move |mut pooled_connection| { - abort_flag.store(false, Ordering::Relaxed); - api::media::tracker::scan_directories::handle_request( - &mut pooled_connection, - &uid, - request_body, - &mut |progress_event: ScanProgressEvent| { - if let Err(err) = progress_event_tx.send(Some(progress_event)) { - log::error!( + log::debug!("Unwatching media tracker scanning"); + *media_tracker_progress.lock().await = MediaTrackerProgress::Idle; + }); + let response = websrv::spawn_blocking_write_task( + &shared_connection_gatekeeper, + move |mut pooled_connection| { + abort_flag.store(false, Ordering::Relaxed); + api::media::tracker::scan_directories::handle_request( + &mut pooled_connection, + &uid, + request_body, + &mut |progress_event: ScanProgressEvent| { + if let Err(err) = progress_event_tx.send(Some(progress_event)) { + log::error!( "Failed to send media tracker scanning progress event: \ {:?}", err.0 ); - } - }, - &abort_flag, - ) - }, - ) - .await; - if let Err(err) = watcher.await { - log::error!( - "Failed to terminate media tracker scanning progress watcher: {err}" - ); + } + }, + &abort_flag, + ) + }, + ) + .await; + if let Err(err) = watcher.await { + log::error!( + "Failed to terminate media tracker scanning progress watcher: {err}" + ); + } + response.map(|response_body| warp::reply::json(&response_body)) } - response.map(|response_body| warp::reply::json(&response_body)) - }, - ); + } + }); let media_tracker_post_collection_import = warp::post() .and(collections_path) .and(path_param_collection_uid) @@ -396,61 +401,65 @@ pub(crate) fn create_filters( .and(shared_connection_gatekeeper.clone()) .and(media_tracker_progress.clone()) .and(abort_flag.clone()) - .and_then( + .and_then({ + let rt = rt.clone(); move |uid, request_body, shared_connection_gatekeeper: Arc, media_tracker_progress: Arc>, - abort_flag: Arc| async move { - let (progress_event_tx, mut progress_event_rx) = watch::channel(None); - let watcher = tokio::spawn(async move { - *media_tracker_progress.lock().await = - MediaTrackerProgress::Importing(Default::default()); - log::debug!("Watching media tracker importing"); - while progress_event_rx.changed().await.is_ok() { - let progress = progress_event_rx - .borrow() - .as_ref() - .map(|event: &ImportProgressEvent| event.summary.clone()); - // Borrow has already been released at this point - if let Some(progress) = progress { - *media_tracker_progress.lock().await = - MediaTrackerProgress::Importing(progress); + abort_flag: Arc| { + let rt = rt.clone(); + async move { + let (progress_event_tx, mut progress_event_rx) = watch::channel(None); + let watcher = rt.spawn(async move { + *media_tracker_progress.lock().await = + MediaTrackerProgress::Importing(Default::default()); + log::debug!("Watching media tracker importing"); + while progress_event_rx.changed().await.is_ok() { + let 
progress = progress_event_rx + .borrow() + .as_ref() + .map(|event: &ImportProgressEvent| event.summary.clone()); + // Borrow has already been released at this point + if let Some(progress) = progress { + *media_tracker_progress.lock().await = + MediaTrackerProgress::Importing(progress); + } } - } - log::debug!("Unwatching media tracker importing"); - *media_tracker_progress.lock().await = MediaTrackerProgress::Idle; - }); - let response = websrv::spawn_blocking_write_task( - &shared_connection_gatekeeper, - move |mut pooled_connection| { - abort_flag.store(false, Ordering::Relaxed); - api::media::tracker::import_files::handle_request( - &mut pooled_connection, - &uid, - request_body, - &mut |progress_event| { - if let Err(err) = progress_event_tx.send(Some(progress_event)) { - log::error!( + log::debug!("Unwatching media tracker importing"); + *media_tracker_progress.lock().await = MediaTrackerProgress::Idle; + }); + let response = websrv::spawn_blocking_write_task( + &shared_connection_gatekeeper, + move |mut pooled_connection| { + abort_flag.store(false, Ordering::Relaxed); + api::media::tracker::import_files::handle_request( + &mut pooled_connection, + &uid, + request_body, + &mut |progress_event| { + if let Err(err) = progress_event_tx.send(Some(progress_event)) { + log::error!( "Failed to send media tracker importing progress event: \ {:?}", err.0 ); - } - }, - &abort_flag, - ) - }, - ) - .await; - if let Err(err) = watcher.await { - log::error!( - "Failed to terminate media tracker importing progress watcher: {err}" - ); + } + }, + &abort_flag, + ) + }, + ) + .await; + if let Err(err) = watcher.await { + log::error!( + "Failed to terminate media tracker importing progress watcher: {err}" + ); + } + response.map(|response_body| warp::reply::json(&response_body)) } - response.map(|response_body| warp::reply::json(&response_body)) - }, - ); + } + }); let media_tracker_post_collection_untrack = warp::post() .and(collections_path) .and(path_param_collection_uid) @@ -487,62 +496,66 @@ pub(crate) fn create_filters( .and(shared_connection_gatekeeper.clone()) .and(media_tracker_progress) .and(abort_flag.clone()) - .and_then( + .and_then({ + let rt = rt.clone(); move |uid, request_body, shared_connection_gatekeeper: Arc, media_tracker_progress: Arc>, - abort_flag: Arc| async move { - let (progress_event_tx, mut progress_event_rx) = watch::channel(None); - let watcher = tokio::spawn(async move { - *media_tracker_progress.lock().await = - MediaTrackerProgress::FindingUntracked(Default::default()); - log::debug!("Watching media tracker finding untracked"); - while progress_event_rx.changed().await.is_ok() { - let progress = progress_event_rx - .borrow() - .as_ref() - .map(|event: &FindUntrackedProgressEvent| event.progress.clone()); - // Borrow has already been released at this point - if let Some(progress) = progress { - *media_tracker_progress.lock().await = - MediaTrackerProgress::FindingUntracked(progress); + abort_flag: Arc| { + let rt = rt.clone(); + async move { + let (progress_event_tx, mut progress_event_rx) = watch::channel(None); + let watcher = rt.spawn(async move { + *media_tracker_progress.lock().await = + MediaTrackerProgress::FindingUntracked(Default::default()); + log::debug!("Watching media tracker finding untracked"); + while progress_event_rx.changed().await.is_ok() { + let progress = progress_event_rx + .borrow() + .as_ref() + .map(|event: &FindUntrackedProgressEvent| event.progress.clone()); + // Borrow has already been released at this point + if let Some(progress) = 
progress { + *media_tracker_progress.lock().await = + MediaTrackerProgress::FindingUntracked(progress); + } } - } - log::debug!("Unwatching media tracker finding untracked"); - *media_tracker_progress.lock().await = MediaTrackerProgress::Idle; - }); - let response = websrv::spawn_blocking_read_task( - &shared_connection_gatekeeper, - move |mut pooled_connection| { - abort_flag.store(false, Ordering::Relaxed); - api::media::tracker::find_untracked_files::handle_request( - &mut pooled_connection, - &uid, - request_body, - &mut |progress_event: FindUntrackedProgressEvent| { - if let Err(err) = progress_event_tx.send(Some(progress_event)) { - log::error!( + log::debug!("Unwatching media tracker finding untracked"); + *media_tracker_progress.lock().await = MediaTrackerProgress::Idle; + }); + let response = websrv::spawn_blocking_read_task( + &shared_connection_gatekeeper, + move |mut pooled_connection| { + abort_flag.store(false, Ordering::Relaxed); + api::media::tracker::find_untracked_files::handle_request( + &mut pooled_connection, + &uid, + request_body, + &mut |progress_event: FindUntrackedProgressEvent| { + if let Err(err) = progress_event_tx.send(Some(progress_event)) { + log::error!( "Failed to send media tracker finding untracked progress \ event: {:?}", err.0 ); - } - }, - &abort_flag, - ) - }, - ) - .await; - if let Err(err) = watcher.await { - log::error!( + } + }, + &abort_flag, + ) + }, + ) + .await; + if let Err(err) = watcher.await { + log::error!( "Failed to terminate media tracker finding untracked progress watcher: \ {err}" ); + } + response.map(|response_body| warp::reply::json(&response_body)) } - response.map(|response_body| warp::reply::json(&response_body)) - }, - ); + } + }); let media_tracker_filters = media_tracker_get_progress .or(media_tracker_post_collection_scan) .or(media_tracker_post_collection_import) diff --git a/websrv/src/runtime.rs b/websrv/src/runtime.rs index 3d98a9ad..5e11a21b 100644 --- a/websrv/src/runtime.rs +++ b/websrv/src/runtime.rs @@ -73,6 +73,7 @@ fn provision_database(config: &DatabaseConfig) -> anyhow::Result, current_state_tx: discro::Publisher>, @@ -129,6 +130,7 @@ pub(crate) async fn run( let abort_flag = Arc::new(AtomicBool::new(false)); let api_filters = warp::path("api").and(self::routing::api::create_filters( + rt, Arc::clone(&shared_connection_pool), abort_flag, ));
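The recurring change in websrv/src/routing/api.rs is cloning the passed-in tokio::runtime::Handle into each and_then closure and spawning the progress watcher via rt.spawn(...) instead of tokio::spawn(...). A minimal sketch of that handle-cloning pattern follows, independent of warp; the names watch_progress and make_handler, the Option<u32> progress type, and the feature notes are illustrative and not part of the patch.

// Sketch only; assumes tokio with the "rt-multi-thread" and "sync" features.
use tokio::{runtime::Handle, sync::watch, task::JoinHandle};

// Illustrative stand-in for the progress watchers in the patch.
async fn watch_progress(mut rx: watch::Receiver<Option<u32>>) {
    while rx.changed().await.is_ok() {
        // Copy the value out so the borrow is released immediately.
        let progress = *rx.borrow();
        if let Some(progress) = progress {
            println!("progress: {progress}");
        }
    }
    println!("sender dropped, watcher finished");
}

// Mirrors the `let rt = rt.clone();` lines in the patch: the handle is
// cloned into the closure so the watcher is spawned on a known runtime
// instead of relying on `tokio::spawn` and the ambient context.
fn make_handler(rt: &Handle) -> impl Fn() -> JoinHandle<()> {
    let rt = rt.clone();
    move || {
        let (tx, rx) = watch::channel(None);
        let watcher = rt.spawn(watch_progress(rx));
        tx.send(Some(42)).ok();
        drop(tx); // closing the sender lets the watcher loop exit
        watcher
    }
}

fn main() {
    let runtime = tokio::runtime::Runtime::new().expect("failed to build Tokio runtime");
    let handler = make_handler(runtime.handle());
    // Callable from any thread; the filters in the patch `.await` the
    // watcher, here we block on its join handle instead.
    runtime.block_on(handler()).expect("watcher task panicked");
}

Presumably the point of threading the handle through create_filters is to make the spawn target explicit rather than depending on tokio::spawn resolving an ambient runtime context at the call site; the sketch above only demonstrates that cloning pattern, not the warp filter wiring itself.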