Skip to content

Commit

Permalink
differently exclude tagged job versions from being pruned (#24102)
Browse files Browse the repository at this point in the history
* test bug: tagged versions count against limit
  specifically tagged versions that are not the oldest

* fix: use original logic, sans tagged versions
  • Loading branch information
gulducat authored Oct 2, 2024
1 parent 3ecf0d2 commit 6b9bcb8
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 35 deletions.
32 changes: 13 additions & 19 deletions nomad/state/state_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -2160,8 +2160,9 @@ func (s *StateStore) upsertJobVersion(index uint64, job *structs.Job, txn *txn)
return fmt.Errorf("index update failed: %v", err)
}

// Get all the historic jobs for this ID
all, err := s.jobVersionByID(txn, nil, job.Namespace, job.ID)
// Get all the historic jobs for this ID, except those with a VersionTag,
// as they should always be kept. They are in Version order, high to low.
all, err := s.jobVersionByID(txn, nil, job.Namespace, job.ID, false)
if err != nil {
return fmt.Errorf("failed to look up job versions for %q: %v", job.ID, err)
}
Expand All @@ -2188,21 +2189,10 @@ func (s *StateStore) upsertJobVersion(index uint64, job *structs.Job, txn *txn)
all[max-1], all[max] = all[max], all[max-1]
}

// Find the oldest non-tagged version to delete
deleteIdx := -1
for i := len(all) - 1; i >= max; i-- {
if all[i].VersionTag == nil {
deleteIdx = i
break
}
}

// If we found a non-tagged version to delete, delete it
if deleteIdx != -1 {
d := all[deleteIdx]
if err := txn.Delete("job_version", d); err != nil {
return fmt.Errorf("failed to delete job %v (%d) from job_version", d.ID, d.Version)
}
// Delete the oldest one
d := all[max]
if err := txn.Delete("job_version", d); err != nil {
return fmt.Errorf("failed to delete job %v (%d) from job_version", d.ID, d.Version)
}

return nil
Expand Down Expand Up @@ -2314,7 +2304,7 @@ func (s *StateStore) jobsByIDPrefixAllNamespaces(ws memdb.WatchSet, prefix strin
func (s *StateStore) JobVersionsByID(ws memdb.WatchSet, namespace, id string) ([]*structs.Job, error) {
txn := s.db.ReadTxn()

return s.jobVersionByID(txn, ws, namespace, id)
return s.jobVersionByID(txn, ws, namespace, id, true)
}

// JobVersionByTagName returns a Job if it has a Tag with the passed name
Expand All @@ -2335,7 +2325,7 @@ func (s *StateStore) JobVersionByTagName(ws memdb.WatchSet, namespace, id string
// jobVersionByID is the underlying implementation for retrieving all tracked
// versions of a job and is called under an existing transaction. A watch set
// can optionally be passed in to add the job histories to the watch set.
func (s *StateStore) jobVersionByID(txn *txn, ws memdb.WatchSet, namespace, id string) ([]*structs.Job, error) {
func (s *StateStore) jobVersionByID(txn *txn, ws memdb.WatchSet, namespace, id string, includeTagged bool) ([]*structs.Job, error) {
// Get all the historic jobs for this ID
iter, err := txn.Get("job_version", "id_prefix", namespace, id)
if err != nil {
Expand All @@ -2357,6 +2347,10 @@ func (s *StateStore) jobVersionByID(txn *txn, ws memdb.WatchSet, namespace, id s
continue
}

if !includeTagged && j.VersionTag != nil {
continue
}

all = append(all, j)
}

Expand Down
34 changes: 18 additions & 16 deletions nomad/state/state_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2975,42 +2975,44 @@ func TestStatestore_JobVersionTag(t *testing.T) {
assertVersions(t, []uint64{2, 1, 0})

// tag 2 of them
applyTag(t, 0)
applyTag(t, 1)
applyTag(t, 2)
// nothing should change
assertVersions(t, []uint64{2, 1, 0})

// add 2 more, up to JobTrackedVersions (5)
upsertJob(t)
upsertJob(t)
assertVersions(t, []uint64{4, 3, 2, 1, 0})
// add 3 more, up to JobTrackedVersions (5) + 1 (6)
for range 3 {
upsertJob(t)
}
assertVersions(t, []uint64{5, 4, 3, 2, 1, 0})

// tag one more
applyTag(t, 2)
applyTag(t, 3)
// again nothing should change
assertVersions(t, []uint64{4, 3, 2, 1, 0})
assertVersions(t, []uint64{5, 4, 3, 2, 1, 0})
}

// removing a tag at this point should leave the version in place
// removing a tag at this point should leave the version in place,
// because we still have room within JobTrackedVersions
{
unsetTag(t, "v2")
assertVersions(t, []uint64{4, 3, 2, 1, 0})
unsetTag(t, "v3")
assertVersions(t, []uint64{5, 4, 3, 2, 1, 0})
}

// adding more versions should replace 2-4,
// and leave 0-1 in place because they are tagged
// adding more versions should replace 0,3-5
// and leave 1-2 in place because they are tagged
{
for range 10 {
upsertJob(t)
}
assertVersions(t, []uint64{14, 13, 12, 11, 10, 1, 0})
assertVersions(t, []uint64{15, 14, 13, 12, 11, 2, 1})
}

// untagging version 1 now should delete it immediately,
// since we now have more than JobTrackedVersions
{
unsetTag(t, "v1")
assertVersions(t, []uint64{14, 13, 12, 11, 10, 0})
assertVersions(t, []uint64{15, 14, 13, 12, 11, 2})
}

// test some error conditions
Expand All @@ -3034,10 +3036,10 @@ func TestStatestore_JobVersionTag(t *testing.T) {
// tag name already exists
err = state.UpdateJobVersionTag(nextIndex(state), job.Namespace, &structs.JobApplyTagRequest{
JobID: job.ID,
Tag: &structs.JobVersionTag{Name: "v0"},
Tag: &structs.JobVersionTag{Name: "v2"},
Version: 10,
})
must.ErrorContains(t, err, fmt.Sprintf(`"v0" already exists on a different version of job %q`, job.ID))
must.ErrorContains(t, err, fmt.Sprintf(`"v2" already exists on a different version of job %q`, job.ID))
}

// deleting all versions should also delete tagged versions
Expand Down

0 comments on commit 6b9bcb8

Please sign in to comment.