Skip to content

Commit

Permalink
dekaf: Clean up and stub out location where bearer token will be checked
Browse files Browse the repository at this point in the history
  • Loading branch information
jshearer committed Sep 30, 2024
1 parent 308728f commit 41e60b8
Show file tree
Hide file tree
Showing 6 changed files with 47 additions and 26 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 1 addition & 2 deletions crates/dekaf/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,6 @@ axum-extra = { workspace = true }
axum-server = { workspace = true }
base64 = { workspace = true }
bytes = { workspace = true }
# TODO(jshearer): Upgrade every other usage of clap to v4 and change this to:
# clap = { workspace = true }
clap = { workspace = true }
crypto-common = { workspace = true }
deadpool = { workspace = true }
Expand All @@ -52,6 +50,7 @@ rsasl = { workspace = true }
rustls = { workspace = true }
rustls-native-certs = { workspace = true }
rustls-pemfile = { workspace = true }
schemars = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
socket2 = { workspace = true }
Expand Down
9 changes: 7 additions & 2 deletions crates/dekaf/src/connector.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,23 @@
use anyhow::{bail, Context};
use proto_flow::materialize;
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use std::collections::BTreeMap;

/// Configures the behavior of a whole dekaf task
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, JsonSchema)]
pub struct DekafConfig {
/// Whether or not to expose topic names in a strictly Kafka-compliant format
/// for systems that require it. Off by default.
pub strict_topic_names: bool,
/// The password that will authenticate Kafka consumers to this task.
// TODO(jshearer): Uncomment when schemars 1.0 is out and we upgrade
// #[schemars(extend("secret" = true))]
pub token: String,
}

/// Configures a particular binding in a Dekaf-type materialization
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, JsonSchema)]
pub struct DekafResourceConfig {
/// The exposed name of the topic that maps to this binding. This
/// will be exposed through the Kafka metadata/discovery APIs.
Expand Down
39 changes: 27 additions & 12 deletions crates/dekaf/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ mod api_client;
pub use api_client::KafkaApiClient;

use aes_siv::{aead::Aead, Aes256SivAead, KeyInit, KeySizeUser};
use connector::DekafConfig;
use flow_client::{DEFAULT_AGENT_URL, DEFAULT_PG_PUBLIC_TOKEN, DEFAULT_PG_URL};
use percent_encoding::{percent_decode_str, utf8_percent_encode};
use serde::{Deserialize, Serialize};
Expand All @@ -43,27 +44,25 @@ pub struct App {
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ConfigOptions {
pub struct DeprecatedConfigOptions {
#[serde(default = "bool::<false>")]
pub strict_topic_names: bool,
}

pub struct Authenticated {
client: flow_client::Client,
user_config: ConfigOptions,
task_config: DekafConfig,
claims: models::authorizations::ControlClaims,
}

impl App {
#[tracing::instrument(level = "info", err(Debug, level = "warn"), skip(self, password))]
async fn authenticate(&self, username: &str, password: &str) -> anyhow::Result<Authenticated> {
let username_str = if username.contains("{") {
username.to_string()
let username = if let Ok(decoded) = decode_safe_name(username.to_string()) {
decoded
} else {
decode_safe_name(username.to_string()).context("failed to decode username")?
username.to_string()
};
let config: ConfigOptions = serde_json::from_str(&username_str)
.context("failed to parse username as a JSON object")?;

let mut client = flow_client::Client::new(
DEFAULT_AGENT_URL.to_owned(),
Expand All @@ -76,11 +75,27 @@ impl App {
client.refresh().await?;
let claims = client.claims()?;

Ok(Authenticated {
client,
user_config: config,
claims,
})
if models::Materialization::regex().is_match(username.as_ref()) {
Ok(Authenticated {
client,
task_config: todo!("Fetch and unseal task config"),
claims,
})
} else if username.contains("{") {
let config: DeprecatedConfigOptions = serde_json::from_str(&username)
.context("failed to parse username as a JSON object")?;

Ok(Authenticated {
client,
task_config: DekafConfig {
strict_topic_names: config.strict_topic_names,
token: "".to_string(),
},
claims,
})
} else {
anyhow::bail!("Invalid username or password")
}
}
}

Expand Down
6 changes: 3 additions & 3 deletions crates/dekaf/src/registry.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use super::App;
use crate::{from_downstream_topic_name, to_downstream_topic_name, Authenticated};
use crate::{from_downstream_topic_name, to_downstream_topic_name, topology, Authenticated};
use anyhow::Context;
use axum::response::{IntoResponse, Response};
use axum_extra::headers;
Expand Down Expand Up @@ -36,7 +36,7 @@ async fn all_subjects(
wrap(async move {
let Authenticated {
client,
user_config,
task_config,
..
} = app.authenticate(auth.username(), auth.password()).await?;

Expand All @@ -47,7 +47,7 @@ async fn all_subjects(
collections
.into_iter()
.map(|name| {
if user_config.strict_topic_names {
if task_config.strict_topic_names {
to_downstream_topic_name(TopicName::from(StrBytes::from_string(name)))
.to_string()
} else {
Expand Down
15 changes: 8 additions & 7 deletions crates/dekaf/src/session.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use super::{App, Collection, Read};
use crate::{
from_downstream_topic_name, from_upstream_topic_name, registry::fetch_all_collection_names,
to_downstream_topic_name, to_upstream_topic_name, Authenticated, ConfigOptions,
connector::DekafConfig, from_downstream_topic_name, from_upstream_topic_name,
to_downstream_topic_name, to_upstream_topic_name, topology::fetch_all_collection_names,
Authenticated,
};
use anyhow::Context;
use bytes::{BufMut, BytesMut};
Expand Down Expand Up @@ -35,7 +36,7 @@ pub struct Session {
reads: HashMap<(TopicName, i32), PendingRead>,
/// ID of the authenticated user
user_id: Option<String>,
config: Option<ConfigOptions>,
task_config: Option<DekafConfig>,
secret: String,
}

Expand All @@ -46,7 +47,7 @@ impl Session {
client: None,
reads: HashMap::new(),
user_id: None,
config: None,
task_config: None,
secret,
}
}
Expand Down Expand Up @@ -83,11 +84,11 @@ impl Session {
let response = match self.app.authenticate(authcid, password).await {
Ok(Authenticated {
client,
user_config,
task_config,
claims,
}) => {
self.client.replace(client);
self.config.replace(user_config);
self.task_config.replace(task_config);
self.user_id.replace(claims.sub.to_string());

let mut response = messages::SaslAuthenticateResponse::default();
Expand Down Expand Up @@ -1043,7 +1044,7 @@ impl Session {

fn encode_topic_name(&self, name: String) -> TopicName {
if self
.config
.task_config
.as_ref()
.expect("should have config already")
.strict_topic_names
Expand Down

0 comments on commit 41e60b8

Please sign in to comment.