diff --git a/crates/core/c8y_api/src/json_c8y.rs b/crates/core/c8y_api/src/json_c8y.rs index 48bec29f3c0..0262d5ce076 100644 --- a/crates/core/c8y_api/src/json_c8y.rs +++ b/crates/core/c8y_api/src/json_c8y.rs @@ -6,10 +6,10 @@ use std::collections::HashMap; use std::fmt; use tedge_api::alarm::ThinEdgeAlarm; use tedge_api::alarm::ThinEdgeAlarmData; +use tedge_api::commands::SoftwareListCommand; use tedge_api::entity_store::EntityMetadata; use tedge_api::entity_store::EntityType; use tedge_api::event::ThinEdgeEvent; -use tedge_api::messages::SoftwareListCommand; use tedge_api::EntityStore; use tedge_api::Jsonify; use tedge_api::SoftwareModule; @@ -366,11 +366,11 @@ mod tests { use std::collections::HashSet; use tedge_api::alarm::ThinEdgeAlarm; use tedge_api::alarm::ThinEdgeAlarmData; + use tedge_api::commands::SoftwareListCommandPayload; use tedge_api::entity_store::EntityExternalId; use tedge_api::entity_store::EntityRegistrationMessage; use tedge_api::entity_store::InvalidExternalIdError; use tedge_api::event::ThinEdgeEventData; - use tedge_api::messages::SoftwareListCommandPayload; use tedge_api::mqtt_topics::EntityTopicId; use tedge_api::mqtt_topics::MqttSchema; use test_case::test_case; diff --git a/crates/core/c8y_api/src/smartrest/smartrest_serializer.rs b/crates/core/c8y_api/src/smartrest/smartrest_serializer.rs index 3f184dbd0ce..c02483b79c8 100644 --- a/crates/core/c8y_api/src/smartrest/smartrest_serializer.rs +++ b/crates/core/c8y_api/src/smartrest/smartrest_serializer.rs @@ -313,7 +313,7 @@ pub trait OperationStatusMessage { #[cfg(test)] mod tests { use super::*; - use tedge_api::messages::SoftwareListCommandPayload; + use tedge_api::commands::SoftwareListCommandPayload; use tedge_api::mqtt_topics::EntityTopicId; use tedge_api::Jsonify; diff --git a/crates/core/plugin_sm/src/plugin_manager.rs b/crates/core/plugin_sm/src/plugin_manager.rs index 77aad9411d0..84be626a001 100644 --- a/crates/core/plugin_sm/src/plugin_manager.rs +++ b/crates/core/plugin_sm/src/plugin_manager.rs @@ -9,9 +9,9 @@ use std::io::{self}; use std::path::Path; use std::path::PathBuf; use std::process::Stdio; -use tedge_api::messages::CommandStatus; -use tedge_api::messages::SoftwareListCommand; -use tedge_api::messages::SoftwareUpdateCommand; +use tedge_api::commands::CommandStatus; +use tedge_api::commands::SoftwareListCommand; +use tedge_api::commands::SoftwareUpdateCommand; use tedge_api::SoftwareError; use tedge_api::SoftwareType; use tedge_api::DEFAULT; diff --git a/crates/core/tedge_agent/src/operation_file_cache/mod.rs b/crates/core/tedge_agent/src/operation_file_cache/mod.rs index 6dcc70bb286..d461bf0f5bc 100644 --- a/crates/core/tedge_agent/src/operation_file_cache/mod.rs +++ b/crates/core/tedge_agent/src/operation_file_cache/mod.rs @@ -32,7 +32,7 @@ use tedge_actors::RuntimeRequestSink; use tedge_actors::Sender; use tedge_actors::Service; use tedge_actors::SimpleMessageBoxBuilder; -use tedge_api::messages::ConfigUpdateCmdPayload; +use tedge_api::commands::ConfigUpdateCmdPayload; use tedge_api::mqtt_topics::Channel; use tedge_api::mqtt_topics::ChannelFilter; use tedge_api::mqtt_topics::EntityFilter; diff --git a/crates/core/tedge_agent/src/restart_manager/actor.rs b/crates/core/tedge_agent/src/restart_manager/actor.rs index 0e26971a4c9..a2ad979b261 100644 --- a/crates/core/tedge_agent/src/restart_manager/actor.rs +++ b/crates/core/tedge_agent/src/restart_manager/actor.rs @@ -13,7 +13,7 @@ use tedge_actors::RuntimeError; use tedge_actors::RuntimeRequest; use tedge_actors::Sender; use tedge_actors::SimpleMessageBox; -use tedge_api::messages::CommandStatus; +use tedge_api::commands::CommandStatus; use tedge_api::RestartCommand; use tedge_config::system_services::SystemConfig; use tedge_config::system_services::SystemSpecificCommands; diff --git a/crates/core/tedge_agent/src/restart_manager/tests.rs b/crates/core/tedge_agent/src/restart_manager/tests.rs index 242fcfe6d0d..0e242ab4c9f 100644 --- a/crates/core/tedge_agent/src/restart_manager/tests.rs +++ b/crates/core/tedge_agent/src/restart_manager/tests.rs @@ -13,8 +13,8 @@ use tedge_actors::NoConfig; use tedge_actors::Sender; use tedge_actors::SimpleMessageBox; use tedge_actors::SimpleMessageBoxBuilder; -use tedge_api::messages::CommandStatus; -use tedge_api::messages::RestartCommandPayload; +use tedge_api::commands::CommandStatus; +use tedge_api::commands::RestartCommandPayload; use tedge_api::mqtt_topics::EntityTopicId; use tedge_api::RestartCommand; use tedge_config::SudoCommandBuilder; diff --git a/crates/core/tedge_agent/src/software_manager/actor.rs b/crates/core/tedge_agent/src/software_manager/actor.rs index debc0193373..925b3436c31 100644 --- a/crates/core/tedge_agent/src/software_manager/actor.rs +++ b/crates/core/tedge_agent/src/software_manager/actor.rs @@ -23,10 +23,10 @@ use tedge_actors::RuntimeError; use tedge_actors::RuntimeRequest; use tedge_actors::Sender; use tedge_actors::SimpleMessageBox; -use tedge_api::messages::CommandStatus; -use tedge_api::messages::SoftwareCommandMetadata; -use tedge_api::messages::SoftwareListCommand; -use tedge_api::messages::SoftwareUpdateCommand; +use tedge_api::commands::CommandStatus; +use tedge_api::commands::SoftwareCommandMetadata; +use tedge_api::commands::SoftwareListCommand; +use tedge_api::commands::SoftwareUpdateCommand; use tedge_api::SoftwareType; use tedge_config::TEdgeConfigError; use tracing::error; diff --git a/crates/core/tedge_agent/src/software_manager/tests.rs b/crates/core/tedge_agent/src/software_manager/tests.rs index e8113cfabd1..529edc7201e 100644 --- a/crates/core/tedge_agent/src/software_manager/tests.rs +++ b/crates/core/tedge_agent/src/software_manager/tests.rs @@ -14,14 +14,14 @@ use tedge_actors::NoConfig; use tedge_actors::Sender; use tedge_actors::SimpleMessageBox; use tedge_actors::SimpleMessageBoxBuilder; -use tedge_api::messages::CommandStatus; -use tedge_api::messages::SoftwareCommandMetadata; -use tedge_api::messages::SoftwareListCommand; -use tedge_api::messages::SoftwareModuleAction; -use tedge_api::messages::SoftwareModuleItem; -use tedge_api::messages::SoftwareRequestResponseSoftwareList; -use tedge_api::messages::SoftwareUpdateCommand; -use tedge_api::messages::SoftwareUpdateCommandPayload; +use tedge_api::commands::CommandStatus; +use tedge_api::commands::SoftwareCommandMetadata; +use tedge_api::commands::SoftwareListCommand; +use tedge_api::commands::SoftwareModuleAction; +use tedge_api::commands::SoftwareModuleItem; +use tedge_api::commands::SoftwareRequestResponseSoftwareList; +use tedge_api::commands::SoftwareUpdateCommand; +use tedge_api::commands::SoftwareUpdateCommandPayload; use tedge_api::mqtt_topics::EntityTopicId; use tedge_config::SudoCommandBuilder; use tedge_config::TEdgeConfigLocation; diff --git a/crates/core/tedge_agent/src/tedge_operation_converter/actor.rs b/crates/core/tedge_agent/src/tedge_operation_converter/actor.rs index 3a625f76211..ac0da5cce55 100644 --- a/crates/core/tedge_agent/src/tedge_operation_converter/actor.rs +++ b/crates/core/tedge_agent/src/tedge_operation_converter/actor.rs @@ -15,10 +15,10 @@ use tedge_actors::MessageReceiver; use tedge_actors::RuntimeError; use tedge_actors::Sender; use tedge_actors::UnboundedLoggingReceiver; -use tedge_api::messages::RestartCommand; -use tedge_api::messages::SoftwareCommandMetadata; -use tedge_api::messages::SoftwareListCommand; -use tedge_api::messages::SoftwareUpdateCommand; +use tedge_api::commands::RestartCommand; +use tedge_api::commands::SoftwareCommandMetadata; +use tedge_api::commands::SoftwareListCommand; +use tedge_api::commands::SoftwareUpdateCommand; use tedge_api::mqtt_topics::Channel; use tedge_api::mqtt_topics::EntityTopicId; use tedge_api::mqtt_topics::MqttSchema; diff --git a/crates/core/tedge_agent/src/tedge_operation_converter/tests.rs b/crates/core/tedge_agent/src/tedge_operation_converter/tests.rs index ad1942f8488..7fcd33fd195 100644 --- a/crates/core/tedge_agent/src/tedge_operation_converter/tests.rs +++ b/crates/core/tedge_agent/src/tedge_operation_converter/tests.rs @@ -16,14 +16,14 @@ use tedge_actors::RequestEnvelope; use tedge_actors::Sender; use tedge_actors::SimpleMessageBox; use tedge_actors::SimpleMessageBoxBuilder; -use tedge_api::messages::CommandStatus; -use tedge_api::messages::RestartCommandPayload; -use tedge_api::messages::SoftwareCommandMetadata; -use tedge_api::messages::SoftwareListCommand; -use tedge_api::messages::SoftwareModuleAction; -use tedge_api::messages::SoftwareModuleItem; -use tedge_api::messages::SoftwareRequestResponseSoftwareList; -use tedge_api::messages::SoftwareUpdateCommandPayload; +use tedge_api::commands::CommandStatus; +use tedge_api::commands::RestartCommandPayload; +use tedge_api::commands::SoftwareCommandMetadata; +use tedge_api::commands::SoftwareListCommand; +use tedge_api::commands::SoftwareModuleAction; +use tedge_api::commands::SoftwareModuleItem; +use tedge_api::commands::SoftwareRequestResponseSoftwareList; +use tedge_api::commands::SoftwareUpdateCommandPayload; use tedge_api::mqtt_topics::EntityTopicId; use tedge_api::mqtt_topics::MqttSchema; use tedge_api::workflow::WorkflowSupervisor; diff --git a/crates/core/tedge_api/src/messages.rs b/crates/core/tedge_api/src/commands.rs similarity index 100% rename from crates/core/tedge_api/src/messages.rs rename to crates/core/tedge_api/src/commands.rs diff --git a/crates/core/tedge_api/src/entity_store.rs b/crates/core/tedge_api/src/entity_store.rs index 6be53ae2bb5..614d671f111 100644 --- a/crates/core/tedge_api/src/entity_store.rs +++ b/crates/core/tedge_api/src/entity_store.rs @@ -8,14 +8,14 @@ // TODO: move entity business logic to its own module use crate::entity_store; -use crate::message_log::MessageLogReader; -use crate::message_log::MessageLogWriter; use crate::mqtt_topics::Channel; use crate::mqtt_topics::EntityTopicId; use crate::mqtt_topics::MqttSchema; use crate::mqtt_topics::TopicIdError; -use crate::pending_entity_store::PendingEntityData; -use crate::pending_entity_store::PendingEntityStore; +use crate::store::message_log::MessageLogReader; +use crate::store::message_log::MessageLogWriter; +use crate::store::pending_entity_store::PendingEntityData; +use crate::store::pending_entity_store::PendingEntityStore; use log::debug; use log::error; use log::info; diff --git a/crates/core/tedge_api/src/lib.rs b/crates/core/tedge_api/src/lib.rs index febb79fad33..1fc1b81cec9 100644 --- a/crates/core/tedge_api/src/lib.rs +++ b/crates/core/tedge_api/src/lib.rs @@ -1,35 +1,28 @@ pub mod alarm; -pub mod builder; -pub mod data; +pub mod commands; pub mod entity_store; pub mod error; pub mod event; -pub mod group; pub mod health; pub mod measurement; -pub mod message_log; -pub mod messages; pub mod mqtt_topics; -pub mod parser; pub mod path; -pub mod pending_entity_store; -mod ring_buffer; -pub mod serialize; mod software; -pub mod utils; +mod store; pub mod workflow; +pub use commands::CommandStatus; +pub use commands::Jsonify; +pub use commands::OperationStatus; +pub use commands::RestartCommand; +pub use commands::SoftwareListCommand; +pub use commands::SoftwareUpdateCommand; pub use download::*; pub use entity_store::EntityStore; pub use error::*; pub use health::*; -pub use messages::CommandStatus; -pub use messages::Jsonify; -pub use messages::OperationStatus; -pub use messages::RestartCommand; -pub use messages::SoftwareListCommand; -pub use messages::SoftwareUpdateCommand; pub use software::*; +pub use store::pending_entity_store; #[cfg(test)] mod tests { diff --git a/crates/core/tedge_api/src/builder.rs b/crates/core/tedge_api/src/measurement/builder.rs similarity index 98% rename from crates/core/tedge_api/src/builder.rs rename to crates/core/tedge_api/src/measurement/builder.rs index 2a742978d73..f424ecab7a4 100644 --- a/crates/core/tedge_api/src/builder.rs +++ b/crates/core/tedge_api/src/measurement/builder.rs @@ -1,6 +1,6 @@ use time::OffsetDateTime; -use crate::data::*; +use crate::measurement::data::*; use crate::measurement::*; /// A `MeasurementVisitor` that builds up `ThinEdgeJson`. diff --git a/crates/core/tedge_api/src/data.rs b/crates/core/tedge_api/src/measurement/data.rs similarity index 100% rename from crates/core/tedge_api/src/data.rs rename to crates/core/tedge_api/src/measurement/data.rs diff --git a/crates/core/tedge_api/src/group.rs b/crates/core/tedge_api/src/measurement/group.rs similarity index 100% rename from crates/core/tedge_api/src/group.rs rename to crates/core/tedge_api/src/measurement/group.rs diff --git a/crates/core/tedge_api/src/measurement.rs b/crates/core/tedge_api/src/measurement/mod.rs similarity index 95% rename from crates/core/tedge_api/src/measurement.rs rename to crates/core/tedge_api/src/measurement/mod.rs index 94d5c87bd04..34317929618 100644 --- a/crates/core/tedge_api/src/measurement.rs +++ b/crates/core/tedge_api/src/measurement/mod.rs @@ -1,5 +1,18 @@ use time::OffsetDateTime; +#[cfg(test)] +pub mod builder; +#[cfg(test)] +mod data; +mod group; +mod parser; +mod serialize; +pub(crate) mod utils; + +pub use group::*; +pub use parser::*; +pub use serialize::*; + /// The `MeasurementVisitor` trait represents the capability to visit a series of measurements, possibly grouped. /// /// Here is an implementation of the `MeasurementVisitor` trait that prints the measurements: diff --git a/crates/core/tedge_api/src/parser.rs b/crates/core/tedge_api/src/measurement/parser.rs similarity index 95% rename from crates/core/tedge_api/src/parser.rs rename to crates/core/tedge_api/src/measurement/parser.rs index cc6320083f1..cf140837567 100644 --- a/crates/core/tedge_api/src/parser.rs +++ b/crates/core/tedge_api/src/measurement/parser.rs @@ -276,7 +276,7 @@ fn invalid_empty_measurement(key: &str) -> String { fn map_error(error: serde_json::Error, input: &str) -> ThinEdgeJsonParserError { const MAX_INPUT_EXCERPT: usize = 80; let input_excerpt = - crate::utils::excerpt(input, error.line(), error.column(), MAX_INPUT_EXCERPT); + crate::measurement::utils::excerpt(input, error.line(), error.column(), MAX_INPUT_EXCERPT); ThinEdgeJsonParserError { error, input_excerpt, @@ -287,11 +287,11 @@ mod tests { use time::macros::datetime; use time::OffsetDateTime; - use crate::parser::parse_str; + use super::parse_str; #[test] fn it_deserializes_thin_edge_json() -> anyhow::Result<()> { - use crate::builder::ThinEdgeJsonBuilder; + use crate::measurement::builder::ThinEdgeJsonBuilder; let input = r#"{ "time" : "2021-04-30T17:03:14.123+02:00", "pressure": 123.4, @@ -333,7 +333,7 @@ mod tests { #[test] fn it_shows_input_excerpt_on_error() { - use crate::builder::ThinEdgeJsonBuilder; + use crate::measurement::builder::ThinEdgeJsonBuilder; let input = "{\n\"time\" : null\n}"; @@ -349,7 +349,7 @@ mod tests { #[test] fn it_accepts_unix_timestamps_in_place_of_iso_8601() { - use crate::builder::ThinEdgeJsonBuilder; + use crate::measurement::builder::ThinEdgeJsonBuilder; let input = "{\n\"time\" : 1701949168,\n\"test\": 1023}"; let expected_timestamp = OffsetDateTime::parse( @@ -367,7 +367,7 @@ mod tests { #[test] fn it_accepts_decimals_as_unix_timestamps() { - use crate::builder::ThinEdgeJsonBuilder; + use crate::measurement::builder::ThinEdgeJsonBuilder; let input = "{\n\"time\" : 1701949168.001,\n\"test\": 1023}"; let expected_timestamp = OffsetDateTime::parse( @@ -385,7 +385,7 @@ mod tests { #[test] fn it_produces_a_clear_error_message_when_unix_timestamp_is_out_of_range() { - use crate::builder::ThinEdgeJsonBuilder; + use crate::measurement::builder::ThinEdgeJsonBuilder; let input = "{\n\"time\" : -377705116801,\n\"test\": 1023}"; let mut builder = ThinEdgeJsonBuilder::default(); @@ -400,7 +400,7 @@ mod tests { #[test] fn parse_type_as_measurement() { - use crate::builder::ThinEdgeJsonBuilder; + use crate::measurement::builder::ThinEdgeJsonBuilder; let input = r#"{ "time" : "2021-04-30T17:03:14.123+02:00", diff --git a/crates/core/tedge_api/src/serialize.rs b/crates/core/tedge_api/src/measurement/serialize.rs similarity index 100% rename from crates/core/tedge_api/src/serialize.rs rename to crates/core/tedge_api/src/measurement/serialize.rs diff --git a/crates/core/tedge_api/src/utils.rs b/crates/core/tedge_api/src/measurement/utils.rs similarity index 100% rename from crates/core/tedge_api/src/utils.rs rename to crates/core/tedge_api/src/measurement/utils.rs diff --git a/crates/core/tedge_api/src/store/message_log.rs b/crates/core/tedge_api/src/store/message_log.rs new file mode 100644 index 00000000000..fdef6ff2aed --- /dev/null +++ b/crates/core/tedge_api/src/store/message_log.rs @@ -0,0 +1,164 @@ +//! The message log is a persistent append-only log of MQTT messages. +//! Each line is the JSON representation of that MQTT message. +//! The underlying file is a JSON lines file. +use mqtt_channel::MqttMessage; +use serde_json::json; +use std::fs::File; +use std::fs::OpenOptions; +use std::io::BufRead; +use std::io::BufReader; +use std::io::BufWriter; +use std::io::Write; +use std::path::Path; + +const LOG_FILE_NAME: &str = "entity_store.jsonl"; +const LOG_FORMAT_VERSION: &str = "1.0"; + +#[derive(thiserror::Error, Debug)] +pub enum LogEntryError { + #[error(transparent)] + FromStdIo(std::io::Error), + + #[error("Deserialization failed with {0} while parsing {1}")] + FromSerdeJson(#[source] serde_json::Error, String), +} + +/// A reader to read the log file entries line by line +pub(crate) struct MessageLogReader { + reader: BufReader, +} + +impl MessageLogReader { + pub fn new

(log_dir: P) -> Result + where + P: AsRef, + { + let file = OpenOptions::new() + .read(true) + .open(log_dir.as_ref().join(LOG_FILE_NAME))?; + let mut reader = BufReader::new(file); + + let mut version_info = String::new(); + reader.read_line(&mut version_info)?; + // TODO: Validate if the read version is supported + + Ok(MessageLogReader { reader }) + } + + /// Return the next MQTT message from the log + /// The reads start from the beginning of the file + /// and each read advances the file pointer to the next line + pub fn next_message(&mut self) -> Result, LogEntryError> { + let mut buffer = String::new(); + match self.reader.read_line(&mut buffer) { + Ok(bytes_read) if bytes_read > 0 => { + let message: MqttMessage = serde_json::from_str(&buffer) + .map_err(|err| LogEntryError::FromSerdeJson(err, buffer))?; + Ok(Some(message)) + } + Ok(_) => Ok(None), // EOF + Err(err) => Err(LogEntryError::FromStdIo(err)), + } + } +} + +/// A writer to append new MQTT messages to the end of the log +pub(crate) struct MessageLogWriter { + writer: BufWriter, +} + +impl MessageLogWriter { + pub fn new

(log_dir: P) -> Result + where + P: AsRef, + { + let file = OpenOptions::new() + .create(true) + .append(true) + .open(log_dir.as_ref().join(LOG_FILE_NAME))?; + + // If the file is empty append the version information as a header + let metadata = file.metadata()?; + let file_is_empty = metadata.len() == 0; + + let mut writer = BufWriter::new(file); + + if file_is_empty { + let version_info = json!({ "version": LOG_FORMAT_VERSION }).to_string(); + writeln!(writer, "{}", version_info)?; + } + + Ok(MessageLogWriter { writer }) + } + + pub fn new_truncated

(log_dir: P) -> Result + where + P: AsRef, + { + let _ = OpenOptions::new() + .create(true) + .write(true) + .truncate(true) + .open(log_dir.as_ref().join(LOG_FILE_NAME))?; + + MessageLogWriter::new(log_dir) + } + + /// Append the JSON representation of the given message to the log. + /// Each message is appended on a new line. + pub fn append_message(&mut self, message: &MqttMessage) -> Result<(), std::io::Error> { + let json_line = serde_json::to_string(message)?; + writeln!(self.writer, "{}", json_line)?; + self.writer.flush()?; + self.writer.get_ref().sync_all()?; + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use super::MessageLogReader; + use super::MessageLogWriter; + use mqtt_channel::MqttMessage; + use mqtt_channel::Topic; + use tempfile::tempdir; + + #[test] + fn test_append_and_retrieve() { + let temp_dir = tempdir().unwrap(); + + // Prepare some dummy messages + let mut messages = vec![]; + for i in 1..5 { + let message = MqttMessage::new( + &Topic::new(&format!("topic{i}")).unwrap(), + format!("payload{i}"), + ); + messages.push(message); + } + + // Populate the log + { + let mut message_log = MessageLogWriter::new(&temp_dir).unwrap(); + let mut message_log_reader = MessageLogReader::new(&temp_dir).unwrap(); + + assert_eq!(message_log_reader.next_message().unwrap(), None); + + for message in messages.clone() { + message_log.append_message(&message).unwrap(); + } + } + + // Read from the log + { + // Reload the message log + let mut message_log_reader = MessageLogReader::new(&temp_dir).unwrap(); + + for message in messages { + assert_eq!(message_log_reader.next_message().unwrap(), Some(message)); + } + // EOF -> None + assert_eq!(message_log_reader.next_message().unwrap(), None); + } + } +} diff --git a/crates/core/tedge_api/src/store/mod.rs b/crates/core/tedge_api/src/store/mod.rs new file mode 100644 index 00000000000..bf9af2c0703 --- /dev/null +++ b/crates/core/tedge_api/src/store/mod.rs @@ -0,0 +1,3 @@ +pub mod message_log; +pub mod pending_entity_store; +mod ring_buffer; diff --git a/crates/core/tedge_api/src/pending_entity_store.rs b/crates/core/tedge_api/src/store/pending_entity_store.rs similarity index 99% rename from crates/core/tedge_api/src/pending_entity_store.rs rename to crates/core/tedge_api/src/store/pending_entity_store.rs index a4a65f28fae..3393f083fc5 100644 --- a/crates/core/tedge_api/src/pending_entity_store.rs +++ b/crates/core/tedge_api/src/store/pending_entity_store.rs @@ -2,7 +2,7 @@ use crate::entity_store::EntityRegistrationMessage; use crate::mqtt_topics::Channel; use crate::mqtt_topics::EntityTopicId; use crate::mqtt_topics::MqttSchema; -use crate::ring_buffer::RingBuffer; +use crate::store::ring_buffer::RingBuffer; use log::error; use mqtt_channel::MqttMessage; use std::collections::HashMap; @@ -11,7 +11,7 @@ use std::collections::HashMap; /// its registration message itself is received. /// It also stores all the child device registration messages received before /// their parents themselves are registered, including their data. -pub struct PendingEntityStore { +pub(crate) struct PendingEntityStore { mqtt_schema: MqttSchema, // This orphans map is keyed by the unregistered parent topic id to their children orphans: HashMap>, @@ -26,7 +26,7 @@ pub struct PendingEntityStore { /// Other metadata messages which are are stored in an unbounded vector, /// as these are more critical data, none of which can be dropped. #[derive(Debug, Clone, Eq, PartialEq)] -pub struct PendingEntityCache { +pub(crate) struct PendingEntityCache { pub reg_message: Option, pub metadata: Vec, } diff --git a/crates/core/tedge_api/src/ring_buffer.rs b/crates/core/tedge_api/src/store/ring_buffer.rs similarity index 97% rename from crates/core/tedge_api/src/ring_buffer.rs rename to crates/core/tedge_api/src/store/ring_buffer.rs index 1106eb2360c..010524b2ea7 100644 --- a/crates/core/tedge_api/src/ring_buffer.rs +++ b/crates/core/tedge_api/src/store/ring_buffer.rs @@ -3,7 +3,7 @@ use std::collections::VecDeque; /// A bounded buffer that replaces older values with newer ones when full #[derive(Debug)] -pub struct RingBuffer { +pub(crate) struct RingBuffer { buffer: VecDeque, size: usize, } diff --git a/crates/core/tedge_api/src/message_log.rs b/crates/core/tedge_api/store/message_log.rs similarity index 98% rename from crates/core/tedge_api/src/message_log.rs rename to crates/core/tedge_api/store/message_log.rs index cbadb8df6b8..7c86a72d929 100644 --- a/crates/core/tedge_api/src/message_log.rs +++ b/crates/core/tedge_api/store/message_log.rs @@ -15,7 +15,7 @@ const LOG_FILE_NAME: &str = "entity_store.jsonl"; const LOG_FORMAT_VERSION: &str = "1.0"; #[derive(thiserror::Error, Debug)] -pub enum LogEntryError { +pub(crate) enum LogEntryError { #[error(transparent)] FromStdIo(std::io::Error), @@ -24,7 +24,7 @@ pub enum LogEntryError { } /// A reader to read the log file entries line by line -pub struct MessageLogReader { +pub(crate) struct MessageLogReader { reader: BufReader, } diff --git a/crates/core/tedge_api/tests/test_suite.rs b/crates/core/tedge_api/tests/test_suite.rs index 0168d2b57a7..b3f2c9e9135 100644 --- a/crates/core/tedge_api/tests/test_suite.rs +++ b/crates/core/tedge_api/tests/test_suite.rs @@ -11,10 +11,10 @@ fn it_rejects_invalid_thin_edge_json() -> anyhow::Result<()> { println!("Fixture: {:?}", fixture.path()); let res: anyhow::Result<_> = { - let mut builder = tedge_api::builder::ThinEdgeJsonBuilder::default(); - tedge_api::parser::parse_str(&input, &mut builder) + let mut builder = tedge_api::measurement::ThinEdgeJsonSerializer::new(); + tedge_api::measurement::parse_str(&input, &mut builder) .map_err(Into::into) - .and_then(|_| builder.done().map_err(Into::into)) + .and_then(|_| builder.into_string().map_err(Into::into)) }; let err_msg = res.unwrap_err().to_string(); @@ -43,8 +43,8 @@ fn it_transforms_valid_thin_edge_json() -> anyhow::Result<()> { let input = std::fs::read_to_string(fixture.path())?; let output = { - let mut builder = tedge_api::serialize::ThinEdgeJsonSerializer::new(); - let res = tedge_api::parser::parse_str(&input, &mut builder); + let mut builder = tedge_api::measurement::ThinEdgeJsonSerializer::new(); + let res = tedge_api::measurement::parse_str(&input, &mut builder); assert!(res.is_ok()); builder.into_string()? }; diff --git a/crates/extensions/aws_mapper_ext/src/error.rs b/crates/extensions/aws_mapper_ext/src/error.rs index ff6f66721f4..ffeccf64b1e 100644 --- a/crates/extensions/aws_mapper_ext/src/error.rs +++ b/crates/extensions/aws_mapper_ext/src/error.rs @@ -1,4 +1,4 @@ -use tedge_api::serialize::ThinEdgeJsonSerializationError; +use tedge_api::measurement::ThinEdgeJsonSerializationError; use tedge_mqtt_ext::MqttError; #[derive(Debug, thiserror::Error)] @@ -7,7 +7,7 @@ pub enum ConversionError { FromThinEdgeJsonSerialization(#[from] ThinEdgeJsonSerializationError), #[error(transparent)] - FromThinEdgeJsonParser(#[from] tedge_api::parser::ThinEdgeJsonParserError), + FromThinEdgeJsonParser(#[from] tedge_api::measurement::ThinEdgeJsonParserError), #[error("The size of the message received on {topic} is {actual_size} which is greater than the threshold size of {threshold}.")] SizeThresholdExceeded { diff --git a/crates/extensions/c8y_mapper_ext/src/converter.rs b/crates/extensions/c8y_mapper_ext/src/converter.rs index 22dea1542ee..b5747e07966 100644 --- a/crates/extensions/c8y_mapper_ext/src/converter.rs +++ b/crates/extensions/c8y_mapper_ext/src/converter.rs @@ -64,6 +64,11 @@ use std::path::PathBuf; use std::sync::Arc; use tedge_actors::LoggingSender; use tedge_actors::Sender; +use tedge_api::commands::CommandStatus; +use tedge_api::commands::RestartCommand; +use tedge_api::commands::SoftwareCommandMetadata; +use tedge_api::commands::SoftwareListCommand; +use tedge_api::commands::SoftwareUpdateCommand; use tedge_api::entity_store; use tedge_api::entity_store::EntityExternalId; use tedge_api::entity_store::EntityRegistrationMessage; @@ -72,11 +77,6 @@ use tedge_api::entity_store::Error; use tedge_api::entity_store::InvalidExternalIdError; use tedge_api::event::error::ThinEdgeJsonDeserializerError; use tedge_api::event::ThinEdgeEvent; -use tedge_api::messages::CommandStatus; -use tedge_api::messages::RestartCommand; -use tedge_api::messages::SoftwareCommandMetadata; -use tedge_api::messages::SoftwareListCommand; -use tedge_api::messages::SoftwareUpdateCommand; use tedge_api::mqtt_topics::Channel; use tedge_api::mqtt_topics::EntityTopicId; use tedge_api::mqtt_topics::IdGenerator; diff --git a/crates/extensions/c8y_mapper_ext/src/error.rs b/crates/extensions/c8y_mapper_ext/src/error.rs index 562fc115d94..896773e1807 100644 --- a/crates/extensions/c8y_mapper_ext/src/error.rs +++ b/crates/extensions/c8y_mapper_ext/src/error.rs @@ -6,7 +6,7 @@ use c8y_http_proxy::messages::C8YRestError; use plugin_sm::operation_logs::OperationLogsError; use std::path::PathBuf; use tedge_api::entity_store::InvalidExternalIdError; -use tedge_api::serialize::ThinEdgeJsonSerializationError; +use tedge_api::measurement::ThinEdgeJsonSerializationError; use tedge_config::TEdgeConfigError; use tedge_mqtt_ext::MqttError; use tedge_utils::file::FileError; @@ -59,7 +59,7 @@ pub enum ConversionError { ), #[error(transparent)] - FromThinEdgeJsonParser(#[from] tedge_api::parser::ThinEdgeJsonParserError), + FromThinEdgeJsonParser(#[from] tedge_api::measurement::ThinEdgeJsonParserError), #[error(transparent)] SizeThresholdExceeded(#[from] SizeThresholdExceededError), diff --git a/crates/extensions/c8y_mapper_ext/src/json.rs b/crates/extensions/c8y_mapper_ext/src/json.rs index 56b3386c16f..aa6cd58c7a8 100644 --- a/crates/extensions/c8y_mapper_ext/src/json.rs +++ b/crates/extensions/c8y_mapper_ext/src/json.rs @@ -19,7 +19,7 @@ use crate::serializer; use clock::Clock; use clock::WallClock; use tedge_api::entity_store::EntityMetadata; -use tedge_api::parser::*; +use tedge_api::measurement::*; use time::OffsetDateTime; use time::{self}; diff --git a/crates/extensions/c8y_mapper_ext/src/operations/config_snapshot.rs b/crates/extensions/c8y_mapper_ext/src/operations/config_snapshot.rs index fceb449ead9..b6f28d9886e 100644 --- a/crates/extensions/c8y_mapper_ext/src/operations/config_snapshot.rs +++ b/crates/extensions/c8y_mapper_ext/src/operations/config_snapshot.rs @@ -14,8 +14,8 @@ use c8y_http_proxy::messages::CreateEvent; use camino::Utf8PathBuf; use std::collections::HashMap; use tedge_actors::Sender; -use tedge_api::messages::CommandStatus; -use tedge_api::messages::ConfigSnapshotCmdPayload; +use tedge_api::commands::CommandStatus; +use tedge_api::commands::ConfigSnapshotCmdPayload; use tedge_api::mqtt_topics::Channel; use tedge_api::mqtt_topics::ChannelFilter; use tedge_api::mqtt_topics::EntityFilter; diff --git a/crates/extensions/c8y_mapper_ext/src/operations/config_update.rs b/crates/extensions/c8y_mapper_ext/src/operations/config_update.rs index 9eb47ae0084..3ab30f938bd 100644 --- a/crates/extensions/c8y_mapper_ext/src/operations/config_update.rs +++ b/crates/extensions/c8y_mapper_ext/src/operations/config_update.rs @@ -7,10 +7,10 @@ use c8y_api::smartrest::smartrest_serializer::set_operation_executing; use c8y_api::smartrest::smartrest_serializer::succeed_operation_no_payload; use c8y_api::smartrest::smartrest_serializer::CumulocitySupportedOperations; use std::sync::Arc; +use tedge_api::commands::CommandStatus; +use tedge_api::commands::ConfigUpdateCmdPayload; use tedge_api::entity_store::EntityExternalId; use tedge_api::entity_store::EntityMetadata; -use tedge_api::messages::CommandStatus; -use tedge_api::messages::ConfigUpdateCmdPayload; use tedge_api::mqtt_topics::Channel; use tedge_api::mqtt_topics::ChannelFilter; use tedge_api::mqtt_topics::EntityFilter; diff --git a/crates/extensions/c8y_mapper_ext/src/operations/firmware_update.rs b/crates/extensions/c8y_mapper_ext/src/operations/firmware_update.rs index 73ae4a925a1..d1978b4b037 100644 --- a/crates/extensions/c8y_mapper_ext/src/operations/firmware_update.rs +++ b/crates/extensions/c8y_mapper_ext/src/operations/firmware_update.rs @@ -6,9 +6,9 @@ use c8y_api::smartrest::smartrest_serializer::fail_operation; use c8y_api::smartrest::smartrest_serializer::set_operation_executing; use c8y_api::smartrest::smartrest_serializer::succeed_operation_no_payload; use c8y_api::smartrest::smartrest_serializer::CumulocitySupportedOperations; +use tedge_api::commands::FirmwareInfo; +use tedge_api::commands::FirmwareUpdateCmdPayload; use tedge_api::entity_store::EntityExternalId; -use tedge_api::messages::FirmwareInfo; -use tedge_api::messages::FirmwareUpdateCmdPayload; use tedge_api::mqtt_topics::Channel; use tedge_api::mqtt_topics::ChannelFilter::Command; use tedge_api::mqtt_topics::ChannelFilter::CommandMetadata; diff --git a/crates/extensions/c8y_mapper_ext/src/operations/log_upload.rs b/crates/extensions/c8y_mapper_ext/src/operations/log_upload.rs index 1282c712822..a4f2717ad70 100644 --- a/crates/extensions/c8y_mapper_ext/src/operations/log_upload.rs +++ b/crates/extensions/c8y_mapper_ext/src/operations/log_upload.rs @@ -14,9 +14,9 @@ use c8y_http_proxy::messages::CreateEvent; use camino::Utf8PathBuf; use std::collections::HashMap; use tedge_actors::Sender; -use tedge_api::messages::CommandStatus; -use tedge_api::messages::LogMetadata; -use tedge_api::messages::LogUploadCmdPayload; +use tedge_api::commands::CommandStatus; +use tedge_api::commands::LogMetadata; +use tedge_api::commands::LogUploadCmdPayload; use tedge_api::mqtt_topics::Channel; use tedge_api::mqtt_topics::ChannelFilter::Command; use tedge_api::mqtt_topics::ChannelFilter::CommandMetadata; diff --git a/crates/extensions/c8y_mapper_ext/src/operations/mod.rs b/crates/extensions/c8y_mapper_ext/src/operations/mod.rs index 6bf9b2ed7d9..5ce9dd8bb4c 100644 --- a/crates/extensions/c8y_mapper_ext/src/operations/mod.rs +++ b/crates/extensions/c8y_mapper_ext/src/operations/mod.rs @@ -15,7 +15,7 @@ use crate::converter::CumulocityConverter; use crate::error::ConversionError; -use tedge_api::messages::ConfigMetadata; +use tedge_api::commands::ConfigMetadata; use tedge_api::mqtt_topics::EntityTopicId; use tedge_api::Jsonify; use tedge_mqtt_ext::MqttMessage; diff --git a/crates/extensions/collectd_ext/src/batcher.rs b/crates/extensions/collectd_ext/src/batcher.rs index 1f108c9f5d6..094af4bc487 100644 --- a/crates/extensions/collectd_ext/src/batcher.rs +++ b/crates/extensions/collectd_ext/src/batcher.rs @@ -1,9 +1,9 @@ use clock::Timestamp; -use tedge_api::group::MeasurementGroup; -use tedge_api::group::MeasurementGrouper; -use tedge_api::group::MeasurementGrouperError; +use tedge_api::measurement::MeasurementGroup; +use tedge_api::measurement::MeasurementGrouper; +use tedge_api::measurement::MeasurementGrouperError; use tedge_api::measurement::MeasurementVisitor; -use tedge_api::serialize::ThinEdgeJsonSerializer; +use tedge_api::measurement::ThinEdgeJsonSerializer; use tedge_mqtt_ext::MqttMessage; use tedge_mqtt_ext::Topic; diff --git a/crates/extensions/collectd_ext/src/error.rs b/crates/extensions/collectd_ext/src/error.rs index cfffc9f670c..df4133b50ab 100644 --- a/crates/extensions/collectd_ext/src/error.rs +++ b/crates/extensions/collectd_ext/src/error.rs @@ -11,15 +11,15 @@ pub enum DeviceMonitorError { FromInvalidCollectdMeasurement(#[from] crate::collectd::CollectdError), #[error(transparent)] - FromInvalidThinEdgeJson(#[from] tedge_api::group::MeasurementGrouperError), + FromInvalidThinEdgeJson(#[from] tedge_api::measurement::MeasurementGrouperError), #[error(transparent)] FromThinEdgeJsonSerializationError( - #[from] tedge_api::serialize::ThinEdgeJsonSerializationError, + #[from] tedge_api::measurement::ThinEdgeJsonSerializationError, ), #[error(transparent)] - FromBatchingError(#[from] SendError), + FromBatchingError(#[from] SendError), } impl From for RuntimeError { diff --git a/crates/extensions/tedge_config_manager/src/actor.rs b/crates/extensions/tedge_config_manager/src/actor.rs index 1b482d141d7..2ccc03bb518 100644 --- a/crates/extensions/tedge_config_manager/src/actor.rs +++ b/crates/extensions/tedge_config_manager/src/actor.rs @@ -15,9 +15,9 @@ use tedge_actors::LoggingSender; use tedge_actors::MessageReceiver; use tedge_actors::RuntimeError; use tedge_actors::Sender; -use tedge_api::messages::CommandStatus; -use tedge_api::messages::ConfigSnapshotCmdPayload; -use tedge_api::messages::ConfigUpdateCmdPayload; +use tedge_api::commands::CommandStatus; +use tedge_api::commands::ConfigSnapshotCmdPayload; +use tedge_api::commands::ConfigUpdateCmdPayload; use tedge_api::Jsonify; use tedge_downloader_ext::DownloadRequest; use tedge_downloader_ext::DownloadResult; diff --git a/crates/extensions/tedge_log_manager/src/actor.rs b/crates/extensions/tedge_log_manager/src/actor.rs index 772b7eae67b..c80fefddb00 100644 --- a/crates/extensions/tedge_log_manager/src/actor.rs +++ b/crates/extensions/tedge_log_manager/src/actor.rs @@ -18,8 +18,8 @@ use tedge_actors::NoMessage; use tedge_actors::RuntimeError; use tedge_actors::Sender; use tedge_actors::SimpleMessageBox; -use tedge_api::messages::CommandStatus; -use tedge_api::messages::LogUploadCmdPayload; +use tedge_api::commands::CommandStatus; +use tedge_api::commands::LogUploadCmdPayload; use tedge_api::Jsonify; use tedge_file_system_ext::FsWatchEvent; use tedge_mqtt_ext::MqttMessage;