From 06dfe08414fb64ea3a950becb934ba2f03cb3308 Mon Sep 17 00:00:00 2001 From: Xudong Sun Date: Mon, 3 Jun 2024 20:35:17 -0500 Subject: [PATCH] Use tracing Signed-off-by: Xudong Sun --- e2e/Cargo.toml | 2 + e2e/src/common.rs | 15 +- e2e/src/fluent_e2e.rs | 52 ++--- e2e/src/main.rs | 21 +- e2e/src/rabbitmq_e2e.rs | 152 +++++++------- e2e/src/zookeeper_e2e.rs | 185 +++++++++--------- .../trusted/zookeeper_api_exec.rs | 150 +++++++++----- src/deps_hack/src/lib.rs | 4 +- src/fluent_controller.rs | 47 ++--- src/rabbitmq_controller.rs | 26 +-- src/shim_layer/controller_runtime.rs | 51 +++-- src/shim_layer/fault_injection.rs | 58 ++++-- src/v_replica_set_controller.rs | 12 +- src/v_stateful_set_controller.rs | 25 +-- src/zookeeper_controller.rs | 26 +-- 15 files changed, 452 insertions(+), 374 deletions(-) diff --git a/e2e/Cargo.toml b/e2e/Cargo.toml index 16f3cf477..81745dec2 100644 --- a/e2e/Cargo.toml +++ b/e2e/Cargo.toml @@ -33,3 +33,5 @@ thiserror = "1.0.29" tokio-stream = { version = "0.1.9", features = ["net"] } zookeeper = "0.8" tungstenite = "0.20.1" +tracing = "0.1.36" +tracing-subscriber = "0.3.17" diff --git a/e2e/src/common.rs b/e2e/src/common.rs index a1ad43b4a..16c467de3 100644 --- a/e2e/src/common.rs +++ b/e2e/src/common.rs @@ -14,6 +14,7 @@ use kube::{ }; use std::process::Command; use thiserror::Error; +use tracing::*; #[derive(Debug, Error)] pub enum Error { @@ -72,18 +73,18 @@ pub async fn apply(yaml: String, client: Client, discovery: &Discovery) -> Resul let gvk = if let Some(tm) = &obj.types { GroupVersionKind::try_from(tm).unwrap() } else { - println!("cannot apply object without valid TypeMeta {:?}", obj); + error!("cannot apply object without valid TypeMeta {:?}", obj); return Err(Error::ApplyFailed); }; let name = obj.name_any(); if let Some((ar, caps)) = discovery.resolve_gvk(&gvk) { let api = dynamic_api(ar, caps, client.clone(), namespace, false); - println!("Applying {}: \n{}", gvk.kind, serde_yaml::to_string(&obj)?); + info!("Applying {}: \n{}", gvk.kind, serde_yaml::to_string(&obj)?); let data: serde_json::Value = serde_json::to_value(&obj)?; let _r = api.patch(&name, &ssapply, &Patch::Apply(data)).await?; - println!("applied {} {}", gvk.kind, name); + info!("applied {} {}", gvk.kind, name); } else { - println!("Cannot apply document for unknown {:?}", gvk); + error!("Cannot apply document for unknown {:?}", gvk); return Err(Error::ApplyFailed); } @@ -124,10 +125,10 @@ pub async fn get_output_and_err(mut attached: AttachedProcess) -> (String, Strin } pub fn run_command(program: &str, args: Vec<&str>, err_msg: &str) -> (String, String) { - println!("{} {}", program, args.join(" ")); + info!("{} {}", program, args.join(" ")); let cmd = Command::new(program).args(args).output().expect(err_msg); - println!("cmd output: {}", String::from_utf8_lossy(&cmd.stdout)); - println!("cmd error: {}", String::from_utf8_lossy(&cmd.stderr)); + info!("cmd output: {}", String::from_utf8_lossy(&cmd.stdout)); + info!("cmd error: {}", String::from_utf8_lossy(&cmd.stderr)); ( String::from_utf8_lossy(&cmd.stdout).to_string(), String::from_utf8_lossy(&cmd.stderr).to_string(), diff --git a/e2e/src/fluent_e2e.rs b/e2e/src/fluent_e2e.rs index a754a565b..9ebceb6d2 100644 --- a/e2e/src/fluent_e2e.rs +++ b/e2e/src/fluent_e2e.rs @@ -1,6 +1,6 @@ #![allow(unused_imports)] #![allow(unused_variables)] -use k8s_openapi::api::core::v1::{Pod, ServiceAccount, Service}; +use k8s_openapi::api::core::v1::{Pod, Service, ServiceAccount}; use k8s_openapi::api::rbac::v1::RoleBinding; use 
k8s_openapi::api::{apps::v1::DaemonSet, rbac::v1::Role}; use k8s_openapi::apiextensions_apiserver::pkg::apis::apiextensions::v1::CustomResourceDefinition; @@ -20,6 +20,7 @@ use std::path::PathBuf; use std::thread; use std::time::{Duration, Instant}; use tokio::time::sleep; +use tracing::*; use crate::common::*; @@ -115,6 +116,7 @@ pub async fn desired_state_test(client: Client, fb_name: String) -> Result<(), E loop { sleep(Duration::from_secs(5)).await; if start.elapsed() > timeout { + error!("Time out on desired state test"); return Err(Error::Timeout); } // Check daemon set @@ -127,7 +129,7 @@ pub async fn desired_state_test(client: Client, fb_name: String) -> Result<(), E let role = role_api.get(&(fb_name.clone() + "-role")).await; match role { Err(e) => { - println!("Get role failed with error {}.", e); + info!("Get role failed with error {}.", e); continue; } _ => {} @@ -136,7 +138,7 @@ pub async fn desired_state_test(client: Client, fb_name: String) -> Result<(), E let sa = sa_api.get(&fb_name.clone()).await; match sa { Err(e) => { - println!("Get service account failed with error {}.", e); + info!("Get service account failed with error {}.", e); continue; } _ => {} @@ -145,7 +147,7 @@ pub async fn desired_state_test(client: Client, fb_name: String) -> Result<(), E let rb = rb_api.get(&(fb_name.clone() + "-role-binding")).await; match rb { Err(e) => { - println!("Get role binding failed with error {}.", e); + info!("Get role binding failed with error {}.", e); continue; } _ => {} @@ -154,7 +156,7 @@ pub async fn desired_state_test(client: Client, fb_name: String) -> Result<(), E let svc = svc_api.get(&fb_name).await; match svc { Err(e) => { - println!("Get service failed with error {}.", e); + info!("Get service failed with error {}.", e); continue; } _ => {} @@ -163,16 +165,16 @@ pub async fn desired_state_test(client: Client, fb_name: String) -> Result<(), E let ds = ds_api.get(&fb_name).await; match ds { Err(e) => { - println!("Get daemon set failed with error {}.", e); + info!("Get daemon set failed with error {}.", e); continue; } Ok(ds) => { if ds.status.as_ref().unwrap().number_ready == node_number() { // this number depends on the number of nodes - println!("All daemons are ready now."); + info!("All daemons are ready now."); break; } else { - println!( + info!( "Only {} daemons are ready now.", ds.status.as_ref().unwrap().number_ready ); @@ -180,7 +182,7 @@ pub async fn desired_state_test(client: Client, fb_name: String) -> Result<(), E } }; } - println!("Desired state test passed."); + info!("Desired state test passed."); Ok(()) } @@ -206,6 +208,7 @@ pub async fn relabel_test(client: Client, fb_name: String) -> Result<(), Error> loop { sleep(Duration::from_secs(5)).await; if start.elapsed() > timeout { + error!("Time out on relabel test"); return Err(Error::Timeout); } @@ -213,7 +216,7 @@ pub async fn relabel_test(client: Client, fb_name: String) -> Result<(), Error> let ds = ds_api.get(&fb_name).await; match ds { Err(e) => { - println!("Get daemon set failed with error {}.", e); + info!("Get daemon set failed with error {}.", e); continue; } Ok(ds) => { @@ -230,7 +233,7 @@ pub async fn relabel_test(client: Client, fb_name: String) -> Result<(), Error> .unwrap() .contains_key("key") { - println!("Label for pod is not updated yet"); + info!("Label for pod is not updated yet"); continue; } @@ -241,7 +244,7 @@ pub async fn relabel_test(client: Client, fb_name: String) -> Result<(), Error> .updated_number_scheduled .is_none() { - println!("No daemon set pod is updated yet."); + 
info!("No daemon set pod is updated yet."); continue; } else if *ds .status @@ -252,9 +255,9 @@ pub async fn relabel_test(client: Client, fb_name: String) -> Result<(), Error> .unwrap() == node_number() { - println!("Relabel is done."); + info!("Relabel is done."); } else { - println!( + info!( "Relabel is in progress. {} pods are updated now.", ds.status .as_ref() @@ -267,10 +270,10 @@ pub async fn relabel_test(client: Client, fb_name: String) -> Result<(), Error> } if ds.status.as_ref().unwrap().number_ready == node_number() { - println!("All daemon set pods are ready."); + info!("All daemon set pods are ready."); break; } else { - println!( + info!( "Only {} pods are ready now.", ds.status.as_ref().unwrap().number_ready ); @@ -280,7 +283,7 @@ pub async fn relabel_test(client: Client, fb_name: String) -> Result<(), Error> }; } - println!("Relabel test passed."); + info!("Relabel test passed."); Ok(()) } @@ -306,6 +309,7 @@ pub async fn service_selector_test(client: Client, fb_name: String) -> Result<() loop { sleep(Duration::from_secs(5)).await; if start.elapsed() > timeout { + error!("Time out on service selector test"); return Err(Error::Timeout); } @@ -313,7 +317,7 @@ pub async fn service_selector_test(client: Client, fb_name: String) -> Result<() let svc = svc_api.get(&fb_name).await; match svc { Err(e) => { - println!("Get service failed with error {}.", e); + info!("Get service failed with error {}.", e); continue; } Ok(svc) => { @@ -326,16 +330,16 @@ pub async fn service_selector_test(client: Client, fb_name: String) -> Result<() .unwrap() .contains_key("never-match-anything") { - println!("Selector for service is updated yet"); + info!("Selector for service is updated yet"); break; } else { - println!("Selector for service is not updated yet"); + info!("Selector for service is not updated yet"); } } }; } - println!("Service selector test passed."); + info!("Service selector test passed."); Ok(()) } @@ -346,11 +350,11 @@ pub async fn fluent_e2e_test() -> Result<(), Error> { let fb_crd = crd_api.get("fluentbits.anvil.dev").await; match fb_crd { Err(e) => { - println!("No CRD found, create one before run the e2e test."); + error!("No CRD found, create one before run the e2e test."); return Err(Error::CRDGetFailed(e)); } Ok(crd) => { - println!("CRD found, continue to run the e2e test."); + info!("CRD found, continue to run the e2e test."); } } @@ -363,6 +367,6 @@ pub async fn fluent_e2e_test() -> Result<(), Error> { relabel_test(client.clone(), fb_name.clone()).await?; service_selector_test(client.clone(), fb_name.clone()).await?; - println!("E2e test passed."); + info!("E2e test passed."); Ok(()) } diff --git a/e2e/src/main.rs b/e2e/src/main.rs index 141845f72..f8e58d3ba 100644 --- a/e2e/src/main.rs +++ b/e2e/src/main.rs @@ -7,46 +7,49 @@ pub mod zookeeper_e2e; use common::Error; use fluent_e2e::fluent_e2e_test; -use rabbitmq_e2e::{rabbitmq_e2e_test, rabbitmq_scaling_e2e_test, rabbitmq_ephemeral_e2e_test}; +use rabbitmq_e2e::{rabbitmq_e2e_test, rabbitmq_ephemeral_e2e_test, rabbitmq_scaling_e2e_test}; use std::str::FromStr; use std::{env, sync::Arc}; +use tracing::*; use zookeeper_e2e::{zookeeper_e2e_test, zookeeper_ephemeral_e2e_test, zookeeper_scaling_e2e_test}; #[tokio::main] async fn main() -> Result<(), Error> { + tracing_subscriber::fmt::init(); + let args: Vec = env::args().collect(); let cmd = args[1].clone(); match cmd.as_str() { "zookeeper" => { - println!("Running zookeeper end-to-end test"); + info!("Running zookeeper end-to-end test"); return zookeeper_e2e_test().await; 
} "zookeeper-scaling" => { - println!("Running zookeeper end-to-end test for scaling"); + info!("Running zookeeper end-to-end test for scaling"); return zookeeper_scaling_e2e_test().await; } "zookeeper-ephemeral" => { - println!("Running zookeeper end-to-end test for ephemeral storage"); + info!("Running zookeeper end-to-end test for ephemeral storage"); return zookeeper_ephemeral_e2e_test().await; } "rabbitmq" => { - println!("Running rabbitmq end-to-end test"); + info!("Running rabbitmq end-to-end test"); return rabbitmq_e2e_test().await; } "rabbitmq-scaling" => { - println!("Running rabbitmq end-to-end test for scaling"); + info!("Running rabbitmq end-to-end test for scaling"); return rabbitmq_scaling_e2e_test().await; } "rabbitmq-ephemeral" => { - println!("Running rabbitmq end-to-end test for ephemeral storage"); + info!("Running rabbitmq end-to-end test for ephemeral storage"); return rabbitmq_ephemeral_e2e_test().await; } "fluent" => { - println!("Running fluent end-to-end test"); + info!("Running fluent end-to-end test"); return fluent_e2e_test().await; } _ => { - println!("Please specify one controller"); + error!("Wrong command. Please specify the correct e2e test workload."); Ok(()) } } diff --git a/e2e/src/rabbitmq_e2e.rs b/e2e/src/rabbitmq_e2e.rs index 34b46d199..4ff25a776 100644 --- a/e2e/src/rabbitmq_e2e.rs +++ b/e2e/src/rabbitmq_e2e.rs @@ -20,6 +20,7 @@ use std::path::PathBuf; use std::thread; use std::time::{Duration, Instant}; use tokio::time::sleep; +use tracing::*; use crate::common::*; @@ -69,6 +70,7 @@ pub async fn desired_state_test(client: Client, rabbitmq_name: String) -> Result loop { sleep(Duration::from_secs(5)).await; if start.elapsed() > timeout { + error!("Time out on desired state test"); return Err(Error::Timeout); } // Check config map @@ -76,7 +78,7 @@ pub async fn desired_state_test(client: Client, rabbitmq_name: String) -> Result let cm = cm_api.get(&rabbitmq_cm_name).await; match cm { Err(e) => { - println!("Get config map failed with {}, continue to wait.", e); + info!("Get config map failed with {}, continue to wait.", e); continue; } Ok(cm) => { @@ -85,12 +87,12 @@ pub async fn desired_state_test(client: Client, rabbitmq_name: String) -> Result if !user_config.contains("default_user = new_user") || !user_config.contains("default_pass = new_pass") { - println!( + error!( "Config map is not consistent with rabbitmq cluster spec. E2e test failed." ); return Err(Error::RabbitmqConfigMapFailed); } - println!("Config map is found as expected."); + info!("Config map is found as expected."); } }; // Check stateful set @@ -98,17 +100,17 @@ pub async fn desired_state_test(client: Client, rabbitmq_name: String) -> Result let sts = sts_api.get(&rabbitmq_sts_name).await; match sts { Err(e) => { - println!("Get stateful set failed with {}, continue to wait.", e); + info!("Get stateful set failed with {}, continue to wait.", e); continue; } Ok(sts) => { if sts.spec.as_ref().unwrap().replicas != Some(3) { - println!("Stateful set spec is not consistent with rabbitmq cluster spec. E2e test failed."); + error!("Stateful set spec is not consistent with rabbitmq cluster spec. 
E2e test failed."); return Err(Error::RabbitmqStsFailed); } - println!("Stateful set is found as expected."); + info!("Stateful set is found as expected."); if sts.status.as_ref().unwrap().ready_replicas.is_none() { - println!("No stateful set pod is ready."); + info!("No stateful set pod is ready."); continue; } else if *sts .status @@ -119,10 +121,10 @@ pub async fn desired_state_test(client: Client, rabbitmq_name: String) -> Result .unwrap() == 3 { - println!("All stateful set pods are ready."); + info!("All stateful set pods are ready."); break; } else { - println!( + info!( "Only {} pods are ready now.", sts.status .as_ref() @@ -136,11 +138,10 @@ pub async fn desired_state_test(client: Client, rabbitmq_name: String) -> Result } }; } - println!("Desired state test passed."); + info!("Desired state test passed."); Ok(()) } - pub async fn relabel_test(client: Client, rabbitmq_name: String) -> Result<(), Error> { let rabbitmq_sts_name = format!("{}-server", &rabbitmq_name); let timeout = Duration::from_secs(360); @@ -164,6 +165,7 @@ pub async fn relabel_test(client: Client, rabbitmq_name: String) -> Result<(), E loop { sleep(Duration::from_secs(5)).await; if start.elapsed() > timeout { + error!("Time out on relabel test"); return Err(Error::Timeout); } @@ -171,7 +173,7 @@ pub async fn relabel_test(client: Client, rabbitmq_name: String) -> Result<(), E let sts = sts_api.get(&rabbitmq_sts_name).await; match sts { Err(e) => { - println!("Get stateful set failed with error {}.", e); + info!("Get stateful set failed with error {}.", e); continue; } Ok(sts) => { @@ -188,12 +190,12 @@ pub async fn relabel_test(client: Client, rabbitmq_name: String) -> Result<(), E .unwrap() .contains_key("key") { - println!("Label for pod is not updated yet"); + info!("Label for pod is not updated yet"); continue; } if sts.status.as_ref().unwrap().updated_replicas.is_none() { - println!("No stateful set pod is updated yet."); + info!("No stateful set pod is updated yet."); continue; } else if *sts .status @@ -204,9 +206,9 @@ pub async fn relabel_test(client: Client, rabbitmq_name: String) -> Result<(), E .unwrap() == 3 { - println!("Relabel is done."); + info!("Relabel is done."); } else { - println!( + info!( "Relabel is in progress. 
{} pods are updated now.", sts.status .as_ref() @@ -219,7 +221,7 @@ pub async fn relabel_test(client: Client, rabbitmq_name: String) -> Result<(), E } if sts.status.as_ref().unwrap().ready_replicas.is_none() { - println!("No stateful set pod is ready."); + info!("No stateful set pod is ready."); continue; } else if *sts .status @@ -230,10 +232,10 @@ pub async fn relabel_test(client: Client, rabbitmq_name: String) -> Result<(), E .unwrap() == 3 { - println!("All stateful set pods are ready."); + info!("All stateful set pods are ready."); break; } else { - println!( + info!( "Only {} pods are ready now.", sts.status .as_ref() @@ -248,7 +250,7 @@ pub async fn relabel_test(client: Client, rabbitmq_name: String) -> Result<(), E }; } - println!("Relabel test passed."); + info!("Relabel test passed."); Ok(()) } @@ -275,6 +277,7 @@ pub async fn reconfiguration_test(client: Client, rabbitmq_name: String) -> Resu loop { sleep(Duration::from_secs(5)).await; if start.elapsed() > timeout { + error!("Time out on reconfiguration test"); return Err(Error::Timeout); } @@ -282,12 +285,12 @@ pub async fn reconfiguration_test(client: Client, rabbitmq_name: String) -> Resu let sts = sts_api.get(&rabbitmq_sts_name).await; match sts { Err(e) => { - println!("Get stateful set failed with error {}.", e); + info!("Get stateful set failed with error {}.", e); continue; } Ok(sts) => { if sts.status.as_ref().unwrap().updated_replicas.is_none() { - println!("No stateful set pod is updated yet."); + info!("No stateful set pod is updated yet."); continue; } else if *sts .status @@ -298,9 +301,9 @@ pub async fn reconfiguration_test(client: Client, rabbitmq_name: String) -> Resu .unwrap() == 3 { - println!("Reconfiguration is done."); + info!("Reconfiguration is done."); } else { - println!( + info!( "Reconfiguration is in progress. 
{} pods are updated now.", sts.status .as_ref() @@ -313,7 +316,7 @@ pub async fn reconfiguration_test(client: Client, rabbitmq_name: String) -> Resu } if sts.status.as_ref().unwrap().ready_replicas.is_none() { - println!("No stateful set pod is ready."); + info!("No stateful set pod is ready."); continue; } else if *sts .status @@ -324,10 +327,10 @@ pub async fn reconfiguration_test(client: Client, rabbitmq_name: String) -> Resu .unwrap() == 3 { - println!("All stateful set pods are ready."); + info!("All stateful set pods are ready."); break; } else { - println!( + info!( "Only {} pods are ready now.", sts.status .as_ref() @@ -348,25 +351,29 @@ pub async fn reconfiguration_test(client: Client, rabbitmq_name: String) -> Resu let attached = pod_api .exec( pod_name.as_str(), - vec!["cat", "/etc/rabbitmq/conf.d/90-userDefinedConfiguration.conf"], + vec![ + "cat", + "/etc/rabbitmq/conf.d/90-userDefinedConfiguration.conf", + ], &AttachParams::default().stderr(true), ) .await?; let (out, err) = get_output_and_err(attached).await; if err != "" { - println!("Reconfiguration test failed with {}.", err); + error!("Reconfiguration test failed with {}.", err); return Err(Error::ZookeeperWorkloadFailed); } else { - println!("{}", out); - if !out.contains("log.console = true") || !out.contains("log.console.level = debug") - || !out.contains("log.console.formatter = json") { - println!("Test failed because of unexpected zoo.cfg data."); - println!("The config file is {}", out); + info!("The config file is: {}", out); + if !out.contains("log.console = true") + || !out.contains("log.console.level = debug") + || !out.contains("log.console.formatter = json") + { + error!("Test failed because of unexpected zoo.cfg data."); return Err(Error::ZookeeperWorkloadFailed); } } - println!("Reconfiguration test passed."); + info!("Reconfiguration test passed."); Ok(()) } @@ -392,25 +399,24 @@ pub async fn scaling_test(client: Client, rabbitmq_name: String) -> Result<(), E loop { sleep(Duration::from_secs(5)).await; if start.elapsed() > timeout { + error!("Time out on scaling test"); return Err(Error::Timeout); } let sts = sts_api.get(&rabbitmq_sts_name).await; match sts { Err(e) => { - println!("Get stateful set failed with error {}.", e); + info!("Get stateful set failed with error {}.", e); continue; } Ok(sts) => { if sts.spec.unwrap().replicas != Some(4) { - println!( - "Stateful set spec is not consistent with rabbitmq cluster spec yet." - ); + info!("Stateful set spec is not consistent with rabbitmq cluster spec yet."); continue; } - println!("Stateful set is found as expected."); + info!("Stateful set is found as expected."); if sts.status.as_ref().unwrap().ready_replicas.is_none() { - println!("No stateful set pod is ready."); + info!("No stateful set pod is ready."); continue; } else if *sts .status @@ -421,10 +427,10 @@ pub async fn scaling_test(client: Client, rabbitmq_name: String) -> Result<(), E .unwrap() == 4 { - println!("Scale up is done with 4 replicas ready."); + info!("Scale up is done with 4 replicas ready."); break; } else { - println!( + info!( "Scale up is in progress. 
{} pods are ready now.", sts.status .as_ref() @@ -438,7 +444,7 @@ pub async fn scaling_test(client: Client, rabbitmq_name: String) -> Result<(), E } }; } - println!("Scaling test passed."); + info!("Scaling test passed."); Ok(()) } @@ -465,6 +471,7 @@ pub async fn upgrading_test(client: Client, rabbitmq_name: String) -> Result<(), loop { sleep(Duration::from_secs(5)).await; if start.elapsed() > timeout { + error!("Time out on upgrading test"); return Err(Error::Timeout); } @@ -472,12 +479,12 @@ pub async fn upgrading_test(client: Client, rabbitmq_name: String) -> Result<(), let sts = sts_api.get(&rabbitmq_sts_name).await; match sts { Err(e) => { - println!("Get stateful set failed with error {}.", e); + info!("Get stateful set failed with error {}.", e); continue; } Ok(sts) => { if sts.status.as_ref().unwrap().updated_replicas.is_none() { - println!("No stateful set pod is updated yet."); + info!("No stateful set pod is updated yet."); continue; } else if *sts .status @@ -488,9 +495,9 @@ pub async fn upgrading_test(client: Client, rabbitmq_name: String) -> Result<(), .unwrap() == 3 { - println!("Upgrading is done."); + info!("Upgrading is done."); } else { - println!( + info!( "Upgrading is in progress. {} pods are updated now.", sts.status .as_ref() @@ -503,7 +510,7 @@ pub async fn upgrading_test(client: Client, rabbitmq_name: String) -> Result<(), } if sts.status.as_ref().unwrap().ready_replicas.is_none() { - println!("No stateful set pod is ready."); + info!("No stateful set pod is ready."); continue; } else if *sts .status @@ -514,10 +521,10 @@ pub async fn upgrading_test(client: Client, rabbitmq_name: String) -> Result<(), .unwrap() == 3 { - println!("All stateful set pods are ready."); + info!("All stateful set pods are ready."); break; } else { - println!( + info!( "Only {} pods are ready now.", sts.status .as_ref() @@ -532,7 +539,7 @@ pub async fn upgrading_test(client: Client, rabbitmq_name: String) -> Result<(), }; } - println!("Upgrading test passed."); + info!("Upgrading test passed."); Ok(()) } @@ -548,12 +555,12 @@ pub async fn authenticate_user_test(client: Client, rabbitmq_name: String) -> Re .await?; let (out, err) = get_output_and_err(attached).await; if err != "" { - println!("User and password test failed with {}.", err); + error!("User and password test failed with {}.", err); return Err(Error::RabbitmqUserPassFailed); } else { - println!("{}", out); + info!("{}", out); } - println!("Authenticate user test passed."); + info!("Authenticate user test passed."); Ok(()) } @@ -573,36 +580,37 @@ pub async fn rabbitmq_workload_test(client: Client, rabbitmq_name: String) -> Re let pod_name = "perf-test"; let pod_api: Api = Api::default_namespaced(client.clone()); let timeout = Duration::from_secs(600); - let pert_test_duration = Duration::from_secs(20); + let perf_test_duration = Duration::from_secs(20); let start = Instant::now(); let mut perf_test_start: Option = None; loop { sleep(Duration::from_secs(5)).await; if start.elapsed() > timeout { + error!("Time out on perf test"); return Err(Error::Timeout); } match pod_api.get(pod_name).await { Err(e) => { - println!("Get pod failed with {}, continue to wait.", e); + info!("Get pod failed with {}, continue to wait.", e); continue; } Ok(pod) => { if pod.status.is_none() { - println!("Pod status is not available yet."); + info!("Pod status is not available yet."); continue; } else if pod.status.unwrap().phase != Some("Running".to_string()) { - println!("Perf test pod is not running yet."); + info!("Perf test pod is not running 
yet."); continue; } else { if perf_test_start.is_none() { - println!("Perf test starts running."); + info!("Perf test starts running."); perf_test_start = Some(Instant::now()); continue; } else { - if perf_test_start.unwrap().elapsed() > pert_test_duration { + if perf_test_start.unwrap().elapsed() > perf_test_duration { break; } else { - println!("Keep running perf test."); + info!("Keep running perf test."); continue; } } @@ -611,7 +619,7 @@ pub async fn rabbitmq_workload_test(client: Client, rabbitmq_name: String) -> Re }; } // Shall we delete the perf test pod here? - println!("Rabbitmq workload test passed."); + info!("Rabbitmq workload test passed."); Ok(()) } @@ -622,11 +630,11 @@ pub async fn rabbitmq_e2e_test() -> Result<(), Error> { let rabbitmq_crd = crd_api.get("rabbitmqclusters.anvil.dev").await; match rabbitmq_crd { Err(e) => { - println!("No CRD found, create one before run the e2e test."); + error!("No CRD found, create one before run the e2e test."); return Err(Error::CRDGetFailed(e)); } Ok(crd) => { - println!("CRD found, continue to run the e2e test."); + info!("CRD found, continue to run the e2e test."); } } @@ -641,7 +649,7 @@ pub async fn rabbitmq_e2e_test() -> Result<(), Error> { upgrading_test(client.clone(), rabbitmq_name.clone()).await?; rabbitmq_workload_test(client.clone(), rabbitmq_name.clone()).await?; - println!("E2e test passed."); + info!("E2e test passed."); Ok(()) } @@ -652,11 +660,11 @@ pub async fn rabbitmq_scaling_e2e_test() -> Result<(), Error> { let rabbitmq_crd = crd_api.get("rabbitmqclusters.anvil.dev").await; match rabbitmq_crd { Err(e) => { - println!("No CRD found, create one before run the e2e test."); + error!("No CRD found, create one before run the e2e test."); return Err(Error::CRDGetFailed(e)); } Ok(crd) => { - println!("CRD found, continue to run the e2e test."); + info!("CRD found, continue to run the e2e test."); } } @@ -668,7 +676,7 @@ pub async fn rabbitmq_scaling_e2e_test() -> Result<(), Error> { scaling_test(client.clone(), rabbitmq_name.clone()).await?; rabbitmq_workload_test(client.clone(), rabbitmq_name.clone()).await?; - println!("E2e test passed."); + info!("E2e test passed."); Ok(()) } @@ -679,11 +687,11 @@ pub async fn rabbitmq_ephemeral_e2e_test() -> Result<(), Error> { let rabbitmq_crd = crd_api.get("rabbitmqclusters.anvil.dev").await; match rabbitmq_crd { Err(e) => { - println!("No CRD found, create one before run the e2e test."); + error!("No CRD found, create one before run the e2e test."); return Err(Error::CRDGetFailed(e)); } Ok(crd) => { - println!("CRD found, continue to run the e2e test."); + info!("CRD found, continue to run the e2e test."); } } @@ -695,6 +703,6 @@ pub async fn rabbitmq_ephemeral_e2e_test() -> Result<(), Error> { scaling_test(client.clone(), rabbitmq_name.clone()).await?; rabbitmq_workload_test(client.clone(), rabbitmq_name.clone()).await?; - println!("E2e test passed."); + info!("E2e test passed."); Ok(()) } diff --git a/e2e/src/zookeeper_e2e.rs b/e2e/src/zookeeper_e2e.rs index 6148d283e..87151e10f 100644 --- a/e2e/src/zookeeper_e2e.rs +++ b/e2e/src/zookeeper_e2e.rs @@ -19,6 +19,7 @@ use std::path::PathBuf; use std::thread; use std::time::{Duration, Instant}; use tokio::time::sleep; +use tracing::*; use crate::common::*; @@ -111,13 +112,14 @@ pub async fn desired_state_test(client: Client, zk_name: String) -> Result<(), E loop { sleep(Duration::from_secs(5)).await; if start.elapsed() > timeout { + error!("Time out on desired state test"); return Err(Error::Timeout); } let svc = 
svc_api.get(&(zk_name.clone() + "-headless")).await; match svc { Err(e) => { - println!("Get headless service failed with error {}.", e); + info!("Get headless service failed with error {}.", e); continue; } _ => {} @@ -126,7 +128,7 @@ pub async fn desired_state_test(client: Client, zk_name: String) -> Result<(), E let svc = svc_api.get(&(zk_name.clone() + "-client")).await; match svc { Err(e) => { - println!("Get client service failed with error {}.", e); + info!("Get client service failed with error {}.", e); continue; } _ => {} @@ -135,7 +137,7 @@ pub async fn desired_state_test(client: Client, zk_name: String) -> Result<(), E let svc = svc_api.get(&(zk_name.clone() + "-admin-server")).await; match svc { Err(e) => { - println!("Get admin server service failed with error {}.", e); + info!("Get admin server service failed with error {}.", e); continue; } _ => {} @@ -144,7 +146,7 @@ pub async fn desired_state_test(client: Client, zk_name: String) -> Result<(), E let cm = cm_api.get(&(zk_name.clone() + "-configmap")).await; match cm { Err(e) => { - println!("Get config map failed with error {}.", e); + info!("Get config map failed with error {}.", e); continue; } _ => {} @@ -154,17 +156,17 @@ pub async fn desired_state_test(client: Client, zk_name: String) -> Result<(), E let sts = sts_api.get(&zk_name).await; match sts { Err(e) => { - println!("Get stateful set failed with error {}.", e); + info!("Get stateful set failed with error {}.", e); continue; } Ok(sts) => { if sts.spec.unwrap().replicas != Some(3) { - println!("Stateful set spec is not consistent with zookeeper cluster spec. E2e test failed."); + error!("Stateful set spec is not consistent with zookeeper cluster spec. E2e test failed."); return Err(Error::ZookeeperStsFailed); } - println!("Stateful set is found as expected."); + info!("Stateful set is found as expected."); if sts.status.as_ref().unwrap().ready_replicas.is_none() { - println!("No stateful set pod is ready."); + info!("No stateful set pod is ready."); continue; } else if *sts .status @@ -175,10 +177,10 @@ pub async fn desired_state_test(client: Client, zk_name: String) -> Result<(), E .unwrap() == 3 { - println!("All stateful set pods are ready."); + info!("All stateful set pods are ready."); break; } else { - println!( + info!( "Only {} pods are ready now.", sts.status .as_ref() @@ -192,7 +194,7 @@ pub async fn desired_state_test(client: Client, zk_name: String) -> Result<(), E } }; } - println!("Desired state test passed."); + info!("Desired state test passed."); Ok(()) } @@ -203,6 +205,7 @@ pub async fn status_test(client: Client, zk_name: String) -> Result<(), Error> { loop { sleep(Duration::from_secs(5)).await; if start.elapsed() > timeout { + error!("Time out on status test"); return Err(Error::Timeout); } if run_command( @@ -213,11 +216,11 @@ pub async fn status_test(client: Client, zk_name: String) -> Result<(), Error> { .0 .contains("ready_replicas: 3") { - println!("Status gets updated to 3 ready replicas now."); + info!("Status gets updated to 3 ready replicas now."); break; } } - println!("Status test passed."); + info!("Status test passed."); Ok(()) } @@ -243,6 +246,7 @@ pub async fn scaling_test(client: Client, zk_name: String, persistent: bool) -> loop { sleep(Duration::from_secs(5)).await; if start.elapsed() > timeout { + error!("Time out on scaling test"); return Err(Error::Timeout); } @@ -250,19 +254,17 @@ pub async fn scaling_test(client: Client, zk_name: String, persistent: bool) -> let sts = sts_api.get(&zk_name).await; match sts { Err(e) => { - 
println!("Get stateful set failed with error {}.", e); + info!("Get stateful set failed with error {}.", e); continue; } Ok(sts) => { if sts.spec.unwrap().replicas != Some(5) { - println!( - "Stateful set spec is not consistent with zookeeper cluster spec yet." - ); + info!("Stateful set spec is not consistent with zookeeper cluster spec yet."); continue; } - println!("Stateful set is found as expected."); + info!("Stateful set is found as expected."); if sts.status.as_ref().unwrap().ready_replicas.is_none() { - println!("No stateful set pod is ready."); + info!("No stateful set pod is ready."); } else if *sts .status .as_ref() @@ -272,10 +274,10 @@ pub async fn scaling_test(client: Client, zk_name: String, persistent: bool) -> .unwrap() == 5 { - println!("Scale up is done with 5 replicas ready."); + info!("Scale up is done with 5 replicas ready."); break; } else { - println!( + info!( "Scale up is in progress. {} pods are ready now.", sts.status .as_ref() @@ -307,6 +309,7 @@ pub async fn scaling_test(client: Client, zk_name: String, persistent: bool) -> loop { sleep(Duration::from_secs(5)).await; if start.elapsed() > timeout { + error!("Time out on scaling test"); return Err(Error::Timeout); } @@ -314,19 +317,17 @@ pub async fn scaling_test(client: Client, zk_name: String, persistent: bool) -> let sts = sts_api.get(&zk_name).await; match sts { Err(e) => { - println!("Get stateful set failed with error {}.", e); + info!("Get stateful set failed with error {}.", e); continue; } Ok(sts) => { if sts.spec.unwrap().replicas != Some(3) { - println!( - "Stateful set spec is not consistent with zookeeper cluster spec yet." - ); + info!("Stateful set spec is not consistent with zookeeper cluster spec yet."); continue; } - println!("Stateful set is found as expected."); + info!("Stateful set is found as expected."); if sts.status.as_ref().unwrap().ready_replicas.is_none() { - println!("No stateful set pod is ready."); + info!("No stateful set pod is ready."); } else if *sts .status .as_ref() @@ -337,21 +338,21 @@ pub async fn scaling_test(client: Client, zk_name: String, persistent: bool) -> == 3 { if !persistent { - println!("Scale down is done with 3 pods ready."); + info!("Scale down is done with 3 pods ready."); break; } else { let pvcs = pvc_api.list(&ListParams::default()).await; let pvc_num = pvcs.unwrap().items.len(); if pvc_num == 3 { - println!("Scale down is done with 3 pods ready and 3 pvcs."); + info!("Scale down is done with 3 pods ready and 3 pvcs."); break; } else { - println!("Scale down is in progress. {} pvcs exist", pvc_num); + info!("Scale down is in progress. {} pvcs exist", pvc_num); continue; } } } else { - println!( + info!( "Scale down is in progress. {} pods are ready now.", sts.status .as_ref() @@ -383,25 +384,24 @@ pub async fn scaling_test(client: Client, zk_name: String, persistent: bool) -> loop { sleep(Duration::from_secs(5)).await; if start.elapsed() > timeout { + error!("Time out on scaling test"); return Err(Error::Timeout); } let sts = sts_api.get(&zk_name).await; match sts { Err(e) => { - println!("Get stateful set failed with error {}.", e); + info!("Get stateful set failed with error {}.", e); continue; } Ok(sts) => { if sts.spec.unwrap().replicas != Some(5) { - println!( - "Stateful set spec is not consistent with zookeeper cluster spec yet." 
- ); + info!("Stateful set spec is not consistent with zookeeper cluster spec yet."); continue; } - println!("Stateful set is found as expected."); + info!("Stateful set is found as expected."); if sts.status.as_ref().unwrap().ready_replicas.is_none() { - println!("No stateful set pod is ready."); + info!("No stateful set pod is ready."); } else if *sts .status .as_ref() @@ -411,10 +411,10 @@ pub async fn scaling_test(client: Client, zk_name: String, persistent: bool) -> .unwrap() == 5 { - println!("Scale up is done with 5 replicas ready."); + info!("Scale up is done with 5 replicas ready."); break; } else { - println!( + info!( "Scale up is in progress. {} pods are ready now.", sts.status .as_ref() @@ -429,7 +429,7 @@ pub async fn scaling_test(client: Client, zk_name: String, persistent: bool) -> }; } - println!("Scaling test passed."); + info!("Scaling test passed."); Ok(()) } @@ -455,6 +455,7 @@ pub async fn relabel_test(client: Client, zk_name: String) -> Result<(), Error> loop { sleep(Duration::from_secs(5)).await; if start.elapsed() > timeout { + error!("Time out on relabel test"); return Err(Error::Timeout); } @@ -462,7 +463,7 @@ pub async fn relabel_test(client: Client, zk_name: String) -> Result<(), Error> let sts = sts_api.get(&zk_name).await; match sts { Err(e) => { - println!("Get stateful set failed with error {}.", e); + info!("Get stateful set failed with error {}.", e); continue; } Ok(sts) => { @@ -479,12 +480,12 @@ pub async fn relabel_test(client: Client, zk_name: String) -> Result<(), Error> .unwrap() .contains_key("key") { - println!("Label for pod is not updated yet"); + info!("Label for pod is not updated yet"); continue; } if sts.status.as_ref().unwrap().updated_replicas.is_none() { - println!("No stateful set pod is updated yet."); + info!("No stateful set pod is updated yet."); continue; } else if *sts .status @@ -495,9 +496,9 @@ pub async fn relabel_test(client: Client, zk_name: String) -> Result<(), Error> .unwrap() == 3 { - println!("Relabel is done."); + info!("Relabel is done."); } else { - println!( + info!( "Relabel is in progress. 
{} pods are updated now.", sts.status .as_ref() @@ -510,7 +511,7 @@ pub async fn relabel_test(client: Client, zk_name: String) -> Result<(), Error> } if sts.status.as_ref().unwrap().ready_replicas.is_none() { - println!("No stateful set pod is ready."); + info!("No stateful set pod is ready."); continue; } else if *sts .status @@ -521,10 +522,10 @@ pub async fn relabel_test(client: Client, zk_name: String) -> Result<(), Error> .unwrap() == 3 { - println!("All stateful set pods are ready."); + info!("All stateful set pods are ready."); break; } else { - println!( + info!( "Only {} pods are ready now.", sts.status .as_ref() @@ -539,7 +540,7 @@ pub async fn relabel_test(client: Client, zk_name: String) -> Result<(), Error> }; } - println!("Relabel test passed."); + info!("Relabel test passed."); Ok(()) } @@ -565,6 +566,7 @@ pub async fn upgrading_test(client: Client, zk_name: String) -> Result<(), Error loop { sleep(Duration::from_secs(5)).await; if start.elapsed() > timeout { + error!("Time out on upgrading test"); return Err(Error::Timeout); } @@ -572,12 +574,12 @@ pub async fn upgrading_test(client: Client, zk_name: String) -> Result<(), Error let sts = sts_api.get(&zk_name).await; match sts { Err(e) => { - println!("Get stateful set failed with error {}.", e); + info!("Get stateful set failed with error {}.", e); continue; } Ok(sts) => { if sts.status.as_ref().unwrap().updated_replicas.is_none() { - println!("No stateful set pod is updated yet."); + info!("No stateful set pod is updated yet."); continue; } else if *sts .status @@ -588,9 +590,9 @@ pub async fn upgrading_test(client: Client, zk_name: String) -> Result<(), Error .unwrap() == 3 { - println!("Upgrading is done."); + info!("Upgrading is done."); } else { - println!( + info!( "Upgrading is in progress. 
{} pods are updated now.", sts.status .as_ref() @@ -603,7 +605,7 @@ pub async fn upgrading_test(client: Client, zk_name: String) -> Result<(), Error } if sts.status.as_ref().unwrap().ready_replicas.is_none() { - println!("No stateful set pod is ready."); + info!("No stateful set pod is ready."); continue; } else if *sts .status @@ -614,10 +616,10 @@ pub async fn upgrading_test(client: Client, zk_name: String) -> Result<(), Error .unwrap() == 3 { - println!("All stateful set pods are ready."); + info!("All stateful set pods are ready."); break; } else { - println!( + info!( "Only {} pods are ready now.", sts.status .as_ref() @@ -632,7 +634,7 @@ pub async fn upgrading_test(client: Client, zk_name: String) -> Result<(), Error }; } - println!("Upgrading test passed."); + info!("Upgrading test passed."); Ok(()) } @@ -658,6 +660,7 @@ pub async fn reconfiguration_test(client: Client, zk_name: String) -> Result<(), loop { sleep(Duration::from_secs(5)).await; if start.elapsed() > timeout { + error!("Time out on reconfiguration test"); return Err(Error::Timeout); } @@ -665,12 +668,12 @@ pub async fn reconfiguration_test(client: Client, zk_name: String) -> Result<(), let sts = sts_api.get(&zk_name).await; match sts { Err(e) => { - println!("Get stateful set failed with error {}.", e); + info!("Get stateful set failed with error {}.", e); continue; } Ok(sts) => { if sts.status.as_ref().unwrap().updated_replicas.is_none() { - println!("No stateful set pod is updated yet."); + info!("No stateful set pod is updated yet."); continue; } else if *sts .status @@ -681,9 +684,9 @@ pub async fn reconfiguration_test(client: Client, zk_name: String) -> Result<(), .unwrap() == 3 { - println!("Reconfiguration is done."); + info!("Reconfiguration is done."); } else { - println!( + info!( "Reconfiguration is in progress. 
{} pods are updated now.", sts.status .as_ref() @@ -696,7 +699,7 @@ pub async fn reconfiguration_test(client: Client, zk_name: String) -> Result<(), } if sts.status.as_ref().unwrap().ready_replicas.is_none() { - println!("No stateful set pod is ready."); + info!("No stateful set pod is ready."); continue; } else if *sts .status @@ -707,10 +710,10 @@ pub async fn reconfiguration_test(client: Client, zk_name: String) -> Result<(), .unwrap() == 3 { - println!("All stateful set pods are ready."); + info!("All stateful set pods are ready."); break; } else { - println!( + info!( "Only {} pods are ready now.", sts.status .as_ref() @@ -737,17 +740,17 @@ pub async fn reconfiguration_test(client: Client, zk_name: String) -> Result<(), .await?; let (out, err) = get_output_and_err(attached).await; if err != "" { - println!("Reconfiguration test failed with {}.", err); + error!("Reconfiguration test failed with {}.", err); return Err(Error::ZookeeperWorkloadFailed); } else { - println!("{}", out); + info!("{}", out); if !out.contains("initLimit=15") { - println!("Test failed because of unexpected zoo.cfg data."); + error!("Test failed because of unexpected zoo.cfg data."); return Err(Error::ZookeeperWorkloadFailed); } } - println!("Reconfiguration test passed."); + info!("Reconfiguration test passed."); Ok(()) } @@ -777,12 +780,12 @@ pub async fn zk_workload_test(client: Client, zk_name: String) -> Result<(), Err .await?; let (out, err) = get_output_and_err(attached).await; if err != "" { - println!("ZK workload test failed with {}.", err); + error!("ZK workload test failed with {}.", err); return Err(Error::ZookeeperWorkloadFailed); } else { - println!("{}", out); + info!("{}", out); if !out.contains("test-data") { - println!("Test failed because of unexpected get output."); + error!("Test failed because of unexpected get output."); return Err(Error::ZookeeperWorkloadFailed); } } @@ -805,12 +808,12 @@ pub async fn zk_workload_test(client: Client, zk_name: String) -> Result<(), Err .await?; let (out, err) = get_output_and_err(attached).await; if err != "" { - println!("ZK workload test failed with {}.", err); + error!("ZK workload test failed with {}.", err); return Err(Error::ZookeeperWorkloadFailed); } else { - println!("{}", out); + info!("{}", out); if !out.contains("test-data-2") { - println!("Test failed because of unexpected get output."); + error!("Test failed because of unexpected get output."); return Err(Error::ZookeeperWorkloadFailed); } } @@ -825,17 +828,17 @@ pub async fn zk_workload_test(client: Client, zk_name: String) -> Result<(), Err .await?; let (out, err) = get_output_and_err(attached).await; if err != "" { - println!("ZK workload test failed with {}.", err); + error!("ZK workload test failed with {}.", err); return Err(Error::ZookeeperWorkloadFailed); } else { - println!("{}", out); + info!("{}", out); if !out.contains("test-data-2") { - println!("Test failed because of unexpected get output."); + error!("Test failed because of unexpected get output."); return Err(Error::ZookeeperWorkloadFailed); } } - println!("Zookeeper workload test passed."); + info!("Zookeeper workload test passed."); Ok(()) } @@ -855,17 +858,17 @@ pub async fn zk_workload_test2(client: Client, zk_name: String) -> Result<(), Er .await?; let (out, err) = get_output_and_err(attached).await; if err != "" { - println!("ZK workload test failed with {}.", err); + error!("ZK workload test failed with {}.", err); return Err(Error::ZookeeperWorkloadFailed); } else { - println!("{}", out); + info!("{}", out); if 
!out.contains("test-data-2") { - println!("Test failed because of unexpected get output."); + error!("Test failed because of unexpected get output."); return Err(Error::ZookeeperWorkloadFailed); } } - println!("Zookeeper workload test2 passed."); + info!("Zookeeper workload test2 passed."); Ok(()) } @@ -876,11 +879,11 @@ pub async fn zookeeper_e2e_test() -> Result<(), Error> { let zk_crd = crd_api.get("zookeeperclusters.anvil.dev").await; match zk_crd { Err(e) => { - println!("No CRD found, create one before run the e2e test."); + error!("No CRD found, create one before run the e2e test."); return Err(Error::CRDGetFailed(e)); } Ok(crd) => { - println!("CRD found, continue to run the e2e test."); + info!("CRD found, continue to run the e2e test."); } } @@ -896,7 +899,7 @@ pub async fn zookeeper_e2e_test() -> Result<(), Error> { upgrading_test(client.clone(), zk_name.clone()).await?; zk_workload_test2(client.clone(), zk_name.clone()).await?; // Test if the data is still there after upgrading - println!("E2e test passed."); + info!("E2e test passed."); Ok(()) } @@ -907,11 +910,11 @@ pub async fn zookeeper_scaling_e2e_test() -> Result<(), Error> { let zk_crd = crd_api.get("zookeeperclusters.anvil.dev").await; match zk_crd { Err(e) => { - println!("No CRD found, create one before run the e2e test."); + error!("No CRD found, create one before run the e2e test."); return Err(Error::CRDGetFailed(e)); } Ok(crd) => { - println!("CRD found, continue to run the e2e test."); + info!("CRD found, continue to run the e2e test."); } } @@ -924,7 +927,7 @@ pub async fn zookeeper_scaling_e2e_test() -> Result<(), Error> { scaling_test(client.clone(), zk_name.clone(), true).await?; zk_workload_test(client.clone(), zk_name.clone()).await?; - println!("E2e test passed."); + info!("E2e test passed."); Ok(()) } @@ -935,11 +938,11 @@ pub async fn zookeeper_ephemeral_e2e_test() -> Result<(), Error> { let zk_crd = crd_api.get("zookeeperclusters.anvil.dev").await; match zk_crd { Err(e) => { - println!("No CRD found, create one before run the e2e test."); + error!("No CRD found, create one before run the e2e test."); return Err(Error::CRDGetFailed(e)); } Ok(crd) => { - println!("CRD found, continue to run the e2e test."); + info!("CRD found, continue to run the e2e test."); } } @@ -952,6 +955,6 @@ pub async fn zookeeper_ephemeral_e2e_test() -> Result<(), Error> { scaling_test(client.clone(), zk_name.clone(), false).await?; zk_workload_test(client.clone(), zk_name.clone()).await?; - println!("E2e test passed."); + info!("E2e test passed."); Ok(()) } diff --git a/src/controller_examples/zookeeper_controller/trusted/zookeeper_api_exec.rs b/src/controller_examples/zookeeper_controller/trusted/zookeeper_api_exec.rs index 3fe3a7849..98635e754 100644 --- a/src/controller_examples/zookeeper_controller/trusted/zookeeper_api_exec.rs +++ b/src/controller_examples/zookeeper_controller/trusted/zookeeper_api_exec.rs @@ -9,10 +9,10 @@ use crate::zookeeper_controller::trusted::{ }, }; use core::time::Duration; +use deps_hack::tracing::info; use deps_hack::zookeeper::{Acl, CreateMode, WatchedEvent, Watcher, ZkError, ZkResult, ZooKeeper}; -use vstd::{prelude::*, string::*, view::*}; - use vstd::pervasive::unreached; +use vstd::{prelude::*, string::*, view::*}; verus! 
{ @@ -92,57 +92,77 @@ impl View for ZKAPIOutput { pub struct ZKAPIShimLayer {} -#[verifier(external)] +} + impl ExternalAPIShimLayer for ZKAPIShimLayer { type Input = ZKAPIInput; type Output = ZKAPIOutput; fn call_external_api(input: ZKAPIInput) -> ZKAPIOutput { match input { - ZKAPIInput::ExistsRequest(zk_name, zk_namespace, port, path) - => ZKAPIOutput::ExistsResponse(zk_exists(zk_name, zk_namespace, port, path)), - ZKAPIInput::CreateRequest(zk_name, zk_namespace, port, path, data) - => ZKAPIOutput::CreateResponse(zk_create(zk_name, zk_namespace, port, path, data)), - ZKAPIInput::SetDataRequest(zk_name, zk_namespace, port, path, data, version) - => ZKAPIOutput::SetDataResponse(zk_set_data(zk_name, zk_namespace, port, path, data, version)), + ZKAPIInput::ExistsRequest(zk_name, zk_namespace, port, path) => { + ZKAPIOutput::ExistsResponse(zk_exists(zk_name, zk_namespace, port, path)) + } + ZKAPIInput::CreateRequest(zk_name, zk_namespace, port, path, data) => { + ZKAPIOutput::CreateResponse(zk_create(zk_name, zk_namespace, port, path, data)) + } + ZKAPIInput::SetDataRequest(zk_name, zk_namespace, port, path, data, version) => { + ZKAPIOutput::SetDataResponse(zk_set_data( + zk_name, + zk_namespace, + port, + path, + data, + version, + )) + } } } } struct NoopWatcher; -#[verifier(external)] impl Watcher for NoopWatcher { fn handle(&self, _e: WatchedEvent) {} } -#[verifier(external)] pub fn set_up_zk_client(name: &String, namespace: &String, port: i32) -> ZkResult { let uri = &format!("{}-client.{}.svc.cluster.local:{}", name, namespace, port); - println!("Connecting to zk uri {} ...", uri); + info!("Connecting to zk uri {} ...", uri); ZooKeeper::connect(uri, Duration::from_secs(10), NoopWatcher) } -#[verifier(external)] -pub fn zk_exists(name: String, namespace: String, port: i32, path: Vec) -> ZKAPIExistsResult { - let result = ZKAPIExistsResult {res: zk_exists_internal(name, namespace, port, path)}; +pub fn zk_exists( + name: String, + namespace: String, + port: i32, + path: Vec, +) -> ZKAPIExistsResult { + let result = ZKAPIExistsResult { + res: zk_exists_internal(name, namespace, port, path), + }; match result.res { - Err(_) => println!("Checking existence of zk node failed"), + Err(_) => info!("Checking existence of zk node failed"), Ok(o) => { - println!("Checking existence of zk node successfully"); + info!("Checking existence of zk node successfully"); match o { - Some(version) => println!("The zk node exists and version is {}", version), - None => println!("The zk node does not exist"), + Some(version) => info!("The zk node exists and version is {}", version), + None => info!("The zk node does not exist"), } - }, + } } result } -#[verifier(external)] -pub fn zk_exists_internal(name: String, namespace: String, port: i32, path: Vec) -> Result, ZKAPIError> { - let zk_client = set_up_zk_client(&name, &namespace, port).map_err(|_e| ZKAPIError::ZKNodeExistsFailed)?; +pub fn zk_exists_internal( + name: String, + namespace: String, + port: i32, + path: Vec, +) -> Result, ZKAPIError> { + let zk_client = + set_up_zk_client(&name, &namespace, port).map_err(|_e| ZKAPIError::ZKNodeExistsFailed)?; let path_as_string = format!("/{}", path.join("/")); - println!("Checking existence of {} ...", &path_as_string); + info!("Checking existence of {} ...", &path_as_string); let exist_result = zk_client.exists(path_as_string.as_str(), false); let _ = zk_client.close(); match exist_result { @@ -154,23 +174,41 @@ pub fn zk_exists_internal(name: String, namespace: String, port: i32, path: Vec< } } 
-#[verifier(external)] -pub fn zk_create(name: String, namespace: String, port: i32, path: Vec, data: String) -> ZKAPICreateResult { - let result = ZKAPICreateResult {res: zk_create_internal(name, namespace, port, path, data)}; +pub fn zk_create( + name: String, + namespace: String, + port: i32, + path: Vec, + data: String, +) -> ZKAPICreateResult { + let result = ZKAPICreateResult { + res: zk_create_internal(name, namespace, port, path, data), + }; match result.res { - Err(_) => println!("Create zk node failed"), - Ok(_) => println!("Create zk node successfully"), + Err(_) => info!("Create zk node failed"), + Ok(_) => info!("Create zk node successfully"), } result } -#[verifier(external)] -pub fn zk_create_internal(name: String, namespace: String, port: i32, path: Vec, data: String) -> Result<(), ZKAPIError> { - let zk_client = set_up_zk_client(&name, &namespace, port).map_err(|_e| ZKAPIError::ZKNodeCreateFailed)?; +pub fn zk_create_internal( + name: String, + namespace: String, + port: i32, + path: Vec, + data: String, +) -> Result<(), ZKAPIError> { + let zk_client = + set_up_zk_client(&name, &namespace, port).map_err(|_e| ZKAPIError::ZKNodeCreateFailed)?; let path_as_string = format!("/{}", path.join("/")); let data_as_string = data; - println!("Creating {} {} ...", &path_as_string, &data_as_string); - let create_result = zk_client.create(path_as_string.as_str(), data_as_string.as_str().as_bytes().to_vec(), Acl::open_unsafe().to_vec(), CreateMode::Persistent); + info!("Creating {} {} ...", &path_as_string, &data_as_string); + let create_result = zk_client.create( + path_as_string.as_str(), + data_as_string.as_str().as_bytes().to_vec(), + Acl::open_unsafe().to_vec(), + CreateMode::Persistent, + ); let _ = zk_client.close(); match create_result { Err(e) => match e { @@ -181,28 +219,48 @@ pub fn zk_create_internal(name: String, namespace: String, port: i32, path: Vec< } } -#[verifier(external)] -pub fn zk_set_data(name: String, namespace: String, port: i32, path: Vec, data: String, version: i32) -> ZKAPISetDataResult { - let result = ZKAPISetDataResult {res: zk_set_data_internal(name, namespace, port, path, data, version)}; +pub fn zk_set_data( + name: String, + namespace: String, + port: i32, + path: Vec, + data: String, + version: i32, +) -> ZKAPISetDataResult { + let result = ZKAPISetDataResult { + res: zk_set_data_internal(name, namespace, port, path, data, version), + }; match result.res { - Err(_) => println!("Set zk node failed"), - Ok(_) => println!("Set zk node successfully"), + Err(_) => info!("Set zk node failed"), + Ok(_) => info!("Set zk node successfully"), } result } -#[verifier(external)] -pub fn zk_set_data_internal(name: String, namespace: String, port: i32, path: Vec, data: String, version: i32) -> Result<(), ZKAPIError> { - let zk_client = set_up_zk_client(&name, &namespace, port).map_err(|_e| ZKAPIError::ZKNodeSetDataFailed)?; +pub fn zk_set_data_internal( + name: String, + namespace: String, + port: i32, + path: Vec, + data: String, + version: i32, +) -> Result<(), ZKAPIError> { + let zk_client = + set_up_zk_client(&name, &namespace, port).map_err(|_e| ZKAPIError::ZKNodeSetDataFailed)?; let path_as_string = format!("/{}", path.join("/")); let data_as_string = data; - println!("Setting {} {} {} ...", &path_as_string, &data_as_string, version); - let set_result = zk_client.set_data(path_as_string.as_str(), data_as_string.as_str().as_bytes().to_vec(), Some(version)); + info!( + "Setting {} {} {} ...", + &path_as_string, &data_as_string, version + ); + let set_result = 
zk_client.set_data( + path_as_string.as_str(), + data_as_string.as_str().as_bytes().to_vec(), + Some(version), + ); let _ = zk_client.close(); match set_result { Err(_) => Err(ZKAPIError::ZKNodeSetDataFailed), Ok(_) => Ok(()), } } - -} diff --git a/src/deps_hack/src/lib.rs b/src/deps_hack/src/lib.rs index d328270b4..b630d2776 100644 --- a/src/deps_hack/src/lib.rs +++ b/src/deps_hack/src/lib.rs @@ -15,6 +15,7 @@ pub use serde_yaml; pub use thiserror; pub use tokio; pub use tracing; +pub use tracing_subscriber; pub use zookeeper; #[derive(Debug, thiserror::Error)] @@ -302,7 +303,8 @@ pub struct VStatefulSetSpec { #[serde(rename = "minReadySeconds")] pub min_ready_seconds: Option, #[serde(rename = "persistentVolumeClaimRetentionPolicy")] - pub persistent_volume_claim_retention_policy: Option, + pub persistent_volume_claim_retention_policy: + Option, pub ordinals: Option, } diff --git a/src/fluent_controller.rs b/src/fluent_controller.rs index 746352326..f42f37b21 100644 --- a/src/fluent_controller.rs +++ b/src/fluent_controller.rs @@ -13,55 +13,48 @@ pub mod state_machine; pub mod temporal_logic; pub mod vstd_ext; -use builtin::*; -use builtin_macros::*; - -use crate::external_api::exec::*; use crate::fluent_controller::{ - fluentbit::{ - exec::reconciler::FluentBitReconciler, - trusted::exec_types::{FluentBit, FluentBitReconcileState}, - }, - fluentbit_config::{ - exec::reconciler::FluentBitConfigReconciler, - trusted::exec_types::{FluentBitConfig, FluentBitConfigReconcileState}, - }, + fluentbit::exec::reconciler::FluentBitReconciler, + fluentbit_config::exec::reconciler::FluentBitConfigReconciler, }; use deps_hack::anyhow::Result; use deps_hack::futures; use deps_hack::kube::CustomResourceExt; use deps_hack::serde_yaml; use deps_hack::tokio; +use deps_hack::tracing::{error, info}; +use deps_hack::tracing_subscriber; use shim_layer::controller_runtime::run_controller; use std::env; -verus! { - -#[verifier(external)] #[tokio::main] async fn main() -> Result<()> { + tracing_subscriber::fmt::init(); let args: Vec = env::args().collect(); let cmd = args[1].clone(); if cmd == String::from("export") { - println!("exporting custom resource definition"); println!("{}", serde_yaml::to_string(&deps_hack::FluentBit::crd())?); - println!("{}", serde_yaml::to_string(&deps_hack::FluentBitConfig::crd())?); + println!( + "{}", + serde_yaml::to_string(&deps_hack::FluentBitConfig::crd())? 
diff --git a/src/fluent_controller.rs b/src/fluent_controller.rs
index 746352326..f42f37b21 100644
--- a/src/fluent_controller.rs
+++ b/src/fluent_controller.rs
@@ -13,55 +13,48 @@ pub mod state_machine;
 pub mod temporal_logic;
 pub mod vstd_ext;
 
-use builtin::*;
-use builtin_macros::*;
-
-use crate::external_api::exec::*;
 use crate::fluent_controller::{
-    fluentbit::{
-        exec::reconciler::FluentBitReconciler,
-        trusted::exec_types::{FluentBit, FluentBitReconcileState},
-    },
-    fluentbit_config::{
-        exec::reconciler::FluentBitConfigReconciler,
-        trusted::exec_types::{FluentBitConfig, FluentBitConfigReconcileState},
-    },
+    fluentbit::exec::reconciler::FluentBitReconciler,
+    fluentbit_config::exec::reconciler::FluentBitConfigReconciler,
 };
 use deps_hack::anyhow::Result;
 use deps_hack::futures;
 use deps_hack::kube::CustomResourceExt;
 use deps_hack::serde_yaml;
 use deps_hack::tokio;
+use deps_hack::tracing::{error, info};
+use deps_hack::tracing_subscriber;
 use shim_layer::controller_runtime::run_controller;
 use std::env;
 
-verus! {
-
-#[verifier(external)]
 #[tokio::main]
 async fn main() -> Result<()> {
+    tracing_subscriber::fmt::init();
     let args: Vec<String> = env::args().collect();
     let cmd = args[1].clone();
 
     if cmd == String::from("export") {
-        println!("exporting custom resource definition");
         println!("{}", serde_yaml::to_string(&deps_hack::FluentBit::crd())?);
-        println!("{}", serde_yaml::to_string(&deps_hack::FluentBitConfig::crd())?);
+        println!(
+            "{}",
+            serde_yaml::to_string(&deps_hack::FluentBitConfig::crd())?
+        );
     } else if cmd == String::from("run") {
-        println!("running fluent-controller");
-        let fluentbit_controller_fut = run_controller::<deps_hack::FluentBit, FluentBitReconciler>(false);
-        let fluentbit_config_controller_fut = run_controller::<deps_hack::FluentBitConfig, FluentBitConfigReconciler>(false);
+        info!("running fluent-controller");
+        let fluentbit_controller_fut =
+            run_controller::<deps_hack::FluentBit, FluentBitReconciler>(false);
+        let fluentbit_config_controller_fut =
+            run_controller::<deps_hack::FluentBitConfig, FluentBitConfigReconciler>(false);
         futures::try_join!(fluentbit_controller_fut, fluentbit_config_controller_fut)?;
-        println!("controller terminated");
     } else if cmd == String::from("crash") {
-        println!("running fluent-controller in crash-testing mode");
-        let fluentbit_controller_fut = run_controller::<deps_hack::FluentBit, FluentBitReconciler>(true);
-        let fluentbit_config_controller_fut = run_controller::<deps_hack::FluentBitConfig, FluentBitConfigReconciler>(true);
+        info!("running fluent-controller in crash-testing mode");
+        let fluentbit_controller_fut =
+            run_controller::<deps_hack::FluentBit, FluentBitReconciler>(true);
+        let fluentbit_config_controller_fut =
+            run_controller::<deps_hack::FluentBitConfig, FluentBitConfigReconciler>(true);
         futures::try_join!(fluentbit_controller_fut, fluentbit_config_controller_fut)?;
-        println!("controller terminated");
     } else {
-        println!("wrong command; please use \"export\", \"run\" or \"crash\"");
+        error!("wrong command; please use \"export\", \"run\" or \"crash\"");
     }
     Ok(())
 }
-
-}
diff --git a/src/rabbitmq_controller.rs b/src/rabbitmq_controller.rs
index 791b44ea8..c9169af6d 100644
--- a/src/rabbitmq_controller.rs
+++ b/src/rabbitmq_controller.rs
@@ -13,41 +13,35 @@ pub mod state_machine;
 pub mod temporal_logic;
 pub mod vstd_ext;
 
-use builtin::*;
-use builtin_macros::*;
-
-use crate::external_api::exec::*;
 use crate::rabbitmq_controller::exec::reconciler::RabbitmqReconciler;
-use crate::rabbitmq_controller::trusted::exec_types::{RabbitmqCluster, RabbitmqReconcileState};
 use deps_hack::anyhow::Result;
 use deps_hack::kube::CustomResourceExt;
 use deps_hack::serde_yaml;
 use deps_hack::tokio;
+use deps_hack::tracing::{error, info};
+use deps_hack::tracing_subscriber;
 use shim_layer::controller_runtime::run_controller;
 use std::env;
 
-verus! {
-
-#[verifier(external)]
 #[tokio::main]
 async fn main() -> Result<()> {
+    tracing_subscriber::fmt::init();
     let args: Vec<String> = env::args().collect();
     let cmd = args[1].clone();
 
     if cmd == String::from("export") {
-        println!("exporting custom resource definition");
-        println!("{}", serde_yaml::to_string(&deps_hack::RabbitmqCluster::crd())?);
+        println!(
+            "{}",
+            serde_yaml::to_string(&deps_hack::RabbitmqCluster::crd())?
+        );
     } else if cmd == String::from("run") {
-        println!("running rabbitmq-controller");
+        info!("running rabbitmq-controller");
         run_controller::<deps_hack::RabbitmqCluster, RabbitmqReconciler>(false).await?;
-        println!("controller terminated");
     } else if cmd == String::from("crash") {
-        println!("running rabbitmq-controller in crash-testing mode");
+        info!("running rabbitmq-controller in crash-testing mode");
         run_controller::<deps_hack::RabbitmqCluster, RabbitmqReconciler>(true).await?;
-        println!("controller terminated");
     } else {
-        println!("wrong command; please use \"export\", \"run\" or \"crash\"");
+        error!("wrong command; please use \"export\", \"run\" or \"crash\"");
     }
     Ok(())
 }
-
-}
diff --git a/src/shim_layer/controller_runtime.rs b/src/shim_layer/controller_runtime.rs
index 6d6e57e57..5559fd3ab 100644
--- a/src/shim_layer/controller_runtime.rs
+++ b/src/shim_layer/controller_runtime.rs
@@ -22,6 +22,7 @@ use deps_hack::kube::{
 };
 use deps_hack::kube_core::{ErrorResponse, NamespaceResourceScope};
 use deps_hack::serde::{de::DeserializeOwned, Serialize};
+use deps_hack::tracing::{error, info, warn};
 use deps_hack::Error;
 use std::sync::Arc;
 use std::time::Duration;
@@ -61,19 +62,19 @@ where
         return reconcile_with::<K, ReconcilerType>(cr, ctx, fault_injection).await;
     };
 
-    println!("starting controller");
+    info!("starting controller");
     // TODO: the controller should also listen to the owned resources
     Controller::new(crs, watcher::Config::default()) // The controller's reconcile is triggered when a CR is created/updated
         .shutdown_on_signal()
         .run(reconcile, error_policy, Arc::new(Data { client })) // The reconcile function is registered
         .for_each(|res| async move {
             match res {
-                Ok(o) => println!("reconciled {:?}", o),
-                Err(e) => println!("reconcile failed: {}", e),
+                Ok(o) => info!("reconciled {:?}", o),
+                Err(e) => info!("reconcile failed: {}", e),
             }
         })
         .await;
-    println!("controller terminated");
+    info!("controller terminated");
 
     Ok(())
 }
@@ -121,14 +122,14 @@
         Err(deps_hack::kube_client::error::Error::Api(ErrorResponse { reason, .. })) if &reason == "NotFound" => {
-            println!(
+            warn!(
                 "{} Custom resource {} not found, end reconcile",
                 log_header, cr_name
             );
             return Ok(Action::await_change());
         }
         Err(err) => {
-            println!(
+            warn!(
                 "{} Get custom resource {} failed with error: {}, will retry reconcile",
                 log_header, cr_name, err
             );
@@ -138,7 +139,7 @@
     }
     // Wrap the custom resource with Verus-friendly wrapper type (which has a ghost version, i.e., view)
     let cr = get_cr_resp.unwrap();
-    println!(
+    info!(
         "{} Get cr {}",
         log_header,
         deps_hack::k8s_openapi::serde_json::to_string(&cr).unwrap()
@@ -158,11 +159,11 @@
         check_fault_timing = false;
         // If reconcile core is done, then breaks the loop
         if ReconcilerType::reconcile_done(&state) {
-            println!("{} done", log_header);
+            info!("{} done", log_header);
             break;
         }
         if ReconcilerType::reconcile_error(&state) {
-            println!("{} error", log_header);
+            warn!("{} error", log_header);
             return Err(Error::ReconcileCoreError);
         }
         // Feed the current reconcile state and get the new state and the pending request
@@ -186,16 +187,13 @@
                     kube_resp = KubeAPIResponse::GetResponse(KubeGetResponse {
                         res: Err(kube_error_to_ghost(&err)),
                     });
-                    println!(
-                        "{} Get {} failed with error: {}",
-                        log_header, key, err
-                    );
+                    info!("{} Get {} failed with error: {}", log_header, key, err);
                 }
                 Ok(obj) => {
                     kube_resp = KubeAPIResponse::GetResponse(KubeGetResponse {
                         res: Ok(DynamicObject::from_kube(obj)),
                     });
-                    println!("{} Get {} done", log_header, key);
+                    info!("{} Get {} done", log_header, key);
                 }
             }
         }
@@ -212,10 +210,7 @@
                    kube_resp = KubeAPIResponse::ListResponse(KubeListResponse {
                        res: Err(kube_error_to_ghost(&err)),
                    });
-                    println!(
-                        "{} List {} failed with error: {}",
-                        log_header, key, err
-                    );
+                    info!("{} List {} failed with error: {}", log_header, key, err);
                 }
                 Ok(obj_list) => {
                     kube_resp = KubeAPIResponse::ListResponse(KubeListResponse {
                         res: Ok(obj_list
                             .items
                             .into_iter()
                             .map(|obj| DynamicObject::from_kube(obj))
                             .collect()),
                     });
-                    println!("{} List {} done", log_header, key);
+                    info!("{} List {} done", log_header, key);
                 }
             }
         }
@@ -245,7 +240,7 @@
                             KubeAPIResponse::CreateResponse(KubeCreateResponse {
                                 res: Err(kube_error_to_ghost(&err)),
                             });
-                        println!(
+                        info!(
                             "{} Create {} failed with error: {}",
                             log_header, key, err
                         );
                     }
                     Ok(obj) => {
                         kube_resp =
                             KubeAPIResponse::CreateResponse(KubeCreateResponse {
                                 res: Ok(DynamicObject::from_kube(obj)),
                             });
-                        println!("{} Create {} done", log_header, key);
+                        info!("{} Create {} done", log_header, key);
                     }
                 }
             }
@@ -274,7 +269,7 @@
                             KubeAPIResponse::DeleteResponse(KubeDeleteResponse {
                                 res: Err(kube_error_to_ghost(&err)),
                             });
-                        println!(
+                        info!(
                             "{} Delete {} failed with error: {}",
                             log_header, key, err
                         );
                     }
                     Ok(_) => {
                         kube_resp =
                             KubeAPIResponse::DeleteResponse(KubeDeleteResponse {
                                 res: Ok(()),
                             });
-                        println!("{} Delete {} done", log_header, key);
+                        info!("{} Delete {} done", log_header, key);
                     }
                 }
             }
@@ -304,7 +299,7 @@
                             KubeAPIResponse::UpdateResponse(KubeUpdateResponse {
                                 res: Err(kube_error_to_ghost(&err)),
                             });
-                        println!(
+                        info!(
                             "{} Update {} failed with error: {}",
                             log_header, key, err
                         );
                     }
                     Ok(obj) => {
                         kube_resp =
                             KubeAPIResponse::UpdateResponse(KubeUpdateResponse {
                                 res: Ok(DynamicObject::from_kube(obj)),
                             });
-                        println!("{} Update {} done", log_header, key);
+                        info!("{} Update {} done", log_header, key);
                     }
                 }
             }
@@ -344,7 +339,7 @@
                                 res: Err(kube_error_to_ghost(&err)),
                             },
                         );
-                        println!(
+                        info!(
                             "{} UpdateStatus {} failed with error: {}",
                             log_header, key, err
                         );
                     }
                     Ok(obj) => {
                         kube_resp = KubeAPIResponse::UpdateStatusResponse(
                             KubeUpdateStatusResponse {
                                 res: Ok(DynamicObject::from_kube(obj)),
                             },
                         );
-                        println!("{} UpdateStatus {} done", log_header, key);
+                        info!("{} UpdateStatus {} done", log_header, key);
                     }
                 }
             }
@@ -375,7 +370,7 @@
             // and fault injection option is on, then check whether to crash at this point
             let result = crash_or_continue(client, &cr_key, &log_header).await;
             if result.is_err() {
-                println!(
+                error!(
                     "{} crash_or_continue fails due to {}",
                     log_header,
                     result.unwrap_err()
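error_policy, registered with the kube-rs Controller in the run_controller hunk above, falls outside this patch's hunks and is not shown. A minimal definition consistent with that call site might look like the following sketch; the fixed 60-second requeue is an assumption, and Error and Data refer to the types already used in this file.

    use deps_hack::kube::runtime::controller::Action;
    use std::sync::Arc;
    use std::time::Duration;

    // Sketch only: back off and let the controller retry the reconcile later.
    fn error_policy<K>(_obj: Arc<K>, _error: &Error, _ctx: Arc<Data>) -> Action {
        Action::requeue(Duration::from_secs(60))
    }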
println!("{} UpdateStatus {} done", log_header, key); + info!("{} UpdateStatus {} done", log_header, key); } } } @@ -375,7 +370,7 @@ where // and fault injection option is on, then check whether to crash at this point let result = crash_or_continue(client, &cr_key, &log_header).await; if result.is_err() { - println!( + error!( "{} crash_or_continue fails due to {}", log_header, result.unwrap_err() diff --git a/src/shim_layer/fault_injection.rs b/src/shim_layer/fault_injection.rs index b097881e1..18da8cf09 100644 --- a/src/shim_layer/fault_injection.rs +++ b/src/shim_layer/fault_injection.rs @@ -11,23 +11,37 @@ use deps_hack::kube::{ api::{Api, ObjectMeta, PostParams, Resource}, Client, }; +use deps_hack::tracing::info; use deps_hack::Error; -verus! { - -#[verifier(external)] -pub async fn crash_or_continue(client: &Client, cr_key: &String, log_header: &String) -> Result<(), String> { +pub async fn crash_or_continue( + client: &Client, + cr_key: &String, + log_header: &String, +) -> Result<(), String> { // We require the fault injection configuration is stored by a ConfigMap object // in the default namespace called "fault-injection-config" let config_map_name = "fault-injection-config"; let config_map_api = Api::::namespaced(client.clone(), "default"); - let mut config_map = config_map_api.get(&config_map_name).await + let mut config_map = config_map_api + .get(&config_map_name) + .await .map_err(|_e| "Fail to get fault injection config".to_string())?; - println!("{} Get {}: {}", log_header, config_map_name, deps_hack::k8s_openapi::serde_json::to_string(&config_map).unwrap()); - let data = config_map.data.as_ref().ok_or_else(|| "Fail to unwrap data".to_string())?; + info!( + "{} Get {}: {}", + log_header, + config_map_name, + deps_hack::k8s_openapi::serde_json::to_string(&config_map).unwrap() + ); + let data = config_map + .data + .as_ref() + .ok_or_else(|| "Fail to unwrap data".to_string())?; // The configuration should tell us a cr_key and we will crash the controller when it is managing that object // This is to make the fault injection more deterministic when the controller manages multiple cr objects of different types - let cr_key_val = data.get("cr_key").ok_or_else(|| "Fail to get cr_key".to_string())?; + let cr_key_val = data + .get("cr_key") + .ok_or_else(|| "Fail to get cr_key".to_string())?; // We only want to crash when the controller is managing the object identified by cr_key if cr_key_val.to_string() != cr_key.to_string() { return Ok(()); @@ -35,13 +49,27 @@ pub async fn crash_or_continue(client: &Client, cr_key: &String, log_header: &St // The configuration should have the two entries: // 1. the current number of requests that the controller has issued, and // 2. 
diff --git a/src/v_replica_set_controller.rs b/src/v_replica_set_controller.rs
index 46fcd5308..ab221d764 100644
--- a/src/v_replica_set_controller.rs
+++ b/src/v_replica_set_controller.rs
@@ -18,27 +18,27 @@ use deps_hack::anyhow::Result;
 use deps_hack::kube::CustomResourceExt;
 use deps_hack::serde_yaml;
 use deps_hack::tokio;
+use deps_hack::tracing::{error, info};
+use deps_hack::tracing_subscriber;
 use shim_layer::controller_runtime::run_controller;
 use std::env;
 
 #[tokio::main]
 async fn main() -> Result<()> {
+    tracing_subscriber::fmt::init();
     let args: Vec<String> = env::args().collect();
     let cmd = args[1].clone();
 
     if cmd == String::from("export") {
-        println!("exporting custom resource definition");
         println!("{}", serde_yaml::to_string(&deps_hack::VReplicaSet::crd())?);
     } else if cmd == String::from("run") {
-        println!("running v-replica-set-controller");
+        info!("running v-replica-set-controller");
         run_controller::<deps_hack::VReplicaSet, VReplicaSetReconciler>(false).await?;
-        println!("controller terminated");
     } else if cmd == String::from("crash") {
-        println!("running v-replica-set-controller in crash-testing mode");
+        info!("running v-replica-set-controller in crash-testing mode");
        run_controller::<deps_hack::VReplicaSet, VReplicaSetReconciler>(true).await?;
-        println!("controller terminated");
     } else {
-        println!("wrong command; please use \"export\", \"run\" or \"crash\"");
+        error!("wrong command; please use \"export\", \"run\" or \"crash\"");
     }
     Ok(())
 }
diff --git a/src/v_stateful_set_controller.rs b/src/v_stateful_set_controller.rs
index 60d7fe1a8..3c08688c0 100644
--- a/src/v_stateful_set_controller.rs
+++ b/src/v_stateful_set_controller.rs
@@ -11,38 +11,33 @@ pub mod state_machine;
 pub mod temporal_logic;
 pub mod vstd_ext;
 
-use builtin::*;
-use builtin_macros::*;
-
-use crate::external_api::exec::*;
 use deps_hack::anyhow::Result;
 use deps_hack::futures;
 use deps_hack::kube::CustomResourceExt;
 use deps_hack::serde_yaml;
 use deps_hack::tokio;
+use deps_hack::tracing::{error, info};
+use deps_hack::tracing_subscriber;
 use shim_layer::controller_runtime::run_controller;
 use std::env;
 
-verus! {
-
-#[verifier(external)]
 #[tokio::main]
 async fn main() -> Result<()> {
+    tracing_subscriber::fmt::init();
     let args: Vec<String> = env::args().collect();
     let cmd = args[1].clone();
 
     if cmd == String::from("export") {
-        println!("exporting custom resource definition");
-        println!("{}", serde_yaml::to_string(&deps_hack::VStatefulSet::crd())?);
+        println!(
+            "{}",
+            serde_yaml::to_string(&deps_hack::VStatefulSet::crd())?
+        );
     } else if cmd == String::from("run") {
-        println!("running vstatefulset-controller");
-        println!("controller terminated");
+        info!("running vstatefulset-controller");
     } else if cmd == String::from("crash") {
-        println!("running vstatefulset-controller in crash-testing mode");
-        println!("controller terminated");
+        info!("running vstatefulset-controller in crash-testing mode");
     } else {
-        println!("wrong command; please use \"export\", \"run\" or \"crash\"");
+        error!("wrong command; please use \"export\", \"run\" or \"crash\"");
     }
     Ok(())
 }
-
-}
diff --git a/src/zookeeper_controller.rs b/src/zookeeper_controller.rs
index 25d16334d..1ba8ed215 100644
--- a/src/zookeeper_controller.rs
+++ b/src/zookeeper_controller.rs
@@ -13,41 +13,35 @@ pub mod vstd_ext;
 
 #[path = "controller_examples/zookeeper_controller/mod.rs"]
 pub mod zookeeper_controller;
 
-use builtin::*;
-use builtin_macros::*;
-
 use crate::zookeeper_controller::exec::reconciler::ZookeeperReconciler;
-use crate::zookeeper_controller::trusted::exec_types::{ZookeeperCluster, ZookeeperReconcileState};
-use crate::zookeeper_controller::trusted::zookeeper_api_exec::*;
 use deps_hack::anyhow::Result;
 use deps_hack::kube::CustomResourceExt;
 use deps_hack::serde_yaml;
 use deps_hack::tokio;
+use deps_hack::tracing::{error, info};
+use deps_hack::tracing_subscriber;
 use shim_layer::controller_runtime::run_controller;
 use std::env;
 
-verus! {
-
-#[verifier(external)]
 #[tokio::main]
 async fn main() -> Result<()> {
+    tracing_subscriber::fmt::init();
     let args: Vec<String> = env::args().collect();
     let cmd = args[1].clone();
 
     if cmd == String::from("export") {
-        println!("exporting custom resource definition");
-        println!("{}", serde_yaml::to_string(&deps_hack::ZookeeperCluster::crd())?);
+        println!(
+            "{}",
+            serde_yaml::to_string(&deps_hack::ZookeeperCluster::crd())?
+        );
     } else if cmd == String::from("run") {
-        println!("running zookeeper-controller");
+        info!("running zookeeper-controller");
         run_controller::<deps_hack::ZookeeperCluster, ZookeeperReconciler>(false).await?;
-        println!("controller terminated");
     } else if cmd == String::from("crash") {
-        println!("running zookeeper-controller in crash-testing mode");
+        info!("running zookeeper-controller in crash-testing mode");
         run_controller::<deps_hack::ZookeeperCluster, ZookeeperReconciler>(true).await?;
-        println!("controller terminated");
     } else {
-        println!("wrong command; please use \"export\", \"run\" or \"crash\"");
+        error!("wrong command; please use \"export\", \"run\" or \"crash\"");
     }
     Ok(())
 }
-}
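One caveat with the tracing_subscriber::fmt::init() calls added across these entry points: init() panics if a global subscriber has already been installed. Should any of these mains ever run inside a harness that sets its own subscriber, the non-panicking variant is a drop-in alternative (sketch):

    // Ignores the error instead of panicking when a global subscriber already exists.
    let _ = deps_hack::tracing_subscriber::fmt::try_init();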