From bb3e312b2d42eddb105e52920a441d415cebe8f8 Mon Sep 17 00:00:00 2001 From: Joseph Shearer Date: Mon, 21 Oct 2024 12:34:15 -0400 Subject: [PATCH 1/2] flow-client: Make sure to remove any potentially expired JWT from the client used to exchange a refresh token for an access token in `refresh_authorizations()` While running down some issues with Dekaf, namely frequent consumer group rebalances, I noticed that these rebalances correlated with Dekaf errors such as: ``` dekaf: error=failed to obtain access token Caused by: Unauthorized: {"code":"PGRST301","details":null,"hint":null,"message":"JWT expired"} ``` --- crates/flow-client/src/client.rs | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/crates/flow-client/src/client.rs b/crates/flow-client/src/client.rs index de5a2d72c6..1f5663bbf5 100644 --- a/crates/flow-client/src/client.rs +++ b/crates/flow-client/src/client.rs @@ -66,6 +66,13 @@ impl Client { } } + pub fn as_anonymous(self) -> Self { + Self { + user_access_token: None, + ..self + } + } + /// Build a fresh `gazette::journal::Client` and `gazette::shard::Client` /// There is a bug that causes these clients to hang under heavy/varied load, /// so until that bug is found+fixed, this is the work-around. @@ -331,10 +338,15 @@ pub async fn refresh_authorizations( access_token: String, refresh_token: Option, // Set iff the token was single-use. } + // We either never had an access token, or we had one and it expired, + // in which case the client may have an invalid access token configured. + // The `generate_access_token` RPC only needs the provided refresh token + // for authentication, so we should use an unauthenticated client to make + // the request. let Response { access_token, refresh_token: next_refresh_token, - } = api_exec::(client.rpc( + } = api_exec::(client.clone().as_anonymous().rpc( "generate_access_token", serde_json::json!({"refresh_token_id": id, "secret": secret}).to_string(), )) From 43413b46c7c55d89d3f689d6da6cf1770f745779 Mon Sep 17 00:00:00 2001 From: Joseph Shearer Date: Mon, 21 Oct 2024 12:48:14 -0400 Subject: [PATCH 2/2] flow-client: Rename `with_creds` to `with_user_access_token` to better represent its behavior --- crates/dekaf/src/lib.rs | 4 ++-- crates/flow-client/src/client.rs | 15 ++++----------- crates/flowctl/src/lib.rs | 2 +- 3 files changed, 7 insertions(+), 14 deletions(-) diff --git a/crates/dekaf/src/lib.rs b/crates/dekaf/src/lib.rs index c81c484962..3648532a93 100644 --- a/crates/dekaf/src/lib.rs +++ b/crates/dekaf/src/lib.rs @@ -72,7 +72,7 @@ impl Authenticated { self.client = self .client .clone() - .with_creds(Some(access)) + .with_user_access_token(Some(access)) .with_fresh_gazette_client(); } @@ -98,7 +98,7 @@ impl App { let client = self .client_base .clone() - .with_creds(Some(access.clone())) + .with_user_access_token(Some(access.clone())) .with_fresh_gazette_client(); let claims = flow_client::client::client_claims(&client)?; diff --git a/crates/flow-client/src/client.rs b/crates/flow-client/src/client.rs index 1f5663bbf5..e41a143923 100644 --- a/crates/flow-client/src/client.rs +++ b/crates/flow-client/src/client.rs @@ -59,16 +59,9 @@ impl Client { } } - pub fn with_creds(self, user_access_token: Option) -> Self { + pub fn with_user_access_token(self, user_access_token: Option) -> Self { Self { - user_access_token: user_access_token.or(self.user_access_token), - ..self - } - } - - pub fn as_anonymous(self) -> Self { - Self { - user_access_token: None, + user_access_token, ..self } } @@ -319,7 +312,7 @@ pub async fn refresh_authorizations( (Some(access), None) => { // We have an access token but no refresh token. Create one. let refresh_token = api_exec::( - client.clone().with_creds(Some(access.to_owned())).rpc( + client.clone().with_user_access_token(Some(access.to_owned())).rpc( "create_refresh_token", serde_json::json!({"multi_use": true, "valid_for": "90d", "detail": "Created by flowctl"}) .to_string(), @@ -346,7 +339,7 @@ pub async fn refresh_authorizations( let Response { access_token, refresh_token: next_refresh_token, - } = api_exec::(client.clone().as_anonymous().rpc( + } = api_exec::(client.clone().with_user_access_token(None).rpc( "generate_access_token", serde_json::json!({"refresh_token_id": id, "secret": secret}).to_string(), )) diff --git a/crates/flowctl/src/lib.rs b/crates/flowctl/src/lib.rs index b7e10f2360..c705724736 100644 --- a/crates/flowctl/src/lib.rs +++ b/crates/flowctl/src/lib.rs @@ -148,7 +148,7 @@ impl Cli { config.user_access_token = Some(access.to_owned()); config.user_refresh_token = Some(refresh.to_owned()); - anon_client.with_creds(Some(access)) + anon_client.with_user_access_token(Some(access)) } Err(err) => { tracing::debug!(?err, "Error refreshing credentials");