From e345b5dd7b7deb5a31d5b00cb82b3326a077da74 Mon Sep 17 00:00:00 2001 From: Mathieu Velten Date: Sat, 14 Sep 2024 00:03:03 +0200 Subject: [PATCH] Get back to Recovering syncing when we haven't sync for a while --- .../src/room_list_service/mod.rs | 31 +-- .../src/room_list_service/state.rs | 214 ++++++++++++++---- 2 files changed, 192 insertions(+), 53 deletions(-) diff --git a/crates/matrix-sdk-ui/src/room_list_service/mod.rs b/crates/matrix-sdk-ui/src/room_list_service/mod.rs index 7f3f27b5aa1..81ce260947d 100644 --- a/crates/matrix-sdk-ui/src/room_list_service/mod.rs +++ b/crates/matrix-sdk-ui/src/room_list_service/mod.rs @@ -60,7 +60,7 @@ mod state; use std::{sync::Arc, time::Duration}; use async_stream::stream; -use eyeball::{SharedObservable, Subscriber}; +use eyeball::Subscriber; use futures_util::{pin_mut, Stream, StreamExt}; use matrix_sdk::{ event_cache::EventCacheError, Client, Error as SlidingSyncError, SlidingSync, SlidingSyncList, @@ -89,7 +89,7 @@ pub struct RoomListService { /// The current state of the `RoomListService`. /// /// `RoomListService` is a simple state-machine. - state: SharedObservable, + state_machine: StateMachine, } impl RoomListService { @@ -172,7 +172,7 @@ impl RoomListService { // Eagerly subscribe the event cache to sync responses. client.event_cache().subscribe()?; - Ok(Self { client, sliding_sync, state: SharedObservable::new(State::Init) }) + Ok(Self { client, sliding_sync, state_machine: StateMachine::new() }) } /// Start to sync the room list. @@ -208,7 +208,7 @@ impl RoomListService { debug!("Run a sync iteration"); // Calculate the next state, and run the associated actions. - let next_state = self.state.get().next(&self.sliding_sync).await?; + let next_state = self.state_machine.next(&self.sliding_sync).await?; // Do the sync. match sync.next().await { @@ -217,7 +217,7 @@ impl RoomListService { debug!(state = ?next_state, "New state"); // Update the state. - self.state.set(next_state); + self.state_machine.set(next_state); yield Ok(()); } @@ -227,7 +227,7 @@ impl RoomListService { debug!(expected_state = ?next_state, "New state is an error"); let next_state = State::Error { from: Box::new(next_state) }; - self.state.set(next_state); + self.state_machine.set(next_state); yield Err(Error::SlidingSync(error)); @@ -239,7 +239,7 @@ impl RoomListService { debug!(expected_state = ?next_state, "New state is a termination"); let next_state = State::Terminated { from: Box::new(next_state) }; - self.state.set(next_state); + self.state_machine.set(next_state); break; } @@ -286,8 +286,8 @@ impl RoomListService { // when the session is forced to expire, the state remains `Terminated`, thus // the actions aren't executed as expected. Consequently, let's update the // state. - if let State::Terminated { from } = self.state.get() { - self.state.set(State::Error { from }); + if let State::Terminated { from } = self.state_machine.get() { + self.state_machine.set(State::Error { from }); } } @@ -341,7 +341,7 @@ impl RoomListService { // Update the `current_state`. current_state = next_state; } else { - // Something is broken with `self.state`. Let's stop this stream too. + // Something is broken with the state. Let's stop this stream too. break; } } @@ -355,7 +355,7 @@ impl RoomListService { /// Get a subscriber to the state. pub fn state(&self) -> Subscriber { - self.state.subscribe() + self.state_machine.subscribe() } async fn list_for(&self, sliding_sync_list_name: &str) -> Result { @@ -396,7 +396,7 @@ impl RoomListService { settings.required_state.push((StateEventType::RoomCreate, "".to_owned())); } - let cancel_in_flight_request = match self.state.get() { + let cancel_in_flight_request = match self.state_machine.get() { State::Init | State::Recovering | State::Error { .. } | State::Terminated { .. } => { false } @@ -617,13 +617,16 @@ mod tests { let _ = sync.next().await; // State is `Terminated`, as expected! - assert_eq!(room_list.state.get(), State::Terminated { from: Box::new(State::Running) }); + assert_eq!( + room_list.state_machine.get(), + State::Terminated { from: Box::new(State::Running) } + ); // Now, let's make the sliding sync session to expire. room_list.expire_sync_session().await; // State is `Error`, as a regular session expiration would generate! - assert_eq!(room_list.state.get(), State::Error { from: Box::new(State::Running) }); + assert_eq!(room_list.state_machine.get(), State::Error { from: Box::new(State::Running) }); Ok(()) } diff --git a/crates/matrix-sdk-ui/src/room_list_service/state.rs b/crates/matrix-sdk-ui/src/room_list_service/state.rs index edf189cbb64..31d6bc4f9fd 100644 --- a/crates/matrix-sdk-ui/src/room_list_service/state.rs +++ b/crates/matrix-sdk-ui/src/room_list_service/state.rs @@ -14,15 +14,20 @@ //! States and actions for the `RoomList` state machine. -use std::future::ready; +use std::{ + future::ready, + sync::Mutex, + time::{Duration, Instant}, +}; +use eyeball::{SharedObservable, Subscriber}; use matrix_sdk::{sliding_sync::Range, SlidingSync, SlidingSyncMode}; use super::Error; pub const ALL_ROOMS_LIST_NAME: &str = "all_rooms"; -/// The state of the [`super::RoomList`]' state machine. +/// The state of the [`super::RoomList`]. #[derive(Clone, Debug, PartialEq)] pub enum State { /// That's the first initial state. @@ -46,13 +51,71 @@ pub enum State { Terminated { from: Box }, } -impl State { +/// Default value for `StateMachine::state_lifespan`. +const DEFAULT_STATE_LIFESPAN: Duration = Duration::from_secs(1800); + +/// The state machine used to transition between the [`State`]s. +#[derive(Debug)] +pub struct StateMachine { + /// The current state of the `RoomListService`. + state: SharedObservable, + + /// Last time the state has been updated. + /// + /// When the state has not been updated since a long time, we want to enter + /// the [`State::Recovering`] state. Why do we need to do that? Because in + /// some cases, the user might have received many updates between two + /// distant syncs. If the sliding sync list range was too large, like + /// 0..=499, the next sync is likely to be heavy and potentially slow. + /// In this case, it's preferable to jump back onto `Recovering`, which will + /// reset the range, so that the next sync will be fast for the client. + /// + /// To be used in coordination with `Self::state_lifespan`. + /// + /// This mutex is only taken for short periods of time, so it's sync. + last_state_update_time: Mutex, + + /// The maximum time before considering the state as “too old”. + /// + /// To be used in coordination with `Self::last_state_update_time`. + state_lifespan: Duration, +} + +impl StateMachine { + pub(super) fn new() -> Self { + StateMachine { + state: SharedObservable::new(State::Init), + last_state_update_time: Mutex::new(Instant::now()), + state_lifespan: DEFAULT_STATE_LIFESPAN, + } + } + + /// Get the current state. + pub(super) fn get(&self) -> State { + self.state.get() + } + + /// Set the new state. + /// + /// Setting a new state will update `Self::last_state_update`. + pub(super) fn set(&self, state: State) { + let mut last_state_update_time = self.last_state_update_time.lock().unwrap(); + *last_state_update_time = Instant::now(); + + self.state.set(state); + } + + /// Subscribe to state updates. + pub fn subscribe(&self) -> Subscriber { + self.state.subscribe() + } + /// Transition to the next state, and execute the associated transition's /// [`Actions`]. - pub(super) async fn next(&self, sliding_sync: &SlidingSync) -> Result { + pub(super) async fn next(&self, sliding_sync: &SlidingSync) -> Result { use State::*; - let next_state = match self { + let next_state = match self.get() { Init => SettingUp, SettingUp | Recovering => { @@ -60,7 +123,18 @@ impl State { Running } - Running => Running, + Running => { + // We haven't changed the state for a while, we go back to `Recovering` to avoid + // requesting potentially large data. See `Self::last_state_update` to learn + // the details. + if self.last_state_update_time.lock().unwrap().elapsed() > self.state_lifespan { + set_all_rooms_to_selective_sync_mode(sliding_sync).await?; + + Recovering + } else { + Running + } + } Error { from: previous_state } | Terminated { from: previous_state } => { match previous_state.as_ref() { @@ -122,6 +196,7 @@ pub const ALL_ROOMS_DEFAULT_GROWING_BATCH_SIZE: u32 = 100; #[cfg(test)] mod tests { use matrix_sdk_test::async_test; + use tokio::time::sleep; use super::{super::tests::new_room_list, *}; @@ -130,94 +205,155 @@ mod tests { let room_list = new_room_list().await?; let sliding_sync = room_list.sliding_sync(); - // First state. - let state = State::Init; + let state_machine = StateMachine::new(); // Hypothetical error. { - let state = State::Error { from: Box::new(state.clone()) }.next(sliding_sync).await?; + state_machine.set(State::Error { from: Box::new(state_machine.get()) }); // Back to the previous state. - assert_eq!(state, State::Init); + state_machine.set(state_machine.next(sliding_sync).await?); + assert_eq!(state_machine.get(), State::Init); } // Hypothetical termination. { - let state = - State::Terminated { from: Box::new(state.clone()) }.next(sliding_sync).await?; + state_machine.set(State::Terminated { from: Box::new(state_machine.get()) }); // Back to the previous state. - assert_eq!(state, State::Init); + state_machine.set(state_machine.next(sliding_sync).await?); + assert_eq!(state_machine.get(), State::Init); } // Next state. - let state = state.next(sliding_sync).await?; - assert_eq!(state, State::SettingUp); + state_machine.set(state_machine.next(sliding_sync).await?); + assert_eq!(state_machine.get(), State::SettingUp); // Hypothetical error. { - let state = State::Error { from: Box::new(state.clone()) }.next(sliding_sync).await?; + state_machine.set(State::Error { from: Box::new(state_machine.get()) }); // Back to the previous state. - assert_eq!(state, State::SettingUp); + state_machine.set(state_machine.next(sliding_sync).await?); + assert_eq!(state_machine.get(), State::SettingUp); } // Hypothetical termination. { - let state = - State::Terminated { from: Box::new(state.clone()) }.next(sliding_sync).await?; + state_machine.set(State::Terminated { from: Box::new(state_machine.get()) }); // Back to the previous state. - assert_eq!(state, State::SettingUp); + state_machine.set(state_machine.next(sliding_sync).await?); + assert_eq!(state_machine.get(), State::SettingUp); } // Next state. - let state = state.next(sliding_sync).await?; - assert_eq!(state, State::Running); + state_machine.set(state_machine.next(sliding_sync).await?); + assert_eq!(state_machine.get(), State::Running); // Hypothetical error. { - let state = State::Error { from: Box::new(state.clone()) }.next(sliding_sync).await?; + state_machine.set(State::Error { from: Box::new(state_machine.get()) }); // Jump to the **recovering** state! - assert_eq!(state, State::Recovering); - - let state = state.next(sliding_sync).await?; + state_machine.set(state_machine.next(sliding_sync).await?); + assert_eq!(state_machine.get(), State::Recovering); // Now, back to the previous state. - assert_eq!(state, State::Running); + state_machine.set(state_machine.next(sliding_sync).await?); + assert_eq!(state_machine.get(), State::Running); } // Hypothetical termination. { - let state = - State::Terminated { from: Box::new(state.clone()) }.next(sliding_sync).await?; + state_machine.set(State::Terminated { from: Box::new(state_machine.get()) }); // Jump to the **recovering** state! - assert_eq!(state, State::Recovering); - - let state = state.next(sliding_sync).await?; + state_machine.set(state_machine.next(sliding_sync).await?); + assert_eq!(state_machine.get(), State::Recovering); // Now, back to the previous state. - assert_eq!(state, State::Running); + state_machine.set(state_machine.next(sliding_sync).await?); + assert_eq!(state_machine.get(), State::Running); } // Hypothetical error when recovering. { - let state = - State::Error { from: Box::new(State::Recovering) }.next(sliding_sync).await?; + state_machine.set(State::Error { from: Box::new(State::Recovering) }); // Back to the previous state. - assert_eq!(state, State::Recovering); + state_machine.set(state_machine.next(sliding_sync).await?); + assert_eq!(state_machine.get(), State::Recovering); } // Hypothetical termination when recovering. { - let state = - State::Terminated { from: Box::new(State::Recovering) }.next(sliding_sync).await?; + state_machine.set(State::Terminated { from: Box::new(State::Recovering) }); // Back to the previous state. - assert_eq!(state, State::Recovering); + state_machine.set(state_machine.next(sliding_sync).await?); + assert_eq!(state_machine.get(), State::Recovering); + } + + Ok(()) + } + + #[async_test] + async fn test_recover_state_after_delay() -> Result<(), Error> { + let room_list = new_room_list().await?; + let sliding_sync = room_list.sliding_sync(); + + let mut state_machine = StateMachine::new(); + state_machine.state_lifespan = Duration::from_millis(50); + + { + state_machine.set(state_machine.next(sliding_sync).await?); + assert_eq!(state_machine.get(), State::SettingUp); + + state_machine.set(state_machine.next(sliding_sync).await?); + assert_eq!(state_machine.get(), State::Running); + + state_machine.set(state_machine.next(sliding_sync).await?); + assert_eq!(state_machine.get(), State::Running); + + state_machine.set(state_machine.next(sliding_sync).await?); + assert_eq!(state_machine.get(), State::Running); + } + + // Time passes. + sleep(Duration::from_millis(100)).await; + + { + // Time has elapsed, time to recover. + state_machine.set(state_machine.next(sliding_sync).await?); + assert_eq!(state_machine.get(), State::Recovering); + + state_machine.set(state_machine.next(sliding_sync).await?); + assert_eq!(state_machine.get(), State::Running); + + state_machine.set(state_machine.next(sliding_sync).await?); + assert_eq!(state_machine.get(), State::Running); + + state_machine.set(state_machine.next(sliding_sync).await?); + assert_eq!(state_machine.get(), State::Running); + } + + // Time passes, again. Just to test everything is going well. + sleep(Duration::from_millis(100)).await; + + { + // Time has elapsed, time to recover. + state_machine.set(state_machine.next(sliding_sync).await?); + assert_eq!(state_machine.get(), State::Recovering); + + state_machine.set(state_machine.next(sliding_sync).await?); + assert_eq!(state_machine.get(), State::Running); + + state_machine.set(state_machine.next(sliding_sync).await?); + assert_eq!(state_machine.get(), State::Running); + + state_machine.set(state_machine.next(sliding_sync).await?); + assert_eq!(state_machine.get(), State::Running); } Ok(())