Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: add monitor service back #227

Open
wants to merge 1 commit into
base: das-127-publish-ops-cli-as-separate-pr
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions grpc-ingest/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ chrono = { workspace = true }
clap = { workspace = true, features = ["cargo", "derive"] }
thiserror = { workspace = true }
das-core = { workspace = true }
das-bubblegum = { workspace = true }
digital_asset_types = { workspace = true }
futures = { workspace = true }
hex = { workspace = true }
Expand Down
32 changes: 32 additions & 0 deletions grpc-ingest/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -384,3 +384,35 @@ impl ConfigIngesterDownloadMetadata {
1
}
}

#[derive(Debug, Clone, Deserialize)]
pub struct ConfigMonitor {
pub postgres: ConfigPostgres,
pub rpc: String,
pub bubblegum: ConfigBubblegumVerify,
}

#[derive(Debug, Clone, Deserialize)]
pub struct ConfigBubblegumVerify {
#[serde(
default = "ConfigBubblegumVerify::default_report_interval",
deserialize_with = "deserialize_duration_str"
)]
pub _report_interval: Duration,
#[serde(default)]
pub only_trees: Option<Vec<String>>,
#[serde(
default = "ConfigBubblegumVerify::default_max_concurrency",
deserialize_with = "deserialize_usize_str"
)]
pub max_concurrency: usize,
}

impl ConfigBubblegumVerify {
pub const fn default_report_interval() -> Duration {
Duration::from_millis(5 * 60 * 1000)
}
pub const fn default_max_concurrency() -> usize {
20
}
}
13 changes: 12 additions & 1 deletion grpc-ingest/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,14 @@ use {
},
anyhow::Context,
clap::{Parser, Subcommand},
config::ConfigMonitor,
std::net::SocketAddr,
};

mod config;
mod grpc;
mod ingester;
mod monitor;
mod postgres;
mod prom;
mod redis;
Expand Down Expand Up @@ -42,6 +44,9 @@ enum ArgsAction {
/// Run ingester process (process events from Redis)
#[command(name = "ingester")]
Ingester,
/// Run monitor process (verify bubblegum proofs)
#[command(name = "monitor")]
Monitor,
}

#[tokio::main]
Expand All @@ -58,7 +63,7 @@ async fn main() -> anyhow::Result<()> {
prometheus_run_server(address)?;
}

// Run grpc / ingester / download-metadata
// Run grpc / ingester / download-metadata / monitor
match args.action {
ArgsAction::Grpc => {
let config = config_load::<ConfigGrpc>(&args.config)
Expand All @@ -72,5 +77,11 @@ async fn main() -> anyhow::Result<()> {
.with_context(|| format!("failed to parse config from: {}", args.config))?;
ingester::run(config).await
}
ArgsAction::Monitor => {
let config = config_load::<ConfigMonitor>(&args.config)
.await
.with_context(|| format!("failed to parse config from: {}", args.config))?;
monitor::run(config).await
}
}
}
50 changes: 50 additions & 0 deletions grpc-ingest/src/monitor.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
use crate::postgres::create_pool;
use crate::util::create_shutdown;
use crate::{config::ConfigMonitor, prom::update_tree_proof_report};
use das_bubblegum::{verify_bubblegum, BubblegumContext, VerifyArgs};
use das_core::{Rpc, SolanaRpcArgs};
use futures::stream::StreamExt;
use tracing::{error, info};

pub async fn run(config: ConfigMonitor) -> anyhow::Result<()> {
let mut shutdown = create_shutdown()?;
let database_pool = create_pool(config.postgres).await?;
let rpc = Rpc::from_config(&SolanaRpcArgs {
solana_rpc_url: config.rpc,
});

let bubblegum_verify = tokio::spawn(async move {
loop {
let bubblegum_context = BubblegumContext::new(database_pool.clone(), rpc.clone());
let verify_args = VerifyArgs {
only_trees: config.bubblegum.only_trees.clone(),
max_concurrency: config.bubblegum.max_concurrency,
};

match verify_bubblegum(bubblegum_context, verify_args).await {
Ok(mut reports_receiver) => {
while let Some(report) = reports_receiver.recv().await {
info!(
report = ?report,
);
update_tree_proof_report(&report);
}

tokio::time::sleep(tokio::time::Duration::from_secs(600)).await;
}
Err(e) => {
error!(
message = "Error proof report recv",
error = ?e
);
}
}
}
});

if let Some(_signal) = shutdown.next().await {}

bubblegum_verify.abort();

Ok(())
}
23 changes: 23 additions & 0 deletions grpc-ingest/src/prom.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use {
crate::{redis::RedisStreamMessageError, version::VERSION as VERSION_INFO},
das_bubblegum::ProofReport,
das_core::MetadataJsonTaskError,
hyper::{
server::conn::AddrStream,
Expand Down Expand Up @@ -391,3 +392,25 @@ pub fn program_transformer_task_status_inc(
.with_label_values(&[stream, consumer, kind.to_str()])
.inc()
}

pub fn update_tree_proof_report(report: &ProofReport) {
BUBBLEGUM_TREE_TOTAL_LEAVES
.with_label_values(&[&report.tree_pubkey.to_string()])
.set(report.total_leaves as i64);

BUBBLEGUM_TREE_INCORRECT_PROOFS
.with_label_values(&[&report.tree_pubkey.to_string()])
.set(report.incorrect_proofs as i64);

BUBBLEGUM_TREE_NOT_FOUND_PROOFS
.with_label_values(&[&report.tree_pubkey.to_string()])
.set(report.not_found_proofs as i64);

BUBBLEGUM_TREE_CORRECT_PROOFS
.with_label_values(&[&report.tree_pubkey.to_string()])
.set(report.correct_proofs as i64);

BUBBLEGUM_TREE_CORRUPT_PROOFS
.with_label_values(&[&report.tree_pubkey.to_string()])
.set(report.corrupt_proofs as i64);
}
Loading