Skip to content

Commit

Permalink
fix(kafka source): consumer subscribe in main kafka source task (#20698)
Browse files Browse the repository at this point in the history
* fix(kafka source): consumer subscribe in main kafka source task

This moves the consumer.subscribe(topics) call into the main kafka
source loop, to ensure that callback handlers will be set up whenever
the consumer is subscribed.

In `vector validate` mode for example, the consumer sometimes receives
partition assignments, and the consumer invokes pre_rebalance handlers.
The main consumer task is not polled in that scenario, so callback
messages sent to the coordination task are never handled and the
pre_rebalance handler deadlocks. Moving the subscribe call into the main
loop means that the consumer will not join the group and will not get
a partition assignment unless the main source task is being polled.

* fmt
  • Loading branch information
jches authored Jun 26, 2024
1 parent 099b043 commit 476016b
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 2 deletions.
3 changes: 3 additions & 0 deletions changelog.d/20687-kafka_source_validate_deadlock.fix.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
The kafka source does not deadlock or cause consumer group rebalancing during `vector validate`.

authors: jches
8 changes: 6 additions & 2 deletions src/sources/kafka.rs
Original file line number Diff line number Diff line change
Expand Up @@ -443,6 +443,12 @@ async fn kafka_source(
// EOF signal allowing the coordination task to tell the kafka client task when all partitions have reached EOF
let (eof_tx, eof_rx) = eof.then(oneshot::channel::<()>).unzip();

let topics: Vec<&str> = config.topics.iter().map(|s| s.as_str()).collect();
if let Err(e) = consumer.subscribe(&topics).context(SubscribeSnafu) {
error!("{}", e);
return Err(());
}

let coordination_task = {
let span = span.clone();
let consumer = Arc::clone(&consumer);
Expand Down Expand Up @@ -1233,8 +1239,6 @@ fn create_consumer(
Span::current(),
))
.context(CreateSnafu)?;
let topics: Vec<&str> = config.topics.iter().map(|s| s.as_str()).collect();
consumer.subscribe(&topics).context(SubscribeSnafu)?;

Ok((consumer, callback_rx))
}
Expand Down

0 comments on commit 476016b

Please sign in to comment.