diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml
index 02fcdee2c9227..fe17b1ddccdbf 100644
--- a/.github/workflows/tests.yml
+++ b/.github/workflows/tests.yml
@@ -265,21 +265,49 @@ jobs:
     - run: make slim-builds
     - run: make test-integration-splunk
 
-  test-integration-kubernetes:
-    name: Integration - Linux, Kubernetes, flaky
-    # This is an experimental test. Allow it to fail without failing the whole
-    # workflow, but keep it executing on every build to gather stats.
-    continue-on-error: true
+  test-e2e-kubernetes:
+    name: E2E - K8s ${{ matrix.kubernetes_version }} / ${{ matrix.container_runtime }}
     runs-on: ubuntu-latest
     strategy:
       matrix:
-        kubernetes:
-          - v1.18.2
-          - v1.17.5
-          - v1.16.9
-          - v1.15.11
-          - v1.14.10
+        minikube_version:
+          - 'v1.11.0' # https://github.com/kubernetes/minikube/issues/8799
+        kubernetes_version:
+          - 'v1.18.6'
+          - 'v1.17.9'
+          - 'v1.16.12' # v1.16.13 is broken, see https://github.com/kubernetes/minikube/issues/8840
+          - 'v1.15.12'
+          - 'v1.14.10'
+        container_runtime:
+          - docker
+          - containerd
+          - crio
       fail-fast: false
     steps:
-      - name: Temporarily off
-        run: "true"
+      - name: Setup Minikube
+        run: |
+          set -xeuo pipefail
+
+          curl -Lo kubectl \
+            'https://storage.googleapis.com/kubernetes-release/release/${{ matrix.kubernetes_version }}/bin/linux/amd64/kubectl'
+          sudo install kubectl /usr/local/bin/
+
+          curl -Lo minikube \
+            'https://storage.googleapis.com/minikube/releases/${{ matrix.minikube_version }}/minikube-linux-amd64'
+          sudo install minikube /usr/local/bin/
+
+          minikube config set profile minikube
+          minikube config set vm-driver docker
+          minikube config set kubernetes-version '${{ matrix.kubernetes_version }}'
+          minikube config set container-runtime '${{ matrix.container_runtime }}'
+          # Start minikube; retry once if it fails, and print the logs (and
+          # fail the step) if the second attempt fails too.
+          minikube start || (minikube delete && minikube start) || (minikube logs; false)
+          kubectl cluster-info
+      - name: Checkout
+        uses: actions/checkout@v1
+      - run: USE_CONTAINER=none make slim-builds
+      - run: make test-e2e-kubernetes
+        env:
+          USE_MINIKUBE_CACHE: "true"
+          PACKAGE_DEB_USE_CONTAINER: docker
diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md
index 806e324db6729..39644112c0642 100644
--- a/CONTRIBUTING.md
+++ b/CONTRIBUTING.md
@@ -37,6 +37,8 @@ expanding into more specifics.
    1. [Benchmarking](#benchmarking)
    1. [Profiling](#profiling)
    1. [Kubernetes](#kubernetes)
+      1. [Dev flow](#kubernetes-dev-flow)
+      1. [E2E tests](#kubernetes-e2e-tests)
    1. [Humans](#humans)
    1. [Documentation](#documentation)
    1. [Changelog](#changelog)
@@ -550,13 +552,15 @@ navigated in your favorite web browser.
 
 ### Kubernetes
 
+#### Kubernetes Dev Flow
+
 There is a special flow for when you develop portions of Vector that are
 designed to work with Kubernetes, like `kubernetes_logs` source or the
 `deployment/kubernetes/*.yaml` configs.
 
 This flow facilitates building Vector and deploying it into a cluster.
 
-#### Requirements
+##### Requirements
 
 There are some extra requirements besides what you'd normally need to work on
 Vector:
@@ -570,7 +574,7 @@ Vector:
 * [`minikube`](https://minikube.sigs.k8s.io/)-powered or other k8s cluster
 * [`cargo watch`](https://github.com/passcod/cargo-watch)
 
-#### The dev flow
+##### The dev flow
 
 Once you have the requirements, use the `scripts/skaffold.sh dev` command.
 
@@ -596,7 +600,7 @@ the cluster state and exit.
 `scripts/skaffold.sh` wraps `skaffold`, you can use other `skaffold`
 subcommands if it fits you better.
 
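+For example, to do a one-off build and deployment without the watch loop, you
+can pass the `run` subcommand through the wrapper (a sketch; any `skaffold`
+subcommand should work the same way):
+
+```shell
+scripts/skaffold.sh run
+```
+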
-#### Troubleshooting
+##### Troubleshooting
 
 You might need to tweak `skaffold`, here are some hints:
 
@@ -614,7 +618,7 @@ You might need to tweak `skaffold`, here are some hints:
 * For the rest of the `skaffold` tweaks you might want to apply check out
   [this page](https://skaffold.dev/docs/environment/).
 
-#### Going through the dev flow manually
+##### Going through the dev flow manually
 
 Is some cases `skaffold` may not work. It's possible to go through the dev
 flow manually, without `skaffold`.
@@ -627,6 +631,90 @@ required.
 Essentially, the steps you have to take to deploy manually are the same that
 `skaffold` will perform, and they're outlined at the previous section.
 
+#### Kubernetes E2E tests
+
+Kubernetes integration has a lot of parts that can go wrong.
+
+To cope with the complexity and ensure we maintain high quality, we use
+E2E (end-to-end) tests.
+
+> E2E tests normally run in CI, so there's typically no need to run them
+> manually.
+
+##### Requirements
+
+* `kubernetes` cluster (`minikube` has special support, but any cluster should
+  work)
+* `docker`
+* `kubectl`
+* `bash`
+
+The E2E tests prepare Vector release artifacts, so the ability to build those
+is required too; see the Vector [docs](https://vector.dev) for more details.
+
+> Note: `minikube` has a bug in the latest versions that affects our test
+> process - see https://github.com/kubernetes/minikube/issues/8799.
+> Use version `1.11.0` for now.
+
+> Note: `minikube` has trouble running on ZFS systems. If you're using ZFS, we
+> suggest using a cloud cluster or [`MicroK8s`](https://microk8s.io/) with a
+> local registry.
+
+##### Running the E2E tests
+
+To run the E2E tests, use the following command:
+
+```shell
+CONTAINER_IMAGE_REPO=<your name>/vector-test make test-e2e-kubernetes
+```
+
+Where `CONTAINER_IMAGE_REPO` is the docker image repo name to use, without the
+part after the `:`. Replace `<your name>` with your Docker Hub username.
+
+You can also pass additional parameters to adjust the behavior of the test:
+
+* `QUICK_BUILD=true` - use a development build and a skaffold image from the
+  dev flow instead of a production docker image. Significantly speeds up the
+  preparation process, but doesn't guarantee the correctness of the release
+  build. Useful for development of the tests or the Vector code to speed up
+  the iteration cycles.
+
+* `USE_MINIKUBE_CACHE=true` - instead of pushing the built docker image to the
+  registry under the specified name, directly load the image into
+  a `minikube`-controlled cluster node.
+  Requires you to test against a `minikube` cluster. Eliminates the need to
+  have a registry to run tests.
+  When `USE_MINIKUBE_CACHE=true` is set, we provide a default value for the
+  `CONTAINER_IMAGE_REPO` so it can be omitted.
+  Can be set to `auto` (default) to automatically detect whether to use
+  `minikube cache` or not, based on the current `kubectl` context. To opt out,
+  set `USE_MINIKUBE_CACHE=false`.
+
+* `CONTAINER_IMAGE=<your name>/vector-test:tag` - completely skip the step
+  of building the Vector docker image, and use the specified image instead.
+  Useful to speed up iteration when you already have a Vector docker image you
+  want to test against.
+
+* `SKIP_CONTAINER_IMAGE_PUBLISHING=true` - completely skip the image publishing
+  step. Useful when you want to speed up iteration and you know that the
+  Vector image you want to test is already available to the cluster you're
+  testing against.
+
+* `SCOPE` - pass a filter to the `cargo test` command to select the tests to
+  run; effectively equivalent to `cargo test -- $SCOPE`.
+
+Passing additional parameters is done like so:
+
+```shell
+QUICK_BUILD=true USE_MINIKUBE_CACHE=true make test-e2e-kubernetes
+```
+
+or
+
+```shell
+QUICK_BUILD=true CONTAINER_IMAGE_REPO=<your name>/vector-test make test-e2e-kubernetes
+```
+
 ## Humans
 
 After making your change, you'll want to prepare it for Vector's users
diff --git a/Cargo.lock b/Cargo.lock
index ba94114a33ee2..e037183707f35 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1887,6 +1887,17 @@ dependencies = [
  "url 2.1.1",
 ]
 
+[[package]]
+name = "k8s-test-framework"
+version = "0.1.0"
+dependencies = [
+ "k8s-openapi",
+ "once_cell",
+ "serde_json",
+ "tempfile",
+ "tokio 0.2.21",
+]
+
 [[package]]
 name = "kernel32-sys"
 version = "0.2.2"
@@ -5443,6 +5454,7 @@ dependencies = [
  "inventory",
  "jemallocator",
  "k8s-openapi",
+ "k8s-test-framework",
  "lazy_static 1.4.0",
  "leveldb",
  "libc",
diff --git a/Cargo.toml b/Cargo.toml
index 167b7fd14805e..bd3449c257244 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -31,6 +31,7 @@ members = [
   "lib/file-source",
   "lib/tracing-limit",
   "lib/vector-wasm",
+  "lib/k8s-test-framework",
 ]
 
 [dependencies]
@@ -195,6 +196,7 @@ tokio-test = "0.2"
 tokio = { version = "0.2", features = ["test-util"] }
 assert_cmd = "1.0"
 reqwest = { version = "0.10.6", features = ["json"] }
+k8s-test-framework = { version = "0.1", path = "lib/k8s-test-framework" }
 
 [features]
 # Default features for *-unknown-linux-gnu and *-apple-darwin
@@ -431,11 +433,13 @@ kafka-integration-tests = ["sources-kafka", "sinks-kafka"]
 loki-integration-tests = ["sinks-loki"]
 pulsar-integration-tests = ["sinks-pulsar"]
 splunk-integration-tests = ["sinks-splunk_hec", "warp"]
-kubernetes-integration-tests = ["sources-kubernetes-logs"]
 
 shutdown-tests = ["sources","sinks-console","sinks-prometheus","sinks-blackhole","unix","rdkafka","transforms-log_to_metric","transforms-lua"]
 disable-resolv-conf = []
 
+# E2E tests
+kubernetes-e2e-tests = ["k8s-openapi"]
+
 [[bench]]
 name = "bench"
 harness = false
@@ -453,5 +457,9 @@ name = "wasm"
 harness = false
 required-features = ["transforms-wasm", "transforms-lua"]
 
+[[test]]
+name = "kubernetes-e2e"
+required-features = ["kubernetes-e2e-tests"]
+
 [patch.'https://github.com/tower-rs/tower']
 tower-layer = "0.3"
diff --git a/Makefile b/Makefile
index a8267addefae1..822ca2bb7aec3 100644
--- a/Makefile
+++ b/Makefile
@@ -281,9 +281,9 @@ ifeq ($(AUTODESPAWN), true)
 	${MAYBE_ENVIRONMENT_EXEC} $(CONTAINER_TOOL)-compose stop
 endif
 
-PACKAGE_DEB_USE_CONTAINER ?= "$(USE_CONTAINER)"
-test-integration-kubernetes: ## Runs Kubernetes integration tests (Sorry, no `ENVIRONMENT=true` support)
-	PACKAGE_DEB_USE_CONTAINER="$(PACKAGE_DEB_USE_CONTAINER)" USE_CONTAINER=none $(RUN) test-integration-kubernetes
+PACKAGE_DEB_USE_CONTAINER ?= $(USE_CONTAINER)
+test-e2e-kubernetes: ## Runs Kubernetes E2E tests (Sorry, no `ENVIRONMENT=true` support)
+	PACKAGE_DEB_USE_CONTAINER="$(PACKAGE_DEB_USE_CONTAINER)" scripts/test-e2e-kubernetes.sh
 
 test-shutdown: ## Runs shutdown tests
 ifeq ($(AUTOSPAWN), true)
diff --git a/distribution/kubernetes/vector-namespaced.yaml b/distribution/kubernetes/vector-namespaced.yaml
index 6e52aaba2ff7f..c92e2bc33c5cb 100644
--- a/distribution/kubernetes/vector-namespaced.yaml
+++ b/distribution/kubernetes/vector-namespaced.yaml
@@ -49,6 +49,11 @@ spec:
             valueFrom:
               fieldRef:
                 fieldPath: metadata.namespace
+          # Set a reasonable log level to avoid issues with internal logs
+          # overwriting console output in E2E tests. Feel free to change it
+          # in a real deployment.
+          - name: LOG
+            value: info
         volumeMounts:
         - name: var-log
           mountPath: /var/log/
diff --git a/kustomization.yaml b/kustomization.yaml
index f8145e36474c6..0ad86d586077d 100644
--- a/kustomization.yaml
+++ b/kustomization.yaml
@@ -8,3 +8,6 @@ resources:
   - skaffold/manifests/namespace.yaml
   - skaffold/manifests/config.yaml
   - distribution/kubernetes/vector-namespaced.yaml
+
+patchesStrategicMerge:
+  - skaffold/manifests/patches/env.yaml
diff --git a/lib/k8s-test-framework/Cargo.toml b/lib/k8s-test-framework/Cargo.toml
new file mode 100644
index 0000000000000..7ec283a159c83
--- /dev/null
+++ b/lib/k8s-test-framework/Cargo.toml
@@ -0,0 +1,16 @@
+[package]
+name = "k8s-test-framework"
+version = "0.1.0"
+authors = ["Vector Contributors <vector@timber.io>"]
+edition = "2018"
+description = "Kubernetes Test Framework used to test Vector in Kubernetes"
+
+[dependencies]
+k8s-openapi = { version = "0.9", default-features = false, features = ["v1_15"] }
+serde_json = "1"
+tempfile = "3"
+once_cell = "1"
+tokio = { version = "0.2", features = ["process", "io-util"] }
+
+[dev-dependencies]
+tokio = { version = "0.2", features = ["macros", "rt-threaded"] }
diff --git a/lib/k8s-test-framework/src/exec_tail.rs b/lib/k8s-test-framework/src/exec_tail.rs
new file mode 100644
index 0000000000000..076392b108ff5
--- /dev/null
+++ b/lib/k8s-test-framework/src/exec_tail.rs
@@ -0,0 +1,32 @@
+//! Perform a log lookup by tailing a file inside a container.
+
+use super::{Reader, Result};
+use std::process::Stdio;
+use tokio::process::Command;
+
+/// Exec a `tail` command reading the specified `file` within a `Container`
+/// in a `Pod` of a specified `resource` at the specified `namespace` via the
+/// specified `kubectl_command`.
+/// Returns a [`Reader`] that manages the reading process.
+pub fn exec_tail(
+    kubectl_command: &str,
+    namespace: &str,
+    resource: &str,
+    file: &str,
+) -> Result<Reader> {
+    let mut command = Command::new(kubectl_command);
+
+    command.stdin(Stdio::null()).stderr(Stdio::inherit());
+
+    command.arg("exec");
+    command.arg("-n").arg(namespace);
+    command.arg(resource);
+    command.arg("--");
+    command.arg("tail");
+    command.arg("--follow=name");
+    command.arg("--retry");
+    command.arg(file);
+
+    let reader = Reader::spawn(command)?;
+    Ok(reader)
+}
diff --git a/lib/k8s-test-framework/src/framework.rs b/lib/k8s-test-framework/src/framework.rs
new file mode 100644
index 0000000000000..1e1c099fff500
--- /dev/null
+++ b/lib/k8s-test-framework/src/framework.rs
@@ -0,0 +1,117 @@
+//! The test framework main entry point.
+
+use super::{
+    exec_tail, log_lookup, namespace, test_pod, up_down, vector, wait_for_resource,
+    wait_for_rollout, Interface, Reader, Result,
+};
+
+/// Framework wraps the interface to the system with an easy-to-use rust API
+/// optimized for implementing test cases.
+#[derive(Debug)]
+pub struct Framework {
+    interface: Interface,
+}
+
+impl Framework {
+    /// Create a new [`Framework`] powered by the passed interface.
+    pub fn new(interface: Interface) -> Self {
+        Self { interface }
+    }
+
+    /// Deploy `vector` into a cluster.
+    pub async fn vector(
+        &self,
+        namespace: &str,
+        custom_resource: &str,
+    ) -> Result<up_down::Manager<vector::CommandBuilder>> {
+        let mut manager = vector::manager(
+            self.interface.deploy_vector_command.as_str(),
+            namespace,
+            custom_resource,
+        )?;
+        manager.up().await?;
+        Ok(manager)
+    }
+
+    /// Create a new namespace.
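+    /// A typical call from a test looks like
+    /// `framework.namespace("test-vector-test-pod").await?` (an illustrative
+    /// namespace name).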
+    pub async fn namespace(
+        &self,
+        namespace: &str,
+    ) -> Result<up_down::Manager<namespace::CommandBuilder>> {
+        let mut manager = namespace::manager(&self.interface.kubectl_command, namespace);
+        manager.up().await?;
+        Ok(manager)
+    }
+
+    /// Create a new test `Pod`.
+    pub async fn test_pod(
+        &self,
+        config: test_pod::Config,
+    ) -> Result<up_down::Manager<test_pod::CommandBuilder>> {
+        let mut manager = test_pod::manager(&self.interface.kubectl_command, config);
+        manager.up().await?;
+        Ok(manager)
+    }
+
+    /// Initialize log lookup for a particular `resource` in a particular
+    /// `namespace`.
+    pub fn logs(&self, namespace: &str, resource: &str) -> Result<Reader> {
+        log_lookup(&self.interface.kubectl_command, namespace, resource)
+    }
+
+    /// Exec a `tail -f` command reading the specified `file` within
+    /// a `Container` in a `Pod` of a specified `resource` at the specified
+    /// `namespace`.
+    pub fn exec_tail(&self, namespace: &str, resource: &str, file: &str) -> Result<Reader> {
+        exec_tail(&self.interface.kubectl_command, namespace, resource, file)
+    }
+
+    /// Wait for a set of `resources` in a specified `namespace` to achieve
+    /// the `wait_for` state.
+    /// Use `extra` to pass additional arguments to `kubectl`.
+    pub async fn wait<'a>(
+        &self,
+        namespace: &str,
+        resources: impl IntoIterator<Item = &'a str>,
+        wait_for: wait_for_resource::WaitFor<&'_ str>,
+        extra: impl IntoIterator<Item = &'a str>,
+    ) -> Result<()> {
+        wait_for_resource::namespace(
+            &self.interface.kubectl_command,
+            namespace,
+            resources,
+            wait_for,
+            extra,
+        )
+        .await
+    }
+
+    /// Wait for a set of `resources` in any namespace to achieve the
+    /// `wait_for` state.
+    /// Use `extra` to pass additional arguments to `kubectl`.
+    pub async fn wait_all_namespaces<'a>(
+        &self,
+        resources: impl IntoIterator<Item = &'a str>,
+        wait_for: wait_for_resource::WaitFor<&'_ str>,
+        extra: impl IntoIterator<Item = &'a str>,
+    ) -> Result<()> {
+        wait_for_resource::all_namespaces(
+            &self.interface.kubectl_command,
+            resources,
+            wait_for,
+            extra,
+        )
+        .await
+    }
+
+    /// Wait for a rollout of a `resource` to complete.
+    /// Use `extra` to pass additional arguments to `kubectl`.
+    pub async fn wait_for_rollout<'a>(
+        &self,
+        namespace: &str,
+        resource: &str,
+        extra: impl IntoIterator<Item = &'a str>,
+    ) -> Result<()> {
+        wait_for_rollout::run(&self.interface.kubectl_command, namespace, resource, extra).await
+    }
+}
diff --git a/lib/k8s-test-framework/src/interface.rs b/lib/k8s-test-framework/src/interface.rs
new file mode 100644
index 0000000000000..0b305e5b99f59
--- /dev/null
+++ b/lib/k8s-test-framework/src/interface.rs
@@ -0,0 +1,27 @@
+//! An interface into the system.
+
+use std::env;
+
+/// An interface between the test framework and external CLI commands and test
+/// utilities.
+#[derive(Debug)]
+pub struct Interface {
+    /// A command used to deploy `vector` into the kubernetes cluster and
+    /// delete it from there.
+    pub deploy_vector_command: String,
+
+    /// A `kubectl` command used for generic cluster interaction.
+    pub kubectl_command: String,
+}
+
+impl Interface {
+    /// Create a new [`Interface`] instance with the parameters obtained from
+    /// the process environment.
+    pub fn from_env() -> Option<Self> {
+        Some(Self {
+            deploy_vector_command: env::var("KUBE_TEST_DEPLOY_COMMAND").ok()?,
+            kubectl_command: env::var("VECTOR_TEST_KUBECTL")
+                .unwrap_or_else(|_| "kubectl".to_owned()),
+        })
+    }
+}
diff --git a/lib/k8s-test-framework/src/lib.rs b/lib/k8s-test-framework/src/lib.rs
new file mode 100644
index 0000000000000..eafdd10af9215
--- /dev/null
+++ b/lib/k8s-test-framework/src/lib.rs
@@ -0,0 +1,42 @@
+//! Kubernetes test framework.
+//!
+//! The main goal of the design of this test framework is to wire the testing
+//! of kubernetes components through the same tools that are available to the
+//! developer as executable commands, rather than using a rust interface to
+//! talk to the k8s cluster directly.
+//! This enables very trivial troubleshooting and allows us to use the same
+//! deployment mechanisms that we use for production - effectively giving us
+//! the opportunity to test e2e: not just the code layer, but also the
+//! deployment configuration.
+
+#![deny(
+    missing_debug_implementations,
+    missing_copy_implementations,
+    missing_docs
+)]
+
+mod exec_tail;
+pub mod framework;
+pub mod interface;
+mod lock;
+mod log_lookup;
+pub mod namespace;
+mod reader;
+mod resource_file;
+pub mod test_pod;
+mod up_down;
+mod util;
+pub mod vector;
+pub mod wait_for_resource;
+pub mod wait_for_rollout;
+
+// Re-export some units for trivial accessibility.
+
+use exec_tail::exec_tail;
+pub use framework::Framework;
+pub use interface::Interface;
+pub use lock::lock;
+use log_lookup::log_lookup;
+pub use reader::Reader;
+
+type Result<T = ()> = std::result::Result<T, Box<dyn std::error::Error>>;
diff --git a/lib/k8s-test-framework/src/lock.rs b/lib/k8s-test-framework/src/lock.rs
new file mode 100644
index 0000000000000..ecec583c33039
--- /dev/null
+++ b/lib/k8s-test-framework/src/lock.rs
@@ -0,0 +1,15 @@
+use once_cell::sync::OnceCell;
+use std::sync::{Mutex, MutexGuard};
+
+/// A shared lock to use commonly among the tests.
+/// The goal is to guarantee that only one test is executing concurrently,
+/// since tests use a shared resource - a k8s cluster - and will conflict with
+/// each other unless they're executing sequentially.
+pub fn lock() -> MutexGuard<'static, ()> {
+    static INSTANCE: OnceCell<Mutex<()>> = OnceCell::new();
+    match INSTANCE.get_or_init(|| Mutex::new(())).lock() {
+        Ok(guard) => guard,
+        // Ignore poison error.
+        Err(err) => err.into_inner(),
+    }
+}
diff --git a/lib/k8s-test-framework/src/log_lookup.rs b/lib/k8s-test-framework/src/log_lookup.rs
new file mode 100644
index 0000000000000..6906e5b7058fb
--- /dev/null
+++ b/lib/k8s-test-framework/src/log_lookup.rs
@@ -0,0 +1,22 @@
+//! Perform a log lookup.
+
+use super::{Reader, Result};
+use std::process::Stdio;
+use tokio::process::Command;
+
+/// Initiate a log lookup (`kubectl logs`) with the specified
+/// `kubectl_command` for the specified `resource` at the specified
+/// `namespace`.
+/// Returns a [`Reader`] that manages the reading process.
+pub fn log_lookup(kubectl_command: &str, namespace: &str, resource: &str) -> Result<Reader> {
+    let mut command = Command::new(kubectl_command);
+
+    command.stdin(Stdio::null()).stderr(Stdio::inherit());
+
+    command.arg("logs");
+    command.arg("-f");
+    command.arg("-n").arg(namespace);
+    command.arg(resource);
+
+    let reader = Reader::spawn(command)?;
+    Ok(reader)
+}
diff --git a/lib/k8s-test-framework/src/namespace.rs b/lib/k8s-test-framework/src/namespace.rs
new file mode 100644
index 0000000000000..8143b674f1e07
--- /dev/null
+++ b/lib/k8s-test-framework/src/namespace.rs
@@ -0,0 +1,35 @@
+//! Manage namespaces.
+
+use crate::up_down;
+use std::process::{Command, Stdio};
+
+/// Parameters required to build a `kubectl` command to manage the namespace.
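+/// For `Up` the built command is, roughly, `kubectl create namespace
+/// <namespace>`; for `Down` it is `kubectl delete namespace <namespace>`.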
+#[derive(Debug)]
+pub struct CommandBuilder {
+    kubectl_command: String,
+    namespace: String,
+}
+
+impl up_down::CommandBuilder for CommandBuilder {
+    fn build(&self, command_to_build: up_down::CommandToBuild) -> Command {
+        let mut command = Command::new(&self.kubectl_command);
+        command
+            .arg(match command_to_build {
+                up_down::CommandToBuild::Up => "create",
+                up_down::CommandToBuild::Down => "delete",
+            })
+            .arg("namespace")
+            .arg(&self.namespace)
+            .stdin(Stdio::null());
+        command
+    }
+}
+
+/// Create a new [`up_down::Manager`] for the specified `namespace` and using
+/// the specified `kubectl_command`.
+pub fn manager(kubectl_command: &str, namespace: &str) -> up_down::Manager<CommandBuilder> {
+    up_down::Manager::new(CommandBuilder {
+        kubectl_command: kubectl_command.to_owned(),
+        namespace: namespace.to_owned(),
+    })
+}
diff --git a/lib/k8s-test-framework/src/reader.rs b/lib/k8s-test-framework/src/reader.rs
new file mode 100644
index 0000000000000..9d8c57338013e
--- /dev/null
+++ b/lib/k8s-test-framework/src/reader.rs
@@ -0,0 +1,123 @@
+//! Read process output.
+
+use std::process::{ExitStatus, Stdio};
+use tokio::io::{AsyncBufReadExt, BufReader};
+use tokio::process::{Child, ChildStdout, Command};
+
+/// Keeps track of the command invocation, providing the interface to
+/// read the output and send a termination signal.
+#[derive(Debug)]
+pub struct Reader {
+    child: Child,
+    reader: BufReader<ChildStdout>,
+}
+
+impl Reader {
+    /// Spawn a command and provide a [`Reader`].
+    pub fn spawn(mut command: Command) -> std::io::Result<Self> {
+        Self::prepare_stdout(&mut command);
+        let child = command.spawn()?;
+        Ok(Self::new(child))
+    }
+
+    fn prepare_stdout(command: &mut Command) {
+        command.stdout(Stdio::piped());
+    }
+
+    fn new(mut child: Child) -> Self {
+        let stdout = child.stdout.take().unwrap();
+        let reader = BufReader::new(stdout);
+        Reader { child, reader }
+    }
+
+    /// Wait for the underlying process to exit and return the exit status.
+    pub async fn wait(&mut self) -> std::io::Result<ExitStatus> {
+        (&mut self.child).await
+    }
+
+    /// Send a termination signal to the underlying process.
+    pub fn kill(&mut self) -> std::io::Result<()> {
+        self.child.kill()
+    }
+
+    /// Read one line from the stdout of the underlying process.
+    pub async fn read_line(&mut self) -> Option<String> {
+        let mut s = String::new();
+        let result = self.reader.read_line(&mut s).await;
+        match result {
+            Ok(0) => None,
+            Ok(_) => Some(s),
+            Err(err) => panic!("{}", err),
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    async fn collect(reader: &mut Reader) -> Vec<String> {
+        let mut list = Vec::new();
+        while let Some(line) = reader.read_line().await {
+            list.push(line)
+        }
+        list
+    }
+
+    #[tokio::test]
+    async fn test_reader_finite() {
+        let mut command = Command::new("echo");
+        command.arg("test");
+
+        let mut reader = Reader::spawn(command).expect("unable to spawn");
+
+        // Collect all lines, expect the stream to finish.
+        let lines = collect(&mut reader).await;
+        // Assert we got all the lines we expected.
+        assert_eq!(lines, vec!["test\n".to_owned()]);
+
+        // Ensure wait doesn't fail, and that the exit status is success.
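+/// `Up` maps to `kubectl create -f <pod resource file>`, and `Down` maps to
+/// `kubectl delete -f <pod resource file>`.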
+#[derive(Debug)]
+pub struct CommandBuilder {
+    kubectl_command: String,
+    config: Config,
+}
+
+impl up_down::CommandBuilder for CommandBuilder {
+    fn build(&self, command_to_build: up_down::CommandToBuild) -> Command {
+        let mut command = Command::new(&self.kubectl_command);
+        command
+            .arg(match command_to_build {
+                up_down::CommandToBuild::Up => "create",
+                up_down::CommandToBuild::Down => "delete",
+            })
+            .arg("-f")
+            .arg(self.config.test_pod_resource_file.path())
+            .stdin(Stdio::null());
+        command
+    }
+}
+
+/// Create a new [`up_down::Manager`] with the specified `config` and using
+/// the specified `kubectl_command`.
+pub fn manager(kubectl_command: &str, config: Config) -> up_down::Manager<CommandBuilder> {
+    up_down::Manager::new(CommandBuilder {
+        kubectl_command: kubectl_command.to_owned(),
+        config,
+    })
+}
diff --git a/lib/k8s-test-framework/src/up_down.rs b/lib/k8s-test-framework/src/up_down.rs
new file mode 100644
index 0000000000000..38c2a441ef3ee
--- /dev/null
+++ b/lib/k8s-test-framework/src/up_down.rs
@@ -0,0 +1,79 @@
+use super::Result;
+use crate::util::{run_command, run_command_blocking};
+use std::process::Command;
+
+#[derive(Debug, Copy, Clone, Eq, PartialEq)]
+pub enum CommandToBuild {
+    Up,
+    Down,
+}
+
+pub trait CommandBuilder {
+    fn build(&self, command_to_build: CommandToBuild) -> Command;
+}
+
+#[derive(Debug)]
+pub struct Manager<B>
+where
+    B: CommandBuilder,
+{
+    command_builder: B,
+    needs_drop: bool,
+}
+
+impl<B> Manager<B>
+where
+    B: CommandBuilder,
+{
+    pub fn new(command_builder: B) -> Self {
+        Self {
+            command_builder,
+            needs_drop: false,
+        }
+    }
+
+    pub async fn up(&mut self) -> Result<()> {
+        self.needs_drop = true;
+        self.exec(CommandToBuild::Up).await
+    }
+
+    pub async fn down(&mut self) -> Result<()> {
+        self.needs_drop = false;
+        self.exec(CommandToBuild::Down).await
+    }
+
+    pub fn up_blocking(&mut self) -> Result<()> {
+        self.needs_drop = true;
+        self.exec_blocking(CommandToBuild::Up)
+    }
+
+    pub fn down_blocking(&mut self) -> Result<()> {
+        self.needs_drop = false;
+        self.exec_blocking(CommandToBuild::Down)
+    }
+
+    fn build(&self, command_to_build: CommandToBuild) -> Command {
+        self.command_builder.build(command_to_build)
+    }
+
+    async fn exec(&self, command_to_build: CommandToBuild) -> Result<()> {
+        let command = self.build(command_to_build);
+        run_command(tokio::process::Command::from(command)).await
+    }
+
+    fn exec_blocking(&self, command_to_build: CommandToBuild) -> Result<()> {
+        let command = self.build(command_to_build);
+        run_command_blocking(command)
+    }
+}
+
+impl<B> Drop for Manager<B>
+where
+    B: CommandBuilder,
+{
+    fn drop(&mut self) {
+        if self.needs_drop {
+            self.down_blocking().expect("turndown failed");
+        }
+    }
+}
diff --git a/lib/k8s-test-framework/src/util.rs b/lib/k8s-test-framework/src/util.rs
new file mode 100644
index 0000000000000..87c53585589b8
--- /dev/null
+++ b/lib/k8s-test-framework/src/util.rs
@@ -0,0 +1,18 @@
+use crate::Result;
+
+pub async fn run_command(mut command: tokio::process::Command) -> Result<()> {
+    let exit_status = command.spawn()?.await?;
+    if !exit_status.success() {
+        return Err(format!("exec failed: {:?}", command).into());
+    }
+    Ok(())
+}
+
+pub fn run_command_blocking(mut command: std::process::Command) -> Result<()> {
+    let mut child = command.spawn()?;
+    let exit_status = child.wait()?;
+    if !exit_status.success() {
+        return Err(format!("exec failed: {:?}", command).into());
+    }
+    Ok(())
+}
diff --git a/lib/k8s-test-framework/src/vector.rs b/lib/k8s-test-framework/src/vector.rs
new file mode 100644
index 0000000000000..41cf1cf1bf7ae
--- /dev/null
+++ b/lib/k8s-test-framework/src/vector.rs
@@ -0,0 +1,48 @@
+//! Manage Vector.
+
+use super::{resource_file::ResourceFile, Result};
+use crate::up_down;
+use std::process::{Command, Stdio};
+
+/// Parameters required to build a `kubectl` command to manage Vector in the
+/// Kubernetes cluster.
+#[derive(Debug)]
+pub struct CommandBuilder {
+    interface_command: String,
+    namespace: String,
+    custom_resource_file: ResourceFile,
+}
+
+impl up_down::CommandBuilder for CommandBuilder {
+    fn build(&self, command_to_build: up_down::CommandToBuild) -> Command {
+        let mut command = Command::new(&self.interface_command);
+        command
+            .arg(match command_to_build {
+                up_down::CommandToBuild::Up => "up",
+                up_down::CommandToBuild::Down => "down",
+            })
+            .arg(&self.namespace)
+            .env(
+                "CUSTOM_RESOURCE_CONIFGS_FILE",
+                self.custom_resource_file.path(),
+            )
+            .stdin(Stdio::null());
+        command
+    }
+}
+
+/// Takes care of deploying Vector into the Kubernetes cluster.
+///
+/// Manages the config file secret accordingly.
+pub fn manager(
+    interface_command: &str,
+    namespace: &str,
+    custom_resource: &str,
+) -> Result<up_down::Manager<CommandBuilder>> {
+    let custom_resource_file = ResourceFile::new(custom_resource)?;
+    Ok(up_down::Manager::new(CommandBuilder {
+        interface_command: interface_command.to_owned(),
+        namespace: namespace.to_owned(),
+        custom_resource_file,
+    }))
+}
diff --git a/lib/k8s-test-framework/src/wait_for_resource.rs b/lib/k8s-test-framework/src/wait_for_resource.rs
new file mode 100644
index 0000000000000..944872cfe4ef7
--- /dev/null
+++ b/lib/k8s-test-framework/src/wait_for_resource.rs
@@ -0,0 +1,92 @@
+//! Wait for a resource to reach a certain condition.
+
+use super::Result;
+use crate::util::run_command;
+use std::{ffi::OsStr, process::Stdio};
+use tokio::process::Command;
+
+/// Specify what condition to wait for.
+#[derive(Debug)]
+pub enum WaitFor<C>
+where
+    C: std::fmt::Display,
+{
+    /// Wait for resource deletion.
+    Delete,
+    /// Wait for the specified condition.
+    Condition(C),
+}
+
+/// Wait for a set of `resources` within a `namespace` to reach a `wait_for`
+/// condition.
+/// Use `extra` to pass additional arguments to `kubectl`.
+pub async fn namespace<Cmd, NS, R, Cond, Ex>(
+    kubectl_command: Cmd,
+    namespace: NS,
+    resources: impl IntoIterator<Item = R>,
+    wait_for: WaitFor<Cond>,
+    extra: impl IntoIterator<Item = Ex>,
+) -> Result<()>
+where
+    Cmd: AsRef<OsStr>,
+    NS: AsRef<OsStr>,
+    R: AsRef<OsStr>,
+    Cond: std::fmt::Display,
+    Ex: AsRef<OsStr>,
+{
+    let mut command = prepare_base_command(kubectl_command, resources, wait_for, extra);
+    command.arg("-n").arg(namespace);
+    run_command(command).await
+}
+
+/// Wait for a set of `resources` at any namespace to reach a `wait_for`
+/// condition.
+/// Use `extra` to pass additional arguments to `kubectl`.
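+/// Apart from the namespace selection, this behaves like [`namespace`]: it
+/// invokes `kubectl wait` with `--all-namespaces=true` instead of
+/// `-n <namespace>`.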
+pub async fn all_namespaces<Cmd, R, Cond, Ex>(
+    kubectl_command: Cmd,
+    resources: impl IntoIterator<Item = R>,
+    wait_for: WaitFor<Cond>,
+    extra: impl IntoIterator<Item = Ex>,
+) -> Result<()>
+where
+    Cmd: AsRef<OsStr>,
+    R: AsRef<OsStr>,
+    Cond: std::fmt::Display,
+    Ex: AsRef<OsStr>,
+{
+    let mut command = prepare_base_command(kubectl_command, resources, wait_for, extra);
+    command.arg("--all-namespaces=true");
+    run_command(command).await
+}
+
+fn prepare_base_command<Cmd, R, Cond, Ex>(
+    kubectl_command: Cmd,
+    resources: impl IntoIterator<Item = R>,
+    wait_for: WaitFor<Cond>,
+    extra: impl IntoIterator<Item = Ex>,
+) -> Command
+where
+    Cmd: AsRef<OsStr>,
+    R: AsRef<OsStr>,
+    Cond: std::fmt::Display,
+    Ex: AsRef<OsStr>,
+{
+    let mut command = Command::new(kubectl_command);
+
+    command
+        .stdin(Stdio::null())
+        .stdout(Stdio::inherit())
+        .stderr(Stdio::inherit());
+
+    command.arg("wait");
+    command.args(resources);
+
+    command.arg("--for");
+    match wait_for {
+        WaitFor::Delete => command.arg("delete"),
+        WaitFor::Condition(cond) => command.arg(format!("condition={}", cond)),
+    };
+
+    command.args(extra);
+    command
+}
diff --git a/lib/k8s-test-framework/src/wait_for_rollout.rs b/lib/k8s-test-framework/src/wait_for_rollout.rs
new file mode 100644
index 0000000000000..a017232afe9ca
--- /dev/null
+++ b/lib/k8s-test-framework/src/wait_for_rollout.rs
@@ -0,0 +1,37 @@
+//! Wait for a resource rollout to complete.
+
+use super::Result;
+use crate::util::run_command;
+use std::{ffi::OsStr, process::Stdio};
+use tokio::process::Command;
+
+/// Wait for a rollout of a `resource` within a `namespace` to complete via
+/// the specified `kubectl_command`.
+/// Use `extra` to pass additional arguments to `kubectl`.
+pub async fn run<Cmd, NS, R, Ex>(
+    kubectl_command: Cmd,
+    namespace: NS,
+    resource: R,
+    extra: impl IntoIterator<Item = Ex>,
+) -> Result<()>
+where
+    Cmd: AsRef<OsStr>,
+    NS: AsRef<OsStr>,
+    R: AsRef<OsStr>,
+    Ex: AsRef<OsStr>,
+{
+    let mut command = Command::new(kubectl_command);
+
+    command
+        .stdin(Stdio::null())
+        .stdout(Stdio::inherit())
+        .stderr(Stdio::inherit());
+
+    command.arg("rollout").arg("status");
+    command.arg("-n").arg(namespace);
+    command.arg(resource);
+    command.args(extra);
+
+    run_command(command).await?;
+    Ok(())
+}
diff --git a/scripts/copy-docker-image-to-minikube.sh b/scripts/copy-docker-image-to-minikube.sh
deleted file mode 100755
index 0efe834b69e24..0000000000000
--- a/scripts/copy-docker-image-to-minikube.sh
+++ /dev/null
@@ -1,38 +0,0 @@
-#!/usr/bin/env bash
-set -euo pipefail
-
-# copy-docker-image-to-minikube.sh
-#
-# SUMMARY
-#
-#   Copies a list of images from the host docker engine to the minikube docker
-#   engine via save/load commands.
-#
-#   Requires minikube and docker to be available.
-#
-# USAGE
-#
-#   copy-docker-image-to-minikube.sh timberio/vector:latest
-
-# Image to copy.
-IMAGES=("${@:?"Specify the images to copy in the arguments list"}")
-
-# Prepare temp dir to store the images archive.
-TD="$(mktemp -d)"
-IMAGES_ARCHIVE="$TD/images.tar.gz"
-
-# Save images.
-docker save "${IMAGES[@]}" | gzip >"$IMAGES_ARCHIVE"
-
-# Start a subshell to preserve the env state.
-(
-  # Switch to minikube docker.
-  # shellcheck source=minikube-docker-env.sh disable=SC1091
-  . scripts/minikube-docker-env.sh
-
-  # Load images.
-  docker load -i "$IMAGES_ARCHIVE"
-)
-
-# Clear temp dir.
-rm -rf "$TD"
diff --git a/scripts/deploy-kubernetes-test.sh b/scripts/deploy-kubernetes-test.sh
index f7e9a73755bc4..854604a80c472 100755
--- a/scripts/deploy-kubernetes-test.sh
+++ b/scripts/deploy-kubernetes-test.sh
@@ -8,7 +8,7 @@ set -euo pipefail
 # Deploys Vector into Kubernetes for testing purposes.
 # Uses the same installation method our users would use.
 #
-# This script impements cli interface required by the kubernetes integration
+# This script implements the cli interface required by the kubernetes E2E
 # tests.
 #
 # USAGE
@@ -55,20 +55,21 @@ up() {
     $VECTOR_TEST_KUBECTL create --namespace "$NAMESPACE" -f "$CUSTOM_RESOURCE_CONIFGS_FILE"
   fi
 
-  sed "s|timerio/vector:latest|$CONTAINER_IMAGE|" < "distribution/kubernetes/vector-namespaced.yaml" \
+  sed 's|image: timberio/vector:[^$]*$'"|image: $CONTAINER_IMAGE|" < "distribution/kubernetes/vector-namespaced.yaml" \
     | $VECTOR_TEST_KUBECTL create --namespace "$NAMESPACE" -f -
 }
 
 down() {
-  $VECTOR_TEST_KUBECTL delete --namespace "$NAMESPACE" -f - < "distribution/kubernetes/vector-namespaced.yaml"
+  # A workaround for `kubectl` from a `snap` package.
+  cat < "distribution/kubernetes/vector-namespaced.yaml" | $VECTOR_TEST_KUBECTL delete --namespace "$NAMESPACE" -f -
 
   if [[ -n "$CUSTOM_RESOURCE_CONIFGS_FILE" ]]; then
     $VECTOR_TEST_KUBECTL delete --namespace "$NAMESPACE" -f "$CUSTOM_RESOURCE_CONIFGS_FILE"
   fi
 
-  $VECTOR_TEST_KUBECTL delete namespace "$NAMESPACE"
-
   templated-config-global | $VECTOR_TEST_KUBECTL delete -f -
+
+  $VECTOR_TEST_KUBECTL delete namespace "$NAMESPACE"
 }
 
 case "$COMMAND" in
diff --git a/scripts/minikube-docker-env.sh b/scripts/minikube-docker-env.sh
deleted file mode 100644
index fc2ffdd8ab9f0..0000000000000
--- a/scripts/minikube-docker-env.sh
+++ /dev/null
@@ -1,9 +0,0 @@
-#!/usr/bin/env bash
-set -euo pipefail
-
-if ! COMMANDS="$(minikube --shell bash docker-env)"; then
-  echo "Unable to obtain docker env from minikube; is minikube started?" >&2
-  exit 7
-fi
-
-eval "$COMMANDS"
diff --git a/scripts/skaffold-dockerignore.sh b/scripts/skaffold-dockerignore.sh
new file mode 100755
index 0000000000000..a82f73af27c66
--- /dev/null
+++ b/scripts/skaffold-dockerignore.sh
@@ -0,0 +1,15 @@
+#!/usr/bin/env bash
+set -euo pipefail
+cd "$(dirname "${BASH_SOURCE[0]}")/.."
+
+# skaffold-dockerignore.sh
+#
+# SUMMARY
+#
+#   Prepare .dockerignore for the skaffold docker image build so we don't send
+#   the whole `target/debug` dir to docker as the context.
+
+cat <<EOF >target/debug/.dockerignore
+**/*
+!vector
+EOF
diff --git a/scripts/skaffold.sh b/scripts/skaffold.sh
index dd16c70603a7d..41e359fb19c93 100755
--- a/scripts/skaffold.sh
+++ b/scripts/skaffold.sh
@@ -7,10 +7,7 @@ cargo build
 
 # Prepare .dockerignore so we don't send the whole dir to the docker as the
 # context.
-cat <<EOF >target/debug/.dockerignore
-**/*
-!vector
-EOF
+scripts/skaffold-dockerignore.sh
 
 # Watch for changes in he background and rebuild the vector binary.
 cargo watch -x build &
diff --git a/scripts/test-e2e-kubernetes.sh b/scripts/test-e2e-kubernetes.sh
new file mode 100755
index 0000000000000..ea29395b14e85
--- /dev/null
+++ b/scripts/test-e2e-kubernetes.sh
@@ -0,0 +1,142 @@
+#!/usr/bin/env bash
+set -euo pipefail
+
+# test-e2e-kubernetes.sh
+#
+# SUMMARY
+#
+#   Run E2E tests for Kubernetes.
+
+random-string() {
+  local CHARS="abcdefghijklmnopqrstuvwxyz0123456789"
+  # shellcheck disable=SC2034
+  for i in {1..8}; do
+    echo -n "${CHARS:RANDOM%${#CHARS}:1}"
+  done
+  echo
+}
+
+# Detect if the current kubectl context is `minikube`.
+is_kubectl_context_minikube() {
+  [[ "$(kubectl config current-context || true)" == "minikube" ]]
+}
+
+# Whether to use `minikube cache` to pass the image to the k8s cluster.
+# After we build the vector docker image, instead of pushing it to the remote
+# repo, we'll be using `minikube cache` to make the image available to the
+# cluster.
+# This effectively eliminates the requirement to have a docker registry, but
+# it requires that we run against a minikube cluster.
+is_minikube_cache_enabled() {
+  local MODE="${USE_MINIKUBE_CACHE:-"auto"}"
+  if [[ "$MODE" == "auto" ]]; then
+    if is_kubectl_context_minikube; then
+      echo "Note: detected minikube kubectl context, using minikube cache" >&2
+      return 0
+    else
+      echo "Note: detected non-minikube kubectl context, docker repo is required" >&2
+      return 1
+    fi
+  else
+    [[ "$MODE" == "true" ]]
+  fi
+}
+
+# Build a docker image if it wasn't provided.
+if [[ -z "${CONTAINER_IMAGE:-}" ]]; then
+  # Require a repo to put the container image at.
+  #
+  # Hint #1: you can use `scripts/start-docker-registry.sh`, but it requires
+  # manually preparing the environment to allow insecure registries, and it
+  # also may not work if your k8s cluster doesn't have network connectivity
+  # to the registry.
+  #
+  # Hint #2: if using minikube, set `USE_MINIKUBE_CACHE` to `true` and you
+  # can omit the `CONTAINER_IMAGE_REPO`.
+  #
+  if is_minikube_cache_enabled; then
+    # If `minikube cache` will be used, push access to the docker repo
+    # is not required, and we can provide a default value for the
+    # `CONTAINER_IMAGE_REPO`.
+    # CRIO prefixes the image name with `localhost/` when it's passed via
+    # `minikube cache`, so, to work around that issue, our default value uses
+    # a repo name that already contains that prefix, such that the effective
+    # image name on the minikube node matches the one expected in tests.
+    CONTAINER_IMAGE_REPO="${CONTAINER_IMAGE_REPO:-"localhost/vector-test"}"
+  else
+    # If not using `minikube cache`, it's mandatory to have push access to the
+    # repo, so we don't offer a default value and explicitly require the user
+    # to specify a `CONTAINER_IMAGE_REPO`.
+    CONTAINER_IMAGE_REPO="${CONTAINER_IMAGE_REPO:?"You have to specify CONTAINER_IMAGE_REPO to upload the test image to."}"
+  fi
+
+  # Assign a default test run ID if none is provided by the user.
+  TEST_RUN_ID="${TEST_RUN_ID:-"$(date +%s)-$(random-string)"}"
+
+  if [[ "${QUICK_BUILD:-"false"}" == "true" ]]; then
+    # Build in debug mode.
+    cargo build
+
+    # Prepare test image parameters.
+    VERSION_TAG="test-$TEST_RUN_ID"
+
+    # Prepare the container image for the deployment command and docker build.
+    CONTAINER_IMAGE="$CONTAINER_IMAGE_REPO:$VERSION_TAG-debug"
+
+    # Build the docker image.
+    scripts/skaffold-dockerignore.sh
+    docker build --tag "$CONTAINER_IMAGE" -f skaffold/docker/Dockerfile target/debug
+  else
+    # Package a .deb file to build a docker container, unless skipped.
+    if [[ -z "${SKIP_PACKAGE_DEB:-}" ]]; then
+      make package-deb-x86_64 USE_CONTAINER="${PACKAGE_DEB_USE_CONTAINER:-"docker"}"
+    fi
+
+    # Prepare test image parameters.
+    VERSION_TAG="test-$TEST_RUN_ID"
+    BASE_TAG="debian"
+
+    # Build the docker image with Vector - the same way it's done for
+    # releases. Don't do the push - we'll handle it later.
+    REPO="$CONTAINER_IMAGE_REPO" \
+      CHANNEL="test" \
+      BASE="$BASE_TAG" \
+      TAG="$VERSION_TAG" \
+      PUSH="" \
+      scripts/build-docker.sh
+
+    # Prepare the container image for the deployment command.
+    CONTAINER_IMAGE="$CONTAINER_IMAGE_REPO:$VERSION_TAG-$BASE_TAG"
+  fi
+fi
+
+if [[ -z "${SKIP_CONTAINER_IMAGE_PUBLISHING:-}" ]]; then
+  # Make the container image accessible to the k8s cluster.
+  if is_minikube_cache_enabled; then
+    minikube cache add "$CONTAINER_IMAGE"
+  else
+    docker push "$CONTAINER_IMAGE"
+  fi
+fi
+
+# Export the container image to be accessible from the deployment command.
+export CONTAINER_IMAGE
+
+# Set the deployment command for the E2E tests.
+export KUBE_TEST_DEPLOY_COMMAND="scripts/deploy-kubernetes-test.sh"
+
+# Prepare args.
+CARGO_TEST_ARGS=()
+if [[ -n "${SCOPE:-}" && "$SCOPE" != '""' ]]; then
+  CARGO_TEST_ARGS+=("$SCOPE")
+fi
+
+# Run the tests.
+cargo test \
+  --test kubernetes-e2e \
+  --no-default-features \
+  --features kubernetes-e2e-tests \
+  -- \
+  --nocapture \
+  --test-threads 1 \
+  "${CARGO_TEST_ARGS[@]}"
diff --git a/scripts/test-integration-kubernetes.sh b/scripts/test-integration-kubernetes.sh
deleted file mode 100755
index 329154d95fdec..0000000000000
--- a/scripts/test-integration-kubernetes.sh
+++ /dev/null
@@ -1,75 +0,0 @@
-#!/usr/bin/env bash
-set -euo pipefail
-
-# test-integration-kubernetes.sh
-#
-# SUMMARY
-#
-#   Run integration tests for Kubernetes components only.
-
-random-string() {
-  local CHARS="abcdefghijklmnopqrstuvwxyz0123456789"
-  # shellcheck disable=SC2034
-  for i in {1..8}; do
-    echo -n "${CHARS:RANDOM%${#CHARS}:1}"
-  done
-  echo
-}
-
-# Require a repo to put the container image at.
-#
-# Hint #1: you can use `scripts/start-docker-registry.sh`, but it requires
-# manually preparing the environment to allow insecure registries, and it can
-# also not work if you k8s cluster doesn't have network connectivity to the
-# registry.
-#
-# Hint #2: if using with minikube, set `USE_MINIKUBE_DOCKER` to `true` and use
-# any value for `CONTAINER_IMAGE_REPO` (for instance, `vector-test` will do).
-#
-CONTAINER_IMAGE_REPO="${CONTAINER_IMAGE_REPO:?"You have to specify CONTAINER_IMAGE_REPO to upload the test image to."}"
-
-# Whether to use minikube docker.
-# After we build vector docker image, instead of pushing to the remote repo,
-# we'll be exporting it to a file after (from the "host" docker engine), and
-# then importing that file into the minikube in-cluster docker engine, that
-# nodes have access to.
-# This effectively eliminates the requirement to have a docker registry, but
-# it requires that we run against minikube cluster.
-USE_MINIKUBE_DOCKER="${USE_MINIKUBE_DOCKER:-"false"}"
-
-# Assign a default test run ID if none is provided by the user.
-TEST_RUN_ID="${TEST_RUN_ID:-"test-$(date +%s)-$(random-string)"}"
-
-if [[ -z "${SKIP_PACKAGE_DEB:-}" ]]; then
-  make package-deb-x86_64 USE_CONTAINER="${PACKAGE_DEB_USE_CONTAINER:-"docker"}"
-fi
-
-# Prepare test image parameters.
-VERSION_TAG="test-$TEST_RUN_ID"
-BASE_TAG="debian"
-
-# Build docker image with Vector - the same way it's done for releses. Don't
-# do the push - we'll handle it later.
-REPO="$CONTAINER_IMAGE_REPO" \
-  CHANNEL="test" \
-  BASE="$BASE_TAG" \
-  TAG="$VERSION_TAG" \
-  PUSH="" \
-  scripts/build-docker.sh
-
-# Prepare the container image for the deployment command.
-export CONTAINER_IMAGE="$CONTAINER_IMAGE_REPO:$VERSION_TAG-$BASE_TAG"
-
-# Make the container image accessible to the k8s cluster.
-if [[ "$USE_MINIKUBE_DOCKER" == "true" ]]; then
-  scripts/copy-docker-image-to-minikube.sh "$CONTAINER_IMAGE"
-else
-  docker push "$CONTAINER_IMAGE"
-fi
-
-# Set the deployment command for integration tests.
-export KUBE_TEST_DEPLOY_COMMAND="scripts/deploy-kubernetes-test.sh"
-
-
-# Run the tests.
-cargo test --no-default-features --features kubernetes-integration-tests
diff --git a/skaffold/manifests/patches/env.yaml b/skaffold/manifests/patches/env.yaml
new file mode 100644
index 0000000000000..0b67aee5789f5
--- /dev/null
+++ b/skaffold/manifests/patches/env.yaml
@@ -0,0 +1,12 @@
+apiVersion: apps/v1
+kind: DaemonSet
+metadata:
+  name: vector
+spec:
+  template:
+    spec:
+      containers:
+        - name: vector
+          env:
+            - name: LOG
+              $patch: delete
diff --git a/tests/kubernetes-e2e.rs b/tests/kubernetes-e2e.rs
new file mode 100644
index 0000000000000..f9c5d1e58da4f
--- /dev/null
+++ b/tests/kubernetes-e2e.rs
@@ -0,0 +1,732 @@
+//! This test is optimized for very quick rebuilds as it doesn't use anything
+//! from the `vector` crate, and thus doesn't waste time in a tremendously long
+//! link step.
+
+use futures::{SinkExt, StreamExt};
+use k8s_openapi::{
+    api::core::v1::{Container, Pod, PodSpec},
+    apimachinery::pkg::apis::meta::v1::ObjectMeta,
+};
+use k8s_test_framework::{
+    lock, test_pod, wait_for_resource::WaitFor, Framework, Interface, Reader,
+};
+use std::collections::HashSet;
+
+const VECTOR_CONFIG: &str = r#"
+apiVersion: v1
+kind: ConfigMap
+metadata:
+  name: vector-config
+data:
+  vector.toml: |
+    [sinks.stdout]
+    type = "console"
+    inputs = ["kubernetes_logs"]
+    target = "stdout"
+    encoding = "json"
+"#;
+
+const BUSYBOX_IMAGE: &str = "busybox:1.28";
+
+fn make_framework() -> Framework {
+    let interface = Interface::from_env().expect("interface is not ready");
+    Framework::new(interface)
+}
+
+fn make_test_pod<'a>(
+    namespace: &'a str,
+    name: &'a str,
+    command: &'a str,
+    labels: impl IntoIterator<Item = (&'a str, &'a str)> + 'a,
+) -> Pod {
+    let labels: std::collections::BTreeMap<String, String> = labels
+        .into_iter()
+        .map(|(key, val)| (key.to_owned(), val.to_owned()))
+        .collect();
+    let labels = if labels.is_empty() {
+        None
+    } else {
+        Some(labels)
+    };
+    Pod {
+        metadata: ObjectMeta {
+            name: Some(name.to_owned()),
+            namespace: Some(namespace.to_owned()),
+            labels,
+            ..ObjectMeta::default()
+        },
+        spec: Some(PodSpec {
+            containers: vec![Container {
+                name: name.to_owned(),
+                image: Some(BUSYBOX_IMAGE.to_owned()),
+                command: Some(vec!["sh".to_owned()]),
+                args: Some(vec!["-c".to_owned(), command.to_owned()]),
+                ..Container::default()
+            }],
+            restart_policy: Some("Never".to_owned()),
+            ..PodSpec::default()
+        }),
+        ..Pod::default()
+    }
+}
+
+fn parse_json(s: &str) -> Result<serde_json::Value, Box<dyn std::error::Error>> {
+    Ok(serde_json::from_str(s)?)
+}
+
+fn generate_long_string(a: usize, b: usize) -> String {
+    (0..a).fold(String::new(), |mut acc, i| {
+        let istr = i.to_string();
+        for _ in 0..b {
+            acc.push_str(&istr);
+        }
+        acc
+    })
+}
+
+/// Read the first line from vector logs and assert that it matches the
+/// expected one.
+/// This allows detecting the situations where things have gone very wrong.
+async fn smoke_check_first_line(log_reader: &mut Reader) {
+    // Wait for the first line as a smoke check.
+    let first_line = log_reader
+        .read_line()
+        .await
+        .expect("unable to read first line");
+    let expected_pat = "INFO vector: Log level \"info\" is enabled.\n";
+    assert!(
+        first_line.ends_with(expected_pat),
+        "Expected a line ending with {:?} but got {:?}; vector might be malfunctioning",
+        expected_pat,
+        first_line
+    );
+}
+
+enum FlowControlCommand {
+    GoOn,
+    Terminate,
+}
+
+async fn look_for_log_line<P>(
+    log_reader: &mut Reader,
+    mut predicate: P,
+) -> Result<(), Box<dyn std::error::Error>>
+where
+    P: FnMut(serde_json::Value) -> FlowControlCommand,
+{
+    let mut lines_till_we_give_up = 10000;
+    while let Some(line) = log_reader.read_line().await {
+        println!("Got line: {:?}", line);
+
+        lines_till_we_give_up -= 1;
+        if lines_till_we_give_up <= 0 {
+            println!("Giving up");
+            log_reader.kill()?;
+            break;
+        }
+
+        if !line.starts_with("{") {
+            // This isn't a json, must be an entry from Vector's own log
+            // stream.
+            continue;
+        }
+
+        let val = parse_json(&line)?;
+
+        match predicate(val) {
+            FlowControlCommand::GoOn => {
+                // Not what we were looking for, go on.
+            }
+            FlowControlCommand::Terminate => {
+                // We are told we should stop, request that the log reader is
+                // killed.
+                // This doesn't immediately stop the reading because we want
+                // to process the pending buffers first.
+                log_reader.kill()?;
+            }
+        }
+    }
+
+    // Ensure the log reader exited.
+    log_reader.wait().await.expect("log reader wait failed");
+
+    Ok(())
+}
+
+/// This test validates that vector picks up logs at the simplest case
+/// possible - a new pod is deployed and prints to stdout, and we assert that
+/// vector picks that up.
+#[tokio::test]
+async fn simple() -> Result<(), Box<dyn std::error::Error>> {
+    let _guard = lock();
+    let framework = make_framework();
+
+    let vector = framework.vector("test-vector", VECTOR_CONFIG).await?;
+    framework
+        .wait_for_rollout("test-vector", "daemonset/vector", vec!["--timeout=60s"])
+        .await?;
+
+    let test_namespace = framework.namespace("test-vector-test-pod").await?;
+
+    let test_pod = framework
+        .test_pod(test_pod::Config::from_pod(&make_test_pod(
+            "test-vector-test-pod",
+            "test-pod",
+            "echo MARKER",
+            vec![],
+        ))?)
+        .await?;
+    framework
+        .wait(
+            "test-vector-test-pod",
+            vec!["pods/test-pod"],
+            WaitFor::Condition("initialized"),
+            vec!["--timeout=60s"],
+        )
+        .await?;
+
+    let mut log_reader = framework.logs("test-vector", "daemonset/vector")?;
+    smoke_check_first_line(&mut log_reader).await;
+
+    // Read the rest of the log lines.
+    let mut got_marker = false;
+    look_for_log_line(&mut log_reader, |val| {
+        if val["kubernetes"]["pod_namespace"] != "test-vector-test-pod" {
+            // A log from something other than our test pod, pretend we don't
+            // see it.
+            return FlowControlCommand::GoOn;
+        }
+
+        // Ensure we got the marker.
+        assert_eq!(val["message"], "MARKER");
+
+        if got_marker {
+            // We've already seen one marker! This is not good, we only
+            // emitted one.
+            panic!("marker seen more than once");
+        }
+
+        // Remember that we got the marker.
+        got_marker = true;
+
+        // Request to stop the flow.
+        FlowControlCommand::Terminate
+    })
+    .await?;
+
+    assert!(got_marker);
+
+    drop(test_pod);
+    drop(test_namespace);
+    drop(vector);
+    Ok(())
+}
+
+/// This test validates that vector properly merges a log message that
+/// kubernetes has internally split into multiple partial log lines.
+#[tokio::test]
+async fn partial_merge() -> Result<(), Box<dyn std::error::Error>> {
+    let _guard = lock();
+    let framework = make_framework();
+
+    let vector = framework.vector("test-vector", VECTOR_CONFIG).await?;
+    framework
+        .wait_for_rollout("test-vector", "daemonset/vector", vec!["--timeout=60s"])
+        .await?;
+
+    let test_namespace = framework.namespace("test-vector-test-pod").await?;
+
+    let test_message = generate_long_string(8, 8 * 1024); // 64 KiB
+    let test_pod = framework
+        .test_pod(test_pod::Config::from_pod(&make_test_pod(
+            "test-vector-test-pod",
+            "test-pod",
+            &format!("echo {}", test_message),
+            vec![],
+        ))?)
+        .await?;
+    framework
+        .wait(
+            "test-vector-test-pod",
+            vec!["pods/test-pod"],
+            WaitFor::Condition("initialized"),
+            vec!["--timeout=60s"],
+        )
+        .await?;
+
+    let mut log_reader = framework.logs("test-vector", "daemonset/vector")?;
+    smoke_check_first_line(&mut log_reader).await;
+
+    // Read the rest of the log lines.
+    let mut got_expected_line = false;
+    look_for_log_line(&mut log_reader, |val| {
+        if val["kubernetes"]["pod_namespace"] != "test-vector-test-pod" {
+            // A log from something other than our test pod, pretend we don't
+            // see it.
+            return FlowControlCommand::GoOn;
+        }
+
+        // Ensure the message we got matches the one we emitted.
+        assert_eq!(val["message"], test_message);
+
+        if got_expected_line {
+            // We've already seen our expected line once! This is not good,
+            // we only emitted one.
+            panic!("test message seen more than once");
+        }
+
+        // Remember that we got the expected line.
+        got_expected_line = true;
+
+        // Request to stop the flow.
+        FlowControlCommand::Terminate
+    })
+    .await?;
+
+    assert!(got_expected_line);
+
+    drop(test_pod);
+    drop(test_namespace);
+    drop(vector);
+    Ok(())
+}
+
+/// This test validates that vector picks up preexisting logs - logs that
+/// existed before vector was deployed.
+#[tokio::test]
+async fn preexisting() -> Result<(), Box<dyn std::error::Error>> {
+    let _guard = lock();
+    let framework = make_framework();
+
+    let test_namespace = framework.namespace("test-vector-test-pod").await?;
+
+    let test_pod = framework
+        .test_pod(test_pod::Config::from_pod(&make_test_pod(
+            "test-vector-test-pod",
+            "test-pod",
+            "echo MARKER",
+            vec![],
+        ))?)
+        .await?;
+    framework
+        .wait(
+            "test-vector-test-pod",
+            vec!["pods/test-pod"],
+            WaitFor::Condition("initialized"),
+            vec!["--timeout=60s"],
+        )
+        .await?;
+
+    // Wait for some extra time to ensure the pod completes.
+    tokio::time::delay_for(std::time::Duration::from_secs(10)).await;
+
+    let vector = framework.vector("test-vector", VECTOR_CONFIG).await?;
+    framework
+        .wait_for_rollout("test-vector", "daemonset/vector", vec!["--timeout=60s"])
+        .await?;
+
+    let mut log_reader = framework.logs("test-vector", "daemonset/vector")?;
+    smoke_check_first_line(&mut log_reader).await;
+
+    // Read the rest of the log lines.
+    let mut got_marker = false;
+    look_for_log_line(&mut log_reader, |val| {
+        if val["kubernetes"]["pod_namespace"] != "test-vector-test-pod" {
+            // A log from something other than our test pod, pretend we don't
+            // see it.
+            return FlowControlCommand::GoOn;
+        }
+
+        // Ensure we got the marker.
+        assert_eq!(val["message"], "MARKER");
+
+        if got_marker {
+            // We've already seen one marker! This is not good, we only
+            // emitted one.
+            panic!("marker seen more than once");
+        }
+
+        // Remember that we got the marker.
+        got_marker = true;
+
+        // Request to stop the flow.
+        FlowControlCommand::Terminate
+    })
+    .await?;
+
+    assert!(got_marker);
+
+    drop(test_pod);
+    drop(test_namespace);
+    drop(vector);
+    Ok(())
+}
+
+/// This test validates that vector picks up multiple log lines, and that they
+/// arrive in the proper order.
+#[tokio::test] +async fn multiple_lines() -> Result<(), Box> { + let _guard = lock(); + let framework = make_framework(); + + let vector = framework.vector("test-vector", VECTOR_CONFIG).await?; + framework + .wait_for_rollout("test-vector", "daemonset/vector", vec!["--timeout=60s"]) + .await?; + + let test_namespace = framework.namespace("test-vector-test-pod").await?; + + let test_messages = vec!["MARKER1", "MARKER2", "MARKER3", "MARKER4", "MARKER5"]; + let test_pod = framework + .test_pod(test_pod::Config::from_pod(&make_test_pod( + "test-vector-test-pod", + "test-pod", + &format!("echo -e {}", test_messages.join(r"\\n")), + vec![], + ))?) + .await?; + framework + .wait( + "test-vector-test-pod", + vec!["pods/test-pod"], + WaitFor::Condition("initialized"), + vec!["--timeout=60s"], + ) + .await?; + + let mut log_reader = framework.logs("test-vector", "daemonset/vector")?; + smoke_check_first_line(&mut log_reader).await; + + // Read the rest of the log lines. + let mut test_messages_iter = test_messages.into_iter().peekable(); + look_for_log_line(&mut log_reader, |val| { + if val["kubernetes"]["pod_namespace"] != "test-vector-test-pod" { + // A log from something other than our test pod, pretend we don't + // see it. + return FlowControlCommand::GoOn; + } + + // Take the next marker. + let current_marker = test_messages_iter + .next() + .expect("expected no more lines since the test messages iter is exausted"); + + // Ensure we got the marker. + assert_eq!(val["message"], current_marker); + + if test_messages_iter.peek().is_some() { + // We're not done yet, so go on. + return FlowControlCommand::GoOn; + } + + // Request to stop the flow. + FlowControlCommand::Terminate + }) + .await?; + + assert!(test_messages_iter.next().is_none()); + + drop(test_pod); + drop(test_namespace); + drop(vector); + Ok(()) +} + +/// This test validates that vector properly annotates log events with pod +/// metadata obtained from the k8s API. +#[tokio::test] +async fn pod_metadata_annotation() -> Result<(), Box> { + let _guard = lock(); + let framework = make_framework(); + + let vector = framework.vector("test-vector", VECTOR_CONFIG).await?; + framework + .wait_for_rollout("test-vector", "daemonset/vector", vec!["--timeout=60s"]) + .await?; + + let test_namespace = framework.namespace("test-vector-test-pod").await?; + + let test_pod = framework + .test_pod(test_pod::Config::from_pod(&make_test_pod( + "test-vector-test-pod", + "test-pod", + "echo MARKER", + vec![("label1", "hello"), ("label2", "world")], + ))?) + .await?; + framework + .wait( + "test-vector-test-pod", + vec!["pods/test-pod"], + WaitFor::Condition("initialized"), + vec!["--timeout=60s"], + ) + .await?; + + let mut log_reader = framework.logs("test-vector", "daemonset/vector")?; + smoke_check_first_line(&mut log_reader).await; + + // Read the rest of the log lines. + let mut got_marker = false; + look_for_log_line(&mut log_reader, |val| { + if val["kubernetes"]["pod_namespace"] != "test-vector-test-pod" { + // A log from something other than our test pod, pretend we don't + // see it. + return FlowControlCommand::GoOn; + } + + // Ensure we got the marker. + assert_eq!(val["message"], "MARKER"); + + if got_marker { + // We've already seen one marker! This is not good, we only emitted + // one. + panic!("marker seen more than once"); + } + + // If we did, remember it. + got_marker = true; + + // Assert pod the event is properly annotated with pod metadata. 
+ assert_eq!(val["kubernetes"]["pod_name"], "test-pod"); + // We've already asserted this above, but repeat for completeness. + assert_eq!(val["kubernetes"]["pod_namespace"], "test-vector-test-pod"); + assert_eq!(val["kubernetes"]["pod_uid"].as_str().unwrap().len(), 36); // 36 is a standard UUID string length + assert_eq!(val["kubernetes"]["pod_labels"]["label1"], "hello"); + assert_eq!(val["kubernetes"]["pod_labels"]["label2"], "world"); + + // Request to stop the flow. + FlowControlCommand::Terminate + }) + .await?; + + assert!(got_marker); + + drop(test_pod); + drop(test_namespace); + drop(vector); + Ok(()) +} + +/// This test validates that vector properly filters out the logs that are +/// requested to be excluded from collection, based on k8s API `Pod` labels. +#[tokio::test] +async fn pod_filtering() -> Result<(), Box> { + let _guard = lock(); + let framework = make_framework(); + + let vector = framework.vector("test-vector", VECTOR_CONFIG).await?; + framework + .wait_for_rollout("test-vector", "daemonset/vector", vec!["--timeout=60s"]) + .await?; + + let test_namespace = framework.namespace("test-vector-test-pod").await?; + + let excluded_test_pod = framework + .test_pod(test_pod::Config::from_pod(&make_test_pod( + "test-vector-test-pod", + "test-pod-excluded", + "echo EXCLUDED_MARKER", + vec![("vector.dev/exclude", "true")], + ))?) + .await?; + framework + .wait( + "test-vector-test-pod", + vec!["pods/test-pod-excluded"], + WaitFor::Condition("initialized"), + vec!["--timeout=60s"], + ) + .await?; + + let control_test_pod = framework + .test_pod(test_pod::Config::from_pod(&make_test_pod( + "test-vector-test-pod", + "test-pod-control", + "echo CONTROL_MARKER", + vec![], + ))?) + .await?; + framework + .wait( + "test-vector-test-pod", + vec!["pods/test-pod-control"], + WaitFor::Condition("initialized"), + vec!["--timeout=60s"], + ) + .await?; + + let mut log_reader = framework.logs("test-vector", "daemonset/vector")?; + smoke_check_first_line(&mut log_reader).await; + + // Read the log lines until the reasoable amount of time passes for us + // to be confident that vector shoud've picked up the excluded message + // if it wasn't fitlering it. + let mut got_control_marker = false; + let mut lines_till_we_give_up: usize = 10000; + let (stop_tx, mut stop_rx) = futures::channel::mpsc::channel(0); + loop { + let line = tokio::select! { + result = stop_rx.next() => { + result.unwrap(); + log_reader.kill()?; + continue; + } + line = log_reader.read_line() => line, + }; + let line = match line { + Some(line) => line, + None => break, + }; + println!("Got line: {:?}", line); + + lines_till_we_give_up -= 1; + if lines_till_we_give_up <= 0 { + println!("Giving up"); + log_reader.kill()?; + break; + } + + if !line.starts_with("{") { + // This isn't a json, must be an entry from Vector's own log stream. + continue; + } + + let val = parse_json(&line)?; + + if val["kubernetes"]["pod_namespace"] != "test-vector-test-pod" { + // A log from something other than our test pod, pretend we don't + // see it. + continue; + } + + // Ensure we got the log event from the control pod. + assert_eq!(val["kubernetes"]["pod_name"], "test-pod-control"); + + // Ensure the test sanity by validating that we got the control marker. + // If we get an excluded marker here - it's an error. + assert_eq!(val["message"], "CONTROL_MARKER"); + + if got_control_marker { + // We've already seen one control marker! This is not good, we only + // emitted one. 
+ panic!("control marker seen more than once"); + } + + // Remember that we've seen a control marker. + got_control_marker = true; + + // Request termination in a while. + let mut stop_tx = stop_tx.clone(); + tokio::spawn(async move { + // Wait for two minutes - a reasonable time for vector internals to + // pick up new `Pod` and collect events from them in idle load. + // Here, we're assuming that if the `Pod` that was supposed to be + // ignored was in fact collected (meaning something's wrong with + // the exclusion logic), we'd see it's data within this time frame. + // It's not enough to just wait for `Pod` complete, we should still + // apply a reasonably big timeout before we stop waiting for the + // logs to appear to have high confidence that Vector has enough + // time to pick them up and spit them out. + let duration = std::time::Duration::from_secs(120); + println!("Starting stop timer, due in {} seconds", duration.as_secs()); + tokio::time::delay_for(duration).await; + println!("Stop timer complete"); + stop_tx.send(()).await.unwrap(); + }); + } + + // Ensure log reader exited. + log_reader.wait().await.expect("log reader wait failed"); + + assert!(got_control_marker); + + drop(excluded_test_pod); + drop(control_test_pod); + drop(test_namespace); + drop(vector); + Ok(()) +} + +/// This test validates that vector properly collects logs from multiple +/// `Namespace`s and `Pod`s. +#[tokio::test] +async fn multiple_ns() -> Result<(), Box> { + let _guard = lock(); + let framework = make_framework(); + + let vector = framework.vector("test-vector", VECTOR_CONFIG).await?; + framework + .wait_for_rollout("test-vector", "daemonset/vector", vec!["--timeout=60s"]) + .await?; + + const NS_PREFIX: &str = "test-vector-test-pod"; + + let mut test_namespaces = vec![]; + let mut expected_namespaces = HashSet::new(); + for i in 0..10 { + let name = format!("{}-{}", NS_PREFIX, i); + test_namespaces.push(framework.namespace(&name).await?); + expected_namespaces.insert(name); + } + + let mut test_pods = vec![]; + for ns in &expected_namespaces { + let test_pod = framework + .test_pod(test_pod::Config::from_pod(&make_test_pod( + ns, + "test-pod", + "echo MARKER", + vec![], + ))?) + .await?; + framework + .wait( + ns, + vec!["pods/test-pod"], + WaitFor::Condition("initialized"), + vec!["--timeout=60s"], + ) + .await?; + test_pods.push(test_pod); + } + + let mut log_reader = framework.logs("test-vector", "daemonset/vector")?; + smoke_check_first_line(&mut log_reader).await; + + // Read the rest of the log lines. + look_for_log_line(&mut log_reader, |val| { + let ns = match val["kubernetes"]["pod_namespace"].as_str() { + Some(val) if val.starts_with(NS_PREFIX) => val, + _ => { + // A log from something other than our test pod, pretend we + // don't see it. + return FlowControlCommand::GoOn; + } + }; + + // Ensure we got the marker. + assert_eq!(val["message"], "MARKER"); + + // Remove the namespace from the list of namespaces we still expect to + // get. + let as_expected = expected_namespaces.remove(ns); + assert!(as_expected); + + if expected_namespaces.is_empty() { + // We got all the messages we expected, request to stop the flow. + FlowControlCommand::Terminate + } else { + // We didn't get all the messages yet. + FlowControlCommand::GoOn + } + }) + .await?; + + // Ensure that we have collected messages from all the namespaces. + assert!(expected_namespaces.is_empty()); + + drop(test_pods); + drop(test_namespaces); + drop(vector); + Ok(()) +}