Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ltitanb/improve pbs logging #38

Merged
merged 1 commit into from
Jul 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 1 addition & 6 deletions crates/common/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,12 +108,7 @@ pub fn initialize_tracing_log() {
Level::INFO
};

tracing_subscriber::fmt()
.compact()
.with_max_level(level)
.with_target(true)
.with_file(true)
.init();
tracing_subscriber::fmt().compact().with_max_level(level).with_target(true).init();
}

pub fn print_logo() {
Expand Down
49 changes: 25 additions & 24 deletions crates/pbs/src/mev_boost/get_header.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::{sync::Arc, time::Duration};
use std::{ops::Mul, sync::Arc, time::Duration};

use alloy::{
primitives::{B256, U256},
primitives::{utils::format_ether, B256, U256},
rpc::types::beacon::BlsPublicKey,
};
use axum::http::{HeaderMap, HeaderValue};
Expand All @@ -10,7 +10,7 @@ use cb_common::{
pbs::{RelayEntry, HEADER_SLOT_UUID_KEY, HEADER_START_TIME_UNIX_MS},
signature::verify_signed_builder_message,
types::Chain,
utils::{get_user_agent, utcnow_ms, wei_to_eth},
utils::{get_user_agent, utcnow_ms},
};
use futures::future::join_all;
use reqwest::{header::USER_AGENT, StatusCode};
Expand All @@ -36,12 +36,11 @@ pub async fn get_header<S: BuilderApiState>(
let slot_uuid = state.get_or_update_slot_uuid(slot);

// prepare headers
let ua = get_user_agent(&req_headers);
let mut send_headers = HeaderMap::new();
send_headers.insert(HEADER_SLOT_UUID_KEY, HeaderValue::from_str(&slot_uuid.to_string())?);
send_headers
.insert(HEADER_START_TIME_UNIX_MS, HeaderValue::from_str(&utcnow_ms().to_string())?);
if let Some(ua) = ua {
if let Some(ua) = get_user_agent(&req_headers) {
send_headers.insert(USER_AGENT, HeaderValue::from_str(&ua)?);
}

Expand Down Expand Up @@ -75,6 +74,7 @@ pub async fn get_header<S: BuilderApiState>(
Ok(state.add_bids(slot, relay_bids))
}

#[tracing::instrument(skip_all, name = "handler", fields(relay_id = relay.id))]
async fn send_get_header(
headers: HeaderMap,
slot: u64,
Expand All @@ -95,35 +95,39 @@ async fn send_get_header(
.headers(headers)
.send()
.await?;
timer.observe_duration();
let latency_ms = timer.stop_and_record().mul(1000.0).ceil() as u64;

let status = res.status();
RELAY_STATUS_CODE
.with_label_values(&[status.as_str(), GET_HEADER_ENDPOINT_TAG, &relay.id])
.inc();
let code = res.status();
RELAY_STATUS_CODE.with_label_values(&[code.as_str(), GET_HEADER_ENDPOINT_TAG, &relay.id]).inc();

let response_bytes = res.bytes().await?;
if !status.is_success() {
if !code.is_success() {
return Err(PbsError::RelayResponse {
error_msg: String::from_utf8_lossy(&response_bytes).into_owned(),
code: status.as_u16(),
code: code.as_u16(),
});
};

debug!(
method = "get_header",
relay = relay.id,
code = status.as_u16(),
response = ?response_bytes,
"received response"
);

if status == StatusCode::NO_CONTENT {
if code == StatusCode::NO_CONTENT {
debug!(
?code,
latency_ms,
response = ?response_bytes,
"no header from relay"
);
return Ok(None)
}

let get_header_response: GetHeaderReponse = serde_json::from_slice(&response_bytes)?;

debug!(
?code,
latency_ms,
block_hash = %get_header_response.block_hash(),
value_eth = format_ether(get_header_response.value()),
"received new header"
);

validate_header(
&get_header_response.data,
chain,
Expand All @@ -146,12 +150,9 @@ fn validate_header(
) -> Result<(), ValidationError> {
let block_hash = signed_header.message.header.block_hash;
let relay_pubkey = signed_header.message.pubkey;
let block_number = signed_header.message.header.block_number;
let tx_root = signed_header.message.header.transactions_root;
let value = signed_header.message.value();

debug!(block_number, %block_hash, %tx_root, value_eth=wei_to_eth(&value), "received relay bid");

if block_hash == B256::ZERO {
return Err(ValidationError::EmptyBlockhash)
}
Expand Down
28 changes: 14 additions & 14 deletions crates/pbs/src/mev_boost/register_validator.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
use std::time::Duration;
use std::{ops::Mul, time::Duration};

use alloy::rpc::types::beacon::relay::ValidatorRegistration;
use axum::http::{HeaderMap, HeaderValue};
use cb_common::{
pbs::{RelayEntry, HEADER_START_TIME_UNIX_MS},
utils::{get_user_agent, utcnow_ms},
};
use eyre::bail;
use futures::future::join_all;
use reqwest::header::USER_AGENT;
use tracing::error;
use tracing::{debug, error};

use crate::{
constants::REGISTER_VALIDATOR_ENDPOINT_TAG,
Expand All @@ -25,11 +26,10 @@ pub async fn register_validator<S: BuilderApiState>(
state: PbsState<S>,
) -> eyre::Result<()> {
// prepare headers
let ua = get_user_agent(&req_headers);
let mut send_headers = HeaderMap::new();
send_headers
.insert(HEADER_START_TIME_UNIX_MS, HeaderValue::from_str(&utcnow_ms().to_string())?);
if let Some(ua) = ua {
if let Some(ua) = get_user_agent(&req_headers) {
send_headers.insert(USER_AGENT, HeaderValue::from_str(&ua)?);
}

Expand All @@ -50,10 +50,11 @@ pub async fn register_validator<S: BuilderApiState>(
if results.iter().any(|res| res.is_ok()) {
Ok(())
} else {
Err(eyre::eyre!("No relay passed register_validator successfully"))
bail!("No relay passed register_validator successfully")
}
}

#[tracing::instrument(skip_all, name = "handler", fields(relay_id = relay.id))]
async fn send_register_validator(
headers: HeaderMap,
relay: RelayEntry,
Expand All @@ -73,27 +74,26 @@ async fn send_register_validator(
.json(&registrations)
.send()
.await?;
timer.observe_duration();
let latency_ms = timer.stop_and_record().mul(1000.0).ceil() as u64;

// TODO: send to relay monitor

let status = res.status();
let code = res.status();
RELAY_STATUS_CODE
.with_label_values(&[status.as_str(), REGISTER_VALIDATOR_ENDPOINT_TAG, &relay.id])
.with_label_values(&[code.as_str(), REGISTER_VALIDATOR_ENDPOINT_TAG, &relay.id])
.inc();

let response_bytes = res.bytes().await?;
if !status.is_success() {
if !code.is_success() {
let err = PbsError::RelayResponse {
error_msg: String::from_utf8_lossy(&response_bytes).into_owned(),
code: status.as_u16(),
code: code.as_u16(),
};

// error here since we check if any success aboves
error!(?err, relay_id = relay.id, event = "register_validator");

error!(?err, "failed registration");
return Err(err);
};

debug!(?code, latency_ms, "registration successful");

Ok(())
}
26 changes: 16 additions & 10 deletions crates/pbs/src/mev_boost/status.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
use std::time::Duration;
use std::{ops::Mul, time::Duration};

use axum::http::{HeaderMap, HeaderValue};
use cb_common::{pbs::RelayEntry, utils::get_user_agent};
use futures::future::select_ok;
use reqwest::header::USER_AGENT;
use tracing::{debug, error};

use crate::{
constants::STATUS_ENDPOINT_TAG,
Expand All @@ -24,9 +25,8 @@ pub async fn get_status<S: BuilderApiState>(
Ok(())
} else {
// prepare headers
let ua = get_user_agent(&req_headers);
let mut send_headers = HeaderMap::new();
if let Some(ua) = ua {
if let Some(ua) = get_user_agent(&req_headers) {
send_headers.insert(USER_AGENT, HeaderValue::from_str(&ua)?);
}

Expand All @@ -49,6 +49,7 @@ pub async fn get_status<S: BuilderApiState>(
}
}

#[tracing::instrument(skip_all, name = "handler", fields(relay_id = relay.id))]
async fn send_relay_check(
headers: HeaderMap,
relay: RelayEntry,
Expand All @@ -58,18 +59,23 @@ async fn send_relay_check(

let timer = RELAY_LATENCY.with_label_values(&[STATUS_ENDPOINT_TAG, &relay.id]).start_timer();
let res = client.get(url).timeout(Duration::from_secs(30)).headers(headers).send().await?;
timer.observe_duration();
let latency_ms = timer.stop_and_record().mul(1000.0).ceil() as u64;

let status = res.status();
RELAY_STATUS_CODE.with_label_values(&[status.as_str(), STATUS_ENDPOINT_TAG, &relay.id]).inc();
let code = res.status();
RELAY_STATUS_CODE.with_label_values(&[code.as_str(), STATUS_ENDPOINT_TAG, &relay.id]).inc();

let response_bytes = res.bytes().await?;
if !status.is_success() {
return Err(PbsError::RelayResponse {
if !code.is_success() {
let err = PbsError::RelayResponse {
error_msg: String::from_utf8_lossy(&response_bytes).into_owned(),
code: status.as_u16(),
})
code: code.as_u16(),
};

error!(?err, "status failed");
return Err(err)
};

debug!(?code, latency_ms, "status passed");

Ok(())
}
47 changes: 24 additions & 23 deletions crates/pbs/src/mev_boost/submit_block.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::{sync::Arc, time::Duration};
use std::{ops::Mul, sync::Arc, time::Duration};

use axum::http::{HeaderMap, HeaderValue};
use cb_common::{
Expand All @@ -8,7 +8,7 @@ use cb_common::{
};
use futures::future::select_ok;
use reqwest::header::USER_AGENT;
use tracing::warn;
use tracing::{debug, error};

use crate::{
constants::SUBMIT_BLINDED_BLOCK_ENDPOINT_TAG,
Expand All @@ -24,24 +24,14 @@ pub async fn submit_block<S: BuilderApiState>(
req_headers: HeaderMap,
state: PbsState<S>,
) -> eyre::Result<SubmitBlindedBlockResponse> {
let (slot, slot_uuid) = state.get_slot_and_uuid();
let mut send_headers = HeaderMap::new();

if slot != signed_blinded_block.message.slot {
warn!(
expected = slot,
got = signed_blinded_block.message.slot,
"blinded beacon slot mismatch"
);
} else {
send_headers.insert(HEADER_SLOT_UUID_KEY, HeaderValue::from_str(&slot_uuid.to_string())?);
}
let (_, slot_uuid) = state.get_slot_and_uuid();

// prepare headers
let ua = get_user_agent(&req_headers);
let mut send_headers = HeaderMap::new();
send_headers.insert(HEADER_SLOT_UUID_KEY, HeaderValue::from_str(&slot_uuid.to_string())?);
send_headers
.insert(HEADER_START_TIME_UNIX_MS, HeaderValue::from_str(&utcnow_ms().to_string())?);
if let Some(ua) = ua {
if let Some(ua) = get_user_agent(&req_headers) {
send_headers.insert(USER_AGENT, HeaderValue::from_str(&ua)?);
}

Expand All @@ -66,6 +56,7 @@ pub async fn submit_block<S: BuilderApiState>(

// submits blinded signed block and expects the execution payload + blobs bundle
// back
#[tracing::instrument(skip_all, name = "handler", fields(relay_id = relay.id))]
David-Petrov marked this conversation as resolved.
Show resolved Hide resolved
async fn send_submit_block(
headers: HeaderMap,
relay: RelayEntry,
Expand All @@ -85,23 +76,33 @@ async fn send_submit_block(
.json(&signed_blinded_block)
.send()
.await?;
timer.observe_duration();
let latency_ms = timer.stop_and_record().mul(1000.0).ceil() as u64;

let status = res.status();
let code = res.status();
RELAY_STATUS_CODE
.with_label_values(&[status.as_str(), SUBMIT_BLINDED_BLOCK_ENDPOINT_TAG, &relay.id])
.with_label_values(&[code.as_str(), SUBMIT_BLINDED_BLOCK_ENDPOINT_TAG, &relay.id])
.inc();

let response_bytes = res.bytes().await?;
if !status.is_success() {
return Err(PbsError::RelayResponse {
if !code.is_success() {
let err = PbsError::RelayResponse {
error_msg: String::from_utf8_lossy(&response_bytes).into_owned(),
code: status.as_u16(),
})
code: code.as_u16(),
};

error!(?err, "failed submit block");
return Err(err)
};

let block_response: SubmitBlindedBlockResponse = serde_json::from_slice(&response_bytes)?;

debug!(
?code,
latency_ms,
block_hash = %block_response.block_hash(),
"received unblinded block"
);

if signed_blinded_block.block_hash() != block_response.block_hash() {
return Err(PbsError::Validation(ValidationError::BlockHashMismatch {
expected: signed_blinded_block.block_hash(),
Expand Down
Loading