diff --git a/Cargo.lock b/Cargo.lock index 23bf1f76b92c9..244b1605f8f34 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2373,6 +2373,7 @@ dependencies = [ "aptos-indexer-grpc-utils", "aptos-logger", "aptos-mempool", + "aptos-metrics-core", "aptos-runtimes", "aptos-storage-interface", "aptos-types", @@ -2381,6 +2382,7 @@ dependencies = [ "google-cloud-storage", "hyper 0.14.28", "itertools 0.13.0", + "once_cell", "rocksdb", "serde", "serde_json", diff --git a/api/src/tests/transactions_test.rs b/api/src/tests/transactions_test.rs index e8524525004d4..86f10a25a05fd 100644 --- a/api/src/tests/transactions_test.rs +++ b/api/src/tests/transactions_test.rs @@ -496,7 +496,7 @@ async fn test_get_transaction_by_hash() { async fn test_get_transaction_by_hash_with_delayed_internal_indexer() { let mut context = new_test_context_with_sharding_and_delayed_internal_indexer( current_function_name!(), - Some(1), + Some(2), ); let mut account = context.gen_account(); diff --git a/api/test-context/src/test_context.rs b/api/test-context/src/test_context.rs index e4f9d686be7ca..0f9c9de274b93 100644 --- a/api/test-context/src/test_context.rs +++ b/api/test-context/src/test_context.rs @@ -56,7 +56,13 @@ use bytes::Bytes; use hyper::{HeaderMap, Response}; use rand::SeedableRng; use serde_json::{json, Value}; -use std::{boxed::Box, net::SocketAddr, path::PathBuf, sync::Arc, time::Duration}; +use std::{ + boxed::Box, + net::SocketAddr, + path::PathBuf, + sync::Arc, + time::{Duration, Instant}, +}; use tokio::sync::watch::channel; use warp::{http::header::CONTENT_TYPE, Filter, Rejection, Reply}; use warp_reverse_proxy::reverse_proxy_filter; @@ -132,7 +138,7 @@ pub fn new_test_context_inner( let (root_key, genesis, genesis_waypoint, validators) = builder.build(&mut rng).unwrap(); let (validator_identity, _, _, _) = validators[0].get_key_objects(None).unwrap(); let validator_owner = validator_identity.account_address.unwrap(); - let (sender, recver) = channel::(0); + let (sender, recver) = 
channel::<(Instant, Version)>((Instant::now(), 0 as Version)); let (db, db_rw) = if use_db_with_indexer { let mut aptos_db = AptosDB::new_for_test_with_indexer( &tmp_dir, diff --git a/aptos-node/src/services.rs b/aptos-node/src/services.rs index 9537f4ea3ef9f..a4a1feac629b6 100644 --- a/aptos-node/src/services.rs +++ b/aptos-node/src/services.rs @@ -32,7 +32,9 @@ use aptos_peer_monitoring_service_server::{ use aptos_peer_monitoring_service_types::PeerMonitoringServiceMessage; use aptos_storage_interface::{DbReader, DbReaderWriter}; use aptos_time_service::TimeService; -use aptos_types::{chain_id::ChainId, indexer::indexer_db_reader::IndexerReader}; +use aptos_types::{ + chain_id::ChainId, indexer::indexer_db_reader::IndexerReader, transaction::Version, +}; use aptos_validator_transaction_pool::VTxnPoolState; use futures::channel::{mpsc, mpsc::Sender, oneshot}; use std::{sync::Arc, time::Instant}; @@ -51,7 +53,7 @@ pub fn bootstrap_api_and_indexer( db_rw: DbReaderWriter, chain_id: ChainId, internal_indexer_db: Option, - update_receiver: Option>, + update_receiver: Option>, api_port_tx: Option>, indexer_grpc_port_tx: Option>, ) -> anyhow::Result<( diff --git a/aptos-node/src/storage.rs b/aptos-node/src/storage.rs index a67e8709a6dde..2277057db898b 100644 --- a/aptos-node/src/storage.rs +++ b/aptos-node/src/storage.rs @@ -20,7 +20,6 @@ use tokio::{ runtime::Runtime, sync::watch::{channel, Receiver as WatchReceiver}, }; - pub(crate) fn maybe_apply_genesis( db_rw: &DbReaderWriter, node_config: &NodeConfig, @@ -51,11 +50,11 @@ pub(crate) fn bootstrap_db( DbReaderWriter, Option, Option, - Option>, + Option>, )> { let internal_indexer_db = InternalIndexerDBService::get_indexer_db(node_config); let (update_sender, update_receiver) = if internal_indexer_db.is_some() { - let (sender, receiver) = channel::(0); + let (sender, receiver) = channel::<(Instant, Version)>((Instant::now(), 0 as Version)); (Some(sender), Some(receiver)) } else { (None, None) @@ -177,7 +176,7 @@ pub fn 
initialize_database_and_checkpoints( Option, Waypoint, Option, - Option>, + Option>, )> { // If required, create RocksDB checkpoints and change the working directory. // This is test-only. diff --git a/ecosystem/indexer-grpc/indexer-grpc-table-info/Cargo.toml b/ecosystem/indexer-grpc/indexer-grpc-table-info/Cargo.toml index 674a13484f5e2..c8ff7258a4e6d 100644 --- a/ecosystem/indexer-grpc/indexer-grpc-table-info/Cargo.toml +++ b/ecosystem/indexer-grpc/indexer-grpc-table-info/Cargo.toml @@ -21,6 +21,7 @@ aptos-indexer-grpc-fullnode = { workspace = true } aptos-indexer-grpc-utils = { workspace = true } aptos-logger = { workspace = true } aptos-mempool = { workspace = true } +aptos-metrics-core = { workspace = true } aptos-runtimes = { workspace = true } aptos-storage-interface = { workspace = true } aptos-types = { workspace = true } @@ -29,6 +30,7 @@ futures = { workspace = true } google-cloud-storage = { workspace = true } hyper = { workspace = true } itertools = { workspace = true } +once_cell = { workspace = true } serde = { workspace = true } serde_json = { workspace = true } tar = { workspace = true } diff --git a/ecosystem/indexer-grpc/indexer-grpc-table-info/src/internal_indexer_db_service.rs b/ecosystem/indexer-grpc/indexer-grpc-table-info/src/internal_indexer_db_service.rs index 5b39bbe9784aa..73e2c7660d617 100644 --- a/ecosystem/indexer-grpc/indexer-grpc-table-info/src/internal_indexer_db_service.rs +++ b/ecosystem/indexer-grpc/indexer-grpc-table-info/src/internal_indexer_db_service.rs @@ -1,6 +1,7 @@ // Copyright © Aptos Foundation // SPDX-License-Identifier: Apache-2.0 +use crate::metrics::INDEXER_DB_LATENCY; use anyhow::Result; use aptos_config::config::{internal_indexer_db_config::InternalIndexerDBConfig, NodeConfig}; use aptos_db_indexer::{ @@ -15,7 +16,7 @@ use aptos_types::{indexer::indexer_db_reader::IndexerReader, transaction::Versio use std::{ path::{Path, PathBuf}, sync::Arc, - time::Duration, + time::Instant, }; use tokio::{runtime::Handle, 
sync::watch::Receiver as WatchReceiver};
@@ -24,14 +25,14 @@ const INTERNAL_INDEXER_DB: &str = "internal_indexer_db";
 pub struct InternalIndexerDBService {
     pub db_indexer: Arc,
-    pub update_receiver: WatchReceiver,
+    pub update_receiver: WatchReceiver<(Instant, Version)>,
 }
 impl InternalIndexerDBService {
     pub fn new(
         db_reader: Arc,
         internal_indexer_db: InternalIndexerDB,
-        update_receiver: WatchReceiver,
+        update_receiver: WatchReceiver<(Instant, Version)>,
     ) -> Self {
         let internal_db_indexer = Arc::new(DBIndexer::new(internal_indexer_db, db_reader));
         Self {
@@ -166,23 +167,23 @@ impl InternalIndexerDBService {
     pub async fn run(&mut self, node_config: &NodeConfig) -> Result<()> {
         let mut start_version = self.get_start_version(node_config).await?;
+        let mut target_version = self.db_indexer.main_db_reader.ensure_synced_version()?;
+        let mut step_timer = std::time::Instant::now();
         loop {
-            let start_time: std::time::Instant = std::time::Instant::now();
-            let next_version = self.db_indexer.process_a_batch(start_version)?;
-
-            if next_version == start_version {
-                if let Ok(recv_res) =
-                    tokio::time::timeout(Duration::from_millis(100), self.update_receiver.changed())
-                        .await
-                {
-                    if recv_res.is_err() {
-                        info!("update sender is dropped");
-                        return Ok(());
-                    }
+            if target_version == start_version {
+                match self.update_receiver.changed().await {
+                    Ok(_) => {
+                        // Pick up the latest (commit time, version) published by the main DB.
+                        (step_timer, target_version) = *self.update_receiver.borrow();
+                    },
+                    Err(e) => {
+                        info!("update sender dropped: {}; stopping internal indexer", e); return Ok(());
+                    },
                 }
-                continue;
-            };
+            }
+            let next_version = self.db_indexer.process(start_version, target_version)?;
+            INDEXER_DB_LATENCY.set(step_timer.elapsed().as_millis() as i64);
             log_grpc_step(
                 SERVICE_TYPE,
                 IndexerGrpcStep::InternalIndexerDBProcessed,
@@ -190,12 +191,13 @@ impl InternalIndexerDBService {
                 Some(next_version as i64),
                 None,
                 None,
-                Some(start_time.elapsed().as_secs_f64()),
+                Some(step_timer.elapsed().as_secs_f64()),
                 None,
                Some((next_version - start_version) as i64),
                None,
            );
            start_version = next_version;
+            info!("internal indexer processed to version {}, target {}", next_version, target_version);
        }
    }
@@ -205,18 +207,14 @@
         node_config: &NodeConfig,
         end_version: Option,
     ) -> Result<()> {
-        let mut start_version = self.get_start_version(node_config).await?;
-        while start_version <= end_version.unwrap_or(std::u64::MAX) {
-            let next_version = self.db_indexer.process_a_batch(start_version)?;
-            if next_version == start_version {
-                tokio::time::sleep(std::time::Duration::from_millis(100)).await;
-                continue;
-            }
-            start_version = next_version;
+        let start_version = self.get_start_version(node_config).await?;
+        let end_version = end_version.unwrap_or(std::u64::MAX);
+        let mut next_version = start_version;
+        loop {
+            if next_version >= end_version { break Ok(()); }
+            next_version = self.db_indexer.process(next_version, end_version)?;
+            tokio::time::sleep(std::time::Duration::from_secs(1)).await;
        }
-        // We should never stop the internal indexer
-        tokio::time::sleep(std::time::Duration::from_secs(100)).await;
-        Ok(())
    }
}
@@ -230,7 +228,7 @@ impl MockInternalIndexerDBService {
     pub fn new_for_test(
         db_reader: Arc,
         node_config: &NodeConfig,
-        update_receiver: WatchReceiver,
+        update_receiver: WatchReceiver<(Instant, Version)>,
         end_version: Option,
     ) -> Self {
         if !node_config
diff --git a/ecosystem/indexer-grpc/indexer-grpc-table-info/src/lib.rs b/ecosystem/indexer-grpc/indexer-grpc-table-info/src/lib.rs
index 59c545b7701b4..3c8edbb137bd1 100644
--- a/ecosystem/indexer-grpc/indexer-grpc-table-info/src/lib.rs
+++ b/ecosystem/indexer-grpc/indexer-grpc-table-info/src/lib.rs
@@ -3,6 +3,7 @@
 pub mod backup_restore;
 pub mod internal_indexer_db_service;
+pub mod metrics;
 pub mod runtime;
 pub mod table_info_service;
diff --git
a/ecosystem/indexer-grpc/indexer-grpc-table-info/src/metrics.rs b/ecosystem/indexer-grpc/indexer-grpc-table-info/src/metrics.rs new file mode 100644 index 0000000000000..58104d65f8e01 --- /dev/null +++ b/ecosystem/indexer-grpc/indexer-grpc-table-info/src/metrics.rs @@ -0,0 +1,13 @@ +// Copyright © Aptos Foundation +// SPDX-License-Identifier: Apache-2.0 + +use aptos_metrics_core::{register_int_gauge, IntGauge}; +use once_cell::sync::Lazy; + +pub static INDEXER_DB_LATENCY: Lazy = Lazy::new(|| { + register_int_gauge!( + "aptos_internal_indexer_latency", + "The latency between main db update and data written to indexer db" + ) + .unwrap() +}); diff --git a/ecosystem/indexer-grpc/indexer-grpc-table-info/src/runtime.rs b/ecosystem/indexer-grpc/indexer-grpc-table-info/src/runtime.rs index cfd9dfc2ce5b7..de5fdd47252f9 100644 --- a/ecosystem/indexer-grpc/indexer-grpc-table-info/src/runtime.rs +++ b/ecosystem/indexer-grpc/indexer-grpc-table-info/src/runtime.rs @@ -15,7 +15,7 @@ use aptos_db_indexer::{ use aptos_mempool::MempoolClientSender; use aptos_storage_interface::DbReaderWriter; use aptos_types::{chain_id::ChainId, transaction::Version}; -use std::sync::Arc; +use std::{sync::Arc, time::Instant}; use tokio::{runtime::Runtime, sync::watch::Receiver as WatchReceiver}; const INDEX_ASYNC_V2_DB_NAME: &str = "index_indexer_async_v2_db"; @@ -24,7 +24,7 @@ pub fn bootstrap_internal_indexer_db( config: &NodeConfig, db_rw: DbReaderWriter, internal_indexer_db: Option, - update_receiver: Option>, + update_receiver: Option>, ) -> Option<(Runtime, Arc)> { if !config.indexer_db_config.is_internal_indexer_db_enabled() || internal_indexer_db.is_none() { return None; diff --git a/execution/executor/tests/internal_indexer_test.rs b/execution/executor/tests/internal_indexer_test.rs index a0ff25cef9ce5..1b9ec3396e9f5 100644 --- a/execution/executor/tests/internal_indexer_test.rs +++ b/execution/executor/tests/internal_indexer_test.rs @@ -152,10 +152,10 @@ fn test_db_indexer_data() { // 
assert the data matches the expected data
     let version = internal_indexer_db.get_persisted_version().unwrap();
     assert_eq!(version, None);
-    let mut start_version = version.map_or(0, |v| v + 1);
-    while start_version < total_version {
-        start_version = db_indexer.process_a_batch(start_version).unwrap();
-    }
+    let start_version = version.map_or(0, |v| v + 1);
+    db_indexer
+        .process(start_version, total_version)
+        .unwrap();
     // wait for the commit to finish
     thread::sleep(Duration::from_millis(100));
     // indexer has process all the transactions
diff --git a/storage/aptosdb/src/db/include/aptosdb_writer.rs b/storage/aptosdb/src/db/include/aptosdb_writer.rs
index c1f3a1a942006..77b2b51bbc0fe 100644
--- a/storage/aptosdb/src/db/include/aptosdb_writer.rs
+++ b/storage/aptosdb/src/db/include/aptosdb_writer.rs
@@ -591,9 +591,10 @@ impl AptosDB {
         COMMITTED_TXNS.inc_by(num_txns);
         LATEST_TXN_VERSION.set(version as i64);
+        // Notify any subscriber (e.g. the internal indexer) of the newly committed version.
         if let Some(update_sender) = &self.update_subscriber {
             update_sender.send(
-                version
+                (Instant::now(), version)
             ).map_err(| err | {
                 AptosDbError::Other(format!("Failed to send update to subscriber: {}", err))
             })?;
diff --git a/storage/aptosdb/src/db/mod.rs b/storage/aptosdb/src/db/mod.rs
index 4eb7c28193ba1..4569cdc6e17de 100644
--- a/storage/aptosdb/src/db/mod.rs
+++ b/storage/aptosdb/src/db/mod.rs
@@ -100,7 +100,7 @@ pub struct AptosDB {
     commit_lock: std::sync::Mutex<()>,
     indexer: Option,
     skip_index_and_usage: bool,
-    update_subscriber: Option>,
+    update_subscriber: Option>,
 }
 
 // DbReader implementations and private functions used by them.
@@ -186,7 +186,10 @@ impl AptosDB { Ok((ledger_db, state_merkle_db, state_kv_db)) } - pub fn add_version_update_subscriber(&mut self, sender: Sender) -> Result<()> { + pub fn add_version_update_subscriber( + &mut self, + sender: Sender<(Instant, Version)>, + ) -> Result<()> { self.update_subscriber = Some(sender); Ok(()) } diff --git a/storage/aptosdb/src/fast_sync_storage_wrapper.rs b/storage/aptosdb/src/fast_sync_storage_wrapper.rs index e58099c0bb31d..d1a42ebf14e56 100644 --- a/storage/aptosdb/src/fast_sync_storage_wrapper.rs +++ b/storage/aptosdb/src/fast_sync_storage_wrapper.rs @@ -16,9 +16,8 @@ use aptos_types::{ transaction::{TransactionOutputListWithProof, Version}, }; use either::Either; -use std::sync::Arc; +use std::{sync::Arc, time::Instant}; use tokio::sync::watch::Sender; - pub const SECONDARY_DB_DIR: &str = "fast_sync_secondary"; #[derive(Clone, Copy, Debug, Eq, PartialEq)] @@ -44,7 +43,7 @@ impl FastSyncStorageWrapper { pub fn initialize_dbs( config: &NodeConfig, internal_indexer_db: Option, - update_sender: Option>, + update_sender: Option>, ) -> Result> { let mut db_main = AptosDB::open( config.storage.get_dir_paths(), diff --git a/storage/indexer/src/db_indexer.rs b/storage/indexer/src/db_indexer.rs index 31b9cea0f69c7..13af504bb7bf3 100644 --- a/storage/indexer/src/db_indexer.rs +++ b/storage/indexer/src/db_indexer.rs @@ -378,8 +378,8 @@ impl DBIndexer { Ok(zipped) } - fn get_num_of_transactions(&self, version: Version) -> Result { - let highest_version = self.main_db_reader.ensure_synced_version()?; + fn get_num_of_transactions(&self, version: Version, end_version: Version) -> Result { + let highest_version = min(self.main_db_reader.ensure_synced_version()?, end_version); if version > highest_version { // In case main db is not synced yet or recreated return Ok(0); @@ -392,10 +392,25 @@ impl DBIndexer { Ok(num_of_transaction) } - pub fn process_a_batch(&self, start_version: Version) -> Result { - let _timer = 
TIMER.with_label_values(&["process_a_batch"]).start_timer(); + /// Process all transactions from `start_version` to `end_version`. Left inclusive, right exclusive. + pub fn process(&self, start_version: Version, end_version: Version) -> Result { let mut version = start_version; - let num_transactions = self.get_num_of_transactions(version)?; + while version < end_version { + let next_version = self.process_a_batch(version, end_version)?; + if next_version == version { + break; + } + version = next_version; + } + Ok(version) + } + + /// Process a batch of transactions that is within the range of `start_version` to `end_version`. Left inclusive, right exclusive. + pub fn process_a_batch(&self, start_version: Version, end_version: Version) -> Result { + let _timer: aptos_metrics_core::HistogramTimer = + TIMER.with_label_values(&["process_a_batch"]).start_timer(); + let mut version = start_version; + let num_transactions = self.get_num_of_transactions(version, end_version)?; // This promises num_transactions should be readable from main db let mut db_iter = self.get_main_db_iter(version, num_transactions)?; let mut batch = SchemaBatch::new();