From 9da4f23df3260e409ad0857930bfcc662f38b456 Mon Sep 17 00:00:00 2001 From: Aitozi Date: Sun, 12 Jun 2022 10:38:00 +0800 Subject: [PATCH] [FRS-66] Introduce shuffleworker deployed by statefulset --- .../core/config/KubernetesOptions.java | 54 ++++++ .../KubernetesShuffleWorkerParameters.java | 36 +++- .../KubernetesStatefulSetParameters.java | 40 ++++ .../RemoteShuffleApplicationReconciler.java | 77 +++++--- .../KubernetesStatefulSetBuilder.java | 135 ++++++++++++++ .../kubernetes/operator/util/Constants.java | 1 + ...KubernetesShuffleWorkerParametersTest.java | 15 ++ .../KubernetesStatefulSetBuilderTest.java | 176 ++++++++++++++++++ .../operator/util/KubernetesTestBase.java | 20 +- 9 files changed, 530 insertions(+), 24 deletions(-) create mode 100644 shuffle-kubernetes-operator/src/main/java/com/alibaba/flink/shuffle/kubernetes/operator/parameters/KubernetesStatefulSetParameters.java create mode 100644 shuffle-kubernetes-operator/src/main/java/com/alibaba/flink/shuffle/kubernetes/operator/resources/KubernetesStatefulSetBuilder.java create mode 100644 shuffle-kubernetes-operator/src/test/java/com/alibaba/flink/shuffle/kubernetes/operator/resources/KubernetesStatefulSetBuilderTest.java diff --git a/shuffle-core/src/main/java/com/alibaba/flink/shuffle/core/config/KubernetesOptions.java b/shuffle-core/src/main/java/com/alibaba/flink/shuffle/core/config/KubernetesOptions.java index 8cb44626..eae415a5 100644 --- a/shuffle-core/src/main/java/com/alibaba/flink/shuffle/core/config/KubernetesOptions.java +++ b/shuffle-core/src/main/java/com/alibaba/flink/shuffle/core/config/KubernetesOptions.java @@ -18,6 +18,9 @@ import com.alibaba.flink.shuffle.common.config.ConfigOption; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.util.Collections; import java.util.List; import java.util.Map; @@ -25,6 +28,8 @@ /** This class holds configuration constants used by the remote shuffle deployment. 
*/
 public class KubernetesOptions {
 
+    private static final Logger LOG = LoggerFactory.getLogger(KubernetesOptions.class);
+
     // --------------------------------------------------------------------------------------------
     // Common configurations.
     // --------------------------------------------------------------------------------------------
@@ -277,6 +282,41 @@ public class KubernetesOptions {
                                     + "value:value1,effect:NoSchedule;key:key2,operator:Exists,"
                                     + "effect:NoExecute,tolerationSeconds:6000.");
 
+    /** The number of shuffle worker replicas. */
+    public static final ConfigOption<Integer> SHUFFLE_WORKER_REPLICAS =
+            new ConfigOption<Integer>("remote-shuffle.kubernetes.worker.replicas")
+                    .defaultValue(1)
+                    .description("The number of shuffle worker");
+
+    /** The access modes of the shuffle worker's PVC. */
+    public static final ConfigOption<List<String>> SHUFFLE_WORKER_PVC_ACCESS_MODE =
+            new ConfigOption<List<String>>("remote-shuffle.kubernetes.worker.pvc.access-mode")
+                    .defaultValue(Collections.singletonList("ReadWriteOnce"))
+                    .description("The access mode of the shuffle worker's pvc");
+
+    /** The storage class of the shuffle worker's PVC. */
+    public static final ConfigOption<String> SHUFFLE_WORKER_PVC_STORAGE_CLASS =
+            new ConfigOption<String>("remote-shuffle.kubernetes.worker.pvc.storage-class")
+                    .defaultValue("alibabacloud-filesystem-ssd")
+                    .description("The storage class of the pvc");
+
+    /** The storage size of each shuffle worker PVC. */
+    public static final ConfigOption<String> SHUFFLE_WORKER_PVC_STORAGE_SIZE =
+            new ConfigOption<String>("remote-shuffle.kubernetes.worker.pvc.storage.size")
+                    .defaultValue("100Gi")
+                    .description("The storage size of each pvc.");
+
+    /** The mount path of the worker pvc. 
*/
+    public static final ConfigOption<String> SHUFFLE_WORKER_PVC_MOUNT_PATH =
+            new ConfigOption<String>("remote-shuffle.kubernetes.worker.pvc.mount.path")
+                    .defaultValue("/shuffle-data-pvc")
+                    .description("The mount path of the worker pvc");
+
+    public static final ConfigOption<String> SHUFFLE_WORKER_DEPLOY_MODE =
+            new ConfigOption<String>("remote-shuffle.kubernetes.worker.deploy.mode")
+                    .defaultValue(WorkerMode.DAEMON_SETS.name())
+                    .description("The deploy mode of the shuffle worker");
+
     /**
      * The prefix of Kubernetes resource limit factor. It should not be less than 1. The resource
      * could be cpu, memory, ephemeral-storage and all other types supported by Kubernetes.
@@ -291,6 +331,31 @@ public class KubernetesOptions {
     public static final String SHUFFLE_WORKER_RESOURCE_LIMIT_FACTOR_PREFIX =
             "remote-shuffle.kubernetes.worker.limit-factor.";
 
+    /** The kind of Kubernetes workload used to deploy the shuffle workers. */
+    public enum WorkerMode {
+        STATEFUL_SETS,
+        DAEMON_SETS;
+
+        /**
+         * Parses a configured deploy mode, ignoring surrounding whitespace and letter case.
+         *
+         * @param value the configured mode, may be {@code null}.
+         * @return the parsed mode; {@link #DAEMON_SETS} if the value is {@code null} or unknown.
+         */
+        public static WorkerMode fromString(String value) {
+            if (value == null) {
+                LOG.warn("Worker mode is not set, falling back to {}.", DAEMON_SETS);
+                return DAEMON_SETS;
+            }
+            try {
+                return WorkerMode.valueOf(value.trim().toUpperCase(java.util.Locale.ROOT));
+            } catch (IllegalArgumentException e) {
+                // An unknown value is not fatal: log it and use the DaemonSet default.
+                LOG.warn("Failed to parse the worker mode: {}", value, e);
+                return DAEMON_SETS;
+            }
+        }
+    }
     // ------------------------------------------------------------------------
 
     /** Not intended to be instantiated. 
*/ diff --git a/shuffle-kubernetes-operator/src/main/java/com/alibaba/flink/shuffle/kubernetes/operator/parameters/KubernetesShuffleWorkerParameters.java b/shuffle-kubernetes-operator/src/main/java/com/alibaba/flink/shuffle/kubernetes/operator/parameters/KubernetesShuffleWorkerParameters.java index 6acf235b..8dca81bc 100644 --- a/shuffle-kubernetes-operator/src/main/java/com/alibaba/flink/shuffle/kubernetes/operator/parameters/KubernetesShuffleWorkerParameters.java +++ b/shuffle-kubernetes-operator/src/main/java/com/alibaba/flink/shuffle/kubernetes/operator/parameters/KubernetesShuffleWorkerParameters.java @@ -24,6 +24,7 @@ import com.alibaba.flink.shuffle.kubernetes.operator.util.Constants; import com.alibaba.flink.shuffle.kubernetes.operator.util.KubernetesUtils; +import io.fabric8.kubernetes.api.model.Quantity; import io.fabric8.kubernetes.api.model.apps.DaemonSet; import java.util.ArrayList; @@ -41,7 +42,7 @@ * ShuffleWorkers configuration to kubernetes {@link DaemonSet} configuration. 
*/ public class KubernetesShuffleWorkerParameters extends AbstractKubernetesParameters - implements KubernetesDaemonSetParameters { + implements KubernetesDaemonSetParameters, KubernetesStatefulSetParameters { private final ShuffleWorkerProcessSpec shuffleWorkerProcessSpec; @@ -189,8 +190,41 @@ public String getDaemonSetName() { return KubernetesUtils.getShuffleWorkersNameWithClusterId(getClusterId()); } + @Override + public String getStatefulSetName() { + return KubernetesUtils.getShuffleWorkersNameWithClusterId(getClusterId()); + } + @Override public KubernetesPodParameters getPodTemplateParameters() { return this; } + + @Override + public int getReplicas() { + return conf.getInteger(KubernetesOptions.SHUFFLE_WORKER_REPLICAS); + } + + @Override + public Map getStorageResource() { + String size = conf.getString(KubernetesOptions.SHUFFLE_WORKER_PVC_STORAGE_SIZE); + Map resource = new HashMap<>(); + resource.put(Constants.RESOURCE_NAME_STORAGE, new Quantity(size)); + return resource; + } + + @Override + public String getStorageClass() { + return conf.getString(KubernetesOptions.SHUFFLE_WORKER_PVC_STORAGE_CLASS); + } + + @Override + public List getAccessMode() { + return conf.getList(KubernetesOptions.SHUFFLE_WORKER_PVC_ACCESS_MODE, String.class); + } + + @Override + public String getMountPath() { + return conf.getString(KubernetesOptions.SHUFFLE_WORKER_PVC_MOUNT_PATH); + } } diff --git a/shuffle-kubernetes-operator/src/main/java/com/alibaba/flink/shuffle/kubernetes/operator/parameters/KubernetesStatefulSetParameters.java b/shuffle-kubernetes-operator/src/main/java/com/alibaba/flink/shuffle/kubernetes/operator/parameters/KubernetesStatefulSetParameters.java new file mode 100644 index 00000000..3bf8c03b --- /dev/null +++ b/shuffle-kubernetes-operator/src/main/java/com/alibaba/flink/shuffle/kubernetes/operator/parameters/KubernetesStatefulSetParameters.java @@ -0,0 +1,40 @@ +/* + * Copyright 2021 The Flink Remote Shuffle Project + * + * Licensed under the Apache 
License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.flink.shuffle.kubernetes.operator.parameters; + +import io.fabric8.kubernetes.api.model.Quantity; + +import java.util.List; +import java.util.Map; + +/** The parameters for StatefulSet. */ +public interface KubernetesStatefulSetParameters extends KubernetesCommonParameters { + + String getStatefulSetName(); + + KubernetesPodParameters getPodTemplateParameters(); + + int getReplicas(); + + Map getStorageResource(); + + String getStorageClass(); + + List getAccessMode(); + + String getMountPath(); +} diff --git a/shuffle-kubernetes-operator/src/main/java/com/alibaba/flink/shuffle/kubernetes/operator/reconciler/RemoteShuffleApplicationReconciler.java b/shuffle-kubernetes-operator/src/main/java/com/alibaba/flink/shuffle/kubernetes/operator/reconciler/RemoteShuffleApplicationReconciler.java index 7940fb73..38a54204 100644 --- a/shuffle-kubernetes-operator/src/main/java/com/alibaba/flink/shuffle/kubernetes/operator/reconciler/RemoteShuffleApplicationReconciler.java +++ b/shuffle-kubernetes-operator/src/main/java/com/alibaba/flink/shuffle/kubernetes/operator/reconciler/RemoteShuffleApplicationReconciler.java @@ -19,15 +19,18 @@ import com.alibaba.flink.shuffle.common.config.Configuration; import com.alibaba.flink.shuffle.common.functions.RunnableWithException; import com.alibaba.flink.shuffle.core.config.ClusterOptions; +import com.alibaba.flink.shuffle.core.config.KubernetesOptions; import 
com.alibaba.flink.shuffle.kubernetes.operator.crd.RemoteShuffleApplication; import com.alibaba.flink.shuffle.kubernetes.operator.parameters.K8sRemoteShuffleFileConfigsParameters; import com.alibaba.flink.shuffle.kubernetes.operator.parameters.KubernetesConfigMapParameters; import com.alibaba.flink.shuffle.kubernetes.operator.parameters.KubernetesDaemonSetParameters; import com.alibaba.flink.shuffle.kubernetes.operator.parameters.KubernetesShuffleManagerParameters; import com.alibaba.flink.shuffle.kubernetes.operator.parameters.KubernetesShuffleWorkerParameters; +import com.alibaba.flink.shuffle.kubernetes.operator.parameters.KubernetesStatefulSetParameters; import com.alibaba.flink.shuffle.kubernetes.operator.resources.KubernetesConfigMapBuilder; import com.alibaba.flink.shuffle.kubernetes.operator.resources.KubernetesDaemonSetBuilder; import com.alibaba.flink.shuffle.kubernetes.operator.resources.KubernetesDeploymentBuilder; +import com.alibaba.flink.shuffle.kubernetes.operator.resources.KubernetesStatefulSetBuilder; import com.alibaba.flink.shuffle.kubernetes.operator.util.Constants; import com.alibaba.flink.shuffle.kubernetes.operator.util.KubernetesInternalOptions; import com.alibaba.flink.shuffle.kubernetes.operator.util.KubernetesUtils; @@ -36,6 +39,7 @@ import io.fabric8.kubernetes.api.model.HasMetadata; import io.fabric8.kubernetes.api.model.apps.DaemonSet; import io.fabric8.kubernetes.api.model.apps.Deployment; +import io.fabric8.kubernetes.api.model.apps.StatefulSet; import io.fabric8.kubernetes.client.KubernetesClient; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -48,7 +52,7 @@ /** * The {@link RemoteShuffleApplicationReconciler} is responsible for reconciling {@link - * RemoteShuffleApplication} to the desired state which is described by {@link + * RemoteShuffleApplication} to the desired state which is described by {@code * RemoteShuffleApplication#spec}. 
*/
 public class RemoteShuffleApplicationReconciler {
@@ -130,27 +134,82 @@ private void reconcileShuffleManager(Configuration configuration, HasMetadata ow
      */
     private void reconcileShuffleWorkers(Configuration configuration, HasMetadata owner) {
 
-        KubernetesDaemonSetParameters shuffleWorkerParameters =
-                new KubernetesShuffleWorkerParameters(configuration);
-        DaemonSet daemonSet =
-                new KubernetesDaemonSetBuilder()
-                        .buildKubernetesResourceFrom(shuffleWorkerParameters);
-        KubernetesUtils.setOwnerReference(daemonSet, owner);
-
-        LOG.info("Reconcile shuffle workers {}.", daemonSet.getMetadata().getName());
-        executeReconcileWithRetry(
-                () -> {
-                    LOG.debug("Try to create or update DaemonSet {}.", daemonSet.toString());
-                    this.kubeClient
-                            .apps()
-                            .daemonSets()
-                            .inNamespace(
-                                    checkNotNull(
-                                            configuration.getString(
-                                                    KubernetesInternalOptions.NAMESPACE)))
-                            .createOrReplace(daemonSet);
-                },
-                daemonSet.getMetadata().getName());
+        KubernetesOptions.WorkerMode mode =
+                KubernetesOptions.WorkerMode.fromString(
+                        configuration.getString(KubernetesOptions.SHUFFLE_WORKER_DEPLOY_MODE));
+
+        // NOTE(review): a workload created under the previously configured mode is not removed
+        // here when the mode changes -- confirm whether that cleanup is handled elsewhere.
+        switch (mode) {
+            case STATEFUL_SETS:
+                reconcileWorkerStatefulSet(configuration, owner);
+                break;
+            case DAEMON_SETS:
+                reconcileWorkerDaemonSet(configuration, owner);
+                break;
+            default:
+                throw new IllegalStateException("Unsupported worker deploy mode: " + mode);
+        }
     }
+
+    /**
+     * Builds the shuffle worker {@link StatefulSet}, sets its owner reference and creates or
+     * replaces it in the configured namespace.
+     *
+     * @param configuration the configuration the shuffle worker parameters are built from.
+     * @param owner the resource set as owner of the created StatefulSet.
+     */
+    private void reconcileWorkerStatefulSet(Configuration configuration, HasMetadata owner) {
+        KubernetesStatefulSetParameters statefulSetParameters =
+                new KubernetesShuffleWorkerParameters(configuration);
+        StatefulSet statefulSet =
+                KubernetesStatefulSetBuilder.INSTANCE.buildKubernetesResourceFrom(
+                        statefulSetParameters);
+        KubernetesUtils.setOwnerReference(statefulSet, owner);
+        LOG.info("Reconcile shuffle workers {}.", statefulSet.getMetadata().getName());
+        executeReconcileWithRetry(
+                () -> {
+                    LOG.debug("Try to create or update StatefulSet {}.", statefulSet);
+                    this.kubeClient
+                            .apps()
+                            .statefulSets()
+                            .inNamespace(
+                                    checkNotNull(
+                                            configuration.getString(
+                                                    KubernetesInternalOptions.NAMESPACE)))
+                            .createOrReplace(statefulSet);
+                },
+                statefulSet.getMetadata().getName());
+    }
+
+    /**
+     * Builds the shuffle worker {@link DaemonSet}, sets its owner reference and creates or
+     * replaces it in the configured namespace.
+     *
+     * @param configuration the configuration the shuffle worker parameters are built from.
+     * @param owner the resource set as owner of the created DaemonSet.
+     */
+    private void reconcileWorkerDaemonSet(Configuration configuration, HasMetadata owner) {
+        KubernetesDaemonSetParameters shuffleWorkerParameters =
+                new KubernetesShuffleWorkerParameters(configuration);
+        DaemonSet daemonSet =
+                new KubernetesDaemonSetBuilder()
+                        .buildKubernetesResourceFrom(shuffleWorkerParameters);
+        KubernetesUtils.setOwnerReference(daemonSet, owner);
+        LOG.info("Reconcile shuffle workers {}.", daemonSet.getMetadata().getName());
+        executeReconcileWithRetry(
+                () -> {
+                    LOG.debug("Try to create or update DaemonSet {}.", daemonSet);
+                    this.kubeClient
+                            .apps()
+                            .daemonSets()
+                            .inNamespace(
+                                    checkNotNull(
+                                            configuration.getString(
+                                                    KubernetesInternalOptions.NAMESPACE)))
+                            .createOrReplace(daemonSet);
+                },
+                daemonSet.getMetadata().getName());
+    }
 
     /**
diff --git a/shuffle-kubernetes-operator/src/main/java/com/alibaba/flink/shuffle/kubernetes/operator/resources/KubernetesStatefulSetBuilder.java b/shuffle-kubernetes-operator/src/main/java/com/alibaba/flink/shuffle/kubernetes/operator/resources/KubernetesStatefulSetBuilder.java
new file mode 100644
index 00000000..a6d2690a
--- /dev/null
+++ b/shuffle-kubernetes-operator/src/main/java/com/alibaba/flink/shuffle/kubernetes/operator/resources/KubernetesStatefulSetBuilder.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License. 
+ */ + +package com.alibaba.flink.shuffle.kubernetes.operator.resources; + +import com.alibaba.flink.shuffle.common.utils.CommonUtils; +import com.alibaba.flink.shuffle.kubernetes.operator.parameters.KubernetesStatefulSetParameters; +import com.alibaba.flink.shuffle.kubernetes.operator.util.Constants; +import com.alibaba.flink.shuffle.kubernetes.operator.util.KubernetesUtils; + +import io.fabric8.kubernetes.api.model.Container; +import io.fabric8.kubernetes.api.model.Pod; +import io.fabric8.kubernetes.api.model.PodTemplateSpec; +import io.fabric8.kubernetes.api.model.PodTemplateSpecBuilder; +import io.fabric8.kubernetes.api.model.ResourceRequirementsBuilder; +import io.fabric8.kubernetes.api.model.VolumeMountBuilder; +import io.fabric8.kubernetes.api.model.apps.StatefulSet; +import io.fabric8.kubernetes.api.model.apps.StatefulSetBuilder; +import io.fabric8.kubernetes.api.model.apps.StatefulSetUpdateStrategyBuilder; + +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +/** Builder for StatefulSet. 
*/ +public class KubernetesStatefulSetBuilder + implements KubernetesResourceBuilder { + + private static final String PVC_NAME = "shuffle-data"; + public static final KubernetesStatefulSetBuilder INSTANCE = new KubernetesStatefulSetBuilder(); + + @Override + public StatefulSet buildKubernetesResourceFrom(KubernetesStatefulSetParameters parameters) { + final Pod resolvedPod = + new KubernetesPodBuilder() + .buildKubernetesResourceFrom(parameters.getPodTemplateParameters()); + + injectPVCVolumeMounts(resolvedPod, parameters); + final Map labels = resolvedPod.getMetadata().getLabels(); + + return new StatefulSetBuilder() + .withApiVersion(Constants.APPS_API_VERSION) + .editOrNewMetadata() + .withName(parameters.getStatefulSetName()) + .withNamespace(parameters.getNamespace()) + .withLabels(parameters.getLabels()) + .endMetadata() + .editOrNewSpec() + .withServiceName(getServiceName(parameters)) + .withTemplate(getPodTemplate(resolvedPod)) + .withUpdateStrategy( + new StatefulSetUpdateStrategyBuilder() + .withType(Constants.ROLLING_UPDATE) + .build()) + .editOrNewSelector() + .addToMatchLabels(labels) + .endSelector() + .withReplicas(parameters.getReplicas()) + .addNewVolumeClaimTemplate() + .editOrNewMetadata() + .withName(getVolumeMountName(parameters.getStatefulSetName())) + .withLabels(labels) + .withNamespace(parameters.getNamespace()) + .endMetadata() + .editOrNewSpec() + .withStorageClassName(parameters.getStorageClass()) + .withAccessModes(parameters.getAccessMode()) + .withResources( + new ResourceRequirementsBuilder() + .withRequests(parameters.getStorageResource()) + .withLimits(parameters.getStorageResource()) + .build()) + .endSpec() + .endVolumeClaimTemplate() + .endSpec() + .build(); + } + + public PodTemplateSpec getPodTemplate(Pod resolvedPod) { + return new PodTemplateSpecBuilder() + .withMetadata(resolvedPod.getMetadata()) + .withSpec(resolvedPod.getSpec()) + .build(); + } + + private String getServiceName(KubernetesStatefulSetParameters parameters) 
{
+        return String.format("%s-service", parameters.getStatefulSetName());
+    }
+
+    /** Adds the PVC volume mount to the single shuffle worker container of the given pod. */
+    private void injectPVCVolumeMounts(
+            Pod resolvedPod, KubernetesStatefulSetParameters parameters) {
+        final String workerName = KubernetesUtils.SHUFFLE_WORKER_CONTAINER_NAME;
+        List<Container> containersList =
+                resolvedPod.getSpec().getContainers().stream()
+                        .filter(c -> c.getName().equals(workerName))
+                        .collect(Collectors.toList());
+        // exactly one shuffle worker container is expected in the pod
+        CommonUtils.checkArgument(
+                containersList.size() == 1,
+                String.format(
+                        "The target container is expect equal to 1, but get %s",
+                        containersList.size()));
+        Container container = containersList.get(0);
+        container.getVolumeMounts().add(
+                new VolumeMountBuilder()
+                        .withName(getVolumeMountName(parameters.getStatefulSetName()))
+                        .withMountPath(parameters.getMountPath())
+                        .build());
+    }
+
+    /**
+     * Returns "statefulSetName-shuffle-data"; the prefix is cut so the result stays within the
+     * 63 characters Kubernetes allows for a volume name (a DNS label).
+     */
+    private String getVolumeMountName(String statefulSetName) {
+        int room = Math.min(statefulSetName.length(), 63 - 1 - PVC_NAME.length());
+        return String.format("%s-%s", statefulSetName.substring(0, room), PVC_NAME);
+    }
+}
diff --git a/shuffle-kubernetes-operator/src/main/java/com/alibaba/flink/shuffle/kubernetes/operator/util/Constants.java b/shuffle-kubernetes-operator/src/main/java/com/alibaba/flink/shuffle/kubernetes/operator/util/Constants.java
index ba26a9e2..bebdff82 100644
--- a/shuffle-kubernetes-operator/src/main/java/com/alibaba/flink/shuffle/kubernetes/operator/util/Constants.java
+++ b/shuffle-kubernetes-operator/src/main/java/com/alibaba/flink/shuffle/kubernetes/operator/util/Constants.java
@@ -24,6 +24,7 @@ public class Constants {
 
     public static final String RESOURCE_NAME_CPU = "cpu";
     public static final String RESOURCE_NAME_MEMORY = "memory";
+    public static final String RESOURCE_NAME_STORAGE = "storage";
     public static final String RESOURCE_UNIT_MB = "Mi";
 
     public static final int MAXIMUM_CHARACTERS_OF_CLUSTER_ID = 45;
diff --git a/shuffle-kubernetes-operator/src/test/java/com/alibaba/flink/shuffle/kubernetes/operator/parameters/KubernetesShuffleWorkerParametersTest.java 
b/shuffle-kubernetes-operator/src/test/java/com/alibaba/flink/shuffle/kubernetes/operator/parameters/KubernetesShuffleWorkerParametersTest.java index eac113f7..77cb0340 100644 --- a/shuffle-kubernetes-operator/src/test/java/com/alibaba/flink/shuffle/kubernetes/operator/parameters/KubernetesShuffleWorkerParametersTest.java +++ b/shuffle-kubernetes-operator/src/test/java/com/alibaba/flink/shuffle/kubernetes/operator/parameters/KubernetesShuffleWorkerParametersTest.java @@ -69,6 +69,11 @@ public void setup() { WorkerOptions.JVM_METASPACE, MemorySize.parse(CONTAINER_JVM_METASPACE_MB + "m")); conf.setMemorySize( WorkerOptions.JVM_OVERHEAD, MemorySize.parse(CONTAINER_JVM_OVERHEAD_MB + "m")); + conf.setInteger(KubernetesOptions.SHUFFLE_WORKER_REPLICAS, WORKER_REPLICAS); + conf.setString(KubernetesOptions.SHUFFLE_WORKER_PVC_STORAGE_SIZE, PVC_STORAGE_SIZE); + conf.setString(KubernetesOptions.SHUFFLE_WORKER_PVC_STORAGE_CLASS, STORAGE_CLASS); + conf.setList(KubernetesOptions.SHUFFLE_WORKER_PVC_ACCESS_MODE, ACCESS_MODE); + conf.setString(KubernetesOptions.SHUFFLE_WORKER_PVC_MOUNT_PATH, MOUNT_PATH); shuffleWorkerParameters = new KubernetesShuffleWorkerParameters(conf); } @@ -153,4 +158,14 @@ public void testGetContainerCommandAndArgs() { public void testGetDaemonSetName() { assertThat(shuffleWorkerParameters.getDaemonSetName(), is(CLUSTER_ID + "-shuffleworker")); } + + @Test + public void testStatefulSetParameters() { + assertThat(shuffleWorkerParameters.getStatefulSetName(), is(CLUSTER_ID + "-shuffleworker")); + assertThat(shuffleWorkerParameters.getReplicas(), is(WORKER_REPLICAS)); + assertThat(shuffleWorkerParameters.getStorageResource(), is(STORAGE_RESOURCES)); + assertThat(shuffleWorkerParameters.getStorageClass(), is(STORAGE_CLASS)); + assertThat(shuffleWorkerParameters.getAccessMode(), is(ACCESS_MODE)); + assertThat(shuffleWorkerParameters.getMountPath(), is(MOUNT_PATH)); + } } diff --git 
a/shuffle-kubernetes-operator/src/test/java/com/alibaba/flink/shuffle/kubernetes/operator/resources/KubernetesStatefulSetBuilderTest.java b/shuffle-kubernetes-operator/src/test/java/com/alibaba/flink/shuffle/kubernetes/operator/resources/KubernetesStatefulSetBuilderTest.java new file mode 100644 index 00000000..5b6a6486 --- /dev/null +++ b/shuffle-kubernetes-operator/src/test/java/com/alibaba/flink/shuffle/kubernetes/operator/resources/KubernetesStatefulSetBuilderTest.java @@ -0,0 +1,176 @@ +/* + * Copyright 2021 The Flink Remote Shuffle Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.alibaba.flink.shuffle.kubernetes.operator.resources; + +import com.alibaba.flink.shuffle.kubernetes.operator.parameters.KubernetesContainerParameters; +import com.alibaba.flink.shuffle.kubernetes.operator.parameters.KubernetesPodParameters; +import com.alibaba.flink.shuffle.kubernetes.operator.parameters.KubernetesStatefulSetParameters; +import com.alibaba.flink.shuffle.kubernetes.operator.util.KubernetesTestBase; +import com.alibaba.flink.shuffle.kubernetes.operator.util.KubernetesUtils; + +import com.fasterxml.jackson.core.JsonProcessingException; +import io.fabric8.kubernetes.api.model.Container; +import io.fabric8.kubernetes.api.model.PersistentVolumeClaim; +import io.fabric8.kubernetes.api.model.Quantity; +import io.fabric8.kubernetes.api.model.VolumeMount; +import io.fabric8.kubernetes.api.model.apps.StatefulSet; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import static org.junit.Assert.assertEquals; + +/** Test for {@link KubernetesStatefulSetBuilder}. 
*/ +public class KubernetesStatefulSetBuilderTest extends KubernetesTestBase { + + private StatefulSet statefulSet; + + @Before + public void setup() throws JsonProcessingException { + KubernetesStatefulSetParameters statefulSetParameters = new TestingStatefulSetsParameters(); + statefulSet = + KubernetesStatefulSetBuilder.INSTANCE.buildKubernetesResourceFrom( + statefulSetParameters); + } + + @Test + public void testStatefulSetName() { + assertEquals(STATEFUL_SET_NAME, statefulSet.getMetadata().getName()); + } + + @Test + public void testNameSpace() { + assertEquals(NAMESPACE, statefulSet.getMetadata().getNamespace()); + } + + @Test + public void testLabels() { + assertEquals(USER_LABELS, statefulSet.getMetadata().getLabels()); + } + + @Test + public void testReplicas() { + assertEquals(Integer.valueOf(WORKER_REPLICAS), statefulSet.getSpec().getReplicas()); + } + + @Test + public void testSelector() { + assertEquals(USER_LABELS, statefulSet.getSpec().getTemplate().getMetadata().getLabels()); + } + + @Test + public void testPvc() { + Assert.assertEquals(1, statefulSet.getSpec().getVolumeClaimTemplates().size()); + PersistentVolumeClaim pvc = statefulSet.getSpec().getVolumeClaimTemplates().get(0); + assertEquals(STORAGE_RESOURCES, pvc.getSpec().getResources().getRequests()); + assertEquals(STORAGE_RESOURCES, pvc.getSpec().getResources().getLimits()); + assertEquals(STORAGE_CLASS, pvc.getSpec().getStorageClassName()); + assertEquals(ACCESS_MODE, pvc.getSpec().getAccessModes()); + + // pvc mounted + List containers = + statefulSet.getSpec().getTemplate().getSpec().getContainers().stream() + .filter( + c -> + c.getName() + .equals( + KubernetesUtils + .SHUFFLE_WORKER_CONTAINER_NAME)) + .collect(Collectors.toList()); + Assert.assertEquals(1, containers.size()); + boolean exists = false; + for (VolumeMount volumeMount : containers.get(0).getVolumeMounts()) { + if (volumeMount.getName().equals(pvc.getMetadata().getName())) { + exists = true; + 
Assert.assertEquals(MOUNT_PATH, volumeMount.getMountPath()); + break; + } + } + Assert.assertTrue("The pvc not mounted to the pod template", exists); + } + + /** Simple {@link KubernetesStatefulSetParameters} implementation for testing purposes. */ + public static class TestingStatefulSetsParameters implements KubernetesStatefulSetParameters { + + @Override + public String getNamespace() { + return NAMESPACE; + } + + @Override + public Map getLabels() { + return USER_LABELS; + } + + @Override + public String getStatefulSetName() { + return STATEFUL_SET_NAME; + } + + @Override + public KubernetesPodParameters getPodTemplateParameters() { + return new TestingPodParametersWithContainerName(); + } + + @Override + public int getReplicas() { + return WORKER_REPLICAS; + } + + @Override + public Map getStorageResource() { + return STORAGE_RESOURCES; + } + + @Override + public String getStorageClass() { + return STORAGE_CLASS; + } + + @Override + public List getAccessMode() { + return ACCESS_MODE; + } + + @Override + public String getMountPath() { + return MOUNT_PATH; + } + } + + /** Simple {@link KubernetesPodParameters} implementation for testing purposes. */ + public static class TestingPodParametersWithContainerName + extends KubernetesPodBuilderTest.TestingPodParameters { + @Override + public KubernetesContainerParameters getContainerParameters() { + return new TestingContainerParametersWithContainerName(); + } + } + + /** Simple {@link KubernetesContainerParameters} implementation for testing purposes. 
*/ + public static class TestingContainerParametersWithContainerName + extends KubernetesContainerBuilderTest.TestingContainerParameters { + @Override + public String getContainerName() { + return KubernetesUtils.SHUFFLE_WORKER_CONTAINER_NAME; + } + } +} diff --git a/shuffle-kubernetes-operator/src/test/java/com/alibaba/flink/shuffle/kubernetes/operator/util/KubernetesTestBase.java b/shuffle-kubernetes-operator/src/test/java/com/alibaba/flink/shuffle/kubernetes/operator/util/KubernetesTestBase.java index d4aa50a9..e511af0e 100644 --- a/shuffle-kubernetes-operator/src/test/java/com/alibaba/flink/shuffle/kubernetes/operator/util/KubernetesTestBase.java +++ b/shuffle-kubernetes-operator/src/test/java/com/alibaba/flink/shuffle/kubernetes/operator/util/KubernetesTestBase.java @@ -19,7 +19,10 @@ import com.alibaba.flink.shuffle.common.config.MemorySize; import com.alibaba.flink.shuffle.kubernetes.operator.parameters.util.ConfigMapVolume; +import io.fabric8.kubernetes.api.model.Quantity; + import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -51,11 +54,26 @@ public class KubernetesTestBase { protected static final String CONTAINER_JVM_OPTIONS = "-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:ParallelGCThreads=4"; - // deployment, daemonset, configmap name + // deployment, daemonset, stateful-sets, configmap name protected static final String DEPLOYMENT_NAME = "TestingDeployment"; + protected static final String STATEFUL_SET_NAME = "TestingStatefulSet"; protected static final String DAEMON_SET_NAME = "TestingDaemonSet"; protected static final String CONFIG_MAP_NAME = "TestingConfigMap"; + // stateful sets mock parameters + protected static final int WORKER_REPLICAS = 10; + protected static final String PVC_STORAGE_SIZE = "10Gi"; + protected static final Map STORAGE_RESOURCES = + new HashMap() { + { + put(Constants.RESOURCE_NAME_STORAGE, new Quantity(PVC_STORAGE_SIZE)); + } + }; + 
protected static final String STORAGE_CLASS = "TestingStorageClass"; + protected static final String PVC_ACCESS_MODE = "ReadWriteOnce"; + protected static final List ACCESS_MODE = Collections.singletonList(PVC_ACCESS_MODE); + protected static final String MOUNT_PATH = "/shuffle-data"; + protected static final List> CONTAINER_VOLUME_MOUNTS = new ArrayList>() { {