Skip to content

Commit

Permalink
Fix clippy warnings
Browse files Browse the repository at this point in the history
  • Loading branch information
sophokles73 committed Sep 22, 2023
1 parent 86cb505 commit cb56b4a
Show file tree
Hide file tree
Showing 10 changed files with 44 additions and 49 deletions.
38 changes: 18 additions & 20 deletions components/fms-consumer/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,17 +44,16 @@ const PARAM_KAFKA_PROPERTIES_FILE: &str = "kafka-properties-file";
const PARAM_KAFKA_TOPIC_NAME: &str = "kafka-topic";

fn add_property_bag_to_map(property_bag: String, headers: &mut HashMap<String, String>) {
property_bag.split("&").for_each(|p| {
property_bag.split('&').for_each(|p| {
trace!("processing property: {p}");
p.split_once('=')
.map(|(k,v)| {
if headers.contains_key(k) {
trace!("skipping property [{k}] from property bag because header with same name exists");
} else {
trace!("adding propery [k: {k}, v: {v}] to header map");
headers.insert(k.to_string(), v.to_string());
}
});
if let Some((key, value)) = p.split_once('=') {
if headers.contains_key(key) {
trace!("skipping property [{key}] from property bag because header with same name exists");
} else {
trace!("adding propery [key: {key}, value: {value}] to header map");
headers.insert(key.to_string(), value.to_string());
}
}
});
}

Expand All @@ -65,13 +64,13 @@ fn get_headers_as_map(headers: &BorrowedHeaders) -> HashMap<String, String> {
header.key,
header
.value
.and_then(|v| String::from_utf8(v.to_vec()).map_or(None, Option::Some)),
.and_then(|v| String::from_utf8(v.to_vec()).ok()),
) {
(HEADER_NAME_ORIG_ADDRESS, Some(value)) => {
value.rsplit_once("/?").map(|(_topic, props)| {
if let Some((_topic, props)) = value.rsplit_once("/?") {
debug!("found property bag in {HEADER_NAME_ORIG_ADDRESS} header: {props}");
add_property_bag_to_map(props.to_string(), &mut result);
});
}
}
(_, Some(value)) => {
result.insert(header.key.to_string(), value);
Expand Down Expand Up @@ -101,7 +100,7 @@ fn get_kafka_client_config(filename: &String) -> Result<ClientConfig, Box<dyn st
let mut client_config = ClientConfig::new();
for line in lines {
match line {
Ok(property) => match property.split_once("=") {
Ok(property) => match property.split_once('=') {
Some((key, value)) => {
client_config.set(key, value);
}
Expand Down Expand Up @@ -141,9 +140,8 @@ async fn process_protobuf_message(
match message_properties.get("device_id") {
Some(device_id) => {
debug!("received message from vehicle {}", device_id);
match deserialize_vehicle_status(payload) {
Some(vehicle_status) => influx_writer.write_vehicle_status(&vehicle_status).await,
None => {}
if let Some(vehicle_status) = deserialize_vehicle_status(payload) {
influx_writer.write_vehicle_status(&vehicle_status).await;
}
}
None => debug!("discarding message from unknown device"),
Expand All @@ -170,7 +168,7 @@ async fn process_message(m: &BorrowedMessage<'_>, influx_writer: Arc<InfluxWrite
}

async fn run_async_processor(args: &ArgMatches) {
let influx_writer = InfluxWriter::new(&args).map_or_else(
let influx_writer = InfluxWriter::new(args).map_or_else(
|e| {
error!("failed to create InfluxDB writer: {e}");
process::exit(1);
Expand Down Expand Up @@ -203,11 +201,11 @@ async fn run_async_processor(args: &ArgMatches) {
}
Ok(metadata) => match metadata
.topics()
.into_iter()
.iter()
.find(|topic| topic.name() == topic_name)
{
Some(topic) => {
if topic.partitions().len() == 0 {
if topic.partitions().is_empty() {
error!("topic [{topic_name}] does not exist (yet)");
process::exit(1);
}
Expand Down
2 changes: 1 addition & 1 deletion components/fms-forwarder/src/hono_publisher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ impl HonoPublisher {
/// MQTT endpoint using a client certificate of username/password credentials.
pub async fn new(args: &ArgMatches) -> Result<Self, Box<dyn std::error::Error>> {

MqttConnection::new(&args).await
MqttConnection::new(args).await
.map(|con| {
HonoPublisher { mqtt_connection: con }
})
Expand Down
4 changes: 2 additions & 2 deletions components/fms-forwarder/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let publisher: Box<dyn StatusPublisher> = match args.subcommand_name() {
Some(SUBCOMMAND_HONO) => {
let hono_args = args.subcommand_matches(SUBCOMMAND_HONO).unwrap();
match HonoPublisher::new(&hono_args).await {
match HonoPublisher::new(hono_args).await {
Ok(writer) => Box::new(writer),
Err(e) => {
error!("failed to create Hono publisher: {}", e);
Expand All @@ -70,7 +70,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
},
Some(SUBCOMMAND_INFLUX) => {
let influx_args = args.subcommand_matches(SUBCOMMAND_INFLUX).unwrap();
match InfluxWriter::new(&influx_args) {
match InfluxWriter::new(influx_args) {
Ok(writer) => Box::new(writer),
Err(e) => {
error!("failed to create InfluxDB writer: {e}");
Expand Down
18 changes: 8 additions & 10 deletions components/fms-forwarder/src/mqtt_connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,26 +154,24 @@ impl MqttConnection {
.unwrap_or(&"".to_string())
.to_owned();
let mut ssl_options_builder = SslOptionsBuilder::new();
match args.get_one::<String>(PARAM_CA_PATH) {
Some(path) => match ssl_options_builder.ca_path(path) {
if let Some(path) = args.get_one::<String>(PARAM_CA_PATH) {
match ssl_options_builder.ca_path(path) {
Err(e) => {
error!("failed to set CA path on MQTT client: {e}");
return Err(Box::new(e));
}
Ok(_builder) => (),
},
None => (),
};
match args.get_one::<String>(PARAM_TRUST_STORE_PATH) {
Some(path) => match ssl_options_builder.trust_store(path) {
}
}
if let Some(path) = args.get_one::<String>(PARAM_TRUST_STORE_PATH) {
match ssl_options_builder.trust_store(path) {
Err(e) => {
error!("failed to set trust store path on MQTT client: {e}");
return Err(Box::new(e));
}
Ok(_builder) => (),
},
None => (),
};
}
}

let mut connect_options_builder = ConnectOptionsBuilder::new_v3();
connect_options_builder.connect_timeout(Duration::from_secs(10));
Expand Down
4 changes: 2 additions & 2 deletions components/fms-forwarder/src/vehicle_abstraction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -368,7 +368,7 @@ impl KuksaValDatabroker {

pub async fn get_vehicle_status(&mut self) -> Result<VehicleStatus, DatabrokerError> {
let entry_requests: Vec<EntryRequest> = SNAPSHOT_VSS_PATHS
.into_iter()
.iter()
.map(|path| EntryRequest {
path: path.to_string(),
view: View::CurrentValue as i32,
Expand Down Expand Up @@ -427,7 +427,7 @@ impl KuksaValDatabroker {
sender: Sender<FmsTrigger>,
) -> Result<(), DatabrokerError> {
let subscribe_entries: Vec<SubscribeEntry> = TRIGGER_VSS_PATHS
.into_iter()
.iter()
.map(|path| SubscribeEntry {
path: path.to_string(),
view: View::CurrentValue as i32,
Expand Down
2 changes: 1 addition & 1 deletion components/fms-forwarder/src/vehicle_abstraction/kuksa.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ impl TryFrom<Value> for Option<f64> {
Value::Uint64(v) => Ok(Some(v as f64)),
Value::Int32(v) => Ok(Some(v as f64)),
Value::Int64(v) => Ok(Some(v as f64)),
Value::Double(v) => Ok(Some(v as f64)),
Value::Double(v) => Ok(Some(v)),
Value::Float(v) => Ok(Some(v as f64)),
_ => Err(UnsupportedValueTypeError{}),
}
Expand Down
16 changes: 8 additions & 8 deletions components/fms-server/src/influx_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use influxrs::InfluxError;

use crate::models::{self, GnssPositionObject, TriggerObject, VehiclePositionObject};

const FILTER_FIELDS_POSITION: &'static str = formatcp!(
const FILTER_FIELDS_POSITION: &str = formatcp!(
r#"filter(fn: (r) => contains(set: ["{}","{}","{}","{}","{}","{}","{}","{}", "{}"], value: r._field))"#,
influx_client::FIELD_CREATED_DATE_TIME,
influx_client::FIELD_LATITUDE,
Expand All @@ -37,23 +37,23 @@ const FILTER_FIELDS_POSITION: &'static str = formatcp!(
influx_client::FIELD_TACHOGRAPH_SPEED,
influx_client::FIELD_WHEEL_BASED_SPEED,
);
const FILTER_MEASUREMENT_SNAPSHOT: &'static str = formatcp!(
const FILTER_MEASUREMENT_SNAPSHOT: &str = formatcp!(
r#"filter(fn: (r) => r._measurement == "{}")"#,
influx_client::MEASUREMENT_SNAPSHOT,
);
const FILTER_TAG_ANY_VIN: &'static str = formatcp!(r#"filter(fn: (r) => r["{}"] =~ /.*/)"#, influx_client::TAG_VIN);
const FILTER_TAG_ANY_TRIGGER: &'static str = formatcp!(r#"filter(fn: (r) => r["{}"] =~ /.*/)"#, influx_client::TAG_TRIGGER);
const FILTER_TAG_ANY_VIN: &str = formatcp!(r#"filter(fn: (r) => r["{}"] =~ /.*/)"#, influx_client::TAG_VIN);
const FILTER_TAG_ANY_TRIGGER: &str = formatcp!(r#"filter(fn: (r) => r["{}"] =~ /.*/)"#, influx_client::TAG_TRIGGER);

fn unpack_value_i32(value: Option<&String>) -> Option<i32> {
value.and_then(|v| v.parse().map_or(None, Option::Some))
value.and_then(|v| v.parse().ok())
}

fn unpack_value_f64(value: Option<&String>) -> Option<f64> {
value.and_then(|v| v.parse().map_or(None, Option::Some))
value.and_then(|v| v.parse().ok())
}

fn unpack_time(value: Option<&String>) -> Option<DateTime<Utc>> {
value.and_then(|v| v.parse().map_or(None, Option::Some))
value.and_then(|v| v.parse().ok())
}

pub struct InfluxReader {
Expand All @@ -63,7 +63,7 @@ pub struct InfluxReader {
impl InfluxReader {

pub fn new(args: &ArgMatches) -> Result<Self, Box<dyn std::error::Error>> {
InfluxConnection::new(&args).map(|con| InfluxReader { influx_con: con })
InfluxConnection::new(args).map(|con| InfluxReader { influx_con: con })
}

pub async fn get_vehicles(&self) -> Result<Vec<models::VehicleObject>, InfluxError> {
Expand Down
1 change: 0 additions & 1 deletion components/fms-server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ use axum::extract::{Query, State};
use clap::Command;
use log::{error, info};

use serde_json;
use serde_json::json;
use std::collections::HashMap;
use std::process;
Expand Down
2 changes: 1 addition & 1 deletion components/fms-server/src/models.rs
Original file line number Diff line number Diff line change
Expand Up @@ -450,7 +450,7 @@ pub struct VehicleObject {
impl VehicleObject {
pub fn new(vin : String) -> VehicleObject {
VehicleObject {
vin: vin,
vin,
customer_vehicle_name: None,
brand: None,
production_date: None,
Expand Down
6 changes: 3 additions & 3 deletions components/influx-client/src/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ impl InfluxWriter {
/// | field | parkingBrakeSwitch | Switch signal which indicates when the parking brake is set. |
///
pub async fn write_vehicle_status(&self, vehicle_status: &VehicleStatus) {
if vehicle_status.vin.len() == 0 {
if vehicle_status.vin.is_empty() {
debug!("ignoring vehicle status without VIN ...");
return;
}
Expand Down Expand Up @@ -275,7 +275,7 @@ impl InfluxWriter {
vehicle_status.vin.as_str(),
&trigger,
created_timestamp,
&vehicle_status,
vehicle_status,
) {
debug!("writing header measurement to influxdb");
measurements.push(measurement);
Expand All @@ -284,7 +284,7 @@ impl InfluxWriter {
vehicle_status.vin.as_str(),
&trigger,
created_timestamp,
&vehicle_status,
vehicle_status,
) {
debug!("writing snapshot measurement to influxdb");
measurements.push(measurement);
Expand Down

0 comments on commit cb56b4a

Please sign in to comment.