From c6211c3d20754f14a40e8605f09da314622d7291 Mon Sep 17 00:00:00 2001 From: Josh Deprez Date: Mon, 2 Dec 2024 15:18:07 +1100 Subject: [PATCH] Add job watcher --- .buildkite/rbac.yaml | 6 + .../agent-stack-k8s/templates/rbac.yaml.tpl | 7 + cmd/controller/controller.go | 5 + cmd/controller/controller_test.go | 1 + examples/config.yaml | 1 + internal/controller/config/config.go | 2 + internal/controller/controller.go | 13 + internal/controller/scheduler/completions.go | 33 +-- internal/controller/scheduler/fail_job.go | 39 ++- internal/controller/scheduler/job_watcher.go | 258 ++++++++++++++++++ internal/controller/scheduler/metrics.go | 51 +++- internal/controller/scheduler/once_chan.go | 17 ++ internal/controller/scheduler/pod_watcher.go | 111 +++----- internal/controller/scheduler/scheduler.go | 3 +- internal/controller/scheduler/set_deadline.go | 26 ++ .../controller/scheduler/uuid_and_logger.go | 42 +++ .../fixtures/missing-service-account.yaml | 12 + internal/integration/integration_test.go | 15 + 18 files changed, 541 insertions(+), 101 deletions(-) create mode 100644 internal/controller/scheduler/job_watcher.go create mode 100644 internal/controller/scheduler/once_chan.go create mode 100644 internal/controller/scheduler/set_deadline.go create mode 100644 internal/controller/scheduler/uuid_and_logger.go create mode 100644 internal/integration/fixtures/missing-service-account.yaml diff --git a/.buildkite/rbac.yaml b/.buildkite/rbac.yaml index 3fc6cf70..c97c0582 100644 --- a/.buildkite/rbac.yaml +++ b/.buildkite/rbac.yaml @@ -34,6 +34,12 @@ rules: - pods/eviction verbs: - create + - apiGroups: + - events.k8s.io + resources: + - events + verbs: + - list --- apiVersion: v1 kind: ServiceAccount diff --git a/charts/agent-stack-k8s/templates/rbac.yaml.tpl b/charts/agent-stack-k8s/templates/rbac.yaml.tpl index e50196b1..ad744361 100644 --- a/charts/agent-stack-k8s/templates/rbac.yaml.tpl +++ b/charts/agent-stack-k8s/templates/rbac.yaml.tpl @@ -13,6 +13,7 @@ rules: - watch - create - update + - delete - apiGroups: - "" resources: @@ -33,6 +34,12 @@ rules: - pods/eviction verbs: - create + - apiGroups: + - events.k8s.io + resources: + - events + verbs: + - list --- apiVersion: rbac.authorization.k8s.io/v1 kind: RoleBinding diff --git a/cmd/controller/controller.go b/cmd/controller/controller.go index b53b62e5..5b5b8646 100644 --- a/cmd/controller/controller.go +++ b/cmd/controller/controller.go @@ -100,6 +100,11 @@ func AddConfigFlags(cmd *cobra.Command) { config.DefaultJobCancelCheckerPollInterval, "Controls the interval between job state queries while a pod is still Pending", ) + cmd.Flags().Duration( + "empty-job-grace-period", + config.DefaultEmptyJobGracePeriod, + "Duration after starting a Kubernetes job that the controller will wait before considering failing the job due to a missing pod (e.g. 
when the podSpec specifies a missing service account)", + ) cmd.Flags().Bool( "prohibit-kubernetes-plugin", false, diff --git a/cmd/controller/controller_test.go b/cmd/controller/controller_test.go index 6e2dda64..e4fdb1c3 100644 --- a/cmd/controller/controller_test.go +++ b/cmd/controller/controller_test.go @@ -26,6 +26,7 @@ func TestReadAndParseConfig(t *testing.T) { JobTTL: 300 * time.Second, ImagePullBackOffGracePeriod: 60 * time.Second, JobCancelCheckerPollInterval: 10 * time.Second, + EmptyJobGracePeriod: 50 * time.Second, PollInterval: 5 * time.Second, StaleJobDataTimeout: 10 * time.Second, JobCreationConcurrency: 5, diff --git a/examples/config.yaml b/examples/config.yaml index 21b6d725..b941a6ee 100644 --- a/examples/config.yaml +++ b/examples/config.yaml @@ -4,6 +4,7 @@ image: my.registry.dev/buildkite-agent:latest job-ttl: 5m image-pull-backoff-grace-period: 60s job-cancel-checker-poll-interval: 10s +empty-job-grace-period: 50s poll-interval: 5s stale-job-data-timeout: 10s job-creation-concurrency: 5 diff --git a/internal/controller/config/config.go b/internal/controller/config/config.go index 0a8331a6..dd652809 100644 --- a/internal/controller/config/config.go +++ b/internal/controller/config/config.go @@ -17,6 +17,7 @@ const ( DefaultNamespace = "default" DefaultImagePullBackOffGracePeriod = 30 * time.Second DefaultJobCancelCheckerPollInterval = 5 * time.Second + DefaultEmptyJobGracePeriod = 30 * time.Second ) var DefaultAgentImage = "ghcr.io/buildkite/agent:" + version.Version() @@ -49,6 +50,7 @@ type Config struct { PodSpecPatch *corev1.PodSpec `json:"pod-spec-patch" validate:"omitempty"` ImagePullBackOffGracePeriod time.Duration `json:"image-pull-backoff-grace-period" validate:"omitempty"` JobCancelCheckerPollInterval time.Duration `json:"job-cancel-checker-poll-interval" validate:"omitempty"` + EmptyJobGracePeriod time.Duration `json:"empty-job-grace-period" validate:"omitempty"` // WorkspaceVolume allows supplying a volume for /workspace. By default // an EmptyDir volume is created for it. 
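How the new `empty-job-grace-period` value is applied: the jobWatcher introduced later in this patch treats a Kubernetes job as stalled once it has run longer than the grace period without any pod counted against it, then fails the corresponding Buildkite job and deletes the Kubernetes job. Below is a minimal sketch of that stall check; the `isStalled` helper and the values used in `main` are illustrative only, not part of this patch.

package main

import (
	"fmt"
	"time"

	batchv1 "k8s.io/api/batch/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// isStalled reports whether a k8s job has gone longer than the configured
// empty-job grace period without any pod being counted against it.
// (Illustrative helper; the real check lives in jobWatcher below.)
func isStalled(kjob *batchv1.Job, emptyJobGracePeriod time.Duration) bool {
	pods := kjob.Status.Active + kjob.Status.Failed + kjob.Status.Succeeded
	if utp := kjob.Status.UncountedTerminatedPods; utp != nil {
		pods += int32(len(utp.Succeeded)) + int32(len(utp.Failed))
	}
	if pods > 0 || kjob.Status.StartTime == nil {
		return false // a pod exists, or the job hasn't started yet
	}
	return time.Since(kjob.Status.StartTime.Time) >= emptyJobGracePeriod
}

func main() {
	started := metav1.NewTime(time.Now().Add(-time.Minute))
	kjob := &batchv1.Job{Status: batchv1.JobStatus{StartTime: &started}}
	fmt.Println(isStalled(kjob, 30*time.Second)) // true: one minute without a pod exceeds the 30s grace period
}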
diff --git a/internal/controller/controller.go b/internal/controller/controller.go index f830210b..3cca6362 100644 --- a/internal/controller/controller.go +++ b/internal/controller/controller.go @@ -136,7 +136,20 @@ func Run( logger.Fatal("failed to register completions informer", zap.Error(err)) } + // JobWatcher watches for jobs in bad conditions to clean up: + // * Jobs that fail without ever creating a pod + // * Jobs that stall forever without ever creating a pod + jobWatcher := scheduler.NewJobWatcher( + logger.Named("jobWatcher"), + k8sClient, + cfg, + ) + if err := jobWatcher.RegisterInformer(ctx, informerFactory); err != nil { + logger.Fatal("failed to register jobWatcher informer", zap.Error(err)) + } + // PodWatcher watches for other conditions to clean up pods: + // * Pods where an init container failed for any reason // * Pods where a container is in ImagePullBackOff for too long // * Pods that are still pending, but the Buildkite job has been cancelled podWatcher := scheduler.NewPodWatcher( diff --git a/internal/controller/scheduler/completions.go b/internal/controller/scheduler/completions.go index 1191349b..81fe29c8 100644 --- a/internal/controller/scheduler/completions.go +++ b/internal/controller/scheduler/completions.go @@ -8,17 +8,21 @@ import ( "go.uber.org/zap" v1 "k8s.io/api/core/v1" kerrors "k8s.io/apimachinery/pkg/api/errors" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes" _ "k8s.io/client-go/tools/cache" - "k8s.io/client-go/util/retry" - "k8s.io/utils/ptr" ) type completionsWatcher struct { logger *zap.Logger k8s kubernetes.Interface + + // This is the context passed to RegisterInformer. + // It's being stored here (grrrr!) because the k8s ResourceEventHandler + // interface doesn't have context args. (Working around an interface in a + // library outside of our control is a carve-out from the usual rule.) + // The context is needed to ensure goroutines are cleaned up. + resourceEventHandlerCtx context.Context } func NewPodCompletionWatcher(logger *zap.Logger, k8s kubernetes.Interface) *completionsWatcher { @@ -30,14 +34,12 @@ func NewPodCompletionWatcher(logger *zap.Logger, k8s kubernetes.Interface) *comp } // Creates a Pods informer and registers the handler on it -func (w *completionsWatcher) RegisterInformer( - ctx context.Context, - factory informers.SharedInformerFactory, -) error { +func (w *completionsWatcher) RegisterInformer(ctx context.Context, factory informers.SharedInformerFactory) error { informer := factory.Core().V1().Pods().Informer() if _, err := informer.AddEventHandler(w); err != nil { return err } + w.resourceEventHandlerCtx = ctx // see note on field go factory.Start(ctx.Done()) return nil } @@ -49,7 +51,7 @@ func (w *completionsWatcher) OnDelete(obj any) {} func (w *completionsWatcher) OnAdd(obj any, isInInitialList bool) { completionWatcherOnAddEventCounter.Inc() pod := obj.(*v1.Pod) - w.cleanupSidecars(pod) + w.cleanupSidecars(w.resourceEventHandlerCtx, pod) } func (w *completionsWatcher) OnUpdate(old any, new any) { @@ -62,7 +64,7 @@ func (w *completionsWatcher) OnUpdate(old any, new any) { } newPod := new.(*v1.Pod) - w.cleanupSidecars(newPod) + w.cleanupSidecars(w.resourceEventHandlerCtx, newPod) } // cleanupSidecars first checks if the container status of the agent container @@ -70,7 +72,7 @@ func (w *completionsWatcher) OnUpdate(old any, new any) { // it with an ActiveDeadlineSeconds value (defaultTermGracePeriodSeconds). 
 // (So this is not actually sidecar-specific, but is needed because sidecars
 // would otherwise cause the pod to continue running.)
-func (w *completionsWatcher) cleanupSidecars(pod *v1.Pod) {
+func (w *completionsWatcher) cleanupSidecars(ctx context.Context, pod *v1.Pod) {
 	terminated := getTermination(pod)
 	if terminated == nil {
 		return
@@ -81,16 +83,7 @@
 		zap.Int32("exit code", terminated.ExitCode),
 	)
 
-	if err := retry.RetryOnConflict(retry.DefaultRetry, func() error {
-		ctx := context.TODO()
-		job, err := w.k8s.BatchV1().Jobs(pod.Namespace).Get(ctx, pod.Labels["job-name"], metav1.GetOptions{})
-		if err != nil {
-			return err
-		}
-		job.Spec.ActiveDeadlineSeconds = ptr.To[int64](defaultTermGracePeriodSeconds)
-		_, err = w.k8s.BatchV1().Jobs(pod.Namespace).Update(ctx, job, metav1.UpdateOptions{})
-		return err
-	}); err != nil {
+	if err := setJobDeadline(ctx, w.k8s, pod.Namespace, pod.Labels["job-name"]); err != nil {
 		completionWatcherJobCleanupErrorsCounter.WithLabelValues(string(kerrors.ReasonForError(err))).Inc()
 		w.logger.Error("failed to update job with ActiveDeadlineSeconds", zap.Error(err))
 		return
diff --git a/internal/controller/scheduler/fail_job.go b/internal/controller/scheduler/fail_job.go
index da8b5995..ce1f7a76 100644
--- a/internal/controller/scheduler/fail_job.go
+++ b/internal/controller/scheduler/fail_job.go
@@ -6,6 +6,8 @@ import (
 	"fmt"
 	"os"
 
+	"github.com/buildkite/agent-stack-k8s/v2/internal/controller/agenttags"
+	"github.com/buildkite/agent-stack-k8s/v2/internal/controller/config"
 	"github.com/buildkite/agent-stack-k8s/v2/internal/version"
 
 	agentcore "github.com/buildkite/agent/v3/core"
@@ -16,9 +18,42 @@
 	"k8s.io/client-go/kubernetes"
 )
 
-// failJob fails the job in Buildkite. agentToken needs to be the token value.
+// acquireAndFailForObject figures out how to fail the BK job corresponding to
+// the k8s object (a pod or job) by inspecting the object's labels.
+func acquireAndFailForObject(
+	ctx context.Context,
+	logger *zap.Logger,
+	k8sClient kubernetes.Interface,
+	cfg *config.Config,
+	obj metav1.Object,
+	message string,
+) error {
+	agentToken, err := fetchAgentToken(ctx, logger, k8sClient, obj.GetNamespace(), cfg.AgentTokenSecret)
+	if err != nil {
+		logger.Error("fetching agent token from secret", zap.Error(err))
+		return err
+	}
+
+	// Matching tags are required in order to connect the temporary agent.
+	labels := obj.GetLabels()
+	jobUUID := labels[config.UUIDLabel]
+	if jobUUID == "" {
+		logger.Error("object missing UUID label", zap.String("label", config.UUIDLabel))
+		return errors.New("missing UUID label")
+	}
+	tags := agenttags.TagsFromLabels(labels)
+	opts := cfg.AgentConfig.ControllerOptions()
+
+	if err := acquireAndFail(ctx, logger, agentToken, jobUUID, tags, message, opts...); err != nil {
+		logger.Error("failed to acquire and fail the job on Buildkite", zap.Error(err))
+		return err
+	}
+	return nil
+}
+
+// acquireAndFail fails the job in Buildkite. agentToken needs to be the token value.
 // Use fetchAgentToken to fetch it from the k8s secret.
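+// (Renamed from failJob: the job must first be acquired by a temporary agent
+// before it can be failed.)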
-func failJob(
+func acquireAndFail(
 	ctx context.Context,
 	zapLogger *zap.Logger,
 	agentToken string,
diff --git a/internal/controller/scheduler/job_watcher.go b/internal/controller/scheduler/job_watcher.go
new file mode 100644
index 00000000..f5b51a04
--- /dev/null
+++ b/internal/controller/scheduler/job_watcher.go
@@ -0,0 +1,258 @@
+package scheduler
+
+import (
+	"context"
+	"fmt"
+	"sync"
+	"time"
+
+	"github.com/buildkite/agent-stack-k8s/v2/internal/controller/config"
+	"github.com/buildkite/agent-stack-k8s/v2/internal/controller/model"
+	"github.com/jedib0t/go-pretty/v6/table"
+
+	"go.uber.org/zap"
+	batchv1 "k8s.io/api/batch/v1"
+	eventsv1 "k8s.io/api/events/v1"
+	kerrors "k8s.io/apimachinery/pkg/api/errors"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/fields"
+	"k8s.io/apimachinery/pkg/util/duration"
+	"k8s.io/client-go/informers"
+	"k8s.io/client-go/kubernetes"
+)
+
+// jobWatcher watches k8s jobs for failure to start a pod. The corresponding
+// Buildkite job is failed with an error message if this happens. Also, if such
+// a k8s job doesn't enter a terminal state on its own, jobWatcher sets a
+// deadline so that it is cleaned up.
+type jobWatcher struct {
+	// Logs go here
+	logger *zap.Logger
+
+	k8s kubernetes.Interface
+	cfg *config.Config
+
+	// Tracks stalling jobs (jobs that have yet to create pods).
+	stallingJobsMu sync.Mutex
+	stallingJobs   map[*batchv1.Job]struct{}
+
+	// This is the context passed to RegisterInformer.
+	// It's being stored here (grrrr!) because the k8s ResourceEventHandler
+	// interface doesn't have context args. (Working around an interface in a
+	// library outside of our control is a carve-out from the usual rule.)
+	// The context is needed to ensure goroutines are cleaned up.
+	resourceEventHandlerCtx context.Context
+}
+
+// NewJobWatcher creates a JobWatcher.
+func NewJobWatcher(logger *zap.Logger, k8sClient kubernetes.Interface, cfg *config.Config) *jobWatcher {
+	return &jobWatcher{
+		logger:       logger,
+		k8s:          k8sClient,
+		cfg:          cfg,
+		stallingJobs: make(map[*batchv1.Job]struct{}),
+	}
+}
+
+// RegisterInformer registers the jobWatcher to listen for Kubernetes job events.
+func (w *jobWatcher) RegisterInformer(ctx context.Context, factory informers.SharedInformerFactory) error {
+	informer := factory.Batch().V1().Jobs()
+	jobInformer := informer.Informer()
+	if _, err := jobInformer.AddEventHandler(w); err != nil {
+		return err
+	}
+	w.resourceEventHandlerCtx = ctx // See field comment
+	go factory.Start(ctx.Done())
+	// No need to wait for cache sync here. These are cleanup tasks, not
+	// barriers to prevent creating new jobs.
+	go w.stalledJobChecker(ctx)
+	return nil
+}
+
+// OnAdd is called by k8s to inform us a resource is added.
+func (w *jobWatcher) OnAdd(obj any, _ bool) {
+	jobWatcherOnAddEventCounter.Inc()
+	kjob, _ := obj.(*batchv1.Job)
+	if kjob == nil {
+		return
+	}
+	// Same logic whether we are considering pre-existing jobs, or new jobs.
+	w.runChecks(w.resourceEventHandlerCtx, kjob)
+}
+
+// OnUpdate is called by k8s to inform us a resource is updated.
+func (w *jobWatcher) OnUpdate(_, curr any) {
+	jobWatcherOnUpdateEventCounter.Inc()
+	kjob, _ := curr.(*batchv1.Job)
+	if kjob == nil {
+		return
+	}
+	// Same logic whether or not anything relevant changed about the job.
+	w.runChecks(w.resourceEventHandlerCtx, kjob)
+}
+
+// OnDelete is called by k8s to inform us a resource is deleted.
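+// A deleted job no longer needs stall tracking, so it is removed from the set.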
+func (w *jobWatcher) OnDelete(prev any) {
+	jobWatcherOnDeleteEventCounter.Inc()
+	kjob, _ := prev.(*batchv1.Job)
+	if kjob == nil {
+		return
+	}
+	w.removeFromStalling(kjob)
+
+	// TODO: consider catching jobs that were deleted manually?
+}
+
+func (w *jobWatcher) runChecks(ctx context.Context, kjob *batchv1.Job) {
+	log := loggerForObject(w.logger, kjob)
+
+	if model.JobFinished(kjob) {
+		w.removeFromStalling(kjob)
+		w.checkFinishedWithoutPod(ctx, log, kjob)
+	} else {
+		w.checkStalledWithoutPod(log, kjob)
+	}
+}
+
+func (w *jobWatcher) checkFinishedWithoutPod(ctx context.Context, log *zap.Logger, kjob *batchv1.Job) {
+	log.Debug("Checking job for finishing without a pod")
+
+	// If the job is finished, there should be one finished pod.
+	if kjob.Status.Failed+kjob.Status.Succeeded > 0 {
+		// All's well with the world.
+		return
+	}
+
+	// Because no pod has been created, the agent hasn't started.
+	// We can acquire the Buildkite job and fail it ourselves.
+	log.Info("The Kubernetes job ended without starting a pod. Failing the corresponding Buildkite job")
+
+	w.fetchEventsAndFailJob(ctx, log, kjob, "The Kubernetes job ended without starting a pod.\n")
+}
+
+func (w *jobWatcher) checkStalledWithoutPod(log *zap.Logger, kjob *batchv1.Job) {
+	log.Debug("Checking job for stalling without a pod")
+
+	// If the job is not finished and there is no pod, it should start one
+	// before too long. Otherwise the job is stalled.
+	pods := kjob.Status.Active + kjob.Status.Failed + kjob.Status.Succeeded
+	// Ready and Terminating are subsets of Active (I think)
+	if utp := kjob.Status.UncountedTerminatedPods; utp != nil {
+		pods += int32(len(utp.Succeeded))
+		pods += int32(len(utp.Failed))
+	}
+	if pods > 0 {
+		// All's well with the world.
+		w.removeFromStalling(kjob)
+		return
+	}
+
+	if kjob.Status.StartTime == nil {
+		// the _job_ hasn't even started?
+		return
+	}
+
+	w.addToStalling(kjob)
+}
+
+func (w *jobWatcher) fetchEventsAndFailJob(ctx context.Context, log *zap.Logger, kjob *batchv1.Job, message string) {
+	// List the events for the job, which might contain useful info for
+	// diagnosing the problem.
+	events := w.k8s.EventsV1().Events(w.cfg.Namespace)
+	evlist, err := events.List(ctx, metav1.ListOptions{
+		FieldSelector: fields.AndSelectors(
+			fields.OneTermEqualSelector("involvedObject.kind", "Job"),
+			fields.OneTermEqualSelector("involvedObject.name", kjob.Name),
+		).String(),
+	})
+	if err != nil {
+		log.Error("Couldn't get events for job", zap.Error(err))
+		message = fmt.Sprintf("%s\nCouldn't get events for job %s: %v", message, kjob.Name, err)
+	}
+	if evlist != nil {
+		message += "\n" + w.formatEvents(evlist)
+	}
+
+	if err := acquireAndFailForObject(ctx, log, w.k8s, w.cfg, kjob, message); err != nil {
+		// Maybe the job was cancelled in the meantime?
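+		// Whatever the cause, this is non-fatal: log it and count it in the error metric.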
+ log.Error("Could not fail Buildkite job", zap.Error(err)) + jobWatcherBuildkiteJobFailErrorsCounter.Inc() + return + } + jobWatcherBuildkiteJobFailsCounter.Inc() +} + +func (w *jobWatcher) formatEvents(evlist *eventsv1.EventList) string { + if len(evlist.Items) == 0 { + return "Events: none" + } + + tw := table.NewWriter() + tw.SetStyle(table.StyleColoredDark) + tw.AppendHeader(table.Row{"LAST EVENT", "REPEATED", "TYPE", "REASON", "NOTE"}) + tw.AppendSeparator() + for _, event := range evlist.Items { + if event.Series == nil { + tw.AppendRow(table.Row{event.EventTime.Time, "-", event.Type, event.Reason, event.Note}) + continue + } + lastTime := event.Series.LastObservedTime.Time + firstToLast := duration.HumanDuration(lastTime.Sub(event.EventTime.Time)) + countMsg := fmt.Sprintf("x%d over %s", event.Series.Count, firstToLast) + tw.AppendRow(table.Row{lastTime, countMsg, event.Type, event.Reason, event.Note}) + } + return tw.Render() +} + +func (w *jobWatcher) addToStalling(kjob *batchv1.Job) { + w.stallingJobsMu.Lock() + defer w.stallingJobsMu.Unlock() + w.stallingJobs[kjob] = struct{}{} +} + +func (w *jobWatcher) removeFromStalling(kjob *batchv1.Job) { + w.stallingJobsMu.Lock() + defer w.stallingJobsMu.Unlock() + delete(w.stallingJobs, kjob) +} + +func (w *jobWatcher) stalledJobChecker(ctx context.Context) { + ticker := time.Tick(time.Second) + for { + select { + case <-ctx.Done(): + return + + case <-ticker: + // continue below + } + + // Gather stalled jobs + var stalled []*batchv1.Job + w.stallingJobsMu.Lock() + for kjob := range w.stallingJobs { + if time.Since(kjob.Status.StartTime.Time) >= w.cfg.EmptyJobGracePeriod { + stalled = append(stalled, kjob) + delete(w.stallingJobs, kjob) + } + } + w.stallingJobsMu.Unlock() + + // Fail BK jobs and delete k8s jobs. + jobs := w.k8s.BatchV1().Jobs(w.cfg.Namespace) + for _, kjob := range stalled { + // Fail the BK job. This lists events for the job, so delete it afterwards. 
+ log := loggerForObject(w.logger, kjob) + stallDuration := duration.HumanDuration(time.Since(kjob.Status.StartTime.Time)) + message := fmt.Sprintf("The Kubernetes job spent %s without starting a pod.\n", stallDuration) + w.fetchEventsAndFailJob(ctx, log, kjob, message) + + if err := jobs.Delete(ctx, kjob.Name, metav1.DeleteOptions{}); err != nil { + jobWatcherJobCleanupErrorsCounter.WithLabelValues(string(kerrors.ReasonForError(err))).Inc() + w.logger.Error("failed to delete stalled job", zap.Error(err)) + continue + } + jobWatcherJobCleanupsCounter.Inc() + } + } +} diff --git a/internal/controller/scheduler/metrics.go b/internal/controller/scheduler/metrics.go index fcfd2208..6e8ef921 100644 --- a/internal/controller/scheduler/metrics.go +++ b/internal/controller/scheduler/metrics.go @@ -53,6 +53,53 @@ var ( }) ) +// Job watcher metrics +var ( + jobWatcherOnAddEventCounter = promauto.NewCounter(prometheus.CounterOpts{ + Namespace: promNamespace, + Subsystem: "job_watcher", + Name: "onadd_events_total", + Help: "Count of OnAdd informer events", + }) + jobWatcherOnUpdateEventCounter = promauto.NewCounter(prometheus.CounterOpts{ + Namespace: promNamespace, + Subsystem: "job_watcher", + Name: "onupdate_events_total", + Help: "Count of OnUpdate informer events", + }) + jobWatcherOnDeleteEventCounter = promauto.NewCounter(prometheus.CounterOpts{ + Namespace: promNamespace, + Subsystem: "job_watcher", + Name: "ondelete_events_total", + Help: "Count of OnDelete informer events", + }) + + jobWatcherBuildkiteJobFailsCounter = promauto.NewCounter(prometheus.CounterOpts{ + Namespace: promNamespace, + Subsystem: "job_watcher", + Name: "jobs_failed_on_buildkite_total", + Help: "Count of jobs that jobWatcher successfully acquired and failed on Buildkite", + }) + jobWatcherBuildkiteJobFailErrorsCounter = promauto.NewCounter(prometheus.CounterOpts{ + Namespace: promNamespace, + Subsystem: "job_watcher", + Name: "job_fail_on_buildkite_errors_total", + Help: "Count of errors when jobWatcher tried to acquire and fail a job on Buildkite", + }) + jobWatcherJobCleanupsCounter = promauto.NewCounter(prometheus.CounterOpts{ + Namespace: promNamespace, + Subsystem: "job_watcher", + Name: "cleanups_total", + Help: "Count of stalled jobs successfully cleaned up", + }) + jobWatcherJobCleanupErrorsCounter = promauto.NewCounterVec(prometheus.CounterOpts{ + Namespace: promNamespace, + Subsystem: "job_watcher", + Name: "cleanup_errors_total", + Help: "Count of errors during attempts to clean up a stalled job", + }, []string{"reason"}) +) + // Pod watcher metrics var ( @@ -144,12 +191,12 @@ var ( Namespace: promNamespace, Subsystem: "completion_watcher", Name: "cleanups_total", - Help: "Count of jobs successfully cleaned up", + Help: "Count of jobs with finished agents successfully cleaned up", }) completionWatcherJobCleanupErrorsCounter = promauto.NewCounterVec(prometheus.CounterOpts{ Namespace: promNamespace, Subsystem: "completion_watcher", Name: "cleanup_errors_total", - Help: "Count of errors during attempts to clean up a job", + Help: "Count of errors during attempts to clean up a job with a finished agent", }, []string{"reason"}) ) diff --git a/internal/controller/scheduler/once_chan.go b/internal/controller/scheduler/once_chan.go new file mode 100644 index 00000000..246dbc1a --- /dev/null +++ b/internal/controller/scheduler/once_chan.go @@ -0,0 +1,17 @@ +package scheduler + +import "sync" + +// onceChan stores a channel and a [sync.Once] to be used for closing the +// channel at most once. 
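+// closeOnce is safe to call on a nil *onceChan (it is a no-op); otherwise ch
+// must be non-nil before closeOnce is called.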
+type onceChan struct { + once sync.Once + ch chan struct{} +} + +func (oc *onceChan) closeOnce() { + if oc == nil { + return + } + oc.once.Do(func() { close(oc.ch) }) +} diff --git a/internal/controller/scheduler/pod_watcher.go b/internal/controller/scheduler/pod_watcher.go index 11675ab1..dc784824 100644 --- a/internal/controller/scheduler/pod_watcher.go +++ b/internal/controller/scheduler/pod_watcher.go @@ -126,7 +126,7 @@ func (w *podWatcher) OnDelete(maybePod any) { return } - jobUUID, _, err := w.jobUUIDAndLogger(pod) + jobUUID, _, err := jobUUIDAndLoggerForObject(w.logger, pod) if err != nil { return } @@ -166,14 +166,19 @@ func (w *podWatcher) OnUpdate(oldMaybePod, newMaybePod any) { } func (w *podWatcher) runChecks(ctx context.Context, pod *corev1.Pod) { - jobUUID, log, err := w.jobUUIDAndLogger(pod) + jobUUID, log, err := jobUUIDAndLoggerForObject(w.logger, pod) if err != nil { return } + if w.isIgnored(jobUUID) { + log.Debug("Job is currently ignored for podWatcher checks") + return + } + // Check for an init container that failed for any reason. // (Note: users can define their own init containers through podSpec.) - w.failOnInitContainerFailure(ctx, log, pod, jobUUID) + w.failOnInitContainerFailure(ctx, log, pod) // Check for a container stuck in ImagePullBackOff or InvalidImageName, // and fail or cancel the job accordingly. @@ -184,40 +189,6 @@ func (w *podWatcher) runChecks(ctx context.Context, pod *corev1.Pod) { w.startOrStopJobCancelChecker(ctx, log, pod, jobUUID) } -func (w *podWatcher) jobUUIDAndLogger(pod *corev1.Pod) (uuid.UUID, *zap.Logger, error) { - log := w.logger.With(zap.String("namespace", pod.Namespace), zap.String("podName", pod.Name)) - - rawJobUUID, exists := pod.Labels[config.UUIDLabel] - if !exists { - log.Debug("Job UUID label not present. Skipping.") - return uuid.UUID{}, log, errors.New("no job UUID label") - } - - jobUUID, err := uuid.Parse(rawJobUUID) - if err != nil { - log.Warn("Job UUID label was not a UUID!", zap.String("jobUUID", rawJobUUID)) - return uuid.UUID{}, log, err - } - - log = log.With(zap.String("jobUUID", jobUUID.String())) - - // Check that tags match - there may be pods around that were created by - // another controller using different tags. - if !agenttags.JobTagsMatchAgentTags(agenttags.ScanLabels(pod.Labels), w.agentTags) { - log.Debug("Pod labels do not match agent tags for this controller. Skipping.") - return uuid.UUID{}, log, errors.New("pod labels do not match agent tags for this controller") - } - - w.ignoreJobsMu.RLock() - defer w.ignoreJobsMu.RUnlock() - - if _, ignore := w.ignoreJobs[jobUUID]; ignore { - log.Debug("Job already failed, canceled, or wasn't in a failable/cancellable state") - return jobUUID, log, errors.New("job ignored") - } - return jobUUID, log, nil -} - func (w *podWatcher) failOnImagePullFailure(ctx context.Context, log *zap.Logger, pod *corev1.Pod, jobUUID uuid.UUID) { log.Debug("Checking pod containers for ImagePullBackOff or InvalidImageName") @@ -299,13 +270,25 @@ func (w *podWatcher) failOnImagePullFailure(ctx context.Context, log *zap.Logger // We can acquire it and fail it ourselves. log.Info("One or more job containers are in ImagePullBackOff. Failing.") message := w.formatImagePullFailureMessage(images) - if err := w.failJob(ctx, log, pod, jobUUID, message); errors.Is(err, agentcore.ErrJobAcquisitionRejected) { - // If the error was because BK rejected the acquisition(?), then its probably moved - // on to a state where we need to cancel instead. 
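+	// Distinguish a rejected acquisition (cancel the Buildkite job instead)
+	// from any other error (log it and give up).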
+ switch err := acquireAndFailForObject(ctx, log, w.k8s, w.cfg, pod, message); { + case errors.Is(err, agentcore.ErrJobAcquisitionRejected): + podWatcherBuildkiteJobFailErrorsCounter.Inc() + // If the error was because BK rejected the job acquisition, then + // it's moved on to a state where we need to cancel instead. + // (The init container probably successfully pulled, but another + // pull of the same image later on failed after the agent started.) log.Info("Attempting to cancel job instead") w.cancelJob(ctx, log, pod, jobUUID) return + + case err != nil: + podWatcherBuildkiteJobFailErrorsCounter.Inc() + + // Maybe the job was cancelled in the meantime? + log.Error("Could not fail Buildkite job", zap.Error(err)) + return } + podWatcherBuildkiteJobFailsCounter.Inc() // Also evict the pod, because it won't die on its own. w.evictPod(ctx, log, pod, jobUUID) @@ -330,7 +313,7 @@ func (w *podWatcher) failOnImagePullFailure(ctx context.Context, log *zap.Logger } } -func (w *podWatcher) failOnInitContainerFailure(ctx context.Context, log *zap.Logger, pod *corev1.Pod, jobUUID uuid.UUID) { +func (w *podWatcher) failOnInitContainerFailure(ctx context.Context, log *zap.Logger, pod *corev1.Pod) { log.Debug("Checking pod for failed init containers") containerFails := make(map[string]*corev1.ContainerStateTerminated) @@ -359,8 +342,13 @@ func (w *podWatcher) failOnInitContainerFailure(ctx context.Context, log *zap.Lo // probably shouldn't interfere. log.Info("One or more init containers failed. Failing.") message := w.formatInitContainerFails(containerFails) - // failJob logs on error - that's sufficient error handling. - _ = w.failJob(ctx, log, pod, jobUUID, message) + if err := acquireAndFailForObject(ctx, log, w.k8s, w.cfg, pod, message); err != nil { + // Maybe the job was cancelled in the meantime? + log.Error("Could not fail Buildkite job", zap.Error(err)) + podWatcherBuildkiteJobFailErrorsCounter.Inc() + return + } + podWatcherBuildkiteJobFailsCounter.Inc() // No need to fall back to cancelling if acquire failed - see above. // No need to evict, the pod should be considered failed already. } @@ -398,26 +386,6 @@ func (w *podWatcher) formatImagePullFailureMessage(images map[string]struct{}) s return message.String() } -func (w *podWatcher) failJob(ctx context.Context, log *zap.Logger, pod *corev1.Pod, jobUUID uuid.UUID, message string) error { - agentToken, err := fetchAgentToken(ctx, w.logger, w.k8s, w.cfg.Namespace, w.cfg.AgentTokenSecret) - if err != nil { - log.Error("Couldn't fetch agent token in order to fail the job", zap.Error(err)) - return err - } - - // Tags are required order to connect the agent. - tags := agenttags.TagsFromLabels(pod.Labels) - opts := w.cfg.AgentConfig.ControllerOptions() - - if err := failJob(ctx, w.logger, agentToken, jobUUID.String(), tags, message, opts...); err != nil { - log.Error("Couldn't fail the job", zap.Error(err)) - podWatcherBuildkiteJobFailErrorsCounter.Inc() - return err - } - podWatcherBuildkiteJobFailsCounter.Inc() - return nil -} - func (w *podWatcher) evictPod(ctx context.Context, log *zap.Logger, pod *corev1.Pod, jobUUID uuid.UUID) { eviction := &policyv1.Eviction{ ObjectMeta: pod.ObjectMeta, @@ -562,18 +530,11 @@ func (w *podWatcher) unignoreJob(jobUUID uuid.UUID) { delete(w.ignoreJobs, jobUUID) } -// onceChan stores a channel and a [sync.Once] to be used for closing the -// channel at most once. 
-type onceChan struct { - once sync.Once - ch chan struct{} -} - -func (oc *onceChan) closeOnce() { - if oc == nil { - return - } - oc.once.Do(func() { close(oc.ch) }) +func (w *podWatcher) isIgnored(jobUUID uuid.UUID) bool { + w.ignoreJobsMu.RLock() + defer w.ignoreJobsMu.RUnlock() + _, ignore := w.ignoreJobs[jobUUID] + return ignore } // All container-\d containers will have the agent installed as their PID 1. diff --git a/internal/controller/scheduler/scheduler.go b/internal/controller/scheduler/scheduler.go index e0eac559..23052031 100644 --- a/internal/controller/scheduler/scheduler.go +++ b/internal/controller/scheduler/scheduler.go @@ -30,7 +30,6 @@ import ( ) const ( - defaultTermGracePeriodSeconds = 60 agentTokenKey = "BUILDKITE_AGENT_TOKEN" AgentContainerName = "agent" CopyAgentContainerName = "copy-agent" @@ -884,7 +883,7 @@ func (w *worker) failJob(ctx context.Context, inputs buildInputs, message string } opts := w.cfg.AgentConfig.ControllerOptions() - if err := failJob(ctx, w.logger, agentToken, inputs.uuid, inputs.agentQueryRules, message, opts...); err != nil { + if err := acquireAndFail(ctx, w.logger, agentToken, inputs.uuid, inputs.agentQueryRules, message, opts...); err != nil { w.logger.Error("failed to acquire and fail the job on Buildkite", zap.Error(err)) schedulerBuildkiteJobFailErrorsCounter.Inc() return err diff --git a/internal/controller/scheduler/set_deadline.go b/internal/controller/scheduler/set_deadline.go new file mode 100644 index 00000000..945b3e93 --- /dev/null +++ b/internal/controller/scheduler/set_deadline.go @@ -0,0 +1,26 @@ +package scheduler + +import ( + "context" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/util/retry" + "k8s.io/utils/ptr" +) + +const defaultTermGracePeriodSeconds = 60 + +func setJobDeadline(ctx context.Context, k8sClient kubernetes.Interface, namespace, jobName string) error { + jobs := k8sClient.BatchV1().Jobs(namespace) + + return retry.RetryOnConflict(retry.DefaultRetry, func() error { + job, err := jobs.Get(ctx, jobName, metav1.GetOptions{}) + if err != nil { + return err + } + job.Spec.ActiveDeadlineSeconds = ptr.To[int64](defaultTermGracePeriodSeconds) + _, err = jobs.Update(ctx, job, metav1.UpdateOptions{}) + return err + }) +} diff --git a/internal/controller/scheduler/uuid_and_logger.go b/internal/controller/scheduler/uuid_and_logger.go new file mode 100644 index 00000000..768a34b4 --- /dev/null +++ b/internal/controller/scheduler/uuid_and_logger.go @@ -0,0 +1,42 @@ +package scheduler + +import ( + "errors" + + "github.com/buildkite/agent-stack-k8s/v2/internal/controller/config" + "github.com/google/uuid" + "go.uber.org/zap" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +// loggerForObject curries a logger with namespace, name, and job UUID taken +// from the object labels. +func loggerForObject(baseLog *zap.Logger, obj metav1.Object) *zap.Logger { + return baseLog.With( + zap.String("namespace", obj.GetNamespace()), + zap.String("name", obj.GetName()), + zap.String("jobUUID", obj.GetLabels()[config.UUIDLabel]), + ) +} + +// jobUUIDAndLoggerForObject parses the Buildkite job UUID from the object +// labels, and curries a logger with namespace, name, and jobUUID. 
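+// Watcher-specific filtering (such as podWatcher's ignore list) is applied
+// separately by callers.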
+func jobUUIDAndLoggerForObject(baseLog *zap.Logger, obj metav1.Object) (uuid.UUID, *zap.Logger, error) { + log := baseLog.With(zap.String("namespace", obj.GetNamespace()), zap.String("name", obj.GetName())) + + rawJobUUID, exists := obj.GetLabels()[config.UUIDLabel] + if !exists { + log.Debug("Job UUID label not present. Skipping.") + return uuid.UUID{}, log, errors.New("no job UUID label") + } + + jobUUID, err := uuid.Parse(rawJobUUID) + if err != nil { + log.Warn("Job UUID label was not a UUID!", zap.String("jobUUID", rawJobUUID)) + return uuid.UUID{}, log, err + } + + log = log.With(zap.String("jobUUID", jobUUID.String())) + + return jobUUID, log, nil +} diff --git a/internal/integration/fixtures/missing-service-account.yaml b/internal/integration/fixtures/missing-service-account.yaml new file mode 100644 index 00000000..cc2a128f --- /dev/null +++ b/internal/integration/fixtures/missing-service-account.yaml @@ -0,0 +1,12 @@ +steps: + - label: ":x:" + agents: + queue: "{{.queue}}" + plugins: + - kubernetes: + podSpec: + serviceAccount: does-not-exist + containers: + - image: buildkite/agent:latest + command: + - "true" diff --git a/internal/integration/integration_test.go b/internal/integration/integration_test.go index f8e7c1ae..7427772a 100644 --- a/internal/integration/integration_test.go +++ b/internal/integration/integration_test.go @@ -391,6 +391,21 @@ func TestInvalidPodJSON(t *testing.T) { ) } +func TestMissingServiceAccount(t *testing.T) { + tc := testcase{ + T: t, + Fixture: "missing-service-account.yaml", + Repo: repoHTTP, + GraphQL: api.NewClient(cfg.BuildkiteToken, cfg.GraphQLEndpoint), + }.Init() + ctx := context.Background() + pipelineID := tc.PrepareQueueAndPipelineWithCleanup(ctx) + tc.StartController(ctx, cfg) + build := tc.TriggerBuild(ctx, pipelineID) + tc.AssertFail(ctx, build) + tc.AssertLogsContain(build, "error looking up service account") +} + func TestEnvVariables(t *testing.T) { tc := testcase{ T: t,