Skip to content

Commit

Permalink
Update tests to tokio from async-std
Browse files Browse the repository at this point in the history
  • Loading branch information
PLeVasseur committed Nov 10, 2024
1 parent f7351ad commit 092f3ab
Show file tree
Hide file tree
Showing 13 changed files with 68 additions and 42 deletions.
13 changes: 12 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions up-streamer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,3 +39,4 @@ usubscription-static-file = {path="../utils/usubscription-static-file"}
async-broadcast = { version = "0.7.0" }
chrono = { version = "0.4.31", features = [] }
integration-test-utils = { path = "../utils/integration-test-utils" }
tokio-condvar = { version = "0.3.0" }
4 changes: 2 additions & 2 deletions up-streamer/src/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@
* SPDX-License-Identifier: Apache-2.0
********************************************************************************/

use std::sync::Arc;
use log::*;
use std::sync::Arc;
use up_rust::UTransport;

const ENDPOINT_TAG: &str = "Endpoint:";
Expand All @@ -26,7 +26,7 @@ const ENDPOINT_FN_NEW_TAG: &str = "new():";
///
/// ```
/// use std::sync::Arc;
/// use async_std::sync::Mutex;
/// use tokio::sync::Mutex;
/// use up_rust::UTransport;
/// use up_streamer::Endpoint;
///
Expand Down
18 changes: 9 additions & 9 deletions up-streamer/src/ustreamer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,12 @@ const USTREAMER_FN_DELETE_FORWARDING_RULE_TAG: &str = "delete_forwarding_rule():
const THREAD_NUM: usize = 10;

// Create a separate tokio Runtime for running the callback
lazy_static::lazy_static! {
lazy_static! {
static ref CB_RUNTIME: Runtime = tokio::runtime::Builder::new_multi_thread()
.worker_threads(THREAD_NUM)
.enable_all()
.build()
.expect("Unable to create callback runtime");
.worker_threads(THREAD_NUM)
.enable_all()
.build()
.expect("Unable to create callback runtime");
}

fn uauthority_to_uuri(authority_name: &str) -> UUri {
Expand Down Expand Up @@ -367,7 +367,7 @@ impl ForwardingListeners {
/// use usubscription_static_file::USubscriptionStaticFile;
/// use std::sync::Arc;
/// use std::path::PathBuf;
/// use async_std::sync::Mutex;
/// use tokio::sync::Mutex;
/// use up_rust::{UListener, UTransport};
/// use up_streamer::{Endpoint, UStreamer};
/// # pub mod up_client_foo {
Expand Down Expand Up @@ -1054,7 +1054,7 @@ mod tests {
}
}

#[async_std::test]
#[tokio::test(flavor = "multi_thread")]
async fn test_simple_with_a_single_input_and_output_endpoint() {
// Local endpoint
let local_authority = "local";
Expand Down Expand Up @@ -1123,7 +1123,7 @@ mod tests {
.is_err());
}

#[async_std::test]
#[tokio::test(flavor = "multi_thread")]
async fn test_advanced_where_there_is_a_local_endpoint_and_two_remote_endpoints() {
// Local endpoint
let local_authority = "local";
Expand Down Expand Up @@ -1188,7 +1188,7 @@ mod tests {
.is_ok());
}

#[async_std::test]
#[tokio::test(flavor = "multi_thread")]
async fn test_advanced_where_there_is_a_local_endpoint_and_two_remote_endpoints_but_the_remote_endpoints_have_the_same_instance_of_utransport(
) {
// Local endpoint
Expand Down
8 changes: 4 additions & 4 deletions up-streamer/tests/single_local_single_remote.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@
********************************************************************************/

use async_broadcast::broadcast;
use async_std::sync::{Condvar, Mutex};
use async_std::task;
use futures::future::join;
use integration_test_utils::{
check_messages_in_order, check_send_receive_message_discrepancy, local_authority,
Expand All @@ -29,14 +27,16 @@ use log::debug;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::Mutex;
use tokio_condvar::Condvar;
use up_rust::{UListener, UTransport};
use up_streamer::{Endpoint, UStreamer};
use usubscription_static_file::USubscriptionStaticFile;

const DURATION_TO_RUN_CLIENTS: u128 = 1_000;
const SENT_MESSAGE_VEC_CAPACITY: usize = 10_000;

#[async_std::test]
#[tokio::test(flavor = "multi_thread")]
async fn single_local_single_remote() {
// using async_broadcast to simulate communication protocol
let (tx_1, rx_1) = broadcast(10000);
Expand Down Expand Up @@ -175,7 +175,7 @@ async fn single_local_single_remote() {

debug!("after signal_to_resume");

task::sleep(Duration::from_millis(DURATION_TO_RUN_CLIENTS as u64)).await;
tokio::time::sleep(Duration::from_millis(DURATION_TO_RUN_CLIENTS as u64)).await;

debug!("past wait on clients to run, now tell them to stop");
{
Expand Down
12 changes: 6 additions & 6 deletions up-streamer/tests/single_local_two_remote_add_remove_rules.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@
********************************************************************************/

use async_broadcast::broadcast;
use async_std::sync::{Condvar, Mutex};
use async_std::task;
use futures::future::join;
use integration_test_utils::{
check_messages_in_order, check_send_receive_message_discrepancy, local_authority,
Expand All @@ -30,14 +28,16 @@ use log::debug;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::Mutex;
use tokio_condvar::Condvar;
use up_rust::{UListener, UTransport};
use up_streamer::{Endpoint, UStreamer};
use usubscription_static_file::USubscriptionStaticFile;

const DURATION_TO_RUN_CLIENTS: u128 = 500;
const SENT_MESSAGE_VEC_CAPACITY: usize = 20_000;

#[async_std::test]
#[tokio::test(flavor = "multi_thread")]
async fn single_local_two_remote_add_remove_rules() {
// using async_broadcast to simulate communication protocol
let (tx_1, rx_1) = broadcast(20000);
Expand Down Expand Up @@ -206,7 +206,7 @@ async fn single_local_two_remote_add_remove_rules() {

debug!("signalled to resume");

task::sleep(Duration::from_millis(DURATION_TO_RUN_CLIENTS as u64)).await;
tokio::time::sleep(Duration::from_millis(DURATION_TO_RUN_CLIENTS as u64)).await;

{
let mut local_command = local_command.lock().await;
Expand Down Expand Up @@ -289,7 +289,7 @@ async fn single_local_two_remote_add_remove_rules() {

debug!("signalled local, remote_a, remote_b to resume");

task::sleep(Duration::from_millis(DURATION_TO_RUN_CLIENTS as u64)).await;
tokio::time::sleep(Duration::from_millis(DURATION_TO_RUN_CLIENTS as u64)).await;

debug!("after running local, remote_a, remote_b");

Expand Down Expand Up @@ -345,7 +345,7 @@ async fn single_local_two_remote_add_remove_rules() {

debug!("signalled all to resume: local & remote_b");

task::sleep(Duration::from_millis(DURATION_TO_RUN_CLIENTS as u64)).await;
tokio::time::sleep(Duration::from_millis(DURATION_TO_RUN_CLIENTS as u64)).await;

{
let mut local_command = local_command.lock().await;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@
********************************************************************************/

use async_broadcast::broadcast;
use async_std::sync::{Condvar, Mutex};
use async_std::task;
use futures::future::join;
use integration_test_utils::{
check_messages_in_order, check_send_receive_message_discrepancy, local_authority,
Expand All @@ -30,14 +28,16 @@ use log::debug;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::Mutex;
use tokio_condvar::Condvar;
use up_rust::{UListener, UTransport};
use up_streamer::{Endpoint, UStreamer};
use usubscription_static_file::USubscriptionStaticFile;

const DURATION_TO_RUN_CLIENTS: u128 = 1_000;
const SENT_MESSAGE_VEC_CAPACITY: usize = 10_000;

#[async_std::test]
#[tokio::test(flavor = "multi_thread")]
async fn single_local_two_remote_authorities_different_remote_transport() {
// using async_broadcast to simulate communication protocol
let (tx_1, rx_1) = broadcast(20000);
Expand Down Expand Up @@ -253,7 +253,7 @@ async fn single_local_two_remote_authorities_different_remote_transport() {
// Now signal both clients to resume
signal_to_resume(all_signal_should_pause.clone()).await;

task::sleep(Duration::from_millis(DURATION_TO_RUN_CLIENTS as u64)).await;
tokio::time::sleep(Duration::from_millis(DURATION_TO_RUN_CLIENTS as u64)).await;

{
let mut local_command = local_command.lock().await;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@
********************************************************************************/

use async_broadcast::broadcast;
use async_std::sync::{Condvar, Mutex};
use async_std::task;
use futures::future::join;
use integration_test_utils::{
check_messages_in_order, check_send_receive_message_discrepancy, local_authority,
Expand All @@ -30,14 +28,16 @@ use log::debug;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::Mutex;
use tokio_condvar::Condvar;
use up_rust::{UListener, UTransport};
use up_streamer::{Endpoint, UStreamer};
use usubscription_static_file::USubscriptionStaticFile;

const DURATION_TO_RUN_CLIENTS: u128 = 1_000;
const SENT_MESSAGE_VEC_CAPACITY: usize = 10_000;

#[async_std::test]
#[tokio::test(flavor = "multi_thread")]
async fn single_local_two_remote_authorities_same_remote_transport() {
// using async_broadcast to simulate communication protocol
let (tx_1, rx_1) = broadcast(20000);
Expand Down Expand Up @@ -257,7 +257,7 @@ async fn single_local_two_remote_authorities_same_remote_transport() {
// Now signal both clients to resume
signal_to_resume(all_signal_should_pause.clone()).await;

task::sleep(Duration::from_millis(DURATION_TO_RUN_CLIENTS as u64)).await;
tokio::time::sleep(Duration::from_millis(DURATION_TO_RUN_CLIENTS as u64)).await;

{
let mut local_command = local_command.lock().await;
Expand Down
2 changes: 1 addition & 1 deletion up-streamer/tests/usubscription.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use up_rust::{UCode, UStatus, UTransport};
use up_streamer::{Endpoint, UStreamer};
use usubscription_static_file::USubscriptionStaticFile;

#[async_std::test]
#[tokio::test(flavor = "multi_thread")]
async fn usubscription_bad_data() {
let utransport_foo: Arc<dyn UTransport> =
Arc::new(UPClientFailingRegister::new("upclient_foo").await);
Expand Down
3 changes: 2 additions & 1 deletion utils/integration-test-utils/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ license.workspace = true

[dependencies]
async-broadcast = { version = "0.7.0" }
async-std = { workspace = true, features = ["unstable"] }
async-trait = { workspace = true }
env_logger = { workspace = true }
futures = { workspace = true }
Expand All @@ -33,3 +32,5 @@ serde_json = { workspace = true }
up-rust = { workspace = true }
up-streamer = { path = "../../up-streamer" }
rand = "0.8.5"
tokio = { workspace = true }
tokio-condvar = { version = "0.3.0" }
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,10 @@
* SPDX-License-Identifier: Apache-2.0
********************************************************************************/

use async_std::sync::Mutex;
use async_trait::async_trait;
use log::debug;
use std::sync::Arc;
use tokio::sync::Mutex;
use up_rust::{UListener, UMessage};

#[derive(Clone)]
Expand Down
19 changes: 13 additions & 6 deletions utils/integration-test-utils/src/integration_test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,15 @@

use crate::UPClientFoo;
use async_broadcast::{Receiver, Sender};
use async_std::sync::{Condvar, Mutex};
use async_std::task;
use log::{debug, error};
use std::collections::HashMap;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::thread::JoinHandle;
use std::time::{Duration, Instant};
use tokio::runtime::Builder;
use tokio::sync::Mutex;
use tokio_condvar::Condvar;
use up_rust::{UListener, UMessage, UStatus, UTransport, UUri, UUID};

pub type Signal = Arc<(Mutex<bool>, Condvar)>;
Expand Down Expand Up @@ -197,13 +198,13 @@ async fn poll_for_new_command(
let (lock, cvar) = &*pause_execution;
let mut should_pause = lock.lock().await;
while *should_pause {
task::sleep(Duration::from_millis(100)).await;
tokio::time::sleep(Duration::from_millis(100)).await;

let command = client_command.lock().await;
if *command == ClientCommand::Stop {
let times: u64 = client.times_received.load(Ordering::SeqCst);
println!("{name} had rx of: {times}");
task::sleep(Duration::from_millis(1000)).await;
tokio::time::sleep(Duration::from_millis(1000)).await;
return true;
} else {
match &*command {
Expand Down Expand Up @@ -292,7 +293,13 @@ pub async fn run_client(
client_history: ClientHistory,
) -> JoinHandle<Vec<UMessage>> {
std::thread::spawn(move || {
task::block_on(async move {
// Create a new single-threaded runtime
let runtime = Builder::new_current_thread()
.enable_all()
.build()
.expect("Failed to create Tokio runtime");

runtime.block_on(async move {
let client = configure_client(&client_configuration).await;

let mut active_connection_listing = Vec::new();
Expand Down Expand Up @@ -353,7 +360,7 @@ pub async fn run_client(
)
.await;

task::sleep(Duration::from_millis(1)).await;
tokio::time::sleep(Duration::from_millis(1)).await;
}
})
})
Expand Down
Loading

0 comments on commit 092f3ab

Please sign in to comment.