From 5ef5d6765d5a0f6d2490b6818279f35cb9868060 Mon Sep 17 00:00:00 2001 From: Joseph Shearer Date: Wed, 13 Nov 2024 13:09:13 -0500 Subject: [PATCH] dekaf: Introduce `IDLE_SESSION_TIMEOUT_MS` to allow configuring the idle session timeout --- Cargo.lock | 1 + crates/dekaf/Cargo.toml | 1 + crates/dekaf/src/main.rs | 13 ++++++++----- 3 files changed, 10 insertions(+), 5 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index c2027c150f..179603ba82 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1881,6 +1881,7 @@ dependencies = [ "gazette", "hex", "hexdump", + "humantime", "itertools 0.10.5", "kafka-protocol", "labels", diff --git a/crates/dekaf/Cargo.toml b/crates/dekaf/Cargo.toml index 292cecb1a3..463523e2d8 100644 --- a/crates/dekaf/Cargo.toml +++ b/crates/dekaf/Cargo.toml @@ -27,6 +27,7 @@ axum = { workspace = true } axum-extra = { workspace = true } axum-server = { workspace = true } base64 = { workspace = true } +humantime = { workspace = true } bumpalo = { workspace = true } bytes = { workspace = true } clap = { workspace = true } diff --git a/crates/dekaf/src/main.rs b/crates/dekaf/src/main.rs index 20fac234ce..25a44b67a8 100644 --- a/crates/dekaf/src/main.rs +++ b/crates/dekaf/src/main.rs @@ -82,6 +82,10 @@ pub struct Cli { #[arg(long, env = "BROKER_CONNECTION_POOL_SIZE", default_value = "20")] broker_connection_pool_size: usize, + /// How long to wait for a message before closing an idle connection + #[arg(long, env = "IDLE_SESSION_TIMEOUT", value_parser = humantime::parse_duration, default_value = "30s")] + idle_session_timeout: std::time::Duration, + #[command(flatten)] tls: Option, } @@ -219,7 +223,7 @@ async fn main() -> anyhow::Result<()> { continue }; - tokio::spawn(serve(Session::new(app.clone(), cli.encryption_secret.to_owned()), socket, addr, stop.clone())); + tokio::spawn(serve(Session::new(app.clone(), cli.encryption_secret.to_owned()), socket, addr, cli.idle_session_timeout_ms, stop.clone())); } _ = &mut stop => break, } @@ -240,7 +244,7 @@ async fn main() -> anyhow::Result<()> { }; socket.set_nodelay(true)?; - tokio::spawn(serve(Session::new(app.clone(), cli.encryption_secret.to_owned()), socket, addr, stop.clone())); + tokio::spawn(serve(Session::new(app.clone(), cli.encryption_secret.to_owned()), socket, addr, cli.idle_session_timeout_ms, stop.clone())); } _ = &mut stop => break, } @@ -255,6 +259,7 @@ async fn serve( mut session: Session, socket: S, addr: std::net::SocketAddr, + idle_timeout: std::time::Duration, _stop: impl futures::Future, // TODO(johnny): stop. ) -> anyhow::Result<()> where @@ -278,7 +283,7 @@ where let result = async { loop { - let Some(frame) = tokio::time::timeout(SESSION_TIMEOUT, r.try_next()) + let Some(frame) = tokio::time::timeout(idle_timeout, r.try_next()) .await .context("timeout waiting for next session request")? .context("failed to read next session request")? @@ -336,5 +341,3 @@ fn validate_certificate_name( } return Ok(false); } - -const SESSION_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(30);