Skip to content

Commit

Permalink
runtime: Delegate to Dekaf's unary_materialize when appropriate
Browse files Browse the repository at this point in the history
  • Loading branch information
jshearer committed Sep 30, 2024
1 parent fd09a83 commit f71454b
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 7 deletions.
2 changes: 1 addition & 1 deletion crates/models/src/connector.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use super::RawValue;
use schemars::JsonSchema;
use serde::{Deserialize, Deserializer, Serialize};
use serde::{Deserialize, Serialize};
use std::collections::BTreeMap;

/// Splits a full connector image name into separate image and tag components.
Expand Down
7 changes: 4 additions & 3 deletions crates/runtime/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,9 @@ license.workspace = true
[dependencies]
assemble = { path = "../assemble" }
async-process = { path = "../async-process" }
connector-init = { path = "../connector-init" }
coroutines = { path = "../coroutines" }
connector-init = { path = "../connector-init" }
coroutines = { path = "../coroutines" }
dekaf = { path = "../dekaf" }
derive-sqlite = { path = "../derive-sqlite" }
doc = { path = "../doc" }
extractors = { path = "../extractors" }
Expand All @@ -32,7 +33,7 @@ proto-grpc = { path = "../proto-grpc", features = [
"materialize_client",
"materialize_server",
"runtime_server",
]}
] }
simd-doc = { path = "../simd-doc" }
tuple = { path = "../tuple" }

Expand Down
15 changes: 12 additions & 3 deletions crates/runtime/src/unary.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use super::{LogHandler, Runtime};
use futures::{stream::BoxStream, StreamExt, TryStreamExt};
use proto_flow::{capture, derive, materialize};
use proto_flow::{capture, derive, flow::materialization_spec, materialize};

impl<L: LogHandler> Runtime<L> {
pub async fn unary_capture(
Expand All @@ -20,8 +20,17 @@ impl<L: LogHandler> Runtime<L> {
self,
request: materialize::Request,
) -> anyhow::Result<materialize::Response> {
let response = self.serve_materialize(unary_in(request)).boxed();
unary_out(response).await
match materialization_spec::ConnectorType::try_from(
request.validate.as_ref().unwrap().connector_type,
) {
Ok(materialization_spec::ConnectorType::Dekaf) => {
dekaf::connector::unary_materialize(request).await
}
_ => {
let response = self.serve_materialize(unary_in(request)).boxed();
unary_out(response).await
}
}
}
}

Expand Down

0 comments on commit f71454b

Please sign in to comment.