diff --git a/crates/dekaf/src/lib.rs b/crates/dekaf/src/lib.rs index c81c484962..3648532a93 100644 --- a/crates/dekaf/src/lib.rs +++ b/crates/dekaf/src/lib.rs @@ -72,7 +72,7 @@ impl Authenticated { self.client = self .client .clone() - .with_creds(Some(access)) + .with_user_access_token(Some(access)) .with_fresh_gazette_client(); } @@ -98,7 +98,7 @@ impl App { let client = self .client_base .clone() - .with_creds(Some(access.clone())) + .with_user_access_token(Some(access.clone())) .with_fresh_gazette_client(); let claims = flow_client::client::client_claims(&client)?; diff --git a/crates/flow-client/src/client.rs b/crates/flow-client/src/client.rs index de5a2d72c6..e41a143923 100644 --- a/crates/flow-client/src/client.rs +++ b/crates/flow-client/src/client.rs @@ -59,9 +59,9 @@ impl Client { } } - pub fn with_creds(self, user_access_token: Option) -> Self { + pub fn with_user_access_token(self, user_access_token: Option) -> Self { Self { - user_access_token: user_access_token.or(self.user_access_token), + user_access_token, ..self } } @@ -312,7 +312,7 @@ pub async fn refresh_authorizations( (Some(access), None) => { // We have an access token but no refresh token. Create one. let refresh_token = api_exec::( - client.clone().with_creds(Some(access.to_owned())).rpc( + client.clone().with_user_access_token(Some(access.to_owned())).rpc( "create_refresh_token", serde_json::json!({"multi_use": true, "valid_for": "90d", "detail": "Created by flowctl"}) .to_string(), @@ -331,10 +331,15 @@ pub async fn refresh_authorizations( access_token: String, refresh_token: Option, // Set iff the token was single-use. } + // We either never had an access token, or we had one and it expired, + // in which case the client may have an invalid access token configured. + // The `generate_access_token` RPC only needs the provided refresh token + // for authentication, so we should use an unauthenticated client to make + // the request. let Response { access_token, refresh_token: next_refresh_token, - } = api_exec::(client.rpc( + } = api_exec::(client.clone().with_user_access_token(None).rpc( "generate_access_token", serde_json::json!({"refresh_token_id": id, "secret": secret}).to_string(), )) diff --git a/crates/flowctl/src/lib.rs b/crates/flowctl/src/lib.rs index b7e10f2360..c705724736 100644 --- a/crates/flowctl/src/lib.rs +++ b/crates/flowctl/src/lib.rs @@ -148,7 +148,7 @@ impl Cli { config.user_access_token = Some(access.to_owned()); config.user_refresh_token = Some(refresh.to_owned()); - anon_client.with_creds(Some(access)) + anon_client.with_user_access_token(Some(access)) } Err(err) => { tracing::debug!(?err, "Error refreshing credentials");