Backport of tests: update disconnected client scheduler tests to avoid blocking into release/1.7.x (#20617)

While working on #20462, I discovered that some of the scheduler tests for
disconnected clients were making long blocking queries. The tests used
`testutil.WaitForResult` to wait for an evaluation to be written to the state
store. The evaluation was never written, but the test callback did not return
an error for the empty query result, so the tests blocked for 5s and then
continued anyway.
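
The failure mode is easy to reproduce in isolation. The sketch below is not
the test code itself: `waitForResult` is a minimal stand-in that mimics the
shape of `testutil.WaitForResult` (retry a test function until it returns
true or the retry budget runs out, then pass the last error seen to the
failure callback). The 10ms poll interval is an illustrative assumption; the
~5s budget matches the blocking time described above.

package main

import (
	"fmt"
	"time"
)

// waitForResult is an illustrative stand-in for testutil.WaitForResult,
// not Nomad's implementation: retry test() until it returns true or the
// budget is exhausted, then hand the last error seen to the failure
// callback.
func waitForResult(test func() (bool, error), onFail func(error)) {
	deadline := time.Now().Add(5 * time.Second)
	var lastErr error
	for time.Now().Before(deadline) {
		ok, err := test()
		if ok {
			return
		}
		lastErr = err
		time.Sleep(10 * time.Millisecond)
	}
	onFail(lastErr)
}

func main() {
	waitForResult(func() (bool, error) {
		// The eval is never written, so the lookup finds nothing --
		// and reports no error.
		return false, nil
	}, func(err error) {
		// err is nil here, so a require.NoError-style check passes
		// and the test continues after blocking for ~5 seconds.
		fmt.Println("timed out; last error:", err)
	})
}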

In practice, the evaluation is never written to the state store as part of the
test harness `Process` method, so this test assertion was meaningless. Remove
the broken assertion from the two top-level tests that used it, and upgrade
these tests to use `shoenig/test` in the process. This will save us ~50s per
test run.
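
For review context, the diff converts testify assertions to their
`shoenig/test/must` counterparts. The main trap is argument order on length
checks: testify takes the collection first, while `must.Len` takes the
expected length first. A rough mapping, drawn from the changes below:

	require.NoError(t, err)            -> must.NoError(t, err)
	require.Equal(t, exp, got)         -> must.Eq(t, exp, got)
	require.Len(t, coll, 2)            -> must.Len(t, 2, coll)   // expected length first
	require.Empty(t, coll)             -> must.SliceEmpty(t, coll)
	require.NotEmpty(t, coll)          -> must.SliceNotEmpty(t, coll)
	require.NotEmpty(t, e.WaitUntil)   -> must.NotEq(t, time.Time{}, e.WaitUntil)  // time.Time, not a slice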

Co-authored-by: Tim Gross <[email protected]>
hc-github-team-nomad-core and tgross authored May 16, 2024
1 parent ece96b6 commit a12eefb
Showing 1 changed file with 73 additions and 121 deletions.
194 changes: 73 additions & 121 deletions scheduler/generic_sched_test.go
@@ -18,7 +18,6 @@ import (
"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/testutil"
"github.com/shoenig/test/must"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
@@ -3644,43 +3643,60 @@ func TestServiceSched_StopAfterClientDisconnect(t *testing.T) {
ci.Parallel(t)

cases := []struct {
-stop time.Duration
+name string
+jobSpecFn func(*structs.Job)
when time.Time
rescheduled bool
}{
{
name: "legacy no stop_after_client_disconnect with reschedule",
jobSpecFn: func(job *structs.Job) {
job.TaskGroups[0].Count = 1
job.TaskGroups[0].StopAfterClientDisconnect = nil
},
rescheduled: true,
},
{
-stop: 1 * time.Second,
+name: "legacy stop_after_client_disconnect without reschedule",
+jobSpecFn: func(job *structs.Job) {
+job.TaskGroups[0].Count = 1
+job.TaskGroups[0].StopAfterClientDisconnect = pointer.Of(1 * time.Second)
+},
rescheduled: false,
},
{
-stop: 1 * time.Second,
+name: "legacy stop_after_client_disconnect with reschedule",
+jobSpecFn: func(job *structs.Job) {
+job.TaskGroups[0].Count = 1
+job.TaskGroups[0].StopAfterClientDisconnect = pointer.Of(1 * time.Second)
+},
when: time.Now().UTC().Add(-10 * time.Second),
rescheduled: true,
},
{
-stop: 1 * time.Second,
+name: "legacy stop_after_client_disconnect reschedule later",
+jobSpecFn: func(job *structs.Job) {
+job.TaskGroups[0].Count = 1
+job.TaskGroups[0].StopAfterClientDisconnect = pointer.Of(1 * time.Second)
+},
when: time.Now().UTC().Add(10 * time.Minute),
rescheduled: false,
},
}

-for i, tc := range cases {
-t.Run(fmt.Sprintf("%d", i), func(t *testing.T) {
+for _, tc := range cases {
+t.Run(tc.name, func(t *testing.T) {
h := NewHarness(t)

// Node, which is down
node := mock.Node()
node.Status = structs.NodeStatusDown
-require.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), node))
+must.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), node))

// Job with allocations and stop_after_client_disconnect
job := mock.Job()
-job.TaskGroups[0].Count = 1
-job.TaskGroups[0].StopAfterClientDisconnect = &tc.stop
-require.NoError(t, h.State.UpsertJob(structs.MsgTypeTestSetup, h.NextIndex(), nil, job))
+tc.jobSpecFn(job)
+must.NoError(t, h.State.UpsertJob(structs.MsgTypeTestSetup, h.NextIndex(), nil, job))

// Alloc for the running group
alloc := mock.Alloc()
@@ -3698,7 +3714,7 @@ func TestServiceSched_StopAfterClientDisconnect(t *testing.T) {
}}
}
allocs := []*structs.Allocation{alloc}
-require.NoError(t, h.State.UpsertAllocs(structs.MsgTypeTestSetup, h.NextIndex(), allocs))
+must.NoError(t, h.State.UpsertAllocs(structs.MsgTypeTestSetup, h.NextIndex(), allocs))

// Create a mock evaluation to deal with drain
evals := []*structs.Evaluation{{
@@ -3711,93 +3727,69 @@ func TestServiceSched_StopAfterClientDisconnect(t *testing.T) {
Status: structs.EvalStatusPending,
}}
eval := evals[0]
-require.NoError(t, h.State.UpsertEvals(structs.MsgTypeTestSetup, h.NextIndex(), evals))
+must.NoError(t, h.State.UpsertEvals(structs.MsgTypeTestSetup, h.NextIndex(), evals))

// Process the evaluation
err := h.Process(NewServiceScheduler, eval)
-require.NoError(t, err)
-require.Equal(t, h.Evals[0].Status, structs.EvalStatusComplete)
-require.Len(t, h.Plans, 1, "plan")
+must.NoError(t, err)
+must.Eq(t, h.Evals[0].Status, structs.EvalStatusComplete)
+must.Len(t, 1, h.Plans, must.Sprint("expected a plan"))

// One followup eval created, either delayed or blocked
-require.Len(t, h.CreateEvals, 1)
+must.Len(t, 1, h.CreateEvals)
e := h.CreateEvals[0]
-require.Equal(t, eval.ID, e.PreviousEval)
+must.Eq(t, eval.ID, e.PreviousEval)

if tc.rescheduled {
require.Equal(t, "blocked", e.Status)
must.Eq(t, "blocked", e.Status)
} else {
require.Equal(t, "pending", e.Status)
require.NotEmpty(t, e.WaitUntil)
must.Eq(t, "pending", e.Status)
must.NotEq(t, time.Time{}, e.WaitUntil)
}

-// This eval is still being inserted in the state store
-ws := memdb.NewWatchSet()
-testutil.WaitForResult(func() (bool, error) {
-found, err := h.State.EvalByID(ws, e.ID)
-if err != nil {
-return false, err
-}
-if found == nil {
-return false, nil
-}
-return true, nil
-}, func(err error) {
-require.NoError(t, err)
-})
-
-alloc, err = h.State.AllocByID(ws, alloc.ID)
-require.NoError(t, err)
+alloc, err = h.State.AllocByID(nil, alloc.ID)
+must.NoError(t, err)

// Allocations have been transitioned to lost
-require.Equal(t, structs.AllocDesiredStatusStop, alloc.DesiredStatus)
-require.Equal(t, structs.AllocClientStatusLost, alloc.ClientStatus)
+must.Eq(t, structs.AllocDesiredStatusStop, alloc.DesiredStatus)
+must.Eq(t, structs.AllocClientStatusLost, alloc.ClientStatus)
// At least 1, 2 if we manually set the tc.when
-require.NotEmpty(t, alloc.AllocStates)
+must.SliceNotEmpty(t, alloc.AllocStates)

if tc.rescheduled {
// Register a new node, leave it up, process the followup eval
node = mock.Node()
-require.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), node))
-require.NoError(t, h.Process(NewServiceScheduler, eval))
-
-as, err := h.State.AllocsByJob(ws, job.Namespace, job.ID, false)
-require.NoError(t, err)
-
-testutil.WaitForResult(func() (bool, error) {
-as, err := h.State.AllocsByJob(ws, job.Namespace, job.ID, false)
-if err != nil {
-return false, err
-}
-return len(as) == 2, nil
-}, func(err error) {
-require.NoError(t, err)
-})
+must.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), node))
+must.NoError(t, h.Process(NewServiceScheduler, eval))
+
+as, err := h.State.AllocsByJob(nil, job.Namespace, job.ID, false)
+must.NoError(t, err)
+must.Len(t, 2, as)

a2 := as[0]
if a2.ID == alloc.ID {
a2 = as[1]
}

-require.Equal(t, structs.AllocClientStatusPending, a2.ClientStatus)
-require.Equal(t, structs.AllocDesiredStatusRun, a2.DesiredStatus)
-require.Equal(t, node.ID, a2.NodeID)
+must.Eq(t, structs.AllocClientStatusPending, a2.ClientStatus)
+must.Eq(t, structs.AllocDesiredStatusRun, a2.DesiredStatus)
+must.Eq(t, node.ID, a2.NodeID)

// No blocked evals
-require.Empty(t, h.ReblockEvals)
-require.Len(t, h.CreateEvals, 1)
-require.Equal(t, h.CreateEvals[0].ID, e.ID)
+must.SliceEmpty(t, h.ReblockEvals)
+must.Len(t, 1, h.CreateEvals)
+must.Eq(t, h.CreateEvals[0].ID, e.ID)
} else {
// No new alloc was created
-as, err := h.State.AllocsByJob(ws, job.Namespace, job.ID, false)
-require.NoError(t, err)
+as, err := h.State.AllocsByJob(nil, job.Namespace, job.ID, false)
+must.NoError(t, err)

-require.Len(t, as, 1)
+must.Len(t, 1, as)
old := as[0]

-require.Equal(t, alloc.ID, old.ID)
-require.Equal(t, structs.AllocClientStatusLost, old.ClientStatus)
-require.Equal(t, structs.AllocDesiredStatusStop, old.DesiredStatus)
+must.Eq(t, alloc.ID, old.ID)
+must.Eq(t, structs.AllocClientStatusLost, old.ClientStatus)
+must.Eq(t, structs.AllocDesiredStatusStop, old.DesiredStatus)
}
})
}
@@ -7226,7 +7218,7 @@ func TestServiceSched_Client_Disconnect_Creates_Updates_and_Evals(t *testing.T)

// Now disconnect the node
disconnectedNode.Status = structs.NodeStatusDisconnected
-require.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), disconnectedNode))
+must.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), disconnectedNode))

// Create an evaluation triggered by the disconnect
evals := []*structs.Evaluation{{
@@ -7240,72 +7232,32 @@ func TestServiceSched_Client_Disconnect_Creates_Updates_and_Evals(t *testing.T)
}}

nodeStatusUpdateEval := evals[0]
-require.NoError(t, h.State.UpsertEvals(structs.MsgTypeTestSetup, h.NextIndex(), evals))
+must.NoError(t, h.State.UpsertEvals(structs.MsgTypeTestSetup, h.NextIndex(), evals))

// Process the evaluation
err := h.Process(NewServiceScheduler, nodeStatusUpdateEval)
-require.NoError(t, err)
-require.Equal(t, structs.EvalStatusComplete, h.Evals[0].Status)
-require.Len(t, h.Plans, 1, "plan")
+must.NoError(t, err)
+must.Eq(t, structs.EvalStatusComplete, h.Evals[0].Status)
+must.Len(t, 1, h.Plans, must.Sprint("expected plan"))

// Two followup delayed eval created
-require.Len(t, h.CreateEvals, 2)
+must.Len(t, 2, h.CreateEvals)
followUpEval1 := h.CreateEvals[0]
-require.Equal(t, nodeStatusUpdateEval.ID, followUpEval1.PreviousEval)
-require.Equal(t, "pending", followUpEval1.Status)
-require.NotEmpty(t, followUpEval1.WaitUntil)
+must.Eq(t, nodeStatusUpdateEval.ID, followUpEval1.PreviousEval)
+must.Eq(t, "pending", followUpEval1.Status)
+must.NotEq(t, time.Time{}, followUpEval1.WaitUntil)

followUpEval2 := h.CreateEvals[1]
-require.Equal(t, nodeStatusUpdateEval.ID, followUpEval2.PreviousEval)
-require.Equal(t, "pending", followUpEval2.Status)
-require.NotEmpty(t, followUpEval2.WaitUntil)
-
-// Insert eval1 in the state store
-testutil.WaitForResult(func() (bool, error) {
-found, err := h.State.EvalByID(nil, followUpEval1.ID)
-if err != nil {
-return false, err
-}
-if found == nil {
-return false, nil
-}
-
-require.Equal(t, nodeStatusUpdateEval.ID, found.PreviousEval)
-require.Equal(t, "pending", found.Status)
-require.NotEmpty(t, found.WaitUntil)
-
-return true, nil
-}, func(err error) {
-
-require.NoError(t, err)
-})
-
-// Insert eval2 in the state store
-testutil.WaitForResult(func() (bool, error) {
-found, err := h.State.EvalByID(nil, followUpEval2.ID)
-if err != nil {
-return false, err
-}
-if found == nil {
-return false, nil
-}
-
-require.Equal(t, nodeStatusUpdateEval.ID, found.PreviousEval)
-require.Equal(t, "pending", found.Status)
-require.NotEmpty(t, found.WaitUntil)
-
-return true, nil
-}, func(err error) {
-
-require.NoError(t, err)
-})
+must.Eq(t, nodeStatusUpdateEval.ID, followUpEval2.PreviousEval)
+must.Eq(t, "pending", followUpEval2.Status)
+must.NotEq(t, time.Time{}, followUpEval2.WaitUntil)

// Validate that the ClientStatus updates are part of the plan.
-require.Len(t, h.Plans[0].NodeAllocation[disconnectedNode.ID], count)
+must.Len(t, count, h.Plans[0].NodeAllocation[disconnectedNode.ID])
// Pending update should have unknown status.

for _, nodeAlloc := range h.Plans[0].NodeAllocation[disconnectedNode.ID] {
-require.Equal(t, nodeAlloc.ClientStatus, structs.AllocClientStatusUnknown)
+must.Eq(t, nodeAlloc.ClientStatus, structs.AllocClientStatusUnknown)
}

// Simulate that NodeAllocation got processed.