Skip to content

Commit

Permalink
chore: add monitor service back
Browse files Browse the repository at this point in the history
Co-authored-by: Nagaprasadvr <[email protected]>
  • Loading branch information
kespinola authored and Nagaprasadvr committed Feb 17, 2025
1 parent 5591f1d commit f143b0e
Show file tree
Hide file tree
Showing 6 changed files with 119 additions and 1 deletion.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions grpc-ingest/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ chrono = { workspace = true }
clap = { workspace = true, features = ["cargo", "derive"] }
thiserror = { workspace = true }
das-core = { workspace = true }
das-bubblegum = { workspace = true }
digital_asset_types = { workspace = true }
futures = { workspace = true }
hex = { workspace = true }
Expand Down
32 changes: 32 additions & 0 deletions grpc-ingest/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -384,3 +384,35 @@ impl ConfigIngesterDownloadMetadata {
1
}
}

#[derive(Debug, Clone, Deserialize)]
pub struct ConfigMonitor {
pub postgres: ConfigPostgres,
pub rpc: String,
pub bubblegum: ConfigBubblegumVerify,
}

#[derive(Debug, Clone, Deserialize)]
pub struct ConfigBubblegumVerify {
#[serde(
default = "ConfigBubblegumVerify::default_report_interval",
deserialize_with = "deserialize_duration_str"
)]
pub _report_interval: Duration,
#[serde(default)]
pub only_trees: Option<Vec<String>>,
#[serde(
default = "ConfigBubblegumVerify::default_max_concurrency",
deserialize_with = "deserialize_usize_str"
)]
pub max_concurrency: usize,
}

impl ConfigBubblegumVerify {
pub const fn default_report_interval() -> Duration {
Duration::from_millis(5 * 60 * 1000)
}
pub const fn default_max_concurrency() -> usize {
20
}
}
13 changes: 12 additions & 1 deletion grpc-ingest/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,14 @@ use {
},
anyhow::Context,
clap::{Parser, Subcommand},
config::ConfigMonitor,
std::net::SocketAddr,
};

mod config;
mod grpc;
mod ingester;
mod monitor;
mod postgres;
mod prom;
mod redis;
Expand Down Expand Up @@ -42,6 +44,9 @@ enum ArgsAction {
/// Run ingester process (process events from Redis)
#[command(name = "ingester")]
Ingester,
/// Run monitor process (verify bubblegum proofs)
#[command(name = "monitor")]
Monitor,
}

#[tokio::main]
Expand All @@ -58,7 +63,7 @@ async fn main() -> anyhow::Result<()> {
prometheus_run_server(address)?;
}

// Run grpc / ingester / download-metadata
// Run grpc / ingester / download-metadata / monitor
match args.action {
ArgsAction::Grpc => {
let config = config_load::<ConfigGrpc>(&args.config)
Expand All @@ -72,5 +77,11 @@ async fn main() -> anyhow::Result<()> {
.with_context(|| format!("failed to parse config from: {}", args.config))?;
ingester::run(config).await
}
ArgsAction::Monitor => {
let config = config_load::<ConfigMonitor>(&args.config)
.await
.with_context(|| format!("failed to parse config from: {}", args.config))?;
monitor::run(config).await
}
}
}
50 changes: 50 additions & 0 deletions grpc-ingest/src/monitor.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
use crate::postgres::create_pool;
use crate::util::create_shutdown;
use crate::{config::ConfigMonitor, prom::update_tree_proof_report};
use das_bubblegum::{verify_bubblegum, BubblegumContext, VerifyArgs};
use das_core::{Rpc, SolanaRpcArgs};
use futures::stream::StreamExt;
use tracing::{error, info};

pub async fn run(config: ConfigMonitor) -> anyhow::Result<()> {
let mut shutdown = create_shutdown()?;
let database_pool = create_pool(config.postgres).await?;
let rpc = Rpc::from_config(&SolanaRpcArgs {
solana_rpc_url: config.rpc,
});

let bubblegum_verify = tokio::spawn(async move {
loop {
let bubblegum_context = BubblegumContext::new(database_pool.clone(), rpc.clone());
let verify_args = VerifyArgs {
only_trees: config.bubblegum.only_trees.clone(),
max_concurrency: config.bubblegum.max_concurrency,
};

match verify_bubblegum(bubblegum_context, verify_args).await {
Ok(mut reports_receiver) => {
while let Some(report) = reports_receiver.recv().await {
info!(
report = ?report,
);
update_tree_proof_report(&report);
}

tokio::time::sleep(tokio::time::Duration::from_secs(600)).await;
}
Err(e) => {
error!(
message = "Error proof report recv",
error = ?e
);
}
}
}
});

if let Some(_signal) = shutdown.next().await {}

bubblegum_verify.abort();

Ok(())
}
23 changes: 23 additions & 0 deletions grpc-ingest/src/prom.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use {
crate::{redis::RedisStreamMessageError, version::VERSION as VERSION_INFO},
das_bubblegum::ProofReport,
das_core::MetadataJsonTaskError,
hyper::{
server::conn::AddrStream,
Expand Down Expand Up @@ -391,3 +392,25 @@ pub fn program_transformer_task_status_inc(
.with_label_values(&[stream, consumer, kind.to_str()])
.inc()
}

pub fn update_tree_proof_report(report: &ProofReport) {
BUBBLEGUM_TREE_TOTAL_LEAVES
.with_label_values(&[&report.tree_pubkey.to_string()])
.set(report.total_leaves as i64);

BUBBLEGUM_TREE_INCORRECT_PROOFS
.with_label_values(&[&report.tree_pubkey.to_string()])
.set(report.incorrect_proofs as i64);

BUBBLEGUM_TREE_NOT_FOUND_PROOFS
.with_label_values(&[&report.tree_pubkey.to_string()])
.set(report.not_found_proofs as i64);

BUBBLEGUM_TREE_CORRECT_PROOFS
.with_label_values(&[&report.tree_pubkey.to_string()])
.set(report.correct_proofs as i64);

BUBBLEGUM_TREE_CORRUPT_PROOFS
.with_label_values(&[&report.tree_pubkey.to_string()])
.set(report.corrupt_proofs as i64);
}

0 comments on commit f143b0e

Please sign in to comment.