Commit

Add scaling tests and perf tests for zookeeper and rabbitmq controllers (#254)

Add scaling tests for both zookeeper and rabbitmq, and add a perf test
for rabbitmq. Also fix a liveness bug in the zookeeper controller
implementation, and remove the unused `CLUSTER_SIZE` entry from env.sh
in the config map object.

---------

Signed-off-by: Xudong Sun <[email protected]>
marshtompsxd authored Sep 13, 2023
1 parent c8cfcbd commit 0492da2
Showing 5 changed files with 332 additions and 20 deletions.
8 changes: 8 additions & 0 deletions e2e/src/common.rs
@@ -12,7 +12,9 @@ use kube::{
discovery::{ApiCapabilities, ApiResource, Discovery, Scope},
Client, CustomResource,
};
use std::process::Command;
use thiserror::Error;

#[derive(Debug, Error)]
pub enum Error {
#[error("Failed to get kube client: {0}")]
@@ -120,3 +122,9 @@ pub async fn get_output_and_err(mut attached: AttachedProcess) -> (String, String) {
attached.join().await.unwrap();
(out, err)
}

// Runs `program` with `args` and prints the captured stdout and stderr.
// Panics with `err_msg` if the process cannot be spawned; note that a
// non-zero exit status is not treated as an error.
pub fn run_command(program: &str, args: Vec<&str>, err_msg: &str) {
let cmd = Command::new(program).args(args).output().expect(err_msg);
println!("cmd output: {}", String::from_utf8_lossy(&cmd.stdout));
println!("cmd error: {}", String::from_utf8_lossy(&cmd.stderr));
}
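
The scaling tests below pass their kubectl invocations through this helper; a minimal usage sketch (the arguments here are illustrative only):

run_command(
    "kubectl",
    vec!["get", "pods"],
    "failed to list pods",
);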
130 changes: 130 additions & 0 deletions e2e/src/rabbitmq_e2e.rs
@@ -138,6 +138,134 @@ pub async fn authenticate_user_test(client: Client, rabbitmq_name: String) -> Result<(), Error> {
Ok(())
}

pub async fn scaling_test(client: Client, rabbitmq_name: String) -> Result<(), Error> {
let timeout = Duration::from_secs(360);
let start = Instant::now();
let sts_api: Api<StatefulSet> = Api::default_namespaced(client.clone());
let rabbitmq_sts_name = format!("{}-server", &rabbitmq_name);

run_command(
"kubectl",
vec![
"patch",
"rbmq",
"rabbitmq",
"--type=json",
"-p",
"[{\"op\": \"replace\", \"path\": \"/spec/replicas\", \"value\": 4}]",
],
"failed to scale rabbitmq",
);

loop {
sleep(Duration::from_secs(5)).await;
if start.elapsed() > timeout {
return Err(Error::Timeout);
}

let sts = sts_api.get(&rabbitmq_sts_name).await;
match sts {
Err(e) => {
println!("Get stateful set failed with error {}.", e);
continue;
}
Ok(sts) => {
if sts.spec.unwrap().replicas != Some(4) {
println!(
"Stateful set spec is not consistent with zookeeper cluster spec yet."
);
continue;
}
println!("Stateful set is found as expected.");
if sts.status.as_ref().unwrap().ready_replicas.is_none() {
println!("No stateful set pod is ready.");
} else if *sts
.status
.as_ref()
.unwrap()
.ready_replicas
.as_ref()
.unwrap()
== 4
{
println!("Scale up is done with 4 replicas ready.");
break;
} else {
println!(
"Scale up is in progress. {} pods are ready now.",
sts.status
.as_ref()
.unwrap()
.ready_replicas
.as_ref()
.unwrap()
);
}
}
};
}
println!("Scaling test passed.");
Ok(())
}
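
Shelling out to kubectl here mirrors how a user would scale the cluster. For comparison, a sketch of issuing the same change through kube's typed patch API (not what this test does; `rbmq_api` is a hypothetical handle bound to the RabbitmqCluster custom resource):

use kube::api::{Patch, PatchParams};

// Hypothetical: an Api handle for the RabbitmqCluster resource.
// let rbmq_api: Api<DynamicObject> = ...;
let patch = serde_json::json!({ "spec": { "replicas": 4 } });
rbmq_api
    .patch("rabbitmq", &PatchParams::default(), &Patch::Merge(&patch))
    .await?;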

pub async fn rabbitmq_workload_test(client: Client, rabbitmq_name: String) -> Result<(), Error> {
run_command(
"kubectl",
vec![
"run",
"perf-test",
"--image=pivotalrabbitmq/perf-test",
"--",
"--uri",
"\"amqp://new_user:new_pass@rabbitmq\"",
],
"failed to run perf test pod",
);
let pod_name = "perf-test";
let pod_api: Api<Pod> = Api::default_namespaced(client.clone());
let timeout = Duration::from_secs(600);
let perf_test_duration = Duration::from_secs(20);
let start = Instant::now();
let mut perf_test_start: Option<Instant> = None;
loop {
sleep(Duration::from_secs(5)).await;
if start.elapsed() > timeout {
return Err(Error::Timeout);
}
match pod_api.get(pod_name).await {
Err(e) => {
println!("Get pod failed with {}, continue to wait.", e);
continue;
}
Ok(pod) => {
if pod.status.is_none() {
println!("Pod status is not available yet.");
continue;
} else if pod.status.unwrap().phase != Some("Running".to_string()) {
println!("Perf test pod is not running yet.");
continue;
} else if perf_test_start.is_none() {
println!("Perf test starts running.");
perf_test_start = Some(Instant::now());
continue;
} else if perf_test_start.unwrap().elapsed() > perf_test_duration {
break;
} else {
println!("Keep running perf test.");
continue;
}
}
};
}
// Shall we delete the perf test pod here?
println!("Rabbitmq workload test passed.");
Ok(())
}
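
On the question left in the code above: if the pod should be cleaned up, one option is to delete it through the same helper (a sketch; the test as committed leaves the pod running):

run_command(
    "kubectl",
    vec!["delete", "pod", "perf-test", "--ignore-not-found"],
    "failed to delete perf test pod",
);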

pub async fn rabbitmq_e2e_test() -> Result<(), Error> {
// check if the CRD is already registered
let client = Client::try_default().await?;
@@ -158,7 +158,9 @@ pub async fn rabbitmq_e2e_test() -> Result<(), Error> {
let rabbitmq_name = apply(rabbitmq_cluster(), client.clone(), &discovery).await?;

desired_state_test(client.clone(), rabbitmq_name.clone()).await?;
scaling_test(client.clone(), rabbitmq_name.clone()).await?;
authenticate_user_test(client.clone(), rabbitmq_name.clone()).await?;
rabbitmq_workload_test(client.clone(), rabbitmq_name.clone()).await?;

println!("E2e test passed.");
Ok(())
174 changes: 172 additions & 2 deletions e2e/src/zookeeper_e2e.rs
@@ -1,7 +1,7 @@
#![allow(unused_imports)]
#![allow(unused_variables)]
use k8s_openapi::api::apps::v1::StatefulSet;
use k8s_openapi::api::core::v1::Pod;
use k8s_openapi::api::core::v1::{ConfigMap, Pod, Service};
use k8s_openapi::apiextensions_apiserver::pkg::apis::apiextensions::v1::CustomResourceDefinition;
use kube::{
api::{
@@ -65,13 +65,52 @@ pub fn zookeeper_cluster() -> String {
pub async fn desired_state_test(client: Client, zk_name: String) -> Result<(), Error> {
let timeout = Duration::from_secs(360);
let start = Instant::now();
let svc_api: Api<Service> = Api::default_namespaced(client.clone());
let cm_api: Api<ConfigMap> = Api::default_namespaced(client.clone());
let sts_api: Api<StatefulSet> = Api::default_namespaced(client.clone());
loop {
sleep(Duration::from_secs(5)).await;
if start.elapsed() > timeout {
return Err(Error::Timeout);
}

let svc = svc_api.get(&(zk_name.clone() + "-headless")).await;
match svc {
Err(e) => {
println!("Get headless service failed with error {}.", e);
continue;
}
_ => {}
};

let svc = svc_api.get(&(zk_name.clone() + "-client")).await;
match svc {
Err(e) => {
println!("Get client service failed with error {}.", e);
continue;
}
_ => {}
};

let svc = svc_api.get(&(zk_name.clone() + "-admin-server")).await;
match svc {
Err(e) => {
println!("Get admin server service failed with error {}.", e);
continue;
}
_ => {}
};

let cm = cm_api.get(&(zk_name.clone() + "-configmap")).await;
match cm {
Err(e) => {
println!("Get config map failed with error {}.", e);
continue;
}
_ => {}
};

// Check stateful set
let sts_api: Api<StatefulSet> = Api::default_namespaced(client.clone());
let sts = sts_api.get(&zk_name).await;
match sts {
Err(e) => {
@@ -115,6 +154,136 @@ pub async fn desired_state_test(client: Client, zk_name: String) -> Result<(), Error> {
Ok(())
}
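
The four existence checks added above share the same retry shape; a sketch of a shared helper they could be folded into (the name `exists` is hypothetical and not part of this commit):

async fn exists<K>(api: &Api<K>, name: &str, kind: &str) -> bool
where
    K: Clone + serde::de::DeserializeOwned + std::fmt::Debug,
{
    // Returns true once the object is retrievable; logs and returns false otherwise.
    match api.get(name).await {
        Ok(_) => true,
        Err(e) => {
            println!("Get {} failed with error {}.", kind, e);
            false
        }
    }
}

Each check then reduces to, e.g., `if !exists(&svc_api, &(zk_name.clone() + "-headless"), "headless service").await { continue; }`.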

pub async fn scaling_test(client: Client, zk_name: String) -> Result<(), Error> {
let timeout = Duration::from_secs(360);
let start = Instant::now();
let sts_api: Api<StatefulSet> = Api::default_namespaced(client.clone());
run_command(
"kubectl",
vec![
"patch",
"zk",
"zookeeper",
"--type=json",
"-p",
"[{\"op\": \"replace\", \"path\": \"/spec/replicas\", \"value\": 2}]",
],
"failed to scale zk",
);

loop {
sleep(Duration::from_secs(5)).await;
if start.elapsed() > timeout {
return Err(Error::Timeout);
}

// Check stateful set
let sts = sts_api.get(&zk_name).await;
match sts {
Err(e) => {
println!("Get stateful set failed with error {}.", e);
continue;
}
Ok(sts) => {
if sts.spec.unwrap().replicas != Some(2) {
println!(
"Stateful set spec is not consistent with zookeeper cluster spec yet."
);
continue;
}
println!("Stateful set is found as expected.");
if sts.status.as_ref().unwrap().ready_replicas.is_none() {
println!("No stateful set pod is ready.");
} else if *sts
.status
.as_ref()
.unwrap()
.ready_replicas
.as_ref()
.unwrap()
== 2
{
println!("Scale down is done with 2 replicas ready.");
break;
} else {
println!(
"Scale down is in progress. {} pods are ready now.",
sts.status
.as_ref()
.unwrap()
.ready_replicas
.as_ref()
.unwrap()
);
}
}
};
}

run_command(
"kubectl",
vec![
"patch",
"zk",
"zookeeper",
"--type=json",
"-p",
"[{\"op\": \"replace\", \"path\": \"/spec/replicas\", \"value\": 3}]",
],
"failed to scale zk",
);

loop {
sleep(Duration::from_secs(5)).await;
if start.elapsed() > timeout {
return Err(Error::Timeout);
}

let sts = sts_api.get(&zk_name).await;
match sts {
Err(e) => {
println!("Get stateful set failed with error {}.", e);
continue;
}
Ok(sts) => {
if sts.spec.unwrap().replicas != Some(3) {
println!(
"Stateful set spec is not consistent with zookeeper cluster spec yet."
);
continue;
}
println!("Stateful set is found as expected.");
if sts.status.as_ref().unwrap().ready_replicas.is_none() {
println!("No stateful set pod is ready.");
} else if *sts
.status
.as_ref()
.unwrap()
.ready_replicas
.as_ref()
.unwrap()
== 3
{
println!("Scale up is done with 3 replicas ready.");
break;
} else {
println!(
"Scale up is in progress. {} pods are ready now.",
sts.status
.as_ref()
.unwrap()
.ready_replicas
.as_ref()
.unwrap()
);
}
}
};
}
println!("Scaling test passed.");
Ok(())
}
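
The same wait loop now appears three times (rabbitmq scale-up, zookeeper scale-down, zookeeper scale-up) with only the target count changing; a sketch of a helper it could be folded into (the name `wait_for_sts_ready_replicas` is hypothetical):

async fn wait_for_sts_ready_replicas(
    sts_api: &Api<StatefulSet>,
    sts_name: &str,
    replicas: i32,
    timeout: Duration,
) -> Result<(), Error> {
    let start = Instant::now();
    loop {
        sleep(Duration::from_secs(5)).await;
        if start.elapsed() > timeout {
            return Err(Error::Timeout);
        }
        match sts_api.get(sts_name).await {
            Err(e) => println!("Get stateful set failed with error {}.", e),
            Ok(sts) => {
                // Wait for the controller to propagate the new replica count...
                if sts.spec.and_then(|s| s.replicas) != Some(replicas) {
                    continue;
                }
                // ...and then for every replica to report ready.
                if sts.status.and_then(|s| s.ready_replicas) == Some(replicas) {
                    return Ok(());
                }
            }
        }
    }
}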

pub async fn zk_workload_test(client: Client, zk_name: String) -> Result<(), Error> {
let pod_api: Api<Pod> = Api::default_namespaced(client.clone());
let pod_name = zk_name.clone() + "-0";
@@ -199,6 +368,7 @@ pub async fn zookeeper_e2e_test() -> Result<(), Error> {
let zk_name = apply(zookeeper_cluster(), client.clone(), &discovery).await?;

desired_state_test(client.clone(), zk_name.clone()).await?;
scaling_test(client.clone(), zk_name.clone()).await?;
zk_workload_test(client.clone(), zk_name.clone()).await?;

println!("E2e test passed.");