diff --git a/Makefile-test.mk b/Makefile-test.mk
index 866081b7a6..1b1106febb 100644
--- a/Makefile-test.mk
+++ b/Makefile-test.mk
@@ -88,11 +88,13 @@ test-integration: gomod-download envtest ginkgo dep-crds kueuectl ginkgo-top ##
 CREATE_KIND_CLUSTER ?= true
 .PHONY: test-e2e
-test-e2e: kustomize ginkgo yq gomod-download dep-crds kueuectl ginkgo-top run-test-e2e-$(E2E_KIND_VERSION:kindest/node:v%=%)
+test-e2e: kustomize ginkgo yq gomod-download dep-crds kueuectl ginkgo-top run-test-e2e-$(E2E_KIND_VERSION:kindest/node:v%=%) run-test-tas-e2e-$(E2E_KIND_VERSION:kindest/node:v%=%)
 
 .PHONY: test-multikueue-e2e
 test-multikueue-e2e: kustomize ginkgo yq gomod-download dep-crds ginkgo-top run-test-multikueue-e2e-$(E2E_KIND_VERSION:kindest/node:v%=%)
 
+.PHONY: test-tas-e2e
+test-tas-e2e: kustomize ginkgo yq gomod-download dep-crds kueuectl ginkgo-top run-test-tas-e2e-$(E2E_KIND_VERSION:kindest/node:v%=%)
+
 E2E_TARGETS := $(addprefix run-test-e2e-,${E2E_K8S_VERSIONS})
 MULTIKUEUE-E2E_TARGETS := $(addprefix run-test-multikueue-e2e-,${E2E_K8S_VERSIONS})
@@ -104,15 +106,25 @@ FORCE:
 run-test-e2e-%: K8S_VERSION = $(@:run-test-e2e-%=%)
 run-test-e2e-%: FORCE
 	@echo Running e2e for k8s ${K8S_VERSION}
-	E2E_KIND_VERSION="kindest/node:v$(K8S_VERSION)" KIND_CLUSTER_NAME=$(KIND_CLUSTER_NAME) CREATE_KIND_CLUSTER=$(CREATE_KIND_CLUSTER) ARTIFACTS="$(ARTIFACTS)/$@" IMAGE_TAG=$(IMAGE_TAG) GINKGO_ARGS="$(GINKGO_ARGS)" JOBSET_VERSION=$(JOBSET_VERSION) KUBEFLOW_VERSION=$(KUBEFLOW_VERSION) KUBEFLOW_MPI_VERSION=$(KUBEFLOW_MPI_VERSION) ./hack/e2e-test.sh
+	E2E_KIND_VERSION="kindest/node:v$(K8S_VERSION)" KIND_CLUSTER_NAME=$(KIND_CLUSTER_NAME) CREATE_KIND_CLUSTER=$(CREATE_KIND_CLUSTER) ARTIFACTS="$(ARTIFACTS)/$@" IMAGE_TAG=$(IMAGE_TAG) GINKGO_ARGS="$(GINKGO_ARGS)" \
+		JOBSET_VERSION=$(JOBSET_VERSION) KUBEFLOW_VERSION=$(KUBEFLOW_VERSION) KUBEFLOW_MPI_VERSION=$(KUBEFLOW_MPI_VERSION) KIND_CLUSTER_FILE="kind-cluster.yaml" E2E_TARGET_FOLDER="singlecluster" ./hack/e2e-test.sh
 	$(PROJECT_DIR)/bin/ginkgo-top -i $(ARTIFACTS)/$@/e2e.json > $(ARTIFACTS)/$@/e2e-top.yaml
 
 run-test-multikueue-e2e-%: K8S_VERSION = $(@:run-test-multikueue-e2e-%=%)
 run-test-multikueue-e2e-%: FORCE
 	@echo Running multikueue e2e for k8s ${K8S_VERSION}
-	E2E_KIND_VERSION="kindest/node:v$(K8S_VERSION)" KIND_CLUSTER_NAME=$(KIND_CLUSTER_NAME) CREATE_KIND_CLUSTER=$(CREATE_KIND_CLUSTER) ARTIFACTS="$(ARTIFACTS)/$@" IMAGE_TAG=$(IMAGE_TAG) GINKGO_ARGS="$(GINKGO_ARGS)" JOBSET_VERSION=$(JOBSET_VERSION) KUBEFLOW_VERSION=$(KUBEFLOW_VERSION) KUBEFLOW_MPI_VERSION=$(KUBEFLOW_MPI_VERSION) ./hack/multikueue-e2e-test.sh
+	E2E_KIND_VERSION="kindest/node:v$(K8S_VERSION)" KIND_CLUSTER_NAME=$(KIND_CLUSTER_NAME) CREATE_KIND_CLUSTER=$(CREATE_KIND_CLUSTER) ARTIFACTS="$(ARTIFACTS)/$@" IMAGE_TAG=$(IMAGE_TAG) GINKGO_ARGS="$(GINKGO_ARGS)" \
+		JOBSET_VERSION=$(JOBSET_VERSION) KUBEFLOW_VERSION=$(KUBEFLOW_VERSION) KUBEFLOW_MPI_VERSION=$(KUBEFLOW_MPI_VERSION) ./hack/multikueue-e2e-test.sh
 	$(PROJECT_DIR)/bin/ginkgo-top -i $(ARTIFACTS)/$@/e2e.json > $(ARTIFACTS)/$@/e2e-top.yaml
 
+run-test-tas-e2e-%: K8S_VERSION = $(@:run-test-tas-e2e-%=%)
+run-test-tas-e2e-%: FORCE
+	@echo Running tas e2e for k8s ${K8S_VERSION}
+	E2E_KIND_VERSION="kindest/node:v$(K8S_VERSION)" KIND_CLUSTER_NAME=$(KIND_CLUSTER_NAME) CREATE_KIND_CLUSTER=$(CREATE_KIND_CLUSTER) ARTIFACTS="$(ARTIFACTS)/$@" IMAGE_TAG=$(IMAGE_TAG) GINKGO_ARGS="$(GINKGO_ARGS)" \
+		JOBSET_VERSION=$(JOBSET_VERSION) KUBEFLOW_VERSION=$(KUBEFLOW_VERSION) KUBEFLOW_MPI_VERSION=$(KUBEFLOW_MPI_VERSION) KIND_CLUSTER_FILE="tas-kind-cluster.yaml" E2E_TARGET_FOLDER="tas" ./hack/e2e-test.sh
+	$(PROJECT_DIR)/bin/ginkgo-top -i $(ARTIFACTS)/$@/e2e.json > $(ARTIFACTS)/$@/e2e-top.yaml
+
 SCALABILITY_RUNNER := $(PROJECT_DIR)/bin/performance-scheduler-runner
 
 .PHONY: performance-scheduler-runner
 performance-scheduler-runner:
diff --git a/hack/e2e-test.sh b/hack/e2e-test.sh
index d4281d6cee..4d32404248 100755
--- a/hack/e2e-test.sh
+++ b/hack/e2e-test.sh
@@ -42,7 +42,7 @@ function startup {
     if [ ! -d "$ARTIFACTS" ]; then
       mkdir -p "$ARTIFACTS"
     fi
-    cluster_create "$KIND_CLUSTER_NAME" "$SOURCE_DIR/kind-cluster.yaml"
+    cluster_create "$KIND_CLUSTER_NAME" "$SOURCE_DIR/$KIND_CLUSTER_FILE"
   fi
 }
@@ -66,4 +66,4 @@ startup
 kind_load
 kueue_deploy
 # shellcheck disable=SC2086
-$GINKGO $GINKGO_ARGS --junit-report=junit.xml --json-report=e2e.json --output-dir="$ARTIFACTS" -v ./test/e2e/singlecluster/...
+$GINKGO $GINKGO_ARGS --junit-report=junit.xml --json-report=e2e.json --output-dir="$ARTIFACTS" -v ./test/e2e/$E2E_TARGET_FOLDER/...
diff --git a/hack/tas-kind-cluster.yaml b/hack/tas-kind-cluster.yaml
new file mode 100644
index 0000000000..ecd4e67aa8
--- /dev/null
+++ b/hack/tas-kind-cluster.yaml
@@ -0,0 +1,59 @@
+---
+kind: Cluster
+apiVersion: kind.x-k8s.io/v1alpha4
+nodes:
+  - role: control-plane
+    kubeadmConfigPatches:
+      - |
+        kind: ClusterConfiguration
+        apiVersion: kubeadm.k8s.io/v1beta3
+        scheduler:
+          extraArgs:
+            v: "2"
+        controllerManager:
+          extraArgs:
+            v: "2"
+        apiServer:
+          extraArgs:
+            enable-aggregator-routing: "true"
+            v: "2"
+  - role: worker
+    labels:
+      cloud.provider.com/node-group: tas-group
+      cloud.provider.com/topology-block: b1
+      cloud.provider.com/topology-rack: r1
+  - role: worker
+    labels:
+      cloud.provider.com/node-group: tas-group
+      cloud.provider.com/topology-block: b1
+      cloud.provider.com/topology-rack: r1
+  - role: worker
+    labels:
+      cloud.provider.com/node-group: tas-group
+      cloud.provider.com/topology-block: b1
+      cloud.provider.com/topology-rack: r2
+  - role: worker
+    labels:
+      cloud.provider.com/node-group: tas-group
+      cloud.provider.com/topology-block: b1
+      cloud.provider.com/topology-rack: r2
+  - role: worker
+    labels:
+      cloud.provider.com/node-group: tas-group
+      cloud.provider.com/topology-block: b2
+      cloud.provider.com/topology-rack: r1
+  - role: worker
+    labels:
+      cloud.provider.com/node-group: tas-group
+      cloud.provider.com/topology-block: b2
+      cloud.provider.com/topology-rack: r1
+  - role: worker
+    labels:
+      cloud.provider.com/node-group: tas-group
+      cloud.provider.com/topology-block: b2
+      cloud.provider.com/topology-rack: r2
+  - role: worker
+    labels:
+      cloud.provider.com/node-group: tas-group
+      cloud.provider.com/topology-block: b2
+      cloud.provider.com/topology-rack: r2
diff --git a/pkg/util/testingjobs/job/wrappers.go b/pkg/util/testingjobs/job/wrappers.go
index 862194afdd..956da07d05 100644
--- a/pkg/util/testingjobs/job/wrappers.go
+++ b/pkg/util/testingjobs/job/wrappers.go
@@ -51,7 +51,7 @@ func MakeJob(name, ns string) *JobWrapper {
 					{
 						Name:  "c",
 						Image: "pause",
-						Resources: corev1.ResourceRequirements{Requests: corev1.ResourceList{}},
+						Resources: corev1.ResourceRequirements{Requests: corev1.ResourceList{}, Limits: corev1.ResourceList{}},
 					},
 				},
 				NodeSelector: map[string]string{},
@@ -180,6 +180,12 @@ func (j *JobWrapper) Request(r corev1.ResourceName, v string) *JobWrapper {
 	return j
 }
 
+// Limit adds a resource limit to the default container.
+func (j *JobWrapper) Limit(r corev1.ResourceName, v string) *JobWrapper {
+	j.Spec.Template.Spec.Containers[0].Resources.Limits[r] = resource.MustParse(v)
+	return j
+}
+
 func (j *JobWrapper) Image(image string, args []string) *JobWrapper {
 	j.Spec.Template.Spec.Containers[0].Image = image
 	j.Spec.Template.Spec.Containers[0].Args = args
diff --git a/test/e2e/tas/suite_test.go b/test/e2e/tas/suite_test.go
new file mode 100644
index 0000000000..e468663b3f
--- /dev/null
+++ b/test/e2e/tas/suite_test.go
@@ -0,0 +1,78 @@
+/*
+Copyright 2024 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package tase2e
+
+import (
+	"context"
+	"fmt"
+	"os"
+	"testing"
+	"time"
+
+	"github.com/onsi/ginkgo/v2"
+	"github.com/onsi/gomega"
+	corev1 "k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/api/resource"
+	ctrl "sigs.k8s.io/controller-runtime"
+	"sigs.k8s.io/controller-runtime/pkg/client"
+
+	clientutil "sigs.k8s.io/kueue/pkg/util/client"
+	"sigs.k8s.io/kueue/test/util"
+)
+
+var (
+	k8sClient client.WithWatch
+	ctx       context.Context
+)
+
+func TestAPIs(t *testing.T) {
+	suiteName := "End To End TAS Suite"
+	if ver, found := os.LookupEnv("E2E_KIND_VERSION"); found {
+		suiteName = fmt.Sprintf("%s: %s", suiteName, ver)
+	}
+	gomega.RegisterFailHandler(ginkgo.Fail)
+	ginkgo.RunSpecs(t,
+		suiteName,
+	)
+}
+
+var _ = ginkgo.BeforeSuite(func() {
+	ctrl.SetLogger(util.NewTestingLogger(ginkgo.GinkgoWriter, -3))
+
+	k8sClient, _ = util.CreateClientUsingCluster("")
+	ctx = context.Background()
+
+	waitForAvailableStart := time.Now()
+	util.WaitForKueueAvailability(ctx, k8sClient)
+	util.WaitForJobSetAvailability(ctx, k8sClient)
+	ginkgo.GinkgoLogr.Info("Kueue and JobSet operators are available in the cluster", "waitingTime", time.Since(waitForAvailableStart))
+
+	nodes := &corev1.NodeList{}
+	requiredLabels := client.MatchingLabels{}
+	requiredLabelKeys := client.HasLabels{tasNodeGroupLabel}
+	err := k8sClient.List(ctx, nodes, requiredLabels, requiredLabelKeys)
+	gomega.Expect(err).NotTo(gomega.HaveOccurred(), "failed to list nodes for TAS")
+
+	for _, n := range nodes.Items {
+		err := clientutil.PatchStatus(ctx, k8sClient, &n, func() (bool, error) {
+			n.Status.Capacity[extraResource] = resource.MustParse("1")
+			n.Status.Allocatable[extraResource] = resource.MustParse("1")
+			return true, nil
+		})
+		gomega.Expect(err).NotTo(gomega.HaveOccurred())
+	}
+})
diff --git a/test/e2e/tas/tas_test.go b/test/e2e/tas/tas_test.go
new file mode 100644
index 0000000000..db095ea441
--- /dev/null
+++ b/test/e2e/tas/tas_test.go
@@ -0,0 +1,236 @@
+/*
+Copyright 2024 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package tase2e
+
+import (
+	"fmt"
+
+	"github.com/onsi/ginkgo/v2"
+	"github.com/onsi/gomega"
+	corev1 "k8s.io/api/core/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/types"
+	"k8s.io/utils/ptr"
+	"sigs.k8s.io/controller-runtime/pkg/client"
+
+	kueuealpha "sigs.k8s.io/kueue/apis/kueue/v1alpha1"
+	kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
+	workloadjob "sigs.k8s.io/kueue/pkg/controller/jobs/job"
+	"sigs.k8s.io/kueue/pkg/util/testing"
+	testingjob "sigs.k8s.io/kueue/pkg/util/testingjobs/job"
+	"sigs.k8s.io/kueue/pkg/workload"
+	"sigs.k8s.io/kueue/test/util"
+)
+
+const (
+	instanceType          = "tas-group"
+	tasNodeGroupLabel     = "cloud.provider.com/node-group"
+	topologyLevelRack     = "cloud.provider.com/topology-rack"
+	topologyLevelBlock    = "cloud.provider.com/topology-block"
+	topologyLevelHostname = "kubernetes.io/hostname"
+	extraResource         = "example.com/gpu"
+)
+
+var _ = ginkgo.Describe("TopologyAwareScheduling", func() {
+	var ns *corev1.Namespace
+	ginkgo.BeforeEach(func() {
+		ns = &corev1.Namespace{
+			ObjectMeta: metav1.ObjectMeta{
+				GenerateName: "e2e-tas-",
+			},
+		}
+		gomega.Expect(k8sClient.Create(ctx, ns)).To(gomega.Succeed())
+	})
+	ginkgo.AfterEach(func() {
+		gomega.Expect(util.DeleteNamespace(ctx, k8sClient, ns)).To(gomega.Succeed())
+	})
+
+	ginkgo.When("Creating a Job that can't fit in one Rack", func() {
+		var (
+			topology     *kueuealpha.Topology
+			tasFlavor    *kueue.ResourceFlavor
+			localQueue   *kueue.LocalQueue
+			clusterQueue *kueue.ClusterQueue
+		)
+		ginkgo.BeforeEach(func() {
+			topology = testing.MakeTopology("datacenter").Levels([]string{
+				topologyLevelBlock,
+				topologyLevelRack,
+				topologyLevelHostname,
+			}).Obj()
+			gomega.Expect(k8sClient.Create(ctx, topology)).Should(gomega.Succeed())
+
+			tasFlavor = testing.MakeResourceFlavor("tas-flavor").
+				NodeLabel(tasNodeGroupLabel, instanceType).TopologyName(topology.Name).Obj()
+			gomega.Expect(k8sClient.Create(ctx, tasFlavor)).Should(gomega.Succeed())
+			clusterQueue = testing.MakeClusterQueue("cluster-queue").
+				ResourceGroup(
+					*testing.MakeFlavorQuotas("tas-flavor").
+						Resource(extraResource, "8").
+						Obj(),
+				).
+				Obj()
+			gomega.Expect(k8sClient.Create(ctx, clusterQueue)).Should(gomega.Succeed())
+			util.ExpectClusterQueuesToBeActive(ctx, k8sClient, clusterQueue)
+
+			localQueue = testing.MakeLocalQueue("main", ns.Name).ClusterQueue("cluster-queue").Obj()
+			gomega.Expect(k8sClient.Create(ctx, localQueue)).Should(gomega.Succeed())
+		})
+		ginkgo.AfterEach(func() {
+			gomega.Expect(util.DeleteAllJobsInNamespace(ctx, k8sClient, ns)).Should(gomega.Succeed())
+			// Force-remove workloads to be sure that the cluster queue can be removed.
+			gomega.Expect(util.DeleteWorkloadsInNamespace(ctx, k8sClient, ns)).Should(gomega.Succeed())
+			gomega.Expect(util.DeleteObject(ctx, k8sClient, localQueue)).Should(gomega.Succeed())
+			gomega.Expect(util.DeleteObject(ctx, k8sClient, topology)).Should(gomega.Succeed())
+			util.ExpectObjectToBeDeleted(ctx, k8sClient, clusterQueue, true)
+			util.ExpectObjectToBeDeleted(ctx, k8sClient, tasFlavor, true)
+		})
+
+		ginkgo.It("Should not admit a Job if Rack required", func() {
+			sampleJob := testingjob.MakeJob("test-job", ns.Name).
+				Queue(localQueue.Name).
+				Parallelism(3).
+				Completions(3).
+				Request(extraResource, "1").
+				Limit(extraResource, "1").
+				Obj()
+			jobKey := client.ObjectKeyFromObject(sampleJob)
+			sampleJob = (&testingjob.JobWrapper{Job: *sampleJob}).
+				PodAnnotation(kueuealpha.PodSetRequiredTopologyAnnotation, topologyLevelRack).
+				Image(util.E2eTestSleepImage, []string{"100ms"}).
+				Obj()
+			gomega.Expect(k8sClient.Create(ctx, sampleJob)).Should(gomega.Succeed())
+
+			expectJobWithSuspendedAndNodeSelectors(jobKey, true, nil)
+			wlLookupKey := types.NamespacedName{Name: workloadjob.GetWorkloadNameForJob(sampleJob.Name, sampleJob.UID), Namespace: ns.Name}
+			ginkgo.By(fmt.Sprintf("verify the workload %q is not admitted", wlLookupKey), func() {
+				createdWorkload := &kueue.Workload{}
+				gomega.Eventually(func(g gomega.Gomega) {
+					g.Expect(k8sClient.Get(ctx, wlLookupKey, createdWorkload)).Should(gomega.Succeed())
+					g.Expect(createdWorkload.Status.Admission).Should(gomega.BeNil())
+				}, util.LongTimeout, util.Interval).Should(gomega.Succeed())
+			})
+		})
+
+		ginkgo.It("should admit a Job to TAS Block if Rack preferred", func() {
+			sampleJob := testingjob.MakeJob("test-job", ns.Name).
+				Queue(localQueue.Name).
+				Parallelism(3).
+				Completions(3).
+				Request(extraResource, "1").
+				Limit(extraResource, "1").
+				Obj()
+			jobKey := client.ObjectKeyFromObject(sampleJob)
+			sampleJob = (&testingjob.JobWrapper{Job: *sampleJob}).
+				PodAnnotation(kueuealpha.PodSetPreferredTopologyAnnotation, topologyLevelRack).
+				Image(util.E2eTestSleepImage, []string{"100ms"}).
+				Obj()
+			gomega.Expect(k8sClient.Create(ctx, sampleJob)).Should(gomega.Succeed())
+
+			expectJobWithSuspendedAndNodeSelectors(jobKey, false, map[string]string{
+				tasNodeGroupLabel: instanceType,
+			})
+			wlLookupKey := types.NamespacedName{Name: workloadjob.GetWorkloadNameForJob(sampleJob.Name, sampleJob.UID), Namespace: ns.Name}
+			createdWorkload := &kueue.Workload{}
+			ginkgo.By(fmt.Sprintf("await admission of workload %q and verify TopologyAssignment", wlLookupKey), func() {
+				gomega.Eventually(func(g gomega.Gomega) {
+					g.Expect(k8sClient.Get(ctx, wlLookupKey, createdWorkload)).Should(gomega.Succeed())
+					g.Expect(createdWorkload.Status.Admission).ShouldNot(gomega.BeNil())
+				}, util.LongTimeout, util.Interval).Should(gomega.Succeed())
+				gomega.Expect(createdWorkload.Status.Admission.PodSetAssignments).Should(gomega.HaveLen(1))
+				gomega.Expect(createdWorkload.Status.Admission.PodSetAssignments[0].TopologyAssignment.Levels).Should(gomega.BeComparableTo(
+					[]string{
+						topologyLevelBlock,
+						topologyLevelRack,
+						topologyLevelHostname,
+					},
+				))
+				podCountPerBlock := map[string]int32{}
+				for _, d := range createdWorkload.Status.Admission.PodSetAssignments[0].TopologyAssignment.Domains {
+					podCountPerBlock[d.Values[0]] += d.Count
+				}
+				// all pod assignments are in the same block
+				gomega.Expect(podCountPerBlock).Should(gomega.HaveLen(1))
+				// pod assignment count equals job parallelism
+				for _, pd := range podCountPerBlock {
+					gomega.Expect(pd).Should(gomega.Equal(ptr.Deref[int32](sampleJob.Spec.Parallelism, 0)))
+				}
+			})
+			ginkgo.By(fmt.Sprintf("verify the workload %q finishes", wlLookupKey), func() {
+				gomega.Eventually(func(g gomega.Gomega) {
+					g.Expect(k8sClient.Get(ctx, wlLookupKey, createdWorkload)).Should(gomega.Succeed())
+					g.Expect(workload.HasQuotaReservation(createdWorkload)).Should(gomega.BeTrue())
+					g.Expect(createdWorkload.Status.Conditions).Should(testing.HaveConditionStatusTrue(kueue.WorkloadFinished))
+				}, util.LongTimeout, util.Interval).Should(gomega.Succeed())
+			})
+		})
+
+		ginkgo.It("Should admit a Job to TAS Block if Block required", func() {
+			sampleJob := testingjob.MakeJob("test-job", ns.Name).
+				Queue(localQueue.Name).
+				Parallelism(3).
+				Completions(3).
+				Request(extraResource, "1").
+				Limit(extraResource, "1").
+				Obj()
+			jobKey := client.ObjectKeyFromObject(sampleJob)
+			sampleJob = (&testingjob.JobWrapper{Job: *sampleJob}).
+				PodAnnotation(kueuealpha.PodSetRequiredTopologyAnnotation, topologyLevelBlock).
+				Image(util.E2eTestSleepImage, []string{"100ms"}).
+				Obj()
+			gomega.Expect(k8sClient.Create(ctx, sampleJob)).Should(gomega.Succeed())
+
+			expectJobWithSuspendedAndNodeSelectors(jobKey, false, map[string]string{
+				tasNodeGroupLabel: instanceType,
+			})
+			wlLookupKey := types.NamespacedName{Name: workloadjob.GetWorkloadNameForJob(sampleJob.Name, sampleJob.UID), Namespace: ns.Name}
+			createdWorkload := &kueue.Workload{}
+			ginkgo.By(fmt.Sprintf("await admission of workload %q and verify TopologyAssignment", wlLookupKey), func() {
+				gomega.Eventually(func(g gomega.Gomega) {
+					g.Expect(k8sClient.Get(ctx, wlLookupKey, createdWorkload)).Should(gomega.Succeed())
+					g.Expect(createdWorkload.Status.Admission).ShouldNot(gomega.BeNil())
+				}, util.LongTimeout, util.Interval).Should(gomega.Succeed())
+				gomega.Expect(createdWorkload.Status.Admission.PodSetAssignments).Should(gomega.HaveLen(1))
+				gomega.Expect(createdWorkload.Status.Admission.PodSetAssignments[0].TopologyAssignment.Levels).Should(gomega.BeComparableTo(
+					[]string{
+						topologyLevelBlock,
+						topologyLevelRack,
+						topologyLevelHostname,
+					},
+				))
+				podCountPerBlock := map[string]int32{}
+				for _, d := range createdWorkload.Status.Admission.PodSetAssignments[0].TopologyAssignment.Domains {
+					podCountPerBlock[d.Values[0]] += d.Count
+				}
+				// all pod assignments are in the same block
+				gomega.Expect(podCountPerBlock).Should(gomega.HaveLen(1))
+				// pod assignment count equals job parallelism
+				for _, pd := range podCountPerBlock {
+					gomega.Expect(pd).Should(gomega.Equal(ptr.Deref[int32](sampleJob.Spec.Parallelism, 0)))
+				}
+			})
+
+			ginkgo.By(fmt.Sprintf("verify the workload %q finishes", wlLookupKey), func() {
+				gomega.Eventually(func(g gomega.Gomega) {
+					g.Expect(k8sClient.Get(ctx, wlLookupKey, createdWorkload)).Should(gomega.Succeed())
+					g.Expect(workload.HasQuotaReservation(createdWorkload)).Should(gomega.BeTrue())
+					g.Expect(createdWorkload.Status.Conditions).Should(testing.HaveConditionStatusTrue(kueue.WorkloadFinished))
+				}, util.LongTimeout, util.Interval).Should(gomega.Succeed())
+			})
+		})
+	})
+})
diff --git a/test/e2e/tas/util.go b/test/e2e/tas/util.go
new file mode 100644
index 0000000000..610468b87a
--- /dev/null
+++ b/test/e2e/tas/util.go
@@ -0,0 +1,36 @@
+/*
+Copyright 2024 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package tase2e
+
+import (
+	"github.com/onsi/gomega"
+	batchv1 "k8s.io/api/batch/v1"
+	"k8s.io/apimachinery/pkg/types"
+	"k8s.io/utils/ptr"
+
+	"sigs.k8s.io/kueue/test/util"
+)
+
+// +kubebuilder:docs-gen:collapse=Imports
+func expectJobWithSuspendedAndNodeSelectors(key types.NamespacedName, suspended bool, ns map[string]string) {
+	job := &batchv1.Job{}
+	gomega.EventuallyWithOffset(1, func(g gomega.Gomega) {
+		g.Expect(k8sClient.Get(ctx, key, job)).To(gomega.Succeed())
+		g.Expect(job.Spec.Suspend).Should(gomega.Equal(ptr.To(suspended)))
+		g.Expect(job.Spec.Template.Spec.NodeSelector).Should(gomega.Equal(ns))
+	}, util.Timeout, util.Interval).Should(gomega.Succeed())
+}