Skip to content

Commit

Permalink
Refactor fail_job functions
Browse files Browse the repository at this point in the history
  • Loading branch information
DrJosh9000 committed Dec 5, 2024
1 parent 98e9a2d commit f849b99
Show file tree
Hide file tree
Showing 3 changed files with 62 additions and 30 deletions.
39 changes: 37 additions & 2 deletions internal/controller/scheduler/fail_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"fmt"
"os"

"github.com/buildkite/agent-stack-k8s/v2/internal/controller/agenttags"
"github.com/buildkite/agent-stack-k8s/v2/internal/controller/config"
"github.com/buildkite/agent-stack-k8s/v2/internal/version"

agentcore "github.com/buildkite/agent/v3/core"
Expand All @@ -16,9 +18,42 @@ import (
"k8s.io/client-go/kubernetes"
)

// failJob fails the job in Buildkite. agentToken needs to be the token value.
// acquireAndFailForObject figures out how to fail the BK job corresponding to
// the k8s object (a pod or job) by inspecting the object's labels.
//
// It fetches the agent token from the secret named by cfg.AgentTokenSecret in
// the object's namespace, reads the job UUID and agent tags from the object's
// labels, and then delegates to acquireAndFail. Errors are logged here as
// well as returned.
func acquireAndFailForObject(
	ctx context.Context,
	logger *zap.Logger,
	k8sClient kubernetes.Interface,
	cfg *config.Config,
	obj metav1.Object,
	message string,
) error {
	// The token value (not the secret name) is what acquireAndFail needs.
	agentToken, err := fetchAgentToken(ctx, logger, k8sClient, obj.GetNamespace(), cfg.AgentTokenSecret)
	if err != nil {
		logger.Error("fetching agent token from secret", zap.Error(err))
		return err
	}

	// Matching tags are required in order to connect the temporary agent.
	labels := obj.GetLabels()
	jobUUID := labels[config.UUIDLabel]
	if jobUUID == "" {
		// Without the UUID label we cannot identify which BK job to fail.
		logger.Error("object missing UUID label", zap.String("label", config.UUIDLabel))
		return errors.New("missing UUID label")
	}
	tags := agenttags.TagsFromLabels(labels)
	opts := cfg.AgentConfig.ControllerOptions()

	if err := acquireAndFail(ctx, logger, agentToken, jobUUID, tags, message, opts...); err != nil {
		logger.Error("failed to acquire and fail the job on Buildkite", zap.Error(err))
		return err
	}
	return nil
}

// acquireAndFail fails the job in Buildkite. agentToken needs to be the token value.
// Use fetchAgentToken to fetch it from the k8s secret.
func failJob(
func acquireAndFail(
ctx context.Context,
zapLogger *zap.Logger,
agentToken string,
Expand Down
51 changes: 24 additions & 27 deletions internal/controller/scheduler/pod_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ func (w *podWatcher) runChecks(ctx context.Context, pod *corev1.Pod) {

// Check for an init container that failed for any reason.
// (Note: users can define their own init containers through podSpec.)
w.failOnInitContainerFailure(ctx, log, pod, jobUUID)
w.failOnInitContainerFailure(ctx, log, pod)

// Check for a container stuck in ImagePullBackOff or InvalidImageName,
// and fail or cancel the job accordingly.
Expand Down Expand Up @@ -306,13 +306,25 @@ func (w *podWatcher) failOnImagePullFailure(ctx context.Context, log *zap.Logger
// We can acquire it and fail it ourselves.
log.Info("One or more job containers are in ImagePullBackOff. Failing.")
message := w.formatImagePullFailureMessage(images)
if err := w.failJob(ctx, log, pod, jobUUID, message); errors.Is(err, agentcore.ErrJobAcquisitionRejected) {
// If the error was because BK rejected the acquisition(?), then its probably moved
// on to a state where we need to cancel instead.
switch err := acquireAndFailForObject(ctx, log, w.k8s, w.cfg, pod, message); {
case errors.Is(err, agentcore.ErrJobAcquisitionRejected):
podWatcherBuildkiteJobFailErrorsCounter.Inc()
// If the error was because BK rejected the job acquisition, then
// it's moved on to a state where we need to cancel instead.
// (The init container probably successfully pulled, but another
// pull of the same image later on failed after the agent started.)
log.Info("Attempting to cancel job instead")
w.cancelJob(ctx, log, pod, jobUUID)
return

case err != nil:
podWatcherBuildkiteJobFailErrorsCounter.Inc()

// Maybe the job was cancelled in the meantime?
log.Error("Could not fail Buildkite job", zap.Error(err))
return
}
podWatcherBuildkiteJobFailsCounter.Inc()
// Also evict the pod, because it won't die on its own.
w.evictPod(ctx, log, pod, jobUUID)

Expand All @@ -337,7 +349,7 @@ func (w *podWatcher) failOnImagePullFailure(ctx context.Context, log *zap.Logger
}
}

func (w *podWatcher) failOnInitContainerFailure(ctx context.Context, log *zap.Logger, pod *corev1.Pod, jobUUID uuid.UUID) {
func (w *podWatcher) failOnInitContainerFailure(ctx context.Context, log *zap.Logger, pod *corev1.Pod) {
log.Debug("Checking pod for failed init containers")

containerFails := make(map[string]*corev1.ContainerStateTerminated)
Expand Down Expand Up @@ -366,8 +378,13 @@ func (w *podWatcher) failOnInitContainerFailure(ctx context.Context, log *zap.Lo
// probably shouldn't interfere.
log.Info("One or more init containers failed. Failing.")
message := w.formatInitContainerFails(containerFails)
// failJob logs on error - that's sufficient error handling.
_ = w.failJob(ctx, log, pod, jobUUID, message)
if err := acquireAndFailForObject(ctx, log, w.k8s, w.cfg, pod, message); err != nil {
// Maybe the job was cancelled in the meantime?
log.Error("Could not fail Buildkite job", zap.Error(err))
podWatcherBuildkiteJobFailErrorsCounter.Inc()
return
}
podWatcherBuildkiteJobFailsCounter.Inc()
// No need to fall back to cancelling if acquire failed - see above.
// No need to evict, the pod should be considered failed already.
}
Expand Down Expand Up @@ -405,26 +422,6 @@ func (w *podWatcher) formatImagePullFailureMessage(images map[string]struct{}) s
return message.String()
}

// failJob fails the Buildkite job with UUID jobUUID (corresponding to pod)
// with the given message. It fetches the agent token from the configured
// secret, derives agent tags from the pod labels, and delegates to the
// package-level failJob. Errors are logged (via the contextual logger) as
// well as returned, and the fail/fail-error metrics counters are updated.
func (w *podWatcher) failJob(ctx context.Context, log *zap.Logger, pod *corev1.Pod, jobUUID uuid.UUID, message string) error {
	// Use the contextual logger (log) consistently, rather than w.logger,
	// so that per-job/pod fields attached by the caller are not dropped.
	agentToken, err := fetchAgentToken(ctx, log, w.k8s, w.cfg.Namespace, w.cfg.AgentTokenSecret)
	if err != nil {
		log.Error("Couldn't fetch agent token in order to fail the job", zap.Error(err))
		return err
	}

	// Tags are required in order to connect the agent.
	tags := agenttags.TagsFromLabels(pod.Labels)
	opts := w.cfg.AgentConfig.ControllerOptions()

	if err := failJob(ctx, log, agentToken, jobUUID.String(), tags, message, opts...); err != nil {
		log.Error("Couldn't fail the job", zap.Error(err))
		podWatcherBuildkiteJobFailErrorsCounter.Inc()
		return err
	}
	podWatcherBuildkiteJobFailsCounter.Inc()
	return nil
}

func (w *podWatcher) evictPod(ctx context.Context, log *zap.Logger, pod *corev1.Pod, jobUUID uuid.UUID) {
eviction := &policyv1.Eviction{
ObjectMeta: pod.ObjectMeta,
Expand Down
2 changes: 1 addition & 1 deletion internal/controller/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -883,7 +883,7 @@ func (w *worker) failJob(ctx context.Context, inputs buildInputs, message string
}

opts := w.cfg.AgentConfig.ControllerOptions()
if err := failJob(ctx, w.logger, agentToken, inputs.uuid, inputs.agentQueryRules, message, opts...); err != nil {
if err := acquireAndFail(ctx, w.logger, agentToken, inputs.uuid, inputs.agentQueryRules, message, opts...); err != nil {
w.logger.Error("failed to acquire and fail the job on Buildkite", zap.Error(err))
schedulerBuildkiteJobFailErrorsCounter.Inc()
return err
Expand Down

0 comments on commit f849b99

Please sign in to comment.