diff --git a/crates/runtime/src/capture/image.rs b/crates/runtime/src/capture/image.rs index ccde8e063b..9dae3d369f 100644 --- a/crates/runtime/src/capture/image.rs +++ b/crates/runtime/src/capture/image.rs @@ -1,99 +1,156 @@ use super::extract_endpoint; -use crate::{container, eof_on_error, inject_error, unseal}; -use futures::{Stream, TryStreamExt}; -use proto_flow::capture::{Request, Response}; -use proto_flow::ops; +use crate::{ + image_connector::{Connector, Container, StartRpcFuture, UnsealFuture, Unsealed}, + unseal, +}; +use futures::{channel::mpsc, FutureExt, Stream}; +use proto_flow::{ + capture::{Request, Response}, + runtime::CaptureRequestExt, +}; -pub async fn image_connector( - image: String, +fn unseal(mut request: Request) -> Result, Request> { + if !matches!( + request, + Request { spec: Some(_), .. } + | Request { + discover: Some(_), + .. + } + | Request { + validate: Some(_), + .. + } + | Request { apply: Some(_), .. } + | Request { open: Some(_), .. } + ) { + return Err(request); // Not an unseal-able request. + }; + + Ok(async move { + let (endpoint, config_json) = extract_endpoint(&mut request)?; + + let models::CaptureEndpoint::Connector(models::ConnectorConfig { + image, + config: sealed_config, + }) = endpoint; + /* else { anyhow::bail!("task connector type has changed and is no longer an image") }; */ + + *config_json = unseal::decrypt_sops(&sealed_config).await?.to_string(); + + let log_level = match request.get_internal() { + Some(Ok(CaptureRequestExt { + labels: Some(labels), + .. + })) => Some(labels.log_level()), + _ => None, + }; + + Ok(Unsealed { + image, + log_level, + request, + }) + } + .boxed()) +} + +fn start_rpc( + channel: tonic::transport::Channel, + rx: mpsc::Receiver, +) -> StartRpcFuture { + async move { + proto_grpc::capture::connector_client::ConnectorClient::new(channel) + .capture(rx) + .await + } + .boxed() +} + +fn attach_container(response: &mut Response, container: Container) { + response + .set_internal(&mut bytes::BytesMut::new(), |internal| { + internal.container = Some(container); + }) + .unwrap(); +} + +pub fn connector( log_handler: L, - network: String, + network: &str, request_rx: R, task_name: &str, -) -> tonic::Result>> +) -> mpsc::Receiver> where - L: Fn(&ops::Log) + Send + Sync + 'static, + L: Fn(&ops::Log) + Clone + Send + Sync + 'static, R: Stream> + Send + Unpin + 'static, { - let (container, channel, guard) = container::start( - &image, + let (connector, response_rx) = Connector::new( + attach_container, log_handler, - &network, + network, + request_rx, + start_rpc, task_name, ops::TaskType::Capture, - ) - .await - .map_err(crate::anyhow_to_status)?; - - // Adapt requests by identifying instances that carry endpoint configuration. - // Verify they remain compatible with our started container, and then unseal their config. - // Or if they're not compatible, then map to Status::aborted(). - let request_rx = request_rx.and_then(move |mut request| { - let must_unseal = if matches!( - request, - Request { spec: Some(_), .. } - | Request { - discover: Some(_), - .. - } - | Request { - validate: Some(_), - .. - } - | Request { apply: Some(_), .. } - | Request { open: Some(_), .. } - ) { - Some(image.clone()) // Outer closure owns `image`. - } else { - None - }; + unseal, + ); + tokio::spawn(async move { connector.run().await }); - async move { - if let Some(expect_image) = must_unseal { - let (endpoint, config_json) = - extract_endpoint(&mut request).map_err(crate::anyhow_to_status)?; + response_rx +} - let sealed_config = match endpoint { - models::CaptureEndpoint::Connector(models::ConnectorConfig { - image: this_image, - config, - }) if expect_image == this_image => config, +#[cfg(test)] +mod test { + use super::connector; + use futures::StreamExt; + use serde_json::json; - _ => return Err(tonic::Status::aborted("connector image has changed")), - }; + #[tokio::test] + async fn test_http_ingest_spec() { + if let Err(_) = locate_bin::locate("flow-connector-init") { + // Skip if `flow-connector-init` isn't available (yet). We're probably on CI. + // This test is useful as a sanity check for local development + // and we have plenty of other coverage during CI. + return; + } - *config_json = unseal::decrypt_sops(&sealed_config) - .await - .map_err(crate::anyhow_to_status)? - .to_string(); + let request_rx = futures::stream::repeat(Ok(serde_json::from_value(json!({ + "spec": { + "connectorType": "IMAGE", + "config": { + "image": "ghcr.io/estuary/source-http-ingest:dev", + "config": {}, + } } + })) + .unwrap())); - Ok(request) - } - }); - - let (request_rx, error_rx) = eof_on_error(request_rx); - - // Start a capture RPC. - let container_response = proto_grpc::capture::connector_client::ConnectorClient::new(channel) - .capture(request_rx) - .await?; - let response_rx = container_response.into_inner(); - - // Adapt responses by enriching the first Response with the image Container. - let mut container = Some(container); - let response_rx = response_rx.and_then(move |mut response| { - _ = &guard; // Move so it's retained while responses are still being read. - - if container.is_some() { - response - .set_internal(&mut bytes::BytesMut::new(), |internal| { - internal.container = container.take(); - }) - .unwrap(); - } - futures::future::ready(Ok(response)) - }); + let response_rx = connector(ops::tracing_log_handler, "", request_rx.take(2), "a-task"); + + let responses: Vec<_> = response_rx.collect().await; + assert_eq!(responses.len(), 2); + + for resp in responses { + let resp = resp.unwrap(); - Ok(inject_error(response_rx, error_rx)) + assert!(resp.spec.is_some()); + + let container = resp + .get_internal() + .expect("has internal field") + .expect("internal decodes") + .container + .expect("internal has attached container"); + + assert_eq!( + container.network_ports, + [proto_flow::flow::NetworkPort { + number: 8080, + protocol: String::new(), + public: true + }] + ); + } + } } diff --git a/crates/runtime/src/capture/mod.rs b/crates/runtime/src/capture/mod.rs index 100391085a..91a29488bd 100644 --- a/crates/runtime/src/capture/mod.rs +++ b/crates/runtime/src/capture/mod.rs @@ -81,17 +81,13 @@ where let request_rx = adjust_log_level(request_rx, self.set_log_level); let response_rx = match endpoint { - models::CaptureEndpoint::Connector(models::ConnectorConfig { image, .. }) => { - image::image_connector( - image, - self.log_handler, - self.container_network, - request_rx, - &self.task_name, - ) - .await? - .boxed() - } + models::CaptureEndpoint::Connector(models::ConnectorConfig { .. }) => image::connector( + self.log_handler, + &self.container_network, + request_rx, + &self.task_name, + ) + .boxed(), }; Ok(response_rx) diff --git a/crates/runtime/src/container.rs b/crates/runtime/src/container.rs index 871542e8d5..d5639eef4a 100644 --- a/crates/runtime/src/container.rs +++ b/crates/runtime/src/container.rs @@ -1,6 +1,6 @@ use anyhow::Context; use futures::channel::oneshot; -use proto_flow::{flow, ops, runtime}; +use proto_flow::{flow, runtime}; use tokio::io::AsyncBufReadExt; // Port on which flow-connector-init listens for requests. @@ -16,6 +16,7 @@ const CONNECTOR_INIT_PORT: u16 = 49092; pub async fn start( image: &str, log_handler: L, + log_level: Option, network: &str, task_name: &str, task_type: ops::TaskType, @@ -68,6 +69,7 @@ where // This is default `docker run` behavior if --network is not provided. let network = if network == "" { "bridge" } else { network }; + let log_level = log_level.unwrap_or(ops::LogLevel::Warn); let mut process: async_process::Child = async_process::Command::new("docker") .args([ @@ -92,6 +94,7 @@ where ), // Thread-through the logging configuration of the connector. "--env=LOG_FORMAT=json".to_string(), + format!("--env=LOG_LEVEL={}", log_level.as_str_name()), // Cgroup memory / CPU resource limits. // TODO(johnny): we intend to tighten these down further, over time. "--memory=1g".to_string(), @@ -343,6 +346,7 @@ mod test { let (container, channel, _guard) = start( "ghcr.io/estuary/source-http-ingest:dev", ops::tracing_log_handler, + Some(ops::LogLevel::Debug), "", "a-task-name", proto_flow::ops::TaskType::Capture, diff --git a/crates/runtime/src/derive/image.rs b/crates/runtime/src/derive/image.rs index 3dabdfd69e..13511aabf9 100644 --- a/crates/runtime/src/derive/image.rs +++ b/crates/runtime/src/derive/image.rs @@ -1,94 +1,96 @@ use super::extract_endpoint; -use crate::{container, eof_on_error, inject_error, unseal}; -use futures::{Stream, TryStreamExt}; -use proto_flow::derive::{Request, Response}; -use proto_flow::ops; +use crate::{ + image_connector::{Connector, Container, StartRpcFuture, UnsealFuture, Unsealed}, + unseal, +}; +use futures::{channel::mpsc, FutureExt, Stream}; +use proto_flow::{ + derive::{Request, Response}, + runtime::DeriveRequestExt, +}; -pub async fn image_connector( - image: String, +fn unseal(mut request: Request) -> Result, Request> { + if !matches!( + request, + Request { spec: Some(_), .. } + | Request { + validate: Some(_), + .. + } + | Request { open: Some(_), .. } + ) { + return Err(request); // Not an unseal-able request. + }; + + Ok(async move { + let (endpoint, config_json) = extract_endpoint(&mut request)?; + + let models::DeriveUsing::Connector(models::ConnectorConfig { + image, + config: sealed_config, + }) = endpoint else { + anyhow::bail!("task connector type has changed and is no longer an image") + }; + *config_json = unseal::decrypt_sops(&sealed_config).await?.to_string(); + + let log_level = match request.get_internal() { + Some(Ok(DeriveRequestExt { + labels: Some(labels), + .. + })) => Some(labels.log_level()), + _ => None, + }; + + Ok(Unsealed { + image, + log_level, + request, + }) + } + .boxed()) +} + +fn start_rpc( + channel: tonic::transport::Channel, + rx: mpsc::Receiver, +) -> StartRpcFuture { + async move { + proto_grpc::derive::connector_client::ConnectorClient::new(channel) + .derive(rx) + .await + } + .boxed() +} + +fn attach_container(response: &mut Response, container: Container) { + response + .set_internal(&mut bytes::BytesMut::new(), |internal| { + internal.container = Some(container); + }) + .unwrap(); +} + +pub fn connector( log_handler: L, - network: String, + network: &str, request_rx: R, task_name: &str, -) -> tonic::Result>> +) -> mpsc::Receiver> where - L: Fn(&ops::Log) + Send + Sync + 'static, + L: Fn(&ops::Log) + Clone + Send + Sync + 'static, R: Stream> + Send + Unpin + 'static, { - let (container, channel, guard) = container::start( - &image, + let (connector, response_rx) = Connector::new( + attach_container, log_handler, - &network, + network, + request_rx, + start_rpc, task_name, - ops::TaskType::Capture, - ) - .await - .map_err(crate::anyhow_to_status)?; - - // Adapt requests by identifying instances that carry endpoint configuration. - // Verify they remain compatible with our started container, and then unseal their config. - // Or if they're not compatible, then map to Status::aborted(). - let request_rx = request_rx.and_then(move |mut request| { - let must_unseal = if matches!( - request, - Request { spec: Some(_), .. } - | Request { - validate: Some(_), - .. - } - | Request { open: Some(_), .. } - ) { - Some(image.clone()) // Outer closure owns `image`. - } else { - None - }; - - async move { - if let Some(expect_image) = must_unseal { - let (endpoint, config_json) = - extract_endpoint(&mut request).map_err(crate::anyhow_to_status)?; - - let sealed_config = match endpoint { - models::DeriveUsing::Connector(models::ConnectorConfig { - image: this_image, - config, - }) if expect_image == this_image => config, - - _ => return Err(tonic::Status::aborted("connector image has changed")), - }; - - *config_json = unseal::decrypt_sops(&sealed_config) - .await - .map_err(crate::anyhow_to_status)? - .to_string(); - } - - Ok(request) - } - }); - - let (request_rx, error_rx) = eof_on_error(request_rx); - - // Start a derive RPC. - let container_response = proto_grpc::derive::connector_client::ConnectorClient::new(channel) - .derive(request_rx) - .await?; - let response_rx = container_response.into_inner(); - - // Adapt responses by enriching the first Response with the image Container. - let mut container = Some(container); - let response_rx = response_rx.and_then(move |mut response| { - _ = &guard; // Move so it's retained while responses are still being read. - - if container.is_some() { - response - .set_internal(&mut bytes::BytesMut::new(), |internal| { - internal.container = container.take(); - }) - .unwrap(); - } - futures::future::ready(Ok(response)) - }); + ops::TaskType::Derivation, + unseal, + ); + tokio::spawn(async move { connector.run().await }); - Ok(inject_error(response_rx, error_rx)) + response_rx } diff --git a/crates/runtime/src/derive/mod.rs b/crates/runtime/src/derive/mod.rs index cc8afb6749..8001bb0f35 100644 --- a/crates/runtime/src/derive/mod.rs +++ b/crates/runtime/src/derive/mod.rs @@ -92,20 +92,18 @@ where response_rx.boxed() } - models::DeriveUsing::Connector(models::ConnectorConfig { image, .. }) => { + models::DeriveUsing::Connector(models::ConnectorConfig { .. }) => { // Request interceptor for stateful RocksDB storage. let (request_rx, rocks_back) = rocksdb::adapt_requests(&peek_request, request_rx) .map_err(crate::anyhow_to_status)?; // Invoke the underlying image connector. - let response_rx = image::image_connector( - image, + let response_rx = image::connector( self.log_handler, - self.container_network, + &self.container_network, request_rx, &self.task_name, - ) - .await?; + ); // Response interceptor for stateful RocksDB storage. let response_rx = rocks_back.adapt_responses(response_rx); diff --git a/crates/runtime/src/image_connector.rs b/crates/runtime/src/image_connector.rs new file mode 100644 index 0000000000..d2e5c25656 --- /dev/null +++ b/crates/runtime/src/image_connector.rs @@ -0,0 +1,315 @@ +use super::container; +use futures::future::BoxFuture; +use futures::SinkExt; +use futures::{channel::mpsc, Stream, StreamExt}; +use proto_flow::ops; + +/// Container is a description of a running Container instance. +pub use proto_flow::runtime::Container; + +/// Unsealed is a container context that's ready to spawn. +pub struct Unsealed { + /// Image to run. + pub image: String, + /// Log-level of the container, if known. + pub log_level: Option, + /// First request of the connector stream. + pub request: Request, +} + +/// UnsealFuture is the response type of a function that unseals Requests. +pub type UnsealFuture = BoxFuture<'static, anyhow::Result>>; + +/// StartRpcFuture is the response type of a function that starts a connector RPC. +pub type StartRpcFuture = + BoxFuture<'static, tonic::Result>>>; + +/// Connector manages the lifecycle of delegate containers in the broader +/// context of a longer-lived connectors RPC stream. +/// +/// * Request: The RPC Request type. +/// * Response: The RPC Response type. +/// * Requests: A Stream of Request. +/// * Unseal: Attempt to Unseal a Request, returning Ok with a future that +/// resolves the Unsealed Result or, if the Request does not unseal, +/// then an Error with the unmodified Request. +/// * StartRpc: Start an RPC stream with the container channel. +/// * Attach: Attach a Container description to the first Response +/// of each delegate container lifecycle. +pub struct Connector +where + Requests: Stream> + Send + Unpin + 'static, + Unseal: Fn(Request) -> Result, Request>, + StartRpc: Fn(tonic::transport::Channel, mpsc::Receiver) -> StartRpcFuture, + Attach: Fn(&mut Response, Container), + L: Fn(&ops::Log) + Clone + Send + Sync + 'static, +{ + attach_container: Attach, + container: Option, + log_handler: L, + network: String, + request_rx: Requests, + response_tx: mpsc::Sender>, + start_rpc: StartRpc, + state: State, + task_name: String, + task_type: ops::TaskType, + unseal: Unseal, +} + +enum State { + // We're ready to start a container. + Idle, + // Container has an active bidirectional stream. + Running { + container_rx: tonic::Streaming, + container_tx: mpsc::Sender, + guard: container::Guard, + }, + // We must restart a new container. We've sent the current one EOF, + // and are waiting to see its EOF before we begin a new instance. + Restarting { + container_rx: tonic::Streaming, + _guard: container::Guard, + unseal: UnsealFuture, + }, + // Requests reach EOF. We've sent EOF into the container and are + // draining its final responses. + Draining { + container_rx: tonic::Streaming, + _guard: container::Guard, + }, +} + +impl + Connector +where + Request: serde::Serialize, + Requests: Stream> + Send + Unpin + 'static, + Unseal: Fn(Request) -> Result, Request>, + StartRpc: Fn(tonic::transport::Channel, mpsc::Receiver) -> StartRpcFuture, + Attach: Fn(&mut Response, Container), + L: Fn(&ops::Log) + Clone + Send + Sync + 'static, +{ + pub fn new( + attach_container: Attach, + log_handler: L, + network: &str, + request_rx: Requests, + start_rpc: StartRpc, + task_name: &str, + task_type: ops::TaskType, + unseal: Unseal, + ) -> (Self, mpsc::Receiver>) { + let (response_tx, response_rx) = mpsc::channel(crate::CHANNEL_BUFFER); + + ( + Self { + attach_container, + unseal, + container: None, + network: network.to_string(), + request_rx, + start_rpc, + response_tx, + state: State::::Idle, + task_name: task_name.to_string(), + log_handler, + task_type, + }, + response_rx, + ) + } + + /// Run the Connector until it's complete. + pub async fn run(mut self) { + loop { + // Select over the next request or the next container response. + // We use Result as a semantic "either" type. + let rx = match &mut self.state { + // We're only reading requests. + State::Idle => Err(self.request_rx.next().await), + // We're reading requests and container responses. + State::Running { container_rx, .. } => tokio::select! { + rx = container_rx.next() => Ok(rx), + rx = self.request_rx.next() => Err(rx), + }, + // We're only reading container responses. + State::Restarting { container_rx, .. } | State::Draining { container_rx, .. } => { + Ok(container_rx.next().await) + } + }; + + if !match rx { + Ok(Some(Ok(rx))) => self.container_rx(rx).await, + Ok(Some(Err(status))) => self.on_error(status).await, + Ok(None) => self.container_eof().await, + Err(Some(Ok(rx))) => self.request_rx(rx).await, + Err(Some(Err(status))) => self.on_error(status).await, + Err(None) => self.request_eof().await, + } { + break; + } + } + } + + async fn spawn_container( + &mut self, + Unsealed { + image, + log_level, + request, + }: Unsealed, + ) -> bool { + assert!(matches!(&self.state, State::Idle)); + + let (mut container_tx, container_rx) = mpsc::channel(crate::CHANNEL_BUFFER); + () = container_tx + .try_send(request) + .expect("can always send first request into buffered channel"); + + let started = container::start( + &image, + self.log_handler.clone(), + log_level, + &self.network, + &self.task_name, + self.task_type, + ) + .await; + + let (container, channel, guard) = match started { + Ok(ok) => ok, + Err(err) => return self.on_error(crate::anyhow_to_status(err)).await, + }; + + let container_rx = match (self.start_rpc)(channel, container_rx).await { + Ok(ok) => ok, + Err(status) => return self.on_error(status).await, + } + .into_inner(); + + self.state = State::Running { + container_rx, + container_tx, + guard, + }; + self.container = Some(container); + + true + } + + async fn container_rx(&mut self, mut rx: Response) -> bool { + // Attach Container to the first Response of a delegate container session. + if let Some(container) = self.container.take() { + (self.attach_container)(&mut rx, container); + } + + if let Err(send_error) = self.response_tx.send(Ok(rx)).await { + tracing::warn!(%send_error, "failed to forward container response"); + false // All done. Container is cancelled via Drop. + } else { + true + } + } + + async fn request_rx(&mut self, rx: Request) -> bool { + match (self.unseal)(rx) { + Ok(unseal) => match std::mem::replace(&mut self.state, State::Idle) { + State::Idle => match unseal.await { + Ok(unsealed) => self.spawn_container(unsealed).await, + Err(error) => self.on_error(crate::anyhow_to_status(error)).await, + }, + State::Running { + container_rx, + container_tx: _, // Send EOF. + guard, + } => { + self.state = State::Restarting { + container_rx, + _guard: guard, + unseal, + }; + true + } + State::Restarting { .. } => unreachable!("not reading requests while restarting"), + State::Draining { .. } => unreachable!("not reading requests while draining"), + }, + Err(rx) => match &mut self.state { + State::Idle => { + self.on_error(tonic::Status::invalid_argument(format!( + "invalid Request when no image container is running: {}", + serde_json::to_string(&rx).unwrap() + ))) + .await + } + State::Running { container_tx, .. } => match container_tx.send(rx).await { + Ok(()) => true, + Err(_send_error) => { + self.on_error(tonic::Status::internal( + "connector unexpectedly closed its running request stream", + )) + .await + } + }, + State::Restarting { .. } => unreachable!("not reading requests while restarting"), + State::Draining { .. } => unreachable!("not reading requests while draining"), + }, + } + } + + async fn container_eof(&mut self) -> bool { + match std::mem::replace(&mut self.state, State::Idle) { + State::Idle => unreachable!("not reading responses while idle"), + State::Running { + container_rx: _, + container_tx: _, + guard: _, + } => { + self.on_error(tonic::Status::aborted( + "connector unexpectedly closed its response stream while the request stream was still open")).await + } + State::Restarting { + container_rx: _, + _guard: _, + unseal, + } => { + // Previous delegate has completed; start the next one. + match unseal.await { + Ok(unsealed) => self.spawn_container(unsealed).await, + Err(error) => self.on_error(crate::anyhow_to_status(error)).await, + } + } + State::Draining { + container_rx: _, + _guard: _, + } => false, // `request_rx` has already EOF'd. All done. + } + } + + async fn request_eof(&mut self) -> bool { + match std::mem::replace(&mut self.state, State::Idle) { + State::Idle => false, // No running container. All done. + State::Running { + container_rx, + container_tx: _, // Send EOF. + guard, + } => { + self.state = State::Draining { + container_rx, + _guard: guard, + }; + true // Wait for EOF from container. + } + State::Restarting { .. } => unreachable!("not reading requests"), + State::Draining { .. } => unreachable!("not reading requests"), + } + } + + async fn on_error(&mut self, status: tonic::Status) -> bool { + if let Err(send_error) = self.response_tx.send(Err(status.clone())).await { + tracing::warn!(%status, %send_error, "encountered terminal error but response stream is cancelled"); + } + false // All done. If a container is running, it's cancelled via Drop. + } +} diff --git a/crates/runtime/src/lib.rs b/crates/runtime/src/lib.rs index 9c68c68029..5083fcf114 100644 --- a/crates/runtime/src/lib.rs +++ b/crates/runtime/src/lib.rs @@ -1,10 +1,9 @@ -use futures::{channel::oneshot, Stream, StreamExt}; -use proto_flow::ops; use std::sync::Arc; mod capture; mod container; mod derive; +mod image_connector; mod materialize; mod task_service; mod tokio_context; @@ -30,7 +29,7 @@ where { container_network: String, log_handler: L, - set_log_level: Option>, + set_log_level: Option>, task_name: String, } @@ -46,7 +45,7 @@ where pub fn new( container_network: String, log_handler: L, - set_log_level: Option>, + set_log_level: Option>, task_name: String, ) -> Self { Self { @@ -78,92 +77,4 @@ where } } -/// Adapt a Stream> into Stream by mapping the first Error into stream EOF. -/// The Error instance is passed through the returned oneshot Receiver. -fn eof_on_error(stream: S) -> (impl Stream, oneshot::Receiver) -where - S: futures::stream::Stream> + Send + 'static, - Error: std::fmt::Debug, -{ - let (error_tx, error_rx) = oneshot::channel(); - - let stream = stream.scan(error_tx, |error_tx, item| { - futures::future::ready(match item { - Ok(ok) => Some(ok), - Err(error) => { - // Replace because send() consumes `error_tx`. - if let Err(error) = std::mem::replace(error_tx, oneshot::channel().0).send(error) { - tracing::warn!( - ?error, - "request error but the response stream has already closed" - ) - } - None // End of stream. - } - }) - }); - - (stream, error_rx) -} - -/// Adapt a Stream> by monitoring a provided oneshot Receiver and, -/// should it ever resolve, injecting its resolved Error into the adapted Stream. -fn inject_error( - stream: S, - error_rx: oneshot::Receiver, -) -> impl Stream> -where - S: futures::stream::Stream> + Send + 'static + Unpin, -{ - let error_rx = futures::stream::unfold(Some(error_rx), |error_rx| async move { - let Some(error_rx) = error_rx else { return None }; - - match error_rx.await { - Ok(error) => Some((Err(error), None)), - Err(_cancelled) => None, - } - }); - futures::stream::select(stream, error_rx) -} - -#[cfg(test)] -mod test { - use super::*; - - #[tokio::test] - async fn test_error_pass_through() { - // Case 1: A stream produces some values and then fails. - let (stream, err_rx) = eof_on_error(futures::stream::iter(vec![ - Ok(1), - Ok(2), - Ok(3), - Err(99), - Ok(100), - ])); - // We see all values prior to failure. - assert_eq!(stream.collect::>().await, vec![1, 2, 3]); - // We see the error after injecting into an empty stream. - assert_eq!( - inject_error(futures::stream::empty::>(), err_rx) - .collect::>() - .await, - vec![Err(99)] - ); - - // Case 2: A stream produces values and EOF's without failure. - let (stream, err_rx) = eof_on_error(futures::stream::iter(vec![ - Result::<_, i32>::Ok(1), - Ok(2), - Ok(3), - ])); - // We see all values. - assert_eq!(stream.collect::>().await, vec![1, 2, 3]); - // We see a clean EOF of our injected stream. - assert_eq!( - inject_error(futures::stream::iter(vec![Ok(4), Ok(5)]), err_rx) - .collect::>() - .await, - vec![Ok(4), Ok(5)] - ); - } -} +const CHANNEL_BUFFER: usize = 8; diff --git a/crates/runtime/src/materialize/image.rs b/crates/runtime/src/materialize/image.rs index a6bb82934f..ea21a7dd01 100644 --- a/crates/runtime/src/materialize/image.rs +++ b/crates/runtime/src/materialize/image.rs @@ -1,96 +1,97 @@ use super::extract_endpoint; -use crate::{container, eof_on_error, inject_error, unseal}; -use futures::{Stream, TryStreamExt}; -use proto_flow::materialize::{Request, Response}; -use proto_flow::ops; +use crate::{ + image_connector::{Connector, Container, StartRpcFuture, UnsealFuture, Unsealed}, + unseal, +}; +use futures::{channel::mpsc, FutureExt, Stream}; +use proto_flow::{ + materialize::{Request, Response}, + runtime::MaterializeRequestExt, +}; -pub async fn image_connector( - image: String, +fn unseal(mut request: Request) -> Result, Request> { + if !matches!( + request, + Request { spec: Some(_), .. } + | Request { + validate: Some(_), + .. + } + | Request { apply: Some(_), .. } + | Request { open: Some(_), .. } + ) { + return Err(request); // Not an unseal-able request. + }; + + Ok(async move { + let (endpoint, config_json) = extract_endpoint(&mut request)?; + + let models::MaterializationEndpoint::Connector(models::ConnectorConfig { + image, + config: sealed_config, + }) = endpoint else { + anyhow::bail!("task connector type has changed and is no longer an image") + }; + *config_json = unseal::decrypt_sops(&sealed_config).await?.to_string(); + + let log_level = match request.get_internal() { + Some(Ok(MaterializeRequestExt { + labels: Some(labels), + .. + })) => Some(labels.log_level()), + _ => None, + }; + + Ok(Unsealed { + image, + log_level, + request, + }) + } + .boxed()) +} + +fn start_rpc( + channel: tonic::transport::Channel, + rx: mpsc::Receiver, +) -> StartRpcFuture { + async move { + proto_grpc::materialize::connector_client::ConnectorClient::new(channel) + .materialize(rx) + .await + } + .boxed() +} + +fn attach_container(response: &mut Response, container: Container) { + response + .set_internal(&mut bytes::BytesMut::new(), |internal| { + internal.container = Some(container); + }) + .unwrap(); +} + +pub fn connector( log_handler: L, - network: String, + network: &str, request_rx: R, task_name: &str, -) -> tonic::Result>> +) -> mpsc::Receiver> where - L: Fn(&ops::Log) + Send + Sync + 'static, + L: Fn(&ops::Log) + Clone + Send + Sync + 'static, R: Stream> + Send + Unpin + 'static, { - let (container, channel, guard) = container::start( - &image, + let (connector, response_rx) = Connector::new( + attach_container, log_handler, - &network, + network, + request_rx, + start_rpc, task_name, ops::TaskType::Materialization, - ) - .await - .map_err(crate::anyhow_to_status)?; - - // Adapt requests by identifying instances that carry endpoint configuration. - // Verify they remain compatible with our started container, and then unseal their config. - // Or if they're not compatible, then map to Status::aborted(). - let request_rx = request_rx.and_then(move |mut request| { - let must_unseal = if matches!( - request, - Request { spec: Some(_), .. } - | Request { - validate: Some(_), - .. - } - | Request { apply: Some(_), .. } - | Request { open: Some(_), .. } - ) { - Some(image.clone()) // Outer closure owns `image`. - } else { - None - }; - - async move { - if let Some(expect_image) = must_unseal { - let (endpoint, config_json) = - extract_endpoint(&mut request).map_err(crate::anyhow_to_status)?; - - let sealed_config = match endpoint { - models::MaterializationEndpoint::Connector(models::ConnectorConfig { - image: this_image, - config, - }) if expect_image == this_image => config, - - _ => return Err(tonic::Status::aborted("connector image has changed")), - }; - - *config_json = unseal::decrypt_sops(&sealed_config) - .await - .map_err(crate::anyhow_to_status)? - .to_string(); - } - - Ok(request) - } - }); - - let (request_rx, error_rx) = eof_on_error(request_rx); - - // Start a materialize RPC. - let container_response = - proto_grpc::materialize::connector_client::ConnectorClient::new(channel) - .materialize(request_rx) - .await?; - let response_rx = container_response.into_inner(); - - // Adapt responses by enriching the first Response with the image Container. - let mut container = Some(container); - let response_rx = response_rx.and_then(move |mut response| { - _ = &guard; // Move so it's retained while responses are still being read. - - if container.is_some() { - response - .set_internal(&mut bytes::BytesMut::new(), |internal| { - internal.container = container.take(); - }) - .unwrap(); - } - futures::future::ready(Ok(response)) - }); + unseal, + ); + tokio::spawn(async move { connector.run().await }); - Ok(inject_error(response_rx, error_rx)) + response_rx } diff --git a/crates/runtime/src/materialize/mod.rs b/crates/runtime/src/materialize/mod.rs index ed4225025d..e057388dcd 100644 --- a/crates/runtime/src/materialize/mod.rs +++ b/crates/runtime/src/materialize/mod.rs @@ -89,17 +89,15 @@ where let request_rx = adjust_log_level(request_rx, self.set_log_level); let response_rx = match endpoint { - models::MaterializationEndpoint::Connector(models::ConnectorConfig { - image, .. - }) => image::image_connector( - image, - self.log_handler, - self.container_network, - request_rx, - &self.task_name, - ) - .await? - .boxed(), + models::MaterializationEndpoint::Connector(models::ConnectorConfig { .. }) => { + image::connector( + self.log_handler, + &self.container_network, + request_rx, + &self.task_name, + ) + .boxed() + } models::MaterializationEndpoint::Sqlite(_) => { return Err(tonic::Status::invalid_argument(