Skip to content

Commit

Permalink
chore(gossispsub): deprecate future-ticker
Browse files Browse the repository at this point in the history
  • Loading branch information
João Oliveira committed Nov 12, 2024
1 parent 4192fc3 commit 1ce8186
Show file tree
Hide file tree
Showing 4 changed files with 31 additions and 42 deletions.
41 changes: 15 additions & 26 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion protocols/gossipsub/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ bytes = "1.6"
either = "1.11"
fnv = "1.0.7"
futures = { workspace = true }
futures-ticker = "0.0.3"
getrandom = "0.2.15"
hex_fmt = "0.3.0"
web-time = { workspace = true }
Expand All @@ -40,6 +39,7 @@ void = "1.0.2"

# Metrics dependencies
prometheus-client = { workspace = true }
futures-timer = "3.0.3"

[dev-dependencies]
async-std = { version = "1.6.3", features = ["unstable"] }
Expand Down
25 changes: 12 additions & 13 deletions protocols/gossipsub/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ use std::{
time::Duration,
};

use futures::StreamExt;
use futures_ticker::Ticker;
use futures::FutureExt;
use futures_timer::Delay;
use prometheus_client::registry::Registry;
use rand::{seq::SliceRandom, thread_rng};

Expand Down Expand Up @@ -283,7 +283,7 @@ pub struct Behaviour<D = IdentityTransform, F = AllowAllSubscriptionFilter> {
mcache: MessageCache,

/// Heartbeat interval stream.
heartbeat: Ticker,
heartbeat: Delay,

/// Number of heartbeats since the beginning of time; this allows us to amortize some resource
/// clean up -- eg backoff clean up.
Expand All @@ -301,7 +301,7 @@ pub struct Behaviour<D = IdentityTransform, F = AllowAllSubscriptionFilter> {

/// Stores optional peer score data together with thresholds, decay interval and gossip
/// promises.
peer_score: Option<(PeerScore, PeerScoreThresholds, Ticker, GossipPromises)>,
peer_score: Option<(PeerScore, PeerScoreThresholds, Delay, GossipPromises)>,

/// Counts the number of `IHAVE` received from each peer since the last heartbeat.
count_received_ihave: HashMap<PeerId, usize>,
Expand Down Expand Up @@ -448,10 +448,7 @@ where
config.backoff_slack(),
),
mcache: MessageCache::new(config.history_gossip(), config.history_length()),
heartbeat: Ticker::new_with_next(
config.heartbeat_interval(),
config.heartbeat_initial_delay(),
),
heartbeat: Delay::new(config.heartbeat_interval() + config.heartbeat_initial_delay()),
heartbeat_ticks: 0,
px_peers: HashSet::new(),
outbound_peers: HashSet::new(),
Expand Down Expand Up @@ -879,7 +876,7 @@ where
return Err("Peer score set twice".into());
}

let interval = Ticker::new(params.decay_interval);
let interval = Delay::new(params.decay_interval);
let peer_score = PeerScore::new_with_message_delivery_time_callback(params, callback);
self.peer_score = Some((peer_score, threshold, interval, GossipPromises::default()));
Ok(())
Expand Down Expand Up @@ -1145,7 +1142,7 @@ where
}

fn score_below_threshold_from_scores(
peer_score: &Option<(PeerScore, PeerScoreThresholds, Ticker, GossipPromises)>,
peer_score: &Option<(PeerScore, PeerScoreThresholds, Delay, GossipPromises)>,
peer_id: &PeerId,
threshold: impl Fn(&PeerScoreThresholds) -> f64,
) -> (bool, f64) {
Expand Down Expand Up @@ -3105,14 +3102,16 @@ where
}

// update scores
if let Some((peer_score, _, interval, _)) = &mut self.peer_score {
while let Poll::Ready(Some(_)) = interval.poll_next_unpin(cx) {
if let Some((peer_score, _, delay, _)) = &mut self.peer_score {
if delay.poll_unpin(cx).is_ready() {
peer_score.refresh_scores();
delay.reset(peer_score.params.decay_interval);
}
}

while let Poll::Ready(Some(_)) = self.heartbeat.poll_next_unpin(cx) {
if self.heartbeat.poll_unpin(cx).is_ready() {
self.heartbeat();
self.heartbeat.reset(self.config.heartbeat_interval());
}

Poll::Pending
Expand Down
5 changes: 3 additions & 2 deletions protocols/gossipsub/src/peer_score.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,14 +44,15 @@ mod tests;
const TIME_CACHE_DURATION: u64 = 120;

pub(crate) struct PeerScore {
params: PeerScoreParams,
/// The score parameters.
pub(crate) params: PeerScoreParams,
/// The stats per PeerId.
peer_stats: HashMap<PeerId, PeerStats>,
/// Tracking peers per IP.
peer_ips: HashMap<IpAddr, HashSet<PeerId>>,
/// Message delivery tracking. This is a time-cache of [`DeliveryRecord`]s.
deliveries: TimeCache<MessageId, DeliveryRecord>,
/// callback for monitoring message delivery times
/// Callback for monitoring message delivery times.
message_delivery_time_callback: Option<fn(&PeerId, &TopicHash, f64)>,
}

Expand Down

0 comments on commit 1ce8186

Please sign in to comment.