Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
Longarithm committed Jan 24, 2025
1 parent 1c4c876 commit 05f86fb
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 35 deletions.
39 changes: 19 additions & 20 deletions chain/chain/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1588,11 +1588,11 @@ impl Chain {
block_timestamp: block_context.block_timestamp,
random_seed: block_context.random_seed,
};
let chunk_application_key =
Self::get_chunk_application_cache_key(&block_key_source, &chunks, shard_id)?;
let cached_shard_update_key =
Self::get_cached_shard_update_key(&block_key_source, &chunks, shard_id)?;
let job = self.get_update_shard_job(
me,
chunk_application_key,
cached_shard_update_key,
block_context,
&chunks,
shard_index,
Expand Down Expand Up @@ -1678,11 +1678,13 @@ impl Chain {

let prev_block_hash = optimistic_block.prev_block_hash();
let block_height = optimistic_block.height();
for (shard_id, chunk_application_key, apply_result) in apply_result.into_iter()
for (shard_id, cached_shard_update_key, apply_result) in
apply_result.into_iter()
{
match apply_result {
Ok(result) => {
self.apply_chunk_results_cache.push(chunk_application_key, result);
self.apply_chunk_results_cache
.push(cached_shard_update_key, result);
}
Err(e) => {
warn!(
Expand All @@ -1692,7 +1694,7 @@ impl Chain {
?prev_block_hash,
block_height,
?optimistic_block_hash,
?chunk_application_key,
?cached_shard_update_key,
"Error applying chunk for OptimisticBlock"
);
}
Expand Down Expand Up @@ -2066,7 +2068,6 @@ impl Chain {
self.apply_chunks_spawner.spawn("apply_chunks", move || {
// do_apply_chunks runs `work` in parallel, but still waits for all of them to finish
let res = do_apply_chunks(block.clone(), block_height, work);

// If we encounter error here, that means the receiver is deallocated and the client
// thread is already shut down. The node is already crashed, so we can unwrap here
sc.send((block, res)).unwrap();
Expand Down Expand Up @@ -3859,11 +3860,11 @@ impl Chain {
block_timestamp: block_context.block_timestamp,
random_seed: block_context.random_seed,
};
let chunk_application_key =
Self::get_chunk_application_cache_key(&block_key_source, chunk_headers, shard_id)?;
let cached_shard_update_key =
Self::get_cached_shard_update_key(&block_key_source, chunk_headers, shard_id)?;
let job = self.get_update_shard_job(
me,
chunk_application_key,
cached_shard_update_key,
block_context,
chunk_headers,
shard_index,
Expand Down Expand Up @@ -3936,20 +3937,18 @@ impl Chain {
Ok(ShardContext { shard_uid, should_apply_chunk })
}

fn get_chunk_application_cache_key(
fn get_cached_shard_update_key(
block: &OptimisticBlockKeySource,
chunk_headers: &Chunks,
shard_id: ShardId,
) -> Result<CryptoHash, Error> {
const BYTES_LEN: usize =
size_of::<CryptoHash>() + size_of::<CryptoHash>() + size_of::<u64>();
let mut bytes: Vec<u8> = Vec::with_capacity(BYTES_LEN);

let mut bytes: Vec<u8> = Vec::with_capacity(BYTES_LEN);
bytes.extend_from_slice(&hash(&borsh::to_vec(&block)?).0);

let chunks_key_source: Vec<_> = chunk_headers.iter_raw().map(|c| c.chunk_hash()).collect();
bytes.extend_from_slice(&hash(&borsh::to_vec(&chunks_key_source)?).0);

bytes.extend_from_slice(&shard_id.to_le_bytes());

Ok(hash(&bytes))
Expand All @@ -3959,7 +3958,7 @@ impl Chain {
fn get_update_shard_job(
&self,
me: &Option<AccountId>,
chunk_application_key: CryptoHash,
cached_shard_update_key: CryptoHash,
block: ApplyChunkBlockContext,
chunk_headers: &Chunks,
shard_index: ShardIndex,
Expand All @@ -3984,11 +3983,11 @@ impl Chain {
let block_height = block.height;
let is_new_chunk = chunk_header.is_new_chunk(block_height);

if let Some(result) = self.apply_chunk_results_cache.peek(&chunk_application_key) {
if let Some(result) = self.apply_chunk_results_cache.peek(&cached_shard_update_key) {
let result = result.clone();
return Ok(Some((
shard_id,
chunk_application_key,
cached_shard_update_key,
Box::new(move |_| -> Result<ShardUpdateResult, Error> { Ok(result) }),
)));
}
Expand Down Expand Up @@ -4084,7 +4083,7 @@ impl Chain {
let runtime = self.runtime_adapter.clone();
Ok(Some((
shard_id,
chunk_application_key,
cached_shard_update_key,
Box::new(move |parent_span| -> Result<ShardUpdateResult, Error> {
Ok(process_shard_update(
parent_span,
Expand Down Expand Up @@ -4763,10 +4762,10 @@ pub fn do_apply_chunks(
let parent_span =
tracing::debug_span!(target: "chain", "do_apply_chunks", block_height, ?block).entered();
work.into_par_iter()
.map(|(shard_id, chunk_application_key, task)| {
.map(|(shard_id, cached_shard_update_key, task)| {
// As chunks can be processed in parallel, make sure they are all tracked as children of
// a single span.
(shard_id, chunk_application_key, task(&parent_span))
(shard_id, cached_shard_update_key, task(&parent_span))
})
.collect()
}
Expand Down
15 changes: 0 additions & 15 deletions core/primitives/src/optimistic_block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,18 +103,3 @@ pub enum BlockToApply {
Normal(CryptoHash),
Optimistic(CryptoHash),
}

// impl Debug for ChunkApplicationKey {
// fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
// match self {
// ChunkApplicationKey::Full(hash) => write!(f, "Full({})", hash),
// ChunkApplicationKey::Optimistic((prev_block_hash, optimistic_block_hash)) => {
// write!(
// f,
// "Optimistic(prev_block_hash={}, optimistic_block_hash={})",
// prev_block_hash, optimistic_block_hash
// )
// }
// }
// }
// }

0 comments on commit 05f86fb

Please sign in to comment.