From ca070aaa74f1fa4859a19ae54380e808e58c0345 Mon Sep 17 00:00:00 2001 From: ValMobBIllich Date: Wed, 30 Oct 2024 14:25:57 +0100 Subject: [PATCH] Added an integration test for pub/sub --- .github/workflows/lint-and-test.yaml | 17 ++++++++-- README.md | 6 +++- tests/mosquitto/docker-compose.yaml | 8 +++++ tests/mosquitto/mosquitto.conf | 8 +++++ tests/publish_subscribe.rs | 46 +++++++++++++++++++++++++++ tests/test_lib.rs | 47 ++++++++++++++++++++++++++++ 6 files changed, 129 insertions(+), 3 deletions(-) create mode 100644 tests/mosquitto/docker-compose.yaml create mode 100644 tests/mosquitto/mosquitto.conf create mode 100644 tests/publish_subscribe.rs create mode 100644 tests/test_lib.rs diff --git a/.github/workflows/lint-and-test.yaml b/.github/workflows/lint-and-test.yaml index a9888ca..28276fa 100644 --- a/.github/workflows/lint-and-test.yaml +++ b/.github/workflows/lint-and-test.yaml @@ -20,8 +20,9 @@ on: paths: - "src/**" - "Cargo.*" - workflow_call: - workflow_dispatch: + - "tests/**" + workflow_call: {} + workflow_dispatch: {} concurrency: group: ${{ github.ref }}-${{ github.workflow }} @@ -55,11 +56,23 @@ jobs: - name: Install dependencies run: | cargo install cargo-tarpaulin + sudo apt-get update + sudo DEBIAN_FRONTEND=noninteractive apt-get install -y netcat-openbsd cmake - name: Show toolchain information working-directory: ${{github.workspace}} run: | rustup toolchain list cargo --version + - name: Start Mosquitto + run: | + cd ./tests/mosquitto + docker compose up -d + - name: Wait for MQTT broker to be available + run: | + until nc -z localhost 1883; do + echo "Waiting for MQTT broker..." + sleep 1 + done - name: Run tests and report code coverage run: | # enable nightly features so that we can also include Doctests diff --git a/README.md b/README.md index 1b1cc4b..7af0b04 100644 --- a/README.md +++ b/README.md @@ -19,7 +19,11 @@ cargo test ### Running the Examples -1. Start an MQTT broker (e.g. mosquitto) +1. Start an MQTT broker or use the included Mosquitto broker: +```bash +cd tests/mosquitto +docker compose up +``` 2. Set up your environment (for example with a config file at .cargo/config.toml) diff --git a/tests/mosquitto/docker-compose.yaml b/tests/mosquitto/docker-compose.yaml new file mode 100644 index 0000000..09e6afd --- /dev/null +++ b/tests/mosquitto/docker-compose.yaml @@ -0,0 +1,8 @@ +services: + mosquitto: + image: eclipse-mosquitto:2.0 + volumes: + # read-only prevents the container changing file owners on the host + - ./mosquitto.conf:/mosquitto/config/mosquitto.conf:ro + ports: + - 1883:1883 diff --git a/tests/mosquitto/mosquitto.conf b/tests/mosquitto/mosquitto.conf new file mode 100644 index 0000000..a6d3637 --- /dev/null +++ b/tests/mosquitto/mosquitto.conf @@ -0,0 +1,8 @@ +persistence false +allow_anonymous true +log_type all +log_type debug +log_dest stdout +connection_messages true +listener 1883 + diff --git a/tests/publish_subscribe.rs b/tests/publish_subscribe.rs new file mode 100644 index 0000000..57bc9d5 --- /dev/null +++ b/tests/publish_subscribe.rs @@ -0,0 +1,46 @@ +use std::{ + str::FromStr, + sync::{Arc, Mutex}, + time::Duration, +}; + +use tokio::time::sleep; +use up_rust::{UMessageBuilder, UPayloadFormat, UTransport, UUri}; + +mod test_lib; + +#[tokio::test(flavor = "multi_thread")] +async fn test_publish_and_subscribe() { + let target_data = "TEST"; + + let publisher = test_lib::create_up_transport_mqtt("Publisher") + .await + .unwrap(); + let subscriber = test_lib::create_up_transport_mqtt("Subscriber") + .await + .unwrap(); + + let source = UUri::from_str("//Publisher/A8000/2/8A50").expect("Failed to create source"); + let source_filter = + UUri::from_str("//Publisher/A8000/2/8A50").expect("Failed to create source filter"); + + let listener = Arc::new(test_lib::TestListener { + recv_data: Arc::new(Mutex::new(String::new())), + }); + + subscriber + .register_listener(&source_filter, None, listener.clone()) + .await + .unwrap(); + + sleep(Duration::from_millis(1000)).await; + + let umessage = UMessageBuilder::publish(source) + .build_with_payload(target_data, UPayloadFormat::UPAYLOAD_FORMAT_TEXT) + .unwrap(); + publisher.send(umessage).await.unwrap(); + + sleep(Duration::from_millis(1000)).await; + + assert_eq!(listener.get_recv_data(), target_data) +} diff --git a/tests/test_lib.rs b/tests/test_lib.rs new file mode 100644 index 0000000..f7a1e71 --- /dev/null +++ b/tests/test_lib.rs @@ -0,0 +1,47 @@ +use std::sync::{Arc, Mutex}; + +use async_trait::async_trait; +use up_client_mqtt5_rust::{MqttConfig, MqttProtocol, UPClientMqtt, UPClientMqttType}; +use up_rust::{UListener, UMessage, UStatus, UUID}; + +pub struct TestListener { + pub recv_data: Arc>, +} + +impl TestListener { + pub fn get_recv_data(&self) -> String { + self.recv_data.lock().unwrap().to_string() + } +} + +#[async_trait] +impl UListener for TestListener { + async fn on_receive(&self, message: UMessage) { + let data = message.payload.unwrap(); + let value = data.into_iter().map(|c| c as char).collect::(); + *self.recv_data.lock().unwrap() = value; + } +} + +pub async fn create_up_transport_mqtt(authority_name: &str) -> Result { + let config = MqttConfig { + mqtt_protocol: MqttProtocol::Mqtt, + mqtt_hostname: "localhost".to_string(), + mqtt_port: 1883, + max_buffered_messages: 100, + max_subscriptions: 100, + session_expiry_interval: 3600, + ssl_options: None, + username: "testuser".to_string(), + }; + + let client = UPClientMqtt::new( + config, + UUID::build(), + authority_name.to_string(), + UPClientMqttType::Device, + ) + .await?; + + Ok(client) +}