diff --git a/crates/sc-consensus-subspace/src/archiver.rs b/crates/sc-consensus-subspace/src/archiver.rs
index 5d819cc6c9..ed9f7594bf 100644
--- a/crates/sc-consensus-subspace/src/archiver.rs
+++ b/crates/sc-consensus-subspace/src/archiver.rs
@@ -49,7 +49,6 @@
 #[cfg(test)]
 mod tests;
 
-use crate::block_import::BlockImportingNotification;
 use crate::slot_worker::SubspaceSyncOracle;
 use crate::{SubspaceLink, SubspaceNotificationSender};
 use codec::{Decode, Encode};
@@ -68,8 +67,8 @@ use sp_consensus::SyncOracle;
 use sp_consensus_subspace::{FarmerPublicKey, SubspaceApi, SubspaceJustification};
 use sp_objects::ObjectsApi;
 use sp_runtime::generic::SignedBlock;
-use sp_runtime::traits::{Block as BlockT, CheckedSub, Header, NumberFor, Zero};
-use sp_runtime::{Justifications, Saturating};
+use sp_runtime::traits::{Block as BlockT, CheckedSub, Header, NumberFor, One, Zero};
+use sp_runtime::Justifications;
 use std::error::Error;
 use std::future::Future;
 use std::slice;
@@ -529,7 +528,10 @@ where
     AS: AuxStore,
 {
     let client_info = client.info();
-    let best_block_number = client_info.best_number;
+    let best_block_number = TryInto::<BlockNumber>::try_into(client_info.best_number)
+        .unwrap_or_else(|_| {
+            unreachable!("Block number always fits into BlockNumber; qed");
+        });
     let best_block_hash = client_info.best_hash;
 
     let confirmation_depth_k = client
@@ -537,11 +539,21 @@ where
         .chain_constants(best_block_hash)?
         .confirmation_depth_k();
 
-    let maybe_last_archived_block = find_last_archived_block(
-        client,
-        segment_headers_store,
-        best_block_number.saturating_sub(confirmation_depth_k.into()),
-    )?;
+    let mut best_block_to_archive = best_block_number.saturating_sub(confirmation_depth_k);
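+    // Scan for gaps below the archiving depth: a missing block hash means blockchain history
+    // is not contiguous at this height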
+    if (best_block_to_archive..best_block_number)
+        .any(|block_number| client.hash(block_number.into()).ok().flatten().is_none())
+    {
+        // If there are missing blocks between the best block to archive and the best block of
+        // the blockchain, it means a newer block was inserted in some special way and is by
+        // definition valid, so we can simply use it as the best block to archive instead
+        best_block_to_archive = best_block_number;
+    }
+
+    let maybe_last_archived_block =
+        find_last_archived_block(client, segment_headers_store, best_block_to_archive.into())?;
+
     let have_last_segment_header = maybe_last_archived_block.is_some();
     let mut best_archived_block = None;
 
@@ -582,29 +594,23 @@ where
 
     let mut older_archived_segments = Vec::new();
 
-    // Process blocks since last fully archived block (or genesis) up to the current head minus K
+    // Process blocks since last fully archived block up to the current head minus K
     {
         let blocks_to_archive_from = archiver
             .last_archived_block_number()
             .map(|n| n + 1)
             .unwrap_or_default();
-        let blocks_to_archive_to =
-            TryInto::<BlockNumber>::try_into(best_block_number)
-                .unwrap_or_else(|_| {
-                    panic!(
-                        "Best block number {best_block_number} can't be converted into BlockNumber",
-                    );
-                })
-                .checked_sub(confirmation_depth_k)
-                .filter(|&blocks_to_archive_to| blocks_to_archive_to >= blocks_to_archive_from)
-                .or({
-                    if have_last_segment_header {
-                        None
-                    } else {
-                        // If not continuation, archive genesis block
-                        Some(0)
-                    }
-                });
+        let blocks_to_archive_to = best_block_number
+            .checked_sub(confirmation_depth_k)
+            .filter(|&blocks_to_archive_to| blocks_to_archive_to >= blocks_to_archive_from)
+            .or({
+                if have_last_segment_header {
+                    None
+                } else {
+                    // If not continuation, archive genesis block
+                    Some(0)
+                }
+            });
 
         if let Some(blocks_to_archive_to) = blocks_to_archive_to {
             info!(
@@ -808,8 +814,9 @@ where
         confirmation_depth_k,
         mut archiver,
         older_archived_segments,
-        best_archived_block: (mut best_archived_block_hash, mut best_archived_block_number),
+        best_archived_block,
     } = archiver;
+    let (mut best_archived_block_hash, mut best_archived_block_number) = best_archived_block;
 
     let archived_segment_notification_sender =
         subspace_link.archived_segment_notification_sender.clone();
@@ -823,133 +830,204 @@ where
                 .await;
         }
 
-        while let Some(BlockImportingNotification {
-            block_number,
-            // Just to be very explicit that block import shouldn't continue until archiving
-            // is over
-            acknowledgement_sender: _acknowledgement_sender,
-            ..
-        }) = block_importing_notification_stream.next().await
+        while let Some(ref block_import_notification) =
+            block_importing_notification_stream.next().await
         {
-            let block_number_to_archive =
-                match block_number.checked_sub(&confirmation_depth_k.into()) {
-                    Some(block_number_to_archive) => block_number_to_archive,
-                    None => {
-                        continue;
-                    }
-                };
+            let block_number_to_archive = match block_import_notification
+                .block_number
+                .checked_sub(&confirmation_depth_k.into())
+            {
+                Some(block_number_to_archive) => block_number_to_archive,
+                None => {
+                    // Too early to archive blocks
+                    continue;
+                }
+            };
 
             if best_archived_block_number >= block_number_to_archive {
                 // This block was already archived, skip
                 continue;
             }
 
-            best_archived_block_number = block_number_to_archive;
-
-            let block = client
-                .block(
-                    client
-                        .hash(block_number_to_archive)?
-                        .expect("Older block by number must always exist"),
-                )?
-                .expect("Older block by number must always exist");
-
-            let parent_block_hash = *block.block.header().parent_hash();
-            let block_hash_to_archive = block.block.hash();
-
-            debug!(
-                "Archiving block {:?} ({})",
-                block_number_to_archive, block_hash_to_archive
-            );
-
-            if parent_block_hash != best_archived_block_hash {
-                let error = format!(
-                    "Attempt to switch to a different fork beyond archiving depth, \
-                    can't do it: parent block hash {}, best archived block hash {}",
-                    parent_block_hash, best_archived_block_hash
-                );
-                return Err(sp_blockchain::Error::Consensus(sp_consensus::Error::Other(
-                    error.into(),
-                )));
-            }
+            // In case there was a block gap, re-initialize the archiver and continue with the
+            // current block number (rather than the block number at some depth) to allow for
+            // special sync modes where pre-verified blocks are inserted at some point in the
+            // future compared to previously existing blocks
+            if best_archived_block_number + One::one() != block_number_to_archive {
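+                // Destructuring assignment: overwrites the existing `archiver` and the best
+                // archived block in place with freshly re-initialized values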
+                InitializedArchiver {
+                    confirmation_depth_k: _,
+                    archiver,
+                    older_archived_segments: _,
+                    best_archived_block: (best_archived_block_hash, best_archived_block_number),
+                } = initialize_archiver(&segment_headers_store, &subspace_link, client.as_ref())?;
+
+                if best_archived_block_number + One::one() == block_number_to_archive {
+                    // As expected, can continue now
+                } else if best_archived_block_number >= block_number_to_archive {
+                    // Special sync mode where verified blocks were inserted into the blockchain
+                    // directly, archiving of this block will naturally happen later
+                    continue;
+                } else {
+                    let error = format!(
+                        "There was a gap in blockchain history and the last contiguous series of \
+                        blocks doesn't start with an archived segment (best archived block number \
+                        {best_archived_block_number}, block number to archive \
+                        {block_number_to_archive}, block about to be imported {}), archiver can't \
+                        continue",
+                        block_import_notification.block_number
+                    );
+                    return Err(sp_blockchain::Error::Consensus(sp_consensus::Error::Other(
+                        error.into(),
+                    )));
+                }
+            }
 
-            best_archived_block_hash = block_hash_to_archive;
-
-            let block_object_mappings = client
-                .runtime_api()
-                .validated_object_call_hashes(block_hash_to_archive)
-                .and_then(|calls| {
-                    client.runtime_api().extract_block_object_mapping(
-                        parent_block_hash,
-                        block.block.clone(),
-                        calls,
-                    )
-                })
-                .map_err(|error| {
-                    sp_blockchain::Error::Application(
-                        format!("Failed to retrieve block object mappings: {error}").into(),
-                    )
-                })?;
-
-            let encoded_block = encode_block(block);
-            debug!(
-                "Encoded block {} has size of {:.2} kiB",
+            (best_archived_block_hash, best_archived_block_number) = archive_block(
+                &mut archiver,
+                segment_headers_store.clone(),
+                client.clone(),
+                &sync_oracle,
+                telemetry.clone(),
+                archived_segment_notification_sender.clone(),
+                best_archived_block_hash,
                 block_number_to_archive,
-                encoded_block.len() as f32 / 1024.0
-            );
+            )
+            .await?;
+        }
 
-            let mut new_segment_headers = Vec::new();
-            for archived_segment in archiver.add_block(
-                encoded_block,
-                block_object_mappings,
-                !sync_oracle.is_major_syncing(),
-            ) {
-                let segment_header = archived_segment.segment_header;
+        Ok(())
+    })
+}
 
-                segment_headers_store.add_segment_headers(slice::from_ref(&segment_header))?;
+/// Tries to archive `block_number` and returns the new (or old, if unchanged) best archived block
+#[allow(clippy::too_many_arguments)]
+async fn archive_block<Block, Backend, Client, AS, SO>(
+    archiver: &mut Archiver,
+    segment_headers_store: SegmentHeadersStore<AS>,
+    client: Arc<Client>,
+    sync_oracle: &SubspaceSyncOracle<SO>,
+    telemetry: Option<TelemetryHandle>,
+    archived_segment_notification_sender: SubspaceNotificationSender<ArchivedSegmentNotification>,
+    best_archived_block_hash: Block::Hash,
+    block_number_to_archive: NumberFor<Block>,
+) -> sp_blockchain::Result<(Block::Hash, NumberFor<Block>)>
+where
+    Block: BlockT,
+    Backend: BackendT<Block>,
+    Client: ProvideRuntimeApi<Block>
+        + BlockBackend<Block>
+        + HeaderBackend<Block>
+        + LockImportRun<Block, Backend>
+        + Finalizer<Block, Backend>
+        + AuxStore
+        + Send
+        + Sync
+        + 'static,
+    Client::Api: SubspaceApi<Block, FarmerPublicKey> + ObjectsApi<Block>,
+    AS: AuxStore + Send + Sync + 'static,
+    SO: SyncOracle + Send + Sync + 'static,
+{
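+    // The block being archived is `confirmation_depth_k` below the block currently being
+    // imported, so it must already exist in the client's database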
+ .expect("Older block by number must always exist"); + + let parent_block_hash = *block.block.header().parent_hash(); + let block_hash_to_archive = block.block.hash(); + + debug!( + "Archiving block {:?} ({})", + block_number_to_archive, block_hash_to_archive + ); + + if parent_block_hash != best_archived_block_hash { + let error = format!( + "Attempt to switch to a different fork beyond archiving depth, \ + can't do it: parent block hash {}, best archived block hash {}", + parent_block_hash, best_archived_block_hash + ); + return Err(sp_blockchain::Error::Consensus(sp_consensus::Error::Other( + error.into(), + ))); + } - send_archived_segment_notification( - &archived_segment_notification_sender, - archived_segment, - ) - .await; + let block_object_mappings = client + .runtime_api() + .validated_object_call_hashes(block_hash_to_archive) + .and_then(|calls| { + client.runtime_api().extract_block_object_mapping( + parent_block_hash, + block.block.clone(), + calls, + ) + }) + .map_err(|error| { + sp_blockchain::Error::Application( + format!("Failed to retrieve block object mappings: {error}").into(), + ) + })?; + + let encoded_block = encode_block(block); + debug!( + "Encoded block {} has size of {:.2} kiB", + block_number_to_archive, + encoded_block.len() as f32 / 1024.0 + ); + + let mut new_segment_headers = Vec::new(); + for archived_segment in archiver.add_block( + encoded_block, + block_object_mappings, + !sync_oracle.is_major_syncing(), + ) { + let segment_header = archived_segment.segment_header; + + segment_headers_store.add_segment_headers(slice::from_ref(&segment_header))?; + + send_archived_segment_notification(&archived_segment_notification_sender, archived_segment) + .await; - new_segment_headers.push(segment_header); - } + new_segment_headers.push(segment_header); + } - if !new_segment_headers.is_empty() { - let maybe_block_number_to_finalize = segment_headers_store - .max_segment_index() - // Skip last `FINALIZATION_DEPTH_IN_SEGMENTS` archived segments - .and_then(|max_segment_index| { - max_segment_index.checked_sub(FINALIZATION_DEPTH_IN_SEGMENTS) - }) - .and_then(|segment_index| { - segment_headers_store.get_segment_header(segment_index) - }) - .map(|segment_header| segment_header.last_archived_block().number) - // Make sure not to finalize block number that does not yet exist (segment - // headers store may contain future blocks during initial sync) - .map(|block_number| best_archived_block_number.min(block_number.into())) - // Do not finalize blocks twice - .filter(|block_number| *block_number > client.info().finalized_number); - - if let Some(block_number_to_finalize) = maybe_block_number_to_finalize { - let block_hash_to_finalize = client - .hash(block_number_to_finalize)? 
- .expect("Block about to be finalized must always exist"); - finalize_block( - client.as_ref(), - telemetry.clone(), - block_hash_to_finalize, - block_number_to_finalize, - ); - } - } + if !new_segment_headers.is_empty() { + let maybe_block_number_to_finalize = segment_headers_store + .max_segment_index() + // Skip last `FINALIZATION_DEPTH_IN_SEGMENTS` archived segments + .and_then(|max_segment_index| { + max_segment_index.checked_sub(FINALIZATION_DEPTH_IN_SEGMENTS) + }) + .and_then(|segment_index| segment_headers_store.get_segment_header(segment_index)) + .map(|segment_header| segment_header.last_archived_block().number) + // Make sure not to finalize block number that does not yet exist (segment + // headers store may contain future blocks during initial sync) + .map(|block_number| block_number_to_archive.min(block_number.into())) + // Do not finalize blocks twice + .filter(|block_number| *block_number > client.info().finalized_number); + + if let Some(block_number_to_finalize) = maybe_block_number_to_finalize { + let block_hash_to_finalize = client + .hash(block_number_to_finalize)? + .expect("Block about to be finalized must always exist"); + finalize_block( + client.as_ref(), + telemetry.clone(), + block_hash_to_finalize, + block_number_to_finalize, + ); } + } - Ok(()) - }) + Ok((block_hash_to_archive, block_number_to_archive)) } async fn send_archived_segment_notification(