From 476016b28890df879789c5408dfab5c4eb80c33e Mon Sep 17 00:00:00 2001 From: j chesley <42561540+jches@users.noreply.github.com> Date: Wed, 26 Jun 2024 09:09:58 -0400 Subject: [PATCH] fix(kafka source): consumer subscribe in main kafka source task (#20698) * fix(kafka source): consumer subscribe in main kafka source task This moves the consumer.subscribe(topics) call into the main kafka source loop, to ensure that callback handlers will be set up whenever the consumer is subscribed. In `vector validate` mode for example, the consumer sometimes receives partition assignments, and the consumer invokes pre_rebalance handlers. The main consumer task is not polled in that scenario, so callback messages sent to the coordination task are never handled and the pre_rebalance handler deadlocks. Moving the subscribe call into the main loop means that the consumer will not join the group and will not get a partition assignment unless the main source task is being polled. * fmt --- changelog.d/20687-kafka_source_validate_deadlock.fix.md | 3 +++ src/sources/kafka.rs | 8 ++++++-- 2 files changed, 9 insertions(+), 2 deletions(-) create mode 100644 changelog.d/20687-kafka_source_validate_deadlock.fix.md diff --git a/changelog.d/20687-kafka_source_validate_deadlock.fix.md b/changelog.d/20687-kafka_source_validate_deadlock.fix.md new file mode 100644 index 0000000000000..8d562b144e5f2 --- /dev/null +++ b/changelog.d/20687-kafka_source_validate_deadlock.fix.md @@ -0,0 +1,3 @@ +The kafka source does not deadlock or cause consumer group rebalancing during `vector validate`. + +authors: jches diff --git a/src/sources/kafka.rs b/src/sources/kafka.rs index 91d55ab0bdf1a..15f7315721c81 100644 --- a/src/sources/kafka.rs +++ b/src/sources/kafka.rs @@ -443,6 +443,12 @@ async fn kafka_source( // EOF signal allowing the coordination task to tell the kafka client task when all partitions have reached EOF let (eof_tx, eof_rx) = eof.then(oneshot::channel::<()>).unzip(); + let topics: Vec<&str> = config.topics.iter().map(|s| s.as_str()).collect(); + if let Err(e) = consumer.subscribe(&topics).context(SubscribeSnafu) { + error!("{}", e); + return Err(()); + } + let coordination_task = { let span = span.clone(); let consumer = Arc::clone(&consumer); @@ -1233,8 +1239,6 @@ fn create_consumer( Span::current(), )) .context(CreateSnafu)?; - let topics: Vec<&str> = config.topics.iter().map(|s| s.as_str()).collect(); - consumer.subscribe(&topics).context(SubscribeSnafu)?; Ok((consumer, callback_rx)) }