replace lru cache with schnellru lru map

anujmax authored and anujmax committed May 14, 2024
1 parent 0dc1ce8 commit 601c92c

Showing 19 changed files with 118 additions and 131 deletions.
14 changes: 7 additions & 7 deletions Cargo.lock

Some generated files are not rendered by default.
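
The change is mechanical: every `lru::LruCache` becomes a `schnellru::LruMap` bounded by a `ByLength` limiter, and the call sites are renamed to match. A minimal side-by-side sketch of the two APIs as pinned in this commit (lru 0.12, schnellru 0.2) — a standalone illustration, not code from the repository:

    use std::num::NonZeroUsize;

    use lru::LruCache;
    use schnellru::{ByLength, LruMap};

    fn main() {
        // Old API: capacity is a NonZeroUsize; entries go in via `push`
        // and come out via `pop`.
        let mut old: LruCache<u64, &str> =
            LruCache::new(NonZeroUsize::new(2).expect("2 is not zero; qed"));
        old.push(1, "a");
        assert_eq!(old.pop(&1), Some("a"));

        // New API: capacity is a plain u32 wrapped in the ByLength limiter;
        // the equivalent calls are `insert` and `remove`.
        let mut new: LruMap<u64, &str> = LruMap::new(ByLength::new(2));
        new.insert(1, "a");
        assert_eq!(new.remove(&1), Some("a"));
    }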

2 changes: 1 addition & 1 deletion crates/sc-consensus-subspace-rpc/Cargo.toml
@@ -17,9 +17,9 @@ async-oneshot = "0.5.9"
 futures = "0.3.29"
 futures-timer = "3.0.3"
 jsonrpsee = { version = "0.22.5", features = ["server", "macros"] }
-lru = "0.12.3"
 parity-scale-codec = "3.6.9"
 parking_lot = "0.12.2"
+schnellru = "0.2.3"
 sc-client-api = { git = "https://github.com/subspace/polkadot-sdk", rev = "808269708cf5375526755797e8f9a9986016727d" }
 sc-consensus-subspace = { version = "0.1.0", path = "../sc-consensus-subspace" }
 sc-rpc = { git = "https://github.com/subspace/polkadot-sdk", rev = "808269708cf5375526755797e8f9a9986016727d" }
15 changes: 7 additions & 8 deletions crates/sc-consensus-subspace-rpc/src/lib.rs
@@ -25,7 +25,6 @@ use jsonrpsee::core::async_trait;
 use jsonrpsee::proc_macros::rpc;
 use jsonrpsee::types::{ErrorObject, ErrorObjectOwned};
 use jsonrpsee::PendingSubscriptionSink;
-use lru::LruCache;
 use parity_scale_codec::{Decode, Encode};
 use parking_lot::Mutex;
 use sc_client_api::{AuxStore, BlockBackend};
@@ -40,6 +39,7 @@ use sc_rpc::utils::pipe_from_stream;
 use sc_rpc::SubscriptionTaskExecutor;
 use sc_rpc_api::{DenyUnsafe, UnsafeRpcError};
 use sc_utils::mpsc::TracingUnboundedSender;
+use schnellru::{ByLength, LruMap};
 use sp_api::{ApiError, ProvideRuntimeApi};
 use sp_blockchain::HeaderBackend;
 use sp_consensus::SyncOracle;
@@ -53,7 +53,6 @@ use sp_runtime::traits::Block as BlockT;
 use std::collections::hash_map::Entry;
 use std::collections::HashMap;
 use std::marker::PhantomData;
-use std::num::NonZeroUsize;
 use std::sync::atomic::{AtomicU64, Ordering};
 use std::sync::{Arc, Weak};
 use std::time::Duration;
@@ -231,7 +230,7 @@ where
     archived_segment_notification_stream: SubspaceNotificationStream<ArchivedSegmentNotification>,
     #[allow(clippy::type_complexity)]
     solution_response_senders:
-        Arc<Mutex<LruCache<SlotNumber, mpsc::Sender<Solution<PublicKey, PublicKey>>>>>,
+        Arc<Mutex<LruMap<SlotNumber, mpsc::Sender<Solution<PublicKey, PublicKey>>>>>,
     reward_signature_senders: Arc<Mutex<BlockSignatureSenders>>,
     dsn_bootstrap_nodes: Vec<Multiaddr>,
     segment_headers_store: SegmentHeadersStore<AS>,
@@ -278,18 +277,18 @@ where
         let block_authoring_delay = u64::from(chain_constants.block_authoring_delay());
         let block_authoring_delay = usize::try_from(block_authoring_delay)
             .expect("Block authoring delay will never exceed usize on any platform; qed");
-        let solution_response_senders_capacity =
-            NonZeroUsize::try_from(block_authoring_delay).unwrap_or(NonZeroUsize::MIN);
+        let solution_response_senders_capacity = u32::try_from(block_authoring_delay)
+            .expect("Always a tiny constant in the protocol; qed");

         Ok(Self {
             client: config.client,
             subscription_executor: config.subscription_executor,
             new_slot_notification_stream: config.new_slot_notification_stream,
             reward_signing_notification_stream: config.reward_signing_notification_stream,
             archived_segment_notification_stream: config.archived_segment_notification_stream,
-            solution_response_senders: Arc::new(Mutex::new(LruCache::new(
+            solution_response_senders: Arc::new(Mutex::new(LruMap::new(ByLength::new(
                 solution_response_senders_capacity,
-            ))),
+            )))),
             reward_signature_senders: Arc::default(),
             dsn_bootstrap_nodes: config.dsn_bootstrap_nodes,
             segment_headers_store: config.segment_headers_store,
@@ -400,7 +399,7 @@ where
         let (response_sender, mut response_receiver) =
             mpsc::channel(SOLUTION_SENDER_CHANNEL_CAPACITY);

-        solution_response_senders.push(slot_number, response_sender);
+        solution_response_senders.insert(slot_number, response_sender);

         // Wait for solutions and transform proposed proof of space solutions
         // into data structure `sc-consensus-subspace` expects
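
Worth noting in the constructor hunk above: because `ByLength::new` takes a plain `u32`, the `NonZeroUsize` conversion disappears. One behavioural edge case shifts with it — the old code clamped a zero capacity up to `NonZeroUsize::MIN`, whereas a `ByLength` limit of zero would simply reject every insert, and the new code instead asserts the conversion cannot fail. A small sketch of the new capacity handling, with a made-up `block_authoring_delay` value standing in for the chain constant:

    use schnellru::{ByLength, LruMap};

    fn main() {
        // Hypothetical stand-in; the real value comes from chain constants.
        let block_authoring_delay: u64 = 4;

        let capacity = u32::try_from(block_authoring_delay)
            .expect("Always a tiny constant in the protocol; qed");
        let mut senders: LruMap<u64, &str> = LruMap::new(ByLength::new(capacity));

        // At capacity, inserting one more slot evicts the least-recently-used
        // entry, matching the old LruCache behaviour.
        for slot in 0..=block_authoring_delay {
            senders.insert(slot, "response sender");
        }
        assert_eq!(senders.len(), capacity as usize);
        assert!(senders.get(&0).is_none()); // slot 0 was evicted
    }
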
2 changes: 1 addition & 1 deletion crates/sc-consensus-subspace/Cargo.toml
@@ -16,11 +16,11 @@ targets = ["x86_64-unknown-linux-gnu"]
 async-trait = "0.1.80"
 codec = { package = "parity-scale-codec", version = "3.6.5", features = ["derive"] }
 futures = "0.3.29"
-lru = "0.12.3"
 parking_lot = "0.12.2"
 rand = "0.8.5"
 rand_chacha = "0.3.1"
 rayon = "1.10.0"
+schnellru = "0.2.3"
 schnorrkel = "0.11.4"
 sc-client-api = { git = "https://github.com/subspace/polkadot-sdk", rev = "808269708cf5375526755797e8f9a9986016727d" }
 sc-consensus = { git = "https://github.com/subspace/polkadot-sdk", rev = "808269708cf5375526755797e8f9a9986016727d" }
2 changes: 1 addition & 1 deletion crates/sc-proof-of-time/Cargo.toml
@@ -15,10 +15,10 @@ atomic = "0.5.3"
 core_affinity = "0.8.1"
 derive_more = "0.99.17"
 futures = "0.3.29"
-lru = "0.12.3"
 parity-scale-codec = { version = "3.6.9", features = ["derive"] }
 parking_lot = "0.12.2"
 rayon = "1.10.0"
+schnellru = "0.2.3"
 sc-client-api = { git = "https://github.com/subspace/polkadot-sdk", rev = "808269708cf5375526755797e8f9a9986016727d" }
 sc-consensus-slots = { git = "https://github.com/subspace/polkadot-sdk", rev = "808269708cf5375526755797e8f9a9986016727d" }
 sc-network = { git = "https://github.com/subspace/polkadot-sdk", rev = "808269708cf5375526755797e8f9a9986016727d" }
46 changes: 23 additions & 23 deletions crates/sc-proof-of-time/src/source/gossip.rs
@@ -4,7 +4,6 @@ use crate::source::state::PotState;
 use crate::verifier::PotVerifier;
 use futures::channel::mpsc;
 use futures::{FutureExt, SinkExt, StreamExt};
-use lru::LruCache;
 use parity_scale_codec::{Decode, Encode};
 use parking_lot::Mutex;
 use sc_network::config::NonDefaultSetConfig;
@@ -13,14 +12,15 @@ use sc_network_gossip::{
     GossipEngine, MessageIntent, Network as GossipNetwork, Syncing as GossipSyncing,
     ValidationResult, Validator, ValidatorContext,
 };
+use schnellru::{ByLength, LruMap};
 use sp_consensus::SyncOracle;
 use sp_consensus_slots::Slot;
 use sp_consensus_subspace::PotNextSlotInput;
 use sp_runtime::traits::{Block as BlockT, Hash as HashT, Header as HeaderT};
 use std::cmp;
 use std::collections::{HashMap, VecDeque};
 use std::future::poll_fn;
-use std::num::{NonZeroU32, NonZeroUsize};
+use std::num::NonZeroU32;
 use std::pin::pin;
 use std::sync::{atomic, Arc};
 use subspace_core_primitives::{PotCheckpoints, PotSeed};
@@ -30,7 +30,7 @@ use tracing::{debug, error, trace, warn};
 const MAX_SLOTS_IN_THE_FUTURE: u64 = 10;
 /// How much faster PoT verification is expected to be comparing to PoT proving
 const EXPECTED_POT_VERIFICATION_SPEEDUP: usize = 7;
-const GOSSIP_CACHE_PEER_COUNT: NonZeroUsize = NonZeroUsize::new(1_000).expect("Not zero; qed");
+const GOSSIP_CACHE_PEER_COUNT: u32 = 1_000;
 const GOSSIP_CACHE_PER_PEER_SIZE: usize = 20;

 mod rep {
@@ -116,7 +116,7 @@ where
     topic: Block::Hash,
     state: Arc<PotState>,
     pot_verifier: PotVerifier,
-    gossip_cache: LruCache<PeerId, VecDeque<GossipProof>>,
+    gossip_cache: LruMap<PeerId, VecDeque<GossipProof>>,
     to_gossip_receiver: mpsc::Receiver<ToGossipMessage>,
     from_gossip_sender: mpsc::Sender<(PeerId, GossipProof)>,
 }
@@ -166,7 +166,7 @@ where
             topic,
             state,
             pot_verifier,
-            gossip_cache: LruCache::new(GOSSIP_CACHE_PEER_COUNT),
+            gossip_cache: LruMap::new(ByLength::new(GOSSIP_CACHE_PEER_COUNT)),
             to_gossip_receiver,
             from_gossip_sender,
         }
@@ -265,25 +265,24 @@
                 "Proof from the future",
             );

-            let proofs = self
-                .gossip_cache
-                .get_or_insert_mut(sender, Default::default);
-            if proofs.len() == GOSSIP_CACHE_PER_PEER_SIZE {
-                if let Some(proof) = proofs.pop_front() {
-                    trace!(
-                        %sender,
-                        slot = %proof.slot,
-                        next_slot = %next_slot_input.slot,
-                        "Too many proofs stored from peer",
-                    );
-
-                    self.engine
-                        .lock()
-                        .report(sender, rep::GOSSIP_TOO_MANY_PROOFS);
-                }
-            }
-            proofs.push_back(proof);
-            return;
+            if let Some(proofs) = self.gossip_cache.get_or_insert(sender, Default::default) {
+                if proofs.len() == GOSSIP_CACHE_PER_PEER_SIZE {
+                    if let Some(proof) = proofs.pop_front() {
+                        trace!(
+                            %sender,
+                            slot = %proof.slot,
+                            next_slot = %next_slot_input.slot,
+                            "Too many proofs stored from peer",
+                        );
+
+                        self.engine
+                            .lock()
+                            .report(sender, rep::GOSSIP_TOO_MANY_PROOFS);
+                    }
+                }
+                proofs.push_back(proof);
+                return;
+            }
         }
     }

@@ -324,7 +323,8 @@ where
     /// well as produce next proof if it was already received out of order before
     async fn handle_next_slot_input(&mut self, next_slot_input: PotNextSlotInput) {
         let mut old_proofs = HashMap::<GossipProof, Vec<PeerId>>::new();
-        for (sender, proofs) in &mut self.gossip_cache {
+
+        for (sender, proofs) in &mut self.gossip_cache.iter_mut() {
             proofs.retain(|proof| {
                 if proof.slot > next_slot_input.slot {
                     true
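
Two API differences show up in the gossip cache beyond plain renaming. First, schnellru's `get_or_insert` returns `Option<&mut V>` rather than `&mut V` (the limiter may refuse the freshly inserted entry), which is why the rewritten block above gains an `if let Some(proofs)` level of nesting. Second, `&mut LruMap` is not itself iterable, so iteration goes through an explicit `iter_mut()`. A minimal sketch of both shapes, under the same `ByLength` setup:

    use std::collections::VecDeque;

    use schnellru::{ByLength, LruMap};

    fn main() {
        let mut cache: LruMap<u32, VecDeque<&str>> = LruMap::new(ByLength::new(1_000));

        // `None` is only possible when the limiter rejects the newly created
        // entry, so with a non-zero ByLength limit this branch always runs.
        if let Some(proofs) = cache.get_or_insert(42, Default::default) {
            proofs.push_back("proof");
        }

        // Iteration over mutable entries goes through `iter_mut()`.
        for (peer, proofs) in cache.iter_mut() {
            println!("{peer}: {} proofs queued", proofs.len());
        }
    }
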
24 changes: 12 additions & 12 deletions crates/sc-proof-of-time/src/verifier.rs
@@ -3,11 +3,11 @@
 #[cfg(test)]
 mod tests;

-use lru::LruCache;
 use parking_lot::Mutex;
+use schnellru::{ByLength, LruMap};
 use sp_consensus_slots::Slot;
 use sp_consensus_subspace::{PotNextSlotInput, PotParametersChange};
-use std::num::{NonZeroU32, NonZeroUsize};
+use std::num::NonZeroU32;
 use std::sync::Arc;
 use subspace_core_primitives::{PotCheckpoints, PotOutput, PotSeed};

@@ -26,14 +26,14 @@ struct CacheValue {
 #[derive(Debug, Clone)]
 pub struct PotVerifier {
     genesis_seed: PotSeed,
-    cache: Arc<Mutex<LruCache<CacheKey, CacheValue>>>,
+    cache: Arc<Mutex<LruMap<CacheKey, CacheValue>>>,
 }

 impl PotVerifier {
-    pub fn new(genesis_seed: PotSeed, cache_size: NonZeroUsize) -> Self {
+    pub fn new(genesis_seed: PotSeed, cache_size: u32) -> Self {
         Self {
             genesis_seed,
-            cache: Arc::new(Mutex::new(LruCache::new(cache_size))),
+            cache: Arc::new(Mutex::new(LruMap::new(ByLength::new(cache_size)))),
         }
     }

@@ -44,7 +44,7 @@ impl PotVerifier {
         slot_iterations: NonZeroU32,
         checkpoints: PotCheckpoints,
     ) {
-        self.cache.lock().push(
+        self.cache.lock().insert(
             CacheKey {
                 seed,
                 slot_iterations,
@@ -196,7 +196,7 @@ impl PotVerifier {
             .try_lock()
             .expect("No one can access this mutex yet; qed");
         // Store pending verification entry in cache
-        cache.push(cache_key, cache_value);
+        cache.insert(cache_key, cache_value);
         // Cache lock is no longer necessary, other callers should be able to access cache too
         drop(cache);

@@ -207,13 +207,13 @@ impl PotVerifier {
             drop(checkpoints);

             // Proving failed, remove pending entry from cache such that retries can happen
-            let maybe_removed_cache_value = self.cache.lock().pop(&cache_key);
+            let maybe_removed_cache_value = self.cache.lock().remove(&cache_key);
             if let Some(removed_cache_value) = maybe_removed_cache_value {
                 // It is possible that we have removed a verified value that we have not
                 // inserted, check for this and restore if that was the case
                 let removed_verified_value = removed_cache_value.checkpoints.lock().is_some();
                 if removed_verified_value {
-                    self.cache.lock().push(cache_key, removed_cache_value);
+                    self.cache.lock().insert(cache_key, removed_cache_value);
                 }
             }
             return None;
@@ -267,7 +267,7 @@ impl PotVerifier {
             .try_lock()
             .expect("No one can access this mutex yet; qed");
         // Store pending verification entry in cache
-        cache.push(cache_key, cache_value);
+        cache.insert(cache_key, cache_value);
         // Cache lock is no longer necessary, other callers should be able to access cache too
         drop(cache);

@@ -280,13 +280,13 @@ impl PotVerifier {
         drop(correct_checkpoints);

         // Verification failed, remove pending entry from cache such that retries can happen
-        let maybe_removed_cache_value = self.cache.lock().pop(&cache_key);
+        let maybe_removed_cache_value = self.cache.lock().remove(&cache_key);
         if let Some(removed_cache_value) = maybe_removed_cache_value {
            // It is possible that we have removed a verified value that we have not
            // inserted, check for this and restore if that was the case
            let removed_verified_value = removed_cache_value.checkpoints.lock().is_some();
            if removed_verified_value {
-                self.cache.lock().push(cache_key, removed_cache_value);
+                self.cache.lock().insert(cache_key, removed_cache_value);
            }
        }
        return false;
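
In the verifier the swap is again a straight rename (`push` becomes `insert`, `pop` becomes `remove`) around an unchanged idiom: insert a pending entry, remove it on failure, and restore it if the removed value had meanwhile been verified by another caller. A condensed sketch of that idiom with simplified stand-ins for `CacheKey`/`CacheValue` (the real types carry a seed, slot iterations, and checkpoints):

    use std::sync::Arc;

    use parking_lot::Mutex;
    use schnellru::{ByLength, LruMap};

    type CacheKey = u64;
    // `Some` means the checkpoints were verified; `None` means still pending.
    type CacheValue = Arc<Mutex<Option<&'static str>>>;

    fn main() {
        let cache: Arc<Mutex<LruMap<CacheKey, CacheValue>>> =
            Arc::new(Mutex::new(LruMap::new(ByLength::new(16))));
        let cache_key: CacheKey = 1;

        // Store the pending verification entry (`push` in lru terms).
        cache.lock().insert(cache_key, Arc::new(Mutex::new(None)));

        // Verification failed: drop the pending entry (`pop` in lru terms)...
        if let Some(removed) = cache.lock().remove(&cache_key) {
            // ...but put it back if another caller verified it in the meantime.
            if removed.lock().is_some() {
                cache.lock().insert(cache_key, removed);
            }
        }
    }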