diff --git a/.github/workflows/derive-typescript.yaml b/.github/workflows/derive-typescript.yaml index 278cc9c8f3..ed61ed4276 100644 --- a/.github/workflows/derive-typescript.yaml +++ b/.github/workflows/derive-typescript.yaml @@ -49,6 +49,5 @@ jobs: run: docker build -t ghcr.io/estuary/derive-typescript:dev crates/derive-typescript/ - name: push docker image - # TODO(johnny): Uncomment once merged. - # if: ${{ github.ref == 'refs/heads/master' }} + if: ${{ github.ref == 'refs/heads/master' }} run: docker push ghcr.io/estuary/derive-typescript:dev diff --git a/crates/derive-typescript/Dockerfile b/crates/derive-typescript/Dockerfile index 2fc53597fc..a9dea4231a 100644 --- a/crates/derive-typescript/Dockerfile +++ b/crates/derive-typescript/Dockerfile @@ -1,4 +1,4 @@ -FROM denoland/deno:distroless-1.36.3 +FROM denoland/deno:distroless-1.39.4 COPY target/x86_64-unknown-linux-musl/release/derive-typescript / diff --git a/crates/flowctl/src/connector.rs b/crates/flowctl/src/connector.rs deleted file mode 100644 index 6c99da2489..0000000000 --- a/crates/flowctl/src/connector.rs +++ /dev/null @@ -1,124 +0,0 @@ -use anyhow::anyhow; -use anyhow::Context; -use futures::{stream, Stream, TryStream}; -use proto_flow::capture::{Request, Response}; -use proto_grpc::capture::connector_client::ConnectorClient; -use std::{ - fs, - pin::Pin, - process::{Child, Command, Output}, -}; -use tempfile::{tempdir, TempDir}; - -fn pull(image: &str) -> anyhow::Result { - Command::new("docker") - .args(["pull", image]) - .output() - .map_err(|e| e.into()) -} - -fn inspect(image: &str) -> anyhow::Result { - Command::new("docker") - .args(["inspect", image]) - .output() - .map_err(|e| e.into()) -} - -const CONNECTOR_INIT_PORT: u16 = 49092; - -pub fn docker_spawn(image: &str, args: &[&str], network: &str) -> anyhow::Result<(Child, TempDir, u16)> { - pull(image).context(format!("pulling {image}"))?; - - let inspect_output = inspect(image).context(format!("inspecting {image}"))?; - - let target_inspect = "/tmp/image-inspect.json"; - let dir = tempdir().context("creating temp directory")?; - let host_inspect = dir.path().join("image-inspect.json"); - let host_inspect_str = host_inspect.clone().into_os_string().into_string().unwrap(); - - fs::write(&host_inspect, inspect_output.stdout)?; - let host_connector_init = - locate_bin::locate("flow-connector-init").context("locating flow-connector-init")?; - let host_connector_init_str = host_connector_init.into_os_string().into_string().unwrap(); - let target_connector_init = "/tmp/connector_init"; - - let port = portpicker::pick_unused_port().expect("No ports free"); - - let child = Command::new("docker") - .args( - [ - &[ - "run", - "--rm", - "--entrypoint", - target_connector_init, - "--mount", - &format!( - "type=bind,source={host_connector_init_str},target={target_connector_init}" - ), - "--mount", - &format!("type=bind,source={host_inspect_str},target={target_inspect}"), - "--network", - network, - "--publish", - &format!("0.0.0.0:{}:{}/tcp", port, CONNECTOR_INIT_PORT), - image, - &format!("--image-inspect-json-path={target_inspect}"), - &format!("--port={CONNECTOR_INIT_PORT}"), - ], - args, - ] - .concat(), - ) - .spawn() - .context("spawning docker run child")?; - - Ok((child, dir, port)) -} - -async fn connector_client(port: u16) -> anyhow::Result> { - loop { - match ConnectorClient::connect(format!("tcp://127.0.0.1:{port}")).await { - Ok(client) => return Ok(client), - Err(_) => { - std::thread::sleep(std::time::Duration::from_millis(1000)); - continue; - } - }; - } -} - -pub async fn docker_run(image: &str, network: &str, req: Request) -> anyhow::Result { - let (_child, _dir, port) = docker_spawn(image, &[], network)?; - - let mut client = connector_client(port).await?; - - let mut response_stream = client.capture(stream::once(async { req })).await?; - - let response = response_stream.get_mut().message().await?; - - return response.ok_or(anyhow!("no response message")); -} - -pub async fn docker_run_stream( - image: &str, - network: &str, - stream: Pin + Send + Sync>>, -) -> anyhow::Result< - Pin, Ok = Response, Error = anyhow::Error>>>, -> { - let (_child, _dir, port) = docker_spawn(image, &[], network)?; - let mut client = connector_client(port).await?; - let response_stream = client.capture(stream).await?; - - Ok(Box::pin(stream::try_unfold( - response_stream, - |mut rs| async move { - if let Some(msg) = rs.get_mut().message().await? { - Ok(Some((msg, rs))) - } else { - Ok(None) - } - }, - ))) -} diff --git a/crates/runtime/src/container.rs b/crates/runtime/src/container.rs index 66212e7fb5..8f9c3876e4 100644 --- a/crates/runtime/src/container.rs +++ b/crates/runtime/src/container.rs @@ -465,7 +465,7 @@ async fn inspect_image_and_copy( } // TODO(johnny): Consider better packaging and versioning of `flow-connector-init`. -const CONNECTOR_INIT_IMAGE: &str = "ghcr.io/estuary/flow:v0.3.9-12-g1bf50ba62"; +const CONNECTOR_INIT_IMAGE: &str = "ghcr.io/estuary/flow:v0.3.11-60-gfc3f40ac5"; const CONNECTOR_INIT_IMAGE_PATH: &str = "/usr/local/bin/flow-connector-init"; #[cfg(test)]