From c3634ecca84f790b6324a5bde822dcbfbcc6953c Mon Sep 17 00:00:00 2001 From: Uwe Klotz Date: Thu, 24 Oct 2024 17:51:49 +0200 Subject: [PATCH] desktop-app --- Cargo.lock | 20 +- Cargo.toml | 6 +- crates/desktop-app/src/collection/mod.rs | 906 ++++++++++-------- crates/desktop-app/src/collection/tasklet.rs | 78 +- crates/desktop-app/src/fs.rs | 2 + crates/desktop-app/src/handle.rs | 59 -- crates/desktop-app/src/lib.rs | 159 ++- crates/desktop-app/src/settings/mod.rs | 47 +- crates/desktop-app/src/settings/tasklet.rs | 30 +- .../desktop-app/src/track/repo_search/mod.rs | 419 ++++---- .../src/track/repo_search/tasklet.rs | 90 +- crates/repo/src/tag/mod.rs | 69 +- file-collection-app/src/library/mod.rs | 59 +- file-collection-app/src/main.rs | 4 +- 14 files changed, 1036 insertions(+), 912 deletions(-) delete mode 100644 crates/desktop-app/src/handle.rs diff --git a/Cargo.lock b/Cargo.lock index 4dda51c8..48cad633 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -151,9 +151,9 @@ dependencies = [ [[package]] name = "anyhow" -version = "1.0.90" +version = "1.0.91" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "37bf3594c4c988a53154954629820791dde498571819ae4ca50ca811e060cc95" +checksum = "c042108f3ed77fd83760a5fd79b53be043192bb3b9dba91d8c574c0ada7850c8" [[package]] name = "aoide" @@ -3758,18 +3758,18 @@ dependencies = [ [[package]] name = "serde" -version = "1.0.211" +version = "1.0.213" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1ac55e59090389fb9f0dd9e0f3c09615afed1d19094284d0b200441f13550793" +checksum = "3ea7893ff5e2466df8d720bb615088341b295f849602c6956047f8f80f0e9bc1" dependencies = [ "serde_derive", ] [[package]] name = "serde_derive" -version = "1.0.211" +version = "1.0.213" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "54be4f245ce16bc58d57ef2716271d0d4519e0f6defa147f6e081005bcb278ff" +checksum = "7e85ad2009c50b58e87caa8cd6dac16bdf511bbfb7af6c33df902396aa480fa5" dependencies = [ "proc-macro2", "quote", @@ -4232,18 +4232,18 @@ dependencies = [ [[package]] name = "thiserror" -version = "1.0.64" +version = "1.0.65" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d50af8abc119fb8bb6dbabcfa89656f46f84aa0ac7688088608076ad2b459a84" +checksum = "5d11abd9594d9b38965ef50805c5e469ca9cc6f197f883f717e0269a3057b3d5" dependencies = [ "thiserror-impl", ] [[package]] name = "thiserror-impl" -version = "1.0.64" +version = "1.0.65" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "08904e7672f5eb876eaaf87e0ce17857500934f4981c4a0ab2b4aa98baac7fc3" +checksum = "ae71770322cbd277e69d762a16c444af02aa0575ac0d174f0b9562d3b37f8602" dependencies = [ "proc-macro2", "quote", diff --git a/Cargo.toml b/Cargo.toml index 4d052617..3231e7bf 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -40,7 +40,7 @@ aoide-websrv-warp-sqlite = { version = "=0.8.0", path = "crates/websrv-warp-sqli # Commonly used dependencies. # Also serves for documenting the opionionated selection of third-party crates. aho-corasick = "1.1.3" -anyhow = "1.0.90" +anyhow = "1.0.91" bitflags = "2.6.0" data-encoding = "2.6.0" derive_more = "1.0.0" @@ -51,13 +51,13 @@ jiff = "0.1.13" log = "0.4.22" mime = "0.3.17" regex = "1.11.0" -serde = "1.0.211" +serde = "1.0.213" serde_json = "1.0.132" static_assertions = "1.1.0" strum = "0.26.3" tantivy = "0.22.0" time = "0.3.36" -thiserror = "1.0.64" +thiserror = "1.0.65" tokio = "1.41.0" url = "2.5.2" diff --git a/crates/desktop-app/src/collection/mod.rs b/crates/desktop-app/src/collection/mod.rs index e7e70004..a06c2d32 100644 --- a/crates/desktop-app/src/collection/mod.rs +++ b/crates/desktop-app/src/collection/mod.rs @@ -3,7 +3,6 @@ use std::{ borrow::Cow, - future::Future, path::{Path, PathBuf}, sync::{ atomic::{AtomicBool, Ordering}, @@ -14,7 +13,7 @@ use std::{ use anyhow::anyhow; use discro::{Observer, Publisher}; -use tokio::task::JoinHandle; +use tokio::task::{AbortHandle, JoinHandle}; use url::Url; use aoide_backend_embedded::{ @@ -39,10 +38,7 @@ use aoide_core_api::{ use aoide_media_file::io::import::ImportTrackConfig; use aoide_repo::collection::{KindFilter, MediaSourceRootUrlFilter}; -use crate::{ - modify_observable_state, Environment, Handle, JoinedTask, Observable, ObservableReader, - ObservableRef, StateUnchanged, -}; +use crate::{modify_shared_state_infallible, Environment, JoinedTask, Reaction, StateEffect}; pub mod tasklet; @@ -125,13 +121,13 @@ pub struct RestoringFromMusicDirectoryContext { } #[derive(Debug, Clone, PartialEq, Eq)] -pub struct LoadingContext { +pub struct LoadingFromDatabaseContext { pub entity_uid: EntityUid, pub loaded_before: Option, } #[derive(Debug, Clone, PartialEq, Eq)] -pub struct SynchronizingContext { +pub struct SynchronizingVfsContext { pub entity: Entity, } @@ -213,7 +209,7 @@ impl RestoringFromMusicDirectoryContext { uid = entity_with_summary.entity.hdr.uid, title = entity_with_summary.entity.body.title, ); - let state = State::loading_succeeded(entity_with_summary); + let state = State::loading_from_database_succeeded(entity_with_summary); return Ok(state); } if !matches!(nested_music_dirs, NestedMusicDirectoriesStrategy::Permit) { @@ -283,14 +279,14 @@ impl RestoringFromMusicDirectoryContext { uid = entity_with_summary.entity.hdr.uid, title = entity_with_summary.entity.body.title, ); - let state = State::loading_succeeded(entity_with_summary); + let state = State::loading_from_database_succeeded(entity_with_summary); Ok(state) } } } } -#[derive(Debug, Clone, PartialEq, Eq)] +#[derive(Debug)] pub enum RestoreFromMusicDirectoryError { EntityNotFound, Other(String), @@ -306,10 +302,11 @@ impl RestoreFromMusicDirectoryError { } } -#[derive(Debug, Clone, PartialEq, Eq)] +#[derive(Debug)] pub enum RestoringFromMusicDirectoryState { Pending { - pending_since: Instant, + since: Instant, + abort_handle: AbortHandle, }, Failed { error: RestoreFromMusicDirectoryError, @@ -319,23 +316,35 @@ pub enum RestoringFromMusicDirectoryState { }, } -#[derive(Debug, Clone, PartialEq, Eq)] -pub enum LoadingState { - Pending { pending_since: Instant }, - Failed { error: String }, +#[derive(Debug)] +pub enum LoadingFromDatabaseState { + Pending { + since: Instant, + abort_handle: AbortHandle, + }, + Failed { + error: String, + }, } -#[derive(Debug, Clone, PartialEq, Eq)] -pub enum SynchronizingState { - Pending { pending_since: Instant }, - Failed { error: String }, - Succeeded, +#[derive(Debug)] +pub enum SynchronizingVfsState { + Pending { + since: Instant, + task: SynchronizingVfsTask, + }, + Failed { + error: String, + }, + Succeeded { + outcome: Outcome, + }, Aborted, } /// State of a single collection that is based on directory in the /// local directory using a virtual file system (VFS) for content paths. -#[derive(Debug, Clone, Default, PartialEq, Eq)] +#[derive(Debug, Default)] #[allow(clippy::large_enum_variant)] pub enum State { #[default] @@ -344,13 +353,13 @@ pub enum State { context: RestoringFromMusicDirectoryContext, state: RestoringFromMusicDirectoryState, }, - Loading { - context: LoadingContext, - state: LoadingState, + LoadingFromDatabase { + context: LoadingFromDatabaseContext, + state: LoadingFromDatabaseState, }, - Synchronizing { - context: SynchronizingContext, - state: SynchronizingState, + SynchronizingVfs { + context: SynchronizingVfsContext, + state: SynchronizingVfsState, }, Ready { entity: Entity, @@ -363,17 +372,17 @@ impl State { pub const fn pending_since(&self) -> Option { match self { Self::RestoringFromMusicDirectory { - state: RestoringFromMusicDirectoryState::Pending { pending_since, .. }, + state: RestoringFromMusicDirectoryState::Pending { since, .. }, .. } - | Self::Loading { - state: LoadingState::Pending { pending_since, .. }, + | Self::LoadingFromDatabase { + state: LoadingFromDatabaseState::Pending { since, .. }, .. } - | Self::Synchronizing { - state: SynchronizingState::Pending { pending_since, .. }, + | Self::SynchronizingVfs { + state: SynchronizingVfsState::Pending { since, .. }, .. - } => Some(*pending_since), + } => Some(*since), _ => None, } } @@ -385,7 +394,7 @@ impl State { #[must_use] pub const fn is_synchronizing(&self) -> bool { - matches!(self, Self::Synchronizing { .. }) + matches!(self, Self::SynchronizingVfs { .. }) } #[must_use] @@ -397,24 +406,24 @@ impl State { pub fn music_dir(&self) -> Option> { match self { Self::Void - | Self::Loading { + | Self::LoadingFromDatabase { context: - LoadingContext { + LoadingFromDatabaseContext { loaded_before: None, .. }, .. } => None, - Self::Loading { + Self::LoadingFromDatabase { context: - LoadingContext { + LoadingFromDatabaseContext { loaded_before: Some(loaded_before), .. }, .. } => vfs_music_dir(loaded_before), - Self::Synchronizing { - context: SynchronizingContext { entity }, + Self::SynchronizingVfs { + context: SynchronizingVfsContext { entity }, .. } | Self::Ready { entity, .. } => vfs_music_dir(&entity.body), @@ -434,16 +443,16 @@ impl State { pub fn entity_brief(&self) -> Option<(&EntityUid, Option<&Collection>)> { match self { Self::Void | Self::RestoringFromMusicDirectory { .. } => None, - Self::Loading { + Self::LoadingFromDatabase { context: - LoadingContext { + LoadingFromDatabaseContext { entity_uid, loaded_before, }, .. } => Some((entity_uid, loaded_before.as_ref())), - Self::Synchronizing { - context: SynchronizingContext { entity }, + Self::SynchronizingVfs { + context: SynchronizingVfsContext { entity }, .. } | Self::Ready { entity, .. } => Some((&entity.hdr.uid, Some(&entity.body))), @@ -465,35 +474,38 @@ impl State { state: RestoringFromMusicDirectoryState::Failed { error }, .. } => Some(error.as_str()), - Self::Loading { - state: LoadingState::Failed { error }, + Self::LoadingFromDatabase { + state: LoadingFromDatabaseState::Failed { error }, + .. + } => Some(error.as_str()), + Self::SynchronizingVfs { + state: SynchronizingVfsState::Failed { error }, .. } => Some(error.as_str()), _ => None, } } - pub fn reset(&mut self) -> Result<(), StateUnchanged> { + pub fn reset(&mut self) -> StateEffect { if matches!(self, Self::Void) { - return Err(StateUnchanged); + return StateEffect::Unchanged; } let reset = Self::Void; log::debug!("Resetting state: {self:?} -> {reset:?}"); *self = reset; - Ok(()) + StateEffect::Changed } - pub fn update_music_dir( + pub fn spawn_restoring_from_music_directory_task( &mut self, + this: &SharedState, + rt: &tokio::runtime::Handle, + env: &Arc, new_kind: Option>, new_music_dir: DirPath<'_>, restore_entity: RestoreEntityStrategy, nested_music_dirs: NestedMusicDirectoriesStrategy, - ) -> Result<(), StateUnchanged> { - if self.is_pending() { - log::warn!("Illegal state for updating directory: {self:?}"); - return Err(StateUnchanged); - } + ) -> (Reaction, StateEffect) { match self { Self::Ready { entity, .. } => { // When set the `kind` controls the selection of collections by music directory. @@ -505,7 +517,7 @@ impl State { "Music directory unchanged and not updated: {music_dir}", music_dir = new_music_dir.display() ); - return Err(StateUnchanged); + return (Reaction::Accepted, StateEffect::Unchanged); } } } @@ -526,196 +538,388 @@ impl State { "Music directory unchanged and not updated: {music_dir}", music_dir = new_music_dir.display() ); - return Err(StateUnchanged); + return (Reaction::Accepted, StateEffect::Unchanged); } } _ => { // Proceed without any checks. } } + let context = RestoringFromMusicDirectoryContext { kind: new_kind.map(Into::into), music_dir: new_music_dir.into_owned(), restore_entity, nested_music_dirs, }; - let state = RestoringFromMusicDirectoryState::Pending { - pending_since: Instant::now(), + + let pending_since = Instant::now(); + debug_assert_ne!(Some(pending_since), self.pending_since()); + let continuation = RestoringFromMusicDirectoryTaskContinuation { + context: context.clone(), + pending_since, }; - *self = Self::RestoringFromMusicDirectory { context, state }; - Ok(()) + + let worker_task = rt.spawn({ + let env = Arc::clone(env); + let params = LoadStateFromDatabaseParams { + context: Some(context.clone()), + entity_uid: None, + }; + async move { load_state_from_database(env, params).await } + }); + let abort_handle = worker_task.abort_handle(); + rt.spawn({ + let this = this.clone(); + async move { + let worker_joined = JoinedTask::join(worker_task).await; + let _ = + this.restoring_from_music_directory_task_joined(worker_joined, continuation); + } + }); + + *self = Self::RestoringFromMusicDirectory { + context, + state: RestoringFromMusicDirectoryState::Pending { + since: pending_since, + abort_handle, + }, + }; + (Reaction::Accepted, StateEffect::Changed) } - fn refresh_from_db(&mut self) -> Result { + fn spawn_loading_from_database_task( + &mut self, + this: &SharedState, + rt: &tokio::runtime::Handle, + env: &Arc, + ) -> (Reaction, StateEffect) { let old_self = std::mem::replace(self, Self::Void); let context = match old_self { Self::Void => { - return Err(StateUnchanged); + // Nothing to do. + return (Reaction::Accepted, StateEffect::Unchanged); } - Self::RestoringFromMusicDirectory { context, .. } => { - let params = RefreshStateFromDbParams { - entity_uid: None, - context: Some(context.clone()), + Self::RestoringFromMusicDirectory { + context, + state: + RestoringFromMusicDirectoryState::Failed { .. } + | RestoringFromMusicDirectoryState::NestedMusicDirectoriesConflict { .. }, + } => { + let pending_since = Instant::now(); + debug_assert_ne!(Some(pending_since), self.pending_since()); + let continuation = RestoringFromMusicDirectoryTaskContinuation { + context: context.clone(), + pending_since, }; - let state = RestoringFromMusicDirectoryState::Pending { - pending_since: Instant::now(), + + let task = rt.spawn({ + let this = this.clone(); + let env = env.clone(); + let params = LoadStateFromDatabaseParams { + entity_uid: None, + context: Some(context.clone()), + }; + async move { + let result = load_state_from_database(env, params).await; + let _ = this + .restoring_from_music_directory_task_completed(result, continuation); + } + }); + + *self = Self::RestoringFromMusicDirectory { + context, + state: RestoringFromMusicDirectoryState::Pending { + since: pending_since, + task, + }, }; - *self = Self::RestoringFromMusicDirectory { context, state }; - return Ok(params); + + return (Reaction::Accepted, StateEffect::Changed); } - Self::Loading { - state: LoadingState::Failed { .. }, + Self::LoadingFromDatabase { + state: LoadingFromDatabaseState::Failed { .. }, context, } => context, Self::Ready { entity, .. } - | Self::Synchronizing { + | Self::SynchronizingVfs { state: - SynchronizingState::Failed { .. } - | SynchronizingState::Succeeded { .. } - | SynchronizingState::Aborted { .. }, - context: SynchronizingContext { entity, .. }, - } => LoadingContext { + SynchronizingVfsState::Failed { .. } + | SynchronizingVfsState::Succeeded { .. } + | SynchronizingVfsState::Aborted { .. }, + context: SynchronizingVfsContext { entity, .. }, + } => LoadingFromDatabaseContext { entity_uid: entity.raw.hdr.uid, loaded_before: Some(entity.raw.body), }, - _ => { - log::warn!("Illegal state for refreshing from database: {old_self:?}"); + old_self => { + // Restore old state and reject. *self = old_self; - return Err(StateUnchanged); + log::warn!("Illegal state for refreshing from database: {self:?}"); + return (Reaction::Rejected, StateEffect::Unchanged); } }; - let params = self.refresh_from_db_unchecked(context); - Ok(params) + self.spawn_loading_from_database_task_unchecked(this, rt, env, context); + (Reaction::Accepted, StateEffect::Changed) } #[must_use] - fn refresh_from_db_unchecked(&mut self, context: LoadingContext) -> RefreshStateFromDbParams { + fn spawn_loading_from_database_task_unchecked( + &mut self, + this: &SharedState, + rt: &tokio::runtime::Handle, + env: &Arc, + context: LoadingFromDatabaseContext, + ) { debug_assert!(matches!(self, Self::Void)); - let params = RefreshStateFromDbParams { + + let params = LoadStateFromDatabaseParams { entity_uid: Some(context.entity_uid.clone()), context: None, // Omit checking the context. }; - let state = LoadingState::Pending { - pending_since: Instant::now(), + let pending_since = Instant::now(); + let continuation = LoadingFromDatabaseTaskContinuation { + context: context.clone(), + pending_since, }; - *self = Self::Loading { context, state }; - params - } - fn synchronize_vfs(&mut self) -> Result { - let old_self = std::mem::replace(self, Self::Void); - let Self::Ready { entity, .. } = old_self else { - log::warn!("Illegal state for synchronizing: {old_self:?}"); - *self = old_self; - return Err(StateUnchanged); - }; - let entity_uid = entity.hdr.uid.clone(); - let context = SynchronizingContext { entity }; - let state = SynchronizingState::Pending { - pending_since: Instant::now(), + let task = rt.spawn({ + let this = this.clone(); + let env = env.clone(); + async move { + let result = load_state_from_database(env, params).await; + let _ = this.loading_from_database_task_completed(result, continuation); + } + }); + + *self = Self::LoadingFromDatabase { + context, + state: LoadingFromDatabaseState::Pending { + since: pending_since, + task, + }, }; - *self = Self::Synchronizing { context, state }; - Ok(entity_uid) + } + + fn continue_after_restoring_from_music_directory_task_joined( + &mut self, + joined: JoinedTask>, + continuation: RestoringFromMusicDirectoryTaskContinuation, + ) -> (Reaction, StateEffect) { + let RestoringFromMusicDirectoryTaskContinuation { + context: continuation_context, + pending_since: continuation_pending_since, + } = continuation; + match self { + Self::RestoringFromMusicDirectory { + context, + state: + RestoringFromMusicDirectoryState::Pending { + since: pending_since, + abort_handle, + }, + } => { + debug_assert!(abort_handle.is_finished()); + if *pending_since != continuation_pending_since || *context != continuation_context + { + log::warn!( + "State changed while restoring from music directory: current state {self:?}, continuation {continuation:?} - discarding {joined:?}", + continuation = RestoringFromMusicDirectoryTaskContinuation { + context: continuation_context, + pending_since: continuation_pending_since, + } + ); + return (Reaction::Rejected, StateEffect::Unchanged); + } + match joined { + JoinedTask::Completed(Ok(next_state)) => { + log::debug!("Restored state from music directory: {next_state:?}"); + *self = next_state; + } + JoinedTask::Cancelled => { + log::debug!("Restored state from music directory cancelled"); + *self = Self::Void; + } + JoinedTask::Completed(Err(err)) | JoinedTask::Panicked(err) => { + log::warn!("Failed to restore state from music directory: {err}"); + let error = RestoreFromMusicDirectoryError::Other(err.to_string()); + *self = Self::RestoringFromMusicDirectory { + context: continuation_context, + state: RestoringFromMusicDirectoryState::Failed { error }, + }; + } + } + } + _ => { + log::warn!( + "State changed while restoring from music directory: current state {self:?}, continuation {continuation:?} - discarding {joined:?}", + continuation = RestoringFromMusicDirectoryTaskContinuation { + context: continuation_context, + pending_since: continuation_pending_since, + } + ); + return (Reaction::Rejected, StateEffect::Unchanged); + } + } + (Reaction::Accepted, StateEffect::MaybeChanged) + } + + fn continue_after_loading_from_database_task_joined( + &mut self, + joined: JoinedTask>, + continuation: LoadingFromDatabaseTaskContinuation, + ) -> (Reaction, StateEffect) { + let LoadingFromDatabaseTaskContinuation { + context: continuation_context, + pending_since: continuation_pending_since, + } = continuation; + match self { + Self::LoadingFromDatabase { + context: self_context, + state: + LoadingFromDatabaseState::Pending { + since: self_pending_since, + abort_handle, + }, + } => { + debug_assert!(abort_handle.is_finished()); + if *pending_since != continuation_pending_since || *context != continuation_context { + log::warn!( + "State changed while loading from database: current state {self:?}, continuation {self_context:?} - discarding {joined:?}", + continuation = LoadingFromDatabaseTaskContinuation { + context: continuation_context, + pending_since: continuation_pending_since, + } + ); + return (Reaction::Rejected, StateEffect::Unchanged); + } + match joined { + JoinedTask::Completed(Ok(next_state)) => { + log::debug!("Loaded state from database: {next_state:?}"); + *self = next_state; + } + JoinedTask::Completed(Err(err)) | JoinedTask::Panicked(err) => { + log::warn!("Failed to load state from database: {err}"); + let error = err.to_string(); + *self = Self::LoadingFromDatabase { + context, + state: LoadingFromDatabaseState::Failed { error }, + }; + } + } + } + _ => { + log::warn!( + "State changed while loading from database: current {self:?}, continuation {continuation:?} - discarding {result:?}", + continuation = LoadingFromDatabaseTaskContinuation { + context, + pending_since, + } + ); + return (Reaction::Rejected, StateEffect::Unchanged); + } + } + (Reaction::Accepted, StateEffect::MaybeChanged) } #[must_use] - fn loading_succeeded(entity_with_summary: EntityWithSummary) -> Self { + fn loading_from_database_succeeded(entity_with_summary: EntityWithSummary) -> Self { let EntityWithSummary { entity, summary } = entity_with_summary; if let Some(summary) = summary { State::Ready { entity, summary } } else { // Should never happen - let context = LoadingContext { + let context = LoadingFromDatabaseContext { entity_uid: entity.raw.hdr.uid, loaded_before: Some(entity.raw.body), }; - let state = LoadingState::Failed { + let state = LoadingFromDatabaseState::Failed { error: "no summary".to_owned(), }; - Self::Loading { context, state } + Self::LoadingFromDatabase { context, state } } } - fn refresh_from_db_task_completed( + fn spawn_synchronizing_vfs_task( &mut self, - result: anyhow::Result, - continuation: RefreshFromDbTaskContinuation, - ) -> Result<(), StateUnchanged> { - let RefreshFromDbTaskContinuation { pending_state } = continuation; - if pending_state != *self { - log::warn!( - "State changed while refreshing from database: expected {pending_state:?}, actual {self:?} - discarding {result:?}", - ); - return Err(StateUnchanged); - } - match result { - Ok(next_state) => { - if *self == next_state { - return Err(StateUnchanged); - } - log::debug!("Refreshed state from database: {next_state:?}"); - *self = next_state; - } - Err(err) => { - let error = err.to_string(); - match self { - State::RestoringFromMusicDirectory { state, .. } => { - log::warn!("Restoring from music directory failed: {error}"); - let error = RestoreFromMusicDirectoryError::Other(error); - let next_state = RestoringFromMusicDirectoryState::Failed { error }; - debug_assert_ne!(*state, next_state); - *state = next_state; - } - State::Loading { state, .. } => { - log::warn!("Loading failed: {error}"); - let next_state = LoadingState::Failed { error }; - debug_assert_ne!(*state, next_state); - *state = next_state; - } - _ => unreachable!(), - } - } + this: &SharedState, + rt: &tokio::runtime::Handle, + env: &Arc, + ) -> (Reaction, StateEffect) { + let old_self = std::mem::replace(self, Self::Void); + let Self::Ready { entity, .. } = old_self else { + // Restore old state and reject. + log::warn!("Illegal state for synchronizing with local file system: {old_self:?}"); + *self = old_self; + return (Reaction::Rejected, StateEffect::Unchanged); + }; + + let context = SynchronizingVfsContext { entity }; + + let pending_since = Instant::now(); + let continuation = SynchronizingVfsTaskContinuation { + context: context.clone(), + pending_since, + }; + + let task = SynchronizingVfsTask::spawn(rt, env.clone(), this.clone(), continuation); + let state = SynchronizingVfsState::Pending { + since: pending_since, + task, }; - Ok(()) + + *self = Self::SynchronizingVfs { context, state }; + (Reaction::Accepted, StateEffect::Changed) } - fn synchronize_vfs_task_joined( + fn continue_after_synchronizing_vfs_task_completed( &mut self, - joined_task: SynchronizeVfsTaskJoined, - continuation: SynchronizeVfsTaskContinuation, - ) -> Result, StateUnchanged> { - let SynchronizeVfsTaskContinuation { pending_state } = continuation; - debug_assert!(matches!( - pending_state, - State::Synchronizing { - state: SynchronizingState::Pending { .. }, - .. - } - )); - if pending_state != *self { + result: anyhow::Result, + continuation: SynchronizingVfsTaskContinuation, + ) -> (Reaction, StateEffect) { + let Self::SynchronizingVfs { + state: + SynchronizingVfsState::Pending { + since: pending_since, + task: _, + }, + context, + } = self + else { log::warn!( - "State changed while synchronizing: expected {pending_state:?}, actual {self:?}", + "State changed while synchronizing: current state {self:?}, continuation {continuation:?} - discarding {result:?}", ); - return Err(StateUnchanged); - } - let Self::Synchronizing { state, .. } = self else { - unreachable!("illegal state"); + return (Reaction::Rejected, StateEffect::Unchanged); }; - let mut outcome = None; - let next_state = match joined_task { - JoinedTask::Cancelled => SynchronizingState::Aborted, - JoinedTask::Completed(Ok(ok)) => { - outcome = Some(ok); - SynchronizingState::Succeeded - } - JoinedTask::Completed(Err(err)) | JoinedTask::Panicked(err) => { + let SynchronizingVfsTaskContinuation { + context: continuation_context, + pending_since: continuation_pending_since, + } = continuation; + if continuation_pending_since != *pending_since || continuation_context != *context { + log::warn!( + "State changed while synchronizing: current state {self:?}, continuation {continuation:?} - discarding {result:?}", + continuation = SynchronizingVfsTaskContinuation { + context: continuation_context, + pending_since: continuation_pending_since, + } + ); + return (Reaction::Rejected, StateEffect::Unchanged); + } + + let next_state = match result { + Ok(outcome) => SynchronizingVfsState::Succeeded { outcome }, + Err(err) => { let error = err.to_string(); - SynchronizingState::Failed { error } + SynchronizingVfsState::Failed { error } } + JoinedTask::Cancelled => SynchronizingVfsState::Aborted, }; - debug_assert_ne!(*state, next_state); - *state = next_state; - Ok(outcome) + *self = Self::SynchronizingVfs { + state: next_state, + context: continuation_context, + }; + + (Reaction::Accepted, StateEffect::Changed) } /// Map an URL to the corresponding content path. @@ -752,40 +956,53 @@ impl State { } } -pub type StateSubscriber = discro::Subscriber; +pub type SharedStateSubscriber = discro::Subscriber; + +#[derive(Debug)] +pub struct RestoringFromMusicDirectoryTaskContinuation { + context: RestoringFromMusicDirectoryContext, + pending_since: Instant, +} #[derive(Debug)] -pub struct RefreshFromDbTaskContinuation { - pending_state: State, +pub struct LoadingFromDatabaseTaskContinuation { + context: LoadingFromDatabaseContext, + pending_since: Instant, } -/// Context for applying the corresponding [`JoinedTask`]. #[derive(Debug)] -pub struct SynchronizeVfsTaskContinuation { - pending_state: State, +pub struct SynchronizingVfsTaskContinuation { + context: SynchronizingVfsContext, + pending_since: Instant, } pub type SynchronizeVfsResult = anyhow::Result; -pub type SynchronizeVfsTaskJoined = JoinedTask; +pub type SynchronizingVfsTaskJoined = JoinedTask; /// Manages the mutable, observable state -#[derive(Debug)] -pub struct ObservableState(Observable); +#[derive(Debug, Default)] +pub struct SharedState(Publisher); -impl ObservableState { +impl Clone for SharedState { + fn clone(&self) -> Self { + Self(self.0.clone()) + } +} + +impl SharedState { #[must_use] pub fn new(initial_state: State) -> Self { - Self(Observable::new(initial_state)) + Self(Publisher::new(initial_state)) } #[must_use] - pub fn read(&self) -> ObservableStateRef<'_> { + pub fn read(&self) -> SharedStateRef<'_> { self.0.read() } #[must_use] - pub fn subscribe_changed(&self) -> StateSubscriber { + pub fn subscribe_changed(&self) -> SharedStateSubscriber { self.0.subscribe_changed() } @@ -793,137 +1010,106 @@ impl ObservableState { self.0.set_modified(); } - pub fn reset(&self) -> Result<(), StateUnchanged> { - modify_observable_state(&self.0, State::reset) + pub fn reset(&self) -> StateEffect { + modify_shared_state_infallible(&self.0, |state| (Reaction::Accepted, state.reset())).1 } - fn update_music_dir( + #[must_use] + pub fn spawn_restoring_from_music_directory_task( &self, + rt: &tokio::runtime::Handle, + env: &Arc, kind: Option>, new_music_dir: Option>, restore_entity: RestoreEntityStrategy, nested_music_dirs: NestedMusicDirectoriesStrategy, - ) -> Result<(), StateUnchanged> { + ) -> (Reaction, StateEffect) { let Some(new_music_dir) = new_music_dir else { - log::debug!("Resetting music directory"); - return self.reset(); + return (Reaction::Accepted, self.reset()); }; - log::debug!( - "Updating music directory: {new_music_dir}", - new_music_dir = new_music_dir.display() - ); - modify_observable_state(&self.0, |state| { - state.update_music_dir(kind, new_music_dir, restore_entity, nested_music_dirs) + modify_shared_state_infallible(&self.0, |state| { + state.spawn_restoring_from_music_directory_task( + self, + rt, + env, + kind, + new_music_dir, + restore_entity, + nested_music_dirs, + ) }) } - pub fn refresh_from_db_task( + #[must_use] + pub fn spawn_loading_from_database_task( &self, - handle: &Handle, - ) -> Result< - ( - impl Future> + Send + 'static, - RefreshFromDbTaskContinuation, - ), - StateUnchanged, - > { - let (pending_state, params) = modify_observable_state(&self.0, |state: &mut State| { - let params = state.refresh_from_db()?; - debug_assert!(state.is_pending()); - let pending_state = state.clone(); - Ok((pending_state, params)) - })?; - let handle = handle.clone(); - let task = async move { refresh_state_from_db(handle, params).await }; - let continuation = RefreshFromDbTaskContinuation { pending_state }; - Ok((task, continuation)) + rt: &tokio::runtime::Handle, + env: &Arc, + ) -> (Reaction, StateEffect) { + modify_shared_state_infallible(&self.0, |state| { + state.spawn_loading_from_database_task(self, rt, env) + }) } - pub fn refresh_from_db_task_completed( + #[must_use] + fn continue_after_restoring_from_music_directory_task_completed( &self, result: anyhow::Result, - continuation: RefreshFromDbTaskContinuation, - ) -> Result<(), StateUnchanged> { - modify_observable_state(&self.0, |state| { - state.refresh_from_db_task_completed(result, continuation) + continuation: RestoringFromMusicDirectoryTaskContinuation, + ) -> (Reaction, StateEffect) { + modify_shared_state_infallible(&self.0, |state| { + state.continue_after_restoring_from_music_directory_task_completed(result, continuation) }) } - fn synchronize_vfs_task( + #[must_use] + fn continue_after_loading_from_database_task_completed( &self, - handle: &Handle, - import_track_config: ImportTrackConfig, - report_progress_fn: ReportProgressFn, - abort_flag: Arc, - ) -> Result< - ( - impl Future + Send + 'static, - SynchronizeVfsTaskContinuation, - ), - StateUnchanged, - > - where - ReportProgressFn: - FnMut(batch::synchronize_collection_vfs::Progress) + Clone + Send + 'static, - { - let (pending_state, entity_uid) = modify_observable_state(&self.0, |state| { - let entity_uid = state.synchronize_vfs()?; - debug_assert!(state.is_pending()); - let pending_state = state.clone(); - Ok((pending_state, entity_uid)) - })?; - debug_assert!(matches!(pending_state, State::Synchronizing { .. })); - let handle = handle.clone(); - let task = async move { - synchronize_vfs( - handle, - entity_uid, - import_track_config, - report_progress_fn, - abort_flag, - ) - .await - }; - let continuation = SynchronizeVfsTaskContinuation { pending_state }; - Ok((task, continuation)) + result: anyhow::Result, + continuation: LoadingFromDatabaseTaskContinuation, + ) -> (Reaction, StateEffect) { + modify_shared_state_infallible(&self.0, |state| { + state.continue_after_loading_from_database_task_completed(result, continuation) + }) } - fn synchronize_vfs_task_joined( + fn spawn_synchronizing_vfs_task( &self, - joined_task: SynchronizeVfsTaskJoined, - continuation: SynchronizeVfsTaskContinuation, - ) -> Result, StateUnchanged> { - modify_observable_state(&self.0, |state| { - state.synchronize_vfs_task_joined(joined_task, continuation) + rt: &tokio::runtime::Handle, + env: &Arc, + ) -> (Reaction, StateEffect) { + modify_shared_state_infallible(&self.0, |state| { + state.spawn_synchronizing_vfs_task(self, rt, env) }) } -} -impl Default for ObservableState { - fn default() -> Self { - Self::new(Default::default()) + fn continue_after_synchronizing_vfs_task_completed( + &self, + result: anyhow::Result, + continuation: SynchronizingVfsTaskContinuation, + ) -> (Reaction, StateEffect) { + modify_shared_state_infallible(&self.0, |state| { + state.continue_after_synchronizing_vfs_task_completed(result, continuation) + }) } } -pub type ObservableStateRef<'a> = ObservableRef<'a, State>; - -impl ObservableReader for ObservableState { - fn read_lock(&self) -> ObservableStateRef<'_> { - self.0.read_lock() - } -} +pub type SharedStateRef<'a> = discro::Ref<'a, State>; #[derive(Debug, Clone)] -struct RefreshStateFromDbParams { +struct LoadStateFromDatabaseParams { entity_uid: Option, context: Option, } -async fn refresh_state_from_db(env: E, params: RefreshStateFromDbParams) -> anyhow::Result +async fn load_state_from_database( + env: E, + params: LoadStateFromDatabaseParams, +) -> anyhow::Result where E: AsRef + Send + 'static, { - let RefreshStateFromDbParams { + let LoadStateFromDatabaseParams { entity_uid, context, } = params; @@ -936,19 +1122,19 @@ where return Ok(entity_with_summary.map_or_else( || { if let Some(entity_uid) = entity_uid { - let context = LoadingContext { + let context = LoadingFromDatabaseContext { entity_uid, loaded_before: None, }; - let state = LoadingState::Failed { + let state = LoadingFromDatabaseState::Failed { error: "not found".to_owned(), }; - State::Loading { context, state } + State::LoadingFromDatabase { context, state } } else { State::Void } }, - State::loading_succeeded, + State::loading_from_database_succeeded, )); }; if let Some(entity_with_summary) = entity_with_summary { @@ -958,7 +1144,7 @@ where if kind.is_none() || kind == &entity_with_summary.entity.body.kind { let entity_music_dir = vfs_music_dir(&entity_with_summary.entity.body); if entity_music_dir.as_ref() == Some(music_dir) { - return Ok(State::loading_succeeded(entity_with_summary)); + return Ok(State::loading_from_database_succeeded(entity_with_summary)); } } log::debug!( @@ -1002,65 +1188,59 @@ where .map_err(Into::into) } -/// Background task. -/// -/// Both progress and outcome are observable. #[derive(Debug)] -pub struct SynchronizeVfsTask { - started_at: Instant, +pub struct SynchronizingVfsTask { progress: Observer>, - outcome: Observer>, abort_flag: Arc, - supervisor_handle: JoinHandle<()>, + task: JoinHandle<()>, } -impl SynchronizeVfsTask { +impl SynchronizingVfsTask { #[allow(clippy::missing_panics_doc)] - pub fn spawn( + fn spawn( rt: &tokio::runtime::Handle, - handle: &Handle, - state: &Arc, - ) -> Result { - let started_at = Instant::now(); + env: Arc, + state: SharedState, + continuation: SynchronizingVfsTaskContinuation, + ) -> Self { let progress_pub = Publisher::new(None); let progress = progress_pub.observe(); - let outcome_pub = Publisher::new(None); - let outcome = outcome_pub.observe(); - let report_progress_fn = { - // TODO: How to avoid wrapping the publisher? - let progress_pub = Arc::new(Mutex::new(progress_pub)); - move |progress: Option| { - progress_pub.lock().unwrap().write(progress); - } - }; let abort_flag = Arc::new(AtomicBool::new(false)); - let (task, continuation) = - synchronize_vfs_task(state, handle, report_progress_fn, Arc::clone(&abort_flag))?; - let join_handle = rt.spawn(task); - let state = Arc::clone(state); - // The supervisor task is responsible for updating the state eventually. - // It finishes after the main task finished. - let supervisor_task = async move { - let joined_task = JoinedTask::join(join_handle).await; - log::debug!("Synchronize music directory task joined: {joined_task:?}"); - let result = state.synchronize_vfs_task_joined(joined_task, continuation); - if let Ok(outcome) = result { - outcome_pub.write(outcome); + + let task = rt.spawn({ + let report_progress_fn = { + // TODO: How to avoid wrapping the publisher? + let progress_pub = Arc::new(Mutex::new(progress_pub)); + move |progress: Progress| { + progress_pub.lock().unwrap().write(Some(progress)); + } + }; + let import_track_config = ImportTrackConfig { + // TODO: Customize faceted tag mapping + faceted_tag_mapping: predefined_faceted_tag_mapping_config(), + ..Default::default() + }; + let abort_flag = Arc::clone(&abort_flag); + let entity_uid = continuation.context.entity.hdr.uid.clone(); + async move { + log::debug!("Synchronizing collection with local file system..."); + let result = synchronize_vfs( + env, + entity_uid, + import_track_config, + report_progress_fn, + abort_flag, + ) + .await; + let _ = state.continue_after_synchronizing_vfs_task_completed(result, continuation); } - }; - let supervisor_handle = rt.spawn(supervisor_task); - Ok(Self { - started_at, + }); + + Self { progress, - outcome, abort_flag, - supervisor_handle, - }) - } - - #[must_use] - pub const fn started_at(&self) -> Instant { - self.started_at + task, + } } #[must_use] @@ -1068,65 +1248,9 @@ impl SynchronizeVfsTask { &self.progress } - #[must_use] - pub const fn outcome(&self) -> &Observer> { - &self.outcome - } - - #[must_use] - pub fn is_finished(&self) -> bool { - self.supervisor_handle.is_finished() - } - pub fn abort(&self) { self.abort_flag.store(true, Ordering::Relaxed); // Both tasks should not be cancelled! The inner task will finish // ASAP after the abort flag has been set. } - - pub async fn join(self) -> anyhow::Result> { - let Self { - outcome, - supervisor_handle, - .. - } = self; - supervisor_handle - .await - .map(|()| { - let outcome = outcome.read().clone(); - debug_assert!(outcome.is_some()); - outcome - }) - .map_err(|err| { - debug_assert!(outcome.read().is_none()); - // The supervisor task is never cancelled. - debug_assert!(!err.is_cancelled()); - err.into() - }) - } -} - -fn synchronize_vfs_task( - state: &ObservableState, - handle: &Handle, - report_progress_fn: impl FnMut(Option) + Clone + Send + 'static, - abort_flag: Arc, -) -> Result< - ( - impl Future> + Send + 'static, - SynchronizeVfsTaskContinuation, - ), - StateUnchanged, -> { - log::debug!("Synchronizing collection..."); - let import_track_config = ImportTrackConfig { - // TODO: Customize faceted tag mapping - faceted_tag_mapping: predefined_faceted_tag_mapping_config(), - ..Default::default() - }; - let mut report_progress_fn = report_progress_fn.clone(); - let report_progress_fn = move |progress| { - report_progress_fn(Some(progress)); - }; - state.synchronize_vfs_task(handle, import_track_config, report_progress_fn, abort_flag) } diff --git a/crates/desktop-app/src/collection/tasklet.rs b/crates/desktop-app/src/collection/tasklet.rs index d464f57a..62617e20 100644 --- a/crates/desktop-app/src/collection/tasklet.rs +++ b/crates/desktop-app/src/collection/tasklet.rs @@ -10,19 +10,20 @@ use unnest::some_or_break; use aoide_core::util::fs::DirPath; -use super::{NestedMusicDirectoriesStrategy, ObservableState, RestoreEntityStrategy}; -use crate::{settings, Handle, StateUnchanged, WeakHandle}; +use super::{NestedMusicDirectoriesStrategy, RestoreEntityStrategy, SharedState}; +use crate::{settings, Environment, StateUnchanged}; async fn update_music_dir( - settings_state: &settings::ObservableState, - observable_state: &ObservableState, - handle: Handle, + rt: &tokio::runtime::Handle, + env: &Arc, + this: &SharedState, + settings_state: &settings::SharedState, music_dir: Option>, collection_kind: Option, restore_entity: RestoreEntityStrategy, nested_music_directories: NestedMusicDirectoriesStrategy, ) { - if let Err(StateUnchanged) = observable_state.update_music_dir( + if let Err(StateUnchanged) = this.update_music_dir( collection_kind.map(Into::into), music_dir, restore_entity, @@ -30,15 +31,19 @@ async fn update_music_dir( ) { return; } - if let Ok((task, continuation)) = observable_state.refresh_from_db_task(&handle) { - log::debug!("Refreshing from DB after updating music directory"); - let result = task.await; - let _ = observable_state.refresh_from_db_task_completed(result, continuation); + match this.spawn_refresh_from_db_task(rt, env) { + Ok(()) => { + // The refresh task will finish in the background. + log::debug!("Refreshing from DB after updating music directory"); + } + Err(StateUnchanged) => { + log::debug!("Not refreshed from DB after updating music directory"); + } } // After succeeded read the actual music directory from the collection state // and feed it back into the settings state. let new_music_dir = { - let state = observable_state.read(); + let state = this.read(); if !state.is_ready() { return; } @@ -56,9 +61,10 @@ async fn update_music_dir( } pub fn on_settings_state_changed( - settings_state: &Arc, - observable_state: Weak, - handle: WeakHandle, + rt: tokio::runtime::Handle, + env: Weak, + this: Weak, + settings_state: &Arc, restore_entity: RestoreEntityStrategy, nested_music_directories: NestedMusicDirectoriesStrategy, ) -> impl Future + Send + 'static { @@ -67,34 +73,32 @@ pub fn on_settings_state_changed( async move { log::debug!("Starting on_settings_state_changed"); loop { - { - let settings_state = some_or_break!(settings_state.upgrade()); - let observable_state = some_or_break!(observable_state.upgrade()); - let handle = some_or_break!(handle.upgrade()); - let (music_dir, collection_kind) = { - let settings_state = settings_state_sub.read_ack(); - let music_dir = settings_state.music_dir().cloned().map(DirPath::into_owned); - let collection_kind = settings_state.collection_kind.clone(); - (music_dir, collection_kind) - }; - update_music_dir( - &settings_state, - &observable_state, - handle, - music_dir, - collection_kind, - restore_entity, - nested_music_directories, - ) - .await; - } log::debug!("Suspending on_settings_state_changed"); if settings_state_sub.changed().await.is_err() { - // Publisher disappeared + // No publisher(s). break; } log::debug!("Resuming on_settings_state_changed"); + let env = some_or_break!(env.upgrade()); + let this = some_or_break!(this.upgrade()); + let settings_state = some_or_break!(settings_state.upgrade()); + let (music_dir, collection_kind) = { + let settings_state = settings_state_sub.read_ack(); + let music_dir = settings_state.music_dir().cloned().map(DirPath::into_owned); + let collection_kind = settings_state.collection_kind.clone(); + (music_dir, collection_kind) + }; + update_music_dir( + &rt, + &env, + &this, + &settings_state, + music_dir, + collection_kind, + restore_entity, + nested_music_directories, + ) + .await; } - log::debug!("Stopping on_settings_state_changed"); } } diff --git a/crates/desktop-app/src/fs.rs b/crates/desktop-app/src/fs.rs index 8a4686b6..0ae42859 100644 --- a/crates/desktop-app/src/fs.rs +++ b/crates/desktop-app/src/fs.rs @@ -1,6 +1,8 @@ // SPDX-FileCopyrightText: Copyright (C) 2018-2024 Uwe Klotz et al. // SPDX-License-Identifier: AGPL-3.0-or-later +//! File system utilities. + #[cfg(feature = "async-file-dialog")] pub async fn choose_directory( dir_path: impl Into>, diff --git a/crates/desktop-app/src/handle.rs b/crates/desktop-app/src/handle.rs deleted file mode 100644 index 86624d24..00000000 --- a/crates/desktop-app/src/handle.rs +++ /dev/null @@ -1,59 +0,0 @@ -// SPDX-FileCopyrightText: Copyright (C) 2018-2024 Uwe Klotz et al. -// SPDX-License-Identifier: AGPL-3.0-or-later - -use std::{ - ops::Deref, - sync::{Arc, Weak}, -}; - -use aoide_backend_embedded::storage::DatabaseConfig; - -use crate::Environment; - -/// Shared runtime environment handle -/// -/// A cheaply `Clone`able and `Send`able handle to a shared runtime environment -/// for invoking operations. -#[derive(Clone)] -#[allow(missing_debug_implementations)] -pub struct Handle(Arc); - -impl Handle { - /// Set up a shared runtime environment - /// - /// See also: [`Environment::commission()`] - pub fn commission(db_config: &DatabaseConfig) -> anyhow::Result { - let context = Environment::commission(db_config)?; - Ok(Self(Arc::new(context))) - } - - #[must_use] - pub fn downgrade(&self) -> WeakHandle { - WeakHandle(Arc::downgrade(&self.0)) - } -} - -impl AsRef for Handle { - fn as_ref(&self) -> &Environment { - &self.0 - } -} - -impl Deref for Handle { - type Target = Environment; - - fn deref(&self) -> &Self::Target { - &self.0 - } -} - -#[derive(Clone)] -#[allow(missing_debug_implementations)] -pub struct WeakHandle(Weak); - -impl WeakHandle { - #[must_use] - pub fn upgrade(&self) -> Option { - self.0.upgrade().map(Handle) - } -} diff --git a/crates/desktop-app/src/lib.rs b/crates/desktop-app/src/lib.rs index ab6ab09f..670bc3dd 100644 --- a/crates/desktop-app/src/lib.rs +++ b/crates/desktop-app/src/lib.rs @@ -1,18 +1,13 @@ // SPDX-FileCopyrightText: Copyright (C) 2018-2024 Uwe Klotz et al. // SPDX-License-Identifier: AGPL-3.0-or-later -use std::ops::Deref; +use std::ops::{Add, AddAssign}; -use discro::{Publisher, Ref, Subscriber}; +use discro::Publisher; use tokio::task::JoinHandle; -mod handle; -pub use self::handle::{Handle, WeakHandle}; - -// Re-export the embedded backend environment that is referenced by [`Handle`]. pub use aoide_backend_embedded::Environment; -/// File system utilities pub mod fs; /// Collection management @@ -24,73 +19,6 @@ pub mod settings; /// Track management pub mod track; -pub type ObservableRef<'a, T> = Ref<'a, T>; - -/// Manages the mutable, observable state -#[derive(Debug, Default)] -pub struct Observable { - publisher: Publisher, -} - -impl Clone for Observable { - fn clone(&self) -> Self { - let Self { publisher } = self; - Self { - publisher: publisher.clone(), - } - } -} - -impl Observable { - #[must_use] - pub fn new(initial_value: T) -> Self { - let publisher = Publisher::new(initial_value); - Self { publisher } - } - - #[must_use] - pub fn read(&self) -> ObservableRef<'_, T> { - self.publisher.read() - } - - #[must_use] - pub fn subscribe_changed(&self) -> Subscriber { - self.publisher.subscribe_changed() - } - - #[allow(clippy::must_use_candidate)] - pub fn modify(&self, modify: impl FnOnce(&mut T) -> bool) -> bool { - self.publisher.modify(modify) - } - - pub fn set_modified(&self) { - self.publisher.set_modified(); - } -} - -/// Read-only access to an observable. -pub trait ObservableReader { - /// Read the current value of the observable. - /// - /// Holds a read lock until the returned reference is dropped. - fn read_lock(&self) -> ObservableRef<'_, T>; -} - -impl ObservableReader for Observable { - fn read_lock(&self) -> ObservableRef<'_, T> { - self.read() - } -} - -impl ObservableReader for T -where - T: Deref>, -{ - fn read_lock(&self) -> ObservableRef<'_, T> { - self.read() - } -} - #[derive(Debug)] pub enum JoinedTask { Completed(T), @@ -120,17 +48,82 @@ impl From for JoinedTask { } } +#[derive(Debug)] +pub enum Reaction { + Rejected, + Accepted, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum StateEffect { + /// The state has not been modified. + Unchanged, + /// The state might have been modified. + MaybeChanged, + /// The state has been modified. + Changed, +} + +impl Add for StateEffect { + type Output = Self; + + fn add(self, rhs: Self) -> Self::Output { + match (self, rhs) { + (Self::Unchanged, Self::Unchanged) => Self::Unchanged, + (Self::MaybeChanged, Self::Unchanged | Self::MaybeChanged) + | (Self::Unchanged, Self::MaybeChanged) => Self::MaybeChanged, + (Self::Changed, _) | (_, Self::Changed) => Self::Changed, + } + } +} + +impl AddAssign for StateEffect { + fn add_assign(&mut self, rhs: Self) { + *self = self.add(rhs); + } +} + #[derive(Debug)] pub struct StateUnchanged; -pub(crate) fn modify_observable_state( - observable: &Observable, - modify: impl FnOnce(&mut S) -> Result, -) -> Result { - let mut result = Err(StateUnchanged); - observable.modify(|state| { - result = modify(state); - result.is_ok() +impl From for StateEffect { + fn from(_: StateUnchanged) -> Self { + StateEffect::Unchanged + } +} + +pub(crate) fn modify_shared_state( + shared_state: &Publisher, + modify: impl FnOnce(&mut S) -> (anyhow::Result, StateEffect), +) -> (anyhow::Result, StateEffect) { + let mut reaction_result = Ok(Reaction::Rejected); + let mut state_effect = StateEffect::Unchanged; + shared_state.modify(|state| { + let (modify_reaction_result, modify_state_effect) = modify(state); + reaction_result = modify_reaction_result; + state_effect = modify_state_effect; + match state_effect { + StateEffect::Unchanged => false, + StateEffect::MaybeChanged | StateEffect::Changed => true, + } + }); + (reaction_result, state_effect) +} + +pub(crate) fn modify_shared_state_infallible( + shared_state: &Publisher, + modify: impl FnOnce(&mut S) -> (Reaction, StateEffect), +) -> (Reaction, StateEffect) { + let mut reaction = Reaction::Rejected; + let mut state_effect = StateEffect::Unchanged; + shared_state.modify(|state| { + let (modify_reaction, modify_state_effect) = modify(state); + reaction = modify_reaction; + state_effect = modify_state_effect; + match state_effect { + StateEffect::Unchanged => false, + StateEffect::MaybeChanged | StateEffect::Changed => true, + } }); - result + (reaction, state_effect) } diff --git a/crates/desktop-app/src/settings/mod.rs b/crates/desktop-app/src/settings/mod.rs index 6323d4d6..42975207 100644 --- a/crates/desktop-app/src/settings/mod.rs +++ b/crates/desktop-app/src/settings/mod.rs @@ -6,13 +6,14 @@ use std::{ path::{Path, PathBuf}, }; +use discro::Publisher; use serde::{Deserialize, Serialize}; use url::Url; use aoide_backend_embedded::storage::DatabaseConfig; use aoide_core::util::fs::DirPath; -use crate::{modify_observable_state, Observable, ObservableReader, ObservableRef, StateUnchanged}; +use crate::{modify_shared_state_infallible, Reaction, StateEffect}; pub const FILE_NAME: &str = "aoide_desktop_settings"; @@ -156,10 +157,10 @@ impl State { self.music_dir.as_ref() } - fn update_music_dir(&mut self, music_dir: Option<&DirPath<'_>>) -> Result<(), StateUnchanged> { + fn update_music_dir(&mut self, music_dir: Option<&DirPath<'_>>) -> StateEffect { if self.music_dir() == music_dir { log::debug!("Unchanged music directory: {music_dir:?}"); - return Err(StateUnchanged); + return StateEffect::Unchanged; } self.music_dir = if let Some(music_dir) = music_dir { log::info!( @@ -171,7 +172,7 @@ impl State { log::info!("Resetting music directory"); None }; - Ok(()) + StateEffect::Changed } } @@ -191,25 +192,32 @@ fn default_database_file_path(parent_dir: PathBuf) -> PathBuf { path_buf } -pub type StateSubscriber = discro::Subscriber; +pub type SharedStateRef<'a> = discro::Ref<'a, State>; +pub type SharedStateObserver = discro::Observer; +pub type SharedStateSubscriber = discro::Subscriber; -/// Manages the mutable, observable state +/// Shared, mutable state. #[derive(Debug)] -pub struct ObservableState(Observable); +pub struct SharedState(Publisher); -impl ObservableState { +impl SharedState { #[must_use] pub fn new(initial_state: State) -> Self { - Self(Observable::new(initial_state)) + Self(Publisher::new(initial_state)) } #[must_use] - pub fn read(&self) -> ObservableStateRef<'_> { + pub fn read(&self) -> SharedStateRef<'_> { self.0.read() } #[must_use] - pub fn subscribe_changed(&self) -> StateSubscriber { + pub fn observe(&self) -> SharedStateObserver { + self.0.observe() + } + + #[must_use] + pub fn subscribe_changed(&self) -> SharedStateSubscriber { self.0.subscribe_changed() } @@ -218,21 +226,16 @@ impl ObservableState { } #[allow(clippy::must_use_candidate)] - pub fn update_music_dir(&self, music_dir: Option<&DirPath<'_>>) -> Result<(), StateUnchanged> { - modify_observable_state(&self.0, |state| state.update_music_dir(music_dir)) + pub fn update_music_dir(&self, music_dir: Option<&DirPath<'_>>) -> StateEffect { + modify_shared_state_infallible(&self.0, |state| { + (Reaction::Accepted, state.update_music_dir(music_dir)) + }) + .1 } } -impl Default for ObservableState { +impl Default for SharedState { fn default() -> Self { Self::new(Default::default()) } } - -pub type ObservableStateRef<'a> = ObservableRef<'a, State>; - -impl ObservableReader for ObservableState { - fn read_lock(&self) -> ObservableStateRef<'_> { - self.0.read_lock() - } -} diff --git a/crates/desktop-app/src/settings/tasklet.rs b/crates/desktop-app/src/settings/tasklet.rs index c99afaea..323358e4 100644 --- a/crates/desktop-app/src/settings/tasklet.rs +++ b/crates/desktop-app/src/settings/tasklet.rs @@ -3,7 +3,7 @@ use std::{future::Future, path::PathBuf}; -use discro::{tasklet::OnChanged, Subscriber}; +use discro::{tasklet::OnChanged, Observer}; use aoide_core::util::fs::DirPath; @@ -13,15 +13,16 @@ use super::State; /// /// The current settings at the time of invocation are not saved. pub fn on_state_changed_save_to_file( - mut subscriber: Subscriber, + this: &Observer, settings_dir: PathBuf, mut report_error: impl FnMut(anyhow::Error) + Send + 'static, ) -> impl Future + Send + 'static { // Read and acknowledge the initial settings immediately before spawning // the async task. These are supposed to be saved already. Only subsequent - // changes will be noticed, which might occur already while spawning the task. + // changes will be captured, which might occur already while spawning the task. // Otherwise when reading the initial settings later within the spawned task // all intermediate changes would slip through unnoticed! + let mut subscriber = this.subscribe_changed(); let mut settings = subscriber.read_ack().clone(); async move { log::debug!("Starting on_state_changed_save_to_file"); @@ -32,6 +33,7 @@ pub fn on_state_changed_save_to_file( break; } log::debug!("Resuming on_state_changed_save_to_file"); + { let new_settings = subscriber.read_ack(); if settings == *new_settings { @@ -40,6 +42,7 @@ pub fn on_state_changed_save_to_file( } settings = new_settings.clone(); } + log::info!("Saving changed settings: {settings:?}"); let save_settings = settings.clone(); if let Err(err) = save_settings @@ -54,10 +57,11 @@ pub fn on_state_changed_save_to_file( /// Listen for changes of the music directory. pub fn on_music_dir_changed( - mut subscriber: Subscriber, + this: &Observer, mut on_changed: impl FnMut(Option<&DirPath<'_>>) -> OnChanged + Send + 'static, ) -> impl Future + Send + 'static { // Read the initial value immediately before spawning the async task + let mut subscriber = this.subscribe(); let mut value = subscriber .read_ack() .music_dir() @@ -74,24 +78,26 @@ pub fn on_music_dir_changed( OnChanged::Continue => (), OnChanged::Abort => { // Consumer has rejected the notification - log::debug!("Aborting on_music_dir_changed"); return; } } + value_changed = false; } - value_changed = false; + + log::debug!("Suspending on_music_dir_changed"); if subscriber.changed().await.is_err() { - // Publisher has disappeared - log::debug!("Aborting on_music_dir_changed"); break; } + log::debug!("Resuming on_music_dir_changed"); + let settings = subscriber.read_ack(); let new_value = settings.music_dir(); - if value.as_ref() != new_value { - value = new_value.cloned().map(DirPath::into_owned); - value_changed = true; + if value.as_ref() == new_value { + continue; } + // Only clone the new value if it differs from the current value. + value = new_value.cloned().map(DirPath::into_owned); + value_changed = true; } - log::debug!("Stopping on_music_dir_changed"); } } diff --git a/crates/desktop-app/src/track/repo_search/mod.rs b/crates/desktop-app/src/track/repo_search/mod.rs index 3b41251b..2780e7de 100644 --- a/crates/desktop-app/src/track/repo_search/mod.rs +++ b/crates/desktop-app/src/track/repo_search/mod.rs @@ -1,8 +1,9 @@ // SPDX-FileCopyrightText: Copyright (C) 2018-2024 Uwe Klotz et al. // SPDX-License-Identifier: AGPL-3.0-or-later -use std::{hash::Hash as _, num::NonZeroUsize, time::Instant}; +use std::{hash::Hash as _, num::NonZeroUsize, sync::Arc, time::Instant}; +use discro::Publisher; use highway::{HighwayHash, HighwayHasher, Key}; use aoide_backend_embedded::track::search; @@ -13,9 +14,7 @@ use aoide_core::{ use aoide_core_api::{track::search::Params, Pagination}; use tokio::task::JoinHandle; -use crate::{ - modify_observable_state, Handle, Observable, ObservableReader, ObservableRef, StateUnchanged, -}; +use crate::{modify_shared_state_infallible, Environment, Reaction, StateEffect}; pub mod tasklet; @@ -33,20 +32,22 @@ pub struct FetchedEntity { #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct FetchedEntitiesMemo { - pub offset: usize, - pub last_offset_hash: u64, + offset: usize, + offset_hash: u64, } -#[must_use] -pub fn last_offset_hash_of_fetched_entities<'a>( - fetched_entities: impl Into>, -) -> u64 { - fetched_entities - .into() - .and_then(<[FetchedEntity]>::last) - .map_or(INITIAL_OFFSET_HASH_SEED, |fetched_entity| { - fetched_entity.offset_hash - }) +impl FetchedEntitiesMemo { + pub fn new(fetched_entities: &[FetchedEntity]) -> Self { + let offset = fetched_entities.len(); + let offset_hash = fetched_entities + .last() + .map(|last| last.offset_hash) + .unwrap_or(INITIAL_OFFSET_HASH_SEED); + Self { + offset, + offset_hash, + } + } } #[derive(Debug, Clone, Copy, PartialEq, Eq, Default)] @@ -55,7 +56,7 @@ pub enum FetchStateMemo { Initial, Ready, Pending { - pending_since: Instant, + since: Instant, }, Failed, } @@ -66,7 +67,7 @@ impl FetchStateMemo { const fn pending_since(&self) -> Option { match self { Self::Initial | Self::Ready { .. } | Self::Failed { .. } => None, - Self::Pending { pending_since, .. } => Some(*pending_since), + Self::Pending { since, .. } => Some(*since), } } } @@ -87,8 +88,8 @@ enum FetchState { }, Pending { fetched_entities_before: Option>, - pending_since: Instant, - background_task: JoinHandle<()>, + since: Instant, + task: JoinHandle<()>, }, Failed { fetched_entities_before: Option>, @@ -103,9 +104,7 @@ impl FetchState { Self::Initial => FetchStateMemo::Initial, Self::Ready { .. } => FetchStateMemo::Ready, Self::Failed { .. } => FetchStateMemo::Failed, - Self::Pending { pending_since, .. } => FetchStateMemo::Pending { - pending_since: *pending_since, - }, + Self::Pending { since, .. } => FetchStateMemo::Pending { since: *since }, } } @@ -114,6 +113,22 @@ impl FetchState { self.fetch_state_memo().pending_since() } + #[must_use] + const fn is_pending(&self) -> bool { + self.pending_since().is_some() + } + + #[must_use] + fn abort_pending_task(&self) -> StateEffect { + match self { + Self::Initial | Self::Ready { .. } | Self::Failed { .. } => StateEffect::Unchanged, + Self::Pending { task, .. } => { + task.abort(); + StateEffect::MaybeChanged + } + } + } + #[must_use] fn fetched_entities(&self) -> Option<&[FetchedEntity]> { match self { @@ -132,21 +147,10 @@ impl FetchState { } } - #[must_use] - fn fetched_entities_memo(&self) -> Option { - let fetched_entities = self.fetched_entities()?; - let offset = fetched_entities.len(); - let last_offset_hash = last_offset_hash_of_fetched_entities(fetched_entities); - Some(FetchedEntitiesMemo { - offset, - last_offset_hash, - }) - } - #[must_use] fn memo(&self) -> FetchMemo { let state = self.fetch_state_memo(); - let fetched_entities = self.fetched_entities_memo(); + let fetched_entities = self.fetched_entities().map(FetchedEntitiesMemo::new); FetchMemo { state, fetched_entities, @@ -175,50 +179,39 @@ impl FetchState { } } - fn reset(&mut self) -> Result<(), StateUnchanged> { + #[must_use] + fn reset(&mut self) -> StateEffect { if matches!(self, Self::Initial) { // No effect - return Err(StateUnchanged); + return StateEffect::Unchanged; } *self = Default::default(); debug_assert!(matches!(self, Self::Initial)); - Ok(()) + StateEffect::Changed } fn fetch_more_succeeded( &mut self, - offset: usize, - offset_hash: u64, + memo: Option, fetched_entities: Vec, can_fetch_more: bool, - ) -> Result<(), StateUnchanged> { + ) { let num_fetched_entities = fetched_entities.len(); log::debug!("Fetching more succeeded with {num_fetched_entities} newly fetched entities"); + let (mut offset, mut offset_hash_seed) = memo + .map(|memo| (memo.offset, memo.offset_hash)) + .unwrap_or((0, INITIAL_OFFSET_HASH_SEED)); + let Self::Pending { fetched_entities_before, - pending_since: _, - background_task, + since: _, + task, } = self else { - // Not applicable - log::error!("Not pending when fetching more succeeded"); - return Err(StateUnchanged); + unreachable!(); }; - debug_assert!(background_task.is_finished()); - let expected_offset = fetched_entities_before.as_ref().map_or(0, Vec::len); - let expected_offset_hash = - last_offset_hash_of_fetched_entities(fetched_entities_before.as_deref()); - if offset != expected_offset || offset_hash != expected_offset_hash { - // Not applicable - log::warn!( - "Mismatching offset/hash after fetching succeeded: expected = \ - {expected_offset}/{expected_offset_hash}, actual = {offset}/{offset_hash}" - ); - return Err(StateUnchanged); - } - let mut offset = offset; - let mut offset_hash_seed = offset_hash; + debug_assert!(task.is_finished()); let mut fetched_entities_before = std::mem::take(fetched_entities_before).unwrap_or_default(); fetched_entities_before.reserve(fetched_entities.len()); @@ -249,45 +242,39 @@ impl FetchState { } else { log::debug!("Caching {num_fetched_entities} fetched entities"); } - Ok(()) } #[allow(clippy::needless_pass_by_value)] - fn fetch_more_failed(&mut self, error: anyhow::Error) -> Result<(), StateUnchanged> { + fn fetch_more_failed(&mut self, error: anyhow::Error) { log::warn!("Fetching more failed: {error}"); let Self::Pending { fetched_entities_before, - pending_since: _, - background_task, + since: _, + task, } = self else { - // No effect - log::error!("Not pending when fetching more failed"); - return Err(StateUnchanged); + unreachable!(); }; - debug_assert!(background_task.is_finished()); + debug_assert!(task.is_finished()); let fetched_entities_before = std::mem::take(fetched_entities_before); *self = Self::Failed { fetched_entities_before, error, }; - Ok(()) } #[allow(dead_code)] // Currently the task cannot be cancelled. - fn fetch_more_cancelled(&mut self) -> Result<(), StateUnchanged> { + fn fetch_more_cancelled(&mut self) { log::debug!("Fetching more cancelled"); let Self::Pending { fetched_entities_before, - pending_since: _, - background_task, + since: _, + task, } = self else { - // No effect - log::error!("Not pending when fetching more cancelled"); - return Err(StateUnchanged); + unreachable!(); }; - debug_assert!(background_task.is_finished()); + debug_assert!(task.is_finished()); let fetched_entities_before = std::mem::take(fetched_entities_before); if let Some(fetched_entities) = fetched_entities_before { *self = Self::Ready { @@ -297,7 +284,6 @@ impl FetchState { } else { *self = Self::Initial; } - Ok(()) } } @@ -433,10 +419,6 @@ impl State { } #[must_use] - pub fn fetched_entities_memo(&self) -> Option { - self.fetch.fetched_entities_memo() - } - fn clone_memo(&self) -> Memo { let Self { default_params, @@ -491,16 +473,15 @@ impl State { { let FetchedEntitiesMemo { offset, - last_offset_hash: _, + offset_hash: _, } = fetched_entities; let FetchedEntitiesMemo { offset: memo_offset, - last_offset_hash: memo_last_offset_hash, + offset_hash: memo_offset_hash, } = memo_fetched_entities; - debug_assert_eq!(*offset, self.fetched_entities().unwrap().len()); if *memo_offset > 0 && memo_offset <= offset - && *memo_last_offset_hash + && *memo_offset_hash == self .fetched_entities() .unwrap() @@ -529,85 +510,96 @@ impl State { diff } - fn reset(&mut self) -> Result<(), StateUnchanged> { - // Cloning the default params once for pre-creating the target state - // is required to avoid redundant code for determining in advance if - // the state would actually change or not. - let reset = Self::new(self.default_params.clone()); + #[must_use] + fn reset(&mut self) -> StateEffect { let Self { default_params: _, - context: reset_context, - fetch: reset_fetch, - } = reset; - debug_assert!(matches!(reset_fetch, FetchState::Initial)); - if self.context == reset_context && matches!(self.fetch, FetchState::Initial) { + context, + fetch, + } = self; + let reset_context = Default::default(); + let reset_fetch_effect = fetch.reset(); + if *context == reset_context && matches!(reset_fetch_effect, StateEffect::Unchanged) { // No effect. log::debug!("State doesn't need to be reset"); - return Err(StateUnchanged); + return StateEffect::Unchanged; } self.context = reset_context; - self.fetch = reset_fetch; + let reset_context_effect = StateEffect::Changed; debug_assert!(!self.should_prefetch()); - log::info!("State has been reset"); - Ok(()) + log::debug!("State has been reset"); + reset_context_effect + reset_fetch_effect } /// Update the collection UID /// /// Consumed the argument when returning `true`. - fn update_collection_uid( - &mut self, - collection_uid: &mut Option, - ) -> Result<(), StateUnchanged> { + #[must_use] + fn update_collection_uid(&mut self, collection_uid: &mut Option) -> StateEffect { if collection_uid.as_ref() == self.context.collection_uid.as_ref() { // No effect. log::debug!("Collection UID unchanged: {collection_uid:?}"); - return Err(StateUnchanged); + return StateEffect::Unchanged; } self.context.collection_uid = collection_uid.take(); - let _ = self.fetch.reset(); - if let Some(uid) = &self.context.collection_uid { - log::info!("Collection UID updated: {uid}"); - } else { - log::info!("Collection UID updated: "); - } - Ok(()) + log::debug!( + "Collection UID updated: {uid:?}", + uid = self.context.collection_uid + ); + StateEffect::Changed + self.fetch.reset() } /// Update the search parameters /// /// Consumed the argument when returning `true`. - fn update_params(&mut self, params: &mut Params) -> Result<(), StateUnchanged> { + #[must_use] + fn update_params(&mut self, params: &mut Params) -> StateEffect { if params == &self.context.params { // No effect. log::debug!("Params unchanged: {params:?}"); - return Err(StateUnchanged); + return StateEffect::Unchanged; } self.context.params = std::mem::take(params); - let _ = self.fetch.reset(); - log::info!("Params updated: {params:?}", params = self.context.params); - Ok(()) + log::debug!("Params updated: {params:?}", params = self.context.params); + StateEffect::Changed + self.fetch.reset() } + #[must_use] fn fetch_more_task_completed( &mut self, result: FetchMoreResult, continuation: FetchMoreTaskContinuation, - ) -> Result<(), StateUnchanged> { - let FetchMoreTaskContinuation { + ) -> (Reaction, StateEffect) { + let Self { + default_params, context, - offset, - offset_hash, + fetch: + FetchState::Pending { + fetched_entities_before, + since: pending_since, + task, + }, + } = self + else { + return (Reaction::Rejected, StateEffect::Unchanged); + }; + debug_assert!(task.is_finished()); + let FetchMoreTaskContinuation { + pending_since: continuation_pending_since, + default_params: continuation_default_params, + context: continuation_context, + fetched_entities_before: continuation_fetched_entities_before_memo, limit, } = continuation; - if context != self.context { - log::warn!( - "Mismatching context after fetching succeeded: expected = {expected_context:?}, \ - actual = {context:?}", - expected_context = self.context - ); - // No effect. - return Err(StateUnchanged); + if continuation_pending_since != *pending_since + || continuation_default_params != *default_params + || continuation_context != *context + || continuation_fetched_entities_before_memo + != fetched_entities_before + .as_deref() + .map(FetchedEntitiesMemo::new) + { + return (Reaction::Rejected, StateEffect::Unchanged); } match result { Ok(fetched_entities) => { @@ -617,122 +609,130 @@ impl State { false }; self.fetch.fetch_more_succeeded( - offset, - offset_hash, + continuation_fetched_entities_before_memo, fetched_entities, can_fetch_more, - ) + ); + } + Err(err) => { + self.fetch.fetch_more_failed(err.into()); } - Err(err) => self.fetch.fetch_more_failed(err.into()), } + + (Reaction::Accepted, StateEffect::MaybeChanged) } - fn reset_fetched(&mut self) -> Result<(), StateUnchanged> { + fn reset_fetched(&mut self) -> StateEffect { self.fetch.reset() } + #[must_use] fn spawn_fetch_more_task( &mut self, - observable: &ObservableState, - handle: &Handle, - tokio_rt: &tokio::runtime::Handle, + this: &SharedState, + rt: &tokio::runtime::Handle, + env: &Arc, fetch_limit: Option, - ) -> Result<(), StateUnchanged> { - if self.can_fetch_more() != Some(true) { - return Err(StateUnchanged); - } + ) -> (Reaction, StateEffect) { + let Some(collection_uid) = &self.context.collection_uid else { + debug_assert!(self.can_fetch_more() != Some(true)); + return (Reaction::Rejected, StateEffect::Unchanged); + }; let fetched_entities_before = match &mut self.fetch { FetchState::Initial => None, - FetchState::Pending { .. } | FetchState::Failed { .. } => { - // Not applicable - return Err(StateUnchanged); + FetchState::Pending { .. } + | FetchState::Failed { .. } + | FetchState::Ready { + can_fetch_more: false, + .. + } => { + debug_assert!(self.can_fetch_more() != Some(true)); + return (Reaction::Rejected, StateEffect::Unchanged); } FetchState::Ready { - fetched_entities, .. + can_fetch_more: true, + fetched_entities, + .. } => Some(std::mem::take(fetched_entities)), }; - let Context { - collection_uid, - params, - } = &self.context; - let Some(collection_uid) = collection_uid.clone() else { - return Err(StateUnchanged); - }; - - let offset = fetched_entities_before - .as_ref() - .and_then(|slice| slice.len().try_into().ok()); - - let task = { - let params = params.clone(); - let limit = fetch_limit.and_then(|limit| limit.get().try_into().ok()); - let pagination = Pagination { limit, offset }; - let handle = handle.clone(); - async move { search(handle.db_gatekeeper(), collection_uid, params, pagination).await } - }; + debug_assert!(self.can_fetch_more() != Some(true)); + let pending_since = Instant::now(); let continuation = { + let default_params = self.default_params.clone(); let context = self.context.clone(); - let offset = offset.unwrap_or(0) as usize; - let offset_hash = last_offset_hash_of_fetched_entities(self.fetched_entities()); + let fetched_entities_before = fetched_entities_before + .as_deref() + .map(FetchedEntitiesMemo::new); let limit = fetch_limit; FetchMoreTaskContinuation { + pending_since, + default_params, context, - offset, - offset_hash, + fetched_entities_before, limit, } }; - let background_task = tokio_rt.spawn({ - let observable = observable.clone(); + let task = rt.spawn({ + let this = this.clone(); + let env = env.clone(); + let collection_uid = collection_uid.clone(); + let params = continuation.context.params.clone(); + let offset = continuation + .fetched_entities_before + .as_ref() + .map(|memo| memo.offset.try_into().expect("convertible")); + let limit = fetch_limit.map(|limit| limit.get().try_into().expect("convertible")); + let pagination = Pagination { limit, offset }; async move { - let result = task.await; - let _ = observable.fetch_more_task_completed(result, continuation); + let result = search(env.db_gatekeeper(), collection_uid, params, pagination).await; + let _ = this.fetch_more_task_completed(result, continuation); } }); self.fetch = FetchState::Pending { fetched_entities_before, - pending_since: Instant::now(), - background_task, + since: pending_since, + task, }; - Ok(()) + (Reaction::Accepted, StateEffect::MaybeChanged) } } #[derive(Debug)] pub struct FetchMoreTaskContinuation { + pending_since: Instant, + default_params: Params, context: Context, - offset: usize, - offset_hash: u64, + fetched_entities_before: Option, limit: Option, } pub type FetchMoreResult = aoide_backend_embedded::Result>; -pub type StateSubscriber = discro::Subscriber; +pub type SharedStateSubscriber = discro::Subscriber; -/// Manages the mutable, observable state +/// Shared, mutable state. #[derive(Debug, Clone, Default)] -pub struct ObservableState(Observable); +pub struct SharedState(Publisher); -impl ObservableState { +impl SharedState { #[must_use] pub fn new(initial_state: State) -> Self { - Self(Observable::new(initial_state)) + Self(Publisher::new(initial_state)) } #[must_use] - pub fn read(&self) -> ObservableStateRef<'_> { + pub fn read(&self) -> SharedStateRef<'_> { self.0.read() } #[must_use] - pub fn subscribe_changed(&self) -> StateSubscriber { + pub fn subscribe_changed(&self) -> SharedStateSubscriber { self.0.subscribe_changed() } @@ -740,54 +740,57 @@ impl ObservableState { self.0.set_modified(); } - pub fn reset(&self) -> Result<(), StateUnchanged> { - modify_observable_state(&self.0, State::reset) + pub fn reset(&self) -> StateEffect { + modify_shared_state_infallible(&self.0, |state| (Reaction::Accepted, state.reset())).1 } - pub fn update_collection_uid( - &self, - collection_uid: &mut Option, - ) -> Result<(), StateUnchanged> { - modify_observable_state(&self.0, |state| state.update_collection_uid(collection_uid)) + pub fn update_collection_uid(&self, collection_uid: &mut Option) -> StateEffect { + modify_shared_state_infallible(&self.0, |state| { + ( + Reaction::Accepted, + state.update_collection_uid(collection_uid), + ) + }) + .1 } - pub fn update_params(&self, params: &mut Params) -> Result<(), StateUnchanged> { - modify_observable_state(&self.0, |state| state.update_params(params)) + pub fn update_params(&self, params: &mut Params) -> StateEffect { + modify_shared_state_infallible(&self.0, |state| { + (Reaction::Accepted, state.update_params(params)) + }) + .1 } + #[must_use] pub fn spawn_fetch_more_task( &self, - handle: &Handle, - tokio_rt: &tokio::runtime::Handle, + rt: &tokio::runtime::Handle, + env: &Arc, fetch_limit: Option, - ) -> Result<(), StateUnchanged> { - modify_observable_state(&self.0, |state| { - state.spawn_fetch_more_task(self, handle, tokio_rt, fetch_limit) + ) -> (Reaction, StateEffect) { + modify_shared_state_infallible(&self.0, |state| { + state.spawn_fetch_more_task(self, rt, env, fetch_limit) }) } + #[must_use] fn fetch_more_task_completed( &self, result: FetchMoreResult, continuation: FetchMoreTaskContinuation, - ) -> Result<(), StateUnchanged> { - modify_observable_state(&self.0, |state| { + ) -> (Reaction, StateEffect) { + modify_shared_state_infallible(&self.0, |state| { state.fetch_more_task_completed(result, continuation) }) } - pub fn reset_fetched(&self) -> Result<(), StateUnchanged> { - modify_observable_state(&self.0, State::reset_fetched) + pub fn reset_fetched(&self) -> StateEffect { + modify_shared_state_infallible(&self.0, |state| (Reaction::Accepted, state.reset_fetched())) + .1 } } -pub type ObservableStateRef<'a> = ObservableRef<'a, State>; - -impl ObservableReader for ObservableState { - fn read_lock(&self) -> ObservableStateRef<'_> { - self.0.read_lock() - } -} +pub type SharedStateRef<'a> = discro::Ref<'a, State>; const INITIAL_OFFSET_HASH_SEED: u64 = 0; diff --git a/crates/desktop-app/src/track/repo_search/tasklet.rs b/crates/desktop-app/src/track/repo_search/tasklet.rs index bcc08620..fd552107 100644 --- a/crates/desktop-app/src/track/repo_search/tasklet.rs +++ b/crates/desktop-app/src/track/repo_search/tasklet.rs @@ -7,14 +7,14 @@ use std::{ sync::{Arc, Weak}, }; -use discro::{tasklet::OnChanged, Subscriber}; -use unnest::some_or_return_with; +use discro::{tasklet::OnChanged, Observer, Subscriber}; +use unnest::{some_or_break, some_or_return_with}; -use crate::{collection, WeakHandle}; +use crate::{collection, Environment}; -use super::{ObservableState, State}; +use super::{SharedState, State}; -pub fn on_should_prefetch_trigger( +fn on_should_prefetch_trigger( subscriber: Subscriber, mut on_trigger: impl FnMut() -> OnChanged + Send + 'static, ) -> impl Future + Send + 'static { @@ -29,7 +29,7 @@ pub fn on_should_prefetch_trigger( ) } -pub fn on_should_prefetch_trigger_async( +fn on_should_prefetch_trigger_async( subscriber: Subscriber, mut on_trigger: impl FnMut() -> T + Send + 'static, ) -> impl Future + Send + 'static @@ -48,29 +48,31 @@ where } pub fn on_should_prefetch( - observable_state: &Arc, - handle: WeakHandle, - tokio_rt: tokio::runtime::Handle, + this: &Arc, + rt: tokio::runtime::Handle, + env: Weak, prefetch_limit: Option, ) -> impl Future + Send + 'static { - let observable_state_sub = observable_state.subscribe_changed(); - let observable_state = Arc::downgrade(observable_state); + let subscriber = this.subscribe_changed(); + let this = Arc::downgrade(this); async move { - log::debug!("on_should_prefetch"); - on_should_prefetch_trigger_async(observable_state_sub, move || { - let observable_state = Weak::clone(&observable_state); - let handle = handle.clone(); - let tokio_rt = tokio_rt.clone(); + log::debug!("Starting on_should_prefetch"); + on_should_prefetch_trigger_async(subscriber, move || { + let rt = rt.clone(); + let env = Weak::clone(&env); + let this = Weak::clone(&this); + async move { - let observable_state = - some_or_return_with!(observable_state.upgrade(), OnChanged::Abort); - let should_prefetch = observable_state.read().should_prefetch(); + log::debug!("Resuming on_should_prefetch"); + let this = some_or_return_with!(this.upgrade(), OnChanged::Abort); + let should_prefetch = this.read().should_prefetch(); if should_prefetch { - let handle = some_or_return_with!(handle.upgrade(), OnChanged::Abort); - log::debug!("Prefetching"); - let _ = - observable_state.spawn_fetch_more_task(&handle, &tokio_rt, prefetch_limit); + let env = some_or_return_with!(env.upgrade(), OnChanged::Abort); + let (reaction, state_effect) = + this.spawn_fetch_more_task(&rt, &env, prefetch_limit); + log::debug!("Prefetching: {reaction:?} {state_effect:?}"); } + log::debug!("Suspending on_should_prefetch"); OnChanged::Continue } }) @@ -79,36 +81,32 @@ pub fn on_should_prefetch( } pub fn on_collection_state_changed( - collection_state: &Arc, - observable_state: Weak, + this: Weak, + collection_state: &Observer, ) -> impl Future + Send + 'static { let mut collection_state_sub = collection_state.subscribe_changed(); async move { - log::debug!("on_collection_state_changed"); + log::debug!("Starting on_collection_state_changed"); loop { - { - let Some(observable_state) = observable_state.upgrade() else { - // Observable has been dropped. - break; - }; - let mut collection_uid = { - let state = collection_state_sub.read_ack(); - // We are only interested in the collection UID if the collection is ready, - // even though it is available in other states as well. - match &*state { - collection::State::Ready { entity, .. } => Some(entity.hdr.uid.clone()), - _ => None, - } - }; - let _ = observable_state.update_collection_uid(&mut collection_uid); - if observable_state.reset_fetched().is_ok() { - log::debug!("Fetched results have been reset"); - } - } + log::debug!("Suspending on_collection_state_changed"); if collection_state_sub.changed().await.is_err() { - // Publisher has been dropped. + // No publisher(s). break; } + log::debug!("Resuming on_collection_state_changed"); + + let this = some_or_break!(this.upgrade()); + + let mut collection_uid = { + let state = collection_state_sub.read_ack(); + // We are only interested in the collection UID if the collection is ready, + // even though it is available in other states as well. + match &*state { + collection::State::Ready { entity, .. } => Some(entity.hdr.uid.clone()), + _ => None, + } + }; + this.update_collection_uid(&mut collection_uid); } } } diff --git a/crates/repo/src/tag/mod.rs b/crates/repo/src/tag/mod.rs index a4fea4df..0c125b6c 100644 --- a/crates/repo/src/tag/mod.rs +++ b/crates/repo/src/tag/mod.rs @@ -9,37 +9,86 @@ fn dedup_facets(facets: &mut Vec>) { facets.dedup(); } +#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)] +pub enum SelectTags { + /// Both faceted and non-faceted tags. + #[default] + All, + /// Only faceted tags. + /// + /// Excludes all non-faceted tags. + Faceted, +} + #[derive(Clone, Debug, Default, PartialEq, Eq)] pub struct CountParams<'a> { - pub facets: Option>>, - pub include_non_faceted_tags: Option, + pub tags: SelectTags, + pub include_facets: Option>>, + pub exclude_facets: Vec>, pub ordering: Vec, } impl<'a> CountParams<'a> { - pub fn dedup_facets(&mut self) { - if let Some(ref mut facets) = self.facets { - dedup_facets(facets); + #[must_use] + pub const fn all(ordering: Vec) -> Self { + Self { + tags: SelectTags::All, + include_facets: None, + exclude_facets: Vec::new(), + ordering, } } #[must_use] - pub fn include_non_faceted_tags(&self) -> bool { - self.include_non_faceted_tags.unwrap_or(true) + pub const fn all_faceted(ordering: Vec) -> Self { + Self { + tags: SelectTags::Faceted, + include_facets: None, + exclude_facets: Vec::new(), + ordering, + } + } + + #[must_use] + pub const fn all_non_faceted(ordering: Vec) -> Self { + Self { + tags: SelectTags::All, + include_facets: Some(vec![]), + exclude_facets: Vec::new(), + ordering, + } + } + + pub fn dedup_facets(&mut self) { + if let Some(include_facets) = &mut self.include_facets { + dedup_facets(include_facets); + } + dedup_facets(&mut self.exclude_facets); } } #[derive(Clone, Debug, Default, PartialEq, Eq)] pub struct FacetCountParams<'a> { - pub facets: Option>>, + pub include_facets: Option>>, + pub exclude_facets: Vec>, pub ordering: Vec, } impl<'a> FacetCountParams<'a> { + #[must_use] + pub const fn all(ordering: Vec) -> Self { + Self { + include_facets: None, + exclude_facets: Vec::new(), + ordering, + } + } + pub fn dedup_facets(&mut self) { - if let Some(ref mut facets) = self.facets { - dedup_facets(facets); + if let Some(include_facets) = &mut self.include_facets { + dedup_facets(include_facets); } + dedup_facets(&mut self.exclude_facets); } } diff --git a/file-collection-app/src/library/mod.rs b/file-collection-app/src/library/mod.rs index bc602e2a..02e229d6 100644 --- a/file-collection-app/src/library/mod.rs +++ b/file-collection-app/src/library/mod.rs @@ -4,7 +4,7 @@ use std::{path::PathBuf, sync::Arc}; use aoide::{ - desktop_app::{collection::SynchronizeVfsTask, Handle, ObservableReader, StateUnchanged}, + desktop_app::{collection::SynchronizingVfsTask, rt, env, ObservableReader, StateUnchanged}, media::content::ContentPath, util::fs::DirPath, CollectionUid, @@ -96,7 +96,7 @@ impl StateObservables { pub struct State { pub music_dir: Option>, pub collection: collection::State, - pub pending_music_dir_sync_task: Option, + pub pending_music_dir_sync_task: Option, } impl State { @@ -124,7 +124,7 @@ impl State { pub struct CurrentState<'a> { music_dir: Option<&'a DirPath<'static>>, collection: &'a collection::State, - pending_music_dir_sync_task: Option<&'a SynchronizeVfsTask>, + pending_music_dir_sync_task: Option<&'a SynchronizingVfsTask>, track_search: Ref<'a, track_search::State>, } @@ -184,27 +184,27 @@ impl CurrentState<'_> { } } -/// Library state with a handle to the runtime environment +/// Library state with a env to the runtime environment #[allow(missing_debug_implementations)] pub struct Library { - handle: Handle, + env: env::Handle, state_observables: StateObservables, state: State, } impl Library { #[must_use] - pub fn new(handle: Handle, initial_settings: settings::State) -> Self { + pub fn new(env: env::Handle, initial_settings: settings::State) -> Self { Self { - handle, + env, state_observables: StateObservables::new(initial_settings), state: Default::default(), } } #[must_use] - pub const fn handle(&self) -> &Handle { - &self.handle + pub const fn env(&self) -> &env::Handle { + &self.env } #[must_use] @@ -358,7 +358,7 @@ impl Library { } log::info!("Spawning synchronize music directory task"); self.state.pending_music_dir_sync_task = - SynchronizeVfsTask::spawn(rt, &self.handle, &self.state_observables.collection).ok(); + SynchronizingVfsTask::spawn(rt, &self.env, &self.state_observables.collection).ok(); let Some(task) = &self.state.pending_music_dir_sync_task else { return rejected; }; @@ -427,11 +427,11 @@ impl Library { return ActionResponse::Rejected; } let collection_uid = self.state.collection.entity_uid().expect("Some").clone(); - let handle = self.handle.clone(); + let env = self.env.clone(); let event_emitter = event_emitter.clone(); rt.spawn(async move { let result = aoide::backend_embedded::media::tracker::count_sources_in_directories( - handle.db_gatekeeper(), + env.db_gatekeeper(), collection_uid.clone(), params.clone(), ) @@ -452,7 +452,7 @@ impl Library { let Ok((task, continuation)) = self .state_observables .collection - .refresh_from_db_task(&self.handle) + .spawn_refresh_from_db_task(rt, &self.env) else { return ActionResponse::Rejected; }; @@ -490,11 +490,11 @@ impl Library { #[allow(clippy::must_use_candidate)] pub fn fetch_more_track_search_results( &self, - tokio_rt: &tokio::runtime::Handle, + rt: &tokio::runtime::Handle, ) -> ActionResponse { match self.state_observables.track_search.spawn_fetch_more_task( - &self.handle, - tokio_rt, + rt, + &self.env, Some(track_search::DEFAULT_PREFETCH_LIMIT), ) { Ok(()) => ActionResponse::Accepted, @@ -503,52 +503,53 @@ impl Library { } /// Spawn reactive background tasks - pub fn spawn_background_tasks(&self, tokio_rt: &tokio::runtime::Handle, settings_dir: PathBuf) { - tokio_rt.spawn(settings::tasklet::on_state_changed_save_to_file( - self.state_observables.settings.subscribe_changed(), + pub fn spawn_background_tasks(&self, rt: &tokio::runtime::Handle, settings_dir: PathBuf) { + rt.spawn(settings::tasklet::on_state_changed_save_to_file( + &self.state_observables.settings.observe(), settings_dir, |err| { log::error!("Failed to save settings to file: {err}"); }, )); - tokio_rt.spawn(collection::tasklet::on_settings_state_changed( + rt.spawn(collection::tasklet::on_settings_state_changed( &self.state_observables.settings, Arc::downgrade(&self.state_observables.collection), - Handle::downgrade(&self.handle), + tokio::runtime::Handle::downgrade(rt), + env::Handle::downgrade(&self.env), collection::RESTORE_ENTITY_STRATEGY, collection::NESTED_MUSIC_DIRS_STRATEGY, )); - tokio_rt.spawn(track_search::tasklet::on_collection_state_changed( + rt.spawn(track_search::tasklet::on_collection_state_changed( &self.state_observables.collection, Arc::downgrade(&self.state_observables.track_search), )); - tokio_rt.spawn(track_search::tasklet::on_should_prefetch( + rt.spawn(track_search::tasklet::on_should_prefetch( &self.state_observables.track_search, - Handle::downgrade(&self.handle), - tokio_rt.clone(), + tokio::runtime::Handle::downgrade(rt), + env::Handle::downgrade(&self.env), Some(track_search::DEFAULT_PREFETCH_LIMIT), )); } - pub fn spawn_event_tasks(&self, tokio_rt: &tokio::runtime::Handle, event_emitter: &E) + pub fn spawn_event_tasks(&self, rt: &tokio::runtime::Handle, event_emitter: &E) where E: EventEmitter + Clone + 'static, { - tokio_rt.spawn({ + rt.spawn({ let subscriber = self.state_observables.settings.subscribe_changed(); let event_emitter = event_emitter.clone(); async move { settings::watch_state(subscriber, event_emitter).await; } }); - tokio_rt.spawn({ + rt.spawn({ let subscriber = self.state_observables.collection.subscribe_changed(); let event_emitter = event_emitter.clone(); async move { collection::watch_state(subscriber, event_emitter).await; } }); - tokio_rt.spawn({ + rt.spawn({ let subscriber = self.state_observables.track_search.subscribe_changed(); let event_emitter = event_emitter.clone(); async move { diff --git a/file-collection-app/src/main.rs b/file-collection-app/src/main.rs index c8131893..efdcda51 100644 --- a/file-collection-app/src/main.rs +++ b/file-collection-app/src/main.rs @@ -142,14 +142,14 @@ async fn main() { } }; log::debug!("Commissioning aoide library backend: {aoide_db_config:?}"); - let aoide_handle = match aoide::desktop_app::Handle::commission(&aoide_db_config) { + let aoide_env = match aoide::desktop_app::env::Handle::commission(&aoide_db_config) { Ok(library_backend) => library_backend, Err(err) => { log::error!("Failed to commission aoide library backend: {err}"); return; } }; - let library = Library::new(aoide_handle, aoide_initial_settings); + let library = Library::new(aoide_env, aoide_initial_settings); let rt = match tokio::runtime::Handle::try_current() { Ok(handle) => handle,