From 3928259bb5b45d6fb201b0b8a0859c9aa29f2f90 Mon Sep 17 00:00:00 2001 From: Benjamin Bouvier Date: Thu, 11 May 2023 17:36:29 +0200 Subject: [PATCH 1/6] bench: add restore session benchmark Signed-off-by: Benjamin Bouvier --- Cargo.lock | 2 + benchmarks/Cargo.toml | 8 +- benchmarks/benches/store_bench.rs | 124 ++++++++++++++++++++++++++++++ 3 files changed, 133 insertions(+), 1 deletion(-) create mode 100644 benchmarks/benches/store_bench.rs diff --git a/Cargo.lock b/Cargo.lock index bfd602696e0..8931283dfef 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -537,6 +537,8 @@ name = "benchmarks" version = "1.0.0" dependencies = [ "criterion", + "matrix-sdk", + "matrix-sdk-base", "matrix-sdk-crypto", "matrix-sdk-sled", "matrix-sdk-sqlite", diff --git a/benchmarks/Cargo.toml b/benchmarks/Cargo.toml index fe065d358e0..a39272db690 100644 --- a/benchmarks/Cargo.toml +++ b/benchmarks/Cargo.toml @@ -9,10 +9,12 @@ publish = false [dependencies] criterion = { version = "0.4.0", features = ["async", "async_tokio", "html_reports"] } +matrix-sdk-base = { path = "../crates/matrix-sdk-base" } matrix-sdk-crypto = { path = "../crates/matrix-sdk-crypto", version = "0.6.0"} matrix-sdk-sqlite = { path = "../crates/matrix-sdk-sqlite", version = "0.1.0", default-features = false, features = ["crypto-store"] } -matrix-sdk-sled = { path = "../crates/matrix-sdk-sled", version = "0.2.0", default-features = false, features = ["crypto-store"] } +matrix-sdk-sled = { path = "../crates/matrix-sdk-sled", version = "0.2.0", features = ["crypto-store"] } matrix-sdk-test = { path = "../testing/matrix-sdk-test", version = "0.6.0"} +matrix-sdk = { path = "../crates/matrix-sdk" } ruma = { workspace = true } serde_json = { workspace = true } tempfile = "3.3.0" @@ -24,3 +26,7 @@ pprof = { version = "0.11.0", features = ["flamegraph", "criterion"] } [[bench]] name = "crypto_bench" harness = false + +[[bench]] +name = "store_bench" +harness = false diff --git a/benchmarks/benches/store_bench.rs b/benchmarks/benches/store_bench.rs new file mode 100644 index 00000000000..203c8c423f6 --- /dev/null +++ b/benchmarks/benches/store_bench.rs @@ -0,0 +1,124 @@ +use criterion::*; +use matrix_sdk::{config::StoreConfig, Client, RoomInfo, RoomState, Session, StateChanges}; +use matrix_sdk_base::{store::MemoryStore, StateStore as _}; +use matrix_sdk_sled::SledStateStore; +use matrix_sdk_sqlite::SqliteStateStore; +use ruma::{device_id, user_id, RoomId}; +use tokio::runtime::Builder; + +fn criterion() -> Criterion { + #[cfg(target_os = "linux")] + let criterion = Criterion::default().with_profiler(pprof::criterion::PProfProfiler::new( + 100, + pprof::criterion::Output::Flamegraph(None), + )); + + #[cfg(not(target_os = "linux"))] + let criterion = Criterion::default(); + + criterion +} + +/// Number of joined rooms in the benchmark. +const NUM_JOINED_ROOMS: usize = 10000; + +/// Number of stripped rooms in the benchmark. +const NUM_STRIPPED_JOINED_ROOMS: usize = 10000; + +pub fn restore_session(c: &mut Criterion) { + let runtime = Builder::new_multi_thread().build().expect("Can't create runtime"); + + // Create a fake list of changes, and a session to recover from. 
+ let mut changes = StateChanges::default(); + + for i in 0..NUM_JOINED_ROOMS { + let room_id = RoomId::parse(format!("!room{i}:example.com")).unwrap().to_owned(); + changes.add_room(RoomInfo::new(&room_id, RoomState::Joined)); + } + + for i in 0..NUM_STRIPPED_JOINED_ROOMS { + let room_id = RoomId::parse(format!("!strippedroom{i}:example.com")).unwrap().to_owned(); + changes.add_stripped_room(RoomInfo::new(&room_id, RoomState::Joined)); + } + + let session = Session { + access_token: "OHEY".to_owned(), + refresh_token: None, + user_id: user_id!("@somebody:example.com").to_owned(), + device_id: device_id!("DEVICE_ID").to_owned(), + }; + + // Start the benchmark. + + let mut group = c.benchmark_group("Client reload"); + group.throughput(Throughput::Elements(100)); + + const NAME: &str = "restore a session"; + + // Memory + let mem_store = MemoryStore::new(); + runtime.block_on(mem_store.save_changes(&changes)).expect("initial filling of mem failed"); + + group.bench_with_input(BenchmarkId::new("memory store", NAME), &mem_store, |b, store| { + b.to_async(&runtime).iter(|| async { + let client = Client::builder() + .homeserver_url("https://matrix.example.com") + .store_config(StoreConfig::new().state_store(store.clone())) + .build() + .await + .expect("Can't build client"); + client.restore_session(session.clone()).await.expect("couldn't restore session"); + }) + }); + + // Sled + let sled_path = tempfile::tempdir().unwrap().path().to_path_buf(); + let sled_store = + SledStateStore::builder().path(sled_path).build().expect("Can't create sled store"); + runtime.block_on(sled_store.save_changes(&changes)).expect("initial filling of sled failed"); + + group.bench_with_input(BenchmarkId::new("sled store", NAME), &sled_store, |b, store| { + b.to_async(&runtime).iter(|| async { + let client = Client::builder() + .homeserver_url("https://matrix.example.com") + .store_config(StoreConfig::new().state_store(store.clone())) + .build() + .await + .expect("Can't build client"); + client.restore_session(session.clone()).await.expect("couldn't restore session"); + }) + }); + + // Sqlite + let sqlite_dir = tempfile::tempdir().unwrap(); + let sqlite_store = runtime.block_on(SqliteStateStore::open(sqlite_dir.path(), None)).unwrap(); + runtime + .block_on(sqlite_store.save_changes(&changes)) + .expect("initial filling of sqlite failed"); + + group.bench_with_input(BenchmarkId::new("sqlite store", NAME), &sqlite_store, |b, store| { + b.to_async(&runtime).iter(|| async { + let client = Client::builder() + .homeserver_url("https://matrix.example.com") + .store_config(StoreConfig::new().state_store(store.clone())) + .build() + .await + .expect("Can't build client"); + client.restore_session(session.clone()).await.expect("couldn't restore session"); + }) + }); + + { + let _guard = runtime.enter(); + drop(sqlite_store); + } + + group.finish() +} + +criterion_group! 
{ + name = benches; + config = criterion(); + targets = restore_session +} +criterion_main!(benches); From b6302aca5c8b8123181b41dfbf4fe58db76dcb40 Mon Sep 17 00:00:00 2001 From: Benjamin Bouvier Date: Mon, 15 May 2023 16:53:32 +0200 Subject: [PATCH 2/6] bench: add benchmarks for encrypted stores too Signed-off-by: Benjamin Bouvier --- benchmarks/benches/store_bench.rs | 105 ++++++++++++++++++------------ 1 file changed, 65 insertions(+), 40 deletions(-) diff --git a/benchmarks/benches/store_bench.rs b/benchmarks/benches/store_bench.rs index 203c8c423f6..7bced77ede3 100644 --- a/benchmarks/benches/store_bench.rs +++ b/benchmarks/benches/store_bench.rs @@ -71,46 +71,71 @@ pub fn restore_session(c: &mut Criterion) { }) }); - // Sled - let sled_path = tempfile::tempdir().unwrap().path().to_path_buf(); - let sled_store = - SledStateStore::builder().path(sled_path).build().expect("Can't create sled store"); - runtime.block_on(sled_store.save_changes(&changes)).expect("initial filling of sled failed"); - - group.bench_with_input(BenchmarkId::new("sled store", NAME), &sled_store, |b, store| { - b.to_async(&runtime).iter(|| async { - let client = Client::builder() - .homeserver_url("https://matrix.example.com") - .store_config(StoreConfig::new().state_store(store.clone())) - .build() - .await - .expect("Can't build client"); - client.restore_session(session.clone()).await.expect("couldn't restore session"); - }) - }); - - // Sqlite - let sqlite_dir = tempfile::tempdir().unwrap(); - let sqlite_store = runtime.block_on(SqliteStateStore::open(sqlite_dir.path(), None)).unwrap(); - runtime - .block_on(sqlite_store.save_changes(&changes)) - .expect("initial filling of sqlite failed"); - - group.bench_with_input(BenchmarkId::new("sqlite store", NAME), &sqlite_store, |b, store| { - b.to_async(&runtime).iter(|| async { - let client = Client::builder() - .homeserver_url("https://matrix.example.com") - .store_config(StoreConfig::new().state_store(store.clone())) - .build() - .await - .expect("Can't build client"); - client.restore_session(session.clone()).await.expect("couldn't restore session"); - }) - }); - - { - let _guard = runtime.enter(); - drop(sqlite_store); + for encryption_password in [None, Some("hunter2")] { + let encrypted_suffix = if encryption_password.is_some() { "encrypted" } else { "clear" }; + + // Sled + let sled_path = tempfile::tempdir().unwrap().path().to_path_buf(); + let mut sled_store_builder = SledStateStore::builder().path(sled_path); + if let Some(password) = encryption_password { + sled_store_builder = sled_store_builder.passphrase(password.to_owned()); + } + let sled_store = sled_store_builder.build().expect("Can't create sled store"); + runtime + .block_on(sled_store.save_changes(&changes)) + .expect("initial filling of sled failed"); + + group.bench_with_input( + BenchmarkId::new(format!("sled store {encrypted_suffix}"), NAME), + &sled_store, + |b, store| { + b.to_async(&runtime).iter(|| async { + let client = Client::builder() + .homeserver_url("https://matrix.example.com") + .store_config(StoreConfig::new().state_store(store.clone())) + .build() + .await + .expect("Can't build client"); + client + .restore_session(session.clone()) + .await + .expect("couldn't restore session"); + }) + }, + ); + + // Sqlite + let sqlite_dir = tempfile::tempdir().unwrap(); + let sqlite_store = runtime + .block_on(SqliteStateStore::open(sqlite_dir.path(), encryption_password)) + .unwrap(); + runtime + .block_on(sqlite_store.save_changes(&changes)) + .expect("initial filling of sqlite failed"); 
+ + group.bench_with_input( + BenchmarkId::new(format!("sqlite store {encrypted_suffix}"), NAME), + &sqlite_store, + |b, store| { + b.to_async(&runtime).iter(|| async { + let client = Client::builder() + .homeserver_url("https://matrix.example.com") + .store_config(StoreConfig::new().state_store(store.clone())) + .build() + .await + .expect("Can't build client"); + client + .restore_session(session.clone()) + .await + .expect("couldn't restore session"); + }) + }, + ); + + { + let _guard = runtime.enter(); + drop(sqlite_store); + } } group.finish() From 549edeb73b4d2c32843eab0de642bad28c9b3d08 Mon Sep 17 00:00:00 2001 From: Jonas Platte Date: Mon, 15 May 2023 15:24:49 +0200 Subject: [PATCH 3/6] sdk: Instrument sliding response handling task --- crates/matrix-sdk/src/sliding_sync/mod.rs | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/crates/matrix-sdk/src/sliding_sync/mod.rs b/crates/matrix-sdk/src/sliding_sync/mod.rs index 21fffc98390..c0df3c70137 100644 --- a/crates/matrix-sdk/src/sliding_sync/mod.rs +++ b/crates/matrix-sdk/src/sliding_sync/mod.rs @@ -493,7 +493,7 @@ impl SlidingSync { // Spawn a new future to ensure that the code inside this future cannot be // cancelled if this method is cancelled. - spawn(async move { + let fut = async move { debug!("Sliding Sync response handling starts"); // In case the task running this future is detached, we must @@ -503,14 +503,19 @@ impl SlidingSync { match &response.txn_id { None => { - error!(stream_id, "Sliding Sync has received an unexpected response: `txn_id` must match `stream_id`; it's missing"); + error!( + stream_id, + "Sliding Sync has received an unexpected response: \ + `txn_id` must match `stream_id`; it's missing" + ); } Some(txn_id) if txn_id != &stream_id => { error!( stream_id, txn_id, - "Sliding Sync has received an unexpected response: `txn_id` must match `stream_id`; they differ" + "Sliding Sync has received an unexpected response: \ + `txn_id` must match `stream_id`; they differ" ); } @@ -537,7 +542,8 @@ impl SlidingSync { debug!("Sliding Sync response has been fully handled"); Ok(Some(updates)) - }).await.unwrap() + }; + spawn(fut.instrument(Span::current())).await.unwrap() } /// Create a _new_ Sliding Sync stream. From c404e378a265ebc3292f9e332c3d6eb9d356ac29 Mon Sep 17 00:00:00 2001 From: Benjamin Bouvier Date: Mon, 15 May 2023 15:33:22 +0200 Subject: [PATCH 4/6] test: sliding sync list fields reloaded from the cache are observable in streams Signed-off-by: Benjamin Bouvier --- crates/matrix-sdk/src/sliding_sync/cache.rs | 27 ++++++++++++++++++- .../src/sliding_sync/list/builder.rs | 6 ++++- 2 files changed, 31 insertions(+), 2 deletions(-) diff --git a/crates/matrix-sdk/src/sliding_sync/cache.rs b/crates/matrix-sdk/src/sliding_sync/cache.rs index 2fb4dae947f..40768998f53 100644 --- a/crates/matrix-sdk/src/sliding_sync/cache.rs +++ b/crates/matrix-sdk/src/sliding_sync/cache.rs @@ -210,7 +210,10 @@ pub(super) async fn restore_sliding_sync_state( #[cfg(test)] mod tests { + use std::sync::{Arc, RwLock}; + use futures::executor::block_on; + use futures_util::StreamExt; use url::Url; use super::*; @@ -274,11 +277,20 @@ mod tests { // Create a new `SlidingSync`, and it should be read from the cache. 
         {
+            let max_number_of_room_stream = Arc::new(RwLock::new(None));
+            let cloned_stream = max_number_of_room_stream.clone();
             let sliding_sync = client
                 .sliding_sync()
                 .await
                 .storage_key(Some("hello".to_owned()))
-                .add_list(SlidingSyncList::builder("list_foo"))
+                .add_list(SlidingSyncList::builder("list_foo").once_built(move |list| {
+                    // In the `once_built()` handler, nothing has been read from the cache yet.
+                    assert_eq!(list.maximum_number_of_rooms(), None);
+
+                    let mut stream = cloned_stream.write().unwrap();
+                    *stream = Some(list.maximum_number_of_rooms_stream());
+                    list
+                }))
                 .build()
                 .await?;
 
@@ -290,6 +302,19 @@ mod tests {
             assert_eq!(list_foo.maximum_number_of_rooms(), Some(42));
         }
 
+        // The maximum number of rooms reloaded from the cache should have been
+        // published.
+        {
+            let mut stream = max_number_of_room_stream
+                .write()
+                .unwrap()
+                .take()
+                .expect("stream must be set");
+            let initial_max_number_of_rooms =
+                stream.next().await.expect("stream must have emitted something");
+            assert_eq!(initial_max_number_of_rooms, Some(42));
+        }
+
         // Clean the cache.
         clean_storage(&client, "hello", &sliding_sync.inner.lists.read().unwrap()).await;
     }
diff --git a/crates/matrix-sdk/src/sliding_sync/list/builder.rs b/crates/matrix-sdk/src/sliding_sync/list/builder.rs
index d31233650fb..37cedfa267c 100644
--- a/crates/matrix-sdk/src/sliding_sync/list/builder.rs
+++ b/crates/matrix-sdk/src/sliding_sync/list/builder.rs
@@ -75,7 +75,11 @@ impl SlidingSyncListBuilder {
         }
     }
 
-    /// foo
+    /// Runs a callback once the list has been built.
+    ///
+    /// If the list was cached, then the cached fields won't be available in
+    /// this callback. Use the streams to get published versions of the
+    /// cached fields, once they've been set.
     pub fn once_built<C>(mut self, callback: C) -> Self
     where
         C: Fn(SlidingSyncList) -> SlidingSyncList + Send + Sync + 'static,

From 58dbe1e252ae6854cd3793544dc850a3b2e5dcb6 Mon Sep 17 00:00:00 2001
From: Benjamin Bouvier
Date: Mon, 15 May 2023 18:17:45 +0200
Subject: [PATCH 5/6] feat: add `add_cached_list` to `SlidingSyncBuilder` and
 `SlidingSync` (#1876)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

This has slightly drifted from the initial design I thought about in
the issue. Instead of having `build()` be fallible and mutably borrow some
substate (namely `BTreeMap<OwnedRoomId, SlidingSyncRoom>`) from
`SlidingSync` (that may or may not already exist), I've introduced a new
`add_cached_list` method on `SlidingSync` and `SlidingSyncBuilder`. This
method hides all the underlying machinery, and injects the room data read
from the list cache into the sliding sync room map.

In particular, with these changes:

- any list added with `add_list` **won't** be loaded from/written to the
  cache storage,
- any list added with `add_cached_list` will be cached, and an attempt to
  reload it from the cache will be done during this call (hence `async` +
  `Result`).
- `SlidingSyncBuilder::build()` now only fetches the `SlidingSync` data
  from the cache (assuming the storage key has been defined), not that of
  the lists anymore.

Fixes #1737.
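For illustration, here's roughly how the new method is meant to be used
(a sketch based on the tests in this patch; the "hello" storage key and
the list names are placeholders, and error handling is elided):

    // A storage key is required for any list caching to happen.
    let sliding_sync = client
        .sliding_sync()
        .await
        .storage_key(Some("hello".to_owned()))
        // Cached: persisted by `cache_to_storage()`, and reloaded from the
        // store right here if a previous run persisted it (hence the await).
        .add_cached_list(SlidingSyncList::builder("list_foo"))
        .await?
        // Not cached: never written to, nor reloaded from, the store.
        .add_list(SlidingSyncList::builder("list_bar"))
        .build()
        .await?;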
Signed-off-by: Benjamin Bouvier
Co-authored-by: Jonas Platte
---
 bindings/matrix-sdk-ffi/src/sliding_sync.rs   |  24 ++
 crates/matrix-sdk/src/sliding_sync/builder.rs |  33 ++-
 crates/matrix-sdk/src/sliding_sync/cache.rs   | 217 +++++++++++-------
 crates/matrix-sdk/src/sliding_sync/error.rs   |   9 +
 .../src/sliding_sync/list/builder.rs          |  99 +++++++-
 .../matrix-sdk/src/sliding_sync/list/mod.rs   |  55 +++--
 crates/matrix-sdk/src/sliding_sync/mod.rs     |  28 ++-
 7 files changed, 349 insertions(+), 116 deletions(-)

diff --git a/bindings/matrix-sdk-ffi/src/sliding_sync.rs b/bindings/matrix-sdk-ffi/src/sliding_sync.rs
index 7b0ed5715f8..0e1a9d1e774 100644
--- a/bindings/matrix-sdk-ffi/src/sliding_sync.rs
+++ b/bindings/matrix-sdk-ffi/src/sliding_sync.rs
@@ -745,6 +745,19 @@ impl SlidingSync {
         self.inner.add_list(unwrap_or_clone_arc(list_builder).inner).unwrap();
     }
 
+    pub fn add_cached_list(
+        &self,
+        list_builder: Arc<SlidingSyncListBuilder>,
+    ) -> Result<Option<Arc<SlidingSyncList>>, ClientError> {
+        RUNTIME.block_on(async move {
+            Ok(self
+                .inner
+                .add_cached_list(list_builder.inner.clone())
+                .await?
+                .map(|inner| Arc::new(SlidingSyncList { inner })))
+        })
+    }
+
     pub fn reset_lists(&self) -> Result<(), SlidingSyncError> {
         self.inner.reset_lists().map_err(Into::into)
     }
@@ -815,6 +828,17 @@ impl SlidingSyncBuilder {
         Arc::new(builder)
     }
 
+    pub fn add_cached_list(
+        self: Arc<Self>,
+        list_builder: Arc<SlidingSyncListBuilder>,
+    ) -> Result<Arc<Self>, ClientError> {
+        let mut builder = unwrap_or_clone_arc(self);
+        let list_builder = unwrap_or_clone_arc(list_builder);
+        builder.inner = RUNTIME
+            .block_on(async move { builder.inner.add_cached_list(list_builder.inner).await })?;
+        Ok(Arc::new(builder))
+    }
+
     pub fn with_common_extensions(self: Arc<Self>) -> Arc<Self> {
         let mut builder = unwrap_or_clone_arc(self);
         builder.inner = builder.inner.with_common_extensions();
diff --git a/crates/matrix-sdk/src/sliding_sync/builder.rs b/crates/matrix-sdk/src/sliding_sync/builder.rs
index 3c4cad08d29..5ec61b96ffd 100644
--- a/crates/matrix-sdk/src/sliding_sync/builder.rs
+++ b/crates/matrix-sdk/src/sliding_sync/builder.rs
@@ -35,6 +35,7 @@ pub struct SlidingSyncBuilder {
     bump_event_types: Vec<TimelineEventType>,
     extensions: Option<ExtensionsConfig>,
     subscriptions: BTreeMap<OwnedRoomId, v4::RoomSubscription>,
+    rooms: BTreeMap<OwnedRoomId, SlidingSyncRoom>,
 }
 
 impl SlidingSyncBuilder {
@@ -47,6 +48,7 @@ impl SlidingSyncBuilder {
             bump_event_types: Vec::new(),
             extensions: None,
             subscriptions: BTreeMap::new(),
+            rooms: BTreeMap::new(),
         }
     }
 
     /// Add the given list to the lists.
     ///
-    /// Replace any list with the name.
+    /// Replace any list with the same name.
     pub fn add_list(mut self, list_builder: SlidingSyncListBuilder) -> Self {
         self.lists.push(list_builder);
         self
     }
 
+    /// Enroll the list in caching, reload it from the cache if possible, and
+    /// add it to the list of lists.
+    ///
+    /// This will raise an error if a [`storage_key()`][Self::storage_key] was
+    /// not set, or if there was an I/O error reading from the cache.
+    ///
+    /// Replace any list with the same name.
+    pub async fn add_cached_list(mut self, mut list: SlidingSyncListBuilder) -> Result<Self> {
+        let Some(ref storage_key) = self.storage_key else {
+            return Err(super::error::Error::MissingStorageKeyForCaching.into());
+        };
+
+        let reloaded_rooms = list.set_cached_and_reload(&self.client, storage_key).await?;
+
+        for (key, frozen) in reloaded_rooms {
+            self.rooms
+                .entry(key)
+                .or_insert_with(|| SlidingSyncRoom::from_frozen(frozen, self.client.clone()));
+        }
+
+        Ok(self.add_list(list))
+    }
+
     /// Activate e2ee, to-device-message and account data extensions if not yet
     /// configured.
     ///
@@ -204,7 +229,6 @@ impl SlidingSyncBuilder {
         let client = self.client;
 
         let mut delta_token = None;
-        let mut rooms_found: BTreeMap<OwnedRoomId, SlidingSyncRoom> = BTreeMap::new();
 
         let (internal_channel_sender, internal_channel_receiver) = channel(8);
 
@@ -221,15 +245,14 @@ impl SlidingSyncBuilder {
             restore_sliding_sync_state(
                 &client,
                 storage_key,
-                &mut lists,
+                &lists,
                 &mut delta_token,
-                &mut rooms_found,
                 &mut self.extensions,
             )
             .await?;
         }
 
-        let rooms = StdRwLock::new(rooms_found);
+        let rooms = StdRwLock::new(self.rooms);
         let lists = StdRwLock::new(lists);
 
         Ok(SlidingSync::new(SlidingSyncInner {
diff --git a/crates/matrix-sdk/src/sliding_sync/cache.rs b/crates/matrix-sdk/src/sliding_sync/cache.rs
index 40768998f53..b5cf5fbbe95 100644
--- a/crates/matrix-sdk/src/sliding_sync/cache.rs
+++ b/crates/matrix-sdk/src/sliding_sync/cache.rs
@@ -6,36 +6,47 @@
 
 use std::collections::BTreeMap;
 
-use ruma::{api::client::sync::sync_events::v4::ExtensionsConfig, OwnedRoomId};
+use matrix_sdk_base::{StateStore, StoreError};
+use ruma::api::client::sync::sync_events::v4::ExtensionsConfig;
 use tracing::{trace, warn};
 
-use super::{
-    FrozenSlidingSync, FrozenSlidingSyncList, SlidingSync, SlidingSyncList, SlidingSyncRoom,
-};
-use crate::{Client, Result};
+use super::{FrozenSlidingSync, FrozenSlidingSyncList, SlidingSync, SlidingSyncList};
+use crate::{sliding_sync::SlidingSyncListCachePolicy, Client, Result};
 
+/// Be careful: this is used as a storage key; changing it requires migrating
+/// data!
 fn format_storage_key_for_sliding_sync(storage_key: &str) -> String {
     format!("sliding_sync_store::{storage_key}")
 }
 
+/// Be careful: this is used as a storage key; changing it requires migrating
+/// data!
 fn format_storage_key_for_sliding_sync_list(storage_key: &str, list_name: &str) -> String {
     format!("sliding_sync_store::{storage_key}::{list_name}")
 }
 
-/// Clean the storage for everything related to `SlidingSync`.
+/// Invalidate a single [`SlidingSyncList`] cache entry by removing it from the
+/// cache.
+async fn invalidate_cached_list(
+    storage: &dyn StateStore<Error = StoreError>,
+    storage_key: &str,
+    list_name: &str,
+) {
+    let storage_key_for_list = format_storage_key_for_sliding_sync_list(storage_key, list_name);
+    let _ = storage.remove_custom_value(storage_key_for_list.as_bytes()).await;
+}
+
+/// Clean the storage for everything related to `SlidingSync` and all known
+/// lists.
 async fn clean_storage(
     client: &Client,
     storage_key: &str,
     lists: &BTreeMap<String, SlidingSyncList>,
 ) {
     let storage = client.store();
-
     for list_name in lists.keys() {
-        let storage_key_for_list = format_storage_key_for_sliding_sync_list(storage_key, list_name);
-
-        let _ = storage.remove_custom_value(storage_key_for_list.as_bytes()).await;
+        invalidate_cached_list(storage, storage_key, list_name).await;
     }
-
     let _ = storage
         .remove_custom_value(format_storage_key_for_sliding_sync(storage_key).as_bytes())
         .await;
@@ -57,7 +68,7 @@ pub(super) async fn store_sliding_sync_state(sliding_sync: &SlidingSync) -> Resu
     )
     .await?;
 
-    // Write every `SlidingSyncList` inside the client the store.
+    // Write every `SlidingSyncList` that's configured for caching into the store.
     let frozen_lists = {
         let rooms_lock = sliding_sync.inner.rooms.read().unwrap();
 
         sliding_sync
             .inner
             .lists
             .read()
             .unwrap()
             .iter()
-            .map(|(list_name, list)| {
-                Ok((
-                    format_storage_key_for_sliding_sync_list(storage_key, list_name),
-                    serde_json::to_vec(&FrozenSlidingSyncList::freeze(list, &rooms_lock))?,
-                ))
+            .filter_map(|(list_name, list)| {
+                matches!(list.cache_policy(), SlidingSyncListCachePolicy::Enabled).then(|| {
+                    Ok((
+                        format_storage_key_for_sliding_sync_list(storage_key, list_name),
+                        serde_json::to_vec(&FrozenSlidingSyncList::freeze(list, &rooms_lock))?,
+                    ))
+                })
             })
             .collect::<Result<BTreeMap<_, _>, crate::Error>>()?
     };
@@ -85,68 +98,64 @@ pub(super) async fn store_sliding_sync_state(sliding_sync: &SlidingSync) -> Resu
     Ok(())
 }
 
-/// Restore the `SlidingSync`'s state from what is stored in the storage.
+/// Try to restore a single [`SlidingSyncList`] from the cache.
 ///
-/// If one cache is obsolete (corrupted, and cannot be deserialized or
-/// anything), the entire `SlidingSync` cache is removed.
-pub(super) async fn restore_sliding_sync_state(
-    client: &Client,
+/// If it fails to deserialize for some reason, invalidate the cache entry.
+pub(super) async fn restore_sliding_sync_list(
+    storage: &dyn StateStore<Error = StoreError>,
     storage_key: &str,
-    lists: &mut BTreeMap<String, SlidingSyncList>,
-    delta_token: &mut Option<String>,
-    rooms_found: &mut BTreeMap<OwnedRoomId, SlidingSyncRoom>,
-    extensions: &mut Option<ExtensionsConfig>,
-) -> Result<()> {
-    let storage = client.store();
-
-    let mut collected_lists_and_frozen_lists = Vec::with_capacity(lists.len());
-
-    // Preload the `FrozenSlidingSyncList` objects from the cache.
-    //
-    // Even if a cache was detected as obsolete, we go over all of them, so that we
-    // are sure all obsolete cache entries are removed.
-    for (list_name, list) in lists.iter_mut() {
-        let storage_key_for_list = format_storage_key_for_sliding_sync_list(storage_key, list_name);
+    list_name: &str,
+) -> Result<Option<FrozenSlidingSyncList>> {
+    let storage_key_for_list = format_storage_key_for_sliding_sync_list(storage_key, list_name);
 
-        match storage
-            .get_custom_value(storage_key_for_list.as_bytes())
-            .await?
-            .map(|custom_value| serde_json::from_slice::<FrozenSlidingSyncList>(&custom_value))
-        {
+    match storage
+        .get_custom_value(storage_key_for_list.as_bytes())
+        .await?
+        .map(|custom_value| serde_json::from_slice::<FrozenSlidingSyncList>(&custom_value))
+    {
+        Some(Ok(frozen_list)) => {
             // List has been found and successfully deserialized.
-            Some(Ok(frozen_list)) => {
-                trace!(list_name, "successfully read the list from cache");
-
-                // Keep it for later.
-                collected_lists_and_frozen_lists.push((list, frozen_list));
-            }
+            trace!(list_name, "successfully read the list from cache");
+            return Ok(Some(frozen_list));
+        }
 
+        Some(Err(_)) => {
             // List has been found, but it wasn't possible to deserialize it. It's declared
             // as obsolete. The main reason might be that the internal representation of a
             // `SlidingSyncList` might have changed. Instead of considering this as a strong
             // error, we remove the entry from the cache and keep the list in its initial
             // state.
-            Some(Err(_)) => {
-                warn!(
+            warn!(
                 list_name,
                 "failed to deserialize the list from the cache, it is obsolete; removing the cache entry!"
             );
+            // Let's clear the list and stop here.
+            invalidate_cached_list(storage, storage_key, list_name).await;
+        }
 
-                // Let's clear everything and stop here.
-                clean_storage(client, storage_key, lists).await;
-
-                return Ok(());
-            }
-
-            None => {
-                trace!(list_name, "failed to find the list in the cache");
-
-                // A missing cache doesn't make anything obsolete.
-                // We just do nothing here.
-            }
+
+        None => {
+            // A missing cache doesn't make anything obsolete.
+            // We just do nothing here.
+            trace!(list_name, "failed to find the list in the cache");
         }
     }
 
+    Ok(None)
+}
+
+/// Restore the `SlidingSync`'s state from what is stored in the storage.
+///
+/// If one cache is obsolete (corrupted, and cannot be deserialized or
+/// anything), the entire `SlidingSync` cache is removed.
+pub(super) async fn restore_sliding_sync_state(
+    client: &Client,
+    storage_key: &str,
+    lists: &BTreeMap<String, SlidingSyncList>,
+    delta_token: &mut Option<String>,
+    extensions: &mut Option<ExtensionsConfig>,
+) -> Result<()> {
+    let storage = client.store();
+
+    // Preload the `SlidingSync` object from the cache.
     match storage
         .get_custom_value(format_storage_key_for_sliding_sync(storage_key).as_bytes())
         .await?
@@ -156,22 +165,6 @@ pub(super) async fn restore_sliding_sync_state(
         // `SlidingSync` has been found and successfully deserialized.
         Some(Ok(FrozenSlidingSync { to_device_since, delta_token: frozen_delta_token })) => {
             trace!("Successfully read the `SlidingSync` from the cache");
-
-            // OK, at this step, everything has been loaded successfully from the cache.
-
-            // Let's update all the `SlidingSyncList`.
-            for (list, FrozenSlidingSyncList { maximum_number_of_rooms, room_list, rooms }) in
-                collected_lists_and_frozen_lists
-            {
-                list.set_from_cold(maximum_number_of_rooms, room_list);
-
-                for (key, frozen_room) in rooms.into_iter() {
-                    rooms_found.entry(key).or_insert_with(|| {
-                        SlidingSyncRoom::from_frozen(frozen_room, client.clone())
-                    });
-                }
-            }
-
+            // Let's update the `SlidingSync`.
             if let Some(since) = to_device_since {
                 let to_device_ext = &mut extensions.get_or_insert_with(Default::default).to_device;
@@ -179,7 +172,6 @@ pub(super) async fn restore_sliding_sync_state(
                     to_device_ext.since = Some(since);
                 }
             }
-
             *delta_token = frozen_delta_token;
         }
@@ -219,6 +211,27 @@ mod tests {
     use super::*;
     use crate::{Client, Result};
 
+    #[test]
+    fn test_cannot_cache_without_a_storage_key() -> Result<()> {
+        block_on(async {
+            let homeserver = Url::parse("https://foo.bar")?;
+            let client = Client::new(homeserver).await?;
+            let err = client
+                .sliding_sync()
+                .await
+                .add_cached_list(SlidingSyncList::builder("list_foo"))
+                .await
+                .unwrap_err();
+            assert!(matches!(
+                err,
+                crate::Error::SlidingSync(
+                    crate::sliding_sync::error::Error::MissingStorageKeyForCaching
+                )
+            ));
+            Ok(())
+        })
+    }
+
     #[allow(clippy::await_holding_lock)]
     #[test]
     fn test_sliding_sync_can_be_stored_and_restored() -> Result<()> {
@@ -241,28 +254,40 @@ mod tests {
             .await?
             .is_none());
 
+        assert!(store
+            .get_custom_value(
+                format_storage_key_for_sliding_sync_list("hello", "list_bar").as_bytes()
+            )
+            .await?
+            .is_none());
+
         // Create a new `SlidingSync` instance, and store it.
         {
             let sliding_sync = client
                 .sliding_sync()
                 .await
                 .storage_key(Some("hello".to_owned()))
-                .add_list(SlidingSyncList::builder("list_foo"))
+                .add_cached_list(SlidingSyncList::builder("list_foo"))
+                .await?
+                .add_list(SlidingSyncList::builder("list_bar"))
                 .build()
                 .await?;
 
-            // Modify one list just to check the restoration.
+            // Modify both lists, so we can check expected caching behavior later.
             {
                 let lists = sliding_sync.inner.lists.write().unwrap();
-                let list_foo = lists.get("list_foo").unwrap();
+
+                let list_foo = lists.get("list_foo").unwrap();
                 list_foo.set_maximum_number_of_rooms(Some(42));
+
+                let list_bar = lists.get("list_bar").unwrap();
+                list_bar.set_maximum_number_of_rooms(Some(1337));
             }
 
             assert!(sliding_sync.cache_to_storage().await.is_ok());
         }
 
-        // Store entries now exist.
+        // Store entries now exist for the sliding sync object and list_foo.
         assert!(store
             .get_custom_value(format_storage_key_for_sliding_sync("hello").as_bytes())
             .await?
             .is_some());
@@ -275,6 +300,14 @@ mod tests {
             .await?
             .is_some());
 
+        // But not for list_bar.
+        assert!(store
+            .get_custom_value(
+                format_storage_key_for_sliding_sync_list("hello", "list_bar").as_bytes()
+            )
+            .await?
+            .is_none());
+
         // Create a new `SlidingSync`, and it should be read from the cache.
         {
             let max_number_of_room_stream = Arc::new(RwLock::new(None));
             let cloned_stream = max_number_of_room_stream.clone();
             let sliding_sync = client
                 .sliding_sync()
                 .await
                 .storage_key(Some("hello".to_owned()))
-                .add_list(SlidingSyncList::builder("list_foo").once_built(move |list| {
+                .add_cached_list(SlidingSyncList::builder("list_foo").once_built(move |list| {
                     // In the `once_built()` handler, nothing has been read from the cache yet.
                     assert_eq!(list.maximum_number_of_rooms(), None);
@@ -291,15 +324,22 @@ mod tests {
                     *stream = Some(list.maximum_number_of_rooms_stream());
                     list
                 }))
+                .await?
+                .add_list(SlidingSyncList::builder("list_bar"))
                 .build()
                 .await?;
 
-            // Check the list' state.
+            // Check the lists' state.
             {
                 let lists = sliding_sync.inner.lists.write().unwrap();
-                let list_foo = lists.get("list_foo").unwrap();
+
+                // This one was cached.
+                let list_foo = lists.get("list_foo").unwrap();
                 assert_eq!(list_foo.maximum_number_of_rooms(), Some(42));
+
+                // This one wasn't.
+                let list_bar = lists.get("list_bar").unwrap();
+                assert_eq!(list_bar.maximum_number_of_rooms(), None);
             }
 
             // The maximum number of rooms reloaded from the cache should have been
@@ -332,6 +372,13 @@ mod tests {
             .await?
             .is_none());
 
+        assert!(store
+            .get_custom_value(
+                format_storage_key_for_sliding_sync_list("hello", "list_bar").as_bytes()
+            )
+            .await?
+            .is_none());
+
         Ok(())
     })
 }
diff --git a/crates/matrix-sdk/src/sliding_sync/error.rs b/crates/matrix-sdk/src/sliding_sync/error.rs
index a2bc8d88373..58053dc52f1 100644
--- a/crates/matrix-sdk/src/sliding_sync/error.rs
+++ b/crates/matrix-sdk/src/sliding_sync/error.rs
@@ -12,15 +12,18 @@ pub enum Error {
     /// `sync`-restart might be required.
     #[error("The sliding sync response could not be handled: {0}")]
     BadResponse(String),
+
     /// A `SlidingSyncListRequestGenerator` has been used without having been
     /// initialized. It happens when a response is handled before a request has
     /// been sent. It usually happens when testing.
     #[error("The sliding sync list `{0}` is handling a response, but its request generator has not been initialized")]
     RequestGeneratorHasNotBeenInitialized(String),
+
     /// Someone has tried to modify a sliding sync list's ranges, but the
     /// selected sync mode doesn't allow that.
     #[error("The chosen sync mode for the list `{0}` doesn't allow to modify the ranges")]
     CannotModifyRanges(String),
+
     /// Ranges have a `start` bound greater than `end`.
     #[error("Ranges have invalid bounds: `{start}..{end}`")]
     InvalidRange {
@@ -29,6 +32,12 @@ pub enum Error {
         /// End bound.
         end: u32,
     },
+
+    /// Missing storage key when asking to deserialize some sub-state of
+    /// sliding sync.
+    #[error("A caching request was made but a storage key is missing in sliding sync")]
+    MissingStorageKeyForCaching,
+
     /// The internal channel of `SlidingSync` seems to be broken.
     #[error("SlidingSync's internal channel is broken")]
     InternalChannelIsBroken,
diff --git a/crates/matrix-sdk/src/sliding_sync/list/builder.rs b/crates/matrix-sdk/src/sliding_sync/list/builder.rs
index 37cedfa267c..8ff49022962 100644
--- a/crates/matrix-sdk/src/sliding_sync/list/builder.rs
+++ b/crates/matrix-sdk/src/sliding_sync/list/builder.rs
@@ -1,6 +1,7 @@
 //! Builder for [`SlidingSyncList`].
 
 use std::{
+    collections::BTreeMap,
     convert::identity,
     fmt,
     ops::RangeInclusive,
@@ -9,17 +10,35 @@
 
 use eyeball::unique::Observable;
 use eyeball_im::ObservableVector;
-use ruma::{api::client::sync::sync_events::v4, events::StateEventType};
+use imbl::Vector;
+use ruma::{api::client::sync::sync_events::v4, events::StateEventType, OwnedRoomId};
 use tokio::sync::mpsc::Sender;
 
 use super::{
-    super::SlidingSyncInternalMessage, Bound, SlidingSyncList, SlidingSyncListInner,
-    SlidingSyncListRequestGenerator, SlidingSyncMode, SlidingSyncState,
+    super::SlidingSyncInternalMessage, Bound, SlidingSyncList, SlidingSyncListCachePolicy,
+    SlidingSyncListInner, SlidingSyncListRequestGenerator, SlidingSyncMode, SlidingSyncState,
+};
+use crate::{
+    sliding_sync::{cache::restore_sliding_sync_list, FrozenSlidingSyncRoom},
+    Client, RoomListEntry,
 };
 
 /// The default name for the full sync list.
 pub const FULL_SYNC_LIST_NAME: &str = "full-sync";
 
+/// Data that might have been read from the cache.
+#[derive(Clone)]
+struct SlidingSyncListCachedData {
+    /// Total number of rooms that is possible to interact with the given list.
+    /// See also comment of [`SlidingSyncList::maximum_number_of_rooms`].
+    /// May be reloaded from the cache.
+    maximum_number_of_rooms: Option<u32>,
+
+    /// List of room entries.
+    /// May be reloaded from the cache.
+    room_list: Vector<RoomListEntry>,
+}
+
 /// Builder for [`SlidingSyncList`].
 #[derive(Clone)]
 pub struct SlidingSyncListBuilder {
@@ -32,6 +51,14 @@ pub struct SlidingSyncListBuilder {
     timeline_limit: Option<Bound>,
     name: String,
     ranges: Vec<RangeInclusive<Bound>>,
+
+    /// Should this list be cached and reloaded from the cache?
+    cache_policy: SlidingSyncListCachePolicy,
+
+    /// If set, temporary data that's been read from the cache, reloaded from a
+    /// `FrozenSlidingSyncList`.
+    reloaded_cached_data: Option<SlidingSyncListCachedData>,
+
     once_built: Arc<Box<dyn Fn(SlidingSyncList) -> SlidingSyncList + Send + Sync>>,
 }
 
@@ -71,6 +98,8 @@ impl SlidingSyncListBuilder {
             timeline_limit: None,
             name: name.into(),
             ranges: Vec::new(),
+            reloaded_cached_data: None,
+            cache_policy: SlidingSyncListCachePolicy::Disabled,
             once_built: Arc::new(Box::new(identity)),
         }
     }
@@ -171,6 +200,34 @@ impl SlidingSyncListBuilder {
         self
     }
 
+    /// Marks this list as sync'd from the cache, and attempts to reload it
+    /// from storage.
+    ///
+    /// Returns a mapping of the rooms' data read from the cache, to be
+    /// incorporated into the `SlidingSync` bookkeeping.
+    pub(in super::super) async fn set_cached_and_reload(
+        &mut self,
+        client: &Client,
+        storage_key: &str,
+    ) -> crate::Result<BTreeMap<OwnedRoomId, FrozenSlidingSyncRoom>> {
+        self.cache_policy = SlidingSyncListCachePolicy::Enabled;
+        if let Some(frozen_list) =
+            restore_sliding_sync_list(client.store(), storage_key, &self.name).await?
+        {
+            assert!(
+                self.reloaded_cached_data.is_none(),
+                "can't call `set_cached_and_reload` twice"
+            );
+            self.reloaded_cached_data = Some(SlidingSyncListCachedData {
+                maximum_number_of_rooms: frozen_list.maximum_number_of_rooms,
+                room_list: frozen_list.room_list,
+            });
+            Ok(frozen_list.rooms)
+        } else {
+            Ok(Default::default())
+        }
+    }
+
     /// Build the list.
pub(in super::super) fn build( self, @@ -200,14 +257,16 @@ impl SlidingSyncListBuilder { timeline_limit: StdRwLock::new(Observable::new(self.timeline_limit)), name: self.name, ranges: StdRwLock::new(Observable::new(self.ranges)), + cache_policy: self.cache_policy, // Computed from the builder. request_generator: StdRwLock::new(request_generator), - // Default values for the type we are building. - state: StdRwLock::new(Observable::new(SlidingSyncState::default())), + // Values read from deserialization, or that are still equal to the default values + // otherwise. + state: StdRwLock::new(Observable::new(Default::default())), maximum_number_of_rooms: StdRwLock::new(Observable::new(None)), - room_list: StdRwLock::new(ObservableVector::new()), + room_list: StdRwLock::new(ObservableVector::from(Vector::new())), sliding_sync_internal_channel_sender, }), @@ -215,6 +274,32 @@ impl SlidingSyncListBuilder { let once_built = self.once_built; - once_built(list) + let list = once_built(list); + + // If we reloaded from the cache, update values in the list here. + // + // Note about ordering: because of the contract with the observables, the + // initial values, if filled, have to be observable in the `once_built` + // callback. That's why we're doing this here *after* constructing the + // list, and not a few lines above. + + if let Some(SlidingSyncListCachedData { maximum_number_of_rooms, room_list }) = + self.reloaded_cached_data + { + // Mark state as preloaded. + Observable::set(&mut list.inner.state.write().unwrap(), SlidingSyncState::Preloaded); + + // Reload values. + Observable::set( + &mut list.inner.maximum_number_of_rooms.write().unwrap(), + maximum_number_of_rooms, + ); + + let mut prev_room_list = list.inner.room_list.write().unwrap(); + assert!(prev_room_list.is_empty(), "room list was empty on creation above!"); + prev_room_list.append(room_list); + } + + list } } diff --git a/crates/matrix-sdk/src/sliding_sync/list/mod.rs b/crates/matrix-sdk/src/sliding_sync/list/mod.rs index dbd0db8da3c..28f8be16694 100644 --- a/crates/matrix-sdk/src/sliding_sync/list/mod.rs +++ b/crates/matrix-sdk/src/sliding_sync/list/mod.rs @@ -17,7 +17,6 @@ use eyeball::unique::Observable; use eyeball_im::{ObservableVector, VectorDiff}; pub(super) use frozen::FrozenSlidingSyncList; use futures_core::Stream; -use imbl::Vector; pub(super) use request_generator::*; pub use room_list_entry::RoomListEntry; use ruma::{api::client::sync::sync_events::v4, assign, events::StateEventType, OwnedRoomId}; @@ -28,6 +27,16 @@ use tracing::{instrument, warn}; use super::{Error, SlidingSyncInternalMessage}; use crate::Result; +/// Should this [`SlidingSyncList`] be stored in the cache, and automatically +/// reloaded from the cache upon creation? +#[derive(Clone, Copy, Debug)] +pub(crate) enum SlidingSyncListCachePolicy { + /// Store and load this list from the cache. + Enabled, + /// Don't store and load this list from the cache. + Disabled, +} + /// The type used to express natural bounds (including but not limited to: /// ranges, timeline limit) in the sliding sync SDK. 
 pub type Bound = u32;
@@ -46,22 +55,6 @@ impl SlidingSyncList {
         SlidingSyncListBuilder::new(name)
     }
 
-    pub(crate) fn set_from_cold(
-        &mut self,
-        maximum_number_of_rooms: Option<u32>,
-        room_list: Vector<RoomListEntry>,
-    ) {
-        Observable::set(&mut self.inner.state.write().unwrap(), SlidingSyncState::Preloaded);
-        Observable::set(
-            &mut self.inner.maximum_number_of_rooms.write().unwrap(),
-            maximum_number_of_rooms,
-        );
-
-        let mut lock = self.inner.room_list.write().unwrap();
-        lock.clear();
-        lock.append(room_list);
-    }
-
     /// Get the name of the list.
     pub fn name(&self) -> &str {
         self.inner.name.as_str()
     }
@@ -125,7 +118,13 @@ impl SlidingSyncList {
         self.inner.state.read().unwrap().clone()
     }
 
-    /// Get a stream of state.
+    /// Get a stream of state updates.
+    ///
+    /// If this list has been reloaded from a cache, the initial value read from
+    /// the cache will be published.
+    ///
+    /// There's no guarantee of ordering between items emitted by this stream
+    /// and those emitted by other streams exposed on this structure.
     pub fn state_stream(&self) -> impl Stream<Item = SlidingSyncState> {
         Observable::subscribe(&self.inner.state.read().unwrap())
     }
@@ -149,6 +148,12 @@ impl SlidingSyncList {
     }
 
     /// Get a stream of room list.
+    ///
+    /// If this list has been reloaded from a cache, the initial value read from
+    /// the cache will be published.
+    ///
+    /// There's no guarantee of ordering between items emitted by this stream
+    /// and those emitted by other streams exposed on this structure.
     pub fn room_list_stream(&self) -> impl Stream<Item = VectorDiff<RoomListEntry>> {
         ObservableVector::subscribe(&self.inner.room_list.read().unwrap())
     }
@@ -160,6 +165,12 @@ impl SlidingSyncList {
     }
 
     /// Get a stream of rooms count.
+    ///
+    /// If this list has been reloaded from a cache, the initial value is
+    /// published too.
+    ///
+    /// There's no guarantee of ordering between items emitted by this stream
+    /// and those emitted by other streams exposed on this structure.
     pub fn maximum_number_of_rooms_stream(&self) -> impl Stream<Item = Option<u32>> {
         Observable::subscribe(&self.inner.maximum_number_of_rooms.read().unwrap())
     }
@@ -179,6 +190,11 @@ impl SlidingSyncList {
         self.inner.next_request()
     }
 
+    /// Returns the current cache policy for this list.
+    pub(super) fn cache_policy(&self) -> SlidingSyncListCachePolicy {
+        self.inner.cache_policy
+    }
+
     /// Update the list based on the response from the server.
     ///
     /// The `maximum_number_of_rooms` is the `lists.$this_list.count` value,
@@ -275,6 +291,9 @@ pub(super) struct SlidingSyncListInner {
     /// request. See [`SlidingSyncListRequestGenerator`] to learn more.
     request_generator: StdRwLock<SlidingSyncListRequestGenerator>,
 
+    /// Cache policy for this list.
+    cache_policy: SlidingSyncListCachePolicy,
+
     sliding_sync_internal_channel_sender: Sender<SlidingSyncInternalMessage>,
 }
 
diff --git a/crates/matrix-sdk/src/sliding_sync/mod.rs b/crates/matrix-sdk/src/sliding_sync/mod.rs
index c0df3c70137..78874216d42 100644
--- a/crates/matrix-sdk/src/sliding_sync/mod.rs
+++ b/crates/matrix-sdk/src/sliding_sync/mod.rs
@@ -235,6 +235,32 @@ impl SlidingSync {
         Ok(self.inner.lists.write().unwrap().insert(list.name().to_owned(), list))
     }
 
+    /// Add a list that will be cached and reloaded from the cache.
+    ///
+    /// This will raise an error if a storage key was not set, or if there
+    /// was an I/O error reading from the cache.
+    ///
+    /// The rest of the semantics is the same as [`Self::add_list`].
+    pub async fn add_cached_list(
+        &self,
+        mut list_builder: SlidingSyncListBuilder,
+    ) -> Result<Option<SlidingSyncList>> {
+        let Some(ref storage_key) = self.inner.storage_key else {
+            return Err(error::Error::MissingStorageKeyForCaching.into());
+        };
+        let reloaded_rooms =
+            list_builder.set_cached_and_reload(&self.inner.client, storage_key).await?;
+        if !reloaded_rooms.is_empty() {
+            let mut rooms = self.inner.rooms.write().unwrap();
+            for (key, frozen) in reloaded_rooms {
+                rooms.entry(key).or_insert_with(|| {
+                    SlidingSyncRoom::from_frozen(frozen, self.inner.client.clone())
+                });
+            }
+        }
+        self.add_list(list_builder)
+    }
+
     /// Lookup a set of rooms
     pub fn get_rooms<I: Iterator<Item = OwnedRoomId>>(
         &self,
@@ -645,7 +671,7 @@ impl SlidingSync {
     pub fn reset_lists(&self) -> Result<(), Error> {
         let lists = self.inner.lists.read().unwrap();
 
-        for (_, list) in lists.iter() {
+        for list in lists.values() {
             list.reset()?;
         }
 
From 923d4255854806b8fa91bff74b17ac5ab76bb782 Mon Sep 17 00:00:00 2001
From: Richard van der Hoff <1389908+richvdh@users.noreply.github.com>
Date: Mon, 15 May 2023 18:58:38 +0100
Subject: [PATCH 6/6] crypto-js: expose a constructor for
 `SigningKeysUploadRequest` (#1925)

... to help with testing.
---
 bindings/matrix-sdk-crypto-js/CHANGELOG.md    | 1 +
 bindings/matrix-sdk-crypto-js/src/requests.rs | 6 ++++++
 2 files changed, 7 insertions(+)

diff --git a/bindings/matrix-sdk-crypto-js/CHANGELOG.md b/bindings/matrix-sdk-crypto-js/CHANGELOG.md
index 4a5dc9d6a39..4b09535d071 100644
--- a/bindings/matrix-sdk-crypto-js/CHANGELOG.md
+++ b/bindings/matrix-sdk-crypto-js/CHANGELOG.md
@@ -3,6 +3,7 @@
 - Extend `OlmDevice.markRequestAsSent` to accept responses to
   `SigningKeysUploadRequest`s.
 - Fix the body of `SignatureUploadRequest`s to match the spec.
+- Add a constructor for `SigningKeysUploadRequest`.
 
 # v0.1.0-alpha.8
 
diff --git a/bindings/matrix-sdk-crypto-js/src/requests.rs b/bindings/matrix-sdk-crypto-js/src/requests.rs
index aeba6ffc24c..f68f046a62e 100644
--- a/bindings/matrix-sdk-crypto-js/src/requests.rs
+++ b/bindings/matrix-sdk-crypto-js/src/requests.rs
@@ -324,6 +324,12 @@ pub struct SigningKeysUploadRequest {
 
 #[wasm_bindgen]
 impl SigningKeysUploadRequest {
+    /// Create a new `SigningKeysUploadRequest`.
+    #[wasm_bindgen(constructor)]
+    pub fn new(id: JsString, body: JsString) -> SigningKeysUploadRequest {
+        Self { id: Some(id), body }
+    }
+
     /// Get its request type.
     #[wasm_bindgen(getter, js_name = "type")]
     pub fn request_type(&self) -> RequestType {