Skip to content

Commit

Permalink
Add isIgnored helper method and gauge
Browse files Browse the repository at this point in the history
  • Loading branch information
DrJosh9000 committed Dec 5, 2024
1 parent def6d4a commit 98e9a2d
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 13 deletions.
7 changes: 7 additions & 0 deletions internal/controller/scheduler/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,13 +58,20 @@ var (
var (
// Overridden to return len(jobCancelCheckers) by podWatcher.
jobCancelCheckerGaugeFunc = func() int { return 0 }
ignoredJobsGaugeFunc = func() int { return 0 }

_ = promauto.NewGaugeFunc(prometheus.GaugeOpts{
Namespace: promNamespace,
Subsystem: "pod_watcher",
Name: "num_job_cancel_checkers",
Help: "Current count of job cancellation checkers",
}, func() float64 { return float64(jobCancelCheckerGaugeFunc()) })
_ = promauto.NewGaugeFunc(prometheus.GaugeOpts{
Namespace: promNamespace,
Subsystem: "pod_watcher",
Name: "num_ignored_jobs",
Help: "Current count of jobs ignored for podWatcher checks",
}, func() float64 { return float64(ignoredJobsGaugeFunc()) })

podWatcherOnAddEventCounter = promauto.NewCounter(prometheus.CounterOpts{
Namespace: promNamespace,
Expand Down
40 changes: 27 additions & 13 deletions internal/controller/scheduler/pod_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ type podWatcher struct {

// Jobs that we've failed, cancelled, or were found to be in a terminal
// state.
ignoreJobsMu sync.RWMutex
ignoreJobs map[uuid.UUID]struct{}
ignoredJobsMu sync.RWMutex
ignoredJob map[uuid.UUID]struct{}

// The job cancel checkers query the job state every so often.
jobCancelCheckerInterval time.Duration
Expand Down Expand Up @@ -95,10 +95,15 @@ func NewPodWatcher(logger *zap.Logger, k8s kubernetes.Interface, cfg *config.Con
cfg: cfg,
imagePullBackOffGracePeriod: imagePullBackOffGracePeriod,
jobCancelCheckerInterval: jobCancelCheckerInterval,
ignoreJobs: make(map[uuid.UUID]struct{}),
ignoredJob: make(map[uuid.UUID]struct{}),
cancelCheckerChs: make(map[uuid.UUID]*onceChan),
agentTags: agentTags,
}
ignoredJobsGaugeFunc = func() int {
pw.ignoredJobsMu.RLock()
defer pw.ignoredJobsMu.RUnlock()
return len(pw.ignoredJob)
}
jobCancelCheckerGaugeFunc = func() int {
pw.cancelCheckerChsMu.Lock()
defer pw.cancelCheckerChsMu.Unlock()
Expand Down Expand Up @@ -171,6 +176,11 @@ func (w *podWatcher) runChecks(ctx context.Context, pod *corev1.Pod) {
return
}

if w.isIgnored(jobUUID) {
log.Debug("Job is currently ignored for podWatcher checks")
return
}

// Check for an init container that failed for any reason.
// (Note: users can define their own init containers through podSpec.)
w.failOnInitContainerFailure(ctx, log, pod, jobUUID)
Expand Down Expand Up @@ -208,10 +218,7 @@ func (w *podWatcher) jobUUIDAndLogger(pod *corev1.Pod) (uuid.UUID, *zap.Logger,
return uuid.UUID{}, log, errors.New("pod labels do not match agent tags for this controller")
}

w.ignoreJobsMu.RLock()
defer w.ignoreJobsMu.RUnlock()

if _, ignore := w.ignoreJobs[jobUUID]; ignore {
if w.isIgnored(jobUUID) {
log.Debug("Job already failed, canceled, or wasn't in a failable/cancellable state")
return jobUUID, log, errors.New("job ignored")
}
Expand Down Expand Up @@ -551,15 +558,22 @@ func (w *podWatcher) jobCancelChecker(ctx context.Context, stopCh <-chan struct{
}

func (w *podWatcher) ignoreJob(jobUUID uuid.UUID) {
w.ignoreJobsMu.Lock()
defer w.ignoreJobsMu.Unlock()
w.ignoreJobs[jobUUID] = struct{}{}
w.ignoredJobsMu.Lock()
defer w.ignoredJobsMu.Unlock()
w.ignoredJob[jobUUID] = struct{}{}
}

func (w *podWatcher) unignoreJob(jobUUID uuid.UUID) {
w.ignoreJobsMu.Lock()
defer w.ignoreJobsMu.Unlock()
delete(w.ignoreJobs, jobUUID)
w.ignoredJobsMu.Lock()
defer w.ignoredJobsMu.Unlock()
delete(w.ignoredJob, jobUUID)
}

func (w *podWatcher) isIgnored(jobUUID uuid.UUID) bool {
w.ignoredJobsMu.RLock()
defer w.ignoredJobsMu.RUnlock()
_, ignore := w.ignoredJob[jobUUID]
return ignore
}

// onceChan stores a channel and a [sync.Once] to be used for closing the
Expand Down

0 comments on commit 98e9a2d

Please sign in to comment.