fix(sync): archival node should request old blocks from archival nodes (#3369)

Currently, when an archival node syncs old blocks, it still requests them from regular nodes, which may have already garbage-collected those blocks. The requests are wasted, and if the archival peers sit close to the end of `highest_height_peers`, the node can get stuck in syncing.

Fixes #3365 
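
The heart of the fix is a peer-selection rule in `BlockSync`: blocks below the chain's GC stop height have already been garbage-collected by regular nodes, so an archival node must request them from archival peers. A minimal sketch of that rule, using a hypothetical `PeerView` type standing in for the real peer info:

// Minimal sketch, not the actual nearcore types or API.
struct PeerView {
    archival: bool,
}

/// An archival node requesting a block below `gc_stop_height` must pick
/// an archival peer; regular peers no longer store that block.
fn pick_peer<'a>(
    peers: &'a [PeerView],
    node_is_archival: bool,
    block_height: u64,
    gc_stop_height: u64,
) -> Option<&'a PeerView> {
    let need_archival = node_is_archival && block_height < gc_stop_height;
    peers.iter().find(|p| !need_archival || p.archival)
}

The real implementation in the `sync.rs` diff below cycles over the filtered peer list (`archival_peer_iter`) so consecutive requests rotate through the eligible peers instead of always hitting the first one.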

Test plan
---------
`test_block_sync_archival`
bowenwang1996 authored Sep 26, 2020
1 parent 48cc4f6 commit 523eb4f
Showing 2 changed files with 84 additions and 17 deletions.
1 change: 1 addition & 0 deletions chain/client/src/client_actor.rs
@@ -890,6 +890,7 @@ impl ClientActor {
         if (head.height < block.header().height()
             || &head.epoch_id == block.header().epoch_id())
             && provenance == Provenance::NONE
+            && !self.client.sync_status.is_syncing()
         {
             self.client.rebroadcast_block(block.clone());
         }
100 changes: 83 additions & 17 deletions chain/client/src/sync.rs
@@ -324,11 +324,23 @@ fn get_locator_heights(height: u64) -> Vec<u64> {
     heights
 }
 
+#[derive(Clone, Debug)]
+struct BlockHashAndHeight {
+    hash: CryptoHash,
+    height: BlockHeight,
+}
+
+impl BlockHashAndHeight {
+    pub fn new(hash: CryptoHash, height: BlockHeight) -> Self {
+        BlockHashAndHeight { hash, height }
+    }
+}
+
 /// Cache for block sync that stores the hashes to request in insertion order.
 /// It also maintains the last final block header to minimize the impact of reorgs.
 #[derive(Default)]
 struct BlockSyncCache {
-    hashes: LinkedHashMap<CryptoHash, ()>,
+    hashes: LinkedHashMap<CryptoHash, BlockHeight>,
     last_header: Option<BlockHeader>,
 }

@@ -337,8 +349,8 @@ impl BlockSyncCache {
         self.hashes.len()
     }
 
-    fn insert(&mut self, block_hash: CryptoHash) {
-        self.hashes.insert(block_hash, ());
+    fn insert(&mut self, block_hash: CryptoHash, block_height: BlockHeight) {
+        self.hashes.insert(block_hash, block_height);
     }
 
     fn remove(&mut self, block_hash: &CryptoHash) {
@@ -432,13 +444,13 @@ impl BlockSync {
 
     fn compute_hashes_to_request(
         &mut self,
-        new_hashes: &[CryptoHash],
+        new_blocks: &[BlockHashAndHeight],
         chain: &mut Chain,
         block_count: usize,
-    ) -> Vec<CryptoHash> {
+    ) -> Vec<BlockHashAndHeight> {
         let mut res = vec![];
         let mut hashes_to_remove = vec![];
-        for (block_hash, _) in self.cache.hashes.iter() {
+        for (block_hash, block_height) in self.cache.hashes.iter() {
             if res.len() >= block_count {
                 break;
             }
@@ -451,14 +463,16 @@
             if block_exists || chain.is_orphan(block_hash) || chain.is_chunk_orphan(block_hash) {
                 continue;
             } else {
-                res.push(*block_hash);
+                res.push(BlockHashAndHeight::new(*block_hash, *block_height));
             }
         }
         for hash in hashes_to_remove {
             self.cache.remove(&hash);
         }
-        for hash in new_hashes.iter().rev().take(block_count.saturating_sub(res.len())) {
-            res.push(*hash);
+        for block_hash_and_height in
+            new_blocks.iter().rev().take(block_count.saturating_sub(res.len()))
+        {
+            res.push(block_hash_and_height.clone());
         }
         res
     }
@@ -468,7 +482,7 @@ impl BlockSync {
         &mut self,
         chain: &mut Chain,
         block_count: usize,
-    ) -> Result<Vec<CryptoHash>, near_chain::Error> {
+    ) -> Result<Vec<BlockHashAndHeight>, near_chain::Error> {
         let last_header = match self.cache.last_header {
             Some(ref h) => h.clone(),
             None => chain.head_header()?.clone(),
@@ -496,14 +510,14 @@
                 }
             }
 
-            hashes_to_request.push(*header.hash());
+            hashes_to_request.push(BlockHashAndHeight::new(*header.hash(), header.height()));
             current = chain.get_previous_header(&header).map(|h| h.clone());
         }
         let res = self.compute_hashes_to_request(&hashes_to_request, chain, block_count);
 
         self.cache.hashes.reserve(hashes_to_request.len());
-        for hash in hashes_to_request.into_iter().rev() {
-            self.cache.insert(hash);
+        for block_hash_and_height in hashes_to_request.into_iter().rev() {
+            self.cache.insert(block_hash_and_height.hash, block_hash_and_height.height);
         }
         self.cache.last_header =
             chain.get_block_header(&header_head.last_block_hash).map(|h| h.clone()).ok();
@@ -538,11 +552,22 @@ impl BlockSync {
         self.blocks_requested = 0;
         self.receive_timeout = Utc::now() + Duration::seconds(BLOCK_REQUEST_TIMEOUT);
 
-        let mut peers_iter = highest_height_peers.iter().cycle();
-        for hash in hashes_to_request {
-            if let Some(peer) = peers_iter.next() {
+        let gc_stop_height =
+            chain.runtime_adapter.get_gc_stop_height(&header_head.last_block_hash);
+        let mut archival_peer_iter =
+            highest_height_peers.iter().filter(|p| p.chain_info.archival).cycle();
+        let mut peer_iter = highest_height_peers.iter().cycle();
+        for block_hash_and_height in hashes_to_request {
+            let request_from_archival =
+                self.archive && block_hash_and_height.height < gc_stop_height;
+            let peer = if request_from_archival {
+                archival_peer_iter.next()
+            } else {
+                peer_iter.next()
+            };
+            if let Some(peer) = peer {
                 self.network_adapter.do_send(NetworkRequests::BlockRequest {
-                    hash: hash.clone(),
+                    hash: block_hash_and_height.hash,
                     peer_id: peer.peer_info.id.clone(),
                 });
                 self.blocks_requested += 1;
@@ -1557,4 +1582,45 @@ mod test {
         assert_eq!(block_sync.cache.len(), expected_block_hashes.len());
         assert_eq!(block_sync.cache.last_header, Some(fork_block.header().clone()));
     }
+
+    #[test]
+    fn test_block_sync_archival() {
+        let network_adapter = Arc::new(MockNetworkAdapter::default());
+        let block_fetch_horizon = 10;
+        let mut block_sync = BlockSync::new(network_adapter.clone(), block_fetch_horizon, true);
+        let mut chain_genesis = ChainGenesis::test();
+        chain_genesis.epoch_length = 5;
+        let mut env = TestEnv::new(chain_genesis, 2, 1);
+        let mut blocks = vec![];
+        for i in 1..31 {
+            let block = env.clients[0].produce_block(i).unwrap().unwrap();
+            blocks.push(block.clone());
+            env.process_block(0, block, Provenance::PRODUCED);
+        }
+        let block_headers = blocks.iter().map(|b| b.header().clone()).collect::<Vec<_>>();
+        let peer_infos = create_peer_infos(2);
+        env.clients[1].chain.sync_block_headers(block_headers, |_| unreachable!()).unwrap();
+        let is_state_sync = block_sync.block_sync(&mut env.clients[1].chain, &peer_infos).unwrap();
+        assert!(!is_state_sync);
+        let requested_block_hashes = collect_hashes_from_network_adapter(network_adapter.clone());
+        assert_eq!(
+            requested_block_hashes,
+            blocks[4..].iter().map(|b| *b.hash()).collect::<HashSet<_>>()
+        );
+        let last_block_header = blocks.last().unwrap().header().clone();
+        assert_eq!(block_sync.cache.len() as u64, last_block_header.height());
+        assert_eq!(block_sync.cache.last_header, Some(last_block_header.clone()));
+
+        let mut peer_infos = create_peer_infos(2);
+        for peer in peer_infos.iter_mut() {
+            peer.chain_info.archival = true;
+        }
+        let is_state_sync = block_sync.block_sync(&mut env.clients[1].chain, &peer_infos).unwrap();
+        assert!(!is_state_sync);
+        let requested_block_hashes = collect_hashes_from_network_adapter(network_adapter.clone());
+        assert_eq!(
+            requested_block_hashes,
+            blocks.iter().map(|b| *b.hash()).collect::<HashSet<_>>()
+        );
+    }
 }
