diff --git a/crates/dekaf/src/session.rs b/crates/dekaf/src/session.rs index 5a1cdcf748..0a36aa5542 100644 --- a/crates/dekaf/src/session.rs +++ b/crates/dekaf/src/session.rs @@ -17,11 +17,11 @@ use kafka_protocol::{ }, protocol::{buf::ByteBuf, Decodable, Encodable, Message, StrBytes}, }; -use std::sync::Arc; use std::{ collections::HashMap, time::{SystemTime, UNIX_EPOCH}, }; +use std::{sync::Arc, time::Duration}; use tracing::instrument; struct PendingRead { @@ -184,15 +184,20 @@ impl Session { // Concurrently fetch Collection instances for all requested topics. let collections: anyhow::Result)>> = - futures::future::try_join_all(requests.into_iter().map(|topic| async move { - let maybe_collection = Collection::new( - client, - from_downstream_topic_name(topic.name.to_owned().unwrap_or_default()).as_str(), - ) - .await?; - Ok((topic.name.unwrap_or_default(), maybe_collection)) - })) - .await; + tokio::time::timeout( + Duration::from_secs(10), + futures::future::try_join_all(requests.into_iter().map(|topic| async move { + let maybe_collection = Collection::new( + client, + from_downstream_topic_name(topic.name.to_owned().unwrap_or_default()) + .as_str(), + ) + .await?; + Ok((topic.name.unwrap_or_default(), maybe_collection)) + })), + ) + .await + .map_err(|e| anyhow::anyhow!("Timed out loading metadata {e}"))?; let mut topics = IndexMap::new(); diff --git a/crates/dekaf/src/topology.rs b/crates/dekaf/src/topology.rs index 5f6a47530f..34b885f52d 100644 --- a/crates/dekaf/src/topology.rs +++ b/crates/dekaf/src/topology.rs @@ -1,7 +1,8 @@ use anyhow::Context; -use futures::{StreamExt, TryStreamExt}; +use futures::{StreamExt, TryFutureExt, TryStreamExt}; use gazette::{broker, journal, uuid}; use proto_flow::flow; +use std::time::Duration; /// Fetch the names of all collections which the current user may read. /// Each is mapped into a kafka topic. @@ -154,7 +155,11 @@ impl Collection { }), ..Default::default() }; - let response = journal_client.list(request).await?; + let response = tokio::time::timeout(Duration::from_secs(5), journal_client.list(request)) + .await + .map_err(|e| { + anyhow::anyhow!("timed out fetching partitions for {collection}: {e}") + })??; let mut partitions = Vec::with_capacity(response.journals.len()); for journal in response.journals { @@ -238,8 +243,12 @@ impl Collection { client: &flow_client::Client, collection: &str, ) -> anyhow::Result { - let (_, journal_client) = - flow_client::fetch_collection_authorization(client, collection).await?; + let (_, journal_client) = tokio::time::timeout( + Duration::from_secs(5), + flow_client::fetch_collection_authorization(client, collection), + ) + .map_err(|e| anyhow::anyhow!("timed out building journal client for {collection}: {e}")) + .await??; Ok(journal_client) }