From c63e8ef50c9cbe2129b68205e793eb8fc5cfa986 Mon Sep 17 00:00:00 2001 From: MOZGIII Date: Wed, 10 Jun 2020 06:36:03 +0300 Subject: [PATCH] --wip-- [skip ci] --- .../src/log_lookup.rs | 10 ++ tests/kubernetes-e2e.rs | 115 ++++++++++++++++++ 2 files changed, 125 insertions(+) diff --git a/lib/kubernetes-test-framework/src/log_lookup.rs b/lib/kubernetes-test-framework/src/log_lookup.rs index c712d04bc30553..b25c22d9321f6f 100644 --- a/lib/kubernetes-test-framework/src/log_lookup.rs +++ b/lib/kubernetes-test-framework/src/log_lookup.rs @@ -45,6 +45,16 @@ impl Reader { pub fn kill(&mut self) -> std::io::Result<()> { self.child.kill() } + + pub fn next_with_timeout(&mut self) -> Option> { + let mut s = String::new(); + let result = self.reader.read_line(&mut s); + match result { + Ok(0) => None, + Ok(_) => Some(s), + Err(err) => panic!(err), + } + } } impl Iterator for Reader { diff --git a/tests/kubernetes-e2e.rs b/tests/kubernetes-e2e.rs index b9887b94842d71..95626f798182fa 100644 --- a/tests/kubernetes-e2e.rs +++ b/tests/kubernetes-e2e.rs @@ -473,3 +473,118 @@ fn pod_metadata_annotation() -> Result<(), Box> { drop(vector); Ok(()) } + +#[test] +fn does_not_pick_up_excluded() -> Result<(), Box> { + let _guard = lock(); + let framework = make_framework(); + + let vector = framework.vector("test-vector", VECTOR_CONFIG)?; + framework.wait_for_rollout("test-vector", "daemonset/vector", vec!["--timeout=10s"])?; + + let test_namespace = framework.namespace("test-vector-test-pod")?; + + let test_pod = framework.test_pod(test_pod::Config::from_pod(&make_test_pod( + "test-vector-test-pod", + "test-pod", + "echo MARKER", + vec![("vector.dev/exclude", "true")], + ))?)?; + framework.wait( + "test-vector-test-pod", + vec!["pods/test-pod"], + WaitFor::Condition("initialized"), + vec!["--timeout=30s"], + )?; + + let mut log_reader = framework.logs("test-vector", "daemonset/vector")?; + smoke_check_first_line(&mut log_reader); + + // Read the rest of the log lines. + let mut got_marker = false; + loop { + println!("Got line: {:?}", line); + + if !line.starts_with("{") { + // This isn't a json, must be an entry from Vector's own log stream. + continue; + } + + let val = parse_json(&line)?; + + if val["kubernetes"]["pod_namespace"] != "test-vector-test-pod" { + // A log from something other than our test pod, predend we don't + // see it. + return FlowControlCommand::GoOn; + } + + // Ensure we got the marker. + assert_eq!(val["message"], "MARKER"); + + if got_marker { + // We've already seen one marker! This is not good, we only emitted + // one. + panic!("marker seen more than once"); + } + + // If we did, remember it. + got_marker = true; + } + + // Ensure log reader exited. + log_reader.wait().expect("log reader wait failed"); + + assert!(!got_marker); + + deadline.join().unwrap(); + reader.join().unwrap(); + + drop(test_pod); + drop(test_namespace); + drop(vector); + Ok(()) +} + +struct DeadlineIter +where + T: Send, +{ + handle: std::thread::JoinHandle<()>, + receiver: std::sync::mpsc::Receiver>, + deadline: std::time::Instant, +} + +impl DeadlineIter +where + T: Send, +{ + pub fn new(iter: I, deadline: std::time::Instant) -> Self + where + I: Iterator + Send, + { + let (sender, receiver) = std::sync::mpsc::channel(); + std::thread::spawn(move || { + for item in iter { + sender.send(Some(item)).unwrap(); + } + sender.send(None).unwrap(); + }); + Self { + handle, + receiver, + deadline, + } + } +} + +impl Iterator for DeadlineIter +where + T: Send, +{ + type Item = Result; + + fn next(&mut self) -> Option { + let timeout = self.deadline - std::time::Instant::now(); + self.receiver.recv_timeout(timeout).transpose() + } +}