Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

make QUIC tpu QOS parameters configurable #4170

Merged
merged 10 commits into from
Jan 15, 2025
2 changes: 1 addition & 1 deletion bench-tps/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use {
pubkey::Pubkey,
signature::{read_keypair_file, Keypair},
},
solana_streamer::nonblocking::quic::DEFAULT_MAX_CONNECTIONS_PER_IPADDR_PER_MINUTE,
solana_streamer::quic::DEFAULT_MAX_CONNECTIONS_PER_IPADDR_PER_MINUTE,
solana_tpu_client::tpu_client::{DEFAULT_TPU_CONNECTION_POOL_SIZE, DEFAULT_TPU_USE_QUIC},
std::{
net::{IpAddr, Ipv4Addr},
Expand Down
45 changes: 13 additions & 32 deletions core/src/tpu.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,12 @@
//! multi-stage transaction processing pipeline in software.

pub use solana_sdk::net::DEFAULT_TPU_COALESCE;
// allow multiple connections for NAT and any open/close overlap
#[deprecated(
since = "2.2.0",
note = "Use solana_streamer::quic::DEFAULT_MAX_QUIC_CONNECTIONS_PER_PEER instead"
)]
pub use solana_streamer::quic::DEFAULT_MAX_QUIC_CONNECTIONS_PER_PEER as MAX_QUIC_CONNECTIONS_PER_PEER;
use {
crate::{
banking_stage::BankingStage,
Expand Down Expand Up @@ -37,10 +43,7 @@ use {
},
solana_sdk::{clock::Slot, pubkey::Pubkey, quic::NotifyKeyUpdate, signature::Keypair},
solana_streamer::{
quic::{
spawn_server_multi, QuicServerParams, SpawnServerResult, MAX_STAKED_CONNECTIONS,
MAX_UNSTAKED_CONNECTIONS,
},
quic::{spawn_server_multi, QuicServerParams, SpawnServerResult},
streamer::StakedNodes,
},
solana_turbine::broadcast_stage::{BroadcastStage, BroadcastStageType},
Expand All @@ -54,9 +57,6 @@ use {
tokio::sync::mpsc::Sender as AsyncSender,
};

// allow multiple connections for NAT and any open/close overlap
pub const MAX_QUIC_CONNECTIONS_PER_PEER: usize = 8;
lijunwangs marked this conversation as resolved.
Show resolved Hide resolved

pub struct TpuSockets {
pub transactions: Vec<UdpSocket>,
pub transaction_forwards: Vec<UdpSocket>,
Expand Down Expand Up @@ -115,7 +115,9 @@ impl Tpu {
banking_tracer: Arc<BankingTracer>,
tracer_thread_hdl: TracerThread,
tpu_enable_udp: bool,
tpu_max_connections_per_ipaddr_per_minute: u64,
tpu_quic_server_config: QuicServerParams,
tpu_fwd_quic_server_config: QuicServerParams,
vote_quic_server_config: QuicServerParams,
prioritization_fee_cache: &Arc<PrioritizationFeeCache>,
block_production_method: BlockProductionMethod,
enable_block_production_forwarding: bool,
Expand Down Expand Up @@ -178,15 +180,7 @@ impl Tpu {
vote_packet_sender.clone(),
exit.clone(),
staked_nodes.clone(),
QuicServerParams {
max_connections_per_peer: 1,
max_connections_per_ipaddr_per_min: tpu_max_connections_per_ipaddr_per_minute,
coalesce: tpu_coalesce,
max_staked_connections: MAX_STAKED_CONNECTIONS
.saturating_add(MAX_UNSTAKED_CONNECTIONS),
max_unstaked_connections: 0,
..QuicServerParams::default()
},
vote_quic_server_config,
)
.unwrap();

Expand All @@ -203,12 +197,7 @@ impl Tpu {
packet_sender,
exit.clone(),
staked_nodes.clone(),
QuicServerParams {
max_connections_per_peer: MAX_QUIC_CONNECTIONS_PER_PEER,
max_connections_per_ipaddr_per_min: tpu_max_connections_per_ipaddr_per_minute,
coalesce: tpu_coalesce,
..QuicServerParams::default()
},
tpu_quic_server_config,
)
.unwrap();

Expand All @@ -225,15 +214,7 @@ impl Tpu {
forwarded_packet_sender,
exit.clone(),
staked_nodes.clone(),
QuicServerParams {
max_connections_per_peer: MAX_QUIC_CONNECTIONS_PER_PEER,
max_staked_connections: MAX_STAKED_CONNECTIONS
.saturating_add(MAX_UNSTAKED_CONNECTIONS),
max_unstaked_connections: 0, // Prevent unstaked nodes from forwarding transactions
max_connections_per_ipaddr_per_min: tpu_max_connections_per_ipaddr_per_minute,
coalesce: tpu_coalesce,
..QuicServerParams::default()
},
tpu_fwd_quic_server_config,
)
.unwrap();

Expand Down
72 changes: 49 additions & 23 deletions core/src/validator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,10 @@ use {
timing::timestamp,
},
solana_send_transaction_service::send_transaction_service,
solana_streamer::{socket::SocketAddrSpace, streamer::StakedNodes},
solana_streamer::{quic::QuicServerParams, socket::SocketAddrSpace, streamer::StakedNodes},
solana_tpu_client::tpu_client::{
DEFAULT_TPU_CONNECTION_POOL_SIZE, DEFAULT_TPU_USE_QUIC, DEFAULT_VOTE_USE_QUIC,
},
solana_turbine::{self, broadcast_stage::BroadcastStageType},
solana_unified_scheduler_pool::DefaultSchedulerPool,
solana_vote_program::vote_state,
Expand Down Expand Up @@ -483,8 +486,42 @@ pub struct ValidatorTpuConfig {
pub tpu_connection_pool_size: usize,
/// Controls if to enable UDP for TPU tansactions.
pub tpu_enable_udp: bool,
/// Controls the new maximum connections per IpAddr per minute
pub tpu_max_connections_per_ipaddr_per_minute: u64,
/// QUIC server config for regular TPU
pub tpu_quic_server_config: QuicServerParams,
/// QUIC server config for TPU forward
pub tpu_fwd_quic_server_config: QuicServerParams,
/// QUIC server config for Vote
pub vote_quic_server_config: QuicServerParams,
}

impl ValidatorTpuConfig {
/// A convenient function to build a ValidatorTpuConfig for testing with good
/// default.
pub fn new_for_tests(tpu_enable_udp: bool) -> Self {
let tpu_quic_server_config = QuicServerParams {
max_connections_per_ipaddr_per_min: 32,
..Default::default()
};

let tpu_fwd_quic_server_config = QuicServerParams {
max_connections_per_ipaddr_per_min: 32,
max_unstaked_connections: 0,
..Default::default()
};

// vote and tpu_fwd share the same characteristics -- disallow non-staked connections:
let vote_quic_server_config = tpu_fwd_quic_server_config.clone();

ValidatorTpuConfig {
use_quic: DEFAULT_TPU_USE_QUIC,
vote_use_quic: DEFAULT_VOTE_USE_QUIC,
tpu_connection_pool_size: DEFAULT_TPU_CONNECTION_POOL_SIZE,
tpu_enable_udp,
tpu_quic_server_config,
tpu_fwd_quic_server_config,
vote_quic_server_config,
}
}
}

pub struct Validator {
Expand Down Expand Up @@ -546,7 +583,9 @@ impl Validator {
vote_use_quic,
tpu_connection_pool_size,
tpu_enable_udp,
tpu_max_connections_per_ipaddr_per_minute,
tpu_quic_server_config,
tpu_fwd_quic_server_config,
vote_quic_server_config,
} = tpu_config;

let start_time = Instant::now();
Expand Down Expand Up @@ -1521,7 +1560,9 @@ impl Validator {
banking_tracer,
tracer_thread,
tpu_enable_udp,
tpu_max_connections_per_ipaddr_per_minute,
tpu_quic_server_config,
tpu_fwd_quic_server_config,
vote_quic_server_config,
&prioritization_fee_cache,
config.block_production_method.clone(),
config.enable_block_production_forwarding,
Expand Down Expand Up @@ -2723,10 +2764,7 @@ mod tests {
get_tmp_ledger_path_auto_delete,
},
solana_sdk::{genesis_config::create_genesis_config, poh_config::PohConfig},
solana_tpu_client::tpu_client::{
DEFAULT_TPU_CONNECTION_POOL_SIZE, DEFAULT_TPU_ENABLE_UDP, DEFAULT_TPU_USE_QUIC,
DEFAULT_VOTE_USE_QUIC,
},
solana_tpu_client::tpu_client::DEFAULT_TPU_ENABLE_UDP,
std::{fs::remove_dir_all, thread, time::Duration},
};

Expand Down Expand Up @@ -2764,13 +2802,7 @@ mod tests {
None, // rpc_to_plugin_manager_receiver
start_progress.clone(),
SocketAddrSpace::Unspecified,
ValidatorTpuConfig {
use_quic: DEFAULT_TPU_USE_QUIC,
vote_use_quic: DEFAULT_VOTE_USE_QUIC,
tpu_connection_pool_size: DEFAULT_TPU_CONNECTION_POOL_SIZE,
tpu_enable_udp: DEFAULT_TPU_ENABLE_UDP,
tpu_max_connections_per_ipaddr_per_minute: 32, // max connections per IpAddr per minute for test
},
ValidatorTpuConfig::new_for_tests(DEFAULT_TPU_ENABLE_UDP),
Arc::new(RwLock::new(None)),
)
.expect("assume successful validator start");
Expand Down Expand Up @@ -2986,13 +3018,7 @@ mod tests {
None, // rpc_to_plugin_manager_receiver
Arc::new(RwLock::new(ValidatorStartProgress::default())),
SocketAddrSpace::Unspecified,
ValidatorTpuConfig {
use_quic: DEFAULT_TPU_USE_QUIC,
vote_use_quic: DEFAULT_VOTE_USE_QUIC,
tpu_connection_pool_size: DEFAULT_TPU_CONNECTION_POOL_SIZE,
tpu_enable_udp: DEFAULT_TPU_ENABLE_UDP,
tpu_max_connections_per_ipaddr_per_minute: 32, // max connections per IpAddr per minute for test
},
ValidatorTpuConfig::new_for_tests(DEFAULT_TPU_ENABLE_UDP),
Arc::new(RwLock::new(None)),
)
.expect("assume successful validator start")
Expand Down
28 changes: 5 additions & 23 deletions local-cluster/src/local_cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -341,15 +341,9 @@ impl LocalCluster {
None, // rpc_to_plugin_manager_receiver
Arc::new(RwLock::new(ValidatorStartProgress::default())),
socket_addr_space,
ValidatorTpuConfig {
use_quic: DEFAULT_TPU_USE_QUIC,
vote_use_quic: DEFAULT_VOTE_USE_QUIC,
tpu_connection_pool_size: DEFAULT_TPU_CONNECTION_POOL_SIZE,
// We are turning tpu_enable_udp to true in order to prevent concurrent local cluster tests
// to use the same QUIC ports due to SO_REUSEPORT.
tpu_enable_udp: true,
tpu_max_connections_per_ipaddr_per_minute: 32, // max connections per IpAddr per minute
},
// We are turning tpu_enable_udp to true in order to prevent concurrent local cluster tests
// to use the same QUIC ports due to SO_REUSEPORT.
ValidatorTpuConfig::new_for_tests(true),
Arc::new(RwLock::new(None)),
)
.expect("assume successful validator start");
Expand Down Expand Up @@ -553,13 +547,7 @@ impl LocalCluster {
None, // rpc_to_plugin_manager_receiver
Arc::new(RwLock::new(ValidatorStartProgress::default())),
socket_addr_space,
ValidatorTpuConfig {
use_quic: DEFAULT_TPU_USE_QUIC,
vote_use_quic: DEFAULT_VOTE_USE_QUIC,
tpu_connection_pool_size: DEFAULT_TPU_CONNECTION_POOL_SIZE,
tpu_enable_udp: DEFAULT_TPU_ENABLE_UDP,
tpu_max_connections_per_ipaddr_per_minute: 32, // max connections per IpAddr per mintute
},
ValidatorTpuConfig::new_for_tests(DEFAULT_TPU_ENABLE_UDP),
Arc::new(RwLock::new(None)),
)
.expect("assume successful validator start");
Expand Down Expand Up @@ -1089,13 +1077,7 @@ impl Cluster for LocalCluster {
None, // rpc_to_plugin_manager_receiver
Arc::new(RwLock::new(ValidatorStartProgress::default())),
socket_addr_space,
ValidatorTpuConfig {
use_quic: DEFAULT_TPU_USE_QUIC,
vote_use_quic: DEFAULT_VOTE_USE_QUIC,
tpu_connection_pool_size: DEFAULT_TPU_CONNECTION_POOL_SIZE,
tpu_enable_udp: DEFAULT_TPU_ENABLE_UDP,
tpu_max_connections_per_ipaddr_per_minute: 32, // max connections per IpAddr per minute, use higher value because of tests
},
ValidatorTpuConfig::new_for_tests(DEFAULT_TPU_ENABLE_UDP),
Arc::new(RwLock::new(None)),
)
.expect("assume successful validator start");
Expand Down
15 changes: 11 additions & 4 deletions streamer/src/nonblocking/quic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,13 +87,20 @@ const CONNECTION_CLOSE_REASON_TOO_MANY: &[u8] = b"too_many";
const CONNECTION_CLOSE_CODE_INVALID_STREAM: u32 = 5;
const CONNECTION_CLOSE_REASON_INVALID_STREAM: &[u8] = b"invalid_stream";

/// Limit to 250K PPS
lijunwangs marked this conversation as resolved.
Show resolved Hide resolved
pub const DEFAULT_MAX_STREAMS_PER_MS: u64 = 250;

/// The new connections per minute from a particular IP address.
/// Heuristically set to the default maximum concurrent connections
/// per IP address. Might be adjusted later.
pub const DEFAULT_MAX_CONNECTIONS_PER_IPADDR_PER_MINUTE: u64 = 8;
#[deprecated(
since = "2.2.0",
note = "Use solana_streamer::quic::DEFAULT_MAX_CONNECTIONS_PER_IPADDR_PER_MINUTE"
)]
pub use crate::quic::DEFAULT_MAX_CONNECTIONS_PER_IPADDR_PER_MINUTE;
/// Limit to 250K PPS
#[deprecated(
since = "2.2.0",
note = "Use solana_streamer::quic::DEFAULT_MAX_STREAMS_PER_MS"
)]
pub use crate::quic::DEFAULT_MAX_STREAMS_PER_MS;

/// Total new connection counts per second. Heuristically taken from
/// the default staked and unstaked connection limits. Might be adjusted
Expand Down
16 changes: 7 additions & 9 deletions streamer/src/nonblocking/stream_throttle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -236,10 +236,8 @@ pub mod test {
use {
super::*,
crate::{
nonblocking::{
quic::DEFAULT_MAX_STREAMS_PER_MS, stream_throttle::STREAM_LOAD_EMA_INTERVAL_MS,
},
quic::{StreamerStats, MAX_UNSTAKED_CONNECTIONS},
nonblocking::stream_throttle::STREAM_LOAD_EMA_INTERVAL_MS,
quic::{StreamerStats, DEFAULT_MAX_STREAMS_PER_MS, DEFAULT_MAX_UNSTAKED_CONNECTIONS},
},
std::{
sync::{atomic::Ordering, Arc},
Expand All @@ -251,7 +249,7 @@ pub mod test {
fn test_max_streams_for_unstaked_connection() {
let load_ema = Arc::new(StakedStreamLoadEMA::new(
Arc::new(StreamerStats::default()),
MAX_UNSTAKED_CONNECTIONS,
DEFAULT_MAX_UNSTAKED_CONNECTIONS,
DEFAULT_MAX_STREAMS_PER_MS,
));
// 25K packets per ms * 20% / 500 max unstaked connections
Expand All @@ -268,7 +266,7 @@ pub mod test {
fn test_max_streams_for_staked_connection() {
let load_ema = Arc::new(StakedStreamLoadEMA::new(
Arc::new(StreamerStats::default()),
MAX_UNSTAKED_CONNECTIONS,
DEFAULT_MAX_UNSTAKED_CONNECTIONS,
DEFAULT_MAX_STREAMS_PER_MS,
));

Expand Down Expand Up @@ -448,7 +446,7 @@ pub mod test {
fn test_update_ema() {
let stream_load_ema = Arc::new(StakedStreamLoadEMA::new(
Arc::new(StreamerStats::default()),
MAX_UNSTAKED_CONNECTIONS,
DEFAULT_MAX_UNSTAKED_CONNECTIONS,
DEFAULT_MAX_STREAMS_PER_MS,
));
stream_load_ema
Expand Down Expand Up @@ -477,7 +475,7 @@ pub mod test {
fn test_update_ema_missing_interval() {
let stream_load_ema = Arc::new(StakedStreamLoadEMA::new(
Arc::new(StreamerStats::default()),
MAX_UNSTAKED_CONNECTIONS,
DEFAULT_MAX_UNSTAKED_CONNECTIONS,
DEFAULT_MAX_STREAMS_PER_MS,
));
stream_load_ema
Expand All @@ -497,7 +495,7 @@ pub mod test {
fn test_update_ema_if_needed() {
let stream_load_ema = Arc::new(StakedStreamLoadEMA::new(
Arc::new(StreamerStats::default()),
MAX_UNSTAKED_CONNECTIONS,
DEFAULT_MAX_UNSTAKED_CONNECTIONS,
DEFAULT_MAX_STREAMS_PER_MS,
));
stream_load_ema
Expand Down
10 changes: 5 additions & 5 deletions streamer/src/nonblocking/testing_utilities.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,13 @@
use {
super::quic::{
spawn_server_multi, SpawnNonBlockingServerResult, ALPN_TPU_PROTOCOL_ID,
DEFAULT_MAX_CONNECTIONS_PER_IPADDR_PER_MINUTE, DEFAULT_MAX_STREAMS_PER_MS,
DEFAULT_WAIT_FOR_CHUNK_TIMEOUT,
},
crate::{
quic::{
QuicServerParams, StreamerStats, DEFAULT_TPU_COALESCE, MAX_STAKED_CONNECTIONS,
MAX_UNSTAKED_CONNECTIONS,
QuicServerParams, StreamerStats, DEFAULT_MAX_CONNECTIONS_PER_IPADDR_PER_MINUTE,
DEFAULT_MAX_STAKED_CONNECTIONS, DEFAULT_MAX_STREAMS_PER_MS,
DEFAULT_MAX_UNSTAKED_CONNECTIONS, DEFAULT_TPU_COALESCE,
},
streamer::StakedNodes,
},
Expand Down Expand Up @@ -64,8 +64,8 @@ impl Default for TestServerConfig {
fn default() -> Self {
Self {
max_connections_per_peer: 1,
max_staked_connections: MAX_STAKED_CONNECTIONS,
max_unstaked_connections: MAX_UNSTAKED_CONNECTIONS,
max_staked_connections: DEFAULT_MAX_STAKED_CONNECTIONS,
max_unstaked_connections: DEFAULT_MAX_UNSTAKED_CONNECTIONS,
max_streams_per_ms: DEFAULT_MAX_STREAMS_PER_MS,
max_connections_per_ipaddr_per_min: DEFAULT_MAX_CONNECTIONS_PER_IPADDR_PER_MINUTE,
}
Expand Down
Loading
Loading