diff --git a/Cargo.lock b/Cargo.lock index f39624b317..d6ca586d29 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1803,6 +1803,34 @@ version = "2.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e8566979429cf69b49a5c740c60791108e86440e8be149bbea4fe54d2c32d6e2" +[[package]] +name = "data-plane-controller" +version = "0.0.0" +dependencies = [ + "anyhow", + "async-process", + "automations", + "chrono", + "clap 4.5.17", + "futures", + "humantime", + "humantime-serde", + "ipnetwork", + "itertools 0.10.5", + "models", + "ops", + "rustls 0.23.10", + "serde", + "serde_json", + "serde_yaml", + "sqlx", + "tempfile", + "tokio", + "tracing", + "tracing-subscriber", + "url", +] + [[package]] name = "dbl" version = "0.3.2" @@ -3194,6 +3222,15 @@ version = "2.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8f518f335dce6725a761382244631d86cf0ccb2863413590b31338feb467f9c3" +[[package]] +name = "ipnetwork" +version = "0.19.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1f84f1612606f3753f205a4e9a2efd6fe5b4c573a6269b2cc6c3003d44a0d127" +dependencies = [ + "serde", +] + [[package]] name = "iri-string" version = "0.6.0" @@ -6037,6 +6074,7 @@ dependencies = [ "hkdf", "hmac", "indexmap 1.9.3", + "ipnetwork", "itoa", "libc", "log", diff --git a/Cargo.toml b/Cargo.toml index c1b96f0b35..b563fd5cf6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -66,6 +66,7 @@ humantime = "2.1" humantime-serde = "1.1" itertools = "0.10" indexmap = { version = "1.8", features = ["serde"] } +ipnetwork = { version = "0.19", features = ["serde"] } iri-string = "0.6.0" jemallocator = { version = "0.3", features = ["profiling"] } jemalloc-ctl = "0.3" @@ -141,6 +142,7 @@ strum_macros = "0.24" superslice = "1.0" sqlx = { version = "0.6", features = [ "chrono", + "ipnetwork", "json", "macros", "postgres", diff --git a/crates/data-plane-controller/Cargo.toml b/crates/data-plane-controller/Cargo.toml new file mode 100644 
index 0000000000..92925b3eae --- /dev/null +++ b/crates/data-plane-controller/Cargo.toml @@ -0,0 +1,34 @@ +[package] +name = "data-plane-controller" +version.workspace = true +rust-version.workspace = true +edition.workspace = true +authors.workspace = true +homepage.workspace = true +repository.workspace = true +license.workspace = true + +[dependencies] +async-process = { path = "../async-process" } +automations = { path = "../automations" } +models = { path = "../models" } +ops = { path = "../ops" } + +anyhow = { workspace = true } +chrono = { version = "0.4", features = ["serde"] } +clap = { workspace = true } +futures = { workspace = true } +humantime = { workspace = true } +humantime-serde = { workspace = true } +ipnetwork = { workspace = true } +itertools = { workspace = true } +rustls = { workspace = true } +serde = { workspace = true } +serde_json = { workspace = true } +serde_yaml = { workspace = true } +sqlx = { workspace = true } +tempfile = { workspace = true } +tokio = { workspace = true } +tracing = { workspace = true } +tracing-subscriber = { workspace = true } +url = { workspace = true } diff --git a/crates/data-plane-controller/src/controller.rs b/crates/data-plane-controller/src/controller.rs new file mode 100644 index 0000000000..5c35d5fa13 --- /dev/null +++ b/crates/data-plane-controller/src/controller.rs @@ -0,0 +1,875 @@ +use super::{run_cmd, stack}; +use crate::repo; +use anyhow::Context; +use itertools::{EitherOrBoth, Itertools}; +use std::collections::VecDeque; + +pub struct Controller { + pub logs_tx: super::logs::Tx, + pub repo: super::repo::Repo, + pub secrets_provider: String, + pub state_backend: url::Url, +} + +#[derive(Debug, serde::Deserialize, serde::Serialize)] +#[serde(rename_all = "camelCase")] +pub enum Message { + Start(models::Id), + Disable, + Enable, + Preview, + Refresh, + Converge, + // TODO(johnny): `Destroy` variant for managed tear-down. 
+} + +#[derive(Debug, serde::Deserialize, serde::Serialize)] +pub struct State { + // DataPlane which this controller manages. + data_plane_id: models::Id, + // Git branch of the dry-dock repo for this data-plane. + deploy_branch: String, + // DateTime of the last `pulumi up` for this data-plane. + last_pulumi_up: chrono::DateTime, + // DateTime of the last `pulumi refresh` for this data-plane. + last_refresh: chrono::DateTime, + // Token to which controller logs are directed. + logs_token: sqlx::types::Uuid, + // Pulumi configuration for this data-plane. + stack: stack::PulumiStack, + // Name of the data-plane "stack" within the Pulumi tooling. + stack_name: String, + // Status of this controller. + status: Status, + + // Is this controller disabled? + // When disabled, refresh and converge operations are queued but not run. + #[serde(default, skip_serializing_if = "is_false")] + disabled: bool, + + // Is there a pending preview for this data-plane? + #[serde(default, skip_serializing_if = "is_false")] + pending_preview: bool, + // Is there a pending refresh for this data-plane? + #[serde(default, skip_serializing_if = "is_false")] + pending_refresh: bool, + // Is there a pending converge for this data-plane? + #[serde(default, skip_serializing_if = "is_false")] + pending_converge: bool, + + // When Some, updated Pulumi stack exports to be written back into the `data_planes` row. + #[serde(default, skip_serializing_if = "Option::is_none")] + publish_exports: Option, + // When true, an updated Pulumi stack model to be written back into the `data_planes` row. + #[serde(default, skip_serializing_if = "Option::is_none")] + publish_stack: Option, +} + +#[derive(Clone, Copy, Debug, serde::Deserialize, serde::Serialize)] +pub enum Status { + Idle, + /// Controller is setting the encryption key for Pulumi stack secrets. + SetEncryption, + /// Controller is previewing changes proposed by Pulumi without applying them. 
+ PulumiPreview, + /// Controller is refreshing any remotely-changed resources, + /// such as replaced EC2 instances. + PulumiRefresh, + /// Controller is creating any scaled-up cloud resources, + /// updating DNS records for resources which are scaling down, + /// and updating the Ansible inventory. + PulumiUp1, + /// Controller is awaiting DNS propagation for any replaced resources + /// as well as resources which are scaling down. + AwaitDNS1, + /// Controller is running Ansible to initialize and refresh servers. + Ansible, + /// Controller is updating DNS records for resources which have now + /// started and is destroying any scaled-down cloud resources which + /// have now stopped. + PulumiUp2, + /// Controller is awaiting DNS propagation for any scaled-up + /// resources which have now started. + AwaitDNS2, +} + +#[derive(Debug)] +pub struct Outcome { + data_plane_id: models::Id, + task_id: models::Id, + sleep: std::time::Duration, + // Status to publish into data_planes row. + status: Status, + // When Some, stack exports to publish into data_planes row. + publish_exports: Option, + // When Some, updated configuration to publish into data_planes row. 
+ publish_stack: Option, +} + +impl automations::Executor for Controller { + const TASK_TYPE: automations::TaskType = automations::TaskType(1); + + type Receive = Message; + type State = Option; + type Outcome = Outcome; + + #[tracing::instrument( + ret, + err(Debug, level = tracing::Level::ERROR), + skip_all, + fields(?task_id), + )] + async fn poll<'s>( + &'s self, + pool: &'s sqlx::PgPool, + task_id: models::Id, + _parent_id: Option, + state: &'s mut Self::State, + inbox: &'s mut VecDeque<(models::Id, Option)>, + ) -> anyhow::Result { + if state.is_none() { + self.on_start(pool, task_id, state, inbox).await?; + }; + let state = state.as_mut().unwrap(); + + let sleep = match state.status { + Status::Idle => self.on_idle(pool, task_id, state, inbox).await?, + Status::SetEncryption => self.on_set_encryption(state).await?, + Status::PulumiPreview => self.on_pulumi_preview(state).await?, + Status::PulumiRefresh => self.on_pulumi_refresh(state).await?, + Status::PulumiUp1 => self.on_pulumi_up_1(state).await?, + Status::AwaitDNS1 => self.on_await_dns_1(state).await?, + Status::Ansible => self.on_ansible(state).await?, + Status::PulumiUp2 => self.on_pulumi_up_2(state).await?, + Status::AwaitDNS2 => self.on_await_dns_2(state).await?, + }; + + // We publish an updated stack only when transitioning back to Idle. 
+ let publish_stack = if matches!(state.status, Status::Idle) { + state.publish_stack.take() + } else { + None + }; + + Ok(Outcome { + data_plane_id: state.data_plane_id, + task_id, + sleep, + status: state.status, + publish_exports: state.publish_exports.take(), + publish_stack, + }) + } +} + +impl Controller { + async fn on_start( + &self, + pool: &sqlx::PgPool, + task_id: models::Id, + state: &mut Option, + inbox: &mut VecDeque<(models::Id, Option)>, + ) -> anyhow::Result<()> { + let data_plane_id = match inbox.pop_front() { + Some((_from_id, Some(Message::Start(data_plane_id)))) => data_plane_id, + message => { + anyhow::bail!("expected 'start' message, not {message:?}"); + } + }; + *state = Some(self.fetch_row_state(pool, task_id, data_plane_id).await?); + + Ok(()) + } + + #[tracing::instrument( + skip_all, + fields(data_plane_id = ?state.data_plane_id), + )] + async fn on_idle( + &self, + pool: &sqlx::PgPool, + task_id: models::Id, + state: &mut State, + inbox: &mut VecDeque<(models::Id, Option)>, + ) -> anyhow::Result { + // Handle received messages by clearing corresponding state + // to force the explicitly-requested action. + while let Some((from_id, message)) = inbox.pop_front() { + match message { + Some(Message::Disable) => state.disabled = true, + Some(Message::Enable) => state.disabled = false, + Some(Message::Preview) => state.pending_preview = true, + Some(Message::Refresh) => state.pending_refresh = true, + Some(Message::Converge) => state.pending_converge = true, + + message => anyhow::bail!( + "received unexpected message from {from_id} while idle: {message:?}" + ), + } + } + + // Refresh configuration from the current data_planes row. + let State { + deploy_branch: next_deploy_branch, + logs_token: next_logs_token, + stack: + stack::PulumiStack { + config: next_config, + encrypted_key: next_encrypted_key, + secrets_provider: next_secrets_provider, + }, + stack_name: next_stack_name, + .. 
+ } = self + .fetch_row_state(pool, task_id, state.data_plane_id) + .await?; + + // Sanity check that variables which should not change, haven't. + if state.stack.encrypted_key != next_encrypted_key { + anyhow::bail!( + "pulumi stack encrypted key cannot change from {} to {next_encrypted_key}", + state.stack.encrypted_key, + ); + } + if state.stack.secrets_provider != next_secrets_provider { + anyhow::bail!( + "pulumi stack secrets provider cannot change from {} to {next_secrets_provider}", + state.stack.secrets_provider, + ); + } + if state.stack_name != next_stack_name { + anyhow::bail!( + "pulumi stack name cannot change from {} to {next_stack_name}", + state.stack_name + ); + } + if state.logs_token != next_logs_token { + anyhow::bail!( + "data-plane logs token cannot change from {} to {next_logs_token}", + state.logs_token + ); + } + if state.stack.config.model.gcp_project != next_config.model.gcp_project { + anyhow::bail!( + "pulumi stack gcp_project cannot change from {} to {}", + state.stack.config.model.gcp_project, + next_config.model.gcp_project, + ); + } + for (index, zipped) in state + .stack + .config + .model + .deployments + .iter() + .zip_longest(next_config.model.deployments.iter()) + .enumerate() + { + match zipped { + EitherOrBoth::Left(cur_deployment) => { + anyhow::bail!( + "cannot remove deployment {cur_deployment:?} at index {index}; scale it down with `desired` = 0 instead" + ); + } + EitherOrBoth::Right(next_deployment) => { + if next_deployment.current != 0 { + anyhow::bail!( + "new deployment {next_deployment:?} at index {index} must have `current` = 0; scale up using `desired` instead" + ); + } else if next_deployment.desired == 0 { + anyhow::bail!( + "new deployment {next_deployment:?} at index {index} must have `desired` > 0" + ); + } + } + EitherOrBoth::Both( + current @ stack::Deployment { + current: cur_current, + oci_image: cur_oci_image, + role: cur_role, + template: cur_template, + desired: _, // Allowed to change. 
+ oci_image_override: _, // Allowed to change. + }, + next @ stack::Deployment { + current: next_current, + oci_image: next_oci_image, + role: next_role, + template: next_template, + desired: _, // Allowed to change. + oci_image_override: _, // Allowed to change. + }, + ) => { + if cur_current != next_current + || cur_oci_image != next_oci_image + || cur_role != next_role + || cur_template != next_template + { + anyhow::bail!( + "invalid transition of deployment at index {index} (you many only append new deployments or update `desired` or `oci_image_override` of this one): {current:?} =!=> {next:?}" + ); + } + } + } + } + + // Periodically perform a refresh to detect remote changes to resources. + if state.last_refresh + REFRESH_INTERVAL < chrono::Utc::now() { + state.pending_refresh = true; + } + // Changes to branch or stack configuration require a convergence pass. + if state.deploy_branch != next_deploy_branch { + state.deploy_branch = next_deploy_branch; + state.pending_converge = true; + } + if state.stack.config != next_config { + state.stack.config = next_config; + state.pending_converge = true; + } + + // Decide upon an action to take given the current `state`. + if state.stack.encrypted_key.is_empty() { + state.status = Status::SetEncryption; + Ok(POLL_AGAIN) + } else if state.pending_preview { + // We perform requested previews even when disabled. + state.status = Status::PulumiPreview; + state.pending_preview = false; + Ok(POLL_AGAIN) + } else if state.disabled { + // When disabled, we don't perform refresh or converge operations. + Ok(IDLE_INTERVAL) + } else if state.pending_refresh { + // Start a pending refresh operation. + state.status = Status::PulumiRefresh; + state.pending_refresh = false; + Ok(POLL_AGAIN) + } else if state.pending_converge { + // Start a pending convergence operation. + state.status = Status::PulumiUp1; + state.pending_converge = false; + Ok(POLL_AGAIN) + } else { + // We remain Idle. 
+ Ok(IDLE_INTERVAL) + } + } + + #[tracing::instrument( + skip_all, + fields(data_plane_id = ?state.data_plane_id, logs = ?state.logs_token), + )] + async fn on_set_encryption(&self, state: &mut State) -> anyhow::Result { + let checkout = self.checkout(state).await?; + + () = run_cmd( + async_process::Command::new("pulumi") + .arg("stack") + .arg("change-secrets-provider") + .arg("--stack") + .arg(&state.stack_name) + .arg("--non-interactive") + .arg("--cwd") + .arg(&checkout.path()) + .arg(&self.secrets_provider) + .env("PULUMI_BACKEND_URL", self.state_backend.as_str()) + .env("PULUMI_CONFIG_PASSPHRASE", "") + .env("VIRTUAL_ENV", checkout.path().join("venv")), + "pulumi-change-secrets-provider", + &self.logs_tx, + state.logs_token, + ) + .await?; + + // Pulumi wrote an updated stack YAML. + // Parse it to extract the encryption key. + let updated = std::fs::read( + &checkout + .path() + .join(format!("Pulumi.{}.yaml", state.stack_name)), + ) + .context("failed to read stack YAML")?; + + let updated: stack::PulumiStack = + serde_yaml::from_slice(&updated).context("failed to parse stack from YAML")?; + + state.stack.secrets_provider = self.secrets_provider.clone(); + state.stack.encrypted_key = updated.encrypted_key; + state.publish_stack = Some(state.stack.clone()); + state.status = Status::Idle; + + Ok(POLL_AGAIN) + } + + #[tracing::instrument( + skip_all, + fields(data_plane_id = ?state.data_plane_id, logs = ?state.logs_token), + )] + async fn on_pulumi_preview(&self, state: &mut State) -> anyhow::Result { + let checkout = self.checkout(state).await?; + + () = run_cmd( + async_process::Command::new("pulumi") + .arg("preview") + .arg("--stack") + .arg(&state.stack_name) + .arg("--diff") + .arg("--non-interactive") + .arg("--cwd") + .arg(&checkout.path()) + .env("PULUMI_BACKEND_URL", self.state_backend.as_str()) + .env("VIRTUAL_ENV", checkout.path().join("venv")), + "pulumi-preview", + &self.logs_tx, + state.logs_token, + ) + .await?; + + state.status = 
Status::Idle; + + Ok(POLL_AGAIN) + } + + #[tracing::instrument( + skip_all, + fields(data_plane_id = ?state.data_plane_id, logs = ?state.logs_token), + )] + async fn on_pulumi_refresh(&self, state: &mut State) -> anyhow::Result { + let checkout = self.checkout(state).await?; + + // Refresh expecting to see no changes. We'll check exit status to see if there were. + let result = run_cmd( + async_process::Command::new("pulumi") + .arg("refresh") + .arg("--stack") + .arg(&state.stack_name) + .arg("--diff") + .arg("--non-interactive") + .arg("--skip-preview") + .arg("--cwd") + .arg(&checkout.path()) + .arg("--yes") + .arg("--expect-no-changes") + .env("PULUMI_BACKEND_URL", self.state_backend.as_str()) + .env("VIRTUAL_ENV", checkout.path().join("venv")), + "pulumi-refresh", + &self.logs_tx, + state.logs_token, + ) + .await; + + if matches!(&result, Err(err) if err.downcast_ref::().is_some()) { + // Run again, but this time allowing changes. + () = run_cmd( + async_process::Command::new("pulumi") + .arg("refresh") + .arg("--stack") + .arg(&state.stack_name) + .arg("--diff") + .arg("--non-interactive") + .arg("--skip-preview") + .arg("--cwd") + .arg(&checkout.path()) + .arg("--yes") + .env("PULUMI_BACKEND_URL", self.state_backend.as_str()) + .env("VIRTUAL_ENV", checkout.path().join("venv")), + "pulumi-refresh-changed", + &self.logs_tx, + state.logs_token, + ) + .await?; + + // We refreshed some changes, and must converge to (for example) + // provision a replaced EC2 instance. 
+ state.pending_converge = true; + } else { + () = result?; + } + + state.status = Status::Idle; + state.last_refresh = chrono::Utc::now(); + + Ok(POLL_AGAIN) + } + + #[tracing::instrument( + skip_all, + fields(data_plane_id = ?state.data_plane_id, logs = ?state.logs_token), + )] + async fn on_pulumi_up_1(&self, state: &mut State) -> anyhow::Result { + let checkout = self.checkout(state).await?; + + () = run_cmd( + async_process::Command::new("pulumi") + .arg("up") + .arg("--stack") + .arg(&state.stack_name) + .arg("--diff") + .arg("--non-interactive") + .arg("--skip-preview") + .arg("--cwd") + .arg(&checkout.path()) + .arg("--yes") + .env("PULUMI_BACKEND_URL", self.state_backend.as_str()) + .env("VIRTUAL_ENV", checkout.path().join("venv")), + "pulumi-up-one", + &self.logs_tx, + state.logs_token, + ) + .await?; + + state.status = Status::AwaitDNS1; + state.last_pulumi_up = chrono::Utc::now(); + + Ok(DNS_TTL) + } + + #[tracing::instrument( + skip_all, + fields(data_plane_id = ?state.data_plane_id), + )] + async fn on_await_dns_1(&self, state: &mut State) -> anyhow::Result { + let remainder = (state.last_pulumi_up + DNS_TTL) - chrono::Utc::now(); + + if remainder > chrono::TimeDelta::zero() { + // PostgreSQL doesn't support nanosecond precision, so we must strip them. + Ok(std::time::Duration::from_micros( + remainder.num_microseconds().unwrap() as u64, + )) + } else { + state.status = Status::Ansible; + + Ok(POLL_AGAIN) + } + } + + #[tracing::instrument( + skip_all, + fields(data_plane_id = ?state.data_plane_id, logs = ?state.logs_token), + )] + async fn on_ansible(&self, state: &mut State) -> anyhow::Result { + let checkout = self.checkout(state).await?; + + // Load exported Pulumi state. 
+ let output = async_process::output( + async_process::Command::new("pulumi") + .arg("stack") + .arg("output") + .arg("--stack") + .arg(&state.stack_name) + .arg("--json") + .arg("--non-interactive") + .arg("--show-secrets") + .arg("--cwd") + .arg(&checkout.path()) + .env("PULUMI_BACKEND_URL", self.state_backend.as_str()) + .env("VIRTUAL_ENV", checkout.path().join("venv")), + ) + .await?; + + if !output.status.success() { + anyhow::bail!( + "pulumi stack output failed: {}", + String::from_utf8_lossy(&output.stderr), + ); + } + + let stack::PulumiExports { + ansible, + mut control, + } = serde_json::from_slice(&output.stdout).context("failed to parse pulumi output")?; + + // Install Ansible requirements. + () = run_cmd( + async_process::Command::new(checkout.path().join("venv/bin/ansible-galaxy")) + .arg("install") + .arg("--role-file") + .arg("requirements.yml") + .current_dir(checkout.path()), + "ansible-install", + &self.logs_tx, + state.logs_token, + ) + .await?; + + // Write out Ansible inventory. + std::fs::write( + checkout.path().join("ansible-inventory.json"), + serde_json::to_vec_pretty(&ansible).context("failed to serialize ansible inventory")?, + ) + .context("failed to write ansible inventory")?; + + // Write out Ansible SSH key and set it to 0600. + // Ansible is sensitive about their being a trailing newline. + let ssh_key_path = checkout.path().join("ansible-ssh.key"); + let mut ssh_key = std::mem::take(&mut control.ssh_key); + + if !ssh_key.ends_with("\n") { + ssh_key.push('\n'); + } + + std::fs::write(&ssh_key_path, ssh_key).context("failed to write ansible SSH key")?; + std::fs::set_permissions( + ssh_key_path, + std::os::unix::fs::PermissionsExt::from_mode(0o600), + ) + .context("failed to set permissions of ansible SSH key")?; + + // Run the Ansible playbook. 
+ () = run_cmd( + async_process::Command::new(checkout.path().join("venv/bin/ansible-playbook")) + .arg("data-plane.ansible.yaml") + .current_dir(checkout.path()) + .env("ANSIBLE_FORCE_COLOR", "1"), + "ansible-playbook", + &self.logs_tx, + state.logs_token, + ) + .await?; + + // Now that we've completed Ansible, all deployments are current and we + // can prune empty deployments. + state + .stack + .config + .model + .deployments + .retain_mut(|deployment| { + deployment.current = deployment.desired; + deployment.current != 0 + }); + + state.status = Status::PulumiUp2; + state.publish_exports = Some(control); + state.publish_stack = Some(state.stack.clone()); + + Ok(POLL_AGAIN) + } + + #[tracing::instrument( + skip_all, + fields(data_plane_id = ?state.data_plane_id, logs = ?state.logs_token), + )] + async fn on_pulumi_up_2(&self, state: &mut State) -> anyhow::Result { + let checkout = self.checkout(state).await?; + + () = run_cmd( + async_process::Command::new("pulumi") + .arg("up") + .arg("--stack") + .arg(&state.stack_name) + .arg("--diff") + .arg("--non-interactive") + .arg("--skip-preview") + .arg("--cwd") + .arg(&checkout.path()) + .arg("--yes") + .env("PULUMI_BACKEND_URL", self.state_backend.as_str()) + .env("VIRTUAL_ENV", checkout.path().join("venv")), + "pulumi-up-two", + &self.logs_tx, + state.logs_token, + ) + .await?; + + state.status = Status::AwaitDNS2; + state.last_pulumi_up = chrono::Utc::now(); + + Ok(DNS_TTL) + } + + #[tracing::instrument( + skip_all, + fields(data_plane_id = ?state.data_plane_id), + )] + async fn on_await_dns_2(&self, state: &mut State) -> anyhow::Result { + let remainder = (state.last_pulumi_up + DNS_TTL) - chrono::Utc::now(); + + if remainder > chrono::TimeDelta::zero() { + // PostgreSQL doesn't support nanosecond precision, so we must strip them. 
+ Ok(std::time::Duration::from_micros( + remainder.num_microseconds().unwrap() as u64, + )) + } else { + state.status = Status::Idle; + + Ok(POLL_AGAIN) + } + } + + async fn fetch_row_state( + &self, + pool: &sqlx::PgPool, + task_id: models::Id, + data_plane_id: models::Id, + ) -> anyhow::Result { + let row = sqlx::query!( + r#" + SELECT + config AS "config: sqlx::types::Json", + deploy_branch AS "deploy_branch!", + logs_token, + data_plane_name, + data_plane_fqdn, + pulumi_key AS "pulumi_key", + pulumi_stack AS "pulumi_stack!" + FROM data_planes + WHERE id = $1 and controller_task_id = $2 + "#, + data_plane_id as models::Id, + task_id as models::Id, + ) + .fetch_one(pool) + .await + .context("failed to fetch data-plane row")?; + + let mut config = stack::PulumiStackConfig { + model: row.config.0, + }; + config.model.name = Some(row.data_plane_name); + config.model.fqdn = Some(row.data_plane_fqdn); + + let stack = if let Some(key) = row.pulumi_key { + stack::PulumiStack { + config, + secrets_provider: self.secrets_provider.clone(), + encrypted_key: key, + } + } else { + stack::PulumiStack { + config, + secrets_provider: "passphrase".to_string(), + encrypted_key: String::new(), + } + }; + + Ok(State { + data_plane_id, + deploy_branch: row.deploy_branch, + last_pulumi_up: chrono::DateTime::default(), + last_refresh: chrono::DateTime::default(), + logs_token: row.logs_token, + stack, + stack_name: row.pulumi_stack, + status: Status::Idle, + + disabled: true, + pending_preview: false, + pending_refresh: false, + pending_converge: false, + publish_exports: None, + publish_stack: None, + }) + } + + async fn checkout(&self, state: &State) -> anyhow::Result { + let checkout = self + .repo + .checkout(&self.logs_tx, state.logs_token, &state.deploy_branch) + .await?; + + // Write out stack YAML file for Pulumi CLI. 
+ std::fs::write( + &checkout + .path() + .join(format!("Pulumi.{}.yaml", state.stack_name)), + serde_yaml::to_vec(&state.stack).context("failed to encode stack as YAML")?, + ) + .context("failed to write stack YAML")?; + + Ok(checkout) + } +} + +impl automations::Outcome for Outcome { + async fn apply<'s>( + self, + txn: &'s mut sqlx::PgConnection, + ) -> anyhow::Result { + sqlx::query!( + r#" + UPDATE data_planes SET + status = $3, + updated_at = NOW() + WHERE id = $1 AND controller_task_id = $2 + AND status IS DISTINCT FROM $3 + "#, + self.data_plane_id as models::Id, + self.task_id as models::Id, + format!("{:?}", self.status), + ) + .execute(&mut *txn) + .await + .context("failed to update status of data_planes row")?; + + if let Some(stack::PulumiStack { + config: stack::PulumiStackConfig { mut model }, + encrypted_key, + secrets_provider: _, + }) = self.publish_stack + { + // These fields are already implied by other row columns. + model.name = None; + model.fqdn = None; + + _ = sqlx::query!( + r#" + UPDATE data_planes SET + config = $3, + pulumi_key = $4 + WHERE id = $1 AND controller_task_id = $2 + "#, + self.data_plane_id as models::Id, + self.task_id as models::Id, + sqlx::types::Json(model) as sqlx::types::Json, + encrypted_key, + ) + .execute(&mut *txn) + .await + .context("failed to publish stack into data_planes row")?; + } + + if let Some(stack::ControlExports { + aws_iam_user_arn, + aws_link_endpoints, + cidr_blocks, + gcp_service_account_email, + hmac_keys, + ssh_key: _, + }) = self.publish_exports + { + _ = sqlx::query!( + r#" + UPDATE data_planes SET + aws_iam_user_arn = $3, + aws_link_endpoints = $4, + cidr_blocks = $5, + gcp_service_account_email = $6, + hmac_keys = $7 + WHERE id = $1 AND controller_task_id = $2 + "#, + self.data_plane_id as models::Id, + self.task_id as models::Id, + aws_iam_user_arn, + &aws_link_endpoints, + &cidr_blocks, + gcp_service_account_email, + &hmac_keys, + ) + .execute(&mut *txn) + .await + .context("failed to 
publish exports into data_planes row")?; + } + + Ok(automations::Action::Sleep(self.sleep)) + } +} + +const DNS_TTL: std::time::Duration = std::time::Duration::from_secs(5 * 60); +const IDLE_INTERVAL: std::time::Duration = std::time::Duration::from_secs(60); +const POLL_AGAIN: std::time::Duration = std::time::Duration::ZERO; +const REFRESH_INTERVAL: std::time::Duration = std::time::Duration::from_secs(2 * 60 * 60); + +fn is_false(b: &bool) -> bool { + !b +} diff --git a/crates/data-plane-controller/src/lib.rs b/crates/data-plane-controller/src/lib.rs new file mode 100644 index 0000000000..ff392933b6 --- /dev/null +++ b/crates/data-plane-controller/src/lib.rs @@ -0,0 +1,206 @@ +use anyhow::Context; +use futures::{FutureExt, TryFutureExt}; + +mod controller; +mod logs; +mod repo; +mod stack; + +pub use controller::Controller; + +#[derive(clap::Parser, Debug, serde::Serialize)] +#[clap(author, version, about, long_about = None)] +pub struct Args { + /// URL of the postgres database. + #[clap( + long = "database", + env = "DPC_DATABASE_URL", + default_value = "postgres://postgres:postgres@127.0.0.1:5432/postgres" + )] + #[serde(skip_serializing)] + database_url: url::Url, + /// Path to CA certificate of the database. + #[clap(long = "database-ca", env = "DPC_DATABASE_CA")] + database_ca: Option, + /// Number of tasks which may be polled concurrently. + #[clap(long = "concurrency", env = "DPC_CONCURRENCY", default_value = "1")] + concurrency: u32, + /// Interval between polls for dequeue-able tasks when otherwise idle. + #[clap( + long = "dequeue-interval", + env = "DPC_DEQUEUE_INTERVAL", + default_value = "5s" + )] + #[serde(with = "humantime_serde")] + #[arg(value_parser = humantime::parse_duration)] + dequeue_interval: std::time::Duration, + /// Interval before a running task poll is presumed to have failed. + /// Tasks updated their heartbeats every half of this interval. 
+ #[clap( + long = "heartbeat-timeout", + env = "DPC_HEARTBEAT_TIMEOUT", + default_value = "30s" + )] + #[serde(with = "humantime_serde")] + #[arg(value_parser = humantime::parse_duration)] + heartbeat_timeout: std::time::Duration, + /// Repository to clone for Pulumi and Ansible infrastructure. + #[clap( + long = "git-repo", + env = "DPC_GIT_REPO", + default_value = "git@github.com:estuary/est-dry-dock.git" + )] + git_repo: String, + /// Pulumi secrets provider for encryption of stack secrets. + #[clap( + long = "secrets-provider", + env = "DPC_SECRETS_PROVIDER", + default_value = "gcpkms://projects/estuary-control/locations/us-central1/keyRings/pulumi/cryptoKeys/state-secrets" + )] + secrets_provider: String, + /// Pulumi backend for storage of stack states. + #[clap( + long = "state-backend", + env = "DPC_STATE_BACKEND", + default_value = "gs://estuary-pulumi" + )] + state_backend: url::Url, +} + +pub async fn run(args: Args) -> anyhow::Result<()> { + let hostname = std::env::var("HOSTNAME").ok(); + let app_name = if let Some(hostname) = &hostname { + hostname.as_str() + } else { + "data-plane-controller" + }; + tracing::info!(args=?ops::DebugJson(&args), app_name, "started!"); + + let repo = repo::Repo::new(&args.git_repo); + + let mut pg_options = args + .database_url + .as_str() + .parse::() + .context("parsing database URL")? + .application_name(app_name); + + // If a database CA was provided, require that we use TLS with full cert verification. + if let Some(ca) = &args.database_ca { + pg_options = pg_options + .ssl_mode(sqlx::postgres::PgSslMode::VerifyFull) + .ssl_root_cert(ca); + } else { + // Otherwise, prefer TLS but don't require it. 
+ pg_options = pg_options.ssl_mode(sqlx::postgres::PgSslMode::Prefer); + } + + let pg_pool = sqlx::postgres::PgPoolOptions::new() + .acquire_timeout(std::time::Duration::from_secs(5)) + .connect_with(pg_options) + .await + .context("connecting to database")?; + + let shutdown = async { + match tokio::signal::ctrl_c().await { + Ok(()) => { + tracing::info!("caught shutdown signal, stopping..."); + } + Err(err) => { + tracing::error!(?err, "error subscribing to shutdown signal"); + } + } + }; + + let (logs_tx, logs_rx) = tokio::sync::mpsc::channel(120); + let logs_sink = logs::serve_sink(pg_pool.clone(), logs_rx).map_err(|err| anyhow::anyhow!(err)); + + let server = automations::Server::new() + .register(controller::Controller { + logs_tx, + repo, + secrets_provider: args.secrets_provider, + state_backend: args.state_backend, + }) + .serve( + args.concurrency, + pg_pool, + args.dequeue_interval, + args.heartbeat_timeout, + shutdown, + ) + .map(|()| anyhow::Result::<()>::Ok(())); + + let ((), ()) = futures::try_join!(logs_sink, server)?; + + Ok(()) +} + +#[derive(Debug)] +struct NonZeroExit { + status: std::process::ExitStatus, + cmd: String, + logs_token: sqlx::types::Uuid, +} + +impl std::fmt::Display for NonZeroExit { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!( + f, + "command {} exited with status {:?} (logs token {})", + self.cmd, self.status, self.logs_token + ) + } +} + +async fn run_cmd( + cmd: &mut async_process::Command, + stream: &str, + logs_tx: &logs::Tx, + logs_token: sqlx::types::Uuid, +) -> anyhow::Result<()> { + cmd.stdin(std::process::Stdio::null()); + cmd.stderr(std::process::Stdio::piped()); + cmd.stdout(std::process::Stdio::piped()); + + logs_tx + .send(logs::Line { + token: logs_token, + stream: "controller".to_string(), + line: format!("Starting {stream}: {cmd:?}"), + }) + .await + .context("failed to send to logs sink")?; + + tracing::info!(?cmd, "starting command"); + + let mut child: async_process::Child = 
cmd.spawn()?.into(); + + let stdout = logs::capture_lines( + logs_tx, + format!("{stream}:0"), + logs_token, + child.stdout.take().unwrap(), + ); + let stderr = logs::capture_lines( + logs_tx, + format!("{stream}:1"), + logs_token, + child.stderr.take().unwrap(), + ); + + let ((), (), status) = futures::try_join!(stdout, stderr, child.wait())?; + + tracing::info!(?cmd, ?status, "command completed"); + + if !status.success() { + let err = NonZeroExit { + cmd: format!("{cmd:?}"), + logs_token, + status, + }; + Err(anyhow::anyhow!(err)) + } else { + Ok(()) + } +} diff --git a/crates/data-plane-controller/src/logs.rs b/crates/data-plane-controller/src/logs.rs new file mode 100644 index 0000000000..6518e54305 --- /dev/null +++ b/crates/data-plane-controller/src/logs.rs @@ -0,0 +1,129 @@ +use sqlx::types::{chrono, Uuid}; +use tokio::io::AsyncBufReadExt; + +// Line is a recorded log line. +#[derive(Debug)] +pub struct Line { + // Token which identifies the line's log set. + pub token: Uuid, + // Stream of this logged line. + pub stream: String, + // Contents of the line. + pub line: String, +} + +// Tx is the channel sender of log Lines. +pub type Tx = tokio::sync::mpsc::Sender; + +// capture_job_logs consumes newline-delimited lines from the AsyncRead and +// streams each as a Line to the channel Sender. +#[tracing::instrument(level = "debug", err, skip(tx, reader))] +pub async fn capture_lines( + tx: &Tx, + stream: String, + token: Uuid, + reader: R, +) -> Result<(), std::io::Error> +where + R: tokio::io::AsyncRead + Unpin, +{ + let mut splits = tokio::io::BufReader::new(reader).split(b'\n'); + while let Some(line) = splits.next_segment().await? { + // Attempt a direct conversion to String without a copy. + // Fall back to a lossy UTF8 replacement. 
+ let line = String::from_utf8(line) + .unwrap_or_else(|err| String::from_utf8_lossy(err.as_bytes()).into_owned()); + + tx.send(Line { + token, + stream: stream.clone(), + line, + }) + .await + .unwrap(); + } + Ok(()) +} + +/// NULL is a perfectly valid thing to include in UTF8 bytes, but not according +/// to postgres. It rejects `TEXT` values containing nulls. This replaces all +/// null bytes with a space character. The space was chosen somewhat arbitrarily +/// as a goodenuf replacement in this rare edge case. +fn sanitize_null_bytes(line: String) -> String { + // Check to avoid copying the string if it doesn't contain the character. + if line.contains('\u{0000}') { + line.replace('\u{0000}', " ") + } else { + line + } +} + +// serve_sink consumes log Lines from the receiver, streaming each +// to the `logs` table of the database. +#[tracing::instrument(ret, skip_all)] +pub async fn serve_sink( + pg_pool: sqlx::PgPool, + mut rx: tokio::sync::mpsc::Receiver, +) -> sqlx::Result<()> { + // Lines, re-shaped into a columnar form for vectorized dispatch. + let mut tokens = Vec::new(); + let mut streams = Vec::new(); + let mut lines = Vec::new(); + let mut logged_at = Vec::new(); + + loop { + // Blocking read of the next line. + match rx.recv().await { + Some(Line { + token, + stream, + line, + }) => { + tokens.push(token); + streams.push(stream); + lines.push(sanitize_null_bytes(line)); + logged_at.push(chrono::Utc::now()); + } + None => { + return Ok(()); + } + } + + // Read additional ready lines without blocking. + while let Ok(Line { + token, + stream, + line, + }) = rx.try_recv() + { + tokens.push(token); + streams.push(stream); + lines.push(sanitize_null_bytes(line)); + + // Apply a total order to logs by incrementing logged_at with each line. + // Note that Postgres has microsecond resolution for timestamps. + logged_at.push(*logged_at.last().unwrap() + std::time::Duration::from_micros(1)); + } + + // Dispatch the vector of lines to the table. 
+ let r = sqlx::query( + r#" + INSERT INTO internal.log_lines (token, stream, log_line, logged_at) + SELECT * FROM UNNEST($1, $2, $3, $4) + "#, + ) + .bind(&tokens) + .bind(&streams) + .bind(&lines) + .bind(&logged_at) + .execute(&pg_pool) + .await?; + + tracing::trace!(rows = ?r.rows_affected(), "inserted logs"); + + tokens.clear(); + streams.clear(); + lines.clear(); + logged_at.clear(); + } +} diff --git a/crates/data-plane-controller/src/main.rs b/crates/data-plane-controller/src/main.rs new file mode 100644 index 0000000000..52c80a9386 --- /dev/null +++ b/crates/data-plane-controller/src/main.rs @@ -0,0 +1,32 @@ +use clap::Parser; + +fn main() -> Result<(), anyhow::Error> { + // Required in order for libraries to use `rustls` for TLS. + // See: https://docs.rs/rustls/latest/rustls/crypto/struct.CryptoProvider.html + rustls::crypto::aws_lc_rs::default_provider() + .install_default() + .expect("failed to install default crypto provider"); + + // Use reasonable defaults for printing structured logs to stderr. + let subscriber = tracing_subscriber::FmtSubscriber::builder() + .with_env_filter(tracing_subscriber::EnvFilter::from_default_env()) + .with_ansi(if matches!(std::env::var("NO_COLOR"), Ok(v) if v == "1") { + false + } else { + true + }) + .finish(); + tracing::subscriber::set_global_default(subscriber).expect("setting tracing default failed"); + + let args = data_plane_controller::Args::parse(); + + let runtime = tokio::runtime::Builder::new_multi_thread() + .enable_all() + .build()?; + + let result = + runtime.block_on(runtime.spawn(async move { data_plane_controller::run(args).await })); + + runtime.shutdown_timeout(std::time::Duration::from_secs(5)); + result? 
}
diff --git a/crates/data-plane-controller/src/repo.rs b/crates/data-plane-controller/src/repo.rs new file mode 100644 index 0000000000..5207de29e1 --- /dev/null +++ b/crates/data-plane-controller/src/repo.rs @@ -0,0 +1,147 @@
use super::{logs, run_cmd};
use anyhow::Context;
use std::sync::Arc;

// Repo is a git repository from which working-tree checkouts are prepared.
pub struct Repo {
    inner: Arc<Inner>,
}

struct Inner {
    // Repository URL to clone.
    repo: String,
    // Pool of idle, previously-cloned checkout directories available for reuse.
    idle: std::sync::Mutex<Vec<tempfile::TempDir>>,
}

// Checkout is a detached working-tree checkout of a Repo branch.
// Its directory is returned to the Repo's idle pool when dropped.
pub struct Checkout {
    dir: Option<tempfile::TempDir>,
    inner: Arc<Inner>,
}

impl Repo {
    pub fn new(repo: &str) -> Self {
        Self {
            inner: Arc::new(Inner {
                repo: repo.to_string(),
                idle: Default::default(),
            }),
        }
    }

    // checkout prepares a clean, detached working tree of `branch`,
    // reusing an idle clone when one is available.
    pub async fn checkout(
        &self,
        logs_tx: &logs::Tx,
        logs_token: sqlx::types::Uuid,
        branch: &str,
    ) -> anyhow::Result<Checkout> {
        // Attempt to obtain a pre-created checkout, or clone a new one.
        let dir = self.inner.idle.lock().unwrap().pop();
        let dir = if let Some(dir) = dir {
            // Discard any untracked files left by a prior use of this clone.
            () = run_cmd(
                async_process::Command::new("git")
                    .arg("clean")
                    .arg("--force")
                    .current_dir(dir.path()),
                "git-clean",
                logs_tx,
                logs_token,
            )
            .await?;

            dir
        } else {
            self.create_clone(logs_tx, logs_token).await?
+ }; + + () = run_cmd( + async_process::Command::new("git") + .arg("fetch") + .current_dir(dir.path()), + "git-fetch", + logs_tx, + logs_token, + ) + .await?; + + () = run_cmd( + async_process::Command::new("git") + .arg("checkout") + .arg("--detach") + .arg("--quiet") + .arg(format!("origin/{branch}")) + .current_dir(dir.path()), + "git-checkout", + logs_tx, + logs_token, + ) + .await?; + + () = run_cmd( + async_process::Command::new("poetry") + .arg("install") + .current_dir(dir.path()) + .env("VIRTUAL_ENV", dir.path().join("venv")) + .env("PYTHON_KEYRING_BACKEND", "keyring.backends.null.Keyring"), + "poetry-install", + &logs_tx, + logs_token, + ) + .await?; + + tracing::info!(branch, dir=?dir.path(), "prepared checkout"); + + Ok(Checkout { + inner: self.inner.clone(), + dir: Some(dir), + }) + } + + async fn create_clone( + &self, + logs_tx: &logs::Tx, + logs_token: sqlx::types::Uuid, + ) -> anyhow::Result { + let dir = tempfile::TempDir::with_prefix(format!("dpc_checkout_")) + .context("failed to create temp directory")?; + + () = run_cmd( + async_process::Command::new("git") + .arg("clone") + .arg(&self.inner.repo) + .arg(dir.path()), + "git-clone", + logs_tx, + logs_token, + ) + .await?; + + () = run_cmd( + async_process::Command::new("python3.12") + .arg("-m") + .arg("venv") + .arg(dir.path().join("venv")), + "python-venv", + logs_tx, + logs_token, + ) + .await?; + + tracing::info!(repo=self.inner.repo, dir=?dir.path(), "created repo clone"); + + Ok(dir) + } +} + +impl Checkout { + pub fn path(&self) -> &std::path::Path { + self.dir.as_ref().unwrap().path() + } +} + +impl Drop for Checkout { + fn drop(&mut self) { + self.inner + .idle + .lock() + .unwrap() + .push(self.dir.take().unwrap()); + } +} diff --git a/crates/data-plane-controller/src/stack.rs b/crates/data-plane-controller/src/stack.rs new file mode 100644 index 0000000000..0ec32aa92a --- /dev/null +++ b/crates/data-plane-controller/src/stack.rs @@ -0,0 +1,114 @@ +#[derive(Clone, Debug, PartialEq, 
Eq, serde::Serialize, serde::Deserialize)]
pub struct PulumiStack {
    // Provider used for encrypting stack secrets.
    #[serde(rename = "secretsprovider")]
    pub secrets_provider: String,
    #[serde(
        default,
        rename = "encryptedkey",
        skip_serializing_if = "String::is_empty"
    )]
    pub encrypted_key: String,
    pub config: PulumiStackConfig,
}

#[derive(Clone, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct PulumiStackConfig {
    #[serde(rename = "est-dry-dock:model")]
    pub model: DataPlane,
}

// DataPlane is the desired-state model of a data-plane deployment.
// NOTE(review): container element types below were reconstructed after
// generic parameters were stripped in transit — confirm against the original.
#[derive(Clone, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct DataPlane {
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub name: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub fqdn: Option<String>,

    pub builds_root: url::Url,
    pub builds_kms_keys: Vec<String>,
    pub control_plane_api: url::Url,
    pub data_buckets: Vec<url::Url>,
    pub gcp_project: String,
    // Subnets granted SSH access. The crate depends on `ipnetwork` with its
    // serde feature, which is presumably for this field — TODO confirm.
    pub ssh_subnets: Vec<ipnetwork::IpNetwork>,
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub private_links: Vec<AWSPrivateLink>,
    pub deployments: Vec<Deployment>,
}

#[derive(Clone, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct AWSPrivateLink {
    pub region: String,
    pub az_ids: Vec<String>,
    pub service_name: String,
}

// Role of a Deployment within the data plane.
#[derive(Clone, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum Role {
    Etcd,
    Gazette,
    Reactor,
}

#[derive(Clone, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct Deployment {
    pub role: Role,
    pub template: serde_json::Value,
    pub oci_image: String,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub oci_image_override: Option<String>,
    // Desired vs currently-deployed replica counts.
    pub desired: usize,
    pub current: usize,
}

// PulumiExports are outputs read back from the Pulumi stack.
#[derive(Debug, serde::Deserialize)]
pub struct PulumiExports {
    pub ansible: AnsibleInventory,
    pub control: ControlExports,
}

#[derive(Debug, serde::Serialize, serde::Deserialize)]
pub struct AnsibleInventory {
    pub all: AnsibleInventoryAll,
}

#[derive(Debug, serde::Serialize,
serde::Deserialize)]
pub struct AnsibleInventoryAll {
    // NOTE(review): map value/element types were reconstructed after generic
    // parameters were stripped in transit — confirm against the original.
    pub children: std::collections::BTreeMap<String, AnsibleRole>,
    pub vars: std::collections::BTreeMap<String, serde_json::Value>,
}

#[derive(Debug, serde::Serialize, serde::Deserialize)]
pub struct AnsibleRole {
    pub hosts: std::collections::BTreeMap<String, AnsibleHost>,
    pub vars: std::collections::BTreeMap<String, serde_json::Value>,
}

// AnsibleHost is a single host entry of the generated Ansible inventory.
#[derive(Debug, serde::Serialize, serde::Deserialize)]
pub struct AnsibleHost {
    pub ansible_host: std::net::Ipv6Addr,
    pub ansible_user: String,
    pub host_fqdn: String,
    pub local_cert_pem: String,
    pub local_private_key_pem: String,
    pub oci_image: String,
    pub private_ip4: Option<std::net::Ipv4Addr>,
    pub provider: String,
    pub public_ip4: std::net::Ipv4Addr,
    pub public_ip6: std::net::Ipv6Addr,
    pub role: String,
    pub role_fqdn: String,
    pub starting: bool,
    pub stopping: bool,
    pub zone: String,
}

// ControlExports are control-plane-facing outputs of the Pulumi stack.
#[derive(Debug, serde::Deserialize, serde::Serialize)]
pub struct ControlExports {
    pub aws_iam_user_arn: String,
    // NOTE(review): element types reconstructed — confirm against the original.
    pub aws_link_endpoints: Vec<serde_json::Value>,
    pub cidr_blocks: Vec<String>,
    pub gcp_service_account_email: String,
    pub hmac_keys: Vec<String>,
    pub ssh_key: String,
}