[#22] Add support for using Eclipse Zenoh as transport between vehicle and back-end

Extended FMS Forwarder to support publishing vehicle status information to the back-end using Eclipse Zenoh.
The FMS Consumer has also been extended to support consuming the data using Zenoh.
A Docker Compose profile has been added for starting up a Zenoh router and running the FMS Forwarder and Consumer using the router.

Co-authored-by: pgangula-src <[email protected]>
vivekpandey02 and pgangula-src authored Feb 15, 2024
1 parent 484fa05 commit 8bf2f16
Showing 13 changed files with 546 additions and 20 deletions.
16 changes: 16 additions & 0 deletions README.md
@@ -98,6 +98,22 @@ of an Eclipse Hono instance as shown in the diagram below.
The second compose file specified on the command line will also start the [FMS Consumer](./components/fms-consumer)
back end component which receives vehicle data via Hono's north bound Kafka based Telemetry API and writes it to the
Influx DB.

# Using Eclipse Zenoh to geographically distribute the vehicle data

The blueprint supports configuring the FMS Forwarder to send vehicle data to the router of an [Eclipse Zenoh](https://zenoh.io/) instance as shown in the diagram below.
The Zenoh router provides a plugin mechanism for bridging to other protocols, enabling vehicle-to-anything communication.

<img src="img/architecture-zenoh.drawio.svg">

Start up the vehicle and back end services using Docker Compose:

```sh
docker compose -f ./fms-blueprint-compose.yaml -f ./fms-blueprint-compose-zenoh.yaml up --detach
```

Once all services have been started, the current vehicle status can be viewed on a [Grafana dashboard](http://127.0.0.1:3000),
using *admin*/*admin* as username and password for logging in.
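
The services can be stopped again by passing the same set of compose files to `docker compose down`:

```sh
docker compose -f ./fms-blueprint-compose.yaml -f ./fms-blueprint-compose-zenoh.yaml down
```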

# Manual configuration

1 change: 1 addition & 0 deletions components/Cargo.toml
@@ -50,6 +50,7 @@ prost = { version = "0.12" }
prost-types = { version = "0.12" }
# tokio does not enable features by default
tokio = { version = "1.36" }
zenoh = { version = "0.10.1-rc" }

[profile.release]
lto = true # Link time optimization (dead code removal etc...)
4 changes: 3 additions & 1 deletion components/fms-consumer/Cargo.toml
@@ -29,15 +29,17 @@ readme.workspace = true
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
async-trait = { workspace = true }
clap = { workspace = true, features = ["std", "env", "color", "help", "usage", "error-context", "suggestions"] }
env_logger = { workspace = true }
fms-proto = { workspace = true }
futures = "0.3"
futures = { version = "0.3" }
influx-client = { workspace = true, features = ["writer"] }
rdkafka = { version = "0.36", default-features = false, features = ["libz", "tokio", "cmake-build", "ssl-vendored"] }
log = { workspace = true }
protobuf = { workspace = true }
tokio = { workspace = true, features = ["macros", "rt-multi-thread", "time"] }
zenoh = { workspace = true }

[build-dependencies]
# see https://github.com/fede1024/rust-rdkafka/issues/572#issuecomment-1529316876
58 changes: 58 additions & 0 deletions components/fms-consumer/README.md
@@ -0,0 +1,58 @@
<!--
SPDX-FileCopyrightText: 2023 Contributors to the Eclipse Foundation
See the NOTICE file(s) distributed with this work for additional
information regarding copyright ownership.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
SPDX-License-Identifier: Apache-2.0
-->
The FMS Consumer receives vehicle data either via Hono's north bound Kafka based Telemetry API or Zenoh and writes it to the Influx DB.


# Building

Building the consumer requires a [Rust development toolchain](https://rustup.rs/).

# Running

The FMS Consumer receives vehicle data either via Hono's north bound Kafka based Telemetry API or Zenoh and writes it to the Influx DB. The type of source can be selected by means of command line arguments when starting the consumer.

Please refer to the command line help for details:

```sh
fms-consumer --help
```

## Receive data from Hono's north bound Kafka based Telemetry API

The consumer can receive data from Hono's north bound Kafka based Telemetry API.

Please refer to the command line help for details:

```sh
fms-consumer hono --help
```
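
As an illustration, a minimal invocation might look like the following; the properties file path and topic name are placeholders, and the InfluxDB connection options added by the influx-client crate are omitted here. Both options can also be provided via the `KAFKA_PROPERTIES_FILE` and `KAFKA_TOPIC_NAME` environment variables.

```sh
# placeholder values; adjust to your Kafka broker and Hono installation
fms-consumer hono \
  --kafka-properties-file ./kafka.properties \
  --kafka-topic hono.telemetry.fms
```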


## Receive data from Zenoh

The consumer can receive data from the router of an [Eclipse Zenoh](https://projects.eclipse.org/projects/iot.zenoh/) instance.
For this to work, the consumer needs to be configured with the Zenoh router's endpoints.

Please refer to the command line help for details:

```sh
fms-consumer zenoh --help
```
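
As an illustration, the consumer might connect to a local router as sketched below; the endpoint is a placeholder using Zenoh's `tcp/<host>:<port>` locator syntax, and the InfluxDB connection options are again omitted. Once connected, the consumer subscribes to the key expression `fms/vehicleStatus`.

```sh
# placeholder endpoint; the consumer subscribes to the key expression 'fms/vehicleStatus'
fms-consumer zenoh --connect tcp/127.0.0.1:7447
```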
181 changes: 165 additions & 16 deletions components/fms-consumer/src/main.rs
@@ -36,13 +36,56 @@ use rdkafka::consumer::Consumer;
use rdkafka::message::{BorrowedHeaders, BorrowedMessage, Headers};
use rdkafka::{ClientConfig, Message};

use futures::select;
use zenoh::config::Config;
use zenoh::prelude::r#async::*;

const CONTENT_TYPE_PROTOBUF: &str = "application/vnd.google.protobuf";

const HEADER_NAME_ORIG_ADDRESS: &str = "orig_address";

const PARAM_KAFKA_PROPERTIES_FILE: &str = "kafka-properties-file";
const PARAM_KAFKA_TOPIC_NAME: &str = "kafka-topic";

const SUBCOMMAND_HONO: &str = "hono";
const SUBCOMMAND_ZENOH: &str = "zenoh";

const KEY_EXPR: &str = "fms/vehicleStatus";

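/// Creates a Zenoh session configuration from the "zenoh" subcommand's arguments,
/// starting from the given configuration file (or the default configuration) and
/// applying the mode, connect/listen endpoints and multicast scouting settings on top.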
fn parse_zenoh_args(args: &ArgMatches) -> Config {
let mut config: Config = if let Some(conf_file) = args.get_one::<String>("config") {
Config::from_file(conf_file).unwrap()
} else {
Config::default()
};

if let Some(mode) = args.get_one::<WhatAmI>("mode") {
config.set_mode(Some(*mode)).unwrap();
}

if let Some(values) = args.get_many::<String>("connect") {
config
.connect
.endpoints
.extend(values.map(|v| v.parse().unwrap()))
}
if let Some(values) = args.get_many::<String>("listen") {
config
.listen
.endpoints
.extend(values.map(|v| v.parse().unwrap()))
}
    if let Some(value) = args.get_one::<bool>("no-multicast-scouting") {
        config
            .scouting
            .multicast
            .set_enabled(Some(*value))
            .unwrap();
    }

config
}

fn add_property_bag_to_map(property_bag: String, headers: &mut HashMap<String, String>) {
property_bag.split('&').for_each(|p| {
trace!("processing property: {p}");
@@ -148,7 +191,15 @@ async fn process_protobuf_message(
}
}

async fn process_zenoh_message(payload: &[u8], influx_writer: Arc<InfluxWriter>) {
    if let Some(vehicle_status) = deserialize_vehicle_status(payload) {
        influx_writer.write_vehicle_status(&vehicle_status).await;
    } else {
        debug!("failed to deserialize vehicle status from payload, ignoring message");
    }
}

async fn process_hono_message(m: &BorrowedMessage<'_>, influx_writer: Arc<InfluxWriter>) {
if let Some(headers) = m.headers() {
let message_properties = get_headers_as_map(headers);
match (
@@ -167,7 +218,7 @@ async fn process_message(m: &BorrowedMessage<'_>, influx_writer: Arc<InfluxWrite
}
}

async fn run_async_processor_hono(args: &ArgMatches) {
let influx_writer = InfluxWriter::new(args).map_or_else(
|e| {
error!("failed to create InfluxDB writer: {e}");
@@ -176,12 +227,16 @@ async fn run_async_processor(args: &ArgMatches) {
Arc::new,
);

    let hono_args = args.subcommand_matches(SUBCOMMAND_HONO).unwrap();
    let mut client_config = get_kafka_client_config(
        hono_args
            .get_one::<String>(PARAM_KAFKA_PROPERTIES_FILE)
            .unwrap(),
    )
    .unwrap_or_else(|e| {
        error!("failed to create Kafka client: {e}");
        process::exit(1);
    });

// Create the `StreamConsumer`, to receive the messages from the topic in form of a `Stream`.
let consumer: StreamConsumer = client_config
@@ -192,7 +247,7 @@ async fn run_async_processor(args: &ArgMatches) {
process::exit(1);
});

let topic_name = args.get_one::<String>(PARAM_KAFKA_TOPIC_NAME).unwrap();
let topic_name = hono_args.get_one::<String>(PARAM_KAFKA_TOPIC_NAME).unwrap();

match consumer.fetch_metadata(Some(topic_name), Duration::from_secs(10)) {
Err(e) => {
@@ -230,7 +285,7 @@ async fn run_async_processor(args: &ArgMatches) {
.try_for_each(|borrowed_message| {
let cloned_writer = influx_writer.clone();
async move {
                process_hono_message(&borrowed_message, cloned_writer).await;
Ok(())
}
})
@@ -243,6 +298,32 @@ async fn run_async_processor(args: &ArgMatches) {
}
}

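/// Creates an InfluxDB writer, opens a Zenoh session based on the "zenoh"
/// subcommand's arguments and then writes all vehicle status messages received
/// from the Zenoh network to the InfluxDB server.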
async fn run_async_processor_zenoh(args: &ArgMatches) {
let influx_writer = InfluxWriter::new(args).map_or_else(
|e| {
error!("failed to create InfluxDB writer: {e}");
process::exit(1);
},
Arc::new,
);
let zenoh_args = args.subcommand_matches(SUBCOMMAND_ZENOH).unwrap();
let config = parse_zenoh_args(zenoh_args);

info!("Opening session...");
let session = zenoh::open(config).res().await.unwrap();

info!("Declaring Subscriber on '{}'...", &KEY_EXPR);
let subscriber = session.declare_subscriber(KEY_EXPR).res().await.unwrap();
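    // process samples received for the key expression until the process gets terminated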
loop {
select!(
sample = subscriber.recv_async() => {
let sample = sample.unwrap();
let cloned_writer = influx_writer.clone();
process_zenoh_message(&sample.value.payload.contiguous(), cloned_writer).await;
}
);
}
}

#[tokio::main]
pub async fn main() {
env_logger::init();
@@ -251,14 +332,22 @@ pub async fn main() {
.unwrap_or(option_env!("VERGEN_GIT_SHA").unwrap_or("unknown"));

let mut parser = Command::new("FMS data consumer")
.arg_required_else_help(true)
.version(version)
.about("Receives FMS related VSS data points via Hono's Kafka based Telemetry API and writes them to an InfluxDB server")
.arg(
.about("Receives FMS related VSS data points via Hono's Kafka based Telemetry API or Eclipse Zenoh instance and writes them to an InfluxDB server");

parser = influx_client::connection::add_command_line_args(parser);

parser = parser
.subcommand_required(true)
.subcommand(
Command::new(SUBCOMMAND_HONO)
.about("Forwards VSS data to an Influx DB server from Hono's north bound Kafka API").arg(
Arg::new(PARAM_KAFKA_PROPERTIES_FILE)
.value_parser(clap::builder::NonEmptyStringValueParser::new())
.long(PARAM_KAFKA_PROPERTIES_FILE)
.help("The path to a file containing Kafka client properties for connecting to the Kafka broker(s).")
                    .action(ArgAction::Set)
.value_name("PATH")
.env("KAFKA_PROPERTIES_FILE")
.required(true),
@@ -272,10 +361,70 @@ pub async fn main() {
.value_name("TOPIC")
.required(true)
.env("KAFKA_TOPIC_NAME"),
),
)
.subcommand(
Command::new(SUBCOMMAND_ZENOH)
.about("Forwards VSS data to an Influx DB server from Eclipse Zenoh")
.arg(
Arg::new("mode")
.value_parser(clap::value_parser!(WhatAmI))
.long("mode")
.short('m')
.help("The Zenoh session mode (peer by default).")
.required(false),
)
.arg(
Arg::new("connect")
.value_parser(clap::builder::NonEmptyStringValueParser::new())
.long("connect")
.short('e')
.help("Endpoints to connect to.")
.required(false),
)
.arg(
Arg::new("listen")
.value_parser(clap::builder::NonEmptyStringValueParser::new())
.long("listen")
.short('l')
.help("Endpoints to listen on.")
.required(false),
)
.arg(
Arg::new("no-multicast-scouting")
.long("no-multicast-scouting")
.help("Disable the multicast-based scouting mechanism.")
.action(clap::ArgAction::SetFalse)
.required(false),
)
.arg(
Arg::new("config")
.value_parser(clap::builder::NonEmptyStringValueParser::new())
.long("config")
.short('c')
.help("A configuration file.")
.required(false),
),
);

    let args = parser.get_matches();

match args.subcommand_name() {
Some(SUBCOMMAND_HONO) => {
info!("starting FMS data consumer for Hono");
run_async_processor_hono(&args).await
}
Some(SUBCOMMAND_ZENOH) => {
info!("starting FMS data consumer for Zenoh");
run_async_processor_zenoh(&args).await
}
        _ => {
            // cannot happen because a subcommand is required
            process::exit(1);
        }
};
}
5 changes: 2 additions & 3 deletions components/fms-forwarder/Cargo.toml
@@ -45,9 +45,8 @@ env_logger = { workspace = true }
fms-proto = { workspace = true }
influx-client = { workspace = true, features = ["writer"] }
log = { workspace = true }
paho-mqtt = { version = "0.12", default-features = false, features = [ "vendored-ssl" ] }
zenoh = { workspace = true }
protobuf = { workspace = true }
prost = { workspace = true }
prost-types = { workspace = true }
12 changes: 12 additions & 0 deletions components/fms-forwarder/README.md
@@ -62,3 +62,15 @@ Please refer to the command line help for details:
```sh
fms-forwarder hono --help
```


## Publishing to Zenoh

The forwarder can publish status information to the router of an [Eclipse Zenoh](https://projects.eclipse.org/projects/iot.zenoh/) instance.
For this to work, the forwarder needs to be configured with the Zenoh router's endpoints.

Please refer to the command line help for details:

```sh
fms-forwarder zenoh --help
```
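
As an illustration, and assuming that the forwarder supports the same Zenoh related command line options as the FMS Consumer, it could be connected to a local router like this (the endpoint is a placeholder):

```sh
# hypothetical invocation; the endpoint depends on your Zenoh router setup
fms-forwarder zenoh --connect tcp/127.0.0.1:7447
```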
