tests: update disconnected client scheduler tests to avoid blocking

While working on #20462, I discovered that some of the scheduler tests for
disconnected clients were making long blocking queries. The tests used
`testutil.WaitForResult` to wait for an evaluation to be written to the state
store. The evaluation was never written, but the tests did not correctly return
an error for the empty query result, so they blocked for 5s and then continued
anyway.

In practice, the evaluation is never written to the state store as part of the
test harness `Process` method, so this test assertion was meaningless. Remove
the broken assertion from the two top-level tests that used it, and upgrade
these tests to use `shoenig/test` in the process. This will save us ~50s per
test run.
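
For reference, the broken pattern looked roughly like the sketch below. It is
adapted from the removed assertion, not a verbatim or standalone test:
`testutil.WaitForResult` retries the callback for roughly 5s and, if it never
succeeds, passes the callback's last error to the error handler. Because the
callback returned `(false, nil)` when the eval was missing, the handler
received a nil error, `require.NoError` passed, and the test carried on as if
the wait had succeeded.

// Sketch of the removed wait (adapted, not verbatim). The eval is never
// written, so the callback keeps returning (false, nil); after ~5s
// WaitForResult hands that nil error to the error handler below.
ws := memdb.NewWatchSet()
testutil.WaitForResult(func() (bool, error) {
found, err := h.State.EvalByID(ws, e.ID)
if err != nil {
return false, err
}
if found == nil {
return false, nil // empty result, but no error reported
}
return true, nil
}, func(err error) {
require.NoError(t, err) // err is nil here, so this never fails
})
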
tgross committed May 16, 2024
1 parent c8c67da commit 70a9a8c
Showing 1 changed file with 69 additions and 124 deletions.
193 changes: 69 additions & 124 deletions scheduler/generic_sched_test.go
@@ -18,7 +18,6 @@ import (
"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/testutil"
"github.com/shoenig/test/must"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
@@ -3644,27 +3643,31 @@ func TestServiceSched_StopAfterClientDisconnect(t *testing.T) {
ci.Parallel(t)

cases := []struct {
name string
jobSpecFn func(*structs.Job)
when time.Time
rescheduled bool
}{
// Test using stop_after_client_disconnect, remove after it's deprecated in favor
// of Disconnect.StopOnClientAfter introduced in 1.8.0.
{
rescheduled: true,
name: "legacy no stop_after_client_disconnect with reschedule",
jobSpecFn: func(job *structs.Job) {
job.TaskGroups[0].Count = 1
job.TaskGroups[0].StopAfterClientDisconnect = nil
},
rescheduled: true,
},
{
name: "legacy stop_after_client_disconnect without reschedule",
jobSpecFn: func(job *structs.Job) {
job.TaskGroups[0].Count = 1
job.TaskGroups[0].StopAfterClientDisconnect = pointer.Of(1 * time.Second)
},
rescheduled: false,
},
{
name: "legacy stop_after_client_disconnect with reschedule",
jobSpecFn: func(job *structs.Job) {
job.TaskGroups[0].Count = 1
job.TaskGroups[0].StopAfterClientDisconnect = pointer.Of(1 * time.Second)
@@ -3673,6 +3676,7 @@ func TestServiceSched_StopAfterClientDisconnect(t *testing.T) {
rescheduled: true,
},
{
name: "legacy stop_after_client_disconnect reschedule later",
jobSpecFn: func(job *structs.Job) {
job.TaskGroups[0].Count = 1
job.TaskGroups[0].StopAfterClientDisconnect = pointer.Of(1 * time.Second)
@@ -3682,15 +3686,17 @@ func TestServiceSched_StopAfterClientDisconnect(t *testing.T) {
},
// Tests using the new disconnect block
{
rescheduled: true,
name: "no StopOnClientAfter with reschedule",
jobSpecFn: func(job *structs.Job) {
job.TaskGroups[0].Count = 1
job.TaskGroups[0].Disconnect = &structs.DisconnectStrategy{
StopOnClientAfter: nil,
}
},
rescheduled: true,
},
{
name: "StopOnClientAfter without reschedule",
jobSpecFn: func(job *structs.Job) {
job.TaskGroups[0].Count = 1
job.TaskGroups[0].Disconnect = &structs.DisconnectStrategy{
@@ -3700,6 +3706,7 @@ func TestServiceSched_StopAfterClientDisconnect(t *testing.T) {
rescheduled: false,
},
{
name: "StopOnClientAfter with reschedule",
jobSpecFn: func(job *structs.Job) {
job.TaskGroups[0].Count = 1
job.TaskGroups[0].Disconnect = &structs.DisconnectStrategy{
@@ -3710,6 +3717,7 @@ func TestServiceSched_StopAfterClientDisconnect(t *testing.T) {
rescheduled: true,
},
{
name: "StopOnClientAfter reschedule later",
jobSpecFn: func(job *structs.Job) {
job.TaskGroups[0].Count = 1
job.TaskGroups[0].Disconnect = &structs.DisconnectStrategy{
@@ -3722,18 +3730,18 @@ func TestServiceSched_StopAfterClientDisconnect(t *testing.T) {
}

for i, tc := range cases {
t.Run(fmt.Sprintf(""), func(t *testing.T) {
t.Run(tc.name, func(t *testing.T) {
h := NewHarness(t)

// Node, which is down
node := mock.Node()
node.Status = structs.NodeStatusDown
require.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), node))
must.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), node))

job := mock.Job()

tc.jobSpecFn(job)
require.NoError(t, h.State.UpsertJob(structs.MsgTypeTestSetup, h.NextIndex(), nil, job))
must.NoError(t, h.State.UpsertJob(structs.MsgTypeTestSetup, h.NextIndex(), nil, job))

// Alloc for the running group
alloc := mock.Alloc()
@@ -3751,7 +3759,7 @@ func TestServiceSched_StopAfterClientDisconnect(t *testing.T) {
}}
}
allocs := []*structs.Allocation{alloc}
require.NoError(t, h.State.UpsertAllocs(structs.MsgTypeTestSetup, h.NextIndex(), allocs))
must.NoError(t, h.State.UpsertAllocs(structs.MsgTypeTestSetup, h.NextIndex(), allocs))

// Create a mock evaluation to deal with drain
evals := []*structs.Evaluation{{
@@ -3764,93 +3772,69 @@ func TestServiceSched_StopAfterClientDisconnect(t *testing.T) {
Status: structs.EvalStatusPending,
}}
eval := evals[0]
require.NoError(t, h.State.UpsertEvals(structs.MsgTypeTestSetup, h.NextIndex(), evals))
must.NoError(t, h.State.UpsertEvals(structs.MsgTypeTestSetup, h.NextIndex(), evals))

// Process the evaluation
err := h.Process(NewServiceScheduler, eval)
require.NoError(t, err)
require.Equal(t, h.Evals[0].Status, structs.EvalStatusComplete)
require.Len(t, h.Plans, 1, "plan")
must.NoError(t, err)
must.Eq(t, h.Evals[0].Status, structs.EvalStatusComplete)
must.Len(t, 1, h.Plans, must.Sprint("expected a plan"))

// One followup eval created, either delayed or blocked
require.Len(t, h.CreateEvals, 1)
must.Len(t, 1, h.CreateEvals)
e := h.CreateEvals[0]
require.Equal(t, eval.ID, e.PreviousEval)
must.Eq(t, eval.ID, e.PreviousEval)

if tc.rescheduled {
require.Equal(t, "blocked", e.Status)
must.Eq(t, "blocked", e.Status)
} else {
require.Equal(t, "pending", e.Status)
require.NotEmpty(t, e.WaitUntil)
must.Eq(t, "pending", e.Status)
must.NotEq(t, time.Time{}, e.WaitUntil)
}

// This eval is still being inserted in the state store
ws := memdb.NewWatchSet()
testutil.WaitForResult(func() (bool, error) {
found, err := h.State.EvalByID(ws, e.ID)
if err != nil {
return false, err
}
if found == nil {
return false, nil
}
return true, nil
}, func(err error) {
require.NoError(t, err)
})

alloc, err = h.State.AllocByID(ws, alloc.ID)
require.NoError(t, err)
alloc, err = h.State.AllocByID(nil, alloc.ID)
must.NoError(t, err)

// Allocations have been transitioned to lost
require.Equal(t, structs.AllocDesiredStatusStop, alloc.DesiredStatus)
require.Equal(t, structs.AllocClientStatusLost, alloc.ClientStatus)
must.Eq(t, structs.AllocDesiredStatusStop, alloc.DesiredStatus)
must.Eq(t, structs.AllocClientStatusLost, alloc.ClientStatus)
// At least 1, 2 if we manually set the tc.when
require.NotEmpty(t, alloc.AllocStates)
must.SliceNotEmpty(t, alloc.AllocStates)

if tc.rescheduled {
// Register a new node, leave it up, process the followup eval
node = mock.Node()
require.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), node))
require.NoError(t, h.Process(NewServiceScheduler, eval))

as, err := h.State.AllocsByJob(ws, job.Namespace, job.ID, false)
require.NoError(t, err)

testutil.WaitForResult(func() (bool, error) {
as, err := h.State.AllocsByJob(ws, job.Namespace, job.ID, false)
if err != nil {
return false, err
}
return len(as) == 2, nil
}, func(err error) {
require.NoError(t, err)
})
must.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), node))
must.NoError(t, h.Process(NewServiceScheduler, eval))

as, err := h.State.AllocsByJob(nil, job.Namespace, job.ID, false)
must.NoError(t, err)
must.Len(t, 2, as)

a2 := as[0]
if a2.ID == alloc.ID {
a2 = as[1]
}

require.Equal(t, structs.AllocClientStatusPending, a2.ClientStatus)
require.Equal(t, structs.AllocDesiredStatusRun, a2.DesiredStatus)
require.Equal(t, node.ID, a2.NodeID)
must.Eq(t, structs.AllocClientStatusPending, a2.ClientStatus)
must.Eq(t, structs.AllocDesiredStatusRun, a2.DesiredStatus)
must.Eq(t, node.ID, a2.NodeID)

// No blocked evals
require.Empty(t, h.ReblockEvals)
require.Len(t, h.CreateEvals, 1)
require.Equal(t, h.CreateEvals[0].ID, e.ID)
must.SliceEmpty(t, h.ReblockEvals)
must.Len(t, 1, h.CreateEvals)
must.Eq(t, h.CreateEvals[0].ID, e.ID)
} else {
// No new alloc was created
as, err := h.State.AllocsByJob(ws, job.Namespace, job.ID, false)
require.NoError(t, err)
as, err := h.State.AllocsByJob(nil, job.Namespace, job.ID, false)
must.NoError(t, err)

require.Len(t, as, 1)
must.Len(t, 1, as)
old := as[0]

require.Equal(t, alloc.ID, old.ID)
require.Equal(t, structs.AllocClientStatusLost, old.ClientStatus)
require.Equal(t, structs.AllocDesiredStatusStop, old.DesiredStatus)
must.Eq(t, alloc.ID, old.ID)
must.Eq(t, structs.AllocClientStatusLost, old.ClientStatus)
must.Eq(t, structs.AllocDesiredStatusStop, old.DesiredStatus)
}
})
}
@@ -7307,14 +7291,14 @@ func TestServiceSched_Client_Disconnect_Creates_Updates_and_Evals(t *testing.T)

job := version.jobSpec(maxClientDisconnect)
job.TaskGroups[0].Count = count
require.NoError(t, h.State.UpsertJob(structs.MsgTypeTestSetup, h.NextIndex(), nil, job))
must.NoError(t, h.State.UpsertJob(structs.MsgTypeTestSetup, h.NextIndex(), nil, job))

disconnectedNode, job, unknownAllocs := initNodeAndAllocs(t, h, job,
structs.NodeStatusReady, structs.AllocClientStatusRunning)

// Now disconnect the node
disconnectedNode.Status = structs.NodeStatusDisconnected
require.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), disconnectedNode))
must.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), disconnectedNode))

// Create an evaluation triggered by the disconnect
evals := []*structs.Evaluation{{
@@ -7328,88 +7312,49 @@ func TestServiceSched_Client_Disconnect_Creates_Updates_and_Evals(t *testing.T)
}}

nodeStatusUpdateEval := evals[0]
require.NoError(t, h.State.UpsertEvals(structs.MsgTypeTestSetup, h.NextIndex(), evals))
must.NoError(t, h.State.UpsertEvals(structs.MsgTypeTestSetup, h.NextIndex(), evals))

// Process the evaluation
err := h.Process(NewServiceScheduler, nodeStatusUpdateEval)
require.NoError(t, err)
require.Equal(t, structs.EvalStatusComplete, h.Evals[0].Status)
require.Len(t, h.Plans, 1, "plan")
must.NoError(t, err)
must.Eq(t, structs.EvalStatusComplete, h.Evals[0].Status)
must.Len(t, 1, h.Plans, must.Sprint("expected a plan"))

// Two followup delayed eval created
require.Len(t, h.CreateEvals, 2)
must.Len(t, 2, h.CreateEvals)
followUpEval1 := h.CreateEvals[0]
require.Equal(t, nodeStatusUpdateEval.ID, followUpEval1.PreviousEval)
require.Equal(t, "pending", followUpEval1.Status)
require.NotEmpty(t, followUpEval1.WaitUntil)
must.Eq(t, nodeStatusUpdateEval.ID, followUpEval1.PreviousEval)
must.Eq(t, "pending", followUpEval1.Status)
must.NotEq(t, time.Time{}, followUpEval1.WaitUntil)

followUpEval2 := h.CreateEvals[1]
require.Equal(t, nodeStatusUpdateEval.ID, followUpEval2.PreviousEval)
require.Equal(t, "pending", followUpEval2.Status)
require.NotEmpty(t, followUpEval2.WaitUntil)

// Insert eval1 in the state store
testutil.WaitForResult(func() (bool, error) {
found, err := h.State.EvalByID(nil, followUpEval1.ID)
if err != nil {
return false, err
}
if found == nil {
return false, nil
}

require.Equal(t, nodeStatusUpdateEval.ID, found.PreviousEval)
require.Equal(t, "pending", found.Status)
require.NotEmpty(t, found.WaitUntil)

return true, nil
}, func(err error) {

require.NoError(t, err)
})

// Insert eval2 in the state store
testutil.WaitForResult(func() (bool, error) {
found, err := h.State.EvalByID(nil, followUpEval2.ID)
if err != nil {
return false, err
}
if found == nil {
return false, nil
}

require.Equal(t, nodeStatusUpdateEval.ID, found.PreviousEval)
require.Equal(t, "pending", found.Status)
require.NotEmpty(t, found.WaitUntil)

return true, nil
}, func(err error) {

require.NoError(t, err)
})
must.Eq(t, nodeStatusUpdateEval.ID, followUpEval2.PreviousEval)
must.Eq(t, "pending", followUpEval2.Status)
must.NotEq(t, time.Time{}, followUpEval2.WaitUntil)

// Validate that the ClientStatus updates are part of the plan.
require.Len(t, h.Plans[0].NodeAllocation[disconnectedNode.ID], count)
// Pending update should have unknown status.
must.Len(t, count, h.Plans[0].NodeAllocation[disconnectedNode.ID])

// Pending update should have unknown status.
for _, nodeAlloc := range h.Plans[0].NodeAllocation[disconnectedNode.ID] {
require.Equal(t, nodeAlloc.ClientStatus, structs.AllocClientStatusUnknown)
}

// Simulate that NodeAllocation got processed.
err = h.State.UpsertAllocs(structs.MsgTypeTestSetup, h.NextIndex(), h.Plans[0].NodeAllocation[disconnectedNode.ID])
require.NoError(t, err, "plan.NodeUpdate")
must.NoError(t, h.State.UpsertAllocs(
structs.MsgTypeTestSetup, h.NextIndex(),
h.Plans[0].NodeAllocation[disconnectedNode.ID]))

// Validate that the StateStore Upsert applied the ClientStatus we specified.

for _, alloc := range unknownAllocs {
alloc, err = h.State.AllocByID(nil, alloc.ID)
require.NoError(t, err)
require.Equal(t, alloc.ClientStatus, structs.AllocClientStatusUnknown)
must.NoError(t, err)
must.Eq(t, alloc.ClientStatus, structs.AllocClientStatusUnknown)

// Allocations have been transitioned to unknown
require.Equal(t, structs.AllocDesiredStatusRun, alloc.DesiredStatus)
require.Equal(t, structs.AllocClientStatusUnknown, alloc.ClientStatus)
must.Eq(t, structs.AllocDesiredStatusRun, alloc.DesiredStatus)
must.Eq(t, structs.AllocClientStatusUnknown, alloc.ClientStatus)
}
})
}