Skip to content

Commit

Permalink
backport of commit f7847c6 (#24355)
Browse files Browse the repository at this point in the history
Co-authored-by: Piotr Kazmierczak <[email protected]>
  • Loading branch information
hc-github-team-nomad-core and pkazmierczak authored Nov 1, 2024
1 parent 0126bb8 commit 6590572
Show file tree
Hide file tree
Showing 27 changed files with 695 additions and 1,083 deletions.
3 changes: 3 additions & 0 deletions .changelog/24112.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:bug
state: Fixed setting GC threshold to more than 72hrs being ignored
```
15 changes: 15 additions & 0 deletions api/csi.go
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,11 @@ type CSIVolume struct {
CreateIndex uint64
ModifyIndex uint64

// CreateTime stored as UnixNano
CreateTime int64
// ModifyTime stored as UnixNano
ModifyTime int64

// ExtraKeysHCL is used by the hcl parser to report unexpected keys
ExtraKeysHCL []string `hcl1:",unusedKeys" json:"-"`
}
Expand Down Expand Up @@ -401,6 +406,11 @@ type CSIVolumeListStub struct {

CreateIndex uint64
ModifyIndex uint64

// CreateTime stored as UnixNano
CreateTime int64
// ModifyTime stored as UnixNano
ModifyTime int64
}

type CSIVolumeListExternalResponse struct {
Expand Down Expand Up @@ -543,6 +553,11 @@ type CSIPlugin struct {
NodesExpected int
CreateIndex uint64
ModifyIndex uint64

// CreateTime stored as UnixNano
CreateTime int64
// ModifyTime stored as UnixNano
ModifyTime int64
}

type CSIPluginListStub struct {
Expand Down
7 changes: 7 additions & 0 deletions api/deployments.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,10 @@ type Deployment struct {

CreateIndex uint64
ModifyIndex uint64

// Creation and modification times, stored as UnixNano
CreateTime int64
ModifyTime int64
}

// DeploymentState tracks the state of a deployment for a given task group.
Expand Down Expand Up @@ -261,6 +265,9 @@ type DeploymentPromoteRequest struct {
// Groups is used to set the promotion status per task group
Groups []string

// PromotedAt is the timestamp stored as Unix nano
PromotedAt int64

WriteRequest
}

Expand Down
63 changes: 27 additions & 36 deletions nomad/blocked_evals.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,11 +60,11 @@ type BlockedEvals struct {
// blocked eval exists for each job. The value is the blocked evaluation ID.
jobs map[structs.NamespacedID]string

// unblockIndexes maps computed node classes or quota name to the index in
// which they were unblocked. This is used to check if an evaluation could
// have been unblocked between the time they were in the scheduler and the
// time they are being blocked.
unblockIndexes map[string]uint64
// unblockIndexes maps computed node classes or quota name to the index and
// time at which they were unblocked. This is used to check if an
// evaluation could have been unblocked between the time they were in the
// scheduler and the time they are being blocked.
unblockIndexes map[string]unblockEvent

// duplicates is the set of evaluations for jobs that had pre-existing
// blocked evaluations. These should be marked as cancelled since only one
Expand All @@ -76,14 +76,16 @@ type BlockedEvals struct {
// duplicates.
duplicateCh chan struct{}

// timetable is used to correlate indexes with their insertion time. This
// allows us to prune based on time.
timetable *TimeTable

// stopCh is used to stop any created goroutines.
stopCh chan struct{}
}

// unblockEvent keeps a record of the index and time of the unblock
type unblockEvent struct {
index uint64
timestamp time.Time
}

// capacityUpdate stores unblock data.
type capacityUpdate struct {
computedClass string
Expand All @@ -107,7 +109,7 @@ func NewBlockedEvals(evalBroker *EvalBroker, logger hclog.Logger) *BlockedEvals
escaped: make(map[string]wrappedEval),
system: newSystemEvals(),
jobs: make(map[structs.NamespacedID]string),
unblockIndexes: make(map[string]uint64),
unblockIndexes: make(map[string]unblockEvent),
capacityChangeCh: make(chan *capacityUpdate, unblockBuffer),
duplicateCh: make(chan struct{}, 1),
stopCh: make(chan struct{}),
Expand Down Expand Up @@ -143,12 +145,6 @@ func (b *BlockedEvals) SetEnabled(enabled bool) {
}
}

func (b *BlockedEvals) SetTimetable(timetable *TimeTable) {
b.l.Lock()
b.timetable = timetable
b.l.Unlock()
}

// Block tracks the passed evaluation and enqueues it into the eval broker when
// a suitable node calls unblock.
func (b *BlockedEvals) Block(eval *structs.Evaluation) {
Expand Down Expand Up @@ -303,10 +299,10 @@ func latestEvalIndex(eval *structs.Evaluation) uint64 {
// the lock held.
func (b *BlockedEvals) missedUnblock(eval *structs.Evaluation) bool {
var max uint64 = 0
for id, index := range b.unblockIndexes {
for id, u := range b.unblockIndexes {
// Calculate the max unblock index
if max < index {
max = index
if max < u.index {
max = u.index
}

// The evaluation is blocked because it has hit a quota limit not class
Expand All @@ -315,7 +311,7 @@ func (b *BlockedEvals) missedUnblock(eval *structs.Evaluation) bool {
if eval.QuotaLimitReached != id {
// Not a match
continue
} else if eval.SnapshotIndex < index {
} else if eval.SnapshotIndex < u.index {
// The evaluation was processed before the quota specification was
// updated, so unblock the evaluation.
return true
Expand All @@ -326,7 +322,7 @@ func (b *BlockedEvals) missedUnblock(eval *structs.Evaluation) bool {
}

elig, ok := eval.ClassEligibility[id]
if !ok && eval.SnapshotIndex < index {
if !ok && eval.SnapshotIndex < u.index {
// The evaluation was processed and did not encounter this class
// because it was added after it was processed. Thus for correctness
// we need to unblock it.
Expand All @@ -335,7 +331,7 @@ func (b *BlockedEvals) missedUnblock(eval *structs.Evaluation) bool {

// The evaluation could use the computed node class and the eval was
// processed before the last unblock.
if elig && eval.SnapshotIndex < index {
if elig && eval.SnapshotIndex < u.index {
return true
}
}
Expand Down Expand Up @@ -415,7 +411,7 @@ func (b *BlockedEvals) Unblock(computedClass string, index uint64) {
// Store the index in which the unblock happened. We use this on subsequent
// block calls in case the evaluation was in the scheduler when a trigger
// occurred.
b.unblockIndexes[computedClass] = index
b.unblockIndexes[computedClass] = unblockEvent{index, time.Now()}

// Capture chan in lock as Flush overwrites it
ch := b.capacityChangeCh
Expand Down Expand Up @@ -450,7 +446,7 @@ func (b *BlockedEvals) UnblockQuota(quota string, index uint64) {
// Store the index in which the unblock happened. We use this on subsequent
// block calls in case the evaluation was in the scheduler when a trigger
// occurred.
b.unblockIndexes[quota] = index
b.unblockIndexes[quota] = unblockEvent{index, time.Now()}
ch := b.capacityChangeCh
done := b.stopCh
b.l.Unlock()
Expand Down Expand Up @@ -479,10 +475,11 @@ func (b *BlockedEvals) UnblockClassAndQuota(class, quota string, index uint64) {
// Store the index in which the unblock happened. We use this on subsequent
// block calls in case the evaluation was in the scheduler when a trigger
// occurred.
now := time.Now()
if quota != "" {
b.unblockIndexes[quota] = index
b.unblockIndexes[quota] = unblockEvent{index, now}
}
b.unblockIndexes[class] = index
b.unblockIndexes[class] = unblockEvent{index, now}

// Capture chan inside the lock to prevent a race with it getting reset
// in Flush.
Expand Down Expand Up @@ -699,8 +696,7 @@ func (b *BlockedEvals) Flush() {
b.captured = make(map[string]wrappedEval)
b.escaped = make(map[string]wrappedEval)
b.jobs = make(map[structs.NamespacedID]string)
b.unblockIndexes = make(map[string]uint64)
b.timetable = nil
b.unblockIndexes = make(map[string]unblockEvent)
b.duplicates = nil
b.capacityChangeCh = make(chan *capacityUpdate, unblockBuffer)
b.stopCh = make(chan struct{})
Expand Down Expand Up @@ -781,18 +777,13 @@ func (b *BlockedEvals) prune(stopCh <-chan struct{}) {
}

// pruneUnblockIndexes is used to prune any tracked entry that is excessively
// old. This protects againsts unbounded growth of the map.
// old. This protects against unbounded growth of the map.
func (b *BlockedEvals) pruneUnblockIndexes(cutoff time.Time) {
b.l.Lock()
defer b.l.Unlock()

if b.timetable == nil {
return
}

oldThreshold := b.timetable.NearestIndex(cutoff)
for key, index := range b.unblockIndexes {
if index < oldThreshold {
for key, u := range b.unblockIndexes {
if u.timestamp.Before(cutoff) {
delete(b.unblockIndexes, key)
}
}
Expand Down
Loading

0 comments on commit 6590572

Please sign in to comment.