diff --git a/crates/models/src/connector.rs b/crates/models/src/connector.rs index 788879cd99..1bc7ad193a 100644 --- a/crates/models/src/connector.rs +++ b/crates/models/src/connector.rs @@ -1,6 +1,6 @@ use super::RawValue; use schemars::JsonSchema; -use serde::{Deserialize, Deserializer, Serialize}; +use serde::{Deserialize, Serialize}; use std::collections::BTreeMap; /// Splits a full connector image name into separate image and tag components. diff --git a/crates/runtime/Cargo.toml b/crates/runtime/Cargo.toml index a6cdf8de7d..c87a071a95 100644 --- a/crates/runtime/Cargo.toml +++ b/crates/runtime/Cargo.toml @@ -11,8 +11,9 @@ license.workspace = true [dependencies] assemble = { path = "../assemble" } async-process = { path = "../async-process" } -connector-init = { path = "../connector-init" } -coroutines = { path = "../coroutines" } +connector-init = { path = "../connector-init" } +coroutines = { path = "../coroutines" } +dekaf = { path = "../dekaf" } derive-sqlite = { path = "../derive-sqlite" } doc = { path = "../doc" } extractors = { path = "../extractors" } @@ -32,7 +33,7 @@ proto-grpc = { path = "../proto-grpc", features = [ "materialize_client", "materialize_server", "runtime_server", - ]} +] } simd-doc = { path = "../simd-doc" } tuple = { path = "../tuple" } diff --git a/crates/runtime/src/unary.rs b/crates/runtime/src/unary.rs index 7ad7e28b82..2cd3b502d8 100644 --- a/crates/runtime/src/unary.rs +++ b/crates/runtime/src/unary.rs @@ -1,6 +1,6 @@ use super::{LogHandler, Runtime}; use futures::{stream::BoxStream, StreamExt, TryStreamExt}; -use proto_flow::{capture, derive, materialize}; +use proto_flow::{capture, derive, flow::materialization_spec, materialize}; impl Runtime { pub async fn unary_capture( @@ -20,8 +20,17 @@ impl Runtime { self, request: materialize::Request, ) -> anyhow::Result { - let response = self.serve_materialize(unary_in(request)).boxed(); - unary_out(response).await + match materialization_spec::ConnectorType::try_from( + request.validate.as_ref().unwrap().connector_type, + ) { + Ok(materialization_spec::ConnectorType::Dekaf) => { + dekaf::connector::unary_materialize(request).await + } + _ => { + let response = self.serve_materialize(unary_in(request)).boxed(); + unary_out(response).await + } + } } }