Skip to content

Commit

Permalink
add OperationHandlerActor
Browse files Browse the repository at this point in the history
A preliminary OperationHandlerActor which still uses OperationHandler
underneath, but receives command messages using the actor runtime.

This initial implementation shows the limitations of the entity store,
which is now a part of CumulocityConverter, and other actors don't have
any access to it. Because OperationHandlerActor needs some information
about a registered entity (smartrest publish topic), I have added
another `MessageSource<(MqttMessage, EntityMetadata)>` impl to the
`C8yMapperActor`, but this obviously very hacky.

When moving more things out of CumulocityConverter into other actors,
we'll need some organized way to access entity metadata (and entity
store) from other actors. First thing that comes to mind is making an
`EntityStoreActor` that manages the entity store and allows queries and
updates to the store via incoming messages.

But we probably shouldn't allow all actors to modify the entity store
freely, and we'll need to make sure what to do when entities are deleted
or updated.

Signed-off-by: Marcel Guzik <[email protected]>
  • Loading branch information
Bravo555 committed Aug 16, 2024
1 parent bc45ad1 commit 2953d56
Show file tree
Hide file tree
Showing 11 changed files with 706 additions and 158 deletions.
14 changes: 11 additions & 3 deletions crates/core/tedge_mapper/src/c8y/mapper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -214,17 +214,24 @@ impl TEdgeComponent for CumulocityMapper {
MqttActorBuilder::new(service_monitor_client_config(&tedge_config)?);

let mut c8y_mapper_actor = C8yMapperBuilder::try_new(
c8y_mapper_config,
c8y_mapper_config.clone(),
&mut mqtt_actor,
&mut c8y_http_proxy_actor,
&mut timer_actor,
&mut uploader_actor,
&mut downloader_actor,
&mut fs_watch_actor,
&mut service_monitor_actor,
)?;

let c8y_prefix = &tedge_config.c8y.bridge.topic_prefix;

let operation_handler_actor = c8y_mapper_ext::operations::OperationHandlerBuilder::new(
c8y_mapper_config.to_operation_handler_config(),
&mut c8y_mapper_actor,
&mut uploader_actor,
&mut downloader_actor,
&mut c8y_http_proxy_actor,
);

// Adaptor translating commands sent on te/device/main///cmd/+/+ into requests on tedge/commands/req/+/+
// and translating the responses received on tedge/commands/res/+/+ to te/device/main///cmd/+/+
let old_to_new_agent_adapter = OldAgentAdapter::builder(c8y_prefix, &mut mqtt_actor);
Expand Down Expand Up @@ -254,6 +261,7 @@ impl TEdgeComponent for CumulocityMapper {
if let Some(availability_actor) = availability_actor {
runtime.spawn(availability_actor).await?;
}
runtime.spawn(operation_handler_actor).await?;
runtime.run_to_completion().await?;

Ok(())
Expand Down
61 changes: 49 additions & 12 deletions crates/extensions/c8y_mapper_ext/src/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ use std::time::Duration;
use tedge_actors::fan_in_message_type;
use tedge_actors::Actor;
use tedge_actors::Builder;
use tedge_actors::ClientMessageBox;
use tedge_actors::CloneSender;
use tedge_actors::DynSender;
use tedge_actors::LoggingSender;
Expand All @@ -27,6 +26,7 @@ use tedge_actors::Sender;
use tedge_actors::Service;
use tedge_actors::SimpleMessageBox;
use tedge_actors::SimpleMessageBoxBuilder;
use tedge_api::entity_store::EntityMetadata;
use tedge_api::entity_store::EntityRegistrationMessage;
use tedge_api::mqtt_topics::Channel;
use tedge_api::mqtt_topics::ChannelFilter;
Expand Down Expand Up @@ -80,6 +80,9 @@ pub struct C8yMapperActor {
timer_sender: LoggingSender<SyncStart>,
bridge_status_messages: SimpleMessageBox<MqttMessage, MqttMessage>,
message_handlers: HashMap<ChannelFilter, Vec<LoggingSender<MqttMessage>>>,
// these handlers require entity metadata, so the entity already has to be registered
registered_message_handlers:
HashMap<ChannelFilter, Vec<LoggingSender<(MqttMessage, EntityMetadata)>>>,
}

#[async_trait]
Expand Down Expand Up @@ -140,6 +143,10 @@ impl C8yMapperActor {
timer_sender: LoggingSender<SyncStart>,
bridge_status_messages: SimpleMessageBox<MqttMessage, MqttMessage>,
message_handlers: HashMap<ChannelFilter, Vec<LoggingSender<MqttMessage>>>,
registered_message_handlers: HashMap<
ChannelFilter,
Vec<LoggingSender<(MqttMessage, EntityMetadata)>>,
>,
) -> Self {
Self {
converter,
Expand All @@ -148,6 +155,7 @@ impl C8yMapperActor {
timer_sender,
bridge_status_messages,
message_handlers,
registered_message_handlers,
}
}

Expand Down Expand Up @@ -267,6 +275,25 @@ impl C8yMapperActor {
sender.send(message.clone()).await?;
}
}

if let Some(message_handler) = self.registered_message_handlers.get_mut(&channel.into()) {
let (entity, _) = self
.converter
.mqtt_schema
.entity_channel_of(&message.topic)
.expect("message should've been confirmed to be using MQTT topic scheme v1");

let entity = self
.converter
.entity_store
.get(&entity)
.expect("entity should've already been registered");

for sender in message_handler {
sender.send((message.clone(), entity.clone())).await?;
}
}

Ok(())
}

Expand Down Expand Up @@ -331,11 +358,11 @@ pub struct C8yMapperBuilder {
mqtt_publisher: DynSender<MqttMessage>,
http_proxy: C8YHttpProxy,
timer_sender: DynSender<SyncStart>,
downloader: ClientMessageBox<IdDownloadRequest, IdDownloadResult>,
uploader: ClientMessageBox<IdUploadRequest, IdUploadResult>,
auth_proxy: ProxyUrlGenerator,
bridge_monitor_builder: SimpleMessageBoxBuilder<MqttMessage, MqttMessage>,
message_handlers: HashMap<ChannelFilter, Vec<LoggingSender<MqttMessage>>>,
registered_message_handlers:
HashMap<ChannelFilter, Vec<LoggingSender<(MqttMessage, EntityMetadata)>>>,
}

impl C8yMapperBuilder {
Expand All @@ -345,8 +372,6 @@ impl C8yMapperBuilder {
mqtt: &mut (impl MessageSource<MqttMessage, TopicFilter> + MessageSink<MqttMessage>),
http: &mut impl Service<C8YRestRequest, C8YRestResult>,
timer: &mut impl Service<SyncStart, SyncComplete>,
uploader: &mut impl Service<IdUploadRequest, IdUploadResult>,
downloader: &mut impl Service<IdDownloadRequest, IdDownloadResult>,
fs_watcher: &mut impl MessageSource<FsWatchEvent, PathBuf>,
service_monitor: &mut (impl MessageSource<MqttMessage, TopicFilter> + MessageSink<MqttMessage>),
) -> Result<Self, FileError> {
Expand All @@ -360,9 +385,6 @@ impl C8yMapperBuilder {
let http_proxy = C8YHttpProxy::new(http);
let timer_sender = timer.connect_client(box_builder.get_sender().sender_clone());

let downloader = ClientMessageBox::new(downloader);
let uploader = ClientMessageBox::new(uploader);

fs_watcher.connect_sink(
config.ops_dir.as_std_path().to_path_buf(),
&box_builder.get_sender(),
Expand All @@ -382,18 +404,18 @@ impl C8yMapperBuilder {
);

let message_handlers = HashMap::new();
let registered_message_handlers = HashMap::new();

Ok(Self {
config,
box_builder,
mqtt_publisher,
http_proxy,
timer_sender,
uploader,
downloader,
auth_proxy,
bridge_monitor_builder,
message_handlers,
registered_message_handlers,
})
}

Expand Down Expand Up @@ -426,6 +448,22 @@ impl MessageSource<MqttMessage, Vec<ChannelFilter>> for C8yMapperBuilder {
}
}

impl MessageSource<(MqttMessage, EntityMetadata), Vec<ChannelFilter>> for C8yMapperBuilder {
fn connect_sink(
&mut self,
config: Vec<ChannelFilter>,
peer: &impl MessageSink<(MqttMessage, EntityMetadata)>,
) {
let sender = LoggingSender::new("Mapper MQTT".into(), peer.get_sender());
for channel in config {
self.registered_message_handlers
.entry(channel)
.or_default()
.push(sender.clone());
}
}
}

impl MessageSink<PublishMessage> for C8yMapperBuilder {
fn get_sender(&self) -> DynSender<PublishMessage> {
self.box_builder.get_sender().sender_clone()
Expand All @@ -444,8 +482,6 @@ impl Builder<C8yMapperActor> for C8yMapperBuilder {
mqtt_publisher.clone(),
self.http_proxy,
self.auth_proxy,
self.uploader,
self.downloader,
)
.map_err(|err| RuntimeError::ActorError(Box::new(err)))?;

Expand All @@ -459,6 +495,7 @@ impl Builder<C8yMapperActor> for C8yMapperBuilder {
timer_sender,
bridge_monitor_box,
self.message_handlers,
self.registered_message_handlers,
))
}
}
1 change: 1 addition & 0 deletions crates/extensions/c8y_mapper_ext/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ const STATE_DIR_NAME: &str = ".tedge-mapper-c8y";
const C8Y_CLOUD: &str = "c8y";
const SUPPORTED_OPERATIONS_DIRECTORY: &str = "operations";

#[derive(Clone)]
pub struct C8yMapperConfig {
pub device_id: String,
pub device_topic_id: EntityTopicId,
Expand Down
55 changes: 2 additions & 53 deletions crates/extensions/c8y_mapper_ext/src/converter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,9 @@ use super::config::MQTT_MESSAGE_SIZE_THRESHOLD;
use super::error::CumulocityMapperError;
use super::service_monitor;
use crate::actor::CmdId;
use crate::actor::IdDownloadRequest;
use crate::actor::IdDownloadResult;
use crate::dynamic_discovery::DiscoverOp;
use crate::error::ConversionError;
use crate::json;
use crate::operations;
use crate::operations::OperationHandler;
use anyhow::anyhow;
use anyhow::Context;
use c8y_api::http_proxy::C8yEndPoint;
Expand Down Expand Up @@ -56,7 +52,6 @@ use std::fs;
use std::path::Path;
use std::path::PathBuf;
use std::sync::Arc;
use tedge_actors::ClientMessageBox;
use tedge_actors::LoggingSender;
use tedge_actors::Sender;
use tedge_api::commands::RestartCommand;
Expand Down Expand Up @@ -84,8 +79,6 @@ use tedge_config::TEdgeConfigError;
use tedge_config::TopicPrefix;
use tedge_mqtt_ext::MqttMessage;
use tedge_mqtt_ext::Topic;
use tedge_uploader_ext::UploadRequest;
use tedge_uploader_ext::UploadResult;
use tedge_utils::file::create_directory_with_defaults;
use tedge_utils::file::create_file_with_defaults;
use tedge_utils::file::FileError;
Expand Down Expand Up @@ -184,8 +177,6 @@ pub struct CumulocityConverter {
pub command_id: IdGenerator,
// Keep active command IDs to avoid creation of multiple commands for an operation
pub active_commands: HashSet<CmdId>,

pub operation_handler: OperationHandler,
}

impl CumulocityConverter {
Expand All @@ -194,8 +185,6 @@ impl CumulocityConverter {
mqtt_publisher: LoggingSender<MqttMessage>,
http_proxy: C8YHttpProxy,
auth_proxy: ProxyUrlGenerator,
uploader: ClientMessageBox<(String, UploadRequest), (String, UploadResult)>,
downloader: ClientMessageBox<IdDownloadRequest, IdDownloadResult>,
) -> Result<Self, CumulocityConverterBuildError> {
let device_id = config.device_id.clone();
let device_topic_id = config.device_topic_id.clone();
Expand Down Expand Up @@ -245,15 +234,6 @@ impl CumulocityConverter {

let command_id = config.id_generator();

let operation_handler = OperationHandler::new(
&config,
downloader,
uploader,
mqtt_publisher.clone(),
http_proxy.clone(),
auth_proxy.clone(),
);

Ok(CumulocityConverter {
size_threshold,
config: Arc::new(config),
Expand All @@ -274,7 +254,6 @@ impl CumulocityConverter {
auth_proxy,
command_id,
active_commands: HashSet::new(),
operation_handler,
})
}

Expand Down Expand Up @@ -1169,16 +1148,6 @@ impl CumulocityConverter {
Channel::Command { cmd_id, .. } if self.command_id.is_generator_of(cmd_id) => {
self.active_commands.insert(cmd_id.clone());

let entity = self.entity_store.try_get(&source)?;
let external_id = entity.external_id.clone();
let entity = operations::EntityTarget {
topic_id: entity.topic_id.clone(),
external_id: external_id.clone(),
smartrest_publish_topic: self
.smartrest_publish_topic_for_entity(&entity.topic_id)?,
};

self.operation_handler.handle(entity, message.clone()).await;
Ok(vec![])
}

Expand Down Expand Up @@ -1497,10 +1466,6 @@ pub fn get_local_child_devices_list(path: &Path) -> Result<HashSet<String>, Cumu
#[cfg(test)]
pub(crate) mod tests {
use super::CumulocityConverter;
use crate::actor::IdDownloadRequest;
use crate::actor::IdDownloadResult;
use crate::actor::IdUploadRequest;
use crate::actor::IdUploadResult;
use crate::config::C8yMapperConfig;
use crate::Capabilities;
use anyhow::Result;
Expand All @@ -1521,7 +1486,6 @@ pub(crate) mod tests {
use tedge_actors::test_helpers::FakeServerBox;
use tedge_actors::test_helpers::FakeServerBoxBuilder;
use tedge_actors::Builder;
use tedge_actors::ClientMessageBox;
use tedge_actors::CloneSender;
use tedge_actors::LoggingSender;
use tedge_actors::MessageReceiver;
Expand Down Expand Up @@ -3127,23 +3091,8 @@ pub(crate) mod tests {
let auth_proxy_port = config.auth_proxy_port;
let auth_proxy = ProxyUrlGenerator::new(auth_proxy_addr, auth_proxy_port, Protocol::Http);

let mut uploader_builder: FakeServerBoxBuilder<IdUploadRequest, IdUploadResult> =
FakeServerBox::builder();
let uploader = ClientMessageBox::new(&mut uploader_builder);

let mut downloader_builder: FakeServerBoxBuilder<IdDownloadRequest, IdDownloadResult> =
FakeServerBox::builder();
let downloader = ClientMessageBox::new(&mut downloader_builder);

let converter = CumulocityConverter::new(
config,
mqtt_publisher,
http_proxy,
auth_proxy,
uploader,
downloader,
)
.unwrap();
let converter =
CumulocityConverter::new(config, mqtt_publisher, http_proxy, auth_proxy).unwrap();

(converter, c8y_proxy_builder.build())
}
Expand Down
2 changes: 1 addition & 1 deletion crates/extensions/c8y_mapper_ext/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ pub mod error;
mod fragments;
mod inventory;
pub mod json;
mod operations;
pub mod operations;
mod serializer;
pub mod service_monitor;
#[cfg(test)]
Expand Down
Loading

0 comments on commit 2953d56

Please sign in to comment.