From 83f684760dcee2e8e6e2a84dd8064e589755ac27 Mon Sep 17 00:00:00 2001 From: Marcel Guzik Date: Wed, 31 Jul 2024 15:08:22 +0000 Subject: [PATCH 1/2] add OperationHandlerActor A preliminary OperationHandlerActor which still uses OperationHandler underneath, but receives command messages using the actor runtime. This initial implementation shows the limitations of the entity store, which is now a part of CumulocityConverter, and other actors don't have any access to it. Because OperationHandlerActor needs some information about a registered entity (smartrest publish topic), I have added another `MessageSource<(MqttMessage, EntityMetadata)>` impl to the `C8yMapperActor`, but this obviously very hacky. When moving more things out of CumulocityConverter into other actors, we'll need some organized way to access entity metadata (and entity store) from other actors. First thing that comes to mind is making an `EntityStoreActor` that manages the entity store and allows queries and updates to the store via incoming messages. But we probably shouldn't allow all actors to modify the entity store freely, and we'll need to make sure what to do when entities are deleted or updated. Signed-off-by: Marcel Guzik --- Cargo.lock | 1 - crates/core/tedge_mapper/src/c8y/mapper.rs | 14 +- crates/extensions/c8y_mapper_ext/Cargo.toml | 1 - crates/extensions/c8y_mapper_ext/src/actor.rs | 61 +++- .../extensions/c8y_mapper_ext/src/config.rs | 1 + .../c8y_mapper_ext/src/converter.rs | 55 +-- crates/extensions/c8y_mapper_ext/src/lib.rs | 2 +- .../c8y_mapper_ext/src/operations/actor.rs | 337 ++++++++++++++++++ .../c8y_mapper_ext/src/operations/builder.rs | 135 +++++++ .../c8y_mapper_ext/src/operations/handler.rs | 142 ++++++-- .../src/operations/handlers/mod.rs | 68 ++-- .../c8y_mapper_ext/src/operations/mod.rs | 4 + crates/extensions/c8y_mapper_ext/src/tests.rs | 50 +-- 13 files changed, 710 insertions(+), 161 deletions(-) create mode 100644 crates/extensions/c8y_mapper_ext/src/operations/actor.rs create mode 100644 crates/extensions/c8y_mapper_ext/src/operations/builder.rs diff --git a/Cargo.lock b/Cargo.lock index e7f606981a8..9b681fcda79 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -780,7 +780,6 @@ dependencies = [ "clock", "json-writer", "mime", - "mockito", "plugin_sm", "proptest", "rand", diff --git a/crates/core/tedge_mapper/src/c8y/mapper.rs b/crates/core/tedge_mapper/src/c8y/mapper.rs index c6a89cb69fb..c4f1c945e8a 100644 --- a/crates/core/tedge_mapper/src/c8y/mapper.rs +++ b/crates/core/tedge_mapper/src/c8y/mapper.rs @@ -212,17 +212,24 @@ impl TEdgeComponent for CumulocityMapper { )?); let mut c8y_mapper_actor = C8yMapperBuilder::try_new( - c8y_mapper_config, + c8y_mapper_config.clone(), &mut mqtt_actor, &mut c8y_http_proxy_actor, &mut timer_actor, - &mut uploader_actor, - &mut downloader_actor, &mut fs_watch_actor, &mut service_monitor_actor, )?; let c8y_prefix = &tedge_config.c8y.bridge.topic_prefix; + + let operation_handler_actor = c8y_mapper_ext::operations::OperationHandlerBuilder::new( + c8y_mapper_config.to_operation_handler_config(), + &mut c8y_mapper_actor, + &mut uploader_actor, + &mut downloader_actor, + &mut c8y_http_proxy_actor, + ); + // Adaptor translating commands sent on te/device/main///cmd/+/+ into requests on tedge/commands/req/+/+ // and translating the responses received on tedge/commands/res/+/+ to te/device/main///cmd/+/+ let old_to_new_agent_adapter = OldAgentAdapter::builder(c8y_prefix, &mut mqtt_actor); @@ -252,6 +259,7 @@ impl TEdgeComponent for CumulocityMapper { if let Some(availability_actor) = availability_actor { runtime.spawn(availability_actor).await?; } + runtime.spawn(operation_handler_actor).await?; runtime.run_to_completion().await?; Ok(()) diff --git a/crates/extensions/c8y_mapper_ext/Cargo.toml b/crates/extensions/c8y_mapper_ext/Cargo.toml index f75dacd39dc..72683a6a21b 100644 --- a/crates/extensions/c8y_mapper_ext/Cargo.toml +++ b/crates/extensions/c8y_mapper_ext/Cargo.toml @@ -49,7 +49,6 @@ url = { workspace = true } [dev-dependencies] assert-json-diff = { workspace = true } assert_matches = { workspace = true } -mockito = { workspace = true } proptest = { workspace = true } rand = { workspace = true } tedge_actors = { workspace = true, features = ["test-helpers"] } diff --git a/crates/extensions/c8y_mapper_ext/src/actor.rs b/crates/extensions/c8y_mapper_ext/src/actor.rs index b7b5b1409f8..270008cdb54 100644 --- a/crates/extensions/c8y_mapper_ext/src/actor.rs +++ b/crates/extensions/c8y_mapper_ext/src/actor.rs @@ -13,7 +13,6 @@ use std::time::Duration; use tedge_actors::fan_in_message_type; use tedge_actors::Actor; use tedge_actors::Builder; -use tedge_actors::ClientMessageBox; use tedge_actors::CloneSender; use tedge_actors::DynSender; use tedge_actors::LoggingSender; @@ -27,6 +26,7 @@ use tedge_actors::Sender; use tedge_actors::Service; use tedge_actors::SimpleMessageBox; use tedge_actors::SimpleMessageBoxBuilder; +use tedge_api::entity_store::EntityMetadata; use tedge_api::entity_store::EntityRegistrationMessage; use tedge_api::mqtt_topics::Channel; use tedge_api::mqtt_topics::ChannelFilter; @@ -80,6 +80,9 @@ pub struct C8yMapperActor { timer_sender: LoggingSender, bridge_status_messages: SimpleMessageBox, message_handlers: HashMap>>, + // these handlers require entity metadata, so the entity already has to be registered + registered_message_handlers: + HashMap>>, } #[async_trait] @@ -140,6 +143,10 @@ impl C8yMapperActor { timer_sender: LoggingSender, bridge_status_messages: SimpleMessageBox, message_handlers: HashMap>>, + registered_message_handlers: HashMap< + ChannelFilter, + Vec>, + >, ) -> Self { Self { converter, @@ -148,6 +155,7 @@ impl C8yMapperActor { timer_sender, bridge_status_messages, message_handlers, + registered_message_handlers, } } @@ -267,6 +275,25 @@ impl C8yMapperActor { sender.send(message.clone()).await?; } } + + if let Some(message_handler) = self.registered_message_handlers.get_mut(&channel.into()) { + let (entity, _) = self + .converter + .mqtt_schema + .entity_channel_of(&message.topic) + .expect("message should've been confirmed to be using MQTT topic scheme v1"); + + let entity = self + .converter + .entity_store + .get(&entity) + .expect("entity should've already been registered"); + + for sender in message_handler { + sender.send((message.clone(), entity.clone())).await?; + } + } + Ok(()) } @@ -331,11 +358,11 @@ pub struct C8yMapperBuilder { mqtt_publisher: DynSender, http_proxy: C8YHttpProxy, timer_sender: DynSender, - downloader: ClientMessageBox, - uploader: ClientMessageBox, auth_proxy: ProxyUrlGenerator, bridge_monitor_builder: SimpleMessageBoxBuilder, message_handlers: HashMap>>, + registered_message_handlers: + HashMap>>, } impl C8yMapperBuilder { @@ -345,8 +372,6 @@ impl C8yMapperBuilder { mqtt: &mut (impl MessageSource + MessageSink), http: &mut impl Service, timer: &mut impl Service, - uploader: &mut impl Service, - downloader: &mut impl Service, fs_watcher: &mut impl MessageSource, service_monitor: &mut (impl MessageSource + MessageSink), ) -> Result { @@ -360,9 +385,6 @@ impl C8yMapperBuilder { let http_proxy = C8YHttpProxy::new(http); let timer_sender = timer.connect_client(box_builder.get_sender().sender_clone()); - let downloader = ClientMessageBox::new(downloader); - let uploader = ClientMessageBox::new(uploader); - fs_watcher.connect_sink( config.ops_dir.as_std_path().to_path_buf(), &box_builder.get_sender(), @@ -382,6 +404,7 @@ impl C8yMapperBuilder { ); let message_handlers = HashMap::new(); + let registered_message_handlers = HashMap::new(); Ok(Self { config, @@ -389,11 +412,10 @@ impl C8yMapperBuilder { mqtt_publisher, http_proxy, timer_sender, - uploader, - downloader, auth_proxy, bridge_monitor_builder, message_handlers, + registered_message_handlers, }) } @@ -426,6 +448,22 @@ impl MessageSource> for C8yMapperBuilder { } } +impl MessageSource<(MqttMessage, EntityMetadata), Vec> for C8yMapperBuilder { + fn connect_sink( + &mut self, + config: Vec, + peer: &impl MessageSink<(MqttMessage, EntityMetadata)>, + ) { + let sender = LoggingSender::new("Mapper MQTT".into(), peer.get_sender()); + for channel in config { + self.registered_message_handlers + .entry(channel) + .or_default() + .push(sender.clone()); + } + } +} + impl MessageSink for C8yMapperBuilder { fn get_sender(&self) -> DynSender { self.box_builder.get_sender().sender_clone() @@ -444,8 +482,6 @@ impl Builder for C8yMapperBuilder { mqtt_publisher.clone(), self.http_proxy, self.auth_proxy, - self.uploader, - self.downloader, ) .map_err(|err| RuntimeError::ActorError(Box::new(err)))?; @@ -459,6 +495,7 @@ impl Builder for C8yMapperBuilder { timer_sender, bridge_monitor_box, self.message_handlers, + self.registered_message_handlers, )) } } diff --git a/crates/extensions/c8y_mapper_ext/src/config.rs b/crates/extensions/c8y_mapper_ext/src/config.rs index f77765e9423..2b46d550509 100644 --- a/crates/extensions/c8y_mapper_ext/src/config.rs +++ b/crates/extensions/c8y_mapper_ext/src/config.rs @@ -35,6 +35,7 @@ const STATE_DIR_NAME: &str = ".tedge-mapper-c8y"; const C8Y_CLOUD: &str = "c8y"; const SUPPORTED_OPERATIONS_DIRECTORY: &str = "operations"; +#[derive(Clone)] pub struct C8yMapperConfig { pub device_id: String, pub device_topic_id: EntityTopicId, diff --git a/crates/extensions/c8y_mapper_ext/src/converter.rs b/crates/extensions/c8y_mapper_ext/src/converter.rs index cac47e432ae..b81cbf72d16 100644 --- a/crates/extensions/c8y_mapper_ext/src/converter.rs +++ b/crates/extensions/c8y_mapper_ext/src/converter.rs @@ -4,13 +4,9 @@ use super::config::MQTT_MESSAGE_SIZE_THRESHOLD; use super::error::CumulocityMapperError; use super::service_monitor; use crate::actor::CmdId; -use crate::actor::IdDownloadRequest; -use crate::actor::IdDownloadResult; use crate::dynamic_discovery::DiscoverOp; use crate::error::ConversionError; use crate::json; -use crate::operations; -use crate::operations::OperationHandler; use anyhow::anyhow; use anyhow::Context; use c8y_api::http_proxy::C8yEndPoint; @@ -58,7 +54,6 @@ use std::fs; use std::path::Path; use std::path::PathBuf; use std::sync::Arc; -use tedge_actors::ClientMessageBox; use tedge_actors::LoggingSender; use tedge_actors::Sender; use tedge_api::commands::RestartCommand; @@ -86,8 +81,6 @@ use tedge_config::TEdgeConfigError; use tedge_config::TopicPrefix; use tedge_mqtt_ext::MqttMessage; use tedge_mqtt_ext::Topic; -use tedge_uploader_ext::UploadRequest; -use tedge_uploader_ext::UploadResult; use tedge_utils::file::create_directory_with_defaults; use tedge_utils::file::create_file_with_defaults; use tedge_utils::file::FileError; @@ -186,8 +179,6 @@ pub struct CumulocityConverter { pub command_id: IdGenerator, // Keep active command IDs to avoid creation of multiple commands for an operation pub active_commands: HashSet, - - pub operation_handler: OperationHandler, } impl CumulocityConverter { @@ -196,8 +187,6 @@ impl CumulocityConverter { mqtt_publisher: LoggingSender, http_proxy: C8YHttpProxy, auth_proxy: ProxyUrlGenerator, - uploader: ClientMessageBox<(String, UploadRequest), (String, UploadResult)>, - downloader: ClientMessageBox, ) -> Result { let device_id = config.device_id.clone(); let device_topic_id = config.device_topic_id.clone(); @@ -247,15 +236,6 @@ impl CumulocityConverter { let command_id = config.id_generator(); - let operation_handler = OperationHandler::new( - &config, - downloader, - uploader, - mqtt_publisher.clone(), - http_proxy.clone(), - auth_proxy.clone(), - ); - Ok(CumulocityConverter { size_threshold, config: Arc::new(config), @@ -276,7 +256,6 @@ impl CumulocityConverter { auth_proxy, command_id, active_commands: HashSet::new(), - operation_handler, }) } @@ -1186,16 +1165,6 @@ impl CumulocityConverter { Channel::Command { cmd_id, .. } if self.command_id.is_generator_of(cmd_id) => { self.active_commands.insert(cmd_id.clone()); - let entity = self.entity_store.try_get(&source)?; - let external_id = entity.external_id.clone(); - let entity = operations::EntityTarget { - topic_id: entity.topic_id.clone(), - external_id: external_id.clone(), - smartrest_publish_topic: self - .smartrest_publish_topic_for_entity(&entity.topic_id)?, - }; - - self.operation_handler.handle(entity, message.clone()).await; Ok(vec![]) } @@ -1514,10 +1483,6 @@ pub fn get_local_child_devices_list(path: &Path) -> Result, Cumu #[cfg(test)] pub(crate) mod tests { use super::CumulocityConverter; - use crate::actor::IdDownloadRequest; - use crate::actor::IdDownloadResult; - use crate::actor::IdUploadRequest; - use crate::actor::IdUploadResult; use crate::config::C8yMapperConfig; use crate::Capabilities; use anyhow::Result; @@ -1538,7 +1503,6 @@ pub(crate) mod tests { use tedge_actors::test_helpers::FakeServerBox; use tedge_actors::test_helpers::FakeServerBoxBuilder; use tedge_actors::Builder; - use tedge_actors::ClientMessageBox; use tedge_actors::CloneSender; use tedge_actors::LoggingSender; use tedge_actors::MessageReceiver; @@ -3145,23 +3109,8 @@ pub(crate) mod tests { let auth_proxy_port = config.auth_proxy_port; let auth_proxy = ProxyUrlGenerator::new(auth_proxy_addr, auth_proxy_port, Protocol::Http); - let mut uploader_builder: FakeServerBoxBuilder = - FakeServerBox::builder(); - let uploader = ClientMessageBox::new(&mut uploader_builder); - - let mut downloader_builder: FakeServerBoxBuilder = - FakeServerBox::builder(); - let downloader = ClientMessageBox::new(&mut downloader_builder); - - let converter = CumulocityConverter::new( - config, - mqtt_publisher, - http_proxy, - auth_proxy, - uploader, - downloader, - ) - .unwrap(); + let converter = + CumulocityConverter::new(config, mqtt_publisher, http_proxy, auth_proxy).unwrap(); (converter, c8y_proxy_builder.build()) } diff --git a/crates/extensions/c8y_mapper_ext/src/lib.rs b/crates/extensions/c8y_mapper_ext/src/lib.rs index 91e3881b971..73cd20e2a03 100644 --- a/crates/extensions/c8y_mapper_ext/src/lib.rs +++ b/crates/extensions/c8y_mapper_ext/src/lib.rs @@ -9,7 +9,7 @@ pub mod error; mod fragments; mod inventory; pub mod json; -mod operations; +pub mod operations; mod serializer; pub mod service_monitor; #[cfg(test)] diff --git a/crates/extensions/c8y_mapper_ext/src/operations/actor.rs b/crates/extensions/c8y_mapper_ext/src/operations/actor.rs new file mode 100644 index 00000000000..b584b98df1e --- /dev/null +++ b/crates/extensions/c8y_mapper_ext/src/operations/actor.rs @@ -0,0 +1,337 @@ +//! Actor handles c8y operations. +//! +//! First, Cumulocity starts an operation like `c8y_SoftwareUpdate` or `c8y_UploadConfigFile`. This +//! is converted by the mapper into a local thin-edge.io command, that is executed by tedge-agent. +//! As the agent executes a command that corresponds to the operation we need to report on that +//! operation progress by sending smartrest messages like `Set operation to EXECUTING`. +//! +//! The handler ignores clearing messages that it receives, as it alone should send clearing +//! messages. + +use std::collections::HashMap; +use std::future::Future; +use std::sync::Arc; + +use super::handler::is_operation_status_transition_valid; +use super::handler::RunningOperation; +use super::handlers::OperationContext; +use super::handlers::OperationMessage; +use super::handlers::OperationOutcome; +use super::OperationHandler; +use crate::actor::PublishMessage; +use async_trait::async_trait; +use tedge_actors::Actor; +use tedge_actors::CloneSender; +use tedge_actors::MessageReceiver; +use tedge_actors::RuntimeError; +use tedge_actors::Sender; +use tedge_actors::SimpleMessageBox; +use tedge_api::mqtt_topics::Channel; +use tedge_api::workflow::GenericCommandState; +use tedge_mqtt_ext::MqttMessage; +use tokio::sync::Mutex; +use tracing::debug; +use tracing::error; +use tracing::warn; + +pub struct OperationHandlerActor { + pub(super) messages: SimpleMessageBox, + pub(super) operation_handler: OperationHandler, + pub(super) running_operations: RunningOperations, +} + +#[async_trait] +impl Actor for OperationHandlerActor { + fn name(&self) -> &str { + "OperationHandler" + } + + async fn run(mut self) -> Result<(), RuntimeError> { + while let Some(input_message) = self.messages.recv().await { + self.handle_operation_message(input_message).await; + } + + Ok(()) + } +} + +impl OperationHandlerActor { + async fn handle_operation_message(&mut self, message: OperationMessage) { + let context = self.operation_handler.context.clone(); + + // input validation + let Ok((_, channel)) = context + .mqtt_schema + .entity_channel_of(&message.message.topic) + else { + return; + }; + + let Channel::Command { cmd_id, .. } = channel else { + return; + }; + + // don't process sub-workflow calls + if cmd_id.starts_with("sub:") { + return; + } + + if !context.command_id.is_generator_of(cmd_id.as_str()) { + return; + } + + let topic = message.message.topic.clone(); + + let mut message_box = self.messages.sender_clone(); + self.running_operations + .report(message, |outcome| async move { + match outcome { + OperationOutcome::Ignored => {} + OperationOutcome::Executing { extra_messages } => { + for m in extra_messages { + message_box.send(PublishMessage(m)).await.unwrap(); + } + } + OperationOutcome::Finished { messages } => { + for m in messages { + message_box.send(PublishMessage(m)).await.unwrap(); + } + + let clearing_message = MqttMessage::new(&topic, []).with_retain(); + message_box + .send(PublishMessage(clearing_message)) + .await + .unwrap(); + } + } + }) + .await; + } +} + +pub(super) struct RunningOperations { + pub(super) current_statuses: Arc, RunningOperation>>>, + pub(super) context: Arc, +} + +impl RunningOperations { + // If operation status transition hasn't been handled yet, spawn a task that will handle it. + async fn report(&mut self, message: OperationMessage, f: F) + where + F: FnOnce(OperationOutcome) -> Fut + Send + 'static, + Fut: Future + Send, + { + let topic = message.message.topic.name.as_str(); + let status = match GenericCommandState::from_command_message(&message.message) { + // clearing message was either echoed back to us by MQTT broker, or was published by + // some other MQTT client; the latter shouldn't really happen, but the former is + // expected + Ok(command) if command.is_cleared() => { + debug!(topic = %topic, "unexpected clearing message"); + return; + } + Err(err) => { + error!(%err, ?message, "could not parse command payload"); + return; + } + Ok(command) => command.status, + }; + + let context = self.context.clone(); + let mut current_statuses = self.current_statuses.lock().await; + let current_operation = current_statuses.get(topic); + + match current_operation { + None => { + let topic: Arc = topic.into(); + let handle = tokio::spawn(async move { + let outcome = context.report(message).await; + f(outcome).await; + }); + current_statuses.insert(topic, RunningOperation { status, handle }); + } + + // if we have task running, check if new status is allowed and then spawn a new task + // that also waits for old transition to complete + Some(current_operation) => { + let previous_status = ¤t_operation.status; + if status == current_operation.status.as_str() { + debug!( + "already handling operation message with this topic and status, ignoring" + ); + return; + } + + // we got a new status, check if it's not invalid and then await previous one and + // handle the new one + if !is_operation_status_transition_valid(previous_status, &status) { + warn!( + topic = %topic, + previous = previous_status, + next = status, + "attempted invalid status transition, ignoring" + ); + return; + } + + // remove currently running operation task from the hashmap and spawn a new one that + // also waits on the old one + let topic: Arc = topic.into(); + + let _current_statuses = self.current_statuses.clone(); + let _topic = topic.clone(); + + let handle = tokio::spawn(async move { + let outcome = context.report(message).await; + if let OperationOutcome::Finished { .. } = outcome { + _current_statuses.lock().await.remove(&*_topic); + } + f(outcome).await; + }); + + current_statuses.insert(topic, RunningOperation { handle, status }); + } + } + } +} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use crate::actor::IdDownloadRequest; + use crate::actor::IdDownloadResult; + use crate::actor::IdUploadRequest; + use crate::actor::IdUploadResult; + use crate::actor::PublishMessage; + use crate::operations::builder::OperationHandlerBuilder; + use crate::operations::handler::OperationHandlerConfig; + use crate::Capabilities; + use c8y_api::http_proxy::C8yEndPoint; + use c8y_auth_proxy::url::Protocol; + use c8y_auth_proxy::url::ProxyUrlGenerator; + use c8y_http_proxy::messages::C8YRestRequest; + use c8y_http_proxy::messages::C8YRestResult; + use tedge_actors::test_helpers::FakeServerBox; + use tedge_actors::test_helpers::FakeServerBoxBuilder; + use tedge_actors::Actor; + use tedge_actors::Builder; + use tedge_actors::Sender; + use tedge_actors::SimpleMessageBox; + use tedge_actors::SimpleMessageBoxBuilder; + use tedge_api::commands::ConfigSnapshotCmd; + use tedge_api::commands::ConfigSnapshotCmdPayload; + use tedge_api::entity_store::EntityMetadata; + use tedge_api::mqtt_topics::EntityTopicId; + use tedge_api::mqtt_topics::IdGenerator; + use tedge_api::mqtt_topics::MqttSchema; + use tedge_api::CommandStatus; + use tedge_config::AutoLogUpload; + use tedge_config::TopicPrefix; + use tedge_mqtt_ext::MqttMessage; + use tedge_test_utils::fs::TempTedgeDir; + use tokio::task::JoinHandle; + use tracing::Level; + + #[tokio::test] + // #[should_panic] + async fn panics_when_task_panics() { + tedge_config::system_services::set_log_level(Level::DEBUG); + let TestHandle { + mut mqtt, + handle: actor_handle, + .. + } = spawn_operation_actor().await; + + let mqtt_schema = MqttSchema::new(); + let entity_topic_id = EntityTopicId::default_main_device(); + let entity_metadata = EntityMetadata::main_device("anything".to_string()); + + // spawn an operation to see if it's successfully joined when it's completed. + // particular operation used is not important, because we want to test only the handler. + // it would be even better if we could define some inline operation so test could be shorter + // TODO(marcel): don't assume operation implementations when testing the handler + let command = ConfigSnapshotCmd { + target: entity_topic_id, + cmd_id: "c8y-mapper-1".to_string(), + payload: ConfigSnapshotCmdPayload { + status: CommandStatus::Successful, + tedge_url: Some("asdf".to_string()), + config_type: "typeA".to_string(), + path: None, + log_path: None, + }, + }; + let message = command.command_message(&mqtt_schema); + + mqtt.send((message, entity_metadata)).await.unwrap(); + drop(mqtt); + + actor_handle.await.unwrap(); + } + + struct TestHandle { + handle: JoinHandle<()>, + mqtt: SimpleMessageBox, + _dl: FakeServerBox, + _ul: FakeServerBox, + _c8y_proxy: FakeServerBox, + _ttd: TempTedgeDir, + } + + async fn spawn_operation_actor() -> TestHandle { + let auth_proxy_addr = "127.0.0.1".into(); + let auth_proxy_port = 8001; + let auth_proxy_protocol = Protocol::Http; + + let ttd = TempTedgeDir::new(); + let config = OperationHandlerConfig { + capabilities: Capabilities::default(), + auto_log_upload: AutoLogUpload::OnFailure, + tedge_http_host: Arc::from("127.0.0.1:8000"), + tmp_dir: ttd.utf8_path().into(), + software_management_api: tedge_config::SoftwareManagementApiFlag::Legacy, + mqtt_schema: MqttSchema::with_root("te".to_string()), + c8y_endpoint: C8yEndPoint::new("c8y.url", "c8y.url", "device_id"), + c8y_prefix: TopicPrefix::try_from("c8y").unwrap(), + auth_proxy: ProxyUrlGenerator::new( + auth_proxy_addr, + auth_proxy_port, + auth_proxy_protocol, + ), + id_generator: IdGenerator::new("c8y"), + smartrest_use_operation_id: true, + }; + + let mut mqtt_builder: SimpleMessageBoxBuilder< + PublishMessage, + (MqttMessage, EntityMetadata), + > = SimpleMessageBoxBuilder::new("MQTT", 10); + let mut c8y_proxy_builder: FakeServerBoxBuilder = + FakeServerBoxBuilder::default(); + let mut uploader_builder: FakeServerBoxBuilder = + FakeServerBoxBuilder::default(); + let mut downloader_builder: FakeServerBoxBuilder = + FakeServerBoxBuilder::default(); + + let operation_handler_builder = OperationHandlerBuilder::new( + config, + &mut mqtt_builder, + &mut uploader_builder, + &mut downloader_builder, + &mut c8y_proxy_builder, + ); + + let actor = operation_handler_builder.build(); + let handle = tokio::spawn(async move { actor.run().await.unwrap() }); + + TestHandle { + handle, + mqtt: mqtt_builder.build(), + _dl: downloader_builder.build(), + _ul: uploader_builder.build(), + _c8y_proxy: c8y_proxy_builder.build(), + _ttd: ttd, + } + } +} diff --git a/crates/extensions/c8y_mapper_ext/src/operations/builder.rs b/crates/extensions/c8y_mapper_ext/src/operations/builder.rs new file mode 100644 index 00000000000..98373b0b151 --- /dev/null +++ b/crates/extensions/c8y_mapper_ext/src/operations/builder.rs @@ -0,0 +1,135 @@ +use c8y_api::smartrest::topic::C8yTopic; +use c8y_http_proxy::handle::C8YHttpProxy; +use c8y_http_proxy::messages::C8YRestRequest; +use c8y_http_proxy::messages::C8YRestResult; +use tedge_actors::futures::channel::mpsc; +use tedge_actors::Builder; +use tedge_actors::ClientMessageBox; +use tedge_actors::CloneSender; +use tedge_actors::MessageSink; +use tedge_actors::MessageSource; +use tedge_actors::NoConfig; +use tedge_actors::RuntimeRequestSink; +use tedge_actors::Service; +use tedge_actors::SimpleMessageBoxBuilder; +use tedge_api::entity_store::EntityMetadata; +use tedge_api::mqtt_topics::Channel; +use tedge_api::mqtt_topics::ChannelFilter; +use tedge_mqtt_ext::MqttMessage; + +use crate::actor::IdDownloadRequest; +use crate::actor::IdDownloadResult; +use crate::actor::IdUploadRequest; +use crate::actor::IdUploadResult; +use crate::actor::PublishMessage; + +use super::actor::OperationHandlerActor; +use super::actor::RunningOperations; +use super::handler::OperationHandlerConfig; +use super::handlers::OperationMessage; +use super::EntityTarget; +use super::OperationHandler; + +pub struct OperationHandlerBuilder { + operation_handler: OperationHandler, + box_builder: SimpleMessageBoxBuilder, +} + +impl OperationHandlerBuilder { + pub fn new( + config: OperationHandlerConfig, + + mqtt: &mut (impl MessageSource<(MqttMessage, EntityMetadata), Vec> + + MessageSink), + uploader: &mut impl Service, + downloader: &mut impl Service, + + http: &mut impl Service, + ) -> Self { + // if there are any outgoing MQTT messages, send them immediately + let (operation_handler_sender, _) = mpsc::channel::(10); + + let uploader = ClientMessageBox::new(uploader); + let downloader = ClientMessageBox::new(downloader); + + let c8y_http_proxy = C8YHttpProxy::new(http); + + // TODO(marcel): discarding EntityFilter portion because C8yMapperActor doesn't support it, perhaps it should + let channel_filter = OperationHandler::topic_filter(&config.capabilities) + .into_iter() + .map(|f| f.1) + .collect::>(); + + let mut box_builder: SimpleMessageBoxBuilder = + SimpleMessageBoxBuilder::new("OperationHandlerActor", 10); + + box_builder.connect_mapped_source(channel_filter, mqtt, Self::mqtt_message_parser(&config)); + + mqtt.connect_source(NoConfig, &mut box_builder); + + let operation_handler = OperationHandler::new( + config, + downloader, + uploader, + operation_handler_sender.sender_clone(), + c8y_http_proxy, + ); + + Self { + operation_handler, + box_builder, + } + } + + fn mqtt_message_parser( + config: &OperationHandlerConfig, + ) -> impl Fn((MqttMessage, EntityMetadata)) -> Option { + let mqtt_schema = config.mqtt_schema.clone(); + let prefix = config.c8y_prefix.clone(); + + move |(message, metadata)| { + let (_, channel) = mqtt_schema.entity_channel_of(&message.topic).unwrap(); + + // if not Command, then CommandMetadata + let Channel::Command { operation, cmd_id } = channel else { + return None; + }; + + let smartrest_publish_topic = C8yTopic::smartrest_response_topic(&metadata, &prefix) + .expect("should create a valid topic"); + + Some(OperationMessage { + message, + operation, + cmd_id: cmd_id.into(), + entity: EntityTarget { + topic_id: metadata.topic_id, + external_id: metadata.external_id, + smartrest_publish_topic, + }, + }) + } + } +} + +impl Builder for OperationHandlerBuilder { + type Error = std::convert::Infallible; + + fn try_build(self) -> Result { + let context = self.operation_handler.context.clone(); + Ok(OperationHandlerActor { + operation_handler: self.operation_handler, + messages: self.box_builder.build(), + running_operations: RunningOperations { + current_statuses: Default::default(), + context, + }, + }) + } +} + +impl RuntimeRequestSink for OperationHandlerBuilder { + fn get_signal_sender(&self) -> tedge_actors::DynSender { + self.box_builder.get_signal_sender() + } +} diff --git a/crates/extensions/c8y_mapper_ext/src/operations/handler.rs b/crates/extensions/c8y_mapper_ext/src/operations/handler.rs index 388fb2bf4fe..f22ad656232 100644 --- a/crates/extensions/c8y_mapper_ext/src/operations/handler.rs +++ b/crates/extensions/c8y_mapper_ext/src/operations/handler.rs @@ -8,18 +8,26 @@ use crate::actor::IdUploadResult; use crate::config::C8yMapperConfig; use crate::Capabilities; use c8y_api::http_proxy::C8yEndPoint; +use c8y_auth_proxy::url::Protocol; use c8y_auth_proxy::url::ProxyUrlGenerator; use c8y_http_proxy::handle::C8YHttpProxy; +use camino::Utf8Path; use std::collections::hash_map::Entry; use std::collections::HashMap; use std::sync::Arc; use tedge_actors::ClientMessageBox; -use tedge_actors::LoggingSender; +use tedge_actors::DynSender; use tedge_api::mqtt_topics::Channel; use tedge_api::mqtt_topics::ChannelFilter; use tedge_api::mqtt_topics::EntityFilter; +use tedge_api::mqtt_topics::IdGenerator; +use tedge_api::mqtt_topics::MqttSchema; use tedge_api::workflow::GenericCommandState; +use tedge_config::AutoLogUpload; +use tedge_config::SoftwareManagementApiFlag; +use tedge_config::TopicPrefix; use tedge_mqtt_ext::MqttMessage; +use tokio::task::JoinHandle; use tracing::debug; use tracing::error; use tracing::warn; @@ -42,46 +50,41 @@ use tracing::warn; /// performs an operation in the background in separate tasks. The operation tasks themselves handle /// reporting their success/failure. pub struct OperationHandler { - context: Arc, - running_operations: HashMap, RunningOperation>, + pub(super) context: Arc, + pub(super) running_operations: HashMap, RunningOperation>, } impl OperationHandler { pub fn new( - c8y_mapper_config: &C8yMapperConfig, + config: OperationHandlerConfig, downloader: ClientMessageBox, uploader: ClientMessageBox, - mqtt_publisher: LoggingSender, + mqtt_publisher: DynSender, http_proxy: C8YHttpProxy, - auth_proxy: ProxyUrlGenerator, ) -> Self { Self { context: Arc::new(OperationContext { - capabilities: c8y_mapper_config.capabilities, - auto_log_upload: c8y_mapper_config.auto_log_upload, - tedge_http_host: c8y_mapper_config.tedge_http_host.clone(), - tmp_dir: c8y_mapper_config.tmp_dir.clone(), - mqtt_schema: c8y_mapper_config.mqtt_schema.clone(), - mqtt_publisher: mqtt_publisher.clone(), - software_management_api: c8y_mapper_config.software_management_api, - smart_rest_use_operation_id: c8y_mapper_config.smartrest_use_operation_id, + capabilities: config.capabilities, + auto_log_upload: config.auto_log_upload, + tedge_http_host: config.tedge_http_host, + tmp_dir: config.tmp_dir, + mqtt_schema: config.mqtt_schema, + software_management_api: config.software_management_api, + c8y_endpoint: config.c8y_endpoint, + auth_proxy: config.auth_proxy.clone(), + mqtt_publisher: mqtt_publisher.sender_clone(), + smart_rest_use_operation_id: config.smartrest_use_operation_id, // TODO(marcel): would be good not to generate new ids from running operations, see if // we can remove it somehow - command_id: c8y_mapper_config.id_generator(), + command_id: config.id_generator, downloader, uploader, - c8y_endpoint: C8yEndPoint::new( - &c8y_mapper_config.c8y_host, - &c8y_mapper_config.c8y_mqtt, - &c8y_mapper_config.device_id, - ), http_proxy: http_proxy.clone(), - auth_proxy: auth_proxy.clone(), }), running_operations: Default::default(), @@ -216,6 +219,33 @@ impl OperationHandler { } } + pub fn handle_spawn(&mut self, entity: EntityTarget, message: MqttMessage) -> JoinHandle<()> { + let context = self.context.clone(); + tokio::spawn(async move { + let Ok((_, channel)) = context.mqtt_schema.entity_channel_of(&message.topic) else { + return; + }; + + let Channel::Command { operation, cmd_id } = channel else { + return; + }; + + // don't process sub-workflow calls + if cmd_id.starts_with("sub:") { + return; + } + + let message = OperationMessage { + operation, + entity, + cmd_id: cmd_id.into(), + message, + }; + + context.update(message).await; + }) + } + /// A topic filter for operation types this object can handle. /// /// The MQTT client should subscribe to topics with this filter to receive MQTT messages that it @@ -253,6 +283,11 @@ impl OperationHandler { (AnyEntity, CommandMetadata(OperationType::FirmwareUpdate)), ]); } + topics.extend([ + (AnyEntity, Command(OperationType::Restart)), + (AnyEntity, Command(OperationType::SoftwareList)), + (AnyEntity, Command(OperationType::SoftwareUpdate)), + ]); if capabilities.device_profile { topics.extend([ @@ -264,17 +299,63 @@ impl OperationHandler { topics } } -struct RunningOperation { - handle: tokio::task::JoinHandle<()>, - status: String, + +#[derive(Debug, Clone)] +pub struct OperationHandlerConfig { + pub capabilities: Capabilities, + pub auto_log_upload: AutoLogUpload, + pub tedge_http_host: Arc, + pub tmp_dir: Arc, + pub software_management_api: SoftwareManagementApiFlag, + pub c8y_endpoint: C8yEndPoint, + pub mqtt_schema: MqttSchema, + pub c8y_prefix: TopicPrefix, + pub auth_proxy: ProxyUrlGenerator, + pub id_generator: IdGenerator, + pub smartrest_use_operation_id: bool, +} + +impl OperationHandlerConfig { + fn from_mapper_config(config: &C8yMapperConfig) -> OperationHandlerConfig { + OperationHandlerConfig { + capabilities: config.capabilities, + auto_log_upload: config.auto_log_upload, + tedge_http_host: config.tedge_http_host.clone(), + tmp_dir: config.tmp_dir.clone(), + software_management_api: config.software_management_api, + c8y_endpoint: C8yEndPoint::new(&config.c8y_host, &config.c8y_mqtt, &config.device_id), + mqtt_schema: config.mqtt_schema.clone(), + c8y_prefix: config.c8y_prefix.clone(), + auth_proxy: ProxyUrlGenerator::new( + config.auth_proxy_addr.clone(), + config.auth_proxy_port, + Protocol::Http, + ), + id_generator: config.id_generator(), + smartrest_use_operation_id: config.smartrest_use_operation_id, + } + } +} + +impl C8yMapperConfig { + pub fn to_operation_handler_config(&self) -> OperationHandlerConfig { + OperationHandlerConfig::from_mapper_config(self) + } +} + +#[derive(Debug)] +pub(super) struct RunningOperation { + pub(super) handle: tokio::task::JoinHandle<()>, + pub(super) status: String, } // TODO: logic of which status transitions are valid should be defined in tedge_api and be // considered together with custom statuses of custom workflows -fn is_operation_status_transition_valid(previous: &str, next: &str) -> bool { +pub fn is_operation_status_transition_valid(previous: &str, next: &str) -> bool { #[allow(clippy::match_like_matches_macro)] match (previous, next) { // not really a transition but false to make sure we're not sending multiple smartrest msgs + // FIXME: this will blow if prev and next are empty! (clearing messages) (prev, next) if prev == next => false, // successful and failed are terminal, can't change them @@ -291,13 +372,13 @@ mod tests { use std::time::Duration; - use c8y_auth_proxy::url::Protocol; use c8y_http_proxy::messages::C8YRestRequest; use c8y_http_proxy::messages::C8YRestResult; use tedge_actors::test_helpers::FakeServerBox; use tedge_actors::test_helpers::FakeServerBoxBuilder; use tedge_actors::test_helpers::MessageReceiverExt; use tedge_actors::Builder; + use tedge_actors::LoggingSender; use tedge_actors::MessageReceiver; use tedge_actors::MessageSink; use tedge_actors::Sender; @@ -836,17 +917,12 @@ mod tests { FakeServerBoxBuilder::default(); let downloader = ClientMessageBox::new(&mut downloader_builder); - let auth_proxy_addr = c8y_mapper_config.auth_proxy_addr.clone(); - let auth_proxy_port = c8y_mapper_config.auth_proxy_port; - let auth_proxy = ProxyUrlGenerator::new(auth_proxy_addr, auth_proxy_port, Protocol::Http); - let operation_handler = OperationHandler::new( - &c8y_mapper_config, + c8y_mapper_config.to_operation_handler_config(), downloader, uploader, - mqtt_publisher, + mqtt_publisher.into(), c8y_proxy, - auth_proxy, ); let mqtt = mqtt_builder.build(); diff --git a/crates/extensions/c8y_mapper_ext/src/operations/handlers/mod.rs b/crates/extensions/c8y_mapper_ext/src/operations/handlers/mod.rs index 3de054869f7..4450b1555af 100644 --- a/crates/extensions/c8y_mapper_ext/src/operations/handlers/mod.rs +++ b/crates/extensions/c8y_mapper_ext/src/operations/handlers/mod.rs @@ -31,7 +31,7 @@ use c8y_http_proxy::handle::C8YHttpProxy; use camino::Utf8Path; use std::sync::Arc; use tedge_actors::ClientMessageBox; -use tedge_actors::LoggingSender; +use tedge_actors::DynSender; use tedge_actors::Sender; use tedge_api::entity_store::EntityExternalId; use tedge_api::mqtt_topics::EntityTopicId; @@ -42,7 +42,6 @@ use tedge_api::workflow::GenericCommandState; use tedge_config::AutoLogUpload; use tedge_config::SoftwareManagementApiFlag; use tedge_mqtt_ext::MqttMessage; -use tedge_mqtt_ext::QoS; use tedge_mqtt_ext::Topic; use tracing::debug; use tracing::error; @@ -55,6 +54,7 @@ pub(super) struct OperationContext { pub(super) tmp_dir: Arc, pub(super) mqtt_schema: MqttSchema, pub(super) software_management_api: SoftwareManagementApiFlag, + pub(super) command_id: IdGenerator, pub(super) smart_rest_use_operation_id: bool, @@ -64,11 +64,33 @@ pub(super) struct OperationContext { pub(super) downloader: ClientMessageBox, pub(super) uploader: ClientMessageBox, - pub(super) mqtt_publisher: LoggingSender, + pub(super) mqtt_publisher: DynSender, } impl OperationContext { + // will be removed pub async fn update(&self, message: OperationMessage) { + let outcome = self.report(message.clone()).await; + let mut mqtt_publisher = self.mqtt_publisher.sender_clone(); + + match outcome { + OperationOutcome::Ignored => {} + OperationOutcome::Executing { extra_messages } => { + for message in extra_messages { + mqtt_publisher.send(message).await.unwrap(); + } + } + OperationOutcome::Finished { messages } => { + for message in messages { + mqtt_publisher.send(message).await.unwrap(); + } + let clearing_message = MqttMessage::new(&message.message.topic, []).with_retain(); + mqtt_publisher.send(clearing_message).await.unwrap(); + } + } + } + + pub async fn report(&self, message: OperationMessage) -> OperationOutcome { let OperationMessage { entity, cmd_id, @@ -81,7 +103,7 @@ impl OperationContext { Ok(command) => command, Err(err) => { error!(%err, ?message, "could not parse command payload"); - return; + return OperationOutcome::Ignored; } }; @@ -105,22 +127,17 @@ impl OperationContext { OperationType::SoftwareList => { let result = self.publish_software_list(&entity, &cmd_id, &message).await; - let mut mqtt_publisher = self.mqtt_publisher.clone(); match result { Err(err) => { error!("Fail to list installed software packages: {err}"); + return OperationOutcome::Finished { messages: vec![] }; } Ok(OperationOutcome::Finished { messages }) => { - for message in messages { - mqtt_publisher.send(message).await.unwrap(); - } + return OperationOutcome::Finished { messages }; } // command is not yet finished, avoid clearing the command topic - Ok(_) => return, + Ok(outcome) => return outcome, } - - clear_command_topic(command, &mut mqtt_publisher).await; - return; } OperationType::SoftwareUpdate => { self.publish_software_update_status(&entity, &cmd_id, &message) @@ -148,8 +165,6 @@ impl OperationContext { } }; - let mut mqtt_publisher = self.mqtt_publisher.clone(); - // unwrap is safe: at this point all local operations that are not regular c8y // operations should be handled above let c8y_operation = to_c8y_operation(&operation).unwrap(); @@ -160,7 +175,7 @@ impl OperationContext { &entity.smartrest_publish_topic, &cmd_id, ) { - OperationOutcome::Ignored => {} + OperationOutcome::Ignored => OperationOutcome::Ignored, OperationOutcome::Executing { mut extra_messages } => { let c8y_state_executing_payload = match self.get_operation_id(&cmd_id) { Some(op_id) if self.smart_rest_use_operation_id => { @@ -175,11 +190,12 @@ impl OperationContext { let mut messages = vec![c8y_state_executing_message]; messages.append(&mut extra_messages); - for message in messages { - mqtt_publisher.send(message).await.unwrap(); + OperationOutcome::Executing { + extra_messages: messages, } } OperationOutcome::Finished { messages } => { + // TODO(marcel): uploading logs should be pulled out if let Err(e) = self .upload_operation_log(&external_id, &cmd_id, &operation, &command) .await @@ -187,11 +203,7 @@ impl OperationContext { error!("failed to upload operation logs: {e}"); } - for message in messages { - mqtt_publisher.send(message).await.unwrap(); - } - - clear_command_topic(command, &mut mqtt_publisher).await; + OperationOutcome::Finished { messages } } } } @@ -258,18 +270,6 @@ impl OperationContext { } } -async fn clear_command_topic( - command: GenericCommandState, - mqtt_publisher: &mut LoggingSender, -) { - let command = command.clear(); - let clearing_message = command.into_message(); - assert!(clearing_message.payload_bytes().is_empty()); - assert!(clearing_message.retain); - assert_eq!(clearing_message.qos, QoS::AtLeastOnce); - mqtt_publisher.send(clearing_message).await.unwrap(); -} - /// Result of an update of operation's state. /// /// When a new MQTT message is received with an updated state of the operation, the mapper needs to diff --git a/crates/extensions/c8y_mapper_ext/src/operations/mod.rs b/crates/extensions/c8y_mapper_ext/src/operations/mod.rs index 9c59f1d56b0..97da22d6107 100644 --- a/crates/extensions/c8y_mapper_ext/src/operations/mod.rs +++ b/crates/extensions/c8y_mapper_ext/src/operations/mod.rs @@ -26,3 +26,7 @@ mod handlers; pub use handlers::EntityTarget; mod upload; + +mod actor; +mod builder; +pub use builder::OperationHandlerBuilder; diff --git a/crates/extensions/c8y_mapper_ext/src/tests.rs b/crates/extensions/c8y_mapper_ext/src/tests.rs index b224bd75dc4..77129421a87 100644 --- a/crates/extensions/c8y_mapper_ext/src/tests.rs +++ b/crates/extensions/c8y_mapper_ext/src/tests.rs @@ -9,6 +9,7 @@ use crate::actor::IdUploadResult; use crate::actor::PublishMessage; use crate::availability::AvailabilityBuilder; use crate::operations::OperationHandler; +use crate::operations::OperationHandlerBuilder; use crate::Capabilities; use assert_json_diff::assert_json_include; use c8y_api::json_c8y_deserializer::C8yDeviceControlTopic; @@ -26,7 +27,6 @@ use std::time::SystemTime; use tedge_actors::test_helpers::FakeServerBox; use tedge_actors::test_helpers::FakeServerBoxBuilder; use tedge_actors::test_helpers::MessageReceiverExt; -use tedge_actors::Actor; use tedge_actors::Builder; use tedge_actors::MessageReceiver; use tedge_actors::MessageSink; @@ -35,6 +35,7 @@ use tedge_actors::NoMessage; use tedge_actors::Sender; use tedge_actors::SimpleMessageBox; use tedge_actors::SimpleMessageBoxBuilder; +use tedge_api::mqtt_topics::ChannelFilter; use tedge_api::mqtt_topics::EntityTopicId; use tedge_api::mqtt_topics::MqttSchema; use tedge_config::AutoLogUpload; @@ -2281,21 +2282,7 @@ async fn c8y_mapper_nested_child_service_event_mapping_to_smartrest() { async fn mapper_processes_operations_concurrently() { let num_operations = 20; - let mut fts_server = mockito::Server::new(); - let _mock = fts_server - .mock( - "GET", - "/tedge/file-transfer/test-device/config_snapshot/c8y-mapper-1234", - ) - // make each download block so it doesn't complete before we submit all operations - .with_chunked_body(|_w| { - std::thread::sleep(Duration::from_secs(5)); - Ok(()) - }) - .expect(num_operations) - .create_async() - .await; - let host_port = fts_server.host_with_port(); + let host_port = "localhost:8888"; let cfg_dir = TempTedgeDir::new(); let TestHandle { @@ -2796,12 +2783,10 @@ pub(crate) async fn spawn_c8y_mapper_actor_with_config( let bridge_health_topic = config.bridge_health_topic.clone(); let mut c8y_mapper_builder = C8yMapperBuilder::try_new( - config, + config.clone(), &mut mqtt_builder, &mut c8y_proxy_builder, &mut timer_builder, - &mut uploader_builder, - &mut downloader_builder, &mut fs_watcher_builder, &mut service_monitor_builder, ) @@ -2809,12 +2794,31 @@ pub(crate) async fn spawn_c8y_mapper_actor_with_config( let mut availability_box_builder: SimpleMessageBoxBuilder = SimpleMessageBoxBuilder::new("Availability", 10); - availability_box_builder - .connect_source(AvailabilityBuilder::channels(), &mut c8y_mapper_builder); + availability_box_builder.connect_source::>( + AvailabilityBuilder::channels(), + &mut c8y_mapper_builder, + ); c8y_mapper_builder.connect_source(NoConfig, &mut availability_box_builder); - let actor = c8y_mapper_builder.build(); - tokio::spawn(async move { actor.run().await }); + let operation_handler_builder = OperationHandlerBuilder::new( + config.to_operation_handler_config(), + &mut c8y_mapper_builder, + &mut uploader_builder, + &mut downloader_builder, + &mut c8y_proxy_builder, + ); + + let mut runtime = tedge_actors::Runtime::new(); + + runtime.spawn(operation_handler_builder).await.unwrap(); + runtime.spawn(c8y_mapper_builder).await.unwrap(); + + tokio::spawn(async move { + runtime.run_to_completion().await.unwrap(); + }); + + // let c8y_mapper_actor = c8y_mapper_builder.build(); + // tokio::spawn(async move { c8y_mapper_actor.run().await }); let mut service_monitor_box = service_monitor_builder.build(); let bridge_status_msg = MqttMessage::new(&bridge_health_topic, "1"); From e4c5aa0d26571221ce196b1d674bfc2c04543d21 Mon Sep 17 00:00:00 2001 From: Marcel Guzik Date: Thu, 12 Sep 2024 20:00:20 +0200 Subject: [PATCH 2/2] Extract handling device control topic messages from converter Signed-off-by: Marcel Guzik --- crates/extensions/c8y_mapper_ext/src/lib.rs | 15 + .../src/operations/c8y_operations.rs | 406 ++++++++++++++++++ .../c8y_mapper_ext/src/operations/mod.rs | 2 + 3 files changed, 423 insertions(+) create mode 100644 crates/extensions/c8y_mapper_ext/src/operations/c8y_operations.rs diff --git a/crates/extensions/c8y_mapper_ext/src/lib.rs b/crates/extensions/c8y_mapper_ext/src/lib.rs index 73cd20e2a03..e5fc286986f 100644 --- a/crates/extensions/c8y_mapper_ext/src/lib.rs +++ b/crates/extensions/c8y_mapper_ext/src/lib.rs @@ -1,3 +1,5 @@ +use c8y_api::json_c8y_deserializer::C8yDeviceControlOperation; + pub mod actor; pub mod alarm_converter; pub mod availability; @@ -24,6 +26,19 @@ pub struct Capabilities { pub device_profile: bool, } +impl Capabilities { + pub fn is_enabled(&self, operation: &C8yDeviceControlOperation) -> bool { + match operation { + C8yDeviceControlOperation::LogfileRequest(_) => self.log_upload, + C8yDeviceControlOperation::UploadConfigFile(_) => self.config_snapshot, + C8yDeviceControlOperation::DownloadConfigFile(_) => self.config_update, + C8yDeviceControlOperation::Firmware(_) => self.firmware_update, + C8yDeviceControlOperation::DeviceProfile(_) => self.device_profile, + _ => true, + } + } +} + #[cfg(test)] impl Default for Capabilities { fn default() -> Self { diff --git a/crates/extensions/c8y_mapper_ext/src/operations/c8y_operations.rs b/crates/extensions/c8y_mapper_ext/src/operations/c8y_operations.rs new file mode 100644 index 00000000000..8dd233e9973 --- /dev/null +++ b/crates/extensions/c8y_mapper_ext/src/operations/c8y_operations.rs @@ -0,0 +1,406 @@ +//! Keeps track of c8y operations received from the cloud and responds to them. + +use std::{collections::HashMap, sync::Arc}; + +use c8y_api::{ + http_proxy::C8yEndPoint, + json_c8y_deserializer::{ + C8yDeviceControlOperation, C8yDeviceProfile, C8yDownloadConfigFile, C8yFirmware, + C8yLogfileRequest, C8yOperation, C8yRestart, C8ySoftwareUpdate, C8yUploadConfigFile, + }, +}; +use c8y_auth_proxy::url::ProxyUrlGenerator; +use tedge_api::{ + commands::{ + ConfigSnapshotCmdPayload, ConfigUpdateCmdPayload, FirmwareUpdateCmdPayload, + LogUploadCmdPayload, + }, + device_profile::DeviceProfileCmdPayload, + entity_store::EntityMetadata, + mqtt_topics::{Channel, EntityTopicId, IdGenerator, MqttSchema, OperationType}, + CommandStatus, DownloadInfo, Jsonify, RestartCommand, +}; +use tedge_mqtt_ext::MqttMessage; +use tracing::{error, warn}; + +use crate::{ + error::{ConversionError, CumulocityMapperError}, + Capabilities, +}; + +type OperationId = Arc; + +#[derive(Debug, Clone)] +struct C8yOperations { + active_c8y_operations: HashMap, + capabilities: Capabilities, + xid_to_metadata: HashMap, EntityMetadata>, + mqtt_schema: MqttSchema, + command_id: IdGenerator, + c8y_endpoint: C8yEndPoint, + auth_proxy: ProxyUrlGenerator, + tedge_http_host: Arc, +} + +impl C8yOperations { + pub fn new(capabilities: Capabilities) -> Self { + Self { + active_c8y_operations: HashMap::new(), + capabilities, + xid_to_metadata: HashMap::new(), + mqtt_schema: MqttSchema::new(), + command_id: IdGenerator::new("peniz"), + c8y_endpoint: C8yEndPoint::new("peniz", "peniz", "peniz"), + auth_proxy: ProxyUrlGenerator::new( + "peniz".into(), + 2137, + c8y_auth_proxy::url::Protocol::Http, + ), + tedge_http_host: Arc::from("peniz"), + } + } + + pub fn register(&mut self, operation: C8yOperation) { + let entity_xid = &operation.external_source.external_id; + + let c8y_device_control_operation = + C8yDeviceControlOperation::from_json_object(&operation.extras).unwrap(); + + if !self.capabilities.is_enabled(&c8y_device_control_operation) { + warn!("Received an operation which is disabled in configuration"); + return; + } + + let entity_metadata = self.xid_to_metadata.get(entity_xid.as_str()).unwrap(); + let cmd_id = self.command_id.new_id(); + + let msgs = match c8y_device_control_operation { + C8yDeviceControlOperation::Restart(request) => self + .forward_restart_request(entity_metadata, cmd_id) + .unwrap(), + C8yDeviceControlOperation::SoftwareUpdate(request) => self + .forward_software_request(entity_metadata, cmd_id, request) + .unwrap(), + C8yDeviceControlOperation::LogfileRequest(request) => self + .convert_log_upload_request(entity_metadata, cmd_id, request) + .unwrap(), + C8yDeviceControlOperation::UploadConfigFile(request) => self + .convert_config_snapshot_request(entity_metadata, cmd_id, request) + .unwrap(), + C8yDeviceControlOperation::DownloadConfigFile(request) => self + .convert_config_update_request(entity_metadata, cmd_id, request) + .unwrap(), + C8yDeviceControlOperation::Firmware(request) => self + .convert_firmware_update_request(entity_metadata, cmd_id, request) + .unwrap(), + C8yDeviceControlOperation::DeviceProfile(request) => { + if let Some(profile_name) = operation.extras.get("profileName") { + self.convert_device_profile_request( + entity_metadata, + cmd_id, + request, + serde_json::from_value(profile_name.clone()).unwrap(), + ) + .unwrap() + } else { + error!("Received a c8y_DeviceProfile without a profile name"); + vec![] + } + } + C8yDeviceControlOperation::Custom => { + // Ignores custom and static template operations unsupported by thin-edge + // However, these operations can be addressed by SmartREST that is published together with JSON over MQTT + vec![] + } + }; + + let id = Arc::from(operation.op_id.as_str()); + self.active_c8y_operations.insert(id, operation); + + // send local MQTT command + } + + fn forward_restart_request( + &self, + entity: &EntityMetadata, + cmd_id: String, + ) -> Result, CumulocityMapperError> { + let command = RestartCommand::new(&entity.topic_id, cmd_id); + let message = command.command_message(&self.mqtt_schema); + Ok(vec![message]) + } + + fn forward_software_request( + &self, + entity: &EntityMetadata, + cmd_id: String, + software_update_request: C8ySoftwareUpdate, + ) -> Result, CumulocityMapperError> { + let mut command = + software_update_request.into_software_update_command(&entity.topic_id, cmd_id)?; + + command.payload.update_list.iter_mut().for_each(|modules| { + modules.modules.iter_mut().for_each(|module| { + if let Some(url) = &mut module.url { + *url = if let Some(cumulocity_url) = + self.c8y_endpoint.maybe_tenant_url(url.url()) + { + DownloadInfo::new(self.auth_proxy.proxy_url(cumulocity_url).as_ref()) + } else { + DownloadInfo::new(url.url()) + }; + } + }); + }); + + let message = command.command_message(&self.mqtt_schema); + Ok(vec![message]) + } + + /// Convert c8y_UploadConfigFile JSON over MQTT operation to ThinEdge config_snapshot command + pub fn convert_config_snapshot_request( + &self, + entity: &EntityMetadata, + cmd_id: String, + config_upload_request: C8yUploadConfigFile, + ) -> Result, CumulocityMapperError> { + let channel = Channel::Command { + operation: OperationType::ConfigSnapshot, + cmd_id: cmd_id.clone(), + }; + let topic = self.mqtt_schema.topic_for(&entity.topic_id, &channel); + + // Replace '/' with ':' to avoid creating unexpected directories in file transfer repo + let tedge_url = format!( + "http://{}/tedge/file-transfer/{}/config_snapshot/{}-{}", + &self.tedge_http_host, + entity.external_id.as_ref(), + config_upload_request.config_type.replace('/', ":"), + cmd_id + ); + + let request = ConfigSnapshotCmdPayload { + status: CommandStatus::Init, + tedge_url: Some(tedge_url), + config_type: config_upload_request.config_type, + path: None, + log_path: None, + }; + + // Command messages must be retained + Ok(vec![ + MqttMessage::new(&topic, request.to_json()).with_retain() + ]) + } + + /// Convert c8y_LogfileRequest operation to a ThinEdge log_upload command + pub fn convert_log_upload_request( + &self, + entity: &EntityMetadata, + cmd_id: String, + log_request: C8yLogfileRequest, + ) -> Result, CumulocityMapperError> { + let channel = Channel::Command { + operation: OperationType::LogUpload, + cmd_id: cmd_id.clone(), + }; + let topic = self.mqtt_schema.topic_for(&entity.topic_id, &channel); + + let tedge_url = format!( + "http://{}/tedge/file-transfer/{}/log_upload/{}-{}", + &self.tedge_http_host, + entity.external_id.as_ref(), + log_request.log_file, + cmd_id + ); + + let request = LogUploadCmdPayload { + status: CommandStatus::Init, + tedge_url, + log_type: log_request.log_file, + date_from: log_request.date_from, + date_to: log_request.date_to, + search_text: Some(log_request.search_text).filter(|s| !s.is_empty()), + lines: log_request.maximum_lines, + log_path: None, + }; + + // Command messages must be retained + Ok(vec![ + MqttMessage::new(&topic, request.to_json()).with_retain() + ]) + } + + /// Convert c8y_Firmware JSON over MQTT operation to ThinEdge firmware_update command. + pub fn convert_firmware_update_request( + &self, + entity: &EntityMetadata, + cmd_id: String, + firmware_request: C8yFirmware, + ) -> Result, CumulocityMapperError> { + let channel = Channel::Command { + operation: OperationType::FirmwareUpdate, + cmd_id, + }; + let topic = self.mqtt_schema.topic_for(&entity.topic_id, &channel); + + let tedge_url = + if let Some(c8y_url) = self.c8y_endpoint.maybe_tenant_url(&firmware_request.url) { + self.auth_proxy.proxy_url(c8y_url).to_string() + } else { + firmware_request.url.clone() + }; + + let request = FirmwareUpdateCmdPayload { + status: CommandStatus::Init, + tedge_url: Some(tedge_url), + remote_url: firmware_request.url, + name: firmware_request.name, + version: firmware_request.version, + log_path: None, + }; + + // Command messages must be retained + Ok(vec![ + MqttMessage::new(&topic, request.to_json()).with_retain() + ]) + } + + /// Upon receiving a SmartREST c8y_DownloadConfigFile request, convert it to a message on the + /// command channel. + pub fn convert_config_update_request( + &self, + entity: &EntityMetadata, + cmd_id: String, + config_download_request: C8yDownloadConfigFile, + ) -> Result, CumulocityMapperError> { + let config_download_request: &C8yDownloadConfigFile = &config_download_request; + let channel = Channel::Command { + operation: OperationType::ConfigUpdate, + cmd_id: cmd_id.to_string(), + }; + let topic = self.mqtt_schema.topic_for(&entity.topic_id, &channel); + + let proxy_url = self + .c8y_endpoint + .maybe_tenant_url(&config_download_request.url) + .map(|cumulocity_url| self.auth_proxy.proxy_url(cumulocity_url).into()); + + let remote_url = proxy_url.unwrap_or(config_download_request.url.to_string()); + + let request = ConfigUpdateCmdPayload { + status: CommandStatus::Init, + tedge_url: None, + remote_url, + config_type: config_download_request.config_type.clone(), + path: None, + log_path: None, + }; + + // Command messages must be retained + let messages = vec![MqttMessage::new(&topic, request.to_json()).with_retain()]; + Ok(messages) + } + + /// Convert c8y_DeviceProfile JSON over MQTT operation to ThinEdge device_profile command. + pub fn convert_device_profile_request( + &self, + entity: &EntityMetadata, + cmd_id: String, + device_profile_request: C8yDeviceProfile, + profile_name: String, + ) -> Result, CumulocityMapperError> { + let channel = Channel::Command { + operation: OperationType::DeviceProfile, + cmd_id, + }; + let topic = self.mqtt_schema.topic_for(&entity.topic_id, &channel); + + let mut request = DeviceProfileCmdPayload { + status: CommandStatus::Init, + name: profile_name, + operations: Vec::new(), + }; + + if let Some(mut firmware) = device_profile_request.firmware { + if let Some(cumulocity_url) = self.c8y_endpoint.maybe_tenant_url(&firmware.url) { + firmware.url = self.auth_proxy.proxy_url(cumulocity_url).into(); + } + request.add_firmware(firmware.into()); + } + + if let Some(mut software) = device_profile_request.software { + software.lists.iter_mut().for_each(|module| { + if let Some(url) = &mut module.url { + if let Some(cumulocity_url) = self.c8y_endpoint.maybe_tenant_url(url) { + *url = self.auth_proxy.proxy_url(cumulocity_url).into(); + } + } + }); + request.add_software(software.try_into()?); + } + + for mut config in device_profile_request.configuration { + if let Some(cumulocity_url) = self.c8y_endpoint.maybe_tenant_url(&config.url) { + config.url = self.auth_proxy.proxy_url(cumulocity_url).into(); + } + request.add_config(config.into()); + } + + // Command messages must be retained + Ok(vec![ + MqttMessage::new(&topic, request.to_json()).with_retain() + ]) + } +} + +/// Converts C8y operations into local MQTT commands +trait IntoCommand { + fn into_command( + self, + topic_id: &EntityTopicId, + cmd_id: String, + mqtt_schema: &MqttSchema, + ) -> Result, anyhow::Error>; +} + +impl IntoCommand for C8yRestart { + fn into_command( + self, + topic_id: &EntityTopicId, + cmd_id: String, + mqtt_schema: &MqttSchema, + ) -> Result, anyhow::Error> { + let command = RestartCommand::new(topic_id, cmd_id); + let message = command.command_message(mqtt_schema); + Ok(vec![message]) + } +} + +// impl IntoCommand for C8ySoftwareUpdate { +// fn into_command( +// self, +// topic_id: &EntityTopicId, +// cmd_id: String, +// mqtt_schema: &MqttSchema, +// ) -> Result, anyhow::Error> { +// let mut command = self.into_software_update_command(topic_id, cmd_id)?; + +// command.payload.update_list.iter_mut().for_each(|modules| { +// modules.modules.iter_mut().for_each(|module| { +// if let Some(url) = &mut module.url { +// *url = if let Some(cumulocity_url) = +// self.c8y_endpoint.maybe_tenant_url(url.url()) +// { +// DownloadInfo::new(self.auth_proxy.proxy_url(cumulocity_url).as_ref()) +// } else { +// DownloadInfo::new(url.url()) +// }; +// } +// }); +// }); + +// let message = command.command_message(&self.mqtt_schema); +// Ok(vec![message]) +// } +// } diff --git a/crates/extensions/c8y_mapper_ext/src/operations/mod.rs b/crates/extensions/c8y_mapper_ext/src/operations/mod.rs index 97da22d6107..d77006167a6 100644 --- a/crates/extensions/c8y_mapper_ext/src/operations/mod.rs +++ b/crates/extensions/c8y_mapper_ext/src/operations/mod.rs @@ -30,3 +30,5 @@ mod upload; mod actor; mod builder; pub use builder::OperationHandlerBuilder; + +mod c8y_operations;