Skip to content

Commit

Permalink
Factor out flow-client from flowctl
Browse files Browse the repository at this point in the history
Largely retain the same functionality as `flowctl::Client`, just in a new home so that it can be shared without introducing inconvenient dependencies
  • Loading branch information
jshearer committed Sep 30, 2024
1 parent f71454b commit 308728f
Show file tree
Hide file tree
Showing 21 changed files with 353 additions and 267 deletions.
27 changes: 27 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion crates/dekaf/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,10 @@ aes-siv = { workspace = true }
allocator = { path = "../allocator" }
avro = { path = "../avro" }
doc = { path = "../doc" }
# flowctl = { path = "../flowctl" }
flow-client = { path = "../flow-client" }
gazette = { path = "../gazette" }
labels = { path = "../labels" }
models = { path = "../models" }
ops = { path = "../ops" }
proto-flow = { path = "../proto-flow" }
proto-gazette = { path = "../proto-gazette" }
Expand Down
66 changes: 13 additions & 53 deletions crates/dekaf/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ mod api_client;
pub use api_client::KafkaApiClient;

use aes_siv::{aead::Aead, Aes256SivAead, KeyInit, KeySizeUser};
use itertools::Itertools;
use flow_client::{DEFAULT_AGENT_URL, DEFAULT_PG_PUBLIC_TOKEN, DEFAULT_PG_URL};
use percent_encoding::{percent_decode_str, utf8_percent_encode};
use serde::{Deserialize, Serialize};
use serde_json::de;
Expand All @@ -49,17 +49,9 @@ pub struct ConfigOptions {
}

pub struct Authenticated {
client: postgrest::Postgrest,
client: flow_client::Client,
user_config: ConfigOptions,
claims: JwtClaims,
}

#[derive(Deserialize)]
struct JwtClaims {
/// Unix timestamp in seconds when this token will expire
exp: u64,
/// ID of the user that owns this token
sub: String,
claims: models::authorizations::ControlClaims,
}

impl App {
Expand All @@ -73,51 +65,19 @@ impl App {
let config: ConfigOptions = serde_json::from_str(&username_str)
.context("failed to parse username as a JSON object")?;

#[derive(serde::Deserialize)]
struct RefreshToken {
id: String,
secret: String,
}
let RefreshToken {
id: refresh_token_id,
secret,
} = serde_json::from_slice(&base64::decode(password).context("password is not base64")?)
.context("failed to decode refresh token from password")?;

tracing::info!(refresh_token_id, "authenticating refresh token");
let mut client = flow_client::Client::new(
DEFAULT_AGENT_URL.to_owned(),
DEFAULT_PG_PUBLIC_TOKEN.to_string(),
DEFAULT_PG_URL.to_owned(),
None,
Some(String::from_utf8(base64::decode(password)?.to_vec())?.try_into()?),
);

#[derive(serde::Deserialize)]
struct AccessToken {
access_token: String,
}
let AccessToken { access_token } = self
.anon_client
.rpc(
"generate_access_token",
serde_json::json!({"refresh_token_id": refresh_token_id, "secret": secret})
.to_string(),
)
.execute()
.await
.and_then(|r| r.error_for_status())
.context("generating access token")?
.json()
.await?;

let authenticated_client = self
.anon_client
.clone()
.insert_header("Authorization", format!("Bearer {access_token}"));

let claims = base64::decode(access_token.split(".").collect_vec()[1])
.map_err(anyhow::Error::from)
.and_then(|decoded| {
de::from_slice::<JwtClaims>(&decoded[..]).map_err(anyhow::Error::from)
})
.context("Failed to parse access token claims")?;
client.refresh().await?;
let claims = client.claims()?;

Ok(Authenticated {
client: authenticated_client,
client,
user_config: config,
claims,
})
Expand Down
54 changes: 26 additions & 28 deletions crates/dekaf/src/registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,33 +34,31 @@ async fn all_subjects(
>,
) -> Response {
wrap(async move {
// let Authenticated {
// client,
// user_config,
// ..
// } = app.authenticate(auth.username(), auth.password()).await?;

let r: Vec<bool> = vec![];
Ok(r)
// fetch_all_collection_names(&client)
// .await
// .context("failed to list collections from the control plane")
// .map(|collections| {
// collections
// .into_iter()
// .map(|name| {
// if user_config.strict_topic_names {
// to_downstream_topic_name(TopicName::from(StrBytes::from_string(name)))
// .to_string()
// } else {
// name
// }
// })
// .flat_map(|collection| {
// vec![format!("{collection}-key"), format!("{collection}-value")]
// })
// .collect_vec()
// })
let Authenticated {
client,
user_config,
..
} = app.authenticate(auth.username(), auth.password()).await?;

topology::fetch_all_collection_names(&client.pg_client())
.await
.context("failed to list collections from the control plane")
.map(|collections| {
collections
.into_iter()
.map(|name| {
if user_config.strict_topic_names {
to_downstream_topic_name(TopicName::from(StrBytes::from_string(name)))
.to_string()
} else {
name
}
})
.flat_map(|collection| {
vec![format!("{collection}-key"), format!("{collection}-value")]
})
.collect_vec()
})
})
.await
}
Expand Down Expand Up @@ -97,7 +95,7 @@ async fn get_subject_latest(
.with_context(|| format!("collection {collection} does not exist"))?;

let (key_id, value_id) = collection
.registered_schema_ids(&client)
.registered_schema_ids(&client.pg_client())
.await
.context("failed to resolve registered Avro schemas")?;

Expand Down
45 changes: 31 additions & 14 deletions crates/dekaf/src/session.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use super::{App, Collection, Read};
use crate::{
from_downstream_topic_name, from_upstream_topic_name, to_downstream_topic_name,
to_upstream_topic_name, Authenticated, ConfigOptions,
from_downstream_topic_name, from_upstream_topic_name, registry::fetch_all_collection_names,
to_downstream_topic_name, to_upstream_topic_name, Authenticated, ConfigOptions,
};
use anyhow::Context;
use bytes::{BufMut, BytesMut};
Expand Down Expand Up @@ -31,7 +31,7 @@ struct PendingRead {

pub struct Session {
app: Arc<App>,
client: postgrest::Postgrest,
client: Option<flow_client::Client>,
reads: HashMap<(TopicName, i32), PendingRead>,
/// ID of the authenticated user
user_id: Option<String>,
Expand All @@ -41,10 +41,9 @@ pub struct Session {

impl Session {
pub fn new(app: Arc<App>, secret: String) -> Self {
let client = app.anon_client.clone();
Self {
app,
client,
client: None,
reads: HashMap::new(),
user_id: None,
config: None,
Expand Down Expand Up @@ -87,9 +86,9 @@ impl Session {
user_config,
claims,
}) => {
self.client = client;
self.client.replace(client);
self.config.replace(user_config);
self.user_id.replace(claims.sub);
self.user_id.replace(claims.sub.to_string());

let mut response = messages::SaslAuthenticateResponse::default();
response.session_lifetime_ms = (1000
Expand Down Expand Up @@ -144,7 +143,14 @@ impl Session {
async fn metadata_all_topics(
&mut self,
) -> anyhow::Result<IndexMap<TopicName, MetadataResponseTopic>> {
let collections = vec![]; //fetch_all_collection_names(&self.client).await?;
let collections = fetch_all_collection_names(
&self
.client
.as_ref()
.ok_or(anyhow::anyhow!("Session not authenticated"))?
.pg_client(),
)
.await?;

tracing::debug!(collections=?ops::DebugJson(&collections), "fetched all collections");

Expand All @@ -170,7 +176,10 @@ impl Session {
&mut self,
requests: Vec<messages::metadata_request::MetadataRequestTopic>,
) -> anyhow::Result<IndexMap<TopicName, MetadataResponseTopic>> {
let client = &self.client;
let client = &self
.client
.as_ref()
.ok_or(anyhow::anyhow!("Session not authenticated"))?;

// Concurrently fetch Collection instances for all requested topics.
let collections: anyhow::Result<Vec<(TopicName, Option<Collection>)>> =
Expand Down Expand Up @@ -247,7 +256,10 @@ impl Session {
&mut self,
request: messages::ListOffsetsRequest,
) -> anyhow::Result<messages::ListOffsetsResponse> {
let client = &self.client;
let client = &self
.client
.as_ref()
.ok_or(anyhow::anyhow!("Session not authenticated"))?;

// Concurrently fetch Collection instances and offsets for all requested topics and partitions.
// Map each "topic" into Vec<(Partition Index, Option<(Journal Offset, Timestamp))>.
Expand Down Expand Up @@ -342,7 +354,11 @@ impl Session {
..
} = request;

let client = &self.client;
let client = &self
.client
.as_ref()
.ok_or(anyhow::anyhow!("Session not authenticated"))?;

let timeout = tokio::time::sleep(std::time::Duration::from_millis(max_wait_ms as u64));
let timeout = futures::future::maybe_done(timeout);
tokio::pin!(timeout);
Expand Down Expand Up @@ -370,10 +386,11 @@ impl Session {
tracing::debug!(collection = ?&key.0, partition=partition_request.partition, "Partition doesn't exist!");
continue; // Partition doesn't exist.
};
let (key_schema_id, value_schema_id) =
collection.registered_schema_ids(&client).await?;
let (key_schema_id, value_schema_id) = collection
.registered_schema_ids(&client.pg_client())
.await?;

let read = Read::new(
let read: Read = Read::new(
collection.journal_client.clone(),
&collection,
partition,
Expand Down
Loading

0 comments on commit 308728f

Please sign in to comment.