Skip to content

Commit

Permalink
dekaf: Temporarily implement and use `flow_client::Client::with_fresh…
Browse files Browse the repository at this point in the history
…_gazette_client()`
  • Loading branch information
jshearer committed Oct 7, 2024
1 parent 1896afc commit cd09a83
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 4 deletions.
3 changes: 2 additions & 1 deletion crates/dekaf/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,8 @@ impl App {
let client = self
.client_base
.clone()
.with_creds(Some(access), Some(refresh));
.with_creds(Some(access), Some(refresh))
.with_fresh_gazette_client();

let claims = flow_client::client::client_claims(&client)?;

Expand Down
15 changes: 12 additions & 3 deletions crates/flow-client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,18 @@ impl Client {
user_access_token: Option<String>,
user_refresh_token: Option<RefreshToken>,
) -> Self {
// Test whether a fresh set of Gazette clients solves the timeout issue
Self {
user_access_token: user_access_token.or(self.user_access_token),
user_refresh_token: user_refresh_token.or(self.user_refresh_token),
..self
}
}

/// Build a fresh `gazette::journal::Client` and `gazette::shard::Client`
/// There is a bug that causes these clients to hang under heavy/varied load,
/// so until that bug is found+fixed, this is the work-around.
#[deprecated]
pub fn with_fresh_gazette_client(self) -> Self {
let router = gazette::Router::new("local");

let journal_client = gazette::journal::Client::new(
Expand All @@ -82,8 +93,6 @@ impl Client {
router.clone(),
);
Self {
user_access_token: user_access_token.or(self.user_access_token),
user_refresh_token: user_refresh_token.or(self.user_refresh_token),
journal_client,
shard_client,
..self
Expand Down

0 comments on commit cd09a83

Please sign in to comment.