Skip to content

Commit

Permalink
dekaf: Temporarily implement and use `flow_client::Client::with_fresh…
Browse files Browse the repository at this point in the history
…_gazette_client()`

gRPC connections seem to lose stability after a while, this temporarily fixes that
  • Loading branch information
jshearer committed Oct 7, 2024
1 parent 86745ef commit 4064be8
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 1 deletion.
3 changes: 2 additions & 1 deletion crates/dekaf/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,8 @@ impl App {
let client = self
.client_base
.clone()
.with_creds(Some(access), Some(refresh));
.with_creds(Some(access), Some(refresh))
.with_fresh_gazette_client();

let claims = flow_client::client::client_claims(&client)?;

Expand Down
24 changes: 24 additions & 0 deletions crates/flow-client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,30 @@ impl Client {
}
}

/// Build a fresh `gazette::journal::Client` and `gazette::shard::Client`
/// There is a bug that causes these clients to hang under heavy/varied load,
/// so until that bug is found+fixed, this is the work-around.
#[deprecated]
pub fn with_fresh_gazette_client(self) -> Self {
let router = gazette::Router::new("local");

let journal_client = gazette::journal::Client::new(
String::new(),
gazette::Metadata::default(),
router.clone(),
);
let shard_client = gazette::shard::Client::new(
String::new(),
gazette::Metadata::default(),
router.clone(),
);
Self {
journal_client,
shard_client,
..self
}
}

pub fn pg_client(&self) -> postgrest::Postgrest {
if let Some(token) = &self.user_access_token {
return self
Expand Down

0 comments on commit 4064be8

Please sign in to comment.