From 8f37c49553afd129ea73e9046cf63aef3f547538 Mon Sep 17 00:00:00 2001 From: David Blewett Date: Thu, 11 Jan 2024 14:57:47 -0500 Subject: [PATCH 1/3] Use a short timeout instead of now_or_never. --- tests/test_high_consumers.rs | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/tests/test_high_consumers.rs b/tests/test_high_consumers.rs index 97ca4f5a0..548ccb94d 100644 --- a/tests/test_high_consumers.rs +++ b/tests/test_high_consumers.rs @@ -4,7 +4,7 @@ use std::collections::HashMap; use std::error::Error; use std::sync::Arc; -use futures::future::{self, FutureExt}; +use futures::future; use futures::stream::StreamExt; use maplit::hashmap; use rdkafka_sys::RDKafkaErrorCode; @@ -491,7 +491,7 @@ async fn test_consumer_commit_metadata() -> Result<(), Box> { Ok(()) } -#[tokio::test] +#[tokio::test(flavor = "multi_thread")] async fn test_consume_partition_order() { let _r = env_logger::try_init(); @@ -545,8 +545,8 @@ async fn test_consume_partition_order() { let partition1 = consumer.split_partition_queue(&topic_name, 1).unwrap(); let mut i = 0; - while i < 12 { - if let Some(m) = consumer.recv().now_or_never() { + while i < 5 { + if let Ok(m) = time::timeout(Duration::from_millis(1000), consumer.recv()).await { // retry on transient errors until we get a message let m = match m { Err(KafkaError::MessageConsumption( @@ -564,9 +564,11 @@ async fn test_consume_partition_order() { let partition: i32 = m.partition(); assert!(partition == 0 || partition == 2); i += 1; + } else { + panic!("Timeout receiving message"); } - if let Some(m) = partition1.recv().now_or_never() { + if let Ok(m) = time::timeout(Duration::from_millis(1000), partition1.recv()).await { // retry on transient errors until we get a message let m = match m { Err(KafkaError::MessageConsumption( @@ -583,6 +585,8 @@ async fn test_consume_partition_order() { }; assert_eq!(m.partition(), 1); i += 1; + } else { + panic!("Timeout receiving message"); } } } From e347f9aece4a07428bf132e821e5c146b2b07e54 Mon Sep 17 00:00:00 2001 From: David Blewett Date: Thu, 11 Jan 2024 14:58:07 -0500 Subject: [PATCH 2/3] Disable valgrind for now, and start up kafka/zk before starting building --- test_suite.sh | 50 ++++++++++++++++++++++++++------------------------ 1 file changed, 26 insertions(+), 24 deletions(-) diff --git a/test_suite.sh b/test_suite.sh index 3d875bad9..900abd507 100755 --- a/test_suite.sh +++ b/test_suite.sh @@ -29,34 +29,36 @@ run_with_valgrind() { # Initialize. git submodule update --init -cargo test --no-run docker-compose up -d +cargo test # Run unit tests. -echo_good "*** Run unit tests ***" -for test_file in target/debug/deps/rdkafka-* -do - if [[ -x "$test_file" ]] - then - echo_good "Executing "$test_file"" - run_with_valgrind "$test_file" - fi -done -echo_good "*** Unit tests succeeded ***" - -# Run integration tests. - -echo_good "*** Run unit tests ***" -for test_file in target/debug/deps/test_* -do - if [[ -x "$test_file" ]] - then - echo_good "Executing "$test_file"" - run_with_valgrind "$test_file" - fi -done -echo_good "*** Integration tests succeeded ***" +#echo_good "*** Run unit tests ***" +#for test_file in target/debug/deps/rdkafka-* +#do +# if [[ -x "$test_file" ]] +# then +# echo_good "Executing "$test_file"" +# run_with_valgrind "$test_file" +# fi +#done +#echo_good "*** Unit tests succeeded ***" +# +## Run integration tests. +# +#echo_good "*** Run integration tests ***" +#for test_file in target/debug/deps/test_* +#do +# if [[ -x "$test_file" ]] +# then +# #echo_good "*** Restarting kafka/zk ***" +# #docker-compose restart --timeout 30 +# echo_good "Executing "$test_file"" +# run_with_valgrind "$test_file" +# fi +#done +#echo_good "*** Integration tests succeeded ***" # Run smol runtime example. From 321c04078a365635680f17704e10e93e40b0ac2a Mon Sep 17 00:00:00 2001 From: David Blewett Date: Thu, 11 Jan 2024 15:30:10 -0500 Subject: [PATCH 3/3] Avoid topic pollution by prefixing with test name. --- tests/test_admin.rs | 14 +++++++------- tests/test_high_consumers.rs | 20 ++++++++++---------- tests/test_high_producers.rs | 4 ++-- tests/test_low_consumers.rs | 12 ++++++------ tests/test_low_producers.rs | 22 ++++++++++++++-------- tests/test_metadata.rs | 6 +++--- tests/test_transactions.rs | 8 ++++---- tests/utils.rs | 6 +++--- 8 files changed, 49 insertions(+), 43 deletions(-) diff --git a/tests/test_admin.rs b/tests/test_admin.rs index 79c20ac81..846a96c2a 100644 --- a/tests/test_admin.rs +++ b/tests/test_admin.rs @@ -32,7 +32,7 @@ fn create_admin_client() -> AdminClient { async fn create_consumer_group(consumer_group_name: &str) { let admin_client = create_admin_client(); - let topic_name = &rand_test_topic(); + let topic_name = &rand_test_topic(consumer_group_name); let consumer: BaseConsumer = create_config() .set("group.id", consumer_group_name.clone()) .create() @@ -124,8 +124,8 @@ async fn test_topics() { // Verify that topics are created as specified, and that they can later // be deleted. { - let name1 = rand_test_topic(); - let name2 = rand_test_topic(); + let name1 = rand_test_topic("test_topics"); + let name2 = rand_test_topic("test_topics"); // Test both the builder API and the literal construction. let topic1 = @@ -254,7 +254,7 @@ async fn test_topics() { // Verify that incorrect replication configurations are ignored when // creating partitions. { - let name = rand_test_topic(); + let name = rand_test_topic("test_topics"); let topic = NewTopic::new(&name, 1, TopicReplication::Fixed(1)); let res = admin_client @@ -291,7 +291,7 @@ async fn test_topics() { // Verify that deleting a non-existent topic fails. { - let name = rand_test_topic(); + let name = rand_test_topic("test_topics"); let res = admin_client .delete_topics(&[&name], &opts) .await @@ -305,8 +305,8 @@ async fn test_topics() { // Verify that mixed-success operations properly report the successful and // failing operators. { - let name1 = rand_test_topic(); - let name2 = rand_test_topic(); + let name1 = rand_test_topic("test_topics"); + let name2 = rand_test_topic("test_topics"); let topic1 = NewTopic::new(&name1, 1, TopicReplication::Fixed(1)); let topic2 = NewTopic::new(&name2, 1, TopicReplication::Fixed(1)); diff --git a/tests/test_high_consumers.rs b/tests/test_high_consumers.rs index 548ccb94d..4aec62595 100644 --- a/tests/test_high_consumers.rs +++ b/tests/test_high_consumers.rs @@ -70,7 +70,7 @@ async fn test_produce_consume_base() { let _r = env_logger::try_init(); let start_time = current_time_millis(); - let topic_name = rand_test_topic(); + let topic_name = rand_test_topic("test_produce_consume_base"); let message_map = populate_topic(&topic_name, 100, &value_fn, &key_fn, None, None).await; let consumer = create_stream_consumer(&rand_test_group(), None); consumer.subscribe(&[topic_name.as_str()]).unwrap(); @@ -105,7 +105,7 @@ async fn test_produce_consume_base() { async fn test_produce_consume_base_concurrent() { let _r = env_logger::try_init(); - let topic_name = rand_test_topic(); + let topic_name = rand_test_topic("test_produce_consume_base_concurrent"); populate_topic(&topic_name, 100, &value_fn, &key_fn, None, None).await; let consumer = Arc::new(create_stream_consumer(&rand_test_group(), None)); @@ -135,7 +135,7 @@ async fn test_produce_consume_base_concurrent() { async fn test_produce_consume_base_assign() { let _r = env_logger::try_init(); - let topic_name = rand_test_topic(); + let topic_name = rand_test_topic("test_produce_consume_base_assign"); populate_topic(&topic_name, 10, &value_fn, &key_fn, Some(0), None).await; populate_topic(&topic_name, 10, &value_fn, &key_fn, Some(1), None).await; populate_topic(&topic_name, 10, &value_fn, &key_fn, Some(2), None).await; @@ -170,7 +170,7 @@ async fn test_produce_consume_base_assign() { async fn test_produce_consume_base_unassign() { let _r = env_logger::try_init(); - let topic_name = rand_test_topic(); + let topic_name = rand_test_topic("test_produce_consume_base_unassign"); populate_topic(&topic_name, 10, &value_fn, &key_fn, Some(0), None).await; populate_topic(&topic_name, 10, &value_fn, &key_fn, Some(1), None).await; populate_topic(&topic_name, 10, &value_fn, &key_fn, Some(2), None).await; @@ -195,7 +195,7 @@ async fn test_produce_consume_base_unassign() { async fn test_produce_consume_base_incremental_assign_and_unassign() { let _r = env_logger::try_init(); - let topic_name = rand_test_topic(); + let topic_name = rand_test_topic("test_produce_consume_base_incremental_assign_and_unassign"); populate_topic(&topic_name, 10, &value_fn, &key_fn, Some(0), None).await; populate_topic(&topic_name, 10, &value_fn, &key_fn, Some(1), None).await; populate_topic(&topic_name, 10, &value_fn, &key_fn, Some(2), None).await; @@ -236,7 +236,7 @@ async fn test_produce_consume_base_incremental_assign_and_unassign() { async fn test_produce_consume_with_timestamp() { let _r = env_logger::try_init(); - let topic_name = rand_test_topic(); + let topic_name = rand_test_topic("test_produce_consume_with_timestamp"); let message_map = populate_topic(&topic_name, 100, &value_fn, &key_fn, Some(0), Some(1111)).await; let consumer = create_stream_consumer(&rand_test_group(), None); @@ -277,7 +277,7 @@ async fn test_produce_consume_with_timestamp() { async fn test_consumer_commit_message() { let _r = env_logger::try_init(); - let topic_name = rand_test_topic(); + let topic_name = rand_test_topic("test_consumer_commit_message"); populate_topic(&topic_name, 10, &value_fn, &key_fn, Some(0), None).await; populate_topic(&topic_name, 11, &value_fn, &key_fn, Some(1), None).await; populate_topic(&topic_name, 12, &value_fn, &key_fn, Some(2), None).await; @@ -355,7 +355,7 @@ async fn test_consumer_commit_message() { async fn test_consumer_store_offset_commit() { let _r = env_logger::try_init(); - let topic_name = rand_test_topic(); + let topic_name = rand_test_topic("test_consumer_store_offset_commit"); populate_topic(&topic_name, 10, &value_fn, &key_fn, Some(0), None).await; populate_topic(&topic_name, 11, &value_fn, &key_fn, Some(1), None).await; populate_topic(&topic_name, 12, &value_fn, &key_fn, Some(2), None).await; @@ -440,7 +440,7 @@ async fn test_consumer_store_offset_commit() { async fn test_consumer_commit_metadata() -> Result<(), Box> { let _ = env_logger::try_init(); - let topic_name = rand_test_topic(); + let topic_name = rand_test_topic("test_consumer_commit_metadata"); let group_name = rand_test_group(); populate_topic(&topic_name, 10, &value_fn, &key_fn, None, None).await; @@ -495,7 +495,7 @@ async fn test_consumer_commit_metadata() -> Result<(), Box> { async fn test_consume_partition_order() { let _r = env_logger::try_init(); - let topic_name = rand_test_topic(); + let topic_name = rand_test_topic("test_consume_partition_order"); populate_topic(&topic_name, 4, &value_fn, &key_fn, Some(0), None).await; populate_topic(&topic_name, 4, &value_fn, &key_fn, Some(1), None).await; populate_topic(&topic_name, 4, &value_fn, &key_fn, Some(2), None).await; diff --git a/tests/test_high_producers.rs b/tests/test_high_producers.rs index 85b8784e0..bddc1beae 100644 --- a/tests/test_high_producers.rs +++ b/tests/test_high_producers.rs @@ -30,7 +30,7 @@ fn future_producer(config_overrides: HashMap<&str, &str>) -> FutureProducer = (0..10) .map(|_| { @@ -60,7 +60,7 @@ async fn test_future_producer_send_full() { config.insert("message.timeout.ms", "5000"); config.insert("queue.buffering.max.messages", "1"); let producer = &future_producer(config); - let topic_name = &rand_test_topic(); + let topic_name = &rand_test_topic("test_future_producer_send_full"); // Fill up the queue. producer diff --git a/tests/test_low_consumers.rs b/tests/test_low_consumers.rs index c4aa305f7..e6642b688 100644 --- a/tests/test_low_consumers.rs +++ b/tests/test_low_consumers.rs @@ -31,7 +31,7 @@ fn create_base_consumer( async fn test_produce_consume_seek() { let _r = env_logger::try_init(); - let topic_name = rand_test_topic(); + let topic_name = rand_test_topic("test_produce_consume_seek"); populate_topic(&topic_name, 5, &value_fn, &key_fn, Some(0), None).await; let consumer = create_base_consumer(&rand_test_group(), None); consumer.subscribe(&[topic_name.as_str()]).unwrap(); @@ -96,7 +96,7 @@ async fn test_produce_consume_seek() { async fn test_produce_consume_seek_partitions() { let _r = env_logger::try_init(); - let topic_name = rand_test_topic(); + let topic_name = rand_test_topic("test_produce_consume_seek_partitions"); populate_topic(&topic_name, 30, &value_fn, &key_fn, None, None).await; let consumer = create_base_consumer(&rand_test_group(), None); @@ -158,7 +158,7 @@ async fn test_produce_consume_iter() { let _r = env_logger::try_init(); let start_time = current_time_millis(); - let topic_name = rand_test_topic(); + let topic_name = rand_test_topic("test_produce_consume_iter"); let message_map = populate_topic(&topic_name, 100, &value_fn, &key_fn, None, None).await; let consumer = create_base_consumer(&rand_test_group(), None); consumer.subscribe(&[topic_name.as_str()]).unwrap(); @@ -196,7 +196,7 @@ async fn test_pause_resume_consumer_iter() { let _r = env_logger::try_init(); - let topic_name = rand_test_topic(); + let topic_name = rand_test_topic("test_pause_resume_consumer_iter"); populate_topic( &topic_name, MESSAGE_COUNT, @@ -237,7 +237,7 @@ async fn test_pause_resume_consumer_iter() { async fn test_consume_partition_order() { let _r = env_logger::try_init(); - let topic_name = rand_test_topic(); + let topic_name = rand_test_topic("test_consume_partition_order"); populate_topic(&topic_name, 4, &value_fn, &key_fn, Some(0), None).await; populate_topic(&topic_name, 4, &value_fn, &key_fn, Some(1), None).await; populate_topic(&topic_name, 4, &value_fn, &key_fn, Some(2), None).await; @@ -357,7 +357,7 @@ async fn test_consume_partition_order() { async fn test_produce_consume_message_queue_nonempty_callback() { let _r = env_logger::try_init(); - let topic_name = rand_test_topic(); + let topic_name = rand_test_topic("test_produce_consume_message_queue_nonempty_callback"); create_topic(&topic_name, 1).await; diff --git a/tests/test_low_producers.rs b/tests/test_low_producers.rs index bf5c9ef2a..493642617 100644 --- a/tests/test_low_producers.rs +++ b/tests/test_low_producers.rs @@ -191,7 +191,7 @@ where #[test] fn test_base_producer_queue_full() { let producer = base_producer(hashmap! { "queue.buffering.max.messages" => "10" }); - let topic_name = rand_test_topic(); + let topic_name = rand_test_topic("test_base_producer_queue_full"); let results = (0..30) .map(|id| { @@ -235,7 +235,7 @@ fn test_base_producer_timeout() { "bootstrap.servers" => "1.2.3.4" }, ); - let topic_name = rand_test_topic(); + let topic_name = rand_test_topic("test_base_producer_timeout"); let results_count = (0..10) .map(|id| { @@ -346,7 +346,7 @@ fn test_base_producer_headers() { ids: ids_set.clone(), }; let producer = base_producer_with_context(context, HashMap::new()); - let topic_name = rand_test_topic(); + let topic_name = rand_test_topic("test_base_producer_headers"); let results_count = (0..10) .map(|id| { @@ -387,7 +387,7 @@ fn test_base_producer_headers() { fn test_threaded_producer_send() { let context = CollectingContext::new(); let producer = threaded_producer_with_context(context.clone(), HashMap::new()); - let topic_name = rand_test_topic(); + let topic_name = rand_test_topic("test_threaded_producer_send"); let results_count = (0..10) .map(|id| { @@ -431,7 +431,7 @@ fn test_base_producer_opaque_arc() -> Result<(), Box> { let shared_count = Arc::new(Mutex::new(0)); let context = OpaqueArcContext {}; let producer = base_producer_with_context(context, HashMap::new()); - let topic_name = rand_test_topic(); + let topic_name = rand_test_topic("test_base_producer_opaque_arc"); let results_count = (0..10) .map(|_| { @@ -482,7 +482,13 @@ fn test_register_custom_partitioner_linger_non_zero_key_null() { let producer = base_producer_with_context(context.clone(), config_overrides); producer - .send(BaseRecord::<(), str, usize>::with_opaque_to(&rand_test_topic(), 0).payload("")) + .send( + BaseRecord::<(), str, usize>::with_opaque_to( + &rand_test_topic("test_register_custom_partitioner_linger_non_zero_key_null"), + 0, + ) + .payload(""), + ) .unwrap(); producer.flush(Duration::from_secs(10)).unwrap(); @@ -499,7 +505,7 @@ fn test_register_custom_partitioner_linger_non_zero_key_null() { fn test_custom_partitioner_base_producer() { let context = CollectingContext::new_with_custom_partitioner(FixedPartitioner::new(2)); let producer = base_producer_with_context(context.clone(), HashMap::new()); - let topic_name = rand_test_topic(); + let topic_name = rand_test_topic("test_custom_partitioner_base_producer"); let results_count = (0..10) .map(|id| { @@ -527,7 +533,7 @@ fn test_custom_partitioner_base_producer() { fn test_custom_partitioner_threaded_producer() { let context = CollectingContext::new_with_custom_partitioner(FixedPartitioner::new(2)); let producer = threaded_producer_with_context(context.clone(), HashMap::new()); - let topic_name = rand_test_topic(); + let topic_name = rand_test_topic("test_custom_partitioner_threaded_producer"); let results_count = (0..10) .map(|id| { diff --git a/tests/test_metadata.rs b/tests/test_metadata.rs index e62bee556..ba4d4f2ca 100644 --- a/tests/test_metadata.rs +++ b/tests/test_metadata.rs @@ -31,7 +31,7 @@ fn create_consumer(group_id: &str) -> StreamConsumer { async fn test_metadata() { let _r = env_logger::try_init(); - let topic_name = rand_test_topic(); + let topic_name = rand_test_topic("test_metadata"); populate_topic(&topic_name, 1, &value_fn, &key_fn, Some(0), None).await; populate_topic(&topic_name, 1, &value_fn, &key_fn, Some(1), None).await; populate_topic(&topic_name, 1, &value_fn, &key_fn, Some(2), None).await; @@ -92,7 +92,7 @@ async fn test_metadata() { async fn test_subscription() { let _r = env_logger::try_init(); - let topic_name = rand_test_topic(); + let topic_name = rand_test_topic("test_subscription"); populate_topic(&topic_name, 10, &value_fn, &key_fn, None, None).await; let consumer = create_consumer(&rand_test_group()); consumer.subscribe(&[topic_name.as_str()]).unwrap(); @@ -109,7 +109,7 @@ async fn test_subscription() { async fn test_group_membership() { let _r = env_logger::try_init(); - let topic_name = rand_test_topic(); + let topic_name = rand_test_topic("test_group_membership"); let group_name = rand_test_group(); populate_topic(&topic_name, 1, &value_fn, &key_fn, Some(0), None).await; populate_topic(&topic_name, 1, &value_fn, &key_fn, Some(1), None).await; diff --git a/tests/test_transactions.rs b/tests/test_transactions.rs index 5d6e47160..1fe84a98b 100644 --- a/tests/test_transactions.rs +++ b/tests/test_transactions.rs @@ -64,8 +64,8 @@ fn count_records(topic: &str, iso: IsolationLevel) -> Result #[tokio::test] async fn test_transaction_abort() -> Result<(), Box> { - let consume_topic = rand_test_topic(); - let produce_topic = rand_test_topic(); + let consume_topic = rand_test_topic("test_transaction_abort"); + let produce_topic = rand_test_topic("test_transaction_abort"); populate_topic(&consume_topic, 30, &value_fn, &key_fn, Some(0), None).await; @@ -132,8 +132,8 @@ async fn test_transaction_abort() -> Result<(), Box> { #[tokio::test] async fn test_transaction_commit() -> Result<(), Box> { - let consume_topic = rand_test_topic(); - let produce_topic = rand_test_topic(); + let consume_topic = rand_test_topic("test_transaction_commit"); + let produce_topic = rand_test_topic("test_transaction_commit"); populate_topic(&consume_topic, 30, &value_fn, &key_fn, Some(0), None).await; diff --git a/tests/utils.rs b/tests/utils.rs index 6730f9747..447213672 100644 --- a/tests/utils.rs +++ b/tests/utils.rs @@ -17,12 +17,12 @@ use rdkafka::producer::{FutureProducer, FutureRecord}; use rdkafka::statistics::Statistics; use rdkafka::TopicPartitionList; -pub fn rand_test_topic() -> String { +pub fn rand_test_topic(test_name: &str) -> String { let id = rand::thread_rng() .gen_ascii_chars() .take(10) .collect::(); - format!("__test_{}", id) + format!("__{}_{}", test_name, id) } pub fn rand_test_group() -> String { @@ -170,7 +170,7 @@ mod tests { #[tokio::test] async fn test_populate_topic() { - let topic_name = rand_test_topic(); + let topic_name = rand_test_topic("test_populate_topic"); let message_map = populate_topic(&topic_name, 100, &value_fn, &key_fn, Some(0), None).await; let total_messages = message_map