Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Disable valgrind in CI temporarily #645

Merged
merged 3 commits into from
Jan 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 26 additions & 24 deletions test_suite.sh
Original file line number Diff line number Diff line change
Expand Up @@ -29,34 +29,36 @@ run_with_valgrind() {
# Initialize.

git submodule update --init
cargo test --no-run
docker-compose up -d
cargo test

# Run unit tests.

echo_good "*** Run unit tests ***"
for test_file in target/debug/deps/rdkafka-*
do
if [[ -x "$test_file" ]]
then
echo_good "Executing "$test_file""
run_with_valgrind "$test_file"
fi
done
echo_good "*** Unit tests succeeded ***"

# Run integration tests.

echo_good "*** Run unit tests ***"
for test_file in target/debug/deps/test_*
do
if [[ -x "$test_file" ]]
then
echo_good "Executing "$test_file""
run_with_valgrind "$test_file"
fi
done
echo_good "*** Integration tests succeeded ***"
#echo_good "*** Run unit tests ***"
#for test_file in target/debug/deps/rdkafka-*
#do
# if [[ -x "$test_file" ]]
# then
# echo_good "Executing "$test_file""
# run_with_valgrind "$test_file"
# fi
#done
#echo_good "*** Unit tests succeeded ***"
#
## Run integration tests.
#
#echo_good "*** Run integration tests ***"
#for test_file in target/debug/deps/test_*
#do
# if [[ -x "$test_file" ]]
# then
# #echo_good "*** Restarting kafka/zk ***"
# #docker-compose restart --timeout 30
# echo_good "Executing "$test_file""
# run_with_valgrind "$test_file"
# fi
#done
#echo_good "*** Integration tests succeeded ***"

# Run smol runtime example.

Expand Down
14 changes: 7 additions & 7 deletions tests/test_admin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ fn create_admin_client() -> AdminClient<DefaultClientContext> {

async fn create_consumer_group(consumer_group_name: &str) {
let admin_client = create_admin_client();
let topic_name = &rand_test_topic();
let topic_name = &rand_test_topic(consumer_group_name);
let consumer: BaseConsumer = create_config()
.set("group.id", consumer_group_name.clone())
.create()
Expand Down Expand Up @@ -124,8 +124,8 @@ async fn test_topics() {
// Verify that topics are created as specified, and that they can later
// be deleted.
{
let name1 = rand_test_topic();
let name2 = rand_test_topic();
let name1 = rand_test_topic("test_topics");
let name2 = rand_test_topic("test_topics");

// Test both the builder API and the literal construction.
let topic1 =
Expand Down Expand Up @@ -254,7 +254,7 @@ async fn test_topics() {
// Verify that incorrect replication configurations are ignored when
// creating partitions.
{
let name = rand_test_topic();
let name = rand_test_topic("test_topics");
let topic = NewTopic::new(&name, 1, TopicReplication::Fixed(1));

let res = admin_client
Expand Down Expand Up @@ -291,7 +291,7 @@ async fn test_topics() {

// Verify that deleting a non-existent topic fails.
{
let name = rand_test_topic();
let name = rand_test_topic("test_topics");
let res = admin_client
.delete_topics(&[&name], &opts)
.await
Expand All @@ -305,8 +305,8 @@ async fn test_topics() {
// Verify that mixed-success operations properly report the successful and
// failing operators.
{
let name1 = rand_test_topic();
let name2 = rand_test_topic();
let name1 = rand_test_topic("test_topics");
let name2 = rand_test_topic("test_topics");

let topic1 = NewTopic::new(&name1, 1, TopicReplication::Fixed(1));
let topic2 = NewTopic::new(&name2, 1, TopicReplication::Fixed(1));
Expand Down
34 changes: 19 additions & 15 deletions tests/test_high_consumers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::collections::HashMap;
use std::error::Error;
use std::sync::Arc;

use futures::future::{self, FutureExt};
use futures::future;
use futures::stream::StreamExt;
use maplit::hashmap;
use rdkafka_sys::RDKafkaErrorCode;
Expand Down Expand Up @@ -70,7 +70,7 @@ async fn test_produce_consume_base() {
let _r = env_logger::try_init();

let start_time = current_time_millis();
let topic_name = rand_test_topic();
let topic_name = rand_test_topic("test_produce_consume_base");
let message_map = populate_topic(&topic_name, 100, &value_fn, &key_fn, None, None).await;
let consumer = create_stream_consumer(&rand_test_group(), None);
consumer.subscribe(&[topic_name.as_str()]).unwrap();
Expand Down Expand Up @@ -105,7 +105,7 @@ async fn test_produce_consume_base() {
async fn test_produce_consume_base_concurrent() {
let _r = env_logger::try_init();

let topic_name = rand_test_topic();
let topic_name = rand_test_topic("test_produce_consume_base_concurrent");
populate_topic(&topic_name, 100, &value_fn, &key_fn, None, None).await;

let consumer = Arc::new(create_stream_consumer(&rand_test_group(), None));
Expand Down Expand Up @@ -135,7 +135,7 @@ async fn test_produce_consume_base_concurrent() {
async fn test_produce_consume_base_assign() {
let _r = env_logger::try_init();

let topic_name = rand_test_topic();
let topic_name = rand_test_topic("test_produce_consume_base_assign");
populate_topic(&topic_name, 10, &value_fn, &key_fn, Some(0), None).await;
populate_topic(&topic_name, 10, &value_fn, &key_fn, Some(1), None).await;
populate_topic(&topic_name, 10, &value_fn, &key_fn, Some(2), None).await;
Expand Down Expand Up @@ -170,7 +170,7 @@ async fn test_produce_consume_base_assign() {
async fn test_produce_consume_base_unassign() {
let _r = env_logger::try_init();

let topic_name = rand_test_topic();
let topic_name = rand_test_topic("test_produce_consume_base_unassign");
populate_topic(&topic_name, 10, &value_fn, &key_fn, Some(0), None).await;
populate_topic(&topic_name, 10, &value_fn, &key_fn, Some(1), None).await;
populate_topic(&topic_name, 10, &value_fn, &key_fn, Some(2), None).await;
Expand All @@ -195,7 +195,7 @@ async fn test_produce_consume_base_unassign() {
async fn test_produce_consume_base_incremental_assign_and_unassign() {
let _r = env_logger::try_init();

let topic_name = rand_test_topic();
let topic_name = rand_test_topic("test_produce_consume_base_incremental_assign_and_unassign");
populate_topic(&topic_name, 10, &value_fn, &key_fn, Some(0), None).await;
populate_topic(&topic_name, 10, &value_fn, &key_fn, Some(1), None).await;
populate_topic(&topic_name, 10, &value_fn, &key_fn, Some(2), None).await;
Expand Down Expand Up @@ -236,7 +236,7 @@ async fn test_produce_consume_base_incremental_assign_and_unassign() {
async fn test_produce_consume_with_timestamp() {
let _r = env_logger::try_init();

let topic_name = rand_test_topic();
let topic_name = rand_test_topic("test_produce_consume_with_timestamp");
let message_map =
populate_topic(&topic_name, 100, &value_fn, &key_fn, Some(0), Some(1111)).await;
let consumer = create_stream_consumer(&rand_test_group(), None);
Expand Down Expand Up @@ -277,7 +277,7 @@ async fn test_produce_consume_with_timestamp() {
async fn test_consumer_commit_message() {
let _r = env_logger::try_init();

let topic_name = rand_test_topic();
let topic_name = rand_test_topic("test_consumer_commit_message");
populate_topic(&topic_name, 10, &value_fn, &key_fn, Some(0), None).await;
populate_topic(&topic_name, 11, &value_fn, &key_fn, Some(1), None).await;
populate_topic(&topic_name, 12, &value_fn, &key_fn, Some(2), None).await;
Expand Down Expand Up @@ -355,7 +355,7 @@ async fn test_consumer_commit_message() {
async fn test_consumer_store_offset_commit() {
let _r = env_logger::try_init();

let topic_name = rand_test_topic();
let topic_name = rand_test_topic("test_consumer_store_offset_commit");
populate_topic(&topic_name, 10, &value_fn, &key_fn, Some(0), None).await;
populate_topic(&topic_name, 11, &value_fn, &key_fn, Some(1), None).await;
populate_topic(&topic_name, 12, &value_fn, &key_fn, Some(2), None).await;
Expand Down Expand Up @@ -440,7 +440,7 @@ async fn test_consumer_store_offset_commit() {
async fn test_consumer_commit_metadata() -> Result<(), Box<dyn Error>> {
let _ = env_logger::try_init();

let topic_name = rand_test_topic();
let topic_name = rand_test_topic("test_consumer_commit_metadata");
let group_name = rand_test_group();
populate_topic(&topic_name, 10, &value_fn, &key_fn, None, None).await;

Expand Down Expand Up @@ -491,11 +491,11 @@ async fn test_consumer_commit_metadata() -> Result<(), Box<dyn Error>> {
Ok(())
}

#[tokio::test]
#[tokio::test(flavor = "multi_thread")]
async fn test_consume_partition_order() {
let _r = env_logger::try_init();

let topic_name = rand_test_topic();
let topic_name = rand_test_topic("test_consume_partition_order");
populate_topic(&topic_name, 4, &value_fn, &key_fn, Some(0), None).await;
populate_topic(&topic_name, 4, &value_fn, &key_fn, Some(1), None).await;
populate_topic(&topic_name, 4, &value_fn, &key_fn, Some(2), None).await;
Expand Down Expand Up @@ -545,8 +545,8 @@ async fn test_consume_partition_order() {
let partition1 = consumer.split_partition_queue(&topic_name, 1).unwrap();

let mut i = 0;
while i < 12 {
if let Some(m) = consumer.recv().now_or_never() {
while i < 5 {
if let Ok(m) = time::timeout(Duration::from_millis(1000), consumer.recv()).await {
// retry on transient errors until we get a message
let m = match m {
Err(KafkaError::MessageConsumption(
Expand All @@ -564,9 +564,11 @@ async fn test_consume_partition_order() {
let partition: i32 = m.partition();
assert!(partition == 0 || partition == 2);
i += 1;
} else {
panic!("Timeout receiving message");
}

if let Some(m) = partition1.recv().now_or_never() {
if let Ok(m) = time::timeout(Duration::from_millis(1000), partition1.recv()).await {
// retry on transient errors until we get a message
let m = match m {
Err(KafkaError::MessageConsumption(
Expand All @@ -583,6 +585,8 @@ async fn test_consume_partition_order() {
};
assert_eq!(m.partition(), 1);
i += 1;
} else {
panic!("Timeout receiving message");
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions tests/test_high_producers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ fn future_producer(config_overrides: HashMap<&str, &str>) -> FutureProducer<Defa
#[tokio::test]
async fn test_future_producer_send() {
let producer = future_producer(HashMap::new());
let topic_name = rand_test_topic();
let topic_name = rand_test_topic("test_future_producer_send");

let results: FuturesUnordered<_> = (0..10)
.map(|_| {
Expand Down Expand Up @@ -60,7 +60,7 @@ async fn test_future_producer_send_full() {
config.insert("message.timeout.ms", "5000");
config.insert("queue.buffering.max.messages", "1");
let producer = &future_producer(config);
let topic_name = &rand_test_topic();
let topic_name = &rand_test_topic("test_future_producer_send_full");

// Fill up the queue.
producer
Expand Down
12 changes: 6 additions & 6 deletions tests/test_low_consumers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ fn create_base_consumer(
async fn test_produce_consume_seek() {
let _r = env_logger::try_init();

let topic_name = rand_test_topic();
let topic_name = rand_test_topic("test_produce_consume_seek");
populate_topic(&topic_name, 5, &value_fn, &key_fn, Some(0), None).await;
let consumer = create_base_consumer(&rand_test_group(), None);
consumer.subscribe(&[topic_name.as_str()]).unwrap();
Expand Down Expand Up @@ -96,7 +96,7 @@ async fn test_produce_consume_seek() {
async fn test_produce_consume_seek_partitions() {
let _r = env_logger::try_init();

let topic_name = rand_test_topic();
let topic_name = rand_test_topic("test_produce_consume_seek_partitions");
populate_topic(&topic_name, 30, &value_fn, &key_fn, None, None).await;

let consumer = create_base_consumer(&rand_test_group(), None);
Expand Down Expand Up @@ -158,7 +158,7 @@ async fn test_produce_consume_iter() {
let _r = env_logger::try_init();

let start_time = current_time_millis();
let topic_name = rand_test_topic();
let topic_name = rand_test_topic("test_produce_consume_iter");
let message_map = populate_topic(&topic_name, 100, &value_fn, &key_fn, None, None).await;
let consumer = create_base_consumer(&rand_test_group(), None);
consumer.subscribe(&[topic_name.as_str()]).unwrap();
Expand Down Expand Up @@ -196,7 +196,7 @@ async fn test_pause_resume_consumer_iter() {

let _r = env_logger::try_init();

let topic_name = rand_test_topic();
let topic_name = rand_test_topic("test_pause_resume_consumer_iter");
populate_topic(
&topic_name,
MESSAGE_COUNT,
Expand Down Expand Up @@ -237,7 +237,7 @@ async fn test_pause_resume_consumer_iter() {
async fn test_consume_partition_order() {
let _r = env_logger::try_init();

let topic_name = rand_test_topic();
let topic_name = rand_test_topic("test_consume_partition_order");
populate_topic(&topic_name, 4, &value_fn, &key_fn, Some(0), None).await;
populate_topic(&topic_name, 4, &value_fn, &key_fn, Some(1), None).await;
populate_topic(&topic_name, 4, &value_fn, &key_fn, Some(2), None).await;
Expand Down Expand Up @@ -357,7 +357,7 @@ async fn test_consume_partition_order() {
async fn test_produce_consume_message_queue_nonempty_callback() {
let _r = env_logger::try_init();

let topic_name = rand_test_topic();
let topic_name = rand_test_topic("test_produce_consume_message_queue_nonempty_callback");

create_topic(&topic_name, 1).await;

Expand Down
22 changes: 14 additions & 8 deletions tests/test_low_producers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ where
#[test]
fn test_base_producer_queue_full() {
let producer = base_producer(hashmap! { "queue.buffering.max.messages" => "10" });
let topic_name = rand_test_topic();
let topic_name = rand_test_topic("test_base_producer_queue_full");

let results = (0..30)
.map(|id| {
Expand Down Expand Up @@ -235,7 +235,7 @@ fn test_base_producer_timeout() {
"bootstrap.servers" => "1.2.3.4"
},
);
let topic_name = rand_test_topic();
let topic_name = rand_test_topic("test_base_producer_timeout");

let results_count = (0..10)
.map(|id| {
Expand Down Expand Up @@ -346,7 +346,7 @@ fn test_base_producer_headers() {
ids: ids_set.clone(),
};
let producer = base_producer_with_context(context, HashMap::new());
let topic_name = rand_test_topic();
let topic_name = rand_test_topic("test_base_producer_headers");

let results_count = (0..10)
.map(|id| {
Expand Down Expand Up @@ -387,7 +387,7 @@ fn test_base_producer_headers() {
fn test_threaded_producer_send() {
let context = CollectingContext::new();
let producer = threaded_producer_with_context(context.clone(), HashMap::new());
let topic_name = rand_test_topic();
let topic_name = rand_test_topic("test_threaded_producer_send");

let results_count = (0..10)
.map(|id| {
Expand Down Expand Up @@ -431,7 +431,7 @@ fn test_base_producer_opaque_arc() -> Result<(), Box<dyn Error>> {
let shared_count = Arc::new(Mutex::new(0));
let context = OpaqueArcContext {};
let producer = base_producer_with_context(context, HashMap::new());
let topic_name = rand_test_topic();
let topic_name = rand_test_topic("test_base_producer_opaque_arc");

let results_count = (0..10)
.map(|_| {
Expand Down Expand Up @@ -482,7 +482,13 @@ fn test_register_custom_partitioner_linger_non_zero_key_null() {
let producer = base_producer_with_context(context.clone(), config_overrides);

producer
.send(BaseRecord::<(), str, usize>::with_opaque_to(&rand_test_topic(), 0).payload(""))
.send(
BaseRecord::<(), str, usize>::with_opaque_to(
&rand_test_topic("test_register_custom_partitioner_linger_non_zero_key_null"),
0,
)
.payload(""),
)
.unwrap();
producer.flush(Duration::from_secs(10)).unwrap();

Expand All @@ -499,7 +505,7 @@ fn test_register_custom_partitioner_linger_non_zero_key_null() {
fn test_custom_partitioner_base_producer() {
let context = CollectingContext::new_with_custom_partitioner(FixedPartitioner::new(2));
let producer = base_producer_with_context(context.clone(), HashMap::new());
let topic_name = rand_test_topic();
let topic_name = rand_test_topic("test_custom_partitioner_base_producer");

let results_count = (0..10)
.map(|id| {
Expand Down Expand Up @@ -527,7 +533,7 @@ fn test_custom_partitioner_base_producer() {
fn test_custom_partitioner_threaded_producer() {
let context = CollectingContext::new_with_custom_partitioner(FixedPartitioner::new(2));
let producer = threaded_producer_with_context(context.clone(), HashMap::new());
let topic_name = rand_test_topic();
let topic_name = rand_test_topic("test_custom_partitioner_threaded_producer");

let results_count = (0..10)
.map(|id| {
Expand Down
Loading