From d37b0048c22b5fc83f1bc167b2f2350cf3256624 Mon Sep 17 00:00:00 2001 From: Phil Renaud Date: Tue, 17 Sep 2024 16:46:15 -0400 Subject: [PATCH 1/5] First pass for preventing a job from being deleted if it has a TaggedVersion --- nomad/core_sched.go | 31 +++++++++++++++++++++++++++---- nomad/state/state_store.go | 21 ++++++++++++++++----- 2 files changed, 43 insertions(+), 9 deletions(-) diff --git a/nomad/core_sched.go b/nomad/core_sched.go index 83999d064eb..2ccd526a6b8 100644 --- a/nomad/core_sched.go +++ b/nomad/core_sched.go @@ -127,8 +127,8 @@ OUTER: for i := iter.Next(); i != nil; i = iter.Next() { job := i.(*structs.Job) - // Ignore new jobs. - if job.CreateIndex > oldThreshold { + // Ignore new jobs and jobs with TaggedVersion + if job.CreateIndex > oldThreshold || job.TaggedVersion != nil { continue } @@ -156,7 +156,21 @@ OUTER: // Job is eligible for garbage collection if allEvalsGC { - gcJob = append(gcJob, job) + safeToDelete := true + versions, err := c.snap.JobVersionsByID(ws, job.Namespace, job.ID) + if err != nil { + c.logger.Error("job GC failed to get versions for job", "job", job.ID, "error", err) + continue + } + for _, version := range versions { + if version.TaggedVersion != nil { + safeToDelete = false + break + } + } + if safeToDelete { + gcJob = append(gcJob, job) + } gcAlloc = append(gcAlloc, jobAlloc...) gcEval = append(gcEval, jobEval...) } @@ -182,8 +196,16 @@ OUTER: // jobReap contacts the leader and issues a reap on the passed jobs func (c *CoreScheduler) jobReap(jobs []*structs.Job, leaderACL string) error { + // Filter out jobs with TaggedVersion + jobsToReap := make([]*structs.Job, 0, len(jobs)) + for _, job := range jobs { + if job.TaggedVersion == nil { + jobsToReap = append(jobsToReap, job) + } + } + // Call to the leader to issue the reap - for _, req := range c.partitionJobReap(jobs, leaderACL, structs.MaxUUIDsPerWriteRequest) { + for _, req := range c.partitionJobReap(jobsToReap, leaderACL, structs.MaxUUIDsPerWriteRequest) { var resp structs.JobBatchDeregisterResponse if err := c.srv.RPC(structs.JobBatchDeregisterRPCMethod, req, &resp); err != nil { c.logger.Error("batch job reap failed", "error", err) @@ -214,6 +236,7 @@ func (c *CoreScheduler) partitionJobReap(jobs []*structs.Job, leaderACL string, if remaining := len(jobs) - submittedJobs; remaining > 0 { if remaining <= available { + // TODO: do I need to check for TaggedVersion here as well? for _, job := range jobs[submittedJobs:] { jns := structs.NamespacedID{ID: job.ID, Namespace: job.Namespace} req.Jobs[jns] = option diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index bb416c48225..01e186ef418 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -2171,7 +2171,7 @@ func (s *StateStore) upsertJobVersion(index uint64, job *structs.Job, txn *txn) return nil } - // We have to delete a historic job to make room. + // We have to delete historic jobs to make room. // Find index of the highest versioned stable job stableIdx := -1 for i, j := range all { @@ -2188,10 +2188,21 @@ func (s *StateStore) upsertJobVersion(index uint64, job *structs.Job, txn *txn) all[max-1], all[max] = all[max], all[max-1] } - // Delete the job outside of the set that are being kept. - d := all[max] - if err := txn.Delete("job_version", d); err != nil { - return fmt.Errorf("failed to delete job %v (%d) from job_version", d.ID, d.Version) + // Find the oldest non-tagged version to delete + deleteIdx := -1 + for i := len(all) - 1; i >= max; i-- { + if all[i].TaggedVersion == nil { + deleteIdx = i + break + } + } + + // If we found a non-tagged version to delete, delete it + if deleteIdx != -1 { + d := all[deleteIdx] + if err := txn.Delete("job_version", d); err != nil { + return fmt.Errorf("failed to delete job %v (%d) from job_version", d.ID, d.Version) + } } return nil From 94674a71e5cc9a0e5b85fe7e80de6f89b70ebbcb Mon Sep 17 00:00:00 2001 From: Daniel Bennett Date: Tue, 24 Sep 2024 10:22:59 -0400 Subject: [PATCH 2/5] some changes, add tests --- nomad/core_sched.go | 28 ++++-------- nomad/core_sched_test.go | 62 +++++++++++++++++++++++++ nomad/state/schema.go | 5 ++ nomad/state/schema_test.go | 8 ++++ nomad/state/state_store_test.go | 81 +++++++++++++++++++++++++++++++++ 5 files changed, 164 insertions(+), 20 deletions(-) diff --git a/nomad/core_sched.go b/nomad/core_sched.go index 2ccd526a6b8..a4bb53a1e2d 100644 --- a/nomad/core_sched.go +++ b/nomad/core_sched.go @@ -127,8 +127,8 @@ OUTER: for i := iter.Next(); i != nil; i = iter.Next() { job := i.(*structs.Job) - // Ignore new jobs and jobs with TaggedVersion - if job.CreateIndex > oldThreshold || job.TaggedVersion != nil { + // Ignore new jobs. + if job.CreateIndex > oldThreshold { continue } @@ -156,21 +156,18 @@ OUTER: // Job is eligible for garbage collection if allEvalsGC { - safeToDelete := true + // if any version of the job is tagged, it should be kept versions, err := c.snap.JobVersionsByID(ws, job.Namespace, job.ID) if err != nil { c.logger.Error("job GC failed to get versions for job", "job", job.ID, "error", err) continue } - for _, version := range versions { - if version.TaggedVersion != nil { - safeToDelete = false - break + for _, v := range versions { + if v.TaggedVersion != nil { + continue OUTER } } - if safeToDelete { - gcJob = append(gcJob, job) - } + gcJob = append(gcJob, job) gcAlloc = append(gcAlloc, jobAlloc...) gcEval = append(gcEval, jobEval...) } @@ -196,16 +193,8 @@ OUTER: // jobReap contacts the leader and issues a reap on the passed jobs func (c *CoreScheduler) jobReap(jobs []*structs.Job, leaderACL string) error { - // Filter out jobs with TaggedVersion - jobsToReap := make([]*structs.Job, 0, len(jobs)) - for _, job := range jobs { - if job.TaggedVersion == nil { - jobsToReap = append(jobsToReap, job) - } - } - // Call to the leader to issue the reap - for _, req := range c.partitionJobReap(jobsToReap, leaderACL, structs.MaxUUIDsPerWriteRequest) { + for _, req := range c.partitionJobReap(jobs, leaderACL, structs.MaxUUIDsPerWriteRequest) { var resp structs.JobBatchDeregisterResponse if err := c.srv.RPC(structs.JobBatchDeregisterRPCMethod, req, &resp); err != nil { c.logger.Error("batch job reap failed", "error", err) @@ -236,7 +225,6 @@ func (c *CoreScheduler) partitionJobReap(jobs []*structs.Job, leaderACL string, if remaining := len(jobs) - submittedJobs; remaining > 0 { if remaining <= available { - // TODO: do I need to check for TaggedVersion here as well? for _, job := range jobs[submittedJobs:] { jns := structs.NamespacedID{ID: job.ID, Namespace: job.Namespace} req.Jobs[jns] = option diff --git a/nomad/core_sched_test.go b/nomad/core_sched_test.go index b47d0ebdac9..3bb78856044 100644 --- a/nomad/core_sched_test.go +++ b/nomad/core_sched_test.go @@ -610,6 +610,68 @@ func TestCoreScheduler_EvalGC_Batch(t *testing.T) { }) } +// A job that has any of its versions tagged should not be GC-able. +func TestCoreScheduler_EvalGC_JobVersionTags(t *testing.T) { + ci.Parallel(t) + + s1, cleanupS1 := TestServer(t, nil) + defer cleanupS1() + testutil.WaitForLeader(t, s1.RPC) + + store := s1.fsm.State() + job := mock.MinJob() + job.Stop = true // to be GC-able + + // upsert a couple versions of the job, so the "jobs" table has one + // and the "job_version" table has two. + must.NoError(t, store.UpsertJob(structs.MsgTypeTestSetup, 1000, nil, job.Copy())) + must.NoError(t, store.UpsertJob(structs.MsgTypeTestSetup, 1001, nil, job.Copy())) + // to be GC-able, also need an associated eval with a terminal Status + eval := mock.Eval() + eval.JobID = job.ID + eval.Status = structs.EvalStatusComplete + must.NoError(t, store.UpsertEvals(structs.MsgTypeTestSetup, 1002, []*structs.Evaluation{eval})) + + jobExists := func(t *testing.T) bool { + t.Helper() + // any job at all + jobs, err := store.Jobs(nil, state.SortDefault) + must.NoError(t, err, must.Sprint("error getting jobs")) + return jobs.Next() != nil + } + forceGC := func(t *testing.T) { + t.Helper() + snap, err := store.Snapshot() + must.NoError(t, err) + core := NewCoreScheduler(s1, snap) + + idx, err := store.LatestIndex() + must.NoError(t, err) + gc := s1.coreJobEval(structs.CoreJobForceGC, idx+1) + + must.NoError(t, core.Process(gc)) + } + + // tagging the latest version (latest of the 2 jobs, 0 and 1, is 1) + // will tag the job in the "jobs" table, which should protect from GC + must.NoError(t, store.UpdateJobVersionTag(2000, job.Namespace, job.ID, 1, "v1", "version 1")) + forceGC(t) + must.True(t, jobExists(t), must.Sprint("latest job version being tagged should protect from GC")) + + // untagging latest and tagging the oldest, which is only in "job_version" + // table should also protect from GC + must.NoError(t, store.UnsetJobVersionTag(3000, job.Namespace, job.ID, "v1")) + must.NoError(t, store.UpdateJobVersionTag(3001, job.Namespace, job.ID, 0, "v0", "version 0")) + forceGC(t) + must.True(t, jobExists(t), must.Sprint("old job version being tagged should protect from GC")) + + // untagging v0 should leave no tags left, so GC should delete the job + // and all its versions + must.NoError(t, store.UnsetJobVersionTag(4000, job.Namespace, job.ID, "v0")) + forceGC(t) + must.False(t, jobExists(t), must.Sprint("all tags being removed should enable GC")) +} + func TestCoreScheduler_EvalGC_Partial(t *testing.T) { ci.Parallel(t) diff --git a/nomad/state/schema.go b/nomad/state/schema.go index 7aaafd5cfef..6182caf5137 100644 --- a/nomad/state/schema.go +++ b/nomad/state/schema.go @@ -372,6 +372,11 @@ func jobIsGCable(obj interface{}) (bool, error) { return false, fmt.Errorf("Unexpected type: %v", obj) } + // job versions that are tagged should be kept + if j.TaggedVersion != nil { + return false, nil + } + // If the job is periodic or parameterized it is only garbage collectable if // it is stopped. periodic := j.Periodic != nil && j.Periodic.Enabled diff --git a/nomad/state/schema_test.go b/nomad/state/schema_test.go index 657cc808bd0..9ffdf78434b 100644 --- a/nomad/state/schema_test.go +++ b/nomad/state/schema_test.go @@ -242,6 +242,14 @@ func Test_jobIsGCable(t *testing.T) { expectedOutput: true, expectedOutputError: nil, }, + { + name: "tagged", + inputObj: &structs.Job{ + TaggedVersion: &structs.JobTaggedVersion{Name: "any"}, + }, + expectedOutput: false, + expectedOutputError: nil, + }, } for _, tc := range testCases { diff --git a/nomad/state/state_store_test.go b/nomad/state/state_store_test.go index cbb81e50f1a..8dcd7ad5d0d 100644 --- a/nomad/state/state_store_test.go +++ b/nomad/state/state_store_test.go @@ -2901,6 +2901,87 @@ func TestStateStore_DeleteJobTxn_BatchDeletes(t *testing.T) { require.Equal(t, deletionIndex, index) } +// TestStatestore_JobVersionTags tests that job versions which are tagged +// do not count against the configured server.job_tracked_versions count, +// do not get deleted when new versions are created, +// and *do* get deleted immediately when its tag is removed. +func TestStatestore_JobVersionTags(t *testing.T) { + ci.Parallel(t) + + state := testStateStore(t) + + job := mock.MinJob() + job.Stable = true + + // add job and fill the versions bucket + state.config.JobTrackedVersions = 5 + for range state.config.JobTrackedVersions { + must.NoError(t, state.UpsertJob(structs.MsgTypeTestSetup, nextIndex(state), nil, job.Copy())) + } + + // tag versions 0-2 + for x := range 3 { + v := uint64(x) + name := fmt.Sprintf("v%d", x) + desc := fmt.Sprintf("version %d", x) + + // apply tag + must.NoError(t, state.UpdateJobVersionTag(nextIndex(state), job.Namespace, job.ID, v, name, desc)) + + // confirm + got, err := state.JobVersionByTagName(nil, job.Namespace, job.ID, name) + must.NoError(t, err) + must.Eq(t, name, got.TaggedVersion.Name) + must.Eq(t, desc, got.TaggedVersion.Description) + } + + // take a little detour to test error conditions with the setup so far + t.Run("errors", func(t *testing.T) { + // job does not exist + err := state.UpdateJobVersionTag(nextIndex(state), "default", "non-existent-job", 0, "tag name", "tag desc") + must.ErrorContains(t, err, `job "non-existent-job" version 0 not found`) + + // version does not exist + err = state.UpdateJobVersionTag(nextIndex(state), job.Namespace, job.ID, 999, "tag name", "tag desc") + must.ErrorContains(t, err, fmt.Sprintf("job %q version 999 not found", job.ID)) + + // tag name already exists + err = state.UpdateJobVersionTag(nextIndex(state), job.Namespace, job.ID, 3, "v0", "tag desc") + must.ErrorContains(t, err, fmt.Sprintf(`"v0" already exists on a different version of job %q`, job.ID)) + }) + + // make some more jobs; they should replace 3-4 but leave 0-2 alone + for range 10 { + must.NoError(t, state.UpsertJob(structs.MsgTypeTestSetup, nextIndex(state), nil, job.Copy())) + } + + assertVersions := func(t *testing.T, expect []uint64) { + t.Helper() + jobs, err := state.JobVersionsByID(nil, job.Namespace, job.ID) + must.NoError(t, err) + vs := make([]uint64, len(jobs)) + for i, j := range jobs { + vs[i] = j.Version + } + must.Eq(t, expect, vs) + } + + // there should be this many: JobTrackedVersions + # of tagged versions + // we tagged 3 of them, so 5 + 3 = 8 + assertVersions(t, []uint64{14, 13, 12, 11, 10, 2, 1, 0}) + + // untag version 1 - it should get deleted immediately, + // since we have more than JobTrackedVersions. + must.NoError(t, state.UnsetJobVersionTag(nextIndex(state), job.Namespace, job.ID, "v1")) + assertVersions(t, []uint64{14, 13, 12, 11, 10, 2, 0}) + + // deleting all versions should also delete tagged versions + txn := state.db.WriteTxn(nextIndex(state)) + must.NoError(t, state.deleteJobVersions(nextIndex(state), job, txn)) + must.NoError(t, txn.Commit()) + assertVersions(t, []uint64{}) +} + func TestStateStore_DeleteJob_MultipleVersions(t *testing.T) { ci.Parallel(t) From 63caddffe208f0e22b853c134b39df76f09ff480 Mon Sep 17 00:00:00 2001 From: Daniel Bennett Date: Tue, 24 Sep 2024 16:16:26 -0400 Subject: [PATCH 3/5] fix tests after rebase --- nomad/core_sched_test.go | 54 ++++++++++++++++++++++++--------- nomad/state/state_store_test.go | 38 ++++++++++++++++++----- 2 files changed, 71 insertions(+), 21 deletions(-) diff --git a/nomad/core_sched_test.go b/nomad/core_sched_test.go index 3bb78856044..f77da894409 100644 --- a/nomad/core_sched_test.go +++ b/nomad/core_sched_test.go @@ -611,7 +611,7 @@ func TestCoreScheduler_EvalGC_Batch(t *testing.T) { } // A job that has any of its versions tagged should not be GC-able. -func TestCoreScheduler_EvalGC_JobVersionTags(t *testing.T) { +func TestCoreScheduler_EvalGC_JobTaggedVersion(t *testing.T) { ci.Parallel(t) s1, cleanupS1 := TestServer(t, nil) @@ -622,15 +622,18 @@ func TestCoreScheduler_EvalGC_JobVersionTags(t *testing.T) { job := mock.MinJob() job.Stop = true // to be GC-able + // to be GC-able, the job needs an associated eval with a terminal Status, + // so that the job gets considered "dead" and not "pending" + // NOTE: this needs to come before UpsertJob for some Mystery Reason + // (otherwise job Status ends up as "pending" later) + eval := mock.Eval() + eval.JobID = job.ID + eval.Status = structs.EvalStatusComplete + must.NoError(t, store.UpsertEvals(structs.MsgTypeTestSetup, 999, []*structs.Evaluation{eval})) // upsert a couple versions of the job, so the "jobs" table has one // and the "job_version" table has two. must.NoError(t, store.UpsertJob(structs.MsgTypeTestSetup, 1000, nil, job.Copy())) must.NoError(t, store.UpsertJob(structs.MsgTypeTestSetup, 1001, nil, job.Copy())) - // to be GC-able, also need an associated eval with a terminal Status - eval := mock.Eval() - eval.JobID = job.ID - eval.Status = structs.EvalStatusComplete - must.NoError(t, store.UpsertEvals(structs.MsgTypeTestSetup, 1002, []*structs.Evaluation{eval})) jobExists := func(t *testing.T) bool { t.Helper() @@ -652,22 +655,45 @@ func TestCoreScheduler_EvalGC_JobVersionTags(t *testing.T) { must.NoError(t, core.Process(gc)) } + applyTag := func(t *testing.T, idx, version uint64, name, desc string) { + t.Helper() + must.NoError(t, store.UpdateJobVersionTag(idx, job.Namespace, + &structs.JobApplyTagRequest{ + JobID: job.ID, + Name: name, + Tag: &structs.JobTaggedVersion{ + Name: name, + Description: desc, + }, + Version: version, + })) + } + unsetTag := func(t *testing.T, idx uint64, name string) { + t.Helper() + must.NoError(t, store.UpdateJobVersionTag(idx, job.Namespace, + &structs.JobApplyTagRequest{ + JobID: job.ID, + Name: name, + Tag: nil, // this triggers the deletion + })) + } + // tagging the latest version (latest of the 2 jobs, 0 and 1, is 1) // will tag the job in the "jobs" table, which should protect from GC - must.NoError(t, store.UpdateJobVersionTag(2000, job.Namespace, job.ID, 1, "v1", "version 1")) + applyTag(t, 2000, 1, "v1", "version 1") forceGC(t) must.True(t, jobExists(t), must.Sprint("latest job version being tagged should protect from GC")) - // untagging latest and tagging the oldest, which is only in "job_version" - // table should also protect from GC - must.NoError(t, store.UnsetJobVersionTag(3000, job.Namespace, job.ID, "v1")) - must.NoError(t, store.UpdateJobVersionTag(3001, job.Namespace, job.ID, 0, "v0", "version 0")) + // untagging latest and tagging the oldest (only one in "job_version" table) + // should also protect from GC + unsetTag(t, 3000, "v1") + applyTag(t, 3001, 0, "v0", "version 0") forceGC(t) must.True(t, jobExists(t), must.Sprint("old job version being tagged should protect from GC")) - // untagging v0 should leave no tags left, so GC should delete the job - // and all its versions - must.NoError(t, store.UnsetJobVersionTag(4000, job.Namespace, job.ID, "v0")) + //untagging v0 should leave no tags left, so GC should delete the job + //and all its versions + unsetTag(t, 4000, "v0") forceGC(t) must.False(t, jobExists(t), must.Sprint("all tags being removed should enable GC")) } diff --git a/nomad/state/state_store_test.go b/nomad/state/state_store_test.go index 8dcd7ad5d0d..b77f9e449b1 100644 --- a/nomad/state/state_store_test.go +++ b/nomad/state/state_store_test.go @@ -2901,11 +2901,11 @@ func TestStateStore_DeleteJobTxn_BatchDeletes(t *testing.T) { require.Equal(t, deletionIndex, index) } -// TestStatestore_JobVersionTags tests that job versions which are tagged +// TestStatestore_JobTaggedVersion tests that job versions which are tagged // do not count against the configured server.job_tracked_versions count, // do not get deleted when new versions are created, // and *do* get deleted immediately when its tag is removed. -func TestStatestore_JobVersionTags(t *testing.T) { +func TestStatestore_JobTaggedVersion(t *testing.T) { ci.Parallel(t) state := testStateStore(t) @@ -2926,7 +2926,15 @@ func TestStatestore_JobVersionTags(t *testing.T) { desc := fmt.Sprintf("version %d", x) // apply tag - must.NoError(t, state.UpdateJobVersionTag(nextIndex(state), job.Namespace, job.ID, v, name, desc)) + must.NoError(t, state.UpdateJobVersionTag(nextIndex(state), job.Namespace, &structs.JobApplyTagRequest{ + JobID: job.ID, + Name: name, + Tag: &structs.JobTaggedVersion{ + Name: name, + Description: desc, + }, + Version: v, + })) // confirm got, err := state.JobVersionByTagName(nil, job.Namespace, job.ID, name) @@ -2938,15 +2946,27 @@ func TestStatestore_JobVersionTags(t *testing.T) { // take a little detour to test error conditions with the setup so far t.Run("errors", func(t *testing.T) { // job does not exist - err := state.UpdateJobVersionTag(nextIndex(state), "default", "non-existent-job", 0, "tag name", "tag desc") + err := state.UpdateJobVersionTag(nextIndex(state), job.Namespace, &structs.JobApplyTagRequest{ + JobID: "non-existent-job", + Tag: &structs.JobTaggedVersion{Name: "tag name"}, + Version: 0, + }) must.ErrorContains(t, err, `job "non-existent-job" version 0 not found`) // version does not exist - err = state.UpdateJobVersionTag(nextIndex(state), job.Namespace, job.ID, 999, "tag name", "tag desc") + err = state.UpdateJobVersionTag(nextIndex(state), job.Namespace, &structs.JobApplyTagRequest{ + JobID: job.ID, + Tag: &structs.JobTaggedVersion{Name: "tag name"}, + Version: 999, + }) must.ErrorContains(t, err, fmt.Sprintf("job %q version 999 not found", job.ID)) // tag name already exists - err = state.UpdateJobVersionTag(nextIndex(state), job.Namespace, job.ID, 3, "v0", "tag desc") + err = state.UpdateJobVersionTag(nextIndex(state), job.Namespace, &structs.JobApplyTagRequest{ + JobID: job.ID, + Tag: &structs.JobTaggedVersion{Name: "v0"}, + Version: 3, + }) must.ErrorContains(t, err, fmt.Sprintf(`"v0" already exists on a different version of job %q`, job.ID)) }) @@ -2972,7 +2992,11 @@ func TestStatestore_JobVersionTags(t *testing.T) { // untag version 1 - it should get deleted immediately, // since we have more than JobTrackedVersions. - must.NoError(t, state.UnsetJobVersionTag(nextIndex(state), job.Namespace, job.ID, "v1")) + must.NoError(t, state.UpdateJobVersionTag(nextIndex(state), job.Namespace, &structs.JobApplyTagRequest{ + JobID: job.ID, + Name: "v1", + Tag: nil, // this triggers unset + })) assertVersions(t, []uint64{14, 13, 12, 11, 10, 2, 0}) // deleting all versions should also delete tagged versions From bb1b5d227a76f7aed656a15303cc51875f87136c Mon Sep 17 00:00:00 2001 From: Daniel Bennett Date: Tue, 24 Sep 2024 16:36:43 -0400 Subject: [PATCH 4/5] make comment true again --- nomad/state/state_store.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index 01e186ef418..30f05ed9e8a 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -2171,7 +2171,7 @@ func (s *StateStore) upsertJobVersion(index uint64, job *structs.Job, txn *txn) return nil } - // We have to delete historic jobs to make room. + // We have to delete a historic job to make room. // Find index of the highest versioned stable job stableIdx := -1 for i, j := range all { From 201b9dbfa3368f7b68f9603c785e62b972a13a0a Mon Sep 17 00:00:00 2001 From: Daniel Bennett Date: Wed, 25 Sep 2024 13:19:26 -0400 Subject: [PATCH 5/5] refactor state store tests --- nomad/state/state_store_test.go | 131 +++++++++++++++++++++----------- 1 file changed, 86 insertions(+), 45 deletions(-) diff --git a/nomad/state/state_store_test.go b/nomad/state/state_store_test.go index b77f9e449b1..5e44988e437 100644 --- a/nomad/state/state_store_test.go +++ b/nomad/state/state_store_test.go @@ -2909,42 +2909,112 @@ func TestStatestore_JobTaggedVersion(t *testing.T) { ci.Parallel(t) state := testStateStore(t) + // tagged versions should be excluded from this limit + state.config.JobTrackedVersions = 5 job := mock.MinJob() job.Stable = true - // add job and fill the versions bucket - state.config.JobTrackedVersions = 5 - for range state.config.JobTrackedVersions { + // helpers for readability + upsertJob := func(t *testing.T) { + t.Helper() must.NoError(t, state.UpsertJob(structs.MsgTypeTestSetup, nextIndex(state), nil, job.Copy())) } - // tag versions 0-2 - for x := range 3 { - v := uint64(x) - name := fmt.Sprintf("v%d", x) - desc := fmt.Sprintf("version %d", x) - - // apply tag - must.NoError(t, state.UpdateJobVersionTag(nextIndex(state), job.Namespace, &structs.JobApplyTagRequest{ + applyTag := func(t *testing.T, version uint64) { + t.Helper() + name := fmt.Sprintf("v%d", version) + desc := fmt.Sprintf("version %d", version) + req := &structs.JobApplyTagRequest{ JobID: job.ID, Name: name, Tag: &structs.JobTaggedVersion{ Name: name, Description: desc, }, - Version: v, - })) + Version: version, + } + must.NoError(t, state.UpdateJobVersionTag(nextIndex(state), job.Namespace, req)) // confirm got, err := state.JobVersionByTagName(nil, job.Namespace, job.ID, name) must.NoError(t, err) + must.Eq(t, version, got.Version) must.Eq(t, name, got.TaggedVersion.Name) must.Eq(t, desc, got.TaggedVersion.Description) } + unsetTag := func(t *testing.T, name string) { + t.Helper() + req := &structs.JobApplyTagRequest{ + JobID: job.ID, + Name: name, + Tag: nil, // this triggers unset + } + must.NoError(t, state.UpdateJobVersionTag(nextIndex(state), job.Namespace, req)) + } + + assertVersions := func(t *testing.T, expect []uint64) { + t.Helper() + jobs, err := state.JobVersionsByID(nil, job.Namespace, job.ID) + must.NoError(t, err) + vs := make([]uint64, len(jobs)) + for i, j := range jobs { + vs[i] = j.Version + } + must.Eq(t, expect, vs) + } + + // we want to end up with JobTrackedVersions (5) versions, + // 0-2 tagged and 3-4 untagged, but also interleave the tagging + // to be somewhat true to normal behavior in reality. + { + // upsert 3 jobs + for range 3 { + upsertJob(t) + } + assertVersions(t, []uint64{2, 1, 0}) + + // tag 2 of them + applyTag(t, 0) + applyTag(t, 1) + // nothing should change + assertVersions(t, []uint64{2, 1, 0}) + + // add 2 more, up to JobTrackedVersions (5) + upsertJob(t) + upsertJob(t) + assertVersions(t, []uint64{4, 3, 2, 1, 0}) - // take a little detour to test error conditions with the setup so far - t.Run("errors", func(t *testing.T) { + // tag one more + applyTag(t, 2) + // again nothing should change + assertVersions(t, []uint64{4, 3, 2, 1, 0}) + } + + // removing a tag at this point should leave the version in place + { + unsetTag(t, "v2") + assertVersions(t, []uint64{4, 3, 2, 1, 0}) + } + + // adding more versions should replace 2-4, + // and leave 0-1 in place because they are tagged + { + for range 10 { + upsertJob(t) + } + assertVersions(t, []uint64{14, 13, 12, 11, 10, 1, 0}) + } + + // untagging version 1 now should delete it immediately, + // since we now have more than JobTrackedVersions + { + unsetTag(t, "v1") + assertVersions(t, []uint64{14, 13, 12, 11, 10, 0}) + } + + // test some error conditions + { // job does not exist err := state.UpdateJobVersionTag(nextIndex(state), job.Namespace, &structs.JobApplyTagRequest{ JobID: "non-existent-job", @@ -2965,40 +3035,11 @@ func TestStatestore_JobTaggedVersion(t *testing.T) { err = state.UpdateJobVersionTag(nextIndex(state), job.Namespace, &structs.JobApplyTagRequest{ JobID: job.ID, Tag: &structs.JobTaggedVersion{Name: "v0"}, - Version: 3, + Version: 10, }) must.ErrorContains(t, err, fmt.Sprintf(`"v0" already exists on a different version of job %q`, job.ID)) - }) - - // make some more jobs; they should replace 3-4 but leave 0-2 alone - for range 10 { - must.NoError(t, state.UpsertJob(structs.MsgTypeTestSetup, nextIndex(state), nil, job.Copy())) } - assertVersions := func(t *testing.T, expect []uint64) { - t.Helper() - jobs, err := state.JobVersionsByID(nil, job.Namespace, job.ID) - must.NoError(t, err) - vs := make([]uint64, len(jobs)) - for i, j := range jobs { - vs[i] = j.Version - } - must.Eq(t, expect, vs) - } - - // there should be this many: JobTrackedVersions + # of tagged versions - // we tagged 3 of them, so 5 + 3 = 8 - assertVersions(t, []uint64{14, 13, 12, 11, 10, 2, 1, 0}) - - // untag version 1 - it should get deleted immediately, - // since we have more than JobTrackedVersions. - must.NoError(t, state.UpdateJobVersionTag(nextIndex(state), job.Namespace, &structs.JobApplyTagRequest{ - JobID: job.ID, - Name: "v1", - Tag: nil, // this triggers unset - })) - assertVersions(t, []uint64{14, 13, 12, 11, 10, 2, 0}) - // deleting all versions should also delete tagged versions txn := state.db.WriteTxn(nextIndex(state)) must.NoError(t, state.deleteJobVersions(nextIndex(state), job, txn))