Skip to content

Commit

Permalink
dekaf: Add a bunch of descriptive timeouts
Browse files Browse the repository at this point in the history
  • Loading branch information
jshearer committed Oct 7, 2024
1 parent 52defc3 commit 86745ef
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 14 deletions.
25 changes: 15 additions & 10 deletions crates/dekaf/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,11 @@ use kafka_protocol::{
},
protocol::{buf::ByteBuf, Decodable, Encodable, Message, StrBytes},
};
use std::sync::Arc;
use std::{
collections::HashMap,
time::{SystemTime, UNIX_EPOCH},
};
use std::{sync::Arc, time::Duration};
use tracing::instrument;

struct PendingRead {
Expand Down Expand Up @@ -184,15 +184,20 @@ impl Session {

// Concurrently fetch Collection instances for all requested topics.
let collections: anyhow::Result<Vec<(TopicName, Option<Collection>)>> =
futures::future::try_join_all(requests.into_iter().map(|topic| async move {
let maybe_collection = Collection::new(
client,
from_downstream_topic_name(topic.name.to_owned().unwrap_or_default()).as_str(),
)
.await?;
Ok((topic.name.unwrap_or_default(), maybe_collection))
}))
.await;
tokio::time::timeout(
Duration::from_secs(10),
futures::future::try_join_all(requests.into_iter().map(|topic| async move {
let maybe_collection = Collection::new(
client,
from_downstream_topic_name(topic.name.to_owned().unwrap_or_default())
.as_str(),
)
.await?;
Ok((topic.name.unwrap_or_default(), maybe_collection))
})),
)
.await
.map_err(|e| anyhow::anyhow!("Timed out loading metadata {e}"))?;

let mut topics = IndexMap::new();

Expand Down
17 changes: 13 additions & 4 deletions crates/dekaf/src/topology.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use anyhow::Context;
use futures::{StreamExt, TryStreamExt};
use futures::{StreamExt, TryFutureExt, TryStreamExt};
use gazette::{broker, journal, uuid};
use proto_flow::flow;
use std::time::Duration;

/// Fetch the names of all collections which the current user may read.
/// Each is mapped into a kafka topic.
Expand Down Expand Up @@ -154,7 +155,11 @@ impl Collection {
}),
..Default::default()
};
let response = journal_client.list(request).await?;
let response = tokio::time::timeout(Duration::from_secs(5), journal_client.list(request))
.await
.map_err(|e| {
anyhow::anyhow!("timed out fetching partitions for {collection}: {e}")
})??;
let mut partitions = Vec::with_capacity(response.journals.len());

for journal in response.journals {
Expand Down Expand Up @@ -238,8 +243,12 @@ impl Collection {
client: &flow_client::Client,
collection: &str,
) -> anyhow::Result<journal::Client> {
let (_, journal_client) =
flow_client::fetch_collection_authorization(client, collection).await?;
let (_, journal_client) = tokio::time::timeout(
Duration::from_secs(5),
flow_client::fetch_collection_authorization(client, collection),
)
.map_err(|e| anyhow::anyhow!("timed out building journal client for {collection}: {e}"))
.await??;

Ok(journal_client)
}
Expand Down

0 comments on commit 86745ef

Please sign in to comment.