From 8acc1050bb8f7c8a9e10a4ceffac19ba1ac6f4ea Mon Sep 17 00:00:00 2001 From: Albin Suresh Date: Wed, 22 May 2024 10:57:44 +0000 Subject: [PATCH 1/3] fix(#2835): Include software plugin output in workflow log --- Cargo.lock | 4 + crates/common/logged_command/Cargo.toml | 3 + crates/common/logged_command/src/lib.rs | 1 + .../logged_command/src/logged_command.rs | 252 +++++++++++++++--- crates/core/plugin_sm/Cargo.toml | 1 + crates/core/plugin_sm/src/log_file.rs | 12 +- crates/core/plugin_sm/src/operation_logs.rs | 16 +- crates/core/plugin_sm/src/plugin.rs | 138 +++++----- crates/core/plugin_sm/src/plugin_manager.rs | 31 ++- .../tedge_agent/src/software_manager/actor.rs | 55 ++-- .../src/tedge_operation_converter/actor.rs | 149 +---------- .../src/tedge_operation_converter/tests.rs | 20 +- crates/core/tedge_api/src/commands.rs | 22 +- .../c8y_mapper_ext/src/converter.rs | 15 +- .../software_management/software.robot | 7 + 15 files changed, 408 insertions(+), 318 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 2dbadba73a7..b7298ff1df8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1859,10 +1859,13 @@ name = "logged_command" version = "1.1.1" dependencies = [ "anyhow", + "camino", "log", "nix", "shell-words", + "tedge_api", "tedge_test_utils", + "time", "tokio", ] @@ -2493,6 +2496,7 @@ version = "1.1.1" dependencies = [ "anyhow", "async-trait", + "camino", "csv", "download", "logged_command", diff --git a/crates/common/logged_command/Cargo.toml b/crates/common/logged_command/Cargo.toml index ba18814191d..1ef72d0bc34 100644 --- a/crates/common/logged_command/Cargo.toml +++ b/crates/common/logged_command/Cargo.toml @@ -9,9 +9,12 @@ homepage = { workspace = true } repository = { workspace = true } [dependencies] +camino = { workspace = true } log = { workspace = true } nix = { workspace = true } shell-words = { workspace = true } +tedge_api = { workspace = true } +time = { workspace = true, features = ["formatting"] } tokio = { workspace = true, features = [ "fs", "io-util", diff --git a/crates/common/logged_command/src/lib.rs b/crates/common/logged_command/src/lib.rs index f1a0813f33d..824cc2f416d 100644 --- a/crates/common/logged_command/src/lib.rs +++ b/crates/common/logged_command/src/lib.rs @@ -1,4 +1,5 @@ mod logged_command; +pub use crate::logged_command::CommandLog; pub use crate::logged_command::LoggedCommand; pub use crate::logged_command::LoggingChild; diff --git a/crates/common/logged_command/src/logged_command.rs b/crates/common/logged_command/src/logged_command.rs index e76ec94ec44..60c023dd6f8 100644 --- a/crates/common/logged_command/src/logged_command.rs +++ b/crates/common/logged_command/src/logged_command.rs @@ -1,3 +1,5 @@ +use camino::Utf8Path; +use camino::Utf8PathBuf; use log::error; use nix::unistd::Pid; use std::ffi::OsStr; @@ -5,6 +7,12 @@ use std::os::unix::process::ExitStatusExt; use std::process::Output; use std::process::Stdio; use std::time::Duration; +use tedge_api::workflow::CommandId; +use tedge_api::workflow::GenericCommandState; +use tedge_api::workflow::OperationAction; +use tedge_api::workflow::OperationName; +use time::format_description; +use time::OffsetDateTime; use tokio::fs::File; use tokio::io::AsyncWrite; use tokio::io::AsyncWriteExt; @@ -27,7 +35,7 @@ pub struct LoggingChild { impl LoggingChild { pub async fn wait_for_output_with_timeout( self, - logger: &mut BufWriter, + command_log: &mut CommandLog, graceful_timeout: Duration, forceful_timeout: Duration, ) -> Result { @@ -36,7 +44,7 @@ impl LoggingChild { let mut status = CmdStatus::Successful; tokio::select! { outcome = self.inner_child.wait_with_output() => { - Self::update_and_log_outcome(cmd_line, outcome, logger, graceful_timeout, &status).await + Self::update_and_log_outcome(cmd_line, outcome, command_log, graceful_timeout, &status).await } _ = Self::timeout_operation(&mut status, cid, graceful_timeout, forceful_timeout) => { Err(std::io::Error::new(std::io::ErrorKind::Other,"failed to kill the process: {cmd_line}")) @@ -46,20 +54,21 @@ impl LoggingChild { pub async fn wait_with_output( self, - logger: &mut BufWriter, + command_log: Option<&mut CommandLog>, ) -> Result { let outcome = self.inner_child.wait_with_output().await; - if let Err(err) = LoggedCommand::log_outcome(&self.command_line, &outcome, logger).await { - error!("Fail to log the command execution: {}", err); + if let Some(command_log) = command_log { + command_log + .log_command_and_output(&self.command_line, &outcome) + .await; } - outcome } async fn update_and_log_outcome( command_line: String, outcome: Result, - logger: &mut BufWriter, + command_log: &mut CommandLog, timeout: Duration, status: &CmdStatus, ) -> Result { @@ -69,9 +78,9 @@ impl LoggingChild { outcome.map(|outcome| update_stderr_message(outcome, timeout))? } }; - if let Err(err) = LoggedCommand::log_outcome(&command_line, &outcome, logger).await { - error!("Fail to log the command execution: {}", err); - } + command_log + .log_command_and_output(&command_line, &outcome) + .await; outcome } @@ -195,13 +204,16 @@ impl LoggedCommand { /// /// If the function fails to log the execution of the command, /// this is logged with `log::error!` without changing the return value. - pub async fn execute(mut self, logger: &mut BufWriter) -> Result { + pub async fn execute( + mut self, + command_log: Option<&mut CommandLog>, + ) -> Result { let outcome = self.command.output().await; - - if let Err(err) = LoggedCommand::log_outcome(&self.to_string(), &outcome, logger).await { - error!("Fail to log the command execution: {}", err); + if let Some(command_log) = command_log { + command_log + .log_command_and_output(&self.to_string(), &outcome) + .await; } - outcome } @@ -295,28 +307,200 @@ impl From for LoggedCommand { } } +/// Log all command steps +pub struct CommandLog { + /// Path to the command log file + pub path: Utf8PathBuf, + + /// operation name + pub operation: OperationName, + + /// the chain of operations leading to this command + pub invoking_operations: Vec, + + /// command id + pub cmd_id: CommandId, + + /// The log file of the root command invoking this command + /// + /// None, if not open yet. + pub file: Option, +} + +impl CommandLog { + pub fn new( + log_dir: Utf8PathBuf, + operation: OperationName, + cmd_id: CommandId, + invoking_operations: Vec, + root_operation: Option, + root_cmd_id: Option, + ) -> Self { + let root_operation = root_operation.unwrap_or(operation.clone()); + let root_cmd_id = root_cmd_id.unwrap_or(cmd_id.clone()); + + let path = log_dir.join(format!("workflow-{}-{}.log", root_operation, root_cmd_id)); + CommandLog { + path, + operation, + invoking_operations, + cmd_id, + file: None, + } + } + + pub fn from_log_path( + path: impl AsRef, + operation: OperationName, + cmd_id: CommandId, + ) -> Self { + Self { + path: path.as_ref().into(), + operation, + cmd_id, + invoking_operations: vec![], + file: None, + } + } + + pub async fn open(&mut self) -> Result<&mut File, std::io::Error> { + if self.file.is_none() { + self.file = Some( + File::options() + .append(true) + .create(true) + .open(self.path.clone()) + .await?, + ); + } + Ok(self.file.as_mut().unwrap()) + } + + pub async fn log_header(&mut self, topic: &str) { + let now = OffsetDateTime::now_utc() + .format(&format_description::well_known::Rfc3339) + .unwrap(); + let cmd_id = &self.cmd_id; + let operation = &self.operation; + let header_message = format!( + r#" +================================================================== +Triggered {operation} workflow +================================================================== + +topic: {topic} +operation: {operation} +cmd_id: {cmd_id} +time: {now} + +================================================================== +"# + ); + if let Err(err) = self.write(&header_message).await { + error!("Fail to log to {}: {err}", self.path) + } + } + + pub async fn log_state_action( + &mut self, + state: &GenericCommandState, + action: &OperationAction, + ) { + if state.is_init() && self.invoking_operations.is_empty() { + self.log_header(state.topic.name.as_str()).await; + } + let step = &state.status; + let state = &state.payload.to_string(); + let message = format!( + r#" +State: {state} + +Action: {action} +"# + ); + self.log_step(step, &message).await + } + + pub async fn log_step(&mut self, step: &str, action: &str) { + let now = OffsetDateTime::now_utc() + .format(&format_description::well_known::Rfc3339) + .unwrap(); + let operation = &self.operation; + let parent_operation = if self.invoking_operations.is_empty() { + operation.to_string() + } else { + format!("{} > {}", self.invoking_operations.join(" > "), operation) + }; + + let message = format!( + r#" +----------------------[ {parent_operation} @ {step} | time={now} ]---------------------- +{action} +"# + ); + if let Err(err) = self.write(&message).await { + error!("Fail to log to {}: {err}", self.path) + } + } + + pub async fn log_script_output(&mut self, result: &Result) { + self.log_command_and_output("", result).await + } + + pub async fn log_command_and_output( + &mut self, + command_line: &str, + result: &Result, + ) { + if let Err(err) = self.write_script_output(command_line, result).await { + error!("Fail to log to {}: {err}", self.path) + } + } + + pub async fn write_script_output( + &mut self, + command_line: &str, + result: &Result, + ) -> Result<(), std::io::Error> { + let file = self.open().await?; + let mut writer = BufWriter::new(file); + LoggedCommand::log_outcome(command_line, result, &mut writer).await?; + Ok(()) + } + + pub async fn write(&mut self, message: impl AsRef<[u8]>) -> Result<(), std::io::Error> { + let file = self.open().await?; + file.write_all(message.as_ref()).await?; + file.flush().await?; + file.sync_all().await?; + Ok(()) + } +} + #[cfg(test)] mod tests { use super::*; use tedge_test_utils::fs::TempTedgeDir; - use tokio::fs::File; #[tokio::test] async fn on_execute_are_logged_command_line_exit_status_stdout_and_stderr( ) -> Result<(), anyhow::Error> { // Prepare a log file let tmp_dir = TempTedgeDir::new(); - let tmp_file = tmp_dir.file("operation.log"); - let log_file_path = tmp_file.path(); - let log_file = File::create(&log_file_path).await?; - let mut logger = BufWriter::new(log_file); + let workflow_log = tmp_dir.file("workflow.log"); + let log_file_path = workflow_log.path(); + let mut command_log = CommandLog::from_log_path( + workflow_log.utf8_path(), + "software_update".into(), + "123".into(), + ); // Prepare a command let mut command = LoggedCommand::new("echo").unwrap(); command.arg("Hello").arg("World!"); // Execute the command with logging - let _ = command.execute(&mut logger).await; + let _ = command.execute(Some(&mut command_log)).await; let log_content = String::from_utf8(std::fs::read(log_file_path)?)?; assert_eq!( @@ -338,17 +522,20 @@ EOF async fn on_execute_with_error_stderr_is_logged() -> Result<(), anyhow::Error> { // Prepare a log file let tmp_dir = TempTedgeDir::new(); - let tmp_file = tmp_dir.file("operation.log"); - let log_file_path = tmp_file.path(); - let log_file = File::create(&log_file_path).await?; - let mut logger = BufWriter::new(log_file); + let workflow_log = tmp_dir.file("workflow.log"); + let log_file_path = workflow_log.path(); + let mut command_log = CommandLog::from_log_path( + workflow_log.utf8_path(), + "software_update".into(), + "123".into(), + ); // Prepare a command that triggers some content on stderr let mut command = LoggedCommand::new("ls").unwrap(); command.arg("dummy-file"); // Execute the command with logging - let _ = command.execute(&mut logger).await; + let _ = command.execute(Some(&mut command_log)).await; // On expect the errors to be logged let log_content = String::from_utf8(std::fs::read(log_file_path)?)?; @@ -385,16 +572,19 @@ stdout (EMPTY) async fn on_execution_error_are_logged_command_line_and_error() -> Result<(), anyhow::Error> { // Prepare a log file let tmp_dir = TempTedgeDir::new(); - let tmp_file = tmp_dir.file("operation.log"); - let log_file_path = tmp_file.path(); - let log_file = File::create(&log_file_path).await?; - let mut logger = BufWriter::new(log_file); + let workflow_log = tmp_dir.file("workflow.log"); + let log_file_path = workflow_log.path(); + let mut command_log = CommandLog::from_log_path( + workflow_log.utf8_path(), + "software_update".into(), + "123".into(), + ); // Prepare a command that cannot be executed let command = LoggedCommand::new("dummy-command").unwrap(); // Execute the command with logging - let _ = command.execute(&mut logger).await; + let _ = command.execute(Some(&mut command_log)).await; // The fact that the command cannot be executed must be logged let log_content = String::from_utf8(std::fs::read(log_file_path)?)?; diff --git a/crates/core/plugin_sm/Cargo.toml b/crates/core/plugin_sm/Cargo.toml index 782aca504fd..16ce982e923 100644 --- a/crates/core/plugin_sm/Cargo.toml +++ b/crates/core/plugin_sm/Cargo.toml @@ -11,6 +11,7 @@ repository = { workspace = true } [dependencies] anyhow = { workspace = true } async-trait = { workspace = true } +camino = { workspace = true } csv = { workspace = true } download = { workspace = true } logged_command = { workspace = true } diff --git a/crates/core/plugin_sm/src/log_file.rs b/crates/core/plugin_sm/src/log_file.rs index 9f459e9a6ff..76624defa6b 100644 --- a/crates/core/plugin_sm/src/log_file.rs +++ b/crates/core/plugin_sm/src/log_file.rs @@ -1,22 +1,22 @@ -use std::path::Path; -use std::path::PathBuf; +use camino::Utf8Path; +use camino::Utf8PathBuf; use tokio::fs::File; use tokio::io::BufWriter; pub struct LogFile { - path: PathBuf, + path: Utf8PathBuf, buffer: BufWriter, } impl LogFile { - pub async fn try_new(path: PathBuf) -> Result { - let file = File::create(path.clone()).await?; + pub async fn try_new(path: Utf8PathBuf) -> Result { + let file = File::create(&path).await?; let buffer = BufWriter::new(file); Ok(LogFile { path, buffer }) } - pub fn path(&self) -> &Path { + pub fn path(&self) -> &Utf8Path { &self.path } diff --git a/crates/core/plugin_sm/src/operation_logs.rs b/crates/core/plugin_sm/src/operation_logs.rs index fd677ff010e..0f5ad92383d 100644 --- a/crates/core/plugin_sm/src/operation_logs.rs +++ b/crates/core/plugin_sm/src/operation_logs.rs @@ -1,8 +1,8 @@ +use camino::Utf8PathBuf; use std::cmp::Reverse; use std::collections::BinaryHeap; use std::collections::HashMap; use std::path::Path; -use std::path::PathBuf; use time::format_description; use time::OffsetDateTime; use tracing::log; @@ -23,7 +23,7 @@ pub enum OperationLogsError { #[derive(Debug)] pub struct OperationLogs { - pub log_dir: PathBuf, + pub log_dir: Utf8PathBuf, } pub enum LogKind { @@ -36,7 +36,7 @@ const UPDATE_PREFIX: &str = "software-update"; const LIST_PREFIX: &str = "software-list"; impl OperationLogs { - pub fn try_new(log_dir: PathBuf) -> Result { + pub fn try_new(log_dir: Utf8PathBuf) -> Result { std::fs::create_dir_all(log_dir.clone())?; let operation_logs = OperationLogs { log_dir }; @@ -111,13 +111,13 @@ impl OperationLogs { fn remove_old_logs( log_tracker: &mut BinaryHeap>, - dir_path: &Path, + dir_path: impl AsRef, n: usize, ) -> Result<(), OperationLogsError> { while log_tracker.len() > n { if let Some(rname) = log_tracker.pop() { let name = rname.0; - let path = dir_path.join(name.clone()); + let path = dir_path.as_ref().join(name.clone()); if let Err(err) = std::fs::remove_file(path) { log::warn!("Fail to remove out-dated log file {} : {}", name, err); } @@ -131,6 +131,7 @@ mod tests { use super::*; use std::fs::File; use std::path::Path; + use std::path::PathBuf; use tempfile::TempDir; #[tokio::test] @@ -151,7 +152,7 @@ mod tests { let unrelated_2 = create_file(log_dir.path(), "bar"); // Open the log dir - let _operation_logs = OperationLogs::try_new(log_dir.path().to_path_buf())?; + let _operation_logs = OperationLogs::try_new(log_dir.into_path().try_into().unwrap())?; // Outdated logs are removed assert!(!update_log_1.exists()); @@ -179,7 +180,8 @@ mod tests { async fn on_new_log_keep_the_latest_logs_plus_the_new_one() -> Result<(), anyhow::Error> { // Create a log dir let log_dir = TempDir::new()?; - let operation_logs = OperationLogs::try_new(log_dir.path().to_path_buf())?; + let operation_logs = + OperationLogs::try_new(log_dir.path().to_path_buf().try_into().unwrap())?; // Add a bunch of fake log files let swlist_log_1 = create_file(log_dir.path(), "software-list-1996-02-22T16:39:57z"); diff --git a/crates/core/plugin_sm/src/plugin.rs b/crates/core/plugin_sm/src/plugin.rs index 54cd7c5cfc1..b30344a1852 100644 --- a/crates/core/plugin_sm/src/plugin.rs +++ b/crates/core/plugin_sm/src/plugin.rs @@ -1,6 +1,7 @@ use async_trait::async_trait; use csv::ReaderBuilder; use download::Downloader; +use logged_command::CommandLog; use logged_command::LoggedCommand; use regex::Regex; use reqwest::Identity; @@ -11,50 +12,48 @@ use std::path::PathBuf; use std::process::Output; use tedge_api::*; use tedge_config::SudoCommandBuilder; -use tokio::fs::File; use tokio::io::AsyncWriteExt; -use tokio::io::BufWriter; use tracing::error; #[async_trait] pub trait Plugin { - async fn prepare(&self, logger: &mut BufWriter) -> Result<(), SoftwareError>; + async fn prepare(&self, command_log: Option<&mut CommandLog>) -> Result<(), SoftwareError>; async fn install( &self, module: &SoftwareModule, - logger: &mut BufWriter, + command_log: Option<&mut CommandLog>, ) -> Result<(), SoftwareError>; async fn remove( &self, module: &SoftwareModule, - logger: &mut BufWriter, + command_log: Option<&mut CommandLog>, ) -> Result<(), SoftwareError>; async fn update_list( &self, modules: &[SoftwareModuleUpdate], - logger: &mut BufWriter, + command_log: Option<&mut CommandLog>, ) -> Result<(), SoftwareError>; - async fn finalize(&self, logger: &mut BufWriter) -> Result<(), SoftwareError>; + async fn finalize(&self, command_log: Option<&mut CommandLog>) -> Result<(), SoftwareError>; async fn list( &self, - logger: &mut BufWriter, + command_log: Option<&mut CommandLog>, ) -> Result, SoftwareError>; async fn version( &self, module: &SoftwareModule, - logger: &mut BufWriter, + command_log: Option<&mut CommandLog>, ) -> Result, SoftwareError>; async fn apply( &self, update: &SoftwareModuleUpdate, - logger: &mut BufWriter, + command_log: Option<&mut CommandLog>, download_path: &Path, ) -> Result<(), SoftwareError> { match update.clone() { @@ -65,18 +64,18 @@ pub trait Plugin { self.install_from_url( &mut module, &url, - logger, + command_log, download_path, self.identity(), ) .await? } - None => self.install(&module, logger).await?, + None => self.install(&module, command_log).await?, } Ok(()) } - SoftwareModuleUpdate::Remove { module } => self.remove(&module, logger).await, + SoftwareModuleUpdate::Remove { module } => self.remove(&module, command_log).await, } } @@ -85,13 +84,13 @@ pub trait Plugin { async fn apply_all( &self, mut updates: Vec, - logger: &mut BufWriter, + mut command_log: Option<&mut CommandLog>, download_path: &Path, ) -> Vec { let mut failed_updates = Vec::new(); // Prepare the updates - if let Err(prepare_error) = self.prepare(logger).await { + if let Err(prepare_error) = self.prepare(command_log.as_deref_mut()).await { failed_updates.push(prepare_error); return failed_updates; } @@ -105,8 +104,14 @@ pub trait Plugin { }; let module_url = module.url.clone(); if let Some(url) = module_url { - match Self::download_from_url(module, &url, logger, download_path, self.identity()) - .await + match Self::download_from_url( + module, + &url, + command_log.as_deref_mut(), + download_path, + self.identity(), + ) + .await { Err(prepare_error) => { failed_updates.push(prepare_error); @@ -119,10 +124,13 @@ pub trait Plugin { // Execute the updates if failed_updates.is_empty() { - let outcome = self.update_list(&updates, logger).await; + let outcome = self.update_list(&updates, command_log.as_deref_mut()).await; if let Err(SoftwareError::UpdateListNotSupported(_)) = outcome { for update in updates.iter() { - if let Err(error) = self.apply(update, logger, download_path).await { + if let Err(error) = self + .apply(update, command_log.as_deref_mut(), download_path) + .await + { failed_updates.push(error); }; } @@ -132,13 +140,14 @@ pub trait Plugin { } // Finalize the updates - if let Err(finalize_error) = self.finalize(logger).await { + if let Err(finalize_error) = self.finalize(command_log.as_deref_mut()).await { failed_updates.push(finalize_error); } // Cleanup all the downloaded modules for downloader in downloaders { - if let Err(cleanup_error) = Self::cleanup_downloaded_artefacts(downloader, logger).await + if let Err(cleanup_error) = + Self::cleanup_downloaded_artefacts(downloader, command_log.as_deref_mut()).await { failed_updates.push(cleanup_error); } @@ -151,14 +160,20 @@ pub trait Plugin { &self, module: &mut SoftwareModule, url: &DownloadInfo, - logger: &mut BufWriter, + mut command_log: Option<&mut CommandLog>, download_path: &Path, identity: Option<&Identity>, ) -> Result<(), SoftwareError> { - let downloader = - Self::download_from_url(module, url, logger, download_path, identity).await?; - let result = self.install(module, logger).await; - Self::cleanup_downloaded_artefacts(downloader, logger).await?; + let downloader = Self::download_from_url( + module, + url, + command_log.as_deref_mut(), + download_path, + identity, + ) + .await?; + let result = self.install(module, command_log.as_deref_mut()).await; + Self::cleanup_downloaded_artefacts(downloader, command_log).await?; result } @@ -166,24 +181,25 @@ pub trait Plugin { async fn download_from_url( module: &mut SoftwareModule, url: &DownloadInfo, - logger: &mut BufWriter, + mut command_log: Option<&mut CommandLog>, download_path: &Path, identity: Option<&Identity>, ) -> Result { let sm_path = sm_path(&module.name, &module.version, download_path); let downloader = Downloader::new(sm_path, identity.map(|id| id.to_owned())); - logger - .write_all( - format!( - "----- $ Downloading: {} to {} \n", - &url.url(), - &downloader.filename().to_string_lossy().to_string() + if let Some(ref mut logger) = command_log { + logger + .write( + format!( + "----- $ Downloading: {} to {} \n", + &url.url(), + &downloader.filename().to_string_lossy().to_string() + ) + .as_bytes(), ) - .as_bytes(), - ) - .await?; - logger.flush().await?; + .await?; + } if let Err(err) = downloader @@ -196,9 +212,11 @@ pub trait Plugin { }) { error!("Download error: {err:#?}"); - logger - .write_all(format!("error: {}\n", &err).as_bytes()) - .await?; + if let Some(ref mut logger) = command_log { + logger + .write(format!("error: {}\n", &err).as_bytes()) + .await?; + } return Err(err); } @@ -209,7 +227,7 @@ pub trait Plugin { async fn cleanup_downloaded_artefacts( downloader: Downloader, - logger: &mut BufWriter, + command_log: Option<&mut CommandLog>, ) -> Result<(), SoftwareError> { if let Err(err) = downloader .cleanup() @@ -218,9 +236,9 @@ pub trait Plugin { reason: err.to_string(), }) { - logger - .write_all(format!("warn: {}\n", &err).as_bytes()) - .await?; + if let Some(logger) = command_log { + logger.write(format!("warn: {}\n", &err).as_bytes()).await?; + } } Ok(()) } @@ -296,10 +314,10 @@ impl ExternalPluginCommand { pub async fn execute( &self, command: LoggedCommand, - logger: &mut BufWriter, + command_log: Option<&mut CommandLog>, ) -> Result { let output = command - .execute(logger) + .execute(command_log) .await .map_err(|err| self.plugin_error(err))?; Ok(output) @@ -340,9 +358,9 @@ const VERSION: &str = "version"; #[async_trait] impl Plugin for ExternalPluginCommand { - async fn prepare(&self, logger: &mut BufWriter) -> Result<(), SoftwareError> { + async fn prepare(&self, command_log: Option<&mut CommandLog>) -> Result<(), SoftwareError> { let command = self.command(PREPARE, None)?; - let output = self.execute(command, logger).await?; + let output = self.execute(command, command_log).await?; if output.status.success() { Ok(()) @@ -357,10 +375,10 @@ impl Plugin for ExternalPluginCommand { async fn install( &self, module: &SoftwareModule, - logger: &mut BufWriter, + command_log: Option<&mut CommandLog>, ) -> Result<(), SoftwareError> { let command = self.command(INSTALL, Some(module))?; - let output = self.execute(command, logger).await?; + let output = self.execute(command, command_log).await?; if output.status.success() { Ok(()) @@ -375,10 +393,10 @@ impl Plugin for ExternalPluginCommand { async fn remove( &self, module: &SoftwareModule, - logger: &mut BufWriter, + command_log: Option<&mut CommandLog>, ) -> Result<(), SoftwareError> { let command = self.command(REMOVE, Some(module))?; - let output = self.execute(command, logger).await?; + let output = self.execute(command, command_log).await?; if output.status.success() { Ok(()) @@ -393,7 +411,7 @@ impl Plugin for ExternalPluginCommand { async fn update_list( &self, updates: &[SoftwareModuleUpdate], - logger: &mut BufWriter, + command_log: Option<&mut CommandLog>, ) -> Result<(), SoftwareError> { let mut command = self.command(UPDATE_LIST, None)?; @@ -432,7 +450,7 @@ impl Plugin for ExternalPluginCommand { child_stdin.write_all(action.as_bytes()).await? } - let output = child.wait_with_output(logger).await?; + let output = child.wait_with_output(command_log).await?; match output.status.code() { Some(0) => Ok(()), Some(1) => Err(SoftwareError::UpdateListNotSupported(self.name.clone())), @@ -447,9 +465,9 @@ impl Plugin for ExternalPluginCommand { } } - async fn finalize(&self, logger: &mut BufWriter) -> Result<(), SoftwareError> { + async fn finalize(&self, command_log: Option<&mut CommandLog>) -> Result<(), SoftwareError> { let command = self.command(FINALIZE, None)?; - let output = self.execute(command, logger).await?; + let output = self.execute(command, command_log).await?; if output.status.success() { Ok(()) @@ -463,10 +481,10 @@ impl Plugin for ExternalPluginCommand { async fn list( &self, - logger: &mut BufWriter, + command_log: Option<&mut CommandLog>, ) -> Result, SoftwareError> { let command = self.command(LIST, None)?; - let output = self.execute(command, logger).await?; + let output = self.execute(command, command_log).await?; if output.status.success() { let filtered_output = match (&self.exclude, &self.include) { (None, None) => output.stdout, @@ -525,10 +543,10 @@ impl Plugin for ExternalPluginCommand { async fn version( &self, module: &SoftwareModule, - logger: &mut BufWriter, + command_log: Option<&mut CommandLog>, ) -> Result, SoftwareError> { let command = self.command(VERSION, Some(module))?; - let output = self.execute(command, logger).await?; + let output = self.execute(command, command_log).await?; if output.status.success() { let version = String::from(self.content(output.stdout)?.trim()); diff --git a/crates/core/plugin_sm/src/plugin_manager.rs b/crates/core/plugin_sm/src/plugin_manager.rs index dfba63af608..a9094acaf68 100644 --- a/crates/core/plugin_sm/src/plugin_manager.rs +++ b/crates/core/plugin_sm/src/plugin_manager.rs @@ -1,7 +1,7 @@ -use crate::log_file::LogFile; use crate::plugin::ExternalPluginCommand; use crate::plugin::Plugin; use crate::plugin::LIST; +use logged_command::CommandLog; use std::collections::BTreeMap; use std::fs; use std::io::ErrorKind; @@ -227,16 +227,15 @@ impl ExternalPlugins { pub async fn list( &self, mut response: SoftwareListCommand, - mut log_file: LogFile, + mut command_log: Option, ) -> SoftwareListCommand { - let logger = log_file.buffer(); let mut error_count = 0; if self.plugin_map.is_empty() { response.add_modules("".into(), vec![]); } else { for (software_type, plugin) in self.plugin_map.iter() { - match plugin.list(logger).await { + match plugin.list(command_log.as_mut()).await { Ok(software_list) => response.add_modules(software_type.clone(), software_list), Err(_) => { error_count += 1; @@ -245,7 +244,7 @@ impl ExternalPlugins { } } - if let Some(reason) = ExternalPlugins::error_message(log_file.path(), error_count) { + if let Some(reason) = ExternalPlugins::error_message(error_count, command_log) { response.with_error(reason) } else { response.with_status(CommandStatus::Successful) @@ -255,17 +254,18 @@ impl ExternalPlugins { pub async fn process( &self, request: SoftwareUpdateCommand, - mut log_file: LogFile, + mut command_log: Option, download_path: &Path, ) -> SoftwareUpdateCommand { let mut response = request.clone().with_status(CommandStatus::Executing); - let logger = log_file.buffer(); let mut error_count = 0; for software_type in request.modules_types() { let errors = if let Some(plugin) = self.by_software_type(&software_type) { let updates = request.updates_for(&software_type); - plugin.apply_all(updates, logger, download_path).await + plugin + .apply_all(updates, command_log.as_mut(), download_path) + .await } else { vec![SoftwareError::UnknownSoftwareType { software_type: software_type.clone(), @@ -278,24 +278,23 @@ impl ExternalPlugins { } } - if let Some(reason) = ExternalPlugins::error_message(log_file.path(), error_count) { + if let Some(reason) = ExternalPlugins::error_message(error_count, command_log) { response.with_error(reason) } else { response.with_status(CommandStatus::Successful) } } - fn error_message(log_file: &Path, error_count: i32) -> Option { + fn error_message(error_count: i32, command_log: Option) -> Option { if error_count > 0 { let reason = if error_count == 1 { - format!("1 error, see device log file {}", log_file.display()) + "1 error".into() } else { - format!( - "{} errors, see device log file {}", - error_count, - log_file.display() - ) + format!("{} errors", error_count) }; + let reason = command_log + .map(|log| format!("{}, see device log file {}", reason, log.path)) + .unwrap_or(reason); Some(reason) } else { None diff --git a/crates/core/tedge_agent/src/software_manager/actor.rs b/crates/core/tedge_agent/src/software_manager/actor.rs index 7aef4689203..d390eb52749 100644 --- a/crates/core/tedge_agent/src/software_manager/actor.rs +++ b/crates/core/tedge_agent/src/software_manager/actor.rs @@ -6,8 +6,7 @@ use crate::state_repository::error::StateError; use crate::state_repository::state::AgentStateRepository; use anyhow::anyhow; use async_trait::async_trait; -use plugin_sm::operation_logs::LogKind; -use plugin_sm::operation_logs::OperationLogs; +use logged_command::CommandLog; use plugin_sm::plugin_manager::ExternalPlugins; use plugin_sm::plugin_manager::Plugins; use serde::Deserialize; @@ -101,9 +100,6 @@ impl Actor for SoftwareManagerActor { } async fn run(mut self) -> Result<(), RuntimeError> { - let operation_logs = OperationLogs::try_new(self.config.log_dir.clone().into()) - .map_err(SoftwareManagerError::FromOperationsLogs)?; - let mut plugins = ExternalPlugins::open( &self.config.sm_plugins_dir, self.config.default_plugin_type.clone(), @@ -137,7 +133,7 @@ impl Actor for SoftwareManagerActor { while let Some(request) = input_receiver.recv().await { tokio::select! { - _ = self.handle_request(request, &mut plugins, &operation_logs) => { + _ = self.handle_request(request, &mut plugins) => { if let Err(SoftwareManagerError::NotRunningLatestVersion) = Self::detect_self_update() { warn!("Tedge-agent is no more running the latest-version => a restart is required"); // Make sure the operation status is properly reported before the restart @@ -186,12 +182,11 @@ impl SoftwareManagerActor { &mut self, request: SoftwareCommand, plugins: &mut ExternalPlugins, - operation_logs: &OperationLogs, ) -> Result<(), SoftwareManagerError> { match request { SoftwareCommand::SoftwareUpdateCommand(request) => { match self - .handle_software_update_operation(request, plugins, operation_logs) + .handle_software_update_operation(request, plugins) .await { Ok(()) => {} @@ -199,10 +194,7 @@ impl SoftwareManagerActor { } } SoftwareCommand::SoftwareListCommand(request) => { - if let Err(err) = self - .handle_software_list_operation(request, plugins, operation_logs) - .await - { + if let Err(err) = self.handle_software_list_operation(request, plugins).await { error!("{:?}", err); } } @@ -245,7 +237,6 @@ impl SoftwareManagerActor { &mut self, request: SoftwareUpdateCommand, plugins: &mut ExternalPlugins, - operation_logs: &OperationLogs, ) -> Result<(), SoftwareManagerError> { if request.status() != CommandStatus::Scheduled { // Only handle commands in the scheduled state @@ -261,17 +252,16 @@ impl SoftwareManagerActor { let executing_response = request.clone().with_status(CommandStatus::Executing); self.output_sender.send(executing_response.into()).await?; - let response = match operation_logs.new_log_file(LogKind::SoftwareUpdate).await { - Ok(log_file) => { - plugins - .process(request, log_file, self.config.tmp_dir.as_std_path()) - .await - } - Err(err) => { - error!("{}", err); - request.with_error(format!("{}", err)) - } - }; + let command_log = request.payload.log_path.clone().map(|path| { + CommandLog::from_log_path( + path, + OperationType::SoftwareUpdate.to_string(), + request.cmd_id.clone(), + ) + }); + let response = plugins + .process(request, command_log, self.config.tmp_dir.as_std_path()) + .await; self.output_sender.send(response.into()).await?; self.state_repository.clear().await?; @@ -318,7 +308,6 @@ impl SoftwareManagerActor { &mut self, request: SoftwareListCommand, plugins: &ExternalPlugins, - operation_logs: &OperationLogs, ) -> Result<(), SoftwareManagerError> { if request.status() != CommandStatus::Scheduled { // Only handle commands in the scheduled state @@ -331,13 +320,15 @@ impl SoftwareManagerActor { let executing_response = request.clone().with_status(CommandStatus::Executing); self.output_sender.send(executing_response.into()).await?; - let response = match operation_logs.new_log_file(LogKind::SoftwareList).await { - Ok(log_file) => plugins.list(request, log_file).await, - Err(err) => { - error!("{}", err); - request.with_error(format!("{}", err)) - } - }; + let command_log = request.payload.log_path.clone().map(|path| { + CommandLog::from_log_path( + path, + OperationType::SoftwareList.to_string(), + request.cmd_id.clone(), + ) + }); + + let response = plugins.list(request, command_log).await; self.output_sender.send(response.into()).await?; self.state_repository.clear().await?; diff --git a/crates/core/tedge_agent/src/tedge_operation_converter/actor.rs b/crates/core/tedge_agent/src/tedge_operation_converter/actor.rs index 67ae7fe021a..fd79a6e39c4 100644 --- a/crates/core/tedge_agent/src/tedge_operation_converter/actor.rs +++ b/crates/core/tedge_agent/src/tedge_operation_converter/actor.rs @@ -4,6 +4,7 @@ use async_trait::async_trait; use camino::Utf8PathBuf; use log::error; use log::info; +use logged_command::CommandLog; use std::process::Output; use std::time::Duration; use tedge_actors::fan_in_message_type; @@ -28,16 +29,11 @@ use tedge_api::workflow::GenericCommandMetadata; use tedge_api::workflow::GenericCommandState; use tedge_api::workflow::GenericStateUpdate; use tedge_api::workflow::OperationAction; -use tedge_api::workflow::OperationName; use tedge_api::workflow::WorkflowExecutionError; use tedge_api::workflow::WorkflowSupervisor; use tedge_mqtt_ext::MqttMessage; use tedge_mqtt_ext::QoS; use tedge_script_ext::Execute; -use time::format_description; -use time::OffsetDateTime; -use tokio::fs::File; -use tokio::io::AsyncWriteExt; use tokio::time::sleep; /// A generic command state that is published by the [TedgeOperationConverterActor] @@ -446,146 +442,3 @@ enum CommandTopicError { #[error("Not a command topic")] InvalidCommandTopic, } - -/// Log all command steps -struct CommandLog { - /// Path to the command log file - path: Utf8PathBuf, - - /// operation name - operation: OperationName, - - /// the chain of operations leading to this command - invoking_operations: Vec, - - /// command id - cmd_id: CommandId, - - /// The log file of the root command invoking this command - /// - /// None, if not open yet. - file: Option, -} - -impl CommandLog { - pub fn new( - log_dir: Utf8PathBuf, - operation: OperationName, - cmd_id: CommandId, - invoking_operations: Vec, - root_operation: Option, - root_cmd_id: Option, - ) -> Self { - let root_operation = root_operation.unwrap_or(operation.clone()); - let root_cmd_id = root_cmd_id.unwrap_or(cmd_id.clone()); - - let path = log_dir.join(format!("workflow-{}-{}.log", root_operation, root_cmd_id)); - CommandLog { - path, - operation, - invoking_operations, - cmd_id, - file: None, - } - } - - async fn open(&mut self) -> Result<&mut File, std::io::Error> { - if self.file.is_none() { - self.file = Some( - File::options() - .append(true) - .create(true) - .open(self.path.clone()) - .await?, - ); - } - Ok(self.file.as_mut().unwrap()) - } - - async fn log_header(&mut self, topic: &str) { - let now = OffsetDateTime::now_utc() - .format(&format_description::well_known::Rfc3339) - .unwrap(); - let cmd_id = &self.cmd_id; - let operation = &self.operation; - let header_message = format!( - r#" -================================================================== -Triggered {operation} workflow -================================================================== - -topic: {topic} -operation: {operation} -cmd_id: {cmd_id} -time: {now} - -================================================================== -"# - ); - if let Err(err) = self.write(&header_message).await { - error!("Fail to log to {}: {err}", self.path) - } - } - - async fn log_state_action(&mut self, state: &GenericCommandState, action: &OperationAction) { - if state.is_init() && self.invoking_operations.is_empty() { - self.log_header(state.topic.name.as_str()).await; - } - let step = &state.status; - let state = &state.payload.to_string(); - let message = format!( - r#" -State: {state} - -Action: {action} -"# - ); - self.log_step(step, &message).await - } - - async fn log_step(&mut self, step: &str, action: &str) { - let now = OffsetDateTime::now_utc() - .format(&format_description::well_known::Rfc3339) - .unwrap(); - let operation = &self.operation; - let parent_operation = if self.invoking_operations.is_empty() { - operation.to_string() - } else { - format!("{} > {}", self.invoking_operations.join(" > "), operation) - }; - - let message = format!( - r#" -----------------------[ {parent_operation} @ {step} | time={now} ]---------------------- -{action} -"# - ); - if let Err(err) = self.write(&message).await { - error!("Fail to log to {}: {err}", self.path) - } - } - - async fn log_script_output(&mut self, result: &Result) { - if let Err(err) = self.write_script_output(result).await { - error!("Fail to log to {}: {err}", self.path) - } - } - - async fn write_script_output( - &mut self, - result: &Result, - ) -> Result<(), std::io::Error> { - let file = self.open().await?; - logged_command::LoggedCommand::log_outcome("", result, file).await?; - file.sync_all().await?; - Ok(()) - } - - async fn write(&mut self, message: &str) -> Result<(), std::io::Error> { - let file = self.open().await?; - file.write_all(message.as_bytes()).await?; - file.flush().await?; - file.sync_all().await?; - Ok(()) - } -} diff --git a/crates/core/tedge_agent/src/tedge_operation_converter/tests.rs b/crates/core/tedge_agent/src/tedge_operation_converter/tests.rs index fa7ec0070fe..937a9cb0922 100644 --- a/crates/core/tedge_agent/src/tedge_operation_converter/tests.rs +++ b/crates/core/tedge_agent/src/tedge_operation_converter/tests.rs @@ -75,7 +75,9 @@ async fn convert_incoming_software_list_request() -> Result<(), DynError> { log_path: Some( tmp_dir .path() - .join("workflow-software_list-some-cmd-id.log"), + .join("workflow-software_list-some-cmd-id.log") + .try_into() + .unwrap(), ), }, }]) @@ -122,7 +124,13 @@ async fn convert_incoming_software_update_request() -> Result<(), DynError> { status: CommandStatus::Scheduled, update_list: vec![debian_list], failures: vec![], - log_path: Some(tmp_dir.path().join("workflow-software_update-1234.log")), + log_path: Some( + tmp_dir + .path() + .join("workflow-software_update-1234.log") + .try_into() + .unwrap(), + ), }, }]) .await; @@ -156,7 +164,13 @@ async fn convert_incoming_restart_request() -> Result<(), DynError> { cmd_id: "random".to_string(), payload: RestartCommandPayload { status: CommandStatus::Scheduled, - log_path: Some(tmp_dir.path().join("workflow-restart-random.log")), + log_path: Some( + tmp_dir + .path() + .join("workflow-restart-random.log") + .try_into() + .unwrap(), + ), }, }]) .await; diff --git a/crates/core/tedge_api/src/commands.rs b/crates/core/tedge_api/src/commands.rs index c5fac16c856..0d5f268b48a 100644 --- a/crates/core/tedge_api/src/commands.rs +++ b/crates/core/tedge_api/src/commands.rs @@ -7,6 +7,8 @@ use crate::mqtt_topics::OperationType; use crate::software::*; use crate::workflow::GenericCommandData; use crate::workflow::GenericCommandState; +use camino::Utf8Path; +use camino::Utf8PathBuf; use download::DownloadInfo; use log::error; use mqtt_channel::MqttError; @@ -18,8 +20,6 @@ use serde::Deserialize; use serde::Serialize; use serde_json::Value; use std::fmt; -use std::path::Path; -use std::path::PathBuf; use time::OffsetDateTime; /// A command instance with its target and its current state of execution @@ -350,7 +350,7 @@ pub struct SoftwareListCommandPayload { pub current_software_list: Vec, #[serde(default, skip_serializing_if = "Option::is_none")] - pub log_path: Option, + pub log_path: Option, } impl Jsonify for SoftwareListCommandPayload {} @@ -428,7 +428,7 @@ pub struct SoftwareUpdateCommandPayload { pub failures: Vec, #[serde(default, skip_serializing_if = "Option::is_none")] - pub log_path: Option, + pub log_path: Option, } impl Jsonify for SoftwareUpdateCommandPayload {} @@ -540,8 +540,8 @@ impl SoftwareUpdateCommand { }) } - pub fn set_log_path(&mut self, path: &Path) { - self.payload.log_path = Some(path.into()) + pub fn set_log_path(&mut self, path: impl AsRef) { + self.payload.log_path = Some(path.as_ref().into()) } } @@ -660,7 +660,7 @@ pub struct RestartCommandPayload { pub status: CommandStatus, #[serde(default, skip_serializing_if = "Option::is_none")] - pub log_path: Option, + pub log_path: Option, } impl RestartCommandPayload { @@ -777,7 +777,7 @@ pub struct LogUploadCmdPayload { pub search_text: Option, pub lines: usize, #[serde(default, skip_serializing_if = "Option::is_none")] - pub log_path: Option, + pub log_path: Option, } impl Jsonify for LogUploadCmdPayload {} @@ -828,7 +828,7 @@ pub struct ConfigSnapshotCmdPayload { #[serde(skip_serializing_if = "Option::is_none")] pub path: Option, #[serde(default, skip_serializing_if = "Option::is_none")] - pub log_path: Option, + pub log_path: Option, } impl Jsonify for ConfigSnapshotCmdPayload {} @@ -877,7 +877,7 @@ pub struct ConfigUpdateCmdPayload { #[serde(skip_serializing_if = "Option::is_none")] pub path: Option, #[serde(default, skip_serializing_if = "Option::is_none")] - pub log_path: Option, + pub log_path: Option, } impl Jsonify for ConfigUpdateCmdPayload {} @@ -928,7 +928,7 @@ pub struct FirmwareUpdateCmdPayload { pub name: String, pub version: String, #[serde(default, skip_serializing_if = "Option::is_none")] - pub log_path: Option, + pub log_path: Option, } impl Jsonify for FirmwareUpdateCmdPayload {} diff --git a/crates/extensions/c8y_mapper_ext/src/converter.rs b/crates/extensions/c8y_mapper_ext/src/converter.rs index b1e2db28789..3464062de4d 100644 --- a/crates/extensions/c8y_mapper_ext/src/converter.rs +++ b/crates/extensions/c8y_mapper_ext/src/converter.rs @@ -49,6 +49,7 @@ use c8y_auth_proxy::url::ProxyUrlGenerator; use c8y_http_proxy::handle::C8YHttpProxy; use c8y_http_proxy::messages::CreateEvent; use camino::Utf8Path; +use logged_command::CommandLog; use logged_command::LoggedCommand; use plugin_sm::operation_logs::OperationLogs; use plugin_sm::operation_logs::OperationLogsError; @@ -270,7 +271,7 @@ impl CumulocityConverter { let alarm_converter = AlarmConverter::new(); let log_dir = config.logs_path.join(TEDGE_AGENT_LOG_DIR); - let operation_logs = OperationLogs::try_new(log_dir.into())?; + let operation_logs = OperationLogs::try_new(log_dir)?; let c8y_endpoint = C8yEndPoint::new(&c8y_host, &device_id); @@ -816,6 +817,7 @@ impl CumulocityConverter { ) -> Result<(), CumulocityMapperError> { let command = command.to_owned(); let payload = payload.to_string(); + let cmd_id = self.command_id.new_id(); let mut logged = LoggedCommand::new(&command).map_err(|e| CumulocityMapperError::ExecuteFailed { @@ -835,12 +837,14 @@ impl CumulocityConverter { operation_name: operation_name.to_string(), }); - let mut log_file = self + let log_file = self .operation_logs .new_log_file(plugin_sm::operation_logs::LogKind::Operation( operation_name.to_string(), )) .await?; + let mut command_log = + CommandLog::from_log_path(log_file.path(), operation_name.clone(), cmd_id); match maybe_child_process { Ok(child_process) => { @@ -850,7 +854,6 @@ impl CumulocityConverter { tokio::spawn(async move { let op_name = op_name.as_str(); - let logger = log_file.buffer(); // mqtt client publishes executing let topic = C8yTopic::SmartRestResponse.to_topic(&c8y_prefix).unwrap(); @@ -865,7 +868,11 @@ impl CumulocityConverter { // execute the command and wait until it finishes // mqtt client publishes failed or successful depending on the exit code if let Ok(output) = child_process - .wait_for_output_with_timeout(logger, graceful_timeout, forceful_timeout) + .wait_for_output_with_timeout( + &mut command_log, + graceful_timeout, + forceful_timeout, + ) .await { match output.status.code() { diff --git a/tests/RobotFramework/tests/cumulocity/software_management/software.robot b/tests/RobotFramework/tests/cumulocity/software_management/software.robot index d69cf6fe986..d11fc329058 100644 --- a/tests/RobotFramework/tests/cumulocity/software_management/software.robot +++ b/tests/RobotFramework/tests/cumulocity/software_management/software.robot @@ -115,6 +115,13 @@ Operation log uploaded automatically with default auto_log_upload setting as nev Operation Should Be FAILED ${OPERATION} timeout=60 Validate operation log not uploaded +Workflow log includes plugin output + ${OPERATION}= Install Software c8y-remote-access-plugin + Operation Should Be SUCCESSFUL ${OPERATION} timeout=60 + ${operation_log_file}= Execute Command ls -t /var/log/tedge/agent/workflow-software_update-* | head -n 1 strip=${True} + ${log_output}= Execute Command cat ${operation_log_file} + Should Contain ${log_output} Executing command: "apt-get" "--quiet" "--yes" "update" + *** Keywords *** Custom Setup From 4fbf29f913643c0df8f2609caa6da749f54d0d2d Mon Sep 17 00:00:00 2001 From: Albin Suresh Date: Wed, 22 May 2024 13:48:14 +0000 Subject: [PATCH 2/3] Use workflow log paths for software-management log type in tedge-log-plugin.toml --- .../extensions/tedge_log_manager/src/lib.rs | 2 +- .../extensions/tedge_log_manager/src/tests.rs | 5 +++- crates/tests/tedge_test_utils/src/fs.rs | 2 +- docs/src/operate/c8y/log-management.md | 2 +- .../references/agent/tedge-log-management.md | 2 +- docs/src/start/getting-started.md | 2 +- .../tests/cumulocity/log/log_operation.robot | 25 +++++++++++++++++++ 7 files changed, 34 insertions(+), 6 deletions(-) diff --git a/crates/extensions/tedge_log_manager/src/lib.rs b/crates/extensions/tedge_log_manager/src/lib.rs index 012dfad5f30..f263ac05b39 100644 --- a/crates/extensions/tedge_log_manager/src/lib.rs +++ b/crates/extensions/tedge_log_manager/src/lib.rs @@ -106,7 +106,7 @@ impl LogManagerBuilder { } // creating tedge-log-plugin.toml - let agent_logs_path = format!("{}/agent/software-*", config.log_dir); + let agent_logs_path = format!("{}/agent/workflow-software_*", config.log_dir); let example_config = toml! { [[files]] type = "software-management" diff --git a/crates/extensions/tedge_log_manager/src/tests.rs b/crates/extensions/tedge_log_manager/src/tests.rs index dcb742e46a5..494dafbe0fd 100644 --- a/crates/extensions/tedge_log_manager/src/tests.rs +++ b/crates/extensions/tedge_log_manager/src/tests.rs @@ -149,7 +149,10 @@ async fn default_plugin_config() { read_to_string(tempdir.path().join("tedge-log-plugin.toml")).unwrap(); let plugin_config_toml: Table = from_str(&plugin_config_content).unwrap(); - let agent_logs_path = format!("{}/agent/software-*", tempdir.path().to_string_lossy()); + let agent_logs_path = format!( + "{}/agent/workflow-software_*", + tempdir.path().to_string_lossy() + ); let expected_config = toml! { [[files]] type = "software-management" diff --git a/crates/tests/tedge_test_utils/src/fs.rs b/crates/tests/tedge_test_utils/src/fs.rs index 0f4ad6a8215..322ca09ea1f 100644 --- a/crates/tests/tedge_test_utils/src/fs.rs +++ b/crates/tests/tedge_test_utils/src/fs.rs @@ -155,7 +155,7 @@ pub fn create_full_tedge_dir_structure() { .file("c8y-log-plugin.toml") .with_toml_content(toml::toml! { files = [ - {type = "software-management", path = "/var/log/tedge/agent/software-*" } + {type = "software-management", path = "/var/log/tedge/agent/workflow-software_*" } ] }); ttd.dir("contrib").dir("collectd").file("collectd.conf"); diff --git a/docs/src/operate/c8y/log-management.md b/docs/src/operate/c8y/log-management.md index 2161fa1a300..763500fa508 100644 --- a/docs/src/operate/c8y/log-management.md +++ b/docs/src/operate/c8y/log-management.md @@ -17,7 +17,7 @@ an example toml file would be: ```toml title="file: /etc/tedge/plugins/tedge-log-plugin.toml" files = [ - { type = "software-management", path = "/var/log/tedge/agent/software-*" }, + { type = "software-management", path = "/var/log/tedge/agent/workflow-software_*" }, { type = "mosquitto", path = "/var/log/mosquitto/mosquitto.log" } ] ``` diff --git a/docs/src/references/agent/tedge-log-management.md b/docs/src/references/agent/tedge-log-management.md index 878d3d3232a..1378bb1c065 100644 --- a/docs/src/references/agent/tedge-log-management.md +++ b/docs/src/references/agent/tedge-log-management.md @@ -31,7 +31,7 @@ The `type` given to these paths are used as the log type associated to a log pat ```toml title="file: /etc/tedge/plugins/tedge-log-plugin.toml" files = [ { type = "mosquitto", path = '/var/log/mosquitto/mosquitto.log' }, - { type = "software-management", path = '/var/log/tedge/agent/software-*' }, + { type = "software-management", path = '/var/log/tedge/agent/workflow-software_*' }, { type = "c8y_CustomOperation", path = '/var/log/tedge/agent/c8y_CustomOperation/*' } ] ``` diff --git a/docs/src/start/getting-started.md b/docs/src/start/getting-started.md index 7891a2e523b..e8645ade0f8 100644 --- a/docs/src/start/getting-started.md +++ b/docs/src/start/getting-started.md @@ -448,7 +448,7 @@ Log files can be added by creating or editing the following file with the given ```toml title="file: /etc/tedge/plugins/tedge-log-plugin.toml" files = [ - { type = "software-management", path = "/var/log/tedge/agent/software-*" }, + { type = "software-management", path = "/var/log/tedge/agent/workflow-software_*" }, { type = "mosquitto", path = "/var/log/mosquitto/mosquitto.log" }, { type = "daemon", path = "/var/log/daemon.log" }, { type = "user", path = "/var/log/user.log" }, diff --git a/tests/RobotFramework/tests/cumulocity/log/log_operation.robot b/tests/RobotFramework/tests/cumulocity/log/log_operation.robot index 6f9021a0f34..d9e2f6f90d4 100644 --- a/tests/RobotFramework/tests/cumulocity/log/log_operation.robot +++ b/tests/RobotFramework/tests/cumulocity/log/log_operation.robot @@ -128,6 +128,12 @@ Default plugin configuration Cumulocity.Set Device ${DEVICE_SN} Cumulocity.Should Support Log File Types software-management + ${start_timestamp}= Get Current Date UTC -24 hours result_format=%Y-%m-%dT%H:%M:%S+0000 + ${end_timestamp}= Get Current Date UTC +60 seconds result_format=%Y-%m-%dT%H:%M:%S+0000 + ${operation}= Create Log Request Operation ${start_timestamp} ${end_timestamp} log_type=software-management + ${operation}= Operation Should Be SUCCESSFUL ${operation} timeout=120 + Log Operation Attachment File Contains ${operation} expected_pattern=.*software_list @ successful + *** Keywords *** Setup LogFiles ThinEdgeIO.Transfer To Device ${CURDIR}/tedge-log-plugin.toml /etc/tedge/plugins/tedge-log-plugin.toml @@ -199,6 +205,25 @@ Log File Contents Should Be Equal ${event}= Cumulocity.Event Attachment Should Have File Info ${event_id} name=${expected_filename} mime_type=${expected_mime_type} RETURN ${contents} +Create Log Request Operation + [Arguments] ${start_timestamp} ${end_timestamp} ${log_type} ${search_text}=${EMPTY} ${maximum_lines}=1000 + ${start_timestamp}= Get Current Date UTC -24 hours result_format=%Y-%m-%dT%H:%M:%S+0000 + ${end_timestamp}= Get Current Date UTC +60 seconds result_format=%Y-%m-%dT%H:%M:%S+0000 + ${operation}= Cumulocity.Create Operation + ... description=Log file request + ... fragments={"c8y_LogfileRequest":{"dateFrom":"${start_timestamp}","dateTo":"${end_timestamp}","logFile":"${log_type}","searchText":"${search_text}","maximumLines":${maximum_lines}}} + RETURN ${operation} + +Log Operation Attachment File Contains + [Arguments] ${operation} ${expected_pattern} + ${event_url_parts}= Split String ${operation["c8y_LogfileRequest"]["file"]} separator=/ + ${event_id}= Set Variable ${event_url_parts}[-2] + ${contents}= Cumulocity.Event Should Have An Attachment + ... ${event_id} + ... expected_pattern=${expected_pattern} + ... encoding=utf-8 + + Disable log upload capability of tedge-agent [Arguments] ${device_sn}=${DEVICE_SN} Execute Command tedge config set agent.enable.log_upload false From a507d7a84008030012c2da373eba49b2945054e1 Mon Sep 17 00:00:00 2001 From: Albin Suresh Date: Fri, 31 May 2024 10:50:31 +0000 Subject: [PATCH 3/3] Move command log structs into tedge_api --- Cargo.lock | 20 +- Cargo.toml | 1 - crates/common/logged_command/Cargo.toml | 32 - crates/common/logged_command/src/lib.rs | 5 - .../logged_command/src/logged_command.rs | 599 ------------------ crates/common/tedge_utils/src/signals.rs | 18 + crates/core/plugin_sm/Cargo.toml | 1 - crates/core/plugin_sm/src/plugin.rs | 11 +- crates/core/plugin_sm/src/plugin_manager.rs | 2 +- crates/core/tedge_agent/Cargo.toml | 1 - .../tedge_agent/src/software_manager/actor.rs | 2 +- .../src/tedge_operation_converter/actor.rs | 3 +- crates/core/tedge_api/Cargo.toml | 2 + crates/core/tedge_api/src/lib.rs | 2 + .../tedge_api/src/workflow/log/command_log.rs | 305 +++++++++ .../src/workflow/log/logged_command.rs | 287 +++++++++ crates/core/tedge_api/src/workflow/log/mod.rs | 2 + crates/core/tedge_api/src/workflow/mod.rs | 1 + .../core/tedge_api/src/workflow/supervisor.rs | 2 +- crates/extensions/c8y_mapper_ext/Cargo.toml | 1 - .../c8y_mapper_ext/src/converter.rs | 4 +- 21 files changed, 634 insertions(+), 667 deletions(-) delete mode 100644 crates/common/logged_command/Cargo.toml delete mode 100644 crates/common/logged_command/src/lib.rs delete mode 100644 crates/common/logged_command/src/logged_command.rs create mode 100644 crates/core/tedge_api/src/workflow/log/command_log.rs create mode 100644 crates/core/tedge_api/src/workflow/log/logged_command.rs create mode 100644 crates/core/tedge_api/src/workflow/log/mod.rs diff --git a/Cargo.lock b/Cargo.lock index b7298ff1df8..516d742e4d9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -706,7 +706,6 @@ dependencies = [ "camino", "clock", "json-writer", - "logged_command", "mime", "plugin_sm", "proptest", @@ -1854,21 +1853,6 @@ dependencies = [ "toml 0.7.8", ] -[[package]] -name = "logged_command" -version = "1.1.1" -dependencies = [ - "anyhow", - "camino", - "log", - "nix", - "shell-words", - "tedge_api", - "tedge_test_utils", - "time", - "tokio", -] - [[package]] name = "mach2" version = "0.4.2" @@ -2499,7 +2483,6 @@ dependencies = [ "camino", "csv", "download", - "logged_command", "regex", "reqwest", "serde", @@ -3538,7 +3521,6 @@ dependencies = [ "hyper", "lazy_static", "log", - "logged_command", "path-clean", "plugin_sm", "rcgen", @@ -3693,11 +3675,13 @@ dependencies = [ "serde", "serde_json", "shell-words", + "tedge_test_utils", "tedge_utils", "tempfile", "test-case", "thiserror", "time", + "tokio", "toml 0.7.8", "walkdir", ] diff --git a/Cargo.toml b/Cargo.toml index fa9336cf378..32687c905e9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -84,7 +84,6 @@ json-writer = { path = "crates/common/json_writer" } lazy_static = "1.4" log = "0.4" log_manager = { path = "crates/common/log_manager" } -logged_command = { path = "crates/common/logged_command" } maplit = "1.0" miette = { version = "5.5.0", features = ["fancy"] } mime = "0.3.17" diff --git a/crates/common/logged_command/Cargo.toml b/crates/common/logged_command/Cargo.toml deleted file mode 100644 index 1ef72d0bc34..00000000000 --- a/crates/common/logged_command/Cargo.toml +++ /dev/null @@ -1,32 +0,0 @@ -[package] -name = "logged_command" -version = { workspace = true } -authors = { workspace = true } -edition = { workspace = true } -rust-version = { workspace = true } -license = { workspace = true } -homepage = { workspace = true } -repository = { workspace = true } - -[dependencies] -camino = { workspace = true } -log = { workspace = true } -nix = { workspace = true } -shell-words = { workspace = true } -tedge_api = { workspace = true } -time = { workspace = true, features = ["formatting"] } -tokio = { workspace = true, features = [ - "fs", - "io-util", - "macros", - "process", - "rt", -] } - -[dev-dependencies] -anyhow = { workspace = true } -tedge_test_utils = { workspace = true } -tokio = { workspace = true, features = ["time"] } - -[lints] -workspace = true diff --git a/crates/common/logged_command/src/lib.rs b/crates/common/logged_command/src/lib.rs deleted file mode 100644 index 824cc2f416d..00000000000 --- a/crates/common/logged_command/src/lib.rs +++ /dev/null @@ -1,5 +0,0 @@ -mod logged_command; - -pub use crate::logged_command::CommandLog; -pub use crate::logged_command::LoggedCommand; -pub use crate::logged_command::LoggingChild; diff --git a/crates/common/logged_command/src/logged_command.rs b/crates/common/logged_command/src/logged_command.rs deleted file mode 100644 index 60c023dd6f8..00000000000 --- a/crates/common/logged_command/src/logged_command.rs +++ /dev/null @@ -1,599 +0,0 @@ -use camino::Utf8Path; -use camino::Utf8PathBuf; -use log::error; -use nix::unistd::Pid; -use std::ffi::OsStr; -use std::os::unix::process::ExitStatusExt; -use std::process::Output; -use std::process::Stdio; -use std::time::Duration; -use tedge_api::workflow::CommandId; -use tedge_api::workflow::GenericCommandState; -use tedge_api::workflow::OperationAction; -use tedge_api::workflow::OperationName; -use time::format_description; -use time::OffsetDateTime; -use tokio::fs::File; -use tokio::io::AsyncWrite; -use tokio::io::AsyncWriteExt; -use tokio::io::BufWriter; -use tokio::process::Child; -use tokio::process::Command; - -#[derive(Debug)] -pub enum CmdStatus { - Successful, - KilledWithSigterm, - KilledWithSigKill, -} -#[derive(Debug)] -pub struct LoggingChild { - command_line: String, - pub inner_child: Child, -} - -impl LoggingChild { - pub async fn wait_for_output_with_timeout( - self, - command_log: &mut CommandLog, - graceful_timeout: Duration, - forceful_timeout: Duration, - ) -> Result { - let cid = self.inner_child.id(); - let cmd_line = self.command_line; - let mut status = CmdStatus::Successful; - tokio::select! { - outcome = self.inner_child.wait_with_output() => { - Self::update_and_log_outcome(cmd_line, outcome, command_log, graceful_timeout, &status).await - } - _ = Self::timeout_operation(&mut status, cid, graceful_timeout, forceful_timeout) => { - Err(std::io::Error::new(std::io::ErrorKind::Other,"failed to kill the process: {cmd_line}")) - } - } - } - - pub async fn wait_with_output( - self, - command_log: Option<&mut CommandLog>, - ) -> Result { - let outcome = self.inner_child.wait_with_output().await; - if let Some(command_log) = command_log { - command_log - .log_command_and_output(&self.command_line, &outcome) - .await; - } - outcome - } - - async fn update_and_log_outcome( - command_line: String, - outcome: Result, - command_log: &mut CommandLog, - timeout: Duration, - status: &CmdStatus, - ) -> Result { - let outcome = match status { - CmdStatus::Successful => outcome, - CmdStatus::KilledWithSigterm | CmdStatus::KilledWithSigKill => { - outcome.map(|outcome| update_stderr_message(outcome, timeout))? - } - }; - command_log - .log_command_and_output(&command_line, &outcome) - .await; - outcome - } - - async fn timeout_operation( - status: &mut CmdStatus, - child_id: Option, - graceful_timeout: Duration, - forceful_timeout: Duration, - ) -> Result<(), std::io::Error> { - *status = CmdStatus::Successful; - - tokio::time::sleep(graceful_timeout).await; - - // stop the child process by sending sigterm - *status = CmdStatus::KilledWithSigterm; - send_signal_to_stop_child(child_id, CmdStatus::KilledWithSigterm); - tokio::time::sleep(forceful_timeout).await; - - // stop the child process by sending sigkill - *status = CmdStatus::KilledWithSigKill; - send_signal_to_stop_child(child_id, CmdStatus::KilledWithSigKill); - - // wait for the process to exit after signal - tokio::time::sleep(Duration::from_secs(120)).await; - - Ok(()) - } -} - -fn update_stderr_message(mut output: Output, timeout: Duration) -> Result { - output.stderr.append( - &mut format!( - "operation failed due to timeout: duration={}s", - timeout.as_secs() - ) - .as_bytes() - .to_vec(), - ); - Ok(output) -} - -fn send_signal_to_stop_child(child: Option, signal_type: CmdStatus) { - if let Some(pid) = child { - let pid: Pid = nix::unistd::Pid::from_raw(pid as nix::libc::pid_t); - match signal_type { - CmdStatus::KilledWithSigterm => { - let _ = nix::sys::signal::kill(pid, nix::sys::signal::SIGTERM); - } - CmdStatus::KilledWithSigKill => { - let _ = nix::sys::signal::kill(pid, nix::sys::signal::SIGKILL); - } - _ => {} - } - } -} - -/// A command which execution is logged. -/// -/// This struct wraps the main command with a nice representation of that command. -pub struct LoggedCommand { - command: Command, -} - -impl std::fmt::Display for LoggedCommand { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - let command = self.command.as_std(); - - command.get_program().to_string_lossy().fmt(f)?; - for arg in command.get_args() { - // The arguments are displayed as debug, to be properly quoted and distinguished from each other. - write!(f, " {:?}", arg.to_string_lossy())?; - } - Ok(()) - } -} - -impl LoggedCommand { - /// Creates a new `LoggedCommand`. - /// - /// In contrast to [`std::process::Command`], `program` can contain space-separated arguments, - /// which will be properly parsed, split, and passed into `.args()` call for the underlying - /// command. - pub fn new(program: impl AsRef) -> Result { - let mut args = shell_words::split(&program.as_ref().to_string_lossy()) - .map_err(|err| std::io::Error::new(std::io::ErrorKind::InvalidData, err))?; - - let mut command = match args.len() { - 0 => { - return Err(std::io::Error::new( - std::io::ErrorKind::InvalidData, - "command line is empty.", - )) - } - 1 => Command::new(&args[0]), - _ => { - let mut command = Command::new(args.remove(0)); - command.args(&args); - command - } - }; - - command - // TODO: should use tmp from config - .current_dir("/tmp") - .stdin(Stdio::piped()) - .stdout(Stdio::piped()) - .stderr(Stdio::piped()); - - Ok(LoggedCommand { command }) - } - - pub fn arg(&mut self, arg: impl AsRef) -> &mut LoggedCommand { - self.command.arg(arg); - self - } - - /// Execute the command and log its exit status, stdout and stderr - /// - /// If the command has been executed the outcome is returned (successful or not). - /// If the command fails to execute (say not found or not executable) an `std::io::Error` is returned. - /// - /// If the function fails to log the execution of the command, - /// this is logged with `log::error!` without changing the return value. - pub async fn execute( - mut self, - command_log: Option<&mut CommandLog>, - ) -> Result { - let outcome = self.command.output().await; - if let Some(command_log) = command_log { - command_log - .log_command_and_output(&self.to_string(), &outcome) - .await; - } - outcome - } - - pub fn spawn(&mut self) -> Result { - let child = self.command.spawn()?; - Ok(LoggingChild { - command_line: self.to_string(), - inner_child: child, - }) - } - - pub async fn log_outcome( - command_line: &str, - result: &Result, - logger: &mut (impl AsyncWrite + Unpin), - ) -> Result<(), std::io::Error> { - if !command_line.is_empty() { - logger - .write_all(format!("----- $ {}\n", command_line).as_bytes()) - .await?; - } - - match result.as_ref() { - Ok(output) => { - if let Some(code) = &output.status.code() { - let exit_code_msg = if *code == 0 { "OK" } else { "ERROR" }; - logger - .write_all(format!("Exit status: {code} ({exit_code_msg})\n\n").as_bytes()) - .await? - }; - if let Some(signal) = &output.status.signal() { - logger - .write_all(format!("Killed by signal: {signal}\n\n").as_bytes()) - .await? - } - // Log stderr then stdout, so the flow reads chronologically - // as the stderr is used for log messages and the stdout is used for results - if !output.stderr.is_empty() { - logger.write_all(b"stderr < { - logger - .write_all(format!("error: {}\n", &err).as_bytes()) - .await?; - } - } - - logger.flush().await?; - Ok(()) - } -} - -impl From for LoggedCommand { - fn from(mut command: Command) -> Self { - command - // TODO: should use tmp from config - .current_dir("/tmp") - .stdin(Stdio::piped()) - .stdout(Stdio::piped()) - .stderr(Stdio::piped()); - - Self { command } - } -} - -impl From for LoggedCommand { - fn from(mut command: std::process::Command) -> Self { - command - // TODO: should use tmp from config - .current_dir("/tmp") - .stdin(Stdio::piped()) - .stdout(Stdio::piped()) - .stderr(Stdio::piped()); - - Self { - command: tokio::process::Command::from(command), - } - } -} - -/// Log all command steps -pub struct CommandLog { - /// Path to the command log file - pub path: Utf8PathBuf, - - /// operation name - pub operation: OperationName, - - /// the chain of operations leading to this command - pub invoking_operations: Vec, - - /// command id - pub cmd_id: CommandId, - - /// The log file of the root command invoking this command - /// - /// None, if not open yet. - pub file: Option, -} - -impl CommandLog { - pub fn new( - log_dir: Utf8PathBuf, - operation: OperationName, - cmd_id: CommandId, - invoking_operations: Vec, - root_operation: Option, - root_cmd_id: Option, - ) -> Self { - let root_operation = root_operation.unwrap_or(operation.clone()); - let root_cmd_id = root_cmd_id.unwrap_or(cmd_id.clone()); - - let path = log_dir.join(format!("workflow-{}-{}.log", root_operation, root_cmd_id)); - CommandLog { - path, - operation, - invoking_operations, - cmd_id, - file: None, - } - } - - pub fn from_log_path( - path: impl AsRef, - operation: OperationName, - cmd_id: CommandId, - ) -> Self { - Self { - path: path.as_ref().into(), - operation, - cmd_id, - invoking_operations: vec![], - file: None, - } - } - - pub async fn open(&mut self) -> Result<&mut File, std::io::Error> { - if self.file.is_none() { - self.file = Some( - File::options() - .append(true) - .create(true) - .open(self.path.clone()) - .await?, - ); - } - Ok(self.file.as_mut().unwrap()) - } - - pub async fn log_header(&mut self, topic: &str) { - let now = OffsetDateTime::now_utc() - .format(&format_description::well_known::Rfc3339) - .unwrap(); - let cmd_id = &self.cmd_id; - let operation = &self.operation; - let header_message = format!( - r#" -================================================================== -Triggered {operation} workflow -================================================================== - -topic: {topic} -operation: {operation} -cmd_id: {cmd_id} -time: {now} - -================================================================== -"# - ); - if let Err(err) = self.write(&header_message).await { - error!("Fail to log to {}: {err}", self.path) - } - } - - pub async fn log_state_action( - &mut self, - state: &GenericCommandState, - action: &OperationAction, - ) { - if state.is_init() && self.invoking_operations.is_empty() { - self.log_header(state.topic.name.as_str()).await; - } - let step = &state.status; - let state = &state.payload.to_string(); - let message = format!( - r#" -State: {state} - -Action: {action} -"# - ); - self.log_step(step, &message).await - } - - pub async fn log_step(&mut self, step: &str, action: &str) { - let now = OffsetDateTime::now_utc() - .format(&format_description::well_known::Rfc3339) - .unwrap(); - let operation = &self.operation; - let parent_operation = if self.invoking_operations.is_empty() { - operation.to_string() - } else { - format!("{} > {}", self.invoking_operations.join(" > "), operation) - }; - - let message = format!( - r#" -----------------------[ {parent_operation} @ {step} | time={now} ]---------------------- -{action} -"# - ); - if let Err(err) = self.write(&message).await { - error!("Fail to log to {}: {err}", self.path) - } - } - - pub async fn log_script_output(&mut self, result: &Result) { - self.log_command_and_output("", result).await - } - - pub async fn log_command_and_output( - &mut self, - command_line: &str, - result: &Result, - ) { - if let Err(err) = self.write_script_output(command_line, result).await { - error!("Fail to log to {}: {err}", self.path) - } - } - - pub async fn write_script_output( - &mut self, - command_line: &str, - result: &Result, - ) -> Result<(), std::io::Error> { - let file = self.open().await?; - let mut writer = BufWriter::new(file); - LoggedCommand::log_outcome(command_line, result, &mut writer).await?; - Ok(()) - } - - pub async fn write(&mut self, message: impl AsRef<[u8]>) -> Result<(), std::io::Error> { - let file = self.open().await?; - file.write_all(message.as_ref()).await?; - file.flush().await?; - file.sync_all().await?; - Ok(()) - } -} - -#[cfg(test)] -mod tests { - use super::*; - use tedge_test_utils::fs::TempTedgeDir; - - #[tokio::test] - async fn on_execute_are_logged_command_line_exit_status_stdout_and_stderr( - ) -> Result<(), anyhow::Error> { - // Prepare a log file - let tmp_dir = TempTedgeDir::new(); - let workflow_log = tmp_dir.file("workflow.log"); - let log_file_path = workflow_log.path(); - let mut command_log = CommandLog::from_log_path( - workflow_log.utf8_path(), - "software_update".into(), - "123".into(), - ); - - // Prepare a command - let mut command = LoggedCommand::new("echo").unwrap(); - command.arg("Hello").arg("World!"); - - // Execute the command with logging - let _ = command.execute(Some(&mut command_log)).await; - - let log_content = String::from_utf8(std::fs::read(log_file_path)?)?; - assert_eq!( - log_content, - r#"----- $ echo "Hello" "World!" -Exit status: 0 (OK) - -stderr (EMPTY) - -stdout < Result<(), anyhow::Error> { - // Prepare a log file - let tmp_dir = TempTedgeDir::new(); - let workflow_log = tmp_dir.file("workflow.log"); - let log_file_path = workflow_log.path(); - let mut command_log = CommandLog::from_log_path( - workflow_log.utf8_path(), - "software_update".into(), - "123".into(), - ); - - // Prepare a command that triggers some content on stderr - let mut command = LoggedCommand::new("ls").unwrap(); - command.arg("dummy-file"); - - // Execute the command with logging - let _ = command.execute(Some(&mut command_log)).await; - - // On expect the errors to be logged - let log_content = String::from_utf8(std::fs::read(log_file_path)?)?; - #[cfg(target_os = "linux")] - assert_eq!( - log_content, - r#"----- $ ls "dummy-file" -Exit status: 2 (ERROR) - -stderr < Result<(), anyhow::Error> { - // Prepare a log file - let tmp_dir = TempTedgeDir::new(); - let workflow_log = tmp_dir.file("workflow.log"); - let log_file_path = workflow_log.path(); - let mut command_log = CommandLog::from_log_path( - workflow_log.utf8_path(), - "software_update".into(), - "123".into(), - ); - - // Prepare a command that cannot be executed - let command = LoggedCommand::new("dummy-command").unwrap(); - - // Execute the command with logging - let _ = command.execute(Some(&mut command_log)).await; - - // The fact that the command cannot be executed must be logged - let log_content = String::from_utf8(std::fs::read(log_file_path)?)?; - assert_eq!( - log_content, - r#"----- $ dummy-command -error: No such file or directory (os error 2) -"# - ); - Ok(()) - } -} diff --git a/crates/common/tedge_utils/src/signals.rs b/crates/common/tedge_utils/src/signals.rs index 0db93348b44..f6dfa0cb7bb 100644 --- a/crates/common/tedge_utils/src/signals.rs +++ b/crates/common/tedge_utils/src/signals.rs @@ -1,3 +1,4 @@ +use nix::unistd::Pid; use std::io; #[cfg(not(windows))] @@ -16,3 +17,20 @@ pub async fn interrupt() -> io::Result<()> { pub async fn interrupt() -> io::Result<()> { tokio::signal::ctrl_c().await } + +pub enum Signal { + SIGTERM, + SIGKILL, +} + +pub fn terminate_process(pid: u32, signal_type: Signal) { + let pid: Pid = nix::unistd::Pid::from_raw(pid as nix::libc::pid_t); + match signal_type { + Signal::SIGTERM => { + let _ = nix::sys::signal::kill(pid, nix::sys::signal::SIGTERM); + } + Signal::SIGKILL => { + let _ = nix::sys::signal::kill(pid, nix::sys::signal::SIGKILL); + } + } +} diff --git a/crates/core/plugin_sm/Cargo.toml b/crates/core/plugin_sm/Cargo.toml index 16ce982e923..b9a9be841a5 100644 --- a/crates/core/plugin_sm/Cargo.toml +++ b/crates/core/plugin_sm/Cargo.toml @@ -14,7 +14,6 @@ async-trait = { workspace = true } camino = { workspace = true } csv = { workspace = true } download = { workspace = true } -logged_command = { workspace = true } regex = { workspace = true } reqwest = { workspace = true } serde = { workspace = true } diff --git a/crates/core/plugin_sm/src/plugin.rs b/crates/core/plugin_sm/src/plugin.rs index b30344a1852..a474ffa2691 100644 --- a/crates/core/plugin_sm/src/plugin.rs +++ b/crates/core/plugin_sm/src/plugin.rs @@ -1,8 +1,6 @@ use async_trait::async_trait; use csv::ReaderBuilder; use download::Downloader; -use logged_command::CommandLog; -use logged_command::LoggedCommand; use regex::Regex; use reqwest::Identity; use serde::Deserialize; @@ -10,7 +8,14 @@ use std::error::Error; use std::path::Path; use std::path::PathBuf; use std::process::Output; -use tedge_api::*; +use tedge_api::CommandLog; +use tedge_api::DownloadInfo; +use tedge_api::LoggedCommand; +use tedge_api::SoftwareError; +use tedge_api::SoftwareModule; +use tedge_api::SoftwareModuleUpdate; +use tedge_api::SoftwareType; +use tedge_api::DEFAULT; use tedge_config::SudoCommandBuilder; use tokio::io::AsyncWriteExt; use tracing::error; diff --git a/crates/core/plugin_sm/src/plugin_manager.rs b/crates/core/plugin_sm/src/plugin_manager.rs index a9094acaf68..326df65818c 100644 --- a/crates/core/plugin_sm/src/plugin_manager.rs +++ b/crates/core/plugin_sm/src/plugin_manager.rs @@ -1,7 +1,6 @@ use crate::plugin::ExternalPluginCommand; use crate::plugin::Plugin; use crate::plugin::LIST; -use logged_command::CommandLog; use std::collections::BTreeMap; use std::fs; use std::io::ErrorKind; @@ -12,6 +11,7 @@ use std::process::Stdio; use tedge_api::commands::CommandStatus; use tedge_api::commands::SoftwareListCommand; use tedge_api::commands::SoftwareUpdateCommand; +use tedge_api::CommandLog; use tedge_api::SoftwareError; use tedge_api::SoftwareType; use tedge_api::DEFAULT; diff --git a/crates/core/tedge_agent/Cargo.toml b/crates/core/tedge_agent/Cargo.toml index 23122f72d56..2a8ba314a98 100644 --- a/crates/core/tedge_agent/Cargo.toml +++ b/crates/core/tedge_agent/Cargo.toml @@ -22,7 +22,6 @@ futures = { workspace = true } hyper = { workspace = true, features = ["full"] } lazy_static = { workspace = true } log = { workspace = true } -logged_command = { workspace = true } path-clean = { workspace = true } plugin_sm = { workspace = true } reqwest = { workspace = true } diff --git a/crates/core/tedge_agent/src/software_manager/actor.rs b/crates/core/tedge_agent/src/software_manager/actor.rs index d390eb52749..a729e84a8c9 100644 --- a/crates/core/tedge_agent/src/software_manager/actor.rs +++ b/crates/core/tedge_agent/src/software_manager/actor.rs @@ -6,7 +6,6 @@ use crate::state_repository::error::StateError; use crate::state_repository::state::AgentStateRepository; use anyhow::anyhow; use async_trait::async_trait; -use logged_command::CommandLog; use plugin_sm::plugin_manager::ExternalPlugins; use plugin_sm::plugin_manager::Plugins; use serde::Deserialize; @@ -30,6 +29,7 @@ use tedge_api::mqtt_topics::OperationType; use tedge_api::workflow::GenericCommandData; use tedge_api::workflow::GenericCommandMetadata; use tedge_api::workflow::GenericCommandState; +use tedge_api::CommandLog; use tedge_api::Jsonify; use tedge_api::SoftwareType; use tedge_config::TEdgeConfigError; diff --git a/crates/core/tedge_agent/src/tedge_operation_converter/actor.rs b/crates/core/tedge_agent/src/tedge_operation_converter/actor.rs index fd79a6e39c4..2d9bc576de4 100644 --- a/crates/core/tedge_agent/src/tedge_operation_converter/actor.rs +++ b/crates/core/tedge_agent/src/tedge_operation_converter/actor.rs @@ -4,7 +4,6 @@ use async_trait::async_trait; use camino::Utf8PathBuf; use log::error; use log::info; -use logged_command::CommandLog; use std::process::Output; use std::time::Duration; use tedge_actors::fan_in_message_type; @@ -29,8 +28,10 @@ use tedge_api::workflow::GenericCommandMetadata; use tedge_api::workflow::GenericCommandState; use tedge_api::workflow::GenericStateUpdate; use tedge_api::workflow::OperationAction; +use tedge_api::workflow::OperationName; use tedge_api::workflow::WorkflowExecutionError; use tedge_api::workflow::WorkflowSupervisor; +use tedge_api::CommandLog; use tedge_mqtt_ext::MqttMessage; use tedge_mqtt_ext::QoS; use tedge_script_ext::Execute; diff --git a/crates/core/tedge_api/Cargo.toml b/crates/core/tedge_api/Cargo.toml index d15fdc14973..0848a5ade2e 100644 --- a/crates/core/tedge_api/Cargo.toml +++ b/crates/core/tedge_api/Cargo.toml @@ -29,6 +29,7 @@ time = { workspace = true, features = [ "serde", "serde-well-known", ] } +tokio = { workspace = true, features = ["fs", "process"] } [dev-dependencies] anyhow = { workspace = true } @@ -36,6 +37,7 @@ assert_matches = { workspace = true } clock = { workspace = true } maplit = { workspace = true } mockall = { workspace = true } +tedge_test_utils = { workspace = true } tempfile = { workspace = true } test-case = { workspace = true } time = { workspace = true, features = ["macros"] } diff --git a/crates/core/tedge_api/src/lib.rs b/crates/core/tedge_api/src/lib.rs index 2563b6082ab..7f97bc97a3c 100644 --- a/crates/core/tedge_api/src/lib.rs +++ b/crates/core/tedge_api/src/lib.rs @@ -23,6 +23,8 @@ pub use error::*; pub use health::*; pub use software::*; pub use store::pending_entity_store; +pub use workflow::log::command_log::CommandLog; +pub use workflow::log::logged_command::LoggedCommand; #[cfg(test)] mod tests { diff --git a/crates/core/tedge_api/src/workflow/log/command_log.rs b/crates/core/tedge_api/src/workflow/log/command_log.rs new file mode 100644 index 00000000000..4a92cc631e9 --- /dev/null +++ b/crates/core/tedge_api/src/workflow/log/command_log.rs @@ -0,0 +1,305 @@ +use super::logged_command::LoggedCommand; +use crate::workflow::CommandId; +use crate::workflow::GenericCommandState; +use crate::workflow::OperationAction; +use crate::workflow::OperationName; +use camino::Utf8Path; +use camino::Utf8PathBuf; +use log::error; +use std::process::Output; +use time::format_description; +use time::OffsetDateTime; +use tokio::fs::File; +use tokio::io::AsyncWriteExt; +use tokio::io::BufWriter; + +/// Log all command steps +pub struct CommandLog { + /// Path to the command log file + pub path: Utf8PathBuf, + + /// operation name + pub operation: OperationName, + + /// the chain of operations leading to this command + pub invoking_operations: Vec, + + /// command id + pub cmd_id: CommandId, + + /// The log file of the root command invoking this command + /// + /// None, if not open yet. + pub file: Option, +} + +impl CommandLog { + pub fn new( + log_dir: Utf8PathBuf, + operation: OperationName, + cmd_id: CommandId, + invoking_operations: Vec, + root_operation: Option, + root_cmd_id: Option, + ) -> Self { + let root_operation = root_operation.unwrap_or(operation.clone()); + let root_cmd_id = root_cmd_id.unwrap_or(cmd_id.clone()); + + let path = log_dir.join(format!("workflow-{}-{}.log", root_operation, root_cmd_id)); + CommandLog { + path, + operation, + invoking_operations, + cmd_id, + file: None, + } + } + + pub fn from_log_path( + path: impl AsRef, + operation: OperationName, + cmd_id: CommandId, + ) -> Self { + Self { + path: path.as_ref().into(), + operation, + cmd_id, + invoking_operations: vec![], + file: None, + } + } + + pub async fn open(&mut self) -> Result<&mut File, std::io::Error> { + if self.file.is_none() { + self.file = Some( + File::options() + .append(true) + .create(true) + .open(self.path.clone()) + .await?, + ); + } + Ok(self.file.as_mut().unwrap()) + } + + pub async fn log_header(&mut self, topic: &str) { + let now = OffsetDateTime::now_utc() + .format(&format_description::well_known::Rfc3339) + .unwrap(); + let cmd_id = &self.cmd_id; + let operation = &self.operation; + let header_message = format!( + r#" +================================================================== +Triggered {operation} workflow +================================================================== + +topic: {topic} +operation: {operation} +cmd_id: {cmd_id} +time: {now} + +================================================================== +"# + ); + if let Err(err) = self.write(&header_message).await { + error!("Fail to log to {}: {err}", self.path) + } + } + + pub async fn log_state_action( + &mut self, + state: &GenericCommandState, + action: &OperationAction, + ) { + if state.is_init() && self.invoking_operations.is_empty() { + self.log_header(state.topic.name.as_str()).await; + } + let step = &state.status; + let state = &state.payload.to_string(); + let message = format!( + r#" +State: {state} + +Action: {action} +"# + ); + self.log_step(step, &message).await + } + + pub async fn log_step(&mut self, step: &str, action: &str) { + let now = OffsetDateTime::now_utc() + .format(&format_description::well_known::Rfc3339) + .unwrap(); + let operation = &self.operation; + let parent_operation = if self.invoking_operations.is_empty() { + operation.to_string() + } else { + format!("{} > {}", self.invoking_operations.join(" > "), operation) + }; + + let message = format!( + r#" +----------------------[ {parent_operation} @ {step} | time={now} ]---------------------- +{action} +"# + ); + if let Err(err) = self.write(&message).await { + error!("Fail to log to {}: {err}", self.path) + } + } + + pub async fn log_script_output(&mut self, result: &Result) { + self.log_command_and_output("", result).await + } + + pub async fn log_command_and_output( + &mut self, + command_line: &str, + result: &Result, + ) { + if let Err(err) = self.write_script_output(command_line, result).await { + error!("Fail to log to {}: {err}", self.path) + } + } + + pub async fn write_script_output( + &mut self, + command_line: &str, + result: &Result, + ) -> Result<(), std::io::Error> { + let file = self.open().await?; + let mut writer = BufWriter::new(file); + LoggedCommand::log_outcome(command_line, result, &mut writer).await?; + Ok(()) + } + + pub async fn write(&mut self, message: impl AsRef<[u8]>) -> Result<(), std::io::Error> { + let file = self.open().await?; + file.write_all(message.as_ref()).await?; + file.flush().await?; + file.sync_all().await?; + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use tedge_test_utils::fs::TempTedgeDir; + + #[tokio::test] + async fn on_execute_are_logged_command_line_exit_status_stdout_and_stderr( + ) -> Result<(), anyhow::Error> { + // Prepare a log file + let tmp_dir = TempTedgeDir::new(); + let workflow_log = tmp_dir.file("workflow.log"); + let log_file_path = workflow_log.path(); + let mut command_log = CommandLog::from_log_path( + workflow_log.utf8_path(), + "software_update".into(), + "123".into(), + ); + + // Prepare a command + let mut command = LoggedCommand::new("echo").unwrap(); + command.arg("Hello").arg("World!"); + + // Execute the command with logging + let _ = command.execute(Some(&mut command_log)).await; + + let log_content = String::from_utf8(std::fs::read(log_file_path)?)?; + assert_eq!( + log_content, + r#"----- $ echo "Hello" "World!" +Exit status: 0 (OK) + +stderr (EMPTY) + +stdout < Result<(), anyhow::Error> { + // Prepare a log file + let tmp_dir = TempTedgeDir::new(); + let workflow_log = tmp_dir.file("workflow.log"); + let log_file_path = workflow_log.path(); + let mut command_log = CommandLog::from_log_path( + workflow_log.utf8_path(), + "software_update".into(), + "123".into(), + ); + + // Prepare a command that triggers some content on stderr + let mut command = LoggedCommand::new("ls").unwrap(); + command.arg("dummy-file"); + + // Execute the command with logging + let _ = command.execute(Some(&mut command_log)).await; + + // On expect the errors to be logged + let log_content = String::from_utf8(std::fs::read(log_file_path)?)?; + #[cfg(target_os = "linux")] + assert_eq!( + log_content, + r#"----- $ ls "dummy-file" +Exit status: 2 (ERROR) + +stderr < Result<(), anyhow::Error> { + // Prepare a log file + let tmp_dir = TempTedgeDir::new(); + let workflow_log = tmp_dir.file("workflow.log"); + let log_file_path = workflow_log.path(); + let mut command_log = CommandLog::from_log_path( + workflow_log.utf8_path(), + "software_update".into(), + "123".into(), + ); + + // Prepare a command that cannot be executed + let command = LoggedCommand::new("dummy-command").unwrap(); + + // Execute the command with logging + let _ = command.execute(Some(&mut command_log)).await; + + // The fact that the command cannot be executed must be logged + let log_content = String::from_utf8(std::fs::read(log_file_path)?)?; + assert_eq!( + log_content, + r#"----- $ dummy-command +error: No such file or directory (os error 2) +"# + ); + Ok(()) + } +} diff --git a/crates/core/tedge_api/src/workflow/log/logged_command.rs b/crates/core/tedge_api/src/workflow/log/logged_command.rs new file mode 100644 index 00000000000..67a4290d9b7 --- /dev/null +++ b/crates/core/tedge_api/src/workflow/log/logged_command.rs @@ -0,0 +1,287 @@ +use crate::CommandLog; +use std::ffi::OsStr; +use std::os::unix::process::ExitStatusExt; +use std::process::Output; +use std::process::Stdio; +use std::time::Duration; +use tedge_utils::signals::terminate_process; +use tedge_utils::signals::Signal; +use tokio::io::AsyncWrite; +use tokio::io::AsyncWriteExt; +use tokio::process::Child; +use tokio::process::Command; + +#[derive(Debug)] +pub enum CmdStatus { + Successful, + KilledWithSigterm, + KilledWithSigKill, +} +#[derive(Debug)] +pub struct LoggingChild { + command_line: String, + pub inner_child: Child, +} + +impl LoggingChild { + pub async fn wait_for_output_with_timeout( + self, + command_log: &mut CommandLog, + graceful_timeout: Duration, + forceful_timeout: Duration, + ) -> Result { + let cid = self.inner_child.id(); + let cmd_line = self.command_line; + let mut status = CmdStatus::Successful; + tokio::select! { + outcome = self.inner_child.wait_with_output() => { + Self::update_and_log_outcome(cmd_line, outcome, command_log, graceful_timeout, &status).await + } + _ = Self::timeout_operation(&mut status, cid, graceful_timeout, forceful_timeout) => { + Err(std::io::Error::new(std::io::ErrorKind::Other,"failed to kill the process: {cmd_line}")) + } + } + } + + pub async fn wait_with_output( + self, + command_log: Option<&mut CommandLog>, + ) -> Result { + let outcome = self.inner_child.wait_with_output().await; + if let Some(command_log) = command_log { + command_log + .log_command_and_output(&self.command_line, &outcome) + .await; + } + outcome + } + + async fn update_and_log_outcome( + command_line: String, + outcome: Result, + command_log: &mut CommandLog, + timeout: Duration, + status: &CmdStatus, + ) -> Result { + let outcome = match status { + CmdStatus::Successful => outcome, + CmdStatus::KilledWithSigterm | CmdStatus::KilledWithSigKill => { + outcome.map(|outcome| update_stderr_message(outcome, timeout))? + } + }; + command_log + .log_command_and_output(&command_line, &outcome) + .await; + outcome + } + + async fn timeout_operation( + status: &mut CmdStatus, + child_id: Option, + graceful_timeout: Duration, + forceful_timeout: Duration, + ) -> Result<(), std::io::Error> { + *status = CmdStatus::Successful; + + if let Some(pid) = child_id { + tokio::time::sleep(graceful_timeout).await; + + // stop the child process by sending sigterm + *status = CmdStatus::KilledWithSigterm; + terminate_process(pid, Signal::SIGTERM); + + tokio::time::sleep(forceful_timeout).await; + + // stop the child process by sending sigkill + *status = CmdStatus::KilledWithSigKill; + terminate_process(pid, Signal::SIGKILL); + } + + // wait for the process to exit after signal + tokio::time::sleep(Duration::from_secs(120)).await; + + Ok(()) + } +} + +fn update_stderr_message(mut output: Output, timeout: Duration) -> Result { + output.stderr.append( + &mut format!( + "operation failed due to timeout: duration={}s", + timeout.as_secs() + ) + .as_bytes() + .to_vec(), + ); + Ok(output) +} + +/// A command which execution is logged. +/// +/// This struct wraps the main command with a nice representation of that command. +pub struct LoggedCommand { + command: Command, +} + +impl std::fmt::Display for LoggedCommand { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let command = self.command.as_std(); + + command.get_program().to_string_lossy().fmt(f)?; + for arg in command.get_args() { + // The arguments are displayed as debug, to be properly quoted and distinguished from each other. + write!(f, " {:?}", arg.to_string_lossy())?; + } + Ok(()) + } +} + +impl LoggedCommand { + /// Creates a new `LoggedCommand`. + /// + /// In contrast to [`std::process::Command`], `program` can contain space-separated arguments, + /// which will be properly parsed, split, and passed into `.args()` call for the underlying + /// command. + pub fn new(program: impl AsRef) -> Result { + let mut args = shell_words::split(&program.as_ref().to_string_lossy()) + .map_err(|err| std::io::Error::new(std::io::ErrorKind::InvalidData, err))?; + + let mut command = match args.len() { + 0 => { + return Err(std::io::Error::new( + std::io::ErrorKind::InvalidData, + "command line is empty.", + )) + } + 1 => Command::new(&args[0]), + _ => { + let mut command = Command::new(args.remove(0)); + command.args(&args); + command + } + }; + + command + // TODO: should use tmp from config + .current_dir("/tmp") + .stdin(Stdio::piped()) + .stdout(Stdio::piped()) + .stderr(Stdio::piped()); + + Ok(LoggedCommand { command }) + } + + pub fn arg(&mut self, arg: impl AsRef) -> &mut LoggedCommand { + self.command.arg(arg); + self + } + + /// Execute the command and log its exit status, stdout and stderr + /// + /// If the command has been executed the outcome is returned (successful or not). + /// If the command fails to execute (say not found or not executable) an `std::io::Error` is returned. + /// + /// If the function fails to log the execution of the command, + /// this is logged with `log::error!` without changing the return value. + pub async fn execute( + mut self, + command_log: Option<&mut CommandLog>, + ) -> Result { + let outcome = self.command.output().await; + if let Some(command_log) = command_log { + command_log + .log_command_and_output(&self.to_string(), &outcome) + .await; + } + outcome + } + + pub fn spawn(&mut self) -> Result { + let child = self.command.spawn()?; + Ok(LoggingChild { + command_line: self.to_string(), + inner_child: child, + }) + } + + pub async fn log_outcome( + command_line: &str, + result: &Result, + logger: &mut (impl AsyncWrite + Unpin), + ) -> Result<(), std::io::Error> { + if !command_line.is_empty() { + logger + .write_all(format!("----- $ {}\n", command_line).as_bytes()) + .await?; + } + + match result.as_ref() { + Ok(output) => { + if let Some(code) = &output.status.code() { + let exit_code_msg = if *code == 0 { "OK" } else { "ERROR" }; + logger + .write_all(format!("Exit status: {code} ({exit_code_msg})\n\n").as_bytes()) + .await? + }; + if let Some(signal) = &output.status.signal() { + logger + .write_all(format!("Killed by signal: {signal}\n\n").as_bytes()) + .await? + } + // Log stderr then stdout, so the flow reads chronologically + // as the stderr is used for log messages and the stdout is used for results + if !output.stderr.is_empty() { + logger.write_all(b"stderr < { + logger + .write_all(format!("error: {}\n", &err).as_bytes()) + .await?; + } + } + + logger.flush().await?; + Ok(()) + } +} + +impl From for LoggedCommand { + fn from(mut command: Command) -> Self { + command + // TODO: should use tmp from config + .current_dir("/tmp") + .stdin(Stdio::piped()) + .stdout(Stdio::piped()) + .stderr(Stdio::piped()); + + Self { command } + } +} + +impl From for LoggedCommand { + fn from(mut command: std::process::Command) -> Self { + command + // TODO: should use tmp from config + .current_dir("/tmp") + .stdin(Stdio::piped()) + .stdout(Stdio::piped()) + .stderr(Stdio::piped()); + + Self { + command: tokio::process::Command::from(command), + } + } +} diff --git a/crates/core/tedge_api/src/workflow/log/mod.rs b/crates/core/tedge_api/src/workflow/log/mod.rs new file mode 100644 index 00000000000..0ba14c729a0 --- /dev/null +++ b/crates/core/tedge_api/src/workflow/log/mod.rs @@ -0,0 +1,2 @@ +pub(crate) mod command_log; +pub(crate) mod logged_command; diff --git a/crates/core/tedge_api/src/workflow/mod.rs b/crates/core/tedge_api/src/workflow/mod.rs index c81e64c4e96..e564f073ba6 100644 --- a/crates/core/tedge_api/src/workflow/mod.rs +++ b/crates/core/tedge_api/src/workflow/mod.rs @@ -1,4 +1,5 @@ pub mod error; +pub(crate) mod log; mod on_disk; pub mod script; pub mod state; diff --git a/crates/core/tedge_api/src/workflow/supervisor.rs b/crates/core/tedge_api/src/workflow/supervisor.rs index 83a89e23ea0..729b276c3af 100644 --- a/crates/core/tedge_api/src/workflow/supervisor.rs +++ b/crates/core/tedge_api/src/workflow/supervisor.rs @@ -1,5 +1,5 @@ use crate::workflow::*; -use log::info; +use ::log::info; use on_disk::OnDiskCommandBoard; use serde::Serialize; diff --git a/crates/extensions/c8y_mapper_ext/Cargo.toml b/crates/extensions/c8y_mapper_ext/Cargo.toml index 5f4cb878438..72683a6a21b 100644 --- a/crates/extensions/c8y_mapper_ext/Cargo.toml +++ b/crates/extensions/c8y_mapper_ext/Cargo.toml @@ -18,7 +18,6 @@ c8y_http_proxy = { workspace = true } camino = { workspace = true } clock = { workspace = true } json-writer = { workspace = true } -logged_command = { workspace = true } mime = { workspace = true } plugin_sm = { workspace = true } serde = { workspace = true } diff --git a/crates/extensions/c8y_mapper_ext/src/converter.rs b/crates/extensions/c8y_mapper_ext/src/converter.rs index 3464062de4d..03b5b67a45a 100644 --- a/crates/extensions/c8y_mapper_ext/src/converter.rs +++ b/crates/extensions/c8y_mapper_ext/src/converter.rs @@ -49,8 +49,6 @@ use c8y_auth_proxy::url::ProxyUrlGenerator; use c8y_http_proxy::handle::C8YHttpProxy; use c8y_http_proxy::messages::CreateEvent; use camino::Utf8Path; -use logged_command::CommandLog; -use logged_command::LoggedCommand; use plugin_sm::operation_logs::OperationLogs; use plugin_sm::operation_logs::OperationLogsError; use serde_json::json; @@ -85,9 +83,11 @@ use tedge_api::mqtt_topics::MqttSchema; use tedge_api::mqtt_topics::OperationType; use tedge_api::pending_entity_store::PendingEntityData; use tedge_api::workflow::GenericCommandState; +use tedge_api::CommandLog; use tedge_api::DownloadInfo; use tedge_api::EntityStore; use tedge_api::Jsonify; +use tedge_api::LoggedCommand; use tedge_config::AutoLogUpload; use tedge_config::SoftwareManagementApiFlag; use tedge_config::TEdgeConfigError;