From 5e4214af6fd44e0b16a7e93f92d0437cdc790fe8 Mon Sep 17 00:00:00 2001 From: Pete LeVasseur Date: Thu, 7 Nov 2024 06:55:20 -0500 Subject: [PATCH] Update tests to tokio from async-std --- Cargo.lock | 13 ++++++++++++- up-streamer/Cargo.toml | 1 + up-streamer/src/endpoint.rs | 4 ++-- up-streamer/src/ustreamer.rs | 18 +++++++++--------- .../tests/single_local_single_remote.rs | 8 ++++---- ...ingle_local_two_remote_add_remove_rules.rs | 12 ++++++------ ..._authorities_different_remote_transport.rs | 8 ++++---- ...emote_authorities_same_remote_transport.rs | 8 ++++---- up-streamer/tests/usubscription.rs | 2 +- utils/integration-test-utils/Cargo.toml | 3 ++- .../src/integration_test_listeners.rs | 2 +- .../src/integration_test_utils.rs | 19 +++++++++++++------ .../src/up_client_foo.rs | 12 +++++++++--- 13 files changed, 68 insertions(+), 42 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 9157601b..117477a6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1719,13 +1719,14 @@ name = "integration-test-utils" version = "0.1.0" dependencies = [ "async-broadcast", - "async-std", "async-trait", "env_logger 0.10.2", "futures", "log", "rand", "serde_json", + "tokio", + "tokio-condvar", "up-rust", "up-streamer", "uuid", @@ -3731,6 +3732,15 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "tokio-condvar" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e8530e402d24f6a65019baa57593f1769557c670302f493cdf8fa3dfbe4d85ac" +dependencies = [ + "tokio", +] + [[package]] name = "tokio-macros" version = "2.4.0" @@ -4077,6 +4087,7 @@ dependencies = [ "serde_json", "subscription-cache", "tokio", + "tokio-condvar", "up-rust", "usubscription-static-file", "uuid", diff --git a/up-streamer/Cargo.toml b/up-streamer/Cargo.toml index d383edda..e5151b35 100644 --- a/up-streamer/Cargo.toml +++ b/up-streamer/Cargo.toml @@ -39,3 +39,4 @@ usubscription-static-file = {path="../utils/usubscription-static-file"} async-broadcast = { version = "0.7.0" } chrono = { version = "0.4.31", features = [] } integration-test-utils = { path = "../utils/integration-test-utils" } +tokio-condvar = { version = "0.3.0" } diff --git a/up-streamer/src/endpoint.rs b/up-streamer/src/endpoint.rs index 3d4d196e..b9e3dfa3 100644 --- a/up-streamer/src/endpoint.rs +++ b/up-streamer/src/endpoint.rs @@ -11,8 +11,8 @@ * SPDX-License-Identifier: Apache-2.0 ********************************************************************************/ -use std::sync::Arc; use log::*; +use std::sync::Arc; use up_rust::UTransport; const ENDPOINT_TAG: &str = "Endpoint:"; @@ -26,7 +26,7 @@ const ENDPOINT_FN_NEW_TAG: &str = "new():"; /// /// ``` /// use std::sync::Arc; -/// use async_std::sync::Mutex; +/// use tokio::sync::Mutex; /// use up_rust::UTransport; /// use up_streamer::Endpoint; /// diff --git a/up-streamer/src/ustreamer.rs b/up-streamer/src/ustreamer.rs index efbf289d..69ae2eb6 100644 --- a/up-streamer/src/ustreamer.rs +++ b/up-streamer/src/ustreamer.rs @@ -41,12 +41,12 @@ const USTREAMER_FN_DELETE_FORWARDING_RULE_TAG: &str = "delete_forwarding_rule(): const THREAD_NUM: usize = 10; // Create a separate tokio Runtime for running the callback -lazy_static::lazy_static! { +lazy_static! { static ref CB_RUNTIME: Runtime = tokio::runtime::Builder::new_multi_thread() - .worker_threads(THREAD_NUM) - .enable_all() - .build() - .expect("Unable to create callback runtime"); + .worker_threads(THREAD_NUM) + .enable_all() + .build() + .expect("Unable to create callback runtime"); } fn uauthority_to_uuri(authority_name: &str) -> UUri { @@ -367,7 +367,7 @@ impl ForwardingListeners { /// use usubscription_static_file::USubscriptionStaticFile; /// use std::sync::Arc; /// use std::path::PathBuf; -/// use async_std::sync::Mutex; +/// use tokio::sync::Mutex; /// use up_rust::{UListener, UTransport}; /// use up_streamer::{Endpoint, UStreamer}; /// # pub mod up_client_foo { @@ -1054,7 +1054,7 @@ mod tests { } } - #[async_std::test] + #[tokio::test(flavor = "multi_thread")] async fn test_simple_with_a_single_input_and_output_endpoint() { // Local endpoint let local_authority = "local"; @@ -1123,7 +1123,7 @@ mod tests { .is_err()); } - #[async_std::test] + #[tokio::test(flavor = "multi_thread")] async fn test_advanced_where_there_is_a_local_endpoint_and_two_remote_endpoints() { // Local endpoint let local_authority = "local"; @@ -1188,7 +1188,7 @@ mod tests { .is_ok()); } - #[async_std::test] + #[tokio::test(flavor = "multi_thread")] async fn test_advanced_where_there_is_a_local_endpoint_and_two_remote_endpoints_but_the_remote_endpoints_have_the_same_instance_of_utransport( ) { // Local endpoint diff --git a/up-streamer/tests/single_local_single_remote.rs b/up-streamer/tests/single_local_single_remote.rs index 9f6cd56f..1d5de9ec 100644 --- a/up-streamer/tests/single_local_single_remote.rs +++ b/up-streamer/tests/single_local_single_remote.rs @@ -12,8 +12,6 @@ ********************************************************************************/ use async_broadcast::broadcast; -use async_std::sync::{Condvar, Mutex}; -use async_std::task; use futures::future::join; use integration_test_utils::{ check_messages_in_order, check_send_receive_message_discrepancy, local_authority, @@ -29,6 +27,8 @@ use log::debug; use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::Arc; use std::time::Duration; +use tokio::sync::Mutex; +use tokio_condvar::Condvar; use up_rust::{UListener, UTransport}; use up_streamer::{Endpoint, UStreamer}; use usubscription_static_file::USubscriptionStaticFile; @@ -36,7 +36,7 @@ use usubscription_static_file::USubscriptionStaticFile; const DURATION_TO_RUN_CLIENTS: u128 = 1_000; const SENT_MESSAGE_VEC_CAPACITY: usize = 10_000; -#[async_std::test] +#[tokio::test(flavor = "multi_thread")] async fn single_local_single_remote() { // using async_broadcast to simulate communication protocol let (tx_1, rx_1) = broadcast(10000); @@ -175,7 +175,7 @@ async fn single_local_single_remote() { debug!("after signal_to_resume"); - task::sleep(Duration::from_millis(DURATION_TO_RUN_CLIENTS as u64)).await; + tokio::time::sleep(Duration::from_millis(DURATION_TO_RUN_CLIENTS as u64)).await; debug!("past wait on clients to run, now tell them to stop"); { diff --git a/up-streamer/tests/single_local_two_remote_add_remove_rules.rs b/up-streamer/tests/single_local_two_remote_add_remove_rules.rs index 5f8a7bb3..f7e1be37 100644 --- a/up-streamer/tests/single_local_two_remote_add_remove_rules.rs +++ b/up-streamer/tests/single_local_two_remote_add_remove_rules.rs @@ -12,8 +12,6 @@ ********************************************************************************/ use async_broadcast::broadcast; -use async_std::sync::{Condvar, Mutex}; -use async_std::task; use futures::future::join; use integration_test_utils::{ check_messages_in_order, check_send_receive_message_discrepancy, local_authority, @@ -30,6 +28,8 @@ use log::debug; use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::Arc; use std::time::Duration; +use tokio::sync::Mutex; +use tokio_condvar::Condvar; use up_rust::{UListener, UTransport}; use up_streamer::{Endpoint, UStreamer}; use usubscription_static_file::USubscriptionStaticFile; @@ -37,7 +37,7 @@ use usubscription_static_file::USubscriptionStaticFile; const DURATION_TO_RUN_CLIENTS: u128 = 500; const SENT_MESSAGE_VEC_CAPACITY: usize = 20_000; -#[async_std::test] +#[tokio::test(flavor = "multi_thread")] async fn single_local_two_remote_add_remove_rules() { // using async_broadcast to simulate communication protocol let (tx_1, rx_1) = broadcast(20000); @@ -206,7 +206,7 @@ async fn single_local_two_remote_add_remove_rules() { debug!("signalled to resume"); - task::sleep(Duration::from_millis(DURATION_TO_RUN_CLIENTS as u64)).await; + tokio::time::sleep(Duration::from_millis(DURATION_TO_RUN_CLIENTS as u64)).await; { let mut local_command = local_command.lock().await; @@ -289,7 +289,7 @@ async fn single_local_two_remote_add_remove_rules() { debug!("signalled local, remote_a, remote_b to resume"); - task::sleep(Duration::from_millis(DURATION_TO_RUN_CLIENTS as u64)).await; + tokio::time::sleep(Duration::from_millis(DURATION_TO_RUN_CLIENTS as u64)).await; debug!("after running local, remote_a, remote_b"); @@ -345,7 +345,7 @@ async fn single_local_two_remote_add_remove_rules() { debug!("signalled all to resume: local & remote_b"); - task::sleep(Duration::from_millis(DURATION_TO_RUN_CLIENTS as u64)).await; + tokio::time::sleep(Duration::from_millis(DURATION_TO_RUN_CLIENTS as u64)).await; { let mut local_command = local_command.lock().await; diff --git a/up-streamer/tests/single_local_two_remote_authorities_different_remote_transport.rs b/up-streamer/tests/single_local_two_remote_authorities_different_remote_transport.rs index f987765a..34aae090 100644 --- a/up-streamer/tests/single_local_two_remote_authorities_different_remote_transport.rs +++ b/up-streamer/tests/single_local_two_remote_authorities_different_remote_transport.rs @@ -12,8 +12,6 @@ ********************************************************************************/ use async_broadcast::broadcast; -use async_std::sync::{Condvar, Mutex}; -use async_std::task; use futures::future::join; use integration_test_utils::{ check_messages_in_order, check_send_receive_message_discrepancy, local_authority, @@ -30,6 +28,8 @@ use log::debug; use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::Arc; use std::time::Duration; +use tokio::sync::Mutex; +use tokio_condvar::Condvar; use up_rust::{UListener, UTransport}; use up_streamer::{Endpoint, UStreamer}; use usubscription_static_file::USubscriptionStaticFile; @@ -37,7 +37,7 @@ use usubscription_static_file::USubscriptionStaticFile; const DURATION_TO_RUN_CLIENTS: u128 = 1_000; const SENT_MESSAGE_VEC_CAPACITY: usize = 10_000; -#[async_std::test] +#[tokio::test(flavor = "multi_thread")] async fn single_local_two_remote_authorities_different_remote_transport() { // using async_broadcast to simulate communication protocol let (tx_1, rx_1) = broadcast(20000); @@ -253,7 +253,7 @@ async fn single_local_two_remote_authorities_different_remote_transport() { // Now signal both clients to resume signal_to_resume(all_signal_should_pause.clone()).await; - task::sleep(Duration::from_millis(DURATION_TO_RUN_CLIENTS as u64)).await; + tokio::time::sleep(Duration::from_millis(DURATION_TO_RUN_CLIENTS as u64)).await; { let mut local_command = local_command.lock().await; diff --git a/up-streamer/tests/single_local_two_remote_authorities_same_remote_transport.rs b/up-streamer/tests/single_local_two_remote_authorities_same_remote_transport.rs index 67dfe75b..a846a788 100644 --- a/up-streamer/tests/single_local_two_remote_authorities_same_remote_transport.rs +++ b/up-streamer/tests/single_local_two_remote_authorities_same_remote_transport.rs @@ -12,8 +12,6 @@ ********************************************************************************/ use async_broadcast::broadcast; -use async_std::sync::{Condvar, Mutex}; -use async_std::task; use futures::future::join; use integration_test_utils::{ check_messages_in_order, check_send_receive_message_discrepancy, local_authority, @@ -30,6 +28,8 @@ use log::debug; use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::Arc; use std::time::Duration; +use tokio::sync::Mutex; +use tokio_condvar::Condvar; use up_rust::{UListener, UTransport}; use up_streamer::{Endpoint, UStreamer}; use usubscription_static_file::USubscriptionStaticFile; @@ -37,7 +37,7 @@ use usubscription_static_file::USubscriptionStaticFile; const DURATION_TO_RUN_CLIENTS: u128 = 1_000; const SENT_MESSAGE_VEC_CAPACITY: usize = 10_000; -#[async_std::test] +#[tokio::test(flavor = "multi_thread")] async fn single_local_two_remote_authorities_same_remote_transport() { // using async_broadcast to simulate communication protocol let (tx_1, rx_1) = broadcast(20000); @@ -257,7 +257,7 @@ async fn single_local_two_remote_authorities_same_remote_transport() { // Now signal both clients to resume signal_to_resume(all_signal_should_pause.clone()).await; - task::sleep(Duration::from_millis(DURATION_TO_RUN_CLIENTS as u64)).await; + tokio::time::sleep(Duration::from_millis(DURATION_TO_RUN_CLIENTS as u64)).await; { let mut local_command = local_command.lock().await; diff --git a/up-streamer/tests/usubscription.rs b/up-streamer/tests/usubscription.rs index d42cd76d..7dbee0bd 100644 --- a/up-streamer/tests/usubscription.rs +++ b/up-streamer/tests/usubscription.rs @@ -17,7 +17,7 @@ use up_rust::{UCode, UStatus, UTransport}; use up_streamer::{Endpoint, UStreamer}; use usubscription_static_file::USubscriptionStaticFile; -#[async_std::test] +#[tokio::test(flavor = "multi_thread")] async fn usubscription_bad_data() { let utransport_foo: Arc = Arc::new(UPClientFailingRegister::new("upclient_foo").await); diff --git a/utils/integration-test-utils/Cargo.toml b/utils/integration-test-utils/Cargo.toml index 4061b459..ccfc873a 100644 --- a/utils/integration-test-utils/Cargo.toml +++ b/utils/integration-test-utils/Cargo.toml @@ -23,7 +23,6 @@ license.workspace = true [dependencies] async-broadcast = { version = "0.7.0" } -async-std = { workspace = true, features = ["unstable"] } async-trait = { workspace = true } env_logger = { workspace = true } futures = { workspace = true } @@ -33,3 +32,5 @@ serde_json = { workspace = true } up-rust = { workspace = true } up-streamer = { path = "../../up-streamer" } rand = "0.8.5" +tokio = { workspace = true } +tokio-condvar = { version = "0.3.0" } diff --git a/utils/integration-test-utils/src/integration_test_listeners.rs b/utils/integration-test-utils/src/integration_test_listeners.rs index 8e5be2b8..da6ef627 100644 --- a/utils/integration-test-utils/src/integration_test_listeners.rs +++ b/utils/integration-test-utils/src/integration_test_listeners.rs @@ -11,10 +11,10 @@ * SPDX-License-Identifier: Apache-2.0 ********************************************************************************/ -use async_std::sync::Mutex; use async_trait::async_trait; use log::debug; use std::sync::Arc; +use tokio::sync::Mutex; use up_rust::{UListener, UMessage}; #[derive(Clone)] diff --git a/utils/integration-test-utils/src/integration_test_utils.rs b/utils/integration-test-utils/src/integration_test_utils.rs index bfc58924..7cd08663 100644 --- a/utils/integration-test-utils/src/integration_test_utils.rs +++ b/utils/integration-test-utils/src/integration_test_utils.rs @@ -13,14 +13,15 @@ use crate::UPClientFoo; use async_broadcast::{Receiver, Sender}; -use async_std::sync::{Condvar, Mutex}; -use async_std::task; use log::{debug, error}; use std::collections::HashMap; use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::Arc; use std::thread::JoinHandle; use std::time::{Duration, Instant}; +use tokio::runtime::Builder; +use tokio::sync::Mutex; +use tokio_condvar::Condvar; use up_rust::{UListener, UMessage, UStatus, UTransport, UUri, UUID}; pub type Signal = Arc<(Mutex, Condvar)>; @@ -197,13 +198,13 @@ async fn poll_for_new_command( let (lock, cvar) = &*pause_execution; let mut should_pause = lock.lock().await; while *should_pause { - task::sleep(Duration::from_millis(100)).await; + tokio::time::sleep(Duration::from_millis(100)).await; let command = client_command.lock().await; if *command == ClientCommand::Stop { let times: u64 = client.times_received.load(Ordering::SeqCst); println!("{name} had rx of: {times}"); - task::sleep(Duration::from_millis(1000)).await; + tokio::time::sleep(Duration::from_millis(1000)).await; return true; } else { match &*command { @@ -292,7 +293,13 @@ pub async fn run_client( client_history: ClientHistory, ) -> JoinHandle> { std::thread::spawn(move || { - task::block_on(async move { + // Create a new single-threaded runtime + let runtime = Builder::new_current_thread() + .enable_all() + .build() + .expect("Failed to create Tokio runtime"); + + runtime.block_on(async move { let client = configure_client(&client_configuration).await; let mut active_connection_listing = Vec::new(); @@ -353,7 +360,7 @@ pub async fn run_client( ) .await; - task::sleep(Duration::from_millis(1)).await; + tokio::time::sleep(Duration::from_millis(1)).await; } }) }) diff --git a/utils/integration-test-utils/src/up_client_foo.rs b/utils/integration-test-utils/src/up_client_foo.rs index 1c5e1b87..28158898 100644 --- a/utils/integration-test-utils/src/up_client_foo.rs +++ b/utils/integration-test-utils/src/up_client_foo.rs @@ -12,14 +12,14 @@ ********************************************************************************/ use async_broadcast::{Receiver, Sender}; -use async_std::sync::Mutex; -use async_std::task; use async_trait::async_trait; use log::{debug, error}; use std::collections::{HashMap, HashSet}; use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::Arc; use std::thread; +use tokio::runtime::Builder; +use tokio::sync::Mutex; use up_rust::{ ComparableListener, UAttributes, UCode, UListener, UMessage, UMessageType, UStatus, UTransport, UUri, @@ -70,7 +70,13 @@ impl UPClientFoo { let authority_listeners = self.authority_listeners.clone(); let times_received = self.times_received.clone(); thread::spawn(move || { - task::block_on(async { + // Create a new single-threaded runtime + let runtime = Builder::new_current_thread() + .enable_all() + .build() + .expect("Failed to create Tokio runtime"); + + runtime.block_on(async move { while let Ok(received) = protocol_receiver.recv().await { match &received { Ok(msg) => {