Skip to content

Commit

Permalink
Add support for specifying Zenoh session's connection timeout
Browse files Browse the repository at this point in the history
  • Loading branch information
sophokles73 authored and eriksven committed Oct 18, 2024
1 parent 0eda8db commit ee3a49e
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 4 deletions.
1 change: 1 addition & 0 deletions components/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions components/fms-consumer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ clap = { workspace = true, features = [
"error-context",
"suggestions",
] }
duration-str = { workspace = true }
env_logger = { workspace = true }
fms-proto = { workspace = true }
futures = { version = "0.3" }
Expand Down
31 changes: 27 additions & 4 deletions components/fms-consumer/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,10 @@ fn parse_zenoh_args(args: &ArgMatches) -> Config {
.set_enabled(Some(*values))
.unwrap();
}

if let Some(values) = args.get_one::<Duration>("session-timeout") {
let millis = u64::try_from(values.as_millis()).unwrap_or(u64::MAX);
config.scouting.set_timeout(Some(millis)).unwrap();
}
config
}

Expand Down Expand Up @@ -310,10 +313,20 @@ async fn run_async_processor_zenoh(args: &ArgMatches) {
let config = parse_zenoh_args(zenoh_args);

info!("Opening session...");
let session = zenoh::open(config).res().await.unwrap();
let session = zenoh::open(config).res().await.unwrap_or_else(|e| {
error!("failed to open Zenoh session: {e}");
process::exit(1);
});

info!("Declaring Subscriber on '{}'...", &KEY_EXPR);
let subscriber = session.declare_subscriber(KEY_EXPR).res().await.unwrap();
let subscriber = session
.declare_subscriber(KEY_EXPR)
.res()
.await
.unwrap_or_else(|e| {
error!("failed to create Zenoh subscriber: {e}");
process::exit(1);
});
loop {
select!(
sample = subscriber.recv_async() => {
Expand Down Expand Up @@ -342,7 +355,8 @@ pub async fn main() {
.subcommand_required(true)
.subcommand(
Command::new(SUBCOMMAND_HONO)
.about("Forwards VSS data to an Influx DB server from Hono's north bound Kafka API").arg(
.about("Forwards VSS data to an Influx DB server from Hono's north bound Kafka API")
.arg(
Arg::new(PARAM_KAFKA_PROPERTIES_FILE)
.value_parser(clap::builder::NonEmptyStringValueParser::new())
.long(PARAM_KAFKA_PROPERTIES_FILE)
Expand Down Expand Up @@ -404,6 +418,15 @@ pub async fn main() {
.short('c')
.help("A configuration file.")
.required(false),
)
.arg(
Arg::new("session-timeout")
.value_parser(|s: &str| duration_str::parse(s))
.long("session-timeout")
.help("The time to wait for establishment of a Zenoh session, e.g. 10s.")
.value_name("DURATION_SPEC")
.required(false)
.default_value("20s")
),
);

Expand Down

0 comments on commit ee3a49e

Please sign in to comment.