Skip to content

Commit

Permalink
dekaf: Request clients to throttle if we hit a timeout
Browse files Browse the repository at this point in the history
  • Loading branch information
jshearer committed Oct 7, 2024
1 parent 6325eeb commit 0cee0c3
Showing 1 changed file with 10 additions and 9 deletions.
19 changes: 10 additions & 9 deletions crates/dekaf/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -373,6 +373,8 @@ impl Session {
let timeout = futures::future::maybe_done(timeout);
tokio::pin!(timeout);

let mut hit_timeout = false;

// Start reads for all partitions which aren't already pending.
for topic_request in &topic_requests {
let mut key = (from_downstream_topic_name(topic_request.topic.clone()), 0);
Expand Down Expand Up @@ -455,7 +457,7 @@ impl Session {
continue;
};

let batch = if let Some((read, batch)) = tokio::select! {
let (had_timeout, batch) = if let Some((read, batch)) = tokio::select! {
biased; // Prefer to complete a pending read.
read = &mut pending.handle => Some(read??),
_ = &mut timeout => None,
Expand All @@ -465,17 +467,15 @@ impl Session {
pending.handle = tokio_util::task::AbortOnDropHandle::new(tokio::spawn(
read.next_batch(partition_request.partition_max_bytes as usize),
));
batch
(false, batch)
} else {
tracing::debug!(
topic = ?key.0,
partition = key.1,
timeout = ?timeout_duration,
"Timed out serving Fetch"
);
bytes::Bytes::new()
(true, bytes::Bytes::new())
};

if had_timeout {
hit_timeout = true
}

partition_responses.push(
PartitionData::default()
.with_partition_index(partition_request.partition)
Expand All @@ -494,6 +494,7 @@ impl Session {

Ok(messages::FetchResponse::default()
.with_session_id(session_id)
.with_throttle_time_ms(if hit_timeout { 5000 } else { 0 })
.with_responses(topic_responses))
}

Expand Down

0 comments on commit 0cee0c3

Please sign in to comment.