diff --git a/Cargo.lock b/Cargo.lock index 511fab3a46..30230a449f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9840,7 +9840,6 @@ version = "0.1.0" dependencies = [ "async-trait", "futures", - "lru 0.12.3", "parity-scale-codec", "parking_lot 0.12.2", "rand", @@ -9853,6 +9852,7 @@ dependencies = [ "sc-telemetry", "sc-transaction-pool-api", "sc-utils", + "schnellru", "schnorrkel", "sp-api", "sp-block-builder", @@ -9881,7 +9881,6 @@ dependencies = [ "futures", "futures-timer", "jsonrpsee", - "lru 0.12.3", "parity-scale-codec", "parking_lot 0.12.2", "sc-client-api", @@ -9889,6 +9888,7 @@ dependencies = [ "sc-rpc", "sc-rpc-api", "sc-utils", + "schnellru", "sp-api", "sp-blockchain", "sp-consensus", @@ -10266,7 +10266,6 @@ dependencies = [ "core_affinity", "derive_more", "futures", - "lru 0.12.3", "parity-scale-codec", "parking_lot 0.12.2", "rayon", @@ -10274,6 +10273,7 @@ dependencies = [ "sc-consensus-slots", "sc-network", "sc-network-gossip", + "schnellru", "sp-api", "sp-blockchain", "sp-consensus", @@ -10690,9 +10690,9 @@ dependencies = [ [[package]] name = "schnellru" -version = "0.2.1" +version = "0.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "772575a524feeb803e5b0fcbc6dd9f367e579488197c94c6e4023aad2305774d" +checksum = "c9a8ef13a93c54d20580de1e5c413e624e53121d42fc7e2c11d10ef7f8b02367" dependencies = [ "ahash 0.8.11", "cfg-if", @@ -12348,7 +12348,6 @@ dependencies = [ "hex", "hwlocality", "jsonrpsee", - "lru 0.12.3", "mimalloc", "num_cpus", "parity-scale-codec", @@ -12357,6 +12356,7 @@ dependencies = [ "prometheus-client 0.22.2", "rand", "rayon", + "schnellru", "schnorrkel", "serde", "serde_json", @@ -12505,7 +12505,6 @@ dependencies = [ "hex", "libp2p 0.53.2", "libp2p-swarm-test", - "lru 0.12.3", "memmap2 0.9.4", "nohash-hasher", "parity-scale-codec", @@ -12513,6 +12512,7 @@ dependencies = [ "pin-project", "prometheus-client 0.22.2", "rand", + "schnellru", "serde", "serde_json", "subspace-core-primitives", diff --git a/crates/sc-consensus-subspace-rpc/Cargo.toml b/crates/sc-consensus-subspace-rpc/Cargo.toml index ab84b02fa3..3d7c7d529c 100644 --- a/crates/sc-consensus-subspace-rpc/Cargo.toml +++ b/crates/sc-consensus-subspace-rpc/Cargo.toml @@ -17,9 +17,9 @@ async-oneshot = "0.5.9" futures = "0.3.29" futures-timer = "3.0.3" jsonrpsee = { version = "0.22.5", features = ["server", "macros"] } -lru = "0.12.3" parity-scale-codec = "3.6.9" parking_lot = "0.12.2" +schnellru = "0.2.3" sc-client-api = { git = "https://github.com/subspace/polkadot-sdk", rev = "808269708cf5375526755797e8f9a9986016727d" } sc-consensus-subspace = { version = "0.1.0", path = "../sc-consensus-subspace" } sc-rpc = { git = "https://github.com/subspace/polkadot-sdk", rev = "808269708cf5375526755797e8f9a9986016727d" } diff --git a/crates/sc-consensus-subspace-rpc/src/lib.rs b/crates/sc-consensus-subspace-rpc/src/lib.rs index 741a68e63f..19a113f9aa 100644 --- a/crates/sc-consensus-subspace-rpc/src/lib.rs +++ b/crates/sc-consensus-subspace-rpc/src/lib.rs @@ -25,7 +25,6 @@ use jsonrpsee::core::async_trait; use jsonrpsee::proc_macros::rpc; use jsonrpsee::types::{ErrorObject, ErrorObjectOwned}; use jsonrpsee::PendingSubscriptionSink; -use lru::LruCache; use parity_scale_codec::{Decode, Encode}; use parking_lot::Mutex; use sc_client_api::{AuxStore, BlockBackend}; @@ -40,6 +39,7 @@ use sc_rpc::utils::pipe_from_stream; use sc_rpc::SubscriptionTaskExecutor; use sc_rpc_api::{DenyUnsafe, UnsafeRpcError}; use sc_utils::mpsc::TracingUnboundedSender; +use schnellru::{ByLength, LruMap}; use sp_api::{ApiError, ProvideRuntimeApi}; use sp_blockchain::HeaderBackend; use sp_consensus::SyncOracle; @@ -53,7 +53,6 @@ use sp_runtime::traits::Block as BlockT; use std::collections::hash_map::Entry; use std::collections::HashMap; use std::marker::PhantomData; -use std::num::NonZeroUsize; use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::{Arc, Weak}; use std::time::Duration; @@ -231,7 +230,7 @@ where archived_segment_notification_stream: SubspaceNotificationStream, #[allow(clippy::type_complexity)] solution_response_senders: - Arc>>>>, + Arc>>>>, reward_signature_senders: Arc>, dsn_bootstrap_nodes: Vec, segment_headers_store: SegmentHeadersStore, @@ -278,8 +277,8 @@ where let block_authoring_delay = u64::from(chain_constants.block_authoring_delay()); let block_authoring_delay = usize::try_from(block_authoring_delay) .expect("Block authoring delay will never exceed usize on any platform; qed"); - let solution_response_senders_capacity = - NonZeroUsize::try_from(block_authoring_delay).unwrap_or(NonZeroUsize::MIN); + let solution_response_senders_capacity = u32::try_from(block_authoring_delay) + .expect("Always a tiny constant in the protocol; qed"); Ok(Self { client: config.client, @@ -287,9 +286,9 @@ where new_slot_notification_stream: config.new_slot_notification_stream, reward_signing_notification_stream: config.reward_signing_notification_stream, archived_segment_notification_stream: config.archived_segment_notification_stream, - solution_response_senders: Arc::new(Mutex::new(LruCache::new( + solution_response_senders: Arc::new(Mutex::new(LruMap::new(ByLength::new( solution_response_senders_capacity, - ))), + )))), reward_signature_senders: Arc::default(), dsn_bootstrap_nodes: config.dsn_bootstrap_nodes, segment_headers_store: config.segment_headers_store, @@ -400,7 +399,7 @@ where let (response_sender, mut response_receiver) = mpsc::channel(SOLUTION_SENDER_CHANNEL_CAPACITY); - solution_response_senders.push(slot_number, response_sender); + solution_response_senders.insert(slot_number, response_sender); // Wait for solutions and transform proposed proof of space solutions // into data structure `sc-consensus-subspace` expects diff --git a/crates/sc-consensus-subspace/Cargo.toml b/crates/sc-consensus-subspace/Cargo.toml index b953886a10..c0efc1db65 100644 --- a/crates/sc-consensus-subspace/Cargo.toml +++ b/crates/sc-consensus-subspace/Cargo.toml @@ -16,11 +16,11 @@ targets = ["x86_64-unknown-linux-gnu"] async-trait = "0.1.80" codec = { package = "parity-scale-codec", version = "3.6.5", features = ["derive"] } futures = "0.3.29" -lru = "0.12.3" parking_lot = "0.12.2" rand = "0.8.5" rand_chacha = "0.3.1" rayon = "1.10.0" +schnellru = "0.2.3" schnorrkel = "0.11.4" sc-client-api = { git = "https://github.com/subspace/polkadot-sdk", rev = "808269708cf5375526755797e8f9a9986016727d" } sc-consensus = { git = "https://github.com/subspace/polkadot-sdk", rev = "808269708cf5375526755797e8f9a9986016727d" } diff --git a/crates/sc-proof-of-time/Cargo.toml b/crates/sc-proof-of-time/Cargo.toml index e91e1655a7..8045b98e67 100644 --- a/crates/sc-proof-of-time/Cargo.toml +++ b/crates/sc-proof-of-time/Cargo.toml @@ -15,10 +15,10 @@ atomic = "0.5.3" core_affinity = "0.8.1" derive_more = "0.99.17" futures = "0.3.29" -lru = "0.12.3" parity-scale-codec = { version = "3.6.9", features = ["derive"] } parking_lot = "0.12.2" rayon = "1.10.0" +schnellru = "0.2.3" sc-client-api = { git = "https://github.com/subspace/polkadot-sdk", rev = "808269708cf5375526755797e8f9a9986016727d" } sc-consensus-slots = { git = "https://github.com/subspace/polkadot-sdk", rev = "808269708cf5375526755797e8f9a9986016727d" } sc-network = { git = "https://github.com/subspace/polkadot-sdk", rev = "808269708cf5375526755797e8f9a9986016727d" } diff --git a/crates/sc-proof-of-time/src/source/gossip.rs b/crates/sc-proof-of-time/src/source/gossip.rs index b97a66f0c5..4198e70f68 100644 --- a/crates/sc-proof-of-time/src/source/gossip.rs +++ b/crates/sc-proof-of-time/src/source/gossip.rs @@ -4,7 +4,6 @@ use crate::source::state::PotState; use crate::verifier::PotVerifier; use futures::channel::mpsc; use futures::{FutureExt, SinkExt, StreamExt}; -use lru::LruCache; use parity_scale_codec::{Decode, Encode}; use parking_lot::Mutex; use sc_network::config::NonDefaultSetConfig; @@ -13,6 +12,7 @@ use sc_network_gossip::{ GossipEngine, MessageIntent, Network as GossipNetwork, Syncing as GossipSyncing, ValidationResult, Validator, ValidatorContext, }; +use schnellru::{ByLength, LruMap}; use sp_consensus::SyncOracle; use sp_consensus_slots::Slot; use sp_consensus_subspace::PotNextSlotInput; @@ -20,7 +20,7 @@ use sp_runtime::traits::{Block as BlockT, Hash as HashT, Header as HeaderT}; use std::cmp; use std::collections::{HashMap, VecDeque}; use std::future::poll_fn; -use std::num::{NonZeroU32, NonZeroUsize}; +use std::num::NonZeroU32; use std::pin::pin; use std::sync::{atomic, Arc}; use subspace_core_primitives::{PotCheckpoints, PotSeed}; @@ -30,7 +30,7 @@ use tracing::{debug, error, trace, warn}; const MAX_SLOTS_IN_THE_FUTURE: u64 = 10; /// How much faster PoT verification is expected to be comparing to PoT proving const EXPECTED_POT_VERIFICATION_SPEEDUP: usize = 7; -const GOSSIP_CACHE_PEER_COUNT: NonZeroUsize = NonZeroUsize::new(1_000).expect("Not zero; qed"); +const GOSSIP_CACHE_PEER_COUNT: u32 = 1_000; const GOSSIP_CACHE_PER_PEER_SIZE: usize = 20; mod rep { @@ -116,7 +116,7 @@ where topic: Block::Hash, state: Arc, pot_verifier: PotVerifier, - gossip_cache: LruCache>, + gossip_cache: LruMap>, to_gossip_receiver: mpsc::Receiver, from_gossip_sender: mpsc::Sender<(PeerId, GossipProof)>, } @@ -166,7 +166,7 @@ where topic, state, pot_verifier, - gossip_cache: LruCache::new(GOSSIP_CACHE_PEER_COUNT), + gossip_cache: LruMap::new(ByLength::new(GOSSIP_CACHE_PEER_COUNT)), to_gossip_receiver, from_gossip_sender, } @@ -265,25 +265,24 @@ where "Proof from the future", ); - let proofs = self - .gossip_cache - .get_or_insert_mut(sender, Default::default); - if proofs.len() == GOSSIP_CACHE_PER_PEER_SIZE { - if let Some(proof) = proofs.pop_front() { - trace!( - %sender, - slot = %proof.slot, - next_slot = %next_slot_input.slot, - "Too many proofs stored from peer", - ); - - self.engine - .lock() - .report(sender, rep::GOSSIP_TOO_MANY_PROOFS); + if let Some(proofs) = self.gossip_cache.get_or_insert(sender, Default::default) { + if proofs.len() == GOSSIP_CACHE_PER_PEER_SIZE { + if let Some(proof) = proofs.pop_front() { + trace!( + %sender, + slot = %proof.slot, + next_slot = %next_slot_input.slot, + "Too many proofs stored from peer", + ); + + self.engine + .lock() + .report(sender, rep::GOSSIP_TOO_MANY_PROOFS); + } } + proofs.push_back(proof); + return; } - proofs.push_back(proof); - return; } } @@ -324,7 +323,8 @@ where /// well as produce next proof if it was already received out of order before async fn handle_next_slot_input(&mut self, next_slot_input: PotNextSlotInput) { let mut old_proofs = HashMap::>::new(); - for (sender, proofs) in &mut self.gossip_cache { + + for (sender, proofs) in &mut self.gossip_cache.iter_mut() { proofs.retain(|proof| { if proof.slot > next_slot_input.slot { true diff --git a/crates/sc-proof-of-time/src/verifier.rs b/crates/sc-proof-of-time/src/verifier.rs index a64b1af193..e9926a95e4 100644 --- a/crates/sc-proof-of-time/src/verifier.rs +++ b/crates/sc-proof-of-time/src/verifier.rs @@ -3,11 +3,11 @@ #[cfg(test)] mod tests; -use lru::LruCache; use parking_lot::Mutex; +use schnellru::{ByLength, LruMap}; use sp_consensus_slots::Slot; use sp_consensus_subspace::{PotNextSlotInput, PotParametersChange}; -use std::num::{NonZeroU32, NonZeroUsize}; +use std::num::NonZeroU32; use std::sync::Arc; use subspace_core_primitives::{PotCheckpoints, PotOutput, PotSeed}; @@ -26,14 +26,14 @@ struct CacheValue { #[derive(Debug, Clone)] pub struct PotVerifier { genesis_seed: PotSeed, - cache: Arc>>, + cache: Arc>>, } impl PotVerifier { - pub fn new(genesis_seed: PotSeed, cache_size: NonZeroUsize) -> Self { + pub fn new(genesis_seed: PotSeed, cache_size: u32) -> Self { Self { genesis_seed, - cache: Arc::new(Mutex::new(LruCache::new(cache_size))), + cache: Arc::new(Mutex::new(LruMap::new(ByLength::new(cache_size)))), } } @@ -44,7 +44,7 @@ impl PotVerifier { slot_iterations: NonZeroU32, checkpoints: PotCheckpoints, ) { - self.cache.lock().push( + self.cache.lock().insert( CacheKey { seed, slot_iterations, @@ -196,7 +196,7 @@ impl PotVerifier { .try_lock() .expect("No one can access this mutex yet; qed"); // Store pending verification entry in cache - cache.push(cache_key, cache_value); + cache.insert(cache_key, cache_value); // Cache lock is no longer necessary, other callers should be able to access cache too drop(cache); @@ -207,13 +207,13 @@ impl PotVerifier { drop(checkpoints); // Proving failed, remove pending entry from cache such that retries can happen - let maybe_removed_cache_value = self.cache.lock().pop(&cache_key); + let maybe_removed_cache_value = self.cache.lock().remove(&cache_key); if let Some(removed_cache_value) = maybe_removed_cache_value { // It is possible that we have removed a verified value that we have not // inserted, check for this and restore if that was the case let removed_verified_value = removed_cache_value.checkpoints.lock().is_some(); if removed_verified_value { - self.cache.lock().push(cache_key, removed_cache_value); + self.cache.lock().insert(cache_key, removed_cache_value); } } return None; @@ -267,7 +267,7 @@ impl PotVerifier { .try_lock() .expect("No one can access this mutex yet; qed"); // Store pending verification entry in cache - cache.push(cache_key, cache_value); + cache.insert(cache_key, cache_value); // Cache lock is no longer necessary, other callers should be able to access cache too drop(cache); @@ -280,13 +280,13 @@ impl PotVerifier { drop(correct_checkpoints); // Verification failed, remove pending entry from cache such that retries can happen - let maybe_removed_cache_value = self.cache.lock().pop(&cache_key); + let maybe_removed_cache_value = self.cache.lock().remove(&cache_key); if let Some(removed_cache_value) = maybe_removed_cache_value { // It is possible that we have removed a verified value that we have not // inserted, check for this and restore if that was the case let removed_verified_value = removed_cache_value.checkpoints.lock().is_some(); if removed_verified_value { - self.cache.lock().push(cache_key, removed_cache_value); + self.cache.lock().insert(cache_key, removed_cache_value); } } return false; diff --git a/crates/sc-proof-of-time/src/verifier/tests.rs b/crates/sc-proof-of-time/src/verifier/tests.rs index c1ab555472..a9cd07932c 100644 --- a/crates/sc-proof-of-time/src/verifier/tests.rs +++ b/crates/sc-proof-of-time/src/verifier/tests.rs @@ -2,7 +2,7 @@ use crate::verifier::PotVerifier; use sp_consensus_slots::Slot; use sp_consensus_subspace::{PotNextSlotInput, PotParametersChange}; use std::mem; -use std::num::{NonZeroU32, NonZeroUsize}; +use std::num::NonZeroU32; use subspace_core_primitives::{Blake3Hash, PotSeed}; const SEED: [u8; 16] = [ @@ -15,7 +15,7 @@ fn test_basic() { let slot_iterations = NonZeroU32::new(512).unwrap(); let checkpoints_1 = subspace_proof_of_time::prove(genesis_seed, slot_iterations).unwrap(); - let verifier = PotVerifier::new(genesis_seed, NonZeroUsize::new(1000).unwrap()); + let verifier = PotVerifier::new(genesis_seed, 1000); // Expected to be valid assert!(verifier.is_output_valid( @@ -139,7 +139,7 @@ fn parameters_change() { let checkpoints_3 = subspace_proof_of_time::prove(checkpoints_2.output().seed(), slot_iterations_2).unwrap(); - let verifier = PotVerifier::new(genesis_seed, NonZeroUsize::new(1000).unwrap()); + let verifier = PotVerifier::new(genesis_seed, 1000); // Changing parameters after first slot assert!(verifier.is_output_valid( diff --git a/crates/subspace-farmer/Cargo.toml b/crates/subspace-farmer/Cargo.toml index b33c483d2d..c636a81d87 100644 --- a/crates/subspace-farmer/Cargo.toml +++ b/crates/subspace-farmer/Cargo.toml @@ -32,7 +32,6 @@ futures = "0.3.29" hex = { version = "0.4.3", features = ["serde"] } hwlocality = { version = "1.0.0-alpha.3", features = ["vendored"], optional = true } jsonrpsee = { version = "0.22.5", features = ["client"] } -lru = "0.12.3" mimalloc = "0.1.41" num_cpus = "1.16.0" parity-scale-codec = "3.6.9" @@ -41,6 +40,7 @@ pin-project = "1.1.5" prometheus-client = "0.22.2" rand = "0.8.5" rayon = "1.10.0" +schnellru = "0.2.3" schnorrkel = "0.11.4" serde = { version = "1.0.199", features = ["derive"] } serde_json = "1.0.116" diff --git a/crates/subspace-farmer/src/lib.rs b/crates/subspace-farmer/src/lib.rs index 1f42a740eb..87c33a67fa 100644 --- a/crates/subspace-farmer/src/lib.rs +++ b/crates/subspace-farmer/src/lib.rs @@ -55,7 +55,6 @@ pub mod utils; pub use identity::Identity; pub use jsonrpsee; -use std::num::NonZeroUsize; /// Size of the LRU cache for peers. -pub const KNOWN_PEERS_CACHE_SIZE: NonZeroUsize = NonZeroUsize::new(100).expect("Not zero; qed"); +pub const KNOWN_PEERS_CACHE_SIZE: u32 = 100; diff --git a/crates/subspace-farmer/src/single_disk_farm/plotting.rs b/crates/subspace-farmer/src/single_disk_farm/plotting.rs index 6d724f9c3b..a4804cc327 100644 --- a/crates/subspace-farmer/src/single_disk_farm/plotting.rs +++ b/crates/subspace-farmer/src/single_disk_farm/plotting.rs @@ -10,14 +10,13 @@ use async_lock::{Mutex as AsyncMutex, RwLock as AsyncRwLock}; use futures::channel::{mpsc, oneshot}; use futures::stream::FuturesOrdered; use futures::{select, FutureExt, SinkExt, StreamExt}; -use lru::LruCache; use parity_scale_codec::Encode; +use schnellru::{ByLength, LruMap}; use std::collections::{HashMap, HashSet}; #[cfg(not(windows))] use std::fs::File; use std::future::{pending, Future}; use std::io; -use std::num::NonZeroUsize; use std::ops::Range; use std::pin::pin; use std::sync::Arc; @@ -35,7 +34,8 @@ use tracing::{debug, info, trace, warn}; const FARMER_APP_INFO_RETRY_INTERVAL: Duration = Duration::from_millis(500); /// Size of the cache of archived segments for the purposes of faster sector expiration checks. -const ARCHIVED_SEGMENTS_CACHE_SIZE: NonZeroUsize = NonZeroUsize::new(1000).expect("Not zero; qed"); + +const ARCHIVED_SEGMENTS_CACHE_SIZE: u32 = 1000; const PLOTTING_RETRY_DELAY: Duration = Duration::from_secs(1); pub(super) struct SectorToPlot { @@ -715,7 +715,8 @@ where let mut sectors_to_replot = Vec::new(); let mut sectors_to_check = Vec::with_capacity(usize::from(target_sector_count)); - let mut archived_segment_commitments_cache = LruCache::new(ARCHIVED_SEGMENTS_CACHE_SIZE); + let mut archived_segment_commitments_cache = + LruMap::new(ByLength::new(ARCHIVED_SEGMENTS_CACHE_SIZE)); loop { let archived_segment_header = *archived_segments_receiver.borrow_and_update(); @@ -798,7 +799,7 @@ where let segment_commitment = segment_header.segment_commitment(); archived_segment_commitments_cache - .push(expiration_check_segment_index, segment_commitment); + .insert(expiration_check_segment_index, segment_commitment); segment_commitment }) }; diff --git a/crates/subspace-networking/Cargo.toml b/crates/subspace-networking/Cargo.toml index e653d365b8..c751fdee77 100644 --- a/crates/subspace-networking/Cargo.toml +++ b/crates/subspace-networking/Cargo.toml @@ -29,7 +29,6 @@ fs2 = "0.4.3" futures = "0.3.29" futures-timer = "3.0.3" hex = "0.4.3" -lru = "0.12.3" memmap2 = "0.9.4" nohash-hasher = "0.2.0" parity-scale-codec = "3.6.9" @@ -37,6 +36,7 @@ parking_lot = "0.12.2" pin-project = "1.1.5" prometheus-client = "0.22.2" rand = "0.8.5" +schnellru = "0.2.3" serde = { version = "1.0.199", features = ["derive"] } serde_json = "1.0.116" subspace-core-primitives = { version = "0.1.0", path = "../subspace-core-primitives" } diff --git a/crates/subspace-networking/src/behavior/persistent_parameters.rs b/crates/subspace-networking/src/behavior/persistent_parameters.rs index 43fd1293d6..4a913c8042 100644 --- a/crates/subspace-networking/src/behavior/persistent_parameters.rs +++ b/crates/subspace-networking/src/behavior/persistent_parameters.rs @@ -6,14 +6,14 @@ use futures::future::{pending, Fuse}; use futures::FutureExt; use libp2p::multiaddr::Protocol; use libp2p::{Multiaddr, PeerId}; -use lru::LruCache; use memmap2::{MmapMut, MmapOptions}; use parity_scale_codec::{Compact, CompactLen, Decode, Encode}; use parking_lot::Mutex; +use schnellru::{ByLength, LruMap}; use std::collections::HashSet; use std::fs::OpenOptions; use std::io::{Read, Seek, SeekFrom}; -use std::num::{NonZeroU64, NonZeroUsize}; +use std::num::NonZeroUsize; use std::path::Path; use std::pin::Pin; use std::str::FromStr; @@ -30,9 +30,9 @@ use tracing::{debug, error, trace, warn}; type FailureTime = Option; /// Size of the LRU cache for peers. -const KNOWN_PEERS_CACHE_SIZE: NonZeroUsize = NonZeroUsize::new(100).expect("Not zero; qed"); +const KNOWN_PEERS_CACHE_SIZE: u32 = 100; /// Size of the LRU cache for addresses of a single peer ID. -const ADDRESSES_CACHE_SIZE: NonZeroUsize = NonZeroUsize::new(30).expect("Not zero; qed"); +const ADDRESSES_CACHE_SIZE: u32 = 30; /// Pause duration between network parameters save. const DATA_FLUSH_DURATION_SECS: u64 = 5; /// Defines a batch size for a combined collection for known peers addresses and boostrap addresses. @@ -60,21 +60,19 @@ struct EncodableKnownPeerAddress { #[derive(Debug, Encode, Decode)] struct EncodableKnownPeers { - cache_size: NonZeroU64, + cache_size: u32, timestamp: u64, // Each entry is a tuple of peer ID + list of multiaddresses with corresponding failure time known_peers: Vec<(Vec, Vec)>, } impl EncodableKnownPeers { - fn into_cache(self) -> LruCache> { - let mut peers_cache = LruCache::new( - NonZeroUsize::new(self.cache_size.get() as usize) - .expect("Upstream value is NoneZeroUsize"), - ); + fn into_cache(self) -> LruMap> { + let mut peers_cache = LruMap::new(ByLength::new(self.cache_size)); 'peers: for (peer_id, addresses) in self.known_peers { - let mut peer_cache = LruCache::::new(ADDRESSES_CACHE_SIZE); + let mut peer_cache = + LruMap::::new(ByLength::new(ADDRESSES_CACHE_SIZE)); let peer_id = match PeerId::from_bytes(&peer_id) { Ok(peer_id) => peer_id, @@ -95,7 +93,7 @@ impl EncodableKnownPeers { } }; - peer_cache.push( + peer_cache.insert( multiaddr, address.failure_time.map(|failure_time| { SystemTime::UNIX_EPOCH + Duration::from_secs(failure_time) @@ -103,21 +101,17 @@ impl EncodableKnownPeers { ); } - peers_cache.push(peer_id, peer_cache); + peers_cache.insert(peer_id, peer_cache); } peers_cache } - fn from_cache( - cache: &LruCache>, - cache_size: NonZeroUsize, - ) -> Self { + fn from_cache(cache: &LruMap>, cache_size: u32) -> Self { let single_peer_encoded_address_size = KnownPeersManager::single_peer_encoded_address_size(); Self { - cache_size: NonZeroU64::new(cache_size.get() as u64) - .expect("Getting the value from another NonZero type"), + cache_size, timestamp: SystemTime::now() .duration_since(SystemTime::UNIX_EPOCH) .expect("Never before Unix epoch; qed") @@ -260,7 +254,7 @@ pub struct KnownPeersManagerConfig { /// Defines whether we return known peers batches on next_known_addresses_batch(). pub enable_known_peers_source: bool, /// Defines cache size. - pub cache_size: NonZeroUsize, + pub cache_size: u32, /// Peer ID list to filter on address adding. pub ignore_peer_list: HashSet, /// Defines whether we enable cache persistence. @@ -301,7 +295,7 @@ pub struct KnownPeersManager { /// Defines whether the cache requires saving to DB cache_need_saving: bool, /// LRU cache for the known peers and their addresses - known_peers: LruCache>, + known_peers: LruMap>, /// Period between networking parameters saves. networking_parameters_save_delay: Pin>>, /// Slots backed by file that store known peers @@ -332,7 +326,7 @@ impl Drop for KnownPeersManager { impl KnownPeersManager { fn init_file( path: &Path, - cache_size: NonZeroUsize, + cache_size: u32, ) -> Result< (Option, Arc>), KnownPeersManagerPersistenceError, @@ -456,7 +450,7 @@ impl KnownPeersManager { let known_peers = maybe_newest_known_addresses .map(EncodableKnownPeers::into_cache) - .unwrap_or_else(|| LruCache::new(config.cache_size)); + .unwrap_or_else(|| LruMap::new(ByLength::new(config.cache_size))); Ok(Self { cache_need_saving: false, @@ -483,7 +477,7 @@ impl KnownPeersManager { } /// Size of the backing file on disk - pub fn file_size(cache_size: NonZeroUsize) -> usize { + pub fn file_size(cache_size: u32) -> usize { // *2 because we have a/b parts of the file Self::known_addresses_size(cache_size) * 2 } @@ -514,21 +508,21 @@ impl KnownPeersManager { // Peer ID encoding + compact encoding of the length of list of addresses + (length of a // single peer address entry + optional failure time) * number of entries PeerId::random().to_bytes().encoded_size() - + Compact::compact_len(&(ADDRESSES_CACHE_SIZE.get() as u32)) + + Compact::compact_len(&(ADDRESSES_CACHE_SIZE)) + (Self::single_peer_encoded_address_size() + Some(0u64).encoded_size()) - * ADDRESSES_CACHE_SIZE.get() + * ADDRESSES_CACHE_SIZE as usize } /// Size of known addresses and accompanying metadata. /// /// NOTE: This is max size that needs to be allocated on disk for successful write of a single /// `known_addresses` copy, the actual written data can occupy only a part of this size - fn known_addresses_size(cache_size: NonZeroUsize) -> usize { + fn known_addresses_size(cache_size: u32) -> usize { // Timestamp (when was written) + compact encoding of the length of peer records + peer // records + checksum mem::size_of::() - + Compact::compact_len(&(cache_size.get() as u32)) - + Self::single_peer_encoded_size() * cache_size.get() + + Compact::compact_len(&(cache_size)) + + Self::single_peer_encoded_size() * cache_size as usize + mem::size_of::() } @@ -579,11 +573,11 @@ impl KnownPeersRegistry for KnownPeersManager { .for_each(|addr| { // Add new address cache if it doesn't exist previously. self.known_peers - .get_or_insert(peer_id, || LruCache::new(ADDRESSES_CACHE_SIZE)); - - if let Some(addresses) = self.known_peers.get_mut(&peer_id) { - let previous_entry = addresses.push(addr, None); + .get_or_insert(peer_id, || LruMap::new(ByLength::new(ADDRESSES_CACHE_SIZE))); + if let Some(addresses) = self.known_peers.get(&peer_id) { + let previous_entry = addresses.peek(&addr).cloned().flatten(); + addresses.insert(addr, None); if let Some(previous_entry) = previous_entry { trace!(%peer_id, "Address cache entry replaced: {:?}", previous_entry); } @@ -614,7 +608,7 @@ impl KnownPeersRegistry for KnownPeersManager { fn remove_all_known_peer_addresses(&mut self, peer_id: PeerId) { trace!(%peer_id, "Remove all peer addresses from the networking parameters registry"); - self.known_peers.pop(&peer_id); + self.known_peers.remove(&peer_id); self.cache_need_saving = true; } @@ -715,7 +709,7 @@ pub(crate) fn append_p2p_suffix(peer_id: PeerId, mut address: Multiaddr) -> Mult // Testable implementation of the `remove_known_peer_addresses` pub(super) fn remove_known_peer_addresses_internal( - known_peers: &mut LruCache>, + known_peers: &mut LruMap>, peer_id: PeerId, addresses: Vec, expired_address_duration_persistent_storage: Duration, @@ -730,7 +724,7 @@ pub(super) fn remove_known_peer_addresses_internal( .for_each(|addr| { // if peer_id is present in the cache if let Some(addresses) = known_peers.peek_mut(&peer_id) { - let last_address = addresses.contains(&addr) && addresses.len() == 1; + let last_address = addresses.peek(&addr).is_some() && addresses.len() == 1; // Get mutable reference to first_failed_time for the address without updating // the item's position in the cache if let Some(first_failed_time) = addresses.peek_mut(&addr) { @@ -751,11 +745,11 @@ pub(super) fn remove_known_peer_addresses_internal( // if we failed first time more than a day ago (for persistent cache) if *time + expired_address_duration_persistent_storage < now { // Remove a failed address - addresses.pop(&addr); + addresses.remove(&addr); // If the last address for peer if last_address { - known_peers.pop(&peer_id); + known_peers.remove(&peer_id); trace!(%peer_id, "Peer removed from the cache"); } diff --git a/crates/subspace-networking/src/behavior/tests.rs b/crates/subspace-networking/src/behavior/tests.rs index ccdf545483..827ab67b6e 100644 --- a/crates/subspace-networking/src/behavior/tests.rs +++ b/crates/subspace-networking/src/behavior/tests.rs @@ -8,11 +8,10 @@ use futures::channel::oneshot; use futures::future::pending; use libp2p::multiaddr::Protocol; use libp2p::{Multiaddr, PeerId}; -use lru::LruCache; use parity_scale_codec::{Decode, Encode}; use parking_lot::Mutex; +use schnellru::{ByLength, LruMap}; use std::future::Future; -use std::num::NonZeroUsize; use std::pin::Pin; use std::str::FromStr; use std::sync::Arc; @@ -30,14 +29,14 @@ async fn test_address_timed_removal_from_known_peers_cache() { let expiration = Duration::from_nanos(1); let expiration_kademlia = Duration::from_nanos(1); - let mut peers_cache = LruCache::new(NonZeroUsize::new(100).unwrap()); - let mut addresses_cache = LruCache::new(NonZeroUsize::new(100).unwrap()); + let mut peers_cache = LruMap::new(ByLength::new(100)); + let mut addresses_cache = LruMap::new(ByLength::new(100)); for addr in addresses.clone() { - addresses_cache.push(addr, None); + addresses_cache.insert(addr, None); } - peers_cache.push(peer_id, addresses_cache); + peers_cache.insert(peer_id, addresses_cache); //Precondition-check assert_eq!(peers_cache.len(), 1); @@ -96,12 +95,12 @@ async fn test_different_removal_timing_from_known_peers_cache() { let expiration = Duration::from_secs(3); let expiration_kademlia = Duration::from_secs(1); - let mut peers_cache = LruCache::new(NonZeroUsize::new(100).unwrap()); - let mut addresses_cache = LruCache::new(NonZeroUsize::new(100).unwrap()); + let mut peers_cache = LruMap::new(ByLength::new(100)); + let mut addresses_cache = LruMap::new(ByLength::new(100)); let addresses = vec![addr.clone()]; - addresses_cache.push(addr, None); - peers_cache.push(peer_id, addresses_cache); + addresses_cache.insert(addr, None); + peers_cache.insert(peer_id, addresses_cache); //Precondition-check assert_eq!(peers_cache.len(), 1); @@ -281,7 +280,7 @@ async fn test_address_p2p_prefix_addition() { async fn test_known_peers_removal_address_after_specified_interval() { let config = KnownPeersManagerConfig { enable_known_peers_source: false, - cache_size: NonZeroUsize::new(100).unwrap(), + cache_size: 100, ignore_peer_list: Default::default(), path: None, failed_address_cache_removal_interval: Duration::from_millis(100), diff --git a/crates/subspace-networking/src/bin/subspace-bootstrap-node/main.rs b/crates/subspace-networking/src/bin/subspace-bootstrap-node/main.rs index 61bbe61e19..49727a64c1 100644 --- a/crates/subspace-networking/src/bin/subspace-bootstrap-node/main.rs +++ b/crates/subspace-networking/src/bin/subspace-bootstrap-node/main.rs @@ -13,7 +13,6 @@ use std::collections::HashSet; use std::error::Error; use std::fmt::{Display, Formatter}; use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr}; -use std::num::NonZeroUsize; use std::sync::Arc; use std::time::Duration; use subspace_metrics::{start_prometheus_metrics_server, RegistryAdapter}; @@ -31,7 +30,7 @@ use tracing_subscriber::EnvFilter; const REMOVE_KNOWN_PEERS_GRACE_PERIOD_FOR_KADEMLIA_SECS: Duration = Duration::from_secs(3600); /// Size of the LRU cache for peers. -pub const KNOWN_PEERS_CACHE_SIZE: NonZeroUsize = NonZeroUsize::new(10000).expect("Not zero; qed"); +pub const KNOWN_PEERS_CACHE_SIZE: u32 = 10000; #[derive(Debug, Parser)] #[clap(about, version)] diff --git a/crates/subspace-networking/src/constructor.rs b/crates/subspace-networking/src/constructor.rs index 8b4f3b1ab5..8503379367 100644 --- a/crates/subspace-networking/src/constructor.rs +++ b/crates/subspace-networking/src/constructor.rs @@ -35,7 +35,6 @@ use parking_lot::Mutex; use prometheus_client::registry::Registry; use std::borrow::Cow; use std::iter::Empty; -use std::num::NonZeroUsize; use std::sync::Arc; use std::time::{Duration, Instant}; use std::{fmt, io, iter}; @@ -66,7 +65,7 @@ const SWARM_MAX_ESTABLISHED_CONNECTIONS_PER_PEER: Option = Some(3); // use-case for gossipsub protocol. const ENABLE_GOSSIP_PROTOCOL: bool = false; -const TEMPORARY_BANS_CACHE_SIZE: NonZeroUsize = NonZeroUsize::new(10_000).expect("Not zero; qed"); +const TEMPORARY_BANS_CACHE_SIZE: u32 = 10_000; const TEMPORARY_BANS_DEFAULT_BACKOFF_INITIAL_INTERVAL: Duration = Duration::from_secs(5); const TEMPORARY_BANS_DEFAULT_BACKOFF_RANDOMIZATION_FACTOR: f64 = 0.1; const TEMPORARY_BANS_DEFAULT_BACKOFF_MULTIPLIER: f64 = 1.5; @@ -224,7 +223,7 @@ pub struct Config { /// Pending outgoing swarm connection limit. pub max_pending_outgoing_connections: u32, /// How many temporarily banned unreachable peers to keep in memory. - pub temporary_bans_cache_size: NonZeroUsize, + pub temporary_bans_cache_size: u32, /// Backoff policy for temporary banning of unreachable peers. pub temporary_ban_backoff: ExponentialBackoff, /// Optional libp2p prometheus metrics. None will disable metrics gathering. diff --git a/crates/subspace-networking/src/constructor/temporary_bans.rs b/crates/subspace-networking/src/constructor/temporary_bans.rs index 1a392827f9..c0db3c80b3 100644 --- a/crates/subspace-networking/src/constructor/temporary_bans.rs +++ b/crates/subspace-networking/src/constructor/temporary_bans.rs @@ -1,8 +1,7 @@ use backoff::backoff::Backoff; use backoff::ExponentialBackoff; use libp2p::PeerId; -use lru::LruCache; -use std::num::NonZeroUsize; +use schnellru::{ByLength, LruMap}; use std::ops::Add; use std::time::Instant; @@ -63,14 +62,14 @@ impl TemporaryBan { #[derive(Debug)] pub(crate) struct TemporaryBans { backoff: ExponentialBackoff, - list: LruCache, + list: LruMap, } impl TemporaryBans { - pub(super) fn new(capacity: NonZeroUsize, backoff: ExponentialBackoff) -> Self { + pub(super) fn new(capacity: u32, backoff: ExponentialBackoff) -> Self { Self { backoff, - list: LruCache::new(capacity), + list: LruMap::new(ByLength::new(capacity)), } } @@ -87,11 +86,11 @@ impl TemporaryBans { /// Create temporary ban for peer or extend existing ban pub(crate) fn create_or_extend(&mut self, peer_id: &PeerId) { - if let Some(ban) = self.list.get_mut(peer_id) { + if let Some(ban) = self.list.get(peer_id) { ban.try_extend(); } else { self.list - .put(*peer_id, TemporaryBan::new(self.backoff.clone())); + .insert(*peer_id, TemporaryBan::new(self.backoff.clone())); } } @@ -99,6 +98,6 @@ impl TemporaryBans { /// /// Returns `true` if there was an entry for peer during call. pub(crate) fn remove(&mut self, peer_id: &PeerId) -> bool { - self.list.pop(peer_id).is_some() + self.list.remove(peer_id).is_some() } } diff --git a/crates/subspace-service/src/dsn.rs b/crates/subspace-service/src/dsn.rs index ab97c3b791..66f82dd47a 100644 --- a/crates/subspace-service/src/dsn.rs +++ b/crates/subspace-service/src/dsn.rs @@ -1,7 +1,6 @@ use prometheus_client::registry::Registry; use std::collections::HashSet; use std::fs; -use std::num::NonZeroUsize; use std::path::PathBuf; use subspace_networking::libp2p::kad::Mode; use subspace_networking::libp2p::{identity, Multiaddr}; @@ -15,7 +14,7 @@ use thiserror::Error; use tracing::{error, trace}; /// Size of the LRU cache for peers. -pub const KNOWN_PEERS_CACHE_SIZE: NonZeroUsize = NonZeroUsize::new(100).expect("Not zero; qed"); +pub const KNOWN_PEERS_CACHE_SIZE: u32 = 100; /// Errors that might happen during DSN configuration. #[derive(Debug, Error)] diff --git a/crates/subspace-service/src/lib.rs b/crates/subspace-service/src/lib.rs index 0238fd922d..5b915681c7 100644 --- a/crates/subspace-service/src/lib.rs +++ b/crates/subspace-service/src/lib.rs @@ -106,7 +106,6 @@ use sp_subspace_mmr::host_functions::{SubspaceMmrExtension, SubspaceMmrHostFunct use sp_transaction_pool::runtime_api::TaggedTransactionQueue; use static_assertions::const_assert; use std::marker::PhantomData; -use std::num::NonZeroUsize; use std::sync::Arc; use std::time::Duration; use subspace_core_primitives::crypto::kzg::{embedded_kzg_settings, Kzg}; @@ -125,7 +124,7 @@ const_assert!(std::mem::size_of::() >= std::mem::size_of::()); /// This is over 15 minutes of slots assuming there are no forks, should be both sufficient and not /// too large to handle -const POT_VERIFIER_CACHE_SIZE: NonZeroUsize = NonZeroUsize::new(30_000).expect("Not zero; qed"); +const POT_VERIFIER_CACHE_SIZE: u32 = 30_000; const SYNC_TARGET_UPDATE_INTERVAL: Duration = Duration::from_secs(1); /// Error type for Subspace service.