diff --git a/nomad/core_sched.go b/nomad/core_sched.go index 83999d064eb..a4bb53a1e2d 100644 --- a/nomad/core_sched.go +++ b/nomad/core_sched.go @@ -156,6 +156,17 @@ OUTER: // Job is eligible for garbage collection if allEvalsGC { + // if any version of the job is tagged, keep the job (and its evals and allocs) + versions, err := c.snap.JobVersionsByID(ws, job.Namespace, job.ID) + if err != nil { + c.logger.Error("job GC failed to get versions for job", "job", job.ID, "error", err) + continue + } + for _, v := range versions { + if v.TaggedVersion != nil { + continue OUTER + } + } gcJob = append(gcJob, job) gcAlloc = append(gcAlloc, jobAlloc...) gcEval = append(gcEval, jobEval...) diff --git a/nomad/core_sched_test.go b/nomad/core_sched_test.go index b47d0ebdac9..f77da894409 100644 --- a/nomad/core_sched_test.go +++ b/nomad/core_sched_test.go @@ -610,6 +610,94 @@ func TestCoreScheduler_EvalGC_Batch(t *testing.T) { }) } +// A job that has any of its versions tagged should not be GC-able. +func TestCoreScheduler_EvalGC_JobTaggedVersion(t *testing.T) { + ci.Parallel(t) + + s1, cleanupS1 := TestServer(t, nil) + defer cleanupS1() + testutil.WaitForLeader(t, s1.RPC) + + store := s1.fsm.State() + job := mock.MinJob() + job.Stop = true // to be GC-able + + // to be GC-able, the job needs an associated eval with a terminal Status, + // so that the job gets considered "dead" and not "pending" + // NOTE: this needs to come before UpsertJob for some Mystery Reason + // (otherwise job Status ends up as "pending" later) + eval := mock.Eval() + eval.JobID = job.ID + eval.Status = structs.EvalStatusComplete + must.NoError(t, store.UpsertEvals(structs.MsgTypeTestSetup, 999, []*structs.Evaluation{eval})) + // upsert a couple versions of the job, so the "jobs" table has one + // and the "job_version" table has two. 
+ must.NoError(t, store.UpsertJob(structs.MsgTypeTestSetup, 1000, nil, job.Copy())) + must.NoError(t, store.UpsertJob(structs.MsgTypeTestSetup, 1001, nil, job.Copy())) + + jobExists := func(t *testing.T) bool { + t.Helper() + // true if any job at all is still in the state store + jobs, err := store.Jobs(nil, state.SortDefault) + must.NoError(t, err, must.Sprint("error getting jobs")) + return jobs.Next() != nil + } + forceGC := func(t *testing.T) { + t.Helper() + snap, err := store.Snapshot() + must.NoError(t, err) + core := NewCoreScheduler(s1, snap) + + idx, err := store.LatestIndex() + must.NoError(t, err) + gc := s1.coreJobEval(structs.CoreJobForceGC, idx+1) + + must.NoError(t, core.Process(gc)) + } + + applyTag := func(t *testing.T, idx, version uint64, name, desc string) { + t.Helper() + must.NoError(t, store.UpdateJobVersionTag(idx, job.Namespace, + &structs.JobApplyTagRequest{ + JobID: job.ID, + Name: name, + Tag: &structs.JobTaggedVersion{ + Name: name, + Description: desc, + }, + Version: version, + })) + } + unsetTag := func(t *testing.T, idx uint64, name string) { + t.Helper() + must.NoError(t, store.UpdateJobVersionTag(idx, job.Namespace, + &structs.JobApplyTagRequest{ + JobID: job.ID, + Name: name, + Tag: nil, // a nil Tag triggers the deletion of the named tag + })) + } + + // tagging the latest version (of the 2 versions, 0 and 1, the latest is 1) + // will tag the job in the "jobs" table, which should protect from GC + applyTag(t, 2000, 1, "v1", "version 1") + forceGC(t) + must.True(t, jobExists(t), must.Sprint("latest job version being tagged should protect from GC")) + + // untagging the latest and tagging the oldest (present only in the "job_version" table) + // should also protect from GC + unsetTag(t, 3000, "v1") + applyTag(t, 3001, 0, "v0", "version 0") + forceGC(t) + must.True(t, jobExists(t), must.Sprint("old job version being tagged should protect from GC")) + + // untagging v0 should leave no tags at all, so GC should delete the job + // and all its versions + unsetTag(t, 4000, "v0") + forceGC(t) + must.False(t, 
jobExists(t), must.Sprint("all tags being removed should enable GC")) +} + func TestCoreScheduler_EvalGC_Partial(t *testing.T) { ci.Parallel(t) diff --git a/nomad/state/schema.go b/nomad/state/schema.go index 7aaafd5cfef..6182caf5137 100644 --- a/nomad/state/schema.go +++ b/nomad/state/schema.go @@ -372,6 +372,11 @@ func jobIsGCable(obj interface{}) (bool, error) { return false, fmt.Errorf("Unexpected type: %v", obj) } + // a job that carries a version tag is never GC-able + if j.TaggedVersion != nil { + return false, nil + } + // If the job is periodic or parameterized it is only garbage collectable if // it is stopped. periodic := j.Periodic != nil && j.Periodic.Enabled diff --git a/nomad/state/schema_test.go b/nomad/state/schema_test.go index 657cc808bd0..9ffdf78434b 100644 --- a/nomad/state/schema_test.go +++ b/nomad/state/schema_test.go @@ -242,6 +242,14 @@ func Test_jobIsGCable(t *testing.T) { expectedOutput: true, expectedOutputError: nil, }, + { + name: "tagged", + inputObj: &structs.Job{ + TaggedVersion: &structs.JobTaggedVersion{Name: "any"}, + }, + expectedOutput: false, + expectedOutputError: nil, + }, } for _, tc := range testCases { diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index bb416c48225..30f05ed9e8a 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -2188,10 +2188,21 @@ func (s *StateStore) upsertJobVersion(index uint64, job *structs.Job, txn *txn) all[max-1], all[max] = all[max], all[max-1] } - // Delete the job outside of the set that are being kept. 
- d := all[max] - if err := txn.Delete("job_version", d); err != nil { - return fmt.Errorf("failed to delete job %v (%d) from job_version", d.ID, d.Version) + // Find the oldest non-tagged version at or beyond index max to delete + deleteIdx := -1 + for i := len(all) - 1; i >= max; i-- { + if all[i].TaggedVersion == nil { + deleteIdx = i + break + } + } + + // Delete it if one was found; if every candidate is tagged, nothing is deleted + if deleteIdx != -1 { + d := all[deleteIdx] + if err := txn.Delete("job_version", d); err != nil { + return fmt.Errorf("failed to delete job %v (%d) from job_version", d.ID, d.Version) + } } return nil diff --git a/nomad/state/state_store_test.go b/nomad/state/state_store_test.go index cbb81e50f1a..5e44988e437 100644 --- a/nomad/state/state_store_test.go +++ b/nomad/state/state_store_test.go @@ -2901,6 +2901,152 @@ func TestStateStore_DeleteJobTxn_BatchDeletes(t *testing.T) { require.Equal(t, deletionIndex, index) } +// TestStatestore_JobTaggedVersion tests that job versions which are tagged +// do not count against the configured server.job_tracked_versions count, +// do not get deleted when new versions are created, +// and *do* get deleted immediately when untagged while the job is over that count. 
+func TestStatestore_JobTaggedVersion(t *testing.T) { + ci.Parallel(t) + + state := testStateStore(t) + // tagged versions should be excluded from this limit + state.config.JobTrackedVersions = 5 + + job := mock.MinJob() + job.Stable = true + + // helpers for readability + upsertJob := func(t *testing.T) { + t.Helper() + must.NoError(t, state.UpsertJob(structs.MsgTypeTestSetup, nextIndex(state), nil, job.Copy())) + } + + applyTag := func(t *testing.T, version uint64) { + t.Helper() + name := fmt.Sprintf("v%d", version) + desc := fmt.Sprintf("version %d", version) + req := &structs.JobApplyTagRequest{ + JobID: job.ID, + Name: name, + Tag: &structs.JobTaggedVersion{ + Name: name, + Description: desc, + }, + Version: version, + } + must.NoError(t, state.UpdateJobVersionTag(nextIndex(state), job.Namespace, req)) + + // confirm the tagged version can be looked up by its tag name + got, err := state.JobVersionByTagName(nil, job.Namespace, job.ID, name) + must.NoError(t, err) + must.Eq(t, version, got.Version) + must.Eq(t, name, got.TaggedVersion.Name) + must.Eq(t, desc, got.TaggedVersion.Description) + } + unsetTag := func(t *testing.T, name string) { + t.Helper() + req := &structs.JobApplyTagRequest{ + JobID: job.ID, + Name: name, + Tag: nil, // a nil Tag triggers unset + } + must.NoError(t, state.UpdateJobVersionTag(nextIndex(state), job.Namespace, req)) + } + + assertVersions := func(t *testing.T, expect []uint64) { + t.Helper() + jobs, err := state.JobVersionsByID(nil, job.Namespace, job.ID) + must.NoError(t, err) + vs := make([]uint64, len(jobs)) + for i, j := range jobs { + vs[i] = j.Version + } + must.Eq(t, expect, vs) + } + + // we want to end up with JobTrackedVersions (5) versions, + // 0-2 tagged and 3-4 untagged, but also interleave the tagging + // to be somewhat true to normal behavior in reality. 
+ { + // upsert 3 versions of the job + for range 3 { + upsertJob(t) + } + assertVersions(t, []uint64{2, 1, 0}) + + // tag 2 of them + applyTag(t, 0) + applyTag(t, 1) + // nothing should change + assertVersions(t, []uint64{2, 1, 0}) + + // add 2 more, up to JobTrackedVersions (5) + upsertJob(t) + upsertJob(t) + assertVersions(t, []uint64{4, 3, 2, 1, 0}) + + // tag one more + applyTag(t, 2) + // again nothing should change + assertVersions(t, []uint64{4, 3, 2, 1, 0}) + } + + // removing a tag while at (not over) the limit should leave the version in place + { + unsetTag(t, "v2") + assertVersions(t, []uint64{4, 3, 2, 1, 0}) + } + + // adding more versions should evict the oldest untagged ones (2-9), + // and leave 0-1 in place because they are tagged + { + for range 10 { + upsertJob(t) + } + assertVersions(t, []uint64{14, 13, 12, 11, 10, 1, 0}) + } + + // untagging version 1 now should delete it immediately, + // since we now have more than JobTrackedVersions + { + unsetTag(t, "v1") + assertVersions(t, []uint64{14, 13, 12, 11, 10, 0}) + } + + // test some error conditions + { + // job does not exist + err := state.UpdateJobVersionTag(nextIndex(state), job.Namespace, &structs.JobApplyTagRequest{ + JobID: "non-existent-job", + Tag: &structs.JobTaggedVersion{Name: "tag name"}, + Version: 0, + }) + must.ErrorContains(t, err, `job "non-existent-job" version 0 not found`) + + // version does not exist + err = state.UpdateJobVersionTag(nextIndex(state), job.Namespace, &structs.JobApplyTagRequest{ + JobID: job.ID, + Tag: &structs.JobTaggedVersion{Name: "tag name"}, + Version: 999, + }) + must.ErrorContains(t, err, fmt.Sprintf("job %q version 999 not found", job.ID)) + + // tag name already exists + err = state.UpdateJobVersionTag(nextIndex(state), job.Namespace, &structs.JobApplyTagRequest{ + JobID: job.ID, + Tag: &structs.JobTaggedVersion{Name: "v0"}, + Version: 10, + }) + must.ErrorContains(t, err, fmt.Sprintf(`"v0" already exists on a different version of job %q`, job.ID)) + } + + // deleting all versions should also delete 
tagged versions + txn := state.db.WriteTxn(nextIndex(state)) + must.NoError(t, state.deleteJobVersions(nextIndex(state), job, txn)) + must.NoError(t, txn.Commit()) + assertVersions(t, []uint64{}) +} + func TestStateStore_DeleteJob_MultipleVersions(t *testing.T) { ci.Parallel(t)