Skip to content

Commit

Permalink
dekaf: Remove connection healthcheck as FindCoordinator takes ~3-5 …
Browse files Browse the repository at this point in the history
…seconds to respond
  • Loading branch information
jshearer committed Oct 4, 2024
1 parent 4864c05 commit 7eb18e4
Showing 1 changed file with 4 additions and 10 deletions.
14 changes: 4 additions & 10 deletions crates/dekaf/src/api_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -287,16 +287,10 @@ impl deadpool::managed::Manager for KafkaConnectionParams {

async fn recycle(
&self,
conn: &mut BoxedKafkaConnection,
_conn: &mut BoxedKafkaConnection,
_: &deadpool::managed::Metrics,
) -> deadpool::managed::RecycleResult<anyhow::Error> {
// Other than auth, Kafka connections themselves are stateless
// so the only thing we need to do when recycling a connection
// is to confirm that it's still connected.
get_versions(conn).await.map(|_| ()).map_err(|e| {
tracing::warn!(err=?e, broker=self.broker_url, "Connection failed healthcheck");
deadpool::managed::RecycleError::Backend(e)
})
Ok(())
}
}

Expand Down Expand Up @@ -375,9 +369,9 @@ impl KafkaApiClient {
// It seems that after running for a while, connections can get into
// a broken state where every response returns an error. This, plus
// the healthcheck when recycling a connection solves that problem.
let reap_interval = Duration::from_secs(5);
let reap_interval = Duration::from_secs(30);
let max_age = Duration::from_secs(60 * 30);
let max_idle = Duration::from_secs(0);
let max_idle = Duration::from_secs(60 * 5);
let reaper = tokio_util::task::AbortOnDropHandle::new(tokio::spawn({
let pool = pool.clone();
let broker_url = broker_url.to_string();
Expand Down

0 comments on commit 7eb18e4

Please sign in to comment.