Skip to content

Commit

Permalink
Get back to Recovering syncing when we haven't sync for a while
Browse files Browse the repository at this point in the history
  • Loading branch information
MatMaul committed Oct 8, 2024
1 parent 4bcb9b7 commit e345b5d
Show file tree
Hide file tree
Showing 2 changed files with 192 additions and 53 deletions.
31 changes: 17 additions & 14 deletions crates/matrix-sdk-ui/src/room_list_service/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ mod state;
use std::{sync::Arc, time::Duration};

use async_stream::stream;
use eyeball::{SharedObservable, Subscriber};
use eyeball::Subscriber;
use futures_util::{pin_mut, Stream, StreamExt};
use matrix_sdk::{
event_cache::EventCacheError, Client, Error as SlidingSyncError, SlidingSync, SlidingSyncList,
Expand Down Expand Up @@ -89,7 +89,7 @@ pub struct RoomListService {
/// The current state of the `RoomListService`.
///
/// `RoomListService` is a simple state-machine.
state: SharedObservable<State>,
state_machine: StateMachine,
}

impl RoomListService {
Expand Down Expand Up @@ -172,7 +172,7 @@ impl RoomListService {
// Eagerly subscribe the event cache to sync responses.
client.event_cache().subscribe()?;

Ok(Self { client, sliding_sync, state: SharedObservable::new(State::Init) })
Ok(Self { client, sliding_sync, state_machine: StateMachine::new() })
}

/// Start to sync the room list.
Expand Down Expand Up @@ -208,7 +208,7 @@ impl RoomListService {
debug!("Run a sync iteration");

// Calculate the next state, and run the associated actions.
let next_state = self.state.get().next(&self.sliding_sync).await?;
let next_state = self.state_machine.next(&self.sliding_sync).await?;

// Do the sync.
match sync.next().await {
Expand All @@ -217,7 +217,7 @@ impl RoomListService {
debug!(state = ?next_state, "New state");

// Update the state.
self.state.set(next_state);
self.state_machine.set(next_state);

yield Ok(());
}
Expand All @@ -227,7 +227,7 @@ impl RoomListService {
debug!(expected_state = ?next_state, "New state is an error");

let next_state = State::Error { from: Box::new(next_state) };
self.state.set(next_state);
self.state_machine.set(next_state);

yield Err(Error::SlidingSync(error));

Expand All @@ -239,7 +239,7 @@ impl RoomListService {
debug!(expected_state = ?next_state, "New state is a termination");

let next_state = State::Terminated { from: Box::new(next_state) };
self.state.set(next_state);
self.state_machine.set(next_state);

break;
}
Expand Down Expand Up @@ -286,8 +286,8 @@ impl RoomListService {
// when the session is forced to expire, the state remains `Terminated`, thus
// the actions aren't executed as expected. Consequently, let's update the
// state.
if let State::Terminated { from } = self.state.get() {
self.state.set(State::Error { from });
if let State::Terminated { from } = self.state_machine.get() {
self.state_machine.set(State::Error { from });
}
}

Expand Down Expand Up @@ -341,7 +341,7 @@ impl RoomListService {
// Update the `current_state`.
current_state = next_state;
} else {
// Something is broken with `self.state`. Let's stop this stream too.
// Something is broken with the state. Let's stop this stream too.
break;
}
}
Expand All @@ -355,7 +355,7 @@ impl RoomListService {

/// Get a subscriber to the state.
pub fn state(&self) -> Subscriber<State> {
self.state.subscribe()
self.state_machine.subscribe()
}

async fn list_for(&self, sliding_sync_list_name: &str) -> Result<RoomList, Error> {
Expand Down Expand Up @@ -396,7 +396,7 @@ impl RoomListService {
settings.required_state.push((StateEventType::RoomCreate, "".to_owned()));
}

let cancel_in_flight_request = match self.state.get() {
let cancel_in_flight_request = match self.state_machine.get() {
State::Init | State::Recovering | State::Error { .. } | State::Terminated { .. } => {
false
}
Expand Down Expand Up @@ -617,13 +617,16 @@ mod tests {
let _ = sync.next().await;

// State is `Terminated`, as expected!
assert_eq!(room_list.state.get(), State::Terminated { from: Box::new(State::Running) });
assert_eq!(
room_list.state_machine.get(),
State::Terminated { from: Box::new(State::Running) }
);

// Now, let's make the sliding sync session to expire.
room_list.expire_sync_session().await;

// State is `Error`, as a regular session expiration would generate!
assert_eq!(room_list.state.get(), State::Error { from: Box::new(State::Running) });
assert_eq!(room_list.state_machine.get(), State::Error { from: Box::new(State::Running) });

Ok(())
}
Expand Down
Loading

0 comments on commit e345b5d

Please sign in to comment.