Skip to content

Commit

Permalink
flowctl: support docker desktop and missing flow-connector-init
Browse files Browse the repository at this point in the history
When running on non-linux OS's, ask docker to publish all ports
(including the init port), then extract and pass around the mapping
of container ports to mapped host addresses.

Also fix a flow-connector-init startup race where it writes to stderr
before its port has been bound.
  • Loading branch information
jgraettinger committed Sep 12, 2023
1 parent 167b40a commit 30e150d
Show file tree
Hide file tree
Showing 7 changed files with 455 additions and 162 deletions.
25 changes: 19 additions & 6 deletions crates/connector-init/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use anyhow::Context;
pub use codec::Codec;
use std::io::Write;
use tokio::signal::unix;
use tonic::transport::server::TcpIncoming;
use anyhow::anyhow;

mod capture;
mod codec;
Expand All @@ -26,8 +26,24 @@ pub struct Args {
pub port: u16,
}

pub async fn run(args: Args) -> anyhow::Result<()> {
let image = inspect::Image::parse_from_json_file(&args.image_inspect_json_path)
pub async fn run(
Args {
image_inspect_json_path,
port,
}: Args,
log_level: String,
) -> anyhow::Result<()> {
// Bind our port before we do anything else.
let addr = format!("0.0.0.0:{}", port).parse().unwrap();
let incoming = TcpIncoming::new(addr, true, None)
.map_err(|e| anyhow::anyhow!("tcp incoming error {}", e))?;

// Now write a byte to stderr to let our container host know that we're alive.
// Whitespace avoids interfering with JSON logs that also write to stderr.
std::io::stderr().write(" ".as_bytes()).unwrap();
tracing::info!(%log_level, port, message = "connector-init started");

let image = inspect::Image::parse_from_json_file(&image_inspect_json_path)
.context("reading image inspect JSON")?;
let entrypoint = image.get_argv()?;

Expand All @@ -36,9 +52,6 @@ pub async fn run(args: Args) -> anyhow::Result<()> {
_ => Codec::Proto,
};

let addr = format!("0.0.0.0:{}", args.port).parse().unwrap();
let incoming = TcpIncoming::new(addr, true, None).map_err(|e| anyhow!("tcp incoming error {}", e))?;

check_protocol(&entrypoint, codec).await?;

let capture = proto_grpc::capture::connector_server::ConnectorServer::new(capture::Proxy {
Expand Down
8 changes: 1 addition & 7 deletions crates/connector-init/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,7 @@
use clap::Parser;
use std::io::Write;
use tracing_subscriber::prelude::*;

fn main() {
// Write a byte to stderr to let our container host know that we're alive.
// Whitespace avoids interfering with JSON logs that also write to stderr.
std::io::stderr().write(" ".as_bytes()).unwrap();

let args = connector_init::Args::parse();

// Map the LOG_LEVEL variable to an equivalent tracing EnvFilter.
Expand Down Expand Up @@ -40,8 +35,7 @@ fn main() {
};

// Run until signaled, then gracefully stop.
tracing::info!(%log_level, port=args.port, message = "connector-init started");
let result = runtime.block_on(connector_init::run(args));
let result = runtime.block_on(connector_init::run(args, log_level));

// Explicitly call Runtime::shutdown_background as an alternative to calling Runtime::Drop.
// This shuts down the runtime without waiting for blocking background tasks to complete,
Expand Down
12 changes: 12 additions & 0 deletions crates/proto-flow/src/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,10 +128,22 @@ pub struct RocksDbDescriptor {
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct Container {
/// IP Address of the running container.
/// If this IP is accessible (it may not be, in contexts like Docker Desktop for Mac),
/// then it is *only* accessible from the hosting server.
#[prost(string, tag = "1")]
pub ip_addr: ::prost::alloc::string::String,
/// Network ports which are available for this container.
#[prost(message, repeated, tag = "2")]
pub network_ports: ::prost::alloc::vec::Vec<super::flow::NetworkPort>,
/// Mapping of ports from `network_ports` to a corresponding "host-ip:port" address,
/// as either IPv4 or IPv6, through which the port can be accessed. If empty,
/// then the container `ip_addr` should be used directly.
#[prost(btree_map = "uint32, string", tag = "3")]
pub mapped_host_ports: ::prost::alloc::collections::BTreeMap<
u32,
::prost::alloc::string::String,
>,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
Expand Down
Loading

0 comments on commit 30e150d

Please sign in to comment.