From ee3a49e886454a3a0ad2420e2719890c68b7cec1 Mon Sep 17 00:00:00 2001 From: Kai Hudalla Date: Wed, 16 Oct 2024 08:09:33 +0200 Subject: [PATCH] Add support for specifying Zenoh session's connection timeout --- components/Cargo.lock | 1 + components/fms-consumer/Cargo.toml | 1 + components/fms-consumer/src/main.rs | 31 +++++++++++++++++++++++++---- 3 files changed, 29 insertions(+), 4 deletions(-) diff --git a/components/Cargo.lock b/components/Cargo.lock index df83c1e..5f174f7 100644 --- a/components/Cargo.lock +++ b/components/Cargo.lock @@ -952,6 +952,7 @@ version = "0.1.0-SNAPSHOT" dependencies = [ "async-trait", "clap", + "duration-str", "env_logger", "fms-proto", "futures", diff --git a/components/fms-consumer/Cargo.toml b/components/fms-consumer/Cargo.toml index 682a6d1..e9829ed 100644 --- a/components/fms-consumer/Cargo.toml +++ b/components/fms-consumer/Cargo.toml @@ -39,6 +39,7 @@ clap = { workspace = true, features = [ "error-context", "suggestions", ] } +duration-str = { workspace = true } env_logger = { workspace = true } fms-proto = { workspace = true } futures = { version = "0.3" } diff --git a/components/fms-consumer/src/main.rs b/components/fms-consumer/src/main.rs index 9217015..33ecc33 100644 --- a/components/fms-consumer/src/main.rs +++ b/components/fms-consumer/src/main.rs @@ -82,7 +82,10 @@ fn parse_zenoh_args(args: &ArgMatches) -> Config { .set_enabled(Some(*values)) .unwrap(); } - + if let Some(values) = args.get_one::("session-timeout") { + let millis = u64::try_from(values.as_millis()).unwrap_or(u64::MAX); + config.scouting.set_timeout(Some(millis)).unwrap(); + } config } @@ -310,10 +313,20 @@ async fn run_async_processor_zenoh(args: &ArgMatches) { let config = parse_zenoh_args(zenoh_args); info!("Opening session..."); - let session = zenoh::open(config).res().await.unwrap(); + let session = zenoh::open(config).res().await.unwrap_or_else(|e| { + error!("failed to open Zenoh session: {e}"); + process::exit(1); + }); info!("Declaring Subscriber on '{}'...", &KEY_EXPR); - let subscriber = session.declare_subscriber(KEY_EXPR).res().await.unwrap(); + let subscriber = session + .declare_subscriber(KEY_EXPR) + .res() + .await + .unwrap_or_else(|e| { + error!("failed to create Zenoh subscriber: {e}"); + process::exit(1); + }); loop { select!( sample = subscriber.recv_async() => { @@ -342,7 +355,8 @@ pub async fn main() { .subcommand_required(true) .subcommand( Command::new(SUBCOMMAND_HONO) - .about("Forwards VSS data to an Influx DB server from Hono's north bound Kafka API").arg( + .about("Forwards VSS data to an Influx DB server from Hono's north bound Kafka API") + .arg( Arg::new(PARAM_KAFKA_PROPERTIES_FILE) .value_parser(clap::builder::NonEmptyStringValueParser::new()) .long(PARAM_KAFKA_PROPERTIES_FILE) @@ -404,6 +418,15 @@ pub async fn main() { .short('c') .help("A configuration file.") .required(false), + ) + .arg( + Arg::new("session-timeout") + .value_parser(|s: &str| duration_str::parse(s)) + .long("session-timeout") + .help("The time to wait for establishment of a Zenoh session, e.g. 10s.") + .value_name("DURATION_SPEC") + .required(false) + .default_value("20s") ), );