Skip to content

Commit

Permalink
Use tracing in the shim layer and e2e tests (#490)
Browse files Browse the repository at this point in the history
Signed-off-by: Xudong Sun <[email protected]>
  • Loading branch information
marshtompsxd authored Jun 4, 2024
1 parent 9aaf55e commit e0917a9
Show file tree
Hide file tree
Showing 17 changed files with 676 additions and 461 deletions.
69 changes: 69 additions & 0 deletions e2e/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions e2e/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,3 +33,5 @@ thiserror = "1.0.29"
tokio-stream = { version = "0.1.9", features = ["net"] }
zookeeper = "0.8"
tungstenite = "0.20.1"
tracing = "0.1.36"
tracing-subscriber = "0.3.17"
15 changes: 8 additions & 7 deletions e2e/src/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use kube::{
};
use std::process::Command;
use thiserror::Error;
use tracing::*;

#[derive(Debug, Error)]
pub enum Error {
Expand Down Expand Up @@ -72,18 +73,18 @@ pub async fn apply(yaml: String, client: Client, discovery: &Discovery) -> Resul
let gvk = if let Some(tm) = &obj.types {
GroupVersionKind::try_from(tm).unwrap()
} else {
println!("cannot apply object without valid TypeMeta {:?}", obj);
error!("cannot apply object without valid TypeMeta {:?}", obj);
return Err(Error::ApplyFailed);
};
let name = obj.name_any();
if let Some((ar, caps)) = discovery.resolve_gvk(&gvk) {
let api = dynamic_api(ar, caps, client.clone(), namespace, false);
println!("Applying {}: \n{}", gvk.kind, serde_yaml::to_string(&obj)?);
info!("Applying {}: \n{}", gvk.kind, serde_yaml::to_string(&obj)?);
let data: serde_json::Value = serde_json::to_value(&obj)?;
let _r = api.patch(&name, &ssapply, &Patch::Apply(data)).await?;
println!("applied {} {}", gvk.kind, name);
info!("applied {} {}", gvk.kind, name);
} else {
println!("Cannot apply document for unknown {:?}", gvk);
error!("Cannot apply document for unknown {:?}", gvk);
return Err(Error::ApplyFailed);
}

Expand Down Expand Up @@ -124,10 +125,10 @@ pub async fn get_output_and_err(mut attached: AttachedProcess) -> (String, Strin
}

pub fn run_command(program: &str, args: Vec<&str>, err_msg: &str) -> (String, String) {
println!("{} {}", program, args.join(" "));
info!("{} {}", program, args.join(" "));
let cmd = Command::new(program).args(args).output().expect(err_msg);
println!("cmd output: {}", String::from_utf8_lossy(&cmd.stdout));
println!("cmd error: {}", String::from_utf8_lossy(&cmd.stderr));
info!("cmd output: {}", String::from_utf8_lossy(&cmd.stdout));
info!("cmd error: {}", String::from_utf8_lossy(&cmd.stderr));
(
String::from_utf8_lossy(&cmd.stdout).to_string(),
String::from_utf8_lossy(&cmd.stderr).to_string(),
Expand Down
52 changes: 28 additions & 24 deletions e2e/src/fluent_e2e.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#![allow(unused_imports)]
#![allow(unused_variables)]
use k8s_openapi::api::core::v1::{Pod, ServiceAccount, Service};
use k8s_openapi::api::core::v1::{Pod, Service, ServiceAccount};
use k8s_openapi::api::rbac::v1::RoleBinding;
use k8s_openapi::api::{apps::v1::DaemonSet, rbac::v1::Role};
use k8s_openapi::apiextensions_apiserver::pkg::apis::apiextensions::v1::CustomResourceDefinition;
Expand All @@ -20,6 +20,7 @@ use std::path::PathBuf;
use std::thread;
use std::time::{Duration, Instant};
use tokio::time::sleep;
use tracing::*;

use crate::common::*;

Expand Down Expand Up @@ -115,6 +116,7 @@ pub async fn desired_state_test(client: Client, fb_name: String) -> Result<(), E
loop {
sleep(Duration::from_secs(5)).await;
if start.elapsed() > timeout {
error!("Time out on desired state test");
return Err(Error::Timeout);
}
// Check daemon set
Expand All @@ -127,7 +129,7 @@ pub async fn desired_state_test(client: Client, fb_name: String) -> Result<(), E
let role = role_api.get(&(fb_name.clone() + "-role")).await;
match role {
Err(e) => {
println!("Get role failed with error {}.", e);
info!("Get role failed with error {}.", e);
continue;
}
_ => {}
Expand All @@ -136,7 +138,7 @@ pub async fn desired_state_test(client: Client, fb_name: String) -> Result<(), E
let sa = sa_api.get(&fb_name.clone()).await;
match sa {
Err(e) => {
println!("Get service account failed with error {}.", e);
info!("Get service account failed with error {}.", e);
continue;
}
_ => {}
Expand All @@ -145,7 +147,7 @@ pub async fn desired_state_test(client: Client, fb_name: String) -> Result<(), E
let rb = rb_api.get(&(fb_name.clone() + "-role-binding")).await;
match rb {
Err(e) => {
println!("Get role binding failed with error {}.", e);
info!("Get role binding failed with error {}.", e);
continue;
}
_ => {}
Expand All @@ -154,7 +156,7 @@ pub async fn desired_state_test(client: Client, fb_name: String) -> Result<(), E
let svc = svc_api.get(&fb_name).await;
match svc {
Err(e) => {
println!("Get service failed with error {}.", e);
info!("Get service failed with error {}.", e);
continue;
}
_ => {}
Expand All @@ -163,24 +165,24 @@ pub async fn desired_state_test(client: Client, fb_name: String) -> Result<(), E
let ds = ds_api.get(&fb_name).await;
match ds {
Err(e) => {
println!("Get daemon set failed with error {}.", e);
info!("Get daemon set failed with error {}.", e);
continue;
}
Ok(ds) => {
if ds.status.as_ref().unwrap().number_ready == node_number() {
// this number depends on the number of nodes
println!("All daemons are ready now.");
info!("All daemons are ready now.");
break;
} else {
println!(
info!(
"Only {} daemons are ready now.",
ds.status.as_ref().unwrap().number_ready
);
}
}
};
}
println!("Desired state test passed.");
info!("Desired state test passed.");
Ok(())
}

Expand All @@ -206,14 +208,15 @@ pub async fn relabel_test(client: Client, fb_name: String) -> Result<(), Error>
loop {
sleep(Duration::from_secs(5)).await;
if start.elapsed() > timeout {
error!("Time out on relabel test");
return Err(Error::Timeout);
}

// Check daemon set
let ds = ds_api.get(&fb_name).await;
match ds {
Err(e) => {
println!("Get daemon set failed with error {}.", e);
info!("Get daemon set failed with error {}.", e);
continue;
}
Ok(ds) => {
Expand All @@ -230,7 +233,7 @@ pub async fn relabel_test(client: Client, fb_name: String) -> Result<(), Error>
.unwrap()
.contains_key("key")
{
println!("Label for pod is not updated yet");
info!("Label for pod is not updated yet");
continue;
}

Expand All @@ -241,7 +244,7 @@ pub async fn relabel_test(client: Client, fb_name: String) -> Result<(), Error>
.updated_number_scheduled
.is_none()
{
println!("No daemon set pod is updated yet.");
info!("No daemon set pod is updated yet.");
continue;
} else if *ds
.status
Expand All @@ -252,9 +255,9 @@ pub async fn relabel_test(client: Client, fb_name: String) -> Result<(), Error>
.unwrap()
== node_number()
{
println!("Relabel is done.");
info!("Relabel is done.");
} else {
println!(
info!(
"Relabel is in progress. {} pods are updated now.",
ds.status
.as_ref()
Expand All @@ -267,10 +270,10 @@ pub async fn relabel_test(client: Client, fb_name: String) -> Result<(), Error>
}

if ds.status.as_ref().unwrap().number_ready == node_number() {
println!("All daemon set pods are ready.");
info!("All daemon set pods are ready.");
break;
} else {
println!(
info!(
"Only {} pods are ready now.",
ds.status.as_ref().unwrap().number_ready
);
Expand All @@ -280,7 +283,7 @@ pub async fn relabel_test(client: Client, fb_name: String) -> Result<(), Error>
};
}

println!("Relabel test passed.");
info!("Relabel test passed.");
Ok(())
}

Expand All @@ -306,14 +309,15 @@ pub async fn service_selector_test(client: Client, fb_name: String) -> Result<()
loop {
sleep(Duration::from_secs(5)).await;
if start.elapsed() > timeout {
error!("Time out on service selector test");
return Err(Error::Timeout);
}

// Check daemon set
let svc = svc_api.get(&fb_name).await;
match svc {
Err(e) => {
println!("Get service failed with error {}.", e);
info!("Get service failed with error {}.", e);
continue;
}
Ok(svc) => {
Expand All @@ -326,16 +330,16 @@ pub async fn service_selector_test(client: Client, fb_name: String) -> Result<()
.unwrap()
.contains_key("never-match-anything")
{
println!("Selector for service is updated yet");
info!("Selector for service is updated yet");
break;
} else {
println!("Selector for service is not updated yet");
info!("Selector for service is not updated yet");
}
}
};
}

println!("Service selector test passed.");
info!("Service selector test passed.");
Ok(())
}

Expand All @@ -346,11 +350,11 @@ pub async fn fluent_e2e_test() -> Result<(), Error> {
let fb_crd = crd_api.get("fluentbits.anvil.dev").await;
match fb_crd {
Err(e) => {
println!("No CRD found, create one before run the e2e test.");
error!("No CRD found, create one before run the e2e test.");
return Err(Error::CRDGetFailed(e));
}
Ok(crd) => {
println!("CRD found, continue to run the e2e test.");
info!("CRD found, continue to run the e2e test.");
}
}

Expand All @@ -363,6 +367,6 @@ pub async fn fluent_e2e_test() -> Result<(), Error> {
relabel_test(client.clone(), fb_name.clone()).await?;
service_selector_test(client.clone(), fb_name.clone()).await?;

println!("E2e test passed.");
info!("E2e test passed.");
Ok(())
}
Loading

0 comments on commit e0917a9

Please sign in to comment.