Skip to content

Commit

Permalink
Speed up request buffered hashes (#6318)
Browse files Browse the repository at this point in the history
Co-authored-by: Bjerg <[email protected]>
Co-authored-by: back <[email protected]>
Co-authored-by: Matthias Seitz <[email protected]>
  • Loading branch information
4 people authored Feb 13, 2024
1 parent f925e5f commit c0f3d38
Show file tree
Hide file tree
Showing 13 changed files with 1,306 additions and 829 deletions.
166 changes: 81 additions & 85 deletions crates/net/eth-wire/src/types/broadcast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@ use crate::{EthMessage, EthVersion};
use alloy_rlp::{
Decodable, Encodable, RlpDecodable, RlpDecodableWrapper, RlpEncodable, RlpEncodableWrapper,
};
use derive_more::{Deref, DerefMut, IntoIterator};

use derive_more::{Constructor, Deref, DerefMut, IntoIterator};
use reth_codecs::derive_arbitrary;
use reth_primitives::{Block, Bytes, TransactionSigned, TxHash, B256, U128};

Expand Down Expand Up @@ -451,10 +452,12 @@ pub trait HandleAnnouncement {
/// Returns the number of entries.
fn len(&self) -> usize;

/// Retain only entries for which the hash in the entry, satisfies a given predicate.
fn retain_by_hash(&mut self, f: impl FnMut(TxHash) -> bool);
/// Retain only entries for which the hash in the entry satisfies a given predicate, return
/// the rest.
fn retain_by_hash(&mut self, f: impl FnMut(&TxHash) -> bool) -> Self;

/// Returns the announcement version, either [`EthVersion::Eth66`] or [`EthVersion::Eth68`].
/// Returns the announcement version, either [`Eth66`](EthVersion::Eth66) or
/// [`Eth68`](EthVersion::Eth68).
fn msg_version(&self) -> EthVersion;
}

Expand All @@ -467,10 +470,10 @@ impl HandleAnnouncement for NewPooledTransactionHashes {
self.len()
}

fn retain_by_hash(&mut self, f: impl FnMut(TxHash) -> bool) {
fn retain_by_hash(&mut self, f: impl FnMut(&TxHash) -> bool) -> Self {
match self {
NewPooledTransactionHashes::Eth66(msg) => msg.retain_by_hash(f),
NewPooledTransactionHashes::Eth68(msg) => msg.retain_by_hash(f),
NewPooledTransactionHashes::Eth66(msg) => Self::Eth66(msg.retain_by_hash(f)),
NewPooledTransactionHashes::Eth68(msg) => Self::Eth68(msg.retain_by_hash(f)),
}
}

Expand All @@ -488,19 +491,28 @@ impl HandleAnnouncement for NewPooledTransactionHashes68 {
self.hashes.len()
}

fn retain_by_hash(&mut self, mut f: impl FnMut(TxHash) -> bool) {
fn retain_by_hash(&mut self, mut f: impl FnMut(&TxHash) -> bool) -> Self {
let mut indices_to_remove = vec![];
for (i, &hash) in self.hashes.iter().enumerate() {
for (i, hash) in self.hashes.iter().enumerate() {
if !f(hash) {
indices_to_remove.push(i);
}
}

let mut removed_hashes = Vec::with_capacity(indices_to_remove.len());
let mut removed_types = Vec::with_capacity(indices_to_remove.len());
let mut removed_sizes = Vec::with_capacity(indices_to_remove.len());

for index in indices_to_remove.into_iter().rev() {
self.hashes.remove(index);
self.types.remove(index);
self.sizes.remove(index);
let hash = self.hashes.remove(index);
removed_hashes.push(hash);
let ty = self.types.remove(index);
removed_types.push(ty);
let size = self.sizes.remove(index);
removed_sizes.push(size);
}

Self { hashes: removed_hashes, types: removed_types, sizes: removed_sizes }
}

fn msg_version(&self) -> EthVersion {
Expand All @@ -517,17 +529,22 @@ impl HandleAnnouncement for NewPooledTransactionHashes66 {
self.0.len()
}

fn retain_by_hash(&mut self, mut f: impl FnMut(TxHash) -> bool) {
fn retain_by_hash(&mut self, mut f: impl FnMut(&TxHash) -> bool) -> Self {
let mut indices_to_remove = vec![];
for (i, &hash) in self.0.iter().enumerate() {
for (i, hash) in self.0.iter().enumerate() {
if !f(hash) {
indices_to_remove.push(i);
}
}

let mut removed_hashes = Vec::with_capacity(indices_to_remove.len());

for index in indices_to_remove.into_iter().rev() {
self.0.remove(index);
let hash = self.0.remove(index);
removed_hashes.push(hash);
}

Self(removed_hashes)
}

fn msg_version(&self) -> EthVersion {
Expand All @@ -538,42 +555,53 @@ impl HandleAnnouncement for NewPooledTransactionHashes66 {
/// Announcement data that has been validated according to the configured network. For an eth68
/// announcement, values of the map are `Some((u8, usize))` - the tx metadata. For an eth66
/// announcement, values of the map are `None`.
#[derive(Debug, IntoIterator)]
#[derive(Debug, Deref, DerefMut, IntoIterator, Constructor)]
pub struct ValidAnnouncementData {
#[deref]
#[deref_mut]
#[into_iterator]
data: HashMap<TxHash, Option<(u8, usize)>>,
version: EthVersion,
}

impl ValidAnnouncementData {
/// Returns a new [`ValidAnnouncementData`] wrapper around validated [`EthVersion::Eth68`]
/// announcement data.
/// Returns a new [`ValidAnnouncementData`] wrapper around validated
/// [`Eth68`](EthVersion::Eth68) announcement data.
pub fn new_eth68(data: HashMap<TxHash, Option<(u8, usize)>>) -> Self {
Self { data, version: EthVersion::Eth68 }
Self::new(data, EthVersion::Eth68)
}

/// Returns a new [`ValidAnnouncementData`] wrapper around validated [`EthVersion::Eth68`]
/// announcement data.
/// Returns a new [`ValidAnnouncementData`] wrapper around validated
/// [`Eth68`](EthVersion::Eth68) announcement data.
pub fn new_eth66(data: HashMap<TxHash, Option<(u8, usize)>>) -> Self {
Self { data, version: EthVersion::Eth66 }
Self::new(data, EthVersion::Eth66)
}

/// Returns a new [`ValidAnnouncementData`] with empty data for an [`EthVersion::Eth68`]
/// Returns a new [`ValidAnnouncementData`] with empty data from an [`Eth68`](EthVersion::Eth68)
/// announcement.
pub fn empty_eth68() -> Self {
Self { data: HashMap::new(), version: EthVersion::Eth68 }
Self::new_eth68(HashMap::new())
}

/// Returns a new [`ValidAnnouncementData`] with empty data for an [`EthVersion::Eth66`]
/// Returns a new [`ValidAnnouncementData`] with empty data from an [`Eth66`](EthVersion::Eth66)
/// announcement.
pub fn empty_eth66() -> Self {
Self { data: HashMap::new(), version: EthVersion::Eth66 }
Self::new_eth66(HashMap::new())
}

/// Destructs returning the validated data.
pub fn into_data(self) -> HashMap<TxHash, Option<(u8, usize)>> {
self.data
}

/// Destructs returning only the valid hashes and the announcement message version. Caution! If
/// this is [`Eth68`](EthVersion::Eth68)announcement data, the metadata must be cached
/// before call.
pub fn into_request_hashes(self) -> (RequestTxHashes, EthVersion) {
let hashes = self.data.into_keys().collect::<Vec<_>>();

(RequestTxHashes::new(hashes), self.version)
}
}

impl HandleAnnouncement for ValidAnnouncementData {
Expand All @@ -585,83 +613,51 @@ impl HandleAnnouncement for ValidAnnouncementData {
self.data.len()
}

fn retain_by_hash(&mut self, mut f: impl FnMut(TxHash) -> bool) {
self.data.retain(|&hash, _| f(hash))
fn retain_by_hash(&mut self, mut f: impl FnMut(&TxHash) -> bool) -> Self {
let data = std::mem::take(&mut self.data);

let (keep, rest) = data.into_iter().partition(|(hash, _)| f(hash));

self.data = keep;

ValidAnnouncementData::new(rest, self.version)
}

fn msg_version(&self) -> EthVersion {
self.version
}
}

/// Hashes extracted from valid announcement data. For an eth68 announcement, this means the eth68
/// metadata should have been cached already.
#[derive(Debug, Deref, DerefMut, IntoIterator)]
pub struct ValidTxHashes {
/// Hashes to request from a peer.
#[derive(Debug, Default, Deref, DerefMut, IntoIterator, Constructor)]
pub struct RequestTxHashes {
#[deref]
#[deref_mut]
#[into_iterator]
#[into_iterator(owned, ref)]
hashes: Vec<TxHash>,
version: EthVersion,
}

impl ValidTxHashes {
/// Returns a new [`ValidTxHashes`] wrapper around validated hashes. Takes a list of validated
/// hashes as parameter along with the eth version.
pub fn new(hashes: Vec<TxHash>, version: EthVersion) -> Self {
Self { hashes, version }
}

/// Returns a new [`ValidTxHashes`] wrapper around validated hashes from valid
/// [`EthVersion::Eth68`] announcement data. Takes a list of validated hashes as parameter.
pub fn new_eth68(hashes: Vec<TxHash>) -> Self {
Self::new(hashes, EthVersion::Eth68)
}

/// Returns a new [`ValidTxHashes`] wrapper around validated hashes from valid
/// [`EthVersion::Eth66`] announcement data. Takes a list of validated hashes as parameter.
pub fn new_eth66(hashes: Vec<TxHash>) -> Self {
Self::new(hashes, EthVersion::Eth66)
}

/// Returns a new [`ValidTxHashes`] with empty hashes.
pub fn empty(version: EthVersion) -> Self {
Self { hashes: vec![], version }
}

/// Returns a new [`ValidTxHashes`] with empty hashes for an [`EthVersion::Eth68`]
/// announcement.
pub fn empty_eth68() -> Self {
Self::empty(EthVersion::Eth68)
}

/// Returns a new [`ValidTxHashes`] with empty hashes for an [`EthVersion::Eth66`]
/// announcement.
pub fn empty_eth66() -> Self {
Self::empty(EthVersion::Eth66)
}

/// Destructs returning the validated hashes.
pub fn into_hashes(self) -> Vec<TxHash> {
self.hashes
impl RequestTxHashes {
/// Returns a new [`RequestTxHashes`] with given capacity for hashes. Caution! Make sure to
/// call [`Vec::shrink_to_fit`] on [`RequestTxHashes`] when full, especially where it will be
/// stored in its entirety like in the future waiting for a
/// [`GetPooledTransactions`](crate::GetPooledTransactions) request to resolve.
pub fn with_capacity(capacity: usize) -> Self {
Self::new(Vec::with_capacity(capacity))
}
}

impl HandleAnnouncement for ValidTxHashes {
fn is_empty(&self) -> bool {
self.hashes.is_empty()
}
impl FromIterator<(TxHash, Option<(u8, usize)>)> for RequestTxHashes {
fn from_iter<I: IntoIterator<Item = (TxHash, Option<(u8, usize)>)>>(iter: I) -> Self {
let mut hashes = Vec::with_capacity(32);

fn len(&self) -> usize {
self.hashes.len()
}
for (hash, _) in iter {
hashes.push(hash);
}

fn retain_by_hash(&mut self, mut f: impl FnMut(TxHash) -> bool) {
self.hashes.retain(|&hash| f(hash))
}
hashes.shrink_to_fit();

fn msg_version(&self) -> EthVersion {
self.version
RequestTxHashes::new(hashes)
}
}

Expand Down
2 changes: 1 addition & 1 deletion crates/net/network/src/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ where
}

/// Wrapper of [`schnellru::LruMap`] that implements [`fmt::Debug`].
#[derive(Deref, DerefMut)]
#[derive(Deref, DerefMut, Default)]
pub struct LruMap<K, V, L = ByLength, S = RandomState>(schnellru::LruMap<K, V, L, S>)
where
K: Hash + PartialEq,
Expand Down
20 changes: 18 additions & 2 deletions crates/net/network/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ pub struct SessionManagerMetrics {
pub(crate) total_dial_successes: Counter,
}

/// Metrics for the TransactionsManager
/// Metrics for the [`TransactionsManager`](crate::transactions::TransactionsManager).
#[derive(Metrics)]
#[metrics(scope = "network")]
pub struct TransactionsManagerMetrics {
Expand All @@ -67,10 +67,26 @@ pub struct TransactionsManagerMetrics {
pub(crate) messages_with_already_seen_transactions: Counter,
/// Number of transactions about to be imported into the pool.
pub(crate) pending_pool_imports: Gauge,
/// Currently active outgoing GetPooledTransactions requests.
/// Number of inflight requests at which the
/// [`TransactionPool`](reth_transaction_pool::TransactionPool) is considered to be at
/// capacity. Note, this is not a limit to the number of inflight requests, but a health
/// measure.
pub(crate) capacity_pending_pool_imports: Counter,
/// Currently active outgoing [`GetPooledTransactions`](reth_eth_wire::GetPooledTransactions)
/// requests.
pub(crate) inflight_transaction_requests: Gauge,
/// Number of inflight requests at which the
/// [`TransactionFetcher`](crate::transactions::TransactionFetcher) is considered to be at
/// capacity. Note, this is not a limit to the number of inflight requests, but a health
/// measure.
pub(crate) capacity_inflight_requests: Counter,
/// Hashes in currently active outgoing
/// [`GetPooledTransactions`](reth_eth_wire::GetPooledTransactions) requests.
pub(crate) hashes_inflight_transaction_requests: Gauge,
/// How often we failed to send a request to the peer because the channel was full.
pub(crate) egress_peer_channel_full: Counter,
/// Total number of hashes pending fetch.
pub(crate) hashes_pending_fetch: Gauge,
}

/// Metrics for Disconnection types
Expand Down
10 changes: 5 additions & 5 deletions crates/net/network/src/peers/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ use crate::{
error::{BackoffKind, SessionError},
peers::{
reputation::{is_banned_reputation, DEFAULT_REPUTATION},
ReputationChangeWeights, DEFAULT_MAX_CONCURRENT_DIALS, DEFAULT_MAX_PEERS_INBOUND,
DEFAULT_MAX_PEERS_OUTBOUND,
ReputationChangeWeights, DEFAULT_MAX_COUNT_CONCURRENT_DIALS,
DEFAULT_MAX_COUNT_PEERS_INBOUND, DEFAULT_MAX_COUNT_PEERS_OUTBOUND,
},
session::{Direction, PendingSessionHandshakeError},
swarm::NetworkConnectionState,
Expand Down Expand Up @@ -877,9 +877,9 @@ impl Default for ConnectionInfo {
ConnectionInfo {
num_outbound: 0,
num_inbound: 0,
max_outbound: DEFAULT_MAX_PEERS_OUTBOUND,
max_inbound: DEFAULT_MAX_PEERS_INBOUND,
max_concurrent_outbound_dials: DEFAULT_MAX_CONCURRENT_DIALS,
max_outbound: DEFAULT_MAX_COUNT_PEERS_OUTBOUND as usize,
max_inbound: DEFAULT_MAX_COUNT_PEERS_INBOUND as usize,
max_concurrent_outbound_dials: DEFAULT_MAX_COUNT_CONCURRENT_DIALS,
}
}
}
Expand Down
6 changes: 3 additions & 3 deletions crates/net/network/src/peers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,10 @@ pub use reputation::ReputationChangeWeights;
pub use reth_network_api::PeerKind;

/// Maximum number of available slots for outbound sessions.
pub(crate) const DEFAULT_MAX_PEERS_OUTBOUND: usize = 100;
pub(crate) const DEFAULT_MAX_COUNT_PEERS_OUTBOUND: u32 = 100;

/// Maximum number of available slots for inbound sessions.
pub(crate) const DEFAULT_MAX_PEERS_INBOUND: usize = 30;
pub(crate) const DEFAULT_MAX_COUNT_PEERS_INBOUND: u32 = 30;

/// Maximum number of available slots concurrent outgoing dials.
pub(crate) const DEFAULT_MAX_CONCURRENT_DIALS: usize = 10;
pub(crate) const DEFAULT_MAX_COUNT_CONCURRENT_DIALS: usize = 10;
6 changes: 4 additions & 2 deletions crates/net/network/src/session/config.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
//! Configuration types for [SessionManager](crate::session::SessionManager).
use crate::{
peers::{DEFAULT_MAX_PEERS_INBOUND, DEFAULT_MAX_PEERS_OUTBOUND},
peers::{DEFAULT_MAX_COUNT_PEERS_INBOUND, DEFAULT_MAX_COUNT_PEERS_OUTBOUND},
session::{Direction, ExceedsSessionLimit},
};
use std::time::Duration;
Expand Down Expand Up @@ -52,7 +52,9 @@ impl Default for SessionsConfig {
// `poll`.
// The default is twice the maximum number of available slots, if all slots are occupied
// the buffer will have capacity for 3 messages per session (average).
session_event_buffer: (DEFAULT_MAX_PEERS_OUTBOUND + DEFAULT_MAX_PEERS_INBOUND) * 2,
session_event_buffer: (DEFAULT_MAX_COUNT_PEERS_OUTBOUND as usize +
DEFAULT_MAX_COUNT_PEERS_INBOUND as usize) *
2,
limits: Default::default(),
initial_internal_request_timeout: INITIAL_REQUEST_TIMEOUT,
protocol_breach_request_timeout: PROTOCOL_BREACH_REQUEST_TIMEOUT,
Expand Down
Loading

0 comments on commit c0f3d38

Please sign in to comment.