From 824d679d54abc7b243cee2336b87d9a8ab327a7a Mon Sep 17 00:00:00 2001 From: Brennan Date: Thu, 13 Feb 2025 18:35:41 +0000 Subject: [PATCH 1/2] window svc channels struct --- core/src/repair/ancestor_hashes_service.rs | 16 ++++- core/src/repair/repair_service.rs | 74 ++++++++++++++++------ core/src/tvu.rs | 32 ++++++---- core/src/window_service.rs | 73 +++++++++++---------- 4 files changed, 125 insertions(+), 70 deletions(-) diff --git a/core/src/repair/ancestor_hashes_service.rs b/core/src/repair/ancestor_hashes_service.rs index 97b8cd865adbc6..0e6a65c2c2a70e 100644 --- a/core/src/repair/ancestor_hashes_service.rs +++ b/core/src/repair/ancestor_hashes_service.rs @@ -142,6 +142,12 @@ impl AncestorRepairRequestsStats { } } +pub struct AncestorHashesChannels { + pub ancestor_hashes_request_quic_sender: AsyncSender<(SocketAddr, Bytes)>, + pub ancestor_hashes_response_quic_receiver: Receiver<(Pubkey, SocketAddr, Bytes)>, + pub ancestor_hashes_replay_update_receiver: AncestorHashesReplayUpdateReceiver, +} + pub struct AncestorHashesService { thread_hdls: Vec>, } @@ -151,10 +157,8 @@ impl AncestorHashesService { exit: Arc, blockstore: Arc, ancestor_hashes_request_socket: Arc, - ancestor_hashes_request_quic_sender: AsyncSender<(SocketAddr, Bytes)>, - ancestor_hashes_response_quic_receiver: Receiver<(Pubkey, SocketAddr, Bytes)>, + ancestor_hashes_channels: AncestorHashesChannels, repair_info: RepairInfo, - ancestor_hashes_replay_update_receiver: AncestorHashesReplayUpdateReceiver, ) -> Self { let outstanding_requests = Arc::>::default(); let (response_sender, response_receiver) = unbounded(); @@ -173,6 +177,12 @@ impl AncestorHashesService { false, // is_staked_service ); + let AncestorHashesChannels { + ancestor_hashes_request_quic_sender, + ancestor_hashes_response_quic_receiver, + ancestor_hashes_replay_update_receiver, + } = ancestor_hashes_channels; + let t_receiver_quic = { let exit = exit.clone(); Builder::new() diff --git a/core/src/repair/repair_service.rs b/core/src/repair/repair_service.rs index ed6f33330339d5..cc9024bd47dcf0 100644 --- a/core/src/repair/repair_service.rs +++ b/core/src/repair/repair_service.rs @@ -10,7 +10,9 @@ use { cluster_info_vote_listener::VerifiedVoteReceiver, cluster_slots_service::cluster_slots::ClusterSlots, repair::{ - ancestor_hashes_service::{AncestorHashesReplayUpdateReceiver, AncestorHashesService}, + ancestor_hashes_service::{ + AncestorHashesChannels, AncestorHashesReplayUpdateReceiver, AncestorHashesService, + }, duplicate_repair_status::AncestorDuplicateSlotToRepair, outstanding_requests::OutstandingRequests, repair_weight::RepairWeight, @@ -400,27 +402,58 @@ impl Default for RepairSlotRange { } } +struct RepairChannels { + repair_request_quic_sender: AsyncSender<(SocketAddr, Bytes)>, + verified_vote_receiver: VerifiedVoteReceiver, + dumped_slots_receiver: DumpedSlotsReceiver, + popular_pruned_forks_sender: PopularPrunedForksSender, +} + +pub struct RepairServiceChannels { + repair_channels: RepairChannels, + ancestors_hashes_channels: AncestorHashesChannels, +} + +impl RepairServiceChannels { + pub fn new( + repair_request_quic_sender: AsyncSender<(SocketAddr, Bytes)>, + verified_vote_receiver: VerifiedVoteReceiver, + dumped_slots_receiver: DumpedSlotsReceiver, + popular_pruned_forks_sender: PopularPrunedForksSender, + ancestor_hashes_request_quic_sender: AsyncSender<(SocketAddr, Bytes)>, + ancestor_hashes_response_quic_receiver: CrossbeamReceiver<(Pubkey, SocketAddr, Bytes)>, + ancestor_hashes_replay_update_receiver: AncestorHashesReplayUpdateReceiver, + ) -> Self { + Self { + repair_channels: RepairChannels { + repair_request_quic_sender, + verified_vote_receiver, + dumped_slots_receiver, + popular_pruned_forks_sender, + }, + ancestors_hashes_channels: AncestorHashesChannels { + ancestor_hashes_request_quic_sender, + ancestor_hashes_response_quic_receiver, + ancestor_hashes_replay_update_receiver, + }, + } + } +} + pub struct RepairService { t_repair: JoinHandle<()>, ancestor_hashes_service: AncestorHashesService, } impl RepairService { - #[allow(clippy::too_many_arguments)] pub fn new( blockstore: Arc, exit: Arc, repair_socket: Arc, ancestor_hashes_socket: Arc, - repair_request_quic_sender: AsyncSender<(SocketAddr, Bytes)>, - ancestor_hashes_request_quic_sender: AsyncSender<(SocketAddr, Bytes)>, - ancestor_hashes_response_quic_receiver: CrossbeamReceiver<(Pubkey, SocketAddr, Bytes)>, repair_info: RepairInfo, - verified_vote_receiver: VerifiedVoteReceiver, outstanding_requests: Arc>, - ancestor_hashes_replay_update_receiver: AncestorHashesReplayUpdateReceiver, - dumped_slots_receiver: DumpedSlotsReceiver, - popular_pruned_forks_sender: PopularPrunedForksSender, + repair_service_channels: RepairServiceChannels, ) -> Self { let t_repair = { let blockstore = blockstore.clone(); @@ -433,12 +466,9 @@ impl RepairService { &blockstore, &exit, &repair_socket, - &repair_request_quic_sender, + &repair_service_channels.repair_channels, repair_info, - verified_vote_receiver, &outstanding_requests, - dumped_slots_receiver, - popular_pruned_forks_sender, ) }) .unwrap() @@ -448,10 +478,8 @@ impl RepairService { exit, blockstore, ancestor_hashes_socket, - ancestor_hashes_request_quic_sender, - ancestor_hashes_response_quic_receiver, + repair_service_channels.ancestors_hashes_channels, repair_info, - ancestor_hashes_replay_update_receiver, ); RepairService { @@ -460,17 +488,13 @@ impl RepairService { } } - #[allow(clippy::too_many_arguments)] fn run( blockstore: &Blockstore, exit: &AtomicBool, repair_socket: &UdpSocket, - repair_request_quic_sender: &AsyncSender<(SocketAddr, Bytes)>, + repair_channels: &RepairChannels, repair_info: RepairInfo, - verified_vote_receiver: VerifiedVoteReceiver, outstanding_requests: &RwLock, - dumped_slots_receiver: DumpedSlotsReceiver, - popular_pruned_forks_sender: PopularPrunedForksSender, ) { let mut root_bank_cache = RootBankCache::new(repair_info.bank_forks.clone()); let mut repair_weight = RepairWeight::new(root_bank_cache.root_bank().slot()); @@ -487,6 +511,14 @@ impl RepairService { // Maps a repair that may still be outstanding to the timestamp it was requested. let mut outstanding_repairs = HashMap::new(); + let RepairChannels { + repair_request_quic_sender, + verified_vote_receiver, + dumped_slots_receiver, + popular_pruned_forks_sender, + .. + } = repair_channels; + while !exit.load(Ordering::Relaxed) { let mut set_root_elapsed; let mut dump_slots_elapsed; diff --git a/core/src/tvu.rs b/core/src/tvu.rs index 934ced1eac397e..4416e4153222a7 100644 --- a/core/src/tvu.rs +++ b/core/src/tvu.rs @@ -13,12 +13,12 @@ use { consensus::{tower_storage::TowerStorage, Tower}, cost_update_service::CostUpdateService, drop_bank_service::DropBankService, - repair::repair_service::{OutstandingShredRepairs, RepairInfo}, + repair::repair_service::{OutstandingShredRepairs, RepairInfo, RepairServiceChannels}, replay_stage::{ReplayReceivers, ReplaySenders, ReplayStage, ReplayStageConfig}, shred_fetch_stage::ShredFetchStage, voting_service::VotingService, warm_quic_cache_service::WarmQuicCacheService, - window_service::WindowService, + window_service::{WindowService, WindowServiceChannels}, }, bytes::Bytes, crossbeam_channel::{unbounded, Receiver, Sender}, @@ -237,24 +237,30 @@ impl Tvu { cluster_slots: cluster_slots.clone(), wen_restart_repair_slots, }; - WindowService::new( - blockstore.clone(), + let repair_service_channels = RepairServiceChannels::new( + repair_request_quic_sender, + verified_vote_receiver, + dumped_slots_receiver, + popular_pruned_forks_sender, + ancestor_hashes_request_quic_sender, + ancestor_hashes_response_quic_receiver, + ancestor_hashes_replay_update_receiver, + ); + let window_service_channels = WindowServiceChannels::new( verified_receiver, retransmit_sender, + completed_data_sets_sender, + duplicate_slots_sender.clone(), + repair_service_channels, + ); + WindowService::new( + blockstore.clone(), repair_socket, ancestor_hashes_socket, - repair_request_quic_sender, - ancestor_hashes_request_quic_sender, - ancestor_hashes_response_quic_receiver, exit.clone(), repair_info, + window_service_channels, leader_schedule_cache.clone(), - verified_vote_receiver, - completed_data_sets_sender, - duplicate_slots_sender.clone(), - ancestor_hashes_replay_update_receiver, - dumped_slots_receiver, - popular_pruned_forks_sender, outstanding_repair_requests, ) }; diff --git a/core/src/window_service.rs b/core/src/window_service.rs index 07adae2d8a18e4..f14bbce8a71357 100644 --- a/core/src/window_service.rs +++ b/core/src/window_service.rs @@ -4,19 +4,13 @@ use { crate::{ - cluster_info_vote_listener::VerifiedVoteReceiver, completed_data_sets_service::CompletedDataSetsSender, - repair::{ - ancestor_hashes_service::AncestorHashesReplayUpdateReceiver, - repair_service::{ - DumpedSlotsReceiver, OutstandingShredRepairs, PopularPrunedForksSender, RepairInfo, - RepairService, - }, + repair::repair_service::{ + OutstandingShredRepairs, RepairInfo, RepairService, RepairServiceChannels, }, result::{Error, Result}, }, assert_matches::debug_assert_matches, - bytes::Bytes, crossbeam_channel::{unbounded, Receiver, RecvTimeoutError, Sender}, rayon::{prelude::*, ThreadPool}, solana_feature_set as feature_set, @@ -30,13 +24,10 @@ use { solana_metrics::inc_new_counter_error, solana_rayon_threadlimit::get_thread_count, solana_runtime::bank_forks::BankForks, - solana_sdk::{ - clock::{Slot, DEFAULT_MS_PER_SLOT}, - pubkey::Pubkey, - }, + solana_sdk::clock::{Slot, DEFAULT_MS_PER_SLOT}, solana_turbine::cluster_nodes, std::{ - net::{SocketAddr, UdpSocket}, + net::UdpSocket, sync::{ atomic::{AtomicBool, AtomicUsize, Ordering}, Arc, RwLock, @@ -44,7 +35,6 @@ use { thread::{self, Builder, JoinHandle}, time::{Duration, Instant}, }, - tokio::sync::mpsc::Sender as AsyncSender, }; type DuplicateSlotSender = Sender; @@ -250,6 +240,32 @@ where Ok(()) } +pub struct WindowServiceChannels { + pub verified_receiver: Receiver>, + pub retransmit_sender: Sender>, + pub completed_data_sets_sender: Option, + pub duplicate_slots_sender: DuplicateSlotSender, + pub repair_service_channels: RepairServiceChannels, +} + +impl WindowServiceChannels { + pub fn new( + verified_receiver: Receiver>, + retransmit_sender: Sender>, + completed_data_sets_sender: Option, + duplicate_slots_sender: DuplicateSlotSender, + repair_service_channels: RepairServiceChannels, + ) -> Self { + Self { + verified_receiver, + retransmit_sender, + completed_data_sets_sender, + duplicate_slots_sender, + repair_service_channels, + } + } +} + pub(crate) struct WindowService { t_insert: JoinHandle<()>, t_check_duplicate: JoinHandle<()>, @@ -257,25 +273,14 @@ pub(crate) struct WindowService { } impl WindowService { - #[allow(clippy::too_many_arguments)] pub(crate) fn new( blockstore: Arc, - verified_receiver: Receiver>, - retransmit_sender: Sender>, repair_socket: Arc, ancestor_hashes_socket: Arc, - repair_request_quic_sender: AsyncSender<(SocketAddr, Bytes)>, - ancestor_hashes_request_quic_sender: AsyncSender<(SocketAddr, Bytes)>, - ancestor_hashes_response_quic_receiver: Receiver<(Pubkey, SocketAddr, Bytes)>, exit: Arc, repair_info: RepairInfo, + window_service_channels: WindowServiceChannels, leader_schedule_cache: Arc, - verified_vote_receiver: VerifiedVoteReceiver, - completed_data_sets_sender: Option, - duplicate_slots_sender: DuplicateSlotSender, - ancestor_hashes_replay_update_receiver: AncestorHashesReplayUpdateReceiver, - dumped_slots_receiver: DumpedSlotsReceiver, - popular_pruned_forks_sender: PopularPrunedForksSender, outstanding_repair_requests: Arc>, ) -> WindowService { let cluster_info = repair_info.cluster_info.clone(); @@ -285,20 +290,22 @@ impl WindowService { // avoid new shreds make validator OOM before wen_restart is over. let accept_repairs_only = repair_info.wen_restart_repair_slots.is_some(); + let WindowServiceChannels { + verified_receiver, + retransmit_sender, + completed_data_sets_sender, + duplicate_slots_sender, + repair_service_channels, + } = window_service_channels; + let repair_service = RepairService::new( blockstore.clone(), exit.clone(), repair_socket, ancestor_hashes_socket, - repair_request_quic_sender, - ancestor_hashes_request_quic_sender, - ancestor_hashes_response_quic_receiver, repair_info, - verified_vote_receiver, outstanding_repair_requests.clone(), - ancestor_hashes_replay_update_receiver, - dumped_slots_receiver, - popular_pruned_forks_sender, + repair_service_channels, ); let (duplicate_sender, duplicate_receiver) = unbounded(); From 7e107138bb83c4f990b7de22e5d378524987a0bc Mon Sep 17 00:00:00 2001 From: Brennan Date: Fri, 14 Feb 2025 18:12:07 +0000 Subject: [PATCH 2/2] owned repair channels --- core/src/repair/repair_service.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/core/src/repair/repair_service.rs b/core/src/repair/repair_service.rs index cc9024bd47dcf0..9ad2be56190b82 100644 --- a/core/src/repair/repair_service.rs +++ b/core/src/repair/repair_service.rs @@ -466,7 +466,7 @@ impl RepairService { &blockstore, &exit, &repair_socket, - &repair_service_channels.repair_channels, + repair_service_channels.repair_channels, repair_info, &outstanding_requests, ) @@ -492,7 +492,7 @@ impl RepairService { blockstore: &Blockstore, exit: &AtomicBool, repair_socket: &UdpSocket, - repair_channels: &RepairChannels, + repair_channels: RepairChannels, repair_info: RepairInfo, outstanding_requests: &RwLock, ) { @@ -666,7 +666,7 @@ impl RepairService { &repair_info.repair_validators, &mut outstanding_requests, identity_keypair, - repair_request_quic_sender, + &repair_request_quic_sender, repair_protocol, ) .ok()??;