diff --git a/tests/kubernetes-e2e.rs b/tests/kubernetes-e2e.rs index bc8dfc333cfa08..74fe8eda3fdf7c 100644 --- a/tests/kubernetes-e2e.rs +++ b/tests/kubernetes-e2e.rs @@ -1,8 +1,14 @@ +//! This test is optimized for very quick rebuilds as it doesn't use anything +//! from the `vector` crate, and thus doesn't waste time in a tremendously long +//! link step. + use k8s_openapi::{ api::core::v1::{Container, Pod, PodSpec}, apimachinery::pkg::apis::meta::v1::ObjectMeta, }; -use kubernetes_test_framework::{test_pod, wait_for_resource::WaitFor, Framework, Interface}; +use kubernetes_test_framework::{ + lock, log_lookup, test_pod, wait_for_resource::WaitFor, Framework, Interface, +}; const VECTOR_CONFIG: &str = r#" apiVersion: v1 @@ -32,11 +38,26 @@ fn make_framework() -> Framework { Framework::new(interface) } -fn make_test_pod(namespace: &str, name: &str, command: &str) -> Pod { +fn make_test_pod<'a>( + namespace: &'a str, + name: &'a str, + command: &'a str, + labels: impl IntoIterator + 'a, +) -> Pod { + let labels: std::collections::BTreeMap = labels + .into_iter() + .map(|(key, val)| (key.to_owned(), val.to_owned())) + .collect(); + let labels = if labels.is_empty() { + None + } else { + Some(labels) + }; Pod { metadata: Some(ObjectMeta { name: Some(name.to_owned()), namespace: Some(namespace.to_owned()), + labels, ..ObjectMeta::default() }), spec: Some(PodSpec { @@ -58,8 +79,87 @@ fn parse_json(s: &str) -> Result> Ok(serde_json::from_str(s)?) } +fn generate_long_string(a: usize, b: usize) -> String { + (0..a).fold(String::new(), |mut acc, i| { + let istr = i.to_string(); + for _ in 0..b { + acc.push_str(&istr); + } + acc + }) +} + +/// Read the first line from vector logs and assert that it matches the expected +/// one. +/// This allows detecting the situations where things have gone very wrong. +fn smoke_check_first_line(log_reader: &mut log_lookup::Reader) { + // Wait for first line as a smoke check. + let first_line = log_reader.next().expect("unable to read first line"); + let expected_pat = "INFO vector: Log level \"info\" is enabled.\n"; + assert!( + first_line.ends_with(expected_pat), + "Expected a line ending with {:?} but got {:?}; vector might be malfunctioning", + expected_pat, + first_line + ); +} + +enum FlowControlCommand { + GoOn, + Terminate, +} + +fn look_for_log_line

( + log_reader: &mut log_lookup::Reader, + mut predicate: P, +) -> Result<(), Box> +where + P: FnMut(serde_json::Value) -> FlowControlCommand, +{ + let mut lines_till_we_give_up = 10000; + while let Some(line) = log_reader.next() { + println!("Got line: {:?}", line); + + lines_till_we_give_up -= 1; + if lines_till_we_give_up <= 0 { + println!("Giving up"); + log_reader.kill()?; + break; + } + + if !line.starts_with("{") { + // This isn't a json, must be an entry from Vector's own log stream. + continue; + } + + let val = parse_json(&line)?; + + match predicate(val) { + FlowControlCommand::GoOn => { + // Not what we were looking for, go on. + } + FlowControlCommand::Terminate => { + // We are told we should stop, request that log reader is + // killed. + // This doesn't immediately stop the reading because we want to + // process the pending buffers first. + log_reader.kill()?; + } + } + } + + // Ensure log reader exited. + log_reader.wait().expect("log reader wait failed"); + + Ok(()) +} + +/// This test validates that vector picks up logs at the simplest case +/// possible - a new pod is deployed and prints to stdout, and we assert that +/// vector picks that up. #[test] -fn test() -> Result<(), Box> { +fn simple() -> Result<(), Box> { + let _guard = lock(); let framework = make_framework(); let vector = framework.vector("test-vector", VECTOR_CONFIG)?; @@ -71,6 +171,7 @@ fn test() -> Result<(), Box> { "test-vector-test-pod", "test-pod", "echo MARKER", + vec![], ))?)?; framework.wait( "test-vector-test-pod", @@ -80,41 +181,326 @@ fn test() -> Result<(), Box> { )?; let mut log_reader = framework.logs("test-vector", "daemonset/vector")?; + smoke_check_first_line(&mut log_reader); - // Wait for first line as a smoke check. - let first_line = log_reader.next().expect("unable to read first line"); - let expected_pat = "INFO vector: Log level \"info\" is enabled.\n"; - assert!( - first_line.ends_with(expected_pat), - "Expected a line ending with {:?} but got {:?}; vector might be malfunctioning", - expected_pat, - first_line - ); + // Read the rest of the log lines. + let mut got_marker = false; + look_for_log_line(&mut log_reader, |val| { + if val["kubernetes"]["pod_namespace"] != "test-vector-test-pod" { + // A log from something other than our test pod, predend we don't + // see it. + return FlowControlCommand::GoOn; + } + + // Ensure we got the marker. + assert_eq!(val["message"], "MARKER"); + + if got_marker { + // We've already seen one marker! This is not good, we only emitted + // one. + panic!("marker seen more than once"); + } + + // If we did, remember it. + got_marker = true; + + // Request to stop the flow. + FlowControlCommand::Terminate + })?; + + assert!(got_marker); + + drop(test_pod); + drop(test_namespace); + drop(vector); + Ok(()) +} + +/// This test validates that vector properly merges a log message that +/// kubernetes has internally split into multiple partial log lines. +#[test] +fn partial_merge() -> Result<(), Box> { + let _guard = lock(); + let framework = make_framework(); + + let vector = framework.vector("test-vector", VECTOR_CONFIG)?; + framework.wait_for_rollout("test-vector", "daemonset/vector", vec!["--timeout=10s"])?; + + let test_namespace = framework.namespace("test-vector-test-pod")?; + + let test_message = generate_long_string(8, 8 * 1024); // 64 KiB + let test_pod = framework.test_pod(test_pod::Config::from_pod(&make_test_pod( + "test-vector-test-pod", + "test-pod", + &format!("echo {}", test_message), + vec![], + ))?)?; + framework.wait( + "test-vector-test-pod", + vec!["pods/test-pod"], + WaitFor::Condition("initialized"), + vec!["--timeout=30s"], + )?; + + let mut log_reader = framework.logs("test-vector", "daemonset/vector")?; + smoke_check_first_line(&mut log_reader); + + // Read the rest of the log lines. + let mut got_expected_line = false; + look_for_log_line(&mut log_reader, |val| { + if val["kubernetes"]["pod_namespace"] != "test-vector-test-pod" { + // A log from something other than our test pod, predend we don't + // see it. + return FlowControlCommand::GoOn; + } + + // Ensure the message we got matches the one we emitted. + assert_eq!(val["message"], test_message); + + if got_expected_line { + // We've already seen our expected line once! This is not good, we + // only emitted one. + panic!("test message seen more than once"); + } + + // If we did, remember it. + got_expected_line = true; + + // Request to stop the flow. + FlowControlCommand::Terminate + })?; + + assert!(got_expected_line); + + drop(test_pod); + drop(test_namespace); + drop(vector); + Ok(()) +} + +/// This test validates that vector partail message merging mechanism doesn't +/// interfere with the non-partial messages that don't end with newline. +#[test] +fn no_newline_at_eol() -> Result<(), Box> { + let _guard = lock(); + let framework = make_framework(); + + let vector = framework.vector("test-vector", VECTOR_CONFIG)?; + framework.wait_for_rollout("test-vector", "daemonset/vector", vec!["--timeout=10s"])?; + + let test_namespace = framework.namespace("test-vector-test-pod")?; + + let test_pod = framework.test_pod(test_pod::Config::from_pod(&make_test_pod( + "test-vector-test-pod", + "test-pod", + "echo -n MARKER", // `-n` doesn't print newline + vec![], + ))?)?; + framework.wait( + "test-vector-test-pod", + vec!["pods/test-pod"], + WaitFor::Condition("initialized"), + vec!["--timeout=30s"], + )?; + + let mut log_reader = framework.logs("test-vector", "daemonset/vector")?; + smoke_check_first_line(&mut log_reader); + + // Read the rest of the log lines. + let mut got_expected_line = false; + look_for_log_line(&mut log_reader, |val| { + if val["kubernetes"]["pod_namespace"] != "test-vector-test-pod" { + // A log from something other than our test pod, predend we don't + // see it. + return FlowControlCommand::GoOn; + } + + // Ensure the message we got matches the one we emitted. + assert_eq!(val["message"], "MARKER"); + + if got_expected_line { + // We've already seen our expected line once! This is not good, we + // only emitted one. + panic!("test message seen more than once"); + } + + // If we did, remember it. + got_expected_line = true; + + // Request to stop the flow. + FlowControlCommand::Terminate + })?; + + assert!(got_expected_line); + + drop(test_pod); + drop(test_namespace); + drop(vector); + Ok(()) +} + +/// This test validates that vector picks up preexisting logs - logs that +/// existed before vector was deployed. +#[test] +fn preexisting() -> Result<(), Box> { + let _guard = lock(); + let framework = make_framework(); + + let test_namespace = framework.namespace("test-vector-test-pod")?; + + let test_pod = framework.test_pod(test_pod::Config::from_pod(&make_test_pod( + "test-vector-test-pod", + "test-pod", + "echo MARKER", + vec![], + ))?)?; + framework.wait( + "test-vector-test-pod", + vec!["pods/test-pod"], + WaitFor::Condition("initialized"), + vec!["--timeout=30s"], + )?; + + // Wait for some extra time to ensure pod completes. + std::thread::sleep(std::time::Duration::from_secs(10)); + + let vector = framework.vector("test-vector", VECTOR_CONFIG)?; + framework.wait_for_rollout("test-vector", "daemonset/vector", vec!["--timeout=10s"])?; + + let mut log_reader = framework.logs("test-vector", "daemonset/vector")?; + smoke_check_first_line(&mut log_reader); // Read the rest of the log lines. - let mut lines_till_we_give_up = 10000; let mut got_marker = false; - while let Some(line) = log_reader.next() { - println!("Got line: {:?}", line); + look_for_log_line(&mut log_reader, |val| { + if val["kubernetes"]["pod_namespace"] != "test-vector-test-pod" { + // A log from something other than our test pod, predend we don't + // see it. + return FlowControlCommand::GoOn; + } - lines_till_we_give_up -= 1; - if lines_till_we_give_up <= 0 { - println!("Giving up"); - log_reader.kill()?; - break; + // Ensure we got the marker. + assert_eq!(val["message"], "MARKER"); + + if got_marker { + // We've already seen one marker! This is not good, we only emitted + // one. + panic!("marker seen more than once"); } - if !line.starts_with("{") { - // This isn't a json, must be an entry from Vector's own log stream. - continue; + // If we did, remember it. + got_marker = true; + + // Request to stop the flow. + FlowControlCommand::Terminate + })?; + + assert!(got_marker); + + drop(test_pod); + drop(test_namespace); + drop(vector); + Ok(()) +} + +/// This test validates that vector picks up multiple log lines, and that they +/// arrive at the proper order. +#[test] +fn multiple_lines() -> Result<(), Box> { + let _guard = lock(); + let framework = make_framework(); + + let vector = framework.vector("test-vector", VECTOR_CONFIG)?; + framework.wait_for_rollout("test-vector", "daemonset/vector", vec!["--timeout=10s"])?; + + let test_namespace = framework.namespace("test-vector-test-pod")?; + + let test_messages = vec!["MARKER1", "MARKER2", "MARKER3", "MARKER4", "MARKER5"]; + let test_pod = framework.test_pod(test_pod::Config::from_pod(&make_test_pod( + "test-vector-test-pod", + "test-pod", + &format!("echo -e {}", test_messages.join(r"\\n")), + vec![], + ))?)?; + framework.wait( + "test-vector-test-pod", + vec!["pods/test-pod"], + WaitFor::Condition("initialized"), + vec!["--timeout=30s"], + )?; + + let mut log_reader = framework.logs("test-vector", "daemonset/vector")?; + smoke_check_first_line(&mut log_reader); + + // Read the rest of the log lines. + let mut test_messages_iter = test_messages.into_iter().peekable(); + look_for_log_line(&mut log_reader, |val| { + if val["kubernetes"]["pod_namespace"] != "test-vector-test-pod" { + // A log from something other than our test pod, predend we don't + // see it. + return FlowControlCommand::GoOn; } - let val = parse_json(&line)?; + // Take the next marker. + let current_marker = test_messages_iter + .next() + .expect("expected no more lines since the test messages iter is exausted"); + + // Ensure we got the marker. + assert_eq!(val["message"], current_marker); + + if test_messages_iter.peek().is_some() { + // We're not done yet, so go on. + return FlowControlCommand::GoOn; + } + + // Request to stop the flow. + FlowControlCommand::Terminate + })?; + assert!(test_messages_iter.next().is_none()); + + drop(test_pod); + drop(test_namespace); + drop(vector); + Ok(()) +} + +/// This test validates that vector properly annotates log events with pod +/// metadata obtained from the k8s API. +#[test] +fn pod_metadata_annotation() -> Result<(), Box> { + let _guard = lock(); + let framework = make_framework(); + + let vector = framework.vector("test-vector", VECTOR_CONFIG)?; + framework.wait_for_rollout("test-vector", "daemonset/vector", vec!["--timeout=10s"])?; + + let test_namespace = framework.namespace("test-vector-test-pod")?; + + let test_pod = framework.test_pod(test_pod::Config::from_pod(&make_test_pod( + "test-vector-test-pod", + "test-pod", + "echo MARKER", + vec![("label1", "hello"), ("label2", "world")], + ))?)?; + framework.wait( + "test-vector-test-pod", + vec!["pods/test-pod"], + WaitFor::Condition("initialized"), + vec!["--timeout=30s"], + )?; + + let mut log_reader = framework.logs("test-vector", "daemonset/vector")?; + smoke_check_first_line(&mut log_reader); + + // Read the rest of the log lines. + let mut got_marker = false; + look_for_log_line(&mut log_reader, |val| { if val["kubernetes"]["pod_namespace"] != "test-vector-test-pod" { // A log from something other than our test pod, predend we don't // see it. - continue; + return FlowControlCommand::GoOn; } // Ensure we got the marker. @@ -129,12 +515,17 @@ fn test() -> Result<(), Box> { // If we did, remember it. got_marker = true; - // We got a marker, so we're pretty much done. - log_reader.kill()?; - } + // Assert pod the event is properly annotated with pod metadata. + assert_eq!(val["kubernetes"]["pod_name"], "test-pod"); + // We've already asserted this above, but repeat for completeness. + assert_eq!(val["kubernetes"]["pod_namespace"], "test-vector-test-pod"); + assert_eq!(val["kubernetes"]["pod_uid"].as_str().unwrap().len(), 36); // 36 is a standard UUID string length + assert_eq!(val["kubernetes"]["pod_labels"]["label1"], "hello"); + assert_eq!(val["kubernetes"]["pod_labels"]["label2"], "world"); - // Ensure log reader exited. - log_reader.wait().expect("log reader wait failed"); + // Request to stop the flow. + FlowControlCommand::Terminate + })?; assert!(got_marker);