Skip to content

Commit

Permalink
Merge pull request #3086 from Bravo555/improve/config-manager-simplif…
Browse files Browse the repository at this point in the history
…y-control-flow

refactor: Simplify config manager control flow
  • Loading branch information
Bravo555 authored Aug 29, 2024
2 parents c875a89 + a4f7a49 commit b2c753c
Show file tree
Hide file tree
Showing 4 changed files with 138 additions and 149 deletions.
1 change: 1 addition & 0 deletions crates/extensions/tedge_config_manager/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ tedge_uploader_ext = { workspace = true }
tedge_utils = { workspace = true }
tempfile = { workspace = true }
thiserror = { workspace = true }
tokio = { workspace = true }
toml = { workspace = true }
uzers = { workspace = true }

Expand Down
210 changes: 72 additions & 138 deletions crates/extensions/tedge_config_manager/src/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,18 @@ use anyhow::Context;
use async_trait::async_trait;
use camino::Utf8Path;
use camino::Utf8PathBuf;
use log::debug;
use log::error;
use log::info;
use serde_json::json;
use std::collections::HashMap;
use std::io::ErrorKind;
use std::os::unix::fs::fchown;
use std::os::unix::fs::MetadataExt;
use std::os::unix::fs::PermissionsExt;
use std::sync::Arc;
use tedge_actors::fan_in_message_type;
use tedge_actors::Actor;
use tedge_actors::ChannelError;
use tedge_actors::DynSender;
use tedge_actors::ClientMessageBox;
use tedge_actors::LoggingReceiver;
use tedge_actors::LoggingSender;
use tedge_actors::MessageReceiver;
Expand Down Expand Up @@ -53,16 +52,15 @@ pub type ConfigDownloadResult = (MqttTopic, DownloadResult);
pub type ConfigUploadRequest = (MqttTopic, UploadRequest);
pub type ConfigUploadResult = (MqttTopic, UploadResult);

fan_in_message_type!(ConfigInput[ConfigOperation, FsWatchEvent, ConfigDownloadResult, ConfigUploadResult] : Debug);
fan_in_message_type!(ConfigInput[ConfigOperation, FsWatchEvent] : Debug);

pub struct ConfigManagerActor {
config: ConfigManagerConfig,
plugin_config: PluginConfig,
pending_operations: HashMap<String, ConfigOperation>,
input_receiver: LoggingReceiver<ConfigInput>,
output_sender: LoggingSender<ConfigOperationData>,
download_sender: DynSender<ConfigDownloadRequest>,
upload_sender: DynSender<ConfigUploadRequest>,
downloader: ClientMessageBox<ConfigDownloadRequest, ConfigDownloadResult>,
uploader: ClientMessageBox<ConfigUploadRequest, ConfigUploadResult>,
}

#[async_trait]
Expand All @@ -72,20 +70,24 @@ impl Actor for ConfigManagerActor {
}

async fn run(mut self) -> Result<(), RuntimeError> {
self.reload_supported_config_types().await?;
let mut worker = ConfigManagerWorker {
config: Arc::from(self.config),
plugin_config: self.plugin_config,
output_sender: self.output_sender,
downloader: self.downloader,
uploader: self.uploader,
};

worker.reload_supported_config_types().await?;

while let Some(event) = self.input_receiver.recv().await {
let result = match event {
ConfigInput::ConfigOperation(request) => {
self.process_operation_request(request).await
}
ConfigInput::FsWatchEvent(event) => self.process_file_watch_events(event).await,
ConfigInput::ConfigDownloadResult((topic, result)) => {
Ok(self.process_downloaded_config(&topic, result).await?)
}
ConfigInput::ConfigUploadResult((topic, result)) => {
self.process_uploaded_config(&topic, result).await
let mut worker = worker.clone();
tokio::spawn(async move { worker.process_operation_request(request).await });
Ok(())
}
ConfigInput::FsWatchEvent(event) => worker.process_file_watch_events(event).await,
};

if let Err(err) = result {
Expand All @@ -103,20 +105,30 @@ impl ConfigManagerActor {
plugin_config: PluginConfig,
input_receiver: LoggingReceiver<ConfigInput>,
output_sender: LoggingSender<ConfigOperationData>,
download_sender: DynSender<ConfigDownloadRequest>,
upload_sender: DynSender<ConfigUploadRequest>,
downloader: ClientMessageBox<ConfigDownloadRequest, ConfigDownloadResult>,
uploader: ClientMessageBox<ConfigUploadRequest, ConfigUploadResult>,
) -> Self {
ConfigManagerActor {
config,
plugin_config,
pending_operations: HashMap::new(),
input_receiver,
output_sender,
download_sender,
upload_sender,
downloader,
uploader,
}
}
}

#[derive(Clone)]
struct ConfigManagerWorker {
config: Arc<ConfigManagerConfig>,
plugin_config: PluginConfig,
output_sender: LoggingSender<ConfigOperationData>,
downloader: ClientMessageBox<ConfigDownloadRequest, ConfigDownloadResult>,
uploader: ClientMessageBox<ConfigUploadRequest, ConfigUploadResult>,
}

impl ConfigManagerWorker {
async fn process_operation_request(
&mut self,
request: ConfigOperation,
Expand Down Expand Up @@ -192,18 +204,18 @@ impl ConfigManagerActor {
.execute_config_snapshot_request(&topic, &mut request)
.await
{
Ok(_) => {
self.pending_operations.insert(
topic.name.clone(),
ConfigOperation::Snapshot(topic, request),
Ok(file_path) => {
request.successful(file_path.as_str());
info!(
"Config Snapshot request processed for config type: {}.",
request.config_type
);
self.publish_command_status(ConfigOperation::Snapshot(topic, request))
.await?;
}
Err(error) => {
let error_message = format!(
"Failed to initiate configuration snapshot upload to file-transfer service: {error}",
);
request.failed(&error_message);
error!("{}", error_message);
request.failed(error.to_string());
error!("config-manager failed to process config snapshot: {error}");
self.publish_command_status(ConfigOperation::Snapshot(topic, request))
.await?;
}
Expand All @@ -215,7 +227,7 @@ impl ConfigManagerActor {
&mut self,
topic: &Topic,
request: &mut ConfigSnapshotCmdPayload,
) -> Result<(), ConfigManagementError> {
) -> Result<Utf8PathBuf, ConfigManagementError> {
let file_entry = self
.plugin_config
.get_file_entry_from_type(&request.config_type)?;
Expand All @@ -238,11 +250,15 @@ impl ConfigManagerActor {
request.config_type, tedge_url
);

self.upload_sender
.send((topic.name.clone(), upload_request))
let (_, upload_result) = self
.uploader
.await_response((topic.name.clone(), upload_request))
.await?;

Ok(())
let upload_response =
upload_result.context("config-manager failed uploading configuration snapshot")?;

Ok(upload_response.file_path)
}

fn create_tedge_url_for_config_operation(
Expand Down Expand Up @@ -280,57 +296,24 @@ impl ConfigManagerActor {
))
}

async fn process_uploaded_config(
&mut self,
topic: &str,
result: UploadResult,
) -> Result<(), ChannelError> {
if let Some(ConfigOperation::Snapshot(topic, mut request)) =
self.pending_operations.remove(topic)
{
match result {
Ok(response) => {
request.successful(response.file_path.as_str());
info!(
"Config Snapshot request processed for config type: {}.",
request.config_type
);
self.publish_command_status(ConfigOperation::Snapshot(topic, request))
.await?;
}
Err(err) => {
let error_message = format!(
"config-manager failed uploading configuration snapshot: {}",
err
);
request.failed(&error_message);
error!("{}", error_message);
self.publish_command_status(ConfigOperation::Snapshot(topic, request))
.await?;
}
}
}

Ok(())
}

async fn handle_config_update_request(
&mut self,
topic: Topic,
mut request: ConfigUpdateCmdPayload,
) -> Result<(), ChannelError> {
match self.execute_config_update_request(&topic, &request).await {
Ok(_) => {
self.pending_operations
.insert(topic.name.clone(), ConfigOperation::Update(topic, request));
Ok(deployed_to_path) => {
request.successful(deployed_to_path);
info!(
"Config Update request processed for config type: {}.",
request.config_type
);
self.publish_command_status(ConfigOperation::Update(topic, request))
.await?;
}
Err(error) => {
let error_message = format!(
"config-manager failed to start downloading configuration: {}",
error
);
request.failed(&error_message);
error!("{}", error_message);
request.failed(error.to_string());
error!("config-manager failed to process config update: {error}");
self.publish_command_status(ConfigOperation::Update(topic, request))
.await?;
}
Expand All @@ -342,7 +325,7 @@ impl ConfigManagerActor {
&mut self,
topic: &Topic,
request: &ConfigUpdateCmdPayload,
) -> Result<(), ConfigManagementError> {
) -> Result<Utf8PathBuf, ConfigManagementError> {
let file_entry = self
.plugin_config
.get_file_entry_from_type(&request.config_type)?;
Expand All @@ -352,8 +335,7 @@ impl ConfigManagerActor {
let temp_path = &self.config.tmp_path.join(&file_entry.config_type);

let Some(tedge_url) = &request.tedge_url else {
debug!("tedge_url not present in config update payload, ignoring");
return Ok(());
return Err(anyhow::anyhow!("tedge_url not present in config update payload").into());
};

let download_request = DownloadRequest::new(tedge_url, temp_path.as_std_path())
Expand All @@ -364,71 +346,23 @@ impl ConfigManagerActor {
request.config_type, tedge_url
);

self.download_sender
.send((topic.name.clone(), download_request))
let (_, download_result) = self
.downloader
.await_response((topic.name.clone(), download_request))
.await?;

Ok(())
}

async fn process_downloaded_config(
&mut self,
topic: &str,
result: DownloadResult,
) -> Result<(), ConfigManagementError> {
let Some(ConfigOperation::Update(topic, mut request)) =
self.pending_operations.remove(topic)
else {
return Ok(());
};

let response = match result {
Ok(response) => response,
Err(err) => {
let err =
anyhow::Error::from(err).context("config-manager failed downloading a file");
let error_message = format!("{err:#}");
request.failed(&error_message);
error!("{}", error_message);
self.publish_command_status(ConfigOperation::Update(topic, request))
.await?;
return Ok(());
}
};

// new config was downloaded into tmpdir, we need to write it into destination using tedge-write
let from = Utf8Path::from_path(response.file_path.as_path()).unwrap();

let deployed_to_path = match self.deploy_config_file(from, &request.config_type) {
Ok(path) => path,
Err(err) => {
let error_message =
format!("config-manager failed writing updated configuration file: {err}",);

request.failed(&error_message);
error!("{}", error_message);
self.publish_command_status(ConfigOperation::Update(topic, request))
.await?;

// TODO: source temporary file should be cleaned up automatically
let _ = std::fs::remove_file(from);
let download_response =
download_result.context("config-manager failed downloading a file")?;

return Ok(());
}
};

request.successful(deployed_to_path);
info!(
"Config Update request processed for config type: {}.",
request.config_type
);
self.publish_command_status(ConfigOperation::Update(topic, request))
.await?;
let from = Utf8Path::from_path(download_response.file_path.as_path()).unwrap();
let deployed_to_path = self
.deploy_config_file(from, &request.config_type)
.context("failed to deploy configuration file")?;

// TODO: source temporary file should be cleaned up automatically
let _ = std::fs::remove_file(from);

Ok(())
Ok(deployed_to_path)
}

/// Deploys the new version of the configuration file and returns the path under which it was
Expand Down
19 changes: 9 additions & 10 deletions crates/extensions/tedge_config_manager/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use log::error;
use serde_json::json;
use std::path::PathBuf;
use tedge_actors::Builder;
use tedge_actors::CloneSender;
use tedge_actors::ClientMessageBox;
use tedge_actors::DynSender;
use tedge_actors::LinkError;
use tedge_actors::MappingSender;
Expand Down Expand Up @@ -48,8 +48,8 @@ pub struct ConfigManagerBuilder {
config: ConfigManagerConfig,
plugin_config: PluginConfig,
box_builder: SimpleMessageBoxBuilder<ConfigInput, ConfigOperationData>,
download_sender: DynSender<ConfigDownloadRequest>,
upload_sender: DynSender<ConfigUploadRequest>,
downloader: ClientMessageBox<ConfigDownloadRequest, ConfigDownloadResult>,
uploader: ClientMessageBox<ConfigUploadRequest, ConfigUploadResult>,
}

impl ConfigManagerBuilder {
Expand All @@ -64,10 +64,9 @@ impl ConfigManagerBuilder {
let plugin_config = PluginConfig::new(config.plugin_config_path.as_path());
let box_builder = SimpleMessageBoxBuilder::new("Tedge-Config-Manager", 16);

let download_sender =
downloader_actor.connect_client(box_builder.get_sender().sender_clone());
let downloader = ClientMessageBox::new(downloader_actor);

let upload_sender = uploader_actor.connect_client(box_builder.get_sender().sender_clone());
let uploader = ClientMessageBox::new(uploader_actor);

fs_notify.connect_sink(
ConfigManagerBuilder::watched_directory(&config),
Expand All @@ -78,8 +77,8 @@ impl ConfigManagerBuilder {
config,
plugin_config,
box_builder,
download_sender,
upload_sender,
downloader,
uploader,
})
}

Expand Down Expand Up @@ -189,8 +188,8 @@ impl Builder<ConfigManagerActor> for ConfigManagerBuilder {
self.plugin_config,
input_receiver,
output_sender,
self.download_sender,
self.upload_sender,
self.downloader,
self.uploader,
))
}
}
Expand Down
Loading

0 comments on commit b2c753c

Please sign in to comment.