Skip to content

Commit

Permalink
Fix bug in updating labels for zookeeper (#282)
Browse files Browse the repository at this point in the history
Previously the zookeeper controller updates the `volume_claim_templates`
field inside the stateful set when the `.spec.labeles` or
`.spec.annotations` get updated by the user. However,
`volume_claim_templates` is immutable (see validation check here:
https://github.com/kubernetes/kubernetes/blob/v1.26.3/pkg/apis/apps/validation/validation.go#L171-L192)
and such update will deterministically fail, leading to liveness
violation. The current validation check logic in the state machine
specification needs to be further enhanced so that we can formally
reason about such behavior.

---------

Signed-off-by: Xudong Sun <[email protected]>
  • Loading branch information
marshtompsxd authored Sep 19, 2023
1 parent 98db831 commit 0c0fa54
Show file tree
Hide file tree
Showing 6 changed files with 299 additions and 119 deletions.
111 changes: 111 additions & 0 deletions e2e/src/zookeeper_e2e.rs
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,116 @@ pub async fn scaling_test(client: Client, zk_name: String) -> Result<(), Error>
Ok(())
}

pub async fn relabel_test(client: Client, zk_name: String) -> Result<(), Error> {
let timeout = Duration::from_secs(360);
let start = Instant::now();
let sts_api: Api<StatefulSet> = Api::default_namespaced(client.clone());
run_command(
"kubectl",
vec![
"patch",
"zk",
"zookeeper",
"--type=json",
"-p",
"[{\"op\": \"add\", \"path\": \"/spec/labels/key\", \"value\": \"val\"}]",
],
"failed to relabel zk",
);

// Sleep for extra 5 seconds to ensure the upgrading has started
sleep(Duration::from_secs(5)).await;
loop {
sleep(Duration::from_secs(5)).await;
if start.elapsed() > timeout {
return Err(Error::Timeout);
}

// Check stateful set
let sts = sts_api.get(&zk_name).await;
match sts {
Err(e) => {
println!("Get stateful set failed with error {}.", e);
continue;
}
Ok(sts) => {
if !sts
.spec
.as_ref()
.unwrap()
.template
.metadata
.as_ref()
.unwrap()
.labels
.as_ref()
.unwrap()
.contains_key("key")
{
println!("Label for pod is not updated yet");
continue;
}

if sts.status.as_ref().unwrap().updated_replicas.is_none() {
println!("No stateful set pod is updated yet.");
continue;
} else if *sts
.status
.as_ref()
.unwrap()
.updated_replicas
.as_ref()
.unwrap()
== 3
{
println!("Relabel is done.");
} else {
println!(
"Relabel is in progress. {} pods are updated now.",
sts.status
.as_ref()
.unwrap()
.updated_replicas
.as_ref()
.unwrap()
);
continue;
}

if sts.status.as_ref().unwrap().ready_replicas.is_none() {
println!("No stateful set pod is ready.");
continue;
} else if *sts
.status
.as_ref()
.unwrap()
.ready_replicas
.as_ref()
.unwrap()
== 3
{
println!("All stateful set pods are ready.");
break;
} else {
println!(
"Only {} pods are ready now.",
sts.status
.as_ref()
.unwrap()
.ready_replicas
.as_ref()
.unwrap()
);
continue;
}
}
};
}

println!("Relabel test passed.");
Ok(())
}

pub async fn upgrading_test(client: Client, zk_name: String) -> Result<(), Error> {
let timeout = Duration::from_secs(360);
let start = Instant::now();
Expand Down Expand Up @@ -673,6 +783,7 @@ pub async fn zookeeper_e2e_test() -> Result<(), Error> {

desired_state_test(client.clone(), zk_name.clone()).await?;
scaling_test(client.clone(), zk_name.clone()).await?;
relabel_test(client.clone(), zk_name.clone()).await?;
reconfiguration_test(client.clone(), zk_name.clone()).await?;
zk_workload_test(client.clone(), zk_name.clone()).await?;
upgrading_test(client.clone(), zk_name.clone()).await?;
Expand Down
121 changes: 69 additions & 52 deletions src/controller_examples/zookeeper_controller/exec/reconciler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -641,27 +641,28 @@ pub fn reconcile_core(
// Update the stateful set only after we ensure
// that the ZK node has been set correctly.
let found_stateful_set = state.found_stateful_set_opt.as_ref().unwrap();
let latest_config_map_rv = state.latest_config_map_rv_opt.as_ref().unwrap();
let new_stateful_set = update_stateful_set(zk, found_stateful_set, latest_config_map_rv);
let req_o = KubeAPIRequest::UpdateRequest(KubeUpdateRequest {
api_resource: StatefulSet::api_resource(),
name: make_stateful_set_name(zk),
namespace: zk.metadata().namespace().unwrap(),
obj: new_stateful_set.to_dynamic_object(),
});
let state_prime = ZookeeperReconcileState {
reconcile_step: ZookeeperReconcileStep::AfterUpdateStatefulSet,
found_stateful_set_opt: None,
..state
};
return (state_prime, Some(Request::KRequest(req_o)));
} else {
let state_prime = ZookeeperReconcileState {
reconcile_step: ZookeeperReconcileStep::Error,
..state
};
return (state_prime, None);
if found_stateful_set.spec().is_some() {
let latest_config_map_rv = state.latest_config_map_rv_opt.as_ref().unwrap();
let new_stateful_set = update_stateful_set(zk, found_stateful_set, latest_config_map_rv);
let req_o = KubeAPIRequest::UpdateRequest(KubeUpdateRequest {
api_resource: StatefulSet::api_resource(),
name: make_stateful_set_name(zk),
namespace: zk.metadata().namespace().unwrap(),
obj: new_stateful_set.to_dynamic_object(),
});
let state_prime = ZookeeperReconcileState {
reconcile_step: ZookeeperReconcileStep::AfterUpdateStatefulSet,
found_stateful_set_opt: None,
..state
};
return (state_prime, Some(Request::KRequest(req_o)));
}
}
let state_prime = ZookeeperReconcileState {
reconcile_step: ZookeeperReconcileStep::Error,
..state
};
return (state_prime, None);
},
ZookeeperReconcileStep::AfterUpdateZKNode => {
if resp_o.is_some() && resp_o.as_ref().unwrap().is_external_response()
Expand All @@ -671,27 +672,28 @@ pub fn reconcile_core(
// Update the stateful set only after we ensure
// that the ZK node has been set correctly.
let found_stateful_set = state.found_stateful_set_opt.as_ref().unwrap();
let latest_config_map_rv = state.latest_config_map_rv_opt.as_ref().unwrap();
let new_stateful_set = update_stateful_set(zk, found_stateful_set, latest_config_map_rv);
let req_o = KubeAPIRequest::UpdateRequest(KubeUpdateRequest {
api_resource: StatefulSet::api_resource(),
name: make_stateful_set_name(zk),
namespace: zk.metadata().namespace().unwrap(),
obj: new_stateful_set.to_dynamic_object(),
});
let state_prime = ZookeeperReconcileState {
reconcile_step: ZookeeperReconcileStep::AfterUpdateStatefulSet,
found_stateful_set_opt: None,
..state
};
return (state_prime, Some(Request::KRequest(req_o)));
} else {
let state_prime = ZookeeperReconcileState {
reconcile_step: ZookeeperReconcileStep::Error,
..state
};
return (state_prime, None);
if found_stateful_set.spec().is_some() {
let latest_config_map_rv = state.latest_config_map_rv_opt.as_ref().unwrap();
let new_stateful_set = update_stateful_set(zk, found_stateful_set, latest_config_map_rv);
let req_o = KubeAPIRequest::UpdateRequest(KubeUpdateRequest {
api_resource: StatefulSet::api_resource(),
name: make_stateful_set_name(zk),
namespace: zk.metadata().namespace().unwrap(),
obj: new_stateful_set.to_dynamic_object(),
});
let state_prime = ZookeeperReconcileState {
reconcile_step: ZookeeperReconcileStep::AfterUpdateStatefulSet,
found_stateful_set_opt: None,
..state
};
return (state_prime, Some(Request::KRequest(req_o)));
}
}
let state_prime = ZookeeperReconcileState {
reconcile_step: ZookeeperReconcileStep::Error,
..state
};
return (state_prime, None);
},
ZookeeperReconcileStep::AfterCreateStatefulSet => {
if resp_o.is_some() && resp_o.as_ref().unwrap().is_k_response()
Expand Down Expand Up @@ -783,15 +785,25 @@ fn zk_node_data(zk: &ZookeeperCluster) -> (data: String)
new_strlit("CLUSTER_SIZE=").to_string().concat(i32_to_string(zk.spec().replicas()).as_str())
}

fn make_labels(zk: &ZookeeperCluster) -> (labels: StringMap)
fn make_base_labels(zk: &ZookeeperCluster) -> (labels: StringMap)
requires
[email protected]_formed(),
ensures
labels@ == zk_spec::make_labels(zk@),
labels@ == zk_spec::make_base_labels(zk@),
{
let mut labels = StringMap::empty();
labels.insert(new_strlit("app").to_string(), zk.metadata().name().unwrap());
labels.extend(zk.spec().labels());
labels
}

fn make_labels(zk: &ZookeeperCluster) -> (labels: StringMap)
requires
[email protected]_formed(),
ensures
labels@ == zk_spec::make_labels(zk@),
{
let mut labels = zk.spec().labels();
labels.extend(make_base_labels(zk));
labels
}

Expand Down Expand Up @@ -822,6 +834,7 @@ fn update_headless_service(zk: &ZookeeperCluster, found_headless_service: &Servi
headless_service.set_spec({
let mut spec = found_headless_service.spec().unwrap();
spec.set_ports(made_headless_service.spec().unwrap().ports().unwrap());
spec.set_selector(made_headless_service.spec().unwrap().selector().unwrap());
spec
});
headless_service
Expand Down Expand Up @@ -879,6 +892,7 @@ fn update_client_service(zk: &ZookeeperCluster, found_client_service: &Service)
client_service.set_spec({
let mut spec = found_client_service.spec().unwrap();
spec.set_ports(made_client_service.spec().unwrap().ports().unwrap());
spec.set_selector(made_client_service.spec().unwrap().selector().unwrap());
spec
});
client_service
Expand Down Expand Up @@ -932,6 +946,7 @@ fn update_admin_server_service(zk: &ZookeeperCluster, found_admin_server_service
admin_server_service.set_spec({
let mut spec = found_admin_server_service.spec().unwrap();
spec.set_ports(made_admin_server_service.spec().unwrap().ports().unwrap());
spec.set_selector(made_admin_server_service.spec().unwrap().selector().unwrap());
spec
});
admin_server_service
Expand Down Expand Up @@ -990,11 +1005,7 @@ fn make_service(zk: &ZookeeperCluster, name: String, ports: Vec<ServicePort>, cl
service_spec.set_cluster_ip(new_strlit("None").to_string());
}
service_spec.set_ports(ports);
service_spec.set_selector({
let mut selector = StringMap::empty();
selector.insert(new_strlit("app").to_string(), zk.metadata().name().unwrap());
selector
});
service_spec.set_selector(make_base_labels(zk));
service_spec
});

Expand Down Expand Up @@ -1166,6 +1177,7 @@ fn make_stateful_set_name(zk: &ZookeeperCluster) -> (name: String)
fn update_stateful_set(zk: &ZookeeperCluster, found_stateful_set: &StatefulSet, rv: &String) -> (stateful_set: StatefulSet)
requires
[email protected]_formed(),
[email protected]_Some(),
ensures
stateful_set@ == zk_spec::update_stateful_set(zk@, found_stateful_set@, rv@),
{
Expand All @@ -1177,7 +1189,13 @@ fn update_stateful_set(zk: &ZookeeperCluster, found_stateful_set: &StatefulSet,
metadata.set_annotations(made_stateful_set.metadata().annotations().unwrap());
metadata
});
stateful_set.set_spec(made_stateful_set.spec().unwrap());
stateful_set.set_spec({
let mut spec = found_stateful_set.spec().unwrap();
spec.set_replicas(made_stateful_set.spec().unwrap().replicas().unwrap());
spec.set_template(made_stateful_set.spec().unwrap().template());
spec.set_pvc_retention_policy(made_stateful_set.spec().unwrap().persistent_volume_claim_retention_policy().unwrap());
spec
});
stateful_set
}

Expand Down Expand Up @@ -1217,7 +1235,7 @@ fn make_stateful_set(zk: &ZookeeperCluster, rv: &String) -> (stateful_set: State
// Set the selector used for querying pods of this stateful set
stateful_set_spec.set_selector({
let mut selector = LabelSelector::default();
selector.set_match_labels(make_labels(zk));
selector.set_match_labels(make_base_labels(zk));
selector
});
stateful_set_spec.set_pvc_retention_policy({
Expand Down Expand Up @@ -1252,8 +1270,7 @@ fn make_stateful_set(zk: &ZookeeperCluster, rv: &String) -> (stateful_set: State
pvc.set_metadata({
let mut metadata = ObjectMeta::default();
metadata.set_name(new_strlit("data").to_string());
metadata.set_labels(make_labels(zk));
metadata.set_annotations(zk.spec().annotations());
metadata.set_labels(make_base_labels(zk));
metadata
});
pvc.set_spec({
Expand Down
Loading

0 comments on commit 0c0fa54

Please sign in to comment.