diff --git a/changelog.d/20687-kafka_source_validate_deadlock.fix.md b/changelog.d/20687-kafka_source_validate_deadlock.fix.md new file mode 100644 index 0000000000000..8d562b144e5f2 --- /dev/null +++ b/changelog.d/20687-kafka_source_validate_deadlock.fix.md @@ -0,0 +1,3 @@ +The kafka source does not deadlock or cause consumer group rebalancing during `vector validate`. + +authors: jches diff --git a/src/sources/kafka.rs b/src/sources/kafka.rs index 91d55ab0bdf1a..15f7315721c81 100644 --- a/src/sources/kafka.rs +++ b/src/sources/kafka.rs @@ -443,6 +443,12 @@ async fn kafka_source( // EOF signal allowing the coordination task to tell the kafka client task when all partitions have reached EOF let (eof_tx, eof_rx) = eof.then(oneshot::channel::<()>).unzip(); + let topics: Vec<&str> = config.topics.iter().map(|s| s.as_str()).collect(); + if let Err(e) = consumer.subscribe(&topics).context(SubscribeSnafu) { + error!("{}", e); + return Err(()); + } + let coordination_task = { let span = span.clone(); let consumer = Arc::clone(&consumer); @@ -1233,8 +1239,6 @@ fn create_consumer( Span::current(), )) .context(CreateSnafu)?; - let topics: Vec<&str> = config.topics.iter().map(|s| s.as_str()).collect(); - consumer.subscribe(&topics).context(SubscribeSnafu)?; Ok((consumer, callback_rx)) }