From e1362fd16bb6c4cea971ecb9c1f34e2d9c924f31 Mon Sep 17 00:00:00 2001 From: Joseph Shearer Date: Mon, 7 Oct 2024 12:45:58 -0400 Subject: [PATCH] dekaf: Request clients to throttle if we hit a timeout --- crates/dekaf/src/read.rs | 42 ++++++++++++++++++++++++++++----- crates/dekaf/src/session.rs | 46 ++++++++++++++++++++----------------- 2 files changed, 61 insertions(+), 27 deletions(-) diff --git a/crates/dekaf/src/read.rs b/crates/dekaf/src/read.rs index f88b8d2706..8fa4e2dff7 100644 --- a/crates/dekaf/src/read.rs +++ b/crates/dekaf/src/read.rs @@ -7,7 +7,7 @@ use gazette::journal::{ReadJsonLine, ReadJsonLines}; use gazette::{broker, journal, uuid}; use kafka_protocol::records::Compression; use lz4_flex::frame::BlockMode; -use std::time::Duration; +use std::time::{Duration, Instant}; pub struct Read { /// Journal offset to be served by this Read. @@ -30,6 +30,15 @@ pub struct Read { journal_name: String, } +pub enum BatchResult { + /// Read some docs, stopped reading because reached target bytes + TargetExceededBeforeTimeout(bytes::Bytes), + /// Read some docs, stopped reading because reached timeout + TimeoutExceededBeforeTarget(bytes::Bytes), + /// Read no docs, stopped reading because reached timeout + TimeoutNoData, +} + impl Read { pub fn new( client: journal::Client, @@ -74,7 +83,11 @@ impl Read { } #[tracing::instrument(skip_all,fields(journal_name=self.journal_name))] - pub async fn next_batch(mut self, target_bytes: usize) -> anyhow::Result<(Self, bytes::Bytes)> { + pub async fn next_batch( + mut self, + target_bytes: usize, + timeout: Instant, + ) -> anyhow::Result<(Self, BatchResult)> { use kafka_protocol::records::{ Compression, Record, RecordBatchEncoder, RecordEncodeOptions, TimestampType, }; @@ -90,15 +103,22 @@ impl Read { let mut has_had_parsing_error = false; let mut transient_errors = 0; + let timeout = tokio::time::sleep_until(timeout.into()); + let timeout = futures::future::maybe_done(timeout); + tokio::pin!(timeout); + + let mut did_timeout = false; + while records_bytes < target_bytes { let read = match tokio::select! { biased; // Attempt to read before yielding. read = self.stream.next() => read, - () = std::future::ready(()), if records_bytes != 0 => { - break; // Yield if we have records and the stream isn't ready. - } + _ = &mut timeout => { + did_timeout = true; + break; // Yield if we reach a timeout + }, } { None => bail!("blocking gazette client read never returns EOF"), Some(resp) => match resp { @@ -277,7 +297,17 @@ impl Read { metrics::counter!("dekaf_bytes_read", "journal_name" => self.journal_name.to_owned()) .increment(records_bytes as u64); - Ok((self, buf.freeze())) + Ok(( + self, + match (records.len() > 0, did_timeout) { + (false, true) => BatchResult::TimeoutNoData, + (true, true) => BatchResult::TimeoutExceededBeforeTarget(buf.freeze()), + (true, false) => BatchResult::TargetExceededBeforeTimeout(buf.freeze()), + (false, false) => { + unreachable!("shouldn't be able see no documents, and also not timeout") + } + }, + )) } } diff --git a/crates/dekaf/src/session.rs b/crates/dekaf/src/session.rs index 0a36aa5542..053903a3f2 100644 --- a/crates/dekaf/src/session.rs +++ b/crates/dekaf/src/session.rs @@ -1,8 +1,8 @@ use super::{App, Collection, Read}; use crate::{ connector::DekafConfig, from_downstream_topic_name, from_upstream_topic_name, - to_downstream_topic_name, to_upstream_topic_name, topology::fetch_all_collection_names, - Authenticated, + read::BatchResult, to_downstream_topic_name, to_upstream_topic_name, + topology::fetch_all_collection_names, Authenticated, }; use anyhow::Context; use bytes::{BufMut, BytesMut}; @@ -27,7 +27,7 @@ use tracing::instrument; struct PendingRead { offset: i64, // Journal offset to be completed by this PendingRead. last_write_head: i64, // Most-recent observed journal write head. - handle: tokio_util::task::AbortOnDropHandle>, + handle: tokio_util::task::AbortOnDropHandle>, } pub struct Session { @@ -365,9 +365,10 @@ impl Session { .as_ref() .ok_or(anyhow::anyhow!("Session not authenticated"))?; - let timeout = tokio::time::sleep(std::time::Duration::from_millis(max_wait_ms as u64)); - let timeout = futures::future::maybe_done(timeout); - tokio::pin!(timeout); + let timeout_at = + std::time::Instant::now() + std::time::Duration::from_millis(max_wait_ms as u64); + + let mut hit_timeout = false; // Start reads for all partitions which aren't already pending. for topic_request in &topic_requests { @@ -408,7 +409,7 @@ impl Session { offset: fetch_offset, last_write_head: fetch_offset, handle: tokio_util::task::AbortOnDropHandle::new(tokio::spawn( - read.next_batch(partition_request.partition_max_bytes as usize), + read.next_batch(partition_request.partition_max_bytes as usize, timeout_at), )), }; @@ -451,25 +452,27 @@ impl Session { continue; }; - let batch = if let Some((read, batch)) = tokio::select! { - biased; // Prefer to complete a pending read. - read = &mut pending.handle => Some(read??), - _ = &mut timeout => None, - } { - pending.offset = read.offset; - pending.last_write_head = read.last_write_head; - pending.handle = tokio_util::task::AbortOnDropHandle::new(tokio::spawn( - read.next_batch(partition_request.partition_max_bytes as usize), - )); - batch - } else { - bytes::Bytes::new() + let (read, batch) = (&mut pending.handle).await??; + pending.offset = read.offset; + pending.last_write_head = read.last_write_head; + pending.handle = tokio_util::task::AbortOnDropHandle::new(tokio::spawn( + read.next_batch(partition_request.partition_max_bytes as usize, timeout_at), + )); + + let (timeout, batch) = match batch { + BatchResult::TargetExceededBeforeTimeout(b) => (false, Some(b)), + BatchResult::TimeoutExceededBeforeTarget(b) => (true, Some(b)), + BatchResult::TimeoutNoData => (true, None), }; + if timeout { + hit_timeout = true + } + partition_responses.push( PartitionData::default() .with_partition_index(partition_request.partition) - .with_records(Some(batch)) + .with_records(batch.to_owned()) .with_high_watermark(pending.last_write_head) // Map to kafka cursor. .with_last_stable_offset(pending.last_write_head), ); @@ -484,6 +487,7 @@ impl Session { Ok(messages::FetchResponse::default() .with_session_id(session_id) + .with_throttle_time_ms(if hit_timeout { 10000 } else { 0 }) .with_responses(topic_responses)) }