Skip to content

Commit

Permalink
runtime: rework image connector
Browse files Browse the repository at this point in the history
  • Loading branch information
jgraettinger committed Aug 31, 2023
1 parent e4a9c10 commit 15e9918
Show file tree
Hide file tree
Showing 9 changed files with 649 additions and 367 deletions.
217 changes: 137 additions & 80 deletions crates/runtime/src/capture/image.rs
Original file line number Diff line number Diff line change
@@ -1,99 +1,156 @@
use super::extract_endpoint;
use crate::{container, eof_on_error, inject_error, unseal};
use futures::{Stream, TryStreamExt};
use proto_flow::capture::{Request, Response};
use proto_flow::ops;
use crate::{
image_connector::{Connector, Container, StartRpcFuture, UnsealFuture, Unsealed},
unseal,
};
use futures::{channel::mpsc, FutureExt, Stream};
use proto_flow::{
capture::{Request, Response},
runtime::CaptureRequestExt,
};

pub async fn image_connector<L, R>(
image: String,
fn unseal(mut request: Request) -> Result<UnsealFuture<Request>, Request> {
if !matches!(
request,
Request { spec: Some(_), .. }
| Request {
discover: Some(_),
..
}
| Request {
validate: Some(_),
..
}
| Request { apply: Some(_), .. }
| Request { open: Some(_), .. }
) {
return Err(request); // Not an unseal-able request.
};

Ok(async move {
let (endpoint, config_json) = extract_endpoint(&mut request)?;

let models::CaptureEndpoint::Connector(models::ConnectorConfig {
image,
config: sealed_config,
}) = endpoint;
/* else { anyhow::bail!("task connector type has changed and is no longer an image") }; */

*config_json = unseal::decrypt_sops(&sealed_config).await?.to_string();

let log_level = match request.get_internal() {
Some(Ok(CaptureRequestExt {
labels: Some(labels),
..
})) => Some(labels.log_level()),
_ => None,
};

Ok(Unsealed {
image,
log_level,
request,
})
}
.boxed())
}

fn start_rpc(
channel: tonic::transport::Channel,
rx: mpsc::Receiver<Request>,
) -> StartRpcFuture<Response> {
async move {
proto_grpc::capture::connector_client::ConnectorClient::new(channel)
.capture(rx)
.await
}
.boxed()
}

fn attach_container(response: &mut Response, container: Container) {
response
.set_internal(&mut bytes::BytesMut::new(), |internal| {
internal.container = Some(container);
})
.unwrap();
}

pub fn connector<L, R>(
log_handler: L,
network: String,
network: &str,
request_rx: R,
task_name: &str,
) -> tonic::Result<impl Stream<Item = tonic::Result<Response>>>
) -> mpsc::Receiver<tonic::Result<Response>>
where
L: Fn(&ops::Log) + Send + Sync + 'static,
L: Fn(&ops::Log) + Clone + Send + Sync + 'static,
R: Stream<Item = tonic::Result<Request>> + Send + Unpin + 'static,
{
let (container, channel, guard) = container::start(
&image,
let (connector, response_rx) = Connector::new(
attach_container,
log_handler,
&network,
network,
request_rx,
start_rpc,
task_name,
ops::TaskType::Capture,
)
.await
.map_err(crate::anyhow_to_status)?;

// Adapt requests by identifying instances that carry endpoint configuration.
// Verify they remain compatible with our started container, and then unseal their config.
// Or if they're not compatible, then map to Status::aborted().
let request_rx = request_rx.and_then(move |mut request| {
let must_unseal = if matches!(
request,
Request { spec: Some(_), .. }
| Request {
discover: Some(_),
..
}
| Request {
validate: Some(_),
..
}
| Request { apply: Some(_), .. }
| Request { open: Some(_), .. }
) {
Some(image.clone()) // Outer closure owns `image`.
} else {
None
};
unseal,
);
tokio::spawn(async move { connector.run().await });

async move {
if let Some(expect_image) = must_unseal {
let (endpoint, config_json) =
extract_endpoint(&mut request).map_err(crate::anyhow_to_status)?;
response_rx
}

let sealed_config = match endpoint {
models::CaptureEndpoint::Connector(models::ConnectorConfig {
image: this_image,
config,
}) if expect_image == this_image => config,
#[cfg(test)]
mod test {
use super::connector;
use futures::StreamExt;
use serde_json::json;

_ => return Err(tonic::Status::aborted("connector image has changed")),
};
#[tokio::test]
async fn test_http_ingest_spec() {
if let Err(_) = locate_bin::locate("flow-connector-init") {
// Skip if `flow-connector-init` isn't available (yet). We're probably on CI.
// This test is useful as a sanity check for local development
// and we have plenty of other coverage during CI.
return;
}

*config_json = unseal::decrypt_sops(&sealed_config)
.await
.map_err(crate::anyhow_to_status)?
.to_string();
let request_rx = futures::stream::repeat(Ok(serde_json::from_value(json!({
"spec": {
"connectorType": "IMAGE",
"config": {
"image": "ghcr.io/estuary/source-http-ingest:dev",
"config": {},
}
}
}))
.unwrap()));

Ok(request)
}
});

let (request_rx, error_rx) = eof_on_error(request_rx);

// Start a capture RPC.
let container_response = proto_grpc::capture::connector_client::ConnectorClient::new(channel)
.capture(request_rx)
.await?;
let response_rx = container_response.into_inner();

// Adapt responses by enriching the first Response with the image Container.
let mut container = Some(container);
let response_rx = response_rx.and_then(move |mut response| {
_ = &guard; // Move so it's retained while responses are still being read.

if container.is_some() {
response
.set_internal(&mut bytes::BytesMut::new(), |internal| {
internal.container = container.take();
})
.unwrap();
}
futures::future::ready(Ok(response))
});
let response_rx = connector(ops::tracing_log_handler, "", request_rx.take(2), "a-task");

let responses: Vec<_> = response_rx.collect().await;
assert_eq!(responses.len(), 2);

for resp in responses {
let resp = resp.unwrap();

Ok(inject_error(response_rx, error_rx))
assert!(resp.spec.is_some());

let container = resp
.get_internal()
.expect("has internal field")
.expect("internal decodes")
.container
.expect("internal has attached container");

assert_eq!(
container.network_ports,
[proto_flow::flow::NetworkPort {
number: 8080,
protocol: String::new(),
public: true
}]
);
}
}
}
18 changes: 7 additions & 11 deletions crates/runtime/src/capture/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,17 +81,13 @@ where
let request_rx = adjust_log_level(request_rx, self.set_log_level);

let response_rx = match endpoint {
models::CaptureEndpoint::Connector(models::ConnectorConfig { image, .. }) => {
image::image_connector(
image,
self.log_handler,
self.container_network,
request_rx,
&self.task_name,
)
.await?
.boxed()
}
models::CaptureEndpoint::Connector(models::ConnectorConfig { .. }) => image::connector(
self.log_handler,
&self.container_network,
request_rx,
&self.task_name,
)
.boxed(),
};

Ok(response_rx)
Expand Down
6 changes: 5 additions & 1 deletion crates/runtime/src/container.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use anyhow::Context;
use futures::channel::oneshot;
use proto_flow::{flow, ops, runtime};
use proto_flow::{flow, runtime};
use tokio::io::AsyncBufReadExt;

// Port on which flow-connector-init listens for requests.
Expand All @@ -16,6 +16,7 @@ const CONNECTOR_INIT_PORT: u16 = 49092;
pub async fn start<L>(
image: &str,
log_handler: L,
log_level: Option<ops::LogLevel>,
network: &str,
task_name: &str,
task_type: ops::TaskType,
Expand Down Expand Up @@ -68,6 +69,7 @@ where

// This is default `docker run` behavior if --network is not provided.
let network = if network == "" { "bridge" } else { network };
let log_level = log_level.unwrap_or(ops::LogLevel::Warn);

let mut process: async_process::Child = async_process::Command::new("docker")
.args([
Expand All @@ -92,6 +94,7 @@ where
),
// Thread-through the logging configuration of the connector.
"--env=LOG_FORMAT=json".to_string(),
format!("--env=LOG_LEVEL={}", log_level.as_str_name()),
// Cgroup memory / CPU resource limits.
// TODO(johnny): we intend to tighten these down further, over time.
"--memory=1g".to_string(),
Expand Down Expand Up @@ -343,6 +346,7 @@ mod test {
let (container, channel, _guard) = start(
"ghcr.io/estuary/source-http-ingest:dev",
ops::tracing_log_handler,
Some(ops::LogLevel::Debug),
"",
"a-task-name",
proto_flow::ops::TaskType::Capture,
Expand Down
Loading

0 comments on commit 15e9918

Please sign in to comment.