dekaf: Refactor to handle timeouts better
jshearer committed Oct 7, 2024
1 parent cc0ee81 commit 1896afc
Showing 2 changed files with 70 additions and 40 deletions.
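The gist of the change: a Kafka Fetch carries a max_wait_ms budget plus a per-partition byte target, and the old code raced each partition's pending read against a single session-level sleep. The refactor moves the deadline into Read::next_batch itself, which now takes an absolute Instant and reports which limit ended the read. A condensed view of the new contract, paraphrased from the read.rs diff below:

    pub enum BatchResult {
        /// Stopped because the byte target was reached before the deadline.
        TargetExceededBeforeTimeout(bytes::Bytes),
        /// Stopped because the deadline passed with some data buffered.
        TimeoutExceededBeforeTarget(bytes::Bytes),
        /// Stopped because the deadline passed with nothing read.
        TimeoutNoData,
    }

    pub async fn next_batch(
        mut self,
        target_bytes: usize,
        timeout: std::time::Instant,
    ) -> anyhow::Result<(Self, BatchResult)> { /* body shown in the diff */ }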
42 changes: 36 additions & 6 deletions crates/dekaf/src/read.rs
@@ -7,7 +7,7 @@ use gazette::journal::{ReadJsonLine, ReadJsonLines};
use gazette::{broker, journal, uuid};
use kafka_protocol::records::Compression;
use lz4_flex::frame::BlockMode;
-use std::time::Duration;
+use std::time::{Duration, Instant};

pub struct Read {
/// Journal offset to be served by this Read.
@@ -30,6 +30,15 @@ pub struct Read {
journal_name: String,
}

+pub enum BatchResult {
+/// Read some docs, stopped reading because reached target bytes
+TargetExceededBeforeTimeout(bytes::Bytes),
+/// Read some docs, stopped reading because reached timeout
+TimeoutExceededBeforeTarget(bytes::Bytes),
+/// Read no docs, stopped reading because reached timeout
+TimeoutNoData,
+}

impl Read {
pub fn new(
client: journal::Client,
@@ -74,7 +83,11 @@ impl Read {
}

#[tracing::instrument(skip_all,fields(journal_name=self.journal_name))]
-pub async fn next_batch(mut self, target_bytes: usize) -> anyhow::Result<(Self, bytes::Bytes)> {
+pub async fn next_batch(
+mut self,
+target_bytes: usize,
+timeout: Instant,
+) -> anyhow::Result<(Self, BatchResult)> {
use kafka_protocol::records::{
Compression, Record, RecordBatchEncoder, RecordEncodeOptions, TimestampType,
};
@@ -90,15 +103,22 @@ impl Read {
let mut has_had_parsing_error = false;
let mut transient_errors = 0;

+let timeout = tokio::time::sleep_until(timeout.into());
+let timeout = futures::future::maybe_done(timeout);
+tokio::pin!(timeout);

+let mut did_timeout = false;

while records_bytes < target_bytes {
let read = match tokio::select! {
biased; // Attempt to read before yielding.

read = self.stream.next() => read,

-() = std::future::ready(()), if records_bytes != 0 => {
-break; // Yield if we have records and the stream isn't ready.
-}
+_ = &mut timeout => {
+did_timeout = true;
+break; // Yield if we reach a timeout
+},
} {
None => bail!("blocking gazette client read never returns EOF"),
Some(resp) => match resp {
@@ -277,7 +297,17 @@ impl Read {
metrics::counter!("dekaf_bytes_read", "journal_name" => self.journal_name.to_owned())
.increment(records_bytes as u64);

-Ok((self, buf.freeze()))
+Ok((
+self,
+match (records.len() > 0, did_timeout) {
+(false, true) => BatchResult::TimeoutNoData,
+(true, true) => BatchResult::TimeoutExceededBeforeTarget(buf.freeze()),
+(true, false) => BatchResult::TargetExceededBeforeTimeout(buf.freeze()),
+(false, false) => {
+unreachable!("shouldn't be able to see no documents and also not time out")
+}
+},
+))
}
}

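For the read-side pattern in isolation: next_batch now pins a sleep_until future wrapped in futures::future::maybe_done and lets a biased tokio::select! race it against the journal stream. A minimal, self-contained sketch of the same pattern under illustrative names (read_until and its Bytes stream are stand-ins, not Dekaf APIs; the tokio/futures calls mirror the ones in the diff):

    use bytes::Bytes;
    use futures::StreamExt;
    use std::time::Instant;

    /// Buffer items from `stream` until roughly `target_bytes` accumulate or
    /// `deadline` passes; report whether the deadline is what stopped us.
    async fn read_until(
        mut stream: impl futures::Stream<Item = Bytes> + Unpin,
        target_bytes: usize,
        deadline: Instant,
    ) -> (Vec<Bytes>, bool) {
        // maybe_done lets later loop iterations poll the sleep again after it
        // has already completed, without re-arming it.
        let timeout = tokio::time::sleep_until(deadline.into());
        let timeout = futures::future::maybe_done(timeout);
        tokio::pin!(timeout);

        let (mut out, mut total, mut did_timeout) = (Vec::new(), 0usize, false);

        while total < target_bytes {
            tokio::select! {
                biased; // Prefer draining the stream over checking the clock.

                doc = stream.next() => match doc {
                    Some(doc) => { total += doc.len(); out.push(doc); }
                    None => break,
                },
                _ = &mut timeout => {
                    did_timeout = true;
                    break;
                }
            }
        }
        (out, did_timeout)
    }

Because the select is biased toward the stream, the timeout arm can only win while the stream is pending, which is consistent with next_batch treating "no documents and no timeout" as unreachable.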
68 changes: 34 additions & 34 deletions crates/dekaf/src/session.rs
@@ -1,8 +1,8 @@
use super::{App, Collection, Read};
use crate::{
connector::DekafConfig, from_downstream_topic_name, from_upstream_topic_name,
-to_downstream_topic_name, to_upstream_topic_name, topology::fetch_all_collection_names,
-Authenticated,
+read::BatchResult, to_downstream_topic_name, to_upstream_topic_name,
+topology::fetch_all_collection_names, Authenticated,
};
use anyhow::Context;
use bytes::{BufMut, BytesMut};
@@ -18,7 +18,6 @@ use kafka_protocol::{
protocol::{buf::ByteBuf, Decodable, Encodable, Message, StrBytes},
};
use std::{
-cmp::max,
collections::HashMap,
time::{SystemTime, UNIX_EPOCH},
};
@@ -28,7 +27,7 @@ use tracing::instrument;
struct PendingRead {
offset: i64, // Journal offset to be completed by this PendingRead.
last_write_head: i64, // Most-recent observed journal write head.
-handle: tokio_util::task::AbortOnDropHandle<anyhow::Result<(Read, bytes::Bytes)>>,
+handle: tokio_util::task::AbortOnDropHandle<anyhow::Result<(Read, BatchResult)>>,
}

pub struct Session {
@@ -368,10 +367,8 @@ impl Session {
.as_ref()
.ok_or(anyhow::anyhow!("Session not authenticated"))?;

-let timeout_duration = std::time::Duration::from_millis(max_wait_ms as u64);
-let timeout = tokio::time::sleep(timeout_duration);
-let timeout = futures::future::maybe_done(timeout);
-tokio::pin!(timeout);
+let timeout_at =
+std::time::Instant::now() + std::time::Duration::from_millis(max_wait_ms as u64);

let mut hit_timeout = false;

@@ -414,7 +411,7 @@ impl Session {
offset: fetch_offset,
last_write_head: fetch_offset,
handle: tokio_util::task::AbortOnDropHandle::new(tokio::spawn(
-read.next_batch(partition_request.partition_max_bytes as usize),
+read.next_batch(partition_request.partition_max_bytes as usize, timeout_at),
)),
};

@@ -457,29 +454,32 @@ impl Session {
continue;
};

-let start_time = SystemTime::now();

-let (had_timeout, batch) = if let Some((read, batch)) = tokio::select! {
-biased; // Prefer to complete a pending read.
-read = &mut pending.handle => Some(read??),
-_ = &mut timeout => None,
-} {
-pending.offset = read.offset;
-pending.last_write_head = read.last_write_head;
-pending.handle = tokio_util::task::AbortOnDropHandle::new(tokio::spawn(
-read.next_batch(partition_request.partition_max_bytes as usize),
-));
-(false, batch)
-} else {
-let hang_time = SystemTime::now().duration_since(start_time)?;
-tracing::debug!(
-topic = ?key.0,
-partition = key.1,
-timeout = ?timeout_duration,
-?hang_time,
-"Timed out serving Fetch"
-);
-(true, bytes::Bytes::new())
+let (read, batch) = (&mut pending.handle).await??;
+pending.offset = read.offset;
+pending.last_write_head = read.last_write_head;
+pending.handle = tokio_util::task::AbortOnDropHandle::new(tokio::spawn(
+read.next_batch(partition_request.partition_max_bytes as usize, timeout_at),
+));

+let (had_timeout, batch) = match batch {
+BatchResult::TargetExceededBeforeTimeout(b) => (false, Some(b)),
+BatchResult::TimeoutExceededBeforeTarget(b) => {
+// tracing::debug!(
+// read_bytes=b.len(),
+// requested_bytes=partition_request.partition_max_bytes,
+// timeout=?std::time::Duration::from_millis(max_wait_ms as u64),
+// topic=?key.0,
+// partition=key.1, "Read some data, but timed out before reaching requested chunk size");
+(true, Some(b))
+}
+BatchResult::TimeoutNoData => {
+// tracing::debug!(
+// requested_bytes=partition_request.partition_max_bytes,
+// timeout=?std::time::Duration::from_millis(max_wait_ms as u64),
+// topic=?key.0,
+// partition=key.1, "Timed out before reading any data at all");
+(true, None)
+}
};

if had_timeout {
Expand All @@ -489,7 +489,7 @@ impl Session {
partition_responses.push(
PartitionData::default()
.with_partition_index(partition_request.partition)
-.with_records(Some(batch))
+.with_records(batch.to_owned())
.with_high_watermark(pending.last_write_head) // Map to kafka cursor.
.with_last_stable_offset(pending.last_write_head),
);
Expand All @@ -504,7 +504,7 @@ impl Session {

Ok(messages::FetchResponse::default()
.with_session_id(session_id)
-.with_throttle_time_ms(if hit_timeout { 5000 } else { 0 })
+.with_throttle_time_ms(if hit_timeout { 10000 } else { 0 })
.with_responses(topic_responses))
}

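On the session side, the per-fetch budget is converted once into an absolute deadline and that same Instant is passed to every spawned partition read, so a read spawned later in the request still stops at the same wall-clock moment; the session then awaits each handle directly and folds the BatchResult into an optional record batch. A rough sketch of that flow, assuming a locally re-declared BatchResult and a hypothetical read_one in place of the real Read::next_batch task:

    use bytes::Bytes;
    use std::time::{Duration, Instant};

    enum BatchResult {
        TargetExceededBeforeTimeout(Bytes),
        TimeoutExceededBeforeTarget(Bytes),
        TimeoutNoData,
    }

    async fn fetch(max_wait_ms: u64, partitions: Vec<u32>) -> Vec<(u32, Option<Bytes>)> {
        // One absolute deadline for the whole Fetch, shared by every partition read.
        let timeout_at = Instant::now() + Duration::from_millis(max_wait_ms);

        // Spawn all partition reads up front; each carries the same deadline.
        let handles: Vec<_> = partitions
            .into_iter()
            .map(|p| (p, tokio::spawn(read_one(p, timeout_at))))
            .collect();

        let mut out = Vec::new();
        for (p, handle) in handles {
            // Await the task directly: timeout handling now lives inside the read,
            // so there is no session-level select against a shared sleep anymore.
            let records = match handle.await.expect("read task panicked") {
                BatchResult::TargetExceededBeforeTimeout(b)
                | BatchResult::TimeoutExceededBeforeTarget(b) => Some(b),
                BatchResult::TimeoutNoData => None,
            };
            out.push((p, records));
        }
        out
    }

    // Illustrative stand-in for Read::next_batch: give up at the deadline.
    async fn read_one(_partition: u32, deadline: Instant) -> BatchResult {
        tokio::time::sleep_until(deadline.into()).await;
        BatchResult::TimeoutNoData
    }

Because the deadline is absolute, spawning itself consumes budget: a read started 40 ms into a 1000 ms fetch has 960 ms left, which matches max_wait_ms applying to the request as a whole.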
