diff --git a/crates/rbuilder/src/building/block_orders/prioritized_order_store.rs b/crates/rbuilder/src/building/block_orders/prioritized_order_store.rs index ef7123f9..c0588d75 100644 --- a/crates/rbuilder/src/building/block_orders/prioritized_order_store.rs +++ b/crates/rbuilder/src/building/block_orders/prioritized_order_store.rs @@ -7,6 +7,7 @@ use priority_queue::PriorityQueue; use crate::{ building::Sorting, primitives::{AccountNonce, Nonce, OrderId, SimulatedOrder}, + telemetry::mark_order_not_ready_for_immediate_inclusion, }; use super::SimulatedOrderSink; @@ -125,7 +126,6 @@ impl PrioritizedOrderStore { for order_id in invalidated_orders { // check if order can still be valid because of optional nonces - self.main_queue.remove(&order_id); let order = self .remove_poped_order(&order_id) @@ -158,6 +158,8 @@ impl PrioritizedOrderStore { ); if retain_order { self.insert_order(order); + } else { + mark_order_not_ready_for_immediate_inclusion(&order_id); } } diff --git a/crates/rbuilder/src/building/builders/block_building_helper.rs b/crates/rbuilder/src/building/builders/block_building_helper.rs index f6e135d7..ec6df014 100644 --- a/crates/rbuilder/src/building/builders/block_building_helper.rs +++ b/crates/rbuilder/src/building/builders/block_building_helper.rs @@ -17,7 +17,7 @@ use crate::{ }, primitives::SimulatedOrder, provider::StateProviderFactory, - telemetry, + telemetry::{self, add_block_fill_time, add_order_simulation_time}, utils::{check_block_hash_reader_health, HistoricalBlockError}, }; @@ -218,10 +218,8 @@ where let gas_used = finalized_block.sealed_block.gas_used; let blobs = finalized_block.txs_blob_sidecars.len(); - telemetry::add_built_block_metrics( - built_block_trace.fill_time, - built_block_trace.finalize_time, - built_block_trace.root_hash_time, + telemetry::add_finalized_block_metrics( + built_block_trace, txs, blobs, gas_used, @@ -234,6 +232,7 @@ where block = building_ctx.block_env.number.to::(), build_time_mus = 
built_block_trace.fill_time.as_micros(), finalize_time_mus = built_block_trace.finalize_time.as_micros(), + root_hash_time_mus = built_block_trace.root_hash_time.as_micros(), profit = format_ether(built_block_trace.bid_value), builder_name = builder_name, txs, @@ -293,27 +292,35 @@ where &mut self, order: &SimulatedOrder, ) -> Result, CriticalCommitOrderError> { + let start = Instant::now(); let result = self.partial_block .commit_order(order, &self.building_ctx, &mut self.block_state); - match result { + let sim_time = start.elapsed(); + let (result, sim_ok) = match result { Ok(ok_result) => match ok_result { Ok(res) => { self.built_block_trace.add_included_order(res); - Ok(Ok(self.built_block_trace.included_orders.last().unwrap())) + ( + Ok(Ok(self.built_block_trace.included_orders.last().unwrap())), + true, + ) } Err(err) => { self.built_block_trace .modify_payment_when_no_signer_error(&err); - Ok(Err(err)) + (Ok(Err(err)), false) } }, - Err(e) => Err(e), - } + Err(e) => (Err(e), false), + }; + add_order_simulation_time(sim_time, &self.builder_name, sim_ok); + result } fn set_trace_fill_time(&mut self, time: Duration) { self.built_block_trace.fill_time = time; + add_block_fill_time(time, &self.builder_name, self.building_ctx.timestamp()) } fn set_trace_orders_closed_at(&mut self, orders_closed_at: OffsetDateTime) { @@ -369,7 +376,6 @@ where }; self.built_block_trace.update_orders_sealed_at(); self.built_block_trace.root_hash_time = finalized_block.root_hash_time; - self.built_block_trace.finalize_time = start_time.elapsed(); Self::trace_finalized_block( diff --git a/crates/rbuilder/src/building/builders/ordering_builder.rs b/crates/rbuilder/src/building/builders/ordering_builder.rs index ba5313f8..3660587b 100644 --- a/crates/rbuilder/src/building/builders/ordering_builder.rs +++ b/crates/rbuilder/src/building/builders/ordering_builder.rs @@ -15,6 +15,7 @@ use crate::{ }, primitives::{AccountNonce, OrderId}, provider::StateProviderFactory, + 
telemetry::mark_builder_considers_order, }; use ahash::{HashMap, HashSet}; use reth::revm::cached::CachedReads; @@ -258,6 +259,11 @@ where break; } } + mark_builder_considers_order( + sim_order.id(), + &block_building_helper.built_block_trace().orders_closed_at, + block_building_helper.builder_name(), + ); let start_time = Instant::now(); let commit_result = block_building_helper.commit_order(&sim_order)?; let order_commit_time = start_time.elapsed(); diff --git a/crates/rbuilder/src/building/builders/parallel_builder/block_building_result_assembler.rs b/crates/rbuilder/src/building/builders/parallel_builder/block_building_result_assembler.rs index 9fde7c2c..50f11e7c 100644 --- a/crates/rbuilder/src/building/builders/parallel_builder/block_building_result_assembler.rs +++ b/crates/rbuilder/src/building/builders/parallel_builder/block_building_result_assembler.rs @@ -19,6 +19,7 @@ use crate::{ BlockBuildingContext, }, provider::StateProviderFactory, + telemetry::mark_builder_considers_order, }; /// Assembles block building results from the best orderings of order groups. 
@@ -223,6 +224,11 @@ where let (order_idx, _) = sequence_of_orders.sequence_of_orders.remove(0); let sim_order = &order_group.orders[order_idx]; + mark_builder_considers_order( + sim_order.id(), + &block_building_helper.built_block_trace().orders_closed_at, + block_building_helper.builder_name(), + ); let start_time = Instant::now(); let commit_result = block_building_helper.commit_order(sim_order)?; let order_commit_time = start_time.elapsed(); diff --git a/crates/rbuilder/src/building/built_block_trace.rs b/crates/rbuilder/src/building/built_block_trace.rs index dd36e60c..0b3d6def 100644 --- a/crates/rbuilder/src/building/built_block_trace.rs +++ b/crates/rbuilder/src/building/built_block_trace.rs @@ -19,7 +19,9 @@ pub struct BuiltBlockTrace { pub true_bid_value: U256, /// Some bundle failed with BundleErr::NoSigner, we might want to switch to !use_suggested_fee_recipient_as_coinbase pub got_no_signer_error: bool, + /// Timestamp of the moment we stopped considering new orders for this block. pub orders_closed_at: OffsetDateTime, + /// Timestamp when this block was fully sealed and ready for submission. 
pub orders_sealed_at: OffsetDateTime, pub fill_time: Duration, pub finalize_time: Duration, diff --git a/crates/rbuilder/src/building/sim.rs b/crates/rbuilder/src/building/sim.rs index 26c6ec05..65059256 100644 --- a/crates/rbuilder/src/building/sim.rs +++ b/crates/rbuilder/src/building/sim.rs @@ -6,6 +6,7 @@ use crate::{ building::{BlockBuildingContext, BlockState, CriticalCommitOrderError}, primitives::{Order, OrderId, SimValue, SimulatedOrder}, provider::StateProviderFactory, + telemetry::{add_order_simulation_time, mark_order_pending_nonce}, utils::{NonceCache, NonceCacheRef}, }; use ahash::{HashMap, HashSet}; @@ -110,11 +111,14 @@ where let order_nonce_state = self.get_order_nonce_state(&order, nonces)?; + let order_id = order.id(); + match order_nonce_state { OrderNonceState::Invalid => { return Ok(()); } OrderNonceState::PendingNonces(pending_nonces) => { + mark_order_pending_nonce(order_id); let unsatisfied_nonces = pending_nonces.len(); for nonce in pending_nonces { self.pending_nonces @@ -429,6 +433,7 @@ pub fn simulate_order_using_fork( ctx: &BlockBuildingContext, fork: &mut PartialBlockFork<'_, '_, Tracer>, ) -> Result { + let start = Instant::now(); // simulate parents let mut gas_used = 0; let mut blob_gas_used = 0; @@ -452,6 +457,9 @@ pub fn simulate_order_using_fork( // simulate let result = fork.commit_order(&order, ctx, gas_used, 0, blob_gas_used, true)?; + let sim_time = start.elapsed(); + add_order_simulation_time(sim_time, "sim", result.is_ok()); // we count parent sim time + order sim time here + match result { Ok(res) => { let sim_value = SimValue::new( diff --git a/crates/rbuilder/src/live_builder/block_output/relay_submit.rs b/crates/rbuilder/src/live_builder/block_output/relay_submit.rs index 84251fd5..98b10ea0 100644 --- a/crates/rbuilder/src/live_builder/block_output/relay_submit.rs +++ b/crates/rbuilder/src/live_builder/block_output/relay_submit.rs @@ -13,7 +13,7 @@ use crate::{ add_relay_submit_time, add_subsidy_value, 
inc_conn_relay_errors, inc_failed_block_simulations, inc_initiated_submissions, inc_other_relay_errors, inc_relay_accepted_submissions, inc_subsidized_blocks, inc_too_many_req_relay_errors, - measure_block_e2e_latency, + mark_submission_start_time, }, utils::{error_storage::store_error_event, tracing::dynamic_event}, validation_api_client::{ValidationAPIClient, ValidationError}, @@ -291,22 +291,33 @@ async fn run_submit_to_relays_job( (normal_signed_submission, optimistic_signed_submission) }; + mark_submission_start_time(block.trace.orders_sealed_at); + if config.dry_run { - validate_block( - &slot_data, - &normal_signed_submission.submission, - block.sealed_block.clone(), - &config, - cancel.clone(), - "Dry run", - ) - .instrument(submission_span) - .await; + tokio::spawn({ + let slot_data = slot_data.clone(); + let submission = normal_signed_submission.submission.clone(); + let sealed_block = block.sealed_block.clone(); + let config = config.clone(); + let cancel = cancel.clone(); + let submission_span = submission_span.clone(); + + async move { + validate_block( + &slot_data, + &submission, + sealed_block, + &config, + cancel, + "Dry run", + ) + .instrument(submission_span) + .await + } + }); continue 'submit; } - measure_block_e2e_latency(&block.trace.included_orders); - for relay in &normal_relays { let span = info_span!(parent: &submission_span, "relay_submit", relay = &relay.id(), optimistic = false); let relay = relay.clone(); diff --git a/crates/rbuilder/src/live_builder/mod.rs b/crates/rbuilder/src/live_builder/mod.rs index 1523755c..3eadb0bf 100644 --- a/crates/rbuilder/src/live_builder/mod.rs +++ b/crates/rbuilder/src/live_builder/mod.rs @@ -21,7 +21,7 @@ use crate::{ }, primitives::{MempoolTx, Order, TransactionSignedEcRecoveredWithBlobs}, provider::StateProviderFactory, - telemetry::inc_active_slots, + telemetry::{inc_active_slots, mark_building_started, reset_histogram_metrics}, utils::{ error_storage::spawn_error_storage_writer, 
provider_head_state::ProviderHeadState, Signer, }, @@ -204,6 +204,8 @@ where }; while let Some(payload) = payload_events_channel.recv().await { + reset_histogram_metrics(); + let blocklist = self.blocklist_provider.get_blocklist()?; if blocklist.contains(&payload.fee_recipient()) { warn!( @@ -278,13 +280,13 @@ where None, root_hasher, ) { + mark_building_started(block_ctx.timestamp()); builder_pool.start_block_building( payload, block_ctx, self.global_cancellation.clone(), time_until_slot_end.try_into().unwrap_or_default(), ); - if let Some(watchdog_sender) = watchdog_sender.as_ref() { watchdog_sender.try_send(()).unwrap_or_default(); }; diff --git a/crates/rbuilder/src/live_builder/order_input/rpc_server.rs b/crates/rbuilder/src/live_builder/order_input/rpc_server.rs index ee7dd0ea..bcb3b9ba 100644 --- a/crates/rbuilder/src/live_builder/order_input/rpc_server.rs +++ b/crates/rbuilder/src/live_builder/order_input/rpc_server.rs @@ -1,7 +1,10 @@ use super::{OrderInputConfig, ReplaceableOrderPoolCommand}; -use crate::primitives::{ - serialize::{RawBundle, RawShareBundle, RawShareBundleDecodeResult, RawTx, TxEncoding}, - Bundle, BundleReplacementKey, MempoolTx, Order, +use crate::{ + primitives::{ + serialize::{RawBundle, RawShareBundle, RawShareBundleDecodeResult, RawTx, TxEncoding}, + Bundle, BundleReplacementKey, MempoolTx, Order, + }, + telemetry::mark_command_received, }; use alloy_primitives::{Address, Bytes}; use jsonrpsee::{server::Server, types::ErrorObject, RpcModule}; @@ -10,6 +13,7 @@ use std::{ net::{SocketAddr, SocketAddrV4}, time::{Duration, Instant}, }; +use time::OffsetDateTime; use tokio::{ sync::{mpsc, mpsc::error::SendTimeoutError}, task::JoinHandle, @@ -42,6 +46,8 @@ pub async fn start_server_accepting_bundles( module.register_async_method("eth_sendBundle", move |params, _| { let results = results_clone.clone(); async move { + let received_at = OffsetDateTime::now_utc(); + let start = Instant::now(); let raw_bundle: RawBundle = match params.one() { 
Ok(raw_bundle) => raw_bundle, @@ -64,7 +70,7 @@ pub async fn start_server_accepting_bundles( let parse_duration = start.elapsed(); let target_block = order.target_block().unwrap_or_default(); trace!(order = ?order.id(), parse_duration_mus = parse_duration.as_micros(), target_block, "Received bundle"); - send_order(order, &results, timeout).await; + send_order(order, &results, timeout, received_at).await; } })?; @@ -80,9 +86,10 @@ pub async fn start_server_accepting_bundles( let results_clone = results.clone(); module.register_async_method("eth_sendRawTransaction", move |params, _| { - let start = Instant::now(); let results = results_clone.clone(); async move { + let received_at = OffsetDateTime::now_utc(); + let start = Instant::now(); let raw_tx: Bytes = match params.one() { Ok(raw_tx) => raw_tx, Err(err) => { @@ -105,7 +112,7 @@ pub async fn start_server_accepting_bundles( let order = Order::Tx(tx); let parse_duration = start.elapsed(); trace!(order = ?order.id(), parse_duration_mus = parse_duration.as_micros(), "Received mempool tx from API"); - send_order(order, &results, timeout).await; + send_order(order, &results, timeout, received_at).await; Ok(hash) } })?; @@ -134,6 +141,7 @@ async fn handle_mev_send_bundle( timeout: Duration, params: jsonrpsee::types::Params<'static>, ) { + let received_at = OffsetDateTime::now_utc(); let start = Instant::now(); let raw_bundle: RawShareBundle = match params.one() { Ok(raw_bundle) => raw_bundle, @@ -157,7 +165,7 @@ async fn handle_mev_send_bundle( let parse_duration = start.elapsed(); let target_block = order.target_block().unwrap_or_default(); trace!(order = ?order.id(), parse_duration_mus = parse_duration.as_micros(), target_block, "Received share bundle"); - send_order(order, &results, timeout).await; + send_order(order, &results, timeout, received_at).await; } RawShareBundleDecodeResult::CancelShareBundle(cancel) => { trace!(cancel = ?cancel, "Received share bundle cancellation"); @@ -165,6 +173,7 @@ async fn 
handle_mev_send_bundle( ReplaceableOrderPoolCommand::CancelShareBundle(cancel), &results, timeout, + received_at, ) .await; } @@ -175,8 +184,15 @@ async fn send_order( order: Order, channel: &mpsc::Sender, timeout: Duration, + received_at: OffsetDateTime, ) { - send_command(ReplaceableOrderPoolCommand::Order(order), channel, timeout).await; + send_command( + ReplaceableOrderPoolCommand::Order(order), + channel, + timeout, + received_at, + ) + .await; } /// Eats the errors and traces them. @@ -184,7 +200,9 @@ async fn send_command( command: ReplaceableOrderPoolCommand, channel: &mpsc::Sender, timeout: Duration, + received_at: OffsetDateTime, ) { + mark_command_received(&command, received_at); match channel.send_timeout(command, timeout).await { Ok(()) => {} Err(SendTimeoutError::Timeout(_)) => { @@ -208,6 +226,7 @@ async fn handle_cancel_bundle( timeout: Duration, params: jsonrpsee::types::Params<'static>, ) { + let received_at = OffsetDateTime::now_utc(); let cancel_bundle: RawCancelBundle = match params.one() { Ok(cancel_bundle) => cancel_bundle, Err(err) => { @@ -224,6 +243,7 @@ async fn handle_cancel_bundle( ReplaceableOrderPoolCommand::CancelBundle(key), &results, timeout, + received_at, ) .await; } diff --git a/crates/rbuilder/src/live_builder/order_input/txpool_fetcher.rs b/crates/rbuilder/src/live_builder/order_input/txpool_fetcher.rs index 72eda083..7fe7bd94 100644 --- a/crates/rbuilder/src/live_builder/order_input/txpool_fetcher.rs +++ b/crates/rbuilder/src/live_builder/order_input/txpool_fetcher.rs @@ -1,13 +1,14 @@ use super::{OrderInputConfig, ReplaceableOrderPoolCommand}; use crate::{ primitives::{MempoolTx, Order, TransactionSignedEcRecoveredWithBlobs}, - telemetry::add_txfetcher_time_to_query, + telemetry::{add_txfetcher_time_to_query, mark_command_received}, }; use alloy_primitives::{hex, Bytes, FixedBytes}; use alloy_provider::{IpcConnect, Provider, ProviderBuilder, RootProvider}; use alloy_pubsub::PubSubFrontend; use futures::StreamExt; use 
std::{pin::pin, time::Instant}; +use time::OffsetDateTime; use tokio::{ sync::{mpsc, mpsc::error::SendTimeoutError}, task::JoinHandle, @@ -45,6 +46,7 @@ pub async fn subscribe_to_txpool_with_blobs( let mut stream = pin!(stream); while let Some(tx_hash) = stream.next().await { + let received_at = OffsetDateTime::now_utc(); let start = Instant::now(); let tx_with_blobs = match get_tx_with_blobs(tx_hash, &provider).await { @@ -65,11 +67,10 @@ pub async fn subscribe_to_txpool_with_blobs( trace!(order = ?order.id(), parse_duration_mus = parse_duration.as_micros(), "Mempool transaction received with blobs"); add_txfetcher_time_to_query(parse_duration); + let orderpool_command = ReplaceableOrderPoolCommand::Order(order); + mark_command_received(&orderpool_command, received_at); match results - .send_timeout( - ReplaceableOrderPoolCommand::Order(order), - config.results_channel_timeout, - ) + .send_timeout(orderpool_command, config.results_channel_timeout) .await { Ok(()) => {} diff --git a/crates/rbuilder/src/live_builder/simulation/sim_worker.rs b/crates/rbuilder/src/live_builder/simulation/sim_worker.rs index a0412c4d..2390efe8 100644 --- a/crates/rbuilder/src/live_builder/simulation/sim_worker.rs +++ b/crates/rbuilder/src/live_builder/simulation/sim_worker.rs @@ -5,8 +5,7 @@ use crate::{ }, live_builder::simulation::CurrentSimulationContexts, provider::StateProviderFactory, - telemetry, - telemetry::add_sim_thread_utilisation_timings, + telemetry::{self, add_sim_thread_utilisation_timings, mark_order_simulation_end}, }; use parking_lot::Mutex; use reth::revm::cached::CachedReads; @@ -64,6 +63,7 @@ pub fn run_sim_worker

( break; } }; + let order_id = task.order.id(); let start_time = Instant::now(); let mut block_state = BlockState::new(state_provider).with_cached_reads(cached_reads); let sim_result = simulate_order( @@ -72,7 +72,7 @@ pub fn run_sim_worker

( &current_sim_context.block_ctx, &mut block_state, ); - match sim_result { + let sim_ok = match sim_result { Ok(sim_result) => { let sim_ok = match sim_result.result { OrderSimResult::Success(simulated_order, nonces_after) => { @@ -96,15 +96,17 @@ pub fn run_sim_worker

( }; telemetry::inc_simulated_orders(sim_ok); telemetry::inc_simulation_gas_used(sim_result.gas_used); + sim_ok } Err(err) => { - error!(?err, "Critical error while simulating order"); + error!(?err, ?order_id, "Critical error while simulating order"); // @Metric break; } - } + }; (cached_reads, _, _) = block_state.into_parts(); + mark_order_simulation_end(order_id, sim_ok); last_sim_finished = Instant::now(); let sim_thread_work_time = sim_start.elapsed(); add_sim_thread_utilisation_timings( diff --git a/crates/rbuilder/src/telemetry/metrics.rs b/crates/rbuilder/src/telemetry/metrics/mod.rs similarity index 70% rename from crates/rbuilder/src/telemetry/metrics.rs rename to crates/rbuilder/src/telemetry/metrics/mod.rs index e4d34c58..86800030 100644 --- a/crates/rbuilder/src/telemetry/metrics.rs +++ b/crates/rbuilder/src/telemetry/metrics/mod.rs @@ -6,8 +6,8 @@ //! When metric server is spawned is serves prometheus metrics at: /debug/metrics/prometheus #![allow(unexpected_cfgs)] +use crate::building::BuiltBlockTrace; use crate::{ - building::ExecutionResult, live_builder::block_list_provider::{blocklist_hash, BlockList}, primitives::mev_boost::MevBoostRelayID, utils::build_info::Version, @@ -21,10 +21,16 @@ use prometheus::{ Counter, HistogramOpts, HistogramVec, IntCounter, IntCounterVec, IntGauge, IntGaugeVec, Opts, Registry, }; +use std::sync::{Arc, Mutex}; use std::time::Duration; +use std::time::Instant; use time::OffsetDateTime; use tracing::error; +mod tracing_metrics; + +pub use tracing_metrics::*; + const SUBSIDY_ATTEMPT: &str = "attempt"; const SUBSIDY_LANDED: &str = "landed"; @@ -32,34 +38,29 @@ const RELAY_ERROR_CONNECTION: &str = "conn"; const RELAY_ERROR_TOO_MANY_REQUESTS: &str = "too_many"; const RELAY_ERROR_OTHER: &str = "other"; +const SIM_STATUS_OK: &str = "sim_success"; +const SIM_STATUS_FAIL: &str = "sim_fail"; + /// We record timestamps only for blocks built within interval of the block timestamp const BLOCK_METRICS_TIMESTAMP_LOWER_DELTA: 
time::Duration = time::Duration::seconds(3); /// We record timestamps only for blocks built within interval of the block timestamp const BLOCK_METRICS_TIMESTAMP_UPPER_DELTA: time::Duration = time::Duration::seconds(2); +fn is_now_close_to_slot_end(block_timestamp: OffsetDateTime) -> bool { + let now = OffsetDateTime::now_utc(); + let too_early = now < block_timestamp - BLOCK_METRICS_TIMESTAMP_LOWER_DELTA; + let too_late = block_timestamp + BLOCK_METRICS_TIMESTAMP_UPPER_DELTA < now; + !too_early && !too_late +} + lazy_static! { pub static ref REGISTRY: Registry = Registry::new(); } register_metrics! { - pub static BLOCK_FILL_TIME: HistogramVec = HistogramVec::new( - HistogramOpts::new("block_fill_time", "Block Fill Times (ms)") - .buckets(exponential_buckets_range(1.0, 3000.0, 100)), - &["builder_name"] - ) - .unwrap(); - pub static BLOCK_FINALIZE_TIME: HistogramVec = HistogramVec::new( - HistogramOpts::new("block_finalize_time", "Block Finalize Times (ms)") - .buckets(exponential_buckets_range(1.0, 3000.0, 100)), - &["builder_name"] - ) - .unwrap(); - pub static BLOCK_ROOT_HASH_TIME: HistogramVec = HistogramVec::new( - HistogramOpts::new("block_root_hash_time", "Block Root Hash Time (ms)") - .buckets(exponential_buckets_range(1.0, 2000.0, 100)), - &["builder_name"] - ) - .unwrap(); + + // Statistics about finalized blocks + pub static BLOCK_BUILT_TXS: HistogramVec = HistogramVec::new( HistogramOpts::new("block_built_txs", "Transactions in the built block") .buckets(linear_buckets_range(1.0, 1000.0, 100)), @@ -87,27 +88,30 @@ register_metrics! 
{ &["builder_name"] ) .unwrap(); - pub static BLOCK_BUILT_MGAS_PER_SECOND: HistogramVec = HistogramVec::new( - HistogramOpts::new( - "block_built_mgas_per_second", - "MGas/s for the built block (including failing txs)" - ) - .buckets(linear_buckets_range(1.0, 1000.0, 100)), - &["builder_name"] - ) - .unwrap(); + + pub static BLOCK_VALIDATION_TIME: HistogramVec = HistogramVec::new( HistogramOpts::new("block_validation_time", "Block Validation Times (ms)") .buckets(exponential_buckets_range(1.0, 3000.0, 100)), &[] ) .unwrap(); + + + pub static CURRENT_BLOCK: IntGauge = IntGauge::new("current_block", "Current Block").unwrap(); pub static ORDERPOOL_TXS: IntGauge = IntGauge::new("orderpool_txs", "Transactions In The Orderpool").unwrap(); pub static ORDERPOOL_BUNDLES: IntGauge = IntGauge::new("orderpool_bundles", "Bundles In The Orderpool").unwrap(); + + pub static ORDERPOOL_ORDERS_RECEIVED: IntCounterVec = IntCounterVec::new( + Opts::new("orderpool_commands_received", "counter of orders received"), + &["kind"] + ) + .unwrap(); + pub static RELAY_ERRORS: IntCounterVec = IntCounterVec::new( Opts::new("relay_errors", "counter of relay errors"), &["relay", "kind"] @@ -173,16 +177,6 @@ register_metrics! { &["worker_id"] ) .unwrap(); - pub static ORDERS_IN_LAST_BUILT_BLOCK_E2E_LAT_MS: HistogramVec = HistogramVec::new( - HistogramOpts::new( - "orders_in_last_built_block_e2e_lat", - "For all blocks that are ready for submission to the relay its = min over orders (submission start - order received)" - ) - .buckets(exponential_buckets_range(0.5, 3_000.0, 30)), - &[] - ) - .unwrap(); - pub static PROVIDER_REOPEN_COUNTER: IntCounter = IntCounter::new( "provider_reopen_counter", "Counter of provider reopens").unwrap(); @@ -223,6 +217,109 @@ register_metrics! 
{ pub static TOTAL_LANDED_SUBSIDIES_SUM: Counter = Counter::new("total_landed_subsidies_sum", "Sum of all total landed subsidies").unwrap(); + + + // Performance metrics related to E2E latency + + // Metrics for important step of the block processing + pub static BLOCK_FILL_TIME: HistogramVec = HistogramVec::new( + HistogramOpts::new("block_fill_time", "Block Fill Times (ms)") + .buckets(exponential_buckets_range(1.0, 3000.0, 100)), + &["builder_name"] + ) + .unwrap(); + pub static BLOCK_FINALIZE_TIME: HistogramVec = HistogramVec::new( + HistogramOpts::new("block_finalize_time", "Block Finalize Times (ms)") + .buckets(exponential_buckets_range(1.0, 3000.0, 100)), + &[] + ) + .unwrap(); + pub static BLOCK_ROOT_HASH_TIME: HistogramVec = HistogramVec::new( + HistogramOpts::new("block_root_hash_time", "Block Root Hash Time (ms)") + .buckets(exponential_buckets_range(1.0, 2000.0, 100)), + &[] + ) + .unwrap(); + pub static ORDER_SIMULATION_TIME: HistogramVec = HistogramVec::new( + HistogramOpts::new("order_simulation_time", "Order Simulation Time (ms)") + .buckets(exponential_buckets_range(0.01, 200.0, 200)), + &["builder_name", "status"] + ) + .unwrap(); + + // E2E tracing metrics + // The goal of these two metrics is: + // 1. Cover as many lines of code as possible without any gaps. + // 2. Show E2E latency of the order that could be executed immediately and also arrived towards the end of the slot. + // The path of order goes as follows: + // Received -> Simulated -> (builders start to build a block with it) -> block sealed -> block submit started + pub static ORDER_RECEIVED_TO_SIM_END_TIME: HistogramVec = HistogramVec::new( + HistogramOpts::new("order_received_to_sim_end_time", "Time between when the order was received and top of the block simulation ended for orders that arrive after slot start. 
(ms)") + .buckets(exponential_buckets_range(0.01, 200.0, 200)), + &["status"] + ) + .unwrap(); + pub static ORDER_SIM_END_TO_FIRST_BUILD_STARTED_TIME: HistogramVec = HistogramVec::new( + HistogramOpts::new("order_sim_end_to_first_build_started_time", "Time between when the order simulation ended and the builder started to build first block with it. (ms)") + .buckets(exponential_buckets_range(0.01, 300.0, 300)), + &["builder_name"] + ) + .unwrap(); + pub static ORDER_SIM_END_TO_FIRST_BUILD_STARTED_MIN_TIME: HistogramVec = HistogramVec::new( + HistogramOpts::new("order_sim_end_to_first_build_started_min_time", "Time between when the order simulation ended and the first builder started to build first block with it. (ms)") + .buckets(exponential_buckets_range(0.01, 300.0, 300)), + &["builder_name"] + ) + .unwrap(); + pub static BLOCK_FILL_START_SEAL_END_TIME: HistogramVec = HistogramVec::new( + HistogramOpts::new("block_build_start_seal_end_time", "Time between when the block build started and the block sealed ended. (ms)") + .buckets(exponential_buckets_range(0.01, 500.0, 300)), + &["builder_name"] + ) + .unwrap(); + pub static BLOCK_SEAL_END_SUBMIT_START_TIME: HistogramVec = HistogramVec::new( + HistogramOpts::new("block_seal_end_submit_start_time", "Time between when the block sealed ended and the block submission started. (ms)") + .buckets(exponential_buckets_range(0.01, 500.0, 300)), + &[] + ) + .unwrap(); +} + +// This function should be called periodically to reset histogram metrics. +// If metrics are not reset histogram quantiles become rigid. +// Reset period is 10 minutes. +pub fn reset_histogram_metrics() { + const HISTOGRAM_METRIC_RESET_PERIOD: Duration = Duration::from_secs(10 * 60); + + lazy_static! 
{ + static ref LAST_RESET: Arc> = Arc::new(Mutex::new(Instant::now())); + } + + let now = Instant::now(); + let mut last_reset = LAST_RESET.lock().unwrap(); + if now.duration_since(*last_reset) < HISTOGRAM_METRIC_RESET_PERIOD { + return; + } + *last_reset = now; + + // Reset all histogram metrics + BLOCK_BUILT_TXS.reset(); + BLOCK_BUILT_BLOBS.reset(); + BLOCK_BUILT_GAS_USED.reset(); + BLOCK_BUILT_SIM_GAS_USED.reset(); + BLOCK_VALIDATION_TIME.reset(); + BLOCK_FILL_TIME.reset(); + BLOCK_FINALIZE_TIME.reset(); + BLOCK_ROOT_HASH_TIME.reset(); + ORDER_SIMULATION_TIME.reset(); + RELAY_SUBMIT_TIME.reset(); + TXFETCHER_TRANSACTION_QUERY_TIME.reset(); + SUBSIDY_VALUE.reset(); + ORDER_RECEIVED_TO_SIM_END_TIME.reset(); + ORDER_SIM_END_TO_FIRST_BUILD_STARTED_TIME.reset(); + ORDER_SIM_END_TO_FIRST_BUILD_STARTED_MIN_TIME.reset(); + BLOCK_FILL_START_SEAL_END_TIME.reset(); + BLOCK_SEAL_END_SUBMIT_START_TIME.reset(); } pub(super) fn set_version(version: Version) { @@ -289,10 +386,8 @@ pub fn set_ordepool_count(txs: usize, bundles: usize) { } #[allow(clippy::too_many_arguments)] -pub fn add_built_block_metrics( - build_time: Duration, - finalize_time: Duration, - root_hash_time: Duration, +pub fn add_finalized_block_metrics( + built_block_trace: &BuiltBlockTrace, txs: usize, blobs: usize, gas_used: u64, @@ -300,22 +395,17 @@ pub fn add_built_block_metrics( builder_name: &str, block_timestamp: OffsetDateTime, ) { - let now = OffsetDateTime::now_utc(); - if now < block_timestamp - BLOCK_METRICS_TIMESTAMP_LOWER_DELTA - || block_timestamp + BLOCK_METRICS_TIMESTAMP_UPPER_DELTA < now - { + if !is_now_close_to_slot_end(block_timestamp) { return; } - BLOCK_FILL_TIME - .with_label_values(&[builder_name]) - .observe(build_time.as_millis() as f64); BLOCK_FINALIZE_TIME - .with_label_values(&[builder_name]) - .observe(finalize_time.as_millis() as f64); + .with_label_values(&[]) + .observe(built_block_trace.finalize_time.as_micros() as f64 / 1000.0); BLOCK_ROOT_HASH_TIME - 
.with_label_values(&[builder_name]) - .observe(root_hash_time.as_millis() as f64); + .with_label_values(&[]) + .observe(built_block_trace.root_hash_time.as_micros() as f64 / 1000.0); + BLOCK_BUILT_TXS .with_label_values(&[builder_name]) .observe(txs as f64); @@ -328,11 +418,26 @@ pub fn add_built_block_metrics( BLOCK_BUILT_SIM_GAS_USED .with_label_values(&[builder_name]) .observe(sim_gas_used as f64); - BLOCK_BUILT_MGAS_PER_SECOND + + let build_start_seal_end_time = + (built_block_trace.orders_sealed_at - built_block_trace.orders_closed_at).as_seconds_f64() + * 1000.0; + BLOCK_FILL_START_SEAL_END_TIME + .with_label_values(&[builder_name]) + .observe(build_start_seal_end_time); +} + +pub fn add_block_fill_time( + duration: Duration, + builder_name: &str, + block_timestamp: OffsetDateTime, +) { + if !is_now_close_to_slot_end(block_timestamp) { + return; + } + BLOCK_FILL_TIME .with_label_values(&[builder_name]) - .observe( - (sim_gas_used as f64) / ((build_time.as_micros() + finalize_time.as_micros()) as f64), - ); + .observe(duration.as_micros() as f64 / 1000.0); } pub fn add_block_validation_time(duration: Duration) { @@ -392,30 +497,6 @@ pub fn add_sim_thread_utilisation_timings( .inc_by(wait_time.as_micros() as u64); } -pub fn measure_block_e2e_latency(included_orders: &[ExecutionResult]) { - let submission_time = OffsetDateTime::now_utc(); - - let mut min_latency = None; - for order in included_orders { - let latency_ms = (submission_time - order.order.metadata().received_at_timestamp) - .as_seconds_f64() - * 1000.0; - if let Some(current_mint) = min_latency { - if latency_ms > 0.0 && latency_ms < current_mint { - min_latency = Some(latency_ms); - } - } else if latency_ms > 0.0 { - min_latency = Some(latency_ms); - } - } - - if let Some(min_latency) = min_latency { - ORDERS_IN_LAST_BUILT_BLOCK_E2E_LAT_MS - .with_label_values(&[]) - .observe(min_latency); - } -} - /// landed vs attempt fn subsidized_label(landed: bool) -> &'static str { if landed { @@ -441,6 
+522,29 @@ pub fn add_subsidy_value(value: U256, landed: bool) { } } +fn sim_status(success: bool) -> &'static str { + if success { + SIM_STATUS_OK + } else { + SIM_STATUS_FAIL + } +} + +pub fn add_order_simulation_time(duration: Duration, builder_name: &str, success: bool) { + ORDER_SIMULATION_TIME + .with_label_values(&[builder_name, sim_status(success)]) + .observe(duration.as_micros() as f64 / 1000.0); +} + +pub fn mark_submission_start_time(block_sealed_at: OffsetDateTime) { + // we don't check if we are close to slot end because submission code handles that + let now = OffsetDateTime::now_utc(); + let value = (now - block_sealed_at).as_seconds_f64() * 1000.0; + BLOCK_SEAL_END_SUBMIT_START_TIME + .with_label_values(&[]) + .observe(value); +} + pub(super) fn gather_prometheus_metrics() -> String { use prometheus::Encoder; let encoder = prometheus::TextEncoder::new(); diff --git a/crates/rbuilder/src/telemetry/metrics/tracing_metrics.rs b/crates/rbuilder/src/telemetry/metrics/tracing_metrics.rs new file mode 100644 index 00000000..0cfbab6c --- /dev/null +++ b/crates/rbuilder/src/telemetry/metrics/tracing_metrics.rs @@ -0,0 +1,266 @@ +/// Tracing metrics are used to get a fine grained look at the path of the order through the builder. +/// To start collecting this metric mark_building_started must be called at the start of each slot. 
+use crate::{ + live_builder::order_input::ReplaceableOrderPoolCommand, + primitives::{Order, OrderId}, +}; +use ahash::RandomState; +use dashmap::{DashMap, DashSet}; +use lazy_static::lazy_static; +use std::sync::{Arc, RwLock}; +use time::OffsetDateTime; + +use super::{ + sim_status, BLOCK_METRICS_TIMESTAMP_LOWER_DELTA, BLOCK_METRICS_TIMESTAMP_UPPER_DELTA, + ORDERPOOL_ORDERS_RECEIVED, ORDER_RECEIVED_TO_SIM_END_TIME, + ORDER_SIM_END_TO_FIRST_BUILD_STARTED_MIN_TIME, ORDER_SIM_END_TO_FIRST_BUILD_STARTED_TIME, +}; + +type Timestamp = u64; // timestamp in microseconds +type BuilderId = u64; // integer id to minimize string cloning + +#[derive(Debug, Default)] +struct TracingMetricsData { + last_slot_critical_period: Arc>, + + builder_by_name: Arc>, + + // All fields below must be cleaned once per slot in `mark_building_started` + orders_received: Arc>, + orders_with_pending_nonces: Arc>, + orders_simulation_end: Arc>, + + orders_not_ready_for_immediate_inclusion: Arc>, + orders_first_insertion_block_seal_start_by_builder: + Arc>, + orders_first_insertion_block_seal_start: + Arc>, +} + +lazy_static! 
{ + static ref METRICS_TRACING_REGISTRY: TracingMetricsData = TracingMetricsData::default(); +} + +// this should be called on each of the tracing metric invocation to prevent memory leak when +// mark_building_started is not called +fn should_record_tracing_metric(timestamp: &OffsetDateTime) -> bool { + let (start, end) = *METRICS_TRACING_REGISTRY + .last_slot_critical_period + .read() + .unwrap(); + if start == 0 || end == 0 { + return false; + } + let time = offset_datetime_to_timestamp_us(timestamp); + + let too_early = time < start; + let too_late = time > end; + !too_early && !too_late +} + +fn get_builder_id(builder_name: &str) -> BuilderId { + if let Some(id) = METRICS_TRACING_REGISTRY.builder_by_name.get(builder_name) { + return *id; + } + let id: u64 = rand::random(); + METRICS_TRACING_REGISTRY + .builder_by_name + .insert(builder_name.to_string(), id); + id +} + +/// mark_building_started should be called on each slot start to mark building starting time and to clean accumulated data. +/// If its not called tracing data is not collected. +pub fn mark_building_started(block_timestamp: OffsetDateTime) { + let reg = &METRICS_TRACING_REGISTRY; + { + let start = (block_timestamp - BLOCK_METRICS_TIMESTAMP_LOWER_DELTA).unix_timestamp_nanos() + as u64 + / 1000; + let end = (block_timestamp + BLOCK_METRICS_TIMESTAMP_UPPER_DELTA).unix_timestamp_nanos() + as u64 + / 1000; + let mut last_slot_period = reg.last_slot_critical_period.write().unwrap(); + *last_slot_period = (start, end); + } + + reg.orders_received.clear(); + reg.orders_with_pending_nonces.clear(); + reg.orders_simulation_end.clear(); + reg.orders_not_ready_for_immediate_inclusion.clear(); + reg.orders_first_insertion_block_seal_start_by_builder + .clear(); + reg.orders_first_insertion_block_seal_start.clear(); +} + +/// This should be called when ordrepool command appears in the builder. It can be a new order or order replacement. 
+pub fn mark_command_received(command: &ReplaceableOrderPoolCommand, received_at: OffsetDateTime) { + let kind = match command { + ReplaceableOrderPoolCommand::Order(order) => { + mark_order_received(order.id(), received_at); + match order { + Order::Bundle(_) => "bundle", + Order::Tx(_) => "tx", + Order::ShareBundle(_) => "sbundle", + } + } + ReplaceableOrderPoolCommand::CancelShareBundle(_) + | ReplaceableOrderPoolCommand::CancelBundle(_) => "replacement", + }; + ORDERPOOL_ORDERS_RECEIVED.with_label_values(&[kind]).inc(); +} + +fn mark_order_received(id: OrderId, received_at: OffsetDateTime) { + if !should_record_tracing_metric(&received_at) { + return; + } + + if METRICS_TRACING_REGISTRY.orders_received.contains_key(&id) { + return; + } + let timestamp = offset_datetime_to_timestamp_us(&received_at); + METRICS_TRACING_REGISTRY + .orders_received + .insert(id, timestamp); +} + +/// mark_order_pending_nonce should be called when order that was received can't be simulated immediately because of the nonce. +pub fn mark_order_pending_nonce(id: OrderId) { + let now = OffsetDateTime::now_utc(); + if !should_record_tracing_metric(&now) { + return; + } + + METRICS_TRACING_REGISTRY + .orders_with_pending_nonces + .insert(id); +} + +/// mark_order_simulation_end should be called when order top of block simulation ends. 
+pub fn mark_order_simulation_end(id: OrderId, success: bool) { + let now = OffsetDateTime::now_utc(); + if !should_record_tracing_metric(&now) { + return; + } + + let received_at = if let Some(ts) = METRICS_TRACING_REGISTRY.orders_received.get(&id) { + *ts + } else { + return; + }; + + if METRICS_TRACING_REGISTRY + .orders_simulation_end + .contains_key(&id) + { + return; + } + + // we con't record metrics for ordrers that were stuck due to nonce + if METRICS_TRACING_REGISTRY + .orders_with_pending_nonces + .contains(&id) + { + return; + } + + let now = offset_datetime_to_timestamp_us(&now); + METRICS_TRACING_REGISTRY + .orders_simulation_end + .insert(id, now); + + let received_to_sim_end_time_ms = if received_at < now { + let time_us = (now - received_at) as f64; + time_us / 1000.0 + } else { + return; + }; + + ORDER_RECEIVED_TO_SIM_END_TIME + .with_label_values(&[sim_status(success)]) + .observe(received_to_sim_end_time_ms); +} + +/// mark_order_not_ready_for_immediate_inclusion should be called if order can't be included immediatly. +/// For example, if it was invalidated by nonce by other order inclusion. 
+pub fn mark_order_not_ready_for_immediate_inclusion(order_id: &OrderId) { + let now = OffsetDateTime::now_utc(); + if !should_record_tracing_metric(&now) { + return; + } + + if METRICS_TRACING_REGISTRY + .orders_not_ready_for_immediate_inclusion + .contains(order_id) + { + return; + }; + METRICS_TRACING_REGISTRY + .orders_not_ready_for_immediate_inclusion + .insert(*order_id); +} + +/// mark_builder_considers_order should be called when builder considers order for inclusion +/// order_closed_at is a time at which builder stopped considering new orders for the current run +pub fn mark_builder_considers_order( + order_id: OrderId, + order_closed_at: &OffsetDateTime, + builder_name: &str, +) { + if !should_record_tracing_metric(order_closed_at) { + return; + } + + let builder_id = get_builder_id(builder_name); + if METRICS_TRACING_REGISTRY + .orders_first_insertion_block_seal_start_by_builder + .contains_key(&(order_id, builder_id)) + { + return; + } + + let order_sim_end_time = METRICS_TRACING_REGISTRY + .orders_simulation_end + .get(&order_id) + .map(|r| *r) + .unwrap_or_default(); + let ready_for_immediate_inclusion = METRICS_TRACING_REGISTRY + .orders_not_ready_for_immediate_inclusion + .contains(&order_id); + + let timestamp = offset_datetime_to_timestamp_us(order_closed_at); + let min_time_set = if !METRICS_TRACING_REGISTRY + .orders_first_insertion_block_seal_start + .contains_key(&order_id) + { + METRICS_TRACING_REGISTRY + .orders_first_insertion_block_seal_start + .insert(order_id, (builder_id, timestamp)); + true + } else { + false + }; + + METRICS_TRACING_REGISTRY + .orders_first_insertion_block_seal_start_by_builder + .insert((order_id, builder_id), timestamp); + + if order_sim_end_time == 0 || order_sim_end_time > timestamp || ready_for_immediate_inclusion { + return; + } + + ORDER_SIM_END_TO_FIRST_BUILD_STARTED_TIME + .with_label_values(&[builder_name]) + .observe((timestamp - order_sim_end_time) as f64 / 1000.0); + if min_time_set { + 
ORDER_SIM_END_TO_FIRST_BUILD_STARTED_MIN_TIME + .with_label_values(&[builder_name]) + .observe((timestamp - order_sim_end_time) as f64 / 1000.0); + } +} + +fn offset_datetime_to_timestamp_us(dt: &OffsetDateTime) -> Timestamp { + (dt.unix_timestamp_nanos() / 1_000) + .try_into() + .unwrap_or_default() +}