Skip to content

Commit

Permalink
Merge pull request #3206 from autonomys/mainnet-release-test-2-mmr-sync
Browse files Browse the repository at this point in the history
Add MMR-sync to consensus chain snap sync.
  • Loading branch information
nazar-pc authored Nov 5, 2024
2 parents a5d526a + f15e6d5 commit 4452df8
Show file tree
Hide file tree
Showing 6 changed files with 313 additions and 85 deletions.
4 changes: 3 additions & 1 deletion crates/subspace-service/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -905,7 +905,7 @@ where

if let Some(offchain_storage) = backend.offchain_storage() {
// Allow both outgoing and incoming requests.
let (handler, protocol_config) = MmrRequestHandler::new::<NetworkWorker<_, _>, _>(
let (handler, protocol_config) = MmrRequestHandler::new::<NetworkWorker<_, _>>(
&protocol_id,
fork_id,
client.clone(),
Expand Down Expand Up @@ -1088,6 +1088,8 @@ where
sync_service.clone(),
network_service_handle,
subspace_link.erasure_coding().clone(),
backend.offchain_storage(),
network_service.clone(),
);

let (observer, worker) = sync_from_dsn::create_observer_and_worker(
Expand Down
6 changes: 6 additions & 0 deletions crates/subspace-service/src/mmr.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,15 @@
use sp_core::H256;
use sp_mmr_primitives::utils::NodesUtils;
use sp_mmr_primitives::{NodeIndex, INDEXING_PREFIX};
use subspace_runtime_primitives::opaque::Header;

pub(crate) mod request_handler;
pub(crate) mod sync;

pub(crate) fn get_offchain_key(index: NodeIndex) -> Vec<u8> {
NodesUtils::node_canon_offchain_key(INDEXING_PREFIX, index)
}

pub(crate) fn get_temp_key(index: NodeIndex, hash: H256) -> Vec<u8> {
NodesUtils::node_temp_offchain_key::<Header>(INDEXING_PREFIX, index, hash)
}
63 changes: 50 additions & 13 deletions crates/subspace-service/src/mmr/request_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,11 @@
// You should have received a copy of the GNU General Public License
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.

use crate::mmr::get_offchain_key;
#[cfg(test)]
mod tests;

use crate::mmr::sync::decode_mmr_data;
use crate::mmr::{get_offchain_key, get_temp_key};
use futures::channel::oneshot;
use futures::stream::StreamExt;
use parity_scale_codec::{Decode, Encode};
Expand All @@ -23,14 +27,17 @@ use sc_network::config::ProtocolId;
use sc_network::request_responses::{IncomingRequest, OutgoingResponse};
use sc_network::{NetworkBackend, PeerId};
use schnellru::{ByLength, LruMap};
use sp_blockchain::HeaderBackend;
use sp_core::offchain::storage::OffchainDb;
use sp_core::offchain::{DbExternalities, OffchainStorage, StorageKind};
use sp_mmr_primitives::utils::NodesUtils;
use sp_runtime::codec;
use sp_runtime::traits::Block as BlockT;
use std::collections::BTreeMap;
use std::marker::PhantomData;
use std::sync::Arc;
use std::time::Duration;
use subspace_core_primitives::BlockNumber;
use tracing::{debug, error, trace};

const MAX_NUMBER_OF_SAME_REQUESTS_PER_PEER: usize = 2;
Expand Down Expand Up @@ -80,6 +87,12 @@ pub fn generate_protocol_name<Hash: AsRef<[u8]>>(
}
}

fn leaf_index_that_added_node(position: BlockNumber) -> BlockNumber {
NodesUtils::leaf_index_that_added_node(position.into())
.try_into()
.expect("Always its into a block number; qed")
}

/// The key of [`BlockRequestHandler::seen_requests`].
#[derive(Eq, PartialEq, Clone, Hash)]
struct SeenRequestsKey {
Expand Down Expand Up @@ -111,7 +124,10 @@ enum SeenRequestsValue {
}

/// Handler for incoming block requests from a remote peer.
pub struct MmrRequestHandler<Block: BlockT, OS> {
pub struct MmrRequestHandler<Block, OS, Client>
where
Block: BlockT,
{
request_receiver: async_channel::Receiver<IncomingRequest>,
/// Maps from request to number of times we have seen this request.
///
Expand All @@ -120,26 +136,28 @@ pub struct MmrRequestHandler<Block: BlockT, OS> {

offchain_db: OffchainDb<OS>,

client: Arc<Client>,

_phantom: PhantomData<Block>,
}

impl<Block, OS> MmrRequestHandler<Block, OS>
impl<Block, OS, Client> MmrRequestHandler<Block, OS, Client>
where
Block: BlockT,

Block: BlockT<Hash = sp_core::H256>,
Client:
HeaderBackend<Block> + BlockBackend<Block> + ProofProvider<Block> + Send + Sync + 'static,
OS: OffchainStorage,
{
/// Create a new [`MmrRequestHandler`].
pub fn new<NB, Client>(
pub fn new<NB>(
protocol_id: &ProtocolId,
fork_id: Option<&str>,
client: Arc<Client>,
num_peer_hint: usize,
offchain_storage: OS,
) -> (Self, NB::RequestResponseProtocolConfig)
where
NB: NetworkBackend<Block, Block::Hash>,
Client: BlockBackend<Block> + ProofProvider<Block> + Send + Sync + 'static,
NB: NetworkBackend<Block, <Block as BlockT>::Hash>,
{
// Reserve enough request slots for one request per peer when we are at the maximum
// number of peers.
Expand All @@ -162,6 +180,7 @@ where

(
Self {
client,
request_receiver,
seen_requests,
offchain_db: OffchainDb::new(offchain_storage),
Expand Down Expand Up @@ -228,17 +247,35 @@ where
Err(())
} else {
let mut mmr_data = BTreeMap::new();
for block_number in
request.starting_position..(request.starting_position + request.limit)
{
let canon_key = get_offchain_key(block_number.into());
for position in request.starting_position..(request.starting_position + request.limit) {
let canon_key = get_offchain_key(position.into());
let storage_value = self
.offchain_db
.local_storage_get(StorageKind::PERSISTENT, &canon_key);

let block_number = leaf_index_that_added_node(position);
trace!(%position, %block_number, "Storage data present: {}", storage_value.is_some());

if let Some(storage_value) = storage_value {
mmr_data.insert(block_number, storage_value);
mmr_data.insert(position, storage_value);
} else {
if let Ok(Some(hash)) = self.client.hash(block_number.into()) {
let temp_key = get_temp_key(position.into(), hash);
let storage_value = self
.offchain_db
.local_storage_get(StorageKind::PERSISTENT, &temp_key);

if let Some(storage_value) = storage_value {
let data = decode_mmr_data(&storage_value);
trace!(%position, %block_number,"MMR node: {data:?}");
mmr_data.insert(position, storage_value);
continue;
} else {
debug!(%position, %block_number, ?hash, "Didn't find value in storage.")
}
} else {
debug!(%position, %block_number, "Didn't find hash.")
}
break; // No more storage values
}
}
Expand Down
7 changes: 7 additions & 0 deletions crates/subspace-service/src/mmr/request_handler/tests.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
use subspace_core_primitives::BlockNumber;

#[test]
fn leaf_index_that_added_node_fits_block_number() {
// Must not panic
super::leaf_index_that_added_node(BlockNumber::MAX);
}
Loading

0 comments on commit 4452df8

Please sign in to comment.