From 41e60b8ba71bbc2f4af97221611ed10c6f3f9eb2 Mon Sep 17 00:00:00 2001 From: Joseph Shearer Date: Mon, 30 Sep 2024 00:05:31 -0400 Subject: [PATCH] dekaf: Clean up and stub out location where bearer token will be checked --- Cargo.lock | 1 + crates/dekaf/Cargo.toml | 3 +-- crates/dekaf/src/connector.rs | 9 ++++++-- crates/dekaf/src/lib.rs | 39 ++++++++++++++++++++++++----------- crates/dekaf/src/registry.rs | 6 +++--- crates/dekaf/src/session.rs | 15 +++++++------- 6 files changed, 47 insertions(+), 26 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 6af06507f7..0518937246 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1850,6 +1850,7 @@ dependencies = [ "rustls 0.23.10", "rustls-native-certs", "rustls-pemfile 2.1.3", + "schemars", "serde", "serde_json", "simd-doc", diff --git a/crates/dekaf/Cargo.toml b/crates/dekaf/Cargo.toml index 046c6abe4a..e9acc91a4c 100644 --- a/crates/dekaf/Cargo.toml +++ b/crates/dekaf/Cargo.toml @@ -29,8 +29,6 @@ axum-extra = { workspace = true } axum-server = { workspace = true } base64 = { workspace = true } bytes = { workspace = true } -# TODO(jshearer): Upgrade every other usage of clap to v4 and change this to: -# clap = { workspace = true } clap = { workspace = true } crypto-common = { workspace = true } deadpool = { workspace = true } @@ -52,6 +50,7 @@ rsasl = { workspace = true } rustls = { workspace = true } rustls-native-certs = { workspace = true } rustls-pemfile = { workspace = true } +schemars = { workspace = true } serde = { workspace = true } serde_json = { workspace = true } socket2 = { workspace = true } diff --git a/crates/dekaf/src/connector.rs b/crates/dekaf/src/connector.rs index 44d435aedb..dfce7911c6 100644 --- a/crates/dekaf/src/connector.rs +++ b/crates/dekaf/src/connector.rs @@ -1,18 +1,23 @@ use anyhow::{bail, Context}; use proto_flow::materialize; +use schemars::JsonSchema; use serde::{Deserialize, Serialize}; use std::collections::BTreeMap; /// Configures the behavior of a whole dekaf task -#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)] +#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, JsonSchema)] pub struct DekafConfig { /// Whether or not to expose topic names in a strictly Kafka-compliant format /// for systems that require it. Off by default. pub strict_topic_names: bool, + /// The password that will authenticate Kafka consumers to this task. + // TODO(jshearer): Uncomment when schemars 1.0 is out and we upgrade + // #[schemars(extend("secret" = true))] + pub token: String, } /// Configures a particular binding in a Dekaf-type materialization -#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)] +#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, JsonSchema)] pub struct DekafResourceConfig { /// The exposed name of the topic that maps to this binding. This /// will be exposed through the Kafka metadata/discovery APIs. diff --git a/crates/dekaf/src/lib.rs b/crates/dekaf/src/lib.rs index 8d6e40afe2..309fb11727 100644 --- a/crates/dekaf/src/lib.rs +++ b/crates/dekaf/src/lib.rs @@ -23,6 +23,7 @@ mod api_client; pub use api_client::KafkaApiClient; use aes_siv::{aead::Aead, Aes256SivAead, KeyInit, KeySizeUser}; +use connector::DekafConfig; use flow_client::{DEFAULT_AGENT_URL, DEFAULT_PG_PUBLIC_TOKEN, DEFAULT_PG_URL}; use percent_encoding::{percent_decode_str, utf8_percent_encode}; use serde::{Deserialize, Serialize}; @@ -43,27 +44,25 @@ pub struct App { } #[derive(Debug, Clone, Serialize, Deserialize)] -pub struct ConfigOptions { +pub struct DeprecatedConfigOptions { #[serde(default = "bool::")] pub strict_topic_names: bool, } pub struct Authenticated { client: flow_client::Client, - user_config: ConfigOptions, + task_config: DekafConfig, claims: models::authorizations::ControlClaims, } impl App { #[tracing::instrument(level = "info", err(Debug, level = "warn"), skip(self, password))] async fn authenticate(&self, username: &str, password: &str) -> anyhow::Result { - let username_str = if username.contains("{") { - username.to_string() + let username = if let Ok(decoded) = decode_safe_name(username.to_string()) { + decoded } else { - decode_safe_name(username.to_string()).context("failed to decode username")? + username.to_string() }; - let config: ConfigOptions = serde_json::from_str(&username_str) - .context("failed to parse username as a JSON object")?; let mut client = flow_client::Client::new( DEFAULT_AGENT_URL.to_owned(), @@ -76,11 +75,27 @@ impl App { client.refresh().await?; let claims = client.claims()?; - Ok(Authenticated { - client, - user_config: config, - claims, - }) + if models::Materialization::regex().is_match(username.as_ref()) { + Ok(Authenticated { + client, + task_config: todo!("Fetch and unseal task config"), + claims, + }) + } else if username.contains("{") { + let config: DeprecatedConfigOptions = serde_json::from_str(&username) + .context("failed to parse username as a JSON object")?; + + Ok(Authenticated { + client, + task_config: DekafConfig { + strict_topic_names: config.strict_topic_names, + token: "".to_string(), + }, + claims, + }) + } else { + anyhow::bail!("Invalid username or password") + } } } diff --git a/crates/dekaf/src/registry.rs b/crates/dekaf/src/registry.rs index 62b5ae74da..45840e0897 100644 --- a/crates/dekaf/src/registry.rs +++ b/crates/dekaf/src/registry.rs @@ -1,5 +1,5 @@ use super::App; -use crate::{from_downstream_topic_name, to_downstream_topic_name, Authenticated}; +use crate::{from_downstream_topic_name, to_downstream_topic_name, topology, Authenticated}; use anyhow::Context; use axum::response::{IntoResponse, Response}; use axum_extra::headers; @@ -36,7 +36,7 @@ async fn all_subjects( wrap(async move { let Authenticated { client, - user_config, + task_config, .. } = app.authenticate(auth.username(), auth.password()).await?; @@ -47,7 +47,7 @@ async fn all_subjects( collections .into_iter() .map(|name| { - if user_config.strict_topic_names { + if task_config.strict_topic_names { to_downstream_topic_name(TopicName::from(StrBytes::from_string(name))) .to_string() } else { diff --git a/crates/dekaf/src/session.rs b/crates/dekaf/src/session.rs index 046621de39..3032d4474d 100644 --- a/crates/dekaf/src/session.rs +++ b/crates/dekaf/src/session.rs @@ -1,7 +1,8 @@ use super::{App, Collection, Read}; use crate::{ - from_downstream_topic_name, from_upstream_topic_name, registry::fetch_all_collection_names, - to_downstream_topic_name, to_upstream_topic_name, Authenticated, ConfigOptions, + connector::DekafConfig, from_downstream_topic_name, from_upstream_topic_name, + to_downstream_topic_name, to_upstream_topic_name, topology::fetch_all_collection_names, + Authenticated, }; use anyhow::Context; use bytes::{BufMut, BytesMut}; @@ -35,7 +36,7 @@ pub struct Session { reads: HashMap<(TopicName, i32), PendingRead>, /// ID of the authenticated user user_id: Option, - config: Option, + task_config: Option, secret: String, } @@ -46,7 +47,7 @@ impl Session { client: None, reads: HashMap::new(), user_id: None, - config: None, + task_config: None, secret, } } @@ -83,11 +84,11 @@ impl Session { let response = match self.app.authenticate(authcid, password).await { Ok(Authenticated { client, - user_config, + task_config, claims, }) => { self.client.replace(client); - self.config.replace(user_config); + self.task_config.replace(task_config); self.user_id.replace(claims.sub.to_string()); let mut response = messages::SaslAuthenticateResponse::default(); @@ -1043,7 +1044,7 @@ impl Session { fn encode_topic_name(&self, name: String) -> TopicName { if self - .config + .task_config .as_ref() .expect("should have config already") .strict_topic_names