Skip to content

Commit

Permalink
Merge pull request #2849 from Bravo555/improv/c8y-converter-remove-du…
Browse files Browse the repository at this point in the history
…plicate-paths

refactor: group and remove duplicates of paths in CumulocityConverter
  • Loading branch information
didier-wenzek authored May 3, 2024
2 parents 7d7e12f + 7332307 commit d2c31c1
Show file tree
Hide file tree
Showing 10 changed files with 76 additions and 69 deletions.
3 changes: 1 addition & 2 deletions crates/core/tedge_mapper/src/aws/mapper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ use async_trait::async_trait;
use aws_mapper_ext::converter::AwsConverter;
use clock::WallClock;
use mqtt_channel::TopicFilter;
use std::path::Path;
use tedge_actors::ConvertingActor;
use tedge_actors::MessageSink;
use tedge_actors::MessageSource;
Expand All @@ -26,7 +25,7 @@ impl TEdgeComponent for AwsMapper {
async fn start(
&self,
tedge_config: TEdgeConfig,
_config_dir: &Path,
_config_dir: &tedge_config::Path,
) -> Result<(), anyhow::Error> {
let (mut runtime, mut mqtt_actor) =
start_basic_actors(self.session_name(), &tedge_config).await?;
Expand Down
3 changes: 1 addition & 2 deletions crates/core/tedge_mapper/src/az/mapper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ use async_trait::async_trait;
use az_mapper_ext::converter::AzureConverter;
use clock::WallClock;
use mqtt_channel::TopicFilter;
use std::path::Path;
use tedge_actors::ConvertingActor;
use tedge_actors::MessageSink;
use tedge_actors::MessageSource;
Expand All @@ -26,7 +25,7 @@ impl TEdgeComponent for AzureMapper {
async fn start(
&self,
tedge_config: TEdgeConfig,
_config_dir: &Path,
_config_dir: &tedge_config::Path,
) -> Result<(), anyhow::Error> {
let (mut runtime, mut mqtt_actor) =
start_basic_actors(self.session_name(), &tedge_config).await?;
Expand Down
7 changes: 5 additions & 2 deletions crates/core/tedge_mapper/src/c8y/mapper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ use c8y_mapper_ext::compatibility_adapter::OldAgentAdapter;
use c8y_mapper_ext::config::C8yMapperConfig;
use c8y_mapper_ext::converter::CumulocityConverter;
use mqtt_channel::Config;
use std::path::Path;
use tedge_api::entity_store::EntityExternalId;
use tedge_api::mqtt_topics::EntityTopicId;
use tedge_config::TEdgeConfig;
Expand All @@ -32,7 +31,11 @@ impl TEdgeComponent for CumulocityMapper {
CUMULOCITY_MAPPER_NAME
}

async fn start(&self, tedge_config: TEdgeConfig, cfg_dir: &Path) -> Result<(), anyhow::Error> {
async fn start(
&self,
tedge_config: TEdgeConfig,
cfg_dir: &tedge_config::Path,
) -> Result<(), anyhow::Error> {
let (mut runtime, mut mqtt_actor) =
start_basic_actors(self.session_name(), &tedge_config).await?;

Expand Down
3 changes: 1 addition & 2 deletions crates/core/tedge_mapper/src/collectd/mapper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ use collectd_ext::actor::CollectdActorBuilder;
use mqtt_channel::QoS;
use mqtt_channel::Topic;
use mqtt_channel::TopicFilter;
use std::path::Path;
use tedge_actors::MessageSink;
use tedge_actors::NoConfig;
use tedge_config::TEdgeConfig;
Expand Down Expand Up @@ -36,7 +35,7 @@ impl TEdgeComponent for CollectdMapper {
async fn start(
&self,
tedge_config: TEdgeConfig,
_config_dir: &Path,
_config_dir: &tedge_config::Path,
) -> Result<(), anyhow::Error> {
let (mut runtime, mut mqtt_actor) =
start_basic_actors(self.session_name(), &tedge_config).await?;
Expand Down
8 changes: 6 additions & 2 deletions crates/core/tedge_mapper/src/core/component.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
use async_trait::async_trait;
use std::path::Path;
use tedge_config::TEdgeConfig;

#[async_trait]
pub trait TEdgeComponent: Sync + Send {
fn session_name(&self) -> &str;
async fn start(&self, tedge_config: TEdgeConfig, cfg_dir: &Path) -> Result<(), anyhow::Error>;

async fn start(
&self,
tedge_config: TEdgeConfig,
cfg_dir: &tedge_config::Path,
) -> Result<(), anyhow::Error>;
}
17 changes: 12 additions & 5 deletions crates/extensions/c8y_mapper_ext/src/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,8 +157,12 @@ impl C8yMapperActor {
| FsWatchEvent::Modified(path) => {
// Process inotify events only for the main device at the root operations directory
// directly under /etc/tedge/operations/c8y
if path.parent() == Some(&self.converter.ops_dir) {
match process_inotify_events(&self.converter.ops_dir, &path, file_event) {
if path.parent() == Some(self.converter.config.ops_dir.as_std_path()) {
match process_inotify_events(
self.converter.config.ops_dir.as_std_path(),
&path,
file_event,
) {
Ok(Some(discovered_ops)) => {
self.mqtt_publisher
.send(
Expand Down Expand Up @@ -316,7 +320,10 @@ impl C8yMapperBuilder {
let timer_sender = timer.connect_client(box_builder.get_sender().sender_clone());
let upload_sender = uploader.connect_client(box_builder.get_sender().sender_clone());
let download_sender = downloader.connect_client(box_builder.get_sender().sender_clone());
fs_watcher.connect_sink(config.ops_dir.clone(), &box_builder.get_sender());
fs_watcher.connect_sink(
config.ops_dir.as_std_path().to_path_buf(),
&box_builder.get_sender(),
);
let auth_proxy = ProxyUrlGenerator::new(
config.auth_proxy_addr.clone(),
config.auth_proxy_port,
Expand Down Expand Up @@ -346,11 +353,11 @@ impl C8yMapperBuilder {

fn init(config: &C8yMapperConfig) -> Result<(), FileError> {
// Create c8y operations directory
create_directory_with_defaults(config.ops_dir.clone())?;
create_directory_with_defaults(config.ops_dir.as_std_path())?;
// Create directory for device custom fragments
create_directory_with_defaults(config.config_dir.join("device"))?;
// Create directory for persistent entity store
create_directory_with_defaults(&config.state_dir)?;
create_directory_with_defaults(config.state_dir.as_std_path())?;
Ok(())
}
}
Expand Down
47 changes: 25 additions & 22 deletions crates/extensions/c8y_mapper_ext/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,7 @@ use c8y_api::smartrest::operations::Operations;
use c8y_api::smartrest::topic::C8yTopic;
use c8y_auth_proxy::url::Protocol;
use camino::Utf8Path;
use camino::Utf8PathBuf;
use std::path::Path;
use std::path::PathBuf;
use std::str::FromStr;
use std::sync::Arc;
use tedge_api::mqtt_topics::ChannelFilter::Command;
Expand All @@ -33,16 +31,10 @@ const C8Y_CLOUD: &str = "c8y";
const SUPPORTED_OPERATIONS_DIRECTORY: &str = "operations";

pub struct C8yMapperConfig {
pub config_dir: PathBuf,
pub logs_path: Utf8PathBuf,
pub data_dir: DataDir,
pub device_id: String,
pub device_topic_id: EntityTopicId,
pub device_type: String,
pub service: TEdgeConfigReaderService,
pub ops_dir: PathBuf,
pub state_dir: PathBuf,
pub tmp_dir: Arc<Utf8Path>,
pub c8y_host: String,
pub tedge_http_host: Arc<str>,
pub topics: TopicFilter,
Expand All @@ -57,15 +49,23 @@ pub struct C8yMapperConfig {
pub bridge_in_mapper: bool,
pub software_management_api: SoftwareManagementApiFlag,
pub software_management_with_types: bool,

pub data_dir: DataDir,
pub config_dir: Arc<Utf8Path>,
pub logs_path: Arc<Utf8Path>,
pub ops_dir: Arc<Utf8Path>,
pub state_dir: Arc<Utf8Path>,
pub tmp_dir: Arc<Utf8Path>,
}

impl C8yMapperConfig {
#[allow(clippy::too_many_arguments)]
pub fn new(
config_dir: PathBuf,
logs_path: Utf8PathBuf,
config_dir: Arc<Utf8Path>,
logs_path: Arc<Utf8Path>,
data_dir: DataDir,
tmp_dir: Arc<Utf8Path>,

device_id: String,
device_topic_id: EntityTopicId,
device_type: String,
Expand All @@ -87,20 +87,16 @@ impl C8yMapperConfig {
) -> Self {
let ops_dir = config_dir
.join(SUPPORTED_OPERATIONS_DIRECTORY)
.join(C8Y_CLOUD);
let state_dir = config_dir.join(STATE_DIR_NAME);
.join(C8Y_CLOUD)
.into();
let state_dir = config_dir.join(STATE_DIR_NAME).into();

Self {
config_dir,
logs_path,
data_dir,
device_id,
device_topic_id,
device_type,
service,
ops_dir,
state_dir,
tmp_dir,
c8y_host,
tedge_http_host,
topics,
Expand All @@ -115,6 +111,12 @@ impl C8yMapperConfig {
bridge_in_mapper,
software_management_api,
software_management_with_types,

config_dir,
logs_path,
ops_dir,
state_dir,
tmp_dir,
}
}

Expand All @@ -128,14 +130,14 @@ impl C8yMapperConfig {
}

pub fn from_tedge_config(
config_dir: impl AsRef<Path>,
config_dir: impl AsRef<Utf8Path>,
tedge_config: &TEdgeConfig,
) -> Result<C8yMapperConfig, C8yMapperConfigBuildError> {
let config_dir: PathBuf = config_dir.as_ref().into();
let config_dir: Arc<Utf8Path> = config_dir.as_ref().into();

let logs_path = tedge_config.logs.path.clone();
let logs_path = tedge_config.logs.path.as_path().into();
let data_dir: DataDir = tedge_config.data.path.clone().into();
let tmp_dir = tedge_config.tmp.path.clone().into();
let tmp_dir = tedge_config.tmp.path.as_path().into();

let device_id = tedge_config.device.id.try_read(tedge_config)?.to_string();
let device_type = tedge_config.device.ty.clone();
Expand Down Expand Up @@ -164,7 +166,8 @@ impl C8yMapperConfig {
};
let c8y_prefix = tedge_config.c8y.bridge.topic_prefix.clone();

let mut topics = Self::default_internal_topic_filter(&config_dir, &c8y_prefix)?;
let mut topics =
Self::default_internal_topic_filter(config_dir.as_std_path(), &c8y_prefix)?;
let enable_auto_register = tedge_config.c8y.entity_store.auto_register;
let clean_start = tedge_config.c8y.entity_store.clean_start;

Expand Down
44 changes: 17 additions & 27 deletions crates/extensions/c8y_mapper_ext/src/converter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ use c8y_api::smartrest::topic::C8yTopic;
use c8y_auth_proxy::url::ProxyUrlGenerator;
use c8y_http_proxy::handle::C8YHttpProxy;
use c8y_http_proxy::messages::CreateEvent;
use camino::Utf8Path;
use logged_command::LoggedCommand;
use plugin_sm::operation_logs::OperationLogs;
use plugin_sm::operation_logs::OperationLogsError;
Expand Down Expand Up @@ -181,7 +180,7 @@ pub struct UploadOperationData {

pub struct CumulocityConverter {
pub(crate) size_threshold: SizeThreshold,
pub config: C8yMapperConfig,
pub config: Arc<C8yMapperConfig>,
pub(crate) mapper_config: MapperConfig,
pub device_name: String,
pub(crate) device_topic_id: EntityTopicId,
Expand All @@ -191,9 +190,6 @@ pub struct CumulocityConverter {
operation_logs: OperationLogs,
mqtt_publisher: LoggingSender<MqttMessage>,
pub http_proxy: C8YHttpProxy,
pub cfg_dir: PathBuf,
pub ops_dir: PathBuf,
pub tmp_dir: Arc<Utf8Path>,
pub children: HashMap<String, Operations>,
pub service_type: String,
pub c8y_endpoint: C8yEndPoint,
Expand Down Expand Up @@ -233,16 +229,11 @@ impl CumulocityConverter {
};

let c8y_host = config.c8y_host.clone();
let cfg_dir = config.config_dir.clone();

let size_threshold = SizeThreshold(MQTT_MESSAGE_SIZE_THRESHOLD);

let ops_dir = config.ops_dir.clone();

let tmp_dir = config.tmp_dir.clone();

let operations = Operations::try_new(ops_dir.clone())?;
let children = get_child_ops(ops_dir.clone())?;
let operations = Operations::try_new(&*config.ops_dir)?;
let children = get_child_ops(&*config.ops_dir)?;

let alarm_converter = AlarmConverter::new();

Expand All @@ -267,7 +258,7 @@ impl CumulocityConverter {
Self::map_to_c8y_external_id,
Self::validate_external_id,
EARLY_MESSAGE_BUFFER_SIZE,
config.state_dir.clone(),
&*config.state_dir,
config.clean_start,
)
.unwrap();
Expand All @@ -276,7 +267,7 @@ impl CumulocityConverter {

Ok(CumulocityConverter {
size_threshold,
config,
config: Arc::new(config),
mapper_config,
device_name: device_id,
device_topic_id,
Expand All @@ -285,9 +276,6 @@ impl CumulocityConverter {
operations,
operation_logs,
http_proxy,
cfg_dir,
ops_dir,
tmp_dir,
children,
mqtt_publisher,
service_type,
Expand Down Expand Up @@ -1245,8 +1233,10 @@ impl CumulocityConverter {
fn try_init_messages(&mut self) -> Result<Vec<MqttMessage>, ConversionError> {
let mut messages = self.parse_base_inventory_file()?;

let supported_operations_message =
self.create_supported_operations(&self.ops_dir, &self.config.c8y_prefix)?;
let supported_operations_message = self.create_supported_operations(
self.config.ops_dir.as_std_path(),
&self.config.c8y_prefix,
)?;

let device_data_message = self.inventory_device_type_update_message()?;

Expand Down Expand Up @@ -1350,10 +1340,10 @@ impl CumulocityConverter {
) -> Result<Vec<MqttMessage>, ConversionError> {
let device = self.entity_store.try_get(target)?;
let ops_dir = match device.r#type {
EntityType::MainDevice => self.ops_dir.clone(),
EntityType::MainDevice => self.config.ops_dir.clone(),
EntityType::ChildDevice => {
let child_dir_name = device.external_id.as_ref();
self.ops_dir.clone().join(child_dir_name)
self.config.ops_dir.join(child_dir_name).into()
}
EntityType::Service => {
let target = &device.topic_id;
Expand All @@ -1362,14 +1352,14 @@ impl CumulocityConverter {
}
};
let ops_file = ops_dir.join(c8y_operation_name);
create_directory_with_defaults(&ops_dir)?;
create_directory_with_defaults(&*ops_dir)?;
create_file_with_defaults(ops_file, None)?;

let need_cloud_update = self.update_operations(&ops_dir)?;
let need_cloud_update = self.update_operations(ops_dir.as_std_path())?;

if need_cloud_update {
let device_operations =
self.create_supported_operations(&ops_dir, &self.config.c8y_prefix)?;
self.create_supported_operations(ops_dir.as_std_path(), &self.config.c8y_prefix)?;
return Ok(vec![device_operations]);
}

Expand Down Expand Up @@ -3255,10 +3245,10 @@ pub(crate) mod tests {
.unwrap();

C8yMapperConfig::new(
tmp_dir.to_path_buf(),
tmp_dir.utf8_path_buf(),
tmp_dir.utf8_path_buf().into(),
tmp_dir.utf8_path().into(),
tmp_dir.utf8_path().into(),
tmp_dir.utf8_path_buf().into(),
tmp_dir.utf8_path().into(),
device_id,
device_topic_id,
device_type,
Expand Down
7 changes: 5 additions & 2 deletions crates/extensions/c8y_mapper_ext/src/inventory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,11 @@ impl CumulocityConverter {
&mut self,
) -> Result<Vec<MqttMessage>, ConversionError> {
let mut messages = vec![];
let inventory_file_path = self.cfg_dir.join(INVENTORY_FRAGMENTS_FILE_LOCATION);
let mut inventory_base = Self::get_inventory_fragments(&inventory_file_path)?;
let inventory_file_path = self
.config
.config_dir
.join(INVENTORY_FRAGMENTS_FILE_LOCATION);
let mut inventory_base = Self::get_inventory_fragments(inventory_file_path.as_std_path())?;

if let Some(map) = inventory_base.as_object_mut() {
if map.remove("name").is_some() {
Expand Down
Loading

0 comments on commit d2c31c1

Please sign in to comment.