From 76c1d10df1a35e8de360bc7f6f882c50a7d07501 Mon Sep 17 00:00:00 2001
From: Guoteng Rao <3603304+grao1991@users.noreply.github.com>
Date: Sat, 18 Jan 2025 03:40:06 +0000
Subject: [PATCH] [Indexer-Grpc-V2] Add ConnectionManager.

---
Review notes (connection_manager.rs only; its blob id on the `index` line is
carried over from the original patch and is stale):
 - start(): stop retrying a GrpcManager after MAX_HEARTBEAT_RETRIES instead of
   looping on it forever.
 - heartbeat(): return NotFound for an unknown address instead of panicking.
 - Do not hold DashMap guards across `.await`.
 - Take `&str` instead of `&String`; add doc comments.

 Cargo.lock                                    |   6 +
 .../indexer-grpc-data-service-v2/Cargo.toml   |   6 +
 .../src/config.rs                             |   2 +
 .../src/connection_manager.rs                 | 351 ++++++++++++++++++
 .../indexer-grpc-data-service-v2/src/lib.rs   |   2 +
 5 files changed, 367 insertions(+)
 create mode 100644 ecosystem/indexer-grpc/indexer-grpc-data-service-v2/src/connection_manager.rs

diff --git a/Cargo.lock b/Cargo.lock
index 811de76d13cfe..c9d22a66b426b 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2172,11 +2172,17 @@ version = "1.0.0"
 dependencies = [
  "anyhow",
  "aptos-indexer-grpc-server-framework",
+ "aptos-indexer-grpc-utils",
+ "aptos-protos 1.3.1",
  "async-trait",
  "clap 4.5.21",
+ "dashmap",
  "jemallocator",
+ "rand 0.7.3",
  "serde",
  "tokio",
+ "tonic 0.12.3",
+ "tracing",
 ]
 
 [[package]]
diff --git a/ecosystem/indexer-grpc/indexer-grpc-data-service-v2/Cargo.toml b/ecosystem/indexer-grpc/indexer-grpc-data-service-v2/Cargo.toml
index 533ac675464a0..392351bbf39d0 100644
--- a/ecosystem/indexer-grpc/indexer-grpc-data-service-v2/Cargo.toml
+++ b/ecosystem/indexer-grpc/indexer-grpc-data-service-v2/Cargo.toml
@@ -15,10 +15,16 @@ rust-version = { workspace = true }
 [dependencies]
 anyhow = { workspace = true }
 aptos-indexer-grpc-server-framework = { workspace = true }
+aptos-indexer-grpc-utils = { workspace = true }
+aptos-protos = { workspace = true }
 async-trait = { workspace = true }
 clap = { workspace = true }
+dashmap = { workspace = true }
+rand = { workspace = true }
 serde = { workspace = true }
 tokio = { workspace = true }
+tonic = { workspace = true }
+tracing = { workspace = true }
 
 [target.'cfg(unix)'.dependencies]
 jemallocator = { workspace = true }
diff --git a/ecosystem/indexer-grpc/indexer-grpc-data-service-v2/src/config.rs b/ecosystem/indexer-grpc/indexer-grpc-data-service-v2/src/config.rs
index 7f81be313ec48..93db4301b658d 100644
--- a/ecosystem/indexer-grpc/indexer-grpc-data-service-v2/src/config.rs
+++ b/ecosystem/indexer-grpc/indexer-grpc-data-service-v2/src/config.rs
@@ -5,6 +5,8 @@ use anyhow::Result;
 use aptos_indexer_grpc_server_framework::RunnableConfig;
 use serde::{Deserialize, Serialize};
 
+pub(crate) const MAX_MESSAGE_SIZE: usize = 256 * (1 << 20);
+
 #[derive(Clone, Debug, Deserialize, Serialize)]
 #[serde(deny_unknown_fields)]
 pub struct IndexerGrpcDataServiceConfig {}
diff --git a/ecosystem/indexer-grpc/indexer-grpc-data-service-v2/src/connection_manager.rs b/ecosystem/indexer-grpc/indexer-grpc-data-service-v2/src/connection_manager.rs
new file mode 100644
index 0000000000000..a4f8df67dd209
--- /dev/null
+++ b/ecosystem/indexer-grpc/indexer-grpc-data-service-v2/src/connection_manager.rs
@@ -0,0 +1,351 @@
+// Copyright (c) Aptos Foundation
+// SPDX-License-Identifier: Apache-2.0
+
+use crate::config::MAX_MESSAGE_SIZE;
+use aptos_indexer_grpc_utils::{system_time_to_proto, timestamp_now_proto};
+use aptos_protos::indexer::v1::{
+    grpc_manager_client::GrpcManagerClient, service_info::Info, ActiveStream, HeartbeatRequest,
+    HistoricalDataServiceInfo, LiveDataServiceInfo, ServiceInfo, StreamInfo, StreamProgress,
+    StreamProgressSampleProto,
+};
+use dashmap::DashMap;
+use rand::prelude::*;
+use std::{
+    collections::VecDeque,
+    sync::atomic::{AtomicU64, Ordering},
+    time::{Duration, SystemTime},
+};
+use tonic::{codec::CompressionEncoding, transport::channel::Channel};
+use tracing::{info, warn};
+
+/// Maximum number of heartbeat retries per GrpcManager within one heartbeat round.
+pub static MAX_HEARTBEAT_RETRIES: usize = 3;
+
+/// Minimum time between two consecutive samples kept in the long-term ("old") window.
+static OLD_PROGRESS_SAMPLING_RATE: Duration = Duration::from_secs(60);
+/// Minimum time between two consecutive samples kept in the short-term ("recent") window.
+static RECENT_PROGRESS_SAMPLING_RATE: Duration = Duration::from_secs(1);
+/// Maximum number of samples kept in the short-term window.
+static MAX_RECENT_SAMPLES_TO_KEEP: usize = 60;
+/// Maximum number of samples kept in the long-term window.
+static MAX_OLD_SAMPLES_TO_KEEP: usize = 60;
+
+/// Progress history of one stream, kept at two resolutions.
+///
+/// New samples enter `recent_samples`. When that window overflows, its oldest sample is moved
+/// to `old_samples`, provided it is far enough from the last sample kept there.
+#[derive(Default)]
+struct StreamProgressSamples {
+    old_samples: VecDeque<StreamProgressSample>,
+    recent_samples: VecDeque<StreamProgressSample>,
+}
+
+impl StreamProgressSamples {
+    fn new() -> Self {
+        Default::default()
+    }
+
+    /// Converts every retained sample to its proto form, old samples first.
+    fn to_proto(&self) -> Vec<StreamProgressSampleProto> {
+        self.old_samples
+            .iter()
+            .chain(self.recent_samples.iter())
+            .map(|sample| StreamProgressSampleProto {
+                timestamp: Some(system_time_to_proto(sample.timestamp)),
+                version: sample.version,
+                size_bytes: sample.size_bytes,
+            })
+            .collect()
+    }
+
+    /// Records a sample stamped with the current time, unless the latest recent sample was
+    /// taken less than `RECENT_PROGRESS_SAMPLING_RATE` ago.
+    fn maybe_add_sample(&mut self, version: u64, size_bytes: u64) {
+        let sample = StreamProgressSample {
+            timestamp: SystemTime::now(),
+            version,
+            size_bytes,
+        };
+        if !Self::accept_sample(&self.recent_samples, &sample, RECENT_PROGRESS_SAMPLING_RATE) {
+            return;
+        }
+
+        self.recent_samples.push_back(sample);
+        if self.recent_samples.len() <= MAX_RECENT_SAMPLES_TO_KEEP {
+            return;
+        }
+
+        // The recent window overflowed: move its oldest sample to the old window, subject to
+        // the coarser sampling rate used there.
+        if let Some(evicted) = self.recent_samples.pop_front() {
+            if Self::accept_sample(&self.old_samples, &evicted, OLD_PROGRESS_SAMPLING_RATE) {
+                self.old_samples.push_back(evicted);
+                if self.old_samples.len() > MAX_OLD_SAMPLES_TO_KEEP {
+                    self.old_samples.pop_front();
+                }
+            }
+        }
+    }
+
+    /// Returns whether `sample` may be appended to `samples`: either `samples` is empty, or at
+    /// least `sampling_rate` has passed since its last entry. A sample older than the last
+    /// entry (e.g. the system clock moved backwards) is rejected.
+    fn accept_sample(
+        samples: &VecDeque<StreamProgressSample>,
+        sample: &StreamProgressSample,
+        sampling_rate: Duration,
+    ) -> bool {
+        match samples.back() {
+            None => true,
+            Some(last_sample) => matches!(
+                sample.timestamp.duration_since(last_sample.timestamp),
+                Ok(delta) if delta >= sampling_rate
+            ),
+        }
+    }
+}
+
+/// One progress observation of a stream.
+struct StreamProgressSample {
+    timestamp: SystemTime,
+    version: u64,
+    size_bytes: u64,
+}
+
+/// Keeps the clients for the configured GrpcManagers, the latest version learned from them,
+/// and the streams registered as active on this data service.
+pub(crate) struct ConnectionManager {
+    chain_id: u64,
+    grpc_manager_connections: DashMap<String, GrpcManagerClient<Channel>>,
+    self_advertised_address: String,
+    known_latest_version: AtomicU64,
+    active_streams: DashMap<String, (ActiveStream, StreamProgressSamples)>,
+    is_live_data_service: bool,
+}
+
+impl ConnectionManager {
+    /// Creates a lazily connected client per GrpcManager address, then keeps sending
+    /// heartbeats until a response reports a non-zero latest version.
+    ///
+    /// Note: this never returns if `grpc_manager_addresses` is empty.
+    pub(crate) async fn new(
+        chain_id: u64,
+        grpc_manager_addresses: Vec<String>,
+        self_advertised_address: String,
+        is_live_data_service: bool,
+    ) -> Self {
+        let grpc_manager_connections = DashMap::new();
+        for address in grpc_manager_addresses {
+            let client = Self::create_client_from_address(&address);
+            grpc_manager_connections.insert(address, client);
+        }
+        let res = Self {
+            chain_id,
+            grpc_manager_connections,
+            self_advertised_address,
+            known_latest_version: AtomicU64::new(0),
+            active_streams: DashMap::new(),
+            is_live_data_service,
+        };
+
+        // Keep fetching latest version until it is available.
+        while res.known_latest_version.load(Ordering::SeqCst) == 0 {
+            for address in res.grpc_manager_addresses() {
+                if let Err(e) = res.heartbeat(&address).await {
+                    warn!("Error during heartbeat: {e}.");
+                }
+            }
+            tokio::time::sleep(Duration::from_secs(1)).await;
+        }
+
+        res
+    }
+
+    /// Runs heartbeat rounds forever, sleeping one second between rounds.
+    ///
+    /// Within a round, a GrpcManager whose heartbeat keeps failing is retried up to
+    /// `MAX_HEARTBEAT_RETRIES` times and then skipped until the next round, so that one
+    /// unreachable manager cannot block the heartbeats to the others.
+    pub(crate) async fn start(&self) {
+        loop {
+            for address in self.grpc_manager_addresses() {
+                let mut retries = 0;
+                loop {
+                    let result = self.heartbeat(&address).await;
+                    if result.is_ok() {
+                        break;
+                    }
+                    retries += 1;
+                    if retries > MAX_HEARTBEAT_RETRIES {
+                        warn!("Failed to send heartbeat to GrpcManager at {address}, last error: {result:?}.");
+                        break;
+                    }
+                }
+            }
+
+            tokio::time::sleep(Duration::from_secs(1)).await;
+        }
+    }
+
+    /// Returns a clone of the client of a randomly chosen GrpcManager.
+    ///
+    /// # Panics
+    ///
+    /// Panics if there is no GrpcManager connection.
+    pub(crate) fn get_grpc_manager_client_for_request(&self) -> GrpcManagerClient<Channel> {
+        let mut rng = thread_rng();
+        self.grpc_manager_connections
+            .iter()
+            .choose(&mut rng)
+            .map(|kv| kv.value().clone())
+            .expect("No GrpcManager connection is available.")
+    }
+
+    pub(crate) fn known_latest_version(&self) -> u64 {
+        self.known_latest_version.load(Ordering::SeqCst)
+    }
+
+    /// Raises the known latest version to `version`; a smaller `version` is ignored.
+    pub(crate) fn update_known_latest_version(&self, version: u64) {
+        self.known_latest_version
+            .fetch_max(version, Ordering::SeqCst);
+    }
+
+    /// Registers a stream as active, replacing any stream already registered under `id`.
+    pub(crate) fn insert_active_stream(
+        &self,
+        id: &str,
+        start_version: u64,
+        end_version: Option<u64>,
+    ) {
+        self.active_streams.insert(
+            id.to_string(),
+            (
+                ActiveStream {
+                    id: Some(id.to_string()),
+                    start_time: Some(timestamp_now_proto()),
+                    start_version,
+                    end_version,
+                    progress: None,
+                },
+                StreamProgressSamples::new(),
+            ),
+        );
+    }
+
+    pub(crate) fn remove_active_stream(&self, id: &str) {
+        self.active_streams.remove(id);
+    }
+
+    /// Offers a progress sample for the stream registered under `id`.
+    ///
+    /// # Panics
+    ///
+    /// Panics if no stream is registered under `id`.
+    pub(crate) fn update_stream_progress(&self, id: &str, version: u64, size_bytes: u64) {
+        self.active_streams
+            .get_mut(id)
+            .expect("Progress reported for a stream that is not registered.")
+            .1
+            .maybe_add_sample(version, size_bytes);
+    }
+
+    /// Returns a snapshot of all active streams with their progress samples attached.
+    pub(crate) fn get_active_streams(&self) -> Vec<ActiveStream> {
+        self.active_streams
+            .iter()
+            .map(|entry| {
+                let (active_stream, samples) = entry.value();
+                let mut active_stream = active_stream.clone();
+                active_stream.progress = Some(StreamProgress {
+                    samples: samples.to_proto(),
+                });
+                active_stream
+            })
+            .collect()
+    }
+
+    /// Snapshots the GrpcManager addresses so that no map guard is held across an `.await`.
+    fn grpc_manager_addresses(&self) -> Vec<String> {
+        self.grpc_manager_connections
+            .iter()
+            .map(|entry| entry.key().clone())
+            .collect()
+    }
+
+    /// Sends one heartbeat describing this service to the GrpcManager at `address` and records
+    /// the latest version found in the response, if any.
+    ///
+    /// # Errors
+    ///
+    /// Returns `NotFound` if `address` is not a known GrpcManager, or the status of the failed
+    /// heartbeat RPC.
+    async fn heartbeat(&self, address: &str) -> Result<(), tonic::Status> {
+        info!("Sending heartbeat to GrpcManager {address}.");
+        // Clone the client up front so that the map guard is released before awaiting the RPC.
+        let mut client = self
+            .grpc_manager_connections
+            .get(address)
+            .map(|entry| entry.value().clone())
+            .ok_or_else(|| {
+                tonic::Status::not_found(format!("Unknown GrpcManager address: {address}."))
+            })?;
+
+        let timestamp = Some(timestamp_now_proto());
+        let known_latest_version = Some(self.known_latest_version());
+        let stream_info = Some(StreamInfo {
+            active_streams: self.get_active_streams(),
+        });
+
+        let info = if self.is_live_data_service {
+            Some(Info::LiveDataServiceInfo(LiveDataServiceInfo {
+                chain_id: self.chain_id,
+                timestamp,
+                known_latest_version,
+                stream_info,
+                // TODO(grao): Populate min_servable_version.
+                min_servable_version: None,
+            }))
+        } else {
+            Some(Info::HistoricalDataServiceInfo(HistoricalDataServiceInfo {
+                chain_id: self.chain_id,
+                timestamp,
+                known_latest_version,
+                stream_info,
+            }))
+        };
+        let service_info = ServiceInfo {
+            address: Some(self.self_advertised_address.clone()),
+            info,
+        };
+        let request = HeartbeatRequest {
+            service_info: Some(service_info),
+        };
+        let response = client.heartbeat(request).await?.into_inner();
+        if let Some(known_latest_version) = response.known_latest_version {
+            info!("Received known_latest_version ({known_latest_version}) from GrpcManager {address}.");
+            self.update_known_latest_version(known_latest_version);
+        } else {
+            warn!("HeartbeatResponse doesn't contain known_latest_version, GrpcManager address: {address}");
+        }
+
+        Ok(())
+    }
+
+    /// Builds a lazily connected client with Zstd compression and `MAX_MESSAGE_SIZE` limits.
+    ///
+    /// # Panics
+    ///
+    /// Panics if `address` is not a valid URI.
+    fn create_client_from_address(address: &str) -> GrpcManagerClient<Channel> {
+        info!("Creating GrpcManagerClient for {address}.");
+        let channel = Channel::from_shared(address.to_string())
+            .expect("Bad address.")
+            .connect_lazy();
+        GrpcManagerClient::new(channel)
+            .send_compressed(CompressionEncoding::Zstd)
+            .accept_compressed(CompressionEncoding::Zstd)
+            .max_decoding_message_size(MAX_MESSAGE_SIZE)
+            .max_encoding_message_size(MAX_MESSAGE_SIZE)
+    }
+}
diff --git a/ecosystem/indexer-grpc/indexer-grpc-data-service-v2/src/lib.rs b/ecosystem/indexer-grpc/indexer-grpc-data-service-v2/src/lib.rs
index 36d88a8c1b06b..3add1bd47242e 100644
--- a/ecosystem/indexer-grpc/indexer-grpc-data-service-v2/src/lib.rs
+++ b/ecosystem/indexer-grpc/indexer-grpc-data-service-v2/src/lib.rs
@@ -2,3 +2,5 @@
 // SPDX-License-Identifier: Apache-2.0
 
 pub mod config;
+#[allow(dead_code)]
+mod connection_manager;