Skip to content

Commit

Permalink
--wip-- [skip ci]
Browse files Browse the repository at this point in the history
  • Loading branch information
MOZGIII committed Jun 10, 2020
1 parent 5fc6e96 commit c63e8ef
Show file tree
Hide file tree
Showing 2 changed files with 125 additions and 0 deletions.
10 changes: 10 additions & 0 deletions lib/kubernetes-test-framework/src/log_lookup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,16 @@ impl Reader {
pub fn kill(&mut self) -> std::io::Result<()> {
self.child.kill()
}

pub fn next_with_timeout(&mut self) -> Option<Result<String>> {
let mut s = String::new();
let result = self.reader.read_line(&mut s);
match result {
Ok(0) => None,
Ok(_) => Some(s),
Err(err) => panic!(err),
}
}
}

impl Iterator for Reader {
Expand Down
115 changes: 115 additions & 0 deletions tests/kubernetes-e2e.rs
Original file line number Diff line number Diff line change
Expand Up @@ -473,3 +473,118 @@ fn pod_metadata_annotation() -> Result<(), Box<dyn std::error::Error>> {
drop(vector);
Ok(())
}

#[test]
fn does_not_pick_up_excluded() -> Result<(), Box<dyn std::error::Error>> {
let _guard = lock();
let framework = make_framework();

let vector = framework.vector("test-vector", VECTOR_CONFIG)?;
framework.wait_for_rollout("test-vector", "daemonset/vector", vec!["--timeout=10s"])?;

let test_namespace = framework.namespace("test-vector-test-pod")?;

let test_pod = framework.test_pod(test_pod::Config::from_pod(&make_test_pod(
"test-vector-test-pod",
"test-pod",
"echo MARKER",
vec![("vector.dev/exclude", "true")],
))?)?;
framework.wait(
"test-vector-test-pod",
vec!["pods/test-pod"],
WaitFor::Condition("initialized"),
vec!["--timeout=30s"],
)?;

let mut log_reader = framework.logs("test-vector", "daemonset/vector")?;
smoke_check_first_line(&mut log_reader);

// Read the rest of the log lines.
let mut got_marker = false;
loop {
println!("Got line: {:?}", line);

if !line.starts_with("{") {
// This isn't a json, must be an entry from Vector's own log stream.
continue;
}

let val = parse_json(&line)?;

if val["kubernetes"]["pod_namespace"] != "test-vector-test-pod" {
// A log from something other than our test pod, predend we don't
// see it.
return FlowControlCommand::GoOn;
}

// Ensure we got the marker.
assert_eq!(val["message"], "MARKER");

if got_marker {
// We've already seen one marker! This is not good, we only emitted
// one.
panic!("marker seen more than once");
}

// If we did, remember it.
got_marker = true;
}

// Ensure log reader exited.
log_reader.wait().expect("log reader wait failed");

assert!(!got_marker);

deadline.join().unwrap();
reader.join().unwrap();

drop(test_pod);
drop(test_namespace);
drop(vector);
Ok(())
}

struct DeadlineIter<T>
where
T: Send,
{
handle: std::thread::JoinHandle<()>,
receiver: std::sync::mpsc::Receiver<Option<T>>,
deadline: std::time::Instant,
}

impl<T> DeadlineIter<T>
where
T: Send,
{
pub fn new<I>(iter: I, deadline: std::time::Instant) -> Self
where
I: Iterator<Item = T> + Send,
{
let (sender, receiver) = std::sync::mpsc::channel();
std::thread::spawn(move || {
for item in iter {
sender.send(Some(item)).unwrap();
}
sender.send(None).unwrap();
});
Self {
handle,
receiver,
deadline,
}
}
}

impl<T> Iterator for DeadlineIter<T>
where
T: Send,
{
type Item = Result<T, RecvTimeoutError>;

fn next(&mut self) -> Option<Self::Item> {
let timeout = self.deadline - std::time::Instant::now();
self.receiver.recv_timeout(timeout).transpose()
}
}

0 comments on commit c63e8ef

Please sign in to comment.