Skip to content

Commit

Permalink
[Indexer-Grpc-V2] Add MetadataManager.
Browse files Browse the repository at this point in the history
  • Loading branch information
grao1991 committed Jan 17, 2025
1 parent 7ecc8ad commit 66619b8
Show file tree
Hide file tree
Showing 12 changed files with 851 additions and 367 deletions.
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions ecosystem/indexer-grpc/indexer-grpc-manager/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,12 @@ rust-version = { workspace = true }
[dependencies]
anyhow = { workspace = true }
aptos-indexer-grpc-server-framework = { workspace = true }
aptos-indexer-grpc-utils = { workspace = true }
aptos-protos = { workspace = true }
async-trait = { workspace = true }
clap = { workspace = true }
dashmap = { workspace = true }
rand = { workspace = true }
serde = { workspace = true }
tokio = { workspace = true }
tokio-scoped = { workspace = true }
Expand Down
5 changes: 5 additions & 0 deletions ecosystem/indexer-grpc/indexer-grpc-manager/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ use tokio::sync::OnceCell;

static GRPC_MANAGER: OnceCell<GrpcManager> = OnceCell::const_new();

pub(crate) type GrpcAddress = String;

#[derive(Clone, Debug, Deserialize, Serialize)]
pub(crate) struct ServiceConfig {
pub(crate) listen_address: SocketAddr,
Expand All @@ -20,6 +22,9 @@ pub(crate) struct ServiceConfig {
pub struct IndexerGrpcManagerConfig {
pub(crate) chain_id: u64,
pub(crate) service_config: ServiceConfig,
pub(crate) self_advertised_address: GrpcAddress,
pub(crate) grpc_manager_addresses: Vec<GrpcAddress>,
pub(crate) fullnode_addresses: Vec<GrpcAddress>,
}

#[async_trait::async_trait]
Expand Down
35 changes: 30 additions & 5 deletions ecosystem/indexer-grpc/indexer-grpc-manager/src/grpc_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,12 @@

use crate::{
config::{IndexerGrpcManagerConfig, ServiceConfig},
metadata_manager::MetadataManager,
service::GrpcManagerService,
};
use anyhow::Result;
use aptos_protos::indexer::v1::grpc_manager_server::GrpcManagerServer;
use std::time::Duration;
use std::{sync::Arc, time::Duration};
use tonic::{codec::CompressionEncoding, transport::Server};
use tracing::info;

Expand All @@ -16,25 +17,49 @@ const HTTP2_PING_TIMEOUT_DURATION: Duration = Duration::from_secs(10);

pub(crate) struct GrpcManager {
chain_id: u64,
metadata_manager: Arc<MetadataManager>,
}

impl GrpcManager {
pub(crate) async fn new(config: &IndexerGrpcManagerConfig) -> Self {
let chain_id = config.chain_id;

Self { chain_id }
let metadata_manager = Arc::new(MetadataManager::new(
chain_id,
config.self_advertised_address.clone(),
config.grpc_manager_addresses.clone(),
config.fullnode_addresses.clone(),
));

info!(
self_advertised_address = config.self_advertised_address,
"MetadataManager is created, grpc_manager_addresses: {:?}, fullnode_addresses: {:?}.",
config.grpc_manager_addresses,
config.fullnode_addresses
);

Self {
chain_id,
metadata_manager,
}
}

pub(crate) fn start(&self, service_config: &ServiceConfig) -> Result<()> {
let service = GrpcManagerServer::new(GrpcManagerService::new(self.chain_id))
.send_compressed(CompressionEncoding::Zstd)
.accept_compressed(CompressionEncoding::Zstd);
let service = GrpcManagerServer::new(GrpcManagerService::new(
self.chain_id,
self.metadata_manager.clone(),
))
.send_compressed(CompressionEncoding::Zstd)
.accept_compressed(CompressionEncoding::Zstd);
let server = Server::builder()
.http2_keepalive_interval(Some(HTTP2_PING_INTERVAL_DURATION))
.http2_keepalive_timeout(Some(HTTP2_PING_TIMEOUT_DURATION))
.add_service(service);

tokio_scoped::scope(|s| {
s.spawn(async move {
self.metadata_manager.start().await.unwrap();
});
s.spawn(async move {
info!("Starting GrpcManager at {}.", service_config.listen_address);
server.serve(service_config.listen_address).await.unwrap();
Expand Down
1 change: 1 addition & 0 deletions ecosystem/indexer-grpc/indexer-grpc-manager/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

pub mod config;
mod grpc_manager;
mod metadata_manager;
mod service;
#[cfg(test)]
mod test;
Loading

0 comments on commit 66619b8

Please sign in to comment.