Skip to content

Commit

Permalink
desktop-app: Revise state and task management
Browse files Browse the repository at this point in the history
  • Loading branch information
uklotzde committed Oct 22, 2024
1 parent 3340500 commit 68df34b
Show file tree
Hide file tree
Showing 8 changed files with 79 additions and 90 deletions.
12 changes: 6 additions & 6 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -51,14 +51,14 @@ jiff = "0.1.13"
log = "0.4.22"
mime = "0.3.17"
regex = "1.11.0"
serde = "1.0.210"
serde = "1.0.211"
serde_json = "1.0.132"
static_assertions = "1.1.0"
strum = "0.26.3"
tantivy = "0.22.0"
time = "0.3.36"
thiserror = "1.0.64"
tokio = "1.40.0"
tokio = "1.41.0"
url = "2.5.2"

[workspace.lints.rust]
Expand Down
64 changes: 46 additions & 18 deletions crates/desktop-app/src/collection/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ use std::{
};

use anyhow::anyhow;
use discro::{Publisher, Subscriber};
use discro::{Observer, Publisher};
use tokio::task::JoinHandle;
use url::Url;

use aoide_backend_embedded::{
Expand Down Expand Up @@ -680,7 +681,7 @@ impl State {

fn synchronize_vfs_task_joined(
&mut self,
joined_task: JoinedTask<SynchronizeVfsResult>,
joined_task: SynchronizeVfsTaskJoined,
continuation: SynchronizeVfsTaskContinuation,
) -> Result<Option<Outcome>, StateUnchanged> {
let SynchronizeVfsTaskContinuation { pending_state } = continuation;
Expand Down Expand Up @@ -758,13 +759,16 @@ pub struct RefreshFromDbTaskContinuation {
pending_state: State,
}

/// Context for applying the corresponding [`JoinedTask`].
#[derive(Debug)]
pub struct SynchronizeVfsTaskContinuation {
pending_state: State,
}

pub type SynchronizeVfsResult = anyhow::Result<batch::synchronize_collection_vfs::Outcome>;

pub type SynchronizeVfsTaskJoined = JoinedTask<SynchronizeVfsResult>;

/// Manages the mutable, observable state
#[derive(Debug)]
pub struct ObservableState(Observable<State>);
Expand Down Expand Up @@ -886,7 +890,7 @@ impl ObservableState {

fn synchronize_vfs_task_joined(
&self,
joined_task: JoinedTask<SynchronizeVfsResult>,
joined_task: SynchronizeVfsTaskJoined,
continuation: SynchronizeVfsTaskContinuation,
) -> Result<Option<Outcome>, StateUnchanged> {
modify_observable_state(&self.0, |state| {
Expand Down Expand Up @@ -998,13 +1002,16 @@ where
.map_err(Into::into)
}

/// Background task.
///
/// Both progress and outcome are observable.
#[derive(Debug)]
pub struct SynchronizeVfsTask {
started_at: Instant,
progress: Subscriber<Option<Progress>>,
outcome: Subscriber<Option<Outcome>>,
progress: Observer<Option<Progress>>,
outcome: Observer<Option<Outcome>>,
abort_flag: Arc<AtomicBool>,
abort_handle: tokio::task::AbortHandle,
supervisor_handle: JoinHandle<()>,
}

impl SynchronizeVfsTask {
Expand All @@ -1016,9 +1023,9 @@ impl SynchronizeVfsTask {
) -> Result<Self, StateUnchanged> {
let started_at = Instant::now();
let progress_pub = Publisher::new(None);
let progress = progress_pub.subscribe();
let progress = progress_pub.observe();
let outcome_pub = Publisher::new(None);
let outcome = outcome_pub.subscribe();
let outcome = outcome_pub.observe();
let report_progress_fn = {
// TODO: How to avoid wrapping the publisher?
let progress_pub = Arc::new(Mutex::new(progress_pub));
Expand All @@ -1030,25 +1037,24 @@ impl SynchronizeVfsTask {
let (task, continuation) =
synchronize_vfs_task(state, handle, report_progress_fn, Arc::clone(&abort_flag))?;
let join_handle = rt.spawn(task);
let abort_handle = join_handle.abort_handle();
let state = Arc::clone(state);
// The join task is responsible for updating the state eventually and
// cannot be aborted! It completes after the main task completed.
let join_task = async move {
// The supervisor task is responsible for updating the state eventually.
// It finishes after the main task finished.
let supervisor_task = async move {
let joined_task = JoinedTask::join(join_handle).await;
log::debug!("Synchronize music directory task joined: {joined_task:?}");
let result = state.synchronize_vfs_task_joined(joined_task, continuation);
if let Ok(outcome) = result {
outcome_pub.write(outcome);
}
};
rt.spawn(join_task);
let supervisor_handle = rt.spawn(supervisor_task);
Ok(Self {
started_at,
progress,
outcome,
abort_flag,
abort_handle,
supervisor_handle,
})
}

Expand All @@ -1058,23 +1064,45 @@ impl SynchronizeVfsTask {
}

#[must_use]
pub const fn progress(&self) -> &Subscriber<Option<Progress>> {
pub const fn progress(&self) -> &Observer<Option<Progress>> {
&self.progress
}

#[must_use]
pub const fn outcome(&self) -> &Subscriber<Option<Outcome>> {
pub const fn outcome(&self) -> &Observer<Option<Outcome>> {
&self.outcome
}

#[must_use]
pub fn is_finished(&self) -> bool {
self.abort_handle.is_finished()
self.supervisor_handle.is_finished()
}

pub fn abort(&self) {
self.abort_flag.store(true, Ordering::Relaxed);
self.abort_handle.abort();
// Both tasks should not be cancelled! The inner task will finish
// ASAP after the abort flag has been set.
}

pub async fn join(self) -> anyhow::Result<Option<Outcome>> {
let Self {
outcome,
supervisor_handle,
..
} = self;
supervisor_handle
.await
.map(|()| {
let outcome = outcome.read().clone();
debug_assert!(outcome.is_some());
outcome
})
.map_err(|err| {
debug_assert!(outcome.read().is_none());
// The supervisor task is never cancelled.
debug_assert!(!err.is_cancelled());
err.into()
})
}
}

Expand Down
36 changes: 17 additions & 19 deletions crates/desktop-app/src/track/repo_search/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,7 @@ use aoide_core::{
use aoide_core_api::{track::search::Params, Pagination};

use crate::{
modify_observable_state, Handle, JoinedTask, Observable, ObservableReader, ObservableRef,
StateUnchanged,
modify_observable_state, Handle, Observable, ObservableReader, ObservableRef, StateUnchanged,
};

pub mod tasklet;
Expand Down Expand Up @@ -211,15 +210,15 @@ impl FetchState {
can_fetch_more: bool,
) -> Result<(), StateUnchanged> {
let num_fetched_entities = fetched_entities.len();
log::debug!("Fetching succeeded with {num_fetched_entities} newly fetched entities");
log::debug!("Fetching more succeeded with {num_fetched_entities} newly fetched entities");

let Self::Pending {
fetched_entities_before,
pending_since: _,
} = self
else {
// Not applicable
log::error!("Not pending when fetching succeeded");
log::error!("Not pending when fetching more succeeded");
return Err(StateUnchanged);
};
let expected_offset = fetched_entities_before.as_ref().map_or(0, Vec::len);
Expand Down Expand Up @@ -270,14 +269,14 @@ impl FetchState {

#[allow(clippy::needless_pass_by_value)]
fn fetch_more_failed(&mut self, error: anyhow::Error) -> Result<(), StateUnchanged> {
log::warn!("Fetching failed: {error}");
log::warn!("Fetching more failed: {error}");
let Self::Pending {
fetched_entities_before,
pending_since: _,
} = self
else {
// No effect
log::error!("Not pending when fetching failed");
log::error!("Not pending when fetching more failed");
return Err(StateUnchanged);
};
let fetched_entities_before = std::mem::take(fetched_entities_before);
Expand All @@ -288,15 +287,16 @@ impl FetchState {
Ok(())
}

fn fetch_more_aborted(&mut self) -> Result<(), StateUnchanged> {
log::debug!("Fetching aborted");
#[allow(dead_code)] // Currently the task cannot be cancelled.
fn fetch_more_cancelled(&mut self) -> Result<(), StateUnchanged> {
log::debug!("Fetching more cancelled");
let Self::Pending {
fetched_entities_before,
pending_since: _,
} = self
else {
// No effect
log::error!("Not pending when fetching aborted");
log::error!("Not pending when fetching more cancelled");
return Err(StateUnchanged);
};
let fetched_entities_before = std::mem::take(fetched_entities_before);
Expand Down Expand Up @@ -605,9 +605,9 @@ impl State {
self.fetch.fetch_more()
}

fn fetch_more_task_joined(
fn fetch_more_task_completed(
&mut self,
joined_tasked: JoinedTask<FetchMoreResult>,
result: FetchMoreResult,
continuation: FetchMoreTaskContinuation,
) -> Result<(), StateUnchanged> {
let FetchMoreTaskContinuation {
Expand All @@ -625,9 +625,8 @@ impl State {
// No effect.
return Err(StateUnchanged);
}
match joined_tasked {
JoinedTask::Cancelled => self.fetch.fetch_more_aborted(),
JoinedTask::Completed(Ok(fetched_entities)) => {
match result {
Ok(fetched_entities) => {
let can_fetch_more = if let Some(limit) = limit {
limit.get() <= fetched_entities.len()
} else {
Expand All @@ -640,8 +639,7 @@ impl State {
can_fetch_more,
)
}
JoinedTask::Completed(Err(err)) => self.fetch.fetch_more_failed(err.into()),
JoinedTask::Panicked(err) => self.fetch.fetch_more_failed(err),
Err(err) => self.fetch.fetch_more_failed(err.into()),
}
}

Expand Down Expand Up @@ -762,13 +760,13 @@ impl ObservableState {
modify_observable_state(&self.0, |state| state.fetch_more_task(handle, fetch_limit))
}

pub fn fetch_more_task_joined(
pub fn fetch_more_task_completed(
&self,
joined_task: JoinedTask<FetchMoreResult>,
result: FetchMoreResult,
continuation: FetchMoreTaskContinuation,
) -> Result<(), StateUnchanged> {
modify_observable_state(&self.0, |state| {
state.fetch_more_task_joined(joined_task, continuation)
state.fetch_more_task_completed(result, continuation)
})
}

Expand Down
3 changes: 1 addition & 2 deletions crates/desktop-app/src/track/repo_search/tasklet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,7 @@ pub fn on_should_prefetch(
{
log::debug!("Prefetching");
let result = task.await;
let _ =
observable_state.fetch_more_task_joined(result.into(), continuation);
let _ = observable_state.fetch_more_task_completed(result, continuation);
}
}
OnChanged::Continue
Expand Down
11 changes: 1 addition & 10 deletions file-collection-app/src/app/update.rs
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ impl<'a> UpdateContext<'a> {
TrackSearchAction::FetchMore => {
memo_state.abort();
debug_assert!(matches!(memo_state, track_search::MemoState::Ready(_)));
library.spawn_fetch_more_track_search_results_task(rt, *msg_tx)
library.fetch_more_track_search_results(rt)
}
TrackSearchAction::AbortPendingStateChange => {
if matches!(memo_state, track_search::MemoState::Pending { .. }) {
Expand Down Expand Up @@ -320,15 +320,6 @@ impl<'a> UpdateContext<'a> {
ctx.request_repaint();
}
}
library::track_search::Event::FetchMoreTaskCompleted {
result,
continuation,
} => {
library.on_fetch_more_track_search_results_task_completed(
result,
continuation,
);
}
}
}
library::Event::MusicDirSyncProgress(progress) => {
Expand Down
Loading

0 comments on commit 68df34b

Please sign in to comment.