From 92e88f902ea770b1ca466fd9b580d1d0a12f1ec5 Mon Sep 17 00:00:00 2001
From: Josh Deprez
Date: Mon, 2 Dec 2024 15:18:07 +1100
Subject: [PATCH 1/6] Add missing flags for config options

---
 cmd/controller/controller.go           | 10 ++++++++++
 internal/controller/config/config.go   |  2 ++
 internal/controller/monitor/monitor.go |  5 +++--
 3 files changed, 15 insertions(+), 2 deletions(-)

diff --git a/cmd/controller/controller.go b/cmd/controller/controller.go
index b53b62e5..ecb6fad1 100644
--- a/cmd/controller/controller.go
+++ b/cmd/controller/controller.go
@@ -90,6 +90,16 @@ func AddConfigFlags(cmd *cobra.Command) {
 	)
 	cmd.Flags().String("graphql-endpoint", "", "Buildkite GraphQL endpoint URL")
 
+	cmd.Flags().Duration(
+		"stale-job-data-timeout",
+		config.DefaultStaleJobDataTimeout,
+		"Duration after querying jobs in Buildkite that the data is considered valid",
+	)
+	cmd.Flags().Int(
+		"job-creation-concurrency",
+		config.DefaultJobCreationConcurrency,
+		"Number of concurrent goroutines to run for converting Buildkite jobs into Kubernetes jobs",
+	)
 	cmd.Flags().Duration(
 		"image-pull-backoff-grace-period",
 		config.DefaultImagePullBackOffGracePeriod,
diff --git a/internal/controller/config/config.go b/internal/controller/config/config.go
index 0a8331a6..167a2068 100644
--- a/internal/controller/config/config.go
+++ b/internal/controller/config/config.go
@@ -15,8 +15,10 @@ const (
 	BuildURLAnnotation                  = "buildkite.com/build-url"
 	JobURLAnnotation                    = "buildkite.com/job-url"
 	DefaultNamespace                    = "default"
+	DefaultStaleJobDataTimeout          = 10 * time.Second
 	DefaultImagePullBackOffGracePeriod  = 30 * time.Second
 	DefaultJobCancelCheckerPollInterval = 5 * time.Second
+	DefaultJobCreationConcurrency       = 5
 )
 
 var DefaultAgentImage = "ghcr.io/buildkite/agent:" + version.Version()
diff --git a/internal/controller/monitor/monitor.go b/internal/controller/monitor/monitor.go
index 9fa6c76e..98d14726 100644
--- a/internal/controller/monitor/monitor.go
+++ b/internal/controller/monitor/monitor.go
@@ -14,6 +14,7 @@ import (
 	"github.com/Khan/genqlient/graphql"
 	"github.com/buildkite/agent-stack-k8s/v2/api"
 	"github.com/buildkite/agent-stack-k8s/v2/internal/controller/agenttags"
+	"github.com/buildkite/agent-stack-k8s/v2/internal/controller/config"
 	"github.com/buildkite/agent-stack-k8s/v2/internal/controller/model"
 	"go.uber.org/zap"
 	"k8s.io/client-go/kubernetes"
@@ -46,12 +47,12 @@ func New(logger *zap.Logger, k8s kubernetes.Interface, cfg Config) (*Monitor, er
 
 	// Default StaleJobDataTimeout to 10s.
 	if cfg.StaleJobDataTimeout <= 0 {
-		cfg.StaleJobDataTimeout = 10 * time.Second
+		cfg.StaleJobDataTimeout = config.DefaultStaleJobDataTimeout
 	}
 
 	// Default CreationConcurrency to 5.
 	if cfg.JobCreationConcurrency <= 0 {
-		cfg.JobCreationConcurrency = 5
+		cfg.JobCreationConcurrency = config.DefaultJobCreationConcurrency
 	}
 
 	return &Monitor{

From def6d4a999e549c100781c492067e8efa1db1a42 Mon Sep 17 00:00:00 2001
From: Josh Deprez
Date: Mon, 2 Dec 2024 15:18:07 +1100
Subject: [PATCH 2/6] Better ctx in completion watcher

---
 internal/controller/scheduler/completions.go | 23 +++++++++++++-------
 internal/controller/scheduler/scheduler.go   |  1 -
 2 files changed, 15 insertions(+), 9 deletions(-)

diff --git a/internal/controller/scheduler/completions.go b/internal/controller/scheduler/completions.go
index 1191349b..a59656c7 100644
--- a/internal/controller/scheduler/completions.go
+++ b/internal/controller/scheduler/completions.go
@@ -6,6 +6,7 @@ import (
 
 	"github.com/buildkite/agent-stack-k8s/v2/internal/controller/config"
 	"go.uber.org/zap"
+	v1 "k8s.io/api/core/v1"
 	kerrors "k8s.io/apimachinery/pkg/api/errors"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 
@@ -16,9 +17,18 @@ import (
 	"k8s.io/utils/ptr"
 )
 
+const defaultTermGracePeriodSeconds = 60
+
 type completionsWatcher struct {
 	logger *zap.Logger
 	k8s    kubernetes.Interface
+
+	// This is the context passed to RegisterInformer.
+	// It's being stored here (grrrr!) because the k8s ResourceEventHandler
+	// interface doesn't have context args. (Working around an interface in a
+	// library outside of our control is a carve-out from the usual rule.)
+	// The context is needed to ensure goroutines are cleaned up.
+	resourceEventHandlerCtx context.Context
 }
 
 func NewPodCompletionWatcher(logger *zap.Logger, k8s kubernetes.Interface) *completionsWatcher {
@@ -30,14 +40,12 @@ func NewPodCompletionWatcher(logger *zap.Logger, k8s kubernetes.Interface) *comp
 }
 
 // Creates a Pods informer and registers the handler on it
-func (w *completionsWatcher) RegisterInformer(
-	ctx context.Context,
-	factory informers.SharedInformerFactory,
-) error {
+func (w *completionsWatcher) RegisterInformer(ctx context.Context, factory informers.SharedInformerFactory) error {
 	informer := factory.Core().V1().Pods().Informer()
 	if _, err := informer.AddEventHandler(w); err != nil {
 		return err
 	}
+	w.resourceEventHandlerCtx = ctx // see note on field
 	go factory.Start(ctx.Done())
 	return nil
 }
@@ -49,7 +57,7 @@ func (w *completionsWatcher) OnDelete(obj any) {}
 func (w *completionsWatcher) OnAdd(obj any, isInInitialList bool) {
 	completionWatcherOnAddEventCounter.Inc()
 	pod := obj.(*v1.Pod)
-	w.cleanupSidecars(pod)
+	w.cleanupSidecars(w.resourceEventHandlerCtx, pod)
 }
 
 func (w *completionsWatcher) OnUpdate(old any, new any) {
@@ -62,7 +70,7 @@ func (w *completionsWatcher) OnUpdate(old any, new any) {
 	}
 
 	newPod := new.(*v1.Pod)
-	w.cleanupSidecars(newPod)
+	w.cleanupSidecars(w.resourceEventHandlerCtx, newPod)
 }
 
 // cleanupSidecars first checks if the container status of the agent container
 // it with an ActiveDeadlineSeconds value (defaultTermGracePeriodSeconds).
 // (So this is not actually sidecar-specific, but is needed because sidecars
 // would otherwise cause the pod to continue running.)
-func (w *completionsWatcher) cleanupSidecars(pod *v1.Pod) {
+func (w *completionsWatcher) cleanupSidecars(ctx context.Context, pod *v1.Pod) {
 	terminated := getTermination(pod)
 	if terminated == nil {
 		return
 	}
@@ -82,7 +90,6 @@ func (w *completionsWatcher) cleanupSidecars(pod *v1.Pod) {
 	)
 
 	if err := retry.RetryOnConflict(retry.DefaultRetry, func() error {
-		ctx := context.TODO()
 		job, err := w.k8s.BatchV1().Jobs(pod.Namespace).Get(ctx, pod.Labels["job-name"], metav1.GetOptions{})
 		if err != nil {
 			return err
diff --git a/internal/controller/scheduler/scheduler.go b/internal/controller/scheduler/scheduler.go
index e0eac559..a23094bb 100644
--- a/internal/controller/scheduler/scheduler.go
+++ b/internal/controller/scheduler/scheduler.go
@@ -30,7 +30,6 @@ import (
 )
 
 const (
-	defaultTermGracePeriodSeconds = 60
 	agentTokenKey                 = "BUILDKITE_AGENT_TOKEN"
 	AgentContainerName            = "agent"
 	CopyAgentContainerName        = "copy-agent"

From 98e9a2d552f99c23b9e006a76707f6a3f5f9286c Mon Sep 17 00:00:00 2001
From: Josh Deprez
Date: Mon, 2 Dec 2024 15:18:07 +1100
Subject: [PATCH 3/6] Add isIgnored helper method and gauge

---
 internal/controller/scheduler/metrics.go     |  7 ++++
 internal/controller/scheduler/pod_watcher.go | 40 +++++++++++++-------
 2 files changed, 34 insertions(+), 13 deletions(-)

diff --git a/internal/controller/scheduler/metrics.go b/internal/controller/scheduler/metrics.go
index fcfd2208..49e7d542 100644
--- a/internal/controller/scheduler/metrics.go
+++ b/internal/controller/scheduler/metrics.go
@@ -58,6 +58,7 @@ var (
 var (
 	// Overridden to return len(jobCancelCheckers) by podWatcher.
 	jobCancelCheckerGaugeFunc = func() int { return 0 }
+	ignoredJobsGaugeFunc      = func() int { return 0 }
 
 	_ = promauto.NewGaugeFunc(prometheus.GaugeOpts{
 		Namespace: promNamespace,
@@ -65,6 +66,12 @@ var (
 		Name:      "num_job_cancel_checkers",
 		Help:      "Current count of job cancellation checkers",
 	}, func() float64 { return float64(jobCancelCheckerGaugeFunc()) })
+	_ = promauto.NewGaugeFunc(prometheus.GaugeOpts{
+		Namespace: promNamespace,
+		Subsystem: "pod_watcher",
+		Name:      "num_ignored_jobs",
+		Help:      "Current count of jobs ignored for podWatcher checks",
+	}, func() float64 { return float64(ignoredJobsGaugeFunc()) })
 
 	podWatcherOnAddEventCounter = promauto.NewCounter(prometheus.CounterOpts{
 		Namespace: promNamespace,
diff --git a/internal/controller/scheduler/pod_watcher.go b/internal/controller/scheduler/pod_watcher.go
index 11675ab1..a3ec4f6e 100644
--- a/internal/controller/scheduler/pod_watcher.go
+++ b/internal/controller/scheduler/pod_watcher.go
@@ -41,8 +41,8 @@ type podWatcher struct {
 
 	// Jobs that we've failed, cancelled, or were found to be in a terminal
 	// state.
-	ignoreJobsMu sync.RWMutex
-	ignoreJobs   map[uuid.UUID]struct{}
+	ignoredJobsMu sync.RWMutex
+	ignoredJob    map[uuid.UUID]struct{}
 
 	// The job cancel checkers query the job state every so often.
 	jobCancelCheckerInterval time.Duration
@@ -95,10 +95,15 @@ func NewPodWatcher(logger *zap.Logger, k8s kubernetes.Interface, cfg *config.Con
 		cfg:                         cfg,
 		imagePullBackOffGracePeriod: imagePullBackOffGracePeriod,
 		jobCancelCheckerInterval:    jobCancelCheckerInterval,
-		ignoreJobs:                  make(map[uuid.UUID]struct{}),
+		ignoredJob:                  make(map[uuid.UUID]struct{}),
 		cancelCheckerChs:            make(map[uuid.UUID]*onceChan),
 		agentTags:                   agentTags,
 	}
+	ignoredJobsGaugeFunc = func() int {
+		pw.ignoredJobsMu.RLock()
+		defer pw.ignoredJobsMu.RUnlock()
+		return len(pw.ignoredJob)
+	}
 	jobCancelCheckerGaugeFunc = func() int {
 		pw.cancelCheckerChsMu.Lock()
 		defer pw.cancelCheckerChsMu.Unlock()
@@ -171,6 +176,11 @@ func (w *podWatcher) runChecks(ctx context.Context, pod *corev1.Pod) {
 		return
 	}
 
+	if w.isIgnored(jobUUID) {
+		log.Debug("Job is currently ignored for podWatcher checks")
+		return
+	}
+
 	// Check for an init container that failed for any reason.
 	// (Note: users can define their own init containers through podSpec.)
 	w.failOnInitContainerFailure(ctx, log, pod, jobUUID)
@@ -208,10 +218,7 @@ func (w *podWatcher) jobUUIDAndLogger(pod *corev1.Pod) (uuid.UUID, *zap.Logger,
 		return uuid.UUID{}, log, errors.New("pod labels do not match agent tags for this controller")
 	}
 
-	w.ignoreJobsMu.RLock()
-	defer w.ignoreJobsMu.RUnlock()
-
-	if _, ignore := w.ignoreJobs[jobUUID]; ignore {
+	if w.isIgnored(jobUUID) {
 		log.Debug("Job already failed, canceled, or wasn't in a failable/cancellable state")
 		return jobUUID, log, errors.New("job ignored")
 	}
@@ -551,15 +558,22 @@ func (w *podWatcher) jobCancelChecker(ctx context.Context, stopCh <-chan struct{
 }
 
 func (w *podWatcher) ignoreJob(jobUUID uuid.UUID) {
-	w.ignoreJobsMu.Lock()
-	defer w.ignoreJobsMu.Unlock()
-	w.ignoreJobs[jobUUID] = struct{}{}
+	w.ignoredJobsMu.Lock()
+	defer w.ignoredJobsMu.Unlock()
+	w.ignoredJob[jobUUID] = struct{}{}
 }
 
 func (w *podWatcher) unignoreJob(jobUUID uuid.UUID) {
-	w.ignoreJobsMu.Lock()
-	defer w.ignoreJobsMu.Unlock()
-	delete(w.ignoreJobs, jobUUID)
+	w.ignoredJobsMu.Lock()
+	defer w.ignoredJobsMu.Unlock()
+	delete(w.ignoredJob, jobUUID)
+}
+
+func (w *podWatcher) isIgnored(jobUUID uuid.UUID) bool {
+	w.ignoredJobsMu.RLock()
+	defer w.ignoredJobsMu.RUnlock()
+	_, ignore := w.ignoredJob[jobUUID]
+	return ignore
 }
 
 // onceChan stores a channel and a [sync.Once] to be used for closing the

From f849b99fdc5d0bda1c20b8f0150570813d6a75c0 Mon Sep 17 00:00:00 2001
From: Josh Deprez
Date: Mon, 2 Dec 2024 15:18:07 +1100
Subject: [PATCH 4/6] Refactor fail_job functions

---
 internal/controller/scheduler/fail_job.go    | 39 ++++++++++++++-
 internal/controller/scheduler/pod_watcher.go | 51 +++++++++-----------
 internal/controller/scheduler/scheduler.go   |  2 +-
 3 files changed, 62 insertions(+), 30 deletions(-)

diff --git a/internal/controller/scheduler/fail_job.go b/internal/controller/scheduler/fail_job.go
index da8b5995..ce1f7a76 100644
--- a/internal/controller/scheduler/fail_job.go
+++ b/internal/controller/scheduler/fail_job.go
@@ -6,6 +6,8 @@ import (
 	"fmt"
 	"os"
 
+	"github.com/buildkite/agent-stack-k8s/v2/internal/controller/agenttags"
+	"github.com/buildkite/agent-stack-k8s/v2/internal/controller/config"
 	"github.com/buildkite/agent-stack-k8s/v2/internal/version"
 
 	agentcore "github.com/buildkite/agent/v3/core"
@@ -16,9 +18,42 @@ import (
 	"k8s.io/client-go/kubernetes"
 )
 
-// failJob fails the job in Buildkite. agentToken needs to be the token value.
+// acquireAndFailForObject figures out how to fail the BK job corresponding to
+// the k8s object (a pod or job) by inspecting the object's labels.
+func acquireAndFailForObject(
+	ctx context.Context,
+	logger *zap.Logger,
+	k8sClient kubernetes.Interface,
+	cfg *config.Config,
+	obj metav1.Object,
+	message string,
+) error {
+	agentToken, err := fetchAgentToken(ctx, logger, k8sClient, obj.GetNamespace(), cfg.AgentTokenSecret)
+	if err != nil {
+		logger.Error("fetching agent token from secret", zap.Error(err))
+		return err
+	}
+
+	// Matching tags are required in order to connect the temporary agent.
+	labels := obj.GetLabels()
+	jobUUID := labels[config.UUIDLabel]
+	if jobUUID == "" {
+		logger.Error("object missing UUID label", zap.String("label", config.UUIDLabel))
+		return errors.New("missing UUID label")
+	}
+	tags := agenttags.TagsFromLabels(labels)
+	opts := cfg.AgentConfig.ControllerOptions()
+
+	if err := acquireAndFail(ctx, logger, agentToken, jobUUID, tags, message, opts...); err != nil {
+		logger.Error("failed to acquire and fail the job on Buildkite", zap.Error(err))
+		return err
+	}
+	return nil
+}
+
+// acquireAndFail fails the job in Buildkite. agentToken needs to be the token value.
 // Use fetchAgentToken to fetch it from the k8s secret.
-func failJob(
+func acquireAndFail(
 	ctx context.Context,
 	zapLogger *zap.Logger,
 	agentToken string,
diff --git a/internal/controller/scheduler/pod_watcher.go b/internal/controller/scheduler/pod_watcher.go
index a3ec4f6e..3b37cfc9 100644
--- a/internal/controller/scheduler/pod_watcher.go
+++ b/internal/controller/scheduler/pod_watcher.go
@@ -183,7 +183,7 @@ func (w *podWatcher) runChecks(ctx context.Context, pod *corev1.Pod) {
 
 	// Check for an init container that failed for any reason.
 	// (Note: users can define their own init containers through podSpec.)
-	w.failOnInitContainerFailure(ctx, log, pod, jobUUID)
+	w.failOnInitContainerFailure(ctx, log, pod)
 
 	// Check for a container stuck in ImagePullBackOff or InvalidImageName,
 	// and fail or cancel the job accordingly.
@@ -306,13 +306,25 @@ func (w *podWatcher) failOnImagePullFailure(ctx context.Context, log *zap.Logger
 	// We can acquire it and fail it ourselves.
 	log.Info("One or more job containers are in ImagePullBackOff. Failing.")
 	message := w.formatImagePullFailureMessage(images)
-	if err := w.failJob(ctx, log, pod, jobUUID, message); errors.Is(err, agentcore.ErrJobAcquisitionRejected) {
-		// If the error was because BK rejected the acquisition(?), then its probably moved
-		// on to a state where we need to cancel instead.
+	switch err := acquireAndFailForObject(ctx, log, w.k8s, w.cfg, pod, message); {
+	case errors.Is(err, agentcore.ErrJobAcquisitionRejected):
+		podWatcherBuildkiteJobFailErrorsCounter.Inc()
+		// If the error was because BK rejected the job acquisition, then
+		// it's moved on to a state where we need to cancel instead.
+		// (The init container probably successfully pulled, but another
+		// pull of the same image later on failed after the agent started.)
 		log.Info("Attempting to cancel job instead")
 		w.cancelJob(ctx, log, pod, jobUUID)
 		return
+
+	case err != nil:
+		podWatcherBuildkiteJobFailErrorsCounter.Inc()
+
+		// Maybe the job was cancelled in the meantime?
+		log.Error("Could not fail Buildkite job", zap.Error(err))
+		return
 	}
+	podWatcherBuildkiteJobFailsCounter.Inc()
 
 	// Also evict the pod, because it won't die on its own.
w.evictPod(ctx, log, pod, jobUUID) @@ -337,7 +349,7 @@ func (w *podWatcher) failOnImagePullFailure(ctx context.Context, log *zap.Logger } } -func (w *podWatcher) failOnInitContainerFailure(ctx context.Context, log *zap.Logger, pod *corev1.Pod, jobUUID uuid.UUID) { +func (w *podWatcher) failOnInitContainerFailure(ctx context.Context, log *zap.Logger, pod *corev1.Pod) { log.Debug("Checking pod for failed init containers") containerFails := make(map[string]*corev1.ContainerStateTerminated) @@ -366,8 +378,13 @@ func (w *podWatcher) failOnInitContainerFailure(ctx context.Context, log *zap.Lo // probably shouldn't interfere. log.Info("One or more init containers failed. Failing.") message := w.formatInitContainerFails(containerFails) - // failJob logs on error - that's sufficient error handling. - _ = w.failJob(ctx, log, pod, jobUUID, message) + if err := acquireAndFailForObject(ctx, log, w.k8s, w.cfg, pod, message); err != nil { + // Maybe the job was cancelled in the meantime? + log.Error("Could not fail Buildkite job", zap.Error(err)) + podWatcherBuildkiteJobFailErrorsCounter.Inc() + return + } + podWatcherBuildkiteJobFailsCounter.Inc() // No need to fall back to cancelling if acquire failed - see above. // No need to evict, the pod should be considered failed already. } @@ -405,26 +422,6 @@ func (w *podWatcher) formatImagePullFailureMessage(images map[string]struct{}) s return message.String() } -func (w *podWatcher) failJob(ctx context.Context, log *zap.Logger, pod *corev1.Pod, jobUUID uuid.UUID, message string) error { - agentToken, err := fetchAgentToken(ctx, w.logger, w.k8s, w.cfg.Namespace, w.cfg.AgentTokenSecret) - if err != nil { - log.Error("Couldn't fetch agent token in order to fail the job", zap.Error(err)) - return err - } - - // Tags are required order to connect the agent. 
- tags := agenttags.TagsFromLabels(pod.Labels) - opts := w.cfg.AgentConfig.ControllerOptions() - - if err := failJob(ctx, w.logger, agentToken, jobUUID.String(), tags, message, opts...); err != nil { - log.Error("Couldn't fail the job", zap.Error(err)) - podWatcherBuildkiteJobFailErrorsCounter.Inc() - return err - } - podWatcherBuildkiteJobFailsCounter.Inc() - return nil -} - func (w *podWatcher) evictPod(ctx context.Context, log *zap.Logger, pod *corev1.Pod, jobUUID uuid.UUID) { eviction := &policyv1.Eviction{ ObjectMeta: pod.ObjectMeta, diff --git a/internal/controller/scheduler/scheduler.go b/internal/controller/scheduler/scheduler.go index a23094bb..23052031 100644 --- a/internal/controller/scheduler/scheduler.go +++ b/internal/controller/scheduler/scheduler.go @@ -883,7 +883,7 @@ func (w *worker) failJob(ctx context.Context, inputs buildInputs, message string } opts := w.cfg.AgentConfig.ControllerOptions() - if err := failJob(ctx, w.logger, agentToken, inputs.uuid, inputs.agentQueryRules, message, opts...); err != nil { + if err := acquireAndFail(ctx, w.logger, agentToken, inputs.uuid, inputs.agentQueryRules, message, opts...); err != nil { w.logger.Error("failed to acquire and fail the job on Buildkite", zap.Error(err)) schedulerBuildkiteJobFailErrorsCounter.Inc() return err From 4df35d21ec1dbdb17bb2bbccbbf617a8a565d43d Mon Sep 17 00:00:00 2001 From: Josh Deprez Date: Mon, 2 Dec 2024 15:18:07 +1100 Subject: [PATCH 5/6] Refactor jobUUIDAndLogger helper function --- internal/controller/scheduler/pod_watcher.go | 39 +++---------------- .../controller/scheduler/uuid_and_logger.go | 30 ++++++++++++++ 2 files changed, 36 insertions(+), 33 deletions(-) create mode 100644 internal/controller/scheduler/uuid_and_logger.go diff --git a/internal/controller/scheduler/pod_watcher.go b/internal/controller/scheduler/pod_watcher.go index 3b37cfc9..f03fd895 100644 --- a/internal/controller/scheduler/pod_watcher.go +++ b/internal/controller/scheduler/pod_watcher.go @@ -131,8 +131,10 @@ func (w *podWatcher) OnDelete(maybePod any) { return } - jobUUID, _, err := w.jobUUIDAndLogger(pod) + log := loggerForObject(w.logger, pod) + jobUUID, err := jobUUIDForObject(pod) if err != nil { + log.Error("Job UUID label missing or invalid for pod") return } @@ -171,8 +173,10 @@ func (w *podWatcher) OnUpdate(oldMaybePod, newMaybePod any) { } func (w *podWatcher) runChecks(ctx context.Context, pod *corev1.Pod) { - jobUUID, log, err := w.jobUUIDAndLogger(pod) + log := loggerForObject(w.logger, pod) + jobUUID, err := jobUUIDForObject(pod) if err != nil { + log.Error("Job UUID label missing or invalid for pod") return } @@ -194,37 +198,6 @@ func (w *podWatcher) runChecks(ctx context.Context, pod *corev1.Pod) { w.startOrStopJobCancelChecker(ctx, log, pod, jobUUID) } -func (w *podWatcher) jobUUIDAndLogger(pod *corev1.Pod) (uuid.UUID, *zap.Logger, error) { - log := w.logger.With(zap.String("namespace", pod.Namespace), zap.String("podName", pod.Name)) - - rawJobUUID, exists := pod.Labels[config.UUIDLabel] - if !exists { - log.Debug("Job UUID label not present. Skipping.") - return uuid.UUID{}, log, errors.New("no job UUID label") - } - - jobUUID, err := uuid.Parse(rawJobUUID) - if err != nil { - log.Warn("Job UUID label was not a UUID!", zap.String("jobUUID", rawJobUUID)) - return uuid.UUID{}, log, err - } - - log = log.With(zap.String("jobUUID", jobUUID.String())) - - // Check that tags match - there may be pods around that were created by - // another controller using different tags. 
- if !agenttags.JobTagsMatchAgentTags(agenttags.ScanLabels(pod.Labels), w.agentTags) { - log.Debug("Pod labels do not match agent tags for this controller. Skipping.") - return uuid.UUID{}, log, errors.New("pod labels do not match agent tags for this controller") - } - - if w.isIgnored(jobUUID) { - log.Debug("Job already failed, canceled, or wasn't in a failable/cancellable state") - return jobUUID, log, errors.New("job ignored") - } - return jobUUID, log, nil -} - func (w *podWatcher) failOnImagePullFailure(ctx context.Context, log *zap.Logger, pod *corev1.Pod, jobUUID uuid.UUID) { log.Debug("Checking pod containers for ImagePullBackOff or InvalidImageName") diff --git a/internal/controller/scheduler/uuid_and_logger.go b/internal/controller/scheduler/uuid_and_logger.go new file mode 100644 index 00000000..9711b4e7 --- /dev/null +++ b/internal/controller/scheduler/uuid_and_logger.go @@ -0,0 +1,30 @@ +package scheduler + +import ( + "errors" + + "github.com/buildkite/agent-stack-k8s/v2/internal/controller/config" + "github.com/google/uuid" + "go.uber.org/zap" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +// loggerForObject curries a logger with namespace, name, and job UUID taken +// from the object labels. +func loggerForObject(baseLog *zap.Logger, obj metav1.Object) *zap.Logger { + return baseLog.With( + zap.String("namespace", obj.GetNamespace()), + zap.String("name", obj.GetName()), + zap.String("jobUUID", obj.GetLabels()[config.UUIDLabel]), + ) +} + +// jobUUIDForObject parses the Buildkite job UUID from the object labels. +func jobUUIDForObject(obj metav1.Object) (uuid.UUID, error) { + rawJobUUID := obj.GetLabels()[config.UUIDLabel] + if rawJobUUID == "" { + return uuid.UUID{}, errors.New("no job UUID label") + } + + return uuid.Parse(rawJobUUID) +} From 1ea1b55837720932eb7e86b442077adeba28c5a7 Mon Sep 17 00:00:00 2001 From: Josh Deprez Date: Mon, 2 Dec 2024 15:18:07 +1100 Subject: [PATCH 6/6] Add job watcher --- .buildkite/rbac.yaml | 6 + .../agent-stack-k8s/templates/rbac.yaml.tpl | 6 + cmd/controller/controller.go | 5 + cmd/controller/controller_test.go | 1 + examples/config.yaml | 1 + internal/controller/config/config.go | 2 + internal/controller/controller.go | 13 + internal/controller/scheduler/job_watcher.go | 355 ++++++++++++++++++ internal/controller/scheduler/metrics.go | 87 ++++- internal/controller/scheduler/pod_watcher.go | 14 +- .../fixtures/missing-service-account.yaml | 12 + internal/integration/integration_test.go | 36 +- 12 files changed, 522 insertions(+), 16 deletions(-) create mode 100644 internal/controller/scheduler/job_watcher.go create mode 100644 internal/integration/fixtures/missing-service-account.yaml diff --git a/.buildkite/rbac.yaml b/.buildkite/rbac.yaml index 3fc6cf70..cac0aec0 100644 --- a/.buildkite/rbac.yaml +++ b/.buildkite/rbac.yaml @@ -34,6 +34,12 @@ rules: - pods/eviction verbs: - create + - apiGroups: + - "" + resources: + - events + verbs: + - list --- apiVersion: v1 kind: ServiceAccount diff --git a/charts/agent-stack-k8s/templates/rbac.yaml.tpl b/charts/agent-stack-k8s/templates/rbac.yaml.tpl index e50196b1..6fdc0252 100644 --- a/charts/agent-stack-k8s/templates/rbac.yaml.tpl +++ b/charts/agent-stack-k8s/templates/rbac.yaml.tpl @@ -33,6 +33,12 @@ rules: - pods/eviction verbs: - create + - apiGroups: + - "" + resources: + - events + verbs: + - list --- apiVersion: rbac.authorization.k8s.io/v1 kind: RoleBinding diff --git a/cmd/controller/controller.go b/cmd/controller/controller.go index ecb6fad1..370ba057 100644 --- 
a/cmd/controller/controller.go +++ b/cmd/controller/controller.go @@ -110,6 +110,11 @@ func AddConfigFlags(cmd *cobra.Command) { config.DefaultJobCancelCheckerPollInterval, "Controls the interval between job state queries while a pod is still Pending", ) + cmd.Flags().Duration( + "empty-job-grace-period", + config.DefaultEmptyJobGracePeriod, + "Duration after starting a Kubernetes job that the controller will wait before considering failing the job due to a missing pod (e.g. when the podSpec specifies a missing service account)", + ) cmd.Flags().Bool( "prohibit-kubernetes-plugin", false, diff --git a/cmd/controller/controller_test.go b/cmd/controller/controller_test.go index 6e2dda64..e4fdb1c3 100644 --- a/cmd/controller/controller_test.go +++ b/cmd/controller/controller_test.go @@ -26,6 +26,7 @@ func TestReadAndParseConfig(t *testing.T) { JobTTL: 300 * time.Second, ImagePullBackOffGracePeriod: 60 * time.Second, JobCancelCheckerPollInterval: 10 * time.Second, + EmptyJobGracePeriod: 50 * time.Second, PollInterval: 5 * time.Second, StaleJobDataTimeout: 10 * time.Second, JobCreationConcurrency: 5, diff --git a/examples/config.yaml b/examples/config.yaml index 21b6d725..b941a6ee 100644 --- a/examples/config.yaml +++ b/examples/config.yaml @@ -4,6 +4,7 @@ image: my.registry.dev/buildkite-agent:latest job-ttl: 5m image-pull-backoff-grace-period: 60s job-cancel-checker-poll-interval: 10s +empty-job-grace-period: 50s poll-interval: 5s stale-job-data-timeout: 10s job-creation-concurrency: 5 diff --git a/internal/controller/config/config.go b/internal/controller/config/config.go index 167a2068..83a1c589 100644 --- a/internal/controller/config/config.go +++ b/internal/controller/config/config.go @@ -18,6 +18,7 @@ const ( DefaultStaleJobDataTimeout = 10 * time.Second DefaultImagePullBackOffGracePeriod = 30 * time.Second DefaultJobCancelCheckerPollInterval = 5 * time.Second + DefaultEmptyJobGracePeriod = 30 * time.Second DefaultJobCreationConcurrency = 5 ) @@ -51,6 +52,7 @@ type Config struct { PodSpecPatch *corev1.PodSpec `json:"pod-spec-patch" validate:"omitempty"` ImagePullBackOffGracePeriod time.Duration `json:"image-pull-backoff-grace-period" validate:"omitempty"` JobCancelCheckerPollInterval time.Duration `json:"job-cancel-checker-poll-interval" validate:"omitempty"` + EmptyJobGracePeriod time.Duration `json:"empty-job-grace-period" validate:"omitempty"` // WorkspaceVolume allows supplying a volume for /workspace. By default // an EmptyDir volume is created for it. 
diff --git a/internal/controller/controller.go b/internal/controller/controller.go index f830210b..3cca6362 100644 --- a/internal/controller/controller.go +++ b/internal/controller/controller.go @@ -136,7 +136,20 @@ func Run( logger.Fatal("failed to register completions informer", zap.Error(err)) } + // JobWatcher watches for jobs in bad conditions to clean up: + // * Jobs that fail without ever creating a pod + // * Jobs that stall forever without ever creating a pod + jobWatcher := scheduler.NewJobWatcher( + logger.Named("jobWatcher"), + k8sClient, + cfg, + ) + if err := jobWatcher.RegisterInformer(ctx, informerFactory); err != nil { + logger.Fatal("failed to register jobWatcher informer", zap.Error(err)) + } + // PodWatcher watches for other conditions to clean up pods: + // * Pods where an init container failed for any reason // * Pods where a container is in ImagePullBackOff for too long // * Pods that are still pending, but the Buildkite job has been cancelled podWatcher := scheduler.NewPodWatcher( diff --git a/internal/controller/scheduler/job_watcher.go b/internal/controller/scheduler/job_watcher.go new file mode 100644 index 00000000..3e202c89 --- /dev/null +++ b/internal/controller/scheduler/job_watcher.go @@ -0,0 +1,355 @@ +package scheduler + +import ( + "context" + "fmt" + "sync" + "time" + + "github.com/buildkite/agent-stack-k8s/v2/internal/controller/config" + "github.com/buildkite/agent-stack-k8s/v2/internal/controller/model" + + "github.com/google/uuid" + "github.com/jedib0t/go-pretty/v6/table" + "go.uber.org/zap" + + batchv1 "k8s.io/api/batch/v1" + corev1 "k8s.io/api/core/v1" + kerrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/fields" + "k8s.io/apimachinery/pkg/util/duration" + "k8s.io/client-go/informers" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/util/retry" + "k8s.io/utils/ptr" +) + +// jobWatcher watches k8s jobs for failure to start a pod. The corresponding +// Buildkite job is failed with an error message if this happens. Also, if such +// a k8s job doesn't enter a terminal state on its own, jobWatcher sets a +// deadline so that it is cleaned up. +type jobWatcher struct { + // Logs go here + logger *zap.Logger + + k8s kubernetes.Interface + cfg *config.Config + + // Tracks stalling jobs (jobs that have yet to create pods). + stallingJobsMu sync.Mutex + stallingJobs map[*batchv1.Job]struct{} + + // Tracks jobs that are being cleaned up (to avoid repeats). + ignoredJobsMu sync.RWMutex + ignoredJobs map[uuid.UUID]struct{} + + // This is the context passed to RegisterInformer. + // It's being stored here (grrrr!) because the k8s ResourceEventHandler + // interface doesn't have context args. (Working around an interface in a + // library outside of our control is a carve-out from the usual rule.) + // The context is needed to ensure goroutines are cleaned up. + resourceEventHandlerCtx context.Context +} + +// NewJobWatcher creates a JobWatcher. 
+func NewJobWatcher(logger *zap.Logger, k8sClient kubernetes.Interface, cfg *config.Config) *jobWatcher { + w := &jobWatcher{ + logger: logger, + k8s: k8sClient, + cfg: cfg, + stallingJobs: make(map[*batchv1.Job]struct{}), + ignoredJobs: make(map[uuid.UUID]struct{}), + } + jobsStallingGaugeFunc = func() int { + w.stallingJobsMu.Lock() + defer w.stallingJobsMu.Unlock() + return len(w.stallingJobs) + } + jobWatcherIgnoredJobsGaugeFunc = func() int { + w.ignoredJobsMu.Lock() + defer w.ignoredJobsMu.Unlock() + return len(w.ignoredJobs) + } + return w +} + +// RegisterInformer registers the limiter to listen for Kubernetes job events. +func (w *jobWatcher) RegisterInformer(ctx context.Context, factory informers.SharedInformerFactory) error { + informer := factory.Batch().V1().Jobs() + jobInformer := informer.Informer() + if _, err := jobInformer.AddEventHandler(w); err != nil { + return err + } + w.resourceEventHandlerCtx = ctx // See field comment + go factory.Start(ctx.Done()) + // No need to wait for cache sync here. These are cleanup tasks, not + // barriers to prevent creating new jobs. + go w.stalledJobChecker(ctx) + return nil +} + +// OnAdd is called by k8s to inform us a resource is added. +func (w *jobWatcher) OnAdd(obj any, _ bool) { + jobWatcherOnAddEventCounter.Inc() + kjob, _ := obj.(*batchv1.Job) + if kjob == nil { + return + } + // Same logic whether we are considering pre-existing jobs, or new jobs. + w.runChecks(w.resourceEventHandlerCtx, kjob) +} + +// OnUpdate is called by k8s to inform us a resource is updated. +func (w *jobWatcher) OnUpdate(_, curr any) { + jobWatcherOnUpdateEventCounter.Inc() + kjob, _ := curr.(*batchv1.Job) + if kjob == nil { + return + } + // Same logic whether or not anything relevant changed about the job. + w.runChecks(w.resourceEventHandlerCtx, kjob) +} + +// OnDelete is called by k8s to inform us a resource is deleted. +func (w *jobWatcher) OnDelete(prev any) { + jobWatcherOnDeleteEventCounter.Inc() + kjob, _ := prev.(*batchv1.Job) + if kjob == nil { + return + } + w.removeFromStalling(kjob) + + jobUUID, err := jobUUIDForObject(kjob) + if err != nil { + return + } + + // The job is gone, so we can stop ignoring it (if it comes back). + w.unignoreJob(jobUUID) + + // TODO: consider catching jobs that were deleted manually? +} + +func (w *jobWatcher) runChecks(ctx context.Context, kjob *batchv1.Job) { + log := loggerForObject(w.logger, kjob) + jobUUID, err := jobUUIDForObject(kjob) + if err != nil { + log.Error("Job UUID label missing or invalid for job") + return + } + + if w.isIgnored(jobUUID) { + log.Debug("Job is currently ignored for jobWatcher checks") + return + } + + if model.JobFinished(kjob) { + w.removeFromStalling(kjob) + w.checkFinishedWithoutPod(ctx, log, kjob) + } else { + w.checkStalledWithoutPod(log, kjob) + } +} + +func (w *jobWatcher) checkFinishedWithoutPod(ctx context.Context, log *zap.Logger, kjob *batchv1.Job) { + log.Debug("Checking job for finishing without a pod") + + // If the job is finished, there should be one finished pod. + if kjob.Status.Failed+kjob.Status.Succeeded > 0 { + // All's well with the world. + return + } + + jobWatcherFinishedWithoutPodCounter.Inc() + + // Because no pod has been created, the agent hasn't started. + // We can acquire the Buildkite job and fail it ourselves. + log.Info("The Kubernetes job ended without starting a pod. 
Failing the corresponding Buildkite job") + message := "The Kubernetes job ended without starting a pod.\n" + message += w.fetchEvents(ctx, log, kjob) + w.failJob(ctx, log, kjob, message) +} + +func (w *jobWatcher) checkStalledWithoutPod(log *zap.Logger, kjob *batchv1.Job) { + log.Debug("Checking job for stalling without a pod") + + // If the job is not finished and there is no pod, it should start one + // before too long. Otherwise the job is stalled. + pods := kjob.Status.Active + kjob.Status.Failed + kjob.Status.Succeeded + // Ready and Terminating are subsets of Active (I think) + if utp := kjob.Status.UncountedTerminatedPods; utp != nil { + pods += int32(len(utp.Succeeded)) + pods += int32(len(utp.Failed)) + } + if pods > 0 { + // All's well with the world. + w.removeFromStalling(kjob) + return + } + + if kjob.Status.StartTime == nil { + // the _job_ hasn't even started? + return + } + + w.addToStalling(kjob) +} + +func (w *jobWatcher) fetchEvents(ctx context.Context, log *zap.Logger, kjob *batchv1.Job) string { + // List the events for the job, which might contain useful info for + // diagnosing the problem. + events := w.k8s.CoreV1().Events(w.cfg.Namespace) + evlist, err := events.List(ctx, metav1.ListOptions{ + FieldSelector: fields.AndSelectors( + fields.OneTermEqualSelector("involvedObject.kind", "Job"), + fields.OneTermEqualSelector("involvedObject.name", kjob.Name), + ).String(), + }) + if err != nil { + log.Error("Couldn't get events for job", zap.Error(err)) + return fmt.Sprintf("Couldn't get events for job %s: %v", kjob.Name, err) + } + if evlist == nil { + return "" + } + return w.formatEvents(evlist) +} + +func (w *jobWatcher) failJob(ctx context.Context, log *zap.Logger, kjob *batchv1.Job, message string) { + if err := acquireAndFailForObject(ctx, log, w.k8s, w.cfg, kjob, message); err != nil { + // Maybe the job was cancelled in the meantime? 
+ log.Error("Could not fail Buildkite job", zap.Error(err)) + jobWatcherBuildkiteJobFailErrorsCounter.Inc() + return + } + jobWatcherBuildkiteJobFailsCounter.Inc() +} + +func (w *jobWatcher) formatEvents(evlist *corev1.EventList) string { + if len(evlist.Items) == 0 { + return "Events: none" + } + + tw := table.NewWriter() + tw.SetStyle(table.StyleColoredDark) + tw.AppendHeader(table.Row{"LAST EVENT", "REPEATED", "TYPE", "REASON", "MESSAGE"}) + tw.AppendSeparator() + for _, event := range evlist.Items { + if event.Series == nil { + tw.AppendRow(table.Row{event.EventTime.Time, "-", event.Type, event.Reason, event.Message}) + continue + } + lastTime := event.Series.LastObservedTime.Time + firstToLast := duration.HumanDuration(lastTime.Sub(event.EventTime.Time)) + countMsg := fmt.Sprintf("x%d over %s", event.Series.Count, firstToLast) + tw.AppendRow(table.Row{lastTime, countMsg, event.Type, event.Reason, event.Message}) + } + return tw.Render() +} + +func (w *jobWatcher) addToStalling(kjob *batchv1.Job) { + w.stallingJobsMu.Lock() + defer w.stallingJobsMu.Unlock() + w.stallingJobs[kjob] = struct{}{} +} + +func (w *jobWatcher) removeFromStalling(kjob *batchv1.Job) { + w.stallingJobsMu.Lock() + defer w.stallingJobsMu.Unlock() + delete(w.stallingJobs, kjob) +} + +func (w *jobWatcher) stalledJobChecker(ctx context.Context) { + ticker := time.Tick(time.Second) + for { + select { + case <-ctx.Done(): + return + + case <-ticker: + // continue below + } + + // Gather stalled jobs + var stalled []*batchv1.Job + w.stallingJobsMu.Lock() + for kjob := range w.stallingJobs { + if time.Since(kjob.Status.StartTime.Time) < w.cfg.EmptyJobGracePeriod { + continue + } + + // ignore it from now until it is deleted + jobUUID, err := jobUUIDForObject(kjob) + if err != nil { + continue + } + w.ignoreJob(jobUUID) + + // Move it from w.stalling into stalled + stalled = append(stalled, kjob) + delete(w.stallingJobs, kjob) + } + w.stallingJobsMu.Unlock() + + jobWatcherStalledWithoutPodCounter.Add(float64(len(stalled))) + + // Fail BK jobs and delete k8s jobs. + for _, kjob := range stalled { + w.cleanupStalledJob(ctx, kjob) + } + } +} + +func (w *jobWatcher) cleanupStalledJob(ctx context.Context, kjob *batchv1.Job) { + log := loggerForObject(w.logger, kjob) + + // Fetch events for the failure message, and try to fail the job. + stallDuration := duration.HumanDuration(time.Since(kjob.Status.StartTime.Time)) + message := fmt.Sprintf("The Kubernetes job spent %s without starting a pod.\n", stallDuration) + message += w.fetchEvents(ctx, log, kjob) + w.failJob(ctx, log, kjob, message) + + // Use ActiveDeadlineSeconds to fail the job, which makes k8s delete the job + // in the future. + // If we delete it straight away, the deduper will remove it from its map, + // letting the job (still in BK state "scheduled") be recreated immediately + // even though the job is failed on BK first, because it might still be in + // the most recent jobs query, waiting for the staleness timeout. + if err := retry.RetryOnConflict(retry.DefaultRetry, func() error { + job, err := w.k8s.BatchV1().Jobs(kjob.Namespace).Get(ctx, kjob.Name, metav1.GetOptions{}) + if err != nil { + return err + } + // activeDeadlineSeconds applies from the start of the job. But the + // job is only cleaned up though TTLSecondsAfterFinished, which is way + // in the future. 
+ job.Spec.ActiveDeadlineSeconds = ptr.To[int64](1) + _, err = w.k8s.BatchV1().Jobs(kjob.Namespace).Update(ctx, job, metav1.UpdateOptions{}) + return err + }); err != nil { + jobWatcherJobCleanupErrorsCounter.WithLabelValues(string(kerrors.ReasonForError(err))).Inc() + w.logger.Error("failed to update job with ActiveDeadlineSeconds", zap.Error(err)) + return + } + jobWatcherJobCleanupsCounter.Inc() +} + +func (w *jobWatcher) ignoreJob(jobUUID uuid.UUID) { + w.ignoredJobsMu.Lock() + defer w.ignoredJobsMu.Unlock() + w.ignoredJobs[jobUUID] = struct{}{} +} + +func (w *jobWatcher) unignoreJob(jobUUID uuid.UUID) { + w.ignoredJobsMu.Lock() + defer w.ignoredJobsMu.Unlock() + delete(w.ignoredJobs, jobUUID) +} + +func (w *jobWatcher) isIgnored(jobUUID uuid.UUID) bool { + w.ignoredJobsMu.RLock() + defer w.ignoredJobsMu.RUnlock() + _, ignore := w.ignoredJobs[jobUUID] + return ignore +} diff --git a/internal/controller/scheduler/metrics.go b/internal/controller/scheduler/metrics.go index 49e7d542..f1846d20 100644 --- a/internal/controller/scheduler/metrics.go +++ b/internal/controller/scheduler/metrics.go @@ -53,12 +53,89 @@ var ( }) ) +// Job watcher metrics +var ( + // Overridden by NewJobWatcher + jobsStallingGaugeFunc = func() int { return 0 } + jobWatcherIgnoredJobsGaugeFunc = func() int { return 0 } + + _ = promauto.NewGaugeFunc(prometheus.GaugeOpts{ + Namespace: promNamespace, + Subsystem: "job_watcher", + Name: "num_stalling_jobs", + Help: "Current number of jobs that are running but have no pods", + }, func() float64 { return float64(jobsStallingGaugeFunc()) }) + _ = promauto.NewGaugeFunc(prometheus.GaugeOpts{ + Namespace: promNamespace, + Subsystem: "job_watcher", + Name: "num_ignored_jobs", + Help: "Current count of jobs ignored for jobWatcher checks", + }, func() float64 { return float64(jobWatcherIgnoredJobsGaugeFunc()) }) + + jobWatcherOnAddEventCounter = promauto.NewCounter(prometheus.CounterOpts{ + Namespace: promNamespace, + Subsystem: "job_watcher", + Name: "onadd_events_total", + Help: "Count of OnAdd informer events", + }) + jobWatcherOnUpdateEventCounter = promauto.NewCounter(prometheus.CounterOpts{ + Namespace: promNamespace, + Subsystem: "job_watcher", + Name: "onupdate_events_total", + Help: "Count of OnUpdate informer events", + }) + jobWatcherOnDeleteEventCounter = promauto.NewCounter(prometheus.CounterOpts{ + Namespace: promNamespace, + Subsystem: "job_watcher", + Name: "ondelete_events_total", + Help: "Count of OnDelete informer events", + }) + + jobWatcherFinishedWithoutPodCounter = promauto.NewCounter(prometheus.CounterOpts{ + Namespace: promNamespace, + Subsystem: "job_watcher", + Name: "jobs_finished_without_pod_total", + Help: "Count of jobs that entered a terminal state (Failed or Succeeded) without a pod", + }) + jobWatcherStalledWithoutPodCounter = promauto.NewCounter(prometheus.CounterOpts{ + Namespace: promNamespace, + Subsystem: "job_watcher", + Name: "jobs_stalled_without_pod_total", + Help: "Count of jobs that ran for too long without a pod", + }) + + jobWatcherBuildkiteJobFailsCounter = promauto.NewCounter(prometheus.CounterOpts{ + Namespace: promNamespace, + Subsystem: "job_watcher", + Name: "jobs_failed_on_buildkite_total", + Help: "Count of jobs that jobWatcher successfully acquired and failed on Buildkite", + }) + jobWatcherBuildkiteJobFailErrorsCounter = promauto.NewCounter(prometheus.CounterOpts{ + Namespace: promNamespace, + Subsystem: "job_watcher", + Name: "job_fail_on_buildkite_errors_total", + Help: "Count of errors when jobWatcher tried to acquire 
and fail a job on Buildkite", + }) + jobWatcherJobCleanupsCounter = promauto.NewCounter(prometheus.CounterOpts{ + Namespace: promNamespace, + Subsystem: "job_watcher", + Name: "cleanups_total", + Help: "Count of stalled jobs successfully cleaned up", + }) + jobWatcherJobCleanupErrorsCounter = promauto.NewCounterVec(prometheus.CounterOpts{ + Namespace: promNamespace, + Subsystem: "job_watcher", + Name: "cleanup_errors_total", + Help: "Count of errors during attempts to clean up a stalled job", + }, []string{"reason"}) +) + // Pod watcher metrics var ( // Overridden to return len(jobCancelCheckers) by podWatcher. - jobCancelCheckerGaugeFunc = func() int { return 0 } - ignoredJobsGaugeFunc = func() int { return 0 } + jobCancelCheckerGaugeFunc = func() int { return 0 } + podWatcherIgnoredJobsGaugeFunc = func() int { return 0 } _ = promauto.NewGaugeFunc(prometheus.GaugeOpts{ Namespace: promNamespace, @@ -71,7 +148,7 @@ var ( Subsystem: "pod_watcher", Name: "num_ignored_jobs", Help: "Current count of jobs ignored for podWatcher checks", - }, func() float64 { return float64(ignoredJobsGaugeFunc()) }) + }, func() float64 { return float64(podWatcherIgnoredJobsGaugeFunc()) }) podWatcherOnAddEventCounter = promauto.NewCounter(prometheus.CounterOpts{ Namespace: promNamespace, @@ -151,12 +228,12 @@ var ( Namespace: promNamespace, Subsystem: "completion_watcher", Name: "cleanups_total", - Help: "Count of jobs successfully cleaned up", + Help: "Count of jobs with finished agents successfully cleaned up", }) completionWatcherJobCleanupErrorsCounter = promauto.NewCounterVec(prometheus.CounterOpts{ Namespace: promNamespace, Subsystem: "completion_watcher", Name: "cleanup_errors_total", - Help: "Count of errors during attempts to clean up a job", + Help: "Count of errors during attempts to clean up a job with a finished agent", }, []string{"reason"}) ) diff --git a/internal/controller/scheduler/pod_watcher.go b/internal/controller/scheduler/pod_watcher.go index f03fd895..b5c4b6c0 100644 --- a/internal/controller/scheduler/pod_watcher.go +++ b/internal/controller/scheduler/pod_watcher.go @@ -42,7 +42,7 @@ type podWatcher struct { // Jobs that we've failed, cancelled, or were found to be in a terminal // state. ignoredJobsMu sync.RWMutex - ignoredJob map[uuid.UUID]struct{} + ignoredJobs map[uuid.UUID]struct{} // The job cancel checkers query the job state every so often. 
jobCancelCheckerInterval time.Duration @@ -95,14 +95,14 @@ func NewPodWatcher(logger *zap.Logger, k8s kubernetes.Interface, cfg *config.Con cfg: cfg, imagePullBackOffGracePeriod: imagePullBackOffGracePeriod, jobCancelCheckerInterval: jobCancelCheckerInterval, - ignoredJob: make(map[uuid.UUID]struct{}), + ignoredJobs: make(map[uuid.UUID]struct{}), cancelCheckerChs: make(map[uuid.UUID]*onceChan), agentTags: agentTags, } - ignoredJobsGaugeFunc = func() int { + podWatcherIgnoredJobsGaugeFunc = func() int { pw.ignoredJobsMu.RLock() defer pw.ignoredJobsMu.RUnlock() - return len(pw.ignoredJob) + return len(pw.ignoredJobs) } jobCancelCheckerGaugeFunc = func() int { pw.cancelCheckerChsMu.Lock() @@ -530,19 +530,19 @@ func (w *podWatcher) jobCancelChecker(ctx context.Context, stopCh <-chan struct{ func (w *podWatcher) ignoreJob(jobUUID uuid.UUID) { w.ignoredJobsMu.Lock() defer w.ignoredJobsMu.Unlock() - w.ignoredJob[jobUUID] = struct{}{} + w.ignoredJobs[jobUUID] = struct{}{} } func (w *podWatcher) unignoreJob(jobUUID uuid.UUID) { w.ignoredJobsMu.Lock() defer w.ignoredJobsMu.Unlock() - delete(w.ignoredJob, jobUUID) + delete(w.ignoredJobs, jobUUID) } func (w *podWatcher) isIgnored(jobUUID uuid.UUID) bool { w.ignoredJobsMu.RLock() defer w.ignoredJobsMu.RUnlock() - _, ignore := w.ignoredJob[jobUUID] + _, ignore := w.ignoredJobs[jobUUID] return ignore } diff --git a/internal/integration/fixtures/missing-service-account.yaml b/internal/integration/fixtures/missing-service-account.yaml new file mode 100644 index 00000000..cc2a128f --- /dev/null +++ b/internal/integration/fixtures/missing-service-account.yaml @@ -0,0 +1,12 @@ +steps: + - label: ":x:" + agents: + queue: "{{.queue}}" + plugins: + - kubernetes: + podSpec: + serviceAccount: does-not-exist + containers: + - image: buildkite/agent:latest + command: + - "true" diff --git a/internal/integration/integration_test.go b/internal/integration/integration_test.go index f8e7c1ae..d9b43d87 100644 --- a/internal/integration/integration_test.go +++ b/internal/integration/integration_test.go @@ -338,6 +338,7 @@ func TestSidecars(t *testing.T) { tc.StartController(ctx, cfg) build := tc.TriggerBuild(ctx, pipelineID) tc.AssertSuccess(ctx, build) + time.Sleep(5 * time.Second) tc.AssertLogsContain(build, "Welcome to nginx!") } @@ -367,6 +368,7 @@ func TestInvalidPodSpec(t *testing.T) { tc.StartController(ctx, cfg) build := tc.TriggerBuild(ctx, pipelineID) tc.AssertFail(ctx, build) + time.Sleep(5 * time.Second) // trying to reduce flakes: logs not immediately available tc.AssertLogsContain( build, `is invalid: spec.template.spec.containers[0].volumeMounts[0].name: Not found: "this-doesnt-exist"`, @@ -385,12 +387,29 @@ func TestInvalidPodJSON(t *testing.T) { tc.StartController(ctx, cfg) build := tc.TriggerBuild(ctx, pipelineID) tc.AssertFail(ctx, build) + time.Sleep(5 * time.Second) // trying to reduce flakes: logs not immediately available tc.AssertLogsContain( build, "failed parsing Kubernetes plugin: json: cannot unmarshal number into Go struct field EnvVar.podSpec.containers.env.value of type string", ) } +func TestMissingServiceAccount(t *testing.T) { + tc := testcase{ + T: t, + Fixture: "missing-service-account.yaml", + Repo: repoHTTP, + GraphQL: api.NewClient(cfg.BuildkiteToken, cfg.GraphQLEndpoint), + }.Init() + ctx := context.Background() + pipelineID := tc.PrepareQueueAndPipelineWithCleanup(ctx) + tc.StartController(ctx, cfg) + build := tc.TriggerBuild(ctx, pipelineID) + tc.AssertFail(ctx, build) + time.Sleep(5 * time.Second) // trying to reduce flakes: logs 
not immediately available + tc.AssertLogsContain(build, "error looking up service account") +} + func TestEnvVariables(t *testing.T) { tc := testcase{ T: t, @@ -403,6 +422,7 @@ func TestEnvVariables(t *testing.T) { tc.StartController(ctx, cfg) build := tc.TriggerBuild(ctx, pipelineID) tc.AssertSuccess(ctx, build) + time.Sleep(5 * time.Second) // trying to reduce flakes: logs not immediately available tc.AssertLogsContain(build, "Testing some env variables: set") } @@ -418,8 +438,10 @@ func TestImagePullBackOffFailed(t *testing.T) { tc.StartController(ctx, cfg) build := tc.TriggerBuild(ctx, pipelineID) tc.AssertFail(ctx, build) - tc.AssertLogsContain(build, "other job has run") - tc.AssertLogsContain(build, "The following container images couldn't be pulled:\n * \"buildkite/non-existant-image:latest\"") + time.Sleep(5 * time.Second) // trying to reduce flakes: logs not immediately available + logs := tc.FetchLogs(build) + assert.Contains(t, logs, "other job has run") + assert.Contains(t, logs, "The following container images couldn't be pulled:\n * \"buildkite/non-existant-image:latest\"") } func TestBrokenInitContainer(t *testing.T) { @@ -434,8 +456,10 @@ func TestBrokenInitContainer(t *testing.T) { tc.StartController(ctx, cfg) build := tc.TriggerBuild(ctx, pipelineID) tc.AssertFail(ctx, build) - tc.AssertLogsContain(build, "The following init containers failed:") - tc.AssertLogsContain(build, "well this isn't going to work") + time.Sleep(5 * time.Second) // trying to reduce flakes: logs not immediately available + logs := tc.FetchLogs(build) + assert.Contains(t, logs, "The following init containers failed:") + assert.Contains(t, logs, "well this isn't going to work") } func TestInvalidImageRefFormat(t *testing.T) { @@ -450,6 +474,7 @@ func TestInvalidImageRefFormat(t *testing.T) { tc.StartController(ctx, cfg) build := tc.TriggerBuild(ctx, pipelineID) tc.AssertFail(ctx, build) + time.Sleep(5 * time.Second) // trying to reduce flakes: logs not immediately available tc.AssertLogsContain(build, "The following container images couldn't be pulled:\n * \"buildkite/agent:latest plus some extra junk\"") } @@ -480,6 +505,7 @@ func TestInterposerBuildkite(t *testing.T) { tc.StartController(ctx, cfg) build := tc.TriggerBuild(ctx, pipelineID) tc.AssertSuccess(ctx, build) + time.Sleep(5 * time.Second) // trying to reduce flakes: logs not immediately available logs := tc.FetchLogs(build) assert.Contains(t, logs, "Hello World!") assert.Contains(t, logs, "Goodbye World!") @@ -500,6 +526,7 @@ func TestInterposerVector(t *testing.T) { build := tc.TriggerBuild(ctx, pipelineID) tc.AssertSuccess(ctx, build) logs := tc.FetchLogs(build) + time.Sleep(5 * time.Second) // trying to reduce flakes: logs not immediately available assert.Contains(t, logs, "Hello World!") assert.Contains(t, logs, "Goodbye World!") } @@ -526,6 +553,7 @@ func TestCancelCheckerEvictsPod(t *testing.T) { t.Errorf("api.BuildCancel(... %q) error: %v", build.Id, err) } tc.AssertCancelled(ctx, build) + time.Sleep(5 * time.Second) // trying to reduce flakes: logs not immediately available logs := tc.FetchLogs(build) if strings.Contains(logs, "Received cancellation signal, interrupting") { t.Error("The agent ran and handled cancellation")