Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Prevent pruning and reaping of TaggedVersion jobs #23983

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions nomad/core_sched.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,17 @@ OUTER:

// Job is eligible for garbage collection
if allEvalsGC {
// if any version of the job is tagged, it should be kept
versions, err := c.snap.JobVersionsByID(ws, job.Namespace, job.ID)
if err != nil {
c.logger.Error("job GC failed to get versions for job", "job", job.ID, "error", err)
continue
}
for _, v := range versions {
if v.TaggedVersion != nil {
continue OUTER
}
}
gcJob = append(gcJob, job)
gcAlloc = append(gcAlloc, jobAlloc...)
gcEval = append(gcEval, jobEval...)
Expand Down
88 changes: 88 additions & 0 deletions nomad/core_sched_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -610,6 +610,94 @@ func TestCoreScheduler_EvalGC_Batch(t *testing.T) {
})
}

// TestCoreScheduler_EvalGC_JobTaggedVersion verifies that a job with any
// tagged version is not garbage collected. A forced GC must keep the job
// while either its latest version or an older version carries a tag, and
// must remove the job once every tag has been unset.
func TestCoreScheduler_EvalGC_JobTaggedVersion(t *testing.T) {
ci.Parallel(t)

s1, cleanupS1 := TestServer(t, nil)
defer cleanupS1()
testutil.WaitForLeader(t, s1.RPC)

store := s1.fsm.State()
job := mock.MinJob()
job.Stop = true // to be GC-able

// To be GC-able, the job needs an associated eval with a terminal Status,
// so that the job gets considered "dead" and not "pending".
// NOTE: the eval must be upserted before UpsertJob; the reason is not
// understood, but otherwise the job Status ends up as "pending" later.
eval := mock.Eval()
eval.JobID = job.ID
eval.Status = structs.EvalStatusComplete
must.NoError(t, store.UpsertEvals(structs.MsgTypeTestSetup, 999, []*structs.Evaluation{eval}))
// Upsert a couple versions of the job, so the "jobs" table has one
// and the "job_version" table has two.
must.NoError(t, store.UpsertJob(structs.MsgTypeTestSetup, 1000, nil, job.Copy()))
must.NoError(t, store.UpsertJob(structs.MsgTypeTestSetup, 1001, nil, job.Copy()))

// jobExists reports whether the state store still contains any job at all;
// it does not look for this test's job specifically.
jobExists := func(t *testing.T) bool {
t.Helper()
// any job at all
jobs, err := store.Jobs(nil, state.SortDefault)
must.NoError(t, err, must.Sprint("error getting jobs"))
return jobs.Next() != nil
}
// forceGC builds a core scheduler over a fresh snapshot of the store and
// processes a CoreJobForceGC eval created at the store's latest index + 1.
forceGC := func(t *testing.T) {
t.Helper()
snap, err := store.Snapshot()
must.NoError(t, err)
core := NewCoreScheduler(s1, snap)

idx, err := store.LatestIndex()
must.NoError(t, err)
gc := s1.coreJobEval(structs.CoreJobForceGC, idx+1)

must.NoError(t, core.Process(gc))
}

// applyTag tags the given version of the job with name and desc by calling
// UpdateJobVersionTag at index idx.
applyTag := func(t *testing.T, idx, version uint64, name, desc string) {
t.Helper()
must.NoError(t, store.UpdateJobVersionTag(idx, job.Namespace,
&structs.JobApplyTagRequest{
JobID: job.ID,
Name: name,
Tag: &structs.JobTaggedVersion{
Name: name,
Description: desc,
},
Version: version,
}))
}
// unsetTag removes the tag called name from the job by sending the same
// request with a nil Tag at index idx.
unsetTag := func(t *testing.T, idx uint64, name string) {
t.Helper()
must.NoError(t, store.UpdateJobVersionTag(idx, job.Namespace,
&structs.JobApplyTagRequest{
JobID: job.ID,
Name: name,
Tag: nil, // this triggers the deletion
}))
}

// Tagging the latest version (latest of the 2 jobs, 0 and 1, is 1)
// will tag the job in the "jobs" table, which should protect from GC.
applyTag(t, 2000, 1, "v1", "version 1")
forceGC(t)
must.True(t, jobExists(t), must.Sprint("latest job version being tagged should protect from GC"))

// Untagging latest and tagging the oldest (only one in "job_version" table)
// should also protect from GC.
unsetTag(t, 3000, "v1")
applyTag(t, 3001, 0, "v0", "version 0")
forceGC(t)
must.True(t, jobExists(t), must.Sprint("old job version being tagged should protect from GC"))

// Untagging v0 should leave no tags left, so GC should delete the job
// and all its versions.
unsetTag(t, 4000, "v0")
forceGC(t)
must.False(t, jobExists(t), must.Sprint("all tags being removed should enable GC"))
}

func TestCoreScheduler_EvalGC_Partial(t *testing.T) {
ci.Parallel(t)

Expand Down
5 changes: 5 additions & 0 deletions nomad/state/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -372,6 +372,11 @@ func jobIsGCable(obj interface{}) (bool, error) {
return false, fmt.Errorf("Unexpected type: %v", obj)
}

// job versions that are tagged should be kept
if j.TaggedVersion != nil {
return false, nil
}

// If the job is periodic or parameterized it is only garbage collectable if
// it is stopped.
periodic := j.Periodic != nil && j.Periodic.Enabled
Expand Down
8 changes: 8 additions & 0 deletions nomad/state/schema_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,14 @@ func Test_jobIsGCable(t *testing.T) {
expectedOutput: true,
expectedOutputError: nil,
},
{
name: "tagged",
inputObj: &structs.Job{
TaggedVersion: &structs.JobTaggedVersion{Name: "any"},
},
expectedOutput: false,
expectedOutputError: nil,
},
}

for _, tc := range testCases {
Expand Down
19 changes: 15 additions & 4 deletions nomad/state/state_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -2188,10 +2188,21 @@ func (s *StateStore) upsertJobVersion(index uint64, job *structs.Job, txn *txn)
all[max-1], all[max] = all[max], all[max-1]
}

// Delete the job outside of the set that are being kept.
d := all[max]
if err := txn.Delete("job_version", d); err != nil {
return fmt.Errorf("failed to delete job %v (%d) from job_version", d.ID, d.Version)
// Find the oldest non-tagged version to delete
deleteIdx := -1
for i := len(all) - 1; i >= max; i-- {
if all[i].TaggedVersion == nil {
deleteIdx = i
break
}
}

// If we found a non-tagged version to delete, delete it
if deleteIdx != -1 {
d := all[deleteIdx]
if err := txn.Delete("job_version", d); err != nil {
return fmt.Errorf("failed to delete job %v (%d) from job_version", d.ID, d.Version)
}
}

return nil
Expand Down
146 changes: 146 additions & 0 deletions nomad/state/state_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2901,6 +2901,152 @@ func TestStateStore_DeleteJobTxn_BatchDeletes(t *testing.T) {
require.Equal(t, deletionIndex, index)
}

// TestStatestore_JobTaggedVersion exercises version retention for tagged job
// versions. The expectations checked are that tagged versions:
//   - are not counted against the configured server.job_tracked_versions,
//   - survive the creation of newer versions, and
//   - are deleted immediately when their tag is removed, once the job holds
//     more versions than JobTrackedVersions.
func TestStatestore_JobTaggedVersion(t *testing.T) {
	ci.Parallel(t)

	store := testStateStore(t)
	// Tagged versions are expected to be exempt from this cap.
	store.config.JobTrackedVersions = 5

	job := mock.MinJob()
	job.Stable = true

	// addVersion upserts a fresh copy of the job at the next index.
	addVersion := func(t *testing.T) {
		t.Helper()
		err := store.UpsertJob(structs.MsgTypeTestSetup, nextIndex(store), nil, job.Copy())
		must.NoError(t, err)
	}

	// tagVersion tags the given version as "v<version>" and then reads the
	// version back by tag name to confirm the tag was stored as requested.
	tagVersion := func(t *testing.T, version uint64) {
		t.Helper()
		tagName := fmt.Sprintf("v%d", version)
		tagDesc := fmt.Sprintf("version %d", version)

		err := store.UpdateJobVersionTag(nextIndex(store), job.Namespace, &structs.JobApplyTagRequest{
			JobID: job.ID,
			Name:  tagName,
			Tag: &structs.JobTaggedVersion{
				Name:        tagName,
				Description: tagDesc,
			},
			Version: version,
		})
		must.NoError(t, err)

		tagged, err := store.JobVersionByTagName(nil, job.Namespace, job.ID, tagName)
		must.NoError(t, err)
		must.Eq(t, version, tagged.Version)
		must.Eq(t, tagName, tagged.TaggedVersion.Name)
		must.Eq(t, tagDesc, tagged.TaggedVersion.Description)
	}

	// untagVersion removes the named tag; a nil Tag in the request is what
	// triggers the unset.
	untagVersion := func(t *testing.T, name string) {
		t.Helper()
		err := store.UpdateJobVersionTag(nextIndex(store), job.Namespace, &structs.JobApplyTagRequest{
			JobID: job.ID,
			Name:  name,
			Tag:   nil,
		})
		must.NoError(t, err)
	}

	// wantVersions asserts the exact, ordered list of version numbers
	// currently stored for the job.
	wantVersions := func(t *testing.T, expect []uint64) {
		t.Helper()
		stored, err := store.JobVersionsByID(nil, job.Namespace, job.ID)
		must.NoError(t, err)
		got := make([]uint64, 0, len(stored))
		for _, v := range stored {
			got = append(got, v.Version)
		}
		must.Eq(t, expect, got)
	}

	// Phase 1: build up to JobTrackedVersions (5) versions with 0-2 tagged
	// and 3-4 untagged, interleaving upserts and tagging the way real usage
	// would.
	for i := 0; i < 3; i++ {
		addVersion(t)
	}
	wantVersions(t, []uint64{2, 1, 0})

	tagVersion(t, 0)
	tagVersion(t, 1)
	// Tagging alone must not change the stored versions.
	wantVersions(t, []uint64{2, 1, 0})

	// Two more upserts bring the total to JobTrackedVersions (5).
	addVersion(t)
	addVersion(t)
	wantVersions(t, []uint64{4, 3, 2, 1, 0})

	tagVersion(t, 2)
	// Still nothing should change.
	wantVersions(t, []uint64{4, 3, 2, 1, 0})

	// Phase 2: dropping a tag at this point leaves the version in place.
	untagVersion(t, "v2")
	wantVersions(t, []uint64{4, 3, 2, 1, 0})

	// Phase 3: ten more versions push out 2-4, while 0-1 stay because they
	// are tagged.
	for i := 0; i < 10; i++ {
		addVersion(t)
	}
	wantVersions(t, []uint64{14, 13, 12, 11, 10, 1, 0})

	// Phase 4: with more than JobTrackedVersions versions present, untagging
	// version 1 deletes it immediately.
	untagVersion(t, "v1")
	wantVersions(t, []uint64{14, 13, 12, 11, 10, 0})

	// Phase 5: error conditions, run in order.
	failures := []struct {
		req     *structs.JobApplyTagRequest
		wantErr string
	}{
		{
			// job does not exist
			req: &structs.JobApplyTagRequest{
				JobID:   "non-existent-job",
				Tag:     &structs.JobTaggedVersion{Name: "tag name"},
				Version: 0,
			},
			wantErr: `job "non-existent-job" version 0 not found`,
		},
		{
			// version does not exist
			req: &structs.JobApplyTagRequest{
				JobID:   job.ID,
				Tag:     &structs.JobTaggedVersion{Name: "tag name"},
				Version: 999,
			},
			wantErr: fmt.Sprintf("job %q version 999 not found", job.ID),
		},
		{
			// tag name already exists
			req: &structs.JobApplyTagRequest{
				JobID:   job.ID,
				Tag:     &structs.JobTaggedVersion{Name: "v0"},
				Version: 10,
			},
			wantErr: fmt.Sprintf(`"v0" already exists on a different version of job %q`, job.ID),
		},
	}
	for _, tc := range failures {
		err := store.UpdateJobVersionTag(nextIndex(store), job.Namespace, tc.req)
		must.ErrorContains(t, err, tc.wantErr)
	}

	// Phase 6: deleting all versions also removes the tagged ones.
	txn := store.db.WriteTxn(nextIndex(store))
	err := store.deleteJobVersions(nextIndex(store), job, txn)
	must.NoError(t, err)
	must.NoError(t, txn.Commit())
	wantVersions(t, []uint64{})
}

func TestStateStore_DeleteJob_MultipleVersions(t *testing.T) {
ci.Parallel(t)

Expand Down