Skip to content

Commit

Permalink
[Traffic Control] Improved error metrics and benchmarking (#20134)
Browse files Browse the repository at this point in the history
## Description 

* Added error categorization to the error metrics
* Added trace logging
* Added an invariant check that the x-forwarded-for header contains at
least as many values as the hop count set in the associated node config

## Test plan 

How did you test the new or updated feature?

Ran on private testnet many times
---

## Release notes

Check each box that your changes affect. If none of the boxes relate to
your changes, release notes aren't required.

For each box you select, include information after the relevant heading
that describes the impact of your changes that a user might notice and
any actions they must take to implement updates.

- [ ] Protocol: 
- [ ] Nodes (Validators and Full nodes): 
- [ ] Indexer: 
- [ ] JSON-RPC: 
- [ ] GraphQL: 
- [ ] CLI: 
- [ ] Rust SDK:
- [ ] REST API:
  • Loading branch information
williampsmith authored Nov 6, 2024
1 parent 5c97430 commit 59786ac
Show file tree
Hide file tree
Showing 6 changed files with 99 additions and 36 deletions.
32 changes: 29 additions & 3 deletions crates/sui-core/src/authority_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,7 @@ pub struct ValidatorServiceMetrics {
forwarded_header_parse_error: IntCounter,
forwarded_header_invalid: IntCounter,
forwarded_header_not_included: IntCounter,
client_id_source_config_mismatch: IntCounter,
}

impl ValidatorServiceMetrics {
Expand Down Expand Up @@ -329,6 +330,12 @@ impl ValidatorServiceMetrics {
registry,
)
.unwrap(),
client_id_source_config_mismatch: register_int_counter_with_registry!(
"validator_service_client_id_source_config_mismatch",
"Number of times detected that client id source config doesn't agree with x-forwarded-for header",
registry,
)
.unwrap(),
}
}

Expand Down Expand Up @@ -1225,6 +1232,19 @@ impl ValidatorService {
return None;
}
let contents_len = header_contents.len();
// Invariant: the header must carry at least `num_hops` values. With fewer,
// the `contents_len - num_hops` index used for the client IP lookup cannot be
// computed, so treat it as a node misconfiguration: log, count, and bail out.
if contents_len < *num_hops {
    error!(
        "x-forwarded-for header value of {:?} contains {} values, but {} hops were specified. \
        Expected at least {} values. Please correctly set the `x-forwarded-for` value under \
        `client-id-source` in the node config.",
        header_contents,
        contents_len,
        num_hops,
        // The expected minimum is the configured hop count, not the observed length.
        num_hops,
    );
    self.metrics.client_id_source_config_mismatch.inc();
    return None;
}
let Some(client_ip) = header_contents.get(contents_len - num_hops)
else {
error!(
Expand Down Expand Up @@ -1296,7 +1316,11 @@ impl ValidatorService {
traffic_controller.tally(TrafficTally {
direct: client,
through_fullnode: None,
error_weight: error.map(normalize).unwrap_or(Weight::zero()),
error_info: error.map(|e| {
let error_type = String::from(e.clone().as_ref());
let error_weight = normalize(e);
(error_weight, error_type)
}),
spam_weight,
timestamp: SystemTime::now(),
})
Expand All @@ -1320,8 +1344,10 @@ fn make_tonic_request_for_testing<T>(message: T) -> tonic::Request<T> {
// TODO: refine error matching here
fn normalize(err: SuiError) -> Weight {
match err {
SuiError::UserInputError { .. }
| SuiError::InvalidSignature { .. }
SuiError::UserInputError {
error: UserInputError::IncorrectUserSignature { .. },
} => Weight::one(),
SuiError::InvalidSignature { .. }
| SuiError::SignerSignatureAbsent { .. }
| SuiError::SignerSignatureNumberMismatch { .. }
| SuiError::IncorrectSigner { .. }
Expand Down
12 changes: 10 additions & 2 deletions crates/sui-core/src/traffic_controller/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@
// SPDX-License-Identifier: Apache-2.0

use prometheus::{
register_int_counter_with_registry, register_int_gauge_with_registry, IntCounter, IntGauge,
Registry,
register_int_counter_vec_with_registry, register_int_counter_with_registry,
register_int_gauge_with_registry, IntCounter, IntCounterVec, IntGauge, Registry,
};

#[derive(Clone)]
Expand All @@ -18,6 +18,7 @@ pub struct TrafficControllerMetrics {
pub num_dry_run_blocked_requests: IntCounter,
pub tally_handled: IntCounter,
pub error_tally_handled: IntCounter,
pub tally_error_types: IntCounterVec,
pub deadmans_switch_enabled: IntGauge,
pub highest_direct_spam_rate: IntGauge,
pub highest_proxied_spam_rate: IntGauge,
Expand Down Expand Up @@ -90,6 +91,13 @@ impl TrafficControllerMetrics {
registry
)
.unwrap(),
tally_error_types: register_int_counter_vec_with_registry!(
"traffic_control_tally_error_types",
"Number of tally errors, grouped by error type",
&["error_type"],
registry
)
.unwrap(),
deadmans_switch_enabled: register_int_gauge_with_registry!(
"deadmans_switch_enabled",
"If 1, the deadman's switch is enabled and all traffic control
Expand Down
28 changes: 21 additions & 7 deletions crates/sui-core/src/traffic_controller/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -378,13 +378,13 @@ async fn run_tally_loop(
metrics
.highest_direct_spam_rate
.set(highest_direct_rate.0 as i64);
trace!("Recent highest direct spam rate: {:?}", highest_direct_rate);
debug!("Recent highest direct spam rate: {:?}", highest_direct_rate);
}
if let Some(highest_proxied_rate) = spam_policy.highest_proxied_rate() {
metrics
.highest_proxied_spam_rate
.set(highest_proxied_rate.0 as i64);
trace!(
debug!(
"Recent highest proxied spam rate: {:?}",
highest_proxied_rate
);
Expand All @@ -395,7 +395,7 @@ async fn run_tally_loop(
metrics
.highest_direct_error_rate
.set(highest_direct_rate.0 as i64);
trace!(
debug!(
"Recent highest direct error rate: {:?}",
highest_direct_rate
);
Expand All @@ -404,7 +404,7 @@ async fn run_tally_loop(
metrics
.highest_proxied_error_rate
.set(highest_proxied_rate.0 as i64);
trace!(
debug!(
"Recent highest proxied error rate: {:?}",
highest_proxied_rate
);
Expand All @@ -425,10 +425,22 @@ async fn handle_error_tally(
metrics: Arc<TrafficControllerMetrics>,
mem_drainfile_present: bool,
) -> Result<(), reqwest::Error> {
if !tally.error_weight.is_sampled() {
let Some((error_weight, error_type)) = tally.clone().error_info else {
return Ok(());
};
if !error_weight.is_sampled() {
return Ok(());
}
let resp = policy.handle_tally(tally.clone());
trace!(
"Handling error_type {:?} from client {:?}",
error_type,
tally.direct,
);
metrics
.tally_error_types
.with_label_values(&[error_type.as_str()])
.inc();
let resp = policy.handle_tally(tally);
metrics.error_tally_handled.inc();
if let Some(fw_config) = fw_config {
if fw_config.delegate_error_blocking && !mem_drainfile_present {
Expand Down Expand Up @@ -509,6 +521,7 @@ async fn handle_policy_response(
{
// Only increment the metric if the client was not already blocked
debug!("Blocking client: {:?}", client);
metrics.requests_blocked_at_protocol.inc();
metrics.connection_ip_blocklist_len.inc();
}
}
Expand All @@ -523,6 +536,7 @@ async fn handle_policy_response(
{
// Only increment the metric if the client was not already blocked
debug!("Blocking proxied client: {:?}", client);
metrics.requests_blocked_at_protocol.inc();
metrics.proxy_ip_blocklist_len.inc();
}
}
Expand Down Expand Up @@ -745,7 +759,7 @@ impl TrafficSim {
// TODO add proxy IP for testing
None,
// TODO add weight adjustments
Weight::one(),
None,
Weight::one(),
));
} else {
Expand Down
23 changes: 15 additions & 8 deletions crates/sui-core/src/traffic_controller/policies.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use std::hash::Hash;
use std::time::Duration;
use std::time::{Instant, SystemTime};
use sui_types::traffic_control::{FreqThresholdConfig, PolicyConfig, PolicyType, Weight};
use tracing::info;
use tracing::{info, trace};

const HIGHEST_RATES_CAPACITY: usize = 20;

Expand Down Expand Up @@ -222,7 +222,7 @@ impl TrafficSketch {
pub struct TrafficTally {
pub direct: Option<IpAddr>,
pub through_fullnode: Option<IpAddr>,
pub error_weight: Weight,
pub error_info: Option<(Weight, String)>,
pub spam_weight: Weight,
pub timestamp: SystemTime,
}
Expand All @@ -231,13 +231,13 @@ impl TrafficTally {
pub fn new(
direct: Option<IpAddr>,
through_fullnode: Option<IpAddr>,
error_weight: Weight,
error_info: Option<(Weight, String)>,
spam_weight: Weight,
) -> Self {
Self {
direct,
through_fullnode,
error_weight,
error_info,
spam_weight,
timestamp: SystemTime::now(),
}
Expand Down Expand Up @@ -360,7 +360,14 @@ impl FreqThresholdPolicy {
let block_client = if let Some(source) = tally.direct {
let key = SketchKey(source, ClientType::Direct);
self.sketch.increment_count(&key);
if self.sketch.get_request_rate(&key) >= self.client_threshold as f64 {
let req_rate = self.sketch.get_request_rate(&key);
trace!(
"FreqThresholdPolicy handling tally -- req_rate: {:?}, client_threshold: {:?}, client: {:?}",
req_rate,
self.client_threshold,
source,
);
if req_rate >= self.client_threshold as f64 {
Some(source)
} else {
None
Expand Down Expand Up @@ -515,21 +522,21 @@ mod tests {
let alice = TrafficTally {
direct: Some(IpAddr::V4(Ipv4Addr::new(8, 7, 6, 5))),
through_fullnode: Some(IpAddr::V4(Ipv4Addr::new(1, 2, 3, 4))),
error_weight: Weight::zero(),
error_info: None,
spam_weight: Weight::one(),
timestamp: SystemTime::now(),
};
let bob = TrafficTally {
direct: Some(IpAddr::V4(Ipv4Addr::new(8, 7, 6, 5))),
through_fullnode: Some(IpAddr::V4(Ipv4Addr::new(4, 3, 2, 1))),
error_weight: Weight::zero(),
error_info: None,
spam_weight: Weight::one(),
timestamp: SystemTime::now(),
};
let charlie = TrafficTally {
direct: Some(IpAddr::V4(Ipv4Addr::new(8, 7, 6, 5))),
through_fullnode: Some(IpAddr::V4(Ipv4Addr::new(5, 6, 7, 8))),
error_weight: Weight::zero(),
error_info: None,
spam_weight: Weight::one(),
timestamp: SystemTime::now(),
};
Expand Down
34 changes: 19 additions & 15 deletions crates/sui-e2e-tests/tests/traffic_control_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@ use sui_network::default_mysten_network_config;
use sui_swarm_config::network_config_builder::ConfigBuilder;
use sui_test_transaction_builder::batch_make_transfer_transactions;
use sui_types::{
crypto::Ed25519SuiSignature,
quorum_driver_types::ExecuteTransactionRequestType,
signature::GenericSignature,
traffic_control::{
FreqThresholdConfig, PolicyConfig, PolicyType, RemoteFirewallConfig, Weight,
},
Expand Down Expand Up @@ -225,7 +227,7 @@ async fn test_validator_traffic_control_error_blocked() -> Result<(), anyhow::Er
.with_policy_config(Some(policy_config))
.build();
let committee = network_config.committee_with_network();
let _test_cluster = TestClusterBuilder::new()
let test_cluster = TestClusterBuilder::new()
.set_network_config(network_config)
.build()
.await;
Expand All @@ -235,12 +237,13 @@ async fn test_validator_traffic_control_error_blocked() -> Result<(), anyhow::Er
);
let (_, auth_client) = local_clients.first_key_value().unwrap();

// transaction signed using user wallet from a different chain/genesis,
// therefore we should fail with UserInputError
let other_cluster = TestClusterBuilder::new().build().await;

let mut txns = batch_make_transfer_transactions(&other_cluster.wallet, n as usize).await;
let tx = txns.swap_remove(0);
let mut txns = batch_make_transfer_transactions(&test_cluster.wallet, n as usize).await;
let mut tx = txns.swap_remove(0);
let signatures = tx.tx_signatures_mut_for_testing();
signatures.pop();
signatures.push(GenericSignature::Signature(
sui_types::crypto::Signature::Ed25519SuiSignature(Ed25519SuiSignature::default()),
));

// it should take no more than 4 requests to be added to the blocklist
for _ in 0..n {
Expand All @@ -251,7 +254,7 @@ async fn test_validator_traffic_control_error_blocked() -> Result<(), anyhow::Er
}
}
}
panic!("Expected spam policy to trigger within {n} requests");
panic!("Expected error policy to trigger within {n} requests");
}

#[tokio::test]
Expand Down Expand Up @@ -406,7 +409,7 @@ async fn test_validator_traffic_control_error_delegated() -> Result<(), anyhow::
.with_firewall_config(Some(firewall_config))
.build();
let committee = network_config.committee_with_network();
let _test_cluster = TestClusterBuilder::new()
let test_cluster = TestClusterBuilder::new()
.set_network_config(network_config)
.build()
.await;
Expand All @@ -416,12 +419,13 @@ async fn test_validator_traffic_control_error_delegated() -> Result<(), anyhow::
);
let (_, auth_client) = local_clients.first_key_value().unwrap();

// transaction signed using user wallet from a different chain/genesis,
// therefore we should fail with UserInputError
let other_cluster = TestClusterBuilder::new().build().await;

let mut txns = batch_make_transfer_transactions(&other_cluster.wallet, n as usize).await;
let tx = txns.swap_remove(0);
let mut txns = batch_make_transfer_transactions(&test_cluster.wallet, n as usize).await;
let mut tx = txns.swap_remove(0);
let signatures = tx.tx_signatures_mut_for_testing();
signatures.pop();
signatures.push(GenericSignature::Signature(
sui_types::crypto::Signature::Ed25519SuiSignature(Ed25519SuiSignature::default()),
));

// start test firewall server
let mut server = NodeFwTestServer::new();
Expand Down
6 changes: 5 additions & 1 deletion crates/sui-json-rpc/src/axum_router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,11 @@ fn handle_traffic_resp(
traffic_controller.tally(TrafficTally {
direct: client,
through_fullnode: None,
error_weight: error.map(normalize).unwrap_or(Weight::zero()),
error_info: error.map(|e| {
let error_type = e.to_string();
let error_weight = normalize(e);
(error_weight, error_type)
}),
// For now, count everything as spam with equal weight
// on the rpc node side, including gas-charging endpoints
// such as `sui_executeTransactionBlock`, as this can enable
Expand Down

0 comments on commit 59786ac

Please sign in to comment.