From 63220c8dff259dd256caab0a6b4b249a9243d5a2 Mon Sep 17 00:00:00 2001
From: Juanadelacuesta <8647634+Juanadelacuesta@users.noreply.github.com>
Date: Tue, 5 Nov 2024 11:22:28 +0100
Subject: [PATCH 01/13] func: remove scaling validation for system jobs and don't canonicalize to 1

---
 nomad/job_endpoint.go    | 5 +++--
 nomad/structs/structs.go | 8 ++++++--
 2 files changed, 9 insertions(+), 4 deletions(-)

diff --git a/nomad/job_endpoint.go b/nomad/job_endpoint.go
index e30ab29a611..f59d4e266ff 100644
--- a/nomad/job_endpoint.go
+++ b/nomad/job_endpoint.go
@@ -1027,8 +1027,9 @@ func (j *Job) Scale(args *structs.JobScaleRequest, reply *structs.JobRegisterRes
 	if job == nil {
 		return structs.NewErrRPCCoded(404, fmt.Sprintf("job %q not found", args.JobID))
 	}
-	if job.Type == structs.JobTypeSystem {
-		return structs.NewErrRPCCoded(http.StatusBadRequest, `cannot scale jobs of type "system"`)
+
+	if job.Type == structs.JobTypeSystem && *args.Count > 1 {
+		return structs.NewErrRPCCoded(http.StatusBadRequest, `jobs of type "system" can only be scaled to 1 or 0`)
 	}
 
 	// Since job is going to be mutated we must copy it since state store methods
diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go
index ba2872beb87..0b551e50a64 100644
--- a/nomad/structs/structs.go
+++ b/nomad/structs/structs.go
@@ -7055,8 +7055,12 @@ func (tg *TaskGroup) Canonicalize(job *Job) {
 		tg.EphemeralDisk = DefaultEphemeralDisk()
 	}
 
-	if job.Type == JobTypeSystem && tg.Count == 0 {
-		tg.Count = 1
+	if job.Type == JobTypeSystem {
+		if tg.Count > 1 {
+			tg.Count = 1
+		} else if tg.Count < 0 {
+			tg.Count = 0
+		}
 	}
 
 	if tg.Scaling != nil {

From 6f5a2b0e3cb4327a191350fb51833508a75b47e2 Mon Sep 17 00:00:00 2001
From: Juanadelacuesta <8647634+Juanadelacuesta@users.noreply.github.com>
Date: Tue, 5 Nov 2024 11:36:32 +0100
Subject: [PATCH 02/13] test: update test to validate for 0 and improve error message

---
 nomad/job_endpoint.go      |  2 +-
 nomad/job_endpoint_test.go | 29 ++++++++++++++++++++++++++---
 2 files changed, 27 insertions(+), 4 deletions(-)

diff --git a/nomad/job_endpoint.go b/nomad/job_endpoint.go
index f59d4e266ff..7267f0a15b6 100644
--- a/nomad/job_endpoint.go
+++ b/nomad/job_endpoint.go
@@ -1029,7 +1029,7 @@ func (j *Job) Scale(args *structs.JobScaleRequest, reply *structs.JobRegisterRes
 	}
 
 	if job.Type == structs.JobTypeSystem && *args.Count > 1 {
-		return structs.NewErrRPCCoded(http.StatusBadRequest, `jobs of type "system" can only be scaled to 1 or 0`)
+		return structs.NewErrRPCCoded(http.StatusBadRequest, `jobs of type "system" can only be scaled between 0 and 1`)
 	}
 
 	// Since job is going to be mutated we must copy it since state store methods
diff --git a/nomad/job_endpoint_test.go b/nomad/job_endpoint_test.go
index f0f8cef1fa2..cd786157468 100644
--- a/nomad/job_endpoint_test.go
+++ b/nomad/job_endpoint_test.go
@@ -8271,20 +8271,43 @@ func TestJobEndpoint_Scale_SystemJob(t *testing.T) {
 	mockSystemJob := mock.SystemJob()
 	must.NoError(t, state.UpsertJob(structs.MsgTypeTestSetup, 10, nil, mockSystemJob))
 
+	// Scale to 0
 	scaleReq := &structs.JobScaleRequest{
 		JobID: mockSystemJob.ID,
 		Target: map[string]string{
 			structs.ScalingTargetGroup: mockSystemJob.TaskGroups[0].Name,
 		},
-		Count: pointer.Of(int64(13)),
+		Count: pointer.Of(int64(0)),
 		WriteRequest: structs.WriteRequest{
 			Region:    DefaultRegion,
 			Namespace: mockSystemJob.Namespace,
 		},
 	}
-	var resp structs.JobRegisterResponse
+
+	resp := structs.JobRegisterResponse{}
+	err := msgpackrpc.CallWithCodec(codec, "Job.Scale", scaleReq, &resp)
+	must.NoError(t, err)
+
+	// Scale to a negative number
+	scaleReq.Count = pointer.Of(int64(-5))
+
+	resp = structs.JobRegisterResponse{}
+	must.ErrorContains(t, msgpackrpc.CallWithCodec(codec, "Job.Scale", scaleReq, &resp),
+		`400,scaling action count can't be negative`)
+
+	// Scale back to 1
+	scaleReq.Count = pointer.Of(int64(1))
+
+	resp = structs.JobRegisterResponse{}
+	err = msgpackrpc.CallWithCodec(codec, "Job.Scale", scaleReq, &resp)
+	must.NoError(t, err)
+
+	// Scale beyond 1
+	scaleReq.Count = pointer.Of(int64(13))
+
+	resp = structs.JobRegisterResponse{}
 	must.ErrorContains(t, msgpackrpc.CallWithCodec(codec, "Job.Scale", scaleReq, &resp),
-		`400,cannot scale jobs of type "system"`)
+		`400,jobs of type "system" can only be scaled between 0 and 1`)
 }
 
 func TestJobEndpoint_Scale_BatchJob(t *testing.T) {

From af568b0b7ea36fd26b2169215fb7ba3f3d610280 Mon Sep 17 00:00:00 2001
From: Juanadelacuesta <8647634+Juanadelacuesta@users.noreply.github.com>
Date: Tue, 5 Nov 2024 16:38:56 +0100
Subject: [PATCH 03/13] func: remove the canonicalization to 1 from system jobs

---
 nomad/structs/structs.go | 8 --------
 1 file changed, 8 deletions(-)

diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go
index 0b551e50a64..21d9c483aa5 100644
--- a/nomad/structs/structs.go
+++ b/nomad/structs/structs.go
@@ -7055,14 +7055,6 @@ func (tg *TaskGroup) Canonicalize(job *Job) {
 		tg.EphemeralDisk = DefaultEphemeralDisk()
 	}
 
-	if job.Type == JobTypeSystem {
-		if tg.Count > 1 {
-			tg.Count = 1
-		} else if tg.Count < 0 {
-			tg.Count = 0
-		}
-	}
-
 	if tg.Scaling != nil {
 		tg.Scaling.Canonicalize(job, tg, nil)
 	}

From aae71d8f4ee1b5a49a445219f01fcdaf49a4c20e Mon Sep 17 00:00:00 2001
From: Juanadelacuesta <8647634+Juanadelacuesta@users.noreply.github.com>
Date: Tue, 5 Nov 2024 17:22:44 +0100
Subject: [PATCH 04/13] docs: add changelog

---
 .changelog/24363.txt | 3 +++
 1 file changed, 3 insertions(+)
 create mode 100644 .changelog/24363.txt

diff --git a/.changelog/24363.txt b/.changelog/24363.txt
new file mode 100644
index 00000000000..8c7549c85b8
--- /dev/null
+++ b/.changelog/24363.txt
@@ -0,0 +1,3 @@
+```release-note:improvement
+core: allow system jobs to be scaled between 0 and 1
+```

From 2917d48c7aab4a342bdda0a62242c741f89da035 Mon Sep 17 00:00:00 2001
From: Juanadelacuesta <8647634+Juanadelacuesta@users.noreply.github.com>
Date: Wed, 6 Nov 2024 16:03:46 +0100
Subject: [PATCH 05/13] func: add test for scaling system jobs

---
 .../input/namespace_default_system.nomad | 39 +++++++++++
 e2e/scaling/scaling.go                   | 70 +++++++++++++++++++
 2 files changed, 109 insertions(+)
 create mode 100644 e2e/scaling/input/namespace_default_system.nomad

diff --git a/e2e/scaling/input/namespace_default_system.nomad b/e2e/scaling/input/namespace_default_system.nomad
new file mode 100644
index 00000000000..1e2a4e40ae5
--- /dev/null
+++ b/e2e/scaling/input/namespace_default_system.nomad
@@ -0,0 +1,39 @@
+# Copyright (c) HashiCorp, Inc.
+# SPDX-License-Identifier: BUSL-1.1
+
+job "system_job" {
+  datacenters = ["dc1"]
+
+  type = "system"
+
+  constraint {
+    attribute = "${attr.kernel.name}"
+    value     = "linux"
+  }
+
+  group "system_job_group" {
+    restart {
+      attempts = 10
+      interval = "1m"
+
+      delay = "2s"
+      mode  = "delay"
+    }
+
+    task "system_task" {
+      driver = "docker"
+
+      config {
+        image = "busybox:1"
+
+        command = "/bin/sh"
+        args    = ["-c", "sleep 15000"]
+      }
+
+      env {
+        version = "1"
+      }
+    }
+  }
+}
+
diff --git a/e2e/scaling/scaling.go b/e2e/scaling/scaling.go
index 10de40b1dd4..98f66c98d8d 100644
--- a/e2e/scaling/scaling.go
+++ b/e2e/scaling/scaling.go
@@ -11,6 +11,7 @@ import (
 	"github.com/hashicorp/nomad/e2e/framework"
 	"github.com/hashicorp/nomad/helper/pointer"
 	"github.com/hashicorp/nomad/helper/uuid"
+	"github.com/stretchr/testify/require"
 )
 
 type ScalingE2ETest struct {
@@ -165,3 +166,72 @@ func (tc *ScalingE2ETest) TestScalingNamespaces(f *framework.F) {
 		"Nomad e2e testing", false, nil, &aWriteOpts)
 	f.NoError(err)
 }
+
+// TestScalingBasicWithSystemSchedule performs basic scaling e2e tests within a
+// single namespace using a SystemScheduler.
+func (tc *ScalingE2ETest) TestScalingBasicWithSystemSchedule(f *framework.F) {
+	t := f.T()
+	nomadClient := tc.Nomad()
+
+	// Register a system job with a scaling policy without a group count; it should
+	// default to 1.
+
+	jobID := "test-scaling-" + uuid.Generate()[0:8]
+	e2eutil.RegisterAndWaitForAllocs(t, nomadClient, "scaling/input/namespace_default_system.nomad", jobID, "")
+
+	jobs := nomadClient.Jobs()
+	allocs, _, err := jobs.Allocations(jobID, true, nil)
+	require.NoError(t, err)
+	require.Equal(t, 1, len(allocs))
+
+	allocIDs := e2eutil.AllocIDsFromAllocationListStubs(allocs)
+
+	// Wait for allocations to get past initial pending state
+	e2eutil.WaitForAllocsNotPending(t, nomadClient, allocIDs)
+
+	// Try to scale beyond 1
+	testMeta := map[string]interface{}{"scaling-e2e-test": "value"}
+	scaleResp, _, err := tc.Nomad().Jobs().Scale(
+		jobID, "horizontally_scalable", pointer.Of(3),
+		"Nomad e2e testing", false, testMeta, nil)
+	f.Error(err)
+	f.Empty(scaleResp.EvalID)
+
+	// The same allocs should be running.
+ jobs = nomadClient.Jobs() + allocs1, _, err := jobs.Allocations(jobID, true, nil) + require.NoError(t, err) + require.EqualValues(t, allocs, allocs1) + + // Scale down to 0 + testMeta = map[string]interface{}{"scaling-e2e-test": "value"} + scaleResp, _, err = tc.Nomad().Jobs().Scale( + jobID, "horizontally_scalable", pointer.Of(0), + "Nomad e2e testing", false, testMeta, nil) + f.NoError(err) + f.NotEmpty(scaleResp.EvalID) + + // Assert job is still running + jobs = nomadClient.Jobs() + allocs, _, err = jobs.Allocations(jobID, true, nil) + require.NoError(t, err) + require.Equal(t, 0, len(allocs)) + + // Scale up to 1 again + testMeta = map[string]interface{}{"scaling-e2e-test": "value"} + scaleResp, _, err = tc.Nomad().Jobs().Scale( + jobID, "horizontally_scalable", pointer.Of(1), + "Nomad e2e testing", false, testMeta, nil) + f.NoError(err) + f.NotEmpty(scaleResp.EvalID) + + // Wait for allocations to get past initial pending state + e2eutil.WaitForAllocsNotPending(t, nomadClient, allocIDs) + + // Assert job is still running and there is an allocation again + jobs = nomadClient.Jobs() + allocs, _, err = jobs.Allocations(jobID, true, nil) + require.NoError(t, err) + require.Equal(t, 1, len(allocs)) + +} From e81aa488f0b7d7bd374ce1563aa3a98b21c9afe7 Mon Sep 17 00:00:00 2001 From: Juanadelacuesta <8647634+Juanadelacuesta@users.noreply.github.com> Date: Thu, 7 Nov 2024 12:56:46 +0100 Subject: [PATCH 06/13] temp: add logging to debug test --- e2e/scaling/scaling.go | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/e2e/scaling/scaling.go b/e2e/scaling/scaling.go index 98f66c98d8d..fa7dde12910 100644 --- a/e2e/scaling/scaling.go +++ b/e2e/scaling/scaling.go @@ -4,6 +4,7 @@ package scaling import ( + "fmt" "os" "github.com/hashicorp/nomad/api" @@ -182,9 +183,15 @@ func (tc *ScalingE2ETest) TestScalingBasicWithSystemSchedule(f *framework.F) { jobs := nomadClient.Jobs() allocs, _, err := jobs.Allocations(jobID, true, nil) require.NoError(t, err) - require.Equal(t, 1, len(allocs)) allocIDs := e2eutil.AllocIDsFromAllocationListStubs(allocs) + require.Equal(t, 1, len(allocIDs)) + + fmt.Printf("ids %+v\n job %+v", allocIDs, len(allocs)) + + for a := range allocs { + fmt.Printf("alloc %+v\n", a) + } // Wait for allocations to get past initial pending state e2eutil.WaitForAllocsNotPending(t, nomadClient, allocIDs) From 186bcf7a30ccfc282e7c99d2f79e2678eceaa722 Mon Sep 17 00:00:00 2001 From: Juanadelacuesta <8647634+Juanadelacuesta@users.noreply.github.com> Date: Thu, 7 Nov 2024 14:47:10 +0100 Subject: [PATCH 07/13] fix: clean up after test is done --- e2e/scaling/scaling.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/e2e/scaling/scaling.go b/e2e/scaling/scaling.go index fa7dde12910..d8e6e5d46c4 100644 --- a/e2e/scaling/scaling.go +++ b/e2e/scaling/scaling.go @@ -241,4 +241,10 @@ func (tc *ScalingE2ETest) TestScalingBasicWithSystemSchedule(f *framework.F) { require.NoError(t, err) require.Equal(t, 1, len(allocs)) + // Remove the job. 
+	_, _, err = tc.Nomad().Jobs().Deregister(jobID, true, nil)
+	f.NoError(err)
+	f.NoError(tc.Nomad().System().GarbageCollect())
+	tc.namespacedJobIDs = [][2]string{}
+
 }

From 215746ae7ae158e7cc07c09f6809c5414d77cb81 Mon Sep 17 00:00:00 2001
From: Juanadelacuesta <8647634+Juanadelacuesta@users.noreply.github.com>
Date: Thu, 7 Nov 2024 17:27:09 +0100
Subject: [PATCH 08/13] fix: scaled-down jobs will still have the stopped allocation; update test to account for it

---
 e2e/scaling/scaling.go | 42 +++++++++++++++++++-----------------------
 1 file changed, 19 insertions(+), 23 deletions(-)

diff --git a/e2e/scaling/scaling.go b/e2e/scaling/scaling.go
index d8e6e5d46c4..b713223fa18 100644
--- a/e2e/scaling/scaling.go
+++ b/e2e/scaling/scaling.go
@@ -4,7 +4,6 @@
 package scaling
 
 import (
-	"fmt"
 	"os"
 
 	"github.com/hashicorp/nomad/api"
@@ -12,6 +11,7 @@ import (
 	"github.com/hashicorp/nomad/e2e/framework"
 	"github.com/hashicorp/nomad/helper/pointer"
 	"github.com/hashicorp/nomad/helper/uuid"
+	"github.com/hashicorp/nomad/nomad/structs"
 	"github.com/stretchr/testify/require"
 )
 
@@ -184,25 +184,19 @@ func (tc *ScalingE2ETest) TestScalingBasicWithSystemSchedule(f *framework.F) {
 	allocs, _, err := jobs.Allocations(jobID, true, nil)
 	require.NoError(t, err)
 
+	require.Equal(t, 1, len(allocs))
 	allocIDs := e2eutil.AllocIDsFromAllocationListStubs(allocs)
-	require.Equal(t, 1, len(allocIDs))
-
-	fmt.Printf("ids %+v\n job %+v", allocIDs, len(allocs))
-
-	for a := range allocs {
-		fmt.Printf("alloc %+v\n", a)
-	}
 
 	// Wait for allocations to get past initial pending state
 	e2eutil.WaitForAllocsNotPending(t, nomadClient, allocIDs)
 
 	// Try to scale beyond 1
 	testMeta := map[string]interface{}{"scaling-e2e-test": "value"}
-	scaleResp, _, err := tc.Nomad().Jobs().Scale(
-		jobID, "horizontally_scalable", pointer.Of(3),
+	scaleResp, _, err := tc.Nomad().Jobs().Scale(jobID, "system_job_group", pointer.Of(3),
 		"Nomad e2e testing", false, testMeta, nil)
+
 	f.Error(err)
-	f.Empty(scaleResp.EvalID)
+	f.Nil(scaleResp)
 
 	// The same allocs should be running.
 	jobs = nomadClient.Jobs()
 	allocs1, _, err := jobs.Allocations(jobID, true, nil)
@@ -212,39 +206,41 @@ func (tc *ScalingE2ETest) TestScalingBasicWithSystemSchedule(f *framework.F) {
 
 	// Scale down to 0
 	testMeta = map[string]interface{}{"scaling-e2e-test": "value"}
-	scaleResp, _, err = tc.Nomad().Jobs().Scale(
-		jobID, "horizontally_scalable", pointer.Of(0),
+	scaleResp, _, err = tc.Nomad().Jobs().Scale(jobID, "system_job_group", pointer.Of(0),
 		"Nomad e2e testing", false, testMeta, nil)
 	f.NoError(err)
 	f.NotEmpty(scaleResp.EvalID)
 
-	// Assert job is still running
+	// Assert job is still up but no allocs are running
 	jobs = nomadClient.Jobs()
-	allocs, _, err = jobs.Allocations(jobID, true, nil)
+	allocs, _, err = jobs.Allocations(jobID, false, nil)
 	require.NoError(t, err)
-	require.Equal(t, 0, len(allocs))
+	require.Equal(t, 1, len(allocs))
+
+	stopedAlloc := allocs[0].ID
+	e2eutil.WaitForAllocStatus(t, nomadClient, stopedAlloc, structs.AllocClientStatusComplete)
 
 	// Scale up to 1 again
 	testMeta = map[string]interface{}{"scaling-e2e-test": "value"}
-	scaleResp, _, err = tc.Nomad().Jobs().Scale(
-		jobID, "horizontally_scalable", pointer.Of(1),
+	scaleResp, _, err = tc.Nomad().Jobs().Scale(jobID, "system_job_group", pointer.Of(1),
 		"Nomad e2e testing", false, testMeta, nil)
 	f.NoError(err)
 	f.NotEmpty(scaleResp.EvalID)
 
-	// Wait for allocations to get past initial pending state
+	// Wait for new allocation to get past initial pending state
 	e2eutil.WaitForAllocsNotPending(t, nomadClient, allocIDs)
 
-	// Assert job is still running and there is an allocation again
+	// Assert job is still running and there is a running allocation again
 	jobs = nomadClient.Jobs()
 	allocs, _, err = jobs.Allocations(jobID, true, nil)
 	require.NoError(t, err)
-	require.Equal(t, 1, len(allocs))
+	require.Equal(t, 2, len(allocs))
+
+	require.Equal(t, allocs[1].DesiredStatus, structs.AllocDesiredStatusStop)
+	require.Equal(t, allocs[0].DesiredStatus, structs.AllocDesiredStatusRun)
 
 	// Remove the job.
 	_, _, err = tc.Nomad().Jobs().Deregister(jobID, true, nil)
 	f.NoError(err)
 	f.NoError(tc.Nomad().System().GarbageCollect())
-	tc.namespacedJobIDs = [][2]string{}
-
 }

From 77bf2271b3717a7c01ec49969c7d83487f04d388 Mon Sep 17 00:00:00 2001
From: Juanadelacuesta <8647634+Juanadelacuesta@users.noreply.github.com>
Date: Tue, 12 Nov 2024 12:16:17 +0100
Subject: [PATCH 09/13] Update the e2e test to accommodate system jobs having an alloc per node

---
 e2e/scaling/scaling.go | 53 ++++++++++++++++++++++++++++++------------
 1 file changed, 38 insertions(+), 15 deletions(-)

diff --git a/e2e/scaling/scaling.go b/e2e/scaling/scaling.go
index b713223fa18..bde3f0f1ae3 100644
--- a/e2e/scaling/scaling.go
+++ b/e2e/scaling/scaling.go
@@ -175,17 +175,24 @@ func (tc *ScalingE2ETest) TestScalingBasicWithSystemSchedule(f *framework.F) {
 	nomadClient := tc.Nomad()
 
 	// Register a system job with a scaling policy without a group count; it should
-	// default to 1.
+	// default to 1 per node.
 	jobID := "test-scaling-" + uuid.Generate()[0:8]
 	e2eutil.RegisterAndWaitForAllocs(t, nomadClient, "scaling/input/namespace_default_system.nomad", jobID, "")
 
 	jobs := nomadClient.Jobs()
-	allocs, _, err := jobs.Allocations(jobID, true, nil)
+	initialAllocs, _, err := jobs.Allocations(jobID, true, nil)
+	require.NoError(t, err)
+
+	nodeStubList, _, err := nomadClient.Nodes().List(&api.QueryOptions{Namespace: "default"})
 	require.NoError(t, err)
 
-	require.Equal(t, 1, len(allocs))
-	allocIDs := e2eutil.AllocIDsFromAllocationListStubs(allocs)
+	// A system job will spawn an allocation per node, so we need to know how many nodes
+	// there are to know how many allocations to expect.
+	numberOfNodes := len(nodeStubList)
+
+	require.Equal(t, numberOfNodes, len(initialAllocs))
+	allocIDs := e2eutil.AllocIDsFromAllocationListStubs(initialAllocs)
 
 	// Wait for allocations to get past initial pending state
 	e2eutil.WaitForAllocsNotPending(t, nomadClient, allocIDs)
 
 	// Try to scale beyond 1
 	testMeta := map[string]interface{}{"scaling-e2e-test": "value"}
 	scaleResp, _, err := tc.Nomad().Jobs().Scale(jobID, "system_job_group", pointer.Of(3),
 		"Nomad e2e testing", false, testMeta, nil)
 
 	f.Error(err)
 	f.Nil(scaleResp)
 
 	// The same allocs should be running.
 	jobs = nomadClient.Jobs()
 	allocs1, _, err := jobs.Allocations(jobID, true, nil)
 	require.NoError(t, err)
-	require.EqualValues(t, allocs, allocs1)
+	require.EqualValues(t, initialAllocs, allocs1)
 
 	// Scale down to 0
 	testMeta = map[string]interface{}{"scaling-e2e-test": "value"}
 	scaleResp, _, err = tc.Nomad().Jobs().Scale(jobID, "system_job_group", pointer.Of(0),
 		"Nomad e2e testing", false, testMeta, nil)
 	f.NoError(err)
 	f.NotEmpty(scaleResp.EvalID)
 
 	// Assert job is still up but no allocs are running
-	jobs = nomadClient.Jobs()
-	allocs, _, err = jobs.Allocations(jobID, false, nil)
+	stopedAllocs, _, err := jobs.Allocations(jobID, false, nil)
 	require.NoError(t, err)
 
-	require.Equal(t, 1, len(allocs))
-	stopedAlloc := allocs[0].ID
-	e2eutil.WaitForAllocStatus(t, nomadClient, stopedAlloc, structs.AllocClientStatusComplete)
+	for _, alloc := range stopedAllocs {
+		e2eutil.WaitForAllocStatus(t, nomadClient, alloc.ID, structs.AllocClientStatusComplete)
+	}
+
+	stopedAllocs, _, err = jobs.Allocations(jobID, false, nil)
+	require.NoError(t, err)
+
+	require.Equal(t, numberOfNodes, len(filterByDesiredStatus(structs.AllocDesiredStatusStop, stopedAllocs)))
+	require.Equal(t, numberOfNodes, len(stopedAllocs))
 
 	// Scale up to 1 again
 	testMeta = map[string]interface{}{"scaling-e2e-test": "value"}
 	scaleResp, _, err = tc.Nomad().Jobs().Scale(jobID, "system_job_group", pointer.Of(1),
 		"Nomad e2e testing", false, testMeta, nil)
 	f.NoError(err)
 	f.NotEmpty(scaleResp.EvalID)
 
 	// Wait for new allocation to get past initial pending state
 	e2eutil.WaitForAllocsNotPending(t, nomadClient, allocIDs)
 
 	// Assert job is still running and there is a running allocation again
-	jobs = nomadClient.Jobs()
-	allocs, _, err = jobs.Allocations(jobID, true, nil)
+	allocs, _, err := jobs.Allocations(jobID, true, nil)
 	require.NoError(t, err)
-	require.Equal(t, 2, len(allocs))
+	require.Equal(t, numberOfNodes*2, len(allocs))
 
-	require.Equal(t, allocs[1].DesiredStatus, structs.AllocDesiredStatusStop)
-	require.Equal(t, allocs[0].DesiredStatus, structs.AllocDesiredStatusRun)
+	require.Equal(t, numberOfNodes, len(filterByDesiredStatus(structs.AllocDesiredStatusStop, allocs)))
+	require.Equal(t, numberOfNodes, len(filterByDesiredStatus(structs.AllocDesiredStatusRun, allocs)))
 
 	// Remove the job.
 	_, _, err = tc.Nomad().Jobs().Deregister(jobID, true, nil)
 	f.NoError(err)
 	f.NoError(tc.Nomad().System().GarbageCollect())
 }
+
+func filterByDesiredStatus(status string, allocs []*api.AllocationListStub) []*api.AllocationListStub {
+	res := []*api.AllocationListStub{}
+
+	for _, a := range allocs {
+		if a.DesiredStatus == status {
+			res = append(res, a)
+		}
+	}
+
+	return res
+}

From f6ee8bb6414769974c0381b97681e75e6238ece4 Mon Sep 17 00:00:00 2001
From: Juanadelacuesta <8647634+Juanadelacuesta@users.noreply.github.com>
Date: Wed, 13 Nov 2024 10:31:28 +0100
Subject: [PATCH 10/13] fix: only count ready nodes in the node count

---
 e2e/scaling/scaling.go | 44 ++++++++++++++++++++++++++----------------
 1 file changed, 27 insertions(+), 17 deletions(-)

diff --git a/e2e/scaling/scaling.go b/e2e/scaling/scaling.go
index bde3f0f1ae3..c9d868716f5 100644
--- a/e2e/scaling/scaling.go
+++ b/e2e/scaling/scaling.go
@@ -12,7 +12,6 @@ import (
 	"github.com/hashicorp/nomad/helper/pointer"
 	"github.com/hashicorp/nomad/helper/uuid"
 	"github.com/hashicorp/nomad/nomad/structs"
-	"github.com/stretchr/testify/require"
 )
 
 type ScalingE2ETest struct {
@@ -29,7 +28,6 @@ func init() {
 			new(ScalingE2ETest),
 		},
 	})
-
 }
 
 func (tc *ScalingE2ETest) BeforeAll(f *framework.F) {
@@ -182,16 +180,16 @@ func (tc *ScalingE2ETest) TestScalingBasicWithSystemSchedule(f *framework.F) {
 
 	jobs := nomadClient.Jobs()
 	initialAllocs, _, err := jobs.Allocations(jobID, true, nil)
-	require.NoError(t, err)
+	f.NoError(err)
 
 	nodeStubList, _, err := nomadClient.Nodes().List(&api.QueryOptions{Namespace: "default"})
-	require.NoError(t, err)
+	f.NoError(err)
 
 	// A system job will spawn an allocation per node, so we need to know how many nodes
 	// there are to know how many allocations to expect.
-	numberOfNodes := len(nodeStubList)
+	numberOfNodes := len(filterNodeByStatus(api.NodeStatusReady, nodeStubList))
 
-	require.Equal(t, numberOfNodes, len(initialAllocs))
+	f.Equal(numberOfNodes, len(initialAllocs))
 	allocIDs := e2eutil.AllocIDsFromAllocationListStubs(initialAllocs)
 
 	// Wait for allocations to get past initial pending state
 	e2eutil.WaitForAllocsNotPending(t, nomadClient, allocIDs)
 
 	// Try to scale beyond 1
 	testMeta := map[string]interface{}{"scaling-e2e-test": "value"}
 	scaleResp, _, err := tc.Nomad().Jobs().Scale(jobID, "system_job_group", pointer.Of(3),
 		"Nomad e2e testing", false, testMeta, nil)
 
 	f.Error(err)
 	f.Nil(scaleResp)
 
 	// The same allocs should be running.
jobs = nomadClient.Jobs() allocs1, _, err := jobs.Allocations(jobID, true, nil) - require.NoError(t, err) - require.EqualValues(t, initialAllocs, allocs1) + f.NoError(err) + f.EqualValues(initialAllocs, allocs1) // Scale down to 0 testMeta = map[string]interface{}{"scaling-e2e-test": "value"} @@ -220,17 +218,17 @@ func (tc *ScalingE2ETest) TestScalingBasicWithSystemSchedule(f *framework.F) { // Assert job is still up but no allocs are running stopedAllocs, _, err := jobs.Allocations(jobID, false, nil) - require.NoError(t, err) + f.NoError(err) for _, alloc := range stopedAllocs { e2eutil.WaitForAllocStatus(t, nomadClient, alloc.ID, structs.AllocClientStatusComplete) } stopedAllocs, _, err = jobs.Allocations(jobID, false, nil) - require.NoError(t, err) + f.NoError(err) - require.Equal(t, numberOfNodes, len(filterByDesiredStatus(structs.AllocDesiredStatusStop, stopedAllocs))) - require.Equal(t, numberOfNodes, len(stopedAllocs)) + f.Equal(numberOfNodes, len(filterAllocsByDesiredStatus(structs.AllocDesiredStatusStop, stopedAllocs))) + f.Equal(numberOfNodes, len(stopedAllocs)) // Scale up to 1 again testMeta = map[string]interface{}{"scaling-e2e-test": "value"} @@ -244,11 +242,11 @@ func (tc *ScalingE2ETest) TestScalingBasicWithSystemSchedule(f *framework.F) { // Assert job is still running and there is a running allocation again allocs, _, err := jobs.Allocations(jobID, true, nil) - require.NoError(t, err) - require.Equal(t, numberOfNodes*2, len(allocs)) + f.NoError(err) + f.Equal(numberOfNodes*2, len(allocs)) - require.Equal(t, numberOfNodes, len(filterByDesiredStatus(structs.AllocDesiredStatusStop, allocs))) - require.Equal(t, numberOfNodes, len(filterByDesiredStatus(structs.AllocDesiredStatusRun, allocs))) + f.Equal(numberOfNodes, len(filterAllocsByDesiredStatus(structs.AllocDesiredStatusStop, allocs))) + f.Equal(numberOfNodes, len(filterAllocsByDesiredStatus(structs.AllocDesiredStatusRun, allocs))) // Remove the job. 
 	_, _, err = tc.Nomad().Jobs().Deregister(jobID, true, nil)
 	f.NoError(err)
 	f.NoError(tc.Nomad().System().GarbageCollect())
 }
 
-func filterByDesiredStatus(status string, allocs []*api.AllocationListStub) []*api.AllocationListStub {
+func filterAllocsByDesiredStatus(status string, allocs []*api.AllocationListStub) []*api.AllocationListStub {
 	res := []*api.AllocationListStub{}
 
 	for _, a := range allocs {
 		if a.DesiredStatus == status {
 			res = append(res, a)
 		}
 	}
 
 	return res
 }
+
+func filterNodeByStatus(status string, nodes []*api.NodeListStub) []*api.NodeListStub {
+	res := []*api.NodeListStub{}
+
+	for _, n := range nodes {
+		if n.Status == status {
+			res = append(res, n)
+		}
+	}
+
+	return res
+}

From a91726bdd885ff3b0469cd238a48f7d1dbf1178e Mon Sep 17 00:00:00 2001
From: Juanadelacuesta <8647634+Juanadelacuesta@users.noreply.github.com>
Date: Wed, 13 Nov 2024 11:09:22 +0100
Subject: [PATCH 11/13] fix: remove the datacenter constraint from the system job definition

---
 e2e/scaling/input/namespace_default_system.nomad | 14 --------------
 e2e/scaling/scaling.go                           | 14 +-------------
 2 files changed, 1 insertion(+), 27 deletions(-)

diff --git a/e2e/scaling/input/namespace_default_system.nomad b/e2e/scaling/input/namespace_default_system.nomad
index 1e2a4e40ae5..75a22af8653 100644
--- a/e2e/scaling/input/namespace_default_system.nomad
+++ b/e2e/scaling/input/namespace_default_system.nomad
@@ -2,23 +2,9 @@
 # SPDX-License-Identifier: BUSL-1.1
 
 job "system_job" {
-  datacenters = ["dc1"]
-
   type = "system"
 
-  constraint {
-    attribute = "${attr.kernel.name}"
-    value     = "linux"
-  }
-
   group "system_job_group" {
-    restart {
-      attempts = 10
-      interval = "1m"
-
-      delay = "2s"
-      mode  = "delay"
-    }
 
     task "system_task" {
       driver = "docker"
diff --git a/e2e/scaling/scaling.go b/e2e/scaling/scaling.go
index c9d868716f5..6570f8c573f 100644
--- a/e2e/scaling/scaling.go
+++ b/e2e/scaling/scaling.go
@@ -187,7 +187,7 @@ func (tc *ScalingE2ETest) TestScalingBasicWithSystemSchedule(f *framework.F) {
 
 	// A system job will spawn an allocation per node, so we need to know how many nodes
 	// there are to know how many allocations to expect.
-	numberOfNodes := len(filterNodeByStatus(api.NodeStatusReady, nodeStubList))
+	numberOfNodes := len(nodeStubList)
 
 	f.Equal(numberOfNodes, len(initialAllocs))
 	allocIDs := e2eutil.AllocIDsFromAllocationListStubs(initialAllocs)
 
 	return res
 }
-
-func filterNodeByStatus(status string, nodes []*api.NodeListStub) []*api.NodeListStub {
-	res := []*api.NodeListStub{}
-
-	for _, n := range nodes {
-		if n.Status == status {
-			res = append(res, n)
-		}
-	}
-
-	return res
-}

From 194d4e91ccabfaf045c9dcac09c17e3ca6ccca68 Mon Sep 17 00:00:00 2001
From: Juanadelacuesta <8647634+Juanadelacuesta@users.noreply.github.com>
Date: Wed, 13 Nov 2024 12:05:38 +0100
Subject: [PATCH 12/13] fix: compare alloc IDs to avoid flaky tests when verifying no alloc was stopped

---
 e2e/scaling/scaling.go | 11 ++++++-----
 1 file changed, 6 insertions(+), 5 deletions(-)

diff --git a/e2e/scaling/scaling.go b/e2e/scaling/scaling.go
index 6570f8c573f..dd3260fff4e 100644
--- a/e2e/scaling/scaling.go
+++ b/e2e/scaling/scaling.go
@@ -207,7 +207,12 @@ func (tc *ScalingE2ETest) TestScalingBasicWithSystemSchedule(f *framework.F) {
 	jobs = nomadClient.Jobs()
 	allocs1, _, err := jobs.Allocations(jobID, true, nil)
 	f.NoError(err)
-	f.EqualValues(initialAllocs, allocs1)
+
+	f.Equal(len(initialAllocs), len(allocs1))
+
+	for i, a := range allocs1 {
+		f.Equal(a.ID, initialAllocs[i].ID)
+	}
 
 	// Scale down to 0
 	testMeta = map[string]interface{}{"scaling-e2e-test": "value"}
@@ -220,10 +225,6 @@ func (tc *ScalingE2ETest) TestScalingBasicWithSystemSchedule(f *framework.F) {
 	stopedAllocs, _, err := jobs.Allocations(jobID, false, nil)
 	f.NoError(err)
 
-	for _, alloc := range stopedAllocs {
-		e2eutil.WaitForAllocStatus(t, nomadClient, alloc.ID, structs.AllocClientStatusComplete)
-	}
-
 	stopedAllocs, _, err = jobs.Allocations(jobID, false, nil)
 	f.NoError(err)

From 2e99c71d7353898f613e683b1668a09c6cbaa419 Mon Sep 17 00:00:00 2001
From: Juanadelacuesta <8647634+Juanadelacuesta@users.noreply.github.com>
Date: Wed, 13 Nov 2024 14:54:47 +0100
Subject: [PATCH 13/13] fix: remove duplicated code

---
 e2e/scaling/scaling.go | 3 ---
 1 file changed, 3 deletions(-)

diff --git a/e2e/scaling/scaling.go b/e2e/scaling/scaling.go
index dd3260fff4e..5b3580e03bd 100644
--- a/e2e/scaling/scaling.go
+++ b/e2e/scaling/scaling.go
@@ -225,9 +225,6 @@ func (tc *ScalingE2ETest) TestScalingBasicWithSystemSchedule(f *framework.F) {
 	stopedAllocs, _, err := jobs.Allocations(jobID, false, nil)
 	f.NoError(err)
 
-	stopedAllocs, _, err = jobs.Allocations(jobID, false, nil)
-	f.NoError(err)
-
 	f.Equal(numberOfNodes, len(filterAllocsByDesiredStatus(structs.AllocDesiredStatusStop, stopedAllocs)))
 	f.Equal(numberOfNodes, len(stopedAllocs))
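
Taken together, the net server-side effect of the series is small: patch 03 removes the TaskGroup.Canonicalize clamping that patch 01 introduced, so the only surviving validation is the bounds check in Job.Scale, and the negative-count rejection (`scaling action count can't be negative`) comes from the pre-existing generic scaling validation that patch 02's test exercises. A minimal sketch of the end state, with the job lookup, ACL, and registration code shown in the diffs omitted:

```go
// End-state sketch of the system-job rule in Job.Scale after patches 01-03.
// System jobs place one allocation per eligible node, so the only meaningful
// counts are 0 (stop everywhere) and 1 (run everywhere).
if job.Type == structs.JobTypeSystem && *args.Count > 1 {
	return structs.NewErrRPCCoded(http.StatusBadRequest,
		`jobs of type "system" can only be scaled between 0 and 1`)
}
```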
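On the client side, the flow the e2e test exercises can be reproduced with the `api` and `helper/pointer` packages the test already imports. The sketch below is illustrative and not part of the series: the job and group names are taken from the e2e fixture, the agent address is whatever `api.DefaultConfig` resolves (it honors `NOMAD_ADDR`), and error handling is reduced to panics for brevity:

```go
package main

import (
	"fmt"

	"github.com/hashicorp/nomad/api"
	"github.com/hashicorp/nomad/helper/pointer"
)

func main() {
	client, err := api.NewClient(api.DefaultConfig())
	if err != nil {
		panic(err)
	}

	// Scale the system job's group down to 0: every per-node allocation is
	// stopped, but the job itself stays registered.
	resp, _, err := client.Jobs().Scale("system_job", "system_job_group",
		pointer.Of(0), "scale to zero", false, nil, nil)
	if err != nil {
		panic(err)
	}
	fmt.Println("scale-down eval:", resp.EvalID)

	// Scale back to 1: one allocation per eligible node again.
	resp, _, err = client.Jobs().Scale("system_job", "system_job_group",
		pointer.Of(1), "scale back up", false, nil, nil)
	if err != nil {
		panic(err)
	}
	fmt.Println("scale-up eval:", resp.EvalID)

	// Anything above 1 is rejected by the new validation with a 400.
	_, _, err = client.Jobs().Scale("system_job", "system_job_group",
		pointer.Of(3), "too many", false, nil, nil)
	fmt.Println("expected error:", err)
}
```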