Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use tracing in the shim layer and e2e tests #490

Merged
merged 3 commits into from
Jun 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 69 additions & 0 deletions e2e/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions e2e/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,3 +33,5 @@ thiserror = "1.0.29"
tokio-stream = { version = "0.1.9", features = ["net"] }
zookeeper = "0.8"
tungstenite = "0.20.1"
tracing = "0.1.36"
tracing-subscriber = "0.3.17"
15 changes: 8 additions & 7 deletions e2e/src/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use kube::{
};
use std::process::Command;
use thiserror::Error;
use tracing::*;

#[derive(Debug, Error)]
pub enum Error {
Expand Down Expand Up @@ -72,18 +73,18 @@ pub async fn apply(yaml: String, client: Client, discovery: &Discovery) -> Resul
let gvk = if let Some(tm) = &obj.types {
GroupVersionKind::try_from(tm).unwrap()
} else {
println!("cannot apply object without valid TypeMeta {:?}", obj);
error!("cannot apply object without valid TypeMeta {:?}", obj);
return Err(Error::ApplyFailed);
};
let name = obj.name_any();
if let Some((ar, caps)) = discovery.resolve_gvk(&gvk) {
let api = dynamic_api(ar, caps, client.clone(), namespace, false);
println!("Applying {}: \n{}", gvk.kind, serde_yaml::to_string(&obj)?);
info!("Applying {}: \n{}", gvk.kind, serde_yaml::to_string(&obj)?);
let data: serde_json::Value = serde_json::to_value(&obj)?;
let _r = api.patch(&name, &ssapply, &Patch::Apply(data)).await?;
println!("applied {} {}", gvk.kind, name);
info!("applied {} {}", gvk.kind, name);
} else {
println!("Cannot apply document for unknown {:?}", gvk);
error!("Cannot apply document for unknown {:?}", gvk);
return Err(Error::ApplyFailed);
}

Expand Down Expand Up @@ -124,10 +125,10 @@ pub async fn get_output_and_err(mut attached: AttachedProcess) -> (String, Strin
}

pub fn run_command(program: &str, args: Vec<&str>, err_msg: &str) -> (String, String) {
println!("{} {}", program, args.join(" "));
info!("{} {}", program, args.join(" "));
let cmd = Command::new(program).args(args).output().expect(err_msg);
println!("cmd output: {}", String::from_utf8_lossy(&cmd.stdout));
println!("cmd error: {}", String::from_utf8_lossy(&cmd.stderr));
info!("cmd output: {}", String::from_utf8_lossy(&cmd.stdout));
info!("cmd error: {}", String::from_utf8_lossy(&cmd.stderr));
(
String::from_utf8_lossy(&cmd.stdout).to_string(),
String::from_utf8_lossy(&cmd.stderr).to_string(),
Expand Down
52 changes: 28 additions & 24 deletions e2e/src/fluent_e2e.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#![allow(unused_imports)]
#![allow(unused_variables)]
use k8s_openapi::api::core::v1::{Pod, ServiceAccount, Service};
use k8s_openapi::api::core::v1::{Pod, Service, ServiceAccount};
use k8s_openapi::api::rbac::v1::RoleBinding;
use k8s_openapi::api::{apps::v1::DaemonSet, rbac::v1::Role};
use k8s_openapi::apiextensions_apiserver::pkg::apis::apiextensions::v1::CustomResourceDefinition;
Expand All @@ -20,6 +20,7 @@ use std::path::PathBuf;
use std::thread;
use std::time::{Duration, Instant};
use tokio::time::sleep;
use tracing::*;

use crate::common::*;

Expand Down Expand Up @@ -115,6 +116,7 @@ pub async fn desired_state_test(client: Client, fb_name: String) -> Result<(), E
loop {
sleep(Duration::from_secs(5)).await;
if start.elapsed() > timeout {
error!("Time out on desired state test");
return Err(Error::Timeout);
}
// Check daemon set
Expand All @@ -127,7 +129,7 @@ pub async fn desired_state_test(client: Client, fb_name: String) -> Result<(), E
let role = role_api.get(&(fb_name.clone() + "-role")).await;
match role {
Err(e) => {
println!("Get role failed with error {}.", e);
info!("Get role failed with error {}.", e);
continue;
}
_ => {}
Expand All @@ -136,7 +138,7 @@ pub async fn desired_state_test(client: Client, fb_name: String) -> Result<(), E
let sa = sa_api.get(&fb_name.clone()).await;
match sa {
Err(e) => {
println!("Get service account failed with error {}.", e);
info!("Get service account failed with error {}.", e);
continue;
}
_ => {}
Expand All @@ -145,7 +147,7 @@ pub async fn desired_state_test(client: Client, fb_name: String) -> Result<(), E
let rb = rb_api.get(&(fb_name.clone() + "-role-binding")).await;
match rb {
Err(e) => {
println!("Get role binding failed with error {}.", e);
info!("Get role binding failed with error {}.", e);
continue;
}
_ => {}
Expand All @@ -154,7 +156,7 @@ pub async fn desired_state_test(client: Client, fb_name: String) -> Result<(), E
let svc = svc_api.get(&fb_name).await;
match svc {
Err(e) => {
println!("Get service failed with error {}.", e);
info!("Get service failed with error {}.", e);
continue;
}
_ => {}
Expand All @@ -163,24 +165,24 @@ pub async fn desired_state_test(client: Client, fb_name: String) -> Result<(), E
let ds = ds_api.get(&fb_name).await;
match ds {
Err(e) => {
println!("Get daemon set failed with error {}.", e);
info!("Get daemon set failed with error {}.", e);
continue;
}
Ok(ds) => {
if ds.status.as_ref().unwrap().number_ready == node_number() {
// this number depends on the number of nodes
println!("All daemons are ready now.");
info!("All daemons are ready now.");
break;
} else {
println!(
info!(
"Only {} daemons are ready now.",
ds.status.as_ref().unwrap().number_ready
);
}
}
};
}
println!("Desired state test passed.");
info!("Desired state test passed.");
Ok(())
}

Expand All @@ -206,14 +208,15 @@ pub async fn relabel_test(client: Client, fb_name: String) -> Result<(), Error>
loop {
sleep(Duration::from_secs(5)).await;
if start.elapsed() > timeout {
error!("Time out on relabel test");
return Err(Error::Timeout);
}

// Check daemon set
let ds = ds_api.get(&fb_name).await;
match ds {
Err(e) => {
println!("Get daemon set failed with error {}.", e);
info!("Get daemon set failed with error {}.", e);
continue;
}
Ok(ds) => {
Expand All @@ -230,7 +233,7 @@ pub async fn relabel_test(client: Client, fb_name: String) -> Result<(), Error>
.unwrap()
.contains_key("key")
{
println!("Label for pod is not updated yet");
info!("Label for pod is not updated yet");
continue;
}

Expand All @@ -241,7 +244,7 @@ pub async fn relabel_test(client: Client, fb_name: String) -> Result<(), Error>
.updated_number_scheduled
.is_none()
{
println!("No daemon set pod is updated yet.");
info!("No daemon set pod is updated yet.");
continue;
} else if *ds
.status
Expand All @@ -252,9 +255,9 @@ pub async fn relabel_test(client: Client, fb_name: String) -> Result<(), Error>
.unwrap()
== node_number()
{
println!("Relabel is done.");
info!("Relabel is done.");
} else {
println!(
info!(
"Relabel is in progress. {} pods are updated now.",
ds.status
.as_ref()
Expand All @@ -267,10 +270,10 @@ pub async fn relabel_test(client: Client, fb_name: String) -> Result<(), Error>
}

if ds.status.as_ref().unwrap().number_ready == node_number() {
println!("All daemon set pods are ready.");
info!("All daemon set pods are ready.");
break;
} else {
println!(
info!(
"Only {} pods are ready now.",
ds.status.as_ref().unwrap().number_ready
);
Expand All @@ -280,7 +283,7 @@ pub async fn relabel_test(client: Client, fb_name: String) -> Result<(), Error>
};
}

println!("Relabel test passed.");
info!("Relabel test passed.");
Ok(())
}

Expand All @@ -306,14 +309,15 @@ pub async fn service_selector_test(client: Client, fb_name: String) -> Result<()
loop {
sleep(Duration::from_secs(5)).await;
if start.elapsed() > timeout {
error!("Time out on service selector test");
return Err(Error::Timeout);
}

// Check daemon set
let svc = svc_api.get(&fb_name).await;
match svc {
Err(e) => {
println!("Get service failed with error {}.", e);
info!("Get service failed with error {}.", e);
continue;
}
Ok(svc) => {
Expand All @@ -326,16 +330,16 @@ pub async fn service_selector_test(client: Client, fb_name: String) -> Result<()
.unwrap()
.contains_key("never-match-anything")
{
println!("Selector for service is updated yet");
info!("Selector for service is updated yet");
break;
} else {
println!("Selector for service is not updated yet");
info!("Selector for service is not updated yet");
}
}
};
}

println!("Service selector test passed.");
info!("Service selector test passed.");
Ok(())
}

Expand All @@ -346,11 +350,11 @@ pub async fn fluent_e2e_test() -> Result<(), Error> {
let fb_crd = crd_api.get("fluentbits.anvil.dev").await;
match fb_crd {
Err(e) => {
println!("No CRD found, create one before run the e2e test.");
error!("No CRD found, create one before run the e2e test.");
return Err(Error::CRDGetFailed(e));
}
Ok(crd) => {
println!("CRD found, continue to run the e2e test.");
info!("CRD found, continue to run the e2e test.");
}
}

Expand All @@ -363,6 +367,6 @@ pub async fn fluent_e2e_test() -> Result<(), Error> {
relabel_test(client.clone(), fb_name.clone()).await?;
service_selector_test(client.clone(), fb_name.clone()).await?;

println!("E2e test passed.");
info!("E2e test passed.");
Ok(())
}
Loading
Loading