diff --git a/crates/subspace-service/src/lib.rs b/crates/subspace-service/src/lib.rs index 009a82ccfb..9bc25aa326 100644 --- a/crates/subspace-service/src/lib.rs +++ b/crates/subspace-service/src/lib.rs @@ -905,7 +905,7 @@ where if let Some(offchain_storage) = backend.offchain_storage() { // Allow both outgoing and incoming requests. - let (handler, protocol_config) = MmrRequestHandler::new::, _>( + let (handler, protocol_config) = MmrRequestHandler::new::>( &protocol_id, fork_id, client.clone(), @@ -1088,6 +1088,8 @@ where sync_service.clone(), network_service_handle, subspace_link.erasure_coding().clone(), + backend.offchain_storage(), + network_service.clone(), ); let (observer, worker) = sync_from_dsn::create_observer_and_worker( diff --git a/crates/subspace-service/src/mmr.rs b/crates/subspace-service/src/mmr.rs index 590f5e9d1f..42c85780d1 100644 --- a/crates/subspace-service/src/mmr.rs +++ b/crates/subspace-service/src/mmr.rs @@ -1,5 +1,7 @@ +use sp_core::H256; use sp_mmr_primitives::utils::NodesUtils; use sp_mmr_primitives::{NodeIndex, INDEXING_PREFIX}; +use subspace_runtime_primitives::opaque::Header; pub(crate) mod request_handler; pub(crate) mod sync; @@ -7,3 +9,7 @@ pub(crate) mod sync; pub(crate) fn get_offchain_key(index: NodeIndex) -> Vec { NodesUtils::node_canon_offchain_key(INDEXING_PREFIX, index) } + +pub(crate) fn get_temp_key(index: NodeIndex, hash: H256) -> Vec { + NodesUtils::node_temp_offchain_key::
(INDEXING_PREFIX, index, hash) +} diff --git a/crates/subspace-service/src/mmr/request_handler.rs b/crates/subspace-service/src/mmr/request_handler.rs index dcb5c76e5b..78c8977c15 100644 --- a/crates/subspace-service/src/mmr/request_handler.rs +++ b/crates/subspace-service/src/mmr/request_handler.rs @@ -14,7 +14,11 @@ // You should have received a copy of the GNU General Public License // along with Substrate. If not, see . -use crate::mmr::get_offchain_key; +#[cfg(test)] +mod tests; + +use crate::mmr::sync::decode_mmr_data; +use crate::mmr::{get_offchain_key, get_temp_key}; use futures::channel::oneshot; use futures::stream::StreamExt; use parity_scale_codec::{Decode, Encode}; @@ -23,14 +27,17 @@ use sc_network::config::ProtocolId; use sc_network::request_responses::{IncomingRequest, OutgoingResponse}; use sc_network::{NetworkBackend, PeerId}; use schnellru::{ByLength, LruMap}; +use sp_blockchain::HeaderBackend; use sp_core::offchain::storage::OffchainDb; use sp_core::offchain::{DbExternalities, OffchainStorage, StorageKind}; +use sp_mmr_primitives::utils::NodesUtils; use sp_runtime::codec; use sp_runtime::traits::Block as BlockT; use std::collections::BTreeMap; use std::marker::PhantomData; use std::sync::Arc; use std::time::Duration; +use subspace_core_primitives::BlockNumber; use tracing::{debug, error, trace}; const MAX_NUMBER_OF_SAME_REQUESTS_PER_PEER: usize = 2; @@ -80,6 +87,12 @@ pub fn generate_protocol_name>( } } +fn leaf_index_that_added_node(position: BlockNumber) -> BlockNumber { + NodesUtils::leaf_index_that_added_node(position.into()) + .try_into() + .expect("Always its into a block number; qed") +} + /// The key of [`BlockRequestHandler::seen_requests`]. #[derive(Eq, PartialEq, Clone, Hash)] struct SeenRequestsKey { @@ -111,7 +124,10 @@ enum SeenRequestsValue { } /// Handler for incoming block requests from a remote peer. -pub struct MmrRequestHandler { +pub struct MmrRequestHandler +where + Block: BlockT, +{ request_receiver: async_channel::Receiver, /// Maps from request to number of times we have seen this request. /// @@ -120,17 +136,20 @@ pub struct MmrRequestHandler { offchain_db: OffchainDb, + client: Arc, + _phantom: PhantomData, } -impl MmrRequestHandler +impl MmrRequestHandler where - Block: BlockT, - + Block: BlockT, + Client: + HeaderBackend + BlockBackend + ProofProvider + Send + Sync + 'static, OS: OffchainStorage, { /// Create a new [`MmrRequestHandler`]. - pub fn new( + pub fn new( protocol_id: &ProtocolId, fork_id: Option<&str>, client: Arc, @@ -138,8 +157,7 @@ where offchain_storage: OS, ) -> (Self, NB::RequestResponseProtocolConfig) where - NB: NetworkBackend, - Client: BlockBackend + ProofProvider + Send + Sync + 'static, + NB: NetworkBackend::Hash>, { // Reserve enough request slots for one request per peer when we are at the maximum // number of peers. @@ -162,6 +180,7 @@ where ( Self { + client, request_receiver, seen_requests, offchain_db: OffchainDb::new(offchain_storage), @@ -228,17 +247,35 @@ where Err(()) } else { let mut mmr_data = BTreeMap::new(); - for block_number in - request.starting_position..(request.starting_position + request.limit) - { - let canon_key = get_offchain_key(block_number.into()); + for position in request.starting_position..(request.starting_position + request.limit) { + let canon_key = get_offchain_key(position.into()); let storage_value = self .offchain_db .local_storage_get(StorageKind::PERSISTENT, &canon_key); + let block_number = leaf_index_that_added_node(position); + trace!(%position, %block_number, "Storage data present: {}", storage_value.is_some()); + if let Some(storage_value) = storage_value { - mmr_data.insert(block_number, storage_value); + mmr_data.insert(position, storage_value); } else { + if let Ok(Some(hash)) = self.client.hash(block_number.into()) { + let temp_key = get_temp_key(position.into(), hash); + let storage_value = self + .offchain_db + .local_storage_get(StorageKind::PERSISTENT, &temp_key); + + if let Some(storage_value) = storage_value { + let data = decode_mmr_data(&storage_value); + trace!(%position, %block_number,"MMR node: {data:?}"); + mmr_data.insert(position, storage_value); + continue; + } else { + debug!(%position, %block_number, ?hash, "Didn't find value in storage.") + } + } else { + debug!(%position, %block_number, "Didn't find hash.") + } break; // No more storage values } } diff --git a/crates/subspace-service/src/mmr/request_handler/tests.rs b/crates/subspace-service/src/mmr/request_handler/tests.rs new file mode 100644 index 0000000000..94fe09e4b8 --- /dev/null +++ b/crates/subspace-service/src/mmr/request_handler/tests.rs @@ -0,0 +1,7 @@ +use subspace_core_primitives::BlockNumber; + +#[test] +fn leaf_index_that_added_node_fits_block_number() { + // Must not panic + super::leaf_index_that_added_node(BlockNumber::MAX); +} diff --git a/crates/subspace-service/src/mmr/sync.rs b/crates/subspace-service/src/mmr/sync.rs index 8e8f14fb46..be86d23008 100644 --- a/crates/subspace-service/src/mmr/sync.rs +++ b/crates/subspace-service/src/mmr/sync.rs @@ -1,24 +1,116 @@ -#![allow(dead_code)] // TODO: enable after the domain-sync implementation - use crate::mmr::get_offchain_key; use crate::mmr::request_handler::{generate_protocol_name, MmrRequest, MmrResponse, MAX_MMR_ITEMS}; use futures::channel::oneshot; use parity_scale_codec::{Decode, Encode}; use sc_network::{IfDisconnected, NetworkRequest, PeerId, RequestFailure}; use sc_network_sync::SyncingService; +use sp_api::ProvideRuntimeApi; use sp_blockchain::HeaderBackend; use sp_core::offchain::storage::OffchainDb; use sp_core::offchain::{DbExternalities, OffchainStorage, StorageKind}; +use sp_core::{Hasher, H256}; +use sp_mmr_primitives::mmr_lib::{MMRStoreReadOps, MMRStoreWriteOps}; use sp_mmr_primitives::utils::NodesUtils; -use sp_runtime::traits::Block as BlockT; +use sp_mmr_primitives::{mmr_lib, DataOrHash, MmrApi}; +use sp_runtime::traits::{Block as BlockT, Keccak256, NumberFor}; +use sp_subspace_mmr::MmrLeaf; +use std::cell::RefCell; use std::sync::Arc; use std::time::Duration; -use subspace_core_primitives::BlockNumber; +use subspace_core_primitives::{BlockHash, BlockNumber}; use tokio::time::sleep; use tracing::{debug, error, trace}; +type Node = DataOrHash; +type MmrLeafOf = MmrLeaf; +type NodeOf = Node; +type MmrOf = mmr_lib::MMR>; + +pub(crate) fn decode_mmr_data(mut data: &[u8]) -> mmr_lib::Result { + let node = match NodeOf::decode(&mut data) { + Ok(node) => node, + Err(err) => { + error!(?err, "Can't decode MMR data"); + + return Err(mmr_lib::Error::StoreError( + "Can't decode MMR data".to_string(), + )); + } + }; + + Ok(node) +} + +struct OffchainMmrStorage { + offchain_db: RefCell>, +} + +impl OffchainMmrStorage { + fn new(offchain_storage: OS) -> Self { + let offchain_db = OffchainDb::new(offchain_storage); + + Self { + offchain_db: RefCell::new(offchain_db), + } + } +} + +impl MMRStoreReadOps for OffchainMmrStorage { + fn get_elem(&self, pos: u64) -> mmr_lib::Result> { + let canon_key = get_offchain_key(pos); + let Some(data) = self + .offchain_db + .borrow_mut() + .local_storage_get(StorageKind::PERSISTENT, &canon_key) + else { + error!(%pos, "Can't get MMR data."); + + return Ok(None); + }; + + let node = decode_mmr_data(data.as_slice()); + + node.map(Some) + } +} + +impl MMRStoreWriteOps for OffchainMmrStorage { + fn append(&mut self, pos: u64, elems: Vec) -> mmr_lib::Result<()> { + let mut current_pos = pos; + for elem in elems { + let data = elem.encode(); + + let canon_key = get_offchain_key(current_pos); + self.offchain_db.borrow_mut().local_storage_set( + StorageKind::PERSISTENT, + &canon_key, + &data, + ); + + current_pos += 1; + } + + Ok(()) + } +} + +/// Default Merging & Hashing behavior for MMR. +pub struct MmrHasher; + +impl mmr_lib::Merge for MmrHasher { + type Item = NodeOf; + + fn merge(left: &Self::Item, right: &Self::Item) -> mmr_lib::Result { + let mut concat = left.hash().as_ref().to_vec(); + concat.extend_from_slice(right.hash().as_ref()); + + Ok(Node::Hash(Keccak256::hash(&concat))) + } +} + const SYNC_PAUSE: Duration = Duration::from_secs(5); +// TODO: Add support for MMR-sync reruns from non-zero starting point. /// Synchronize MMR-leafs from remote offchain storage of the synced peer. pub async fn mmr_sync( fork_id: Option, @@ -26,48 +118,22 @@ pub async fn mmr_sync( network_service: NR, sync_service: Arc>, offchain_storage: OS, - target_block: Option, + target_block: BlockNumber, ) -> Result<(), sp_blockchain::Error> where Block: BlockT, NR: NetworkRequest, - Client: HeaderBackend, + Client: ProvideRuntimeApi + HeaderBackend, + Client::Api: MmrApi>, OS: OffchainStorage, { debug!("MMR sync started."); let info = client.info(); let protocol_name = generate_protocol_name(info.genesis_hash, fork_id.as_deref()); - let mut offchain_db = OffchainDb::new(offchain_storage); - - // Look for existing local MMR-nodes - let mut starting_position = { - let mut starting_position: Option = None; - for position in 0..=u32::MAX { - let canon_key = get_offchain_key(position.into()); - if offchain_db - .local_storage_get(StorageKind::PERSISTENT, &canon_key) - .is_none() - { - starting_position = Some(position); - break; - } - } - - match starting_position { - None => { - error!("Can't get starting MMR position - MMR storage is corrupted."); - return Err(sp_blockchain::Error::Application( - "Can't get starting MMR position - MMR storage is corrupted.".into(), - )); - } - Some(last_processed_position) => { - debug!("MMR-sync last processed position: {last_processed_position}"); - - last_processed_position - } - } - }; + let mut mmr = MmrOf::new(0, OffchainMmrStorage::new(offchain_storage)); + let mut leaves_number = 0u32; + let mut starting_position = 0; 'outer: loop { let peers_info = match sync_service.peers_info().await { @@ -93,25 +159,13 @@ where // Request MMR until target block reached. loop { let target_position = { - let best_block = if let Some(target_block) = target_block { - target_block - } else { - let best_block: u32 = peer_info.best_number.try_into().map_err(|_| { - sp_blockchain::Error::Application( - "Can't convert best block from peer info.".into(), - ) - })?; - - best_block - }; - - let nodes = NodesUtils::new(best_block.into()); + let nodes = NodesUtils::new(target_block.into()); let target_position = nodes.size().saturating_sub(1); debug!( - "MMR-sync. Best block={}, Node target position={}", - best_block, target_position + "MMR-sync. Target block={}, Node target position={}", + target_block, target_position ); target_position @@ -137,18 +191,46 @@ where // Save the MMR-nodes from response to the local storage 'data: for (position, data) in response.mmr_data.iter() { // Ensure continuous sync - if *position == starting_position { - let canon_key = get_offchain_key((*position).into()); - offchain_db.local_storage_set( - StorageKind::PERSISTENT, - &canon_key, - data, + if *position != starting_position { + debug!( + ?peer_info, + %starting_position, + %position, + "MMR sync error: incorrect starting position." ); - starting_position += 1; - } else { - debug!("MMR-sync gap detected={peer_id}, position={position}",); - break 'data; // We don't support gaps in MMR data + continue 'peers; + } + + let node = decode_mmr_data(data); + + let node = match node { + Ok(node) => node, + Err(err) => { + debug!(?peer_info, ?err, %position, "Can't decode MMR data received from the peer."); + + continue 'peers; + } + }; + + if matches!(node, Node::Data(_)) { + if let Err(err) = mmr.push(node) { + debug!(?peer_info, ?err, %position, "Can't add MMR data received from the peer."); + + return Err(sp_blockchain::Error::Backend( + "Can't add MMR data to the MMR storage".to_string(), + )); + } + + leaves_number += 1; + } + + starting_position += 1; + + // Did we collect all the necessary data from the last response? + if u64::from(*position) >= target_position { + debug!(%target_position, "MMR-sync: target position reached."); + break 'data; } } } @@ -159,10 +241,26 @@ where } } - // Actual MMR-nodes may exceed this number, however, we will catch up with the rest - // when we sync the remaining data (consensus and domain chains). + // Should we request a new portion of the data from the last peer? if target_position <= starting_position.into() { + if let Err(err) = mmr.commit() { + error!(?err, "MMR commit failed."); + + return Err(sp_blockchain::Error::Application( + "Failed to commit MMR data.".into(), + )); + } + + // Actual MMR-nodes may exceed this number, however, we will catch up with the rest + // when we sync the remaining data (consensus and domain chains). debug!("Target position reached: {target_position}"); + + if !verify_mmr_data(client, &mmr, leaves_number) { + return Err(sp_blockchain::Error::Application( + "MMR data verification failed.".into(), + )); + } + break 'outer; } } @@ -176,6 +274,50 @@ where Ok(()) } +fn verify_mmr_data( + client: Arc, + mmr: &MmrOf, + leaves_number: u32, +) -> bool +where + Block: BlockT, + OS: OffchainStorage, + Client: HeaderBackend + ProvideRuntimeApi, + Client::Api: MmrApi>, +{ + debug!("Verifying MMR data..."); + + let block_number = leaves_number; + let Ok(Some(hash)) = client.hash(block_number.into()) else { + error!(%leaves_number, %block_number, "MMR data verification: error during hash acquisition"); + return false; + }; + + let mmr_root = mmr.get_root(); + trace!("MMR root: {:?}", mmr_root); + let api_root = client.runtime_api().mmr_root(hash); + trace!("API root: {:?}", api_root); + + let Ok(Node::Hash(mmr_root_hash)) = mmr_root.clone() else { + error!(%leaves_number, %block_number, ?mmr_root, "Can't get MMR root from local storage."); + return false; + }; + + let Ok(Ok(api_root_hash)) = api_root else { + error!(%leaves_number, %block_number, ?mmr_root, "Can't get MMR root from API."); + return false; + }; + + if api_root_hash != mmr_root_hash { + error!(?api_root_hash, ?mmr_root_hash, "MMR data hashes differ."); + return false; + } + + debug!("MMR data verified"); + + true +} + /// MMR-sync error #[derive(Debug, thiserror::Error)] pub enum MmrResponseError { diff --git a/crates/subspace-service/src/sync_from_dsn/snap_sync.rs b/crates/subspace-service/src/sync_from_dsn/snap_sync.rs index 4ae68c9678..bdc8bdc95a 100644 --- a/crates/subspace-service/src/sync_from_dsn/snap_sync.rs +++ b/crates/subspace-service/src/sync_from_dsn/snap_sync.rs @@ -1,3 +1,4 @@ +use crate::mmr::sync::mmr_sync; use crate::sync_from_dsn::import_blocks::download_and_reconstruct_blocks; use crate::sync_from_dsn::segment_header_downloader::SegmentHeaderDownloader; use crate::sync_from_dsn::snap_sync_engine::SnapSyncingEngine; @@ -9,13 +10,17 @@ use sc_consensus::{ StorageChanges, }; use sc_consensus_subspace::archiver::{decode_block, SegmentHeadersStore}; -use sc_network::{NetworkBlock, PeerId}; +use sc_network::service::traits::NetworkService; +use sc_network::{NetworkBlock, NetworkRequest, PeerId}; use sc_network_sync::service::network::NetworkServiceHandle; use sc_network_sync::SyncingService; use sp_api::ProvideRuntimeApi; use sp_blockchain::HeaderBackend; use sp_consensus::BlockOrigin; use sp_consensus_subspace::SubspaceApi; +use sp_core::offchain::OffchainStorage; +use sp_core::H256; +use sp_mmr_primitives::MmrApi; use sp_objects::ObjectsApi; use sp_runtime::traits::{Block as BlockT, Header, NumberFor}; use std::collections::{HashSet, VecDeque}; @@ -60,7 +65,7 @@ impl From for Error { /// Run a snap sync, return an error if snap sync is impossible and user intervention is required. /// Otherwise, just log the error and return `Ok(())` so that regular sync continues. #[allow(clippy::too_many_arguments)] -pub(crate) async fn snap_sync( +pub(crate) async fn snap_sync( segment_headers_store: SegmentHeadersStore, node: Node, fork_id: Option, @@ -71,6 +76,8 @@ pub(crate) async fn snap_sync( sync_service: Arc>, network_service_handle: NetworkServiceHandle, erasure_coding: ErasureCoding, + offchain_storage: Option, + network_service: Arc, ) -> Result<(), Error> where Block: BlockT, @@ -82,8 +89,10 @@ where + Send + Sync + 'static, - Client::Api: SubspaceApi + ObjectsApi, + Client::Api: + SubspaceApi + ObjectsApi + MmrApi>, PG: DsnSyncPieceGetter, + OS: OffchainStorage, { let info = client.info(); // Only attempt snap sync with genesis state @@ -99,10 +108,12 @@ where fork_id.as_deref(), &client, import_queue_service.as_mut(), - &sync_service, + sync_service.clone(), &network_service_handle, None, &erasure_coding, + offchain_storage, + network_service, ) .await?; @@ -265,17 +276,19 @@ where #[allow(clippy::too_many_arguments)] /// Synchronize the blockchain to the target_block (approximate value based on the containing /// segment) or to the last archived block. -async fn sync( +async fn sync( segment_headers_store: &SegmentHeadersStore, node: &Node, piece_getter: &PG, fork_id: Option<&str>, client: &Arc, import_queue_service: &mut IQS, - sync_service: &SyncingService, + sync_service: Arc>, network_service_handle: &NetworkServiceHandle, target_block: Option, erasure_coding: &ErasureCoding, + offchain_storage: Option, + network_request: NR, ) -> Result<(), Error> where PG: DsnSyncPieceGetter, @@ -288,8 +301,11 @@ where + Send + Sync + 'static, - Client::Api: SubspaceApi + ObjectsApi, + Client::Api: + SubspaceApi + ObjectsApi + MmrApi>, IQS: ImportQueueService + ?Sized, + OS: OffchainStorage, + NR: NetworkRequest + Sync + Send, { debug!("Starting snap sync..."); @@ -344,7 +360,7 @@ where &header, client, fork_id, - sync_service, + &sync_service, network_service_handle, ) .await @@ -400,6 +416,24 @@ where // TODO: Replace this hack with actual watching of block import wait_for_block_import(client.as_ref(), last_block_number.into()).await; + let mmr_target_block = if let Some(target_block) = target_block { + target_block + } else { + last_block_number + }; + + if let Some(offchain_storage) = offchain_storage { + mmr_sync( + fork_id.map(|v| v.into()), + client.clone(), + network_request, + sync_service.clone(), + offchain_storage, + mmr_target_block, + ) + .await?; + } + debug!(info = ?client.info(), "Snap sync finished successfully"); Ok(())