From 68df34b469c207afdcf7be78d2cd47a08fafca8c Mon Sep 17 00:00:00 2001 From: Uwe Klotz Date: Tue, 22 Oct 2024 12:38:09 +0200 Subject: [PATCH] desktop-app: Revise state and task management --- Cargo.lock | 12 ++-- Cargo.toml | 4 +- crates/desktop-app/src/collection/mod.rs | 64 +++++++++++++------ .../desktop-app/src/track/repo_search/mod.rs | 36 +++++------ .../src/track/repo_search/tasklet.rs | 3 +- file-collection-app/src/app/update.rs | 11 +--- file-collection-app/src/library/mod.rs | 35 ++-------- .../src/library/track_search.rs | 4 -- 8 files changed, 79 insertions(+), 90 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index d6b82c16..4dda51c8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3758,18 +3758,18 @@ dependencies = [ [[package]] name = "serde" -version = "1.0.210" +version = "1.0.211" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c8e3592472072e6e22e0a54d5904d9febf8508f65fb8552499a1abc7d1078c3a" +checksum = "1ac55e59090389fb9f0dd9e0f3c09615afed1d19094284d0b200441f13550793" dependencies = [ "serde_derive", ] [[package]] name = "serde_derive" -version = "1.0.210" +version = "1.0.211" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "243902eda00fad750862fc144cea25caca5e20d615af0a81bee94ca738f1df1f" +checksum = "54be4f245ce16bc58d57ef2716271d0d4519e0f6defa147f6e081005bcb278ff" dependencies = [ "proc-macro2", "quote", @@ -4320,9 +4320,9 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" [[package]] name = "tokio" -version = "1.40.0" +version = "1.41.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e2b070231665d27ad9ec9b8df639893f46727666c6767db40317fbe920a5d998" +checksum = "145f3413504347a2be84393cc8a7d2fb4d863b375909ea59f2158261aa258bbb" dependencies = [ "backtrace", "bytes", diff --git a/Cargo.toml b/Cargo.toml index 5eef0971..4d052617 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -51,14 +51,14 @@ jiff = "0.1.13" log = "0.4.22" mime = "0.3.17" regex = "1.11.0" -serde = "1.0.210" +serde = "1.0.211" serde_json = "1.0.132" static_assertions = "1.1.0" strum = "0.26.3" tantivy = "0.22.0" time = "0.3.36" thiserror = "1.0.64" -tokio = "1.40.0" +tokio = "1.41.0" url = "2.5.2" [workspace.lints.rust] diff --git a/crates/desktop-app/src/collection/mod.rs b/crates/desktop-app/src/collection/mod.rs index 084fbf48..e7e70004 100644 --- a/crates/desktop-app/src/collection/mod.rs +++ b/crates/desktop-app/src/collection/mod.rs @@ -13,7 +13,8 @@ use std::{ }; use anyhow::anyhow; -use discro::{Publisher, Subscriber}; +use discro::{Observer, Publisher}; +use tokio::task::JoinHandle; use url::Url; use aoide_backend_embedded::{ @@ -680,7 +681,7 @@ impl State { fn synchronize_vfs_task_joined( &mut self, - joined_task: JoinedTask, + joined_task: SynchronizeVfsTaskJoined, continuation: SynchronizeVfsTaskContinuation, ) -> Result, StateUnchanged> { let SynchronizeVfsTaskContinuation { pending_state } = continuation; @@ -758,6 +759,7 @@ pub struct RefreshFromDbTaskContinuation { pending_state: State, } +/// Context for applying the corresponding [`JoinedTask`]. #[derive(Debug)] pub struct SynchronizeVfsTaskContinuation { pending_state: State, @@ -765,6 +767,8 @@ pub struct SynchronizeVfsTaskContinuation { pub type SynchronizeVfsResult = anyhow::Result; +pub type SynchronizeVfsTaskJoined = JoinedTask; + /// Manages the mutable, observable state #[derive(Debug)] pub struct ObservableState(Observable); @@ -886,7 +890,7 @@ impl ObservableState { fn synchronize_vfs_task_joined( &self, - joined_task: JoinedTask, + joined_task: SynchronizeVfsTaskJoined, continuation: SynchronizeVfsTaskContinuation, ) -> Result, StateUnchanged> { modify_observable_state(&self.0, |state| { @@ -998,13 +1002,16 @@ where .map_err(Into::into) } +/// Background task. +/// +/// Both progress and outcome are observable. #[derive(Debug)] pub struct SynchronizeVfsTask { started_at: Instant, - progress: Subscriber>, - outcome: Subscriber>, + progress: Observer>, + outcome: Observer>, abort_flag: Arc, - abort_handle: tokio::task::AbortHandle, + supervisor_handle: JoinHandle<()>, } impl SynchronizeVfsTask { @@ -1016,9 +1023,9 @@ impl SynchronizeVfsTask { ) -> Result { let started_at = Instant::now(); let progress_pub = Publisher::new(None); - let progress = progress_pub.subscribe(); + let progress = progress_pub.observe(); let outcome_pub = Publisher::new(None); - let outcome = outcome_pub.subscribe(); + let outcome = outcome_pub.observe(); let report_progress_fn = { // TODO: How to avoid wrapping the publisher? let progress_pub = Arc::new(Mutex::new(progress_pub)); @@ -1030,11 +1037,10 @@ impl SynchronizeVfsTask { let (task, continuation) = synchronize_vfs_task(state, handle, report_progress_fn, Arc::clone(&abort_flag))?; let join_handle = rt.spawn(task); - let abort_handle = join_handle.abort_handle(); let state = Arc::clone(state); - // The join task is responsible for updating the state eventually and - // cannot be aborted! It completes after the main task completed. - let join_task = async move { + // The supervisor task is responsible for updating the state eventually. + // It finishes after the main task finished. + let supervisor_task = async move { let joined_task = JoinedTask::join(join_handle).await; log::debug!("Synchronize music directory task joined: {joined_task:?}"); let result = state.synchronize_vfs_task_joined(joined_task, continuation); @@ -1042,13 +1048,13 @@ impl SynchronizeVfsTask { outcome_pub.write(outcome); } }; - rt.spawn(join_task); + let supervisor_handle = rt.spawn(supervisor_task); Ok(Self { started_at, progress, outcome, abort_flag, - abort_handle, + supervisor_handle, }) } @@ -1058,23 +1064,45 @@ impl SynchronizeVfsTask { } #[must_use] - pub const fn progress(&self) -> &Subscriber> { + pub const fn progress(&self) -> &Observer> { &self.progress } #[must_use] - pub const fn outcome(&self) -> &Subscriber> { + pub const fn outcome(&self) -> &Observer> { &self.outcome } #[must_use] pub fn is_finished(&self) -> bool { - self.abort_handle.is_finished() + self.supervisor_handle.is_finished() } pub fn abort(&self) { self.abort_flag.store(true, Ordering::Relaxed); - self.abort_handle.abort(); + // Both tasks should not be cancelled! The inner task will finish + // ASAP after the abort flag has been set. + } + + pub async fn join(self) -> anyhow::Result> { + let Self { + outcome, + supervisor_handle, + .. + } = self; + supervisor_handle + .await + .map(|()| { + let outcome = outcome.read().clone(); + debug_assert!(outcome.is_some()); + outcome + }) + .map_err(|err| { + debug_assert!(outcome.read().is_none()); + // The supervisor task is never cancelled. + debug_assert!(!err.is_cancelled()); + err.into() + }) } } diff --git a/crates/desktop-app/src/track/repo_search/mod.rs b/crates/desktop-app/src/track/repo_search/mod.rs index 5cfb18ef..eec0ad67 100644 --- a/crates/desktop-app/src/track/repo_search/mod.rs +++ b/crates/desktop-app/src/track/repo_search/mod.rs @@ -13,8 +13,7 @@ use aoide_core::{ use aoide_core_api::{track::search::Params, Pagination}; use crate::{ - modify_observable_state, Handle, JoinedTask, Observable, ObservableReader, ObservableRef, - StateUnchanged, + modify_observable_state, Handle, Observable, ObservableReader, ObservableRef, StateUnchanged, }; pub mod tasklet; @@ -211,7 +210,7 @@ impl FetchState { can_fetch_more: bool, ) -> Result<(), StateUnchanged> { let num_fetched_entities = fetched_entities.len(); - log::debug!("Fetching succeeded with {num_fetched_entities} newly fetched entities"); + log::debug!("Fetching more succeeded with {num_fetched_entities} newly fetched entities"); let Self::Pending { fetched_entities_before, @@ -219,7 +218,7 @@ impl FetchState { } = self else { // Not applicable - log::error!("Not pending when fetching succeeded"); + log::error!("Not pending when fetching more succeeded"); return Err(StateUnchanged); }; let expected_offset = fetched_entities_before.as_ref().map_or(0, Vec::len); @@ -270,14 +269,14 @@ impl FetchState { #[allow(clippy::needless_pass_by_value)] fn fetch_more_failed(&mut self, error: anyhow::Error) -> Result<(), StateUnchanged> { - log::warn!("Fetching failed: {error}"); + log::warn!("Fetching more failed: {error}"); let Self::Pending { fetched_entities_before, pending_since: _, } = self else { // No effect - log::error!("Not pending when fetching failed"); + log::error!("Not pending when fetching more failed"); return Err(StateUnchanged); }; let fetched_entities_before = std::mem::take(fetched_entities_before); @@ -288,15 +287,16 @@ impl FetchState { Ok(()) } - fn fetch_more_aborted(&mut self) -> Result<(), StateUnchanged> { - log::debug!("Fetching aborted"); + #[allow(dead_code)] // Currently the task cannot be cancelled. + fn fetch_more_cancelled(&mut self) -> Result<(), StateUnchanged> { + log::debug!("Fetching more cancelled"); let Self::Pending { fetched_entities_before, pending_since: _, } = self else { // No effect - log::error!("Not pending when fetching aborted"); + log::error!("Not pending when fetching more cancelled"); return Err(StateUnchanged); }; let fetched_entities_before = std::mem::take(fetched_entities_before); @@ -605,9 +605,9 @@ impl State { self.fetch.fetch_more() } - fn fetch_more_task_joined( + fn fetch_more_task_completed( &mut self, - joined_tasked: JoinedTask, + result: FetchMoreResult, continuation: FetchMoreTaskContinuation, ) -> Result<(), StateUnchanged> { let FetchMoreTaskContinuation { @@ -625,9 +625,8 @@ impl State { // No effect. return Err(StateUnchanged); } - match joined_tasked { - JoinedTask::Cancelled => self.fetch.fetch_more_aborted(), - JoinedTask::Completed(Ok(fetched_entities)) => { + match result { + Ok(fetched_entities) => { let can_fetch_more = if let Some(limit) = limit { limit.get() <= fetched_entities.len() } else { @@ -640,8 +639,7 @@ impl State { can_fetch_more, ) } - JoinedTask::Completed(Err(err)) => self.fetch.fetch_more_failed(err.into()), - JoinedTask::Panicked(err) => self.fetch.fetch_more_failed(err), + Err(err) => self.fetch.fetch_more_failed(err.into()), } } @@ -762,13 +760,13 @@ impl ObservableState { modify_observable_state(&self.0, |state| state.fetch_more_task(handle, fetch_limit)) } - pub fn fetch_more_task_joined( + pub fn fetch_more_task_completed( &self, - joined_task: JoinedTask, + result: FetchMoreResult, continuation: FetchMoreTaskContinuation, ) -> Result<(), StateUnchanged> { modify_observable_state(&self.0, |state| { - state.fetch_more_task_joined(joined_task, continuation) + state.fetch_more_task_completed(result, continuation) }) } diff --git a/crates/desktop-app/src/track/repo_search/tasklet.rs b/crates/desktop-app/src/track/repo_search/tasklet.rs index f18f13d0..c45b2705 100644 --- a/crates/desktop-app/src/track/repo_search/tasklet.rs +++ b/crates/desktop-app/src/track/repo_search/tasklet.rs @@ -70,8 +70,7 @@ pub fn on_should_prefetch( { log::debug!("Prefetching"); let result = task.await; - let _ = - observable_state.fetch_more_task_joined(result.into(), continuation); + let _ = observable_state.fetch_more_task_completed(result, continuation); } } OnChanged::Continue diff --git a/file-collection-app/src/app/update.rs b/file-collection-app/src/app/update.rs index 1f8328ed..60d5ea83 100644 --- a/file-collection-app/src/app/update.rs +++ b/file-collection-app/src/app/update.rs @@ -186,7 +186,7 @@ impl<'a> UpdateContext<'a> { TrackSearchAction::FetchMore => { memo_state.abort(); debug_assert!(matches!(memo_state, track_search::MemoState::Ready(_))); - library.spawn_fetch_more_track_search_results_task(rt, *msg_tx) + library.fetch_more_track_search_results(rt) } TrackSearchAction::AbortPendingStateChange => { if matches!(memo_state, track_search::MemoState::Pending { .. }) { @@ -320,15 +320,6 @@ impl<'a> UpdateContext<'a> { ctx.request_repaint(); } } - library::track_search::Event::FetchMoreTaskCompleted { - result, - continuation, - } => { - library.on_fetch_more_track_search_results_task_completed( - result, - continuation, - ); - } } } library::Event::MusicDirSyncProgress(progress) => { diff --git a/file-collection-app/src/library/mod.rs b/file-collection-app/src/library/mod.rs index 9c878ec3..4c3dc822 100644 --- a/file-collection-app/src/library/mod.rs +++ b/file-collection-app/src/library/mod.rs @@ -364,7 +364,7 @@ impl Library { }; rt.spawn({ let event_emitter = event_emitter.clone(); - let mut subscriber = task.progress().clone(); + let mut subscriber = task.progress().subscribe(); async move { loop { let progress = subscriber.read_ack().clone(); @@ -382,7 +382,7 @@ impl Library { }); rt.spawn({ let event_emitter = event_emitter.clone(); - let mut subscriber = task.outcome().clone(); + let mut subscriber = task.outcome().subscribe(); async move { loop { let outcome = subscriber.read_ack().clone(); @@ -488,14 +488,10 @@ impl Library { } #[allow(clippy::must_use_candidate)] - pub fn spawn_fetch_more_track_search_results_task( + pub fn fetch_more_track_search_results( &self, tokio_rt: &tokio::runtime::Handle, - event_emitter: &E, - ) -> ActionResponse - where - E: EventEmitter + Clone + 'static, - { + ) -> ActionResponse { let Ok((task, continuation)) = self .state_observables .track_search @@ -504,33 +500,14 @@ impl Library { return ActionResponse::Rejected; }; log::debug!("Fetching more track search results"); - let event_emitter = event_emitter.clone(); + let track_search_state = Arc::clone(&self.state_observables.track_search); tokio_rt.spawn(async move { let result = task.await; - event_emitter - .emit_event( - track_search::Event::FetchMoreTaskCompleted { - result, - continuation, - } - .into(), - ) - .ok(); + let _ = track_search_state.fetch_more_task_completed(result, continuation); }); ActionResponse::Accepted } - pub fn on_fetch_more_track_search_results_task_completed( - &self, - result: track_search::FetchMoreResult, - continuation: track_search::FetchMoreTaskContinuation, - ) { - let _ = self - .state_observables - .track_search - .fetch_more_task_joined(result.into(), continuation); - } - /// Spawn reactive background tasks pub fn spawn_background_tasks(&self, tokio_rt: &tokio::runtime::Handle, settings_dir: PathBuf) { tokio_rt.spawn(settings::tasklet::on_state_changed_save_to_file( diff --git a/file-collection-app/src/library/track_search.rs b/file-collection-app/src/library/track_search.rs index b9ba89f9..b15036a3 100644 --- a/file-collection-app/src/library/track_search.rs +++ b/file-collection-app/src/library/track_search.rs @@ -53,10 +53,6 @@ pub(super) const DEFAULT_PREFETCH_LIMIT: NonZeroUsize = #[derive(Debug)] pub enum Event { StateChanged, - FetchMoreTaskCompleted { - result: FetchMoreResult, - continuation: FetchMoreTaskContinuation, - }, } pub type StateRef<'a> = Ref<'a, State>;