Skip to content

Commit

Permalink
feat(sync): central class cache
Browse files Browse the repository at this point in the history
  • Loading branch information
dan-starkware committed Oct 16, 2023
1 parent 32bf727 commit 0ce1048
Show file tree
Hide file tree
Showing 8 changed files with 89 additions and 7 deletions.
12 changes: 11 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ jsonrpsee = "0.18.1"
jsonschema = "0.17.0"
lazy_static = "1.4.0"
libmdbx = "0.3.5"
lru = "0.12.0"
memmap2 = "0.8.0"
metrics = "0.21.0"
metrics-exporter-prometheus = "0.12.1"
Expand Down
7 changes: 6 additions & 1 deletion config/default_config.json
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,11 @@
"privacy": "Public",
"value": "0xc662c410C0ECf747543f5bA90660f6ABeBD9C8c4"
},
"central.class_cache_size": {
"description": "Size of class cache, must be a positive integer.",
"privacy": "Public",
"value": 30
},
"central.concurrent_requests": {
"description": "Maximum number of concurrent requests to Starknet feeder-gateway for getting a type of data (for example, blocks).",
"privacy": "Public",
Expand Down Expand Up @@ -189,4 +194,4 @@
"privacy": "Public",
"value": 1000
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,13 @@ expression: dumped_default_config
"value": "0xc662c410C0ECf747543f5bA90660f6ABeBD9C8c4",
"privacy": "Public"
},
"central.class_cache_size": {
"description": "Size of class cache, must be a positive integer.",
"value": {
"$serde_json::private::Number": "30"
},
"privacy": "Public"
},
"central.concurrent_requests": {
"description": "Maximum number of concurrent requests to Starknet feeder-gateway for getting a type of data (for example, blocks).",
"value": {
Expand Down
1 change: 1 addition & 0 deletions crates/papyrus_sync/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ hex.workspace = true
indexmap = { workspace = true, features = ["serde"] }
itertools.workspace = true
libmdbx = { workspace = true, features = ["lifetimed-bytes"] }
lru.workspace = true
metrics.workspace = true
papyrus_storage = { path = "../papyrus_storage", version = "0.0.5" }
papyrus_base_layer = { path = "../papyrus_base_layer" }
Expand Down
21 changes: 19 additions & 2 deletions crates/papyrus_sync/src/sources/central.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@ mod central_test;
mod state_update_stream;

use std::collections::{BTreeMap, HashMap};
use std::sync::Arc;
use std::num::NonZeroUsize;
use std::sync::{Arc, Mutex};

use async_stream::stream;
use async_trait::async_trait;
Expand All @@ -13,6 +14,7 @@ use futures::stream::BoxStream;
use futures_util::StreamExt;
use indexmap::IndexMap;
use itertools::chain;
use lru::LruCache;
#[cfg(test)]
use mockall::automock;
use papyrus_common::BlockHashAndNumber;
Expand Down Expand Up @@ -49,6 +51,8 @@ pub struct CentralSourceConfig {
pub max_state_updates_to_download: usize,
pub max_state_updates_to_store_in_memory: usize,
pub max_classes_to_download: usize,
// TODO(dan): validate that class_cache_size is a positive integer.
pub class_cache_size: usize,
pub retry_config: RetryConfig,
}

Expand All @@ -61,6 +65,7 @@ impl Default for CentralSourceConfig {
max_state_updates_to_download: 20,
max_state_updates_to_store_in_memory: 20,
max_classes_to_download: 20,
class_cache_size: 30,
retry_config: RetryConfig {
retry_base_millis: 30,
retry_max_delay_millis: 30000,
Expand Down Expand Up @@ -110,6 +115,12 @@ impl SerializeConfig for CentralSourceConfig {
"Maximum number of classes to download at a given time.",
ParamPrivacyInput::Public,
),
ser_param(
"class_cache_size",
&self.class_cache_size,
"Size of class cache, must be a positive integer.",
ParamPrivacyInput::Public,
),
]);
chain!(self_params_dump, append_sub_config_name(self.retry_config.dump(), "retry_config"))
.collect()
Expand All @@ -121,10 +132,11 @@ pub struct GenericCentralSource<TStarknetClient: StarknetReader + Send + Sync> {
pub starknet_client: Arc<TStarknetClient>,
pub storage_reader: StorageReader,
pub state_update_stream_config: StateUpdateStreamConfig,
pub(crate) class_cache: Arc<Mutex<LruCache<ClassHash, ApiContractClass>>>,
}

#[derive(Clone)]
enum ApiContractClass {
pub(crate) enum ApiContractClass {
DeprecatedContractClass(starknet_api::deprecated_contract_class::ContractClass),
ContractClass(starknet_api::state::ContractClass),
}
Expand Down Expand Up @@ -251,6 +263,7 @@ impl<TStarknetClient: StarknetReader + Send + Sync + 'static> CentralSourceTrait
self.starknet_client.clone(),
self.storage_reader.clone(),
self.state_update_stream_config.clone(),
self.class_cache.clone(),
)
.boxed()
}
Expand Down Expand Up @@ -395,6 +408,10 @@ impl CentralSource {
max_state_updates_to_store_in_memory: config.max_state_updates_to_store_in_memory,
max_classes_to_download: config.max_classes_to_download,
},
class_cache: Arc::from(Mutex::new(LruCache::new(
NonZeroUsize::new(config.class_cache_size)
.expect("class_cache_size should be a positive integer."),
))),
})
}
}
32 changes: 30 additions & 2 deletions crates/papyrus_sync/src/sources/central/state_update_stream.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
use std::collections::VecDeque;
use std::pin::Pin;
use std::sync::Arc;
use std::sync::{Arc, Mutex};
use std::task::Poll;

use futures_util::stream::FuturesOrdered;
use futures_util::{Future, Stream, StreamExt};
use indexmap::IndexMap;
use lru::LruCache;
use papyrus_storage::state::StateStorageReader;
use papyrus_storage::StorageReader;
use starknet_api::block::BlockNumber;
Expand Down Expand Up @@ -39,6 +40,7 @@ pub(crate) struct StateUpdateStream<TStarknetClient: StarknetReader + Send + 'st
classes_to_download: VecDeque<ClassHash>,
download_class_tasks: TasksQueue<CentralResult<Option<ApiContractClass>>>,
downloaded_classes: VecDeque<ApiContractClass>,
class_cache: Arc<Mutex<LruCache<ClassHash, ApiContractClass>>>,
config: StateUpdateStreamConfig,
}

Expand Down Expand Up @@ -89,6 +91,7 @@ impl<TStarknetClient: StarknetReader + Send + Sync + 'static> StateUpdateStream<
starknet_client: Arc<TStarknetClient>,
storage_reader: StorageReader,
config: StateUpdateStreamConfig,
class_cache: Arc<Mutex<LruCache<ClassHash, ApiContractClass>>>,
) -> Self {
StateUpdateStream {
initial_block_number,
Expand All @@ -107,6 +110,7 @@ impl<TStarknetClient: StarknetReader + Send + Sync + 'static> StateUpdateStream<
config.max_state_updates_to_store_in_memory * 5,
),
config,
class_cache,
}
}

Expand Down Expand Up @@ -151,7 +155,9 @@ impl<TStarknetClient: StarknetReader + Send + Sync + 'static> StateUpdateStream<
};
let starknet_client = self.starknet_client.clone();
let storage_reader = self.storage_reader.clone();
let cache = self.class_cache.clone();
self.download_class_tasks.push_back(Box::pin(download_class_if_necessary(
cache,
class_hash,
starknet_client,
storage_reader,
Expand Down Expand Up @@ -330,10 +336,18 @@ fn client_to_central_state_update(
// If not found in the storage, the class is downloaded.
#[instrument(skip(starknet_client, storage_reader), level = "debug", err)]
async fn download_class_if_necessary<TStarknetClient: StarknetReader>(
cache: Arc<Mutex<LruCache<ClassHash, ApiContractClass>>>,
class_hash: ClassHash,
starknet_client: Arc<TStarknetClient>,
storage_reader: StorageReader,
) -> CentralResult<Option<ApiContractClass>> {
{
let mut cache = cache.lock().expect("Failed to lock class cache.");
if let Some(class) = cache.get(&class_hash) {
return Ok(Some(class.clone()));
}
}

let txn = storage_reader.begin_ro_txn()?;
let state_reader = txn.get_state_reader()?;
let block_number = txn.get_state_marker()?;
Expand All @@ -342,6 +356,10 @@ async fn download_class_if_necessary<TStarknetClient: StarknetReader>(
// Check declared classes.
if let Ok(Some(class)) = state_reader.get_class_definition_at(state_number, &class_hash) {
trace!("Class {:?} retrieved from storage.", class_hash);
{
let mut cache = cache.lock().expect("Failed to lock class cache.");
cache.put(class_hash, ApiContractClass::ContractClass(class.clone()));
}
return Ok(Some(ApiContractClass::ContractClass(class)));
};

Expand All @@ -350,6 +368,10 @@ async fn download_class_if_necessary<TStarknetClient: StarknetReader>(
state_reader.get_deprecated_class_definition_at(state_number, &class_hash)
{
trace!("Deprecated class {:?} retrieved from storage.", class_hash);
{
let mut cache = cache.lock().expect("Failed to lock class cache.");
cache.put(class_hash, ApiContractClass::DeprecatedContractClass(class.clone()));
}
return Ok(Some(ApiContractClass::DeprecatedContractClass(class)));
}

Expand All @@ -358,6 +380,12 @@ async fn download_class_if_necessary<TStarknetClient: StarknetReader>(
let client_class = starknet_client.class_by_hash(class_hash).await.map_err(Arc::new)?;
match client_class {
None => Ok(None),
Some(class) => Ok(Some(class.into())),
Some(class) => {
{
let mut cache = cache.lock().expect("Failed to lock class cache.");
cache.put(class_hash, class.clone().into());
}
Ok(Some(class.into()))
}
}
}
15 changes: 14 additions & 1 deletion crates/papyrus_sync/src/sources/central_test.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
use std::sync::Arc;
use std::num::NonZeroUsize;
use std::sync::{Arc, Mutex};

use assert_matches::assert_matches;
use cairo_lang_starknet::casm_contract_class::CasmContractClass;
use futures_util::pin_mut;
use indexmap::{indexmap, IndexMap};
use lru::LruCache;
use mockall::predicate;
use papyrus_storage::state::StateStorageWriter;
use papyrus_storage::test_utils::get_test_storage;
Expand Down Expand Up @@ -38,6 +40,7 @@ use starknet_client::ClientError;
use tokio_stream::StreamExt;

use super::state_update_stream::StateUpdateStreamConfig;
use super::ApiContractClass;
use crate::sources::central::{CentralError, CentralSourceTrait, GenericCentralSource};

const TEST_CONCURRENT_REQUESTS: usize = 300;
Expand All @@ -58,6 +61,7 @@ async fn last_block_number() {
concurrent_requests: TEST_CONCURRENT_REQUESTS,
storage_reader: reader,
state_update_stream_config: state_update_stream_config_for_test(),
class_cache: get_test_class_cache(),
};

let last_block_number = central_source.get_latest_block().await.unwrap().unwrap().block_number;
Expand All @@ -83,6 +87,7 @@ async fn stream_block_headers() {
starknet_client: Arc::new(mock),
storage_reader: reader,
state_update_stream_config: state_update_stream_config_for_test(),
class_cache: get_test_class_cache(),
};

let mut expected_block_num = BlockNumber(START_BLOCK_NUMBER);
Expand Down Expand Up @@ -120,6 +125,7 @@ async fn stream_block_headers_some_are_missing() {
starknet_client: Arc::new(mock),
storage_reader: reader,
state_update_stream_config: state_update_stream_config_for_test(),
class_cache: get_test_class_cache(),
};

let mut expected_block_num = BlockNumber(START_BLOCK_NUMBER);
Expand Down Expand Up @@ -172,6 +178,7 @@ async fn stream_block_headers_error() {
starknet_client: Arc::new(mock),
storage_reader: reader,
state_update_stream_config: state_update_stream_config_for_test(),
class_cache: get_test_class_cache(),
};

let mut expected_block_num = BlockNumber(START_BLOCK_NUMBER);
Expand Down Expand Up @@ -308,6 +315,7 @@ async fn stream_state_updates() {
starknet_client: Arc::new(mock),
storage_reader: reader,
state_update_stream_config: state_update_stream_config_for_test(),
class_cache: get_test_class_cache(),
};
let initial_block_num = BlockNumber(START_BLOCK_NUMBER);

Expand Down Expand Up @@ -418,6 +426,7 @@ async fn stream_compiled_classes() {
starknet_client: Arc::new(mock),
storage_reader: reader,
state_update_stream_config: state_update_stream_config_for_test(),
class_cache: get_test_class_cache(),
};

let stream = central_source.stream_compiled_classes(BlockNumber(0), BlockNumber(2));
Expand All @@ -442,3 +451,7 @@ fn state_update_stream_config_for_test() -> StateUpdateStreamConfig {
max_classes_to_download: 10,
}
}

fn get_test_class_cache() -> Arc<Mutex<LruCache<ClassHash, ApiContractClass>>> {
Arc::from(Mutex::new(LruCache::new(NonZeroUsize::new(2).unwrap())))
}

0 comments on commit 0ce1048

Please sign in to comment.