Skip to content

Commit

Permalink
Merge pull request #38 from Commit-Boost/ltitanb/improve-pbs-logging
Browse files Browse the repository at this point in the history
ltitanb/improve pbs logging
  • Loading branch information
ltitanb authored Jul 25, 2024
2 parents 9bed57f + 5002857 commit 7ff8f37
Show file tree
Hide file tree
Showing 9 changed files with 111 additions and 103 deletions.
7 changes: 1 addition & 6 deletions crates/common/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,12 +108,7 @@ pub fn initialize_tracing_log() {
Level::INFO
};

tracing_subscriber::fmt()
.compact()
.with_max_level(level)
.with_target(true)
.with_file(true)
.init();
tracing_subscriber::fmt().compact().with_max_level(level).with_target(true).init();
}

pub fn print_logo() {
Expand Down
49 changes: 25 additions & 24 deletions crates/pbs/src/mev_boost/get_header.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::{sync::Arc, time::Duration};
use std::{ops::Mul, sync::Arc, time::Duration};

use alloy::{
primitives::{B256, U256},
primitives::{utils::format_ether, B256, U256},
rpc::types::beacon::BlsPublicKey,
};
use axum::http::{HeaderMap, HeaderValue};
Expand All @@ -10,7 +10,7 @@ use cb_common::{
pbs::{RelayEntry, HEADER_SLOT_UUID_KEY, HEADER_START_TIME_UNIX_MS},
signature::verify_signed_builder_message,
types::Chain,
utils::{get_user_agent, utcnow_ms, wei_to_eth},
utils::{get_user_agent, utcnow_ms},
};
use futures::future::join_all;
use reqwest::{header::USER_AGENT, StatusCode};
Expand All @@ -36,12 +36,11 @@ pub async fn get_header<S: BuilderApiState>(
let slot_uuid = state.get_or_update_slot_uuid(slot);

// prepare headers
let ua = get_user_agent(&req_headers);
let mut send_headers = HeaderMap::new();
send_headers.insert(HEADER_SLOT_UUID_KEY, HeaderValue::from_str(&slot_uuid.to_string())?);
send_headers
.insert(HEADER_START_TIME_UNIX_MS, HeaderValue::from_str(&utcnow_ms().to_string())?);
if let Some(ua) = ua {
if let Some(ua) = get_user_agent(&req_headers) {
send_headers.insert(USER_AGENT, HeaderValue::from_str(&ua)?);
}

Expand Down Expand Up @@ -75,6 +74,7 @@ pub async fn get_header<S: BuilderApiState>(
Ok(state.add_bids(slot, relay_bids))
}

#[tracing::instrument(skip_all, name = "handler", fields(relay_id = relay.id))]
async fn send_get_header(
headers: HeaderMap,
slot: u64,
Expand All @@ -95,35 +95,39 @@ async fn send_get_header(
.headers(headers)
.send()
.await?;
timer.observe_duration();
let latency_ms = timer.stop_and_record().mul(1000.0).ceil() as u64;

let status = res.status();
RELAY_STATUS_CODE
.with_label_values(&[status.as_str(), GET_HEADER_ENDPOINT_TAG, &relay.id])
.inc();
let code = res.status();
RELAY_STATUS_CODE.with_label_values(&[code.as_str(), GET_HEADER_ENDPOINT_TAG, &relay.id]).inc();

let response_bytes = res.bytes().await?;
if !status.is_success() {
if !code.is_success() {
return Err(PbsError::RelayResponse {
error_msg: String::from_utf8_lossy(&response_bytes).into_owned(),
code: status.as_u16(),
code: code.as_u16(),
});
};

debug!(
method = "get_header",
relay = relay.id,
code = status.as_u16(),
response = ?response_bytes,
"received response"
);

if status == StatusCode::NO_CONTENT {
if code == StatusCode::NO_CONTENT {
debug!(
?code,
latency_ms,
response = ?response_bytes,
"no header from relay"
);
return Ok(None)
}

let get_header_response: GetHeaderReponse = serde_json::from_slice(&response_bytes)?;

debug!(
?code,
latency_ms,
block_hash = %get_header_response.block_hash(),
value_eth = format_ether(get_header_response.value()),
"received new header"
);

validate_header(
&get_header_response.data,
chain,
Expand All @@ -146,12 +150,9 @@ fn validate_header(
) -> Result<(), ValidationError> {
let block_hash = signed_header.message.header.block_hash;
let relay_pubkey = signed_header.message.pubkey;
let block_number = signed_header.message.header.block_number;
let tx_root = signed_header.message.header.transactions_root;
let value = signed_header.message.value();

debug!(block_number, %block_hash, %tx_root, value_eth=wei_to_eth(&value), "received relay bid");

if block_hash == B256::ZERO {
return Err(ValidationError::EmptyBlockhash)
}
Expand Down
28 changes: 14 additions & 14 deletions crates/pbs/src/mev_boost/register_validator.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
use std::time::Duration;
use std::{ops::Mul, time::Duration};

use alloy::rpc::types::beacon::relay::ValidatorRegistration;
use axum::http::{HeaderMap, HeaderValue};
use cb_common::{
pbs::{RelayEntry, HEADER_START_TIME_UNIX_MS},
utils::{get_user_agent, utcnow_ms},
};
use eyre::bail;
use futures::future::join_all;
use reqwest::header::USER_AGENT;
use tracing::error;
use tracing::{debug, error};

use crate::{
constants::REGISTER_VALIDATOR_ENDPOINT_TAG,
Expand All @@ -25,11 +26,10 @@ pub async fn register_validator<S: BuilderApiState>(
state: PbsState<S>,
) -> eyre::Result<()> {
// prepare headers
let ua = get_user_agent(&req_headers);
let mut send_headers = HeaderMap::new();
send_headers
.insert(HEADER_START_TIME_UNIX_MS, HeaderValue::from_str(&utcnow_ms().to_string())?);
if let Some(ua) = ua {
if let Some(ua) = get_user_agent(&req_headers) {
send_headers.insert(USER_AGENT, HeaderValue::from_str(&ua)?);
}

Expand All @@ -50,10 +50,11 @@ pub async fn register_validator<S: BuilderApiState>(
if results.iter().any(|res| res.is_ok()) {
Ok(())
} else {
Err(eyre::eyre!("No relay passed register_validator successfully"))
bail!("No relay passed register_validator successfully")
}
}

#[tracing::instrument(skip_all, name = "handler", fields(relay_id = relay.id))]
async fn send_register_validator(
headers: HeaderMap,
relay: RelayEntry,
Expand All @@ -73,27 +74,26 @@ async fn send_register_validator(
.json(&registrations)
.send()
.await?;
timer.observe_duration();
let latency_ms = timer.stop_and_record().mul(1000.0).ceil() as u64;

// TODO: send to relay monitor

let status = res.status();
let code = res.status();
RELAY_STATUS_CODE
.with_label_values(&[status.as_str(), REGISTER_VALIDATOR_ENDPOINT_TAG, &relay.id])
.with_label_values(&[code.as_str(), REGISTER_VALIDATOR_ENDPOINT_TAG, &relay.id])
.inc();

let response_bytes = res.bytes().await?;
if !status.is_success() {
if !code.is_success() {
let err = PbsError::RelayResponse {
error_msg: String::from_utf8_lossy(&response_bytes).into_owned(),
code: status.as_u16(),
code: code.as_u16(),
};

// error here since we check if any success aboves
error!(?err, relay_id = relay.id, event = "register_validator");

error!(?err, "failed registration");
return Err(err);
};

debug!(?code, latency_ms, "registration successful");

Ok(())
}
26 changes: 16 additions & 10 deletions crates/pbs/src/mev_boost/status.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
use std::time::Duration;
use std::{ops::Mul, time::Duration};

use axum::http::{HeaderMap, HeaderValue};
use cb_common::{pbs::RelayEntry, utils::get_user_agent};
use futures::future::select_ok;
use reqwest::header::USER_AGENT;
use tracing::{debug, error};

use crate::{
constants::STATUS_ENDPOINT_TAG,
Expand All @@ -24,9 +25,8 @@ pub async fn get_status<S: BuilderApiState>(
Ok(())
} else {
// prepare headers
let ua = get_user_agent(&req_headers);
let mut send_headers = HeaderMap::new();
if let Some(ua) = ua {
if let Some(ua) = get_user_agent(&req_headers) {
send_headers.insert(USER_AGENT, HeaderValue::from_str(&ua)?);
}

Expand All @@ -49,6 +49,7 @@ pub async fn get_status<S: BuilderApiState>(
}
}

#[tracing::instrument(skip_all, name = "handler", fields(relay_id = relay.id))]
async fn send_relay_check(
headers: HeaderMap,
relay: RelayEntry,
Expand All @@ -58,18 +59,23 @@ async fn send_relay_check(

let timer = RELAY_LATENCY.with_label_values(&[STATUS_ENDPOINT_TAG, &relay.id]).start_timer();
let res = client.get(url).timeout(Duration::from_secs(30)).headers(headers).send().await?;
timer.observe_duration();
let latency_ms = timer.stop_and_record().mul(1000.0).ceil() as u64;

let status = res.status();
RELAY_STATUS_CODE.with_label_values(&[status.as_str(), STATUS_ENDPOINT_TAG, &relay.id]).inc();
let code = res.status();
RELAY_STATUS_CODE.with_label_values(&[code.as_str(), STATUS_ENDPOINT_TAG, &relay.id]).inc();

let response_bytes = res.bytes().await?;
if !status.is_success() {
return Err(PbsError::RelayResponse {
if !code.is_success() {
let err = PbsError::RelayResponse {
error_msg: String::from_utf8_lossy(&response_bytes).into_owned(),
code: status.as_u16(),
})
code: code.as_u16(),
};

error!(?err, "status failed");
return Err(err)
};

debug!(?code, latency_ms, "status passed");

Ok(())
}
47 changes: 24 additions & 23 deletions crates/pbs/src/mev_boost/submit_block.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::{sync::Arc, time::Duration};
use std::{ops::Mul, sync::Arc, time::Duration};

use axum::http::{HeaderMap, HeaderValue};
use cb_common::{
Expand All @@ -8,7 +8,7 @@ use cb_common::{
};
use futures::future::select_ok;
use reqwest::header::USER_AGENT;
use tracing::warn;
use tracing::{debug, error};

use crate::{
constants::SUBMIT_BLINDED_BLOCK_ENDPOINT_TAG,
Expand All @@ -24,24 +24,14 @@ pub async fn submit_block<S: BuilderApiState>(
req_headers: HeaderMap,
state: PbsState<S>,
) -> eyre::Result<SubmitBlindedBlockResponse> {
let (slot, slot_uuid) = state.get_slot_and_uuid();
let mut send_headers = HeaderMap::new();

if slot != signed_blinded_block.message.slot {
warn!(
expected = slot,
got = signed_blinded_block.message.slot,
"blinded beacon slot mismatch"
);
} else {
send_headers.insert(HEADER_SLOT_UUID_KEY, HeaderValue::from_str(&slot_uuid.to_string())?);
}
let (_, slot_uuid) = state.get_slot_and_uuid();

// prepare headers
let ua = get_user_agent(&req_headers);
let mut send_headers = HeaderMap::new();
send_headers.insert(HEADER_SLOT_UUID_KEY, HeaderValue::from_str(&slot_uuid.to_string())?);
send_headers
.insert(HEADER_START_TIME_UNIX_MS, HeaderValue::from_str(&utcnow_ms().to_string())?);
if let Some(ua) = ua {
if let Some(ua) = get_user_agent(&req_headers) {
send_headers.insert(USER_AGENT, HeaderValue::from_str(&ua)?);
}

Expand All @@ -66,6 +56,7 @@ pub async fn submit_block<S: BuilderApiState>(

// submits blinded signed block and expects the execution payload + blobs bundle
// back
#[tracing::instrument(skip_all, name = "handler", fields(relay_id = relay.id))]
async fn send_submit_block(
headers: HeaderMap,
relay: RelayEntry,
Expand All @@ -85,23 +76,33 @@ async fn send_submit_block(
.json(&signed_blinded_block)
.send()
.await?;
timer.observe_duration();
let latency_ms = timer.stop_and_record().mul(1000.0).ceil() as u64;

let status = res.status();
let code = res.status();
RELAY_STATUS_CODE
.with_label_values(&[status.as_str(), SUBMIT_BLINDED_BLOCK_ENDPOINT_TAG, &relay.id])
.with_label_values(&[code.as_str(), SUBMIT_BLINDED_BLOCK_ENDPOINT_TAG, &relay.id])
.inc();

let response_bytes = res.bytes().await?;
if !status.is_success() {
return Err(PbsError::RelayResponse {
if !code.is_success() {
let err = PbsError::RelayResponse {
error_msg: String::from_utf8_lossy(&response_bytes).into_owned(),
code: status.as_u16(),
})
code: code.as_u16(),
};

error!(?err, "failed submit block");
return Err(err)
};

let block_response: SubmitBlindedBlockResponse = serde_json::from_slice(&response_bytes)?;

debug!(
?code,
latency_ms,
block_hash = %block_response.block_hash(),
"received unblinded block"
);

if signed_blinded_block.block_hash() != block_response.block_hash() {
return Err(PbsError::Validation(ValidationError::BlockHashMismatch {
expected: signed_blinded_block.block_hash(),
Expand Down
Loading

0 comments on commit 7ff8f37

Please sign in to comment.