Skip to content

Commit

Permalink
Add job watcher
Browse files Browse the repository at this point in the history
  • Loading branch information
DrJosh9000 committed Dec 5, 2024
1 parent b167812 commit 002c360
Show file tree
Hide file tree
Showing 11 changed files with 405 additions and 2 deletions.
6 changes: 6 additions & 0 deletions .buildkite/rbac.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,12 @@ rules:
- pods/eviction
verbs:
- create
- apiGroups:
- ""
resources:
- events
verbs:
- list
---
apiVersion: v1
kind: ServiceAccount
Expand Down
7 changes: 7 additions & 0 deletions charts/agent-stack-k8s/templates/rbac.yaml.tpl
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ rules:
- watch
- create
- update
- delete
- apiGroups:
- ""
resources:
Expand All @@ -33,6 +34,12 @@ rules:
- pods/eviction
verbs:
- create
- apiGroups:
- ""
resources:
- events
verbs:
- list
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
Expand Down
5 changes: 5 additions & 0 deletions cmd/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,11 @@ func AddConfigFlags(cmd *cobra.Command) {
config.DefaultJobCancelCheckerPollInterval,
"Controls the interval between job state queries while a pod is still Pending",
)
cmd.Flags().Duration(
"empty-job-grace-period",
config.DefaultEmptyJobGracePeriod,
"Duration after starting a Kubernetes job that the controller will wait before considering failing the job due to a missing pod (e.g. when the podSpec specifies a missing service account)",
)
cmd.Flags().Bool(
"prohibit-kubernetes-plugin",
false,
Expand Down
1 change: 1 addition & 0 deletions cmd/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ func TestReadAndParseConfig(t *testing.T) {
JobTTL: 300 * time.Second,
ImagePullBackOffGracePeriod: 60 * time.Second,
JobCancelCheckerPollInterval: 10 * time.Second,
EmptyJobGracePeriod: 50 * time.Second,
PollInterval: 5 * time.Second,
StaleJobDataTimeout: 10 * time.Second,
JobCreationConcurrency: 5,
Expand Down
1 change: 1 addition & 0 deletions examples/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ image: my.registry.dev/buildkite-agent:latest
job-ttl: 5m
image-pull-backoff-grace-period: 60s
job-cancel-checker-poll-interval: 10s
empty-job-grace-period: 50s
poll-interval: 5s
stale-job-data-timeout: 10s
job-creation-concurrency: 5
Expand Down
2 changes: 2 additions & 0 deletions internal/controller/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ const (
DefaultNamespace = "default"
DefaultImagePullBackOffGracePeriod = 30 * time.Second
DefaultJobCancelCheckerPollInterval = 5 * time.Second
DefaultEmptyJobGracePeriod = 30 * time.Second
)

var DefaultAgentImage = "ghcr.io/buildkite/agent:" + version.Version()
Expand Down Expand Up @@ -49,6 +50,7 @@ type Config struct {
PodSpecPatch *corev1.PodSpec `json:"pod-spec-patch" validate:"omitempty"`
ImagePullBackOffGracePeriod time.Duration `json:"image-pull-backoff-grace-period" validate:"omitempty"`
JobCancelCheckerPollInterval time.Duration `json:"job-cancel-checker-poll-interval" validate:"omitempty"`
EmptyJobGracePeriod time.Duration `json:"empty-job-grace-period" validate:"omitempty"`

// WorkspaceVolume allows supplying a volume for /workspace. By default
// an EmptyDir volume is created for it.
Expand Down
13 changes: 13 additions & 0 deletions internal/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,20 @@ func Run(
logger.Fatal("failed to register completions informer", zap.Error(err))
}

// JobWatcher watches for jobs in bad conditions to clean up:
// * Jobs that fail without ever creating a pod
// * Jobs that stall forever without ever creating a pod
jobWatcher := scheduler.NewJobWatcher(
logger.Named("jobWatcher"),
k8sClient,
cfg,
)
if err := jobWatcher.RegisterInformer(ctx, informerFactory); err != nil {
logger.Fatal("failed to register jobWatcher informer", zap.Error(err))
}

// PodWatcher watches for other conditions to clean up pods:
// * Pods where an init container failed for any reason
// * Pods where a container is in ImagePullBackOff for too long
// * Pods that are still pending, but the Buildkite job has been cancelled
podWatcher := scheduler.NewPodWatcher(
Expand Down
271 changes: 271 additions & 0 deletions internal/controller/scheduler/job_watcher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,271 @@
package scheduler

import (
"context"
"fmt"
"sync"
"time"

"github.com/buildkite/agent-stack-k8s/v2/internal/controller/config"
"github.com/buildkite/agent-stack-k8s/v2/internal/controller/model"

"github.com/jedib0t/go-pretty/v6/table"
"go.uber.org/zap"

batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
kerrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/util/duration"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/utils/ptr"
)

// jobWatcher watches k8s jobs for failure to start a pod. The corresponding
// Buildkite job is failed with an error message if this happens. Also, if such
// a k8s job doesn't enter a terminal state on its own, jobWatcher sets a
// deadline so that it is cleaned up.
type jobWatcher struct {
// Logs go here
logger *zap.Logger

k8s kubernetes.Interface
cfg *config.Config

// Tracks stalling jobs (jobs that have yet to create pods).
stallingJobsMu sync.Mutex
stallingJobs map[*batchv1.Job]struct{}

// This is the context passed to RegisterInformer.
// It's being stored here (grrrr!) because the k8s ResourceEventHandler
// interface doesn't have context args. (Working around an interface in a
// library outside of our control is a carve-out from the usual rule.)
// The context is needed to ensure goroutines are cleaned up.
resourceEventHandlerCtx context.Context
}

// NewJobWatcher creates a JobWatcher.
func NewJobWatcher(logger *zap.Logger, k8sClient kubernetes.Interface, cfg *config.Config) *jobWatcher {
w := &jobWatcher{
logger: logger,
k8s: k8sClient,
cfg: cfg,
stallingJobs: make(map[*batchv1.Job]struct{}),
}
jobsStallingGaugeFunc = func() int {
w.stallingJobsMu.Lock()
defer w.stallingJobsMu.Unlock()
return len(w.stallingJobs)
}
return w
}

// RegisterInformer registers the limiter to listen for Kubernetes job events.
func (w *jobWatcher) RegisterInformer(ctx context.Context, factory informers.SharedInformerFactory) error {
informer := factory.Batch().V1().Jobs()
jobInformer := informer.Informer()
if _, err := jobInformer.AddEventHandler(w); err != nil {
return err
}
w.resourceEventHandlerCtx = ctx // See field comment
go factory.Start(ctx.Done())
// No need to wait for cache sync here. These are cleanup tasks, not
// barriers to prevent creating new jobs.
go w.stalledJobChecker(ctx)
return nil
}

// OnAdd is called by k8s to inform us a resource is added.
func (w *jobWatcher) OnAdd(obj any, _ bool) {
jobWatcherOnAddEventCounter.Inc()
kjob, _ := obj.(*batchv1.Job)
if kjob == nil {
return
}
// Same logic whether we are considering pre-existing jobs, or new jobs.
w.runChecks(w.resourceEventHandlerCtx, kjob)
}

// OnUpdate is called by k8s to inform us a resource is updated.
func (w *jobWatcher) OnUpdate(_, curr any) {
jobWatcherOnUpdateEventCounter.Inc()
kjob, _ := curr.(*batchv1.Job)
if kjob == nil {
return
}
// Same logic whether or not anything relevant changed about the job.
w.runChecks(w.resourceEventHandlerCtx, kjob)
}

// OnDelete is called by k8s to inform us a resource is deleted.
func (w *jobWatcher) OnDelete(prev any) {
jobWatcherOnDeleteEventCounter.Inc()
kjob, _ := prev.(*batchv1.Job)
if kjob == nil {
return
}
w.removeFromStalling(kjob)

// TODO: consider catching jobs that were deleted manually?
}

func (w *jobWatcher) runChecks(ctx context.Context, kjob *batchv1.Job) {
log := loggerForObject(w.logger, kjob)

if model.JobFinished(kjob) {
w.removeFromStalling(kjob)
w.checkFinishedWithoutPod(ctx, log, kjob)
} else {
w.checkStalledWithoutPod(log, kjob)
}
}

func (w *jobWatcher) checkFinishedWithoutPod(ctx context.Context, log *zap.Logger, kjob *batchv1.Job) {
log.Debug("Checking job for finishing without a pod")

// If the job is finished, there should be one finished pod.
if kjob.Status.Failed+kjob.Status.Succeeded > 0 {
// All's well with the world.
return
}

jobWatcherFinishedWithoutPodCounter.Inc()

// Because no pod has been created, the agent hasn't started.
// We can acquire the Buildkite job and fail it ourselves.
log.Info("The Kubernetes job ended without starting a pod. Failing the corresponding Buildkite job")
w.fetchEventsAndFailJob(ctx, log, kjob, "The Kubernetes job ended without starting a pod.\n")
}

func (w *jobWatcher) checkStalledWithoutPod(log *zap.Logger, kjob *batchv1.Job) {
log.Debug("Checking job for stalling without a pod")

// If the job is not finished and there is no pod, it should start one
// before too long. Otherwise the job is stalled.
pods := kjob.Status.Active + kjob.Status.Failed + kjob.Status.Succeeded
// Ready and Terminating are subsets of Active (I think)
if utp := kjob.Status.UncountedTerminatedPods; utp != nil {
pods += int32(len(utp.Succeeded))
pods += int32(len(utp.Failed))
}
if pods > 0 {
// All's well with the world.
w.removeFromStalling(kjob)
return
}

if kjob.Status.StartTime == nil {
// the _job_ hasn't even started?
return
}

w.addToStalling(kjob)
}

func (w *jobWatcher) fetchEventsAndFailJob(ctx context.Context, log *zap.Logger, kjob *batchv1.Job, message string) {
// List the events for the job, which might contain useful info for
// diagnosing the problem.
events := w.k8s.CoreV1().Events(w.cfg.Namespace)
evlist, err := events.List(ctx, metav1.ListOptions{
FieldSelector: fields.AndSelectors(
fields.OneTermEqualSelector("involvedObject.kind", "Job"),
fields.OneTermEqualSelector("involvedObject.name", kjob.Name),
).String(),
})
if err != nil {
log.Error("Couldn't get events for job", zap.Error(err))
message = fmt.Sprintf("%s\nCouldn't get events for job %s: %v", message, kjob.Name, err)
}
if evlist != nil {
message += "\n" + w.formatEvents(evlist)
}

if err := acquireAndFailForObject(ctx, log, w.k8s, w.cfg, kjob, message); err != nil {
// Maybe the job was cancelled in the meantime?
log.Error("Could not fail Buildkite job", zap.Error(err))
jobWatcherBuildkiteJobFailErrorsCounter.Inc()
return
}
jobWatcherBuildkiteJobFailsCounter.Inc()
}

func (w *jobWatcher) formatEvents(evlist *corev1.EventList) string {
if len(evlist.Items) == 0 {
return "Events: none"
}

tw := table.NewWriter()
tw.SetStyle(table.StyleColoredDark)
tw.AppendHeader(table.Row{"LAST EVENT", "REPEATED", "TYPE", "REASON", "MESSAGE"})
tw.AppendSeparator()
for _, event := range evlist.Items {
if event.Series == nil {
tw.AppendRow(table.Row{event.EventTime.Time, "-", event.Type, event.Reason, event.Message})
continue
}
lastTime := event.Series.LastObservedTime.Time
firstToLast := duration.HumanDuration(lastTime.Sub(event.EventTime.Time))
countMsg := fmt.Sprintf("x%d over %s", event.Series.Count, firstToLast)
tw.AppendRow(table.Row{lastTime, countMsg, event.Type, event.Reason, event.Message})
}
return tw.Render()
}

func (w *jobWatcher) addToStalling(kjob *batchv1.Job) {
w.stallingJobsMu.Lock()
defer w.stallingJobsMu.Unlock()
w.stallingJobs[kjob] = struct{}{}
}

func (w *jobWatcher) removeFromStalling(kjob *batchv1.Job) {
w.stallingJobsMu.Lock()
defer w.stallingJobsMu.Unlock()
delete(w.stallingJobs, kjob)
}

func (w *jobWatcher) stalledJobChecker(ctx context.Context) {
ticker := time.Tick(time.Second)
for {
select {
case <-ctx.Done():
return

case <-ticker:
// continue below
}

// Gather stalled jobs
var stalled []*batchv1.Job
w.stallingJobsMu.Lock()
for kjob := range w.stallingJobs {
if time.Since(kjob.Status.StartTime.Time) >= w.cfg.EmptyJobGracePeriod {
stalled = append(stalled, kjob)
delete(w.stallingJobs, kjob)
}
}
w.stallingJobsMu.Unlock()

jobWatcherStalledWithoutPodCounter.Add(float64(len(stalled)))

// Fail BK jobs and delete k8s jobs.
jobs := w.k8s.BatchV1().Jobs(w.cfg.Namespace)
for _, kjob := range stalled {
// Fail the BK job. This lists events for the job, so delete it afterwards.
log := loggerForObject(w.logger, kjob)
stallDuration := duration.HumanDuration(time.Since(kjob.Status.StartTime.Time))
message := fmt.Sprintf("The Kubernetes job spent %s without starting a pod.\n", stallDuration)
w.fetchEventsAndFailJob(ctx, log, kjob, message)

if err := jobs.Delete(ctx, kjob.Name, metav1.DeleteOptions{
PropagationPolicy: ptr.To(metav1.DeletePropagationBackground),
}); err != nil {
jobWatcherJobCleanupErrorsCounter.WithLabelValues(string(kerrors.ReasonForError(err))).Inc()
w.logger.Error("failed to delete stalled job", zap.Error(err))
continue
}
jobWatcherJobCleanupsCounter.Inc()
}
}
}
Loading

0 comments on commit 002c360

Please sign in to comment.