From cd09a83dd646260994dd5ebe3de4b4b721f5f426 Mon Sep 17 00:00:00 2001 From: Joseph Shearer Date: Mon, 7 Oct 2024 16:12:01 -0400 Subject: [PATCH] dekaf: Temporarily implement and use `flow_client::Client::with_fresh_gazette_client()` --- crates/dekaf/src/lib.rs | 3 ++- crates/flow-client/src/client.rs | 15 ++++++++++++--- 2 files changed, 14 insertions(+), 4 deletions(-) diff --git a/crates/dekaf/src/lib.rs b/crates/dekaf/src/lib.rs index d7bfdc4d2c..6c144a71d0 100644 --- a/crates/dekaf/src/lib.rs +++ b/crates/dekaf/src/lib.rs @@ -72,7 +72,8 @@ impl App { let client = self .client_base .clone() - .with_creds(Some(access), Some(refresh)); + .with_creds(Some(access), Some(refresh)) + .with_fresh_gazette_client(); let claims = flow_client::client::client_claims(&client)?; diff --git a/crates/flow-client/src/client.rs b/crates/flow-client/src/client.rs index 1a0b64525e..170b25bccd 100644 --- a/crates/flow-client/src/client.rs +++ b/crates/flow-client/src/client.rs @@ -68,7 +68,18 @@ impl Client { user_access_token: Option, user_refresh_token: Option, ) -> Self { - // Test whether a fresh set of Gazette clients solves the timeout issue + Self { + user_access_token: user_access_token.or(self.user_access_token), + user_refresh_token: user_refresh_token.or(self.user_refresh_token), + ..self + } + } + + /// Build a fresh `gazette::journal::Client` and `gazette::shard::Client` + /// There is a bug that causes these clients to hang under heavy/varied load, + /// so until that bug is found+fixed, this is the work-around. + #[deprecated] + pub fn with_fresh_gazette_client(self) -> Self { let router = gazette::Router::new("local"); let journal_client = gazette::journal::Client::new( @@ -82,8 +93,6 @@ impl Client { router.clone(), ); Self { - user_access_token: user_access_token.or(self.user_access_token), - user_refresh_token: user_refresh_token.or(self.user_refresh_token), journal_client, shard_client, ..self