diff --git a/README.md b/README.md index 0e7e752..2bde456 100644 --- a/README.md +++ b/README.md @@ -98,6 +98,22 @@ of an Eclipse Hono instance as shown in the diagram below. The second compose file specified on the command line will also start the [FMS Consumer](./components/fms-consumer) back end component which receives vehicle data via Hono's north bound Kafka based Telemetry API and writes it to the Influx DB. + +# Using Eclipse Zenoh to geographically distribute the vehicle data + +The blueprint supports configuring the FMS Forwarder to send vehicle data to the Eclipse Zenoh router of an [Eclipse Zenoh](https://zenoh.io/) instance as shown in the diagram below. +Zenoh router provides a plugin mechanism to other protocols to enable the Vehicle to anything communication. + + + +Start up the vehicle and back end services using Docker Compose: + +```sh +docker compose -f ./fms-blueprint-compose.yaml -f ./fms-blueprint-compose-zenoh.yaml up --detach +``` + +Once all services have been started, the current vehicle status can be viewed on a [Grafana dashboard](http://127.0.0.1:3000), +using *admin*/*admin* as username and password for logging in. # Manual configuration diff --git a/components/Cargo.toml b/components/Cargo.toml index 4023f3d..78aac83 100644 --- a/components/Cargo.toml +++ b/components/Cargo.toml @@ -50,6 +50,7 @@ prost = { version = "0.12" } prost-types = { version = "0.12" } # tokio does not enable features by default tokio = { version = "1.36" } +zenoh = {version = "0.10.1-rc"} [profile.release] lto = true # Link time optimization (dead code removal etc...) diff --git a/components/fms-consumer/Cargo.toml b/components/fms-consumer/Cargo.toml index ad43aa8..38b9930 100644 --- a/components/fms-consumer/Cargo.toml +++ b/components/fms-consumer/Cargo.toml @@ -29,15 +29,17 @@ readme.workspace = true # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] +async-trait = { workspace = true } clap = { workspace = true, features = ["std", "env", "color", "help", "usage", "error-context", "suggestions"] } env_logger = { workspace = true } fms-proto = { workspace = true } -futures = "0.3" +futures = {version ="0.3"} influx-client = { workspace = true, features = ["writer"] } rdkafka = { version = "0.36", default-features = false, features = ["libz", "tokio", "cmake-build", "ssl-vendored"] } log = { workspace = true } protobuf = { workspace = true } tokio = { workspace = true, features = ["macros", "rt-multi-thread", "time"] } +zenoh = { workspace = true } [build-dependencies] # see https://github.com/fede1024/rust-rdkafka/issues/572#issuecomment-1529316876 diff --git a/components/fms-consumer/README.md b/components/fms-consumer/README.md new file mode 100644 index 0000000..afcbb34 --- /dev/null +++ b/components/fms-consumer/README.md @@ -0,0 +1,58 @@ + +The FMS Consumer receives vehicle data either via Hono's north bound Kafka based Telemetry API or Zenoh and writes it to the Influx DB. + + +# Building + +Building the consumer requires a [Rust development toolchain](https://rustup.rs/). + +# Running + +The FMS Consumer receives vehicle data either via Hono's north bound Kafka based Telemetry API or Zenoh and writes it to the Influx DB. The type of source can be selected by means of command line arguments when starting the consumer. + +Please refer to the command line help for details: + +```sh +fms-consumer --help +``` + +## Receive data from Hono's north bound Kafka based Telemetry API + +The consumer can receive data from Hono's north bound Kafka based Telemetry API + +Please refer to the command line help for details: + +```sh +fms-consumer hono --help +``` + + +## Receive data from Zenoh + +The consumer can receive data from the Zenoh router of an [Eclipse Zenoh](https://projects.eclipse.org/projects/iot.zenoh/) instance. +For this to work, the consumer needs to be configured with the Zenoh router end points. + +Please refer to the command line help for details: + +```sh +fms-consumer zenoh --help +``` diff --git a/components/fms-consumer/src/main.rs b/components/fms-consumer/src/main.rs index e6e1470..9217015 100644 --- a/components/fms-consumer/src/main.rs +++ b/components/fms-consumer/src/main.rs @@ -36,6 +36,10 @@ use rdkafka::consumer::Consumer; use rdkafka::message::{BorrowedHeaders, BorrowedMessage, Headers}; use rdkafka::{ClientConfig, Message}; +use futures::select; +use zenoh::config::Config; +use zenoh::prelude::r#async::*; + const CONTENT_TYPE_PROTOBUF: &str = "application/vnd.google.protobuf"; const HEADER_NAME_ORIG_ADDRESS: &str = "orig_address"; @@ -43,6 +47,45 @@ const HEADER_NAME_ORIG_ADDRESS: &str = "orig_address"; const PARAM_KAFKA_PROPERTIES_FILE: &str = "kafka-properties-file"; const PARAM_KAFKA_TOPIC_NAME: &str = "kafka-topic"; +const SUBCOMMAND_HONO: &str = "hono"; +const SUBCOMMAND_ZENOH: &str = "zenoh"; + +const KEY_EXPR: &str = "fms/vehicleStatus"; + +fn parse_zenoh_args(args: &ArgMatches) -> Config { + let mut config: Config = if let Some(conf_file) = args.get_one::("config") { + Config::from_file(conf_file).unwrap() + } else { + Config::default() + }; + + if let Some(mode) = args.get_one::("mode") { + config.set_mode(Some(*mode)).unwrap(); + } + + if let Some(values) = args.get_many::("connect") { + config + .connect + .endpoints + .extend(values.map(|v| v.parse().unwrap())) + } + if let Some(values) = args.get_many::("listen") { + config + .listen + .endpoints + .extend(values.map(|v| v.parse().unwrap())) + } + if let Some(values) = args.get_one::("no-multicast-scouting") { + config + .scouting + .multicast + .set_enabled(Some(*values)) + .unwrap(); + } + + config +} + fn add_property_bag_to_map(property_bag: String, headers: &mut HashMap) { property_bag.split('&').for_each(|p| { trace!("processing property: {p}"); @@ -148,7 +191,15 @@ async fn process_protobuf_message( } } -async fn process_message(m: &BorrowedMessage<'_>, influx_writer: Arc) { +async fn process_zenoh_message(payload: &[u8], influx_writer: Arc) { + if let Some(vehicle_status) = deserialize_vehicle_status(payload) { + influx_writer.write_vehicle_status(&vehicle_status).await; + } else { + debug!("ignoring message without payload"); + } +} + +async fn process_hono_message(m: &BorrowedMessage<'_>, influx_writer: Arc) { if let Some(headers) = m.headers() { let message_properties = get_headers_as_map(headers); match ( @@ -167,7 +218,7 @@ async fn process_message(m: &BorrowedMessage<'_>, influx_writer: Arc(PARAM_KAFKA_PROPERTIES_FILE).unwrap()) - .unwrap_or_else(|e| { - error!("failed to create Kafka client: {e}"); - process::exit(1); - }); + let hono_args = args.subcommand_matches(SUBCOMMAND_HONO).unwrap(); + let mut client_config = get_kafka_client_config( + hono_args + .get_one::(PARAM_KAFKA_PROPERTIES_FILE) + .unwrap(), + ) + .unwrap_or_else(|e| { + error!("failed to create Kafka client: {e}"); + process::exit(1); + }); // Create the `StreamConsumer`, to receive the messages from the topic in form of a `Stream`. let consumer: StreamConsumer = client_config @@ -192,7 +247,7 @@ async fn run_async_processor(args: &ArgMatches) { process::exit(1); }); - let topic_name = args.get_one::(PARAM_KAFKA_TOPIC_NAME).unwrap(); + let topic_name = hono_args.get_one::(PARAM_KAFKA_TOPIC_NAME).unwrap(); match consumer.fetch_metadata(Some(topic_name), Duration::from_secs(10)) { Err(e) => { @@ -230,7 +285,7 @@ async fn run_async_processor(args: &ArgMatches) { .try_for_each(|borrowed_message| { let cloned_writer = influx_writer.clone(); async move { - process_message(&borrowed_message, cloned_writer).await; + process_hono_message(&borrowed_message, cloned_writer).await; Ok(()) } }) @@ -243,6 +298,32 @@ async fn run_async_processor(args: &ArgMatches) { } } +async fn run_async_processor_zenoh(args: &ArgMatches) { + let influx_writer = InfluxWriter::new(args).map_or_else( + |e| { + error!("failed to create InfluxDB writer: {e}"); + process::exit(1); + }, + Arc::new, + ); + let zenoh_args = args.subcommand_matches(SUBCOMMAND_ZENOH).unwrap(); + let config = parse_zenoh_args(zenoh_args); + + info!("Opening session..."); + let session = zenoh::open(config).res().await.unwrap(); + + info!("Declaring Subscriber on '{}'...", &KEY_EXPR); + let subscriber = session.declare_subscriber(KEY_EXPR).res().await.unwrap(); + loop { + select!( + sample = subscriber.recv_async() => { + let sample = sample.unwrap(); + let cloned_writer = influx_writer.clone(); + process_zenoh_message(&sample.value.payload.contiguous(), cloned_writer).await; + } + ); + } +} #[tokio::main] pub async fn main() { env_logger::init(); @@ -251,14 +332,22 @@ pub async fn main() { .unwrap_or(option_env!("VERGEN_GIT_SHA").unwrap_or("unknown")); let mut parser = Command::new("FMS data consumer") + .arg_required_else_help(true) .version(version) - .about("Receives FMS related VSS data points via Hono's Kafka based Telemetry API and writes them to an InfluxDB server") - .arg( + .about("Receives FMS related VSS data points via Hono's Kafka based Telemetry API or Eclipse Zenoh instance and writes them to an InfluxDB server"); + + parser = influx_client::connection::add_command_line_args(parser); + + parser = parser + .subcommand_required(true) + .subcommand( + Command::new(SUBCOMMAND_HONO) + .about("Forwards VSS data to an Influx DB server from Hono's north bound Kafka API").arg( Arg::new(PARAM_KAFKA_PROPERTIES_FILE) .value_parser(clap::builder::NonEmptyStringValueParser::new()) .long(PARAM_KAFKA_PROPERTIES_FILE) .help("The path to a file containing Kafka client properties for connecting to the Kafka broker(s).") - .action(ArgAction::Set) + .action(ArgAction::Set) .value_name("PATH") .env("KAFKA_PROPERTIES_FILE") .required(true), @@ -272,10 +361,70 @@ pub async fn main() { .value_name("TOPIC") .required(true) .env("KAFKA_TOPIC_NAME"), + ), + ) + .subcommand( + Command::new(SUBCOMMAND_ZENOH) + .about("Forwards VSS data to an Influx DB server from Eclipse Zenoh") + .arg( + Arg::new("mode") + .value_parser(clap::value_parser!(WhatAmI)) + .long("mode") + .short('m') + .help("The Zenoh session mode (peer by default).") + .required(false), + ) + .arg( + Arg::new("connect") + .value_parser(clap::builder::NonEmptyStringValueParser::new()) + .long("connect") + .short('e') + .help("Endpoints to connect to.") + .required(false), + ) + .arg( + Arg::new("listen") + .value_parser(clap::builder::NonEmptyStringValueParser::new()) + .long("listen") + .short('l') + .help("Endpoints to listen on.") + .required(false), + ) + .arg( + Arg::new("no-multicast-scouting") + .long("no-multicast-scouting") + .help("Disable the multicast-based scouting mechanism.") + .action(clap::ArgAction::SetFalse) + .required(false), + ) + .arg( + Arg::new("config") + .value_parser(clap::builder::NonEmptyStringValueParser::new()) + .long("config") + .short('c') + .help("A configuration file.") + .required(false), + ), ); - parser = influx_client::connection::add_command_line_args(parser); let args = parser.get_matches(); - info!("starting FMS data consumer"); - run_async_processor(&args).await + + match args.subcommand_name() { + Some(SUBCOMMAND_HONO) => { + info!("starting FMS data consumer for Hono"); + run_async_processor_hono(&args).await + } + Some(SUBCOMMAND_ZENOH) => { + info!("starting FMS data consumer for Zenoh"); + run_async_processor_zenoh(&args).await + } + Some(_) => { + // cannot happen because subcommand is required + process::exit(1); + } + None => { + // cannot happen because subcommand is required + process::exit(1); + } + }; } diff --git a/components/fms-forwarder/Cargo.toml b/components/fms-forwarder/Cargo.toml index d0f0c1d..d714490 100644 --- a/components/fms-forwarder/Cargo.toml +++ b/components/fms-forwarder/Cargo.toml @@ -45,9 +45,8 @@ env_logger = { workspace = true } fms-proto = { workspace = true } influx-client = { workspace = true, features = ["writer"] } log = { workspace = true } -paho-mqtt = { version = "0.12", default-features = false, features = [ - "vendored-ssl", -] } +paho-mqtt = { version = "0.12", default-features = false, features = [ "vendored-ssl" ] } +zenoh = { workspace = true } protobuf = { workspace = true } prost = { workspace = true } prost-types = { workspace = true } diff --git a/components/fms-forwarder/README.md b/components/fms-forwarder/README.md index 04df059..cb515ba 100644 --- a/components/fms-forwarder/README.md +++ b/components/fms-forwarder/README.md @@ -62,3 +62,15 @@ Please refer to the command line help for details: ```sh fms-forwarder hono --help ``` + + +## Publishing to Zenoh + +The forwarder can publish status information to the Zenoh router of an [Eclipse Zenoh](https://projects.eclipse.org/projects/iot.zenoh/) instance. +For this to work, the forwarder needs to be configured with the Zenoh router end points. + +Please refer to the command line help for details: + +```sh +fms-forwarder zenoh --help +``` diff --git a/components/fms-forwarder/src/main.rs b/components/fms-forwarder/src/main.rs index d19d605..b734227 100644 --- a/components/fms-forwarder/src/main.rs +++ b/components/fms-forwarder/src/main.rs @@ -26,14 +26,17 @@ use influx_client::writer::InfluxWriter; use log::{error, info}; use status_publishing::StatusPublisher; use tokio::sync::mpsc; +use zenoh_publisher::ZenohPublisher; mod hono_publisher; mod mqtt_connection; mod status_publishing; mod vehicle_abstraction; +mod zenoh_publisher; const SUBCOMMAND_HONO: &str = "hono"; const SUBCOMMAND_INFLUX: &str = "influx"; +const SUBCOMMAND_ZENOH: &str = "zenoh"; #[tokio::main] async fn main() -> Result<(), Box> { @@ -53,6 +56,9 @@ async fn main() -> Result<(), Box> { )) .subcommand(influx_client::connection::add_command_line_args( Command::new(SUBCOMMAND_INFLUX).about("Forwards VSS data to an Influx DB server"), + )) + .subcommand(zenoh_publisher::add_command_line_args( + Command::new(SUBCOMMAND_ZENOH).about("Forwards VSS data to Zenoh"), )); let args = parser.get_matches(); @@ -78,6 +84,16 @@ async fn main() -> Result<(), Box> { } } } + Some(SUBCOMMAND_ZENOH) => { + let zenoh_args = args.subcommand_matches(SUBCOMMAND_ZENOH).unwrap(); + match ZenohPublisher::new(zenoh_args).await { + Ok(writer) => Box::new(writer), + Err(e) => { + error!("failed to create Zenoh Publisher: {e}"); + process::exit(1); + } + } + } Some(_) => { // cannot happen because subcommand is required process::exit(1); diff --git a/components/fms-forwarder/src/zenoh_publisher.rs b/components/fms-forwarder/src/zenoh_publisher.rs new file mode 100644 index 0000000..c978aea --- /dev/null +++ b/components/fms-forwarder/src/zenoh_publisher.rs @@ -0,0 +1,127 @@ +use crate::status_publishing::StatusPublisher; +use async_trait::async_trait; +use clap::{Arg, ArgMatches, Command}; +use fms_proto::fms::VehicleStatus; +use log::{debug, warn}; +use protobuf::Message; +use std::sync::Arc; +use zenoh::config::Config; +use zenoh::prelude::sync::*; +use zenoh::publication::Publisher; + +const KEY_EXPR: &str = "fms/vehicleStatus"; + +pub fn add_command_line_args(command: Command) -> Command { + command + .arg( + Arg::new("mode") + .value_parser(clap::value_parser!(WhatAmI)) + .long("mode") + .short('m') + .help("The Zenoh session mode (peer by default).") + .required(false), + ) + .arg( + Arg::new("connect") + .value_parser(clap::builder::NonEmptyStringValueParser::new()) + .long("connect") + .short('e') + .help("Endpoints to connect to.") + .required(false), + ) + .arg( + Arg::new("listen") + .value_parser(clap::builder::NonEmptyStringValueParser::new()) + .long("listen") + .short('l') + .help("Endpoints to listen on.") + .required(false), + ) + .arg( + Arg::new("no-multicast-scouting") + .long("no-multicast-scouting") + .help("Disable the multicast-based scouting mechanism.") + .action(clap::ArgAction::SetFalse) + .required(false), + ) + .arg( + Arg::new("config") + .value_parser(clap::builder::NonEmptyStringValueParser::new()) + .long("config") + .short('c') + .help("A configuration file.") + .required(false), + ) +} + +pub fn parse_args(args: &ArgMatches) -> Config { + let mut config: Config = if let Some(conf_file) = args.get_one::("config") { + Config::from_file(conf_file).unwrap() + } else { + Config::default() + }; + + if let Some(mode) = args.get_one::("mode") { + config.set_mode(Some(*mode)).unwrap(); + } + + if let Some(values) = args.get_many::("connect") { + config + .connect + .endpoints + .extend(values.map(|v| v.parse().unwrap())) + } + if let Some(values) = args.get_many::("listen") { + config + .listen + .endpoints + .extend(values.map(|v| v.parse().unwrap())) + } + if let Some(values) = args.get_one::("no-multicast-scouting") { + config + .scouting + .multicast + .set_enabled(Some(*values)) + .unwrap(); + } + + config +} + +pub struct ZenohPublisher<'a> { + // publisher + publisher: Publisher<'a>, +} + +impl<'a> ZenohPublisher<'a> { + pub async fn new(args: &ArgMatches) -> Result, Box> { + let config = parse_args(args); + let session = Arc::new(zenoh::open(config).res().unwrap()); + let publisher = session.declare_publisher(KEY_EXPR).res().unwrap(); + Ok(ZenohPublisher { + // publisher + publisher, + }) + } +} + +#[async_trait] +impl<'a> StatusPublisher for ZenohPublisher<'a> { + async fn publish_vehicle_status(&self, vehicle_status: &VehicleStatus) { + match vehicle_status.write_to_bytes() { + Ok(payload) => { + match self.publisher.put(payload).res() { + Ok(_t) => debug!("successfully published vehicle status to Zenoh",), + Err(e) => { + warn!("error publishing vehicle status to Zenoh: {}", e); + } + }; + return; + } + Err(e) => warn!( + "error serializing vehicle status to protobuf message: {}", + e + ), + } + } +} diff --git a/fms-blueprint-compose-hono.yaml b/fms-blueprint-compose-hono.yaml index 8a97976..35b103f 100644 --- a/fms-blueprint-compose-hono.yaml +++ b/fms-blueprint-compose-hono.yaml @@ -19,6 +19,7 @@ services: fms-consumer: + command: "hono" image: "ghcr.io/eclipse-sdv-blueprints/fleet-management/fms-consumer:main" build: context: "./components" diff --git a/fms-blueprint-compose-zenoh.yaml b/fms-blueprint-compose-zenoh.yaml new file mode 100644 index 0000000..980e038 --- /dev/null +++ b/fms-blueprint-compose-zenoh.yaml @@ -0,0 +1,52 @@ +services: + zenoh: + command: -c /zenoh-config.json5 + environment: + RUST_LOG: zenoh=info + image: eclipse/zenoh:0.10.1-rc + container_name: "fms-zenoh-router" + networks: + - "fms-backend" + - "fms-vehicle" + ports: + - 7447:7447/tcp + - 1883:1883/tcp + - 8000:8000/tcp + restart: unless-stopped + volumes: + - ./zenoh-config.json5:/zenoh-config.json5 + + fms-forwarder: + command: "zenoh -m client" + depends_on: + zenoh: + condition: service_started + + fms-consumer: + command: "zenoh -m client" + image: "ghcr.io/eclipse-sdv-blueprints/fleet-management/fms-consumer:main" + build: + context: "./components" + dockerfile: "Dockerfile.fms-consumer" + container_name: "fms-consumer" + cap_drop: + - CAP_MKNOD + - CAP_NET_RAW + - CAP_AUDIT_WRITE + networks: + - "fms-backend" + depends_on: + influxdb: + condition: service_healthy + zenoh: + condition: service_started + env_file: + - "./influxdb/fms-demo.env" + environment: + INFLUXDB_TOKEN_FILE: "/tmp/fms-demo.token" + RUST_LOG: "${FMS_CONSUMER_LOG_CONFIG:-info,fms_consumer=debug,influx_client=debug}" + volumes: + - type: "volume" + source: "influxdb-auth" + target: "/tmp" + read_only: true diff --git a/img/architecture-zenoh.drawio.svg b/img/architecture-zenoh.drawio.svg new file mode 100644 index 0000000..e9c0d21 --- /dev/null +++ b/img/architecture-zenoh.drawio.svg @@ -0,0 +1,4 @@ + + + +



Databroker
Databroker...
FMS
Forwarder
FMS...


<<get dashboard>>
<<get dashboard>>
Browser
Browser
FMS
Server
FMS...
Fleet
Management
Fleet...
Influx API 2.0
I...
rFMS
rFMS
Influx API 2.0
I...
kuksa.val.v1
kuksa.val.v1
VSS overlay
VSS overlay
In-Vehicle
In-Vehicle
Off-Vehicle
Off-Vehicle



CSV Provider
CSV Provider...
CSV recording from truck
CSV recording...
kuksa.val.v1  
kuksa.val.v1  
FMS
Consumer
FMS...
Zenoh
Ze...

Zenoh
Router
Zenoh...
Zenoh
or
MQTT
Zenoh...
Influx API 2.0
Influx API 2.0
Text is not SVG - cannot display
\ No newline at end of file diff --git a/zenoh-config.json5 b/zenoh-config.json5 new file mode 100644 index 0000000..b37d95e --- /dev/null +++ b/zenoh-config.json5 @@ -0,0 +1,89 @@ +{ + plugins: { + //// + //// MQTT related configuration + //// All settings are optional and are unset by default - uncomment the ones you want to set + //// + mqtt: { + //// + //// port: The address to bind the MQTT server. Default: "0.0.0.0:1883". Accepted values:' + //// - a port number ("0.0.0.0" will be used as IP to bind, meaning any interface of the host) + //// - a string with format `:` (to bind the MQTT server to a specific interface). + //// + // port: "0.0.0.0:1883", + + //// + //// scope: A string added as prefix to all routed MQTT topics when mapped to a zenoh resource. + //// This should be used to avoid conflicts when several distinct MQTT systems using + //// the same topics names are routed via zenoh. + //// + // scope: "home-1", + + //// + //// allow: A regular expression matching the MQTT topic name that must be routed via zenoh. By default topics are allowed. + //// If both '--allow' and '--deny' are set a topic will be allowed if it matches only the 'allow' expression. + //// + // allow: "zigbee2mqtt|home-1/room-2", + + //// + //// deny: A regular expression matching the MQTT topic name that must not be routed via zenoh. By default no topics are denied. + //// If both '--allow' and '--deny' are set a topic will be allowed if it matches only the 'allow' expression. + //// + // deny: "zigbee2mqtt|home-1/room-2", + + //// + //// generalise_subs: A list of key expression to use for generalising subscriptions. + //// + // generalise_subs: ["SUB1", "SUB2"], + + //// + //// generalise_subs: A list of key expression to use for generalising publications. + //// + // generalise_subs: ["PUB1", "PUB2"], + + }, + + + //// + //// REST API configuration (active only if this part is defined) + //// + // Optionally, add the REST plugin + rest: { http_port: 8000 }, + }, + + + //// + //// zenoh related configuration (see zenoh documentation for more details) + //// + + //// + //// id: The identifier (as hex-string) that zenoh-bridge-mqtt must use. If not set, a random UUIDv4 will be used. + //// WARNING: this id must be unique in your zenoh network. + // id: "A00001", + + //// + //// mode: The bridge's mode (peer or client) + //// + //mode: "client", + + //// + //// Which endpoints to connect to. E.g. tcp/localhost:7447. + //// By configuring the endpoints, it is possible to tell zenoh which router/peer to connect to at startup. + //// + connect: { + endpoints: [ + //"tcp/127.0.0.1:7447" + ] + }, + + //// + //// Which endpoints to listen on. E.g. tcp/localhost:7447. + //// By configuring the endpoints, it is possible to tell zenoh which are the endpoints that other routers, + //// peers, or client can use to establish a zenoh session. + //// + listen: { + endpoints: [ + //"tcp/127.0.0.1:7447" + ] + }, +}