Skip to content

Commit

Permalink
[Indexer-Grpc-V2] Add GrpcManagerService.
Browse files Browse the repository at this point in the history
  • Loading branch information
grao1991 committed Jan 14, 2025
1 parent f76ac07 commit 0ff320c
Show file tree
Hide file tree
Showing 6 changed files with 210 additions and 2 deletions.
5 changes: 5 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions ecosystem/indexer-grpc/indexer-grpc-manager/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,15 @@ rust-version = { workspace = true }
[dependencies]
anyhow = { workspace = true }
aptos-indexer-grpc-server-framework = { workspace = true }
aptos-indexer-grpc-utils = { workspace = true }
aptos-protos = { workspace = true }
async-trait = { workspace = true }
clap = { workspace = true }
serde = { workspace = true }
tokio = { workspace = true }
tokio-scoped = { workspace = true }
tonic = { workspace = true }
tracing = { workspace = true }

[target.'cfg(unix)'.dependencies]
jemallocator = { workspace = true }
20 changes: 18 additions & 2 deletions ecosystem/indexer-grpc/indexer-grpc-manager/src/config.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,34 @@
// Copyright © Aptos Foundation
// SPDX-License-Identifier: Apache-2.0

use crate::grpc_manager::GrpcManager;
use anyhow::Result;
use aptos_indexer_grpc_server_framework::RunnableConfig;
use serde::{Deserialize, Serialize};
use std::net::SocketAddr;
use tokio::sync::OnceCell;

static GRPC_MANAGER: OnceCell<GrpcManager> = OnceCell::const_new();

#[derive(Clone, Debug, Deserialize, Serialize)]
pub(crate) struct ServiceConfig {
pub(crate) listen_address: SocketAddr,
}

#[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(deny_unknown_fields)]
pub struct IndexerGrpcManagerConfig {}
pub struct IndexerGrpcManagerConfig {
pub(crate) chain_id: u64,
pub(crate) service_config: ServiceConfig,
}

#[async_trait::async_trait]
impl RunnableConfig for IndexerGrpcManagerConfig {
async fn run(&self) -> Result<()> {
Ok(())
GRPC_MANAGER
.get_or_init(|| async { GrpcManager::new(self).await })
.await
.start(&self.service_config)
}

fn get_server_name(&self) -> String {
Expand Down
71 changes: 71 additions & 0 deletions ecosystem/indexer-grpc/indexer-grpc-manager/src/grpc_manager.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
// Copyright © Aptos Foundation
// SPDX-License-Identifier: Apache-2.0

use crate::{
config::{IndexerGrpcManagerConfig, ServiceConfig},
metadata_manager::MetadataManager,
service::GrpcManagerService,
};
use anyhow::Result;
use aptos_protos::indexer::v1::grpc_manager_server::GrpcManagerServer;
use std::{sync::Arc, time::Duration};
use tonic::{codec::CompressionEncoding, transport::Server};
use tracing::info;

const HTTP2_PING_INTERVAL_DURATION: Duration = Duration::from_secs(60);
const HTTP2_PING_TIMEOUT_DURATION: Duration = Duration::from_secs(10);

pub(crate) struct GrpcManager {
chain_id: u64,
metadata_manager: Arc<MetadataManager>,
}

impl GrpcManager {
pub(crate) async fn new(config: &IndexerGrpcManagerConfig) -> Self {
let chain_id = config.chain_id;

let metadata_manager = Arc::new(MetadataManager::new(
chain_id,
config.self_advertised_address.clone(),
config.grpc_manager_addresses.clone(),
config.fullnode_addresses.clone(),
));

info!(
self_advertised_address = config.self_advertised_address,
"MetadataManager is created, grpc_manager_addresses: {:?}, fullnode_addresses: {:?}.",
config.grpc_manager_addresses,
config.fullnode_addresses
);

Self {
chain_id,
metadata_manager,
}
}

pub(crate) fn start(&self, service_config: &ServiceConfig) -> Result<()> {
let service = GrpcManagerServer::new(GrpcManagerService::new(
self.chain_id,
self.metadata_manager.clone(),
))
.send_compressed(CompressionEncoding::Zstd)
.accept_compressed(CompressionEncoding::Zstd);
let server = Server::builder()
.http2_keepalive_interval(Some(HTTP2_PING_INTERVAL_DURATION))
.http2_keepalive_timeout(Some(HTTP2_PING_TIMEOUT_DURATION))
.add_service(service);

tokio_scoped::scope(|s| {
s.spawn(async move {
self.metadata_manager.start().await.unwrap();
});
s.spawn(async move {
info!("Starting GrpcManager at {}.", service_config.listen_address);
server.serve(service_config.listen_address).await.unwrap();
});
});

Ok(())
}
}
2 changes: 2 additions & 0 deletions ecosystem/indexer-grpc/indexer-grpc-manager/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,5 @@
// SPDX-License-Identifier: Apache-2.0

pub mod config;
mod grpc_manager;
mod service;
109 changes: 109 additions & 0 deletions ecosystem/indexer-grpc/indexer-grpc-manager/src/service.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
// Copyright (c) Aptos Foundation
// SPDX-License-Identifier: Apache-2.0

use aptos_protos::indexer::v1::{
grpc_manager_server::GrpcManager, service_info::Info, GetDataServiceForRequestRequest,
GetDataServiceForRequestResponse, GetTransactionsRequest, HeartbeatRequest, HeartbeatResponse,
TransactionsResponse,
};
use tonic::{Request, Response, Status};

pub struct GrpcManagerService {
chain_id: u64,
}

impl GrpcManagerService {
pub(crate) fn new(chain_id: u64) -> Self {
Self { chain_id }
}

async fn handle_heartbeat(
&self,
_address: String,
_info: Info,
) -> anyhow::Result<Response<HeartbeatResponse>> {
// TODO(grao): Implement.
todo!()
}

fn pick_live_data_service(&self, _starting_version: u64) -> Option<String> {
// TODO(grao): Implement.
todo!()
}

async fn pick_historical_data_service(&self, _starting_version: u64) -> Option<String> {
// TODO(grao): Implement.
todo!()
}
}

#[tonic::async_trait]
impl GrpcManager for GrpcManagerService {
async fn heartbeat(
&self,
request: Request<HeartbeatRequest>,
) -> Result<Response<HeartbeatResponse>, Status> {
let request = request.into_inner();
if let Some(service_info) = request.service_info {
if let Some(address) = service_info.address {
if let Some(info) = service_info.info {
return self
.handle_heartbeat(address, info)
.await
.map_err(|e| Status::internal(&format!("Error handling heartbeat: {e}")));
}
}
}

Err(Status::invalid_argument("Bad request."))
}

async fn get_transactions(
&self,
request: Request<GetTransactionsRequest>,
) -> Result<Response<TransactionsResponse>, Status> {
let _request = request.into_inner();
let transactions = vec![];
// TODO(grao): Implement.

Ok(Response::new(TransactionsResponse {
transactions,
chain_id: Some(self.chain_id),
}))
}

async fn get_data_service_for_request(
&self,
request: Request<GetDataServiceForRequestRequest>,
) -> Result<Response<GetDataServiceForRequestResponse>, Status> {
let request = request.into_inner();

if request.user_request.is_none() {
return Err(Status::invalid_argument("Bad request."));
}

let user_request = request.user_request.unwrap();
if user_request.starting_version.is_none() {
return Err(Status::invalid_argument("Bad request."));
}

let starting_version = user_request.starting_version();

let data_service_address =
// TODO(grao): Use a simple strategy for now. Consider to make it smarter in the
// future.
if let Some(address) = self.pick_live_data_service(starting_version) {
address
} else if let Some(address) = self.pick_historical_data_service(starting_version).await {
address
} else {
return Err(Status::internal(
"Cannot find a data service instance to serve the provided request.",
));
};

Ok(Response::new(GetDataServiceForRequestResponse {
data_service_address,
}))
}
}

0 comments on commit 0ff320c

Please sign in to comment.