From 57c89e237a57b49214eaf902303e3d89c9d82396 Mon Sep 17 00:00:00 2001 From: Gustavo Inacio Date: Tue, 1 Oct 2024 13:11:13 -0300 Subject: [PATCH] feat: add metrics to service (#319) * feat: add metrics to service Signed-off-by: Gustavo Inacio * chore: remove unused code Signed-off-by: Gustavo Inacio * fix: update failed receipt metric name Signed-off-by: Gustavo Inacio * fix: use prometheus to encode metrics Signed-off-by: Gustavo Inacio --------- Signed-off-by: Gustavo Inacio --- Cargo.lock | 1 + common/src/indexer_errors.rs | 353 ------------------ .../indexer_service/http/indexer_service.rs | 53 ++- common/src/indexer_service/http/metrics.rs | 37 -- common/src/indexer_service/http/mod.rs | 1 - .../indexer_service/http/request_handler.rs | 172 ++++++--- .../http/tap_receipt_header.rs | 26 +- common/src/lib.rs | 3 - common/src/metrics/mod.rs | 14 - service/Cargo.toml | 1 + service/src/routes/cost.rs | 89 ++++- service/src/service.rs | 1 - 12 files changed, 269 insertions(+), 482 deletions(-) delete mode 100644 common/src/indexer_errors.rs delete mode 100644 common/src/indexer_service/http/metrics.rs delete mode 100644 common/src/metrics/mod.rs diff --git a/Cargo.lock b/Cargo.lock index be8afc6a..6dc40b73 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5187,6 +5187,7 @@ dependencies = [ "indexer-common", "indexer-config", "lazy_static", + "prometheus", "reqwest", "serde", "serde_json", diff --git a/common/src/indexer_errors.rs b/common/src/indexer_errors.rs deleted file mode 100644 index 58fe7565..00000000 --- a/common/src/indexer_errors.rs +++ /dev/null @@ -1,353 +0,0 @@ -// Copyright 2023-, Edge & Node, GraphOps, and Semiotic Labs. -// SPDX-License-Identifier: Apache-2.0 - -use std::{ - error::Error, - fmt::{self, Display}, -}; - -use tracing::warn; - -use crate::metrics; - -const ERROR_BASE_URL: &str = "https://github.com/graphprotocol/indexer/blob/main/docs/errors.md"; - -#[derive(Debug, Clone)] -pub enum IndexerErrorCode { - IE001, - IE002, - IE003, - IE004, - IE005, - IE006, - IE007, - IE008, - IE009, - IE010, - IE011, - IE012, - IE013, - IE014, - IE015, - IE016, - IE017, - IE018, - IE019, - IE020, - IE021, - IE022, - IE023, - IE024, - IE025, - IE026, - IE027, - IE028, - IE029, - IE030, - IE031, - IE032, - IE033, - IE034, - IE035, - IE036, - IE037, - IE038, - IE039, - IE040, - IE041, - IE042, - IE043, - IE044, - IE045, - IE046, - IE047, - IE048, - IE049, - IE050, - IE051, - IE052, - IE053, - IE054, - IE055, - IE056, - IE057, - IE058, - IE059, - IE060, - IE061, - IE062, - IE063, - IE064, - IE065, - IE066, - IE067, - IE068, - IE069, - IE070, - IE071, - IE072, - IE073, - IE074, - IE075, -} - -impl fmt::Display for IndexerErrorCode { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - match self { - IndexerErrorCode::IE001 => write!(f, "IE001"), - IndexerErrorCode::IE002 => write!(f, "IE002"), - IndexerErrorCode::IE003 => write!(f, "IE003"), - IndexerErrorCode::IE004 => write!(f, "IE004"), - IndexerErrorCode::IE005 => write!(f, "IE005"), - IndexerErrorCode::IE006 => write!(f, "IE006"), - IndexerErrorCode::IE007 => write!(f, "IE007"), - IndexerErrorCode::IE008 => write!(f, "IE008"), - IndexerErrorCode::IE009 => write!(f, "IE009"), - IndexerErrorCode::IE010 => write!(f, "IE010"), - IndexerErrorCode::IE011 => write!(f, "IE011"), - IndexerErrorCode::IE012 => write!(f, "IE012"), - IndexerErrorCode::IE013 => write!(f, "IE013"), - IndexerErrorCode::IE014 => write!(f, "IE014"), - IndexerErrorCode::IE015 => write!(f, "IE015"), - IndexerErrorCode::IE016 => write!(f, "IE016"), - IndexerErrorCode::IE017 => write!(f, "IE017"), - IndexerErrorCode::IE018 => write!(f, "IE018"), - IndexerErrorCode::IE019 => write!(f, "IE019"), - IndexerErrorCode::IE020 => write!(f, "IE020"), - IndexerErrorCode::IE021 => write!(f, "IE021"), - IndexerErrorCode::IE022 => write!(f, "IE022"), - IndexerErrorCode::IE023 => write!(f, "IE023"), - IndexerErrorCode::IE024 => write!(f, "IE024"), - IndexerErrorCode::IE025 => write!(f, "IE025"), - IndexerErrorCode::IE026 => write!(f, "IE026"), - IndexerErrorCode::IE027 => write!(f, "IE027"), - IndexerErrorCode::IE028 => write!(f, "IE028"), - IndexerErrorCode::IE029 => write!(f, "IE029"), - IndexerErrorCode::IE030 => write!(f, "IE030"), - IndexerErrorCode::IE031 => write!(f, "IE031"), - IndexerErrorCode::IE032 => write!(f, "IE032"), - IndexerErrorCode::IE033 => write!(f, "IE033"), - IndexerErrorCode::IE034 => write!(f, "IE034"), - IndexerErrorCode::IE035 => write!(f, "IE035"), - IndexerErrorCode::IE036 => write!(f, "IE036"), - IndexerErrorCode::IE037 => write!(f, "IE037"), - IndexerErrorCode::IE038 => write!(f, "IE038"), - IndexerErrorCode::IE039 => write!(f, "IE039"), - IndexerErrorCode::IE040 => write!(f, "IE040"), - IndexerErrorCode::IE041 => write!(f, "IE041"), - IndexerErrorCode::IE042 => write!(f, "IE042"), - IndexerErrorCode::IE043 => write!(f, "IE043"), - IndexerErrorCode::IE044 => write!(f, "IE044"), - IndexerErrorCode::IE045 => write!(f, "IE045"), - IndexerErrorCode::IE046 => write!(f, "IE046"), - IndexerErrorCode::IE047 => write!(f, "IE047"), - IndexerErrorCode::IE048 => write!(f, "IE048"), - IndexerErrorCode::IE049 => write!(f, "IE049"), - IndexerErrorCode::IE050 => write!(f, "IE050"), - IndexerErrorCode::IE051 => write!(f, "IE051"), - IndexerErrorCode::IE052 => write!(f, "IE052"), - IndexerErrorCode::IE053 => write!(f, "IE053"), - IndexerErrorCode::IE054 => write!(f, "IE054"), - IndexerErrorCode::IE055 => write!(f, "IE055"), - IndexerErrorCode::IE056 => write!(f, "IE056"), - IndexerErrorCode::IE057 => write!(f, "IE057"), - IndexerErrorCode::IE058 => write!(f, "IE058"), - IndexerErrorCode::IE059 => write!(f, "IE059"), - IndexerErrorCode::IE060 => write!(f, "IE060"), - IndexerErrorCode::IE061 => write!(f, "IE061"), - IndexerErrorCode::IE062 => write!(f, "IE062"), - IndexerErrorCode::IE063 => write!(f, "IE063"), - IndexerErrorCode::IE064 => write!(f, "IE064"), - IndexerErrorCode::IE065 => write!(f, "IE065"), - IndexerErrorCode::IE066 => write!(f, "IE066"), - IndexerErrorCode::IE067 => write!(f, "IE067"), - IndexerErrorCode::IE068 => write!(f, "IE068"), - IndexerErrorCode::IE069 => write!(f, "IE069"), - IndexerErrorCode::IE070 => write!(f, "IE070"), - IndexerErrorCode::IE071 => write!(f, "IE071"), - IndexerErrorCode::IE072 => write!(f, "IE072"), - IndexerErrorCode::IE073 => write!(f, "IE073"), - IndexerErrorCode::IE074 => write!(f, "IE074"), - IndexerErrorCode::IE075 => write!(f, "IE075"), - } - } -} - -impl IndexerErrorCode { - pub fn message(&self) -> &'static str { - match self { - Self::IE001 => "Failed to run database migrations", - Self::IE002 => "Invalid Ethereum URL", - Self::IE003 => "Failed to index network subgraph", - Self::IE004 => "Failed to synchronize with network", - Self::IE005 => "Failed to reconcile indexer and network", - Self::IE006 => "Failed to cross-check allocation state with contracts", - Self::IE007 => "Failed to check for network pause", - Self::IE008 => "Failed to check operator status for indexer", - Self::IE009 => "Failed to query subgraph deployments worth indexing", - Self::IE010 => "Failed to query indexer allocations", - Self::IE011 => "Failed to query claimable indexer allocations", - Self::IE012 => "Failed to register indexer", - Self::IE013 => "Failed to allocate: insufficient free stake", - Self::IE014 => "Failed to allocate: allocation not created on chain", - Self::IE015 => "Failed to close allocation", - Self::IE016 => "Failed to claim allocation", - Self::IE017 => "Failed to ensure default global indexing rule", - Self::IE018 => "Failed to query indexing status API", - Self::IE019 => "Failed to query proof of indexing", - Self::IE020 => "Failed to ensure subgraph deployment is indexing", - Self::IE021 => "Failed to migrate cost model", - Self::IE022 => "Failed to identify attestation signer for allocation", - Self::IE023 => "Failed to handle state channel message", - Self::IE024 => "Failed to connect to indexing status API", - Self::IE025 => "Failed to query indexer management API", - Self::IE026 => "Failed to deploy subgraph deployment", - Self::IE027 => "Failed to remove subgraph deployment", - Self::IE028 => "Failed to reassign subgraph deployment", - Self::IE029 => "Invalid Tap-Receipt header provided", - Self::IE030 => "No Tap-Receipt header provided", - Self::IE031 => "Invalid Tap-Receipt value provided", - Self::IE032 => "Failed to process paid query", - Self::IE033 => "Failed to process free query", - Self::IE034 => "Not authorized as an operator for the indexer", - Self::IE035 => "Unhandled promise rejection", - Self::IE036 => "Unhandled exception", - Self::IE037 => "Failed to query disputable allocations", - Self::IE038 => "Failed to query epochs", - Self::IE039 => "Failed to store potential POI disputes", - Self::IE040 => "Failed to fetch POI disputes", - Self::IE041 => "Failed to query transfers to resolve", - Self::IE042 => "Failed to add transfer to the database", - Self::IE043 => "Failed to mark transfer as resolved", - Self::IE044 => "Failed to collect query fees on chain", - Self::IE045 => "Failed to queue transfers for resolving", - Self::IE046 => "Failed to resolve transfer", - Self::IE047 => "Failed to mark transfer as failed", - Self::IE048 => "Failed to withdraw query fees for allocation", - Self::IE049 => "Failed to clean up transfers for allocation", - Self::IE050 => "Transaction reverted due to gas limit being hit", - Self::IE051 => "Transaction reverted for unknown reason", - Self::IE052 => "Transaction aborted: maximum configured gas price reached", - Self::IE053 => "Failed to queue receipts for collecting", - Self::IE054 => "Failed to collect receipts in exchange for query fee voucher", - Self::IE055 => "Failed to redeem query fee voucher", - Self::IE056 => "Failed to remember allocation for collecting receipts later", - Self::IE057 => "Transaction reverted due to failing assertion in contract", - Self::IE058 => "Transaction failed because nonce has already been used", - Self::IE059 => "Failed to check latest operator ETH balance", - Self::IE060 => "Failed to allocate: Already allocating to the subgraph deployment", - Self::IE061 => "Failed to allocate: Invalid allocation amount provided", - Self::IE062 => "Did not receive tx receipt, not authorized or network paused", - Self::IE063 => "No active allocation with provided id found", - Self::IE064 => { - "Failed to unallocate: Allocation cannot be closed in the same epoch it was created" - } - Self::IE065 => "Failed to unallocate: Allocation has already been closed", - Self::IE066 => "Failed to allocate: allocation ID already exists on chain", - Self::IE067 => "Failed to query POI for current epoch start block", - Self::IE068 => "User-provided POI did not match reference POI from graph-node", - Self::IE069 => "Failed to query Epoch Block Oracle Subgraph", - Self::IE070 => "Failed to query latest valid epoch and block hash", - Self::IE071 => "Add Epoch subgraph support for non-protocol chains", - Self::IE072 => "Failed to execute batch tx (contract: staking)", - Self::IE073 => "Failed to query subgraph features from indexing statuses endpoint", - Self::IE074 => "Failed to resolve the release version", - Self::IE075 => "Failed to parse response body to query string", - } - } - - pub fn explanation(&self) -> String { - format!("{}#{}", ERROR_BASE_URL, self.message()) - } -} - -#[derive(Debug)] -pub struct IndexerErrorCause(Box); - -impl IndexerErrorCause { - pub fn new(error: E) -> Self - where - E: Into>, - { - IndexerErrorCause(error.into()) - } -} - -impl Display for IndexerErrorCause { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - self.0.fmt(f) - } -} - -impl Error for IndexerErrorCause { - fn source(&self) -> Option<&(dyn Error + 'static)> { - self.0.source() - } -} - -impl From for IndexerErrorCause { - fn from(error: String) -> Self { - Self(Box::new(std::io::Error::new( - std::io::ErrorKind::Other, - error, - ))) - } -} - -#[derive(Debug)] -pub struct IndexerError { - code: IndexerErrorCode, - explanation: String, - cause: Option, -} - -impl IndexerError { - // Create Indexer Error and automatically increment counter by the error code - pub fn new(code: IndexerErrorCode, cause: Option) -> Self { - metrics::INDEXER_ERROR - .with_label_values(&[&code.to_string()]) - .inc(); - let explanation = code.message(); - warn!( - "Encountered error {}: {}. Cause: {:#?}", - code.to_string(), - explanation, - cause - ); - Self { - code, - explanation: explanation.to_string(), - cause, - } - } - - pub fn code(&self) -> IndexerErrorCode { - self.code.clone() - } - - pub fn explanation(&self) -> &str { - &self.explanation - } - - pub fn cause(&self) -> Option<&IndexerErrorCause> { - self.cause.as_ref() - } -} - -pub fn indexer_error(code: IndexerErrorCode) -> IndexerError { - IndexerError::new(code.clone(), Some(code.explanation().into())) -} - -impl std::fmt::Display for IndexerError { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!( - f, - "Indexer error: {}, explanation: {}", - self.code, self.explanation - )?; - if let Some(cause) = &self.cause { - write!(f, ", cause: {:?}", cause)?; - } - Ok(()) - } -} diff --git a/common/src/indexer_service/http/indexer_service.rs b/common/src/indexer_service/http/indexer_service.rs index 09f3e03a..86c75654 100644 --- a/common/src/indexer_service/http/indexer_service.rs +++ b/common/src/indexer_service/http/indexer_service.rs @@ -6,9 +6,9 @@ use std::{ time::Duration, }; +use alloy::dyn_abi::Eip712Domain; use alloy::sol_types::eip712_domain; use anyhow; -use autometrics::prometheus_exporter; use axum::extract::MatchedPath; use axum::extract::Request as ExtractRequest; use axum::http::{Method, Request}; @@ -21,6 +21,7 @@ use axum::{ use axum::{serve, ServiceExt}; use build_info::BuildInfo; use eventuals::Eventual; +use prometheus::TextEncoder; use reqwest::StatusCode; use serde::{de::DeserializeOwned, Serialize}; use sqlx::postgres::PgPoolOptions; @@ -31,13 +32,14 @@ use tokio::net::TcpListener; use tokio::signal; use tower_governor::{governor::GovernorConfigBuilder, GovernorLayer}; use tower_http::{cors, cors::CorsLayer, normalize_path::NormalizePath, trace::TraceLayer}; +use tracing::error; use tracing::{info, info_span}; +use crate::escrow_accounts::EscrowAccounts; +use crate::escrow_accounts::EscrowAccountsError; use crate::{ address::public_key, - indexer_service::http::{ - metrics::IndexerServiceMetrics, static_subgraph::static_subgraph_request_handler, - }, + indexer_service::http::static_subgraph::static_subgraph_request_handler, prelude::{ attestation_signers, dispute_manager, escrow_accounts, indexer_allocations, AttestationSigner, DeploymentDetails, SubgraphClient, @@ -98,6 +100,12 @@ where FailedToSignAttestation, #[error("Failed to query subgraph: {0}")] FailedToQueryStaticSubgraph(anyhow::Error), + + #[error("Could not decode signer: {0}")] + CouldNotDecodeSigner(tap_core::Error), + + #[error("There was an error while accessing escrow account: {0}")] + EscrowAccount(EscrowAccountsError), } impl IntoResponse for IndexerServiceError @@ -122,6 +130,8 @@ where ReceiptError(_) | InvalidRequest(_) | InvalidFreeQueryAuthToken + | CouldNotDecodeSigner(_) + | EscrowAccount(_) | ProcessingError(_) => StatusCode::BAD_REQUEST, FailedToQueryStaticSubgraph(_) => StatusCode::INTERNAL_SERVER_ERROR, @@ -166,7 +176,6 @@ where pub config: IndexerServiceConfig, pub release: IndexerServiceRelease, pub url_namespace: &'static str, - pub metrics_prefix: &'static str, pub extra_routes: Router>>, } @@ -178,7 +187,10 @@ where pub attestation_signers: Eventual>, pub tap_manager: Manager, pub service_impl: Arc, - pub metrics: IndexerServiceMetrics, + + // tap + pub escrow_accounts: Eventual, + pub domain_separator: Eip712Domain, } pub struct IndexerService {} @@ -188,8 +200,6 @@ impl IndexerService { where I: IndexerServiceImpl + Sync + Send + 'static, { - let metrics = IndexerServiceMetrics::new(options.metrics_prefix); - let http_client = reqwest::Client::builder() .tcp_nodelay(true) .timeout(Duration::from_secs(30)) @@ -299,21 +309,26 @@ impl IndexerService { let checks = IndexerTapContext::get_checks( database, allocations, - escrow_accounts, + escrow_accounts.clone(), domain_separator.clone(), timestamp_error_tolerance, receipt_max_value, ) .await; - let tap_manager = Manager::new(domain_separator, indexer_context, CheckList::new(checks)); + let tap_manager = Manager::new( + domain_separator.clone(), + indexer_context, + CheckList::new(checks), + ); let state = Arc::new(IndexerServiceState { config: options.config.clone(), attestation_signers, tap_manager, service_impl: Arc::new(options.service_impl), - metrics, + escrow_accounts, + domain_separator, }); // Rate limits by allowing bursts of 10 requests and requiring 100ms of @@ -451,7 +466,21 @@ impl IndexerService { tokio::spawn(async move { let router = Router::new().route( "/metrics", - get(|| async { prometheus_exporter::encode_http_response() }), + get(|| async { + let metric_families = prometheus::gather(); + let encoder = TextEncoder::new(); + + match encoder.encode_to_string(&metric_families) { + Ok(s) => (StatusCode::OK, s), + Err(e) => { + error!("Error encoding metrics: {}", e); + ( + StatusCode::INTERNAL_SERVER_ERROR, + format!("Error encoding metrics: {}", e), + ) + } + } + }), ); serve( diff --git a/common/src/indexer_service/http/metrics.rs b/common/src/indexer_service/http/metrics.rs deleted file mode 100644 index e37b2367..00000000 --- a/common/src/indexer_service/http/metrics.rs +++ /dev/null @@ -1,37 +0,0 @@ -// Copyright 2023-, Edge & Node, GraphOps, and Semiotic Labs. -// SPDX-License-Identifier: Apache-2.0 - -use prometheus::{register_int_counter_vec, IntCounterVec}; - -pub struct IndexerServiceMetrics { - pub requests: IntCounterVec, - pub successful_requests: IntCounterVec, - pub failed_requests: IntCounterVec, -} - -impl IndexerServiceMetrics { - pub fn new(prefix: &str) -> Self { - IndexerServiceMetrics { - requests: register_int_counter_vec!( - format!("{prefix}_service_requests_total"), - "Incoming requests", - &["manifest"] - ) - .unwrap(), - - successful_requests: register_int_counter_vec!( - format!("{prefix}_service_requests_ok"), - "Successfully executed requests", - &["manifest"] - ) - .unwrap(), - - failed_requests: register_int_counter_vec!( - format!("{prefix}_service_requests_failed"), - "requests that failed to execute", - &["manifest"] - ) - .unwrap(), - } - } -} diff --git a/common/src/indexer_service/http/mod.rs b/common/src/indexer_service/http/mod.rs index 44c341f3..20f4df95 100644 --- a/common/src/indexer_service/http/mod.rs +++ b/common/src/indexer_service/http/mod.rs @@ -3,7 +3,6 @@ mod config; mod indexer_service; -mod metrics; mod request_handler; mod static_subgraph; mod tap_receipt_header; diff --git a/common/src/indexer_service/http/request_handler.rs b/common/src/indexer_service/http/request_handler.rs index f3ec6a98..c9cb8e02 100644 --- a/common/src/indexer_service/http/request_handler.rs +++ b/common/src/indexer_service/http/request_handler.rs @@ -10,6 +10,8 @@ use axum::{ response::IntoResponse, }; use axum_extra::TypedHeader; +use lazy_static::lazy_static; +use prometheus::{register_counter_vec, register_histogram_vec, CounterVec, HistogramVec}; use reqwest::StatusCode; use thegraph_core::DeploymentId; use tracing::trace; @@ -22,9 +24,49 @@ use super::{ IndexerServiceImpl, }; -#[autometrics::autometrics] +lazy_static! { + /// Register indexer error metrics in Prometheus registry + pub static ref HANDLER_HISTOGRAM: HistogramVec = register_histogram_vec!( + "indexer_query_handler_seconds", + "Histogram for default indexer query handler", + &["deployment", "allocation", "sender"] + ).unwrap(); + + pub static ref HANDLER_FAILURE: CounterVec = register_counter_vec!( + "indexer_query_handler_failed_total", + "Failed queries to handler", + &["deployment"] + ).unwrap(); + + pub static ref FAILED_RECEIPT: CounterVec = register_counter_vec!( + "indexer_receipt_failed_total", + "Failed receipt checks", + &["deployment", "allocation", "sender"] + ).unwrap(); + +} + pub async fn request_handler( Path(manifest_id): Path, + typed_header: TypedHeader, + state: State>>, + headers: HeaderMap, + body: Bytes, +) -> Result> +where + I: IndexerServiceImpl + Sync + Send + 'static, +{ + _request_handler(manifest_id, typed_header, state, headers, body) + .await + .inspect_err(|_| { + HANDLER_FAILURE + .with_label_values(&[&manifest_id.to_string()]) + .inc() + }) +} + +async fn _request_handler( + manifest_id: DeploymentId, TypedHeader(receipt): TypedHeader, State(state): State>>, headers: HeaderMap, @@ -35,39 +77,11 @@ where { trace!("Handling request for deployment `{manifest_id}`"); - state - .metrics - .requests - .with_label_values(&[&manifest_id.to_string()]) - .inc(); - let request = serde_json::from_slice(&body).map_err(|e| IndexerServiceError::InvalidRequest(e.into()))?; - let attestation_signer = if let Some(receipt) = receipt.into_signed_receipt() { - let allocation_id = receipt.message.allocation_id; - - // Verify the receipt and store it in the database - // TODO update checks - state - .tap_manager - .verify_and_store_receipt(receipt) - .await - .map_err(IndexerServiceError::ReceiptError)?; - - // Check if we have an attestation signer for the allocation the receipt was created for - let signers = state - .attestation_signers - .value_immediate() - .ok_or_else(|| IndexerServiceError::ServiceNotReady)?; - - Some( - signers - .get(&allocation_id) - .cloned() - .ok_or_else(|| (IndexerServiceError::NoSignerForAllocation(allocation_id)))?, - ) - } else { + let Some(receipt) = receipt.into_signed_receipt() else { + // Serve free query, NO METRICS match headers .get("authorization") .and_then(|v| v.to_str().ok()) @@ -81,30 +95,94 @@ where } } } - None + + trace!(?manifest_id, "New free query"); + + let response = state + .service_impl + .process_request(manifest_id, request) + .await + .map_err(IndexerServiceError::ProcessingError)? + .1 + .finalize(AttestationOutput::Attestable); + return Ok((StatusCode::OK, response)); }; + let allocation_id = receipt.message.allocation_id; + + // recover the signer address + // get escrow accounts from eventual + // return sender from signer + // + // TODO: We are currently doing this process twice. + // One here and other on common/src/tap/checks/sender_balance_check.rs + // We'll get back to normal once we have attachable context to `verify_and_store_receipt` + let signer = receipt + .recover_signer(&state.domain_separator) + .map_err(IndexerServiceError::CouldNotDecodeSigner)?; + + let escrow_accounts = state + .escrow_accounts + .value_immediate() + .ok_or(IndexerServiceError::ServiceNotReady)?; + + let sender = escrow_accounts + .get_sender_for_signer(&signer) + .map_err(IndexerServiceError::EscrowAccount)?; + + let _metric = HANDLER_HISTOGRAM + .with_label_values(&[ + &manifest_id.to_string(), + &allocation_id.to_string(), + &sender.to_string(), + ]) + .start_timer(); + + // Verify the receipt and store it in the database + state + .tap_manager + .verify_and_store_receipt(receipt) + .await + .inspect_err(|_| { + FAILED_RECEIPT + .with_label_values(&[ + &manifest_id.to_string(), + &allocation_id.to_string(), + &sender.to_string(), + ]) + .inc() + }) + .map_err(IndexerServiceError::ReceiptError)?; + + // Check if we have an attestation signer for the allocation the receipt was created for + let signers = state + .attestation_signers + .value_immediate() + .ok_or_else(|| IndexerServiceError::ServiceNotReady)?; + + let signer = signers + .get(&allocation_id) + .cloned() + .ok_or_else(|| (IndexerServiceError::NoSignerForAllocation(allocation_id)))?; + let (request, response) = state .service_impl .process_request(manifest_id, request) .await .map_err(IndexerServiceError::ProcessingError)?; - let attestation = match attestation_signer { - Some(signer) => { - let req = serde_json::to_string(&request) - .map_err(|_| IndexerServiceError::FailedToSignAttestation)?; - let res = response - .as_str() - .map_err(|_| IndexerServiceError::FailedToSignAttestation)?; - AttestationOutput::Attestation( - response - .is_attestable() - .then(|| signer.create_attestation(&req, res)), - ) - } - None => AttestationOutput::Attestable, - }; + let req = serde_json::to_string(&request) + .map_err(|_| IndexerServiceError::FailedToSignAttestation)?; + + let res = response + .as_str() + .map_err(|_| IndexerServiceError::FailedToSignAttestation)?; + + let attestation = AttestationOutput::Attestation( + response + .is_attestable() + .then(|| signer.create_attestation(&req, res)), + ); let response = response.finalize(attestation); diff --git a/common/src/indexer_service/http/tap_receipt_header.rs b/common/src/indexer_service/http/tap_receipt_header.rs index a150846d..11e2cf8c 100644 --- a/common/src/indexer_service/http/tap_receipt_header.rs +++ b/common/src/indexer_service/http/tap_receipt_header.rs @@ -5,6 +5,7 @@ use std::ops::Deref; use axum_extra::headers::{self, Header, HeaderName, HeaderValue}; use lazy_static::lazy_static; +use prometheus::{register_counter, Counter}; use tap_core::receipt::SignedReceipt; #[derive(Debug, PartialEq)] @@ -26,6 +27,8 @@ impl Deref for TapReceipt { lazy_static! { static ref TAP_RECEIPT: HeaderName = HeaderName::from_static("tap-receipt"); + pub static ref TAP_RECEIPT_INVALID: Counter = + register_counter!("indexer_tap_invalid_total", "Invalid tap receipt decode",).unwrap(); } impl Header for TapReceipt { @@ -37,16 +40,19 @@ impl Header for TapReceipt { where I: Iterator, { - let value = values.next(); - let raw_receipt = value - .map(|value| value.to_str()) - .transpose() - .map_err(|_| headers::Error::invalid())?; - let parsed_receipt = raw_receipt - .map(serde_json::from_str) - .transpose() - .map_err(|_| headers::Error::invalid())?; - Ok(TapReceipt(parsed_receipt)) + let mut execute = || { + let value = values.next(); + let raw_receipt = value + .map(|value| value.to_str()) + .transpose() + .map_err(|_| headers::Error::invalid())?; + let parsed_receipt = raw_receipt + .map(serde_json::from_str) + .transpose() + .map_err(|_| headers::Error::invalid())?; + Ok(TapReceipt(parsed_receipt)) + }; + execute().inspect_err(|_| TAP_RECEIPT_INVALID.inc()) } fn encode(&self, _values: &mut E) diff --git a/common/src/lib.rs b/common/src/lib.rs index e31de7d3..63470d75 100644 --- a/common/src/lib.rs +++ b/common/src/lib.rs @@ -6,9 +6,7 @@ pub mod allocations; pub mod attestations; pub mod escrow_accounts; pub mod graphql; -pub mod indexer_errors; pub mod indexer_service; -pub mod metrics; pub mod subgraph_client; pub mod tap; @@ -23,7 +21,6 @@ pub mod prelude { dispute_manager::dispute_manager, signer::AttestationSigner, signers::attestation_signers, }; pub use super::escrow_accounts::escrow_accounts; - pub use super::indexer_errors; pub use super::subgraph_client::{DeploymentDetails, Query, QueryVariables, SubgraphClient}; pub use super::tap::IndexerTapContext; } diff --git a/common/src/metrics/mod.rs b/common/src/metrics/mod.rs deleted file mode 100644 index 9f13cd39..00000000 --- a/common/src/metrics/mod.rs +++ /dev/null @@ -1,14 +0,0 @@ -// Copyright 2023-, Edge & Node, GraphOps, and Semiotic Labs. -// SPDX-License-Identifier: Apache-2.0 - -use lazy_static::lazy_static; -use prometheus::{register_int_counter_vec, IntCounterVec}; - -lazy_static! { - /// Register indexer error metrics in Prometheus registry - pub static ref INDEXER_ERROR: IntCounterVec = register_int_counter_vec!( - "indexer_error", - "Indexer errors observed over time", - &["code"] - ).expect("Create indexer_error metrics"); -} diff --git a/service/Cargo.toml b/service/Cargo.toml index 610d3843..df088af0 100644 --- a/service/Cargo.toml +++ b/service/Cargo.toml @@ -10,6 +10,7 @@ license = "Apache-2.0" indexer-common = { path = "../common" } indexer-config = { path = "../config" } anyhow = { workspace = true } +prometheus = { workspace = true } reqwest = { workspace = true } tokio = { workspace = true, features = ["rt-multi-thread"] } tracing.workspace = true diff --git a/service/src/routes/cost.rs b/service/src/routes/cost.rs index b16f9dce..08d4eb2c 100644 --- a/service/src/routes/cost.rs +++ b/service/src/routes/cost.rs @@ -7,6 +7,11 @@ use std::sync::Arc; use async_graphql::{Context, EmptyMutation, EmptySubscription, Object, Schema, SimpleObject}; use async_graphql_axum::{GraphQLRequest, GraphQLResponse}; use axum::extract::State; +use lazy_static::lazy_static; +use prometheus::{ + register_counter, register_counter_vec, register_histogram, register_histogram_vec, Counter, + CounterVec, Histogram, HistogramVec, +}; use serde::{Deserialize, Serialize}; use serde_json::Value; use thegraph_core::DeploymentId; @@ -14,6 +19,46 @@ use thegraph_core::DeploymentId; use crate::database::{self, CostModel}; use crate::service::SubgraphServiceState; +lazy_static! { + pub static ref COST_MODEL_METRIC: HistogramVec = register_histogram_vec!( + "indexer_cost_model_seconds", + "Histogram metric for single cost model query", + &["deployment"] + ) + .unwrap(); + pub static ref COST_MODEL_FAILED: CounterVec = register_counter_vec!( + "indexer_cost_model_failed_total", + "Total failed Cost Model query", + &["deployment"] + ) + .unwrap(); + pub static ref COST_MODEL_INVALID: Counter = register_counter!( + "indexer_cost_model_invalid_total", + "Cost model queries with invalid deployment id", + ) + .unwrap(); + pub static ref COST_MODEL_BATCH_METRIC: Histogram = register_histogram!( + "indexer_cost_model_batch_seconds", + "Histogram metric for batch cost model query", + ) + .unwrap(); + pub static ref COST_MODEL_BATCH_SIZE: Histogram = register_histogram!( + "indexer_cost_model_batch_size", + "This shows the size of deployment ids cost model batch queries got", + ) + .unwrap(); + pub static ref COST_MODEL_BATCH_FAILED: Counter = register_counter!( + "indexer_cost_model_batch_failed_total", + "Total failed batch cost model queries", + ) + .unwrap(); + pub static ref COST_MODEL_BATCH_INVALID: Counter = register_counter!( + "indexer_cost_model_batch_invalid_total", + "Batch cost model queries with invalid deployment ids", + ) + .unwrap(); +} + #[derive(Clone, Debug, Serialize, Deserialize, SimpleObject)] pub struct GraphQlCostModel { pub deployment: String, @@ -36,6 +81,25 @@ pub struct Query; #[Object] impl Query { + async fn cost_model( + &self, + ctx: &Context<'_>, + deployment: String, + ) -> Result, anyhow::Error> { + let deployment_id = + DeploymentId::from_str(&deployment).inspect_err(|_| COST_MODEL_INVALID.inc())?; + + let metric_deployment = deployment.clone(); + let _timer = COST_MODEL_METRIC + .with_label_values(&[&metric_deployment]) + .start_timer(); + self._cost_model(ctx, deployment_id).await.inspect_err(|_| { + COST_MODEL_FAILED + .with_label_values(&[&metric_deployment]) + .inc() + }) + } + async fn cost_models( &self, ctx: &Context<'_>, @@ -44,18 +108,35 @@ impl Query { let deployment_ids = deployments .into_iter() .map(|s| DeploymentId::from_str(&s)) - .collect::, _>>()?; + .collect::, _>>() + .inspect_err(|_| COST_MODEL_BATCH_INVALID.inc())?; + + let _metric = COST_MODEL_BATCH_METRIC.start_timer(); + + COST_MODEL_BATCH_SIZE.observe(deployment_ids.len() as f64); + + self._cost_models(ctx, deployment_ids) + .await + .inspect_err(|_| COST_MODEL_BATCH_FAILED.inc()) + } +} + +impl Query { + async fn _cost_models( + &self, + ctx: &Context<'_>, + deployment_ids: Vec, + ) -> Result, anyhow::Error> { let pool = &ctx.data_unchecked::>().database; let cost_models = database::cost_models(pool, &deployment_ids).await?; Ok(cost_models.into_iter().map(|m| m.into()).collect()) } - async fn cost_model( + async fn _cost_model( &self, ctx: &Context<'_>, - deployment: String, + deployment_id: DeploymentId, ) -> Result, anyhow::Error> { - let deployment_id = DeploymentId::from_str(&deployment)?; let pool = &ctx.data_unchecked::>().database; database::cost_model(pool, &deployment_id) .await diff --git a/service/src/service.rs b/service/src/service.rs index f75da115..1203f85c 100644 --- a/service/src/service.rs +++ b/service/src/service.rs @@ -177,7 +177,6 @@ pub async fn run() -> anyhow::Result<()> { release, config: config.0.clone(), url_namespace: "subgraphs", - metrics_prefix: "subgraph", service_impl: SubgraphService::new(state.clone()), extra_routes: Router::new() .route("/cost", post(routes::cost::cost))