Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add MMR-sync to consensus chain snap sync. #3206

Merged
merged 5 commits into from
Nov 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion crates/subspace-service/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -905,7 +905,7 @@ where

if let Some(offchain_storage) = backend.offchain_storage() {
// Allow both outgoing and incoming requests.
let (handler, protocol_config) = MmrRequestHandler::new::<NetworkWorker<_, _>, _>(
let (handler, protocol_config) = MmrRequestHandler::new::<NetworkWorker<_, _>>(
&protocol_id,
fork_id,
client.clone(),
Expand Down Expand Up @@ -1088,6 +1088,8 @@ where
sync_service.clone(),
network_service_handle,
subspace_link.erasure_coding().clone(),
backend.offchain_storage(),
network_service.clone(),
);

let (observer, worker) = sync_from_dsn::create_observer_and_worker(
Expand Down
6 changes: 6 additions & 0 deletions crates/subspace-service/src/mmr.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,15 @@
use sp_core::H256;
use sp_mmr_primitives::utils::NodesUtils;
use sp_mmr_primitives::{NodeIndex, INDEXING_PREFIX};
use subspace_runtime_primitives::opaque::Header;

pub(crate) mod request_handler;
pub(crate) mod sync;

pub(crate) fn get_offchain_key(index: NodeIndex) -> Vec<u8> {
NodesUtils::node_canon_offchain_key(INDEXING_PREFIX, index)
}

pub(crate) fn get_temp_key(index: NodeIndex, hash: H256) -> Vec<u8> {
NodesUtils::node_temp_offchain_key::<Header>(INDEXING_PREFIX, index, hash)
}
63 changes: 50 additions & 13 deletions crates/subspace-service/src/mmr/request_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,11 @@
// You should have received a copy of the GNU General Public License
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.

use crate::mmr::get_offchain_key;
#[cfg(test)]
mod tests;

use crate::mmr::sync::decode_mmr_data;
use crate::mmr::{get_offchain_key, get_temp_key};
use futures::channel::oneshot;
use futures::stream::StreamExt;
use parity_scale_codec::{Decode, Encode};
Expand All @@ -23,14 +27,17 @@ use sc_network::config::ProtocolId;
use sc_network::request_responses::{IncomingRequest, OutgoingResponse};
use sc_network::{NetworkBackend, PeerId};
use schnellru::{ByLength, LruMap};
use sp_blockchain::HeaderBackend;
use sp_core::offchain::storage::OffchainDb;
use sp_core::offchain::{DbExternalities, OffchainStorage, StorageKind};
use sp_mmr_primitives::utils::NodesUtils;
use sp_runtime::codec;
use sp_runtime::traits::Block as BlockT;
use std::collections::BTreeMap;
use std::marker::PhantomData;
use std::sync::Arc;
use std::time::Duration;
use subspace_core_primitives::BlockNumber;
use tracing::{debug, error, trace};

const MAX_NUMBER_OF_SAME_REQUESTS_PER_PEER: usize = 2;
Expand Down Expand Up @@ -80,6 +87,12 @@ pub fn generate_protocol_name<Hash: AsRef<[u8]>>(
}
}

fn leaf_index_that_added_node(position: BlockNumber) -> BlockNumber {
NodesUtils::leaf_index_that_added_node(position.into())
.try_into()
.expect("Always its into a block number; qed")
}

/// The key of [`BlockRequestHandler::seen_requests`].
#[derive(Eq, PartialEq, Clone, Hash)]
struct SeenRequestsKey {
Expand Down Expand Up @@ -111,7 +124,10 @@ enum SeenRequestsValue {
}

/// Handler for incoming block requests from a remote peer.
pub struct MmrRequestHandler<Block: BlockT, OS> {
pub struct MmrRequestHandler<Block, OS, Client>
where
Block: BlockT,
{
request_receiver: async_channel::Receiver<IncomingRequest>,
/// Maps from request to number of times we have seen this request.
///
Expand All @@ -120,26 +136,28 @@ pub struct MmrRequestHandler<Block: BlockT, OS> {

offchain_db: OffchainDb<OS>,

client: Arc<Client>,

_phantom: PhantomData<Block>,
}

impl<Block, OS> MmrRequestHandler<Block, OS>
impl<Block, OS, Client> MmrRequestHandler<Block, OS, Client>
where
Block: BlockT,

Block: BlockT<Hash = sp_core::H256>,
Client:
HeaderBackend<Block> + BlockBackend<Block> + ProofProvider<Block> + Send + Sync + 'static,
OS: OffchainStorage,
{
/// Create a new [`MmrRequestHandler`].
pub fn new<NB, Client>(
pub fn new<NB>(
protocol_id: &ProtocolId,
fork_id: Option<&str>,
client: Arc<Client>,
num_peer_hint: usize,
offchain_storage: OS,
) -> (Self, NB::RequestResponseProtocolConfig)
where
NB: NetworkBackend<Block, Block::Hash>,
Client: BlockBackend<Block> + ProofProvider<Block> + Send + Sync + 'static,
NB: NetworkBackend<Block, <Block as BlockT>::Hash>,
{
// Reserve enough request slots for one request per peer when we are at the maximum
// number of peers.
Expand All @@ -162,6 +180,7 @@ where

(
Self {
client,
request_receiver,
seen_requests,
offchain_db: OffchainDb::new(offchain_storage),
Expand Down Expand Up @@ -228,17 +247,35 @@ where
Err(())
} else {
let mut mmr_data = BTreeMap::new();
for block_number in
request.starting_position..(request.starting_position + request.limit)
{
let canon_key = get_offchain_key(block_number.into());
for position in request.starting_position..(request.starting_position + request.limit) {
let canon_key = get_offchain_key(position.into());
let storage_value = self
.offchain_db
.local_storage_get(StorageKind::PERSISTENT, &canon_key);

let block_number = leaf_index_that_added_node(position);
trace!(%position, %block_number, "Storage data present: {}", storage_value.is_some());

if let Some(storage_value) = storage_value {
mmr_data.insert(block_number, storage_value);
mmr_data.insert(position, storage_value);
} else {
if let Ok(Some(hash)) = self.client.hash(block_number.into()) {
let temp_key = get_temp_key(position.into(), hash);
let storage_value = self
.offchain_db
.local_storage_get(StorageKind::PERSISTENT, &temp_key);

if let Some(storage_value) = storage_value {
let data = decode_mmr_data(&storage_value);
trace!(%position, %block_number,"MMR node: {data:?}");
mmr_data.insert(position, storage_value);
continue;
} else {
debug!(%position, %block_number, ?hash, "Didn't find value in storage.")
}
} else {
debug!(%position, %block_number, "Didn't find hash.")
}
break; // No more storage values
}
}
Expand Down
7 changes: 7 additions & 0 deletions crates/subspace-service/src/mmr/request_handler/tests.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
use subspace_core_primitives::BlockNumber;

#[test]
fn leaf_index_that_added_node_fits_block_number() {
// Must not panic
super::leaf_index_that_added_node(BlockNumber::MAX);
}
Loading
Loading