Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: add OperationHandlerActor #3032

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 11 additions & 3 deletions crates/core/tedge_mapper/src/c8y/mapper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -212,17 +212,24 @@ impl TEdgeComponent for CumulocityMapper {
)?);

let mut c8y_mapper_actor = C8yMapperBuilder::try_new(
c8y_mapper_config,
c8y_mapper_config.clone(),
&mut mqtt_actor,
&mut c8y_http_proxy_actor,
&mut timer_actor,
&mut uploader_actor,
&mut downloader_actor,
&mut fs_watch_actor,
&mut service_monitor_actor,
)?;

let c8y_prefix = &tedge_config.c8y.bridge.topic_prefix;

let operation_handler_actor = c8y_mapper_ext::operations::OperationHandlerBuilder::new(
c8y_mapper_config.to_operation_handler_config(),
&mut c8y_mapper_actor,
&mut uploader_actor,
&mut downloader_actor,
&mut c8y_http_proxy_actor,
);

// Adaptor translating commands sent on te/device/main///cmd/+/+ into requests on tedge/commands/req/+/+
// and translating the responses received on tedge/commands/res/+/+ to te/device/main///cmd/+/+
let old_to_new_agent_adapter = OldAgentAdapter::builder(c8y_prefix, &mut mqtt_actor);
Expand Down Expand Up @@ -252,6 +259,7 @@ impl TEdgeComponent for CumulocityMapper {
if let Some(availability_actor) = availability_actor {
runtime.spawn(availability_actor).await?;
}
runtime.spawn(operation_handler_actor).await?;
runtime.run_to_completion().await?;

Ok(())
Expand Down
1 change: 0 additions & 1 deletion crates/extensions/c8y_mapper_ext/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@ url = { workspace = true }
[dev-dependencies]
assert-json-diff = { workspace = true }
assert_matches = { workspace = true }
mockito = { workspace = true }
proptest = { workspace = true }
rand = { workspace = true }
tedge_actors = { workspace = true, features = ["test-helpers"] }
Expand Down
61 changes: 49 additions & 12 deletions crates/extensions/c8y_mapper_ext/src/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ use std::time::Duration;
use tedge_actors::fan_in_message_type;
use tedge_actors::Actor;
use tedge_actors::Builder;
use tedge_actors::ClientMessageBox;
use tedge_actors::CloneSender;
use tedge_actors::DynSender;
use tedge_actors::LoggingSender;
Expand All @@ -27,6 +26,7 @@ use tedge_actors::Sender;
use tedge_actors::Service;
use tedge_actors::SimpleMessageBox;
use tedge_actors::SimpleMessageBoxBuilder;
use tedge_api::entity_store::EntityMetadata;
use tedge_api::entity_store::EntityRegistrationMessage;
use tedge_api::mqtt_topics::Channel;
use tedge_api::mqtt_topics::ChannelFilter;
Expand Down Expand Up @@ -80,6 +80,9 @@ pub struct C8yMapperActor {
timer_sender: LoggingSender<SyncStart>,
bridge_status_messages: SimpleMessageBox<MqttMessage, MqttMessage>,
message_handlers: HashMap<ChannelFilter, Vec<LoggingSender<MqttMessage>>>,
// these handlers require entity metadata, so the entity already has to be registered
registered_message_handlers:
HashMap<ChannelFilter, Vec<LoggingSender<(MqttMessage, EntityMetadata)>>>,
}

#[async_trait]
Expand Down Expand Up @@ -140,6 +143,10 @@ impl C8yMapperActor {
timer_sender: LoggingSender<SyncStart>,
bridge_status_messages: SimpleMessageBox<MqttMessage, MqttMessage>,
message_handlers: HashMap<ChannelFilter, Vec<LoggingSender<MqttMessage>>>,
registered_message_handlers: HashMap<
ChannelFilter,
Vec<LoggingSender<(MqttMessage, EntityMetadata)>>,
>,
) -> Self {
Self {
converter,
Expand All @@ -148,6 +155,7 @@ impl C8yMapperActor {
timer_sender,
bridge_status_messages,
message_handlers,
registered_message_handlers,
}
}

Expand Down Expand Up @@ -267,6 +275,25 @@ impl C8yMapperActor {
sender.send(message.clone()).await?;
}
}

if let Some(message_handler) = self.registered_message_handlers.get_mut(&channel.into()) {
let (entity, _) = self
.converter
.mqtt_schema
.entity_channel_of(&message.topic)
.expect("message should've been confirmed to be using MQTT topic scheme v1");

let entity = self
.converter
.entity_store
.get(&entity)
.expect("entity should've already been registered");

for sender in message_handler {
sender.send((message.clone(), entity.clone())).await?;
}
}

Ok(())
}

Expand Down Expand Up @@ -331,11 +358,11 @@ pub struct C8yMapperBuilder {
mqtt_publisher: DynSender<MqttMessage>,
http_proxy: C8YHttpProxy,
timer_sender: DynSender<SyncStart>,
downloader: ClientMessageBox<IdDownloadRequest, IdDownloadResult>,
uploader: ClientMessageBox<IdUploadRequest, IdUploadResult>,
auth_proxy: ProxyUrlGenerator,
bridge_monitor_builder: SimpleMessageBoxBuilder<MqttMessage, MqttMessage>,
message_handlers: HashMap<ChannelFilter, Vec<LoggingSender<MqttMessage>>>,
registered_message_handlers:
HashMap<ChannelFilter, Vec<LoggingSender<(MqttMessage, EntityMetadata)>>>,
}

impl C8yMapperBuilder {
Expand All @@ -345,8 +372,6 @@ impl C8yMapperBuilder {
mqtt: &mut (impl MessageSource<MqttMessage, TopicFilter> + MessageSink<MqttMessage>),
http: &mut impl Service<C8YRestRequest, C8YRestResult>,
timer: &mut impl Service<SyncStart, SyncComplete>,
uploader: &mut impl Service<IdUploadRequest, IdUploadResult>,
downloader: &mut impl Service<IdDownloadRequest, IdDownloadResult>,
fs_watcher: &mut impl MessageSource<FsWatchEvent, PathBuf>,
service_monitor: &mut (impl MessageSource<MqttMessage, TopicFilter> + MessageSink<MqttMessage>),
) -> Result<Self, FileError> {
Expand All @@ -360,9 +385,6 @@ impl C8yMapperBuilder {
let http_proxy = C8YHttpProxy::new(http);
let timer_sender = timer.connect_client(box_builder.get_sender().sender_clone());

let downloader = ClientMessageBox::new(downloader);
let uploader = ClientMessageBox::new(uploader);

fs_watcher.connect_sink(
config.ops_dir.as_std_path().to_path_buf(),
&box_builder.get_sender(),
Expand All @@ -382,18 +404,18 @@ impl C8yMapperBuilder {
);

let message_handlers = HashMap::new();
let registered_message_handlers = HashMap::new();

Ok(Self {
config,
box_builder,
mqtt_publisher,
http_proxy,
timer_sender,
uploader,
downloader,
auth_proxy,
bridge_monitor_builder,
message_handlers,
registered_message_handlers,
})
}

Expand Down Expand Up @@ -426,6 +448,22 @@ impl MessageSource<MqttMessage, Vec<ChannelFilter>> for C8yMapperBuilder {
}
}

impl MessageSource<(MqttMessage, EntityMetadata), Vec<ChannelFilter>> for C8yMapperBuilder {
fn connect_sink(
&mut self,
config: Vec<ChannelFilter>,
peer: &impl MessageSink<(MqttMessage, EntityMetadata)>,
) {
let sender = LoggingSender::new("Mapper MQTT".into(), peer.get_sender());
for channel in config {
self.registered_message_handlers
.entry(channel)
.or_default()
.push(sender.clone());
}
}
}

impl MessageSink<PublishMessage> for C8yMapperBuilder {
fn get_sender(&self) -> DynSender<PublishMessage> {
self.box_builder.get_sender().sender_clone()
Expand All @@ -444,8 +482,6 @@ impl Builder<C8yMapperActor> for C8yMapperBuilder {
mqtt_publisher.clone(),
self.http_proxy,
self.auth_proxy,
self.uploader,
self.downloader,
)
.map_err(|err| RuntimeError::ActorError(Box::new(err)))?;

Expand All @@ -459,6 +495,7 @@ impl Builder<C8yMapperActor> for C8yMapperBuilder {
timer_sender,
bridge_monitor_box,
self.message_handlers,
self.registered_message_handlers,
))
}
}
1 change: 1 addition & 0 deletions crates/extensions/c8y_mapper_ext/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ const STATE_DIR_NAME: &str = ".tedge-mapper-c8y";
const C8Y_CLOUD: &str = "c8y";
const SUPPORTED_OPERATIONS_DIRECTORY: &str = "operations";

#[derive(Clone)]
pub struct C8yMapperConfig {
pub device_id: String,
pub device_topic_id: EntityTopicId,
Expand Down
55 changes: 2 additions & 53 deletions crates/extensions/c8y_mapper_ext/src/converter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,9 @@ use super::config::MQTT_MESSAGE_SIZE_THRESHOLD;
use super::error::CumulocityMapperError;
use super::service_monitor;
use crate::actor::CmdId;
use crate::actor::IdDownloadRequest;
use crate::actor::IdDownloadResult;
use crate::dynamic_discovery::DiscoverOp;
use crate::error::ConversionError;
use crate::json;
use crate::operations;
use crate::operations::OperationHandler;
use anyhow::anyhow;
use anyhow::Context;
use c8y_api::http_proxy::C8yEndPoint;
Expand Down Expand Up @@ -58,7 +54,6 @@ use std::fs;
use std::path::Path;
use std::path::PathBuf;
use std::sync::Arc;
use tedge_actors::ClientMessageBox;
use tedge_actors::LoggingSender;
use tedge_actors::Sender;
use tedge_api::commands::RestartCommand;
Expand Down Expand Up @@ -86,8 +81,6 @@ use tedge_config::TEdgeConfigError;
use tedge_config::TopicPrefix;
use tedge_mqtt_ext::MqttMessage;
use tedge_mqtt_ext::Topic;
use tedge_uploader_ext::UploadRequest;
use tedge_uploader_ext::UploadResult;
use tedge_utils::file::create_directory_with_defaults;
use tedge_utils::file::create_file_with_defaults;
use tedge_utils::file::FileError;
Expand Down Expand Up @@ -186,8 +179,6 @@ pub struct CumulocityConverter {
pub command_id: IdGenerator,
// Keep active command IDs to avoid creation of multiple commands for an operation
pub active_commands: HashSet<CmdId>,

pub operation_handler: OperationHandler,
}

impl CumulocityConverter {
Expand All @@ -196,8 +187,6 @@ impl CumulocityConverter {
mqtt_publisher: LoggingSender<MqttMessage>,
http_proxy: C8YHttpProxy,
auth_proxy: ProxyUrlGenerator,
uploader: ClientMessageBox<(String, UploadRequest), (String, UploadResult)>,
downloader: ClientMessageBox<IdDownloadRequest, IdDownloadResult>,
) -> Result<Self, CumulocityConverterBuildError> {
let device_id = config.device_id.clone();
let device_topic_id = config.device_topic_id.clone();
Expand Down Expand Up @@ -247,15 +236,6 @@ impl CumulocityConverter {

let command_id = config.id_generator();

let operation_handler = OperationHandler::new(
&config,
downloader,
uploader,
mqtt_publisher.clone(),
http_proxy.clone(),
auth_proxy.clone(),
);

Ok(CumulocityConverter {
size_threshold,
config: Arc::new(config),
Expand All @@ -276,7 +256,6 @@ impl CumulocityConverter {
auth_proxy,
command_id,
active_commands: HashSet::new(),
operation_handler,
})
}

Expand Down Expand Up @@ -1186,16 +1165,6 @@ impl CumulocityConverter {
Channel::Command { cmd_id, .. } if self.command_id.is_generator_of(cmd_id) => {
self.active_commands.insert(cmd_id.clone());

let entity = self.entity_store.try_get(&source)?;
let external_id = entity.external_id.clone();
let entity = operations::EntityTarget {
topic_id: entity.topic_id.clone(),
external_id: external_id.clone(),
smartrest_publish_topic: self
.smartrest_publish_topic_for_entity(&entity.topic_id)?,
};

self.operation_handler.handle(entity, message.clone()).await;
Ok(vec![])
}

Expand Down Expand Up @@ -1514,10 +1483,6 @@ pub fn get_local_child_devices_list(path: &Path) -> Result<HashSet<String>, Cumu
#[cfg(test)]
pub(crate) mod tests {
use super::CumulocityConverter;
use crate::actor::IdDownloadRequest;
use crate::actor::IdDownloadResult;
use crate::actor::IdUploadRequest;
use crate::actor::IdUploadResult;
use crate::config::C8yMapperConfig;
use crate::Capabilities;
use anyhow::Result;
Expand All @@ -1538,7 +1503,6 @@ pub(crate) mod tests {
use tedge_actors::test_helpers::FakeServerBox;
use tedge_actors::test_helpers::FakeServerBoxBuilder;
use tedge_actors::Builder;
use tedge_actors::ClientMessageBox;
use tedge_actors::CloneSender;
use tedge_actors::LoggingSender;
use tedge_actors::MessageReceiver;
Expand Down Expand Up @@ -3145,23 +3109,8 @@ pub(crate) mod tests {
let auth_proxy_port = config.auth_proxy_port;
let auth_proxy = ProxyUrlGenerator::new(auth_proxy_addr, auth_proxy_port, Protocol::Http);

let mut uploader_builder: FakeServerBoxBuilder<IdUploadRequest, IdUploadResult> =
FakeServerBox::builder();
let uploader = ClientMessageBox::new(&mut uploader_builder);

let mut downloader_builder: FakeServerBoxBuilder<IdDownloadRequest, IdDownloadResult> =
FakeServerBox::builder();
let downloader = ClientMessageBox::new(&mut downloader_builder);

let converter = CumulocityConverter::new(
config,
mqtt_publisher,
http_proxy,
auth_proxy,
uploader,
downloader,
)
.unwrap();
let converter =
CumulocityConverter::new(config, mqtt_publisher, http_proxy, auth_proxy).unwrap();

(converter, c8y_proxy_builder.build())
}
Expand Down
17 changes: 16 additions & 1 deletion crates/extensions/c8y_mapper_ext/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use c8y_api::json_c8y_deserializer::C8yDeviceControlOperation;

pub mod actor;
pub mod alarm_converter;
pub mod availability;
Expand All @@ -9,7 +11,7 @@ pub mod error;
mod fragments;
mod inventory;
pub mod json;
mod operations;
pub mod operations;
mod serializer;
pub mod service_monitor;
#[cfg(test)]
Expand All @@ -24,6 +26,19 @@ pub struct Capabilities {
pub device_profile: bool,
}

impl Capabilities {
pub fn is_enabled(&self, operation: &C8yDeviceControlOperation) -> bool {
match operation {
C8yDeviceControlOperation::LogfileRequest(_) => self.log_upload,
C8yDeviceControlOperation::UploadConfigFile(_) => self.config_snapshot,
C8yDeviceControlOperation::DownloadConfigFile(_) => self.config_update,
C8yDeviceControlOperation::Firmware(_) => self.firmware_update,
C8yDeviceControlOperation::DeviceProfile(_) => self.device_profile,
_ => true,
}
}
}

#[cfg(test)]
impl Default for Capabilities {
fn default() -> Self {
Expand Down
Loading
Loading