Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prevent pruning and reaping of TaggedVersion jobs #23983

Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions nomad/core_sched.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,17 @@ OUTER:

// Job is eligible for garbage collection
if allEvalsGC {
// if any version of the job is tagged, it should be kept
versions, err := c.snap.JobVersionsByID(ws, job.Namespace, job.ID)
if err != nil {
c.logger.Error("job GC failed to get versions for job", "job", job.ID, "error", err)
continue
}
for _, v := range versions {
if v.TaggedVersion != nil {
continue OUTER
}
}
gcJob = append(gcJob, job)
gcAlloc = append(gcAlloc, jobAlloc...)
gcEval = append(gcEval, jobEval...)
Expand Down
88 changes: 88 additions & 0 deletions nomad/core_sched_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -610,6 +610,94 @@ func TestCoreScheduler_EvalGC_Batch(t *testing.T) {
})
}

// A job that has any of its versions tagged should not be GC-able.
func TestCoreScheduler_EvalGC_JobTaggedVersion(t *testing.T) {
ci.Parallel(t)

s1, cleanupS1 := TestServer(t, nil)
defer cleanupS1()
testutil.WaitForLeader(t, s1.RPC)

store := s1.fsm.State()
job := mock.MinJob()
job.Stop = true // to be GC-able

// to be GC-able, the job needs an associated eval with a terminal Status,
// so that the job gets considered "dead" and not "pending"
// NOTE: this needs to come before UpsertJob for some Mystery Reason
// (otherwise job Status ends up as "pending" later)
eval := mock.Eval()
eval.JobID = job.ID
eval.Status = structs.EvalStatusComplete
must.NoError(t, store.UpsertEvals(structs.MsgTypeTestSetup, 999, []*structs.Evaluation{eval}))
// upsert a couple versions of the job, so the "jobs" table has one
// and the "job_version" table has two.
must.NoError(t, store.UpsertJob(structs.MsgTypeTestSetup, 1000, nil, job.Copy()))
must.NoError(t, store.UpsertJob(structs.MsgTypeTestSetup, 1001, nil, job.Copy()))

jobExists := func(t *testing.T) bool {
t.Helper()
// any job at all
jobs, err := store.Jobs(nil, state.SortDefault)
must.NoError(t, err, must.Sprint("error getting jobs"))
return jobs.Next() != nil
}
forceGC := func(t *testing.T) {
t.Helper()
snap, err := store.Snapshot()
must.NoError(t, err)
core := NewCoreScheduler(s1, snap)

idx, err := store.LatestIndex()
must.NoError(t, err)
gc := s1.coreJobEval(structs.CoreJobForceGC, idx+1)

must.NoError(t, core.Process(gc))
}

applyTag := func(t *testing.T, idx, version uint64, name, desc string) {
t.Helper()
must.NoError(t, store.UpdateJobVersionTag(idx, job.Namespace,
&structs.JobApplyTagRequest{
JobID: job.ID,
Name: name,
Tag: &structs.JobTaggedVersion{
Name: name,
Description: desc,
},
Version: version,
}))
}
unsetTag := func(t *testing.T, idx uint64, name string) {
t.Helper()
must.NoError(t, store.UpdateJobVersionTag(idx, job.Namespace,
&structs.JobApplyTagRequest{
JobID: job.ID,
Name: name,
Tag: nil, // this triggers the deletion
}))
}

// tagging the latest version (latest of the 2 jobs, 0 and 1, is 1)
// will tag the job in the "jobs" table, which should protect from GC
applyTag(t, 2000, 1, "v1", "version 1")
forceGC(t)
must.True(t, jobExists(t), must.Sprint("latest job version being tagged should protect from GC"))

// untagging latest and tagging the oldest (only one in "job_version" table)
// should also protect from GC
unsetTag(t, 3000, "v1")
applyTag(t, 3001, 0, "v0", "version 0")
forceGC(t)
must.True(t, jobExists(t), must.Sprint("old job version being tagged should protect from GC"))

//untagging v0 should leave no tags left, so GC should delete the job
//and all its versions
unsetTag(t, 4000, "v0")
forceGC(t)
must.False(t, jobExists(t), must.Sprint("all tags being removed should enable GC"))
}

func TestCoreScheduler_EvalGC_Partial(t *testing.T) {
ci.Parallel(t)

Expand Down
5 changes: 5 additions & 0 deletions nomad/state/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -372,6 +372,11 @@ func jobIsGCable(obj interface{}) (bool, error) {
return false, fmt.Errorf("Unexpected type: %v", obj)
}

// job versions that are tagged should be kept
if j.TaggedVersion != nil {
return false, nil
}

// If the job is periodic or parameterized it is only garbage collectable if
// it is stopped.
periodic := j.Periodic != nil && j.Periodic.Enabled
Expand Down
8 changes: 8 additions & 0 deletions nomad/state/schema_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,14 @@ func Test_jobIsGCable(t *testing.T) {
expectedOutput: true,
expectedOutputError: nil,
},
{
name: "tagged",
inputObj: &structs.Job{
TaggedVersion: &structs.JobTaggedVersion{Name: "any"},
},
expectedOutput: false,
expectedOutputError: nil,
},
}

for _, tc := range testCases {
Expand Down
19 changes: 15 additions & 4 deletions nomad/state/state_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -2188,10 +2188,21 @@ func (s *StateStore) upsertJobVersion(index uint64, job *structs.Job, txn *txn)
all[max-1], all[max] = all[max], all[max-1]
}

// Delete the job outside of the set that are being kept.
d := all[max]
if err := txn.Delete("job_version", d); err != nil {
return fmt.Errorf("failed to delete job %v (%d) from job_version", d.ID, d.Version)
// Find the oldest non-tagged version to delete
deleteIdx := -1
for i := len(all) - 1; i >= max; i-- {
if all[i].TaggedVersion == nil {
deleteIdx = i
break
}
}

// If we found a non-tagged version to delete, delete it
if deleteIdx != -1 {
d := all[deleteIdx]
if err := txn.Delete("job_version", d); err != nil {
return fmt.Errorf("failed to delete job %v (%d) from job_version", d.ID, d.Version)
}
}

return nil
Expand Down
105 changes: 105 additions & 0 deletions nomad/state/state_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2901,6 +2901,111 @@ func TestStateStore_DeleteJobTxn_BatchDeletes(t *testing.T) {
require.Equal(t, deletionIndex, index)
}

// TestStatestore_JobTaggedVersion tests that job versions which are tagged
// do not count against the configured server.job_tracked_versions count,
// do not get deleted when new versions are created,
// and *do* get deleted immediately when its tag is removed.
func TestStatestore_JobTaggedVersion(t *testing.T) {
ci.Parallel(t)

state := testStateStore(t)

job := mock.MinJob()
job.Stable = true

// add job and fill the versions bucket
state.config.JobTrackedVersions = 5
for range state.config.JobTrackedVersions {
must.NoError(t, state.UpsertJob(structs.MsgTypeTestSetup, nextIndex(state), nil, job.Copy()))
}

// tag versions 0-2
for x := range 3 {
v := uint64(x)
name := fmt.Sprintf("v%d", x)
desc := fmt.Sprintf("version %d", x)

// apply tag
must.NoError(t, state.UpdateJobVersionTag(nextIndex(state), job.Namespace, &structs.JobApplyTagRequest{
JobID: job.ID,
Name: name,
Tag: &structs.JobTaggedVersion{
Name: name,
Description: desc,
},
Version: v,
}))

// confirm
got, err := state.JobVersionByTagName(nil, job.Namespace, job.ID, name)
must.NoError(t, err)
must.Eq(t, name, got.TaggedVersion.Name)
must.Eq(t, desc, got.TaggedVersion.Description)
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We're testing the cases where we've got a "full" tracked versions when we tag, but not when we don't. It might make more sense to insert < max tracked, tag one, insert the rest, and then tag again. That'll make sure we've covered both paths.


// take a little detour to test error conditions with the setup so far
t.Run("errors", func(t *testing.T) {
// job does not exist
err := state.UpdateJobVersionTag(nextIndex(state), job.Namespace, &structs.JobApplyTagRequest{
JobID: "non-existent-job",
Tag: &structs.JobTaggedVersion{Name: "tag name"},
Version: 0,
})
must.ErrorContains(t, err, `job "non-existent-job" version 0 not found`)

// version does not exist
err = state.UpdateJobVersionTag(nextIndex(state), job.Namespace, &structs.JobApplyTagRequest{
JobID: job.ID,
Tag: &structs.JobTaggedVersion{Name: "tag name"},
Version: 999,
})
must.ErrorContains(t, err, fmt.Sprintf("job %q version 999 not found", job.ID))

// tag name already exists
err = state.UpdateJobVersionTag(nextIndex(state), job.Namespace, &structs.JobApplyTagRequest{
JobID: job.ID,
Tag: &structs.JobTaggedVersion{Name: "v0"},
Version: 3,
})
must.ErrorContains(t, err, fmt.Sprintf(`"v0" already exists on a different version of job %q`, job.ID))
})
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's hard to tell because the PRs for this feature have been running for a while now, but don't we already have test assertions for these state store methods in the PR that introduced them? Re-testing them doesn't seem useful.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah it's a bit tricky to hold it all in mind, but I don't believe any of the parent branches/PRs have any test coverage at this layer (I don't see state_store_test.go in other PRs), and when I started testing the "don't let tagged versions go away" logic, it felt natural to elaborate here since I'd already done the requisite setup.


// make some more jobs; they should replace 3-4 but leave 0-2 alone
for range 10 {
must.NoError(t, state.UpsertJob(structs.MsgTypeTestSetup, nextIndex(state), nil, job.Copy()))
}

assertVersions := func(t *testing.T, expect []uint64) {
t.Helper()
jobs, err := state.JobVersionsByID(nil, job.Namespace, job.ID)
must.NoError(t, err)
vs := make([]uint64, len(jobs))
for i, j := range jobs {
vs[i] = j.Version
}
must.Eq(t, expect, vs)
}

// there should be this many: JobTrackedVersions + # of tagged versions
// we tagged 3 of them, so 5 + 3 = 8
assertVersions(t, []uint64{14, 13, 12, 11, 10, 2, 1, 0})

// untag version 1 - it should get deleted immediately,
// since we have more than JobTrackedVersions.
must.NoError(t, state.UpdateJobVersionTag(nextIndex(state), job.Namespace, &structs.JobApplyTagRequest{
JobID: job.ID,
Name: "v1",
Tag: nil, // this triggers unset
}))
assertVersions(t, []uint64{14, 13, 12, 11, 10, 2, 0})

// deleting all versions should also delete tagged versions
txn := state.db.WriteTxn(nextIndex(state))
must.NoError(t, state.deleteJobVersions(nextIndex(state), job, txn))
must.NoError(t, txn.Commit())
assertVersions(t, []uint64{})
}

func TestStateStore_DeleteJob_MultipleVersions(t *testing.T) {
ci.Parallel(t)

Expand Down