From 271b730c9ebbd003f867b2d3eaebaa366177e37b Mon Sep 17 00:00:00 2001 From: Joseph Shearer Date: Wed, 2 Oct 2024 12:14:15 -0400 Subject: [PATCH] dekaf: Implement a couple memory optimizations --- crates/dekaf/src/api_client.rs | 27 +++++++++++++++++++-------- crates/dekaf/src/lib.rs | 16 ++++++++-------- crates/dekaf/src/main.rs | 25 +++++++++++-------------- crates/dekaf/src/session.rs | 10 +++++----- 4 files changed, 43 insertions(+), 35 deletions(-) diff --git a/crates/dekaf/src/api_client.rs b/crates/dekaf/src/api_client.rs index bab79307a6..17ae6133c6 100644 --- a/crates/dekaf/src/api_client.rs +++ b/crates/dekaf/src/api_client.rs @@ -7,8 +7,10 @@ use kafka_protocol::{ protocol::{self, Decodable, Encodable, Request}, }; use rsasl::{config::SASLConfig, mechname::Mechname, prelude::SASLClient}; +use rustls::RootCertStore; use std::{boxed::Box, cell::Cell, collections::HashMap, fmt::Debug, io, time::Duration}; use std::{io::BufWriter, pin::Pin, sync::Arc}; +use tokio::sync::OnceCell; use tokio::sync::RwLock; use tokio_rustls::rustls; use tokio_util::{codec, task::AbortOnDropHandle}; @@ -24,22 +26,31 @@ type BoxedKafkaConnection = Pin< >, >; +static ROOT_CERT_STORE: OnceCell> = OnceCell::const_new(); + #[tracing::instrument(skip_all)] async fn async_connect(broker_url: &str) -> anyhow::Result { // Establish a TCP connection to the Kafka broker let parsed_url = Url::parse(broker_url)?; - // This returns an Err indicating that the default provider is already set - // but without this call rustls crashes with the following error: - // `no process-level CryptoProvider available -- call CryptoProvider::install_default() before this point` - let _ = rustls::crypto::aws_lc_rs::default_provider().install_default(); - - let mut root_cert_store = rustls::RootCertStore::empty(); - root_cert_store.add_parsable_certificates(rustls_native_certs::load_native_certs()?); + let root_certs = ROOT_CERT_STORE + .get_or_try_init(|| async { + // This returns an Err indicating that the default provider is already set + // but without this call rustls crashes with the following error: + // `no process-level CryptoProvider available -- call CryptoProvider::install_default() before this point` + let _ = rustls::crypto::aws_lc_rs::default_provider().install_default(); + + let mut certs = rustls::RootCertStore::empty(); + certs.add_parsable_certificates( + rustls_native_certs::load_native_certs().expect("failed to load native certs"), + ); + Ok::, anyhow::Error>(Arc::new(certs)) + }) + .await?; let tls_config = rustls::ClientConfig::builder() - .with_root_certificates(root_cert_store) + .with_root_certificates(root_certs.to_owned()) .with_no_client_auth(); let tls_connector = tokio_rustls::TlsConnector::from(Arc::new(tls_config)); diff --git a/crates/dekaf/src/lib.rs b/crates/dekaf/src/lib.rs index 0be0ec58f2..8c1016e607 100644 --- a/crates/dekaf/src/lib.rs +++ b/crates/dekaf/src/lib.rs @@ -24,17 +24,13 @@ pub use api_client::KafkaApiClient; use aes_siv::{aead::Aead, Aes256SivAead, KeyInit, KeySizeUser}; use connector::DekafConfig; -use flow_client::{ - client::RefreshToken, DEFAULT_AGENT_URL, DEFAULT_PG_PUBLIC_TOKEN, DEFAULT_PG_URL, -}; +use flow_client::{client::RefreshToken, DEFAULT_AGENT_URL}; use percent_encoding::{percent_decode_str, utf8_percent_encode}; use serde::{Deserialize, Serialize}; -use serde_json::de; use std::time::SystemTime; +use url::Url; pub struct App { - /// Anonymous API client for the Estuary control plane. - pub anon_client: postgrest::Postgrest, /// Hostname which is advertised for Kafka access. pub advertise_host: String, /// Port which is advertised for Kafka access. @@ -43,6 +39,10 @@ pub struct App { pub kafka_client: KafkaApiClient, /// Secret used to secure Prometheus endpoint pub secret: String, + /// Supabase endpoint + pub api_endpoint: Url, + /// Supabase api key + pub api_key: String, } #[derive(Debug, Clone, Serialize, Deserialize)] @@ -71,8 +71,8 @@ impl App { let mut client = flow_client::Client::new( DEFAULT_AGENT_URL.to_owned(), - DEFAULT_PG_PUBLIC_TOKEN.to_string(), - DEFAULT_PG_URL.to_owned(), + self.api_key.to_owned(), + self.api_endpoint.to_owned(), None, Some(refresh), ); diff --git a/crates/dekaf/src/main.rs b/crates/dekaf/src/main.rs index 19286dd661..6fcfd2e3c7 100644 --- a/crates/dekaf/src/main.rs +++ b/crates/dekaf/src/main.rs @@ -5,6 +5,7 @@ use anyhow::{bail, Context}; use axum_server::tls_rustls::RustlsConfig; use clap::{Args, Parser}; use dekaf::{KafkaApiClient, Session}; +use flow_client::{DEFAULT_PG_PUBLIC_TOKEN, DEFAULT_PG_URL, LOCAL_PG_PUBLIC_TOKEN, LOCAL_PG_URL}; use futures::{FutureExt, TryStreamExt}; use rsasl::config::SASLConfig; use rustls::pki_types::CertificateDer; @@ -16,6 +17,7 @@ use std::{ }; use tokio::io::{split, AsyncRead, AsyncWrite, AsyncWriteExt}; use tracing_subscriber::{filter::LevelFilter, EnvFilter}; +use url::Url; /// A Kafka-compatible proxy for reading Estuary Flow collections. #[derive(Debug, Parser, serde::Serialize)] @@ -24,14 +26,14 @@ pub struct Cli { /// Endpoint of the Estuary API to use. #[arg( long, - default_value = MANAGED_API_ENDPOINT, + default_value = DEFAULT_PG_URL.as_str(), env = "API_ENDPOINT" )] - api_endpoint: String, + api_endpoint: Url, /// Public (anon) API key to use during authentication to the Estuary API. #[arg( long, - default_value = MANAGED_API_KEY, + default_value = DEFAULT_PG_PUBLIC_TOKEN, env = "API_KEY" )] api_key: String, @@ -106,10 +108,10 @@ async fn main() -> anyhow::Result<()> { let cli = Cli::parse(); tracing::info!("Starting dekaf"); - let (api_endpoint, api_token) = if cli.local { - (LOCAL_API_ENDPOINT, LOCAL_API_KEY) + let (api_endpoint, api_key) = if cli.local { + (LOCAL_PG_URL.to_owned(), LOCAL_PG_PUBLIC_TOKEN.to_string()) } else { - (cli.api_endpoint.as_str(), cli.api_key.as_str()) + (cli.api_endpoint, cli.api_key) }; let upstream_kafka_host = format!( @@ -118,7 +120,6 @@ async fn main() -> anyhow::Result<()> { ); let app = Arc::new(dekaf::App { - anon_client: postgrest::Postgrest::new(api_endpoint).insert_header("apikey", api_token), advertise_host: cli.advertise_host.to_owned(), advertise_kafka_port: cli.kafka_port, kafka_client: KafkaApiClient::connect( @@ -131,7 +132,9 @@ async fn main() -> anyhow::Result<()> { ).await.context( "failed to connect or authenticate to upstream Kafka broker used for serving group management APIs", )?, - secret: cli.encryption_secret.to_owned() + secret: cli.encryption_secret.to_owned(), + api_endpoint, + api_key }); tracing::info!( @@ -320,9 +323,3 @@ fn validate_certificate_name( } return Ok(false); } - -const MANAGED_API_KEY: &str = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJzdXBhYmFzZSIsInJlZiI6ImV5cmNubXV6enlyaXlwZGFqd2RrIiwicm9sZSI6ImFub24iLCJpYXQiOjE2NDg3NTA1NzksImV4cCI6MTk2NDMyNjU3OX0.y1OyXD3-DYMz10eGxzo1eeamVMMUwIIeOoMryTRAoco"; -const MANAGED_API_ENDPOINT: &str = "https://eyrcnmuzzyriypdajwdk.supabase.co/rest/v1"; - -const LOCAL_API_KEY: &str = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJzdXBhYmFzZS1kZW1vIiwicm9sZSI6ImFub24iLCJleHAiOjE5ODM4MTI5OTZ9.CRXP1A7WOeoJeXxjNni43kdQwgnWNReilDMblYTn_I0"; -const LOCAL_API_ENDPOINT: &str = "http://127.0.0.1:5431/rest/v1"; diff --git a/crates/dekaf/src/session.rs b/crates/dekaf/src/session.rs index 3032d4474d..5a1cdcf748 100644 --- a/crates/dekaf/src/session.rs +++ b/crates/dekaf/src/session.rs @@ -27,7 +27,7 @@ use tracing::instrument; struct PendingRead { offset: i64, // Journal offset to be completed by this PendingRead. last_write_head: i64, // Most-recent observed journal write head. - handle: tokio::task::JoinHandle>, + handle: tokio_util::task::AbortOnDropHandle>, } pub struct Session { @@ -402,9 +402,9 @@ impl Session { let pending = PendingRead { offset: fetch_offset, last_write_head: fetch_offset, - handle: tokio::spawn( + handle: tokio_util::task::AbortOnDropHandle::new(tokio::spawn( read.next_batch(partition_request.partition_max_bytes as usize), - ), + )), }; tracing::info!( @@ -453,9 +453,9 @@ impl Session { } { pending.offset = read.offset; pending.last_write_head = read.last_write_head; - pending.handle = tokio::spawn( + pending.handle = tokio_util::task::AbortOnDropHandle::new(tokio::spawn( read.next_batch(partition_request.partition_max_bytes as usize), - ); + )); batch } else { bytes::Bytes::new()