Skip to content

Commit

Permalink
dekaf: Implement a couple of memory optimizations
Browse files Browse the repository at this point in the history
  • Loading branch information
jshearer committed Oct 2, 2024
1 parent 6b056cb commit 271b730
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 35 deletions.
27 changes: 19 additions & 8 deletions crates/dekaf/src/api_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,10 @@ use kafka_protocol::{
protocol::{self, Decodable, Encodable, Request},
};
use rsasl::{config::SASLConfig, mechname::Mechname, prelude::SASLClient};
use rustls::RootCertStore;
use std::{boxed::Box, cell::Cell, collections::HashMap, fmt::Debug, io, time::Duration};
use std::{io::BufWriter, pin::Pin, sync::Arc};
use tokio::sync::OnceCell;
use tokio::sync::RwLock;
use tokio_rustls::rustls;
use tokio_util::{codec, task::AbortOnDropHandle};
Expand All @@ -24,22 +26,31 @@ type BoxedKafkaConnection = Pin<
>,
>;

static ROOT_CERT_STORE: OnceCell<Arc<RootCertStore>> = OnceCell::const_new();

#[tracing::instrument(skip_all)]
async fn async_connect(broker_url: &str) -> anyhow::Result<BoxedKafkaConnection> {
// Establish a TCP connection to the Kafka broker

let parsed_url = Url::parse(broker_url)?;

// This returns an Err indicating that the default provider is already set
// but without this call rustls crashes with the following error:
// `no process-level CryptoProvider available -- call CryptoProvider::install_default() before this point`
let _ = rustls::crypto::aws_lc_rs::default_provider().install_default();

let mut root_cert_store = rustls::RootCertStore::empty();
root_cert_store.add_parsable_certificates(rustls_native_certs::load_native_certs()?);
let root_certs = ROOT_CERT_STORE
.get_or_try_init(|| async {
// This returns an Err indicating that the default provider is already set
// but without this call rustls crashes with the following error:
// `no process-level CryptoProvider available -- call CryptoProvider::install_default() before this point`
let _ = rustls::crypto::aws_lc_rs::default_provider().install_default();

let mut certs = rustls::RootCertStore::empty();
certs.add_parsable_certificates(
rustls_native_certs::load_native_certs().expect("failed to load native certs"),
);
Ok::<Arc<RootCertStore>, anyhow::Error>(Arc::new(certs))
})
.await?;

let tls_config = rustls::ClientConfig::builder()
.with_root_certificates(root_cert_store)
.with_root_certificates(root_certs.to_owned())
.with_no_client_auth();

let tls_connector = tokio_rustls::TlsConnector::from(Arc::new(tls_config));
Expand Down
16 changes: 8 additions & 8 deletions crates/dekaf/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,17 +24,13 @@ pub use api_client::KafkaApiClient;

use aes_siv::{aead::Aead, Aes256SivAead, KeyInit, KeySizeUser};
use connector::DekafConfig;
use flow_client::{
client::RefreshToken, DEFAULT_AGENT_URL, DEFAULT_PG_PUBLIC_TOKEN, DEFAULT_PG_URL,
};
use flow_client::{client::RefreshToken, DEFAULT_AGENT_URL};
use percent_encoding::{percent_decode_str, utf8_percent_encode};
use serde::{Deserialize, Serialize};
use serde_json::de;
use std::time::SystemTime;
use url::Url;

pub struct App {
/// Anonymous API client for the Estuary control plane.
pub anon_client: postgrest::Postgrest,
/// Hostname which is advertised for Kafka access.
pub advertise_host: String,
/// Port which is advertised for Kafka access.
Expand All @@ -43,6 +39,10 @@ pub struct App {
pub kafka_client: KafkaApiClient,
/// Secret used to secure Prometheus endpoint
pub secret: String,
/// Supabase endpoint
pub api_endpoint: Url,
/// Supabase api key
pub api_key: String,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
Expand Down Expand Up @@ -71,8 +71,8 @@ impl App {

let mut client = flow_client::Client::new(
DEFAULT_AGENT_URL.to_owned(),
DEFAULT_PG_PUBLIC_TOKEN.to_string(),
DEFAULT_PG_URL.to_owned(),
self.api_key.to_owned(),
self.api_endpoint.to_owned(),
None,
Some(refresh),
);
Expand Down
25 changes: 11 additions & 14 deletions crates/dekaf/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use anyhow::{bail, Context};
use axum_server::tls_rustls::RustlsConfig;
use clap::{Args, Parser};
use dekaf::{KafkaApiClient, Session};
use flow_client::{DEFAULT_PG_PUBLIC_TOKEN, DEFAULT_PG_URL, LOCAL_PG_PUBLIC_TOKEN, LOCAL_PG_URL};
use futures::{FutureExt, TryStreamExt};
use rsasl::config::SASLConfig;
use rustls::pki_types::CertificateDer;
Expand All @@ -16,6 +17,7 @@ use std::{
};
use tokio::io::{split, AsyncRead, AsyncWrite, AsyncWriteExt};
use tracing_subscriber::{filter::LevelFilter, EnvFilter};
use url::Url;

/// A Kafka-compatible proxy for reading Estuary Flow collections.
#[derive(Debug, Parser, serde::Serialize)]
Expand All @@ -24,14 +26,14 @@ pub struct Cli {
/// Endpoint of the Estuary API to use.
#[arg(
long,
default_value = MANAGED_API_ENDPOINT,
default_value = DEFAULT_PG_URL.as_str(),
env = "API_ENDPOINT"
)]
api_endpoint: String,
api_endpoint: Url,
/// Public (anon) API key to use during authentication to the Estuary API.
#[arg(
long,
default_value = MANAGED_API_KEY,
default_value = DEFAULT_PG_PUBLIC_TOKEN,
env = "API_KEY"
)]
api_key: String,
Expand Down Expand Up @@ -106,10 +108,10 @@ async fn main() -> anyhow::Result<()> {
let cli = Cli::parse();
tracing::info!("Starting dekaf");

let (api_endpoint, api_token) = if cli.local {
(LOCAL_API_ENDPOINT, LOCAL_API_KEY)
let (api_endpoint, api_key) = if cli.local {
(LOCAL_PG_URL.to_owned(), LOCAL_PG_PUBLIC_TOKEN.to_string())
} else {
(cli.api_endpoint.as_str(), cli.api_key.as_str())
(cli.api_endpoint, cli.api_key)
};

let upstream_kafka_host = format!(
Expand All @@ -118,7 +120,6 @@ async fn main() -> anyhow::Result<()> {
);

let app = Arc::new(dekaf::App {
anon_client: postgrest::Postgrest::new(api_endpoint).insert_header("apikey", api_token),
advertise_host: cli.advertise_host.to_owned(),
advertise_kafka_port: cli.kafka_port,
kafka_client: KafkaApiClient::connect(
Expand All @@ -131,7 +132,9 @@ async fn main() -> anyhow::Result<()> {
).await.context(
"failed to connect or authenticate to upstream Kafka broker used for serving group management APIs",
)?,
secret: cli.encryption_secret.to_owned()
secret: cli.encryption_secret.to_owned(),
api_endpoint,
api_key
});

tracing::info!(
Expand Down Expand Up @@ -320,9 +323,3 @@ fn validate_certificate_name(
}
return Ok(false);
}

const MANAGED_API_KEY: &str = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJzdXBhYmFzZSIsInJlZiI6ImV5cmNubXV6enlyaXlwZGFqd2RrIiwicm9sZSI6ImFub24iLCJpYXQiOjE2NDg3NTA1NzksImV4cCI6MTk2NDMyNjU3OX0.y1OyXD3-DYMz10eGxzo1eeamVMMUwIIeOoMryTRAoco";
const MANAGED_API_ENDPOINT: &str = "https://eyrcnmuzzyriypdajwdk.supabase.co/rest/v1";

const LOCAL_API_KEY: &str = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJzdXBhYmFzZS1kZW1vIiwicm9sZSI6ImFub24iLCJleHAiOjE5ODM4MTI5OTZ9.CRXP1A7WOeoJeXxjNni43kdQwgnWNReilDMblYTn_I0";
const LOCAL_API_ENDPOINT: &str = "http://127.0.0.1:5431/rest/v1";
10 changes: 5 additions & 5 deletions crates/dekaf/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use tracing::instrument;
struct PendingRead {
offset: i64, // Journal offset to be completed by this PendingRead.
last_write_head: i64, // Most-recent observed journal write head.
handle: tokio::task::JoinHandle<anyhow::Result<(Read, bytes::Bytes)>>,
handle: tokio_util::task::AbortOnDropHandle<anyhow::Result<(Read, bytes::Bytes)>>,
}

pub struct Session {
Expand Down Expand Up @@ -402,9 +402,9 @@ impl Session {
let pending = PendingRead {
offset: fetch_offset,
last_write_head: fetch_offset,
handle: tokio::spawn(
handle: tokio_util::task::AbortOnDropHandle::new(tokio::spawn(
read.next_batch(partition_request.partition_max_bytes as usize),
),
)),
};

tracing::info!(
Expand Down Expand Up @@ -453,9 +453,9 @@ impl Session {
} {
pending.offset = read.offset;
pending.last_write_head = read.last_write_head;
pending.handle = tokio::spawn(
pending.handle = tokio_util::task::AbortOnDropHandle::new(tokio::spawn(
read.next_batch(partition_request.partition_max_bytes as usize),
);
));
batch
} else {
bytes::Bytes::new()
Expand Down

0 comments on commit 271b730

Please sign in to comment.