Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow scaling system jobs to 0 #24363

Open
wants to merge 13 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .changelog/24363.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:improvement
core: add the possibility to scale system jobs between 0 and 1
```
25 changes: 25 additions & 0 deletions e2e/scaling/input/namespace_default_system.nomad
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# Copyright (c) HashiCorp, Inc.
# SPDX-License-Identifier: BUSL-1.1

# System-type job used as an input fixture by the scaling e2e tests.
job "system_job" {
type = "system"

# The group deliberately sets no "count" attribute and declares no scaling
# block.
group "system_job_group" {

task "system_task" {
driver = "docker"

# Runs `/bin/sh -c "sleep 15000"` in a busybox container; presumably the long
# sleep just keeps the task alive while the test scales the job.
config {
image = "busybox:1"

command = "/bin/sh"
args = ["-c", "sleep 15000"]
}

# NOTE(review): the consumer of this variable is not visible from this file.
env {
version = "1"
}
}
}
}

100 changes: 99 additions & 1 deletion e2e/scaling/scaling.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/hashicorp/nomad/e2e/framework"
"github.com/hashicorp/nomad/helper/pointer"
"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/nomad/structs"
)

type ScalingE2ETest struct {
Expand All @@ -27,7 +28,6 @@ func init() {
new(ScalingE2ETest),
},
})

}

func (tc *ScalingE2ETest) BeforeAll(f *framework.F) {
Expand Down Expand Up @@ -165,3 +165,101 @@ func (tc *ScalingE2ETest) TestScalingNamespaces(f *framework.F) {
"Nomad e2e testing", false, nil, &aWriteOpts)
f.NoError(err)
}

// TestScalingBasicWithSystemSchedule performs basic scaling e2e tests within a
// single namespace using a job of type "system". It verifies that:
//   - scaling the group beyond 1 is rejected and leaves the allocations untouched,
//   - scaling the group down to 0 marks every allocation as stopped,
//   - scaling the group back up to 1 creates one new allocation per node.
func (tc *ScalingE2ETest) TestScalingBasicWithSystemSchedule(f *framework.F) {
	t := f.T()
	nomadClient := tc.Nomad()

	// Register a system job whose group does not set a count; the test expects
	// one allocation per node.

	jobID := "test-scaling-" + uuid.Generate()[0:8]
	e2eutil.RegisterAndWaitForAllocs(t, nomadClient, "scaling/input/namespace_default_system.nomad", jobID, "")

	jobs := nomadClient.Jobs()
	initialAllocs, _, err := jobs.Allocations(jobID, true, nil)
	f.NoError(err)

	nodeStubList, _, err := nomadClient.Nodes().List(&api.QueryOptions{Namespace: "default"})
	f.NoError(err)

	// A system job will spawn an allocation per node, we need to know how many nodes
	// there are to know how many allocations to expect.
	numberOfNodes := len(nodeStubList)

	f.Equal(numberOfNodes, len(initialAllocs))
	allocIDs := e2eutil.AllocIDsFromAllocationListStubs(initialAllocs)

	// Wait for allocations to get past initial pending state
	e2eutil.WaitForAllocsNotPending(t, nomadClient, allocIDs)

	// Try to scale beyond 1
	testMeta := map[string]interface{}{"scaling-e2e-test": "value"}
	scaleResp, _, err := tc.Nomad().Jobs().Scale(jobID, "system_job_group", pointer.Of(3),
		"Nomad e2e testing", false, testMeta, nil)

	f.Error(err)
	f.Nil(scaleResp)

	// The same allocs should be running.
	jobs = nomadClient.Jobs()
	allocs1, _, err := jobs.Allocations(jobID, true, nil)
	f.NoError(err)

	f.Equal(len(initialAllocs), len(allocs1))

	for i, a := range allocs1 {
		f.Equal(a.ID, initialAllocs[i].ID)
	}

	// Scale down to 0
	testMeta = map[string]interface{}{"scaling-e2e-test": "value"}
	scaleResp, _, err = tc.Nomad().Jobs().Scale(jobID, "system_job_group", pointer.Of(0),
		"Nomad e2e testing", false, testMeta, nil)
	f.NoError(err)
	f.NotEmpty(scaleResp.EvalID)

	// Assert job is still up but no allocs are running
	stoppedAllocs, _, err := jobs.Allocations(jobID, false, nil)
	f.NoError(err)

	f.Equal(numberOfNodes, len(filterAllocsByDesiredStatus(structs.AllocDesiredStatusStop, stoppedAllocs)))
	f.Equal(numberOfNodes, len(stoppedAllocs))

	// Scale up to 1 again
	testMeta = map[string]interface{}{"scaling-e2e-test": "value"}
	scaleResp, _, err = tc.Nomad().Jobs().Scale(jobID, "system_job_group", pointer.Of(1),
		"Nomad e2e testing", false, testMeta, nil)
	f.NoError(err)
	f.NotEmpty(scaleResp.EvalID)

	allocs, _, err := jobs.Allocations(jobID, true, nil)
	f.NoError(err)

	// Wait for the new allocations to get past initial pending state. The IDs
	// collected at registration belong to allocations that were stopped by the
	// scale down, so the wait has to target the allocations that are currently
	// desired to run instead.
	newAllocIDs := e2eutil.AllocIDsFromAllocationListStubs(
		filterAllocsByDesiredStatus(structs.AllocDesiredStatusRun, allocs))
	e2eutil.WaitForAllocsNotPending(t, nomadClient, newAllocIDs)

	// Assert job is still running and there is a running allocation again:
	// one stopped and one running allocation per node.
	f.Equal(numberOfNodes*2, len(allocs))

	f.Equal(numberOfNodes, len(filterAllocsByDesiredStatus(structs.AllocDesiredStatusStop, allocs)))
	f.Equal(numberOfNodes, len(filterAllocsByDesiredStatus(structs.AllocDesiredStatusRun, allocs)))

	// Remove the job.
	_, _, err = tc.Nomad().Jobs().Deregister(jobID, true, nil)
	f.NoError(err)
	f.NoError(tc.Nomad().System().GarbageCollect())
}

// filterAllocsByDesiredStatus returns the allocation stubs from allocs whose
// DesiredStatus equals status, in their original order. The returned slice is
// never nil, even when no stub matches.
func filterAllocsByDesiredStatus(status string, allocs []*api.AllocationListStub) []*api.AllocationListStub {
	matching := make([]*api.AllocationListStub, 0)

	for i := range allocs {
		stub := allocs[i]
		if stub.DesiredStatus != status {
			continue
		}
		matching = append(matching, stub)
	}

	return matching
}
5 changes: 3 additions & 2 deletions nomad/job_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -1027,8 +1027,9 @@ func (j *Job) Scale(args *structs.JobScaleRequest, reply *structs.JobRegisterRes
if job == nil {
return structs.NewErrRPCCoded(404, fmt.Sprintf("job %q not found", args.JobID))
}
if job.Type == structs.JobTypeSystem {
return structs.NewErrRPCCoded(http.StatusBadRequest, `cannot scale jobs of type "system"`)

if job.Type == structs.JobTypeSystem && *args.Count > 1 {
mismithhisler marked this conversation as resolved.
Show resolved Hide resolved
return structs.NewErrRPCCoded(http.StatusBadRequest, `jobs of type "system" can only be scaled between 0 and 1`)
}

// Since job is going to be mutated we must copy it since state store methods
Expand Down
29 changes: 26 additions & 3 deletions nomad/job_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8271,20 +8271,43 @@ func TestJobEndpoint_Scale_SystemJob(t *testing.T) {
mockSystemJob := mock.SystemJob()
must.NoError(t, state.UpsertJob(structs.MsgTypeTestSetup, 10, nil, mockSystemJob))

// Scale to 0
scaleReq := &structs.JobScaleRequest{
JobID: mockSystemJob.ID,
Target: map[string]string{
structs.ScalingTargetGroup: mockSystemJob.TaskGroups[0].Name,
},
Count: pointer.Of(int64(13)),
Count: pointer.Of(int64(0)),
WriteRequest: structs.WriteRequest{
Region: DefaultRegion,
Namespace: mockSystemJob.Namespace,
},
}
var resp structs.JobRegisterResponse

resp := structs.JobRegisterResponse{}
err := msgpackrpc.CallWithCodec(codec, "Job.Scale", scaleReq, &resp)
must.NoError(t, err)

// Scale to a negative number
scaleReq.Count = pointer.Of(int64(-5))

resp = structs.JobRegisterResponse{}
must.ErrorContains(t, msgpackrpc.CallWithCodec(codec, "Job.Scale", scaleReq, &resp),
`400,scaling action count can't be negative`)

// Scale back to 1
scaleReq.Count = pointer.Of(int64(1))

resp = structs.JobRegisterResponse{}
err = msgpackrpc.CallWithCodec(codec, "Job.Scale", scaleReq, &resp)
must.NoError(t, err)

// Scale beyond 1
scaleReq.Count = pointer.Of(int64(13))

resp = structs.JobRegisterResponse{}
must.ErrorContains(t, msgpackrpc.CallWithCodec(codec, "Job.Scale", scaleReq, &resp),
`400,cannot scale jobs of type "system"`)
`400,jobs of type "system" can only be scaled between 0 and 1`)
}

func TestJobEndpoint_Scale_BatchJob(t *testing.T) {
Expand Down
4 changes: 0 additions & 4 deletions nomad/structs/structs.go
Original file line number Diff line number Diff line change
Expand Up @@ -7055,10 +7055,6 @@ func (tg *TaskGroup) Canonicalize(job *Job) {
tg.EphemeralDisk = DefaultEphemeralDisk()
}

if job.Type == JobTypeSystem && tg.Count == 0 {
tg.Count = 1
}

if tg.Scaling != nil {
tg.Scaling.Canonicalize(job, tg, nil)
}
Expand Down