Skip to content

Commit

Permalink
removes redundant vector allocations before calling sendmmsg::batch_send
Browse files Browse the repository at this point in the history
streamer::sendmmsg::batch_send only requires an ExactSizeIterator:
https://github.com/anza-xyz/agave/blob/566bb9565/streamer/src/sendmmsg.rs#L203-L204
https://github.com/anza-xyz/agave/blob/566bb9565/streamer/src/sendmmsg.rs#L166-L175

Collecting an iterator into a vector before calling batch_send is
unnecessary and only adds overhead.
In particular, multi_target_send, which is used in retransmitting shreds, can
now be used without doing an additional vector allocation:
https://github.com/anza-xyz/agave/blob/566bb9565/streamer/src/sendmmsg.rs#L219
  • Loading branch information
behzadnouri committed Feb 8, 2025
1 parent 566bb95 commit 1e221d9
Show file tree
Hide file tree
Showing 9 changed files with 61 additions and 55 deletions.
5 changes: 2 additions & 3 deletions core/src/banking_stage/forwarder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ use {
solana_sdk::{pubkey::Pubkey, transport::TransportError},
solana_streamer::sendmmsg::batch_send,
std::{
iter::repeat,
net::{SocketAddr, UdpSocket},
sync::{atomic::Ordering, Arc, RwLock},
},
Expand Down Expand Up @@ -281,8 +280,8 @@ impl<T: LikeClusterInfo> Forwarder<T> {
match forward_option {
ForwardOption::ForwardTpuVote => {
// The vote must be forwarded using only UDP.
let pkts: Vec<_> = packet_vec.into_iter().zip(repeat(*addr)).collect();
batch_send(&self.socket, &pkts).map_err(|err| err.into())
let pkts = packet_vec.iter().map(|pkt| (pkt, addr));
batch_send(&self.socket, pkts).map_err(TransportError::from)
}
ForwardOption::ForwardTransaction => {
let conn = self.connection_cache.get_connection(addr);
Expand Down
3 changes: 2 additions & 1 deletion core/src/forwarding_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,8 @@ impl VoteClient {
}

fn send_batch(&self, batch: &mut Vec<(Vec<u8>, SocketAddr)>) {
let _res = batch_send(&self.udp_socket, batch);
let pkts = batch.iter().map(|(bytes, addr)| (bytes, addr));
let _res = batch_send(&self.udp_socket, pkts);
batch.clear();
}
}
Expand Down
14 changes: 6 additions & 8 deletions core/src/repair/repair_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -646,15 +646,13 @@ impl RepairService {

let mut batch_send_repairs_elapsed = Measure::start("batch_send_repairs_elapsed");
if !batch.is_empty() {
match batch_send(repair_socket, &batch) {
let num_pkts = batch.len();
let batch = batch.iter().map(|(bytes, addr)| (bytes, addr));
match batch_send(repair_socket, batch) {
Ok(()) => (),
Err(SendPktsError::IoError(err, num_failed)) => {
error!(
"{} batch_send failed to send {}/{} packets first error {:?}",
id,
num_failed,
batch.len(),
err
"{id} batch_send failed to send {num_failed}/{num_pkts} packets first error {err:?}"
);
}
}
Expand Down Expand Up @@ -954,10 +952,10 @@ impl RepairService {
ServeRepair::repair_proto_to_bytes(&request_proto, &identity_keypair).unwrap();

// Prepare packet batch to send
let reqs = [(packet_buf, address)];
let reqs = [(&packet_buf, address)];

// Send packet batch
match batch_send(repair_socket, &reqs[..]) {
match batch_send(repair_socket, reqs) {
Ok(()) => {
debug!("successfully sent repair request to {pubkey} / {address}!");
}
Expand Down
9 changes: 4 additions & 5 deletions core/src/repair/serve_repair.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1249,14 +1249,13 @@ impl ServeRepair {
}
}
if !pending_pongs.is_empty() {
match batch_send(repair_socket, &pending_pongs) {
let num_pkts = pending_pongs.len();
let pending_pongs = pending_pongs.iter().map(|(bytes, addr)| (bytes, addr));
match batch_send(repair_socket, pending_pongs) {
Ok(()) => (),
Err(SendPktsError::IoError(err, num_failed)) => {
warn!(
"batch_send failed to send {}/{} packets. First error: {:?}",
num_failed,
pending_pongs.len(),
err
"batch_send failed to send {num_failed}/{num_pkts} packets. First error: {err:?}"
);
}
}
Expand Down
54 changes: 36 additions & 18 deletions streamer/src/sendmmsg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ use {
std::{
borrow::Borrow,
io,
iter::repeat,
net::{SocketAddr, UdpSocket},
},
thiserror::Error,
Expand All @@ -35,11 +34,15 @@ impl From<SendPktsError> for TransportError {
}
}

// The type and lifetime constraints are overspecified to match 'linux' code.
#[cfg(not(target_os = "linux"))]
pub fn batch_send<S, T>(sock: &UdpSocket, packets: &[(T, S)]) -> Result<(), SendPktsError>
pub fn batch_send<'a, S, T: 'a + ?Sized>(
sock: &UdpSocket,
packets: impl IntoIterator<Item = (&'a T, S), IntoIter: ExactSizeIterator>,
) -> Result<(), SendPktsError>
where
S: Borrow<SocketAddr>,
T: AsRef<[u8]>,
&'a T: AsRef<[u8]>,
{
let mut num_failed = 0;
let mut erropt = None;
Expand Down Expand Up @@ -158,11 +161,16 @@ fn sendmmsg_retry(sock: &UdpSocket, hdrs: &mut [mmsghdr]) -> Result<(), SendPkts
const MAX_IOV: usize = libc::UIO_MAXIOV as usize;

#[cfg(target_os = "linux")]
pub fn batch_send_max_iov<S, T>(sock: &UdpSocket, packets: &[(T, S)]) -> Result<(), SendPktsError>
pub fn batch_send_max_iov<'a, S, T: 'a + ?Sized>(
sock: &UdpSocket,
packets: impl IntoIterator<Item = (&'a T, S), IntoIter: ExactSizeIterator>,
) -> Result<(), SendPktsError>
where
S: Borrow<SocketAddr>,
T: AsRef<[u8]>,
&'a T: AsRef<[u8]>,
{
let packets = packets.into_iter();
let num_packets = packets.len();
assert!(packets.len() <= MAX_IOV);

let mut iovs = [MaybeUninit::uninit(); MAX_IOV];
Expand All @@ -177,13 +185,13 @@ where
// SAFETY: The first `packets.len()` elements of `hdrs`, `iovs`, and `addrs` are
// guaranteed to be initialized by `mmsghdr_for_packet` before this loop.
let hdrs_slice =
unsafe { std::slice::from_raw_parts_mut(hdrs.as_mut_ptr() as *mut mmsghdr, packets.len()) };
unsafe { std::slice::from_raw_parts_mut(hdrs.as_mut_ptr() as *mut mmsghdr, num_packets) };

let result = sendmmsg_retry(sock, hdrs_slice);

// SAFETY: The first `packets.len()` elements of `hdrs`, `iovs`, and `addrs` are
// guaranteed to be initialized by `mmsghdr_for_packet` before this loop.
for (hdr, iov, addr) in izip!(&mut hdrs, &mut iovs, &mut addrs).take(packets.len()) {
for (hdr, iov, addr) in izip!(&mut hdrs, &mut iovs, &mut addrs).take(num_packets) {
unsafe {
hdr.assume_init_drop();
iov.assume_init_drop();
Expand All @@ -194,13 +202,23 @@ where
result
}

// Need &'a to ensure that raw packet pointers obtained
// in mmsghdr_for_packet stay valid.
#[cfg(target_os = "linux")]
pub fn batch_send<S, T>(sock: &UdpSocket, packets: &[(T, S)]) -> Result<(), SendPktsError>
pub fn batch_send<'a, S, T: 'a + ?Sized>(
sock: &UdpSocket,
packets: impl IntoIterator<Item = (&'a T, S), IntoIter: ExactSizeIterator>,
) -> Result<(), SendPktsError>
where
S: Borrow<SocketAddr>,
T: AsRef<[u8]>,
&'a T: AsRef<[u8]>,
{
for chunk in packets.chunks(MAX_IOV) {
let mut packets = packets.into_iter();
loop {
let chunk = packets.by_ref().take(MAX_IOV);
if chunk.len() == 0 {
break;
}
batch_send_max_iov(sock, chunk)?;
}
Ok(())
Expand All @@ -216,8 +234,8 @@ where
T: AsRef<[u8]>,
{
let dests = dests.iter().map(Borrow::borrow);
let pkts: Vec<_> = repeat(&packet).zip(dests).collect();
batch_send(sock, &pkts)
let pkts = dests.map(|addr| (&packet, addr));
batch_send(sock, pkts)
}

#[cfg(test)]
Expand Down Expand Up @@ -246,7 +264,7 @@ mod tests {
let packets: Vec<_> = (0..32).map(|_| vec![0u8; PACKET_DATA_SIZE]).collect();
let packet_refs: Vec<_> = packets.iter().map(|p| (&p[..], &addr)).collect();

let sent = batch_send(&sender, &packet_refs[..]).ok();
let sent = batch_send(&sender, packet_refs).ok();
assert_eq!(sent, Some(()));

let mut packets = vec![Packet::default(); 32];
Expand Down Expand Up @@ -277,7 +295,7 @@ mod tests {
})
.collect();

let sent = batch_send(&sender, &packet_refs[..]).ok();
let sent = batch_send(&sender, packet_refs).ok();
assert_eq!(sent, Some(()));

let mut packets = vec![Packet::default(); 32];
Expand Down Expand Up @@ -345,7 +363,7 @@ mod tests {
let dest_refs: Vec<_> = vec![&ip4, &ip6, &ip4];

let sender = bind_to_unspecified().expect("bind");
let res = batch_send(&sender, &packet_refs[..]);
let res = batch_send(&sender, packet_refs);
assert_matches!(res, Err(SendPktsError::IoError(_, /*num_failed*/ 1)));
let res = multi_target_send(&sender, &packets[0], &dest_refs);
assert_matches!(res, Err(SendPktsError::IoError(_, /*num_failed*/ 1)));
Expand All @@ -366,7 +384,7 @@ mod tests {
(&packets[3][..], &ipv4broadcast),
(&packets[4][..], &ipv4local),
];
match batch_send(&sender, &packet_refs[..]) {
match batch_send(&sender, packet_refs) {
Ok(()) => panic!(),
Err(SendPktsError::IoError(ioerror, num_failed)) => {
assert_matches!(ioerror.kind(), ErrorKind::PermissionDenied);
Expand All @@ -382,7 +400,7 @@ mod tests {
(&packets[3][..], &ipv4local),
(&packets[4][..], &ipv4broadcast),
];
match batch_send(&sender, &packet_refs[..]) {
match batch_send(&sender, packet_refs) {
Ok(()) => panic!(),
Err(SendPktsError::IoError(ioerror, num_failed)) => {
assert_matches!(ioerror.kind(), ErrorKind::PermissionDenied);
Expand All @@ -398,7 +416,7 @@ mod tests {
(&packets[3][..], &ipv4broadcast),
(&packets[4][..], &ipv4local),
];
match batch_send(&sender, &packet_refs[..]) {
match batch_send(&sender, packet_refs) {
Ok(()) => panic!(),
Err(SendPktsError::IoError(ioerror, num_failed)) => {
assert_matches!(ioerror.kind(), ErrorKind::PermissionDenied);
Expand Down
2 changes: 1 addition & 1 deletion streamer/src/streamer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -362,7 +362,7 @@ fn recv_send(
let data = pkt.data(..)?;
socket_addr_space.check(&addr).then_some((data, addr))
});
batch_send(sock, &packets.collect::<Vec<_>>())?;
batch_send(sock, packets.collect::<Vec<_>>())?;
Ok(())
}

Expand Down
5 changes: 3 additions & 2 deletions turbine/src/broadcast_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -477,8 +477,9 @@ pub fn broadcast_shreds(
shred_select.stop();
transmit_stats.shred_select += shred_select.as_us();

let num_udp_packets = packets.len();
let mut send_mmsg_time = Measure::start("send_mmsg");
match batch_send(s, &packets[..]) {
match batch_send(s, packets) {
Ok(()) => (),
Err(SendPktsError::IoError(ioerr, num_failed)) => {
transmit_stats.dropped_packets_udp += num_failed;
Expand All @@ -487,7 +488,7 @@ pub fn broadcast_shreds(
}
send_mmsg_time.stop();
transmit_stats.send_mmsg_elapsed += send_mmsg_time.as_us();
transmit_stats.total_packets += packets.len() + quic_packets.len();
transmit_stats.total_packets += num_udp_packets + quic_packets.len();
for (shred, addr) in quic_packets {
let shred = Bytes::from(shred::Payload::unwrap_or_clone(shred.clone()));
if let Err(err) = quic_endpoint_sender.blocking_send((addr, shred)) {
Expand Down
8 changes: 1 addition & 7 deletions turbine/src/broadcast_stage/broadcast_duplicates_run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -392,13 +392,7 @@ impl BroadcastRun for BroadcastDuplicatesRun {
.flatten()
.collect();

match batch_send(sock, &packets) {
Ok(()) => (),
Err(SendPktsError::IoError(ioerr, _)) => {
return Err(Error::Io(ioerr));
}
}
Ok(())
batch_send(sock, packets).map_err(|SendPktsError::IoError(err, _)| Error::Io(err))
}

fn record(&mut self, receiver: &RecordReceiver, blockstore: &Blockstore) -> Result<()> {
Expand Down
16 changes: 6 additions & 10 deletions udp-client/src/udp_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
//! an interface for sending data
use {
core::iter::repeat,
solana_connection_cache::client_connection::ClientConnection,
solana_streamer::sendmmsg::batch_send,
solana_transaction_error::TransportResult,
Expand Down Expand Up @@ -37,18 +36,15 @@ impl ClientConnection for UdpClientConnection {
}

fn send_data_batch(&self, buffers: &[Vec<u8>]) -> TransportResult<()> {
let pkts: Vec<_> = buffers.iter().zip(repeat(self.server_addr())).collect();
batch_send(&self.socket, &pkts)?;
Ok(())
let addr = self.server_addr();
let pkts = buffers.iter().map(|bytes| (bytes, addr));
Ok(batch_send(&self.socket, pkts)?)
}

fn send_data_batch_async(&self, buffers: Vec<Vec<u8>>) -> TransportResult<()> {
let pkts: Vec<_> = buffers
.into_iter()
.zip(repeat(self.server_addr()))
.collect();
batch_send(&self.socket, &pkts)?;
Ok(())
let addr = self.server_addr();
let pkts = buffers.iter().map(|bytes| (bytes, addr));
Ok(batch_send(&self.socket, pkts)?)
}

fn send_data(&self, buffer: &[u8]) -> TransportResult<()> {
Expand Down

0 comments on commit 1e221d9

Please sign in to comment.