diff --git a/Cargo.lock b/Cargo.lock index feadfec..ae27043 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -811,6 +811,29 @@ dependencies = [ "tokio-native-tls", ] +[[package]] +name = "ibeji-adapter" +version = "0.1.0" +dependencies = [ + "async-trait", + "core-protobuf-data-access", + "freyja-build-common", + "freyja-common", + "futures", + "log", + "proc-macros", + "serde", + "serde_json", + "service_discovery_proto", + "strum", + "strum_macros", + "tempfile", + "tokio", + "tokio-stream", + "tonic", + "tower", +] + [[package]] name = "idna" version = "0.5.0" @@ -1777,6 +1800,17 @@ dependencies = [ "serde", ] +[[package]] +name = "service_discovery_proto" +version = "0.1.0" +source = "git+https://github.com/eclipse-chariott/chariott#57cb120bde842e1bc32628233f6d8ceff1922306" +dependencies = [ + "prost", + "tokio", + "tonic", + "tonic-build", +] + [[package]] name = "sha2" version = "0.10.8" @@ -1807,6 +1841,12 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "strum" +version = "0.25.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "290d54ea6f91c969195bdbcd7442c8c2a2ba87da8bf60a7ee86a235d4bc1e125" + [[package]] name = "strum_macros" version = "0.25.3" diff --git a/Cargo.toml b/Cargo.toml index fa53639..f47aebf 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,6 +12,7 @@ members = [ "adapters/data/in_memory_mock_data_adapter", "adapters/data/managed_subscribe_data_adapter", "adapters/data/mqtt_data_adapter", + "adapters/digital_twin/ibeji_adapter", "adapters/digital_twin/in_memory_mock_digital_twin_adapter", "adapters/digital_twin/mock_digital_twin_adapter", "adapters/mapping/in_memory_mock_mapping_adapter", diff --git a/adapters/digital_twin/ibeji_adapter/Cargo.toml b/adapters/digital_twin/ibeji_adapter/Cargo.toml new file mode 100644 index 0000000..8e3e5f6 --- /dev/null +++ b/adapters/digital_twin/ibeji_adapter/Cargo.toml @@ -0,0 +1,31 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT license. +# SPDX-License-Identifier: MIT + +[package] +name = "ibeji-adapter" +version = "0.1.0" +edition = "2021" +license = "MIT" + +[dependencies] +async-trait = { workspace = true } +core-protobuf-data-access = { workspace = true } +freyja-build-common = { workspace = true } +freyja-common = { workspace = true } +futures = { workspace = true } +log = { workspace = true } +proc-macros = { workspace = true } +serde = { workspace = true } +serde_json = { workspace = true } +service_discovery_proto = { workspace = true } +strum = { workspace = true } +strum_macros = { workspace = true } +tempfile = { workspace = true } +tokio = { workspace = true } +tokio-stream = { workspace = true, features = ["net"] } +tonic = { workspace = true } +tower = { workspace = true } + +[build-dependencies] +freyja-build-common = { workspace = true } diff --git a/adapters/digital_twin/ibeji_adapter/README.md b/adapters/digital_twin/ibeji_adapter/README.md new file mode 100644 index 0000000..8779a11 --- /dev/null +++ b/adapters/digital_twin/ibeji_adapter/README.md @@ -0,0 +1,35 @@ +# Ibeji Adapter + +The Ibeji Adapter is used to integrate with the [Ibeji In-Vehicle Digital Twin Service](https://github.com/eclipse-ibeji/ibeji), and optionally [Chariott](https://github.com/eclipse-chariott/chariott) to discover Ibeji. + +## Configuration + +This adapter supports two different configuration schemas depending on how you want to discover the In-Vehicle Digital Twin Service: + +### Without Chariott + +To bypass Chariott and use a configuration value to specify the In-Vehicle Digital Twin Service URI, you must specify the following configuration: + +- `service_discovery_method`: Set this value to `"Config"`. +- `uri`: The URI for the In-Vehicle Digital Twin Service. +- `max_retries`: The maximum number of times to retry failed attempts to communicate with the In-Vehicle Digital Twin Service. +- `retry_interval_ms`: The duration between retries in milliseconds. + +### Using Chariott + +To use Chariott to discover the In-Vehicle Digital Twin Service, you must specify the following configuration: + +- `service_discovery_method`: Set this value to `"ChariottServiceDiscovery"` to use Chariott. +- `uri`: The URI for Chariott's Service Discovery system. +- `max_retries`: The maximum number of times to retry failed attempts to communicate with Chariott or the In-Vehicle Digital Twin Service. +- `retry_interval_ms`: The duration between retries in milliseconds. +- `metadata`: Metadata for the discovery operation: + - `namespace`: The namespace for the In-Vehicle Digital Twin Service. + - `name`: The service name for the In-Vehicle Digital Twin Service. + - `version`: The version of the In-Vehicle Digital Twin Service to query for. + +An example of a configuration file that uses Chariott can be found at `res/ibeji_adapter_config.chariott_sample.json`. + +### Configuration Overrides + +This adapter supports the same [config override method](https://github.com/eclipse-ibeji/freyja/blob/main/docs/config-overrides.md) as the Freyja mocks. The override filename is `ibeji_adapter_config.json`, and the default config is located at `res/ibeji_adapter_config.default.json`. diff --git a/adapters/digital_twin/ibeji_adapter/build.rs b/adapters/digital_twin/ibeji_adapter/build.rs new file mode 100644 index 0000000..02dfd75 --- /dev/null +++ b/adapters/digital_twin/ibeji_adapter/build.rs @@ -0,0 +1,11 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT license. +// SPDX-License-Identifier: MIT + +use freyja_build_common::copy_config; + +const CONFIG_FILE_STEM: &str = "ibeji_adapter_config"; + +fn main() { + copy_config(CONFIG_FILE_STEM); +} diff --git a/adapters/digital_twin/ibeji_adapter/res/ibeji_adapter_config.chariott_sample.json b/adapters/digital_twin/ibeji_adapter/res/ibeji_adapter_config.chariott_sample.json new file mode 100644 index 0000000..ad8c6cd --- /dev/null +++ b/adapters/digital_twin/ibeji_adapter/res/ibeji_adapter_config.chariott_sample.json @@ -0,0 +1,11 @@ +{ + "service_discovery_method": "ChariottServiceDiscovery", + "uri": "http://0.0.0.0:50000", + "max_retries": 5, + "retry_interval_ms": 1000, + "discover_request": { + "namespace": "sdv.ibeji", + "name": "invehicle_digital_twin", + "version": "1.0" + } +} \ No newline at end of file diff --git a/adapters/digital_twin/ibeji_adapter/res/ibeji_adapter_config.default.json b/adapters/digital_twin/ibeji_adapter/res/ibeji_adapter_config.default.json new file mode 100644 index 0000000..1c7f507 --- /dev/null +++ b/adapters/digital_twin/ibeji_adapter/res/ibeji_adapter_config.default.json @@ -0,0 +1,6 @@ +{ + "service_discovery_method": "FromConfig", + "uri": "http://0.0.0.0:5010", + "max_retries": 5, + "retry_interval_ms": 1000 +} \ No newline at end of file diff --git a/adapters/digital_twin/ibeji_adapter/src/config.rs b/adapters/digital_twin/ibeji_adapter/src/config.rs new file mode 100644 index 0000000..7c476e0 --- /dev/null +++ b/adapters/digital_twin/ibeji_adapter/src/config.rs @@ -0,0 +1,51 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT license. +// SPDX-License-Identifier: MIT + +use serde::{Deserialize, Serialize}; + +/// Configuration for the Ibeji Adapter. +/// Supports two different schemas based on the service discovery method. +#[derive(Clone, Serialize, Deserialize)] +#[serde(tag = "service_discovery_method")] +pub enum Config { + /// Use a URI from the config for the In-Vehicle Digital Twin Service + FromConfig { + /// The URI for the In-Vehicle Digital Twin Service + uri: String, + + /// The maximum number of retries for communication attempts + max_retries: u32, + + /// The duration between retries in milliseconds + retry_interval_ms: u64, + }, + + /// Use Chariott's Service Discovery system to discover the In-Vehicle Digital Twin Service + ChariottServiceDiscovery { + /// The URI for the Chariott Discovery Service + uri: String, + + /// The maximum number of retries for communication attempts + max_retries: u32, + + /// The duration between retries in milliseconds + retry_interval_ms: u64, + + /// The request to send to Chariott + discover_request: ChariottDiscoverRequest, + }, +} + +/// A Chariott Service Discovery request +#[derive(Clone, Serialize, Deserialize)] +pub struct ChariottDiscoverRequest { + /// The service namespace + pub namespace: String, + + /// The service name + pub name: String, + + /// The service version + pub version: String, +} diff --git a/adapters/digital_twin/ibeji_adapter/src/ibeji_adapter.rs b/adapters/digital_twin/ibeji_adapter/src/ibeji_adapter.rs new file mode 100644 index 0000000..ddb8dd9 --- /dev/null +++ b/adapters/digital_twin/ibeji_adapter/src/ibeji_adapter.rs @@ -0,0 +1,323 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT license. +// SPDX-License-Identifier: MIT + +use std::time::Duration; + +use async_trait::async_trait; +use core_protobuf_data_access::invehicle_digital_twin::v1::{ + invehicle_digital_twin_client::InvehicleDigitalTwinClient, + FindByIdRequest as IbejiFindByIdRequest, +}; +use log::info; +use service_discovery_proto::service_registry::v1::{ + service_registry_client::ServiceRegistryClient, DiscoverRequest, +}; +use tonic::{transport::Channel, Request}; + +use crate::config::{ChariottDiscoverRequest, Config}; +use freyja_build_common::config_file_stem; +use freyja_common::{ + config_utils, + digital_twin_adapter::{ + DigitalTwinAdapter, DigitalTwinAdapterError, FindByIdRequest, FindByIdResponse, + }, + entity::{Entity, EntityEndpoint}, + out_dir, + retry_utils::execute_with_retry, +}; + +/// Contacts the In-Vehicle Digital Twin Service in Ibeji +pub struct IbejiAdapter { + client: InvehicleDigitalTwinClient, +} + +impl IbejiAdapter { + /// Retrieves Ibeji's In-Vehicle Digital Twin URI from Chariott + /// + /// # Arguments + /// - `chariott_discovery_request`: the uri for Chariott's service discovery + /// - `metadata`: optional configuration metadata for discovering Ibeji using Chariott + async fn retrieve_ibeji_invehicle_digital_twin_uri_from_chariott( + chariott_service_discovery_uri: &str, + chariott_discovery_request: ChariottDiscoverRequest, + ) -> Result { + let mut service_registry_client = + ServiceRegistryClient::connect(chariott_service_discovery_uri.to_owned()) + .await + .map_err(DigitalTwinAdapterError::communication)?; + + let discover_request = Request::new(DiscoverRequest { + namespace: chariott_discovery_request.namespace, + name: chariott_discovery_request.name, + version: chariott_discovery_request.version, + }); + + let service = service_registry_client + .discover(discover_request) + .await + .map_err(DigitalTwinAdapterError::communication)? + .into_inner() + .service + .ok_or_else(|| { + DigitalTwinAdapterError::communication( + "Cannot discover the uri of Ibeji's In-Vehicle Digital Twin Service", + ) + })?; + + Ok(service.uri) + } +} + +#[async_trait] +impl DigitalTwinAdapter for IbejiAdapter { + /// Creates a new instance of a DigitalTwinAdapter with default settings + fn create_new() -> Result { + let config = config_utils::read_from_files( + config_file_stem!(), + config_utils::JSON_EXT, + out_dir!(), + DigitalTwinAdapterError::io, + DigitalTwinAdapterError::deserialize, + )?; + + let (invehicle_digital_twin_service_uri, max_retries, retry_interval_ms) = match config { + Config::FromConfig { + uri, + max_retries, + retry_interval_ms, + } => (uri, max_retries, retry_interval_ms), + Config::ChariottServiceDiscovery { + uri, + max_retries, + retry_interval_ms, + discover_request, + } => { + let invehicle_digital_twin_service_uri = futures::executor::block_on(async { + execute_with_retry( + max_retries, + Duration::from_millis(retry_interval_ms), + || { + Self::retrieve_ibeji_invehicle_digital_twin_uri_from_chariott( + &uri, + discover_request.clone(), + ) + }, + Some(String::from("Connection retry for connecting to Chariott")), + ) + .await + }) + .map_err(DigitalTwinAdapterError::communication)?; + + info!("Discovered the uri of the In-Vehicle Digital Twin Service via Chariott: {invehicle_digital_twin_service_uri}"); + + ( + invehicle_digital_twin_service_uri, + max_retries, + retry_interval_ms, + ) + } + }; + + let client = futures::executor::block_on(async { + execute_with_retry( + max_retries, + Duration::from_millis(retry_interval_ms), + || InvehicleDigitalTwinClient::connect(invehicle_digital_twin_service_uri.clone()), + Some(String::from("Connection retry for connecting to Ibeji")), + ) + .await + .map_err(DigitalTwinAdapterError::communication) + }) + .map_err(DigitalTwinAdapterError::communication)?; + + Ok(Self { client }) + } + + /// Gets entity access information + /// + /// # Arguments + /// - `request`: the request for finding an entity's access information + async fn find_by_id( + &self, + request: FindByIdRequest, + ) -> Result { + let entity_id = request.entity_id; + let request = tonic::Request::new(IbejiFindByIdRequest { + id: entity_id.clone(), + }); + + let response = self + .client + .clone() + .find_by_id(request) + .await + .map_err(DigitalTwinAdapterError::entity_not_found)?; + + // Extract the response from find_by_id + let entity_access_info = response + .into_inner() + .entity_access_info + .ok_or(format!("Cannot find {entity_id} with find_by_id")) + .map_err(DigitalTwinAdapterError::entity_not_found)?; + + Ok(FindByIdResponse { + entity: Entity { + id: entity_access_info.id, + name: Some(entity_access_info.name), + description: Some(entity_access_info.description), + endpoints: entity_access_info + .endpoint_info_list + .into_iter() + .map(|e| EntityEndpoint { + protocol: e.protocol, + operations: e.operations, + uri: e.uri, + context: e.context, + }) + .collect(), + }, + }) + } +} + +#[cfg(test)] +mod ibeji_digital_twin_adapter_tests { + use core_protobuf_data_access::invehicle_digital_twin::v1::{ + invehicle_digital_twin_server::InvehicleDigitalTwin, EndpointInfo, EntityAccessInfo, + FindByIdResponse as IbejiFindByIdResponse, RegisterRequest, RegisterResponse, + }; + use tonic::{Response, Status}; + + use super::*; + + const AMBIENT_AIR_TEMPERATURE_ID: &str = "dtmi:sdv:Vehicle:Cabin:HVAC:AmbientAirTemperature;1"; + + pub struct MockInVehicleTwin {} + + #[tonic::async_trait] + impl InvehicleDigitalTwin for MockInVehicleTwin { + async fn find_by_id( + &self, + request: Request, + ) -> Result, Status> { + let entity_id = request.into_inner().id; + + if entity_id != AMBIENT_AIR_TEMPERATURE_ID { + return Err(Status::not_found( + "Unable to find the entity with id {entity_id}", + )); + } + + let endpoint_info = EndpointInfo { + protocol: String::from("grpc"), + uri: String::from("http://[::1]:40010"), // Devskim: ignore DS137138 + context: String::from("dtmi:sdv:Vehicle:Cabin:HVAC:AmbientAirTemperature;1"), + operations: vec![String::from("Get"), String::from("Subscribe")], + }; + + let entity_access_info = EntityAccessInfo { + name: String::from("AmbientAirTemperature"), + id: String::from("dtmi:sdv:Vehicle:Cabin:HVAC:AmbientAirTemperature;1"), + description: String::from("Ambient air temperature"), + endpoint_info_list: vec![endpoint_info], + }; + + let response = IbejiFindByIdResponse { + entity_access_info: Some(entity_access_info), + }; + + Ok(Response::new(response)) + } + + async fn register( + &self, + _request: Request, + ) -> Result, Status> { + let response = RegisterResponse {}; + Ok(Response::new(response)) + } + } + + /// The tests below uses Unix sockets to create a channel between a gRPC client and a gRPC server. + /// Unix sockets are more ideal than using TCP/IP sockets since Rust tests will run in parallel + /// so you would need to set an arbitrary port per test for TCP/IP sockets. + #[cfg(unix)] + mod unix_tests { + use super::*; + + use std::sync::Arc; + + use core_protobuf_data_access::invehicle_digital_twin::v1::invehicle_digital_twin_server::InvehicleDigitalTwinServer; + use tempfile::TempPath; + use tokio::net::{UnixListener, UnixStream}; + use tokio_stream::wrappers::UnixListenerStream; + use tonic::transport::{Channel, Endpoint, Server, Uri}; + use tower::service_fn; + + async fn create_test_grpc_client( + bind_path: Arc, + ) -> InvehicleDigitalTwinClient { + let channel = Endpoint::try_from("http://URI_IGNORED") // Devskim: ignore DS137138 + .unwrap() + .connect_with_connector(service_fn(move |_: Uri| { + let bind_path = bind_path.clone(); + async move { UnixStream::connect(bind_path.as_ref()).await } + })) + .await + .unwrap(); + + InvehicleDigitalTwinClient::new(channel) + } + + async fn run_test_grpc_server(uds_stream: UnixListenerStream) { + let mock_in_vehicle_twin = MockInVehicleTwin {}; + Server::builder() + .add_service(InvehicleDigitalTwinServer::new(mock_in_vehicle_twin)) + .serve_with_incoming(uds_stream) + .await + .unwrap(); + } + + #[tokio::test] + async fn find_by_id_test() { + // Create the Unix Socket + let bind_path = Arc::new(tempfile::NamedTempFile::new().unwrap().into_temp_path()); + let uds = match UnixListener::bind(bind_path.as_ref()) { + Ok(unix_listener) => unix_listener, + Err(_) => { + std::fs::remove_file(bind_path.as_ref()).unwrap(); + UnixListener::bind(bind_path.as_ref()).unwrap() + } + }; + let uds_stream = UnixListenerStream::new(uds); + + let request_future = async { + let client = create_test_grpc_client(bind_path.clone()).await; + let ibeji_digital_twin_adapter = IbejiAdapter { client }; + + let request = FindByIdRequest { + entity_id: String::from("invalid_entity"), + }; + + let result = ibeji_digital_twin_adapter.find_by_id(request).await; + + assert!(result.is_err()); + + let request = FindByIdRequest { + entity_id: String::from(AMBIENT_AIR_TEMPERATURE_ID), + }; + + let result = ibeji_digital_twin_adapter.find_by_id(request).await; + assert!(result.is_ok()); + }; + + tokio::select! { + _ = run_test_grpc_server(uds_stream) => (), + _ = request_future => () + } + + std::fs::remove_file(bind_path.as_ref()).unwrap(); + } + } +} diff --git a/adapters/digital_twin/ibeji_adapter/src/lib.rs b/adapters/digital_twin/ibeji_adapter/src/lib.rs new file mode 100644 index 0000000..cc09597 --- /dev/null +++ b/adapters/digital_twin/ibeji_adapter/src/lib.rs @@ -0,0 +1,6 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT license. +// SPDX-License-Identifier: MIT + +mod config; +pub mod ibeji_adapter;