Skip to content

Commit

Permalink
[storage] fix the replay-verify stuck
Browse files Browse the repository at this point in the history
  • Loading branch information
areshand committed Nov 21, 2023
1 parent 55c5395 commit 30a54ae
Show file tree
Hide file tree
Showing 4 changed files with 54 additions and 47 deletions.
5 changes: 4 additions & 1 deletion storage/backup/backup-cli/src/coordinators/replay_verify.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,9 @@ impl ReplayVerifyCoordinator {
);
}

// Once it begins replay, we want to directly start from the version that failed
let save_start_version = (next_txn_version > 0).then_some(next_txn_version);

next_txn_version = std::cmp::max(next_txn_version, snapshot_version.map_or(0, |v| v + 1));

let transactions = metadata_view.select_transaction_backups(
Expand Down Expand Up @@ -185,7 +188,7 @@ impl ReplayVerifyCoordinator {
.into_iter()
.map(|t| t.manifest)
.collect::<Vec<_>>(),
None,
save_start_version,
Some((next_txn_version, false)), /* replay_from_version */
None, /* epoch_history */
self.verify_execution_mode.clone(),
Expand Down
75 changes: 31 additions & 44 deletions storage/db-tool/src/replay_verify.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright © Aptos Foundation
// SPDX-License-Identifier: Apache-2.0

use anyhow::{bail, Result};
use anyhow::Result;
use aptos_backup_cli::{
coordinators::replay_verify::{ReplayError, ReplayVerifyCoordinator},
metadata::cache::MetadataCacheOpt,
Expand All @@ -17,7 +17,7 @@ use aptos_executor_types::VerifyExecutionMode;
use aptos_logger::info;
use aptos_types::transaction::Version;
use clap::Parser;
use std::{path::PathBuf, sync::Arc};
use std::{path::PathBuf, process, sync::Arc};

/// Read the backup files, replay them and verify the modules
#[derive(Parser)]
Expand Down Expand Up @@ -59,60 +59,47 @@ pub struct Opt {
lazy_quit: bool,
}

const RETRY_ATTEMPT: u8 = 5;

impl Opt {
pub async fn run(self) -> Result<()> {
let restore_handler = Arc::new(AptosDB::open_kv_only(
StorageDirPaths::from_path(self.db_dir),
StorageDirPaths::from_path(self.db_dir.clone()),
false, /* read_only */
NO_OP_STORAGE_PRUNER_CONFIG, /* pruner config */
self.rocksdb_opt.into(),
self.rocksdb_opt.clone().into(),
false,
BUFFERED_STATE_TARGET_ITEMS,
DEFAULT_MAX_NUM_NODES_PER_LRU_CACHE_SHARD,
)?)
.get_restore_handler();
let mut attempt = 0;
while attempt < RETRY_ATTEMPT {
let ret = ReplayVerifyCoordinator::new(
self.storage.clone().init_storage().await?,
self.metadata_cache_opt.clone(),
self.trusted_waypoints_opt.clone(),
self.concurrent_downloads.get(),
self.replay_concurrency_level.get(),
restore_handler.clone(),
self.start_version.unwrap_or(0),
self.end_version.unwrap_or(Version::MAX),
self.validate_modules,
VerifyExecutionMode::verify_except(self.txns_to_skip.clone())
.set_lazy_quit(self.lazy_quit),
)?
.run()
.await;
match ret {
Err(e) => match e {
ReplayError::TxnMismatch => {
info!("ReplayVerify coordinator exiting with Txn output mismatch error.");
break;
},
_ => {
info!(
"ReplayVerify coordinator retrying with attempt {}.",
attempt
);
},
let ret = ReplayVerifyCoordinator::new(
self.storage.clone().init_storage().await?,
self.metadata_cache_opt.clone(),
self.trusted_waypoints_opt.clone(),
self.concurrent_downloads.get(),
self.replay_concurrency_level.get(),
restore_handler.clone(),
self.start_version.unwrap_or(0),
self.end_version.unwrap_or(Version::MAX),
self.validate_modules,
VerifyExecutionMode::verify_except(self.txns_to_skip.clone())
.set_lazy_quit(self.lazy_quit),
)?
.run()
.await;
match ret {
Err(e) => match e {
ReplayError::TxnMismatch => {
info!("ReplayVerify coordinator exiting with Txn output mismatch error.");
process::exit(2);
},
_ => {
info!("ReplayVerify coordinator succeeded");
return Ok(());
process::exit(1);
},
}
attempt += 1;
}
bail!(
"ReplayVerify coordinator failed after {} attempts.",
RETRY_ATTEMPT
)
},
_ => {
info!("ReplayVerify coordinator succeeded");
},
};
Ok(())
}
}
19 changes: 18 additions & 1 deletion testsuite/replay_verify.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,22 @@
]


# Retry a replay-verify partition run when it reports a retryable failure.
def retry_replay_verify_partition(func, *args, **kwargs) -> Tuple[int, int, bytes]:
    """Call ``func(*args, **kwargs)`` up to three times and return its result.

    ``func`` must return a ``(partition_number, code, msg)`` triple.

    * ``code == 0``: returned immediately.
    * ``code == 1``: treated as retryable; ``func`` is called again with the
      same arguments until the attempt budget is used up, after which the
      last result is returned.
    * any other ``code``: returned immediately without retrying.

    NOTE(review): ``code`` is presumably the exit status of the replay-verify
    process (1 = generic failure, 2 = transaction mismatch) -- confirm against
    ``replay_verify_partition``.
    """
    max_attempts = 3
    # Placeholder only; it is overwritten by the first attempt's result.
    result = (0, 0, b"")
    for attempt in range(1, max_attempts + 1):
        partition_number, code, msg = func(*args, **kwargs)
        result = (partition_number, code, msg)
        if code != 1:
            # Success (0) and non-retryable failures are returned as they are.
            return result
        # Only announce a retry when another attempt will actually run.
        if attempt < max_attempts:
            print(
                f"[partition {partition_number}] attempt {attempt} failed, "
                f"retrying ({attempt + 1}/{max_attempts})"
            )
    return result


def replay_verify_partition(
n: int,
N: int,
Expand Down Expand Up @@ -208,9 +224,10 @@ def main(runner_no=None, runner_cnt=None, start_version=None, end_version=None):

with Pool(N) as p:
all_partitions = p.starmap(
replay_verify_partition,
retry_replay_verify_partition,
[
(
replay_verify_partition,
n,
N,
runner_start,
Expand Down
2 changes: 1 addition & 1 deletion testsuite/replay_verify_run_local.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,5 +46,5 @@ def local_setup():
if __name__ == "__main__":
local_setup()
replay_verify.main(
runner_no=None, runner_cnt=None, start_version=261693085, end_version=267000000
runner_no=None, runner_cnt=None, start_version=291217350, end_version=292975771
)

0 comments on commit 30a54ae

Please sign in to comment.