Skip to content

Commit

Permalink
add disabled heartbeat option on diode-receive side, add doc and log
Browse files Browse the repository at this point in the history
messages
  • Loading branch information
ad-anssi committed Jun 26, 2023
1 parent cfd2bc4 commit 8e2f9ca
Show file tree
Hide file tree
Showing 6 changed files with 45 additions and 24 deletions.
10 changes: 6 additions & 4 deletions src/bin/diode-receive.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ struct Config {
flush_timeout: time::Duration,
nb_decoding_threads: u8,
to: ClientConfig,
heartbeat: time::Duration,
heartbeat: Option<time::Duration>,
}

enum ClientConfig {
Expand Down Expand Up @@ -118,7 +118,7 @@ fn command_args() -> Config {
.value_name("nb_seconds")
.default_value("10")
.value_parser(clap::value_parser!(u16))
.help("Maximum duration expected between heartbeat messages"),
.help("Maximum duration expected between heartbeat messages, 0 to disable"),
)
.get_matches();

Expand All @@ -141,8 +141,10 @@ fn command_args() -> Config {
.get_one::<String>("to_unix")
.map(|s| path::PathBuf::from_str(s).expect("to_unix must point to a valid path"));

let heartbeat =
time::Duration::from_secs(*args.get_one::<u16>("heartbeat").expect("default") as u64);
let heartbeat = {
let hb = *args.get_one::<u16>("heartbeat").expect("default") as u64;
(hb != 0).then(|| time::Duration::from_secs(hb))
};

let to = if let Some(to_tcp) = to_tcp {
ClientConfig::Tcp(to_tcp)
Expand Down
10 changes: 6 additions & 4 deletions src/bin/diode-send.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ struct Config {
to_bind: net::SocketAddr,
to_udp: net::SocketAddr,
to_udp_mtu: u16,
heartbeat: time::Duration,
heartbeat: Option<time::Duration>,
}

fn command_args() -> Config {
Expand Down Expand Up @@ -109,7 +109,7 @@ fn command_args() -> Config {
.value_name("nb_seconds")
.default_value("5")
.value_parser(clap::value_parser!(u16))
.help("Duration between two emitted heartbeat messages"),
.help("Duration between two emitted heartbeat messages, 0 to disable"),
)
.get_matches();

Expand All @@ -133,8 +133,10 @@ fn command_args() -> Config {
let to_udp = net::SocketAddr::from_str(args.get_one::<String>("to_udp").expect("default"))
.expect("invalid to_udp parameter");
let to_udp_mtu = *args.get_one::<u16>("to_udp_mtu").expect("default");
let heartbeat =
time::Duration::from_secs(*args.get_one::<u16>("heartbeat").expect("default") as u64);
let heartbeat = {
let hb = *args.get_one::<u16>("heartbeat").expect("default") as u64;
(hb != 0).then(|| time::Duration::from_secs(hb))
};

Config {
from_tcp,
Expand Down
25 changes: 13 additions & 12 deletions src/receive/dispatch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,20 +21,21 @@ pub(crate) fn start<F>(receiver: &receive::Receiver<F>) -> Result<(), receive::E
let mut last_heartbeat = time::Instant::now();

loop {
let message = match receiver
.for_dispatch
.recv_timeout(receiver.config.heartbeat_interval)
{
Err(crossbeam_channel::RecvTimeoutError::Timeout) => {
if last_heartbeat.elapsed() > receiver.config.heartbeat_interval {
log::warn!(
"no heartbeat message received during the last {} second(s)",
receiver.config.heartbeat_interval.as_secs()
);
let message = if let Some(hb_interval) = receiver.config.heartbeat_interval {
match receiver.for_dispatch.recv_timeout(hb_interval) {
Err(crossbeam_channel::RecvTimeoutError::Timeout) => {
if last_heartbeat.elapsed() > hb_interval {
log::warn!(
"no heartbeat message received during the last {} second(s)",
hb_interval.as_secs()
);
}
continue;
}
continue;
other => other?,
}
other => other?,
} else {
receiver.for_dispatch.recv()?
};

log::trace!("received {message}");
Expand Down
11 changes: 10 additions & 1 deletion src/receive/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ pub struct Config {
pub repair_block_size: u32,
pub flush_timeout: time::Duration,
pub nb_decoding_threads: u8,
pub heartbeat_interval: time::Duration,
pub heartbeat_interval: Option<time::Duration>,
}

impl Config {
Expand Down Expand Up @@ -252,6 +252,15 @@ where
self.config.flush_timeout.as_millis()
);

if let Some(hb_interval) = self.config.heartbeat_interval {
log::info!(
"heartbeat interval is set to {} seconds",
hb_interval.as_secs()
);
} else {
log::info!("heartbeat is disabled");
}

for i in 0..self.config.nb_clients {
thread::Builder::new()
.name(format!("client_{i}"))
Expand Down
3 changes: 2 additions & 1 deletion src/send/heartbeat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@
use crate::{protocol, send};

pub(crate) fn start<C>(sender: &send::Sender<C>) -> Result<(), send::Error> {
let alarm = crossbeam_channel::tick(sender.config.hearbeat_interval);
let alarm =
crossbeam_channel::tick(sender.config.hearbeat_interval.expect("heartbeat enabled"));

loop {
sender.to_encoding.send(protocol::Message::new(
Expand Down
10 changes: 8 additions & 2 deletions src/send/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ pub struct Config {
pub encoding_block_size: u64,
pub repair_block_size: u32,
pub nb_encoding_threads: u8,
pub hearbeat_interval: time::Duration,
pub hearbeat_interval: Option<time::Duration>,
pub to_bind: net::SocketAddr,
pub to_udp: net::SocketAddr,
pub to_mtu: u16,
Expand Down Expand Up @@ -211,10 +211,16 @@ where
.spawn_scoped(scope, || encoding::start(self))?;
}

if !self.config.hearbeat_interval.is_zero() {
if let Some(hb_interval) = self.config.hearbeat_interval {
log::info!(
"heartbeat message will be sent every {} seconds",
hb_interval.as_secs()
);
thread::Builder::new()
.name("heartbeat".into())
.spawn_scoped(scope, || heartbeat::start(self))?;
} else {
log::info!("heartbeat is disabled");
}

for i in 0..self.config.nb_clients {
Expand Down

0 comments on commit 8e2f9ca

Please sign in to comment.