diff --git a/crates/dekaf/src/read.rs b/crates/dekaf/src/read.rs
index f88b8d2706..8fa4e2dff7 100644
--- a/crates/dekaf/src/read.rs
+++ b/crates/dekaf/src/read.rs
@@ -7,7 +7,7 @@ use gazette::journal::{ReadJsonLine, ReadJsonLines};
 use gazette::{broker, journal, uuid};
 use kafka_protocol::records::Compression;
 use lz4_flex::frame::BlockMode;
-use std::time::Duration;
+use std::time::{Duration, Instant};
 
 pub struct Read {
     /// Journal offset to be served by this Read.
@@ -30,6 +30,15 @@ pub struct Read {
     journal_name: String,
 }
 
+pub enum BatchResult {
+    /// Read some docs, and stopped because the target byte count was reached.
+    TargetExceededBeforeTimeout(bytes::Bytes),
+    /// Read some docs, but stopped because the timeout elapsed first.
+    TimeoutExceededBeforeTarget(bytes::Bytes),
+    /// Read no docs before the timeout elapsed.
+    TimeoutNoData,
+}
+
 impl Read {
     pub fn new(
         client: journal::Client,
@@ -74,7 +83,11 @@ impl Read {
     }
 
     #[tracing::instrument(skip_all,fields(journal_name=self.journal_name))]
-    pub async fn next_batch(mut self, target_bytes: usize) -> anyhow::Result<(Self, bytes::Bytes)> {
+    pub async fn next_batch(
+        mut self,
+        target_bytes: usize,
+        timeout: Instant,
+    ) -> anyhow::Result<(Self, BatchResult)> {
         use kafka_protocol::records::{
             Compression, Record, RecordBatchEncoder, RecordEncodeOptions, TimestampType,
         };
@@ -90,15 +103,22 @@ impl Read {
         let mut has_had_parsing_error = false;
         let mut transient_errors = 0;
 
+        let timeout = tokio::time::sleep_until(timeout.into());
+        let timeout = futures::future::maybe_done(timeout);
+        tokio::pin!(timeout);
+
+        let mut did_timeout = false;
+
         while records_bytes < target_bytes {
             let read = match tokio::select! {
                 biased; // Attempt to read before yielding.
 
                 read = self.stream.next() => read,
 
-                () = std::future::ready(()), if records_bytes != 0 => {
-                    break; // Yield if we have records and the stream isn't ready.
-                }
+                _ = &mut timeout => {
+                    did_timeout = true;
+                    break; // Yield because we reached the timeout.
+                },
             } {
                 None => bail!("blocking gazette client read never returns EOF"),
                 Some(resp) => match resp {
@@ -277,7 +297,17 @@ impl Read {
         metrics::counter!("dekaf_bytes_read", "journal_name" => self.journal_name.to_owned())
             .increment(records_bytes as u64);
 
-        Ok((self, buf.freeze()))
+        Ok((
+            self,
+            match (!records.is_empty(), did_timeout) {
+                (false, true) => BatchResult::TimeoutNoData,
+                (true, true) => BatchResult::TimeoutExceededBeforeTarget(buf.freeze()),
+                (true, false) => BatchResult::TargetExceededBeforeTimeout(buf.freeze()),
+                (false, false) => {
+                    unreachable!("should not see zero documents without also timing out")
+                }
+            },
+        ))
     }
 }
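Stepping back: this change moves the fetch deadline into `next_batch` itself. The deadline is armed with `tokio::time::sleep_until`, wrapped in `futures::future::maybe_done` so the `select!` arm stays safe to poll after the inner `Sleep` completes, pinned, and raced against the journal stream under `biased` polling so ready data is always drained first. Below is a minimal, self-contained sketch of that pattern; the 20ms `unfold` stream and all names are illustrative stand-ins, not dekaf code:

```rust
use futures::StreamExt;
use std::time::{Duration, Instant};

#[tokio::main]
async fn main() {
    // Stand-in for the gazette read stream: yields an item every 20ms, forever.
    let mut stream = Box::pin(futures::stream::unfold(0u32, |i| async move {
        tokio::time::sleep(Duration::from_millis(20)).await;
        Some((i, i + 1))
    }));

    // Callers pass a std::time::Instant deadline; sleep_until takes a
    // tokio::time::Instant, hence the .into() conversion.
    let deadline = Instant::now() + Duration::from_millis(100);
    let timeout = tokio::time::sleep_until(deadline.into());
    // maybe_done keeps this arm pollable (and immediately ready) even after
    // the inner Sleep completes, rather than re-polling a finished future.
    let timeout = futures::future::maybe_done(timeout);
    tokio::pin!(timeout);

    let mut read = 0u32;
    loop {
        tokio::select! {
            biased; // Drain ready data before checking the deadline.

            Some(i) = stream.next() => read = i + 1,

            _ = &mut timeout => break, // Deadline elapsed: yield what we have.
        }
    }
    println!("read {read} items before the deadline");
}
```

One consequence of pushing the deadline down: a batch that is partially full when the timer fires now comes back as `TimeoutExceededBeforeTarget` and is served to the client, where the session-side race below previously answered with an empty batch and left the read running in the background.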
diff --git a/crates/dekaf/src/session.rs b/crates/dekaf/src/session.rs
index f7d6c2732b..7ba2e34ff0 100644
--- a/crates/dekaf/src/session.rs
+++ b/crates/dekaf/src/session.rs
@@ -1,8 +1,8 @@
 use super::{App, Collection, Read};
 use crate::{
     connector::DekafConfig, from_downstream_topic_name, from_upstream_topic_name,
-    to_downstream_topic_name, to_upstream_topic_name, topology::fetch_all_collection_names,
-    Authenticated,
+    read::BatchResult, to_downstream_topic_name, to_upstream_topic_name,
+    topology::fetch_all_collection_names, Authenticated,
 };
 use anyhow::Context;
 use bytes::{BufMut, BytesMut};
@@ -18,7 +18,6 @@ use kafka_protocol::{
     protocol::{buf::ByteBuf, Decodable, Encodable, Message, StrBytes},
 };
 use std::{
-    cmp::max,
     collections::HashMap,
     time::{SystemTime, UNIX_EPOCH},
 };
@@ -28,7 +27,7 @@ use tracing::instrument;
 struct PendingRead {
     offset: i64,          // Journal offset to be completed by this PendingRead.
     last_write_head: i64, // Most-recent observed journal write head.
-    handle: tokio_util::task::AbortOnDropHandle<anyhow::Result<(Read, bytes::Bytes)>>,
+    handle: tokio_util::task::AbortOnDropHandle<anyhow::Result<(Read, BatchResult)>>,
 }
 
 pub struct Session {
@@ -368,10 +367,8 @@ impl Session {
             .as_ref()
             .ok_or(anyhow::anyhow!("Session not authenticated"))?;
 
-        let timeout_duration = std::time::Duration::from_millis(max_wait_ms as u64);
-        let timeout = tokio::time::sleep(timeout_duration);
-        let timeout = futures::future::maybe_done(timeout);
-        tokio::pin!(timeout);
+        let timeout_at =
+            std::time::Instant::now() + std::time::Duration::from_millis(max_wait_ms as u64);
 
         let mut hit_timeout = false;
 
@@ -414,7 +411,7 @@ impl Session {
                 offset: fetch_offset,
                 last_write_head: fetch_offset,
                 handle: tokio_util::task::AbortOnDropHandle::new(tokio::spawn(
-                    read.next_batch(partition_request.partition_max_bytes as usize),
+                    read.next_batch(partition_request.partition_max_bytes as usize, timeout_at),
                 )),
             };
 
@@ -457,29 +454,32 @@ impl Session {
                 continue;
             };
 
-            let start_time = SystemTime::now();
-
-            let (had_timeout, batch) = if let Some((read, batch)) = tokio::select! {
-                biased; // Prefer to complete a pending read.
-                read = &mut pending.handle => Some(read??),
-                _ = &mut timeout => None,
-            } {
-                pending.offset = read.offset;
-                pending.last_write_head = read.last_write_head;
-                pending.handle = tokio_util::task::AbortOnDropHandle::new(tokio::spawn(
-                    read.next_batch(partition_request.partition_max_bytes as usize),
-                ));
-                (false, batch)
-            } else {
-                let hang_time = SystemTime::now().duration_since(start_time)?;
-                tracing::debug!(
-                    topic = ?key.0,
-                    partition = key.1,
-                    timeout = ?timeout_duration,
-                    ?hang_time,
-                    "Timed out serving Fetch"
-                );
-                (true, bytes::Bytes::new())
+            let (read, batch) = (&mut pending.handle).await??;
+            pending.offset = read.offset;
+            pending.last_write_head = read.last_write_head;
+            pending.handle = tokio_util::task::AbortOnDropHandle::new(tokio::spawn(
+                read.next_batch(partition_request.partition_max_bytes as usize, timeout_at),
+            ));
+
+            let (had_timeout, batch) = match batch {
+                BatchResult::TargetExceededBeforeTimeout(b) => (false, Some(b)),
+                BatchResult::TimeoutExceededBeforeTarget(b) => {
+                    // tracing::debug!(
+                    //     read_bytes = b.len(),
+                    //     requested_bytes = partition_request.partition_max_bytes,
+                    //     timeout = ?std::time::Duration::from_millis(max_wait_ms as u64),
+                    //     topic = ?key.0,
+                    //     partition = key.1,
+                    //     "Read some data, but timed out before reaching requested chunk size"
+                    // );
+                    (true, Some(b))
+                }
+                BatchResult::TimeoutNoData => {
+                    // tracing::debug!(
+                    //     requested_bytes = partition_request.partition_max_bytes,
+                    //     timeout = ?std::time::Duration::from_millis(max_wait_ms as u64),
+                    //     topic = ?key.0,
+                    //     partition = key.1,
+                    //     "Timed out before reading any data at all"
+                    // );
+                    (true, None)
+                }
             };
 
             if had_timeout {
@@ -489,7 +489,7 @@ impl Session {
             partition_responses.push(
                 PartitionData::default()
                     .with_partition_index(partition_request.partition)
-                    .with_records(Some(batch))
+                    .with_records(batch.to_owned())
                    .with_high_watermark(pending.last_write_head) // Map to kafka cursor.
                     .with_last_stable_offset(pending.last_write_head),
             );
@@ -504,7 +504,7 @@ impl Session {
 
         Ok(messages::FetchResponse::default()
             .with_session_id(session_id)
-            .with_throttle_time_ms(if hit_timeout { 5000 } else { 0 })
+            .with_throttle_time_ms(if hit_timeout { 10000 } else { 0 })
             .with_responses(topic_responses))
     }
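On the session side, the three `BatchResult` variants collapse back into the `(had_timeout, Option<Bytes>)` pair the fetch path consumes: partial data is still returned to the client, and the timeout flag feeds `hit_timeout`, which now doubles the throttle hint from 5000ms to 10000ms. A standalone sketch of that mapping, assuming only the `bytes` crate; the enum mirrors read.rs and `classify` is a hypothetical helper, not part of this diff:

```rust
use bytes::Bytes;

// Mirrors the shape of the enum added in read.rs.
enum BatchResult {
    TargetExceededBeforeTimeout(Bytes),
    TimeoutExceededBeforeTarget(Bytes),
    TimeoutNoData,
}

/// Hypothetical helper reproducing the session.rs match: did the deadline
/// (rather than the byte target) end the read, and what records, if any?
fn classify(result: BatchResult) -> (bool, Option<Bytes>) {
    match result {
        BatchResult::TargetExceededBeforeTimeout(b) => (false, Some(b)),
        BatchResult::TimeoutExceededBeforeTarget(b) => (true, Some(b)),
        BatchResult::TimeoutNoData => (true, None),
    }
}

fn main() {
    let (had_timeout, records) = classify(BatchResult::TimeoutNoData);
    assert!(had_timeout);
    assert!(records.is_none());
}
```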