diff --git a/crates/common/src/utils.rs b/crates/common/src/utils.rs index fa750d7..1496101 100644 --- a/crates/common/src/utils.rs +++ b/crates/common/src/utils.rs @@ -108,12 +108,7 @@ pub fn initialize_tracing_log() { Level::INFO }; - tracing_subscriber::fmt() - .compact() - .with_max_level(level) - .with_target(true) - .with_file(true) - .init(); + tracing_subscriber::fmt().compact().with_max_level(level).with_target(true).init(); } pub fn print_logo() { diff --git a/crates/pbs/src/mev_boost/get_header.rs b/crates/pbs/src/mev_boost/get_header.rs index 10cd87a..49b86bc 100644 --- a/crates/pbs/src/mev_boost/get_header.rs +++ b/crates/pbs/src/mev_boost/get_header.rs @@ -1,7 +1,7 @@ -use std::{sync::Arc, time::Duration}; +use std::{ops::Mul, sync::Arc, time::Duration}; use alloy::{ - primitives::{B256, U256}, + primitives::{utils::format_ether, B256, U256}, rpc::types::beacon::BlsPublicKey, }; use axum::http::{HeaderMap, HeaderValue}; @@ -10,7 +10,7 @@ use cb_common::{ pbs::{RelayEntry, HEADER_SLOT_UUID_KEY, HEADER_START_TIME_UNIX_MS}, signature::verify_signed_builder_message, types::Chain, - utils::{get_user_agent, utcnow_ms, wei_to_eth}, + utils::{get_user_agent, utcnow_ms}, }; use futures::future::join_all; use reqwest::{header::USER_AGENT, StatusCode}; @@ -36,12 +36,11 @@ pub async fn get_header( let slot_uuid = state.get_or_update_slot_uuid(slot); // prepare headers - let ua = get_user_agent(&req_headers); let mut send_headers = HeaderMap::new(); send_headers.insert(HEADER_SLOT_UUID_KEY, HeaderValue::from_str(&slot_uuid.to_string())?); send_headers .insert(HEADER_START_TIME_UNIX_MS, HeaderValue::from_str(&utcnow_ms().to_string())?); - if let Some(ua) = ua { + if let Some(ua) = get_user_agent(&req_headers) { send_headers.insert(USER_AGENT, HeaderValue::from_str(&ua)?); } @@ -75,6 +74,7 @@ pub async fn get_header( Ok(state.add_bids(slot, relay_bids)) } +#[tracing::instrument(skip_all, name = "handler", fields(relay_id = relay.id))] async fn send_get_header( headers: HeaderMap, slot: u64, @@ -95,35 +95,39 @@ async fn send_get_header( .headers(headers) .send() .await?; - timer.observe_duration(); + let latency_ms = timer.stop_and_record().mul(1000.0).ceil() as u64; - let status = res.status(); - RELAY_STATUS_CODE - .with_label_values(&[status.as_str(), GET_HEADER_ENDPOINT_TAG, &relay.id]) - .inc(); + let code = res.status(); + RELAY_STATUS_CODE.with_label_values(&[code.as_str(), GET_HEADER_ENDPOINT_TAG, &relay.id]).inc(); let response_bytes = res.bytes().await?; - if !status.is_success() { + if !code.is_success() { return Err(PbsError::RelayResponse { error_msg: String::from_utf8_lossy(&response_bytes).into_owned(), - code: status.as_u16(), + code: code.as_u16(), }); }; - debug!( - method = "get_header", - relay = relay.id, - code = status.as_u16(), - response = ?response_bytes, - "received response" - ); - - if status == StatusCode::NO_CONTENT { + if code == StatusCode::NO_CONTENT { + debug!( + ?code, + latency_ms, + response = ?response_bytes, + "no header from relay" + ); return Ok(None) } let get_header_response: GetHeaderReponse = serde_json::from_slice(&response_bytes)?; + debug!( + ?code, + latency_ms, + block_hash = %get_header_response.block_hash(), + value_eth = format_ether(get_header_response.value()), + "received new header" + ); + validate_header( &get_header_response.data, chain, @@ -146,12 +150,9 @@ fn validate_header( ) -> Result<(), ValidationError> { let block_hash = signed_header.message.header.block_hash; let relay_pubkey = signed_header.message.pubkey; - let block_number = signed_header.message.header.block_number; let tx_root = signed_header.message.header.transactions_root; let value = signed_header.message.value(); - debug!(block_number, %block_hash, %tx_root, value_eth=wei_to_eth(&value), "received relay bid"); - if block_hash == B256::ZERO { return Err(ValidationError::EmptyBlockhash) } diff --git a/crates/pbs/src/mev_boost/register_validator.rs b/crates/pbs/src/mev_boost/register_validator.rs index 5ace0af..98e63fb 100644 --- a/crates/pbs/src/mev_boost/register_validator.rs +++ b/crates/pbs/src/mev_boost/register_validator.rs @@ -1,4 +1,4 @@ -use std::time::Duration; +use std::{ops::Mul, time::Duration}; use alloy::rpc::types::beacon::relay::ValidatorRegistration; use axum::http::{HeaderMap, HeaderValue}; @@ -6,9 +6,10 @@ use cb_common::{ pbs::{RelayEntry, HEADER_START_TIME_UNIX_MS}, utils::{get_user_agent, utcnow_ms}, }; +use eyre::bail; use futures::future::join_all; use reqwest::header::USER_AGENT; -use tracing::error; +use tracing::{debug, error}; use crate::{ constants::REGISTER_VALIDATOR_ENDPOINT_TAG, @@ -25,11 +26,10 @@ pub async fn register_validator( state: PbsState, ) -> eyre::Result<()> { // prepare headers - let ua = get_user_agent(&req_headers); let mut send_headers = HeaderMap::new(); send_headers .insert(HEADER_START_TIME_UNIX_MS, HeaderValue::from_str(&utcnow_ms().to_string())?); - if let Some(ua) = ua { + if let Some(ua) = get_user_agent(&req_headers) { send_headers.insert(USER_AGENT, HeaderValue::from_str(&ua)?); } @@ -50,10 +50,11 @@ pub async fn register_validator( if results.iter().any(|res| res.is_ok()) { Ok(()) } else { - Err(eyre::eyre!("No relay passed register_validator successfully")) + bail!("No relay passed register_validator successfully") } } +#[tracing::instrument(skip_all, name = "handler", fields(relay_id = relay.id))] async fn send_register_validator( headers: HeaderMap, relay: RelayEntry, @@ -73,27 +74,26 @@ async fn send_register_validator( .json(®istrations) .send() .await?; - timer.observe_duration(); + let latency_ms = timer.stop_and_record().mul(1000.0).ceil() as u64; - // TODO: send to relay monitor - - let status = res.status(); + let code = res.status(); RELAY_STATUS_CODE - .with_label_values(&[status.as_str(), REGISTER_VALIDATOR_ENDPOINT_TAG, &relay.id]) + .with_label_values(&[code.as_str(), REGISTER_VALIDATOR_ENDPOINT_TAG, &relay.id]) .inc(); let response_bytes = res.bytes().await?; - if !status.is_success() { + if !code.is_success() { let err = PbsError::RelayResponse { error_msg: String::from_utf8_lossy(&response_bytes).into_owned(), - code: status.as_u16(), + code: code.as_u16(), }; // error here since we check if any success aboves - error!(?err, relay_id = relay.id, event = "register_validator"); - + error!(?err, "failed registration"); return Err(err); }; + debug!(?code, latency_ms, "registration successful"); + Ok(()) } diff --git a/crates/pbs/src/mev_boost/status.rs b/crates/pbs/src/mev_boost/status.rs index 240136e..5f71359 100644 --- a/crates/pbs/src/mev_boost/status.rs +++ b/crates/pbs/src/mev_boost/status.rs @@ -1,9 +1,10 @@ -use std::time::Duration; +use std::{ops::Mul, time::Duration}; use axum::http::{HeaderMap, HeaderValue}; use cb_common::{pbs::RelayEntry, utils::get_user_agent}; use futures::future::select_ok; use reqwest::header::USER_AGENT; +use tracing::{debug, error}; use crate::{ constants::STATUS_ENDPOINT_TAG, @@ -24,9 +25,8 @@ pub async fn get_status( Ok(()) } else { // prepare headers - let ua = get_user_agent(&req_headers); let mut send_headers = HeaderMap::new(); - if let Some(ua) = ua { + if let Some(ua) = get_user_agent(&req_headers) { send_headers.insert(USER_AGENT, HeaderValue::from_str(&ua)?); } @@ -49,6 +49,7 @@ pub async fn get_status( } } +#[tracing::instrument(skip_all, name = "handler", fields(relay_id = relay.id))] async fn send_relay_check( headers: HeaderMap, relay: RelayEntry, @@ -58,18 +59,23 @@ async fn send_relay_check( let timer = RELAY_LATENCY.with_label_values(&[STATUS_ENDPOINT_TAG, &relay.id]).start_timer(); let res = client.get(url).timeout(Duration::from_secs(30)).headers(headers).send().await?; - timer.observe_duration(); + let latency_ms = timer.stop_and_record().mul(1000.0).ceil() as u64; - let status = res.status(); - RELAY_STATUS_CODE.with_label_values(&[status.as_str(), STATUS_ENDPOINT_TAG, &relay.id]).inc(); + let code = res.status(); + RELAY_STATUS_CODE.with_label_values(&[code.as_str(), STATUS_ENDPOINT_TAG, &relay.id]).inc(); let response_bytes = res.bytes().await?; - if !status.is_success() { - return Err(PbsError::RelayResponse { + if !code.is_success() { + let err = PbsError::RelayResponse { error_msg: String::from_utf8_lossy(&response_bytes).into_owned(), - code: status.as_u16(), - }) + code: code.as_u16(), + }; + + error!(?err, "status failed"); + return Err(err) }; + debug!(?code, latency_ms, "status passed"); + Ok(()) } diff --git a/crates/pbs/src/mev_boost/submit_block.rs b/crates/pbs/src/mev_boost/submit_block.rs index 07c604b..eb49ece 100644 --- a/crates/pbs/src/mev_boost/submit_block.rs +++ b/crates/pbs/src/mev_boost/submit_block.rs @@ -1,4 +1,4 @@ -use std::{sync::Arc, time::Duration}; +use std::{ops::Mul, sync::Arc, time::Duration}; use axum::http::{HeaderMap, HeaderValue}; use cb_common::{ @@ -8,7 +8,7 @@ use cb_common::{ }; use futures::future::select_ok; use reqwest::header::USER_AGENT; -use tracing::warn; +use tracing::{debug, error}; use crate::{ constants::SUBMIT_BLINDED_BLOCK_ENDPOINT_TAG, @@ -24,24 +24,14 @@ pub async fn submit_block( req_headers: HeaderMap, state: PbsState, ) -> eyre::Result { - let (slot, slot_uuid) = state.get_slot_and_uuid(); - let mut send_headers = HeaderMap::new(); - - if slot != signed_blinded_block.message.slot { - warn!( - expected = slot, - got = signed_blinded_block.message.slot, - "blinded beacon slot mismatch" - ); - } else { - send_headers.insert(HEADER_SLOT_UUID_KEY, HeaderValue::from_str(&slot_uuid.to_string())?); - } + let (_, slot_uuid) = state.get_slot_and_uuid(); // prepare headers - let ua = get_user_agent(&req_headers); + let mut send_headers = HeaderMap::new(); + send_headers.insert(HEADER_SLOT_UUID_KEY, HeaderValue::from_str(&slot_uuid.to_string())?); send_headers .insert(HEADER_START_TIME_UNIX_MS, HeaderValue::from_str(&utcnow_ms().to_string())?); - if let Some(ua) = ua { + if let Some(ua) = get_user_agent(&req_headers) { send_headers.insert(USER_AGENT, HeaderValue::from_str(&ua)?); } @@ -66,6 +56,7 @@ pub async fn submit_block( // submits blinded signed block and expects the execution payload + blobs bundle // back +#[tracing::instrument(skip_all, name = "handler", fields(relay_id = relay.id))] async fn send_submit_block( headers: HeaderMap, relay: RelayEntry, @@ -85,23 +76,33 @@ async fn send_submit_block( .json(&signed_blinded_block) .send() .await?; - timer.observe_duration(); + let latency_ms = timer.stop_and_record().mul(1000.0).ceil() as u64; - let status = res.status(); + let code = res.status(); RELAY_STATUS_CODE - .with_label_values(&[status.as_str(), SUBMIT_BLINDED_BLOCK_ENDPOINT_TAG, &relay.id]) + .with_label_values(&[code.as_str(), SUBMIT_BLINDED_BLOCK_ENDPOINT_TAG, &relay.id]) .inc(); let response_bytes = res.bytes().await?; - if !status.is_success() { - return Err(PbsError::RelayResponse { + if !code.is_success() { + let err = PbsError::RelayResponse { error_msg: String::from_utf8_lossy(&response_bytes).into_owned(), - code: status.as_u16(), - }) + code: code.as_u16(), + }; + + error!(?err, "failed submit block"); + return Err(err) }; let block_response: SubmitBlindedBlockResponse = serde_json::from_slice(&response_bytes)?; + debug!( + ?code, + latency_ms, + block_hash = %block_response.block_hash(), + "received unblinded block" + ); + if signed_blinded_block.block_hash() != block_response.block_hash() { return Err(PbsError::Validation(ValidationError::BlockHashMismatch { expected: signed_blinded_block.block_hash(), diff --git a/crates/pbs/src/routes/get_header.rs b/crates/pbs/src/routes/get_header.rs index f90c8a6..082e43a 100644 --- a/crates/pbs/src/routes/get_header.rs +++ b/crates/pbs/src/routes/get_header.rs @@ -1,3 +1,4 @@ +use alloy::primitives::utils::format_ether; use axum::{ extract::{Path, State}, http::HeaderMap, @@ -17,6 +18,7 @@ use crate::{ BuilderEvent, GetHeaderParams, }; +#[tracing::instrument(skip_all, name = "get_header", fields(req_id = %Uuid::new_v4(), slot = params.slot))] pub async fn handle_get_header>( State(state): State>, req_headers: HeaderMap, @@ -24,30 +26,32 @@ pub async fn handle_get_header>( ) -> Result { state.publish_event(BuilderEvent::GetHeaderRequest(params)); - let req_id = Uuid::new_v4(); let now = utcnow_ms(); let slot_start_ms = timestamp_of_slot_start_millis(params.slot, state.config.chain); let ua = get_user_agent(&req_headers); - info!(event = "get_header", %req_id, ?ua, slot=params.slot, parent_hash=%params.parent_hash, validator_pubkey=%params.pubkey, ms_into_slot=now.saturating_sub(slot_start_ms)); + info!(?ua, parent_hash=%params.parent_hash, validator_pubkey=%params.pubkey, ms_into_slot=now.saturating_sub(slot_start_ms)); match T::get_header(params, req_headers, state.clone()).await { Ok(res) => { state.publish_event(BuilderEvent::GetHeaderResponse(Box::new(res.clone()))); if let Some(max_bid) = res { - info!(event ="get_header", %req_id, block_hash =% max_bid.data.message.header.block_hash, "header available for slot"); + info!(block_hash =% max_bid.block_hash(), value_eth = format_ether(max_bid.value()), "received header"); + BEACON_NODE_STATUS.with_label_values(&["200", GET_HEADER_ENDPOINT_TAG]).inc(); Ok((StatusCode::OK, axum::Json(max_bid)).into_response()) } else { // spec: return 204 if request is valid but no bid available - info!(event = "get_header", %req_id, "no header available for slot"); + info!("no header available for slot"); + BEACON_NODE_STATUS.with_label_values(&["204", GET_HEADER_ENDPOINT_TAG]).inc(); Ok(StatusCode::NO_CONTENT.into_response()) } } Err(err) => { - error!(event = "get_header", %req_id, ?err, "failed relay get_header"); + error!(?err, "no header available from relays"); + let err = PbsClientError::NoPayload; BEACON_NODE_STATUS .with_label_values(&[err.status_code().as_str(), GET_HEADER_ENDPOINT_TAG]) diff --git a/crates/pbs/src/routes/register_validator.rs b/crates/pbs/src/routes/register_validator.rs index 21cea09..9bab3ae 100644 --- a/crates/pbs/src/routes/register_validator.rs +++ b/crates/pbs/src/routes/register_validator.rs @@ -2,7 +2,7 @@ use alloy::rpc::types::beacon::relay::ValidatorRegistration; use axum::{extract::State, http::HeaderMap, response::IntoResponse, Json}; use cb_common::utils::get_user_agent; use reqwest::StatusCode; -use tracing::{error, info}; +use tracing::{error, info, trace}; use uuid::Uuid; use crate::{ @@ -14,29 +14,31 @@ use crate::{ BuilderEvent, }; +#[tracing::instrument(skip_all, name = "register_validators", fields(req_id = %Uuid::new_v4()))] pub async fn handle_register_validator>( State(state): State>, req_headers: HeaderMap, Json(registrations): Json>, ) -> Result { + trace!(?registrations); state.publish_event(BuilderEvent::RegisterValidatorRequest(registrations.clone())); - let req_id = Uuid::new_v4(); let ua = get_user_agent(&req_headers); - info!(method = "register_validator", %req_id, ?ua, num_registrations=registrations.len()); + info!(?ua, num_registrations = registrations.len()); if let Err(err) = T::register_validator(registrations, req_headers, state.clone()).await { state.publish_event(BuilderEvent::RegisterValidatorResponse); + error!(?err, "all relays failed registration"); - error!(method = "register_validator", %req_id, ?err, "all relays failed register_validator"); let err = PbsClientError::NoResponse; BEACON_NODE_STATUS .with_label_values(&[err.status_code().as_str(), REGISTER_VALIDATOR_ENDPOINT_TAG]) .inc(); Err(err) } else { - info!(event = "register_validator", %req_id, "register validator successful"); + info!("register validator successful"); + BEACON_NODE_STATUS.with_label_values(&["200", REGISTER_VALIDATOR_ENDPOINT_TAG]).inc(); Ok(StatusCode::OK) } diff --git a/crates/pbs/src/routes/status.rs b/crates/pbs/src/routes/status.rs index d79ddd2..bd575e9 100644 --- a/crates/pbs/src/routes/status.rs +++ b/crates/pbs/src/routes/status.rs @@ -13,27 +13,28 @@ use crate::{ BuilderEvent, }; +#[tracing::instrument(skip_all, name = "status", fields(req_id = %Uuid::new_v4()))] pub async fn handle_get_status>( req_headers: HeaderMap, State(state): State>, ) -> Result { - let req_id = Uuid::new_v4(); - state.publish_event(BuilderEvent::GetStatusEvent); let ua = get_user_agent(&req_headers); - info!(method = "get_status", ?ua, relay_check = state.config.pbs_config.relay_check); + info!(?ua, relay_check = state.config.pbs_config.relay_check); match T::get_status(req_headers, state.clone()).await { Ok(_) => { state.publish_event(BuilderEvent::GetStatusResponse); - info!(method = "get_status", %req_id, "relay check successful"); + info!("relay check successful"); + BEACON_NODE_STATUS.with_label_values(&["200", STATUS_ENDPOINT_TAG]).inc(); Ok(StatusCode::OK) } Err(err) => { - error!(method = "get_status", %req_id, ?err, "all relays failed get_status"); + error!(?err, "all relays failed get_status"); + let err = PbsClientError::NoResponse; BEACON_NODE_STATUS .with_label_values(&[err.status_code().as_str(), STATUS_ENDPOINT_TAG]) diff --git a/crates/pbs/src/routes/submit_block.rs b/crates/pbs/src/routes/submit_block.rs index 65aca13..4c7c17f 100644 --- a/crates/pbs/src/routes/submit_block.rs +++ b/crates/pbs/src/routes/submit_block.rs @@ -1,7 +1,7 @@ use axum::{extract::State, http::HeaderMap, response::IntoResponse, Json}; use cb_common::utils::{get_user_agent, timestamp_of_slot_start_millis, utcnow_ms}; use reqwest::StatusCode; -use tracing::{error, info, warn}; +use tracing::{error, info, trace, warn}; use uuid::Uuid; use crate::{ @@ -14,16 +14,15 @@ use crate::{ BuilderEvent, }; -/// Implements https://ethereum.github.io/builder-specs/#/Builder/submitBlindedBlock -/// Returns error if the corresponding is not delivered by any relay +#[tracing::instrument(skip_all, name = "submit_blinded_block", fields(req_id = %Uuid::new_v4(), slot = signed_blinded_block.message.slot))] pub async fn handle_submit_block>( State(state): State>, req_headers: HeaderMap, Json(signed_blinded_block): Json, ) -> Result { + trace!(?signed_blinded_block); state.publish_event(BuilderEvent::SubmitBlockRequest(Box::new(signed_blinded_block.clone()))); - let req_id = Uuid::new_v4(); let now = utcnow_ms(); let slot = signed_blinded_block.message.slot; let block_hash = signed_blinded_block.message.body.execution_payload_header.block_hash; @@ -31,17 +30,17 @@ pub async fn handle_submit_block>( let ua = get_user_agent(&req_headers); let (curr_slot, slot_uuid) = state.get_slot_and_uuid(); - info!(method = "submit_block", %req_id, ?ua, slot, %slot_uuid, ms_into_slot=now.saturating_sub(slot_start_ms), %block_hash); + info!(?ua, %slot_uuid, ms_into_slot=now.saturating_sub(slot_start_ms), %block_hash); if curr_slot != signed_blinded_block.message.slot { - warn!(%req_id, expected = curr_slot, got = slot, "blinded beacon slot mismatch") + warn!(expected = curr_slot, got = slot, "blinded beacon slot mismatch") } match T::submit_block(signed_blinded_block, req_headers, state.clone()).await { Ok(res) => { + trace!(?res); state.publish_event(BuilderEvent::SubmitBlockResponse(Box::new(res.clone()))); - - info!(method="submit_block", %req_id, "received unblinded block"); + info!("received unblinded block"); BEACON_NODE_STATUS.with_label_values(&["200", SUBMIT_BLINDED_BLOCK_ENDPOINT_TAG]).inc(); Ok((StatusCode::OK, Json(res).into_response())) @@ -57,14 +56,13 @@ pub async fn handle_submit_block>( .collect::>() .join(","); - error!(method="submit_block", %req_id, ?err, %block_hash, ?fault_relays, "CRITICAL: no payload received from relays"); - + error!(?err, %block_hash, fault_relays, "CRITICAL: no payload received from relays"); state.publish_event(BuilderEvent::MissedPayload { block_hash, relays: fault_relays, }); } else { - error!(method="submit_block", %req_id, ?err, %slot_uuid, %block_hash, "CRITICAL: no payload delivered and no relay for block hash. Was getHeader even called?"); + error!(?err, %block_hash, "CRITICAL: no payload delivered and no relay for block hash. Was getHeader even called?"); state.publish_event(BuilderEvent::MissedPayload { block_hash, relays: String::default(),