window svc channels struct #4977

Merged · 2 commits · Feb 14, 2025
16 changes: 13 additions & 3 deletions core/src/repair/ancestor_hashes_service.rs
@@ -142,6 +142,12 @@ impl AncestorRepairRequestsStats {
}
}

pub struct AncestorHashesChannels {
pub ancestor_hashes_request_quic_sender: AsyncSender<(SocketAddr, Bytes)>,
pub ancestor_hashes_response_quic_receiver: Receiver<(Pubkey, SocketAddr, Bytes)>,
pub ancestor_hashes_replay_update_receiver: AncestorHashesReplayUpdateReceiver,
}

pub struct AncestorHashesService {
thread_hdls: Vec<JoinHandle<()>>,
}
@@ -151,10 +157,8 @@ impl AncestorHashesService {
exit: Arc<AtomicBool>,
blockstore: Arc<Blockstore>,
ancestor_hashes_request_socket: Arc<UdpSocket>,
ancestor_hashes_request_quic_sender: AsyncSender<(SocketAddr, Bytes)>,
ancestor_hashes_response_quic_receiver: Receiver<(Pubkey, SocketAddr, Bytes)>,
ancestor_hashes_channels: AncestorHashesChannels,
repair_info: RepairInfo,
ancestor_hashes_replay_update_receiver: AncestorHashesReplayUpdateReceiver,
) -> Self {
let outstanding_requests = Arc::<RwLock<OutstandingAncestorHashesRepairs>>::default();
let (response_sender, response_receiver) = unbounded();
@@ -173,6 +177,12 @@
false, // is_staked_service
);

let AncestorHashesChannels {
ancestor_hashes_request_quic_sender,
ancestor_hashes_response_quic_receiver,
ancestor_hashes_replay_update_receiver,
} = ancestor_hashes_channels;

let t_receiver_quic = {
let exit = exit.clone();
Builder::new()
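The hunks above show the core pattern of this PR: the channel endpoints a service consumes are grouped into a single struct, passed as one argument, and destructured right where the worker threads are spawned. A minimal standalone sketch of that shape (illustrative names only, using `std::sync::mpsc` in place of the crossbeam/tokio channels in the PR):

```rust
use std::{
    sync::mpsc::{channel, Receiver, Sender},
    thread::{self, JoinHandle},
};

// Stand-ins for the payload types carried by the real channels.
type Request = String;
type Response = String;

// Bundle of endpoints, analogous to AncestorHashesChannels.
struct ServiceChannels {
    request_sender: Sender<Request>,
    response_receiver: Receiver<Response>,
}

struct Service {
    thread_hdl: JoinHandle<()>,
}

impl Service {
    // One bundle parameter instead of one parameter per endpoint.
    fn new(channels: ServiceChannels) -> Self {
        // Destructure at the point of use, as the PR does in
        // AncestorHashesService::new.
        let ServiceChannels {
            request_sender,
            response_receiver,
        } = channels;
        let thread_hdl = thread::spawn(move || {
            request_sender.send("ping".to_string()).ok();
            // Runs until the sender side of the response channel is dropped.
            while let Ok(response) = response_receiver.recv() {
                println!("got {response}");
            }
        });
        Self { thread_hdl }
    }

    fn join(self) {
        self.thread_hdl.join().unwrap();
    }
}

fn main() {
    let (request_sender, request_receiver) = channel();
    let (response_sender, response_receiver) = channel();
    let service = Service::new(ServiceChannels {
        request_sender,
        response_receiver,
    });
    let request = request_receiver.recv().unwrap();
    response_sender.send(format!("echo {request}")).unwrap();
    drop(response_sender); // close the channel so the service thread exits
    service.join();
}
```

With this shape, `new`'s signature stays stable as endpoints come and go; only the struct definition and the destructuring site change.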
76 changes: 54 additions & 22 deletions core/src/repair/repair_service.rs
@@ -10,7 +10,9 @@ use {
cluster_info_vote_listener::VerifiedVoteReceiver,
cluster_slots_service::cluster_slots::ClusterSlots,
repair::{
ancestor_hashes_service::{AncestorHashesReplayUpdateReceiver, AncestorHashesService},
ancestor_hashes_service::{
AncestorHashesChannels, AncestorHashesReplayUpdateReceiver, AncestorHashesService,
},
duplicate_repair_status::AncestorDuplicateSlotToRepair,
outstanding_requests::OutstandingRequests,
repair_weight::RepairWeight,
@@ -400,27 +402,58 @@
}
}

struct RepairChannels {
repair_request_quic_sender: AsyncSender<(SocketAddr, Bytes)>,
verified_vote_receiver: VerifiedVoteReceiver,
dumped_slots_receiver: DumpedSlotsReceiver,
popular_pruned_forks_sender: PopularPrunedForksSender,
}

pub struct RepairServiceChannels {
repair_channels: RepairChannels,
ancestors_hashes_channels: AncestorHashesChannels,
}

impl RepairServiceChannels {
pub fn new(
repair_request_quic_sender: AsyncSender<(SocketAddr, Bytes)>,
verified_vote_receiver: VerifiedVoteReceiver,
dumped_slots_receiver: DumpedSlotsReceiver,
popular_pruned_forks_sender: PopularPrunedForksSender,
ancestor_hashes_request_quic_sender: AsyncSender<(SocketAddr, Bytes)>,
ancestor_hashes_response_quic_receiver: CrossbeamReceiver<(Pubkey, SocketAddr, Bytes)>,
ancestor_hashes_replay_update_receiver: AncestorHashesReplayUpdateReceiver,
) -> Self {
Self {
repair_channels: RepairChannels {
repair_request_quic_sender,
verified_vote_receiver,
dumped_slots_receiver,
popular_pruned_forks_sender,
},
ancestors_hashes_channels: AncestorHashesChannels {
ancestor_hashes_request_quic_sender,
ancestor_hashes_response_quic_receiver,
ancestor_hashes_replay_update_receiver,
},
}
}
}

pub struct RepairService {
t_repair: JoinHandle<()>,
ancestor_hashes_service: AncestorHashesService,
}

impl RepairService {
#[allow(clippy::too_many_arguments)]
pub fn new(
blockstore: Arc<Blockstore>,
exit: Arc<AtomicBool>,
repair_socket: Arc<UdpSocket>,
ancestor_hashes_socket: Arc<UdpSocket>,
repair_request_quic_sender: AsyncSender<(SocketAddr, Bytes)>,
ancestor_hashes_request_quic_sender: AsyncSender<(SocketAddr, Bytes)>,
ancestor_hashes_response_quic_receiver: CrossbeamReceiver<(Pubkey, SocketAddr, Bytes)>,
repair_info: RepairInfo,
verified_vote_receiver: VerifiedVoteReceiver,
outstanding_requests: Arc<RwLock<OutstandingShredRepairs>>,
ancestor_hashes_replay_update_receiver: AncestorHashesReplayUpdateReceiver,
dumped_slots_receiver: DumpedSlotsReceiver,
popular_pruned_forks_sender: PopularPrunedForksSender,
repair_service_channels: RepairServiceChannels,
) -> Self {
let t_repair = {
let blockstore = blockstore.clone();
@@ -433,12 +466,9 @@ impl RepairService {
&blockstore,
&exit,
&repair_socket,
&repair_request_quic_sender,
repair_service_channels.repair_channels,
repair_info,
verified_vote_receiver,
&outstanding_requests,
dumped_slots_receiver,
popular_pruned_forks_sender,
)
})
.unwrap()
@@ -448,10 +478,8 @@
exit,
blockstore,
ancestor_hashes_socket,
ancestor_hashes_request_quic_sender,
ancestor_hashes_response_quic_receiver,
repair_service_channels.ancestors_hashes_channels,
repair_info,
ancestor_hashes_replay_update_receiver,
);

RepairService {
@@ -460,17 +488,13 @@
}
}

#[allow(clippy::too_many_arguments)]
fn run(
blockstore: &Blockstore,
exit: &AtomicBool,
repair_socket: &UdpSocket,
repair_request_quic_sender: &AsyncSender<(SocketAddr, Bytes)>,
repair_channels: RepairChannels,
repair_info: RepairInfo,
verified_vote_receiver: VerifiedVoteReceiver,
outstanding_requests: &RwLock<OutstandingShredRepairs>,
dumped_slots_receiver: DumpedSlotsReceiver,
popular_pruned_forks_sender: PopularPrunedForksSender,
) {
let mut root_bank_cache = RootBankCache::new(repair_info.bank_forks.clone());
let mut repair_weight = RepairWeight::new(root_bank_cache.root_bank().slot());
@@ -487,6 +511,14 @@
// Maps a repair that may still be outstanding to the timestamp it was requested.
let mut outstanding_repairs = HashMap::new();

let RepairChannels {
repair_request_quic_sender,
verified_vote_receiver,
dumped_slots_receiver,
popular_pruned_forks_sender,
..
} = repair_channels;

while !exit.load(Ordering::Relaxed) {
let mut set_root_elapsed;
let mut dump_slots_elapsed;
@@ -634,7 +666,7 @@
&repair_info.repair_validators,
&mut outstanding_requests,
identity_keypair,
repair_request_quic_sender,
&repair_request_quic_sender,
repair_protocol,
)
.ok()??;
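`RepairServiceChannels` nests one bundle per consumer: `RepairChannels` stays with the repair loop while `AncestorHashesChannels` is forwarded to `AncestorHashesService`, and the constructor takes the flat endpoint list so callers need not know the internal grouping. A hedged sketch of that nesting, with simplified stand-in payload types (`u64` rather than the real ones):

```rust
use std::sync::mpsc::{channel, Receiver, Sender};

// Simplified stand-ins for the two endpoint groups in repair_service.rs.
struct RepairChannels {
    repair_request_sender: Sender<u64>,
    verified_vote_receiver: Receiver<u64>,
}

struct AncestorHashesChannels {
    ancestor_request_sender: Sender<u64>,
}

// Outer bundle mirroring RepairServiceChannels: one sub-bundle per service.
struct RepairServiceChannels {
    repair_channels: RepairChannels,
    ancestor_hashes_channels: AncestorHashesChannels,
}

impl RepairServiceChannels {
    // Flat argument list in, grouped bundles out.
    fn new(
        repair_request_sender: Sender<u64>,
        verified_vote_receiver: Receiver<u64>,
        ancestor_request_sender: Sender<u64>,
    ) -> Self {
        Self {
            repair_channels: RepairChannels {
                repair_request_sender,
                verified_vote_receiver,
            },
            ancestor_hashes_channels: AncestorHashesChannels {
                ancestor_request_sender,
            },
        }
    }
}

fn main() {
    let (repair_request_sender, _repair_request_receiver) = channel();
    let (_verified_vote_sender, verified_vote_receiver) = channel();
    let (ancestor_request_sender, _ancestor_request_receiver) = channel();
    let channels = RepairServiceChannels::new(
        repair_request_sender,
        verified_vote_receiver,
        ancestor_request_sender,
    );
    // Each sub-bundle can be moved separately into the service that owns it.
    channels.repair_channels.repair_request_sender.send(42).unwrap();
    let _ancestor_bundle = channels.ancestor_hashes_channels;
}
```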
32 changes: 19 additions & 13 deletions core/src/tvu.rs
@@ -13,12 +13,12 @@ use {
consensus::{tower_storage::TowerStorage, Tower},
cost_update_service::CostUpdateService,
drop_bank_service::DropBankService,
repair::repair_service::{OutstandingShredRepairs, RepairInfo},
repair::repair_service::{OutstandingShredRepairs, RepairInfo, RepairServiceChannels},
replay_stage::{ReplayReceivers, ReplaySenders, ReplayStage, ReplayStageConfig},
shred_fetch_stage::ShredFetchStage,
voting_service::VotingService,
warm_quic_cache_service::WarmQuicCacheService,
window_service::WindowService,
window_service::{WindowService, WindowServiceChannels},
},
bytes::Bytes,
crossbeam_channel::{unbounded, Receiver, Sender},
@@ -237,24 +237,30 @@ impl Tvu {
cluster_slots: cluster_slots.clone(),
wen_restart_repair_slots,
};
WindowService::new(
blockstore.clone(),
let repair_service_channels = RepairServiceChannels::new(
repair_request_quic_sender,
verified_vote_receiver,
dumped_slots_receiver,
popular_pruned_forks_sender,
ancestor_hashes_request_quic_sender,
ancestor_hashes_response_quic_receiver,
ancestor_hashes_replay_update_receiver,
);
let window_service_channels = WindowServiceChannels::new(
verified_receiver,
retransmit_sender,
completed_data_sets_sender,
duplicate_slots_sender.clone(),
repair_service_channels,
);
WindowService::new(
blockstore.clone(),
repair_socket,
ancestor_hashes_socket,
repair_request_quic_sender,
ancestor_hashes_request_quic_sender,
ancestor_hashes_response_quic_receiver,
exit.clone(),
repair_info,
window_service_channels,
leader_schedule_cache.clone(),
verified_vote_receiver,
completed_data_sets_sender,
duplicate_slots_sender.clone(),
ancestor_hashes_replay_update_receiver,
dumped_slots_receiver,
popular_pruned_forks_sender,
outstanding_repair_requests,
)
};
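The tvu.rs hunk above fixes the wiring order at the call site: the repair bundle is built first, moved into `WindowServiceChannels`, and the whole stack is handed to `WindowService::new` as a single argument. Roughly, with unit-struct stand-ins for the real types:

```rust
// Stand-ins: the real structs carry channel endpoints, not unit values.
struct RepairServiceChannels;

struct WindowServiceChannels {
    repair_service_channels: RepairServiceChannels,
}

struct WindowService;

impl WindowService {
    fn new(channels: WindowServiceChannels) -> Self {
        // The window service keeps its own endpoints and forwards the repair
        // bundle one level down, so ownership flows a layer at a time.
        let WindowServiceChannels {
            repair_service_channels,
        } = channels;
        let _ = repair_service_channels; // would go to RepairService::new
        WindowService
    }
}

fn main() {
    let repair_service_channels = RepairServiceChannels;
    let window_service_channels = WindowServiceChannels {
        repair_service_channels,
    };
    let _window_service = WindowService::new(window_service_channels);
}
```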
73 changes: 40 additions & 33 deletions core/src/window_service.rs
@@ -4,19 +4,13 @@

use {
crate::{
cluster_info_vote_listener::VerifiedVoteReceiver,
completed_data_sets_service::CompletedDataSetsSender,
repair::{
ancestor_hashes_service::AncestorHashesReplayUpdateReceiver,
repair_service::{
DumpedSlotsReceiver, OutstandingShredRepairs, PopularPrunedForksSender, RepairInfo,
RepairService,
},
repair::repair_service::{
OutstandingShredRepairs, RepairInfo, RepairService, RepairServiceChannels,
},
result::{Error, Result},
},
assert_matches::debug_assert_matches,
bytes::Bytes,
crossbeam_channel::{unbounded, Receiver, RecvTimeoutError, Sender},
rayon::{prelude::*, ThreadPool},
solana_feature_set as feature_set,
Expand All @@ -30,21 +24,17 @@ use {
solana_metrics::inc_new_counter_error,
solana_rayon_threadlimit::get_thread_count,
solana_runtime::bank_forks::BankForks,
solana_sdk::{
clock::{Slot, DEFAULT_MS_PER_SLOT},
pubkey::Pubkey,
},
solana_sdk::clock::{Slot, DEFAULT_MS_PER_SLOT},
solana_turbine::cluster_nodes,
std::{
net::{SocketAddr, UdpSocket},
net::UdpSocket,
sync::{
atomic::{AtomicBool, AtomicUsize, Ordering},
Arc, RwLock,
},
thread::{self, Builder, JoinHandle},
time::{Duration, Instant},
},
tokio::sync::mpsc::Sender as AsyncSender,
};

type DuplicateSlotSender = Sender<Slot>;
@@ -250,32 +240,47 @@
Ok(())
}

pub struct WindowServiceChannels {
pub verified_receiver: Receiver<Vec<(shred::Payload, /*is_repaired:*/ bool)>>,
pub retransmit_sender: Sender<Vec<shred::Payload>>,
pub completed_data_sets_sender: Option<CompletedDataSetsSender>,
pub duplicate_slots_sender: DuplicateSlotSender,
pub repair_service_channels: RepairServiceChannels,
}

impl WindowServiceChannels {
pub fn new(
verified_receiver: Receiver<Vec<(shred::Payload, /*is_repaired:*/ bool)>>,
retransmit_sender: Sender<Vec<shred::Payload>>,
completed_data_sets_sender: Option<CompletedDataSetsSender>,
duplicate_slots_sender: DuplicateSlotSender,
repair_service_channels: RepairServiceChannels,
) -> Self {
Self {
verified_receiver,
retransmit_sender,
completed_data_sets_sender,
duplicate_slots_sender,
repair_service_channels,
}
}
}

pub(crate) struct WindowService {
t_insert: JoinHandle<()>,
t_check_duplicate: JoinHandle<()>,
repair_service: RepairService,
}

impl WindowService {
#[allow(clippy::too_many_arguments)]
pub(crate) fn new(
blockstore: Arc<Blockstore>,
verified_receiver: Receiver<Vec<(shred::Payload, /*is_repaired:*/ bool)>>,
retransmit_sender: Sender<Vec<shred::Payload>>,
repair_socket: Arc<UdpSocket>,
ancestor_hashes_socket: Arc<UdpSocket>,
repair_request_quic_sender: AsyncSender<(SocketAddr, Bytes)>,
ancestor_hashes_request_quic_sender: AsyncSender<(SocketAddr, Bytes)>,
ancestor_hashes_response_quic_receiver: Receiver<(Pubkey, SocketAddr, Bytes)>,
exit: Arc<AtomicBool>,
repair_info: RepairInfo,
window_service_channels: WindowServiceChannels,
leader_schedule_cache: Arc<LeaderScheduleCache>,
verified_vote_receiver: VerifiedVoteReceiver,
completed_data_sets_sender: Option<CompletedDataSetsSender>,
duplicate_slots_sender: DuplicateSlotSender,
ancestor_hashes_replay_update_receiver: AncestorHashesReplayUpdateReceiver,
dumped_slots_receiver: DumpedSlotsReceiver,
popular_pruned_forks_sender: PopularPrunedForksSender,
outstanding_repair_requests: Arc<RwLock<OutstandingShredRepairs>>,
) -> WindowService {
let cluster_info = repair_info.cluster_info.clone();
@@ -285,20 +290,22 @@ impl WindowService {
// avoid new shreds make validator OOM before wen_restart is over.
let accept_repairs_only = repair_info.wen_restart_repair_slots.is_some();

let WindowServiceChannels {
verified_receiver,
retransmit_sender,
completed_data_sets_sender,
duplicate_slots_sender,
repair_service_channels,
} = window_service_channels;

let repair_service = RepairService::new(
blockstore.clone(),
exit.clone(),
repair_socket,
ancestor_hashes_socket,
repair_request_quic_sender,
ancestor_hashes_request_quic_sender,
ancestor_hashes_response_quic_receiver,
repair_info,
verified_vote_receiver,
outstanding_repair_requests.clone(),
ancestor_hashes_replay_update_receiver,
dumped_slots_receiver,
popular_pruned_forks_sender,
repair_service_channels,
);

let (duplicate_sender, duplicate_receiver) = unbounded();
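A practical payoff visible across these signatures is reduced arity. Clippy's `too_many_arguments` lint fires once a function takes more than seven parameters (the default threshold), which is why the old, wider signatures in this diff carried `#[allow(clippy::too_many_arguments)]`; bundling endpoints into structs pulls the count back down, so the attribute can be dropped wherever the new arity lands under the limit. A toy illustration, not code from this PR:

```rust
// Before: more than seven parameters trips clippy::too_many_arguments,
// so the signature needs an explicit allow.
#[allow(clippy::too_many_arguments)]
fn run_before(a: u8, b: u8, c: u8, d: u8, e: u8, f: u8, g: u8, h: u8) -> u16 {
    u16::from(a + b + c + d + e + f + g + h)
}

// After: grouping related parameters keeps the arity at five, and the
// allow attribute can go away.
struct Inputs {
    e: u8,
    f: u8,
    g: u8,
    h: u8,
}

fn run_after(a: u8, b: u8, c: u8, d: u8, inputs: Inputs) -> u16 {
    let Inputs { e, f, g, h } = inputs;
    u16::from(a + b + c + d + e + f + g + h)
}

fn main() {
    let inputs = Inputs { e: 5, f: 6, g: 7, h: 8 };
    assert_eq!(
        run_before(1, 2, 3, 4, 5, 6, 7, 8),
        run_after(1, 2, 3, 4, inputs),
    );
}
```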