Skip to content

Commit

Permalink
Support persistence options for zookeeper (#273)
Browse files Browse the repository at this point in the history
Signed-off-by: Xudong Sun <[email protected]>
  • Loading branch information
marshtompsxd authored Sep 17, 2023
1 parent 55f9c2f commit 41508b7
Show file tree
Hide file tree
Showing 10 changed files with 295 additions and 66 deletions.
17 changes: 17 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -113,3 +113,20 @@ jobs:
run: ./local-test.sh zookeeper
- name: Run zookeeper e2e tests
run: cd e2e && cargo run -- zookeeper
zookeeper-ephemeral-e2e-test:
runs-on: ubuntu-20.04
steps:
- uses: actions/checkout@v2
- name: Setup Go
uses: actions/setup-go@v2
with:
go-version: "^1.20"
- name: Install kind
run: go install sigs.k8s.io/[email protected]
- name: Install Rust toolchain
run: |
curl --proto '=https' --tlsv1.2 --retry 10 --retry-connrefused -fsSL "https://sh.rustup.rs" | sh -s -- --default-toolchain none -y
- name: Deploy zookeeper controller
run: ./local-test.sh zookeeper
- name: Run zookeeper e2e tests
run: cd e2e && cargo run -- zookeeper-ephemeral
24 changes: 24 additions & 0 deletions deploy/zookeeper/crd.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -558,6 +558,29 @@ spec:
type: string
default: {}
type: object
persistence:
properties:
enabled:
type: boolean
x-kubernetes-validations:
- rule: "self == oldSelf"
message: changing persistence.enabled is not supported
storageClassName:
nullable: true
type: string
x-kubernetes-validations:
- rule: "self == oldSelf"
message: changing persistence.storageClassName is not supported
storageSize:
description: "Quantity is a fixed-point representation of a number. It provides convenient marshaling/unmarshaling in JSON and YAML, in addition to String() and AsInt64() accessors.\n\nThe serialization format is:\n\n``` <quantity> ::= <signedNumber><suffix>\n\n\t(Note that <suffix> may be empty, from the \"\" case in <decimalSI>.)\n\n<digit> ::= 0 | 1 | ... | 9 <digits> ::= <digit> | <digit><digits> <number> ::= <digits> | <digits>.<digits> | <digits>. | .<digits> <sign> ::= \"+\" | \"-\" <signedNumber> ::= <number> | <sign><number> <suffix> ::= <binarySI> | <decimalExponent> | <decimalSI> <binarySI> ::= Ki | Mi | Gi | Ti | Pi | Ei\n\n\t(International System of units; See: http://physics.nist.gov/cuu/Units/binary.html)\n\n<decimalSI> ::= m | \"\" | k | M | G | T | P | E\n\n\t(Note that 1024 = 1Ki but 1000 = 1k; I didn't choose the capitalization.)\n\n<decimalExponent> ::= \"e\" <signedNumber> | \"E\" <signedNumber> ```\n\nNo matter which of the three exponent forms is used, no quantity may represent a number greater than 2^63-1 in magnitude, nor may it have more than 3 decimal places. Numbers larger or more precise will be capped or rounded up. (E.g.: 0.1m will rounded up to 1m.) This may be extended in the future if we require larger or smaller quantities.\n\nWhen a Quantity is parsed from a string, it will remember the type of suffix it had, and will use the same type again when it is serialized.\n\nBefore serializing, Quantity will be put in \"canonical form\". This means that Exponent/suffix will be adjusted up or down (with a corresponding increase or decrease in Mantissa) such that:\n\n- No precision is lost - No fractional digits will be emitted - The exponent (or suffix) is as large as possible.\n\nThe sign will be omitted unless the number is negative.\n\nExamples:\n\n- 1.5 will be serialized as \"1500m\" - 1.5Gi will be serialized as \"1536Mi\"\n\nNote that the quantity will NEVER be internally represented by a floating point number. That is the whole point of this exercise.\n\nNon-canonical values will still parse as long as they are well formed, but will be re-emitted in their canonical form. (So always use canonical form, or don't diff.)\n\nThis format is intended to make it difficult to use these numbers without writing some sort of special handling code in the hopes that that will cause implementors to also use a fixed point implementation."
type: string
x-kubernetes-validations:
- rule: "self == oldSelf"
message: changing persistence.storageSize is not supported
required:
- enabled
- storageSize
type: object
ports:
properties:
adminServer:
Expand Down Expand Up @@ -646,6 +669,7 @@ spec:
required:
- conf
- image
- persistence
- ports
- replicas
type: object
Expand Down
3 changes: 3 additions & 0 deletions deploy/zookeeper/zookeeper.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -28,3 +28,6 @@ spec:
autoPurgeSnapRetainCount: 3
autoPurgePurgeInterval: 1
quorumListenOnAllIps: false
persistence:
enabled: true
storageSize: 20Gi
6 changes: 5 additions & 1 deletion e2e/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use fluent_e2e::fluent_e2e_test;
use rabbitmq_e2e::rabbitmq_e2e_test;
use std::str::FromStr;
use std::{env, sync::Arc};
use zookeeper_e2e::zookeeper_e2e_test;
use zookeeper_e2e::{zookeeper_e2e_test, zookeeper_ephemeral_e2e_test};

#[tokio::main]
async fn main() -> Result<(), Error> {
Expand All @@ -21,6 +21,10 @@ async fn main() -> Result<(), Error> {
println!("Running zookeeper end-to-end test");
return zookeeper_e2e_test().await;
}
"zookeeper-ephemeral" => {
println!("Running zookeeper end-to-end test");
return zookeeper_ephemeral_e2e_test().await;
}
"rabbitmq" => {
println!("Running rabbitmq end-to-end test");
return rabbitmq_e2e_test().await;
Expand Down
71 changes: 71 additions & 0 deletions e2e/src/zookeeper_e2e.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,48 @@ pub fn zookeeper_cluster() -> String {
autoPurgeSnapRetainCount: 3
autoPurgePurgeInterval: 1
quorumListenOnAllIps: false
persistence:
enabled: true
storageSize: 20Gi
"
.to_string()
}

pub fn zookeeper_cluster_ephemeral() -> String {
"
apiVersion: anvil.dev/v1
kind: ZookeeperCluster
metadata:
name: zookeeper
namespace: default
spec:
replicas: 3
image: pravega/zookeeper:0.2.14
ports:
client: 2181
quorum: 2888
leaderElection: 3888
metrics: 7000
adminServer: 8080
conf:
initLimit: 10
syncLimit: 2
tickTime: 2000
globalOutstandingLimit: 1000
preAllocSize: 65536
snapCount: 10000
commitLogCount: 500
snapSizeLimitInKb: 4194304
maxCnxns: 0
maxClientCnxns: 60
minSessionTimeout: 4000
maxSessionTimeout: 40000
autoPurgeSnapRetainCount: 3
autoPurgePurgeInterval: 1
quorumListenOnAllIps: false
persistence:
enabled: false
storageSize: 20Gi
"
.to_string()
}
Expand Down Expand Up @@ -633,3 +675,32 @@ pub async fn zookeeper_e2e_test() -> Result<(), Error> {
println!("E2e test passed.");
Ok(())
}

pub async fn zookeeper_ephemeral_e2e_test() -> Result<(), Error> {
// check if the CRD is already registered
let client = Client::try_default().await?;
let crd_api: Api<CustomResourceDefinition> = Api::all(client.clone());
let zk_crd = crd_api.get("zookeeperclusters.anvil.dev").await;
match zk_crd {
Err(e) => {
println!("No CRD found, create one before run the e2e test.");
return Err(Error::CRDGetFailed(e));
}
Ok(crd) => {
println!("CRD found, continue to run the e2e test.");
}
}

// create a zookeeper cluster
let discovery = Discovery::new(client.clone()).run().await?;
let zk_name = apply(zookeeper_cluster_ephemeral(), client.clone(), &discovery).await?;

desired_state_test(client.clone(), zk_name.clone()).await?;
scaling_test(client.clone(), zk_name.clone()).await?;
reconfiguration_test(client.clone(), zk_name.clone()).await?;
zk_workload_test(client.clone(), zk_name.clone()).await?;
upgrading_test(client.clone(), zk_name.clone()).await?;

println!("E2e test passed.");
Ok(())
}
104 changes: 60 additions & 44 deletions src/controller_examples/zookeeper_controller/exec/reconciler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1213,55 +1213,63 @@ fn make_stateful_set(zk: &ZookeeperCluster, rv: &String) -> (stateful_set: State
});
// Set the templates used for creating the persistent volume claims attached to each pod
stateful_set_spec.set_volume_claim_templates({
let mut volume_claim_templates = Vec::new();
volume_claim_templates.push({
let mut pvc = PersistentVolumeClaim::default();
pvc.set_metadata({
let mut metadata = ObjectMeta::default();
metadata.set_name(new_strlit("data").to_string());
metadata.set_labels(make_labels(zk));
metadata.set_annotations(zk.spec().annotations());
metadata
});
pvc.set_spec({
let mut pvc_spec = PersistentVolumeClaimSpec::default();
pvc_spec.set_access_modes({
let mut access_modes = Vec::new();
access_modes.push(new_strlit("ReadWriteOnce").to_string());

proof {
assert_seqs_equal!(
[email protected]_values(|mode: String| mode@),
zk_spec::make_stateful_set(zk@, rv@)
.spec.get_Some_0().volume_claim_templates.get_Some_0()[0]
.spec.get_Some_0().access_modes.get_Some_0()
);
}

access_modes
if zk.spec().persistence().enabled() {
let mut volume_claim_templates = Vec::new();
volume_claim_templates.push({
let mut pvc = PersistentVolumeClaim::default();
pvc.set_metadata({
let mut metadata = ObjectMeta::default();
metadata.set_name(new_strlit("data").to_string());
metadata.set_labels(make_labels(zk));
metadata.set_annotations(zk.spec().annotations());
metadata
});
pvc_spec.set_resources({
let mut resources = ResourceRequirements::default();
resources.set_requests({
let mut requests = StringMap::empty();
requests.insert(new_strlit("storage").to_string(), new_strlit("20Gi").to_string());
requests
pvc.set_spec({
let mut pvc_spec = PersistentVolumeClaimSpec::default();
pvc_spec.set_access_modes({
let mut access_modes = Vec::new();
access_modes.push(new_strlit("ReadWriteOnce").to_string());
proof {
assert_seqs_equal!(
[email protected]_values(|mode: String| mode@),
zk_spec::make_stateful_set(zk@, rv@)
.spec.get_Some_0().volume_claim_templates.get_Some_0()[0]
.spec.get_Some_0().access_modes.get_Some_0()
);
}
access_modes
});
resources
pvc_spec.set_resources({
let mut resources = ResourceRequirements::default();
resources.set_requests({
let mut requests = StringMap::empty();
requests.insert(new_strlit("storage").to_string(), zk.spec().persistence().storage_size());
requests
});
resources
});
pvc_spec.overwrite_storage_class_name(zk.spec().persistence().storage_class_name());
pvc_spec
});
pvc_spec
pvc
});
pvc
});

proof {
assert_seqs_equal!(
[email protected]_values(|pvc: PersistentVolumeClaim| pvc@),
zk_spec::make_stateful_set(zk@, rv@).spec.get_Some_0().volume_claim_templates.get_Some_0()
);
proof {
assert_seqs_equal!(
[email protected]_values(|pvc: PersistentVolumeClaim| pvc@),
zk_spec::make_stateful_set(zk@, rv@).spec.get_Some_0().volume_claim_templates.get_Some_0()
);
}
volume_claim_templates
} else {
let empty_templates = Vec::<PersistentVolumeClaim>::new();
proof {
assert_seqs_equal!(
[email protected]_values(|pvc: PersistentVolumeClaim| pvc@),
zk_spec::make_stateful_set(zk@, rv@).spec.get_Some_0().volume_claim_templates.get_Some_0()
);
}
empty_templates
}

volume_claim_templates
});
stateful_set_spec
});
Expand Down Expand Up @@ -1432,6 +1440,14 @@ fn make_zk_pod_spec(zk: &ZookeeperCluster) -> (pod_spec: PodSpec)
});
volume
});
if !zk.spec().persistence().enabled() {
volumes.push({
let mut volume = Volume::default();
volume.set_name(new_strlit("data").to_string());
volume.set_empty_dir();
volume
});
}

proof {
assert_seqs_equal!(
Expand Down
59 changes: 59 additions & 0 deletions src/controller_examples/zookeeper_controller/exec/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,14 @@ impl ZookeeperClusterSpec {
ZookeeperConfig::from_kube(self.inner.conf.clone())
}

#[verifier(external_body)]
pub fn persistence(&self) -> (persistence: ZookeeperPersistence)
ensures
persistence@ == self@.persistence,
{
ZookeeperPersistence::from_kube(self.inner.persistence.clone())
}

#[verifier(external_body)]
pub fn resources(&self) -> (resources: Option<ResourceRequirements>)
ensures
Expand Down Expand Up @@ -410,4 +418,55 @@ impl ResourceWrapper<deps_hack::ZookeeperConfig> for ZookeeperConfig {
}
}

#[verifier(external_body)]
pub struct ZookeeperPersistence {
inner: deps_hack::ZookeeperPersistence,
}

impl ZookeeperPersistence {
pub spec fn view(&self) -> ZookeeperPersistenceView;

#[verifier(external_body)]
pub fn enabled(&self) -> (enabled: bool)
ensures
enabled == self@.enabled,
{
self.inner.enabled
}

#[verifier(external_body)]
pub fn storage_size(&self) -> (storage_size: String)
ensures
storage_size@ == self@.storage_size,
{
String::from_rust_string(self.inner.storage_size.clone().0)
}

#[verifier(external_body)]
pub fn storage_class_name(&self) -> (storage_class_name: Option<String>)
ensures
self@.storage_class_name.is_Some() == storage_class_name.is_Some(),
storage_class_name.is_Some() ==> storage_class_name.get_Some_0()@ == self@.storage_class_name.get_Some_0(),
{
match &self.inner.storage_class_name {
Some(s) => Some(String::from_rust_string(s.clone())),
None => None,
}
}
}

impl ResourceWrapper<deps_hack::ZookeeperPersistence> for ZookeeperPersistence {
#[verifier(external)]
fn from_kube(inner: deps_hack::ZookeeperPersistence) -> ZookeeperPersistence {
ZookeeperPersistence {
inner: inner
}
}

#[verifier(external)]
fn into_kube(self) -> deps_hack::ZookeeperPersistence {
self.inner
}
}

}
Loading

0 comments on commit 41508b7

Please sign in to comment.